From 65769c62088a54c837123146143c3e82f35f4a15 Mon Sep 17 00:00:00 2001
From: Anthony Miyaguchi
Date: Fri, 6 Dec 2019 12:24:02 -0800
Subject: [PATCH] Bug 1601139 - Update ingestion-beam integration workflow with
 sampled documents (#1021)

* Replace sampled-landfill with bq query
* Update samples for testing the pipeline
* Update documentation to include document sample
* Use CTE and remove reference to landfill
---
 .../ingestion_testing_workflow.md            |   4 +-
 ingestion-beam/bin/download-document-sample  |  89 ++++++++++
 ingestion-beam/bin/download-sampled-landfill | 161 ------------------
 3 files changed, 92 insertions(+), 162 deletions(-)
 create mode 100755 ingestion-beam/bin/download-document-sample
 delete mode 100755 ingestion-beam/bin/download-sampled-landfill

diff --git a/docs/ingestion-beam/ingestion_testing_workflow.md b/docs/ingestion-beam/ingestion_testing_workflow.md
index 2924877f..fbcc1e24 100644
--- a/docs/ingestion-beam/ingestion_testing_workflow.md
+++ b/docs/ingestion-beam/ingestion_testing_workflow.md
@@ -44,6 +44,8 @@ about the sandbox environment that is provided by data operations.
   - This may take several minutes. Read the script for usage information.
   - Each namespace will be given its own dataset and each document type its own table.
 * Verify that tables have been updated by viewing the BigQuery console.
+* Download a copy of sampled documents using `bin/download-document-sample`
+  - Upload this to your project's data bucket, e.g. `gs://$PROJECT/data/`
 
 ## Building the project
 
@@ -59,7 +61,7 @@ path="$BUCKET/data/*.ndjson"
 
 # use local maven instead of the docker container in bin/mvn, otherwise make sure to mount
 # credentials into the proper location in the container
-./bin/mvn compile exec:java -Dexec.args="\
+mvn compile exec:java -Dexec.args="\
     --runner=Dataflow \
     --project=$PROJECT \
     --autoscalingAlgorithm=NONE \
diff --git a/ingestion-beam/bin/download-document-sample b/ingestion-beam/bin/download-document-sample
new file mode 100755
index 00000000..f03bcec0
--- /dev/null
+++ b/ingestion-beam/bin/download-document-sample
@@ -0,0 +1,89 @@
+#!/usr/bin/env python3
+
+"""Download samples of JSON documents from the decoded and error streams.
+
+This is meant for integration testing, and the output is easily inspected from
+the command line. For example, to count the total number of documents per group:
+
+    cat document_sample.ndjson | \
+        jq -rc '.attributeMap | [.document_namespace, .document_type, .document_version]' | \
+        uniq -c
+"""
+
+import argparse
+import base64
+import gzip
+import json
+import logging
+import os
+import time
+
+from google.cloud import bigquery
+
+INGESTION_BEAM_ROOT = os.path.realpath(
+    os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
+)
+
+# formatted using the BigQuery console formatter
+QUERY = """
+-- Create a PubSub-compatible row with the most recent document samples that
+-- have been decoded.
+with most_recent_timestamp AS (
+  SELECT
+    MAX(submission_timestamp)
+  FROM
+    `moz-fx-data-shared-prod`.monitoring.document_sample_nonprod_v1
+)
+SELECT
+  STRUCT( document_namespace,
+    document_type,
+    document_version ) AS attributeMap,
+  payload
+FROM
+  `moz-fx-data-shared-prod`.monitoring.document_sample_nonprod_v1
+WHERE
+  document_decoded
+  AND submission_timestamp = (SELECT * FROM most_recent_timestamp)
+ORDER BY
+  document_namespace,
+  document_type,
+  document_version
+"""
+
+
+def extract_samples(query):
+    """A generator for a query on the document sample table.
+
+    Documents can be processed using jq, e.g.
+
+        jq '.payload | @base64d | fromjson'
+    """
+    client = bigquery.Client()
+    query_job = client.query(query)
+    for row in query_job:
+        row_dict = dict(row.items())
+        row_dict["payload"] = base64.b64encode(
+            gzip.decompress(row_dict["payload"])
+        ).decode("utf-8")
+        yield row_dict
+
+
+def main(args):
+    os.chdir(INGESTION_BEAM_ROOT)
+    start = time.time()
+    with open(args.output_file, "w") as fp:
+        for pubsub_document in extract_samples(QUERY):
+            fp.write(f"{json.dumps(pubsub_document)}\n")
+    logging.info(f"Done in {time.time()-start} seconds!")
+
+
+def parse_arguments():
+    parser = argparse.ArgumentParser("download-document-sample")
+    parser.add_argument("--output-file", default="document_sample.ndjson")
+    args = parser.parse_args()
+    return args
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+    main(parse_arguments())
diff --git a/ingestion-beam/bin/download-sampled-landfill b/ingestion-beam/bin/download-sampled-landfill
deleted file mode 100755
index 165ba4cc..00000000
--- a/ingestion-beam/bin/download-sampled-landfill
+++ /dev/null
@@ -1,161 +0,0 @@
-#!/usr/bin/env python3
-
-"""Download samples of raw data from the AWS ingestion pipeline into ndjson.
-
-The sanitized-landfill-sample.v3 dataset contain newline delimited json with the
-JSON payload stored as a string. This sampled data is used to run integration
-tests for the avro file-sink using generated avro schemas from
-mozilla-pipeline-schemas.
-
-This script requires boto3 and python-rapidjson to be installed on the system.
-Additionally, the credentials for accessing
-"telemetry-parquet/sanitized-landfill-sample" should be available on the system.
-"""
-
-import argparse
-import base64
-import logging
-import json
-import os
-import tarfile
-from datetime import datetime, timedelta
-from functools import partial
-
-import boto3
-import rapidjson
-
-
-INGESTION_BEAM_ROOT = os.path.realpath(
-    os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
-)
-
-
-def parse_schema_name_archive(path):
-    """Given a directory path to a json schema in the mps directory, generate
-    the fully qualified name in the form `{namespace}.{doctype}.{docver}`."""
-    elements = path.split("/")
-    doctype, docver = elements[-1].split(".")[:-2]
-    namespace = elements[-3]
-    return f"{namespace}.{doctype}.{docver}"
-
-
-def load_schemas(path):
-    """Return a dictionary containing "{namespace}.{doctype}.{docver}" to validator"""
-    schemas = {}
-    with tarfile.open(path, "r") as tf:
-        paths = [
-            name
-            for name in tf.getnames()
-            if name.endswith(".schema.json") and name.split("/")[1] == "schemas"
-        ]
-        for path in paths:
-            name = parse_schema_name_archive(path)
-            schemas[name] = rapidjson.Validator(tf.extractfile(path).read())
-    return schemas
-
-
-def parse_schema_name_sample(key):
-    """Return the fully qualified name in the form of `{namespace}.{doctype}.{docver}` for
-    a key in s3 corresponding to `sanitized-landfill-sample`.
-
-    Example path:
-    ```
-    sanitized-landfill-sample/v3/
-        submission_date_s3=20190308/
-            namespace=webpagetest/
-                doc_type=webpagetest-run/
-                    doc_version=1/
-                        part-00122-tid-2954272513278013416-c06a39af-9979-41a5-8459-76412a4554b3-650.c000.json
-    ```
-    """
-    params = dict([x.split("=") for x in key.split("/") if "=" in x])
-    return ".".join(map(params.get, ["namespace", "doc_type", "doc_version"]))
-
-
-def read_documents(schemas, bucket, prefix):
-    """A generator for reading documents in the sampled landfill dataset that exist in the schemas."""
-    s3 = boto3.client("s3")
-
-    # enumerate all keys that end in json
-    objects = s3.list_objects(Bucket=bucket, Prefix=prefix)
-    keys = [obj["Key"] for obj in objects["Contents"] if obj["Key"].endswith(".json")]
-
-    # yield the (fully-qualified name, ndjson string) for all valid keys
-    for key in keys:
-        name = parse_schema_name_sample(key)
-        if name not in schemas:
-            logging.info("schema does not exist for {}".format(name))
-            continue
-        data = (
-            s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8").strip()
-        )
-        yield name, data
-
-
-def generate_pubsub_message(namespace, doctype, docver, payload):
-    """Generate a valid pubsub message that can be used for integration testing."""
-    document = {
-        "attributeMap": {
-            "document_namespace": namespace,
-            "document_type": doctype,
-            "document_version": docver,
-        },
-        # payload is already a string
-        "payload": base64.b64encode(payload.encode("utf-8")).decode("utf-8"),
-    }
-    return json.dumps(document)
-
-
-def write_pubsub(fp, schemas, name, document):
-    """Write sampled pings into serialized pubsub messages."""
-    logging.info(f"Generating messages for {name}")
-
-    success = 0
-    error = 0
-    namespace, doctype, docver = name.split(".")
-
-    for line in document.split("\n"):
-        content = json.loads(line).get("content")
-        try:
-            schemas[name](content)
-        except ValueError:
-            error += 1
-            continue
-        message = generate_pubsub_message(namespace, doctype, docver, content)
-        fp.write(message + "\n")
-        success += 1
-
-    logging.info(f"Wrote {success} documents, skipped {error}")
-
-
-def main(args):
-    os.chdir(INGESTION_BEAM_ROOT)
-    schemas = load_schemas(args.schema_file)
-
-    bucket = args.bucket
-    prefix = f"{args.prefix}/submission_date_s3={args.submission_date}"
-    documents = read_documents(schemas, bucket, prefix)
-
-    with open(args.output_file, "w") as fp:
-        for name, document in documents:
-            write_pubsub(fp, schemas, name, document)
-
-    logging.info("Done!")
-
-
-def parse_arguments():
-    parser = argparse.ArgumentParser("download-sampled-landfill")
-    parser.add_argument("--bucket", default="telemetry-parquet")
-    parser.add_argument("--prefix", default="sanitized-landfill-sample/v3")
-    parser.add_argument(
-        "--submission-date", default=(datetime.now() - timedelta(1)).strftime("%Y%m%d")
-    )
-    parser.add_argument("--schema-file", default="schemas.tar.gz")
-    parser.add_argument("--output-file", default="landfill-integration.ndjson")
-    args = parser.parse_args()
-    return args
-
-
-if __name__ == "__main__":
-    logging.basicConfig(level=logging.INFO)
-    main(parse_arguments())
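
For reference, here is a minimal sketch of how the new sampling step fits into the workflow above, run from the `ingestion-beam` directory. It assumes `jq` 1.6+ (for `@base64d`), `gsutil`, and `gcloud` application-default credentials with read access to `moz-fx-data-shared-prod.monitoring.document_sample_nonprod_v1`; the `jq` filters are copied from the script's docstrings, and `$PROJECT` is the sandbox project from the workflow doc.

```bash
# Pull the most recent decoded document sample; the script chdirs to the
# ingestion-beam root, so the output file lands there.
bin/download-document-sample --output-file document_sample.ndjson

# Count documents per namespace/doctype/version (from the module docstring);
# the query already orders by these fields, so uniq -c groups correctly.
cat document_sample.ndjson | \
    jq -rc '.attributeMap | [.document_namespace, .document_type, .document_version]' | \
    uniq -c

# Decode a single payload for inspection (from the extract_samples docstring).
head -n 1 document_sample.ndjson | jq '.payload | @base64d | fromjson'

# Stage the sample in the project's data bucket so the Dataflow job can read
# it via path="$BUCKET/data/*.ndjson" as shown in the workflow doc.
gsutil cp document_sample.ndjson "gs://$PROJECT/data/"
```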