Add Connection::events() which returns a list of things that happened since
the last time it was called.

Add hooks to record said things when they happen.

Add get_{send,recv}_stream_mut() to work nicely with event-based handling.

SendStream::mark_as_acked() needed some fleshing out to properly
transition into DataRecvd, and set events correctly.
This commit is contained in:
Andy Grover 2019-04-02 09:50:30 -07:00
Родитель e5f02d1d27
Коммит 54c15c31a4
5 изменённых файлов: 184 добавлений и 21 удалений

Просмотреть файл

@ -1,4 +1,4 @@
Add stream events to transport API (readable, writable, new, closed) // grover
Max streams tracking/checking // grover
Use stream events in h3 // grover or dragana?
harmonize our rust usage:
- use foo::* or use foo::{bar, baz} and ordering/grouping

Просмотреть файл

@ -2,7 +2,7 @@
use log::Level;
use std::cell::RefCell;
use std::cmp::{max, min};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::{self, Debug};
use std::mem;
use std::net::SocketAddr;
@ -107,6 +107,67 @@ pub enum TxMode {
Pto,
}
#[derive(Debug, PartialOrd, Ord, PartialEq, Eq)]
pub enum ConnectionEvent {
/// A new recv stream has been opened by the peer and is available for
/// reading.
NewRecvStream { stream_id: u64 },
/// Space available in the buffer for an application write to succeed.
SendStreamWritable { stream_id: u64 },
/// New bytes available for reading.
RecvStreamReadable { stream_id: u64 },
/// Peer reset the stream.
RecvStreamReset { stream_id: u64, app_error: AppError },
/// Peer has acked everything sent on the stream.
SendStreamComplete { stream_id: u64 },
/// Peer increased MAX_STREAMS
SendStreamCreatable { stream_type: StreamType },
// TODO(agrover@mozilla.com): Are there more?
}
#[derive(Debug, Default)]
pub struct ConnectionEvents {
events: BTreeSet<ConnectionEvent>,
}
impl ConnectionEvents {
pub fn new_recv_stream(&mut self, stream_id: u64) {
self.events
.insert(ConnectionEvent::NewRecvStream { stream_id });
}
pub fn send_stream_writable(&mut self, stream_id: u64) {
self.events
.insert(ConnectionEvent::SendStreamWritable { stream_id });
}
pub fn recv_stream_readable(&mut self, stream_id: u64) {
self.events
.insert(ConnectionEvent::RecvStreamReadable { stream_id });
}
pub fn recv_stream_reset(&mut self, stream_id: u64, app_error: AppError) {
self.events.insert(ConnectionEvent::RecvStreamReset {
stream_id,
app_error,
});
}
pub fn send_stream_complete(&mut self, stream_id: u64) {
self.events
.insert(ConnectionEvent::SendStreamComplete { stream_id });
}
pub fn send_stream_creatable(&mut self, stream_type: StreamType) {
self.events
.insert(ConnectionEvent::SendStreamCreatable { stream_type });
}
fn events(&mut self) -> BTreeSet<ConnectionEvent> {
mem::replace(&mut self.events, BTreeSet::new())
}
}
#[derive(Debug, Default)]
pub struct FlowMgr {
from_send_streams: BTreeMap<u64, Frame>, // key: stream_id
@ -337,6 +398,7 @@ pub struct Connection {
pmtu: usize,
flow_mgr: Rc<RefCell<FlowMgr>>,
loss_recovery: LossRecovery,
events: Rc<RefCell<ConnectionEvents>>,
}
impl Debug for Connection {
@ -462,6 +524,7 @@ impl Connection {
pmtu: 1280,
flow_mgr: Rc::new(RefCell::new(FlowMgr::default())),
loss_recovery: LossRecovery::new(),
events: Rc::new(RefCell::new(ConnectionEvents::default())),
};
c.scid = c.generate_cid();
@ -1223,7 +1286,12 @@ impl Connection {
self.recv_streams.insert(
stream_id,
RecvStream::new(stream_id, max_data_if_new_stream, self.flow_mgr.clone()),
RecvStream::new(
stream_id,
max_data_if_new_stream,
self.flow_mgr.clone(),
self.events.clone(),
),
);
// If this is bidirectional, insert a send stream.
@ -1241,6 +1309,7 @@ impl Connection {
stream_id,
send_initial_max_stream_data,
self.flow_mgr.clone(),
self.events.clone(),
),
);
}
@ -1285,7 +1354,12 @@ impl Connection {
self.send_streams.insert(
new_id,
SendStream::new(new_id, initial_max_stream_data, self.flow_mgr.clone()),
SendStream::new(
new_id,
initial_max_stream_data,
self.flow_mgr.clone(),
self.events.clone(),
),
);
new_id
}
@ -1302,7 +1376,12 @@ impl Connection {
self.send_streams.insert(
new_id,
SendStream::new(new_id, send_initial_max_stream_data, self.flow_mgr.clone()),
SendStream::new(
new_id,
send_initial_max_stream_data,
self.flow_mgr.clone(),
self.events.clone(),
),
);
let recv_initial_max_stream_data = self
@ -1313,7 +1392,12 @@ impl Connection {
self.recv_streams.insert(
new_id,
RecvStream::new(new_id, recv_initial_max_stream_data, self.flow_mgr.clone()),
RecvStream::new(
new_id,
recv_initial_max_stream_data,
self.flow_mgr.clone(),
self.events.clone(),
),
);
new_id
}
@ -1390,6 +1474,23 @@ impl Connection {
.filter(|(_, stream)| stream.send_data_ready())
}
pub fn get_recv_stream_mut(&mut self, stream_id: u64) -> Option<&mut Recvable> {
self.recv_streams
.get_mut(&stream_id)
.map(|rs| rs as &mut Recvable)
}
pub fn get_send_stream_mut(&mut self, stream_id: u64) -> Option<&mut Sendable> {
self.send_streams
.get_mut(&stream_id)
.map(|rs| rs as &mut Sendable)
}
pub fn events(&mut self) -> Vec<ConnectionEvent> {
// Turn it into a vec for simplicity's sake
self.events.borrow_mut().events().into_iter().collect()
}
fn check_loss_detection_timeout(&mut self, cur_time: u64) {
qdebug!(self, "check_loss_detection_timeout");
let (mut lost_packets, retransmit_unacked_crypto, send_one_or_two_packets) =

Просмотреть файл

@ -35,7 +35,7 @@ const STREAM_FRAME_BIT_FIN: u64 = 0x01;
const STREAM_FRAME_BIT_LEN: u64 = 0x02;
const STREAM_FRAME_BIT_OFF: u64 = 0x04;
#[derive(PartialEq, Debug, Copy, Clone)]
#[derive(PartialEq, Debug, Copy, Clone, PartialOrd, Eq, Ord)]
pub enum StreamType {
BiDi,
UniDi,

Просмотреть файл

@ -5,7 +5,7 @@ use std::fmt::Debug;
use std::mem;
use std::rc::Rc;
use crate::connection::FlowMgr;
use crate::connection::{ConnectionEvents, FlowMgr};
use crate::{AppError, Error, Res};
pub const RX_STREAM_DATA_WINDOW: u64 = 0xFFFF; // 64 KiB
@ -333,14 +333,22 @@ pub struct RecvStream {
stream_id: u64,
state: RecvStreamState,
flow_mgr: Rc<RefCell<FlowMgr>>,
conn_events: Rc<RefCell<ConnectionEvents>>,
}
impl RecvStream {
pub fn new(stream_id: u64, max_stream_data: u64, flow_mgr: Rc<RefCell<FlowMgr>>) -> RecvStream {
pub fn new(
stream_id: u64,
max_stream_data: u64,
flow_mgr: Rc<RefCell<FlowMgr>>,
conn_events: Rc<RefCell<ConnectionEvents>>,
) -> RecvStream {
conn_events.borrow_mut().new_recv_stream(stream_id);
RecvStream {
stream_id,
state: RecvStreamState::new(max_stream_data),
flow_mgr,
conn_events,
}
}
@ -410,11 +418,20 @@ impl RecvStream {
qtrace!("data received when we are in state {}", self.state.name())
}
}
if self.data_ready() {
self.conn_events
.borrow_mut()
.recv_stream_readable(self.stream_id)
}
Ok(())
}
pub fn reset(&mut self, _application_error_code: AppError) {
// TODO(agrover@mozilla.com): queue an event to application
pub fn reset(&mut self, application_error_code: AppError) {
self.conn_events
.borrow_mut()
.recv_stream_reset(self.stream_id, application_error_code);
self.state.transition(RecvStreamState::ResetRecvd)
}
@ -494,8 +511,9 @@ mod tests {
#[test]
fn test_stream_rx() {
let flow_mgr = Rc::new(RefCell::new(FlowMgr::default()));
let conn_events = Rc::new(RefCell::new(ConnectionEvents::default()));
let mut s = RecvStream::new(567, 1024, flow_mgr.clone());
let mut s = RecvStream::new(567, 1024, flow_mgr.clone(), conn_events.clone());
// test receiving a contig frame and reading it works
s.inbound_stream_frame(false, 0, vec![1; 10]).unwrap();
@ -545,8 +563,9 @@ mod tests {
#[test]
fn test_stream_rx_dedupe() {
let flow_mgr = Rc::new(RefCell::new(FlowMgr::default()));
let conn_events = Rc::new(RefCell::new(ConnectionEvents::default()));
let mut s = RecvStream::new(3, 1024, flow_mgr.clone());
let mut s = RecvStream::new(3, 1024, flow_mgr.clone(), conn_events.clone());
let mut buf = vec![0u8; 100];
@ -633,10 +652,16 @@ mod tests {
#[test]
fn test_stream_flowc_update() {
let flow_mgr = Rc::new(RefCell::new(FlowMgr::default()));
let conn_events = Rc::new(RefCell::new(ConnectionEvents::default()));
let frame1 = vec![0; RX_STREAM_DATA_WINDOW as usize];
let mut s = RecvStream::new(4, RX_STREAM_DATA_WINDOW, flow_mgr.clone());
let mut s = RecvStream::new(
4,
RX_STREAM_DATA_WINDOW,
flow_mgr.clone(),
conn_events.clone(),
);
let mut buf = vec![0u8; RX_STREAM_DATA_WINDOW as usize * 4]; // Make it overlarge
@ -663,10 +688,16 @@ mod tests {
#[test]
fn test_stream_max_stream_data() {
let flow_mgr = Rc::new(RefCell::new(FlowMgr::default()));
let conn_events = Rc::new(RefCell::new(ConnectionEvents::default()));
let frame1 = vec![0; RX_STREAM_DATA_WINDOW as usize];
let mut s = RecvStream::new(67, RX_STREAM_DATA_WINDOW, flow_mgr.clone());
let mut s = RecvStream::new(
67,
RX_STREAM_DATA_WINDOW,
flow_mgr.clone(),
conn_events.clone(),
);
s.maybe_send_flowc_update();
assert_eq!(s.flow_mgr.borrow().peek(), None);

Просмотреть файл

@ -7,7 +7,7 @@ use std::rc::Rc;
use slice_deque::SliceDeque;
use crate::connection::{FlowMgr, TxMode};
use crate::connection::{ConnectionEvents, FlowMgr, TxMode};
use crate::{AppError, Error, Res};
@ -411,15 +411,22 @@ pub struct SendStream {
state: SendStreamState,
stream_id: u64,
flow_mgr: Rc<RefCell<FlowMgr>>,
conn_events: Rc<RefCell<ConnectionEvents>>,
}
impl SendStream {
pub fn new(stream_id: u64, max_stream_data: u64, flow_mgr: Rc<RefCell<FlowMgr>>) -> SendStream {
pub fn new(
stream_id: u64,
max_stream_data: u64,
flow_mgr: Rc<RefCell<FlowMgr>>,
conn_events: Rc<RefCell<ConnectionEvents>>,
) -> SendStream {
SendStream {
stream_id,
max_stream_data,
state: SendStreamState::Ready,
flow_mgr,
conn_events,
}
}
@ -461,9 +468,31 @@ impl SendStream {
}
pub fn mark_as_acked(&mut self, offset: u64, len: usize) {
self.state
.tx_buf_mut()
.map(|buf| buf.mark_as_acked(offset, len));
match self.state {
SendStreamState::Send { ref mut send_buf } => {
send_buf.mark_as_acked(offset, len);
if send_buf.buffered() < TX_STREAM_BUFFER {
self.conn_events
.borrow_mut()
.send_stream_writable(self.stream_id)
}
}
SendStreamState::DataSent {
ref mut send_buf,
final_size,
} => {
send_buf.mark_as_acked(offset, len);
if send_buf.buffered() == 0 {
self.conn_events
.borrow_mut()
.send_stream_complete(self.stream_id);
self.state.transition(SendStreamState::DataRecvd {
final_size: final_size,
});
}
}
_ => qtrace!("mark_as_acked called from state {}", self.state.name()),
}
}
pub fn final_size(&self) -> Option<u64> {
@ -631,7 +660,9 @@ mod tests {
#[test]
fn test_stream_tx() {
let flow_mgr = Rc::new(RefCell::new(FlowMgr::default()));
let mut s = SendStream::new(4, 1024, flow_mgr.clone());
let conn_events = Rc::new(RefCell::new(ConnectionEvents::default()));
let mut s = SendStream::new(4, 1024, flow_mgr.clone(), conn_events.clone());
let res = s.send(&vec![4; 100]).unwrap();
assert_eq!(res, 100);