[c++ grpc] Add wait_callback helper

The wait_callback helper can be used with the generated gRPC proxies to
synchronously wait for the proxy callback to be invoked.

* helloworld updated to use wait_callback
* gRPC compat client updated to use wait_callback
* unit tests added
This commit is contained in:
Christopher Warrington 2017-05-30 19:18:58 -07:00
Родитель d08d1a889c
Коммит 9b38414874
9 изменённых файлов: 363 добавлений и 126 удалений

Просмотреть файл

@ -0,0 +1,18 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
#pragma once
#include <bond/core/exception.h>
namespace bond { namespace ext { namespace gRPC {
/// @brief Exception thrown to indicate that a callback has been invoked
/// multiple times when only one invocation is expected.
class MultipleInvocationException : public Exception
{
public:
MultipleInvocationException() : Exception("The callback was invoked more than once.") { }
};
} } } // namespace bond::ext::gRPC

Просмотреть файл

@ -0,0 +1,126 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
#pragma once
#include <bond/core/bonded.h>
#include <bond/ext/grpc/exception.h>
#include <grpc++/impl/codegen/status.h>
#include <boost/optional.hpp>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <tuple>
namespace bond { namespace ext { namespace gRPC {
/// @brief A callback type that can be manually waited upon.
///
/// The type can be used to synchronously get the result of invoking an
/// async proxy method.
///
/// The wait() member function can be used to wait until the callback has
/// been called. Then, the status() and response() member functions can be
/// called to inspect the results.
template <typename TResponse>
class wait_callback
{
public:
wait_callback() : _impl(std::make_shared<impl>()) { }
/// @brief Records the response and status.
///
/// @exception MultipleInvocationException thrown if the callback (or a
/// copy of the callback) is invoked more than once.
void operator()(const bond::bonded<TResponse>& response, const grpc::Status& status)
{
std::unique_lock<std::mutex> lock(_impl->_m);
if (!_impl->_results)
{
_impl->_results.emplace(response, status);
// Drop the lock before notifying so we don't wake someone up to
// then have them wait on the lock.
lock.unlock();
_impl->_cv.notify_all();
}
else
{
throw MultipleInvocationException();
}
}
/// @brief Waits for this to have been invoked.
void wait() const
{
std::unique_lock<std::mutex> lock(_impl->_m);
_impl->_cv.wait(lock, [this]() { return static_cast<bool>(_impl->_results); });
}
/// @brief Waits at least \p timeout for this to have been invoked.
///
/// @param timeout the minimum amount of time to wait.
///
/// @return \p true if a callback was invoked. \p false if the timeout
/// occured.
template <typename Rep, typename Period>
bool wait_for(const std::chrono::duration<Rep, Period>& timeout) const
{
std::unique_lock<std::mutex> lock(_impl->_m);
return _impl->_cv.wait_for(lock, timeout, [this]() { return static_cast<bool>(_impl->_results); });
}
/// @brief Gets the response.
///
/// @warning Blocks until this has been invoked.
const bond::bonded<TResponse>& response() const
{
wait();
return std::get<0>(_impl->_results.get());
}
/// @brief Gets the status.
///
/// @warning Blocks until this has been invoked.
const grpc::Status& status() const
{
wait();
return std::get<1>(_impl->_results.get());
}
private:
/// The interesting guts of wait_callback. We use an impl class so that
/// wait_callback can be copied and all the copies affect the same underlying
/// state.
struct impl
{
impl() = default;
impl(const impl&) = delete;
impl(impl&&) = delete;
impl& operator=(const impl&) = delete;
impl& operator=(impl&&) = delete;
/// mutex to lock the shared state
std::mutex _m;
/// condition variable used to signal anyone waiting
std::condition_variable _cv;
/// The response and status, but more importantly, doubles as a flag
/// indicating whether a callback has been invoked yet. If this is
/// empty, no callback has been invoked yet. If non-empty, a
/// callback has already been invoked.
boost::optional<std::tuple<bond::bonded<TResponse>, grpc::Status>> _results;
};
/// shared_ptr to the actual state.
std::shared_ptr<impl> _impl;
};
/// @example wait_callback_example.cpp
///
/// This is a brief example showing how wait_callback can be used to
/// synchronously get the result of invoking an async proxy method.
} } } // bond::extgrpc

Просмотреть файл

@ -8,9 +8,7 @@
#include <bond/ext/grpc/io_manager.h>
#include <bond/ext/grpc/thread_pool.h>
#include <bond/ext/detail/event.h>
#include <boost/optional/optional.hpp>
#include <bond/ext/grpc/wait_callback.h>
#include <chrono>
#include <memory>
@ -24,65 +22,6 @@ using grpc::Status;
using namespace PingPongNS;
template <typename TResponse>
class wait_callback
{
public:
wait_callback() :
_response(),
_status(),
_event()
{ }
wait_callback(const wait_callback&) = delete;
wait_callback(wait_callback&&) = delete;
wait_callback& operator=(const wait_callback&) = delete;
wait_callback& operator=(wait_callback&&) = delete;
std::function<void(const bond::bonded<TResponse>&, const ::grpc::Status&)> callback()
{
return std::function<void(const bond::bonded<TResponse>&, const ::grpc::Status&)>(
[this](const bond::bonded<TResponse>& response, const ::grpc::Status& status)
{
_response.emplace(response);
_status.emplace(status);
_event.set();
});
}
operator std::function<void(const bond::bonded<TResponse>&, const ::grpc::Status&)>()
{
return callback();
}
void wait()
{
_event.wait();
}
template <typename Rep, typename Period>
bool wait(std::chrono::duration<Rep, Period> timeout)
{
return _event.wait(timeout);
}
const bond::bonded<TResponse>& response() const
{
return _response.get();
}
const grpc::Status& status() const
{
return _status.get();
}
private:
boost::optional<bond::bonded<TResponse>> _response;
boost::optional<grpc::Status> _status;
bond::ext::detail::event _event;
};
int main()
{
auto ioManager = std::make_shared<bond::ext::gRPC::io_manager>();
@ -112,9 +51,9 @@ int main()
printf("Sending\n");
fflush(stdout);
wait_callback<PingResponse> cb;
bond::ext::gRPC::wait_callback<PingResponse> cb;
client.AsyncPing(&context, bond::bonded<PingRequest>{ request }, cb);
bool gotResponse = cb.wait(std::chrono::seconds(1));
bool gotResponse = cb.wait_for(std::chrono::seconds(1));
if (!gotResponse)
{
@ -150,9 +89,9 @@ int main()
request.Payload = "error" + std::to_string(i);
request.Action = PingAction::Error;
wait_callback<PingResponse> cb;
bond::ext::gRPC::wait_callback<PingResponse> cb;
client.AsyncPing(&context, bond::bonded<PingRequest> { request }, cb);
bool gotResponse = cb.wait(std::chrono::seconds(1));
bool gotResponse = cb.wait_for(std::chrono::seconds(1));
if (!gotResponse)
{

Просмотреть файл

@ -41,5 +41,7 @@ target_link_libraries (grpc_test_common PUBLIC
${Boost_UNIT_TEST_FRAMEWORK_LIBRARY}
grpc++)
add_unit_test (thread_pool.cpp)
add_unit_test (io_manager.cpp)
add_unit_test (thread_pool.cpp)
add_unit_test (wait_callback.cpp)
target_link_libraries(wait_callback PRIVATE bond_apply)

Просмотреть файл

@ -0,0 +1,167 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// TODO: move unit_test_framework.h to cpp/test/inc
#include "../core/unit_test_framework.h"
#include <bond/core/bond.h>
#include <bond/core/bond_reflection.h>
#include <bond/ext/detail/event.h>
#include <bond/ext/grpc/wait_callback.h>
#include <bond/protocol/compact_binary.h>
#include <bond/stream/output_buffer.h>
#include <atomic>
#include <thread>
namespace wait_callback_tests
{
using wait_callbackBox = bond::ext::gRPC::wait_callback<bond::Box<int>>;
static const int ANY_INT_VALUE = 100;
static bond::bonded<bond::Box<int>> anyBondedValue;
static grpc::Status anyStatus;
static bond::bonded<bond::Box<int>> MakeAnyBonded()
{
bond::Box<int> boxedInt;
boxedInt.value = ANY_INT_VALUE;
bond::OutputBuffer ob;
bond::CompactBinaryWriter<bond::OutputBuffer> writer(ob);
bond::Serialize(boxedInt, writer);
bond::blob buffer = ob.GetBuffer();
bond::InputBuffer ib(buffer);
bond::CompactBinaryReader<bond::InputBuffer> reader(ib);
return bond::bonded<bond::Box<int>>(reader);
}
static void CallbackCapturesValues()
{
wait_callbackBox cb;
cb(anyBondedValue, anyStatus);
UT_AssertIsTrue(cb.response().Deserialize().value == ANY_INT_VALUE);
UT_AssertIsTrue(cb.status().ok());
}
static void SubsequentInvocationThrow()
{
wait_callbackBox cb;
cb(anyBondedValue, anyStatus);
UT_AssertThrows(cb(anyBondedValue, grpc::Status::CANCELLED), bond::ext::gRPC::MultipleInvocationException);
UT_AssertIsTrue(cb.response().Deserialize().value == ANY_INT_VALUE);
UT_AssertIsTrue(cb.status().ok());
}
static void SubsequentInvocationOnCopyThrow()
{
wait_callbackBox cb;
wait_callbackBox otherCb(cb);
cb(anyBondedValue, anyStatus);
UT_AssertThrows(otherCb(anyBondedValue, grpc::Status::CANCELLED), bond::ext::gRPC::MultipleInvocationException);
UT_AssertIsTrue(otherCb.response().Deserialize().value == ANY_INT_VALUE);
UT_AssertIsTrue(otherCb.status().ok());
}
static void CanBeConvertedToStdFunction()
{
wait_callbackBox cb;
std::function<void(const bond::bonded<bond::Box<int>>&, const grpc::Status&)> f = cb;
f(anyBondedValue, anyStatus);
UT_AssertIsTrue(cb.response().Deserialize().value == ANY_INT_VALUE);
UT_AssertIsTrue(cb.status().ok());
}
static void CopiesSeeSameValues()
{
wait_callbackBox cb;
wait_callbackBox otherCb(cb);
cb(anyBondedValue, anyStatus);
UT_AssertIsTrue(otherCb.response().Deserialize().value == ANY_INT_VALUE);
UT_AssertIsTrue(otherCb.status().ok());
}
static void AsignmentSeesSameValues()
{
wait_callbackBox cb;
cb(anyBondedValue, anyStatus);
wait_callbackBox otherCb;
otherCb(anyBondedValue, grpc::Status::CANCELLED);
UT_AssertIsTrue(!otherCb.status().ok());
otherCb = cb;
UT_AssertIsTrue(otherCb.status().ok());
}
static void WaitReturnsTrueAfterCBInvoked()
{
wait_callbackBox cb;
bool wasInvoked = cb.wait_for(std::chrono::milliseconds(0));
UT_AssertIsFalse(wasInvoked);
cb(anyBondedValue, anyStatus);
wasInvoked = cb.wait_for(std::chrono::milliseconds(0));
UT_AssertIsTrue(wasInvoked);
}
static void WaitingThreadGetsNotified()
{
wait_callbackBox cb;
bond::ext::detail::event threadStarted;
std::atomic<bool> wasInvoked(false);
std::thread t([&cb, &threadStarted, &wasInvoked]()
{
threadStarted.set();
wasInvoked = cb.wait_for(std::chrono::seconds(30));
});
// This is a clumsy attempt to get the thread into the wait_for method
// before invoking the callback.
bool wasStarted = threadStarted.wait(std::chrono::seconds(30));
UT_AssertIsTrue(wasStarted);
cb(anyBondedValue, anyStatus);
t.join();
UT_AssertIsTrue(wasInvoked);
}
static void Initialize()
{
anyBondedValue = MakeAnyBonded();
anyStatus = grpc::Status::OK;
UnitTestSuite suite("wait_callback");
suite.AddTestCase(&CallbackCapturesValues, "CallbackCapturesValues");
suite.AddTestCase(&SubsequentInvocationThrow, "SubsequentInvocationThrow");
suite.AddTestCase(&SubsequentInvocationOnCopyThrow, "SubsequentInvocationOnCopyThrow");
suite.AddTestCase(&CanBeConvertedToStdFunction, "CanBeConvertedToStdFunction");
suite.AddTestCase(&CopiesSeeSameValues, "CopiesSeeSameValues");
suite.AddTestCase(&AsignmentSeesSameValues, "AsignmentSeesSameValues");
suite.AddTestCase(&WaitReturnsTrueAfterCBInvoked, "WaitReturnsTrueAfterCBInvoked");
suite.AddTestCase(&WaitingThreadGetsNotified, "WaitingThreadGetsNotified");
}
}
bool init_unit_test()
{
wait_callback_tests::Initialize();
return true;
}

Просмотреть файл

@ -131,6 +131,7 @@ if (Haskell_PANDOC_EXECUTABLE AND Doxygen_EXECUTABLE)
doxygen/bond_layout.xml
doxygen/bond_reference.css
doxygen/mainpage.md
doxygen/examples/wait_callback_example.cpp
${cpp_headers})
set (doxygen_output_dir

Просмотреть файл

@ -645,7 +645,7 @@ WARN_LOGFILE =
# directories like "/usr/src/myproject". Separate the files or directories
# with spaces.
INPUT = doxygen/mainpage.md ../cpp/inc/bond
INPUT = doxygen/mainpage.md ../cpp/inc/bond/
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding, which is
@ -705,7 +705,7 @@ EXCLUDE_SYMBOLS =
# directories that contain example code fragments that are included (see
# the \include command).
EXAMPLE_PATH =
EXAMPLE_PATH = doxygen/examples/
# If the value of the EXAMPLE_PATH tag contains directories, you can use the
# EXAMPLE_PATTERNS tag to specify one or more wildcard pattern (like *.cpp

Просмотреть файл

@ -0,0 +1,16 @@
#include <bond/ext/grpc/wait_callback.h>
// ...
int main()
{
Example::Client client(/* ... */);
bond::ext::gRPC::wait_callback<ExampleResponse> cb;
client.AsyncExampleMethod(/* ... */, cb);
cb.wait();
if (cb.status().ok())
{
DoSomeThingWith(cb.response());
}
}

Просмотреть файл

@ -1,14 +1,12 @@
#include "helloworld_grpc.h"
#include "helloworld_types.h"
// event.h needed for test purposes
#include <bond/ext/detail/event.h>
#include <bond/ext/grpc/io_manager.h>
#include <bond/ext/grpc/server.h>
#include <bond/ext/grpc/server_builder.h>
#include <bond/ext/grpc/unary_call.h>
#include <bond/ext/grpc/thread_pool.h>
#include <bond/ext/grpc/unary_call.h>
#include <bond/ext/grpc/wait_callback.h>
#include <chrono>
#include <functional>
@ -21,9 +19,6 @@ using grpc::ClientContext;
using grpc::ServerBuilder;
using grpc::Status;
using bond::ext::detail::event;
using bond::ext::gRPC::io_manager;
using namespace helloworld;
// Logic and data behind the server's behavior.
@ -31,8 +26,8 @@ class GreeterServiceImpl final : public Greeter::Service
{
void SayHello(
bond::ext::gRPC::unary_call<
bond::bonded<HelloRequest>,
bond::bonded<HelloReply>> call) override
bond::bonded<HelloRequest>,
bond::bonded<HelloReply>> call) override
{
HelloRequest request = call.request().Deserialize();
@ -43,37 +38,9 @@ class GreeterServiceImpl final : public Greeter::Service
}
};
void printAndSet(
event* print_event,
bool* isCorrectResponse,
const bond::bonded<HelloReply>& response,
const Status& status)
{
*isCorrectResponse = false;
if(status.ok())
{
const std::string& message = response.Deserialize().message;
if (message.compare("hello world") == 0)
{
std::cout << "Correct response: " << message;
*isCorrectResponse = true;
}
else
{
std::cout << "Wrong response";
*isCorrectResponse = false;
}
}
print_event->set();
}
int main()
{
auto ioManager = std::make_shared<io_manager>();
auto ioManager = std::make_shared<bond::ext::gRPC::io_manager>();
auto threadPool = std::make_shared<bond::ext::gRPC::thread_pool>();
GreeterServiceImpl service;
@ -96,31 +63,32 @@ int main()
HelloRequest request;
request.name = user;
bond::bonded<HelloRequest> req(request);
event print_event;
bool isCorrectResponse;
std::function<void(const bond::bonded<HelloReply>&, const Status&)> f_print =
[&print_event, &isCorrectResponse](bond::bonded<HelloReply> response, Status status)
{
printAndSet(&print_event, &isCorrectResponse, response, status);
};
bond::ext::gRPC::wait_callback<HelloReply> cb;
greeter.AsyncSayHello(&context, bond::bonded<HelloRequest>{request}, cb);
greeter.AsyncSayHello(&context, req, f_print);
bool waitResult = print_event.wait(std::chrono::seconds(10));
bool waitResult = cb.wait_for(std::chrono::seconds(10));
if (!waitResult)
{
std::cout << "timeout ocurred";
}
if (waitResult && isCorrectResponse)
{
return 0;
}
else
{
return 1;
}
else if (!cb.status().ok())
{
std::cout << "request failed";
return 1;
}
HelloReply reply;
cb.response().Deserialize(reply);
if (reply.message.compare("hello world") != 0)
{
std::cout << "Wrong response: " << reply.message;
return 1;
}
std::cout << "Correct response: " << reply.message;
return 0;
}