File: //lib64/mft/python_tools/mlxste/stedump.py
# Copyright (C) Nov 2020 Mellanox Technologies Ltd. All rights reserved.
#
# This software is available to you under a choice of one of two
# licenses. You may choose to be licensed under the terms of the GNU
# General Public License (GPL) Version 2, available from the file
# COPYING in the main directory of this source tree, or the
# OpenIB.org BSD license below:
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# - Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# - Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# --
from __future__ import print_function
import argparse
import os
import sys
if sys.version_info < (3, 4):
print('Error: This tool supports Python 3.4 and up only.')
sys.exit(1)
elif sys.platform != 'linux':
print('Error: This tool supported on Linux system only.')
sys.exit(1)
import tools_version # noqa
from mlxste import api
from mlxste.argtypes import MSTDeviceAction, UnsignedIntegerAction, \
WritableFileType
from mlxste.constants import PacketType, PacketSource, PacketAttributeFlags, \
ProviderType, QPFlags, RunningMode, STATUS_SUCCESS, STATUS_FAIL
from mlxste.errors import RequiredArgumentError
from mlxste.parser import STEParser
from mlxste.providers.provider import Provider
class SteeringDump:
"""A class that represent a steering debugging application.
:cvar str name: The name of the CLI application
:cvar str description: A brief description of what the application does
:ivar Dict metadata: Additional information for analyzing
"""
name = 'stedump'
description = 'Debugging tool for host NIC steering solutions.'
format_cls = argparse.RawTextHelpFormatter
packet_format_help_options = '''
hex - represents one or more packets in hexadecimal format
pcap - represents one or more packets in packet capture format
raw - represents a single packet in binary data format'''
def __init__(self):
self.parser = argparse.ArgumentParser(prog=self.name,
description=self.description,
formatter_class=self.format_cls)
self.metadata = {}
@classmethod
def create_provider(cls, namespace):
"""Creates a specified provider object for the steering parser.
:param Namespace namespace: An object holding parsed CLI arguments
:return: A new provider instance.
:rtype: Provider
"""
return api.create_provider(ProviderType.RESOURCEDUMP.value,
device=namespace.device)
@classmethod
def get_qp_flags(cls, namespace):
"""Calculate and return QP flags based on CLI arguments.
:param Namespace namespace: An object holding parsed CLI arguments
:return: QP flags
:rtype: int
"""
flags = 0
try:
if namespace.force_loopback:
flags |= QPFlags.FORCE_LOOPBACK.value
if namespace.special_root:
flags |= QPFlags.SPECIAL_ROOT.value
except AttributeError:
pass
return flags
@classmethod
def get_packet_attribute_flags(cls, namespace):
"""Calculate and return packet attribute flags based on CLI arguments.
:param Namespace namespace: An object holding parsed CLI arguments
:return: packet attribute flags
:rtype: int
"""
flags = 0
try:
if namespace.packet_source == PacketSource.INGRESS.value:
flags |= PacketAttributeFlags.INGRESS.value
elif namespace.packet_source == PacketSource.EGRESS.value:
flags |= PacketAttributeFlags.EGRESS.value
if namespace.reg_a_value:
flags |= PacketAttributeFlags.REG_A.value
except AttributeError:
pass
return flags
@classmethod
def load_packet_source_metadata(cls, namespace):
"""Loads and return packet source metadata from a namespace object.
:param Namespace namespace: An object holding parsed CLI arguments
:return: Packet source metadata.
:rtype: dict
"""
metadata = {}
try:
metadata['phy_port'] = namespace.phy_port
if namespace.packet_source == PacketSource.EGRESS.value:
metadata['phy_port'] = namespace.phy_port
metadata['qp_flags'] = cls.get_qp_flags(namespace)
metadata['reg_a_value'] = namespace.reg_a_value or 0
metadata['sqn'] = namespace.sqn or 0
metadata['virtual_hca_id'] = namespace.virtual_hca_id or 0
except AttributeError:
pass
return metadata
@classmethod
def load_metadata(cls, namespace):
"""Loads and return metadata from a namespace object.
:param Namespace namespace: An object holding parsed CLI arguments
:return: Metadata.
:rtype: dict
"""
metadata = dict(verbosity=namespace.verbosity)
if hasattr(namespace, 'packet_source'):
metadata['flags'] = cls.get_packet_attribute_flags(namespace)
packet_source_metadata = cls.load_packet_source_metadata(namespace)
metadata.update(packet_source_metadata)
return metadata
@staticmethod
def config_metadata_cli_args(parser):
"""Configure command-line arguments describing metadata attributes for
the given parser.
:param ArgumentParser parser: The argument parser to be configured
"""
# subparsers describes packet source (ingress or egress)
subparsers = parser.add_subparsers(title='packet source',
dest='packet_source',
help='specifies the packet source')
# specific command-line arguments for the egress parser
egress_parser = subparsers.add_parser(PacketSource.EGRESS.value)
# optional arguments (default group)
egress_parser.add_argument('--reg-a',
dest='reg_a_value',
action=UnsignedIntegerAction,
metavar='<reg_a_value>',
default=0,
help='specifies steering register A value '
'(default: %(default)s)')
egress_parser.add_argument('--virtual-hca-id',
action=UnsignedIntegerAction,
metavar='<virtual_hca_id>',
default=0,
help='specifies the source virtual HCA ID '
'(default: %(default)s)')
egress_parser.add_argument('--sqn',
action=UnsignedIntegerAction,
metavar='<sqn>',
default=0,
help='specifies the send queue context '
'number (default: %(default)s)')
egress_parser.add_argument('--force-loopback',
action='store_true',
help='specifies whether to use QP force '
'loopback')
egress_parser.add_argument('--special-root',
action='store_true',
help='specifies whether to use QP special '
'root')
# specific command-line arguments for the ingress parser
ingress_parser = subparsers.add_parser(PacketSource.INGRESS.value)
def config_cli_args(self):
"""Configure command-line arguments for this application."""
# required command-line arguments (custom group)
parser_group_req = self.parser.add_argument_group('required arguments')
parser_group_req.add_argument('--read-file',
required=True,
metavar='<read_file>',
help='specifies the packet(s) filename '
'to read from')
# optional command-line arguments
self.parser.add_argument('--packet-format',
metavar='<packet_format>',
choices=[
PacketType.HEX.value,
PacketType.RAW.value,
PacketType.PCAP.value
],
default=PacketType.HEX.value,
help='specifies the I/O packet file format '
'(default: %(default)s)\n'
'possible values are:' +
self.packet_format_help_options)
self.parser.add_argument('--output-file',
dest='out_file',
type=WritableFileType(),
metavar='<output_file>',
help='redirect the output to specific file '
'(default: stdout)')
self.parser.add_argument('--verbosity',
default=0,
type=int,
choices=[0, 1, 2],
metavar='<verbosity>',
help='increase output verbosity (default: '
'%(default)s)\npossible values are: '
'(%(choices)s)')
self.parser.add_argument('-v', '--version',
action='version',
version=self.version,
help='show version information and exit')
# subparsers describes running modes
subparsers = self.parser.add_subparsers(title='running modes',
dest='mode',
help='specifies the running '
'mode to use')
# specific command-line arguments for live parser
live_parser = subparsers.add_parser(RunningMode.LIVE.value,
help='run in live mode')
# required arguments (custom group)
live_group_req = live_parser.add_argument_group('required arguments')
live_group_req.add_argument('-d', '--device',
required=True,
action=MSTDeviceAction,
metavar='<device>',
help='perform operation for a specified '
'MST device')
live_group_req.add_argument('--port',
required=True,
dest='phy_port',
action=UnsignedIntegerAction,
metavar='<physical_port>',
help='specifies the physical port number')
# add metadata command-line arguments to live mode parser
self.config_metadata_cli_args(live_parser)
# specific command-line arguments for offline parser
offline_parser = subparsers.add_parser(RunningMode.OFFLINE.value,
help='run in offline mode')
def parse_cli_args(self, args=None):
namespace = self.parser.parse_args(args)
# mock add_subparsers required argument behavior (Python 3.7+)
try:
if namespace.mode is None:
raise RequiredArgumentError('mode', prefix=self.name)
elif namespace.mode == RunningMode.LIVE.value:
packet_source = getattr(namespace, 'packet_source', None)
if packet_source is None:
prefix = '{}: {}'.format(self.name, RunningMode.LIVE.value)
raise RequiredArgumentError('packet_source', prefix=prefix)
else:
err = '{} mode is not supported'.format(namespace.mode)
self.parser.exit(2, err + os.linesep)
except RequiredArgumentError as err:
self.parser.print_usage()
self.parser.exit(2, str(err) + os.linesep)
return namespace
def setup(self, namespace):
"""Prepare the application's configuration. This is called immediately
before calling the run method.
:param Namespace namespace: An object holding parsed CLI arguments
"""
# loads metadata from namespace attributes
self.metadata = self.load_metadata(namespace)
def run(self, namespace):
"""Run the application.
:param Namespace namespace: An object holding parsed CLI arguments
"""
# create a new steering parser with a specified provider
provider = self.create_provider(namespace)
steering_parser = STEParser(provider, namespace.out_file, self.metadata)
# loads a list of packets from an input file
packets = api.load_packets(namespace.packet_format, namespace.read_file)
# parsing, filtering and analyzing the input packets
steering_parser.parse(packets)
def teardown(self, namespace):
"""Clean the application's resources. This is called immediately after
the run method has been called.
:param Namespace namespace: An object holding parsed CLI arguments
"""
# close the output file object
if namespace.out_file:
namespace.out_file.close()
def execute(self, args=None):
"""Execute the application.
:param list[str] args: command-line arguments for this application
:return: 0 in case of success and 1 in case of failure.
:rtype: int
"""
# configure command-line arguments
self.config_cli_args()
# parse command-line arguments
namespace = self.parse_cli_args(args)
try:
# prepare application's configuration
self.setup(namespace)
# run the application
self.run(namespace)
except Exception as err:
print('-E-', err, file=(namespace.out_file or sys.stderr))
rc = STATUS_FAIL
else:
rc = STATUS_SUCCESS
finally:
# clean application's resources
self.teardown(namespace)
return rc
@property
def version(self):
"""Returns the application version."""
return tools_version.GetVersionString(self.name, None)
def main(args=None):
"""A main function to running steering analyzer application.
:param list[str] args: command-line arguments for this application
"""
app = SteeringDump()
rc = app.execute(args)
sys.exit(rc)
if __name__ == '__main__':
main()