diff --git a/lib/lightstep/span.rb b/lib/lightstep/span.rb index 955a0d6..cb606ae 100644 --- a/lib/lightstep/span.rb +++ b/lib/lightstep/span.rb @@ -37,10 +37,14 @@ module LightStep child_of_guid: nil, trace_guid:, start_micros:, - tags: nil + tags: nil, + max_log_records: ) @tags = Concurrent::Hash.new(tags) @baggage = Concurrent::Hash.new + @log_records = Concurrent::Array.new + @dropped_logs = Concurrent::AtomicFixnum.new + @max_log_records = max_log_records @tracer = tracer @guid = LightStep.guid @@ -87,12 +91,26 @@ module LightStep # @param timestamp [Time] time of the log # @param fields [Hash] Additional information to log def log(event: nil, timestamp: Time.now, **fields) - @tracer.raw_log_record( - timestamp: timestamp, - stable_name: event, - span_guid: @guid, - fields: fields - ) + return unless tracer.enabled? + + record = { + runtime_guid: tracer.guid, + timestamp_micros: LightStep.micros(timestamp) + } + record[:stable_name] = event.to_s if !event.nil? + + begin + record[:payload_json] = JSON.generate(fields, max_nesting: 8) + rescue + # TODO: failure to encode a payload as JSON should be recorded in the + # internal library logs, with catioun not flooding the internal logs. + end + + @log_records.push(record) + if @log_records.size > @max_log_records + @log_records.shift + @dropped_logs.increment + end end # Finish the {Span} @@ -115,8 +133,18 @@ module LightStep }, oldest_micros: start_micros, youngest_micros: end_micros, - error_flag: false + error_flag: false, + dropped_logs: dropped_logs_count, + log_records: @log_records } end + + def dropped_logs_count + @dropped_logs.value + end + + def logs_count + @log_records.size + end end end diff --git a/lib/lightstep/tracer.rb b/lib/lightstep/tracer.rb index 7a306b5..ccd9f2e 100644 --- a/lib/lightstep/tracer.rb +++ b/lib/lightstep/tracer.rb @@ -1,6 +1,5 @@ require 'json' require 'concurrent' -require 'concurrent/channel' require 'lightstep/span' require 'lightstep/transport/http_json' @@ -94,7 +93,8 @@ module LightStep child_of_guid: child_of_guid, trace_guid: trace_guid, start_micros: start_time.nil? ? LightStep.micros(Time.now) : LightStep.micros(start_time), - tags: tags + tags: tags, + max_log_records: max_log_records ) end @@ -158,30 +158,13 @@ module LightStep # @private def _finish_span(span) return unless enabled? - @dropped_spans.increment if @span_records.put(span.to_h) - flush_if_needed - end - - # Internal use only - # @private - def raw_log_record(timestamp: Time.now, stable_name: nil, span_guid:, fields: {}) - return unless enabled? - - record = { - runtime_guid: guid, - timestamp_micros: LightStep.micros(timestamp) - } - record[:stable_name] = stable_name.to_s if !stable_name.nil? - - begin - record[:payload_json] = JSON.generate(fields, max_nesting: 8) - rescue - # TODO: failure to encode a payload as JSON should be recorded in the - # internal library logs, with catioun not flooding the internal logs. + @span_records.push(span.to_h) + if @span_records.size > max_span_records + @span_records.shift + @dropped_spans.increment + @dropped_span_logs.increment(span.logs_count + span.dropped_logs_count) end - - full = push_with_max(@log_records, record, max_log_records) - @dropped_logs.increment if full + flush_if_needed end protected @@ -197,10 +180,9 @@ module LightStep raise ConfigurationError, "component_name must be a string" unless String === component_name raise ConfigurationError, "component_name cannot be blank" if component_name.empty? - @log_records = Concurrent::Array.new - @span_records = Concurrent::Channel::Buffer::Sliding.new(max_span_records) - @dropped_logs = Concurrent::AtomicFixnum.new + @span_records = Concurrent::Array.new @dropped_spans = Concurrent::AtomicFixnum.new + @dropped_span_logs = Concurrent::AtomicFixnum.new start_time = LightStep.micros(Time.now) @guid = LightStep.guid @@ -238,33 +220,13 @@ module LightStep end end - def push_with_max(arr, item, max) - max = 1 if max < 1 - arr.push item - - if arr.size <= max - return false - end - - while arr.size > max - arr.delete_at(rand(arr.size)) - end - return true - end - def flush_if_needed return unless enabled? delta = LightStep.micros(Time.now) - @last_flush_micros - - # Set a bound on maximum flush frequency return if delta < min_flush_period_micros - # Look for a trigger that a flush is warranted - # Set a bound of minimum flush frequency - if delta > max_flush_period_micros || - @log_records.length >= max_log_records / 2 || - @span_records.size >= max_span_records / 2 + if delta > max_flush_period_micros || @span_records.size >= max_span_records / 2 flush end end @@ -288,6 +250,7 @@ module LightStep start_micros: LightStep.micros(Time.now), child_of_guid: carrier[CARRIER_TRACER_STATE_PREFIX + 'spanid'], trace_guid: carrier[CARRIER_TRACER_STATE_PREFIX + 'traceid'], + max_log_records: max_log_records ) carrier.each do |key, value| @@ -300,33 +263,28 @@ module LightStep def _flush_worker return unless enabled? - - now = LightStep.micros(Time.now) - # The thrift configuration has not yet been set: allow logs and spans # to be buffered in this case, but flushes won't yet be possible. return if @runtime.nil? + return if @span_records.empty? - return if @log_records.empty? && @span_records.empty? + now = LightStep.micros(Time.now) - span_records = [] - loop do - span = @span_records.poll - break if span == Concurrent::NULL - span_records.push(span) - end - - log_records = @log_records.slice!(0, @log_records.length) - dropped_logs = 0 + span_records = @span_records.slice!(0, @span_records.length) dropped_spans = 0 - @dropped_logs.update{|old| dropped_logs = old; 0 } @dropped_spans.update{|old| dropped_spans = old; 0 } + old_dropped_span_logs = 0 + @dropped_span_logs.update{|old| old_dropped_span_logs = old; 0 } + dropped_logs = old_dropped_span_logs + dropped_logs = span_records.reduce(dropped_logs) do |memo, span| + memo += span.delete :dropped_logs + end + report_request = { runtime: @runtime, - oldest_micros: @report_start_time.to_i, - youngest_micros: now.to_i, - log_records: log_records, + oldest_micros: @report_start_time, + youngest_micros: now, span_records: span_records, counters: [ {Name: "dropped_logs", Value: dropped_logs}, @@ -343,8 +301,8 @@ module LightStep # If the queue is full, add the previous dropped logs to the logs # that were going to get reported, as well as the previous dropped # spans and spans that would have been recorded - @dropped_logs.increment(dropped_logs + log_records.length) @dropped_spans.increment(dropped_spans + span_records.length) + @dropped_span_logs.increment(old_dropped_span_logs) end end end diff --git a/lightstep-tracer.gemspec b/lightstep-tracer.gemspec index 0aabcfc..b14d3ab 100644 --- a/lightstep-tracer.gemspec +++ b/lightstep-tracer.gemspec @@ -17,7 +17,6 @@ Gem::Specification.new do |spec| spec.require_paths = ['lib'] spec.add_dependency 'concurrent-ruby', '~> 1.0.0' - spec.add_dependency 'concurrent-ruby-edge', '= 0.2.2' spec.add_development_dependency 'rake', '~> 11.3.0' spec.add_development_dependency 'rack', '~> 2.0.0' spec.add_development_dependency 'rspec', '~> 3.0' diff --git a/spec/lightstep_spec.rb b/spec/lightstep_spec.rb index 73511c1..7d63d28 100644 --- a/spec/lightstep_spec.rb +++ b/spec/lightstep_spec.rb @@ -198,10 +198,10 @@ describe LightStep do s0.finish tracer.flush - expect(result).to include(:runtime, :span_records, :log_records, :oldest_micros, :youngest_micros) + expect(result).to include(:runtime, :span_records, :oldest_micros, :youngest_micros) expect(result[:span_records].length).to eq(1) - expect(result[:log_records].length).to eq(1) + expect(result[:span_records][0][:log_records].length).to eq(1) expect(result[:oldest_micros]).to be <= result[:youngest_micros] # Decompose back into a plain hash @@ -223,7 +223,7 @@ describe LightStep do s0.log(event: 'test_event', **fields) s0.finish tracer.flush - JSON.generate(JSON.parse(result[:log_records][0][:payload_json])) + JSON.generate(JSON.parse(result[:span_records][0][:log_records][0][:payload_json])) end # NOTE: these comparisons rely on Ruby generating a consistent ordering to @@ -276,7 +276,10 @@ describe LightStep do tracer.flush expect(result[:span_records].length).to eq(65) - expect(result[:log_records].length).to eq(64 * 10) + result[:span_records].each do |span| + expect(span[:log_records].length).to eq(10) unless span[:span_name] == "parent_span" + end + end it 'should handle concurrent tracers' do @@ -305,7 +308,10 @@ describe LightStep do for i in 1..8 r = results[i] expect(r[:span_records].length).to eq(17) - expect(r[:log_records].length).to eq(16 * 10) + r[:span_records].each do |span| + expect(span[:log_records].length).to eq(10) unless span[:span_name] == "parent_span" + expect(span[:log_records].length).to eq(0) if span[:span_name] == "parent_span" + end end end @@ -327,4 +333,28 @@ describe LightStep do tracer.enable tracer.enable end + + it 'should report dropped spans and logs' do + result = nil + tracer = init_callback_tracer(proc { |obj| result = obj }) + tracer.max_span_records = 5 + tracer.max_log_records = 5 + + (1..10).map do |i| + Thread.new do + span = tracer.start_span("span_#{i}") + (1..10).map do |j| + Thread.new do + span.log(j: j) + end + end.map(&:join) + span.finish + end + end.map(&:join) + tracer.flush + expect(result[:counters]).to eq([ + {Name: "dropped_logs", Value: 75}, + {Name: "dropped_spans", Value: 5} + ]) + end end