зеркало из https://github.com/mozilla/fxa.git
Merge pull request #16836 from mozilla/fxa-9257
feat(scripts): refactor delete unverified account script to support cloud scheduler
This commit is contained in:
Коммит
69878baabb
|
@ -2046,6 +2046,44 @@ const convictConf = convict({
|
|||
},
|
||||
},
|
||||
cloudTasks: CloudTasksConvictConfigFactory(),
|
||||
cloudScheduler: {
|
||||
oidc: {
|
||||
aud: {
|
||||
default: '',
|
||||
doc: 'The audience value of the id token payload.',
|
||||
env: `AUTH_CLOUDSCHEDULER_OIDC_AUD`,
|
||||
format: String,
|
||||
},
|
||||
serviceAccountEmail: {
|
||||
default: '',
|
||||
doc: 'The GCP service account email address.',
|
||||
env: `AUTH_CLOUDSCHEDULER_OIDC_EMAIL`,
|
||||
format: String,
|
||||
},
|
||||
},
|
||||
deleteUnverifiedAccounts: {
|
||||
sinceDays: {
|
||||
default: 15,
|
||||
doc: 'The time since which unverified accounts should be deleted.',
|
||||
env: `AUTH_CLOUDSCHEDULER_DELETE_UNVERIFIED_ACCOUNTS_SINCE_DAYS`,
|
||||
format: Number,
|
||||
},
|
||||
durationDays: {
|
||||
default: 15,
|
||||
doc:
|
||||
'The duration to delete accounts from the since day. For example, ' +
|
||||
'if sinceDay is 15 and duration is 15, unverified accounts created ' +
|
||||
'between 15 and 30 days ago will be deleted.',
|
||||
env: `AUTH_CLOUDSCHEDULER_DELETE_UNVERIFIED_ACCOUNTS_DURATION_DAYS`,
|
||||
format: Number,
|
||||
},
|
||||
taskLimit: {
|
||||
default: 200,
|
||||
env: `AUTH_CLOUDSCHEDULER_DELETE_UNVERIFIED_ACCOUNTS_TASK_LIMIT`,
|
||||
format: Number,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
// handle configuration files. you can specify a CSV list of configuration
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
import { ConfigType } from '../../config';
|
||||
import { AuthLogger, AuthRequest } from '../types';
|
||||
import { processDateRange } from '../../scripts/delete-unverified-accounts';
|
||||
import {
|
||||
AccountTasks,
|
||||
AccountTasksFactory,
|
||||
ReasonForDeletion,
|
||||
} from '@fxa/shared/cloud-tasks';
|
||||
import { StatsD } from 'hot-shots';
|
||||
|
||||
const MILLISECONDS_IN_A_DAY = 24 * 60 * 60 * 1000;
|
||||
|
||||
export class CloudSchedulerHandler {
|
||||
private accountTasks: AccountTasks;
|
||||
|
||||
constructor(
|
||||
private log: AuthLogger,
|
||||
private config: ConfigType,
|
||||
private statsd: StatsD
|
||||
) {
|
||||
this.accountTasks = AccountTasksFactory(this.config, this.statsd);
|
||||
}
|
||||
|
||||
private calculateDate(days: number): Date {
|
||||
return new Date(Date.now() - days * MILLISECONDS_IN_A_DAY);
|
||||
}
|
||||
|
||||
async deleteUnverifiedAccounts() {
|
||||
const { sinceDays, durationDays, taskLimit } =
|
||||
this.config.cloudScheduler.deleteUnverifiedAccounts;
|
||||
const endDate = this.calculateDate(sinceDays);
|
||||
const startDate = this.calculateDate(sinceDays + durationDays);
|
||||
const reason = ReasonForDeletion.Unverified;
|
||||
|
||||
this.log.info('Deleting unverified accounts', {
|
||||
initiatedAt: new Date().toISOString(),
|
||||
endDate: endDate.toISOString(),
|
||||
startDate: startDate.toISOString(),
|
||||
});
|
||||
this.statsd.increment('cloud-scheduler.deleteUnverifiedAccounts');
|
||||
await processDateRange(
|
||||
this.config,
|
||||
this.accountTasks,
|
||||
reason,
|
||||
startDate.getTime(),
|
||||
endDate.getTime(),
|
||||
taskLimit
|
||||
);
|
||||
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
export const cloudSchedulerRoutes = (
|
||||
log: AuthLogger,
|
||||
config: ConfigType,
|
||||
statsd: StatsD
|
||||
) => {
|
||||
const cloudSchedulerHandler = new CloudSchedulerHandler(log, config, statsd);
|
||||
return [
|
||||
{
|
||||
method: 'POST',
|
||||
path: '/cloud-scheduler/accounts/deleteUnverified',
|
||||
options: {
|
||||
auth: {
|
||||
strategy: 'cloudSchedulerOIDC',
|
||||
payload: false,
|
||||
},
|
||||
},
|
||||
handler: (request: AuthRequest) =>
|
||||
cloudSchedulerHandler.deleteUnverifiedAccounts(),
|
||||
},
|
||||
];
|
||||
};
|
||||
|
||||
export default cloudSchedulerRoutes;
|
|
@ -206,6 +206,9 @@ module.exports = function (
|
|||
const { cloudTaskRoutes } = require('./cloud-tasks');
|
||||
const cloudTasks = cloudTaskRoutes(log, config);
|
||||
|
||||
const { cloudSchedulerRoutes } = require('./cloud-scheduler');
|
||||
const cloudScheduler = cloudSchedulerRoutes(log, config, statsd);
|
||||
|
||||
let basePath = url.parse(config.publicUrl).path;
|
||||
if (basePath === '/') {
|
||||
basePath = '';
|
||||
|
@ -229,7 +232,8 @@ module.exports = function (
|
|||
subscriptions,
|
||||
newsletters,
|
||||
linkedAccounts,
|
||||
cloudTasks
|
||||
cloudTasks,
|
||||
cloudScheduler
|
||||
);
|
||||
|
||||
function optionallyIgnoreTrace(fn) {
|
||||
|
|
|
@ -442,6 +442,12 @@ async function create(log, error, config, routes, db, statsd, glean) {
|
|||
);
|
||||
server.auth.strategy('cloudTasksOIDC', 'cloudTasksOIDC');
|
||||
|
||||
server.auth.scheme(
|
||||
'cloudSchedulerOIDC',
|
||||
googleOIDC.strategy(config.cloudScheduler.oidc)
|
||||
);
|
||||
server.auth.strategy('cloudSchedulerOIDC', 'cloudSchedulerOIDC');
|
||||
|
||||
// register all plugins and Swagger configuration
|
||||
await server.register([
|
||||
{
|
||||
|
|
|
@ -9,7 +9,7 @@ import { Container } from 'typedi';
|
|||
|
||||
import { setupAccountDatabase } from '@fxa/shared/db/mysql/account';
|
||||
|
||||
import appConfig from '../config';
|
||||
import appConfig, { ConfigType } from '../config';
|
||||
import * as random from '../lib/crypto/random';
|
||||
import DB from '../lib/db';
|
||||
import { setupFirestore } from '../lib/firestore-db';
|
||||
|
@ -21,6 +21,7 @@ import Token from '../lib/tokens';
|
|||
import { AppConfig, AuthFirestore, AuthLogger } from '../lib/types';
|
||||
import { parseDryRun } from './lib/args';
|
||||
import {
|
||||
AccountTasks,
|
||||
AccountTasksFactory,
|
||||
ReasonForDeletion,
|
||||
} from '@fxa/shared/cloud-tasks';
|
||||
|
@ -214,7 +215,7 @@ const init = async () => {
|
|||
customerId: (await getAccountCustomerByUid(acct.uid))?.stripeCustomerId,
|
||||
reason,
|
||||
});
|
||||
console.log(`Created cloud task ${result} for uid ${x}`);
|
||||
console.log(`Created cloud task ${result} for email ${x}`);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -224,63 +225,97 @@ const init = async () => {
|
|||
return 0;
|
||||
}
|
||||
|
||||
const kyselyDb = await setupAccountDatabase(config.database.mysql.auth);
|
||||
const accounts = await kyselyDb
|
||||
.selectFrom('accounts')
|
||||
.where('accounts.emailVerified', '=', 0)
|
||||
.where('accounts.createdAt', '>=', program.startDate)
|
||||
.where('accounts.createdAt', '<=', program.endDate)
|
||||
.leftJoin('accountCustomers', 'accounts.uid', 'accountCustomers.uid')
|
||||
.select(['accounts.uid', 'accountCustomers.stripeCustomerId'])
|
||||
.execute();
|
||||
|
||||
// Scaling suggestion is 500/5/50 rule, may start at 500/sec, and increase every 5 minutes by 50%.
|
||||
// They also note increased latency may occur past 1000/sec, so we stop increasing as we approach that.
|
||||
const scaleUpIntervalMins = 5;
|
||||
let lastScaleUp = Date.now();
|
||||
let rateLimit = taskLimit;
|
||||
const queue = new PQueue({
|
||||
interval: 1000,
|
||||
intervalCap: rateLimit,
|
||||
concurrency: rateLimit * 2,
|
||||
});
|
||||
|
||||
for (const row of accounts) {
|
||||
if (
|
||||
rateLimit < 950 &&
|
||||
Date.now() - lastScaleUp > scaleUpIntervalMins * 60 * 1000
|
||||
) {
|
||||
rateLimit = Math.floor(rateLimit * 1.5);
|
||||
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
|
||||
// @ts-ignore
|
||||
queue['#intervalCap'] = rateLimit; // This is private, but we need to update it
|
||||
queue.concurrency = rateLimit * 2;
|
||||
lastScaleUp = Date.now();
|
||||
}
|
||||
|
||||
await queue.onSizeLessThan(rateLimit * 4); // Back-pressure
|
||||
|
||||
queue.add(async () => {
|
||||
try {
|
||||
const result = await accountTasks.deleteAccount({
|
||||
uid: row.uid.toString('hex'),
|
||||
customerId: row.stripeCustomerId || undefined,
|
||||
reason,
|
||||
});
|
||||
console.log(
|
||||
`Created cloud task ${result} for uid ${row.uid.toString('hex')}`
|
||||
);
|
||||
} catch (err) {
|
||||
console.error('Errored creating task', err);
|
||||
}
|
||||
});
|
||||
}
|
||||
await queue.onIdle(); // Wait for the queue to empty and promises to complete
|
||||
await processDateRange(
|
||||
config,
|
||||
accountTasks,
|
||||
reason,
|
||||
program.startDate,
|
||||
program.endDate,
|
||||
taskLimit
|
||||
);
|
||||
}
|
||||
|
||||
return 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Process a date range of accounts to delete.
|
||||
*
|
||||
* @param config
|
||||
* @param accountTasks
|
||||
* @param reason
|
||||
* @param startDate
|
||||
* @param endDate
|
||||
* @param taskLimit
|
||||
*/
|
||||
export async function processDateRange(
|
||||
config: ConfigType,
|
||||
accountTasks: AccountTasks,
|
||||
reason: ReasonForDeletion,
|
||||
startDate: any,
|
||||
endDate: any,
|
||||
taskLimit: number
|
||||
) {
|
||||
const kyselyDb = await setupAccountDatabase(config.database.mysql.auth);
|
||||
const accounts = await kyselyDb
|
||||
.selectFrom('accounts')
|
||||
.where('accounts.emailVerified', '=', 0)
|
||||
.where('accounts.createdAt', '>=', startDate)
|
||||
.where('accounts.createdAt', '<=', endDate)
|
||||
.leftJoin('accountCustomers', 'accounts.uid', 'accountCustomers.uid')
|
||||
.select(['accounts.uid', 'accountCustomers.stripeCustomerId'])
|
||||
.execute();
|
||||
|
||||
// Scaling suggestion is 500/5/50 rule, may start at 500/sec, and increase every 5 minutes by 50%.
|
||||
// They also note increased latency may occur past 1000/sec, so we stop increasing as we approach that.
|
||||
const scaleUpIntervalMins = 5;
|
||||
let lastScaleUp = Date.now();
|
||||
let rateLimit = taskLimit;
|
||||
const queue = new PQueue({
|
||||
interval: 1000,
|
||||
intervalCap: rateLimit,
|
||||
concurrency: rateLimit * 2,
|
||||
});
|
||||
|
||||
if (accounts.length === 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
for (const row of accounts) {
|
||||
if (
|
||||
rateLimit < 950 &&
|
||||
Date.now() - lastScaleUp > scaleUpIntervalMins * 60 * 1000
|
||||
) {
|
||||
rateLimit = Math.floor(rateLimit * 1.5);
|
||||
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
|
||||
// @ts-ignore
|
||||
queue['#intervalCap'] = rateLimit; // This is private, but we need to update it
|
||||
queue.concurrency = rateLimit * 2;
|
||||
lastScaleUp = Date.now();
|
||||
}
|
||||
|
||||
await queue.onSizeLessThan(rateLimit * 4); // Back-pressure
|
||||
|
||||
queue.add(async () => {
|
||||
try {
|
||||
const result = await accountTasks.deleteAccount({
|
||||
uid: row.uid.toString('hex'),
|
||||
customerId: row.stripeCustomerId || undefined,
|
||||
reason,
|
||||
});
|
||||
console.log(
|
||||
`Created cloud task ${result} for uid ${row.uid.toString('hex')}`
|
||||
);
|
||||
} catch (err) {
|
||||
console.error('Errored creating task', err);
|
||||
}
|
||||
});
|
||||
}
|
||||
await queue.onIdle(); // Wait for the queue to empty and promises to complete
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (require.main === module) {
|
||||
init()
|
||||
.catch((err: Error) => {
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
const sinon = require('sinon');
|
||||
const assert = { ...sinon.assert, ...require('chai').assert };
|
||||
import { ReasonForDeletion } from '@fxa/shared/cloud-tasks';
|
||||
import proxyquire from 'proxyquire';
|
||||
|
||||
describe('CloudSchedulerHandler', function () {
|
||||
this.timeout(10000);
|
||||
|
||||
let cloudSchedulerHandler;
|
||||
let config;
|
||||
let log;
|
||||
let statsd;
|
||||
let AccountTasksFactory;
|
||||
let processDateRange;
|
||||
|
||||
beforeEach(() => {
|
||||
config = {
|
||||
cloudScheduler: {
|
||||
deleteUnverifiedAccounts: {
|
||||
sinceDays: 7,
|
||||
durationDays: 7,
|
||||
taskLimit: 1000,
|
||||
},
|
||||
},
|
||||
};
|
||||
log = {
|
||||
info: sinon.stub(),
|
||||
};
|
||||
statsd = {
|
||||
increment: sinon.stub(),
|
||||
};
|
||||
AccountTasksFactory = sinon.stub();
|
||||
processDateRange = sinon.stub();
|
||||
|
||||
const { CloudSchedulerHandler } = proxyquire(
|
||||
'../../../lib/routes/cloud-scheduler',
|
||||
{
|
||||
'../../scripts/delete-unverified-accounts': {
|
||||
processDateRange,
|
||||
},
|
||||
'@fxa/shared/cloud-tasks': {
|
||||
AccountTasksFactory,
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
cloudSchedulerHandler = new CloudSchedulerHandler(log, config, statsd);
|
||||
});
|
||||
|
||||
describe('deleteUnverifiedAccounts', () => {
|
||||
it('should call processDateRange with correct parameters', async () => {
|
||||
const accountTasks = AccountTasksFactory(config, statsd);
|
||||
const { sinceDays, durationDays, taskLimit } =
|
||||
config.cloudScheduler.deleteUnverifiedAccounts;
|
||||
const endDate = new Date(Date.now() - sinceDays * 24 * 60 * 60 * 1000);
|
||||
const startDate = new Date(
|
||||
endDate.getTime() - durationDays * 24 * 60 * 60 * 1000
|
||||
);
|
||||
const reason = ReasonForDeletion.Unverified;
|
||||
|
||||
await cloudSchedulerHandler.deleteUnverifiedAccounts();
|
||||
|
||||
assert.calledOnceWithExactly(
|
||||
processDateRange,
|
||||
config,
|
||||
accountTasks,
|
||||
reason,
|
||||
startDate.getTime(),
|
||||
endDate.getTime(),
|
||||
taskLimit
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
|
@ -830,5 +830,11 @@ function getConfig() {
|
|||
serviceAccountEmail: 'testo@iam.gcp.g.co',
|
||||
},
|
||||
},
|
||||
cloudScheduler: {
|
||||
oidc: {
|
||||
aud: 'cloud-scheduler',
|
||||
serviceAccountEmail: 'testo@iam.gcp.g.co',
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче