【Kafka Deployment】Cluster Setup & Quickstart

Author: 粮忆雨 | Published 2018-12-04 20:44

Kafka quickstart docs: http://kafka.apache.org/quickstart
Install environment: CentOS 7

Questions this post answers:
1. How do you set up a Kafka cluster?
2. How do you use the Kafka command-line tools?
3. How do you implement a producer and a consumer in code?

For more Kafka fundamentals, see 【kafka-基础】kafka基础概念及应用.

Environment preparation

(See step 1, "environment preparation", of the Hadoop cluster setup guide: https://www.jianshu.com/p/71b4f456726a)

Download and install

Download from: kafka dist >>

Step 1: Download and extract

wget http://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xzf kafka_2.11-2.1.0.tgz -C /opt/
mkdir -p /opt/apps && ln -s /opt/kafka_2.11-2.1.0/ /opt/apps/kafka

Step 2: Edit the configuration

mkdir -p /data/kafka/kafka-logs
vi /opt/apps/kafka/config/server.properties
Set broker.id (each node gets a distinct integer: 0, 1, 2, 3, ...), point the data directory (log.dirs) at the path created above, and list the ZooKeeper nodes in zookeeper.connect, as sketched below.
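The relevant server.properties entries look roughly like this (hostnames and paths follow this article's cluster; adjust for yours):

# unique per broker: 0 on hdc-data4, 1 on hdc-data5, 2 on hdc-data6
broker.id=0
# Kafka data directory created above
log.dirs=/data/kafka/kafka-logs
# ZooKeeper ensemble
zookeeper.connect=hdc-data4:2181,hdc-data5:2181,hdc-data6:2181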

Step 3: Distribute/sync
Copy Kafka to the other nodes, then change each node's broker.id.

scp -r /opt/kafka_2.11-2.1.0/ root@hdc-data5:/opt/
scp -r /opt/kafka_2.11-2.1.0/ root@hdc-data6:/opt/

Create the symlink on each synced node (skip this if you are not using a symlink mapping):

ln -s /opt/kafka_2.11-2.1.0/ /opt/apps/kafka

Change broker.id on the other nodes (hdc-data5 and hdc-data6 here), as sketched below.
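One non-interactive way to do it; broker IDs 1 and 2 are just a choice, any unique integers work:

# on hdc-data5
sed -i 's/^broker.id=.*/broker.id=1/' /opt/apps/kafka/config/server.properties
# on hdc-data6
sed -i 's/^broker.id=.*/broker.id=2/' /opt/apps/kafka/config/server.properties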

Step 4: Start ZooKeeper
(For ZooKeeper installation, see step 2, "install zookeeper", of the Hadoop cluster setup guide: https://www.jianshu.com/p/71b4f456726a)
If ZooKeeper is not running yet, start it on each node:

zkServer.sh start
zkServer.sh status

Step 5: Start Kafka

  • Without a helper script, you have to start Kafka on every node by hand: bin/kafka-server-start.sh config/server.properties

  • Alternatively, write a startup script and make it executable, so the whole cluster starts with one command: bin/start-kafka.sh

vi /opt/apps/kafka/bin/start-kafka.sh
chmod 755 /opt/apps/kafka/bin/start-kafka.sh

#!/bin/bash
BROKERS="hdc-data4 hdc-data5 hdc-data6"
KAFKA_HOME="/opt/apps/kafka"

for i in $BROKERS
do
    echo "Starting kafka on ${i},logging out to ${KAFKA_HOME}/logs/kafka-${i}-server-start.log"
    ssh ${i} "source /etc/profile;mkdir -p ${KAFKA_HOME}/logs;nohup sh ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties > ${KAFKA_HOME}/logs/kafka-${i}-server-start.log 2>&1 &"
    if [[ $? -ne 0 ]]; then
        echo "Starting kafka on ${i} failed"
    fi
done
echo "All kafka brokers are started"
exit 0
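A quick sanity check after the script runs (jps ships with the JDK; the broker's main class shows up as "Kafka"):

# run on each node to confirm the broker process is up
jps | grep Kafka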

[Optional] Write a shutdown script as well, and make it executable.
vi /opt/apps/kafka/bin/stop-kafka.sh
chmod 755 /opt/apps/kafka/bin/stop-kafka.sh

#!/bin/bash
BROKERS="hdc-data4 hdc-data5 hdc-data6"
KAFKA_HOME="/opt/apps/kafka"

for i in $BROKERS
do
    echo "Stopping kafka on ${i}"
    ssh ${i} "source /etc/profile;bash ${KAFKA_HOME}/bin/kafka-server-stop.sh"
    if [[ $? -ne 0 ]]; then
        echo "Stopping kafka on ${i} failed"
    fi
done
echo "All kafka brokers are stopped"
exit 0

Common commands

  • Create a topic
bin/kafka-topics.sh --create --zookeeper hdc-data4:2181,hdc-data5:2181,hdc-data6:2181 --replication-factor 3 --partitions 3 --topic test
  • List topics
bin/kafka-topics.sh --list --zookeeper hdc-data4:2181,hdc-data5:2181,hdc-data6:2181
  • Produce messages (run the console producer from any one node)
bin/kafka-console-producer.sh --broker-list hdc-data4:9092,hdc-data5:9092,hdc-data6:9092 --topic test
  • Consume messages (run the console consumer from any one node)
bin/kafka-console-consumer.sh --bootstrap-server hdc-data4:9092,hdc-data5:9092,hdc-data6:9092 --topic test --from-beginning
  • Describe topics (without --topic the command describes every topic)
bin/kafka-topics.sh --describe --zookeeper hdc-data4:2181,hdc-data5:2181,hdc-data6:2181
  • Inspect topic info through ZooKeeper
Start the ZooKeeper client:
./bin/zkCli.sh
List topics:
ls /brokers/topics/
List consumer info (old ZooKeeper-based consumers only; new consumers keep offsets in the __consumer_offsets topic):
ls /consumers
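To see one topic's partition assignment, get the topic node (topic name taken from the examples above):

get /brokers/topics/test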
  • Delete a topic

Deleting topics requires delete.topic.enable=true. Since version 1.0.0 this has been the default (it can still be controlled by adding delete.topic.enable=false/true to config/server.properties).
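To set it explicitly, add a line like this to config/server.properties on every broker:

# allow topics to be deleted (default true since 1.0.0)
delete.topic.enable=true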

Step 1: Delete the target topic with the Kafka CLI (the topic is marked for deletion, then removed automatically a few seconds later)

bin/kafka-topics.sh --zookeeper hdc-data4:2181,hdc-data5:2181,hdc-data6:2181 --delete --topic test

(On old versions (0.x.x) you must also delete the topic's data manually on every node; the data lives in the directory configured by log.dirs.)

Step 2: Remove the topic metadata from the ZooKeeper client

/opt/apps/zookeeper/bin/zkCli.sh 

rmr /brokers/topics/test

(On old versions (0.x.x) you also need to run rmr /admin/delete_topics/test in the ZooKeeper client to clear the topic's deletion marker.)

Code examples

Producer
Scala implementation

package com.hdc.kafka.demo

import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}

/**
  * Kafka send callback: logs metadata on success, prints the exception on failure.
  * @param startTime timestamp (ms) when the send was issued
  * @param message the message payload, echoed in the log line
  */
class DemoCallback(startTime : Long, message : String) extends Callback{

    override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
        val elapsedTime = System.currentTimeMillis() - startTime
        if (metadata != null) {
            System.out.println(
                "message => (" + message + ") sent to partition(" + metadata.partition() +
                        "), " +
                        "offset(" + metadata.offset() + ") in " + elapsedTime + " ms")
        } else {
            e.printStackTrace()
        }
    }
}

/**
  * kafka生产者
  */
object KafkaProducer {
    // Kafka brokers
    val BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092"

    val TOPIC = "test"

    // true: send asynchronously with a callback; false: block on every send
    val isAsync = true

    def main(args: Array[String]): Unit = {

        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer")
        val producer = new KafkaProducer[Int, String](props)
        try {
            // Simulate three electricity meters producing readings
            // active_quan: consumption, create_time: reading time, meter_id: meter id
            while (true) {
                val cur_time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)
                val arr = Array("000a3804-0315-4de4-a846-74ac37812d08", "010892ae-9d99-42cf-aab7-6d074351b15a", "01e5a01c-661d-4b65-82ee-1309497b79e7")
                for (meter_id <- arr) {
                    val msg = "{\"active_quan\":" + ((new util.Random).nextInt(100) + 1) + ",\"create_time\":\"" + cur_time + "\",\"meter_id\":\"" + meter_id + "\"}"
                    val startTime = System.currentTimeMillis()
                    if (isAsync) {
                        // Send asynchronously
                        producer.send(new ProducerRecord(TOPIC, msg), new DemoCallback(startTime, msg))
                    } else {
                        // Send synchronously
                        producer.send(new ProducerRecord(TOPIC, msg)).get()
                        System.out.println("Sent message: (" + msg + ")")
                    }
                }
                Thread.sleep(5000)
            }
        } catch {
            case ex: Exception => {
                println(ex)
            }
        } finally {
            producer.close()
        }
    }
}

Java implementation

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

/**
 * Kafka producer
 *
 * @author liangriyu
 * @version 1.0
 * @create 2018-12-08 0:52
 */
public class JavaKafkaProducer {

    public final static String BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092";
    public final static String TOPIC = "test";
    public final static boolean isAsync = true;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
        try {
            // Simulate three electricity meters producing readings
            // active_quan: consumption, create_time: reading time, meter_id: meter id
            while (true) {
                String cur_time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
                String[] arr = {"000a3804-0315-4de4-a846-74ac37812d08", "010892ae-9d99-42cf-aab7-6d074351b15a", "01e5a01c-661d-4b65-82ee-1309497b79e7"};
                for (String meter_id : arr) {
                    String msg = "{\"active_quan\":" + (new Random().nextInt(100) + 1) + ",\"create_time\":\"" + cur_time + "\",\"meter_id\":\"" + meter_id + "\"}";
                    long startTime = System.currentTimeMillis();
                    if (isAsync) {
                        // Send asynchronously
                        producer.send(new ProducerRecord<>(TOPIC, msg), new DemoCallback(startTime, msg));
                    } else {
                        // Send synchronously
                        producer.send(new ProducerRecord<>(TOPIC, msg)).get();
                        System.out.println("Sent message: (" + msg + ")");
                    }
                }
                Thread.sleep(5000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

/**
 * Kafka send callback: logs metadata on success, prints the exception on failure.
 */
class DemoCallback implements Callback {

    private final long startTime;
    private final String message;

    public DemoCallback(long startTime, String message) {
        this.startTime = startTime;
        this.message = message;
    }

    public void onCompletion(RecordMetadata metadata, Exception e) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "message => (" + message + ") sent to partition(" + metadata.partition() +
                            "), " +
                            "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            e.printStackTrace();
        }
    }
}

Consumer

Java implementation

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class JavaKafkaConsumer extends ShutdownableThread {

    public final static String BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092";
    public final static String TOPIC = "test";
    private final KafkaConsumer<Integer, String> consumer;

    public JavaKafkaConsumer() {
        super("KafkaConsumerDemo", false);
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        consumer = new KafkaConsumer<>(props);
    }

    public void doWork() {
        consumer.subscribe(Collections.singletonList(TOPIC));
        // Duration (java.time) requires Java 8: set the IDE's module language level and compiler target to 1.8.
        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
        }
    }

    @Override
    public String name() {
        return null;
    }

    @Override
    public boolean isInterruptible() {
        return false;
    }

    public static void main(String[] args) {
        JavaKafkaConsumer consumerThread = new JavaKafkaConsumer();
        consumerThread.start();
    }
}

Maven (pom.xml)

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hdc</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Kafka Quickstart Job</name>
    <url>http://www.myorganization.org</url>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.hdc.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Eclipse Scala Integration -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.8</version>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <projectnatures>
                        <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
                        <projectnature>org.eclipse.jdt.core.javanature</projectnature>
                    </projectnatures>
                    <buildcommands>
                        <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
                    </buildcommands>
                    <classpathContainers>
                        <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                    </classpathContainers>
                    <excludes>
                        <exclude>org.scala-lang:scala-library</exclude>
                        <exclude>org.scala-lang:scala-compiler</exclude>
                    </excludes>
                    <sourceIncludes>
                        <sourceInclude>**/*.scala</sourceInclude>
                        <sourceInclude>**/*.java</sourceInclude>
                    </sourceIncludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.7</version>
                <executions>
                    <!-- Add src/main/scala to eclipse build path -->
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                    <!-- Add src/test/scala to eclipse build path -->
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-test-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/test/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
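A rough way to build and run the Java demos (the artifact name follows the POM above; the shaded jar bundles kafka-clients, but scala-library is "provided", so the consumer, which extends the Scala class kafka.utils.ShutdownableThread, needs it on the classpath; the scala-library path below is a placeholder):

mvn clean package
# producer: writes simulated meter readings to the "test" topic every 5 s
java -cp target/kafka-1.0-SNAPSHOT.jar JavaKafkaProducer
# consumer: prints every record it receives
java -cp target/kafka-1.0-SNAPSHOT.jar:/path/to/scala-library-2.11.12.jar JavaKafkaConsumer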
