[Feature #20646]Improve Socket.tcp

This is a proposed improvement to `Socket.tcp`, which has implemented Happy Eyeballs version 2 (RFC8305) in PR9374.

1. Background
I implemented Happy Eyeballs version 2 (HEv2) for Socket.tcp in PR9374, but several issues have been identified:

- `IO.select` waits for name resolution or connection establishment in v46w, but it does not consider the case where both events occur simultaneously when it returns a value.
  - In this case, Socket.tcp can only capture one event and needs to execute an unnecessary loop to capture the other one, calling `IO.select` one extra time.
- `IO.select` waits for both IPv6/IPv4 name resolution (in start), but when it returns a value, it doesn't consider the case where name resolution for both address families is complete.
  - In this case, `Socket.tcp` can only obtain the addresses of one address family and needs to execute an unnecessary loop obtain the other addresses, calling `IO.select` one extra time.
- The consideration for `connect_timeout` was insufficient. After initiating one or more connections, it raises a 'user specified timeout' after the `connect_timeout` period even if there were addresses that have been resolved and have not yet tried to connect.
- It does not retry with another address in case of a connection failure.
- It executes unnecessary state transitions even when an IP address is passed as the `host` argument.
- The regex for IP addresses did not correctly specify the start and end.

2. Proposal & Outcome
To overcome the aforementioned issues, this PR introduces the following changes:

- Previously, each loop iteration represented a single state transition. This has been changed to execute all processes that meet the execution conditions within a single loop iteration.
  - This prevents unnecessary repeated loops and calling `IO.select`
- Introduced logic to determine the timeout value set for `IO.select`. During the Resolution Delay and Connection Attempt Delay, the user-specified timeout is ignored. Otherwise, the timeout value is set to the larger of `resolv_timeout` and `connect_timeout`.
  - This ensures that the `connect_timeout` is only detected after attempting to connect to all resolved addresses.
- Retry with another address in case of a connection failure.
  - This prevents unnecessary repeated loops upon connection failure.
- Call `tcp_without_fast_fallback` when an IP address is passed as the host argument.
  - This prevents unnecessary state transitions when an IP address is passed.
- Fixed regex for IP addresses.

Additionally, the code has been reduced by over 100 lines, and redundancy has been minimized, which is expected to improve readability.

3. Performance
No significant performance changes were observed in the happy case before and after the improvement.
However, improvements in state transition deficiencies are expected to enhance performance in edge cases.

```ruby
require 'socket'
require 'benchmark'

Benchmark.bmbm do |x|
  x.report('fast_fallback: true') do
    30.times { Socket.tcp("www.ruby-lang.org", 80) }
  end

  x.report('fast_fallback: false') do # Ruby3.3時点と同じ
    30.times { Socket.tcp("www.ruby-lang.org", 80, fast_fallback: false) }
  end
end
```

Before:

```
~/s/build ❯❯❯ ../install/bin/ruby ../ruby/test.rb

                           user     system      total        real
fast_fallback: true    0.021315   0.040723   0.062038 (  0.504866)
fast_fallback: false   0.007553   0.026248   0.033801 (  0.533211)
```

After:

```
~/s/build ❯❯❯ ../install/bin/ruby ../ruby/test.rb

                           user     system      total        real
fast_fallback: true    0.023081   0.040525   0.063606 (  0.406219)
fast_fallback: false   0.007302   0.025515   0.032817 (  0.418680)
```
This commit is contained in:
Misaki Shioi 2024-07-30 12:58:31 +09:00 коммит произвёл GitHub
Родитель 7ea678b24b
Коммит b3baa11ee9
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
1 изменённых файлов: 376 добавлений и 479 удалений

Просмотреть файл

@ -614,7 +614,7 @@ class Socket < BasicSocket
HOSTNAME_RESOLUTION_QUEUE_UPDATED = 0
private_constant :HOSTNAME_RESOLUTION_QUEUE_UPDATED
IPV6_ADRESS_FORMAT = /(?i)(?:(?:[0-9A-F]{1,4}:){7}(?:[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){6}(?:[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,5}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){5}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,4}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){4}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,3}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){3}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,2}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){2}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:)[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){1}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|::(?:[0-9A-F]{1,4}:){1,5}[0-9A-F]{1,4}|:)|::(?:[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,6}[0-9A-F]{1,4}|:))(?:%.+)?/
IPV6_ADRESS_FORMAT = /\A(?i:(?:(?:[0-9A-F]{1,4}:){7}(?:[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){6}(?:[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,5}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){5}(?:(?::[0-9A-F]{1,4}){1,2}|:(?:[0-9A-F]{1,4}:){1,4}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){4}(?:(?::[0-9A-F]{1,4}){1,3}|:(?:[0-9A-F]{1,4}:){1,3}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){3}(?:(?::[0-9A-F]{1,4}){1,4}|:(?:[0-9A-F]{1,4}:){1,2}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){2}(?:(?::[0-9A-F]{1,4}){1,5}|:(?:[0-9A-F]{1,4}:)[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){1}(?:(?::[0-9A-F]{1,4}){1,6}|:(?:[0-9A-F]{1,4}:){0,5}[0-9A-F]{1,4}|:)|(?:::(?:[0-9A-F]{1,4}:){0,7}[0-9A-F]{1,4}|::)))(?:%.+)?\z/
private_constant :IPV6_ADRESS_FORMAT
@tcp_fast_fallback = true
@ -649,489 +649,254 @@ class Socket < BasicSocket
# puts sock.read
# }
def self.tcp(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil, fast_fallback: tcp_fast_fallback, &block) # :yield: socket
unless fast_fallback
return tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, &block)
end
# Happy Eyeballs' states
# - :start
# - :v6c
# - :v4w
# - :v4c
# - :v46c
# - :v46w
# - :success
# - :failure
# - :timeout
specified_family_name = nil
hostname_resolution_threads = []
hostname_resolution_queue = nil
hostname_resolution_waiting = nil
hostname_resolution_expires_at = nil
selectable_addrinfos = SelectableAddrinfos.new
connecting_sockets = ConnectingSockets.new
local_addrinfos = []
connection_attempt_delay_expires_at = nil
connection_attempt_started_at = nil
state = :start
connected_socket = nil
last_error = nil
is_windows_environment ||= (RUBY_PLATFORM =~ /mswin|mingw|cygwin/)
ret = loop do
case state
when :start
specified_family_name, next_state = host && specified_family_name_and_next_state(host)
if local_host && local_port
specified_family_name, next_state = specified_family_name_and_next_state(local_host) unless specified_family_name
local_addrinfos = Addrinfo.getaddrinfo(local_host, local_port, ADDRESS_FAMILIES[specified_family_name], :STREAM, timeout: resolv_timeout)
end
if specified_family_name
addrinfos = Addrinfo.getaddrinfo(host, port, ADDRESS_FAMILIES[specified_family_name], :STREAM, timeout: resolv_timeout)
selectable_addrinfos.add(specified_family_name, addrinfos)
hostname_resolution_queue = NoHostnameResolutionQueue.new
state = next_state
next
end
resolving_family_names = ADDRESS_FAMILIES.keys
hostname_resolution_queue = HostnameResolutionQueue.new(resolving_family_names.size)
hostname_resolution_waiting = hostname_resolution_queue.waiting_pipe
hostname_resolution_started_at = current_clocktime if resolv_timeout
hostname_resolution_args = [host, port, hostname_resolution_queue]
hostname_resolution_threads.concat(
resolving_family_names.map { |family|
thread_args = [family, *hostname_resolution_args]
thread = Thread.new(*thread_args) { |*thread_args| hostname_resolution(*thread_args) }
Thread.pass
thread
}
)
hostname_resolution_retry_count = resolving_family_names.size - 1
hostname_resolution_expires_at = hostname_resolution_started_at + resolv_timeout if resolv_timeout
while hostname_resolution_retry_count >= 0
remaining = resolv_timeout ? second_to_timeout(hostname_resolution_started_at + resolv_timeout) : nil
hostname_resolved, _, = IO.select(hostname_resolution_waiting, nil, nil, remaining)
unless hostname_resolved
state = :timeout
break
end
family_name, res = hostname_resolution_queue.get
if res.is_a? Exception
unless (Socket.const_defined?(:EAI_ADDRFAMILY)) &&
(res.is_a?(Socket::ResolutionError)) &&
(res.error_code == Socket::EAI_ADDRFAMILY)
last_error = res
end
if hostname_resolution_retry_count.zero?
state = :failure
break
end
hostname_resolution_retry_count -= 1
else
state = case family_name
when :ipv6 then :v6c
when :ipv4 then hostname_resolution_queue.closed? ? :v4c : :v4w
end
selectable_addrinfos.add(family_name, res)
last_error = nil
break
end
end
next
when :v4w
ipv6_resolved, _, = IO.select(hostname_resolution_waiting, nil, nil, RESOLUTION_DELAY)
if ipv6_resolved
family_name, res = hostname_resolution_queue.get
selectable_addrinfos.add(family_name, res) unless res.is_a? Exception
state = :v46c
else
state = :v4c
end
next
when :v4c, :v6c, :v46c
connection_attempt_started_at = current_clocktime unless connection_attempt_started_at
addrinfo = selectable_addrinfos.get
if local_addrinfos.any?
local_addrinfo = local_addrinfos.find { |lai| lai.afamily == addrinfo.afamily }
if local_addrinfo.nil?
if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed?
last_error = SocketError.new 'no appropriate local address'
state = :failure
elsif selectable_addrinfos.any?
# Try other Addrinfo in next loop
else
if resolv_timeout && hostname_resolution_queue.opened? &&
(current_clocktime >= hostname_resolution_expires_at)
state = :timeout
else
# Wait for connection to be established or hostname resolution in next loop
connection_attempt_delay_expires_at = nil
state = :v46w
end
end
next
end
end
connection_attempt_delay_expires_at = current_clocktime + CONNECTION_ATTEMPT_DELAY
begin
result = if specified_family_name && selectable_addrinfos.empty? &&
connecting_sockets.empty? && hostname_resolution_queue.closed?
local_addrinfo ?
addrinfo.connect_from(local_addrinfo, timeout: connect_timeout) :
addrinfo.connect(timeout: connect_timeout)
else
socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
socket.bind(local_addrinfo) if local_addrinfo
socket.connect_nonblock(addrinfo, exception: false)
end
case result
when 0
connected_socket = socket
state = :success
when Socket
connected_socket = result
state = :success
when :wait_writable
connecting_sockets.add(socket, addrinfo)
state = :v46w
end
rescue SystemCallError => e
last_error = e
socket.close if socket && !socket.closed?
if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed?
state = :failure
elsif selectable_addrinfos.any?
# Try other Addrinfo in next loop
else
if resolv_timeout && hostname_resolution_queue.opened? &&
(current_clocktime >= hostname_resolution_expires_at)
state = :timeout
else
# Wait for connection to be established or hostname resolution in next loop
connection_attempt_delay_expires_at = nil
state = :v46w
end
end
end
next
when :v46w
if connect_timeout && second_to_timeout(connection_attempt_started_at + connect_timeout).zero?
state = :timeout
next
end
remaining = second_to_timeout(connection_attempt_delay_expires_at)
hostname_resolution_waiting = hostname_resolution_waiting && hostname_resolution_queue.closed? ? nil : hostname_resolution_waiting
hostname_resolved, connectable_sockets, = IO.select(hostname_resolution_waiting, connecting_sockets.all, nil, remaining)
if connectable_sockets&.any?
while (connectable_socket = connectable_sockets.pop)
is_connected =
if is_windows_environment
sockopt = connectable_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_CONNECT_TIME)
sockopt.unpack('i').first >= 0
else
sockopt = connectable_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR)
sockopt.int.zero?
end
if is_connected
connected_socket = connectable_socket
connecting_sockets.delete connectable_socket
connectable_sockets.each do |other_connectable_socket|
other_connectable_socket.close unless other_connectable_socket.closed?
end
state = :success
break
else
failed_ai = connecting_sockets.delete connectable_socket
inspected_ip_address = failed_ai.ipv6? ? "[#{failed_ai.ip_address}]" : failed_ai.ip_address
last_error = SystemCallError.new("connect(2) for #{inspected_ip_address}:#{failed_ai.ip_port}", sockopt.int)
connectable_socket.close unless connectable_socket.closed?
next if connectable_sockets.any?
if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed?
state = :failure
elsif selectable_addrinfos.any?
# Wait for connection attempt delay timeout in next loop
else
if resolv_timeout && hostname_resolution_queue.opened? &&
(current_clocktime >= hostname_resolution_expires_at)
state = :timeout
else
# Wait for connection to be established or hostname resolution in next loop
connection_attempt_delay_expires_at = nil
end
end
end
end
elsif hostname_resolved&.any?
family_name, res = hostname_resolution_queue.get
selectable_addrinfos.add(family_name, res) unless res.is_a? Exception
else
if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed?
state = :failure
elsif selectable_addrinfos.any?
# Try other Addrinfo in next loop
state = :v46c
else
if resolv_timeout && hostname_resolution_queue.opened? &&
(current_clocktime >= hostname_resolution_expires_at)
state = :timeout
else
# Wait for connection to be established or hostname resolution in next loop
connection_attempt_delay_expires_at = nil
end
end
end
next
when :success
break connected_socket
when :failure
raise last_error
when :timeout
raise Errno::ETIMEDOUT, 'user specified timeout'
end
sock = if fast_fallback && !(host && ip_address?(host))
tcp_with_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, &block)
else
tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, &block)
end
if block_given?
begin
yield ret
yield sock
ensure
ret.close
sock.close
end
else
ret
sock
end
end
def self.tcp_with_fast_fallback(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil)
if local_host || local_port
local_addrinfos = Addrinfo.getaddrinfo(local_host, local_port, nil, :STREAM, timeout: resolv_timeout)
resolving_family_names = local_addrinfos.map { |lai| ADDRESS_FAMILIES.key(lai.afamily) }.uniq
else
local_addrinfos = []
resolving_family_names = ADDRESS_FAMILIES.keys
end
hostname_resolution_threads = []
resolution_store = HostnameResolutionStore.new(resolving_family_names)
connecting_sockets = {}
is_windows_environment ||= (RUBY_PLATFORM =~ /mswin|mingw|cygwin/)
now = current_clock_time
resolution_delay_expires_at = nil
connection_attempt_delay_expires_at = nil
user_specified_connect_timeout_at = nil
last_error = nil
if resolving_family_names.size == 1
family_name = resolving_family_names.first
addrinfos = Addrinfo.getaddrinfo(host, port, family_name, :STREAM, timeout: resolv_timeout)
resolution_store.add_resolved(family_name, addrinfos)
hostname_resolution_result = nil
hostname_resolution_notifier = nil
user_specified_resolv_timeout_at = nil
else
hostname_resolution_result = HostnameResolutionResult.new(resolving_family_names.size)
hostname_resolution_notifier = hostname_resolution_result.notifier
hostname_resolution_threads.concat(
resolving_family_names.map { |family|
thread_args = [family, host, port, hostname_resolution_result]
thread = Thread.new(*thread_args) { |*thread_args| resolve_hostname(*thread_args) }
Thread.pass
thread
}
)
user_specified_resolv_timeout_at = resolv_timeout ? now + resolv_timeout : Float::INFINITY
end
loop do
if resolution_store.any_addrinfos? &&
!resolution_delay_expires_at &&
!connection_attempt_delay_expires_at
while (addrinfo = resolution_store.get_addrinfo)
if local_addrinfos.any?
local_addrinfo = local_addrinfos.find { |lai| lai.afamily == addrinfo.afamily }
if local_addrinfo.nil? # Connecting addrinfoと同じアドレスファミリのLocal addrinfoがない
if resolution_store.any_addrinfos?
# Try other Addrinfo in next "while"
next
elsif connecting_sockets.any? || resolution_store.any_unresolved_family?
# Exit this "while" and wait for connections to be established or hostname resolution in next loop
# Or exit this "while" and wait for hostname resolution in next loop
break
else
raise SocketError.new 'no appropriate local address'
end
end
end
begin
if resolution_store.any_addrinfos? ||
connecting_sockets.any? ||
resolution_store.any_unresolved_family?
socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
socket.bind(local_addrinfo) if local_addrinfo
result = socket.connect_nonblock(addrinfo, exception: false)
else
result = socket = local_addrinfo ?
addrinfo.connect_from(local_addrinfo, timeout: connect_timeout) :
addrinfo.connect(timeout: connect_timeout)
end
if result == :wait_writable
connection_attempt_delay_expires_at = now + CONNECTION_ATTEMPT_DELAY
if resolution_store.empty_addrinfos?
user_specified_connect_timeout_at = connect_timeout ? now + connect_timeout : Float::INFINITY
end
connecting_sockets[socket] = addrinfo
break
else
return socket # connection established
end
rescue SystemCallError => e
socket&.close
last_error = e
if resolution_store.any_addrinfos?
# Try other Addrinfo in next "while"
next
elsif connecting_sockets.any? || resolution_store.any_unresolved_family?
# Exit this "while" and wait for connections to be established or hostname resolution in next loop
# Or exit this "while" and wait for hostname resolution in next loop
break
else
raise last_error
end
end
end
end
ends_at =
if resolution_store.any_addrinfos?
resolution_delay_expires_at || connection_attempt_delay_expires_at
else
[user_specified_resolv_timeout_at, user_specified_connect_timeout_at].compact.max
end
hostname_resolved, writable_sockets, except_sockets = IO.select(
hostname_resolution_notifier,
connecting_sockets.keys,
# Use errorfds to wait for non-blocking connect failures on Windows
is_windows_environment ? connecting_sockets.keys : nil,
second_to_timeout(current_clock_time, ends_at),
)
now = current_clock_time
resolution_delay_expires_at = nil if expired?(now, resolution_delay_expires_at)
connection_attempt_delay_expires_at = nil if expired?(now, connection_attempt_delay_expires_at)
if writable_sockets&.any?
while (writable_socket = writable_sockets.pop)
is_connected = is_windows_environment || (
sockopt = writable_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR)
sockopt.int.zero?
)
if is_connected
connecting_sockets.delete writable_socket
return writable_socket
else
failed_ai = connecting_sockets.delete writable_socket
writable_socket.close
ip_address = failed_ai.ipv6? ? "[#{failed_ai.ip_address}]" : failed_ai.ip_address
last_error = SystemCallError.new("connect(2) for #{ip_address}:#{failed_ai.ip_port}", sockopt.int)
if writable_sockets.any? ||
resolution_store.any_addrinfos? ||
connecting_sockets.any? ||
resolution_store.any_unresolved_family?
user_specified_connect_timeout_at = nil if connecting_sockets.empty?
# Try other writable socket in next "while"
# Or exit this "while" and try other connection attempt
# Or exit this "while" and wait for connections to be established or hostname resolution in next loop
# Or exit this "while" and wait for hostname resolution in next loop
else
raise last_error
end
end
end
end
if except_sockets&.any?
except_sockets.each do |except_socket|
failed_ai = connecting_sockets.delete except_socket
sockopt = except_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_CONNECT_TIME)
except_socket.close
ip_address = failed_ai.ipv6? ? "[#{failed_ai.ip_address}]" : failed_ai.ip_address
last_error = SystemCallError.new("connect(2) for #{ip_address}:#{failed_ai.ip_port}", sockopt.int)
if writable_sockets.any? ||
resolution_store.any_addrinfos? ||
connecting_sockets.any? ||
resolution_store.any_unresolved_family?
user_specified_connect_timeout_at = nil if connecting_sockets.empty?
# Try other writable socket in next "while"
# Or exit this "while" and try other connection attempt
# Or exit this "while" and wait for connections to be established or hostname resolution in next loop
# Or exit this "while" and wait for hostname resolution in next loop
else
raise last_error
end
end
end
if hostname_resolved&.any?
while (family_and_result = hostname_resolution_result.get)
family_name, result = family_and_result
if result.is_a? Exception
resolution_store.add_error(family_name, result)
unless (Socket.const_defined?(:EAI_ADDRFAMILY)) &&
(result.is_a?(Socket::ResolutionError)) &&
(result.error_code == Socket::EAI_ADDRFAMILY)
last_error = result
end
else
resolution_store.add_resolved(family_name, result)
end
end
if resolution_store.resolved?(:ipv4)
if resolution_store.resolved?(:ipv6)
hostname_resolution_notifier = nil
resolution_delay_expires_at = nil
user_specified_resolv_timeout_at = nil
elsif resolution_store.resolved_successfully?(:ipv4)
resolution_delay_expires_at = now + RESOLUTION_DELAY
end
end
end
if resolution_store.empty_addrinfos?
if connecting_sockets.empty? && resolution_store.resolved_all_families?
raise last_error
end
if (expired?(now, user_specified_resolv_timeout_at) || resolution_store.resolved_all_families?) &&
(expired?(now, user_specified_connect_timeout_at) || connecting_sockets.empty?)
raise Errno::ETIMEDOUT, 'user specified timeout'
end
end
end
ensure
if fast_fallback
hostname_resolution_threads.each do |thread|
thread&.exit
end
hostname_resolution_threads.each do |thread|
thread.exit
end
hostname_resolution_queue&.close_all
hostname_resolution_result&.close
connecting_sockets.each do |connecting_socket|
connecting_socket.close unless connecting_socket.closed?
end
connecting_sockets.each_key do |connecting_socket|
connecting_socket.close
end
end
def self.specified_family_name_and_next_state(hostname)
if hostname.match?(IPV6_ADRESS_FORMAT) then [:ipv6, :v6c]
elsif hostname.match?(/^([0-9]{1,3}\.){3}[0-9]{1,3}$/) then [:ipv4, :v4c]
end
end
private_class_method :specified_family_name_and_next_state
def self.hostname_resolution(family, host, port, hostname_resolution_queue)
begin
resolved_addrinfos = Addrinfo.getaddrinfo(host, port, ADDRESS_FAMILIES[family], :STREAM)
hostname_resolution_queue.add_resolved(family, resolved_addrinfos)
rescue => e
hostname_resolution_queue.add_error(family, e)
end
end
private_class_method :hostname_resolution
def self.second_to_timeout(ends_at)
return 0 unless ends_at
remaining = (ends_at - current_clocktime)
remaining.negative? ? 0 : remaining
end
private_class_method :second_to_timeout
def self.current_clocktime
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
private_class_method :current_clocktime
class SelectableAddrinfos
PRIORITY_ON_V6 = [:ipv6, :ipv4]
PRIORITY_ON_V4 = [:ipv4, :ipv6]
def initialize
@addrinfo_dict = {}
@last_family = nil
end
def add(family_name, addrinfos)
@addrinfo_dict[family_name] = addrinfos
end
def get
return nil if empty?
if @addrinfo_dict.size == 1
@addrinfo_dict.each { |_, addrinfos| return addrinfos.shift }
end
precedences = case @last_family
when :ipv4, nil then PRIORITY_ON_V6
when :ipv6 then PRIORITY_ON_V4
end
precedences.each do |family_name|
addrinfo = @addrinfo_dict[family_name]&.shift
next unless addrinfo
@last_family = family_name
return addrinfo
end
end
def empty?
@addrinfo_dict.all? { |_, addrinfos| addrinfos.empty? }
end
def any?
!empty?
end
end
private_constant :SelectableAddrinfos
class NoHostnameResolutionQueue
def waiting_pipe
nil
end
def add_resolved(_, _)
raise StandardError, "This #{self.class} cannot respond to:"
end
def add_error(_, _)
raise StandardError, "This #{self.class} cannot respond to:"
end
def get
nil
end
def opened?
false
end
def closed?
true
end
def close_all
# Do nothing
end
end
private_constant :NoHostnameResolutionQueue
class HostnameResolutionQueue
def initialize(size)
@size = size
@taken_count = 0
@rpipe, @wpipe = IO.pipe
@queue = Queue.new
@mutex = Mutex.new
end
def waiting_pipe
[@rpipe]
end
def add_resolved(family, resolved_addrinfos)
@mutex.synchronize do
@queue.push [family, resolved_addrinfos]
@wpipe.putc HOSTNAME_RESOLUTION_QUEUE_UPDATED
end
end
def add_error(family, error)
@mutex.synchronize do
@queue.push [family, error]
@wpipe.putc HOSTNAME_RESOLUTION_QUEUE_UPDATED
end
end
def get
return nil if @queue.empty?
res = nil
@mutex.synchronize do
@rpipe.getbyte
res = @queue.pop
end
@taken_count += 1
close_all if @taken_count == @size
res
end
def closed?
@rpipe.closed?
end
def opened?
!closed?
end
def close_all
@queue.close unless @queue.closed?
@rpipe.close unless @rpipe.closed?
@wpipe.close unless @wpipe.closed?
end
end
private_constant :HostnameResolutionQueue
class ConnectingSockets
def initialize
@socket_dict = {}
end
def all
@socket_dict.keys
end
def add(socket, addrinfo)
@socket_dict[socket] = addrinfo
end
def delete(socket)
@socket_dict.delete socket
end
def empty?
@socket_dict.empty?
end
def each
@socket_dict.keys.each do |socket|
yield socket
end
end
end
private_constant :ConnectingSockets
def self.tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, &block)
last_error = nil
ret = nil
@ -1166,18 +931,150 @@ class Socket < BasicSocket
raise SocketError, "no appropriate local address"
end
end
if block_given?
begin
yield ret
ensure
ret.close
end
else
ret
end
ret
end
private_class_method :tcp_without_fast_fallback
def self.ip_address?(hostname)
hostname.match?(IPV6_ADRESS_FORMAT) || hostname.match?(/\A([0-9]{1,3}\.){3}[0-9]{1,3}\z/)
end
private_class_method :ip_address?
def self.resolve_hostname(family, host, port, hostname_resolution_result)
begin
resolved_addrinfos = Addrinfo.getaddrinfo(host, port, ADDRESS_FAMILIES[family], :STREAM)
hostname_resolution_result.add(family, resolved_addrinfos)
rescue => e
hostname_resolution_result.add(family, e)
end
end
private_class_method :resolve_hostname
def self.current_clock_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
private_class_method :current_clock_time
def self.second_to_timeout(started_at, ends_at)
return nil if ends_at == Float::INFINITY || ends_at.nil?
remaining = (ends_at - started_at)
remaining.negative? ? 0 : remaining
end
private_class_method :second_to_timeout
def self.expired?(started_at, ends_at)
second_to_timeout(started_at, ends_at)&.zero?
end
private_class_method :expired?
class HostnameResolutionResult
def initialize(size)
@size = size
@taken_count = 0
@rpipe, @wpipe = IO.pipe
@results = []
@mutex = Mutex.new
end
def notifier
[@rpipe]
end
def add(family, result)
@mutex.synchronize do
@results.push [family, result]
@wpipe.putc HOSTNAME_RESOLUTION_QUEUE_UPDATED
end
end
def get
return nil if @results.empty?
res = nil
@mutex.synchronize do
@rpipe.getbyte
res = @results.shift
end
@taken_count += 1
close if @taken_count == @size
res
end
def close
@rpipe.close
@wpipe.close
end
end
private_constant :HostnameResolutionResult
class HostnameResolutionStore
PRIORITY_ON_V6 = [:ipv6, :ipv4]
PRIORITY_ON_V4 = [:ipv4, :ipv6]
def initialize(family_names)
@family_names = family_names
@addrinfo_dict = {}
@error_dict = {}
@last_family = nil
end
def add_resolved(family_name, addrinfos)
@addrinfo_dict[family_name] = addrinfos
end
def add_error(family_name, error)
@addrinfo_dict[family_name] = []
@error_dict[family_name] = error
end
def get_addrinfo
precedences =
case @last_family
when :ipv4, nil then PRIORITY_ON_V6
when :ipv6 then PRIORITY_ON_V4
end
precedences.each do |family_name|
addrinfo = @addrinfo_dict[family_name]&.shift
next unless addrinfo
@last_family = family_name
return addrinfo
end
nil
end
def empty_addrinfos?
@addrinfo_dict.all? { |_, addrinfos| addrinfos.empty? }
end
def any_addrinfos?
!empty_addrinfos?
end
def resolved?(family)
@addrinfo_dict.has_key? family
end
def resolved_successfully?(family)
resolved?(family) && !!@error_dict[family]
end
def resolved_all_families?
(@family_names - @addrinfo_dict.keys).empty?
end
def any_unresolved_family?
!resolved_all_families?
end
end
private_constant :HostnameResolutionStore
# :stopdoc:
def self.ip_sockets_port0(ai_list, reuseaddr)
sockets = []