зеркало из https://github.com/mozilla/gecko-dev.git
Bug 1914331 - Support PPA-over-DAP-over-OHTTP r=Gijs,dmueller
- Update the DAPTelemetrySender to provision DAP HPKE keys from prefs and allow Nimbus to update them if a key-rotation is needed. This removes fetching of such keys from the '/hpke_config' endpoints. The DAP preferences are also renamed slightly for consistency. - The DAPTelemetrySender can optionally take an OHTTP configuration which can be used to upload reports. - The PrivateAttributionService component now requires OHTTP for DAP uploads and uses the servers that the Shopping component configures. - Add support to ObliviousHTTP.sys.mjs to allow uploading from ArrayBuffers and also to support the PUT verb. - The DAPTelemetrySender interface is updated to pass extra options in an options bag parameter. Differential Revision: https://phabricator.services.mozilla.com/D220738
This commit is contained in:
Родитель
18e1bd6a28
Коммит
eec1bf3d5f
|
@ -11,6 +11,8 @@ import { AppConstants } from "resource://gre/modules/AppConstants.sys.mjs";
|
|||
ChromeUtils.defineESModuleGetters(lazy, {
|
||||
IndexedDB: "resource://gre/modules/IndexedDB.sys.mjs",
|
||||
DAPTelemetrySender: "resource://gre/modules/DAPTelemetrySender.sys.mjs",
|
||||
HPKEConfigManager: "resource://gre/modules/HPKEConfigManager.sys.mjs",
|
||||
setTimeout: "resource://gre/modules/Timer.sys.mjs",
|
||||
});
|
||||
|
||||
XPCOMUtils.defineLazyPreferenceGetter(
|
||||
|
@ -27,6 +29,17 @@ XPCOMUtils.defineLazyPreferenceGetter(
|
|||
true
|
||||
);
|
||||
|
||||
XPCOMUtils.defineLazyPreferenceGetter(
|
||||
lazy,
|
||||
"gOhttpRelayUrl",
|
||||
"toolkit.shopping.ohttpRelayURL"
|
||||
);
|
||||
XPCOMUtils.defineLazyPreferenceGetter(
|
||||
lazy,
|
||||
"gOhttpGatewayKeyUrl",
|
||||
"toolkit.shopping.ohttpConfigURL"
|
||||
);
|
||||
|
||||
const MAX_CONVERSIONS = 2;
|
||||
const DAY_IN_MILLI = 1000 * 60 * 60 * 24;
|
||||
const CONVERSION_RESET_MILLI = 7 * DAY_IN_MILLI;
|
||||
|
@ -36,10 +49,16 @@ const DAP_TIMEOUT_MILLI = 30000;
|
|||
*
|
||||
*/
|
||||
export class PrivateAttributionService {
|
||||
constructor({ dapTelemetrySender, dateProvider, testForceEnabled } = {}) {
|
||||
constructor({
|
||||
dapTelemetrySender,
|
||||
dateProvider,
|
||||
testForceEnabled,
|
||||
testDapOptions,
|
||||
} = {}) {
|
||||
this._dapTelemetrySender = dapTelemetrySender;
|
||||
this._dateProvider = dateProvider ?? Date;
|
||||
this._testForceEnabled = testForceEnabled;
|
||||
this._testDapOptions = testDapOptions;
|
||||
|
||||
this.dbName = "PrivateAttribution";
|
||||
this.impressionStoreName = "impressions";
|
||||
|
@ -260,10 +279,35 @@ export class PrivateAttributionService {
|
|||
const measurement = new Array(size).fill(0);
|
||||
measurement[index] = value;
|
||||
|
||||
let options = {
|
||||
timeout: DAP_TIMEOUT_MILLI,
|
||||
ohttp_relay: lazy.gOhttpRelayUrl,
|
||||
...this._testDapOptions,
|
||||
};
|
||||
|
||||
if (options.ohttp_relay) {
|
||||
// Fetch the OHTTP-Gateway-HPKE key if not provided yet.
|
||||
if (!options.ohttp_hpke) {
|
||||
const controller = new AbortController();
|
||||
lazy.setTimeout(() => controller.abort(), DAP_TIMEOUT_MILLI);
|
||||
|
||||
options.ohttp_hpke = await lazy.HPKEConfigManager.get(
|
||||
lazy.gOhttpGatewayKeyUrl,
|
||||
{
|
||||
maxAge: DAY_IN_MILLI,
|
||||
abortSignal: controller.signal,
|
||||
}
|
||||
);
|
||||
}
|
||||
} else if (!this._testForceEnabled) {
|
||||
// Except for testing, do no allow PPA to bypass OHTTP.
|
||||
throw new Error("PPA requires an OHTTP relay for submission");
|
||||
}
|
||||
|
||||
await this.dapTelemetrySender.sendDAPMeasurement(
|
||||
task,
|
||||
measurement,
|
||||
DAP_TIMEOUT_MILLI
|
||||
options
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -21,14 +21,8 @@ const BinaryInputStream = Components.Constructor(
|
|||
"setInputStream"
|
||||
);
|
||||
|
||||
const BinaryOutputStream = Components.Constructor(
|
||||
"@mozilla.org/binaryoutputstream;1",
|
||||
"nsIBinaryOutputStream",
|
||||
"setOutputStream"
|
||||
);
|
||||
|
||||
const PREF_LEADER = "toolkit.telemetry.dap_leader";
|
||||
const PREF_HELPER = "toolkit.telemetry.dap_helper";
|
||||
const PREF_LEADER = "toolkit.telemetry.dap.leader.url";
|
||||
const PREF_HELPER = "toolkit.telemetry.dap.helper.url";
|
||||
const TASK_ID = "DSZGMFh26hBYXNaKvhL_N4AHA3P5lDn19on1vFPBxJM";
|
||||
const MAX_CONVERSIONS = 2;
|
||||
const DAY_IN_MILLI = 1000 * 60 * 60 * 24;
|
||||
|
@ -54,7 +48,7 @@ class MockDAPTelemetrySender {
|
|||
this.receivedMeasurements = [];
|
||||
}
|
||||
|
||||
async sendDAPMeasurement(task, measurement, timeout) {
|
||||
async sendDAPMeasurement(task, measurement, { timeout }) {
|
||||
this.receivedMeasurements.push({
|
||||
task,
|
||||
measurement,
|
||||
|
@ -68,14 +62,6 @@ class MockServer {
|
|||
this.receivedReports = [];
|
||||
|
||||
const server = new HttpServer();
|
||||
server.registerPathHandler(
|
||||
"/leader_endpoint/hpke_config",
|
||||
this.hpkeConfigHandler
|
||||
);
|
||||
server.registerPathHandler(
|
||||
"/helper_endpoint/hpke_config",
|
||||
this.hpkeConfigHandler
|
||||
);
|
||||
|
||||
server.registerPrefixHandler(
|
||||
"/leader_endpoint/tasks/",
|
||||
|
@ -105,26 +91,6 @@ class MockServer {
|
|||
await this._server.stop();
|
||||
}
|
||||
|
||||
hpkeConfigHandler(request, response) {
|
||||
let config_bytes;
|
||||
if (request.path.startsWith("/leader")) {
|
||||
config_bytes = new Uint8Array([
|
||||
0, 41, 47, 0, 32, 0, 1, 0, 1, 0, 32, 11, 33, 206, 33, 131, 56, 220, 82,
|
||||
153, 110, 228, 200, 53, 98, 210, 38, 177, 197, 252, 198, 36, 201, 86,
|
||||
121, 169, 238, 220, 34, 143, 112, 177, 10,
|
||||
]);
|
||||
} else {
|
||||
config_bytes = new Uint8Array([
|
||||
0, 41, 42, 0, 32, 0, 1, 0, 1, 0, 32, 28, 62, 242, 195, 117, 7, 173, 149,
|
||||
250, 15, 139, 178, 86, 241, 117, 143, 75, 26, 57, 60, 88, 130, 199, 175,
|
||||
195, 9, 241, 130, 61, 47, 215, 101,
|
||||
]);
|
||||
}
|
||||
response.setHeader("Content-Type", "application/dap-hpke-config");
|
||||
let bos = new BinaryOutputStream(response.bodyOutputStream);
|
||||
bos.writeByteArray(config_bytes);
|
||||
}
|
||||
|
||||
uploadHandler(request, response) {
|
||||
let body = new BinaryInputStream(request.bodyInputStream);
|
||||
|
||||
|
@ -166,6 +132,7 @@ add_task(async function testSuccessfulConversion() {
|
|||
const privateAttribution = new PrivateAttributionService({
|
||||
dapTelemetrySender: mockSender,
|
||||
testForceEnabled: true,
|
||||
testDapOptions: { ohttp_relay: null },
|
||||
});
|
||||
|
||||
const sourceHost = "source.test";
|
||||
|
@ -220,6 +187,7 @@ add_task(async function testImpressionsOnMultipleSites() {
|
|||
dapTelemetrySender: mockSender,
|
||||
dateProvider: mockDateProvider,
|
||||
testForceEnabled: true,
|
||||
testDapOptions: { ohttp_relay: null },
|
||||
});
|
||||
|
||||
const sourceHost1 = "source-multiple-1.test";
|
||||
|
@ -308,6 +276,7 @@ add_task(async function testConversionWithoutImpression() {
|
|||
const privateAttribution = new PrivateAttributionService({
|
||||
dapTelemetrySender: mockSender,
|
||||
testForceEnabled: true,
|
||||
testDapOptions: { ohttp_relay: null },
|
||||
});
|
||||
|
||||
const sourceHost = "source-no-impression.test";
|
||||
|
@ -337,6 +306,7 @@ add_task(async function testSelectionByInteractionType() {
|
|||
const privateAttribution = new PrivateAttributionService({
|
||||
dapTelemetrySender: mockSender,
|
||||
testForceEnabled: true,
|
||||
testDapOptions: { ohttp_relay: null },
|
||||
});
|
||||
|
||||
const sourceHost = "source-by-type.test";
|
||||
|
@ -388,6 +358,7 @@ add_task(async function testSelectionBySourceSite() {
|
|||
const privateAttribution = new PrivateAttributionService({
|
||||
dapTelemetrySender: mockSender,
|
||||
testForceEnabled: true,
|
||||
testDapOptions: { ohttp_relay: null },
|
||||
});
|
||||
|
||||
const sourceHost1 = "source-by-site-1.test";
|
||||
|
@ -455,6 +426,7 @@ add_task(async function testSelectionByAdIdentifier() {
|
|||
const privateAttribution = new PrivateAttributionService({
|
||||
dapTelemetrySender: mockSender,
|
||||
testForceEnabled: true,
|
||||
testDapOptions: { ohttp_relay: null },
|
||||
});
|
||||
|
||||
const sourceHost = "source-by-ad-id.test";
|
||||
|
@ -524,6 +496,7 @@ add_task(async function testExpiredImpressions() {
|
|||
dapTelemetrySender: mockSender,
|
||||
dateProvider: mockDateProvider,
|
||||
testForceEnabled: true,
|
||||
testDapOptions: { ohttp_relay: null },
|
||||
});
|
||||
|
||||
const sourceHost = "source-expired.test";
|
||||
|
@ -567,6 +540,7 @@ add_task(async function testConversionBudget() {
|
|||
const privateAttribution = new PrivateAttributionService({
|
||||
dapTelemetrySender: mockSender,
|
||||
testForceEnabled: true,
|
||||
testDapOptions: { ohttp_relay: null },
|
||||
});
|
||||
|
||||
const sourceHost = "source-budget.test";
|
||||
|
@ -622,6 +596,7 @@ add_task(async function testHistogramSize() {
|
|||
const privateAttribution = new PrivateAttributionService({
|
||||
dapTelemetrySender: mockSender,
|
||||
testForceEnabled: true,
|
||||
testDapOptions: { ohttp_relay: null },
|
||||
});
|
||||
|
||||
const sourceHost = "source-histogram.test";
|
||||
|
@ -658,12 +633,161 @@ add_task(async function testHistogramSize() {
|
|||
add_task(async function testWithRealDAPSender() {
|
||||
Services.fog.testResetFOG();
|
||||
|
||||
// Omit mocking DAP telemetry sender in this test to defend against mock sender getting out of sync
|
||||
// Omit mocking DAP telemetry sender in this test to defend against mock
|
||||
// sender getting out of sync
|
||||
const mockServer = new MockServer();
|
||||
mockServer.start();
|
||||
|
||||
const privateAttribution = new PrivateAttributionService({
|
||||
testForceEnabled: true,
|
||||
testDapOptions: { ohttp_relay: null },
|
||||
});
|
||||
|
||||
const sourceHost = "source-telemetry.test";
|
||||
const targetHost = "target-telemetry.test";
|
||||
const adIdentifier = "ad_identifier_telemetry";
|
||||
const adIndex = 4;
|
||||
|
||||
await privateAttribution.onAttributionEvent(
|
||||
sourceHost,
|
||||
"view",
|
||||
adIndex,
|
||||
adIdentifier,
|
||||
targetHost
|
||||
);
|
||||
|
||||
await privateAttribution.onAttributionConversion(
|
||||
targetHost,
|
||||
TASK_ID,
|
||||
HISTOGRAM_SIZE,
|
||||
LOOKBACK_DAYS,
|
||||
"view",
|
||||
[adIdentifier],
|
||||
[sourceHost]
|
||||
);
|
||||
|
||||
await mockServer.stop();
|
||||
|
||||
Assert.equal(mockServer.receivedReports.length, 1);
|
||||
|
||||
const expectedReport = {
|
||||
contentType: "application/dap-report",
|
||||
size: 1318,
|
||||
};
|
||||
|
||||
const receivedReport = mockServer.receivedReports.pop();
|
||||
Assert.deepEqual(receivedReport, expectedReport);
|
||||
});
|
||||
|
||||
function BinaryHttpResponse(status, headerNames, headerValues, content) {
|
||||
this.status = status;
|
||||
this.headerNames = headerNames;
|
||||
this.headerValues = headerValues;
|
||||
this.content = content;
|
||||
}
|
||||
|
||||
BinaryHttpResponse.prototype = {
|
||||
QueryInterface: ChromeUtils.generateQI(["nsIBinaryHttpResponse"]),
|
||||
};
|
||||
|
||||
class MockOHTTPBackend {
|
||||
HOST;
|
||||
#ohttpServer;
|
||||
|
||||
constructor() {
|
||||
let ohttp = Cc["@mozilla.org/network/oblivious-http;1"].getService(
|
||||
Ci.nsIObliviousHttp
|
||||
);
|
||||
this.#ohttpServer = ohttp.server();
|
||||
|
||||
let httpServer = new HttpServer(); // This is the OHTTP relay endpoint.
|
||||
httpServer.registerPathHandler(
|
||||
new URL(this.getRelayAddress()).pathname,
|
||||
this.handle_relay_request.bind(this)
|
||||
);
|
||||
httpServer.start(-1);
|
||||
this.HOST = `localhost:${httpServer.identity.primaryPort}`;
|
||||
registerCleanupFunction(() => {
|
||||
return new Promise(resolve => {
|
||||
httpServer.stop(resolve);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
getRelayAddress() {
|
||||
return `http://${this.HOST}/relay/`;
|
||||
}
|
||||
|
||||
getOHTTPConfig() {
|
||||
return this.#ohttpServer.encodedConfig;
|
||||
}
|
||||
|
||||
async handle_relay_request(request, response) {
|
||||
let inputStream = Cc["@mozilla.org/binaryinputstream;1"].createInstance(
|
||||
Ci.nsIBinaryInputStream
|
||||
);
|
||||
inputStream.setInputStream(request.bodyInputStream);
|
||||
let requestBody = inputStream.readByteArray(inputStream.available());
|
||||
let ohttpRequest = this.#ohttpServer.decapsulate(requestBody);
|
||||
let bhttp = Cc["@mozilla.org/network/binary-http;1"].getService(
|
||||
Ci.nsIBinaryHttp
|
||||
);
|
||||
let decodedRequest = bhttp.decodeRequest(ohttpRequest.request);
|
||||
response.processAsync();
|
||||
let real_destination =
|
||||
decodedRequest.scheme +
|
||||
"://" +
|
||||
decodedRequest.authority +
|
||||
decodedRequest.path;
|
||||
let innerBody = new Uint8Array(decodedRequest.content);
|
||||
let innerRequestHeaders = Object.fromEntries(
|
||||
decodedRequest.headerNames.map((name, index) => {
|
||||
return [name, decodedRequest.headerValues[index]];
|
||||
})
|
||||
);
|
||||
let innerResponse = await fetch(real_destination, {
|
||||
method: decodedRequest.method,
|
||||
headers: innerRequestHeaders,
|
||||
body: innerBody,
|
||||
});
|
||||
let bytes = new Uint8Array(await innerResponse.arrayBuffer());
|
||||
let binaryResponse = new BinaryHttpResponse(
|
||||
innerResponse.status,
|
||||
["Content-Type"],
|
||||
["application/octet-stream"],
|
||||
bytes
|
||||
);
|
||||
let encResponse = ohttpRequest.encapsulate(
|
||||
bhttp.encodeResponse(binaryResponse)
|
||||
);
|
||||
response.setStatusLine(request.httpVersion, 200, "OK");
|
||||
response.setHeader("Content-Type", "message/ohttp-res", false);
|
||||
|
||||
let bstream = Cc["@mozilla.org/binaryoutputstream;1"].createInstance(
|
||||
Ci.nsIBinaryOutputStream
|
||||
);
|
||||
bstream.setOutputStream(response.bodyOutputStream);
|
||||
bstream.writeByteArray(encResponse);
|
||||
response.finish();
|
||||
}
|
||||
}
|
||||
|
||||
add_task(async function testWithRealDAPSenderAndOHTTP() {
|
||||
Services.fog.testResetFOG();
|
||||
|
||||
const mockServer = new MockServer();
|
||||
mockServer.start();
|
||||
|
||||
let ohttpBackend = new MockOHTTPBackend();
|
||||
|
||||
let testDapOptions = {
|
||||
ohttp_relay: ohttpBackend.getRelayAddress(),
|
||||
ohttp_hpke: ohttpBackend.getOHTTPConfig(),
|
||||
};
|
||||
|
||||
const privateAttribution = new PrivateAttributionService({
|
||||
testForceEnabled: true,
|
||||
testDapOptions,
|
||||
});
|
||||
|
||||
const sourceHost = "source-telemetry.test";
|
||||
|
|
|
@ -565,6 +565,7 @@ pref("toolkit.telemetry.unified", true);
|
|||
|
||||
// DAP related preferences
|
||||
pref("toolkit.telemetry.dap_enabled", false);
|
||||
pref("toolkit.telemetry.dap.logLevel", "Warn");
|
||||
// Verification tasks
|
||||
pref("toolkit.telemetry.dap_task1_enabled", false);
|
||||
pref("toolkit.telemetry.dap_task1_taskid", "");
|
||||
|
@ -573,14 +574,14 @@ pref("toolkit.telemetry.dap_visit_counting_enabled", false);
|
|||
// Note: format of patterns is "<proto>://<host>/<path>"
|
||||
// See https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/Match_patterns
|
||||
pref("toolkit.telemetry.dap_visit_counting_experiment_list", "[]");
|
||||
// Leader endpoint for the DAP protocol
|
||||
pref("toolkit.telemetry.dap_leader", "https://dap-09-3.api.divviup.org/");
|
||||
// Not used for anything. Just additional information.
|
||||
pref("toolkit.telemetry.dap_leader_owner", "ISRG");
|
||||
// Second DAP server. Only two are currently supported.
|
||||
pref("toolkit.telemetry.dap_helper", "https://dap.services.mozilla.com");
|
||||
pref("toolkit.telemetry.dap_helper_owner", "Mozilla");
|
||||
pref("toolkit.telemetry.dap.logLevel", "Warn");
|
||||
// DAP protocol Leader endpoint. Operated by DivviUp/ISRG.
|
||||
// - HPKE key is base64url-encoded response of the /hpke_config path on server.
|
||||
pref("toolkit.telemetry.dap.leader.url", "https://dap-09-3.api.divviup.org");
|
||||
pref("toolkit.telemetry.dap.leader.hpke", "ACkAACAAAQABACDk8wgwe2-TqHyaL74uqjVWMcF1zi9pxiwQhu4aPwncYw");
|
||||
// DAP protocol Helper endpoint. Operated by Mozilla.
|
||||
// - HPKE key is base64url-encoded response of the /hpke_config path on server.
|
||||
pref("toolkit.telemetry.dap.helper.url", "https://dap.services.mozilla.com");
|
||||
pref("toolkit.telemetry.dap.helper.hpke", "ACkAACAAAQABACAucqWdIQRN6BxumPBRXIlg2JsxcznwWX7vyqzM3cjuQA");
|
||||
|
||||
// Controls telemetry logs for the Translations feature throughout Firefox.
|
||||
pref("toolkit.telemetry.translations.logLevel", "Error");
|
||||
|
|
|
@ -5,6 +5,17 @@
|
|||
let knownConfigs = new Map();
|
||||
|
||||
export class HPKEConfigManager {
|
||||
/**
|
||||
* Decodes a base64url-encoded key string.
|
||||
* @param {string} aBase64Key
|
||||
* @returns {Uint8Array}
|
||||
*/
|
||||
static decodeKey(aBase64Key) {
|
||||
return new Uint8Array(
|
||||
ChromeUtils.base64URLDecode(aBase64Key, { padding: "ignore" })
|
||||
);
|
||||
}
|
||||
|
||||
static async get(aURL, aOptions = {}) {
|
||||
// If we're in a child, forward to the parent.
|
||||
let { remoteType } = Services.appinfo;
|
||||
|
|
|
@ -28,6 +28,12 @@ const StringInputStream = Components.Constructor(
|
|||
"setData"
|
||||
);
|
||||
|
||||
const ArrayBufferInputStream = Components.Constructor(
|
||||
"@mozilla.org/io/arraybuffer-input-stream;1",
|
||||
"nsIArrayBufferInputStream",
|
||||
"setData"
|
||||
);
|
||||
|
||||
function readFromStream(stream, count) {
|
||||
let binaryStream = new BinaryInputStream(stream);
|
||||
let arrayBuffer = new ArrayBuffer(count);
|
||||
|
@ -66,9 +72,9 @@ export class ObliviousHTTP {
|
|||
* The URL of the request we want to make over the relay.
|
||||
* @param {object} options
|
||||
* @param {string} options.method
|
||||
* The HTTP method to use for the inner request. Only GET and POST
|
||||
* are supported right now.
|
||||
* @param {string} options.body
|
||||
* The HTTP method to use for the inner request. Only GET, POST, and PUT are
|
||||
* supported right now.
|
||||
* @param {string|ArrayBuffer} options.body
|
||||
* The body content to send over the request.
|
||||
* @param {object} options.headers
|
||||
* The request headers to set. Each property of the object represents
|
||||
|
@ -91,7 +97,7 @@ export class ObliviousHTTP {
|
|||
obliviousHTTPRelay,
|
||||
config,
|
||||
requestURL,
|
||||
{ method, body, headers, signal, abortCallback } = {}
|
||||
{ method = "GET", body, headers, signal, abortCallback } = {}
|
||||
) {
|
||||
let relayURI = Services.io.newURI(obliviousHTTPRelay);
|
||||
let requestURI = Services.io.newURI(requestURL);
|
||||
|
@ -99,18 +105,27 @@ export class ObliviousHTTP {
|
|||
.newChannel(relayURI, requestURI, config)
|
||||
.QueryInterface(Ci.nsIHttpChannel);
|
||||
|
||||
if (method == "POST") {
|
||||
if (method == "POST" || method == "PUT") {
|
||||
let uploadChannel = obliviousHttpChannel.QueryInterface(
|
||||
Ci.nsIUploadChannel2
|
||||
);
|
||||
let bodyStream = new StringInputStream(body, body.length);
|
||||
let bodyStream;
|
||||
if (typeof body === "string") {
|
||||
bodyStream = new StringInputStream(body, body.length);
|
||||
} else if (body instanceof ArrayBuffer) {
|
||||
bodyStream = new ArrayBufferInputStream(body, 0, body.byteLength);
|
||||
} else {
|
||||
throw new Error("ohttpRequest got unexpected body payload type.");
|
||||
}
|
||||
uploadChannel.explicitSetUploadStream(
|
||||
bodyStream,
|
||||
null,
|
||||
-1,
|
||||
"POST",
|
||||
method,
|
||||
false
|
||||
);
|
||||
} else if (method != "GET") {
|
||||
throw new Error(`Unsupported HTTP verb ${method}`);
|
||||
}
|
||||
|
||||
for (let headerName of Object.keys(headers)) {
|
||||
|
|
|
@ -2323,6 +2323,36 @@ dapTelemetry:
|
|||
type: json
|
||||
description: A list of experiments with URLs for which we want to count visits.
|
||||
|
||||
dapAggregators:
|
||||
description: Aggregator server configuration to use for submitting DAP reports.
|
||||
owner: tcampbell@mozilla.com
|
||||
hasExposure: false
|
||||
variables:
|
||||
leader_url:
|
||||
description: The URL of the DAP Leader server where reports are submitted.
|
||||
type: string
|
||||
setPref:
|
||||
branch: default
|
||||
pref: "toolkit.telemetry.dap.leader.url"
|
||||
leader_hpke:
|
||||
description: The base64-encode HPKE key to encrypt leader shares with.
|
||||
type: string
|
||||
setPref:
|
||||
branch: default
|
||||
pref: "toolkit.telemetry.dap.leader.hpke"
|
||||
helper_url:
|
||||
description: The URL of the DAP Helper server.
|
||||
type: string
|
||||
setPref:
|
||||
branch: default
|
||||
pref: "toolkit.telemetry.dap.helper.url"
|
||||
helper_hpke:
|
||||
description: The base64-encode HPKE key to encrypt helper shares with.
|
||||
type: string
|
||||
setPref:
|
||||
branch: default
|
||||
pref: "toolkit.telemetry.dap.helper.hpke"
|
||||
|
||||
etpLevel2PBMPref:
|
||||
description: The pref that controls the ETP level 2 list in the private browsing mode
|
||||
owner: tihuang@mozilla.com
|
||||
|
|
|
@ -18,13 +18,24 @@ ChromeUtils.defineESModuleGetters(lazy, {
|
|||
NimbusFeatures: "resource://nimbus/ExperimentAPI.sys.mjs",
|
||||
DAPVisitCounter: "resource://gre/modules/DAPVisitCounter.sys.mjs",
|
||||
setTimeout: "resource://gre/modules/Timer.sys.mjs",
|
||||
ObliviousHTTP: "resource://gre/modules/ObliviousHTTP.sys.mjs",
|
||||
});
|
||||
|
||||
const PREF_LEADER = "toolkit.telemetry.dap_leader";
|
||||
const PREF_HELPER = "toolkit.telemetry.dap_helper";
|
||||
|
||||
XPCOMUtils.defineLazyPreferenceGetter(lazy, "LEADER", PREF_LEADER, undefined);
|
||||
XPCOMUtils.defineLazyPreferenceGetter(lazy, "HELPER", PREF_HELPER, undefined);
|
||||
XPCOMUtils.defineLazyPreferenceGetter(
|
||||
lazy,
|
||||
"gDapEndpoint",
|
||||
"toolkit.telemetry.dap.leader.url"
|
||||
);
|
||||
XPCOMUtils.defineLazyPreferenceGetter(
|
||||
lazy,
|
||||
"gLeaderHpke",
|
||||
"toolkit.telemetry.dap.leader.hpke"
|
||||
);
|
||||
XPCOMUtils.defineLazyPreferenceGetter(
|
||||
lazy,
|
||||
"gHelperHpke",
|
||||
"toolkit.telemetry.dap.helper.hpke"
|
||||
);
|
||||
|
||||
/**
|
||||
* The purpose of this singleton is to handle sending of DAP telemetry data.
|
||||
|
@ -58,16 +69,12 @@ export const DAPTelemetrySender = new (class {
|
|||
/**
|
||||
* @typedef {object} Task
|
||||
* @property {string} id - The task ID, base 64 encoded.
|
||||
* @property {string} leader_endpoint - Base URL for the leader.
|
||||
* @property {string} helper_endpoint - Base URL for the helper.
|
||||
* @property {number} time_precision - Timestamps (in s) are rounded to the nearest multiple of this.
|
||||
* @property {measurementtype} measurement_type - Defines measurements and aggregations used by this task. Effectively specifying the VDAF.
|
||||
*/
|
||||
let task = {
|
||||
// this is testing task 1
|
||||
id: task1_id,
|
||||
leader_endpoint: null,
|
||||
helper_endpoint: null,
|
||||
time_precision: 300,
|
||||
measurement_type: "vecu8",
|
||||
};
|
||||
|
@ -80,7 +87,7 @@ export const DAPTelemetrySender = new (class {
|
|||
|
||||
lazy.NimbusFeatures.dapTelemetry.onUpdate(async () => {
|
||||
if (typeof this.counters !== "undefined") {
|
||||
await this.sendTestReports(tasks, 30 * 1000, "nimbus-update");
|
||||
await this.sendTestReports(tasks, { reason: "nimbus-update" });
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -88,7 +95,10 @@ export const DAPTelemetrySender = new (class {
|
|||
this._asyncShutdownBlocker = async () => {
|
||||
lazy.logConsole.debug(`Sending on shutdown.`);
|
||||
// Shorter timeout to prevent crashing due to blocking shutdown
|
||||
await this.sendTestReports(tasks, 2 * 1000, "shutdown");
|
||||
await this.sendTestReports(tasks, {
|
||||
timeout: 2_000,
|
||||
reason: "shutdown",
|
||||
});
|
||||
};
|
||||
|
||||
lazy.AsyncShutdown.quitApplicationGranted.addBlocker(
|
||||
|
@ -98,7 +108,7 @@ export const DAPTelemetrySender = new (class {
|
|||
}
|
||||
}
|
||||
|
||||
async sendTestReports(tasks, timeout) {
|
||||
async sendTestReports(tasks, options = {}) {
|
||||
for (let task of tasks) {
|
||||
let measurement;
|
||||
if (task.measurement_type == "u8") {
|
||||
|
@ -110,13 +120,13 @@ export const DAPTelemetrySender = new (class {
|
|||
measurement[19] += 1;
|
||||
}
|
||||
|
||||
await this.sendDAPMeasurement(task, measurement, timeout);
|
||||
await this.sendDAPMeasurement(task, measurement, options);
|
||||
}
|
||||
}
|
||||
|
||||
async timedSendTestReports(tasks) {
|
||||
lazy.logConsole.debug("Sending on timer.");
|
||||
await this.sendTestReports(tasks, 30 * 1000);
|
||||
await this.sendTestReports(tasks);
|
||||
lazy.setTimeout(
|
||||
() => this.timedSendTestReports(tasks),
|
||||
this.timeout_value()
|
||||
|
@ -128,38 +138,66 @@ export const DAPTelemetrySender = new (class {
|
|||
return MINUTE * (9 + Math.random() * 2); // 9 - 11 minutes
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal testing function to verify the DAP aggregator keys match current
|
||||
* values advertised by servers.
|
||||
*/
|
||||
async checkHpkeKeys() {
|
||||
async function check_key(url, expected) {
|
||||
let response = await fetch(url + "/hpke_config");
|
||||
let body = await response.arrayBuffer();
|
||||
let actual = ChromeUtils.base64URLEncode(body, { pad: false });
|
||||
if (actual != expected) {
|
||||
throw new Error(`HPKE for ${url} does not match`);
|
||||
}
|
||||
}
|
||||
await Promise.allSettled([
|
||||
await check_key(
|
||||
Services.prefs.getStringPref("toolkit.telemetry.dap.leader.url"),
|
||||
Services.prefs.getStringPref("toolkit.telemetry.dap.leader.hpke")
|
||||
),
|
||||
await check_key(
|
||||
Services.prefs.getStringPref("toolkit.telemetry.dap.helper.url"),
|
||||
Services.prefs.getStringPref("toolkit.telemetry.dap.helper.hpke")
|
||||
),
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a DAP report for a specific task from a measurement and sends it.
|
||||
*
|
||||
* @param {Task} task
|
||||
* Definition of the task for which the measurement was taken.
|
||||
* @param {number} measurement
|
||||
* @param {number|Array<Number>} measurement
|
||||
* The measured value for which a report is generated.
|
||||
* @param {object} options
|
||||
* @param {number} options.timeout
|
||||
* The timeout for request in milliseconds. Defaults to 30s.
|
||||
* @param {string} options.reason
|
||||
* A string to indicate the reason for triggering a submission. This is
|
||||
* currently ignored and not recorded.
|
||||
* @param {string} options.ohttp_relay
|
||||
* @param {Uint8Array} options.ohttp_hpke
|
||||
* If an OHTTP relay is specified, the reports are uploaded over OHTTP.
|
||||
*/
|
||||
async sendDAPMeasurement(task, measurement, timeout) {
|
||||
task.leader_endpoint = lazy.LEADER;
|
||||
if (!task.leader_endpoint) {
|
||||
throw new Error(`Preference ${PREF_LEADER} not set`);
|
||||
}
|
||||
|
||||
task.helper_endpoint = lazy.HELPER;
|
||||
if (!task.helper_endpoint) {
|
||||
throw new Error(`Preference ${PREF_HELPER} not set`);
|
||||
}
|
||||
|
||||
async sendDAPMeasurement(task, measurement, options = {}) {
|
||||
try {
|
||||
const controller = new AbortController();
|
||||
lazy.setTimeout(() => controller.abort(), timeout);
|
||||
let report = await this.generateReport(
|
||||
task,
|
||||
measurement,
|
||||
controller.signal
|
||||
);
|
||||
lazy.setTimeout(() => controller.abort(), options.timeout ?? 30_000);
|
||||
|
||||
let keys = {
|
||||
leader_hpke: HPKEConfigManager.decodeKey(lazy.gLeaderHpke),
|
||||
helper_hpke: HPKEConfigManager.decodeKey(lazy.gHelperHpke),
|
||||
};
|
||||
|
||||
let report = this.generateReport(task, measurement, keys);
|
||||
|
||||
await this.sendReport(
|
||||
task.leader_endpoint,
|
||||
lazy.gDapEndpoint,
|
||||
task.id,
|
||||
report,
|
||||
controller.signal
|
||||
controller.signal,
|
||||
options
|
||||
);
|
||||
} catch (e) {
|
||||
if (e.name === "AbortError") {
|
||||
|
@ -172,103 +210,56 @@ export const DAPTelemetrySender = new (class {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* @typedef {object} AggregatorKeys
|
||||
* @property {Uint8Array} leader_hpke - The leader's DAP HPKE key.
|
||||
* @property {Uint8Array} helper_hpke - The helper's DAP HPKE key.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Downloads HPKE configs for endpoints and generates report.
|
||||
* Generates the encrypted DAP report.
|
||||
*
|
||||
* @param {Task} task
|
||||
* Definition of the task for which the measurement was taken.
|
||||
* @param {number} measurement
|
||||
* @param {number|Array<number>} measurement
|
||||
* The measured value for which a report is generated.
|
||||
* @returns Promise
|
||||
* @resolves {Uint8Array} The generated binary report data.
|
||||
* @rejects {Error} If an exception is thrown while generating the report.
|
||||
* @param {AggregatorKeys} keys
|
||||
* The DAP encryption keys for each aggregator.
|
||||
*
|
||||
* @returns {ArrayBuffer} The generated binary report data.
|
||||
*/
|
||||
async generateReport(task, measurement, abortSignal) {
|
||||
let [leader_config_bytes, helper_config_bytes] = await Promise.all([
|
||||
this.getHpkeConfig(
|
||||
task.leader_endpoint + "/hpke_config?task_id=" + task.id,
|
||||
abortSignal
|
||||
),
|
||||
this.getHpkeConfig(
|
||||
task.helper_endpoint + "/hpke_config?task_id=" + task.id,
|
||||
abortSignal
|
||||
),
|
||||
]);
|
||||
if (leader_config_bytes == null) {
|
||||
lazy.logConsole.error("HPKE config download failed for leader.");
|
||||
}
|
||||
if (helper_config_bytes == null) {
|
||||
lazy.logConsole.error("HPKE config download failed for helper.");
|
||||
}
|
||||
if (abortSignal.aborted) {
|
||||
throw new DOMException("HPKE config download was aborted", "AbortError");
|
||||
}
|
||||
if (leader_config_bytes === null || helper_config_bytes === null) {
|
||||
throw new Error(`HPKE config download failed.`);
|
||||
generateReport(task, measurement, keys) {
|
||||
let encoder = null;
|
||||
switch (task.measurement_type) {
|
||||
case "u8":
|
||||
encoder = Services.DAPTelemetry.GetReportU8;
|
||||
break;
|
||||
case "vecu8":
|
||||
encoder = Services.DAPTelemetry.GetReportVecU8;
|
||||
break;
|
||||
case "vecu16":
|
||||
encoder = Services.DAPTelemetry.GetReportVecU16;
|
||||
break;
|
||||
default:
|
||||
throw new Error(
|
||||
`Unknown measurement type for task ${task.id}: ${task.measurement_type}`
|
||||
);
|
||||
}
|
||||
|
||||
let task_id = new Uint8Array(
|
||||
ChromeUtils.base64URLDecode(task.id, { padding: "ignore" })
|
||||
);
|
||||
let report = {};
|
||||
if (task.measurement_type == "u8") {
|
||||
Services.DAPTelemetry.GetReportU8(
|
||||
leader_config_bytes,
|
||||
helper_config_bytes,
|
||||
measurement,
|
||||
task_id,
|
||||
task.time_precision,
|
||||
report
|
||||
);
|
||||
} else if (task.measurement_type == "vecu8") {
|
||||
Services.DAPTelemetry.GetReportVecU8(
|
||||
leader_config_bytes,
|
||||
helper_config_bytes,
|
||||
measurement,
|
||||
task_id,
|
||||
task.time_precision,
|
||||
report
|
||||
);
|
||||
} else if (task.measurement_type == "vecu16") {
|
||||
Services.DAPTelemetry.GetReportVecU16(
|
||||
leader_config_bytes,
|
||||
helper_config_bytes,
|
||||
measurement,
|
||||
task_id,
|
||||
task.time_precision,
|
||||
report
|
||||
);
|
||||
} else {
|
||||
throw new Error(
|
||||
`Unknown measurement type for task ${task.id}: ${task.measurement_type}`
|
||||
);
|
||||
}
|
||||
let reportData = new Uint8Array(report.value);
|
||||
return reportData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches TLS encoded HPKE config from a URL.
|
||||
*
|
||||
* @param {string} endpoint
|
||||
* The URL from where to get the data.
|
||||
* @returns Promise
|
||||
* @resolves {Uint8Array} The binary representation of the endpoint configuration.
|
||||
* @rejects {Error} If an exception is thrown while fetching the configuration.
|
||||
*/
|
||||
async getHpkeConfig(endpoint, abortSignal) {
|
||||
// Use HPKEConfigManager to cache config for up to 24 hr. This reduces
|
||||
// unecessary requests while limiting how long a stale config can be stuck
|
||||
// if a server change is made ungracefully.
|
||||
let buffer = await HPKEConfigManager.get(endpoint, {
|
||||
maxAge: 24 * 60 * 60 * 1000,
|
||||
abortSignal,
|
||||
});
|
||||
if (buffer === null) {
|
||||
return null;
|
||||
}
|
||||
let hpke_config_bytes = new Uint8Array(buffer);
|
||||
return hpke_config_bytes;
|
||||
let reportOut = {};
|
||||
encoder(
|
||||
keys.leader_hpke,
|
||||
keys.helper_hpke,
|
||||
measurement,
|
||||
task_id,
|
||||
task.time_precision,
|
||||
reportOut
|
||||
);
|
||||
return new Uint8Array(reportOut.value).buffer;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -276,20 +267,42 @@ export const DAPTelemetrySender = new (class {
|
|||
*
|
||||
* @param {string} leader_endpoint
|
||||
* The URL for the leader.
|
||||
* @param {Uint8Array} report
|
||||
* @param {string} task_id
|
||||
* Base64 encoded task_id as it appears in the upload path.
|
||||
* @param {ArrayBuffer} report
|
||||
* Raw bytes of the TLS encoded report.
|
||||
* @param {AbortSignal} abortSignal
|
||||
* Can be used to cancel network requests. Does not cancel computation.
|
||||
* @param {object} options
|
||||
* @param {string} options.ohttp_relay
|
||||
* @param {Uint8Array} options.ohttp_hpke
|
||||
* If an OHTTP relay is specified, the reports are uploaded over OHTTP. In
|
||||
* this case, the OHTTP and DAP keys must be provided and this code will not
|
||||
* attempt to fetch them.
|
||||
*
|
||||
* @returns Promise
|
||||
* @resolves {undefined} Once the attempt to send the report completes, whether or not it was successful.
|
||||
*/
|
||||
async sendReport(leader_endpoint, task_id, report, abortSignal) {
|
||||
async sendReport(leader_endpoint, task_id, report, abortSignal, options) {
|
||||
const upload_path = leader_endpoint + "/tasks/" + task_id + "/reports";
|
||||
try {
|
||||
let response = await fetch(upload_path, {
|
||||
let requestOptions = {
|
||||
method: "PUT",
|
||||
headers: { "Content-Type": "application/dap-report" },
|
||||
body: report,
|
||||
signal: abortSignal,
|
||||
});
|
||||
};
|
||||
let response;
|
||||
if (options.ohttp_relay) {
|
||||
response = await lazy.ObliviousHTTP.ohttpRequest(
|
||||
options.ohttp_relay,
|
||||
options.ohttp_hpke,
|
||||
upload_path,
|
||||
requestOptions
|
||||
);
|
||||
} else {
|
||||
response = await fetch(upload_path, requestOptions);
|
||||
}
|
||||
|
||||
if (response.status != 200) {
|
||||
const content_type = response.headers.get("content-type");
|
||||
|
|
|
@ -138,12 +138,10 @@ export const DAPVisitCounter = new (class {
|
|||
};
|
||||
|
||||
send_promises.push(
|
||||
DAPTelemetrySender.sendDAPMeasurement(
|
||||
task,
|
||||
measurement,
|
||||
DAPTelemetrySender.sendDAPMeasurement(task, measurement, {
|
||||
timeout,
|
||||
reason
|
||||
)
|
||||
reason,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -14,20 +14,14 @@ ChromeUtils.defineESModuleGetters(lazy, {
|
|||
DAPTelemetrySender: "resource://gre/modules/DAPTelemetrySender.sys.mjs",
|
||||
});
|
||||
|
||||
const BinaryOutputStream = Components.Constructor(
|
||||
"@mozilla.org/binaryoutputstream;1",
|
||||
"nsIBinaryOutputStream",
|
||||
"setOutputStream"
|
||||
);
|
||||
|
||||
const BinaryInputStream = Components.Constructor(
|
||||
"@mozilla.org/binaryinputstream;1",
|
||||
"nsIBinaryInputStream",
|
||||
"setInputStream"
|
||||
);
|
||||
|
||||
const PREF_LEADER = "toolkit.telemetry.dap_leader";
|
||||
const PREF_HELPER = "toolkit.telemetry.dap_helper";
|
||||
const PREF_LEADER = "toolkit.telemetry.dap.leader.url";
|
||||
const PREF_HELPER = "toolkit.telemetry.dap.helper.url";
|
||||
|
||||
let received = false;
|
||||
let server;
|
||||
|
@ -52,34 +46,6 @@ const tasks = [
|
|||
},
|
||||
];
|
||||
|
||||
function hpkeConfigHandler(request, response) {
|
||||
if (
|
||||
request.queryString ==
|
||||
"task_id=QjMD4n8l_MHBoLrbCfLTFi8hC264fC59SKHPviPF0q8" ||
|
||||
request.queryString == "task_id=DSZGMFh26hBYXNaKvhL_N4AHA3P5lDn19on1vFPBxJM"
|
||||
) {
|
||||
let config_bytes;
|
||||
if (request.path.startsWith("/leader")) {
|
||||
config_bytes = new Uint8Array([
|
||||
0, 41, 47, 0, 32, 0, 1, 0, 1, 0, 32, 11, 33, 206, 33, 131, 56, 220, 82,
|
||||
153, 110, 228, 200, 53, 98, 210, 38, 177, 197, 252, 198, 36, 201, 86,
|
||||
121, 169, 238, 220, 34, 143, 112, 177, 10,
|
||||
]);
|
||||
} else {
|
||||
config_bytes = new Uint8Array([
|
||||
0, 41, 42, 0, 32, 0, 1, 0, 1, 0, 32, 28, 62, 242, 195, 117, 7, 173, 149,
|
||||
250, 15, 139, 178, 86, 241, 117, 143, 75, 26, 57, 60, 88, 130, 199, 175,
|
||||
195, 9, 241, 130, 61, 47, 215, 101,
|
||||
]);
|
||||
}
|
||||
response.setHeader("Content-Type", "application/dap-hpke-config");
|
||||
let bos = new BinaryOutputStream(response.bodyOutputStream);
|
||||
bos.writeByteArray(config_bytes);
|
||||
} else {
|
||||
Assert.ok(false, `Unknown query string: ${request.queryString}`);
|
||||
}
|
||||
}
|
||||
|
||||
function uploadHandler(request, response) {
|
||||
Assert.equal(
|
||||
request.getHeader("Content-Type"),
|
||||
|
@ -104,8 +70,6 @@ add_setup(async function () {
|
|||
|
||||
// Set up a mock server to represent the DAP endpoints.
|
||||
server = new HttpServer();
|
||||
server.registerPathHandler("/leader_endpoint/hpke_config", hpkeConfigHandler);
|
||||
server.registerPathHandler("/helper_endpoint/hpke_config", hpkeConfigHandler);
|
||||
server.registerPrefixHandler("/leader_endpoint/tasks/", uploadHandler);
|
||||
server.start(-1);
|
||||
|
||||
|
@ -144,5 +108,5 @@ add_task(async function testNetworkError() {
|
|||
thrownErr = e;
|
||||
}
|
||||
|
||||
Assert.equal("HPKE config download failed.", thrownErr.message);
|
||||
Assert.ok(thrownErr.message.startsWith("Sending failed."));
|
||||
});
|
||||
|
|
Загрузка…
Ссылка в новой задаче