refactor(transport): reuse `now` in qlog wherever available (#2216)

* refactor(transport): reuse `now` in qlog whereever available

Instead of using `QLogStream::add_event_data_now`, which internally calls
`std::time::Instant::now()`, pass `now to
`QLogStream::add_event_data_with_instant`.

* Move regex to workspace dep

* Don't prefix now, before and after with time_

* Document preference for _with_time

---------

Co-authored-by: Lars Eggert <lars@eggert.org>
This commit is contained in:
Max Inden 2024-11-14 15:18:24 +01:00 коммит произвёл GitHub
Родитель 978aa4e8ff
Коммит fe76cdc6c2
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
17 изменённых файлов: 645 добавлений и 482 удалений

Просмотреть файл

@ -34,6 +34,7 @@ enum-map = { version = "2.7", default-features = false }
log = { version = "0.4", default-features = false }
qlog = { version = "0.13", default-features = false }
quinn-udp = { version = "0.5.6", default-features = false, features = ["direct-log"] }
regex = { version = "1.9", default-features = false, features = ["unicode-perl"] }
static_assertions = { version = "1.1", default-features = false }
url = { version = "2.5", default-features = false, features = ["std"] }

Просмотреть файл

@ -39,7 +39,7 @@ neqo-transport = { path = "./../neqo-transport" }
neqo-udp = { path = "./../neqo-udp" }
qlog = { workspace = true }
quinn-udp = { workspace = true }
regex = { version = "1.9", default-features = false, features = ["unicode-perl"] }
regex = { workspace = true }
tokio = { version = "1", default-features = false, features = ["net", "time", "macros", "rt", "rt-multi-thread"] }
url = { workspace = true }

Просмотреть файл

@ -29,6 +29,7 @@ windows = { version = "0.58", default-features = false, features = ["Win32_Media
[dev-dependencies]
test-fixture = { path = "../test-fixture" }
regex = { workspace = true }
[features]
ci = []

Просмотреть файл

@ -11,7 +11,7 @@ use std::{
io::BufWriter,
path::PathBuf,
rc::Rc,
time::SystemTime,
time::{Instant, SystemTime},
};
use qlog::{
@ -95,20 +95,41 @@ impl NeqoQlog {
}
/// If logging enabled, closure may generate an event to be logged.
pub fn add_event<F>(&self, f: F)
pub fn add_event_with_instant<F>(&self, f: F, now: Instant)
where
F: FnOnce() -> Option<qlog::events::Event>,
{
self.add_event_with_stream(|s| {
if let Some(evt) = f() {
s.add_event(evt)?;
s.add_event_with_instant(evt, now)?;
}
Ok(())
});
}
/// If logging enabled, closure may generate an event to be logged.
pub fn add_event_data<F>(&self, f: F)
pub fn add_event_data_with_instant<F>(&self, f: F, now: Instant)
where
F: FnOnce() -> Option<qlog::events::EventData>,
{
self.add_event_with_stream(|s| {
if let Some(ev_data) = f() {
s.add_event_data_with_instant(ev_data, now)?;
}
Ok(())
});
}
/// If logging enabled, closure may generate an event to be logged.
///
/// This function is similar to [`NeqoQlog::add_event_data_with_instant`],
/// but it does not take `now: Instant` as an input parameter. Instead, it
/// internally calls [`std::time::Instant::now`]. Prefer calling
/// [`NeqoQlog::add_event_data_with_instant`] when `now` is available, as it
/// ensures consistency with the current time, which might differ from
/// [`std::time::Instant::now`] (e.g., when using simulated time instead of
/// real time).
pub fn add_event_data_now<F>(&self, f: F)
where
F: FnOnce() -> Option<qlog::events::EventData>,
{
@ -184,7 +205,10 @@ pub fn new_trace(role: Role) -> qlog::TraceSeq {
#[cfg(test)]
mod test {
use std::time::Instant;
use qlog::events::Event;
use regex::Regex;
use test_fixture::EXPECTED_LOG_HEADER;
const EV_DATA: qlog::events::EventData =
@ -205,15 +229,14 @@ mod test {
}
#[test]
fn add_event() {
fn add_event_with_instant() {
let (log, contents) = test_fixture::new_neqo_qlog();
log.add_event(|| Some(Event::with_time(1.1, EV_DATA)));
log.add_event_with_instant(|| Some(Event::with_time(0.0, EV_DATA)), Instant::now());
assert_eq!(
contents.to_string(),
format!(
"{EXPECTED_LOG_HEADER}{e}",
e = EXPECTED_LOG_EVENT.replace("\"time\":0.0,", "\"time\":1.1,")
)
Regex::new("\"time\":[0-9].[0-9]*,")
.unwrap()
.replace(&contents.to_string(), "\"time\":0.0,"),
format!("{EXPECTED_LOG_HEADER}{EXPECTED_LOG_EVENT}"),
);
}
}

Просмотреть файл

@ -10,8 +10,11 @@ use neqo_common::qlog::NeqoQlog;
use neqo_transport::StreamId;
use qlog::events::{DataRecipient, EventData};
/// Uses [`NeqoQlog::add_event_data_now`] instead of
/// [`NeqoQlog::add_event_data_with_instant`], given that `now` is not available
/// on call-site. See docs on [`NeqoQlog::add_event_data_now`] for details.
pub fn h3_data_moved_up(qlog: &NeqoQlog, stream_id: StreamId, amount: usize) {
qlog.add_event_data(|| {
qlog.add_event_data_now(|| {
let ev_data = EventData::DataMoved(qlog::events::quic::DataMoved {
stream_id: Some(stream_id.as_u64()),
offset: None,
@ -25,8 +28,11 @@ pub fn h3_data_moved_up(qlog: &NeqoQlog, stream_id: StreamId, amount: usize) {
});
}
/// Uses [`NeqoQlog::add_event_data_now`] instead of
/// [`NeqoQlog::add_event_data_with_instant`], given that `now` is not available
/// on call-site. See docs on [`NeqoQlog::add_event_data_now`] for details.
pub fn h3_data_moved_down(qlog: &NeqoQlog, stream_id: StreamId, amount: usize) {
qlog.add_event_data(|| {
qlog.add_event_data_now(|| {
let ev_data = EventData::DataMoved(qlog::events::quic::DataMoved {
stream_id: Some(stream_id.as_u64()),
offset: None,

Просмотреть файл

@ -12,8 +12,11 @@ use qlog::events::{
EventData, RawInfo,
};
/// Uses [`NeqoQlog::add_event_data_now`] instead of
/// [`NeqoQlog::add_event_data_with_instant`], given that `now` is not available
/// on call-site. See docs on [`NeqoQlog::add_event_data_now`] for details.
pub fn qpack_read_insert_count_increment_instruction(qlog: &NeqoQlog, increment: u64, data: &[u8]) {
qlog.add_event_data(|| {
qlog.add_event_data_now(|| {
let raw = RawInfo {
length: Some(8),
payload_length: None,

Просмотреть файл

@ -216,8 +216,8 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
}
if self.state.in_recovery() {
self.set_state(State::CongestionAvoidance);
qlog::metrics_updated(&self.qlog, &[QlogMetric::InRecovery(false)]);
self.set_state(State::CongestionAvoidance, now);
qlog::metrics_updated(&self.qlog, &[QlogMetric::InRecovery(false)], now);
}
new_acked += pkt.len();
@ -239,7 +239,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
if self.congestion_window == self.ssthresh {
// This doesn't look like it is necessary, but it can happen
// after persistent congestion.
self.set_state(State::CongestionAvoidance);
self.set_state(State::CongestionAvoidance, now);
}
}
// Congestion avoidance, above the slow start threshold.
@ -276,6 +276,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
QlogMetric::CongestionWindow(self.congestion_window),
QlogMetric::BytesInFlight(self.bytes_in_flight),
],
now,
);
qdebug!([self], "on_packets_acked this={:p}, limited=0, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked);
}
@ -287,6 +288,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
prev_largest_acked_sent: Option<Instant>,
pto: Duration,
lost_packets: &[SentPacket],
now: Instant,
) -> bool {
if lost_packets.is_empty() {
return false;
@ -306,6 +308,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
qlog::metrics_updated(
&self.qlog,
&[QlogMetric::BytesInFlight(self.bytes_in_flight)],
now,
);
let is_pmtud_probe = self.pmtud.is_probe_filter();
@ -320,12 +323,13 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
return false;
};
let congestion = self.on_congestion_event(last_lost_packet);
let congestion = self.on_congestion_event(last_lost_packet, now);
let persistent_congestion = self.detect_persistent_congestion(
first_rtt_sample_time,
prev_largest_acked_sent,
pto,
lost_packets.rev(),
now,
);
qdebug!(
"on_packets_lost this={:p}, bytes_in_flight={}, cwnd={}, state={:?}",
@ -341,31 +345,33 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
/// congestion event.
///
/// See <https://datatracker.ietf.org/doc/html/rfc9002#section-b.7>.
fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket) -> bool {
self.on_congestion_event(largest_acked_pkt)
fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket, now: Instant) -> bool {
self.on_congestion_event(largest_acked_pkt, now)
}
fn discard(&mut self, pkt: &SentPacket) {
fn discard(&mut self, pkt: &SentPacket, now: Instant) {
if pkt.cc_outstanding() {
assert!(self.bytes_in_flight >= pkt.len());
self.bytes_in_flight -= pkt.len();
qlog::metrics_updated(
&self.qlog,
&[QlogMetric::BytesInFlight(self.bytes_in_flight)],
now,
);
qtrace!([self], "Ignore pkt with size {}", pkt.len());
}
}
fn discard_in_flight(&mut self) {
fn discard_in_flight(&mut self, now: Instant) {
self.bytes_in_flight = 0;
qlog::metrics_updated(
&self.qlog,
&[QlogMetric::BytesInFlight(self.bytes_in_flight)],
now,
);
}
fn on_packet_sent(&mut self, pkt: &SentPacket) {
fn on_packet_sent(&mut self, pkt: &SentPacket, now: Instant) {
// Record the recovery time and exit any transient state.
if self.state.transient() {
self.recovery_start = Some(pkt.pn());
@ -393,6 +399,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
qlog::metrics_updated(
&self.qlog,
&[QlogMetric::BytesInFlight(self.bytes_in_flight)],
now,
);
}
@ -448,23 +455,26 @@ impl<T: WindowAdjustment> ClassicCongestionControl<T> {
self.acked_bytes
}
fn set_state(&mut self, state: State) {
fn set_state(&mut self, state: State, now: Instant) {
if self.state != state {
qdebug!([self], "state -> {:?}", state);
let old_state = self.state;
self.qlog.add_event_data(|| {
// No need to tell qlog about exit from transient states.
if old_state.transient() {
None
} else {
let ev_data = EventData::CongestionStateUpdated(CongestionStateUpdated {
old: Some(old_state.to_qlog().to_owned()),
new: state.to_qlog().to_owned(),
trigger: None,
});
Some(ev_data)
}
});
self.qlog.add_event_data_with_instant(
|| {
// No need to tell qlog about exit from transient states.
if old_state.transient() {
None
} else {
let ev_data = EventData::CongestionStateUpdated(CongestionStateUpdated {
old: Some(old_state.to_qlog().to_owned()),
new: state.to_qlog().to_owned(),
trigger: None,
});
Some(ev_data)
}
},
now,
);
self.state = state;
}
}
@ -475,6 +485,7 @@ impl<T: WindowAdjustment> ClassicCongestionControl<T> {
prev_largest_acked_sent: Option<Instant>,
pto: Duration,
lost_packets: impl IntoIterator<Item = &'a SentPacket>,
now: Instant,
) -> bool {
if first_rtt_sample_time.is_none() {
return false;
@ -512,10 +523,11 @@ impl<T: WindowAdjustment> ClassicCongestionControl<T> {
qinfo!([self], "persistent congestion");
self.congestion_window = self.cwnd_min();
self.acked_bytes = 0;
self.set_state(State::PersistentCongestion);
self.set_state(State::PersistentCongestion, now);
qlog::metrics_updated(
&self.qlog,
&[QlogMetric::CongestionWindow(self.congestion_window)],
now,
);
return true;
}
@ -539,7 +551,7 @@ impl<T: WindowAdjustment> ClassicCongestionControl<T> {
/// Handle a congestion event.
/// Returns true if this was a true congestion event.
fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool {
fn on_congestion_event(&mut self, last_packet: &SentPacket, now: Instant) -> bool {
// Start a new congestion event if lost or ECN CE marked packet was sent
// after the start of the previous congestion recovery period.
if !self.after_recovery_start(last_packet) {
@ -567,8 +579,9 @@ impl<T: WindowAdjustment> ClassicCongestionControl<T> {
QlogMetric::SsThresh(self.ssthresh),
QlogMetric::InRecovery(true),
],
now,
);
self.set_state(State::RecoveryStart);
self.set_state(State::RecoveryStart, now);
true
}
@ -668,10 +681,10 @@ mod tests {
persistent_expected: bool,
) {
for p in lost_packets {
cc.on_packet_sent(p);
cc.on_packet_sent(p, now());
}
cc.on_packets_lost(Some(now()), None, PTO, lost_packets);
cc.on_packets_lost(Some(now()), None, PTO, lost_packets, Instant::now());
let persistent = if cc.cwnd() == reduced_cwnd {
false
@ -874,18 +887,19 @@ mod tests {
rtt_time: u32,
lost: &[SentPacket],
) -> bool {
let now = Instant::now();
assert_eq!(cc.cwnd(), cc.cwnd_initial());
let last_ack = Some(by_pto(last_ack));
let rtt_time = Some(by_pto(rtt_time));
// Persistent congestion is never declared if the RTT time is `None`.
cc.detect_persistent_congestion(None, None, PTO, lost.iter());
cc.detect_persistent_congestion(None, None, PTO, lost.iter(), now);
assert_eq!(cc.cwnd(), cc.cwnd_initial());
cc.detect_persistent_congestion(None, last_ack, PTO, lost.iter());
cc.detect_persistent_congestion(None, last_ack, PTO, lost.iter(), now);
assert_eq!(cc.cwnd(), cc.cwnd_initial());
cc.detect_persistent_congestion(rtt_time, last_ack, PTO, lost.iter());
cc.detect_persistent_congestion(rtt_time, last_ack, PTO, lost.iter(), now);
cc.cwnd() == cc.cwnd_min()
}
@ -1023,7 +1037,7 @@ mod tests {
fn persistent_congestion_no_prev_ack_newreno() {
let lost = make_lost(&[1, PERSISTENT_CONG_THRESH + 2]);
let mut cc = ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR));
cc.detect_persistent_congestion(Some(by_pto(0)), None, PTO, lost.iter());
cc.detect_persistent_congestion(Some(by_pto(0)), None, PTO, lost.iter(), Instant::now());
assert_eq!(cc.cwnd(), cc.cwnd_min());
}
@ -1031,7 +1045,7 @@ mod tests {
fn persistent_congestion_no_prev_ack_cubic() {
let lost = make_lost(&[1, PERSISTENT_CONG_THRESH + 2]);
let mut cc = ClassicCongestionControl::new(Cubic::default(), Pmtud::new(IP_ADDR));
cc.detect_persistent_congestion(Some(by_pto(0)), None, PTO, lost.iter());
cc.detect_persistent_congestion(Some(by_pto(0)), None, PTO, lost.iter(), Instant::now());
assert_eq!(cc.cwnd(), cc.cwnd_min());
}
@ -1085,7 +1099,7 @@ mod tests {
cc.max_datagram_size(),
);
next_pn += 1;
cc.on_packet_sent(&p);
cc.on_packet_sent(&p, now);
pkts.push(p);
}
assert_eq!(
@ -1113,7 +1127,7 @@ mod tests {
cc.max_datagram_size(),
);
next_pn += 1;
cc.on_packet_sent(&p);
cc.on_packet_sent(&p, now);
pkts.push(p);
}
assert_eq!(
@ -1163,10 +1177,10 @@ mod tests {
Vec::new(),
cc.max_datagram_size(),
);
cc.on_packet_sent(&p_lost);
cc.on_packet_sent(&p_lost, now);
cwnd_is_default(&cc);
now += PTO;
cc.on_packets_lost(Some(now), None, PTO, &[p_lost]);
cc.on_packets_lost(Some(now), None, PTO, &[p_lost], now);
cwnd_is_halved(&cc);
let p_not_lost = SentPacket::new(
PacketType::Short,
@ -1177,7 +1191,7 @@ mod tests {
Vec::new(),
cc.max_datagram_size(),
);
cc.on_packet_sent(&p_not_lost);
cc.on_packet_sent(&p_not_lost, now);
now += RTT;
cc.on_packets_acked(&[p_not_lost], &RTT_ESTIMATE, now);
cwnd_is_halved(&cc);
@ -1202,7 +1216,7 @@ mod tests {
cc.max_datagram_size(),
);
next_pn += 1;
cc.on_packet_sent(&p);
cc.on_packet_sent(&p, now);
pkts.push(p);
}
assert_eq!(
@ -1236,7 +1250,7 @@ mod tests {
cc.max_datagram_size(),
);
next_pn += 1;
cc.on_packet_sent(&p);
cc.on_packet_sent(&p, now);
pkts.push(p);
}
assert_eq!(
@ -1264,22 +1278,23 @@ mod tests {
#[test]
fn ecn_ce() {
let now = now();
let mut cc = ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR));
let p_ce = SentPacket::new(
PacketType::Short,
1,
IpTosEcn::default(),
now(),
now,
true,
Vec::new(),
cc.max_datagram_size(),
);
cc.on_packet_sent(&p_ce);
cc.on_packet_sent(&p_ce, now);
cwnd_is_default(&cc);
assert_eq!(cc.state, State::SlowStart);
// Signal congestion (ECN CE) and thus change state to recovery start.
cc.on_ecn_ce_received(&p_ce);
cc.on_ecn_ce_received(&p_ce, now);
cwnd_is_halved(&cc);
assert_eq!(cc.state, State::RecoveryStart);
}

Просмотреть файл

@ -60,19 +60,20 @@ pub trait CongestionControl: Display + Debug {
prev_largest_acked_sent: Option<Instant>,
pto: Duration,
lost_packets: &[SentPacket],
now: Instant,
) -> bool;
/// Returns true if the congestion window was reduced.
fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket) -> bool;
fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket, now: Instant) -> bool;
#[must_use]
fn recovery_packet(&self) -> bool;
fn discard(&mut self, pkt: &SentPacket);
fn discard(&mut self, pkt: &SentPacket, now: Instant);
fn on_packet_sent(&mut self, pkt: &SentPacket);
fn on_packet_sent(&mut self, pkt: &SentPacket, now: Instant);
fn discard_in_flight(&mut self);
fn discard_in_flight(&mut self, now: Instant);
}
#[derive(Debug, Copy, Clone)]

Просмотреть файл

@ -53,7 +53,7 @@ fn fill_cwnd(cc: &mut ClassicCongestionControl<Cubic>, mut next_pn: u64, now: In
Vec::new(),
cc.max_datagram_size(),
);
cc.on_packet_sent(&sent);
cc.on_packet_sent(&sent, now);
next_pn += 1;
}
next_pn
@ -83,7 +83,7 @@ fn packet_lost(cc: &mut ClassicCongestionControl<Cubic>, pn: u64) {
Vec::new(),
cc.max_datagram_size(),
);
cc.on_packets_lost(None, None, PTO, &[p_lost]);
cc.on_packets_lost(None, None, PTO, &[p_lost], now());
}
fn expected_tcp_acks(cwnd_rtt_start: usize, mtu: usize) -> u64 {

Просмотреть файл

@ -38,18 +38,19 @@ fn cwnd_is_halved(cc: &ClassicCongestionControl<NewReno>) {
}
#[test]
#[allow(clippy::too_many_lines)]
fn issue_876() {
let mut cc = ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR));
let time_now = now();
let time_before = time_now.checked_sub(Duration::from_millis(100)).unwrap();
let time_after = time_now + Duration::from_millis(150);
let now = now();
let before = now.checked_sub(Duration::from_millis(100)).unwrap();
let after = now + Duration::from_millis(150);
let sent_packets = &[
SentPacket::new(
PacketType::Short,
1,
IpTosEcn::default(),
time_before,
before,
true,
Vec::new(),
cc.max_datagram_size() - 1,
@ -58,7 +59,7 @@ fn issue_876() {
PacketType::Short,
2,
IpTosEcn::default(),
time_before,
before,
true,
Vec::new(),
cc.max_datagram_size() - 2,
@ -67,7 +68,7 @@ fn issue_876() {
PacketType::Short,
3,
IpTosEcn::default(),
time_before,
before,
true,
Vec::new(),
cc.max_datagram_size(),
@ -76,7 +77,7 @@ fn issue_876() {
PacketType::Short,
4,
IpTosEcn::default(),
time_before,
before,
true,
Vec::new(),
cc.max_datagram_size(),
@ -85,7 +86,7 @@ fn issue_876() {
PacketType::Short,
5,
IpTosEcn::default(),
time_before,
before,
true,
Vec::new(),
cc.max_datagram_size(),
@ -94,7 +95,7 @@ fn issue_876() {
PacketType::Short,
6,
IpTosEcn::default(),
time_before,
before,
true,
Vec::new(),
cc.max_datagram_size(),
@ -103,7 +104,7 @@ fn issue_876() {
PacketType::Short,
7,
IpTosEcn::default(),
time_after,
after,
true,
Vec::new(),
cc.max_datagram_size() - 3,
@ -112,13 +113,13 @@ fn issue_876() {
// Send some more packets so that the cc is not app-limited.
for p in &sent_packets[..6] {
cc.on_packet_sent(p);
cc.on_packet_sent(p, now);
}
assert_eq!(cc.acked_bytes(), 0);
cwnd_is_default(&cc);
assert_eq!(cc.bytes_in_flight(), 6 * cc.max_datagram_size() - 3);
cc.on_packets_lost(Some(time_now), None, PTO, &sent_packets[0..1]);
cc.on_packets_lost(Some(now), None, PTO, &sent_packets[0..1], now);
// We are now in recovery
assert!(cc.recovery_packet());
@ -127,20 +128,20 @@ fn issue_876() {
assert_eq!(cc.bytes_in_flight(), 5 * cc.max_datagram_size() - 2);
// Send a packet after recovery starts
cc.on_packet_sent(&sent_packets[6]);
cc.on_packet_sent(&sent_packets[6], now);
assert!(!cc.recovery_packet());
cwnd_is_halved(&cc);
assert_eq!(cc.acked_bytes(), 0);
assert_eq!(cc.bytes_in_flight(), 6 * cc.max_datagram_size() - 5);
// and ack it. cwnd increases slightly
cc.on_packets_acked(&sent_packets[6..], &RTT_ESTIMATE, time_now);
cc.on_packets_acked(&sent_packets[6..], &RTT_ESTIMATE, now);
assert_eq!(cc.acked_bytes(), sent_packets[6].len());
cwnd_is_halved(&cc);
assert_eq!(cc.bytes_in_flight(), 5 * cc.max_datagram_size() - 2);
// Packet from before is lost. Should not hurt cwnd.
cc.on_packets_lost(Some(time_now), None, PTO, &sent_packets[1..2]);
cc.on_packets_lost(Some(now), None, PTO, &sent_packets[1..2], now);
assert!(!cc.recovery_packet());
assert_eq!(cc.acked_bytes(), sent_packets[6].len());
cwnd_is_halved(&cc);
@ -169,7 +170,7 @@ fn issue_1465() {
};
let mut send_next = |cc: &mut ClassicCongestionControl<NewReno>, now| {
let p = next_packet(now);
cc.on_packet_sent(&p);
cc.on_packet_sent(&p, now);
p
};
@ -184,7 +185,7 @@ fn issue_1465() {
// advance one rtt to detect lost packet there this simplifies the timers, because
// on_packet_loss would only be called after RTO, but that is not relevant to the problem
now += RTT;
cc.on_packets_lost(Some(now), None, PTO, &[p1]);
cc.on_packets_lost(Some(now), None, PTO, &[p1], now);
// We are now in recovery
assert!(cc.recovery_packet());
@ -193,7 +194,7 @@ fn issue_1465() {
assert_eq!(cc.bytes_in_flight(), 2 * cc.max_datagram_size());
// Don't reduce the cwnd again on second packet loss
cc.on_packets_lost(Some(now), None, PTO, &[p3]);
cc.on_packets_lost(Some(now), None, PTO, &[p3], now);
assert_eq!(cc.acked_bytes(), 0);
cwnd_is_halved(&cc); // still the same as after first packet loss
assert_eq!(cc.bytes_in_flight(), cc.max_datagram_size());
@ -206,7 +207,7 @@ fn issue_1465() {
// send out recovery packet and get it acked to get out of recovery state
let p4 = send_next(&mut cc, now);
cc.on_packet_sent(&p4);
cc.on_packet_sent(&p4, now);
now += RTT;
cc.on_packets_acked(&[p4], &RTT_ESTIMATE, now);
@ -216,7 +217,7 @@ fn issue_1465() {
now += RTT;
let cur_cwnd = cc.cwnd();
cc.on_packets_lost(Some(now), None, PTO, &[p5]);
cc.on_packets_lost(Some(now), None, PTO, &[p5], now);
// go back into recovery
assert!(cc.recovery_packet());
@ -225,6 +226,6 @@ fn issue_1465() {
assert_eq!(cc.bytes_in_flight(), 2 * cc.max_datagram_size());
// this shouldn't introduce further cwnd reduction, but it did before https://github.com/mozilla/neqo/pull/1465
cc.on_packets_lost(Some(now), None, PTO, &[p6]);
cc.on_packets_lost(Some(now), None, PTO, &[p6], now);
assert_eq!(cc.cwnd(), cur_cwnd / 2);
}

Просмотреть файл

@ -921,7 +921,7 @@ impl Connection {
State::Init => {
// We have not even sent anything just close the connection without sending any
// error. This may happen when client_start fails.
self.set_state(State::Closed(error));
self.set_state(State::Closed(error), now);
}
State::WaitInitial => {
// We don't have any state yet, so don't bother with
@ -930,22 +930,25 @@ impl Connection {
self.state_signaling
.close(path, error.clone(), frame_type, msg);
}
self.set_state(State::Closed(error));
self.set_state(State::Closed(error), now);
}
_ => {
if let Some(path) = path.or_else(|| self.paths.primary()) {
self.state_signaling
.close(path, error.clone(), frame_type, msg);
if matches!(v, Error::KeysExhausted) {
self.set_state(State::Closed(error));
self.set_state(State::Closed(error), now);
} else {
self.set_state(State::Closing {
error,
timeout: self.get_closing_period_time(now),
});
self.set_state(
State::Closing {
error,
timeout: self.get_closing_period_time(now),
},
now,
);
}
} else {
self.set_state(State::Closed(error));
self.set_state(State::Closed(error), now);
}
}
}
@ -967,7 +970,7 @@ impl Connection {
State::Closing { error, timeout } | State::Draining { error, timeout } => {
if *timeout <= now {
let st = State::Closed(error.clone());
self.set_state(st);
self.set_state(st, now);
qinfo!("Closing timer expired");
return;
}
@ -982,7 +985,10 @@ impl Connection {
let pto = self.pto();
if self.idle_timeout.expired(now, pto) {
qinfo!([self], "idle timeout expired");
self.set_state(State::Closed(CloseReason::Transport(Error::IdleTimeout)));
self.set_state(
State::Closed(CloseReason::Transport(Error::IdleTimeout)),
now,
);
return;
}
@ -999,7 +1005,7 @@ impl Connection {
if let Some(path) = self.paths.primary() {
let lost = self.loss_recovery.timeout(&path, now);
self.handle_lost_packets(&lost);
qlog::packets_lost(&self.qlog, &lost);
qlog::packets_lost(&self.qlog, &lost, now);
}
if self.release_resumption_token_timer.is_some() {
@ -1264,10 +1270,13 @@ impl Connection {
// indicate that there is a stateless reset present.
qdebug!([self], "Stateless reset: {}", hex(&d[d.len() - 16..]));
self.state_signaling.reset();
self.set_state(State::Draining {
error: CloseReason::Transport(Error::StatelessReset),
timeout: self.get_closing_period_time(now),
});
self.set_state(
State::Draining {
error: CloseReason::Transport(Error::StatelessReset),
timeout: self.get_closing_period_time(now),
},
now,
);
Err(Error::StatelessReset)
} else {
Ok(())
@ -1339,14 +1348,16 @@ impl Connection {
self.conn_params.get_versions().all(),
supported,
version,
now,
);
Ok(())
} else {
qinfo!([self], "Version negotiation: failed with {:?}", supported);
// This error goes straight to closed.
self.set_state(State::Closed(CloseReason::Transport(
Error::VersionNegotiation,
)));
self.set_state(
State::Closed(CloseReason::Transport(Error::VersionNegotiation)),
now,
);
Err(Error::VersionNegotiation)
}
}
@ -1399,7 +1410,7 @@ impl Connection {
let dcid = ConnectionId::from(packet.dcid());
self.crypto.states.init_server(version, &dcid)?;
self.original_destination_cid = Some(dcid);
self.set_state(State::WaitInitial);
self.set_state(State::WaitInitial, now);
// We need to make sure that we set this transport parameter.
// This has to happen prior to processing the packet so that
@ -1606,7 +1617,7 @@ impl Connection {
neqo_common::write_item_to_fuzzing_corpus(target, &payload[..]);
}
qlog::packet_received(&self.qlog, &packet, &payload);
qlog::packet_received(&self.qlog, &packet, &payload, now);
let space = PacketNumberSpace::from(payload.packet_type());
if let Some(space) = self.acks.get_mut(space) {
if space.is_duplicate(payload.pn()) {
@ -1659,7 +1670,7 @@ impl Connection {
// the rest of the datagram on the floor, but don't generate an error.
self.check_stateless_reset(path, &d, dcid.is_none(), now)?;
self.stats.borrow_mut().pkt_dropped("Decryption failure");
qlog::packet_dropped(&self.qlog, &packet);
qlog::packet_dropped(&self.qlog, &packet, now);
}
}
slc = remainder;
@ -1759,6 +1770,7 @@ impl Connection {
.unwrap()
.clone(),
),
now,
);
if self.role == Role::Client {
path.borrow_mut().set_valid(now);
@ -1766,13 +1778,13 @@ impl Connection {
}
/// If the path isn't permanent, assign it a connection ID to make it so.
fn ensure_permanent(&mut self, path: &PathRef) -> Res<()> {
fn ensure_permanent(&mut self, path: &PathRef, now: Instant) -> Res<()> {
if self.paths.is_temporary(path) {
// If there isn't a connection ID to use for this path, the packet
// will be processed, but it won't be attributed to a path. That means
// no path probes or PATH_RESPONSE. But it's not fatal.
if let Some(cid) = self.connection_ids.next() {
self.paths.make_permanent(path, None, cid);
self.paths.make_permanent(path, None, cid, now);
Ok(())
} else if let Some(primary) = self.paths.primary() {
if primary
@ -1781,7 +1793,7 @@ impl Connection {
.map_or(true, |id| id.is_empty())
{
self.paths
.make_permanent(path, None, ConnectionIdEntry::empty_remote());
.make_permanent(path, None, ConnectionIdEntry::empty_remote(), now);
Ok(())
} else {
qtrace!([self], "Unable to make path permanent: {}", path.borrow());
@ -1808,7 +1820,7 @@ impl Connection {
self.setup_handshake_path(path, now);
} else {
// Otherwise try to get a usable connection ID.
mem::drop(self.ensure_permanent(path));
mem::drop(self.ensure_permanent(path, now));
}
}
}
@ -1843,9 +1855,9 @@ impl Connection {
self.stats.borrow().frame_rx.crypto > 0
};
if got_version {
self.set_state(State::Handshaking);
self.set_state(State::Handshaking, now);
} else {
self.set_state(State::WaitVersion);
self.set_state(State::WaitVersion, now);
}
}
@ -1905,7 +1917,7 @@ impl Connection {
self.conn_params.pacing_enabled(),
now,
);
self.ensure_permanent(&path)?;
self.ensure_permanent(&path, now)?;
qinfo!(
[self],
"Migrate to {} probe {}",
@ -1981,7 +1993,7 @@ impl Connection {
return;
}
if self.ensure_permanent(path).is_ok() {
if self.ensure_permanent(path, now).is_ok() {
self.paths
.handle_migration(path, d.source(), now, &mut self.stats.borrow_mut());
} else {
@ -2441,6 +2453,7 @@ impl Connection {
pn,
builder.len() - header_start + aead_expansion,
&builder.as_ref()[payload_start..],
now,
);
self.stats.borrow_mut().packets_tx += 1;
@ -2466,7 +2479,7 @@ impl Connection {
);
if padded {
needs_padding = false;
self.loss_recovery.on_packet_sent(path, sent);
self.loss_recovery.on_packet_sent(path, sent, now);
} else if pt == PacketType::Initial && (self.role == Role::Client || ack_eliciting) {
// Packets containing Initial packets might need padding, and we want to
// track that padding along with the Initial packet. So defer tracking.
@ -2476,7 +2489,7 @@ impl Connection {
if pt == PacketType::Handshake && self.role == Role::Client {
needs_padding = false;
}
self.loss_recovery.on_packet_sent(path, sent);
self.loss_recovery.on_packet_sent(path, sent, now);
}
if *space == PacketNumberSpace::Handshake
@ -2508,7 +2521,7 @@ impl Connection {
// packet, which is why we don't increase `frame_tx.padding` count here.
packets.resize(profile.limit(), 0);
}
self.loss_recovery.on_packet_sent(path, initial);
self.loss_recovery.on_packet_sent(path, initial, now);
}
path.borrow_mut().add_sent(packets.len());
Ok(SendOption::Yes(
@ -2542,12 +2555,16 @@ impl Connection {
qdebug!([self], "client_start");
debug_assert_eq!(self.role, Role::Client);
if let Some(path) = self.paths.primary() {
qlog::client_connection_started(&self.qlog, &path);
qlog::client_connection_started(&self.qlog, &path, now);
}
qlog::client_version_information_initiated(&self.qlog, self.conn_params.get_versions());
qlog::client_version_information_initiated(
&self.qlog,
self.conn_params.get_versions(),
now,
);
self.handshake(now, self.version, PacketNumberSpace::Initial, None)?;
self.set_state(State::WaitInitial);
self.set_state(State::WaitInitial, now);
self.zero_rtt_state = if self.crypto.enable_0rtt(self.version, self.role)? {
qdebug!([self], "Enabled 0-RTT");
ZeroRttState::Sending
@ -2568,9 +2585,9 @@ impl Connection {
let timeout = self.get_closing_period_time(now);
if let Some(path) = self.paths.primary() {
self.state_signaling.close(path, error.clone(), 0, msg);
self.set_state(State::Closing { error, timeout });
self.set_state(State::Closing { error, timeout }, now);
} else {
self.set_state(State::Closed(error));
self.set_state(State::Closed(error), now);
}
}
@ -2600,7 +2617,7 @@ impl Connection {
}
/// Process the final set of transport parameters.
fn process_tps(&mut self) -> Res<()> {
fn process_tps(&mut self, now: Instant) -> Res<()> {
self.validate_cids()?;
self.validate_versions()?;
{
@ -2645,7 +2662,7 @@ impl Connection {
self.cid_manager.set_limit(max_active_cids);
}
self.set_initial_limits();
qlog::connection_tparams_set(&self.qlog, &self.tps.borrow());
qlog::connection_tparams_set(&self.qlog, &self.tps.borrow(), now);
Ok(())
}
@ -2848,8 +2865,8 @@ impl Connection {
Ok(())
}
fn set_confirmed(&mut self) -> Res<()> {
self.set_state(State::Confirmed);
fn set_confirmed(&mut self, now: Instant) -> Res<()> {
self.set_state(State::Confirmed, now);
if self.conn_params.pmtud_enabled() {
self.paths
.primary()
@ -2970,7 +2987,7 @@ impl Connection {
self.stats.borrow_mut().frame_rx.path_challenge += 1;
// If we were challenged, try to make the path permanent.
// Report an error if we don't have enough connection IDs.
self.ensure_permanent(path)?;
self.ensure_permanent(path, now)?;
path.borrow_mut().challenged(data);
}
Frame::PathResponse { data } => {
@ -3012,17 +3029,20 @@ impl Connection {
let error = CloseReason::Transport(detail);
self.state_signaling
.drain(Rc::clone(path), error.clone(), frame_type, "");
self.set_state(State::Draining {
error,
timeout: self.get_closing_period_time(now),
});
self.set_state(
State::Draining {
error,
timeout: self.get_closing_period_time(now),
},
now,
);
}
Frame::HandshakeDone => {
self.stats.borrow_mut().frame_rx.handshake_done += 1;
if self.role == Role::Server || !self.state.connected() {
return Err(Error::ProtocolViolation);
}
self.set_confirmed()?;
self.set_confirmed(now)?;
self.discard_keys(PacketNumberSpace::Handshake, now);
self.migrate_to_preferred_address(now)?;
}
@ -3140,7 +3160,7 @@ impl Connection {
}
}
self.handle_lost_packets(&lost_packets);
qlog::packets_lost(&self.qlog, &lost_packets);
qlog::packets_lost(&self.qlog, &lost_packets, now);
let stats = &mut self.stats.borrow_mut().frame_rx;
stats.ack += 1;
if let Some(largest_acknowledged) = largest_acknowledged {
@ -3182,7 +3202,7 @@ impl Connection {
let path = self.paths.primary().ok_or(Error::NoAvailablePath)?;
path.borrow_mut().set_valid(now);
// Generate a qlog event that the server connection started.
qlog::server_connection_started(&self.qlog, &path);
qlog::server_connection_started(&self.qlog, &path, now);
} else {
self.zero_rtt_state = if self
.crypto
@ -3202,8 +3222,8 @@ impl Connection {
let pto = self.pto();
self.crypto
.install_application_keys(self.version, now + pto)?;
self.process_tps()?;
self.set_state(State::Connected);
self.process_tps(now)?;
self.set_state(State::Connected, now);
self.create_resumption_token(now);
self.saved_datagrams
.make_available(CryptoSpace::ApplicationData);
@ -3215,13 +3235,13 @@ impl Connection {
.resumed();
if self.role == Role::Server {
self.state_signaling.handshake_done();
self.set_confirmed()?;
self.set_confirmed(now)?;
}
qinfo!([self], "Connection established");
Ok(())
}
fn set_state(&mut self, state: State) {
fn set_state(&mut self, state: State, now: Instant) {
if state > self.state {
qdebug!([self], "State change from {:?} -> {:?}", self.state, state);
self.state = state.clone();
@ -3229,7 +3249,7 @@ impl Connection {
self.streams.clear_streams();
}
self.events.connection_state_change(state);
qlog::connection_state_updated(&self.qlog, &self.state);
qlog::connection_state_updated(&self.qlog, &self.state, now);
} else if mem::discriminant(&state) != mem::discriminant(&self.state) {
// Only tolerate a regression in state if the new state is closing
// and the connection is already closed.

Просмотреть файл

@ -168,6 +168,7 @@ impl Paths {
path: &PathRef,
local_cid: Option<ConnectionId>,
remote_cid: RemoteConnectionIdEntry,
now: Instant,
) {
debug_assert!(self.is_temporary(path));
@ -195,7 +196,7 @@ impl Paths {
path.borrow_mut().make_permanent(local_cid, remote_cid);
self.paths.push(Rc::clone(path));
if self.primary.is_none() {
assert!(self.select_primary(path).is_none());
assert!(self.select_primary(path, now).is_none());
}
}
@ -203,10 +204,10 @@ impl Paths {
/// Using the old path is only necessary if this change in path is a reaction
/// to a migration from a peer, in which case the old path needs to be probed.
#[must_use]
fn select_primary(&mut self, path: &PathRef) -> Option<PathRef> {
fn select_primary(&mut self, path: &PathRef, now: Instant) -> Option<PathRef> {
qdebug!([path.borrow()], "set as primary path");
let old_path = self.primary.replace(Rc::clone(path)).inspect(|old| {
old.borrow_mut().set_primary(false);
old.borrow_mut().set_primary(false, now);
});
// Swap the primary path into slot 0, so that it is protected from eviction.
@ -218,7 +219,7 @@ impl Paths {
.expect("migration target should be permanent");
self.paths.swap(0, idx);
path.borrow_mut().set_primary(true);
path.borrow_mut().set_primary(true, now);
old_path
}
@ -242,7 +243,7 @@ impl Paths {
path.borrow_mut().set_ecn_baseline(baseline);
if force || path.borrow().is_valid() {
path.borrow_mut().set_valid(now);
mem::drop(self.select_primary(path));
mem::drop(self.select_primary(path, now));
self.migration_target = None;
} else {
self.migration_target = Some(Rc::clone(path));
@ -285,7 +286,7 @@ impl Paths {
// Need a clone as `fallback` is borrowed from `self`.
let path = Rc::clone(fallback);
qinfo!([path.borrow()], "Failing over after primary path failed");
mem::drop(self.select_primary(&path));
mem::drop(self.select_primary(&path, now));
true
} else {
false
@ -328,7 +329,7 @@ impl Paths {
return;
}
if let Some(old_path) = self.select_primary(path) {
if let Some(old_path) = self.select_primary(path, now) {
// Need to probe the old path if the peer migrates.
old_path.borrow_mut().probe(stats);
// TODO(mt) - suppress probing if the path was valid within 3PTO.
@ -366,7 +367,7 @@ impl Paths {
.map_or(false, |target| Rc::ptr_eq(target, p))
{
let primary = self.migration_target.take();
mem::drop(self.select_primary(&primary.unwrap()));
mem::drop(self.select_primary(&primary.unwrap(), now));
return true;
}
break;
@ -637,12 +638,12 @@ impl Path {
}
/// Set whether this path is primary.
pub(crate) fn set_primary(&mut self, primary: bool) {
pub(crate) fn set_primary(&mut self, primary: bool, now: Instant) {
qtrace!([self], "Make primary {}", primary);
debug_assert!(self.remote_cid.is_some());
self.primary = primary;
if !primary {
self.sender.discard_in_flight();
self.sender.discard_in_flight(now);
}
}
@ -958,11 +959,11 @@ impl Path {
}
/// Record a packet as having been sent on this path.
pub fn packet_sent(&mut self, sent: &mut SentPacket) {
pub fn packet_sent(&mut self, sent: &mut SentPacket, now: Instant) {
if !self.is_primary() {
sent.clear_primary_path();
}
self.sender.on_packet_sent(sent, self.rtt.estimate());
self.sender.on_packet_sent(sent, self.rtt.estimate(), now);
}
/// Discard a packet that previously might have been in-flight.
@ -988,7 +989,7 @@ impl Path {
);
}
self.sender.discard(sent);
self.sender.discard(sent, now);
}
/// Record packets as acknowledged with the sender.
@ -1005,7 +1006,7 @@ impl Path {
if ecn_ce_received {
let cwnd_reduced = self
.sender
.on_ecn_ce_received(acked_pkts.first().expect("must be there"));
.on_ecn_ce_received(acked_pkts.first().expect("must be there"), now);
if cwnd_reduced {
self.rtt.update_ack_delay(self.sender.cwnd(), self.plpmtu());
}

Просмотреть файл

@ -8,7 +8,7 @@
use std::{
ops::{Deref, RangeInclusive},
time::Duration,
time::{Duration, Instant},
};
use neqo_common::{hex, qinfo, qlog::NeqoQlog, Decoder, IpTosEcn};
@ -33,8 +33,8 @@ use crate::{
version::{Version, VersionConfig, WireVersion},
};
pub fn connection_tparams_set(qlog: &NeqoQlog, tph: &TransportParametersHandler) {
qlog.add_event_data(|| {
pub fn connection_tparams_set(qlog: &NeqoQlog, tph: &TransportParametersHandler, now: Instant) {
qlog.add_event_data_with_instant(|| {
let remote = tph.remote();
#[allow(clippy::cast_possible_truncation)] // Nope.
let ev_data = EventData::TransportParametersSet(
@ -79,72 +79,86 @@ pub fn connection_tparams_set(qlog: &NeqoQlog, tph: &TransportParametersHandler)
});
Some(ev_data)
});
}, now);
}
pub fn server_connection_started(qlog: &NeqoQlog, path: &PathRef) {
connection_started(qlog, path);
pub fn server_connection_started(qlog: &NeqoQlog, path: &PathRef, now: Instant) {
connection_started(qlog, path, now);
}
pub fn client_connection_started(qlog: &NeqoQlog, path: &PathRef) {
connection_started(qlog, path);
pub fn client_connection_started(qlog: &NeqoQlog, path: &PathRef, now: Instant) {
connection_started(qlog, path, now);
}
fn connection_started(qlog: &NeqoQlog, path: &PathRef) {
qlog.add_event_data(|| {
let p = path.deref().borrow();
let ev_data = EventData::ConnectionStarted(ConnectionStarted {
ip_version: if p.local_address().ip().is_ipv4() {
Some("ipv4".into())
} else {
Some("ipv6".into())
},
src_ip: format!("{}", p.local_address().ip()),
dst_ip: format!("{}", p.remote_address().ip()),
protocol: Some("QUIC".into()),
src_port: p.local_address().port().into(),
dst_port: p.remote_address().port().into(),
src_cid: p.local_cid().map(ToString::to_string),
dst_cid: p.remote_cid().map(ToString::to_string),
});
fn connection_started(qlog: &NeqoQlog, path: &PathRef, now: Instant) {
qlog.add_event_data_with_instant(
|| {
let p = path.deref().borrow();
let ev_data = EventData::ConnectionStarted(ConnectionStarted {
ip_version: if p.local_address().ip().is_ipv4() {
Some("ipv4".into())
} else {
Some("ipv6".into())
},
src_ip: format!("{}", p.local_address().ip()),
dst_ip: format!("{}", p.remote_address().ip()),
protocol: Some("QUIC".into()),
src_port: p.local_address().port().into(),
dst_port: p.remote_address().port().into(),
src_cid: p.local_cid().map(ToString::to_string),
dst_cid: p.remote_cid().map(ToString::to_string),
});
Some(ev_data)
});
Some(ev_data)
},
now,
);
}
pub fn connection_state_updated(qlog: &NeqoQlog, new: &State) {
qlog.add_event_data(|| {
let ev_data = EventData::ConnectionStateUpdated(ConnectionStateUpdated {
old: None,
new: match new {
State::Init | State::WaitInitial => ConnectionState::Attempted,
State::WaitVersion | State::Handshaking => ConnectionState::HandshakeStarted,
State::Connected => ConnectionState::HandshakeCompleted,
State::Confirmed => ConnectionState::HandshakeConfirmed,
State::Closing { .. } => ConnectionState::Closing,
State::Draining { .. } => ConnectionState::Draining,
State::Closed { .. } => ConnectionState::Closed,
},
});
#[allow(clippy::similar_names)]
pub fn connection_state_updated(qlog: &NeqoQlog, new: &State, now: Instant) {
qlog.add_event_data_with_instant(
|| {
let ev_data = EventData::ConnectionStateUpdated(ConnectionStateUpdated {
old: None,
new: match new {
State::Init | State::WaitInitial => ConnectionState::Attempted,
State::WaitVersion | State::Handshaking => ConnectionState::HandshakeStarted,
State::Connected => ConnectionState::HandshakeCompleted,
State::Confirmed => ConnectionState::HandshakeConfirmed,
State::Closing { .. } => ConnectionState::Closing,
State::Draining { .. } => ConnectionState::Draining,
State::Closed { .. } => ConnectionState::Closed,
},
});
Some(ev_data)
});
Some(ev_data)
},
now,
);
}
pub fn client_version_information_initiated(qlog: &NeqoQlog, version_config: &VersionConfig) {
qlog.add_event_data(|| {
Some(EventData::VersionInformation(VersionInformation {
client_versions: Some(
version_config
.all()
.iter()
.map(|v| format!("{:02x}", v.wire_version()))
.collect(),
),
server_versions: None,
chosen_version: Some(format!("{:02x}", version_config.initial().wire_version())),
}))
});
pub fn client_version_information_initiated(
qlog: &NeqoQlog,
version_config: &VersionConfig,
now: Instant,
) {
qlog.add_event_data_with_instant(
|| {
Some(EventData::VersionInformation(VersionInformation {
client_versions: Some(
version_config
.all()
.iter()
.map(|v| format!("{:02x}", v.wire_version()))
.collect(),
),
server_versions: None,
chosen_version: Some(format!("{:02x}", version_config.initial().wire_version())),
}))
},
now,
);
}
pub fn client_version_information_negotiated(
@ -152,96 +166,119 @@ pub fn client_version_information_negotiated(
client: &[Version],
server: &[WireVersion],
chosen: Version,
now: Instant,
) {
qlog.add_event_data(|| {
Some(EventData::VersionInformation(VersionInformation {
client_versions: Some(
client
.iter()
.map(|v| format!("{:02x}", v.wire_version()))
.collect(),
),
server_versions: Some(server.iter().map(|v| format!("{v:02x}")).collect()),
chosen_version: Some(format!("{:02x}", chosen.wire_version())),
}))
});
qlog.add_event_data_with_instant(
|| {
Some(EventData::VersionInformation(VersionInformation {
client_versions: Some(
client
.iter()
.map(|v| format!("{:02x}", v.wire_version()))
.collect(),
),
server_versions: Some(server.iter().map(|v| format!("{v:02x}")).collect()),
chosen_version: Some(format!("{:02x}", chosen.wire_version())),
}))
},
now,
);
}
pub fn server_version_information_failed(qlog: &NeqoQlog, server: &[Version], client: WireVersion) {
qlog.add_event_data(|| {
Some(EventData::VersionInformation(VersionInformation {
client_versions: Some(vec![format!("{client:02x}")]),
server_versions: Some(
server
.iter()
.map(|v| format!("{:02x}", v.wire_version()))
.collect(),
),
chosen_version: None,
}))
});
pub fn server_version_information_failed(
qlog: &NeqoQlog,
server: &[Version],
client: WireVersion,
now: Instant,
) {
qlog.add_event_data_with_instant(
|| {
Some(EventData::VersionInformation(VersionInformation {
client_versions: Some(vec![format!("{client:02x}")]),
server_versions: Some(
server
.iter()
.map(|v| format!("{:02x}", v.wire_version()))
.collect(),
),
chosen_version: None,
}))
},
now,
);
}
pub fn packet_sent(qlog: &NeqoQlog, pt: PacketType, pn: PacketNumber, plen: usize, body: &[u8]) {
qlog.add_event_with_stream(|stream| {
let mut d = Decoder::from(body);
let header = PacketHeader::with_type(pt.into(), Some(pn), None, None, None);
let raw = RawInfo {
length: Some(plen as u64),
payload_length: None,
data: None,
};
pub fn packet_sent(
qlog: &NeqoQlog,
pt: PacketType,
pn: PacketNumber,
plen: usize,
body: &[u8],
now: Instant,
) {
qlog.add_event_data_with_instant(
|| {
let mut d = Decoder::from(body);
let header = PacketHeader::with_type(pt.into(), Some(pn), None, None, None);
let raw = RawInfo {
length: Some(plen as u64),
payload_length: None,
data: None,
};
let mut frames = SmallVec::new();
while d.remaining() > 0 {
if let Ok(f) = Frame::decode(&mut d) {
frames.push(QuicFrame::from(f));
} else {
qinfo!("qlog: invalid frame");
break;
let mut frames = SmallVec::new();
while d.remaining() > 0 {
if let Ok(f) = Frame::decode(&mut d) {
frames.push(QuicFrame::from(f));
} else {
qinfo!("qlog: invalid frame");
break;
}
}
}
let ev_data = EventData::PacketSent(PacketSent {
header,
frames: Some(frames),
is_coalesced: None,
retry_token: None,
stateless_reset_token: None,
supported_versions: None,
raw: Some(raw),
datagram_id: None,
send_at_time: None,
trigger: None,
});
stream.add_event_data_now(ev_data)
});
Some(EventData::PacketSent(PacketSent {
header,
frames: Some(frames),
is_coalesced: None,
retry_token: None,
stateless_reset_token: None,
supported_versions: None,
raw: Some(raw),
datagram_id: None,
send_at_time: None,
trigger: None,
}))
},
now,
);
}
pub fn packet_dropped(qlog: &NeqoQlog, public_packet: &PublicPacket) {
qlog.add_event_data(|| {
let header =
PacketHeader::with_type(public_packet.packet_type().into(), None, None, None, None);
let raw = RawInfo {
length: Some(public_packet.len() as u64),
payload_length: None,
data: None,
};
pub fn packet_dropped(qlog: &NeqoQlog, public_packet: &PublicPacket, now: Instant) {
qlog.add_event_data_with_instant(
|| {
let header =
PacketHeader::with_type(public_packet.packet_type().into(), None, None, None, None);
let raw = RawInfo {
length: Some(public_packet.len() as u64),
payload_length: None,
data: None,
};
let ev_data = EventData::PacketDropped(PacketDropped {
header: Some(header),
raw: Some(raw),
datagram_id: None,
details: None,
trigger: None,
});
let ev_data = EventData::PacketDropped(PacketDropped {
header: Some(header),
raw: Some(raw),
datagram_id: None,
details: None,
trigger: None,
});
Some(ev_data)
});
Some(ev_data)
},
now,
);
}
pub fn packets_lost(qlog: &NeqoQlog, pkts: &[SentPacket]) {
pub fn packets_lost(qlog: &NeqoQlog, pkts: &[SentPacket], now: Instant) {
qlog.add_event_with_stream(|stream| {
for pkt in pkts {
let header =
@ -253,54 +290,60 @@ pub fn packets_lost(qlog: &NeqoQlog, pkts: &[SentPacket]) {
frames: None,
});
stream.add_event_data_now(ev_data)?;
stream.add_event_data_with_instant(ev_data, now)?;
}
Ok(())
});
}
pub fn packet_received(qlog: &NeqoQlog, public_packet: &PublicPacket, payload: &DecryptedPacket) {
qlog.add_event_with_stream(|stream| {
let mut d = Decoder::from(&payload[..]);
pub fn packet_received(
qlog: &NeqoQlog,
public_packet: &PublicPacket,
payload: &DecryptedPacket,
now: Instant,
) {
qlog.add_event_data_with_instant(
|| {
let mut d = Decoder::from(&payload[..]);
let header = PacketHeader::with_type(
public_packet.packet_type().into(),
Some(payload.pn()),
None,
None,
None,
);
let raw = RawInfo {
length: Some(public_packet.len() as u64),
payload_length: None,
data: None,
};
let header = PacketHeader::with_type(
public_packet.packet_type().into(),
Some(payload.pn()),
None,
None,
None,
);
let raw = RawInfo {
length: Some(public_packet.len() as u64),
payload_length: None,
data: None,
};
let mut frames = Vec::new();
let mut frames = Vec::new();
while d.remaining() > 0 {
if let Ok(f) = Frame::decode(&mut d) {
frames.push(QuicFrame::from(f));
} else {
qinfo!("qlog: invalid frame");
break;
while d.remaining() > 0 {
if let Ok(f) = Frame::decode(&mut d) {
frames.push(QuicFrame::from(f));
} else {
qinfo!("qlog: invalid frame");
break;
}
}
}
let ev_data = EventData::PacketReceived(PacketReceived {
header,
frames: Some(frames),
is_coalesced: None,
retry_token: None,
stateless_reset_token: None,
supported_versions: None,
raw: Some(raw),
datagram_id: None,
trigger: None,
});
stream.add_event_data_now(ev_data)
});
Some(EventData::PacketReceived(PacketReceived {
header,
frames: Some(frames),
is_coalesced: None,
retry_token: None,
stateless_reset_token: None,
supported_versions: None,
raw: Some(raw),
datagram_id: None,
trigger: None,
}))
},
now,
);
}
#[allow(dead_code)]
@ -319,55 +362,60 @@ pub enum QlogMetric {
PacingRate(u64),
}
pub fn metrics_updated(qlog: &NeqoQlog, updated_metrics: &[QlogMetric]) {
pub fn metrics_updated(qlog: &NeqoQlog, updated_metrics: &[QlogMetric], now: Instant) {
debug_assert!(!updated_metrics.is_empty());
qlog.add_event_data(|| {
let mut min_rtt: Option<f32> = None;
let mut smoothed_rtt: Option<f32> = None;
let mut latest_rtt: Option<f32> = None;
let mut rtt_variance: Option<f32> = None;
let mut pto_count: Option<u16> = None;
let mut congestion_window: Option<u64> = None;
let mut bytes_in_flight: Option<u64> = None;
let mut ssthresh: Option<u64> = None;
let mut packets_in_flight: Option<u64> = None;
let mut pacing_rate: Option<u64> = None;
qlog.add_event_data_with_instant(
|| {
let mut min_rtt: Option<f32> = None;
let mut smoothed_rtt: Option<f32> = None;
let mut latest_rtt: Option<f32> = None;
let mut rtt_variance: Option<f32> = None;
let mut pto_count: Option<u16> = None;
let mut congestion_window: Option<u64> = None;
let mut bytes_in_flight: Option<u64> = None;
let mut ssthresh: Option<u64> = None;
let mut packets_in_flight: Option<u64> = None;
let mut pacing_rate: Option<u64> = None;
for metric in updated_metrics {
#[allow(clippy::cast_precision_loss)] // Nought to do here.
match metric {
QlogMetric::MinRtt(v) => min_rtt = Some(v.as_secs_f32() * 1000.0),
QlogMetric::SmoothedRtt(v) => smoothed_rtt = Some(v.as_secs_f32() * 1000.0),
QlogMetric::LatestRtt(v) => latest_rtt = Some(v.as_secs_f32() * 1000.0),
QlogMetric::RttVariance(v) => rtt_variance = Some(v.as_secs_f32() * 1000.0),
QlogMetric::PtoCount(v) => pto_count = Some(u16::try_from(*v).unwrap()),
QlogMetric::CongestionWindow(v) => {
congestion_window = Some(u64::try_from(*v).unwrap());
for metric in updated_metrics {
#[allow(clippy::cast_precision_loss)] // Nought to do here.
match metric {
QlogMetric::MinRtt(v) => min_rtt = Some(v.as_secs_f32() * 1000.0),
QlogMetric::SmoothedRtt(v) => smoothed_rtt = Some(v.as_secs_f32() * 1000.0),
QlogMetric::LatestRtt(v) => latest_rtt = Some(v.as_secs_f32() * 1000.0),
QlogMetric::RttVariance(v) => rtt_variance = Some(v.as_secs_f32() * 1000.0),
QlogMetric::PtoCount(v) => pto_count = Some(u16::try_from(*v).unwrap()),
QlogMetric::CongestionWindow(v) => {
congestion_window = Some(u64::try_from(*v).unwrap());
}
QlogMetric::BytesInFlight(v) => {
bytes_in_flight = Some(u64::try_from(*v).unwrap());
}
QlogMetric::SsThresh(v) => ssthresh = Some(u64::try_from(*v).unwrap()),
QlogMetric::PacketsInFlight(v) => packets_in_flight = Some(*v),
QlogMetric::PacingRate(v) => pacing_rate = Some(*v),
_ => (),
}
QlogMetric::BytesInFlight(v) => bytes_in_flight = Some(u64::try_from(*v).unwrap()),
QlogMetric::SsThresh(v) => ssthresh = Some(u64::try_from(*v).unwrap()),
QlogMetric::PacketsInFlight(v) => packets_in_flight = Some(*v),
QlogMetric::PacingRate(v) => pacing_rate = Some(*v),
_ => (),
}
}
let ev_data = EventData::MetricsUpdated(MetricsUpdated {
min_rtt,
smoothed_rtt,
latest_rtt,
rtt_variance,
pto_count,
congestion_window,
bytes_in_flight,
ssthresh,
packets_in_flight,
pacing_rate,
});
let ev_data = EventData::MetricsUpdated(MetricsUpdated {
min_rtt,
smoothed_rtt,
latest_rtt,
rtt_variance,
pto_count,
congestion_window,
bytes_in_flight,
ssthresh,
packets_in_flight,
pacing_rate,
});
Some(ev_data)
});
Some(ev_data)
},
now,
);
}
// Helper functions

Просмотреть файл

@ -527,11 +527,11 @@ impl LossRecovery {
dropped
}
pub fn on_packet_sent(&mut self, path: &PathRef, mut sent_packet: SentPacket) {
pub fn on_packet_sent(&mut self, path: &PathRef, mut sent_packet: SentPacket, now: Instant) {
let pn_space = PacketNumberSpace::from(sent_packet.packet_type());
qtrace!([self], "packet {}-{} sent", pn_space, sent_packet.pn());
if let Some(space) = self.spaces.get_mut(pn_space) {
path.borrow_mut().packet_sent(&mut sent_packet);
path.borrow_mut().packet_sent(&mut sent_packet, now);
space.on_packet_sent(sent_packet);
} else {
qwarn!(
@ -690,7 +690,7 @@ impl LossRecovery {
if let Some(pto) = self.pto_time(rtt, PacketNumberSpace::ApplicationData) {
if pto < now {
let probes = PacketNumberSpaceSet::from(&[PacketNumberSpace::ApplicationData]);
self.fire_pto(PacketNumberSpace::ApplicationData, probes);
self.fire_pto(PacketNumberSpace::ApplicationData, probes, now);
}
}
}
@ -804,7 +804,12 @@ impl LossRecovery {
}
}
fn fire_pto(&mut self, pn_space: PacketNumberSpace, allow_probes: PacketNumberSpaceSet) {
fn fire_pto(
&mut self,
pn_space: PacketNumberSpace,
allow_probes: PacketNumberSpaceSet,
now: Instant,
) {
if let Some(st) = &mut self.pto_state {
st.pto(pn_space, allow_probes);
} else {
@ -821,6 +826,7 @@ impl LossRecovery {
&[QlogMetric::PtoCount(
self.pto_state.as_ref().unwrap().count(),
)],
now,
);
}
@ -853,7 +859,7 @@ impl LossRecovery {
// pto_time to increase which might cause PTO for later packet number spaces to not fire.
if let Some(pn_space) = pto_space {
qtrace!([self], "PTO {}, probing {:?}", pn_space, allow_probes);
self.fire_pto(pn_space, allow_probes);
self.fire_pto(pn_space, allow_probes, now);
}
}
@ -986,8 +992,8 @@ mod tests {
.on_ack_received(&self.path, pn_space, acked_ranges, ack_ecn, ack_delay, now)
}
pub fn on_packet_sent(&mut self, sent_packet: SentPacket) {
self.lr.on_packet_sent(&self.path, sent_packet);
pub fn on_packet_sent(&mut self, sent_packet: SentPacket, now: Instant) {
self.lr.on_packet_sent(&self.path, sent_packet, now);
}
pub fn timeout(&mut self, now: Instant) -> Vec<SentPacket> {
@ -1026,7 +1032,7 @@ mod tests {
None,
ConnectionIdEntry::new(0, ConnectionId::from(&[1, 2, 3]), [0; 16]),
);
path.set_primary(true);
path.set_primary(true, now());
path.rtt_mut().set_initial(TEST_RTT);
Self {
lr: LossRecovery::new(StatsCell::default(), FAST_PTO_SCALE),
@ -1119,15 +1125,18 @@ mod tests {
fn pace(lr: &mut Fixture, count: u64) {
for pn in 0..count {
lr.on_packet_sent(SentPacket::new(
PacketType::Short,
pn,
IpTosEcn::default(),
pn_time(pn),
true,
Vec::new(),
ON_SENT_SIZE,
));
lr.on_packet_sent(
SentPacket::new(
PacketType::Short,
pn,
IpTosEcn::default(),
pn_time(pn),
true,
Vec::new(),
ON_SENT_SIZE,
),
Instant::now(),
);
}
}
@ -1269,24 +1278,30 @@ mod tests {
// So send two packets with 1/4 RTT between them. Acknowledge pn 1 after 1 RTT.
// pn 0 should then be marked lost because it is then outstanding for 5RTT/4
// the loss time for packets is 9RTT/8.
lr.on_packet_sent(SentPacket::new(
PacketType::Short,
0,
IpTosEcn::default(),
pn_time(0),
true,
Vec::new(),
ON_SENT_SIZE,
));
lr.on_packet_sent(SentPacket::new(
PacketType::Short,
1,
IpTosEcn::default(),
pn_time(0) + TEST_RTT / 4,
true,
Vec::new(),
ON_SENT_SIZE,
));
lr.on_packet_sent(
SentPacket::new(
PacketType::Short,
0,
IpTosEcn::default(),
pn_time(0),
true,
Vec::new(),
ON_SENT_SIZE,
),
Instant::now(),
);
lr.on_packet_sent(
SentPacket::new(
PacketType::Short,
1,
IpTosEcn::default(),
pn_time(0) + TEST_RTT / 4,
true,
Vec::new(),
ON_SENT_SIZE,
),
Instant::now(),
);
let (_, lost) = lr.on_ack_received(
PacketNumberSpace::ApplicationData,
vec![1..=1],
@ -1374,33 +1389,42 @@ mod tests {
#[test]
fn drop_spaces() {
let mut lr = Fixture::default();
lr.on_packet_sent(SentPacket::new(
PacketType::Initial,
0,
IpTosEcn::default(),
pn_time(0),
true,
Vec::new(),
ON_SENT_SIZE,
));
lr.on_packet_sent(SentPacket::new(
PacketType::Handshake,
0,
IpTosEcn::default(),
pn_time(1),
true,
Vec::new(),
ON_SENT_SIZE,
));
lr.on_packet_sent(SentPacket::new(
PacketType::Short,
0,
IpTosEcn::default(),
pn_time(2),
true,
Vec::new(),
ON_SENT_SIZE,
));
lr.on_packet_sent(
SentPacket::new(
PacketType::Initial,
0,
IpTosEcn::default(),
pn_time(0),
true,
Vec::new(),
ON_SENT_SIZE,
),
Instant::now(),
);
lr.on_packet_sent(
SentPacket::new(
PacketType::Handshake,
0,
IpTosEcn::default(),
pn_time(1),
true,
Vec::new(),
ON_SENT_SIZE,
),
Instant::now(),
);
lr.on_packet_sent(
SentPacket::new(
PacketType::Short,
0,
IpTosEcn::default(),
pn_time(2),
true,
Vec::new(),
ON_SENT_SIZE,
),
Instant::now(),
);
// Now put all spaces on the LR timer so we can see them.
for sp in &[
@ -1418,7 +1442,7 @@ mod tests {
ON_SENT_SIZE,
);
let pn_space = PacketNumberSpace::from(sent_pkt.packet_type());
lr.on_packet_sent(sent_pkt);
lr.on_packet_sent(sent_pkt, Instant::now());
lr.on_ack_received(
pn_space,
vec![1..=1],
@ -1444,30 +1468,36 @@ mod tests {
// There are cases where we send a packet that is not subsequently tracked.
// So check that this works.
lr.on_packet_sent(SentPacket::new(
PacketType::Initial,
0,
IpTosEcn::default(),
pn_time(3),
true,
Vec::new(),
ON_SENT_SIZE,
));
lr.on_packet_sent(
SentPacket::new(
PacketType::Initial,
0,
IpTosEcn::default(),
pn_time(3),
true,
Vec::new(),
ON_SENT_SIZE,
),
Instant::now(),
);
assert_sent_times(&lr, None, None, Some(pn_time(2)));
}
#[test]
fn rearm_pto_after_confirmed() {
let mut lr = Fixture::default();
lr.on_packet_sent(SentPacket::new(
PacketType::Initial,
0,
IpTosEcn::default(),
now(),
true,
Vec::new(),
ON_SENT_SIZE,
));
lr.on_packet_sent(
SentPacket::new(
PacketType::Initial,
0,
IpTosEcn::default(),
now(),
true,
Vec::new(),
ON_SENT_SIZE,
),
Instant::now(),
);
// Set the RTT to the initial value so that discarding doesn't
// alter the estimate.
let rtt = lr.path.borrow().rtt().estimate();
@ -1479,24 +1509,30 @@ mod tests {
now() + rtt,
);
lr.on_packet_sent(SentPacket::new(
PacketType::Handshake,
0,
IpTosEcn::default(),
now(),
true,
Vec::new(),
ON_SENT_SIZE,
));
lr.on_packet_sent(SentPacket::new(
PacketType::Short,
0,
IpTosEcn::default(),
now(),
true,
Vec::new(),
ON_SENT_SIZE,
));
lr.on_packet_sent(
SentPacket::new(
PacketType::Handshake,
0,
IpTosEcn::default(),
now(),
true,
Vec::new(),
ON_SENT_SIZE,
),
Instant::now(),
);
lr.on_packet_sent(
SentPacket::new(
PacketType::Short,
0,
IpTosEcn::default(),
now(),
true,
Vec::new(),
ON_SENT_SIZE,
),
Instant::now(),
);
assert!(lr.pto_time(PacketNumberSpace::ApplicationData).is_some());
lr.discard(PacketNumberSpace::Initial, pn_time(1));
@ -1526,15 +1562,18 @@ mod tests {
assert_eq!(path.amplification_limit(), SPARE);
}
lr.on_packet_sent(SentPacket::new(
PacketType::Initial,
0,
IpTosEcn::default(),
now(),
true,
Vec::new(),
ON_SENT_SIZE,
));
lr.on_packet_sent(
SentPacket::new(
PacketType::Initial,
0,
IpTosEcn::default(),
now(),
true,
Vec::new(),
ON_SENT_SIZE,
),
Instant::now(),
);
let handshake_pto = lr.path.borrow().rtt().pto(false);
let expected_pto = now() + handshake_pto;

Просмотреть файл

@ -154,6 +154,7 @@ impl RttEstimate {
QlogMetric::SmoothedRtt(self.smoothed_rtt),
QlogMetric::RttVariance(self.rttvar),
],
now,
);
}

Просмотреть файл

@ -128,6 +128,7 @@ impl PacketSender {
prev_largest_acked_sent,
pto,
lost_packets,
now,
);
// Call below may change the size of MTU probes, so it needs to happen after the CC
// reaction above, which needs to ignore probes based on their size.
@ -137,24 +138,24 @@ impl PacketSender {
}
/// Called when ECN CE mark received. Returns true if the congestion window was reduced.
pub fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket) -> bool {
self.cc.on_ecn_ce_received(largest_acked_pkt)
pub fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket, now: Instant) -> bool {
self.cc.on_ecn_ce_received(largest_acked_pkt, now)
}
pub fn discard(&mut self, pkt: &SentPacket) {
self.cc.discard(pkt);
pub fn discard(&mut self, pkt: &SentPacket, now: Instant) {
self.cc.discard(pkt, now);
}
/// When we migrate, the congestion controller for the previously active path drops
/// all bytes in flight.
pub fn discard_in_flight(&mut self) {
self.cc.discard_in_flight();
pub fn discard_in_flight(&mut self, now: Instant) {
self.cc.discard_in_flight(now);
}
pub fn on_packet_sent(&mut self, pkt: &SentPacket, rtt: Duration) {
pub fn on_packet_sent(&mut self, pkt: &SentPacket, rtt: Duration, now: Instant) {
self.pacer
.spend(pkt.time_sent(), rtt, self.cc.cwnd(), pkt.len());
self.cc.on_packet_sent(pkt);
self.cc.on_packet_sent(pkt, now);
}
#[must_use]

Просмотреть файл

@ -332,6 +332,7 @@ impl Server {
&self.create_qlog_trace(orig_dcid.unwrap_or(initial.dst_cid).as_cid_ref()),
self.conn_params.get_versions().all(),
initial.version.wire_version(),
now,
);
}
Output::None
@ -390,6 +391,7 @@ impl Server {
&self.create_qlog_trace(packet.dcid()),
self.conn_params.get_versions().all(),
packet.wire_version(),
now,
);
return Output::Datagram(Datagram::new(