final pass through tracer
This commit is contained in:
Родитель
34d46a848b
Коммит
17c6eabe72
|
@ -29,15 +29,15 @@ module LightStep
|
|||
# Implemenation specific
|
||||
# ----------------------------------------------------------------------------
|
||||
|
||||
# TODO(ngauthier@gmail.com) document all options, convert to keyword args
|
||||
# Creates a new tracer instance.
|
||||
#
|
||||
# Initialize a new tracer. Either an access_token or a transport must be
|
||||
# provided. A component_name is always required.
|
||||
# @param $component_name Component name to use for the tracer
|
||||
# @param $access_token The project access token
|
||||
# @param $access_token The project access token when pushing to LightStep
|
||||
# @param $transport LightStep::Transport to use
|
||||
# @return LightStepBase_Tracer
|
||||
# @throws Exception if the group name or access token is not a valid string.
|
||||
def initialize(options = {})
|
||||
configure(options)
|
||||
# @throws LightStep::ConfigurationError if the group name or access token is not a valid string.
|
||||
def initialize(component_name:, access_token: nil, transport: nil)
|
||||
configure(component_name: component_name, access_token: access_token, transport: transport)
|
||||
end
|
||||
|
||||
def max_log_records
|
||||
|
@ -110,7 +110,7 @@ module LightStep
|
|||
def inject(span, format, carrier)
|
||||
case format
|
||||
when LightStep::Tracer::FORMAT_TEXT_MAP
|
||||
_inject_to_text_map(span, carrier)
|
||||
inject_to_text_map(span, carrier)
|
||||
when LightStep::Tracer::FORMAT_BINARY
|
||||
warn 'Binary inject format not yet implemented'
|
||||
else
|
||||
|
@ -121,7 +121,7 @@ module LightStep
|
|||
def join(operation_name, format, carrier)
|
||||
case format
|
||||
when LightStep::Tracer::FORMAT_TEXT_MAP
|
||||
_join_from_text_map(operation_name, carrier)
|
||||
join_from_text_map(operation_name, carrier)
|
||||
when LightStep::Tracer::FORMAT_BINARY
|
||||
warn 'Binary join format not yet implemented'
|
||||
nil
|
||||
|
@ -131,54 +131,25 @@ module LightStep
|
|||
end
|
||||
end
|
||||
|
||||
# FIXME(ngauthier@gmail.com) private
|
||||
def _inject_to_text_map(span, carrier)
|
||||
carrier[LightStep::Tracer::CARRIER_TRACER_STATE_PREFIX + 'spanid'] = span.guid
|
||||
carrier[LightStep::Tracer::CARRIER_TRACER_STATE_PREFIX + 'traceid'] = span.trace_guid unless span.trace_guid.nil?
|
||||
carrier[LightStep::Tracer::CARRIER_TRACER_STATE_PREFIX + 'sampled'] = 'true'
|
||||
|
||||
span.baggage.each do |key, value|
|
||||
carrier[LightStep::Tracer::CARRIER_BAGGAGE_PREFIX + key] = value
|
||||
end
|
||||
end
|
||||
|
||||
# FIXME(ngauthier@gmail.com) private
|
||||
def _join_from_text_map(operation_name, carrier)
|
||||
span = Span.new(self)
|
||||
span.set_operation_name(operation_name)
|
||||
span.set_start_micros(now_micros)
|
||||
|
||||
parent_guid = carrier[LightStep::Tracer::CARRIER_TRACER_STATE_PREFIX + 'spanid']
|
||||
trace_guid = carrier[LightStep::Tracer::CARRIER_TRACER_STATE_PREFIX + 'traceid']
|
||||
span.trace_guid = trace_guid
|
||||
span.set_tag(:parent_span_guid, parent_guid)
|
||||
|
||||
carrier.each do |key, value|
|
||||
next unless key.start_with?(LightStep::Tracer::CARRIER_BAGGAGE_PREFIX)
|
||||
plain_key = key.to_s[LightStep::Tracer::CARRIER_BAGGAGE_PREFIX.length..key.to_s.length]
|
||||
span.set_baggage_item(plain_key, value)
|
||||
end
|
||||
span
|
||||
end
|
||||
|
||||
# The GUID of the tracer
|
||||
def guid
|
||||
@_guid ||= generate_guid
|
||||
@guid ||= generate_guid
|
||||
end
|
||||
|
||||
# @return true if the tracer is enabled
|
||||
def enabled?
|
||||
@_enabled ||= true
|
||||
@enabled ||= true
|
||||
end
|
||||
|
||||
# Enables the tracer
|
||||
def enable
|
||||
@_enabled = true
|
||||
@enabled = true
|
||||
end
|
||||
|
||||
# Disables the tracer
|
||||
# @param discard [Boolean] whether to discard queued data
|
||||
def disable(discard: false)
|
||||
@_enabled = false
|
||||
@enabled = false
|
||||
@tracer_transport.close(discard)
|
||||
end
|
||||
|
||||
|
@ -186,54 +157,6 @@ module LightStep
|
|||
_flush_worker
|
||||
end
|
||||
|
||||
# FIXME(ngauthier@gmail.com) private
|
||||
def _flush_worker
|
||||
return unless enabled?
|
||||
|
||||
now = now_micros
|
||||
|
||||
# The thrift configuration has not yet been set: allow logs and spans
|
||||
# to be buffered in this case, but flushes won't yet be possible.
|
||||
return if @tracer_thrift_runtime.nil?
|
||||
|
||||
return if @tracer_log_records.empty? && @tracer_span_records.empty?
|
||||
|
||||
# Convert the counters to thrift form
|
||||
thrift_counters = @tracer_counters.map do |key, value|
|
||||
{"Name" => key.to_s, "Value" => value.to_i}
|
||||
end
|
||||
|
||||
report_request = {
|
||||
'runtime' => @tracer_thrift_runtime,
|
||||
'oldest_micros' => @tracer_report_start_time.to_i,
|
||||
'youngest_micros' => now.to_i,
|
||||
'log_records' => @tracer_log_records,
|
||||
'span_records' => @tracer_span_records,
|
||||
'counters' => thrift_counters
|
||||
}
|
||||
|
||||
@tracer_last_flush_micros = now
|
||||
|
||||
resp = @tracer_transport.flush_report(report_request)
|
||||
|
||||
# ALWAYS reset the buffers and update the counters as the RPC response
|
||||
# is, by design, not waited for and not reliable.
|
||||
@tracer_report_start_time = now
|
||||
@tracer_log_records = []
|
||||
@tracer_span_records = []
|
||||
@tracer_counters.each do |key, _value|
|
||||
@tracer_counters[key] = 0
|
||||
end
|
||||
|
||||
# Process server response commands
|
||||
# FIXME(ngauthier@gmail.com) triple equals
|
||||
if !resp.nil? && resp.commands.class.name == 'Array'
|
||||
resp.commands.each do |cmd|
|
||||
disable if cmd.disable
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Internal use only.
|
||||
def _finish_span(span)
|
||||
return unless enabled?
|
||||
|
@ -255,22 +178,22 @@ module LightStep
|
|||
|
||||
# TODO: data scrubbing and size limiting
|
||||
json = nil
|
||||
# FIXME(ngauthier@gmail.com) triple equals
|
||||
# FIXME(ngauthier@gmail.com) can do as a type case
|
||||
if payload.is_a?(Array) || payload.is_a?(Hash)
|
||||
case payload
|
||||
when Array, Hash
|
||||
begin
|
||||
json = JSON.generate(payload, max_nesting: 8)
|
||||
fields['payload_json'] = JSON.generate(payload, max_nesting: 8)
|
||||
rescue
|
||||
# TODO(ngauthier@gmail.com) naked rescue
|
||||
# TODO: failure to encode a payload as JSON should be recorded in the
|
||||
# internal library logs, with catioun not flooding the internal logs.
|
||||
end
|
||||
elsif !payload.nil?
|
||||
when nil
|
||||
# noop
|
||||
else
|
||||
# TODO: Remove the outer 'payload' key wrapper. Just transport the JSON
|
||||
# Value (Value in the sense of the JSON spec).
|
||||
json = JSON.generate(payload: payload)
|
||||
fields['payload_json'] = JSON.generate(payload: payload)
|
||||
end
|
||||
# FIXME(ngauthier@gmail.com) triple equal String
|
||||
fields['payload_json'] = json if json.class.name == 'String'
|
||||
|
||||
full = push_with_max(@tracer_log_records, fields, max_log_records)
|
||||
@tracer_counters[:dropped_logs] += 1 if full
|
||||
|
@ -292,11 +215,7 @@ module LightStep
|
|||
@access_token = token
|
||||
end
|
||||
|
||||
def configure(
|
||||
component_name:,
|
||||
access_token: nil,
|
||||
transport: nil
|
||||
)
|
||||
def configure(component_name:, access_token: nil, transport: nil)
|
||||
raise ConfigurationError, "component_name must be a string" unless String === component_name
|
||||
raise ConfigurationError, "component_name cannot be blank" if component_name.empty?
|
||||
|
||||
|
@ -320,7 +239,7 @@ module LightStep
|
|||
{"Key" => "lightstep_tracer_version", "Value" => LightStep::Tracer::VERSION},
|
||||
{"Key" => "ruby_version", "Value" => RUBY_VERSION}
|
||||
]
|
||||
}
|
||||
}.freeze
|
||||
|
||||
if !transport.nil?
|
||||
if !(LightStep::Transport::Base === transport)
|
||||
|
@ -378,5 +297,81 @@ module LightStep
|
|||
def now_micros
|
||||
(Time.now.to_f * 1e6).floor
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def inject_to_text_map(span, carrier)
|
||||
carrier[CARRIER_TRACER_STATE_PREFIX + 'spanid'] = span.guid
|
||||
carrier[CARRIER_TRACER_STATE_PREFIX + 'traceid'] = span.trace_guid unless span.trace_guid.nil?
|
||||
carrier[CARRIER_TRACER_STATE_PREFIX + 'sampled'] = 'true'
|
||||
|
||||
span.baggage.each do |key, value|
|
||||
carrier[CARRIER_BAGGAGE_PREFIX + key] = value
|
||||
end
|
||||
end
|
||||
|
||||
def join_from_text_map(operation_name, carrier)
|
||||
span = Span.new(self)
|
||||
span.set_operation_name(operation_name)
|
||||
span.set_start_micros(now_micros)
|
||||
|
||||
parent_guid = carrier[CARRIER_TRACER_STATE_PREFIX + 'spanid']
|
||||
trace_guid = carrier[CARRIER_TRACER_STATE_PREFIX + 'traceid']
|
||||
span.trace_guid = trace_guid
|
||||
span.set_tag(:parent_span_guid, parent_guid)
|
||||
|
||||
carrier.each do |key, value|
|
||||
next unless key.start_with?(CARRIER_BAGGAGE_PREFIX)
|
||||
plain_key = key.to_s[CARRIER_BAGGAGE_PREFIX.length..key.to_s.length]
|
||||
span.set_baggage_item(plain_key, value)
|
||||
end
|
||||
span
|
||||
end
|
||||
|
||||
def _flush_worker
|
||||
return unless enabled?
|
||||
|
||||
now = now_micros
|
||||
|
||||
# The thrift configuration has not yet been set: allow logs and spans
|
||||
# to be buffered in this case, but flushes won't yet be possible.
|
||||
return if @tracer_thrift_runtime.nil?
|
||||
|
||||
return if @tracer_log_records.empty? && @tracer_span_records.empty?
|
||||
|
||||
# Convert the counters to thrift form
|
||||
thrift_counters = @tracer_counters.map do |key, value|
|
||||
{"Name" => key.to_s, "Value" => value.to_i}
|
||||
end
|
||||
|
||||
report_request = {
|
||||
'runtime' => @tracer_thrift_runtime,
|
||||
'oldest_micros' => @tracer_report_start_time.to_i,
|
||||
'youngest_micros' => now.to_i,
|
||||
'log_records' => @tracer_log_records,
|
||||
'span_records' => @tracer_span_records,
|
||||
'counters' => thrift_counters
|
||||
}
|
||||
|
||||
@tracer_last_flush_micros = now
|
||||
|
||||
resp = @tracer_transport.flush_report(report_request)
|
||||
|
||||
# ALWAYS reset the buffers and update the counters as the RPC response
|
||||
# is, by design, not waited for and not reliable.
|
||||
@tracer_report_start_time = now
|
||||
@tracer_log_records = []
|
||||
@tracer_span_records = []
|
||||
@tracer_counters.each do |key, _value|
|
||||
@tracer_counters[key] = 0
|
||||
end
|
||||
|
||||
# Process server response commands
|
||||
if !resp.nil? && Array === resp.commands
|
||||
resp.commands.each do |cmd|
|
||||
disable if cmd.disable
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Загрузка…
Ссылка в новой задаче