This commit is contained in:
Ammar Aijazi 2015-10-15 17:39:26 -07:00
Родитель 21e934a6a2
Коммит c03a1a2eed
50 изменённых файлов: 0 добавлений и 4550 удалений

Просмотреть файл

@ -15,7 +15,6 @@
<module>gorpc</module>
<module>grpc-client</module>
<module>hadoop</module>
<module>vtgate-client</module>
</modules>
<organization>

Просмотреть файл

@ -1,21 +0,0 @@
VtGate Java Driver
==================
Add the following repository and dependency to your pom.xml to use this client library.
```
<repositories>
<repository>
<id>youtube-snapshots</id>
<url>https://github.com/youtube/mvn-repo/raw/master/snapshots</url>
</repository>
</repositories>
```
```
<dependency>
<groupId>com.youtube.vitess</groupId>
<artifactId>vtgate-client</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
```

Просмотреть файл

@ -1,150 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.youtube.vitess</groupId>
<artifactId>vitess-parent</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>vtgate-client</artifactId>
<dependencies>
<dependency>
<groupId>com.youtube.vitess</groupId>
<artifactId>gorpc-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.5.1</version>
<!-- Include the test jar to reuse Hadoop testing utils -->
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<pluginRepositories>
<pluginRepository>
<releases>
<updatePolicy>never</updatePolicy>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>central</id>
<name>Central Repository</name>
<url>https://repo.maven.apache.org/maven2</url>
</pluginRepository>
<pluginRepository>
<id>protoc-plugin</id>
<url>https://dl.bintray.com/sergei-ivanov/maven/</url>
</pluginRepository>
</pluginRepositories>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.2.3.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration>
<argLine>${surefireArgLine}</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.13</version>
<configuration>
<argLine>${failsafeArgLine}</argLine>
<systemPropertyVariables>
<vtgate.test.env>com.youtube.vitess.vtgate.TestEnv</vtgate.test.env>
<vtgate.rpcclient.factory>com.youtube.vitess.vtgate.rpcclient.gorpc.BsonRpcClientFactory</vtgate.rpcclient.factory>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4</version>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.google.protobuf.tools</groupId>
<artifactId>maven-protoc-plugin</artifactId>
<version>0.4.2</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.java.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<protoSourceRoot>../../proto</protoSourceRoot>
<includes>
<include>query.proto</include>
<include>topodata.proto</include>
<include>vtgate.proto</include>
<include>vtrpc.proto</include>
</includes>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

Просмотреть файл

@ -1,90 +0,0 @@
package com.youtube.vitess.vtgate;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
public class BatchQuery {
private List<String> sqls;
private List<List<BindVariable>> bindVarsList;
private String keyspace;
private String tabletType;
private List<byte[]> keyspaceIds;
private Object session;
private BatchQuery(String keyspace, String tabletType) {
this.keyspace = keyspace;
this.tabletType = tabletType;
this.sqls = new LinkedList<>();
this.bindVarsList = new LinkedList<>();
}
public List<String> getSqls() {
return sqls;
}
public List<List<BindVariable>> getBindVarsList() {
return bindVarsList;
}
public String getTabletType() {
return tabletType;
}
public String getKeyspace() {
return keyspace;
}
public List<byte[]> getKeyspaceIds() {
return keyspaceIds;
}
public Object getSession() {
return session;
}
public void setSession(Object session) {
this.session = session;
}
public static class BatchQueryBuilder {
private BatchQuery query;
public BatchQueryBuilder(String keyspace, String tabletType) {
query = new BatchQuery(keyspace, tabletType);
}
public BatchQuery build() {
if (query.sqls.size() == 0) {
throw new IllegalStateException("query must have at least one sql");
}
if (query.keyspaceIds == null) {
throw new IllegalStateException("query must have keyspaceIds set");
}
return query;
}
public BatchQueryBuilder addSqlAndBindVars(String sql, List<BindVariable> bindVars) {
query.sqls.add(sql);
query.bindVarsList.add(bindVars);
return this;
}
public BatchQueryBuilder withKeyspaceIds(List<KeyspaceId> keyspaceIds) {
List<byte[]> kidsBytes = new ArrayList<>();
for (KeyspaceId kid : keyspaceIds) {
kidsBytes.add(kid.getBytes());
}
query.keyspaceIds = kidsBytes;
return this;
}
public BatchQueryBuilder withAddedKeyspaceId(KeyspaceId keyspaceId) {
if (query.getKeyspaceIds() == null) {
query.keyspaceIds = new ArrayList<byte[]>();
}
query.getKeyspaceIds().add(keyspaceId.getBytes());
return this;
}
}
}

Просмотреть файл

@ -1,27 +0,0 @@
package com.youtube.vitess.vtgate;
import java.util.List;
public class BatchQueryResponse {
private List<QueryResult> results;
private Object session;
private String error;
public BatchQueryResponse(List<QueryResult> results, Object session, String error) {
this.results = results;
this.session = session;
this.error = error;
}
public List<QueryResult> getResults() {
return results;
}
public Object getSession() {
return session;
}
public String getError() {
return error;
}
}

Просмотреть файл

@ -1,121 +0,0 @@
package com.youtube.vitess.vtgate;
import com.google.common.primitives.UnsignedLong;
import org.apache.commons.lang.CharEncoding;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.ISODateTimeFormat;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
public class BindVariable {
private String name;
private Type type;
private long longVal;
private UnsignedLong uLongVal;
private double doubleVal;
private byte[] byteArrayVal;
private BindVariable(String name, Type type) {
this.name = name;
this.type = type;
}
public static BindVariable forNull(String name) {
return new BindVariable(name, Type.NULL);
}
public static BindVariable forInt(String name, Integer value) {
return forLong(name, value.longValue());
}
public static BindVariable forLong(String name, Long value) {
BindVariable bv = new BindVariable(name, Type.LONG);
bv.longVal = value;
return bv;
}
public static BindVariable forFloat(String name, Float value) {
return forDouble(name, value.doubleValue());
}
public static BindVariable forDouble(String name, Double value) {
BindVariable bv = new BindVariable(name, Type.DOUBLE);
bv.doubleVal = value;
return bv;
}
public static BindVariable forULong(String name, UnsignedLong value) {
BindVariable bv = new BindVariable(name, Type.UNSIGNED_LONG);
bv.uLongVal = value;
return bv;
}
public static BindVariable forDateTime(String name, DateTime value) {
String dateTimeStr =
value.toString(ISODateTimeFormat.dateHourMinuteSecondMillis()).replace('T', ' ');
return forString(name, dateTimeStr);
}
public static BindVariable forDate(String name, DateTime value) {
String date = value.toString(ISODateTimeFormat.date());
return forString(name, date);
}
public static BindVariable forTime(String name, DateTime value) {
String time = value.toString(DateTimeFormat.forPattern("HH:mm:ss"));
return forString(name, time);
}
public static BindVariable forString(String name, String value) {
try {
return forBytes(name, value.getBytes(CharEncoding.ISO_8859_1));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
public static BindVariable forBytes(String name, byte[] value) {
BindVariable bv = new BindVariable(name, Type.BYTE_ARRAY);
bv.byteArrayVal = value;
return bv;
}
public static BindVariable forShort(String name, Short value) {
return forLong(name, value.longValue());
}
public static BindVariable forBigDecimal(String name, BigDecimal value) {
return forDouble(name, value.doubleValue());
}
public String getName() {
return name;
}
public Type getType() {
return type;
}
public long getLongVal() {
return longVal;
}
public UnsignedLong getULongVal() {
return uLongVal;
}
public double getDoubleVal() {
return doubleVal;
}
public byte[] getByteArrayVal() {
return byteArrayVal;
}
public static enum Type {
NULL, LONG, UNSIGNED_LONG, DOUBLE, BYTE_ARRAY;
}
}

Просмотреть файл

@ -1,44 +0,0 @@
package com.youtube.vitess.vtgate;
public class Exceptions {
/**
* Exception raised at the tablet or MySQL layer due to issues such as invalid syntax, etc.
*/
@SuppressWarnings("serial")
public static class DatabaseException extends Exception {
public DatabaseException(String message) {
super(message);
}
}
/**
* Exception raised by MySQL due to violation of unique key constraint
*/
@SuppressWarnings("serial")
public static class IntegrityException extends DatabaseException {
public IntegrityException(String message) {
super(message);
}
}
/**
* Exception caused due to irrecoverable connection failures or other low level exceptions
*/
@SuppressWarnings("serial")
public static class ConnectionException extends Exception {
public ConnectionException(String message) {
super(message);
}
}
/**
* Exception raised due to fetching a non-existent field or with the wrong type
*/
@SuppressWarnings("serial")
public static class InvalidFieldException extends RuntimeException {
public InvalidFieldException(String message) {
super(message);
}
}
}

Просмотреть файл

@ -1,158 +0,0 @@
package com.youtube.vitess.vtgate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.UnsignedLong;
import com.youtube.vitess.vtgate.Row.Cell;
import org.apache.commons.lang.CharEncoding;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.ISODateTimeFormat;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
public class Field {
/**
* MySQL field flags bitset values e.g. to distinguish between signed and unsigned integer.
* Comments are taken from the original source code.
* These numbers should exactly match values defined in dist/mysql-5.1.52/include/mysql_com.h
*/
public enum Flag {
// VT_ZEROVALUE_FLAG is not part of the MySQL specification and only used in unit tests.
VT_ZEROVALUE_FLAG(0),
VT_NOT_NULL_FLAG (1), /* Field can't be NULL */
VT_PRI_KEY_FLAG(2), /* Field is part of a primary key */
VT_UNIQUE_KEY_FLAG(4), /* Field is part of a unique key */
VT_MULTIPLE_KEY_FLAG(8), /* Field is part of a key */
VT_BLOB_FLAG(16), /* Field is a blob */
VT_UNSIGNED_FLAG(32), /* Field is unsigned */
VT_ZEROFILL_FLAG(64), /* Field is zerofill */
VT_BINARY_FLAG(128), /* Field is binary */
/* The following are only sent to new clients */
VT_ENUM_FLAG(256), /* field is an enum */
VT_AUTO_INCREMENT_FLAG(512), /* field is a autoincrement field */
VT_TIMESTAMP_FLAG(1024), /* Field is a timestamp */
VT_SET_FLAG(2048), /* field is a set */
VT_NO_DEFAULT_VALUE_FLAG(4096), /* Field doesn't have default value */
VT_ON_UPDATE_NOW_FLAG(8192), /* Field is set to NOW on UPDATE */
VT_NUM_FLAG(32768); /* Field is num (for clients) */
public long mysqlFlag;
Flag(long mysqlFlag) {
this.mysqlFlag = mysqlFlag;
}
}
private String name;
private FieldType type;
private long mysqlFlags;
public static Field newFieldFromMysql(String name, int mysqlTypeId, long mysqlFlags) {
for (FieldType ft : FieldType.values()) {
if (ft.mysqlType == mysqlTypeId) {
return new Field(name, ft, mysqlFlags);
}
}
throw new RuntimeException("Unknown MySQL type: " + mysqlTypeId);
}
@VisibleForTesting
static Field newFieldForTest(FieldType fieldType, Flag flag) {
return new Field("dummyField", fieldType, flag.mysqlFlag);
}
private Field(String name, FieldType type, long mysqlFlags) {
this.name = name;
this.type = type;
this.mysqlFlags = mysqlFlags;
}
public Cell convertValueToCell(byte[] bytes) {
if ((mysqlFlags & Flag.VT_UNSIGNED_FLAG.mysqlFlag) != 0) {
return new Cell(name, convert(bytes), type.unsignedJavaType);
} else {
return new Cell(name, convert(bytes), type.javaType);
}
}
Object convert(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
return null;
}
String s = null;
try {
s = new String(bytes, CharEncoding.ISO_8859_1);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
switch (type) {
case VT_DECIMAL:
return new BigDecimal(s);
case VT_TINY:
return Integer.valueOf(s);
case VT_SHORT:
return Integer.valueOf(s);
case VT_LONG:
return Long.valueOf(s);
case VT_FLOAT:
return Float.valueOf(s);
case VT_DOUBLE:
return Double.valueOf(s);
case VT_NULL:
return null;
case VT_TIMESTAMP:
s = s.replace(' ', 'T');
return DateTime.parse(s);
case VT_LONGLONG:
// This can be an unsigned or a signed long
if ((mysqlFlags & Flag.VT_UNSIGNED_FLAG.mysqlFlag) != 0) {
return UnsignedLong.valueOf(s);
} else {
return Long.valueOf(s);
}
case VT_INT24:
return Integer.valueOf(s);
case VT_DATE:
return DateTime.parse(s, ISODateTimeFormat.date());
case VT_TIME:
DateTime d = DateTime.parse(s, DateTimeFormat.forPattern("HH:mm:ss"));
return d;
case VT_DATETIME:
s = s.replace(' ', 'T');
return DateTime.parse(s);
case VT_YEAR:
return Short.valueOf(s);
case VT_NEWDATE:
return DateTime.parse(s, ISODateTimeFormat.date());
case VT_VARCHAR:
return bytes;
case VT_BIT:
return bytes;
case VT_NEWDECIMAL:
return new BigDecimal(s);
case VT_ENUM:
return s;
case VT_SET:
return s;
case VT_TINY_BLOB:
return bytes;
case VT_MEDIUM_BLOB:
return bytes;
case VT_LONG_BLOB:
return bytes;
case VT_BLOB:
return bytes;
case VT_VAR_STRING:
return bytes;
case VT_STRING:
return bytes;
case VT_GEOMETRY:
return bytes;
default:
throw new RuntimeException("invalid field type " + this);
}
}
}

Просмотреть файл

@ -1,57 +0,0 @@
package com.youtube.vitess.vtgate;
import com.google.common.primitives.UnsignedLong;
import org.joda.time.DateTime;
import java.math.BigDecimal;
/**
* Represents all field types supported by Vitess and their corresponding types in Java. mysqlType
* numbers should exactly match values defined in dist/mysql-5.1.52/include/mysql/mysql_com.h
*
*/
public enum FieldType {
VT_DECIMAL(0, BigDecimal.class),
VT_TINY(1, Integer.class),
VT_SHORT(2, Integer.class),
VT_LONG(3, Long.class),
VT_FLOAT(4, Float.class),
VT_DOUBLE(5, Double.class),
VT_NULL(6, null),
VT_TIMESTAMP(7, DateTime.class),
VT_LONGLONG(8, Long.class, UnsignedLong.class),
VT_INT24(9, Integer.class),
VT_DATE(10, DateTime.class),
VT_TIME(11, DateTime.class),
VT_DATETIME(12, DateTime.class),
VT_YEAR(13, Short.class),
VT_NEWDATE(14, DateTime.class),
VT_VARCHAR(15, byte[].class),
VT_BIT(16, byte[].class),
VT_NEWDECIMAL(246, BigDecimal.class),
VT_ENUM(247, String.class),
VT_SET(248, String.class),
VT_TINY_BLOB(249, byte[].class),
VT_MEDIUM_BLOB(250, byte[].class),
VT_LONG_BLOB(251, byte[].class),
VT_BLOB(252, byte[].class),
VT_VAR_STRING(253, byte[].class),
VT_STRING(254, byte[].class),
VT_GEOMETRY(255, byte[].class);
public final int mysqlType;
public final Class javaType;
public final Class unsignedJavaType;
FieldType(int mysqlType, Class javaType) {
this.mysqlType = mysqlType;
this.javaType = javaType;
this.unsignedJavaType = javaType;
}
FieldType(int mysqlType, Class javaType, Class unsignedJavaType) {
this.mysqlType = mysqlType;
this.javaType = javaType;
this.unsignedJavaType = unsignedJavaType;
}
}

Просмотреть файл

@ -1,29 +0,0 @@
package com.youtube.vitess.vtgate;
import java.util.HashMap;
import java.util.Map;
public class KeyRange {
private static final String START = "Start";
private static final String END = "End";
private static final byte[] NULL_BYTES = "".getBytes();
public static final KeyRange ALL = new KeyRange(null, null);
KeyspaceId start;
KeyspaceId end;
public KeyRange(KeyspaceId start, KeyspaceId end) {
this.start = start;
this.end = end;
}
public Map<String, byte[]> toMap() {
byte[] startBytes = start == null ? NULL_BYTES : start.getBytes();
byte[] endBytes = end == null ? NULL_BYTES : end.getBytes();
Map<String, byte[]> map = new HashMap<>();
map.put(START, startBytes);
map.put(END, endBytes);
return map;
}
}

Просмотреть файл

@ -1,91 +0,0 @@
package com.youtube.vitess.vtgate;
import com.google.common.primitives.Longs;
import com.google.common.primitives.UnsignedLong;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
/**
* KeyspaceId can be either String or UnsignedLong. Use factory method valueOf to create instances
*/
public class KeyspaceId implements Comparable<KeyspaceId> {
private Object id;
public static final String COL_NAME = "keyspace_id";
public KeyspaceId() {
}
public void setId(Object id) {
if (!(id instanceof String) && !(id instanceof UnsignedLong)) {
throw new IllegalArgumentException(
"invalid id type, must be either String or UnsignedLong " + id.getClass());
}
this.id = id;
}
public Object getId() {
return id;
}
public byte[] getBytes() {
if (id instanceof String) {
try {
return Hex.decodeHex(((String) id).toCharArray());
} catch (DecoderException e) {
throw new IllegalArgumentException("illegal string id", e);
}
} else {
return Longs.toByteArray(((UnsignedLong) id).longValue());
}
}
/**
* Creates a KeyspaceId from id which must be a String or UnsignedLong.
*/
public static KeyspaceId valueOf(Object id) {
KeyspaceId kid = new KeyspaceId();
kid.setId(id);
return kid;
}
@Override
public boolean equals(Object o) {
if (o instanceof KeyspaceId) {
return this.compareTo((KeyspaceId) o) == 0;
}
return false;
}
@Override
public int hashCode() {
if (id instanceof UnsignedLong) {
return ((UnsignedLong) id).hashCode();
}
return ((String) id).hashCode();
}
@Override
public int compareTo(KeyspaceId o) {
if (o == null) {
throw new NullPointerException();
}
if (id instanceof UnsignedLong && o.id instanceof UnsignedLong) {
UnsignedLong thisId = (UnsignedLong) id;
UnsignedLong otherId = (UnsignedLong) o.id;
return thisId.compareTo(otherId);
}
if (id instanceof String && o.id instanceof String) {
String thisId = (String) id;
String otherId = (String) o.id;
return thisId.compareTo(otherId);
}
throw new IllegalArgumentException("unexpected id types");
}
}

Просмотреть файл

@ -1,132 +0,0 @@
package com.youtube.vitess.vtgate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Represents a VtGate query request. Use QueryBuilder to construct instances.
*
*/
public class Query {
private String sql;
private String keyspace;
private List<BindVariable> bindVars;
private String tabletType;
private List<byte[]> keyspaceIds;
private List<Map<String, byte[]>> keyRanges;
private boolean streaming;
private Object session;
private Query(String sql, String keyspace, String tabletType) {
this.sql = sql;
this.keyspace = keyspace;
this.tabletType = tabletType;
}
public String getSql() {
return sql;
}
public String getKeyspace() {
return keyspace;
}
public List<BindVariable> getBindVars() {
return bindVars;
}
public String getTabletType() {
return tabletType;
}
public List<byte[]> getKeyspaceIds() {
return keyspaceIds;
}
public List<Map<String, byte[]>> getKeyRanges() {
return keyRanges;
}
public boolean isStreaming() {
return streaming;
}
public Object getSession() {
return session;
}
public void setSession(Object session) {
this.session = session;
}
public static class QueryBuilder {
private Query query;
public QueryBuilder(String sql, String keyspace, String tabletType) {
query = new Query(sql, keyspace, tabletType);
}
public Query build() {
if (query.keyRanges == null && query.keyspaceIds == null) {
throw new IllegalStateException("query must have either keyspaceIds or keyRanges");
}
if (query.keyRanges != null && query.keyspaceIds != null) {
throw new IllegalStateException("query cannot have both keyspaceIds and keyRanges");
}
return query;
}
public QueryBuilder setBindVars(List<BindVariable> bindVars) {
query.bindVars = bindVars;
return this;
}
public QueryBuilder setKeyspaceIds(List<KeyspaceId> keyspaceIds) {
List<byte[]> kidsBytes = new ArrayList<>();
for (KeyspaceId kid : keyspaceIds) {
kidsBytes.add(kid.getBytes());
}
query.keyspaceIds = kidsBytes;
return this;
}
public QueryBuilder setKeyRanges(List<KeyRange> keyRanges) {
List<Map<String, byte[]>> keyRangeMaps = new ArrayList<>();
for (KeyRange kr : keyRanges) {
keyRangeMaps.add(kr.toMap());
}
query.keyRanges = keyRangeMaps;
return this;
}
public QueryBuilder setStreaming(boolean streaming) {
query.streaming = streaming;
return this;
}
public QueryBuilder addBindVar(BindVariable bindVariable) {
if (query.getBindVars() == null) {
query.bindVars = new ArrayList<BindVariable>();
}
query.getBindVars().add(bindVariable);
return this;
}
public QueryBuilder addKeyspaceId(KeyspaceId keyspaceId) {
if (query.getKeyspaceIds() == null) {
query.keyspaceIds = new ArrayList<byte[]>();
}
query.getKeyspaceIds().add(keyspaceId.getBytes());
return this;
}
public QueryBuilder addKeyRange(KeyRange keyRange) {
if (query.getKeyRanges() == null) {
query.keyRanges = new ArrayList<Map<String, byte[]>>();
}
query.getKeyRanges().add(keyRange.toMap());
return this;
}
}
}

Просмотреть файл

@ -1,25 +0,0 @@
package com.youtube.vitess.vtgate;
public class QueryResponse {
private QueryResult result;
private Object session;
private String error;
public QueryResponse(QueryResult result, Object session, String error) {
this.result = result;
this.session = session;
this.error = error;
}
public QueryResult getResult() {
return result;
}
public Object getSession() {
return session;
}
public String getError() {
return error;
}
}

Просмотреть файл

@ -1,39 +0,0 @@
package com.youtube.vitess.vtgate;
import com.youtube.vitess.vtgate.cursor.Cursor;
import java.util.List;
/**
* Represents a VtGate query result set. For selects, rows are better accessed through the iterator
* {@link Cursor}.
*/
public class QueryResult {
private List<Row> rows;
private List<Field> fields;
private long rowsAffected;
private long lastRowId;
public QueryResult(List<Row> rows, List<Field> fields, long rowsAffected, long lastRowId) {
this.rows = rows;
this.fields = fields;
this.rowsAffected = rowsAffected;
this.lastRowId = lastRowId;
}
public List<Field> getFields() {
return fields;
}
public List<Row> getRows() {
return rows;
}
public long getRowsAffected() {
return rowsAffected;
}
public long getLastRowId() {
return lastRowId;
}
}

Просмотреть файл

@ -1,20 +0,0 @@
package com.youtube.vitess.vtgate;
public class RPCError {
private long code;
private String message;
public RPCError(long code, String message) {
this.code = code;
this.message = message;
}
public long getCode() {
return code;
}
public String getMessage() {
return message;
}
}

Просмотреть файл

@ -1,182 +0,0 @@
package com.youtube.vitess.vtgate;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.UnsignedLong;
import com.youtube.vitess.vtgate.Exceptions.InvalidFieldException;
import com.youtube.vitess.vtgate.Row.Cell;
import org.joda.time.DateTime;
import java.math.BigDecimal;
import java.util.Iterator;
import java.util.List;
public class Row implements Iterator<Cell>, Iterable<Cell> {
private ImmutableMap<String, Cell> contents;
private Iterator<String> iterator;
public Row(List<Cell> cells) {
ImmutableMap.Builder<String, Cell> builder = new ImmutableMap.Builder<>();
for (Cell cell : cells) {
builder.put(cell.getName(), cell);
}
contents = builder.build();
iterator = contents.keySet().iterator();
}
public int size() {
return contents.keySet().size();
}
public Object getObject(int index) throws InvalidFieldException {
if (index >= size()) {
throw new InvalidFieldException("invalid field index " + index);
}
return getObject(contents.keySet().asList().get(index));
}
public Object getObject(String fieldName) throws InvalidFieldException {
if (!contents.containsKey(fieldName)) {
throw new InvalidFieldException("invalid field name " + fieldName);
}
return contents.get(fieldName).getValue();
}
public Integer getInt(String fieldName) throws InvalidFieldException {
return (Integer) getAndCheckType(fieldName, Integer.class);
}
public Integer getInt(int index) throws InvalidFieldException {
return (Integer) getAndCheckType(index, Integer.class);
}
public UnsignedLong getULong(String fieldName) throws InvalidFieldException {
return (UnsignedLong) getAndCheckType(fieldName, UnsignedLong.class);
}
public UnsignedLong getULong(int index) throws InvalidFieldException {
return (UnsignedLong) getAndCheckType(index, UnsignedLong.class);
}
public String getString(String fieldName) throws InvalidFieldException {
return (String) getAndCheckType(fieldName, String.class);
}
public String getString(int index) throws InvalidFieldException {
return (String) getAndCheckType(index, String.class);
}
public Long getLong(String fieldName) throws InvalidFieldException {
return (Long) getAndCheckType(fieldName, Long.class);
}
public Long getLong(int index) throws InvalidFieldException {
return (Long) getAndCheckType(index, Long.class);
}
public Double getDouble(String fieldName) throws InvalidFieldException {
return (Double) getAndCheckType(fieldName, Double.class);
}
public Double getDouble(int index) throws InvalidFieldException {
return (Double) getAndCheckType(index, Double.class);
}
public Float getFloat(String fieldName) throws InvalidFieldException {
return (Float) getAndCheckType(fieldName, Float.class);
}
public Float getFloat(int index) throws InvalidFieldException {
return (Float) getAndCheckType(index, Float.class);
}
public DateTime getDateTime(String fieldName) throws InvalidFieldException {
return (DateTime) getAndCheckType(fieldName, DateTime.class);
}
public DateTime getDateTime(int index) throws InvalidFieldException {
return (DateTime) getAndCheckType(index, DateTime.class);
}
public byte[] getBytes(String fieldName) throws InvalidFieldException {
return (byte[]) getAndCheckType(fieldName, byte[].class);
}
public byte[] getBytes(int index) throws InvalidFieldException {
return (byte[]) getAndCheckType(index, byte[].class);
}
public BigDecimal getBigDecimal(String fieldName) throws InvalidFieldException {
return (BigDecimal) getAndCheckType(fieldName, BigDecimal.class);
}
public BigDecimal getBigDecimal(int index) throws InvalidFieldException {
return (BigDecimal) getAndCheckType(index, BigDecimal.class);
}
public Short getShort(String fieldName) throws InvalidFieldException {
return (Short) getAndCheckType(fieldName, Short.class);
}
public Short getShort(int index) throws InvalidFieldException {
return (Short) getAndCheckType(index, Short.class);
}
private Object getAndCheckType(String fieldName, Class clazz) throws InvalidFieldException {
Object o = getObject(fieldName);
if (o != null && !clazz.isInstance(o)) {
throw new InvalidFieldException(
"type mismatch expected:" + clazz.getName() + " actual: " + o.getClass().getName());
}
return o;
}
private Object getAndCheckType(int index, Class clazz) throws InvalidFieldException {
return getAndCheckType(contents.keySet().asList().get(index), clazz);
}
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public Cell next() {
return contents.get(iterator.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException("can't remove from row");
}
@Override
public Iterator<Cell> iterator() {
return this;
}
public static class Cell {
private String name;
private Object value;
private Class type;
public Cell(String name, Object value, Class type) {
this.name = name;
this.value = value;
this.type = type;
}
public String getName() {
return name;
}
public Object getValue() {
return value;
}
public Class getType() {
return type;
}
}
}

Просмотреть файл

@ -1,31 +0,0 @@
package com.youtube.vitess.vtgate;
public class SplitQueryRequest {
private String sql;
private String keyspace;
private String splitColumn;
private int splitCount;
public SplitQueryRequest(String sql, String keyspace, int splitCount, String splitColumn) {
this.sql = sql;
this.keyspace = keyspace;
this.splitCount = splitCount;
this.splitColumn = splitColumn;
}
public String getSql() {
return this.sql;
}
public String getKeyspace() {
return this.keyspace;
}
public int getSplitCount() {
return this.splitCount;
}
public String getSplitColumn() {
return this.splitColumn;
}
}

Просмотреть файл

@ -1,28 +0,0 @@
package com.youtube.vitess.vtgate;
import java.util.Map;
public class SplitQueryResponse {
private Map<Query, Long> queries;
private String error;
private RPCError err;
public SplitQueryResponse(Map<Query, Long> queries, String error, RPCError err) {
this.queries = queries;
this.error = error;
this.err = err;
}
public Map<Query, Long> getQueries() {
return queries;
}
public String getError() {
return error;
}
public RPCError getErr() {
return err;
}
}

Просмотреть файл

@ -1,131 +0,0 @@
package com.youtube.vitess.vtgate;
import com.google.common.primitives.UnsignedLong;
import com.google.gson.Gson;
import com.youtube.vitess.vtgate.KeyspaceId;
import com.youtube.vitess.vtgate.rpcclient.RpcClientFactory;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Helper class to hold the configurations for VtGate setup used in integration tests
*/
public class TestEnv {
public static final String PROPERTY_KEY_RPCCLIENT_FACTORY_CLASS = "vtgate.rpcclient.factory";
private Map<String, List<String>> shardKidMap;
private Map<String, Integer> tablets = new HashMap<>();
private String keyspace;
private Process pythonScriptProcess;
private int port;
private List<KeyspaceId> kids;
public void setKeyspace(String keyspace) {
this.keyspace = keyspace;
}
public String getKeyspace() {
return this.keyspace;
}
public int getPort() {
return this.port;
}
public void setPort(int port) {
this.port = port;
}
public Process getPythonScriptProcess() {
return this.pythonScriptProcess;
}
public void setPythonScriptProcess(Process process) {
this.pythonScriptProcess = process;
}
public void setShardKidMap(Map<String, List<String>> shardKidMap) {
this.shardKidMap = shardKidMap;
}
public Map<String, List<String>> getShardKidMap() {
return this.shardKidMap;
}
public void addTablet(String type, int count) {
tablets.put(type, count);
}
public String getShardNames() {
return StringUtils.join(shardKidMap.keySet(), ",");
}
public String getTabletConfig() {
return new Gson().toJson(tablets);
}
/**
* Return all keyspaceIds in the Keyspace
*/
public List<KeyspaceId> getAllKeyspaceIds() {
if (kids != null) {
return kids;
}
kids = new ArrayList<>();
for (List<String> ids : shardKidMap.values()) {
for (String id : ids) {
kids.add(KeyspaceId.valueOf(UnsignedLong.valueOf(id)));
}
}
return kids;
}
/**
* Return all keyspaceIds in a specific shard
*/
public List<KeyspaceId> getKeyspaceIds(String shardName) {
List<String> kidsStr = shardKidMap.get(shardName);
if (kidsStr != null) {
List<KeyspaceId> kids = new ArrayList<>();
for (String kid : kidsStr) {
kids.add(KeyspaceId.valueOf(UnsignedLong.valueOf(kid)));
}
return kids;
}
return null;
}
/**
* Get setup command to launch a cluster.
*/
public List<String> getSetupCommand() {
String vtTop = System.getenv("VTTOP");
if (vtTop == null) {
throw new RuntimeException("cannot find env variable: VTTOP");
}
List<String> command = new ArrayList<String>();
command.add("python");
command.add(vtTop + "/test/java_vtgate_test_helper.py");
command.add("--shards");
command.add(getShardNames());
command.add("--tablet-config");
command.add(getTabletConfig());
command.add("--keyspace");
command.add(keyspace);
return command;
}
public RpcClientFactory getRpcClientFactory() {
String rpcClientFactoryClass = System.getProperty(PROPERTY_KEY_RPCCLIENT_FACTORY_CLASS);
try {
Class<?> clazz = Class.forName(rpcClientFactoryClass);
return (RpcClientFactory)clazz.newInstance();
} catch (ClassNotFoundException|IllegalAccessException|InstantiationException e) {
throw new RuntimeException(e);
}
}
}

Просмотреть файл

@ -1,126 +0,0 @@
package com.youtube.vitess.vtgate;
import com.youtube.vitess.vtgate.Exceptions.ConnectionException;
import com.youtube.vitess.vtgate.Exceptions.DatabaseException;
import com.youtube.vitess.vtgate.Exceptions.IntegrityException;
import com.youtube.vitess.vtgate.cursor.Cursor;
import com.youtube.vitess.vtgate.cursor.CursorImpl;
import com.youtube.vitess.vtgate.cursor.StreamCursor;
import com.youtube.vitess.vtgate.rpcclient.RpcClient;
import com.youtube.vitess.vtgate.rpcclient.RpcClientFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
* A single threaded VtGate client
*
* <p>Usage:
*
* <pre>
*VtGate vtGate = VtGate.connect(addresses);
*Query query = new QueryBuilder()...add params...build();
*Cursor cursor = vtGate.execute(query);
*for(Row row : cursor) {
* processRow(row);
*}
*
*For DMLs
*vtgate.begin();
*Query query = new QueryBuilder()...add params...build();
*vtgate.execute(query);
*vtgate.commit();
*
*vtgate.close();
*</pre>
*
*/
public class VtGate {
private static String INTEGRITY_ERROR_MSG = "(errno 1062)";
private RpcClient client;
private Object session;
/**
* Opens connection to a VtGate server. Connection remains open until close() is called.
*
* @param addresses comma separated list of host:port pairs
* @param timeoutMs connection timeout in milliseconds, 0 for no timeout
* @throws ConnectionException
*/
public static VtGate connect(String addresses, int timeoutMs, RpcClientFactory rpcFactory) throws ConnectionException {
List<String> addressList = Arrays.asList(addresses.split(","));
int index = new Random().nextInt(addressList.size());
RpcClient client = rpcFactory.create(addressList.get(index), timeoutMs);
return new VtGate(client);
}
private VtGate(RpcClient client) {
this.client = client;
}
public void begin() throws ConnectionException {
session = client.begin();
}
public Cursor execute(Query query) throws DatabaseException, ConnectionException {
if (session != null) {
query.setSession(session);
}
QueryResponse response = client.execute(query);
if (response.getSession() != null) {
session = response.getSession();
}
String error = response.getError();
if (error != null) {
if (error.contains(INTEGRITY_ERROR_MSG)) {
throw new IntegrityException(error);
}
throw new DatabaseException(response.getError());
}
if (query.isStreaming()) {
return new StreamCursor(response.getResult(), client);
}
return new CursorImpl(response.getResult());
}
/**
* Split a query into primary key range query parts. Rows corresponding to the sub queries will
* add up to original queries' rows. Sub queries are by default built to run against 'rdonly'
* instances. Batch jobs or MapReduce jobs that needs to scan all rows can use these queries to
* parallelize full table scans.
*/
public Map<Query, Long> splitQuery(String keyspace, String sql, int splitCount, String pkColumn)
throws ConnectionException, DatabaseException {
SplitQueryRequest req = new SplitQueryRequest(sql, keyspace, splitCount, pkColumn);
SplitQueryResponse response = client.splitQuery(req);
if (response.getError() != null) {
throw new DatabaseException(response.getError());
}
return response.getQueries();
}
public void commit() throws ConnectionException {
try {
client.commit(session);
} finally {
session = null;
}
}
public void rollback() throws ConnectionException {
try {
client.rollback(session);
} finally {
session = null;
}
}
public void close() throws ConnectionException {
if (session != null) {
rollback();
}
client.close();
}
}

Просмотреть файл

@ -1,11 +0,0 @@
package com.youtube.vitess.vtgate.cursor;
import com.youtube.vitess.vtgate.Row;
import java.util.Iterator;
public interface Cursor extends Iterator<Row>, Iterable<Row> {
public long getRowsAffected();
public long getLastRowId();
}

Просмотреть файл

@ -1,43 +0,0 @@
package com.youtube.vitess.vtgate.cursor;
import com.youtube.vitess.vtgate.QueryResult;
import com.youtube.vitess.vtgate.Row;
import java.util.Iterator;
public class CursorImpl implements Cursor {
private QueryResult result;
private Iterator<Row> iter;
public CursorImpl(QueryResult result) {
this.result = result;
this.iter = result.getRows().iterator();
}
public Row next() {
return this.iter.next();
}
public boolean hasNext() {
return this.iter.hasNext();
}
@Override
public void remove() {
throw new UnsupportedOperationException("cannot remove from results");
}
@Override
public Iterator<Row> iterator() {
return this;
}
public long getRowsAffected() {
return result.getRowsAffected();
}
public long getLastRowId() {
return result.getLastRowId();
}
}

Просмотреть файл

@ -1,97 +0,0 @@
package com.youtube.vitess.vtgate.cursor;
import com.youtube.vitess.vtgate.Exceptions.ConnectionException;
import com.youtube.vitess.vtgate.QueryResult;
import com.youtube.vitess.vtgate.Row;
import com.youtube.vitess.vtgate.rpcclient.RpcClient;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Iterator;
/**
* StreamCursor is for iterating through stream query results. When the current buffer is completed,
* StreamCursor does an RPC fetch for the next set until the stream ends or an error occurs.
*
*/
public class StreamCursor implements Cursor {
static final Logger logger = LogManager.getLogger(StreamCursor.class.getName());
private QueryResult queryResult;
private Iterator<Row> iterator;
private RpcClient client;
private boolean streamEnded;
public StreamCursor(QueryResult currentResult, RpcClient client) {
this.queryResult = currentResult;
this.iterator = currentResult.getRows().iterator();
this.client = client;
}
@Override
public boolean hasNext() {
fetchMoreIfNeeded();
return iterator.hasNext();
}
@Override
public Row next() {
fetchMoreIfNeeded();
return this.iterator.next();
}
@Override
public void remove() {
throw new UnsupportedOperationException("cannot remove from results");
}
@Override
public Iterator<Row> iterator() {
return this;
}
public long getRowsAffected() {
throw new UnsupportedOperationException("not supported for streaming cursors");
}
public long getLastRowId() {
throw new UnsupportedOperationException("not supported for streaming cursors");
}
/**
* Update the iterator by fetching more rows if current buffer is done
*/
private void fetchMoreIfNeeded() {
// Either current buffer is not empty or we have reached end of stream,
// nothing to do here.
if (this.iterator.hasNext() || streamEnded) {
return;
}
QueryResult qr;
try {
qr = client.streamNext(queryResult.getFields());
} catch (ConnectionException e) {
logger.error("connection exception while streaming", e);
throw new RuntimeException(e);
}
// null reply indicates EndOfStream, mark stream as ended and return
if (qr == null) {
streamEnded = true;
return;
}
// For scatter streaming queries, VtGate sends fields data from each
// shard. Since fields has already been fetched, just ignore these and
// fetch the next batch.
if (qr.getRows().size() == 0) {
fetchMoreIfNeeded();
} else {
// We got more rows, update the current buffer and iterator
queryResult = qr;
iterator = queryResult.getRows().iterator();
}
}
}

Просмотреть файл

@ -1,96 +0,0 @@
# Hadoop Integration
This package contains the necessary implementations for providing Hadoop support on Vitess. This allows mapreducing over tables stored in Vitess from Hadoop.
Let's look at an example. Consider a table with the following schema that is stored in Vitess across several shards.
```
create table sample_table (
id bigint auto_increment,
name varchar(64),
keyspace_id bigint(20) unsigned NOT NULL,
primary key (id)) Engine=InnoDB;
```
Let's say we want to write a MapReduce job that imports this table from Vitess to HDFS where each row is turned into a CSV record in HDFS.
We can use [VitessInputFormat](https://github.com/youtube/vitess/blob/090830a29c10ae813a2edae18ad780067fb46c05/java/vtgate-client/src/main/java/com/youtube/vitess/vtgate/hadoop/VitessInputFormat.java), an implementation of Hadoop's [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html), for that. With VitessInputFormat, rows from the source table are streamed to the mapper task. Each input record has a [NullWritable](https://hadoop.apache.org/docs/r2.2.0/api/org/apache/hadoop/io/NullWritable.html) key (no key, really), and [RowWritable](https://github.com/youtube/vitess/blob/090830a29c10ae813a2edae18ad780067fb46c05/java/vtgate-client/src/main/java/com/youtube/vitess/vtgate/hadoop/writables/RowWritable.java) as value, which is a writable implementation for the entire row's contents.
Here is an example implementation of our mapper, which transforms each row into a CSV Text.
```java
public class TableCsvMapper extends
Mapper<NullWritable, RowWritable, NullWritable, Text> {
@Override
public void map(NullWritable key, RowWritable value, Context context)
throws IOException, InterruptedException {
Row row = value.getRow();
StringBuilder asCsv = new StringBuilder();
asCsv.append(row.getInt("id"));
asCsv.append(",");
asCsv.append(row.getString("name"));
asCsv.append(",");
asCsv.append(row.getULong("keyspace_id"));
context.write(NullWritable.get(), new Text(asCsv.toString()));
}
}
```
The controller code for this MR job is shown below. Note that we are not specifying any sharding/replication related information here. VtGate figures out the right number of shards and replicas to fetch the rows from. The MR author only needs to worry about which rows to fetch (query), how to process them (mapper) and the extent of parallelism (splitCount).
```java
public static void main(String[] args) {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "map vitess table");
job.setJarByClass(VitessInputFormat.class);
job.setMapperClass(TableCsvMapper.class);
String vtgateAddresses = "localhost:15011,localhost:15012,localhost:15013";
String keyspace = "SAMPLE_KEYSPACE";
String query = "select id, name from sample_table";
int splitCount = 100;
VitessInputFormat.setInput(job, vtgateAddresses, keyspace, query, splitCount);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
...// set reducer and outputpath and launch the job
}
```
Refer [this integration test](https://github.com/youtube/vitess/blob/090830a29c10ae813a2edae18ad780067fb46c05/java/vtgate-client/src/test/java/com/youtube/vitess/vtgate/integration/hadoop/MapReduceIT.java) for a working example a MapReduce job on Vitess.
## How it Works
VitessInputFormat relies on VtGate's [SplitQuery](https://github.com/youtube/vitess/blob/c680b39852662739a4f5f539ad3ff48ae83d26d1/go/vt/vtgate/vtgateservice/interface.go#L38) service to obtain the input splits. This service accepts a SplitQueryRequest which consists of an input query and the desired number of splits (splitCount). SplitQuery returns SplitQueryResult, which has a list of SplitQueryParts. SplitQueryPart consists of a KeyRangeQuery and a size estimate of how many rows this sub-query might return. SplitQueryParts return rows that are mutually exclusive and collectively exhaustive - all rows belonging to the original input query will be returned by one and exactly one SplitQueryPart.
VitessInputFormat turns each SplitQueryPart into a mapper task. The number of splits generated may not be exactly equal to the desired split count specified in the input. Specifically, if the desired split count is not a multiple of the number of shards, then VtGate will round it up to the next bigger multiple of number of shards.
In addition to splitting the query, the SplitQuery service also acts as a gatekeeper that rejects queries unsuitable for MapReduce. Queries with potentially expensive operations such as Joins, Group By, inner queries, Distinct, Order By, etc are not allowed. Specifically, only input queries of the following syntax are permitted.
```
select [list of columns] from table where [list of simple column filters];
```
There are additional constraints on the table schema to ensure that the sub queries do not result in full table scans. The table must have a primary key (simple or composite) and the leading primary key must be of one of the following types.
```
VT_TINY, VT_SHORT, VT_LONG, VT_LONGLONG, VT_INT24, VT_FLOAT, VT_DOUBLE
```
#### Split Generation
Here's how SplitQuery works. VtGate forwards the input query to randomly chosen rdonly vttablets in each shard with a split count, M = original split count / N, where N is the number of shards. Each vttablet parses the query and rejects it if it does not meet the constraints. If it is a valid query, the tablet fetches the min and max value for the leading primary key column from MySQL. Split the [min, max] range of into M intervals. Construct subqueries by appending where clauses corresponding to PK range intervals to the original query and return it to VtGate. VtGate aggregates the splits received from tablets and constructs KeyRangeQueries by appending KeyRange corresponding to that shard. The following diagram depicts this flow for a sample request of split size 6 on a cluster with two shards.
![Image](https://raw.githubusercontent.com/youtube/vitess/090830a29c10ae813a2edae18ad780067fb46c05/java/vtgate-client/src/main/java/com/youtube/vitess/vtgate/hadoop/SplitQuery.png)
## Other Considerations
1. Specifying splitCount - Each split is a streaming query executed by a single mapper task. splitCount determines the number of mapper tasks that will be created and thus the extent of parallelism. Having too few, but long-running, splits would limit the throughput of the MR job as a whole. Long-running splits also makes retries of individual tasks failures more expensive as compared to leaner splits. On the other side, having too many splits can lead to extra overhead in task setup and connection overhead with VtGate. So, identifying the ideal split count is a balance between the two.
2. Joining multiple tables - Currently Vitess does not currently mapping over joined tables. However, this can be easily achieved by writing a multi-mapper MapReduce job and performing a reduce-side join in the MR job.
3. Views - Database Views are not great for full-table scans. If you need to map over a View, consider mapping over the underlying tables instead.

Двоичный файл не отображается.

До

Ширина:  |  Высота:  |  Размер: 33 KiB

Просмотреть файл

@ -1,78 +0,0 @@
package com.youtube.vitess.vtgate.hadoop;
import com.youtube.vitess.vtgate.rpcclient.RpcClientFactory;
import org.apache.hadoop.conf.Configuration;
/**
* Collection of configuration properties used for {@link VitessInputFormat}
*/
public class VitessConf {
public static final String HOSTS = "vitess.vtgate.hosts";
public static final String CONN_TIMEOUT_MS = "vitess.vtgate.conn_timeout_ms";
public static final String INPUT_KEYSPACE = "vitess.vtgate.hadoop.keyspace";
public static final String INPUT_QUERY = "vitess.vtgate.hadoop.input_query";
public static final String SPLITS = "vitess.vtgate.hadoop.splits";
public static final String SPLIT_COLUMN = "vitess.vtgate.hadoop.splitcolumn";
public static final String RPC_FACTORY_CLASS = "vtgate.rpcclient.factory";
public static final String HOSTS_DELIM = ",";
private Configuration conf;
public VitessConf(Configuration conf) {
this.conf = conf;
}
public String getHosts() {
return conf.get(HOSTS);
}
public void setHosts(String hosts) {
conf.set(HOSTS, hosts);
}
public int getTimeoutMs() {
return conf.getInt(CONN_TIMEOUT_MS, 0);
}
public void setTimeoutMs(int timeoutMs) {
conf.setInt(CONN_TIMEOUT_MS, timeoutMs);
}
public String getKeyspace() {
return conf.get(INPUT_KEYSPACE);
}
public void setKeyspace(String keyspace) {
conf.set(INPUT_KEYSPACE, keyspace);
}
public String getInputQuery() {
return conf.get(INPUT_QUERY);
}
public void setInputQuery(String query) {
conf.set(INPUT_QUERY, query);
}
public int getSplits() {
return conf.getInt(SPLITS, 1);
}
public void setSplits(int splits) {
conf.setInt(SPLITS, splits);
}
public String getSplitColumn() {
return conf.get(SPLIT_COLUMN);
}
public void setSplitColumn(String splitColumn) {
conf.set(SPLIT_COLUMN, splitColumn);
}
public String getRpcFactoryClass() { return conf.get(RPC_FACTORY_CLASS); }
public void setRpcFactoryClass(Class<? extends RpcClientFactory> clz) {
conf.set(RPC_FACTORY_CLASS, clz.getName());
}
}

Просмотреть файл

@ -1,83 +0,0 @@
package com.youtube.vitess.vtgate.hadoop;
import com.youtube.vitess.vtgate.Exceptions.ConnectionException;
import com.youtube.vitess.vtgate.Exceptions.DatabaseException;
import com.youtube.vitess.vtgate.Query;
import com.youtube.vitess.vtgate.VtGate;
import com.youtube.vitess.vtgate.hadoop.writables.KeyspaceIdWritable;
import com.youtube.vitess.vtgate.hadoop.writables.RowWritable;
import com.youtube.vitess.vtgate.rpcclient.RpcClientFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* {@link VitessInputFormat} is the {@link InputFormat} for tables in Vitess. Input splits (
* {@link VitessInputSplit}) are fetched from VtGate via an RPC. map() calls are supplied with a
* {@link KeyspaceIdWritable}, {@link RowWritable} pair.
*/
public class VitessInputFormat extends InputFormat<NullWritable, RowWritable> {
@Override
public List<InputSplit> getSplits(JobContext context) {
try {
VitessConf conf = new VitessConf(context.getConfiguration());
Class<? extends RpcClientFactory> rpcFactoryClass = null;
VtGate vtgate;
try {
rpcFactoryClass = (Class<? extends RpcClientFactory>)Class.forName(conf.getRpcFactoryClass());
vtgate = VtGate.connect(conf.getHosts(), conf.getTimeoutMs(), rpcFactoryClass.newInstance());
} catch (ClassNotFoundException|InstantiationException|IllegalAccessException e) {
throw new RuntimeException(e);
}
Map<Query, Long> queries =
vtgate.splitQuery(conf.getKeyspace(), conf.getInputQuery(), conf.getSplits(), conf.getSplitColumn());
List<InputSplit> splits = new LinkedList<>();
for (Query query : queries.keySet()) {
Long size = queries.get(query);
InputSplit split = new VitessInputSplit(query, size);
splits.add(split);
}
for (InputSplit split : splits) {
((VitessInputSplit) split).setLocations(conf.getHosts().split(VitessConf.HOSTS_DELIM));
}
return splits;
} catch (ConnectionException | DatabaseException e) {
throw new RuntimeException(e);
}
}
public RecordReader<NullWritable, RowWritable> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return new VitessRecordReader();
}
/**
* Sets the necessary configurations for Vitess table input source
*/
public static void setInput(Job job,
String hosts,
String keyspace,
String query,
int splits,
Class<? extends RpcClientFactory> rpcFactoryClass) {
job.setInputFormatClass(VitessInputFormat.class);
VitessConf vtConf = new VitessConf(job.getConfiguration());
vtConf.setHosts(hosts);
vtConf.setKeyspace(keyspace);
vtConf.setInputQuery(query);
vtConf.setSplits(splits);
vtConf.setRpcFactoryClass(rpcFactoryClass);
}
}

Просмотреть файл

@ -1,64 +0,0 @@
package com.youtube.vitess.vtgate.hadoop;
import com.google.gson.Gson;
import com.youtube.vitess.vtgate.Query;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* {@link VitessInputSplit} has a Query corresponding to a set of rows from a Vitess table input
* source. Locations point to the same VtGate hosts used to fetch the splits. Length information is
* only approximate.
*
*/
public class VitessInputSplit extends InputSplit implements Writable {
private String[] locations;
private Query query;
private long length;
private final Gson gson = new Gson();
public VitessInputSplit(Query query, long length) {
this.query = query;
this.length = length;
}
public VitessInputSplit() {
}
public Query getQuery() {
return query;
}
public void setLocations(String[] locations) {
this.locations = locations;
}
@Override
public long getLength() throws IOException, InterruptedException {
return length;
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
return locations;
}
@Override
public void readFields(DataInput input) throws IOException {
query = gson.fromJson(input.readUTF(), Query.class);
length = input.readLong();
}
@Override
public void write(DataOutput output) throws IOException {
output.writeUTF(gson.toJson(query));
output.writeLong(length);
}
}

Просмотреть файл

@ -1,93 +0,0 @@
package com.youtube.vitess.vtgate.hadoop;
import com.youtube.vitess.vtgate.Exceptions.ConnectionException;
import com.youtube.vitess.vtgate.Exceptions.DatabaseException;
import com.youtube.vitess.vtgate.Row;
import com.youtube.vitess.vtgate.VtGate;
import com.youtube.vitess.vtgate.cursor.Cursor;
import com.youtube.vitess.vtgate.hadoop.writables.RowWritable;
import com.youtube.vitess.vtgate.rpcclient.RpcClientFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
public class VitessRecordReader extends RecordReader<NullWritable, RowWritable> {
private VitessInputSplit split;
private VtGate vtgate;
private VitessConf conf;
private RowWritable rowWritable;
private long rowsProcessed = 0;
private Cursor cursor;
/**
* Fetch connection parameters from Configuraiton and open VtGate connection.
*/
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
this.split = (VitessInputSplit) split;
conf = new VitessConf(context.getConfiguration());
try {
Class<? extends RpcClientFactory> rpcFactoryClass = (Class<? extends RpcClientFactory>)Class.forName(conf.getRpcFactoryClass());
vtgate = VtGate.connect(conf.getHosts(), conf.getTimeoutMs(), rpcFactoryClass.newInstance());
} catch (ConnectionException|ClassNotFoundException|InstantiationException|IllegalAccessException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws IOException {
if (vtgate != null) {
try {
vtgate.close();
vtgate = null;
} catch (ConnectionException e) {
throw new RuntimeException(e);
}
}
}
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}
@Override
public RowWritable getCurrentValue() throws IOException, InterruptedException {
return rowWritable;
}
@Override
public float getProgress() throws IOException, InterruptedException {
if (rowsProcessed > split.getLength()) {
return 0.9f;
}
return rowsProcessed / split.getLength();
}
/**
* Fetches the next row. If this is the first invocation for the split, execute the streaming
* query. Subsequent calls just advance the iterator.
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (cursor == null) {
try {
cursor = vtgate.execute(split.getQuery());
} catch (DatabaseException | ConnectionException e) {
throw new RuntimeException(e);
}
}
if (!cursor.hasNext()) {
return false;
}
Row row = cursor.next();
rowWritable = new RowWritable(row);
rowsProcessed++;
return true;
}
}

Просмотреть файл

@ -1,79 +0,0 @@
package com.youtube.vitess.vtgate.hadoop.utils;
import com.google.common.primitives.UnsignedLong;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import com.google.gson.stream.JsonWriter;
import org.apache.commons.codec.binary.Base64;
import java.io.IOException;
import java.lang.reflect.Type;
/**
* Custom GSON adapters for {@link UnsignedLong} and {@link Class} types
*/
public class GsonAdapters {
public static final TypeAdapter<UnsignedLong> UNSIGNED_LONG = new TypeAdapter<UnsignedLong>() {
@Override
public UnsignedLong read(JsonReader in) throws IOException {
if (in.peek() == JsonToken.NULL) {
in.nextNull();
return null;
}
try {
return UnsignedLong.valueOf(in.nextString());
} catch (NumberFormatException e) {
throw new JsonSyntaxException(e);
}
}
@Override
public void write(JsonWriter out, UnsignedLong value) throws IOException {
out.value(value.toString());
}
};
public static final TypeAdapter<Class> CLASS = new TypeAdapter<Class>() {
@Override
public Class read(JsonReader in) throws IOException {
if (in.peek() == JsonToken.NULL) {
in.nextNull();
return null;
}
try {
return Class.forName(in.nextString());
} catch (NumberFormatException | ClassNotFoundException e) {
throw new JsonSyntaxException(e);
}
}
@Override
public void write(JsonWriter out, Class value) throws IOException {
out.value(value.getName());
}
};
public static final Object BYTE_ARRAY = new ByteArrayAdapter();
private static class ByteArrayAdapter implements JsonSerializer<byte[]>, JsonDeserializer<byte[]> {
@Override
public byte[] deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
return Base64.decodeBase64(json.getAsString());
}
@Override
public JsonElement serialize(byte[] src, Type typeOfSrc, JsonSerializationContext context) {
return new JsonPrimitive(Base64.encodeBase64String(src));
}
}
}

Просмотреть файл

@ -1,89 +0,0 @@
package com.youtube.vitess.vtgate.hadoop.writables;
import com.google.common.primitives.UnsignedLong;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.youtube.vitess.vtgate.KeyspaceId;
import com.youtube.vitess.vtgate.hadoop.utils.GsonAdapters;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* Serializable version of {@link KeyspaceId}
*/
public class KeyspaceIdWritable implements WritableComparable<KeyspaceIdWritable> {
private KeyspaceId keyspaceId;
// KeyspaceId can be UnsignedLong which needs a custom adapter
private Gson gson = new GsonBuilder().registerTypeAdapter(UnsignedLong.class,
GsonAdapters.UNSIGNED_LONG).create();
public KeyspaceIdWritable() {}
public KeyspaceIdWritable(KeyspaceId keyspaceId) {
this.keyspaceId = keyspaceId;
}
@Override
public void write(DataOutput out) throws IOException {
if (keyspaceId.getId() instanceof UnsignedLong) {
out.writeUTF("UnsignedLong");
out.writeUTF(((UnsignedLong) (keyspaceId.getId())).toString());
} else {
out.writeUTF("String");
out.writeUTF((String) (keyspaceId.getId()));
}
}
public KeyspaceId getKeyspaceId() {
return keyspaceId;
}
@Override
public void readFields(DataInput in) throws IOException {
String type = in.readUTF();
Object id;
if ("UnsignedLong".equals(type)) {
id = UnsignedLong.valueOf(in.readUTF());
} else {
id = in.readUTF();
}
keyspaceId = KeyspaceId.valueOf(id);
}
@Override
public String toString() {
return toJson();
}
public String toJson() {
return gson.toJson(keyspaceId, KeyspaceId.class);
}
@Override
public int compareTo(KeyspaceIdWritable o) {
if (o == null) {
throw new NullPointerException();
}
return this.keyspaceId.compareTo(o.keyspaceId);
}
@Override
public boolean equals(Object o) {
if (o instanceof KeyspaceIdWritable) {
return this.compareTo((KeyspaceIdWritable) o) == 0;
}
return false;
}
@Override
public int hashCode() {
return keyspaceId.hashCode();
}
}

Просмотреть файл

@ -1,125 +0,0 @@
package com.youtube.vitess.vtgate.hadoop.writables;
import com.google.common.primitives.UnsignedLong;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.youtube.vitess.vtgate.Row;
import com.youtube.vitess.vtgate.Row.Cell;
import com.youtube.vitess.vtgate.hadoop.utils.GsonAdapters;
import org.apache.commons.net.util.Base64;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
import java.util.LinkedList;
/**
* Serializable version of {@link Row}. Only implements {@link Writable} and not
* {@link WritableComparable} since this is not meant to be used as a key.
*
*/
public class RowWritable implements Writable {
private Row row;
// Row contains UnsignedLong and Class objects which need custom adapters
private Gson gson = new GsonBuilder()
.registerTypeHierarchyAdapter(byte[].class, GsonAdapters.BYTE_ARRAY)
.registerTypeAdapter(UnsignedLong.class, GsonAdapters.UNSIGNED_LONG)
.registerTypeAdapter(Class.class, GsonAdapters.CLASS)
.create();
public RowWritable() {
}
public RowWritable(Row row) {
this.row = row;
}
public Row getRow() {
return row;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(row.size());
for (Cell cell : row) {
writeCell(out, cell);
}
}
public void writeCell(DataOutput out, Cell cell) throws IOException {
out.writeUTF(cell.getName());
out.writeUTF(cell.getType().getName());
if (cell.getType().equals(byte[].class)){
out.writeUTF(Base64.encodeBase64String((byte[])cell.getValue()));
} else{
out.writeUTF(cell.getValue().toString());
}
}
@Override
public void readFields(DataInput in) throws IOException {
int size = in.readInt();
LinkedList<Cell> cells = new LinkedList<>();
for (int i = 0; i < size; i++) {
cells.add(readCell(in));
}
row = new Row(cells);
}
public Cell readCell(DataInput in) throws IOException {
String name = in.readUTF();
String type = in.readUTF();
String value = in.readUTF();
Object val = null;
Class clazz;
try {
clazz = Class.forName(type);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
if (clazz.equals(Double.class)) {
val = Double.valueOf(value);
}
if (clazz.equals(Integer.class)) {
val = Integer.valueOf(value);
}
if (clazz.equals(Long.class)) {
val = Long.valueOf(value);
}
if (clazz.equals(Float.class)) {
val = Float.valueOf(value);
}
if (clazz.equals(UnsignedLong.class)) {
val = UnsignedLong.valueOf(value);
}
if (clazz.equals(Date.class)) {
val = Date.parse(value);
}
if (clazz.equals(String.class)) {
val = value;
}
if (clazz.equals(byte[].class)) {
val = Base64.decodeBase64(value);
}
if (val == null) {
throw new RuntimeException("unknown type in RowWritable: " + clazz);
}
return new Cell(name, val, clazz);
}
@Override
public String toString() {
return toJson();
}
public String toJson() {
return gson.toJson(row, Row.class);
}
}

Просмотреть файл

@ -1,32 +0,0 @@
package com.youtube.vitess.vtgate.rpcclient;
import com.youtube.vitess.vtgate.Exceptions.ConnectionException;
import com.youtube.vitess.vtgate.Field;
import com.youtube.vitess.vtgate.Query;
import com.youtube.vitess.vtgate.QueryResponse;
import com.youtube.vitess.vtgate.QueryResult;
import com.youtube.vitess.vtgate.SplitQueryRequest;
import com.youtube.vitess.vtgate.SplitQueryResponse;
import java.util.List;
public interface RpcClient {
public Object begin() throws ConnectionException;
public void commit(Object session) throws ConnectionException;
public void rollback(Object session) throws ConnectionException;
public QueryResponse execute(Query query) throws ConnectionException;
public List<QueryResponse> executeBatchKeyspaceIds(
List<Query> queries, String tabletType, Object session, boolean asTransaction)
throws ConnectionException;
public QueryResult streamNext(List<Field> fields) throws ConnectionException;
public SplitQueryResponse splitQuery(SplitQueryRequest request) throws ConnectionException;
public void close() throws ConnectionException;
}

Просмотреть файл

@ -1,7 +0,0 @@
package com.youtube.vitess.vtgate.rpcclient;
import com.youtube.vitess.vtgate.Exceptions.ConnectionException;
public interface RpcClientFactory {
RpcClient create(String address, int timeoutMs) throws ConnectionException;
}

Просмотреть файл

@ -1,24 +0,0 @@
package com.youtube.vitess.vtgate.rpcclient.gorpc;
import com.google.common.net.HostAndPort;
import com.youtube.vitess.gorpc.Client;
import com.youtube.vitess.gorpc.Exceptions.GoRpcException;
import com.youtube.vitess.gorpc.codecs.bson.BsonClientCodecFactory;
import com.youtube.vitess.vtgate.Exceptions.ConnectionException;
import com.youtube.vitess.vtgate.rpcclient.RpcClient;
import com.youtube.vitess.vtgate.rpcclient.RpcClientFactory;
public class BsonRpcClientFactory implements RpcClientFactory {
@Override
public RpcClient create(String address, int timeoutMs) throws ConnectionException {
try {
HostAndPort hostAndPort = HostAndPort.fromString(address);
Client client = Client.dialHttp(hostAndPort.getHostText(), hostAndPort.getPort(),
GoRpcClient.BSON_RPC_PATH, timeoutMs, new BsonClientCodecFactory());
return new GoRpcClient(client);
} catch (GoRpcException e) {
GoRpcClient.LOGGER.error("vtgate connection exception: ", e);
throw new ConnectionException(e.getMessage());
}
}
}

Просмотреть файл

@ -1,273 +0,0 @@
package com.youtube.vitess.vtgate.rpcclient.gorpc;
import com.google.common.primitives.Ints;
import com.google.common.primitives.UnsignedLong;
import com.youtube.vitess.vtgate.BatchQuery;
import com.youtube.vitess.vtgate.BatchQueryResponse;
import com.youtube.vitess.vtgate.BindVariable;
import com.youtube.vitess.vtgate.Field;
import com.youtube.vitess.vtgate.Field.Flag;
import com.youtube.vitess.vtgate.KeyRange;
import com.youtube.vitess.vtgate.KeyspaceId;
import com.youtube.vitess.vtgate.Query;
import com.youtube.vitess.vtgate.Query.QueryBuilder;
import com.youtube.vitess.vtgate.QueryResponse;
import com.youtube.vitess.vtgate.QueryResult;
import com.youtube.vitess.vtgate.Row;
import com.youtube.vitess.vtgate.Row.Cell;
import com.youtube.vitess.vtgate.RPCError;
import com.youtube.vitess.vtgate.SplitQueryRequest;
import com.youtube.vitess.vtgate.SplitQueryResponse;
import org.apache.commons.codec.binary.Hex;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;
import org.bson.types.BasicBSONList;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class Bsonify {
public static BSONObject queryToBson(Query query) {
BSONObject b = new BasicBSONObject();
b.put("Sql", query.getSql());
b.put("Keyspace", query.getKeyspace());
b.put("TabletType", query.getTabletType());
if (query.getBindVars() != null) {
b.put("BindVariables", bindVarsToBSON(query.getBindVars()));
}
if (query.getKeyspaceIds() != null) {
b.put("KeyspaceIds", query.getKeyspaceIds());
} else {
b.put("KeyRanges", query.getKeyRanges());
}
if (query.getSession() != null) {
b.put("Session", query.getSession());
}
return b;
}
public static BSONObject bindVarsToBSON(List<BindVariable> bindVars) {
BSONObject bindVariables = new BasicBSONObject();
for (BindVariable b : bindVars) {
if (b.getType().equals(BindVariable.Type.NULL)) {
bindVariables.put(b.getName(), null);
}
if (b.getType().equals(BindVariable.Type.LONG)) {
bindVariables.put(b.getName(), b.getLongVal());
}
if (b.getType().equals(BindVariable.Type.UNSIGNED_LONG)) {
bindVariables.put(b.getName(), b.getULongVal());
}
if (b.getType().equals(BindVariable.Type.DOUBLE)) {
bindVariables.put(b.getName(), b.getDoubleVal());
}
if (b.getType().equals(BindVariable.Type.BYTE_ARRAY)) {
bindVariables.put(b.getName(), b.getByteArrayVal());
}
}
return bindVariables;
}
public static BSONObject batchQueryToBson(BatchQuery batchQuery) {
BSONObject b = new BasicBSONObject();
List<Map<String, Object>> queries = new LinkedList<>();
Iterator<String> sqlIter = batchQuery.getSqls().iterator();
Iterator<List<BindVariable>> bvIter = batchQuery.getBindVarsList().iterator();
while (sqlIter.hasNext()) {
Map<String, Object> query = new HashMap<>();
query.put("Sql", sqlIter.next());
List<BindVariable> bindVars = bvIter.next();
if (bindVars != null) {
query.put("BindVariables", bindVarsToBSON(bindVars));
}
queries.add(query);
}
b.put("Queries", queries);
b.put("Keyspace", batchQuery.getKeyspace());
b.put("TabletType", batchQuery.getTabletType());
b.put("KeyspaceIds", batchQuery.getKeyspaceIds());
if (batchQuery.getSession() != null) {
b.put("Session", batchQuery.getSession());
}
return b;
}
public static QueryResponse bsonToQueryResponse(BSONObject reply) {
String error = null;
if (reply.containsField("Error")) {
byte[] err = (byte[]) reply.get("Error");
if (err.length > 0) {
error = new String(err);
}
}
QueryResult queryResult = null;
BSONObject result = (BSONObject) reply.get("Result");
if (result != null) {
queryResult = bsonToQueryResult(result, null);
}
Object session = null;
if (reply.containsField("Session")) {
session = reply.get("Session");
}
return new QueryResponse(queryResult, session, error);
}
public static BatchQueryResponse bsonToBatchQueryResponse(BSONObject reply) {
String error = null;
if (reply.containsField("Error")) {
byte[] err = (byte[]) reply.get("Error");
if (err.length > 0) {
error = new String(err);
}
}
Object session = null;
if (reply.containsField("Session")) {
session = reply.get("Session");
}
List<QueryResult> qrs = new LinkedList<>();
BasicBSONList results = (BasicBSONList) reply.get("List");
for (Object result : results) {
qrs.add(bsonToQueryResult((BSONObject) result, null));
}
return new BatchQueryResponse(qrs, session, error);
}
public static QueryResult bsonToQueryResult(BSONObject result, List<Field> fields) {
if (fields == null) {
fields = bsonToFields(result);
}
List<Row> rows = bsonToRows(result, fields);
long rowsAffected = (long) result.get("RowsAffected");
long lastRowId = (long) result.get("InsertId");
return new QueryResult(rows, fields, rowsAffected, lastRowId);
}
public static List<Field> bsonToFields(BSONObject result) {
List<Field> fieldList = new LinkedList<>();
BasicBSONList fields = (BasicBSONList) result.get("Fields");
for (Object field : fields) {
BSONObject fieldBson = (BSONObject) field;
String fieldName = new String((byte[]) fieldBson.get("Name"));
int mysqlType = Ints.checkedCast((Long) fieldBson.get("Type"));
long mysqlFlags = Flag.VT_ZEROVALUE_FLAG.mysqlFlag;
Object flags = fieldBson.get("Flags");
if (flags != null) {
mysqlFlags = (Long) flags;
}
fieldList.add(Field.newFieldFromMysql(fieldName, mysqlType, mysqlFlags));
}
return fieldList;
}
public static List<Row> bsonToRows(BSONObject result, List<Field> fields) {
List<Row> rowList = new LinkedList<>();
BasicBSONList rows = (BasicBSONList) result.get("Rows");
for (Object row : rows) {
LinkedList<Cell> cells = new LinkedList<>();
BasicBSONList cols = (BasicBSONList) row;
Iterator<Field> fieldsIter = fields.iterator();
for (Object col : cols) {
byte[] val = col != null ? (byte[]) col : null;
Field field = fieldsIter.next();
cells.add(field.convertValueToCell(val));
}
rowList.add(new Row(cells));
}
return rowList;
}
public static BSONObject splitQueryRequestToBson(SplitQueryRequest request) {
BSONObject query = new BasicBSONObject();
query.put("Sql", request.getSql());
BSONObject b = new BasicBSONObject();
b.put("Keyspace", request.getKeyspace());
b.put("SplitColumn", request.getSplitColumn());
b.put("Query", query);
b.put("SplitCount", request.getSplitCount());
return b;
}
public static SplitQueryResponse bsonToSplitQueryResponse(BSONObject reply) {
String error = null;
if (reply.containsField("Error")) {
byte[] err = (byte[]) reply.get("Error");
if (err.length > 0) {
error = new String(err);
}
}
RPCError err = null;
if (reply.containsField("Err")) {
BSONObject errBson = (BSONObject) reply.get("Err");
if (errBson != null) {
long code = (long) errBson.get("Code");
String messageString = null;
byte[] message = (byte[]) errBson.get("Message");
if (message.length > 0) {
messageString = new String(message);
}
err = new RPCError(code, messageString);
}
}
BasicBSONList result = (BasicBSONList) reply.get("Splits");
Map<Query, Long> queries = new HashMap<>();
for (Object split : result) {
BSONObject splitObj = (BasicBSONObject) split;
BSONObject query = (BasicBSONObject) (splitObj.get("Query"));
String sql = new String((byte[]) query.get("Sql"));
BSONObject bindVars = (BasicBSONObject) query.get("BindVariables");
List<BindVariable> bindVariables = new LinkedList<>();
for (String key : bindVars.keySet()) {
BindVariable bv = null;
BSONObject val = (BasicBSONObject) bindVars.get(key);
if (val == null) {
bv = BindVariable.forNull(key);
}
int type = (int)val.get("Type");
if (type == 3 /*unsigned long*/) {
bv = BindVariable.forULong(key, (UnsignedLong) val.get("ValueUint"));
}
if (type == 2 /*long*/) {
bv = BindVariable.forLong(key, (Long) val.get("ValueInt"));
}
if (type == 4 /*double*/) {
bv = BindVariable.forDouble(key, (Double) val.get("ValueFloat"));
}
if (type == 1 /*byte[]*/) {
bv = BindVariable.forBytes(key, (byte[]) val.get("ValueBytes"));
}
if (bv == null) {
throw new RuntimeException("invalid bind variable type: " + val.getClass());
}
bindVariables.add(bv);
}
BSONObject keyrangePart = (BasicBSONObject)splitObj.get("KeyRangePart");
String keyspace = new String((byte[]) keyrangePart.get("Keyspace"));
List<KeyRange> keyranges = new ArrayList<>();
for (Object o : (List<?>) keyrangePart.get("KeyRanges")) {
BSONObject keyrange = (BasicBSONObject) o;
String start = Hex.encodeHexString((byte[]) keyrange.get("Start"));
String end = Hex.encodeHexString((byte[]) keyrange.get("End"));
KeyRange kr = new KeyRange(KeyspaceId.valueOf(start), KeyspaceId.valueOf(end));
keyranges.add(kr);
}
Query q = new QueryBuilder(sql, keyspace, "rdonly").setKeyRanges(keyranges)
.setBindVars(bindVariables).setStreaming(true).build();
long size = (long) splitObj.get("Size");
queries.put(q, size);
}
return new SplitQueryResponse(queries, error, err);
}
}

Просмотреть файл

@ -1,146 +0,0 @@
package com.youtube.vitess.vtgate.rpcclient.gorpc;
import com.youtube.vitess.gorpc.Client;
import com.youtube.vitess.gorpc.Exceptions.ApplicationException;
import com.youtube.vitess.gorpc.Exceptions.GoRpcException;
import com.youtube.vitess.gorpc.Response;
import com.youtube.vitess.vtgate.Exceptions.ConnectionException;
import com.youtube.vitess.vtgate.Field;
import com.youtube.vitess.vtgate.Query;
import com.youtube.vitess.vtgate.QueryResponse;
import com.youtube.vitess.vtgate.QueryResult;
import com.youtube.vitess.vtgate.RPCError;
import com.youtube.vitess.vtgate.SplitQueryRequest;
import com.youtube.vitess.vtgate.SplitQueryResponse;
import com.youtube.vitess.vtgate.rpcclient.RpcClient;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;
import java.util.List;
public class GoRpcClient implements RpcClient {
public static final Logger LOGGER = LogManager.getLogger(GoRpcClient.class.getName());
public static final String BSON_RPC_PATH = "/_bson_rpc_";
private Client client;
public GoRpcClient(Client client) {
this.client = client;
}
@Override
public Object begin() throws ConnectionException {
Response response = call("VTGate.Begin", new BasicBSONObject());
return response.getReply();
}
@Override
public QueryResponse execute(Query query) throws ConnectionException {
String callMethod = null;
Response response;
if (query.isStreaming()) {
if (query.getKeyspaceIds() != null) {
callMethod = "VTGate.StreamExecuteKeyspaceIds";
} else {
callMethod = "VTGate.StreamExecuteKeyRanges";
}
response = streamCall(callMethod, Bsonify.queryToBson(query));
} else {
if (query.getKeyspaceIds() != null) {
callMethod = "VTGate.ExecuteKeyspaceIds";
} else {
callMethod = "VTGate.ExecuteKeyRanges";
}
response = call(callMethod, Bsonify.queryToBson(query));
}
return Bsonify.bsonToQueryResponse((BSONObject) response.getReply());
}
@Override
public List<QueryResponse> executeBatchKeyspaceIds(
List<Query> queries, String tabletType, Object session, boolean asTransaction)
throws ConnectionException {
throw new UnsupportedOperationException();
}
@Override
public QueryResult streamNext(List<Field> fields) throws ConnectionException {
Response response;
try {
response = client.streamNext();
} catch (GoRpcException | ApplicationException e) {
LOGGER.error("vtgate exception", e);
throw new ConnectionException("vtgate exception: " + e.getMessage());
}
if (response == null) {
return null;
}
BSONObject reply = (BSONObject) response.getReply();
if (reply.containsField("Result")) {
BSONObject result = (BSONObject) reply.get("Result");
return Bsonify.bsonToQueryResult(result, fields);
}
return null;
}
@Override
public void commit(Object session) throws ConnectionException {
call("VTGate.Commit", session);
}
@Override
public void rollback(Object session) throws ConnectionException {
call("VTGate.Rollback", session);
}
@Override
public void close() throws ConnectionException {
try {
client.close();
} catch (GoRpcException e) {
LOGGER.error("vtgate exception", e);
throw new ConnectionException("vtgate exception: " + e.getMessage());
}
}
@Override
public SplitQueryResponse splitQuery(SplitQueryRequest request) throws ConnectionException {
String callMethod = "VTGate.SplitQuery";
Response response = call(callMethod, Bsonify.splitQueryRequestToBson(request));
SplitQueryResponse splitQueryResponse = Bsonify.bsonToSplitQueryResponse(
(BSONObject) response.getReply());
try {
RPCError err = splitQueryResponse.getErr();
if (err != null) {
throw new ConnectionException(err.getMessage());
}
} catch (ConnectionException e) {
LOGGER.error("vtgate exception", e);
throw new ConnectionException("vtgate exception: " + e.getMessage());
}
return splitQueryResponse;
}
private Response call(String methodName, Object args) throws ConnectionException {
try {
Response response = client.call(methodName, args);
return response;
} catch (GoRpcException | ApplicationException e) {
LOGGER.error("vtgate exception", e);
throw new ConnectionException("vtgate exception: " + e.getMessage());
}
}
private Response streamCall(String methodName, Object args) throws ConnectionException {
try {
client.streamCall(methodName, args);
return client.streamNext();
} catch (GoRpcException | ApplicationException e) {
LOGGER.error("vtgate exception", e);
throw new ConnectionException("vtgate exception: " + e.getMessage());
}
}
}

Просмотреть файл

@ -1,5 +0,0 @@
log4j.rootLogger=ERROR, consoleAppender
log4j.appender.consoleAppender=org.apache.log4j.ConsoleAppender
log4j.appender.consoleAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.consoleAppender.layout.ConversionPattern=[%t] %-5p %c %x - %m%n

Просмотреть файл

@ -1,46 +0,0 @@
package com.youtube.vitess.vtgate;
import com.youtube.vitess.vtgate.Field.Flag;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.text.ParseException;
@RunWith(JUnit4.class)
public class DateTypesTest {
@Test
public void testDateTimes() throws Exception {
DateTime dt = DateTime.now();
byte[] bytes = BindVariable.forDateTime("", dt).getByteArrayVal();
check(FieldType.VT_TIMESTAMP, dt, bytes);
check(FieldType.VT_DATETIME, dt, bytes);
}
@Test
public void testDate() throws Exception {
DateTime now = DateTime.now();
byte[] bytes = BindVariable.forDate("", now).getByteArrayVal();
DateTime date =
now.withMillisOfSecond(0).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0);
check(FieldType.VT_DATE, date, bytes);
check(FieldType.VT_NEWDATE, date, bytes);
}
@Test
public void testTime() throws Exception {
DateTime now = DateTime.now();
byte[] bytes = BindVariable.forTime("", now).getByteArrayVal();
DateTime time = now.withMillisOfSecond(0).withYear(1970).withMonthOfYear(1).withDayOfMonth(1);
check(FieldType.VT_TIME, time, bytes);
}
private void check(FieldType typeUnderTest, DateTime dt, byte[] bytes) throws ParseException {
Field f = Field.newFieldForTest(typeUnderTest, Flag.VT_ZEROVALUE_FLAG);
Object o = f.convert(bytes);
Assert.assertEquals(DateTime.class, o.getClass());
Assert.assertEquals(dt, (DateTime) o);
}
}

Просмотреть файл

@ -1,97 +0,0 @@
package com.youtube.vitess.vtgate;
import com.google.common.collect.Lists;
import com.google.common.primitives.UnsignedLong;
import com.youtube.vitess.vtgate.Field.Flag;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.math.BigDecimal;
import java.util.List;
@RunWith(JUnit4.class)
public class FieldTest {
@Test
public void testDouble() {
List<FieldType> typesToTest = Lists.newArrayList(FieldType.VT_DOUBLE);
String val = "1000.01";
for (FieldType type : typesToTest) {
Field f = Field.newFieldForTest(type, Flag.VT_ZEROVALUE_FLAG);
Object o = f.convert(val.getBytes());
Assert.assertEquals(Double.class, o.getClass());
Assert.assertEquals(1000.01, ((Double) o).doubleValue(), 0.01);
}
}
@Test
public void testBigDecimal() {
List<FieldType> typesToTest = Lists.newArrayList(FieldType.VT_DECIMAL, FieldType.VT_NEWDECIMAL);
String val = "1000.01";
for (FieldType type : typesToTest) {
Field f = Field.newFieldForTest(type, Flag.VT_ZEROVALUE_FLAG);
Object o = f.convert(val.getBytes());
Assert.assertEquals(BigDecimal.class, o.getClass());
Assert.assertEquals(1000.01, ((BigDecimal) o).doubleValue(), 0.01);
}
}
@Test
public void testInteger() {
List<FieldType> typesToTest =
Lists.newArrayList(FieldType.VT_TINY, FieldType.VT_SHORT, FieldType.VT_INT24);
String val = "1000";
for (FieldType type : typesToTest) {
Field f = Field.newFieldForTest(type, Flag.VT_ZEROVALUE_FLAG);
Object o = f.convert(val.getBytes());
Assert.assertEquals(Integer.class, o.getClass());
Assert.assertEquals(1000, ((Integer) o).intValue());
}
}
@Test
public void testLong_LONG() {
String val = "1000";
Field f = Field.newFieldForTest(FieldType.VT_LONG, Flag.VT_ZEROVALUE_FLAG);
Object o = f.convert(val.getBytes());
Assert.assertEquals(Long.class, o.getClass());
Assert.assertEquals(1000L, ((Long) o).longValue());
}
@Test
public void testLong_LONGLONG() {
String val = "10000000000000";
Field f = Field.newFieldForTest(FieldType.VT_LONGLONG, Flag.VT_ZEROVALUE_FLAG);
Object o = f.convert(val.getBytes());
Assert.assertEquals(Long.class, o.getClass());
Assert.assertEquals(10000000000000L, ((Long) o).longValue());
}
@Test
public void testLong_LONGLONG_UNSIGNED() {
String val = "10000000000000";
Field f = Field.newFieldForTest(FieldType.VT_LONGLONG, Flag.VT_UNSIGNED_FLAG);
Object o = f.convert(val.getBytes());
Assert.assertEquals(UnsignedLong.class, o.getClass());
Assert.assertEquals(10000000000000L, ((UnsignedLong) o).longValue());
}
@Test
public void testNull() {
Field f = Field.newFieldForTest(FieldType.VT_NULL, Flag.VT_ZEROVALUE_FLAG);
Object o = f.convert(null);
Assert.assertEquals(null, o);
}
@Test
public void testFloat() {
String val = "1000.01";
Field f = Field.newFieldForTest(FieldType.VT_FLOAT, Flag.VT_ZEROVALUE_FLAG);
Object o = f.convert(val.getBytes());
Assert.assertEquals(Float.class, o.getClass());
Assert.assertEquals(1000.01, ((Float) o).floatValue(), 0.1);
}
}

Просмотреть файл

@ -1,67 +0,0 @@
package com.youtube.vitess.vtgate;
import com.youtube.vitess.vtgate.Query.QueryBuilder;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.Arrays;
@RunWith(JUnit4.class)
public class QueryBuilderTest {
@Test
public void testValidQueryWithKeyspaceIds() {
String sql = "select 1 from dual";
KeyspaceId kid = KeyspaceId.valueOf("80");
QueryBuilder builder =
new QueryBuilder("select 1 from dual", "test_keyspace", "master").addKeyspaceId(kid);
Query query = builder.build();
Assert.assertEquals(sql, query.getSql());
Assert.assertEquals("test_keyspace", query.getKeyspace());
Assert.assertEquals("master", query.getTabletType());
Assert.assertEquals(null, query.getBindVars());
Assert.assertEquals(1, query.getKeyspaceIds().size());
Assert.assertTrue(Arrays.equals(kid.getBytes(), query.getKeyspaceIds().get(0)));
Assert.assertEquals(null, query.getKeyRanges());
}
@Test
public void testValidQueryWithKeyRanges() {
String sql = "select 1 from dual";
QueryBuilder builder =
new QueryBuilder("select 1 from dual", "test_keyspace", "master").addKeyRange(KeyRange.ALL);
Query query = builder.build();
Assert.assertEquals(sql, query.getSql());
Assert.assertEquals("test_keyspace", query.getKeyspace());
Assert.assertEquals("master", query.getTabletType());
Assert.assertEquals(null, query.getBindVars());
Assert.assertEquals(1, query.getKeyRanges().size());
Assert.assertEquals(KeyRange.ALL.toMap(), query.getKeyRanges().get(0));
Assert.assertEquals(null, query.getKeyspaceIds());
}
@Test
public void testNoKeyspaceIdOrKeyrange() {
QueryBuilder builder = new QueryBuilder("select 1 from dual", "test_keyspace", "master");
try {
builder.build();
Assert.fail("did not raise IllegalStateException");
} catch (IllegalStateException e) {
Assert.assertEquals("query must have either keyspaceIds or keyRanges", e.getMessage());
}
}
@Test
public void testBothKeyspaceIdAndKeyrange() {
QueryBuilder builder = new QueryBuilder("select 1 from dual", "test_keyspace", "master")
.addKeyRange(KeyRange.ALL).addKeyspaceId(KeyspaceId.valueOf("80"));
try {
builder.build();
Assert.fail("did not raise IllegalStateException");
} catch (IllegalStateException e) {
Assert.assertEquals("query cannot have both keyspaceIds and keyRanges", e.getMessage());
}
}
}

Просмотреть файл

@ -1,234 +0,0 @@
package com.youtube.vitess.vtgate.integration;
import com.google.common.collect.Lists;
import com.google.common.primitives.UnsignedLong;
import com.youtube.vitess.vtgate.BindVariable;
import com.youtube.vitess.vtgate.KeyRange;
import com.youtube.vitess.vtgate.Query;
import com.youtube.vitess.vtgate.Query.QueryBuilder;
import com.youtube.vitess.vtgate.Row;
import com.youtube.vitess.vtgate.VtGate;
import com.youtube.vitess.vtgate.cursor.Cursor;
import com.youtube.vitess.vtgate.TestEnv;
import com.youtube.vitess.vtgate.integration.util.Util;
import org.joda.time.DateTime;
import org.joda.time.format.ISODateTimeFormat;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RunWith(JUnit4.class)
public class DataTypesIT {
public static TestEnv testEnv = getTestEnv();
@BeforeClass
public static void setUpVtGate() throws Exception {
Util.setupTestEnv(testEnv);
}
@AfterClass
public static void tearDownVtGate() throws Exception {
Util.teardownTestEnv(testEnv);
}
@Test
public void testInts() throws Exception {
String createTable =
"create table vtocc_ints(tiny tinyint, tinyu tinyint unsigned, small smallint, smallu smallint unsigned, medium mediumint, mediumu mediumint unsigned, normal int, normalu int unsigned, big bigint, bigu bigint unsigned, year year, primary key(tiny)) comment 'vtocc_nocache'\n" + "";
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
vtgate.execute(getQuery(createTable));
vtgate.commit();
String insertSql =
"insert into vtocc_ints values(:tiny, :tinyu, :small, :smallu, :medium, :mediumu, :normal, :normalu, :big, :bigu, :year)";
Query insertQuery = new QueryBuilder(insertSql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forInt("tiny", -128))
.addBindVar(BindVariable.forInt("tinyu", 255))
.addBindVar(BindVariable.forInt("small", -32768))
.addBindVar(BindVariable.forInt("smallu", 65535))
.addBindVar(BindVariable.forInt("medium", -8388608))
.addBindVar(BindVariable.forInt("mediumu", 16777215))
.addBindVar(BindVariable.forLong("normal", -2147483648L))
.addBindVar(BindVariable.forLong("normalu", 4294967295L))
.addBindVar(BindVariable.forLong("big", -9223372036854775808L))
.addBindVar(BindVariable.forULong("bigu", UnsignedLong.valueOf("18446744073709551615")))
.addBindVar(BindVariable.forShort("year", (short) 2012))
.addKeyRange(KeyRange.ALL)
.build();
vtgate.begin();
vtgate.execute(insertQuery);
vtgate.commit();
String selectSql = "select * from vtocc_ints where tiny = -128";
Query selectQuery =
new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(KeyRange.ALL).build();
Cursor cursor = vtgate.execute(selectQuery);
Assert.assertEquals(1, cursor.getRowsAffected());
Row row = cursor.next();
Assert.assertTrue(row.getInt("tiny").equals(-128));
Assert.assertTrue(row.getInt("tinyu").equals(255));
Assert.assertTrue(row.getInt("small").equals(-32768));
Assert.assertTrue(row.getInt("smallu").equals(65535));
Assert.assertTrue(row.getInt("medium").equals(-8388608));
Assert.assertTrue(row.getInt("mediumu").equals(16777215));
Assert.assertTrue(row.getLong("normal").equals(-2147483648L));
Assert.assertTrue(row.getLong("normalu").equals(4294967295L));
Assert.assertTrue(row.getLong("big").equals(-9223372036854775808L));
Assert.assertTrue(row.getULong("bigu").equals(UnsignedLong.valueOf("18446744073709551615")));
Assert.assertTrue(row.getShort("year").equals((short) 2012));
vtgate.close();
}
@Test
public void testFracts() throws Exception {
String createTable =
"create table vtocc_fracts(id int, deci decimal(5,2), num numeric(5,2), f float, d double, primary key(id)) comment 'vtocc_nocache'\n" + "";
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
vtgate.execute(getQuery(createTable));
vtgate.commit();
String insertSql = "insert into vtocc_fracts values(:id, :deci, :num, :f, :d)";
Query insertQuery = new QueryBuilder(insertSql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forInt("id", 1))
.addBindVar(BindVariable.forDouble("deci", 1.99))
.addBindVar(BindVariable.forDouble("num", 2.99))
.addBindVar(BindVariable.forFloat("f", 3.99F))
.addBindVar(BindVariable.forDouble("d", 4.99))
.addKeyRange(KeyRange.ALL)
.build();
vtgate.begin();
vtgate.execute(insertQuery);
vtgate.commit();
String selectSql = "select * from vtocc_fracts where id = 1";
Query selectQuery =
new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(KeyRange.ALL).build();
Cursor cursor = vtgate.execute(selectQuery);
Assert.assertEquals(1, cursor.getRowsAffected());
Row row = cursor.next();
Assert.assertTrue(row.getLong("id").equals(1L));
Assert.assertTrue(row.getBigDecimal("deci").equals(new BigDecimal("1.99")));
Assert.assertTrue(row.getBigDecimal("num").equals(new BigDecimal("2.99")));
Assert.assertTrue(row.getFloat("f").equals(3.99F));
Assert.assertTrue(row.getDouble("d").equals(4.99));
vtgate.close();
}
@Test
public void testStrings() throws Exception {
String createTable =
"create table vtocc_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(4), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb)) comment 'vtocc_nocache'\n" + "";
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
vtgate.execute(getQuery(createTable));
vtgate.commit();
String insertSql =
"insert into vtocc_strings values(:vb, :c, :vc, :b, :tb, :bl, :ttx, :tx, :en, :s)";
Query insertQuery = new QueryBuilder(insertSql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forBytes("vb", "a".getBytes()))
.addBindVar(BindVariable.forBytes("c", "b".getBytes()))
.addBindVar(BindVariable.forBytes("vc", "c".getBytes()))
.addBindVar(BindVariable.forBytes("b", "d".getBytes()))
.addBindVar(BindVariable.forBytes("tb", "e".getBytes()))
.addBindVar(BindVariable.forBytes("bl", "f".getBytes()))
.addBindVar(BindVariable.forBytes("ttx", "g".getBytes()))
.addBindVar(BindVariable.forBytes("tx", "h".getBytes()))
.addBindVar(BindVariable.forString("en", "a"))
.addBindVar(BindVariable.forString("s", "a,b"))
.addKeyRange(KeyRange.ALL)
.build();
vtgate.begin();
vtgate.execute(insertQuery);
vtgate.commit();
String selectSql = "select * from vtocc_strings where vb = 'a'";
Query selectQuery =
new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(KeyRange.ALL).build();
Cursor cursor = vtgate.execute(selectQuery);
Assert.assertEquals(1, cursor.getRowsAffected());
Row row = cursor.next();
Assert.assertTrue(Arrays.equals(row.getBytes("vb"), "a".getBytes()));
Assert.assertTrue(Arrays.equals(row.getBytes("c"), "b".getBytes()));
Assert.assertTrue(Arrays.equals(row.getBytes("vc"), "c".getBytes()));
// binary(4) column will be suffixed with three 0 bytes
Assert.assertTrue(
Arrays.equals(row.getBytes("b"), ByteBuffer.allocate(4).put((byte) 'd').array()));
Assert.assertTrue(Arrays.equals(row.getBytes("tb"), "e".getBytes()));
Assert.assertTrue(Arrays.equals(row.getBytes("bl"), "f".getBytes()));
Assert.assertTrue(Arrays.equals(row.getBytes("ttx"), "g".getBytes()));
Assert.assertTrue(Arrays.equals(row.getBytes("tx"), "h".getBytes()));
// Assert.assertTrue(row.getString("en").equals("a"));
// Assert.assertTrue(row.getString("s").equals("a,b"));
vtgate.close();
}
@Test
public void testMisc() throws Exception {
String createTable =
"create table vtocc_misc(id int, b bit(8), d date, dt datetime, t time, primary key(id)) comment 'vtocc_nocache'\n" + "";
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
vtgate.execute(getQuery(createTable));
vtgate.commit();
String insertSql = "insert into vtocc_misc values(:id, :b, :d, :dt, :t)";
Query insertQuery = new QueryBuilder(insertSql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forInt("id", 1))
.addBindVar(BindVariable.forBytes("b", ByteBuffer.allocate(1).put((byte) 1).array()))
.addBindVar(BindVariable.forDate("d", DateTime.parse("2012-01-01")))
.addBindVar(BindVariable.forDateTime("dt", DateTime.parse("2012-01-01T15:45:45")))
.addBindVar(BindVariable.forTime("t",
DateTime.parse("15:45:45", ISODateTimeFormat.timeElementParser())))
.addKeyRange(KeyRange.ALL)
.build();
vtgate.begin();
vtgate.execute(insertQuery);
vtgate.commit();
String selectSql = "select * from vtocc_misc where id = 1";
Query selectQuery =
new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(KeyRange.ALL).build();
Cursor cursor = vtgate.execute(selectQuery);
Assert.assertEquals(1, cursor.getRowsAffected());
Row row = cursor.next();
Assert.assertTrue(row.getLong("id").equals(1L));
Assert.assertTrue(
Arrays.equals(row.getBytes("b"), ByteBuffer.allocate(1).put((byte) 1).array()));
Assert.assertTrue(row.getDateTime("d").equals(DateTime.parse("2012-01-01")));
Assert.assertTrue(row.getDateTime("dt").equals(DateTime.parse("2012-01-01T15:45:45")));
Assert.assertTrue(row.getDateTime("t").equals(
DateTime.parse("15:45:45", ISODateTimeFormat.timeElementParser())));
vtgate.close();
}
private Query getQuery(String sql) {
return new QueryBuilder(sql, testEnv.getKeyspace(), "master").addKeyRange(KeyRange.ALL).build();
}
/**
* Create env with two shards each having a master and replica
*/
static TestEnv getTestEnv() {
Map<String, List<String>> shardKidMap = new HashMap<>();
shardKidMap.put("-", Lists.newArrayList("527875958493693904"));
TestEnv env = Util.getTestEnv(shardKidMap, "test_keyspace");
env.addTablet("replica", 1);
return env;
}
}

Просмотреть файл

@ -1,135 +0,0 @@
package com.youtube.vitess.vtgate.integration;
import com.google.common.collect.Lists;
import com.google.common.primitives.UnsignedLong;
import com.youtube.vitess.vtgate.BindVariable;
import com.youtube.vitess.vtgate.Exceptions.ConnectionException;
import com.youtube.vitess.vtgate.Exceptions.DatabaseException;
import com.youtube.vitess.vtgate.Exceptions.IntegrityException;
import com.youtube.vitess.vtgate.KeyRange;
import com.youtube.vitess.vtgate.KeyspaceId;
import com.youtube.vitess.vtgate.Query;
import com.youtube.vitess.vtgate.Query.QueryBuilder;
import com.youtube.vitess.vtgate.VtGate;
import com.youtube.vitess.vtgate.TestEnv;
import com.youtube.vitess.vtgate.integration.util.Util;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Test failures and exceptions
*/
@RunWith(JUnit4.class)
public class FailuresIT {
public static TestEnv testEnv = getTestEnv();
@BeforeClass
public static void setUpVtGate() throws Exception {
Util.setupTestEnv(testEnv);
}
@AfterClass
public static void tearDownVtGate() throws Exception {
Util.teardownTestEnv(testEnv);
}
@Before
public void createTable() throws Exception {
Util.createTable(testEnv);
}
@Test
public void testIntegrityException() throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
String insertSql = "insert into vtgate_test(id, keyspace_id) values (:id, :keyspace_id)";
KeyspaceId kid = testEnv.getAllKeyspaceIds().get(0);
Query insertQuery = new QueryBuilder(insertSql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forULong("id", UnsignedLong.valueOf("1")))
.addBindVar(BindVariable.forULong("keyspace_id", ((UnsignedLong) kid.getId())))
.addKeyspaceId(kid).build();
vtgate.begin();
vtgate.execute(insertQuery);
vtgate.commit();
vtgate.begin();
try {
vtgate.execute(insertQuery);
Assert.fail("failed to throw exception");
} catch (IntegrityException e) {
} finally {
vtgate.rollback();
vtgate.close();
}
}
@Test
public void testTimeout() throws ConnectionException, DatabaseException {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 200, testEnv.getRpcClientFactory());
// Check timeout error raised for slow query
Query sleepQuery = new QueryBuilder("select sleep(0.5) from dual", testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).build();
try {
vtgate.execute(sleepQuery);
Assert.fail("did not raise timeout exception");
} catch (ConnectionException e) {
}
vtgate.close();
vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 2000, testEnv.getRpcClientFactory());
// Check no timeout error for fast query
sleepQuery = new QueryBuilder("select sleep(0.01) from dual", testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).build();
vtgate.execute(sleepQuery);
vtgate.close();
}
@Test
public void testTxPoolFull() throws Exception {
List<VtGate> vtgates = new ArrayList<>();
boolean failed = false;
try {
// Transaction cap is 20
for (int i = 0; i < 25; i++) {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgates.add(vtgate);
vtgate.begin();
// Run a query to actually begin a transaction with the tablets
Query query = new QueryBuilder("delete from vtgate_test", testEnv.getKeyspace(), "master")
.addKeyRange(KeyRange.ALL).build();
vtgate.execute(query);
}
} catch (DatabaseException e) {
if (e.getMessage().contains("tx_pool_full")) {
failed = true;
}
} finally {
for (VtGate vtgate : vtgates) {
vtgate.close();
}
}
if (!failed) {
Assert.fail("failed to raise tx_pool_full exception");
}
}
/**
* Create env with two shards each having a master and replica
*/
static TestEnv getTestEnv() {
Map<String, List<String>> shardKidMap = new HashMap<>();
shardKidMap.put("-", Lists.newArrayList("527875958493693904"));
TestEnv env = Util.getTestEnv(shardKidMap, "test_keyspace");
env.addTablet("replica", 1);
return env;
}
}

Просмотреть файл

@ -1,57 +0,0 @@
package com.youtube.vitess.vtgate.integration;
import com.youtube.vitess.vtgate.Query;
import com.youtube.vitess.vtgate.Query.QueryBuilder;
import com.youtube.vitess.vtgate.VtGate;
import com.youtube.vitess.vtgate.cursor.Cursor;
import com.youtube.vitess.vtgate.TestEnv;
import com.youtube.vitess.vtgate.integration.util.Util;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class StreamingServerShutdownIT {
static TestEnv testEnv = VtGateIT.getTestEnv();
@Before
public void setUpVtGate() throws Exception {
Util.setupTestEnv(testEnv);
Util.createTable(testEnv);
}
@After
public void tearDownVtGate() throws Exception {
Util.teardownTestEnv(testEnv);
}
@Test
public void testShutdownServerWhileStreaming() throws Exception {
Util.insertRows(testEnv, 1, 2000);
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
String selectSql = "select A.* from vtgate_test A join vtgate_test B";
Query joinQuery = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).setStreaming(true).build();
Cursor cursor = vtgate.execute(joinQuery);
int count = 0;
try {
while (cursor.hasNext()) {
count++;
if (count == 1) {
Util.teardownTestEnv(testEnv);
}
cursor.next();
}
vtgate.close();
Assert.fail("failed to raise exception");
} catch (RuntimeException e) {
Assert.assertTrue(e.getMessage().length() > 0);
}
}
}

Просмотреть файл

@ -1,146 +0,0 @@
package com.youtube.vitess.vtgate.integration;
import com.google.common.collect.Iterables;
import com.google.common.primitives.UnsignedLong;
import com.youtube.vitess.vtgate.BindVariable;
import com.youtube.vitess.vtgate.KeyRange;
import com.youtube.vitess.vtgate.KeyspaceId;
import com.youtube.vitess.vtgate.Query;
import com.youtube.vitess.vtgate.Query.QueryBuilder;
import com.youtube.vitess.vtgate.TestEnv;
import com.youtube.vitess.vtgate.VtGate;
import com.youtube.vitess.vtgate.cursor.Cursor;
import com.youtube.vitess.vtgate.cursor.StreamCursor;
import com.youtube.vitess.vtgate.integration.util.Util;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.Collections;
import java.util.List;
/**
* Test cases for streaming queries in VtGate
*
*/
@RunWith(JUnit4.class)
public class StreamingVtGateIT {
public static TestEnv testEnv = VtGateIT.getTestEnv();
@BeforeClass
public static void setUpVtGate() throws Exception {
Util.setupTestEnv(testEnv);
}
@AfterClass
public static void tearDownVtGate() throws Exception {
Util.teardownTestEnv(testEnv);
}
@Before
public void createTable() throws Exception {
Util.createTable(testEnv);
}
@Test
public void testStreamCursorType() throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
String selectSql = "select * from vtgate_test";
Query allRowsQuery = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).setStreaming(true).build();
Cursor cursor = vtgate.execute(allRowsQuery);
Assert.assertEquals(StreamCursor.class, cursor.getClass());
vtgate.close();
}
/**
* Test StreamExecuteKeyspaceIds query on a single shard
*/
@Test
public void testStreamExecuteKeyspaceIds() throws Exception {
int rowCount = 10;
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, rowCount);
}
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
for (String shardName : testEnv.getShardKidMap().keySet()) {
String selectSql = "select A.* from vtgate_test A join vtgate_test B join vtgate_test C";
Query query = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getKeyspaceIds(shardName)).setStreaming(true).build();
Cursor cursor = vtgate.execute(query);
int count = Iterables.size(cursor);
Assert.assertEquals((int) Math.pow(rowCount, 3), count);
}
vtgate.close();
}
/**
* Same as testStreamExecuteKeyspaceIds but for StreamExecuteKeyRanges
*/
@Test
public void testStreamExecuteKeyRanges() throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
int rowCount = 10;
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, rowCount);
}
for (String shardName : testEnv.getShardKidMap().keySet()) {
List<KeyspaceId> kids = testEnv.getKeyspaceIds(shardName);
KeyRange kr = new KeyRange(Collections.min(kids), Collections.max(kids));
String selectSql = "select A.* from vtgate_test A join vtgate_test B join vtgate_test C";
Query query = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(kr)
.setStreaming(true).build();
Cursor cursor = vtgate.execute(query);
int count = Iterables.size(cursor);
Assert.assertEquals((int) Math.pow(rowCount, 3), count);
}
vtgate.close();
}
/**
* Test scatter streaming queries fetch rows from all shards
*/
@Test
public void testScatterStreamingQuery() throws Exception {
int rowCount = 10;
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, rowCount);
}
String selectSql = "select A.* from vtgate_test A join vtgate_test B join vtgate_test C";
Query query = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).setStreaming(true).build();
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
Cursor cursor = vtgate.execute(query);
int count = Iterables.size(cursor);
Assert.assertEquals(2 * (int) Math.pow(rowCount, 3), count);
vtgate.close();
}
@Test
@Ignore("currently failing as vtgate doesn't set the error")
public void testStreamingWrites() throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
String insertSql = "insert into vtgate_test " + "(id, name, age, percent, keyspace_id) "
+ "values (:id, :name, :age, :percent, :keyspace_id)";
KeyspaceId kid = testEnv.getAllKeyspaceIds().get(0);
Query query = new QueryBuilder(insertSql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forULong("id", UnsignedLong.valueOf("" + 1)))
.addBindVar(BindVariable.forString("name", ("name_" + 1)))
.addBindVar(BindVariable.forULong("keyspace_id", (UnsignedLong) kid.getId()))
.addKeyspaceId(kid)
.setStreaming(true)
.build();
vtgate.execute(query);
vtgate.commit();
vtgate.close();
}
}

Просмотреть файл

@ -1,370 +0,0 @@
package com.youtube.vitess.vtgate.integration;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.UnsignedLong;
import com.youtube.vitess.vtgate.BindVariable;
import com.youtube.vitess.vtgate.Exceptions.ConnectionException;
import com.youtube.vitess.vtgate.KeyRange;
import com.youtube.vitess.vtgate.KeyspaceId;
import com.youtube.vitess.vtgate.Query;
import com.youtube.vitess.vtgate.Query.QueryBuilder;
import com.youtube.vitess.vtgate.Row;
import com.youtube.vitess.vtgate.Row.Cell;
import com.youtube.vitess.vtgate.TestEnv;
import com.youtube.vitess.vtgate.VtGate;
import com.youtube.vitess.vtgate.cursor.Cursor;
import com.youtube.vitess.vtgate.cursor.CursorImpl;
import com.youtube.vitess.vtgate.integration.util.Util;
import org.apache.commons.codec.binary.Hex;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@RunWith(JUnit4.class)
public class VtGateIT {
public static TestEnv testEnv = getTestEnv();
@BeforeClass
public static void setUpVtGate() throws Exception {
Util.setupTestEnv(testEnv);
}
@AfterClass
public static void tearDownVtGate() throws Exception {
Util.teardownTestEnv(testEnv);
}
@Before
public void createTable() throws Exception {
Util.createTable(testEnv);
}
/**
* Test selects using ExecuteKeyspaceIds
*/
@Test
public void testExecuteKeyspaceIds() throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
// Ensure empty table
String selectSql = "select * from vtgate_test";
Query allRowsQuery = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").setKeyspaceIds(
testEnv.getAllKeyspaceIds()).build();
Cursor cursor = vtgate.execute(allRowsQuery);
Assert.assertEquals(CursorImpl.class, cursor.getClass());
Assert.assertEquals(0, cursor.getRowsAffected());
Assert.assertEquals(0, cursor.getLastRowId());
Assert.assertFalse(cursor.hasNext());
vtgate.close();
// Insert 10 rows
Util.insertRows(testEnv, 1000, 10);
vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
cursor = vtgate.execute(allRowsQuery);
Assert.assertEquals(10, cursor.getRowsAffected());
Assert.assertEquals(0, cursor.getLastRowId());
Assert.assertTrue(cursor.hasNext());
// Fetch all rows from the first shard
KeyspaceId firstKid = testEnv.getAllKeyspaceIds().get(0);
Query query =
new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyspaceId(firstKid).build();
cursor = vtgate.execute(query);
// Check field types and values
Row row = cursor.next();
Cell idCell = row.next();
Assert.assertEquals("id", idCell.getName());
Assert.assertEquals(Long.class, idCell.getType());
Long id = row.getLong(idCell.getName());
Cell nameCell = row.next();
Assert.assertEquals("name", nameCell.getName());
Assert.assertEquals(byte[].class, nameCell.getType());
Assert.assertTrue(
Arrays.equals(("name_" + id.toString()).getBytes(), row.getBytes(nameCell.getName())));
Cell ageCell = row.next();
Assert.assertEquals("age", ageCell.getName());
Assert.assertEquals(Integer.class, ageCell.getType());
Assert.assertEquals(Integer.valueOf(2 * id.intValue()), row.getInt(ageCell.getName()));
vtgate.close();
}
/**
* Test queries are routed to the right shard based on based on keyspace ids
*/
@Test
public void testQueryRouting() throws Exception {
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, 10);
}
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
String allRowsSql = "select * from vtgate_test";
for (String shardName : testEnv.getShardKidMap().keySet()) {
Query shardRows = new QueryBuilder(allRowsSql, testEnv.getKeyspace(), "master").setKeyspaceIds(
testEnv.getKeyspaceIds(shardName)).build();
Cursor cursor = vtgate.execute(shardRows);
Assert.assertEquals(10, cursor.getRowsAffected());
}
vtgate.close();
}
@Test
public void testDateFieldTypes() throws Exception {
DateTime dt = DateTime.now().minusDays(2).withMillisOfSecond(0);
Util.insertRows(testEnv, 10, 1, dt);
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
Query allRowsQuery = new QueryBuilder("select * from vtgate_test", testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).build();
Row row = vtgate.execute(allRowsQuery).next();
Assert.assertTrue(dt.equals(row.getDateTime("timestamp_col")));
Assert.assertTrue(dt.equals(row.getDateTime("datetime_col")));
Assert.assertTrue(dt.withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0)
.equals(row.getDateTime("date_col")));
Assert.assertTrue(
dt.withYear(1970).withMonthOfYear(1).withDayOfMonth(1).equals(row.getDateTime("time_col")));
vtgate.close();
}
/**
* Test ALL keyrange fetches rows from all shards
*/
@Test
public void testAllKeyRange() throws Exception {
// Insert 10 rows across the shards
Util.insertRows(testEnv, 1000, 10);
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
String selectSql = "select * from vtgate_test";
Query allRowsQuery =
new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(KeyRange.ALL).build();
Cursor cursor = vtgate.execute(allRowsQuery);
// Verify all rows returned
Assert.assertEquals(10, cursor.getRowsAffected());
vtgate.close();
}
/**
* Test reads using Keyrange query
*/
@Test
public void testKeyRangeReads() throws Exception {
int rowsPerShard = 10;
// insert rows in each shard using ExecuteKeyspaceIds
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, rowsPerShard);
}
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
String selectSql = "select * from vtgate_test";
// Check ALL KeyRange query returns rows from both shards
Query allRangeQuery =
new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(KeyRange.ALL).build();
Cursor cursor = vtgate.execute(allRangeQuery);
Assert.assertEquals(rowsPerShard * 2, cursor.getRowsAffected());
// Check KeyRange query limited to a single shard returns 10 rows each
for (String shardName : testEnv.getShardKidMap().keySet()) {
List<KeyspaceId> shardKids = testEnv.getKeyspaceIds(shardName);
KeyspaceId minKid = Collections.min(shardKids);
KeyspaceId maxKid = Collections.max(shardKids);
KeyRange shardKeyRange = new KeyRange(minKid, maxKid);
Query shardRangeQuery = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(
shardKeyRange).build();
cursor = vtgate.execute(shardRangeQuery);
Assert.assertEquals(rowsPerShard, cursor.getRowsAffected());
}
// Now make a cross-shard KeyRange and check all rows are returned
Iterator<String> shardNameIter = testEnv.getShardKidMap().keySet().iterator();
KeyspaceId kidShard1 = testEnv.getKeyspaceIds(shardNameIter.next()).get(2);
KeyspaceId kidShard2 = testEnv.getKeyspaceIds(shardNameIter.next()).get(2);
KeyRange crossShardKeyrange;
if (kidShard1.compareTo(kidShard2) < 0) {
crossShardKeyrange = new KeyRange(kidShard1, kidShard2);
} else {
crossShardKeyrange = new KeyRange(kidShard2, kidShard1);
}
Query shardRangeQuery = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").addKeyRange(
crossShardKeyrange).build();
cursor = vtgate.execute(shardRangeQuery);
Assert.assertEquals(rowsPerShard * 2, cursor.getRowsAffected());
vtgate.close();
}
/**
* Test inserts using KeyRange query
*/
@Test
public void testKeyRangeWrites() throws Exception {
Random random = new Random();
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
String sql = "insert into vtgate_test " + "(id, name, keyspace_id, age) "
+ "values (:id, :name, :keyspace_id, :age)";
int count = 20;
// Insert 20 rows per shard
for (String shardName : testEnv.getShardKidMap().keySet()) {
List<KeyspaceId> kids = testEnv.getKeyspaceIds(shardName);
KeyspaceId minKid = Collections.min(kids);
KeyspaceId maxKid = Collections.max(kids);
KeyRange kr = new KeyRange(minKid, maxKid);
for (int i = 0; i < count; i++) {
KeyspaceId kid = kids.get(i % kids.size());
Query query = new QueryBuilder(sql, testEnv.getKeyspace(), "master")
.addBindVar(
BindVariable.forULong("id", UnsignedLong.valueOf("" + Math.abs(random.nextInt()))))
.addBindVar(BindVariable.forString("name", ("name_" + i)))
.addBindVar(BindVariable.forULong("keyspace_id", (UnsignedLong) kid.getId()))
.addBindVar(BindVariable.forNull("age"))
.addKeyRange(kr)
.build();
vtgate.execute(query);
}
}
vtgate.commit();
vtgate.close();
// Check 40 rows exist in total
vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
String selectSql = "select * from vtgate_test";
Query allRowsQuery = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").setKeyspaceIds(
testEnv.getAllKeyspaceIds()).build();
Cursor cursor = vtgate.execute(allRowsQuery);
Assert.assertEquals(count * 2, cursor.getRowsAffected());
// Check 20 rows exist per shard
for (String shardName : testEnv.getShardKidMap().keySet()) {
Query shardRows = new QueryBuilder(selectSql, testEnv.getKeyspace(), "master").setKeyspaceIds(
testEnv.getKeyspaceIds(shardName)).build();
cursor = vtgate.execute(shardRows);
Assert.assertEquals(count, cursor.getRowsAffected());
}
vtgate.close();
}
@Test
public void testSplitQuery() throws Exception {
// Insert 20 rows per shard
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, 20);
}
Util.waitForTablet("rdonly", 40, 3, testEnv);
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
Map<Query, Long> queries =
vtgate.splitQuery("test_keyspace", "select id,keyspace_id from vtgate_test", 1, "");
vtgate.close();
// Verify 2 splits, one per shard
Assert.assertEquals(2, queries.size());
Set<String> shardsInSplits = new HashSet<>();
for (Query q : queries.keySet()) {
Assert.assertEquals("select id,keyspace_id from vtgate_test", q.getSql());
Assert.assertEquals("test_keyspace", q.getKeyspace());
Assert.assertEquals("rdonly", q.getTabletType());
Assert.assertEquals(0, q.getBindVars().size());
Assert.assertEquals(null, q.getKeyspaceIds());
String start = Hex.encodeHexString(q.getKeyRanges().get(0).get("Start"));
String end = Hex.encodeHexString(q.getKeyRanges().get(0).get("End"));
shardsInSplits.add(start + "-" + end);
}
// Verify the keyrange queries in splits cover the entire keyspace
Assert.assertTrue(shardsInSplits.containsAll(testEnv.getShardKidMap().keySet()));
}
@Test
public void testSplitQueryMultipleSplitsPerShard() throws Exception {
int rowCount = 30;
Util.insertRows(testEnv, 1, 30);
Map<String, List<BindVariable>> expectedSqls = Maps.newHashMap();
expectedSqls.put("select id, keyspace_id from vtgate_test where id < :_splitquery_end",
Lists.newArrayList(BindVariable.forInt("_splitquery_end", 10)));
expectedSqls.put(
"select id, keyspace_id from vtgate_test where id >= :_splitquery_start "
+ "and id < :_splitquery_end",
Lists.newArrayList(BindVariable.forInt("_splitquery_start", 10),
BindVariable.forInt("_splitquery_end", 19)));
expectedSqls.put("select id, keyspace_id from vtgate_test where id >= :_splitquery_start",
Lists.newArrayList(BindVariable.forInt("_splitquery_start", 19)));
Util.waitForTablet("rdonly", rowCount, 3, testEnv);
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
int splitCount = 6;
Map<Query, Long> queries =
vtgate.splitQuery("test_keyspace", "select id,keyspace_id from vtgate_test", splitCount, "");
vtgate.close();
// Verify 6 splits, 3 per shard
Assert.assertEquals(splitCount, queries.size());
Set<String> shardsInSplits = new HashSet<>();
for (Query q : queries.keySet()) {
String sql = q.getSql();
List<BindVariable> bindVars = expectedSqls.get(sql);
Assert.assertNotNull(bindVars);
Assert.assertEquals("test_keyspace", q.getKeyspace());
Assert.assertEquals("rdonly", q.getTabletType());
Assert.assertEquals(bindVars.size(), q.getBindVars().size());
Assert.assertEquals(null, q.getKeyspaceIds());
String start = Hex.encodeHexString(q.getKeyRanges().get(0).get("Start"));
String end = Hex.encodeHexString(q.getKeyRanges().get(0).get("End"));
shardsInSplits.add(start + "-" + end);
}
// Verify the keyrange queries in splits cover the entire keyspace
Assert.assertTrue(shardsInSplits.containsAll(testEnv.getShardKidMap().keySet()));
}
@Test
public void testSplitQueryInvalidTable() throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
try {
vtgate.splitQuery("test_keyspace", "select id from invalid_table", 1, "");
Assert.fail("failed to raise connection exception");
} catch (ConnectionException e) {
Assert.assertTrue(
e.getMessage().contains("query validation error: can't find table in schema"));
} finally {
vtgate.close();
}
}
/**
* Create env with two shards each having a master and replica
*/
static TestEnv getTestEnv() {
Map<String, List<String>> shardKidMap = new LinkedHashMap<>();
shardKidMap.put("80-",
Lists.newArrayList("9767889778372766922", "9742070682920810358", "10296850775085416642"));
shardKidMap.put("-80",
Lists.newArrayList("527875958493693904", "626750931627689502", "345387386794260318"));
TestEnv env = Util.getTestEnv(shardKidMap, "test_keyspace");
env.addTablet("rdonly", 1);
return env;
}
}

Просмотреть файл

@ -1,251 +0,0 @@
package com.youtube.vitess.vtgate.integration.hadoop;
import com.google.common.collect.Lists;
import com.google.common.primitives.UnsignedLong;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.youtube.vitess.vtgate.Exceptions.InvalidFieldException;
import com.youtube.vitess.vtgate.KeyspaceId;
import com.youtube.vitess.vtgate.hadoop.VitessInputFormat;
import com.youtube.vitess.vtgate.hadoop.writables.KeyspaceIdWritable;
import com.youtube.vitess.vtgate.hadoop.writables.RowWritable;
import com.youtube.vitess.vtgate.TestEnv;
import com.youtube.vitess.vtgate.integration.util.Util;
import com.youtube.vitess.vtgate.hadoop.utils.GsonAdapters;
import junit.extensions.TestSetup;
import junit.framework.TestSuite;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.HadoopTestCase;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Integration tests for MapReductions in Vitess. These tests use an in-process Hadoop cluster via
* {@link HadoopTestCase}. These tests are JUnit3 style because of this dependency. Vitess setup for
* these tests require at least one rdonly instance per shard.
*
*/
public class MapReduceIT extends HadoopTestCase {
public static TestEnv testEnv = getTestEnv();
private static Path outputPath = new Path("/tmp");
public MapReduceIT() throws IOException {
super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
}
@Override
public void setUp() throws Exception {
super.setUp();
Util.createTable(testEnv);
}
/**
* Run a mapper only MR job and verify all the rows in the source table were outputted into HDFS.
*/
public void testDumpTableToHDFS() throws Exception {
// Insert 20 rows per shard
int rowsPerShard = 20;
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, rowsPerShard);
}
Util.waitForTablet("rdonly", 40, 3, testEnv);
// Configurations for the job, output from mapper as Text
Configuration conf = createJobConf();
Job job = new Job(conf);
job.setJobName("table");
job.setJarByClass(VitessInputFormat.class);
job.setMapperClass(TableMapper.class);
VitessInputFormat.setInput(job,
"localhost:" + testEnv.getPort(),
testEnv.getKeyspace(),
"select keyspace_id, name from vtgate_test",
4,
testEnv.getRpcClientFactory().getClass());
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(RowWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(0);
Path outDir = new Path(outputPath, "mrvitess/output");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outDir)) {
fs.delete(outDir, true);
}
FileOutputFormat.setOutputPath(job, outDir);
job.waitForCompletion(true);
assertTrue(job.isSuccessful());
String[] outputLines = MapReduceTestUtil.readOutput(outDir, conf).split("\n");
// there should be one line per row in the source table
assertEquals(rowsPerShard * 2, outputLines.length);
Set<String> actualKids = new HashSet<>();
Set<String> actualNames = new HashSet<>();
// Rows are keyspace ids are written as JSON since this is
// TextOutputFormat. Parse and verify we've gotten all the keyspace
// ids and rows.
Gson gson = new GsonBuilder()
.registerTypeHierarchyAdapter(byte[].class, GsonAdapters.BYTE_ARRAY)
.registerTypeAdapter(UnsignedLong.class, GsonAdapters.UNSIGNED_LONG)
.registerTypeAdapter(Class.class, GsonAdapters.CLASS)
.create();
for (String line : outputLines) {
String kidJson = line.split("\t")[0];
Map<String, String> m = new HashMap<>();
m = gson.fromJson(kidJson, m.getClass());
actualKids.add(m.get("id"));
String rowJson = line.split("\t")[1];
Map<String, Map<String, Map<String, String>>> map = new HashMap<>();
map = gson.fromJson(rowJson, map.getClass());
actualNames.add(
new String(Base64.decodeBase64(map.get("contents").get("name").get("value"))));
}
Set<String> expectedKids = new HashSet<>();
for (KeyspaceId kid : testEnv.getAllKeyspaceIds()) {
expectedKids.add(((UnsignedLong) kid.getId()).toString());
}
assertEquals(expectedKids.size(), actualKids.size());
assertTrue(actualKids.containsAll(expectedKids));
Set<String> expectedNames = new HashSet<>();
for (int i = 1; i <= rowsPerShard; i++) {
expectedNames.add("name_" + i);
}
assertEquals(rowsPerShard, actualNames.size());
assertTrue(actualNames.containsAll(expectedNames));
}
/**
* Map all rows and aggregate by keyspace id at the reducer.
*/
public void testReducerAggregateRows() throws Exception {
int rowsPerShard = 20;
for (String shardName : testEnv.getShardKidMap().keySet()) {
Util.insertRowsInShard(testEnv, shardName, rowsPerShard);
}
Util.waitForTablet("rdonly", 40, 3, testEnv);
Configuration conf = createJobConf();
Job job = new Job(conf);
job.setJobName("table");
job.setJarByClass(VitessInputFormat.class);
job.setMapperClass(TableMapper.class);
VitessInputFormat.setInput(job,
"localhost:" + testEnv.getPort(),
testEnv.getKeyspace(),
"select keyspace_id, name from vtgate_test",
1,
testEnv.getRpcClientFactory().getClass());
job.setMapOutputKeyClass(KeyspaceIdWritable.class);
job.setMapOutputValueClass(RowWritable.class);
job.setReducerClass(CountReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
Path outDir = new Path(outputPath, "mrvitess/output");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outDir)) {
fs.delete(outDir, true);
}
FileOutputFormat.setOutputPath(job, outDir);
job.waitForCompletion(true);
assertTrue(job.isSuccessful());
String[] outputLines = MapReduceTestUtil.readOutput(outDir, conf).split("\n");
assertEquals(6, outputLines.length);
int totalRowsReduced = 0;
for (String line : outputLines) {
totalRowsReduced += Integer.parseInt(line);
}
assertEquals(rowsPerShard * 2, totalRowsReduced);
}
public static class TableMapper extends
Mapper<NullWritable, RowWritable, KeyspaceIdWritable, RowWritable> {
@Override
public void map(NullWritable key, RowWritable value, Context context) throws IOException,
InterruptedException {
try {
KeyspaceId id = new KeyspaceId();
id.setId(value.getRow().getULong("keyspace_id"));
context.write(new KeyspaceIdWritable(id), value);
} catch (InvalidFieldException e) {
throw new IOException(e);
}
}
}
public static class CountReducer extends
Reducer<KeyspaceIdWritable, RowWritable, NullWritable, LongWritable> {
@Override
public void reduce(KeyspaceIdWritable key, Iterable<RowWritable> values, Context context)
throws IOException, InterruptedException {
long count = 0;
Iterator<RowWritable> iter = values.iterator();
while (iter.hasNext()) {
count++;
iter.next();
}
context.write(NullWritable.get(), new LongWritable(count));
}
}
/**
* Create env with two shards each having a master and rdonly
*/
static TestEnv getTestEnv() {
Map<String, List<String>> shardKidMap = new HashMap<>();
shardKidMap.put("-80",
Lists.newArrayList("527875958493693904", "626750931627689502", "345387386794260318"));
shardKidMap.put("80-",
Lists.newArrayList("9767889778372766922", "9742070682920810358", "10296850775085416642"));
TestEnv env = Util.getTestEnv(shardKidMap, "test_keyspace");
env.addTablet("rdonly", 1);
return env;
}
public static TestSetup suite() {
return new TestSetup(new TestSuite(MapReduceIT.class)) {
@Override
protected void setUp() throws Exception {
Util.setupTestEnv(testEnv);
}
@Override
protected void tearDown() throws Exception {
Util.teardownTestEnv(testEnv);
}
};
}
}

Просмотреть файл

@ -1,188 +0,0 @@
package com.youtube.vitess.vtgate.integration.util;
import com.google.common.primitives.UnsignedLong;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.gson.reflect.TypeToken;
import com.youtube.vitess.vtgate.BindVariable;
import com.youtube.vitess.vtgate.Exceptions.ConnectionException;
import com.youtube.vitess.vtgate.Exceptions.DatabaseException;
import com.youtube.vitess.vtgate.KeyspaceId;
import com.youtube.vitess.vtgate.Query;
import com.youtube.vitess.vtgate.Query.QueryBuilder;
import com.youtube.vitess.vtgate.TestEnv;
import com.youtube.vitess.vtgate.VtGate;
import com.youtube.vitess.vtgate.cursor.Cursor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.junit.Assert;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
public class Util {
static final Logger logger = LogManager.getLogger(Util.class.getName());
public static final String PROPERTY_KEY_VTGATE_TEST_ENV = "vtgate.test.env";
/**
* Setup MySQL, Vttablet and VtGate instances required for the tests. This uses a Python helper
* script to start and stop instances.
*/
public static void setupTestEnv(TestEnv testEnv) throws Exception {
ProcessBuilder pb = new ProcessBuilder(testEnv.getSetupCommand());
pb.redirectErrorStream(true);
Process p = pb.start();
BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
// The port for VtGate is dynamically assigned and written to
// stdout as a JSON string.
String line;
while ((line = br.readLine()) != null) {
logger.info("java_vtgate_test_helper: " + line);
if (!line.startsWith("{")) {
continue;
}
try {
Type mapType = new TypeToken<Map<String, Integer>>() {}.getType();
Map<String, Integer> map = new Gson().fromJson(line, mapType);
testEnv.setPythonScriptProcess(p);
testEnv.setPort(map.get("port"));
return;
} catch (JsonSyntaxException e) {
logger.error("JsonSyntaxException parsing setup command output: " + line, e);
}
}
Assert.fail("setup script failed to parse vtgate port");
}
/**
* Teardown the test instances, if any.
*/
public static void teardownTestEnv(TestEnv testEnv) throws Exception {
Process process = testEnv.getPythonScriptProcess();
if (process != null) {
logger.info("sending empty line to java_vtgate_test_helper to stop test setup");
process.getOutputStream().write("\n".getBytes());
process.getOutputStream().flush();
process.waitFor();
testEnv.setPythonScriptProcess(null);
}
}
public static void insertRows(TestEnv testEnv, int startId, int count) throws ConnectionException,
DatabaseException {
insertRows(testEnv, startId, count, new DateTime());
}
public static void insertRows(TestEnv testEnv, int startId, int count, DateTime dateTime)
throws ConnectionException, DatabaseException {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
String insertSql = "insert into vtgate_test "
+ "(id, name, age, percent, datetime_col, timestamp_col, date_col, time_col, keyspace_id) "
+ "values (:id, :name, :age, :percent, :datetime_col, :timestamp_col, :date_col, :time_col, :keyspace_id)";
for (int i = startId; i < startId + count; i++) {
KeyspaceId kid = testEnv.getAllKeyspaceIds().get(i % testEnv.getAllKeyspaceIds().size());
Query query = new QueryBuilder(insertSql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forULong("id", UnsignedLong.valueOf("" + i)))
.addBindVar(BindVariable.forBytes("name", ("name_" + i).getBytes()))
.addBindVar(BindVariable.forInt("age", i * 2))
.addBindVar(BindVariable.forDouble("percent", new Double(i / 100.0)))
.addBindVar(BindVariable.forULong("keyspace_id", (UnsignedLong) kid.getId()))
.addBindVar(BindVariable.forDateTime("datetime_col", dateTime))
.addBindVar(BindVariable.forDateTime("timestamp_col", dateTime))
.addBindVar(BindVariable.forDate("date_col", dateTime))
.addBindVar(BindVariable.forTime("time_col", dateTime))
.addKeyspaceId(kid)
.build();
vtgate.execute(query);
}
vtgate.commit();
vtgate.close();
}
/**
* Insert rows to a specific shard using ExecuteKeyspaceIds
*/
public static void insertRowsInShard(TestEnv testEnv, String shardName, int count)
throws DatabaseException, ConnectionException {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
String sql = "insert into vtgate_test " + "(id, name, keyspace_id) "
+ "values (:id, :name, :keyspace_id)";
List<KeyspaceId> kids = testEnv.getKeyspaceIds(shardName);
for (int i = 1; i <= count; i++) {
KeyspaceId kid = kids.get(i % kids.size());
Query query = new QueryBuilder(sql, testEnv.getKeyspace(), "master")
.addBindVar(BindVariable.forULong("id", UnsignedLong.valueOf("" + i)))
.addBindVar(BindVariable.forBytes("name", ("name_" + i).getBytes()))
.addBindVar(BindVariable.forULong("keyspace_id", (UnsignedLong) kid.getId()))
.addKeyspaceId(kid)
.build();
vtgate.execute(query);
}
vtgate.commit();
vtgate.close();
}
public static void createTable(TestEnv testEnv) throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
vtgate.begin();
vtgate.execute(new QueryBuilder("drop table if exists vtgate_test", testEnv.getKeyspace(), "master")
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).build());
String createTable = "create table vtgate_test (id bigint auto_increment,"
+ " name varchar(64), age SMALLINT, percent DECIMAL(5,2),"
+ " keyspace_id bigint(20) unsigned NOT NULL, datetime_col DATETIME,"
+ " timestamp_col TIMESTAMP, date_col DATE, time_col TIME, primary key (id))"
+ " Engine=InnoDB";
vtgate.execute(new QueryBuilder(createTable, testEnv.getKeyspace(), "master").setKeyspaceIds(
testEnv.getAllKeyspaceIds()).build());
vtgate.commit();
vtgate.close();
}
/**
* Wait until the specified tablet type has received at least rowCount rows in vtgate_test from
* the master. If the criteria isn't met after the specified number of attempts raise an
* exception.
*/
public static void waitForTablet(String tabletType, int rowCount, int attempts, TestEnv testEnv)
throws Exception {
String sql = "select * from vtgate_test";
VtGate vtgate = VtGate.connect("localhost:" + testEnv.getPort(), 0, testEnv.getRpcClientFactory());
for (int i = 0; i < attempts; i++) {
try {
Cursor cursor = vtgate.execute(new QueryBuilder(sql, testEnv.getKeyspace(), tabletType)
.setKeyspaceIds(testEnv.getAllKeyspaceIds()).build());
if (cursor.getRowsAffected() >= rowCount) {
vtgate.close();
return;
}
} catch (DatabaseException e) {
}
Thread.sleep(1000);
}
vtgate.close();
throw new Exception(tabletType + " fails to catch up");
}
public static TestEnv getTestEnv(Map<String, List<String>> shardKidMap, String keyspace) {
String testEnvClass = System.getProperty(PROPERTY_KEY_VTGATE_TEST_ENV);
try {
Class<?> clazz = Class.forName(testEnvClass);
TestEnv env = (TestEnv)clazz.newInstance();
env.setKeyspace(keyspace);
env.setShardKidMap(shardKidMap);
return env;
} catch (ClassNotFoundException|IllegalAccessException|InstantiationException e) {
throw new RuntimeException(e);
}
}
}

Просмотреть файл

@ -1,105 +0,0 @@
package com.youtube.vitess.vtgate.rpcclient.gorpc;
import com.google.common.primitives.UnsignedLong;
import com.youtube.vitess.vtgate.Exceptions.InvalidFieldException;
import com.youtube.vitess.vtgate.Field.Flag;
import com.youtube.vitess.vtgate.QueryResult;
import com.youtube.vitess.vtgate.Row;
import com.youtube.vitess.vtgate.Row.Cell;
import com.youtube.vitess.vtgate.cursor.Cursor;
import com.youtube.vitess.vtgate.cursor.CursorImpl;
import com.youtube.vitess.vtgate.FieldType;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;
import org.bson.types.BasicBSONList;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.math.BigDecimal;
@RunWith(JUnit4.class)
public class BsonifyTest {
@Test
public void testResultParse() throws InvalidFieldException {
BSONObject result = new BasicBSONObject();
result.put("RowsAffected", 12L);
result.put("InsertId", 12345L);
BasicBSONList fields = new BasicBSONList();
fields.add(newField("col_0", FieldType.VT_DECIMAL, Flag.VT_ZEROVALUE_FLAG));
fields.add(newField("col_1", FieldType.VT_TINY, Flag.VT_ZEROVALUE_FLAG));
fields.add(newField("col_2", FieldType.VT_SHORT, Flag.VT_ZEROVALUE_FLAG));
fields.add(newField("col_3", FieldType.VT_LONG, Flag.VT_ZEROVALUE_FLAG));
fields.add(newField("col_4", FieldType.VT_LONGLONG, Flag.VT_ZEROVALUE_FLAG));
fields.add(newField("col_5", FieldType.VT_LONGLONG, Flag.VT_UNSIGNED_FLAG));
result.put("Fields", fields);
// Fill each column with the following different values: 0, 1, 2
BasicBSONList rows = new BasicBSONList();
int rowCount = 2;
for (int i = 0; i <= rowCount; i++) {
BasicBSONList row = new BasicBSONList();
row.add(new Double(i).toString().getBytes());
row.add(String.valueOf(i).getBytes());
row.add(String.valueOf(i).getBytes());
row.add(new Long(i).toString().getBytes());
row.add(new Long(i).toString().getBytes());
row.add(new Long(i).toString().getBytes());
Assert.assertEquals(fields.size(), row.size());
rows.add(row);
}
result.put("Rows", rows);
QueryResult qr = Bsonify.bsonToQueryResult(result, null);
Cursor cursor = new CursorImpl(qr);
Assert.assertEquals(12L, cursor.getRowsAffected());
Assert.assertEquals(12345L, cursor.getLastRowId());
for (int i = 0; i <= rowCount; i++) {
Row row = cursor.next();
Cell cell0 = row.next();
Assert.assertEquals("col_0", cell0.getName());
Assert.assertEquals(BigDecimal.class, cell0.getType());
Assert.assertEquals(new BigDecimal(String.format("%d.0", i)), row.getBigDecimal(cell0.getName()));
Cell cell1 = row.next();
Assert.assertEquals("col_1", cell1.getName());
Assert.assertEquals(Integer.class, cell1.getType());
Assert.assertEquals(new Integer(i), row.getInt(cell1.getName()));
Cell cell2 = row.next();
Assert.assertEquals("col_2", cell2.getName());
Assert.assertEquals(Integer.class, cell2.getType());
Assert.assertEquals(new Integer(i), row.getInt(cell2.getName()));
Cell cell3 = row.next();
Assert.assertEquals("col_3", cell3.getName());
Assert.assertEquals(Long.class, cell3.getType());
Assert.assertEquals(new Long(i), row.getLong(cell3.getName()));
Cell cell4 = row.next();
Assert.assertEquals("col_4", cell4.getName());
Assert.assertEquals(Long.class, cell4.getType());
Assert.assertEquals(new Long(i), row.getLong(cell4.getName()));
Cell cell5 = row.next();
Assert.assertEquals("col_5", cell5.getName());
Assert.assertEquals(UnsignedLong.class, cell5.getType());
Assert.assertEquals(UnsignedLong.valueOf(String.format("%d", i)), row.getULong(cell5.getName()));
}
// No more rows left.
Assert.assertFalse(cursor.hasNext());
}
private BSONObject newField(String name, FieldType fieldType, Flag flag) {
BSONObject field = new BasicBSONObject();
field.put("Name", name.getBytes());
field.put("Type", (long) fieldType.mysqlType);
field.put("Flags", (long) flag.mysqlFlag);
return field;
}
}

Просмотреть файл

@ -1,6 +0,0 @@
log4j.rootLogger=INFO, consoleAppender
log4j.appender.consoleAppender=org.apache.log4j.ConsoleAppender
log4j.appender.consoleAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.consoleAppender.layout.ConversionPattern=[%t] %-5p %c %x - %m%n
OFF