美文网首页
kafka集群搭建过程

kafka集群搭建过程

作者: 牙齿不帅 | 来源:发表于2020-02-25 15:15 被阅读0次

1.下载kafka安装包 https://www.apache.org/dyn/closer.cgi
【kafka安装包中已经自带zookeeper,无需单独下载】

2.修改config/zookeeper.properties。【三个机器都操作】

主要配置:


maxClientCnxns=200

#需要自己创建文件夹(properties文件不支持行尾注释,注释必须单独成行)
dataDir=/home/data/zookeeper

tickTime=2000

initLimit=5

syncLimit=2

#server.N中的N是节点编号(1,2,3),必须与该节点myid文件中的数字一致;三台机器上的server.*配置相同
server.1=10.10.89.204:2888:3888

server.2=10.10.89.205:2888:3888

server.3=10.10.89.206:2888:3888

#10.10.89.204为集群里的IP地址。第一个端口是leader和follower之间的通信端口,默认是2888;第二个端口是leader选举的端口,集群刚启动时的选举或者leader挂掉之后的重新选举都使用该端口,默认是3888

3.在dataDir(/home/data/zookeeper)目录下创建myid文件,内容根据节点分别为1,2,3。myid是zk集群用来发现彼此的标识,必须创建,且不能相同; # echo "1" > /home/data/zookeeper/myid
4.启动zookeeper。


#./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

如报错误: failed; error='Cannot allocate memory' (errno=12)
原因:机器可用内存不足。修改zookeeper-server-start.sh中KAFKA_HEAP_OPTS的堆内存设置即可,默认是-Xmx512M -Xms512M。

5.编辑config/server.properties。

主要配置:


#根据节点修改成0,1,2(注释必须单独成行,否则会被当作值的一部分)
broker.id=0

#端口号,可能是外部访问的端口号
port=9092

#服务器绑定的域名或IP,每个节点填写本机地址
host.name=10.10.89.204

log.dirs=/datacenter/data

zookeeper.connect=10.10.89.204:2181,10.10.89.205:2181,10.10.89.206:2181

6.启动kafka。


#./bin/kafka-server-start.sh -daemon config/server.properties

java客户端测试:

1.创建maven java工程。


<dependencies>

<dependency>

    <groupId>org.apache.kafka</groupId>

    <artifactId>kafka-clients</artifactId>

    <version>0.11.0.0</version>

  </dependency>

  <dependency>

  <groupId>org.apache.kafka</groupId>

  <artifactId>kafka-log4j-appender</artifactId>

  <version>0.10.2.1</version>

      </dependency>

      <dependency>

  <groupId>org.slf4j</groupId>

  <artifactId>slf4j-log4j12</artifactId>

  <version>1.7.5</version>

      </dependency>

</dependencies>

2.加入src/main/resources/log4j.properties(Maven默认只会把src/main/resources下的配置文件打包到classpath)。


log4j.rootLogger=debug,Console

# appender kafka

log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=kafkaTest

log4j.appender.kafka.syncSend=false

# multiple brokers are separated by comma ",".

log4j.appender.kafka.brokerList=10.10.89.204:9092,10.10.89.205:9092,10.10.89.206:9092

log4j.appender.kafka.layout=org.apache.log4j.PatternLayout

log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

#\u8F93\u51FA\u65E5\u5FD7\u5230\u63A7\u5236\u53F0

log4j.appender.Console=org.apache.log4j.ConsoleAppender

log4j.appender.Console.Threshold=all

log4j.appender.Console.layout=org.apache.log4j.PatternLayout

log4j.appender.Console.layout.ConversionPattern=%-d{yyyy-MM-dd HH\:mm\:ss} [%c\:%L]-[%p] %m%n

#kafka

log4j.logger.com.demo.kafka.Log4jToKafka=info,kafka

#\u5173\u95EDspring\u4F4E\u7EA7\u522B\u65E5\u5FD7

log4j.logger.org.springside.examples.miniweb=ERROR

log4j.logger.com.octo.captcha.service.image.DefaultManageableImageCaptchaService=ERROR

log4j.logger.com.mchange.v2.resourcepool.BasicResourcePool=ERROR

log4j.logger.com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool=ERROR

log4j.logger.com.mchange.v2.c3p0.impl.NewPooledConnection=ERROR

log4j.logger.com.mchange.v2.c3p0.management.DynamicPooledDataSourceManagerMBean=ERROR

log4j.logger.com.mchange.v2.c3p0.C3P0Registry=ERROR

log4j.logger.com.mchange.v2.log.MLog=ERROR

log4j.logger.com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource=ERROR

3.创建生产者类。


/**
 * Minimal Kafka producer demo.
 *
 * <p>Connects to the three-broker cluster, sends 100 string messages
 * ("Message 0" .. "Message 99") to the "HelloWorld" topic, echoes each one to
 * stdout, and then closes the producer.
 */
public class Producer
{
    /**
     * Per the original author's note: without this logger the program fails with
     * java.lang.ExceptionInInitializerError.
     */
    private static final Logger log = LoggerFactory.getLogger(Producer.class);

    /**
     * Entry point: builds the producer configuration and sends the demo messages.
     *
     * @param args command-line arguments (unused)
     */
    public static void main( String[] args )
    {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.10.89.204:9092,10.10.89.205:9092,10.10.89.206:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The variable is typed as KafkaProducer on purpose: inside this class the
        // simple name "Producer" resolves to this (non-generic) class, so
        // "Producer<String, String>" would not compile.
        // try-with-resources guarantees close() runs even if send() throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties)) {
            for (int i = 0; i < 100; i++) {
                String msg = "Message " + i;
                producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
                System.out.println("Sent:" + msg);
            }
        }
    }
}

4.创建消费者类。


 public class ConsumerDemo {

 public static void main(String[] args) {

 Properties properties = new Properties();

 properties.put("bootstrap.servers",   "10.10.89.204:9092,10.10.89.205:9092,10.10.89.206:9092");

 properties.put("group.id", "group-1");

 properties.put("enable.auto.commit", "true");

 properties.put("auto.commit.interval.ms", "1000");

 properties.put("auto.offset.reset", "earliest");

 properties.put("session.timeout.ms", "30000");

 properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

 properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String> (properties);

 kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));

 while (true) {

 ConsumerRecords<String, String> records = kafkaConsumer.poll(100);

  try {

 Thread.sleep(2000);

 } catch (InterruptedException e) {

 e.printStackTrace();

 }

 for (ConsumerRecord<String, String> record : records) {

 System.out.printf("=================================================================================================offset = %d, value = %s", r ecord.offset(),  record.value());

 System.out.println();

 }

 System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");

 }

 }

 }

相关文章

网友评论

      本文标题:kafka集群搭建过程

      本文链接:https://www.haomeiwen.com/subject/kojtchtx.html