HEX
Server: nginx/1.22.1
System: Linux VM-16-9-centos 3.10.0-1160.99.1.el7.x86_64 #1 SMP Wed Sep 13 14:19:20 UTC 2023 x86_64
User: www (1001)
PHP: 7.3.31
Disabled: passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
Upload Files
File: //lib64/mft/python_tools/mlxste/crawler.py
# Copyright (C) Nov 2020 Mellanox Technologies Ltd. All rights reserved.
#
# This software is available to you under a choice of one of two
# licenses.  You may choose to be licensed under the terms of the GNU
# General Public License (GPL) Version 2, available from the file
# COPYING in the main directory of this source tree, or the
# OpenIB.org BSD license below:
#
#     Redistribution and use in source and binary forms, with or
#     without modification, are permitted provided that the following
#     conditions are met:
#
#      - Redistributions of source code must retain the above
#        copyright notice, this list of conditions and the following
#        disclaimer.
#
#      - Redistributions in binary form must reproduce the above
#        copyright notice, this list of conditions and the following
#        disclaimer in the documentation and/or other materials
#        provided with the distribution.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# --

import ctypes
from collections.abc import Iterator

from . import stelib
from .constants import ERROR_BUFFER_SIZE, STE_TERMINATOR, STATUS_SUCCESS
from .errors import CrawlerIteratorError, CrawlerSessionError
from .stetypes import ICMAddress

# Names exported by ``from <this module> import *``.
__all__ = ['STECrawler']


class STECrawler(Iterator):
    """A class that represents steering crawler.

    The object is its own iterator: ``iter(crawler)`` opens a crawler
    session through ``stelib`` and every ``next()`` call yields one decoded
    description string until the library reports ``STE_TERMINATOR``.

    The session can be released explicitly with :meth:`close`, or by using
    the object as a context manager (``with STECrawler(...) as crawler:``).

    :param Provider provider: A provider to fetch STE raw data
    :param bytes buffer: The packet raw data
    :param Dict metadata: Additional metadata for analyzing packet
    :ivar int verbosity: The output verbosity level to use
    """
    def __init__(self, provider, buffer, metadata):
        # Lifecycle state is assigned first so that close()/__del__ remain
        # safe even if one of the stelib calls below raises.
        self._session_ptr = None
        self._active = False
        self._provider = provider
        self._root_ste_ptr = stelib.create_icm_object_address()
        self._pkt_attr_ptr = stelib.create_packet_attribute(buffer, metadata)
        self.verbosity = metadata.get('verbosity', 0)

    def __del__(self):
        self.close()

    def __enter__(self):
        """Return the crawler itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the session on leaving the ``with`` block.

        Exceptions raised inside the block are not suppressed.
        """
        self.close()

    def __iter__(self):
        """Open a fresh crawler session and return ``self`` as the iterator.

        :raises CrawlerSessionError: if the session cannot be created.
        """
        # Destroy a session left over from a previous iteration; otherwise
        # the old pointer would be overwritten and never released.
        self.close()
        self._session_ptr = self._create_steering_crawler_session_pointer()
        self._active = True
        return self

    def __next__(self):
        """Advance the crawler by one step and return its description.

        :return: The description string produced for this step (may be
            empty).
        :rtype: str
        :raises StopIteration: if no session is active, or the previous step
            returned ``STE_TERMINATOR``.
        :raises CrawlerIteratorError: if the library returns any status other
            than ``STATUS_SUCCESS`` or ``STE_TERMINATOR``.
        """
        if not self._active:
            raise StopIteration
        desc_str = ctypes.pointer(ctypes.c_char_p())
        # fetch ICM buffer via the specified provider
        icm_buffer = self._provider.fetch(self.icm_address)
        # get the next item from the iterator
        rc = stelib.steering_crawler_iterator(self._session_ptr,
                                              icm_buffer,
                                              self._root_ste_ptr,
                                              desc_str)
        # The library may leave the string pointer NULL; treat it as empty.
        output = (desc_str.contents.value or b'').decode('utf-8')
        if rc not in (STATUS_SUCCESS, STE_TERMINATOR):
            raise CrawlerIteratorError(output)
        if rc == STE_TERMINATOR:
            # The terminating step still returns its output; the following
            # call raises StopIteration.
            self._active = False
        return output

    def _create_steering_crawler_session_pointer(self):
        """Creates a new pointer to steering crawler session.

        :return: A new pointer to steering crawler session.
        :rtype: LP_steering_crawler
        :raises CrawlerSessionError: if the library returns a null session;
            the message is taken from the library's error buffer.
        """
        session_attr_ptr = stelib.create_ste_crawler_attribute(self.verbosity)
        err_buf = ctypes.create_string_buffer(ERROR_BUFFER_SIZE)
        session_ptr = stelib.steering_crawler_create_analyze_packet(
            session_attr_ptr, self._pkt_attr_ptr, self._root_ste_ptr, err_buf)
        if not session_ptr:
            raise CrawlerSessionError(err_buf.value.decode('utf-8').strip())
        return session_ptr

    def close(self):
        """Close the steering crawler session.

        Safe to call more than once, and on an object whose ``__init__`` did
        not complete. After closing, ``next()`` raises ``StopIteration``
        until a new iteration is started.
        """
        # getattr: __del__ may run on a partially constructed instance.
        session_ptr = getattr(self, '_session_ptr', None)
        # Clear the state before destroying so the pointer can never be
        # handed to the library a second time.
        self._session_ptr = None
        self._active = False
        if session_ptr is not None:
            stelib.steering_crawler_destroy(session_ptr)

    def crawl(self, output_file=None):
        """Parse and analyze the steering packet.

        Iterates over the crawler and prints every non-empty description.

        :param io.TextIOWrapper output_file: redirect the output to file
            object; ``None`` prints to standard output.
        """
        for output_string in self:
            if output_string:
                print(output_string, file=output_file)

    @property
    def icm_address(self):
        """Returns the current ICM address object.

        The fields are read from the root STE pointer on every access.

        :return: The current ICM address object.
        :rtype: ICMAddress
        """
        root = self._root_ste_ptr.contents
        return ICMAddress(seg_type=root.seg_type,
                          gvmi=root.gvmi,
                          index=root.index)