Merge pull request #1137 from enisoc/java-client

Add Cursor to new Java client and use SQLException
This commit is contained in:
Anthony Yeh 2015-09-24 18:42:43 -07:00
Родитель 1d5b551660 22c34d66f5
Коммит ca2e7b7068
16 изменённых файлов: 858 добавлений и 281 удалений

Просмотреть файл

@ -65,7 +65,7 @@ func echoQueryResult(vals map[string]interface{}) *mproto.QueryResult {
var row []sqltypes.Value
for k, v := range vals {
qr.Fields = append(qr.Fields, mproto.Field{Name: k})
qr.Fields = append(qr.Fields, mproto.Field{Name: k, Type: mproto.VT_VARCHAR})
val := reflect.ValueOf(v)
if val.Kind() == reflect.Map {

Просмотреть файл

@ -1,18 +1,24 @@
package com.youtube.vitess.client;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.primitives.UnsignedLong;
import com.google.protobuf.ByteString;
import com.youtube.vitess.client.cursor.Cursor;
import com.youtube.vitess.client.cursor.SimpleCursor;
import com.youtube.vitess.proto.Query.BindVariable;
import com.youtube.vitess.proto.Query.BoundQuery;
import com.youtube.vitess.proto.Query.QueryResult;
import com.youtube.vitess.proto.Vtgate.BoundKeyspaceIdQuery;
import com.youtube.vitess.proto.Vtgate.BoundShardQuery;
import com.youtube.vitess.proto.Vtgate.ExecuteEntityIdsRequest.EntityId;
import com.youtube.vitess.proto.Vtrpc.RPCError;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
@ -20,11 +26,11 @@ import java.util.Map;
*/
public class Proto {
/**
* checkError raises the proper VitessException for an error returned by VTGate.
* checkError raises the proper SQLException for an error returned by VTGate.
*
* @param error
*/
public static void checkError(RPCError error) throws VitessException {
public static void checkError(RPCError error) throws SQLException {
// TODO(enisoc): Implement checkError.
}
@ -157,8 +163,7 @@ public class Proto {
* bindQuery creates a BoundQuery from query and vars.
*/
public static BoundQuery bindQuery(String query, Map<String, ?> vars) {
BoundQuery.Builder boundQueryBuilder =
BoundQuery.newBuilder().setSql(query);
BoundQuery.Builder boundQueryBuilder = BoundQuery.newBuilder().setSql(query);
if (vars != null) {
Map<String, BindVariable> bindVars = boundQueryBuilder.getMutableBindVariables();
for (Map.Entry<String, ?> entry : vars.entrySet()) {
@ -208,6 +213,14 @@ public class Proto {
return bindKeyspaceIdQuery(keyspace, keyspaceIds, bindQuery(query, vars));
}
public static List<Cursor> toCursorList(List<QueryResult> queryResults) {
ImmutableList.Builder<Cursor> builder = new ImmutableList.Builder<Cursor>();
for (QueryResult queryResult : queryResults) {
builder.add(new SimpleCursor(queryResult));
}
return builder.build();
}
public static final Function<byte[], ByteString> BYTE_ARRAY_TO_BYTE_STRING =
new Function<byte[], ByteString>() {
@Override

Просмотреть файл

@ -31,70 +31,68 @@ import com.youtube.vitess.proto.Vtgate.StreamExecuteRequest;
import com.youtube.vitess.proto.Vtgate.StreamExecuteShardsRequest;
import java.io.Closeable;
import java.sql.SQLException;
/**
* RpcClient defines a set of methods to communicate with VTGates.
*/
public interface RpcClient extends Closeable {
// execute sends a single query using the VTGate V3 API.
ExecuteResponse execute(Context ctx, ExecuteRequest request)
throws VitessException, VitessRpcException;
ExecuteResponse execute(Context ctx, ExecuteRequest request) throws SQLException;
// executeShards sends a single query to a set of shards.
ExecuteShardsResponse executeShards(Context ctx, ExecuteShardsRequest request)
throws VitessException, VitessRpcException;
throws SQLException;
// executeKeyspaceIds sends a query with a set of keyspace IDs.
ExecuteKeyspaceIdsResponse executeKeyspaceIds(Context ctx, ExecuteKeyspaceIdsRequest request)
throws VitessException, VitessRpcException;
throws SQLException;
// executeKeyRanges sends a query with a set of key ranges.
ExecuteKeyRangesResponse executeKeyRanges(Context ctx, ExecuteKeyRangesRequest request)
throws VitessException, VitessRpcException;
throws SQLException;
// executeEntityIds sends a query with a set of entity IDs.
ExecuteEntityIdsResponse executeEntityIds(Context ctx, ExecuteEntityIdsRequest request)
throws VitessException, VitessRpcException;
throws SQLException;
// executeBatchShards sends a list of queries to a set of shards.
ExecuteBatchShardsResponse executeBatchShards(Context ctx, ExecuteBatchShardsRequest request)
throws VitessException, VitessRpcException;
throws SQLException;
// executeBatchKeyspaceIds sends a list of queries with keyspace ids as bind variables.
ExecuteBatchKeyspaceIdsResponse executeBatchKeyspaceIds(
Context ctx, ExecuteBatchKeyspaceIdsRequest request)
throws VitessException, VitessRpcException;
Context ctx, ExecuteBatchKeyspaceIdsRequest request) throws SQLException;
// streamExecute starts stream queries with the VTGate V3 API.
StreamIterator<QueryResult> streamExecute(Context ctx, StreamExecuteRequest request)
throws VitessRpcException;
throws SQLException;
// streamExecuteShard starts stream queries with multiple shards.
StreamIterator<QueryResult> streamExecuteShards(Context ctx, StreamExecuteShardsRequest request)
throws VitessRpcException;
throws SQLException;
// streamExecuteKeyspaceIds starts a list of stream queries with keyspace ids as bind variables.
StreamIterator<QueryResult> streamExecuteKeyspaceIds(
Context ctx, StreamExecuteKeyspaceIdsRequest request) throws VitessRpcException;
Context ctx, StreamExecuteKeyspaceIdsRequest request) throws SQLException;
// streamExecuteKeyRanges starts stream query with a set of key ranges.
StreamIterator<QueryResult> streamExecuteKeyRanges(
Context ctx, StreamExecuteKeyRangesRequest request) throws VitessRpcException;
Context ctx, StreamExecuteKeyRangesRequest request) throws SQLException;
// begin starts a transaction.
BeginResponse begin(Context ctx, BeginRequest request) throws VitessRpcException;
BeginResponse begin(Context ctx, BeginRequest request) throws SQLException;
// commit commits a transaction.
CommitResponse commit(Context ctx, CommitRequest request) throws VitessRpcException;
CommitResponse commit(Context ctx, CommitRequest request) throws SQLException;
// rollback rolls back a pending transaction.
RollbackResponse rollback(Context ctx, RollbackRequest request) throws VitessRpcException;
RollbackResponse rollback(Context ctx, RollbackRequest request) throws SQLException;
// splitQuery splits a query into smaller queries.
SplitQueryResponse splitQuery(Context ctx, SplitQueryRequest request)
throws VitessRpcException;
SplitQueryResponse splitQuery(Context ctx, SplitQueryRequest request) throws SQLException;
// getSrvKeyspace returns a list of serving keyspaces.
GetSrvKeyspaceResponse getSrvKeyspace(Context ctx, GetSrvKeyspaceRequest request)
throws VitessRpcException;
throws SQLException;
}

Просмотреть файл

@ -1,16 +1,24 @@
package com.youtube.vitess.client;
import java.sql.SQLException;
import java.util.NoSuchElementException;
/**
* StreamIterator is used to access the results of a streaming call.
* An {@link java.util.Iterator Iterator}-like interface for accessing the results of a
* Vitess streaming call.
*
* It is similar to java.util.Iterator, but the hasNext() method is
* <p>It is similar to {@link java.util.Iterator}, but the hasNext() method is
* understood to block until either a result is ready, an error occurs,
* or there are no more results. Also, unlike Iterator, these methods
* can throw VitessException or VitessRpcException.
* can throw SQLException.
*
* <p>The {@link #close()} method should be called when done to free up threads that may be blocking
* on the streaming connection.
*
* @param <E> the type of result returned by the iterator,
* e.g. {@link com.youtube.vitess.proto.Query.QueryResult QueryResult}
*/
public interface StreamIterator<E> {
public interface StreamIterator<E> extends AutoCloseable {
/**
* hasNext returns true if next() would return a value.
*
@ -21,7 +29,7 @@ public interface StreamIterator<E> {
* <li>An error occurs (throws exception).
* </ul>
*/
boolean hasNext() throws VitessException, VitessRpcException;
boolean hasNext() throws SQLException;
/**
* next returns the next value if one is available.
@ -33,5 +41,5 @@ public interface StreamIterator<E> {
* <li>An error occurs (throws other exception).
* </ul>
*/
E next() throws NoSuchElementException, VitessException, VitessRpcException;
E next() throws NoSuchElementException, SQLException;
}

Просмотреть файл

@ -2,7 +2,9 @@ package com.youtube.vitess.client;
import com.google.common.collect.Iterables;
import com.youtube.vitess.proto.Query.QueryResult;
import com.youtube.vitess.client.cursor.Cursor;
import com.youtube.vitess.client.cursor.SimpleCursor;
import com.youtube.vitess.client.cursor.StreamCursor;
import com.youtube.vitess.proto.Topodata.KeyRange;
import com.youtube.vitess.proto.Topodata.SrvKeyspace;
import com.youtube.vitess.proto.Topodata.TabletType;
@ -35,6 +37,7 @@ import com.youtube.vitess.proto.Vtgate.StreamExecuteShardsRequest;
import java.io.Closeable;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@ -67,9 +70,7 @@ import java.util.Map;
* for (Row row : result.getRowsList()) {
* // process each row.
* }
* } catch (VitessException e) {
* // ...
* } catch (VitessRpcException e) {
* } catch (SQLException e) {
* // ...
* }
* </pre>
@ -81,8 +82,8 @@ public class VTGateConn implements Closeable {
this.client = client;
}
public QueryResult execute(Context ctx, String query, Map<String, ?> bindVars,
TabletType tabletType) throws VitessException, VitessRpcException {
public Cursor execute(Context ctx, String query, Map<String, ?> bindVars, TabletType tabletType)
throws SQLException {
ExecuteRequest.Builder requestBuilder =
ExecuteRequest.newBuilder()
.setQuery(Proto.bindQuery(query, bindVars))
@ -92,12 +93,11 @@ public class VTGateConn implements Closeable {
}
ExecuteResponse response = client.execute(ctx, requestBuilder.build());
Proto.checkError(response.getError());
return response.getResult();
return new SimpleCursor(response.getResult());
}
public QueryResult executeShards(Context ctx, String query, String keyspace,
Iterable<String> shards, Map<String, ?> bindVars, TabletType tabletType)
throws VitessException, VitessRpcException {
public Cursor executeShards(Context ctx, String query, String keyspace, Iterable<String> shards,
Map<String, ?> bindVars, TabletType tabletType) throws SQLException {
ExecuteShardsRequest.Builder requestBuilder =
ExecuteShardsRequest.newBuilder()
.setQuery(Proto.bindQuery(query, bindVars))
@ -109,12 +109,12 @@ public class VTGateConn implements Closeable {
}
ExecuteShardsResponse response = client.executeShards(ctx, requestBuilder.build());
Proto.checkError(response.getError());
return response.getResult();
return new SimpleCursor(response.getResult());
}
public QueryResult executeKeyspaceIds(Context ctx, String query, String keyspace,
public Cursor executeKeyspaceIds(Context ctx, String query, String keyspace,
Iterable<byte[]> keyspaceIds, Map<String, ?> bindVars, TabletType tabletType)
throws VitessException, VitessRpcException {
throws SQLException {
ExecuteKeyspaceIdsRequest.Builder requestBuilder =
ExecuteKeyspaceIdsRequest.newBuilder()
.setQuery(Proto.bindQuery(query, bindVars))
@ -126,12 +126,12 @@ public class VTGateConn implements Closeable {
}
ExecuteKeyspaceIdsResponse response = client.executeKeyspaceIds(ctx, requestBuilder.build());
Proto.checkError(response.getError());
return response.getResult();
return new SimpleCursor(response.getResult());
}
public QueryResult executeKeyRanges(Context ctx, String query, String keyspace,
public Cursor executeKeyRanges(Context ctx, String query, String keyspace,
Iterable<? extends KeyRange> keyRanges, Map<String, ?> bindVars, TabletType tabletType)
throws VitessException, VitessRpcException {
throws SQLException {
ExecuteKeyRangesRequest.Builder requestBuilder =
ExecuteKeyRangesRequest.newBuilder()
.setQuery(Proto.bindQuery(query, bindVars))
@ -143,12 +143,12 @@ public class VTGateConn implements Closeable {
}
ExecuteKeyRangesResponse response = client.executeKeyRanges(ctx, requestBuilder.build());
Proto.checkError(response.getError());
return response.getResult();
return new SimpleCursor(response.getResult());
}
public QueryResult executeEntityIds(Context ctx, String query, String keyspace,
public Cursor executeEntityIds(Context ctx, String query, String keyspace,
String entityColumnName, Map<byte[], ?> entityKeyspaceIds, Map<String, ?> bindVars,
TabletType tabletType) throws VitessException, VitessRpcException {
TabletType tabletType) throws SQLException {
ExecuteEntityIdsRequest.Builder requestBuilder =
ExecuteEntityIdsRequest.newBuilder()
.setQuery(Proto.bindQuery(query, bindVars))
@ -162,12 +162,11 @@ public class VTGateConn implements Closeable {
}
ExecuteEntityIdsResponse response = client.executeEntityIds(ctx, requestBuilder.build());
Proto.checkError(response.getError());
return response.getResult();
return new SimpleCursor(response.getResult());
}
public List<QueryResult> executeBatchShards(Context ctx,
Iterable<? extends BoundShardQuery> queries, TabletType tabletType, boolean asTransaction)
throws VitessException, VitessRpcException {
public List<Cursor> executeBatchShards(Context ctx, Iterable<? extends BoundShardQuery> queries,
TabletType tabletType, boolean asTransaction) throws SQLException {
ExecuteBatchShardsRequest.Builder requestBuilder =
ExecuteBatchShardsRequest.newBuilder()
.addAllQueries(queries)
@ -178,12 +177,12 @@ public class VTGateConn implements Closeable {
}
ExecuteBatchShardsResponse response = client.executeBatchShards(ctx, requestBuilder.build());
Proto.checkError(response.getError());
return response.getResultsList();
return Proto.toCursorList(response.getResultsList());
}
public List<QueryResult> executeBatchKeyspaceIds(Context ctx,
public List<Cursor> executeBatchKeyspaceIds(Context ctx,
Iterable<? extends BoundKeyspaceIdQuery> queries, TabletType tabletType,
boolean asTransaction) throws VitessException, VitessRpcException {
boolean asTransaction) throws SQLException {
ExecuteBatchKeyspaceIdsRequest.Builder requestBuilder =
ExecuteBatchKeyspaceIdsRequest.newBuilder()
.addAllQueries(queries)
@ -195,11 +194,11 @@ public class VTGateConn implements Closeable {
ExecuteBatchKeyspaceIdsResponse response =
client.executeBatchKeyspaceIds(ctx, requestBuilder.build());
Proto.checkError(response.getError());
return response.getResultsList();
return Proto.toCursorList(response.getResultsList());
}
public StreamIterator<QueryResult> streamExecute(Context ctx, String query,
Map<String, ?> bindVars, TabletType tabletType) throws VitessRpcException {
public Cursor streamExecute(Context ctx, String query, Map<String, ?> bindVars,
TabletType tabletType) throws SQLException {
StreamExecuteRequest.Builder requestBuilder =
StreamExecuteRequest.newBuilder()
.setQuery(Proto.bindQuery(query, bindVars))
@ -207,12 +206,11 @@ public class VTGateConn implements Closeable {
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
return client.streamExecute(ctx, requestBuilder.build());
return new StreamCursor(client.streamExecute(ctx, requestBuilder.build()));
}
public StreamIterator<QueryResult> streamExecuteShards(Context ctx, String query, String keyspace,
Iterable<String> shards, Map<String, ?> bindVars, TabletType tabletType)
throws VitessRpcException {
public Cursor streamExecuteShards(Context ctx, String query, String keyspace,
Iterable<String> shards, Map<String, ?> bindVars, TabletType tabletType) throws SQLException {
StreamExecuteShardsRequest.Builder requestBuilder =
StreamExecuteShardsRequest.newBuilder()
.setQuery(Proto.bindQuery(query, bindVars))
@ -222,12 +220,12 @@ public class VTGateConn implements Closeable {
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
return client.streamExecuteShards(ctx, requestBuilder.build());
return new StreamCursor(client.streamExecuteShards(ctx, requestBuilder.build()));
}
public StreamIterator<QueryResult> streamExecuteKeyspaceIds(Context ctx, String query,
String keyspace, Iterable<byte[]> keyspaceIds, Map<String, ?> bindVars, TabletType tabletType)
throws VitessRpcException {
public Cursor streamExecuteKeyspaceIds(Context ctx, String query, String keyspace,
Iterable<byte[]> keyspaceIds, Map<String, ?> bindVars, TabletType tabletType)
throws SQLException {
StreamExecuteKeyspaceIdsRequest.Builder requestBuilder =
StreamExecuteKeyspaceIdsRequest.newBuilder()
.setQuery(Proto.bindQuery(query, bindVars))
@ -237,12 +235,12 @@ public class VTGateConn implements Closeable {
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
return client.streamExecuteKeyspaceIds(ctx, requestBuilder.build());
return new StreamCursor(client.streamExecuteKeyspaceIds(ctx, requestBuilder.build()));
}
public StreamIterator<QueryResult> streamExecuteKeyRanges(Context ctx, String query,
String keyspace, Iterable<? extends KeyRange> keyRanges, Map<String, ?> bindVars,
TabletType tabletType) throws VitessRpcException {
public Cursor streamExecuteKeyRanges(Context ctx, String query, String keyspace,
Iterable<? extends KeyRange> keyRanges, Map<String, ?> bindVars, TabletType tabletType)
throws SQLException {
StreamExecuteKeyRangesRequest.Builder requestBuilder =
StreamExecuteKeyRangesRequest.newBuilder()
.setQuery(Proto.bindQuery(query, bindVars))
@ -252,10 +250,10 @@ public class VTGateConn implements Closeable {
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
return client.streamExecuteKeyRanges(ctx, requestBuilder.build());
return new StreamCursor(client.streamExecuteKeyRanges(ctx, requestBuilder.build()));
}
public VTGateTx begin(Context ctx) throws VitessException, VitessRpcException {
public VTGateTx begin(Context ctx) throws SQLException {
BeginRequest.Builder requestBuilder = BeginRequest.newBuilder();
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
@ -265,8 +263,7 @@ public class VTGateConn implements Closeable {
}
public List<SplitQueryResponse.Part> splitQuery(Context ctx, String keyspace, String query,
Map<String, ?> bindVars, String splitColumn, long splitCount)
throws VitessException, VitessRpcException {
Map<String, ?> bindVars, String splitColumn, long splitCount) throws SQLException {
SplitQueryRequest.Builder requestBuilder =
SplitQueryRequest.newBuilder()
.setKeyspace(keyspace)
@ -280,8 +277,7 @@ public class VTGateConn implements Closeable {
return response.getSplitsList();
}
public SrvKeyspace getSrvKeyspace(Context ctx, String keyspace)
throws VitessException, VitessRpcException {
public SrvKeyspace getSrvKeyspace(Context ctx, String keyspace) throws SQLException {
GetSrvKeyspaceRequest.Builder requestBuilder =
GetSrvKeyspaceRequest.newBuilder().setKeyspace(keyspace);
GetSrvKeyspaceResponse response = client.getSrvKeyspace(ctx, requestBuilder.build());

Просмотреть файл

@ -2,7 +2,8 @@ package com.youtube.vitess.client;
import com.google.common.collect.Iterables;
import com.youtube.vitess.proto.Query.QueryResult;
import com.youtube.vitess.client.cursor.Cursor;
import com.youtube.vitess.client.cursor.SimpleCursor;
import com.youtube.vitess.proto.Topodata.KeyRange;
import com.youtube.vitess.proto.Topodata.TabletType;
import com.youtube.vitess.proto.Vtgate.BoundKeyspaceIdQuery;
@ -25,6 +26,8 @@ import com.youtube.vitess.proto.Vtgate.ExecuteShardsResponse;
import com.youtube.vitess.proto.Vtgate.RollbackRequest;
import com.youtube.vitess.proto.Vtgate.Session;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@ -44,11 +47,10 @@ public class VTGateTx {
return new VTGateTx(client, session);
}
public QueryResult execute(Context ctx, String query, Map<String, ?> bindVars,
TabletType tabletType, boolean notInTransaction)
throws VitessException, VitessRpcException, VitessNotInTransactionException {
public Cursor execute(Context ctx, String query, Map<String, ?> bindVars, TabletType tabletType,
boolean notInTransaction) throws SQLException {
if (session == null) {
throw new VitessNotInTransactionException("execute: not in transaction");
throw new SQLDataException("execute: not in transaction");
}
ExecuteRequest.Builder requestBuilder =
ExecuteRequest.newBuilder()
@ -62,15 +64,14 @@ public class VTGateTx {
ExecuteResponse response = client.execute(ctx, requestBuilder.build());
session = response.getSession();
Proto.checkError(response.getError());
return response.getResult();
return new SimpleCursor(response.getResult());
}
public QueryResult executeShards(Context ctx, String query, String keyspace,
Iterable<String> shards, Map<String, ?> bindVars, TabletType tabletType,
boolean notInTransaction)
throws VitessException, VitessRpcException, VitessNotInTransactionException {
public Cursor executeShards(Context ctx, String query, String keyspace, Iterable<String> shards,
Map<String, ?> bindVars, TabletType tabletType, boolean notInTransaction)
throws SQLException {
if (session == null) {
throw new VitessNotInTransactionException("executeShards: not in transaction");
throw new SQLDataException("executeShards: not in transaction");
}
ExecuteShardsRequest.Builder requestBuilder =
ExecuteShardsRequest.newBuilder()
@ -86,15 +87,14 @@ public class VTGateTx {
ExecuteShardsResponse response = client.executeShards(ctx, requestBuilder.build());
session = response.getSession();
Proto.checkError(response.getError());
return response.getResult();
return new SimpleCursor(response.getResult());
}
public QueryResult executeKeyspaceIds(Context ctx, String query, String keyspace,
public Cursor executeKeyspaceIds(Context ctx, String query, String keyspace,
Iterable<byte[]> keyspaceIds, Map<String, ?> bindVars, TabletType tabletType,
boolean notInTransaction)
throws VitessException, VitessRpcException, VitessNotInTransactionException {
boolean notInTransaction) throws SQLException {
if (session == null) {
throw new VitessNotInTransactionException("executeKeyspaceIds: not in transaction");
throw new SQLDataException("executeKeyspaceIds: not in transaction");
}
ExecuteKeyspaceIdsRequest.Builder requestBuilder =
ExecuteKeyspaceIdsRequest.newBuilder()
@ -110,15 +110,14 @@ public class VTGateTx {
ExecuteKeyspaceIdsResponse response = client.executeKeyspaceIds(ctx, requestBuilder.build());
session = response.getSession();
Proto.checkError(response.getError());
return response.getResult();
return new SimpleCursor(response.getResult());
}
public QueryResult executeKeyRanges(Context ctx, String query, String keyspace,
public Cursor executeKeyRanges(Context ctx, String query, String keyspace,
Iterable<? extends KeyRange> keyRanges, Map<String, ?> bindVars, TabletType tabletType,
boolean notInTransaction)
throws VitessException, VitessRpcException, VitessNotInTransactionException {
boolean notInTransaction) throws SQLException {
if (session == null) {
throw new VitessNotInTransactionException("executeKeyRanges: not in transaction");
throw new SQLDataException("executeKeyRanges: not in transaction");
}
ExecuteKeyRangesRequest.Builder requestBuilder =
ExecuteKeyRangesRequest.newBuilder()
@ -134,15 +133,14 @@ public class VTGateTx {
ExecuteKeyRangesResponse response = client.executeKeyRanges(ctx, requestBuilder.build());
session = response.getSession();
Proto.checkError(response.getError());
return response.getResult();
return new SimpleCursor(response.getResult());
}
public QueryResult executeEntityIds(Context ctx, String query, String keyspace,
public Cursor executeEntityIds(Context ctx, String query, String keyspace,
String entityColumnName, Map<byte[], ?> entityKeyspaceIds, Map<String, ?> bindVars,
TabletType tabletType, boolean notInTransaction)
throws VitessException, VitessRpcException, VitessNotInTransactionException {
TabletType tabletType, boolean notInTransaction) throws SQLException {
if (session == null) {
throw new VitessNotInTransactionException("executeEntityIds: not in transaction");
throw new SQLDataException("executeEntityIds: not in transaction");
}
ExecuteEntityIdsRequest.Builder requestBuilder =
ExecuteEntityIdsRequest.newBuilder()
@ -160,14 +158,13 @@ public class VTGateTx {
ExecuteEntityIdsResponse response = client.executeEntityIds(ctx, requestBuilder.build());
session = response.getSession();
Proto.checkError(response.getError());
return response.getResult();
return new SimpleCursor(response.getResult());
}
public List<QueryResult> executeBatchShards(Context ctx,
Iterable<? extends BoundShardQuery> queries, TabletType tabletType, boolean asTransaction)
throws VitessException, VitessRpcException, VitessNotInTransactionException {
public List<Cursor> executeBatchShards(Context ctx, Iterable<? extends BoundShardQuery> queries,
TabletType tabletType, boolean asTransaction) throws SQLException {
if (session == null) {
throw new VitessNotInTransactionException("executeBatchShards: not in transaction");
throw new SQLDataException("executeBatchShards: not in transaction");
}
ExecuteBatchShardsRequest.Builder requestBuilder =
ExecuteBatchShardsRequest.newBuilder()
@ -181,15 +178,14 @@ public class VTGateTx {
ExecuteBatchShardsResponse response = client.executeBatchShards(ctx, requestBuilder.build());
session = response.getSession();
Proto.checkError(response.getError());
return response.getResultsList();
return Proto.toCursorList(response.getResultsList());
}
public List<QueryResult> executeBatchKeyspaceIds(Context ctx,
public List<Cursor> executeBatchKeyspaceIds(Context ctx,
Iterable<? extends BoundKeyspaceIdQuery> queries, TabletType tabletType,
boolean asTransaction)
throws VitessException, VitessRpcException, VitessNotInTransactionException {
boolean asTransaction) throws SQLException {
if (session == null) {
throw new VitessNotInTransactionException("executeBatchKeyspaceIds: not in transaction");
throw new SQLDataException("executeBatchKeyspaceIds: not in transaction");
}
ExecuteBatchKeyspaceIdsRequest.Builder requestBuilder =
ExecuteBatchKeyspaceIdsRequest.newBuilder()
@ -204,13 +200,12 @@ public class VTGateTx {
client.executeBatchKeyspaceIds(ctx, requestBuilder.build());
session = response.getSession();
Proto.checkError(response.getError());
return response.getResultsList();
return Proto.toCursorList(response.getResultsList());
}
public void commit(Context ctx)
throws VitessException, VitessRpcException, VitessNotInTransactionException {
public void commit(Context ctx) throws SQLException {
if (session == null) {
throw new VitessNotInTransactionException("commit: not in transaction");
throw new SQLDataException("commit: not in transaction");
}
CommitRequest.Builder requestBuilder = CommitRequest.newBuilder().setSession(session);
if (ctx.getCallerId() != null) {
@ -220,10 +215,9 @@ public class VTGateTx {
session = null;
}
public void rollback(Context ctx)
throws VitessException, VitessRpcException, VitessNotInTransactionException {
public void rollback(Context ctx) throws SQLException {
if (session == null) {
throw new VitessNotInTransactionException("rollback: not in transaction");
throw new SQLDataException("rollback: not in transaction");
}
RollbackRequest.Builder requestBuilder = RollbackRequest.newBuilder().setSession(session);
if (ctx.getCallerId() != null) {

Просмотреть файл

@ -1,22 +0,0 @@
package com.youtube.vitess.client;
/**
* VitessException indicates an error returned by VTGate.
*/
public class VitessException extends Exception {
public VitessException() {
super();
}
public VitessException(String message) {
super(message);
}
public VitessException(String message, Throwable cause) {
super(message, cause);
}
public VitessException(Throwable cause) {
super(cause);
}
}

Просмотреть файл

@ -1,23 +0,0 @@
package com.youtube.vitess.client;
/**
* VitessNotInTransactionException indicates a request is not wrapped in a transaction.
*/
public class VitessNotInTransactionException extends Exception {
public VitessNotInTransactionException() {
super();
}
public VitessNotInTransactionException(String message) {
super(message);
}
public VitessNotInTransactionException(String message, Throwable cause) {
super(message, cause);
}
public VitessNotInTransactionException(Throwable cause) {
super(cause);
}
}

Просмотреть файл

@ -1,23 +0,0 @@
package com.youtube.vitess.client;
/**
* VitessRpcException indicates an RPC error while trying to communicate with VTGate.
*/
public class VitessRpcException extends Exception {
public VitessRpcException() {
super();
}
public VitessRpcException(String message) {
super(message);
}
public VitessRpcException(String message, Throwable cause) {
super(message, cause);
}
public VitessRpcException(Throwable cause) {
super(cause);
}
}

Просмотреть файл

@ -0,0 +1,230 @@
package com.youtube.vitess.client.cursor;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.UnsignedLong;
import com.google.protobuf.ByteString;
import com.youtube.vitess.proto.Query.Field;
import com.youtube.vitess.proto.Query.QueryResult;
import com.youtube.vitess.proto.Query.Row;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.ISODateTimeFormat;
import java.math.BigDecimal;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
/**
* Provides access to the result rows of a query.
*
* <p>{@code Cursor} wraps an underlying Vitess {@link QueryResult} object, converting column
* values from the raw result values to Java types. In the case of streaming queries, the
* {@link StreamCursor} implementation will also fetch more {@code QueryResult} objects as
* necessary.
*
* <p>Similar to {@link java.sql.ResultSet}, a {@code Cursor} is initially positioned before the
* first row, and the first call to {@link #next()} moves to the first row. The getter methods
* return the value of the specified column within the current row. The {@link #close()} method
* should be called to free resources when done, regardless of whether all the rows were processed.
*
* <p>Where possible, the methods use the same signature and exceptions as
* {@link java.sql.ResultSet}, but implementing the full {@code ResultSet} interface is not a goal
* of this class.
*
* <p>Each individual {@code Cursor} is not thread-safe; it must be protected if used concurrently.
* However, two cursors from the same {@link com.youtube.vitess.client.VTGateConn VTGateConn} can be
* accessed concurrently without additional synchronization.
*/
public abstract class Cursor implements AutoCloseable {
public abstract long getRowsAffected() throws SQLException;
public abstract long getInsertId() throws SQLException;
public abstract List<Field> getFields() throws SQLException;
public abstract boolean next() throws SQLException;
private Map<String, Integer> fieldMap;
public int findColumn(String columnLabel) throws SQLException {
if (fieldMap == null) {
ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>();
List<Field> fields = getFields();
for (int i = 0; i < fields.size(); ++i) {
builder.put(fields.get(i).getName(), i);
}
fieldMap = builder.build();
}
if (!fieldMap.containsKey(columnLabel)) {
throw new SQLDataException("column not found:" + columnLabel);
}
return fieldMap.get(columnLabel);
}
public Object getObject(String columnLabel) throws SQLException {
return getObject(findColumn(columnLabel));
}
public Object getObject(int columnIndex) throws SQLException {
Row row = getCurrentRow();
if (columnIndex >= row.getValuesCount()) {
throw new SQLDataException("invalid columnIndex: " + columnIndex);
}
return convertFieldValue(getFields().get(columnIndex), row.getValues(columnIndex));
}
public int getInt(String columnLabel) throws SQLException {
return getInt(findColumn(columnLabel));
}
public int getInt(int columnIndex) throws SQLException {
return (Integer) getAndCheckType(columnIndex, Integer.class);
}
public UnsignedLong getULong(String columnLabel) throws SQLException {
return getULong(findColumn(columnLabel));
}
public UnsignedLong getULong(int columnIndex) throws SQLException {
return (UnsignedLong) getAndCheckType(columnIndex, UnsignedLong.class);
}
public String getString(String columnLabel) throws SQLException {
return getString(findColumn(columnLabel));
}
public String getString(int columnIndex) throws SQLException {
return (String) getAndCheckType(columnIndex, String.class);
}
public long getLong(String columnLabel) throws SQLException {
return getLong(findColumn(columnLabel));
}
public long getLong(int columnIndex) throws SQLException {
return (Long) getAndCheckType(columnIndex, Long.class);
}
public double getDouble(String columnLabel) throws SQLException {
return getDouble(findColumn(columnLabel));
}
public double getDouble(int columnIndex) throws SQLException {
return (Double) getAndCheckType(columnIndex, Double.class);
}
public float getFloat(String columnLabel) throws SQLException {
return getFloat(findColumn(columnLabel));
}
public float getFloat(int columnIndex) throws SQLException {
return (Float) getAndCheckType(columnIndex, Float.class);
}
public DateTime getDateTime(String columnLabel) throws SQLException {
return getDateTime(findColumn(columnLabel));
}
public DateTime getDateTime(int columnIndex) throws SQLException {
return (DateTime) getAndCheckType(columnIndex, DateTime.class);
}
public byte[] getBytes(String columnLabel) throws SQLException {
return getBytes(findColumn(columnLabel));
}
public byte[] getBytes(int columnIndex) throws SQLException {
return (byte[]) getAndCheckType(columnIndex, byte[].class);
}
public BigDecimal getBigDecimal(String columnLabel) throws SQLException {
return getBigDecimal(findColumn(columnLabel));
}
public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
return (BigDecimal) getAndCheckType(columnIndex, BigDecimal.class);
}
public short getShort(String columnLabel) throws SQLException {
return getShort(findColumn(columnLabel));
}
public short getShort(int columnIndex) throws SQLException {
return (Short) getAndCheckType(columnIndex, Short.class);
}
protected abstract Row getCurrentRow() throws SQLException;
private Object getAndCheckType(int columnIndex, Class<?> cls) throws SQLException {
Object o = getObject(columnIndex);
if (o != null && !cls.isInstance(o)) {
throw new SQLDataException(
"type mismatch, expected:" + cls.getName() + ", actual: " + o.getClass().getName());
}
return o;
}
Object convertFieldValue(Field field, ByteString value) throws SQLException {
if (value == null || value.size() == 0) {
return null;
}
// Note: We don't actually know the charset in which the value is encoded.
// For dates and numeric values, we just assume UTF-8 because they (hopefully) don't contain
// anything outside 7-bit ASCII, which (hopefully) is a subset of the actual charset.
// For strings, we return byte[] and the application is responsible for using the right charset.
switch (field.getType()) {
case TYPE_DECIMAL: // fall through
case TYPE_NEWDECIMAL:
return new BigDecimal(value.toStringUtf8());
case TYPE_TINY: // fall through
case TYPE_SHORT: // fall through
case TYPE_INT24:
return Integer.valueOf(value.toStringUtf8());
case TYPE_LONG:
return Long.valueOf(value.toStringUtf8());
case TYPE_FLOAT:
return Float.valueOf(value.toStringUtf8());
case TYPE_DOUBLE:
return Double.valueOf(value.toStringUtf8());
case TYPE_NULL:
return null;
case TYPE_LONGLONG:
// This can be an unsigned or a signed long
if ((field.getFlags() & Field.Flag.VT_UNSIGNED_FLAG_VALUE) != 0) {
return UnsignedLong.valueOf(value.toStringUtf8());
} else {
return Long.valueOf(value.toStringUtf8());
}
case TYPE_DATE: // fall through
case TYPE_NEWDATE:
return DateTime.parse(value.toStringUtf8(), ISODateTimeFormat.date());
case TYPE_TIME:
return DateTime.parse(value.toStringUtf8(), DateTimeFormat.forPattern("HH:mm:ss"));
case TYPE_DATETIME: // fall through
case TYPE_TIMESTAMP:
return DateTime.parse(value.toStringUtf8().replace(' ', 'T'));
case TYPE_YEAR:
return Short.valueOf(value.toStringUtf8());
case TYPE_ENUM: // fall through
case TYPE_SET:
return value.toStringUtf8();
case TYPE_VARCHAR: // fall through
case TYPE_BIT: // fall through
case TYPE_TINY_BLOB: // fall through
case TYPE_MEDIUM_BLOB: // fall through
case TYPE_LONG_BLOB: // fall through
case TYPE_BLOB: // fall through
case TYPE_VAR_STRING: // fall through
case TYPE_STRING: // fall through
case TYPE_GEOMETRY:
return value.toByteArray();
default:
throw new SQLDataException("unknown field type: " + field.getType());
}
}
}

Просмотреть файл

@ -0,0 +1,68 @@
package com.youtube.vitess.client.cursor;
import com.youtube.vitess.proto.Query.Field;
import com.youtube.vitess.proto.Query.QueryResult;
import com.youtube.vitess.proto.Query.Row;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
/**
* A {@link Cursor} that serves records from a single {@link QueryResult} object.
*/
public class SimpleCursor extends Cursor {
private QueryResult queryResult;
private Iterator<Row> rowIterator;
private Row row;
public SimpleCursor(QueryResult queryResult) {
this.queryResult = queryResult;
rowIterator = queryResult.getRowsList().iterator();
}
@Override
public long getRowsAffected() throws SQLException {
return queryResult.getRowsAffected();
}
@Override
public long getInsertId() throws SQLException {
return queryResult.getInsertId();
}
@Override
public List<Field> getFields() throws SQLException {
return queryResult.getFieldsList();
}
@Override
public void close() throws Exception {
row = null;
rowIterator = null;
}
@Override
public boolean next() throws SQLException {
if (rowIterator == null) {
throw new SQLDataException("next() called on closed Cursor");
}
if (rowIterator.hasNext()) {
row = rowIterator.next();
return true;
}
row = null;
return false;
}
@Override
protected Row getCurrentRow() throws SQLException {
if (row == null) {
throw new SQLDataException("no current row");
}
return row;
}
}

Просмотреть файл

@ -0,0 +1,93 @@
package com.youtube.vitess.client.cursor;
import com.youtube.vitess.client.StreamIterator;
import com.youtube.vitess.proto.Query.Field;
import com.youtube.vitess.proto.Query.QueryResult;
import com.youtube.vitess.proto.Query.Row;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Iterator;
import java.util.List;
/**
* A {@link Cursor} that serves records from the sequence of {@link QueryResult} objects
* represented by a {@link StreamIterator}.
*/
public class StreamCursor extends Cursor {
private StreamIterator<QueryResult> streamIterator;
private Iterator<Row> rowIterator;
private Row row;
private List<Field> fields;
public StreamCursor(StreamIterator<QueryResult> streamIterator) {
this.streamIterator = streamIterator;
}
@Override
public long getRowsAffected() throws SQLException {
throw new SQLFeatureNotSupportedException("getRowsAffected() is not supported on StreamCursor");
}
@Override
public long getInsertId() throws SQLException {
throw new SQLFeatureNotSupportedException("getInsertId() is not supported on StreamCursor");
}
@Override
public List<Field> getFields() throws SQLException {
if (fields == null) {
throw new SQLDataException("can't get fields until first streaming result is received");
}
return fields;
}
@Override
public void close() throws Exception {
streamIterator.close();
streamIterator = null;
}
@Override
public boolean next() throws SQLException {
if (streamIterator == null) {
throw new SQLDataException("next() called on closed Cursor");
}
// Get the next Row from the current QueryResult.
if (rowIterator != null && rowIterator.hasNext()) {
row = rowIterator.next();
return true;
}
// Get the next QueryResult. Loop in case we get a QueryResult with no Rows (e.g. only Fields).
while (streamIterator.hasNext()) {
QueryResult queryResult = streamIterator.next();
if (fields == null) {
// The first QueryResult should have the fields.
fields = queryResult.getFieldsList();
}
rowIterator = queryResult.getRowsList().iterator();
// Get the first Row from the new QueryResult.
if (rowIterator.hasNext()) {
row = rowIterator.next();
return true;
}
}
// No more Rows and no more QueryResults.
row = null;
return false;
}
@Override
protected Row getCurrentRow() throws SQLException {
if (row == null) {
throw new SQLDataException("no current row");
}
return row;
}
}

Просмотреть файл

@ -3,7 +3,8 @@ package com.youtube.vitess.client;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.youtube.vitess.proto.Query.QueryResult;
import com.youtube.vitess.client.cursor.Cursor;
import com.youtube.vitess.proto.Query.Field;
import com.youtube.vitess.proto.Topodata.KeyRange;
import com.youtube.vitess.proto.Topodata.KeyspaceIdType;
import com.youtube.vitess.proto.Topodata.ShardReference;
@ -18,6 +19,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@ -94,12 +96,19 @@ public abstract class RpcClientTest {
private static final String CALLER_ID_ECHO =
"principal:\"test_principal\" component:\"test_component\" subcomponent:\"test_subcomponent\" ";
private static Map<String, String> getEcho(QueryResult result) {
Map<String, String> fields = new HashMap<String, String>();
for (int i = 0; i < result.getFieldsCount(); i++) {
fields.put(result.getFields(i).getName(), result.getRows(0).getValues(i).toStringUtf8());
private static Map<String, String> getEcho(Cursor cursor) throws Exception {
Map<String, String> values = new HashMap<String, String>();
if (cursor.next()) {
// Echo values are stored as columns in the first row of the result.
List<Field> fields = cursor.getFields();
for (int i = 0; i < fields.size(); i++) {
values.put(fields.get(i).getName(), new String(cursor.getBytes(i), StandardCharsets.UTF_8));
}
}
return fields;
cursor.close();
return values;
}
@Test
@ -177,15 +186,14 @@ public abstract class RpcClientTest {
public void testEchoStreamExecute() throws Exception {
Map<String, String> echo;
echo = getEcho(conn.streamExecute(ctx, ECHO_PREFIX + QUERY, BIND_VARS, TABLET_TYPE).next());
echo = getEcho(conn.streamExecute(ctx, ECHO_PREFIX + QUERY, BIND_VARS, TABLET_TYPE));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
echo = getEcho(
conn.streamExecuteShards(ctx, ECHO_PREFIX + QUERY, KEYSPACE, SHARDS, BIND_VARS, TABLET_TYPE)
.next());
echo = getEcho(conn.streamExecuteShards(
ctx, ECHO_PREFIX + QUERY, KEYSPACE, SHARDS, BIND_VARS, TABLET_TYPE));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
@ -193,8 +201,8 @@ public abstract class RpcClientTest {
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
echo = getEcho(conn.streamExecuteKeyspaceIds(ctx, ECHO_PREFIX + QUERY, KEYSPACE, KEYSPACE_IDS,
BIND_VARS, TABLET_TYPE).next());
echo = getEcho(conn.streamExecuteKeyspaceIds(
ctx, ECHO_PREFIX + QUERY, KEYSPACE, KEYSPACE_IDS, BIND_VARS, TABLET_TYPE));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
@ -202,8 +210,8 @@ public abstract class RpcClientTest {
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
echo = getEcho(conn.streamExecuteKeyRanges(ctx, ECHO_PREFIX + QUERY, KEYSPACE, KEY_RANGES,
BIND_VARS, TABLET_TYPE).next());
echo = getEcho(conn.streamExecuteKeyRanges(
ctx, ECHO_PREFIX + QUERY, KEYSPACE, KEY_RANGES, BIND_VARS, TABLET_TYPE));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));

Просмотреть файл

@ -0,0 +1,232 @@
package com.youtube.vitess.client.cursor;
import com.google.common.primitives.UnsignedLong;
import com.google.protobuf.ByteString;
import com.youtube.vitess.proto.Query.Field;
import com.youtube.vitess.proto.Query.QueryResult;
import com.youtube.vitess.proto.Query.Row;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
public class CursorTest {
@Test
public void testFindColumn() throws Exception {
try (
Cursor cursor = new SimpleCursor(
QueryResult.newBuilder()
.addFields(Field.newBuilder().setName("col0").build())
.addFields(Field.newBuilder().setName("col1").build())
.addFields(Field.newBuilder().setName("col2").build())
.build())) {
Assert.assertEquals(0, cursor.findColumn("col0"));
Assert.assertEquals(1, cursor.findColumn("col1"));
Assert.assertEquals(2, cursor.findColumn("col2"));
}
}
@Test
public void testGetInt() throws Exception {
List<Field.Type> types =
Arrays.asList(Field.Type.TYPE_TINY, Field.Type.TYPE_SHORT, Field.Type.TYPE_INT24);
for (Field.Type type : types) {
try (
Cursor cursor = new SimpleCursor(
QueryResult.newBuilder()
.addFields(Field.newBuilder().setName("col0").setType(type).build())
.addRows(Row.newBuilder().addValues(ByteString.copyFromUtf8("12345")))
.build())) {
cursor.next();
Assert.assertEquals(12345, cursor.getInt("col0"));
}
}
}
@Test
public void testGetULong() throws Exception {
try (
Cursor cursor = new SimpleCursor(
QueryResult.newBuilder()
.addFields(
Field.newBuilder()
.setName("col0")
.setType(Field.Type.TYPE_LONGLONG)
.setFlags(Field.Flag.VT_UNSIGNED_FLAG_VALUE)
.build())
.addRows(
Row.newBuilder().addValues(ByteString.copyFromUtf8("18446744073709551615")))
.build())) {
cursor.next();
Assert.assertEquals(UnsignedLong.fromLongBits(-1), cursor.getULong("col0"));
}
}
@Test
public void testGetString() throws Exception {
List<Field.Type> types = Arrays.asList(Field.Type.TYPE_ENUM, Field.Type.TYPE_SET);
for (Field.Type type : types) {
try (
Cursor cursor = new SimpleCursor(
QueryResult.newBuilder()
.addFields(Field.newBuilder().setName("col0").setType(type).build())
.addRows(Row.newBuilder().addValues(ByteString.copyFromUtf8("val123")))
.build())) {
cursor.next();
Assert.assertEquals("val123", cursor.getString("col0"));
}
}
}
@Test
public void testGetLong() throws Exception {
List<Field.Type> types = Arrays.asList(Field.Type.TYPE_LONG, Field.Type.TYPE_LONGLONG);
for (Field.Type type : types) {
try (
Cursor cursor = new SimpleCursor(
QueryResult.newBuilder()
.addFields(Field.newBuilder().setName("col0").setType(type).build())
.addRows(Row.newBuilder().addValues(ByteString.copyFromUtf8("12345")))
.build())) {
cursor.next();
Assert.assertEquals(12345L, cursor.getLong("col0"));
}
}
}
@Test
public void testGetDouble() throws Exception {
try (
Cursor cursor = new SimpleCursor(
QueryResult.newBuilder()
.addFields(
Field.newBuilder().setName("col0").setType(Field.Type.TYPE_DOUBLE).build())
.addRows(Row.newBuilder().addValues(ByteString.copyFromUtf8("2.5")))
.build())) {
cursor.next();
Assert.assertEquals(2.5, cursor.getDouble("col0"), 0.01);
}
}
@Test
public void testGetFloat() throws Exception {
try (
Cursor cursor = new SimpleCursor(
QueryResult.newBuilder()
.addFields(
Field.newBuilder().setName("col0").setType(Field.Type.TYPE_FLOAT).build())
.addRows(Row.newBuilder().addValues(ByteString.copyFromUtf8("2.5")))
.build())) {
cursor.next();
Assert.assertEquals(2.5f, cursor.getFloat("col0"), 0.01f);
}
}
@Test
public void testGetDateTime() throws Exception {
List<Field.Type> types = Arrays.asList(Field.Type.TYPE_DATETIME, Field.Type.TYPE_TIMESTAMP);
for (Field.Type type : types) {
try (
Cursor cursor = new SimpleCursor(
QueryResult.newBuilder()
.addFields(Field.newBuilder().setName("col0").setType(type).build())
.addRows(
Row.newBuilder().addValues(ByteString.copyFromUtf8("2008-01-02 14:15:16")))
.build())) {
cursor.next();
Assert.assertEquals(new DateTime(2008, 1, 2, 14, 15, 16), cursor.getDateTime("col0"));
}
}
types = Arrays.asList(Field.Type.TYPE_DATE, Field.Type.TYPE_NEWDATE);
for (Field.Type type : types) {
try (
Cursor cursor = new SimpleCursor(
QueryResult.newBuilder()
.addFields(Field.newBuilder().setName("col0").setType(type).build())
.addRows(Row.newBuilder().addValues(ByteString.copyFromUtf8("2008-01-02")))
.build())) {
cursor.next();
Assert.assertEquals(new DateTime(2008, 1, 2, 0, 0, 0), cursor.getDateTime("col0"));
}
}
try (
Cursor cursor = new SimpleCursor(
QueryResult.newBuilder()
.addFields(Field.newBuilder().setName("col0").setType(Field.Type.TYPE_TIME).build())
.addRows(Row.newBuilder().addValues(ByteString.copyFromUtf8("12:34:56")))
.build())) {
cursor.next();
Assert.assertEquals(new DateTime(1970, 1, 1, 12, 34, 56), cursor.getDateTime("col0"));
}
}
@Test
public void testGetBytes() throws Exception {
List<Field.Type> types =
Arrays.asList(Field.Type.TYPE_VARCHAR, Field.Type.TYPE_BIT, Field.Type.TYPE_TINY_BLOB,
Field.Type.TYPE_MEDIUM_BLOB, Field.Type.TYPE_LONG_BLOB, Field.Type.TYPE_BLOB,
Field.Type.TYPE_VAR_STRING, Field.Type.TYPE_STRING, Field.Type.TYPE_GEOMETRY);
for (Field.Type type : types) {
try (
Cursor cursor = new SimpleCursor(
QueryResult.newBuilder()
.addFields(Field.newBuilder().setName("col0").setType(type).build())
.addRows(Row.newBuilder().addValues(ByteString.copyFromUtf8("hello world")))
.build())) {
cursor.next();
Assert.assertArrayEquals("hello world".getBytes("UTF-8"), cursor.getBytes("col0"));
}
}
}
@Test
public void testGetBigDecimal() throws Exception {
List<Field.Type> types = Arrays.asList(Field.Type.TYPE_DECIMAL, Field.Type.TYPE_NEWDECIMAL);
for (Field.Type type : types) {
try (
Cursor cursor = new SimpleCursor(
QueryResult.newBuilder()
.addFields(Field.newBuilder().setName("col0").setType(type).build())
.addRows(Row.newBuilder().addValues(ByteString.copyFromUtf8("1234.56789")))
.build())) {
cursor.next();
Assert.assertEquals(
new BigDecimal(BigInteger.valueOf(123456789), 5), cursor.getBigDecimal("col0"));
}
}
}
@Test
public void testGetShort() throws Exception {
try (
Cursor cursor = new SimpleCursor(
QueryResult.newBuilder()
.addFields(Field.newBuilder().setName("col0").setType(Field.Type.TYPE_YEAR).build())
.addRows(Row.newBuilder().addValues(ByteString.copyFromUtf8("1234")))
.build())) {
cursor.next();
Assert.assertEquals(1234, cursor.getShort("col0"));
}
}
@Test
public void testNull() throws Exception {
try (
Cursor cursor = new SimpleCursor(
QueryResult.newBuilder()
.addFields(Field.newBuilder().setName("col0").setType(Field.Type.TYPE_NULL).build())
.addRows(Row.newBuilder().addValues(ByteString.copyFromUtf8("1234")))
.build())) {
cursor.next();
Assert.assertEquals(null, cursor.getObject("col0"));
}
}
}

Просмотреть файл

@ -3,8 +3,6 @@ package com.youtube.vitess.client.grpc;
import com.youtube.vitess.client.Context;
import com.youtube.vitess.client.RpcClient;
import com.youtube.vitess.client.StreamIterator;
import com.youtube.vitess.client.VitessException;
import com.youtube.vitess.client.VitessRpcException;
import com.youtube.vitess.proto.Query.QueryResult;
import com.youtube.vitess.proto.Vtgate.BeginRequest;
import com.youtube.vitess.proto.Vtgate.BeginResponse;
@ -45,6 +43,7 @@ import com.youtube.vitess.proto.grpc.VitessGrpc.VitessStub;
import io.grpc.ChannelImpl;
import java.io.IOException;
import java.sql.SQLException;
/**
* GrpcClient is a gRPC-based implementation of Vitess Rpcclient.
@ -66,210 +65,197 @@ public class GrpcClient implements RpcClient {
}
@Override
public ExecuteResponse execute(Context ctx, ExecuteRequest request)
throws VitessException, VitessRpcException {
public ExecuteResponse execute(Context ctx, ExecuteRequest request) throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
return blockingStub.execute(request);
} catch (Exception e) {
checkGrpcError(e);
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
@Override
public ExecuteShardsResponse executeShards(Context ctx, ExecuteShardsRequest request)
throws VitessException, VitessRpcException {
throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
return blockingStub.executeShards(request);
} catch (Exception e) {
checkGrpcError(e);
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
@Override
public ExecuteKeyspaceIdsResponse executeKeyspaceIds(
Context ctx, ExecuteKeyspaceIdsRequest request) throws VitessException, VitessRpcException {
Context ctx, ExecuteKeyspaceIdsRequest request) throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
return blockingStub.executeKeyspaceIds(request);
} catch (Exception e) {
checkGrpcError(e);
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
@Override
public ExecuteKeyRangesResponse executeKeyRanges(Context ctx, ExecuteKeyRangesRequest request)
throws VitessException, VitessRpcException {
throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
return blockingStub.executeKeyRanges(request);
} catch (Exception e) {
checkGrpcError(e);
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
@Override
public ExecuteEntityIdsResponse executeEntityIds(Context ctx, ExecuteEntityIdsRequest request)
throws VitessException, VitessRpcException {
throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
return blockingStub.executeEntityIds(request);
} catch (Exception e) {
checkGrpcError(e);
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
@Override
public ExecuteBatchShardsResponse executeBatchShards(
Context ctx, ExecuteBatchShardsRequest request) throws VitessException, VitessRpcException {
Context ctx, ExecuteBatchShardsRequest request) throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
return blockingStub.executeBatchShards(request);
} catch (Exception e) {
checkGrpcError(e);
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
@Override
public ExecuteBatchKeyspaceIdsResponse executeBatchKeyspaceIds(
Context ctx, ExecuteBatchKeyspaceIdsRequest request)
throws VitessException, VitessRpcException {
Context ctx, ExecuteBatchKeyspaceIdsRequest request) throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
return blockingStub.executeBatchKeyspaceIds(request);
} catch (Exception e) {
checkGrpcError(e);
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
@Override
public StreamIterator<QueryResult> streamExecute(Context ctx, StreamExecuteRequest request)
throws VitessRpcException {
throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
GrpcStreamAdapter<StreamExecuteResponse, QueryResult> adapter =
new GrpcStreamAdapter<StreamExecuteResponse, QueryResult>() {
@Override
QueryResult getResult(StreamExecuteResponse response) throws VitessException {
QueryResult getResult(StreamExecuteResponse response) throws SQLException {
return response.getResult();
}
};
asyncStub.streamExecute(request, adapter);
return adapter;
} catch (Exception e) {
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
@Override
public StreamIterator<QueryResult> streamExecuteShards(
Context ctx, StreamExecuteShardsRequest request) throws VitessRpcException {
Context ctx, StreamExecuteShardsRequest request) throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
GrpcStreamAdapter<StreamExecuteShardsResponse, QueryResult> adapter =
new GrpcStreamAdapter<StreamExecuteShardsResponse, QueryResult>() {
@Override
QueryResult getResult(StreamExecuteShardsResponse response) throws VitessException {
QueryResult getResult(StreamExecuteShardsResponse response) throws SQLException {
return response.getResult();
}
};
asyncStub.streamExecuteShards(request, adapter);
return adapter;
} catch (Exception e) {
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
@Override
public StreamIterator<QueryResult> streamExecuteKeyspaceIds(
Context ctx, StreamExecuteKeyspaceIdsRequest request) throws VitessRpcException {
Context ctx, StreamExecuteKeyspaceIdsRequest request) throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
GrpcStreamAdapter<StreamExecuteKeyspaceIdsResponse, QueryResult> adapter =
new GrpcStreamAdapter<StreamExecuteKeyspaceIdsResponse, QueryResult>() {
@Override
QueryResult getResult(StreamExecuteKeyspaceIdsResponse response)
throws VitessException {
QueryResult getResult(StreamExecuteKeyspaceIdsResponse response) throws SQLException {
return response.getResult();
}
};
asyncStub.streamExecuteKeyspaceIds(request, adapter);
return adapter;
} catch (Exception e) {
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
@Override
public StreamIterator<QueryResult> streamExecuteKeyRanges(
Context ctx, StreamExecuteKeyRangesRequest request) throws VitessRpcException {
Context ctx, StreamExecuteKeyRangesRequest request) throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
GrpcStreamAdapter<StreamExecuteKeyRangesResponse, QueryResult> adapter =
new GrpcStreamAdapter<StreamExecuteKeyRangesResponse, QueryResult>() {
@Override
QueryResult getResult(StreamExecuteKeyRangesResponse response) throws VitessException {
QueryResult getResult(StreamExecuteKeyRangesResponse response) throws SQLException {
return response.getResult();
}
};
asyncStub.streamExecuteKeyRanges(request, adapter);
return adapter;
} catch (Exception e) {
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
@Override
public BeginResponse begin(Context ctx, BeginRequest request) throws VitessRpcException {
public BeginResponse begin(Context ctx, BeginRequest request) throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
return blockingStub.begin(request);
} catch (Exception e) {
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
@Override
public CommitResponse commit(Context ctx, CommitRequest request) throws VitessRpcException {
public CommitResponse commit(Context ctx, CommitRequest request) throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
return blockingStub.commit(request);
} catch (Exception e) {
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
@Override
public RollbackResponse rollback(Context ctx, RollbackRequest request) throws VitessRpcException {
public RollbackResponse rollback(Context ctx, RollbackRequest request) throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
return blockingStub.rollback(request);
} catch (Exception e) {
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
@Override
public SplitQueryResponse splitQuery(Context ctx, SplitQueryRequest request)
throws VitessRpcException {
public SplitQueryResponse splitQuery(Context ctx, SplitQueryRequest request) throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
return blockingStub.splitQuery(request);
} catch (Exception e) {
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
@Override
public GetSrvKeyspaceResponse getSrvKeyspace(Context ctx, GetSrvKeyspaceRequest request)
throws VitessRpcException {
throws SQLException {
try (GrpcContext gctx = new GrpcContext(ctx)) {
return blockingStub.getSrvKeyspace(request);
} catch (Exception e) {
throw new VitessRpcException("grpc error", e);
throw convertGrpcError(e);
}
}
/**
* checkGrpcError converts an exception from the gRPC framework into
* a VitessException if it represents an app-level Vitess error.
* @param e
* @throws VitessException
* Converts an exception from the gRPC framework into the appropriate {@link SQLException}
*/
private void checkGrpcError(Exception e) throws VitessException {
// TODO(enisoc): Implement checkGrpcError.
private SQLException convertGrpcError(Exception e) {
// TODO(enisoc): Implement convertGrpcError.
return (SQLException) e;
}
}

Просмотреть файл

@ -1,40 +1,48 @@
package com.youtube.vitess.client.grpc;
import com.youtube.vitess.client.StreamIterator;
import com.youtube.vitess.client.VitessException;
import com.youtube.vitess.client.VitessRpcException;
import io.grpc.stub.StreamObserver;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.util.NoSuchElementException;
/**
* GrpcStreamAdapter is an implementation of StreamIterator that allows
* iteration (with checked exceptions) over results obtained through the
* gRPC StreamObserver interface.
* A {@link StreamIterator} that returns results provided by a gRPC {@link StreamObserver}
*
* <p>This class is abstract because it needs to be told how to extract the
* result (e.g. QueryResult) from a given RPC response (e.g. StreamExecuteResponse).
* Callers must therefore implement getResult() when instantiating this class.
* <p>This adapter allows iteration (with checked exceptions) over results obtained through the
* gRPC {@code StreamObserver} interface.
*
* @param <V> The type of value sent through the StreamObserver interface.
* @param <E> The type of value to return through the StreamIterator interface.
* <p>This class is abstract because it needs to be told how to extract the result
* (e.g. {@link com.youtube.vitess.proto.Query.QueryResult QueryResult}) from a given RPC response
* (e.g. {@link com.youtube.vitess.proto.Vtgate.StreamExecuteResponse StreamExecuteResponse}).
* Callers must therefore implement {@link #getResult(Object)} when instantiating this class.
*
* <p>The {@code StreamObserver} side will block until the result has been returned to the consumer
* by the {@code StreamIterator} side. Therefore, the {@link #close()} method must be called when
* done, to unblock the {@code StreamObserver} side.
*
* @param <V> The type of value sent through the {@link StreamObserver} interface.
* @param <E> The type of value to return through the {@link StreamIterator} interface.
*/
abstract class GrpcStreamAdapter<V, E> implements StreamObserver<V>, StreamIterator<E> {
abstract class GrpcStreamAdapter<V, E>
implements StreamObserver<V>, StreamIterator<E>, AutoCloseable {
/**
* getResult must be implemented to tell the adapter how to convert from
* the StreamObserver value type (V) to the StreamIterator value type (E).
* Before converting, getResult() should check for application-level errors
* in the RPC response and throw VitessException.
* in the RPC response and throw the appropriate SQLException.
* @param value The RPC response object.
* @return The result object to pass to the iterator consumer.
* @throws VitessException For errors originating within the Vitess server.
* @throws SQLException For errors originating within the Vitess server.
*/
abstract E getResult(V value) throws VitessException;
abstract E getResult(V value) throws SQLException;
private E nextValue;
private Throwable error;
private boolean completed = false;
private boolean closed = false;
@Override
public void onValue(V value) {
@ -42,8 +50,9 @@ abstract class GrpcStreamAdapter<V, E> implements StreamObserver<V>, StreamItera
try {
// Wait until the previous value has been consumed.
while (nextValue != null) {
// If there's been an error, drain the rest of the stream without blocking.
if (error != null) {
// If there's been an error, or the iterator was closed, drain the rest of the stream
// without blocking.
if (closed || error != null) {
return;
}
@ -54,7 +63,7 @@ abstract class GrpcStreamAdapter<V, E> implements StreamObserver<V>, StreamItera
notifyAll();
} catch (InterruptedException e) {
onError(e);
} catch (VitessException e) {
} catch (SQLException e) {
onError(e);
}
}
@ -71,13 +80,13 @@ abstract class GrpcStreamAdapter<V, E> implements StreamObserver<V>, StreamItera
@Override
public void onError(Throwable error) {
synchronized (this) {
error = error;
this.error = error;
notifyAll();
}
}
@Override
public boolean hasNext() throws VitessException, VitessRpcException {
public boolean hasNext() throws SQLException {
synchronized (this) {
try {
// Wait for a new value to show up.
@ -86,10 +95,13 @@ abstract class GrpcStreamAdapter<V, E> implements StreamObserver<V>, StreamItera
return false;
}
if (error != null) {
if (error instanceof VitessException) {
throw (VitessException) error;
// We got an error from the gRPC layer.
if (error instanceof SQLException) {
// If it's a type we can throw, just pass it through.
throw (SQLException) error;
} else {
throw new VitessRpcException("error in gRPC StreamIterator", error);
// Otherwise wrap it in a type we can throw.
throw new SQLException("error in gRPC StreamIterator", error);
}
}
@ -99,13 +111,13 @@ abstract class GrpcStreamAdapter<V, E> implements StreamObserver<V>, StreamItera
return true;
} catch (InterruptedException e) {
onError(e);
throw new VitessRpcException("error in gRPC StreamIterator", e);
throw new SQLDataException("gRPC StreamIterator interrupted while waiting for value", e);
}
}
}
@Override
public E next() throws NoSuchElementException, VitessException, VitessRpcException {
public E next() throws NoSuchElementException, SQLException {
synchronized (this) {
if (hasNext()) {
E value = nextValue;
@ -117,4 +129,11 @@ abstract class GrpcStreamAdapter<V, E> implements StreamObserver<V>, StreamItera
}
}
}
@Override
public void close() throws Exception {
synchronized (this) {
closed = true;
}
}
}