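Kafka official site: http://kafka.apache.org/quickstart
Installation environment: CentOS 7
Guiding questions:
1. How do you set up a Kafka cluster?
2. How do you use the Kafka command-line tools?
3. How do you implement a producer and a consumer in code?
For more Kafka fundamentals, see the article "[kafka-basics] Kafka basic concepts and applications".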
Environment preparation
(Follow step 1, "Installation environment preparation", of the Hadoop cluster setup guide: https://www.jianshu.com/p/71b4f456726a)
Download and installation
Download location: kafka dist (the Apache Kafka release archive)
Step 1: Download and extract
wget http://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xzf kafka_2.11-2.1.0.tgz -C /opt/
mkdir -p /opt/apps
ln -s /opt/kafka_2.11-2.1.0/ /opt/apps/kafka
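The archive name encodes two versions: kafka_2.11-2.1.0 is Kafka 2.1.0 built for Scala 2.11, and the same Scala suffix shows up later in the kafka_2.11 Maven artifact and in scala.binary.version. As a small illustration, assuming archive names always follow the kafka_<scala>-<kafka>.tgz pattern (the class name is hypothetical), the two parts can be split like this:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Splits a Kafka binary archive name such as kafka_2.11-2.1.0.tgz into its two versions. */
final class KafkaArchiveName {
    // Assumed pattern: kafka_<scala binary version>-<kafka version>.tgz
    private static final Pattern NAME =
            Pattern.compile("kafka_(\\d+\\.\\d+)-(\\d+\\.\\d+\\.\\d+)\\.tgz");

    /** Returns {scalaVersion, kafkaVersion}; throws if the name does not match the pattern. */
    static String[] parse(String fileName) {
        Matcher m = NAME.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected archive name: " + fileName);
        }
        return new String[] {m.group(1), m.group(2)};
    }

    public static void main(String[] args) {
        String[] v = parse("kafka_2.11-2.1.0.tgz");
        System.out.println("Scala " + v[0] + ", Kafka " + v[1]); // prints "Scala 2.11, Kafka 2.1.0"
    }
}
```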
Step 2: Edit the configuration
mkdir -p /data/kafka/kafka-logs
vi /opt/apps/kafka/config/server.properties
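In server.properties, set the following:
- broker.id: the node's ID; each node must use a different integer (0, 1, 2, 3...)
- log.dirs: where Kafka stores its data, e.g. the /data/kafka/kafka-logs directory created above
- zookeeper.connect: the ZooKeeper nodes, e.g. hdc-data4:2181,hdc-data5:2181,hdc-data6:2181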
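Before starting a broker it can help to sanity-check these three entries. The sketch below is a hypothetical helper, not part of Kafka: it assumes the data directory is set with log.dirs (as in the stock server.properties) and only checks that the keys are present and that broker.id is an integer.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check for the server.properties entries edited in Step 2. */
final class ServerPropertiesCheck {
    // Assumes log.dirs (not log.dir) holds the data directory, as in the stock file.
    private static final String[] REQUIRED = {"broker.id", "log.dirs", "zookeeper.connect"};

    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns a list of problems; an empty list means the three settings look sane. */
    static List<String> check(Properties props) {
        List<String> problems = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                problems.add("missing " + key);
            }
        }
        String id = props.getProperty("broker.id");
        if (id != null && !id.trim().isEmpty()) {
            try {
                Integer.parseInt(id.trim());
            } catch (NumberFormatException e) {
                problems.add("broker.id is not an integer: " + id.trim());
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Inline sample; on a real node, read /opt/apps/kafka/config/server.properties instead.
        Properties sample = parse("broker.id=0\n"
                + "log.dirs=/data/kafka/kafka-logs\n"
                + "zookeeper.connect=hdc-data4:2181,hdc-data5:2181,hdc-data6:2181\n");
        List<String> problems = check(sample);
        System.out.println(problems.isEmpty() ? "server.properties looks ok" : problems.toString());
    }
}
```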
Step 3: Distribute/sync
Copy Kafka to the other nodes, then change each node's broker.id.
scp -r /opt/kafka_2.11-2.1.0/ root@hdc-data5:/opt/
scp -r /opt/kafka_2.11-2.1.0/ root@hdc-data6:/opt/
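On each node that received the copy, create the symlink (skip this if your installation does not use the symlink layout):
mkdir -p /opt/apps
ln -s /opt/kafka_2.11-2.1.0/ /opt/apps/kafka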
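Change broker.id in /opt/apps/kafka/config/server.properties on each copied node so that no two brokers share an ID, e.g.:
hdc-data5: broker.id=1
hdc-data6: broker.id=2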
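Because the whole directory is copied with scp, every node starts out with the same broker.id, and a duplicate typically only surfaces as a startup error on the second broker. A minimal sketch of a uniqueness check, assuming you have already collected each host's broker.id into a map (how you collect it, e.g. over ssh, is left open; the class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical check that every node copied in Step 3 ended up with its own broker.id. */
final class BrokerIdUniqueness {
    /** Takes host -> broker.id and returns one message per host that reuses an earlier host's id. */
    static List<String> duplicates(Map<String, Integer> idByHost) {
        Map<Integer, String> firstHostForId = new HashMap<>();
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, Integer> e : idByHost.entrySet()) {
            String previous = firstHostForId.putIfAbsent(e.getValue(), e.getKey());
            if (previous != null) {
                problems.add("broker.id " + e.getValue() + " used by both " + previous + " and " + e.getKey());
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, Integer> ids = new LinkedHashMap<>();
        ids.put("hdc-data4", 0);
        ids.put("hdc-data5", 1);
        ids.put("hdc-data6", 1); // forgot to edit after scp
        System.out.println(duplicates(ids)); // [broker.id 1 used by both hdc-data5 and hdc-data6]
    }
}
```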
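Step 4: Start ZooKeeper
(To install ZooKeeper, follow step 2, "Install ZooKeeper", of the Hadoop cluster setup guide: https://www.jianshu.com/p/71b4f456726a)
If ZooKeeper is not running yet, start it on every ZooKeeper node and check its status: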
zkServer.sh start
zkServer.sh status
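zkServer.sh status only checks the local server. To check every ZooKeeper node from one machine, one option is ZooKeeper's "ruok" four-letter command, which a healthy server answers with "imok". A sketch assuming the default client port 2181 (the class name is hypothetical); note that newer ZooKeeper releases (3.5 and later) only answer four-letter commands listed in 4lw.commands.whitelist, in which case this check reports "not ok" even for a healthy server.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Sends ZooKeeper's "ruok" command and reports whether the server answered "imok". */
final class ZkRuok {
    static boolean isOk(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            socket.setSoTimeout(timeoutMs);
            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes(StandardCharsets.US_ASCII));
            out.flush();
            // The server writes its reply and then closes the connection, so read until EOF.
            InputStream in = socket.getInputStream();
            ByteArrayOutputStream reply = new ByteArrayOutputStream();
            byte[] buf = new byte[64];
            int n;
            while ((n = in.read(buf)) != -1) {
                reply.write(buf, 0, n);
            }
            return "imok".equals(new String(reply.toByteArray(), StandardCharsets.US_ASCII).trim());
        } catch (IOException e) {
            return false; // refused, timed out or reset: treat as not healthy
        }
    }

    public static void main(String[] args) {
        for (String host : new String[] {"hdc-data4", "hdc-data5", "hdc-data6"}) {
            System.out.println(host + ": " + (isOk(host, 2181, 2000) ? "imok" : "not ok"));
        }
    }
}
```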
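Step 5: Start Kafka
- Without a script, you have to start Kafka on every broker node (run from /opt/apps/kafka; add -daemon before config/server.properties to run it in the background):
bin/kafka-server-start.sh config/server.properties
- Alternatively, write a start script and make it executable, so the whole cluster can be started from one node with bin/start-kafka.sh: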
vi /opt/apps/kafka/bin/start-kafka.sh
chmod 755 /opt/apps/kafka/bin/start-kafka.sh
#!/bin/bash
BROKERS="hdc-data4 hdc-data5 hdc-data6"
KAFKA_HOME="/opt/apps/kafka"
for i in $BROKERS
do
echo "Starting kafka on ${i},logging out to ${KAFKA_HOME}/logs/kafka-${i}-server-start.log"
ssh ${i} "source /etc/profile;mkdir -p ${KAFKA_HOME}/logs;nohup sh ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties > ${KAFKA_HOME}/logs/kafka-${i}-server-start.log 2>&1 &"
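# $? is the exit status of ssh: 0 only means the remote command was launched, not that the broker is fully up
if [[ $? -eq 0 ]]; then
echo "Starting kafka on ${i} is ok"
else
echo "Starting kafka on ${i} failed"
fi
done
echo "Start commands sent to all kafka brokers"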
exit 0
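The start script only reports whether ssh managed to launch the command; the broker itself may still fail a few seconds later (for example because of a duplicate broker.id). A minimal follow-up check, assuming the brokers listen on the default port 9092 used throughout this post, is to poll each broker's port until it accepts TCP connections. An open port only shows the broker process is listening, not that it has finished registering with ZooKeeper. Class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Polls a broker's listener port until it accepts TCP connections or a deadline passes. */
final class WaitForBroker {
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // connection refused or timed out
        }
    }

    static boolean waitUntilListening(String host, int port, long deadlineMs) throws InterruptedException {
        long end = System.currentTimeMillis() + deadlineMs;
        while (System.currentTimeMillis() < end) {
            if (isListening(host, port, 1000)) {
                return true;
            }
            Thread.sleep(500); // brokers usually need a few seconds to open their listener
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        for (String host : new String[] {"hdc-data4", "hdc-data5", "hdc-data6"}) {
            boolean up = waitUntilListening(host, 9092, 30_000);
            System.out.println(host + ":9092 " + (up ? "up" : "not reachable"));
        }
    }
}
```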
(Optional) Write a stop script and make it executable:
vi /opt/apps/kafka/bin/stop-kafka.sh
chmod 755 /opt/apps/kafka/bin/stop-kafka.sh
#!/bin/bash
BROKERS="hdc-data4 hdc-data5 hdc-data6"
KAFKA_HOME="/opt/apps/kafka"
for i in $BROKERS
do
echo "Stopping kafka on ${i}"
ssh ${i} "source /etc/profile;bash ${KAFKA_HOME}/bin/kafka-server-stop.sh"
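if [[ $? -eq 0 ]]; then
echo "Stopping kafka on ${i} is ok"
else
echo "Stopping kafka on ${i} failed (kafka-server-stop.sh also exits non-zero when no broker is running)"
fi
done
echo "Stop commands sent to all kafka brokers"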
exit 0
Common commands
- Create a topic
bin/kafka-topics.sh --create --zookeeper hdc-data4:2181,hdc-data5:2181,hdc-data6:2181 --replication-factor 3 --partitions 3 --topic test
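kafka-topics.sh rejects some topic names. The sketch below re-implements the rules as I understand them for Kafka 2.x (ASCII letters, digits, '.', '_' and '-', at most 249 characters, not "." or "..") plus the warning about mixing '.' and '_', which map to the same metric name. It is an illustration for checking names up front, not Kafka's own validator; the class name is hypothetical.

```java
import java.util.regex.Pattern;

/** Approximates Kafka's topic-name rules so a bad name can be caught before running --create. */
final class TopicNames {
    private static final int MAX_LENGTH = 249;
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    /** Returns null if the name is legal, otherwise the reason it is not. */
    static String invalidReason(String name) {
        if (name.isEmpty()) return "name is empty";
        if (name.equals(".") || name.equals("..")) return "name cannot be \".\" or \"..\"";
        if (name.length() > MAX_LENGTH) return "name is longer than " + MAX_LENGTH + " characters";
        if (!LEGAL.matcher(name).matches()) return "only ASCII letters, digits, '.', '_' and '-' are allowed";
        return null;
    }

    /** '.' and '_' map to the same metric name, so e.g. meter.data and meter_data would collide. */
    static boolean mayCollide(String a, String b) {
        return !a.equals(b) && a.replace('.', '_').equals(b.replace('.', '_'));
    }

    public static void main(String[] args) {
        System.out.println(invalidReason("test"));        // null: legal
        System.out.println(invalidReason("meter data"));  // the space is not allowed
        System.out.println(mayCollide("meter.data", "meter_data")); // true
    }
}
```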
- List topics
bin/kafka-topics.sh --list --zookeeper hdc-data4:2181,hdc-data5:2181,hdc-data6:2181
- Produce messages (start the console producer on any one node)
bin/kafka-console-producer.sh --broker-list hdc-data4:9092,hdc-data5:9092,hdc-data6:9092 --topic test
- Consume messages (start the console consumer on any one node)
bin/kafka-console-consumer.sh --bootstrap-server hdc-data4:9092,hdc-data5:9092,hdc-data6:9092 --topic test --from-beginning
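--from-beginning works by making the console consumer use auto.offset.reset=earliest. This setting only matters when the consumer group has no valid committed offset for a partition; the Java consumer later in this post does not set it, so it gets the default, latest, and a new group only sees messages produced after it starts. A simplified sketch of that decision (the class and method are hypothetical; offsets are per partition, and the log end offset is the offset the next message will receive):

```java
/** Simplified sketch of where a consumer starts reading one partition. */
final class StartOffset {
    /**
     * committed: the group's committed offset, or null if it has none.
     * autoOffsetReset: "earliest", "latest" or "none".
     * logStartOffset / logEndOffset: first retained offset and the offset the next message will get.
     */
    static long resolve(Long committed, String autoOffsetReset, long logStartOffset, long logEndOffset) {
        if (committed != null && committed >= logStartOffset && committed <= logEndOffset) {
            return committed; // resume where the group left off
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset; // oldest message still retained
            case "latest":
                return logEndOffset;   // only messages produced from now on
            default:
                // The real consumer throws its own exception type in this case.
                throw new IllegalStateException(
                        "no valid committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // A new group (nothing committed) on a partition that currently holds offsets 0..41
        System.out.println(resolve(null, "latest", 0L, 42L));   // 42
        System.out.println(resolve(null, "earliest", 0L, 42L)); // 0
    }
}
```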
- Describe topics (without --topic, all topics are described)
bin/kafka-topics.sh --describe --zookeeper hdc-data4:2181,hdc-data5:2181,hdc-data6:2181
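In the describe output, a partition whose Isr (in-sync replicas) list is shorter than its Replicas list is under-replicated, which usually means a broker is down or lagging. kafka-topics.sh can filter for these directly with --describe --under-replicated-partitions. If you want to check the output programmatically, here is a sketch that assumes the Kafka 2.1 line format shown in the comment (tab-separated "Key: value" fields); that format is an assumption and may differ between versions, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Parses partition lines of kafka-topics.sh --describe output and flags under-replication. */
final class DescribeLine {
    /**
     * Assumed line format (fields separated by tabs):
     * "Topic: test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0"
     */
    static Map<String, String> parse(String line) {
        Map<String, String> fields = new HashMap<>();
        for (String part : line.trim().split("\t")) {
            int colon = part.indexOf(':');
            if (colon > 0) {
                fields.put(part.substring(0, colon).trim(), part.substring(colon + 1).trim());
            }
        }
        return fields;
    }

    /** True when fewer replicas are in sync than are assigned; non-partition lines return false. */
    static boolean isUnderReplicated(Map<String, String> fields) {
        String replicas = fields.get("Replicas");
        String isr = fields.get("Isr");
        if (replicas == null || isr == null) {
            return false; // e.g. the per-topic summary line
        }
        int assigned = replicas.isEmpty() ? 0 : replicas.split(",").length;
        int inSync = isr.isEmpty() ? 0 : isr.split(",").length;
        return inSync < assigned;
    }

    public static void main(String[] args) {
        String line = "\tTopic: test\tPartition: 1\tLeader: 2\tReplicas: 2,0,1\tIsr: 2";
        Map<String, String> f = parse(line);
        System.out.println("partition " + f.get("Partition") + " under-replicated: " + isUnderReplicated(f));
    }
}
```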
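- Inspect topic information through ZooKeeper
Start the ZooKeeper client (from the ZooKeeper install directory):
./bin/zkCli.sh
List the topics (ZooKeeper paths must not end with "/"):
ls /brokers/topics
List consumer information:
ls /consumers
(Only consumers using the old ZooKeeper-based API register under /consumers. Consumers that connect with --bootstrap-server, including the Java consumer below, keep their offsets in Kafka's internal __consumer_offsets topic; list their groups with bin/kafka-consumer-groups.sh --bootstrap-server hdc-data4:9092 --list.)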
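- Delete a topic
Deleting a topic requires delete.topic.enable=true. Since version 1.0.0 this setting defaults to true (add delete.topic.enable=false or delete.topic.enable=true to config/server.properties to control it).
Step 1: delete the topic with the Kafka command (the topic is marked for deletion and removed automatically a few seconds later):
bin/kafka-topics.sh --zookeeper hdc-data4:2181,hdc-data5:2181,hdc-data6:2181 --delete --topic test
(On old 0.x versions, you also have to delete the topic's data by hand on every node; it lives under the directory configured by log.dir/log.dirs.)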
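Step 2: remove the topic's metadata in the ZooKeeper client (normally only needed if the topic is still listed after step 1):
/opt/apps/zookeeper/bin/zkCli.sh
rmr /brokers/topics/test
(On old 0.x versions, also run
rmr /admin/delete_topics/test
in the ZooKeeper client to remove the topic's "marked for deletion" entry.)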
Code development
Producer
Scala implementation
package com.hdc.kafka.demo
import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
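/**
* Kafka producer callback: prints the partition and offset a message was written to, or the error if the send failed
* @param startTime time (ms) the send was started, used to compute the latency
* @param message the message that was sent
*/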
class DemoCallback(startTime : Long, message : String) extends Callback{
override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
val elapsedTime = System.currentTimeMillis() - startTime
if (metadata != null) {
System.out.println(
"message => (" + message + ") sent to partition(" + metadata.partition() +
"), " +
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms")
} else {
e.printStackTrace()
}
}
}
/**
* Kafka producer (Scala)
*/
object KafkaProducer {
// Kafka broker list
def BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092"
def TOPIC = "test"
def isAsync = true
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer")
val producer = new KafkaProducer[Int, String](props)
try {
// Simulate three electricity meters producing readings. active_quan: energy reading, create_time: time of the reading, meter_id: meter ID
while (true) {
val cur_time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)
val arr = Array("000a3804-0315-4de4-a846-74ac37812d08", "010892ae-9d99-42cf-aab7-6d074351b15a", "01e5a01c-661d-4b65-82ee-1309497b79e7")
for (meter_id <- arr) {
val msg = "{\"active_quan\":" + (scala.util.Random.nextInt(100) + 1) + ",\"create_time\":\"" + cur_time + "\",\"meter_id\":\"" + meter_id + "\"}"
val startTime = System.currentTimeMillis()
if (isAsync) {
// Send asynchronously
producer.send(new ProducerRecord[Int, String](TOPIC, msg), new DemoCallback(startTime, msg))
} else {
// Send synchronously
producer.send(new ProducerRecord[Int, String](TOPIC, msg)).get()
System.out.println("Sent message: (" + msg + ")")
}
}
Thread.sleep(5000)
}
} catch {
case ex: Exception => {
println(ex)
}
} finally {
producer.close
}
}
}
Java implementation
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
/**
* Kafka producer (Java)
*
* @author liangriyu
* @version 1.0
* @create 2018-12-08 0:52
*/
public class JavaKafkaProducer {
public final static String BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092";
public final static String TOPIC = "test";
public final static boolean isAsync = true;
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
try {
// Simulate three electricity meters producing readings. active_quan: energy reading, create_time: time of the reading, meter_id: meter ID
while (true) {
String cur_time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
String[] arr = {"000a3804-0315-4de4-a846-74ac37812d08", "010892ae-9d99-42cf-aab7-6d074351b15a", "01e5a01c-661d-4b65-82ee-1309497b79e7"};
for (String meter_id : arr) {
String msg = "{\"active_quan\":" + (new Random().nextInt(100) + 1) + ",\"create_time\":\"" + cur_time + "\",\"meter_id\":\"" + meter_id + "\"}";
long startTime = System.currentTimeMillis();
if (isAsync) {
// Send asynchronously
producer.send(new ProducerRecord<>(TOPIC, msg), new DemoCallback(startTime, msg));
} else {
// Send synchronously
producer.send(new ProducerRecord<>(TOPIC, msg)).get();
System.out.println("Sent message: (" + msg + ")");
}
}
Thread.sleep(5000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
/**
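* Kafka producer callback: prints the partition and offset a message was written to, or the error if the send failed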
*/
class DemoCallback implements Callback {
private final long startTime;
private final String message;
public DemoCallback(long startTime, String message) {
this.startTime = startTime;
this.message = message;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.println(
"message => (" + message + ") sent to partition(" + metadata.partition() +
"), " +
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
} else {
e.printStackTrace();
}
}
}
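Both producers send records without a key (new ProducerRecord(TOPIC, msg)), so in Kafka 2.x before 2.4 the default partitioner spreads them over the topic's 3 partitions in turn, and readings from one meter may land on different partitions. If per-meter ordering matters, one option is to send meter_id as the key (this would also need a String key serializer instead of the IntegerSerializer configured above). For a non-null key, the default partitioner hashes the serialized key bytes with murmur2 and takes the result modulo the partition count. The sketch below re-implements that mapping for illustration only; it is not a call into Kafka, and the class name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative re-implementation of how a non-null key is mapped to a partition. */
final class KeyPartitioning {
    /** murmur2 hash of the serialized key bytes. */
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1-3 bytes (the cases deliberately fall through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** Clear the sign bit, then take the remainder by the partition count. */
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String[] meterIds = {"000a3804-0315-4de4-a846-74ac37812d08",
                "010892ae-9d99-42cf-aab7-6d074351b15a", "01e5a01c-661d-4b65-82ee-1309497b79e7"};
        for (String meterId : meterIds) {
            // A String key serialized as UTF-8, as StringSerializer does by default
            byte[] key = meterId.getBytes(StandardCharsets.UTF_8);
            System.out.println(meterId + " -> partition " + partitionFor(key, 3));
        }
        // An Integer key is serialized by IntegerSerializer as 4 big-endian bytes
        byte[] intKey = ByteBuffer.allocate(4).putInt(7).array();
        System.out.println("key 7 -> partition " + partitionFor(intKey, 3));
    }
}
```

The same key always maps to the same partition as long as the partition count does not change, which is what keeps one meter's readings in order.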
Consumer
Java implementation
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class JavaKafkaConsumer extends ShutdownableThread {
public final static String BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092";
public final static String TOPIC = "test";
private final KafkaConsumer<Integer, String> consumer;
public JavaKafkaConsumer() {
super("KafkaConsumerDemo", false);
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
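consumer = new KafkaConsumer<>(props);
// Subscribe once here; ShutdownableThread calls doWork() in a loop, so subscribing there would repeat it on every poll
consumer.subscribe(Collections.singletonList(TOPIC));
}
@Override
public void doWork() {
// Duration requires JDK 8. In IntelliJ IDEA set File -> Project Structure -> Project Settings -> Modules -> <your module> -> Sources -> Language Level -> 8 - Lambdas...
// and File -> Settings -> Java Compiler -> 1.8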
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}
@Override
public String name() {
return "KafkaConsumerDemo";
}
@Override
public boolean isInterruptible() {
return false;
}
public static void main(String[] args) {
JavaKafkaConsumer consumerThread = new JavaKafkaConsumer();
consumerThread.start();
}
}
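The consumer joins the group DemoConsumer. If several copies of it run, the group divides the test topic's 3 partitions among them, and in Kafka 2.x the default partition.assignment.strategy is the range assignor. A simplified single-topic sketch of that assignment (member IDs are hypothetical; real ones are generated by the broker) also shows why running more consumers than partitions leaves some of them idle:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified range assignment of one topic's partitions to the members of a consumer group. */
final class RangeAssignment {
    static Map<String, List<Integer>> assign(List<String> memberIds, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (memberIds.isEmpty()) {
            return result; // nobody to assign to
        }
        List<String> members = new ArrayList<>(memberIds);
        Collections.sort(members); // members are ordered before the partitions are split into ranges
        int perMember = numPartitions / members.size();
        int withExtra = numPartitions % members.size(); // the first `withExtra` members get one more
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            int count = perMember + (i < withExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(next++);
            }
            result.put(members.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(Arrays.asList("consumer-1", "consumer-2"), 3));
        // {consumer-1=[0, 1], consumer-2=[2]}
        System.out.println(assign(Arrays.asList("consumer-1", "consumer-2", "consumer-3", "consumer-4"), 3));
        // {consumer-1=[0], consumer-2=[1], consumer-3=[2], consumer-4=[]}
    }
}
```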
Maven (pom.xml)
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hdc</groupId>
<artifactId>kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Kafka Quickstart Job</name>
<url>http://www.myorganization.org</url>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.0.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.hdc.kafka.demo.KafkaProducer</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Eclipse Scala Integration -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.8</version>
<configuration>
<downloadSources>true</downloadSources>
<projectnatures>
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
</projectnatures>
<buildcommands>
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<classpathContainers>
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
</classpathContainers>
<excludes>
<exclude>org.scala-lang:scala-library</exclude>
<exclude>org.scala-lang:scala-compiler</exclude>
</excludes>
<sourceIncludes>
<sourceInclude>**/*.scala</sourceInclude>
<sourceInclude>**/*.java</sourceInclude>
</sourceIncludes>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<!-- Add src/main/scala to eclipse build path -->
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
</sources>
</configuration>
</execution>
<!-- Add src/test/scala to eclipse build path -->
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>