feat(queues): return futures instead of blocking for queue handling

This commit is contained in:
Phil Booth 2018-05-26 09:17:51 +01:00
Родитель 4b05fd3dc3
Коммит 6c154a04f8
7 изменённых файлов: 245 добавлений и 163 удалений

1
Cargo.lock сгенерированный
Просмотреть файл

@ -355,6 +355,7 @@ version = "0.1.0"
dependencies = [
"chrono 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"config 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"md5 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",

Просмотреть файл

@ -5,6 +5,7 @@ version = "0.1.0"
[dependencies]
chrono = { version = "0.4.2", features = [ "serde" ] }
config = "0.8.0"
futures = "0.1.21"
hex = "0.3.2"
lazy_static = "1.0"
md5 = "0.3.7"

Просмотреть файл

@ -3,6 +3,7 @@
// file, you can obtain one at https://mozilla.org/MPL/2.0/.
use chrono::Utc;
use futures::future::{self, Future};
use super::{
notification::{
@ -23,8 +24,8 @@ impl<'s> Factory<'s> for Queue<'s> {
}
}
impl<'s> Incoming for Queue<'s> {
fn receive(&self) -> Result<Vec<Message>, QueueError> {
impl<'s> Incoming<'s> for Queue<'s> {
fn receive(&'s self) -> Box<Future<Item = Vec<Message>, Error = QueueError> + 's> {
let message = match self.id {
"incoming-bounce" => {
let mut bounce_message = Message::default();
@ -65,27 +66,48 @@ impl<'s> Incoming for Queue<'s> {
delivery_message
}
_ => return Err(QueueError::new(String::from("Not implemented"))),
"incoming-bounce-error" => {
let mut invalid_message = Message::default();
invalid_message.notification.notification_type = NotificationType::Bounce;
invalid_message.notification.complaint = Some(Complaint {
complained_recipients: vec![String::from(
"fxa-email-service.queues.mock.complaint@example.com",
)],
timestamp: Utc::now(),
complaint_feedback_type: None,
});
invalid_message
}
_ => {
return Box::new(future::err(QueueError::new(String::from(
"Not implemented",
))))
}
};
Ok(vec![message])
Box::new(future::ok(vec![message]))
}
fn delete(&self, _message: Message) -> Result<(), QueueError> {
fn delete(&'s self, _message: Message) -> Box<Future<Item = (), Error = QueueError> + 's> {
if self.id == "outgoing" {
Err(QueueError::new(String::from("Not implemented")))
Box::new(future::err(QueueError::new(String::from(
"Not implemented",
))))
} else {
Ok(())
Box::new(future::ok(()))
}
}
}
impl<'s> Outgoing for Queue<'s> {
fn send(&self, _body: &Notification) -> Result<String, QueueError> {
impl<'s> Outgoing<'s> for Queue<'s> {
fn send(&'s self, _body: &Notification) -> Box<Future<Item = String, Error = QueueError> + 's> {
if self.id == "outgoing" {
Ok(String::from("deadbeef"))
Box::new(future::ok(String::from("deadbeef")))
} else {
Err(QueueError::new(String::from("Not implemented")))
Box::new(future::err(QueueError::new(String::from(
"Not implemented",
))))
}
}
}

Просмотреть файл

@ -6,6 +6,8 @@ use std::{
boxed::Box, error::Error, fmt::{self, Debug, Display, Formatter},
};
use futures::future::{self, Future};
use self::notification::{Notification, NotificationType};
pub use self::sqs::Queue as Sqs;
use auth_db::{BounceSubtype, BounceType, Db, DbClient, DbError};
@ -19,20 +21,28 @@ mod test;
#[derive(Debug)]
pub struct Queues<'s> {
bounce: Box<Incoming + 's>,
complaint: Box<Incoming + 's>,
delivery: Box<Incoming + 's>,
notification: Box<Outgoing + 's>,
bounce: Box<Incoming<'s> + 's>,
complaint: Box<Incoming<'s> + 's>,
delivery: Box<Incoming<'s> + 's>,
notification: Box<Outgoing<'s> + 's>,
db: DbClient,
}
pub trait Incoming: Debug + Sync {
fn receive(&self) -> Result<Vec<Message>, QueueError>;
fn delete(&self, message: Message) -> Result<(), QueueError>;
// The return types for these traits are really ugly
// but I couldn't work out how to alias them because
// of the lifetime that's needed to make the boxing
// work. When trait aliases become a thing, we'll be
// able to alias the Future<...> part, see:
//
// * https://github.com/rust-lang/rfcs/pull/1733
// * https://github.com/rust-lang/rust/issues/41517
pub trait Incoming<'s>: Debug + Sync {
fn receive(&'s self) -> Box<Future<Item = Vec<Message>, Error = QueueError> + 's>;
fn delete(&'s self, message: Message) -> Box<Future<Item = (), Error = QueueError> + 's>;
}
pub trait Outgoing: Debug + Sync {
fn send(&self, body: &Notification) -> Result<String, QueueError>;
pub trait Outgoing<'s>: Debug + Sync {
fn send(&'s self, body: &Notification) -> Box<Future<Item = String, Error = QueueError> + 's>;
}
pub trait Factory<'s> {
@ -61,7 +71,7 @@ pub struct QueueIds<'s> {
impl<'s> Queues<'s> {
pub fn new<Q: 's>(ids: &QueueIds<'s>, settings: &Settings) -> Queues<'s>
where
Q: Incoming + Outgoing + Factory<'s>,
Q: Incoming<'s> + Outgoing<'s> + Factory<'s>,
{
Queues {
bounce: Box::new(Q::new(ids.bounce, settings)),
@ -72,80 +82,110 @@ impl<'s> Queues<'s> {
}
}
pub fn process(&self) -> Result<usize, QueueError> {
let mut count = self.process_queue(&self.bounce, &|notification: &Notification| {
if let Some(ref bounce) = notification.bounce {
for recipient in bounce.bounced_recipients.iter() {
self.db.create_bounce(
&recipient,
From::from(bounce.bounce_type),
From::from(bounce.bounce_subtype),
)?;
}
Ok(())
} else {
Err(QueueError::new(format!(
"Unexpected notification type in bounce queue: {:?}",
notification.notification_type
)))
}
})?;
pub fn process(&'s self) -> Box<Future<Item = usize, Error = QueueError> + 's> {
let joined_futures = self
.process_queue(&self.bounce)
.join3(
self.process_queue(&self.complaint),
self.process_queue(&self.delivery),
)
.map(|results| results.0 + results.1 + results.2);
count += self.process_queue(&self.complaint, &|notification: &Notification| {
if let Some(ref complaint) = notification.complaint {
for recipient in complaint.complained_recipients.iter() {
let bounce_subtype =
if let Some(complaint_type) = complaint.complaint_feedback_type {
From::from(complaint_type)
} else {
BounceSubtype::Unmapped
};
self.db
.create_bounce(&recipient, BounceType::Complaint, bounce_subtype)?;
}
Ok(())
} else {
Err(QueueError::new(format!(
"Unexpected notification type in complaint queue: {:?}",
notification.notification_type
)))
}
})?;
count += self.process_queue(&self.delivery, &|_notification| Ok(()))?;
Ok(count)
Box::new(joined_futures)
}
fn process_queue(
&self,
queue: &Box<Incoming + 's>,
handler: &Fn(&Notification) -> Result<(), QueueError>,
) -> Result<usize, QueueError> {
let messages = queue.receive()?;
let mut count = 0;
for message in messages.into_iter() {
if message.notification.notification_type != NotificationType::Null {
self.handle_notification(&message.notification, handler)?;
queue.delete(message)?;
count += 1;
}
}
Ok(count)
&'s self,
queue: &'s Box<Incoming<'s> + 's>,
) -> Box<Future<Item = usize, Error = QueueError> + 's> {
let future = queue
.receive()
.and_then(move |messages| {
let mut futures: Vec<
Box<Future<Item = (), Error = QueueError> + 's>,
> = Vec::new();
for message in messages.into_iter() {
if message.notification.notification_type != NotificationType::Null {
let future = self
.handle_notification(&message.notification)
.and_then(move |_| queue.delete(message));
futures.push(Box::new(future));
}
}
future::join_all(futures.into_iter())
})
.map(|results| results.len());
Box::new(future)
}
fn handle_notification(
&self,
&'s self,
notification: &Notification,
handler: &Fn(&Notification) -> Result<(), QueueError>,
) -> Result<(), QueueError> {
handler(&notification)?;
if let Err(error) = self.notification.send(&notification) {
// Errors sending to this queue are non-fatal because it's only used
// for logging. We still want to delete the message from the queue.
println!("{:?}", error);
) -> Box<Future<Item = (), Error = QueueError> + 's> {
let result = match notification.notification_type {
NotificationType::Bounce => self.record_bounce(notification),
NotificationType::Complaint => self.record_complaint(notification),
NotificationType::Delivery => Ok(()),
NotificationType::Null => {
Err(QueueError::new(String::from("Invalid notification type")))
}
};
match result {
Ok(_) => {
let future = self
.notification
.send(&notification)
.map(|id| {
println!("Sent message to notification queue, id=`{}`", id);
()
})
.or_else(|error| {
// Errors sending to this queue are non-fatal because it's only used
// for logging. We still want to delete the message from the queue.
println!("{:?}", error);
Ok(())
});
Box::new(future)
}
Err(error) => Box::new(future::err(error)),
}
}
fn record_bounce(&'s self, notification: &Notification) -> Result<(), QueueError> {
if let Some(ref bounce) = notification.bounce {
for recipient in bounce.bounced_recipients.iter() {
self.db.create_bounce(
&recipient,
From::from(bounce.bounce_type),
From::from(bounce.bounce_subtype),
)?;
}
Ok(())
} else {
Err(QueueError::new(String::from(
"Missing payload in bounce notification",
)))
}
}
fn record_complaint(&'s self, notification: &Notification) -> Result<(), QueueError> {
if let Some(ref complaint) = notification.complaint {
for recipient in complaint.complained_recipients.iter() {
let bounce_subtype = if let Some(complaint_type) = complaint.complaint_feedback_type
{
From::from(complaint_type)
} else {
BounceSubtype::Unmapped
};
self.db
.create_bounce(&recipient, BounceType::Complaint, bounce_subtype)?;
}
Ok(())
} else {
Err(QueueError::new(String::from(
"Missing payload in complaint notification",
)))
}
Ok(())
}
}

Просмотреть файл

@ -6,6 +6,7 @@ use std::{
boxed::Box, fmt::{self, Debug, Formatter},
};
use futures::future::{self, Future};
use md5;
use rusoto_core::{reactor::RequestDispatcher, Region};
use rusoto_credential::StaticProvider;
@ -104,13 +105,13 @@ impl<'s> Factory<'s> for Queue<'s> {
}
}
impl<'s> Incoming for Queue<'s> {
fn receive(&self) -> Result<Vec<Message>, QueueError> {
self.client
impl<'s> Incoming<'s> for Queue<'s> {
fn receive(&'s self) -> Box<Future<Item = Vec<Message>, Error = QueueError> + 's> {
let future = self
.client
.receive_message(&self.receive_request)
.sync()
.map(|result| result.messages.unwrap_or(Vec::new()))
.map(|messages| {
.map(move |messages| {
messages
.into_iter()
.map(|message| {
@ -124,35 +125,45 @@ impl<'s> Incoming for Queue<'s> {
})
.collect()
})
.map_err(From::from)
.map_err(From::from);
Box::new(future)
}
fn delete(&self, message: Message) -> Result<(), QueueError> {
fn delete(&'s self, message: Message) -> Box<Future<Item = (), Error = QueueError> + 's> {
let request = DeleteMessageRequest {
queue_url: self.url.to_string(),
receipt_handle: message.id,
};
self.client
.delete_message(&request)
.sync()
.map_err(|error| {
println!("Queue error deleting from {}: {:?}", self.url, error);
From::from(error)
})
let future = self.client.delete_message(&request).map_err(move |error| {
println!("Queue error deleting from {}: {:?}", self.url, error);
From::from(error)
});
Box::new(future)
}
}
impl<'s> Outgoing for Queue<'s> {
fn send(&self, notification: &Notification) -> Result<String, QueueError> {
impl<'s> Outgoing<'s> for Queue<'s> {
fn send(
&'s self,
notification: &Notification,
) -> Box<Future<Item = String, Error = QueueError> + 's> {
let mut request = SendMessageRequest::default();
request.message_body = serde_json::to_string(notification)?;
request.message_body = match serde_json::to_string(notification) {
Ok(body) => body,
Err(error) => return Box::new(future::err(From::from(error))),
};
request.queue_url = self.url.to_string();
self.client
let future = self
.client
.send_message(&request)
.sync()
.map(|result| result.message_id.map_or(String::from(""), |id| id.clone()))
.map_err(From::from)
.map_err(From::from);
Box::new(future)
}
}

Просмотреть файл

@ -12,62 +12,47 @@ use super::*;
//
// * They assert that no errors bubble out of the queues with normal inputs.
//
// * They ensure that errors are returned if the wrong notification type is
// found on a particular queue.
// * They assert that an error is returned if there is a mismatch between
// notification_type and the rest of the notification body.
//
// * They exercise most of the queue-handling code to make sure it runs with
// no panics.
#[test]
fn process() {
let settings = Settings::new().expect("config error");
let ids = QueueIds {
lazy_static! {
static ref SETTINGS: Settings = Settings::new().expect("config error");
static ref GOOD_QUEUE_IDS: QueueIds<'static> = QueueIds {
bounce: "incoming-bounce",
complaint: "incoming-complaint",
delivery: "incoming-delivery",
notification: "outgoing",
};
let queues = Queues::new::<mock::Queue>(&ids, &settings);
match queues.process() {
static ref GOOD_QUEUES: Queues<'static> =
Queues::new::<mock::Queue>(&GOOD_QUEUE_IDS, &SETTINGS);
static ref BAD_BOUNCE_QUEUE_IDS: QueueIds<'static> = QueueIds {
bounce: "incoming-bounce-error",
complaint: "incoming-complaint",
delivery: "incoming-delivery",
notification: "outgoing",
};
static ref BAD_BOUNCE_QUEUE: Queues<'static> =
Queues::new::<mock::Queue>(&BAD_BOUNCE_QUEUE_IDS, &SETTINGS);
}
#[test]
fn process() {
match GOOD_QUEUES.process().wait() {
Ok(count) => assert_eq!(count, 3),
Err(error) => assert!(false, error.description().to_string()),
}
}
#[test]
fn process_complaint_on_bounce_queue() {
let settings = Settings::new().expect("config error");
let ids = QueueIds {
bounce: "incoming-complaint",
complaint: "incoming-complaint",
delivery: "incoming-delivery",
notification: "outgoing",
};
let queues = Queues::new::<mock::Queue>(&ids, &settings);
match queues.process() {
fn process_error() {
match BAD_BOUNCE_QUEUE.process().wait() {
Ok(_) => assert!(false, "Queues::process should have failed"),
Err(error) => assert_eq!(
error.description(),
"Unexpected notification type in bounce queue: Complaint"
),
}
}
#[test]
fn process_bounce_on_complaint_queue() {
let settings = Settings::new().expect("config error");
let ids = QueueIds {
bounce: "incoming-bounce",
complaint: "incoming-bounce",
delivery: "incoming-delivery",
notification: "outgoing",
};
let queues = Queues::new::<mock::Queue>(&ids, &settings);
match queues.process() {
Ok(_) => assert!(false, "Queues::process should have failed"),
Err(error) => assert_eq!(
error.description(),
"Unexpected notification type in complaint queue: Bounce"
"Missing payload in bounce notification",
),
}
}

Просмотреть файл

@ -8,6 +8,7 @@
extern crate chrono;
extern crate config;
extern crate futures;
extern crate hex;
#[macro_use]
extern crate lazy_static;
@ -30,26 +31,47 @@ mod queues;
mod settings;
mod validate;
use queues::{QueueIds, Queues, Sqs};
use futures::future::{self, Future, Loop};
use queues::{QueueError, QueueIds, Queues, Sqs};
use settings::Settings;
fn main() {
let settings = Settings::new().expect("config error");
let sqs_urls = match settings.aws.sqsurls {
Some(ref urls) => urls,
None => panic!("Missing config: aws.sqsurls.*"),
};
let queue_ids = QueueIds {
bounce: &sqs_urls.bounce,
complaint: &sqs_urls.complaint,
delivery: &sqs_urls.delivery,
notification: &sqs_urls.notification,
};
let queues = Queues::new::<Sqs>(&queue_ids, &settings);
loop {
match queues.process() {
Ok(count) => println!("Processed {} messages", count),
Err(error) => println!("{:?}", error),
lazy_static! {
static ref SETTINGS: Settings = Settings::new().expect("config error");
static ref QUEUE_IDS: QueueIds<'static> = {
let sqs_urls = match SETTINGS.aws.sqsurls {
Some(ref urls) => urls,
None => panic!("Missing config: aws.sqsurls.*"),
};
QueueIds {
bounce: &sqs_urls.bounce,
complaint: &sqs_urls.complaint,
delivery: &sqs_urls.delivery,
notification: &sqs_urls.notification,
}
}
};
static ref QUEUES: Queues<'static> = Queues::new::<Sqs>(&QUEUE_IDS, &SETTINGS);
}
type LoopResult = Box<Future<Item = Loop<usize, usize>, Error = QueueError>>;
fn main() {
let process_queues: &Fn(usize) -> LoopResult = &|previous_count: usize| {
let future = QUEUES
.process()
.or_else(|error: QueueError| {
println!("{:?}", error);
future::ok(0)
})
.and_then(move |count: usize| {
let total_count = count + previous_count;
println!(
"Processed {} messages, total message count is now {}",
count, total_count
);
Ok(Loop::Continue(total_count))
});
Box::new(future)
};
future::loop_fn(0, process_queues).wait().unwrap();
}