美文网首页
MongoDB change stream 实战

MongoDB change stream 实战

作者: Yellowtail | 来源:发表于2020-06-09 14:28 被阅读0次

背景

最近公司要上一个需求,就是部分业务数据有插入动作时,对用户进行通知
举个虚假的例子,你的下属这会儿有个成交什么的

虽然 save 的逻辑的确是在我们的微服务代码里,
我如果在 save 这里加上这些逻辑,功能没问题,但是不就增加耦合度了么?后面修改逻辑,难度上天
用切面,先不说性能,感觉把代码逻辑放在api容器里,总觉得不太对,我觉得这个功能肯定是要放在离线计算的task容器里的

那用定时任务,看下这段时间内有多少新创建的数据?可以是可以,但是一是延迟高,二是实现逻辑有点傻

还好 MongoDB 提供了 Change Stream 的功能, 戳 mongodb changeStreams

原理和中文文档,网上一搜一堆,但是就是没有生产使用的具体代码例子

所以我来分享下

Spring data

我用的是 Spring Data Mongodb, 其它持久层框架的,请自行查阅官方文档

spring data change-streams

但是这个文档写的有点简陋

  1. import 没写,根本不知道该导哪个包
  2. 例子太简单了点,消费的时候就打印一下,看不到更多的细节

所以我尽量解释的详细点,但是注意,代码本身是运行不起来的,包括mongodb的配置和业务代码等,需要自行实现

代码

ChangeStreamService

我自己的代码,包名是 com.xixi 开头的,其它都是可以引入的包的代码


import com.xixi.SkmrActionLogsDocument;
import com.xixi.DBNameConstant;
import com.xixi.StopWatch;
import com.xixi.OnApplicationStarted;
import com.xixi.IChangeStreamStop;
import com.xixi.CommonErrorHandler;
import com.xixi.LogListener;

import com.mongodb.client.model.changestream.OperationType;
import org.apache.commons.lang3.StringUtils;
import org.bson.BsonString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.MatchOperation;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.stereotype.Service;


/**
 * @author YellowTail
 * @since 2020-06-02
 */
@Service
public class ChangeStreamService implements IChangeStreamStop, OnApplicationStarted  {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeStreamService.class);

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private LogListener logListener;

    @Autowired
    private ResumeTokenService resumeTokenService;

    private MessageListenerContainer messageListenerContainer;


    // https://docs.spring.io/spring-data/mongodb/docs/2.2.6.RELEASE/reference/html/#change-streams

    public void onStarted() {

        LOGGER.info("ChangeStreamService start");

        // 1. 启动一个 消息监听容器
        // 构造、使用了一个 spring 实现的线程池
        SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
        simpleAsyncTaskExecutor.setConcurrencyLimit(10);
        simpleAsyncTaskExecutor.setThreadNamePrefix("cs-mq-consumer-");

        messageListenerContainer = new DefaultMessageListenerContainer(mongoTemplate, simpleAsyncTaskExecutor);
        messageListenerContainer.start();

        // 2. 建 一个监听器, 当有 消息收到的时候, 会被调用, 消息的 body 会被转成 domain, 原始消息在 Document 里面
        // 也就是 LogListener

        
        // 3. 设置一些监听选项
        // https://docs.mongodb.com/manual/reference/change-events/#change-stream-output

        ChangeStreamRequestOptions requestOptions = new ChangeStreamRequestOptions("yourDb",
                "yourCollection", genOptions());


        // 4. 向容器注册一个监听请求, 返回值是一个订阅对象, 可以检查当前任务的状态, 也可以用来取消执行来释放资源
        CommonErrorHandler commonErrorHandler = new CommonErrorHandler();

        messageListenerContainer.register(new ChangeStreamRequest<>(logListener, requestOptions),
                SkmrActionLogsDocument.class, commonErrorHandler);
    }

    /**
     * 停止 消息监听容器
     * @param
     * @author YellowTail
     * @since 2020-06-02
     */
    public void stop() {
        LOGGER.info("ChangeStreamService stop MessageListenerContainer");

        StopWatch stopWatch = StopWatch.createStarted();

        // 5. 停止容器
        messageListenerContainer.stop();
        LOGGER.info("ChangeStreamService stop container success, cost {} ms", stopWatch.stopThenRestart());

        // 6. 向 redis 存入 这次最后的 token
        // dev 测试发现,新的容器先起来了,老的容器后 stop的
        resumeTokenService.updateToken(logListener.getLastToken());

        LOGGER.info("ChangeStreamService stop success, cost {} ms", stopWatch.stop());
    }

    /**
     * 生成 change stream 的配置
     * @param
     * @author YellowTail
     * @since 2020-06-09
     */
    private ChangeStreamOptions genOptions() {
        // 使用 pipeline 来过滤

        MatchOperation matchOperation = Aggregation.match(Criteria.where("operationType").is(OperationType.INSERT.getValue()));

//        ChangeStreamOptions changeStreamOptions =  ChangeStreamOptions.empty();

        ChangeStreamOptions.ChangeStreamOptionsBuilder changeStreamOptionsBuilder = ChangeStreamOptions.builder()
                .filter(Aggregation.newAggregation(matchOperation));

        String token = resumeTokenService.getToken();
        if (StringUtils.isNotBlank(token)) {
            // 重启的时候,可以接着上次的来
            changeStreamOptionsBuilder.resumeToken(new BsonString(token));
        }

        return changeStreamOptionsBuilder.build();
    }
}

现在来讲解一下代码细节

onStarted 这个方法做了几件事

  1. 新建一个消息监听容器,并启动起来
  2. 新建一个监听器,有消息来的时候,去消费
  3. 根据需要,设置一些监听选项
  4. 消息监听容器注册一个监听请求,关联上监听器

新建一个消息监听容器

我偷懒,没有做什么更多的设置,基本是全默认的
设置了线程数(应该是这个作用吧,没详细了解)
设置了线程名字前缀(方便在日志文件里搜索日志)

新建一个监听器

我新建了一个 Service 来做这件事


import com.xixi.SkmrActionLogsDocument;
import com.xixi.IdTokenModel;
import com.xixi.ResumeTokenStack;
import com.xixi.VisitorNotifyMqObject;
import com.xixi.VisitorMqProducerService;

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.stereotype.Service;

/**
 * @author YellowTail
 * @since 2020-06-02
 */
@Service
public class LogListener implements MessageListener<ChangeStreamDocument<Document>, SkmrActionLogsDocument> {

    private static final Logger LOGGER = LoggerFactory.getLogger(LogListener.class);

    @Autowired
    private MqProducerService mqProducerService;

    private final ResumeTokenStack resumeTokenStack = new ResumeTokenStack();

    /**
     * 收到消息的时候,这个方法会被调用
     * @param message
     * @author YellowTail
     * @since 2020-06-03
     */
    public void onMessage(Message<ChangeStreamDocument<Document>, SkmrActionLogsDocument> message) {
        LOGGER.info("LogListener receive a message");

        String token = message.getRaw().getResumeToken().get("_data").asString().getValue();

        SkmrActionLogsDocument document = message.getBody();
        String _id = document.get_id();

        LOGGER.info("document _id is {}, targetType {}", _id, document.getTargetType());

        // 1. 把 _id 和 token 存起来
        resumeTokenStack.push(new IdTokenModel(_id, token));

        // 2. 做一些事情
      

        // 3. 发送 mq 消息
        mqProducerService.send(xxx);

    }

    /**
     * 得到消费的最后一个 token
     * @author YellowTail
     * @since 2020-06-08
     */
    public String getLastToken() {
        IdTokenModel pop = resumeTokenStack.pop();

        if (null == pop) {
            return null;
        }

        return pop.getToken();
    }

}

错误处理

简单实现了一下

public class CommonErrorHandler implements ErrorHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(CommonErrorHandler.class);

    public void handleError(Throwable t) {
        LOGGER.error("occur error, ", t);
    }
}

优雅停机和避免重复消费

因为我们是k8s,容器重启的时候,可能会有多个容器并存,我担心多个容器同时去 watch,导致重复提醒,
所以加了优雅停机, 在重启的时候,让老容器不再去watch

逻辑就是

  1. 写一个 controller
  2. controller 调用接口IChangeStreamStopstop 方法

这样,k8s在对容器关停的时候,容器就能停止容器,避免重复消费;具体代码实现就不贴了

定义一个接口

public interface IChangeStreamStop {

    void stop();
}

所以可以看到前面的代码

ChangeStreamService implements IChangeStreamStop

k8s yaml 配置

containers:
- name: change-stream
  env:
    - name: aliyun_logs_xxx
      value: /ms/logs/*.log
  image: rxxx:latest
  imagePullPolicy: Always   
  ports:
    - containerPort: 8080
  #就绪检查
  readinessProbe:
    failureThreshold: 10
    httpGet:
      path: /xxx
      port: 8080
      scheme: HTTP
    initialDelaySeconds: 20
    periodSeconds: 2
    successThreshold: 1
    timeoutSeconds: 1
  #健康检查
  livenessProbe:
    failureThreshold: 10
    initialDelaySeconds: 20
    periodSeconds: 2
    successThreshold: 1
    tcpSocket:
      port: 8080
    timeoutSeconds: 1
  # 优雅停机
  lifecycle:
    preStop:
        httpGet:
            path: /xxx/stop
            port: 8080
            scheme: HTTP
  #资源限制
  resources:
    limits:
      memory: 2Gi
    requests:
      memory: 1500Mi

启动

那么 ChangeStreamService里的消息监听容器 什么时候启动起来呢?

定义一个接口

/**
 * IOC 容器启动之后会自动调用的接口
 * @author YellowTail
 * @since 2019-04-03
 */
public interface OnApplicationStarted {

    /**
     * <br>IOC 容器启动之后会调用的方法
     *
     * @author YellowTail
     * @since 2019-04-03
     */
    void onStarted();
}

对应代码

ChangeStreamService implements IChangeStreamStop, OnApplicationStarted 

具体自行实现

参考

mongodb changeStreams

spring data change-streams

相关文章

网友评论

      本文标题:MongoDB change stream 实战

      本文链接:https://www.haomeiwen.com/subject/tsrstktx.html