diff --git a/src/hg.js b/src/hg.js index 40bbc3b..a90c7d5 100644 --- a/src/hg.js +++ b/src/hg.js @@ -6,7 +6,6 @@ import request from "request-json"; import parse from "peoplestring-parse"; const HG_HOST = "https://hg.mozilla.org/"; -const HG_REPO = "projects/nss"; async function jsonRequest(url) { let client = request.createClient(HG_HOST); @@ -22,10 +21,10 @@ async function jsonRequest(url) { }); } -async function shortenRevision(revision) { +async function shortenRevision(revision, repo) { async function tryShortenedRevisionHash(length) { let shortened = revision.slice(0, length); - let json = jsonRequest(HG_REPO + "/json-pushes?changeset=" + shortened); + let json = jsonRequest(repo + "/json-pushes?changeset=" + shortened); if (typeof(json) == "object") { return shortened; @@ -37,8 +36,8 @@ async function shortenRevision(revision) { return tryShortenedRevisionHash(12); }; -async function fetchChangesets(revision) { - let json = await jsonRequest(HG_REPO + "/json-pushes?full&changeset=" + revision); +async function fetchChangesets(revision, repo) { + let json = await jsonRequest(repo + "/json-pushes?full&changeset=" + revision); let id = Object.keys(json)[0]; let changesets = json[id].changesets; @@ -47,8 +46,8 @@ async function fetchChangesets(revision) { changeset.author = parse(changeset.author); changeset.desc = changeset.desc.split("\n")[0]; - let short_rev = await shortenRevision(changeset.node); - changeset.href = HG_HOST + HG_REPO + "/rev/" + short_rev; + let short_rev = await shortenRevision(changeset.node, repo); + changeset.href = HG_HOST + repo + "/rev/" + short_rev; } return changesets; diff --git a/src/index.js b/src/index.js index d4ff8eb..29bcf85 100644 --- a/src/index.js +++ b/src/index.js @@ -47,27 +47,17 @@ function parseRevision(routes) { return matches[1]; } -tcc.onTaskDefined(async function (msg) { - let revision = parseRevision(msg.routes); - if (!revision) { - return; - } +tcc.onRevisionPushed(async function (msg) { + let {heads} = msg.payload.payload; - let taskId = msg.payload.status.taskId; - let task = await tcc.fetchTask(taskId); - let th = task.extra.treeherder; + for (let head of heads) { + let changesets = await hg.fetchChangesets(head, msg.routingKey); - // Check for decision tasks. - if (th.symbol != "D") { - return; - } - - let level = colors.blue("push"); - let changesets = await hg.fetchChangesets(revision); - - for (let changeset of changesets) { - let branch = changeset.branch == "default" ? "" : colors.gray(` ${changeset.branch}`); - irc.say(`[${level}${branch}] ${changeset.href} — ${changeset.author.name} — ${changeset.desc}`); + for (let changeset of changesets) { + let level = colors.blue("push"); + let branch = changeset.branch == "default" ? "" : colors.gray(` ${changeset.branch}`); + irc.say(`[${level}${branch}] ${changeset.href} — ${changeset.author.name} — ${changeset.desc}`); + } } }); @@ -98,7 +88,7 @@ tcc.onTaskFailed(async function (msg) { let platform = PLATFORMS[th.build.platform] || th.build.platform; // Fetch changesets. - let changesets = await hg.fetchChangesets(revision); + let changesets = await hg.fetchChangesets(revision, "projects/nss"); let authors = changesets.map(changeset => changeset.author); let url = TASK_INSPECTOR_URL + taskId; let level = colors.red("failure"); diff --git a/src/tcc.js b/src/tcc.js index f5996d9..d7cf80d 100644 --- a/src/tcc.js +++ b/src/tcc.js @@ -4,26 +4,37 @@ import taskcluster from "taskcluster-client"; +const HG_EXCHANGE = "exchange/hgpushes/v1"; + let queueEvents = new taskcluster.QueueEvents(); let queue = new taskcluster.Queue(); -function onTaskDefined(callback) { - onTaskEvent("taskDefined", callback); -} - -function onTaskFailed(callback) { - onTaskEvent("taskFailed", callback); -} - -function onTaskEvent(type, callback) { - let listener = new taskcluster.PulseListener({ +function createListener() { + return new taskcluster.PulseListener({ credentials: { username: process.env.PG_USERNAME, password: process.env.PG_PASSWORD } }); +} - listener.bind(queueEvents[type]({ +function onRevisionPushed(callback) { + let listener = createListener(); + + listener.bind({exchange: HG_EXCHANGE, routingKeyPattern: "projects/nspr"}); + listener.bind({exchange: HG_EXCHANGE, routingKeyPattern: "projects/nss"}); + + listener.on("message", callback); + + listener.connect().then(() => { + return listener.resume(); + }); +} + +function onTaskFailed(callback) { + let listener = createListener(); + + listener.bind(queueEvents.taskFailed({ provisionerId: "aws-provisioner-v1", workerType: "hg-worker" })); @@ -39,6 +50,6 @@ async function fetchTask(taskId) { return queue.task(taskId); } -module.exports.onTaskDefined = onTaskDefined; +module.exports.onRevisionPushed = onRevisionPushed; module.exports.onTaskFailed = onTaskFailed; module.exports.fetchTask = fetchTask;