= Fiber

Fiber is a flow-control primitive which enables cooperative scheduling. This is
in contrast to threads, which can be preemptively scheduled at any time. While
fibers and threads have a similar memory profile, switching context between
fibers can cost significantly less than switching between threads, as it does
not involve a system call.

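As a rough illustration of cooperative scheduling, the sketch below uses only
+Fiber.new+, +Fiber#resume+ and +Fiber.yield+. The +log+ array and its messages
are made up for the example. Control changes hands only at the explicit
+resume+ and +yield+ calls, never preemptively.

```ruby
log = []

worker = Fiber.new do
  log << "worker: step 1"
  Fiber.yield # Explicitly hand control back to whoever resumed us.
  log << "worker: step 2"
end

worker.resume # Runs the fiber until its first Fiber.yield.
log << "main: between steps"
worker.resume # Continues the fiber after the yield until it finishes.
```
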
== Design

=== Scheduler

The per-thread fiber scheduler interface is used to intercept blocking
operations. A typical implementation would be a wrapper for a gem like
EventMachine or Async. This design provides separation of concerns between the
event loop implementation and application code. It also allows for layered
schedulers which can perform instrumentation.

  class Scheduler
    # Wait for the given file descriptor to become readable.
    def wait_readable(io)
    end

    # Wait for the given file descriptor to become writable.
    def wait_writable(io)
    end

    # Wait for the given file descriptor to match the specified events within
    # the specified timeout.
    # @param events [Integer] a bit mask of `IO::WAIT_READABLE`,
    #   `IO::WAIT_WRITABLE` and `IO::WAIT_PRIORITY`.
    # @param timeout [#to_f] the amount of time to wait for the event.
    def wait_any(io, events, timeout)
    end

    # Sleep the current task for the specified duration, or forever if not
    # specified.
    # @param duration [#to_f] the amount of time to sleep.
    def wait_sleep(duration = nil)
    end

    # The Ruby virtual machine is going to enter a system level blocking
    # operation.
    def enter_blocking_region
    end

    # The Ruby virtual machine has completed the system level blocking
    # operation.
    def exit_blocking_region
    end

    # Intercept the creation of a non-blocking fiber.
    def fiber(&block)
      Fiber.new(blocking: false, &block)
    end

    # Invoked when the thread exits.
    def run
      # Implement event loop here.
    end
  end

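To make the shape of the interface more concrete, here is a minimal sketch of a
scheduler that implements only +wait_sleep+, +fiber+ and +run+. The name
+ToyScheduler+, the <tt>@sleeping</tt> list and the private +now+ helper are
assumptions made for this example, and the hooks are called directly by the
example code rather than by the interpreter. Starting each fiber eagerly in
+fiber+ is an arbitrary choice, not something the interface requires.

```ruby
require "fiber" # Needed for Fiber.current on older Rubies; harmless otherwise.

# Hypothetical toy scheduler: only timed sleeps are supported.
class ToyScheduler
  def initialize
    @sleeping = [] # Pairs of [wake-up time, fiber].
  end

  # Create a fiber and start it immediately.
  def fiber(&block)
    fiber = Fiber.new(&block)
    fiber.resume
    fiber
  end

  # Park the current fiber. With no duration it is never woken up again.
  def wait_sleep(duration = nil)
    @sleeping << [now + duration.to_f, Fiber.current] unless duration.nil?
    Fiber.yield
  end

  # Event loop: wake parked fibers in order of their wake-up time.
  def run
    until @sleeping.empty?
      @sleeping.sort_by!(&:first)
      wake_at, fiber = @sleeping.shift
      delay = wake_at - now
      sleep(delay) if delay > 0
      fiber.resume
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

scheduler = ToyScheduler.new
order = []
scheduler.fiber { scheduler.wait_sleep(0.02); order << :slow }
scheduler.fiber { scheduler.wait_sleep(0.01); order << :fast }
scheduler.run
```

Both fibers park themselves in +wait_sleep+ before +run+ is called, so the
event loop alone decides which one continues first.
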
On CRuby, the following extra methods need to be implemented to handle the
public C interface:

  class Scheduler
    # Wrapper for rb_wait_readable(int) C function.
    def wait_readable_fd(fd)
      wait_readable(::IO.for_fd(fd, autoclose: false))
    end

    # Wrapper for rb_wait_writable(int) C function.
    def wait_writable_fd(fd)
      wait_writable(::IO.for_fd(fd, autoclose: false))
    end

    # Wrapper for rb_wait_for_single_fd(int) C function.
    def wait_for_single_fd(fd, events, duration)
      wait_any(::IO.for_fd(fd, autoclose: false), events, duration)
    end
  end

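The wrappers above pass <tt>autoclose: false</tt> because the descriptor
belongs to whoever called the C function, so the temporary +IO+ object must not
close it when it is garbage collected. The sketch below shows this with a pipe.
The variable names are illustrative, and +IO.select+ merely stands in for
whatever waiting mechanism a real scheduler would use.

```ruby
reader, writer = IO.pipe
fd = reader.fileno

# A second IO object for the same descriptor, which it does not own.
wrapper = ::IO.for_fd(fd, autoclose: false)
autoclose = wrapper.autoclose?

writer.write("ping")

# Wait up to one second for the wrapped descriptor to become readable.
ready, = IO.select([wrapper], nil, nil, 1)
data = wrapper.readpartial(4)

# The original owner remains responsible for closing the descriptor.
writer.close
reader.close
```
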
=== Non-blocking Fibers

By default, fibers are blocking. Non-blocking fibers may invoke specific
scheduler hooks when a blocking operation occurs, and these hooks may introduce
context switching points.

  Fiber.new(blocking: false) do
    puts Fiber.current.blocking? # false

    # May invoke `Thread.current.scheduler&.wait_readable`.
    io.read(...)

    # May invoke `Thread.current.scheduler&.wait_writable`.
    io.write(...)

    # Will invoke `Thread.current.scheduler&.wait_sleep`.
    sleep(n)
  end.resume

We also introduce a new method which simplifies the creation of these
non-blocking fibers:

  Fiber do
    puts Fiber.current.blocking? # false
  end

The purpose of this method is to allow the scheduler to internally decide the
policy for when to start the fiber, and whether to use symmetric or asymmetric
fibers.

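As a sketch of what deciding the start policy could mean, the two hypothetical
classes below implement the +fiber+ hook differently: one resumes the new fiber
immediately, the other queues it until its event loop runs. The class names
+EagerScheduler+ and +DeferredScheduler+, and the <tt>@ready</tt> queue, are
assumptions for illustration only. The example calls the hook directly instead
of going through the <tt>Fiber do ... end</tt> method.

```ruby
# Hypothetical policy 1: start the fiber as soon as it is created.
class EagerScheduler
  def fiber(&block)
    fiber = Fiber.new(&block)
    fiber.resume
    fiber
  end
end

# Hypothetical policy 2: queue the fiber and start it from the event loop.
class DeferredScheduler
  def initialize
    @ready = []
  end

  def fiber(&block)
    fiber = Fiber.new(&block)
    @ready << fiber
    fiber
  end

  def run
    @ready.shift.resume until @ready.empty?
  end
end

events = []

eager = EagerScheduler.new
eager.fiber { events << :eager_body }
events << :after_eager

deferred = DeferredScheduler.new
deferred.fiber { events << :deferred_body }
events << :after_deferred
deferred.run
```

The application code is the same in both cases. Only the scheduler determines
whether the block runs before or after the code that follows its creation.
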
=== Mutex

While a thread holds a mutex, its +Thread#scheduler+ is not used. On
+Mutex#lock+, fiber switching via the scheduler is disabled and operations
become blocking for all fibers of the same +Thread+. On +Mutex#unlock+, the
scheduler is enabled again.

  mutex = Mutex.new

  puts Thread.current.blocking? # 1 (true)

  Fiber.new(blocking: false) do
    puts Thread.current.blocking? # false
    mutex.synchronize do
      puts Thread.current.blocking? # 1 (true)
    end

    puts Thread.current.blocking? # false
  end.resume

=== Non-blocking I/O

By default, I/O is non-blocking. Not all operating systems support non-blocking
I/O for every kind of descriptor. Windows is a notable example, where socket
I/O can be non-blocking but pipe I/O is blocking. Provided that there *is* a
scheduler and the current thread *is non-blocking*, the operation will invoke
the scheduler.

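For reference, the sketch below shows what a non-blocking read looks like when
the waiting is written by hand, using a pipe on a platform that supports
non-blocking pipe I/O. It does not involve a scheduler. The +IO.select+ call
marks the point where, under the design described here, a scheduler hook such
as +wait_readable+ would be consulted instead. The variable names are
illustrative.

```ruby
reader, writer = IO.pipe

# Nothing has been written yet, so the read would block. With
# exception: false, read_nonblock reports that as a symbol instead of raising.
first = reader.read_nonblock(16, exception: false)

writer.write("data")

# Wait (up to one second) until the descriptor is readable, then retry.
IO.select([reader], nil, nil, 1)
second = reader.read_nonblock(16, exception: false)

writer.close
reader.close
```
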