Using Kafka from Java Code

Author: 凌冰_lonny | Published 2017-12-27 19:07

    The Kafka client: creating a consumer

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerConnector;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;

    public class KafkaClient {

        public static ConsumerConnector createConsumer() {
            Properties props = new Properties();
            props.put("zookeeper.connect", "ip:port");
            props.put("group.id", "group-1");
            // Start from the earliest available offset when the group has no committed offset
            props.put("auto.offset.reset", "smallest");
            ConsumerConfig config = new ConsumerConfig(props);
            return Consumer.createJavaConsumerConnector(config);
        }

        public static class ConsumerTest extends Thread {
            private final String topic;
            private final ConsumerConnector consumer;

            public ConsumerTest(String topic, ConsumerConnector consumer) {
                this.consumer = consumer;
                this.topic = topic;
            }

            @Override
            public void run() {
                // Ask for a single stream (one consumer thread) for this topic
                Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put(topic, 1);

                Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
                KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0); // the stream that receives the data

                // hasNext() blocks until a message arrives or the connector is shut down
                ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
                while (iterator.hasNext()) {
                    String message = new String(iterator.next().message());
                    System.out.println(message);
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            ConsumerConnector c = KafkaClient.createConsumer();
            ConsumerTest t = new ConsumerTest("topic_name", c);
            t.start();
            // Consume for ten minutes, then shut down; shutdown() unblocks the iterator
            Thread.sleep(1000 * 60 * 10);
            c.shutdown();
            t.join();
        }
    }
    
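    The `kafka.consumer` API used above is the legacy ZooKeeper-based high-level consumer; it was superseded by the new consumer introduced in Kafka 0.9 and removed entirely in Kafka 2.0. For reference, here is a minimal sketch of the same loop with `org.apache.kafka.clients.consumer.KafkaConsumer` (the broker address, group id, and topic name are the same placeholders as above):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class NewConsumerClient {

        public static void main(String[] args) {
            Properties props = new Properties();
            // The new consumer talks to the brokers directly rather than to ZooKeeper
            props.put("bootstrap.servers", "ip:port");
            props.put("group.id", "group-1");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // "earliest" is the new spelling of the old "smallest"
            props.put("auto.offset.reset", "earliest");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            try {
                consumer.subscribe(Collections.singletonList("topic_name"));
                while (true) {
                    // poll(Duration) requires a 2.0+ client; older clients use the long-millis overload
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }
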

    Kafka data collection: consuming messages from a topic and pushing them into a Redis list. `JsonFromKafkaData` and `SendMessage` are project-local helper classes.

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.log4j.Logger;

    import com.google.gson.Gson;
    import com.google.gson.reflect.TypeToken;

    import kafka.consumer.ConsumerConnector;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;

    import redis.clients.jedis.Jedis;

    public class KafkaDataCollect {
        static Logger LOG = Logger.getLogger(KafkaDataCollect.class);
        ConsumerConnector consumer = KafkaClient.createConsumer();
        // Baseline timestamp (midnight of the current day), used to track the daily intake volume
        private long ruler = 0;

        KafkaDataCollect() {
            // Initialize the baseline timestamp
            ruler = getRuler();
        }

        // Returns the epoch millis of today at 00:00:00
        public long getRuler() {
            long current = System.currentTimeMillis();
            String date = new SimpleDateFormat("dd/MM/yyyy").format(new Date(current));
            date = date + " 00:00:00";
            long rulertime = 0;
            try {
                rulertime = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss").parse(date).getTime();
            } catch (ParseException e) {
                LOG.error("Failed to parse the baseline date.", e);
            }
            return rulertime;
        }

        public void dataStreamIn(String topic) {
            // Ask for a single stream for this topic
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1);

            Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0); // the stream that receives the data
            int articleCount = 0;
            ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
            Jedis jedis = new Jedis("ip", 6379); // Redis host placeholder
            Gson gson = new Gson();
            String msgStr = "";
            try {
                while (iterator.hasNext()) {
                    // Once a full day has passed, log the day's article count and roll the baseline forward
                    if ((System.currentTimeMillis() - ruler) >= 24 * 60 * 60 * 1000) {
                        String date = new SimpleDateFormat("dd/MM/yyyy").format(new Date(ruler));
                        msgStr = date + " Kafka data stream collected " + articleCount + " articles.";
                        LOG.info(msgStr);
                        articleCount = 0;
                        ruler = ruler + 24 * 60 * 60 * 1000;
                    }
                    // Send the pending daily summary by SMS once the new day is at least eight hours old
                    if (!msgStr.isEmpty() && (System.currentTimeMillis() - ruler) >= 8 * 60 * 60 * 1000) {
                        SendMessage.send("178xxxxxxxx", msgStr, null, null, null);
                        msgStr = "";
                    }
                    String j = new String(iterator.next().message());
                    JsonFromKafkaData jsonD = gson.fromJson(j, new TypeToken<JsonFromKafkaData>() {
                    }.getType());
                    try {
                        LOG.info(j);
                        // Normalize bare ids to the "clusterId_xxx" form
                        if (jsonD.getSimId() != null && !jsonD.getSimId().contains("_"))
                            jsonD.setSimId("clusterId_" + jsonD.getSimId());
                        // Push the normalized message onto a Redis list for downstream processing
                        jedis.lpush("kafka-queue", gson.toJson(jsonD));
                        articleCount++;
                    } catch (Exception e) {
                        LOG.error("Failed to push new data onto the queue.", e);
                    }
                }
            } catch (Exception e) {
                SendMessage.send("178xxxxxxxx", "Kafka data collection process stopped.", null, null, null);
                LOG.error("Kafka data collection process stopped.", e);
            } finally {
                jedis.close();
            }
        }

        public static void main(String[] args) {
            KafkaDataCollect kafkaStream = new KafkaDataCollect();
            kafkaStream.dataStreamIn("topic_name");
        }
    }
    
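    A side note on `getRuler()`: it computes today's midnight by formatting the current time as a string and parsing it back. On Java 8+ the same value can be obtained directly with `java.time`; a minimal equivalent sketch:

    import java.time.LocalDate;
    import java.time.ZoneId;

    public class MidnightRuler {
        // Epoch millis of today at 00:00:00 in the system default time zone
        public static long getRuler() {
            return LocalDate.now()
                    .atStartOfDay(ZoneId.systemDefault())
                    .toInstant()
                    .toEpochMilli();
        }
    }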

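    The collector above only pushes JSON onto the `kafka-queue` Redis list; the consumer on the other side of that list is not part of this post. A hypothetical sketch of such a downstream worker, assuming the same Redis host, draining the list with a blocking pop:

    import java.util.List;

    import redis.clients.jedis.Jedis;

    public class QueueWorker {
        public static void main(String[] args) {
            Jedis jedis = new Jedis("ip", 6379); // same Redis host placeholder as above
            try {
                while (true) {
                    // BRPOP blocks for up to 5 seconds and returns [key, value], or null on timeout
                    List<String> item = jedis.brpop(5, "kafka-queue");
                    if (item != null) {
                        String json = item.get(1);
                        System.out.println(json); // hand the article JSON to real processing here
                    }
                }
            } finally {
                jedis.close();
            }
        }
    }

    Pairing LPUSH on the producer side with BRPOP on the worker side consumes the list in FIFO order.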