Use bigquery load to overwrite partitions

This commit is contained in:
Benjamin Wu 2020-06-29 18:43:34 -04:00 коммит произвёл Ben Wu
Родитель 5f836971b8
Коммит f5275fba22
4 изменённых файлов: 69 добавлений и 21 удалений

Просмотреть файл

@ -92,12 +92,14 @@ async function getMetric(client, date, appId, appName, measure, dimension) {
);
}
return data.results.map((result) => ({
date,
app_name: appName,
value: result.totals.value,
[toUnderscore(dimension)]: result.group.title,
})).filter((result) => result.value !== -1);
return data.results
.filter((result) => result.totals.value !== -1)
.map((result) => ({
date,
app_name: appName,
value: result.totals.value,
[toUnderscore(dimension)]: result.group.title,
}));
}
/**
@ -119,31 +121,32 @@ async function startExport(client, project, dataset, overwrite, appId, appName,
// Run in for loop to synchronously send each request to avoid hitting rate limit
for (const [dimension, measures] of measuresByDimension) {
for (const measure of measures) {
let retry = false;
let retry;
let retryCount = 0;
let data = null;
do {
const retryDelay = 5 + 2 * 2 ** retryCount;
retry = false;
const retryDelay = 3 + 2 * 2 ** retryCount;
try {
data = await getMetric(client, date, appId, appName, measure, dimension);
} catch (err) {
console.error(`Failed to get ${measure} grouped by ${dimension}: ${err.message}`);
console.error(`Failed to get ${measure} by ${dimension}: ${err.message}`);
if (err.errorCode === 429) {
console.error(`Retrying in ${retryDelay} seconds due to API rate limit`);
retry = true;
retryCount += 1;
}
}
await sleep(retryDelay);
retryCount += 1;
if (retryCount >= 5 && retry === true) {
console.error(`Failed to get metrics after ${retryCount} attempts`);
if (retryCount > 5 && retry === true) {
console.error(`Failed to get ${measure} by ${dimension} after ${retryCount} attempts`);
break;
}
} while (retry);
if (data !== null && data.length > 0) {
const tableName = `${toUnderscore(measure)}_by_${toUnderscore(dimension)}`;
bqClient.writeRows(tableName, date, data, toUnderscore(dimension), overwrite)
bqClient.writeData(tableName, date, data, toUnderscore(dimension), overwrite)
.then(() => console.log(`Wrote to table ${tableName}`))
.catch((err) => console.error(`Failed to write to table ${tableName}: ${err}`));
}

Просмотреть файл

@ -1,5 +1,7 @@
'use strict';
const fs = require('fs');
const tempy = require('tempy');
const { BigQuery } = require('@google-cloud/bigquery');
/**
@ -47,22 +49,32 @@ class BigqueryClient {
return table;
}
async writeRows(tableName, date, data, dimension) {
async writeData(tableName, date, data, dimension, overwrite) {
const schema = [
{ name: 'date', type: 'DATE', mode: 'REQUIRED' },
{ name: 'app_name', type: 'STRING', mode: 'REQUIRED' },
{ name: 'value', type: 'STRING', mode: 'REQUIRED' },
{ name: dimension, type: 'STRING', mode: 'REQUIRED' },
];
// TODO: overwrite partition
const table = await this.createTableIfNotExists(tableName, schema);
await table.insert(
data,
{
schema,
let table = await this.createTableIfNotExists(tableName, schema);
const csvData = data.map((entry) => [
entry.date, entry.app_name, entry.value, entry[dimension]].join('\t'));
const csvPath = tempy.file();
fs.writeFileSync(csvPath, csvData.join('\n'));
table = this.dataset.table(`${tableName}$${date.replace(/-/g, '')}`);
await table.load(csvPath, {
format: 'CSV',
createDisposition: 'CREATE_NEVER',
writeDisposition: overwrite ? 'WRITE_TRUNCATE' : 'WRITE_APPEND',
fieldDelimiter: '\t',
schema: {
fields: schema,
},
);
});
}
}

Просмотреть файл

@ -17,6 +17,7 @@
"itunesconnectanalytics": "JanHalozan/iTunesConnectAnalytics#822494f03764a33a5e064a74078f5fa45a71e53a",
"node-fetch": "^2.6.0",
"request": "^2.88.2",
"tempy": "^0.5.0",
"yargs": "^15.3.1"
},
"devDependencies": {

Просмотреть файл

@ -450,6 +450,11 @@ cross-spawn@^7.0.2:
shebang-command "^2.0.0"
which "^2.0.1"
crypto-random-string@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/crypto-random-string/-/crypto-random-string-2.0.0.tgz#ef2a7a966ec11083388369baa02ebead229b30d5"
integrity sha512-v1plID3y9r/lPhviJ1wrXpLeyUIGAZ2SHNYTEapm7/8A9nLPoyvVp3RK/EPFqn5kEznyWgYZNsRtYYIWbuG8KA==
dashdash@^1.12.0:
version "1.14.1"
resolved "https://registry.yarnpkg.com/dashdash/-/dashdash-1.14.1.tgz#853cfa0f7cbe2fed5de20326b8dd581035f6e2f0"
@ -2119,6 +2124,21 @@ teeny-request@^7.0.0:
stream-events "^1.0.5"
uuid "^8.0.0"
temp-dir@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/temp-dir/-/temp-dir-2.0.0.tgz#bde92b05bdfeb1516e804c9c00ad45177f31321e"
integrity sha512-aoBAniQmmwtcKp/7BzsH8Cxzv8OL736p7v1ihGb5e9DJ9kTwGWHrQrVB5+lfVDzfGrdRzXch+ig7LHaY1JTOrg==
tempy@^0.5.0:
version "0.5.0"
resolved "https://registry.yarnpkg.com/tempy/-/tempy-0.5.0.tgz#2785c89df39fcc4d1714fc554813225e1581d70b"
integrity sha512-VEY96x7gbIRfsxqsafy2l5yVxxp3PhwAGoWMyC2D2Zt5DmEv+2tGiPOrquNRpf21hhGnKLVEsuqleqiZmKG/qw==
dependencies:
is-stream "^2.0.0"
temp-dir "^2.0.0"
type-fest "^0.12.0"
unique-string "^2.0.0"
text-table@^0.2.0:
version "0.2.0"
resolved "https://registry.yarnpkg.com/text-table/-/text-table-0.2.0.tgz#7f5ee823ae805207c00af2df4a84ec3fcfa570b4"
@ -2181,6 +2201,11 @@ type-detect@^4.0.0, type-detect@^4.0.5:
resolved "https://registry.yarnpkg.com/type-detect/-/type-detect-4.0.8.tgz#7646fb5f18871cfbb7749e69bd39a6388eb7450c"
integrity sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==
type-fest@^0.12.0:
version "0.12.0"
resolved "https://registry.yarnpkg.com/type-fest/-/type-fest-0.12.0.tgz#f57a27ab81c68d136a51fd71467eff94157fa1ee"
integrity sha512-53RyidyjvkGpnWPMF9bQgFtWp+Sl8O2Rp13VavmJgfAP9WWG6q6TkrKU8iyJdnwnfgHI6k2hTlgqH4aSdjoTbg==
type-fest@^0.8.1:
version "0.8.1"
resolved "https://registry.yarnpkg.com/type-fest/-/type-fest-0.8.1.tgz#09e249ebde851d3b1e48d27c105444667f17b83d"
@ -2191,6 +2216,13 @@ underscore@^1.9.1:
resolved "https://registry.yarnpkg.com/underscore/-/underscore-1.10.2.tgz#73d6aa3668f3188e4adb0f1943bd12cfd7efaaaf"
integrity sha512-N4P+Q/BuyuEKFJ43B9gYuOj4TQUHXX+j2FqguVOpjkssLUUrnJofCcBccJSCoeturDoZU6GorDTHSvUDlSQbTg==
unique-string@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/unique-string/-/unique-string-2.0.0.tgz#39c6451f81afb2749de2b233e3f7c5e8843bd89d"
integrity sha512-uNaeirEPvpZWSgzwsPGtU2zVSTrn/8L5q/IexZmH0eH6SA73CmAA5U4GwORTxQAZs95TAXLNqeLoPPNO5gZfWg==
dependencies:
crypto-random-string "^2.0.0"
uri-js@^4.2.2:
version "4.2.2"
resolved "https://registry.yarnpkg.com/uri-js/-/uri-js-4.2.2.tgz#94c540e1ff772956e2299507c010aea6c8838eb0"