diff --git a/CHANGELOG.md b/CHANGELOG.md index b03cdee6d..b395988bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ * Python: * **Breaking Change**: The `glean.util` and `glean.hardware` modules, which were unintentionally public, have been made private. + * Most Glean work and I/O is now done on its own worker thread. This brings the parallelism Python in line with the other platforms. * The timing distribution, memory distribution, string list, labeled boolean and labeled string metric types are now supported in Python ([#762](https://github.com/mozilla/glean/pull/762), [#763](https://github.com/mozilla/glean/pull/763), [#765](https://github.com/mozilla/glean/pull/765), [#766](https://github.com/mozilla/glean/pull/766)) # v25.1.0 (2020-02-26) diff --git a/docs/user/adding-glean-to-your-project.md b/docs/user/adding-glean-to-your-project.md index 8406c387a..120c2423d 100644 --- a/docs/user/adding-glean-to-your-project.md +++ b/docs/user/adding-glean-to-your-project.md @@ -266,6 +266,10 @@ Please refer to the [custom pings documentation](pings/custom.md). > **Important**: as stated [before](adding-glean-to-your-project.md#before-using-glean), any new data collection requires documentation and data-review. > This is also required for any new metric automatically collected by the Glean SDK. +### Parallelism + +All of Glean's target languages use a separate worker thread to do most of Glean's work, including any I/O. This thread is fully managed by Glean as an implementation detail. Therefore, users should be free to use the Glean API wherever it is most convenient, without worrying about the performance impact of updating metrics and sending pings. + ### Testing metrics In order to make testing metrics easier 'out of the box', all metrics include a set of test API functions in order to facilitate unit testing. These include functions to test whether a value has been stored, and functions to retrieve the stored value for validation. For more information, please refer to [Unit testing Glean metrics](testing-metrics.md). diff --git a/glean-core/android/src/test/java/mozilla/telemetry/glean/GleanTest.kt b/glean-core/android/src/test/java/mozilla/telemetry/glean/GleanTest.kt index d48dd6d66..aff5ef2e3 100644 --- a/glean-core/android/src/test/java/mozilla/telemetry/glean/GleanTest.kt +++ b/glean-core/android/src/test/java/mozilla/telemetry/glean/GleanTest.kt @@ -449,7 +449,7 @@ class GleanTest { // Given the following block of code: // // Metric.A.set("SomeTestValue") - // Glean.sendPings(listOf("custom-ping-1")) + // Glean.submitPings(listOf("custom-ping-1")) // // This test ensures that "custom-ping-1" contains "metric.a" with a value of "SomeTestValue" // when the ping is collected. diff --git a/glean-core/python/glean/_dispatcher.py b/glean-core/python/glean/_dispatcher.py index 880787570..d045cb599 100644 --- a/glean-core/python/glean/_dispatcher.py +++ b/glean-core/python/glean/_dispatcher.py @@ -2,11 +2,99 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. +""" +This module implements a single-threaded (mostly FIFO) work queue on which +most Glean work is done. +""" import functools +import logging +import queue +import threading from typing import Callable, List, Tuple +# This module uses threading, rather than multiprocessing for parallelism. This +# is normally not recommended for Python due to the Global Interpreter Lock +# (GIL), however the usual problems with the GIL are lessened by the fact that: +# +# - Most long-running work and I/O is done in the Rust extension. The cffi +# library used to interface with Rust releases the GIL around every foreign +# call. See https://cffi.readthedocs.io/en/latest/ref.html#conversions +# +# - The other significant blocking I/O is in networking code, where the GIL +# is also released while waiting on network sockets. +# +# This approach greatly reduces complexity of the implementation. Using +# multiprocessing would imply going to a 100% IPC-like approach, since the +# Rust-side Glean objects could not be easily shared or message-passed across +# the process boundary, whereas sharing across threads works transparently. + + +log = logging.getLogger(__name__) + + +class _ThreadWorker: + """ + Manages a single worker to perform tasks in another thread. + """ + + def __init__(self): + self._queue = queue.Queue() + # The worker thread is only started when work needs to be performed so + # that importing Glean alone does not start an unnecessary thread. + self._started = False + + def add_task(self, sync: bool, task: Callable, *args, **kwargs): + """ + Add a task to the worker queue. + + Args: + sync (bool): If `True`, block until the task is complete. + task (Callable): The task to run. + + Additional arguments are passed to the task. + """ + if not self._started: + self._start_worker() + # If we are already on the worker thread, don't place the tasks in the + # queue, just run them now. This is required for synchronous testing + # mode, and also to run the tasks in the expected order. + if threading.get_ident() == self._ident: + task(*args, **kwargs) + else: + args = args or () + kwargs = kwargs or {} + self._queue.put((task, args, kwargs)) + if sync: + self._queue.join() + + def _start_worker(self): + """ + Starts the worker thread. + """ + t = threading.Thread(target=self._worker) + # Start the thread in daemon mode. + t.daemon = True + t.start() + self._started = True + self._ident = t.ident + + def _worker(self): + """ + Implements the worker thread. Takes tasks off of the queue and runs + them. + """ + while True: + task, args, kwargs = self._queue.get() + try: + task(*args, **kwargs) + except Exception: + log.exception("Glean error") + finally: + self._queue.task_done() + + class Dispatcher: # This value was chosen in order to allow several tasks to be queued for # execution but still be conservative of memory. This queue size is @@ -18,12 +106,18 @@ class Dispatcher: # are run immediately _queue_initial_tasks = True # type: bool - # The task queue - _task_queue = [] # type: List[Tuple[Callable, tuple, dict]] + # The preinit task queue + _preinit_task_queue = [] # type: List[Tuple[Callable, tuple, dict]] + + # The live task queue to run things in another thread + _task_worker = _ThreadWorker() # The number of tasks that overflowed the queue _overflow_count = 0 # type: int + # When `True`, all tasks are run synchronously + _testing_mode = False # type: bool + @classmethod def reset(cls): """ @@ -31,9 +125,13 @@ class Dispatcher: queueing mode. """ cls._queue_initial_tasks = True - cls._task_queue = [] + cls._preinit_task_queue = [] cls._overflow_count = 0 + @classmethod + def _execute_task(cls, func: Callable, *args, **kwargs): + cls._task_worker.add_task(cls._testing_mode, func, *args, **kwargs) + @classmethod def task(cls, func: Callable): """ @@ -48,11 +146,11 @@ class Dispatcher: @functools.wraps(func) def wrapper(*args, **kwargs): if cls._queue_initial_tasks: - if len(cls._task_queue) >= cls.MAX_QUEUE_SIZE: + if len(cls._preinit_task_queue) >= cls.MAX_QUEUE_SIZE: return - cls._task_queue.append((func, args, kwargs)) + cls._preinit_task_queue.append((func, args, kwargs)) else: - func(*args, **kwargs) + cls._execute_task(func, *args, **kwargs) return wrapper @@ -82,16 +180,16 @@ class Dispatcher: """ if cls._queue_initial_tasks: - if len(cls._task_queue) >= cls.MAX_QUEUE_SIZE: + if len(cls._preinit_task_queue) >= cls.MAX_QUEUE_SIZE: # This value ends up in the `preinit_tasks_overflow` metric, # but we can't record directly there, because that would only # add the recording to an already-overflowing task queue and # would be silently dropped. cls._overflow_count += 1 return - cls._task_queue.append((func, (), {})) + cls._preinit_task_queue.append((func, (), {})) else: - func() + cls._execute_task(func) @classmethod def launch_at_front(cls, func: Callable): @@ -102,7 +200,7 @@ class Dispatcher: """ if cls._queue_initial_tasks: - cls._task_queue.insert(0, (func, (), {})) + cls._preinit_task_queue.insert(0, (func, (), {})) else: func() @@ -123,9 +221,9 @@ class Dispatcher: Stops queueing tasks and processes any tasks in the queue. """ cls.set_task_queueing(False) - for (task, args, kwargs) in cls._task_queue: - task(*args, **kwargs) - cls._task_queue.clear() + for (task, args, kwargs) in cls._preinit_task_queue: + cls._execute_task(task, *args, **kwargs) + cls._preinit_task_queue.clear() if cls._overflow_count > 0: from ._builtins import metrics diff --git a/glean-core/python/glean/testing/__init__.py b/glean-core/python/glean/testing/__init__.py index d62793d4d..57f638095 100644 --- a/glean-core/python/glean/testing/__init__.py +++ b/glean-core/python/glean/testing/__init__.py @@ -34,6 +34,9 @@ def reset_glean( global settings. """ from glean import Glean + from glean._dispatcher import Dispatcher + + Dispatcher._testing_mode = True data_dir = None # type: Optional[Path] if not clear_stores: diff --git a/glean-core/python/tests/test_dispatcher.py b/glean-core/python/tests/test_dispatcher.py index b464f74ef..c6e122f46 100644 --- a/glean-core/python/tests/test_dispatcher.py +++ b/glean-core/python/tests/test_dispatcher.py @@ -3,6 +3,9 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. +import threading + + from glean._dispatcher import Dispatcher @@ -18,13 +21,13 @@ def test_launch_correctly_adds_tasks_to_queue_if_queue_tasks_is_true(): for i in range(3): update() - assert 3 == len(Dispatcher._task_queue) + assert 3 == len(Dispatcher._preinit_task_queue) assert 0 == thread_canary[0] Dispatcher.flush_queued_initial_tasks() assert 3 == thread_canary[0] - assert 0 == len(Dispatcher._task_queue) + assert 0 == len(Dispatcher._preinit_task_queue) def test_maximum_tasks(): @@ -33,7 +36,7 @@ def test_maximum_tasks(): for i in range(Dispatcher.MAX_QUEUE_SIZE + 10): Dispatcher.task(lambda: 0)() - assert len(Dispatcher._task_queue) == Dispatcher.MAX_QUEUE_SIZE + assert len(Dispatcher._preinit_task_queue) == Dispatcher.MAX_QUEUE_SIZE def test_maximum_queue(): @@ -42,4 +45,101 @@ def test_maximum_queue(): for i in range(Dispatcher.MAX_QUEUE_SIZE + 10): Dispatcher.launch(lambda: 0) - assert len(Dispatcher._task_queue) == Dispatcher.MAX_QUEUE_SIZE + assert len(Dispatcher._preinit_task_queue) == Dispatcher.MAX_QUEUE_SIZE + + +def test_tasks_run_off_the_main_thread(): + main_thread_id = threading.get_ident() + thread_canary = [False] + + def test_task(): + assert main_thread_id != threading.get_ident() + assert False is thread_canary[0] + thread_canary[0] = True + + Dispatcher.launch(test_task) + Dispatcher._task_worker._queue.join() + + assert True is thread_canary[0] + + +def test_queue_tasks_are_flushed_off_the_main_thread(): + main_thread_id = threading.get_ident() + Dispatcher._testing_mode = False + Dispatcher._queue_initial_tasks = False + thread_canary = [0] + + def test_task(): + assert main_thread_id != threading.get_ident() + thread_canary[0] += 1 + + Dispatcher._testing_mode = False + Dispatcher._queue_initial_tasks = True + + for i in range(3): + Dispatcher.launch(test_task) + + assert 3 == len(Dispatcher._preinit_task_queue) + assert 0 == thread_canary[0] + + Dispatcher.flush_queued_initial_tasks() + + Dispatcher._task_worker._queue.join() + + assert 3 == thread_canary[0] + assert 0 == len(Dispatcher._preinit_task_queue) + + +def test_queued_tasks_are_executed_in_the_order_they_are_received(): + main_thread_id = threading.get_ident() + Dispatcher._testing_mode = False + Dispatcher._queue_initial_tasks = True + + class Job: + thread_counter = [0] + thread_list = [] + + def __init__(self, num): + self.num = num + + def __lt__(self, other): + return id(self) < id(other) + + def __call__(self): + assert main_thread_id != threading.get_ident() + self.thread_counter[0] += 1 + self.thread_list.append(self.num) + + for i in range(50): + Dispatcher.launch(Job(i)) + + Dispatcher.flush_queued_initial_tasks() + + for i in range(50, 100): + Dispatcher.launch(Job(i)) + + Dispatcher._task_worker._queue.join() + + assert Job.thread_list == list(range(100)) + assert Job.thread_counter[0] == 100 + + +def test_dispatched_tasks_throwing_exceptions_are_correctly_handled(): + Dispatcher._testing_mode = False + Dispatcher._queue_initial_tasks = False + thread_canary = [0] + + def exception_task(): + 42 / 0 + + Dispatcher.launch(exception_task) + + def working_task(): + thread_canary[0] += 1 + + for i in range(3): + Dispatcher.launch(working_task) + + Dispatcher._task_worker._queue.join() + + assert 3 == thread_canary[0] diff --git a/glean-core/python/tests/test_glean.py b/glean-core/python/tests/test_glean.py index af4d1bb47..3290dce3d 100644 --- a/glean-core/python/tests/test_glean.py +++ b/glean-core/python/tests/test_glean.py @@ -19,7 +19,7 @@ from glean import __version__ as glean_version from glean import _builtins from glean import _util from glean._dispatcher import Dispatcher -from glean.metrics import CounterMetricType, Lifetime, PingType +from glean.metrics import CounterMetricType, Lifetime, PingType, StringMetricType GLEAN_APP_ID = "glean-python-test" @@ -278,9 +278,70 @@ def test_get_language_reports_the_modern_translation_for_some_languages(): pass -@pytest.mark.skip -def test_ping_collection_must_happen_after_currently_scheduled_metrics_recordings(): - pass +def test_ping_collection_must_happen_after_currently_scheduled_metrics_recordings( + ping_schema_url, +): + # Given the following block of code: + # + # metrics.metric.a.set("SomeTestValue") + # Glean.submit_pings(["custom-ping-1"]) + # + # This test ensures that "custom-ping-1" contains "metric.a" with a value of "SomeTestValue" + # when the ping is collected. + + # safe_httpserver.serve_content(b"", code=200) + + class TestUploader: + def do_upload(self, url_path, serialized_ping, configuration): + self.url_path = url_path + self.serialized_ping = serialized_ping + self.configuration = configuration + + real_uploader = Glean._configuration.ping_uploader + test_uploader = TestUploader() + Glean._configuration.ping_uploader = test_uploader + + Glean._configuration.log_pings = True + + ping_name = "custom_ping_1" + ping = PingType( + name=ping_name, include_client_id=True, send_if_empty=False, reason_codes=[] + ) + string_metric = StringMetricType( + disabled=False, + category="category", + lifetime=Lifetime.PING, + name="string_metric", + send_in_pings=[ping_name], + ) + + # This test relies on testing mode to be disabled, since we need to prove the + # real-world async behaviour of this. + Dispatcher._testing_mode = False + + # This is the important part of the test. Even though both the metrics API and + # sendPings are async and off the main thread, "SomeTestValue" should be recorded, + # the order of the calls must be preserved. + test_value = "SomeTestValue" + string_metric.set(test_value) + ping.submit() + + # Wait until the work is completea + Dispatcher._task_worker._queue.join() + + assert ping_name == test_uploader.url_path.split("/")[3] + + json_content = json.loads(test_uploader.serialized_ping) + + assert 0 == validate_ping.validate_ping( + io.StringIO(test_uploader.serialized_ping), + sys.stdout, + schema_url=ping_schema_url, + ) + + assert {"category.string_metric": test_value} == json_content["metrics"]["string"] + + Glean._configuration.ping_uploader = real_uploader def test_basic_metrics_should_be_cleared_when_disabling_uploading(): @@ -384,7 +445,7 @@ def test_overflowing_the_task_queue_records_telemetry(): for i in range(110): Dispatcher.launch(lambda: None) - assert 100 == len(Dispatcher._task_queue) + assert 100 == len(Dispatcher._preinit_task_queue) assert 10 == Dispatcher._overflow_count Dispatcher.flush_queued_initial_tasks()