Now that we have the Peer we want to connect to, the next step is to actually establish the connection. Pingora currently implements three protocols:

- TCP (L4)
- SSL
- HTTP

We will cover them in that order, so this post starts with the lowest of the three: the TCP (L4) connection.
## Directory structure

In Pingora, protocol-related code is defined under `pingora-core::protocols`, while the connection logic for each protocol lives under `pingora-core::connectors`. Taking the L4 connection as an example, the files we will mainly look at are:
- connectors/
  - l4.rs
- protocols/l4/
  - ext.rs
  - socket.rs
  - stream.rs
## Link Start!
The l4 `connect` method takes a `Peer` and an optional `bind_to` `SocketAddr` as parameters, and ultimately returns an L4 `Stream`.

`connect` supports both Inet and Unix domain sockets. We will start with the Inet path to walk through the basic flow of establishing a connection; the Unix domain socket implementation is much the same and on the whole simpler, so we will not go through it separately.

The connection code looks like this:
```rust
/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
where
    P: Peer + Send + Sync,
{
    if peer.get_proxy().is_some() {
        return proxy_connect(peer)
            .await
            .err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
    }
    let peer_addr = peer.address();
    let mut stream: Stream = match peer_addr {
        SocketAddr::Inet(addr) => {
            let connect_future = tcp_connect(addr, bind_to.as_ref());
            let conn_res = match peer.connection_timeout() {
                Some(t) => pingora_timeout::timeout(t, connect_future)
                    .await
                    .explain_err(ConnectTimedout, |_| {
                        format!("timeout {t:?} connecting to server {peer}")
                    })?,
                None => connect_future.await,
            };
            match conn_res {
                Ok(socket) => {
                    debug!("connected to new server: {}", peer.address());
                    if let Some(ka) = peer.tcp_keepalive() {
                        debug!("Setting tcp keepalive");
                        set_tcp_keepalive(&socket, ka)?;
                    }
                    Ok(socket.into())
                }
                Err(e) => {
                    let c = format!("Fail to connect to {peer}");
                    match e.etype() {
                        SocketError | BindError => Error::e_because(InternalError, c, e),
                        _ => Err(e.more_context(c)),
                    }
                }
            }
        }
        SocketAddr::Unix(addr) => {
            let connect_future = connect_uds(
                addr.as_pathname()
                    .expect("non-pathname unix sockets not supported as peer"),
            );
            let conn_res = match peer.connection_timeout() {
                Some(t) => pingora_timeout::timeout(t, connect_future)
                    .await
                    .explain_err(ConnectTimedout, |_| {
                        format!("timeout {t:?} connecting to server {peer}")
                    })?,
                None => connect_future.await,
            };
            match conn_res {
                Ok(socket) => {
                    debug!("connected to new server: {}", peer.address());
                    // no SO_KEEPALIVE for UDS
                    Ok(socket.into())
                }
                Err(e) => {
                    let c = format!("Fail to connect to {peer}");
                    match e.etype() {
                        SocketError | BindError => Error::e_because(InternalError, c, e),
                        _ => Err(e.more_context(c)),
                    }
                }
            }
        }
    }?;
    let tracer = peer.get_tracer();
    if let Some(t) = tracer {
        t.0.on_connected();
        stream.tracer = Some(t);
    }

    stream.set_nodelay()?;

    let digest = SocketDigest::from_raw_fd(stream.as_raw_fd());
    digest
        .peer_addr
        .set(Some(peer_addr.clone()))
        .expect("newly created OnceCell must be empty");
    stream.set_socket_digest(digest);

    Ok(stream)
}
```
From the code above, the basic logic of establishing an l4 connection can be summarized as follows (a standalone sketch of the same flow appears after the list):

- First, the connection itself: `tcp_connect`, i.e. `l4::ext::connect`, is called to establish the connection.
- Next, `tcp_keepalive`: if the `Peer` asks for this option, it is set via `l4::ext::set_tcp_keepalive`.
- Finally, `set_nodelay`: since Pingora implements write buffering in user space on top of Tokio, Nagle's algorithm has to be disabled here.
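To make the three steps concrete, here is a minimal sketch of the same flow written directly against Tokio's `TcpSocket`/`TcpStream` API. This is illustration only, not Pingora's code: it skips `IP_BIND_ADDRESS_NO_PORT`, timeouts, and Pingora's error types, and the keepalive call only toggles `SO_KEEPALIVE` rather than tuning idle/interval/count the way `set_tcp_keepalive` does.

```rust
use std::net::SocketAddr;
use tokio::net::{TcpSocket, TcpStream};

async fn simple_l4_connect(
    addr: SocketAddr,
    bind_to: Option<SocketAddr>,
) -> std::io::Result<TcpStream> {
    let socket = if addr.is_ipv4() {
        TcpSocket::new_v4()?
    } else {
        TcpSocket::new_v6()?
    };
    if let Some(b) = bind_to {
        socket.bind(b)?; // optionally pin the source address, like Pingora's bind_to
    }
    socket.set_keepalive(true)?; // step 2: SO_KEEPALIVE (Pingora also tunes idle/interval/count)
    let stream = socket.connect(addr).await?; // step 1: the connection itself
    stream.set_nodelay(true)?; // step 3: disable Nagle's algorithm
    Ok(stream)
}
```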
### tcp_connect
The connection process in `l4::ext::connect` is essentially the same as connecting with Tokio directly; the difference is the added `bind_to` support.
```rust
/*
 * this extension is needed until the following are addressed
 * https://github.com/tokio-rs/tokio/issues/1543
 * https://github.com/tokio-rs/mio/issues/1257
 * https://github.com/tokio-rs/mio/issues/1211
 */
/// connect() to the given address while optionally bind to the specific source address
///
/// `IP_BIND_ADDRESS_NO_PORT` is used.
pub async fn connect(addr: &SocketAddr, bind_to: Option<&SocketAddr>) -> Result<TcpStream> {
    let socket = if addr.is_ipv4() {
        TcpSocket::new_v4()
    } else {
        TcpSocket::new_v6()
    }
    .or_err(SocketError, "failed to create socket")?;

    if cfg!(target_os = "linux") {
        ip_bind_addr_no_port(socket.as_raw_fd(), true)
            .or_err(SocketError, "failed to set socket opts")?;

        if let Some(baddr) = bind_to {
            socket
                .bind(*baddr)
                .or_err_with(BindError, || format!("failed to bind to socket {}", *baddr))?;
        };
    }
    // TODO: add support for bind on other platforms

    socket
        .connect(*addr)
        .await
        .map_err(|e| wrap_os_connect_error(e, format!("Fail to connect to {}", *addr)))
}
```
First, Pingora calls `ip_bind_addr_no_port` to ask the operating system not to assign a local port automatically yet.
```rust
#[cfg(target_os = "linux")]
fn ip_bind_addr_no_port(fd: RawFd, val: bool) -> io::Result<()> {
    const IP_BIND_ADDRESS_NO_PORT: i32 = 24;

    set_opt(fd, libc::IPPROTO_IP, IP_BIND_ADDRESS_NO_PORT, val as c_int)
}

#[cfg(not(target_os = "linux"))]
fn ip_bind_addr_no_port(_fd: RawFd, _val: bool) -> io::Result<()> {
    Ok(())
}
```
Pingora currently only implements `ip_bind_addr_no_port` properly on Linux; other systems get a dummy implementation. On Linux, Pingora sets the `IP_BIND_ADDRESS_NO_PORT` option via `setsockopt`.
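The `set_opt` helper it calls is not shown above. As an assumption for illustration (not Pingora's exact code), such a helper is essentially a thin wrapper over `libc::setsockopt`:

```rust
use std::os::unix::io::RawFd;
use std::{io, mem};

use libc::{c_int, c_void, socklen_t};

// Set a socket option on the raw fd; surface the OS error on failure.
fn set_opt<T: Copy>(fd: RawFd, level: c_int, name: c_int, val: T) -> io::Result<()> {
    let ret = unsafe {
        libc::setsockopt(
            fd,
            level,
            name,
            &val as *const T as *const c_void,
            mem::size_of::<T>() as socklen_t,
        )
    };
    if ret == 0 {
        Ok(())
    } else {
        Err(io::Error::last_os_error())
    }
}
```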
With the OS prevented from picking a port on its own, Pingora then calls `bind` to bind the socket to the `SocketAddr` requested by `bind_to`.
### Stream
After `l4::ext::connect` succeeds, what we have is a `tokio::net::tcp::TcpStream`, but `connect` needs to return a `Stream`. The conversion from `TcpStream` to `Stream` is defined in `protocols::l4::Stream`.

The `l4::Stream` Pingora defines is a `BufStream` with both read and write buffering, where the write buffering can optionally be disabled. See the code below:
```rust
/// A concrete type for transport layer connection + extra fields for logging
#[derive(Debug)]
pub struct Stream {
    stream: BufStream<RawStream>,
    buffer_write: bool,
    proxy_digest: Option<Arc<ProxyDigest>>,
    socket_digest: Option<Arc<SocketDigest>>,
    /// When this connection is established
    pub established_ts: SystemTime,
    /// The distributed tracing object for this stream
    pub tracer: Option<Tracer>,
}

impl Stream {
    /// set TCP nodelay for this connection if `self` is TCP
    pub fn set_nodelay(&mut self) -> Result<()> {
        if let RawStream::Tcp(s) = &self.stream.get_ref() {
            s.set_nodelay(true)
                .or_err(ConnectError, "failed to set_nodelay")?;
        }
        Ok(())
    }
}
```
#### TcpStream -> Stream
Pingora implements `From`, so both `TcpStream` and `UnixStream` can be converted into a `Stream` via `into()`:
```rust
// Large read buffering helps reducing syscalls with little trade-off
// Ssl layer always does "small" reads in 16k (TLS record size) so L4 read buffer helps a lot.
const BUF_READ_SIZE: usize = 64 * 1024;
// Small write buf to match MSS. Too large write buf delays real time communication.
// This buffering effectively implements something similar to Nagle's algorithm.
// The benefit is that user space can control when to flush, where Nagle's can't be controlled.
// And userspace buffering reduce both syscalls and small packets.
const BUF_WRITE_SIZE: usize = 1460;

// NOTE: with writer buffering, users need to call flush() to make sure the data is actually
// sent. Otherwise data could be stuck in the buffer forever or get lost when stream is closed.

// ... (the Stream struct and set_nodelay() shown above)

impl From<TcpStream> for Stream {
    fn from(s: TcpStream) -> Self {
        Stream {
            stream: BufStream::with_capacity(BUF_READ_SIZE, BUF_WRITE_SIZE, RawStream::Tcp(s)),
            buffer_write: true,
            established_ts: SystemTime::now(),
            proxy_digest: None,
            socket_digest: None,
            tracer: None,
        }
    }
}

impl From<UnixStream> for Stream {
    fn from(s: UnixStream) -> Self {
        Stream {
            stream: BufStream::with_capacity(BUF_READ_SIZE, BUF_WRITE_SIZE, RawStream::Unix(s)),
            buffer_write: true,
            established_ts: SystemTime::now(),
            proxy_digest: None,
            socket_digest: None,
            tracer: None,
        }
    }
}
```
The key points here are the values chosen for the two constants:

- `BUF_READ_SIZE`: 64 KiB. Since the TLS record size is 16 KiB, reading in 64 KiB chunks aligns with it and reduces the number of syscalls.
- `BUF_WRITE_SIZE`: 1460, matching the MSS. Combined with Tokio's user-space write buffering, this effectively replaces the kernel-side Nagle's algorithm.
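As the NOTE in the source points out, with write buffering enabled the caller must `flush()` explicitly, or bytes can sit in the user-space buffer indefinitely. A minimal illustration (not Pingora code) with Tokio's `BufStream`:

```rust
use tokio::io::{AsyncWriteExt, BufStream};
use tokio::net::TcpStream;

async fn send_request(stream: TcpStream) -> std::io::Result<()> {
    // Same buffer sizes as Pingora's l4::Stream.
    let mut buffered = BufStream::with_capacity(64 * 1024, 1460, stream);
    buffered
        .write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
        .await?;
    // Without this flush, the request may never leave the 1460-byte write buffer.
    buffered.flush().await?;
    Ok(())
}
```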
#### Stream::drop
`Stream`'s `Drop` implementation is also quite interesting. Let's take a look:
```rust
impl Drop for Stream {
    fn drop(&mut self) {
        if let Some(t) = self.tracer.as_ref() {
            t.0.on_disconnected();
        }
        /* use nodelay/local_addr function to detect socket status */
        let ret = match &self.stream.get_ref() {
            RawStream::Tcp(s) => s.nodelay().err(),
            RawStream::Unix(s) => s.local_addr().err(),
        };
        if let Some(e) = ret {
            match e.kind() {
                tokio::io::ErrorKind::Other => {
                    if let Some(ecode) = e.raw_os_error() {
                        if ecode == 9 {
                            // Or we could panic here
                            error!("Crit: socket {:?} is being double closed", self.stream);
                        }
                    }
                }
                _ => {
                    debug!("Socket is already broken {:?}", e);
                }
            }
        } else {
            // try flush the write buffer. We use now_or_never() because
            // 1. Drop cannot be async
            // 2. write should usually be ready, unless the buf is full.
            let _ = self.flush().now_or_never();
        }
        debug!("Dropping socket {:?}", self.stream);
    }
}
```
Here Pingora first probes the state of the `Stream` via `nodelay` (TCP) or `local_addr` (UDS). Then, if the `Stream` is still usable, it calls its own `flush()` and uses `now_or_never()` to try to drain the write buffer immediately.
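`now_or_never()` comes from the `futures` crate's `FutureExt`: it polls the future exactly once and gives up if it is not ready, which is why this flush in `Drop` can only ever be best-effort. A tiny illustration (not Pingora code):

```rust
use futures::future::FutureExt;

fn main() {
    // An already-ready future yields its value on the single poll.
    assert_eq!(async { 21 * 2 }.now_or_never(), Some(42));
    // A still-pending future would return None and simply be dropped;
    // since Drop cannot be async, buffered bytes may be lost in that case.
}
```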
#### Stream::poll_write
Another interesting detail is how Pingora bypasses the write buffer:
```rust
impl AsyncWrite for Stream {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        if self.buffer_write {
            Pin::new(&mut self.stream).poll_write(cx, buf)
        } else {
            Pin::new(&mut self.stream.get_mut()).poll_write(cx, buf)
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Pin::new(&mut self.stream).poll_flush(cx)
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Pin::new(&mut self.stream).poll_shutdown(cx)
    }

    fn poll_write_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        if self.buffer_write {
            Pin::new(&mut self.stream).poll_write_vectored(cx, bufs)
        } else {
            Pin::new(&mut self.stream.get_mut()).poll_write_vectored(cx, bufs)
        }
    }

    fn is_write_vectored(&self) -> bool {
        if self.buffer_write {
            self.stream.is_write_vectored() // it is true
        } else {
            self.stream.get_ref().is_write_vectored()
        }
    }
}
```
When `buffer_write` is wanted, `poll_write` simply goes through `self.stream`. When buffering is not wanted, `get_mut()` is used to get hold of the underlying I/O object directly, and writes to the underlying stream naturally skip `BufStream`'s buffer.
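The same trick works on any `BufStream`: `get_mut()` (and `get_ref()`) expose the wrapped stream, so writes through them bypass the user-space buffer. A small sketch (not Pingora code), flushing first so the two writes stay in order:

```rust
use tokio::io::{AsyncWriteExt, BufStream};
use tokio::net::TcpStream;

async fn write_paths(buffered: &mut BufStream<TcpStream>) -> std::io::Result<()> {
    buffered.write_all(b"goes through the write buffer").await?;
    buffered.flush().await?; // push out anything still buffered
    buffered
        .get_mut() // &mut TcpStream: bypasses BufStream's buffer entirely
        .write_all(b"goes straight to the socket")
        .await?;
    Ok(())
}
```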
## L4 proxying via an HTTP tunnel
When describing the implementation of `l4::connect` above, we skipped over the `proxy_connect` part. The relevant branch is shown below:
```rust
/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
where
    P: Peer + Send + Sync,
{
    if peer.get_proxy().is_some() {
        return proxy_connect(peer)
            .await
            .err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
    }
    let peer_addr = peer.address();
    let mut stream: Stream = match peer_addr {
    // ...
```
In Pingora, L4 proxying is implemented via an HTTP tunnel. In short, the proxy server exposes an HTTP/1.1 proxy over a Unix domain socket, and the proxy client (i.e. Pingora) sends a CONNECT request to it asking for a tunnel to the specified remote `ip:port`. The complete code is shown below:
```rust
async fn proxy_connect<P: Peer>(peer: &P) -> Result<Stream> {
    // safe to unwrap
    let proxy = peer.get_proxy().unwrap();
    let options = peer.get_peer_options().unwrap();

    // combine required and optional headers
    let mut headers = proxy
        .headers
        .iter()
        .chain(options.extra_proxy_headers.iter());

    // not likely to timeout during connect() to UDS
    let stream: Box<Stream> = Box::new(
        connect_uds(&proxy.next_hop)
            .await
            .or_err_with(ConnectError, || {
                format!("CONNECT proxy connect() error to {:?}", &proxy.next_hop)
            })?
            .into(),
    );

    let req_header = raw_connect::generate_connect_header(&proxy.host, proxy.port, &mut headers)?;
    let fut = raw_connect::connect(stream, &req_header);
    let (mut stream, digest) = match peer.connection_timeout() {
        Some(t) => pingora_timeout::timeout(t, fut)
            .await
            .explain_err(ConnectTimedout, |_| "establishing CONNECT proxy")?,
        None => fut.await,
    }
    .map_err(|mut e| {
        // http protocol may ask to retry if reused client
        e.retry.decide_reuse(false);
        e
    })?;
    debug!("CONNECT proxy established: {:?}", proxy);
    stream.set_proxy_digest(digest);
    let stream = stream.into_any().downcast::<Stream>().unwrap(); // safe, it is Stream from above
    Ok(*stream)
}
```
The whole process has three basic parts:

- First, Pingora connects to the configured Unix domain socket path via `connect_uds()`.
- Then it generates the `req_header` with `raw_connect::generate_connect_header()`.
- Finally, it establishes the tunnel with `raw_connect::connect()`.
Let's go through these steps one by one.
### connect_uds
Connecting to a Unix domain socket is straightforward: just call Tokio's `connect()`:
```rust
/// connect() to the given Unix domain socket
pub async fn connect_uds(path: &std::path::Path) -> Result<UnixStream> {
    UnixStream::connect(path)
        .await
        .map_err(|e| wrap_os_connect_error(e, format!("Fail to connect to {}", path.display())))
}
```
### raw_connect::generate_connect_header
Next comes generating the `ReqHeader`.
```rust
/// Generate the CONNECT header for the given destination
pub fn generate_connect_header<'a, H, S>(
    host: &str,
    port: u16,
    headers: H,
) -> Result<Box<ReqHeader>>
where
    S: AsRef<[u8]>,
    H: Iterator<Item = (S, &'a Vec<u8>)>,
{
    // TODO: valid that host doesn't have port
    // TODO: support adding ad-hoc headers

    let authority = format!("{host}:{port}");
    let req = http::request::Builder::new()
        .version(http::Version::HTTP_11)
        .method(http::method::Method::CONNECT)
        .uri(format!("https://{authority}/")) // scheme doesn't matter
        .header(http::header::HOST, &authority);

    let (mut req, _) = match req.body(()) {
        Ok(r) => r.into_parts(),
        Err(e) => {
            return Err(e).or_err(InvalidHTTPHeader, "Invalid CONNECT request");
        }
    };

    for (k, v) in headers {
        let header_name = http::header::HeaderName::from_bytes(k.as_ref())
            .or_err(InvalidHTTPHeader, "Invalid CONNECT request")?;
        let header_value = http::header::HeaderValue::from_bytes(v.as_slice())
            .or_err(InvalidHTTPHeader, "Invalid CONNECT request")?;
        req.headers.insert(header_name, header_value);
    }

    Ok(Box::new(req))
}
```
Here Pingora constructs a valid HTTP CONNECT request with the `http` crate's `Builder` and fills in the proxy headers specified on the `Peer`.
### raw_connect::connect
With everything in place, the last step is simply to establish the connection.
```rust
/// Try to establish a CONNECT proxy via the given `stream`.
///
/// `request_header` should include the necessary request headers for the CONNECT protocol.
///
/// When successful, a [`Stream`] will be returned which is the established CONNECT proxy connection.
pub async fn connect(stream: Stream, request_header: &ReqHeader) -> Result<(Stream, ProxyDigest)> {
    let mut http = HttpSession::new(stream);

    // We write to stream directly because HttpSession doesn't write req header in auth form
    let to_wire = http_req_header_to_wire_auth_form(request_header);
    http.underlying_stream
        .write_all(to_wire.as_ref())
        .await
        .or_err(WriteError, "while writing request headers")?;
    http.underlying_stream
        .flush()
        .await
        .or_err(WriteError, "while flushing request headers")?;

    // TODO: set http.read_timeout
    let resp_header = http.read_resp_header_parts().await?;
    Ok((
        http.underlying_stream,
        validate_connect_response(resp_header)?,
    ))
}
```
The connection process has three parts. First, `request_header` is converted into a wire-format buffer by `http_req_header_to_wire_auth_form`:
```rust
#[inline]
fn http_req_header_to_wire_auth_form(req: &ReqHeader) -> BytesMut {
    let mut buf = BytesMut::with_capacity(512);

    // Request-Line
    let method = req.method.as_str().as_bytes();
    buf.put_slice(method);
    buf.put_u8(b' ');
    // NOTE: CONNECT doesn't need URI path so we just skip that
    if let Some(path) = req.uri.authority() {
        buf.put_slice(path.as_str().as_bytes());
    }
    buf.put_u8(b' ');

    let version = match req.version {
        Version::HTTP_09 => "HTTP/0.9",
        Version::HTTP_10 => "HTTP/1.0",
        Version::HTTP_11 => "HTTP/1.1",
        _ => "HTTP/0.9",
    };
    buf.put_slice(version.as_bytes());
    buf.put_slice(CRLF);

    // headers
    let headers = &req.headers;
    for (key, value) in headers.iter() {
        buf.put_slice(key.as_ref());
        buf.put_slice(HEADER_KV_DELIMITER);
        buf.put_slice(value.as_ref());
        buf.put_slice(CRLF);
    }

    buf.put_slice(CRLF);
    buf
}
```
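For a hypothetical upstream of `10.0.0.1:443` (values made up purely for illustration, and the exact headers depend on the `Peer`'s proxy settings), the buffer produced above would look roughly like this on the wire, and a proxy that accepts the tunnel answers with a plain 2xx response:

```rust
// Illustrative wire format only, not taken from Pingora's tests or docs.
const EXAMPLE_CONNECT_REQUEST: &str =
    "CONNECT 10.0.0.1:443 HTTP/1.1\r\nhost: 10.0.0.1:443\r\n\r\n";
const EXAMPLE_CONNECT_RESPONSE: &str = "HTTP/1.1 200 Connection Established\r\n\r\n";
// After a 2xx response, the same connection is used as a raw tunnel to 10.0.0.1:443.
```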
Then the HTTP server's response is read, and its status code and headers are validated:
```rust
#[inline]
fn validate_connect_response(resp: Box<ResponseHeader>) -> Result<ProxyDigest> {
    if !resp.status.is_success() {
        return Error::e_because(
            ConnectProxyFailure,
            "None 2xx code",
            ConnectProxyError::boxed_new(resp),
        );
    }

    // Checking Content-Length and Transfer-Encoding is optional because we already ignore them.
    // We choose to do so because we want to be strict for internal use of CONNECT.
    // Ignore Content-Length header because our internal CONNECT server is coded to send it.
    if resp.headers.get(http::header::TRANSFER_ENCODING).is_some() {
        return Error::e_because(
            ConnectProxyFailure,
            "Invalid Transfer-Encoding presents",
            ConnectProxyError::boxed_new(resp),
        );
    }
    Ok(ProxyDigest::new(resp))
}
```
Finally, the `http` session's `underlying_stream` is returned. At this point the TCP connection is established, and Pingora can carry on its TCP communication through this tunnel.