Skip to content
71 changes: 71 additions & 0 deletions quinn-proto/src/address_discovery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//! Address discovery types from
//! <https://datatracker.ietf.org/doc/draft-seemann-quic-address-discovery/>

use crate::coding::BufExt;
use crate::{transport_parameters::Error, VarInt};

/// The role of each participant.
///
/// When enabled, this is reported as a transport parameter.
#[derive(PartialEq, Eq, Clone, Copy, Debug, Default)]
pub(crate) struct Role {
pub(crate) send_reports: bool,
pub(crate) receive_reports: bool,
}

impl TryFrom<VarInt> for Role {
type Error = Error;

fn try_from(value: VarInt) -> Result<Self, Self::Error> {
let mut role = Self::default();
match value.0 {
0 => role.send_reports = true,
1 => role.receive_reports = true,
2 => {
role.send_reports = true;
role.receive_reports = true;
}
_ => return Err(Error::IllegalValue),
}

Ok(role)
}
}

impl Role {
pub(crate) fn from_transport_parameter(
len: usize,
role: &Role,
r: &mut impl bytes::Buf,
) -> Result<Self, Error> {
if !role.is_disabled() {
// duplicate parameter
return Err(Error::Malformed);
}
let value: VarInt = r.get()?;
if len != value.size() {
return Err(Error::Malformed);
}

value.try_into()
}
/// Whether address discovery is disabled.
pub(crate) fn is_disabled(&self) -> bool {
!self.receive_reports && !self.send_reports
}

/// Whether this peer should report observed addresses to the other peer.
pub(crate) fn should_report(&self, other: &Self) -> bool {
self.send_reports && other.receive_reports
}

/// Gives the [`VarInt`] representing this [`Role`] as a transport parameter.
pub(crate) fn as_transport_parameter(&self) -> Option<VarInt> {
match (self.send_reports, self.receive_reports) {
(true, true) => Some(VarInt(2)),
(true, false) => Some(VarInt(0)),
(false, true) => Some(VarInt(1)),
(false, false) => None,
}
}
}
31 changes: 30 additions & 1 deletion quinn-proto/src/config/transport.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{fmt, sync::Arc};

use crate::{congestion, Duration, VarInt, VarIntBoundsExceeded, INITIAL_MTU, MAX_UDP_PAYLOAD};
use crate::{
address_discovery, congestion, Duration, VarInt, VarIntBoundsExceeded, INITIAL_MTU,
MAX_UDP_PAYLOAD,
};

/// Parameters governing the core QUIC state machine
///
Expand Down Expand Up @@ -43,6 +46,8 @@ pub struct TransportConfig {
pub(crate) congestion_controller_factory: Arc<dyn congestion::ControllerFactory + Send + Sync>,

pub(crate) enable_segmentation_offload: bool,

pub(crate) address_discovery_role: crate::address_discovery::Role,
}

impl TransportConfig {
Expand Down Expand Up @@ -314,6 +319,26 @@ impl TransportConfig {
self.enable_segmentation_offload = enabled;
self
}

/// Whether to send observed address reports to peers.
///
/// This will aid peers in inferring their reachable address, which in most NATd networks
/// will not be easily available to them.
pub fn send_observed_address_reports(&mut self, enabled: bool) -> &mut Self {
self.address_discovery_role.send_reports = enabled;
self
}

/// Whether to receive observed address reports from other peers.
///
/// Peers with the address discovery extension enabled that are willing to provide observed
/// address reports will do so if this transport parameter is set. In general, observed address
/// reports cannot be trusted. This, however, can aid the current endpoint in inferring its
/// reachable address, which in most NATd networks will not be easily available.
pub fn receive_observed_address_reports(&mut self, enabled: bool) -> &mut Self {
self.address_discovery_role.receive_reports = enabled;
self
}
}

impl Default for TransportConfig {
Expand Down Expand Up @@ -354,6 +379,8 @@ impl Default for TransportConfig {
congestion_controller_factory: Arc::new(congestion::CubicConfig::default()),

enable_segmentation_offload: true,

address_discovery_role: address_discovery::Role::default(),
}
}
}
Expand Down Expand Up @@ -385,6 +412,7 @@ impl fmt::Debug for TransportConfig {
deterministic_packet_numbers: _,
congestion_controller_factory: _,
enable_segmentation_offload,
address_discovery_role,
} = self;
fmt.debug_struct("TransportConfig")
.field("max_concurrent_bidi_streams", max_concurrent_bidi_streams)
Expand Down Expand Up @@ -412,6 +440,7 @@ impl fmt::Debug for TransportConfig {
.field("datagram_send_buffer_size", datagram_send_buffer_size)
// congestion_controller_factory not debug
.field("enable_segmentation_offload", enable_segmentation_offload)
.field("address_discovery_role", address_discovery_role)
.finish_non_exhaustive()
}
}
Expand Down
126 changes: 121 additions & 5 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
coding::BufMutExt,
config::{ServerConfig, TransportConfig},
crypto::{self, KeyPair, Keys, PacketKey},
frame::{self, Close, Datagram, FrameStruct},
frame::{self, Close, Datagram, FrameStruct, ObservedAddr},
packet::{
FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
PacketNumber, PartialDecode, SpaceId,
Expand Down Expand Up @@ -222,6 +222,12 @@ pub struct Connection {
/// no outgoing application data.
app_limited: bool,

//
// ObservedAddr
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably Address discovery would be a better section divider comment

//
/// Sequence number for the next observed address frame sent to the peer.
next_observed_addr_seq_no: VarInt,

streams: StreamsState,
/// Surplus remote CIDs for future use on new paths
rem_cids: CidQueue,
Expand Down Expand Up @@ -336,6 +342,8 @@ impl Connection {
receiving_ecn: false,
total_authed_packets: 0,

next_observed_addr_seq_no: 0u32.into(),

streams: StreamsState::new(
side,
config.max_concurrent_uni_streams,
Expand Down Expand Up @@ -2633,6 +2641,9 @@ impl Connection {
let mut close = None;
let payload_len = payload.len();
let mut ack_eliciting = false;
// if this packet triggers a path migration and includes a observed address frame, it's
// stored here
let mut migration_observed_addr = None;
for result in frame::Iter::new(payload)? {
let frame = result?;
let span = match frame {
Expand Down Expand Up @@ -2676,7 +2687,8 @@ impl Connection {
Frame::Padding
| Frame::PathChallenge(_)
| Frame::PathResponse(_)
| Frame::NewConnectionId(_) => {}
| Frame::NewConnectionId(_)
| Frame::ObservedAddr(_) => {}
_ => {
is_probing_packet = false;
}
Expand Down Expand Up @@ -2904,6 +2916,33 @@ impl Connection {
self.discard_space(now, SpaceId::Handshake);
}
}
Frame::ObservedAddr(observed) => {
// check if params allows the peer to send report and this node to receive it
if !self
.peer_params
.address_discovery_role
.should_report(&self.config.address_discovery_role)
{
return Err(TransportError::PROTOCOL_VIOLATION(
"received OBSERVED_ADDRESS frame when not negotiated",
));
Comment on lines +2925 to +2928
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we yield this error from within should_report() instead? (Maybe also pass in packet.header.space()?)

}
// must only be sent in data space
if packet.header.space() != SpaceId::Data {
return Err(TransportError::PROTOCOL_VIOLATION(
"OBSERVED_ADDRESS frame outside data space",
));
}

if remote == self.path.remote {
if let Some(updated) = self.path.update_observed_addr_report(observed) {
self.events.push_back(Event::ObservedAddr(updated));
}
} else {
// include in migration
migration_observed_addr = Some(observed)
Comment on lines +2942 to +2943
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a special case?

}
}
}
}

Expand Down Expand Up @@ -2940,7 +2979,7 @@ impl Connection {
server_config.migration,
"migration-initiating packets should have been dropped immediately"
);
self.migrate(now, remote);
self.migrate(now, remote, migration_observed_addr);
// Break linkability, if possible
self.update_rem_cid();
self.spin = false;
Expand All @@ -2949,7 +2988,7 @@ impl Connection {
Ok(())
}

fn migrate(&mut self, now: Instant, remote: SocketAddr) {
fn migrate(&mut self, now: Instant, remote: SocketAddr, observed_addr: Option<ObservedAddr>) {
trace!(%remote, "migration initiated");
// Reset rtt/congestion state for new path unless it looks like a NAT rebinding.
// Note that the congestion window will not grow until validation terminates. Helps mitigate
Expand All @@ -2969,6 +3008,12 @@ impl Connection {
&self.config,
)
};
new_path.last_observed_addr_report = self.path.last_observed_addr_report;
if let Some(report) = observed_addr {
if let Some(updated) = new_path.update_observed_addr_report(report) {
self.events.push_back(Event::ObservedAddr(updated));
}
}
new_path.challenge = Some(self.rng.gen());
new_path.challenge_pending = true;
let prev_pto = self.pto(SpaceId::Data);
Expand Down Expand Up @@ -3053,6 +3098,53 @@ impl Connection {
self.stats.frame_tx.handshake_done.saturating_add(1);
}

// OBSERVED_ADDR
let mut send_observed_address =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a helper method on Connection? Large inline lambdas aren't great for readability.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the only reason for this to be a lambda is because of the short-circuit. I think this can be refactored from:

let do_thing = |a, b, c, d, e| {
    if (!should_do_thing) {
        return;
    }
    do_stuff....
}
do_thing(a, b, c, d, e);

To just:

if (should_do_thing) {
    do_stuff...
}

It would be equally indented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have loved this, and that's the first thing I tried. But since the self.space is used thought the code rust is not able to tell that the new function modifies disjoint fields so it believes we are trying to get two mutable references to self at the same time

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name should reflect that it might not do anything.

Suggested change
let mut send_observed_address =
let mut maybe_send_observed_address =

|space_id: SpaceId,
buf: &mut Vec<u8>,
max_size: usize,
space: &mut PacketSpace,
sent: &mut SentFrames,
stats: &mut ConnectionStats,
skip_sent_check: bool| {
// should only be sent within Data space and only if allowed by extension
// negotiation
// send is also skipped if the path has already sent an observed address
let send_allowed = self
.config
.address_discovery_role
.should_report(&self.peer_params.address_discovery_role);
let send_required =
space.pending.observed_addr || !self.path.observed_addr_sent || skip_sent_check;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than having a bool to skip these checks in all but one case, lift the checks out to that one case.

if space_id != SpaceId::Data || !send_allowed || !send_required {
return;
}

let observed =
frame::ObservedAddr::new(self.path.remote, self.next_observed_addr_seq_no);

if buf.len() + observed.size() < max_size {
observed.write(buf);

self.next_observed_addr_seq_no =
self.next_observed_addr_seq_no.saturating_add(1u8);
self.path.observed_addr_sent = true;

stats.frame_tx.observed_addr += 1;
sent.retransmits.get_or_create().observed_addr = true;
space.pending.observed_addr = false;
Comment on lines +3127 to +3135
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somewhere in this block it should log that this frame is being written into the packet, to be consistent with the rest of the logging:

trace!(?observed, "OBSERVED_ADDRESS");

}
};
send_observed_address(
space_id,
buf,
max_size,
space,
&mut sent,
&mut self.stats,
false,
);

// PING
if mem::replace(&mut space.ping_pending, false) {
trace!("PING");
Expand Down Expand Up @@ -3122,7 +3214,16 @@ impl Connection {
trace!("PATH_CHALLENGE {:08x}", token);
buf.write(frame::FrameType::PATH_CHALLENGE);
buf.write(token);
self.stats.frame_tx.path_challenge += 1;

send_observed_address(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we know this doesn't result in redundant frames being included in a single packet? Would it be easier to make the decision to include this frame in exactly one place?

space_id,
buf,
max_size,
space,
&mut sent,
&mut self.stats,
true,
);
}
}

Expand All @@ -3135,6 +3236,19 @@ impl Connection {
buf.write(frame::FrameType::PATH_RESPONSE);
buf.write(token);
self.stats.frame_tx.path_response += 1;

// NOTE: this is technically not required but might be useful to ride the
// request/response nature of path challenges to refresh an observation
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is refreshing an observation useful, given that we don't believe the path has changed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reason why we added this is that we need to refresh the observation when the path (our end, their end) changes. We can identify that their end changed but I don't think we can identify 100% of the cases when there is a local change. Since the frame is cheap this seemed useful to keep information as up to date as possible

// Since PATH_RESPONSE is a probing frame, this is allowed by the spec.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it matter whether PATH_RESPONSE is a probing frame?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be mistaken since I need to refresh my memory about the spec, but: the reason was to ensure this was sent on probing packets only

send_observed_address(
space_id,
buf,
max_size,
space,
&mut sent,
&mut self.stats,
true,
);
}
}

Expand Down Expand Up @@ -3838,6 +3952,8 @@ pub enum Event {
DatagramReceived,
/// One or more application datagrams have been sent after blocking
DatagramsUnblocked,
/// Received an observation of our external address from the peer.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explicitly document whether this is necessarily different from the previous one, and the required configuration and peer support.

ObservedAddr(SocketAddr),
}

fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
Expand Down
Loading
Loading