channel-signal based fork-safe implementation of reporter

This commit is contained in:
Nick Gauthier 2016-11-03 16:55:53 -04:00
Родитель 656daf4b0c
Коммит 2a2c70d094
4 изменённых файлов: 101 добавлений и 51 удалений

Просмотреть файл

@ -45,9 +45,9 @@ The LightStep Tracer is threadsafe. For increased performance, you can add the
`concurrent-ruby-ext` gem to your Gemfile. This will enable C extensions for
concurrent operations.
**The LightStep Tracer is not inherently Fork-safe**. When forking, you should
`disable` the Tracer before forking, then `enable` it in the child Process
and parent Process after forking. See the `fork_children` example for more.
The LightStep Tracer is also Fork-safe. When forking, the child process will
not inherit the unflushed spans of the parent, so they will only be flushed
once.
## Development

Просмотреть файл

@ -1,6 +1,5 @@
# A simple, manual test ensuring that tracer instances still report after a
# Process.fork. Currently this requires the tracer instance to be explicitly
# disabled before the fork and reenabled afterward.
# Process.fork.
require 'bundler/setup'
require 'lightstep'
@ -14,10 +13,7 @@ puts 'Starting...'
(1..20).each do |k|
puts "Explicit reset iteration #{k}..."
# NOTE: the tracer is disabled and reenabled on either side of the fork
LightStep.disable
pid = Process.fork do
LightStep.enable
10.times do
span = LightStep.start_span("my_forked_span-#{Process.pid}")
sleep(0.0025 * rand(k))
@ -25,9 +21,6 @@ puts 'Starting...'
end
end
# Also renable the parent process' tracer
LightStep.enable
10.times do
span = LightStep.start_span("my_process_span-#{Process.pid}")
sleep(0.0025 * rand(k))
@ -35,6 +28,8 @@ puts 'Starting...'
end
# Make sure redundant enable calls don't cause problems
# NOTE: disabling discards the buffer by default, so all spans
# get cleared here except the final toggle span
10.times do
LightStep.disable
LightStep.enable
@ -48,7 +43,7 @@ puts 'Starting...'
end
puts "Parent, pid #{Process.pid}, waiting on child pid #{pid}"
Process.wait
Process.wait(pid)
end
puts 'Done!'

Просмотреть файл

@ -1,3 +1,5 @@
require 'concurrent/channel'
module LightStep
# Reporter builds up reports of spans and flushes them to a transport
class Reporter
@ -13,7 +15,6 @@ module LightStep
start_time = LightStep.micros(Time.now)
@guid = LightStep.guid
@report_start_time = start_time
@last_flush_micros = start_time
@runtime = {
guid: guid,
@ -26,14 +27,64 @@ module LightStep
]
}.freeze
# At exit, flush this objects data to the transport and close the transport
# (which in turn will send the flushed data over the network).
reset_on_fork
at_exit do
flush
@quit_signal << true
@thread.join
end
end
def add_span(span)
reset_on_fork
@span_records.push(span.to_h)
if @span_records.size > max_span_records
dropped = @span_records.shift
@dropped_spans.increment
@dropped_span_logs.increment(dropped[:log_records].size + dropped[:dropped_logs])
end
@span_signal << true
end
def clear
span_records = @span_records.slice!(0, @span_records.length)
@dropped_spans.increment(span_records.size)
@dropped_span_logs.increment(
span_records.reduce(0) {|memo, span|
memo + span[:log_records].size + span[:dropped_logs]
}
)
end
def flush
@flush_signal << true
~@flush_response_signal
end
private
MIN_PERIOD_SECS = 1.5
MAX_PERIOD_SECS = 30.0
# When the process forks, reset the child. All data that was copied will be handled
# by the parent. Also, restart the thread since forking killed it
def reset_on_fork
if @pid != $$
@pid = $$
@span_signal = Concurrent::Channel.new(buffer: :dropping, capacity: 1)
@quit_signal = Concurrent::Channel.new(buffer: :dropping, capacity: 1)
@flush_signal = Concurrent::Channel.new
@flush_response_signal = Concurrent::Channel.new
@span_records.clear
@dropped_spans.value = 0
@dropped_span_logs.value = 0
report_spans
end
end
def perform_flush
reset_on_fork
return if @span_records.empty?
now = LightStep.micros(Time.now)
@ -60,13 +111,12 @@ module LightStep
]
}
@last_flush_micros = now
@report_start_time = now
begin
@transport.report(report_request)
rescue LightStep::Transport::HTTPJSON::QueueFullError
# If the queue is full, add the previous dropped logs to the logs
rescue
# an error occurs, add the previous dropped logs to the logs
# that were going to get reported, as well as the previous dropped
# spans and spans that would have been recorded
@dropped_spans.increment(dropped_spans + span_records.length)
@ -74,38 +124,42 @@ module LightStep
end
end
def add_span(span)
@span_records.push(span.to_h)
if @span_records.size > max_span_records
@span_records.shift
@dropped_spans.increment
@dropped_span_logs.increment(span.logs_count + span.dropped_logs_count)
end
flush_if_needed
end
def clear
@dropped_spans.increment(@span_records.size)
@dropped_span_logs.increment(
@span_records.reduce(0) {|memo, span|
memo + span[:log_records].size + span[:dropped_logs]
}
)
@span_records.clear
end
private
MIN_PERIOD_SECS = 1.5
MAX_PERIOD_SECS = 30.0
MIN_PERIOD_MICROS = MIN_PERIOD_SECS * 1E6
MAX_PERIOD_MICROS = MAX_PERIOD_SECS * 1E6
def flush_if_needed
delta = LightStep.micros(Time.now) - @last_flush_micros
return if delta < MIN_PERIOD_MICROS
if delta > MAX_PERIOD_MICROS || @span_records.size >= max_span_records / 2
flush
def report_spans
@thread = Thread.new do
begin
loop do
min_reached = false
max_reached = false
min_timer = Concurrent::Channel.timer(MIN_PERIOD_SECS)
max_timer = Concurrent::Channel.timer(MAX_PERIOD_SECS)
loop do
Concurrent::Channel.select do |s|
s.take(@span_signal) do
# we'll check span count below
end
s.take(min_timer) do
min_reached = true
end
s.take(max_timer) do
max_reached = true
end
s.take(@quit_signal) do
perform_flush
Thread.exit
end
s.take(@flush_signal) do
perform_flush
@flush_response_signal << true
end
end
if max_reached || (min_reached && @span_records.size >= max_span_records / 2)
perform_flush
end
end
end
rescue => ex
# TODO: internally log the exception
end
end
end
end

Просмотреть файл

@ -17,6 +17,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ['lib']
spec.add_dependency 'concurrent-ruby', '~> 1.0'
spec.add_dependency 'concurrent-ruby-edge', '= 0.2.2'
spec.add_development_dependency 'rake', '~> 11.3'
spec.add_development_dependency 'rack', '~> 2.0'
spec.add_development_dependency 'rspec', '~> 3.0'