美文网首页
Structured-Streaming 学习四—Produce

Structured-Streaming 学习四—Produce

作者: 九七学姐 | 来源:发表于2017-07-24 17:19 被阅读26次

    这一篇主要是介绍如何写kafka的prodecer代码
    照着我人生导师的代码照猫画虎来的~

    public class TestProducer extends Thread{
        private final KafkaProducer<String, String> producer;
        private final Boolean isAsync;
    
        public TestProducer(String topic,Boolean isAsync) {
            String[] str = topic.split(",");
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "host:port");
            properties.put("client.id","testProducer");
            properties.put("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
            properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    
            producer = new KafkaProducer<String, String>(properties);       
            this.isAsync = isAsync;
        }
    
        public void run() {
            File file = new File("/path/to/file");  // 要分词的文件所在的位置
            BufferedReader reader = null;
            try{
                reader = new BufferedReader(new FileReader(file));
                String tempString = null;
                int line=1;
                while((tempString = reader.readLine())!= null){
                    producer.send(new ProducerRecord<String, String>("topic50",line+"---"+tempString));//topic注意改成你自己的topic
                    System.out.println("Success send [" +line+ "] message..");
                    line++;
                }
                reader.close();
                System.out.println("Total send [" +line+ "] message..");
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if(reader !=null)
                {
                    try{
                        reader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            producer.close();
    
        }
    }
    
    
    /**
    *main函数
    **/
    public class Main {
        public static void main(String[] args) {
            TestProducer test = new TestProducer("topic50", false);
            test.start();
        }
    }
    

    相关文章

      网友评论

          本文标题:Structured-Streaming 学习四—Produce

          本文链接:https://www.haomeiwen.com/subject/xgwbkxtx.html