合并报表执行合并前的重分类操作通常采用批处理方式,往往需要按T+1或按小时周期处理,按T+1更新数据无法满足月结期间财务人员的对账需求,按小时全量加载数据造成系统压力过大用户体验不佳,增量加载又需要开发复杂的增量处理逻辑,故需要一种简单高效的实时重分类方案。Flink从1.11开始加入了对CDC(Change Data Capture,变更数据获取)的原生支持,使得流处理的实时重分类计算成为可能。
方案介绍:
image.png合并报表实时重分类面临的一大挑战是如何获取会计凭证的changelog数据流,此方案中使用Debezium实时采集Postgresql数据库的wal日志,将格式化后的changelog流写入Kafka的指定Topic中,使用FlinkSQL将凭证流关联相关维表后通过重分类逻辑将按维度汇总的数据流输出到HANA目标表中。
Debezium:
在本方案中的作用是通过生成一个KafkaConnector以Postgres为Source,Kafka为Sink,将wal日志解析成debezium的标准格式写入到kafka中,可以使用Json或Avro格式,这里使用Avro格式,相比Json格式,Avro格式表的Schema信息不用冗余存放,只存放数据信息,可大大节省数据占用在kafka的磁盘空间,提高传输效率。
创建Debezium Kafka Connector的代码
{
"name": "inventory-connector",
"config": {
"name": "inventory-connector",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.dbname": "postgres",
"database.user": "***",
"database.password": "***",
"database.server.name" : "postgres",
"database.include.list": "public",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"table.include.list": "public.acdoca",
"snapshot.mode": "always",
"producer.override.acks": "1",
"producer.override.compression.type": "snappy",
"producer.override.linger.ms": "50",
"producer.override.batch.size": "327680",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081",
"decimal.handling.mode": "string"
}
}
Kafka:
创建KafkaConnector后kafka首先会启动一个SnapshotTask,对源表加锁,并记录日志当前的LSN位置,然后执行全量查询,获取全量数据,释放锁后通过之前记录的LSN位置开启实时日志采集任务。因为同步到kafka的Topic中partitions只有一个,所以保证了数据写入的顺序。
写入到Kafka的数据:
Flink:
负责从Kafka获取凭证数据并关联相关维表,考虑到有的维表数据量大及维表后续数据更新问题,通过缓存+LRU算法解决。使用flinksql创建源表source_acdoca,维表dim_prps,目标表dwd_final,通过运行聚合函数将源表和维表关联后的数据写入dwd_final,flink可以自动识别并处理changelog流,当源表更新后,目标表自动按主键实时更新。
String acdoca_ddl = "create table if not exists source_acdoca(\n" +
"rclnt string comment 'rclnt',\n" +
"rldnr string comment 'rldnr',\n" +
"rbukrs string comment 'rbukrs',\n" +
"gjahr string comment 'gjahr',\n" +
"belnr string comment 'belnr',\n" +
"docln string comment 'docln',\n" +
"poper string comment 'poper',\n" +
"ps_posid string comment 'ps_posid',\n" +
"ps_pspid string comment 'ps_pspid',\n" +
"hsl string comment 'hsl',\n" +
"PRIMARY KEY (rbukrs,gjahr,belnr,docln) NOT ENFORCED\n" +
")\n" +
"with(\n" +
"'connector' = 'kafka',\n" +
"'format' = 'debezium-avro-confluent',\n" +
"'debezium-avro-confluent.schema-registry.url' = 'http://confluent:8081/',\n" +
"'topic' = 'postgres.public.acdoca',\n" +
"'properties.bootstrap.servers' = 'kafka:9092',\n" +
"'properties.group.id' = 'flink-analysis',\n" +
"'scan.startup.mode' = 'earliest-offset'\n" +
")";
String prps_ddl = "create table if not exists dim_prps(\n" +
"pspnr string comment 'pspnr',\n" +
"posid string comment 'posid',\n" +
"usr02 string comment 'usr02',\n" +
"PRIMARY KEY (pspnr) NOT ENFORCED\n" +
")\n" +
"with(\n" +
"'connector.type' = 'jdbc',\n" +
"'connector.url' = 'jdbc:postgresql://postgres:5432/postgres',\n" +
"'connector.table' = 'public.prps',\n" +
"'connector.driver' = 'org.postgresql.Driver',\n" +
"'connector.username' = '***',\n" +
"'connector.password' = '***',\n" +
//"'connector.scan.fetch-size' = '200',\n" +
"'connector.lookup.cache.max-rows' = '1000',\n" +
"'connector.lookup.cache.ttl' = '60000'\n" +
")";
String mysqlDDL = "CREATE TABLE if not exists dwd_final (\n" +
"gjahr string comment 'gjahr',\n" +
"poper string comment 'poper',\n" +
"rbukrs string comment 'rbukrs',\n" +
"usr02 string comment 'usr02',\n" +
"ps_posid string comment 'ps_posid',\n" +
"ps_pspid string comment 'ps_pspid',\n" +
"hsl decimal(10,2) comment 'hsl',\n" +
"PRIMARY KEY (gjahr,poper,rbukrs,usr02) NOT ENFORCED\n" +
") WITH (\n" +
"'connector' = 'jdbc',\n" +
"'url' = 'jdbc:mysql://mysql:3308/mysql?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8&useSSL=false',\n" +
"'table-name' = 'dwd_final',\n" +
"'driver' = 'com.mysql.jdbc.Driver',\n" +
"'username' = '***',\n" +
"'password' = '***'\n" +
")";
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql(acdoca_ddl);
tEnv.executeSql(prps_ddl);
tEnv.executeSql(mysqlDDL);
tEnv.executeSql("insert into dwd_final select t1.gjahr,t1.poper,t1.rbukrs,t1.ps_posid,t1.ps_pspid,t2.usr02,sum(cast(t1.hsl as DECIMAL(10,2))) as hsl from source_acdoca as t1 inner join dim_prps as t2 on t1.ps_posid=t2.posid group by t1.gjahr,t1.poper,t1.rbukrs,t1.ps_posid,t1.ps_pspid,t2.usr02");
网友评论