美文网首页Java
基于nginx+flume+kafka+mongodb实现埋点数

基于nginx+flume+kafka+mongodb实现埋点数

作者: JAVA进阶之道 | 来源:发表于2020-06-16 16:54 被阅读0次

名词解释

埋点其实就是用于记录用户在页面的一些操作行为。例如,用户访问页面(PV,Page Views)、访问页面用户数量(UV,User Views)、页面停留、按钮点击、文件下载等,这些都属于用户的操作行为。

开发背景

我司之前在处理埋点数据采集时,模式很简单,当用户操作页面控件时,前端监听到操作事件,并根据上下文环境,将事件相关的数据通过接口调用发送至埋点数据采集服务(简称ets服务),ets服务对数据解析处理后,做入库操作。

流程图如下

image

这种方式其实没毛病,有如下优势:

(1) 开发难度低,前后端开发人员约定好好接口、参数、模型等即可。

(2) 系统部署容易,排查问题难度低。

但是仔细想想,这种模式弊端也很多:

(1) 开发工作量大。例如,这次有个记录用户点击支付按钮的埋点,前端需要在支付的前端代码中插入一段埋点的代码,后端可能也需要改;后面又来了一个记录用户点击加入购物车按钮的埋点,前端又需要在加入购物车代码中插入一段埋点的代码…… 后续,只要有新的埋点需求,前端就得加代码或者改代码。

(2) 前端埋点接口调用过多影响性能。有可能访问一次页面,或者点击一次连接,要调用十几个埋点接口,而这些接口调用的结果,并不是用户所关心的内容。这样频繁的接口调用,对后端服务也构成了压力。

(3) 增加调试成本。每一次新的埋点需求,都需要前后端联调,沟通、参数、模型等,增加了调试时间。

设计目标

针对老的埋点系统的弊端,需要做重构,达成以下目标:

(1) 埋点数据收集自动化、实时收集;

(2) 减少前端开发工作量;

(3) 减少前后端联调工作量;

(4) 减少前端埋点相关代码。

软件、硬件依赖

请求日志记录

Nginx

日志收集程序

Flume

消息中间件

kakfa+zookeeper

后端日志数据处理服务

ets埋点数据采集微服务,采用springcloud stream技术。

缓存

redis

数据仓库

MongoDB

系统流程图

image

系统逻辑

(1) 客户端发送请求,通过Nginx进行请求转发,Nginx以json格式记录请求参数等信息至access.log;

(2) flume实时监控Nginx日志变化,收集并过滤有用日志,发送至kafka;

(3) ets服务作为消息消费者,监听kafka topic消息,收到消息后,对日志消息解析处理,确定日志数据存储集合;

(4) 解析token获取用户信息;

(5) 将转化好的日志数据入库。

系统说明

用户的大部分操作行为,都对应着一个URL后端请求或者前端请求。而这些请求必然会经过nginx进行请求转发,nginx日志会记录每一个请求的信息。

通过对nginx日志的实时监控、采集、入库,我们将用户行为数据标准化入库,从而替代了老式的前端调用埋点接口的方式,减少了开发量,减少了埋点请求数量。

========================下面为具体实现细节=======================

nginx日志JSON格式配置

logformat按照如下格式配置,日志会打印为json格式,便于flume和ets服务做数据解析。

重要的配置含义如下:

image

注:

1.escape=json 防止乱码

2."source":"$ http request source" 记录访问来源(用来区分不同的应用程序),前端需在http header追加request_source参数

例如我司开发的两款应用:

单分享APP1单分享小程序2

3."token":"$http_Authorization" 记录token

日志效果:

POST:

{

 "ipaddress": "192.168.8.1",

 "remote_user": "",

 "time_local": "26/Apr/2020:13:51:49 +0800",

 "request": "POST /order/api/zz/zz/zzHTTP/1.1",

 "request_uri": "/order/api/zz/zz/zz",

 "uri_with_args": "/order/api/zz/zz/zz",

 "request_method": "POST",

 "request_id": "123456",

 "status": "200",

 "body_bytes_sent": "388",

 "request_body": "{\"merchantCode\":\"123455",\"orderProductList\":[{\"skuId\":\"12345\",\"quantity\":1}],\"shippingMethod\":1}",

 "args": "",

 "http_referer": "https://s.h.com/hh/b2b/order/zzz",

 "http_user_agent": "Mozilla/5.0 (Linux; Android 9; MHA-AL00 Build/HUAWEIMHA-AL00; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3945.116 Mobile Safari/537.36 agentweb/4.0.2 UCBrowser/11.6.4.950",

 "http_x_forwarded_for": "",

 "request_source": "",

 "backend_request": "",

 "token": "Bearer token token", 

 "header-buz-params": ""

}

flume配置

通过flume来探测和收集nginx日志,并发送至kafka topic

配置日志拦截器

拦截器的作用是,过滤无用的nginx日志,筛选出有用的nginx日志(与埋点相关的日志)以发送至kafka

拦截器逻辑

public class MyFlumeInterceptor implements Interceptor {
  @Override
  public void initialize() {}

  // 单个事件拦截
  @Override
  public Event intercept(Event event) {
    String line = new String(event.getBody(), Charset.forName("UTF-8"));

    try {
      Object parse = JSON.parse(line);
      Map map=(Map)parse;
      Boolean backendRequest = MapUtils.getBoolean(map, "backend_request");
      if(!backendRequest){
        return null;
      }
    }
    catch (Exception e) {
      return null;
    }
    return event;
  }

  // 批量事件拦截
  @Override
  public List<Event> intercept(List<Event> events) {
    List<Event> out = Lists.newArrayList();
    Iterator it = events.iterator();
    while (it.hasNext()) {
      Event event = (Event) it.next();
      Event outEvent = this.intercept(event);
      if (outEvent != null) {
        out.add(outEvent);
      }
    }
    return out;
  }

  @Override
  public void close() {
  }

  public static class Builder implements Interceptor.Builder {
    @Override
    public Interceptor build() {
      return new MyFlumeInterceptor();
    }

    @Override
    public void configure(Context context) {
    }
  }
}

拦截器打包

执行maven package,将拦截器项目打包后放入flume安装目录lib目录下

配置日志收集探测和发送

配置日志来源、kafka地址、kafka topic、channel

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source配置 
a1.sources.r1.type = exec

# 监控多个日志文件,-f 参数后指定一组文件即可 
a1.sources.r1.command = tail -F /data/logs/xxx/xxx/xxx/cloud-gateway1.log /data/logs/xxx/xxx/xxx/cloud-gateway2.log /data/logs/xxx/xxx/xxx/cloud-gateway3.log

# sink配置 kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = nginx_log_topic
a1.sinks.k1.brokerList = 192.168.xx.xx:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# channel配置 
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# sink、channel绑定配置 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

修改flume源码

目的:解决日志过长无法发送问题

注:如果日志不是很长,可不必修改

日志长度超过2048时,日志无法发送到kafka,原因是flume对日志长度做了限制

LineDeserializer类有个常量MAXLINE_DELT=2048,单行日志超出该长度时发送失败

解决办法:

修改源码,把2048改成2048*10,重新打包并放入lib文件夹下,重启flume

zookeeper集群配置

关于zookeeper集群的配置,网上有很多参考资料。这里作简要说明

配置zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
#快照日志的存储路径
dataDir=/home/data/zookeeper
dataLogDir=/data/logs/zookeeper
clientPort=2181
server.1=192.168.xx.xx:2888:3888
server.2=192.168.xx.xx:2888:3888
server.3=192.168.xx.xx:2888:3888

#server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里

#192.168.xx.xx为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888

创建myid文件

在241,242,243机器上,分别创建3个myid文件,相当于标记自己的ID:

#server1
echo "1" > /home/data/zookeeper/myid
#server2
echo "2" > /home/data/zookeeper/myid
#server3
echo "3" > /home/data/zookeeper/myid

kafka集群配置

关于kafka集群的配置,网上资料也很多,这里做简要说明

### server.properties

broker.id=3
listeners=PLAINTEXT://192.168.20.243:9092
advertised.host.name=192.168.20.243
advertised.port=9092
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/data/logs/kafka
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# The minimum age of a log file to be eligible for deletion due to age
# log.retention.hours=168
log.retention.minutes=10
log.cleanup.policy=delete
log.cleaner.enable=true
log.segment.delete.delay.ms=0
log.segment.bytes=1073741824

log.retention.check.interval.ms=300000
#设置zookeeper的连接端口
zookeeper.connect=192.168.xx.xxx:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
#配置kafka集群地址列表
broker.list=192.168.xx.xx:9092
producer.type=async

设置消息生命周期

目的:kafka topic中的日志消息被ets服务消费后,就没什么用处了,这时候需要即时清理掉,防止消息积压,占用内存

server.properties增加数据清理配置:
log.retention.minutes=10 数据最多保存10分钟。
log.cleanup.policy=delete 日志清理策略:删除
log.cleaner.enable=true 开启消息日志清理

以上配置是全局配置

单独对某个topic消息设置有效期:

./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name mytopic --entity-type topics --add-config retention.ms=86400000

用户行为数据采集服务

以下只列出与kafka相关的配置。其他配置由于保密问题不作列出,参考springcloud相关技术文档

Application.yml配置

配置bingdings、kafka topic

server:
  port: 37000

spring:
  cloud:
    stream:
      bindings:
        nginx_kafka_log_input:
          destination: nginx_log_topic
          group: s1
      default-binder: kafka

配置中心配置

将kafka公用配置配置到springcloud config配置中心

spring:
  kafka:
    bootstrap-servers: 192.168.xx.xx:9092,192.168.xx.xx:9092,192.168.xx.xx:9092
    auto-create-topics: true
    enable-auto-commit: true
    auto-commit-interval: 20000
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

定义消息通道

public interface NginxLogMessageChannel {
    /**
     * 接收消息通道名称
     */
    String NGINX_KAFKA_LOG_INPUT = "nginx_kafka_log_input";

    /**
     *接收消息通道
     */
    @Input(NGINX_KAFKA_LOG_INPUT)
    MessageChannel recieveLogMessageChannel();
}

</pre>

消息订阅

监听kafka topic的nginx日志消息,解析并存入MongoDB

@EnableBinding(value = NginxLogMessageChannel.class)
@Component
@Slf4j
public class NginxLogMessageListener {
    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(4, 8, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());

    @StreamListener(NginxLogMessageChannel.NGINX_KAFKA_LOG_INPUT)
    public void receiveLog(Message<String> message) {
        EXECUTOR.execute(() -> {
            try {
               //日志消息解析、入库逻辑…………………..
            }
            catch (Exception e) {
              log.error("解析nginx请求日志时出错", e);
            }
        });
    }
}

消息重复消费问题

如果消费者消费消息的时间大于最大心跳持续时间(例如网络波动、服务器压力大 等),kafka会默认这个消费者已经挂掉了,kafka会协调其他分区的消费者再去消费此消息。

但其实该消费者没挂掉,还在消费消息,等他消费完成后,kafka新协调的消费者也消费了这条消息,会导致消息重复消费

解决方案:

对已消费的消息,将其request_id存入redis,并设置失效时间;在消费消息前,先去redis查询该id是否已经消费过

MongoDB基本模型设计

image

Redis 数据配置

URI-集合映射关系

在Redis中,配置与埋点相关的uri和该uri请求日志存储的集合的映射关系。

key为 hosjoy-hbp-ets: url-collection-map: ,数据类型为hash类型。存储结构为:hashkey: uri+ "-"+请求方式+"-"+请求来源 hashvalue: 集合名

例如: hashkey: order/api/app/xx/xx-POST-1 hashvalue:t app log

如果有多个集合,则用逗号分开

可变URI配置

带有URL占位符的URL配置形式如下:/order/api/xx/{id}/detail-GET-1

系统最终效果

用户通过APP访问某商品的商品详情页,我们根据该动作的请求URI、请求方式、请求来源去redis获取配置的mongo集合,然后去mongo中查看该动作的埋点数据

JSON数据样例如下:

{
    _id: ObjectId("5ea2580eda1591218c72d1a4"),
    requestId: "1cdd5881421a3353f36304949bbcab41",
    requestUri: "/product/api/xx/xx",
    requestMethod: "GET",
    requestSource: "2",
    requestStatus: "200",
    userId: "1234",
    username: "123456789",   
    remoteIp: "xx.xx.xx.xx",
    remoteUser: "",
    httpReferer: "https://xx.com/xx",
    requestTime: ISODate("2020-04-24T03:07:58.742Z"),
    userAgent: "Mozilla/5.0 (Linux; Android 9; MHA-AL00 Build/HUAWEIMHA-AL00; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3945.116 Mobile Safari/537.36 MicroMessenger/7.0.13.1640(0x27000D39) Process/appbrand0 NetType/4G Language/zh_CN ABI/arm64 WeChat/arm64",
    mapParams: {
        merchantCode: "123",
        id: "234"
    }
}

可以看出,通过监控nginx日志并记录用户行为动作的用户信息、时间、访问来源、浏览器信息、请求参数等,可以方便的做用户行为数据分析,而无需前端再去编写埋点相关代码,实现了自动、实时的采集用户行为数据。

后续做统计也很方便, 例如,现在需要统计2020年4月20日至2020年4月25日之间,访问来源为APP(request_source为2)的某商品的商品详情页访问uv:(以下为伪代码,主要便于理解,具体统计时以mongo语法为准):

select count(1) as UV from (
    select distinct userId from t_test_log
    where requestUri=” /product/api/xx/xx” 
    and requestMethod=”GET” 
    and requestSource=”2” 
    and requestStatus=”200” 
    and mapParams.id=”输入商品ID” 
    and requestTime>2020-04-20 
    and requestTime<2020-04-25
)

相关文章

网友评论

    本文标题:基于nginx+flume+kafka+mongodb实现埋点数

    本文链接:https://www.haomeiwen.com/subject/tcyrxktx.html