diff --git a/functions/processQueueItem.js b/functions/processQueueItem.js index 4a0bf53..617cc48 100644 --- a/functions/processQueueItem.js +++ b/functions/processQueueItem.js @@ -10,6 +10,8 @@ const Sentry = require("../lib/sentry"); const Metrics = require("../lib/metrics"); const { wait, epochNow } = require("../lib/utils.js"); +const Raven = Sentry(); + exports.handler = async function(event = {}, context = {}) { const { Records } = event; const log = require("../lib/logging")({ @@ -90,8 +92,6 @@ exports.handleOne = async function(event, context) { EMAIL_EXPIRES, } = process.env; - const Raven = Sentry(); - log.verbose("env", { HITRATE_TABLE, Bucket, @@ -132,6 +132,20 @@ exports.handleOne = async function(event, context) { log.info("processing", { id }); + const handleError = async (err, logType, extra = {}, isDone = true) => { + Raven.captureException(err); + metricsPing.is_error = true; + log.error(logType, Object.assign({ err }, extra)); + return isDone ? done() : Promise.resolve(); + }; + + const done = async () => { + const metricsResult = await Metrics.workerWorks(metricsPing); + log.verbose("metricsResult", { metricsResult }); + return id; + }; + + // Step #1: Handle rate limiting and pause if necessary try { // Pause if we're at the rate limit for current expiration window let rateLimited = false; @@ -168,7 +182,13 @@ exports.handleOne = async function(event, context) { .promise(); log.verbose("hitRatePutResult", { hitRatePutResult }); + } catch (err) { + return handleError(err, "hitRateError"); + } + // Step #2: Make a request to the upstream service + let upstreamServiceResponse, IsMatch; + try { const imageUrl = S3.getSignedUrl("getObject", { Bucket, Key: image, @@ -180,7 +200,7 @@ exports.handleOne = async function(event, context) { metricsPing.timing_sent = Date.now() - Date.parse(datestamp); const timingReceivedStart = Date.now(); - const upstreamServiceResponse = await request.post({ + upstreamServiceResponse = await request.post({ url: `${upstreamServiceUrl}?enhance`, headers: { "Content-Type": "application/json", @@ -195,20 +215,29 @@ exports.handleOne = async function(event, context) { metricsPing.timing_received = Date.now() - timingReceivedStart; metricsPing.photodna_tracking_id = upstreamServiceResponse.TrackingId; - log.verbose("upstreamServiceResponse", { upstreamServiceResponse }); - - const { IsMatch } = upstreamServiceResponse; + ({ IsMatch } = upstreamServiceResponse); metricsPing.is_match = IsMatch; - if (!IsMatch) { - // On negative match, clean up the image and request details. + log.verbose("upstreamServiceResponse", { upstreamServiceResponse }); + } catch (err) { + return handleError(err, "upstreamServiceError"); + } + + // Step #3: Handle the response from the upstream service + if (!IsMatch) { + try { + // Step #3a: On negative match, clean up the image and request details. const deleteResult = await Promise.all([ S3.deleteObject({ Bucket, Key: `${image}` }).promise(), S3.deleteObject({ Bucket, Key: `${image}-request.json` }).promise(), ]); log.verbose("deleteResult", { deleteResult }); - } else { - // On positive match, store the details of the match response. + } catch (err) { + return handleError(err, "deleteError"); + } + } else { + try { + // Step #3b: On positive match, store the details of the match response. const putResult = await S3.putObject({ Bucket, Key: `${image}-response.json`, @@ -226,8 +255,14 @@ exports.handleOne = async function(event, context) { }).promise(); log.verbose("putResult", { putResult }); + } catch (err) { + return handleError(err, "putError"); + } + } - // Send an email alert on positive match, if addresses are available. + // Step #4: Send an email alert on positive match. + if (IsMatch) { + try { const ToAddresses = []; if (positive_email) { ToAddresses.push(positive_email); @@ -291,15 +326,22 @@ exports.handleOne = async function(event, context) { log.verbose("emailResult", { emailResult }); log.info("sentEmail", { messageId: emailResult.MessageId }); } + } catch (err) { + // Do not bail out on an error from email, we can still send a callback + await handleError(err, "emailError", {}, false); } + } + // Step #5: Send a callback request to the client service + const callbackUrl = IsMatch ? positive_uri : negative_uri; + try { const timingSubmittedStart = Date.now(); const upstreamIsError = !upstreamServiceResponse || !upstreamServiceResponse.Status || upstreamServiceResponse.Status.Code !== 3000; const callbackResult = await request.post({ - url: IsMatch ? positive_uri : negative_uri, + url: callbackUrl, headers: { "Content-Type": "application/json", }, @@ -315,13 +357,8 @@ exports.handleOne = async function(event, context) { metricsPing.timing_submitted = Date.now() - timingSubmittedStart; log.verbose("callbackResult", { callbackResult }); } catch (err) { - Raven.captureException(err); - metricsPing.is_error = true; - log.error("callbackError", { err }); - throw err; + return handleError(err, "callbackError", { callbackUrl }); } - const metricsResult = await Metrics.workerWorks(metricsPing); - log.verbose("metricsResult", { metricsResult }); - return id; + return done(); };