This commit is contained in:
Brennan 2021-07-23 13:17:54 -07:00 коммит произвёл GitHub
Родитель d77e3eed3b
Коммит dfd1f63d39
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 240 добавлений и 15 удалений

Просмотреть файл

@ -45,6 +45,8 @@ namespace signalr
SIGNALRCLIENT_API void __cdecl set_http_headers(const std::map<std::string, std::string>& http_headers);
SIGNALRCLIENT_API void __cdecl set_scheduler(std::shared_ptr<scheduler> scheduler);
SIGNALRCLIENT_API const std::shared_ptr<scheduler>& __cdecl get_scheduler() const noexcept;
SIGNALRCLIENT_API void set_handshake_timeout(std::chrono::milliseconds);
SIGNALRCLIENT_API std::chrono::milliseconds get_handshake_timeout() const noexcept;
private:
#ifdef USE_CPPRESTSDK
@ -53,5 +55,6 @@ namespace signalr
#endif
std::map<std::string, std::string> m_http_headers;
std::shared_ptr<scheduler> m_scheduler;
std::chrono::milliseconds m_handshake_timeout;
};
}

Просмотреть файл

@ -50,6 +50,11 @@ namespace signalr
m_future.get();
}
bool is_set() const
{
return m_isSet;
}
private:
completion_event_impl() : m_isSet(false)
{
@ -91,6 +96,11 @@ namespace signalr
{
m_impl->get();
}
bool is_set() const
{
return m_impl->is_set();
}
private:
std::shared_ptr<completion_event_impl> m_impl;
};

Просмотреть файл

@ -11,7 +11,7 @@
#include "message_type.h"
#include "handshake_protocol.h"
#include "signalrclient/websocket_client.h"
#include "messagepack_hub_protocol.h"
#include "signalr_default_scheduler.h"
namespace signalr
{
@ -65,6 +65,7 @@ namespace signalr
{
// start may be waiting on the handshake response so we complete it here, this no-ops if already set
connection->m_handshakeTask->set(std::make_exception_ptr(signalr_exception("connection closed while handshake was in progress.")));
connection->m_disconnect_cts->cancel();
connection->m_callback_manager.clear("connection was stopped before invocation result was received");
@ -107,6 +108,7 @@ namespace signalr
m_connection->set_client_config(m_signalr_client_config);
m_handshakeTask = std::make_shared<completion_event>();
m_disconnect_cts = std::make_shared<cancellation_token>();
m_handshakeReceived = false;
std::weak_ptr<hub_connection_impl> weak_connection = shared_from_this();
m_connection->start([weak_connection, callback](std::exception_ptr start_exception)
@ -139,10 +141,13 @@ namespace signalr
return;
}
auto handshake_request = handshake::write_handshake(connection->m_protocol);
std::shared_ptr<std::mutex> handshake_request_lock = std::make_shared<std::mutex>();
std::shared_ptr<bool> handshake_request_done = std::make_shared<bool>();
connection->m_connection->send(handshake_request, connection->m_protocol->transfer_format(), [weak_connection, callback](std::exception_ptr exception)
auto handle_handshake = [weak_connection, handshake_request_done, handshake_request_lock, callback](std::exception_ptr exception, bool fromSend)
{
assert(fromSend ? *handshake_request_done : true);
auto connection = weak_connection.lock();
if (!connection)
{
@ -151,25 +156,103 @@ namespace signalr
return;
}
if (exception)
{
callback(exception);
return;
std::lock_guard<std::mutex> lock(*handshake_request_lock);
// connection.send will be waiting on the handshake task which has been set by the caller already
if (!fromSend && *handshake_request_done == true)
{
return;
}
*handshake_request_done = true;
}
try
{
connection->m_handshakeTask->get();
callback(nullptr);
if (exception == nullptr)
{
connection->m_handshakeTask->get();
callback(nullptr);
}
}
catch (...)
{
auto handshake_exception = std::current_exception();
connection->m_connection->stop([callback, handshake_exception](std::exception_ptr)
{
callback(handshake_exception);
}, nullptr);
exception = std::current_exception();
}
if (exception != nullptr)
{
connection->m_connection->stop([callback, exception](std::exception_ptr)
{
callback(exception);
}, exception);
}
};
auto handshake_request = handshake::write_handshake(connection->m_protocol);
auto handshake_task = connection->m_handshakeTask;
auto handshake_timeout = connection->m_signalr_client_config.get_handshake_timeout();
connection->m_disconnect_cts->register_callback([handle_handshake, handshake_request_lock, handshake_request_done]()
{
{
std::lock_guard<std::mutex> lock(*handshake_request_lock);
// no op after connection.send returned, m_handshakeTask should be set before m_disconnect_cts is canceled
if (*handshake_request_done == true)
{
return;
}
}
// if the request isn't completed then no one is waiting on the handshake task
// so we need to run the callback here instead of relying on connection.send completing
// handshake_request_done is set in handle_handshake, don't set it here
handle_handshake(nullptr, false);
});
timer(connection->m_signalr_client_config.get_scheduler(),
[handle_handshake, handshake_task, handshake_timeout, handshake_request_lock](std::chrono::milliseconds duration)
{
{
std::lock_guard<std::mutex> lock(*handshake_request_lock);
// if the task is set then connection.send is either already waiting on the handshake or has completed,
// or stop has been called and will be handling the callback
if (handshake_task->is_set())
{
return true;
}
if (duration < handshake_timeout)
{
return false;
}
}
auto exception = std::make_exception_ptr(signalr_exception("timed out waiting for the server to respond to the handshake message."));
// unblocks connection.send if it's waiting on the task
handshake_task->set(exception);
handle_handshake(exception, false);
return true;
});
connection->m_connection->send(handshake_request, connection->m_protocol->transfer_format(),
[handle_handshake, handshake_request_done, handshake_request_lock](std::exception_ptr exception)
{
{
std::lock_guard<std::mutex> lock(*handshake_request_lock);
if (*handshake_request_done == true)
{
// callback ran from timer or cancellation token, nothing to do here
return;
}
// indicates that the handshake timer doesn't need to call the callback, it just needs to set the timeout exception
// handle_handshake will be waiting on the handshake completion (error or success) to call the callback
*handshake_request_done = true;
}
handle_handshake(exception, true);
});
});
}
@ -205,6 +288,8 @@ namespace signalr
return;
}
assert(connection->get_connection_state() == connection_state::disconnected);
std::vector<std::function<void(std::exception_ptr)>> callbacks;
{
@ -251,6 +336,7 @@ namespace signalr
if (found != obj.end())
{
m_handshakeTask->set(std::make_exception_ptr(signalr_exception(std::string("Received unexpected message while waiting for the handshake response."))));
return;
}
m_handshakeReceived = true;

Просмотреть файл

@ -58,6 +58,7 @@ namespace signalr
bool m_handshakeReceived;
std::shared_ptr<completion_event> m_handshakeTask;
std::function<void(std::exception_ptr)> m_disconnected;
std::shared_ptr<cancellation_token> m_disconnect_cts;
signalr_client_config m_signalr_client_config;
std::unique_ptr<hub_protocol> m_protocol;

Просмотреть файл

@ -43,6 +43,7 @@ namespace signalr
#endif
signalr_client_config::signalr_client_config()
: m_handshake_timeout(std::chrono::seconds(15))
{
m_scheduler = std::make_shared<signalr_default_scheduler>();
}
@ -76,4 +77,19 @@ namespace signalr
{
return m_scheduler;
}
void signalr_client_config::set_handshake_timeout(std::chrono::milliseconds timeout)
{
if (timeout <= std::chrono::seconds(0))
{
throw std::runtime_error("timeout must be greater than 0.");
}
m_handshake_timeout = timeout;
}
std::chrono::milliseconds signalr_client_config::get_handshake_timeout() const noexcept
{
return m_handshake_timeout;
}
}

Просмотреть файл

@ -148,9 +148,9 @@ namespace signalr
// find the first callback that is ready to run, find a thread to run it and remove it from the list
auto curr_time = std::chrono::steady_clock::now();
auto it = callbacks.begin();
auto found = false;
while (it != callbacks.end())
{
auto found = false;
if (it->second <= curr_time)
{
for (auto& thread : threads)

Просмотреть файл

@ -210,6 +210,11 @@ TEST(start, start_fails_for_handshake_response_with_error)
{
auto websocket_client = create_test_websocket_client();
auto hub_connection = create_hub_connection(websocket_client);
std::exception_ptr exception;
hub_connection.set_disconnected([&exception](std::exception_ptr ex)
{
exception = ex;
});
auto mre = manual_reset_event<void>();
hub_connection.start([&mre](std::exception_ptr exception)
@ -232,6 +237,80 @@ TEST(start, start_fails_for_handshake_response_with_error)
}
ASSERT_EQ(connection_state::disconnected, hub_connection.get_connection_state());
try
{
ASSERT_NE(nullptr, exception);
std::rethrow_exception(exception);
}
catch (const std::exception& ex)
{
ASSERT_STREQ("Received an error during handshake: bad things", ex.what());
}
}
TEST(start, start_fails_if_non_handshake_message_received)
{
auto websocket_client = create_test_websocket_client();
auto hub_connection = create_hub_connection(websocket_client);
auto mre = manual_reset_event<void>();
hub_connection.start([&mre](std::exception_ptr exception)
{
mre.set(exception);
});
ASSERT_FALSE(websocket_client->receive_loop_started.wait(5000));
ASSERT_FALSE(websocket_client->handshake_sent.wait(5000));
websocket_client->receive_message("{\"arguments\":[1,\"Foo\"],\"target\":\"Target\",\"type\":1}\x1e");
try
{
mre.get();
ASSERT_TRUE(false);
}
catch (const std::exception& ex)
{
ASSERT_STREQ("Received unexpected message while waiting for the handshake response.", ex.what());
}
ASSERT_EQ(connection_state::disconnected, hub_connection.get_connection_state());
}
TEST(start, on_not_called_if_multiple_messages_received_before_handshake)
{
auto websocket_client = create_test_websocket_client();
auto hub_connection = create_hub_connection(websocket_client);
bool on_called = false;
hub_connection.on("Target", [&on_called](signalr::value)
{
on_called = true;
});
auto mre = manual_reset_event<void>();
hub_connection.start([&mre](std::exception_ptr exception)
{
mre.set(exception);
});
ASSERT_FALSE(websocket_client->receive_loop_started.wait(5000));
ASSERT_FALSE(websocket_client->handshake_sent.wait(5000));
websocket_client->receive_message("{\"arguments\":[1,\"Foo\"],\"target\":\"Target\",\"type\":1}\x1e{\"arguments\":[1,\"Foo\"],\"target\":\"Target\",\"type\":1}\x1e");
try
{
mre.get();
ASSERT_TRUE(false);
}
catch (const std::exception& ex)
{
ASSERT_STREQ("Received unexpected message while waiting for the handshake response.", ex.what());
}
ASSERT_EQ(connection_state::disconnected, hub_connection.get_connection_state());
ASSERT_FALSE(on_called);
}
TEST(start, start_fails_for_incomplete_handshake_response)
@ -319,6 +398,36 @@ TEST(start, start_fails_if_stop_called_before_handshake_response)
ASSERT_EQ(connection_state::disconnected, hub_connection.get_connection_state());
}
TEST(start, start_fails_if_handshake_times_out)
{
auto websocket_client = create_test_websocket_client();
auto hub_connection = create_hub_connection(websocket_client);
auto config = signalr_client_config();
config.set_handshake_timeout(std::chrono::seconds(1));
hub_connection.set_client_config(config);
auto mre = manual_reset_event<void>();
hub_connection.start([&mre](std::exception_ptr exception)
{
mre.set(exception);
});
ASSERT_FALSE(websocket_client->receive_loop_started.wait(5000));
ASSERT_FALSE(websocket_client->handshake_sent.wait(5000));
try
{
mre.get();
ASSERT_TRUE(false);
}
catch (const std::exception& ex)
{
ASSERT_STREQ("timed out waiting for the server to respond to the handshake message.", ex.what());
}
ASSERT_EQ(connection_state::disconnected, hub_connection.get_connection_state());
}
TEST(start, propogates_exception_from_negotiate)
{
auto http_client = std::make_shared<test_http_client>([](const std::string& url, http_request) -> http_response
@ -1624,7 +1733,7 @@ TEST(config, can_replace_scheduler)
mre.get();
ASSERT_EQ(6, scheduler->schedule_count);
ASSERT_EQ(7, scheduler->schedule_count);
}
class throw_hub_protocol : public hub_protocol