updating amo_data.py to use redash

This commit is contained in:
Sarah Melancon 2019-04-03 13:17:32 -07:00
Parent 150612c1b4
Commit 296922635f
2 changed files: 30 additions and 25 deletions

View file

@@ -3,6 +3,11 @@
 import os
 import requests
 import time
+from pyspark import SparkContext
+from pyspark.sql import SQLContext
+
+sc = SparkContext("local", "first app")
+sqlContext = SQLContext(sc)
 
 api_key = 'KFj4lLJnrFDMxdNU0b0yTC2BnuMXNT1U1apLO7y9'
 redash_url = 'https://sql.telemetry.mozilla.org'
@@ -25,4 +30,7 @@ while job['status'] not in (3,4):
 
 result_id = job['query_result_id']
 response = s.get('{}/api/queries/{}/results/{}.json'.format(redash_url, query_id, result_id))
 print(response.json()['query_result']['data']['rows'])
+data_dict = response.json()['query_result']['data']['rows']
+df = sqlContext.createDataFrame(data=data_dict)
+df.show(5)
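
Taken together, the two hunks make this script a complete Redash round trip: POST a refresh to start an async query job, poll the job endpoint until it reaches a terminal status (in Redash's API, 3 means finished and 4 means failed), then fetch the stored result rows and load them into a Spark DataFrame. A self-contained sketch of that flow, with a placeholder API key and a hypothetical query_id (the real values live in the unchanged middle of this script):

import time
import requests
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "first app")
sqlContext = SQLContext(sc)

api_key = 'YOUR_REDASH_USER_API_KEY'  # placeholder, not the key from the diff
redash_url = 'https://sql.telemetry.mozilla.org'
query_id = 12345  # hypothetical saved-query id

s = requests.Session()
s.headers.update({'Authorization': 'Key {}'.format(api_key)})

# Kick off a refresh; Redash answers with a job descriptor, not the data.
response = s.post('{}/api/queries/{}/refresh'.format(redash_url, query_id))
job = response.json()['job']

# Poll until the job finishes (status 3) or fails (status 4).
while job['status'] not in (3, 4):
    response = s.get('{}/api/jobs/{}'.format(redash_url, job['id']))
    job = response.json()['job']
    time.sleep(1)

# Fetch the stored result set and build a DataFrame from its row dicts.
result_id = job['query_result_id']
response = s.get('{}/api/queries/{}/results/{}.json'.format(redash_url, query_id, result_id))
rows = response.json()['query_result']['data']['rows']
df = sqlContext.createDataFrame(data=rows)
df.show(5)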

View file

@@ -1,29 +1,26 @@
 # This file contains a helper function to create the AMO portion of the dataset
-# This is copied over from Databricks - it probably won't work as is
 
-from pyspark.sql import SQLContext
+import os
+import requests
+import time
 
-def load_amo():
-    db = 'db-slave-amoprod1.amo.us-west-2.prod.mozaws.net:3306'
-    hostname, port = db.split(":")
-    port = int(port)
-    database = 'addons_mozilla_org'
-
-    tempdir = "s3n://mozilla-databricks-telemetry-test/amo-mysql/_temp"
-    jdbcurl = "jdbc:mysql://{0}:{1}/{2}?user={3}&password={4}&ssl=true&sslMode=verify-ca" \
-        .format(hostname, port, database,
-                dbutils.secrets.get("amo-mysql", "amo-mysql-user"),
-                dbutils.secrets.get("amo-mysql", "amo-mysql-pass"))
-    print(jdbcurl)
-
-    sql_context = SQLContext(sc)
-    amo_df = (
-        sql_context.read
-        .format("jdbc")
-        .option("forward_spark_s3_credentials", True)
-        .option("url", jdbcurl)
-        .option("tempdir", tempdir)
-        .option("query", "select guid, slug, averagerating, totalreviews from addons limit 1000")
-        .load()
-    )
+def load_amo(api_key, redash_url, query_id, sqlContext):
+    params = {'p_param': 1234}
+
+    s = requests.Session()
+    s.headers.update({'Authorization': 'Key {}'.format(api_key)})
+    response = s.post('{}/api/queries/{}/refresh'.format(redash_url, query_id), params=params)
+    job = response.json()['job']
+
+    while job['status'] not in (3,4):
+        response = s.get('{}/api/jobs/{}'.format(redash_url, job['id']))
+        job = response.json()['job']
+        time.sleep(1)
+
+    result_id = job['query_result_id']
+    response = s.get('{}/api/queries/{}/results/{}.json'.format(redash_url, query_id, result_id))
+
+    data_dict = response.json()['query_result']['data']['rows']
+    df = sqlContext.createDataFrame(data=data_dict)
+    return df
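
With this change, load_amo no longer reads the AMO MySQL replica over Databricks-managed JDBC; it re-runs a saved Redash query and returns the rows as a Spark DataFrame, with all configuration pushed up to the caller. A minimal sketch of calling the new helper, assuming the amo_data module name from the commit message and placeholder credentials:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from amo_data import load_amo

sc = SparkContext("local", "first app")
sqlContext = SQLContext(sc)

# Placeholders: substitute a real Redash user API key and saved query id.
amo_df = load_amo(api_key='YOUR_REDASH_USER_API_KEY',
                  redash_url='https://sql.telemetry.mozilla.org',
                  query_id=12345,
                  sqlContext=sqlContext)
amo_df.show(5)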