Merge branch 'bpf-sock-migration'

Kuniyuki Iwashima says:

====================
The SO_REUSEPORT option allows multiple sockets to listen on the same
port and accept connections evenly across them. However, there is a
defect in the current implementation [1]. When a SYN packet is received,
the connection is tied to one listening socket. Consequently, when that
listener is closed, in-flight requests in the three-way handshake and
child sockets in the accept queue are dropped even if other listeners on
the same port could accept such connections.

This situation can arise when server-management tools restart server
processes such as nginx. For instance, when we change the nginx
configuration and restart it, it spins up new workers that respect the
new configuration and closes all listeners on the old workers, so the
in-flight ACKs of the 3WHS are answered with RST.

To avoid such a situation, users have to understand in depth how the
kernel handles SYN packets and implement connection draining with eBPF [2]:

  1. Stop routing SYN packets to the listener by eBPF.
  2. Wait for all timers to expire so the pending requests complete.
  3. Accept connections until EAGAIN, then close the listener.

  or

  1. Start counting SYN packets and accept syscalls using the eBPF map.
  2. Stop routing SYN packets.
  3. Accept connections up to the count, then close the listener.

Either way, we cannot close a listener immediately. Ideally, however,
the application should not need to drain the not-yet-accepted sockets,
because the 3WHS and tying a connection to a listener are purely kernel
behaviour. The root cause lies within the kernel, so the issue should be
addressed in kernel space and should not be visible to user space. This
patchset fixes it so that users need not care about the kernel
implementation or connection draining. With this patchset, the kernel
redistributes requests and connections from a listener to the others in
the same reuseport group at/after the close() or shutdown() syscall.

Although some software does connection draining, migration still has
merits. For security reasons, such as replacing TLS certificates, we may
want to apply new settings as soon as possible and/or may not be able to
wait for connection draining. The sockets in the accept queue have not
started application sessions yet, so if we do not drain such sockets,
they can be handled by the newer listeners and can have a longer
lifetime. It is difficult to drain all connections in every case, but we
can reduce the number of aborted connections by migration. In that
sense, migration is always better than draining.

Moreover, auto-migration simplifies user-space logic and also works well
in cases where we cannot modify and rebuild a server program to
implement the workaround.

Note that the source and destination listeners MUST have the same
settings at the socket API level; otherwise, applications may face
inconsistency and errors. In such a case, we have to use an eBPF program
to select a specific listener or to cancel migration.
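For illustration, such a policy program could be sketched as below. The
section name, map layout, and drop policy are assumptions of ours;
sk_reuseport_md's migrating_sk field, the SELECT_OR_MIGRATE attach type,
and bpf_sk_select_reuseport() are from the UAPI touched by this series.
This is not compiled here since it needs libbpf and kernel headers:

```c
/* Hypothetical BPF_SK_REUSEPORT_SELECT_OR_MIGRATE program sketch. */
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>

struct {
	__uint(type, BPF_MAP_TYPE_REUSEPORT_SOCKARRAY);
	__uint(max_entries, 1);
	__type(key, __u32);
	__type(value, __u64);
} target_listener SEC(".maps");

SEC("sk_reuseport/migrate")
int select_or_migrate(struct sk_reuseport_md *md)
{
	__u32 zero = 0;

	/* md->migrating_sk != NULL means md->sk was close()d or
	 * shutdown()ed and one of its children or request sockets is
	 * being migrated. If the listeners' settings are incompatible
	 * (e.g. TCP_SAVE_SYN), return SK_DROP here to cancel migration.
	 */
	if (md->migrating_sk) {
		if (bpf_sk_select_reuseport(md, &target_listener, &zero, 0))
			return SK_DROP;
		return SK_PASS;
	}

	/* normal SYN-time listener selection */
	if (bpf_sk_select_reuseport(md, &target_listener, &zero, 0))
		return SK_DROP;
	return SK_PASS;
}

char _license[] SEC("license") = "GPL";
```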

Special thanks to Martin KaFai Lau for bouncing ideas and exchanging code
snippets along the way.

Link:
 [1] The SO_REUSEPORT socket option
 https://lwn.net/Articles/542629/

 [2] Re: [PATCH 1/1] net: Add SO_REUSEPORT_LISTEN_OFF socket option as drain mode
 https://lore.kernel.org/netdev/1458828813.10868.65.camel@edumazet-glaptop3.roam.corp.google.com/

Changelog:
 v8:
  * Make reuse const in reuseport_sock_index()
  * Don't use __reuseport_add_sock() in reuseport_alloc()
  * Change the arg of the second memcpy() in reuseport_grow()
  * Fix coding style to use goto in reuseport_alloc()
  * Keep sk_refcnt uninitialized in inet_reqsk_clone()
  * Initialize ireq_opt and ipv6_opt separately in reqsk_migrate_reset()

  [ This series does not include a stats patch suggested by Yuchung Cheng
    so as not to drop Acked-by/Reviewed-by tags and to save reviewers'
    time. I will post the patch as a follow-up after this series is
    merged. ]

 v7:
 https://lore.kernel.org/bpf/20210521182104.18273-1-kuniyu@amazon.co.jp/
  * Prevent attaching/detaching a bpf prog via a shutdown()ed socket
  * Fix typo in commit messages
  * Split selftest into subtests

 v6:
 https://lore.kernel.org/bpf/20210517002258.75019-1-kuniyu@amazon.co.jp/
  * Change description in ip-sysctl.rst
  * Test IPPROTO_TCP before reading tfo_listener
  * Move reqsk_clone() to inet_connection_sock.c and rename to
    inet_reqsk_clone()
  * Pass req->rsk_listener to inet_csk_reqsk_queue_drop() and
    reqsk_queue_removed() in the migration path of receiving ACK
  * s/ARG_PTR_TO_SOCKET/PTR_TO_SOCKET/ in sk_reuseport_is_valid_access()
  * In selftest, use atomic ops to increment global vars, drop ACK by XDP,
    enable force fastopen, use "skel->bss" instead of "skel->data"

 v5:
 https://lore.kernel.org/bpf/20210510034433.52818-1-kuniyu@amazon.co.jp/
  * Move initialization of sk_node from 6th to 5th patch
  * Initialize sk_refcnt in reqsk_clone()
  * Modify some definitions in reqsk_timer_handler()
  * Validate in which path/state migration happens in selftest

 v4:
 https://lore.kernel.org/bpf/20210427034623.46528-1-kuniyu@amazon.co.jp/
  * Make some functions and variables 'static' in selftest
  * Remove 'scalability' from the cover letter

 v3:
 https://lore.kernel.org/bpf/20210420154140.80034-1-kuniyu@amazon.co.jp/
  * Add sysctl back for reuseport_grow()
  * Add helper functions to manage socks[]
  * Separate migration related logic into functions: reuseport_resurrect(),
    reuseport_stop_listen_sock(), reuseport_migrate_sock()
  * Clone request_sock to be migrated
  * Migrate request one by one
  * Pass child socket to eBPF prog

 v2:
 https://lore.kernel.org/netdev/20201207132456.65472-1-kuniyu@amazon.co.jp/
  * Do not save closed sockets in socks[]
  * Revert 607904c357
  * Extract inet_csk_reqsk_queue_migrate() into a single patch
  * Change the spin_lock order to avoid lockdep warning
  * Add static to __reuseport_select_sock
  * Use refcount_inc_not_zero() in reuseport_select_migrated_sock()
  * Set the default attach type in bpf_prog_load_check_attach()
  * Define new proto of BPF_FUNC_get_socket_cookie
  * Fix test to be compiled successfully
  * Update commit messages

 v1:
 https://lore.kernel.org/netdev/20201201144418.35045-1-kuniyu@amazon.co.jp/
  * Remove the sysctl option
  * Enable migration if eBPF program is not attached
  * Add expected_attach_type to check if eBPF program can migrate sockets
  * Add a field to tell migration type to eBPF program
  * Support BPF_FUNC_get_socket_cookie to get the cookie of sk
  * Allocate an empty skb if skb is NULL
  * Pass req_to_sk(req)->sk_hash because listener's hash is zero
  * Update commit messages and cover letter

 RFC:
 https://lore.kernel.org/netdev/20201117094023.3685-1-kuniyu@amazon.co.jp/
====================

Signed-off-by: Daniel Borkmann <daniel@iogearbox.net>
This commit is contained in:
Daniel Borkmann 2021-06-15 18:01:06 +02:00
Parent bbf29d3a2e c9d0bdef89
Commit 1f26622b79
21 changed files with 1335 additions and 68 deletions

@ -761,6 +761,31 @@ tcp_syncookies - INTEGER
network connections you can set this knob to 2 to enable
unconditionally generation of syncookies.
tcp_migrate_req - BOOLEAN
The incoming connection is tied to a specific listening socket when
the initial SYN packet is received during the three-way handshake.
When a listener is closed, in-flight request sockets during the
handshake and established sockets in the accept queue are aborted.
If the listener has SO_REUSEPORT enabled, other listeners on the
same port should have been able to accept such connections. This
option makes it possible to migrate such child sockets to another
listener after close() or shutdown().
The BPF_SK_REUSEPORT_SELECT_OR_MIGRATE type of eBPF program should
usually be used to define the policy to pick an alive listener.
Otherwise, the kernel will randomly pick an alive listener only if
this option is enabled.
Note that migration between listeners with different settings may
crash applications. Let's say migration happens from listener A to
B, and only B has TCP_SAVE_SYN enabled. B cannot read SYN data from
the requests migrated from A. To avoid such a situation, cancel
migration by returning SK_DROP in the type of eBPF program, or
disable this option.
Default: 0
tcp_fastopen - INTEGER
Enable TCP Fast Open (RFC7413) to send and accept data in the opening
SYN packet.

@ -2048,6 +2048,7 @@ struct sk_reuseport_kern {
struct sk_buff *skb;
struct sock *sk;
struct sock *selected_sk;
struct sock *migrating_sk;
void *data_end;
u32 hash;
u32 reuseport_id;

@ -996,11 +996,13 @@ void bpf_warn_invalid_xdp_action(u32 act);
#ifdef CONFIG_INET
struct sock *bpf_run_sk_reuseport(struct sock_reuseport *reuse, struct sock *sk,
struct bpf_prog *prog, struct sk_buff *skb,
struct sock *migrating_sk,
u32 hash);
#else
static inline struct sock *
bpf_run_sk_reuseport(struct sock_reuseport *reuse, struct sock *sk,
struct bpf_prog *prog, struct sk_buff *skb,
struct sock *migrating_sk,
u32 hash)
{
return NULL;

@ -126,6 +126,7 @@ struct netns_ipv4 {
u8 sysctl_tcp_syn_retries;
u8 sysctl_tcp_synack_retries;
u8 sysctl_tcp_syncookies;
u8 sysctl_tcp_migrate_req;
int sysctl_tcp_reordering;
u8 sysctl_tcp_retries1;
u8 sysctl_tcp_retries2;

@ -15,6 +15,7 @@ struct sock_reuseport {
u16 max_socks; /* length of socks */
u16 num_socks; /* elements in socks */
u16 num_closed_socks; /* closed elements in socks */
/* The last synq overflow event timestamp of this
* reuse->socks[] group.
*/
@ -31,10 +32,14 @@ extern int reuseport_alloc(struct sock *sk, bool bind_inany);
extern int reuseport_add_sock(struct sock *sk, struct sock *sk2,
bool bind_inany);
extern void reuseport_detach_sock(struct sock *sk);
void reuseport_stop_listen_sock(struct sock *sk);
extern struct sock *reuseport_select_sock(struct sock *sk,
u32 hash,
struct sk_buff *skb,
int hdr_len);
struct sock *reuseport_migrate_sock(struct sock *sk,
struct sock *migrating_sk,
struct sk_buff *skb);
extern int reuseport_attach_prog(struct sock *sk, struct bpf_prog *prog);
extern int reuseport_detach_prog(struct sock *sk);

@ -994,6 +994,8 @@ enum bpf_attach_type {
BPF_SK_LOOKUP,
BPF_XDP,
BPF_SK_SKB_VERDICT,
BPF_SK_REUSEPORT_SELECT,
BPF_SK_REUSEPORT_SELECT_OR_MIGRATE,
__MAX_BPF_ATTACH_TYPE
};
@ -5416,6 +5418,20 @@ struct sk_reuseport_md {
__u32 ip_protocol; /* IP protocol. e.g. IPPROTO_TCP, IPPROTO_UDP */
__u32 bind_inany; /* Is sock bound to an INANY address? */
__u32 hash; /* A hash of the packet 4 tuples */
/* When reuse->migrating_sk is NULL, it is selecting a sk for the
* new incoming connection request (e.g. selecting a listen sk for
* the received SYN in the TCP case). reuse->sk is one of the sk
* in the reuseport group. The bpf prog can use reuse->sk to learn
* the local listening ip/port without looking into the skb.
*
* When reuse->migrating_sk is not NULL, reuse->sk is closed and
* reuse->migrating_sk is the socket that needs to be migrated
* to another listening socket. migrating_sk could be a fullsock
* sk that is fully established or a reqsk that is in-the-middle
* of 3-way handshake.
*/
__bpf_md_ptr(struct bpf_sock *, sk);
__bpf_md_ptr(struct bpf_sock *, migrating_sk);
};
#define BPF_TAG_SIZE 8

@ -1972,6 +1972,11 @@ static void bpf_prog_load_fixup_attach_type(union bpf_attr *attr)
attr->expected_attach_type =
BPF_CGROUP_INET_SOCK_CREATE;
break;
case BPF_PROG_TYPE_SK_REUSEPORT:
if (!attr->expected_attach_type)
attr->expected_attach_type =
BPF_SK_REUSEPORT_SELECT;
break;
}
}
@ -2055,6 +2060,14 @@ bpf_prog_load_check_attach(enum bpf_prog_type prog_type,
if (expected_attach_type == BPF_SK_LOOKUP)
return 0;
return -EINVAL;
case BPF_PROG_TYPE_SK_REUSEPORT:
switch (expected_attach_type) {
case BPF_SK_REUSEPORT_SELECT:
case BPF_SK_REUSEPORT_SELECT_OR_MIGRATE:
return 0;
default:
return -EINVAL;
}
case BPF_PROG_TYPE_SYSCALL:
case BPF_PROG_TYPE_EXT:
if (expected_attach_type)

@ -10044,11 +10044,13 @@ out:
static void bpf_init_reuseport_kern(struct sk_reuseport_kern *reuse_kern,
struct sock_reuseport *reuse,
struct sock *sk, struct sk_buff *skb,
struct sock *migrating_sk,
u32 hash)
{
reuse_kern->skb = skb;
reuse_kern->sk = sk;
reuse_kern->selected_sk = NULL;
reuse_kern->migrating_sk = migrating_sk;
reuse_kern->data_end = skb->data + skb_headlen(skb);
reuse_kern->hash = hash;
reuse_kern->reuseport_id = reuse->reuseport_id;
@ -10057,12 +10059,13 @@ static void bpf_init_reuseport_kern(struct sk_reuseport_kern *reuse_kern,
struct sock *bpf_run_sk_reuseport(struct sock_reuseport *reuse, struct sock *sk,
struct bpf_prog *prog, struct sk_buff *skb,
struct sock *migrating_sk,
u32 hash)
{
struct sk_reuseport_kern reuse_kern;
enum sk_action action;
bpf_init_reuseport_kern(&reuse_kern, reuse, sk, skb, hash);
bpf_init_reuseport_kern(&reuse_kern, reuse, sk, skb, migrating_sk, hash);
action = BPF_PROG_RUN(prog, &reuse_kern);
if (action == SK_PASS)
@ -10172,6 +10175,8 @@ sk_reuseport_func_proto(enum bpf_func_id func_id,
return &sk_reuseport_load_bytes_proto;
case BPF_FUNC_skb_load_bytes_relative:
return &sk_reuseport_load_bytes_relative_proto;
case BPF_FUNC_get_socket_cookie:
return &bpf_get_socket_ptr_cookie_proto;
default:
return bpf_base_func_proto(func_id);
}
@ -10201,6 +10206,14 @@ sk_reuseport_is_valid_access(int off, int size,
case offsetof(struct sk_reuseport_md, hash):
return size == size_default;
case offsetof(struct sk_reuseport_md, sk):
info->reg_type = PTR_TO_SOCKET;
return size == sizeof(__u64);
case offsetof(struct sk_reuseport_md, migrating_sk):
info->reg_type = PTR_TO_SOCK_COMMON_OR_NULL;
return size == sizeof(__u64);
/* Fields that allow narrowing */
case bpf_ctx_range(struct sk_reuseport_md, eth_protocol):
if (size < sizeof_field(struct sk_buff, protocol))
@ -10273,6 +10286,14 @@ static u32 sk_reuseport_convert_ctx_access(enum bpf_access_type type,
case offsetof(struct sk_reuseport_md, bind_inany):
SK_REUSEPORT_LOAD_FIELD(bind_inany);
break;
case offsetof(struct sk_reuseport_md, sk):
SK_REUSEPORT_LOAD_FIELD(sk);
break;
case offsetof(struct sk_reuseport_md, migrating_sk):
SK_REUSEPORT_LOAD_FIELD(migrating_sk);
break;
}
return insn - insn_buf;

@ -17,6 +17,74 @@
DEFINE_SPINLOCK(reuseport_lock);
static DEFINE_IDA(reuseport_ida);
static int reuseport_resurrect(struct sock *sk, struct sock_reuseport *old_reuse,
struct sock_reuseport *reuse, bool bind_inany);
static int reuseport_sock_index(struct sock *sk,
const struct sock_reuseport *reuse,
bool closed)
{
int left, right;
if (!closed) {
left = 0;
right = reuse->num_socks;
} else {
left = reuse->max_socks - reuse->num_closed_socks;
right = reuse->max_socks;
}
for (; left < right; left++)
if (reuse->socks[left] == sk)
return left;
return -1;
}
static void __reuseport_add_sock(struct sock *sk,
struct sock_reuseport *reuse)
{
reuse->socks[reuse->num_socks] = sk;
/* paired with smp_rmb() in reuseport_(select|migrate)_sock() */
smp_wmb();
reuse->num_socks++;
}
static bool __reuseport_detach_sock(struct sock *sk,
struct sock_reuseport *reuse)
{
int i = reuseport_sock_index(sk, reuse, false);
if (i == -1)
return false;
reuse->socks[i] = reuse->socks[reuse->num_socks - 1];
reuse->num_socks--;
return true;
}
static void __reuseport_add_closed_sock(struct sock *sk,
struct sock_reuseport *reuse)
{
reuse->socks[reuse->max_socks - reuse->num_closed_socks - 1] = sk;
/* paired with READ_ONCE() in inet_csk_bind_conflict() */
WRITE_ONCE(reuse->num_closed_socks, reuse->num_closed_socks + 1);
}
static bool __reuseport_detach_closed_sock(struct sock *sk,
struct sock_reuseport *reuse)
{
int i = reuseport_sock_index(sk, reuse, true);
if (i == -1)
return false;
reuse->socks[i] = reuse->socks[reuse->max_socks - reuse->num_closed_socks];
/* paired with READ_ONCE() in inet_csk_bind_conflict() */
WRITE_ONCE(reuse->num_closed_socks, reuse->num_closed_socks - 1);
return true;
}
static struct sock_reuseport *__reuseport_alloc(unsigned int max_socks)
{
@ -49,6 +117,12 @@ int reuseport_alloc(struct sock *sk, bool bind_inany)
reuse = rcu_dereference_protected(sk->sk_reuseport_cb,
lockdep_is_held(&reuseport_lock));
if (reuse) {
if (reuse->num_closed_socks) {
/* sk was shutdown()ed before */
ret = reuseport_resurrect(sk, reuse, NULL, bind_inany);
goto out;
}
/* Only set reuse->bind_inany if the bind_inany is true.
* Otherwise, it will overwrite the reuse->bind_inany
* which was set by the bind/hash path.
@ -72,9 +146,9 @@ int reuseport_alloc(struct sock *sk, bool bind_inany)
}
reuse->reuseport_id = id;
reuse->bind_inany = bind_inany;
reuse->socks[0] = sk;
reuse->num_socks = 1;
reuse->bind_inany = bind_inany;
rcu_assign_pointer(sk->sk_reuseport_cb, reuse);
out:
@ -90,14 +164,30 @@ static struct sock_reuseport *reuseport_grow(struct sock_reuseport *reuse)
u32 more_socks_size, i;
more_socks_size = reuse->max_socks * 2U;
if (more_socks_size > U16_MAX)
if (more_socks_size > U16_MAX) {
if (reuse->num_closed_socks) {
/* Make room by removing a closed sk.
* The child has already been migrated.
* Only reqsk left at this point.
*/
struct sock *sk;
sk = reuse->socks[reuse->max_socks - reuse->num_closed_socks];
RCU_INIT_POINTER(sk->sk_reuseport_cb, NULL);
__reuseport_detach_closed_sock(sk, reuse);
return reuse;
}
return NULL;
}
more_reuse = __reuseport_alloc(more_socks_size);
if (!more_reuse)
return NULL;
more_reuse->num_socks = reuse->num_socks;
more_reuse->num_closed_socks = reuse->num_closed_socks;
more_reuse->prog = reuse->prog;
more_reuse->reuseport_id = reuse->reuseport_id;
more_reuse->bind_inany = reuse->bind_inany;
@ -105,9 +195,13 @@ static struct sock_reuseport *reuseport_grow(struct sock_reuseport *reuse)
memcpy(more_reuse->socks, reuse->socks,
reuse->num_socks * sizeof(struct sock *));
memcpy(more_reuse->socks +
(more_reuse->max_socks - more_reuse->num_closed_socks),
reuse->socks + (reuse->max_socks - reuse->num_closed_socks),
reuse->num_closed_socks * sizeof(struct sock *));
more_reuse->synq_overflow_ts = READ_ONCE(reuse->synq_overflow_ts);
for (i = 0; i < reuse->num_socks; ++i)
for (i = 0; i < reuse->max_socks; ++i)
rcu_assign_pointer(reuse->socks[i]->sk_reuseport_cb,
more_reuse);
@ -153,12 +247,20 @@ int reuseport_add_sock(struct sock *sk, struct sock *sk2, bool bind_inany)
lockdep_is_held(&reuseport_lock));
old_reuse = rcu_dereference_protected(sk->sk_reuseport_cb,
lockdep_is_held(&reuseport_lock));
if (old_reuse && old_reuse->num_closed_socks) {
/* sk was shutdown()ed before */
int err = reuseport_resurrect(sk, old_reuse, reuse, reuse->bind_inany);
spin_unlock_bh(&reuseport_lock);
return err;
}
if (old_reuse && old_reuse->num_socks != 1) {
spin_unlock_bh(&reuseport_lock);
return -EBUSY;
}
if (reuse->num_socks == reuse->max_socks) {
if (reuse->num_socks + reuse->num_closed_socks == reuse->max_socks) {
reuse = reuseport_grow(reuse);
if (!reuse) {
spin_unlock_bh(&reuseport_lock);
@ -166,10 +268,7 @@ int reuseport_add_sock(struct sock *sk, struct sock *sk2, bool bind_inany)
}
}
reuse->socks[reuse->num_socks] = sk;
/* paired with smp_rmb() in reuseport_select_sock() */
smp_wmb();
reuse->num_socks++;
__reuseport_add_sock(sk, reuse);
rcu_assign_pointer(sk->sk_reuseport_cb, reuse);
spin_unlock_bh(&reuseport_lock);
@ -180,15 +279,77 @@ int reuseport_add_sock(struct sock *sk, struct sock *sk2, bool bind_inany)
}
EXPORT_SYMBOL(reuseport_add_sock);
static int reuseport_resurrect(struct sock *sk, struct sock_reuseport *old_reuse,
struct sock_reuseport *reuse, bool bind_inany)
{
if (old_reuse == reuse) {
/* If sk was in the same reuseport group, just pop sk out of
* the closed section and push sk into the listening section.
*/
__reuseport_detach_closed_sock(sk, old_reuse);
__reuseport_add_sock(sk, old_reuse);
return 0;
}
if (!reuse) {
/* In bind()/listen() path, we cannot carry over the eBPF prog
* for the shutdown()ed socket. In setsockopt() path, we should
* not change the eBPF prog of listening sockets by attaching a
* prog to the shutdown()ed socket. Thus, we will allocate a new
* reuseport group and detach sk from the old group.
*/
int id;
reuse = __reuseport_alloc(INIT_SOCKS);
if (!reuse)
return -ENOMEM;
id = ida_alloc(&reuseport_ida, GFP_ATOMIC);
if (id < 0) {
kfree(reuse);
return id;
}
reuse->reuseport_id = id;
reuse->bind_inany = bind_inany;
} else {
/* Move sk from the old group to the new one if
* - all the other listeners in the old group were close()d or
* shutdown()ed, and then sk2 has listen()ed on the same port
* OR
* - sk listen()ed without bind() (or with autobind), was
* shutdown()ed, and then listen()s on another port which
* sk2 listen()s on.
*/
if (reuse->num_socks + reuse->num_closed_socks == reuse->max_socks) {
reuse = reuseport_grow(reuse);
if (!reuse)
return -ENOMEM;
}
}
__reuseport_detach_closed_sock(sk, old_reuse);
__reuseport_add_sock(sk, reuse);
rcu_assign_pointer(sk->sk_reuseport_cb, reuse);
if (old_reuse->num_socks + old_reuse->num_closed_socks == 0)
call_rcu(&old_reuse->rcu, reuseport_free_rcu);
return 0;
}
void reuseport_detach_sock(struct sock *sk)
{
struct sock_reuseport *reuse;
int i;
spin_lock_bh(&reuseport_lock);
reuse = rcu_dereference_protected(sk->sk_reuseport_cb,
lockdep_is_held(&reuseport_lock));
/* reuseport_grow() has detached a closed sk */
if (!reuse)
goto out;
/* Notify the bpf side. The sk may be added to a sockarray
* map. If so, sockarray logic will remove it from the map.
*
@ -201,19 +362,52 @@ void reuseport_detach_sock(struct sock *sk)
rcu_assign_pointer(sk->sk_reuseport_cb, NULL);
for (i = 0; i < reuse->num_socks; i++) {
if (reuse->socks[i] == sk) {
reuse->socks[i] = reuse->socks[reuse->num_socks - 1];
reuse->num_socks--;
if (reuse->num_socks == 0)
if (!__reuseport_detach_closed_sock(sk, reuse))
__reuseport_detach_sock(sk, reuse);
if (reuse->num_socks + reuse->num_closed_socks == 0)
call_rcu(&reuse->rcu, reuseport_free_rcu);
break;
}
}
out:
spin_unlock_bh(&reuseport_lock);
}
EXPORT_SYMBOL(reuseport_detach_sock);
void reuseport_stop_listen_sock(struct sock *sk)
{
if (sk->sk_protocol == IPPROTO_TCP) {
struct sock_reuseport *reuse;
struct bpf_prog *prog;
spin_lock_bh(&reuseport_lock);
reuse = rcu_dereference_protected(sk->sk_reuseport_cb,
lockdep_is_held(&reuseport_lock));
prog = rcu_dereference_protected(reuse->prog,
lockdep_is_held(&reuseport_lock));
if (sock_net(sk)->ipv4.sysctl_tcp_migrate_req ||
(prog && prog->expected_attach_type == BPF_SK_REUSEPORT_SELECT_OR_MIGRATE)) {
/* Migration capable, move sk from the listening section
* to the closed section.
*/
bpf_sk_reuseport_detach(sk);
__reuseport_detach_sock(sk, reuse);
__reuseport_add_closed_sock(sk, reuse);
spin_unlock_bh(&reuseport_lock);
return;
}
spin_unlock_bh(&reuseport_lock);
}
/* Not capable to do migration, detach immediately */
reuseport_detach_sock(sk);
}
EXPORT_SYMBOL(reuseport_stop_listen_sock);
static struct sock *run_bpf_filter(struct sock_reuseport *reuse, u16 socks,
struct bpf_prog *prog, struct sk_buff *skb,
int hdr_len)
@ -244,6 +438,23 @@ static struct sock *run_bpf_filter(struct sock_reuseport *reuse, u16 socks,
return reuse->socks[index];
}
static struct sock *reuseport_select_sock_by_hash(struct sock_reuseport *reuse,
u32 hash, u16 num_socks)
{
int i, j;
i = j = reciprocal_scale(hash, num_socks);
while (reuse->socks[i]->sk_state == TCP_ESTABLISHED) {
i++;
if (i >= num_socks)
i = 0;
if (i == j)
return NULL;
}
return reuse->socks[i];
}
/**
* reuseport_select_sock - Select a socket from an SO_REUSEPORT group.
* @sk: First socket in the group.
@ -274,32 +485,21 @@ struct sock *reuseport_select_sock(struct sock *sk,
prog = rcu_dereference(reuse->prog);
socks = READ_ONCE(reuse->num_socks);
if (likely(socks)) {
/* paired with smp_wmb() in reuseport_add_sock() */
/* paired with smp_wmb() in __reuseport_add_sock() */
smp_rmb();
if (!prog || !skb)
goto select_by_hash;
if (prog->type == BPF_PROG_TYPE_SK_REUSEPORT)
sk2 = bpf_run_sk_reuseport(reuse, sk, prog, skb, hash);
sk2 = bpf_run_sk_reuseport(reuse, sk, prog, skb, NULL, hash);
else
sk2 = run_bpf_filter(reuse, socks, prog, skb, hdr_len);
select_by_hash:
/* no bpf or invalid bpf result: fall back to hash usage */
if (!sk2) {
int i, j;
i = j = reciprocal_scale(hash, socks);
while (reuse->socks[i]->sk_state == TCP_ESTABLISHED) {
i++;
if (i >= socks)
i = 0;
if (i == j)
goto out;
}
sk2 = reuse->socks[i];
}
if (!sk2)
sk2 = reuseport_select_sock_by_hash(reuse, hash, socks);
}
out:
@ -308,14 +508,84 @@ out:
}
EXPORT_SYMBOL(reuseport_select_sock);
/**
* reuseport_migrate_sock - Select a socket from an SO_REUSEPORT group.
* @sk: close()ed or shutdown()ed socket in the group.
* @migrating_sk: ESTABLISHED/SYN_RECV full socket in the accept queue or
* NEW_SYN_RECV request socket during 3WHS.
* @skb: skb to run through BPF filter.
* Returns a socket (with sk_refcnt +1) that should accept the child socket
* (or NULL on error).
*/
struct sock *reuseport_migrate_sock(struct sock *sk,
struct sock *migrating_sk,
struct sk_buff *skb)
{
struct sock_reuseport *reuse;
struct sock *nsk = NULL;
bool allocated = false;
struct bpf_prog *prog;
u16 socks;
u32 hash;
rcu_read_lock();
reuse = rcu_dereference(sk->sk_reuseport_cb);
if (!reuse)
goto out;
socks = READ_ONCE(reuse->num_socks);
if (unlikely(!socks))
goto out;
/* paired with smp_wmb() in __reuseport_add_sock() */
smp_rmb();
hash = migrating_sk->sk_hash;
prog = rcu_dereference(reuse->prog);
if (!prog || prog->expected_attach_type != BPF_SK_REUSEPORT_SELECT_OR_MIGRATE) {
if (sock_net(sk)->ipv4.sysctl_tcp_migrate_req)
goto select_by_hash;
goto out;
}
if (!skb) {
skb = alloc_skb(0, GFP_ATOMIC);
if (!skb)
goto out;
allocated = true;
}
nsk = bpf_run_sk_reuseport(reuse, sk, prog, skb, migrating_sk, hash);
if (allocated)
kfree_skb(skb);
select_by_hash:
if (!nsk)
nsk = reuseport_select_sock_by_hash(reuse, hash, socks);
if (IS_ERR_OR_NULL(nsk) || unlikely(!refcount_inc_not_zero(&nsk->sk_refcnt)))
nsk = NULL;
out:
rcu_read_unlock();
return nsk;
}
EXPORT_SYMBOL(reuseport_migrate_sock);
int reuseport_attach_prog(struct sock *sk, struct bpf_prog *prog)
{
struct sock_reuseport *reuse;
struct bpf_prog *old_prog;
if (sk_unhashed(sk) && sk->sk_reuseport) {
int err = reuseport_alloc(sk, false);
if (sk_unhashed(sk)) {
int err;
if (!sk->sk_reuseport)
return -EINVAL;
err = reuseport_alloc(sk, false);
if (err)
return err;
} else if (!rcu_access_pointer(sk->sk_reuseport_cb)) {
@ -341,13 +611,24 @@ int reuseport_detach_prog(struct sock *sk)
struct sock_reuseport *reuse;
struct bpf_prog *old_prog;
if (!rcu_access_pointer(sk->sk_reuseport_cb))
return sk->sk_reuseport ? -ENOENT : -EINVAL;
old_prog = NULL;
spin_lock_bh(&reuseport_lock);
reuse = rcu_dereference_protected(sk->sk_reuseport_cb,
lockdep_is_held(&reuseport_lock));
/* reuse must be checked after acquiring the reuseport_lock
* because reuseport_grow() can detach a closed sk.
*/
if (!reuse) {
spin_unlock_bh(&reuseport_lock);
return sk->sk_reuseport ? -ENOENT : -EINVAL;
}
if (sk_unhashed(sk) && reuse->num_closed_socks) {
spin_unlock_bh(&reuseport_lock);
return -ENOENT;
}
old_prog = rcu_replace_pointer(reuse->prog, old_prog,
lockdep_is_held(&reuseport_lock));
spin_unlock_bh(&reuseport_lock);

@ -135,10 +135,18 @@ static int inet_csk_bind_conflict(const struct sock *sk,
bool relax, bool reuseport_ok)
{
struct sock *sk2;
bool reuseport_cb_ok;
bool reuse = sk->sk_reuse;
bool reuseport = !!sk->sk_reuseport;
struct sock_reuseport *reuseport_cb;
kuid_t uid = sock_i_uid((struct sock *)sk);
rcu_read_lock();
reuseport_cb = rcu_dereference(sk->sk_reuseport_cb);
/* paired with WRITE_ONCE() in __reuseport_(add|detach)_closed_sock */
reuseport_cb_ok = !reuseport_cb || READ_ONCE(reuseport_cb->num_closed_socks);
rcu_read_unlock();
/*
* Unlike other sk lookup places we do not check
* for sk_net here, since _all_ the socks listed
@ -156,14 +164,14 @@ static int inet_csk_bind_conflict(const struct sock *sk,
if ((!relax ||
(!reuseport_ok &&
reuseport && sk2->sk_reuseport &&
!rcu_access_pointer(sk->sk_reuseport_cb) &&
reuseport_cb_ok &&
(sk2->sk_state == TCP_TIME_WAIT ||
uid_eq(uid, sock_i_uid(sk2))))) &&
inet_rcv_saddr_equal(sk, sk2, true))
break;
} else if (!reuseport_ok ||
!reuseport || !sk2->sk_reuseport ||
rcu_access_pointer(sk->sk_reuseport_cb) ||
!reuseport_cb_ok ||
(sk2->sk_state != TCP_TIME_WAIT &&
!uid_eq(uid, sock_i_uid(sk2)))) {
if (inet_rcv_saddr_equal(sk, sk2, true))
@ -687,6 +695,64 @@ int inet_rtx_syn_ack(const struct sock *parent, struct request_sock *req)
}
EXPORT_SYMBOL(inet_rtx_syn_ack);
static struct request_sock *inet_reqsk_clone(struct request_sock *req,
struct sock *sk)
{
struct sock *req_sk, *nreq_sk;
struct request_sock *nreq;
nreq = kmem_cache_alloc(req->rsk_ops->slab, GFP_ATOMIC | __GFP_NOWARN);
if (!nreq) {
/* paired with refcount_inc_not_zero() in reuseport_migrate_sock() */
sock_put(sk);
return NULL;
}
req_sk = req_to_sk(req);
nreq_sk = req_to_sk(nreq);
memcpy(nreq_sk, req_sk,
offsetof(struct sock, sk_dontcopy_begin));
memcpy(&nreq_sk->sk_dontcopy_end, &req_sk->sk_dontcopy_end,
req->rsk_ops->obj_size - offsetof(struct sock, sk_dontcopy_end));
sk_node_init(&nreq_sk->sk_node);
nreq_sk->sk_tx_queue_mapping = req_sk->sk_tx_queue_mapping;
#ifdef CONFIG_XPS
nreq_sk->sk_rx_queue_mapping = req_sk->sk_rx_queue_mapping;
#endif
nreq_sk->sk_incoming_cpu = req_sk->sk_incoming_cpu;
nreq->rsk_listener = sk;
/* We need not acquire fastopenq->lock
* because the child socket is locked in inet_csk_listen_stop().
*/
if (sk->sk_protocol == IPPROTO_TCP && tcp_rsk(nreq)->tfo_listener)
rcu_assign_pointer(tcp_sk(nreq->sk)->fastopen_rsk, nreq);
return nreq;
}
static void reqsk_queue_migrated(struct request_sock_queue *queue,
const struct request_sock *req)
{
if (req->num_timeout == 0)
atomic_inc(&queue->young);
atomic_inc(&queue->qlen);
}
static void reqsk_migrate_reset(struct request_sock *req)
{
req->saved_syn = NULL;
#if IS_ENABLED(CONFIG_IPV6)
inet_rsk(req)->ipv6_opt = NULL;
inet_rsk(req)->pktopts = NULL;
#else
inet_rsk(req)->ireq_opt = NULL;
#endif
}
/* return true if req was found in the ehash table */
static bool reqsk_queue_unlink(struct request_sock *req)
{
@ -727,15 +793,39 @@ EXPORT_SYMBOL(inet_csk_reqsk_queue_drop_and_put);
static void reqsk_timer_handler(struct timer_list *t)
{
struct request_sock *req = from_timer(req, t, rsk_timer);
struct request_sock *nreq = NULL, *oreq = req;
struct sock *sk_listener = req->rsk_listener;
struct net *net = sock_net(sk_listener);
struct inet_connection_sock *icsk = inet_csk(sk_listener);
struct request_sock_queue *queue = &icsk->icsk_accept_queue;
struct inet_connection_sock *icsk;
struct request_sock_queue *queue;
struct net *net;
int max_syn_ack_retries, qlen, expire = 0, resend = 0;
if (inet_sk_state_load(sk_listener) != TCP_LISTEN)
if (inet_sk_state_load(sk_listener) != TCP_LISTEN) {
struct sock *nsk;
nsk = reuseport_migrate_sock(sk_listener, req_to_sk(req), NULL);
if (!nsk)
goto drop;
nreq = inet_reqsk_clone(req, nsk);
if (!nreq)
goto drop;
/* The new timer for the cloned req can decrease the 2
* by calling inet_csk_reqsk_queue_drop_and_put(), so
* hold another count to prevent use-after-free and
* call reqsk_put() just before return.
*/
refcount_set(&nreq->rsk_refcnt, 2 + 1);
timer_setup(&nreq->rsk_timer, reqsk_timer_handler, TIMER_PINNED);
reqsk_queue_migrated(&inet_csk(nsk)->icsk_accept_queue, req);
req = nreq;
sk_listener = nsk;
}
icsk = inet_csk(sk_listener);
net = sock_net(sk_listener);
max_syn_ack_retries = icsk->icsk_syn_retries ? : net->ipv4.sysctl_tcp_synack_retries;
/* Normally all the openreqs are young and become mature
* (i.e. converted to established socket) for first timeout.
@ -754,6 +844,7 @@ static void reqsk_timer_handler(struct timer_list *t)
* embrions; and abort old ones without pity, if old
* ones are about to clog our table.
*/
queue = &icsk->icsk_accept_queue;
qlen = reqsk_queue_len(queue);
if ((qlen << 1) > max(8U, READ_ONCE(sk_listener->sk_max_ack_backlog))) {
int young = reqsk_queue_len_young(queue) << 1;
@ -778,10 +869,36 @@ static void reqsk_timer_handler(struct timer_list *t)
atomic_dec(&queue->young);
timeo = min(TCP_TIMEOUT_INIT << req->num_timeout, TCP_RTO_MAX);
mod_timer(&req->rsk_timer, jiffies + timeo);
if (!nreq)
return;
if (!inet_ehash_insert(req_to_sk(nreq), req_to_sk(oreq), NULL)) {
/* delete timer */
inet_csk_reqsk_queue_drop(sk_listener, nreq);
goto drop;
}
reqsk_migrate_reset(oreq);
reqsk_queue_removed(&inet_csk(oreq->rsk_listener)->icsk_accept_queue, oreq);
reqsk_put(oreq);
reqsk_put(nreq);
return;
}
drop:
/* Even if we could clone the req, we may not need to retransmit
 * any more SYN+ACKs (nreq->num_timeout > max_syn_ack_retries, etc.),
 * or another CPU may win the "own_req" race so that
 * inet_ehash_insert() fails.
 */
if (nreq) {
reqsk_migrate_reset(nreq);
reqsk_queue_removed(queue, nreq);
__reqsk_free(nreq);
}
inet_csk_reqsk_queue_drop_and_put(oreq->rsk_listener, oreq);
}
static void reqsk_queue_hash_req(struct request_sock *req,
@@ -997,12 +1114,40 @@ struct sock *inet_csk_complete_hashdance(struct sock *sk, struct sock *child,
struct request_sock *req, bool own_req)
{
if (own_req) {
inet_csk_reqsk_queue_drop(req->rsk_listener, req);
reqsk_queue_removed(&inet_csk(req->rsk_listener)->icsk_accept_queue, req);
if (sk != req->rsk_listener) {
/* another listening sk has been selected,
* migrate the req to it.
*/
struct request_sock *nreq;
/* hold a refcnt for the nreq->rsk_listener
* which is assigned in inet_reqsk_clone()
*/
sock_hold(sk);
nreq = inet_reqsk_clone(req, sk);
if (!nreq) {
inet_child_forget(sk, req, child);
goto child_put;
}
refcount_set(&nreq->rsk_refcnt, 1);
if (inet_csk_reqsk_queue_add(sk, nreq, child)) {
reqsk_migrate_reset(req);
reqsk_put(req);
return child;
}
reqsk_migrate_reset(nreq);
__reqsk_free(nreq);
} else if (inet_csk_reqsk_queue_add(sk, req, child)) {
return child;
}
}
/* Too bad, another child took ownership of the request, undo. */
child_put:
bh_unlock_sock(child);
sock_put(child);
return NULL;
@@ -1028,14 +1173,36 @@ void inet_csk_listen_stop(struct sock *sk)
* of the variants now. --ANK
*/
while ((req = reqsk_queue_remove(queue, sk)) != NULL) {
struct sock *child = req->sk, *nsk;
struct request_sock *nreq;
local_bh_disable();
bh_lock_sock(child);
WARN_ON(sock_owned_by_user(child));
sock_hold(child);
nsk = reuseport_migrate_sock(sk, child, NULL);
if (nsk) {
nreq = inet_reqsk_clone(req, nsk);
if (nreq) {
refcount_set(&nreq->rsk_refcnt, 1);
if (inet_csk_reqsk_queue_add(nsk, nreq, child)) {
reqsk_migrate_reset(req);
} else {
reqsk_migrate_reset(nreq);
__reqsk_free(nreq);
}
/* inet_csk_reqsk_queue_add() has already
 * called inet_child_forget() in the failure case.
 */
goto skip_child_forget;
}
}
inet_child_forget(sk, req, child);
skip_child_forget:
reqsk_put(req);
bh_unlock_sock(child);
local_bh_enable();


@@ -697,7 +697,7 @@ void inet_unhash(struct sock *sk)
goto unlock;
if (rcu_access_pointer(sk->sk_reuseport_cb))
reuseport_stop_listen_sock(sk);
if (ilb) {
inet_unhash2(hashinfo, sk);
ilb->count--;


@@ -960,6 +960,15 @@ static struct ctl_table ipv4_net_table[] = {
.proc_handler = proc_dou8vec_minmax,
},
#endif
{
.procname = "tcp_migrate_req",
.data = &init_net.ipv4.sysctl_tcp_migrate_req,
.maxlen = sizeof(u8),
.mode = 0644,
.proc_handler = proc_dou8vec_minmax,
.extra1 = SYSCTL_ZERO,
.extra2 = SYSCTL_ONE
},
{
.procname = "tcp_reordering",
.data = &init_net.ipv4.sysctl_tcp_reordering,


@@ -2002,13 +2002,21 @@ process:
goto csum_error;
}
if (unlikely(sk->sk_state != TCP_LISTEN)) {
nsk = reuseport_migrate_sock(sk, req_to_sk(req), skb);
if (!nsk) {
inet_csk_reqsk_queue_drop_and_put(sk, req);
goto lookup;
}
sk = nsk;
/* reuseport_migrate_sock() has already held one sk_refcnt
* before returning.
*/
} else {
/* We own a reference on the listener, increase it again
* as we might lose it too soon.
*/
sock_hold(sk);
}
refcounted = true;
nsk = NULL;
if (!tcp_filter(sk, skb)) {


@@ -775,8 +775,8 @@ struct sock *tcp_check_req(struct sock *sk, struct sk_buff *skb,
goto listen_overflow;
if (own_req && rsk_drop_req(req)) {
reqsk_queue_removed(&inet_csk(req->rsk_listener)->icsk_accept_queue, req);
inet_csk_reqsk_queue_drop_and_put(req->rsk_listener, req);
return child;
}


@@ -1664,10 +1664,18 @@ process:
goto csum_error;
}
if (unlikely(sk->sk_state != TCP_LISTEN)) {
nsk = reuseport_migrate_sock(sk, req_to_sk(req), skb);
if (!nsk) {
inet_csk_reqsk_queue_drop_and_put(sk, req);
goto lookup;
}
sk = nsk;
/* reuseport_migrate_sock() has already held one sk_refcnt
* before returning.
*/
} else {
sock_hold(sk);
}
refcounted = true;
nsk = NULL;
if (!tcp_filter(sk, skb)) {


@@ -994,6 +994,8 @@ enum bpf_attach_type {
BPF_SK_LOOKUP,
BPF_XDP,
BPF_SK_SKB_VERDICT,
BPF_SK_REUSEPORT_SELECT,
BPF_SK_REUSEPORT_SELECT_OR_MIGRATE,
__MAX_BPF_ATTACH_TYPE
};
@@ -5416,6 +5418,20 @@ struct sk_reuseport_md {
__u32 ip_protocol; /* IP protocol. e.g. IPPROTO_TCP, IPPROTO_UDP */
__u32 bind_inany; /* Is sock bound to an INANY address? */
__u32 hash; /* A hash of the packet 4 tuples */
/* When reuse->migrating_sk is NULL, it is selecting a sk for the
* new incoming connection request (e.g. selecting a listen sk for
* the received SYN in the TCP case). reuse->sk is one of the sk
* in the reuseport group. The bpf prog can use reuse->sk to learn
* the local listening ip/port without looking into the skb.
*
* When reuse->migrating_sk is not NULL, reuse->sk is closed and
* reuse->migrating_sk is the socket that needs to be migrated
* to another listening socket. migrating_sk could be a fullsock
* sk that is fully established or a reqsk that is in-the-middle
* of 3-way handshake.
*/
__bpf_md_ptr(struct bpf_sock *, sk);
__bpf_md_ptr(struct bpf_sock *, migrating_sk);
};
#define BPF_TAG_SIZE 8


@@ -9075,7 +9075,10 @@ static struct bpf_link *attach_iter(const struct bpf_sec_def *sec,
static const struct bpf_sec_def section_defs[] = {
BPF_PROG_SEC("socket", BPF_PROG_TYPE_SOCKET_FILTER),
BPF_EAPROG_SEC("sk_reuseport/migrate", BPF_PROG_TYPE_SK_REUSEPORT,
BPF_SK_REUSEPORT_SELECT_OR_MIGRATE),
BPF_EAPROG_SEC("sk_reuseport", BPF_PROG_TYPE_SK_REUSEPORT,
BPF_SK_REUSEPORT_SELECT),
SEC_DEF("kprobe/", KPROBE,
.attach_fn = attach_kprobe),
BPF_PROG_SEC("uprobe/", BPF_PROG_TYPE_KPROBE),


@@ -40,7 +40,7 @@ struct ipv6_packet pkt_v6 = {
.tcp.doff = 5,
};
int settimeo(int fd, int timeout_ms)
{
struct timeval timeout = { .tv_sec = 3 };


@@ -33,6 +33,7 @@ struct ipv6_packet {
} __packed;
extern struct ipv6_packet pkt_v6;
int settimeo(int fd, int timeout_ms);
int start_server(int family, int type, const char *addr, __u16 port,
int timeout_ms);
int connect_to_fd(int server_fd, int timeout_ms);


@@ -0,0 +1,555 @@
// SPDX-License-Identifier: GPL-2.0
/*
* Check if we can migrate child sockets.
*
* 1. call listen() for 4 server sockets.
* 2. call connect() for 25 client sockets.
* 3. call listen() for 1 server socket. (migration target)
* 4. update a map to migrate all child sockets
* to the last server socket (migrate_map[cookie] = 4)
* 5. call shutdown() for first 4 server sockets
* and migrate the requests in the accept queue
* to the last server socket.
* 6. call listen() for the second server socket.
* 7. call shutdown() for the last server
* and migrate the requests in the accept queue
* to the second server socket.
* 8. call listen() for the last server.
* 9. call shutdown() for the second server
* and migrate the requests in the accept queue
* to the last server socket.
* 10. call accept() for the last server socket.
*
* Author: Kuniyuki Iwashima <kuniyu@amazon.co.jp>
*/
#include <bpf/bpf.h>
#include <bpf/libbpf.h>
#include "test_progs.h"
#include "test_migrate_reuseport.skel.h"
#include "network_helpers.h"
#define IFINDEX_LO 1
#define NR_SERVERS 5
#define NR_CLIENTS (NR_SERVERS * 5)
#define MIGRATED_TO (NR_SERVERS - 1)
/* fastopenq->max_qlen and sk->sk_max_ack_backlog */
#define QLEN (NR_CLIENTS * 5)
#define MSG "Hello World\0"
#define MSGLEN 12
static struct migrate_reuseport_test_case {
const char *name;
__s64 servers[NR_SERVERS];
__s64 clients[NR_CLIENTS];
struct sockaddr_storage addr;
socklen_t addrlen;
int family;
int state;
bool drop_ack;
bool expire_synack_timer;
bool fastopen;
struct bpf_link *link;
} test_cases[] = {
{
.name = "IPv4 TCP_ESTABLISHED inet_csk_listen_stop",
.family = AF_INET,
.state = BPF_TCP_ESTABLISHED,
.drop_ack = false,
.expire_synack_timer = false,
.fastopen = false,
},
{
.name = "IPv4 TCP_SYN_RECV inet_csk_listen_stop",
.family = AF_INET,
.state = BPF_TCP_SYN_RECV,
.drop_ack = true,
.expire_synack_timer = false,
.fastopen = true,
},
{
.name = "IPv4 TCP_NEW_SYN_RECV reqsk_timer_handler",
.family = AF_INET,
.state = BPF_TCP_NEW_SYN_RECV,
.drop_ack = true,
.expire_synack_timer = true,
.fastopen = false,
},
{
.name = "IPv4 TCP_NEW_SYN_RECV inet_csk_complete_hashdance",
.family = AF_INET,
.state = BPF_TCP_NEW_SYN_RECV,
.drop_ack = true,
.expire_synack_timer = false,
.fastopen = false,
},
{
.name = "IPv6 TCP_ESTABLISHED inet_csk_listen_stop",
.family = AF_INET6,
.state = BPF_TCP_ESTABLISHED,
.drop_ack = false,
.expire_synack_timer = false,
.fastopen = false,
},
{
.name = "IPv6 TCP_SYN_RECV inet_csk_listen_stop",
.family = AF_INET6,
.state = BPF_TCP_SYN_RECV,
.drop_ack = true,
.expire_synack_timer = false,
.fastopen = true,
},
{
.name = "IPv6 TCP_NEW_SYN_RECV reqsk_timer_handler",
.family = AF_INET6,
.state = BPF_TCP_NEW_SYN_RECV,
.drop_ack = true,
.expire_synack_timer = true,
.fastopen = false,
},
{
.name = "IPv6 TCP_NEW_SYN_RECV inet_csk_complete_hashdance",
.family = AF_INET6,
.state = BPF_TCP_NEW_SYN_RECV,
.drop_ack = true,
.expire_synack_timer = false,
.fastopen = false,
}
};
static void init_fds(__s64 fds[], int len)
{
int i;
for (i = 0; i < len; i++)
fds[i] = -1;
}
static void close_fds(__s64 fds[], int len)
{
int i;
for (i = 0; i < len; i++) {
if (fds[i] != -1) {
close(fds[i]);
fds[i] = -1;
}
}
}
static int setup_fastopen(char *buf, int size, int *saved_len, bool restore)
{
int err = 0, fd, len;
fd = open("/proc/sys/net/ipv4/tcp_fastopen", O_RDWR);
if (!ASSERT_NEQ(fd, -1, "open"))
return -1;
if (restore) {
len = write(fd, buf, *saved_len);
if (!ASSERT_EQ(len, *saved_len, "write - restore"))
err = -1;
} else {
*saved_len = read(fd, buf, size);
if (!ASSERT_GE(*saved_len, 1, "read")) {
err = -1;
goto close;
}
err = lseek(fd, 0, SEEK_SET);
if (!ASSERT_OK(err, "lseek"))
goto close;
/* (TFO_CLIENT_ENABLE | TFO_SERVER_ENABLE |
* TFO_CLIENT_NO_COOKIE | TFO_SERVER_COOKIE_NOT_REQD)
*/
len = write(fd, "519", 3);
if (!ASSERT_EQ(len, 3, "write - setup"))
err = -1;
}
close:
close(fd);
return err;
}
static int drop_ack(struct migrate_reuseport_test_case *test_case,
struct test_migrate_reuseport *skel)
{
if (test_case->family == AF_INET)
skel->bss->server_port = ((struct sockaddr_in *)
&test_case->addr)->sin_port;
else
skel->bss->server_port = ((struct sockaddr_in6 *)
&test_case->addr)->sin6_port;
test_case->link = bpf_program__attach_xdp(skel->progs.drop_ack,
IFINDEX_LO);
if (!ASSERT_OK_PTR(test_case->link, "bpf_program__attach_xdp"))
return -1;
return 0;
}
static int pass_ack(struct migrate_reuseport_test_case *test_case)
{
int err;
err = bpf_link__detach(test_case->link);
if (!ASSERT_OK(err, "bpf_link__detach"))
return -1;
test_case->link = NULL;
return 0;
}
static int start_servers(struct migrate_reuseport_test_case *test_case,
struct test_migrate_reuseport *skel)
{
int i, err, prog_fd, reuseport = 1, qlen = QLEN;
prog_fd = bpf_program__fd(skel->progs.migrate_reuseport);
make_sockaddr(test_case->family,
test_case->family == AF_INET ? "127.0.0.1" : "::1", 0,
&test_case->addr, &test_case->addrlen);
for (i = 0; i < NR_SERVERS; i++) {
test_case->servers[i] = socket(test_case->family, SOCK_STREAM,
IPPROTO_TCP);
if (!ASSERT_NEQ(test_case->servers[i], -1, "socket"))
return -1;
err = setsockopt(test_case->servers[i], SOL_SOCKET,
SO_REUSEPORT, &reuseport, sizeof(reuseport));
if (!ASSERT_OK(err, "setsockopt - SO_REUSEPORT"))
return -1;
err = bind(test_case->servers[i],
(struct sockaddr *)&test_case->addr,
test_case->addrlen);
if (!ASSERT_OK(err, "bind"))
return -1;
if (i == 0) {
err = setsockopt(test_case->servers[i], SOL_SOCKET,
SO_ATTACH_REUSEPORT_EBPF,
&prog_fd, sizeof(prog_fd));
if (!ASSERT_OK(err,
"setsockopt - SO_ATTACH_REUSEPORT_EBPF"))
return -1;
err = getsockname(test_case->servers[i],
(struct sockaddr *)&test_case->addr,
&test_case->addrlen);
if (!ASSERT_OK(err, "getsockname"))
return -1;
}
if (test_case->fastopen) {
err = setsockopt(test_case->servers[i],
SOL_TCP, TCP_FASTOPEN,
&qlen, sizeof(qlen));
if (!ASSERT_OK(err, "setsockopt - TCP_FASTOPEN"))
return -1;
}
/* All requests will be tied to the first four listeners */
if (i != MIGRATED_TO) {
err = listen(test_case->servers[i], qlen);
if (!ASSERT_OK(err, "listen"))
return -1;
}
}
return 0;
}
static int start_clients(struct migrate_reuseport_test_case *test_case)
{
char buf[MSGLEN] = MSG;
int i, err;
for (i = 0; i < NR_CLIENTS; i++) {
test_case->clients[i] = socket(test_case->family, SOCK_STREAM,
IPPROTO_TCP);
if (!ASSERT_NEQ(test_case->clients[i], -1, "socket"))
return -1;
/* The attached XDP program drops only the final ACK, so
* clients will transition to TCP_ESTABLISHED immediately.
*/
err = settimeo(test_case->clients[i], 100);
if (!ASSERT_OK(err, "settimeo"))
return -1;
if (test_case->fastopen) {
int fastopen = 1;
err = setsockopt(test_case->clients[i], IPPROTO_TCP,
TCP_FASTOPEN_CONNECT, &fastopen,
sizeof(fastopen));
if (!ASSERT_OK(err,
"setsockopt - TCP_FASTOPEN_CONNECT"))
return -1;
}
err = connect(test_case->clients[i],
(struct sockaddr *)&test_case->addr,
test_case->addrlen);
if (!ASSERT_OK(err, "connect"))
return -1;
err = write(test_case->clients[i], buf, MSGLEN);
if (!ASSERT_EQ(err, MSGLEN, "write"))
return -1;
}
return 0;
}
static int update_maps(struct migrate_reuseport_test_case *test_case,
struct test_migrate_reuseport *skel)
{
int i, err, migrated_to = MIGRATED_TO;
int reuseport_map_fd, migrate_map_fd;
__u64 value;
reuseport_map_fd = bpf_map__fd(skel->maps.reuseport_map);
migrate_map_fd = bpf_map__fd(skel->maps.migrate_map);
for (i = 0; i < NR_SERVERS; i++) {
value = (__u64)test_case->servers[i];
err = bpf_map_update_elem(reuseport_map_fd, &i, &value,
BPF_NOEXIST);
if (!ASSERT_OK(err, "bpf_map_update_elem - reuseport_map"))
return -1;
err = bpf_map_lookup_elem(reuseport_map_fd, &i, &value);
if (!ASSERT_OK(err, "bpf_map_lookup_elem - reuseport_map"))
return -1;
err = bpf_map_update_elem(migrate_map_fd, &value, &migrated_to,
BPF_NOEXIST);
if (!ASSERT_OK(err, "bpf_map_update_elem - migrate_map"))
return -1;
}
return 0;
}
static int migrate_dance(struct migrate_reuseport_test_case *test_case)
{
int i, err;
/* Migrate TCP_ESTABLISHED and TCP_SYN_RECV requests
* to the last listener based on eBPF.
*/
for (i = 0; i < MIGRATED_TO; i++) {
err = shutdown(test_case->servers[i], SHUT_RDWR);
if (!ASSERT_OK(err, "shutdown"))
return -1;
}
/* No dance for TCP_NEW_SYN_RECV to migrate based on eBPF */
if (test_case->state == BPF_TCP_NEW_SYN_RECV)
return 0;
/* Note that we use the second listener instead of the
* first one here.
*
* The first listener is bind()ed with port 0, and
* SOCK_BINDPORT_LOCK is not set in sk_userlocks, so
* calling listen() again will bind() the first listener
* on a new ephemeral port and detach it from the existing
* reuseport group. (See: __inet_bind(), tcp_set_state())
*
* OTOH, the second one is bind()ed with a specific port,
* and SOCK_BINDPORT_LOCK is set. Thus, re-listen() will
* resurrect the listener on the existing reuseport group.
*/
err = listen(test_case->servers[1], QLEN);
if (!ASSERT_OK(err, "listen"))
return -1;
/* Migrate from the last listener to the second one.
*
* All listeners were detached out of the reuseport_map,
* so migration will be done by kernel random pick from here.
*/
err = shutdown(test_case->servers[MIGRATED_TO], SHUT_RDWR);
if (!ASSERT_OK(err, "shutdown"))
return -1;
/* Back to the existing reuseport group */
err = listen(test_case->servers[MIGRATED_TO], QLEN);
if (!ASSERT_OK(err, "listen"))
return -1;
/* Migrate back to the last one from the second one */
err = shutdown(test_case->servers[1], SHUT_RDWR);
if (!ASSERT_OK(err, "shutdown"))
return -1;
return 0;
}
static void count_requests(struct migrate_reuseport_test_case *test_case,
struct test_migrate_reuseport *skel)
{
struct sockaddr_storage addr;
socklen_t len = sizeof(addr);
int err, cnt = 0, client;
char buf[MSGLEN];
err = settimeo(test_case->servers[MIGRATED_TO], 4000);
if (!ASSERT_OK(err, "settimeo"))
goto out;
for (; cnt < NR_CLIENTS; cnt++) {
client = accept(test_case->servers[MIGRATED_TO],
(struct sockaddr *)&addr, &len);
if (!ASSERT_NEQ(client, -1, "accept"))
goto out;
memset(buf, 0, MSGLEN);
read(client, &buf, MSGLEN);
close(client);
if (!ASSERT_STREQ(buf, MSG, "read"))
goto out;
}
out:
ASSERT_EQ(cnt, NR_CLIENTS, "count in userspace");
switch (test_case->state) {
case BPF_TCP_ESTABLISHED:
cnt = skel->bss->migrated_at_close;
break;
case BPF_TCP_SYN_RECV:
cnt = skel->bss->migrated_at_close_fastopen;
break;
case BPF_TCP_NEW_SYN_RECV:
if (test_case->expire_synack_timer)
cnt = skel->bss->migrated_at_send_synack;
else
cnt = skel->bss->migrated_at_recv_ack;
break;
default:
cnt = 0;
}
ASSERT_EQ(cnt, NR_CLIENTS, "count in BPF prog");
}
static void run_test(struct migrate_reuseport_test_case *test_case,
struct test_migrate_reuseport *skel)
{
int err, saved_len;
char buf[16];
skel->bss->migrated_at_close = 0;
skel->bss->migrated_at_close_fastopen = 0;
skel->bss->migrated_at_send_synack = 0;
skel->bss->migrated_at_recv_ack = 0;
init_fds(test_case->servers, NR_SERVERS);
init_fds(test_case->clients, NR_CLIENTS);
if (test_case->fastopen) {
memset(buf, 0, sizeof(buf));
err = setup_fastopen(buf, sizeof(buf), &saved_len, false);
if (!ASSERT_OK(err, "setup_fastopen - setup"))
return;
}
err = start_servers(test_case, skel);
if (!ASSERT_OK(err, "start_servers"))
goto close_servers;
if (test_case->drop_ack) {
/* Drop the final ACK of the 3-way handshake and stick the
* in-flight requests on TCP_SYN_RECV or TCP_NEW_SYN_RECV.
*/
err = drop_ack(test_case, skel);
if (!ASSERT_OK(err, "drop_ack"))
goto close_servers;
}
/* Tie requests to the first four listeners */
err = start_clients(test_case);
if (!ASSERT_OK(err, "start_clients"))
goto close_clients;
err = listen(test_case->servers[MIGRATED_TO], QLEN);
if (!ASSERT_OK(err, "listen"))
goto close_clients;
err = update_maps(test_case, skel);
if (!ASSERT_OK(err, "update_maps"))
goto close_clients;
/* Migrate the requests in the accept queue only.
* TCP_NEW_SYN_RECV requests are not migrated at this point.
*/
err = migrate_dance(test_case);
if (!ASSERT_OK(err, "migrate_dance"))
goto close_clients;
if (test_case->expire_synack_timer) {
/* Wait for SYN+ACK timers to expire so that
* reqsk_timer_handler() migrates TCP_NEW_SYN_RECV requests.
*/
sleep(1);
}
if (test_case->link) {
/* Resume 3WHS and migrate TCP_NEW_SYN_RECV requests */
err = pass_ack(test_case);
if (!ASSERT_OK(err, "pass_ack"))
goto close_clients;
}
count_requests(test_case, skel);
close_clients:
close_fds(test_case->clients, NR_CLIENTS);
if (test_case->link) {
err = pass_ack(test_case);
ASSERT_OK(err, "pass_ack - clean up");
}
close_servers:
close_fds(test_case->servers, NR_SERVERS);
if (test_case->fastopen) {
err = setup_fastopen(buf, sizeof(buf), &saved_len, true);
ASSERT_OK(err, "setup_fastopen - restore");
}
}
void test_migrate_reuseport(void)
{
struct test_migrate_reuseport *skel;
int i;
skel = test_migrate_reuseport__open_and_load();
if (!ASSERT_OK_PTR(skel, "open_and_load"))
return;
for (i = 0; i < ARRAY_SIZE(test_cases); i++) {
test__start_subtest(test_cases[i].name);
run_test(&test_cases[i], skel);
}
test_migrate_reuseport__destroy(skel);
}


@@ -0,0 +1,135 @@
// SPDX-License-Identifier: GPL-2.0
/*
* Check if we can migrate child sockets.
*
* 1. If reuse_md->migrating_sk is NULL (SYN packet),
* return SK_PASS without selecting a listener.
* 2. If reuse_md->migrating_sk is not NULL (socket migration),
* select a listener (reuseport_map[migrate_map[cookie]])
*
* Author: Kuniyuki Iwashima <kuniyu@amazon.co.jp>
*/
#include <stddef.h>
#include <string.h>
#include <linux/bpf.h>
#include <linux/if_ether.h>
#include <linux/ip.h>
#include <linux/ipv6.h>
#include <linux/tcp.h>
#include <linux/in.h>
#include <bpf/bpf_endian.h>
#include <bpf/bpf_helpers.h>
struct {
__uint(type, BPF_MAP_TYPE_REUSEPORT_SOCKARRAY);
__uint(max_entries, 256);
__type(key, int);
__type(value, __u64);
} reuseport_map SEC(".maps");
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(max_entries, 256);
__type(key, __u64);
__type(value, int);
} migrate_map SEC(".maps");
int migrated_at_close = 0;
int migrated_at_close_fastopen = 0;
int migrated_at_send_synack = 0;
int migrated_at_recv_ack = 0;
__be16 server_port;
SEC("xdp")
int drop_ack(struct xdp_md *xdp)
{
void *data_end = (void *)(long)xdp->data_end;
void *data = (void *)(long)xdp->data;
struct ethhdr *eth = data;
struct tcphdr *tcp = NULL;
if (eth + 1 > data_end)
goto pass;
switch (bpf_ntohs(eth->h_proto)) {
case ETH_P_IP: {
struct iphdr *ip = (struct iphdr *)(eth + 1);
if (ip + 1 > data_end)
goto pass;
if (ip->protocol != IPPROTO_TCP)
goto pass;
tcp = (struct tcphdr *)((void *)ip + ip->ihl * 4);
break;
}
case ETH_P_IPV6: {
struct ipv6hdr *ipv6 = (struct ipv6hdr *)(eth + 1);
if (ipv6 + 1 > data_end)
goto pass;
if (ipv6->nexthdr != IPPROTO_TCP)
goto pass;
tcp = (struct tcphdr *)(ipv6 + 1);
break;
}
default:
goto pass;
}
if (tcp + 1 > data_end)
goto pass;
if (tcp->dest != server_port)
goto pass;
if (!tcp->syn && tcp->ack)
return XDP_DROP;
pass:
return XDP_PASS;
}
SEC("sk_reuseport/migrate")
int migrate_reuseport(struct sk_reuseport_md *reuse_md)
{
int *key, flags = 0, state, err;
__u64 cookie;
if (!reuse_md->migrating_sk)
return SK_PASS;
state = reuse_md->migrating_sk->state;
cookie = bpf_get_socket_cookie(reuse_md->sk);
key = bpf_map_lookup_elem(&migrate_map, &cookie);
if (!key)
return SK_DROP;
err = bpf_sk_select_reuseport(reuse_md, &reuseport_map, key, flags);
if (err)
return SK_PASS;
switch (state) {
case BPF_TCP_ESTABLISHED:
__sync_fetch_and_add(&migrated_at_close, 1);
break;
case BPF_TCP_SYN_RECV:
__sync_fetch_and_add(&migrated_at_close_fastopen, 1);
break;
case BPF_TCP_NEW_SYN_RECV:
if (!reuse_md->len)
__sync_fetch_and_add(&migrated_at_send_synack, 1);
else
__sync_fetch_and_add(&migrated_at_recv_ack, 1);
break;
}
return SK_PASS;
}
char _license[] SEC("license") = "GPL";