HEX
Server: nginx/1.22.1
System: Linux VM-16-9-centos 3.10.0-1160.99.1.el7.x86_64 #1 SMP Wed Sep 13 14:19:20 UTC 2023 x86_64
User: www (1001)
PHP: 7.3.31
Disabled: passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
Upload Files
File: //lib64/mft/python_tools/mlxste/stedump.py
# Copyright (C) Nov 2020 Mellanox Technologies Ltd. All rights reserved.
#
# This software is available to you under a choice of one of two
# licenses.  You may choose to be licensed under the terms of the GNU
# General Public License (GPL) Version 2, available from the file
# COPYING in the main directory of this source tree, or the
# OpenIB.org BSD license below:
#
#     Redistribution and use in source and binary forms, with or
#     without modification, are permitted provided that the following
#     conditions are met:
#
#      - Redistributions of source code must retain the above
#        copyright notice, this list of conditions and the following
#        disclaimer.
#
#      - Redistributions in binary form must reproduce the above
#        copyright notice, this list of conditions and the following
#        disclaimer in the documentation and/or other materials
#        provided with the distribution.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# --

from __future__ import print_function
import argparse
import os
import sys

if sys.version_info < (3, 4):
    print('Error: This tool supports Python 3.4 and up only.')
    sys.exit(1)
elif sys.platform != 'linux':
    print('Error: This tool supported on Linux system only.')
    sys.exit(1)

import tools_version # noqa

from mlxste import api
from mlxste.argtypes import MSTDeviceAction, UnsignedIntegerAction, \
    WritableFileType
from mlxste.constants import PacketType, PacketSource,  PacketAttributeFlags, \
    ProviderType, QPFlags, RunningMode, STATUS_SUCCESS, STATUS_FAIL
from mlxste.errors import RequiredArgumentError
from mlxste.parser import STEParser
from mlxste.providers.provider import Provider


class SteeringDump:
    """A class that represent a steering debugging application.

    :cvar str name: The name of the CLI application
    :cvar str description: A brief description of what the application does
    :ivar Dict metadata: Additional information for analyzing
    """
    name = 'stedump'
    description = 'Debugging tool for host NIC steering solutions.'

    format_cls = argparse.RawTextHelpFormatter
    packet_format_help_options = '''
    hex - represents one or more packets in hexadecimal format
    pcap - represents one or more packets in packet capture format
    raw - represents a single packet in binary data format'''

    def __init__(self):
        self.parser = argparse.ArgumentParser(prog=self.name,
                                              description=self.description,
                                              formatter_class=self.format_cls)
        self.metadata = {}

    @classmethod
    def create_provider(cls, namespace):
        """Creates a specified provider object for the steering parser.

        :param Namespace namespace: An object holding parsed CLI arguments
        :return: A new provider instance.
        :rtype: Provider
        """
        return api.create_provider(ProviderType.RESOURCEDUMP.value,
                                   device=namespace.device)

    @classmethod
    def get_qp_flags(cls, namespace):
        """Calculate and return QP flags based on CLI arguments.

        :param Namespace namespace: An object holding parsed CLI arguments
        :return: QP flags
        :rtype: int
        """
        flags = 0
        try:
            if namespace.force_loopback:
                flags |= QPFlags.FORCE_LOOPBACK.value
            if namespace.special_root:
                flags |= QPFlags.SPECIAL_ROOT.value
        except AttributeError:
            pass
        return flags

    @classmethod
    def get_packet_attribute_flags(cls, namespace):
        """Calculate and return packet attribute flags based on CLI arguments.

        :param Namespace namespace: An object holding parsed CLI arguments
        :return: packet attribute flags
        :rtype: int
        """
        flags = 0
        try:
            if namespace.packet_source == PacketSource.INGRESS.value:
                flags |= PacketAttributeFlags.INGRESS.value
            elif namespace.packet_source == PacketSource.EGRESS.value:
                flags |= PacketAttributeFlags.EGRESS.value
                if namespace.reg_a_value:
                    flags |= PacketAttributeFlags.REG_A.value
        except AttributeError:
            pass
        return flags

    @classmethod
    def load_packet_source_metadata(cls, namespace):
        """Loads and return packet source metadata from a namespace object.

        :param Namespace namespace: An object holding parsed CLI arguments
        :return: Packet source metadata.
        :rtype: dict
        """
        metadata = {}
        try:
            metadata['phy_port'] = namespace.phy_port
            if namespace.packet_source == PacketSource.EGRESS.value:
                metadata['phy_port'] = namespace.phy_port
                metadata['qp_flags'] = cls.get_qp_flags(namespace)
                metadata['reg_a_value'] = namespace.reg_a_value or 0
                metadata['sqn'] = namespace.sqn or 0
                metadata['virtual_hca_id'] = namespace.virtual_hca_id or 0
        except AttributeError:
            pass
        return metadata

    @classmethod
    def load_metadata(cls, namespace):
        """Loads and return metadata from a namespace object.

        :param Namespace namespace: An object holding parsed CLI arguments
        :return: Metadata.
        :rtype: dict
        """
        metadata = dict(verbosity=namespace.verbosity)
        if hasattr(namespace, 'packet_source'):
            metadata['flags'] = cls.get_packet_attribute_flags(namespace)
            packet_source_metadata = cls.load_packet_source_metadata(namespace)
            metadata.update(packet_source_metadata)
        return metadata

    @staticmethod
    def config_metadata_cli_args(parser):
        """Configure command-line arguments describing metadata attributes for

        the given parser.

        :param ArgumentParser parser: The argument parser to be configured
        """
        # subparsers describes packet source (ingress or egress)
        subparsers = parser.add_subparsers(title='packet source',
                                           dest='packet_source',
                                           help='specifies the packet source')

        # specific command-line arguments for the egress parser
        egress_parser = subparsers.add_parser(PacketSource.EGRESS.value)
        # optional arguments (default group)
        egress_parser.add_argument('--reg-a',
                                   dest='reg_a_value',
                                   action=UnsignedIntegerAction,
                                   metavar='<reg_a_value>',
                                   default=0,
                                   help='specifies steering register A value '
                                        '(default: %(default)s)')
        egress_parser.add_argument('--virtual-hca-id',
                                   action=UnsignedIntegerAction,
                                   metavar='<virtual_hca_id>',
                                   default=0,
                                   help='specifies the source virtual HCA ID '
                                        '(default: %(default)s)')
        egress_parser.add_argument('--sqn',
                                   action=UnsignedIntegerAction,
                                   metavar='<sqn>',
                                   default=0,
                                   help='specifies the send queue context '
                                        'number (default: %(default)s)')
        egress_parser.add_argument('--force-loopback',
                                   action='store_true',
                                   help='specifies whether to use QP force '
                                        'loopback')
        egress_parser.add_argument('--special-root',
                                   action='store_true',
                                   help='specifies whether to use QP special '
                                        'root')

        # specific command-line arguments for the ingress parser
        ingress_parser = subparsers.add_parser(PacketSource.INGRESS.value)

    def config_cli_args(self):
        """Configure command-line arguments for this application."""

        # required command-line arguments (custom group)
        parser_group_req = self.parser.add_argument_group('required arguments')
        parser_group_req.add_argument('--read-file',
                                      required=True,
                                      metavar='<read_file>',
                                      help='specifies the packet(s) filename '
                                           'to read from')
        # optional command-line arguments
        self.parser.add_argument('--packet-format',
                                 metavar='<packet_format>',
                                 choices=[
                                     PacketType.HEX.value,
                                     PacketType.RAW.value,
                                     PacketType.PCAP.value
                                 ],
                                 default=PacketType.HEX.value,
                                 help='specifies the I/O packet file format '
                                      '(default: %(default)s)\n'
                                      'possible values are:' +
                                      self.packet_format_help_options)
        self.parser.add_argument('--output-file',
                                 dest='out_file',
                                 type=WritableFileType(),
                                 metavar='<output_file>',
                                 help='redirect the output to specific file '
                                      '(default: stdout)')
        self.parser.add_argument('--verbosity',
                                 default=0,
                                 type=int,
                                 choices=[0, 1, 2],
                                 metavar='<verbosity>',
                                 help='increase output verbosity (default: '
                                      '%(default)s)\npossible values are: '
                                      '(%(choices)s)')
        self.parser.add_argument('-v', '--version',
                                 action='version',
                                 version=self.version,
                                 help='show version information and exit')

        # subparsers describes running modes
        subparsers = self.parser.add_subparsers(title='running modes',
                                                dest='mode',
                                                help='specifies the running '
                                                     'mode to use')

        # specific command-line arguments for live parser
        live_parser = subparsers.add_parser(RunningMode.LIVE.value,
                                            help='run in live mode')
        # required arguments (custom group)
        live_group_req = live_parser.add_argument_group('required arguments')
        live_group_req.add_argument('-d', '--device',
                                    required=True,
                                    action=MSTDeviceAction,
                                    metavar='<device>',
                                    help='perform operation for a specified '
                                         'MST device')
        live_group_req.add_argument('--port',
                                    required=True,
                                    dest='phy_port',
                                    action=UnsignedIntegerAction,
                                    metavar='<physical_port>',
                                    help='specifies the physical port number')

        # add metadata command-line arguments to live mode parser
        self.config_metadata_cli_args(live_parser)

        # specific command-line arguments for offline parser
        offline_parser = subparsers.add_parser(RunningMode.OFFLINE.value,
                                               help='run in offline mode')

    def parse_cli_args(self, args=None):
        namespace = self.parser.parse_args(args)
        # mock add_subparsers required argument behavior (Python 3.7+)
        try:
            if namespace.mode is None:
                raise RequiredArgumentError('mode', prefix=self.name)
            elif namespace.mode == RunningMode.LIVE.value:
                packet_source = getattr(namespace, 'packet_source', None)
                if packet_source is None:
                    prefix = '{}: {}'.format(self.name, RunningMode.LIVE.value)
                    raise RequiredArgumentError('packet_source', prefix=prefix)
            else:
                err = '{} mode is not supported'.format(namespace.mode)
                self.parser.exit(2, err + os.linesep)
        except RequiredArgumentError as err:
            self.parser.print_usage()
            self.parser.exit(2, str(err) + os.linesep)
        return namespace

    def setup(self, namespace):
        """Prepare the application's configuration. This is called immediately

        before calling the run method.

        :param Namespace namespace: An object holding parsed CLI arguments
        """
        # loads metadata from namespace attributes
        self.metadata = self.load_metadata(namespace)

    def run(self, namespace):
        """Run the application.

        :param Namespace namespace: An object holding parsed CLI arguments
        """
        # create a new steering parser with a specified provider
        provider = self.create_provider(namespace)
        steering_parser = STEParser(provider, namespace.out_file, self.metadata)
        # loads a list of packets from an input file
        packets = api.load_packets(namespace.packet_format, namespace.read_file)
        # parsing, filtering and analyzing the input packets
        steering_parser.parse(packets)

    def teardown(self, namespace):
        """Clean the application's resources. This is called immediately after

        the run method has been called.

        :param Namespace namespace: An object holding parsed CLI arguments
        """
        # close the output file object
        if namespace.out_file:
            namespace.out_file.close()

    def execute(self, args=None):
        """Execute the application.

        :param list[str] args: command-line arguments for this application
        :return: 0 in case of success and 1 in case of failure.
        :rtype: int
        """
        # configure command-line arguments
        self.config_cli_args()
        # parse command-line arguments
        namespace = self.parse_cli_args(args)
        try:
            # prepare application's configuration
            self.setup(namespace)
            # run the application
            self.run(namespace)
        except Exception as err:
            print('-E-', err, file=(namespace.out_file or sys.stderr))
            rc = STATUS_FAIL
        else:
            rc = STATUS_SUCCESS
        finally:
            # clean application's resources
            self.teardown(namespace)
        return rc

    @property
    def version(self):
        """Returns the application version."""
        return tools_version.GetVersionString(self.name, None)


def main(args=None):
    """A main function to running steering analyzer application.

    :param list[str] args: command-line arguments for this application
    """
    app = SteeringDump()
    rc = app.execute(args)
    sys.exit(rc)


if __name__ == '__main__':
    main()