Canal Series 4 - canal-adapter consuming MQ messages, 同

Author: 尹楷楷 | Published: 2023-09-28 22:11

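Scenario:
Earlier posts set up mysql -> deployer -> adapter -> es and deployer -> rabbitmq. This post combines the two to get:
mysql -> deployer -> rabbitmq -> adapter -> es.
In other words, MySQL changes flow to canal-deployer, canal-deployer publishes them to RabbitMQ, and canal-adapter consumes the messages and writes them to ES.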

canal.adapter-1.1.5\conf\application.yml

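  • Set canal.conf.mode to rabbitMQ so that the adapter pulls data from RabbitMQ.
  • Fill in the basic RabbitMQ connection settings. Do not append a port to the host; the default port is used.
  • Configure the MySQL data source as usual. Note the key defaultDS, which is referenced later.
  • canalAdapters.instance is set to the RabbitMQ queue name, which is referenced later.
  • canalAdapters.groups.groupId is also referenced later.
  • The ES connection settings are straightforward. Note key: exampleKey, which is referenced later.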
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: rabbitMQ #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 
    kafka.auto.offset.reset: 
    kafka.request.timeout.ms: 
    kafka.session.timeout.ms: 
    kafka.isolation.level: 
    kafka.max.poll.records: 
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 
    rocketmq.batch.size: 
    rocketmq.enable.message.trace: 
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host: 127.0.0.1
    rabbitmq.virtual.host: /
    rabbitmq.username: yinkai
    rabbitmq.password: yinkai
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS: # note: referenced in book.yml as the data source that runs the mapping SQL
      url: jdbc:mysql://127.0.0.1:3306/db_example?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: canal_queue  # canal instance name or MQ topic name; here it is the RabbitMQ queue name, also referenced in book.yml
    groups:
    - groupId: g1 # note: also referenced in book.yml
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es7
        key: exampleKey # key of the ES target (the sink side); also referenced in book.yml
        hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # transport or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: geektime
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address
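
With flatMessage: true, the adapter reads JSON messages from the queue rather than binary protobuf entries. The sketch below is only an illustration of that message shape, not the adapter's implementation: the field names (database, table, type, isDdl, pkNames, data, old) follow canal's FlatMessage as I understand it, every value is invented, and rows_to_index is a hypothetical helper.

```python
import json

# Hypothetical flat message for one INSERT on db_example.book.
# All values are made up; column values arrive as strings.
raw = json.dumps({
    "database": "db_example",
    "table": "book",
    "type": "INSERT",
    "isDdl": False,
    "pkNames": ["id"],
    "es": 1695910260000,   # binlog event time in ms (invented)
    "ts": 1695910260123,   # message build time in ms (invented)
    "data": [{"id": "1", "name": "demo", "year": "2023"}],
    "old": None,
})


def rows_to_index(message: str, table: str = "book"):
    """Return (operation, primary key, row) tuples for DML events on one table."""
    event = json.loads(message)
    # Skip DDL statements and events for other tables.
    if event.get("isDdl") or event.get("table") != table:
        return []
    pk = event["pkNames"][0]
    return [(event["type"], row[pk], row) for row in event.get("data") or []]


print(rows_to_index(raw))
```

Printing a few messages from the queue in this form is a quick way to confirm that the deployer is publishing flat JSON before starting the adapter.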

my.canal.adapter-1.1.5\conf\es7\book.yml

  • This book.yml is named after the MySQL table.
  • Details are in the inline comments.
dataSourceKey: defaultDS        # key of the source data source; matches the entry under srcDataSources in application.yml
outerAdapterKey: exampleKey     # matches the key of the es7 adapter in application.yml
# set this to the RabbitMQ queue name
destination: canal_queue            # canal instance or MQ topic; matches canalAdapters.instance in application.yml
groupId: g1                        # groupId used in MQ mode; only data for this groupId is synced (matches application.yml)
esMapping:
  _index: book           # ES index name
  _type: _doc                   # ES type name; not needed on ES 7
  _id: _id                      # ES _id; if omitted, the pk entry below must be set and ES assigns the _id automatically
#  pk: id                       # if _id is not used, name one field as the primary key
  # SQL mapping (%i is the MySQL minutes specifier; %I would be the 12-hour hour)
  sql: "select a.id as _id, a.name, a.year, a.last_updated,a.name2,a.name3,a.name4,a.name5,a.name6,date_format(a.create_time,'%Y-%m-%d %H:%i:%S') as create_time,a.my_json from book a"
#  objFields:
#    _labels: array:;           # array or object field; array:; means the field value is split on ';'
#    _obj: object               # JSON object
  etlCondition: "where a.last_updated>='{0}'"     # condition parameter for ETL
  commitBatch: 3000                         # commit batch size
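
Four values have to agree between application.yml and book.yml: the data source key, the ES adapter key, the queue name, and the groupId. A mismatch in any of them is an easy mistake to make. The following is a minimal sketch of that cross-check. The values are copied by hand from the two files, and the dictionary layout and the find_mismatches helper are hypothetical, not part of canal.

```python
# Values copied by hand from application.yml (the dict layout is hypothetical).
application = {
    "srcDataSources": ["defaultDS"],
    "instance": "canal_queue",
    "groupId": "g1",
    "es7_key": "exampleKey",
}

# Values copied by hand from es7/book.yml.
book = {
    "dataSourceKey": "defaultDS",
    "outerAdapterKey": "exampleKey",
    "destination": "canal_queue",
    "groupId": "g1",
}


def find_mismatches(app: dict, mapping: dict) -> list:
    """Return the book.yml fields whose value does not match application.yml."""
    problems = []
    if mapping["dataSourceKey"] not in app["srcDataSources"]:
        problems.append("dataSourceKey")
    if mapping["outerAdapterKey"] != app["es7_key"]:
        problems.append("outerAdapterKey")
    if mapping["destination"] != app["instance"]:
        problems.append("destination")
    if mapping["groupId"] != app["groupId"]:
        problems.append("groupId")
    return problems


print(find_mismatches(application, book))
```

An empty list means the four references line up. Any field name in the output points at the value to correct in book.yml.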
