美文网首页
Apache Pulsar——Adaptor适配器

Apache Pulsar——Adaptor适配器

作者: 小波同学 | 来源:发表于2022-06-07 00:43 被阅读0次

一、Pulsar Adaptor on Kafka 适配器

Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。

在生产者中,如果想不改变原有kafka的代码架构,就切换到Pulsar的平台中,那么Pulsar adaptor on kafka就变的非常的有用了,它可以帮助我们在不改变原有kafka的代码基础上,即可接入pulsar,但是需要注意,相关配置信息需要进行一些调整,例如:地址与topic。

1.1 需要导入Pulsar兼容kafka的依赖包

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-kafka</artifactId>
    <version>2.8.0</version>
</dependency>

1.2 编写生产者

public class KafkaAdaptorProducer {
    
    public static void main(String[] args) throws Exception {
        //1. 创建kafka生产者的核心类对象: KafkaProducer
        //1.1: 创建生产者配置对象: 设置相关配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");
        props.put("acks", "all"); // 消息的确认方案
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key序列化类型
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value 序列化类型Producer<String, String> producer = new KafkaProducer<>(props);
        Producer<String, String> producer = new KafkaProducer<>(props);
        //2. 发送数据
        for (int i = 0; i < 10; i++) {
            //2.1: 创建 生产者数据承载对象 一个对象代表是一条消息数据
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("persistent://public/default/txn_t1",Integer.toString(i), Integer.toString(i));
            producer.send(producerRecord).get();
        }
        //3. 释放资源
        producer.close();
    }
}

1.3 编写消费者

public class KafkaAdaptorConsumer {
    
    public static void main(String[] args) {
        //1. 创建kafka的消费者的核心对象: KafkaConsumer
        //1.1: 创建消费者配置对象, 并设置相关的参数:
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");
        props.setProperty("group.id", "test"); // 消费者组的 id
        props.setProperty("enable.auto.commit", "true"); // 是否启动消费者自动提交消费偏移量
        props.setProperty("auto.commit.interval.ms","1000");//每间隔多长时间提交一次偏移量:单位 毫秒
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key 反序列化
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // val 发序列化
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //2. 给消费者设置订阅topic:
        consumer.subscribe(Arrays.asList("persistent://public/default/txn_t1"));

        //3. 循环获取相关的消息数据
        while (true) {
            //3.1: 从kafka中获取消息数据: 参数表示等待超时时间
            // 注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // 3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象, 一个对象就是一条消息
            for (ConsumerRecord<String, String> record : records) {
                String massage = record.value();
                System.out.println("消息数据为:"+massage);
            }
        }
    }
}

二、Pulsar Adaptor on Spark 适配器

Pulsar 的 Spark Streaming 接收器是一个自定义的接收器,它使用 Apache Spark Streaming 能够从Pulsar接收原始数据。

应用程序可以通过 Spark Streaming receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可以通过多种方式对其进行处理

2.1 导入相关的依赖包

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-spark</artifactId>
    <version>2.8.0</version>
</dependency>

2.2 编写spark的流式代码

import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.spark.SparkStreamingPulsarReceiver;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import java.util.HashSet;
import java.util.Set;

/**
 * @Author: huangyibo
 * @Date: 2022/6/6 22:40
 * @Description:
 */

public class SparkStreamingAdaptor {

    public static void main(String[] args) throws InterruptedException {
        String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
        String topic = "persistent://public/default/test_src";
        String subs = "test_sub";
        //1. 创建Java Spark Streaming 对象
        SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Adaptor");
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        //2. 设置数据源: 从Pulsar中读取数据
        ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData<>();
        Set<String> set = new HashSet<>();
        set.add(topic);
        pulsarConf.setTopicNames(set);
        pulsarConf.setSubscriptionName(subs);
        SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(serviceUrl, pulsarConf, new AuthenticationDisabled());

        JavaReceiverInputDStream<byte[]> lineStream = streamingContext.receiverStream(pulsarReceiver);

        //3. 对接收到数据进行处理
        JavaDStream<String> stream = lineStream.map((Function<byte[], String>) String::new);
        //4. 输出操作
        stream.print();

        //5. 启动
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}

相关文章

网友评论

      本文标题:Apache Pulsar——Adaptor适配器

      本文链接:https://www.haomeiwen.com/subject/jmhsmrtx.html