Merge pull request #883 from BillyONeal/dev/bion/threadpool

Add API to set the number of threads in the asio thread pool.
This commit is contained in:
Billy O'Neal 2018-10-04 11:52:56 -07:00 коммит произвёл GitHub
Родитель f897582260 ed2b047229
Коммит 61e4933ae5
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 131 добавлений и 75 удалений

Просмотреть файл

@ -57,6 +57,17 @@ public:
virtual ~threadpool() = default;
/// <summary>
/// Initializes the cpprestsdk threadpool with a custom number of threads
/// </summary>
/// <remarks>
/// This function allows an application (in their main function) to initialize the cpprestsdk
/// threadpool with a custom threadcount. Libraries should avoid calling this function to avoid
/// a diamond problem with multiple consumers attempting to customize the pool.
/// </remarks>
/// <exception cref="std::exception">Thrown if the threadpool has already been initialized</exception>
static void initialize_with_threads(size_t num_threads);
template<typename T>
CASABLANCA_DEPRECATED("Use `.service().post(task)` directly.")
void schedule(T task)
@ -73,4 +84,3 @@ protected:
};
}

Просмотреть файл

@ -10,7 +10,9 @@
#include "pplx/threadpool.h"
#include <boost/asio/detail/thread.hpp>
#include <new>
#include <vector>
#include <type_traits>
#if defined(__ANDROID__)
#include <android/log.h>
@ -19,6 +21,34 @@
namespace
{
#if defined(__ANDROID__)
// This pointer will be 0-initialized by default (at load time).
std::atomic<JavaVM*> JVM;
JNIEnv* get_jvm_env()
{
abort_if_no_jvm();
JNIEnv* env = nullptr;
auto result = JVM.load()->AttachCurrentThread(&env, nullptr);
if (result != JNI_OK)
{
throw std::runtime_error("Could not attach to JVM");
}
return env;
}
static void abort_if_no_jvm()
{
if (JVM == nullptr)
{
__android_log_print(ANDROID_LOG_ERROR, "CPPRESTSDK", "%s",
"The CppREST SDK must be initialized before first use on android: "
"https://github.com/Microsoft/cpprestsdk/wiki/How-to-build-for-Android");
std::abort();
}
}
#endif // __ANDROID__
struct threadpool_impl final : crossplat::threadpool
{
@ -39,10 +69,16 @@ struct threadpool_impl final : crossplat::threadpool
}
}
threadpool_impl& get_shared()
{
return *this;
}
private:
void add_thread()
{
m_threads.push_back(std::unique_ptr<boost::asio::detail::thread>(new boost::asio::detail::thread([&]{ thread_start(this); })));
m_threads.push_back(std::unique_ptr<boost::asio::detail::thread>(
new boost::asio::detail::thread([&]{ thread_start(this); })));
}
#if defined(__ANDROID__)
@ -50,7 +86,7 @@ private:
{
crossplat::JVM.load()->DetachCurrentThread();
}
#endif
#endif // __ANDROID__
static void* thread_start(void *arg) CPPREST_NOEXCEPT
{
@ -58,95 +94,105 @@ private:
// Calling get_jvm_env() here forces the thread to be attached.
crossplat::get_jvm_env();
pthread_cleanup_push(detach_from_java, nullptr);
#endif
#endif // __ANDROID__
threadpool_impl* _this = reinterpret_cast<threadpool_impl*>(arg);
_this->m_service.run();
#if defined(__ANDROID__)
pthread_cleanup_pop(true);
#endif
#endif // __ANDROID__
return arg;
}
std::vector<std::unique_ptr<boost::asio::detail::thread>> m_threads;
boost::asio::io_service::work m_work;
};
#if defined(_WIN32)
struct shared_threadpool
{
typename std::aligned_union<0, threadpool_impl>::type shared_storage;
threadpool_impl& get_shared()
{
return reinterpret_cast<threadpool_impl&>(shared_storage);
}
shared_threadpool(size_t n)
{
::new (static_cast<void*>(&get_shared())) threadpool_impl(n);
}
~shared_threadpool()
{
// if linked into a DLL, the threadpool shared instance will be
// destroyed at DLL_PROCESS_DETACH, at which stage joining threads
// causes deadlock, hence this dance
bool terminate_threads = boost::asio::detail::thread::terminate_threads();
boost::asio::detail::thread::set_terminate_threads(true);
get_shared().~threadpool_impl();
boost::asio::detail::thread::set_terminate_threads(terminate_threads);
}
};
typedef shared_threadpool platform_shared_threadpool;
#else // ^^^ _WIN32 ^^^ // vvv !_WIN32 vvv //
typedef threadpool_impl platform_shared_threadpool;
#endif
std::pair<bool, platform_shared_threadpool*> initialize_shared_threadpool(size_t num_threads)
{
static typename std::aligned_union<0, platform_shared_threadpool>::type storage;
platform_shared_threadpool* const ptr =
&reinterpret_cast<platform_shared_threadpool&>(storage);
bool initialized_this_time = false;
#if defined(__ANDROID__)
// mutex based implementation due to paranoia about (lack of) call_once support on Android
// remove this if/when call_once is supported
static std::mutex mtx;
static std::atomic<bool> initialized;
abort_if_no_jvm();
if (!initialized.load())
{
std::lock_guard<std::mutex> guard(mtx);
if (!initialized.load())
{
::new (static_cast<void*>(ptr)) platform_shared_threadpool(num_threads);
initialized.store(true);
initialized_this_time = true;
}
} // also unlock
#else // ^^^ __ANDROID__ ^^^ // vvv !__ANDROID___ vvv //
static std::once_flag of;
// #if defined(__ANDROID__) // if call_once can be used for android
// abort_if_no_jvm();
// #endif // __ANDROID__
std::call_once(of, [num_threads, ptr, &initialized_this_time] {
::new (static_cast<void*>(ptr)) platform_shared_threadpool(num_threads);
initialized_this_time = true;
});
#endif // __ANDROID__
return {initialized_this_time, ptr};
}
}
namespace crossplat
{
#if defined(__ANDROID__)
// This pointer will be 0-initialized by default (at load time).
std::atomic<JavaVM*> JVM;
static void abort_if_no_jvm()
threadpool& threadpool::shared_instance()
{
if (JVM == nullptr)
return initialize_shared_threadpool(40).second->get_shared();
}
void threadpool::initialize_with_threads(size_t num_threads)
{
const auto result = initialize_shared_threadpool(num_threads);
if (!result.first)
{
__android_log_print(ANDROID_LOG_ERROR, "CPPRESTSDK", "%s", "The CppREST SDK must be initialized before first use on android: https://github.com/Microsoft/cpprestsdk/wiki/How-to-build-for-Android");
std::abort();
throw std::runtime_error("the cpprestsdk threadpool has already been initialized");
}
}
JNIEnv* get_jvm_env()
{
abort_if_no_jvm();
JNIEnv* env = nullptr;
auto result = JVM.load()->AttachCurrentThread(&env, nullptr);
if (result != JNI_OK)
{
throw std::runtime_error("Could not attach to JVM");
}
return env;
}
threadpool& threadpool::shared_instance()
{
abort_if_no_jvm();
static threadpool_impl s_shared(40);
return s_shared;
}
#elif defined(_WIN32)
// if linked into a DLL, the threadpool shared instance will be destroyed at DLL_PROCESS_DETACH,
// at which stage joining threads causes deadlock, hence this dance
threadpool& threadpool::shared_instance()
{
static bool terminate_threads = false;
static struct restore_terminate_threads
{
~restore_terminate_threads()
{
boost::asio::detail::thread::set_terminate_threads(terminate_threads);
}
} destroyed_after;
static threadpool_impl s_shared(40);
static struct enforce_terminate_threads
{
~enforce_terminate_threads()
{
terminate_threads = boost::asio::detail::thread::terminate_threads();
boost::asio::detail::thread::set_terminate_threads(true);
}
} destroyed_before;
return s_shared;
}
#else
// initialize the static shared threadpool
threadpool& threadpool::shared_instance()
{
static threadpool_impl s_shared(40);
return s_shared;
}
#endif
}
#if defined(__ANDROID__)
@ -159,4 +205,4 @@ std::unique_ptr<crossplat::threadpool> crossplat::threadpool::construct(size_t n
{
return std::unique_ptr<crossplat::threadpool>(new threadpool_impl(num_threads));
}
#endif
#endif // !defined(CPPREST_EXCLUDE_WEBSOCKETS) || !defined(_WIN32)