From 9b384148749388163da4283692f8e8fc1f3dd69b Mon Sep 17 00:00:00 2001 From: Christopher Warrington Date: Tue, 30 May 2017 19:18:58 -0700 Subject: [PATCH] [c++ grpc] Add wait_callback helper The wait_callback helper can be used with the generated gRPC proxies to synchronously wait for the proxy callback to be invoked. * helloworld updated to use wait_callback * gRPC compat client updated to use wait_callback * unit tests added --- cpp/inc/bond/ext/grpc/exception.h | 18 ++ cpp/inc/bond/ext/grpc/wait_callback.h | 126 +++++++++++++ cpp/test/compat/grpc/pingpong_client.cpp | 71 +------- cpp/test/grpc/CMakeLists.txt | 4 +- cpp/test/grpc/wait_callback.cpp | 167 ++++++++++++++++++ doc/CMakeLists.txt | 1 + doc/doxygen/bond.doxygen | 4 +- .../examples/wait_callback_example.cpp | 16 ++ examples/cpp/grpc/helloworld/helloworld.cpp | 82 +++------ 9 files changed, 363 insertions(+), 126 deletions(-) create mode 100644 cpp/inc/bond/ext/grpc/exception.h create mode 100644 cpp/inc/bond/ext/grpc/wait_callback.h create mode 100644 cpp/test/grpc/wait_callback.cpp create mode 100644 doc/doxygen/examples/wait_callback_example.cpp diff --git a/cpp/inc/bond/ext/grpc/exception.h b/cpp/inc/bond/ext/grpc/exception.h new file mode 100644 index 00000000..44024701 --- /dev/null +++ b/cpp/inc/bond/ext/grpc/exception.h @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +#pragma once + +#include + +namespace bond { namespace ext { namespace gRPC { + + /// @brief Exception thrown to indicate that a callback has been invoked + /// multiple times when only one invocation is expected. + class MultipleInvocationException : public Exception + { + public: + MultipleInvocationException() : Exception("The callback was invoked more than once.") { } + }; + +} } } // namespace bond::ext::gRPC diff --git a/cpp/inc/bond/ext/grpc/wait_callback.h b/cpp/inc/bond/ext/grpc/wait_callback.h new file mode 100644 index 00000000..c337c6c8 --- /dev/null +++ b/cpp/inc/bond/ext/grpc/wait_callback.h @@ -0,0 +1,126 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +#pragma once + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +namespace bond { namespace ext { namespace gRPC { + +/// @brief A callback type that can be manually waited upon. +/// +/// The type can be used to synchronously get the result of invoking an +/// async proxy method. +/// +/// The wait() member function can be used to wait until the callback has +/// been called. Then, the status() and response() member functions can be +/// called to inspect the results. +template +class wait_callback +{ +public: + wait_callback() : _impl(std::make_shared()) { } + + /// @brief Records the response and status. + /// + /// @exception MultipleInvocationException thrown if the callback (or a + /// copy of the callback) is invoked more than once. + void operator()(const bond::bonded& response, const grpc::Status& status) + { + std::unique_lock lock(_impl->_m); + if (!_impl->_results) + { + _impl->_results.emplace(response, status); + + // Drop the lock before notifying so we don't wake someone up to + // then have them wait on the lock. + lock.unlock(); + _impl->_cv.notify_all(); + } + else + { + throw MultipleInvocationException(); + } + } + + /// @brief Waits for this to have been invoked. + void wait() const + { + std::unique_lock lock(_impl->_m); + _impl->_cv.wait(lock, [this]() { return static_cast(_impl->_results); }); + } + + /// @brief Waits at least \p timeout for this to have been invoked. + /// + /// @param timeout the minimum amount of time to wait. + /// + /// @return \p true if a callback was invoked. \p false if the timeout + /// occured. + template + bool wait_for(const std::chrono::duration& timeout) const + { + std::unique_lock lock(_impl->_m); + return _impl->_cv.wait_for(lock, timeout, [this]() { return static_cast(_impl->_results); }); + } + + /// @brief Gets the response. + /// + /// @warning Blocks until this has been invoked. + const bond::bonded& response() const + { + wait(); + return std::get<0>(_impl->_results.get()); + } + + /// @brief Gets the status. + /// + /// @warning Blocks until this has been invoked. + const grpc::Status& status() const + { + wait(); + return std::get<1>(_impl->_results.get()); + } + +private: + /// The interesting guts of wait_callback. We use an impl class so that + /// wait_callback can be copied and all the copies affect the same underlying + /// state. + struct impl + { + impl() = default; + impl(const impl&) = delete; + impl(impl&&) = delete; + impl& operator=(const impl&) = delete; + impl& operator=(impl&&) = delete; + + /// mutex to lock the shared state + std::mutex _m; + /// condition variable used to signal anyone waiting + std::condition_variable _cv; + /// The response and status, but more importantly, doubles as a flag + /// indicating whether a callback has been invoked yet. If this is + /// empty, no callback has been invoked yet. If non-empty, a + /// callback has already been invoked. + boost::optional, grpc::Status>> _results; + }; + + /// shared_ptr to the actual state. + std::shared_ptr _impl; +}; + +/// @example wait_callback_example.cpp +/// +/// This is a brief example showing how wait_callback can be used to +/// synchronously get the result of invoking an async proxy method. + +} } } // bond::extgrpc diff --git a/cpp/test/compat/grpc/pingpong_client.cpp b/cpp/test/compat/grpc/pingpong_client.cpp index 00f7773d..f8bb356e 100644 --- a/cpp/test/compat/grpc/pingpong_client.cpp +++ b/cpp/test/compat/grpc/pingpong_client.cpp @@ -8,9 +8,7 @@ #include #include -#include - -#include +#include #include #include @@ -24,65 +22,6 @@ using grpc::Status; using namespace PingPongNS; -template -class wait_callback -{ -public: - wait_callback() : - _response(), - _status(), - _event() - { } - - wait_callback(const wait_callback&) = delete; - wait_callback(wait_callback&&) = delete; - wait_callback& operator=(const wait_callback&) = delete; - wait_callback& operator=(wait_callback&&) = delete; - - std::function&, const ::grpc::Status&)> callback() - { - return std::function&, const ::grpc::Status&)>( - [this](const bond::bonded& response, const ::grpc::Status& status) - { - _response.emplace(response); - _status.emplace(status); - _event.set(); - }); - } - - operator std::function&, const ::grpc::Status&)>() - { - return callback(); - } - - void wait() - { - _event.wait(); - } - - template - bool wait(std::chrono::duration timeout) - { - return _event.wait(timeout); - } - - const bond::bonded& response() const - { - return _response.get(); - } - - const grpc::Status& status() const - { - return _status.get(); - } - -private: - boost::optional> _response; - boost::optional _status; - - bond::ext::detail::event _event; -}; - int main() { auto ioManager = std::make_shared(); @@ -112,9 +51,9 @@ int main() printf("Sending\n"); fflush(stdout); - wait_callback cb; + bond::ext::gRPC::wait_callback cb; client.AsyncPing(&context, bond::bonded{ request }, cb); - bool gotResponse = cb.wait(std::chrono::seconds(1)); + bool gotResponse = cb.wait_for(std::chrono::seconds(1)); if (!gotResponse) { @@ -150,9 +89,9 @@ int main() request.Payload = "error" + std::to_string(i); request.Action = PingAction::Error; - wait_callback cb; + bond::ext::gRPC::wait_callback cb; client.AsyncPing(&context, bond::bonded { request }, cb); - bool gotResponse = cb.wait(std::chrono::seconds(1)); + bool gotResponse = cb.wait_for(std::chrono::seconds(1)); if (!gotResponse) { diff --git a/cpp/test/grpc/CMakeLists.txt b/cpp/test/grpc/CMakeLists.txt index db6dd4e9..cdff7587 100644 --- a/cpp/test/grpc/CMakeLists.txt +++ b/cpp/test/grpc/CMakeLists.txt @@ -41,5 +41,7 @@ target_link_libraries (grpc_test_common PUBLIC ${Boost_UNIT_TEST_FRAMEWORK_LIBRARY} grpc++) -add_unit_test (thread_pool.cpp) add_unit_test (io_manager.cpp) +add_unit_test (thread_pool.cpp) +add_unit_test (wait_callback.cpp) +target_link_libraries(wait_callback PRIVATE bond_apply) diff --git a/cpp/test/grpc/wait_callback.cpp b/cpp/test/grpc/wait_callback.cpp new file mode 100644 index 00000000..a87972ce --- /dev/null +++ b/cpp/test/grpc/wait_callback.cpp @@ -0,0 +1,167 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +// TODO: move unit_test_framework.h to cpp/test/inc +#include "../core/unit_test_framework.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace wait_callback_tests +{ + using wait_callbackBox = bond::ext::gRPC::wait_callback>; + + static const int ANY_INT_VALUE = 100; + static bond::bonded> anyBondedValue; + static grpc::Status anyStatus; + + static bond::bonded> MakeAnyBonded() + { + bond::Box boxedInt; + boxedInt.value = ANY_INT_VALUE; + + bond::OutputBuffer ob; + bond::CompactBinaryWriter writer(ob); + bond::Serialize(boxedInt, writer); + + bond::blob buffer = ob.GetBuffer(); + + bond::InputBuffer ib(buffer); + bond::CompactBinaryReader reader(ib); + + return bond::bonded>(reader); + } + + static void CallbackCapturesValues() + { + wait_callbackBox cb; + cb(anyBondedValue, anyStatus); + + UT_AssertIsTrue(cb.response().Deserialize().value == ANY_INT_VALUE); + UT_AssertIsTrue(cb.status().ok()); + } + + static void SubsequentInvocationThrow() + { + wait_callbackBox cb; + cb(anyBondedValue, anyStatus); + + UT_AssertThrows(cb(anyBondedValue, grpc::Status::CANCELLED), bond::ext::gRPC::MultipleInvocationException); + + UT_AssertIsTrue(cb.response().Deserialize().value == ANY_INT_VALUE); + UT_AssertIsTrue(cb.status().ok()); + } + + static void SubsequentInvocationOnCopyThrow() + { + wait_callbackBox cb; + wait_callbackBox otherCb(cb); + cb(anyBondedValue, anyStatus); + + UT_AssertThrows(otherCb(anyBondedValue, grpc::Status::CANCELLED), bond::ext::gRPC::MultipleInvocationException); + + UT_AssertIsTrue(otherCb.response().Deserialize().value == ANY_INT_VALUE); + UT_AssertIsTrue(otherCb.status().ok()); + } + + static void CanBeConvertedToStdFunction() + { + wait_callbackBox cb; + std::function>&, const grpc::Status&)> f = cb; + + f(anyBondedValue, anyStatus); + + UT_AssertIsTrue(cb.response().Deserialize().value == ANY_INT_VALUE); + UT_AssertIsTrue(cb.status().ok()); + } + + static void CopiesSeeSameValues() + { + wait_callbackBox cb; + wait_callbackBox otherCb(cb); + + cb(anyBondedValue, anyStatus); + + UT_AssertIsTrue(otherCb.response().Deserialize().value == ANY_INT_VALUE); + UT_AssertIsTrue(otherCb.status().ok()); + } + + static void AsignmentSeesSameValues() + { + wait_callbackBox cb; + cb(anyBondedValue, anyStatus); + + wait_callbackBox otherCb; + otherCb(anyBondedValue, grpc::Status::CANCELLED); + + UT_AssertIsTrue(!otherCb.status().ok()); + + otherCb = cb; + UT_AssertIsTrue(otherCb.status().ok()); + } + + static void WaitReturnsTrueAfterCBInvoked() + { + wait_callbackBox cb; + + bool wasInvoked = cb.wait_for(std::chrono::milliseconds(0)); + UT_AssertIsFalse(wasInvoked); + + cb(anyBondedValue, anyStatus); + wasInvoked = cb.wait_for(std::chrono::milliseconds(0)); + UT_AssertIsTrue(wasInvoked); + } + + static void WaitingThreadGetsNotified() + { + wait_callbackBox cb; + bond::ext::detail::event threadStarted; + std::atomic wasInvoked(false); + + + std::thread t([&cb, &threadStarted, &wasInvoked]() + { + threadStarted.set(); + wasInvoked = cb.wait_for(std::chrono::seconds(30)); + }); + + // This is a clumsy attempt to get the thread into the wait_for method + // before invoking the callback. + bool wasStarted = threadStarted.wait(std::chrono::seconds(30)); + UT_AssertIsTrue(wasStarted); + + cb(anyBondedValue, anyStatus); + t.join(); + + UT_AssertIsTrue(wasInvoked); + } + + static void Initialize() + { + anyBondedValue = MakeAnyBonded(); + anyStatus = grpc::Status::OK; + + UnitTestSuite suite("wait_callback"); + suite.AddTestCase(&CallbackCapturesValues, "CallbackCapturesValues"); + suite.AddTestCase(&SubsequentInvocationThrow, "SubsequentInvocationThrow"); + suite.AddTestCase(&SubsequentInvocationOnCopyThrow, "SubsequentInvocationOnCopyThrow"); + suite.AddTestCase(&CanBeConvertedToStdFunction, "CanBeConvertedToStdFunction"); + suite.AddTestCase(&CopiesSeeSameValues, "CopiesSeeSameValues"); + suite.AddTestCase(&AsignmentSeesSameValues, "AsignmentSeesSameValues"); + suite.AddTestCase(&WaitReturnsTrueAfterCBInvoked, "WaitReturnsTrueAfterCBInvoked"); + suite.AddTestCase(&WaitingThreadGetsNotified, "WaitingThreadGetsNotified"); + } +} + +bool init_unit_test() +{ + wait_callback_tests::Initialize(); + return true; +} diff --git a/doc/CMakeLists.txt b/doc/CMakeLists.txt index 3dd95c17..0a4938c3 100644 --- a/doc/CMakeLists.txt +++ b/doc/CMakeLists.txt @@ -131,6 +131,7 @@ if (Haskell_PANDOC_EXECUTABLE AND Doxygen_EXECUTABLE) doxygen/bond_layout.xml doxygen/bond_reference.css doxygen/mainpage.md + doxygen/examples/wait_callback_example.cpp ${cpp_headers}) set (doxygen_output_dir diff --git a/doc/doxygen/bond.doxygen b/doc/doxygen/bond.doxygen index 3a625fdb..adfe347b 100644 --- a/doc/doxygen/bond.doxygen +++ b/doc/doxygen/bond.doxygen @@ -645,7 +645,7 @@ WARN_LOGFILE = # directories like "/usr/src/myproject". Separate the files or directories # with spaces. -INPUT = doxygen/mainpage.md ../cpp/inc/bond +INPUT = doxygen/mainpage.md ../cpp/inc/bond/ # This tag can be used to specify the character encoding of the source files # that doxygen parses. Internally doxygen uses the UTF-8 encoding, which is @@ -705,7 +705,7 @@ EXCLUDE_SYMBOLS = # directories that contain example code fragments that are included (see # the \include command). -EXAMPLE_PATH = +EXAMPLE_PATH = doxygen/examples/ # If the value of the EXAMPLE_PATH tag contains directories, you can use the # EXAMPLE_PATTERNS tag to specify one or more wildcard pattern (like *.cpp diff --git a/doc/doxygen/examples/wait_callback_example.cpp b/doc/doxygen/examples/wait_callback_example.cpp new file mode 100644 index 00000000..67fe8e92 --- /dev/null +++ b/doc/doxygen/examples/wait_callback_example.cpp @@ -0,0 +1,16 @@ +#include +// ... + +int main() +{ + Example::Client client(/* ... */); + + bond::ext::gRPC::wait_callback cb; + client.AsyncExampleMethod(/* ... */, cb); + + cb.wait(); + if (cb.status().ok()) + { + DoSomeThingWith(cb.response()); + } +} diff --git a/examples/cpp/grpc/helloworld/helloworld.cpp b/examples/cpp/grpc/helloworld/helloworld.cpp index 373c1ee2..d87eac23 100644 --- a/examples/cpp/grpc/helloworld/helloworld.cpp +++ b/examples/cpp/grpc/helloworld/helloworld.cpp @@ -1,14 +1,12 @@ #include "helloworld_grpc.h" #include "helloworld_types.h" -// event.h needed for test purposes -#include - #include #include #include -#include #include +#include +#include #include #include @@ -21,9 +19,6 @@ using grpc::ClientContext; using grpc::ServerBuilder; using grpc::Status; -using bond::ext::detail::event; -using bond::ext::gRPC::io_manager; - using namespace helloworld; // Logic and data behind the server's behavior. @@ -31,8 +26,8 @@ class GreeterServiceImpl final : public Greeter::Service { void SayHello( bond::ext::gRPC::unary_call< - bond::bonded, - bond::bonded> call) override + bond::bonded, + bond::bonded> call) override { HelloRequest request = call.request().Deserialize(); @@ -43,37 +38,9 @@ class GreeterServiceImpl final : public Greeter::Service } }; -void printAndSet( - event* print_event, - bool* isCorrectResponse, - const bond::bonded& response, - const Status& status) -{ - - *isCorrectResponse = false; - - if(status.ok()) - { - const std::string& message = response.Deserialize().message; - - if (message.compare("hello world") == 0) - { - std::cout << "Correct response: " << message; - *isCorrectResponse = true; - } - else - { - std::cout << "Wrong response"; - *isCorrectResponse = false; - } - } - - print_event->set(); -} - int main() { - auto ioManager = std::make_shared(); + auto ioManager = std::make_shared(); auto threadPool = std::make_shared(); GreeterServiceImpl service; @@ -96,31 +63,32 @@ int main() HelloRequest request; request.name = user; - bond::bonded req(request); - event print_event; - bool isCorrectResponse; - std::function&, const Status&)> f_print = - [&print_event, &isCorrectResponse](bond::bonded response, Status status) - { - printAndSet(&print_event, &isCorrectResponse, response, status); - }; + bond::ext::gRPC::wait_callback cb; + greeter.AsyncSayHello(&context, bond::bonded{request}, cb); - greeter.AsyncSayHello(&context, req, f_print); - - bool waitResult = print_event.wait(std::chrono::seconds(10)); + bool waitResult = cb.wait_for(std::chrono::seconds(10)); if (!waitResult) { std::cout << "timeout ocurred"; - } - - if (waitResult && isCorrectResponse) - { - return 0; - } - else - { return 1; } + else if (!cb.status().ok()) + { + std::cout << "request failed"; + return 1; + } + + HelloReply reply; + cb.response().Deserialize(reply); + + if (reply.message.compare("hello world") != 0) + { + std::cout << "Wrong response: " << reply.message; + return 1; + } + + std::cout << "Correct response: " << reply.message; + return 0; }