相关资源:https://pypi.org/project/pycti/#files
https://pypi.org/project/pika/
opencti中使用pika进行数据的消费。
0、opencti中使用pika进行数据的消费
(1)初始化
def __init__(self, connector, api):
threading.Thread.__init__(self)
self.api = api
self.queue_name = connector["config"]["push"]
self.pika_credentials = pika.PlainCredentials(
connector["config"]["connection"]["user"],
connector["config"]["connection"]["pass"],
)
self.pika_parameters = pika.ConnectionParameters(
connector["config"]["connection"]["host"],
connector["config"]["connection"]["port"],
"/",
self.pika_credentials,
)
self.pika_connection = pika.BlockingConnection(self.pika_parameters)
self.channel =self.pika_connection.channel()
self.channel.basic_qos(prefetch_count=1)
self.processing_count =0
(2)读取消息
basic_consume函数参见:https://www.jianshu.com/p/df231c152754
def run(self):
# 程序的主入口
try:
# Consume the queue
logging.info("Thread for queue " +self.queue_name +" started")
self.channel.basic_consume(
queue=self.queue_name, on_message_callback=self._process_message
)
self.channel.start_consuming()
finally:
self.channel.stop_consuming()
logging.info("Thread for queue " +self.queue_name +" terminated")
(3)处理消息的回调函数
def _process_message(self, channel, method, properties, body):
data = json.loads(body)
logging.info(
"Processing a new message (delivery_tag="
+str(method.delivery_tag)
+"), launching a thread..."
)
thread = threading.Thread(
target=self.data_handler,
args=[self.pika_connection, channel, method.delivery_tag, data],
)
thread.start()
while thread.is_alive():# Loop while the thread is processing
self.pika_connection.sleep(0.05)
logging.info("Message processed, thread terminated")
(4)数据处理函数
# Data handling 数据处理
def data_handler(self, connection, channel, delivery_tag, data):
# Set the API headers
applicant_id = data["applicant_id"]
self.api.set_applicant_id_header(applicant_id)
work_id = data["work_id"]if "work_id" in dataelse None
# Execute the import
self.processing_count +=1
content ="Unparseable"
try:
content = base64.b64decode(data["content"]).decode("utf-8")
types = (
data["entities_types"]
if "entities_types" in data and len(data["entities_types"]) >0
else None
)
update = data["update"]if "update" in dataelse False
processing_count =self.processing_count
if self.processing_count == PROCESSING_COUNT:
processing_count =None
# 将数据插入到elasticsearch中?
self.api.stix2.import_bundle_from_json(
content, update, types, processing_count
)
# Ack the message
cb = functools.partial(self.ack_message, channel, delivery_tag)
connection.add_callback_threadsafe(cb)
if work_idis not None:
self.api.work.report_expectation(work_id, None)
self.processing_count =0
return True
except Timeoutas te:
logging.warn("A connection timeout occurred: { " +str(te) +" }")
# Platform is under heavy load, wait for unlock & retry almost indefinitely
sleep_jitter =round(random.uniform(10, 30), 2)
time.sleep(sleep_jitter)
self.data_handler(connection, channel, delivery_tag, data)
return True
except RequestExceptionas re:
logging.error("A connection error occurred: { " +str(re) +" }")
time.sleep(60)
logging.info(
"Message (delivery_tag=" +str(delivery_tag) +") NOT acknowledged"
)
cb = functools.partial(self.nack_message, channel, delivery_tag)
connection.add_callback_threadsafe(cb)
self.processing_count =0
return False
except Exception as ex:
error =str(ex)
if "LockError" in errorand self.processing_count < MAX_PROCESSING_COUNT:
# Platform is under heavy load, wait for unlock & retry almost indefinitely
sleep_jitter =round(random.uniform(10, 30), 2)
time.sleep(sleep_jitter)
self.data_handler(connection, channel, delivery_tag, data)
elif (
"MissingReferenceError" in error
and self.processing_count < PROCESSING_COUNT
):
# In case of missing reference, wait & retry
sleep_jitter =round(random.uniform(1, 3), 2)
time.sleep(sleep_jitter)
logging.info(
"Message (delivery_tag="
+str(delivery_tag)
+") reprocess (retry nb: "
+str(self.processing_count)
+")"
)
self.data_handler(connection, channel, delivery_tag, data)
else:
# Platform does not know what to do and raises an error, fail and acknowledge the message
logging.error(str(ex))
self.processing_count =0
cb = functools.partial(self.ack_message, channel, delivery_tag)
connection.add_callback_threadsafe(cb)
if work_idis not None:
self.api.work.report_expectation(
work_id, {"error":str(ex), "source": content}
)
return False
from pycti.utils.opencti_stix2import OpenCTIStix2
# indictor的create函数
def create(self, **kwargs):
"""
Create an Indicator object
:paramstr name: the name of the Indicator
:paramstr pattern: stix indicator pattern
:paramstr x_opencti_main_observable_type: type of the observable
:return: Indicator object
:rtype: Indicator
"""
stix_id = kwargs.get("stix_id", None)
created_by = kwargs.get("createdBy", None)
object_marking = kwargs.get("objectMarking", None)
object_label = kwargs.get("objectLabel", None)
external_references = kwargs.get("externalReferences", None)
revoked = kwargs.get("revoked", None)
confidence = kwargs.get("confidence", None)
lang = kwargs.get("lang", None)
created = kwargs.get("created", None)
modified = kwargs.get("modified", None)
pattern_type = kwargs.get("pattern_type", None)
pattern_version = kwargs.get("pattern_version", None)
pattern = kwargs.get("pattern", None)
name = kwargs.get("name", None)
description = kwargs.get("description", None)
indicator_types = kwargs.get("indicator_types", None)
valid_from = kwargs.get("valid_from", None)
valid_until = kwargs.get("valid_until", None)
x_opencti_score = kwargs.get("x_opencti_score", 50)
x_opencti_detection = kwargs.get("x_opencti_detection", False)
x_opencti_main_observable_type = kwargs.get(
"x_opencti_main_observable_type", None
)
kill_chain_phases = kwargs.get("killChainPhases", None)
update = kwargs.get("update", False)
if (
nameis not None
and patternis not None
and x_opencti_main_observable_typeis not None
):
if x_opencti_main_observable_type =="File":
x_opencti_main_observable_type ="StixFile"
self.opencti.log("info", "Creating Indicator {" + name +"}.")
query ="""
mutation IndicatorAdd($input: IndicatorAddInput) {
indicatorAdd(input: $input) {
id
standard_id
entity_type
parent_types
observables {
edges {
node {
id
standard_id
entity_type
}
}
}
}
}
"""
if pattern_typeis None:
pattern_type ="stix2"
result =self.opencti.query(
query,
{
"input": {
"stix_id": stix_id,
"createdBy": created_by,
"objectMarking": object_marking,
"objectLabel": object_label,
"externalReferences": external_references,
"revoked": revoked,
"confidence": confidence,
"lang": lang,
"created": created,
"modified": modified,
"pattern_type": pattern_type,
"pattern_version": pattern_version,
"pattern": pattern,
"name": name,
"description": description,
"indicator_types": indicator_types,
"valid_until": valid_until,
"valid_from": valid_from,
"x_opencti_score": x_opencti_score,
"x_opencti_detection": x_opencti_detection,
"x_opencti_main_observable_type": x_opencti_main_observable_type,
"killChainPhases": kill_chain_phases,
"update": update,
}
},
)
return self.opencti.process_multiple_fields(result["data"]["indicatorAdd"])
else:
self.opencti.log(
"error",
"[opencti_indicator] Missing parameters: name or pattern or x_opencti_main_observable_type",
)
总结函数调用关系:
work.py中:
self.api.stix2.import_bundle_from_json(
content, update, types, processing_count
)
stix2是OpenCTIStix2的对象:
import_bundle_from_json-》
import_bundle-》
import_observable-》
opencti.stix_cyber_observable.create-》
result =self.opencti.query()//Graphql的API
一、介绍
Pika是AMQP 0-9-1协议的纯python实现,包括RabbitMQ的扩展。
(1)支持Python 2.7和3.4+。
(2)由于线程并不适用于所有情况,所以它不需要线程。Pika core也小心翼翼地不去禁止它们。同样的道理也适用于greenlets, callbacks, continuations, 和generators。然而,Pika的内置connection 适配器实例不是线程安全的。
(3)人们可以使用直接套接字、普通的旧select(),或者任何从Python应用程序获取网络事件的方法。Pika试图与所有这些保持兼容,并使其适应新环境尽可能简单。
二、Pika提供以下适配器
pika.adapters.asyncio_connection。AsyncioConnection -用于Python 3异步I/O循环的异步适配器。
pika.BlockingConnection -库之上的同步适配器,使用简单。
pika.SelectConnection—没有第三方依赖关系的异步适配器。
pika.adapters.gevent_connection.GeventConnection—异步适配器,用于Gevent的I/O循环。
pika.adapters.tornado_connection.TornadoConnection -异步适配器,用于Tornado的I/O循环。
pika.adapters.twisted_connection.TwistedProtocolConnection -用于Twisted的I/O循环的异步适配器。
网友评论