美文网首页
flink学习(2)flink Streaming从kafka接

flink学习(2)flink Streaming从kafka接

作者: 啊其11 | 来源:发表于2019-05-10 15:21 被阅读0次

    今天测试今天尝试了flink从kafka获取数据的测试程序编写,主要测试的kafka发送json的接收例子,尝试了几个kafka的DeserializationSchema(反序列化模式),包括了SimpleStringSchema,JSONKeyValueDeserializationSchema以及自定义DeserializationSchema.代码通过Flink计算引擎从Kafka相应的Topic中读取数据,通过FlinkKafkaConsumer010来实现.

    1.SimpleStringSchema

    官网上有SimpleStringSchema的示例,它可以构建DataStream[String],返回的就是kafka生产者发过来的信息。

    以下是代码:

    package whTest
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import java.util.Properties
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.common.state.StateTtlConfig.TimeCharacteristic
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
    import org.apache.flink.streaming.api.scala._
    
    object Fromkafka {
      case class Person (name:String,sex:String,age:Int)
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        //开启checkPoint, Time interval between state checkpoints 5000 milliseconds.
        /**
          * 如果我们启用了Flink的Checkpint机制,
          * 那么Flink Kafka Consumer将会从指定的Topic中消费消息,
          * 然后定期地将Kafka offsets信息、状态信息以及其他的操作信息进行Checkpint。
          * 所以,如果Flink作业出故障了,Flink将会从最新的Checkpint中恢复,
          * 并且从上一次偏移量开始读取Kafka中消费消息。
          */
        env.enableCheckpointing(5000)
        import org.apache.flink.streaming.api.TimeCharacteristic
        //设置系统基本时间特性为事件时间
       // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
       //kafka连接配置信息
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "localhost:9092")
        properties.setProperty("zookeeper.connect", "localhost:2181")
        properties.setProperty("group.id", "test")
        val kafkaStream = env
          .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
          .print()
        
        // execute program
        env.execute("kafkaTest")
      }
    }
    
    

    测试结果:

    {"ID_Link":"11111","CarNum":100,"speed":10.0}//即为生产者发送的信息
    

    如果我们需要将消息进行封装,DataStream[String]->DataStream[MyType],可以在DataStream[String]后追加map函数进行转换,当然也可以使用下文的自定义DeserializationSchema。

    2. JSONKeyValueDeserializationSchema

    JSONKeyValueDeserializationSchema可以将序列化的JSON转换为ObjectNode对象,可以用objectNode.get("field")访问字段。新建JSONKeyValueDeserializationSchema需要带一个boolean类型参数,为true表示需要指明是否需要包含“元数据”、偏移量、分区和主题等信息,为false表明只需要数据。
    以下是代码和结果:

    package whTest
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import java.util.Properties
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.common.state.StateTtlConfig.TimeCharacteristic
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
    import org.apache.flink.streaming.api.scala._
    
    object Fromkafka {
      case class Person (name:String,sex:String,age:Int)
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        //开启checkPoint, Time interval between state checkpoints 5000 milliseconds.
        /**
          * 如果我们启用了Flink的Checkpint机制,
          * 那么Flink Kafka Consumer将会从指定的Topic中消费消息,
          * 然后定期地将Kafka offsets信息、状态信息以及其他的操作信息进行Checkpint。
          * 所以,如果Flink作业出故障了,Flink将会从最新的Checkpint中恢复,
          * 并且从上一次偏移量开始读取Kafka中消费消息。
          */
        env.enableCheckpointing(5000)
        import org.apache.flink.streaming.api.TimeCharacteristic
        //设置系统基本时间特性为事件时间
       // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "localhost:9092")
        // only required for Kafka 0.8
        properties.setProperty("zookeeper.connect", "localhost:2181")
        properties.setProperty("group.id", "test")
        val kafkaStream = env
          .addSource(new FlinkKafkaConsumer010("test", new JSONKeyValueDeserializationSchema(true), properties))
          .print()
      
        // execute program
        env.execute("kafkaTest")
      }
    }
    
    

    结果:

      // new JSONKeyValueDeserializationSchema(true)   send json :{"name":"limei","age":12,"sex":"f"}        get : {"value":{"name":"limei","age":12,"sex":"f"},"metadata":{"offset":10,"topic":"test","partition":0}}
        //  new JSONKeyValueDeserializationSchema(false)   send json :{"name":"limei","age":12,"sex":"f"}        get :{"value":{"name":"limei","age":12,"sex":"f"}}
    

    3.自定义DeserializationSchema

    自定义DeserializationSchema需要实现DeserializationSchema接口,这一部分代码可以参考官方代码org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer。
    我需要实现的是将从kafka获取到的json数据转化为我需要的自定义pojo类(VideoData)。
    主要是要实现DeserializationSchema方法的deserialize方法,这个方法的输入是byte[] message类型,我们需要将其转换为String类型,然后通过Json工具类解析成POJO类。这里我使用的是google的Gson框架。

    以下是DeserializationSchema类和POJO类代码

    package whTest;
    
    import com.google.gson.Gson;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.nio.charset.Charset;
    import java.nio.CharBuffer;
    import java.nio.charset.CharsetDecoder;
    
    public class VideoDataDeSerializer implements DeserializationSchema<VideoData> {
        private static final long serialVersionUID = 1L;
        @Override
        public VideoData deserialize(byte[] message) throws IOException {
            ByteBuffer buffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN);
                 
            String mess = this.byteBuffertoString(buffer);
                    //封装为POJO类
            Gson gson = new Gson();
            VideoData data = gson.fromJson(mess, VideoData.class);
            return data;
        }
    
        @Override
        public boolean isEndOfStream(VideoData nextElement) {
            return false;
        }
    
        @Override
        public TypeInformation<VideoData> getProducedType() {
            return null;
        }
    
        /**
         * 将ByteBuffer类型转换为String类型
         * @param buffer
         * @return
         */
        public static String byteBuffertoString(ByteBuffer buffer)
        {
            Charset charset = null;
            CharsetDecoder decoder = null;
            CharBuffer charBuffer = null;
            try
            {
                charset = Charset.forName("UTF-8");
                decoder = charset.newDecoder();
                // charBuffer = decoder.decode(buffer);//用这个的话,只能输出来一次结果,第二次显示为空
                charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
                return charBuffer.toString();
            }
            catch (Exception ex)
            {
                ex.printStackTrace();
                return "";
            }
        }
    }
    
    

    POJO类:

    package whTest;
    
    public class VideoData {
        public VideoData(String ID_Link,int CarNum,float speed){
            this.ID_Link =ID_Link;
            this.CarNum = CarNum;
            this.speed = speed;
        }
        private String ID_Link;
        private int CarNum;
        private float speed;
    
        public void setID_Link(String ID_Link) {
            this.ID_Link = ID_Link;
        }
    
        public void setCarNum(int carNum) {
            CarNum = carNum;
        }
    
        public void setSpeed(float speed) {
            this.speed = speed;
        }
    
        public String getID_Link() {
            return ID_Link;
        }
    
        public int getCarNum() {
            return CarNum;
        }
    
        public float getSpeed() {
            return speed;
        }
    }
    
    

    主函数只需要把DeserializationSchema类修改为自定义的VideoDataDeSerializer,当kafka生产者发送过来用VideoData转换的Json类型时,返回的就是我们需要的DataStream[VideoData]。这就不需要后面再用map函数将String转换为VideoData类型了。

    相关文章

      网友评论

          本文标题:flink学习(2)flink Streaming从kafka接

          本文链接:https://www.haomeiwen.com/subject/ccyaoqtx.html