Start the Kafka broker
- First download the Kafka binary package from the Apache archive and unpack it:
wget "https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.12-0.11.0.1.tgz" -O kafka_2.12-0.11.0.1.tgz
tar -xvzf kafka_2.12-0.11.0.1.tgz
cd kafka_2.12-0.11.0.1
- Start ZooKeeper:
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties > /tmp/zk.log 2>&1 &
- Start the Kafka broker:
nohup ./bin/kafka-server-start.sh config/server.properties > /tmp/kafka.log 2>&1 &
- Create the Kafka topic and list topics to verify (an AdminClient alternative is sketched after these commands):
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic connect-test --partitions 1 --replication-factor 1
./bin/kafka-topics.sh --list --zookeeper localhost:2181
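The topic can also be created from Java with the AdminClient that ships in kafka-clients (version 1.0.1 in the pom below); a minimal sketch, with the class name CreateTopicDemo chosen only for illustration:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 -- same as the CLI command above
            NewTopic topic = new NewTopic("connect-test", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
            // print the topic list, equivalent to kafka-topics.sh --list
            System.out.println(admin.listTopics().names().get());
        }
    }
}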
Managing the POM dependencies
The project needs the connect-api, connect-runtime, connect-file, and connect-json dependencies. Because the jetty-util version pulled in by connect-runtime is outdated, a newer jetty-util dependency is added explicitly.
The pom.xml is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.ict.add</groupId>
    <artifactId>connect-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.1</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>1.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>1.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-runtime</artifactId>
            <version>1.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-file</artifactId>
            <version>1.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-json</artifactId>
            <version>1.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.rocksdb</groupId>
            <artifactId>rocksdbjni</artifactId>
            <version>5.8.6</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-util</artifactId>
            <version>9.2.15.v20160210</version>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>6.14.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Start the Kafka Connect connectors
package com.ict.add;

import org.apache.kafka.connect.cli.ConnectStandalone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

public class ConnectTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectTest.class);

    @Test
    public void testConnectStandalone() {
        // worker config first, then one properties file per connector to start
        String workerConfigFile = "connect-standalone.properties";
        String sourceConnectFile = "connect-file-source.properties";
        String sinkConnectFile = "connect-file-sink.properties";
        String[] args = {workerConfigFile, sourceConnectFile, sinkConnectFile};
        try {
            ConnectStandalone.main(args);
        } catch (Exception e) {
            LOGGER.error("error ", e);
        }
    }
}
The testConnectStandalone unit test is given three properties files:
- connect-standalone.properties is the configuration used by the Kafka Connect worker (a short JsonConverter sketch follows this config).
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
rest.port=18083
plugin.path=
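With key.converter.schemas.enable and value.converter.schemas.enable set to true, the worker wraps every key and value in a JSON envelope that carries the schema alongside the payload. A minimal sketch of what the JsonConverter from connect-json produces for one file line; the class name JsonConverterDemo is only illustrative:

import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.json.JsonConverter;

public class JsonConverterDemo {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        // second argument: false = configure as a value converter (true would mean key converter)
        converter.configure(Collections.singletonMap("schemas.enable", "true"), false);
        // serialize a plain string, the way each line read from source.file.log is serialized
        byte[] bytes = converter.fromConnectData("connect-test", Schema.STRING_SCHEMA, "hello");
        // prints {"schema":{"type":"string","optional":false},"payload":"hello"}
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}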
- connect-file-source.properties is a connector configuration, in this case for the source connector; the source connector watches source.file.log for new lines and writes them to the Kafka topic.
name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=source.file.log
topic=connect-test
- connect-file-sink.properties is the sink connector's configuration; the sink connector consumes the Kafka topic and appends the records to sink.file.log (a REST-based way to add a connector is sketched after this config).
name=local-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=sink.file.log
topics=connect-test
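Because the worker config above sets rest.port=18083, a connector can also be added to the already-running standalone worker through its REST API instead of a properties file. A rough sketch, assuming the worker started by the test above is running; the connector name local-file-sink-2 and the file sink2.file.log are made up for this example:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class AddSinkConnectorDemo {
    public static void main(String[] args) throws Exception {
        // JSON equivalent of connect-file-sink.properties, under a different (hypothetical) name and file
        String body = "{\"name\":\"local-file-sink-2\",\"config\":{"
                + "\"connector.class\":\"org.apache.kafka.connect.file.FileStreamSinkConnector\","
                + "\"tasks.max\":\"1\","
                + "\"file\":\"sink2.file.log\","
                + "\"topics\":\"connect-test\"}}";
        URL url = new URL("http://localhost:18083/connectors"); // rest.port from connect-standalone.properties
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP " + conn.getResponseCode()); // expect 201 Created
    }
}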
- log4j.properties is the log4j configuration file, placed under test/resources/.
# Set root logger level to INFO and its only appender to stdout.
log4j.rootLogger=INFO, stdout
# stdout appender is set to be a ConsoleAppender.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Threshold=INFO
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c{1} %x - %m%n
log4j.logger.org.reflections=ERROR
Running the testConnectStandalone unit test starts two connectors: a FileStreamSourceConnector and a FileStreamSinkConnector.
touch source.file.log sink.file.log
echo 'hello' >>source.file.log
echo 'world' >>source.file.log
date >>source.file.log
cat sink.file.log
The contents of sink.file.log now match what was just written to source.file.log.
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test
This starts a Kafka console consumer; the data it consumes is the content just written to source.file.log, wrapped in the JSON envelope produced by the JsonConverter.
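The same check can be done from Java with a plain KafkaConsumer; a small sketch, with the group id connect-test-viewer chosen arbitrarily:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumeConnectTestDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-test-viewer");       // hypothetical group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");         // same effect as --from-beginning
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("connect-test"));
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (ConsumerRecord<String, String> record : records) {
                // each value is the JSON envelope written by the JsonConverter,
                // e.g. {"schema":{"type":"string","optional":false},"payload":"hello"}
                System.out.println(record.value());
            }
        }
    }
}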