Bug 1601139 - Update ingestion-beam integration workflow with sampled documents (#1021)

* Replace sampled-landfill with bq query

* Update samples for testing the pipeline

* Update documentation to include document sample

* Use CTE and remove reference to landfill
Anthony Miyaguchi 2019-12-06 12:24:02 -08:00 committed by GitHub
Parent 7b719ca419
Commit 65769c6208
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 92 additions and 162 deletions
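As a quick way to sanity-check the new workflow, here is a minimal sketch (not part of this commit) that mirrors the jq one-liner from the new script's docstring below: it counts sampled documents per (namespace, doctype, version) group, assuming `bin/download-document-sample` has already written `document_sample.ndjson`.

```python
#!/usr/bin/env python3
"""Count sampled documents per (namespace, doctype, version) group (illustration only)."""
import json
from collections import Counter

# Assumption: document_sample.ndjson was produced by bin/download-document-sample
counts = Counter()
with open("document_sample.ndjson") as fp:
    for line in fp:
        attrs = json.loads(line)["attributeMap"]
        key = (
            attrs["document_namespace"],
            attrs["document_type"],
            attrs["document_version"],
        )
        counts[key] += 1

for (namespace, doctype, docver), n in sorted(counts.items()):
    print(f"{n}\t{namespace}.{doctype}.{docver}")
```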

View file

@@ -44,6 +44,8 @@ about the sandbox environment that is provided by data operations.
   - This may take several minutes. Read the script for usage information.
   - Each namespace will be given its own dataset and each document type its own table.
 * Verify that tables have been updated by viewing the BigQuery console.
+* Download a copy of sampled documents using `bin/download-document-sample`
+  - Upload this to your project's data bucket e.g. `gs://$PROJECT/data/`
 ## Building the project
@@ -59,7 +61,7 @@ path="$BUCKET/data/*.ndjson"
 # use local maven instead of the docker container in bin/mvn, otherwise make sure to mount
 # credentials into the proper location in the container
-./bin/mvn compile exec:java -Dexec.args="\
+mvn compile exec:java -Dexec.args="\
     --runner=Dataflow \
     --project=$PROJECT \
     --autoscalingAlgorithm=NONE \
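The added "Upload this to your project's data bucket" step is typically a one-line `gsutil cp`; purely as an illustration, the sketch below does the same upload with the `google-cloud-storage` Python client. The bucket name and object path are assumptions matching the `gs://$PROJECT/data/` example above, not something defined by this commit.

```python
#!/usr/bin/env python3
"""Upload the downloaded document sample to the project's data bucket (sketch)."""
import os

from google.cloud import storage

# Assumption: the data bucket is named after the project, as in gs://$PROJECT/data/
project = os.environ["PROJECT"]
client = storage.Client(project=project)
bucket = client.bucket(project)
blob = bucket.blob("data/document_sample.ndjson")
blob.upload_from_filename("document_sample.ndjson")
print(f"uploaded to gs://{project}/data/document_sample.ndjson")
```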

View file

@@ -0,0 +1,89 @@
#!/usr/bin/env python3
"""Download samples of json documents from the decoded and error stream.
This is meant for integration testing, and is easily inspected through the command-line.
For example, to count the total number documents per group:
cat document_sample.ndjson | \
jq -rc '.attributeMap | [.document_namespace, .document_type, .document_version]' | \
uniq -c
"""
import argparse
import base64
import gzip
import json
import logging
import os
import time
from google.cloud import bigquery
INGESTION_BEAM_ROOT = os.path.realpath(
os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
)
# formatted using the BigQuery console formatter
QUERY = """
-- Create a PubSub compatible row with the most recent document samples that
-- have been decoded.
with most_recent_timestamp AS (
SELECT
MAX(submission_timestamp)
FROM
`moz-fx-data-shared-prod`.monitoring.document_sample_nonprod_v1
)
SELECT
STRUCT( document_namespace,
document_type,
document_version ) AS attributeMap,
payload
FROM
`moz-fx-data-shared-prod`.monitoring.document_sample_nonprod_v1
WHERE
document_decoded
AND submission_timestamp = (SELECT * FROM most_recent_timestamp)
ORDER BY
document_namespace,
document_type,
document_version
"""
def extract_samples(query):
"""A generator for a query on the document sample table.
Documents can be processed using jq e.g.
jq '.payload | @base64d | fromjson'
"""
client = bigquery.Client()
query_job = client.query(query)
for row in query_job:
row_dict = dict(row.items())
row_dict["payload"] = base64.b64encode(
gzip.decompress(row_dict["payload"])
).decode("utf-8")
yield row_dict
def main(args):
os.chdir(INGESTION_BEAM_ROOT)
start = time.time()
with open(args.output_file, "w") as fp:
for pubsub_document in extract_samples(QUERY):
fp.write(f"{json.dumps(pubsub_document)}\n")
logging.info(f"Done in {time.time()-start} seconds!")
def parse_arguments():
parser = argparse.ArgumentParser("download-document-sample")
parser.add_argument("--output-file", default="document_sample.ndjson")
args = parser.parse_args()
return args
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main(parse_arguments())
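To make the shape of the output concrete, here is a small illustrative snippet (not part of the commit) that does in Python what the docstring's `jq '.payload | @base64d | fromjson'` example does: decode the base64 payload of each sampled message back into a JSON document.

```python
#!/usr/bin/env python3
"""Decode payloads from document_sample.ndjson (illustration only)."""
import base64
import json

# Assumption: document_sample.ndjson was produced by bin/download-document-sample
with open("document_sample.ndjson") as fp:
    for line in fp:
        message = json.loads(line)
        # payload is base64-encoded JSON, as produced by extract_samples above
        payload = json.loads(base64.b64decode(message["payload"]))
        attrs = message["attributeMap"]
        print(
            attrs["document_namespace"],
            attrs["document_type"],
            attrs["document_version"],
            len(payload),
        )
```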

View file

@@ -1,161 +0,0 @@
#!/usr/bin/env python3
"""Download samples of raw data from the AWS ingestion pipeline into ndjson.
The sanitized-landfill-sample.v3 dataset contain newline delimited json with the
JSON payload stored as a string. This sampled data is used to run integration
tests for the avro file-sink using generated avro schemas from
mozilla-pipeline-schemas.
This script requires boto3 and python-rapidjson to be installed on the system.
Additionally, the credentials for accessing
"telemetry-parquet/sanitized-landfill-sample" should be available on the system.
"""
import argparse
import base64
import logging
import json
import os
import tarfile
from datetime import datetime, timedelta
from functools import partial
import boto3
import rapidjson
INGESTION_BEAM_ROOT = os.path.realpath(
os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
)
def parse_schema_name_archive(path):
"""Given a directory path to a json schema in the mps directory, generate
the fully qualified name in the form `{namespace}.{doctype}.{docver}`."""
elements = path.split("/")
doctype, docver = elements[-1].split(".")[:-2]
namespace = elements[-3]
return f"{namespace}.{doctype}.{docver}"
def load_schemas(path):
"""Return a dictionary containing "{namespace}.{doctype}.{docver}" to validator"""
schemas = {}
with tarfile.open(path, "r") as tf:
paths = [
name
for name in tf.getnames()
if name.endswith(".schema.json") and name.split("/")[1] == "schemas"
]
for path in paths:
name = parse_schema_name_archive(path)
schemas[name] = rapidjson.Validator(tf.extractfile(path).read())
return schemas
def parse_schema_name_sample(key):
"""Return the fully qualified name in the form of `{namespace}.{doctype}.{docver}` for
a key in s3 corresponding to `sanitized-landfill-sample`.
Example path:
```
sanitized-landfill-sample/v3/
submission_date_s3=20190308/
namespace=webpagetest/
doc_type=webpagetest-run/
doc_version=1/
part-00122-tid-2954272513278013416-c06a39af-9979-41a5-8459-76412a4554b3-650.c000.json
```
"""
params = dict([x.split("=") for x in key.split("/") if "=" in x])
return ".".join(map(params.get, ["namespace", "doc_type", "doc_version"]))
def read_documents(schemas, bucket, prefix):
"""A generator for reading documents in the sampled landfill dataset that exist in the schemas."""
s3 = boto3.client("s3")
# enumerate all keys that end in json
objects = s3.list_objects(Bucket=bucket, Prefix=prefix)
keys = [obj["Key"] for obj in objects["Contents"] if obj["Key"].endswith(".json")]
# yield the (fully-qualified name, ndjson string) for all valid keys
for key in keys:
name = parse_schema_name_sample(key)
if name not in schemas:
logging.info("schema does not exist for {}".format(name))
continue
data = (
s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8").strip()
)
yield name, data
def generate_pubsub_message(namespace, doctype, docver, payload):
"""Generate a valid pubsub message that can be used for integration testing."""
document = {
"attributeMap": {
"document_namespace": namespace,
"document_type": doctype,
"document_version": docver,
},
# payload is already a string
"payload": base64.b64encode(payload.encode("utf-8")).decode("utf-8"),
}
return json.dumps(document)
def write_pubsub(fp, schemas, name, document):
"""Write sampled pings into serialized pubsub messages."""
logging.info(f"Generating messages for {name}")
success = 0
error = 0
namespace, doctype, docver = name.split(".")
for line in document.split("\n"):
content = json.loads(line).get("content")
try:
schemas[name](content)
except ValueError:
error += 1
continue
message = generate_pubsub_message(namespace, doctype, docver, content)
fp.write(message + "\n")
success += 1
logging.info(f"Wrote {success} documents, skipped {error}")
def main(args):
os.chdir(INGESTION_BEAM_ROOT)
schemas = load_schemas(args.schema_file)
bucket = args.bucket
prefix = f"{args.prefix}/submission_date_s3={args.submission_date}"
documents = read_documents(schemas, bucket, prefix)
with open(args.output_file, "w") as fp:
for name, document in documents:
write_pubsub(fp, schemas, name, document)
logging.info("Done!")
def parse_arguments():
parser = argparse.ArgumentParser("download-sampled-landfill")
parser.add_argument("--bucket", default="telemetry-parquet")
parser.add_argument("--prefix", default="sanitized-landfill-sample/v3")
parser.add_argument(
"--submission-date", default=(datetime.now() - timedelta(1)).strftime("%Y%m%d")
)
parser.add_argument("--schema-file", default="schemas.tar.gz")
parser.add_argument("--output-file", default="landfill-integration.ndjson")
args = parser.parse_args()
return args
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main(parse_arguments())
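Both the removed landfill script and its replacement emit the same PubSub-compatible shape: an `attributeMap` carrying `document_namespace`, `document_type`, and `document_version`, plus a base64-encoded `payload`. As a rough illustration (not part of either script), a check like the following could confirm a generated ndjson file matches that shape before it is fed to the pipeline; the script name and usage are hypothetical.

```python
#!/usr/bin/env python3
"""Sanity-check that an ndjson file contains PubSub-style messages (sketch)."""
import base64
import json
import sys

REQUIRED_ATTRIBUTES = {"document_namespace", "document_type", "document_version"}

# Hypothetical usage: ./check-pubsub-shape document_sample.ndjson
with open(sys.argv[1]) as fp:
    for i, line in enumerate(fp, start=1):
        message = json.loads(line)
        missing = REQUIRED_ATTRIBUTES - message["attributeMap"].keys()
        assert not missing, f"line {i}: missing attributes {missing}"
        # payload must be valid base64; both scripts encode it that way
        base64.b64decode(message["payload"], validate=True)
print("all messages look PubSub-compatible")
```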