Flink Basics Series 21 - Kafka Sink

Author: 只是甲 | Published 2021-10-27 15:56

    Note:
    Flink 1.9.0

    1. Sink Overview

    Flink has no foreach method like Spark's that lets users act on each record directly; all output to external systems has to go through a sink. The final output step of a job takes a form along the lines of stream.addSink(new MySink(...)).

    Flink ships with sinks for a number of frameworks out of the box; anything beyond those requires a user-defined sink.


    [image: sink connectors officially bundled with Flink]
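
    The article does not show what a user-defined sink looks like, so here is a minimal sketch (the class name MySink and the stdout target are illustrative, not from the original): a custom sink extends RichSinkFunction and is attached with addSink.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    // Hypothetical custom sink that writes each record to stdout.
    // open()/close() are where connections to the external system
    // would normally be acquired and released.
    public class MySink extends RichSinkFunction<String> {
        @Override
        public void open(Configuration parameters) throws Exception {
            // e.g. open a connection to the external system
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            // called once per record
            System.out.println(value);
        }

        @Override
        public void close() throws Exception {
            // e.g. release the connection
        }
    }

    // usage: dataStream.addSink(new MySink());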

    2. Kafka Sink

    2.1 Writing Text-File Data to Kafka

    sensor.txt

    sensor_1,1547718199,35.8
    sensor_6,1547718201,15.4
    sensor_7,1547718202,6.7
    sensor_10,1547718205,38.1
    sensor_1,1547718207,36.3
    sensor_1,1547718209,32.8
    sensor_1,1547718212,37.1
    

    Code:

    package org.zqs.kafka;
    
    import java.io.*;
    
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    public class Producer2 {
        public static String topic = "sensor4";  // target topic
    
        public static void main(String[] args) throws IOException {
    
            Properties p = new Properties();
            // Kafka broker addresses; separate multiple addresses with commas
            p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.31.1.124:9092,10.31.1.125:9092,10.31.1.126:9092");
            p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
    
            try {
                // read the file line by line so that each sensor record
                // becomes exactly one Kafka message
                String filename = "C:\\Users\\Administrator\\IdeaProjects\\SparkStudy\\src\\main\\resources\\sensor.txt";
                BufferedReader reader = new BufferedReader(new FileReader(filename));
    
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                    ProducerRecord<String, String> record = new ProducerRecord<>(topic, line);
                    kafkaProducer.send(record);
                }
                reader.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                kafkaProducer.close();
            }
        }
    }
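
    Each line of sensor.txt becomes exactly one Kafka message. That matters downstream: the Flink job in section 2.2 splits every incoming message on commas and expects exactly three fields, so a message spanning several lines or a partial line would fail to parse.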
    
    

    Test run:


    [image: producer console output]

    2.2 Preparing the Java Code

    Code:

    package org.flink.sink;
    
    import org.flink.beans.SensorReading;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    
    import java.util.Properties;
    
    /**
     * @author  只是甲
     * @date    2021-09-13
     * @remark  Kafka Sink
     */
    public class SinkTest1_Kafka {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // Alternatively, read from a file:
            // DataStream<String> inputStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkStudy\\src\\main\\resources\\sensor.txt");
    
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.31.1.124:9092,10.31.1.125:9092,10.31.1.126:9092");
            properties.setProperty("group.id", "consumer-group");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("auto.offset.reset", "latest");
    
            // source: read from the Kafka topic "sensor4"
            DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer<String>("sensor4", new SimpleStringSchema(), properties));
    
            // parse each line into a SensorReading, then back to a String for the sink
            DataStream<String> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2])).toString();
            });
    
            // sink: write the transformed records to the Kafka topic "sinktest"
            dataStream.addSink(new FlinkKafkaProducer<String>("10.31.1.124:9092,10.31.1.125:9092,10.31.1.126:9092", "sinktest", new SimpleStringSchema()));
    
            env.execute();
        }
    }
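
    The SensorReading bean is not shown in the article; here is a minimal sketch consistent with how it is used above (the field names are assumptions):

    package org.flink.beans;

    // Hypothetical POJO matching the (String, Long, Double) constructor
    // used in SinkTest1_Kafka; field names are assumptions.
    public class SensorReading {
        private String id;           // sensor id, e.g. "sensor_1"
        private Long timestamp;      // epoch seconds
        private Double temperature;  // temperature reading

        public SensorReading() {}

        public SensorReading(String id, Long timestamp, Double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return "SensorReading{" +
                    "id='" + id + '\'' +
                    ", timestamp=" + timestamp +
                    ", temperature=" + temperature +
                    '}';
        }
    }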
    

    Run the job first, then start a producer and watch the output.

    2.3 Starting a Producer

    In a real environment the data is not a static offline file; records arrive and are processed one at a time. So here we start a Kafka console producer on the job's source topic, sensor4, and key in some data by hand:

    /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/kafka-console-producer.sh --broker-list 10.31.1.124:9092 --topic sensor4
    

    Input:


    [image: console producer input]
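
    The records typed into the console producer follow the same comma-separated format as sensor.txt, for example:

    sensor_1,1547718199,35.8
    sensor_6,1547718201,15.4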

    2.4 Checking the Kafka Output

    The Flink job writes its results to the topic sinktest, so that is the topic we consume:

    /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/kafka-console-consumer.sh --from-beginning --bootstrap-server 10.31.1.124:9092 --topic sinktest
    
    [image: console consumer output]
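
    Assuming the SensorReading.toString() sketched in section 2.2, each record read back from the sink topic would look roughly like:

    SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}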

