Preallocate web assembly workers (#9394)

Adding support for pre-allocating worker threads before the webassembly module
is compiled. This puts the work of loading and starting up the javascript in
parallel with the wasm compile, reducing overall load times.

This works with the worker pool. The pool can either create + load the workers
(default) or with `PTHREAD_POOL_DELAY_LOAD` it will only create them but leave
loading for later. (That means that when pthreads are created the actual wasm
will be loaded, but the work to create the Worker was already done earlier.)
This commit is contained in:
Tayeb Karim 2019-10-01 19:12:13 -04:00 коммит произвёл Alon Zakai
Родитель 936dc036f3
Коммит 3a142bf374
7 изменённых файлов: 273 добавлений и 46 удалений

Просмотреть файл

@ -426,6 +426,7 @@ a license to everyone to use it as detailed in LICENSE.)
* Ajay Patel <patel.ajay285@gmail.com>
* Adrien Devresse <adev@adev.name>
* Petr Penzin (petr.penzin@intel.com) (copyright owned by Intel Corporation)
* Tayeb Al Karim <tay@google.com> (copyright owned by Google, Inc.)
* Andrei Alexeyev <akari@taisei-project.org>
* Cesar Guirao Robles <cesar@no2.es>
* Mehdi Sabwat <mehdisabwat@gmail.com>

Просмотреть файл

@ -12,6 +12,11 @@ var LibraryPThread = {
schedPolicy: 0/*SCHED_OTHER*/,
schedPrio: 0
},
// Workers that have been created but uninitialized. These have already been
// parsed, but the wasm file has not yet been loaded, making it
// distinct from the unusedWorkers below. These workers will be created before
// loading wasm on the main thread.
preallocatedWorkers: [],
// Contains all Workers that are idle/unused and not currently hosting an executing pthread.
// Unused Workers can either be pooled up before page startup, but also when a pthread quits, its hosting
// Worker is not terminated, but is returned to this pool as an optimization so that starting the next thread is faster.
@ -28,6 +33,15 @@ var LibraryPThread = {
},
initMainThreadBlock: function() {
if (ENVIRONMENT_IS_PTHREAD) return undefined;
#if PTHREAD_POOL_SIZE > 0
var requestedPoolSize = {{{ PTHREAD_POOL_SIZE }}};
#if PTHREADS_DEBUG
out('Preallocating ' + requestedPoolSize + ' workers.');
#endif
PThread.preallocatedWorkers = PThread.createNewWorkers(requestedPoolSize);
#endif
PThread.mainThreadBlock = {{{ makeStaticAlloc(C_STRUCTS.pthread.__size__) }}};
for (var i = 0; i < {{{ C_STRUCTS.pthread.__size__ }}}/4; ++i) HEAPU32[PThread.mainThreadBlock/4+i] = 0;
@ -189,6 +203,15 @@ var LibraryPThread = {
}
PThread.pthreads = {};
for (var i = 0; i < PThread.preallocatedWorkers.length; ++i) {
var worker = PThread.preallocatedWorkers[i];
#if ASSERTIONS
assert(!worker.pthread); // This Worker should not be hosting a pthread at this time.
#endif
worker.terminate();
}
PThread.preallocatedWorkers = [];
for (var i = 0; i < PThread.unusedWorkers.length; ++i) {
var worker = PThread.unusedWorkers[i];
#if ASSERTIONS
@ -250,19 +273,78 @@ var LibraryPThread = {
allocateUnusedWorkers: function(numWorkers, onFinishedLoading) {
if (typeof SharedArrayBuffer === 'undefined') return; // No multithreading support, no-op.
#if PTHREADS_DEBUG
out('Preallocating ' + numWorkers + ' workers for a pthread spawn pool.');
out('Allocating ' + numWorkers + ' workers for a pthread spawn pool.');
#endif
var numWorkersLoaded = 0;
var pthreadMainJs = "{{{ PTHREAD_WORKER_FILE }}}";
// Allow HTML module to configure the location where the 'worker.js' file will be loaded from,
// via Module.locateFile() function. If not specified, then the default URL 'worker.js' relative
// to the main html file is loaded.
pthreadMainJs = locateFile(pthreadMainJs);
var workers = [];
var numWorkersToCreate = numWorkers;
if (PThread.preallocatedWorkers.length > 0) {
var workersUsed = Math.min(PThread.preallocatedWorkers.length, numWorkers);
#if PTHREADS_DEBUG
out('Using ' + workersUsed + ' preallocated workers');
#endif
workers = workers.concat(PThread.preallocatedWorkers.splice(0, workersUsed));
numWorkersToCreate -= workersUsed;
}
if (numWorkersToCreate > 0) {
workers = workers.concat(PThread.createNewWorkers(numWorkersToCreate));
}
// Add the listeners.
PThread.attachListenerToWorkers(workers, onFinishedLoading);
// Load the wasm module into the worker.
for (var i = 0; i < numWorkers; ++i) {
var worker = new Worker(pthreadMainJs);
var worker = workers[i];
#if !WASM_BACKEND
// Allocate tempDoublePtr for the worker. This is done here on the worker's behalf, since we may need to do this statically
// if the runtime has not been loaded yet, etc. - so we just use getMemory, which is main-thread only.
var tempDoublePtr = getMemory(8); // TODO: leaks. Cleanup after worker terminates.
#endif
// Ask the new worker to load up the Emscripten-compiled page. This is a heavy operation.
worker.postMessage({
cmd: 'load',
// If the application main .js file was loaded from a Blob, then it is not possible
// to access the URL of the current script that could be passed to a Web Worker so that
// it could load up the same file. In that case, developer must either deliver the Blob
// object in Module['mainScriptUrlOrBlob'], or a URL to it, so that pthread Workers can
// independently load up the same main application file.
urlOrBlob: Module['mainScriptUrlOrBlob'] || _scriptDir,
#if WASM
wasmMemory: wasmMemory,
wasmModule: wasmModule,
#if LOAD_SOURCE_MAP
wasmSourceMap: wasmSourceMap,
#endif
#if USE_OFFSET_CONVERTER
wasmOffsetConverter: wasmOffsetConverter,
#endif
#else
buffer: HEAPU8.buffer,
asmJsUrlOrBlob: Module["asmJsUrlOrBlob"],
#endif
#if !WASM_BACKEND
tempDoublePtr: tempDoublePtr,
#endif
DYNAMIC_BASE: DYNAMIC_BASE,
DYNAMICTOP_PTR: DYNAMICTOP_PTR,
PthreadWorkerInit: PthreadWorkerInit
});
PThread.unusedWorkers.push(worker);
}
},
// Attaches the listeners to the given workers. If onFinishedLoading is provided,
// will call that function when all workers have been loaded. It is assumed that no worker
// is yet loaded.
attachListenerToWorkers: function(workers, onFinishedLoading) {
var numWorkersLoaded = 0;
var numWorkers = workers.length;
for (var i = 0; i < numWorkers; ++i) {
var worker = workers[i];
(function(worker) {
worker.onmessage = function(e) {
var d = e.data;
@ -339,44 +421,25 @@ var LibraryPThread = {
err('pthread sent an error! ' + e.filename + ':' + e.lineno + ': ' + e.message);
};
}(worker));
} // for each worker
},
#if !WASM_BACKEND
// Allocate tempDoublePtr for the worker. This is done here on the worker's behalf, since we may need to do this statically
// if the runtime has not been loaded yet, etc. - so we just use getMemory, which is main-thread only.
var tempDoublePtr = getMemory(8); // TODO: leaks. Cleanup after worker terminates.
createNewWorkers: function(numWorkers) {
// Creates new workers with the discovered pthread worker file.
if (typeof SharedArrayBuffer === 'undefined') return []; // No multithreading support, no-op.
#if PTHREADS_DEBUG
out('Creating ' + numWorkers + ' workers.');
#endif
// Ask the new worker to load up the Emscripten-compiled page. This is a heavy operation.
worker.postMessage({
cmd: 'load',
// If the application main .js file was loaded from a Blob, then it is not possible
// to access the URL of the current script that could be passed to a Web Worker so that
// it could load up the same file. In that case, developer must either deliver the Blob
// object in Module['mainScriptUrlOrBlob'], or a URL to it, so that pthread Workers can
// independently load up the same main application file.
urlOrBlob: Module['mainScriptUrlOrBlob'] || _scriptDir,
#if WASM
wasmMemory: wasmMemory,
wasmModule: wasmModule,
#if LOAD_SOURCE_MAP
wasmSourceMap: wasmSourceMap,
#endif
#if USE_OFFSET_CONVERTER
wasmOffsetConverter: wasmOffsetConverter,
#endif
#else
buffer: HEAPU8.buffer,
asmJsUrlOrBlob: Module["asmJsUrlOrBlob"],
#endif
#if !WASM_BACKEND
tempDoublePtr: tempDoublePtr,
#endif
DYNAMIC_BASE: DYNAMIC_BASE,
DYNAMICTOP_PTR: DYNAMICTOP_PTR,
PthreadWorkerInit: PthreadWorkerInit
});
PThread.unusedWorkers.push(worker);
var pthreadMainJs = "{{{ PTHREAD_WORKER_FILE }}}";
// Allow HTML module to configure the location where the 'worker.js' file will be loaded from,
// via Module.locateFile() function. If not specified, then the default URL 'worker.js' relative
// to the main html file is loaded.
pthreadMainJs = locateFile(pthreadMainJs);
var newWorkers = [];
for (var i = 0; i < numWorkers; ++i) {
newWorkers.push(new Worker(pthreadMainJs));
}
return newWorkers;
},
getNewWorker: function() {

Просмотреть файл

@ -797,7 +797,7 @@ if (!ENVIRONMENT_IS_PTHREAD) addOnPreRun(function() {
});
#endif
#if PTHREAD_POOL_SIZE > 0
#if PTHREAD_POOL_SIZE > 0 && PTHREAD_POOL_DELAY_LOAD != 1
// To work around https://bugzilla.mozilla.org/show_bug.cgi?id=1049079, warm up a worker pool before starting up the application.
if (!ENVIRONMENT_IS_PTHREAD) addOnPreRun(function() { if (typeof SharedArrayBuffer !== 'undefined') { addRunDependency('pthreads'); PThread.allocateUnusedWorkers({{{PTHREAD_POOL_SIZE}}}, function() { removeRunDependency('pthreads'); }); }});
#endif

Просмотреть файл

@ -1267,9 +1267,16 @@ var IN_TEST_HARNESS = 0;
// If true, enables support for pthreads.
var USE_PTHREADS = 0;
// Specifies the number of web workers that are preallocated before runtime is
// initialized. If 0, workers are created on demand.
// PTHREAD_POOL_SIZE specifies the number of web workers that are created
// before the main runtime is initialized. If 0, workers are created on
// demand. If PTHREAD_POOL_DELAY_LOAD = 0, then the workers will be fully
// loaded (available for use) prior to the main runtime being initialized. If
// PTHREAD_POOL_DELAY_LOAD = 1, then the workers will only be created and
// have their runtimes loaded on demand after the main runtime is initialized.
// Note that this means that the workers cannot be joined from the main thread
// unless PROXY_TO_PTHREAD is used.
var PTHREAD_POOL_SIZE = 0;
var PTHREAD_POOL_DELAY_LOAD = 0;
// If not explicitly specified, this is the stack size to use for newly created
// pthreads. According to

Просмотреть файл

@ -0,0 +1,69 @@
// Copyright 2019 The Emscripten Authors. All rights reserved.
// Emscripten is available under two separate licenses, the MIT license and the
// University of Illinois/NCSA Open Source License. Both these licenses can be
// found in the LICENSE file.
#include <pthread.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <emscripten.h>
#include <emscripten/threading.h>
#include <vector>
pthread_t threads[50];
static void *thread_start(void *arg)
{
// This thread quits immediately...
pthread_exit((void*)0);
}
void CreateThread(int idx) {
int rc = pthread_create(&threads[idx], NULL, thread_start, (void*)idx);
assert(rc == 0);
}
void JoinThread(int idx) {
int rc = pthread_join(threads[idx], nullptr);
assert(rc == 0);
}
int main()
{
if (!emscripten_has_threading_support())
{
#ifdef REPORT_RESULT
REPORT_RESULT(0);
#endif
printf("Skipped: Threading is not supported.\n");
return 0;
}
// This test should be run with a prewarmed pool of size 50. They should be fully allocated.
assert(EM_ASM_INT(return PThread.unusedWorkers.length) == 50);
double total = 0;
for (int i = 0; i < 10; ++i) {
double t1 = emscripten_get_now();
for (int j = 0; j < 50; ++j) {
CreateThread(j);
}
double t2 = emscripten_get_now();
printf("Took %f ms to allocate 50 threads.\n", t2 - t1);
total += (t2 - t1);
// Join all the threads to clear the queue..
for (int j = 0; j < 50; ++j) {
JoinThread(j);
}
}
printf("Final average %f ms.\n", total / 10.0);
#ifdef REPORT_RESULT
REPORT_RESULT(0);
#endif
}

Просмотреть файл

@ -0,0 +1,77 @@
// Copyright 2019 The Emscripten Authors. All rights reserved.
// Emscripten is available under two separate licenses, the MIT license and the
// University of Illinois/NCSA Open Source License. Both these licenses can be
// found in the LICENSE file.
#include <pthread.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <emscripten.h>
#include <emscripten/threading.h>
#include <vector>
pthread_t threads[5];
static void *thread_start(void *arg)
{
// This should be long enough for threads to pile up.
int idx = (int)arg;
printf("Starting thread %d\n", idx);
while (true) {
sleep(1);
}
printf("Finishing thread %d\n", idx);
pthread_exit((void*)0);
}
void CreateThread(int idx) {
EM_ASM(out('Main: Spawning thread '+$0+'...'), idx);
int rc = pthread_create(&threads[idx], NULL, thread_start, (void*)idx);
assert(rc == 0);
}
int main()
{
if (!emscripten_has_threading_support())
{
#ifdef REPORT_RESULT
REPORT_RESULT(0);
#endif
printf("Skipped: Threading is not supported.\n");
return 0;
}
// This test should be run with a prewarmed pool of size 4. None
// of the threads are allocated yet.
assert(EM_ASM_INT(return PThread.preallocatedWorkers.length) == 4);
assert(EM_ASM_INT(return PThread.unusedWorkers.length) == 0);
assert(EM_ASM_INT(return PThread.runningWorkers.length) == 0);
CreateThread(0);
// We have one running thread, allocated on demand.
assert(EM_ASM_INT(return PThread.preallocatedWorkers.length) == 3);
assert(EM_ASM_INT(return PThread.unusedWorkers.length) == 0);
assert(EM_ASM_INT(return PThread.runningWorkers.length) == 1);
for (int i = 1; i < 5; ++i) {
CreateThread(i);
}
// All the preallocated workers should be used.
// We can't join the threads or we'll hang forever. The main thread
// won't give up the thread to let the 5th thread be created. This is
// solved in non-test cases by using PROXY_TO_PTHREAD, but we can't
// do that here since we need to eval the length of the various pthread
// arrays.
assert(EM_ASM_INT(return PThread.runningWorkers.length) == 5);
assert(EM_ASM_INT(return PThread.unusedWorkers.length) == 0);
#ifdef REPORT_RESULT
REPORT_RESULT(0);
#endif
}

10
tests/test_browser.py поставляемый
Просмотреть файл

@ -3660,6 +3660,16 @@ window.close = function() {
test(['-O3'])
test(['-s', 'MODULARIZE_INSTANCE=1'])
# Test that preallocating worker threads work.
@requires_threads
def test_pthread_preallocates_workers(self):
self.btest(path_from_root('tests', 'pthread', 'test_pthread_preallocates_workers.cpp'), expected='0', args=['-O3', '-s', '-s', 'USE_PTHREADS=1', '-s', 'PTHREAD_POOL_SIZE=4', '-s', 'PTHREAD_POOL_DELAY_LOAD=1'])
# Test that allocating a lot of threads doesn't regress. This needs to be checked manually!
@requires_threads
def test_pthread_large_pthread_allocation(self):
self.btest(path_from_root('tests', 'pthread', 'test_large_pthread_allocation.cpp'), expected='0', args=['-s', 'TOTAL_MEMORY=128MB', '-O3', '-s', '-s', 'USE_PTHREADS=1', '-s', 'PTHREAD_POOL_SIZE=50'], message='Check output from test to ensure that a regression in time it takes to allocate the threads has not occurred.')
# Tests the -s PROXY_TO_PTHREAD=1 option.
@requires_threads
def test_pthread_proxy_to_pthread(self):