Bug 1644364 - Move logic to limit max retries on recoverable failures to glean-core (#1120)

This commit is contained in:
Beatriz Rizental 2020-08-04 16:04:30 +02:00 committed by GitHub
Parent 883804705b
Commit 78bee73323
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 138 additions and 72 deletions

View file

@@ -101,6 +101,7 @@ docstrings
dokka
dropdown
enqueue
enqueued
enum
envs
exe

View file

@@ -3,12 +3,17 @@
[Full changelog](https://github.com/mozilla/glean/compare/v32.0.0...main)
* General
* Move logic to limit the number of retries on ping uploading "recoverable failures" to glean-core. ([#1120](https://github.com/mozilla/glean/pull/1120))
* The functionality to limit the number of retries in these cases was introduced to the Glean SDK in `v31.1.0`. This change moves that logic into glean-core in order to avoid code duplication throughout the language bindings.
* Update `glean_parser` to `v1.28.3`.
* BUGFIX: Generate valid C# code when using Labeled metric types.
* BUGFIX: Support `HashSet` and `Dictionary` in the C# generated code.
* C#
* Add support for the String List metric type.
* Enable generating the C# APIs using the glean_parser.
* Python
* BUGFIX: Limit the number of retries for 5xx server errors on ping uploads. ([#1120](https://github.com/mozilla/glean/pull/1120))
* These kinds of failures yield a "recoverable error", which means the ping gets re-enqueued. That can cause infinite loops in the ping upload worker. For Python we were incorrectly limiting the number of retries only for I/O errors, another type of "recoverable error". A sketch of the upload loop all bindings now share follows this list.
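With the limit now enforced in glean-core, every language binding drives the same loop: request a task, act on it, report the result, and stop when glean-core says so. Below is a minimal sketch of that loop at the Rust API level; it is not code from this commit, `attempt_upload` is a hypothetical HTTP helper, and the backoff on `Wait` is illustrative:

fn upload_loop(glean: &mut Glean) {
    loop {
        match glean.get_upload_task() {
            PingUploadTask::Upload(request) => {
                // Hypothetical helper performing the HTTP POST; on a 5xx it
                // would return `RecoverableFailure`, which glean-core counts
                // against the per-window limit.
                let result = attempt_upload(&request);
                glean.process_ping_upload_response(&request.document_id, result);
            }
            // The pending pings directory is still being processed;
            // back off and ask again later.
            PingUploadTask::Wait => std::thread::sleep(std::time::Duration::from_millis(100)),
            // Queue drained, or too many recoverable failures in this
            // uploading window: glean-core ends the loop for the binding.
            PingUploadTask::Done => break,
        }
    }
}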
# v32.0.0 (2020-08-03)

View file

@@ -117,7 +117,15 @@ internal sealed class PingUploadTask {
object Wait : PingUploadTask()
/**
* A flag signaling that the pending pings queue is empty and requester is done.
* A flag signaling that the requester doesn't need to request any more upload tasks at this moment.
*
* There are two possibilities for this scenario:
* * Pending pings queue is empty, no more pings to request;
* * Requester has reported more than the maximum number of recoverable upload failures allowed
*   during the same uploading window[1] and should stop requesting at this moment.
*
* [1]: An "uploading window" starts when a requester gets a new `PingUploadTask::Upload(PingRequest)`
* response and finishes when they finally get a `PingUploadTask::Done` or `PingUploadTask::Wait` response.
*/
object Done : PingUploadTask()
}

View file

@@ -19,7 +19,6 @@ import mozilla.telemetry.glean.Glean
import mozilla.telemetry.glean.net.FfiPingUploadTask
import mozilla.telemetry.glean.utils.testFlushWorkManagerJob
import mozilla.telemetry.glean.net.PingUploadTask
import mozilla.telemetry.glean.rust.Constants
/**
* Build the constraints around which the worker can be run, such as whether network
@@ -53,8 +52,6 @@ class PingUploadWorker(context: Context, params: WorkerParameters) : Worker(cont
companion object {
internal const val PING_WORKER_TAG = "mozac_service_glean_ping_upload_worker"
internal const val MAX_RETRIES = 3
/**
* Function to aid in properly enqueuing the worker in [WorkManager]
*
@@ -98,11 +95,6 @@ class PingUploadWorker(context: Context, params: WorkerParameters) : Worker(cont
*/
@Suppress("ReturnCount")
override fun doWork(): Result {
// We count the number of failures;
// if we get more than three upload failures we will stop this worker.
//
// This is a hack before a more robust mechanism is implemented in Rust.
var uploadFailures = 0
do {
// Create a slot of memory for the task: glean-core will write data into
// the allocated memory.
@@ -121,17 +113,14 @@ class PingUploadWorker(context: Context, params: WorkerParameters) : Worker(cont
Glean.configuration
).toFfi()
if (result == Constants.UPLOAD_RESULT_RECOVERABLE) {
uploadFailures++
}
// Process the upload response
LibGleanFFI.INSTANCE.glean_process_ping_upload_response(incomingTask, result)
}
PingUploadTask.Wait -> return Result.retry()
PingUploadTask.Done -> return Result.success()
}
} while (uploadFailures < MAX_RETRIES)
return Result.failure()
} while (true)
// Limits are enforced by glean-core to avoid an infinite loop here.
// Whenever a limit is reached, this binding will receive `PingUploadTask.Done` and step out.
}
}

View file

@@ -22,9 +22,6 @@ namespace Mozilla.Glean.Net
// How many times to attempt waiting when told to by glean-core's upload API.
private const int MAX_WAIT_ATTEMPTS = 3;
// Maximum number of recoverable errors allowed before aborting the ping uploader.
private const int MAX_RETRIES = 3;
private readonly IPingUploader uploader;
/// <summary>
@@ -100,9 +97,10 @@ namespace Mozilla.Glean.Net
// FOR TESTING Implement the upload worker here and call this from Glean.cs
int waitAttempts = 0;
int uploadFailures = 0;
while (uploadFailures < MAX_RETRIES)
// Limits are enforced by glean-core to avoid an infinite loop here.
// Whenever a limit is reached, this binding will receive `UploadTaskTag.Done` and step out.
while (true)
{
FfiUploadTask incomingTask = new FfiUploadTask();
LibGleanFFI.glean_get_upload_task(ref incomingTask);
@@ -123,11 +121,6 @@
// Delegate the actual upload and get its return value.
UploadResult result = Upload(path, body, headers, config);
if (result is RecoverableFailure)
{
uploadFailures += 1;
}
// Copy the `FfiUploadTask` to unmanaged memory, because
// `glean_process_ping_upload` assumes it has to free the memory.
IntPtr ptrCopy = Marshal.AllocHGlobal(Marshal.SizeOf(incomingTask));

View file

@@ -18,9 +18,6 @@ public class HttpPingUploader {
static let recoverableErrorStatusCode: UInt16 = 500
// For this error, the ping data will be deleted and no retry happens
static let unrecoverableErrorStatusCode: UInt16 = 400
// Maximum number of recoverable errors allowed before aborting the ping uploader
static let maxRetries = 3
}
private let logger = Logger(tag: Constants.logTag)
@@ -98,8 +95,9 @@
/// It will report back the task status to Glean, which will take care of deleting pending ping files.
/// It will continue upload as long as it can fetch new tasks.
func process() {
var uploadFailures = 0
while uploadFailures < Constants.maxRetries {
// Limits are enforced by glean-core to avoid an infinite loop here.
// Whenever a limit is reached, this binding will receive `.done` and step out.
while true {
var incomingTask = FfiPingUploadTask()
glean_get_upload_task(&incomingTask)
let task = incomingTask.toPingUploadTask()
@@ -107,9 +105,6 @@ public class HttpPingUploader {
switch task {
case let .upload(request):
self.upload(path: request.path, data: request.body, headers: request.headers) { result in
if case .recoverableFailure = result {
uploadFailures += 1
}
glean_process_ping_upload_response(&incomingTask, result.toFfi())
}
case .wait:

View file

@@ -17,7 +17,6 @@ from .. import _ffi
from .._glean_ffi import ffi as ffi_support # type: ignore
from .._dispatcher import Dispatcher
from .._process_dispatcher import ProcessDispatcher
from .ping_uploader import RecoverableFailure
log = logging.getLogger(__name__)
@@ -26,9 +25,6 @@ log = logging.getLogger(__name__)
# How many times to attempt waiting when told to by glean-core's upload API.
MAX_WAIT_ATTEMPTS = 3
# Maximum number of recoverable errors allowed before aborting the ping uploader
MAX_RETRIES = 3
class PingUploadWorker:
@classmethod
@@ -120,9 +116,9 @@ def _process(data_dir: Path, application_id: str, configuration) -> bool:
wait_attempts = 0
upload_failures = 0
while upload_failures < MAX_RETRIES:
# Limits are enforced by glean-core to avoid an infinite loop here.
# Whenever a limit is reached, this binding will receive `UploadTaskTag.DONE` and step out.
while True:
incoming_task = ffi_support.new("FfiPingUploadTask *")
_ffi.lib.glean_get_upload_task(incoming_task)
@@ -146,9 +142,6 @@ def _process(data_dir: Path, application_id: str, configuration) -> bool:
url_path, body, _parse_ping_headers(headers, doc_id), configuration
)
if isinstance(upload_result, RecoverableFailure):
upload_failures = upload_failures + 1
# Process the response.
_ffi.lib.glean_process_ping_upload_response(
incoming_task, upload_result.to_ffi()
@@ -161,9 +154,7 @@ def _process(data_dir: Path, application_id: str, configuration) -> bool:
else:
return False
elif tag == UploadTaskTag.DONE:
break
return True
return True
__all__ = ["PingUploadWorker"]

View file

@@ -81,11 +81,12 @@ def test_500_error_submit(safe_httpserver, monkeypatch):
ProcessDispatcher._wait_for_last_process()
# This kind of recoverable error will be tried 10 times
assert 10 == len(safe_httpserver.requests)
# The number of retries is defined in glean-core
assert 3 == len(safe_httpserver.requests)
metric = get_upload_failure_metric()
assert not metric["status_code_4xx"].test_has_value()
assert 10 == metric["status_code_5xx"].test_get_value()
assert 3 == metric["status_code_5xx"].test_get_value()
def test_500_error_submit_concurrent_writing(slow_httpserver, monkeypatch):
@@ -113,12 +114,13 @@ def test_500_error_submit_concurrent_writing(slow_httpserver, monkeypatch):
counter.add()
times += 1
# This kind of recoverable error will be tried 10 times
assert 10 == len(slow_httpserver.requests)
# This kind of recoverable error will be tried 3 times
# The number of retries is defined in glean-core
assert 3 == len(slow_httpserver.requests)
metric = get_upload_failure_metric()
assert not metric["status_code_4xx"].test_has_value()
assert 10 == metric["status_code_5xx"].test_get_value()
assert 3 == metric["status_code_5xx"].test_get_value()
assert times > 0
assert times == counter.test_get_value()

View file

@@ -490,7 +490,7 @@ impl Glean {
///
/// * `Wait` - which means the requester should ask again later;
/// * `Upload(PingRequest)` - which means there is a ping to upload. This wraps the actual request object;
/// * `Done` - which means there are no more pings queued right now.
/// * `Done` - which means the requester should stop asking for now.
///
/// # Return value
///

View file

@@ -11,7 +11,7 @@
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, RwLock, RwLockWriteGuard};
use std::thread;
use std::time::{Duration, Instant};
@@ -26,6 +26,11 @@ mod directory;
mod request;
mod result;
/// The maximum number of recoverable failures allowed per uploading window.
///
/// Limiting this is necessary to avoid infinite loops on requesting upload tasks.
const MAX_RECOVERABLE_FAILURES_PER_UPLOADING_WINDOW: u32 = 3;
// The maximum size in bytes a ping body may have to be eligible for upload.
const PING_BODY_MAX_SIZE: usize = 1024 * 1024; // 1 MB
@@ -112,7 +117,16 @@ pub enum PingUploadTask {
/// A flag signaling that the pending pings directories are not done being processed,
/// thus the requester should wait and come back later.
Wait,
/// A flag signaling that the pending pings queue is empty and requester is done.
/// A flag signaling that the requester doesn't need to request any more upload tasks at this moment.
///
/// There are two possibilities for this scenario:
/// * Pending pings queue is empty, no more pings to request;
/// * Requester has reported more than MAX_RECOVERABLE_FAILURES_PER_UPLOADING_WINDOW
///   recoverable upload failures during the same uploading window[1]
///   and should stop requesting at this moment.
///
/// [1]: An "uploading window" starts when a requester gets a new `PingUploadTask::Upload(PingRequest)`
/// response and finishes when they finally get a `PingUploadTask::Done` or `PingUploadTask::Wait` response
/// (sketched below).
Done,
}
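To make the footnote concrete, here is an illustrative trace of a single uploading window, assuming the limit of 3 defined above and an uploader that keeps hitting a 5xx server error:

// Illustrative trace, not code from this commit:
//
//   get_upload_task() -> Upload(a)                       window opens, failure count = 0
//   process_ping_upload_response(a, RecoverableFailure)  ping re-enqueued, count = 1
//   get_upload_task() -> Upload(b)
//   process_ping_upload_response(b, RecoverableFailure)  count = 2
//   get_upload_task() -> Upload(c)
//   process_ping_upload_response(c, RecoverableFailure)  count = 3
//   get_upload_task() -> Done                            limit reached: window closes
//                                                        and the count resets to 0
//   get_upload_task() -> Upload(..)                      a new window opens and the
//                                                        re-enqueued pings are offered again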
@@ -127,6 +141,8 @@ pub struct PingUploadManager {
processed_pending_pings: Arc<AtomicBool>,
/// A vector to store the pending pings processed off-thread.
pending_pings: Arc<RwLock<Vec<PingPayload>>>,
/// The number of upload failures for the current uploading window.
recoverable_failure_count: AtomicU32,
/// A ping counter to help rate limit the ping uploads.
///
/// To keep resource usage in check,
@@ -187,9 +203,10 @@ impl PingUploadManager {
Self {
queue,
directory_manager,
processed_pending_pings,
pending_pings,
directory_manager,
recoverable_failure_count: AtomicU32::new(0),
rate_limiter: None,
language_binding_name: language_binding_name.into(),
upload_metrics: UploadMetrics::new(),
@@ -200,11 +217,12 @@
self.processed_pending_pings.load(Ordering::SeqCst)
}
/// Checks if a ping with a certain `document_id` is already enqueued.
fn is_enqueued(queue: &VecDeque<PingRequest>, document_id: &str) -> bool {
queue
.iter()
.any(|request| request.document_id == document_id)
fn recoverable_failure_count(&self) -> u32 {
self.recoverable_failure_count.load(Ordering::SeqCst)
}
fn reset_recoverable_failure_count(&self) {
self.recoverable_failure_count.store(0, Ordering::SeqCst);
}
/// Attempts to build a ping request from a ping file payload.
@@ -261,7 +279,10 @@
.expect("Can't write to pending pings queue.");
// Checks if a ping with this `document_id` is already enqueued.
if Self::is_enqueued(&queue, &document_id) {
if queue
.iter()
.any(|request| request.document_id == document_id)
{
log::trace!(
"Attempted to enqueue a duplicate ping {} at {}.",
document_id,
@@ -326,17 +347,7 @@
queue
}
/// Gets the next `PingUploadTask`.
///
/// ## Arguments
///
/// * `glean` - The Glean object holding the database.
/// * `log_ping` - Whether to log the ping before returning.
///
/// # Return value
///
/// `PingUploadTask` - see [`PingUploadTask`](enum.PingUploadTask.html) for more information.
pub fn get_upload_task(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
fn get_upload_task_internal(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
if !self.has_processed_pings_dir() {
log::info!(
"Tried getting an upload task, but processing is ongoing. Will come back later."
@@ -352,6 +363,14 @@
self.enqueue_ping(glean, &document_id, &path, &body, headers);
}
if self.recoverable_failure_count() >= MAX_RECOVERABLE_FAILURES_PER_UPLOADING_WINDOW {
log::warn!(
"Reached maximum recoverable failures for the current uploading window. You are done."
);
return PingUploadTask::Done;
}
let mut queue = self
.queue
.write()
@@ -393,6 +412,25 @@
}
}
/// Gets the next `PingUploadTask`.
///
/// ## Arguments
///
/// * `glean` - The Glean object holding the database.
/// * `log_ping` - Whether to log the ping before returning.
///
/// # Return value
///
/// `PingUploadTask` - see [`PingUploadTask`](enum.PingUploadTask.html) for more information.
pub fn get_upload_task(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
let task = self.get_upload_task_internal(glean, log_ping);
if task == PingUploadTask::Done || task == PingUploadTask::Wait {
self.reset_recoverable_failure_count()
}
task
}
/// Processes the response from an attempt to upload a ping.
///
/// Based on the HTTP status of said response,
@@ -466,6 +504,8 @@
status
);
self.enqueue_ping_from_file(glean, &document_id);
self.recoverable_failure_count
.fetch_add(1, Ordering::SeqCst);
}
};
}
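For reference, the branches above map HTTP statuses to outcomes. Based on the status-code constants visible in the Swift uploader earlier in this diff (500 recoverable, 400 unrecoverable), the mapping is roughly the following; this helper is illustrative only and not part of the glean-core API:

fn describe_upload_outcome(status: u16) -> &'static str {
    match status {
        // Successful upload: glean-core deletes the pending ping file.
        200..=299 => "success",
        // Unrecoverable failure: the ping is deleted and never retried.
        400..=499 => "unrecoverable failure",
        // Everything else, notably 5xx server errors: the ping is re-enqueued
        // and the failure counts against the uploading-window limit.
        _ => "recoverable failure",
    }
}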
@@ -1054,4 +1094,46 @@ mod test {
PingUploadTask::Done
);
}
#[test]
fn maximum_of_recoverable_errors_is_enforced_for_uploading_window() {
let (mut glean, _) = new_glean(None);
// Wait for processing of pending pings directory to finish.
while glean.get_upload_task() == PingUploadTask::Wait {
thread::sleep(Duration::from_millis(10));
}
// Register a ping for testing
let ping_type = PingType::new("test", true, /* send_if_empty */ true, vec![]);
glean.register_ping_type(&ping_type);
// Submit the ping multiple times
let n = 5;
for _ in 0..n {
glean.submit_ping(&ping_type, None).unwrap();
}
// Report the maximum number of recoverable failures in a row
for _ in 0..MAX_RECOVERABLE_FAILURES_PER_UPLOADING_WINDOW {
match glean.get_upload_task() {
PingUploadTask::Upload(req) => {
glean.process_ping_upload_response(&req.document_id, RecoverableFailure)
}
_ => panic!("Expected upload manager to return the next request!"),
}
}
// Verify that after reporting the maximum number of recoverable failures,
// we are done even though we haven't gotten all the enqueued requests.
assert_eq!(glean.get_upload_task(), PingUploadTask::Done);
// Verify all requests are returned when we try again.
for _ in 0..n {
match glean.get_upload_task() {
PingUploadTask::Upload(_) => {}
_ => panic!("Expected upload manager to return the next request!"),
}
}
}
}