File: //lib64/mft/python_tools/mlxste/crawler.py
# Copyright (C) Nov 2020 Mellanox Technologies Ltd. All rights reserved.
#
# This software is available to you under a choice of one of two
# licenses. You may choose to be licensed under the terms of the GNU
# General Public License (GPL) Version 2, available from the file
# COPYING in the main directory of this source tree, or the
# OpenIB.org BSD license below:
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# - Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# - Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# --
import ctypes
from collections.abc import Iterator
from . import stelib
from .constants import ERROR_BUFFER_SIZE, STE_TERMINATOR, STATUS_SUCCESS
from .errors import CrawlerIteratorError, CrawlerSessionError
from .stetypes import ICMAddress
# Names exported by ``from <this module> import *``.
__all__ = ['STECrawler']
class STECrawler(Iterator):
    """A class that represents steering crawler.

    The crawler is an iterator: every step fetches the ICM buffer located at
    the current ICM address through the provider, passes it to
    ``stelib.steering_crawler_iterator`` and yields the decoded description
    string produced by that call.

    :param Provider provider: A provider to fetch STE raw data
    :param bytes buffer: The packet raw data
    :param Dict metadata: Additional metadata for analyzing packet
    :ivar int verbosity: The output verbosity level to use
    """

    def __init__(self, provider, buffer, metadata):
        self._provider = provider
        # Initialise the session state before any stelib call so that
        # __del__/close() always find these attributes, even when one of the
        # calls below raises and leaves the object partially constructed.
        self._session_ptr = None
        self._active = False
        self._root_ste_ptr = stelib.create_icm_object_address()
        self._pkt_attr_ptr = stelib.create_packet_attribute(buffer, metadata)
        self.verbosity = metadata.get('verbosity', 0)

    def __del__(self):
        self.close()

    def __iter__(self):
        """Start a new steering crawler session and return the iterator.

        A session left open by a previous iteration is destroyed first, so
        iterating the same crawler more than once does not leak sessions.

        :raises CrawlerSessionError: if the session could not be created.
        """
        self.close()
        self._session_ptr = self._create_steering_crawler_session_pointer()
        self._active = True
        return self

    def __next__(self):
        """Advance the crawler by one step and return its description.

        :return: The description string of the current step (may be empty).
        :rtype: str
        :raises StopIteration: if no session is active, i.e. iteration was
            not started, was closed, or the previous step returned
            ``STE_TERMINATOR``.
        :raises CrawlerIteratorError: if the iterator call returned a status
            other than ``STATUS_SUCCESS`` or ``STE_TERMINATOR``.
        """
        if not self._active:
            raise StopIteration

        # Output argument: the iterator call stores the description string
        # through this pointer.
        desc_str = ctypes.pointer(ctypes.c_char_p())
        # fetch ICM buffer via the specified provider
        icm_buffer = self._provider.fetch(self.icm_address)
        # get the next item from the iterator
        rc = stelib.steering_crawler_iterator(self._session_ptr,
                                              icm_buffer,
                                              self._root_ste_ptr,
                                              desc_str)
        # The description may be left unset (NULL); treat that as empty text.
        output = (desc_str.contents.value or b'').decode('utf-8')

        if rc not in (STATUS_SUCCESS, STE_TERMINATOR):
            raise CrawlerIteratorError(output)
        if rc == STE_TERMINATOR:
            # The terminating step's output is still returned; the following
            # call to __next__ raises StopIteration.
            self._active = False
        return output

    def _create_steering_crawler_session_pointer(self):
        """Creates a new pointer to steering crawler session.

        :return: A new pointer to steering crawler session.
        :rtype: LP_steering_crawler
        :raises CrawlerSessionError: if the library returned a null session;
            the message is the stripped content of the error buffer.
        """
        session_attr_ptr = stelib.create_ste_crawler_attribute(self.verbosity)
        err_buf = ctypes.create_string_buffer(ERROR_BUFFER_SIZE)
        session_ptr = stelib.steering_crawler_create_analyze_packet(
            session_attr_ptr, self._pkt_attr_ptr, self._root_ste_ptr, err_buf)
        if not session_ptr:
            raise CrawlerSessionError(err_buf.value.decode('utf-8').strip())
        return session_ptr

    def close(self):
        """Close the steering crawler session.

        Safe to call repeatedly and on a partially constructed object. After
        closing, the iterator is inactive and ``__next__`` raises
        ``StopIteration`` until ``__iter__`` is called again.
        """
        # getattr: __del__ may run on an object whose __init__ never got as
        # far as assigning the attribute.
        session_ptr = getattr(self, '_session_ptr', None)
        # Clear the state before destroying, so a failing destroy call is
        # never retried on the same pointer (e.g. later from __del__).
        self._session_ptr = None
        self._active = False
        if session_ptr is not None:
            stelib.steering_crawler_destroy(session_ptr)

    def crawl(self, output_file=None):
        """Parse and analyze the steering packet.

        Iterates the crawler to the end and prints every non-empty
        description string.

        :param io.TextIOWrapper output_file: redirect the output to file object
        """
        for output_string in self:
            if output_string:
                print(output_string, file=output_file)

    @property
    def icm_address(self):
        """Returns the current ICM address object.

        The address is built from the ``seg_type``, ``gvmi`` and ``index``
        fields of the root STE object.

        :return: The current ICM address object.
        :rtype: ICMAddress
        """
        root_ste = self._root_ste_ptr.contents
        return ICMAddress(seg_type=root_ste.seg_type,
                          gvmi=root_ste.gvmi,
                          index=root_ste.index)