注意事项
1 不要使用 fastjson 解析 JSON,它曾被曝出安全漏洞,请改用 Jackson
2 使用 jsoup 请求 URL 数据时,不要把 maxBodySize 设置得过大,因为如果请求次数多,会导致 OOM(内存溢出)
3 Callable中请求每一页的数据时,不要设置一个成员变量去接收每页返回的结果,而是在call()方法中使用局部变量
4 在处理 List<Future<List<Map<String, Object>>>> futures = executorService.invokeAll(callables);
的返回结果时,可以直接在 for 循环里对每页的数据执行入库操作。注意:如果入库方法会先按日期删除当天的全部数据再插入(例如下文的 todayData),逐页入库会把之前已入库的页删掉,这种情况下应只删除一次,或者把各页结果合并后一次性入库。
之前的这篇文章有错误:没有将页码参数 current 传入 jsonPage 中,导致查询出来的数据都是同一页的【 java处理大数据量的优化 】
思路还是一样的,只是多线程的处理方式不一样。比如有 3994 条数据,每次查询 500 条,需要 8 次才能查完,就像翻页一样。利用多线程的意思是:多个线程同时查询,每个线程每次查一页数据。所以需要把每一页的查询任务都放到任务集合中。
for (int i = 0; i < times; i++) {
    // Build one task per page. This creates a Callable, not a thread; the threads are
    // supplied by whichever executor later runs the collected tasks.
    Callable<List<Map<String, String>>> callable = new ThreadCallable(date, current, init);
    // Collect the task so that all pages can be submitted together.
    tasks.add(callable);
    // Advance the page number so that every task queries a different page.
    current++;
}
ThreadCallable实现了Callable 接口,需要重写call()方法,我们就在call()方法里进行抓包获取数据。
/**
 * Fetches one page of data through {@code outletService.requestOutletData} and returns it.
 *
 * <p>The page is held in a local variable rather than a field, so the result stays confined
 * to this invocation instead of becoming mutable state of the task object.
 *
 * @return the rows of the requested page
 * @throws Exception whatever the request throws
 */
@Override
public List<Map<String, String>> call() throws Exception {
    List<Map<String, String>> enterpriseNetData = outletService.requestOutletData(date, bindex, num);
    System.out.println(enterpriseNetData.size() + "/Thread:" + Thread.currentThread().getName());
    return enterpriseNetData;
}
处理线程返回结果
/**
 * Method one: collects outlet (网点) data for {@code date} from the new safety-supervision
 * system. One request task runs per county on a fixed-size thread pool, and the combined
 * result is then stored for that date.
 *
 * <p>{@code todayData} deletes every stored row of the count date before inserting. It is
 * therefore called once with all counties; calling it once per county would delete the
 * counties stored before the current one.
 *
 * @param date the day to collect
 * @throws ExecutionException if any county request failed (the task's exception is the cause)
 */
public void getOutletsNetData(Date date) throws IOException, SessionTimeoutException, InterruptedException,
        ExecutionException, BusinessException {
    long start = System.currentTimeMillis();
    List<Map<String, Object>> countyNetData = countyService.getCountyNetData(new Date(), "320000");
    // Rows requested per county; only the first page is fetched.
    int pageSize = 1000;
    // The remote paging is 1-based, and every county starts from its first page.
    int firstPage = 1;
    logger.info("循环获取数据的次数:{}", countyNetData.size());
    if (countyNetData.isEmpty()) {
        // Nothing to collect: leave the stored data for this date untouched.
        return;
    }

    // One task per county.
    List<Callable<List<Map<String, Object>>>> callables = new ArrayList<>(countyNetData.size());
    for (Map<String, Object> county : countyNetData) {
        String cityCode = (String) county.get("DISTRICT_CODE");
        callables.add(new ExpRecordOutletCallable(this, date, firstPage, pageSize, cityCode));
    }

    List<ExpRecordOutlet> outlets = new ArrayList<>();
    // A fixed-size pool caps the number of concurrent requests.
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    try {
        // invokeAll returns only after every task has completed, normally or exceptionally.
        List<Future<List<Map<String, Object>>>> futures = executorService.invokeAll(callables);
        logger.info("Future用于获取结果:{}", futures.size());
        for (Future<List<Map<String, Object>>> future : futures) {
            List<Map<String, Object>> list = future.get();
            logger.info("outlet网点数据:{}", list.size());
            outlets.addAll(jsonToOutlet(list, date));
        }
    } finally {
        // Release the pool threads even when a task failed. No busy-wait is needed:
        // invokeAll has already waited for all tasks.
        executorService.shutdown();
    }
    todayData(outlets, date);
    logger.info("线程查询数据用时:{}ms", System.currentTimeMillis() - start);
}
java用多线程批次查询大量数据(Callable返回数据)方式
java使用多线程及分页查询数据量很大的数据
管局数据抓取
ExpRecordOutletService
/**
 * Outlet (网点) data service. It downloads per-county outlet statistics from the remote
 * business system and stores them as one snapshot per count date.
 */
@Service
public class ExpRecordOutletService extends ServiceImpl<ExpRecordOutletMapper, ExpRecordOutlet> {
    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(ExpRecordOutletService.class);

    /** Endpoint that returns the paged outlet list. */
    private static final String OUTLET_URL =
            "http://10.11.100.83:8090/companybusiness/companybusiness!loadBranchList_city.action";

    /**
     * Query posted as the {@code jsonpage} form field. Format arguments, in order:
     * city/county code, start date, end date (both yyyy-MM-dd), page number, page size.
     */
    private static final String JSON_PAGE_TEMPLATE =
            "{\"condition\":[{\"col\":\"\",\"value\":[\"全部\"]},{\"col\":\"select2\",\"value\":[\"-1\"]," +
            "\"type\":\"=\"},{\"col\":\"order_pro\",\"value\":[\"YWLZB\"],\"type\":\"=\"}," +
            "{\"col\":\"order_city\",\"value\":[\"YWLZB\"],\"type\":\"=\"},{\"col\":\"pro_code\"," +
            "\"value\":[\"320000\"],\"type\":\"=\"},{\"col\":\"city_code\",\"value\":[\"%s\"]}," +
            "{\"col\":\"order_wd\",\"value\":[\"YWZB\"],\"type\":\"=\"},{\"col\":\"startDate\"," +
            "\"value\":[\"%s\"],\"type\":\">=\"},{\"col\":\"endDate\",\"value\":[\"%s\"],\"type\":\"<=\"}]," +
            "\"pageNo\":\"%s\",\"pageSize\":%s,\"pageSizes\":[10,20],\"dir\":\"DESC\"}\n";

    /** Province whose counties are collected. */
    private static final String PROVINCE_CODE = "320000";
    /** The remote paging is 1-based; every county starts from its first page. */
    private static final int FIRST_PAGE = 1;
    /**
     * Rows requested per county. Only the first page is requested, so a county with more rows
     * than this is not fully fetched.
     */
    private static final int PAGE_SIZE = 1000;
    /** Upper bound on concurrent requests. */
    private static final int POOL_SIZE = 10;

    /** Jackson mapper shared by all request threads; configured once here and only read afterwards. */
    private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();

    @Autowired
    ExpRecordCountCountyService countyService;

    /**
     * Replaces the stored snapshot of {@code date}. It deletes every row of that count date,
     * then inserts {@code list} with ids continuing after the current {@code max(id)}.
     *
     * <p>NOTE(review): getOutletsNetData calls this method on {@code this}. With Spring's
     * default proxy-based AOP, {@code @Transactional} is therefore not applied on that path,
     * and the delete and insert run without a surrounding transaction. Confirm, and route the
     * call through the Spring proxy if atomicity is required.
     *
     * @param list entities to insert; null or empty only clears the date
     * @param date count date (its time part is removed via {@code App.removeHMS})
     * @throws BusinessException if the batch insert reports failure
     */
    @Transactional
    public void todayData(List<ExpRecordOutlet> list, Date date) throws BusinessException {
        // delete() reports false when zero rows matched. That is normal on the first run
        // for a date, so it is not treated as a failure.
        delete(new EntityWrapper<ExpRecordOutlet>().eq("count_date", App.removeHMS(date)));
        if (list == null || list.isEmpty()) {
            return;
        }
        int maxId = currentMaxId();
        for (ExpRecordOutlet outlet : list) {
            maxId += 1;
            outlet.setId(maxId);
        }
        if (!insertBatch(list)) {
            throw new BusinessException("更新失败");
        }
    }

    /**
     * Reads {@code max(id)} from the table. Returns 1 when the table is empty or the value
     * is not numeric.
     */
    private int currentMaxId() {
        Map<String, Object> row = selectMap(Condition.create().setSqlSelect("max(id) as maxId"));
        if (row != null) {
            // Only one column is selected. Read it by value, so the alias's letter case
            // (some databases upper-case it to MAXID) and the numeric type do not matter.
            for (Object value : row.values()) {
                if (value instanceof Number) {
                    return ((Number) value).intValue();
                }
            }
        }
        return 1;
    }

    /**
     * Collects outlet data for each of {@code dates} in turn, asynchronously. A failure for
     * one date is logged and the next date is tried. An interrupt stops the loop and restores
     * the thread's interrupt flag.
     */
    @Async
    public void outletsTask(List<Date> dates) {
        logger.info("outlets数据开始采集");
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        for (Date date : dates) {
            String day = format.format(date);
            try {
                logger.info(String.format("outlets[%s]数据采集开始", day));
                getOutletsNetData(date);
                logger.info(String.format("outlets[%s]数据采集结束", day));
            } catch (InterruptedException e) {
                // The thread was asked to stop: keep the flag set and do not start another date.
                Thread.currentThread().interrupt();
                logger.error("outlets采集任务执行失败", e);
                break;
            } catch (IOException | ExecutionException | SessionTimeoutException e) {
                logger.error("outlets采集任务执行失败", e);
            } catch (BusinessException e) {
                logger.error("outlets数据更新失败", e);
            }
        }
        logger.info("outlets数据结束采集");
    }

    /**
     * Requests one page of outlets of a county for a single day.
     *
     * @param date     the day to query (used as both start and end date)
     * @param current  page number, 1-based
     * @param size     rows per page
     * @param cityCode county code sent as {@code city_code}
     * @return the rows under {@code data.result} of the response
     * @throws SessionTimeoutException if the response flags {@code sessiontimeout}
     * @throws IOException             on transport/parse errors, or if {@code data.result} is missing
     */
    public List<Map<String, Object>> requestOutletData(Date date, int current, int size, String cityCode)
            throws IOException, SessionTimeoutException {
        Map<String, Object> response = postPageQuery(date, current, size, cityCode);
        if (response == null) {
            throw new IOException("Empty outlet response for county " + cityCode);
        }
        // Accepts Boolean true or the string "true"; a missing flag means no timeout.
        if ("true".equals(String.valueOf(response.get("sessiontimeout")))) {
            throw new SessionTimeoutException();
        }
        Object data = response.get("data");
        Object rows = data instanceof Map ? ((Map<?, ?>) data).get("result") : null;
        if (!(rows instanceof List)) {
            throw new IOException("Unexpected outlet response for county " + cityCode + ": no data.result list");
        }
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> outlets = (List<Map<String, Object>>) rows;
        return outlets;
    }

    /**
     * Queries the first page for the whole province and returns the parsed response. Callers
     * typically use a small {@code size} to read the totals.
     *
     * @param date the day to query
     * @param size rows per page
     * @return the whole response as a map (not checked for {@code sessiontimeout})
     */
    public Map<String, Object> getTotal(Date date, int size) throws IOException, SessionTimeoutException {
        return postPageQuery(date, FIRST_PAGE, size, PROVINCE_CODE);
    }

    /** Posts one page query to {@link #OUTLET_URL} and parses the JSON response into a map. */
    private Map<String, Object> postPageQuery(Date date, int current, int size, String cityCode)
            throws IOException, SessionTimeoutException {
        String dayStr = new SimpleDateFormat("yyyy-MM-dd").format(date);
        Map<String, String> form = new HashMap<>();
        form.put("jsonpage", String.format(JSON_PAGE_TEMPLATE, cityCode, dayStr, dayStr, current, size));
        form.put("isAjax", "true");
        String result = RequestUtil.requestUrlByFormData(App.cookie, OUTLET_URL, form);
        @SuppressWarnings("unchecked")
        Map<String, Object> response = OBJECT_MAPPER.readValue(result, Map.class);
        return response;
    }

    /** Builds the shared mapper; unknown JSON properties are ignored. */
    private static ObjectMapper createObjectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        return mapper;
    }

    /**
     * Method one: collects outlet (网点) data for {@code date} from the new safety-supervision
     * system. One request task runs per county on a fixed-size pool, and the combined result
     * is stored once.
     *
     * <p>{@link #todayData} deletes every row of the count date before inserting. Calling it
     * once per county would therefore delete the counties stored before the current one.
     *
     * @throws ExecutionException if any county request failed (the task's exception is the cause)
     */
    public void getOutletsNetData(Date date) throws IOException, SessionTimeoutException, InterruptedException,
            ExecutionException, BusinessException {
        long start = System.currentTimeMillis();
        List<Map<String, Object>> countyNetData = countyService.getCountyNetData(new Date(), PROVINCE_CODE);
        logger.info("循环获取数据的次数:{}", countyNetData.size());
        if (countyNetData.isEmpty()) {
            // Nothing to collect: leave the stored data for this date untouched.
            return;
        }

        // One task per county.
        List<Callable<List<Map<String, Object>>>> callables = new ArrayList<>(countyNetData.size());
        for (Map<String, Object> county : countyNetData) {
            String cityCode = (String) county.get("DISTRICT_CODE");
            callables.add(new ExpRecordOutletCallable(this, date, FIRST_PAGE, PAGE_SIZE, cityCode));
        }

        List<ExpRecordOutlet> outlets = new ArrayList<>();
        ExecutorService executorService = Executors.newFixedThreadPool(POOL_SIZE);
        try {
            // invokeAll returns only after every task has completed, normally or exceptionally.
            List<Future<List<Map<String, Object>>>> futures = executorService.invokeAll(callables);
            logger.info("Future用于获取结果:{}", futures.size());
            for (Future<List<Map<String, Object>>> future : futures) {
                List<Map<String, Object>> list = future.get();
                logger.info("outlet网点数据:{}", list.size());
                outlets.addAll(jsonToOutlet(list, date));
            }
        } finally {
            // Release the pool threads even when a task failed. No busy-wait is needed,
            // because invokeAll has already waited for all tasks.
            executorService.shutdown();
        }
        todayData(outlets, date);
        logger.info("线程查询数据用时:{}ms", System.currentTimeMillis() - start);
    }

    /**
     * Converts raw outlet rows (as tagged by ExpRecordOutletCallable) into entities.
     *
     * <p>Count cells such as "1,234" become ints. Percentage cells such as "12.34%" become the
     * fraction 0.1234. Blank, missing or dash-only cells become 0.
     *
     * @param cityNetData rows from {@link #requestOutletData}, with province/city/county added
     * @param date        count date stored on every entity
     * @return one entity per row, in input order
     * @throws NumberFormatException if a numeric cell holds something else
     */
    public List<ExpRecordOutlet> jsonToOutlet(List<Map<String, Object>> cityNetData, Date date) {
        List<ExpRecordOutlet> result = new ArrayList<>(cityNetData.size());
        for (Map<String, Object> row : cityNetData) {
            ExpRecordOutlet obj = new ExpRecordOutlet();
            obj.setProvince(String.valueOf(row.get("province")));
            obj.setCity((String) row.get("city"));
            obj.setCounty((String) row.get("county"));
            obj.setBusinessvolume(parseCount(row, "TOTAL_NUM"));    // business volume
            obj.setBusinessvolumeper(parseRate(row, "YWZB"));       // business volume share
            obj.setDelivervolume(parseCount(row, "PQL"));           // delivery volume
            obj.setDelivervolumeper(parseRate(row, "PQZB"));        // delivery volume share
            obj.setRealvolume(parseCount(row, "TOTAL_SM_NUM"));     // real-name volume
            obj.setRealrate(parseRate(row, "SMLV"));                // real-name rate
            obj.setEcomvolume(parseCount(row, "TOTAL_DS_NUM"));     // e-commerce volume
            obj.setEcomrate(parseRate(row, "DSLV"));                // e-commerce rate
            obj.setBrandid(text(row, "BRAND_ID"));                  // enterprise / brand
            obj.setOutletname(text(row, "BRANCH_NAME"));            // outlet name
            obj.setOutletid(text(row, "STATION_ID"));
            obj.setPartvolume(parseCount(row, "SM_XY"));            // agreement real-name volume
            obj.setRealpartvolume(parseCount(row, "SM_SJ"));        // loose-piece real-name volume
            obj.setRealpartrate(parseRate(row, "SM_SJ_LV"));        // loose-piece real-name rate
            obj.setCreateTime(new Date());
            obj.setCountDate(App.removeHMS(date));
            obj.setCreateUser("robot");
            result.add(obj);
        }
        return result;
    }

    /** Trimmed string form of {@code row.get(key)}, or null when the key is absent. */
    private static String text(Map<String, Object> row, String key) {
        Object value = row.get(key);
        return value == null ? null : value.toString().trim();
    }

    /**
     * Strips thousands separators and the percent sign. Returns null when nothing numeric is
     * left (blank, or placeholders made only of dashes such as "-").
     */
    private static String numericPart(Map<String, Object> row, String key) {
        String raw = text(row, key);
        if (raw == null) {
            return null;
        }
        String cleaned = raw.replace(",", "").replace("%", "").trim();
        return cleaned.replace("-", "").isEmpty() ? null : cleaned;
    }

    /** Parses a count such as "1,234" into 1234; placeholder cells count as 0. */
    private static int parseCount(Map<String, Object> row, String key) {
        String number = numericPart(row, key);
        return number == null ? 0 : Integer.parseInt(number);
    }

    /** Converts a percentage such as "12.34%" into the fraction 0.1234; placeholder cells become 0. */
    private static BigDecimal parseRate(Map<String, Object> row, String key) {
        String number = numericPart(row, key);
        // The String constructor plus movePointLeft is exact, unlike new BigDecimal(double / 100).
        return number == null ? BigDecimal.ZERO : new BigDecimal(number).movePointLeft(2);
    }
}
ExpRecordOutletCallable
/**
 * Task that fetches one page of outlet (网点) data for a single county. It tags every
 * returned row with the province, city and county codes it belongs to.
 *
 * <p>All state is set in the constructor and never changes. {@link #call()} keeps the page
 * it fetches in a local variable rather than in a field.
 */
public class ExpRecordOutletCallable implements Callable<List<Map<String, Object>>> {
    /** Service that performs the HTTP page request. */
    private final ExpRecordOutletService outletCopyService;
    /** Day being collected. */
    private final Date date;
    /** Page number passed to the request. */
    private final int current;
    /** Rows requested per page. */
    private final int size;
    /** County code. Its first four characters followed by "00" are stored as the city code. */
    private final String cityCode;

    /**
     * Creates the task. The declared exceptions are kept for source compatibility with
     * existing callers; this constructor does not throw them.
     */
    public ExpRecordOutletCallable(ExpRecordOutletService outletCopyService, Date date,
                                   int current, int size, String cityCode) throws
            IOException, SessionTimeoutException {
        this.outletCopyService = outletCopyService;
        this.date = date;
        this.current = current;
        this.size = size;
        this.cityCode = cityCode;
    }

    /**
     * Requests the configured page over HTTP (via {@code requestOutletData}). It adds
     * {@code province}, {@code city} and {@code county} entries to each returned row.
     *
     * @return the returned rows, modified in place
     * @throws Exception whatever the request throws, or InterruptedException during the pause
     */
    @Override
    public List<Map<String, Object>> call() throws Exception {
        List<Map<String, Object>> enterpriseNetData =
                outletCopyService.requestOutletData(date, current, size, cityCode);
        for (Map<String, Object> row : enterpriseNetData) {
            row.put("province", 320000);
            row.put("city", cityCode.substring(0, 4) + "00");
            row.put("county", cityCode);
        }
        System.out.println(enterpriseNetData.size() + "/Thread:" + Thread.currentThread().getName());
        // Pause before this pool thread can take the next county. This presumably throttles
        // load on the remote server; confirm before removing.
        Thread.sleep(1000);
        return enterpriseNetData;
    }
}
网友评论