This commit is contained in:
Nick Gauthier 2016-10-31 18:35:23 -04:00
Родитель 83680ccff3
Коммит da22d038e6
4 изменённых файлов: 96 добавлений и 81 удалений

Просмотреть файл

@ -37,10 +37,14 @@ module LightStep
child_of_guid: nil,
trace_guid:,
start_micros:,
tags: nil
tags: nil,
max_log_records:
)
@tags = Concurrent::Hash.new(tags)
@baggage = Concurrent::Hash.new
@log_records = Concurrent::Array.new
@dropped_logs = Concurrent::AtomicFixnum.new
@max_log_records = max_log_records
@tracer = tracer
@guid = LightStep.guid
@ -87,12 +91,26 @@ module LightStep
# @param timestamp [Time] time of the log
# @param fields [Hash] Additional information to log
def log(event: nil, timestamp: Time.now, **fields)
@tracer.raw_log_record(
timestamp: timestamp,
stable_name: event,
span_guid: @guid,
fields: fields
)
return unless tracer.enabled?
record = {
runtime_guid: tracer.guid,
timestamp_micros: LightStep.micros(timestamp)
}
record[:stable_name] = event.to_s if !event.nil?
begin
record[:payload_json] = JSON.generate(fields, max_nesting: 8)
rescue
# TODO: failure to encode a payload as JSON should be recorded in the
# internal library logs, with catioun not flooding the internal logs.
end
@log_records.push(record)
if @log_records.size > @max_log_records
@log_records.shift
@dropped_logs.increment
end
end
# Finish the {Span}
@ -115,8 +133,18 @@ module LightStep
},
oldest_micros: start_micros,
youngest_micros: end_micros,
error_flag: false
error_flag: false,
dropped_logs: dropped_logs_count,
log_records: @log_records
}
end
def dropped_logs_count
@dropped_logs.value
end
def logs_count
@log_records.size
end
end
end

Просмотреть файл

@ -1,6 +1,5 @@
require 'json'
require 'concurrent'
require 'concurrent/channel'
require 'lightstep/span'
require 'lightstep/transport/http_json'
@ -94,7 +93,8 @@ module LightStep
child_of_guid: child_of_guid,
trace_guid: trace_guid,
start_micros: start_time.nil? ? LightStep.micros(Time.now) : LightStep.micros(start_time),
tags: tags
tags: tags,
max_log_records: max_log_records
)
end
@ -158,30 +158,13 @@ module LightStep
# @private
def _finish_span(span)
return unless enabled?
@dropped_spans.increment if @span_records.put(span.to_h)
flush_if_needed
end
# Internal use only
# @private
def raw_log_record(timestamp: Time.now, stable_name: nil, span_guid:, fields: {})
return unless enabled?
record = {
runtime_guid: guid,
timestamp_micros: LightStep.micros(timestamp)
}
record[:stable_name] = stable_name.to_s if !stable_name.nil?
begin
record[:payload_json] = JSON.generate(fields, max_nesting: 8)
rescue
# TODO: failure to encode a payload as JSON should be recorded in the
# internal library logs, with catioun not flooding the internal logs.
@span_records.push(span.to_h)
if @span_records.size > max_span_records
@span_records.shift
@dropped_spans.increment
@dropped_span_logs.increment(span.logs_count + span.dropped_logs_count)
end
full = push_with_max(@log_records, record, max_log_records)
@dropped_logs.increment if full
flush_if_needed
end
protected
@ -197,10 +180,9 @@ module LightStep
raise ConfigurationError, "component_name must be a string" unless String === component_name
raise ConfigurationError, "component_name cannot be blank" if component_name.empty?
@log_records = Concurrent::Array.new
@span_records = Concurrent::Channel::Buffer::Sliding.new(max_span_records)
@dropped_logs = Concurrent::AtomicFixnum.new
@span_records = Concurrent::Array.new
@dropped_spans = Concurrent::AtomicFixnum.new
@dropped_span_logs = Concurrent::AtomicFixnum.new
start_time = LightStep.micros(Time.now)
@guid = LightStep.guid
@ -238,33 +220,13 @@ module LightStep
end
end
def push_with_max(arr, item, max)
max = 1 if max < 1
arr.push item
if arr.size <= max
return false
end
while arr.size > max
arr.delete_at(rand(arr.size))
end
return true
end
def flush_if_needed
return unless enabled?
delta = LightStep.micros(Time.now) - @last_flush_micros
# Set a bound on maximum flush frequency
return if delta < min_flush_period_micros
# Look for a trigger that a flush is warranted
# Set a bound of minimum flush frequency
if delta > max_flush_period_micros ||
@log_records.length >= max_log_records / 2 ||
@span_records.size >= max_span_records / 2
if delta > max_flush_period_micros || @span_records.size >= max_span_records / 2
flush
end
end
@ -288,6 +250,7 @@ module LightStep
start_micros: LightStep.micros(Time.now),
child_of_guid: carrier[CARRIER_TRACER_STATE_PREFIX + 'spanid'],
trace_guid: carrier[CARRIER_TRACER_STATE_PREFIX + 'traceid'],
max_log_records: max_log_records
)
carrier.each do |key, value|
@ -300,33 +263,28 @@ module LightStep
def _flush_worker
return unless enabled?
now = LightStep.micros(Time.now)
# The thrift configuration has not yet been set: allow logs and spans
# to be buffered in this case, but flushes won't yet be possible.
return if @runtime.nil?
return if @span_records.empty?
return if @log_records.empty? && @span_records.empty?
now = LightStep.micros(Time.now)
span_records = []
loop do
span = @span_records.poll
break if span == Concurrent::NULL
span_records.push(span)
end
log_records = @log_records.slice!(0, @log_records.length)
dropped_logs = 0
span_records = @span_records.slice!(0, @span_records.length)
dropped_spans = 0
@dropped_logs.update{|old| dropped_logs = old; 0 }
@dropped_spans.update{|old| dropped_spans = old; 0 }
old_dropped_span_logs = 0
@dropped_span_logs.update{|old| old_dropped_span_logs = old; 0 }
dropped_logs = old_dropped_span_logs
dropped_logs = span_records.reduce(dropped_logs) do |memo, span|
memo += span.delete :dropped_logs
end
report_request = {
runtime: @runtime,
oldest_micros: @report_start_time.to_i,
youngest_micros: now.to_i,
log_records: log_records,
oldest_micros: @report_start_time,
youngest_micros: now,
span_records: span_records,
counters: [
{Name: "dropped_logs", Value: dropped_logs},
@ -343,8 +301,8 @@ module LightStep
# If the queue is full, add the previous dropped logs to the logs
# that were going to get reported, as well as the previous dropped
# spans and spans that would have been recorded
@dropped_logs.increment(dropped_logs + log_records.length)
@dropped_spans.increment(dropped_spans + span_records.length)
@dropped_span_logs.increment(old_dropped_span_logs)
end
end
end

Просмотреть файл

@ -17,7 +17,6 @@ Gem::Specification.new do |spec|
spec.require_paths = ['lib']
spec.add_dependency 'concurrent-ruby', '~> 1.0.0'
spec.add_dependency 'concurrent-ruby-edge', '= 0.2.2'
spec.add_development_dependency 'rake', '~> 11.3.0'
spec.add_development_dependency 'rack', '~> 2.0.0'
spec.add_development_dependency 'rspec', '~> 3.0'

Просмотреть файл

@ -198,10 +198,10 @@ describe LightStep do
s0.finish
tracer.flush
expect(result).to include(:runtime, :span_records, :log_records, :oldest_micros, :youngest_micros)
expect(result).to include(:runtime, :span_records, :oldest_micros, :youngest_micros)
expect(result[:span_records].length).to eq(1)
expect(result[:log_records].length).to eq(1)
expect(result[:span_records][0][:log_records].length).to eq(1)
expect(result[:oldest_micros]).to be <= result[:youngest_micros]
# Decompose back into a plain hash
@ -223,7 +223,7 @@ describe LightStep do
s0.log(event: 'test_event', **fields)
s0.finish
tracer.flush
JSON.generate(JSON.parse(result[:log_records][0][:payload_json]))
JSON.generate(JSON.parse(result[:span_records][0][:log_records][0][:payload_json]))
end
# NOTE: these comparisons rely on Ruby generating a consistent ordering to
@ -276,7 +276,10 @@ describe LightStep do
tracer.flush
expect(result[:span_records].length).to eq(65)
expect(result[:log_records].length).to eq(64 * 10)
result[:span_records].each do |span|
expect(span[:log_records].length).to eq(10) unless span[:span_name] == "parent_span"
end
end
it 'should handle concurrent tracers' do
@ -305,7 +308,10 @@ describe LightStep do
for i in 1..8
r = results[i]
expect(r[:span_records].length).to eq(17)
expect(r[:log_records].length).to eq(16 * 10)
r[:span_records].each do |span|
expect(span[:log_records].length).to eq(10) unless span[:span_name] == "parent_span"
expect(span[:log_records].length).to eq(0) if span[:span_name] == "parent_span"
end
end
end
@ -327,4 +333,28 @@ describe LightStep do
tracer.enable
tracer.enable
end
it 'should report dropped spans and logs' do
result = nil
tracer = init_callback_tracer(proc { |obj| result = obj })
tracer.max_span_records = 5
tracer.max_log_records = 5
(1..10).map do |i|
Thread.new do
span = tracer.start_span("span_#{i}")
(1..10).map do |j|
Thread.new do
span.log(j: j)
end
end.map(&:join)
span.finish
end
end.map(&:join)
tracer.flush
expect(result[:counters]).to eq([
{Name: "dropped_logs", Value: 75},
{Name: "dropped_spans", Value: 5}
])
end
end