一、Pulsar Adaptor on Kafka 适配器
Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。
在生产者中,如果想不改变原有kafka的代码架构,就切换到Pulsar的平台中,那么Pulsar adaptor on kafka就变的非常的有用了,它可以帮助我们在不改变原有kafka的代码基础上,即可接入pulsar,但是需要注意,相关配置信息需要进行一些调整,例如:地址与topic。
1.1 需要导入Pulsar兼容kafka的依赖包
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-kafka</artifactId>
<version>2.8.0</version>
</dependency>
1.2 编写生产者
public class KafkaAdaptorProducer {
public static void main(String[] args) throws Exception {
//1. 创建kafka生产者的核心类对象: KafkaProducer
//1.1: 创建生产者配置对象: 设置相关配置
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");
props.put("acks", "all"); // 消息的确认方案
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key序列化类型
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value 序列化类型Producer<String, String> producer = new KafkaProducer<>(props);
Producer<String, String> producer = new KafkaProducer<>(props);
//2. 发送数据
for (int i = 0; i < 10; i++) {
//2.1: 创建 生产者数据承载对象 一个对象代表是一条消息数据
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("persistent://public/default/txn_t1",Integer.toString(i), Integer.toString(i));
producer.send(producerRecord).get();
}
//3. 释放资源
producer.close();
}
}
1.3 编写消费者
public class KafkaAdaptorConsumer {
public static void main(String[] args) {
//1. 创建kafka的消费者的核心对象: KafkaConsumer
//1.1: 创建消费者配置对象, 并设置相关的参数:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");
props.setProperty("group.id", "test"); // 消费者组的 id
props.setProperty("enable.auto.commit", "true"); // 是否启动消费者自动提交消费偏移量
props.setProperty("auto.commit.interval.ms","1000");//每间隔多长时间提交一次偏移量:单位 毫秒
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key 反序列化
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // val 发序列化
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//2. 给消费者设置订阅topic:
consumer.subscribe(Arrays.asList("persistent://public/default/txn_t1"));
//3. 循环获取相关的消息数据
while (true) {
//3.1: 从kafka中获取消息数据: 参数表示等待超时时间
// 注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象, 一个对象就是一条消息
for (ConsumerRecord<String, String> record : records) {
String massage = record.value();
System.out.println("消息数据为:"+massage);
}
}
}
}
二、Pulsar Adaptor on Spark 适配器
Pulsar 的 Spark Streaming 接收器是一个自定义的接收器,它使用 Apache Spark Streaming 能够从Pulsar接收原始数据。
应用程序可以通过 Spark Streaming receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可以通过多种方式对其进行处理
2.1 导入相关的依赖包
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-spark</artifactId>
<version>2.8.0</version>
</dependency>
2.2 编写spark的流式代码
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.spark.SparkStreamingPulsarReceiver;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.util.HashSet;
import java.util.Set;
/**
* @Author: huangyibo
* @Date: 2022/6/6 22:40
* @Description:
*/
public class SparkStreamingAdaptor {
public static void main(String[] args) throws InterruptedException {
String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
String topic = "persistent://public/default/test_src";
String subs = "test_sub";
//1. 创建Java Spark Streaming 对象
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Adaptor");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));
//2. 设置数据源: 从Pulsar中读取数据
ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData<>();
Set<String> set = new HashSet<>();
set.add(topic);
pulsarConf.setTopicNames(set);
pulsarConf.setSubscriptionName(subs);
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(serviceUrl, pulsarConf, new AuthenticationDisabled());
JavaReceiverInputDStream<byte[]> lineStream = streamingContext.receiverStream(pulsarReceiver);
//3. 对接收到数据进行处理
JavaDStream<String> stream = lineStream.map((Function<byte[], String>) String::new);
//4. 输出操作
stream.print();
//5. 启动
streamingContext.start();
streamingContext.awaitTermination();
}
}
网友评论