allow user to plugin a customized TestEnv implementation

1. Get rid of SetupCommand.java and move its logic to TestEnv.
2. Add getRpcClientFactory method in TestEnv.
3. Use setters & getters in TestEnv.
4. define vtgate.test.env and vtgate.rpcclient.factory properties in vtgate-client/pom.xml.
This commit is contained in:
Shengzhe Yao 2015-07-14 14:16:50 -07:00
Родитель 6ed59261ff
Коммит 2c42db70cc
15 изменённых файлов: 189 добавлений и 148 удалений

Просмотреть файл

@ -183,10 +183,6 @@ integration_test: small_integration_test medium_integration_test large_integrati
site_integration_test:
$(call run_integration_tests, $(site_integration_test_files))
# this rule only works if bootstrap.sh was successfully ran in ./java
java_test:
cd java && mvn verify
java_vtgate_client_test:
mvn -f java/vtgate-client/pom.xml clean verify

Просмотреть файл

@ -69,6 +69,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
@ -88,6 +89,10 @@
<version>2.13</version>
<configuration>
<argLine>${failsafeArgLine}</argLine>
<systemPropertyVariables>
<vtgate.test.env>com.youtube.vitess.vtgate.TestEnv</vtgate.test.env>
<vtgate.rpcclient.factory>com.youtube.vitess.vtgate.rpcclient.gorpc.BsonRpcClientFactory</vtgate.rpcclient.factory>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
@ -101,6 +106,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4</version>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>

Просмотреть файл

@ -1,9 +1,8 @@
package com.youtube.vitess.vtgate.integration.util;
package com.youtube.vitess.vtgate;
import com.google.common.primitives.UnsignedLong;
import com.google.gson.Gson;
import com.youtube.vitess.vtgate.KeyspaceId;
import com.youtube.vitess.vtgate.rpcclient.BsonRpcClientFactory;
import com.youtube.vitess.vtgate.rpcclient.RpcClientFactory;
import org.apache.commons.lang.StringUtils;
@ -16,17 +15,44 @@ import java.util.Map;
* Helper class to hold the configurations for VtGate setup used in integration tests
*/
public class TestEnv {
public Map<String, List<String>> shardKidMap;
public Map<String, Integer> tablets;
public String keyspace;
public Process pythonScriptProcess;
public int port;
public List<KeyspaceId> kids;
public static final String PROPERTY_KEY_RPCCLIENT_FACTORY_CLASS = "vtgate.rpcclient.factory";
private Map<String, List<String>> shardKidMap;
private Map<String, Integer> tablets = new HashMap<>();
private String keyspace;
private Process pythonScriptProcess;
private int port;
private List<KeyspaceId> kids;
public TestEnv(Map<String, List<String>> shardKidMap, String keyspace_name) {
public void setKeyspace(String keyspace) {
this.keyspace = keyspace;
}
public String getKeyspace() {
return this.keyspace;
}
public int getPort() {
return this.port;
}
public void setPort(int port) {
this.port = port;
}
public Process getPythonScriptProcess() {
return this.pythonScriptProcess;
}
public void setPythonScriptProcess(Process process) {
this.pythonScriptProcess = process;
}
public void setShardKidMap(Map<String, List<String>> shardKidMap) {
this.shardKidMap = shardKidMap;
this.keyspace = keyspace_name;
this.tablets = new HashMap<String, Integer>();
}
public Map<String, List<String>> getShardKidMap() {
return this.shardKidMap;
}
public void addTablet(String type, int count) {
@ -93,7 +119,13 @@ public class TestEnv {
return command;
}
public Class<? extends RpcClientFactory> getRpcClientFactory() {
return BsonRpcClientFactory.class;
public RpcClientFactory getRpcClientFactory() {
String rpcClientFactoryClass = System.getProperty(PROPERTY_KEY_RPCCLIENT_FACTORY_CLASS);
try {
Class<?> clazz = Class.forName(rpcClientFactoryClass);
return (RpcClientFactory)clazz.newInstance();
} catch (ClassNotFoundException|IllegalAccessException|InstantiationException e) {
throw new RuntimeException(e);
}
}
}

Просмотреть файл

@ -10,7 +10,6 @@ import com.youtube.vitess.vtgate.rpcclient.RpcClient;
import com.youtube.vitess.vtgate.rpcclient.RpcClientFactory;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -18,7 +17,7 @@ import java.util.Random;
/**
* A single threaded VtGate client
*
* Usage:
* <p>Usage:
*
* <pre>
*VtGate vtGate = VtGate.connect(addresses);
@ -50,16 +49,11 @@ public class VtGate {
* @param timeoutMs connection timeout in milliseconds, 0 for no timeout
* @throws ConnectionException
*/
public static VtGate connect(String addresses, int timeoutMs, Class<? extends RpcClientFactory> rpcFactoryClass) throws ConnectionException {
public static VtGate connect(String addresses, int timeoutMs, RpcClientFactory rpcFactory) throws ConnectionException {
List<String> addressList = Arrays.asList(addresses.split(","));
int index = new Random().nextInt(addressList.size());
try {
RpcClientFactory rpcFactory = rpcFactoryClass.newInstance();
RpcClient client = rpcFactory.create(addressList.get(index), timeoutMs);
return new VtGate(client);
} catch (InstantiationException|IllegalAccessException e) {
throw new RuntimeException(e);
}
RpcClient client = rpcFactory.create(addressList.get(index), timeoutMs);
return new VtGate(client);
}
private VtGate(RpcClient client) {

Просмотреть файл

@ -13,7 +13,7 @@ public class VitessConf {
public static final String INPUT_QUERY = "vitess.vtgate.hadoop.input_query";
public static final String SPLITS = "vitess.vtgate.hadoop.splits";
public static final String SPLIT_COLUMN = "vitess.vtgate.hadoop.splitcolumn";
public static final String RPC_FACTORY_CLASS = "vitess.rpc.factory.class";
public static final String RPC_FACTORY_CLASS = "vtgate.rpcclient.factory";
public static final String HOSTS_DELIM = ",";
private Configuration conf;

Просмотреть файл

@ -32,13 +32,14 @@ public class VitessInputFormat extends InputFormat<NullWritable, RowWritable> {
try {
VitessConf conf = new VitessConf(context.getConfiguration());
Class<? extends RpcClientFactory> rpcFactoryClass = null;
VtGate vtgate;
try {
rpcFactoryClass = (Class<? extends RpcClientFactory>)Class.forName(conf.getRpcFactoryClass());
} catch (ClassNotFoundException e) {
vtgate = VtGate.connect(conf.getHosts(), conf.getTimeoutMs(), rpcFactoryClass.newInstance());
} catch (ClassNotFoundException|InstantiationException|IllegalAccessException e) {
throw new RuntimeException(e);
}
VtGate vtgate = VtGate.connect(conf.getHosts(), conf.getTimeoutMs(), rpcFactoryClass);
Map<Query, Long> queries =
vtgate.splitQuery(conf.getKeyspace(), conf.getInputQuery(), conf.getSplits(), conf.getSplitColumn());
List<InputSplit> splits = new LinkedList<>();

Просмотреть файл

@ -33,8 +33,8 @@ public class VitessRecordReader extends RecordReader<NullWritable, RowWritable>
conf = new VitessConf(context.getConfiguration());
try {
Class<? extends RpcClientFactory> rpcFactoryClass = (Class<? extends RpcClientFactory>)Class.forName(conf.getRpcFactoryClass());
vtgate = VtGate.connect(conf.getHosts(), conf.getTimeoutMs(), rpcFactoryClass);
} catch (ConnectionException|ClassNotFoundException e) {
vtgate = VtGate.connect(conf.getHosts(), conf.getTimeoutMs(), rpcFactoryClass.newInstance());
} catch (ConnectionException|ClassNotFoundException|InstantiationException|IllegalAccessException e) {
throw new RuntimeException(e);
}
}

Просмотреть файл

@ -1,14 +1,15 @@
package com.youtube.vitess.vtgate.rpcclient;
package com.youtube.vitess.vtgate.rpcclient.gorpc;
import com.google.common.net.HostAndPort;
import com.youtube.vitess.gorpc.Client;
import com.youtube.vitess.gorpc.Exceptions.GoRpcException;
import com.youtube.vitess.gorpc.codecs.bson.BsonClientCodecFactory;
import com.youtube.vitess.vtgate.Exceptions.ConnectionException;
import com.youtube.vitess.vtgate.rpcclient.gorpc.GoRpcClient;
import com.youtube.vitess.vtgate.rpcclient.RpcClient;
import com.youtube.vitess.vtgate.rpcclient.RpcClientFactory;
public class BsonRpcClientFactory implements RpcClientFactory {
@Override
public RpcClient create(String address, int timeoutMs) throws ConnectionException {
try {
HostAndPort hostAndPort = HostAndPort.fromString(address);

Просмотреть файл

@ -10,7 +10,7 @@ import com.youtube.vitess.vtgate.Query.QueryBuilder;
import com.youtube.vitess.vtgate.Row;
import com.youtube.vitess.vtgate.VtGate;
import com.youtube.vitess.vtgate.cursor.Cursor;
import com.youtube.vitess.vtgate.integration.util.TestEnv;
import com.youtube.vitess.vtgate.TestEnv;
import com.youtube.vitess.vtgate.integration.util.Util;
import org.joda.time.DateTime;
@ -48,14 +48,14 @@ public class DataTypesIT {
public void testInts() throws Exception {
String createTable =
"create table vtocc_ints(tiny tinyint, tinyu tinyint unsigned, small smallint, smallu smallint unsigned, medium mediumint, mediumu mediumint unsigned, normal int, normalu int unsigned, big bigint, bigu bigint unsigned, year year, primary key(tiny)) comment 'vtocc_nocache'\n" + "";
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
vtgate.execute(getQuery(createTable));
vtgate.commit();
String insertSql =
"insert into vtocc_ints values(:tiny, :tinyu, :small, :smallu, :medium, :mediumu, :normal, :normalu, :big, :bigu, :year)";
Query insertQuery = new QueryBuilder(insertSql, testEnv.keyspace, "master")
Query insertQuery = new QueryBuilder(insertSql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forInt("tiny", -128))
.addBindVar(BindVariable.forInt("tinyu", 255))
.addBindVar(BindVariable.forInt("small", -32768))
@ -75,7 +75,7 @@ public class DataTypesIT {
String selectSql = "select * from vtocc_ints where tiny = -128";
Query selectQuery =
new QueryBuilder(selectSql, testEnv.keyspace, "master").addKeyRange(KeyRange.ALL).build();
new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(KeyRange.ALL).build();
Cursor cursor = vtgate.execute(selectQuery);
Assert.assertEquals(1, cursor.getRowsAffected());
Row row = cursor.next();
@ -97,13 +97,13 @@ public class DataTypesIT {
public void testFracts() throws Exception {
String createTable =
"create table vtocc_fracts(id int, deci decimal(5,2), num numeric(5,2), f float, d double, primary key(id)) comment 'vtocc_nocache'\n" + "";
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
vtgate.execute(getQuery(createTable));
vtgate.commit();
String insertSql = "insert into vtocc_fracts values(:id, :deci, :num, :f, :d)";
Query insertQuery = new QueryBuilder(insertSql, testEnv.keyspace, "master")
Query insertQuery = new QueryBuilder(insertSql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forInt("id", 1))
.addBindVar(BindVariable.forDouble("deci", 1.99))
.addBindVar(BindVariable.forDouble("num", 2.99))
@ -117,7 +117,7 @@ public class DataTypesIT {
String selectSql = "select * from vtocc_fracts where id = 1";
Query selectQuery =
new QueryBuilder(selectSql, testEnv.keyspace, "master").addKeyRange(KeyRange.ALL).build();
new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(KeyRange.ALL).build();
Cursor cursor = vtgate.execute(selectQuery);
Assert.assertEquals(1, cursor.getRowsAffected());
Row row = cursor.next();
@ -133,14 +133,14 @@ public class DataTypesIT {
public void testStrings() throws Exception {
String createTable =
"create table vtocc_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(4), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb)) comment 'vtocc_nocache'\n" + "";
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
vtgate.execute(getQuery(createTable));
vtgate.commit();
String insertSql =
"insert into vtocc_strings values(:vb, :c, :vc, :b, :tb, :bl, :ttx, :tx, :en, :s)";
Query insertQuery = new QueryBuilder(insertSql, testEnv.keyspace, "master")
Query insertQuery = new QueryBuilder(insertSql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forBytes("vb", "a".getBytes()))
.addBindVar(BindVariable.forBytes("c", "b".getBytes()))
.addBindVar(BindVariable.forBytes("vc", "c".getBytes()))
@ -159,7 +159,7 @@ public class DataTypesIT {
String selectSql = "select * from vtocc_strings where vb = 'a'";
Query selectQuery =
new QueryBuilder(selectSql, testEnv.keyspace, "master").addKeyRange(KeyRange.ALL).build();
new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(KeyRange.ALL).build();
Cursor cursor = vtgate.execute(selectQuery);
Assert.assertEquals(1, cursor.getRowsAffected());
Row row = cursor.next();
@ -182,13 +182,13 @@ public class DataTypesIT {
public void testMisc() throws Exception {
String createTable =
"create table vtocc_misc(id int, b bit(8), d date, dt datetime, t time, primary key(id)) comment 'vtocc_nocache'\n" + "";
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
vtgate.execute(getQuery(createTable));
vtgate.commit();
String insertSql = "insert into vtocc_misc values(:id, :b, :d, :dt, :t)";
Query insertQuery = new QueryBuilder(insertSql, testEnv.keyspace, "master")
Query insertQuery = new QueryBuilder(insertSql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forInt("id", 1))
.addBindVar(BindVariable.forBytes("b", ByteBuffer.allocate(1).put((byte) 1).array()))
.addBindVar(BindVariable.forDate("d", DateTime.parse("2012-01-01")))
@ -203,7 +203,7 @@ public class DataTypesIT {
String selectSql = "select * from vtocc_misc where id = 1";
Query selectQuery =
new QueryBuilder(selectSql, testEnv.keyspace, "master").addKeyRange(KeyRange.ALL).build();
new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(KeyRange.ALL).build();
Cursor cursor = vtgate.execute(selectQuery);
Assert.assertEquals(1, cursor.getRowsAffected());
Row row = cursor.next();
@ -218,7 +218,7 @@ public class DataTypesIT {
}
private Query getQuery(String sql) {
return new QueryBuilder(sql, testEnv.keyspace, "master").addKeyRange(KeyRange.ALL).build();
return new QueryBuilder(sql, testEnv.getKeyspace(), "master").addKeyRange(KeyRange.ALL).build();
}
/**
@ -227,7 +227,7 @@ public class DataTypesIT {
static TestEnv getTestEnv() {
Map<String, List<String>> shardKidMap = new HashMap<>();
shardKidMap.put("-", Lists.newArrayList("527875958493693904"));
TestEnv env = new TestEnv(shardKidMap, "test_keyspace");
TestEnv env = Util.getTestEnv(shardKidMap, "test_keyspace");
env.addTablet("replica", 1);
return env;
}

Просмотреть файл

@ -12,7 +12,7 @@ import com.youtube.vitess.vtgate.KeyspaceId;
import com.youtube.vitess.vtgate.Query;
import com.youtube.vitess.vtgate.Query.QueryBuilder;
import com.youtube.vitess.vtgate.VtGate;
import com.youtube.vitess.vtgate.integration.util.TestEnv;
import com.youtube.vitess.vtgate.TestEnv;
import com.youtube.vitess.vtgate.integration.util.Util;
import org.junit.AfterClass;
@ -52,10 +52,10 @@ public class FailuresIT {
@Test
public void testIntegrityException() throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
String insertSql = "insert into vtgate_test(id, keyspace_id) values (:id, :keyspace_id)";
KeyspaceId kid = testEnv.getAllKeyspaceIds().get(0);
Query insertQuery = new QueryBuilder(insertSql, testEnv.keyspace, "master")
Query insertQuery = new QueryBuilder(insertSql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forULong("id", UnsignedLong.valueOf("1")))
.addBindVar(BindVariable.forULong("keyspace_id", ((UnsignedLong) kid.getId())))
.addKeyspaceId(kid).build();
@ -75,9 +75,9 @@ public class FailuresIT {
@Test
public void testTimeout() throws ConnectionException, DatabaseException {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 200, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 200, testEnv.getRpcClientFactory());
// Check timeout error raised for slow query
Query sleepQuery = new QueryBuilder("select sleep(0.5) from dual", testEnv.keyspace, "master")
Query sleepQuery = new QueryBuilder("select sleep(0.5) from dual", testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).build();
try {
vtgate.execute(sleepQuery);
@ -85,9 +85,9 @@ public class FailuresIT {
} catch (ConnectionException e) {
}
vtgate.close();
vtgate = VtGate.connect("localhost:" + testEnv.port, 2000, testEnv.getRpcClientFactory());
vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 2000, testEnv.getRpcClientFactory());
// Check no timeout error for fast query
sleepQuery = new QueryBuilder("select sleep(0.01) from dual", testEnv.keyspace, "master")
sleepQuery = new QueryBuilder("select sleep(0.01) from dual", testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).build();
vtgate.execute(sleepQuery);
vtgate.close();
@ -100,11 +100,11 @@ public class FailuresIT {
try {
// Transaction cap is 20
for (int i = 0; i < 25; i++) {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgates.add(vtgate);
vtgate.begin();
// Run a query to actually begin a transaction with the tablets
Query query = new QueryBuilder("delete from vtgate_test", testEnv.keyspace, "master")
Query query = new QueryBuilder("delete from vtgate_test", testEnv.getKeyspace(), "master")
.addKeyRange(KeyRange.ALL).build();
vtgate.execute(query);
}
@ -128,7 +128,7 @@ public class FailuresIT {
static TestEnv getTestEnv() {
Map<String, List<String>> shardKidMap = new HashMap<>();
shardKidMap.put("-", Lists.newArrayList("527875958493693904"));
TestEnv env = new TestEnv(shardKidMap, "test_keyspace");
TestEnv env = Util.getTestEnv(shardKidMap, "test_keyspace");
env.addTablet("replica", 1);
return env;
}

Просмотреть файл

@ -4,7 +4,7 @@ import com.youtube.vitess.vtgate.Query;
import com.youtube.vitess.vtgate.Query.QueryBuilder;
import com.youtube.vitess.vtgate.VtGate;
import com.youtube.vitess.vtgate.cursor.Cursor;
import com.youtube.vitess.vtgate.integration.util.TestEnv;
import com.youtube.vitess.vtgate.TestEnv;
import com.youtube.vitess.vtgate.integration.util.Util;
import org.junit.After;
@ -33,9 +33,9 @@ public class StreamingServerShutdownIT {
@Test
public void testShutdownServerWhileStreaming() throws Exception {
Util.insertRows(testEnv, 1, 2000);
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
String selectSql = "select A.* from vtgate_test A join vtgate_test B";
Query joinQuery = new QueryBuilder(selectSql, testEnv.keyspace, "master")
Query joinQuery = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).setStreaming(true).build();
Cursor cursor = vtgate.execute(joinQuery);
@ -51,7 +51,7 @@ public class StreamingServerShutdownIT {
vtgate.close();
Assert.fail("failed to raise exception");
} catch (RuntimeException e) {
Assert.assertTrue(e.getMessage().contains("vtgate exception: connection exception"));
Assert.assertTrue(e.getMessage().length() > 0);
}
}
}

Просмотреть файл

@ -1,5 +1,6 @@
package com.youtube.vitess.vtgate.integration;
import com.google.common.collect.Iterables;
import com.google.common.primitives.UnsignedLong;
import com.youtube.vitess.vtgate.BindVariable;
@ -8,10 +9,10 @@ import com.youtube.vitess.vtgate.KeyspaceId;
import com.youtube.vitess.vtgate.Query;
import com.youtube.vitess.vtgate.Query.QueryBuilder;
import com.youtube.vitess.vtgate.Row;
import com.youtube.vitess.vtgate.TestEnv;
import com.youtube.vitess.vtgate.VtGate;
import com.youtube.vitess.vtgate.cursor.Cursor;
import com.youtube.vitess.vtgate.cursor.StreamCursor;
import com.youtube.vitess.vtgate.integration.util.TestEnv;
import com.youtube.vitess.vtgate.integration.util.Util;
import org.junit.AfterClass;
@ -51,9 +52,9 @@ public class StreamingVtGateIT {
@Test
public void testStreamCursorType() throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
String selectSql = "select * from vtgate_test";
Query allRowsQuery = new QueryBuilder(selectSql, testEnv.keyspace, "master")
Query allRowsQuery = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).setStreaming(true).build();
Cursor cursor = vtgate.execute(allRowsQuery);
Assert.assertEquals(StreamCursor.class, cursor.getClass());
@ -66,20 +67,16 @@ public class StreamingVtGateIT {
@Test
public void testStreamExecuteKeyspaceIds() throws Exception {
int rowCount = 10;
for (String shardName : testEnv.shardKidMap.keySet()) {
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, rowCount);
}
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
for (String shardName : testEnv.shardKidMap.keySet()) {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
for (String shardName : testEnv.getShardKidMap().keySet()) {
String selectSql = "select A.* from vtgate_test A join vtgate_test B join vtgate_test C";
Query query = new QueryBuilder(selectSql, testEnv.keyspace, "master")
Query query = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getKeyspaceIds(shardName)).setStreaming(true).build();
Cursor cursor = vtgate.execute(query);
int count = 0;
while (cursor.hasNext()) {
cursor.next();
count++;
}
int count = Iterables.size(cursor);
Assert.assertEquals((int) Math.pow(rowCount, 3), count);
}
vtgate.close();
@ -90,16 +87,16 @@ public class StreamingVtGateIT {
*/
@Test
public void testStreamExecuteKeyRanges() throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
int rowCount = 10;
for (String shardName : testEnv.shardKidMap.keySet()) {
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, rowCount);
}
for (String shardName : testEnv.shardKidMap.keySet()) {
for (String shardName : testEnv.getShardKidMap().keySet()) {
List<KeyspaceId> kids = testEnv.getKeyspaceIds(shardName);
KeyRange kr = new KeyRange(Collections.min(kids), Collections.max(kids));
String selectSql = "select A.* from vtgate_test A join vtgate_test B join vtgate_test C";
Query query = new QueryBuilder(selectSql, testEnv.keyspace, "master").addKeyRange(kr)
Query query = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(kr)
.setStreaming(true).build();
Cursor cursor = vtgate.execute(query);
int count = 0;
@ -118,13 +115,13 @@ public class StreamingVtGateIT {
@Test
public void testScatterStreamingQuery() throws Exception {
int rowCount = 10;
for (String shardName : testEnv.shardKidMap.keySet()) {
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, rowCount);
}
String selectSql = "select A.* from vtgate_test A join vtgate_test B join vtgate_test C";
Query query = new QueryBuilder(selectSql, testEnv.keyspace, "master")
Query query = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).setStreaming(true).build();
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
Cursor cursor = vtgate.execute(query);
int count = 0;
for (Row row : cursor) {
@ -137,13 +134,13 @@ public class StreamingVtGateIT {
@Test
@Ignore("currently failing as vtgate doesn't set the error")
public void testStreamingWrites() throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
String insertSql = "insert into vtgate_test " + "(id, name, age, percent, keyspace_id) "
+ "values (:id, :name, :age, :percent, :keyspace_id)";
KeyspaceId kid = testEnv.getAllKeyspaceIds().get(0);
Query query = new QueryBuilder(insertSql, testEnv.keyspace, "master")
Query query = new QueryBuilder(insertSql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forULong("id", UnsignedLong.valueOf("" + 1)))
.addBindVar(BindVariable.forString("name", ("name_" + 1)))
.addBindVar(BindVariable.forULong("keyspace_id", (UnsignedLong) kid.getId()))

Просмотреть файл

@ -13,7 +13,7 @@ import com.youtube.vitess.vtgate.Row.Cell;
import com.youtube.vitess.vtgate.VtGate;
import com.youtube.vitess.vtgate.cursor.Cursor;
import com.youtube.vitess.vtgate.cursor.CursorImpl;
import com.youtube.vitess.vtgate.integration.util.TestEnv;
import com.youtube.vitess.vtgate.TestEnv;
import com.youtube.vitess.vtgate.integration.util.Util;
import org.apache.commons.codec.binary.Hex;
import org.joda.time.DateTime;
@ -60,11 +60,11 @@ public class VtGateIT {
*/
@Test
public void testExecuteKeyspaceIds() throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
// Ensure empty table
String selectSql = "select * from vtgate_test";
Query allRowsQuery = new QueryBuilder(selectSql, testEnv.keyspace, "master").setKeyspaceIds(
Query allRowsQuery = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").setKeyspaceIds(
testEnv.getAllKeyspaceIds()).build();
Cursor cursor = vtgate.execute(allRowsQuery);
Assert.assertEquals(CursorImpl.class, cursor.getClass());
@ -76,7 +76,7 @@ public class VtGateIT {
// Insert 10 rows
Util.insertRows(testEnv, 1000, 10);
vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
cursor = vtgate.execute(allRowsQuery);
Assert.assertEquals(10, cursor.getRowsAffected());
Assert.assertEquals(0, cursor.getLastRowId());
@ -85,7 +85,7 @@ public class VtGateIT {
// Fetch all rows from the first shard
KeyspaceId firstKid = testEnv.getAllKeyspaceIds().get(0);
Query query =
new QueryBuilder(selectSql, testEnv.keyspace, "master").addKeyspaceId(firstKid).build();
new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyspaceId(firstKid).build();
cursor = vtgate.execute(query);
// Check field types and values
@ -114,15 +114,15 @@ public class VtGateIT {
*/
@Test
public void testQueryRouting() throws Exception {
for (String shardName : testEnv.shardKidMap.keySet()) {
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, 10);
}
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
String allRowsSql = "select * from vtgate_test";
for (String shardName : testEnv.shardKidMap.keySet()) {
Query shardRows = new QueryBuilder(allRowsSql, testEnv.keyspace, "master").setKeyspaceIds(
for (String shardName : testEnv.getShardKidMap().keySet()) {
Query shardRows = new QueryBuilder(allRowsSql, testEnv.getKeyspace(), "master").setKeyspaceIds(
testEnv.getKeyspaceIds(shardName)).build();
Cursor cursor = vtgate.execute(shardRows);
Assert.assertEquals(10, cursor.getRowsAffected());
@ -134,8 +134,8 @@ public class VtGateIT {
public void testDateFieldTypes() throws Exception {
DateTime dt = DateTime.now().minusDays(2).withMillisOfSecond(0);
Util.insertRows(testEnv, 10, 1, dt);
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
Query allRowsQuery = new QueryBuilder("select * from vtgate_test", testEnv.keyspace, "master")
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
Query allRowsQuery = new QueryBuilder("select * from vtgate_test", testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).build();
Row row = vtgate.execute(allRowsQuery).next();
Assert.assertTrue(dt.equals(row.getDateTime("timestamp_col")));
@ -155,10 +155,10 @@ public class VtGateIT {
public void testAllKeyRange() throws Exception {
// Insert 10 rows across the shards
Util.insertRows(testEnv, 1000, 10);
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
String selectSql = "select * from vtgate_test";
Query allRowsQuery =
new QueryBuilder(selectSql, testEnv.keyspace, "master").addKeyRange(KeyRange.ALL).build();
new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(KeyRange.ALL).build();
Cursor cursor = vtgate.execute(allRowsQuery);
// Verify all rows returned
Assert.assertEquals(10, cursor.getRowsAffected());
@ -172,33 +172,33 @@ public class VtGateIT {
public void testKeyRangeReads() throws Exception {
int rowsPerShard = 10;
// insert rows in each shard using ExecuteKeyspaceIds
for (String shardName : testEnv.shardKidMap.keySet()) {
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, rowsPerShard);
}
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
String selectSql = "select * from vtgate_test";
// Check ALL KeyRange query returns rows from both shards
Query allRangeQuery =
new QueryBuilder(selectSql, testEnv.keyspace, "master").addKeyRange(KeyRange.ALL).build();
new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(KeyRange.ALL).build();
Cursor cursor = vtgate.execute(allRangeQuery);
Assert.assertEquals(rowsPerShard * 2, cursor.getRowsAffected());
// Check KeyRange query limited to a single shard returns 10 rows each
for (String shardName : testEnv.shardKidMap.keySet()) {
for (String shardName : testEnv.getShardKidMap().keySet()) {
List<KeyspaceId> shardKids = testEnv.getKeyspaceIds(shardName);
KeyspaceId minKid = Collections.min(shardKids);
KeyspaceId maxKid = Collections.max(shardKids);
KeyRange shardKeyRange = new KeyRange(minKid, maxKid);
Query shardRangeQuery = new QueryBuilder(selectSql, testEnv.keyspace, "master").addKeyRange(
Query shardRangeQuery = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(
shardKeyRange).build();
cursor = vtgate.execute(shardRangeQuery);
Assert.assertEquals(rowsPerShard, cursor.getRowsAffected());
}
// Now make a cross-shard KeyRange and check all rows are returned
Iterator<String> shardNameIter = testEnv.shardKidMap.keySet().iterator();
Iterator<String> shardNameIter = testEnv.getShardKidMap().keySet().iterator();
KeyspaceId kidShard1 = testEnv.getKeyspaceIds(shardNameIter.next()).get(2);
KeyspaceId kidShard2 = testEnv.getKeyspaceIds(shardNameIter.next()).get(2);
KeyRange crossShardKeyrange;
@ -207,7 +207,7 @@ public class VtGateIT {
} else {
crossShardKeyrange = new KeyRange(kidShard2, kidShard1);
}
Query shardRangeQuery = new QueryBuilder(selectSql, testEnv.keyspace, "master").addKeyRange(
Query shardRangeQuery = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(
crossShardKeyrange).build();
cursor = vtgate.execute(shardRangeQuery);
Assert.assertEquals(rowsPerShard * 2, cursor.getRowsAffected());
@ -220,20 +220,20 @@ public class VtGateIT {
@Test
public void testKeyRangeWrites() throws Exception {
Random random = new Random();
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
String sql = "insert into vtgate_test " + "(id, name, keyspace_id, age) "
+ "values (:id, :name, :keyspace_id, :age)";
int count = 20;
// Insert 20 rows per shard
for (String shardName : testEnv.shardKidMap.keySet()) {
for (String shardName : testEnv.getShardKidMap().keySet()) {
List<KeyspaceId> kids = testEnv.getKeyspaceIds(shardName);
KeyspaceId minKid = Collections.min(kids);
KeyspaceId maxKid = Collections.max(kids);
KeyRange kr = new KeyRange(minKid, maxKid);
for (int i = 0; i < count; i++) {
KeyspaceId kid = kids.get(i % kids.size());
Query query = new QueryBuilder(sql, testEnv.keyspace, "master")
Query query = new QueryBuilder(sql, testEnv.getKeyspace(), "master")
.addBindVar(
BindVariable.forULong("id", UnsignedLong.valueOf("" + Math.abs(random.nextInt()))))
.addBindVar(BindVariable.forString("name", ("name_" + i)))
@ -248,16 +248,16 @@ public class VtGateIT {
vtgate.close();
// Check 40 rows exist in total
vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
String selectSql = "select * from vtgate_test";
Query allRowsQuery = new QueryBuilder(selectSql, testEnv.keyspace, "master").setKeyspaceIds(
Query allRowsQuery = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").setKeyspaceIds(
testEnv.getAllKeyspaceIds()).build();
Cursor cursor = vtgate.execute(allRowsQuery);
Assert.assertEquals(count * 2, cursor.getRowsAffected());
// Check 20 rows exist per shard
for (String shardName : testEnv.shardKidMap.keySet()) {
Query shardRows = new QueryBuilder(selectSql, testEnv.keyspace, "master").setKeyspaceIds(
for (String shardName : testEnv.getShardKidMap().keySet()) {
Query shardRows = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").setKeyspaceIds(
testEnv.getKeyspaceIds(shardName)).build();
cursor = vtgate.execute(shardRows);
Assert.assertEquals(count, cursor.getRowsAffected());
@ -270,11 +270,11 @@ public class VtGateIT {
@Test
public void testSplitQuery() throws Exception {
// Insert 20 rows per shard
for (String shardName : testEnv.shardKidMap.keySet()) {
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, 20);
}
Util.waitForTablet("rdonly", 40, 3, testEnv);
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
Map<Query, Long> queries =
vtgate.splitQuery("test_keyspace", "select id,keyspace_id from vtgate_test", 1, "");
vtgate.close();
@ -294,7 +294,7 @@ public class VtGateIT {
}
// Verify the keyrange queries in splits cover the entire keyspace
Assert.assertTrue(shardsInSplits.containsAll(testEnv.shardKidMap.keySet()));
Assert.assertTrue(shardsInSplits.containsAll(testEnv.getShardKidMap().keySet()));
}
@Test
@ -309,7 +309,7 @@ public class VtGateIT {
"select id, keyspace_id from vtgate_test where id >= 19",
"select id, keyspace_id from vtgate_test where id >= 19");
Util.waitForTablet("rdonly", rowCount, 3, testEnv);
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
int splitCount = 6;
Map<Query, Long> queries =
vtgate.splitQuery("test_keyspace", "select id,keyspace_id from vtgate_test", splitCount, "");
@ -332,13 +332,13 @@ public class VtGateIT {
}
// Verify the keyrange queries in splits cover the entire keyspace
Assert.assertTrue(shardsInSplits.containsAll(testEnv.shardKidMap.keySet()));
Assert.assertTrue(shardsInSplits.containsAll(testEnv.getShardKidMap().keySet()));
Assert.assertTrue(expectedSqls.size() == 0);
}
@Test
public void testSplitQueryInvalidTable() throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
try {
vtgate.splitQuery("test_keyspace", "select id from invalid_table", 1, "");
Assert.fail("failed to raise connection exception");
@ -360,7 +360,7 @@ public class VtGateIT {
Lists.newArrayList("9767889778372766922", "9742070682920810358", "10296850775085416642"));
shardKidMap.put("-80",
Lists.newArrayList("527875958493693904", "626750931627689502", "345387386794260318"));
TestEnv env = new TestEnv(shardKidMap, "test_keyspace");
TestEnv env = Util.getTestEnv(shardKidMap, "test_keyspace");
env.addTablet("rdonly", 1);
return env;
}

Просмотреть файл

@ -10,7 +10,7 @@ import com.youtube.vitess.vtgate.KeyspaceId;
import com.youtube.vitess.vtgate.hadoop.VitessInputFormat;
import com.youtube.vitess.vtgate.hadoop.writables.KeyspaceIdWritable;
import com.youtube.vitess.vtgate.hadoop.writables.RowWritable;
import com.youtube.vitess.vtgate.integration.util.TestEnv;
import com.youtube.vitess.vtgate.TestEnv;
import com.youtube.vitess.vtgate.integration.util.Util;
import com.youtube.vitess.vtgate.hadoop.utils.GsonAdapters;
@ -66,7 +66,7 @@ public class MapReduceIT extends HadoopTestCase {
public void testDumpTableToHDFS() throws Exception {
// Insert 20 rows per shard
int rowsPerShard = 20;
for (String shardName : testEnv.shardKidMap.keySet()) {
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, rowsPerShard);
}
Util.waitForTablet("rdonly", 40, 3, testEnv);
@ -77,11 +77,11 @@ public class MapReduceIT extends HadoopTestCase {
job.setJarByClass(VitessInputFormat.class);
job.setMapperClass(TableMapper.class);
VitessInputFormat.setInput(job,
"localhost:" + testEnv.port,
testEnv.keyspace,
"localhost:" + testEnv.getPort(),
testEnv.getKeyspace(),
"select keyspace_id, name from vtgate_test",
4,
testEnv.getRpcClientFactory());
testEnv.getRpcClientFactory().getClass());
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(RowWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
@ -145,7 +145,7 @@ public class MapReduceIT extends HadoopTestCase {
*/
public void testReducerAggregateRows() throws Exception {
int rowsPerShard = 20;
for (String shardName : testEnv.shardKidMap.keySet()) {
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, rowsPerShard);
}
Util.waitForTablet("rdonly", 40, 3, testEnv);
@ -156,11 +156,11 @@ public class MapReduceIT extends HadoopTestCase {
job.setJarByClass(VitessInputFormat.class);
job.setMapperClass(TableMapper.class);
VitessInputFormat.setInput(job,
"localhost:" + testEnv.port,
testEnv.keyspace,
"localhost:" + testEnv.getPort(),
testEnv.getKeyspace(),
"select keyspace_id, name from vtgate_test",
1,
testEnv.getRpcClientFactory());
testEnv.getRpcClientFactory().getClass());
job.setMapOutputKeyClass(KeyspaceIdWritable.class);
job.setMapOutputValueClass(RowWritable.class);
@ -228,7 +228,7 @@ public class MapReduceIT extends HadoopTestCase {
Lists.newArrayList("527875958493693904", "626750931627689502", "345387386794260318"));
shardKidMap.put("80-",
Lists.newArrayList("9767889778372766922", "9742070682920810358", "10296850775085416642"));
TestEnv env = new TestEnv(shardKidMap, "test_keyspace");
TestEnv env = Util.getTestEnv(shardKidMap, "test_keyspace");
env.addTablet("rdonly", 1);
return env;
}

Просмотреть файл

@ -11,6 +11,7 @@ import com.youtube.vitess.vtgate.Exceptions.DatabaseException;
import com.youtube.vitess.vtgate.KeyspaceId;
import com.youtube.vitess.vtgate.Query;
import com.youtube.vitess.vtgate.Query.QueryBuilder;
import com.youtube.vitess.vtgate.TestEnv;
import com.youtube.vitess.vtgate.VtGate;
import com.youtube.vitess.vtgate.cursor.Cursor;
@ -28,7 +29,7 @@ import java.util.Map;
public class Util {
static final Logger logger = LogManager.getLogger(Util.class.getName());
public static final String PROPERTY_KEY_VTGATE_TEST_ENV = "vtgate.test.env";
/**
* Setup MySQL, Vttablet and VtGate instances required for the tests. This uses a Python helper
* script to start and stop instances.
@ -49,8 +50,8 @@ public class Util {
try {
Type mapType = new TypeToken<Map<String, Integer>>() {}.getType();
Map<String, Integer> map = new Gson().fromJson(line, mapType);
testEnv.pythonScriptProcess = p;
testEnv.port = map.get("port");
testEnv.setPythonScriptProcess(p);
testEnv.setPort(map.get("port"));
return;
} catch (JsonSyntaxException e) {
logger.error("JsonSyntaxException parsing setup command output: " + line, e);
@ -63,14 +64,14 @@ public class Util {
* Teardown the test instances, if any.
*/
public static void teardownTestEnv(TestEnv testEnv) throws Exception {
if (testEnv.pythonScriptProcess == null) {
return;
Process process = testEnv.getPythonScriptProcess();
if (process != null) {
logger.info("sending empty line to java_vtgate_test_helper to stop test setup");
process.getOutputStream().write("\n".getBytes());
process.getOutputStream().flush();
process.waitFor();
testEnv.setPythonScriptProcess(null);
}
logger.info("sending empty line to java_vtgate_test_helper to stop test setup");
testEnv.pythonScriptProcess.getOutputStream().write("\n".getBytes());
testEnv.pythonScriptProcess.getOutputStream().flush();
testEnv.pythonScriptProcess.waitFor();
testEnv.pythonScriptProcess = null;
}
public static void insertRows(TestEnv testEnv, int startId, int count) throws ConnectionException,
@ -80,7 +81,7 @@ public class Util {
public static void insertRows(TestEnv testEnv, int startId, int count, DateTime dateTime)
throws ConnectionException, DatabaseException {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
String insertSql = "insert into vtgate_test "
@ -88,7 +89,7 @@ public class Util {
+ "values (:id, :name, :age, :percent, :datetime_col, :timestamp_col, :date_col, :time_col, :keyspace_id)";
for (int i = startId; i < startId + count; i++) {
KeyspaceId kid = testEnv.getAllKeyspaceIds().get(i % testEnv.getAllKeyspaceIds().size());
Query query = new QueryBuilder(insertSql, testEnv.keyspace, "master")
Query query = new QueryBuilder(insertSql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forULong("id", UnsignedLong.valueOf("" + i)))
.addBindVar(BindVariable.forBytes("name", ("name_" + i).getBytes()))
.addBindVar(BindVariable.forInt("age", i * 2))
@ -111,14 +112,14 @@ public class Util {
*/
public static void insertRowsInShard(TestEnv testEnv, String shardName, int count)
throws DatabaseException, ConnectionException {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
String sql = "insert into vtgate_test " + "(id, name, keyspace_id) "
+ "values (:id, :name, :keyspace_id)";
List<KeyspaceId> kids = testEnv.getKeyspaceIds(shardName);
for (int i = 1; i <= count; i++) {
KeyspaceId kid = kids.get(i % kids.size());
Query query = new QueryBuilder(sql, testEnv.keyspace, "master")
Query query = new QueryBuilder(sql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forULong("id", UnsignedLong.valueOf("" + i)))
.addBindVar(BindVariable.forBytes("name", ("name_" + i).getBytes()))
.addBindVar(BindVariable.forULong("keyspace_id", (UnsignedLong) kid.getId()))
@ -131,16 +132,16 @@ public class Util {
}
public static void createTable(TestEnv testEnv) throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
vtgate.execute(new QueryBuilder("drop table if exists vtgate_test", testEnv.keyspace, "master")
vtgate.execute(new QueryBuilder("drop table if exists vtgate_test", testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).build());
String createTable = "create table vtgate_test (id bigint auto_increment,"
+ " name varchar(64), age SMALLINT, percent DECIMAL(5,2),"
+ " keyspace_id bigint(20) unsigned NOT NULL, datetime_col DATETIME,"
+ " timestamp_col TIMESTAMP, date_col DATE, time_col TIME, primary key (id))"
+ " Engine=InnoDB";
vtgate.execute(new QueryBuilder(createTable, testEnv.keyspace, "master").setKeyspaceIds(
vtgate.execute(new QueryBuilder(createTable, testEnv.getKeyspace(), "master").setKeyspaceIds(
testEnv.getAllKeyspaceIds()).build());
vtgate.commit();
vtgate.close();
@ -154,10 +155,10 @@ public class Util {
public static void waitForTablet(String tabletType, int rowCount, int attempts, TestEnv testEnv)
throws Exception {
String sql = "select * from vtgate_test";
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0, testEnv.getRpcClientFactory());
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
for (int i = 0; i < attempts; i++) {
try {
Cursor cursor = vtgate.execute(new QueryBuilder(sql, testEnv.keyspace, tabletType)
Cursor cursor = vtgate.execute(new QueryBuilder(sql, testEnv.getKeyspace(), tabletType)
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).build());
if (cursor.getRowsAffected() >= rowCount) {
vtgate.close();
@ -171,4 +172,17 @@ public class Util {
vtgate.close();
throw new Exception(tabletType + " fails to catch up");
}
public static TestEnv getTestEnv(Map<String, List<String>> shardKidMap, String keyspace) {
String testEnvClass = System.getProperty(PROPERTY_KEY_VTGATE_TEST_ENV);
try {
Class<?> clazz = Class.forName(testEnvClass);
TestEnv env = (TestEnv)clazz.newInstance();
env.setKeyspace(keyspace);
env.setShardKidMap(shardKidMap);
return env;
} catch (ClassNotFoundException|IllegalAccessException|InstantiationException e) {
throw new RuntimeException(e);
}
}
}