deprecation: delete v2 java code

Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Sugu Sougoumarane 2020-03-23 22:14:13 -07:00
Родитель 186247f9a2
Коммит 93852a412f
10 изменённых файлов: 16 добавлений и 1780 удалений

Просмотреть файл

@ -110,6 +110,19 @@ func (c *echoClient) Execute(ctx context.Context, session *vtgatepb.Session, sql
return c.fallbackClient.Execute(ctx, session, sql, bindVariables)
}
func (c *echoClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error {
if strings.HasPrefix(sql, EchoPrefix) {
callback(echoQueryResult(map[string]interface{}{
"callerId": callerid.EffectiveCallerIDFromContext(ctx),
"query": sql,
"bindVars": bindVariables,
"session": session,
}))
return nil
}
return c.fallbackClient.StreamExecute(ctx, session, sql, bindVariables, callback)
}
func (c *echoClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Session, sqlList []string, bindVariablesList []map[string]*querypb.BindVariable) (*vtgatepb.Session, []sqltypes.QueryResponse, error) {
if len(sqlList) > 0 && strings.HasPrefix(sqlList[0], EchoPrefix) {
var queryResponse []sqltypes.QueryResponse

Просмотреть файл

@ -20,32 +20,9 @@ import com.google.common.util.concurrent.ListenableFuture;
import io.vitess.proto.Query.QueryResult;
import io.vitess.proto.Vtgate;
import io.vitess.proto.Vtgate.BeginRequest;
import io.vitess.proto.Vtgate.BeginResponse;
import io.vitess.proto.Vtgate.CommitRequest;
import io.vitess.proto.Vtgate.CommitResponse;
import io.vitess.proto.Vtgate.ExecuteBatchKeyspaceIdsRequest;
import io.vitess.proto.Vtgate.ExecuteBatchKeyspaceIdsResponse;
import io.vitess.proto.Vtgate.ExecuteBatchShardsRequest;
import io.vitess.proto.Vtgate.ExecuteBatchShardsResponse;
import io.vitess.proto.Vtgate.ExecuteEntityIdsRequest;
import io.vitess.proto.Vtgate.ExecuteEntityIdsResponse;
import io.vitess.proto.Vtgate.ExecuteKeyRangesRequest;
import io.vitess.proto.Vtgate.ExecuteKeyRangesResponse;
import io.vitess.proto.Vtgate.ExecuteKeyspaceIdsRequest;
import io.vitess.proto.Vtgate.ExecuteKeyspaceIdsResponse;
import io.vitess.proto.Vtgate.ExecuteRequest;
import io.vitess.proto.Vtgate.ExecuteResponse;
import io.vitess.proto.Vtgate.ExecuteShardsRequest;
import io.vitess.proto.Vtgate.ExecuteShardsResponse;
import io.vitess.proto.Vtgate.GetSrvKeyspaceRequest;
import io.vitess.proto.Vtgate.GetSrvKeyspaceResponse;
import io.vitess.proto.Vtgate.RollbackRequest;
import io.vitess.proto.Vtgate.RollbackResponse;
import io.vitess.proto.Vtgate.StreamExecuteKeyRangesRequest;
import io.vitess.proto.Vtgate.StreamExecuteKeyspaceIdsRequest;
import io.vitess.proto.Vtgate.StreamExecuteRequest;
import io.vitess.proto.Vtgate.StreamExecuteShardsRequest;
import io.vitess.proto.Vtgate.VStreamRequest;
import io.vitess.proto.Vtgate.VStreamResponse;
@ -67,46 +44,6 @@ public interface RpcClient extends Closeable {
ListenableFuture<ExecuteResponse> execute(Context ctx, ExecuteRequest request)
throws SQLException;
/**
* Sends a single query to a set of shards.
*
* <p>See the
* <a href="https://github.com/vitessio/vitess/blob/master/proto/vtgateservice.proto">proto</a>
* definition for canonical documentation on this VTGate API.
*/
ListenableFuture<ExecuteShardsResponse> executeShards(Context ctx, ExecuteShardsRequest request)
throws SQLException;
/**
* Sends a query with a set of keyspace IDs.
*
* <p>See the
* <a href="https://github.com/vitessio/vitess/blob/master/proto/vtgateservice.proto">proto</a>
* definition for canonical documentation on this VTGate API.
*/
ListenableFuture<ExecuteKeyspaceIdsResponse> executeKeyspaceIds(
Context ctx, ExecuteKeyspaceIdsRequest request) throws SQLException;
/**
* Sends a query with a set of key ranges.
*
* <p>See the
* <a href="https://github.com/vitessio/vitess/blob/master/proto/vtgateservice.proto">proto</a>
* definition for canonical documentation on this VTGate API.
*/
ListenableFuture<ExecuteKeyRangesResponse> executeKeyRanges(
Context ctx, ExecuteKeyRangesRequest request) throws SQLException;
/**
* Sends a query with a set of entity IDs.
*
* <p>See the
* <a href="https://github.com/vitessio/vitess/blob/master/proto/vtgateservice.proto">proto</a>
* definition for canonical documentation on this VTGate API.
*/
ListenableFuture<ExecuteEntityIdsResponse> executeEntityIds(
Context ctx, ExecuteEntityIdsRequest request) throws SQLException;
/**
* Sends a list of queries using the VTGate V3 API.
*
@ -118,26 +55,6 @@ public interface RpcClient extends Closeable {
Vtgate.ExecuteBatchRequest request)
throws SQLException;
/**
* Sends a list of queries to a set of shards.
*
* <p>See the
* <a href="https://github.com/vitessio/vitess/blob/master/proto/vtgateservice.proto">proto</a>
* definition for canonical documentation on this VTGate API.
*/
ListenableFuture<ExecuteBatchShardsResponse> executeBatchShards(
Context ctx, ExecuteBatchShardsRequest request) throws SQLException;
/**
* Sends a list of queries with keyspace ids as bind variables.
*
* <p>See the
* <a href="https://github.com/vitessio/vitess/blob/master/proto/vtgateservice.proto">proto</a>
* definition for canonical documentation on this VTGate API.
*/
ListenableFuture<ExecuteBatchKeyspaceIdsResponse> executeBatchKeyspaceIds(
Context ctx, ExecuteBatchKeyspaceIdsRequest request) throws SQLException;
/**
* Starts stream queries with the VTGate V3 API.
*
@ -153,89 +70,6 @@ public interface RpcClient extends Closeable {
StreamIterator<QueryResult> streamExecute(Context ctx, StreamExecuteRequest request)
throws SQLException;
/**
* Starts stream queries with multiple shards.
*
* <p>Note: Streaming queries are not asynchronous, because they typically shouldn't
* be used from a latency-critical serving path anyway. This method will return as soon as the
* request is initiated, but StreamIterator methods will block until the next chunk of results is
* received from the server.
*
* <p>See the
* <a href="https://github.com/vitessio/vitess/blob/master/proto/vtgateservice.proto">proto</a>
* definition for canonical documentation on this VTGate API.
*/
StreamIterator<QueryResult> streamExecuteShards(Context ctx, StreamExecuteShardsRequest request)
throws SQLException;
/**
* Starts a list of stream queries with keyspace ids as bind variables.
*
* <p>Note: Streaming queries are not asynchronous, because they typically shouldn't
* be used from a latency-critical serving path anyway. This method will return as soon as the
* request is initiated, but StreamIterator methods will block until the next chunk of results is
* received from the server.
*
* <p>See the
* <a href="https://github.com/vitessio/vitess/blob/master/proto/vtgateservice.proto">proto</a>
* definition for canonical documentation on this VTGate API.
*/
StreamIterator<QueryResult> streamExecuteKeyspaceIds(
Context ctx, StreamExecuteKeyspaceIdsRequest request) throws SQLException;
/**
* Starts stream query with a set of key ranges.
*
* <p>Note: Streaming queries are not asynchronous, because they typically shouldn't
* be used from a latency-critical serving path anyway. This method will return as soon as the
* request is initiated, but StreamIterator methods will block until the next chunk of results is
* received from the server.
*
* <p>See the
* <a href="https://github.com/vitessio/vitess/blob/master/proto/vtgateservice.proto">proto</a>
* definition for canonical documentation on this VTGate API.
*/
StreamIterator<QueryResult> streamExecuteKeyRanges(
Context ctx, StreamExecuteKeyRangesRequest request) throws SQLException;
/**
* Starts a transaction.
*
* <p>See the
* <a href="https://github.com/vitessio/vitess/blob/master/proto/vtgateservice.proto">proto</a>
* definition for canonical documentation on this VTGate API.
*/
ListenableFuture<BeginResponse> begin(Context ctx, BeginRequest request) throws SQLException;
/**
* Commits a transaction.
*
* <p>See the
* <a href="https://github.com/vitessio/vitess/blob/master/proto/vtgateservice.proto">proto</a>
* definition for canonical documentation on this VTGate API.
*/
ListenableFuture<CommitResponse> commit(Context ctx, CommitRequest request) throws SQLException;
/**
* Rolls back a pending transaction.
*
* <p>See the
* <a href="https://github.com/vitessio/vitess/blob/master/proto/vtgateservice.proto">proto</a>
* definition for canonical documentation on this VTGate API.
*/
ListenableFuture<RollbackResponse> rollback(Context ctx, RollbackRequest request)
throws SQLException;
/**
* Returns a list of serving keyspaces.
*
* <p>See the
* <a href="https://github.com/vitessio/vitess/blob/master/proto/vtgateservice.proto">proto</a>
* definition for canonical documentation on this VTGate API.
*/
ListenableFuture<GetSrvKeyspaceResponse> getSrvKeyspace(
Context ctx, GetSrvKeyspaceRequest request) throws SQLException;
/**
* Starts streaming the vstream binlog events.
*

Просмотреть файл

@ -19,11 +19,7 @@ package io.vitess.client;
import io.vitess.client.cursor.Cursor;
import io.vitess.client.cursor.CursorWithError;
import io.vitess.proto.Query;
import io.vitess.proto.Topodata.KeyRange;
import io.vitess.proto.Topodata.SrvKeyspace;
import io.vitess.proto.Topodata.TabletType;
import io.vitess.proto.Vtgate.BoundKeyspaceIdQuery;
import io.vitess.proto.Vtgate.BoundShardQuery;
import java.io.Closeable;
import java.io.IOException;
@ -75,39 +71,6 @@ public class VTGateBlockingConn implements Closeable {
return conn.execute(ctx, query, bindVars, tabletType, includedFields).checkedGet();
}
public Cursor executeShards(Context ctx, String query, String keyspace, Iterable<String> shards,
Map<String, ?> bindVars, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields) throws SQLException {
return conn.executeShards(ctx, query, keyspace, shards, bindVars, tabletType, includedFields)
.checkedGet();
}
public Cursor executeKeyspaceIds(Context ctx, String query, String keyspace,
Iterable<byte[]> keyspaceIds, Map<String, ?> bindVars, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return conn
.executeKeyspaceIds(ctx, query, keyspace, keyspaceIds, bindVars, tabletType, includedFields)
.checkedGet();
}
public Cursor executeKeyRanges(Context ctx, String query, String keyspace,
Iterable<? extends KeyRange> keyRanges, Map<String, ?> bindVars, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return conn
.executeKeyRanges(ctx, query, keyspace, keyRanges, bindVars, tabletType, includedFields)
.checkedGet();
}
public Cursor executeEntityIds(Context ctx, String query, String keyspace,
String entityColumnName, Map<byte[], ?> entityKeyspaceIds, Map<String, ?> bindVars,
TabletType tabletType, Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return conn.executeEntityIds(ctx, query, keyspace, entityColumnName, entityKeyspaceIds,
bindVars, tabletType, includedFields).checkedGet();
}
public List<CursorWithError> executeBatch(Context ctx, ArrayList<String> queryList,
@Nullable ArrayList<Map<String, ?>> bindVarsList, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields) throws SQLException {
@ -123,76 +86,12 @@ public class VTGateBlockingConn implements Closeable {
.checkedGet();
}
/**
* Execute multiple keyspace ID queries as a batch.
*
* @param asTransaction If true, automatically create a transaction (per shard) that encloses
* all the batch queries.
*/
public List<Cursor> executeBatchShards(Context ctx, Iterable<? extends BoundShardQuery> queries,
TabletType tabletType, boolean asTransaction,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return conn.executeBatchShards(ctx, queries, tabletType, asTransaction, includedFields)
.checkedGet();
}
/**
* Execute multiple keyspace ID queries as a batch.
*
* @param asTransaction If true, automatically create a transaction (per shard) that encloses
* all the batch queries.
*/
public List<Cursor> executeBatchKeyspaceIds(Context ctx,
Iterable<? extends BoundKeyspaceIdQuery> queries, TabletType tabletType,
boolean asTransaction, Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return conn.executeBatchKeyspaceIds(ctx, queries, tabletType, asTransaction, includedFields)
.checkedGet();
}
public Cursor streamExecute(Context ctx, String query, Map<String, ?> bindVars,
TabletType tabletType, Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return conn.streamExecute(ctx, query, bindVars, tabletType, includedFields);
}
public Cursor streamExecuteShards(Context ctx, String query, String keyspace,
Iterable<String> shards, Map<String, ?> bindVars, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return conn
.streamExecuteShards(ctx, query, keyspace, shards, bindVars, tabletType, includedFields);
}
public Cursor streamExecuteKeyspaceIds(Context ctx, String query, String keyspace,
Iterable<byte[]> keyspaceIds, Map<String, ?> bindVars, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return conn.streamExecuteKeyspaceIds(ctx, query, keyspace, keyspaceIds, bindVars, tabletType,
includedFields);
}
public Cursor streamExecuteKeyRanges(Context ctx, String query, String keyspace,
Iterable<? extends KeyRange> keyRanges, Map<String, ?> bindVars, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return conn.streamExecuteKeyRanges(ctx, query, keyspace, keyRanges, bindVars, tabletType,
includedFields);
}
public VTGateBlockingTx begin(Context ctx) throws SQLException {
return begin(ctx, false);
}
public VTGateBlockingTx begin(Context ctx, boolean singleDB) throws SQLException {
return new VTGateBlockingTx(conn.begin(ctx, singleDB).checkedGet());
}
public SrvKeyspace getSrvKeyspace(Context ctx, String keyspace) throws SQLException {
return conn.getSrvKeyspace(ctx, keyspace).checkedGet();
}
@Override
public void close() throws IOException {
conn.close();

Просмотреть файл

@ -1,146 +0,0 @@
/*
* Copyright 2019 The Vitess Authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.vitess.client;
import io.vitess.client.cursor.Cursor;
import io.vitess.client.cursor.CursorWithError;
import io.vitess.proto.Query;
import io.vitess.proto.Topodata.KeyRange;
import io.vitess.proto.Topodata.TabletType;
import io.vitess.proto.Vtgate.BoundKeyspaceIdQuery;
import io.vitess.proto.Vtgate.BoundShardQuery;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
/**
* A synchronous wrapper around a VTGate transaction.
*
* <p>This is a wrapper around the asynchronous {@link VTGateTx} class
* that converts all methods to synchronous.
*/
@Deprecated
public class VTGateBlockingTx {
private final VTGateTx tx;
/**
* Wraps an existing {@link VTGateTx} in a synchronous API.
*/
public VTGateBlockingTx(VTGateTx tx) {
this.tx = tx;
}
public Cursor execute(Context ctx, String query, Map<String, ?> bindVars, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return tx.execute(ctx, query, bindVars, tabletType, includedFields).checkedGet();
}
public Cursor executeShards(
Context ctx,
String query,
String keyspace,
Iterable<String> shards,
Map<String, ?> bindVars,
TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return tx.executeShards(ctx, query, keyspace, shards, bindVars, tabletType, includedFields)
.checkedGet();
}
public Cursor executeKeyspaceIds(
Context ctx,
String query,
String keyspace,
Iterable<byte[]> keyspaceIds,
Map<String, ?> bindVars,
TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return tx
.executeKeyspaceIds(ctx, query, keyspace, keyspaceIds, bindVars, tabletType, includedFields)
.checkedGet();
}
public Cursor executeKeyRanges(
Context ctx,
String query,
String keyspace,
Iterable<? extends KeyRange> keyRanges,
Map<String, ?> bindVars,
TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return tx
.executeKeyRanges(ctx, query, keyspace, keyRanges, bindVars, tabletType, includedFields)
.checkedGet();
}
public Cursor executeEntityIds(
Context ctx,
String query,
String keyspace,
String entityColumnName,
Map<byte[], ?> entityKeyspaceIds,
Map<String, ?> bindVars,
TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return tx.executeEntityIds(
ctx, query, keyspace, entityColumnName, entityKeyspaceIds, bindVars, tabletType,
includedFields)
.checkedGet();
}
public List<CursorWithError> executeBatch(Context ctx, List<String> queryList,
@Nullable List<Map<String, ?>> bindVarsList, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return tx.executeBatch(ctx, queryList, bindVarsList, tabletType, includedFields).checkedGet();
}
public List<Cursor> executeBatchShards(
Context ctx, Iterable<? extends BoundShardQuery> queries, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return tx.executeBatchShards(ctx, queries, tabletType, includedFields).checkedGet();
}
public List<Cursor> executeBatchKeyspaceIds(
Context ctx, Iterable<? extends BoundKeyspaceIdQuery> queries, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
return tx.executeBatchKeyspaceIds(ctx, queries, tabletType, includedFields).checkedGet();
}
public void commit(Context ctx) throws SQLException {
commit(ctx, false);
}
public void commit(Context ctx, boolean atomic) throws SQLException {
tx.commit(ctx, atomic).checkedGet();
}
public void rollback(Context ctx) throws SQLException {
tx.rollback(ctx).checkedGet();
}
}

Просмотреть файл

@ -30,34 +30,11 @@ import io.vitess.client.cursor.CursorWithError;
import io.vitess.client.cursor.SimpleCursor;
import io.vitess.client.cursor.StreamCursor;
import io.vitess.proto.Query;
import io.vitess.proto.Topodata.KeyRange;
import io.vitess.proto.Topodata.SrvKeyspace;
import io.vitess.proto.Topodata.TabletType;
import io.vitess.proto.Vtgate;
import io.vitess.proto.Vtgate.BeginRequest;
import io.vitess.proto.Vtgate.BeginResponse;
import io.vitess.proto.Vtgate.BoundKeyspaceIdQuery;
import io.vitess.proto.Vtgate.BoundShardQuery;
import io.vitess.proto.Vtgate.ExecuteBatchKeyspaceIdsRequest;
import io.vitess.proto.Vtgate.ExecuteBatchKeyspaceIdsResponse;
import io.vitess.proto.Vtgate.ExecuteBatchShardsRequest;
import io.vitess.proto.Vtgate.ExecuteBatchShardsResponse;
import io.vitess.proto.Vtgate.ExecuteEntityIdsRequest;
import io.vitess.proto.Vtgate.ExecuteEntityIdsResponse;
import io.vitess.proto.Vtgate.ExecuteKeyRangesRequest;
import io.vitess.proto.Vtgate.ExecuteKeyRangesResponse;
import io.vitess.proto.Vtgate.ExecuteKeyspaceIdsRequest;
import io.vitess.proto.Vtgate.ExecuteKeyspaceIdsResponse;
import io.vitess.proto.Vtgate.ExecuteRequest;
import io.vitess.proto.Vtgate.ExecuteResponse;
import io.vitess.proto.Vtgate.ExecuteShardsRequest;
import io.vitess.proto.Vtgate.ExecuteShardsResponse;
import io.vitess.proto.Vtgate.GetSrvKeyspaceRequest;
import io.vitess.proto.Vtgate.GetSrvKeyspaceResponse;
import io.vitess.proto.Vtgate.StreamExecuteKeyRangesRequest;
import io.vitess.proto.Vtgate.StreamExecuteKeyspaceIdsRequest;
import io.vitess.proto.Vtgate.StreamExecuteRequest;
import io.vitess.proto.Vtgate.StreamExecuteShardsRequest;
import java.io.Closeable;
import java.io.IOException;
@ -141,130 +118,6 @@ public final class VTGateConn implements Closeable {
directExecutor()));
}
public SQLFuture<Cursor> executeShards(Context ctx, String query, String keyspace,
Iterable<String> shards, @Nullable Map<String, ?> bindVars, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
ExecuteShardsRequest.Builder requestBuilder =
ExecuteShardsRequest.newBuilder()
.setQuery(Proto.bindQuery(checkNotNull(query), bindVars))
.setKeyspace(checkNotNull(keyspace))
.addAllShards(checkNotNull(shards))
.setTabletType(checkNotNull(tabletType))
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
return new SQLFuture<Cursor>(
transformAsync(
client.executeShards(ctx, requestBuilder.build()),
new AsyncFunction<ExecuteShardsResponse, Cursor>() {
@Override
public ListenableFuture<Cursor> apply(ExecuteShardsResponse response)
throws Exception {
Proto.checkError(response.getError());
return Futures.<Cursor>immediateFuture(new SimpleCursor(response.getResult()));
}
},
directExecutor()));
}
public SQLFuture<Cursor> executeKeyspaceIds(Context ctx, String query, String keyspace,
Iterable<byte[]> keyspaceIds, @Nullable Map<String, ?> bindVars, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
ExecuteKeyspaceIdsRequest.Builder requestBuilder = ExecuteKeyspaceIdsRequest.newBuilder()
.setQuery(Proto.bindQuery(checkNotNull(query), bindVars))
.setKeyspace(checkNotNull(keyspace))
.addAllKeyspaceIds(
Iterables.transform(checkNotNull(keyspaceIds), Proto.BYTE_ARRAY_TO_BYTE_STRING))
.setTabletType(checkNotNull(tabletType))
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
return new SQLFuture<Cursor>(
transformAsync(
client.executeKeyspaceIds(ctx, requestBuilder.build()),
new AsyncFunction<ExecuteKeyspaceIdsResponse, Cursor>() {
@Override
public ListenableFuture<Cursor> apply(ExecuteKeyspaceIdsResponse response)
throws Exception {
Proto.checkError(response.getError());
return Futures.<Cursor>immediateFuture(new SimpleCursor(response.getResult()));
}
},
directExecutor()));
}
public SQLFuture<Cursor> executeKeyRanges(Context ctx, String query, String keyspace,
Iterable<? extends KeyRange> keyRanges, @Nullable Map<String, ?> bindVars,
TabletType tabletType, Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
ExecuteKeyRangesRequest.Builder requestBuilder = ExecuteKeyRangesRequest.newBuilder()
.setQuery(Proto.bindQuery(checkNotNull(query), bindVars))
.setKeyspace(checkNotNull(keyspace))
.addAllKeyRanges(checkNotNull(keyRanges))
.setTabletType(checkNotNull(tabletType))
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
return new SQLFuture<Cursor>(
transformAsync(
client.executeKeyRanges(ctx, requestBuilder.build()),
new AsyncFunction<ExecuteKeyRangesResponse, Cursor>() {
@Override
public ListenableFuture<Cursor> apply(ExecuteKeyRangesResponse response)
throws Exception {
Proto.checkError(response.getError());
return Futures.<Cursor>immediateFuture(new SimpleCursor(response.getResult()));
}
},
directExecutor()));
}
public SQLFuture<Cursor> executeEntityIds(Context ctx, String query, String keyspace,
String entityColumnName, Map<byte[], ?> entityKeyspaceIds, @Nullable Map<String, ?> bindVars,
TabletType tabletType, Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
ExecuteEntityIdsRequest.Builder requestBuilder = ExecuteEntityIdsRequest.newBuilder()
.setQuery(Proto.bindQuery(checkNotNull(query), bindVars))
.setKeyspace(checkNotNull(keyspace))
.setEntityColumnName(checkNotNull(entityColumnName))
.addAllEntityKeyspaceIds(Iterables
.transform(entityKeyspaceIds.entrySet(), Proto.MAP_ENTRY_TO_ENTITY_KEYSPACE_ID))
.setTabletType(checkNotNull(tabletType))
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
return new SQLFuture<Cursor>(
transformAsync(
client.executeEntityIds(ctx, requestBuilder.build()),
new AsyncFunction<ExecuteEntityIdsResponse, Cursor>() {
@Override
public ListenableFuture<Cursor> apply(ExecuteEntityIdsResponse response)
throws Exception {
Proto.checkError(response.getError());
return Futures.<Cursor>immediateFuture(new SimpleCursor(response.getResult()));
}
},
directExecutor()));
}
public SQLFuture<List<CursorWithError>> executeBatch(Context ctx, List<String> queryList,
@Nullable List<Map<String, ?>> bindVarsList, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields) throws SQLException {
@ -315,80 +168,6 @@ public final class VTGateConn implements Closeable {
directExecutor()));
}
/**
* Execute multiple keyspace ID queries as a batch.
*
* @param asTransaction If true, automatically create a transaction (per shard) that encloses
* all the batch queries.
*/
public SQLFuture<List<Cursor>> executeBatchShards(Context ctx,
Iterable<? extends BoundShardQuery> queries, TabletType tabletType, boolean asTransaction,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
ExecuteBatchShardsRequest.Builder requestBuilder =
ExecuteBatchShardsRequest.newBuilder()
.addAllQueries(checkNotNull(queries))
.setTabletType(checkNotNull(tabletType))
.setAsTransaction(asTransaction)
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
return new SQLFuture<List<Cursor>>(
transformAsync(
client.executeBatchShards(ctx, requestBuilder.build()),
new AsyncFunction<ExecuteBatchShardsResponse, List<Cursor>>() {
@Override
public ListenableFuture<List<Cursor>> apply(ExecuteBatchShardsResponse response)
throws Exception {
Proto.checkError(response.getError());
return Futures.<List<Cursor>>immediateFuture(
Proto.toCursorList(response.getResultsList()));
}
},
directExecutor()));
}
/**
* Execute multiple keyspace ID queries as a batch.
*
* @param asTransaction If true, automatically create a transaction (per shard) that encloses
* all the batch queries.
*/
public SQLFuture<List<Cursor>> executeBatchKeyspaceIds(Context ctx,
Iterable<? extends BoundKeyspaceIdQuery> queries, TabletType tabletType,
boolean asTransaction, Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
ExecuteBatchKeyspaceIdsRequest.Builder requestBuilder =
ExecuteBatchKeyspaceIdsRequest.newBuilder()
.addAllQueries(checkNotNull(queries))
.setTabletType(checkNotNull(tabletType))
.setAsTransaction(asTransaction)
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
return new SQLFuture<List<Cursor>>(
transformAsync(
client.executeBatchKeyspaceIds(ctx, requestBuilder.build()),
new AsyncFunction<ExecuteBatchKeyspaceIdsResponse, List<Cursor>>() {
@Override
public ListenableFuture<List<Cursor>> apply(ExecuteBatchKeyspaceIdsResponse response)
throws Exception {
Proto.checkError(response.getError());
return Futures.<List<Cursor>>immediateFuture(
Proto.toCursorList(response.getResultsList()));
}
},
directExecutor()));
}
public Cursor streamExecute(Context ctx, String query, @Nullable Map<String, ?> bindVars,
TabletType tabletType, Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
@ -407,104 +186,6 @@ public final class VTGateConn implements Closeable {
return new StreamCursor(client.streamExecute(ctx, requestBuilder.build()));
}
public Cursor streamExecuteShards(Context ctx, String query, String keyspace,
Iterable<String> shards, @Nullable Map<String, ?> bindVars, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
StreamExecuteShardsRequest.Builder requestBuilder = StreamExecuteShardsRequest.newBuilder()
.setQuery(Proto.bindQuery(checkNotNull(query), bindVars))
.setKeyspace(checkNotNull(keyspace))
.addAllShards(checkNotNull(shards))
.setTabletType(checkNotNull(tabletType))
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
return new StreamCursor(client.streamExecuteShards(ctx, requestBuilder.build()));
}
public Cursor streamExecuteKeyspaceIds(Context ctx, String query, String keyspace,
Iterable<byte[]> keyspaceIds, @Nullable Map<String, ?> bindVars, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
StreamExecuteKeyspaceIdsRequest.Builder requestBuilder = StreamExecuteKeyspaceIdsRequest
.newBuilder()
.setQuery(Proto.bindQuery(checkNotNull(query), bindVars))
.setKeyspace(checkNotNull(keyspace))
.addAllKeyspaceIds(
Iterables.transform(checkNotNull(keyspaceIds), Proto.BYTE_ARRAY_TO_BYTE_STRING))
.setTabletType(checkNotNull(tabletType))
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
return new StreamCursor(client.streamExecuteKeyspaceIds(ctx, requestBuilder.build()));
}
public Cursor streamExecuteKeyRanges(Context ctx, String query, String keyspace,
Iterable<? extends KeyRange> keyRanges, @Nullable Map<String, ?> bindVars,
TabletType tabletType, Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
StreamExecuteKeyRangesRequest.Builder requestBuilder = StreamExecuteKeyRangesRequest
.newBuilder()
.setQuery(Proto.bindQuery(checkNotNull(query), bindVars))
.setKeyspace(checkNotNull(keyspace))
.addAllKeyRanges(checkNotNull(keyRanges))
.setTabletType(checkNotNull(tabletType))
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
return new StreamCursor(client.streamExecuteKeyRanges(ctx, requestBuilder.build()));
}
public SQLFuture<VTGateTx> begin(Context ctx) throws SQLException {
return begin(ctx, false);
}
public SQLFuture<VTGateTx> begin(Context ctx, boolean singleDB) throws SQLException {
BeginRequest.Builder requestBuilder = BeginRequest.newBuilder().setSingleDb(singleDB);
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
return new SQLFuture<VTGateTx>(
transformAsync(
client.begin(ctx, requestBuilder.build()),
new AsyncFunction<BeginResponse, VTGateTx>() {
@Override
public ListenableFuture<VTGateTx> apply(BeginResponse response) throws Exception {
return Futures.<VTGateTx>immediateFuture(
new VTGateTx(client, response.getSession(), keyspace));
}
},
directExecutor()));
}
public SQLFuture<SrvKeyspace> getSrvKeyspace(Context ctx, String keyspace) throws SQLException {
GetSrvKeyspaceRequest.Builder requestBuilder =
GetSrvKeyspaceRequest.newBuilder().setKeyspace(checkNotNull(keyspace));
return new SQLFuture<SrvKeyspace>(
transformAsync(
client.getSrvKeyspace(ctx, requestBuilder.build()),
new AsyncFunction<GetSrvKeyspaceResponse, SrvKeyspace>() {
@Override
public ListenableFuture<SrvKeyspace> apply(GetSrvKeyspaceResponse response)
throws Exception {
return Futures.<SrvKeyspace>immediateFuture(response.getSrvKeyspace());
}
},
directExecutor()));
}
@Override
public void close() throws IOException {
client.close();

Просмотреть файл

@ -1,456 +0,0 @@
/*
* Copyright 2019 The Vitess Authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.vitess.client;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.Futures.transformAsync;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.vitess.client.cursor.Cursor;
import io.vitess.client.cursor.CursorWithError;
import io.vitess.client.cursor.SimpleCursor;
import io.vitess.proto.Query;
import io.vitess.proto.Topodata.KeyRange;
import io.vitess.proto.Topodata.TabletType;
import io.vitess.proto.Vtgate;
import io.vitess.proto.Vtgate.BoundKeyspaceIdQuery;
import io.vitess.proto.Vtgate.BoundShardQuery;
import io.vitess.proto.Vtgate.CommitRequest;
import io.vitess.proto.Vtgate.CommitResponse;
import io.vitess.proto.Vtgate.ExecuteBatchKeyspaceIdsRequest;
import io.vitess.proto.Vtgate.ExecuteBatchKeyspaceIdsResponse;
import io.vitess.proto.Vtgate.ExecuteBatchShardsRequest;
import io.vitess.proto.Vtgate.ExecuteBatchShardsResponse;
import io.vitess.proto.Vtgate.ExecuteEntityIdsRequest;
import io.vitess.proto.Vtgate.ExecuteEntityIdsResponse;
import io.vitess.proto.Vtgate.ExecuteKeyRangesRequest;
import io.vitess.proto.Vtgate.ExecuteKeyRangesResponse;
import io.vitess.proto.Vtgate.ExecuteKeyspaceIdsRequest;
import io.vitess.proto.Vtgate.ExecuteKeyspaceIdsResponse;
import io.vitess.proto.Vtgate.ExecuteRequest;
import io.vitess.proto.Vtgate.ExecuteResponse;
import io.vitess.proto.Vtgate.ExecuteShardsRequest;
import io.vitess.proto.Vtgate.ExecuteShardsResponse;
import io.vitess.proto.Vtgate.RollbackRequest;
import io.vitess.proto.Vtgate.RollbackResponse;
import io.vitess.proto.Vtgate.Session;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
/**
* An asynchronous VTGate transaction session.
*
* <p>Because {@code VTGateTx} manages a session cookie, only one operation can be in flight at a
* time on a given instance. The methods are {@code synchronized} only because the session cookie is
* updated asynchronously when the RPC response comes back.
*
* <p>After calling any method that returns a {@link SQLFuture}, you must wait for that future to
* complete before calling any other methods on that {@code VTGateTx} instance. An {@link
* IllegalStateException} will be thrown if this constraint is violated.
*
* <p>All operations on {@code VTGateTx} are asynchronous, including those whose ultimate return
* type is {@link Void}, such as {@link #commit(Context)} and {@link #rollback(Context)}. You must
* still wait for the futures returned by these methods to complete and check the error on them
* (such as by calling {@code checkedGet()} before you can assume the operation has finished
* successfully.
*
* <p>If you prefer a synchronous API, you can use {@link VTGateBlockingConn#begin(Context)}, which
* returns a {@link VTGateBlockingTx} instead.
*/
@Deprecated
public class VTGateTx {
private final RpcClient client;
private final String keyspace;
private Session session;
private SQLFuture<?> lastCall;
VTGateTx(RpcClient client, Session session, String keyspace) {
this.client = checkNotNull(client);
this.keyspace = checkNotNull(keyspace);
setSession(checkNotNull(session));
}
public synchronized SQLFuture<Cursor> execute(Context ctx, String query, Map<String, ?> bindVars,
TabletType tabletType, Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
checkCallIsAllowed("execute");
ExecuteRequest.Builder requestBuilder =
ExecuteRequest.newBuilder()
.setQuery(Proto.bindQuery(query, bindVars))
.setKeyspaceShard(keyspace)
.setTabletType(tabletType)
.setSession(session)
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
SQLFuture<Cursor> call =
new SQLFuture<>(
transformAsync(
client.execute(ctx, requestBuilder.build()),
new AsyncFunction<ExecuteResponse, Cursor>() {
@Override
public ListenableFuture<Cursor> apply(ExecuteResponse response) throws Exception {
setSession(response.getSession());
Proto.checkError(response.getError());
return Futures.<Cursor>immediateFuture(new SimpleCursor(response.getResult()));
}
},
directExecutor()));
lastCall = call;
return call;
}
public synchronized SQLFuture<Cursor> executeShards(Context ctx, String query, String keyspace,
Iterable<String> shards, Map<String, ?> bindVars, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields) throws SQLException {
checkCallIsAllowed("executeShards");
ExecuteShardsRequest.Builder requestBuilder = ExecuteShardsRequest.newBuilder()
.setQuery(Proto.bindQuery(query, bindVars))
.setKeyspace(keyspace)
.addAllShards(shards)
.setTabletType(tabletType)
.setSession(session)
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
SQLFuture<Cursor> call =
new SQLFuture<>(
transformAsync(
client.executeShards(ctx, requestBuilder.build()),
new AsyncFunction<ExecuteShardsResponse, Cursor>() {
@Override
public ListenableFuture<Cursor> apply(ExecuteShardsResponse response)
throws Exception {
setSession(response.getSession());
Proto.checkError(response.getError());
return Futures.<Cursor>immediateFuture(new SimpleCursor(response.getResult()));
}
},
directExecutor()));
lastCall = call;
return call;
}
public synchronized SQLFuture<Cursor> executeKeyspaceIds(Context ctx, String query,
String keyspace, Iterable<byte[]> keyspaceIds, Map<String, ?> bindVars, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
checkCallIsAllowed("executeKeyspaceIds");
ExecuteKeyspaceIdsRequest.Builder requestBuilder = ExecuteKeyspaceIdsRequest.newBuilder()
.setQuery(Proto.bindQuery(query, bindVars))
.setKeyspace(keyspace)
.addAllKeyspaceIds(Iterables.transform(keyspaceIds, Proto.BYTE_ARRAY_TO_BYTE_STRING))
.setTabletType(tabletType)
.setSession(session)
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
SQLFuture<Cursor> call =
new SQLFuture<>(
transformAsync(
client.executeKeyspaceIds(ctx, requestBuilder.build()),
new AsyncFunction<ExecuteKeyspaceIdsResponse, Cursor>() {
@Override
public ListenableFuture<Cursor> apply(ExecuteKeyspaceIdsResponse response)
throws Exception {
setSession(response.getSession());
Proto.checkError(response.getError());
return Futures.<Cursor>immediateFuture(new SimpleCursor(response.getResult()));
}
},
directExecutor()));
lastCall = call;
return call;
}
public synchronized SQLFuture<Cursor> executeKeyRanges(Context ctx, String query, String keyspace,
Iterable<? extends KeyRange> keyRanges, Map<String, ?> bindVars, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
checkCallIsAllowed("executeKeyRanges");
ExecuteKeyRangesRequest.Builder requestBuilder = ExecuteKeyRangesRequest.newBuilder()
.setQuery(Proto.bindQuery(query, bindVars))
.setKeyspace(keyspace)
.addAllKeyRanges(keyRanges)
.setTabletType(tabletType)
.setSession(session)
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
SQLFuture<Cursor> call =
new SQLFuture<>(
transformAsync(
client.executeKeyRanges(ctx, requestBuilder.build()),
new AsyncFunction<ExecuteKeyRangesResponse, Cursor>() {
@Override
public ListenableFuture<Cursor> apply(ExecuteKeyRangesResponse response)
throws Exception {
setSession(response.getSession());
Proto.checkError(response.getError());
return Futures.<Cursor>immediateFuture(new SimpleCursor(response.getResult()));
}
},
directExecutor()));
lastCall = call;
return call;
}
public synchronized SQLFuture<Cursor> executeEntityIds(Context ctx, String query, String keyspace,
String entityColumnName, Map<byte[], ?> entityKeyspaceIds, Map<String, ?> bindVars,
TabletType tabletType, Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
checkCallIsAllowed("executeEntityIds");
ExecuteEntityIdsRequest.Builder requestBuilder = ExecuteEntityIdsRequest.newBuilder()
.setQuery(Proto.bindQuery(query, bindVars))
.setKeyspace(keyspace)
.setEntityColumnName(entityColumnName)
.addAllEntityKeyspaceIds(Iterables
.transform(entityKeyspaceIds.entrySet(), Proto.MAP_ENTRY_TO_ENTITY_KEYSPACE_ID))
.setTabletType(tabletType)
.setSession(session)
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
SQLFuture<Cursor> call =
new SQLFuture<>(
transformAsync(
client.executeEntityIds(ctx, requestBuilder.build()),
new AsyncFunction<ExecuteEntityIdsResponse, Cursor>() {
@Override
public ListenableFuture<Cursor> apply(ExecuteEntityIdsResponse response)
throws Exception {
setSession(response.getSession());
Proto.checkError(response.getError());
return Futures.<Cursor>immediateFuture(new SimpleCursor(response.getResult()));
}
},
directExecutor()));
lastCall = call;
return call;
}
public SQLFuture<List<CursorWithError>> executeBatch(Context ctx, List<String> queryList,
@Nullable List<Map<String, ?>> bindVarsList, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields)
throws SQLException {
List<Query.BoundQuery> queries = new ArrayList<>();
if (null != bindVarsList && bindVarsList.size() != queryList.size()) {
throw new SQLDataException(
"Size of SQL Query list does not match the bind variables list");
}
for (int i = 0; i < queryList.size(); ++i) {
queries.add(i, Proto.bindQuery(checkNotNull(queryList.get(i)),
bindVarsList == null ? null : bindVarsList.get(i)));
}
Vtgate.ExecuteBatchRequest.Builder requestBuilder =
Vtgate.ExecuteBatchRequest.newBuilder()
.addAllQueries(checkNotNull(queries))
.setKeyspaceShard(keyspace)
.setTabletType(checkNotNull(tabletType))
.setSession(session)
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
return new SQLFuture<>(
transformAsync(
client.executeBatch(ctx, requestBuilder.build()),
new AsyncFunction<Vtgate.ExecuteBatchResponse, List<CursorWithError>>() {
@Override
public ListenableFuture<List<CursorWithError>> apply(
Vtgate.ExecuteBatchResponse response) throws Exception {
setSession(response.getSession());
Proto.checkError(response.getError());
return Futures.immediateFuture(
Proto.fromQueryResponsesToCursorList(response.getResultsList()));
}
},
directExecutor()));
}
public synchronized SQLFuture<List<Cursor>> executeBatchShards(Context ctx,
Iterable<? extends BoundShardQuery> queries, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields) throws SQLException {
checkCallIsAllowed("executeBatchShards");
ExecuteBatchShardsRequest.Builder requestBuilder = ExecuteBatchShardsRequest.newBuilder()
.addAllQueries(queries)
.setTabletType(tabletType)
.setSession(session)
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
SQLFuture<List<Cursor>> call =
new SQLFuture<>(
transformAsync(
client.executeBatchShards(ctx, requestBuilder.build()),
new AsyncFunction<ExecuteBatchShardsResponse, List<Cursor>>() {
@Override
public ListenableFuture<List<Cursor>> apply(ExecuteBatchShardsResponse response)
throws Exception {
setSession(response.getSession());
Proto.checkError(response.getError());
return Futures.<List<Cursor>>immediateFuture(
Proto.toCursorList(response.getResultsList()));
}
},
directExecutor()));
lastCall = call;
return call;
}
public synchronized SQLFuture<List<Cursor>> executeBatchKeyspaceIds(Context ctx,
Iterable<? extends BoundKeyspaceIdQuery> queries, TabletType tabletType,
Query.ExecuteOptions.IncludedFields includedFields) throws SQLException {
checkCallIsAllowed("executeBatchKeyspaceIds");
ExecuteBatchKeyspaceIdsRequest.Builder requestBuilder = ExecuteBatchKeyspaceIdsRequest
.newBuilder()
.addAllQueries(queries)
.setTabletType(tabletType)
.setSession(session)
.setOptions(Query.ExecuteOptions.newBuilder()
.setIncludedFields(includedFields));
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
SQLFuture<List<Cursor>> call =
new SQLFuture<>(
transformAsync(
client.executeBatchKeyspaceIds(ctx, requestBuilder.build()),
new AsyncFunction<ExecuteBatchKeyspaceIdsResponse, List<Cursor>>() {
@Override
public ListenableFuture<List<Cursor>> apply(
ExecuteBatchKeyspaceIdsResponse response) throws Exception {
setSession(response.getSession());
Proto.checkError(response.getError());
return Futures.<List<Cursor>>immediateFuture(
Proto.toCursorList(response.getResultsList()));
}
},
directExecutor()));
lastCall = call;
return call;
}
public synchronized SQLFuture<Void> commit(Context ctx) throws SQLException {
return commit(ctx, false);
}
public synchronized SQLFuture<Void> commit(Context ctx, boolean atomic) throws SQLException {
checkCallIsAllowed("commit");
CommitRequest.Builder requestBuilder = CommitRequest.newBuilder().setSession(session)
.setAtomic(atomic);
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
SQLFuture<Void> call =
new SQLFuture<>(
transformAsync(
client.commit(ctx, requestBuilder.build()),
new AsyncFunction<CommitResponse, Void>() {
@Override
public ListenableFuture<Void> apply(CommitResponse response) throws Exception {
setSession(null);
return Futures.<Void>immediateFuture(null);
}
},
directExecutor()));
lastCall = call;
return call;
}
public synchronized SQLFuture<Void> rollback(Context ctx) throws SQLException {
checkCallIsAllowed("rollback");
RollbackRequest.Builder requestBuilder = RollbackRequest.newBuilder().setSession(session);
if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}
SQLFuture<Void> call =
new SQLFuture<>(
transformAsync(
client.rollback(ctx, requestBuilder.build()),
new AsyncFunction<RollbackResponse, Void>() {
@Override
public ListenableFuture<Void> apply(RollbackResponse response) throws Exception {
setSession(null);
return Futures.<Void>immediateFuture(null);
}
},
directExecutor()));
lastCall = call;
return call;
}
protected synchronized void checkCallIsAllowed(String call) throws SQLException {
// Calls are not allowed to overlap.
if (lastCall != null && !lastCall.isDone()) {
throw new IllegalStateException("Can't call " + call
+ "() on a VTGateTx instance until the last asynchronous call is done.");
}
// All calls must occur within a valid transaction.
if (session == null || !session.getInTransaction()) {
throw new SQLDataException("Can't perform " + call + "() while not in transaction.");
}
}
protected synchronized void setSession(Session session) {
this.session = session;
}
}

Просмотреть файл

@ -31,11 +31,6 @@ import io.vitess.client.cursor.Cursor;
import io.vitess.client.cursor.Row;
import io.vitess.proto.Query;
import io.vitess.proto.Query.Field;
import io.vitess.proto.Topodata.KeyRange;
import io.vitess.proto.Topodata.KeyspaceIdType;
import io.vitess.proto.Topodata.ShardReference;
import io.vitess.proto.Topodata.SrvKeyspace;
import io.vitess.proto.Topodata.SrvKeyspace.KeyspacePartition;
import io.vitess.proto.Topodata.TabletType;
import io.vitess.proto.Vtgate.VStreamRequest;
import io.vitess.proto.Vtgate.VStreamResponse;
@ -116,28 +111,6 @@ public abstract class RpcClientTest {
private static final String QUERY = "test query with unicode: \u6211\u80fd\u541e\u4e0b\u73bb\u7483\u800c\u4e0d\u50b7\u8eab\u9ad4";
private static final String KEYSPACE = "test_keyspace";
private static final List<String> SHARDS = Arrays.asList("-80", "80-");
private static final String SHARDS_ECHO = "[-80 80-]";
private static final List<byte[]> KEYSPACE_IDS =
Arrays.asList(new byte[]{1, 2, 3, 4}, new byte[]{5, 6, 7, 8});
private static final String KEYSPACE_IDS_ECHO = "[[1 2 3 4] [5 6 7 8]]";
private static final List<KeyRange> KEY_RANGES =
Arrays.asList(KeyRange.newBuilder().setStart(ByteString.copyFrom(new byte[]{1, 2, 3, 4}))
.setEnd(ByteString.copyFrom(new byte[]{5, 6, 7, 8})).build());
private static final String KEY_RANGES_ECHO =
"[start:\"\\001\\002\\003\\004\" end:\"\\005\\006\\007\\010\" ]";
private static final ImmutableMap<byte[], Object> ENTITY_KEYSPACE_IDS =
new ImmutableMap.Builder<byte[], Object>()
.put(new byte[]{1, 2, 3}, 123)
.put(new byte[]{4, 5, 6}, 2.5)
.put(new byte[]{7, 8, 9}, new byte[]{1, 2, 3})
.build();
private static final String ENTITY_KEYSPACE_IDS_ECHO =
"[type:INT64 value:\"123\" keyspace_id:\"\\001\\002\\003\" type:FLOAT64 value:\"2.5\" keyspace_id:\"\\004\\005\\006\" type:VARBINARY value:\"\\001\\002\\003\" keyspace_id:\"\\007\\010\\t\" ]";
private static final TabletType TABLET_TYPE = TabletType.REPLICA;
private static final String TABLET_TYPE_ECHO = TABLET_TYPE.toString();
private static final Query.ExecuteOptions.IncludedFields ALL_FIELDS = Query.ExecuteOptions.IncludedFields.ALL;
@ -152,12 +125,8 @@ public abstract class RpcClientTest {
private static final String BIND_VARS_ECHO =
"map[bytes:type:VARBINARY value:\"\\001\\002\\003\" float:type:FLOAT64 value:\"2.5\" int:type:INT64 value:\"123\" ]";
private static final String SESSION_ECHO = "in_transaction:true ";
private static final String NONTX_V3_SESSION_ECHO = "autocommit:true target_string:\"test_keyspace@replica\" options:<included_fields:ALL > ";
private static final String V3_SESSION_ECHO = "in_transaction:true target_string:\"test_keyspace@replica\" options:<included_fields:ALL > ";
private static final CallerID CALLER_ID = CallerID.newBuilder().setPrincipal("test_principal")
.setComponent("test_component").setSubcomponent("test_subcomponent").build();
private static final String CALLER_ID_ECHO =
@ -210,7 +179,7 @@ public abstract class RpcClientTest {
while (DateTime.now().isBefore(deadline)) {
try {
ctx = Context.getDefault().withDeadlineAfter(Duration.standardSeconds(30));
conn.getSrvKeyspace(ctx, "small");
conn.execute(ctx, ECHO_PREFIX + QUERY, BIND_VARS, TABLET_TYPE, ALL_FIELDS);
// RPC succeeded. Stop testing.
break;
} catch (SQLTransientException e) {
@ -245,73 +214,6 @@ public abstract class RpcClientTest {
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(NONTX_V3_SESSION_ECHO, echo.get("session"));
echo = getEcho(
conn.executeShards(ctx, ECHO_PREFIX + QUERY, KEYSPACE, SHARDS, BIND_VARS, TABLET_TYPE,
ALL_FIELDS));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
Assert.assertEquals(SHARDS_ECHO, echo.get("shards"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
Assert.assertEquals(OPTIONS_ALL_FIELDS_ECHO, echo.get("options"));
echo = getEcho(conn.executeKeyspaceIds(ctx, ECHO_PREFIX + QUERY, KEYSPACE, KEYSPACE_IDS,
BIND_VARS, TABLET_TYPE, ALL_FIELDS));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
Assert.assertEquals(KEYSPACE_IDS_ECHO, echo.get("keyspaceIds"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
Assert.assertEquals(OPTIONS_ALL_FIELDS_ECHO, echo.get("options"));
echo = getEcho(conn.executeKeyRanges(ctx, ECHO_PREFIX + QUERY, KEYSPACE, KEY_RANGES, BIND_VARS,
TABLET_TYPE, ALL_FIELDS));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
Assert.assertEquals(KEY_RANGES_ECHO, echo.get("keyRanges"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
Assert.assertEquals(OPTIONS_ALL_FIELDS_ECHO, echo.get("options"));
echo = getEcho(conn.executeEntityIds(ctx, ECHO_PREFIX + QUERY, KEYSPACE, "column1",
ENTITY_KEYSPACE_IDS, BIND_VARS, TABLET_TYPE, ALL_FIELDS));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
Assert.assertEquals("column1", echo.get("entityColumnName"));
Assert.assertEquals(ENTITY_KEYSPACE_IDS_ECHO, echo.get("entityIds"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
Assert.assertEquals(OPTIONS_ALL_FIELDS_ECHO, echo.get("options"));
echo = getEcho(conn.executeBatchShards(ctx,
Arrays.asList(Proto.bindShardQuery(KEYSPACE, SHARDS, ECHO_PREFIX + QUERY, BIND_VARS)),
TABLET_TYPE, true, ALL_FIELDS).get(0));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
Assert.assertEquals(SHARDS_ECHO, echo.get("shards"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
Assert.assertEquals("true", echo.get("asTransaction"));
Assert.assertEquals(OPTIONS_ALL_FIELDS_ECHO, echo.get("options"));
echo = getEcho(conn.executeBatchKeyspaceIds(ctx,
Arrays.asList(
Proto.bindKeyspaceIdQuery(KEYSPACE, KEYSPACE_IDS, ECHO_PREFIX + QUERY, BIND_VARS)),
TABLET_TYPE, true, ALL_FIELDS).get(0));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
Assert.assertEquals(KEYSPACE_IDS_ECHO, echo.get("keyspaceIds"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
Assert.assertEquals("true", echo.get("asTransaction"));
Assert.assertEquals(OPTIONS_ALL_FIELDS_ECHO, echo.get("options"));
}
@Test
@ -324,154 +226,6 @@ public abstract class RpcClientTest {
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(NONTX_V3_SESSION_ECHO, echo.get("session"));
echo = getEcho(conn.streamExecuteShards(ctx, ECHO_PREFIX + QUERY, KEYSPACE, SHARDS, BIND_VARS,
TABLET_TYPE, ALL_FIELDS));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
Assert.assertEquals(SHARDS_ECHO, echo.get("shards"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
Assert.assertEquals(OPTIONS_ALL_FIELDS_ECHO, echo.get("options"));
echo = getEcho(conn.streamExecuteKeyspaceIds(ctx, ECHO_PREFIX + QUERY, KEYSPACE, KEYSPACE_IDS,
BIND_VARS, TABLET_TYPE, ALL_FIELDS));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
Assert.assertEquals(KEYSPACE_IDS_ECHO, echo.get("keyspaceIds"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
Assert.assertEquals(OPTIONS_ALL_FIELDS_ECHO, echo.get("options"));
echo = getEcho(conn.streamExecuteKeyRanges(ctx, ECHO_PREFIX + QUERY, KEYSPACE, KEY_RANGES,
BIND_VARS, TABLET_TYPE, ALL_FIELDS));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
Assert.assertEquals(KEY_RANGES_ECHO, echo.get("keyRanges"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
Assert.assertEquals(OPTIONS_ALL_FIELDS_ECHO, echo.get("options"));
}
@Test
public void testEchoTransactionExecute() throws Exception {
Map<String, String> echo;
VTGateBlockingTx tx = conn.begin(ctx);
echo = getEcho(tx.execute(ctx, ECHO_PREFIX + QUERY, BIND_VARS, TABLET_TYPE, ALL_FIELDS));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(V3_SESSION_ECHO, echo.get("session"));
// V3 returns additional session artifacts that V2
// doesn't care about. So, start with a new session
// before testing V2 functionality.
tx.rollback(ctx);
tx = conn.begin(ctx);
echo = getEcho(
tx.executeShards(ctx, ECHO_PREFIX + QUERY, KEYSPACE, SHARDS, BIND_VARS, TABLET_TYPE,
ALL_FIELDS));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
Assert.assertEquals(SHARDS_ECHO, echo.get("shards"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
Assert.assertEquals(SESSION_ECHO, echo.get("session"));
Assert.assertEquals("false", echo.get("notInTransaction"));
Assert.assertEquals(OPTIONS_ALL_FIELDS_ECHO, echo.get("options"));
echo = getEcho(tx.executeKeyspaceIds(ctx, ECHO_PREFIX + QUERY, KEYSPACE, KEYSPACE_IDS,
BIND_VARS, TABLET_TYPE, ALL_FIELDS));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
Assert.assertEquals(KEYSPACE_IDS_ECHO, echo.get("keyspaceIds"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
Assert.assertEquals(SESSION_ECHO, echo.get("session"));
Assert.assertEquals("false", echo.get("notInTransaction"));
Assert.assertEquals(OPTIONS_ALL_FIELDS_ECHO, echo.get("options"));
echo = getEcho(tx.executeKeyRanges(ctx, ECHO_PREFIX + QUERY, KEYSPACE, KEY_RANGES, BIND_VARS,
TABLET_TYPE, ALL_FIELDS));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
Assert.assertEquals(KEY_RANGES_ECHO, echo.get("keyRanges"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
Assert.assertEquals(SESSION_ECHO, echo.get("session"));
Assert.assertEquals("false", echo.get("notInTransaction"));
Assert.assertEquals(OPTIONS_ALL_FIELDS_ECHO, echo.get("options"));
echo = getEcho(tx.executeEntityIds(ctx, ECHO_PREFIX + QUERY, KEYSPACE, "column1",
ENTITY_KEYSPACE_IDS, BIND_VARS, TABLET_TYPE, ALL_FIELDS));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
Assert.assertEquals("column1", echo.get("entityColumnName"));
Assert.assertEquals(ENTITY_KEYSPACE_IDS_ECHO, echo.get("entityIds"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
Assert.assertEquals(SESSION_ECHO, echo.get("session"));
Assert.assertEquals("false", echo.get("notInTransaction"));
Assert.assertEquals(OPTIONS_ALL_FIELDS_ECHO, echo.get("options"));
tx.rollback(ctx);
tx = conn.begin(ctx);
echo = getEcho(tx.executeBatchShards(ctx,
Arrays.asList(Proto.bindShardQuery(KEYSPACE, SHARDS, ECHO_PREFIX + QUERY, BIND_VARS)),
TABLET_TYPE, ALL_FIELDS).get(0));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
Assert.assertEquals(SHARDS_ECHO, echo.get("shards"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
Assert.assertEquals(SESSION_ECHO, echo.get("session"));
Assert.assertEquals("false", echo.get("asTransaction"));
Assert.assertEquals(OPTIONS_ALL_FIELDS_ECHO, echo.get("options"));
echo = getEcho(tx.executeBatchKeyspaceIds(ctx,
Arrays.asList(
Proto.bindKeyspaceIdQuery(KEYSPACE, KEYSPACE_IDS, ECHO_PREFIX + QUERY, BIND_VARS)),
TABLET_TYPE, ALL_FIELDS).get(0));
Assert.assertEquals(CALLER_ID_ECHO, echo.get("callerId"));
Assert.assertEquals(ECHO_PREFIX + QUERY, echo.get("query"));
Assert.assertEquals(KEYSPACE, echo.get("keyspace"));
Assert.assertEquals(KEYSPACE_IDS_ECHO, echo.get("keyspaceIds"));
Assert.assertEquals(BIND_VARS_ECHO, echo.get("bindVars"));
Assert.assertEquals(TABLET_TYPE_ECHO, echo.get("tabletType"));
Assert.assertEquals(SESSION_ECHO, echo.get("session"));
Assert.assertEquals("false", echo.get("asTransaction"));
Assert.assertEquals(OPTIONS_ALL_FIELDS_ECHO, echo.get("options"));
tx.commit(ctx);
}
@Test
public void testGetSrvKeyspace() throws Exception {
SrvKeyspace expected = SrvKeyspace.newBuilder()
.addPartitions(KeyspacePartition.newBuilder().setServedType(TabletType.REPLICA)
.addShardReferences(ShardReference.newBuilder().setName("shard0").setKeyRange(KeyRange
.newBuilder().setStart(ByteString.copyFrom(new byte[]{0x40, 0, 0, 0, 0, 0, 0, 0}))
.setEnd(ByteString.copyFrom(new byte[]{(byte) 0x80, 0, 0, 0, 0, 0, 0, 0})).build())
.build())
.build())
.setShardingColumnName("sharding_column_name")
.setShardingColumnType(KeyspaceIdType.UINT64).addServedFrom(SrvKeyspace.ServedFrom
.newBuilder().setTabletType(TabletType.MASTER).setKeyspace("other_keyspace").build())
.build();
SrvKeyspace actual = conn.getSrvKeyspace(ctx, "big");
Assert.assertEquals(expected, actual);
}
abstract static class Executable {
@ -512,72 +266,6 @@ public abstract class RpcClientTest {
checkExecuteErrors(exe, false);
}
abstract static class TransactionExecutable {
abstract void execute(VTGateBlockingTx tx, String query) throws Exception;
}
void checkTransactionExecuteErrors(TransactionExecutable exe) throws Exception {
for (String error : EXECUTE_ERRORS.keySet()) {
Class<?> cls = EXECUTE_ERRORS.get(error);
try {
VTGateBlockingTx tx = conn.begin(ctx);
String query = ERROR_PREFIX + error;
exe.execute(tx, query);
Assert.fail("no exception thrown for " + query);
} catch (Exception e) {
Assert.assertEquals(cls, e.getClass());
if (error.equals("integrity error")) {
// The mock test server sends back errno:1062 sqlstate:23000 for this case.
// Make sure these values get properly extracted by the client.
SQLException sqlException = (SQLException) e;
Assert.assertEquals(1062, sqlException.getErrorCode());
Assert.assertEquals("23000", sqlException.getSQLState());
}
}
// Don't close the transaction on partial error.
VTGateBlockingTx tx = conn.begin(ctx);
try {
String query = PARTIAL_ERROR_PREFIX + error;
exe.execute(tx, query);
Assert.fail("no exception thrown for " + query);
} catch (Exception e) {
Assert.assertEquals(cls, e.getClass());
if (error.equals("integrity error")) {
// The mock test server sends back errno:1062 sqlstate:23000 for this case.
// Make sure these values get properly extracted by the client.
SQLException sqlException = (SQLException) e;
Assert.assertEquals(1062, sqlException.getErrorCode());
Assert.assertEquals("23000", sqlException.getSQLState());
}
}
// The transaction should still be usable now.
tx.rollback(ctx);
// Close the transaction on partial error.
tx = conn.begin(ctx);
try {
String query = PARTIAL_ERROR_PREFIX + error + "/close transaction";
exe.execute(tx, query);
Assert.fail("no exception thrown for " + query);
} catch (Exception e) {
Assert.assertEquals(cls, e.getClass());
}
// The transaction should be unusable now.
try {
tx.rollback(ctx);
Assert.fail("no exception thrown for rollback() after closed transaction");
} catch (Exception e) {
Assert.assertEquals(SQLDataException.class, e.getClass());
Assert.assertEquals(true, e.getMessage().contains("not in transaction"));
}
}
}
@Test
public void testExecuteErrors() throws Exception {
checkExecuteErrors(new Executable() {
@ -586,48 +274,6 @@ public abstract class RpcClientTest {
conn.execute(ctx, query, BIND_VARS, TABLET_TYPE, ALL_FIELDS);
}
});
checkExecuteErrors(new Executable() {
@Override
void execute(String query) throws Exception {
conn.executeShards(ctx, query, KEYSPACE, SHARDS, BIND_VARS, TABLET_TYPE, ALL_FIELDS);
}
});
checkExecuteErrors(new Executable() {
@Override
void execute(String query) throws Exception {
conn.executeKeyspaceIds(ctx, query, KEYSPACE, KEYSPACE_IDS, BIND_VARS, TABLET_TYPE,
ALL_FIELDS);
}
});
checkExecuteErrors(new Executable() {
@Override
void execute(String query) throws Exception {
conn.executeKeyRanges(ctx, query, KEYSPACE, KEY_RANGES, BIND_VARS, TABLET_TYPE, ALL_FIELDS);
}
});
checkExecuteErrors(new Executable() {
@Override
void execute(String query) throws Exception {
conn.executeEntityIds(ctx, query, KEYSPACE, "column1", ENTITY_KEYSPACE_IDS, BIND_VARS,
TABLET_TYPE, ALL_FIELDS);
}
});
checkExecuteErrors(new Executable() {
@Override
void execute(String query) throws Exception {
conn.executeBatchShards(ctx,
Arrays.asList(Proto.bindShardQuery(KEYSPACE, SHARDS, query, BIND_VARS)), TABLET_TYPE,
true, ALL_FIELDS);
}
});
checkExecuteErrors(new Executable() {
@Override
void execute(String query) throws Exception {
conn.executeBatchKeyspaceIds(ctx,
Arrays.asList(Proto.bindKeyspaceIdQuery(KEYSPACE, KEYSPACE_IDS, query, BIND_VARS)),
TABLET_TYPE, true, ALL_FIELDS);
}
});
}
@Test
@ -638,81 +284,6 @@ public abstract class RpcClientTest {
conn.streamExecute(ctx, query, BIND_VARS, TABLET_TYPE, ALL_FIELDS).next();
}
});
checkStreamExecuteErrors(new Executable() {
@Override
void execute(String query) throws Exception {
conn.streamExecuteShards(ctx, query, KEYSPACE, SHARDS, BIND_VARS, TABLET_TYPE, ALL_FIELDS)
.next();
}
});
checkStreamExecuteErrors(new Executable() {
@Override
void execute(String query) throws Exception {
conn.streamExecuteKeyspaceIds(ctx, query, KEYSPACE, KEYSPACE_IDS, BIND_VARS, TABLET_TYPE,
ALL_FIELDS)
.next();
}
});
checkStreamExecuteErrors(new Executable() {
@Override
void execute(String query) throws Exception {
conn.streamExecuteKeyRanges(ctx, query, KEYSPACE, KEY_RANGES, BIND_VARS, TABLET_TYPE,
ALL_FIELDS)
.next();
}
});
}
@Test
public void testTransactionExecuteErrors() throws Exception {
checkTransactionExecuteErrors(new TransactionExecutable() {
@Override
void execute(VTGateBlockingTx tx, String query) throws Exception {
tx.execute(ctx, query, BIND_VARS, TABLET_TYPE, ALL_FIELDS);
}
});
checkTransactionExecuteErrors(new TransactionExecutable() {
@Override
void execute(VTGateBlockingTx tx, String query) throws Exception {
tx.executeShards(ctx, query, KEYSPACE, SHARDS, BIND_VARS, TABLET_TYPE, ALL_FIELDS);
}
});
checkTransactionExecuteErrors(new TransactionExecutable() {
@Override
void execute(VTGateBlockingTx tx, String query) throws Exception {
tx.executeKeyspaceIds(ctx, query, KEYSPACE, KEYSPACE_IDS, BIND_VARS, TABLET_TYPE,
ALL_FIELDS);
}
});
checkTransactionExecuteErrors(new TransactionExecutable() {
@Override
void execute(VTGateBlockingTx tx, String query) throws Exception {
tx.executeKeyRanges(ctx, query, KEYSPACE, KEY_RANGES, BIND_VARS, TABLET_TYPE, ALL_FIELDS);
}
});
checkTransactionExecuteErrors(new TransactionExecutable() {
@Override
void execute(VTGateBlockingTx tx, String query) throws Exception {
tx.executeEntityIds(ctx, query, KEYSPACE, "column1", ENTITY_KEYSPACE_IDS, BIND_VARS,
TABLET_TYPE, ALL_FIELDS);
}
});
checkTransactionExecuteErrors(new TransactionExecutable() {
@Override
void execute(VTGateBlockingTx tx, String query) throws Exception {
tx.executeBatchShards(ctx,
Arrays.asList(Proto.bindShardQuery(KEYSPACE, SHARDS, query, BIND_VARS)), TABLET_TYPE,
ALL_FIELDS);
}
});
checkTransactionExecuteErrors(new TransactionExecutable() {
@Override
void execute(VTGateBlockingTx tx, String query) throws Exception {
tx.executeBatchKeyspaceIds(ctx,
Arrays.asList(Proto.bindKeyspaceIdQuery(KEYSPACE, KEYSPACE_IDS, query, BIND_VARS)),
TABLET_TYPE, ALL_FIELDS);
}
});
}
@Test

Просмотреть файл

@ -122,25 +122,4 @@ public class TestUtil {
getRpcClientFactory().create(ctx, "localhost:" + testEnv.getPort()),
testEnv.getKeyspace());
}
public static void insertRows(TestEnv testEnv, int startId, int count) throws Exception {
try (VTGateBlockingConn conn = getBlockingConn(testEnv)) {
// Deadline for the overall insert loop
Context ctx = Context.getDefault().withDeadlineAfter(Duration.millis(5000));
VTGateBlockingTx tx = conn.begin(ctx);
String insertSql = "insert into vtgate_test "
+ "(id, name, age, percent) values (:id, :name, :age, :percent)";
Map<String, Object> bindVars = new HashMap<>();
for (int id = startId; id - startId < count; id++) {
bindVars.put("id", id);
bindVars.put("name", "name_" + id);
bindVars.put("age", id % 10);
bindVars.put("percent", id / 100.0);
tx.execute(ctx, insertSql, bindVars, TabletType.MASTER,
Query.ExecuteOptions.IncludedFields.ALL);
}
tx.commit(ctx);
}
}
}

Просмотреть файл

@ -31,36 +31,10 @@ import io.vitess.client.RpcClient;
import io.vitess.client.StreamIterator;
import io.vitess.proto.Query.QueryResult;
import io.vitess.proto.Vtgate;
import io.vitess.proto.Vtgate.BeginRequest;
import io.vitess.proto.Vtgate.BeginResponse;
import io.vitess.proto.Vtgate.CommitRequest;
import io.vitess.proto.Vtgate.CommitResponse;
import io.vitess.proto.Vtgate.ExecuteBatchKeyspaceIdsRequest;
import io.vitess.proto.Vtgate.ExecuteBatchKeyspaceIdsResponse;
import io.vitess.proto.Vtgate.ExecuteBatchShardsRequest;
import io.vitess.proto.Vtgate.ExecuteBatchShardsResponse;
import io.vitess.proto.Vtgate.ExecuteEntityIdsRequest;
import io.vitess.proto.Vtgate.ExecuteEntityIdsResponse;
import io.vitess.proto.Vtgate.ExecuteKeyRangesRequest;
import io.vitess.proto.Vtgate.ExecuteKeyRangesResponse;
import io.vitess.proto.Vtgate.ExecuteKeyspaceIdsRequest;
import io.vitess.proto.Vtgate.ExecuteKeyspaceIdsResponse;
import io.vitess.proto.Vtgate.ExecuteRequest;
import io.vitess.proto.Vtgate.ExecuteResponse;
import io.vitess.proto.Vtgate.ExecuteShardsRequest;
import io.vitess.proto.Vtgate.ExecuteShardsResponse;
import io.vitess.proto.Vtgate.GetSrvKeyspaceRequest;
import io.vitess.proto.Vtgate.GetSrvKeyspaceResponse;
import io.vitess.proto.Vtgate.RollbackRequest;
import io.vitess.proto.Vtgate.RollbackResponse;
import io.vitess.proto.Vtgate.StreamExecuteKeyRangesRequest;
import io.vitess.proto.Vtgate.StreamExecuteKeyRangesResponse;
import io.vitess.proto.Vtgate.StreamExecuteKeyspaceIdsRequest;
import io.vitess.proto.Vtgate.StreamExecuteKeyspaceIdsResponse;
import io.vitess.proto.Vtgate.StreamExecuteRequest;
import io.vitess.proto.Vtgate.StreamExecuteResponse;
import io.vitess.proto.Vtgate.StreamExecuteShardsRequest;
import io.vitess.proto.Vtgate.StreamExecuteShardsResponse;
import io.vitess.proto.Vtgate.VStreamResponse;
import io.vitess.proto.grpc.VitessGrpc;
import io.vitess.proto.grpc.VitessGrpc.VitessFutureStub;
@ -141,34 +115,6 @@ public class GrpcClient implements RpcClient {
new ExceptionConverter<ExecuteResponse>(), MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<ExecuteShardsResponse> executeShards(Context ctx,
ExecuteShardsRequest request) throws SQLException {
return Futures.catchingAsync(getFutureStub(ctx).executeShards(request), Exception.class,
new ExceptionConverter<ExecuteShardsResponse>(), MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<ExecuteKeyspaceIdsResponse> executeKeyspaceIds(Context ctx,
ExecuteKeyspaceIdsRequest request) throws SQLException {
return Futures.catchingAsync(getFutureStub(ctx).executeKeyspaceIds(request), Exception.class,
new ExceptionConverter<ExecuteKeyspaceIdsResponse>(), MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<ExecuteKeyRangesResponse> executeKeyRanges(Context ctx,
ExecuteKeyRangesRequest request) throws SQLException {
return Futures.catchingAsync(getFutureStub(ctx).executeKeyRanges(request), Exception.class,
new ExceptionConverter<ExecuteKeyRangesResponse>(), MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<ExecuteEntityIdsResponse> executeEntityIds(Context ctx,
ExecuteEntityIdsRequest request) throws SQLException {
return Futures.catchingAsync(getFutureStub(ctx).executeEntityIds(request), Exception.class,
new ExceptionConverter<ExecuteEntityIdsResponse>(), MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<Vtgate.ExecuteBatchResponse> executeBatch(Context ctx,
Vtgate.ExecuteBatchRequest request) throws SQLException {
@ -176,21 +122,6 @@ public class GrpcClient implements RpcClient {
new ExceptionConverter<Vtgate.ExecuteBatchResponse>(), MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<ExecuteBatchShardsResponse> executeBatchShards(Context ctx,
ExecuteBatchShardsRequest request) throws SQLException {
return Futures.catchingAsync(getFutureStub(ctx).executeBatchShards(request), Exception.class,
new ExceptionConverter<ExecuteBatchShardsResponse>(), MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<ExecuteBatchKeyspaceIdsResponse> executeBatchKeyspaceIds(Context ctx,
ExecuteBatchKeyspaceIdsRequest request) throws SQLException {
return Futures.catchingAsync(getFutureStub(ctx).executeBatchKeyspaceIds(request),
Exception.class, new ExceptionConverter<ExecuteBatchKeyspaceIdsResponse>(),
MoreExecutors.directExecutor());
}
@Override
public StreamIterator<QueryResult> streamExecute(Context ctx, StreamExecuteRequest request)
throws SQLException {
@ -205,76 +136,6 @@ public class GrpcClient implements RpcClient {
return adapter;
}
@Override
public StreamIterator<QueryResult> streamExecuteShards(Context ctx,
StreamExecuteShardsRequest request) throws SQLException {
GrpcStreamAdapter<StreamExecuteShardsResponse, QueryResult> adapter =
new GrpcStreamAdapter<StreamExecuteShardsResponse, QueryResult>() {
@Override
QueryResult getResult(StreamExecuteShardsResponse response) throws SQLException {
return response.getResult();
}
};
getAsyncStub(ctx).streamExecuteShards(request, adapter);
return adapter;
}
@Override
public StreamIterator<QueryResult> streamExecuteKeyspaceIds(Context ctx,
StreamExecuteKeyspaceIdsRequest request) throws SQLException {
GrpcStreamAdapter<StreamExecuteKeyspaceIdsResponse, QueryResult> adapter =
new GrpcStreamAdapter<StreamExecuteKeyspaceIdsResponse, QueryResult>() {
@Override
QueryResult getResult(StreamExecuteKeyspaceIdsResponse response) throws SQLException {
return response.getResult();
}
};
getAsyncStub(ctx).streamExecuteKeyspaceIds(request, adapter);
return adapter;
}
@Override
public StreamIterator<QueryResult> streamExecuteKeyRanges(Context ctx,
StreamExecuteKeyRangesRequest request) throws SQLException {
GrpcStreamAdapter<StreamExecuteKeyRangesResponse, QueryResult> adapter =
new GrpcStreamAdapter<StreamExecuteKeyRangesResponse, QueryResult>() {
@Override
QueryResult getResult(StreamExecuteKeyRangesResponse response) throws SQLException {
return response.getResult();
}
};
getAsyncStub(ctx).streamExecuteKeyRanges(request, adapter);
return adapter;
}
@Override
public ListenableFuture<BeginResponse> begin(Context ctx, BeginRequest request)
throws SQLException {
return Futures.catchingAsync(getFutureStub(ctx).begin(request), Exception.class,
new ExceptionConverter<BeginResponse>(), MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<CommitResponse> commit(Context ctx, CommitRequest request)
throws SQLException {
return Futures.catchingAsync(getFutureStub(ctx).commit(request), Exception.class,
new ExceptionConverter<CommitResponse>(), MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<RollbackResponse> rollback(Context ctx, RollbackRequest request)
throws SQLException {
return Futures.catchingAsync(getFutureStub(ctx).rollback(request), Exception.class,
new ExceptionConverter<RollbackResponse>(), MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<GetSrvKeyspaceResponse> getSrvKeyspace(Context ctx,
GetSrvKeyspaceRequest request) throws SQLException {
return Futures.catchingAsync(getFutureStub(ctx).getSrvKeyspace(request), Exception.class,
new ExceptionConverter<GetSrvKeyspaceResponse>(), MoreExecutors.directExecutor());
}
@Override
public StreamIterator<Vtgate.VStreamResponse> getVStream(Context ctx,
Vtgate.VStreamRequest vstreamRequest) {

Просмотреть файл

@ -25,7 +25,7 @@ import io.vitess.client.RpcClient;
import io.vitess.client.RpcClientTest;
import io.vitess.client.grpc.GrpcClientFactory;
import io.vitess.client.grpc.StaticAuthCredentials;
import io.vitess.proto.Vtgate.GetSrvKeyspaceRequest;
import io.vitess.proto.Vtgate.ExecuteRequest;
import org.joda.time.Duration;
@ -92,7 +92,7 @@ public class GrpcClientStaticAuthTest extends RpcClientTest {
.setCallCredentials(new StaticAuthCredentials("test-username", "WRONG-password"))
.create(Context.getDefault(), "localhost:" + port);
try {
client.getSrvKeyspace(Context.getDefault(), GetSrvKeyspaceRequest.getDefaultInstance()).get();
client.execute(Context.getDefault(), ExecuteRequest.getDefaultInstance()).get();
Assert.fail();
} catch (ExecutionException e) {
StatusRuntimeException cause = (StatusRuntimeException) Throwables.getRootCause(e);