rxrpc: Pass the last Tx packet marker in the annotation buffer
When the last packet of data to be transmitted on a call is queued, tx_top is set and then the RXRPC_CALL_TX_LAST flag is set. Unfortunately, this leaves a race in the ACK processing side of things because the flag affects the interpretation of tx_top and also allows us to start receiving reply data before we've finished transmitting. To fix this, make the following changes: (1) rxrpc_queue_packet() now sets a marker in the annotation buffer instead of setting the RXRPC_CALL_TX_LAST flag. (2) rxrpc_rotate_tx_window() detects the marker and sets the flag in the same context as the routines that use it. (3) rxrpc_end_tx_phase() is simplified to just shift the call state. The Tx window must have been rotated before calling to discard the last packet. (4) rxrpc_receiving_reply() is added to handle the arrival of the first DATA packet of a reply to a client call (which is an implicit ACK of the Tx phase). (5) The last part of rxrpc_input_ack() is reordered to perform Tx rotation, then soft-ACK application and then to end the phase if we've rotated the last packet. In the event of a terminal ACK, the soft-ACK application will be skipped as nAcks should be 0. (6) rxrpc_input_ackall() now has to rotate as well as ending the phase. In addition: (7) Alter the transmit tracepoint to log the rotation of the last packet. (8) Remove the no-longer relevant queue_reqack tracepoint note. The ACK-REQUESTED packet header flag is now set as needed when we actually transmit the packet and may vary by retransmission. Signed-off-by: David Howells <dhowells@redhat.com>
This commit is contained in:
Родитель
01a88f7f6b
Коммит
70790dbe3f
|
@ -508,7 +508,9 @@ struct rxrpc_call {
|
||||||
#define RXRPC_TX_ANNO_NAK 2
|
#define RXRPC_TX_ANNO_NAK 2
|
||||||
#define RXRPC_TX_ANNO_RETRANS 3
|
#define RXRPC_TX_ANNO_RETRANS 3
|
||||||
#define RXRPC_TX_ANNO_MASK 0x03
|
#define RXRPC_TX_ANNO_MASK 0x03
|
||||||
#define RXRPC_TX_ANNO_RESENT 0x04
|
#define RXRPC_TX_ANNO_LAST 0x04
|
||||||
|
#define RXRPC_TX_ANNO_RESENT 0x08
|
||||||
|
|
||||||
#define RXRPC_RX_ANNO_JUMBO 0x3f /* Jumbo subpacket number + 1 if not zero */
|
#define RXRPC_RX_ANNO_JUMBO 0x3f /* Jumbo subpacket number + 1 if not zero */
|
||||||
#define RXRPC_RX_ANNO_JLAST 0x40 /* Set if last element of a jumbo packet */
|
#define RXRPC_RX_ANNO_JLAST 0x40 /* Set if last element of a jumbo packet */
|
||||||
#define RXRPC_RX_ANNO_VERIFIED 0x80 /* Set if verified and decrypted */
|
#define RXRPC_RX_ANNO_VERIFIED 0x80 /* Set if verified and decrypted */
|
||||||
|
@ -621,9 +623,10 @@ extern const char rxrpc_call_traces[rxrpc_call__nr_trace][4];
|
||||||
enum rxrpc_transmit_trace {
|
enum rxrpc_transmit_trace {
|
||||||
rxrpc_transmit_wait,
|
rxrpc_transmit_wait,
|
||||||
rxrpc_transmit_queue,
|
rxrpc_transmit_queue,
|
||||||
rxrpc_transmit_queue_reqack,
|
|
||||||
rxrpc_transmit_queue_last,
|
rxrpc_transmit_queue_last,
|
||||||
rxrpc_transmit_rotate,
|
rxrpc_transmit_rotate,
|
||||||
|
rxrpc_transmit_rotate_last,
|
||||||
|
rxrpc_transmit_await_reply,
|
||||||
rxrpc_transmit_end,
|
rxrpc_transmit_end,
|
||||||
rxrpc_transmit__nr_trace
|
rxrpc_transmit__nr_trace
|
||||||
};
|
};
|
||||||
|
|
|
@ -59,6 +59,7 @@ static void rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to)
|
||||||
{
|
{
|
||||||
struct sk_buff *skb, *list = NULL;
|
struct sk_buff *skb, *list = NULL;
|
||||||
int ix;
|
int ix;
|
||||||
|
u8 annotation;
|
||||||
|
|
||||||
spin_lock(&call->lock);
|
spin_lock(&call->lock);
|
||||||
|
|
||||||
|
@ -66,16 +67,22 @@ static void rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to)
|
||||||
call->tx_hard_ack++;
|
call->tx_hard_ack++;
|
||||||
ix = call->tx_hard_ack & RXRPC_RXTX_BUFF_MASK;
|
ix = call->tx_hard_ack & RXRPC_RXTX_BUFF_MASK;
|
||||||
skb = call->rxtx_buffer[ix];
|
skb = call->rxtx_buffer[ix];
|
||||||
|
annotation = call->rxtx_annotations[ix];
|
||||||
rxrpc_see_skb(skb, rxrpc_skb_tx_rotated);
|
rxrpc_see_skb(skb, rxrpc_skb_tx_rotated);
|
||||||
call->rxtx_buffer[ix] = NULL;
|
call->rxtx_buffer[ix] = NULL;
|
||||||
call->rxtx_annotations[ix] = 0;
|
call->rxtx_annotations[ix] = 0;
|
||||||
skb->next = list;
|
skb->next = list;
|
||||||
list = skb;
|
list = skb;
|
||||||
|
|
||||||
|
if (annotation & RXRPC_TX_ANNO_LAST)
|
||||||
|
set_bit(RXRPC_CALL_TX_LAST, &call->flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
spin_unlock(&call->lock);
|
spin_unlock(&call->lock);
|
||||||
|
|
||||||
trace_rxrpc_transmit(call, rxrpc_transmit_rotate);
|
trace_rxrpc_transmit(call, (test_bit(RXRPC_CALL_TX_LAST, &call->flags) ?
|
||||||
|
rxrpc_transmit_rotate_last :
|
||||||
|
rxrpc_transmit_rotate));
|
||||||
wake_up(&call->waitq);
|
wake_up(&call->waitq);
|
||||||
|
|
||||||
while (list) {
|
while (list) {
|
||||||
|
@ -92,42 +99,65 @@ static void rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to)
|
||||||
* This occurs when we get an ACKALL packet, the first DATA packet of a reply,
|
* This occurs when we get an ACKALL packet, the first DATA packet of a reply,
|
||||||
* or a final ACK packet.
|
* or a final ACK packet.
|
||||||
*/
|
*/
|
||||||
static bool rxrpc_end_tx_phase(struct rxrpc_call *call, const char *abort_why)
|
static bool rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun,
|
||||||
|
const char *abort_why)
|
||||||
{
|
{
|
||||||
_enter("");
|
|
||||||
|
|
||||||
switch (call->state) {
|
ASSERT(test_bit(RXRPC_CALL_TX_LAST, &call->flags));
|
||||||
case RXRPC_CALL_CLIENT_RECV_REPLY:
|
|
||||||
return true;
|
|
||||||
case RXRPC_CALL_CLIENT_AWAIT_REPLY:
|
|
||||||
case RXRPC_CALL_SERVER_AWAIT_ACK:
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
rxrpc_proto_abort(abort_why, call, call->tx_top);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
rxrpc_rotate_tx_window(call, call->tx_top);
|
|
||||||
|
|
||||||
write_lock(&call->state_lock);
|
write_lock(&call->state_lock);
|
||||||
|
|
||||||
switch (call->state) {
|
switch (call->state) {
|
||||||
default:
|
case RXRPC_CALL_CLIENT_SEND_REQUEST:
|
||||||
break;
|
|
||||||
case RXRPC_CALL_CLIENT_AWAIT_REPLY:
|
case RXRPC_CALL_CLIENT_AWAIT_REPLY:
|
||||||
call->tx_phase = false;
|
if (reply_begun)
|
||||||
call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
|
call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
|
||||||
|
else
|
||||||
|
call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case RXRPC_CALL_SERVER_AWAIT_ACK:
|
case RXRPC_CALL_SERVER_AWAIT_ACK:
|
||||||
__rxrpc_call_completed(call);
|
__rxrpc_call_completed(call);
|
||||||
rxrpc_notify_socket(call);
|
rxrpc_notify_socket(call);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
goto bad_state;
|
||||||
}
|
}
|
||||||
|
|
||||||
write_unlock(&call->state_lock);
|
write_unlock(&call->state_lock);
|
||||||
trace_rxrpc_transmit(call, rxrpc_transmit_end);
|
if (call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY) {
|
||||||
|
trace_rxrpc_transmit(call, rxrpc_transmit_await_reply);
|
||||||
|
} else {
|
||||||
|
trace_rxrpc_transmit(call, rxrpc_transmit_end);
|
||||||
|
}
|
||||||
_leave(" = ok");
|
_leave(" = ok");
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
|
bad_state:
|
||||||
|
write_unlock(&call->state_lock);
|
||||||
|
kdebug("end_tx %s", rxrpc_call_states[call->state]);
|
||||||
|
rxrpc_proto_abort(abort_why, call, call->tx_top);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Begin the reply reception phase of a call.
|
||||||
|
*/
|
||||||
|
static bool rxrpc_receiving_reply(struct rxrpc_call *call)
|
||||||
|
{
|
||||||
|
rxrpc_seq_t top = READ_ONCE(call->tx_top);
|
||||||
|
|
||||||
|
if (!test_bit(RXRPC_CALL_TX_LAST, &call->flags))
|
||||||
|
rxrpc_rotate_tx_window(call, top);
|
||||||
|
if (!test_bit(RXRPC_CALL_TX_LAST, &call->flags)) {
|
||||||
|
rxrpc_proto_abort("TXL", call, top);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!rxrpc_end_tx_phase(call, true, "ETD"))
|
||||||
|
return false;
|
||||||
|
call->tx_phase = false;
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -226,8 +256,9 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb,
|
||||||
/* Received data implicitly ACKs all of the request packets we sent
|
/* Received data implicitly ACKs all of the request packets we sent
|
||||||
* when we're acting as a client.
|
* when we're acting as a client.
|
||||||
*/
|
*/
|
||||||
if (call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY &&
|
if ((call->state == RXRPC_CALL_CLIENT_SEND_REQUEST ||
|
||||||
!rxrpc_end_tx_phase(call, "ETD"))
|
call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY) &&
|
||||||
|
!rxrpc_receiving_reply(call))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
call->ackr_prev_seq = seq;
|
call->ackr_prev_seq = seq;
|
||||||
|
@ -587,27 +618,26 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
|
||||||
}
|
}
|
||||||
call->acks_latest = sp->hdr.serial;
|
call->acks_latest = sp->hdr.serial;
|
||||||
|
|
||||||
if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) &&
|
|
||||||
hard_ack == call->tx_top) {
|
|
||||||
rxrpc_end_tx_phase(call, "ETA");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (before(hard_ack, call->tx_hard_ack) ||
|
if (before(hard_ack, call->tx_hard_ack) ||
|
||||||
after(hard_ack, call->tx_top))
|
after(hard_ack, call->tx_top))
|
||||||
return rxrpc_proto_abort("AKW", call, 0);
|
return rxrpc_proto_abort("AKW", call, 0);
|
||||||
|
if (nr_acks > call->tx_top - hard_ack)
|
||||||
|
return rxrpc_proto_abort("AKN", call, 0);
|
||||||
|
|
||||||
if (after(hard_ack, call->tx_hard_ack))
|
if (after(hard_ack, call->tx_hard_ack))
|
||||||
rxrpc_rotate_tx_window(call, hard_ack);
|
rxrpc_rotate_tx_window(call, hard_ack);
|
||||||
|
|
||||||
if (after(first_soft_ack, call->tx_top))
|
if (nr_acks > 0) {
|
||||||
return;
|
if (skb_copy_bits(skb, sp->offset, buf.acks, nr_acks) < 0)
|
||||||
|
return rxrpc_proto_abort("XSA", call, 0);
|
||||||
|
rxrpc_input_soft_acks(call, buf.acks, first_soft_ack, nr_acks);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (test_bit(RXRPC_CALL_TX_LAST, &call->flags)) {
|
||||||
|
rxrpc_end_tx_phase(call, false, "ETA");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (nr_acks > call->tx_top - first_soft_ack + 1)
|
|
||||||
nr_acks = first_soft_ack - call->tx_top + 1;
|
|
||||||
if (skb_copy_bits(skb, sp->offset, buf.acks, nr_acks) < 0)
|
|
||||||
return rxrpc_proto_abort("XSA", call, 0);
|
|
||||||
rxrpc_input_soft_acks(call, buf.acks, first_soft_ack, nr_acks);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -619,7 +649,9 @@ static void rxrpc_input_ackall(struct rxrpc_call *call, struct sk_buff *skb)
|
||||||
|
|
||||||
_proto("Rx ACKALL %%%u", sp->hdr.serial);
|
_proto("Rx ACKALL %%%u", sp->hdr.serial);
|
||||||
|
|
||||||
rxrpc_end_tx_phase(call, "ETL");
|
rxrpc_rotate_tx_window(call, call->tx_top);
|
||||||
|
if (test_bit(RXRPC_CALL_TX_LAST, &call->flags))
|
||||||
|
rxrpc_end_tx_phase(call, false, "ETL");
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -155,9 +155,10 @@ const char rxrpc_client_traces[rxrpc_client__nr_trace][7] = {
|
||||||
const char rxrpc_transmit_traces[rxrpc_transmit__nr_trace][4] = {
|
const char rxrpc_transmit_traces[rxrpc_transmit__nr_trace][4] = {
|
||||||
[rxrpc_transmit_wait] = "WAI",
|
[rxrpc_transmit_wait] = "WAI",
|
||||||
[rxrpc_transmit_queue] = "QUE",
|
[rxrpc_transmit_queue] = "QUE",
|
||||||
[rxrpc_transmit_queue_reqack] = "QRA",
|
|
||||||
[rxrpc_transmit_queue_last] = "QLS",
|
[rxrpc_transmit_queue_last] = "QLS",
|
||||||
[rxrpc_transmit_rotate] = "ROT",
|
[rxrpc_transmit_rotate] = "ROT",
|
||||||
|
[rxrpc_transmit_rotate_last] = "RLS",
|
||||||
|
[rxrpc_transmit_await_reply] = "AWR",
|
||||||
[rxrpc_transmit_end] = "END",
|
[rxrpc_transmit_end] = "END",
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -94,11 +94,15 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
|
||||||
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
|
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
|
||||||
rxrpc_seq_t seq = sp->hdr.seq;
|
rxrpc_seq_t seq = sp->hdr.seq;
|
||||||
int ret, ix;
|
int ret, ix;
|
||||||
|
u8 annotation = RXRPC_TX_ANNO_UNACK;
|
||||||
|
|
||||||
_net("queue skb %p [%d]", skb, seq);
|
_net("queue skb %p [%d]", skb, seq);
|
||||||
|
|
||||||
ASSERTCMP(seq, ==, call->tx_top + 1);
|
ASSERTCMP(seq, ==, call->tx_top + 1);
|
||||||
|
|
||||||
|
if (last)
|
||||||
|
annotation |= RXRPC_TX_ANNO_LAST;
|
||||||
|
|
||||||
/* We have to set the timestamp before queueing as the retransmit
|
/* We have to set the timestamp before queueing as the retransmit
|
||||||
* algorithm can see the packet as soon as we queue it.
|
* algorithm can see the packet as soon as we queue it.
|
||||||
*/
|
*/
|
||||||
|
@ -106,18 +110,14 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
|
||||||
|
|
||||||
ix = seq & RXRPC_RXTX_BUFF_MASK;
|
ix = seq & RXRPC_RXTX_BUFF_MASK;
|
||||||
rxrpc_get_skb(skb, rxrpc_skb_tx_got);
|
rxrpc_get_skb(skb, rxrpc_skb_tx_got);
|
||||||
call->rxtx_annotations[ix] = RXRPC_TX_ANNO_UNACK;
|
call->rxtx_annotations[ix] = annotation;
|
||||||
smp_wmb();
|
smp_wmb();
|
||||||
call->rxtx_buffer[ix] = skb;
|
call->rxtx_buffer[ix] = skb;
|
||||||
call->tx_top = seq;
|
call->tx_top = seq;
|
||||||
if (last) {
|
if (last)
|
||||||
set_bit(RXRPC_CALL_TX_LAST, &call->flags);
|
|
||||||
trace_rxrpc_transmit(call, rxrpc_transmit_queue_last);
|
trace_rxrpc_transmit(call, rxrpc_transmit_queue_last);
|
||||||
} else if (sp->hdr.flags & RXRPC_REQUEST_ACK) {
|
else
|
||||||
trace_rxrpc_transmit(call, rxrpc_transmit_queue_reqack);
|
|
||||||
} else {
|
|
||||||
trace_rxrpc_transmit(call, rxrpc_transmit_queue);
|
trace_rxrpc_transmit(call, rxrpc_transmit_queue);
|
||||||
}
|
|
||||||
|
|
||||||
if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
|
if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
|
||||||
_debug("________awaiting reply/ACK__________");
|
_debug("________awaiting reply/ACK__________");
|
||||||
|
|
Загрузка…
Ссылка в новой задаче