1. Download the Kafka release from https://www.apache.org/dyn/closer.cgi.
(ZooKeeper ships inside the Kafka distribution, so no separate ZooKeeper download is needed.)
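For example, to fetch and unpack the 0.11.0.0 release matching the client dependency used later (any mirror from the link above works; this is the Apache archive location):
# wget https://archive.apache.org/dist/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
# tar -xzf kafka_2.11-0.11.0.0.tgz && cd kafka_2.11-0.11.0.0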
2. Edit config/zookeeper.properties (do this on all three machines).
Key settings:
maxClientCnxns=200
dataDir=/home/data/zookeeper #create this directory yourself
tickTime=2000
initLimit=5
syncLimit=2
server.1=10.10.89.204:2888:3888
server.2=10.10.89.205:2888:3888
server.3=10.10.89.206:2888:3888
#The server.N lines list every member of the ensemble and are identical on all three machines (the per-node 1/2/3 value goes in the myid file, step 3). 10.10.89.204 etc. are the cluster IPs; the first port (2888 by default) carries follower-to-leader communication, and the second (3888 by default) is used for leader election, both when the cluster first starts and whenever the current leader dies.
3. Create the file myid under dataDir (/home/data/zookeeper/myid), containing 1, 2, or 3 according to the node. myid is the identifier ZooKeeper ensemble members use to recognize each other; it must exist and must be unique per node.
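A minimal sketch of the per-node commands, using the dataDir configured above:
# mkdir -p /home/data/zookeeper
# echo "1" > /home/data/zookeeper/myid    #use 2 on 10.10.89.205 and 3 on 10.10.89.206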
4. Start ZooKeeper on each node:
#./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
If startup fails with: failed; error='Cannot allocate memory' (errno=12)
Cause: the machine does not have enough memory for the requested JVM heap. Lower the heap size set in bin/zookeeper-server-start.sh (512 MB by default).
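Once all three nodes are up, a quick health check with ZooKeeper's standard srvr four-letter command (assuming nc is installed) should report Mode: leader on exactly one node and Mode: follower on the other two:
# echo srvr | nc 10.10.89.204 2181 | grep Mode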
5. Edit config/server.properties.
Key settings:
broker.id=0 #change to 0, 1, 2 per node
port=9092 #the port clients connect to
host.name=10.10.89.204 #the address this broker binds to; set each node's own IP (in newer Kafka releases port/host.name are superseded by listeners)
log.dirs=/datacenter/data
zookeeper.connect=10.10.89.204:2181,10.10.89.205:2181,10.10.89.206:2181
6. Start Kafka on each node:
#./bin/kafka-server-start.sh -daemon config/server.properties
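To verify the cluster before writing any code, create the topic used by the Java examples below and check that replicas land on all three brokers (these kafka-topics.sh flags are the ones shipped with the 0.11-era scripts):
# ./bin/kafka-topics.sh --create --zookeeper 10.10.89.204:2181 --replication-factor 3 --partitions 3 --topic HelloWorld
# ./bin/kafka-topics.sh --describe --zookeeper 10.10.89.204:2181 --topic HelloWorld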
Java client test:
1. Create a Maven Java project and add the dependencies:
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <!-- ideally kept in step with kafka-clients; 0.11.0.0 of this artifact exists too -->
    <version>0.10.2.1</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
  </dependency>
</dependencies>
2. Add log4j.properties under src/main/resources (not src/main/java; Maven only copies files to the classpath from the resources directory).
log4j.rootLogger=debug,Console
# appender kafka
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=kafkaTest
log4j.appender.kafka.syncSend=false
# multiple brokers are separated by comma ",".
log4j.appender.kafka.brokerList=10.10.89.204:9092,10.10.89.205:9092,10.10.89.206:9092
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
#write log output to the console
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.Threshold=all
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%-d{yyyy-MM-dd HH\:mm\:ss} [%c\:%L]-[%p] %m%n
#kafka
log4j.logger.com.demo.kafka.Log4jToKafka=info,kafka
#silence noisy low-level logging from Spring and the other libraries below
log4j.logger.org.springside.examples.miniweb=ERROR
log4j.logger.com.octo.captcha.service.image.DefaultManageableImageCaptchaService=ERROR
log4j.logger.com.mchange.v2.resourcepool.BasicResourcePool=ERROR
log4j.logger.com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool=ERROR
log4j.logger.com.mchange.v2.c3p0.impl.NewPooledConnection=ERROR
log4j.logger.com.mchange.v2.c3p0.management.DynamicPooledDataSourceManagerMBean=ERROR
log4j.logger.com.mchange.v2.c3p0.C3P0Registry=ERROR
log4j.logger.com.mchange.v2.log.MLog=ERROR
log4j.logger.com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource=ERROR
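A minimal sketch of a class whose log output is routed to Kafka by the configuration above (the package and class name match the com.demo.kafka.Log4jToKafka logger entry, so everything it logs at INFO or above goes to the kafkaTest topic via the kafka appender):
package com.demo.kafka;

import org.apache.log4j.Logger;

public class Log4jToKafka {
    private static final Logger log = Logger.getLogger(Log4jToKafka.class);

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            log.info("log4j message " + i);   // delivered asynchronously (syncSend=false)
        }
    }
}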
3. Create the producer class.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Named ProducerDemo: a class named Producer would shadow the
// org.apache.kafka.clients.producer.Producer interface used below and fail to compile.
public class ProducerDemo {
    /**
     * Initialize a logger first; without this the Kafka log4j appender
     * can throw java.lang.ExceptionInInitializerError.
     */
    private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.10.89.204:9092,10.10.89.205:9092,10.10.89.206:9092");
        properties.put("acks", "all");          // wait for all in-sync replicas
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 100; i++) {
            String msg = "Message " + i;
            producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
            System.out.println("Sent:" + msg);
        }
        producer.close();
    }
}
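Note that the loop above is fire-and-forget: with retries=0, a failed send is dropped silently. A sketch of the same send call using the standard Callback API so broker errors surface (assumes two extra imports, org.apache.kafka.clients.producer.Callback and org.apache.kafka.clients.producer.RecordMetadata):
producer.send(new ProducerRecord<String, String>("HelloWorld", msg), new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            e.printStackTrace();   // e.g. brokers unreachable or topic missing
        } else {
            System.out.println("Sent to partition " + metadata.partition() + ", offset " + metadata.offset());
        }
    }
});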
4. Create the consumer class.
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.10.89.204:9092,10.10.89.205:9092,10.10.89.206:9092");
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            try {
                Thread.sleep(2000);   // slow the loop down so the output stays readable
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            }
            System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
        }
    }
}
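To test end to end, start ConsumerDemo first and then run ProducerDemo: the consumer should print offset/value pairs for Message 0 through Message 99. Because offsets are auto-committed for group-1, a restarted consumer resumes where it left off; auto.offset.reset=earliest only applies when the group has no committed offset yet.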