Learning Apache Flink(join API)

Author: Woople | Published 2017-11-29 09:22

    This article demonstrates how to consume data from two Kafka topics with Flink and join the two streams. Topic foo carries (imsi, lac, cell) records and topic baz carries (imsi, phoneNum) records; the streams are joined on the shared imsi field.

    The DeserializationSchema

    First, implement the DeserializationSchema interface, which parses the raw bytes read from Kafka:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    import org.apache.flink.types.Row;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    
    public class TestDeserializationSchema implements DeserializationSchema<Row> {
        private final TypeInformation<Row> typeInfo;
        private final String[] fieldNames;
    
        public TestDeserializationSchema(TypeInformation<Row> typeInfo) {
            this.typeInfo = typeInfo;
            this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
        }
    
        @Override
        public Row deserialize(byte[] message) throws IOException {
            try {
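                // Decode the CSV record and copy one token into each field of the Row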
                String messages = new String(message, StandardCharsets.UTF_8);
                String[] messagesArray = messages.split(",");
    
                Row row = new Row(fieldNames.length);
                for (int i = 0; i < fieldNames.length; i++) {
                    row.setField(i, messagesArray[i]);
                }
    
                return row;
            } catch (Throwable t) {
                throw new IOException("Failed to deserialize Row object.", t);
            }
        }
    
        @Override
        public boolean isEndOfStream(Row row) {
            return false;
        }
    
        @Override
        public TypeInformation<Row> getProducedType() {
            return typeInfo;
        }
    }
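
    To sanity-check the schema outside of a running job, it can be invoked directly. The following is a minimal sketch added here for illustration (the SchemaSmokeTest object is not part of the original code):

    import java.nio.charset.StandardCharsets
    import com.woople.flink.streaming.connectors.kafka.TestDeserializationSchema
    import org.apache.flink.api.common.typeinfo.Types

    object SchemaSmokeTest {
      def main(args: Array[String]): Unit = {
        val schema = new TestDeserializationSchema(
          Types.ROW_NAMED(Array("imsi", "lac", "cell"), Types.STRING, Types.STRING, Types.STRING))
        // One CSV-encoded record in, one three-field Row out
        val row = schema.deserialize("400123456789,88,99".getBytes(StandardCharsets.UTF_8))
        println(row) // prints 400123456789,88,99
      }
    }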
    

    The complete implementation

    import java.util.Properties
    
    import com.woople.flink.streaming.connectors.kafka.TestDeserializationSchema
    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
    import org.apache.flink.types.Row
    
    object KafkaJoin {
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val properties = new Properties()
        // comma separated list of Kafka brokers
        properties.setProperty("bootstrap.servers", "hostA:6667")
        // id of the consumer group
        properties.setProperty("group.id", "test")
        val s1 = env.addSource(new FlinkKafkaConsumer010[Row]("foo",
          new TestDeserializationSchema(Types.ROW_NAMED(Array("imsi", "lac", "cell"),
            Types.STRING, Types.STRING, Types.STRING)), properties))
        val s2 = env.addSource(new FlinkKafkaConsumer010[Row]("baz",
          new TestDeserializationSchema(Types.ROW_NAMED(Array("imsi", "phoneNum"),
            Types.STRING, Types.STRING)), properties))
    
        // Join the streams on their first field (imsi); only pairs whose keys
        // match inside the same 30-second processing-time window are emitted
        val result = s1.join(s2)
          .where(_.getField(0)).equalTo(_.getField(0))
          .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
          .apply((l, r) => {
            // Output row: (imsi, lac, cell) from foo plus phoneNum from baz
            val row = new Row(4)
            row.setField(0, l.getField(0))
            row.setField(1, l.getField(1))
            row.setField(2, l.getField(2))
            row.setField(3, r.getField(1))
            row
          })
    
        result.print()
        env.execute("Kafka Window Stream Join")
      }
    }
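
    Note that a windowed join is an inner join: a record with no partner in the same window is silently dropped. When unmatched records need to be kept, coGroup exposes all elements of both sides per key and window. The following sketch (my own variation on the pipeline above, not from the original article) emits every foo record and fills phoneNum with null when no matching baz record arrived in the window; s1 and s2 are the streams defined above:

    import org.apache.flink.util.Collector

    val enriched = s1.coGroup(s2)
      .where(_.getField(0)).equalTo(_.getField(0))
      .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
      .apply((lefts: Iterator[Row], rights: Iterator[Row], out: Collector[Row]) => {
        // Materialize the (usually small) right side so it can be reused per left row
        val phoneNums = rights.map(_.getField(1)).toList
        for (l <- lefts) {
          val row = new Row(4)
          row.setField(0, l.getField(0))
          row.setField(1, l.getField(1))
          row.setField(2, l.getField(2))
          // null when no baz record with this imsi fell into the window
          row.setField(3, if (phoneNums.isEmpty) null else phoneNums.head)
          out.collect(row)
        }
      })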
    

    Test case

    Write one record to each topic:

    kafka-console-producer.sh --broker-list hostA:6667 --topic foo
    400123456789,88,99
    
    kafka-console-producer.sh --broker-list hostA:6667 --topic baz
    400123456789,19900000000
    

    Provided both records fall into the same 30-second window, the job prints the joined row:

    400123456789,88,99,19900000000
    
