美文网首页
canal 源码解析 canal 的存储

canal 源码解析 canal 的存储

作者: pcgreat | 来源:发表于2020-01-04 18:03 被阅读0次

canal 当前只支持内存模式 , 能保存多少数据 ,是否可以从指定position后 在开始消费 ,
带着这些疑问 看了下MemoryEventStoreWithBuffer
这块 代码 设计的真是很简单 ,和disruptor 设计 感觉还是有差距的
RingBuffer 结构 , 默认队列 size 16*1024 默认内存 16m ,可以通过配置文件设置 。

    // 阻塞put/get操作控制信号
    private ReentrantLock     lock          = new ReentrantLock();
    private Condition         notFull       = lock.newCondition();
    private Condition         notEmpty      = lock.newCondition();
    // 记录下put/get/ack操作的三个下标
    private AtomicLong        putSequence   = new AtomicLong(INIT_SEQUENCE);             // 代表当前put操作最后一次写操作发生的位置
    private AtomicLong        getSequence   = new AtomicLong(INIT_SEQUENCE);             // 代表当前get操作读取的最后一条的位置
    private AtomicLong        ackSequence   = new AtomicLong(INIT_SEQUENCE);             // 代表当前ack操作的最后一条的位置

没有对下标做 cacheline 的填充 ,MemoryEventStoreWithBuffer 的读写线程在 put ,get 操作 之前 都会去 获取 ReentrantLock ,那么事实上 同时只有一个线程 能够操作 RingBuffer ,感觉这一块设计 有提升的地方

个人猜测 有几个位置 是Ha 方案的一个关键 ,
第一个 就是 上一个binlog msg确认位置
第二个 EventStore ringbuffer的消费确认位置 。
如果 canal server 1 down , canal server 2 可以从zk 获取 对应instance binlog msg position 去 获取数据 。
如果 某一个 canal client down , 那么 另一个 canal client 会启动 , 依然会从 EventStore ringbuffer的消费确认位置 去拉取消息 。
至少 这两个位置 应该是保存在 zk 上的 。
EventTransactionBuffer 会flush binlog postion 到 zk 。 但是如果我做的话 ,我应该会在client 调用 ack 方法的时候去同步这个position ,而不是在它放入eventstore 之前 放入 。这一点 需要在debug 的时候确认 。

public AbstractEventParser(){
        // 初始化一下
        transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() {

            public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
                boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);
                if (!running) {
                    return;
                }

                if (!successed) {
                    throw new CanalParseException("consume failed!");
                }

                LogPosition position = buildLastTransactionPosition(transaction);
                if (position != null) { // 可能position为空
                    logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
                }
            }
        });
    }

位点组件选择
canal.instance.global.spring.xml = classpath:spring/file-instance.xml

相关文章

网友评论

      本文标题:canal 源码解析 canal 的存储

      本文链接:https://www.haomeiwen.com/subject/vazxactx.html