diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index ed0457cc99d6..c0ca7be46f7a 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -810,14 +810,15 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref, */ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status) { - struct tipc_net *tn = net_generic(s->net, tipc_net_id); + struct tipc_server *srv = s->server; + struct tipc_net *tn = tipc_net(srv->net); u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); int index = hash(type); struct name_seq *seq; struct tipc_name_seq ns; spin_lock_bh(&tn->nametbl_lock); - seq = nametbl_find_seq(s->net, type); + seq = nametbl_find_seq(srv->net, type); if (!seq) seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); if (seq) { @@ -837,12 +838,13 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status) */ void tipc_nametbl_unsubscribe(struct tipc_subscription *s) { - struct tipc_net *tn = net_generic(s->net, tipc_net_id); + struct tipc_server *srv = s->server; + struct tipc_net *tn = tipc_net(srv->net); struct name_seq *seq; u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); spin_lock_bh(&tn->nametbl_lock); - seq = nametbl_find_seq(s->net, type); + seq = nametbl_find_seq(srv->net, type); if (seq != NULL) { spin_lock_bh(&seq->lock); list_del_init(&s->nameseq_list); diff --git a/net/tipc/server.c b/net/tipc/server.c index b8268c0882a7..7933fb92d852 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -84,9 +84,9 @@ struct tipc_conn { /* An entry waiting to be sent */ struct outqueue_entry { - u32 evt; + bool inactive; + struct tipc_event evt; struct list_head list; - struct kvec iov; }; static void tipc_recv_work(struct work_struct *work); @@ -154,6 +154,9 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) return con; } +/* sock_data_ready - interrupt callback indicating the socket has data to read + * The queued job is launched in tipc_recv_from_sock() + */ static void sock_data_ready(struct sock *sk) { struct tipc_conn *con; @@ -168,6 +171,10 @@ static void sock_data_ready(struct sock *sk) read_unlock_bh(&sk->sk_callback_lock); } +/* sock_write_space - interrupt callback after a sendmsg EAGAIN + * Indicates that there now is more is space in the send buffer + * The queued job is launched in tipc_send_to_sock() + */ static void sock_write_space(struct sock *sk) { struct tipc_conn *con; @@ -273,10 +280,10 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) return con; } -int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con, - void *buf, size_t len) +static int tipc_con_rcv_sub(struct tipc_server *srv, + struct tipc_conn *con, + struct tipc_subscr *s) { - struct tipc_subscr *s = (struct tipc_subscr *)buf; struct tipc_subscription *sub; bool status; int swap; @@ -292,7 +299,7 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con, return 0; } status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap)); - sub = tipc_subscrp_subscribe(net, s, conid, swap, status); + sub = tipc_subscrp_subscribe(srv, s, con->conid, swap, status); if (!sub) return -1; @@ -304,43 +311,27 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con, static int tipc_receive_from_sock(struct tipc_conn *con) { - struct tipc_server *s = con->server; + struct tipc_server *srv = con->server; struct sock *sk = con->sock->sk; struct msghdr msg = {}; + struct tipc_subscr s; struct kvec iov; - void *buf; int ret; - buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC); - if (!buf) { - ret = -ENOMEM; - goto out_close; - } - - iov.iov_base = buf; - iov.iov_len = s->max_rcvbuf_size; + iov.iov_base = &s; + iov.iov_len = sizeof(s); msg.msg_name = NULL; iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len); ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT); - if (ret <= 0) { - kmem_cache_free(s->rcvbuf_cache, buf); - goto out_close; + if (ret == -EWOULDBLOCK) + return -EWOULDBLOCK; + if (ret > 0) { + read_lock_bh(&sk->sk_callback_lock); + ret = tipc_con_rcv_sub(srv, con, &s); + read_unlock_bh(&sk->sk_callback_lock); } - - read_lock_bh(&sk->sk_callback_lock); - ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret); - read_unlock_bh(&sk->sk_callback_lock); - kmem_cache_free(s->rcvbuf_cache, buf); if (ret < 0) - tipc_conn_terminate(s, con->conid); - return ret; - -out_close: - if (ret != -EWOULDBLOCK) tipc_close_conn(con); - else if (ret == 0) - /* Don't return success if we really got EOF */ - ret = -EAGAIN; return ret; } @@ -442,33 +433,6 @@ static int tipc_open_listening_sock(struct tipc_server *s) return 0; } -static struct outqueue_entry *tipc_alloc_entry(void *data, int len) -{ - struct outqueue_entry *entry; - void *buf; - - entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC); - if (!entry) - return NULL; - - buf = kmemdup(data, len, GFP_ATOMIC); - if (!buf) { - kfree(entry); - return NULL; - } - - entry->iov.iov_base = buf; - entry->iov.iov_len = len; - - return entry; -} - -static void tipc_free_entry(struct outqueue_entry *e) -{ - kfree(e->iov.iov_base); - kfree(e); -} - static void tipc_clean_outqueues(struct tipc_conn *con) { struct outqueue_entry *e, *safe; @@ -476,50 +440,40 @@ static void tipc_clean_outqueues(struct tipc_conn *con) spin_lock_bh(&con->outqueue_lock); list_for_each_entry_safe(e, safe, &con->outqueue, list) { list_del(&e->list); - tipc_free_entry(e); + kfree(e); } spin_unlock_bh(&con->outqueue_lock); } -int tipc_conn_sendmsg(struct tipc_server *s, int conid, - u32 evt, void *data, size_t len) +/* tipc_conn_queue_evt - interrupt level call from a subscription instance + * The queued job is launched in tipc_send_to_sock() + */ +void tipc_conn_queue_evt(struct tipc_server *s, int conid, + u32 event, struct tipc_event *evt) { struct outqueue_entry *e; struct tipc_conn *con; con = tipc_conn_lookup(s, conid); if (!con) - return -EINVAL; + return; - if (!connected(con)) { - conn_put(con); - return 0; - } + if (!connected(con)) + goto err; - e = tipc_alloc_entry(data, len); - if (!e) { - conn_put(con); - return -ENOMEM; - } - e->evt = evt; + e = kmalloc(sizeof(*e), GFP_ATOMIC); + if (!e) + goto err; + e->inactive = (event == TIPC_SUBSCR_TIMEOUT); + memcpy(&e->evt, evt, sizeof(*evt)); spin_lock_bh(&con->outqueue_lock); list_add_tail(&e->list, &con->outqueue); spin_unlock_bh(&con->outqueue_lock); - if (!queue_work(s->send_wq, &con->swork)) - conn_put(con); - return 0; -} - -void tipc_conn_terminate(struct tipc_server *s, int conid) -{ - struct tipc_conn *con; - - con = tipc_conn_lookup(s, conid); - if (con) { - tipc_close_conn(con); - conn_put(con); - } + if (queue_work(s->send_wq, &con->swork)) + return; +err: + conn_put(con); } bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, @@ -542,7 +496,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, *conid = con->conid; con->sock = NULL; - rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub)); + rc = tipc_con_rcv_sub(tipc_topsrv(net), con, &sub); if (rc < 0) tipc_close_conn(con); return !rc; @@ -587,6 +541,7 @@ static void tipc_send_to_sock(struct tipc_conn *con) struct outqueue_entry *e; struct tipc_event *evt; struct msghdr msg; + struct kvec iov; int count = 0; int ret; @@ -594,27 +549,28 @@ static void tipc_send_to_sock(struct tipc_conn *con) while (!list_empty(queue)) { e = list_first_entry(queue, struct outqueue_entry, list); - + evt = &e->evt; spin_unlock_bh(&con->outqueue_lock); - if (e->evt == TIPC_SUBSCR_TIMEOUT) { - evt = (struct tipc_event *)e->iov.iov_base; + if (e->inactive) tipc_con_delete_sub(con, &evt->s); - } + memset(&msg, 0, sizeof(msg)); msg.msg_flags = MSG_DONTWAIT; + iov.iov_base = evt; + iov.iov_len = sizeof(*evt); + msg.msg_name = NULL; if (con->sock) { - ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, - e->iov.iov_len); + ret = kernel_sendmsg(con->sock, &msg, &iov, + 1, sizeof(*evt)); if (ret == -EWOULDBLOCK || ret == 0) { cond_resched(); goto out; } else if (ret < 0) { - goto send_err; + goto err; } } else { - evt = e->iov.iov_base; tipc_send_kern_top_evt(srv->net, evt); } @@ -625,13 +581,12 @@ static void tipc_send_to_sock(struct tipc_conn *con) } spin_lock_bh(&con->outqueue_lock); list_del(&e->list); - tipc_free_entry(e); + kfree(e); } spin_unlock_bh(&con->outqueue_lock); out: return; - -send_err: +err: tipc_close_conn(con); } @@ -695,22 +650,14 @@ int tipc_server_start(struct tipc_server *s) idr_init(&s->conn_idr); s->idr_in_use = 0; - s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size, - 0, SLAB_HWCACHE_ALIGN, NULL); - if (!s->rcvbuf_cache) - return -ENOMEM; - ret = tipc_work_start(s); - if (ret < 0) { - kmem_cache_destroy(s->rcvbuf_cache); + if (ret < 0) return ret; - } + ret = tipc_open_listening_sock(s); - if (ret < 0) { + if (ret < 0) tipc_work_stop(s); - kmem_cache_destroy(s->rcvbuf_cache); - return ret; - } + return ret; } @@ -731,6 +678,5 @@ void tipc_server_stop(struct tipc_server *s) spin_unlock_bh(&s->idr_lock); tipc_work_stop(s); - kmem_cache_destroy(s->rcvbuf_cache); idr_destroy(&s->conn_idr); } diff --git a/net/tipc/server.h b/net/tipc/server.h index fcc72321362d..2de8709b467d 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -36,6 +36,7 @@ #ifndef _TIPC_SERVER_H #define _TIPC_SERVER_H +#include "core.h" #include #include #include @@ -68,7 +69,6 @@ struct tipc_server { spinlock_t idr_lock; int idr_in_use; struct net *net; - struct kmem_cache *rcvbuf_cache; struct workqueue_struct *rcv_wq; struct workqueue_struct *send_wq; int max_rcvbuf_size; @@ -76,19 +76,13 @@ struct tipc_server { char name[TIPC_SERVER_NAME_LEN]; }; -int tipc_conn_sendmsg(struct tipc_server *s, int conid, - u32 evt, void *data, size_t len); +void tipc_conn_queue_evt(struct tipc_server *s, int conid, + u32 event, struct tipc_event *evt); bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, u32 upper, u32 filter, int *conid); void tipc_topsrv_kern_unsubscr(struct net *net, int conid); -/** - * tipc_conn_terminate - terminate connection with server - * - * Note: Must call it in process context since it might sleep - */ -void tipc_conn_terminate(struct tipc_server *s, int conid); int tipc_server_start(struct tipc_server *s); void tipc_server_stop(struct tipc_server *s); diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index c6de1452db69..c3e0b924e8c2 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -52,22 +52,19 @@ static u32 htohl(u32 in, int swap) static void tipc_subscrp_send_event(struct tipc_subscription *sub, u32 found_lower, u32 found_upper, - u32 event, u32 port_ref, u32 node) + u32 event, u32 port, u32 node) { - struct tipc_net *tn = net_generic(sub->net, tipc_net_id); - struct kvec msg_sect; + struct tipc_event *evt = &sub->evt; + bool swap = sub->swap; if (sub->inactive) return; - msg_sect.iov_base = (void *)&sub->evt; - msg_sect.iov_len = sizeof(struct tipc_event); - sub->evt.event = htohl(event, sub->swap); - sub->evt.found_lower = htohl(found_lower, sub->swap); - sub->evt.found_upper = htohl(found_upper, sub->swap); - sub->evt.port.ref = htohl(port_ref, sub->swap); - sub->evt.port.node = htohl(node, sub->swap); - tipc_conn_sendmsg(tn->topsrv, sub->conid, event, - msg_sect.iov_base, msg_sect.iov_len); + evt->event = htohl(event, swap); + evt->found_lower = htohl(found_lower, swap); + evt->found_upper = htohl(found_upper, swap); + evt->port.ref = htohl(port, swap); + evt->port.node = htohl(node, swap); + tipc_conn_queue_evt(sub->server, sub->conid, event, evt); } /** @@ -137,10 +134,11 @@ static void tipc_subscrp_timeout(struct timer_list *t) static void tipc_subscrp_kref_release(struct kref *kref) { - struct tipc_subscription *sub = container_of(kref, - struct tipc_subscription, - kref); - struct tipc_net *tn = net_generic(sub->net, tipc_net_id); + struct tipc_subscription *sub; + struct tipc_net *tn; + + sub = container_of(kref, struct tipc_subscription, kref); + tn = tipc_net(sub->server->net); atomic_dec(&tn->subscription_count); kfree(sub); @@ -156,11 +154,11 @@ void tipc_subscrp_get(struct tipc_subscription *subscription) kref_get(&subscription->kref); } -static struct tipc_subscription *tipc_subscrp_create(struct net *net, +static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv, struct tipc_subscr *s, int conid, bool swap) { - struct tipc_net *tn = net_generic(net, tipc_net_id); + struct tipc_net *tn = tipc_net(srv->net); struct tipc_subscription *sub; u32 filter = htohl(s->filter, swap); @@ -179,7 +177,7 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, } /* Initialize subscription object */ - sub->net = net; + sub->server = srv; sub->conid = conid; sub->inactive = false; if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) || @@ -197,7 +195,7 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, return sub; } -struct tipc_subscription *tipc_subscrp_subscribe(struct net *net, +struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, struct tipc_subscr *s, int conid, bool swap, bool status) @@ -205,7 +203,7 @@ struct tipc_subscription *tipc_subscrp_subscribe(struct net *net, struct tipc_subscription *sub = NULL; u32 timeout; - sub = tipc_subscrp_create(net, s, conid, swap); + sub = tipc_subscrp_create(srv, s, conid, swap); if (!sub) return NULL; diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index cfd0cb3a1da8..64ffd32d15e6 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -49,7 +49,6 @@ struct tipc_conn; * struct tipc_subscription - TIPC network topology subscription object * @subscriber: pointer to its subscriber * @seq: name sequence associated with subscription - * @net: point to network namespace * @timer: timer governing subscription duration (optional) * @nameseq_list: adjacent subscriptions in name sequence's subscription list * @subscrp_list: adjacent subscriptions in subscriber's subscription list @@ -58,7 +57,7 @@ struct tipc_conn; */ struct tipc_subscription { struct kref kref; - struct net *net; + struct tipc_server *server; struct timer_list timer; struct list_head nameseq_list; struct list_head subscrp_list; @@ -69,7 +68,7 @@ struct tipc_subscription { spinlock_t lock; /* serialize up/down and timer events */ }; -struct tipc_subscription *tipc_subscrp_subscribe(struct net *net, +struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, struct tipc_subscr *s, int conid, bool swap, bool status);