package wm.helper.util;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.*;
import java.util.Properties;
/**
 * One-shot utility that reads a gzipped DTC report file from local disk and
 * publishes its raw bytes as a single message to the {@code dc-diagnostic-report}
 * Kafka topic, keyed by a VIN string.
 */
public class WmKafkaProducer {
    /**
     * Entry point: builds the producer configuration, reads the payload file,
     * sends one record, and closes the producer.
     *
     * @param args unused
     * @throws IOException if the broker connection setup surfaces an I/O error
     *                     (file-read errors are handled inside {@link #getBytes})
     */
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");            // wait for the full in-sync replica set to acknowledge
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        String topic = "dc-diagnostic-report";
        // Payload is the raw (still-compressed) .gz file; may be null if the read failed.
        byte[] payload = getBytes("/Users/ohmycloud/work/cihon/sxw/wm-telematics/data/dtc.gz");

        // try-with-resources guarantees the producer is flushed and closed even if
        // send() throws; the original leaked the producer on any exception.
        try (Producer<String, byte[]> producer =
                 new org.apache.kafka.clients.producer.KafkaProducer<String, byte[]>(props)) {
            ProducerRecord<String, byte[]> record =
                new ProducerRecord<String, byte[]>(topic, "LL2274082JW100128", payload);
            producer.send(record);
        }
        System.out.println("消息发送完成。");
    }

    /**
     * Reads the entire contents of the file at {@code filePath} into a byte array.
     *
     * @param filePath path of the file to read
     * @return the file's bytes, or {@code null} if the file cannot be opened or
     *         read (the error is printed to stderr; this preserves the original
     *         best-effort contract instead of throwing)
     */
    public static byte[] getBytes(String filePath) {
        // try-with-resources closes both streams even when read() throws; the
        // original only closed them on the success path and leaked otherwise.
        try (FileInputStream fis = new FileInputStream(new File(filePath));
             ByteArrayOutputStream bos = new ByteArrayOutputStream(1000)) {
            byte[] chunk = new byte[1000];
            int n;
            while ((n = fis.read(chunk)) != -1) {
                bos.write(chunk, 0, n);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            // FileNotFoundException is a subclass of IOException, so one handler
            // replaces the original's two identical catch blocks.
            e.printStackTrace();
            return null;
        }
    }
}