зеркало из https://github.com/Azure/YCSB.git
Adding TransportCLient support for connecting to remote elasticsearch cluster
This commit is contained in:
Родитель
7b564cc3ff
Коммит
d3ed4281be
|
@ -14,7 +14,10 @@ import org.elasticsearch.action.search.SearchResponse;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.*;
|
||||
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings.Builder;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.*;
|
||||
import static org.elasticsearch.index.query.FilterBuilders.*;
|
||||
|
@ -37,10 +40,13 @@ public class ElasticSearchClient extends DB {
|
|||
|
||||
public static final String DEFAULT_CLUSTER_NAME = "es.ycsb.cluster";
|
||||
public static final String DEFAULT_INDEX_KEY = "es.ycsb";
|
||||
public static final String DEFAULT_REMOTE_HOST = "localhost:9300";
|
||||
private Node node;
|
||||
private Client client;
|
||||
private String indexKey;
|
||||
|
||||
private Boolean remoteMode;
|
||||
|
||||
/**
|
||||
* Initialize any state for this DB. Called once per DB instance; there is
|
||||
* one DB instance per client thread.
|
||||
|
@ -51,6 +57,8 @@ public class ElasticSearchClient extends DB {
|
|||
Properties props = getProperties();
|
||||
this.indexKey = props.getProperty("es.index.key", DEFAULT_INDEX_KEY);
|
||||
String clusterName = props.getProperty("cluster.name", DEFAULT_CLUSTER_NAME);
|
||||
//Check if transport client needs to be used (To connect to multiple elasticsearch nodes)
|
||||
remoteMode = Boolean.parseBoolean(props.getProperty("elasticsearch.remote", "false"));
|
||||
Boolean newdb = Boolean.parseBoolean(props.getProperty("elasticsearch.newdb", "false"));
|
||||
Builder settings = settingsBuilder()
|
||||
.put("node.local", "true")
|
||||
|
@ -68,10 +76,28 @@ public class ElasticSearchClient extends DB {
|
|||
settings.put(props);
|
||||
System.out.println("ElasticSearch starting node = " + settings.get("cluster.name"));
|
||||
System.out.println("ElasticSearch node data path = " + settings.get("path.data"));
|
||||
System.out.println("ElasticSearch Remote Mode = " +remoteMode);
|
||||
//Remote mode support for connecting to remote elasticsearch cluster
|
||||
if(remoteMode) {
|
||||
settings.put("client.transport.sniff", true)
|
||||
.put("client.transport.ignore_cluster_name", false)
|
||||
.put("client.transport.ping_timeout", "30s")
|
||||
.put("client.transport.nodes_sampler_interval", "30s");
|
||||
//Default it to localhost:9300
|
||||
String nodeList[] = props.getProperty("elasticsearch.hosts.list", DEFAULT_REMOTE_HOST).split(",");
|
||||
System.out.println("ElasticSearch Remote Hosts = " +props.getProperty("elasticsearch.hosts.list", DEFAULT_REMOTE_HOST));
|
||||
TransportClient tClient = new TransportClient(settings);
|
||||
for(String h : nodeList) {
|
||||
String node[] = h.split(":");
|
||||
tClient.addTransportAddress(new InetSocketTransportAddress(node[0], Integer.parseInt(node[1])));
|
||||
}
|
||||
client = tClient;
|
||||
} else { //Start node only if transport client mode is disabled
|
||||
node = nodeBuilder().clusterName(clusterName).settings(settings).node();
|
||||
node.start();
|
||||
client = node.client();
|
||||
}
|
||||
|
||||
node = nodeBuilder().clusterName(clusterName).settings(settings).node();
|
||||
node.start();
|
||||
client = node.client();
|
||||
|
||||
if (newdb) {
|
||||
client.admin().indices().prepareDelete(indexKey).execute().actionGet();
|
||||
|
@ -86,10 +112,14 @@ public class ElasticSearchClient extends DB {
|
|||
|
||||
@Override
|
||||
public void cleanup() throws DBException {
|
||||
if (!node.isClosed()) {
|
||||
if(!remoteMode) {
|
||||
if (!node.isClosed()) {
|
||||
client.close();
|
||||
node.stop();
|
||||
node.close();
|
||||
}
|
||||
} else {
|
||||
client.close();
|
||||
node.stop();
|
||||
node.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче