分布式模式下 Kafka Connect 的启动与运行流程:
启动命令:bin/connect-distributed.sh conf/connect-distributed.properties
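connect-distributed.sh 脚本分析(原文此处为截图,图略):该脚本最终通过 kafka-run-class.sh 启动 org.apache.kafka.connect.cli.ConnectDistributed 类,并把配置文件路径作为参数传入。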
第一步:加载运行类ConnectDistributed.java
/**
 * Command-line entry point for running Kafka Connect in distributed mode
 * (launched by bin/connect-distributed.sh with a worker properties file).
 */
public class ConnectDistributed {
private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class);
// Clock used for timing in this class.
private final Time time = Time.SYSTEM;
// Taken when the instance is created; startConnect() logs the initialization time relative to it.
private final long initStart = time.hiResClockMs();
/**
 * Starts a distributed Connect worker configured by the properties file named in {@code args[0]}.
 *
 * <p>Prints usage and calls {@code Exit.exit(1)} when no argument is given or {@code --help}
 * is present. Any Throwable raised while starting the worker, or while waiting for it to
 * stop, is logged and followed by {@code Exit.exit(2)}.
 *
 * @param args command-line arguments; the first one is the worker properties file path
 */
public static void main(String[] args) {
    final boolean missingWorkerConfig = args.length < 1;
    if (missingWorkerConfig || Arrays.asList(args).contains("--help")) {
        log.info("Usage: ConnectDistributed worker.properties");
        Exit.exit(1);
    }

    try {
        // Log information about this worker before doing anything else.
        new WorkerInfo().logAll();

        final String workerPropsFile = args[0];
        final Map<String, String> workerProps;
        if (workerPropsFile.isEmpty()) {
            workerProps = Collections.emptyMap();
        } else {
            workerProps = Utils.propsToStringMap(Utils.loadProps(workerPropsFile));
        }

        final Connect connect = new ConnectDistributed().startConnect(workerProps);

        // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
        connect.awaitStop();
    } catch (Throwable t) {
        log.error("Stopping due to error", t);
        Exit.exit(2);
    }
}
main 方法中调用的 startConnect 方法如下:
/**
 * Builds every component of a distributed Connect worker and starts it.
 *
 * <p>Loads plugins and looks up the Kafka cluster ID. It then initializes the REST server,
 * whose advertised host:port becomes the worker ID. Next it creates the offset, status and
 * config backing stores, the Worker and the DistributedHerder. Finally it wraps the herder
 * and REST server in a Connect instance and calls {@code connect.start()}. If start() throws,
 * the instance is stopped and the process exits with status 3.
 *
 * @param workerProps worker configuration properties
 * @return the started Connect instance
 */
public Connect startConnect(Map<String, String> workerProps) {
log.info("Scanning for plugin classes. This might take a moment ...");
Plugins plugins = new Plugins(workerProps);
// NOTE(review): presumably installs the plugins' delegating class loader as the thread context loader — confirm.
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig config = new DistributedConfig(workerProps);
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
log.debug("Kafka cluster ID: {}", kafkaClusterId);
// The REST server is initialized first: its advertised host:port is used as this worker's ID.
RestServer rest = new RestServer(config);
rest.initializeServer();
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
offsetBackingStore.configure(config);
// The policy class named by WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG is instantiated as a plugin.
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
config, ConnectorClientConfigOverridePolicy.class);
Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
WorkerConfigTransformer configTransformer = worker.configTransformer();
// The status and config backing stores share the worker's internal value converter.
Converter internalValueConverter = worker.getInternalValueConverter();
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
statusBackingStore.configure(config);
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
internalValueConverter,
config,
configTransformer);
DistributedHerder herder = new DistributedHerder(config, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
advertisedUrl.toString(), connectorClientConfigOverridePolicy);
final Connect connect = new Connect(herder, rest); // Create the Kafka Connect instance
log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
try {
connect.start();
} catch (Exception e) {
log.error("Failed to start Connect", e);
// Stop the instance before exiting with status 3.
connect.stop();
Exit.exit(3);
}
return connect;
}
startConnect 中调用 connect.start(),其内部通过 herder.start() 启动 DistributedHerder 线程。
Connect.java中start方法如下
/**
 * Starts Kafka Connect.
 *
 * <p>Registers {@code shutdownHook} through Exit.addShutdownHook, starts the herder, and then
 * calls {@code rest.initializeResources(herder)}. {@code startLatch} is counted down in the
 * finally block, so it is released whether or not startup succeeds.
 */
public void start() {
try {
log.info("Kafka Connect starting");
Exit.addShutdownHook("connect-shutdown-hook", shutdownHook);
herder.start();
// REST resources are set up only after the herder has been started.
rest.initializeResources(herder);
log.info("Kafka Connect started");
} finally {
startLatch.countDown();
}
}
DistributedHerder.java(实现了 Runnable 接口,在单独的线程中运行)
DistributedHerder.java类中如下三个对象比较重要:
// Connectors and tasks this worker currently runs; starts empty. startWork() replaces it with
// the assignment it just started (or with an empty assignment under CONNECT_PROTOCOL_V0).
private ExtendedAssignment runningAssignment = ExtendedAssignment.empty();
// Tasks stopped by this worker for reconfiguration. startWork() removes them from
// runningAssignment and clears this set, so they restart if they are still assigned.
private Set<ConnectorTaskId> tasksToRestart = new HashSet<>();
// Latest assignment received for this worker; startWork() starts whatever in it is not yet running.
private ExtendedAssignment assignment;
ExtendedAssignment.java 类:扩展的连接器与任务分配结果。除分配到的连接器和任务外,它还包含被撤销(revoked)的连接器和任务,以及计划中的重平衡延迟(原文:The extended assignment of connectors and tasks that includes revoked connectors and tasks as well as a scheduled rebalancing delay.)
DistributedHerder.java线程运行的run()方法
/**
 * Body of the herder thread.
 *
 * <p>Starts the backing services, then calls tick() repeatedly until {@code stopping} is set.
 * After that it halts and closes the herder metrics. Any Throwable escaping this is logged,
 * and the process exits with status 1.
 */
@Override
public void run() {
try {
log.info("Herder starting");
startServices();
log.info("Herder started");
while (!stopping.get()) {
tick();
}
halt();
log.info("Herder stopped");
herderMetrics.close();
} catch (Throwable t) {
log.error("Uncaught exception in herder work thread, exiting: ", t);
Exit.exit(1);
}
}
其中 startServices() 依次启动 worker、状态存储(statusBackingStore)和配置存储(configBackingStore):
/**
 * Starts the worker, then the status backing store, then the config backing store, in that order.
 * run() calls this once before entering its tick() loop.
 */
protected void startServices() {
this.worker.start();
this.statusBackingStore.start();
this.configBackingStore.start();
}
tick() 中调用的 handleRebalanceCompleted 方法,其核心是调用 startWork():
// Abridged excerpt: the "......" lines stand for parts of the method body that are not shown.
// The part relevant here is the call to startWork().
private boolean handleRebalanceCompleted() {
......
startWork();
......
}
具体实现如下:
/**
 * Starts the connectors and tasks of {@code assignment} that assignmentDifference reports as
 * not yet present in {@code runningAssignment}. Afterwards it records which assignment is
 * now running.
 */
private void startWork() {
// Start assigned connectors and tasks
log.info("Starting connectors and tasks using config offset {}", assignment.offset());
// One start-up Callable per connector/task to start; startAndStop(callables) runs them all.
List<Callable<Void>> callables = new ArrayList<>();
for (String connectorName : assignmentDifference(assignment.connectors(), runningAssignment.connectors())) {
callables.add(getConnectorStartingCallable(connectorName));
}
// These tasks have been stopped by this worker due to task reconfiguration. In order to
// restart them, they are removed just before the overall task startup from the set of
// currently running tasks. Therefore, they'll be restarted only if they are included in
// the assignment that was just received after rebalancing.
runningAssignment.tasks().removeAll(tasksToRestart);
tasksToRestart.clear();
for (ConnectorTaskId taskId : assignmentDifference(assignment.tasks(), runningAssignment.tasks())) {
callables.add(getTaskStartingCallable(taskId));
}
startAndStop(callables);
// Under CONNECT_PROTOCOL_V0 the running assignment is reset to empty; otherwise the
// assignment just started is remembered as the running one.
runningAssignment = member.currentProtocolVersion() == CONNECT_PROTOCOL_V0
? ExtendedAssignment.empty()
: assignment;
log.info("Finished starting connectors and tasks");
}
startWork() 方法中主要构造两类 Callable 任务,并交给 startAndStop(callables) 统一执行:
①getConnectorStartingCallable 返回的 Callable 用来启动 connector:
/**
 * Wraps {@code startConnector(connectorName)} in a {@link Callable}, so it can be run together
 * with the other start-up work collected in startWork().
 *
 * <p>Any Throwable thrown while starting the connector is logged and reported through
 * {@code onFailure(connectorName, t)}. The callable itself does not propagate it and always
 * returns {@code null}.
 *
 * @param connectorName name of the connector to start
 * @return a callable that starts the connector
 */
private Callable<Void> getConnectorStartingCallable(final String connectorName) {
    return () -> {
        try {
            startConnector(connectorName);
        } catch (Throwable t) {
            // Parameterized logging; the trailing Throwable is still logged with its stack trace.
            log.error("Couldn't instantiate connector {} because it has an invalid connector "
                    + "configuration. This connector will not execute until reconfigured.", connectorName, t);
            onFailure(connectorName, t);
        }
        return null;
    };
}
protected ClusterConfigState configState; // Immutable snapshot of the configuration state of the connectors and tasks in the Kafka Connect cluster.
/**
 * Starts one connector on this worker, using its config and target state from the current
 * {@code configState} snapshot.
 *
 * <p>If the worker started the connector and its target state is STARTED, task
 * configurations are requested right away through reconfigureConnectorTasksWithRetry.
 *
 * @param connectorName name of the connector to start
 * @return the result of {@code worker.startConnector}: true if the connector was started
 */
private boolean startConnector(String connectorName) {
    log.info("Starting connector {}", connectorName);
    final Map<String, String> connectorProps = configState.connectorConfig(connectorName);
    final ConnectorContext connectorContext = new HerderConnectorContext(this, connectorName);
    final TargetState targetState = configState.targetState(connectorName);

    final boolean launched =
            worker.startConnector(connectorName, connectorProps, connectorContext, this, targetState);

    // Task configs are requested immediately because this may be a brand-new connector. Only
    // task configs that really differ from the existing ones are updated, so restoring an
    // existing connector does not cause needless updates.
    final boolean shouldRequestTaskConfigs = launched && targetState == TargetState.STARTED;
    if (shouldRequestTaskConfigs) {
        reconfigureConnectorTasksWithRetry(time.milliseconds(), connectorName);
    }
    return launched;
}
Worker.java中执行startConnector方法
/**
 * Start a connector managed by this worker.
 *
 * <p>Instantiates the connector class named in {@code connProps} and wraps it in a
 * WorkerConnector. It then initializes the connector and transitions it to
 * {@code initialState}, with class loaders swapped via
 * {@code plugins.compareAndSwapLoaders(connector)} (presumably to the connector plugin's
 * loader). The previous thread loader is restored on both the success and failure paths.
 * A failure in this phase is recorded in the startup-failure metric and reported to
 * {@code statusListener}; it yields {@code false} instead of an exception.
 *
 * @param connName       name of the connector
 * @param connProps      connector configuration properties
 * @param ctx            context passed to the new WorkerConnector
 * @param statusListener listener notified through onFailure if startup fails
 * @param initialState   target state the connector is transitioned to after initialization
 * @return true if the connector was created and registered, false if startup failed
 * @throws ConnectException if a connector with the same name already exists in this worker
 */
public boolean startConnector(
String connName,
Map<String, String> connProps,
ConnectorContext ctx,
ConnectorStatus.Listener statusListener,
TargetState initialState
) {
try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
// Fast-fail duplicate check; re-checked with putIfAbsent before the connector is registered.
if (connectors.containsKey(connName))
throw new ConnectException("Connector with name " + connName + " already exists");
final WorkerConnector workerConnector;
// Loader to restore afterwards; updated below by the compareAndSwapLoaders call.
ClassLoader savedLoader = plugins.currentThreadLoader();
try {
final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
log.info("Creating connector {} of type {}", connName, connClass);
final Connector connector = plugins.newConnector(connClass);
workerConnector = new WorkerConnector(connName, connector, ctx, metrics, statusListener);
log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
savedLoader = plugins.compareAndSwapLoaders(connector);
workerConnector.initialize(connConfig); // Initialize the connector
workerConnector.transitionTo(initialState); // Start the connector (transition it to initialState)
Plugins.compareAndSwapLoaders(savedLoader);
} catch (Throwable t) {
log.error("Failed to start connector {}", connName, t);
// Can't be put in a finally block because it needs to be swapped before the call on
// statusListener
Plugins.compareAndSwapLoaders(savedLoader);
workerMetricsGroup.recordConnectorStartupFailure();
statusListener.onFailure(connName, t);
return false;
}
WorkerConnector existing = connectors.putIfAbsent(connName, workerConnector);
if (existing != null)
throw new ConnectException("Connector with name " + connName + " already exists");
log.info("Finished creating connector {}", connName);
workerMetricsGroup.recordConnectorStartupSuccess();
}
return true;
}
workerConnector.transitionTo(initialState); // Start the connector (transition it to initialState)
启动 connector 的部分:WorkerConnector.java 中的 transitionTo 方法。当目标状态为 STARTED 且连接器仍处于 INIT 状态时,它会调用 start() 方法:
/**
 * Moves this connector toward the requested target state.
 *
 * <p>A connector in the FAILED state is left untouched, and a warning is logged. PAUSED
 * calls pause(). STARTED calls start() while the connector is still in INIT, and resume()
 * otherwise. Any other value, including {@code null}, is rejected.
 *
 * @param targetState the desired target state
 * @throws IllegalArgumentException if {@code targetState} is neither PAUSED nor STARTED
 */
public void transitionTo(TargetState targetState) {
    if (state == State.FAILED) {
        log.warn("{} Cannot transition connector to {} since it has failed", this, targetState);
        return;
    }

    log.debug("{} Transition connector to {}", this, targetState);

    if (targetState == TargetState.PAUSED) {
        pause();
        return;
    }
    if (targetState != TargetState.STARTED) {
        throw new IllegalArgumentException("Unhandled target state " + targetState);
    }

    // STARTED: the very first start goes through start(), any later one through resume().
    final boolean firstStart = state == State.INIT;
    if (firstStart) {
        start();
    } else {
        resume();
    }
}
其中TargetState为连接器的目标状态
/**
 * Target state of a connector: the state the user asked for (for example through the REST
 * API). The state a connector is actually in may differ from its target state.
 */
public enum TargetState {
    /** The connector and its tasks should be running. */
    STARTED,
    /** The connector and its tasks should be paused. */
    PAUSED
}
连接器的目标状态(TargetState)是用户通过 REST API 指定的期望状态。连接器首次创建时,目标状态为 STARTED,但这并不意味着它已经实际启动,只表示 Connect 框架会在分配完成后尝试启动它。连接器被暂停后,目标状态变为 PAUSED(暂停),其所有任务都将停止工作。目标状态保存在配置主题(config topic)中,组内所有 worker 进程都会读取该主题。当某个 worker 发现一个正在运行的连接器有了新的目标状态时,会把自己拥有的任务(即 leader 分配给它的任务)转换到该目标状态。重平衡完成后,worker 会以最后已知的目标状态启动任务。
下面的 doStart() 方法负责调用 connector.start(config),并把状态置为 STARTED:
/**
 * Starts the underlying connector with {@code config} when it is in INIT or STOPPED.
 *
 * @return true if this call started the connector. Returns false if the connector was
 *         already STARTED, or if starting failed (including being in a state that cannot be
 *         started); a failure is logged and passed to onFailure.
 */
private boolean doStart() {
    try {
        final State current = state;
        switch (current) {
            case INIT:
            case STOPPED:
                connector.start(config);
                this.state = State.STARTED;
                return true;
            case STARTED:
                // Already running: nothing to do.
                return false;
            default:
                // Thrown inside the try, so it is caught below and reported through onFailure.
                throw new IllegalArgumentException("Cannot start connector in state " + current);
        }
    } catch (Throwable t) {
        log.error("{} Error while starting connector", this, t);
        onFailure(t);
        return false;
    }
}
State 为 WorkerConnector 实际运行状态的枚举:
/**
 * Lifecycle state a WorkerConnector is actually in, as opposed to its target state.
 */
private enum State {
    /** Initial state before startup. */
    INIT,
    /** The connector has been stopped or paused. */
    STOPPED,
    /** The connector has been started or resumed. */
    STARTED,
    /** The connector has failed; no further transitions are possible after this state. */
    FAILED
}
②getTaskStartingCallable 返回的 Callable 用来启动连接器的 task:
/**
 * Wraps {@code startTask(taskId)} in a {@link Callable}, so it can be run together with the
 * other start-up work collected in startWork().
 *
 * <p>Any Throwable thrown while starting the task is logged and reported through
 * {@code onFailure(taskId, t)}. The callable itself does not propagate it and always returns
 * {@code null}.
 *
 * @param taskId ID of the task to start
 * @return a callable that starts the task
 */
private Callable<Void> getTaskStartingCallable(final ConnectorTaskId taskId) {
    return () -> {
        try {
            startTask(taskId);
        } catch (Throwable t) {
            log.error("Couldn't instantiate task {} because it has an invalid task configuration. "
                    + "This task will not execute until reconfigured.", taskId, t);
            onFailure(taskId, t);
        }
        return null;
    };
}
startTask方法如下:
/**
 * Starts one task on this worker, using the connector config, task config and target state
 * taken from the current {@code configState} snapshot. This herder is passed to the worker
 * as the task's status listener.
 *
 * @param taskId ID of the task to start
 * @return the result of {@code worker.startTask}: true if the task was started
 */
private boolean startTask(ConnectorTaskId taskId) {
    log.info("Starting task {}", taskId);
    final String connectorName = taskId.connector();
    final Map<String, String> connectorProps = configState.connectorConfig(connectorName);
    final Map<String, String> taskProps = configState.taskConfig(taskId);
    final TargetState targetState = configState.targetState(connectorName);
    return worker.startTask(taskId, configState, connectorProps, taskProps, this, targetState);
}
Worker.java中执行startTask方法
/**
 * Start a task managed by this worker.
 *
 * <p>Instantiates the task class from the task config. Key, value and header converters are
 * taken from the connector config, falling back to the worker config when the connector does
 * not configure them. The task is then built into a WorkerTask, initialized, registered and
 * submitted to the executor. Source tasks are also scheduled with the source task offset
 * committer. A failure while building or initializing the task is reported to
 * {@code statusListener} and yields {@code false}.
 *
 * @param id             ID of the task
 * @param configState    current cluster config snapshot, passed through to the built task
 * @param connProps      connector configuration properties
 * @param taskProps      task configuration properties
 * @param statusListener listener notified through onFailure if startup fails
 * @param initialState   target state the task starts in
 * @return true if the task was submitted, false if building or initializing it failed
 * @throws ConnectException if a task with the same ID already exists in this worker
 */
public boolean startTask(
ConnectorTaskId id,
ClusterConfigState configState,
Map<String, String> connProps,
Map<String, String> taskProps,
TaskStatus.Listener statusListener,
TargetState initialState
) {
final WorkerTask workerTask;
try (LoggingContext loggingContext = LoggingContext.forTask(id)) {
log.info("Creating task {}", id);
// Fast-fail duplicate check; re-checked with putIfAbsent before the task is submitted.
if (tasks.containsKey(id))
throw new ConnectException("Task already exists in this worker: " + id);
connectorStatusMetricsGroup.recordTaskAdded(id);
// Loader to restore afterwards; updated below by the compareAndSwapLoaders call.
ClassLoader savedLoader = plugins.currentThreadLoader();
try {
String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
// Swap to the connector plugin's class loader (looked up by connector class name)
// while the task and its converters are created and initialized.
ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
final TaskConfig taskConfig = new TaskConfig(taskProps);
final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
final Task task = plugins.newTask(taskClass);
log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
// By maintaining connector's specific class loader for this thread here, we first
// search for converters within the connector dependencies.
// If any of these aren't found, that means the connector didn't configure specific converters,
// so we should instantiate based upon the worker configuration
Converter keyConverter = plugins.newConverter(connConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage
.CURRENT_CLASSLOADER);
Converter valueConverter = plugins.newConverter(connConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER);
HeaderConverter headerConverter = plugins.newHeaderConverter(connConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
ClassLoaderUsage.CURRENT_CLASSLOADER);
if (keyConverter == null) {
keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
} else {
log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
}
if (valueConverter == null) {
valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
} else {
log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
}
if (headerConverter == null) {
headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage
.PLUGINS);
log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id);
} else {
log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
}
workerTask = buildWorkerTask(configState, connConfig, id, task, statusListener, initialState, keyConverter, valueConverter,
headerConverter, connectorLoader); // Build the task first, then call workerTask.initialize(taskConfig), and finally executor.submit(workerTask)
workerTask.initialize(taskConfig);
Plugins.compareAndSwapLoaders(savedLoader);
} catch (Throwable t) {
log.error("Failed to start task {}", id, t);
// Can't be put in a finally block because it needs to be swapped before the call on
// statusListener
Plugins.compareAndSwapLoaders(savedLoader);
connectorStatusMetricsGroup.recordTaskRemoved(id);
workerMetricsGroup.recordTaskFailure();
statusListener.onFailure(id, t);
return false;
}
// Register the task, rejecting a duplicate ID, then hand it to the executor.
WorkerTask existing = tasks.putIfAbsent(id, workerTask);
if (existing != null)
throw new ConnectException("Task already exists in this worker: " + id);
executor.submit(workerTask);
// Source tasks are also registered with the source task offset committer.
if (workerTask instanceof WorkerSourceTask) {
sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
}
workerMetricsGroup.recordTaskSuccess();
return true;
}
}
流程小结:先通过 buildWorkerTask 构建任务,再执行 workerTask.initialize(taskConfig),最后通过 executor.submit(workerTask) 提交执行。
其中 buildWorkerTask 方法如下:
/**
 * Creates the WorkerTask wrapper for a freshly instantiated connector task.
 *
 * <p>Every task gets a RetryWithToleranceOperator built from the connector's error-handling
 * settings. A SourceTask is wrapped in a WorkerSourceTask with its own KafkaProducer and an
 * offset reader and writer over the worker's offset backing store. A SinkTask is wrapped in a
 * WorkerSinkTask with its own KafkaConsumer. Both kinds get the connector's transformation
 * chain.
 *
 * @param configState     cluster config snapshot, handed to the task so it can apply dynamic
 *                        config transformations
 * @param connConfig      parsed connector configuration
 * @param id              ID of the task being built
 * @param task            the instantiated task
 * @param statusListener  listener for task status changes
 * @param initialState    target state the task starts in
 * @param keyConverter    converter for record keys
 * @param valueConverter  converter for record values
 * @param headerConverter converter for record headers
 * @param loader          class loader of the connector plugin
 * @return a WorkerSourceTask or WorkerSinkTask wrapping {@code task}
 * @throws ConnectException if {@code task} is neither a SourceTask nor a SinkTask
 */
private WorkerTask buildWorkerTask(ClusterConfigState configState,
                                   ConnectorConfig connConfig,
                                   ConnectorTaskId id,
                                   Task task,
                                   TaskStatus.Listener statusListener,
                                   TargetState initialState,
                                   Converter keyConverter,
                                   Converter valueConverter,
                                   HeaderConverter headerConverter,
                                   ClassLoader loader) {
    ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
    final Class<? extends Connector> connectorClass = plugins.connectorClass(
            connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
    RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(),
            connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM);
    retryWithToleranceOperator.metrics(errorHandlingMetrics);

    // Decide which type of worker task we need based on the type of task.
    if (task instanceof SourceTask) {
        retryWithToleranceOperator.reporters(sourceTaskReporters(id, connConfig, errorHandlingMetrics));
        TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
        log.info("Initializing: {}", transformationChain);
        CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
                internalKeyConverter, internalValueConverter);
        OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
                internalKeyConverter, internalValueConverter);
        Map<String, Object> producerProps = producerConfigs(id, "connector-producer-" + id, config, connConfig, connectorClass,
                connectorClientConfigOverridePolicy);
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);

        // Note we pass the configState as it performs dynamic transformations under the covers
        return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
                headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, configState, metrics, loader,
                time, retryWithToleranceOperator, herder.statusBackingStore());
    } else if (task instanceof SinkTask) {
        TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
        log.info("Initializing: {}", transformationChain);
        SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
        retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));

        Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);

        return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
                valueConverter, headerConverter, transformationChain, consumer, loader, time,
                retryWithToleranceOperator, herder.statusBackingStore());
    } else {
        // The offending task needs an explicit {} placeholder. SLF4J ignores arguments that have
        // no matching placeholder (unless the last one is a Throwable).
        log.error("Tasks must be a subclass of either SourceTask or SinkTask and current is {}", task);
        throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
    }
}
此方法根据任务类型返回 WorkerSourceTask 或 WorkerSinkTask,随后由 Worker.startTask 通过 executor.submit(workerTask) 提交执行。
以 WorkerSinkTask 为例,它被提交到 executor 中运行,最终执行 execute() 方法,其中主要包含两个步骤:
/**
 * Main loop of the sink task.
 *
 * <p>Initializes and starts the task, then calls iteration() until isStopping() is true.
 * closePartitions() runs in the finally block, so it happens however the loop ends. It is not
 * reached if initializeAndStart() itself throws, because that call is outside the try.
 */
@Override
public void execute() {
initializeAndStart();
try {
while (!isStopping())
iteration();
} finally {
// Make sure any uncommitted data has been committed and the task has
// a chance to clean up its state
closePartitions();
}
}
第一步:调用 initializeAndStart() 订阅主题,并初始化、启动 SinkTask(SinkTask 的具体实现类如 HdfsSinkTask,或其他组件自定义的 Task);
/**
 * Initializes and starts the SinkTask.
 *
 * <p>Validates the task configuration, then subscribes the consumer with a HandleRebalance
 * listener. It uses the comma-separated topic list when one is configured, and the topic
 * regex otherwise. Finally it initializes the task with its context and starts it.
 */
protected void initializeAndStart() {
    SinkConnectorConfig.validate(taskConfig);

    if (!SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
        // No explicit topic list: subscribe by regular expression.
        String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
        consumer.subscribe(Pattern.compile(topicsRegexStr), new HandleRebalance());
        log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);
    } else {
        String[] topics = taskConfig.get(SinkTask.TOPICS_CONFIG).split(",");
        // Strip surrounding whitespace from every listed topic name.
        for (int i = 0; i < topics.length; i++) {
            topics[i] = topics[i].trim();
        }
        consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
        log.debug("{} Initializing and starting task for topics {}", this, topics);
    }

    task.initialize(context);
    task.start(taskConfig);
    log.info("{} Sink task finished initialization and start", this);
}
第二步:循环执行 iteration() 方法,直到任务停止:
/**
 * One pass of the sink task's main loop.
 *
 * <p>Each pass works as follows:
 * <ul>
 *   <li>It starts an offset commit when one was requested through the context, or when the
 *       commit interval has elapsed.</li>
 *   <li>It abandons an in-flight commit that has exceeded the commit timeout and counts it as
 *       a failure.</li>
 *   <li>It polls for at most the time remaining until the next scheduled commit.</li>
 * </ul>
 * If a WakeupException is thrown during the pass and the task is stopping, the method returns.
 * Otherwise consumption is paused (and a commit requested) when shouldPause() is true, or
 * resumed when the task is not paused for redelivery.
 */
protected void iteration() {
final long offsetCommitIntervalMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
try {
long now = time.milliseconds();
// Maybe commit
if (!committing && (context.isCommitRequested() || now >= nextCommit)) {
commitOffsets(now, false);
nextCommit = now + offsetCommitIntervalMs;
context.clearCommitRequest();
}
// Deadline for the in-flight commit; presumably commitStarted is set when commitOffsets() begins a commit.
final long commitTimeoutMs = commitStarted + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
// Check for timed out commits
if (committing && now >= commitTimeoutMs) {
log.warn("{} Commit of offsets timed out", this);
commitFailures++;
committing = false;
}
// And process messages
// Never wait in poll beyond the next scheduled commit time.
long timeoutMs = Math.max(nextCommit - now, 0);
poll(timeoutMs);
} catch (WakeupException we) {
log.trace("{} Consumer woken up", this);
if (isStopping())
return;
if (shouldPause()) {
pauseAll();
onPause();
context.requestCommit();
} else if (!pausedForRedelivery) {
resumeAll();
onResume();
}
}
}
其中poll方法如下:
/**
 * Poll for new messages with the given timeout. Should only be invoked by the worker thread.
 *
 * <p>Calls rewind() first. If the task requested a positive retry timeout through its
 * context, the poll timeout is shortened to it and the request is reset to -1. The polled
 * records are then converted and the current batch is delivered to the task.
 *
 * @param timeoutMs maximum time to wait in the consumer poll, in milliseconds
 */
protected void poll(long timeoutMs) {
    rewind();

    final long requestedRetryMs = context.timeout();
    final long effectiveTimeoutMs;
    if (requestedRetryMs <= 0) {
        effectiveTimeoutMs = timeoutMs;
    } else {
        effectiveTimeoutMs = Math.min(timeoutMs, requestedRetryMs);
        // The requested retry timeout applies to this poll only.
        context.timeout(-1L);
    }

    log.trace("{} Polling consumer with timeout {} ms", this, effectiveTimeoutMs);
    final ConsumerRecords<byte[], byte[]> polled = pollConsumer(effectiveTimeoutMs);
    // A batch still pending redelivery (partitions paused) must not coincide with newly polled records.
    assert messageBatch.isEmpty() || polled.isEmpty();
    log.trace("{} Polling returned {} messages", this, polled.count());

    convertMessages(polled);
    deliverMessages();
}
/**
 * Delivers a copy of the current message batch to the sink task via {@code task.put}.
 *
 * <p>On success it records the batch and put-time metrics, copies {@code origOffsets} into
 * {@code currentOffsets} and clears the batch. If consumption was paused for redelivery, it
 * also resumes all partitions unless shouldPause() is true.
 *
 * <p>On a RetriableException the batch is kept and all partitions are paused. The method
 * returns normally, so the batch is reprocessed on a later loop. Any other Throwable is
 * wrapped in a ConnectException and rethrown.
 */
private void deliverMessages() {
// Finally, deliver this batch to the sink
try {
// Since we reuse the messageBatch buffer, ensure we give the task its own copy
log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
long start = time.milliseconds();
task.put(new ArrayList<>(messageBatch));
recordBatch(messageBatch.size());
sinkTaskMetricsGroup.recordPut(time.milliseconds() - start);
currentOffsets.putAll(origOffsets);
messageBatch.clear();
// If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
// the task had not explicitly paused
if (pausedForRedelivery) {
if (!shouldPause())
resumeAll();
pausedForRedelivery = false;
}
} catch (RetriableException e) {
log.error("{} RetriableException from SinkTask:", this, e);
// If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
// but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
pausedForRedelivery = true;
pauseAll();
// Let this exit normally, the batch will be reprocessed on the next loop.
} catch (Throwable t) {
log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not "
+ "recover until manually restarted. Error: {}", this, t.getMessage(), t);
throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.", t);
}
}
task.put(new ArrayList<>(messageBatch)) 会调用具体 SinkTask 实现的 put 方法,例如 HdfsSinkTask 中的 put 方法。
网友评论