美文网首页
Kafka java api-生产者代码

Kafka java api-生产者代码

作者: 数据萌新 | 来源:发表于2018-10-10 16:45 被阅读0次

    前面做过命令行让生产者发送消息,现在使用java api来进行消息的生产,以及解释kafka高性能是如何实现(来源于学习资料)。
    使用shell创建topic和发送消息如下:

    #参数:zookeeper连接地址和端口号,副本数(包括自身),使用几个partition,topic的名称
    [root@mini1 bin]# ./kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 2 --partitions 3 --topic orderMq
    Created topic "orderMq".
    [root@mini1 bin]# kafka-console-producer.sh --broker-list mini1:9092 --topic orderMq
    hello tom
    hi jerry
    spring 
    hhaah
    xixi
    nini
    

    下面使用java api来发送消息
    注:如果topic已经存在那么肯定就不创建了,但是不存在则会创建。

    <dependencies>
           <dependency>
               <groupId>org.apache.kafka</groupId>
               <artifactId>kafka_2.8.2</artifactId>
               <version>0.8.1</version>
           </dependency>
       </dependencies>
    
    public class KafkaProducerSimple {
        public static void main(String[] args) {
            /**
             * 1、指定当前kafka producer生产的数据的目的地
             *  创建topic可以输入以下命令,在kafka集群的任一节点进行创建。
             *  bin/kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 2 --partitions 3 --topic test
             */
            String TOPIC = "orderMq";
            /**
             * 2、读取配置文件
             */
            Properties props = new Properties();
            /*
             * key.serializer.class默认为serializer.class  key的序列化使用哪个类
             */
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            /*
             * kafka broker对应的主机,格式为host1:port1,host2:port2
             */
            props.put("metadata.broker.list", "mini1:9092,mini2:9092,mini3:9092");
            /*
             * request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1
             * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。
             * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
             * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。
             * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。
             * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。
             * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。
             * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失
             */
            props.put("request.required.acks", "1");
            /*
             * 可选配置,如果不配置,则使用默认的partitioner partitioner.class
             * 默认值:kafka.producer.DefaultPartitioner
             * 用来把消息分到各个partition中,默认行为是对key进行hash。
             */
            props.put("partitioner.class", "com.scu.kafka.MyLogPartitioner");
    //        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
            /**
             * 3、通过配置文件,创建生产者
             */
            Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
            /**
             * 4、通过for循环生产数据
             */
            for (int messageNo = 1; messageNo < 100000; messageNo++) {
                /**
                 * 5、调用producer的send方法发送数据
                 * 注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发
                 */
                producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "itcast"));
            }
        }
    }
    
    public class MyLogPartitioner implements Partitioner {
        private static Logger logger = Logger.getLogger(MyLogPartitioner.class);
    
        public MyLogPartitioner(VerifiableProperties props) {
        }
    
        /**
         *
         * @param obj 传来的key 用它来进行hash分到partition
         * @param numPartitions 几个partition 如果集群中已存在该topic,那么partition数为原本存在数,否则默认是2
         * @return 生产到哪个partition
         */
        public int partition(Object obj, int numPartitions) {
     //使用下面被注释掉的代码,则类似于hadoop的partition分发方式,hash取模去发到对应序号的partition,这里使用1则表示发送到orderMQ-1的topic
    //        return Integer.parseInt(obj.toString())%numPartitions;
            return 1;
        }
    
    }
    

    启动kafka集群,执行main方法,去集群中查看。

    [root@mini1 orderMQ-1]# ll
    总用量 14296
    -rw-r--r--. 1 root root 10485760 11月 22 07:53 00000000000000000000.index
    -rw-r--r--. 1 root root 14610099 11月 22 07:53 00000000000000000000.log
    [root@mini1 orderMQ-1]# ll
    总用量 14696
    -rw-r--r--. 1 root root 10485760 11月 22 07:53 00000000000000000000.index
    -rw-r--r--. 1 root root 15012813 11月 22 07:53 00000000000000000000.log
    [root@mini1 orderMQ-1]# ll
    总用量 15184
    -rw-r--r--. 1 root root 10485760 11月 22 07:53 00000000000000000000.index
    -rw-r--r--. 1 root root 15513339 11月 22 07:53 00000000000000000000.log
    [root@mini1 orderMQ-1]# ll
    总用量 15448
    -rw-r--r--. 1 root root 10485760 11月 22 07:53 00000000000000000000.index
    -rw-r--r--. 1 root root 15783297 11月 22 07:53 00000000000000000000.log
    [root@mini1 orderMQ-1]# ll
    总用量 16288
    -rw-r--r--. 1 root root 10485760 11月 22 07:53 00000000000000000000.index
    -rw-r--r--. 1 root root 16643559 11月 22 07:53 00000000000000000000.log
    [root@mini1 orderMQ-1]# ll
    总用量 16600
    -rw-r--r--. 1 root root 10485760 11月 22 07:53 00000000000000000000.index
    -rw-r--r--. 1 root root 16961019 11月 22 07:53 00000000000000000000.log
    

    看到消息在不断增加。

    相关文章

      网友评论

          本文标题:Kafka java api-生产者代码

          本文链接:https://www.haomeiwen.com/subject/ogfmaftx.html