一、添加Maven依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency>
</dependencies>
二、伪代码编写(仅参考)
/**
* 生产者(发布者)
*/
import com.lin.patitioner.HashPartitioner;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
import java.util.Properties;
public class ProducerFirst {
static private final String TOPIC = "index_topic";
static private final String BROKER_LIST = "192.168.10.128:9092,192.168.10.129:9092,192.168.10.130:9092";
public static void main(String[] args) throws Exception {
Producer<String, String> producer = initProducer();
sendInfo(producer, TOPIC);
}
private static Producer<String,String> initProducer() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKER_LIST);
props.put("serializer.class", StringEncoder.class.getName());
// 采取自定义 Partitioner (测试用例采用:HashPartitioner)
props.put("partitioner.class", HashPartitioner.class.getName());
// props.put("partitioner.class", RoundRobinPartitioner.class.getName());
props.put("producer.type", "sync");
props.put("batch.num.messages", "1");
props.put("queue.buffering.max.messages", "1000000");
props.put("queue.enqueue.timeout.ms", "20000000");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
return producer;
}
private static void sendInfo(Producer<String, String> producer, String topic) throws InterruptedException {
KeyedMessage<String, String> messageOne = new KeyedMessage<String, String>(topic, "1", "test 1");
producer.send(messageOne);
KeyedMessage<String, String> messageTwo = new KeyedMessage<String, String>(topic, "2", "test 2");
producer.send(messageTwo);
KeyedMessage<String, String> messageThree = new KeyedMessage<String, String>(topic, "3", "test 3");
producer.send(messageThree);
if(producer != null){
producer.close();
}
}
}
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* 消费者(订阅者)
*/
public class HighLevelConsumer {
// 声明 zookeeper 集群配置
static private final String ZOOKEEPER = "192.168.10.128:2181,192.168.10.129:2182,192.168.10.130:2183";
// 创建并且声明主题 bin/kafka-topics.sh --create --zookeeper 192.168.10.128:2181 --replication-factor 1 --partitions 1 --topic index_topic
static private final String TOPIC = "index_topic";
public static void main(String[] args) {
args = new String[] { ZOOKEEPER, TOPIC, "topic-first", "consumer-first" };
if (args == null || args.length != 4) {
System.err.println("Usage:\n\tjava -jar kafka_consumer.jar ${zookeeper_list} ${topic_name} ${group_name} ${consumer_id}");
System.exit(1);
}
String zookeeper= args[0];
String topic = args[1];
String groupId = args[2];
String consumerId = args[3];
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("group.id", groupId);
props.put("consumer.id", consumerId);
props.put("auto.offset.reset", "largest");
// 自动提交关闭
props.put("auto.commit.enable", "false");
props.put("auto.commit.interval.ms", "60000");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> interator = stream.iterator();
while (interator.hasNext()) {
MessageAndMetadata<byte[], byte[]> messageAndMetadata = interator.next();
String message = String.format(
"Topic:%s, GroupID:%s, Consumer ID:%s, PartitionID:%s, Offset:%s, Message Key:%s, Message Payload: %s",
messageAndMetadata.topic(), groupId, consumerId, messageAndMetadata.partition(),
messageAndMetadata.offset(), new String(messageAndMetadata.key()),
new String(messageAndMetadata.message()));
System.out.println(message);
consumerConnector.commitOffsets();
}
}
}
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
/**
* HashPartitioner
*/
public class HashPartitioner implements Partitioner {
public HashPartitioner(VerifiableProperties verifiableProperties ) {
}
@Override
public int partition(Object key, int numPartitions) {
if ((key instanceof Integer)) {
return Math.abs(Integer.parseInt(key.toString())) % numPartitions;
}
return Math.abs(key.hashCode() % numPartitions);
}
}
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
import java.util.concurrent.atomic.AtomicLong;
/**
* RoundRobinPartitioner
*/
public class RoundRobinPartitioner implements Partitioner {
public RoundRobinPartitioner(VerifiableProperties verifiableProperties ) {
}
private static AtomicLong next = new AtomicLong();
@Override
public int partition(Object key, int numPartitions) {
long nextIndex = next.incrementAndGet();
return (int)nextIndex % numPartitions;
}
}
网友评论