From 1463f1dfe70005ca1eac16547097370f6b9c6466 Mon Sep 17 00:00:00 2001 From: akr Date: Tue, 10 Feb 2009 12:38:16 +0000 Subject: [PATCH] * ext/socket/lib/socket.rb (Socket.udp_server_sockets): new method. (Socket.udp_server_loop_on): new method. (Socket.udp_server_loop): new method (Socket.ip_sockets_port0): extracted from tcp_server_sockets_port0. (Socket::UDPSource): new class. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@22212 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- ChangeLog | 8 ++ ext/socket/lib/socket.rb | 187 +++++++++++++++++++++++++++++++++++-- test/socket/test_socket.rb | 34 +++++++ 3 files changed, 219 insertions(+), 10 deletions(-) diff --git a/ChangeLog b/ChangeLog index 4184f916d6..ba2ebc4553 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,11 @@ +Tue Feb 10 21:26:33 2009 Tanaka Akira + + * ext/socket/lib/socket.rb (Socket.udp_server_sockets): new method. + (Socket.udp_server_loop_on): new method. + (Socket.udp_server_loop): new method + (Socket.ip_sockets_port0): extracted from tcp_server_sockets_port0. + (Socket::UDPSource): new class. + Tue Feb 10 21:14:43 2009 Tanaka Akira * ext/socket/socket.c (sockaddr_obj): fill pfamily. diff --git a/ext/socket/lib/socket.rb b/ext/socket/lib/socket.rb index 3e26c9d6c2..ff5fb088da 100644 --- a/ext/socket/lib/socket.rb +++ b/ext/socket/lib/socket.rb @@ -226,8 +226,7 @@ class Socket end end - def self.tcp_server_sockets_port0(host) - ai_list = Addrinfo.getaddrinfo(host, 0, nil, :STREAM, nil, Socket::AI_PASSIVE) + def self.ip_sockets_port0(ai_list, reuseaddr) begin sockets = [] port = nil @@ -239,14 +238,15 @@ class Socket end sockets << s s.ipv6only! if ai.ipv6? - s.setsockopt(:SOCKET, :REUSEADDR, 1) + if reuseaddr + s.setsockopt(:SOCKET, :REUSEADDR, 1) + end if !port s.bind(ai) port = s.local_address.ip_port else - s.bind(Addrinfo.tcp(ai.ip_address, port)) + s.bind(ai.family_addrinfo(ai.ip_address, port)) end - s.listen(5) } rescue Errno::EADDRINUSE sockets.each {|s| @@ -256,11 +256,21 @@ class Socket end sockets ensure - if $! - sockets.each {|s| - s.close if !s.closed? - } - end + sockets.each {|s| s.close if !s.closed? } if $! + end + class << self + private :ip_sockets_port0 + end + + def self.tcp_server_sockets_port0(host) + ai_list = Addrinfo.getaddrinfo(host, 0, nil, :STREAM, nil, Socket::AI_PASSIVE) + sockets = ip_sockets_port0(ai_list, true) + sockets.each {|s| + s.listen(5) + } + sockets + ensure + sockets.each {|s| s.close if !s.closed? } if $! end class << self private :tcp_server_sockets_port0 @@ -395,6 +405,163 @@ class Socket end end + # :call-seq: + # Socket.udp_server_sockets([host, ] port) + # + # Creates UDP sockets for a UDP server. + # It returns an array of sockets. + # + # If _port_ is zero, some port is choosen. + # But the choosen port is used for the all sockets. + # + # # UDP echo server + # sockets = Socket.udp_server_sockets(0) + # p sockets.first.local_address.ip_port #=> 32963 + # Socket.udp_server_loop_on(sockets) {|msg, msg_src| + # msg_src.reply msg + # } + # + def self.udp_server_sockets(host=nil, port) + last_error = nil + sockets = [] + addr_hash = {} + + ipv6_recvpktinfo = nil + if defined? Socket::AncillaryData + if defined? Socket::IPV6_RECVPKTINFO # RFC 3542 + ipv6_recvpktinfo = Socket::IPV6_RECVPKTINFO + elsif defined? Socket::IPV6_PKTINFO # RFC 2292 + ipv6_recvpktinfo = Socket::IPV6_PKTINFO + end + end + + local_addrs = Socket.ip_address_list + + ip_list = [] + Addrinfo.foreach(host, port, nil, :DGRAM, nil, Socket::AI_PASSIVE) {|ai| + if ai.ipv4? && ai.ip_address == "0.0.0.0" + local_addrs.each {|a| + next if !a.ipv4? + ip_list << Addrinfo.new(a.to_sockaddr, :INET, :DGRAM, 0); + } + elsif ai.ipv6? && ai.ip_address == "::" && !ipv6_recvpktinfo + local_addrs.each {|a| + next if !a.ipv6? + ip_list << Addrinfo.new(a.to_sockaddr, :INET6, :DGRAM, 0); + } + else + ip_list << ai + end + } + + if port == 0 + sockets = ip_sockets_port0(ip_list, false) + else + ip_list.each {|ip| + ai = Addrinfo.udp(ip.ip_address, port) + begin + s = ai.bind + rescue SystemCallError + last_error = $! + next + end + sockets << s + } + if sockets.empty? + raise last_error + end + end + + pktinfo_sockets = {} + sockets.each {|s| + ai = s.local_address + if ipv6_recvpktinfo && ai.ipv6? && ai.ip_address == "::" + s.setsockopt(:IPV6, ipv6_recvpktinfo, 1) + pktinfo_sockets[s] = true + end + } + + sockets + end + + # :call-seq: + # Socket.udp_server_loop_on(sockets) {|msg, msg_src| ... } + # + # Run UDP server loop on the given sockets. + # + # The return value of Socket.udp_server_sockets is appropriate for the argument. + # + # It calls the block for each message received. + # + def self.udp_server_loop_on(sockets) # :yield: msg, msg_src + loop { + readable, _, _ = IO.select(sockets) + readable.each {|r| + begin + msg, sender_addrinfo, rflags, *controls = r.recvmsg_nonblock + rescue Errno::EWOULDBLOCK + next + end + ai = r.local_address + if ai.ipv6? and pktinfo = controls.find {|c| c.cmsg_is?(:IPV6, :PKTINFO) } + ai = Addrinfo.udp(pktinfo.ipv6_pktinfo_addr.ip_address, ai.ip_port) + yield msg, UDPSource.new(sender_addrinfo, ai) {|reply_msg| + r.sendmsg reply_msg, 0, sender_addrinfo, pktinfo + } + else + yield msg, UDPSource.new(sender_addrinfo, ai) {|reply_msg| + r.send reply_msg, 0, sender_addrinfo + } + end + } + } + end + + # :call-seq: + # Socket.udp_server_loop(port) {|msg, msg_src| ... } + # Socket.udp_server_loop(host, port) {|msg, msg_src| ... } + # + # creates a UDP server on _port_ and calls the block for each message arrived. + # The block is called with the message and its source information. + # + # This method allocates sockets internally using _port_. + # If _host_ is specified, it is used conjunction with _port_ to determine the server addresses. + # + # The _msg_ is a string. + # + # The _msg_src_ is a Socket::UDPSource object. + # It is used for reply. + # + # # UDP echo server. + # Socket.udp_server_loop(9261) {|msg, msg_src| + # msg_src.reply msg + # } + # + def self.udp_server_loop(host=nil, port, &b) # :yield: message, message_source + sockets = udp_server_sockets(host, port) + udp_server_loop_on(sockets, &b) + ensure + sockets.each {|s| s.close if !s.closed? } if sockets + end + + # UDP address information used by Socket.udp_server_loop. + class UDPSource + def initialize(remote_address, local_address, &reply_proc) + @remote_address = remote_address + @local_address = local_address + @reply_proc = reply_proc + end + attr_reader :remote_address, :local_address + + def inspect + "\#<#{self.class}: #{@sender.inspect_sockaddr} to #{@receiver.inspect_sockaddr}>" + end + + def reply(msg) + @reply_proc.call msg + end + end + # creates a new socket connected to path using UNIX socket socket. # # If a block is given, the block is called with the socket. diff --git a/test/socket/test_socket.rb b/test/socket/test_socket.rb index cb54540249..5b2b7b41d7 100644 --- a/test/socket/test_socket.rb +++ b/test/socket/test_socket.rb @@ -225,4 +225,38 @@ class TestSocket < Test::Unit::TestCase end + def test_udp_server + begin + ip_addrs = Socket.ip_address_list + rescue NotImplementedError + end + + sockets = Socket.udp_server_sockets(0) + port = sockets.first.local_address.ip_port + + th = Thread.new { + Socket.udp_server_loop_on(sockets) {|msg, msg_src| + break if msg == "exit" + rmsg = Marshal.dump([msg, msg_src.remote_address, msg_src.local_address]) + msg_src.reply rmsg + } + } + + ip_addrs.each {|ai| + Addrinfo.udp(ai.ip_address, port).connect {|s| + msg1 = "<<<#{ai.inspect}>>>" + s.sendmsg msg1 + msg2, addr = s.recvmsg + msg2, remote_address, local_address = Marshal.load(msg2) + assert_equal(msg1, msg2) + assert_equal(ai.ip_address, addr.ip_address) + } + } + ensure + if th + Addrinfo.udp("127.0.0.1", port).connect {|s| s.sendmsg "exit" } + th.join + end + end + end if defined?(Socket)