美文网首页
opencti中使用的pika简介-worker代码的剖析

opencti中使用的pika简介-worker代码的剖析

作者: Threathunter | 来源:发表于2021-04-08 17:28 被阅读0次

    相关资源:https://pypi.org/project/pycti/#files

    https://pypi.org/project/pika/

    opencti中使用pika进行数据的消费。

    0、opencti中使用pika进行数据的消费

    (1)初始化

    def __init__(self, connector, api):
        """Initialise the consumer thread and open its RabbitMQ channel.

        :param connector: connector description. This method reads
            ``connector["config"]["push"]`` (stored as the queue name) and the
            ``user``, ``pass``, ``host`` and ``port`` entries of
            ``connector["config"]["connection"]``.
        :param api: API client, kept on ``self.api`` for the message handlers.
        """
        threading.Thread.__init__(self)
        self.api = api
        self.queue_name = connector["config"]["push"]

        broker = connector["config"]["connection"]
        self.pika_credentials = pika.PlainCredentials(
            broker["user"],
            broker["pass"],
        )
        # Positional arguments: host, port, "/" and the credentials.
        # NOTE(review): per the pika documentation the third positional
        # argument of ConnectionParameters is the virtual host - confirm.
        self.pika_parameters = pika.ConnectionParameters(
            broker["host"],
            broker["port"],
            "/",
            self.pika_credentials,
        )
        self.pika_connection = pika.BlockingConnection(self.pika_parameters)
        self.channel = self.pika_connection.channel()
        # prefetch_count=1: ask the broker for one unacknowledged message at a
        # time on this channel.
        self.channel.basic_qos(prefetch_count=1)
        # Attempt counter for the message currently being handled; it is
        # incremented and reset by data_handler.
        self.processing_count = 0

    (2)读取消息

    basic_consume函数参见:https://www.jianshu.com/p/df231c152754

    def run(self):
        """Thread entry point: consume ``self.queue_name`` until consuming ends.

        Registers ``self._process_message`` as the delivery callback and then
        blocks in ``start_consuming()``. Whatever ends that call (normal
        return or exception), ``stop_consuming()`` is invoked and the
        termination is logged.
        """
        try:
            # Consume the queue
            logging.info("Thread for queue %s started", self.queue_name)
            self.channel.basic_consume(
                queue=self.queue_name,
                on_message_callback=self._process_message,
            )
            self.channel.start_consuming()
        finally:
            self.channel.stop_consuming()
            logging.info("Thread for queue %s terminated", self.queue_name)

    (3)处理消息的回调函数

    def _process_message(self, channel, method, properties, body):
        """Delivery callback: decode one message and process it in a thread.

        :param channel: channel the message was delivered on
        :param method: delivery metadata; only ``delivery_tag`` is read
        :param properties: message properties (unused)
        :param body: raw message payload, parsed with ``json.loads``
        """
        data = json.loads(body)
        logging.info(
            "Processing a new message (delivery_tag=%s), launching a thread...",
            method.delivery_tag,
        )
        worker = threading.Thread(
            target=self.data_handler,
            args=[self.pika_connection, channel, method.delivery_tag, data],
        )
        worker.start()
        # Wait for the handler thread by sleeping through the connection
        # rather than time.sleep().
        # NOTE(review): per the pika documentation BlockingConnection.sleep
        # keeps processing connection events (including callbacks queued with
        # add_callback_threadsafe) while waiting - confirm.
        while worker.is_alive():
            self.pika_connection.sleep(0.05)
        logging.info("Message processed, thread terminated")

    (4)数据处理函数

    # Data handling 数据处理

    def data_handler(self, connection, channel, delivery_tag, data):
        """Import one queued bundle and schedule the ack/nack of its message.

        The bundle is read from ``data["content"]`` (base64, UTF-8) and passed
        to ``self.api.stix2.import_bundle_from_json``. Acknowledgements are
        never sent from this thread directly: ``self.ack_message`` or
        ``self.nack_message`` is queued with
        ``connection.add_callback_threadsafe``.

        :param connection: connection used to queue the ack/nack callback
        :param channel: channel the message was delivered on
        :param delivery_tag: delivery tag of the message being processed
        :param data: decoded message; reads ``applicant_id`` and ``content``
            and, when present, ``work_id``, ``entities_types`` and ``update``
        :return: ``True`` when the import succeeded and the ack was queued;
            ``False`` when the message was nack'ed after a request error or
            ack'ed after an unrecoverable error. When the call retries itself,
            the outcome of the retry is returned.
        """
        # Set the API headers
        applicant_id = data["applicant_id"]
        self.api.set_applicant_id_header(applicant_id)
        work_id = data["work_id"] if "work_id" in data else None

        # Execute the import. The counter grows by one on every attempt,
        # including the recursive retries below.
        self.processing_count += 1
        # Placeholder reported as the error source if decoding itself fails.
        content = "Unparseable"
        try:
            content = base64.b64decode(data["content"]).decode("utf-8")
            types = (
                data["entities_types"]
                if "entities_types" in data and len(data["entities_types"]) > 0
                else None
            )
            update = data["update"] if "update" in data else False
            processing_count = self.processing_count
            if self.processing_count == PROCESSING_COUNT:
                # NOTE(review): presumably None marks the final attempt for
                # the importer - confirm against import_bundle_from_json.
                processing_count = None
            # NOTE(review): the article author wonders whether this call is
            # what inserts the data into Elasticsearch - unconfirmed.
            self.api.stix2.import_bundle_from_json(
                content, update, types, processing_count
            )
            # Ack the message
            cb = functools.partial(self.ack_message, channel, delivery_tag)
            connection.add_callback_threadsafe(cb)
            if work_id is not None:
                self.api.work.report_expectation(work_id, None)
            self.processing_count = 0
            return True
        except Timeout as timeout_err:
            logging.warning(
                "A connection timeout occurred: { " + str(timeout_err) + " }"
            )
            # Platform is under heavy load, wait for unlock & retry almost
            # indefinitely
            sleep_jitter = round(random.uniform(10, 30), 2)
            time.sleep(sleep_jitter)
            # Report what the retry actually achieved instead of always True.
            return self.data_handler(connection, channel, delivery_tag, data)
        except RequestException as request_err:
            logging.error(
                "A connection error occurred: { " + str(request_err) + " }"
            )
            time.sleep(60)
            logging.info(
                "Message (delivery_tag=" + str(delivery_tag) + ") NOT acknowledged"
            )
            cb = functools.partial(self.nack_message, channel, delivery_tag)
            connection.add_callback_threadsafe(cb)
            self.processing_count = 0
            return False
        except Exception as ex:
            error = str(ex)
            if "LockError" in error and self.processing_count < MAX_PROCESSING_COUNT:
                # Platform is under heavy load, wait for unlock & retry almost
                # indefinitely
                sleep_jitter = round(random.uniform(10, 30), 2)
                time.sleep(sleep_jitter)
                return self.data_handler(connection, channel, delivery_tag, data)
            if (
                "MissingReferenceError" in error
                and self.processing_count < PROCESSING_COUNT
            ):
                # In case of missing reference, wait & retry
                sleep_jitter = round(random.uniform(1, 3), 2)
                time.sleep(sleep_jitter)
                logging.info(
                    "Message (delivery_tag="
                    + str(delivery_tag)
                    + ") reprocess (retry nb: "
                    + str(self.processing_count)
                    + ")"
                )
                return self.data_handler(connection, channel, delivery_tag, data)
            # Platform does not know what to do and raises an error, fail and
            # acknowledge the message
            logging.error(str(ex))
            self.processing_count = 0
            cb = functools.partial(self.ack_message, channel, delivery_tag)
            connection.add_callback_threadsafe(cb)
            if work_id is not None:
                self.api.work.report_expectation(
                    work_id, {"error": str(ex), "source": content}
                )
            return False

    from pycti.utils.opencti_stix2 import OpenCTIStix2

    # Indicator的create函数

    def create(self, **kwargs):
        """
        Create an Indicator object

        ``name``, ``pattern`` and ``x_opencti_main_observable_type`` are
        required. Every other keyword read below is optional and is forwarded
        in the mutation input as given (``None`` when absent), except
        ``x_opencti_score`` (default 50), ``x_opencti_detection`` and
        ``update`` (default False) and ``pattern_type`` (sent as "stix2" when
        not provided).

        :param str name: the name of the Indicator
        :param str pattern: stix indicator pattern
        :param str x_opencti_main_observable_type: type of the observable;
            the value "File" is sent as "StixFile"
        :return: Indicator object (the ``indicatorAdd`` result passed through
            ``self.opencti.process_multiple_fields``), or None when a required
            parameter is missing; in that case an error is logged
        :rtype: Indicator
        """
        stix_id = kwargs.get("stix_id", None)
        created_by = kwargs.get("createdBy", None)
        object_marking = kwargs.get("objectMarking", None)
        object_label = kwargs.get("objectLabel", None)
        external_references = kwargs.get("externalReferences", None)
        revoked = kwargs.get("revoked", None)
        confidence = kwargs.get("confidence", None)
        lang = kwargs.get("lang", None)
        created = kwargs.get("created", None)
        modified = kwargs.get("modified", None)
        pattern_type = kwargs.get("pattern_type", None)
        pattern_version = kwargs.get("pattern_version", None)
        pattern = kwargs.get("pattern", None)
        name = kwargs.get("name", None)
        description = kwargs.get("description", None)
        indicator_types = kwargs.get("indicator_types", None)
        valid_from = kwargs.get("valid_from", None)
        valid_until = kwargs.get("valid_until", None)
        x_opencti_score = kwargs.get("x_opencti_score", 50)
        x_opencti_detection = kwargs.get("x_opencti_detection", False)
        x_opencti_main_observable_type = kwargs.get(
            "x_opencti_main_observable_type", None
        )
        kill_chain_phases = kwargs.get("killChainPhases", None)
        update = kwargs.get("update", False)

        if (
            name is not None
            and pattern is not None
            and x_opencti_main_observable_type is not None
        ):
            if x_opencti_main_observable_type == "File":
                x_opencti_main_observable_type = "StixFile"
            self.opencti.log("info", "Creating Indicator {" + name + "}.")
            query = """
                mutation IndicatorAdd($input: IndicatorAddInput) {
                    indicatorAdd(input: $input) {
                        id
                        standard_id
                        entity_type
                        parent_types
                        observables {
                            edges {
                                node {
                                    id
                                    standard_id
                                    entity_type
                                }
                            }
                        }
                    }
                }
            """
            if pattern_type is None:
                pattern_type = "stix2"
            result = self.opencti.query(
                query,
                {
                    "input": {
                        "stix_id": stix_id,
                        "createdBy": created_by,
                        "objectMarking": object_marking,
                        "objectLabel": object_label,
                        "externalReferences": external_references,
                        "revoked": revoked,
                        "confidence": confidence,
                        "lang": lang,
                        "created": created,
                        "modified": modified,
                        "pattern_type": pattern_type,
                        "pattern_version": pattern_version,
                        "pattern": pattern,
                        "name": name,
                        "description": description,
                        "indicator_types": indicator_types,
                        "valid_until": valid_until,
                        "valid_from": valid_from,
                        "x_opencti_score": x_opencti_score,
                        "x_opencti_detection": x_opencti_detection,
                        "x_opencti_main_observable_type": x_opencti_main_observable_type,
                        "killChainPhases": kill_chain_phases,
                        "update": update,
                    }
                },
            )
            return self.opencti.process_multiple_fields(result["data"]["indicatorAdd"])
        else:
            self.opencti.log(
                "error",
                "[opencti_indicator] Missing parameters: name or pattern or x_opencti_main_observable_type",
            )

    总结函数调用关系:

    worker.py中:

    self.api.stix2.import_bundle_from_json(

    content, update, types, processing_count

    )

    stix2是OpenCTIStix2的对象:

    import_bundle_from_json-》

    import_bundle-》

    import_observable-》

    opencti.stix_cyber_observable.create-》

    result = self.opencti.query()  # GraphQL的API

    一、介绍

    Pika是AMQP 0-9-1协议的纯python实现,包括RabbitMQ的扩展。

    (1)支持Python 2.7和3.4+。

    (2)线程并非适用于所有场景,因此Pika不依赖线程;同时Pika核心也注意不去禁止使用线程。greenlets、callbacks、continuations和generators同理。但需要注意:Pika内置连接适配器的实例不是线程安全的(这也是上文worker代码通过connection.add_callback_threadsafe来执行ack/nack回调的原因)。

    (3)使用者可能直接使用套接字、传统的select(),或其他各种在Python应用中收发网络事件的方式。Pika力求与这些方式都保持兼容,并让适配新环境的工作尽可能简单。

    二、Pika提供以下适配器

    pika.adapters.asyncio_connection.AsyncioConnection -用于Python 3异步I/O循环的异步适配器。

    pika.BlockingConnection -库之上的同步适配器,使用简单。

    pika.SelectConnection—没有第三方依赖关系的异步适配器。

    pika.adapters.gevent_connection.GeventConnection—异步适配器,用于Gevent的I/O循环。

    pika.adapters.tornado_connection.TornadoConnection -异步适配器,用于Tornado的I/O循环。

    pika.adapters.twisted_connection.TwistedProtocolConnection -用于Twisted的I/O循环的异步适配器。

    相关文章

      网友评论

          本文标题:opencti中使用的pika简介-worker代码的剖析

          本文链接:https://www.haomeiwen.com/subject/mwfvkltx.html