一.Overview
多线程pipeline是支持并行执行的origin的pipeline,支持一个pipeline在多个线程中运行。
多线程pipeline可以充分利用数据收集器所在机器的所有可用cpu。再使用多线程pipeline时,确保为pipeline和数据收集器分配足够资源。
多线程pipeline遵从交付保证,但不保证处理数据批次的顺序。
二.如何工作的?
在配置多线程pipeline时,指定origin使用多少个线程来生成批量数据。还可以配置数据收集器中用于执行管道处理的pipeline runner的最大数量。
pipeline是一个无源管道实例——它包括管道中的所有processor和destination,表示origin之后的所有管道处理。
origin基于所使用的源系统执行多线程,但是对于所有支持多线程管道的源,有以下的共性:
启动pipeline时,origin根据配置的多线程属性创建多个线程。数据采集器根据pipeline Max runner属性创建多个pipeline runner来执行处理。每个线程连接到源系统并创建一批数据,并将批数据传递给可用的pipeline runner。
每个pipeline runner一次处理一批,就像在单个线程上运行的管道一样。当数据流变慢时,pipeline runner空闲地等待,直到需要它们为止,并定期生成一个空批处理。可以配置Runner空闲时间属性,指定间隔。
多线程pipeline在每个批中保存记录的顺序,就像单线程管道一样。但是由于批是由不同的pipeline实例处理的,所以不能保证所有批被写入目的地的顺序。
例如,以下面的多线程pipeline为例。HTTP Server origin处理来自HTTP客户机的HTTP POST和PUT请求。在配置源时,指定要使用的线程数——在本例中是最大并发请求属性:
在最大并发请求设置为5的情况下,启动管道时,origin创建5个线程,数据收集器创建5个管道运行器。在接收到数据后,原点将批传递给每个管道运行程序进行处理。从概念上讲,多线程管道是这样的:
image.png
每个pipeline runner执行与管道其余部分相关联的处理。在将批写入管道目的地(在本例中是Azure Data Lake Store 1和2)之后,pipeline runner可以用于另一批数据。每个批的处理写入都尽可能快,独立于其他pipeline runner处理的批,因此批的写入顺序可能与读顺序不同。
三.支持多线程pipeline的origin
1.Amazon SQS Consumer
2.Azure IoT/Event Hub Consumer
3.CoAP Server
4.Directory
5.Elasticsearch
6.Google Pub/Sub Subscriber
7.Hadoop FS Standalone
8.HTTP Server
9.JDBC Multitable Consumer:通过JDBC连接从多个表读取数据库数据。
10.Kafka Multitopic Consumer
11.Kinesis Consumer
12.MapR DB CDC
13.MapR FS Standalone
14.MapR Multitopic Stream Consumer
15.REST Service
16.SQL Server CDC Client
17.SQL Server Change Tracking
18.TCP Server
19.UDP Multithreaded Source
20.WebSocket Server
21.Dev Data Generator
网友评论