Reconnect - "re-establish a bond of communication or emotion"

Adding support for reconnecting to the server if the connection is lost.
This commit is contained in:
moozzyk 2015-02-19 15:07:24 -08:00
Родитель ea08958be7
Коммит e41a8ff682
9 изменённых файлов: 1086 добавлений и 125 удалений

Просмотреть файл

@ -12,6 +12,13 @@
namespace signalr
{
// unnamed namespace makes it invisble outside this translation unit
namespace
{
// this is a workaround for a compiler bug where mutable lambdas won't sometimes compile
static void log(const logger& logger, trace_level level, const utility::string_t& entry);
}
std::shared_ptr<connection_impl> connection_impl::create(const utility::string_t& url, const utility::string_t& query_string,
trace_level trace_level, const std::shared_ptr<log_writer>& log_writer)
{
@ -27,15 +34,25 @@ namespace signalr
connection_impl::connection_impl(const utility::string_t& url, const utility::string_t& query_string, trace_level trace_level, const std::shared_ptr<log_writer>& log_writer,
std::unique_ptr<web_request_factory> web_request_factory, std::unique_ptr<transport_factory> transport_factory)
: m_base_url(url), m_query_string(query_string), m_connection_state(connection_state::disconnected),
: m_base_url(url), m_query_string(query_string), m_connection_state(connection_state::disconnected), m_reconnect_delay(2000),
m_logger(log_writer, trace_level), m_transport(nullptr), m_web_request_factory(std::move(web_request_factory)),
m_transport_factory(std::move(transport_factory)), m_message_received([](const web::json::value&){})
m_transport_factory(std::move(transport_factory)), m_message_received([](const web::json::value&){}),
m_reconnecting([](){}), m_reconnected([](){}), m_disconnected([](){})
{ }
connection_impl::~connection_impl()
{
try
{
// Signaling the event is safe here. We are in the dtor so noone is using this instance. There might be some
// outstanding threads that hold on to the connection via a weak pointer but they won't be able to acquire
// the instance since it is being destroyed. Note that the event may actually be in non-signaled state here.
// This for instance happens when the connection goes out of scope while a reconnect is in progress. In this
// case the reconnect logic will not be able to acquire the connection instance from the weak_pointer to
// signal the event so this dtor would hang indefinitely. Using a shared_ptr to the connection in reconnect
// is not a good idea since it would prevent from invoking this dtor until the connection is reconnected or
// reconnection fails even if the instance actually went out of scope.
m_start_completed_event.set();
shutdown().get();
}
catch (const pplx::task_canceled&)
@ -162,9 +179,16 @@ namespace signalr
}
};
auto error_callback = [connect_request_tce](const std::exception &e)
auto error_callback = [weak_connection, connect_request_tce](const std::exception &e)
{
// no op after connection started successfully
connect_request_tce.set_exception(e);
auto connection = weak_connection.lock();
if (connection)
{
connection->reconnect();
}
};
auto transport = connection->m_transport_factory->create_transport(
@ -287,7 +311,7 @@ namespace signalr
{
m_logger.log(
trace_level::errors,
utility::string_t(_XPLATSTR("message_received callback threw an unknown exception.")));
utility::string_t(_XPLATSTR("message_received callback threw an unknown exception")));
// TODO: call on error callback
}
@ -319,7 +343,6 @@ namespace signalr
}
catch (const std::exception &e)
{
// TODO: call on error callback?
logger.log(
trace_level::errors,
utility::string_t(_XPLATSTR("error sending data: "))
@ -336,14 +359,34 @@ namespace signalr
return shutdown()
.then([connection]()
{
// the lock prevents a race where the user calls `stop` on a disconnected connection and calls `start`
// on a different thread at the same time. In this case we must not null out the transport if we are
// not in the `disconnecting` state to not affect the 'start' invocation.
std::lock_guard<std::mutex> lock(connection->m_stop_lock);
if (connection->change_state(connection_state::disconnecting, connection_state::disconnected))
{
// we do let the exception through (especially the task_canceled exception)
connection->m_transport = nullptr;
// the lock prevents a race where the user calls `stop` on a disconnected connection and calls `start`
// on a different thread at the same time. In this case we must not null out the transport if we are
// not in the `disconnecting` state to not affect the 'start' invocation.
std::lock_guard<std::mutex> lock(connection->m_stop_lock);
if (connection->change_state(connection_state::disconnecting, connection_state::disconnected))
{
// we do let the exception through (especially the task_canceled exception)
connection->m_transport = nullptr;
}
}
try
{
connection->m_disconnected();
}
catch (const std::exception &e)
{
connection->m_logger.log(
trace_level::errors,
utility::string_t(_XPLATSTR("disconnected callback threw an exception: "))
.append(utility::conversions::to_string_t(e.what())));
}
catch (...)
{
connection->m_logger.log(
trace_level::errors,
utility::string_t(_XPLATSTR("disconnected callback threw an unknown exception")));
}
});
}
@ -369,21 +412,25 @@ namespace signalr
return pplx::create_task([](){}, cts.get_token());
}
// we request a cancellation of the ongoing start request (if any) and wait until it is cancelled
// we request a cancellation of the ongoing start or reconnect request (if any) and wait until it is cancelled
m_disconnect_cts.cancel();
while (m_start_completed_event.wait(60000) != 0)
{
m_logger.log(trace_level::errors,
utility::string_t(_XPLATSTR("internal error - stopping the connection is still waiting for the start operation to finish which should have already finished or timed out")));
_XPLATSTR("internal error - stopping the connection is still waiting for the start operation to finish which should have already finished or timed out"));
}
// at this point we are either in the connected or disconnected state. If we are in the disconnected state
// we must break because the transport have already been nulled out.
if (!change_state(connection_state::connected, connection_state::disconnecting))
// at this point we are either in the connected, reconnecting or disconnected state. If we are in the disconnected state
// we must break because the transport has already been nulled out.
if (m_connection_state == connection_state::disconnected)
{
return pplx::task_from_result();
}
_ASSERTE(m_connection_state == connection_state::connected || m_connection_state == connection_state::reconnecting);
change_state(connection_state::disconnecting);
}
// This is fire and forget because we don't really care about the result
@ -397,13 +444,191 @@ namespace signalr
catch (...)
{
// We don't care about the result and even if the request failed there is not much we can do. We do
// need to observe the exception though to prevent from crash due to unobserved exception exception.
// need to observe the exception though to prevent from a crash due to unobserved exception exception.
}
});
return m_transport->disconnect();
}
void connection_impl::reconnect()
{
m_logger.log(trace_level::info, _XPLATSTR("connection lost - trying to re-establish connection"));
{
std::lock_guard<std::mutex> lock(m_stop_lock);
// reconnect might be called when starting the connection has not finished yet so wait until it is done
// before actually trying to reconnect
while (m_start_completed_event.wait(60000) != 0)
{
m_logger.log(trace_level::errors,
_XPLATSTR("internal error - reconnect is still waiting for the start operation to finish which should have already finished or timed out"));
}
// exit if starting the connection has not completed successfully or there is an ongoing stop request
if (!change_state(connection_state::connected, connection_state::reconnecting))
{
m_logger.log(trace_level::info,
_XPLATSTR("reconnecting cancelled - connection is not in the connected state"));
return;
}
// re-using the start completed event is safe because you cannot start the connection if it is not in the
// disconnected state. It also make it easier to handle stopping the connection when it is reconnecting.
m_start_completed_event.reset();
}
auto reconnect_url = url_builder::build_reconnect(m_base_url, m_transport->get_transport_type(),
m_connection_token, m_connection_data, m_message_id, m_groups_token, m_query_string);
auto weak_connection = std::weak_ptr<connection_impl>(shared_from_this());
// this is non-blocking
try_reconnect(reconnect_url, utility::datetime::utc_now().to_interval(), m_reconnect_window, m_reconnect_delay, m_disconnect_cts)
.then([weak_connection](pplx::task<bool> reconnect_task)
{
// try reconnect does not throw
auto reconnected = reconnect_task.get();
auto connection = weak_connection.lock();
if (!connection)
{
// connection instance went away - nothing to be done
return pplx::task_from_result();
}
connection->m_start_completed_event.set();
if (reconnected)
{
if (!connection->change_state(connection_state::reconnecting, connection_state::connected))
{
connection->m_logger.log(trace_level::errors,
utility::string_t(_XPLATSTR("internal error - transition from an unexpected state. expected state: reconnecting, actual state: "))
.append(translate_connection_state(connection->get_connection_state())));
_ASSERTE(false);
}
try
{
connection->m_reconnected();
}
catch (const std::exception &e)
{
connection->m_logger.log(
trace_level::errors,
utility::string_t(_XPLATSTR("reconnected callback threw an exception: "))
.append(utility::conversions::to_string_t(e.what())));
}
catch (...)
{
connection->m_logger.log(
trace_level::errors, _XPLATSTR("reconnected callback threw an unknown exception"));
}
return pplx::task_from_result();
}
return connection->stop();
});
// note we cannot call this before starting the reconnection loop since if the user called connection.stop() from
// the reconnecting callback they would dead lock themselves as stop would signal that reconnection attempts
// should be given up and block waiting for this to happen; as a result the reconnecting event would hung and
// prevent from starting the reconnection loop which would unblock the stop request.
try
{
m_reconnecting();
}
catch (const std::exception &e)
{
m_logger.log(
trace_level::errors,
utility::string_t(_XPLATSTR("reconnecting callback threw an exception: "))
.append(utility::conversions::to_string_t(e.what())));
}
catch (...)
{
m_logger.log(
trace_level::errors,
utility::string_t(_XPLATSTR("reconnecting callback threw an unknown exception")));
}
}
// the assumption is that this function won't throw
pplx::task<bool> connection_impl::try_reconnect(const web::uri& reconnect_url, const utility::datetime::interval_type reconnect_start_time,
int reconnect_window /*milliseconds*/, int reconnect_delay /*milliseconds*/, pplx::cancellation_token_source disconnect_cts)
{
if (disconnect_cts.get_token().is_canceled())
{
log(m_logger, trace_level::info, utility::string_t(_XPLATSTR("reconnecting cancelled - connection is being stopped. line: "))
.append(utility::conversions::to_string_t(std::to_string(__LINE__))));
return pplx::task_from_result<bool>(false);
}
auto weak_connection = std::weak_ptr<connection_impl>(shared_from_this());
auto& logger = m_logger;
return m_transport->connect(reconnect_url)
.then([weak_connection, reconnect_url, reconnect_start_time, reconnect_window, reconnect_delay, logger, disconnect_cts]
(pplx::task<void> reconnect_task)
{
try
{
log(logger, trace_level::info, _XPLATSTR("reconnect attempt starting"));
reconnect_task.get();
log(logger, trace_level::info, _XPLATSTR("reconnect attempt completed successfully"));
return pplx::task_from_result<bool>(true);
}
catch (const std::exception& e)
{
log(logger, trace_level::info, utility::string_t(_XPLATSTR("reconnect attempt failed due to: "))
.append(utility::conversions::to_string_t(e.what())));
}
if (disconnect_cts.get_token().is_canceled())
{
log(logger, trace_level::info, utility::string_t(_XPLATSTR("reconnecting cancelled - connection is being stopped. line: "))
.append(utility::conversions::to_string_t(std::to_string(__LINE__))));
return pplx::task_from_result<bool>(false);
}
auto reconnect_window_end = reconnect_start_time + utility::datetime::from_milliseconds(reconnect_window);
if (utility::datetime::utc_now().to_interval() + utility::datetime::from_milliseconds(reconnect_delay) > reconnect_window_end)
{
utility::ostringstream_t oss;
oss << _XPLATSTR("connection could not be re-established within the configured timeout of ")
<< reconnect_window << _XPLATSTR(" milliseconds");
log(logger, trace_level::info, oss.str());
return pplx::task_from_result<bool>(false);
}
std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_delay));
if (disconnect_cts.get_token().is_canceled())
{
log(logger, trace_level::info, utility::string_t(_XPLATSTR("reconnecting cancelled - connection is being stopped. line: "))
.append(utility::conversions::to_string_t(std::to_string(__LINE__))));
return pplx::task_from_result<bool>(false);
}
auto connection = weak_connection.lock();
if (connection)
{
return connection->try_reconnect(reconnect_url, reconnect_start_time, reconnect_window, reconnect_delay, disconnect_cts);
}
log(logger, trace_level::info, _XPLATSTR("reconnecting cancelled - connection no longer valid."));
return pplx::task_from_result<bool>(false);
});
}
connection_state connection_impl::get_connection_state() const
{
return m_connection_state.load();
@ -436,6 +661,30 @@ namespace signalr
m_headers = headers;
}
void connection_impl::set_reconnecting(const std::function<void()>& reconnecting)
{
ensure_disconnected("cannot set the reconnecting callback when the connection is not in the disconnected state. ");
m_reconnecting = reconnecting;
}
void connection_impl::set_reconnected(const std::function<void()>& reconnected)
{
ensure_disconnected("cannot set the reconnected callback when the connection is not in the disconnected state. ");
m_reconnected = reconnected;
}
void connection_impl::set_disconnected(const std::function<void()>& disconnected)
{
ensure_disconnected("cannot set the disconnected callback when the connection is not in the disconnected state. ");
m_disconnected = disconnected;
}
void connection_impl::set_reconnect_delay(const int reconnect_delay)
{
ensure_disconnected("cannot set reconnect delay when the connection is not in the disconnected state. ");
m_reconnect_delay = reconnect_delay;
}
void connection_impl::ensure_disconnected(const std::string& error_message)
{
auto connection_state = get_connection_state();
@ -502,4 +751,13 @@ namespace signalr
return _XPLATSTR("(unknown)");
}
}
namespace
{
// this is a workaround for the VS2013 compiler bug where mutable lambdas won't compile sometimes
static void log(const logger& logger, trace_level level, const utility::string_t& entry)
{
const_cast<signalr::logger &>(logger).log(level, entry);
}
}
}

Просмотреть файл

@ -43,8 +43,13 @@ namespace signalr
void set_message_received_string(const std::function<void(const utility::string_t&)>& message_received);
void set_message_received_json(const std::function<void(const web::json::value&)>& message_received);
void set_connection_data(const utility::string_t& connection_data);
void set_reconnecting(const std::function<void()>& reconnecting);
void set_reconnected(const std::function<void()>& reconnected);
void set_disconnected(const std::function<void()>& disconnected);
void set_headers(const std::unordered_map<utility::string_t, utility::string_t>& headers);
void set_reconnect_delay(const int reconnect_delay /*milliseconds*/);
void set_connection_data(const utility::string_t& connection_data);
private:
web::uri m_base_url;
@ -56,6 +61,9 @@ namespace signalr
std::unique_ptr<transport_factory> m_transport_factory;
std::function<void(const web::json::value&)> m_message_received;
std::function<void()> m_reconnecting;
std::function<void()> m_reconnected;
std::function<void()> m_disconnected;
std::unordered_map<utility::string_t, utility::string_t> m_headers;
pplx::cancellation_token_source m_disconnect_cts;
@ -64,6 +72,7 @@ namespace signalr
utility::string_t m_connection_token;
utility::string_t m_connection_data;
int m_reconnect_window; // in milliseconds
int m_reconnect_delay; // in milliseconds
utility::string_t m_message_id;
utility::string_t m_groups_token;
@ -77,6 +86,9 @@ namespace signalr
void process_response(const utility::string_t& response, const pplx::task_completion_event<void>& connect_request_tce);
pplx::task<void> shutdown();
void reconnect();
pplx::task<bool> try_reconnect(const web::uri& reconnect_url, const utility::datetime::interval_type reconnect_start_time,
int reconnect_window, int reconnect_delay, pplx::cancellation_token_source disconnect_cts);
bool change_state(connection_state old_state, connection_state new_state);
connection_state change_state(connection_state new_state);

Просмотреть файл

@ -14,8 +14,8 @@ namespace signalr
{
if (transport_type == transport_type::websockets)
{
auto websocket_client = std::make_shared<default_websocket_client>(headers);
return websocket_transport::create(websocket_client, logger, process_response_callback, error_callback);
return websocket_transport::create([headers](){ return std::make_shared<default_websocket_client>(headers); },
logger, process_response_callback, error_callback);
}
throw std::exception("not implemented");

Просмотреть файл

@ -7,18 +7,18 @@
namespace signalr
{
std::shared_ptr<transport> websocket_transport::create(const std::shared_ptr<websocket_client>& websocket_client,
std::shared_ptr<transport> websocket_transport::create(const std::function<std::shared_ptr<websocket_client>()>& websocket_client_factory,
const logger& logger, const std::function<void(const utility::string_t &)>& process_response_callback,
std::function<void(const std::exception&)> error_callback)
{
return std::shared_ptr<transport>(
new websocket_transport(websocket_client, logger, process_response_callback, error_callback));
new websocket_transport(websocket_client_factory, logger, process_response_callback, error_callback));
}
websocket_transport::websocket_transport(const std::shared_ptr<websocket_client>& websocket_client,
websocket_transport::websocket_transport(const std::function<std::shared_ptr<websocket_client>()>& websocket_client_factory,
const logger& logger, const std::function<void(const utility::string_t &)>& process_response_callback,
std::function<void(const std::exception&)> error_callback)
: transport(logger, process_response_callback, error_callback), m_websocket_client(websocket_client)
: transport(logger, process_response_callback, error_callback), m_websocket_client_factory(websocket_client_factory)
{
// we use this cts to check if the receive loop is running so it should be
// initially cancelled to indicate that the receive loop is not running
@ -44,65 +44,84 @@ namespace signalr
{
_ASSERTE(url.scheme() == _XPLATSTR("ws") || url.scheme() == _XPLATSTR("wss"));
if (!m_receive_loop_cts.get_token().is_canceled())
{
throw std::runtime_error("transport already connected");
}
std::lock_guard<std::mutex> lock(m_start_stop_lock);
m_logger.log(trace_level::info,
utility::string_t(_XPLATSTR("[websocket transport] connecting to: "))
if (!m_receive_loop_cts.get_token().is_canceled())
{
throw std::runtime_error("transport already connected");
}
m_logger.log(trace_level::info,
utility::string_t(_XPLATSTR("[websocket transport] connecting to: "))
.append(url.to_string()));
// TODO: prepare request (websocket_client_config)
pplx::cancellation_token_source receive_loop_cts;
pplx::task_completion_event<void> connect_tce;
auto websocket_client = m_websocket_client_factory();
auto transport = shared_from_this();
m_websocket_client->connect(url)
.then([transport, connect_tce, receive_loop_cts](pplx::task<void> connect_task)
{
try
{
connect_task.get();
transport->receive_loop(receive_loop_cts);
connect_tce.set();
std::lock_guard<std::mutex> lock(m_websocket_client_lock);
m_websocket_client = websocket_client;
}
catch (const std::exception &e)
{
transport->m_logger.log(
trace_level::errors,
utility::string_t(_XPLATSTR("[websocket transport] exception when connecting to the server: "))
.append(utility::conversions::to_string_t(e.what())));
receive_loop_cts.cancel();
connect_tce.set_exception(std::current_exception());
}
});
// TODO: prepare request (websocket_client_config)
pplx::cancellation_token_source receive_loop_cts;
pplx::task_completion_event<void> connect_tce;
m_receive_loop_cts = receive_loop_cts;
auto transport = shared_from_this();
return pplx::create_task(connect_tce);
websocket_client->connect(url)
.then([transport, connect_tce, receive_loop_cts](pplx::task<void> connect_task)
{
try
{
connect_task.get();
transport->receive_loop(receive_loop_cts);
connect_tce.set();
}
catch (const std::exception &e)
{
transport->m_logger.log(
trace_level::errors,
utility::string_t(_XPLATSTR("[websocket transport] exception when connecting to the server: "))
.append(utility::conversions::to_string_t(e.what())));
receive_loop_cts.cancel();
connect_tce.set_exception(std::current_exception());
}
});
m_receive_loop_cts = receive_loop_cts;
return pplx::create_task(connect_tce);
}
}
pplx::task<void> websocket_transport::send(const utility::string_t &data)
{
// send will return a faulted task if client not connected
return m_websocket_client->send(data);
// send will return a faulted task if client has disconnected
return safe_get_websocket_client()->send(data);
}
pplx::task<void> websocket_transport::disconnect()
{
if (m_receive_loop_cts.get_token().is_canceled())
{
return pplx::task_from_result();
}
std::shared_ptr<websocket_client> websocket_client = nullptr;
m_receive_loop_cts.cancel();
{
std::lock_guard<std::mutex> lock(m_start_stop_lock);
if (m_receive_loop_cts.get_token().is_canceled())
{
return pplx::task_from_result();
}
m_receive_loop_cts.cancel();
websocket_client = safe_get_websocket_client();
}
auto logger = m_logger;
return m_websocket_client->close()
return websocket_client->close()
.then([logger](pplx::task<void> close_task)
mutable {
try
@ -133,7 +152,9 @@ namespace signalr
// incremented when the shared pointer is acquired and then decremented when it goes out of scope of the continuation.
auto weak_transport = std::weak_ptr<websocket_transport>(this_transport);
this_transport->m_websocket_client->receive()
auto websocket_client = this_transport->safe_get_websocket_client();
websocket_client->receive()
// There are two cases when we exit the loop. The first case is implicit - we pass the cancellation_token
// to `then` (note this is after the lambda body) and if the token is cancelled the continuation will not
// run at all. The second - explicit - case happens if the token gets cancelled after the continuation has
@ -153,7 +174,7 @@ namespace signalr
}, cts.get_token())
// this continuation is used to observe exceptions from the previous tasks. It will run always - even if one of
// the previous continuations throws or was not scheduled due to the cancellation token being set to cancelled
.then([weak_transport, logger, cts](pplx::task<void> task)
.then([weak_transport, logger, websocket_client, cts](pplx::task<void> task)
mutable {
try
{
@ -175,16 +196,16 @@ namespace signalr
utility::string_t(_XPLATSTR("[websocket transport] error receiving response from websocket: "))
.append(utility::conversions::to_string_t(e.what())));
websocket_client->close()
.then([](pplx::task<void> task)
{
try { task.get(); }
catch (...) {}
});
auto transport = weak_transport.lock();
if (transport)
{
transport->m_websocket_client->close()
.then([](pplx::task<void> task)
{
try { task.get(); }
catch (...) {}
});
transport->error(e);
}
}
@ -196,19 +217,29 @@ namespace signalr
trace_level::errors,
utility::string_t(_XPLATSTR("[websocket transport] unknown error occurred when receiving response from websocket")));
websocket_client->close()
.then([](pplx::task<void> task)
{
try { task.get(); }
catch (...) {}
});
auto transport = weak_transport.lock();
if (transport)
{
transport->m_websocket_client->close()
.then([](pplx::task<void> task)
{
try { task.get(); }
catch (...) {}
});
transport->error(std::runtime_error("unknown error"));
}
}
});
}
std::shared_ptr<websocket_client> websocket_transport::safe_get_websocket_client()
{
{
std::lock_guard<std::mutex> lock(m_websocket_client_lock);
auto websocket_client = m_websocket_client;
return websocket_client;
}
}
}

Просмотреть файл

@ -15,7 +15,7 @@ namespace signalr
class websocket_transport : public transport, public std::enable_shared_from_this<websocket_transport>
{
public:
static std::shared_ptr<transport> create(const std::shared_ptr<websocket_client>& websocket_client,
static std::shared_ptr<transport> create(const std::function<std::shared_ptr<websocket_client>()>& websocket_client_factory,
const logger& logger, const std::function<void(const utility::string_t&)>& process_response_callback,
std::function<void(const std::exception&)> error_callback);
@ -34,11 +34,14 @@ namespace signalr
transport_type get_transport_type() const override;
private:
websocket_transport(const std::shared_ptr<websocket_client>& websocket_client, const logger& logger,
const std::function<void(const utility::string_t &)>& process_response_callback,
websocket_transport(const std::function<std::shared_ptr<websocket_client>()>& websocket_client_factory,
const logger& logger, const std::function<void(const utility::string_t &)>& process_response_callback,
std::function<void(const std::exception&)> error_callback);
std::function<std::shared_ptr<websocket_client>()> m_websocket_client_factory;
std::shared_ptr<websocket_client> m_websocket_client;
std::mutex m_websocket_client_lock;
std::mutex m_start_stop_lock;
pplx::cancellation_token_source m_receive_loop_cts;
@ -46,5 +49,7 @@ namespace signalr
void handle_receive_error(const std::exception &e, pplx::cancellation_token_source cts,
logger logger, std::weak_ptr<transport> weak_transport);
std::shared_ptr<websocket_client> safe_get_websocket_client();
};
}

Просмотреть файл

@ -543,7 +543,7 @@ TEST(connection_impl_set_message_received, non_std_exception_from_callback_caugh
ASSERT_FALSE(log_entries.empty());
auto entry = remove_date_from_log_entry(log_entries[0]);
ASSERT_EQ(_XPLATSTR("[error ] message_received callback threw an unknown exception.\n"), entry);
ASSERT_EQ(_XPLATSTR("[error ] message_received callback threw an unknown exception\n"), entry);
}
TEST(connection_impl_set_message_received, error_logged_for_malformed_payload)
@ -626,7 +626,7 @@ TEST(connection_impl_set_message_received, unexpected_responses_logged)
ASSERT_EQ(_XPLATSTR("[info ] unexpected response received from the server: 42\n"), entry);
}
TEST(connection_impl_set_message_received, callback_can_be_set_only_in_disconnected_state)
void can_be_set_only_in_disconnected_state(std::function<void(connection_impl *)> callback, const char* expected_exception_message)
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
@ -636,15 +636,57 @@ TEST(connection_impl_set_message_received, callback_can_be_set_only_in_disconnec
try
{
connection->set_message_received_string([](const utility::string_t&){});
callback(connection.get());
ASSERT_TRUE(false); // exception expected but not thrown
}
catch (const std::runtime_error &e)
{
ASSERT_STREQ("cannot set the callback when the connection is not in the disconnected state. current connection state: connected", e.what());
ASSERT_STREQ(expected_exception_message, e.what());
}
}
TEST(connection_impl_set_configuration, set_message_received_string_callback_can_be_set_only_in_disconnected_state)
{
can_be_set_only_in_disconnected_state(
[](connection_impl* connection) { connection->set_message_received_string([](const utility::string_t&){}); },
"cannot set the callback when the connection is not in the disconnected state. current connection state: connected");
}
TEST(connection_impl_set_configuration, set_message_received_json_callback_can_be_set_only_in_disconnected_state)
{
can_be_set_only_in_disconnected_state(
[](connection_impl* connection) { connection->set_message_received_json([](const web::json::value&){}); },
"cannot set the callback when the connection is not in the disconnected state. current connection state: connected");
}
TEST(connection_impl_set_configuration, set_reconnecting_callback_can_be_set_only_in_disconnected_state)
{
can_be_set_only_in_disconnected_state(
[](connection_impl* connection) { connection->set_reconnecting([](){}); },
"cannot set the reconnecting callback when the connection is not in the disconnected state. current connection state: connected");
}
TEST(connection_impl_set_configuration, set_reconnected_callback_can_be_set_only_in_disconnected_state)
{
can_be_set_only_in_disconnected_state(
[](connection_impl* connection) { connection->set_reconnected([](){}); },
"cannot set the reconnected callback when the connection is not in the disconnected state. current connection state: connected");
}
TEST(connection_impl_set_configuration, set_disconnected_callback_can_be_set_only_in_disconnected_state)
{
can_be_set_only_in_disconnected_state(
[](connection_impl* connection) { connection->set_disconnected([](){}); },
"cannot set the disconnected callback when the connection is not in the disconnected state. current connection state: connected");
}
TEST(connection_impl_set_configuration, set_reconnect_delay_can_be_set_only_in_disconnected_state)
{
can_be_set_only_in_disconnected_state(
[](connection_impl* connection) { connection->set_reconnect_delay(100); },
"cannot set reconnect delay when the connection is not in the disconnected state. current connection state: connected");
}
TEST(connection_impl_stop, stopping_disconnected_connection_is_no_op)
{
std::shared_ptr<log_writer> writer{ std::make_shared<memory_log_writer>() };
@ -861,9 +903,9 @@ TEST(connection_impl_stop, stop_ignores_exceptions_from_abort_requests)
connection->start()
.then([connection]()
{
return connection->stop();
}).get();
{
return connection->stop();
}).get();
ASSERT_EQ(connection_state::disconnected, connection->get_connection_state());
@ -875,6 +917,66 @@ TEST(connection_impl_stop, stop_ignores_exceptions_from_abort_requests)
ASSERT_EQ(_XPLATSTR("[state change] disconnecting -> disconnected\n"), remove_date_from_log_entry(log_entries[3]));
}
TEST(connection_impl_stop, stop_invokes_disconnected_callback)
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
auto connection = create_connection(websocket_client);
auto disconnected_invoked = false;
connection->set_disconnected([&disconnected_invoked](){ disconnected_invoked = true; });
connection->start()
.then([connection]()
{
return connection->stop();
}).get();
ASSERT_TRUE(disconnected_invoked);
}
TEST(connection_impl_stop, std_exception_for_disconnected_callback_caught_and_logged)
{
auto writer = std::shared_ptr<log_writer>{std::make_shared<memory_log_writer>()};
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
auto connection = create_connection(websocket_client, writer, trace_level::errors);
connection->set_disconnected([](){ throw std::runtime_error("exception from disconnected"); });
connection->start()
.then([connection]()
{
return connection->stop();
}).get();
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_EQ(1, log_entries.size());
ASSERT_EQ(_XPLATSTR("[error ] disconnected callback threw an exception: exception from disconnected\n"), remove_date_from_log_entry(log_entries[0]));
}
TEST(connection_impl_stop, exception_for_disconnected_callback_caught_and_logged)
{
auto writer = std::shared_ptr<log_writer>{std::make_shared<memory_log_writer>()};
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
auto connection = create_connection(websocket_client, writer, trace_level::errors);
connection->set_disconnected([](){ throw 42; });
connection->start()
.then([connection]()
{
return connection->stop();
}).get();
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_EQ(1, log_entries.size());
ASSERT_EQ(_XPLATSTR("[error ] disconnected callback threw an unknown exception\n"), remove_date_from_log_entry(log_entries[0]));
}
TEST(connection_impl_headers, custom_headers_set_in_requests)
{
auto writer = std::shared_ptr<log_writer>{std::make_shared<memory_log_writer>()};
@ -920,21 +1022,9 @@ TEST(connection_impl_headers, custom_headers_set_in_requests)
TEST(connection_impl_set_headers, headers_can_be_set_only_in_disconnected_state)
{
auto websocket_client = create_test_websocket_client(
/* receive function */ []() { return pplx::task_from_result(std::string("{ \"C\":\"x\", \"S\":1, \"M\":[] }")); });
auto connection = create_connection(websocket_client);
connection->start().get();
try
{
connection->set_headers(std::unordered_map<utility::string_t, utility::string_t>{});
ASSERT_TRUE(false); // exception expected but not thrown
}
catch (const std::runtime_error &e)
{
ASSERT_STREQ("cannot set headers when the connection is not in the disconnected state. current connection state: connected", e.what());
}
can_be_set_only_in_disconnected_state(
[](connection_impl* connection) { connection->set_headers(std::unordered_map<utility::string_t, utility::string_t>{}); },
"cannot set headers when the connection is not in the disconnected state. current connection state: connected");
}
TEST(connection_impl_change_state, change_state_logs)
@ -951,4 +1041,567 @@ TEST(connection_impl_change_state, change_state_logs)
auto entry = remove_date_from_log_entry(log_entries[0]);
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), entry);
}
}
TEST(connection_impl_reconnect, can_reconnect)
{
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 3);
return call_number == 2
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
});
auto connection = create_connection(websocket_client);
connection->set_reconnect_delay(100);
auto reconnected_event = std::make_shared<pplx::event>();
connection->set_reconnected([reconnected_event](){ reconnected_event->set(); });
connection->start();
ASSERT_FALSE(reconnected_event->wait(5000));
ASSERT_EQ(connection_state::connected, connection->get_connection_state());
}
TEST(connection_impl_reconnect, successful_reconnect_state_changes)
{
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 3);
return call_number == 2
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection = create_connection(websocket_client, writer, trace_level::state_changes);
connection->set_reconnect_delay(100);
auto reconnected_event = std::make_shared<pplx::event>();
connection->set_reconnected([reconnected_event](){ reconnected_event->set(); });
connection->start();
ASSERT_FALSE(reconnected_event->wait(5000));
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_EQ(4, log_entries.size());
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(log_entries[0]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> connected\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[state change] connected -> reconnecting\n"), remove_date_from_log_entry(log_entries[2]));
ASSERT_EQ(_XPLATSTR("[state change] reconnecting -> connected\n"), remove_date_from_log_entry(log_entries[3]));
}
TEST(connection_impl_reconnect, connection_stopped_if_reconnecting_failed)
{
auto web_request_factory = std::make_unique<test_web_request_factory>([](const web::uri& url)
{
auto response_body =
url.path() == _XPLATSTR("/negotiate")
? _XPLATSTR("{\"Url\":\"/signalr\", \"ConnectionToken\" : \"A==\", \"ConnectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"DisconnectTimeout\" : 0.5, \"ConnectionTimeout\" : 110.0, \"TryWebSockets\" : true, ")
_XPLATSTR("\"ProtocolVersion\" : \"1.4\", \"TransportConnectTimeout\" : 5.0, \"LongPollDelay\" : 0.0}")
: url.path() == _XPLATSTR("/start")
? _XPLATSTR("{\"Response\":\"started\" }")
: _XPLATSTR("");
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)200, _XPLATSTR("OK"), response_body));
});
int call_number = -1;
int reconnect_invocations = 0;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 3);
return call_number == 2
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
},
/* send function */ [](const utility::string_t){ return pplx::task_from_exception<void>(std::runtime_error("should not be invoked")); },
/* connect function */[&reconnect_invocations](const web::uri& url)
{
if (url.path() == _XPLATSTR("/reconnect"))
{
reconnect_invocations++;
return pplx::task_from_exception<void>(std::runtime_error("reconnect rejected"));
}
return pplx::task_from_result();
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection =
connection_impl::create(_XPLATSTR("http://fakeuri"), _XPLATSTR(""), trace_level::state_changes,
writer, std::move(web_request_factory), std::make_unique<test_transport_factory>(websocket_client));
auto disconnected_event = std::make_shared<pplx::event>();
connection->set_disconnected([disconnected_event](){ disconnected_event->set(); });
connection->set_reconnect_delay(100);
connection->start();
ASSERT_FALSE(disconnected_event->wait(5000));
ASSERT_EQ(connection_state::disconnected, connection->get_connection_state());
ASSERT_GE(reconnect_invocations, 2);
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_EQ(5, log_entries.size());
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(log_entries[0]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> connected\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[state change] connected -> reconnecting\n"), remove_date_from_log_entry(log_entries[2]));
ASSERT_EQ(_XPLATSTR("[state change] reconnecting -> disconnecting\n"), remove_date_from_log_entry(log_entries[3]));
ASSERT_EQ(_XPLATSTR("[state change] disconnecting -> disconnected\n"), remove_date_from_log_entry(log_entries[4]));
}
TEST(connection_impl_reconnect, reconnect_works_if_connection_dropped_during_after_init_and_before_start_successfully_completed)
{
auto connection_dropped_event = std::make_shared<pplx::event>();
auto web_request_factory = std::make_unique<test_web_request_factory>([&connection_dropped_event](const web::uri& url)
{
if (url.path() == _XPLATSTR("/start"))
{
connection_dropped_event->wait();
}
auto response_body =
url.path() == _XPLATSTR("/negotiate")
? _XPLATSTR("{\"Url\":\"/signalr\", \"ConnectionToken\" : \"A==\", \"ConnectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"DisconnectTimeout\" : 0.5, \"ConnectionTimeout\" : 110.0, \"TryWebSockets\" : true, ")
_XPLATSTR("\"ProtocolVersion\" : \"1.4\", \"TransportConnectTimeout\" : 5.0, \"LongPollDelay\" : 0.0}")
: url.path() == _XPLATSTR("/start")
? _XPLATSTR("{\"Response\":\"started\" }")
: _XPLATSTR("");
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)200, _XPLATSTR("OK"), response_body));
});
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number, connection_dropped_event]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 2);
if (call_number == 1)
{
connection_dropped_event->set();
return pplx::task_from_exception<std::string>(std::runtime_error("connection exception"));
}
return pplx::task_from_result(responses[call_number]);
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection = create_connection(websocket_client, writer, trace_level::state_changes);
connection->set_reconnect_delay(100);
auto reconnected_event = std::make_shared<pplx::event>();
connection->set_reconnected([reconnected_event](){ reconnected_event->set(); });
connection->start();
ASSERT_FALSE(reconnected_event->wait(5000));
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_EQ(4, log_entries.size());
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(log_entries[0]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> connected\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[state change] connected -> reconnecting\n"), remove_date_from_log_entry(log_entries[2]));
ASSERT_EQ(_XPLATSTR("[state change] reconnecting -> connected\n"), remove_date_from_log_entry(log_entries[3]));
}
TEST(connection_impl_reconnect, reconnect_canceled_if_connection_dropped_during_start_and_start_failed)
{
auto connection_dropped_event = std::make_shared<pplx::event>();
auto web_request_factory = std::make_unique<test_web_request_factory>([&connection_dropped_event](const web::uri& url)
{
if (url.path() == _XPLATSTR("/start"))
{
connection_dropped_event->wait();
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)404, _XPLATSTR("Bad request"), _XPLATSTR("")));
}
auto response_body =
url.path() == _XPLATSTR("/negotiate")
? _XPLATSTR("{\"Url\":\"/signalr\", \"ConnectionToken\" : \"A==\", \"ConnectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"DisconnectTimeout\" : 0.5, \"ConnectionTimeout\" : 110.0, \"TryWebSockets\" : true, ")
_XPLATSTR("\"ProtocolVersion\" : \"1.4\", \"TransportConnectTimeout\" : 5.0, \"LongPollDelay\" : 0.0}")
: _XPLATSTR("");
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)200, _XPLATSTR("OK"), response_body));
});
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number, connection_dropped_event]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 2);
if (call_number == 1)
{
connection_dropped_event->set();
return pplx::task_from_exception<std::string>(std::runtime_error("connection exception"));
}
return pplx::task_from_result(responses[call_number]);
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection =
connection_impl::create(_XPLATSTR("http://fakeuri"), _XPLATSTR(""), trace_level::state_changes | trace_level::info,
writer, std::move(web_request_factory), std::make_unique<test_transport_factory>(websocket_client));
try
{
connection->start().get();
ASSERT_TRUE(false); // exception expected but not thrown
}
catch (const std::exception&)
{ }
// Reconnecting happens on its own thread. If the connection is dropped after a successfull /connect but before the
// entire start sequence completes the reconnect thread is blocked to see if the starts sequence succeded or not.
// If the start sequence ultimately fails the reconnect logic will not be run - the reconnect thread will exit.
// However there is no further synchronization between start and reconnect threads so the order in which they will
// finish is not defined. Note that this does not matter for the user since they don't directly depend on/observe
// the reconnect in any way. In tests however if the start thread finishes first we can get here while the reconnect
// thread still has not finished. This would make the test fail so we need to wait until the reconnect thread finishes
// which will be when it logs a message that it is giving up reconnecting.
auto memory_writer = std::dynamic_pointer_cast<memory_log_writer>(writer);
for (int wait_time_ms = 5; wait_time_ms < 100 && memory_writer->get_log_entries().size() < 5; wait_time_ms <<= 1)
{
pplx::wait(wait_time_ms);
}
auto log_entries = memory_writer->get_log_entries();
ASSERT_EQ(5, log_entries.size());
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(log_entries[0]));
ASSERT_EQ(_XPLATSTR("[info ] [websocket transport] connecting to: ws://fakeuri/connect?transport=webSockets&clientProtocol=1.4&connectionToken=A==\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[info ] connection lost - trying to re-establish connection\n"), remove_date_from_log_entry(log_entries[2]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> disconnected\n"), remove_date_from_log_entry(log_entries[3]));
ASSERT_EQ(_XPLATSTR("[info ] reconnecting cancelled - connection is not in the connected state\n"), remove_date_from_log_entry(log_entries[4]));
}
TEST(connection_impl_reconnect, reconnect_canceled_when_connection_being_stopped)
{
std::atomic<bool> connection_started;
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number, &connection_started]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}"
};
call_number = std::min(call_number + 1, 1);
return connection_started
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
},
/* send function */ [](const utility::string_t){ return pplx::task_from_exception<void>(std::runtime_error("should not be invoked")); },
/* connect function */[](const web::uri& url)
{
if (url.path() == _XPLATSTR("/reconnect"))
{
return pplx::task_from_exception<void>(std::runtime_error("reconnect rejected"));
}
return pplx::task_from_result();
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection = create_connection(websocket_client, writer, trace_level::state_changes | trace_level::info | trace_level::errors);
connection->set_reconnect_delay(100);
pplx::event reconnecting_event{};
connection->set_reconnecting([&reconnecting_event](){ reconnecting_event.set(); });
connection->start().then([&connection_started](){ connection_started = true; });
ASSERT_FALSE(reconnecting_event.wait(5000));
connection->stop().get();
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_EQ(13, log_entries.size());
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(log_entries[0]));
ASSERT_EQ(_XPLATSTR("[info ] [websocket transport] connecting to: ws://fakeuri/connect?transport=webSockets&clientProtocol=1.4&connectionToken=A==\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> connected\n"), remove_date_from_log_entry(log_entries[2]));
ASSERT_EQ(_XPLATSTR("[error ] [websocket transport] error receiving response from websocket: connection exception\n"), remove_date_from_log_entry(log_entries[3]));
ASSERT_EQ(_XPLATSTR("[info ] connection lost - trying to re-establish connection\n"), remove_date_from_log_entry(log_entries[4]));
ASSERT_EQ(_XPLATSTR("[state change] connected -> reconnecting\n"), remove_date_from_log_entry(log_entries[5]));
ASSERT_EQ(_XPLATSTR("[info ] [websocket transport] connecting to: ws://fakeuri/reconnect?transport=webSockets&clientProtocol=1.4&connectionToken=A==&messageId=x\n"), remove_date_from_log_entry(log_entries[6]));
ASSERT_EQ(_XPLATSTR("[error ] [websocket transport] exception when connecting to the server: reconnect rejected\n"), remove_date_from_log_entry(log_entries[7]));
ASSERT_EQ(_XPLATSTR("[info ] reconnect attempt starting\n"), remove_date_from_log_entry(log_entries[8]));
ASSERT_EQ(_XPLATSTR("[info ] reconnect attempt failed due to: reconnect rejected\n"), remove_date_from_log_entry(log_entries[9]));
ASSERT_TRUE(remove_date_from_log_entry(log_entries[10]).find(_XPLATSTR("[info ] reconnecting cancelled - connection is being stopped. line")) == 0);
ASSERT_EQ(_XPLATSTR("[state change] reconnecting -> disconnecting\n"), remove_date_from_log_entry(log_entries[11]));
ASSERT_EQ(_XPLATSTR("[state change] disconnecting -> disconnected\n"), remove_date_from_log_entry(log_entries[12]));
}
TEST(connection_impl_reconnect, reconnect_canceled_if_connection_goes_out_of_scope)
{
std::atomic<bool> connection_started;
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number, &connection_started]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}"
};
call_number = std::min(call_number + 1, 1);
return connection_started
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
},
/* send function */ [](const utility::string_t){ return pplx::task_from_exception<void>(std::runtime_error("should not be invoked")); },
/* connect function */[](const web::uri& url)
{
if (url.path() == _XPLATSTR("/reconnect"))
{
return pplx::task_from_exception<void>(std::runtime_error("reconnect rejected"));
}
return pplx::task_from_result();
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
{
auto connection = create_connection(websocket_client, writer, trace_level::state_changes);
connection->set_reconnect_delay(100);
pplx::event reconnecting_event{};
connection->set_reconnecting([&reconnecting_event](){ reconnecting_event.set(); });
connection->start().then([&connection_started](){ connection_started = true; });
ASSERT_FALSE(reconnecting_event.wait(5000));
}
// The connection_impl destructor does can be called on a different thread. This is because it is being internally
// used by tasks as a shared_ptr. As a result the dtor is being called on the thread which released the last reference.
// Therefore we need to wait block until the dtor has actually completed. Time out would most likely indicate a bug.
auto memory_writer = std::dynamic_pointer_cast<memory_log_writer>(writer);
for (int wait_time_ms = 5; wait_time_ms < 100 && memory_writer->get_log_entries().size() < 5; wait_time_ms <<= 1)
{
pplx::wait(wait_time_ms);
}
auto log_entries = memory_writer->get_log_entries();
ASSERT_EQ(5, log_entries.size());
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(log_entries[0]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> connected\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[state change] connected -> reconnecting\n"), remove_date_from_log_entry(log_entries[2]));
ASSERT_EQ(_XPLATSTR("[state change] reconnecting -> disconnecting\n"), remove_date_from_log_entry(log_entries[3]));
ASSERT_EQ(_XPLATSTR("[state change] disconnecting -> disconnected\n"), remove_date_from_log_entry(log_entries[4]));
}
TEST(connection_impl_reconnect, std_exception_for_reconnected_reconnecting_callback_caught_and_logged)
{
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 3);
return call_number == 2
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection = create_connection(websocket_client, writer, trace_level::errors);
connection->set_reconnect_delay(100);
connection->set_reconnecting([](){ throw std::runtime_error("exception from reconnecting"); });
auto reconnected_event = std::make_shared<pplx::event>();
connection->set_reconnected([reconnected_event]()
{
reconnected_event->set();
throw std::runtime_error("exception from reconnected");
});
connection->start();
ASSERT_FALSE(reconnected_event->wait(5000));
ASSERT_EQ(connection_state::connected, connection->get_connection_state());
connection->stop().get();
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_EQ(3, log_entries.size());
ASSERT_EQ(_XPLATSTR("[error ] reconnecting callback threw an exception: exception from reconnecting\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[error ] reconnected callback threw an exception: exception from reconnected\n"), remove_date_from_log_entry(log_entries[2]));
}
TEST(connection_impl_reconnect, exception_for_reconnected_reconnecting_callback_caught_and_logged)
{
int call_number = -1;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 3);
return call_number == 2
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection = create_connection(websocket_client, writer, trace_level::errors);
connection->set_reconnect_delay(100);
connection->set_reconnecting([](){ throw 42; });
auto reconnected_event = std::make_shared<pplx::event>();
connection->set_reconnected([reconnected_event]()
{
reconnected_event->set();
throw 42;
});
connection->start();
ASSERT_FALSE(reconnected_event->wait(5000));
ASSERT_EQ(connection_state::connected, connection->get_connection_state());
connection->stop().get();
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_EQ(3, log_entries.size());
ASSERT_EQ(_XPLATSTR("[error ] reconnecting callback threw an unknown exception\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[error ] reconnected callback threw an unknown exception\n"), remove_date_from_log_entry(log_entries[2]));
}
TEST(connection_impl_reconnect, can_stop_connection_from_reconnecting_event)
{
auto web_request_factory = std::make_unique<test_web_request_factory>([](const web::uri& url)
{
auto response_body =
url.path() == _XPLATSTR("/negotiate")
? _XPLATSTR("{\"Url\":\"/signalr\", \"ConnectionToken\" : \"A==\", \"ConnectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"DisconnectTimeout\" : 0.5, \"ConnectionTimeout\" : 110.0, \"TryWebSockets\" : true, ")
_XPLATSTR("\"ProtocolVersion\" : \"1.4\", \"TransportConnectTimeout\" : 5.0, \"LongPollDelay\" : 0.0}")
: url.path() == _XPLATSTR("/start")
? _XPLATSTR("{\"Response\":\"started\" }")
: _XPLATSTR("");
return std::unique_ptr<web_request>(new web_request_stub((unsigned short)200, _XPLATSTR("OK"), response_body));
});
int call_number = -1;
int reconnect_invocations = 0;
auto websocket_client = create_test_websocket_client(
/* receive function */ [call_number]() mutable
{
std::string responses[]
{
"{ \"C\":\"x\", \"S\":1, \"M\":[] }",
"{}",
"{}",
"{}"
};
call_number = std::min(call_number + 1, 3);
return call_number == 2
? pplx::task_from_exception<std::string>(std::runtime_error("connection exception"))
: pplx::task_from_result(responses[call_number]);
},
/* send function */ [](const utility::string_t){ return pplx::task_from_exception<void>(std::runtime_error("should not be invoked")); },
/* connect function */[&reconnect_invocations](const web::uri& url)
{
if (url.path() == _XPLATSTR("/reconnect"))
{
reconnect_invocations++;
return pplx::task_from_exception<void>(std::runtime_error("reconnect rejected"));
}
return pplx::task_from_result();
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto connection =
connection_impl::create(_XPLATSTR("http://fakeuri"), _XPLATSTR(""), trace_level::state_changes,
writer, std::move(web_request_factory), std::make_unique<test_transport_factory>(websocket_client));
auto stop_event = std::make_shared<pplx::event>();
connection->set_reconnecting([&connection, stop_event]()
{
connection->stop()
.then([stop_event](){ stop_event->set(); });
});
connection->set_reconnect_delay(100);
connection->start();
ASSERT_FALSE(stop_event->wait(5000));
ASSERT_EQ(connection_state::disconnected, connection->get_connection_state());
auto log_entries = std::dynamic_pointer_cast<memory_log_writer>(writer)->get_log_entries();
ASSERT_EQ(5, log_entries.size());
ASSERT_EQ(_XPLATSTR("[state change] disconnected -> connecting\n"), remove_date_from_log_entry(log_entries[0]));
ASSERT_EQ(_XPLATSTR("[state change] connecting -> connected\n"), remove_date_from_log_entry(log_entries[1]));
ASSERT_EQ(_XPLATSTR("[state change] connected -> reconnecting\n"), remove_date_from_log_entry(log_entries[2]));
ASSERT_EQ(_XPLATSTR("[state change] reconnecting -> disconnecting\n"), remove_date_from_log_entry(log_entries[3]));
ASSERT_EQ(_XPLATSTR("[state change] disconnecting -> disconnected\n"), remove_date_from_log_entry(log_entries[4]));
}

Просмотреть файл

@ -16,7 +16,7 @@ std::shared_ptr<transport> test_transport_factory::create_transport(transport_ty
{
if (transport_type == transport_type::websockets)
{
return websocket_transport::create(m_websocket_client, logger, process_message_callback, error_callback);
return websocket_transport::create([&](){ return m_websocket_client; }, logger, process_message_callback, error_callback);
}
throw std::exception("not supported");

Просмотреть файл

@ -38,7 +38,7 @@ std::unique_ptr<web_request_factory> create_test_web_request_factory()
auto response_body =
url.path() == _XPLATSTR("/negotiate") || url.path() == _XPLATSTR("/signalr/negotiate")
? _XPLATSTR("{\"Url\":\"/signalr\", \"ConnectionToken\" : \"A==\", \"ConnectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", ")
_XPLATSTR("\"KeepAliveTimeout\" : 20.0, \"DisconnectTimeout\" : 30.0, \"ConnectionTimeout\" : 110.0, \"TryWebSockets\" : true, ")
_XPLATSTR("\"KeepAliveTimeout\" : 20.0, \"DisconnectTimeout\" : 10.0, \"ConnectionTimeout\" : 110.0, \"TryWebSockets\" : true, ")
_XPLATSTR("\"ProtocolVersion\" : \"1.4\", \"TransportConnectTimeout\" : 5.0, \"LongPollDelay\" : 0.0}")
: url.path() == _XPLATSTR("/start") || url.path() == _XPLATSTR("/signalr/start")
? _XPLATSTR("{\"Response\":\"started\" }")

Просмотреть файл

@ -30,7 +30,7 @@ TEST(websocket_transport_connect, connect_connects_and_starts_receive_loop)
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto ws_transport = websocket_transport::create(client, logger(writer, trace_level::info),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(writer, trace_level::info),
[](const utility::string_t&){}, [](const std::exception&){});
ws_transport->connect(_XPLATSTR("ws://fakeuri.org/connect?param=42")).get();
@ -53,7 +53,7 @@ TEST(websocket_transport_connect, connect_propagates_exceptions)
return pplx::task_from_exception<void>(web::websockets::client::websocket_exception(_XPLATSTR("connecting failed")));
});
auto ws_transport = websocket_transport::create(client, logger(std::make_shared<trace_log_writer>(), trace_level::none),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared<trace_log_writer>(), trace_level::none),
[](const utility::string_t&){}, [](const std::exception&){});
try
@ -76,7 +76,7 @@ TEST(websocket_transport_connect, connect_logs_exceptions)
});
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto ws_transport = websocket_transport::create(client, logger(writer, trace_level::errors),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(writer, trace_level::errors),
[](const utility::string_t&){}, [](const std::exception&){});
try
@ -100,7 +100,7 @@ TEST(websocket_transport_connect, connect_logs_exceptions)
TEST(websocket_transport_connect, cannot_call_connect_on_already_connected_transport)
{
auto client = std::make_shared<test_websocket_client>();
auto ws_transport = websocket_transport::create(client, logger(std::make_shared<trace_log_writer>(), trace_level::none),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared<trace_log_writer>(), trace_level::none),
[](const utility::string_t&){}, [](const std::exception&){});
ws_transport->connect(_XPLATSTR("ws://fakeuri.org")).wait();
@ -119,7 +119,7 @@ TEST(websocket_transport_connect, cannot_call_connect_on_already_connected_trans
TEST(websocket_transport_connect, can_connect_after_disconnecting)
{
auto client = std::make_shared<test_websocket_client>();
auto ws_transport = websocket_transport::create(client, logger(std::make_shared<trace_log_writer>(), trace_level::none),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared<trace_log_writer>(), trace_level::none),
[](const utility::string_t&){}, [](const std::exception&){});
ws_transport->connect(_XPLATSTR("ws://fakeuri.org")).get();
@ -132,7 +132,7 @@ TEST(websocket_transport_connect, transport_destroyed_even_if_disconnect_not_cal
{
auto client = std::make_shared<test_websocket_client>();
{
auto ws_transport = websocket_transport::create(client, logger(std::make_shared<trace_log_writer>(), trace_level::none),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared<trace_log_writer>(), trace_level::none),
[](const utility::string_t&){}, [](const std::exception&){});
ws_transport->connect(_XPLATSTR("ws://fakeuri.org")).get();
@ -164,10 +164,12 @@ TEST(websocket_transport_send, send_creates_and_sends_websocket_messages)
return pplx::task_from_result();
});
auto ws_transport = websocket_transport::create(client, logger(std::make_shared<trace_log_writer>(), trace_level::none),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared<trace_log_writer>(), trace_level::none),
[](const utility::string_t&){}, [](const std::exception&){});
ws_transport->send(_XPLATSTR("ABC")).wait();
ws_transport->connect(_XPLATSTR("ws://url"))
.then([ws_transport](){ return ws_transport->send(_XPLATSTR("ABC")); })
.wait();
ASSERT_TRUE(send_called);
}
@ -184,7 +186,7 @@ TEST(websocket_transport_disconnect, disconnect_closes_websocket)
return pplx::task_from_result();
});
auto ws_transport = websocket_transport::create(client, logger(std::make_shared<trace_log_writer>(), trace_level::none),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared<trace_log_writer>(), trace_level::none),
[](const utility::string_t&){}, [](const std::exception&){});
ws_transport->connect(_XPLATSTR("ws://url"))
@ -207,7 +209,7 @@ TEST(websocket_transport_disconnect, disconnect_does_not_throw)
return pplx::task_from_exception<void>(std::exception());
});
auto ws_transport = websocket_transport::create(client, logger(std::make_shared<trace_log_writer>(), trace_level::none),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared<trace_log_writer>(), trace_level::none),
[](const utility::string_t&){}, [](const std::exception&){});
ws_transport->connect(_XPLATSTR("ws://url"))
@ -229,7 +231,7 @@ TEST(websocket_transport_disconnect, disconnect_logs_exceptions)
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto ws_transport = websocket_transport::create(client, logger(writer, trace_level::errors),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(writer, trace_level::errors),
[](const utility::string_t&){}, [](const std::exception&){});
ws_transport->connect(_XPLATSTR("ws://url"))
@ -277,7 +279,7 @@ TEST(websocket_transport_disconnect, receive_not_called_after_disconnect)
return pplx::create_task(receive_task_tce);
});
auto ws_transport = websocket_transport::create(client, logger(std::make_shared<trace_log_writer>(), trace_level::none),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared<trace_log_writer>(), trace_level::none),
[](const utility::string_t&){}, [](const std::exception&){});
ws_transport->connect(_XPLATSTR("ws://fakeuri.org")).get();
@ -302,7 +304,7 @@ TEST(websocket_transport_disconnect, disconnect_is_no_op_if_transport_not_starte
return pplx::task_from_result();
});
auto ws_transport = websocket_transport::create(client, logger(std::make_shared<trace_log_writer>(), trace_level::none),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared<trace_log_writer>(), trace_level::none),
[](const utility::string_t&){}, [](const std::exception&){});
ws_transport->disconnect().get();
@ -324,7 +326,7 @@ TEST(websocket_transport_disconnect, exceptions_from_outstanding_receive_task_ob
});
});
auto ws_transport = websocket_transport::create(client, logger(std::make_shared<trace_log_writer>(), trace_level::none),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared<trace_log_writer>(), trace_level::none),
[](const utility::string_t&){}, [](const std::exception&){});
ws_transport->connect(_XPLATSTR("ws://fakeuri.org")).get();
@ -376,7 +378,7 @@ void receive_loop_logs_exception_runner(const T& e, const utility::string_t& exp
std::shared_ptr<log_writer> writer(std::make_shared<memory_log_writer>());
auto ws_transport = websocket_transport::create(client, logger(writer, trace_level),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(writer, trace_level),
[](const utility::string_t&){}, [](const std::exception&){});
ws_transport->connect(_XPLATSTR("ws://url"))
@ -412,7 +414,7 @@ TEST(websocket_transport_receive_loop, process_response_callback_called_when_mes
process_response_event->set();
};
auto ws_transport = websocket_transport::create(client, logger(std::make_shared<trace_log_writer>(), trace_level::none),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared<trace_log_writer>(), trace_level::none),
process_response, [](const std::exception&){});
ws_transport->connect(_XPLATSTR("ws://fakeuri.org")).get();
@ -446,7 +448,7 @@ TEST(websocket_transport_receive_loop, error_callback_called_when_exception_thro
error_event->set();
};
auto ws_transport = websocket_transport::create(client, logger(std::make_shared<trace_log_writer>(), trace_level::none),
auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared<trace_log_writer>(), trace_level::none),
[](const utility::string_t&){}, error_callback);
ws_transport->connect(_XPLATSTR("ws://fakeuri.org")).get();
@ -460,9 +462,9 @@ TEST(websocket_transport_receive_loop, error_callback_called_when_exception_thro
TEST(websocket_transport_get_transport_type, get_transport_type_returns_websockets)
{
auto ws_transport = websocket_transport::create(
std::make_shared<default_websocket_client>(std::unordered_map<utility::string_t, utility::string_t> {}),
[](){ return std::make_shared<default_websocket_client>(std::unordered_map<utility::string_t, utility::string_t> {}); },
logger(std::make_shared<trace_log_writer>(), trace_level::none),
[](const utility::string_t&){}, [](const std::exception&){});
ASSERT_EQ(transport_type::websockets, ws_transport->get_transport_type());
}
}