Bug 1237610: update `submit_telemetry_data.py` r=ted

This commit updates submit_telemetry_data.py to send data
to the Telemetry pipeline. The script assumes the presence
of a "telemetry" directory within the statedir, and an
"outgoing" directory within the "telemetry" directory (otherwise
there is no data to submit). The script will create a
"submitted" directory and "telemetry.log" file if absent,
making the assumption that this is the first build telemetry
submission for that user. The UUID of each submitted data point
is taken from its filename, with the ".json" suffix removed.

Differential Revision: https://phabricator.services.mozilla.com/D5687

--HG--
extra : moz-landing-system : lando
This commit is contained in:
Connor Sheehan 2018-09-20 18:54:07 +00:00
Родитель 9d2d9836e6
Коммит 1d722a9ac1
1 изменённых файлов: 157 добавлений и 47 удалений

Просмотреть файл

@ -2,76 +2,186 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import errno
from __future__ import print_function
import datetime
import json
import logging
import os
import sys
import time
HERE = os.path.abspath(os.path.dirname(__file__))
sys.path.append(os.path.join(HERE, '..', 'third_party', 'python', 'requests'))
PYTHIRDPARTY = os.path.join(HERE, '..', 'third_party', 'python')
# Add some required files to $PATH to ensure they are available
sys.path.append(os.path.join(HERE, '..', 'python', 'mozbuild', 'mozbuild'))
sys.path.append(os.path.join(PYTHIRDPARTY, 'requests'))
sys.path.append(os.path.join(PYTHIRDPARTY, 'voluptuous'))
import requests
import voluptuous
import voluptuous.humanize
from mozbuild.telemetry import schema as build_telemetry_schema
# Base URL of the Telemetry server; `{endpoint}` is filled in with
# str.format() using one of the endpoint paths below.
BUILD_TELEMETRY_URL = 'https://incoming.telemetry.mozilla.org/{endpoint}'
# Endpoint path template used when posting one ping; `{ping_uuid}` is
# filled in per ping by send_telemetry_ping.
SUBMIT_ENDPOINT = 'submit/eng-workflow/build/1/{ping_uuid}'
# Endpoint path requested by check_edge_server_status.
STATUS_ENDPOINT = 'status'
# Server to which to submit telemetry data
# NOTE(review): only referenced by the `session.post(BUILD_TELEMETRY_SERVER,
# ...)` call further down; presumably part of the pre-change side of this
# diff view -- confirm before relying on it.
BUILD_TELEMETRY_SERVER = 'http://52.88.27.118/build-metrics-dev'
def delete_expired_files(directory, days=30):
    '''Discards files in a directory older than a specified number
    of days

    The age of a file is computed from `os.path.getctime`. Only regular
    files located directly inside `directory` are considered; anything
    else (for example a subdirectory) is left untouched.

    :param directory: path of the directory to clean up
    :param days: files older than this many days are removed
    :returns: None
    '''
    now = datetime.datetime.now()
    max_age = datetime.timedelta(days=days)

    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)

        # os.remove() raises on directories, which would abort the whole
        # cleanup, so skip anything that is not a regular file.
        if not os.path.isfile(filepath):
            continue

        then = datetime.datetime.fromtimestamp(os.path.getctime(filepath))
        if (now - then) > max_age:
            os.remove(filepath)
# NOTE(review): no body follows this header in this view -- the next line
# opens check_edge_server_status, and submit_telemetry_data is defined again
# further down with an (outgoing, submitted) signature. Presumably this is the
# pre-change header kept by the diff rendering; confirm against the real file.
def submit_telemetry_data(statedir):
def check_edge_server_status(session, timeout=30):
    '''Returns True if the Telemetry Edge Server
    is ready to accept data

    :param session: `requests.Session` used to query the status endpoint
    :param timeout: seconds to wait for the server before giving up
    :returns: True only when the status endpoint answers with HTTP 200;
        False for any other status code or when the request itself
        fails (connection error, timeout, ...)
    '''
    status_url = BUILD_TELEMETRY_URL.format(endpoint=STATUS_ENDPOINT)

    try:
        # requests applies no timeout unless one is given, so an
        # unresponsive server would otherwise block this script forever.
        response = session.get(status_url, timeout=timeout)
    except requests.RequestException as e:
        # An unreachable server is simply "not ready"; record why.
        logging.error('Error checking telemetry server status: %s', e)
        return False

    return response.status_code == 200
# NOTE(review): these statements read `statedir` and use a bare `return 0`,
# so they presumably belong to the body of the older
# submit_telemetry_data(statedir) shown in this diff view -- confirm.
# No data to work with anyway
outgoing = os.path.join(statedir, 'telemetry', 'outgoing')
if not os.path.isdir(outgoing):
return 0
# Create the "submitted" directory next to "outgoing". An already existing
# entry (EEXIST) is tolerated; any other OSError is re-raised.
submitted = os.path.join(statedir, 'telemetry', 'submitted')
try:
os.mkdir(submitted)
except OSError as e:
if e.errno != errno.EEXIST:
raise
def send_telemetry_ping(session, data, ping_uuid):
'''Sends a single build telemetry ping to the
edge server, returning the response object

`session` is the object whose `post` method is called, `data` is passed
to it as the `json=` argument, and `ping_uuid` is converted with `str()`
and substituted into SUBMIT_ENDPOINT to build the target URL.
'''
# Build the full URL: the per-ping path from SUBMIT_ENDPOINT is substituted
# into the `{endpoint}` placeholder of BUILD_TELEMETRY_URL.
resource_url = SUBMIT_ENDPOINT.format(ping_uuid=str(ping_uuid))
url = BUILD_TELEMETRY_URL.format(endpoint=resource_url)
response = session.post(url, json=data)
# NOTE(review): the statements from here down to the `logging.error` call
# before `return response` use `outgoing` and BUILD_TELEMETRY_SERVER, neither
# of which is a parameter of this function. Presumably they are the older
# posting loop left in place by the diff rendering -- confirm.
session = requests.Session()
# Walk every entry of `outgoing`, skipping directories and non-".json" names.
for filename in os.listdir(outgoing):
path = os.path.join(outgoing, filename)
if os.path.isdir(path) or not path.endswith('.json'):
continue
# The file content is read as text and posted unparsed.
with open(path, 'r') as f:
data = f.read()
try:
r = session.post(BUILD_TELEMETRY_SERVER, data=data,
headers={'Content-Type': 'application/json'})
except Exception as e:
# Any exception while posting is logged and ends the loop.
logging.error('Exception posting to telemetry '
'server: %s' % str(e))
break
# TODO: some of these errors are likely not recoverable, as
# written, we'll retry indefinitely
if r.status_code != 200:
logging.error('Error posting to telemetry: %s %s' %
(r.status_code, r.text))
# Hand the response of the `session.post(url, json=data)` call back to the
# caller, which inspects its status code.
return response
def submit_telemetry_data(outgoing, submitted):
'''Sends information about `./mach build` invocations to
the Telemetry pipeline

`outgoing` is the directory listed for ".json" files to send and
`submitted` is the directory files are renamed into after being sent.

Returns 1 when the server status check fails and 0 at the end of the
function otherwise.
'''
with requests.Session() as session:
# Confirm the server is OK
if not check_edge_server_status(session):
logging.error('Error posting to telemetry: server status is not "200 OK"')
return 1
# Consider every entry of `outgoing`; directories and names that do not end
# in ".json" are logged and skipped.
for filename in os.listdir(outgoing):
path = os.path.join(outgoing, filename)
if os.path.isdir(path) or not path.endswith('.json'):
logging.info('skipping item {}'.format(path))
continue
# NOTE(review): this rename runs before the file is opened below and is
# repeated after a successful post; presumably it is a pre-change line kept
# by the diff rendering -- confirm.
os.rename(os.path.join(outgoing, filename),
os.path.join(submitted, filename))
ping_uuid = os.path.splitext(filename)[0] # strip ".json" to get ping UUID
# NOTE(review): closing the session here, before it is used by
# send_telemetry_ping, also looks like a pre-change line -- confirm.
session.close()
try:
# Parse the file; json.load raises ValueError on undecodable content, which
# is handled by the first `except` clause below.
with open(path, 'r') as f:
data = json.load(f)
# Discard submitted data that is >= 30 days old
# NOTE(review): this inline expiry loop rebinds `filename` and overlaps with
# the delete_expired_files(submitted) call near the end of the function;
# presumably pre-change code -- confirm.
now = time.time()
for filename in os.listdir(submitted):
ctime = os.stat(os.path.join(submitted, filename)).st_ctime
if now - ctime >= 60 * 60 * 24 * 30:
os.remove(os.path.join(submitted, filename))
# Verify the data matches the schema
voluptuous.humanize.validate_with_humanized_errors(
data, build_telemetry_schema
)
response = send_telemetry_ping(session, data, ping_uuid)
# A non-200 answer is logged and the loop moves on; the `continue` comes
# before the rename below, so that rename is not reached for this file.
if response.status_code != 200:
msg = 'response code {code} sending {uuid} to telemetry: {body}'.format(
body=response.content,
code=response.status_code,
uuid=ping_uuid,
)
logging.error(msg)
continue
# Move from "outgoing" to "submitted"
os.rename(os.path.join(outgoing, filename),
os.path.join(submitted, filename))
logging.info('successfully posted {} to telemetry'.format(ping_uuid))
except ValueError as ve:
# ValueError is thrown if JSON cannot be decoded
# The undecodable file is deleted after logging.
logging.exception('exception parsing JSON at %s: %s'
% (path, str(ve)))
os.remove(path)
except voluptuous.Error as e:
# Invalid is thrown if some data does not fit
# the correct Schema
# The invalid file is deleted after logging.
logging.exception('invalid data found at %s: %s'
% (path, e.message))
os.remove(path)
except Exception as e:
# Any other failure is logged and stops the loop over `outgoing`.
logging.error('exception posting to telemetry '
'server: %s' % str(e))
break
# Remove old entries from `submitted` (delete_expired_files defaults to 30
# days) and report success.
delete_expired_files(submitted)
return 0
def verify_statedir(statedir):
    '''Verifies the statedir is structured according to the assumptions of
    this script

    Requires presence of the following directories; will raise if absent:
        - statedir/telemetry
        - statedir/telemetry/outgoing

    Creates the following directory if absent (first submission):
        - statedir/telemetry/submitted

    The log file is not created here; only its path is computed.

    :param statedir: path of the state directory
    :returns: tuple `(outgoing, submitted, telemetry_log)` of paths
    :raises Exception: if a required directory does not exist
    :raises OSError: if the "submitted" directory cannot be created
    '''
    # Imported locally so this function does not depend on a module-level
    # `import errno` being present.
    import errno

    telemetry_dir = os.path.join(statedir, 'telemetry')
    outgoing = os.path.join(telemetry_dir, 'outgoing')
    submitted = os.path.join(telemetry_dir, 'submitted')
    telemetry_log = os.path.join(telemetry_dir, 'telemetry.log')

    for required in (telemetry_dir, outgoing):
        if not os.path.isdir(required):
            raise Exception('{} does not exist'.format(required))

    # Attempt the creation directly instead of checking first: another
    # submission process may create the directory between a check and the
    # mkdir call, which would otherwise surface as a spurious OSError.
    try:
        os.mkdir(submitted)
    except OSError as e:
        # Only an already existing *directory* is acceptable.
        if e.errno != errno.EEXIST or not os.path.isdir(submitted):
            raise

    return outgoing, submitted, telemetry_log
# Script entry point: expects exactly one command-line argument, the statedir.
if __name__ == '__main__':
if len(sys.argv) != 2:
print('usage: python submit_telemetry_data.py <statedir>')
sys.exit(1)
statedir = sys.argv[1]
# NOTE(review): the next two statements configure logging and exit by calling
# submit_telemetry_data(statedir) with a single argument, while the `try`
# block below does the same with the (outgoing, submitted) signature.
# Presumably these are pre-change lines kept by the diff rendering -- confirm.
logging.basicConfig(filename=os.path.join(statedir, 'telemetry', 'telemetry.log'),
format='%(asctime)s %(message)s')
sys.exit(submit_telemetry_data(statedir))
try:
# Validate the directory layout and obtain the paths used below.
outgoing, submitted, telemetry_log = verify_statedir(statedir)
# Configure logging
logging.basicConfig(filename=telemetry_log,
format='%(asctime)s %(message)s',
level=logging.DEBUG)
# The return value of submit_telemetry_data becomes the process exit status.
# sys.exit raises SystemExit, which is not an Exception subclass, so it is
# not intercepted by the handler below.
sys.exit(submit_telemetry_data(outgoing, submitted))
except Exception as e:
# Handle and print messages from `statedir` verification
# NOTE(review): `e.message` is a Python 2 exception attribute; on Python 3
# this access would itself raise AttributeError -- confirm the target
# interpreter.
print(e.message)
sys.exit(1)