Implement gRPC API to store indexed data (#4820)

This commit is contained in:
Mahati Chamarthy 2023-01-26 17:22:41 +00:00 коммит произвёл GitHub
Родитель 8727c73cb1
Коммит d3893eccaf
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
10 изменённых файлов: 778 добавлений и 23 удалений

Просмотреть файл

@ -15,17 +15,32 @@ add_subdirectory(protobuf)
add_ccf_app(
external_executor
SRCS external_executor.cpp
SRCS external_executor.cpp external_executor_indexing.cpp
INCLUDE_DIRS "${CMAKE_CURRENT_BINARY_DIR}/protobuf"
LINK_LIBS_ENCLAVE
executor_registration.proto.enclave kv.proto.enclave status.proto.enclave
misc.proto.enclave historical.proto.enclave protobuf.enclave
executor_registration.proto.enclave
kv.proto.enclave
status.proto.enclave
misc.proto.enclave
historical.proto.enclave
index.proto.enclave
protobuf.enclave
LINK_LIBS_VIRTUAL
executor_registration.proto.virtual kv.proto.virtual status.proto.virtual
misc.proto.virtual historical.proto.virtual protobuf.virtual
executor_registration.proto.virtual
kv.proto.virtual
status.proto.virtual
misc.proto.virtual
historical.proto.virtual
index.proto.virtual
protobuf.virtual
LINK_LIBS_SNP
executor_registration.proto.virtual kv.proto.virtual status.proto.virtual
misc.proto.virtual historical.proto.virtual protobuf.snp
executor_registration.proto.virtual
kv.proto.virtual
status.proto.virtual
misc.proto.virtual
historical.proto.virtual
index.proto.virtual
protobuf.snp
)
# Generate an ephemeral signing key

Просмотреть файл

@ -1,12 +1,20 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include "ccf/app_interface.h"
#include "ccf/common_auth_policies.h"
#include "ccf/crypto/verifier.h"
#include "executor_code_id.h"
namespace externalexecutor
{
// This uses std::string to match protobuf's storage of raw bytes entries, and
// directly stores those raw bytes. Note that these strings may contain nulls
// and other unprintable characters, so may not be trivially displayable.
using Map = kv::RawCopySerialisedMap<std::string, std::string>;
struct ExecutorIdFormatter
{
static std::string format(const std::string& core)
@ -28,8 +36,8 @@ namespace externalexecutor
using ExecutorIDMap = std::map<ExecutorId, ExecutorNodeInfo>;
using ExecutorCertsMap = std::map<ExecutorId, crypto::Pem>;
ExecutorIDMap executor_ids;
ExecutorCertsMap executor_certs;
static ExecutorIDMap executor_ids;
static ExecutorCertsMap executor_certs;
struct ExecutorIdentity : public ccf::AuthnIdentity
{

Просмотреть файл

@ -9,15 +9,14 @@
#include "ccf/http_consts.h"
#include "ccf/http_responder.h"
#include "ccf/json_handler.h"
#include "ccf/kv/map.h"
#include "ccf/pal/locking.h"
#include "ccf/service/tables/nodes.h"
#include "endpoints/grpc/grpc.h"
#include "executor_auth_policy.h"
#include "executor_code_id.h"
#include "executor_registration.pb.h"
#include "external_executor_indexing.h"
#include "historical.pb.h"
#include "http/http_builder.h"
#include "index.pb.h"
#include "kv.pb.h"
#include "misc.pb.h"
#include "node/endpoint_context_impl.h"
@ -32,11 +31,6 @@
namespace externalexecutor
{
// This uses std::string to match protobuf's storage of raw bytes entries, and
// directly stores those raw bytes. Note that these strings may contain nulls
// and other unprintable characters, so may not be trivially displayable.
using Map = kv::RawCopySerialisedMap<std::string, std::string>;
class EndpointRegistry : public ccf::UserEndpointRegistry
{
struct RequestInfo
@ -57,6 +51,7 @@ namespace externalexecutor
ccf::grpc::DetachedStreamPtr<externalexecutor::protobuf::Work>
work_stream;
};
std::unordered_map<ExecutorId, ExecutorInfo> active_executors;
struct ExecutorIdList
@ -129,6 +124,170 @@ namespace externalexecutor
return nullptr;
}
std::unordered_map<std::string, MapStrategyPtr> map_index_strategies;
void install_index_service(ccfapp::AbstractNodeContext& node_context)
{
auto executor_auth_policy = std::make_shared<ExecutorAuthPolicy>();
ccf::AuthnPolicies executor_only{executor_auth_policy};
// registers the index and will store the streamptr
auto install_and_subscribe_index =
[this, &node_context](
ccf::endpoints::CommandEndpointContext& ctx,
externalexecutor::protobuf::IndexInstall&& payload,
ccf::grpc::StreamPtr<externalexecutor::protobuf::IndexWork>&&
out_stream) -> ccf::grpc::GrpcAdapterStreamingResponse {
std::string strategy = payload.strategy_name();
auto it = map_index_strategies.find(strategy);
if (it != map_index_strategies.end())
{
return ccf::grpc::make_error(
GRPC_STATUS_ALREADY_EXISTS,
fmt::format("Strategy {} already exists", strategy));
}
auto executor_id = get_caller_executor_id(ctx);
auto ds = payload.data_structure();
// Mark default ds as MAP
IndexDataStructure data_structure = MAP;
if (ds == externalexecutor::protobuf::IndexInstall::PREFIX_TREE)
{
data_structure = PREFIX_TREE;
}
// signal to the indexer that it is now subscribed
externalexecutor::protobuf::IndexWork work;
work.mutable_subscribed();
out_stream->stream_msg(work);
std::shared_ptr<ExecutorIndex> map_index =
std::make_shared<ExecutorIndex>(
payload.map_name(),
strategy,
data_structure,
executor_id,
ctx,
node_context,
std::move(out_stream));
node_context.get_indexing_strategies().install_strategy(map_index);
// store the index pointer into the strategies
map_index_strategies[strategy] = map_index;
return ccf::grpc::make_pending();
};
make_endpoint(
"/externalexecutor.protobuf.Index/InstallAndSubscribe",
HTTP_POST,
ccf::grpc_command_unary_stream_adapter<
externalexecutor::protobuf::IndexInstall,
externalexecutor::protobuf::IndexWork>(install_and_subscribe_index),
executor_only)
.set_forwarding_required(ccf::endpoints::ForwardingRequired::Never)
.install();
auto store_indexed_data =
[this](
ccf::endpoints::EndpointContext& ctx,
externalexecutor::protobuf::IndexPayload&& payload)
-> ccf::grpc::GrpcAdapterResponse<google::protobuf::Empty> {
std::string strategy = payload.strategy_name();
auto it = map_index_strategies.find(strategy);
if (it == map_index_strategies.end())
{
return ccf::grpc::make_error(
GRPC_STATUS_NOT_FOUND,
fmt::format("Strategy {} doesn't exist", strategy));
}
std::string key = payload.key();
std::string val = payload.value();
map_index_strategies[strategy]->store(key, val);
return ccf::grpc::make_success();
};
make_endpoint(
"/externalexecutor.protobuf.Index/StoreIndexedData",
HTTP_POST,
ccf::grpc_adapter<
externalexecutor::protobuf::IndexPayload,
google::protobuf::Empty>(store_indexed_data),
executor_only)
.set_forwarding_required(ccf::endpoints::ForwardingRequired::Never)
.install();
auto get_indexed_data = [this](
ccf::endpoints::ReadOnlyEndpointContext& ctx,
externalexecutor::protobuf::IndexKey&& payload)
-> ccf::grpc::GrpcAdapterResponse<
externalexecutor::protobuf::IndexValue> {
std::string strategy_name = payload.strategy_name();
auto it = map_index_strategies.find(strategy_name);
if (it == map_index_strategies.end())
{
return ccf::grpc::make_error(
GRPC_STATUS_NOT_FOUND,
fmt::format("Index {} is not found", strategy_name));
}
auto executor_id = get_caller_executor_id(ctx);
auto strategy_ptr = map_index_strategies[strategy_name];
std::string key = payload.key();
std::optional<std::string> data = strategy_ptr->fetch(key);
externalexecutor::protobuf::IndexValue response;
if (data.has_value())
{
response.set_value(data.value());
}
return ccf::grpc::make_success(response);
};
make_read_only_endpoint(
"/externalexecutor.protobuf.Index/GetIndexedData",
HTTP_POST,
ccf::grpc_read_only_adapter<
externalexecutor::protobuf::IndexKey,
externalexecutor::protobuf::IndexValue>(get_indexed_data),
executor_only)
.set_forwarding_required(ccf::endpoints::ForwardingRequired::Never)
.install();
auto unsubscribe_index =
[this](
ccf::endpoints::CommandEndpointContext& ctx,
externalexecutor::protobuf::IndexStrategy&& payload)
-> ccf::grpc::GrpcAdapterResponse<google::protobuf::Empty> {
auto it = map_index_strategies.find(payload.strategy_name());
if (it != map_index_strategies.end())
{
it->second->is_indexer_active = false;
};
externalexecutor::protobuf::IndexWork work;
work.mutable_work_done();
it->second->detached_stream->stream_msg(work);
return ccf::grpc::make_success();
};
make_endpoint(
"/externalexecutor.protobuf.Index/Unsubscribe",
HTTP_POST,
ccf::grpc_adapter<
externalexecutor::protobuf::IndexStrategy,
google::protobuf::Empty>(unsubscribe_index),
executor_only)
.set_forwarding_required(ccf::endpoints::ForwardingRequired::Never)
.install();
}
void install_registry_service()
{
// create an endpoint to get executor code id
@ -814,6 +973,8 @@ namespace externalexecutor
install_kv_service(context);
install_index_service(context);
auto run_string_ops = [this](
ccf::endpoints::CommandEndpointContext& ctx,
std::vector<temp::OpIn>&& payload,

Просмотреть файл

@ -0,0 +1,163 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#include "external_executor_indexing.h"
namespace externalexecutor
{
MapIndex::MapIndex(
const std::shared_ptr<ccf::indexing::AbstractLFSAccess>& lfs_access_,
const std::string& map) :
lfs_access(lfs_access_),
indexed_data(lfs_access_, 1000, map),
map_name(map)
{}
std::optional<std::string> MapIndex::fetch_data(const std::string& key)
{
std::lock_guard<ccf::pal::Mutex> guard(results_access);
const auto results = results_in_progress.find(key);
if (results != results_in_progress.end())
{
auto& bucket_value = results->second;
// We were already trying to fetch this. If it's finished fetching,
// parse and store the result
const auto fetch_result = bucket_value->fetch_result;
switch (fetch_result)
{
case (ccf::indexing::FetchResult::Fetching):
{
return std::nullopt;
}
case (ccf::indexing::FetchResult::Loaded):
{
std::string val(
bucket_value->contents.begin(), bucket_value->contents.end());
return val;
}
case (ccf::indexing::FetchResult::NotFound):
case (ccf::indexing::FetchResult::Corrupt):
{
const auto problem =
fetch_result == ccf::indexing::FetchResult::NotFound ? "missing" :
"corrupt";
LOG_FAIL_FMT("A file that indexer requires is {}.", problem);
LOG_DEBUG_FMT("The {} file is {}", problem, bucket_value->key);
return std::nullopt;
break;
}
}
}
else
{
// We're not currently fetching this. First check if it's in our
// current results
const auto current_result = indexed_data.find(key);
if (current_result.has_value())
{
return current_result.value();
}
else
{
// Begin fetching this bucket from disk
std::string blob_name = get_blob_name(map_name, key);
auto fetch_handle = lfs_access->fetch(blob_name);
results_in_progress[key] = fetch_handle;
}
}
return std::nullopt;
}
void MapIndex::store_data(const std::string& key, const std::string& value)
{
indexed_data.insert(key, std::move(value));
}
ExecutorIndex::ExecutorIndex(
const std::string& map_name_,
const std::string& strategy_prefix,
IndexDataStructure ds,
ExecutorId& id,
ccf::endpoints::CommandEndpointContext& ctx,
ccfapp::AbstractNodeContext& node_ctx,
IndexStream&& stream) :
Strategy(strategy_prefix),
map_name(map_name_),
data_structure(ds),
indexer_id(id),
endpoint_ctx(&ctx),
node_context(&node_ctx),
out_stream(std::move(stream)),
is_indexer_active(true)
{
if (kv::get_security_domain(map_name_) != kv::SecurityDomain::PUBLIC)
{
throw std::logic_error(fmt::format(
"This Strategy ({}) is currently only implemented for public tables, "
"so cannot be used for '{}'",
get_name(),
map_name_));
}
if (data_structure == MAP)
{
impl_index = std::make_unique<MapIndex>(
node_context->get_subsystem<ccf::indexing::AbstractLFSAccess>(),
map_name);
}
else if (data_structure == PREFIX_TREE)
{
impl_index = std::make_unique<PrefixTreeIndex>();
}
// create a detached stream pointer of the indexer
detached_stream =
ccf::grpc::detach_stream(ctx.rpc_ctx, std::move(out_stream), [this]() {
is_indexer_active = false;
});
}
void ExecutorIndex::handle_committed_transaction(
const ccf::TxID& tx_id, const kv::ReadOnlyStorePtr& store)
{
auto tx = store->create_read_only_tx();
auto handle = tx.ro<Map>(map_name);
handle->foreach([this](const auto& k, const auto& v) {
externalexecutor::protobuf::IndexWork data;
externalexecutor::protobuf::IndexKeyValue* index_key_value =
data.mutable_key_value();
index_key_value->set_key(k);
index_key_value->set_value(v);
if (is_indexer_active)
{
// stream transactions to the indexer
if (!detached_stream->stream_msg(data))
{
is_indexer_active = false;
LOG_FAIL_FMT("Failed to stream request to indexer {}", indexer_id);
return false;
}
}
return true;
});
current_txid = tx_id;
}
std::optional<ccf::SeqNo> ExecutorIndex::next_requested()
{
return current_txid.seqno + 1;
}
void ExecutorIndex::store(const std::string& key, const std::string& value)
{
impl_index->store_data(key, value);
}
std::optional<std::string> ExecutorIndex::fetch(const std::string& key)
{
return impl_index->fetch_data(key);
}
}

Просмотреть файл

@ -0,0 +1,220 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include "ccf/common_endpoint_registry.h"
#include "ccf/ds/logger.h"
#include "ccf/indexing/strategy.h"
#include "ccf/kv/map.h"
#include "ccf/pal/locking.h"
#include "ds/lru.h"
#include "endpoints/grpc/grpc.h"
#include "executor_auth_policy.h"
#include "index.pb.h"
#include "indexing/lfs_interface.h"
#include "kv/kv_types.h"
namespace externalexecutor
{
class ExecutorIndex;
using MapStrategyPtr = std::shared_ptr<ExecutorIndex>;
using DetachedIndexStream =
ccf::grpc::DetachedStreamPtr<externalexecutor::protobuf::IndexWork>;
using IndexStream =
ccf::grpc::StreamPtr<externalexecutor::protobuf::IndexWork>;
using BucketValue = ccf::indexing::FetchResultPtr;
namespace
{
std::string get_blob_name(
const std::string& map_name, const std::string& key)
{
auto hex_key = ds::to_hex(key.begin(), key.end());
std::string blob_name = fmt::format("{}:{}", map_name, hex_key);
return blob_name;
}
} // namespace
class LRUIndexCache
{
public:
using Entry = std::pair<const std::string, std::string>;
using List = std::list<Entry>;
using Map = std::map<std::string, typename List::iterator>;
private:
ccf::pal::Mutex results_access;
// Entries are ordered by when they were most recently accessed, with most
// recent at the front
List entries_list;
// Maps from keys to iterators from entries_list, which must remain valid
// even when entries_list is modified
Map iter_map;
std::shared_ptr<ccf::indexing::AbstractLFSAccess> lfs_access;
size_t max_size;
const std::string map_name;
void flush_to_disk()
{
while (entries_list.size() > max_size)
{
const auto& least_recent_entry = entries_list.back();
auto key = least_recent_entry.first;
auto value = least_recent_entry.second;
std::string blob_name = get_blob_name(map_name, key);
ccf::indexing::LFSContents contents(value.begin(), value.end());
lfs_access->store(blob_name, std::move(contents));
iter_map.erase(least_recent_entry.first);
entries_list.pop_back();
}
}
public:
LRUIndexCache() {}
LRUIndexCache(
std::shared_ptr<ccf::indexing::AbstractLFSAccess> lfs_ptr,
size_t max_size,
const std::string& map) :
lfs_access(lfs_ptr),
max_size(max_size),
map_name(map)
{}
size_t size() const
{
return iter_map.size();
}
void set_max_size(size_t ms)
{
max_size = ms;
}
size_t get_max_size() const
{
return max_size;
}
std::optional<std::string> find(const std::string& k)
{
const auto it = iter_map.find(k);
if (it != iter_map.end())
{
return it->second->second;
}
return std::nullopt;
}
void insert(const std::string& k, const std::string& v)
{
std::lock_guard<ccf::pal::Mutex> guard(results_access);
auto it = iter_map.find(k);
if (it != iter_map.end())
{
// If it already exists, move to the front
auto& list_it = it->second;
entries_list.splice(entries_list.begin(), entries_list, list_it);
}
else
{
// Else add a new entry to both containers, and flush if necessary
entries_list.push_front(std::make_pair(k, v));
const auto list_it = entries_list.begin();
iter_map.emplace_hint(it, k, list_it);
flush_to_disk();
}
}
void clear()
{
entries_list.clear();
iter_map.clear();
}
};
enum IndexDataStructure
{
MAP,
PREFIX_TREE
};
class ImplIndex
{
public:
virtual std::optional<std::string> fetch_data(const std::string& key) = 0;
virtual void store_data(
const std::string& key, const std::string& value) = 0;
virtual ~ImplIndex() {}
};
class MapIndex : public ImplIndex
{
ccf::pal::Mutex results_access;
std::unordered_map<std::string, BucketValue> results_in_progress;
std::shared_ptr<ccf::indexing::AbstractLFSAccess> lfs_access;
LRUIndexCache indexed_data;
const std::string map_name;
public:
MapIndex() {}
MapIndex(
const std::shared_ptr<ccf::indexing::AbstractLFSAccess>& lfs_access_,
const std::string& map_name);
std::optional<std::string> fetch_data(const std::string& key) override;
void store_data(const std::string& key, const std::string& value) override;
};
class PrefixTreeIndex : public ImplIndex
{
public:
PrefixTreeIndex() {}
std::optional<std::string> fetch_data(const std::string& key) override
{
return std::nullopt;
};
void store_data(
const std::string& key, const std::string& value) override{};
};
class ExecutorIndex : public ccf::indexing::Strategy
{
protected:
const std::string map_name;
std::string strategy_name = "ExecutorIndex";
IndexDataStructure data_structure;
ccf::TxID current_txid = {};
ExecutorId indexer_id;
ccf::endpoints::CommandEndpointContext* endpoint_ctx;
ccfapp::AbstractNodeContext* node_context;
IndexStream out_stream;
std::unique_ptr<ImplIndex> impl_index = nullptr;
public:
bool is_indexer_active = false;
DetachedIndexStream detached_stream;
ExecutorIndex(
const std::string& map_name_,
const std::string& strategy_prefix,
IndexDataStructure ds,
ExecutorId& id,
ccf::endpoints::CommandEndpointContext& ctx,
ccfapp::AbstractNodeContext& node_context,
IndexStream&& stream);
void handle_committed_transaction(
const ccf::TxID& tx_id, const kv::ReadOnlyStorePtr& store) override;
std::optional<ccf::SeqNo> next_requested() override;
void store(const std::string& key, const std::string& value);
std::optional<std::string> fetch(const std::string& key);
};
}

Просмотреть файл

@ -0,0 +1,78 @@
syntax = "proto3";
import "google/protobuf/empty.proto";
package externalexecutor.protobuf;
option optimize_for = LITE_RUNTIME;
// Service exposed by a CCF node for Executors/Indexers to interact with the
// Index stored on the CCF node. Communication with this service must be over
// TLS, authorising as a client cert that has previously been accepted by the
// ExecutorRegistration service.
service Index
{
// Installs index and registers with the given strategy name and table name.
// Streams Key-Value pairs of every transaction in the given table name.
// Stores indexed data in an unordered map datastructure or a prefix tree
// depending on the type of enum passed
rpc InstallAndSubscribe(IndexInstall) returns (stream IndexWork) {}
// Stores Indexer's data. Can only be invoked by the Indexer that installed
// the strategy
rpc StoreIndexedData(IndexPayload) returns (google.protobuf.Empty) {}
// Takes an input of Index Strategy name and a Key, returns values from the
// indexed data on the local CCF node This can be invoked by any registered
// executor
rpc GetIndexedData(IndexKey) returns (IndexValue) {}
rpc Unsubscribe(IndexStrategy) returns (google.protobuf.Empty) {}
}
message IndexKeyValue
{
bytes key = 1;
bytes value = 2;
}
message Subscribed {}
message Published {}
message IndexWork
{
oneof task
{
Subscribed subscribed = 1;
IndexKeyValue key_value = 2;
Published work_done = 3;
}
}
message IndexInstall
{
enum DataStructure {
MAP = 0;
PREFIX_TREE = 1;
}
string strategy_name = 1;
string map_name = 2;
DataStructure data_structure = 3;
}
message IndexKey
{
string strategy_name = 1;
bytes key = 2;
}
message IndexStrategy { string strategy_name = 1; }
message IndexValue { bytes value = 1; }
message IndexPayload
{
string strategy_name = 1;
bytes key = 2;
bytes value = 3;
}

Просмотреть файл

@ -2,6 +2,7 @@
// Licensed under the Apache 2.0 License.
#pragma once
#include "ccf/endpoint_context.h"
#include "ccf/odata_error.h"
#include "message.h"
#include "node/rpc/rpc_context_impl.h"

Просмотреть файл

@ -17,7 +17,7 @@ namespace ccf::grpc
static constexpr size_t message_frame_length =
sizeof(CompressedFlag) + sizeof(MessageLength);
MessageLength read_message_frame(const uint8_t*& data, size_t& size)
static MessageLength read_message_frame(const uint8_t*& data, size_t& size)
{
auto compressed_flag = serialized::read<CompressedFlag>(data, size);
if (compressed_flag >= 1)
@ -31,7 +31,8 @@ namespace ccf::grpc
return ntohl(serialized::read<MessageLength>(data, size));
}
void write_message_frame(uint8_t*& data, size_t& size, size_t message_size)
static void write_message_frame(
uint8_t*& data, size_t& size, size_t message_size)
{
CompressedFlag compressed_flag = 0;
serialized::write(data, size, compressed_flag);
@ -40,7 +41,7 @@ namespace ccf::grpc
}
template <typename T>
std::vector<uint8_t> serialise_grpc_message(T proto_data)
static std::vector<uint8_t> serialise_grpc_message(T proto_data)
{
const auto data_length = proto_data.ByteSizeLong();
size_t r_size = ccf::grpc::impl::message_frame_length + data_length;
@ -60,7 +61,8 @@ namespace ccf::grpc
}
template <typename T>
std::vector<uint8_t> serialise_grpc_messages(const std::vector<T>& proto_data)
static std::vector<uint8_t> serialise_grpc_messages(
const std::vector<T>& proto_data)
{
size_t r_size = std::accumulate(
proto_data.begin(),

Просмотреть файл

@ -150,9 +150,9 @@ class LoggingExecutor:
)
if "log/private" in request.uri:
table = "private:records"
elif "log/public" in request.uri:
table = "records"
elif "log/public" in request.uri:
table = "public:records"
else:
LOG.error(f"Unhandled request: {request.method} {request.uri}")
stub.EndTx(response)

Просмотреть файл

@ -25,6 +25,12 @@ import executor_registration_pb2 as ExecutorRegistration
# pylint: disable=import-error
import executor_registration_pb2_grpc as RegistrationService
# pylint: disable=import-error
import index_pb2 as Index
# pylint: disable=import-error
import index_pb2_grpc as IndexService
# pylint: disable=no-name-in-module
from google.protobuf.empty_pb2 import Empty as Empty
@ -421,6 +427,105 @@ def test_async_streaming(network, args):
return network
@reqs.description("Test index API")
def test_index_api(network, args):
primary, _ = network.find_primary()
def add_kv_entries(network):
logging_executor = LoggingExecutor(primary)
supported_endpoints = logging_executor.supported_endpoints
credentials = register_new_executor(
primary, network, supported_endpoints=supported_endpoints
)
logging_executor.credentials = credentials
log_id = 14
with executor_thread(logging_executor):
with primary.client() as c:
for _ in range(3):
r = c.post(
"/app/log/public",
{"id": log_id, "msg": "hello_world_" + str(log_id)},
)
assert r.status_code == 200
log_id = log_id + 1
add_kv_entries(network)
credentials = register_new_executor(primary, network)
with grpc.secure_channel(
target=f"{primary.get_public_rpc_host()}:{primary.get_public_rpc_port()}",
credentials=credentials,
) as channel:
data = queue.Queue()
subscription_started = threading.Event()
def InstallandSub():
sub_credentials = register_new_executor(primary, network)
with grpc.secure_channel(
target=f"{primary.get_public_rpc_host()}:{primary.get_public_rpc_port()}",
credentials=sub_credentials,
) as subscriber_channel:
in_stub = IndexService.IndexStub(subscriber_channel)
for work in in_stub.InstallAndSubscribe(
Index.IndexInstall(
strategy_name="TestStrategy",
map_name="public:records",
data_structure=Index.IndexInstall.MAP,
)
):
if work.HasField("subscribed"):
subscription_started.set()
LOG.info("subscribed to a Index stream")
continue
elif work.HasField("work_done"):
LOG.info("work done")
break
assert work.HasField("key_value")
LOG.info("Has key value")
result = work.key_value
data.put(result)
th = threading.Thread(target=InstallandSub)
th.start()
# Wait for subscription thread to actually start, and the server has confirmed it is ready
assert subscription_started.wait(timeout=3), "Subscription wait timed out"
time.sleep(1)
index_stub = IndexService.IndexStub(channel)
while data.qsize() > 0:
LOG.info("storing indexed data")
res = data.get()
index_stub.StoreIndexedData(
Index.IndexPayload(
strategy_name="TestStrategy",
key=res.key,
value=res.value,
)
)
LOG.info("Fetching indexed data")
log_id = 14
for _ in range(3):
result = index_stub.GetIndexedData(
Index.IndexKey(
strategy_name="TestStrategy", key=log_id.to_bytes(8, "big")
)
)
assert result.value.decode("utf-8") == "hello_world_" + str(log_id)
log_id = log_id + 1
index_stub.Unsubscribe(Index.IndexStrategy(strategy_name="TestStrategy"))
th.join()
return network
@reqs.description("Test multiple executors that support the same endpoint")
def test_multiple_executors(network, args):
primary, _ = network.find_primary()
@ -482,6 +587,7 @@ def test_logging_executor(network, args):
r = c.post("/app/log/public", {"id": log_id, "msg": log_msg})
assert r.status_code == 200
r = c.get(f"/app/log/public?id={log_id}")
assert r.status_code == 200
@ -550,6 +656,7 @@ def run(args):
network = test_streaming(network, args)
network = test_async_streaming(network, args)
network = test_logging_executor(network, args)
network = test_index_api(network, args)
network = test_multiple_executors(network, args)