diff --git a/Cargo.lock b/Cargo.lock index 8020749..4f606ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -41,7 +41,7 @@ dependencies = [ "cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-demangle 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", - "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -140,6 +140,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "num-integer 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", "num-traits 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -352,9 +353,11 @@ dependencies = [ name = "fxa-email-service" version = "0.1.0" dependencies = [ + "chrono 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "config 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "md5 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "regex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.8.6 (registry+https://github.com/rust-lang/crates.io-index)", "rocket 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -363,6 +366,7 @@ dependencies = [ "rusoto_core 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)", "rusoto_credential 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "rusoto_ses 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rusoto_sqs 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)", "sendgrid 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)", @@ -492,7 +496,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)", "redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", - "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -579,6 +583,11 @@ name = "matches" version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "md5" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "memchr" version = "0.1.11" @@ -685,7 +694,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)", - "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -866,7 +875,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)", - "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -964,7 +973,7 @@ name = "remove_dir_all" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1098,6 +1107,18 @@ dependencies = [ "xml-rs 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rusoto_sqs" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.11.27 (registry+https://github.com/rust-lang/crates.io-index)", + "rusoto_core 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "xml-rs 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rustc-demangle" version = "0.1.8" @@ -1122,7 +1143,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1351,7 +1372,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)", "redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", - "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1681,7 +1702,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "winapi" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1804,6 +1825,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" "checksum log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "89f010e843f2b1a31dbd316b3b8d443758bc634bed37aabade59c686d644e0a2" "checksum matches 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "100aabe6b8ff4e4a7e32c1c13523379802df0772b82466207ac25b013f193376" +"checksum md5 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "daa1004633f76cdcd5a9d83ffcfe615e30ca7a2a638fcc8b8039a2dac21289d7" "checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20" "checksum memchr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "148fab2e51b4f1cfc66da2a7c32981d1d3c083a803978268bb11fe4b86925e7a" "checksum memchr 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "796fba70e76612589ed2ce7f45282f5af869e0fdd7cc6199fa1aa1f1d591ba9d" @@ -1857,6 +1879,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum rusoto_core 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "12daaa6d62d64f6447bf0299ce775f4e05f8e75e5418e817da094b9de04ad22d" "checksum rusoto_credential 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "53199d09fd1b7d4f5ac50f4d23106577624238ea77cae2b44eb1d1fc4cd956a4" "checksum rusoto_ses 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "88d4645c94b099369602db7d814b1741f991b3d6821d8076b2ac67824b8cc130" +"checksum rusoto_sqs 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "995322c1530460a18fadd2f16761f22ba3f4e1fe630e344ca97e81d921abfb00" "checksum rustc-demangle 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "76d7ba1feafada44f2d38eed812bd2489a03c0f5abb975799251518b68848649" "checksum rustc_version 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a54aa04a10c68c1c4eacb4337fd883b435997ede17a9385784b990777686b09a" "checksum safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e27a8b19b835f7aea908818e871f5cc3a5a186550c30773be987e155e8163d8f" @@ -1927,7 +1950,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" "checksum want 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "a05d9d966753fa4b5c8db73fcab5eed4549cfe0e1e4e66911e5564a0085c35d1" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" -"checksum winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "04e3bd221fcbe8a271359c04f21a76db7d0c6028862d1bb5512d85e1e2eb5bb3" +"checksum winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "773ef9dcc5f24b7d850d0ff101e542ff24c3b090a9768e03ff889fdef41f00fd" "checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" "checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" "checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml index 02472e2..d1e8132 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,9 +3,11 @@ name = "fxa-email-service" version = "0.1.0" [dependencies] +chrono = { version = "0.4.2", features = [ "serde" ] } config = "0.8.0" hex = "0.3.2" lazy_static = "1.0" +md5 = "0.3.7" regex = "1.0" reqwest = "0.8.5" rocket = "0.3.12" @@ -14,6 +16,7 @@ rocket_contrib = "0.3.12" rusoto_core = "0.32.0" rusoto_credential = "0.11.0" rusoto_ses = "0.32.0" +rusoto_sqs = "0.32.0" sendgrid = "0.7.0" serde = "1.0" serde_derive = "1.0" diff --git a/README.md b/README.md index ad715b1..4f5aa6b 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ * [How do I run the tests?](#how-do-i-run-the-tests) * [How can I send an email via SES?](#how-can-i-send-an-email-via-ses) * [How can I send an email via Sendgrid?](#how-can-i-send-an-email-via-sendgrid) +* [How do bounce, complaint and delivery notifications work?](#how-do-bounce-complaint-and-delivery-notifications-work) ## What's this? @@ -135,14 +136,14 @@ You'll need to set up some config with your AWS credentials. That can be with environment variables: -* `FXA_EMAIL_SES_KEYS_ACCESS` -* `FXA_EMAIL_SES_KEYS_SECRET` +* `FXA_EMAIL_AWS_KEYS_ACCESS` +* `FXA_EMAIL_AWS_KEYS_SECRET` Or in `config/local.json`: ```json { - "ses": { + "aws": { "keys": { "access": "...", "secret": "..." @@ -223,3 +224,78 @@ curl \ If everything is set-up correctly, you should receive email pretty much instantly. + +## How do bounce, complaint and delivery notifications work? + +For consistency with the implementation in the auth server, +three separate SQS queues are monitored +for bounce, complaint and delivery notifications. +Ultimately we expect to simplify this +to a single queue for all three notification types. + +Messages on these queues +are JSON payloads of the format +described in the [AWS docs](https://docs.aws.amazon.com/ses/latest/DeveloperGuide/notification-contents.html) +and encoded in [`src/queues/notification/mod.rs`](src/queues/notification/mod.rs). + +When a message is received, +three things happen in sequence: + +1. For bounce and complaint notification types, + a bounce record is created in the auth db. + Errors are fatal at this point, + steps `2` and `3` will not occur + if the db returns an error. + +2. The message is forwarded to the auth server + via a fourth, outgoing queue. + An error here is not fatal. + +3. The message is deleted from the origin queue. + +Currently, both the incoming and outgoing queues +happen to be SQS queues but, +since that's an implementation detail, +the code makes some effort to separate them +behind abstract `Incoming` and `Outgoing` traits. +It's not a perfect abstraction +because the `Notification` type +was allowed to leak out of the `sqs` module +for the sake of expediency. +That can be easily addressed later, +if and when we come to rely on +a different queue implementation. + +The queue URLs and region +are set via config, +either using environment variables: + +* `FXA_EMAIL_AWS_REGION` +* `FXA_EMAIL_AWS_SQSURLS_BOUNCE` +* `FXA_EMAIL_AWS_SQSURLS_COMPLAINT` +* `FXA_EMAIL_AWS_SQSURLS_DELIVERY` +* `FXA_EMAIL_AWS_SQSURLS_NOTIFICATION` + +Or in `config/local.json`: + +```json +{ + "aws": { + "region": "...", + "sqsurls": { + "bounce": "...", + "complaint": "...", + "delivery": "...", + "notification": "..." + } + } +} +``` + +The queue-handling code runs in a different process +to the main email-sending service. +You can run it locally like so: + +``` +cargo r --bin queues +``` diff --git a/config/default.json b/config/default.json index 6c47347..ca283dd 100644 --- a/config/default.json +++ b/config/default.json @@ -2,6 +2,9 @@ "authdb": { "baseuri": "http://127.0.0.1:8000/" }, + "aws": { + "region": "us-east-1" + }, "bouncelimits": { "enabled": true, "complaint": [ @@ -21,9 +24,6 @@ "address": "accounts@firefox.com", "name": "Firefox Accounts" }, - "ses": { - "region": "us-east-1" - }, "smtp": { "host": "127.0.0.1", "port": 25 diff --git a/src/deserialize.rs b/src/deserialize.rs index 7ef4eac..344a114 100644 --- a/src/deserialize.rs +++ b/src/deserialize.rs @@ -82,6 +82,13 @@ where deserialize(deserializer, validate::sendgrid_api_key, "Sendgrid API key") } +pub fn sqs_url<'d, D>(deserializer: D) -> Result +where + D: Deserializer<'d>, +{ + deserialize(deserializer, validate::sqs_url, "SQS queue URL") +} + fn deserialize<'d, D>( deserializer: D, validator: fn(&str) -> bool, diff --git a/src/providers/ses.rs b/src/providers/ses.rs index cbf6de3..1a0ea9b 100644 --- a/src/providers/ses.rs +++ b/src/providers/ses.rs @@ -19,12 +19,12 @@ pub struct SesProvider { impl SesProvider { pub fn new(settings: &Settings) -> SesProvider { let region = settings - .ses + .aws .region .parse::() .expect("invalid region"); - let client: Box = if let Some(ref keys) = settings.ses.keys { + let client: Box = if let Some(ref keys) = settings.aws.keys { let creds = StaticProvider::new(keys.access.to_string(), keys.secret.to_string(), None, None); Box::new(SesClient::new(RequestDispatcher::default(), creds, region)) diff --git a/src/queues/mock.rs b/src/queues/mock.rs new file mode 100644 index 0000000..7f4c509 --- /dev/null +++ b/src/queues/mock.rs @@ -0,0 +1,109 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, you can obtain one at https://mozilla.org/MPL/2.0/. + +use chrono::Utc; + +use super::{ + notification::{ + Bounce, BounceSubtype, BounceType, BouncedRecipient, ComplainedRecipient, Complaint, + Delivery, Notification, NotificationType, + }, + Factory, Incoming, Message, Outgoing, QueueError, +}; +use settings::Settings; + +#[derive(Debug)] +pub struct Queue<'s> { + id: &'s str, +} + +impl<'s> Factory<'s> for Queue<'s> { + fn new(id: &'s str, _settings: &Settings) -> Queue<'s> { + Queue { id } + } +} + +impl<'s> Incoming for Queue<'s> { + fn receive(&self) -> Result, QueueError> { + let message = match self.id { + "incoming-bounce" => { + let mut bounce_message = Message::default(); + bounce_message.notification.notification_type = NotificationType::Bounce; + bounce_message.notification.bounce = Some(Bounce { + bounce_type: BounceType::Permanent, + bounce_subtype: BounceSubtype::General, + bounced_recipients: vec![BouncedRecipient { + email_address: String::from( + "fxa-email-service.queues.mock.bounce@example.com", + ), + action: None, + status: None, + diagnostic_code: None, + }], + timestamp: Utc::now(), + feedback_id: String::from(""), + remote_mta_ip: None, + reporting_mta: None, + }); + bounce_message + } + + "incoming-complaint" => { + let mut complaint_message = Message::default(); + complaint_message.notification.notification_type = NotificationType::Complaint; + complaint_message.notification.complaint = Some(Complaint { + complained_recipients: vec![ComplainedRecipient { + email_address: String::from( + "fxa-email-service.queues.mock.complaint@example.com", + ), + }], + timestamp: Utc::now(), + feedback_id: String::from(""), + user_agent: None, + complaint_feedback_type: None, + arrival_date: Utc::now(), + }); + complaint_message + } + + "incoming-delivery" => { + let mut delivery_message = Message::default(); + delivery_message.notification.notification_type = NotificationType::Delivery; + delivery_message.notification.delivery = Some(Delivery { + timestamp: Utc::now(), + processing_time_millis: 0, + recipients: vec![String::from( + "fxa-email-service.queues.mock.delivery@example.com", + )], + smtp_response: String::from(""), + remote_mta_ip: None, + reporting_mta: None, + }); + delivery_message + } + + _ => return Err(QueueError::new(String::from("Not implemented"))), + }; + + Ok(vec![message]) + } + + fn delete(&self, _message: Message) -> Result<(), QueueError> { + if self.id == "outgoing" { + Err(QueueError::new(String::from("Not implemented"))) + } else { + Ok(()) + } + } +} + +impl<'s> Outgoing for Queue<'s> { + fn send(&self, _body: &Notification) -> Result { + if self.id == "outgoing" { + Ok(String::from("deadbeef")) + } else { + Err(QueueError::new(String::from("Not implemented"))) + } + } +} diff --git a/src/queues/mod.rs b/src/queues/mod.rs new file mode 100644 index 0000000..5ef26ad --- /dev/null +++ b/src/queues/mod.rs @@ -0,0 +1,177 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, you can obtain one at https://mozilla.org/MPL/2.0/. + +use std::{ + boxed::Box, error::Error, fmt::{self, Debug, Display, Formatter}, +}; + +use self::notification::{Notification, NotificationType}; +pub use self::sqs::Queue as Sqs; +use auth_db::{BounceSubtype, BounceType, Db, DbClient, DbError}; +use settings::Settings; + +mod mock; +mod notification; +pub mod sqs; +#[cfg(test)] +mod test; + +#[derive(Debug)] +pub struct Queues<'s> { + bounce: Box, + complaint: Box, + delivery: Box, + notification: Box, + db: DbClient, +} + +pub trait Incoming: Debug + Sync { + fn receive(&self) -> Result, QueueError>; + fn delete(&self, message: Message) -> Result<(), QueueError>; +} + +pub trait Outgoing: Debug + Sync { + fn send(&self, body: &Notification) -> Result; +} + +pub trait Factory<'s> { + fn new(id: &'s str, settings: &Settings) -> Self; +} + +#[derive(Debug, Default)] +pub struct Message { + pub id: String, + pub notification: Notification, +} + +#[derive(Debug)] +pub struct QueueError { + description: String, +} + +#[derive(Debug)] +pub struct QueueIds<'s> { + pub bounce: &'s str, + pub complaint: &'s str, + pub delivery: &'s str, + pub notification: &'s str, +} + +impl<'s> Queues<'s> { + pub fn new(ids: &QueueIds<'s>, settings: &Settings) -> Queues<'s> + where + Q: Incoming + Outgoing + Factory<'s>, + { + Queues { + bounce: Box::new(Q::new(ids.bounce, settings)), + complaint: Box::new(Q::new(ids.complaint, settings)), + delivery: Box::new(Q::new(ids.delivery, settings)), + notification: Box::new(Q::new(ids.notification, settings)), + db: DbClient::new(settings), + } + } + + pub fn process(&self) -> Result { + let mut count = self.process_queue(&self.bounce, &|notification: &Notification| { + if let Some(ref bounce) = notification.bounce { + for recipient in bounce.bounced_recipients.iter() { + self.db.create_bounce( + &recipient.email_address, + From::from(bounce.bounce_type), + From::from(bounce.bounce_subtype), + )?; + } + Ok(()) + } else { + Err(QueueError::new(format!( + "Unexpected notification type in bounce queue: {:?}", + notification.notification_type + ))) + } + })?; + + count += self.process_queue(&self.complaint, &|notification: &Notification| { + if let Some(ref complaint) = notification.complaint { + for recipient in complaint.complained_recipients.iter() { + let bounce_subtype = + if let Some(complaint_type) = complaint.complaint_feedback_type { + From::from(complaint_type) + } else { + BounceSubtype::Unmapped + }; + self.db.create_bounce( + &recipient.email_address, + BounceType::Complaint, + bounce_subtype, + )?; + } + Ok(()) + } else { + Err(QueueError::new(format!( + "Unexpected notification type in complaint queue: {:?}", + notification.notification_type + ))) + } + })?; + + count += self.process_queue(&self.delivery, &|_notification| Ok(()))?; + + Ok(count) + } + + fn process_queue( + &self, + queue: &Box, + handler: &Fn(&Notification) -> Result<(), QueueError>, + ) -> Result { + let messages = queue.receive()?; + let mut count = 0; + for message in messages.into_iter() { + if message.notification.notification_type != NotificationType::Null { + self.handle_notification(&message.notification, handler)?; + queue.delete(message)?; + count += 1; + } + } + Ok(count) + } + + fn handle_notification( + &self, + notification: &Notification, + handler: &Fn(&Notification) -> Result<(), QueueError>, + ) -> Result<(), QueueError> { + handler(¬ification)?; + if let Err(error) = self.notification.send(¬ification) { + // Errors sending to this queue are non-fatal because it's only used + // for logging. We still want to delete the message from the queue. + println!("{:?}", error); + } + Ok(()) + } +} + +impl QueueError { + pub fn new(description: String) -> QueueError { + QueueError { description } + } +} + +impl Error for QueueError { + fn description(&self) -> &str { + &self.description + } +} + +impl Display for QueueError { + fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { + write!(formatter, "{}", self.description) + } +} + +impl From for QueueError { + fn from(error: DbError) -> QueueError { + QueueError::new(format!("database error: {:?}", error)) + } +} diff --git a/src/queues/notification/mod.rs b/src/queues/notification/mod.rs new file mode 100644 index 0000000..33f030a --- /dev/null +++ b/src/queues/notification/mod.rs @@ -0,0 +1,362 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, you can obtain one at https://mozilla.org/MPL/2.0/. + +use std::fmt::{self, Display, Formatter}; + +use chrono::{DateTime, Utc}; +use serde::{ + de::{Deserialize, Deserializer, Error as DeserializeError, Unexpected}, + ser::{Error as SerializeError, Serialize, Serializer}, +}; + +use auth_db::{BounceSubtype as AuthDbBounceSubtype, BounceType as AuthDbBounceType}; + +#[cfg(test)] +mod test; + +// Warning, long vehicle! This module is a direct encoding +// of the SES notification format that's documented here: +// +// https://docs.aws.amazon.com/ses/latest/DeveloperGuide/notification-contents.html + +#[derive(Debug, Default, Deserialize, Serialize)] +pub struct Notification { + #[serde(rename = "notificationType")] + pub notification_type: NotificationType, + pub mail: Mail, + #[serde(skip_serializing_if = "Option::is_none")] + pub bounce: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub complaint: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub delivery: Option, +} + +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum NotificationType { + Bounce, + Complaint, + Delivery, + Null, +} + +impl From for String { + fn from(notification_type: NotificationType) -> String { + String::from(match notification_type { + NotificationType::Bounce => "Bounce", + NotificationType::Complaint => "Complaint", + NotificationType::Delivery => "Delivery", + NotificationType::Null => "", + }) + } +} + +impl Display for NotificationType { + fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { + write!(formatter, "{}", From::from(*self): String) + } +} + +impl Default for NotificationType { + fn default() -> NotificationType { + NotificationType::Null + } +} + +impl<'d> Deserialize<'d> for NotificationType { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'d>, + { + let value: String = Deserialize::deserialize(deserializer)?; + match value.as_str() { + "Bounce" => Ok(NotificationType::Bounce), + "Complaint" => Ok(NotificationType::Complaint), + "Delivery" => Ok(NotificationType::Delivery), + _ => Err(D::Error::invalid_value( + Unexpected::Str(&value), + &"SES notificiation type", + )), + } + } +} + +impl Serialize for NotificationType { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let string: String = From::from(*self); + if string == "" { + Err(S::Error::custom("notification type not set")) + } else { + serializer.serialize_str(&string) + } + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct Mail { + timestamp: DateTime, + #[serde(rename = "messageId")] + message_id: String, + source: String, + #[serde(rename = "sourceArn")] + source_arn: String, + #[serde(rename = "sourceIp")] + source_ip: String, + #[serde(rename = "sendingAccountId")] + sending_account_id: String, + destination: Vec, + #[serde(rename = "headersTruncated")] + headers_truncated: Option, + headers: Option>, + #[serde(rename = "commonHeaders")] + common_headers: Option>, +} + +impl Default for Mail { + fn default() -> Mail { + Mail { + timestamp: Utc::now(), + message_id: String::from(""), + source: String::from(""), + source_arn: String::from(""), + source_ip: String::from(""), + sending_account_id: String::from(""), + destination: Vec::new(), + headers_truncated: None, + headers: None, + common_headers: None, + } + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct Header { + name: String, + value: HeaderValue, +} + +#[derive(Debug, Deserialize, Serialize)] +pub enum HeaderValue { + Single(String), + Multiple(Vec), +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct Bounce { + #[serde(rename = "bounceType")] + pub bounce_type: BounceType, + #[serde(rename = "bounceSubType")] + pub bounce_subtype: BounceSubtype, + pub bounced_recipients: Vec, + pub timestamp: DateTime, + #[serde(rename = "feedbackId")] + pub feedback_id: String, + #[serde(rename = "remoteMtaIp")] + pub remote_mta_ip: Option, + #[serde(rename = "reportingMTA")] + pub reporting_mta: Option, +} + +#[derive(Copy, Clone, Debug, PartialEq, Serialize)] +pub enum BounceType { + Undetermined, + Permanent, + Transient, +} + +impl From for AuthDbBounceType { + fn from(bounce_type: BounceType) -> AuthDbBounceType { + match bounce_type { + BounceType::Undetermined => { + println!("Mapped SesBounceType::Undetermined to AuthDbBounceType::Soft"); + AuthDbBounceType::Soft + } + BounceType::Permanent => AuthDbBounceType::Hard, + BounceType::Transient => AuthDbBounceType::Soft, + } + } +} + +impl<'d> Deserialize<'d> for BounceType { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'d>, + { + let value: String = Deserialize::deserialize(deserializer)?; + match value.as_str() { + "Undetermined" => Ok(BounceType::Undetermined), + "Permanent" => Ok(BounceType::Permanent), + "Transient" => Ok(BounceType::Transient), + _ => { + println!( + "Mapped unrecognised SES bounceType `{}` to BounceType::Undetermined", + value.as_str() + ); + Ok(BounceType::Undetermined) + } + } + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Serialize)] +pub enum BounceSubtype { + Undetermined, + General, + NoEmail, + Suppressed, + MailboxFull, + MessageTooLarge, + ContentRejected, + AttachmentRejected, +} + +impl From for AuthDbBounceSubtype { + fn from(bounce_subtype: BounceSubtype) -> AuthDbBounceSubtype { + match bounce_subtype { + BounceSubtype::Undetermined => AuthDbBounceSubtype::Undetermined, + BounceSubtype::General => AuthDbBounceSubtype::General, + BounceSubtype::NoEmail => AuthDbBounceSubtype::NoEmail, + BounceSubtype::Suppressed => AuthDbBounceSubtype::Suppressed, + BounceSubtype::MailboxFull => AuthDbBounceSubtype::MailboxFull, + BounceSubtype::MessageTooLarge => AuthDbBounceSubtype::MessageTooLarge, + BounceSubtype::ContentRejected => AuthDbBounceSubtype::ContentRejected, + BounceSubtype::AttachmentRejected => AuthDbBounceSubtype::AttachmentRejected, + } + } +} + +impl<'d> Deserialize<'d> for BounceSubtype { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'d>, + { + let value: String = Deserialize::deserialize(deserializer)?; + match value.as_str() { + "Undetermined" => Ok(BounceSubtype::Undetermined), + "General" => Ok(BounceSubtype::General), + "NoEmail" => Ok(BounceSubtype::NoEmail), + "Suppressed" => Ok(BounceSubtype::Suppressed), + "MailboxFull" => Ok(BounceSubtype::MailboxFull), + "MessageTooLarge" => Ok(BounceSubtype::MessageTooLarge), + "ContentRejected" => Ok(BounceSubtype::ContentRejected), + "AttachmentRejected" => Ok(BounceSubtype::AttachmentRejected), + _ => { + println!( + "Mapped unrecognised SES bounceSubType `{}` to BounceSubtype::Undetermined", + value.as_str() + ); + Ok(BounceSubtype::Undetermined) + } + } + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct BouncedRecipient { + #[serde(rename = "emailAddress")] + pub email_address: String, + pub action: Option, + pub status: Option, + #[serde(rename = "diagnosticCode")] + pub diagnostic_code: Option, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct Complaint { + #[serde(rename = "complainedRecipients")] + pub complained_recipients: Vec, + pub timestamp: DateTime, + #[serde(rename = "feedbackId")] + pub feedback_id: String, + #[serde(rename = "userAgent")] + pub user_agent: Option, + #[serde(rename = "complaintFeedbackType")] + pub complaint_feedback_type: Option, + #[serde(rename = "arrivalDate")] + pub arrival_date: DateTime, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct ComplainedRecipient { + #[serde(rename = "emailAddress")] + pub email_address: String, +} + +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum ComplaintFeedbackType { + Abuse, + AuthFailure, + Fraud, + NotSpam, + Other, + Virus, +} + +impl From for AuthDbBounceSubtype { + fn from(complaint_feedback_type: ComplaintFeedbackType) -> AuthDbBounceSubtype { + match complaint_feedback_type { + ComplaintFeedbackType::Abuse => AuthDbBounceSubtype::Abuse, + ComplaintFeedbackType::AuthFailure => AuthDbBounceSubtype::AuthFailure, + ComplaintFeedbackType::Fraud => AuthDbBounceSubtype::Fraud, + ComplaintFeedbackType::NotSpam => AuthDbBounceSubtype::NotSpam, + ComplaintFeedbackType::Other => AuthDbBounceSubtype::Other, + ComplaintFeedbackType::Virus => AuthDbBounceSubtype::Virus, + } + } +} + +impl<'d> Deserialize<'d> for ComplaintFeedbackType { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'d>, + { + let value: String = Deserialize::deserialize(deserializer)?; + match value.as_str() { + "abuse" => Ok(ComplaintFeedbackType::Abuse), + "auth-failure" => Ok(ComplaintFeedbackType::AuthFailure), + "fraud" => Ok(ComplaintFeedbackType::Fraud), + "not-spam" => Ok(ComplaintFeedbackType::NotSpam), + "other" => Ok(ComplaintFeedbackType::Other), + "virus" => Ok(ComplaintFeedbackType::Virus), + _ => { + println!("Mapped unrecognised SES complaintFeedbackType `{}` to ComplaintFeedbackType::Other", value.as_str()); + Ok(ComplaintFeedbackType::Other) + } + } + } +} + +impl Serialize for ComplaintFeedbackType { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let value = match self { + ComplaintFeedbackType::Abuse => "abuse", + ComplaintFeedbackType::AuthFailure => "auth-failure", + ComplaintFeedbackType::Fraud => "fraud", + ComplaintFeedbackType::NotSpam => "not-spam", + ComplaintFeedbackType::Other => "other", + ComplaintFeedbackType::Virus => "virus", + }; + serializer.serialize_str(value) + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct Delivery { + pub timestamp: DateTime, + #[serde(rename = "processingTimeMillis")] + pub processing_time_millis: u32, + pub recipients: Vec, + #[serde(rename = "smtpResponse")] + pub smtp_response: String, + #[serde(rename = "remoteMtaIp")] + pub remote_mta_ip: Option, + #[serde(rename = "reportingMTA")] + pub reporting_mta: Option, +} diff --git a/src/queues/notification/test.rs b/src/queues/notification/test.rs new file mode 100644 index 0000000..eaac987 --- /dev/null +++ b/src/queues/notification/test.rs @@ -0,0 +1,245 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, you can obtain one at https://mozilla.org/MPL/2.0/. + +use std::error::Error; + +use serde_json; + +use super::*; + +#[test] +fn deserialize_notification_type() { + let notification_type: NotificationType = + serde_json::from_value(From::from("Bounce")).expect("JSON error"); + assert_eq!(notification_type, NotificationType::Bounce); + let notification_type: NotificationType = + serde_json::from_value(From::from("Complaint")).expect("JSON error"); + assert_eq!(notification_type, NotificationType::Complaint); + let notification_type: NotificationType = + serde_json::from_value(From::from("Delivery")).expect("JSON error"); + assert_eq!(notification_type, NotificationType::Delivery); +} + +#[test] +fn deserialize_invalid_notification_type() { + match serde_json::from_value::(From::from("bounce")) { + Ok(_) => assert!(false, "serde_json::from_value should have failed"), + Err(error) => assert_eq!(error.description(), "JSON error"), + } + match serde_json::from_value::(From::from("Bouncex")) { + Ok(_) => assert!(false, "serde_json::from_value should have failed"), + Err(error) => assert_eq!(error.description(), "JSON error"), + } + match serde_json::from_value::(From::from("BBounce")) { + Ok(_) => assert!(false, "serde_json::from_value should have failed"), + Err(error) => assert_eq!(error.description(), "JSON error"), + } +} + +#[test] +fn serialize_notification_type() { + let json = serde_json::to_string(&NotificationType::Bounce).expect("JSON error"); + assert_eq!(json, "\"Bounce\""); + let json = serde_json::to_string(&NotificationType::Complaint).expect("JSON error"); + assert_eq!(json, "\"Complaint\""); + let json = serde_json::to_string(&NotificationType::Delivery).expect("JSON error"); + assert_eq!(json, "\"Delivery\""); +} + +#[test] +fn serialize_null_notification_type() { + match serde_json::to_string(&NotificationType::Null) { + Ok(_) => assert!(false, "serde_json::to_string should have failed"), + Err(error) => assert_eq!(error.description(), "JSON error"), + } +} + +#[test] +fn bounce_type_to_auth_db() { + let db_bounce_type: AuthDbBounceType = From::from(BounceType::Undetermined); + assert_eq!(db_bounce_type, AuthDbBounceType::Soft); + let db_bounce_type: AuthDbBounceType = From::from(BounceType::Permanent); + assert_eq!(db_bounce_type, AuthDbBounceType::Hard); + let db_bounce_type: AuthDbBounceType = From::from(BounceType::Transient); + assert_eq!(db_bounce_type, AuthDbBounceType::Soft); +} + +#[test] +fn deserialize_bounce_type() { + let bounce_type: BounceType = + serde_json::from_value(From::from("Undetermined")).expect("JSON error"); + assert_eq!(bounce_type, BounceType::Undetermined); + let bounce_type: BounceType = + serde_json::from_value(From::from("Permanent")).expect("JSON error"); + assert_eq!(bounce_type, BounceType::Permanent); + let bounce_type: BounceType = + serde_json::from_value(From::from("Transient")).expect("JSON error"); + assert_eq!(bounce_type, BounceType::Transient); + let bounce_type: BounceType = serde_json::from_value(From::from("wibble")).expect("JSON error"); + assert_eq!(bounce_type, BounceType::Undetermined); + let bounce_type: BounceType = + serde_json::from_value(From::from("Permanentx")).expect("JSON error"); + assert_eq!(bounce_type, BounceType::Undetermined); + let bounce_type: BounceType = + serde_json::from_value(From::from("PPermanent")).expect("JSON error"); + assert_eq!(bounce_type, BounceType::Undetermined); +} + +#[test] +fn serialize_bounce_type() { + let json = serde_json::to_string(&BounceType::Undetermined).expect("JSON error"); + assert_eq!(json, "\"Undetermined\""); + let json = serde_json::to_string(&BounceType::Permanent).expect("JSON error"); + assert_eq!(json, "\"Permanent\""); + let json = serde_json::to_string(&BounceType::Transient).expect("JSON error"); + assert_eq!(json, "\"Transient\""); +} + +#[test] +fn bounce_subtype_to_auth_db() { + let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::Undetermined); + assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::Undetermined); + let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::General); + assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::General); + let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::NoEmail); + assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::NoEmail); + let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::Suppressed); + assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::Suppressed); + let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::MailboxFull); + assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::MailboxFull); + let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::MessageTooLarge); + assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::MessageTooLarge); + let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::ContentRejected); + assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::ContentRejected); + let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::AttachmentRejected); + assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::AttachmentRejected); +} + +#[test] +fn deserialize_bounce_subtype() { + let bounce_subtype: BounceSubtype = + serde_json::from_value(From::from("Undetermined")).expect("JSON error"); + assert_eq!(bounce_subtype, BounceSubtype::Undetermined); + let bounce_subtype: BounceSubtype = + serde_json::from_value(From::from("General")).expect("JSON error"); + assert_eq!(bounce_subtype, BounceSubtype::General); + let bounce_subtype: BounceSubtype = + serde_json::from_value(From::from("NoEmail")).expect("JSON error"); + assert_eq!(bounce_subtype, BounceSubtype::NoEmail); + let bounce_subtype: BounceSubtype = + serde_json::from_value(From::from("Suppressed")).expect("JSON error"); + assert_eq!(bounce_subtype, BounceSubtype::Suppressed); + let bounce_subtype: BounceSubtype = + serde_json::from_value(From::from("MailboxFull")).expect("JSON error"); + assert_eq!(bounce_subtype, BounceSubtype::MailboxFull); + let bounce_subtype: BounceSubtype = + serde_json::from_value(From::from("MessageTooLarge")).expect("JSON error"); + assert_eq!(bounce_subtype, BounceSubtype::MessageTooLarge); + let bounce_subtype: BounceSubtype = + serde_json::from_value(From::from("ContentRejected")).expect("JSON error"); + assert_eq!(bounce_subtype, BounceSubtype::ContentRejected); + let bounce_subtype: BounceSubtype = + serde_json::from_value(From::from("AttachmentRejected")).expect("JSON error"); + assert_eq!(bounce_subtype, BounceSubtype::AttachmentRejected); + let bounce_subtype: BounceSubtype = + serde_json::from_value(From::from("wibble")).expect("JSON error"); + assert_eq!(bounce_subtype, BounceSubtype::Undetermined); + let bounce_subtype: BounceSubtype = + serde_json::from_value(From::from("undetermined")).expect("JSON error"); + assert_eq!(bounce_subtype, BounceSubtype::Undetermined); + let bounce_subtype: BounceSubtype = + serde_json::from_value(From::from("Undeterminedd")).expect("JSON error"); + assert_eq!(bounce_subtype, BounceSubtype::Undetermined); + let bounce_subtype: BounceSubtype = + serde_json::from_value(From::from("UUndetermined")).expect("JSON error"); + assert_eq!(bounce_subtype, BounceSubtype::Undetermined); +} + +#[test] +fn serialize_bounce_subtype() { + let json = serde_json::to_string(&BounceSubtype::Undetermined).expect("JSON error"); + assert_eq!(json, "\"Undetermined\""); + let json = serde_json::to_string(&BounceSubtype::General).expect("JSON error"); + assert_eq!(json, "\"General\""); + let json = serde_json::to_string(&BounceSubtype::NoEmail).expect("JSON error"); + assert_eq!(json, "\"NoEmail\""); + let json = serde_json::to_string(&BounceSubtype::Suppressed).expect("JSON error"); + assert_eq!(json, "\"Suppressed\""); + let json = serde_json::to_string(&BounceSubtype::MailboxFull).expect("JSON error"); + assert_eq!(json, "\"MailboxFull\""); + let json = serde_json::to_string(&BounceSubtype::MessageTooLarge).expect("JSON error"); + assert_eq!(json, "\"MessageTooLarge\""); + let json = serde_json::to_string(&BounceSubtype::ContentRejected).expect("JSON error"); + assert_eq!(json, "\"ContentRejected\""); + let json = serde_json::to_string(&BounceSubtype::AttachmentRejected).expect("JSON error"); + assert_eq!(json, "\"AttachmentRejected\""); +} + +#[test] +fn complaint_feedback_type_to_auth_db() { + let db_complaint_feedback_type: AuthDbBounceSubtype = From::from(ComplaintFeedbackType::Abuse); + assert_eq!(db_complaint_feedback_type, AuthDbBounceSubtype::Abuse); + let db_complaint_feedback_type: AuthDbBounceSubtype = + From::from(ComplaintFeedbackType::AuthFailure); + assert_eq!(db_complaint_feedback_type, AuthDbBounceSubtype::AuthFailure); + let db_complaint_feedback_type: AuthDbBounceSubtype = From::from(ComplaintFeedbackType::Fraud); + assert_eq!(db_complaint_feedback_type, AuthDbBounceSubtype::Fraud); + let db_complaint_feedback_type: AuthDbBounceSubtype = + From::from(ComplaintFeedbackType::NotSpam); + assert_eq!(db_complaint_feedback_type, AuthDbBounceSubtype::NotSpam); + let db_complaint_feedback_type: AuthDbBounceSubtype = From::from(ComplaintFeedbackType::Other); + assert_eq!(db_complaint_feedback_type, AuthDbBounceSubtype::Other); + let db_complaint_feedback_type: AuthDbBounceSubtype = From::from(ComplaintFeedbackType::Virus); + assert_eq!(db_complaint_feedback_type, AuthDbBounceSubtype::Virus); +} + +#[test] +fn deserialize_complaint_feedback_type() { + let complaint_feedback_type: ComplaintFeedbackType = + serde_json::from_value(From::from("abuse")).expect("JSON error"); + assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Abuse); + let complaint_feedback_type: ComplaintFeedbackType = + serde_json::from_value(From::from("auth-failure")).expect("JSON error"); + assert_eq!(complaint_feedback_type, ComplaintFeedbackType::AuthFailure); + let complaint_feedback_type: ComplaintFeedbackType = + serde_json::from_value(From::from("fraud")).expect("JSON error"); + assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Fraud); + let complaint_feedback_type: ComplaintFeedbackType = + serde_json::from_value(From::from("not-spam")).expect("JSON error"); + assert_eq!(complaint_feedback_type, ComplaintFeedbackType::NotSpam); + let complaint_feedback_type: ComplaintFeedbackType = + serde_json::from_value(From::from("other")).expect("JSON error"); + assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Other); + let complaint_feedback_type: ComplaintFeedbackType = + serde_json::from_value(From::from("virus")).expect("JSON error"); + assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Virus); + let complaint_feedback_type: ComplaintFeedbackType = + serde_json::from_value(From::from("wibble")).expect("JSON error"); + assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Other); + let complaint_feedback_type: ComplaintFeedbackType = + serde_json::from_value(From::from("Abuse")).expect("JSON error"); + assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Other); + let complaint_feedback_type: ComplaintFeedbackType = + serde_json::from_value(From::from("abusee")).expect("JSON error"); + assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Other); + let complaint_feedback_type: ComplaintFeedbackType = + serde_json::from_value(From::from("aabuse")).expect("JSON error"); + assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Other); +} + +#[test] +fn serialize_complaint_feedback_type() { + let json = serde_json::to_string(&ComplaintFeedbackType::Abuse).expect("JSON error"); + assert_eq!(json, "\"abuse\""); + let json = serde_json::to_string(&ComplaintFeedbackType::AuthFailure).expect("JSON error"); + assert_eq!(json, "\"auth-failure\""); + let json = serde_json::to_string(&ComplaintFeedbackType::Fraud).expect("JSON error"); + assert_eq!(json, "\"fraud\""); + let json = serde_json::to_string(&ComplaintFeedbackType::NotSpam).expect("JSON error"); + assert_eq!(json, "\"not-spam\""); + let json = serde_json::to_string(&ComplaintFeedbackType::Other).expect("JSON error"); + assert_eq!(json, "\"other\""); + let json = serde_json::to_string(&ComplaintFeedbackType::Virus).expect("JSON error"); + assert_eq!(json, "\"virus\""); +} diff --git a/src/queues/sqs.rs b/src/queues/sqs.rs new file mode 100644 index 0000000..28846da --- /dev/null +++ b/src/queues/sqs.rs @@ -0,0 +1,186 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, you can obtain one at https://mozilla.org/MPL/2.0/. + +use std::{ + boxed::Box, fmt::{self, Debug, Formatter}, +}; + +use md5; +use rusoto_core::{reactor::RequestDispatcher, Region}; +use rusoto_credential::StaticProvider; +use rusoto_sqs::{ + DeleteMessageError, DeleteMessageRequest, Message as SqsMessage, ReceiveMessageError, + ReceiveMessageRequest, SendMessageError, SendMessageRequest, Sqs, SqsClient, +}; +use serde_json::{self, Error as JsonError, Value as JsonValue}; + +use super::{notification::Notification, Factory, Incoming, Message, Outgoing, QueueError}; +use settings::Settings; + +pub struct Queue<'s> { + client: Box, + url: &'s str, + receive_request: ReceiveMessageRequest, +} + +impl<'s> Queue<'s> { + fn parse_message(&self, message: SqsMessage) -> Result { + let body = message.body.unwrap_or(String::from("")); + if body == "" { + return Err(QueueError::new(format!( + "Missing message body, queue=`{}`", + self.url + ))); + } + + if let Some(hash) = message.md5_of_body { + if hash != format!("{:x}", md5::compute(&body)) { + return Err(QueueError::new(format!( + "Message body does not match MD5 hash, queue=`{}`, hash=`{}`, body=`{}`", + self.url, hash, body + ))); + } + } + + let receipt_handle = message.receipt_handle.unwrap_or(String::from("")); + if receipt_handle == "" { + return Err(QueueError::new(format!( + "Missing receipt handle, queue=`{}`", + self.url + ))); + } + + serde_json::from_value(JsonValue::String(body.clone())) + .map(|notification: Notification| { + println!( + "Successfully parsed SQS message, queue=`{}`, receipt_handle=`{}`, notification_type=`{}`", + self.url, + receipt_handle, + notification.notification_type + ); + Message { + notification, + id: receipt_handle, + } + }) + .map_err(|error| { + QueueError::new(format!( + "Unparseable message body, queue=`{}`, error=`{}`, body=`{}`", + self.url, error, body + )) + }) + } +} + +impl<'s> Factory<'s> for Queue<'s> { + fn new(id: &'s str, settings: &Settings) -> Queue<'s> { + let region = settings + .aws + .region + .parse::() + .expect("invalid region"); + + let client: Box = if let Some(ref keys) = settings.aws.keys { + let creds = + StaticProvider::new(keys.access.to_string(), keys.secret.to_string(), None, None); + Box::new(SqsClient::new(RequestDispatcher::default(), creds, region)) + } else { + Box::new(SqsClient::simple(region)) + }; + + let mut receive_request = ReceiveMessageRequest::default(); + receive_request.max_number_of_messages = Some(10); + receive_request.queue_url = id.to_string(); + + Queue { + client, + url: id, + receive_request, + } + } +} + +impl<'s> Incoming for Queue<'s> { + fn receive(&self) -> Result, QueueError> { + self.client + .receive_message(&self.receive_request) + .sync() + .map(|result| result.messages.unwrap_or(Vec::new())) + .map(|messages| { + messages + .into_iter() + .map(|message| { + self.parse_message(message).unwrap_or_else(|error| { + // At this point any parse errors are message-specific. + // Log them but don't fail the broader call to receive, + // because other messages might be fine. + println!("Queue error receiving from {}: {:?}", self.url, error); + Message::default() + }) + }) + .collect() + }) + .map_err(From::from) + } + + fn delete(&self, message: Message) -> Result<(), QueueError> { + let request = DeleteMessageRequest { + queue_url: self.url.to_string(), + receipt_handle: message.id, + }; + self.client + .delete_message(&request) + .sync() + .map_err(|error| { + println!("Queue error deleting from {}: {:?}", self.url, error); + From::from(error) + }) + } +} + +impl<'s> Outgoing for Queue<'s> { + fn send(&self, notification: &Notification) -> Result { + let mut request = SendMessageRequest::default(); + request.message_body = serde_json::to_string(notification)?; + request.queue_url = self.url.to_string(); + + self.client + .send_message(&request) + .sync() + .map(|result| result.message_id.map_or(String::from(""), |id| id.clone())) + .map_err(From::from) + } +} + +impl From for QueueError { + fn from(error: ReceiveMessageError) -> QueueError { + QueueError::new(format!("SQS ReceiveMessage error: {:?}", error)) + } +} + +impl From for QueueError { + fn from(error: SendMessageError) -> QueueError { + QueueError::new(format!("SQS SendMessage error: {:?}", error)) + } +} + +impl From for QueueError { + fn from(error: DeleteMessageError) -> QueueError { + QueueError::new(format!("SQS DeleteMessage error: {:?}", error)) + } +} + +impl From for QueueError { + fn from(error: JsonError) -> QueueError { + QueueError::new(format!("JSON error: {:?}", error)) + } +} + +impl<'s> Debug for Queue<'s> { + fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { + write!(formatter, "SQS queue, url=`{}`", self.url) + } +} + +unsafe impl<'s> Sync for Queue<'s> {} diff --git a/src/queues/test.rs b/src/queues/test.rs new file mode 100644 index 0000000..4c458bb --- /dev/null +++ b/src/queues/test.rs @@ -0,0 +1,73 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, you can obtain one at https://mozilla.org/MPL/2.0/. + +use super::*; + +// Although these tests look quite pointless, the implementation of the mock +// means they do achieve a few things that may not be obvious: +// +// * They ensure that Outgoing methods aren't invoked on Incoming queues and +// vice versa. +// +// * They assert that no errors bubble out of the queues with normal inputs. +// +// * They ensure that errors are returned if the wrong notification type is +// found on a particular queue. +// +// * They exercise most of the queue-handling code to make sure it runs with +// no panics. + +#[test] +fn process() { + let settings = Settings::new().expect("config error"); + let ids = QueueIds { + bounce: "incoming-bounce", + complaint: "incoming-complaint", + delivery: "incoming-delivery", + notification: "outgoing", + }; + let queues = Queues::new::(&ids, &settings); + match queues.process() { + Ok(count) => assert_eq!(count, 3), + Err(error) => assert!(false, error.description().to_string()), + } +} + +#[test] +fn process_complaint_on_bounce_queue() { + let settings = Settings::new().expect("config error"); + let ids = QueueIds { + bounce: "incoming-complaint", + complaint: "incoming-complaint", + delivery: "incoming-delivery", + notification: "outgoing", + }; + let queues = Queues::new::(&ids, &settings); + match queues.process() { + Ok(_) => assert!(false, "Queues::process should have failed"), + Err(error) => assert_eq!( + error.description(), + "Unexpected notification type in bounce queue: Complaint" + ), + } +} + +#[test] +fn process_bounce_on_complaint_queue() { + let settings = Settings::new().expect("config error"); + let ids = QueueIds { + bounce: "incoming-bounce", + complaint: "incoming-bounce", + delivery: "incoming-delivery", + notification: "outgoing", + }; + let queues = Queues::new::(&ids, &settings); + match queues.process() { + Ok(_) => assert!(false, "Queues::process should have failed"), + Err(error) => assert_eq!( + error.description(), + "Unexpected notification type in complaint queue: Bounce" + ), + } +} diff --git a/src/queues_main.rs b/src/queues_main.rs index 90538db..20ea2be 100644 --- a/src/queues_main.rs +++ b/src/queues_main.rs @@ -6,13 +6,18 @@ #![feature(try_from)] #![feature(type_ascription)] +extern crate chrono; extern crate config; extern crate hex; #[macro_use] extern crate lazy_static; +extern crate md5; extern crate regex; extern crate reqwest; extern crate rusoto_core; +extern crate rusoto_credential; +extern crate rusoto_ses; +extern crate rusoto_sqs; extern crate serde; #[macro_use] extern crate serde_derive; @@ -21,9 +26,30 @@ extern crate serde_json; mod auth_db; mod deserialize; mod duration; +mod queues; mod settings; mod validate; +use queues::{QueueIds, Queues, Sqs}; +use settings::Settings; + fn main() { - println!("Not implemented"); + let settings = Settings::new().expect("config error"); + let sqs_urls = match settings.aws.sqsurls { + Some(ref urls) => urls, + None => panic!("Missing config: aws.sqsurls.*"), + }; + let queue_ids = QueueIds { + bounce: &sqs_urls.bounce, + complaint: &sqs_urls.complaint, + delivery: &sqs_urls.delivery, + notification: &sqs_urls.notification, + }; + let queues = Queues::new::(&queue_ids, &settings); + loop { + match queues.process() { + Ok(count) => println!("Processed {} messages", count), + Err(error) => println!("{:?}", error), + } + } } diff --git a/src/settings/mod.rs b/src/settings/mod.rs index efa79c3..4b84a3b 100644 --- a/src/settings/mod.rs +++ b/src/settings/mod.rs @@ -17,6 +17,14 @@ pub struct AuthDb { pub baseuri: String, } +#[derive(Debug, Default, Deserialize)] +pub struct Aws { + pub keys: Option, + #[serde(deserialize_with = "deserialize::aws_region")] + pub region: String, + pub sqsurls: Option, +} + #[derive(Debug, Default, Deserialize)] pub struct AwsKeys { #[serde(deserialize_with = "deserialize::aws_access")] @@ -54,13 +62,6 @@ pub struct Sendgrid { pub key: String, } -#[derive(Debug, Default, Deserialize)] -pub struct Ses { - #[serde(deserialize_with = "deserialize::aws_region")] - pub region: String, - pub keys: Option, -} - #[derive(Debug, Default, Deserialize)] pub struct Smtp { #[serde(deserialize_with = "deserialize::host")] @@ -70,15 +71,31 @@ pub struct Smtp { pub password: Option, } +#[derive(Debug, Default, Deserialize)] +pub struct SqsUrls { + // Queue URLs are specified here for consistency with the auth server. + // However, we could also store queue names instead and then fetch the + // URL with rusoto_sqs::GetQueueUrl. Then we might be allowed to include + // the production queue names in default config? + #[serde(deserialize_with = "deserialize::sqs_url")] + pub bounce: String, + #[serde(deserialize_with = "deserialize::sqs_url")] + pub complaint: String, + #[serde(deserialize_with = "deserialize::sqs_url")] + pub delivery: String, + #[serde(deserialize_with = "deserialize::sqs_url")] + pub notification: String, +} + #[derive(Debug, Default, Deserialize)] pub struct Settings { pub authdb: AuthDb, + pub aws: Aws, pub bouncelimits: BounceLimits, #[serde(deserialize_with = "deserialize::provider")] pub provider: String, pub sender: Sender, pub sendgrid: Option, - pub ses: Ses, pub smtp: Smtp, } diff --git a/src/settings/test.rs b/src/settings/test.rs index 2ff9170..579e833 100644 --- a/src/settings/test.rs +++ b/src/settings/test.rs @@ -53,14 +53,18 @@ impl Drop for CleanEnvironment { fn env_vars_take_precedence() { let _clean_env = CleanEnvironment::new(vec![ "FXA_EMAIL_AUTHDB_BASEURI", + "FXA_EMAIL_AWS_REGION", + "FXA_EMAIL_AWS_KEYS_ACCESS", + "FXA_EMAIL_AWS_KEYS_SECRET", + "FXA_EMAIL_AWS_SQSURLS_BOUNCE", + "FXA_EMAIL_AWS_SQSURLS_COMPLAINT", + "FXA_EMAIL_AWS_SQSURLS_DELIVERY", + "FXA_EMAIL_AWS_SQSURLS_NOTIFICATION", "FXA_EMAIL_BOUNCELIMITS_ENABLED", "FXA_EMAIL_PROVIDER", "FXA_EMAIL_SENDER_ADDRESS", "FXA_EMAIL_SENDER_NAME", "FXA_EMAIL_SENDGRID_KEY", - "FXA_EMAIL_SES_REGION", - "FXA_EMAIL_SES_KEYS_ACCESS", - "FXA_EMAIL_SES_KEYS_SECRET", "FXA_EMAIL_SMTP_HOST", "FXA_EMAIL_SMTP_PORT", "FXA_EMAIL_SMTP_USER", @@ -70,6 +74,43 @@ fn env_vars_take_precedence() { match Settings::new() { Ok(settings) => { let auth_db_base_uri = format!("{}foo/", &settings.authdb.baseuri); + let aws_keys = if let Some(ref keys) = settings.aws.keys { + AwsKeys { + access: format!("{}A", keys.access), + secret: format!("{}s", keys.secret), + } + } else { + AwsKeys { + access: String::from("A"), + secret: String::from("s"), + } + }; + let aws_region = if settings.aws.region == "us-east-1" { + "eu-west-1" + } else { + "us-east-1" + }; + let aws_sqs_urls = if let Some(ref urls) = settings.aws.sqsurls { + SqsUrls { + bounce: format!("{}B", urls.bounce), + complaint: format!("{}C", urls.complaint), + delivery: format!("{}D", urls.delivery), + notification: format!("{}N", urls.notification), + } + } else { + SqsUrls { + bounce: String::from("https://sqs.us-east-1.amazonaws.com/123456789012/Bounce"), + complaint: String::from( + "https://sqs.us-east-1.amazonaws.com/123456789012/Complaint", + ), + delivery: String::from( + "https://sqs.us-east-1.amazonaws.com/123456789012/Delivery", + ), + notification: String::from( + "https://sqs.us-east-1.amazonaws.com/123456789012/Notification", + ), + } + }; let bounce_limits_enabled = !settings.bouncelimits.enabled; let provider = if settings.provider == "ses" { "smtp" @@ -81,22 +122,6 @@ fn env_vars_take_precedence() { let sendgrid_api_key = String::from( "000000000000000000000000000000000000000000000000000000000000000000000", ); - let ses_region = if settings.ses.region == "us-east-1" { - "eu-west-1" - } else { - "us-east-1" - }; - let ses_keys = if let Some(ref keys) = settings.ses.keys { - AwsKeys { - access: format!("{}A", keys.access), - secret: format!("{}s", keys.secret), - } - } else { - AwsKeys { - access: String::from("A"), - secret: String::from("s"), - } - }; let smtp_host = format!("{}2", &settings.smtp.host); let smtp_port = settings.smtp.port + 3; let smtp_user = if let Some(ref user) = settings.smtp.user { @@ -111,6 +136,16 @@ fn env_vars_take_precedence() { }; env::set_var("FXA_EMAIL_AUTHDB_BASEURI", &auth_db_base_uri); + env::set_var("FXA_EMAIL_AWS_REGION", &aws_region); + env::set_var("FXA_EMAIL_AWS_KEYS_ACCESS", &aws_keys.access); + env::set_var("FXA_EMAIL_AWS_KEYS_SECRET", &aws_keys.secret); + env::set_var("FXA_EMAIL_AWS_SQSURLS_BOUNCE", &aws_sqs_urls.bounce); + env::set_var("FXA_EMAIL_AWS_SQSURLS_COMPLAINT", &aws_sqs_urls.complaint); + env::set_var("FXA_EMAIL_AWS_SQSURLS_DELIVERY", &aws_sqs_urls.delivery); + env::set_var( + "FXA_EMAIL_AWS_SQSURLS_NOTIFICATION", + &aws_sqs_urls.notification, + ); env::set_var( "FXA_EMAIL_BOUNCELIMITS_ENABLED", &bounce_limits_enabled.to_string(), @@ -119,9 +154,6 @@ fn env_vars_take_precedence() { env::set_var("FXA_EMAIL_SENDER_ADDRESS", &sender_address); env::set_var("FXA_EMAIL_SENDER_NAME", &sender_name); env::set_var("FXA_EMAIL_SENDGRID_KEY", &sendgrid_api_key); - env::set_var("FXA_EMAIL_SES_REGION", &ses_region); - env::set_var("FXA_EMAIL_SES_KEYS_ACCESS", &ses_keys.access); - env::set_var("FXA_EMAIL_SES_KEYS_SECRET", &ses_keys.secret); env::set_var("FXA_EMAIL_SMTP_HOST", &smtp_host); env::set_var("FXA_EMAIL_SMTP_PORT", &smtp_port.to_string()); env::set_var("FXA_EMAIL_SMTP_USER", &smtp_user); @@ -130,11 +162,11 @@ fn env_vars_take_precedence() { match Settings::new() { Ok(env_settings) => { assert_eq!(env_settings.authdb.baseuri, auth_db_base_uri); + assert_eq!(env_settings.aws.region, aws_region); assert_eq!(env_settings.bouncelimits.enabled, bounce_limits_enabled); assert_eq!(env_settings.provider, provider); assert_eq!(env_settings.sender.address, sender_address); assert_eq!(env_settings.sender.name, sender_name); - assert_eq!(env_settings.ses.region, ses_region); assert_eq!(env_settings.smtp.host, smtp_host); assert_eq!(env_settings.smtp.port, smtp_port); @@ -144,11 +176,20 @@ fn env_vars_take_precedence() { assert!(false, "settings.sendgrid was not set"); } - if let Some(env_keys) = env_settings.ses.keys { - assert_eq!(env_keys.access, ses_keys.access); - assert_eq!(env_keys.secret, ses_keys.secret); + if let Some(env_keys) = env_settings.aws.keys { + assert_eq!(env_keys.access, aws_keys.access); + assert_eq!(env_keys.secret, aws_keys.secret); } else { - assert!(false, "ses.keys were not set"); + assert!(false, "aws.keys were not set"); + } + + if let Some(env_sqs_urls) = env_settings.aws.sqsurls { + assert_eq!(env_sqs_urls.bounce, aws_sqs_urls.bounce); + assert_eq!(env_sqs_urls.complaint, aws_sqs_urls.complaint); + assert_eq!(env_sqs_urls.delivery, aws_sqs_urls.delivery); + assert_eq!(env_sqs_urls.notification, aws_sqs_urls.notification); + } else { + assert!(false, "aws.sqsurls were not set"); } if let Some(env_user) = env_settings.smtp.user { @@ -187,6 +228,95 @@ fn invalid_auth_db_base_uri() { } } +#[test] +fn invalid_aws_region() { + let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_AWS_REGION"]); + env::set_var("FXA_EMAIL_AWS_REGION", "us-east-1a"); + + match Settings::new() { + Ok(_settings) => assert!(false, "Settings::new should have failed"), + Err(error) => assert_eq!(error.description(), "configuration error"), + } +} + +#[test] +fn invalid_aws_access_key() { + let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_AWS_KEYS_ACCESS"]); + env::set_var("FXA_EMAIL_AWS_KEYS_ACCESS", "DEADBEEF DEADBEEF"); + + match Settings::new() { + Ok(_settings) => assert!(false, "Settings::new should have failed"), + Err(error) => assert_eq!(error.description(), "configuration error"), + } +} + +#[test] +fn invalid_aws_secret_key() { + let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_AWS_KEYS_SECRET"]); + env::set_var("FXA_EMAIL_AWS_KEYS_SECRET", "DEADBEEF DEADBEEF"); + + match Settings::new() { + Ok(_settings) => assert!(false, "Settings::new should have failed"), + Err(error) => assert_eq!(error.description(), "configuration error"), + } +} + +#[test] +fn invalid_aws_bounce_queue_url() { + let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_AWS_SQSURLS_BOUNCE"]); + env::set_var( + "FXA_EMAIL_AWS_SQSURLS_BOUNCE", + "http://sqs.us-east-1.amazonaws.com/123456789012/Bounce", + ); + + match Settings::new() { + Ok(_settings) => assert!(false, "Settings::new should have failed"), + Err(error) => assert_eq!(error.description(), "configuration error"), + } +} + +#[test] +fn invalid_aws_complaint_queue_url() { + let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_AWS_SQSURLS_COMPLAINT"]); + env::set_var( + "FXA_EMAIL_AWS_SQSURLS_COMPLAINT", + "http://sqs.us-east-1.amazonaws.com/123456789012/Complaint", + ); + + match Settings::new() { + Ok(_settings) => assert!(false, "Settings::new should have failed"), + Err(error) => assert_eq!(error.description(), "configuration error"), + } +} + +#[test] +fn invalid_aws_delivery_queue_url() { + let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_AWS_SQSURLS_DELIVERY"]); + env::set_var( + "FXA_EMAIL_AWS_SQSURLS_DELIVERY", + "http://sqs.us-east-1.amazonaws.com/123456789012/Delivery", + ); + + match Settings::new() { + Ok(_settings) => assert!(false, "Settings::new should have failed"), + Err(error) => assert_eq!(error.description(), "configuration error"), + } +} + +#[test] +fn invalid_aws_notification_queue_url() { + let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_AWS_SQSURLS_NOTIFICATION"]); + env::set_var( + "FXA_EMAIL_AWS_SQSURLS_NOTIFICATION", + "http://sqs.us-east-1.amazonaws.com/123456789012/Notification", + ); + + match Settings::new() { + Ok(_settings) => assert!(false, "Settings::new should have failed"), + Err(error) => assert_eq!(error.description(), "configuration error"), + } +} + #[test] fn invalid_bouncelimits_enabled() { let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_BOUNCELIMITS_ENABLED"]); @@ -242,39 +372,6 @@ fn invalid_sendgrid_api_key() { } } -#[test] -fn invalid_ses_region() { - let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_SES_REGION"]); - env::set_var("FXA_EMAIL_SES_REGION", "us-east-1a"); - - match Settings::new() { - Ok(_settings) => assert!(false, "Settings::new should have failed"), - Err(error) => assert_eq!(error.description(), "configuration error"), - } -} - -#[test] -fn invalid_ses_access_key() { - let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_SES_KEYS_ACCESS"]); - env::set_var("FXA_EMAIL_SES_KEYS_ACCESS", "DEADBEEF DEADBEEF"); - - match Settings::new() { - Ok(_settings) => assert!(false, "Settings::new should have failed"), - Err(error) => assert_eq!(error.description(), "configuration error"), - } -} - -#[test] -fn invalid_ses_secret_key() { - let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_SES_KEYS_SECRET"]); - env::set_var("FXA_EMAIL_SES_KEYS_SECRET", "DEADBEEF DEADBEEF"); - - match Settings::new() { - Ok(_settings) => assert!(false, "Settings::new should have failed"), - Err(error) => assert_eq!(error.description(), "configuration error"), - } -} - #[test] fn invalid_smtp_host() { let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_SMTP_HOST"]); diff --git a/src/validate/mod.rs b/src/validate/mod.rs index bd8ca09..d68b1d1 100644 --- a/src/validate/mod.rs +++ b/src/validate/mod.rs @@ -21,6 +21,8 @@ lazy_static! { static ref SENDER_NAME_FORMAT: Regex = Regex::new("^[A-Za-z0-9-]+(?: [A-Za-z0-9-]+)*$").unwrap(); static ref SENDGRID_API_KEY_FORMAT: Regex = Regex::new("^[A-Za-z0-9._]{69}$").unwrap(); + static ref SQS_URL_FORMAT: Regex = + Regex::new("^https://sqs\\.[a-z0-9-]+\\.amazonaws\\.com/[0-9]+/[A-Za-z0-9-]+$").unwrap(); } pub fn aws_region(value: &str) -> bool { @@ -58,3 +60,7 @@ pub fn sender_name(value: &str) -> bool { pub fn sendgrid_api_key(value: &str) -> bool { SENDGRID_API_KEY_FORMAT.is_match(value) } + +pub fn sqs_url(value: &str) -> bool { + SQS_URL_FORMAT.is_match(value) +} diff --git a/src/validate/test.rs b/src/validate/test.rs index d8f2ad8..4e83670 100644 --- a/src/validate/test.rs +++ b/src/validate/test.rs @@ -163,3 +163,29 @@ fn invalid_sendgrid_api_key() { "1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz._123456" )); } + +#[test] +fn sqs_url() { + assert!(validate::sqs_url( + "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue" + )); + assert!(validate::sqs_url( + "https://sqs.us-west-2.amazonaws.com/42/fxa-email-bounce-prod" + )); +} + +#[test] +fn invalid_sqs_url() { + assert!(!validate::sqs_url( + "http://sqs.us-east-1.amazonaws.com/123456789012/MyQueue" + )); + assert!(!validate::sqs_url( + " https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue" + )); + assert!(!validate::sqs_url( + "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue " + )); + assert!(!validate::sqs_url( + "https://sqs.us-east-1.wibble.com/123456789012/MyQueue" + )); +}