Now that we have the Peer we want to connect to, the next step is actually establishing the connection. Pingora currently implements three protocols:

- TCP (L4)
- SSL
- HTTP

We will cover them in that order. In this post we start with the lowest layer of the three: the TCP (L4) connection.
## Directory structure
In Pingora, the protocol-related code lives under `pingora-core::protocols`, while the connectors for each protocol live under `pingora-core::connectors`. For L4 connections, the files we mainly care about are:
- connectors/
  - l4.rs
- protocols/l4/
  - ext.rs
  - socket.rs
  - stream.rs
## Link Start!
The l4 `connect` method takes two arguments, a `Peer` and an optional `bind_to` `SocketAddr`, and ultimately returns an l4 `Stream`.

`connect` supports both `Inet` and Unix Domain Socket peers. We will walk through the basic connection flow from the `Inet` side; the Unix Domain Socket path is largely the same and overall simpler than `Inet`, so we won't go over it separately.
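Before diving into the implementation, here is a minimal usage sketch of what this connector does from the caller's point of view. It is not from the Pingora source: `BasicPeer` and the `connectors::l4::connect` path are assumptions about the crate's exports and may be crate-internal in the version you use.

```rust
// Hypothetical usage sketch: connect to a plain TCP peer with the l4 connector.
// Assumes `pingora_core::connectors::l4::connect` and `BasicPeer` are reachable at
// these paths; adjust to the actual re-exports of your pingora-core version.
use pingora_core::connectors::l4::connect;
use pingora_core::upstreams::peer::BasicPeer;

#[tokio::main]
async fn main() {
    // A plain TCP peer: no TLS, no proxy, no custom options.
    let peer = BasicPeer::new("127.0.0.1:8080");
    // bind_to = None lets the OS pick the local address and port.
    let stream = connect(&peer, None).await.expect("l4 connect failed");
    println!("established at {:?}", stream.established_ts);
}
```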
The connection code is shown below:
```rust
/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
where
    P: Peer + Send + Sync,
{
    if peer.get_proxy().is_some() {
        return proxy_connect(peer)
            .await
            .err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
    }
    let peer_addr = peer.address();
    let mut stream: Stream = match peer_addr {
        SocketAddr::Inet(addr) => {
            let connect_future = tcp_connect(addr, bind_to.as_ref()); // [1]
            let conn_res = match peer.connection_timeout() {
                Some(t) => pingora_timeout::timeout(t, connect_future)
                    .await
                    .explain_err(ConnectTimedout, |_| {
                        format!("timeout {t:?} connecting to server {peer}")
                    })?,
                None => connect_future.await,
            };
            match conn_res {
                Ok(socket) => {
                    debug!("connected to new server: {}", peer.address());
                    if let Some(ka) = peer.tcp_keepalive() {
                        debug!("Setting tcp keepalive");
                        set_tcp_keepalive(&socket, ka)?; // [2]
                    }
                    Ok(socket.into())
                }
                Err(e) => {
                    let c = format!("Fail to connect to {peer}");
                    match e.etype() {
                        SocketError | BindError => Error::e_because(InternalError, c, e),
                        _ => Err(e.more_context(c)),
                    }
                }
            }
        }
        SocketAddr::Unix(addr) => {
            let connect_future = connect_uds(
                addr.as_pathname()
                    .expect("non-pathname unix sockets not supported as peer"),
            );
            let conn_res = match peer.connection_timeout() {
                Some(t) => pingora_timeout::timeout(t, connect_future)
                    .await
                    .explain_err(ConnectTimedout, |_| {
                        format!("timeout {t:?} connecting to server {peer}")
                    })?,
                None => connect_future.await,
            };
            match conn_res {
                Ok(socket) => {
                    debug!("connected to new server: {}", peer.address());
                    // no SO_KEEPALIVE for UDS
                    Ok(socket.into())
                }
                Err(e) => {
                    let c = format!("Fail to connect to {peer}");
                    match e.etype() {
                        SocketError | BindError => Error::e_because(InternalError, c, e),
                        _ => Err(e.more_context(c)),
                    }
                }
            }
        }
    }?;
    let tracer = peer.get_tracer();
    if let Some(t) = tracer {
        t.0.on_connected();
        stream.tracer = Some(t);
    }

    stream.set_nodelay()?; // [3]

    let digest = SocketDigest::from_raw_fd(stream.as_raw_fd());
    digest
        .peer_addr
        .set(Some(peer_addr.clone()))
        .expect("newly created OnceCell must be empty");
    stream.set_socket_digest(digest);

    Ok(stream)
}
```
Following the markers in the code above, the basic logic for establishing an l4 connection is (a rough plain-Tokio equivalent is sketched right after this list):

- First, the connection itself. At `[1]`, `tcp_connect`, i.e. `l4::ext::connect`, is called to establish the connection.
- Next, TCP keepalive. If the `Peer` asks for this option, it is configured at `[2]` via `l4::ext::set_tcp_keepalive`.
- Finally, `set_nodelay` at `[3]`. Since Pingora implements write buffering in user space on top of Tokio, Nagle's algorithm has to be disabled here.
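For reference, the same three steps expressed with plain Tokio look roughly like this. This is a simplified sketch, not Pingora code: it skips the connection timeout, the error wrapping, and the keepalive `setsockopt` that Pingora performs through `l4::ext`.

```rust
use tokio::net::TcpStream;

// Sketch of the three steps above using plain Tokio (simplified; not Pingora code).
async fn connect_sketch(addr: &str) -> std::io::Result<TcpStream> {
    // [1] establish the TCP connection (Pingora wraps this in an optional connection timeout)
    let stream = TcpStream::connect(addr).await?;
    // [2] keepalive: Pingora sets SO_KEEPALIVE via setsockopt in l4::ext::set_tcp_keepalive;
    //     Tokio has no portable high-level API for this, so it is omitted here.
    // [3] disable Nagle's algorithm, because write buffering happens in user space
    stream.set_nodelay(true)?;
    Ok(stream)
}
```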
### tcp_connect
The connection procedure in `l4::ext::connect` is essentially the same as connecting with Tokio; the difference is the added `bind_to` handling.
```rust
/*
 * this extension is needed until the following are addressed
 * https://github.com/tokio-rs/tokio/issues/1543
 * https://github.com/tokio-rs/mio/issues/1257
 * https://github.com/tokio-rs/mio/issues/1211
 */
/// connect() to the given address while optionally bind to the specific source address
///
/// `IP_BIND_ADDRESS_NO_PORT` is used.
pub async fn connect(addr: &SocketAddr, bind_to: Option<&SocketAddr>) -> Result<TcpStream> {
    let socket = if addr.is_ipv4() {
        TcpSocket::new_v4()
    } else {
        TcpSocket::new_v6()
    }
    .or_err(SocketError, "failed to create socket")?;

    if cfg!(target_os = "linux") {
        ip_bind_addr_no_port(socket.as_raw_fd(), true) // [1]
            .or_err(SocketError, "failed to set socket opts")?;

        if let Some(baddr) = bind_to {
            socket
                .bind(*baddr) // [2]
                .or_err_with(BindError, || format!("failed to bind to socket {}", *baddr))?;
        };
    }
    // TODO: add support for bind on other platforms
    socket
        .connect(*addr)
        .await
        .map_err(|e| wrap_os_connect_error(e, format!("Fail to connect to {}", *addr)))
}
```
First, at `[1]`, Pingora calls `ip_bind_addr_no_port` to ask the operating system not to assign a local port automatically.
```rust
#[cfg(target_os = "linux")]
fn ip_bind_addr_no_port(fd: RawFd, val: bool) -> io::Result<()> {
    const IP_BIND_ADDRESS_NO_PORT: i32 = 24;

    set_opt(fd, libc::IPPROTO_IP, IP_BIND_ADDRESS_NO_PORT, val as c_int)
}

#[cfg(not(target_os = "linux"))]
fn ip_bind_addr_no_port(_fd: RawFd, _val: bool) -> io::Result<()> {
    Ok(())
}
```
Pingora currently implements `ip_bind_addr_no_port` properly only on Linux; other systems get a dummy implementation. On Linux, Pingora sets the `IP_BIND_ADDRESS_NO_PORT` option via `setsockopt`, so the kernel defers choosing the source port until `connect()` rather than fixing one at `bind()` time.
After making sure the OS does not pick a port on its own, at `[2]` Pingora manually binds the socket to the `SocketAddr` requested by `bind_to`.
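To make the `bind_to` path concrete, here is a hedged sketch of calling this ext-level `connect` while pinning the source IP. The `pingora_core::protocols::l4::ext` path and its visibility are assumptions; port `0` in the bind address works together with `IP_BIND_ADDRESS_NO_PORT`, so the source port is only chosen at connect time.

```rust
use std::net::SocketAddr;
use tokio::net::TcpStream;

// Sketch (not from the post): connect to `dst` while forcing the source IP.
// Assumes `pingora_core::protocols::l4::ext::connect` is publicly reachable.
async fn connect_from(src_ip: &str, dst: &str) -> TcpStream {
    let bind_to: SocketAddr = format!("{src_ip}:0").parse().expect("valid source address");
    let dst: SocketAddr = dst.parse().expect("valid destination address");
    pingora_core::protocols::l4::ext::connect(&dst, Some(&bind_to))
        .await
        .expect("connect failed")
}
```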
### Stream
After `l4::ext::connect` succeeds, what we get is a `tokio::net::tcp::TcpStream`, but `connect` has to return the `Stream` type. The conversion from `TcpStream` to `Stream` is defined in `protocols::l4::stream`.
The `l4::Stream` Pingora defines is a `BufStream` with read and write buffers, where write buffering can optionally be turned off. See the code below:
```rust
/// A concrete type for transport layer connection + extra fields for logging
#[derive(Debug)]
pub struct Stream {
    stream: BufStream<RawStream>,
    buffer_write: bool,
    proxy_digest: Option<Arc<ProxyDigest>>,
    socket_digest: Option<Arc<SocketDigest>>,
    /// When this connection is established
    pub established_ts: SystemTime,
    /// The distributed tracing object for this stream
    pub tracer: Option<Tracer>,
}

impl Stream {
    /// set TCP nodelay for this connection if `self` is TCP
    pub fn set_nodelay(&mut self) -> Result<()> {
        if let RawStream::Tcp(s) = &self.stream.get_ref() {
            s.set_nodelay(true)
                .or_err(ConnectError, "failed to set_nodelay")?;
        }
        Ok(())
    }
}
```
### TcpStream -> Stream
Pingora implements `From`, so a `TcpStream` or a `UnixStream` can be converted into a `Stream` with `into()`:
```rust
// Large read buffering helps reducing syscalls with little trade-off
// Ssl layer always does "small" reads in 16k (TLS record size) so L4 read buffer helps a lot.
const BUF_READ_SIZE: usize = 64 * 1024;
// Small write buf to match MSS. Too large write buf delays real time communication.
// This buffering effectively implements something similar to Nagle's algorithm.
// The benefit is that user space can control when to flush, where Nagle's can't be controlled.
// And userspace buffering reduce both syscalls and small packets.
const BUF_WRITE_SIZE: usize = 1460;

// NOTE: with writer buffering, users need to call flush() to make sure the data is actually
// sent. Otherwise data could be stuck in the buffer forever or get lost when stream is closed.

// (the `Stream` struct and its `set_nodelay` impl are the same as shown above)

impl From<TcpStream> for Stream {
    fn from(s: TcpStream) -> Self {
        Stream {
            stream: BufStream::with_capacity(BUF_READ_SIZE, BUF_WRITE_SIZE, RawStream::Tcp(s)),
            buffer_write: true,
            established_ts: SystemTime::now(),
            proxy_digest: None,
            socket_digest: None,
            tracer: None,
        }
    }
}

impl From<UnixStream> for Stream {
    fn from(s: UnixStream) -> Self {
        Stream {
            stream: BufStream::with_capacity(BUF_READ_SIZE, BUF_WRITE_SIZE, RawStream::Unix(s)),
            buffer_write: true,
            established_ts: SystemTime::now(),
            proxy_digest: None,
            socket_digest: None,
            tracer: None,
        }
    }
}
```
The key part here is the choice of the two constants (a short sketch of the flush behavior this implies follows the list):

- `BUF_READ_SIZE`: 64 KB. Since the TLS record size is 16 KB, a 64 KB read buffer lines up with it and reduces the number of syscalls.
- `BUF_WRITE_SIZE`: 1460, matching the MSS. The user-space write buffering built on Tokio replaces the kernel's Nagle's algorithm.
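As the NOTE in the code points out, with write buffering enabled the caller must flush explicitly. The sketch below uses Tokio's `BufStream` directly, with the same capacities, purely to illustrate that behavior; it is not Pingora code.

```rust
use tokio::io::{AsyncWriteExt, BufStream};
use tokio::net::TcpStream;

// Sketch: user-space write buffering with the same capacities as l4::Stream.
async fn send(dst: &str, payload: &[u8]) -> std::io::Result<()> {
    let mut stream =
        BufStream::with_capacity(64 * 1024, 1460, TcpStream::connect(dst).await?);
    stream.write_all(payload).await?; // may only reach the user-space write buffer
    stream.flush().await?; // actually pushes the bytes down to the socket
    Ok(())
}
```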
### Stream::drop
The `Drop` implementation for `Stream` is also quite interesting. Let's take a look:
```rust
impl Drop for Stream {
    fn drop(&mut self) {
        if let Some(t) = self.tracer.as_ref() {
            t.0.on_disconnected();
        }
        /* use nodelay/local_addr function to detect socket status */
        let ret = match &self.stream.get_ref() { // [1]
            RawStream::Tcp(s) => s.nodelay().err(),
            RawStream::Unix(s) => s.local_addr().err(),
        };
        if let Some(e) = ret {
            match e.kind() {
                tokio::io::ErrorKind::Other => {
                    if let Some(ecode) = e.raw_os_error() {
                        if ecode == 9 {
                            // Or we could panic here
                            error!("Crit: socket {:?} is being double closed", self.stream);
                        }
                    }
                }
                _ => {
                    debug!("Socket is already broken {:?}", e);
                }
            }
        } else {
            // try flush the write buffer. We use now_or_never() because
            // 1. Drop cannot be async
            // 2. write should usually be ready, unless the buf is full.
            let _ = self.flush().now_or_never(); // [2]
        }
        debug!("Dropping socket {:?}", self.stream);
    }
}
```
At `[1]`, Pingora probes the `Stream`'s status via `nodelay` for TCP and `local_addr` for Unix sockets. At `[2]`, when the `Stream` is still writable, it calls its own `flush()` and uses `now_or_never()` to try to drain the write buffer immediately.
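The `now_or_never()` trick deserves a closer look: `Drop` cannot be `async`, so the future is polled exactly once and dropped if it is not immediately ready. A tiny standalone illustration (using the `futures` crate, not Pingora code):

```rust
use futures::FutureExt;

fn main() {
    // An already-ready future completes without an executor.
    let ready = async { 42 };
    assert_eq!(ready.now_or_never(), Some(42));

    // A future that would have to wait is simply dropped.
    let pending = futures::future::pending::<u32>();
    assert_eq!(pending.now_or_never(), None);
}
```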
### Stream::poll_write
Another interesting point is how Pingora bypasses the write buffer:
```rust
impl AsyncWrite for Stream {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        if self.buffer_write {
            Pin::new(&mut self.stream).poll_write(cx, buf)
        } else {
            Pin::new(&mut self.stream.get_mut()).poll_write(cx, buf)
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Pin::new(&mut self.stream).poll_flush(cx)
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Pin::new(&mut self.stream).poll_shutdown(cx)
    }

    fn poll_write_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        if self.buffer_write {
            Pin::new(&mut self.stream).poll_write_vectored(cx, bufs)
        } else {
            Pin::new(&mut self.stream.get_mut()).poll_write_vectored(cx, bufs)
        }
    }

    fn is_write_vectored(&self) -> bool {
        if self.buffer_write {
            self.stream.is_write_vectored() // it is true
        } else {
            self.stream.get_ref().is_write_vectored()
        }
    }
}
```
When `buffer_write` is enabled, `poll_write` simply goes through `self.stream`. When buffering is not wanted, `get_mut()` is used to reach the underlying I/O object directly, and writing to the underlying stream naturally skips `BufStream`'s buffer.
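The same idea can be shown with Tokio's `BufStream` on its own. This is only an illustration of the `get_mut()` bypass, not Pingora code; note that mixing buffered and direct writes like this can reorder data, which is why `Stream` picks one mode up front via `buffer_write`.

```rust
use tokio::io::{AsyncWriteExt, BufStream};
use tokio::net::TcpStream;

// Sketch: a buffered write vs. a direct write through the underlying stream.
async fn demo(dst: &str) -> std::io::Result<()> {
    let mut buffered = BufStream::new(TcpStream::connect(dst).await?);
    // Goes into the user-space buffer; nothing hits the socket until flush().
    buffered.write_all(b"buffered\n").await?;
    // get_mut() returns the underlying TcpStream, so this write bypasses the buffer.
    buffered.get_mut().write_all(b"direct\n").await?;
    // Flush the buffered bytes (they arrive after the direct write in this sketch).
    buffered.flush().await?;
    Ok(())
}
```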
## L4 proxying over an HTTP tunnel
In the walkthrough of `l4::connect` above we skipped the `proxy_connect` branch. The relevant part of the code is:
```rust
/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
where
    P: Peer + Send + Sync,
{
    if peer.get_proxy().is_some() {
        return proxy_connect(peer)
            .await
            .err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
    }
    let peer_addr = peer.address();
    let mut stream: Stream = match peer_addr {
        // ...
```
In Pingora, L4 proxying is implemented through an HTTP tunnel. In short, the proxy server exposes an HTTP/1.1 proxy on a Unix Domain Socket, and the proxy client (Pingora, in this case) sends a CONNECT request over it to reach the specified remote `ip:port`. The full code is shown below:
```rust
async fn proxy_connect<P: Peer>(peer: &P) -> Result<Stream> {
    // safe to unwrap
    let proxy = peer.get_proxy().unwrap();
    let options = peer.get_peer_options().unwrap();

    // combine required and optional headers
    let mut headers = proxy
        .headers
        .iter()
        .chain(options.extra_proxy_headers.iter());

    // not likely to timeout during connect() to UDS
    let stream: Box<Stream> = Box::new(
        connect_uds(&proxy.next_hop) // [1]
            .await
            .or_err_with(ConnectError, || {
                format!("CONNECT proxy connect() error to {:?}", &proxy.next_hop)
            })?
            .into(),
    );

    let req_header = raw_connect::generate_connect_header(&proxy.host, proxy.port, &mut headers)?; // [2]
    let fut = raw_connect::connect(stream, &req_header); // [3]
    let (mut stream, digest) = match peer.connection_timeout() {
        Some(t) => pingora_timeout::timeout(t, fut)
            .await
            .explain_err(ConnectTimedout, |_| "establishing CONNECT proxy")?,
        None => fut.await,
    }
    .map_err(|mut e| {
        // http protocol may ask to retry if reused client
        e.retry.decide_reuse(false);
        e
    })?;
    debug!("CONNECT proxy established: {:?}", proxy);
    stream.set_proxy_digest(digest);
    let stream = stream.into_any().downcast::<Stream>().unwrap(); // safe, it is Stream from above
    Ok(*stream)
}
```
The whole flow has three basic parts:

- First, at `[1]`, Pingora connects to the configured Unix Domain Socket path via `connect_uds()`.
- Then, at `[2]`, it builds the `req_header` with `raw_connect::generate_connect_header()`.
- Finally, at `[3]`, it establishes the tunnel with `raw_connect::connect()`.
Let's go through them one step at a time.
### connect_uds
Connecting to a Unix Domain Socket is simple; it just calls tokio's `connect()`:
```rust
/// connect() to the given Unix domain socket
pub async fn connect_uds(path: &std::path::Path) -> Result<UnixStream> {
    UnixStream::connect(path)
        .await
        .map_err(|e| wrap_os_connect_error(e, format!("Fail to connect to {}", path.display())))
}
```
### raw_connect::generate_connect_header
Next comes building the `ReqHeader`.
```rust
/// Generate the CONNECT header for the given destination
pub fn generate_connect_header<'a, H, S>(
    host: &str,
    port: u16,
    headers: H,
) -> Result<Box<ReqHeader>>
where
    S: AsRef<[u8]>,
    H: Iterator<Item = (S, &'a Vec<u8>)>,
{
    // TODO: valid that host doesn't have port
    // TODO: support adding ad-hoc headers

    let authority = format!("{host}:{port}");
    let req = http::request::Builder::new()
        .version(http::Version::HTTP_11)
        .method(http::method::Method::CONNECT)
        .uri(format!("https://{authority}/")) // scheme doesn't matter
        .header(http::header::HOST, &authority);

    let (mut req, _) = match req.body(()) {
        Ok(r) => r.into_parts(),
        Err(e) => {
            return Err(e).or_err(InvalidHTTPHeader, "Invalid CONNECT request");
        }
    };

    for (k, v) in headers {
        let header_name = http::header::HeaderName::from_bytes(k.as_ref())
            .or_err(InvalidHTTPHeader, "Invalid CONNECT request")?;
        let header_value = http::header::HeaderValue::from_bytes(v.as_slice())
            .or_err(InvalidHTTPHeader, "Invalid CONNECT request")?;
        req.headers.insert(header_name, header_value);
    }

    Ok(Box::new(req))
}
```
Here Pingora uses the `http` crate's `Builder` to construct a valid HTTP CONNECT request and fills in the proxy headers specified on the `Peer`.
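For reference, once serialized in authority form (see `http_req_header_to_wire_auth_form` below), the generated request looks roughly like this on the wire. The destination `1.2.3.4:443` is only an example; the actual host, port, and extra headers come from the Peer's proxy settings:

```
CONNECT 1.2.3.4:443 HTTP/1.1
host: 1.2.3.4:443

```

A 2xx response from the proxy (checked in `validate_connect_response` below) means the tunnel is ready.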
### raw_connect::connect
With everything in place, all that is left is to establish the connection.
```rust
/// Try to establish a CONNECT proxy via the given `stream`.
///
/// `request_header` should include the necessary request headers for the CONNECT protocol.
///
/// When successful, a [`Stream`] will be returned which is the established CONNECT proxy connection.
pub async fn connect(stream: Stream, request_header: &ReqHeader) -> Result<(Stream, ProxyDigest)> {
    let mut http = HttpSession::new(stream);

    // We write to stream directly because HttpSession doesn't write req header in auth form
    let to_wire = http_req_header_to_wire_auth_form(request_header); // [1]
    http.underlying_stream
        .write_all(to_wire.as_ref())
        .await
        .or_err(WriteError, "while writing request headers")?;
    http.underlying_stream
        .flush()
        .await
        .or_err(WriteError, "while flushing request headers")?;

    // TODO: set http.read_timeout
    let resp_header = http.read_resp_header_parts().await?; // [2]
    Ok((
        http.underlying_stream, // [3]
        validate_connect_response(resp_header)?,
    ))
}
```
The connect call has three parts. First, at `[1]`, `request_header` is converted into a wire-format buffer by `http_req_header_to_wire_auth_form` and written directly to the stream:
```rust
#[inline]
fn http_req_header_to_wire_auth_form(req: &ReqHeader) -> BytesMut {
    let mut buf = BytesMut::with_capacity(512);

    // Request-Line
    let method = req.method.as_str().as_bytes();
    buf.put_slice(method);
    buf.put_u8(b' ');
    // NOTE: CONNECT doesn't need URI path so we just skip that
    if let Some(path) = req.uri.authority() {
        buf.put_slice(path.as_str().as_bytes());
    }
    buf.put_u8(b' ');

    let version = match req.version {
        Version::HTTP_09 => "HTTP/0.9",
        Version::HTTP_10 => "HTTP/1.0",
        Version::HTTP_11 => "HTTP/1.1",
        _ => "HTTP/0.9",
    };
    buf.put_slice(version.as_bytes());
    buf.put_slice(CRLF);

    // headers
    let headers = &req.headers;
    for (key, value) in headers.iter() {
        buf.put_slice(key.as_ref());
        buf.put_slice(HEADER_KV_DELIMITER);
        buf.put_slice(value.as_ref());
        buf.put_slice(CRLF);
    }

    buf.put_slice(CRLF);
    buf
}
```
Then, at `[2]`, the HTTP server's response status and headers are read and validated:
```rust
#[inline]
fn validate_connect_response(resp: Box<ResponseHeader>) -> Result<ProxyDigest> {
    if !resp.status.is_success() {
        return Error::e_because(
            ConnectProxyFailure,
            "None 2xx code",
            ConnectProxyError::boxed_new(resp),
        );
    }

    // Checking Content-Length and Transfer-Encoding is optional because we already ignore them.
    // We choose to do so because we want to be strict for internal use of CONNECT.
    // Ignore Content-Length header because our internal CONNECT server is coded to send it.
    if resp.headers.get(http::header::TRANSFER_ENCODING).is_some() {
        return Error::e_because(
            ConnectProxyFailure,
            "Invalid Transfer-Encoding presents",
            ConnectProxyError::boxed_new(resp),
        );
    }
    Ok(ProxyDigest::new(resp))
}
```
Finally, at `[3]`, the `http` session's `underlying_stream` is returned. At this point the TCP connection is established, and Pingora can carry on its TCP communication over this tunnel.