canal 当前只支持内存模式 , 能保存多少数据 ,是否可以从指定position后 在开始消费 ,
带着这些疑问 看了下MemoryEventStoreWithBuffer
这块 代码 设计的真是很简单 ,和disruptor 设计 感觉还是有差距的
RingBuffer 结构 , 默认队列 size 16*1024 默认内存 16m ,可以通过配置文件设置 。
// 阻塞put/get操作控制信号
private ReentrantLock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
// 记录下put/get/ack操作的三个下标
private AtomicLong putSequence = new AtomicLong(INIT_SEQUENCE); // 代表当前put操作最后一次写操作发生的位置
private AtomicLong getSequence = new AtomicLong(INIT_SEQUENCE); // 代表当前get操作读取的最后一条的位置
private AtomicLong ackSequence = new AtomicLong(INIT_SEQUENCE); // 代表当前ack操作的最后一条的位置
没有对下标做 cacheline 的填充 ,MemoryEventStoreWithBuffer 的读写线程在 put ,get 操作 之前 都会去 获取 ReentrantLock ,那么事实上 同时只有一个线程 能够操作 RingBuffer ,感觉这一块设计 有提升的地方
个人猜测 有几个位置 是Ha 方案的一个关键 ,
第一个 就是 上一个binlog msg确认位置
第二个 EventStore ringbuffer的消费确认位置 。
如果 canal server 1 down , canal server 2 可以从zk 获取 对应instance binlog msg position 去 获取数据 。
如果 某一个 canal client down , 那么 另一个 canal client 会启动 , 依然会从 EventStore ringbuffer的消费确认位置 去拉取消息 。
至少 这两个位置 应该是保存在 zk 上的 。
EventTransactionBuffer 会flush binlog postion 到 zk 。 但是如果我做的话 ,我应该会在client 调用 ack 方法的时候去同步这个position ,而不是在它放入eventstore 之前 放入 。这一点 需要在debug 的时候确认 。
public AbstractEventParser(){
// 初始化一下
transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() {
public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);
if (!running) {
return;
}
if (!successed) {
throw new CanalParseException("consume failed!");
}
LogPosition position = buildLastTransactionPosition(transaction);
if (position != null) { // 可能position为空
logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
}
}
});
}
位点组件选择
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
网友评论