package com.yiwugou.solr.common.client;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.RetryForever;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
import org.apache.solr.client.solrj.request.IsUpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.yiwugou.isearch.api.entity.solr.ProductDoc;
/**
*
* 只用于一个分片的solrcloud
*
* @author zhanxiaoyong@yiwugou.com
*
* @since 2019年3月19日 下午1:18:04
*/
public class SingleShardCloudSolrClient extends SolrClient {
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger("SingleShardCloudSolrClient");
private final String zkHost;
private final String chroot;
private final String collection;
private final Integer timeout;
private SolrClient leaderClient;
private Map<String, SolrClient> URL_CLIENT = new ConcurrentHashMap<>();
private LBHttpSolrClient lbClient;
private CuratorFramework client;
// private Lock lock = new ReentrantLock();
private Object lock = new Object();
public SingleShardCloudSolrClient(String zkHost, String chroot, String collection, Integer timeout) {
this.zkHost = zkHost;
this.chroot = chroot;
this.collection = collection;
this.timeout = timeout;
this.init();
}
private void init() {
try {
client = CuratorFrameworkFactory.newClient(zkHost, new RetryForever(10000));
client.start();
String statePath = "/" + chroot + "/collections/" + collection + "/state.json";
String stateValue = IOUtils.toString(client.getData().forPath(statePath), StandardCharsets.UTF_8.name());
this.initClient(stateValue);
final NodeCache cache = new NodeCache(client, statePath);
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
logger.error("state.json changed");
String stateValue = IOUtils.toString(cache.getCurrentData().getData(),
StandardCharsets.UTF_8.name());
initClient(stateValue);
}
});
cache.start(true);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void initClient(String stateValue) {
try {
logger.error("init client " + System.lineSeparator() + stateValue);
JSONObject state = JSONObject.parseObject(stateValue);
JSONObject shards = state.getJSONObject(collection).getJSONObject("shards");
JSONObject replicas = shards.getJSONObject(shards.keySet().iterator().next()).getJSONObject("replicas");
List<String> downUrls = new ArrayList<>();
List<String> activeUrls = new ArrayList<>();
String leaderUrl = null;
for (String key : replicas.keySet()) {
JSONObject node = replicas.getJSONObject(key);
String url = node.getString("base_url") + "/" + node.getString("core");
if (URL_CLIENT.get(url) == null) {
SolrClient solrClient = new HttpSolrClient.Builder().withBaseSolrUrl(url)
.withConnectionTimeout(timeout).build();
URL_CLIENT.put(url, solrClient);
}
if ("active".equalsIgnoreCase(node.getString("state"))) {
if ("true".equalsIgnoreCase(String.valueOf(node.get("leader")))) {
leaderUrl = url;
}
activeUrls.add(url);
} else {
downUrls.add(url);
}
}
if (leaderUrl == null) {
if (activeUrls.size() == 0) {
throw new IllegalArgumentException("not find active leader " + System.lineSeparator() + state);
}
logger.error("leader not active, use follower:" + activeUrls.get(0));
leaderClient = URL_CLIENT.get(activeUrls.get(0));
} else {
logger.error("leader:" + leaderUrl);
leaderClient = URL_CLIENT.get(leaderUrl);
}
if (lbClient == null) {
Collection<String> allUrls = CollectionUtils.union(activeUrls, downUrls);
lbClient = new LBHttpSolrClient.Builder().withBaseSolrUrls(allUrls.toArray(new String[allUrls.size()]))
.withConnectionTimeout(timeout).build();
logger.error("lb:" + allUrls);
}
logger.error("active:" + activeUrls);
logger.error("down:" + downUrls);
} finally {
synchronized (lock) {
lock.notify();
}
}
}
private NamedList<Object> updateRequest(SolrRequest request, String collection, long start)
throws SolrServerException, IOException {
try {
return leaderClient.request(request, collection);
} catch (SolrServerException | IOException | SolrException se) {
synchronized (lock) {
while (System.currentTimeMillis() - start <= timeout) {
try {
lock.wait(timeout);
} catch (InterruptedException ie) {
}
long expend = System.currentTimeMillis() - start;
if (expend > timeout) {
throw new RuntimeException("timeout=" + timeout + ",expend=" + expend, se);
}
return updateRequest(request, collection, start);
}
}
throw new RuntimeException("timeout=" + timeout + ",expend=" + (System.currentTimeMillis() - start), se);
}
}
@Override
public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
if (request instanceof IsUpdateRequest) {
long start = System.currentTimeMillis();
return this.updateRequest(request, collection, start);
} else {
return lbClient.request(request, collection);
}
}
@Override
public void close() throws IOException {
IOUtils.closeQuietly(leaderClient);
IOUtils.closeQuietly(lbClient);
IOUtils.closeQuietly(client);
}
public static void main(String[] args) throws Exception {
SingleShardCloudSolrClient solrClient = new SingleShardCloudSolrClient("127.0.0.1:2181", "solr", "product",
10000);
for (long i = 1; i <= Long.MAX_VALUE; i++) {
SolrParams params = new SolrQuery("title:笔记本电脑" + i);
QueryResponse qr = solrClient.query(params);
ProductDoc doc = new ProductDoc();
doc.setTitle("笔记本电脑" + i);
doc.setId(i);
UpdateResponse ur = solrClient.addBean(doc);
if (i % 1000 == 0) {
System.err.println(i);
}
}
}
}
不用每次请求都去连zk 只有zk改变时,重新获得LeaderClient
update 定位leader HttpSolrClient
query leader+follower 中任意选取一台 LBHttpSolrClient
zk挂了 query正常 update报错(这错误是SolrCloud返回的,可能是zk挂了 solrcloud不准你更新数据了)
leader挂了 会马上重新选取leader query update都正常
follower挂了 leader不变 query定位到leader
网友评论