feat(metrics): add code and config for email service notification queue

Fixes #2633.

Once the email service starts handling bounce and complaint events we
will need a way for it to tell the auth server to emit metrics. This
change adds a handler + config for a new SQS queue to that end. It
duplicates some of the code from other handlers but that's intentional,
we plan to remove those queues eventually.
This commit is contained in:
Phil Booth 2018-10-08 08:05:41 +01:00
Родитель 93580daf86
Коммит ccd55565de
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 36FBB106F9C32516
4 изменённых файлов: 586 добавлений и 21 удалений

Просмотреть файл

@ -9,33 +9,38 @@
// If required, modules will be instrumented.
require('../lib/newrelic')()
var config = require('../config').getProperties()
var log = require('../lib/log')(config.log.level, 'fxa-email-bouncer')
var error = require('../lib/error')
var Token = require('../lib/tokens')(log, config)
var SQSReceiver = require('../lib/sqs')(log)
var bounces = require('../lib/email/bounces')(log, error)
var delivery = require('../lib/email/delivery')(log)
const config = require('../config').getProperties()
const log = require('../lib/log')(config.log.level, 'fxa-email-bouncer')
const error = require('../lib/error')
const Token = require('../lib/tokens')(log, config)
const SQSReceiver = require('../lib/sqs')(log)
const bounces = require('../lib/email/bounces')(log, error)
const delivery = require('../lib/email/delivery')(log)
const notifications = require('../lib/email/notifications')(log, error)
var DB = require('../lib/db')(
const DB = require('../lib/db')(
config,
log,
Token
)
var bounceQueue = new SQSReceiver(config.emailNotifications.region, [
config.emailNotifications.bounceQueueUrl,
config.emailNotifications.complaintQueueUrl
])
const {
bounceQueueUrl,
complaintQueueUrl,
deliveryQueueUrl,
notificationQueueUrl,
region
} = config.emailNotifications
var deliveryQueue = new SQSReceiver(config.emailNotifications.region, [
config.emailNotifications.deliveryQueueUrl
])
const bounceQueue = new SQSReceiver(region, [ bounceQueueUrl, complaintQueueUrl ])
const deliveryQueue = new SQSReceiver(region, [ deliveryQueueUrl ])
const notificationQueue = new SQSReceiver(region, [ notificationQueueUrl ])
DB.connect(config[config.db.backend])
.then(
function (db) {
bounces(bounceQueue, db)
delivery(deliveryQueue)
}
)
.then(db => {
// bounces and delivery are now deprecated, we'll delete them
// as soon as we're 100% confident in fxa-email-service
bounces(bounceQueue, db)
delivery(deliveryQueue)
notifications(notificationQueue, db)
})

Просмотреть файл

@ -536,6 +536,12 @@ var conf = convict({
format: String,
env: 'DELIVERY_QUEUE_URL',
default: ''
},
notificationQueueUrl: {
doc: 'Queue URL for notifications from fxa-email-service (eventually this will be the only email-related queue)',
format: String,
env: 'NOTIFICATION_QUEUE_URL',
default: ''
}
},
profileServerMessaging: {

Просмотреть файл

@ -0,0 +1,63 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
'use strict'
const P = require('../promise')
const utils = require('./utils/helpers')
// Account deletion threshold for new unverified accounts that receive
// a bounce or complaint notification. Unverified accounts younger than
// 6 hours old will be deleted if a bounce or complaint occurs.
const SIX_HOURS = 1000 * 60 * 60 * 6
module.exports = (log, error) => {
return (queue, db) => {
queue.start()
queue.on('data', async message => {
try {
utils.logErrorIfHeadersAreWeirdOrMissing(log, message, 'notification')
let addresses = [], eventType = 'bounced', isDeletionCandidate = false
if (message.bounce) {
addresses = mapBounceComplaintRecipients(message.bounce.bouncedRecipients)
isDeletionCandidate = true
} else if (message.complaint) {
addresses = mapBounceComplaintRecipients(message.complaint.complainedRecipients)
isDeletionCandidate = true
} else if (message.delivery) {
addresses = message.delivery.recipients
eventType = 'delivered'
}
await P.all(addresses.map(async address => {
const domain = utils.getAnonymizedEmailDomain(address)
utils.logFlowEventFromMessage(log, message, eventType)
utils.logEmailEventFromMessage(log, message, eventType, domain)
if (isDeletionCandidate) {
const emailRecord = await db.accountRecord(address)
if (! emailRecord.emailVerified && emailRecord.createdAt >= Date.now() - SIX_HOURS) {
// A bounce or complaint on a new unverified account is grounds for deletion
await db.deleteAccount(emailRecord)
log.info({ op: 'accountDeleted', ...emailRecord })
}
}
}))
} catch (err) {
log.error({ op: 'email.notification.error', err })
}
message.del()
})
}
}
function mapBounceComplaintRecipients (recipients) {
return recipients.map(recipient => recipient.emailAddress)
}

Просмотреть файл

@ -0,0 +1,491 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
'use strict'
const ROOT_DIR = '../../..'
const { assert } = require('chai')
const error = require(`${ROOT_DIR}/lib/error`)
const { mockLog } = require('../../mocks')
const notifications = require(`${ROOT_DIR}/lib/email/notifications`)
const P = require(`${ROOT_DIR}/lib/promise`)
const sinon = require('sinon')
const SIX_HOURS = 1000 * 60 * 60 * 6
describe('lib/email/notifications:', () => {
let now, del, log, queue, emailRecord, db
beforeEach(() => {
now = Date.now()
sinon.stub(Date, 'now', () => now)
del = sinon.spy()
log = mockLog()
queue = {
start: sinon.spy(),
on: sinon.spy()
}
emailRecord = {
emailVerified: false,
createdAt: now - SIX_HOURS - 1
}
db = {
accountRecord: sinon.spy(() => P.resolve(emailRecord)),
deleteAccount: sinon.spy(() => P.resolve())
}
notifications(log, error)(queue, db)
})
afterEach(() => {
Date.now.restore()
})
it('called queue.start', () => {
assert.equal(queue.start.callCount, 1)
assert.lengthOf(queue.start.args[0], 0)
})
it('called queue.on', () => {
assert.equal(queue.on.callCount, 1)
const args = queue.on.args[0]
assert.lengthOf(args, 2)
assert.equal(args[0], 'data')
assert.isFunction(args[1])
assert.lengthOf(args[1], 1)
})
describe('bounce message:', () => {
beforeEach(() => {
return queue.on.args[0][1]({
del,
mail: {
headers: {
'Content-Language': 'en-gb',
'X-Flow-Begin-Time': now - 1,
'X-Flow-Id': 'foo',
'X-Template-Name': 'bar',
'X-Template-Version': 'baz',
}
},
bounce: {
bouncedRecipients: [ { emailAddress: 'wibble@example.com' } ]
}
})
})
it('logged a flow event', () => {
assert.equal(log.flowEvent.callCount, 1)
const args = log.flowEvent.args[0]
assert.lengthOf(args, 1)
assert.deepEqual(args[0], {
event: 'email.bar.bounced',
flow_id: 'foo',
flow_time: 1,
time: now
})
})
it('logged an email event', () => {
assert.equal(log.info.callCount, 1)
const args = log.info.args[0]
assert.lengthOf(args, 1)
assert.deepEqual(args[0], {
bounced: true,
domain: 'other',
flow_id: 'foo',
locale: 'en-gb',
op: 'emailEvent',
template: 'bar',
templateVersion: 'baz',
type: 'bounced'
})
})
it('did not delete the account', () => {
assert.equal(db.accountRecord.callCount, 1)
const args = db.accountRecord.args[0]
assert.lengthOf(args, 1)
assert.equal(args[0], 'wibble@example.com')
assert.equal(db.deleteAccount.callCount, 0)
})
it('called message.del', () => {
assert.equal(del.callCount, 1)
assert.lengthOf(del.args[0], 0)
})
it('did not log an error', () => {
assert.equal(log.error.callCount, 0)
})
})
describe('complaint message, 2 recipients:', () => {
beforeEach(() => {
return queue.on.args[0][1]({
del,
mail: {
headers: {
'Content-Language': 'fr',
'X-Flow-Begin-Time': now - 2,
'X-Flow-Id': 'wibble',
'X-Template-Name': 'blee'
}
},
complaint: {
complainedRecipients: [
{ emailAddress: 'foo@example.com' },
{ emailAddress: 'pmbooth@gmail.com' }
]
}
})
})
it('logged 2 flow events', () => {
assert.equal(log.flowEvent.callCount, 2)
let args = log.flowEvent.args[0]
assert.lengthOf(args, 1)
assert.deepEqual(args[0], {
event: 'email.blee.bounced',
flow_id: 'wibble',
flow_time: 2,
time: now
})
args = log.flowEvent.args[1]
assert.lengthOf(args, 1)
assert.deepEqual(args[0], {
event: 'email.blee.bounced',
flow_id: 'wibble',
flow_time: 2,
time: now
})
})
it('logged 2 email events', () => {
assert.equal(log.info.callCount, 2)
let args = log.info.args[0]
assert.lengthOf(args, 1)
assert.deepEqual(args[0], {
complaint: true,
domain: 'other',
flow_id: 'wibble',
locale: 'fr',
op: 'emailEvent',
template: 'blee',
templateVersion: '',
type: 'bounced'
})
args = log.info.args[1]
assert.lengthOf(args, 1)
assert.deepEqual(args[0], {
complaint: true,
domain: 'gmail.com',
flow_id: 'wibble',
locale: 'fr',
op: 'emailEvent',
template: 'blee',
templateVersion: '',
type: 'bounced'
})
})
it('did not delete the accounts', () => {
assert.equal(db.accountRecord.callCount, 2)
let args = db.accountRecord.args[0]
assert.lengthOf(args, 1)
assert.equal(args[0], 'foo@example.com')
args = db.accountRecord.args[1]
assert.lengthOf(args, 1)
assert.equal(args[0], 'pmbooth@gmail.com')
assert.equal(db.deleteAccount.callCount, 0)
})
it('called message.del', () => {
assert.equal(del.callCount, 1)
})
it('did not log an error', () => {
assert.equal(log.error.callCount, 0)
})
})
describe('bounce message, 2 recipients, new unverified account:', () => {
beforeEach(() => {
emailRecord.createdAt += 1
return queue.on.args[0][1]({
del,
mail: {
headers: {
'Content-Language': 'en-gb',
'X-Flow-Begin-Time': now - 1,
'X-Flow-Id': 'foo',
'X-Template-Name': 'bar',
'X-Template-Version': 'baz',
}
},
bounce: {
bouncedRecipients: [
{ emailAddress: 'wibble@example.com' },
{ emailAddress: 'blee@example.com' }
]
}
})
})
it('logged events', () => {
assert.equal(log.flowEvent.callCount, 2)
assert.equal(log.info.callCount, 4)
let args = log.info.args[2]
assert.lengthOf(args, 1)
assert.deepEqual(args[0], {
op: 'accountDeleted',
emailVerified: false,
createdAt: emailRecord.createdAt
})
args = log.info.args[3]
assert.lengthOf(args, 1)
assert.deepEqual(args[0], {
op: 'accountDeleted',
emailVerified: false,
createdAt: emailRecord.createdAt
})
})
it('deleted the accounts', () => {
assert.equal(db.accountRecord.callCount, 2)
let args = db.accountRecord.args[0]
assert.lengthOf(args, 1)
assert.equal(args[0], 'wibble@example.com')
args = db.accountRecord.args[1]
assert.lengthOf(args, 1)
assert.equal(args[0], 'blee@example.com')
assert.equal(db.deleteAccount.callCount, 2)
args = db.deleteAccount.args[0]
assert.lengthOf(args, 1)
assert.equal(args[0], emailRecord)
args = db.deleteAccount.args[1]
assert.lengthOf(args, 1)
assert.equal(args[0], emailRecord)
})
it('called message.del', () => {
assert.equal(del.callCount, 1)
})
it('did not log an error', () => {
assert.equal(log.error.callCount, 0)
})
})
describe('complaint message, new unverified account:', () => {
beforeEach(() => {
emailRecord.createdAt += 1
return queue.on.args[0][1]({
del,
mail: {
headers: {
'Content-Language': 'fr',
'X-Flow-Begin-Time': now - 2,
'X-Flow-Id': 'wibble',
'X-Template-Name': 'blee'
}
},
complaint: {
complainedRecipients: [
{ emailAddress: 'foo@example.com' }
]
}
})
})
it('logged events', () => {
assert.equal(log.flowEvent.callCount, 1)
assert.equal(log.info.callCount, 2)
})
it('deleted the account', () => {
assert.equal(db.accountRecord.callCount, 1)
assert.equal(db.deleteAccount.callCount, 1)
})
it('called message.del', () => {
assert.equal(del.callCount, 1)
})
it('did not log an error', () => {
assert.equal(log.error.callCount, 0)
})
})
describe('bounce message, new verified account:', () => {
beforeEach(() => {
emailRecord.createdAt += 1
emailRecord.emailVerified = true
return queue.on.args[0][1]({
del,
mail: {
headers: {
'Content-Language': 'en-gb',
'X-Flow-Begin-Time': now - 1,
'X-Flow-Id': 'foo',
'X-Template-Name': 'bar',
'X-Template-Version': 'baz',
}
},
bounce: {
bouncedRecipients: [
{ emailAddress: 'wibble@example.com' }
]
}
})
})
it('logged events', () => {
assert.equal(log.flowEvent.callCount, 1)
assert.equal(log.info.callCount, 1)
})
it('did not delete the account', () => {
assert.equal(db.accountRecord.callCount, 1)
assert.equal(db.deleteAccount.callCount, 0)
})
it('called message.del', () => {
assert.equal(del.callCount, 1)
})
it('did not log an error', () => {
assert.equal(log.error.callCount, 0)
})
})
describe('delivery message, new unverified account:', () => {
beforeEach(() => {
emailRecord.createdAt += 1
return queue.on.args[0][1]({
del,
mail: {
headers: {
'Content-Language': 'en-gb',
'X-Flow-Begin-Time': now - 1,
'X-Flow-Id': 'foo',
'X-Template-Name': 'bar',
'X-Template-Version': 'baz',
}
},
delivery: {
recipients: [ 'wibble@example.com' ]
}
})
})
it('logged a flow event', () => {
assert.equal(log.flowEvent.callCount, 1)
const args = log.flowEvent.args[0]
assert.lengthOf(args, 1)
assert.deepEqual(args[0], {
event: 'email.bar.delivered',
flow_id: 'foo',
flow_time: 1,
time: now
})
})
it('logged an email event', () => {
assert.equal(log.info.callCount, 1)
const args = log.info.args[0]
assert.lengthOf(args, 1)
assert.deepEqual(args[0], {
domain: 'other',
flow_id: 'foo',
locale: 'en-gb',
op: 'emailEvent',
template: 'bar',
templateVersion: 'baz',
type: 'delivered'
})
})
it('did not delete the account', () => {
assert.equal(db.accountRecord.callCount, 0)
assert.equal(db.deleteAccount.callCount, 0)
})
it('called message.del', () => {
assert.equal(del.callCount, 1)
})
it('did not log an error', () => {
assert.equal(log.error.callCount, 0)
})
})
describe('missing headers:', () => {
beforeEach(() => {
return queue.on.args[0][1]({
del,
mail: {},
bounce: {
bouncedRecipients: [ { emailAddress: 'wibble@example.com' } ]
}
})
})
it('logged an error', () => {
assert.isAtLeast(log.error.callCount, 1)
const args = log.error.args[0]
assert.lengthOf(args, 1)
assert.deepEqual(args[0], {
op: 'emailHeaders.missing',
origin: 'notification'
})
})
it('did not log a flow event', () => {
assert.equal(log.flowEvent.callCount, 0)
})
it('logged an email event', () => {
assert.equal(log.info.callCount, 1)
const args = log.info.args[0]
assert.lengthOf(args, 1)
assert.deepEqual(args[0], {
bounced: true,
domain: 'other',
locale: '',
op: 'emailEvent',
template: '',
templateVersion: '',
type: 'bounced'
})
})
it('did not delete the account', () => {
assert.equal(db.accountRecord.callCount, 1)
assert.equal(db.deleteAccount.callCount, 0)
})
it('called message.del', () => {
assert.equal(del.callCount, 1)
})
})
})