美文网首页
《日子》.分布式-Kafka集群安装

《日子》.分布式-Kafka集群安装

作者: 战神汤姆 | 来源:发表于2015-03-13 17:01 被阅读0次

    1、zookeeper集群安装

      参考《日子》.分布式之开篇-Zookeeper集群安装

      zookeeper 三台

           192.168.0.70

           192.168.0.71

           192.168.0.72

    2、kafka集群安装

       部署机器三台:

        192.168.0.73

        192.168.0.74

        192.168.0.75

        java 环境安装:

          jdk包 jdk-7u51-linux-x64.rpm 上传至目录 /cluster/install/jdk-7u51-linux-x64.rpm

          [root@localhost #] rpm -ivh /cluster/install/jdk-7u51-linux-x64.rpm

       准备开发包:kafka_2.10-0.8.2.0.tgz 上传至目录 /cluster/install/kafka_2.10-0.8.2.0.tgz

         解压缩包:[root@localhost #] tar zxvf /cluster/install/kafka_2.10-0.8.2.0.tgz

         复制到cluster目录下[root@localhost #] cp /cluster/install/kafka_2.10-0.8.2.0  /cluster

         修改server.properties文件

      [root@localhost #] vi /cluster/kafka_2.10-0.8.2.0/config/server.properties

        broker.id=73 --分别对应每个机器IP末尾数

      host.name=192.168.0.73
        zookeeper.connect=192.168.0.70:2181,192.168.0.71:2181,192.168.0.72:2181

      复制到其他机器[root@localhost #] scp  -r /cluster root192.168.0.74:/

                              [root@localhost #] scp  -r /cluster root192.168.0.75:/

       更改相应的broker.id 、host.name

      启动:  [root@localhost bin#]./kafka-server-start.sh /cluster/kafka_2.10-0.8.2.0/config/server.properties

       忍不住来张图,刚启动时74是leader,尝试了下down掉74,现在转为73了

    192.168.0.73,192.168.0.74,192.168.0.75,开启端口 9092

    [root@localhost #]/sbin/iptables -I INPUT -p tcp --dport 9092 -j ACCEPT

    [root@localhost #]/etc/rc.d/init.d/iptables save #将更改进行保存

    [root@localhost #]/etc/init.d/iptables restart #重启防火墙以便改动生效

    3、代码示例

       最关键的部分,所有程序员最心切的期待开始

    属性配制:

    public interface KafkaProperties {

       final static String zkConnect = "192.168.0.70:2181,192.168.0.71:2181,192.168.0.72:2181";

       final static String groupId = "group1";

       final static String topic = "topic1";

       final static String kafkaServerURL = "192.168.0.73,192.168.0.74,192.168.0.75";

       final static int kafkaServerPort = 9092;

       final static int kafkaProducerBufferSize = 64 * 1024;

       final static int connectionTimeOut = 20000;

       final static int reconnectInterval = 10000;

       final static String topic2 = "topic2";

       final static String topic3 = "topic3";

       final static String clientId = "SimpleConsumerDemoClient";

    }

    消息生产者

    KafkaProducer

    import java.util.Properties;

    import kafka.producer.KeyedMessage;

    import kafka.producer.ProducerConfig;

    public class KafkaProducer extends Thread

    {

       private final kafka.javaapi.producer.Producerproducer;

       private final String topic;

       private final Properties props = new Properties();

       public KafkaProducer(String topic)

       {

        props.put("serializer.class", "kafka.serializer.StringEncoder");

        props.put("metadata.broker.list","192.168.0.73:9092,192.168.0.74:9092,192.168.0.75:9092");

        producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props));

        this.topic = topic;

       }

       @Override

       public void run() {

        int messageNo = 1;

        Long start=new java.util.Date().getTime();

        while (true)

        {

         String messageStr = new String("Message_" + messageNo);

         System.out.println("Send:" + messageStr);

         producer.send(new KeyedMessage(topic,messageStr));

         messageNo++;

         try {

          sleep(3000);

          } catch (InterruptedException e) {

           e.printStackTrace();

          }

       }

      }

    }

    消息消费者

    import java.util.HashMap;

    import java.util.List;

    import java.util.Map;

    import java.util.Properties;

    import kafka.consumer.ConsumerConfig;

    import kafka.consumer.ConsumerIterator;

    import kafka.consumer.KafkaStream;

    import

    kafka.javaapi.consumer.ConsumerConnector;

    /**

    *@author leicui bourne_cui@163.com

    */

    public class KafkaConsumer extends Thread

    {

        private final ConsumerConnector consumer;

        private final String topic;

        public KafkaConsumer(String topic)

        {

          consumer = kafka.consumer.Consumer.createJavaConsumerConnector(

          createConsumerConfig());

          this.topic = topic;

        }

        private static ConsumerConfig createConsumerConfig()

        {

          Properties props = new Properties();

          props.put("zookeeper.connect", KafkaProperties.zkConnect);

          props.put("group.id", KafkaProperties.groupId);

          props.put("zookeeper.session.timeout.ms", "40000");

          props.put("zookeeper.sync.time.ms", "200");

          props.put("auto.commit.interval.ms", "1000");

          return new ConsumerConfig(props);

         }

         @Override

         public void run() {

          Map topicCountMap = new HashMap();

          topicCountMap.put(topic, new Integer(1));

          Map<String,List<KafkaSteam<byte[],byte[]>>> consumerMap =                  consumer.createMessageStreams(topicCountMap);

                    KafkaStream<byte[],byte[]> stream =consumerMap.get(topic).get(0);

                    ConsumerIterator<byte[],byte[]> it = stream.iterator();

                    while (it.hasNext()) {

                    System.out.println("receive:" + new

                    String(it.next().message()));

                   try {

                     sleep(3000);

                     } catch (InterruptedException e) {

                       e.printStackTrace();

                      }

                }

           }

    }

    运行入口:

    public class KafkaConsumerProducerDemo {

       public static void main(String[] args)

       {

           KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);

            producerThread.start();

             KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);

           consumerThread.start();

        }

    }

    相关文章

      网友评论

          本文标题:《日子》.分布式-Kafka集群安装

          本文链接:https://www.haomeiwen.com/subject/ntsbxttx.html