зеркало из https://github.com/Azure/YCSB.git
Merge pull request #684 from jasontedor/elasticsearch-improvements
[elasticsearch] Elasticsearch improvements
This commit is contained in:
Коммит
f675c05616
|
@ -43,15 +43,12 @@ For further configuration see below:
|
|||
The default setting for the Elasticsearch node that is created is as follows:
|
||||
|
||||
cluster.name=es.ycsb.cluster
|
||||
node.local=true
|
||||
path.data=$TEMP_DIR/esdata
|
||||
discovery.zen.ping.multicast.enabled=false
|
||||
index.mapping._id.indexed=true
|
||||
index.gateway.type=none
|
||||
gateway.type=none
|
||||
index.number_of_shards=1
|
||||
index.number_of_replicas=0
|
||||
es.index.key=es.ycsb
|
||||
es.number_of_shards=1
|
||||
es.number_of_replicas=0
|
||||
es.remote=false
|
||||
es.newdb=false
|
||||
es.hosts.list=localhost:9300 (only applies if es.remote=true)
|
||||
|
||||
### Custom Configuration
|
||||
If you wish to customize the settings used to create the Elasticsearch node
|
||||
|
@ -66,25 +63,17 @@ pass it into the Elasticsearch client:
|
|||
|
||||
./bin/ycsb run elasticsearch -P workloads/workloada -P myproperties.data -s
|
||||
|
||||
|
||||
If you wish to use an in-memory store type rather than the default disk store, add
|
||||
the following properties to your custom properties file. For a large number of
|
||||
insert operations, ensure that you have sufficient memory on your test system;
|
||||
otherwise you will run out of memory.
|
||||
|
||||
index.store.type=memory
|
||||
index.store.fs.memory.enabled=true
|
||||
cache.memory.small_buffer_size=4mb
|
||||
cache.memory.large_cache_size=1024mb
|
||||
|
||||
If you wish to change the default index name you can set the following property:
|
||||
|
||||
es.index.key=my_index_key
|
||||
|
||||
### Troubleshoot
|
||||
If you encounter error messages such as:
|
||||
"Primary shard is not active or isn't assigned is a known node."
|
||||
If you wish to run against a remote cluster you can set the following property:
|
||||
|
||||
Try removing /tmp/esdata/ folder.
|
||||
rm -rf /tmp/esdata
|
||||
es.remote=true
|
||||
|
||||
By default this will use localhost:9300 as a seed node to discover the cluster.
|
||||
You can also specify
|
||||
|
||||
es.hosts.list=(\w+:\d+)+
|
||||
|
||||
(a comma-separated list of host/port pairs) to change this.
|
||||
|
|
|
@ -29,7 +29,7 @@ LICENSE file.
|
|||
<name>Elasticsearch Binding</name>
|
||||
<packaging>jar</packaging>
|
||||
<properties>
|
||||
<elasticsearch-version>2.2.0</elasticsearch-version>
|
||||
<elasticsearch-version>2.3.1</elasticsearch-version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
|
|
|
@ -22,7 +22,6 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
|||
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
|
||||
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
|
||||
|
||||
|
||||
import com.yahoo.ycsb.ByteIterator;
|
||||
import com.yahoo.ycsb.DB;
|
||||
import com.yahoo.ycsb.DBException;
|
||||
|
@ -30,6 +29,7 @@ import com.yahoo.ycsb.Status;
|
|||
import com.yahoo.ycsb.StringByteIterator;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
@ -57,19 +57,19 @@ import java.util.Vector;
|
|||
* Default properties to set:
|
||||
* </p>
|
||||
* <ul>
|
||||
* <li>es.cluster.name = es.ycsb.cluster
|
||||
* <li>es.client = true
|
||||
* <li>cluster.name = es.ycsb.cluster
|
||||
* <li>es.index.key = es.ycsb
|
||||
* <li>es.number_of_shards = 1
|
||||
* <li>es.number_of_replicas = 0
|
||||
* </ul>
|
||||
*
|
||||
* @author Sharmarke Aden
|
||||
*
|
||||
*/
|
||||
public class ElasticsearchClient extends DB {
|
||||
|
||||
public static final String DEFAULT_CLUSTER_NAME = "es.ycsb.cluster";
|
||||
public static final String DEFAULT_INDEX_KEY = "es.ycsb";
|
||||
public static final String DEFAULT_REMOTE_HOST = "localhost:9300";
|
||||
private static final String DEFAULT_CLUSTER_NAME = "es.ycsb.cluster";
|
||||
private static final String DEFAULT_INDEX_KEY = "es.ycsb";
|
||||
private static final String DEFAULT_REMOTE_HOST = "localhost:9300";
|
||||
private static final int NUMBER_OF_SHARDS = 1;
|
||||
private static final int NUMBER_OF_REPLICAS = 0;
|
||||
private Node node;
|
||||
private Client client;
|
||||
private String indexKey;
|
||||
|
@ -83,32 +83,26 @@ public class ElasticsearchClient extends DB {
|
|||
public void init() throws DBException {
|
||||
Properties props = getProperties();
|
||||
this.indexKey = props.getProperty("es.index.key", DEFAULT_INDEX_KEY);
|
||||
String clusterName =
|
||||
props.getProperty("cluster.name", DEFAULT_CLUSTER_NAME);
|
||||
|
||||
int numberOfShards = parseIntegerProperty(props, "es.number_of_shards", NUMBER_OF_SHARDS);
|
||||
int numberOfReplicas = parseIntegerProperty(props, "es.number_of_replicas", NUMBER_OF_REPLICAS);
|
||||
|
||||
// Check if transport client needs to be used (To connect to multiple
|
||||
// elasticsearch nodes)
|
||||
remoteMode = Boolean
|
||||
.parseBoolean(props.getProperty("elasticsearch.remote", "false"));
|
||||
Boolean newdb =
|
||||
Boolean.parseBoolean(props.getProperty("elasticsearch.newdb", "false"));
|
||||
remoteMode = Boolean.parseBoolean(props.getProperty("es.remote", "false"));
|
||||
Boolean newdb = Boolean.parseBoolean(props.getProperty("es.newdb", "false"));
|
||||
Builder settings = Settings.settingsBuilder()
|
||||
.put("node.local", "true")
|
||||
.put("path.data", System.getProperty("java.io.tmpdir") + "/esdata")
|
||||
.put("discovery.zen.ping.multicast.enabled", "false")
|
||||
.put("index.mapping._id.indexed", "true")
|
||||
.put("index.gateway.type", "none")
|
||||
.put("index.number_of_shards", "1")
|
||||
.put("index.number_of_replicas", "0")
|
||||
.put("cluster.name", DEFAULT_CLUSTER_NAME)
|
||||
.put("node.local", Boolean.toString(!remoteMode))
|
||||
.put("path.home", System.getProperty("java.io.tmpdir"));
|
||||
|
||||
// if properties file contains elasticsearch user defined properties
|
||||
// add it to the settings file (will overwrite the defaults).
|
||||
settings.put(props);
|
||||
System.out.println(
|
||||
"Elasticsearch starting node = " + settings.get("cluster.name"));
|
||||
System.out
|
||||
.println("Elasticsearch node data path = " + settings.get("path.data"));
|
||||
System.out.println("Elasticsearch Remote Mode = " + remoteMode);
|
||||
final String clusterName = settings.get("cluster.name");
|
||||
System.err.println("Elasticsearch starting node = " + clusterName);
|
||||
System.err.println("Elasticsearch node path.home = " + settings.get("path.home"));
|
||||
System.err.println("Elasticsearch Remote Mode = " + remoteMode);
|
||||
// Remote mode support for connecting to remote elasticsearch cluster
|
||||
if (remoteMode) {
|
||||
settings.put("client.transport.sniff", true)
|
||||
|
@ -116,13 +110,9 @@ public class ElasticsearchClient extends DB {
|
|||
.put("client.transport.ping_timeout", "30s")
|
||||
.put("client.transport.nodes_sampler_interval", "30s");
|
||||
// Default it to localhost:9300
|
||||
String[] nodeList =
|
||||
props.getProperty("elasticsearch.hosts.list", DEFAULT_REMOTE_HOST)
|
||||
.split(",");
|
||||
System.out.println("Elasticsearch Remote Hosts = "
|
||||
+ props.getProperty("elasticsearch.hosts.list", DEFAULT_REMOTE_HOST));
|
||||
TransportClient tClient = TransportClient.builder()
|
||||
.settings(settings).build();
|
||||
String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(",");
|
||||
System.out.println("Elasticsearch Remote Hosts = " + props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST));
|
||||
TransportClient tClient = TransportClient.builder().settings(settings).build();
|
||||
for (String h : nodeList) {
|
||||
String[] nodes = h.split(":");
|
||||
try {
|
||||
|
@ -143,21 +133,29 @@ public class ElasticsearchClient extends DB {
|
|||
client = node.client();
|
||||
}
|
||||
|
||||
//wait for shards to be ready
|
||||
client.admin().cluster()
|
||||
.health(new ClusterHealthRequest("lists").waitForActiveShards(1))
|
||||
.actionGet();
|
||||
if (newdb) {
|
||||
final boolean exists =
|
||||
client.admin().indices()
|
||||
.exists(Requests.indicesExistsRequest(indexKey)).actionGet()
|
||||
.isExists();
|
||||
if (exists && newdb) {
|
||||
client.admin().indices().prepareDelete(indexKey).execute().actionGet();
|
||||
client.admin().indices().prepareCreate(indexKey).execute().actionGet();
|
||||
} else {
|
||||
boolean exists = client.admin().indices()
|
||||
.exists(Requests.indicesExistsRequest(indexKey)).actionGet()
|
||||
.isExists();
|
||||
if (!exists) {
|
||||
client.admin().indices().prepareCreate(indexKey).execute().actionGet();
|
||||
}
|
||||
}
|
||||
if (!exists || newdb) {
|
||||
client.admin().indices().create(
|
||||
new CreateIndexRequest(indexKey)
|
||||
.settings(
|
||||
Settings.builder()
|
||||
.put("index.number_of_shards", numberOfShards)
|
||||
.put("index.number_of_replicas", numberOfReplicas)
|
||||
.put("index.mapping._id.indexed", true)
|
||||
)).actionGet();
|
||||
client.admin().cluster().health(new ClusterHealthRequest().waitForGreenStatus()).actionGet();
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Reads an integer-valued property, falling back to a default when it is not set.
 *
 * @param properties the properties to look the key up in
 * @param key the name of the property to read
 * @param defaultValue the value returned when {@code getProperty(key)} yields {@code null}
 * @return the parsed integer value, or {@code defaultValue} if the property is absent
 * @throws NumberFormatException if the property is present but is not a parsable integer
 */
private int parseIntegerProperty(Properties properties, String key, int defaultValue) {
|
||||
String value = properties.getProperty(key);
|
||||
return value == null ? defaultValue : Integer.parseInt(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -192,15 +190,13 @@ public class ElasticsearchClient extends DB {
|
|||
try {
|
||||
final XContentBuilder doc = jsonBuilder().startObject();
|
||||
|
||||
for (Entry<String, String> entry : StringByteIterator.getStringMap(values)
|
||||
.entrySet()) {
|
||||
for (Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) {
|
||||
doc.field(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
doc.endObject();
|
||||
|
||||
client.prepareIndex(indexKey, table, key).setSource(doc).execute()
|
||||
.actionGet();
|
||||
client.prepareIndex(indexKey, table, key).setSource(doc).execute().actionGet();
|
||||
|
||||
return Status.OK;
|
||||
} catch (Exception e) {
|
||||
|
@ -248,8 +244,7 @@ public class ElasticsearchClient extends DB {
|
|||
public Status read(String table, String key, Set<String> fields,
|
||||
HashMap<String, ByteIterator> result) {
|
||||
try {
|
||||
final GetResponse response =
|
||||
client.prepareGet(indexKey, table, key).execute().actionGet();
|
||||
final GetResponse response = client.prepareGet(indexKey, table, key).execute().actionGet();
|
||||
|
||||
if (response.isExists()) {
|
||||
if (fields != null) {
|
||||
|
@ -289,17 +284,14 @@ public class ElasticsearchClient extends DB {
|
|||
public Status update(String table, String key,
|
||||
HashMap<String, ByteIterator> values) {
|
||||
try {
|
||||
final GetResponse response =
|
||||
client.prepareGet(indexKey, table, key).execute().actionGet();
|
||||
final GetResponse response = client.prepareGet(indexKey, table, key).execute().actionGet();
|
||||
|
||||
if (response.isExists()) {
|
||||
for (Entry<String, String> entry : StringByteIterator
|
||||
.getStringMap(values).entrySet()) {
|
||||
for (Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) {
|
||||
response.getSource().put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
client.prepareIndex(indexKey, table, key)
|
||||
.setSource(response.getSource()).execute().actionGet();
|
||||
client.prepareIndex(indexKey, table, key).setSource(response.getSource()).execute().actionGet();
|
||||
|
||||
return Status.OK;
|
||||
}
|
||||
|
@ -343,11 +335,10 @@ public class ElasticsearchClient extends DB {
|
|||
HashMap<String, ByteIterator> entry;
|
||||
|
||||
for (SearchHit hit : response.getHits()) {
|
||||
entry = new HashMap<String, ByteIterator>(fields.size());
|
||||
entry = new HashMap<>(fields.size());
|
||||
|
||||
for (String field : fields) {
|
||||
entry.put(field,
|
||||
new StringByteIterator((String) hit.getSource().get(field)));
|
||||
entry.put(field, new StringByteIterator((String) hit.getSource().get(field)));
|
||||
}
|
||||
|
||||
result.add(entry);
|
||||
|
|
|
@ -38,10 +38,6 @@ import java.util.HashMap;
|
|||
import java.util.Set;
|
||||
import java.util.Vector;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author saden
|
||||
*/
|
||||
public class ElasticsearchClientTest {
|
||||
|
||||
protected final static ElasticsearchClient instance = new ElasticsearchClient();
|
||||
|
|
Загрузка…
Ссылка в новой задаче