[hbase2] HBase server side value filtering for long SCAN operations (#1462)

This commit is contained in:
Beata Sudi 2020-12-07 04:58:21 +01:00 коммит произвёл GitHub
Родитель ff4ecbd37f
Коммит a2d512771a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 165 добавлений и 15 удалений

Просмотреть файл

@ -71,6 +71,9 @@ Following options can be configurable using `-p`.
* `clientbuffering`: Whether or not to use client side buffering and batching of write operations. This can significantly improve performance and defaults to true.
* `writebuffersize`: The maximum amount, in bytes, of data to buffer on the client side before a flush is forced. The default is 12MB. Only used when `clientbuffering` is true.
* `durability`: Whether or not writes should be appended to the WAL. Bypassing the WAL can improve throughput but data cannot be recovered in the event of a crash. The default is true.
* `hbase.usescanvaluefiltering` : If true, the HBase scan operations will be configured to apply server-side filtering on the values during Scan operations. This means that only those records will be returned from HBase, where the values (byte arrays) are greater/less/etc. than the byte array defined in the `hbase.scanfiltervalue` parameter. The type of the filtering can be set in the `hbase.scanfilteroperator` parameter. This feature is disabled by default.
* `hbase.scanfilteroperator`: specifying the server-side filter operator to use during scan operations. One of the following strings: less_or_equal, greater_or_equal, greater, less, not_equal, equal. The default value is less_or_equal. This parameter is only used, if `hbase.usescanvaluefiltering` is set to true.
* `hbase.scanfiltervalue`: specifying the server-side filter value to use during scan operations. It is defined as a hexadecimal string, will be translated into a byte array. This parameter is only used if `hbase.usescanvaluefiltering` is set to true. By default it is a 200 long string "7FFFFFF...", as the core workload is defining 100 bytes long random byte arrays as values. Using the default `hbase.scanfiltervalue` and default `hbase.scanfilteroperator` will result in the filtering of approximately half of the values.
Additional HBase settings should be provided in the `hbase-site.xml` file located in your `/HBASE-HOME-DIR/conf` directory. Typically this will be `/etc/hbase/conf`.

Просмотреть файл

@ -15,6 +15,11 @@
package site.ycsb.db.hbase2;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.ValueFilter;
import site.ycsb.ByteArrayByteIterator;
import site.ycsb.ByteIterator;
import site.ycsb.DBException;
@ -61,9 +66,9 @@ import static site.ycsb.workloads.CoreWorkload.TABLENAME_PROPERTY_DEFAULT;
*/
public class HBaseClient2 extends site.ycsb.DB {
private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
private Configuration config = HBaseConfiguration.create();
private boolean debug = false;
private String tableName = "";
@ -101,6 +106,17 @@ public class HBaseClient2 extends site.ycsb.DB {
private boolean clientSideBuffering = false;
private long writeBufferSize = 1024 * 1024 * 12;
/**
* If true, we will configure server-side value filtering during scans.
*/
private boolean useScanValueFiltering = false;
private CompareOperator scanFilterOperator;
private static final String DEFAULT_SCAN_FILTER_OPERATOR = "less_or_equal";
private ByteArrayComparable scanFilterValue;
private static final String DEFAULT_SCAN_FILTER_VALUE = // 200 hexadecimal chars translated into 100 bytes
"7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF" +
"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF";
/**
* Initialize any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
@ -126,8 +142,8 @@ public class HBaseClient2 extends site.ycsb.DB {
UserGroupInformation.setConfiguration(config);
}
if ((getProperties().getProperty("principal")!=null)
&& (getProperties().getProperty("keytab")!=null)) {
if ((getProperties().getProperty("principal") != null)
&& (getProperties().getProperty("keytab") != null)) {
try {
UserGroupInformation.loginUserFromKeytab(getProperties().getProperty("principal"),
getProperties().getProperty("keytab"));
@ -165,9 +181,17 @@ public class HBaseClient2 extends site.ycsb.DB {
debug = true;
}
if ("false"
.equals(getProperties().getProperty("hbase.usepagefilter", "true"))) {
usePageFilter = false;
usePageFilter = isBooleanParamSet("hbase.usepagefilter", usePageFilter);
if (isBooleanParamSet("hbase.usescanvaluefiltering", false)) {
useScanValueFiltering=true;
String operator = getProperties().getProperty("hbase.scanfilteroperator");
operator = operator == null || operator.trim().isEmpty() ? DEFAULT_SCAN_FILTER_OPERATOR : operator;
scanFilterOperator = CompareOperator.valueOf(operator.toUpperCase());
String filterValue = getProperties().getProperty("hbase.scanfiltervalue");
filterValue = filterValue == null || filterValue.trim().isEmpty() ? DEFAULT_SCAN_FILTER_VALUE : filterValue;
scanFilterValue = new BinaryComparator(Bytes.fromHex(filterValue));
}
columnFamily = getProperties().getProperty("columnfamily");
@ -331,9 +355,11 @@ public class HBaseClient2 extends site.ycsb.DB {
// HBase has no record limit. Here, assume recordcount is small enough to
// bring back in one call.
// We get back recordcount records
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
s.setCaching(recordcount);
if (this.usePageFilter) {
s.setFilter(new PageFilter(recordcount));
filterList.addFilter(new PageFilter(recordcount));
}
// add specified fields or else all fields
@ -345,6 +371,13 @@ public class HBaseClient2 extends site.ycsb.DB {
}
}
// define value filter if needed
if (useScanValueFiltering){
filterList.addFilter(new ValueFilter(scanFilterOperator, scanFilterValue));
}
s.setFilter(filterList);
// get results
ResultScanner scanner = null;
try {
@ -523,6 +556,11 @@ public class HBaseClient2 extends site.ycsb.DB {
void setConfiguration(final Configuration newConfig) {
this.config = newConfig;
}
private boolean isBooleanParamSet(String param, boolean defaultValue){
return Boolean.parseBoolean(getProperties().getProperty(param, Boolean.toString(defaultValue)));
}
}
/*

Просмотреть файл

@ -15,6 +15,7 @@
package site.ycsb.db.hbase2;
import static org.junit.Assert.assertArrayEquals;
import static site.ycsb.workloads.CoreWorkload.TABLENAME_PROPERTY;
import static site.ycsb.workloads.CoreWorkload.TABLENAME_PROPERTY_DEFAULT;
import static org.junit.Assert.assertEquals;
@ -39,13 +40,13 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
@ -70,7 +71,6 @@ public class HBaseClient2Test {
/**
* Creates a mini-cluster for use in these tests.
*
* This is a heavy-weight operation, so invoked only once for the test class.
*/
@BeforeClass
@ -93,16 +93,19 @@ public class HBaseClient2Test {
}
/**
* Sets up the mini-cluster for testing.
*
* We re-create the table for each test.
* Re-create the table for each test. Using default properties.
*/
@Before
public void setUp() throws Exception {
setUp(new Properties());
}
/**
* Re-create the table for each test. Using custom properties.
*/
public void setUp(Properties p) throws Exception {
client = new HBaseClient2();
client.setConfiguration(new Configuration(testingUtil.getConfiguration()));
Properties p = new Properties();
p.setProperty("columnfamily", COLUMN_FAMILY);
Measurements.setProperties(p);
@ -124,6 +127,7 @@ public class HBaseClient2Test {
@Test
public void testRead() throws Exception {
setUp();
final String rowKey = "row1";
final Put p = new Put(Bytes.toBytes(rowKey));
p.addColumn(Bytes.toBytes(COLUMN_FAMILY),
@ -142,6 +146,7 @@ public class HBaseClient2Test {
@Test
public void testReadMissingRow() throws Exception {
setUp();
final HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>();
final Status status = client.read(tableName, "Missing row", null, result);
assertEquals(Status.NOT_FOUND, status);
@ -150,6 +155,7 @@ public class HBaseClient2Test {
@Test
public void testScan() throws Exception {
setUp();
// Fill with data
final String colStr = "row_number";
final byte[] col = Bytes.toBytes(colStr);
@ -183,8 +189,48 @@ public class HBaseClient2Test {
}
}
@Test
public void testScanWithValueFilteringUsingDefaultProperties() throws Exception {
testScanWithValueFiltering(null, null, 100, new byte[][] {
Bytes.fromHex("0000"), Bytes.fromHex("1111"), Bytes.fromHex("2222"), Bytes.fromHex("3333"),
Bytes.fromHex("4444"), Bytes.fromHex("5555"), Bytes.fromHex("6666"), Bytes.fromHex("7777"),
});
}
@Test
public void testScanWithValueFilteringOperationLessOrEqual() throws Exception {
testScanWithValueFiltering("less_or_equal", "3333", 100, new byte[][] {
Bytes.fromHex("0000"), Bytes.fromHex("1111"), Bytes.fromHex("2222"), Bytes.fromHex("3333"),
});
}
@Test
public void testScanWithValueFilteringOperationEqual() throws Exception {
testScanWithValueFiltering("equal", "AAAA", 100, new byte[][]{
Bytes.fromHex("AAAA")
});
}
@Test
public void testScanWithValueFilteringOperationNotEqual() throws Exception {
testScanWithValueFiltering("not_equal", "AAAA", 100 , new byte[][]{
Bytes.fromHex("0000"), Bytes.fromHex("1111"), Bytes.fromHex("2222"), Bytes.fromHex("3333"),
Bytes.fromHex("4444"), Bytes.fromHex("5555"), Bytes.fromHex("6666"), Bytes.fromHex("7777"),
Bytes.fromHex("8888"), Bytes.fromHex("9999"), Bytes.fromHex("BBBB"),
Bytes.fromHex("CCCC"), Bytes.fromHex("DDDD"), Bytes.fromHex("EEEE"), Bytes.fromHex("FFFF")
});
}
@Test
public void testScanWithValueFilteringAndRowLimit() throws Exception {
testScanWithValueFiltering("greater", "8887", 3, new byte[][] {
Bytes.fromHex("8888"), Bytes.fromHex("9999"), Bytes.fromHex("AAAA")
});
}
@Test
public void testUpdate() throws Exception{
setUp();
final String key = "key";
final HashMap<String, String> input = new HashMap<String, String>();
input.put("column1", "value1");
@ -209,5 +255,68 @@ public class HBaseClient2Test {
public void testDelete() {
fail("Not yet implemented");
}
private void testScanWithValueFiltering(String operation, String filterValue, int scanRowLimit,
byte[][] expectedValuesReturned) throws Exception {
Properties properties = new Properties();
properties.setProperty("hbase.usescanvaluefiltering", String.valueOf(true));
if(operation != null) {
properties.setProperty("hbase.scanfilteroperator", operation);
}
if(filterValue != null) {
properties.setProperty("hbase.scanfiltervalue", filterValue);
}
// setup the client and fill two columns with data
setUp(properties);
setupTableColumnWithHexValues("col_1");
setupTableColumnWithHexValues("col_2");
Vector<HashMap<String, ByteIterator>> result = new Vector<>();
// first scan the whole table (both columns)
client.scan(tableName, "00000", scanRowLimit, null, result);
assertEquals(expectedValuesReturned.length, result.size());
for(int i = 0; i < expectedValuesReturned.length; i++) {
final HashMap<String, ByteIterator> row = result.get(i);
assertEquals(2, row.size());
assertTrue(row.containsKey("col_1") && row.containsKey("col_2"));
assertArrayEquals(expectedValuesReturned[i], row.get("col_1").toArray());
assertArrayEquals(expectedValuesReturned[i], row.get("col_2").toArray());
}
// now scan only a single column (the filter should work here too)
result = new Vector<>();
client.scan(tableName, "00000", scanRowLimit, Collections.singleton("col_1"), result);
assertEquals(expectedValuesReturned.length, result.size());
for(int i = 0; i < expectedValuesReturned.length; i++) {
final HashMap<String, ByteIterator> row = result.get(i);
assertEquals(1, row.size());
assertTrue(row.containsKey("col_1"));
assertArrayEquals(expectedValuesReturned[i], row.get("col_1").toArray());
}
}
private void setupTableColumnWithHexValues(String colStr) throws Exception {
final byte[] col = Bytes.toBytes(colStr);
final byte[][] values = {
Bytes.fromHex("0000"), Bytes.fromHex("1111"), Bytes.fromHex("2222"), Bytes.fromHex("3333"),
Bytes.fromHex("4444"), Bytes.fromHex("5555"), Bytes.fromHex("6666"), Bytes.fromHex("7777"),
Bytes.fromHex("8888"), Bytes.fromHex("9999"), Bytes.fromHex("AAAA"), Bytes.fromHex("BBBB"),
Bytes.fromHex("CCCC"), Bytes.fromHex("DDDD"), Bytes.fromHex("EEEE"), Bytes.fromHex("FFFF")
};
final List<Put> puts = new ArrayList<>(16);
for(int i = 0; i < 16; i++) {
final byte[] key = Bytes.toBytes(String.format("%05d", i));
final byte[] value = values[i];
final Put p = new Put(key);
p.addColumn(Bytes.toBytes(COLUMN_FAMILY), col, value);
puts.add(p);
}
table.put(puts);
}
}