写这篇文章的主要目的是因为在自己使用Debezium的时候遇见的问题,网上的案例又比较少,自己决定写一篇文章,希望可以帮助到其他人,因为是第一次写博客,有什么不足的地方希望见谅或提出宝贵意见。
一、需求和软件的版本
(1)需求:监控sellInfo表的部分列,当这些列新增或更新或删除的时候,kafka生成一条变更信息到消费端,业务代码拿到这些信息来更新Elasticsearch
(2)软件版本:sqlServer版本,Microsoft SQL Server 2016 (SP1) ;
kafka版本,kafka2.12-2.5.0;
Debezium,JAR版本,debezium-debezium-connector-sqlserver-1.2.1
二、开启CDC
(1)为数据库开启CDC(需要sysadmin权限)
EXEC sys.sp_cdc_enable_db
(2)为数据库表开启CDC
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo', --数据库名
@source_name = 'SellInfo', --数据库表名
@capture_instance = default, --默认实例
@role_name = NULL, --角色权限,null表示不设置权限
@captured_column_list = 'ArticleId,HtmlID,Title' --为哪些列开启监控
sellInfo开启cdc成功的标志,数据库出现cdc库,如图所示,这是发生update操作时,cdc会做记录。
二、安装kafka和开启kafka-connect
(1)下载kafka,并解压,重命名
当前路径 /root
tar zxvf kafka_2.12-2.5.0.tgz -C ./
mv kafka_2.12-2.5.0 kafka
(2)修改kafka/config/server.properties
listeners = PLAINTEXT://192.168.16.2:9092
advertised.listeners=PLAINTEXT://192.168.16.2:9092
(3)开启zookeeper,开启kafka
当前路径 /root/kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
(4)下载Debezium JAR包
JAR下载地址,下载jar,并解压到 /kafka/kafka_connect_plugins(自己创建一个文件夹)
(5)修改kafka/config/connect-distribute.properties
bootstrap.servers=192.168.16.2:9092
plugin.path= /root/kafka/kafka_connect_plugins
(6)启动kafka-connect
当前路径 /root/kafka
bin/connect-distributed.sh -daemon config/connect-distributed.properties
开启成功的标志,postman可以使用访问端口(默认的端口号,可以在connect-distributed.properties 更改)
(7)编辑connect的属性
当前路径 /root/kafka
创建一个json文件,vi sellInfo.json
{
"name": "sqlserver-cdc-dbo-sellInfo",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "xx.xx.xx.xx", ---数据库ip
"database.port": "1433", ----端口号
"database.user": "xxxx", ----数据库用户名
"database.password": "xxxx", ----数据库密码
"database.dbname": "idea_test", -----数据库名
"database.server.name": "fullfillment",
"table.whitelist": "dbo.SellInfo", -----数据库表白名单
"snapshot.mode":"schema_only", ----快照方式,inital--表数据全量更新,schema_only--表数据开启cdc后的更新
"column.blacklist":"dbo.SellInfo.xxx,dbo.SellInfo.xxx" ----不需要监控的表的字段(下面有我用的时候遇见的问题)
"database.history.kafka.bootstrap.servers": "192.168.16.2:9092",
"database.history.kafka.topic": "dbhistory.fullfillment",
"value.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"event.processing.failure.handing.mode":"warn"
}
}
更多数据设置可以参考Debezium的官方文档:
用于SQL Server的Debezium连接器(里面还有一些其他的连接器,mysql等)
(8)执行连接器
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.16.2:8083/connectors/ -d @sellInfo.json
开启成功的标志:
更多postman关于连接器的操作可以参考这篇文章,帮助我很多
sqlserver增量订阅&消费实时同步kafka,最新解决方案,看完不会你打我!
三、验证成果
1、update [dbo].SellInfo set Audit = 2 where ArticleId = 1 (可以是新增,可以是删除)
2、查看当前的有哪些topic
此时路径 /root/kafka
bin/kafka-topics.sh --zookeeper localhost --list
__consumer_offsets ----默认创建
connect-configs ----默认创建
connect-offsets ----默认创建
connect-status ----默认创建
dbhistory.fullfillment ----记录表的结构信息
fullfillment ----记录表的结构信息
fullfillment.dbo.SellInfo -----SellInfo表改变的记录
查看fullfillment.dbo.SellInfo的信息:
当前路径 /root/kafka
bin/kafka-console-consumer.sh --bootstrap-server 192.168.16.2:9092 --topic fullfillment.dbo.SellInfo --from-beginning
四、总结
(1)第一次搞这种冷门的东西,刚开始确实有点头疼,后来明白,一定要学会参考别人的东西,查看官方文档,制定流程分阶段进行等,这也是我为什么要写这篇文章的原因
(2)可能出现的问题,CDC的开启,如果在执行命令的时候出现错误、按照给出的错误信息去baidu;kafka的安装与运行 --daemon是可以让程序离开shell界面后台运行,但是不会显示开启时的日志;kafka-connect的配置文件sellInfo.json中的'column.blacklist'字段出现问题,比如一张表a,id,name,age,phone,我CDC监控id,age ,’column.blacklist‘剩下的字段,运行connect同步数据时会出错,我看了源码感觉应该是一个bug,当然很有可能是有些配置不知道怎么使用导致的,最后我通过分析源码将表的结构变成id,age,name,phone避免了这种异常,错误代码我贴出来,希望有人可以解答这个问题。
08-03 12:41:56,160] ERROR Error requesting a row value, row: 3, requested index: 15 at position 3 (io.debezium.relational.TableSchemaBuilder:221) [2020-08-03 12:41:56,160] ERROR Producer failure (io.debezium.pipeline.ErrorHandler:31) org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, event_serial_no=2, commit_lsn=000003cb:000ec557:0003, change_lsn=000003cb:000ec557:0002} at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:220) at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:247) at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:524) at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:190) at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:162) at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:108) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:222) at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:251) at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:143) at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:97) at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:51) at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:193) ... 10 more
网友评论