[core] changes to enable folding YCSB-TS back into YCSB (#1095)

* [core] Drop JDK 7 Support

Sets compiler target to 1.8
Additionally removes openjdk7 from travis build-matrix
Fixes #1033

Additionally includes lexicographic reordering of binding-dependency properties in root pom

* [core] introduce baseclass for timeseries database bindings

This base-class parses the queries generated by TimeSeriesWorkload into a format
that's usable for simple consumption.
The format has been defined through the work done by Andreas Bader (@baderas) in
https://github.com/TSDBBench/YCSB-TS.

* [core] parse debug and test properties for all Timeseries databases

* [core] Add method to define possible tagKeys for Timeseries Workloads

Some TimeSeries Databases require a "schema" up front.
To generate said schema, the tag keys used by the workload are required.
This method should be centrally available for maintainability

* [core] Provide default implementation for  operation

TimeSeries databases only rarely support deleting data. As such we can generally assume
the operation to always return a Status.NOT_IMPLEMENTED. Similarly to updating that trivial
implementation does not need to be repeated in every TimeSeriesDatabase implementation

* [core] Address review

This adds a copyright header and significant amounts of javadoc for TimeseriesDB
Furthermore occurrences of Long have been largely replaced with the primitive long
Finally an overload for signed integer values has been added for insertions
This commit is contained in:
Clemens Lieb 2018-03-20 00:37:13 +01:00 коммит произвёл Chris Larsen
Родитель ca25820e06
Коммит 2218649a0f
4 изменённых файлов: 366 добавлений и 27 удалений

Просмотреть файл

@ -22,7 +22,6 @@ language: java
jdk:
- oraclejdk9
- oraclejdk8
- openjdk7
addons:
hosts:

Просмотреть файл

@ -72,6 +72,8 @@ LICENSE file.
<module name="JavadocType">
<property name="scope" value="public"/>
<property name="allowMissingParamTags" value="true"/>
<!-- unfortunately we cannot add implNote, implSpec, apiNote and apiSpec to checkstyle -->
<property name="allowUnknownTags" value="true"/>
</module>
<module name="JavadocStyle"/>

Просмотреть файл

@ -0,0 +1,336 @@
/*
* Copyright (c) 2018 YCSB Contributors All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb;
import com.yahoo.ycsb.generator.Generator;
import com.yahoo.ycsb.generator.IncrementingPrintableStringGenerator;
import com.yahoo.ycsb.workloads.TimeSeriesWorkload;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* Abstract class to adapt the default ycsb DB interface to Timeseries databases.
* This class is mostly here to be extended by Timeseries dataabases
* originally developed by Andreas Bader in <a href="https://github.com/TSDBBench/YCSB-TS">YCSB-TS</a>.
* <p>
* This class is mostly parsing the workload information passed through the default ycsb interface
* according to the information outlined in {@link TimeSeriesWorkload}.
* It also contains some minor utility methods relevant to Timeseries databases.
* </p>
*
* @implSpec It's vital to call <tt>super.init()</tt> when overwriting the init method
* to correctly initialize the workload-parsing.
*/
public abstract class TimeseriesDB extends DB {
// defaults for downsampling. Basically we ignore it
private static final String DOWNSAMPLING_FUNCTION_PROPERTY_DEFAULT = "NONE";
private static final String DOWNSAMPLING_INTERVAL_PROPERTY_DEFAULT = "0";
// debug property loading
private static final String DEBUG_PROPERTY = "debug";
private static final String DEBUG_PROPERTY_DEFAULT = "false";
// test property loading
private static final String TEST_PROPERTY = "test";
private static final String TEST_PROPERTY_DEFAULT = "false";
// Workload parameters that we need to parse this
protected String timestampKey;
protected String valueKey;
protected String tagPairDelimiter;
protected String queryTimeSpanDelimiter;
protected String deleteDelimiter;
protected TimeUnit timestampUnit;
protected String groupByKey;
protected String downsamplingKey;
protected Integer downsamplingInterval;
protected AggregationOperation downsamplingFunction;
// YCSB-parameters
protected boolean debug;
protected boolean test;
/**
* Initialize any state for this DB.
* Called once per DB instance; there is one DB instance per client thread.
*/
@Override
public void init() throws DBException {
// taken from BasicTSDB
timestampKey = getProperties().getProperty(
TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY,
TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT);
valueKey = getProperties().getProperty(
TimeSeriesWorkload.VALUE_KEY_PROPERTY,
TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT);
tagPairDelimiter = getProperties().getProperty(
TimeSeriesWorkload.PAIR_DELIMITER_PROPERTY,
TimeSeriesWorkload.PAIR_DELIMITER_PROPERTY_DEFAULT);
queryTimeSpanDelimiter = getProperties().getProperty(
TimeSeriesWorkload.QUERY_TIMESPAN_DELIMITER_PROPERTY,
TimeSeriesWorkload.QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT);
deleteDelimiter = getProperties().getProperty(
TimeSeriesWorkload.DELETE_DELIMITER_PROPERTY,
TimeSeriesWorkload.DELETE_DELIMITER_PROPERTY_DEFAULT);
timestampUnit = TimeUnit.valueOf(getProperties().getProperty(
TimeSeriesWorkload.TIMESTAMP_UNITS_PROPERTY,
TimeSeriesWorkload.TIMESTAMP_UNITS_PROPERTY_DEFAULT));
groupByKey = getProperties().getProperty(
TimeSeriesWorkload.GROUPBY_KEY_PROPERTY,
TimeSeriesWorkload.GROUPBY_KEY_PROPERTY_DEFAULT);
downsamplingKey = getProperties().getProperty(
TimeSeriesWorkload.DOWNSAMPLING_KEY_PROPERTY,
TimeSeriesWorkload.DOWNSAMPLING_KEY_PROPERTY_DEFAULT);
downsamplingFunction = TimeseriesDB.AggregationOperation.valueOf(getProperties()
.getProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, DOWNSAMPLING_FUNCTION_PROPERTY_DEFAULT));
downsamplingInterval = Integer.valueOf(getProperties()
.getProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, DOWNSAMPLING_INTERVAL_PROPERTY_DEFAULT));
test = Boolean.parseBoolean(getProperties().getProperty(TEST_PROPERTY, TEST_PROPERTY_DEFAULT));
debug = Boolean.parseBoolean(getProperties().getProperty(DEBUG_PROPERTY, DEBUG_PROPERTY_DEFAULT));
}
@Override
public final Status read(String table, String key, Set<String> fields, Map<String, ByteIterator> result) {
Map<String, List<String>> tagQueries = new HashMap<>();
Long timestamp = null;
for (String field : fields) {
if (field.startsWith(timestampKey)) {
String[] timestampParts = field.split(tagPairDelimiter);
if (timestampParts[1].contains(queryTimeSpanDelimiter)) {
// Since we're looking for a single datapoint, a range of timestamps makes no sense.
// As we cannot throw an exception to bail out here, we return `BAD_REQUEST` instead.
return Status.BAD_REQUEST;
}
timestamp = Long.valueOf(timestampParts[1]);
} else {
String[] queryParts = field.split(tagPairDelimiter);
tagQueries.computeIfAbsent(queryParts[0], k -> new ArrayList<>()).add(queryParts[1]);
}
}
if (timestamp == null) {
return Status.BAD_REQUEST;
}
return read(table, timestamp, tagQueries);
}
/**
* Read a record from the database. Each value from the result will be stored in a HashMap
*
* @param metric The name of the metric
* @param timestamp The timestamp of the record to read.
* @param tags actual tags that were want to receive (can be empty)
* @return Zero on success, a non-zero error code on error or "not found".
*/
protected abstract Status read(String metric, long timestamp, Map<String, List<String>> tags);
/**
* @inheritDoc
* @implNote this method parses the information passed to it and subsequently passes it to the modified
* interface at {@link #scan(String, long, long, Map, AggregationOperation, int, TimeUnit)}
*/
@Override
public final Status scan(String table, String startkey, int recordcount, Set<String> fields,
Vector<HashMap<String, ByteIterator>> result) {
Map<String, List<String>> tagQueries = new HashMap<>();
TimeseriesDB.AggregationOperation aggregationOperation = TimeseriesDB.AggregationOperation.NONE;
Set<String> groupByFields = new HashSet<>();
boolean rangeSet = false;
long start = 0;
long end = 0;
for (String field : fields) {
if (field.startsWith(timestampKey)) {
String[] timestampParts = field.split(tagPairDelimiter);
if (!timestampParts[1].contains(queryTimeSpanDelimiter)) {
// seems like this should be a more elaborate query.
// for now we don't support scanning single timestamps
// TODO: Support Timestamp range queries
return Status.NOT_IMPLEMENTED;
}
String[] rangeParts = timestampParts[1].split(queryTimeSpanDelimiter);
rangeSet = true;
start = Long.valueOf(rangeParts[0]);
end = Long.valueOf(rangeParts[1]);
} else if (field.startsWith(groupByKey)) {
String groupBySpecifier = field.split(tagPairDelimiter)[1];
aggregationOperation = TimeseriesDB.AggregationOperation.valueOf(groupBySpecifier);
} else if (field.startsWith(downsamplingKey)) {
String downsamplingSpec = field.split(tagPairDelimiter)[1];
// apparently that needs to always hold true:
if (!downsamplingSpec.equals(downsamplingFunction.toString() + downsamplingInterval.toString())) {
System.err.print("Downsampling specification for Scan did not match configured downsampling");
return Status.BAD_REQUEST;
}
} else {
String[] queryParts = field.split(tagPairDelimiter);
if (queryParts.length == 1) {
// we should probably warn about this being ignored...
System.err.println("Grouping by arbitrary series is currently not supported");
groupByFields.add(field);
} else {
tagQueries.computeIfAbsent(queryParts[0], k -> new ArrayList<>()).add(queryParts[1]);
}
}
}
if (!rangeSet) {
return Status.BAD_REQUEST;
}
return scan(table, start, end, tagQueries, downsamplingFunction, downsamplingInterval, timestampUnit);
}
/**
* Perform a range scan for a set of records in the database. Each value from the result will be stored in a
* HashMap.
*
* @param metric The name of the metric
* @param startTs The timestamp of the first record to read.
* @param endTs The timestamp of the last record to read.
* @param tags actual tags that were want to receive (can be empty).
* @param aggreg The aggregation operation to perform.
* @param timeValue value for timeUnit for aggregation
* @param timeUnit timeUnit for aggregation
* @return A {@link Status} detailing the outcome of the scan operation.
*/
protected abstract Status scan(String metric, long startTs, long endTs, Map<String, List<String>> tags,
AggregationOperation aggreg, int timeValue, TimeUnit timeUnit);
@Override
public Status update(String table, String key, Map<String, ByteIterator> values) {
return Status.NOT_IMPLEMENTED;
// not supportable for general TSDBs
// can be explicitly overwritten in inheriting classes
}
@Override
public final Status insert(String table, String key, Map<String, ByteIterator> values) {
NumericByteIterator tsContainer = (NumericByteIterator) values.remove(timestampKey);
NumericByteIterator valueContainer = (NumericByteIterator) values.remove(valueKey);
if (valueContainer.isFloatingPoint()) {
return insert(table, tsContainer.getLong(), valueContainer.getDouble(), values);
} else {
return insert(table, tsContainer.getLong(), valueContainer.getLong(), values);
}
}
/**
* Insert a record into the database. Any tags/tagvalue pairs in the specified tagmap and the given value will be
* written into the record with the specified timestamp.
*
* @param metric The name of the metric
* @param timestamp The timestamp of the record to insert.
* @param value The actual value to insert.
* @param tags A Map of tag/tagvalue pairs to insert as tags
* @return A {@link Status} detailing the outcome of the insert
*/
protected abstract Status insert(String metric, long timestamp, long value, Map<String, ByteIterator> tags);
/**
* Insert a record in the database. Any tags/tagvalue pairs in the specified tagmap and the given value will be
* written into the record with the specified timestamp.
*
* @param metric The name of the metric
* @param timestamp The timestamp of the record to insert.
* @param value actual value to insert
* @param tags A HashMap of tag/tagvalue pairs to insert as tags
* @return A {@link Status} detailing the outcome of the insert
*/
protected abstract Status insert(String metric, long timestamp, double value, Map<String, ByteIterator> tags);
/**
* NOTE: This operation is usually <b>not</b> supported for Time-Series databases.
* Deletion of data is often instead regulated through automatic cleanup and "retention policies" or similar.
*
* @return Status.NOT_IMPLEMENTED or a {@link Status} specifying the outcome of deletion
* in case the operation is supported.
*/
public Status delete(String table, String key) {
return Status.NOT_IMPLEMENTED;
}
/**
* Examines the given {@link Properties} and returns an array containing the Tag Keys
* (basically matching column names for traditional Relational DBs) that are detailed in the workload specification.
* See {@link TimeSeriesWorkload} for how these are generated.
* <p>
* This method is intended to be called during the initialization phase to create a table schema
* for DBMS that require such a schema before values can be inserted (or queried)
*
* @param properties The properties detailing the workload configuration.
* @return An array of strings specifying all allowed TagKeys (or column names)
* except for the "value" and the "timestamp" column name.
* @implSpec WARNING this method must exactly match how tagKeys are generated by the {@link TimeSeriesWorkload},
* otherwise databases requiring this information will most likely break!
*/
protected static String[] getPossibleTagKeys(Properties properties) {
final int tagCount = Integer.parseInt(properties.getProperty(TimeSeriesWorkload.TAG_COUNT_PROPERTY,
TimeSeriesWorkload.TAG_COUNT_PROPERTY_DEFAULT));
final int tagKeylength = Integer.parseInt(properties.getProperty(TimeSeriesWorkload.TAG_KEY_LENGTH_PROPERTY,
TimeSeriesWorkload.TAG_KEY_LENGTH_PROPERTY_DEFAULT));
Generator<String> tagKeyGenerator = new IncrementingPrintableStringGenerator(tagKeylength);
String[] tagNames = new String[tagCount];
for (int i = 0; i < tagCount; i++) {
tagNames[i] = tagKeyGenerator.nextValue();
}
return tagNames;
}
/**
* An enum containing the possible aggregation operations.
* Not all of these operations are required to be supported by implementing classes.
* <p>
* Aggregations are applied when using the <tt>SCAN</tt> operation on a range of timestamps.
* That way the result set is reduced from multiple records into
* a single one or one record for each group specified through <tt>GROUP BY</tt> clauses.
*/
public enum AggregationOperation {
/**
* No aggregation whatsoever. Return the results as a full table
*/
NONE,
/**
* Sum the values of the matching records when calculating the value.
* GroupBy criteria apply where relevant for sub-summing.
*/
SUM,
/**
* Calculate the arithmetic mean over the value across matching records when calculating the value.
* GroupBy criteria apply where relevant for group-targeted averages
*/
AVERAGE,
/**
* Count the number of matching records and return that as value.
* GroupBy criteria apply where relevant.
*/
COUNT,
/**
* Return only the maximum of the matching record values.
* GroupBy criteria apply and result in group-based maxima.
*/
MAX,
/**
* Return only the minimum of the matching record values.
* GroupBy criteria apply and result in group-based minima.
*/
MIN;
}
}

54
pom.xml
Просмотреть файл

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2012 - 2016 YCSB contributors. All rights reserved.
<!--
Copyright (c) 2012 - 2017 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
@ -68,42 +68,44 @@ LICENSE file.
<properties>
<maven.assembly.version>2.5.5</maven.assembly.version>
<maven.dependency.version>2.10</maven.dependency.version>
<asynchbase.version>1.7.1</asynchbase.version>
<hbase098.version>0.98.14-hadoop2</hbase098.version>
<hbase10.version>1.0.2</hbase10.version>
<hbase12.version>1.2.5</hbase12.version>
<!-- datastore binding versions, lex sorted -->
<accumulo.1.6.version>1.6.6</accumulo.1.6.version>
<accumulo.1.7.version>1.7.3</accumulo.1.7.version>
<accumulo.1.8.version>1.8.1</accumulo.1.8.version>
<cassandra.cql.version>3.0.0</cassandra.cql.version>
<geode.version>1.2.0</geode.version>
<aerospike.version>3.1.2</aerospike.version>
<arangodb.version>2.7.3</arangodb.version>
<arangodb3.version>4.1.7</arangodb3.version>
<asynchbase.version>1.7.1</asynchbase.version>
<azuredocumentdb.version>1.8.1</azuredocumentdb.version>
<azurestorage.version>4.0.0</azurestorage.version>
<cassandra.cql.version>3.0.0</cassandra.cql.version>
<cloudspanner.version>0.24.0-beta</cloudspanner.version>
<couchbase.version>1.4.10</couchbase.version>
<couchbase2.version>2.3.1</couchbase2.version>
<elasticsearch5-version>5.5.1</elasticsearch5-version>
<geode.version>1.2.0</geode.version>
<googlebigtable.version>0.9.7</googlebigtable.version>
<hbase098.version>0.98.14-hadoop2</hbase098.version>
<hbase10.version>1.0.2</hbase10.version>
<hbase12.version>1.2.5</hbase12.version>
<hypertable.version>0.9.5.6</hypertable.version>
<infinispan.version>7.2.2.Final</infinispan.version>
<kudu.version>1.1.0</kudu.version>
<openjpa.jdbc.version>2.1.1</openjpa.jdbc.version>
<!--<mapkeeper.version>1.0</mapkeeper.version>-->
<mongodb.version>3.0.3</mongodb.version>
<mongodb.async.version>2.0.1</mongodb.async.version>
<openjpa.jdbc.version>2.1.1</openjpa.jdbc.version>
<orientdb.version>2.2.10</orientdb.version>
<redis.version>2.0.0</redis.version>
<s3.version>1.10.20</s3.version>
<voldemort.version>0.81</voldemort.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<thrift.version>0.8.0</thrift.version>
<hypertable.version>0.9.5.6</hypertable.version>
<elasticsearch5-version>5.5.1</elasticsearch5-version>
<couchbase.version>1.4.10</couchbase.version>
<couchbase2.version>2.3.1</couchbase2.version>
<tarantool.version>1.6.5</tarantool.version>
<redis.version>2.0.0</redis.version>
<riak.version>2.0.5</riak.version>
<aerospike.version>3.1.2</aerospike.version>
<s3.version>1.10.20</s3.version>
<solr.version>5.5.3</solr.version>
<solr6.version>6.4.1</solr6.version>
<arangodb.version>2.7.3</arangodb.version>
<arangodb3.version>4.1.7</arangodb3.version>
<azurestorage.version>4.0.0</azurestorage.version>
<cloudspanner.version>0.24.0-beta</cloudspanner.version>
<tarantool.version>1.6.5</tarantool.version>
<thrift.version>0.8.0</thrift.version>
<voldemort.version>0.81</voldemort.version>
</properties>
<modules>
@ -180,7 +182,7 @@ LICENSE file.
<requireMavenVersion>
<version>3.1.0</version>
</requireMavenVersion>
</rules>
</rules>
</configuration>
</execution>
</executions>
@ -190,8 +192,8 @@ LICENSE file.
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>