Merge branch 'master' into reorganize_code

Conflicts:
	monitoring/heka/telemetry_channel_60days.lua
Mark Reid 2013-10-28 15:14:44 -03:00
Parents f0b5962dae 4a099c67ee
Commit a08644cc93
17 changed files with 198 additions and 128 deletions

.gitignore (vendored, 1 change)
View file

@ -3,6 +3,7 @@
*.bz2
*.swp
*.swo
*.out
.idea/
data/
histogram_cache/

View file

@ -0,0 +1,28 @@
{
"incoming_bucket": "telemetry-incoming-v1",
"incoming_queue": "telemetry-incoming-v1",
"incoming_batch_size": 8,
"publish_bucket": "telemetry-published-v1",
"image": "ami-76831f46",
"skip_conversion": false,
"loop": true,
"instance_type": "c1.xlarge",
"ssl_key_name": "mreid",
"ssl_key_path": "~/.ssh/aws/mreid.pem",
"ssl_retries": 10,
"ssl_user": "ubuntu",
"security_groups": ["telemetry"],
"region": "us-west-2",
"placement": "us-west-2c",
"name": "telemetry-process-incoming-v1-1",
"default_tags": {
"Owner": "mreid",
"Application": "telemetry-server"
},
"ephemeral_map": {
"/dev/xvdb": "ephemeral0",
"/dev/xvdc": "ephemeral1",
"/dev/xvdd": "ephemeral2",
"/dev/xvde": "ephemeral3"
}
}

View file

@ -3,8 +3,8 @@ Telemetry MapReduce
We provide a basic [MapReduce][1] framework to process Telemetry Data.
The base code is in [job.py](job.py), and there are a few examples of job
scripts (and filters) in the `examples/` directory.
The base code is in [job.py](../job.py), and there are a few examples of job
scripts (and filters) in the [examples/](../examples) directory.
Given the storage layout, it is easy to parallelize the processing of data
between all available CPU cores within a machine, but also across multiple
@ -15,8 +15,8 @@ which avoids having to parse the json string for simple count-type tasks.
You can specify a `filter` that leverages the [storage layout][2] to quickly and
easily limit the data set you want to work with. It uses the same type of
[Telemetry Schema](telemetry_schema.py) document to determine which files are
included. There are examples of filters in the `examples/` dir as well.
[Telemetry Schema](../telemetry_schema.py) document to determine which files are
included. There are examples of filters in the [examples/](../examples) dir as well.
[1]: http://en.wikipedia.org/wiki/MapReduce "MapReduce"
[2]: StorageLayout.md "On-disk Storage Layout"
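
To make the workflow concrete, here is a minimal sketch of a job script. The map/reduce entry points with (key, dimensions, value, context) and (key, values, context) signatures and the context.write() helper are assumptions here; the scripts under examples/ are the authoritative reference for the real interface.

```python
# Hypothetical job script sketch, not taken from this repository: the
# signatures and context.write() are assumptions; see examples/ for the
# exact interface expected by job.py.
import json

def map(key, dimensions, value, context):
    # value is the raw JSON submission; count submissions per update channel.
    try:
        payload = json.loads(value)
    except ValueError:
        context.write("bad_payload", 1)
        return
    channel = payload.get("info", {}).get("appUpdateChannel", "UNKNOWN")
    context.write(channel, 1)

def reduce(key, values, context):
    context.write(key, sum(values))
```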

View file

@ -5,7 +5,7 @@ The basic idea is to partition the data by a number of useful dimensions, then
use the dimensions to form a directory hierarchy. Finally, the actual
submissions will be stored in compressed files that may be read and processed
in parallel. The files in a directory will be split into manageable sized
pieces. Each line in the file will be of the form <uuid><tab><json>. See
pieces. Each line in the file will be of the form `<uuid><tab><json>`. See
[StorageFormat](StorageFormat.md) for more details about the contents of the
files.
@ -92,7 +92,7 @@ This is the approach that will be used.
### `telemetry_schema.json`
The schema is defined in [telemetry_schema.json](telemetry_schema.json) and
The schema is defined in [telemetry_schema.json](../telemetry_schema.json) and
contains an array of `dimensions` that are used to determine what is allowed
at each level of the storage hierarchy. Currently supported values are:
- String value `*`: allow any value
@ -108,7 +108,7 @@ that the "long tail" of dimension values does not cause a huge number of small
files to be created.
Code for handling a schema is found in the `TelemetrySchema` class
in [telemetry_schema.py](telemetry_schema.py)
in [telemetry_schema.py](../telemetry_schema.py)
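
As a rough illustration of how the dimension rules shape a storage path, here is a sketch; the dimension list, the allowed_values shapes, and the "OTHER" catch-all bucket are assumptions, and telemetry_schema.json plus telemetry_schema.py remain authoritative.

```python
# Illustrative only: the schema contents and the "OTHER" catch-all are assumptions.
schema_dimensions = [
    {"field_name": "reason",           "allowed_values": ["idle-daily", "saved-session"]},
    {"field_name": "appName",          "allowed_values": "*"},
    {"field_name": "appUpdateChannel", "allowed_values": ["release", "beta", "aurora", "nightly"]},
    {"field_name": "appVersion",       "allowed_values": "*"},
]

def sanitize(dimension, value):
    allowed = dimension["allowed_values"]
    if allowed == "*" or value in allowed:
        return value
    return "OTHER"  # funnel the long tail of values into a single bucket

values = ["saved-session", "Firefox", "esr", "24.0"]
print "/".join(sanitize(d, v) for d, v in zip(schema_dimensions, values))
# -> saved-session/Firefox/OTHER/24.0
```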
Considered, but unused approaches
---------------------------------

heka/sbmgr.toml (new file, 6 lines)
View file

@ -0,0 +1,6 @@
ip_address = "127.0.0.1:5565"
[signer]
name = "telemetry"
hmac_hash = "md5"
hmac_key = "TODO change on deploy"
version = 0

View file

@ -200,10 +200,9 @@ class Job:
conn = S3Connection(self._aws_key, self._aws_secret_key)
bucket = conn.get_bucket(self._bucket_name)
for r in remote_files:
key = bucket.lookup(r)
size = key.size
dims = self._input_filter.get_dimensions(".", r)
remote = {"type": "remote", "name": r, "size": size, "dimensions": dims}
size = r.size
dims = self._input_filter.get_dimensions(".", r.name)
remote = {"type": "remote", "name": r.name, "size": size, "dimensions": dims}
#print "putting", remote, "into partition", min_idx
partitions[min_idx].append(remote)
sums[min_idx] += size
@ -240,10 +239,14 @@ class Job:
conn = S3Connection(self._aws_key, self._aws_secret_key)
bucket = conn.get_bucket(self._bucket_name)
count = 0
# TODO: potential optimization - if our input filter is reasonably
# restrictive and/or our list of keys is very long, it may be
# a win to use the "prefix" and "delimiter" params.
for f in bucket.list():
count += 1
if count % 1000 == 0:
print "Listed", count, "so far"
dims = self._input_filter.get_dimensions(".", f.name)
#print f.name, "->", ",".join(dims)
include = True
@ -252,9 +255,9 @@ class Job:
include = False
break
if include:
out_files.append(f.name)
out_files.append(f)
conn.close()
print "Done!"
print "Done! Fetched", len(out_files), "files"
return out_files
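
The TODO above refers to boto's server-side listing filters. A hedged sketch of that optimization, with the bucket name and prefix as placeholders (a real version would derive the prefix from the input filter rather than hard-coding it):

```python
# Sketch of the prefix optimization from the TODO above; credentials, bucket
# name, and prefix are placeholders, not values used by job.py.
from boto.s3.connection import S3Connection

aws_key = "ACCESS_KEY_PLACEHOLDER"
aws_secret_key = "SECRET_KEY_PLACEHOLDER"

conn = S3Connection(aws_key, aws_secret_key)
bucket = conn.get_bucket("telemetry-published-v1")
# S3 returns only keys whose names start with the prefix, so the coarse
# filtering happens server-side instead of the client walking the whole bucket.
for key in bucket.list(prefix="saved-session/Firefox/nightly/"):
    print key.name, key.size
conn.close()
```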
def filter_includes(self, level, value):

View file

@ -2,21 +2,42 @@
maxprocs = 4
base_dir = "."
max_timer_inject = 100
max_process_duration = 500000
[TelemetryInput]
[ProtobufDecoder]
pool_size = 1
encoding_name = "PROTOCOL_BUFFER"
[TCP:5565]
type = "TcpInput"
parser_type = "message.proto"
decoder = "ProtobufDecoder"
address = ":5565"
[TCP:5565.signer.telemetry_0]
hmac_key = "TODO change on deploy" # TODO update on deploy
[TelemetrySandboxManager]
type = "SandboxManagerFilter"
message_signer = "telemetry"
message_matcher = "Type == 'heka.control.sandbox'"
working_directory = "sandbox"
max_filters = 100
[TelemetryServerInput]
type = "LogfileInput"
logfile = "/var/log/telemetry/telemetry-server.log"
decoder = "TelemetryDecoder"
seek_journal_name = "telemetry.journal"
decoder = "TelemetryServerDecoder"
seek_journal_name = "telemetry-server.journal"
use_seek_journal = true
[TelemetryDecoder]
[TelemetryServerDecoder]
type = "PayloadJsonDecoder"
pool_size = 1
timestamp_layout = "2006-01-02T15:04:05.999Z"
#{"url":"/submit/telemetry/60dfb8c5-9966-4fd8-9b82-3a5ec2ac3d6a/saved-session/Firefox/23.0.1/release/20130814063812","duration_ms":0.547324,"code":200,"size":4819,"level":"info","message":"OK","timestamp":"2013-09-10T20:43:17.217Z"}
[TelemetryDecoder.json_map]
[TelemetryServerDecoder.json_map]
Timestamp = "$.timestamp"
Severity = "$.level"
url = "$.url"
@ -25,13 +46,13 @@ code = "$.code"
size = "$.size"
message = "$.message"
[TelemetryDecoder.severity_map]
[TelemetryServerDecoder.severity_map]
debug = 1
warning = 2
info = 3
[TelemetryDecoder.message_fields]
Type = "TelemetryLog"
[TelemetryServerDecoder.message_fields]
Type = "TelemetryServerLog"
Payload = ""
url = "%url%"
duration|ms = "%duration%"
@ -41,7 +62,7 @@ message = "%message%"
[TelemetryFilter]
type = "SandboxFilter"
message_matcher = "Type == 'TelemetryLog'"
message_matcher = "Type == 'TelemetryServerLog'"
ticker_interval = 60
script_type = "lua"
filename = "telemetry.lua"
@ -52,7 +73,7 @@ output_limit = 64000
[TelemetryChannel]
type = "SandboxFilter"
message_matcher = "Type == 'TelemetryLog'"
message_matcher = "Type == 'TelemetryServerLog'"
ticker_interval = 60
script_type = "lua"
filename = "telemetry_channel.lua"
@ -61,19 +82,33 @@ memory_limit = 8000000
instruction_limit = 1000
output_limit = 64000
[TelemetryChannel.config]
rows = 1440
sec_per_row = 60
[TelemetryChannel60Days]
type = "SandboxFilter"
message_matcher = "Type == 'TelemetryLog'"
message_matcher = "Type == 'TelemetryServerLog'"
ticker_interval = 60
script_type = "lua"
filename = "telemetry_channel_60days.lua"
filename = "telemetry_channel.lua"
preserve_data = true
memory_limit = 8000000
instruction_limit = 1000
output_limit = 64000
[TelemetryChannel60Days.config]
rows = 1440
sec_per_row = 3600
[Dashboard]
type = "DashboardOutput"
address = "0.0.0.0:4352"
ticker_interval = 5
address = ":4352"
ticker_interval = 10
working_directory = "dashboard"
[AMQPOutput]
url = "amqp://guest:guest@10.250.68.186"
exchange = "testout"
exchangeType = "fanout"
message_matcher = "Type == 'heka.sandbox-output' && Fields[payload_type] == 'cbufd'"

View file

@ -5,7 +5,7 @@
local rows = 1440
local sec_per_row = 60
request = circular_buffer.new(rows, 4, sec_per_row)
request = circular_buffer.new(rows, 4, sec_per_row, true)
local SUCCESS = request:set_header(1, "Success" , "count")
local FAILURE = request:set_header(2, "Failure" , "count")
local AVG_REQUEST_SIZE = request:set_header(3, "Request Size", "B", "avg")
@ -45,6 +45,7 @@ function timer_event(ns)
-- request:add(ns, 1, 0)
-- sums:add(ns, 1, 0)
output(request)
inject_message("cbuf", "Request Statistics")
local title = "Request Statistics"
inject_message(request:format("cbuf"), title)
inject_message(request:format("cbufd"), title)
end

View file

@ -2,15 +2,23 @@
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
local rows = 1440
local sec_per_row = 60
all = circular_buffer.new(rows, 2, sec_per_row)
local REQUESTS = all:set_header(1, "Requests")
local TOTAL_SIZE = all:set_header(2, "Total Size", "KiB")
local rows = read_config("rows")
local sec_per_row = read_config("sec_per_row")
local REQUESTS = 1
local TOTAL_SIZE = 2
channels = {}
local function add_channel(channel)
channels[channel] = circular_buffer.new(rows, 2, sec_per_row, true)
local c = channels[channel]
c:set_header(REQUESTS, "Requests")
c:set_header(TOTAL_SIZE, "Total Size", "KiB")
return c
end
all = add_channel("ALL")
function process_message ()
local ts = read_message("Timestamp")
local rs = tonumber(read_message("Fields[size]"))
@ -33,10 +41,7 @@ function process_message ()
local c = channels[channel]
if not c then
channels[channel] = circular_buffer.new(rows, 2, sec_per_row)
c = channels[channel]
c:set_header(1, "Requests")
c:set_header(2, "Total Size", "KiB")
c = add_channel(channel)
end
c:add(ts, REQUESTS, 1)
c:add(ts, TOTAL_SIZE, rs)
@ -45,10 +50,8 @@ function process_message ()
end
function timer_event(ns)
output(all)
inject_message("cbuf", "ALL")
for k, v in pairs(channels) do
output(v)
inject_message("cbuf", k)
inject_message(v:format("cbuf"), k)
inject_message(v:format("cbufd"), k)
end
end

View file

@ -1,54 +0,0 @@
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
local rows = 1440
local sec_per_row = 60 * 60
all = circular_buffer.new(rows, 2, sec_per_row)
local REQUESTS = all:set_header(1, "Requests")
local TOTAL_SIZE = all:set_header(2, "Total Size", "KiB")
channels = {}
function process_message ()
local ts = read_message("Timestamp")
local rs = tonumber(read_message("Fields[size]"))
local url = read_message("Fields[url]")
local cnt = all:add(ts, REQUESTS, 1)
if not cnt then return 0 end -- outside the buffer
if rs then
rs = rs / 1024
else
rs = 0
end
all:add(ts, TOTAL_SIZE, rs)
local channel = url:match("^/submit/telemetry/[^/]+/[^/]+/[^/]+/[^/]+/([^/]+)")
if not channel then return 0 end
if channel ~= "release" and channel ~= "beta" and channel ~= "aurora" and channel ~= "nightly" then
channel = "other"
end
local c = channels[channel]
if not c then
channels[channel] = circular_buffer.new(rows, 2, sec_per_row)
c = channels[channel]
c:set_header(1, "Requests")
c:set_header(2, "Total Size", "KiB")
end
c:add(ts, REQUESTS, 1)
c:add(ts, TOTAL_SIZE, rs)
return 0
end
function timer_event(ns)
output(all)
inject_message("cbuf", "ALL")
for k, v in pairs(channels) do
output(v)
inject_message("cbuf", k)
end
end

View file

@ -25,6 +25,7 @@ from telemetry.convert import Converter, BadPayloadError
from telemetry.revision_cache import RevisionCache
from telemetry.persist import StorageLayout
import boto.sqs
import traceback
S3FUNNEL_PATH = "/usr/local/bin/s3funnel"
def fetch_s3_files(incoming_files, fetch_cwd, bucket, aws_key, aws_secret_key):
@ -164,6 +165,10 @@ class ReadRawStep(PipeStep):
print self.label, "ERROR: Found corrupted data for record", record_count, "in", raw_file, "path:", path, "Error:", err
self.bad_records += 1
continue
if len(data) == 0:
print self.label, "ERROR: Found empty data for record", record_count, "in", raw_file, "path:", path
self.bad_records += 1
continue
# Incoming timestamps are in milliseconds, so convert to POSIX first
# (ie. seconds)
@ -232,6 +237,7 @@ class ReadRawStep(PipeStep):
if err_message != "Missing in payload: info.revision":
# TODO: recognize other common failure modes and handle them gracefully.
self.write_bad_record(key, dims, data, err_message, "Conversion Error:")
traceback.print_exc()
if self.print_stats:
this_update = datetime.now()
@ -248,6 +254,7 @@ class ReadRawStep(PipeStep):
except Exception, e:
# Corrupted data, let's skip this record.
print self.label, "- Error reading raw data from ", raw_file, e
traceback.print_exc()
def write_bad_record(self, key, dims, data, error, message=None):
self.bad_records += 1

View file

@ -5,7 +5,7 @@
"ssl_retries": 10,
"base_dir": "/mnt/telemetry",
"instance_type": "t1.micro",
"image": "ami-bf1d8a8f",
"image": "ami-ace67f9c",
"security_groups": ["telemetry"],
"region": "us-west-2",
"placement": "us-west-2c",

View file

@ -47,7 +47,12 @@ def create_instance(config, aws_key=None, aws_secret_key=None):
print "Creating a new instance of type", itype
# Known images:
# ami-bf1d8a8f == Ubuntu 13.04
# ami-76831f46 == telemetry-base (based on Ubuntu 13.04 with dependencies already installed)
# ami-ace67f9c == Ubuntu 13.10
# ami-76831f46 == telemetry-base - based on Ubuntu 13.04 with dependencies
# already installed
# ami-260c9516 == telemetry-server - Based on Ubuntu 13.10, everything is
# ready to go, server will be auto-started on boot. Does
# NOT auto-start process-incoming.
# See if ephemerals have been specified
mapping = None
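
For reference, the ephemeral_map entries from the JSON configs can be turned into a boto 2.x block device mapping along these lines; this is a sketch, and the actual launch code may do it differently:

```python
# Hedged sketch: maps "ephemeral_map" config entries onto a boto 2.x
# BlockDeviceMapping; not the launch code from this repository.
from boto.ec2.blockdevicemapping import BlockDeviceMapping, BlockDeviceType

ephemeral_map = {
    "/dev/xvdb": "ephemeral0",
    "/dev/xvdc": "ephemeral1",
}

mapping = BlockDeviceMapping()
for device, ephemeral_name in ephemeral_map.items():
    mapping[device] = BlockDeviceType(ephemeral_name=ephemeral_name)

# 'mapping' would then be passed as block_device_map when running the instance.
```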

View file

@ -46,12 +46,31 @@ class TelemetryServerLauncher(Launcher):
def start_suid_script(self, c_file, username):
sudo("echo 'setuid {1}' > {0}".format(c_file, username))
sudo("echo 'setgid {1}' >> {0}".format(c_file, username))
# Set the ulimit for # open files in the upstart scripts (since the
# ones set in limits.conf don't seem to apply here)
sudo("echo 'limit nofile 10000 40000' >> " + c_file)
sudo("echo 'script' >> " + c_file)
def end_suid_script(self, c_file):
sudo("echo 'end script' >> {0}".format(c_file))
sudo("echo 'respawn' >> {0}".format(c_file))
def create_logrotate_config(self, lr_file, target_log, create=True):
sudo("echo '%s {' > %s" % (target_log, lr_file))
sudo("echo ' su {1} {1}' >> {0}".format(lr_file, self.ssl_user))
sudo("echo ' rotate 5' >> {0}".format(lr_file))
sudo("echo ' daily' >> {0}".format(lr_file))
sudo("echo ' compress' >> {0}".format(lr_file))
sudo("echo ' missingok' >> {0}".format(lr_file))
if create:
sudo("echo ' create 640 {1} {1}' >> {0}".format(lr_file, self.ssl_user))
else:
sudo("echo ' copytruncate' >> {0}".format(lr_file))
sudo("echo '}' >> " + lr_file)
with settings(warn_only=True):
# This will warn if there's no file there.
sudo("logrotate -f {0}".format(lr_file))
def post_install(self, instance):
# Install some more:
self.install_nodejs_bin()
@ -82,19 +101,11 @@ class TelemetryServerLauncher(Launcher):
run("echo 'Soft limit:'; ulimit -S -n")
run("echo 'Hard limit:'; ulimit -H -n")
# Setup logrotate for the stats log
lr_file = "/etc/logrotate.d/telemetry"
sudo("echo '/var/log/telemetry/telemetry-server.log {' > " + lr_file)
sudo("echo ' su {1} {1}' >> {0}".format(lr_file, self.ssl_user))
sudo("echo ' rotate 10' >> {0}".format(lr_file))
sudo("echo ' daily' >> {0}".format(lr_file))
sudo("echo ' compress' >> {0}".format(lr_file))
sudo("echo ' missingok' >> {0}".format(lr_file))
sudo("echo ' create 640 {1} {1}' >> {0}".format(lr_file, self.ssl_user))
sudo("echo '}' >> " + lr_file)
with settings(warn_only=True):
# This will warn if there's no file there.
sudo("logrotate -f /etc/logrotate.d/telemetry")
# Setup logrotate for the stats log and process-incoming log
self.create_logrotate_config("/etc/logrotate.d/telemetry",
"/var/log/telemetry/telemetry-server.log")
self.create_logrotate_config("/etc/logrotate.d/telemetry-incoming",
"/var/log/telemetry/telemetry-incoming.out", False)
# Create startup scripts:
code_base = "/home/" + self.ssl_user + "/telemetry-server"
@ -103,12 +114,25 @@ class TelemetryServerLauncher(Launcher):
sudo("echo ' cd {1}/server' >> {0}".format(c_file, code_base))
sudo("echo ' /usr/local/bin/node ./server.js ./server_config.json >> /var/log/telemetry/telemetry-server.out' >> {0}".format(c_file))
self.end_suid_script(c_file)
sudo("echo 'start on runlevel [2345]' >> {0}".format(c_file))
sudo("echo 'stop on runlevel [016]' >> {0}".format(c_file))
c_file = "/etc/init/telemetry-export.conf"
base_export_command = "/usr/bin/python -u ./export.py -d {0}/data -p '^telemetry.log.*[.]finished$' -k '{1}' -s '{2}' -r '{3}' -b '{4}' -q '{5}' --remove-file".format(base_dir, self.aws_key, self.aws_secret_key, self.config["region"], self.config.get("incoming_bucket", "telemetry-incoming"), self.config.get("incoming_queue", "telemetry-incoming"))
self.start_suid_script(c_file, self.ssl_user)
sudo("echo ' cd {1}' >> {0}".format(c_file, code_base))
sudo("echo \" /usr/bin/python -u ./export.py -d {1}/data -p '^telemetry.log.*[.]finished$' -k '{2}' -s '{3}' -r '{4}' -b '{5}' -q '{6}' --remove-files --loop >> /var/log/telemetry/telemetry-export.out\" >> {0}".format(c_file, base_dir, self.aws_key, self.aws_secret_key, self.config["region"], self.config.get("incoming_bucket", "telemetry-incoming"), self.config.get("incoming_queue", "telemetry-incoming")))
sudo("echo \" {1} --loop >> /var/log/telemetry/telemetry-export.out\" >> {0}".format(c_file, base_export_command))
self.end_suid_script(c_file)
# after we receive "stop", run once more in non-looping mode to make
# sure we exported everything.
sudo("echo 'post-stop script' >> {0}".format(c_file))
sudo("echo ' cd {1}' >> {0}".format(c_file, code_base))
sudo("echo \" {1} >> /var/log/telemetry/telemetry-export.out\" >> {0}".format(c_file, base_export_command))
sudo("echo 'end script' >> {0}".format(c_file))
# Start/stop this in lock step with telemetry-server
sudo("echo 'start on started telemetry-server' >> {0}".format(c_file))
sudo("echo 'stop on stopped telemetry-server' >> {0}".format(c_file))
# Install a specific aws_incoming.json to use
process_incoming_config = self.config.get("process_incoming_config", "aws_incoming.json")
@ -120,6 +144,9 @@ class TelemetryServerLauncher(Launcher):
# Use unbuffered output (-u) so we can see things in the log
# immediately.
sudo("echo \" /usr/bin/python -u process_incoming_queue.py -k '{1}' -s '{2}' ./aws_incoming.json >> /var/log/telemetry/telemetry-incoming.out\" >> {0}".format(c_file, self.aws_key, self.aws_secret_key))
# NOTE: Don't automatically start/stop this service, since we only want
# to start it on "primary" nodes, and we only want to stop it in
# safe parts of the process-incoming code.
self.end_suid_script(c_file)
c_file = "/etc/init/telemetry-heka.conf"
@ -127,27 +154,25 @@ class TelemetryServerLauncher(Launcher):
sudo("echo ' cd {1}/heka' >> {0}".format(c_file, code_base))
sudo("echo \" /usr/bin/hekad -config heka.toml >> /var/log/telemetry/telemetry-heka.out\" >> {0}".format(c_file))
self.end_suid_script(c_file)
sudo("echo 'kill signal INT' >> {0}".format(c_file))
# Start/stop this in lock step with telemetry-server
sudo("echo 'start on started telemetry-server' >> {0}".format(c_file))
sudo("echo 'stop on stopped telemetry-server' >> {0}".format(c_file))
def run(self, instance):
# Start up HTTP server
sudo("start telemetry-server")
print "Telemetry server started"
# Start up exporter
sudo("start telemetry-export")
print "Telemetry export started"
# Note: This also starts up telemetry-export and telemetry-heka due to dependencies.
# Start up 'process incoming' only on the primary node
# TODO: pass in user-data to set this.
if self.config.get("primary_server", False):
sudo("start telemetry-incoming")
print "Telemetry incoming started"
else:
print "Not starting telemetry-incoming since this is not a primary server"
# Start up heka
sudo("start telemetry-heka")
print "Heka daemon started"
def main():
try:
launcher = TelemetryServerLauncher()
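
For context, assuming ssl_user is "ubuntu" as in the AWS configs, the create_logrotate_config call above should emit an /etc/logrotate.d/telemetry roughly like the following (reconstructed from the echo calls, not captured from a host); the telemetry-incoming variant replaces the create line with copytruncate:

```
/var/log/telemetry/telemetry-server.log {
 su ubuntu ubuntu
 rotate 5
 daily
 compress
 missingok
 create 640 ubuntu ubuntu
}
```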

View file

@ -0,0 +1,10 @@
{
"motd": " ==== Spot Telemetry Server. Accepting Submissions since 2013. ====",
"max_data_lenth": 204800,
"max_path_lenth": 10240,
"port": 8080,
"log_path": "/mnt/telemetry/data",
"max_log_age_ms": 120000,
"max_log_size": 50000000,
"stats_log_file": "/var/log/telemetry/telemetry-server.log"
}

View file

@ -254,7 +254,7 @@ def main(argv=None):
uploadables = exporter.find_uploadables(args.data_dir)
if len(uploadables) == 0:
print "No files to export yet. Sleeping for a while..."
time.sleep(60)
time.sleep(10)
continue
print "Processing", len(uploadables), "uploadables:"

View file

@ -61,7 +61,7 @@ def unpack(filename, raw=False, verbose=False):
data = fin.read(len_data)
error = None
if not raw:
if ord(data[0]) == 0x1f and ord(data[1]) == 0x8b:
if len(data) > 1 and ord(data[0]) == 0x1f and ord(data[1]) == 0x8b:
# Data is gzipped, uncompress it:
try:
reader = StringIO.StringIO(data)
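
The magic-byte check above can be read as a small standalone helper; this is a sketch in the same Python 2 style as the rest of the repository, not the actual unpack code:

```python
# Sketch of the gzip detection used above: payloads starting with the gzip
# magic bytes (0x1f, 0x8b) get decompressed, everything else passes through.
import gzip
import StringIO

def maybe_gunzip(data):
    if len(data) > 1 and ord(data[0]) == 0x1f and ord(data[1]) == 0x8b:
        reader = StringIO.StringIO(data)
        return gzip.GzipFile(fileobj=reader).read()
    return data
```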