0. The main function
The job reads committed offsets from __consumer_offsets via a custom source, deduplicates per (group, topic) in 2-minute windows, then queries Kafka Manager to compute the lag:
env.addSource(new KafkaLagSource(parameterTool)) // add the custom source
    .assignTimestampsAndWatermarks(new MetricEventLagWatermarkExtractor()) // assign event-time timestamps and watermarks
    .keyBy(x -> x.getTags().get("group") + "," + x.getTags().get("topic"))
    // deduplicate: within each 2-minute tumbling window (size == slide),
    // keep only the latest committed-offset event per (group, topic)
    .timeWindow(Time.seconds(2 * 60), Time.seconds(2 * 60))
    .reduce(new ReduceFunction<MetricEvent>() {
        @Override
        public MetricEvent reduce(MetricEvent value1, MetricEvent value2) throws Exception {
            return value1.getTimestamp() > value2.getTimestamp() ? value1 : value2;
        }
    })
    .flatMap(new ComputeLagFunction())
    .print();
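The MetricEventLagWatermarkExtractor referenced above is not shown in the post. A minimal sketch of what it could look like, assuming Flink's BoundedOutOfOrdernessTimestampExtractor base class and an arbitrary 10-second out-of-orderness bound (both choices are assumptions, not the author's code):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Hypothetical sketch: use each MetricEvent's commit timestamp as its event time,
// tolerating 10 seconds of out-of-order records (the bound is an assumption).
public class MetricEventLagWatermarkExtractor extends BoundedOutOfOrdernessTimestampExtractor<MetricEvent> {

    public MetricEventLagWatermarkExtractor() {
        super(Time.seconds(10));
    }

    @Override
    public long extractTimestamp(MetricEvent element) {
        return element.getTimestamp();
    }
}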
1. Custom Flink source: read Kafka's internal __consumer_offsets topic and parse each committed-offset record
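The MetricEvent type used throughout is the author's own POJO; per the comment in the source below, it carries a name, a timestamp, tags, and fields. A minimal sketch reconstructed from how it is used (the real class is not shown in the post):

import java.util.Map;

// Hypothetical reconstruction of the MetricEvent POJO from its usage in this post.
public class MetricEvent {
    private String name;                // metric name, e.g. "kafka_lag_name"
    private long timestamp;             // offset-commit timestamp in milliseconds
    private Map<String, String> tags;   // group / topic / partition / path
    private Map<String, Object> fields; // offset / lag values

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public Map<String, String> getTags() { return tags; }
    public void setTags(Map<String, String> tags) { this.tags = tags; }
    public Map<String, Object> getFields() { return fields; }
    public void setFields(Map<String, Object> fields) { this.fields = fields; }
}

The source itself: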
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import kafka.common.OffsetAndMetadata;
import kafka.coordinator.group.BaseKey;
import kafka.coordinator.group.GroupMetadataManager;
import kafka.coordinator.group.OffsetKey;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
// MetricEvent is a custom POJO carrying name, timestamp, tags, and fields (sketched above)
public class KafkaLagSource extends RichSourceFunction<MetricEvent> {

    public static final String KAFKA_LAG_NAME = "kafka_lag_name";

    private transient Consumer<byte[], byte[]> consumer;
    // flag flipped by cancel() so the poll loop in run() can terminate cleanly
    private volatile boolean running = true;
    private final ParameterTool parameterTool;

    public KafkaLagSource(ParameterTool parameterTool) {
        this.parameterTool = parameterTool;
    }
    // Poll records from __consumer_offsets continuously; each record is one consumer
    // group's committed offset for a single partition of a topic. Parse it and
    // ctx.collect() the resulting MetricEvent to the downstream operator.
    @Override
    public void run(SourceContext<MetricEvent> ctx) throws Exception {
        while (running) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
            while (iterator.hasNext()) {
                ConsumerRecord<byte[], byte[]> record = iterator.next();
                if (record.key() == null) {
                    continue;
                }
                // the broker serializes __consumer_offsets keys/values in its own format;
                // GroupMetadataManager from the kafka core library decodes them
                BaseKey baseKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
                byte[] value = record.value();
                if (value == null) { // tombstone: the group's offset was expired or deleted
                    continue;
                }
                if (baseKey instanceof OffsetKey) { // skip group-metadata records
                    OffsetKey newKey = (OffsetKey) baseKey;
                    MetricEvent event = buildEvent(newKey, value);
                    ctx.collect(event);
                }
            }
        }
    }
    private MetricEvent buildEvent(OffsetKey newKey, byte[] value) {
        OffsetAndMetadata offsetMeta = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value));
        TopicPartition tp = newKey.key().topicPartition();
        String group = newKey.key().group();
        MetricEvent event = new MetricEvent();
        event.setName(KAFKA_LAG_NAME);
        event.setTimestamp(offsetMeta.commitTimestamp());
        HashMap<String, String> tags = Maps.newHashMap();
        tags.put(KafkaLagConstants.GROUP, group);
        tags.put(KafkaLagConstants.TOPIC, tp.topic());
        tags.put(KafkaLagConstants.PARTITION, tp.partition() + "");
        event.setTags(tags);
        HashMap<String, Object> fields = Maps.newHashMap();
        fields.put(KafkaLagConstants.OFFSET, offsetMeta.offset());
        event.setFields(fields);
        return event;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        consumer = createKafkaConsumer();
        consumer.subscribe(Lists.newArrayList("__consumer_offsets"));
    }

    @Override
    public void cancel() {
        // stop the poll loop in run()
        running = false;
    }
    Consumer<byte[], byte[]> createKafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", parameterTool.get("bootstrap.servers"));
        props.put("group.id", parameterTool.get("group.id"));
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return new KafkaConsumer<byte[], byte[]>(props);
    }
}
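KafkaLagConstants is another project-internal class that the post does not show. Since the keyBy in the main function reads the "group" and "topic" tags by their plain names, the keys are plausibly the lowercase strings below; treat this sketch as an assumption:

// Hypothetical sketch of the tag/field keys; actual values are not shown in the post.
public final class KafkaLagConstants {
    public static final String GROUP = "group";
    public static final String TOPIC = "topic";
    public static final String PARTITION = "partition";
    public static final String OFFSET = "offset";
    public static final String LAG = "lag";
    public static final String PATH = "path";

    private KafkaLagConstants() {
    }
}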
2. Computing the lag
import com.google.common.base.Throwables;
import io.terminus.spot.analyzer.alerting.utils.KafkaLagConstants;
import io.terminus.spot.analyzer.base.models.MetricEvent;
import io.terminus.spot.analyzer.base.utils.HttpUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import java.util.Map;
@Slf4j
public class ComputeLagFunction extends RichFlatMapFunction<MetricEvent, MetricEvent> {

    private String kafkaManagerIp;
    private int lagLimit;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ParameterTool tools = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        kafkaManagerIp = tools.get("kafka.manager.ip");
        lagLimit = tools.getInt("kafka.lag.limit", 100000);
        log.info("kafkaManagerIp {}", kafkaManagerIp);
    }

    @Override
    public void flatMap(MetricEvent value, Collector<MetricEvent> out) throws Exception {
        Map<String, String> tags = value.getTags();
        String topic = tags.get(KafkaLagConstants.TOPIC);
        String group = tags.get(KafkaLagConstants.GROUP);
        try {
            // query Kafka Manager's consumer page and scrape the total lag from its HTML
            String url = "http://" + kafkaManagerIp + ":9000/clusters/spot/consumers/" + group + "/topic/" + topic + "/type/KF";
            String httpResult = HttpUtils.doGet(url);
            Document doc = Jsoup.parse(httpResult);
            // the second cell of the first table row holds the "Total Lag" value
            String lagString = doc.body().selectFirst("table.table").selectFirst("tr").select("td").eachText().get(1);
            Long totalLag = Long.valueOf(lagString);
            value.getFields().put(KafkaLagConstants.LAG, totalLag > 0 ? totalLag : 0L);
            value.getTags().put(KafkaLagConstants.PATH, url);
            if (totalLag > lagLimit) { // lag exceeds the limit: emit downstream for alerting
                out.collect(value);
            }
        } catch (Exception e) {
            log.error(Throwables.getStackTraceAsString(e));
        }
    }
}
Take the topic and group from the event's tags, build the Kafka Manager URL, issue an HTTP GET, and parse the returned HTML to extract the total lag.
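HttpUtils.doGet is also project-internal. A minimal stand-in built on java.net.HttpURLConnection (my sketch, not the author's implementation):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the author's HttpUtils.doGet:
// issue an HTTP GET and return the response body as a string.
public class HttpUtils {

    public static String doGet(String urlString) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(urlString).openConnection();
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        StringBuilder sb = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                sb.append(line).append('\n');
            }
        } finally {
            conn.disconnect();
        }
        return sb.toString();
    }
}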
3. pom.xml dependencies
<dependency>
    <groupId>com.jayway.jsonpath</groupId>
    <artifactId>json-path</artifactId>
    <version>2.4.0</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.7</version>
    <scope>compile</scope>
</dependency>
<!-- kafka core library: provides GroupMetadataManager for decoding __consumer_offsets -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.11.0.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- jsoup: used to parse the Kafka Manager HTML page -->
<dependency>
    <groupId>org.jsoup</groupId>
    <artifactId>jsoup</artifactId>
    <version>1.11.3</version>
</dependency>