美文网首页
PostgreSQL upsert (Insert 或者upda

PostgreSQL upsert (Insert 或者upda

作者: 山哥Samuel | 来源:发表于2022-02-12 19:13 被阅读0次

    最近的业务有kafka消息处理,由于源头那里无法控制幂等性和分区指定(因为经过了AWS Kinesis, 不是单纯的kafka),于是过来的消息有几个问题:

    • 消息乱序
    • 几条消息同时过来
    • 消息重复发到不同的partition. (如果是同一个partition, 可以用idempotent参数解决)

    那么这个数据在入库的时候,要做这件事:

    • 如果是全新的,insert进去
    • 如果数据库已经存在:
      • 比较ts (timestamp)字段,如果ts更大(新),就更新
      • 如果ts 字段更小,就舍弃
      • 如果ts字段跟数据库的相等:比较sequence_number, 如果更大,就更新数据库,否则舍弃

    要做这些逻辑,有很多解决方案,比如在分布式环境下,可以通过引入分布式锁来解决。而本文想要通过数据库原生的办法来解决这个问题。旨在学习一下postgresql的知识。

    先建个表来试试:(tracker_id加入唯一性约束,后面就试它)

    create table test(id SERIAL primary key, tracker_id varchar(37), last_sync_date bigint, ts bigint, sequence_number bigint);
    alter table test add constraint test_unique_tracker unique(tracker_id);
    

    那这句upsert要怎么写呢?通过google, 一般只看到存在就update的例子,很难找到update之前还有条件的。通过查找文档和试验,以下SQL是可以达到我们的目的的:

    注意:

    • set语句里面,等号左边直接是字段名,右边用excluded.字段名
    • where 语句里, 左边要用表名引用字段名,右边用excluded引用字段名
    insert into test (tracker_id, ts, sequence_number, last_sync_date)
    values ('324bf0d7-63db-40eb-a335-9a0a017b0e6c', 1644662828, 94875, 1644662860)
    on conflict (tracker_id)
    do 
      update
         set ts = EXCLUDED.ts,
             sequence_number = EXCLUDED.sequence_number,
             last_sync_date = EXCLUDED.last_sync_date
        where test.ts < EXCLUDED.ts
         or (test.ts = EXCLUDED.ts and test.sequence_number < EXCLUDED.sequence_number);
    

    通过修改values里面的值,你可以观察到,结果如我们所期望。

    Java Springboot 里面试试

    @Data
    @Entity
    @Table
    public class Test {
        @Id
        private long id;
        private String trackerId;
        private long ts;
        private long sequenceNumber;
        private long lastSyncDate;
    }
    
    // DTO
    @Data
    public class TestDto {
        private String trackerId;
        private long ts;
        private long sequenceNumber;
        private long lastSyncDate;
    }
    
    public interface TestRepository extends CrudRepository<Test, Long> {
        @Modifying
        @Query(value = "insert into test(tracker_id, ts, sequence_number, last_sync_date) " +
                "values (:trackerId, :ts, :sequenceNumber, :lastSyncDate) " +
                "on conflict (tracker_id) " +
                "do update " +
                "set ts=EXCLUDED.ts, " +
                "    sequence_number=EXCLUDED.sequence_number, " +
                "    last_sync_date=EXCLUDED.last_sync_date " +
                "where (test.ts<EXCLUDED.ts) " +
                "   or (test.ts=EXCLUDED.ts " +
                "      and test.sequence_number<EXCLUDED.sequence_number);",
                nativeQuery = true)
        void upsert(@Param("trackerId") String trackerId,
                    @Param("ts") long ts,
                    @Param("sequenceNumber") long sequenceNumber,
                    @Param("lastSyncDate") long lastSyncDate);
    }
    

    Controller 方法

        @PostMapping("/test")
        @Transactional
        public String upsert(@RequestBody TestDto dto) {
            repo.upsert(dto.getTrackerId(), dto.getTs(), dto.getSequenceNumber(), dto.getLastSyncDate());
            return "Done";
        }
    

    试试看


    Screen Shot 2022-02-12 at 7.13.14 PM.png

    大功告成!

    相关文章

      网友评论

          本文标题:PostgreSQL upsert (Insert 或者upda

          本文链接:https://www.haomeiwen.com/subject/ficxlrtx.html