Submitting Flink jobs with the blink sql-client


Author: 岳过山丘 | Published 2019-02-21 20:08

    Prerequisite:
    Modify the original sql-client so that it can read a SQL file and submit the jobs defined in it to a Flink cluster.
    For starting a blink cluster, see https://www.jianshu.com/p/4f59e512b178
    Goal: use the sql-client to submit a job that reads messages from Kafka, parses them with a custom UDTF, and writes the result to a CSV file.

    1. The SQL file

    create table kafka_stream (
      messageKey varbinary,
      `message` varbinary,
      topic varchar,
      `partition` int,
      `offset` bigint
    ) with (
      type = 'kafka011',
      topic = 't102',
      `group.id` = 't1',
      bootstrap.servers = 'localhost:9092'
    );

    create table csv_sink (
      id varchar,
      name varchar,
      age varchar
    ) with (
      type = 'csv',
      path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test4.csv'
    );

    insert into csv_sink
    select
      T.id,
      T.user_name,
      T.age
    from
      kafka_stream as S
      left join lateral table (parseDataMessage(message)) as T (
        id,
        user_name,
        age
      ) on true;
    

    2. A UDTF to parse the Kafka messages

    The message format in Kafka is:

    {"attributes":{"schemaName":"dbtest","tableName":"result1"},"fieldCount":3,"fields":[{"index":0,"name":"id","null":false,"primaryKey":true,"type":"INTEGER","value":"90995"},{"index":1,"name":"user_name","null":false,"primaryKey":false,"type":"VARCHAR","value":"a"},{"index":2,"name":"age","null":false,"primaryKey":false,"type":"INTEGER","value":"90995"}],"timestamp":1550733014456}
    

    The UDTF:

    
    // Imports added for completeness; the flink-table packages below are the
    // ones used by the Blink branch (they differ from mainline Flink).
    import java.nio.charset.Charset;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    import com.alibaba.fastjson.JSON;
    import com.google.common.collect.Lists;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.MapTypeInfo;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.table.api.functions.TableFunction;
    import org.apache.flink.table.api.types.DataType;
    import org.apache.flink.table.api.types.TypeInfoWrappedDataType;
    import org.apache.flink.types.Row;

    public class ParseDataMessageUDTF extends TableFunction<Row> {
        // Pseudo-columns served from the message envelope rather than from
        // the "fields" array.
        public static final String __TIMESTAMP = "__timestamp";
        public static final String __EVENT_TYPE = "__event_type";
        public static final String __ATTRIBUTES = "__attributes";

        private List<String> fieldName = Lists.newArrayList();

        // args is a comma-separated list of output column names, e.g.
        // "id,user_name,age"; it fixes the order of the emitted Row fields.
        public ParseDataMessageUDTF(String args) {
            fieldName = Arrays.asList(args.split(","));
        }

        public void eval(byte[] message) {
            String mess = new String(message, Charset.forName("UTF-8"));
            DataMessage dataMessage = JSON.parseObject(mess, DataMessage.class);
            Row row = new Row(fieldName.size());
            Map<String, List<Field>> map =
                dataMessage.getFields().stream().collect(Collectors.groupingBy(Field::getName));
            for (int i = 0; i < fieldName.size(); i++) {
                switch (fieldName.get(i)) {
                    case __TIMESTAMP:
                        row.setField(i, dataMessage.getTimestamp());
                        break;
                    case __EVENT_TYPE:
                        row.setField(i, dataMessage.getEventType().toString());
                        break;
                    case __ATTRIBUTES:
                        row.setField(i, dataMessage.getAttributes());
                        break;
                    default:
                        List<Field> flist = map.get(fieldName.get(i));
                        if (flist != null && !flist.isEmpty()) {
                            row.setField(i, flist.get(0).getValue());
                        } else {
                            row.setField(i, null);
                        }
                }
            }
            collect(row);
        }

        // If the eval() result type is Row, this method must be overridden to
        // tell the system the field types explicitly.
        @Override
        public DataType getResultType(Object[] arguments, Class[] argTypes) {
            TypeInformation[] typeInformations = new TypeInformation[fieldName.size()];
            for (int i = 0; i < fieldName.size(); i++) {
                switch (fieldName.get(i)) {
                    case __TIMESTAMP:
                        typeInformations[i] = BasicTypeInfo.of(Long.class);
                        break;
                    case __EVENT_TYPE:
                        typeInformations[i] = TypeInformation.of(String.class);
                        break;
                    case __ATTRIBUTES:
                        typeInformations[i] = new MapTypeInfo<>(
                            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
                        break;
                    default:
                        typeInformations[i] = BasicTypeInfo.of(String.class);
                }
            }
            RowTypeInfo rowType = new RowTypeInfo(typeInformations);
            return new TypeInfoWrappedDataType(rowType);
        }
    }
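
    For a quick local check of the parsing logic, a hypothetical smoke-test fragment (not from the original post; it assumes the Blink TableFunction inherits setCollector(...) the way mainline Flink's TableFunction does):

    // Feed the sample message from step 2 through eval() and collect the
    // emitted rows into a plain list instead of a running job.
    List<Row> out = new ArrayList<>();
    ParseDataMessageUDTF udtf = new ParseDataMessageUDTF("id,user_name,age");
    udtf.setCollector(new ListCollector<>(out)); // org.apache.flink.api.common.functions.util.ListCollector
    udtf.eval(sampleJson.getBytes());            // sampleJson: the JSON string from step 2
    System.out.println(out);                     // expect a single row: 90995,a,90995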
    

    3. Build a shaded jar; see https://www.jianshu.com/p/4f481fd8c0cb

    4. Register the UDTF function

    Copy sql-client-defaults.yaml to sql-client-kafka.yaml and register the function there. The single constructor entry below is passed to ParseDataMessageUDTF(String), and its column list must match the field order declared in the LATERAL TABLE clause of the SQL above.

    functions:
    - name: parseDataMessage
      from: class
      class: io.bigdata.blink.udf.ParseDataMessageUDTF
      constructor: 
        - "id,user_name,age"
    

    5. Submit the job

    ./sql-client.sh embedded --sqlPath /Users/IdeaProjects/github/apache-flink/build-target/bin/kafka.sql -e sql-client-kafka.yaml -j ../lib/udtf-1.0-SNAPSHOT.jar

    Here --sqlPath (the file-reading option added by the sql-client modification mentioned at the top) points at the SQL file from step 1, -e selects the environment file from step 4, and -j puts the shaded UDTF jar on the classpath.

    Then write some data to the Kafka topic t102.
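
    A minimal producer sketch (an assumption-laden example, not from the original post; it requires the kafka-clients dependency, and the topic and broker match the DDL in step 1):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TestProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // The DDL declares `message` as varbinary, so send raw bytes.
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            String json = "...";  // paste the sample message from step 2 here
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("t102", json.getBytes()));
            }
        }
    }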

    6. Results

    (Two screenshots of the running job and the resulting CSV file appeared here in the original post.)
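
    For the sample message from step 2, test4.csv should end up containing a line like the following (inferred from the message, since the screenshots are not reproduced here):

    90995,a,90995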
