1604191: Python: run Glean work on a worker thread (#783)

* 1604191: Python: run Glean work on a worker thread

* Implement tests

* Add another test

* Apply suggestions from code review

Co-Authored-By: William Lachance <wrlach@gmail.com>

* Convert docstring to comment

* Add docs about parallelism

* lint

* Remove unnecessary layer of indirection

Co-authored-by: William Lachance <wrlach@gmail.com>
This commit is contained in:
Michael Droettboom 2020-03-27 09:18:25 -04:00 committed by GitHub
Parent f2e1a20474
Commit b11564f0e7
No known key found for this signature
GPG Key ID: 4AEE18F83AFDEB23
7 changed files: 290 additions and 23 deletions

View file

@ -20,6 +20,7 @@
* Python: * Python:
* **Breaking Change**: The `glean.util` and `glean.hardware` modules, which * **Breaking Change**: The `glean.util` and `glean.hardware` modules, which
were unintentionally public, have been made private. were unintentionally public, have been made private.
* Most Glean work and I/O is now done on its own worker thread. This brings the parallelism of Python in line with the other platforms.
* The timing distribution, memory distribution, string list, labeled boolean and labeled string metric types are now supported in Python ([#762](https://github.com/mozilla/glean/pull/762), [#763](https://github.com/mozilla/glean/pull/763), [#765](https://github.com/mozilla/glean/pull/765), [#766](https://github.com/mozilla/glean/pull/766)) * The timing distribution, memory distribution, string list, labeled boolean and labeled string metric types are now supported in Python ([#762](https://github.com/mozilla/glean/pull/762), [#763](https://github.com/mozilla/glean/pull/763), [#765](https://github.com/mozilla/glean/pull/765), [#766](https://github.com/mozilla/glean/pull/766))
# v25.1.0 (2020-02-26) # v25.1.0 (2020-02-26)

View file

@ -266,6 +266,10 @@ Please refer to the [custom pings documentation](pings/custom.md).
> **Important**: as stated [before](adding-glean-to-your-project.md#before-using-glean), any new data collection requires documentation and data-review. > **Important**: as stated [before](adding-glean-to-your-project.md#before-using-glean), any new data collection requires documentation and data-review.
> This is also required for any new metric automatically collected by the Glean SDK. > This is also required for any new metric automatically collected by the Glean SDK.
### Parallelism
All of Glean's target languages use a separate worker thread to do most of Glean's work, including any I/O. This thread is fully managed by Glean as an implementation detail. Therefore, users should be free to use the Glean API wherever it is most convenient, without worrying about the performance impact of updating metrics and sending pings.
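As a rough illustration of the pattern described above (not Glean's actual implementation), a single worker thread fed by a FIFO queue might be sketched in Python as follows. The names `Worker`, `submit` and `wait` are hypothetical and exist only for this example.

```python
import queue
import threading


class Worker:
    """Hypothetical single-threaded FIFO worker (illustrative only)."""

    def __init__(self):
        self._queue = queue.Queue()
        # Daemon thread, so it does not keep the process alive at exit.
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, func, *args):
        # Returns immediately; the work happens on the worker thread.
        self._queue.put((func, args))

    def wait(self):
        # Block until every queued task has been processed.
        self._queue.join()

    def _run(self):
        while True:
            func, args = self._queue.get()
            try:
                # A real worker would also catch exceptions here so that
                # one failing task cannot kill the thread.
                func(*args)
            finally:
                self._queue.task_done()


worker = Worker()
seen = []
# Record the identifier of the thread that actually runs the task.
worker.submit(lambda: seen.append(threading.get_ident()))
worker.wait()
ran_off_caller_thread = seen[0] != threading.get_ident()
```

The caller only pays the cost of putting an item on the queue, which is why recording from performance-sensitive code paths is expected to be cheap.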
### Testing metrics ### Testing metrics
In order to make testing metrics easier 'out of the box', all metrics include a set of test API functions in order to facilitate unit testing. These include functions to test whether a value has been stored, and functions to retrieve the stored value for validation. For more information, please refer to [Unit testing Glean metrics](testing-metrics.md). In order to make testing metrics easier 'out of the box', all metrics include a set of test API functions in order to facilitate unit testing. These include functions to test whether a value has been stored, and functions to retrieve the stored value for validation. For more information, please refer to [Unit testing Glean metrics](testing-metrics.md).

View file

@ -449,7 +449,7 @@ class GleanTest {
// Given the following block of code: // Given the following block of code:
// //
// Metric.A.set("SomeTestValue") // Metric.A.set("SomeTestValue")
// Glean.sendPings(listOf("custom-ping-1")) // Glean.submitPings(listOf("custom-ping-1"))
// //
// This test ensures that "custom-ping-1" contains "metric.a" with a value of "SomeTestValue" // This test ensures that "custom-ping-1" contains "metric.a" with a value of "SomeTestValue"
// when the ping is collected. // when the ping is collected.

View file

@ -2,11 +2,99 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this # License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/. # file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
This module implements a single-threaded (mostly FIFO) work queue on which
most Glean work is done.
"""
import functools import functools
import logging
import queue
import threading
from typing import Callable, List, Tuple from typing import Callable, List, Tuple
# This module uses threading, rather than multiprocessing, for parallelism. This
# is normally not recommended for Python due to the Global Interpreter Lock
# (GIL). However, the usual problems with the GIL are lessened by the fact that:
#
# - Most long-running work and I/O is done in the Rust extension. The cffi
# library used to interface with Rust releases the GIL around every foreign
# call. See https://cffi.readthedocs.io/en/latest/ref.html#conversions
#
# - The other significant blocking I/O is in networking code, where the GIL
# is also released while waiting on network sockets.
#
# This approach greatly reduces complexity of the implementation. Using
# multiprocessing would imply going to a 100% IPC-like approach, since the
# Rust-side Glean objects could not be easily shared or message-passed across
# the process boundary, whereas sharing across threads works transparently.
log = logging.getLogger(__name__)
class _ThreadWorker:
"""
Manages a single worker to perform tasks in another thread.
"""
def __init__(self):
self._queue = queue.Queue()
# The worker thread is only started when work needs to be performed so
# that importing Glean alone does not start an unnecessary thread.
self._started = False
def add_task(self, sync: bool, task: Callable, *args, **kwargs):
"""
Add a task to the worker queue.
Args:
sync (bool): If `True`, block until the task is complete.
task (Callable): The task to run.
Additional arguments are passed to the task.
"""
if not self._started:
self._start_worker()
# If we are already on the worker thread, don't place the tasks in the
# queue, just run them now. This is required for synchronous testing
# mode, and also to run the tasks in the expected order.
if threading.get_ident() == self._ident:
task(*args, **kwargs)
else:
args = args or ()
kwargs = kwargs or {}
self._queue.put((task, args, kwargs))
if sync:
self._queue.join()
def _start_worker(self):
"""
Starts the worker thread.
"""
t = threading.Thread(target=self._worker)
# Start the thread in daemon mode.
t.daemon = True
t.start()
self._started = True
self._ident = t.ident
def _worker(self):
"""
Implements the worker thread. Takes tasks off of the queue and runs
them.
"""
while True:
task, args, kwargs = self._queue.get()
try:
task(*args, **kwargs)
except Exception:
log.exception("Glean error")
finally:
self._queue.task_done()
class Dispatcher: class Dispatcher:
# This value was chosen in order to allow several tasks to be queued for # This value was chosen in order to allow several tasks to be queued for
# execution but still be conservative of memory. This queue size is # execution but still be conservative of memory. This queue size is
@ -18,12 +106,18 @@ class Dispatcher:
# are run immediately # are run immediately
_queue_initial_tasks = True # type: bool _queue_initial_tasks = True # type: bool
# The task queue # The preinit task queue
_task_queue = [] # type: List[Tuple[Callable, tuple, dict]] _preinit_task_queue = [] # type: List[Tuple[Callable, tuple, dict]]
# The live task queue to run things in another thread
_task_worker = _ThreadWorker()
# The number of tasks that overflowed the queue # The number of tasks that overflowed the queue
_overflow_count = 0 # type: int _overflow_count = 0 # type: int
# When `True`, all tasks are run synchronously
_testing_mode = False # type: bool
@classmethod @classmethod
def reset(cls): def reset(cls):
""" """
@ -31,9 +125,13 @@ class Dispatcher:
queueing mode. queueing mode.
""" """
cls._queue_initial_tasks = True cls._queue_initial_tasks = True
cls._task_queue = [] cls._preinit_task_queue = []
cls._overflow_count = 0 cls._overflow_count = 0
@classmethod
def _execute_task(cls, func: Callable, *args, **kwargs):
cls._task_worker.add_task(cls._testing_mode, func, *args, **kwargs)
@classmethod @classmethod
def task(cls, func: Callable): def task(cls, func: Callable):
""" """
@ -48,11 +146,11 @@ class Dispatcher:
@functools.wraps(func) @functools.wraps(func)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
if cls._queue_initial_tasks: if cls._queue_initial_tasks:
if len(cls._task_queue) >= cls.MAX_QUEUE_SIZE: if len(cls._preinit_task_queue) >= cls.MAX_QUEUE_SIZE:
return return
cls._task_queue.append((func, args, kwargs)) cls._preinit_task_queue.append((func, args, kwargs))
else: else:
func(*args, **kwargs) cls._execute_task(func, *args, **kwargs)
return wrapper return wrapper
@ -82,16 +180,16 @@ class Dispatcher:
""" """
if cls._queue_initial_tasks: if cls._queue_initial_tasks:
if len(cls._task_queue) >= cls.MAX_QUEUE_SIZE: if len(cls._preinit_task_queue) >= cls.MAX_QUEUE_SIZE:
# This value ends up in the `preinit_tasks_overflow` metric, # This value ends up in the `preinit_tasks_overflow` metric,
# but we can't record directly there, because that would only # but we can't record directly there, because that would only
# add the recording to an already-overflowing task queue and # add the recording to an already-overflowing task queue and
# would be silently dropped. # would be silently dropped.
cls._overflow_count += 1 cls._overflow_count += 1
return return
cls._task_queue.append((func, (), {})) cls._preinit_task_queue.append((func, (), {}))
else: else:
func() cls._execute_task(func)
@classmethod @classmethod
def launch_at_front(cls, func: Callable): def launch_at_front(cls, func: Callable):
@ -102,7 +200,7 @@ class Dispatcher:
""" """
if cls._queue_initial_tasks: if cls._queue_initial_tasks:
cls._task_queue.insert(0, (func, (), {})) cls._preinit_task_queue.insert(0, (func, (), {}))
else: else:
func() func()
@ -123,9 +221,9 @@ class Dispatcher:
Stops queueing tasks and processes any tasks in the queue. Stops queueing tasks and processes any tasks in the queue.
""" """
cls.set_task_queueing(False) cls.set_task_queueing(False)
for (task, args, kwargs) in cls._task_queue: for (task, args, kwargs) in cls._preinit_task_queue:
task(*args, **kwargs) cls._execute_task(task, *args, **kwargs)
cls._task_queue.clear() cls._preinit_task_queue.clear()
if cls._overflow_count > 0: if cls._overflow_count > 0:
from ._builtins import metrics from ._builtins import metrics
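The pre-init queueing behaviour in this diff (queue tasks until initialization, drop and count anything over the limit, then flush in order) can be sketched in isolation. This is a simplified stand-in under stated assumptions: `MiniDispatcher`, its attribute names, and the limit of 3 are hypothetical, and tasks run inline here rather than being handed to a worker thread.

```python
class MiniDispatcher:
    """Hypothetical, simplified stand-in for the pre-init queueing logic."""

    MAX_QUEUE_SIZE = 3  # illustrative limit, chosen small for the example
    queue_initial_tasks = True
    preinit_queue = []
    overflow_count = 0

    @classmethod
    def launch(cls, func):
        if cls.queue_initial_tasks:
            if len(cls.preinit_queue) >= cls.MAX_QUEUE_SIZE:
                # Queue is full: drop the task but remember that we did.
                cls.overflow_count += 1
                return
            cls.preinit_queue.append(func)
        else:
            # Run inline in this sketch; the code in the diff hands the
            # task to the worker thread at this point.
            func()

    @classmethod
    def flush(cls):
        # Stop queueing, then run everything queued so far, in order.
        cls.queue_initial_tasks = False
        for func in cls.preinit_queue:
            func()
        cls.preinit_queue.clear()


order = []
for i in range(5):
    MiniDispatcher.launch(lambda i=i: order.append(i))
MiniDispatcher.flush()
MiniDispatcher.launch(lambda: order.append("late"))
```

With these inputs, the first three tasks are queued and later flushed in order, the remaining two are counted as overflow, and the task launched after the flush runs directly.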

View file

@ -34,6 +34,9 @@ def reset_glean(
global settings. global settings.
""" """
from glean import Glean from glean import Glean
from glean._dispatcher import Dispatcher
Dispatcher._testing_mode = True
data_dir = None # type: Optional[Path] data_dir = None # type: Optional[Path]
if not clear_stores: if not clear_stores:

View file

@ -3,6 +3,9 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/. # file, You can obtain one at http://mozilla.org/MPL/2.0/.
import threading
from glean._dispatcher import Dispatcher from glean._dispatcher import Dispatcher
@ -18,13 +21,13 @@ def test_launch_correctly_adds_tasks_to_queue_if_queue_tasks_is_true():
for i in range(3): for i in range(3):
update() update()
assert 3 == len(Dispatcher._task_queue) assert 3 == len(Dispatcher._preinit_task_queue)
assert 0 == thread_canary[0] assert 0 == thread_canary[0]
Dispatcher.flush_queued_initial_tasks() Dispatcher.flush_queued_initial_tasks()
assert 3 == thread_canary[0] assert 3 == thread_canary[0]
assert 0 == len(Dispatcher._task_queue) assert 0 == len(Dispatcher._preinit_task_queue)
def test_maximum_tasks(): def test_maximum_tasks():
@ -33,7 +36,7 @@ def test_maximum_tasks():
for i in range(Dispatcher.MAX_QUEUE_SIZE + 10): for i in range(Dispatcher.MAX_QUEUE_SIZE + 10):
Dispatcher.task(lambda: 0)() Dispatcher.task(lambda: 0)()
assert len(Dispatcher._task_queue) == Dispatcher.MAX_QUEUE_SIZE assert len(Dispatcher._preinit_task_queue) == Dispatcher.MAX_QUEUE_SIZE
def test_maximum_queue(): def test_maximum_queue():
@ -42,4 +45,101 @@ def test_maximum_queue():
for i in range(Dispatcher.MAX_QUEUE_SIZE + 10): for i in range(Dispatcher.MAX_QUEUE_SIZE + 10):
Dispatcher.launch(lambda: 0) Dispatcher.launch(lambda: 0)
assert len(Dispatcher._task_queue) == Dispatcher.MAX_QUEUE_SIZE assert len(Dispatcher._preinit_task_queue) == Dispatcher.MAX_QUEUE_SIZE
def test_tasks_run_off_the_main_thread():
main_thread_id = threading.get_ident()
thread_canary = [False]
def test_task():
assert main_thread_id != threading.get_ident()
assert False is thread_canary[0]
thread_canary[0] = True
Dispatcher.launch(test_task)
Dispatcher._task_worker._queue.join()
assert True is thread_canary[0]
def test_queue_tasks_are_flushed_off_the_main_thread():
main_thread_id = threading.get_ident()
Dispatcher._testing_mode = False
Dispatcher._queue_initial_tasks = False
thread_canary = [0]
def test_task():
assert main_thread_id != threading.get_ident()
thread_canary[0] += 1
Dispatcher._testing_mode = False
Dispatcher._queue_initial_tasks = True
for i in range(3):
Dispatcher.launch(test_task)
assert 3 == len(Dispatcher._preinit_task_queue)
assert 0 == thread_canary[0]
Dispatcher.flush_queued_initial_tasks()
Dispatcher._task_worker._queue.join()
assert 3 == thread_canary[0]
assert 0 == len(Dispatcher._preinit_task_queue)
def test_queued_tasks_are_executed_in_the_order_they_are_received():
main_thread_id = threading.get_ident()
Dispatcher._testing_mode = False
Dispatcher._queue_initial_tasks = True
class Job:
thread_counter = [0]
thread_list = []
def __init__(self, num):
self.num = num
def __lt__(self, other):
return id(self) < id(other)
def __call__(self):
assert main_thread_id != threading.get_ident()
self.thread_counter[0] += 1
self.thread_list.append(self.num)
for i in range(50):
Dispatcher.launch(Job(i))
Dispatcher.flush_queued_initial_tasks()
for i in range(50, 100):
Dispatcher.launch(Job(i))
Dispatcher._task_worker._queue.join()
assert Job.thread_list == list(range(100))
assert Job.thread_counter[0] == 100
def test_dispatched_tasks_throwing_exceptions_are_correctly_handled():
Dispatcher._testing_mode = False
Dispatcher._queue_initial_tasks = False
thread_canary = [0]
def exception_task():
42 / 0
Dispatcher.launch(exception_task)
def working_task():
thread_canary[0] += 1
for i in range(3):
Dispatcher.launch(working_task)
Dispatcher._task_worker._queue.join()
assert 3 == thread_canary[0]
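The last test relies on the worker loop logging exceptions and still calling `task_done()`, so that a failing task neither kills the thread nor leaves `join()` blocked. A minimal standalone sketch of that property, using only the standard library (the logger name `worker_sketch` and the variable names are assumptions for this example):

```python
import logging
import queue
import threading

log = logging.getLogger("worker_sketch")
tasks = queue.Queue()


def worker():
    while True:
        task = tasks.get()
        try:
            task()
        except Exception:
            # Log and carry on, so later tasks still get a chance to run.
            log.exception("task failed")
        finally:
            # Always mark the item as done, otherwise join() would hang.
            tasks.task_done()


threading.Thread(target=worker, daemon=True).start()

results = []
tasks.put(lambda: 42 / 0)  # raises ZeroDivisionError on the worker thread
for _ in range(3):
    tasks.put(lambda: results.append(1))
tasks.join()
```

After `join()` returns, all three follow-up tasks have run even though the first task raised.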

View file

@ -19,7 +19,7 @@ from glean import __version__ as glean_version
from glean import _builtins from glean import _builtins
from glean import _util from glean import _util
from glean._dispatcher import Dispatcher from glean._dispatcher import Dispatcher
from glean.metrics import CounterMetricType, Lifetime, PingType from glean.metrics import CounterMetricType, Lifetime, PingType, StringMetricType
GLEAN_APP_ID = "glean-python-test" GLEAN_APP_ID = "glean-python-test"
@ -278,9 +278,70 @@ def test_get_language_reports_the_modern_translation_for_some_languages():
pass pass
@pytest.mark.skip def test_ping_collection_must_happen_after_currently_scheduled_metrics_recordings(
def test_ping_collection_must_happen_after_currently_scheduled_metrics_recordings(): ping_schema_url,
pass ):
# Given the following block of code:
#
# metrics.metric.a.set("SomeTestValue")
# Glean.submit_pings(["custom-ping-1"])
#
# This test ensures that "custom-ping-1" contains "metric.a" with a value of "SomeTestValue"
# when the ping is collected.
# safe_httpserver.serve_content(b"", code=200)
class TestUploader:
def do_upload(self, url_path, serialized_ping, configuration):
self.url_path = url_path
self.serialized_ping = serialized_ping
self.configuration = configuration
real_uploader = Glean._configuration.ping_uploader
test_uploader = TestUploader()
Glean._configuration.ping_uploader = test_uploader
Glean._configuration.log_pings = True
ping_name = "custom_ping_1"
ping = PingType(
name=ping_name, include_client_id=True, send_if_empty=False, reason_codes=[]
)
string_metric = StringMetricType(
disabled=False,
category="category",
lifetime=Lifetime.PING,
name="string_metric",
send_in_pings=[ping_name],
)
# This test relies on testing mode being disabled, since we need to prove the
# real-world async behaviour of this.
Dispatcher._testing_mode = False
# This is the important part of the test. Even though both the metrics API and
# ping submission are async and off the main thread, "SomeTestValue" should be
# recorded: the order of the calls must be preserved.
test_value = "SomeTestValue"
string_metric.set(test_value)
ping.submit()
# Wait until the work is complete
Dispatcher._task_worker._queue.join()
assert ping_name == test_uploader.url_path.split("/")[3]
json_content = json.loads(test_uploader.serialized_ping)
assert 0 == validate_ping.validate_ping(
io.StringIO(test_uploader.serialized_ping),
sys.stdout,
schema_url=ping_schema_url,
)
assert {"category.string_metric": test_value} == json_content["metrics"]["string"]
Glean._configuration.ping_uploader = real_uploader
def test_basic_metrics_should_be_cleared_when_disabling_uploading(): def test_basic_metrics_should_be_cleared_when_disabling_uploading():
@ -384,7 +445,7 @@ def test_overflowing_the_task_queue_records_telemetry():
for i in range(110): for i in range(110):
Dispatcher.launch(lambda: None) Dispatcher.launch(lambda: None)
assert 100 == len(Dispatcher._task_queue) assert 100 == len(Dispatcher._preinit_task_queue)
assert 10 == Dispatcher._overflow_count assert 10 == Dispatcher._overflow_count
Dispatcher.flush_queued_initial_tasks() Dispatcher.flush_queued_initial_tasks()