前序文章陆续介绍了批处理的基本概念,Job使用、Step、Item的结构以及文件的读写。本文将接着前面的内容说明数据库如何进行批处理读写,这也是日常使用最多的场景,利用数据库批量读写实现数据库同步,业务结算类的工作。
1.数据读取
数据库是绝大部分系统要用到的数据存储工具,因此针对数据库执行批量数据处理任务也是很常见的需求。数据的批量处理与常规业务开发不同,如果一次性读取百万条,对于任何系统而言肯定都是不可取的。为了解决这个问题Spring Batch提供了2套数据读取方案:
基于游标读取数据
基于分页读取数据
1.1.自定义数据源
往往实际工作场景中,使用的都是动态的数据源或者是多库读取,对于Reader而言是可以自定义数据源的存在,无论是基于游标还是分页的读取器都是可以手动注入数据源
2.游标读取数据
对于有经验大数据工程师而言数据库游标的操作应该是非常熟悉的,因为这是从数据库读取数据流标准方法,而且在Java中也封装了ResultSet这种面向游标操作的数据结构。
ResultSet一直都会指向结果集中的某一行数据,使用next方法可以让游标跳转到下一行数据。Spring Batch同样使用这个特性来控制数据的读取:
1.在初始化时打开游标。
2.每一次调用ItemReader::read方法就从ResultSet获取一行数据并执行next。
3.返回可用于数据处理的映射结构(map、dict)。
在一切都执行完毕之后,框架会使用回调过程调用ResultSet::close来关闭游标。由于所有的业务过程都绑定在一个事物之上,所以知道到Step执行完毕或异常退出调用执行close。下图展示了数据读取的过程:
分页
SQL语句的查询结果称为数据集(对于大部分数据库而言,其SQL执行结果会产生临时的表空间索引来存放数据集)。游标开始会停滞在ID=2的位置,一次ItemReader执行完毕后会产生对应的实体FOO2,然后游标下移直到最后的ID=6。最后关闭游标。
2.1.JdbcCursorItemReader
JdbcCursorItemReader是使用游标读取数据集的ItemReader实现类之一。它使用JdbcTemplate中的DataSource控制ResultSet,其过程是将ResultSet的每行数据转换为所需要的实体类。
JdbcCursorItemReader的执行过程有三步:
1.通过DataSource创建JdbcTemplate。
2.设定数据集的SQL语句。
3.创建ResultSet到实体类的映射。 大致如下:
//庸才周久一
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
除了上面的代码,JdbcCursorItemReader还有其他属性:
JdbcCursorItemReader.png
在运行代码之前请先在数据库中执行以下DDL语句,并添加部分测试数据。
CREATE TABLE `tmp_test_weather` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`siteid` varchar(64) NOT NULL COMMENT '业务主键',
`month` varchar(64) NOT NULL COMMENT '日期',
`type` varchar(64) NOT NULL COMMENT '气象类型',
`value` int(11) NOT NULL COMMENT '值',
`ext` varchar(255) DEFAULT NULL COMMENT '扩展数据',
PRIMARY KEY (`id`)
) ;
运行代码:
//庸才周久一
public class JdbcReader {
@Bean
public RowMapper<WeatherEntity> weatherEntityRowMapper() {
return new RowMapper<WeatherEntity>() {
public static final String SITEID_COLUMN = "siteId"; // 设置映射字段
public static final String MONTH_COLUMN = "month";
public static final String TYPE_COLUMN = "type";
public static final String VALUE_COLUMN = "value";
public static final String EXT_COLUMN = "ext";
@Override
// 数据转换
public WeatherEntity mapRow(ResultSet resultSet, int rowNum) throws SQLException {
WeatherEntity weatherEntity = new WeatherEntity();
weatherEntity.setSiteId(resultSet.getString(SITEID_COLUMN));
weatherEntity.setMonth(resultSet.getString(MONTH_COLUMN));
weatherEntity.setType(WeatherEntity.Type.valueOf(resultSet.getString(TYPE_COLUMN)));
weatherEntity.setValue(resultSet.getInt(VALUE_COLUMN));
weatherEntity.setExt(resultSet.getString(EXT_COLUMN));
return weatherEntity;
}
};
}
@Bean
public ItemReader<WeatherEntity> jdbcCursorItemReader(
@Qualifier("weatherEntityRowMapper") RowMapper<WeatherEntity> rowMapper, DataSource datasource) {
JdbcCursorItemReader<WeatherEntity> itemReader = new JdbcCursorItemReader<>();
itemReader.setDataSource(datasource); //设置DataSource
//设置读取的SQL
itemReader.setSql("SELECT siteId, month, type, value, ext from TMP_TEST_WEATHER");
itemReader.setRowMapper(rowMapper); //设置转换
return itemReader;
}
}
2.2.HibernateCursorItemReader
在Java体系中数据库操作常见的规范有JPA或ORM,Spring Batch提供了HibernateCursorItemReader来实现HibernateTemplate,它可以通过Hibernate框架进行游标的控制。
需要注意的是:使用Hibernate框架来处理批量数据到目前为止一直都有争议,核心原因是Hibernate最初是为在线联机事物型系统开发的。不过这并不意味着不能使用它来处理批数据,解决此问题就是让Hibernate使用StatelessSession用来保持游标,而不是standard session一次读写,这将导致Hibernate的缓存机制和数据脏读检查失效,进而影响批处理的过程。关于Hibernate的状态控制机制请阅读官方文档。
HibernateCursorItemReader使用过程与JdbcCursorItemReader没多大差异都是逐条读取数据然后控制状态链接关闭。只不过他提供了Hibernate所使用的HSQL方案。
@Bean
public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
HibernateCursorItemReader<WeatherEntity> itemReader = new HibernateCursorItemReader<>();
itemReader.setName("hibernateCursorItemReader");
itemReader.setQueryString("from WeatherEntity tmp_test_weather");
itemReader.setSessionFactory(sessionFactory);
return itemReader;
}
或
public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
return new HibernateCursorItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.sessionFactory(sessionFactory)
.queryString("from CustomerCredit")
.build();
}
如果没有特别的需要,不推荐使用Hibernate。
2.3.StoredProcedureItemReader
存储过程是在同一个数据库中处理大量数据的常用方法。StoredProcedureItemReader
的执行过程和JdbcCursorItemReader
一致,但是底层逻辑是先执行存储过程,然后返回存储过程执行结果游标。不同的数据库存储过程游标返回会有一些差异:
- 作为一个
ResultSet
返回。(SQL Server, Sybase, DB2, Derby以及MySQL) - 参数返回一个
ref-cursor
实例。比如Oracle、PostgreSQL数据库,这类数据库存储过程是不会直接return任何内容的,需要从传参获取。 - 返回存储过程调用后的返回值。
针对以上3个类型,配置上有一些差异:
//庸才周久一
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_processor_weather");
reader.setRowMapper(new weatherEntityRowMapper());
reader.setRefCursorPosition(1);//第二种类型需要指定ref-cursor的参数位置
reader.setFunction(true);//第三种类型需要明确的告知reader通过返回获取
return reader;
}
使用存储过程处理数据的好处是可以实现针对库内的数据进行合并、分割、排序等处理。如果数据在同一个数据库,性能也明显好于通过Java处理。
3.1.JdbcPagingItemReader
分页查询的默认实现类是JdbcPagingItemReader,它的核心功能是用分页器PagingQueryProvider进行分页控制。由于不同的数据库分页方法差别很大,所以针对不同的数据库有不同的实现类。框架提供了SqlPagingQueryProviderFactoryBean用于检查当前数据库并自动注入对应的PagingQueryProvider。
JdbcPagingItemReader会从数据库中一次性读取一整页的数据,但是调用Reader的时候还是会一行一行的返回数据。框架会自行根据运行情况确定什么时候需要执行下一个分页的查询。
//庸才周久一
public class PageReader {
final private boolean wrapperBuilder = false;
@Bean
//设置 queryProvider
public SqlPagingQueryProviderFactoryBean queryProvider(DataSource dataSource) {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setDataSource(dataSource);
provider.setSelectClause("select id, siteid, month, type, value, ext");
provider.setFromClause("from tmp_test_weather");
provider.setWhereClause("where id>:start");
provider.setSortKey("id");
return provider;
}
@Bean
public ItemReader<WeatherEntity> jdbcPagingItemReader(DataSource dataSource,
PagingQueryProvider queryProvider,
RowMapper<WeatherEntity> rowMapper) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("start", "1");
JdbcPagingItemReader<WeatherEntity> itemReader;
if (wrapperBuilder) {
itemReader = new JdbcPagingItemReaderBuilder<WeatherEntity>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(rowMapper)
.pageSize(1000)
.build();
} else {
itemReader = new JdbcPagingItemReader<>();
itemReader.setName("weatherEntityJdbcPagingItemReader");
itemReader.setDataSource(dataSource);
itemReader.setQueryProvider(queryProvider);
itemReader.setParameterValues(parameterValues);
itemReader.setRowMapper(rowMapper);
itemReader.setPageSize(1000);
}
return itemReader;
}
}
4.数据写入
Spring Batch为不同类型的文件的写入提供了多个实现类,但并没有为数据库的写入提供任何实现类,而是交由开发者自己去实现接口。理由是:
数据库的写入与文件写入有巨大的差别。对于一个Step而言,在写入一份文件时需要保持对文件的打开状态从而能够高效的向队尾添加数据。如果每次都重新打开文件,从开始位置移动到队尾会耗费大量的时间(很多文件流无法在open时就知道长度)。当整个Step结束时才能关闭文件的打开状态,框架提供的文件读写类都实现了这个控制过程。
另外无论使用何种方式将数据写入文件都是"逐行进行"的(流数据写入、字符串逐行写入)。因此当数据写入与整个Step绑定为事物时还需要实现一个控制过程是:在写入数据的过程中出现异常时要擦除本次事物已经写入的数据,这样才能和整个Step的状态保持一致。框架中的类同样实现了这个过程。
但是向数据库写入数据并不需要类似于文件的尾部写入控制,因为数据库的各种链接池本身就保证了链接->写入->释放的高效执行,也不存在向队尾添加数据的问题。而且几乎所有的数据库驱动都提供了事物能力,在任何时候出现异常都会自动回退,不存在擦除数据的问题。
因此,对于数据库的写入操作只要按照常规的批量数据写入的方式即可,开发者使用任何工具都可以完成这个过程。
如下图自定义实现的写入器
/**
* itemWrite抽象类,主要处理平台是否有数据变化
* @author zhouhui
*
* @param <T>
*/
@Slf4j
public abstract class AbstractItemStreamWriter<T> implements ItemStreamWriter<T> {
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
// TODO Auto-generated method stub
log.info("打开写入器");
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
// TODO Auto-generated method stub
log.info("更新写入器");
}
@Override
public void close() throws ItemStreamException {
// TODO Auto-generated method stub
log.info("关闭写入器");
}
}
具体实现
@Configuration
@Slf4j
public class CreateTBCamWriterConfig{
@Bean("createTBCamWriter")
@StepScope
public ItemStreamWriter<CamData> writeDataToTbCam(@Autowired final CacheUtils cacheUtil) {
return new AbstractItemStreamWriter<CamData>() {
@Override
public void write(List<? extends CamData> items) throws Exception {
System.out.println("do something");
}
};
}
}
这里值得注意的是,如果读取器异常,写入器并不会执行
5.组合使用案例
下面是一些组合使用过程,简单实现了文件到数据库、数据库到文件的过程。文件读写的过程已经在文件读写中介绍过,这里会重复使用之前介绍的文件读写的功能。
下面的案例是将data.csv
中的数据写入到数据库,然后再将数据写入到out-data.csv
。案例组合使用已有的item
完成任务:flatFileReader
、jdbcBatchWriter
、jdbcCursorItemReader
、simpleProcessor
、flatFileWriter
。这种Reader
、Processor
、Writer
组合的方式也是完成一个批处理工程的常见开发方式。
案例的运行代码在org.chenkui.spring.batch.sample.database.complex
包中,使用了2个Step
来完成任务,一个将数据读取到数据库,一个将数据进行过滤,然后再写入到文件:
//庸才周久一
public class FileComplexProcessConfig {
@Bean
// 配置Step1
public Step file2DatabaseStep(StepBuilderFactory builder,
@Qualifier("flatFileReader") ItemReader<WeatherEntity> reader,
@Qualifier("jdbcBatchWriter") ItemWriter<WeatherEntity> writer) {
return builder.get("file2DatabaseStep") // 创建
.<WeatherEntity, WeatherEntity>chunk(50) // 分片
.reader(reader) // 读取
.writer(writer) // 写入
.faultTolerant() // 开启容错处理
.skipLimit(20) // 跳过设置
.skip(Exception.class) // 跳过异常
.build();
}
@Bean
// 配置Step2
public Step database2FileStep(StepBuilderFactory builder,
@Qualifier("jdbcCursorItemReader") ItemReader<WeatherEntity> reader,
@Qualifier("simpleProcessor") ItemProcessor<WeatherEntity, MaxTemperatureEntiry> processor,
@Qualifier("flatFileWriter") ItemWriter<MaxTemperatureEntiry> writer) {
return builder.get("database2FileStep") // 创建
.<WeatherEntity, MaxTemperatureEntiry>chunk(50) // 分片
.reader(reader) // 读取
.processor(processor) //
.writer(writer) // 写入
.faultTolerant() // 开启容错处理
.skipLimit(20) // 跳过设置
.skip(Exception.class) // 跳过异常
.build();
}
@Bean
public Job file2DatabaseJob(@Qualifier("file2DatabaseStep") Step step2Database,
@Qualifier("database2FileStep") Step step2File, JobBuilderFactory builder) {
return builder.get("File2Database").start(step2Database).next(step2File).build();
}
}
网友评论