美文网首页
Kafka Producer 之发送字节数组 - Java 版

Kafka Producer 之发送字节数组 - Java 版

作者: 焉知非鱼 | 来源:发表于2018-08-19 11:36 被阅读623次
package wm.helper.util;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.*;
import java.util.Properties;

/**
 * Minimal Kafka producer example that publishes the raw bytes of a local file
 * as the value of a single record (String key, byte[] value).
 */
public class WmKafkaProducer {
    /**
     * Reads a fixed file from disk and sends its bytes to the
     * {@code dc-diagnostic-report} topic, waiting for the send to be acknowledged.
     *
     * @param args unused
     * @throws IOException if the payload file cannot be read or the send fails
     * @throws InterruptedException if interrupted while waiting for the acknowledgement
     */
    public static void main(String[] args) throws IOException, InterruptedException {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");       // key serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");  // value serializer

        String topic = "dc-diagnostic-report";
        String payloadPath = "/Users/ohmycloud/work/cihon/sxw/wm-telematics/data/dtc.gz";

        // Load the payload before opening the producer so that a missing or
        // unreadable file never results in a null-valued record being published.
        byte[] payload = getBytes(payloadPath);
        if (payload == null) {
            throw new IOException("Unable to read payload file: " + payloadPath);
        }

        ProducerRecord<String, byte[]> record =
                new ProducerRecord<String, byte[]>(topic, "LL2274082JW100128", payload);

        // try-with-resources closes the producer even when the send fails.
        try (Producer<String, byte[]> producer =
                     new org.apache.kafka.clients.producer.KafkaProducer<String, byte[]>(props)) {
            // send() is asynchronous; block on the returned future so that a
            // failure is reported instead of being silently dropped.
            producer.send(record).get();
        } catch (java.util.concurrent.ExecutionException e) {
            throw new IOException("Failed to send record to topic " + topic, e.getCause());
        }
        System.out.println("消息发送完成。");
    }

    /**
     * Reads the entire content of the given file into a byte array.
     *
     * @param filePath path of the file to read
     * @return the file content, or {@code null} if the file does not exist or
     *         cannot be read (the stack trace is printed in that case)
     */
    public static byte[] getBytes(String filePath){
        // try-with-resources guarantees both streams are closed, including
        // when read() throws part-way through the file.
        try (FileInputStream in = new FileInputStream(new File(filePath));
             ByteArrayOutputStream out = new ByteArrayOutputStream(1000)) {
            byte[] chunk = new byte[1000];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            // FileNotFoundException is an IOException, so this covers both
            // the missing-file and the read-error case.
            e.printStackTrace();
            return null;
        }
    }
}

相关文章

网友评论

      本文标题:Kafka Producer 之发送字节数组 - Java 版

      本文链接:https://www.haomeiwen.com/subject/ymkciftx.html