chore(webhook-service): refactor multiregion

This commit is contained in:
Alessandro Magionami 2024-10-22 16:54:25 +02:00
Родитель 89882c4fd6
Коммит c1f47828f6
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: EC367516F896CBA4
1 изменённых файлов: 102 добавлений и 91 удалений

Просмотреть файл

@ -12,10 +12,12 @@ const HEALTHCHECK_FILE_PATH = '/tmp/last_successful_query'
const { makeNetworkRequest } = require('./webhookCaller')
const WebhookError = require('./errors')
async function startTask() {
const { rows } = await knex.raw(`
const startTaskFactory =
({ db }) =>
async () => {
const { rows } = await db.raw(`
UPDATE webhooks_events
SET
SET
"status" = 1,
"lastUpdate" = NOW()
FROM (
@ -27,15 +29,17 @@ async function startTask() {
WHERE webhooks_events."id" = task."id"
RETURNING webhooks_events."id"
`)
return rows[0]
}
return rows[0]
}
async function doTask(task) {
let boundLogger = logger.child({ taskId: task.id })
try {
const { rows } = await knex.raw(
`
SELECT
const doTaskFactory =
({ db }) =>
async (task) => {
let boundLogger = logger.child({ taskId: task.id })
try {
const { rows } = await db.raw(
`
SELECT
ev.payload as evt,
cnf.id as wh_id, cnf.url as wh_url, cnf.secret as wh_secret, cnf.enabled as wh_enabled
FROM webhooks_events ev
@ -43,49 +47,49 @@ async function doTask(task) {
WHERE ev.id = ?
LIMIT 1
`,
[task.id]
)
const info = rows[0]
if (!info) {
throw new Error('Internal error: DB inconsistent')
}
boundLogger = boundLogger.child({ webhookId: info.wh_id })
const fullPayload = JSON.parse(info.evt)
boundLogger = boundLogger.child({
streamId: fullPayload.streamId,
eventName: fullPayload.event.event_name
})
const postData = { payload: fullPayload }
const signature = crypto
.createHmac('sha256', info.wh_secret || '')
.update(JSON.stringify(postData))
.digest('hex')
const postHeaders = { 'X-WEBHOOK-SIGNATURE': signature }
boundLogger.info('Calling webhook.')
const result = await makeNetworkRequest({
url: info.wh_url,
data: postData,
headersData: postHeaders,
logger: boundLogger
})
boundLogger.info({ result }, `Received response from webhook.`)
if (!result.success) {
throw new WebhookError(
result.error,
'Calling webhook was unsuccessful.',
result.responseCode,
result.responseBody
[task.id]
)
}
const info = rows[0]
if (!info) {
throw new Error('Internal error: DB inconsistent')
}
boundLogger = boundLogger.child({ webhookId: info.wh_id })
await knex.raw(
`
const fullPayload = JSON.parse(info.evt)
boundLogger = boundLogger.child({
streamId: fullPayload.streamId,
eventName: fullPayload.event.event_name
})
const postData = { payload: fullPayload }
const signature = crypto
.createHmac('sha256', info.wh_secret || '')
.update(JSON.stringify(postData))
.digest('hex')
const postHeaders = { 'X-WEBHOOK-SIGNATURE': signature }
boundLogger.info('Calling webhook.')
const result = await makeNetworkRequest({
url: info.wh_url,
data: postData,
headersData: postHeaders,
logger: boundLogger
})
boundLogger.info({ result }, `Received response from webhook.`)
if (!result.success) {
throw new WebhookError(
result.error,
'Calling webhook was unsuccessful.',
result.responseCode,
result.responseBody
)
}
await db.raw(
`
UPDATE webhooks_events
SET
"status" = 2,
@ -93,18 +97,18 @@ async function doTask(task) {
"statusInfo" = 'Webhook called'
WHERE "id" = ?
`,
[task.id]
)
} catch (err) {
switch (err.constructor) {
case WebhookError:
boundLogger.warn({ err }, 'Failed to trigger webhook event.')
break
default:
boundLogger.error(err, 'Failed to trigger webhook event.')
}
await knex.raw(
`
[task.id]
)
} catch (err) {
switch (err.constructor) {
case WebhookError:
boundLogger.warn({ err }, 'Failed to trigger webhook event.')
break
default:
boundLogger.error(err, 'Failed to trigger webhook event.')
}
await db.raw(
`
UPDATE webhooks_events
SET
"status" = 3,
@ -112,41 +116,43 @@ async function doTask(task) {
"statusInfo" = ?
WHERE "id" = ?
`,
[err.toString(), task.id]
)
metrics.metricOperationErrors.labels('webhook').inc()
}
}
async function tick() {
if (shouldExit) {
process.exit(0)
[err.toString(), task.id]
)
metrics.metricOperationErrors.labels('webhook').inc()
}
}
try {
const task = await startTask()
fs.writeFile(HEALTHCHECK_FILE_PATH, '' + Date.now(), () => {})
if (!task) {
setTimeout(tick, 1000)
return
const tickFactory =
({ doTask, startTask, tick }) =>
async () => {
if (shouldExit) {
process.exit(0)
}
const metricDurationEnd = metrics.metricDuration.startTimer()
try {
const task = await startTask()
await doTask(task)
fs.writeFile(HEALTHCHECK_FILE_PATH, '' + Date.now(), () => {})
metricDurationEnd({ op: 'webhook' })
if (!task) {
setTimeout(tick, 1000)
return
}
// Check for another task very soon
setTimeout(tick, 10)
} catch (err) {
metrics.metricOperationErrors.labels('main_loop').inc()
logger.error(err, 'Error executing task')
setTimeout(tick, 5000)
const metricDurationEnd = metrics.metricDuration.startTimer()
await doTask(task)
metricDurationEnd({ op: 'webhook' })
// Check for another task very soon
setTimeout(tick, 10)
} catch (err) {
metrics.metricOperationErrors.labels('main_loop').inc()
logger.error(err, 'Error executing task')
setTimeout(tick, 5000)
}
}
}
async function main() {
logger.info('Starting Webhook Service...')
@ -157,6 +163,11 @@ async function main() {
})
metrics.initPrometheusMetrics()
const tick = tickFactory({
doTask: doTaskFactory({ db: knex }),
startTask: startTaskFactory({ db: knex }),
tick: (...args) => tick(...args)
})
tick()
}