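Recently our service has had to process Kafka messages. Because the source cannot guarantee idempotence or pin messages to a partition (the data passes through AWS Kinesis rather than plain Kafka), the incoming messages have several problems:
- Messages arrive out of order
- Several messages arrive at the same time
- Duplicate messages are delivered to different partitions. (Within a single partition, the producer's idempotent setting could handle this.)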
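So when persisting this data, we need to do the following:
- If the record is new, insert it
- If it already exists in the database:
  - Compare the ts (timestamp) field: if the incoming ts is larger (newer), update the row
  - If the incoming ts is smaller, discard the message
  - If the incoming ts equals the stored one, compare sequence_number: update the row if the incoming value is larger, otherwise discard the message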
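The rules above can be sketched as a small pure function, which makes the three possible outcomes explicit before any SQL is involved. This is only an illustration: the names UpsertRule, Version, Action and decide are hypothetical and do not appear elsewhere in this post.

```java
import java.util.Optional;

public class UpsertRule {

    /** Hypothetical value type mirroring the ts and sequence_number columns. */
    static final class Version {
        final long ts;
        final long sequenceNumber;

        Version(long ts, long sequenceNumber) {
            this.ts = ts;
            this.sequenceNumber = sequenceNumber;
        }
    }

    enum Action { INSERT, UPDATE, DISCARD }

    /** Decides what to do with an incoming message given the stored row, if any. */
    static Action decide(Optional<Version> stored, Version incoming) {
        if (!stored.isPresent()) {
            return Action.INSERT;                 // brand-new record
        }
        Version current = stored.get();
        if (incoming.ts != current.ts) {
            // A newer ts wins, an older ts is dropped.
            return incoming.ts > current.ts ? Action.UPDATE : Action.DISCARD;
        }
        // Same ts: sequence_number breaks the tie; an equal value is a duplicate.
        return incoming.sequenceNumber > current.sequenceNumber
                ? Action.UPDATE
                : Action.DISCARD;
    }

    public static void main(String[] args) {
        Optional<Version> stored = Optional.of(new Version(1644662828L, 94875L));
        System.out.println(decide(Optional.empty(), new Version(1644662828L, 94875L))); // INSERT
        System.out.println(decide(stored, new Version(1644662900L, 1L)));               // UPDATE
        System.out.println(decide(stored, new Version(1644662000L, 99999L)));           // DISCARD
        System.out.println(decide(stored, new Version(1644662828L, 94876L)));           // UPDATE
        System.out.println(decide(stored, new Version(1644662828L, 94875L)));           // DISCARD
    }
}
```

Doing this check in application code would require a read followed by a write, which is exactly where concurrent messages can race; that is the motivation for pushing the comparison into a single database statement.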
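There are many ways to implement this logic; in a distributed environment, for example, one could introduce a distributed lock. This post instead solves the problem with the database's native features, mainly as a way to learn some PostgreSQL.
First, create a table to experiment with (tracker_id gets a unique constraint, which is what we will test against):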
create table test(id SERIAL primary key, tracker_id varchar(37), last_sync_date bigint, ts bigint, sequence_number bigint);
alter table test add constraint test_unique_tracker unique(tracker_id);
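So how should the upsert be written? Searching Google mostly turns up examples that update unconditionally when the row exists; examples that put a condition on the update are hard to find. After reading the documentation and experimenting, the following SQL achieves what we want.
Note:
- In the SET clause, the left side of the equals sign is the bare column name, and the right side uses excluded.column_name
- In the WHERE clause, the left side references the column through the table name, and the right side references it through excluded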
insert into test (tracker_id, ts, sequence_number, last_sync_date)
values ('324bf0d7-63db-40eb-a335-9a0a017b0e6c', 1644662828, 94875, 1644662860)
on conflict (tracker_id)
do update
    set ts = EXCLUDED.ts,
        sequence_number = EXCLUDED.sequence_number,
        last_sync_date = EXCLUDED.last_sync_date
    where test.ts < EXCLUDED.ts
       or (test.ts = EXCLUDED.ts and test.sequence_number < EXCLUDED.sequence_number);
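By changing the values in the VALUES clause and re-running the statement, you can see that the results match our expectations: a larger ts (or the same ts with a larger sequence_number) overwrites the row, and anything else leaves it untouched.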
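To reason about what those experiments should show without a database at hand, here is a minimal in-memory sketch. It is a stand-in for the table, not PostgreSQL itself: UpsertReplay, Row, upsert and replay are hypothetical names, and a HashMap keyed by tracker id plays the role of the unique constraint. The point it illustrates is that the final row is the same regardless of arrival order or duplicates.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpsertReplay {

    /** Hypothetical in-memory row with the same columns the upsert writes. */
    static final class Row {
        final long ts;
        final long sequenceNumber;
        final long lastSyncDate;

        Row(long ts, long sequenceNumber, long lastSyncDate) {
            this.ts = ts;
            this.sequenceNumber = sequenceNumber;
            this.lastSyncDate = lastSyncDate;
        }
    }

    /** Mimics the conditional upsert: insert if absent, replace only if strictly newer. */
    static void upsert(Map<String, Row> table, String trackerId, Row incoming) {
        table.merge(trackerId, incoming, (stored, in) ->
                (stored.ts < in.ts
                        || (stored.ts == in.ts && stored.sequenceNumber < in.sequenceNumber))
                        ? in
                        : stored);
    }

    /** Applies the messages in the given order and returns the surviving row. */
    static Row replay(List<Row> messages) {
        Map<String, Row> table = new HashMap<>();
        for (Row message : messages) {
            upsert(table, "tracker-1", message);
        }
        return table.get("tracker-1");
    }

    public static void main(String[] args) {
        Row a = new Row(100L, 1L, 110L);
        Row b = new Row(100L, 2L, 120L);   // same ts as a, larger sequence number
        Row c = new Row(200L, 1L, 210L);   // newest ts

        System.out.println(replay(List.of(a, b, c)) == c);        // true
        System.out.println(replay(List.of(c, a, b, a, c)) == c);  // true
        System.out.println(replay(List.of(b, a)) == b);           // true
    }
}
```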
Trying it in Java Spring Boot
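@Data
@Entity
@Table
public class Test {
    // id is a SERIAL column, so let the database generate it
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private long id;
    private String trackerId;
    private long ts;
    private long sequenceNumber;
    private long lastSyncDate;
}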
// DTO
@Data
public class TestDto {
    private String trackerId;
    private long ts;
    private long sequenceNumber;
    private long lastSyncDate;
}
public interface TestRepository extends CrudRepository<Test, Long> {
    // Native query: insert the row, or update it only when the incoming
    // (ts, sequence_number) pair is newer than the stored one.
    @Modifying
    @Query(value = "insert into test(tracker_id, ts, sequence_number, last_sync_date) " +
            "values (:trackerId, :ts, :sequenceNumber, :lastSyncDate) " +
            "on conflict (tracker_id) " +
            "do update " +
            "set ts=EXCLUDED.ts, " +
            "    sequence_number=EXCLUDED.sequence_number, " +
            "    last_sync_date=EXCLUDED.last_sync_date " +
            "where (test.ts<EXCLUDED.ts) " +
            "   or (test.ts=EXCLUDED.ts " +
            "       and test.sequence_number<EXCLUDED.sequence_number);",
            nativeQuery = true)
    void upsert(@Param("trackerId") String trackerId,
                @Param("ts") long ts,
                @Param("sequenceNumber") long sequenceNumber,
                @Param("lastSyncDate") long lastSyncDate);
}
Controller method
@PostMapping("/test")
@Transactional
public String upsert(@RequestBody TestDto dto) {
    repo.upsert(dto.getTrackerId(), dto.getTs(), dto.getSequenceNumber(), dto.getLastSyncDate());
    return "Done";
}
Try it out with a POST request to /test.
All done!