Flink custom source: parsing __consumer_offsets

Author: 岳过山丘 | Published 2019-01-14 19:24

    0. Main function

    // Register the parameters as global job parameters so downstream operators
    // (ComputeLagFunction reads them in its open() method) can access them.
    env.getConfig().setGlobalJobParameters(parameterTool);

    env.addSource(new KafkaLagSource(parameterTool))                            // add the custom source
            .assignTimestampsAndWatermarks(new MetricEventLagWatermarkExtractor()) // assign timestamps and watermarks
            .keyBy(x -> x.getTags().get("group") + "," + x.getTags().get("topic"))
            // deduplicate: within each 2-minute window keep only the latest event per (group, topic)
            .timeWindow(Time.seconds(2 * 60), Time.seconds(2 * 60))
            .reduce(new ReduceFunction<MetricEvent>() {
                @Override
                public MetricEvent reduce(MetricEvent value1, MetricEvent value2) throws Exception {
                    return value1.getTimestamp() > value2.getTimestamp() ? value1 : value2;
                }
            })
            .flatMap(new ComputeLagFunction())
            .print();
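
    The MetricEventLagWatermarkExtractor used above isn't shown in the original. A minimal sketch, assuming events may arrive up to 10 seconds out of order (the bound is a hypothetical choice):

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class MetricEventLagWatermarkExtractor
            extends BoundedOutOfOrdernessTimestampExtractor<MetricEvent> {

        public MetricEventLagWatermarkExtractor() {
            super(Time.seconds(10)); // hypothetical out-of-orderness bound
        }

        @Override
        public long extractTimestamp(MetricEvent event) {
            // MetricEvent.timestamp is the offset commit time set by KafkaLagSource
            return event.getTimestamp();
        }
    }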
    
    1. Custom Flink source: read and parse Kafka's __consumer_offsets
    
    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;
    import io.terminus.spot.analyzer.alerting.utils.KafkaLagConstants;
    import kafka.common.OffsetAndMetadata;
    import kafka.coordinator.group.BaseKey;
    import kafka.coordinator.group.GroupMetadataManager;
    import kafka.coordinator.group.OffsetKey;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Properties;

    // MetricEvent is a custom POJO: name, timestamp, tags, fields
    
    public class KafkaLagSource extends RichSourceFunction<MetricEvent> {
        public static final String KAFKA_LAG_NAME = "kafka_lag_name";
        private transient Consumer<byte[], byte[]> consumer;
        private volatile boolean running = true; // cleared by cancel() to stop the poll loop
        ParameterTool parameterTool;
    
        public KafkaLagSource(ParameterTool parameterTool) {
            this.parameterTool = parameterTool;
        }
    
        // Keep polling from Kafka; every parsed record is the offset a consumer group has
        // committed for one partition of a topic, emitted downstream via ctx.collect.
        @Override
        public void run(SourceContext<MetricEvent> ctx) throws Exception {
            while (running) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
                while (iterator.hasNext()) {
                    ConsumerRecord<byte[], byte[]> record = iterator.next();
                    if (record.key() == null) {
                        continue;
                    }
                    BaseKey baseKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
                    byte[] value = record.value();
                    if (value == null) {
                        continue;
                    }
                    if (baseKey instanceof OffsetKey) {
                        OffsetKey newKey = (OffsetKey) baseKey;
                        MetricEvent event = buildEvent(newKey, value);
                        ctx.collect(event);
                    }
                }
            }
        }
    
        private MetricEvent buildEvent(OffsetKey newKey, byte[] value) {
            OffsetAndMetadata offsetMeta = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value));
            TopicPartition tp = newKey.key().topicPartition();
            String group = newKey.key().group();
    
            MetricEvent event = new MetricEvent();
            event.setName(KAFKA_LAG_NAME);
            event.setTimestamp(offsetMeta.commitTimestamp());
            HashMap<String, String> tags = Maps.newHashMap();
            tags.put(KafkaLagConstants.GROUP, group);
            tags.put(KafkaLagConstants.TOPIC, tp.topic());
            tags.put(KafkaLagConstants.PARTITION, tp.partition() + "");
            event.setTags(tags);
            HashMap<String, Object> fields = Maps.newHashMap();
            fields.put(KafkaLagConstants.OFFSET, offsetMeta.offset());
            event.setFields(fields);
            return event;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            consumer = createKafkaConsumer();
            consumer.subscribe(Lists.newArrayList("__consumer_offsets"));
        }
    
        @Override
        public void cancel() {
            running = false;
        }
    
        Consumer<byte[], byte[]> createKafkaConsumer() {
            Properties props = new Properties();
            props.put("bootstrap.servers", parameterTool.get("bootstrap.servers"));
            props.put("group.id", parameterTool.get("group.id"));
            props.put("enable.auto.commit", "false");
            props.put("auto.offset.reset", "latest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            return new KafkaConsumer<byte[], byte[]>(props);
        }
    }
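
    Neither MetricEvent nor KafkaLagConstants is shown in the original. Minimal sketches based on how they are used above; GROUP and TOPIC match the literals in the main function's keyBy, while the remaining constant values and field types are assumptions:

    import lombok.Data;

    import java.util.Map;

    // Sketch of the MetricEvent POJO; @Data generates the getters/setters used above.
    @Data
    public class MetricEvent {
        private String name;
        private long timestamp;
        private Map<String, String> tags;
        private Map<String, Object> fields;
    }

    // Sketch of the tag/field name constants (each class lives in its own file).
    public class KafkaLagConstants {
        public static final String GROUP = "group";
        public static final String TOPIC = "topic";
        public static final String PARTITION = "partition";
        public static final String OFFSET = "offset";
        public static final String LAG = "lag";
        public static final String PATH = "path";
    }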
    

    2. Compute the lag

    
    import com.google.common.base.Throwables;
    import io.terminus.spot.analyzer.alerting.utils.KafkaLagConstants;
    import io.terminus.spot.analyzer.base.models.MetricEvent;
    import io.terminus.spot.analyzer.base.utils.HttpUtils;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;
    import org.jsoup.Jsoup;
    import org.jsoup.nodes.Document;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Slf4j
    public class ComputeLagFunction extends RichFlatMapFunction<MetricEvent, MetricEvent> {
     private String kafkaManagerIp;
     private int lagLimit;
    
     @Override
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
         ParameterTool tools = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
         kafkaManagerIp = tools.get("kafka.manager.ip");
         lagLimit = tools.getInt("kafka.lag.limit", 100000);
         log.info("kafkaManagerIp {}", kafkaManagerIp);
    
     }
    
    
    
     @Override
     public void flatMap(MetricEvent value, Collector<MetricEvent> out) throws Exception {
         Map<String, String> tags = value.getTags();
         String topic = tags.get(KafkaLagConstants.TOPIC);
         String group = tags.get(KafkaLagConstants.GROUP);
         try {
             String url = "http://" + kafkaManagerIp + ":9000/clusters/spot/consumers/" + group + "/topic/" + topic + "/type/KF";
             String httpResult = HttpUtils.doGet(url);
             // Kafka Manager serves the consumer page as HTML; the second cell of the
             // first table row holds the total lag.
             Document doc = Jsoup.parse(httpResult);
             String lagString = doc.body().selectFirst("table.table").selectFirst("tr").select("td").eachText().get(1);
             Long lag = Long.valueOf(lagString);
             value.getFields().put(KafkaLagConstants.LAG, lag > 0 ? lag : 0L);
             value.getTags().put(KafkaLagConstants.PATH, url);
             if (lag > lagLimit) { // lag is too large; send it downstream
                 out.collect(value);
             }
         } catch (Exception e) {
             log.error(Throwables.getStackTraceAsString(e));
         }
     }
    }
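
    HttpUtils.doGet is a project-internal helper that isn't shown. A minimal sketch, assuming a plain blocking GET via java.net.HttpURLConnection (the timeouts are hypothetical choices):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class HttpUtils {

        // Issue a blocking GET and return the response body as a string.
        public static String doGet(String url) throws Exception {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setConnectTimeout(5000); // hypothetical timeouts
            conn.setReadTimeout(5000);
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                StringBuilder sb = new StringBuilder();
                String line;
                while ((line = reader.readLine()) != null) {
                    sb.append(line).append('\n');
                }
                return sb.toString();
            } finally {
                conn.disconnect();
            }
        }
    }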
    
    

    Build the request URL from the topic and group, issue an HTTP GET against Kafka Manager, parse the returned HTML, and read out the total lag. For example, with a (hypothetical) group my-group and topic my-topic the request goes to http://<kafka-manager-ip>:9000/clusters/spot/consumers/my-group/topic/my-topic/type/KF.



    3. pom dependencies

            <dependency>
                <groupId>com.jayway.jsonpath</groupId>
                <artifactId>json-path</artifactId>
                <version>2.4.0</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.7</version>
                <scope>compile</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>0.11.0.2</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-api</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.jsoup</groupId>
                <artifactId>jsoup</artifactId>
                <version>1.11.3</version>
            </dependency>
    
