Bug 1610347 - Extract to BigQuery - Fixes for Heroku (#5984)

* more error handling
* more explanation
* django query does not encode datetime
* fix redis
* update hashes
* Handle Decimal on prototype
* add --start for testing, limit some string length
* more logging
This commit is contained in:
Kyle Lahnakoski 2020-02-20 11:05:03 -05:00 committed by GitHub
Parent 6710bb43e6
Commit 4283107ea3
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 232 additions and 111 deletions

View file

@@ -424,42 +424,44 @@ django-cache-memoize==0.1.6 \
--hash=sha256:d239e8c37734b0a70b74f94fa33b180b3b0c82c3784beb21209bb4ab64a3e6fb
# Required for extraction to BigQuery
mo-times==3.46.20032 \
--hash=sha256:94661e2fdfe43b85a1bdd3a40e2b77c4aa91c5863389acb0c2017e4690415af4
mo-future==3.45.20031 \
--hash=sha256:aeed75cbd211e297526f7b0ea5f3111b6930edd0e334cc5042ee6bd263078ee1
pyLibrary==3.45.20031 \
--hash=sha256:bb13210d319b58310126c1b0757e37486149c7578b00e5696d167c1d0e056d11
jx-bigquery==3.45.20031 \
--hash=sha256:153f2ca02c426770451329f76d4f622e470604a1bd607f446781cb7e1177bf7d
jx-mysql==3.45.20031 \
--hash=sha256:2bcffa2e013126ec3267449d3119fcff017f4c6c7d38a04fbc6194b81cda52dd
mo-dots==3.46.20032 \
--hash=sha256:698380ed29c0be3677b8e68ae2e74cadac37299bf6914aefb695c2b34d14bf3f
mo-files==3.41.20031 \
--hash=sha256:b5e7bd0b51844a54f5d9f6d8c8ef5b2c86defd81c49b2b5aed7850732042f7b3
mo-math==3.41.20031 \
--hash=sha256:76ac7894c45986fcbe84eaa638643561bae59ada8f523ed4e064c556fdd1f9d3
jx-python==3.43.20031 \
--hash=sha256:85315b458954629b75902ca97841b0a0a57cd22d12ba2b85484b32604a41ad31
mo-logs==3.46.20032 \
--hash=sha256:f69fa8aae7c3c493823259b7d28220b318bc5433b6bba53796824f7c7ba88c3d
mo-sql==3.38.20031 \
--hash=sha256:18db8a496dc86a057cc8e6dd515369a23cd5113e049788744513ec88deee99ae
mo-json==3.38.20029 \
--hash=sha256:c9fc794eca55bb220a050b0c3e5b34a807fbe3d7d72b84f5d76c1a32ef618600
mo-kwargs==3.38.20029 \
--hash=sha256:f4075da450d9c73655c6431136c970648c4f57d1e29930abba006db83f2a5836
mo-collections==3.38.20029 \
--hash=sha256:291069c12c7275fbbddc4437b961805a55ca501555c2dac6538f9d899feb0253
mo-testing==3.38.20029 \
--hash=sha256:48c4a9b10f2d20a6fdfe66a6e72d9c47c456023eefcc3845ed6c988b44380cac
mo-threads==3.42.20031 \
--hash=sha256:15a7caddbfd5067be737909c214861898a357370f41d7e590ced7061797019b3
mo-json-config==3.42.20031 \
--hash=sha256:65f926de86c4cd21f6b13e4dbb858927a0e8be5e83635daa64ffe9f5133eb36b
jx-bigquery==3.48.20042 \
--hash=sha256:c9e22d07fbf3cb5032ecaefa849ef422e2b5c7f87615bdfd3dd13b0b834d94a2
jx-mysql==3.49.20042 \
--hash=sha256:adb47e8e13151178cbb81e3e352776ec848b799e9b038de7796a889782564519
jx-python==3.50.20043 \
--hash=sha256:3c2bfaeb77027bec1d35c1475a3716c52a77db1f26064c09603964cf4f383779
jx-sqlite==3.42.20031 \
--hash=sha256:929070fd5b6663247bfa73254732f71eda817e30f3a25a04af4274f1bb156e04
mo-collections==3.46.20032 \
--hash=sha256:b96df523a7894e78c9160b45426b8db45d47841f5185f4dd1ccfe9a3708e89d6
mo-dots==3.47.20042 \
--hash=sha256:4bed834b4932a6b258d7d6244fed0931c4f2b1ae77b991302c4cf74515f47ff8
mo-files==3.41.20031 \
--hash=sha256:b5e7bd0b51844a54f5d9f6d8c8ef5b2c86defd81c49b2b5aed7850732042f7b3
mo-future==3.47.20042 \
--hash=sha256:6b17166d41bd26d9204a7becdceb3f024f83095bf8caac979de163a2ff86ae34
mo-json==3.50.20043 \
--hash=sha256:afa82f794e35a6e100948c98e0bc2529949e5724dca44ce287354dca42afbf8b
mo-json-config==3.42.20031 \
--hash=sha256:65f926de86c4cd21f6b13e4dbb858927a0e8be5e83635daa64ffe9f5133eb36b
mo-kwargs==3.47.20042 \
--hash=sha256:2b6c92f55c88a2320057f9fc774b8b27ff6ba8bf9e665b98b487dfc40a29a405
mo-logs==3.50.20043 \
--hash=sha256:7a8ee849ca6d3712b5fcc71eb7a73c5230756de1a686044898ec72b750a8e45e
mo-math==3.41.20031 \
--hash=sha256:76ac7894c45986fcbe84eaa638643561bae59ada8f523ed4e064c556fdd1f9d3
mo-sql==3.47.20042 \
--hash=sha256:a6a93be00fe9f7cc667e25fb1f6cf267003b457a0cce8226edcfb46a58f291cf
mo-testing==3.38.20029 \
--hash=sha256:48c4a9b10f2d20a6fdfe66a6e72d9c47c456023eefcc3845ed6c988b44380cac
mo-times==3.50.20043 \
--hash=sha256:794a36aeeacc504b778a00a431d073f9d1ef77fb7278bb8bff224c99a7cc3720
mo-threads==3.42.20031 \
--hash=sha256:15a7caddbfd5067be737909c214861898a357370f41d7e590ced7061797019b3
pyLibrary==3.47.20042 \
--hash=sha256:b15eb994c819eca8dc860af8ea6437e482d519c1a1c7618a28fc6bcd4f8043f5
pymysql==0.9.3 \
--hash=sha256:3943fbbbc1e902f41daf7f9165519f140c4451c179380677e6a848587042561a
google-cloud-bigquery==1.24.0 \

View file

@@ -1,3 +1,5 @@
import pytest
from django.db.models import Q
from jx_base.expressions import NULL
from jx_mysql.mysql import MySQL
from jx_mysql.mysql_snowflake_extractor import MySqlSnowflakeExtractor
@@ -5,6 +7,9 @@ from mo_files import File
from mo_future import text
from mo_sql import SQL
from mo_testing.fuzzytestcase import assertAlmostEqual
from mo_times import Date
from treeherder.model.models import Job
def test_make_repository(test_repository, extract_job_settings):
@@ -36,18 +41,66 @@ def test_make_job(complex_job, extract_job_settings):
assert result[0].num == 4
# VERIFY SQL OVER DATABASE
def test_extract_job_sql(extract_job_settings, transactional_db):
"""
VERIFY SQL OVER DATABASE
"""
extractor = MySqlSnowflakeExtractor(extract_job_settings.source)
sql = extractor.get_sql(SQL("SELECT 0"))
assert "".join(sql.sql.split()) == "".join(EXTRACT_JOB_SQL.split())
def test_django_cannot_encode_datetime(extract_job_settings):
"""
DJANGO DOES NOT ENCODE THE DATETIME PROPERLY
"""
epoch = Date(Date.EPOCH).datetime
get_ids = SQL(
str(
(
Job.objects.filter(
Q(last_modified__gt=epoch) | (Q(last_modified=epoch) & Q(id__gt=0))
)
.annotate()
.values("id")
.order_by("last_modified", "id")[:2000]
).query
)
)
source = MySQL(extract_job_settings.source.database)
with pytest.raises(Exception):
with source.transaction():
list(source.query(get_ids, stream=True, row_tuples=True))
def test_django_cannot_encode_datetime_strings(extract_job_settings):
"""
DJANGO/MYSQL DATETIME MATH WORKS WHEN STRINGS
"""
epoch_string = Date.EPOCH.format()
sql_query = SQL(
str(
(
Job.objects.filter(
Q(last_modified__gt=epoch_string)
| (Q(last_modified=epoch_string) & Q(id__gt=0))
)
.annotate()
.values("id")
.order_by("last_modified", "id")[:2000]
).query
)
)
source = MySQL(extract_job_settings.source.database)
with pytest.raises(Exception):
with source.transaction():
list(source.query(sql_query, stream=True, row_tuples=True))
def test_extract_job(complex_job, extract_job_settings, now):
source = MySQL(extract_job_settings.source.database)
# with source.transaction():
# result = source.query(SQL("SELECT * from text_log_error"))
# assert result[0].guid == complex_job.guid
extractor = MySqlSnowflakeExtractor(extract_job_settings.source)
sql = extractor.get_sql(SQL("SELECT " + text(complex_job.id) + " as id"))
@@ -60,7 +113,9 @@ def test_extract_job(complex_job, extract_job_settings, now):
doc.guid = complex_job.guid
doc.last_modified = complex_job.last_modified
assertAlmostEqual(acc, JOB, places=3) # TH MIXES LOCAL TIMEZONE WITH GMT: https://bugzilla.mozilla.org/show_bug.cgi?id=1612603
assertAlmostEqual(
acc, JOB, places=3
) # TH MIXES LOCAL TIMEZONE WITH GMT: https://bugzilla.mozilla.org/show_bug.cgi?id=1612603
EXTRACT_JOB_SQL = (File(__file__).parent / "test_extract_job.sql").read()

View file

@@ -0,0 +1,44 @@
# Extracting to BigQuery
## Run in Heroku
### Set Environment variables
The Heroku ***Config Vars*** are not delivered verbatim to the Python environment variables; there is an obfuscation step that changes the characters provided.
**Some escaping examples**
| Heroku Config Var | Realized | JSON String* |
| ----------------- | ----------- | -------------------- |
| `\n` | | `"\n"` |
| `\\n` | `\n` | `"\\n"` |
| `"\n"` | | `"\n"` |
| `"\""` | `"` | `"\""` |
| `"\"\n"` | `"` | `"\"\n"` |
| `"\"\\n\""` | `"\n"` | `"\"\\n\""` |
***Note:** The JSON String is the JSON encoding of the realized string, for clarity.*
**Some non-escaping examples**
| Heroku Config Var | Realized | JSON String* |
| ----------------- | ----------- | -------------------- |
| `""\n"` | `""\n"` | `"\"\"\\n\""` |
| `\n"\n"` | `\n"\n"` | `"\\n\"\\n\""` |
| `\"\\n\"` | `\"\\n\"` | `"\\\"\\\\n\\\""` |
| `\n\"\\n\"` | `\n\"\\n\"` | `"\\n\\\"\\\\n\\\""`|
In general, basic escaping works with or without quotes. But if you provide an **invalid** escape sequence, escaping is disabled: if you try to escape a quote outside quotes, or to escape a character that does not require escaping, the whole string is treated as a literal.
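When in doubt, the realized value can be inspected from a one-off dyno. A minimal sketch, assuming a hypothetical Config Var named `EXTRACTOR_KEY`:
```
# Print the JSON encoding of the realized string, so newlines and quotes are visible
heroku run python -c "import os, json; print(json.dumps(os.environ.get('EXTRACTOR_KEY', '')))"
```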
### Set Up a Scheduled Job
Heroku provides the **Heroku Scheduler Add-on**, which creates a new machine to execute management commands. You can set up an hourly (or daily) job with the following command:
```
newrelic-admin run-program ./manage.py extract_jobs
```
The `newrelic-admin run-program` prefix ensures NewRelic captures and reports the output.

View file

@@ -43,7 +43,7 @@
"database": {
"debug": false,
"ssl": {
"ca": "https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem"
"pem": "https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem"
},
"host": {
"$ref": "env://DATABASE_URL"

View file

@@ -1,5 +1,3 @@
import logging
from django.db.models import Q
from jx_bigquery import bigquery
from jx_mysql.mysql import MySQL
@@ -14,25 +12,30 @@ from mo_sql import SQL
from mo_times import (DAY,
YEAR,
Timer)
from mo_times.dates import (Date,
parse)
from mo_times.dates import Date
from redis import Redis
from treeherder.config.settings import REDIS_URL
from treeherder.perf.models import PerformanceAlertSummary
CONFIG_FILE = (File.new_instance(__file__).parent / "extract_alerts.json").abspath
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class ExtractAlerts:
def run(self, force=False, restart=False, merge=False):
# SETUP LOGGING
settings = startup.read_settings(filename=CONFIG_FILE)
constants.set(settings.constants)
Log.start(settings.debug)
try:
# SETUP LOGGING
settings = startup.read_settings(filename=CONFIG_FILE)
constants.set(settings.constants)
Log.start(settings.debug)
self.extract(settings, force, restart, merge)
except Exception as e:
Log.error("could not extract alerts", cause=e)
finally:
Log.stop()
def extract(self, settings, force, restart, merge):
if not settings.extractor.app_name:
Log.error("Expecting an extractor.app_name in config file")
@@ -47,7 +50,7 @@ class ExtractAlerts:
destination.merge_shards()
# RECOVER LAST SQL STATE
redis = Redis()
redis = Redis.from_url(REDIS_URL)
state = redis.get(settings.extractor.key)
if restart or not state:
@@ -57,7 +60,7 @@ class ExtractAlerts:
state = json2value(state.decode("utf8"))
last_modified, alert_id = state
last_modified = parse(last_modified)
last_modified = Date(last_modified)
# SCAN SCHEMA, GENERATE EXTRACTION SQL
extractor = MySqlSnowflakeExtractor(settings.source)

View file

@@ -44,7 +44,7 @@
"database": {
"debug": false,
"ssl": {
"ca": "https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem"
"pem": "https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem"
},
"host": {
"$ref": "env://DATABASE_URL"
@@ -97,7 +97,9 @@
"debug": {
"trace": true,
"logs": [
{ "log_type": "console" },
{
"log_type": "console"
},
{
"log_type": "logger",
"name": "extract_jobs"

View file

@@ -1,30 +1,40 @@
from django.db.models import Q
from jx_bigquery import bigquery
from jx_mysql.mysql import MySQL
from jx_mysql.mysql import (MySQL,
sql_query)
from jx_mysql.mysql_snowflake_extractor import MySqlSnowflakeExtractor
from jx_python import jx
from mo_files import File
from mo_json import (json2value,
value2json)
from mo_logs import (Log,
constants,
startup)
startup,
strings)
from mo_sql import SQL
from mo_times import Timer
from mo_times.dates import parse
from mo_times.dates import Date
from redis import Redis
from treeherder.model.models import Job
from treeherder.config.settings import REDIS_URL
CONFIG_FILE = (File.new_instance(__file__).parent / "extract_jobs.json").abspath
class ExtractJobs:
def run(self, force=False, restart=False, merge=False):
# SETUP LOGGING
settings = startup.read_settings(filename=CONFIG_FILE)
constants.set(settings.constants)
Log.start(settings.debug)
def run(self, force=False, restart=False, start=None, merge=False):
try:
# SETUP LOGGING
settings = startup.read_settings(filename=CONFIG_FILE)
constants.set(settings.constants)
Log.start(settings.debug)
self.extract(settings, force, restart, start, merge)
except Exception as e:
Log.error("could not extract jobs", cause=e)
finally:
Log.stop()
def extract(self, settings, force, restart, start, merge):
if not settings.extractor.app_name:
Log.error("Expecting an extractor.app_name in config file")
@@ -39,10 +49,12 @@ class ExtractJobs:
destination.merge_shards()
# RECOVER LAST SQL STATE
redis = Redis()
redis = Redis.from_url(REDIS_URL)
state = redis.get(settings.extractor.key)
if restart or not state:
if start:
state = start, 0
elif restart or not state:
state = (0, 0)
redis.set(settings.extractor.key, value2json(state).encode("utf8"))
else:
@@ -77,43 +89,25 @@ class ExtractJobs:
# get_ids = ConcatSQL(
# (SQL_SELECT, sql_alias(quote_value(283890114), "id"))
# )
# get_ids = sql_query(
# {
# "from": "job",
# "select": ["id"],
# "where": {
# "or": [
# {"gt": {"last_modified": parse(last_modified)}},
# {
# "and": [
# {"eq": {"last_modified": parse(last_modified)}},
# {"gt": {"id": job_id}},
# ]
# },
# ]
# },
# "sort": ["last_modified", "id"],
# "limit": settings.extractor.chunk_size,
# }
# )
get_ids = SQL(str(
(
Job.objects.filter(
Q(last_modified__gt=parse(last_modified).datetime)
| (
Q(last_modified=parse(last_modified).datetime)
& Q(id__gt=job_id)
)
)
.annotate()
.values("id")
.order_by("last_modified", "id")[
: settings.extractor.chunk_size
]
).query
))
get_ids = sql_query(
{
"from": "job",
"select": ["id"],
"where": {
"or": [
{"gt": {"last_modified": Date(last_modified)}},
{
"and": [
{"eq": {"last_modified": Date(last_modified)}},
{"gt": {"id": job_id}},
]
},
]
},
"sort": ["last_modified", "id"],
"limit": settings.extractor.chunk_size,
}
)
sql = extractor.get_sql(get_ids)
# PULL FROM source, AND PUSH TO destination
@@ -123,6 +117,11 @@ class ExtractJobs:
extractor.construct_docs(cursor, acc.append, False)
if not acc:
break
# SOME LIMITS PLACED ON STRING SIZE
for fl in jx.drill(acc, "job_log.failure_line"):
fl.message = strings.limit(fl.message, 10000)
destination.extend(acc)
# RECORD THE STATE

View file

@@ -40,7 +40,7 @@
"database": {
"debug": false,
"ssl": {
"ca": "https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem"
"pem": "https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem"
},
"host": {
"$ref": "env://DATABASE_URL"

View file

@@ -11,6 +11,7 @@ from mo_sql import SQL
from mo_times import Timer
from redis import Redis
from treeherder.config.settings import REDIS_URL
from treeherder.perf.models import PerformanceDatum
CONFIG_FILE = (File.new_instance(__file__).parent / "extract_perf.json").abspath
@@ -18,11 +19,19 @@ CONFIG_FILE = (File.new_instance(__file__).parent / "extract_perf.json").abspath
class ExtractPerf:
def run(self, force=False, restart=False, merge=False):
# SETUP LOGGING
settings = startup.read_settings(filename=CONFIG_FILE)
constants.set(settings.constants)
Log.start(settings.debug)
try:
# SETUP LOGGING
settings = startup.read_settings(filename=CONFIG_FILE)
constants.set(settings.constants)
Log.start(settings.debug)
self.extract(settings, force, restart, merge)
except Exception as e:
Log.error("could not extract perf", cause=e)
finally:
Log.stop()
def extract(self, settings, force, restart, merge):
if not settings.extractor.app_name:
Log.error("Expecting an extractor.app_name in config file")
@@ -37,7 +46,7 @@ class ExtractPerf:
destination.merge_shards()
# RECOVER LAST SQL STATE
redis = Redis()
redis = Redis.from_url(REDIS_URL)
state = redis.get(settings.extractor.key)
if restart or not state:

View file

@@ -20,6 +20,11 @@ class Command(BaseCommand):
dest="restart",
help="start extraction from the beginning"
)
parser.add_argument(
"--start",
dest="start",
help="date/time to start extraction"
)
parser.add_argument(
"--merge",
action='store_true',
@@ -31,5 +36,7 @@ class Command(BaseCommand):
ExtractJobs().run(
force=options.get("force"),
restart=options.get("restart"),
merge=options.get("merge")
start=options.get("start"),
merge=options.get("merge"),
)
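For reference, a hypothetical invocation of the new `--start` option; the timestamp is illustrative, and the value is presumably parsed by mo-times' `Date` when the extraction state is rebuilt:
```
./manage.py extract_jobs --start "2020-02-01 00:00:00"
```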