[c++ grpc] Simplify service_unary_call_data

The change avoids the explicit call to
`service_unary_call_data::queue_receive` in the generated code.
This commit is contained in:
Ara Ayvazyan 2018-05-09 13:14:02 -07:00 коммит произвёл Christopher Warrington
Родитель 6625f63c46
Коммит 155d2393bd
10 изменённых файлов: 98 добавлений и 316 удалений

Просмотреть файл

@ -117,7 +117,7 @@ public:
std::shared_ptr< ::bond::ext::gRPC::io_manager> _ioManager;
std::shared_ptr<TThreadPool> _threadPool;
#{doubleLineSep 2 privateProxyMethodDecl serviceMethods}
#{newlineSep 2 privateProxyMethodDecl serviceMethods}
};
using Client = #{proxyName}< ::bond::ext::gRPC::thread_pool>;
@ -131,7 +131,6 @@ public:
#{newlineSep 3 serviceAddMethod serviceMethods}
}
virtual ~#{serviceName}() { }
#{serviceStartMethod}
#{newlineSep 2 serviceVirtualMethod serviceMethods}
@ -320,37 +319,17 @@ inline void #{className}::#{proxyName}<TThreadPool>::Async#{methodName}(
BOOST_ASSERT(#{tpParam});
#{newlineSep 3 initMethodReceiveData serviceMethodsWithIndex}
#{newlineSep 3 queueReceive serviceMethodsWithIndex}
}|]
where cqParam = uniqueName "cq" methodNames
tpParam = uniqueName "tp" methodNames
initMethodReceiveData (index,Function{..}) = [lt|#{serviceRdMember methodName}.emplace(
this,
initMethodReceiveData (index,Function{..}) = initMethodReceiveDataContent index methodName
initMethodReceiveData (index,Event{..}) = initMethodReceiveDataContent index methodName
initMethodReceiveDataContent index methodName = [lt|#{serviceRdMember methodName}.emplace(
*this,
#{index},
#{cqParam},
#{tpParam},
std::bind(&#{serviceName}::#{methodName}, this, std::placeholders::_1));|]
initMethodReceiveData (index,Event{..}) = [lt|#{serviceRdMember methodName}.emplace(
this,
#{index},
#{cqParam},
#{tpParam},
std::bind(&#{serviceName}::#{methodName}, this, std::placeholders::_1));|]
queueReceive (index,Function{..}) = [lt|this->queue_receive(
#{index},
&#{serviceRdMember methodName}->_receivedCall->context(),
&#{serviceRdMember methodName}->_receivedCall->request(),
&#{serviceRdMember methodName}->_receivedCall->responder(),
#{cqParam},
&#{serviceRdMember methodName}.get());|]
queueReceive (index,Event{..}) = [lt|this->queue_receive(
#{index},
&#{serviceRdMember methodName}->_receivedCall->context(),
&#{serviceRdMember methodName}->_receivedCall->request(),
&#{serviceRdMember methodName}->_receivedCall->responder(),
#{cqParam},
&#{serviceRdMember methodName}.get());|]
serviceMethodReceiveData Function{..} = [lt|::boost::optional< ::bond::ext::gRPC::detail::service_unary_call_data< #{bonded (methodTypeToMaybe methodInput)}, #{payload (methodTypeToMaybe methodResult)}, TThreadPool>> #{serviceRdMember methodName};|]
serviceMethodReceiveData Event{..} = [lt|::boost::optional< ::bond::ext::gRPC::detail::service_unary_call_data< #{bonded (methodTypeToMaybe methodInput)}, #{payload Nothing}, TThreadPool>> #{serviceRdMember methodName};|]

Просмотреть файл

@ -127,13 +127,9 @@ public:
std::shared_ptr<TThreadPool> _threadPool;
const ::grpc::internal::RpcMethod rpcmethod_foo31_;
const ::grpc::internal::RpcMethod rpcmethod_foo32_;
const ::grpc::internal::RpcMethod rpcmethod_foo33_;
const ::grpc::internal::RpcMethod rpcmethod_ConsumesGeneric1_;
const ::grpc::internal::RpcMethod rpcmethod_ConsumesGeneric2_;
};
@ -152,7 +148,6 @@ public:
this->AddMethod("/tests.Foo/ConsumesGeneric2");
}
virtual ~ServiceCore() { }
virtual void start(
::grpc::ServerCompletionQueue* cq,
std::shared_ptr<TThreadPool> tp) override
@ -161,71 +156,35 @@ public:
BOOST_ASSERT(tp);
_rd_foo31.emplace(
this,
*this,
0,
cq,
tp,
std::bind(&ServiceCore::foo31, this, std::placeholders::_1));
_rd_foo32.emplace(
this,
*this,
1,
cq,
tp,
std::bind(&ServiceCore::foo32, this, std::placeholders::_1));
_rd_foo33.emplace(
this,
*this,
2,
cq,
tp,
std::bind(&ServiceCore::foo33, this, std::placeholders::_1));
_rd_ConsumesGeneric1.emplace(
this,
*this,
3,
cq,
tp,
std::bind(&ServiceCore::ConsumesGeneric1, this, std::placeholders::_1));
_rd_ConsumesGeneric2.emplace(
this,
*this,
4,
cq,
tp,
std::bind(&ServiceCore::ConsumesGeneric2, this, std::placeholders::_1));
this->queue_receive(
0,
&_rd_foo31->_receivedCall->context(),
&_rd_foo31->_receivedCall->request(),
&_rd_foo31->_receivedCall->responder(),
cq,
&_rd_foo31.get());
this->queue_receive(
1,
&_rd_foo32->_receivedCall->context(),
&_rd_foo32->_receivedCall->request(),
&_rd_foo32->_receivedCall->responder(),
cq,
&_rd_foo32.get());
this->queue_receive(
2,
&_rd_foo33->_receivedCall->context(),
&_rd_foo33->_receivedCall->request(),
&_rd_foo33->_receivedCall->responder(),
cq,
&_rd_foo33.get());
this->queue_receive(
3,
&_rd_ConsumesGeneric1->_receivedCall->context(),
&_rd_ConsumesGeneric1->_receivedCall->request(),
&_rd_ConsumesGeneric1->_receivedCall->responder(),
cq,
&_rd_ConsumesGeneric1.get());
this->queue_receive(
4,
&_rd_ConsumesGeneric2->_receivedCall->context(),
&_rd_ConsumesGeneric2->_receivedCall->request(),
&_rd_ConsumesGeneric2->_receivedCall->responder(),
cq,
&_rd_ConsumesGeneric2.get());
}
virtual void foo31(::bond::ext::gRPC::unary_call< ::bond::bonded<Payload>, ::bond::Void>) = 0;

Просмотреть файл

@ -91,7 +91,6 @@ public:
this->AddMethod("/tests.Foo/foo");
}
virtual ~ServiceCore() { }
virtual void start(
::grpc::ServerCompletionQueue* cq,
std::shared_ptr<TThreadPool> tp) override
@ -100,19 +99,11 @@ public:
BOOST_ASSERT(tp);
_rd_foo.emplace(
this,
*this,
0,
cq,
tp,
std::bind(&ServiceCore::foo, this, std::placeholders::_1));
this->queue_receive(
0,
&_rd_foo->_receivedCall->context(),
&_rd_foo->_receivedCall->request(),
&_rd_foo->_receivedCall->responder(),
cq,
&_rd_foo.get());
}
virtual void foo(::bond::ext::gRPC::unary_call< ::bond::bonded< ::tests::Param>, ::tests::Result>) = 0;

Просмотреть файл

@ -265,43 +265,24 @@ public:
std::shared_ptr<TThreadPool> _threadPool;
const ::grpc::internal::RpcMethod rpcmethod_foo11_;
const ::grpc::internal::RpcMethod rpcmethod_foo12_;
const ::grpc::internal::RpcMethod rpcmethod_foo12_impl_;
const ::grpc::internal::RpcMethod rpcmethod_foo13_;
const ::grpc::internal::RpcMethod rpcmethod_foo14_;
const ::grpc::internal::RpcMethod rpcmethod_foo15_;
const ::grpc::internal::RpcMethod rpcmethod_foo21_;
const ::grpc::internal::RpcMethod rpcmethod_foo22_;
const ::grpc::internal::RpcMethod rpcmethod_foo23_;
const ::grpc::internal::RpcMethod rpcmethod_foo24_;
const ::grpc::internal::RpcMethod rpcmethod_foo31_;
const ::grpc::internal::RpcMethod rpcmethod_foo32_;
const ::grpc::internal::RpcMethod rpcmethod_foo33_;
const ::grpc::internal::RpcMethod rpcmethod__rd_foo33_;
const ::grpc::internal::RpcMethod rpcmethod_foo34_;
const ::grpc::internal::RpcMethod rpcmethod_foo41_;
const ::grpc::internal::RpcMethod rpcmethod_foo42_;
const ::grpc::internal::RpcMethod rpcmethod_foo43_;
const ::grpc::internal::RpcMethod rpcmethod_foo44_;
const ::grpc::internal::RpcMethod rpcmethod_cq_;
};
@ -335,7 +316,6 @@ public:
this->AddMethod("/tests.Foo/cq");
}
virtual ~ServiceCore() { }
virtual void start(
::grpc::ServerCompletionQueue* cq0,
std::shared_ptr<TThreadPool> tp) override
@ -344,266 +324,125 @@ public:
BOOST_ASSERT(tp);
_rd_foo11.emplace(
this,
*this,
0,
cq0,
tp,
std::bind(&ServiceCore::foo11, this, std::placeholders::_1));
_rd_foo12.emplace(
this,
*this,
1,
cq0,
tp,
std::bind(&ServiceCore::foo12, this, std::placeholders::_1));
_rd_foo12_impl.emplace(
this,
*this,
2,
cq0,
tp,
std::bind(&ServiceCore::foo12_impl, this, std::placeholders::_1));
_rd_foo13.emplace(
this,
*this,
3,
cq0,
tp,
std::bind(&ServiceCore::foo13, this, std::placeholders::_1));
_rd_foo14.emplace(
this,
*this,
4,
cq0,
tp,
std::bind(&ServiceCore::foo14, this, std::placeholders::_1));
_rd_foo15.emplace(
this,
*this,
5,
cq0,
tp,
std::bind(&ServiceCore::foo15, this, std::placeholders::_1));
_rd_foo21.emplace(
this,
*this,
6,
cq0,
tp,
std::bind(&ServiceCore::foo21, this, std::placeholders::_1));
_rd_foo22.emplace(
this,
*this,
7,
cq0,
tp,
std::bind(&ServiceCore::foo22, this, std::placeholders::_1));
_rd_foo23.emplace(
this,
*this,
8,
cq0,
tp,
std::bind(&ServiceCore::foo23, this, std::placeholders::_1));
_rd_foo24.emplace(
this,
*this,
9,
cq0,
tp,
std::bind(&ServiceCore::foo24, this, std::placeholders::_1));
_rd_foo31.emplace(
this,
*this,
10,
cq0,
tp,
std::bind(&ServiceCore::foo31, this, std::placeholders::_1));
_rd_foo32.emplace(
this,
*this,
11,
cq0,
tp,
std::bind(&ServiceCore::foo32, this, std::placeholders::_1));
_rd_foo330.emplace(
this,
*this,
12,
cq0,
tp,
std::bind(&ServiceCore::foo33, this, std::placeholders::_1));
_rd__rd_foo33.emplace(
this,
*this,
13,
cq0,
tp,
std::bind(&ServiceCore::_rd_foo33, this, std::placeholders::_1));
_rd_foo34.emplace(
this,
*this,
14,
cq0,
tp,
std::bind(&ServiceCore::foo34, this, std::placeholders::_1));
_rd_foo41.emplace(
this,
*this,
15,
cq0,
tp,
std::bind(&ServiceCore::foo41, this, std::placeholders::_1));
_rd_foo42.emplace(
this,
*this,
16,
cq0,
tp,
std::bind(&ServiceCore::foo42, this, std::placeholders::_1));
_rd_foo43.emplace(
this,
*this,
17,
cq0,
tp,
std::bind(&ServiceCore::foo43, this, std::placeholders::_1));
_rd_foo44.emplace(
this,
*this,
18,
cq0,
tp,
std::bind(&ServiceCore::foo44, this, std::placeholders::_1));
_rd_cq.emplace(
this,
*this,
19,
cq0,
tp,
std::bind(&ServiceCore::cq, this, std::placeholders::_1));
this->queue_receive(
0,
&_rd_foo11->_receivedCall->context(),
&_rd_foo11->_receivedCall->request(),
&_rd_foo11->_receivedCall->responder(),
cq0,
&_rd_foo11.get());
this->queue_receive(
1,
&_rd_foo12->_receivedCall->context(),
&_rd_foo12->_receivedCall->request(),
&_rd_foo12->_receivedCall->responder(),
cq0,
&_rd_foo12.get());
this->queue_receive(
2,
&_rd_foo12_impl->_receivedCall->context(),
&_rd_foo12_impl->_receivedCall->request(),
&_rd_foo12_impl->_receivedCall->responder(),
cq0,
&_rd_foo12_impl.get());
this->queue_receive(
3,
&_rd_foo13->_receivedCall->context(),
&_rd_foo13->_receivedCall->request(),
&_rd_foo13->_receivedCall->responder(),
cq0,
&_rd_foo13.get());
this->queue_receive(
4,
&_rd_foo14->_receivedCall->context(),
&_rd_foo14->_receivedCall->request(),
&_rd_foo14->_receivedCall->responder(),
cq0,
&_rd_foo14.get());
this->queue_receive(
5,
&_rd_foo15->_receivedCall->context(),
&_rd_foo15->_receivedCall->request(),
&_rd_foo15->_receivedCall->responder(),
cq0,
&_rd_foo15.get());
this->queue_receive(
6,
&_rd_foo21->_receivedCall->context(),
&_rd_foo21->_receivedCall->request(),
&_rd_foo21->_receivedCall->responder(),
cq0,
&_rd_foo21.get());
this->queue_receive(
7,
&_rd_foo22->_receivedCall->context(),
&_rd_foo22->_receivedCall->request(),
&_rd_foo22->_receivedCall->responder(),
cq0,
&_rd_foo22.get());
this->queue_receive(
8,
&_rd_foo23->_receivedCall->context(),
&_rd_foo23->_receivedCall->request(),
&_rd_foo23->_receivedCall->responder(),
cq0,
&_rd_foo23.get());
this->queue_receive(
9,
&_rd_foo24->_receivedCall->context(),
&_rd_foo24->_receivedCall->request(),
&_rd_foo24->_receivedCall->responder(),
cq0,
&_rd_foo24.get());
this->queue_receive(
10,
&_rd_foo31->_receivedCall->context(),
&_rd_foo31->_receivedCall->request(),
&_rd_foo31->_receivedCall->responder(),
cq0,
&_rd_foo31.get());
this->queue_receive(
11,
&_rd_foo32->_receivedCall->context(),
&_rd_foo32->_receivedCall->request(),
&_rd_foo32->_receivedCall->responder(),
cq0,
&_rd_foo32.get());
this->queue_receive(
12,
&_rd_foo330->_receivedCall->context(),
&_rd_foo330->_receivedCall->request(),
&_rd_foo330->_receivedCall->responder(),
cq0,
&_rd_foo330.get());
this->queue_receive(
13,
&_rd__rd_foo33->_receivedCall->context(),
&_rd__rd_foo33->_receivedCall->request(),
&_rd__rd_foo33->_receivedCall->responder(),
cq0,
&_rd__rd_foo33.get());
this->queue_receive(
14,
&_rd_foo34->_receivedCall->context(),
&_rd_foo34->_receivedCall->request(),
&_rd_foo34->_receivedCall->responder(),
cq0,
&_rd_foo34.get());
this->queue_receive(
15,
&_rd_foo41->_receivedCall->context(),
&_rd_foo41->_receivedCall->request(),
&_rd_foo41->_receivedCall->responder(),
cq0,
&_rd_foo41.get());
this->queue_receive(
16,
&_rd_foo42->_receivedCall->context(),
&_rd_foo42->_receivedCall->request(),
&_rd_foo42->_receivedCall->responder(),
cq0,
&_rd_foo42.get());
this->queue_receive(
17,
&_rd_foo43->_receivedCall->context(),
&_rd_foo43->_receivedCall->request(),
&_rd_foo43->_receivedCall->responder(),
cq0,
&_rd_foo43.get());
this->queue_receive(
18,
&_rd_foo44->_receivedCall->context(),
&_rd_foo44->_receivedCall->request(),
&_rd_foo44->_receivedCall->responder(),
cq0,
&_rd_foo44.get());
this->queue_receive(
19,
&_rd_cq->_receivedCall->context(),
&_rd_cq->_receivedCall->request(),
&_rd_cq->_receivedCall->responder(),
cq0,
&_rd_cq.get());
}
virtual void foo11(::bond::ext::gRPC::unary_call< ::bond::bonded< ::bond::Void>, ::bond::Void>) = 0;

Просмотреть файл

@ -55,11 +55,6 @@ namespace bond { namespace ext { namespace gRPC {
status(status),
context(std::move(context))
{ }
unary_call_result(const unary_call_result&) = default;
unary_call_result(unary_call_result&&) = default;
unary_call_result& operator=(const unary_call_result&) = default;
unary_call_result& operator=(unary_call_result&&) = default;
};
} } } // namespace bond::ext::gRPC

Просмотреть файл

@ -35,7 +35,8 @@ template <typename TThreadPool>
class service : private grpc::Service
{
public:
virtual ~service() { }
service(const service& other) = delete;
service& operator=(const service& other) = delete;
/// @brief Starts the service.
///
@ -95,6 +96,8 @@ public:
}
protected:
service() = default;
/// @brief Registers a method name for dispatch to this service.
///
/// @note This method is for use by generated and helper code only.
@ -106,15 +109,12 @@ protected:
BOOST_ASSERT(methodName);
// ownership of the service method is transfered to grpc::Service
AddMethod(
grpc::Service::AddMethod(
new grpc::internal::RpcServiceMethod(
methodName,
grpc::internal::RpcMethod::NORMAL_RPC,
nullptr)); // nullptr indicates async handler
}
private:
using grpc::Service::AddMethod;
};
} } } } //namespace bond::ext::gRPC::detail

Просмотреть файл

@ -42,47 +42,33 @@ namespace bond { namespace ext { namespace gRPC { namespace detail {
/// has been enqueued in the thread pool, detail::service_unary_call_data
/// re-enqueues itself to get the next call.
template <typename TRequest, typename TResponse, typename TThreadPool>
struct service_unary_call_data : io_manager_tag
class service_unary_call_data : io_manager_tag
{
/// The type of the user-defined callback that will be invoked upon receipt
/// of this call.
typedef std::function<void(unary_call<TRequest, TResponse> call)> CallbackType;
using uc_impl = unary_call_impl<TRequest, TResponse>;
/// The service implementing the method.
service<TThreadPool>* _service;
/// The index of the method. Method indices correspond to the order in
/// which they were registered with detail::service::AddMethod
int _methodIndex;
/// The completion port to post IO operations to.
grpc::ServerCompletionQueue* _cq;
/// The thread pool implementation to use to invoke the user callback.
std::shared_ptr<TThreadPool> _threadPool;
/// The user code to invoke when a call to this method is received.
CallbackType _cb;
/// Individual state for one specific call to this method.
std::unique_ptr<uc_impl> _receivedCall;
public:
template <typename Callback>
service_unary_call_data(
service<TThreadPool>* service,
service<TThreadPool>& service,
int methodIndex,
grpc::ServerCompletionQueue* cq,
std::shared_ptr<TThreadPool> threadPool,
CallbackType cb)
Callback&& cb)
: _service(service),
_methodIndex(methodIndex),
_cq(cq),
_threadPool(threadPool),
_cb(cb),
_receivedCall(new uc_impl)
_threadPool(std::move(threadPool)),
_cb(std::forward<Callback>(cb)),
_receivedCall()
{
BOOST_ASSERT(service);
BOOST_ASSERT(cq);
BOOST_ASSERT(threadPool);
BOOST_ASSERT(cb);
BOOST_ASSERT(_cq);
BOOST_ASSERT(_threadPool);
BOOST_ASSERT(_cb);
queue_receive();
}
service_unary_call_data(const service_unary_call_data& other) = delete;
service_unary_call_data& operator=(const service_unary_call_data& other) = delete;
void invoke(bool ok) override
{
if (ok)
@ -105,21 +91,46 @@ struct service_unary_call_data : io_manager_tag
});
}
// create new state for the next request that will be received
_receivedCall.reset(new uc_impl);
_service->queue_receive(
_methodIndex,
&_receivedCall->context(),
&_receivedCall->request(),
&_receivedCall->responder(),
_cq,
this);
queue_receive();
}
else
{
// we're shutting down, so don't requeue
}
}
private:
void queue_receive()
{
BOOST_ASSERT(!_receivedCall);
// create new state for the next request that will be received
_receivedCall.reset(new uc_impl);
_service.queue_receive(
_methodIndex,
&_receivedCall->context(),
&_receivedCall->request(),
&_receivedCall->responder(),
_cq,
this);
}
using uc_impl = unary_call_impl<TRequest, TResponse>;
/// The service implementing the method.
service<TThreadPool>& _service;
/// The index of the method. Method indices correspond to the order in
/// which they were registered with detail::service::AddMethod
const int _methodIndex;
/// The completion port to post IO operations to.
grpc::ServerCompletionQueue* _cq;
/// The thread pool implementation to use to invoke the user callback.
std::shared_ptr<TThreadPool> _threadPool;
/// The user code to invoke when a call to this method is received.
std::function<void(unary_call<TRequest, TResponse>)> _cb;
/// Individual state for one specific call to this method.
std::unique_ptr<uc_impl> _receivedCall;
};
} } } } //namespace bond::ext::gRPC::detail

Просмотреть файл

@ -16,7 +16,7 @@ namespace bond { namespace ext { namespace gRPC {
namespace detail {
template <typename TRequest, typename TResponse, typename TThreadPool>
struct service_unary_call_data;
class service_unary_call_data;
} // namespace detail
@ -41,7 +41,6 @@ class shared_unary_call;
template <typename TRequest, typename TResponse>
class unary_call final : public detail::unary_call_base<TRequest, TResponse>
{
using impl_type = detail::unary_call_impl<TRequest, TResponse>;
using base_type = detail::unary_call_base<TRequest, TResponse>;
public:
@ -83,8 +82,8 @@ public:
}
private:
template <typename SUCDRequest, typename SUCDResponse, typename SUCDThreadPool>
friend struct detail::service_unary_call_data;
template <typename OtherRequest, typename OtherResponse, typename OtherThreadPool>
friend class detail::service_unary_call_data;
using base_type::base_type;
};

Просмотреть файл

@ -4,14 +4,18 @@
#include "services_grpc.h"
#include <boost/test/unit_test.hpp>
#include <boost/static_assert.hpp>
#include <algorithm>
#include <iterator>
#include <map>
#include <string>
#include <type_traits>
using unit_test::SimpleService;
BOOST_STATIC_ASSERT(std::has_virtual_destructor<SimpleService::Service>::value);
static bool AttributeMapsEqual(
const std::map<std::string, std::string>& lhs,
const std::map<std::string, std::string>& rhs)

Просмотреть файл

@ -14,12 +14,17 @@
#include "event.h"
#include <boost/optional.hpp>
#include <boost/static_assert.hpp>
#include <atomic>
#include <thread>
#include <type_traits>
namespace wait_callback_tests
{
BOOST_STATIC_ASSERT(std::is_copy_constructible<bond::ext::gRPC::unary_call_result<bond::Box<int>>>::value);
BOOST_STATIC_ASSERT(std::is_move_constructible<bond::ext::gRPC::unary_call_result<bond::Box<int>>>::value);
using wait_callbackBox = bond::ext::gRPC::wait_callback<bond::Box<int>>;
using callback_arg = wait_callbackBox::arg_type;