线程池:
项目中永远不要自己去new线程,然后start,一般都是用线程池。
定期发送短信业务可以用核心线程可以被回收的线程池,因为并不是时刻都在发短信。
其他业务如果处理比较频繁,那就用固定线程数或者核心线程数固定,设置最大线程数的线程池。
一个项目中,每个线程池最好专一做一个业务,你不用担心线程池多了会不会影响性能。数据库驱动也用到了线程池,redis驱动也用到了线程池,spring很多组件都用到了线程池,所以一个项目中会存在很多线程池,一般一个java进程里有两三百的活跃线程,都是因为项目中引用的其他库中用到的线程池,其实你自己用到的只是冰山一角。
以String url = "http://127.0.0.1:10002/api/authEnroll/page?current=1&size=1";
作为测试URL,通过JSOUP抓取改URL的数据。这个URL是考试系统的一个GET接口,dev开发环境下可以不登陆通过浏览器访问获取数据。
项目地址:E:\IDEAProject\sjcj_pc_server
测试一
TaskCallable
TaskCallable实现Callable接口,重写call()方法,call()方法里实现了查询每一页数据并返回每页数据的功能
public class TaskCallable implements Callable<List<Map<String, Object>> {
TaskService taskService = new TaskService();
private String search;//查询条件 根据条件来定义该类的属性
private int bindex;//当前页数
private int num;//每页查询多少条
private List page;//每次分页查出来的数据
//private JSONArray enterpriseNetData;//每次分页查出来的数据
public TaskCallable(int bindex, int num) throws IOException, SessionTimeoutException {
this.bindex = bindex;
this.num = num;
}
@Override
public List<Map<String, Object>> call() throws Exception {
//分页查询数据库数据
List<Map<String, Object>> enterpriseNetData = taskService.enterpriseNetData(bindex, num);
System.out.println(enterpriseNetData.size() + "/Thread:" + Thread.currentThread().getName());
return enterpriseNetData;
}
}
TaskService
maxBodySize太大的话多线程时会导致oom。如果一个请求就要申请9M,那么100多个请求,算下来都1个多G了,会导致OOM
public class TaskService {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(TaskService.class);
/**
* GET请求
*/
public List<Map<String, Object>> enterpriseNetData(int current,int size) throws IOException, SessionTimeoutException {
String url = "http://127.0.0.1:10002/api/authEnroll/page?current=" + current + "&size=" + size;
Connection connection = Jsoup.connect(url)
.ignoreContentType(true)
.method(Connection.Method.GET)
.header("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:48.0) Gecko/20100101 Firefox/48.0")
.followRedirects(false)
.maxBodySize(10000000) //body大小 1024*1024=1M, 太大的话多线程时会导致oom。如果一个请求就要申请9M,那么100多个请求,算下来都1个多G了,会导致OOM
.timeout(1000 * 100);
String body = connection.execute().body();
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
//json字符串转为Map对象
Map jsonObject = objectMapper.readValue(result, Map.class);
logger.debug("[enterpriseData]" + jsonObject.get("data").getClass().toString());
logger.debug("[enterpriseData]" + jsonObject.get("data").toString());
List<Map<String, Object>> list = (JSONArray) ((JSONObject) jsonObject.get("data")).get("records");
return list;
}
/**
* 计算总数
*/
public Map<String,Object> getEnterpriseNetData(int current, int size) throws IOException, SessionTimeoutException {
String url = "http://127.0.0.1:10002/api/authEnroll/page?current=1&size=1";
Connection connection = Jsoup.connect(url)
.ignoreContentType(true)
.method(Connection.Method.GET)
.header("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:48.0) Gecko/20100101 Firefox/48.0")
.followRedirects(false)
.maxBodySize(10000000) //body大小
.timeout(1000 * 100);
String body = connection.execute().body();
//json字符串转为Map对象
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Map<String,Object> jsonObject = objectMapper.readValue(result, Map.class);
logger.debug("[enterpriseData]" + jsonObject.get("data").getClass().toString());
logger.debug("[enterpriseData]" + jsonObject.get("data").toString());
return jsonObject;
}
}
ThreadTest
测试类
@RunWith(SpringRunner.class)
public class ThreadTest {
@Test
public void test1() throws ExecutionException, InterruptedException, IOException, SessionTimeoutException {
TaskService taskService = new TaskService();
Map<String,Object> enterpriseNetData = taskService.getEnterpriseNetData(1, 1);
Integer total = (Integer) enterpriseNetData.get("data").get("total");
List<Map<String,Object>> records = (List<Map<String,Object>>) ( enterpriseNetData.get("data")).get("records");
System.out.println(total);
getMaxResult(total);
}
public List<Map<String,Object>> getMaxResult(int total) throws InterruptedException, ExecutionException, IOException,
SessionTimeoutException {
long start = System.currentTimeMillis();//开始时间
//查询数据库总数量
int count = total;
int size = 1000;//一次查询多少条
//需要查询的次数
int times = count / size;
if (count % size != 0) {
times = times + 1;
}
//开始页数 连接的是orcle的数据库 封装的分页方式 我的是从1开始
int current = 1;
//Callable用于产生结果
List<Callable<JSONArray>> tasks = new ArrayList<>();
for (int i = 0; i < times; i++) {
Callable<List<Map<String,Object>>> callable = new TaskCallable(current, size);
tasks.add(callable);
current++;
}
//定义固定长度的线程池 防止线程过多
ExecutorService executorService = Executors.newFixedThreadPool(15);
//Future用于获取结果
List<Future<List<Map<String,Object>>>> futures = executorService.invokeAll(tasks);
List<Map<String,Object>> list = new ArrayList<Map<String,Object>>();
//处理线程返回结果
if (futures != null && futures.size() > 0) {
for (Future<JSONArray> future : futures) {
list.addAll(future.get());
}
}
executorService.shutdown();//关闭线程池
while (true) {
// 判断线程池中任务是否全部执行完毕 若执行完毕 再返回 list
if (executorService.isTerminated()) {
break;
}
}
long end = System.currentTimeMillis();
System.out.println("线程查询数据用时:" + (end - start) + "ms");
return list;
}
}
测试二
测试二使用匿名内部类方式new Callable出来,
TaskService同测试一
ThreadTest1
package thread;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.jsptpd.yzglj.SessionTimeoutException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by HASEE on 2019/3/29 10:10
*/
@RunWith(SpringRunner.class)
public class ThreadTest1 {
@Test
public void test1() throws ExecutionException, InterruptedException, IOException, SessionTimeoutException {
TaskService taskService = new TaskService();
Map<String,Object> enterpriseNetData = taskService.getEnterpriseNetData(1, 1);
Integer total = (Integer) (enterpriseNetData.get("data")).get("total");
List<Map<String,Object>> records = (List<Map<String,Object>>) ( enterpriseNetData.get("data")).get("records");
System.out.println(total);
getMaxResult(total);
}
public List<Map<String,Object>> getMaxResult(int total) throws InterruptedException, ExecutionException, IOException,
SessionTimeoutException {
long start = System.currentTimeMillis();//开始时间
//查询数据库总数量
int count = total;
int size = 1000;//一次查询多少条
//需要查询的次数
int times = count / size;
if (count % size != 0) {
times = times + 1;
}
//开始页数 连接的是orcle的数据库 封装的分页方式 我的是从1开始
int current = 1;
//Callable用于产生结果
List<Callable<List<Map<String,Object>>>> tasks = new ArrayList<>();
//定义固定长度的线程池 防止线程过多
ExecutorService pool = Executors.newFixedThreadPool(15);
TaskService taskService = new TaskService();
JSONArray jsonArray = new JSONArray();
for (int i = 0; i < times; i++) {
int finalCurrent = current;
Callable<List<Map<String,Object>>> run = new Callable<List<Map<String,Object>>>() {
@Override
public List<Map<String,Object>> call() throws Exception {
//查询每一页的数据
List<Map<String,Object>> cityNetData = taskService.enterpriseNetData(finalCurrent, size);
jsonArray.addAll(cityNetData); //JSONArray实现了List接口
System.out.println(jsonArray.size() + "/Thread:" + Thread.currentThread().getName());
Thread.sleep(1000);
return jsonArray;
}
};
current++;
pool.submit(run);
}
pool.shutdown();//关闭线程池
while (true) {
// 判断线程池中任务是否全部执行完毕 若执行完毕 再返回 list
if (pool.isTerminated()) {
break;
}
}
long end = System.currentTimeMillis();
System.out.println("线程查询数据用时:" + (end - start) + "ms");
return jsonArray;
}
}
测试一和测试二的区别
测试一,每次查询100条数据。处理线程返回结果.png 测试二,每次查询的数据新增到集合中.png测试一自定义了一个Callable,测试二使用了匿名内部类,有点像Comparator和Compable比较器。
测试一也是在for循环里面Callable<List<Map<String,Object>>> callable = new TaskCallable(current, size);
测试二Callable<List<Map<String,Object>>> run = new Callable<List<Map<String,Object>>>() { ....... }测试一:①【将获取每页数据的任务放进线程池
List<Callable<List<Map<String,Object>>>>
中,然后pool.invokeAll(tasks)
】,②【Future用于获取结果,最后处理线程返回结果,关闭线程池】。①(在for循环之内,即需要查询的次数)②(在for循环之外处理)
测试二:获取每页数据,处理线程返回数据,将数据新增到集合中。(这一过程都是在for循环之内进行)
网友评论