82 строки
2.2 KiB
JavaScript
82 строки
2.2 KiB
JavaScript
/* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
'use strict'
|
|
|
|
var AWS = require('aws-sdk')
|
|
var inherits = require('util').inherits
|
|
var EventEmitter = require('events').EventEmitter
|
|
|
|
module.exports = function (log) {
|
|
|
|
function SQSReceiver(region, urls) {
|
|
this.sqs = new AWS.SQS({ region : region })
|
|
this.queueUrls = urls || []
|
|
EventEmitter.call(this)
|
|
}
|
|
inherits(SQSReceiver, EventEmitter)
|
|
|
|
function checkDeleteError(err) {
|
|
if (err) {
|
|
log.error('deleteMessage', { err: err })
|
|
}
|
|
}
|
|
|
|
SQSReceiver.prototype.fetch = function (url) {
|
|
var errTimer = null
|
|
this.sqs.receiveMessage(
|
|
{
|
|
QueueUrl: url,
|
|
AttributeNames: [],
|
|
MaxNumberOfMessages: 10,
|
|
WaitTimeSeconds: 20
|
|
},
|
|
function (err, data) {
|
|
if (err) {
|
|
log.error('fetch', { url: url, err: err })
|
|
if (! errTimer) {
|
|
// unacceptable! this aws lib will call the callback
|
|
// more than once with different errors. ಠ_ಠ
|
|
errTimer = setTimeout(this.fetch.bind(this, url), 2000)
|
|
}
|
|
return
|
|
}
|
|
function deleteMessage(message) {
|
|
this.sqs.deleteMessage(
|
|
{
|
|
QueueUrl: url,
|
|
ReceiptHandle: message.ReceiptHandle
|
|
},
|
|
checkDeleteError
|
|
)
|
|
}
|
|
data.Messages = data.Messages || []
|
|
for (var i = 0; i < data.Messages.length; i++) {
|
|
var msg = data.Messages[i]
|
|
var deleteFromQueue = deleteMessage.bind(this, msg)
|
|
try {
|
|
var body = JSON.parse(msg.Body)
|
|
var message = JSON.parse(body.Message)
|
|
message.del = deleteFromQueue
|
|
this.emit('data', message)
|
|
}
|
|
catch (e) {
|
|
log.error('fetch', { url: url, err: e })
|
|
deleteFromQueue()
|
|
}
|
|
}
|
|
this.fetch(url)
|
|
}.bind(this)
|
|
)
|
|
}
|
|
|
|
SQSReceiver.prototype.start = function () {
|
|
for (var i = 0; i < this.queueUrls.length; i++) {
|
|
this.fetch(this.queueUrls[i])
|
|
}
|
|
}
|
|
|
|
return SQSReceiver
|
|
}
|