ThisWasTheProblem - websocket_transport no longer captures this pointer

Capturing `this` pointer in task lambdas is problematic because the task may run after the object gets out of scope or is destructed. The websocket transport tasks were rewritten to not capture `this` pointer anymore. In the future in cases where we need to be able to access members of `this` we will pass it as a smart pointer.
This commit is contained in:
Pawel Kadluczka 2014-11-06 23:23:57 -08:00 коммит произвёл moozzyk
Родитель 41838a9d81
Коммит 3b4869c4d9
3 изменённых файлов: 188 добавлений и 53 удалений

Просмотреть файл

@ -6,29 +6,62 @@
namespace signalr
{
websocket_transport::websocket_transport(std::unique_ptr<websocket_client> websocket_client)
: m_websocket_client(std::move(websocket_client))
{}
namespace
{
void receive_loop(std::shared_ptr<websocket_client> websocket_client, pplx::cancellation_token_source cts);
}
websocket_transport::websocket_transport(std::shared_ptr<websocket_client> websocket_client)
: m_websocket_client(websocket_client)
{
// we use this cts to check if the receive loop is running so it should be
// initially cancelled to indicate that the receive loop is not running
m_receive_loop_cts.cancel();
}
websocket_transport::~websocket_transport()
{
try
{
disconnect().get();
}
catch (...) // must not throw from the destructor
{}
}
pplx::task<void> websocket_transport::connect(const web::uri &url)
{
if (!m_receive_loop_cts.get_token().is_canceled())
{
throw std::runtime_error(utility::conversions::to_utf8string(
_XPLATSTR("transport already connected")));
}
// TODO: prepare request (websocket_client_config)
pplx::cancellation_token_source receive_loop_cts;
std::shared_ptr<websocket_client> websocket_client(m_websocket_client);
pplx::task_completion_event<void> connect_tce;
m_websocket_client->connect(url)
.then([this](pplx::task<void> connect_task){
.then([websocket_client, connect_tce, receive_loop_cts](pplx::task<void> connect_task)
{
try
{
connect_task.wait();
receive_loop();
this->m_connect_tce.set();
connect_task.get();
receive_loop(websocket_client, receive_loop_cts);
connect_tce.set();
}
catch (const std::exception &e)
{
this->m_connect_tce.set_exception(e);
// TODO: logging, on error(?) - see what we do in the .net client
receive_loop_cts.cancel();
connect_tce.set_exception(e);
}
});
return pplx::create_task(m_connect_tce);
m_receive_loop_cts = receive_loop_cts;
return pplx::create_task(connect_tce);
}
pplx::task<void> websocket_transport::send(const utility::string_t &data)
@ -39,32 +72,63 @@ namespace signalr
pplx::task<void> websocket_transport::disconnect()
{
return m_websocket_client->close();
m_receive_loop_cts.cancel();
return m_websocket_client->close()
.then([](pplx::task<void> close_task)
{
try
{
close_task.get();
}
catch (const std::exception &)
{
// TODO:log
}
});
}
void websocket_transport::receive_loop()
// unnamed namespace makes this function invisible for other translation units
namespace
{
// TODO: there is a race where m_websocket_client is destroyed in the destructor
// but the receive loop is not yet stopped since it is running on a different thread
// in which case we crash when try calling m_websocket_client->receive()
m_websocket_client->receive().then([this](pplx::task<std::string> receive_task)
void receive_loop(std::shared_ptr<websocket_client> websocket_client, pplx::cancellation_token_source cts)
{
try
websocket_client->receive()
.then([websocket_client, cts](pplx::task<std::string> receive_task)
{
auto msg_body = receive_task.get();
receive_loop();
}
catch (web_sockets::client::websocket_exception &)
{
// TODO: should close the websocket?
// TODO: report error
}
catch (std::exception &)
{
// TODO: should close the websocket?
// TODO: report error
}
});
try
{
auto msg_body = receive_task.get();
// TODO: process message
if (!pplx::is_task_cancellation_requested())
{
receive_loop(websocket_client, cts);
}
// TODO: `else` to log that loop has been cancelled?
return;
}
// TODO: log, report error, close websocket (when appropriate)
catch (const web_sockets::client::websocket_exception&)
{
}
catch (const pplx::task_canceled &)
{
}
catch (const std::exception&)
{
}
catch (...)
{
}
cts.cancel();
}, cts.get_token());
}
}
}

Просмотреть файл

@ -14,7 +14,9 @@ namespace signalr
{
public:
websocket_transport(std::unique_ptr<websocket_client> websocket_client);
websocket_transport(std::shared_ptr<websocket_client> websocket_client);
~websocket_transport();
websocket_transport(const websocket_transport&) = delete;
@ -27,10 +29,8 @@ namespace signalr
pplx::task<void> disconnect() override;
private:
std::unique_ptr<websocket_client> m_websocket_client;
std::shared_ptr<websocket_client> m_websocket_client;
pplx::task_completion_event<void> m_connect_tce;
void receive_loop();
pplx::cancellation_token_source m_receive_loop_cts;
};
}

Просмотреть файл

@ -12,7 +12,7 @@ TEST(websocket_transport_connect, connect_connects_and_starts_receive_loop)
{
bool connect_called = false, receive_called = false;
auto client = std::make_unique<test_websocket_client>();
auto client = std::make_shared<test_websocket_client>();
client->set_connect_function([&connect_called](const web::uri &) -> pplx::task<void>
{
@ -20,20 +20,15 @@ TEST(websocket_transport_connect, connect_connects_and_starts_receive_loop)
return pplx::task_from_result();
});
pplx::task_completion_event<std::string> receive_tce;
client->set_receive_function([&receive_called, &receive_tce]()->pplx::task<std::string>
client->set_receive_function([&receive_called]()->pplx::task<std::string>
{
receive_called = true;
// TODO: a workaround for a race in the receive_loop - breaks the loop
return pplx::task_from_exception<std::string>(std::exception());
return pplx::task_from_result(std::string(""));
});
websocket_transport ws_transport(std::move(client));
websocket_transport ws_transport(client);
ws_transport.connect(_XPLATSTR("http://fakeuri.org")).wait();
ws_transport.connect(_XPLATSTR("http://fakeuri.org")).get();
ASSERT_TRUE(connect_called);
ASSERT_TRUE(receive_called);
@ -41,13 +36,13 @@ TEST(websocket_transport_connect, connect_connects_and_starts_receive_loop)
TEST(websocket_transport_connect, connect_propagates_exceptions)
{
auto client = std::make_unique<test_websocket_client>();
auto client = std::make_shared<test_websocket_client>();
client->set_connect_function([](const web::uri &) -> pplx::task<void>
{
throw web_sockets::client::websocket_exception(_XPLATSTR("connecting failed"));
});
websocket_transport ws_transport(std::move(client));
websocket_transport ws_transport(client);
try
{
@ -60,11 +55,41 @@ TEST(websocket_transport_connect, connect_propagates_exceptions)
}
}
TEST(websocket_transport_connect, cannot_call_connect_on_already_connected_transport)
{
auto client = std::make_shared<test_websocket_client>();
websocket_transport ws_transport(client);
ws_transport.connect(_XPLATSTR("http://fakeuri.org")).wait();
try
{
ws_transport.connect(_XPLATSTR("http://fakeuri.org")).wait();
ASSERT_TRUE(false); // exception not thrown
}
catch (const std::exception &e)
{
ASSERT_STREQ("transport already connected", e.what());
}
}
TEST(websocket_transport_connect, can_connect_after_disconnecting)
{
auto client = std::make_shared<test_websocket_client>();
websocket_transport ws_transport(client);
ws_transport.connect(_XPLATSTR("http://fakeuri.org")).get();
ws_transport.disconnect().get();
ws_transport.connect(_XPLATSTR("http://fakeuri.org")).get();
// shouldn't throw or crash
}
TEST(websocket_transport_send, send_creates_and_sends_websocket_messages)
{
bool send_called = false;
auto client = std::make_unique<test_websocket_client>();
auto client = std::make_shared<test_websocket_client>();
client->set_send_function([&send_called](const utility::string_t&) -> pplx::task<void>
{
@ -72,7 +97,7 @@ TEST(websocket_transport_send, send_creates_and_sends_websocket_messages)
return pplx::task_from_result();
});
websocket_transport ws_transport(std::move(client));
websocket_transport ws_transport(client);
ws_transport.send(_XPLATSTR("ABC")).wait();
@ -83,7 +108,7 @@ TEST(websocket_transport_disconnect, disconnect_closes_websocket)
{
bool close_called = false;
auto client = std::make_unique<test_websocket_client>();
auto client = std::make_shared<test_websocket_client>();
client->set_close_function([&close_called]() -> pplx::task<void>
{
@ -91,9 +116,55 @@ TEST(websocket_transport_disconnect, disconnect_closes_websocket)
return pplx::task_from_result();
});
websocket_transport ws_transport(std::move(client));
websocket_transport ws_transport(client);
ws_transport.disconnect().wait();
ws_transport.disconnect().get();
ASSERT_TRUE(close_called);
}
TEST(websocket_transport_disconnect, disconnect_does_not_throw)
{
auto client = std::make_shared<test_websocket_client>();
client->set_close_function([]() -> pplx::task<void>
{
return pplx::task_from_exception<void>(std::exception());
});
websocket_transport ws_transport(client);
ws_transport.disconnect().get();
}
TEST(websocket_transport_disconnect, receive_not_called_after_disconnect)
{
auto client = std::make_shared<test_websocket_client>();
pplx::task_completion_event<std::string> receive_task_tce;
client->set_close_function([&receive_task_tce]()
{
// unblock receive
receive_task_tce.set(std::string(""));
return pplx::task_from_result();
});
int num_called = 0;
client->set_receive_function([&receive_task_tce, &num_called]() -> pplx::task<std::string>
{
num_called++;
return pplx::create_task(receive_task_tce);
});
websocket_transport ws_transport(client);
ws_transport.connect(_XPLATSTR("http://fakeuri.org")).get();
ws_transport.disconnect().get();
receive_task_tce = pplx::task_completion_event<std::string>();
ws_transport.connect(_XPLATSTR("http://fakeuri.org")).get();
ws_transport.disconnect().get();
ASSERT_EQ(2, num_called);
}