nonblocking queue push and rescue and drop when full
This commit is contained in:
Родитель
c93e69a7f1
Коммит
29cdc1ce15
|
@ -342,26 +342,22 @@ module LightStep
|
|||
|
||||
@tracer_last_flush_micros = now
|
||||
|
||||
resp = @tracer_transport.report(report_request)
|
||||
|
||||
# ALWAYS reset the buffers and update the counters as the RPC response
|
||||
# is, by design, not waited for and not reliable.
|
||||
@tracer_report_start_time = now
|
||||
@tracer_log_records = []
|
||||
@tracer_span_records = []
|
||||
@tracer_counters.each do |key, _value|
|
||||
@tracer_counters[key] = 0
|
||||
end
|
||||
|
||||
# Process server response commands
|
||||
if !resp.nil? && Array === resp.commands
|
||||
resp.commands.each do |cmd|
|
||||
disable if cmd.disable
|
||||
begin
|
||||
@tracer_transport.report(report_request)
|
||||
# reset the counters if it was queued
|
||||
@tracer_counters.each do |key, _value|
|
||||
@tracer_counters[key] = 0
|
||||
end
|
||||
rescue LightStep::Transport::HTTPJSON::QueueFullError
|
||||
# If the queue is full, preserve counters and add logs and spans to them
|
||||
@tracer_counters[:dropped_logs] += @tracer_log_records.size
|
||||
@tracer_counters[:dropped_spans] += @tracer_span_records.size
|
||||
ensure
|
||||
# Always reset records to discard when lagging
|
||||
@tracer_report_start_time = now
|
||||
@tracer_log_records = []
|
||||
@tracer_span_records = []
|
||||
end
|
||||
|
||||
# TODO(ngauthier@gmail.com): log unknown commands
|
||||
# TODO(ngauthier@gmail.com): log errors from server
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -15,6 +15,8 @@ module LightStep
|
|||
LIGHTSTEP_PORT = 443
|
||||
QUEUE_SIZE = 16
|
||||
|
||||
class QueueFullError < LightStep::Error; end
|
||||
|
||||
def initialize(host: LIGHTSTEP_HOST, port: LIGHTSTEP_PORT, verbose: 0, secure: true, access_token:)
|
||||
@host = host
|
||||
@port = port
|
||||
|
@ -32,15 +34,17 @@ module LightStep
|
|||
p report if @verbose >= 3
|
||||
# TODO(ngauthier@gmail.com): the queue could be full here if we're
|
||||
# lagging, which would cause this to block!
|
||||
@queue << {
|
||||
@queue.push({
|
||||
host: @host,
|
||||
port: @port,
|
||||
secure: @secure,
|
||||
access_token: @access_token,
|
||||
content: report,
|
||||
verbose: @verbose
|
||||
}
|
||||
}, true)
|
||||
nil
|
||||
rescue ThreadError
|
||||
raise QueueFullError
|
||||
end
|
||||
|
||||
def flush
|
||||
|
@ -84,6 +88,9 @@ module LightStep
|
|||
res = https.request(req)
|
||||
|
||||
puts res.to_s if params[:verbose] >= 3
|
||||
|
||||
# TODO(ngauthier@gmail.com): log unknown commands
|
||||
# TODO(ngauthier@gmail.com): log errors from server
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Загрузка…
Ссылка в новой задаче