一、时间戳是什么?
时间戳主要是标记数据的变动时间,同时它在实际的查询和操作等接口中,也有它的妙用。
- 数据库的某条数据变动了,或修改或删除,都应该更新它的修改时间
- 增量查询,每一次查询的时间都比上一次查询的时间要大,这样就不是查询全部数据
- 增量同步,每一次同步数据后,记住本次同步时间,下一次同步的时候,同步数据的范围就是时间戳大于上一次同步时间
示例,在mysql数据库中,时间戳就是字段modified_date,每次变更的时候,就会更新该字段为now()。
`created_date` datetime DEFAULT NULL COMMENT '创建时间',
`modified_date` datetime DEFAULT NULL COMMENT '更新时间',
二、编程实现
- mybatis
详请参考链接:https://www.cnblogs.com/myesn/p/mybatis-plus-auto-fill-field-value-and-soft-delete.html
/**
* 创建时间
*/
@TableField(value = "created_date", fill = FieldFill.INSERT)
private Date createdDate;
/**
* 更新时间
*/
@TableField(value = "modified_date", fill = FieldFill.INSERT_UPDATE)
private Date modifiedDate;
需要再实现MetaObjectHandler
@Component
public class MybatisPlusMetaObjectHandler implements MetaObjectHandler {
@Override
public void insertFill(MetaObject metaObject) {
this.strictInsertFill(metaObject, "createdDate", Date.class, DateUtil.date());
this.strictInsertFill(metaObject, "modifiedDate", Date.class, DateUtil.date());
}
@Override
public void updateFill(MetaObject metaObject) {
this.strictInsertFill(metaObject, "modifiedDate", Date.class, DateUtil.date());
}
}
- jpa
@CreatedDate
@Column(name = "created_date", columnDefinition = "datetime DEFAULT NOW() COMMENT '创建时间'")
private Date createdDate;
@LastModifiedDate
@CreatedDate
@Column(name = "modified_date", columnDefinition = "datetime DEFAULT NOW() COMMENT '更新时间'")
private Date modifiedDate;
值得注意的是,你如果有手动执行sql语句,也切记把修改时间更新为当前时间。
三、流程图
增量查询.png 增量同步的流程图.png四、使用场景
3.1、增量查询
不同于分页查询,增量查询的条件是根据时间戳大于传入的时间,也就是只查询变化的数据,而分页查询想要查询出变化了的数据,必须逐页查询所有页码。
举个例子,经常去逛图书馆的你,想要知道最近有些什么新书上架,而非逛遍整个图书馆。图书查询系统,如果有书的上架时间,每次查询新书,就只需根据拿上一次查询时间和图书的上架时间比较。
- 这种增量的方式,有个前提条件是客户端能够记住上一次的查询时间戳,在查询后,能够更新端侧的查询时间戳为最新,待上次增量的时候,传入时间戳参数。
SELECT * FROM xxx
WHERE
<![CDATA[
(created_date >= #{timestamp} OR modified_date >= #{timestamp} )
]]>
3.2、增量同步
mysql数据库是有事务的,elasticsearch不具备事务,往往我们会使用Mysql来保存、修改或删除数据,复杂查询则交给elasticsearch。其实也就是CQRS架构中,我们会需要同步mysql数据到elasticsearch。
- 当mysql数据库的数据变更时,发布event事件,订阅者消费事件,把最新的数据保存到es里。
- 利用xxl-job这种分布式的定时任务框架,定期对变更数据同步到elasticsearch里,做一个兜底补偿。
- 和上一个场景--增量查询不同的是,这里的时间戳需要服务端来保存和更新,上一个场景中的时间戳一般都是由客户端来保存和更新。
@XxlJob("orderSyncJobHandler")
@Transactional(rollbackFor = Throwable.class)
public ReturnT<String> syncES(String param) {
// 把同步时间保存在redis里
final String redisKey = indexConfiguration.getKey();
//本次同步时间
Date thisSyncDate = DateUtil.offsetMinute(new Date(), -1);
try {
List<Order> orderList;
ValueOperations<String, String> operations = redisHandler.getStringValueOperations();
//上一次同步时间
Date lastSyncDate;
int offsetMonth = 24;
// 由xxl-job手动指定同步时间
if (!StringUtils.isEmpty(param)) {
// 指定时间范围,单位是月,也可以是日,小时,分。
offsetMonth = Integer.parseInt(param);
lastSyncDate = DateUtil.offsetMonth(new Date(), -1 * offsetMonth);
} else {
// 从redis里取上一次的增量时间点
if (!StringUtils.isEmpty(operations.get(redisKey))) {
lastSyncDate = DateUtil.parse(operations.get(redisKey), DatePattern.NORM_DATETIME_PATTERN);
} else {
// 如果redis的key为空,默认初始化最近24个月的数据
lastSyncDate = DateUtil.offsetMonth(new Date(), -1 * offsetMonth);
}
}
// 防止数据量过大,这里采用分页查询,使用的是一个do-while循环。
Pageable pageRequest = PageRequest.of(0, 100, Sort.Direction.ASC, "modifiedAt");
do {
orderList = orderRepository.findByModifiedAtGreaterThanEqualAndModifiedAtLessThanEqual(
lastSyncDate,
thisSyncDate,
pageRequest);
// 同步最新数据到es
for (Order order : orderList) {
orderIndexService.sendToESFromDB(order);
}
pageRequest = pageRequest.next();
} while (!orderList.isEmpty());
// 更新本次的同步时间
operations.set(redisKey, DateUtil.format(thisSyncDate, DatePattern.NORM_DATETIME_PATTERN));
return new ReturnT<>("定时同步订单数据到ES成功");
} catch (Exception e) {
log.error("定时同步订单数据到ES失败", e);
}
return ReturnT.FAIL;
}
网友评论