本文集主要是总结自己在项目中使用ES 的经验教训,包括各种实战和调优。
自己写的单线程读取数据,多线程导入数据的代码示例:
按照网上的推荐,bulk一次写入数据的个数一般是1000-5000,请求大小一般为5-15MB。按照bulk官网的代码是可以设置请求个数、请求大小、发送请求时间,满足其一即发送bulk请求。即bulk-processor
官网链接:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html
我们这里采用是1000条请求发送一次。
具体代码示例见文档的底部。
elasticsearch写入性能优化
- 多线程插入
- 取消replias,写入完成之后在修改replias>=1
- 提高ES占用内存,ES内存最好占服务器内存的一半,同时最大和最小设置成一样(-Xms、-Xmx),避免GC.
- 减少shard刷新间隔。写入数据时可以设置为-1。
curl -XPUT http://localhost:9200/index/_settings -d '{"index" : {"refresh_interval" : "-1"}}'
bulk使用建议
- 每个请求大小建议在5-15MB,逐步增大测试,当接收到EsRejectedExecutionException,就说明已经到达节点的瓶颈了,就需要减少并发或者升级硬件增加节点
- 当写入数据时,确保bulk请求时轮询访问所有节点,不要发送所有请求到一个结点导致这一个节点要在内存存储所有请求的数据去处理
优化磁盘IO
- 使用SSD
- 使用RAID 0,不用镜像备份,用replicas保证数据正确性,增大磁盘IO
- 使用多个磁盘给Elasticsearch访问,通过在path.data中添加
- 不使用远程存储,如NFS/SMB/CIFS;延时将成为性能瓶颈
段合并
- 段合并是很消耗计算资源和磁盘IO的操作,特别是出现比较大的段合并。
- 当出现段合并的速度落后于索引写入的速度,Elasticsearch为了避免出现堆积的段数量爆发,会降低单个线程的索引写入速度,并且会在INFO的log里记录"now throttling indexing"
- Elasticsearch默认比较保守,不想让搜索的性能被后台的段合并影响,默认的段合并速率限制比较低,默认是20MB/s,但如果使用的是SSD,可以考虑把这个参数设置到100-200MB/s
PUT /_cluster/settings { "persistent" : { "indices.store.throttle.max_bytes_per_sec" : "100mb" } }
如果你只是用bulk导入数据而不关注查询性能,可以关闭合并的阈值
PUT /_cluster/settings { "transient" : { "indices.store.throttle.type" : "none" } }
然后在导入完数据之后恢复成“merge”来恢复这个阈值设置
如果是机械硬盘,你需要增加下面的配置到elasticsearch.yml中
index.merge.scheduler.max_thread_count: 1
机械硬盘的并发IO性能较差,我们需要减少每个索引并发访问磁盘的线程数,这个设置会有max_thread_count+2个线程并发访问磁盘
如果是SSD可以忽略这个参数,默认线程数是Math.min(3, Runtime.getRuntime().availableProcessors() / 2),对于SSD来说没有问题。
可以增大index.translog.flush_threshold_size参数,默认是200M,可以增大到如1GB。增大这个参数可以允许translog在flush前存放更大的段(segment);更大的段的创建会减少flush的频率,并且更大的段合并越少,会减少磁盘IO,索引性能更高。
关于段合并的内容,可以查看文集中的其他文章。
其他优化
- 如果不需要实时精确的查询结果,可以把每个索引的index.refresh_interval设置为30s,如果在导入大量的数据,可以把这个值先设置为-1,完成数据导入之后在设置回来
- 如果在用bulk导入大量的数据,可以考虑不要副本,设置index.number_of_replicas: 0。有副本存在的时候,导入数据需要同步到副本,并且副本也要完成分析,索引和段合并的操作,影响导入性能。可以不设置副本导入数据然后在恢复副本。
- 如果导入的文档没有唯一的ID,可以使用Elasticsearch自动生成的唯一ID
性能测试
- 在一个节点的一个分片,不设置副本,测试性能
- 在完全默认设置上记录性能数据,作为测试的基准线
- 确保性能测试持续30分钟以上以确认长时间的性能;短时间的测试可能不会碰到segment合并和GC,无法确认这些因素的影响
- 每次基于默认基准线更改一个参数,如果性能有提升就保留设置,并基于此设置做后续的测试
导入数据的代码示例
使用bulk的方式导入
private static final ExecutorService executor = Executors.newFixedThreadPool(5);
private static TransportClient client;
class ConsumerEsDataById implements Runnable {
private BlockingQueue<List<Articles>> queue;
public ConsumerEsDataById(BlockingQueue<List<Articles>> queue) {
this.queue = queue;
}
@Override
public void run() {
long s = System.currentTimeMillis();
String docid = null;
try {
BulkRequestBuilder bulkRequest = client.prepareBulk();
String index = "subscribe";
String type = "article";
List<Articles> list = queue.take();
log.info(queue.size());
for (Articles articles : list) {
docid = articles.getId();
bulkRequest.add(client.prepareIndex(index, type).setRouting(articles.getWemediaId()).setId(articles.getId())
.setSource(articles.toJson()));
}
bulkRequest.execute().actionGet();
} catch (InterruptedException e) {
log.error("Insert into elasticsearch error!", e);
}
log.info("Insert into elasticsearch. Docid:" + docid + ", spend" + (System.currentTimeMillis() - s) + " ms.");
}
}
@RequestMapping("/elasticsearch/data")
@ResponseBody
public String esData(@RequestParam(value = "id", required = false) final String id) throws Exception {
long start = System.currentTimeMillis();
String startId = id;
List<Articles> list;
open();
BlockingQueue<List<Articles>> queue = new LinkedBlockingQueue<List<Articles>>(10);
while (true) {
long ss = System.currentTimeMillis();
list = articlePublishDao.getArticles(startId);
log.info("get data" + "spend " + (System.currentTimeMillis() - ss) + "ms.");
if (list.size() <= 0) {
break;
}
startId = list.get(list.size() - 1).getId();
queue.put(list);
log.info("get data" + "spend " + (System.currentTimeMillis() - ss) + "--------id---" + startId + "queue.put(list);---"
+ queue.size());
cute(new ConsumerEsDataById(queue));
}
log.info("get data time all use----------" + (System.currentTimeMillis() - start));
return "success";
}
使用bulkprocessor导入
import org.apache.commons.collections.MapUtils;
import org.apache.log4j.Logger;
import org.apache.shiro.util.CollectionUtils;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@Controller("dataControllerForElasticsearch")
@RequestMapping("/statRaw")
public class InsertElasticsearchController {
private static final Logger log = Logger
.getLogger(InsertElasticsearchController.class);
@Autowired
private MopArticleDao mopArticleDao;
@Autowired
private StatRawIndexNumberJob statRawIndexNumberJob;
@Autowired
private StatRawStarIntegralJob statRawStarIntegralJob;
static {
open();
}
private static final ExecutorService executor = Executors.newFixedThreadPool(5);
private static TransportClient client;
class ConsumerEsDataById implements Runnable {
private BlockingQueue<List<MopArticle>> queue;
public ConsumerEsDataById(BlockingQueue<List<MopArticle>> queue) {
this.queue = queue;
}
@Override
public void run() {
long s = System.currentTimeMillis();
String docid = null;
try {
String index = "subscribe_moparticle";
String type = "moparticle";
List<MopArticle> list = queue.take();
for (MopArticle mopArticle : list) {
docid = mopArticle.getDocid();
if("".equals(docid)||docid==null){
continue;
}
if(docid == null || docid.length()!=16){
log.error("docid is error,docid:"+docid);
}
if(mopArticle.getTopicid().length()!=8){
continue;
}
if(!mopArticle.getTopicid().equalsIgnoreCase(docid.substring((docid.length()-8),docid.length()))){
continue;
}
IndexRequest add = new IndexRequest(index, type,
mopArticle.getDocid());
add.routing(mopArticle.getTopicid())
.source(mopArticle.toJsonForElasticsearch());
bulkProcessor.add(add);
}
} catch (InterruptedException e) {
log.error("Insert into elasticsearch error!", e);
}
log.info("Insert into elasticsearch. Docid:" + docid + ", spend" + (System.currentTimeMillis() - s) + " ms.");
}
}
BulkProcessor bulkProcessor = BulkProcessor
.builder(client, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
}
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
}
}).setBulkActions(5000)
.setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(5)
.setBackoffPolicy(BackoffPolicy
.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
@RequestMapping("/elasticsearch/bulkdata")
@ResponseBody
public String esData(@RequestParam(value = "id", required = false) final String id, String tableName) throws Exception {
long start = System.currentTimeMillis();
String startId = id;
List<MopArticle> mopArticleList;
open();
BlockingQueue<List<MopArticle>> queue = new LinkedBlockingQueue<List<MopArticle>>(10);
while (true) {
long ss = System.currentTimeMillis();
mopArticleList = mopArticleDao.getMopArticles(startId, tableName);
log.info("leng th:" + mopArticleList.size());
log.info("get data" + "spend " + (System.currentTimeMillis() - ss) + "ms.");
if (mopArticleList.size() < 1) {
break;
}
startId = mopArticleList.get(mopArticleList.size() - 1).getDocid();
queue.put(mopArticleList);
log.info("get data" + "spend " + (System.currentTimeMillis() - ss) + "--------id---" + startId +"tablename"+tableName+ "queue.put(list);---"+ queue.size());
executor.execute(new ConsumerEsDataById(queue));
}
log.info("get data time all use----------" + (System.currentTimeMillis() - start));
return "success";
}
@RequestMapping("/elasticsearch/testbulkProcessor")
@ResponseBody
public String importMoparticleData() throws Exception {
List<String> tableNameList = new ArrayList<String>();
int tableNum = 511;
while (tableNum <= 530) {
String tableName = "cindex_0" + Integer.toString(tableNum);
tableNameList.add(tableName);
tableNum++;
log.info("tableN ame" + tableName);
}
for (String tableName : tableNameList) {
MopArticle mopArticle = mopArticleDao.selectFirstIdFromMopArticle(tableName);
if (mopArticle == null || "".equals(mopArticle)) {
log.warn("table is null.tableName:" + tableName);
continue;
}
String docId = mopArticle.getDocid();
esData(docId, tableName);
}
return null;
}
/**
* 建立连接
*/
private static void open() {
// 设置集群名称
Settings settings = Settings.builder()
.put("cluster.name", "subscribe-online-elasticsearch")
// .put("cluster.name", "newsclient_nc_cluster")
.build();
try {
// 创建client
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(
InetAddress.getByName("bjzw-subscribe-elk2.server.163.org"), 9300));
// .addTransportAddress(new InetSocketTransportAddress(
// InetAddress.getByName("127.0.0.1"), 19300));
} catch (UnknownHostException e) {
log.error("Create es client error!", e);
}
}
}
网友评论