feat(queues): handle SES bounce, complaint and delivery notifications

A direct mapping of functionality from the auth server, so the different
notification types are expected on three different queues. Long-term it
makes sense to simplify that down to a single queue, but this way we can
ease the new service in alongside the existing auth server.

As a first cut, the methods on queues::sqs::Queue all block the thread
in this implementation. A subsequent change will transition to futures.
This commit is contained in:
Phil Booth 2018-05-17 10:50:55 +01:00
Родитель f18a5bc60e
Коммит 4338f24cbe
17 изменённых файлов: 1519 добавлений и 86 удалений

41
Cargo.lock сгенерированный
Просмотреть файл

@ -41,7 +41,7 @@ dependencies = [
"cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-demangle 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-demangle 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@ -140,6 +140,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [ dependencies = [
"num-integer 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", "num-integer 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)",
"num-traits 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "num-traits 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
@ -352,9 +353,11 @@ dependencies = [
name = "fxa-email-service" name = "fxa-email-service"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"chrono 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"config 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "config 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"md5 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "regex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"reqwest 0.8.6 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.8.6 (registry+https://github.com/rust-lang/crates.io-index)",
"rocket 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)", "rocket 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)",
@ -363,6 +366,7 @@ dependencies = [
"rusoto_core 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)", "rusoto_core 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rusoto_credential 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "rusoto_credential 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rusoto_ses 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)", "rusoto_ses 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rusoto_sqs 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)",
"sendgrid 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "sendgrid 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)",
@ -492,7 +496,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [ dependencies = [
"libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", "redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@ -579,6 +583,11 @@ name = "matches"
version = "0.1.6" version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "md5"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "0.1.11" version = "0.1.11"
@ -685,7 +694,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [ dependencies = [
"cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@ -866,7 +875,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [ dependencies = [
"fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@ -964,7 +973,7 @@ name = "remove_dir_all"
version = "0.5.1" version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [ dependencies = [
"winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@ -1098,6 +1107,18 @@ dependencies = [
"xml-rs 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "xml-rs 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "rusoto_sqs"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.11.27 (registry+https://github.com/rust-lang/crates.io-index)",
"rusoto_core 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"xml-rs 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "rustc-demangle" name = "rustc-demangle"
version = "0.1.8" version = "0.1.8"
@ -1122,7 +1143,7 @@ version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [ dependencies = [
"lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@ -1351,7 +1372,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [ dependencies = [
"libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", "redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@ -1681,7 +1702,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.4" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [ dependencies = [
"winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1804,6 +1825,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" "checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b"
"checksum log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "89f010e843f2b1a31dbd316b3b8d443758bc634bed37aabade59c686d644e0a2" "checksum log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "89f010e843f2b1a31dbd316b3b8d443758bc634bed37aabade59c686d644e0a2"
"checksum matches 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "100aabe6b8ff4e4a7e32c1c13523379802df0772b82466207ac25b013f193376" "checksum matches 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "100aabe6b8ff4e4a7e32c1c13523379802df0772b82466207ac25b013f193376"
"checksum md5 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "daa1004633f76cdcd5a9d83ffcfe615e30ca7a2a638fcc8b8039a2dac21289d7"
"checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20" "checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20"
"checksum memchr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "148fab2e51b4f1cfc66da2a7c32981d1d3c083a803978268bb11fe4b86925e7a" "checksum memchr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "148fab2e51b4f1cfc66da2a7c32981d1d3c083a803978268bb11fe4b86925e7a"
"checksum memchr 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "796fba70e76612589ed2ce7f45282f5af869e0fdd7cc6199fa1aa1f1d591ba9d" "checksum memchr 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "796fba70e76612589ed2ce7f45282f5af869e0fdd7cc6199fa1aa1f1d591ba9d"
@ -1857,6 +1879,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum rusoto_core 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "12daaa6d62d64f6447bf0299ce775f4e05f8e75e5418e817da094b9de04ad22d" "checksum rusoto_core 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "12daaa6d62d64f6447bf0299ce775f4e05f8e75e5418e817da094b9de04ad22d"
"checksum rusoto_credential 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "53199d09fd1b7d4f5ac50f4d23106577624238ea77cae2b44eb1d1fc4cd956a4" "checksum rusoto_credential 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "53199d09fd1b7d4f5ac50f4d23106577624238ea77cae2b44eb1d1fc4cd956a4"
"checksum rusoto_ses 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "88d4645c94b099369602db7d814b1741f991b3d6821d8076b2ac67824b8cc130" "checksum rusoto_ses 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "88d4645c94b099369602db7d814b1741f991b3d6821d8076b2ac67824b8cc130"
"checksum rusoto_sqs 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "995322c1530460a18fadd2f16761f22ba3f4e1fe630e344ca97e81d921abfb00"
"checksum rustc-demangle 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "76d7ba1feafada44f2d38eed812bd2489a03c0f5abb975799251518b68848649" "checksum rustc-demangle 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "76d7ba1feafada44f2d38eed812bd2489a03c0f5abb975799251518b68848649"
"checksum rustc_version 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a54aa04a10c68c1c4eacb4337fd883b435997ede17a9385784b990777686b09a" "checksum rustc_version 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a54aa04a10c68c1c4eacb4337fd883b435997ede17a9385784b990777686b09a"
"checksum safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e27a8b19b835f7aea908818e871f5cc3a5a186550c30773be987e155e8163d8f" "checksum safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e27a8b19b835f7aea908818e871f5cc3a5a186550c30773be987e155e8163d8f"
@ -1927,7 +1950,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" "checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
"checksum want 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "a05d9d966753fa4b5c8db73fcab5eed4549cfe0e1e4e66911e5564a0085c35d1" "checksum want 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "a05d9d966753fa4b5c8db73fcab5eed4549cfe0e1e4e66911e5564a0085c35d1"
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
"checksum winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "04e3bd221fcbe8a271359c04f21a76db7d0c6028862d1bb5512d85e1e2eb5bb3" "checksum winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "773ef9dcc5f24b7d850d0ff101e542ff24c3b090a9768e03ff889fdef41f00fd"
"checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" "checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
"checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" "checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
"checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" "checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"

Просмотреть файл

@ -3,9 +3,11 @@ name = "fxa-email-service"
version = "0.1.0" version = "0.1.0"
[dependencies] [dependencies]
chrono = { version = "0.4.2", features = [ "serde" ] }
config = "0.8.0" config = "0.8.0"
hex = "0.3.2" hex = "0.3.2"
lazy_static = "1.0" lazy_static = "1.0"
md5 = "0.3.7"
regex = "1.0" regex = "1.0"
reqwest = "0.8.5" reqwest = "0.8.5"
rocket = "0.3.12" rocket = "0.3.12"
@ -14,6 +16,7 @@ rocket_contrib = "0.3.12"
rusoto_core = "0.32.0" rusoto_core = "0.32.0"
rusoto_credential = "0.11.0" rusoto_credential = "0.11.0"
rusoto_ses = "0.32.0" rusoto_ses = "0.32.0"
rusoto_sqs = "0.32.0"
sendgrid = "0.7.0" sendgrid = "0.7.0"
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"

Просмотреть файл

@ -12,6 +12,7 @@
* [How do I run the tests?](#how-do-i-run-the-tests) * [How do I run the tests?](#how-do-i-run-the-tests)
* [How can I send an email via SES?](#how-can-i-send-an-email-via-ses) * [How can I send an email via SES?](#how-can-i-send-an-email-via-ses)
* [How can I send an email via Sendgrid?](#how-can-i-send-an-email-via-sendgrid) * [How can I send an email via Sendgrid?](#how-can-i-send-an-email-via-sendgrid)
* [How do bounce, complaint and delivery notifications work?](#how-do-bounce-complaint-and-delivery-notifications-work)
## What's this? ## What's this?
@ -135,14 +136,14 @@ You'll need to set up some config
with your AWS credentials. with your AWS credentials.
That can be with environment variables: That can be with environment variables:
* `FXA_EMAIL_SES_KEYS_ACCESS` * `FXA_EMAIL_AWS_KEYS_ACCESS`
* `FXA_EMAIL_SES_KEYS_SECRET` * `FXA_EMAIL_AWS_KEYS_SECRET`
Or in `config/local.json`: Or in `config/local.json`:
```json ```json
{ {
"ses": { "aws": {
"keys": { "keys": {
"access": "...", "access": "...",
"secret": "..." "secret": "..."
@ -223,3 +224,78 @@ curl \
If everything is set-up correctly, If everything is set-up correctly,
you should receive email pretty much instantly. you should receive email pretty much instantly.
## How do bounce, complaint and delivery notifications work?
For consistency with the implementation in the auth server,
three separate SQS queues are monitored
for bounce, complaint and delivery notifications.
Ultimately we expect to simplify this
to a single queue for all three notification types.
Messages on these queues
are JSON payloads of the format
described in the [AWS docs](https://docs.aws.amazon.com/ses/latest/DeveloperGuide/notification-contents.html)
and encoded in [`src/queues/notification/mod.rs`](src/queues/notification/mod.rs).
When a message is received,
three things happen in sequence:
1. For bounce and complaint notification types,
a bounce record is created in the auth db.
Errors are fatal at this point,
steps `2` and `3` will not occur
if the db returns an error.
2. The message is forwarded to the auth server
via a fourth, outgoing queue.
An error here is not fatal.
3. The message is deleted from the origin queue.
Currently, both the incoming and outgoing queues
happen to be SQS queues but,
since that's an implementation detail,
the code makes some effort to separate them
behind abstract `Incoming` and `Outgoing` traits.
It's not a perfect abstraction
because the `Notification` type
was allowed to leak out of the `sqs` module
for the sake of expediency.
That can be easily addressed later,
if and when we come to rely on
a different queue implementation.
The queue URLs and region
are set via config,
either using environment variables:
* `FXA_EMAIL_AWS_REGION`
* `FXA_EMAIL_AWS_SQSURLS_BOUNCE`
* `FXA_EMAIL_AWS_SQSURLS_COMPLAINT`
* `FXA_EMAIL_AWS_SQSURLS_DELIVERY`
* `FXA_EMAIL_AWS_SQSURLS_NOTIFICATION`
Or in `config/local.json`:
```json
{
"aws": {
"region": "...",
"sqsurls": {
"bounce": "...",
"complaint": "...",
"delivery": "...",
"notification": "..."
}
}
}
```
The queue-handling code runs in a different process
to the main email-sending service.
You can run it locally like so:
```
cargo r --bin queues
```

Просмотреть файл

@ -2,6 +2,9 @@
"authdb": { "authdb": {
"baseuri": "http://127.0.0.1:8000/" "baseuri": "http://127.0.0.1:8000/"
}, },
"aws": {
"region": "us-east-1"
},
"bouncelimits": { "bouncelimits": {
"enabled": true, "enabled": true,
"complaint": [ "complaint": [
@ -21,9 +24,6 @@
"address": "accounts@firefox.com", "address": "accounts@firefox.com",
"name": "Firefox Accounts" "name": "Firefox Accounts"
}, },
"ses": {
"region": "us-east-1"
},
"smtp": { "smtp": {
"host": "127.0.0.1", "host": "127.0.0.1",
"port": 25 "port": 25

Просмотреть файл

@ -82,6 +82,13 @@ where
deserialize(deserializer, validate::sendgrid_api_key, "Sendgrid API key") deserialize(deserializer, validate::sendgrid_api_key, "Sendgrid API key")
} }
pub fn sqs_url<'d, D>(deserializer: D) -> Result<String, D::Error>
where
D: Deserializer<'d>,
{
deserialize(deserializer, validate::sqs_url, "SQS queue URL")
}
fn deserialize<'d, D>( fn deserialize<'d, D>(
deserializer: D, deserializer: D,
validator: fn(&str) -> bool, validator: fn(&str) -> bool,

Просмотреть файл

@ -19,12 +19,12 @@ pub struct SesProvider {
impl SesProvider { impl SesProvider {
pub fn new(settings: &Settings) -> SesProvider { pub fn new(settings: &Settings) -> SesProvider {
let region = settings let region = settings
.ses .aws
.region .region
.parse::<Region>() .parse::<Region>()
.expect("invalid region"); .expect("invalid region");
let client: Box<Ses> = if let Some(ref keys) = settings.ses.keys { let client: Box<Ses> = if let Some(ref keys) = settings.aws.keys {
let creds = let creds =
StaticProvider::new(keys.access.to_string(), keys.secret.to_string(), None, None); StaticProvider::new(keys.access.to_string(), keys.secret.to_string(), None, None);
Box::new(SesClient::new(RequestDispatcher::default(), creds, region)) Box::new(SesClient::new(RequestDispatcher::default(), creds, region))

109
src/queues/mock.rs Normal file
Просмотреть файл

@ -0,0 +1,109 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, you can obtain one at https://mozilla.org/MPL/2.0/.
use chrono::Utc;
use super::{
notification::{
Bounce, BounceSubtype, BounceType, BouncedRecipient, ComplainedRecipient, Complaint,
Delivery, Notification, NotificationType,
},
Factory, Incoming, Message, Outgoing, QueueError,
};
use settings::Settings;
#[derive(Debug)]
pub struct Queue<'s> {
id: &'s str,
}
impl<'s> Factory<'s> for Queue<'s> {
fn new(id: &'s str, _settings: &Settings) -> Queue<'s> {
Queue { id }
}
}
impl<'s> Incoming for Queue<'s> {
fn receive(&self) -> Result<Vec<Message>, QueueError> {
let message = match self.id {
"incoming-bounce" => {
let mut bounce_message = Message::default();
bounce_message.notification.notification_type = NotificationType::Bounce;
bounce_message.notification.bounce = Some(Bounce {
bounce_type: BounceType::Permanent,
bounce_subtype: BounceSubtype::General,
bounced_recipients: vec![BouncedRecipient {
email_address: String::from(
"fxa-email-service.queues.mock.bounce@example.com",
),
action: None,
status: None,
diagnostic_code: None,
}],
timestamp: Utc::now(),
feedback_id: String::from(""),
remote_mta_ip: None,
reporting_mta: None,
});
bounce_message
}
"incoming-complaint" => {
let mut complaint_message = Message::default();
complaint_message.notification.notification_type = NotificationType::Complaint;
complaint_message.notification.complaint = Some(Complaint {
complained_recipients: vec![ComplainedRecipient {
email_address: String::from(
"fxa-email-service.queues.mock.complaint@example.com",
),
}],
timestamp: Utc::now(),
feedback_id: String::from(""),
user_agent: None,
complaint_feedback_type: None,
arrival_date: Utc::now(),
});
complaint_message
}
"incoming-delivery" => {
let mut delivery_message = Message::default();
delivery_message.notification.notification_type = NotificationType::Delivery;
delivery_message.notification.delivery = Some(Delivery {
timestamp: Utc::now(),
processing_time_millis: 0,
recipients: vec![String::from(
"fxa-email-service.queues.mock.delivery@example.com",
)],
smtp_response: String::from(""),
remote_mta_ip: None,
reporting_mta: None,
});
delivery_message
}
_ => return Err(QueueError::new(String::from("Not implemented"))),
};
Ok(vec![message])
}
fn delete(&self, _message: Message) -> Result<(), QueueError> {
if self.id == "outgoing" {
Err(QueueError::new(String::from("Not implemented")))
} else {
Ok(())
}
}
}
impl<'s> Outgoing for Queue<'s> {
fn send(&self, _body: &Notification) -> Result<String, QueueError> {
if self.id == "outgoing" {
Ok(String::from("deadbeef"))
} else {
Err(QueueError::new(String::from("Not implemented")))
}
}
}

177
src/queues/mod.rs Normal file
Просмотреть файл

@ -0,0 +1,177 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, you can obtain one at https://mozilla.org/MPL/2.0/.
use std::{
boxed::Box, error::Error, fmt::{self, Debug, Display, Formatter},
};
use self::notification::{Notification, NotificationType};
pub use self::sqs::Queue as Sqs;
use auth_db::{BounceSubtype, BounceType, Db, DbClient, DbError};
use settings::Settings;
mod mock;
mod notification;
pub mod sqs;
#[cfg(test)]
mod test;
#[derive(Debug)]
pub struct Queues<'s> {
bounce: Box<Incoming + 's>,
complaint: Box<Incoming + 's>,
delivery: Box<Incoming + 's>,
notification: Box<Outgoing + 's>,
db: DbClient,
}
pub trait Incoming: Debug + Sync {
fn receive(&self) -> Result<Vec<Message>, QueueError>;
fn delete(&self, message: Message) -> Result<(), QueueError>;
}
pub trait Outgoing: Debug + Sync {
fn send(&self, body: &Notification) -> Result<String, QueueError>;
}
pub trait Factory<'s> {
fn new(id: &'s str, settings: &Settings) -> Self;
}
#[derive(Debug, Default)]
pub struct Message {
pub id: String,
pub notification: Notification,
}
#[derive(Debug)]
pub struct QueueError {
description: String,
}
#[derive(Debug)]
pub struct QueueIds<'s> {
pub bounce: &'s str,
pub complaint: &'s str,
pub delivery: &'s str,
pub notification: &'s str,
}
impl<'s> Queues<'s> {
pub fn new<Q: 's>(ids: &QueueIds<'s>, settings: &Settings) -> Queues<'s>
where
Q: Incoming + Outgoing + Factory<'s>,
{
Queues {
bounce: Box::new(Q::new(ids.bounce, settings)),
complaint: Box::new(Q::new(ids.complaint, settings)),
delivery: Box::new(Q::new(ids.delivery, settings)),
notification: Box::new(Q::new(ids.notification, settings)),
db: DbClient::new(settings),
}
}
pub fn process(&self) -> Result<usize, QueueError> {
let mut count = self.process_queue(&self.bounce, &|notification: &Notification| {
if let Some(ref bounce) = notification.bounce {
for recipient in bounce.bounced_recipients.iter() {
self.db.create_bounce(
&recipient.email_address,
From::from(bounce.bounce_type),
From::from(bounce.bounce_subtype),
)?;
}
Ok(())
} else {
Err(QueueError::new(format!(
"Unexpected notification type in bounce queue: {:?}",
notification.notification_type
)))
}
})?;
count += self.process_queue(&self.complaint, &|notification: &Notification| {
if let Some(ref complaint) = notification.complaint {
for recipient in complaint.complained_recipients.iter() {
let bounce_subtype =
if let Some(complaint_type) = complaint.complaint_feedback_type {
From::from(complaint_type)
} else {
BounceSubtype::Unmapped
};
self.db.create_bounce(
&recipient.email_address,
BounceType::Complaint,
bounce_subtype,
)?;
}
Ok(())
} else {
Err(QueueError::new(format!(
"Unexpected notification type in complaint queue: {:?}",
notification.notification_type
)))
}
})?;
count += self.process_queue(&self.delivery, &|_notification| Ok(()))?;
Ok(count)
}
fn process_queue(
&self,
queue: &Box<Incoming + 's>,
handler: &Fn(&Notification) -> Result<(), QueueError>,
) -> Result<usize, QueueError> {
let messages = queue.receive()?;
let mut count = 0;
for message in messages.into_iter() {
if message.notification.notification_type != NotificationType::Null {
self.handle_notification(&message.notification, handler)?;
queue.delete(message)?;
count += 1;
}
}
Ok(count)
}
fn handle_notification(
&self,
notification: &Notification,
handler: &Fn(&Notification) -> Result<(), QueueError>,
) -> Result<(), QueueError> {
handler(&notification)?;
if let Err(error) = self.notification.send(&notification) {
// Errors sending to this queue are non-fatal because it's only used
// for logging. We still want to delete the message from the queue.
println!("{:?}", error);
}
Ok(())
}
}
impl QueueError {
pub fn new(description: String) -> QueueError {
QueueError { description }
}
}
impl Error for QueueError {
fn description(&self) -> &str {
&self.description
}
}
impl Display for QueueError {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
write!(formatter, "{}", self.description)
}
}
impl From<DbError> for QueueError {
fn from(error: DbError) -> QueueError {
QueueError::new(format!("database error: {:?}", error))
}
}

Просмотреть файл

@ -0,0 +1,362 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, you can obtain one at https://mozilla.org/MPL/2.0/.
use std::fmt::{self, Display, Formatter};
use chrono::{DateTime, Utc};
use serde::{
de::{Deserialize, Deserializer, Error as DeserializeError, Unexpected},
ser::{Error as SerializeError, Serialize, Serializer},
};
use auth_db::{BounceSubtype as AuthDbBounceSubtype, BounceType as AuthDbBounceType};
#[cfg(test)]
mod test;
// Warning, long vehicle! This module is a direct encoding
// of the SES notification format that's documented here:
//
// https://docs.aws.amazon.com/ses/latest/DeveloperGuide/notification-contents.html
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct Notification {
#[serde(rename = "notificationType")]
pub notification_type: NotificationType,
pub mail: Mail,
#[serde(skip_serializing_if = "Option::is_none")]
pub bounce: Option<Bounce>,
#[serde(skip_serializing_if = "Option::is_none")]
pub complaint: Option<Complaint>,
#[serde(skip_serializing_if = "Option::is_none")]
pub delivery: Option<Delivery>,
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum NotificationType {
Bounce,
Complaint,
Delivery,
Null,
}
impl From<NotificationType> for String {
fn from(notification_type: NotificationType) -> String {
String::from(match notification_type {
NotificationType::Bounce => "Bounce",
NotificationType::Complaint => "Complaint",
NotificationType::Delivery => "Delivery",
NotificationType::Null => "",
})
}
}
impl Display for NotificationType {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
write!(formatter, "{}", From::from(*self): String)
}
}
impl Default for NotificationType {
fn default() -> NotificationType {
NotificationType::Null
}
}
impl<'d> Deserialize<'d> for NotificationType {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'d>,
{
let value: String = Deserialize::deserialize(deserializer)?;
match value.as_str() {
"Bounce" => Ok(NotificationType::Bounce),
"Complaint" => Ok(NotificationType::Complaint),
"Delivery" => Ok(NotificationType::Delivery),
_ => Err(D::Error::invalid_value(
Unexpected::Str(&value),
&"SES notificiation type",
)),
}
}
}
impl Serialize for NotificationType {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let string: String = From::from(*self);
if string == "" {
Err(S::Error::custom("notification type not set"))
} else {
serializer.serialize_str(&string)
}
}
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Mail {
timestamp: DateTime<Utc>,
#[serde(rename = "messageId")]
message_id: String,
source: String,
#[serde(rename = "sourceArn")]
source_arn: String,
#[serde(rename = "sourceIp")]
source_ip: String,
#[serde(rename = "sendingAccountId")]
sending_account_id: String,
destination: Vec<String>,
#[serde(rename = "headersTruncated")]
headers_truncated: Option<String>,
headers: Option<Vec<Header>>,
#[serde(rename = "commonHeaders")]
common_headers: Option<Vec<Header>>,
}
impl Default for Mail {
fn default() -> Mail {
Mail {
timestamp: Utc::now(),
message_id: String::from(""),
source: String::from(""),
source_arn: String::from(""),
source_ip: String::from(""),
sending_account_id: String::from(""),
destination: Vec::new(),
headers_truncated: None,
headers: None,
common_headers: None,
}
}
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Header {
name: String,
value: HeaderValue,
}
#[derive(Debug, Deserialize, Serialize)]
pub enum HeaderValue {
Single(String),
Multiple(Vec<String>),
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Bounce {
#[serde(rename = "bounceType")]
pub bounce_type: BounceType,
#[serde(rename = "bounceSubType")]
pub bounce_subtype: BounceSubtype,
pub bounced_recipients: Vec<BouncedRecipient>,
pub timestamp: DateTime<Utc>,
#[serde(rename = "feedbackId")]
pub feedback_id: String,
#[serde(rename = "remoteMtaIp")]
pub remote_mta_ip: Option<String>,
#[serde(rename = "reportingMTA")]
pub reporting_mta: Option<String>,
}
#[derive(Copy, Clone, Debug, PartialEq, Serialize)]
pub enum BounceType {
Undetermined,
Permanent,
Transient,
}
impl From<BounceType> for AuthDbBounceType {
fn from(bounce_type: BounceType) -> AuthDbBounceType {
match bounce_type {
BounceType::Undetermined => {
println!("Mapped SesBounceType::Undetermined to AuthDbBounceType::Soft");
AuthDbBounceType::Soft
}
BounceType::Permanent => AuthDbBounceType::Hard,
BounceType::Transient => AuthDbBounceType::Soft,
}
}
}
impl<'d> Deserialize<'d> for BounceType {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'d>,
{
let value: String = Deserialize::deserialize(deserializer)?;
match value.as_str() {
"Undetermined" => Ok(BounceType::Undetermined),
"Permanent" => Ok(BounceType::Permanent),
"Transient" => Ok(BounceType::Transient),
_ => {
println!(
"Mapped unrecognised SES bounceType `{}` to BounceType::Undetermined",
value.as_str()
);
Ok(BounceType::Undetermined)
}
}
}
}
#[derive(Copy, Clone, Debug, PartialEq, Serialize)]
pub enum BounceSubtype {
Undetermined,
General,
NoEmail,
Suppressed,
MailboxFull,
MessageTooLarge,
ContentRejected,
AttachmentRejected,
}
impl From<BounceSubtype> for AuthDbBounceSubtype {
fn from(bounce_subtype: BounceSubtype) -> AuthDbBounceSubtype {
match bounce_subtype {
BounceSubtype::Undetermined => AuthDbBounceSubtype::Undetermined,
BounceSubtype::General => AuthDbBounceSubtype::General,
BounceSubtype::NoEmail => AuthDbBounceSubtype::NoEmail,
BounceSubtype::Suppressed => AuthDbBounceSubtype::Suppressed,
BounceSubtype::MailboxFull => AuthDbBounceSubtype::MailboxFull,
BounceSubtype::MessageTooLarge => AuthDbBounceSubtype::MessageTooLarge,
BounceSubtype::ContentRejected => AuthDbBounceSubtype::ContentRejected,
BounceSubtype::AttachmentRejected => AuthDbBounceSubtype::AttachmentRejected,
}
}
}
impl<'d> Deserialize<'d> for BounceSubtype {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'d>,
{
let value: String = Deserialize::deserialize(deserializer)?;
match value.as_str() {
"Undetermined" => Ok(BounceSubtype::Undetermined),
"General" => Ok(BounceSubtype::General),
"NoEmail" => Ok(BounceSubtype::NoEmail),
"Suppressed" => Ok(BounceSubtype::Suppressed),
"MailboxFull" => Ok(BounceSubtype::MailboxFull),
"MessageTooLarge" => Ok(BounceSubtype::MessageTooLarge),
"ContentRejected" => Ok(BounceSubtype::ContentRejected),
"AttachmentRejected" => Ok(BounceSubtype::AttachmentRejected),
_ => {
println!(
"Mapped unrecognised SES bounceSubType `{}` to BounceSubtype::Undetermined",
value.as_str()
);
Ok(BounceSubtype::Undetermined)
}
}
}
}
#[derive(Debug, Deserialize, Serialize)]
pub struct BouncedRecipient {
#[serde(rename = "emailAddress")]
pub email_address: String,
pub action: Option<String>,
pub status: Option<String>,
#[serde(rename = "diagnosticCode")]
pub diagnostic_code: Option<String>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Complaint {
#[serde(rename = "complainedRecipients")]
pub complained_recipients: Vec<ComplainedRecipient>,
pub timestamp: DateTime<Utc>,
#[serde(rename = "feedbackId")]
pub feedback_id: String,
#[serde(rename = "userAgent")]
pub user_agent: Option<String>,
#[serde(rename = "complaintFeedbackType")]
pub complaint_feedback_type: Option<ComplaintFeedbackType>,
#[serde(rename = "arrivalDate")]
pub arrival_date: DateTime<Utc>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct ComplainedRecipient {
#[serde(rename = "emailAddress")]
pub email_address: String,
}
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ComplaintFeedbackType {
Abuse,
AuthFailure,
Fraud,
NotSpam,
Other,
Virus,
}
impl From<ComplaintFeedbackType> for AuthDbBounceSubtype {
fn from(complaint_feedback_type: ComplaintFeedbackType) -> AuthDbBounceSubtype {
match complaint_feedback_type {
ComplaintFeedbackType::Abuse => AuthDbBounceSubtype::Abuse,
ComplaintFeedbackType::AuthFailure => AuthDbBounceSubtype::AuthFailure,
ComplaintFeedbackType::Fraud => AuthDbBounceSubtype::Fraud,
ComplaintFeedbackType::NotSpam => AuthDbBounceSubtype::NotSpam,
ComplaintFeedbackType::Other => AuthDbBounceSubtype::Other,
ComplaintFeedbackType::Virus => AuthDbBounceSubtype::Virus,
}
}
}
impl<'d> Deserialize<'d> for ComplaintFeedbackType {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'d>,
{
let value: String = Deserialize::deserialize(deserializer)?;
match value.as_str() {
"abuse" => Ok(ComplaintFeedbackType::Abuse),
"auth-failure" => Ok(ComplaintFeedbackType::AuthFailure),
"fraud" => Ok(ComplaintFeedbackType::Fraud),
"not-spam" => Ok(ComplaintFeedbackType::NotSpam),
"other" => Ok(ComplaintFeedbackType::Other),
"virus" => Ok(ComplaintFeedbackType::Virus),
_ => {
println!("Mapped unrecognised SES complaintFeedbackType `{}` to ComplaintFeedbackType::Other", value.as_str());
Ok(ComplaintFeedbackType::Other)
}
}
}
}
impl Serialize for ComplaintFeedbackType {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let value = match self {
ComplaintFeedbackType::Abuse => "abuse",
ComplaintFeedbackType::AuthFailure => "auth-failure",
ComplaintFeedbackType::Fraud => "fraud",
ComplaintFeedbackType::NotSpam => "not-spam",
ComplaintFeedbackType::Other => "other",
ComplaintFeedbackType::Virus => "virus",
};
serializer.serialize_str(value)
}
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Delivery {
pub timestamp: DateTime<Utc>,
#[serde(rename = "processingTimeMillis")]
pub processing_time_millis: u32,
pub recipients: Vec<String>,
#[serde(rename = "smtpResponse")]
pub smtp_response: String,
#[serde(rename = "remoteMtaIp")]
pub remote_mta_ip: Option<String>,
#[serde(rename = "reportingMTA")]
pub reporting_mta: Option<String>,
}

Просмотреть файл

@ -0,0 +1,245 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, you can obtain one at https://mozilla.org/MPL/2.0/.
use std::error::Error;
use serde_json;
use super::*;
#[test]
fn deserialize_notification_type() {
let notification_type: NotificationType =
serde_json::from_value(From::from("Bounce")).expect("JSON error");
assert_eq!(notification_type, NotificationType::Bounce);
let notification_type: NotificationType =
serde_json::from_value(From::from("Complaint")).expect("JSON error");
assert_eq!(notification_type, NotificationType::Complaint);
let notification_type: NotificationType =
serde_json::from_value(From::from("Delivery")).expect("JSON error");
assert_eq!(notification_type, NotificationType::Delivery);
}
#[test]
fn deserialize_invalid_notification_type() {
match serde_json::from_value::<NotificationType>(From::from("bounce")) {
Ok(_) => assert!(false, "serde_json::from_value should have failed"),
Err(error) => assert_eq!(error.description(), "JSON error"),
}
match serde_json::from_value::<NotificationType>(From::from("Bouncex")) {
Ok(_) => assert!(false, "serde_json::from_value should have failed"),
Err(error) => assert_eq!(error.description(), "JSON error"),
}
match serde_json::from_value::<NotificationType>(From::from("BBounce")) {
Ok(_) => assert!(false, "serde_json::from_value should have failed"),
Err(error) => assert_eq!(error.description(), "JSON error"),
}
}
#[test]
fn serialize_notification_type() {
let json = serde_json::to_string(&NotificationType::Bounce).expect("JSON error");
assert_eq!(json, "\"Bounce\"");
let json = serde_json::to_string(&NotificationType::Complaint).expect("JSON error");
assert_eq!(json, "\"Complaint\"");
let json = serde_json::to_string(&NotificationType::Delivery).expect("JSON error");
assert_eq!(json, "\"Delivery\"");
}
#[test]
fn serialize_null_notification_type() {
match serde_json::to_string(&NotificationType::Null) {
Ok(_) => assert!(false, "serde_json::to_string should have failed"),
Err(error) => assert_eq!(error.description(), "JSON error"),
}
}
#[test]
fn bounce_type_to_auth_db() {
let db_bounce_type: AuthDbBounceType = From::from(BounceType::Undetermined);
assert_eq!(db_bounce_type, AuthDbBounceType::Soft);
let db_bounce_type: AuthDbBounceType = From::from(BounceType::Permanent);
assert_eq!(db_bounce_type, AuthDbBounceType::Hard);
let db_bounce_type: AuthDbBounceType = From::from(BounceType::Transient);
assert_eq!(db_bounce_type, AuthDbBounceType::Soft);
}
#[test]
fn deserialize_bounce_type() {
let bounce_type: BounceType =
serde_json::from_value(From::from("Undetermined")).expect("JSON error");
assert_eq!(bounce_type, BounceType::Undetermined);
let bounce_type: BounceType =
serde_json::from_value(From::from("Permanent")).expect("JSON error");
assert_eq!(bounce_type, BounceType::Permanent);
let bounce_type: BounceType =
serde_json::from_value(From::from("Transient")).expect("JSON error");
assert_eq!(bounce_type, BounceType::Transient);
let bounce_type: BounceType = serde_json::from_value(From::from("wibble")).expect("JSON error");
assert_eq!(bounce_type, BounceType::Undetermined);
let bounce_type: BounceType =
serde_json::from_value(From::from("Permanentx")).expect("JSON error");
assert_eq!(bounce_type, BounceType::Undetermined);
let bounce_type: BounceType =
serde_json::from_value(From::from("PPermanent")).expect("JSON error");
assert_eq!(bounce_type, BounceType::Undetermined);
}
#[test]
fn serialize_bounce_type() {
let json = serde_json::to_string(&BounceType::Undetermined).expect("JSON error");
assert_eq!(json, "\"Undetermined\"");
let json = serde_json::to_string(&BounceType::Permanent).expect("JSON error");
assert_eq!(json, "\"Permanent\"");
let json = serde_json::to_string(&BounceType::Transient).expect("JSON error");
assert_eq!(json, "\"Transient\"");
}
#[test]
fn bounce_subtype_to_auth_db() {
let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::Undetermined);
assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::Undetermined);
let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::General);
assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::General);
let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::NoEmail);
assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::NoEmail);
let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::Suppressed);
assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::Suppressed);
let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::MailboxFull);
assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::MailboxFull);
let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::MessageTooLarge);
assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::MessageTooLarge);
let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::ContentRejected);
assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::ContentRejected);
let db_bounce_subtype: AuthDbBounceSubtype = From::from(BounceSubtype::AttachmentRejected);
assert_eq!(db_bounce_subtype, AuthDbBounceSubtype::AttachmentRejected);
}
#[test]
fn deserialize_bounce_subtype() {
let bounce_subtype: BounceSubtype =
serde_json::from_value(From::from("Undetermined")).expect("JSON error");
assert_eq!(bounce_subtype, BounceSubtype::Undetermined);
let bounce_subtype: BounceSubtype =
serde_json::from_value(From::from("General")).expect("JSON error");
assert_eq!(bounce_subtype, BounceSubtype::General);
let bounce_subtype: BounceSubtype =
serde_json::from_value(From::from("NoEmail")).expect("JSON error");
assert_eq!(bounce_subtype, BounceSubtype::NoEmail);
let bounce_subtype: BounceSubtype =
serde_json::from_value(From::from("Suppressed")).expect("JSON error");
assert_eq!(bounce_subtype, BounceSubtype::Suppressed);
let bounce_subtype: BounceSubtype =
serde_json::from_value(From::from("MailboxFull")).expect("JSON error");
assert_eq!(bounce_subtype, BounceSubtype::MailboxFull);
let bounce_subtype: BounceSubtype =
serde_json::from_value(From::from("MessageTooLarge")).expect("JSON error");
assert_eq!(bounce_subtype, BounceSubtype::MessageTooLarge);
let bounce_subtype: BounceSubtype =
serde_json::from_value(From::from("ContentRejected")).expect("JSON error");
assert_eq!(bounce_subtype, BounceSubtype::ContentRejected);
let bounce_subtype: BounceSubtype =
serde_json::from_value(From::from("AttachmentRejected")).expect("JSON error");
assert_eq!(bounce_subtype, BounceSubtype::AttachmentRejected);
let bounce_subtype: BounceSubtype =
serde_json::from_value(From::from("wibble")).expect("JSON error");
assert_eq!(bounce_subtype, BounceSubtype::Undetermined);
let bounce_subtype: BounceSubtype =
serde_json::from_value(From::from("undetermined")).expect("JSON error");
assert_eq!(bounce_subtype, BounceSubtype::Undetermined);
let bounce_subtype: BounceSubtype =
serde_json::from_value(From::from("Undeterminedd")).expect("JSON error");
assert_eq!(bounce_subtype, BounceSubtype::Undetermined);
let bounce_subtype: BounceSubtype =
serde_json::from_value(From::from("UUndetermined")).expect("JSON error");
assert_eq!(bounce_subtype, BounceSubtype::Undetermined);
}
#[test]
fn serialize_bounce_subtype() {
let json = serde_json::to_string(&BounceSubtype::Undetermined).expect("JSON error");
assert_eq!(json, "\"Undetermined\"");
let json = serde_json::to_string(&BounceSubtype::General).expect("JSON error");
assert_eq!(json, "\"General\"");
let json = serde_json::to_string(&BounceSubtype::NoEmail).expect("JSON error");
assert_eq!(json, "\"NoEmail\"");
let json = serde_json::to_string(&BounceSubtype::Suppressed).expect("JSON error");
assert_eq!(json, "\"Suppressed\"");
let json = serde_json::to_string(&BounceSubtype::MailboxFull).expect("JSON error");
assert_eq!(json, "\"MailboxFull\"");
let json = serde_json::to_string(&BounceSubtype::MessageTooLarge).expect("JSON error");
assert_eq!(json, "\"MessageTooLarge\"");
let json = serde_json::to_string(&BounceSubtype::ContentRejected).expect("JSON error");
assert_eq!(json, "\"ContentRejected\"");
let json = serde_json::to_string(&BounceSubtype::AttachmentRejected).expect("JSON error");
assert_eq!(json, "\"AttachmentRejected\"");
}
#[test]
fn complaint_feedback_type_to_auth_db() {
let db_complaint_feedback_type: AuthDbBounceSubtype = From::from(ComplaintFeedbackType::Abuse);
assert_eq!(db_complaint_feedback_type, AuthDbBounceSubtype::Abuse);
let db_complaint_feedback_type: AuthDbBounceSubtype =
From::from(ComplaintFeedbackType::AuthFailure);
assert_eq!(db_complaint_feedback_type, AuthDbBounceSubtype::AuthFailure);
let db_complaint_feedback_type: AuthDbBounceSubtype = From::from(ComplaintFeedbackType::Fraud);
assert_eq!(db_complaint_feedback_type, AuthDbBounceSubtype::Fraud);
let db_complaint_feedback_type: AuthDbBounceSubtype =
From::from(ComplaintFeedbackType::NotSpam);
assert_eq!(db_complaint_feedback_type, AuthDbBounceSubtype::NotSpam);
let db_complaint_feedback_type: AuthDbBounceSubtype = From::from(ComplaintFeedbackType::Other);
assert_eq!(db_complaint_feedback_type, AuthDbBounceSubtype::Other);
let db_complaint_feedback_type: AuthDbBounceSubtype = From::from(ComplaintFeedbackType::Virus);
assert_eq!(db_complaint_feedback_type, AuthDbBounceSubtype::Virus);
}
#[test]
fn deserialize_complaint_feedback_type() {
let complaint_feedback_type: ComplaintFeedbackType =
serde_json::from_value(From::from("abuse")).expect("JSON error");
assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Abuse);
let complaint_feedback_type: ComplaintFeedbackType =
serde_json::from_value(From::from("auth-failure")).expect("JSON error");
assert_eq!(complaint_feedback_type, ComplaintFeedbackType::AuthFailure);
let complaint_feedback_type: ComplaintFeedbackType =
serde_json::from_value(From::from("fraud")).expect("JSON error");
assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Fraud);
let complaint_feedback_type: ComplaintFeedbackType =
serde_json::from_value(From::from("not-spam")).expect("JSON error");
assert_eq!(complaint_feedback_type, ComplaintFeedbackType::NotSpam);
let complaint_feedback_type: ComplaintFeedbackType =
serde_json::from_value(From::from("other")).expect("JSON error");
assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Other);
let complaint_feedback_type: ComplaintFeedbackType =
serde_json::from_value(From::from("virus")).expect("JSON error");
assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Virus);
let complaint_feedback_type: ComplaintFeedbackType =
serde_json::from_value(From::from("wibble")).expect("JSON error");
assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Other);
let complaint_feedback_type: ComplaintFeedbackType =
serde_json::from_value(From::from("Abuse")).expect("JSON error");
assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Other);
let complaint_feedback_type: ComplaintFeedbackType =
serde_json::from_value(From::from("abusee")).expect("JSON error");
assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Other);
let complaint_feedback_type: ComplaintFeedbackType =
serde_json::from_value(From::from("aabuse")).expect("JSON error");
assert_eq!(complaint_feedback_type, ComplaintFeedbackType::Other);
}
#[test]
fn serialize_complaint_feedback_type() {
let json = serde_json::to_string(&ComplaintFeedbackType::Abuse).expect("JSON error");
assert_eq!(json, "\"abuse\"");
let json = serde_json::to_string(&ComplaintFeedbackType::AuthFailure).expect("JSON error");
assert_eq!(json, "\"auth-failure\"");
let json = serde_json::to_string(&ComplaintFeedbackType::Fraud).expect("JSON error");
assert_eq!(json, "\"fraud\"");
let json = serde_json::to_string(&ComplaintFeedbackType::NotSpam).expect("JSON error");
assert_eq!(json, "\"not-spam\"");
let json = serde_json::to_string(&ComplaintFeedbackType::Other).expect("JSON error");
assert_eq!(json, "\"other\"");
let json = serde_json::to_string(&ComplaintFeedbackType::Virus).expect("JSON error");
assert_eq!(json, "\"virus\"");
}

186
src/queues/sqs.rs Normal file
Просмотреть файл

@ -0,0 +1,186 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, you can obtain one at https://mozilla.org/MPL/2.0/.
use std::{
boxed::Box, fmt::{self, Debug, Formatter},
};
use md5;
use rusoto_core::{reactor::RequestDispatcher, Region};
use rusoto_credential::StaticProvider;
use rusoto_sqs::{
DeleteMessageError, DeleteMessageRequest, Message as SqsMessage, ReceiveMessageError,
ReceiveMessageRequest, SendMessageError, SendMessageRequest, Sqs, SqsClient,
};
use serde_json::{self, Error as JsonError, Value as JsonValue};
use super::{notification::Notification, Factory, Incoming, Message, Outgoing, QueueError};
use settings::Settings;
pub struct Queue<'s> {
client: Box<Sqs>,
url: &'s str,
receive_request: ReceiveMessageRequest,
}
impl<'s> Queue<'s> {
fn parse_message(&self, message: SqsMessage) -> Result<Message, QueueError> {
let body = message.body.unwrap_or(String::from(""));
if body == "" {
return Err(QueueError::new(format!(
"Missing message body, queue=`{}`",
self.url
)));
}
if let Some(hash) = message.md5_of_body {
if hash != format!("{:x}", md5::compute(&body)) {
return Err(QueueError::new(format!(
"Message body does not match MD5 hash, queue=`{}`, hash=`{}`, body=`{}`",
self.url, hash, body
)));
}
}
let receipt_handle = message.receipt_handle.unwrap_or(String::from(""));
if receipt_handle == "" {
return Err(QueueError::new(format!(
"Missing receipt handle, queue=`{}`",
self.url
)));
}
serde_json::from_value(JsonValue::String(body.clone()))
.map(|notification: Notification| {
println!(
"Successfully parsed SQS message, queue=`{}`, receipt_handle=`{}`, notification_type=`{}`",
self.url,
receipt_handle,
notification.notification_type
);
Message {
notification,
id: receipt_handle,
}
})
.map_err(|error| {
QueueError::new(format!(
"Unparseable message body, queue=`{}`, error=`{}`, body=`{}`",
self.url, error, body
))
})
}
}
impl<'s> Factory<'s> for Queue<'s> {
fn new(id: &'s str, settings: &Settings) -> Queue<'s> {
let region = settings
.aws
.region
.parse::<Region>()
.expect("invalid region");
let client: Box<Sqs> = if let Some(ref keys) = settings.aws.keys {
let creds =
StaticProvider::new(keys.access.to_string(), keys.secret.to_string(), None, None);
Box::new(SqsClient::new(RequestDispatcher::default(), creds, region))
} else {
Box::new(SqsClient::simple(region))
};
let mut receive_request = ReceiveMessageRequest::default();
receive_request.max_number_of_messages = Some(10);
receive_request.queue_url = id.to_string();
Queue {
client,
url: id,
receive_request,
}
}
}
impl<'s> Incoming for Queue<'s> {
fn receive(&self) -> Result<Vec<Message>, QueueError> {
self.client
.receive_message(&self.receive_request)
.sync()
.map(|result| result.messages.unwrap_or(Vec::new()))
.map(|messages| {
messages
.into_iter()
.map(|message| {
self.parse_message(message).unwrap_or_else(|error| {
// At this point any parse errors are message-specific.
// Log them but don't fail the broader call to receive,
// because other messages might be fine.
println!("Queue error receiving from {}: {:?}", self.url, error);
Message::default()
})
})
.collect()
})
.map_err(From::from)
}
fn delete(&self, message: Message) -> Result<(), QueueError> {
let request = DeleteMessageRequest {
queue_url: self.url.to_string(),
receipt_handle: message.id,
};
self.client
.delete_message(&request)
.sync()
.map_err(|error| {
println!("Queue error deleting from {}: {:?}", self.url, error);
From::from(error)
})
}
}
impl<'s> Outgoing for Queue<'s> {
fn send(&self, notification: &Notification) -> Result<String, QueueError> {
let mut request = SendMessageRequest::default();
request.message_body = serde_json::to_string(notification)?;
request.queue_url = self.url.to_string();
self.client
.send_message(&request)
.sync()
.map(|result| result.message_id.map_or(String::from(""), |id| id.clone()))
.map_err(From::from)
}
}
impl From<ReceiveMessageError> for QueueError {
fn from(error: ReceiveMessageError) -> QueueError {
QueueError::new(format!("SQS ReceiveMessage error: {:?}", error))
}
}
impl From<SendMessageError> for QueueError {
fn from(error: SendMessageError) -> QueueError {
QueueError::new(format!("SQS SendMessage error: {:?}", error))
}
}
impl From<DeleteMessageError> for QueueError {
fn from(error: DeleteMessageError) -> QueueError {
QueueError::new(format!("SQS DeleteMessage error: {:?}", error))
}
}
impl From<JsonError> for QueueError {
fn from(error: JsonError) -> QueueError {
QueueError::new(format!("JSON error: {:?}", error))
}
}
impl<'s> Debug for Queue<'s> {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
write!(formatter, "SQS queue, url=`{}`", self.url)
}
}
unsafe impl<'s> Sync for Queue<'s> {}

73
src/queues/test.rs Normal file
Просмотреть файл

@ -0,0 +1,73 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, you can obtain one at https://mozilla.org/MPL/2.0/.
use super::*;
// Although these tests look quite pointless, the implementation of the mock
// means they do achieve a few things that may not be obvious:
//
// * They ensure that Outgoing methods aren't invoked on Incoming queues and
// vice versa.
//
// * They assert that no errors bubble out of the queues with normal inputs.
//
// * They ensure that errors are returned if the wrong notification type is
// found on a particular queue.
//
// * They exercise most of the queue-handling code to make sure it runs with
// no panics.
#[test]
fn process() {
let settings = Settings::new().expect("config error");
let ids = QueueIds {
bounce: "incoming-bounce",
complaint: "incoming-complaint",
delivery: "incoming-delivery",
notification: "outgoing",
};
let queues = Queues::new::<mock::Queue>(&ids, &settings);
match queues.process() {
Ok(count) => assert_eq!(count, 3),
Err(error) => assert!(false, error.description().to_string()),
}
}
#[test]
fn process_complaint_on_bounce_queue() {
let settings = Settings::new().expect("config error");
let ids = QueueIds {
bounce: "incoming-complaint",
complaint: "incoming-complaint",
delivery: "incoming-delivery",
notification: "outgoing",
};
let queues = Queues::new::<mock::Queue>(&ids, &settings);
match queues.process() {
Ok(_) => assert!(false, "Queues::process should have failed"),
Err(error) => assert_eq!(
error.description(),
"Unexpected notification type in bounce queue: Complaint"
),
}
}
#[test]
fn process_bounce_on_complaint_queue() {
let settings = Settings::new().expect("config error");
let ids = QueueIds {
bounce: "incoming-bounce",
complaint: "incoming-bounce",
delivery: "incoming-delivery",
notification: "outgoing",
};
let queues = Queues::new::<mock::Queue>(&ids, &settings);
match queues.process() {
Ok(_) => assert!(false, "Queues::process should have failed"),
Err(error) => assert_eq!(
error.description(),
"Unexpected notification type in complaint queue: Bounce"
),
}
}

Просмотреть файл

@ -6,13 +6,18 @@
#![feature(try_from)] #![feature(try_from)]
#![feature(type_ascription)] #![feature(type_ascription)]
extern crate chrono;
extern crate config; extern crate config;
extern crate hex; extern crate hex;
#[macro_use] #[macro_use]
extern crate lazy_static; extern crate lazy_static;
extern crate md5;
extern crate regex; extern crate regex;
extern crate reqwest; extern crate reqwest;
extern crate rusoto_core; extern crate rusoto_core;
extern crate rusoto_credential;
extern crate rusoto_ses;
extern crate rusoto_sqs;
extern crate serde; extern crate serde;
#[macro_use] #[macro_use]
extern crate serde_derive; extern crate serde_derive;
@ -21,9 +26,30 @@ extern crate serde_json;
mod auth_db; mod auth_db;
mod deserialize; mod deserialize;
mod duration; mod duration;
mod queues;
mod settings; mod settings;
mod validate; mod validate;
use queues::{QueueIds, Queues, Sqs};
use settings::Settings;
fn main() { fn main() {
println!("Not implemented"); let settings = Settings::new().expect("config error");
let sqs_urls = match settings.aws.sqsurls {
Some(ref urls) => urls,
None => panic!("Missing config: aws.sqsurls.*"),
};
let queue_ids = QueueIds {
bounce: &sqs_urls.bounce,
complaint: &sqs_urls.complaint,
delivery: &sqs_urls.delivery,
notification: &sqs_urls.notification,
};
let queues = Queues::new::<Sqs>(&queue_ids, &settings);
loop {
match queues.process() {
Ok(count) => println!("Processed {} messages", count),
Err(error) => println!("{:?}", error),
}
}
} }

Просмотреть файл

@ -17,6 +17,14 @@ pub struct AuthDb {
pub baseuri: String, pub baseuri: String,
} }
#[derive(Debug, Default, Deserialize)]
pub struct Aws {
pub keys: Option<AwsKeys>,
#[serde(deserialize_with = "deserialize::aws_region")]
pub region: String,
pub sqsurls: Option<SqsUrls>,
}
#[derive(Debug, Default, Deserialize)] #[derive(Debug, Default, Deserialize)]
pub struct AwsKeys { pub struct AwsKeys {
#[serde(deserialize_with = "deserialize::aws_access")] #[serde(deserialize_with = "deserialize::aws_access")]
@ -54,13 +62,6 @@ pub struct Sendgrid {
pub key: String, pub key: String,
} }
#[derive(Debug, Default, Deserialize)]
pub struct Ses {
#[serde(deserialize_with = "deserialize::aws_region")]
pub region: String,
pub keys: Option<AwsKeys>,
}
#[derive(Debug, Default, Deserialize)] #[derive(Debug, Default, Deserialize)]
pub struct Smtp { pub struct Smtp {
#[serde(deserialize_with = "deserialize::host")] #[serde(deserialize_with = "deserialize::host")]
@ -70,15 +71,31 @@ pub struct Smtp {
pub password: Option<String>, pub password: Option<String>,
} }
#[derive(Debug, Default, Deserialize)]
pub struct SqsUrls {
// Queue URLs are specified here for consistency with the auth server.
// However, we could also store queue names instead and then fetch the
// URL with rusoto_sqs::GetQueueUrl. Then we might be allowed to include
// the production queue names in default config?
#[serde(deserialize_with = "deserialize::sqs_url")]
pub bounce: String,
#[serde(deserialize_with = "deserialize::sqs_url")]
pub complaint: String,
#[serde(deserialize_with = "deserialize::sqs_url")]
pub delivery: String,
#[serde(deserialize_with = "deserialize::sqs_url")]
pub notification: String,
}
#[derive(Debug, Default, Deserialize)] #[derive(Debug, Default, Deserialize)]
pub struct Settings { pub struct Settings {
pub authdb: AuthDb, pub authdb: AuthDb,
pub aws: Aws,
pub bouncelimits: BounceLimits, pub bouncelimits: BounceLimits,
#[serde(deserialize_with = "deserialize::provider")] #[serde(deserialize_with = "deserialize::provider")]
pub provider: String, pub provider: String,
pub sender: Sender, pub sender: Sender,
pub sendgrid: Option<Sendgrid>, pub sendgrid: Option<Sendgrid>,
pub ses: Ses,
pub smtp: Smtp, pub smtp: Smtp,
} }

Просмотреть файл

@ -53,14 +53,18 @@ impl Drop for CleanEnvironment {
fn env_vars_take_precedence() { fn env_vars_take_precedence() {
let _clean_env = CleanEnvironment::new(vec![ let _clean_env = CleanEnvironment::new(vec![
"FXA_EMAIL_AUTHDB_BASEURI", "FXA_EMAIL_AUTHDB_BASEURI",
"FXA_EMAIL_AWS_REGION",
"FXA_EMAIL_AWS_KEYS_ACCESS",
"FXA_EMAIL_AWS_KEYS_SECRET",
"FXA_EMAIL_AWS_SQSURLS_BOUNCE",
"FXA_EMAIL_AWS_SQSURLS_COMPLAINT",
"FXA_EMAIL_AWS_SQSURLS_DELIVERY",
"FXA_EMAIL_AWS_SQSURLS_NOTIFICATION",
"FXA_EMAIL_BOUNCELIMITS_ENABLED", "FXA_EMAIL_BOUNCELIMITS_ENABLED",
"FXA_EMAIL_PROVIDER", "FXA_EMAIL_PROVIDER",
"FXA_EMAIL_SENDER_ADDRESS", "FXA_EMAIL_SENDER_ADDRESS",
"FXA_EMAIL_SENDER_NAME", "FXA_EMAIL_SENDER_NAME",
"FXA_EMAIL_SENDGRID_KEY", "FXA_EMAIL_SENDGRID_KEY",
"FXA_EMAIL_SES_REGION",
"FXA_EMAIL_SES_KEYS_ACCESS",
"FXA_EMAIL_SES_KEYS_SECRET",
"FXA_EMAIL_SMTP_HOST", "FXA_EMAIL_SMTP_HOST",
"FXA_EMAIL_SMTP_PORT", "FXA_EMAIL_SMTP_PORT",
"FXA_EMAIL_SMTP_USER", "FXA_EMAIL_SMTP_USER",
@ -70,6 +74,43 @@ fn env_vars_take_precedence() {
match Settings::new() { match Settings::new() {
Ok(settings) => { Ok(settings) => {
let auth_db_base_uri = format!("{}foo/", &settings.authdb.baseuri); let auth_db_base_uri = format!("{}foo/", &settings.authdb.baseuri);
let aws_keys = if let Some(ref keys) = settings.aws.keys {
AwsKeys {
access: format!("{}A", keys.access),
secret: format!("{}s", keys.secret),
}
} else {
AwsKeys {
access: String::from("A"),
secret: String::from("s"),
}
};
let aws_region = if settings.aws.region == "us-east-1" {
"eu-west-1"
} else {
"us-east-1"
};
let aws_sqs_urls = if let Some(ref urls) = settings.aws.sqsurls {
SqsUrls {
bounce: format!("{}B", urls.bounce),
complaint: format!("{}C", urls.complaint),
delivery: format!("{}D", urls.delivery),
notification: format!("{}N", urls.notification),
}
} else {
SqsUrls {
bounce: String::from("https://sqs.us-east-1.amazonaws.com/123456789012/Bounce"),
complaint: String::from(
"https://sqs.us-east-1.amazonaws.com/123456789012/Complaint",
),
delivery: String::from(
"https://sqs.us-east-1.amazonaws.com/123456789012/Delivery",
),
notification: String::from(
"https://sqs.us-east-1.amazonaws.com/123456789012/Notification",
),
}
};
let bounce_limits_enabled = !settings.bouncelimits.enabled; let bounce_limits_enabled = !settings.bouncelimits.enabled;
let provider = if settings.provider == "ses" { let provider = if settings.provider == "ses" {
"smtp" "smtp"
@ -81,22 +122,6 @@ fn env_vars_take_precedence() {
let sendgrid_api_key = String::from( let sendgrid_api_key = String::from(
"000000000000000000000000000000000000000000000000000000000000000000000", "000000000000000000000000000000000000000000000000000000000000000000000",
); );
let ses_region = if settings.ses.region == "us-east-1" {
"eu-west-1"
} else {
"us-east-1"
};
let ses_keys = if let Some(ref keys) = settings.ses.keys {
AwsKeys {
access: format!("{}A", keys.access),
secret: format!("{}s", keys.secret),
}
} else {
AwsKeys {
access: String::from("A"),
secret: String::from("s"),
}
};
let smtp_host = format!("{}2", &settings.smtp.host); let smtp_host = format!("{}2", &settings.smtp.host);
let smtp_port = settings.smtp.port + 3; let smtp_port = settings.smtp.port + 3;
let smtp_user = if let Some(ref user) = settings.smtp.user { let smtp_user = if let Some(ref user) = settings.smtp.user {
@ -111,6 +136,16 @@ fn env_vars_take_precedence() {
}; };
env::set_var("FXA_EMAIL_AUTHDB_BASEURI", &auth_db_base_uri); env::set_var("FXA_EMAIL_AUTHDB_BASEURI", &auth_db_base_uri);
env::set_var("FXA_EMAIL_AWS_REGION", &aws_region);
env::set_var("FXA_EMAIL_AWS_KEYS_ACCESS", &aws_keys.access);
env::set_var("FXA_EMAIL_AWS_KEYS_SECRET", &aws_keys.secret);
env::set_var("FXA_EMAIL_AWS_SQSURLS_BOUNCE", &aws_sqs_urls.bounce);
env::set_var("FXA_EMAIL_AWS_SQSURLS_COMPLAINT", &aws_sqs_urls.complaint);
env::set_var("FXA_EMAIL_AWS_SQSURLS_DELIVERY", &aws_sqs_urls.delivery);
env::set_var(
"FXA_EMAIL_AWS_SQSURLS_NOTIFICATION",
&aws_sqs_urls.notification,
);
env::set_var( env::set_var(
"FXA_EMAIL_BOUNCELIMITS_ENABLED", "FXA_EMAIL_BOUNCELIMITS_ENABLED",
&bounce_limits_enabled.to_string(), &bounce_limits_enabled.to_string(),
@ -119,9 +154,6 @@ fn env_vars_take_precedence() {
env::set_var("FXA_EMAIL_SENDER_ADDRESS", &sender_address); env::set_var("FXA_EMAIL_SENDER_ADDRESS", &sender_address);
env::set_var("FXA_EMAIL_SENDER_NAME", &sender_name); env::set_var("FXA_EMAIL_SENDER_NAME", &sender_name);
env::set_var("FXA_EMAIL_SENDGRID_KEY", &sendgrid_api_key); env::set_var("FXA_EMAIL_SENDGRID_KEY", &sendgrid_api_key);
env::set_var("FXA_EMAIL_SES_REGION", &ses_region);
env::set_var("FXA_EMAIL_SES_KEYS_ACCESS", &ses_keys.access);
env::set_var("FXA_EMAIL_SES_KEYS_SECRET", &ses_keys.secret);
env::set_var("FXA_EMAIL_SMTP_HOST", &smtp_host); env::set_var("FXA_EMAIL_SMTP_HOST", &smtp_host);
env::set_var("FXA_EMAIL_SMTP_PORT", &smtp_port.to_string()); env::set_var("FXA_EMAIL_SMTP_PORT", &smtp_port.to_string());
env::set_var("FXA_EMAIL_SMTP_USER", &smtp_user); env::set_var("FXA_EMAIL_SMTP_USER", &smtp_user);
@ -130,11 +162,11 @@ fn env_vars_take_precedence() {
match Settings::new() { match Settings::new() {
Ok(env_settings) => { Ok(env_settings) => {
assert_eq!(env_settings.authdb.baseuri, auth_db_base_uri); assert_eq!(env_settings.authdb.baseuri, auth_db_base_uri);
assert_eq!(env_settings.aws.region, aws_region);
assert_eq!(env_settings.bouncelimits.enabled, bounce_limits_enabled); assert_eq!(env_settings.bouncelimits.enabled, bounce_limits_enabled);
assert_eq!(env_settings.provider, provider); assert_eq!(env_settings.provider, provider);
assert_eq!(env_settings.sender.address, sender_address); assert_eq!(env_settings.sender.address, sender_address);
assert_eq!(env_settings.sender.name, sender_name); assert_eq!(env_settings.sender.name, sender_name);
assert_eq!(env_settings.ses.region, ses_region);
assert_eq!(env_settings.smtp.host, smtp_host); assert_eq!(env_settings.smtp.host, smtp_host);
assert_eq!(env_settings.smtp.port, smtp_port); assert_eq!(env_settings.smtp.port, smtp_port);
@ -144,11 +176,20 @@ fn env_vars_take_precedence() {
assert!(false, "settings.sendgrid was not set"); assert!(false, "settings.sendgrid was not set");
} }
if let Some(env_keys) = env_settings.ses.keys { if let Some(env_keys) = env_settings.aws.keys {
assert_eq!(env_keys.access, ses_keys.access); assert_eq!(env_keys.access, aws_keys.access);
assert_eq!(env_keys.secret, ses_keys.secret); assert_eq!(env_keys.secret, aws_keys.secret);
} else { } else {
assert!(false, "ses.keys were not set"); assert!(false, "aws.keys were not set");
}
if let Some(env_sqs_urls) = env_settings.aws.sqsurls {
assert_eq!(env_sqs_urls.bounce, aws_sqs_urls.bounce);
assert_eq!(env_sqs_urls.complaint, aws_sqs_urls.complaint);
assert_eq!(env_sqs_urls.delivery, aws_sqs_urls.delivery);
assert_eq!(env_sqs_urls.notification, aws_sqs_urls.notification);
} else {
assert!(false, "aws.sqsurls were not set");
} }
if let Some(env_user) = env_settings.smtp.user { if let Some(env_user) = env_settings.smtp.user {
@ -187,6 +228,95 @@ fn invalid_auth_db_base_uri() {
} }
} }
#[test]
fn invalid_aws_region() {
let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_AWS_REGION"]);
env::set_var("FXA_EMAIL_AWS_REGION", "us-east-1a");
match Settings::new() {
Ok(_settings) => assert!(false, "Settings::new should have failed"),
Err(error) => assert_eq!(error.description(), "configuration error"),
}
}
#[test]
fn invalid_aws_access_key() {
let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_AWS_KEYS_ACCESS"]);
env::set_var("FXA_EMAIL_AWS_KEYS_ACCESS", "DEADBEEF DEADBEEF");
match Settings::new() {
Ok(_settings) => assert!(false, "Settings::new should have failed"),
Err(error) => assert_eq!(error.description(), "configuration error"),
}
}
#[test]
fn invalid_aws_secret_key() {
let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_AWS_KEYS_SECRET"]);
env::set_var("FXA_EMAIL_AWS_KEYS_SECRET", "DEADBEEF DEADBEEF");
match Settings::new() {
Ok(_settings) => assert!(false, "Settings::new should have failed"),
Err(error) => assert_eq!(error.description(), "configuration error"),
}
}
#[test]
fn invalid_aws_bounce_queue_url() {
let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_AWS_SQSURLS_BOUNCE"]);
env::set_var(
"FXA_EMAIL_AWS_SQSURLS_BOUNCE",
"http://sqs.us-east-1.amazonaws.com/123456789012/Bounce",
);
match Settings::new() {
Ok(_settings) => assert!(false, "Settings::new should have failed"),
Err(error) => assert_eq!(error.description(), "configuration error"),
}
}
#[test]
fn invalid_aws_complaint_queue_url() {
let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_AWS_SQSURLS_COMPLAINT"]);
env::set_var(
"FXA_EMAIL_AWS_SQSURLS_COMPLAINT",
"http://sqs.us-east-1.amazonaws.com/123456789012/Complaint",
);
match Settings::new() {
Ok(_settings) => assert!(false, "Settings::new should have failed"),
Err(error) => assert_eq!(error.description(), "configuration error"),
}
}
#[test]
fn invalid_aws_delivery_queue_url() {
let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_AWS_SQSURLS_DELIVERY"]);
env::set_var(
"FXA_EMAIL_AWS_SQSURLS_DELIVERY",
"http://sqs.us-east-1.amazonaws.com/123456789012/Delivery",
);
match Settings::new() {
Ok(_settings) => assert!(false, "Settings::new should have failed"),
Err(error) => assert_eq!(error.description(), "configuration error"),
}
}
#[test]
fn invalid_aws_notification_queue_url() {
let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_AWS_SQSURLS_NOTIFICATION"]);
env::set_var(
"FXA_EMAIL_AWS_SQSURLS_NOTIFICATION",
"http://sqs.us-east-1.amazonaws.com/123456789012/Notification",
);
match Settings::new() {
Ok(_settings) => assert!(false, "Settings::new should have failed"),
Err(error) => assert_eq!(error.description(), "configuration error"),
}
}
#[test] #[test]
fn invalid_bouncelimits_enabled() { fn invalid_bouncelimits_enabled() {
let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_BOUNCELIMITS_ENABLED"]); let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_BOUNCELIMITS_ENABLED"]);
@ -242,39 +372,6 @@ fn invalid_sendgrid_api_key() {
} }
} }
#[test]
fn invalid_ses_region() {
let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_SES_REGION"]);
env::set_var("FXA_EMAIL_SES_REGION", "us-east-1a");
match Settings::new() {
Ok(_settings) => assert!(false, "Settings::new should have failed"),
Err(error) => assert_eq!(error.description(), "configuration error"),
}
}
#[test]
fn invalid_ses_access_key() {
let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_SES_KEYS_ACCESS"]);
env::set_var("FXA_EMAIL_SES_KEYS_ACCESS", "DEADBEEF DEADBEEF");
match Settings::new() {
Ok(_settings) => assert!(false, "Settings::new should have failed"),
Err(error) => assert_eq!(error.description(), "configuration error"),
}
}
#[test]
fn invalid_ses_secret_key() {
let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_SES_KEYS_SECRET"]);
env::set_var("FXA_EMAIL_SES_KEYS_SECRET", "DEADBEEF DEADBEEF");
match Settings::new() {
Ok(_settings) => assert!(false, "Settings::new should have failed"),
Err(error) => assert_eq!(error.description(), "configuration error"),
}
}
#[test] #[test]
fn invalid_smtp_host() { fn invalid_smtp_host() {
let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_SMTP_HOST"]); let _clean_env = CleanEnvironment::new(vec!["FXA_EMAIL_SMTP_HOST"]);

Просмотреть файл

@ -21,6 +21,8 @@ lazy_static! {
static ref SENDER_NAME_FORMAT: Regex = static ref SENDER_NAME_FORMAT: Regex =
Regex::new("^[A-Za-z0-9-]+(?: [A-Za-z0-9-]+)*$").unwrap(); Regex::new("^[A-Za-z0-9-]+(?: [A-Za-z0-9-]+)*$").unwrap();
static ref SENDGRID_API_KEY_FORMAT: Regex = Regex::new("^[A-Za-z0-9._]{69}$").unwrap(); static ref SENDGRID_API_KEY_FORMAT: Regex = Regex::new("^[A-Za-z0-9._]{69}$").unwrap();
static ref SQS_URL_FORMAT: Regex =
Regex::new("^https://sqs\\.[a-z0-9-]+\\.amazonaws\\.com/[0-9]+/[A-Za-z0-9-]+$").unwrap();
} }
pub fn aws_region(value: &str) -> bool { pub fn aws_region(value: &str) -> bool {
@ -58,3 +60,7 @@ pub fn sender_name(value: &str) -> bool {
pub fn sendgrid_api_key(value: &str) -> bool { pub fn sendgrid_api_key(value: &str) -> bool {
SENDGRID_API_KEY_FORMAT.is_match(value) SENDGRID_API_KEY_FORMAT.is_match(value)
} }
pub fn sqs_url(value: &str) -> bool {
SQS_URL_FORMAT.is_match(value)
}

Просмотреть файл

@ -163,3 +163,29 @@ fn invalid_sendgrid_api_key() {
"1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz._123456" "1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz._123456"
)); ));
} }
#[test]
fn sqs_url() {
assert!(validate::sqs_url(
"https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
));
assert!(validate::sqs_url(
"https://sqs.us-west-2.amazonaws.com/42/fxa-email-bounce-prod"
));
}
#[test]
fn invalid_sqs_url() {
assert!(!validate::sqs_url(
"http://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
));
assert!(!validate::sqs_url(
" https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
));
assert!(!validate::sqs_url(
"https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue "
));
assert!(!validate::sqs_url(
"https://sqs.us-east-1.wibble.com/123456789012/MyQueue"
));
}