diff --git a/.dictionary b/.dictionary index 305943a68..00706bf21 100644 --- a/.dictionary +++ b/.dictionary @@ -101,6 +101,7 @@ docstrings dokka dropdown enqueue +enqueued enum envs exe diff --git a/CHANGELOG.md b/CHANGELOG.md index d31badd0d..338a27d98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,12 +3,17 @@ [Full changelog](https://github.com/mozilla/glean/compare/v32.0.0...main) * General + * Move logic to limit the number of retries on ping uploading "recoverable failures" to glean-core. ([#1120](https://github.com/mozilla/glean/pull/1120)) + * The functionality to limit the number of retries in these cases was introduced to the Glean SDK in `v31.1.0`. The work done now was to move that logic to the glean-core in order to avoid code duplication throughout the language bindings. * Update `glean_parser` to `v1.28.3` * BUGFIX: Generate valid C# code when using Labeled metric types. * BUGFIX: Support `HashSet` and `Dictionary` in the C# generated code. * C# * Add support for the String List metric type. * Enable generating the C# APIs using the glean_parser. +* Python + * BUGFIX: Limit the number of retries for 5xx server errors on ping uploads. ([#1120](https://github.com/mozilla/glean/pull/1120)) + * This kinds of failures yield a "recoverable error", which means the ping gets re-enqueued. That can cause infinite loops on the ping upload worker. For python we were incorrectly only limiting the number of retries for I/O errors, another type of "recoverable error". # v32.0.0 (2020-08-03) diff --git a/glean-core/android/src/main/java/mozilla/telemetry/glean/net/Upload.kt b/glean-core/android/src/main/java/mozilla/telemetry/glean/net/Upload.kt index a5f352faa..ac6b1fc80 100644 --- a/glean-core/android/src/main/java/mozilla/telemetry/glean/net/Upload.kt +++ b/glean-core/android/src/main/java/mozilla/telemetry/glean/net/Upload.kt @@ -117,7 +117,15 @@ internal sealed class PingUploadTask { object Wait : PingUploadTask() /** - * A flag signaling that the pending pings queue is empty and requester is done. + * A flag signaling that requester doesn't need to request any more upload tasks at this moment. + * + * There are two possibilities for this scenario: + * * Pending pings queue is empty, no more pings to request; + * * Requester has reported more max recoverable upload failures on the same uploading_window[1] + * and should stop requesting at this moment. + * + * [1]: An "uploading window" starts when a requester gets a new `PingUploadTask::Upload(PingRequest)` + * response and finishes when they finally get a `PingUploadTask::Done` or `PingUploadTask::Wait` response. */ object Done : PingUploadTask() } diff --git a/glean-core/android/src/main/java/mozilla/telemetry/glean/scheduler/PingUploadWorker.kt b/glean-core/android/src/main/java/mozilla/telemetry/glean/scheduler/PingUploadWorker.kt index 501ea4566..389104a4b 100644 --- a/glean-core/android/src/main/java/mozilla/telemetry/glean/scheduler/PingUploadWorker.kt +++ b/glean-core/android/src/main/java/mozilla/telemetry/glean/scheduler/PingUploadWorker.kt @@ -19,7 +19,6 @@ import mozilla.telemetry.glean.Glean import mozilla.telemetry.glean.net.FfiPingUploadTask import mozilla.telemetry.glean.utils.testFlushWorkManagerJob import mozilla.telemetry.glean.net.PingUploadTask -import mozilla.telemetry.glean.rust.Constants /** * Build the constraints around which the worker can be run, such as whether network @@ -53,8 +52,6 @@ class PingUploadWorker(context: Context, params: WorkerParameters) : Worker(cont companion object { internal const val PING_WORKER_TAG = "mozac_service_glean_ping_upload_worker" - internal const val MAX_RETRIES = 3 - /** * Function to aid in properly enqueuing the worker in [WorkManager] * @@ -98,11 +95,6 @@ class PingUploadWorker(context: Context, params: WorkerParameters) : Worker(cont */ @Suppress("ReturnCount") override fun doWork(): Result { - // We counte the number of failures, - // if we get more than three failures in upload we will stop this worker. - // - // This is a hack before a more robust mechanism is implemented in Rust. - var uploadFailures = 0 do { // Create a slot of memory for the task: glean-core will write data into // the allocated memory. @@ -121,17 +113,14 @@ class PingUploadWorker(context: Context, params: WorkerParameters) : Worker(cont Glean.configuration ).toFfi() - if (result == Constants.UPLOAD_RESULT_RECOVERABLE) { - uploadFailures++ - } - // Process the upload response LibGleanFFI.INSTANCE.glean_process_ping_upload_response(incomingTask, result) } PingUploadTask.Wait -> return Result.retry() PingUploadTask.Done -> return Result.success() } - } while (uploadFailures < MAX_RETRIES) - return Result.failure() + } while (true) + // Limits are enforced by glean-core to avoid an inifinite loop here. + // Whenever a limit is reached, this binding will receive `PingUploadTask.Done` and step out. } } diff --git a/glean-core/csharp/Glean/Net/BaseUploader.cs b/glean-core/csharp/Glean/Net/BaseUploader.cs index 2191e9c47..c7cc1985b 100644 --- a/glean-core/csharp/Glean/Net/BaseUploader.cs +++ b/glean-core/csharp/Glean/Net/BaseUploader.cs @@ -22,9 +22,6 @@ namespace Mozilla.Glean.Net // How many times to attempt waiting when told to by glean-core's upload API. private const int MAX_WAIT_ATTEMPTS = 3; - // Maximum number of recoverable errors allowed before aborting the ping uploader. - private const int MAX_RETRIES = 3; - private readonly IPingUploader uploader; /// @@ -100,9 +97,10 @@ namespace Mozilla.Glean.Net // FOR TESTING Implement the upload worker here and call this from Glean.cs int waitAttempts = 0; - int uploadFailures = 0; - while (uploadFailures < MAX_RETRIES) + // Limits are enforced by glean-core to avoid an inifinite loop here. + // Whenever a limit is reached, this binding will receive `UploadTaskTag.Done` and step out. + while (true) { FfiUploadTask incomingTask = new FfiUploadTask(); LibGleanFFI.glean_get_upload_task(ref incomingTask); @@ -123,11 +121,6 @@ namespace Mozilla.Glean.Net // Delegate the actual upload and get its return value. UploadResult result = Upload(path, body, headers, config); - if (result is RecoverableFailure) - { - uploadFailures += 1; - } - // Copy the `FfiUploadTask` to unmanaged memory, because // `glean_process_ping_upload` assumes it has to free the memory. IntPtr ptrCopy = Marshal.AllocHGlobal(Marshal.SizeOf(incomingTask)); diff --git a/glean-core/ios/Glean/Net/HttpPingUploader.swift b/glean-core/ios/Glean/Net/HttpPingUploader.swift index 5b22b6ee0..8213472c5 100644 --- a/glean-core/ios/Glean/Net/HttpPingUploader.swift +++ b/glean-core/ios/Glean/Net/HttpPingUploader.swift @@ -18,9 +18,6 @@ public class HttpPingUploader { static let recoverableErrorStatusCode: UInt16 = 500 // For this error, the ping data will be deleted and no retry happens static let unrecoverableErrorStatusCode: UInt16 = 400 - - // Maximum number of recoverable errors allowed before aborting the ping uploader - static let maxRetries = 3 } private let logger = Logger(tag: Constants.logTag) @@ -98,8 +95,9 @@ public class HttpPingUploader { /// It will report back the task status to Glean, which will take care of deleting pending ping files. /// It will continue upload as long as it can fetch new tasks. func process() { - var uploadFailures = 0 - while uploadFailures < Constants.maxRetries { + // Limits are enforced by glean-core to avoid an inifinite loop here. + // Whenever a limit is reached, this binding will receive `.done` and step out. + while true { var incomingTask = FfiPingUploadTask() glean_get_upload_task(&incomingTask) let task = incomingTask.toPingUploadTask() @@ -107,9 +105,6 @@ public class HttpPingUploader { switch task { case let .upload(request): self.upload(path: request.path, data: request.body, headers: request.headers) { result in - if case .recoverableFailure = result { - uploadFailures += 1 - } glean_process_ping_upload_response(&incomingTask, result.toFfi()) } case .wait: diff --git a/glean-core/python/glean/net/ping_upload_worker.py b/glean-core/python/glean/net/ping_upload_worker.py index 7a4022ba3..efdd5326a 100644 --- a/glean-core/python/glean/net/ping_upload_worker.py +++ b/glean-core/python/glean/net/ping_upload_worker.py @@ -17,7 +17,6 @@ from .. import _ffi from .._glean_ffi import ffi as ffi_support # type: ignore from .._dispatcher import Dispatcher from .._process_dispatcher import ProcessDispatcher -from .ping_uploader import RecoverableFailure log = logging.getLogger(__name__) @@ -26,9 +25,6 @@ log = logging.getLogger(__name__) # How many times to attempt waiting when told to by glean-core's upload API. MAX_WAIT_ATTEMPTS = 3 -# Maximum number of recoverable errors allowed before aborting the ping uploader -MAX_RETRIES = 3 - class PingUploadWorker: @classmethod @@ -120,9 +116,9 @@ def _process(data_dir: Path, application_id: str, configuration) -> bool: wait_attempts = 0 - upload_failures = 0 - - while upload_failures < MAX_RETRIES: + # Limits are enforced by glean-core to avoid an inifinite loop here. + # Whenever a limit is reached, this binding will receive `UploadTaskTag.DONE` and step out. + while True: incoming_task = ffi_support.new("FfiPingUploadTask *") _ffi.lib.glean_get_upload_task(incoming_task) @@ -146,9 +142,6 @@ def _process(data_dir: Path, application_id: str, configuration) -> bool: url_path, body, _parse_ping_headers(headers, doc_id), configuration ) - if isinstance(upload_result, RecoverableFailure): - upload_failures = upload_failures + 1 - # Process the response. _ffi.lib.glean_process_ping_upload_response( incoming_task, upload_result.to_ffi() @@ -161,9 +154,7 @@ def _process(data_dir: Path, application_id: str, configuration) -> bool: else: return False elif tag == UploadTaskTag.DONE: - break - - return True + return True __all__ = ["PingUploadWorker"] diff --git a/glean-core/python/tests/test_network.py b/glean-core/python/tests/test_network.py index 349b831ac..cc6d29b7f 100644 --- a/glean-core/python/tests/test_network.py +++ b/glean-core/python/tests/test_network.py @@ -81,11 +81,12 @@ def test_500_error_submit(safe_httpserver, monkeypatch): ProcessDispatcher._wait_for_last_process() # This kind of recoverable error will be tried 10 times - assert 10 == len(safe_httpserver.requests) + # The number of retries is defined on glean-core + assert 3 == len(safe_httpserver.requests) metric = get_upload_failure_metric() assert not metric["status_code_4xx"].test_has_value() - assert 10 == metric["status_code_5xx"].test_get_value() + assert 3 == metric["status_code_5xx"].test_get_value() def test_500_error_submit_concurrent_writing(slow_httpserver, monkeypatch): @@ -113,12 +114,13 @@ def test_500_error_submit_concurrent_writing(slow_httpserver, monkeypatch): counter.add() times += 1 - # This kind of recoverable error will be tried 10 times - assert 10 == len(slow_httpserver.requests) + # This kind of recoverable error will be tried 3 times + # The number of retries is defined on glean-core + assert 3 == len(slow_httpserver.requests) metric = get_upload_failure_metric() assert not metric["status_code_4xx"].test_has_value() - assert 10 == metric["status_code_5xx"].test_get_value() + assert 3 == metric["status_code_5xx"].test_get_value() assert times > 0 assert times == counter.test_get_value() diff --git a/glean-core/src/lib.rs b/glean-core/src/lib.rs index 90363f993..557f06d76 100644 --- a/glean-core/src/lib.rs +++ b/glean-core/src/lib.rs @@ -490,7 +490,7 @@ impl Glean { /// /// * `Wait` - which means the requester should ask again later; /// * `Upload(PingRequest)` - which means there is a ping to upload. This wraps the actual request object; - /// * `Done` - which means there are no more pings queued right now. + /// * `Done` - which means requester should stop asking for now. /// /// # Return value /// diff --git a/glean-core/src/upload/mod.rs b/glean-core/src/upload/mod.rs index d224f2670..7fd2cecf0 100644 --- a/glean-core/src/upload/mod.rs +++ b/glean-core/src/upload/mod.rs @@ -11,7 +11,7 @@ use std::collections::VecDeque; use std::path::PathBuf; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::{Arc, RwLock, RwLockWriteGuard}; use std::thread; use std::time::{Duration, Instant}; @@ -26,6 +26,11 @@ mod directory; mod request; mod result; +/// The maximum recoverable failures allowed per uploading window. +/// +/// Limiting this is necessary to avoid infinite loops on requesting upload tasks. +const MAX_RECOVERABLE_FAILURES_PER_UPLOADING_WINDOW: u32 = 3; + // The maximum size in bytes a ping body may have to be eligible for upload. const PING_BODY_MAX_SIZE: usize = 1024 * 1024; // 1 MB @@ -112,7 +117,16 @@ pub enum PingUploadTask { /// A flag signaling that the pending pings directories are not done being processed, /// thus the requester should wait and come back later. Wait, - /// A flag signaling that the pending pings queue is empty and requester is done. + /// A flag signaling that requester doesn't need to request any more upload tasks at this moment. + /// + /// There are two possibilities for this scenario: + /// * Pending pings queue is empty, no more pings to request; + /// * Requester has reported more than MAX_RECOVERABLE_FAILURES_PER_UPLOADING_WINDOW + /// recoverable upload failures on the same uploading window[1] + /// and should stop requesting at this moment. + /// + /// [1]: An "uploading window" starts when a requester gets a new `PingUploadTask::Upload(PingRequest)` + /// response and finishes when they finally get a `PingUploadTask::Done` or `PingUploadTask::Wait` response. Done, } @@ -127,6 +141,8 @@ pub struct PingUploadManager { processed_pending_pings: Arc, /// A vector to store the pending pings processed off-thread. pending_pings: Arc>>, + /// The number of upload failures for the current uploading window. + recoverable_failure_count: AtomicU32, /// A ping counter to help rate limit the ping uploads. /// /// To keep resource usage in check, @@ -187,9 +203,10 @@ impl PingUploadManager { Self { queue, + directory_manager, processed_pending_pings, pending_pings, - directory_manager, + recoverable_failure_count: AtomicU32::new(0), rate_limiter: None, language_binding_name: language_binding_name.into(), upload_metrics: UploadMetrics::new(), @@ -200,11 +217,12 @@ impl PingUploadManager { self.processed_pending_pings.load(Ordering::SeqCst) } - /// Checks if a ping with a certain `document_id` is already enqueued. - fn is_enqueued(queue: &VecDeque, document_id: &str) -> bool { - queue - .iter() - .any(|request| request.document_id == document_id) + fn recoverable_failure_count(&self) -> u32 { + self.recoverable_failure_count.load(Ordering::SeqCst) + } + + fn reset_recoverable_failure_count(&self) { + self.recoverable_failure_count.store(0, Ordering::SeqCst); } /// Attempts to build a ping request from a ping file payload. @@ -261,7 +279,10 @@ impl PingUploadManager { .expect("Can't write to pending pings queue."); // Checks if a ping with this `document_id` is already enqueued. - if Self::is_enqueued(&queue, &document_id) { + if queue + .iter() + .any(|request| request.document_id == document_id) + { log::trace!( "Attempted to enqueue a duplicate ping {} at {}.", document_id, @@ -326,17 +347,7 @@ impl PingUploadManager { queue } - /// Gets the next `PingUploadTask`. - /// - /// ## Arguments - /// - /// * `glean` - The Glean object holding the database. - /// * `log_ping` - Whether to log the ping before returning. - /// - /// # Return value - /// - /// `PingUploadTask` - see [`PingUploadTask`](enum.PingUploadTask.html) for more information. - pub fn get_upload_task(&self, glean: &Glean, log_ping: bool) -> PingUploadTask { + fn get_upload_task_internal(&self, glean: &Glean, log_ping: bool) -> PingUploadTask { if !self.has_processed_pings_dir() { log::info!( "Tried getting an upload task, but processing is ongoing. Will come back later." @@ -352,6 +363,14 @@ impl PingUploadManager { self.enqueue_ping(glean, &document_id, &path, &body, headers); } + if self.recoverable_failure_count() >= MAX_RECOVERABLE_FAILURES_PER_UPLOADING_WINDOW { + log::warn!( + "Reached maximum recoverable failures for the current uploading window. You are done." + ); + + return PingUploadTask::Done; + } + let mut queue = self .queue .write() @@ -393,6 +412,25 @@ impl PingUploadManager { } } + /// Gets the next `PingUploadTask`. + /// + /// ## Arguments + /// + /// * `glean` - The Glean object holding the database. + /// * `log_ping` - Whether to log the ping before returning. + /// + /// # Return value + /// + /// `PingUploadTask` - see [`PingUploadTask`](enum.PingUploadTask.html) for more information. + pub fn get_upload_task(&self, glean: &Glean, log_ping: bool) -> PingUploadTask { + let task = self.get_upload_task_internal(glean, log_ping); + if task == PingUploadTask::Done || task == PingUploadTask::Wait { + self.reset_recoverable_failure_count() + } + + task + } + /// Processes the response from an attempt to upload a ping. /// /// Based on the HTTP status of said response, @@ -466,6 +504,8 @@ impl PingUploadManager { status ); self.enqueue_ping_from_file(glean, &document_id); + self.recoverable_failure_count + .fetch_add(1, Ordering::SeqCst); } }; } @@ -1054,4 +1094,46 @@ mod test { PingUploadTask::Done ); } + + #[test] + fn maximum_of_recoverable_errors_is_enforced_for_uploading_window() { + let (mut glean, _) = new_glean(None); + + // Wait for processing of pending pings directory to finish. + while glean.get_upload_task() == PingUploadTask::Wait { + thread::sleep(Duration::from_millis(10)); + } + + // Register a ping for testing + let ping_type = PingType::new("test", true, /* send_if_empty */ true, vec![]); + glean.register_ping_type(&ping_type); + + // Submit the ping multiple times + let n = 5; + for _ in 0..n { + glean.submit_ping(&ping_type, None).unwrap(); + } + + // Return the max recoverable error failures in a row + for _ in 0..MAX_RECOVERABLE_FAILURES_PER_UPLOADING_WINDOW { + match glean.get_upload_task() { + PingUploadTask::Upload(req) => { + glean.process_ping_upload_response(&req.document_id, RecoverableFailure) + } + _ => panic!("Expected upload manager to return the next request!"), + } + } + + // Verify that after returning the max amount of recoverable failures, + // we are done even though we haven't gotten all the enqueued requests. + assert_eq!(glean.get_upload_task(), PingUploadTask::Done); + + // Verify all requests are returned when we try again. + for _ in 0..n { + match glean.get_upload_task() { + PingUploadTask::Upload(_) => {} + _ => panic!("Expected upload manager to return the next request!"), + } + } + } }