
Learning Pingora 04 - Establish L4 Connection


Now that we have the Peer we want to connect to, the next step is to establish the connection itself. Pingora currently implements three protocols, and we will cover them in order. In this post we start with the lowest layer of the three: the TCP (L4) connection.


Directory structure

In Pingora, protocol-level code is defined under pingora-core::protocols, while the connection logic for each protocol lives under pingora-core::connectors. For L4 connections, the files we will mainly study are:

  * pingora-core/src/connectors/l4.rs
  * pingora-core/src/protocols/l4/ext.rs
  * pingora-core/src/protocols/l4/stream.rs
  * pingora-core/src/protocols/raw_connect.rs

The connect method in l4 takes two parameters, a Peer and an optional bind_to SocketAddr, and eventually returns an L4 Stream.

connect supports both Inet and Unix domain sockets. We will start from the Inet path to walk through the basic flow of establishing a connection; the Unix domain socket implementation is much the same and, on the whole, simpler than the Inet one, so we won't go through it separately.
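Before digging into the internals, here is a hedged sketch of what calling this API looks like from the outside. BasicPeer comes from pingora-core::upstreams::peer; the address is made up, and the exact paths and constructors may differ between versions:

use pingora_core::connectors::l4::connect;
use pingora_core::upstreams::peer::BasicPeer;

// Minimal sketch: connect to an assumed upstream without binding a source address.
async fn connect_example() {
    let peer = BasicPeer::new("127.0.0.1:8080");
    match connect(&peer, None).await {
        Ok(stream) => println!("connected at {:?}", stream.established_ts),
        Err(e) => eprintln!("connect failed: {}", e),
    }
}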

The connection code is shown below:

pingora-core/src/connectors/l4.rs
/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
where
    P: Peer + Send + Sync,
{
    if peer.get_proxy().is_some() {
        return proxy_connect(peer)
            .await
            .err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
    }
    let peer_addr = peer.address();
    let mut stream: Stream = match peer_addr {
        SocketAddr::Inet(addr) => {
            let connect_future = tcp_connect(addr, bind_to.as_ref());
            let conn_res = match peer.connection_timeout() {
                Some(t) => pingora_timeout::timeout(t, connect_future)
                    .await
                    .explain_err(ConnectTimedout, |_| {
                        format!("timeout {t:?} connecting to server {peer}")
                    })?,
                None => connect_future.await,
            };
            match conn_res {
                Ok(socket) => {
                    debug!("connected to new server: {}", peer.address());
                    if let Some(ka) = peer.tcp_keepalive() {
                        debug!("Setting tcp keepalive");
                        set_tcp_keepalive(&socket, ka)?;
                    }
                    Ok(socket.into())
                }
                Err(e) => {
                    let c = format!("Fail to connect to {peer}");
                    match e.etype() {
                        SocketError | BindError => Error::e_because(InternalError, c, e),
                        _ => Err(e.more_context(c)),
                    }
                }
            }
        }
        SocketAddr::Unix(addr) => {
            let connect_future = connect_uds(
                addr.as_pathname()
                    .expect("non-pathname unix sockets not supported as peer"),
            );
            let conn_res = match peer.connection_timeout() {
                Some(t) => pingora_timeout::timeout(t, connect_future)
                    .await
                    .explain_err(ConnectTimedout, |_| {
                        format!("timeout {t:?} connecting to server {peer}")
                    })?,
                None => connect_future.await,
            };
            match conn_res {
                Ok(socket) => {
                    debug!("connected to new server: {}", peer.address());
                    // no SO_KEEPALIVE for UDS
                    Ok(socket.into())
                }
                Err(e) => {
                    let c = format!("Fail to connect to {peer}");
                    match e.etype() {
                        SocketError | BindError => Error::e_because(InternalError, c, e),
                        _ => Err(e.more_context(c)),
                    }
                }
            }
        }
    }?;
    let tracer = peer.get_tracer();
    if let Some(t) = tracer {
        t.0.on_connected();
        stream.tracer = Some(t);
    }
    stream.set_nodelay()?;
    let digest = SocketDigest::from_raw_fd(stream.as_raw_fd());
    digest
        .peer_addr
        .set(Some(peer_addr.clone()))
        .expect("newly created OnceCell must be empty");
    stream.set_socket_digest(digest);
    Ok(stream)
}

Following the flow of the code above, the logic for establishing an l4 connection can be summarized as follows:

  1. First, the connection itself: tcp_connect, i.e. l4::ext::connect, is called to establish the connection.
  2. Next, tcp_keepalive: if the Peer asks for this option, it is configured via l4::ext::set_tcp_keepalive (a sketch follows this list).
  3. Finally, set_nodelay: since Pingora implements write buffering in user space on top of Tokio, Nagle's algorithm has to be disabled here.
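l4::ext::set_tcp_keepalive itself is not shown in this post. As a rough idea of what such a helper does, here is a minimal sketch using the socket2 crate; the helper name and the use of socket2 are illustrative assumptions, not Pingora's actual implementation (which talks to setsockopt directly):

use std::time::Duration;
use socket2::{SockRef, TcpKeepalive};
use tokio::net::TcpStream;

// Hypothetical helper: enable TCP keepalive probes on an established stream.
fn set_tcp_keepalive_sketch(stream: &TcpStream) -> std::io::Result<()> {
    let ka = TcpKeepalive::new()
        .with_time(Duration::from_secs(60))      // idle time before the first probe
        .with_interval(Duration::from_secs(10)); // interval between probes
    SockRef::from(stream).set_tcp_keepalive(&ka)
}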

tcp_connect

The connection process in l4::ext::connect is basically the same as connecting in Tokio; the difference is the added bind_to handling.

pingora-core/src/protocols/l4/ext.rs
/*
 * this extension is needed until the following are addressed
 * https://github.com/tokio-rs/tokio/issues/1543
 * https://github.com/tokio-rs/mio/issues/1257
 * https://github.com/tokio-rs/mio/issues/1211
 */
/// connect() to the given address while optionally bind to the specific source address
///
/// `IP_BIND_ADDRESS_NO_PORT` is used.
pub async fn connect(addr: &SocketAddr, bind_to: Option<&SocketAddr>) -> Result<TcpStream> {
    let socket = if addr.is_ipv4() {
        TcpSocket::new_v4()
    } else {
        TcpSocket::new_v6()
    }
    .or_err(SocketError, "failed to create socket")?;
    if cfg!(target_os = "linux") {
        ip_bind_addr_no_port(socket.as_raw_fd(), true)
            .or_err(SocketError, "failed to set socket opts")?;
        if let Some(baddr) = bind_to {
            socket
                .bind(*baddr)
                .or_err_with(BindError, || format!("failed to bind to socket {}", *baddr))?;
        };
    }
    // TODO: add support for bind on other platforms
    socket
        .connect(*addr)
        .await
        .map_err(|e| wrap_os_connect_error(e, format!("Fail to connect to {}", *addr)))
}

First, at [1], Pingora calls ip_bind_addr_no_port, asking the operating system not to assign a local port automatically.

pingora-core/src/protocols/l4/ext.rs
#[cfg(target_os = "linux")]
fn ip_bind_addr_no_port(fd: RawFd, val: bool) -> io::Result<()> {
    const IP_BIND_ADDRESS_NO_PORT: i32 = 24;
    set_opt(fd, libc::IPPROTO_IP, IP_BIND_ADDRESS_NO_PORT, val as c_int)
}

#[cfg(not(target_os = "linux"))]
fn ip_bind_addr_no_port(_fd: RawFd, _val: bool) -> io::Result<()> {
    Ok(())
}

Pingora currently implements ip_bind_addr_no_port properly only on Linux; on other systems there is only a dummy implementation. On Linux, Pingora sets the IP_BIND_ADDRESS_NO_PORT option via setsockopt.

Having made sure the OS will not assign a port eagerly, Pingora then, at [2], manually binds to the SocketAddr requested by bind_to. (IP_BIND_ADDRESS_NO_PORT defers local port selection from bind() to connect() time, so binding a source IP with port 0 does not pin an ephemeral port early and exhaust the port range.)
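As an illustration, here is a hedged sketch of how a caller might pin the source address with the connect above; the addresses are examples and the Result alias is the one used in ext.rs:

use std::net::SocketAddr;
use tokio::net::TcpStream;

// Pin the source IP; port 0 is fine because IP_BIND_ADDRESS_NO_PORT
// defers the actual port selection to connect() time.
async fn connect_from(peer: SocketAddr) -> Result<TcpStream> {
    let src: SocketAddr = "192.0.2.10:0".parse().unwrap();
    connect(&peer, Some(&src)).await
}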

Stream

Once l4::ext::connect succeeds, what we get back is a tokio::net::tcp::TcpStream, but connect has to return a Stream. Converting the TcpStream into a Stream is defined in protocols::l4::stream.

The l4::Stream that Pingora defines is a BufStream with both read and write buffering, where the write buffer can optionally be disabled. See the code below:

pingora-core/src/protocols/l4/stream.rs
/// A concrete type for transport layer connection + extra fields for logging
#[derive(Debug)]
pub struct Stream {
    stream: BufStream<RawStream>,
    buffer_write: bool,
    proxy_digest: Option<Arc<ProxyDigest>>,
    socket_digest: Option<Arc<SocketDigest>>,
    /// When this connection is established
    pub established_ts: SystemTime,
    /// The distributed tracing object for this stream
    pub tracer: Option<Tracer>,
}

impl Stream {
    /// set TCP nodelay for this connection if `self` is TCP
    pub fn set_nodelay(&mut self) -> Result<()> {
        if let RawStream::Tcp(s) = &self.stream.get_ref() {
            s.set_nodelay(true)
                .or_err(ConnectError, "failed to set_nodelay")?;
        }
        Ok(())
    }
}

TcpStream -> Stream

Pingora implements From, so TcpStream and UnixStream can be converted into Stream via into():

pingora-core/src/protocols/l4/stream.rs
// Large read buffering helps reducing syscalls with little trade-off
// Ssl layer always does "small" reads in 16k (TLS record size) so L4 read buffer helps a lot.
const BUF_READ_SIZE: usize = 64 * 1024;
// Small write buf to match MSS. Too large write buf delays real time communication.
// This buffering effectively implements something similar to Nagle's algorithm.
// The benefit is that user space can control when to flush, where Nagle's can't be controlled.
// And userspace buffering reduce both syscalls and small packets.
const BUF_WRITE_SIZE: usize = 1460;

// NOTE: with writer buffering, users need to call flush() to make sure the data is actually
// sent. Otherwise data could be stuck in the buffer forever or get lost when stream is closed.

// ... (the Stream struct and set_nodelay impl shown above are omitted here) ...

impl From<TcpStream> for Stream {
    fn from(s: TcpStream) -> Self {
        Stream {
            stream: BufStream::with_capacity(BUF_READ_SIZE, BUF_WRITE_SIZE, RawStream::Tcp(s)),
            buffer_write: true,
            established_ts: SystemTime::now(),
            proxy_digest: None,
            socket_digest: None,
            tracer: None,
        }
    }
}

impl From<UnixStream> for Stream {
    fn from(s: UnixStream) -> Self {
        Stream {
            stream: BufStream::with_capacity(BUF_READ_SIZE, BUF_WRITE_SIZE, RawStream::Unix(s)),
            buffer_write: true,
            established_ts: SystemTime::now(),
            proxy_digest: None,
            socket_digest: None,
            tracer: None,
        }
    }
}

The key points here are the values chosen for the two constants:

  1. BUF_READ_SIZE: 64 KiB. Since the TLS layer always does "small" reads of 16 KiB (the TLS record size), a 64 KiB read buffer aligned to that size cuts down the number of syscalls.
  2. BUF_WRITE_SIZE: 1460 bytes, matching the MSS. Together with Tokio's user-space write buffering, this replaces the kernel's Nagle's algorithm (see the sketch after this list).
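Because buffer_write defaults to true, a writer has to flush explicitly, exactly as the NOTE in the source warns. A minimal sketch; the request bytes are made up:

use tokio::io::AsyncWriteExt;

// Stream buffers writes in user space, so nothing is guaranteed to
// reach the socket until flush() is called.
async fn send_request(mut stream: Stream) -> std::io::Result<()> {
    stream.write_all(b"GET / HTTP/1.1\r\nhost: example.com\r\n\r\n").await?;
    stream.flush().await?; // without this, bytes can sit in the buffer forever
    Ok(())
}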

Stream::drop

Stream's implementation of the Drop trait is also quite interesting. Let's take a look:

pingora-core/src/protocols/l4/stream.rs
impl Drop for Stream {
    fn drop(&mut self) {
        if let Some(t) = self.tracer.as_ref() {
            t.0.on_disconnected();
        }
        /* use nodelay/local_addr function to detect socket status */
        let ret = match &self.stream.get_ref() {
            RawStream::Tcp(s) => s.nodelay().err(),
            RawStream::Unix(s) => s.local_addr().err(),
        };
        if let Some(e) = ret {
            match e.kind() {
                tokio::io::ErrorKind::Other => {
                    if let Some(ecode) = e.raw_os_error() {
                        if ecode == 9 {
                            // Or we could panic here
                            error!("Crit: socket {:?} is being double closed", self.stream);
                        }
                    }
                }
                _ => {
                    debug!("Socket is already broken {:?}", e);
                }
            }
        } else {
            // try flush the write buffer. We use now_or_never() because
            // 1. Drop cannot be async
            // 2. write should usually be ready, unless the buf is full.
            let _ = self.flush().now_or_never();
        }
        debug!("Dropping socket {:?}", self.stream);
    }
}

[1]Pingora 分别通过 nodelaylocal_addr 尝试获取 Stream 的状态。而在 [2],当 Stream 仍然可写时,则调用了自身的 flush(),并且通过 now_or_never() 尝试立即将缓存清空。

Stream::poll_write

Another interesting point is how Pingora bypasses the write buffer:

pingora-core/src/protocols/l4/stream.rs
impl AsyncWrite for Stream {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        if self.buffer_write {
            Pin::new(&mut self.stream).poll_write(cx, buf)
        } else {
            Pin::new(&mut self.stream.get_mut()).poll_write(cx, buf)
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Pin::new(&mut self.stream).poll_flush(cx)
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Pin::new(&mut self.stream).poll_shutdown(cx)
    }

    fn poll_write_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        if self.buffer_write {
            Pin::new(&mut self.stream).poll_write_vectored(cx, bufs)
        } else {
            Pin::new(&mut self.stream.get_mut()).poll_write_vectored(cx, bufs)
        }
    }

    fn is_write_vectored(&self) -> bool {
        if self.buffer_write {
            self.stream.is_write_vectored() // it is true
        } else {
            self.stream.get_ref().is_write_vectored()
        }
    }
}

When buffer_write is enabled, poll_write simply goes through self.stream. When buffering is not wanted, get_mut() grabs the underlying I/O object directly, and writes to the underlying stream naturally skip BufStream's buffer.
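The trick is generic tokio rather than anything Pingora-specific: BufStream (like BufWriter) exposes the wrapped I/O object through get_mut(), and writes made directly to it skip the buffer. A hedged sketch, which also shows the ordering hazard if the two paths are ever mixed (in Pingora, buffer_write selects one path or the other):

use tokio::io::{AsyncWriteExt, BufStream};
use tokio::net::TcpStream;

async fn demo(tcp: TcpStream) -> std::io::Result<()> {
    let mut buffered = BufStream::new(tcp);
    buffered.write_all(b"goes into the buffer").await?;
    // Bypass: write straight to the underlying TcpStream.
    buffered.get_mut().write_all(b"hits the socket immediately").await?;
    // Only now does the first write reach the socket -- after the second!
    buffered.flush().await?;
    Ok(())
}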

L4 proxying over an HTTP tunnel

In the walkthrough of l4::connect above, we skipped over the proxy_connect part. The proxy branch of the code is shown below:

pingora-core/src/connectors/l4.rs
/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
where
    P: Peer + Send + Sync,
{
    if peer.get_proxy().is_some() {
        return proxy_connect(peer)
            .await
            .err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
    }
    let peer_addr = peer.address();
    let mut stream: Stream = match peer_addr {
        // ...

In Pingora, L4 proxying is implemented via an HTTP tunnel. In short, the proxy server exposes an HTTP/1.1 HTTP proxy over a Unix domain socket, and the proxy client (that is, Pingora) issues a CONNECT request to reach the given remote ip:port. The full code follows the illustrative exchange below.
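On the wire, the handshake over the Unix domain socket is a plain HTTP/1.1 CONNECT exchange. The trace below is illustrative only, with a made-up upstream address:

CONNECT 203.0.113.7:443 HTTP/1.1
host: 203.0.113.7:443

HTTP/1.1 200 OK

After the 2xx response, the same byte stream stops being HTTP and simply carries raw TCP bytes to 203.0.113.7:443.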

pingora-core/src/connectors/l4.rs
async fn proxy_connect<P: Peer>(peer: &P) -> Result<Stream> {
    // safe to unwrap
    let proxy = peer.get_proxy().unwrap();
    let options = peer.get_peer_options().unwrap();
    // combine required and optional headers
    let mut headers = proxy
        .headers
        .iter()
        .chain(options.extra_proxy_headers.iter());
    // not likely to timeout during connect() to UDS
    let stream: Box<Stream> = Box::new(
        connect_uds(&proxy.next_hop)
            .await
            .or_err_with(ConnectError, || {
                format!("CONNECT proxy connect() error to {:?}", &proxy.next_hop)
            })?
            .into(),
    );
    let req_header = raw_connect::generate_connect_header(&proxy.host, proxy.port, &mut headers)?;
    let fut = raw_connect::connect(stream, &req_header);
    let (mut stream, digest) = match peer.connection_timeout() {
        Some(t) => pingora_timeout::timeout(t, fut)
            .await
            .explain_err(ConnectTimedout, |_| "establishing CONNECT proxy")?,
        None => fut.await,
    }
    .map_err(|mut e| {
        // http protocol may ask to retry if reused client
        e.retry.decide_reuse(false);
        e
    })?;
    debug!("CONNECT proxy established: {:?}", proxy);
    stream.set_proxy_digest(digest);
    let stream = stream.into_any().downcast::<Stream>().unwrap(); // safe, it is Stream from above
    Ok(*stream)
}

The whole process consists of three basic parts:

  1. First, at [1], Pingora connects to the specified Unix domain socket path via connect_uds().
  2. Then, at [2], it generates the req_header via raw_connect::generate_connect_header().
  3. Finally, at [3], it establishes the tunnel via raw_connect::connect().

Let's walk through these one by one.

connect_uds

Connecting to a Unix domain socket is straightforward: just call tokio's connect() method:

pingora-core/src/protocols/l4/ext.rs
/// connect() to the given Unix domain socket
pub async fn connect_uds(path: &std::path::Path) -> Result<UnixStream> {
    UnixStream::connect(path)
        .await
        .map_err(|e| wrap_os_connect_error(e, format!("Fail to connect to {}", path.display())))
}

raw_connect::generate_connect_header

Next up is generating the ReqHeader.

pingora-core/src/protocols/raw_connect.rs
/// Generate the CONNECT header for the given destination
pub fn generate_connect_header<'a, H, S>(
    host: &str,
    port: u16,
    headers: H,
) -> Result<Box<ReqHeader>>
where
    S: AsRef<[u8]>,
    H: Iterator<Item = (S, &'a Vec<u8>)>,
{
    // TODO: valid that host doesn't have port
    // TODO: support adding ad-hoc headers
    let authority = format!("{host}:{port}");
    let req = http::request::Builder::new()
        .version(http::Version::HTTP_11)
        .method(http::method::Method::CONNECT)
        .uri(format!("https://{authority}/")) // scheme doesn't matter
        .header(http::header::HOST, &authority);
    let (mut req, _) = match req.body(()) {
        Ok(r) => r.into_parts(),
        Err(e) => {
            return Err(e).or_err(InvalidHTTPHeader, "Invalid CONNECT request");
        }
    };
    for (k, v) in headers {
        let header_name = http::header::HeaderName::from_bytes(k.as_ref())
            .or_err(InvalidHTTPHeader, "Invalid CONNECT request")?;
        let header_value = http::header::HeaderValue::from_bytes(v.as_slice())
            .or_err(InvalidHTTPHeader, "Invalid CONNECT request")?;
        req.headers.insert(header_name, header_value);
    }
    Ok(Box::new(req))
}

Here Pingora uses the http crate's Builder to construct a well-formed HTTP CONNECT request and fills in the proxy headers specified on the Peer.
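A hedged usage sketch of this function; the upstream and the extra header are made up for illustration:

// Build a CONNECT request for an assumed upstream example.com:443
// with one extra proxy header.
fn build_req() -> Result<Box<ReqHeader>> {
    let extra: Vec<(String, Vec<u8>)> =
        vec![("proxy-authorization".to_string(), b"Basic dXNlcjpwYXNz".to_vec())];
    let headers = extra.iter().map(|(k, v)| (k, v));
    generate_connect_header("example.com", 443, headers)
    // http_req_header_to_wire_auth_form() later serializes the result roughly as:
    //   CONNECT example.com:443 HTTP/1.1
    //   host: example.com:443
    //   proxy-authorization: Basic dXNlcjpwYXNz
    //
}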

raw_connect::connect

With everything in place, the last step is simply to establish the connection.

pingora-core/src/protocols/raw_connect.rs
/// Try to establish a CONNECT proxy via the given `stream`.
///
/// `request_header` should include the necessary request headers for the CONNECT protocol.
///
/// When successful, a [`Stream`] will be returned which is the established CONNECT proxy connection.
pub async fn connect(stream: Stream, request_header: &ReqHeader) -> Result<(Stream, ProxyDigest)> {
    let mut http = HttpSession::new(stream);
    // We write to stream directly because HttpSession doesn't write req header in auth form
    let to_wire = http_req_header_to_wire_auth_form(request_header);
    http.underlying_stream
        .write_all(to_wire.as_ref())
        .await
        .or_err(WriteError, "while writing request headers")?;
    http.underlying_stream
        .flush()
        .await
        .or_err(WriteError, "while flushing request headers")?;
    // TODO: set http.read_timeout
    let resp_header = http.read_resp_header_parts().await?;
    Ok((
        http.underlying_stream,
        validate_connect_response(resp_header)?,
    ))
}

The connect process itself has three parts. First, at [1], the request_header is serialized into a buffer by http_req_header_to_wire_auth_form:

pingora-core/src/protocols/raw_connect.rs
#[inline]
fn http_req_header_to_wire_auth_form(req: &ReqHeader) -> BytesMut {
    let mut buf = BytesMut::with_capacity(512);
    // Request-Line
    let method = req.method.as_str().as_bytes();
    buf.put_slice(method);
    buf.put_u8(b' ');
    // NOTE: CONNECT doesn't need URI path so we just skip that
    if let Some(path) = req.uri.authority() {
        buf.put_slice(path.as_str().as_bytes());
    }
    buf.put_u8(b' ');
    let version = match req.version {
        Version::HTTP_09 => "HTTP/0.9",
        Version::HTTP_10 => "HTTP/1.0",
        Version::HTTP_11 => "HTTP/1.1",
        _ => "HTTP/0.9",
    };
    buf.put_slice(version.as_bytes());
    buf.put_slice(CRLF);
    // headers
    let headers = &req.headers;
    for (key, value) in headers.iter() {
        buf.put_slice(key.as_ref());
        buf.put_slice(HEADER_KV_DELIMITER);
        buf.put_slice(value.as_ref());
        buf.put_slice(CRLF);
    }
    buf.put_slice(CRLF);
    buf
}

Then, at [2], the HTTP server's response is read and its status code and headers are validated:

pingora-core/src/protocols/raw_connect.rs
#[inline]
fn validate_connect_response(resp: Box<ResponseHeader>) -> Result<ProxyDigest> {
    if !resp.status.is_success() {
        return Error::e_because(
            ConnectProxyFailure,
            "None 2xx code",
            ConnectProxyError::boxed_new(resp),
        );
    }
    // Checking Content-Length and Transfer-Encoding is optional because we already ignore them.
    // We choose to do so because we want to be strict for internal use of CONNECT.
    // Ignore Content-Length header because our internal CONNECT server is coded to send it.
    if resp.headers.get(http::header::TRANSFER_ENCODING).is_some() {
        return Error::e_because(
            ConnectProxyFailure,
            "Invalid Transfer-Encoding presents",
            ConnectProxyError::boxed_new(resp),
        );
    }
    Ok(ProxyDigest::new(resp))
}

Finally, at [3], the session's underlying_stream is returned. At this point the TCP tunnel is established, and Pingora can carry on its TCP communication through this channel.

