
Learning Pingora 04 - Establish L4 Connection


Now that we have the Peer we want to connect to, the next step is to actually establish the connection. Pingora currently implements three protocols: TCP (L4), TLS, and HTTP.

We will cover them in that order. In this post we look at the lowest layer of the three: the TCP (L4) connection.


Directory structure

In Pingora, protocol-level code is defined under pingora-core::protocols, while the corresponding connection logic lives under pingora-core::connectors. Taking the L4 connection as an example, the files we will mainly study are pingora-core/src/connectors/l4.rs, pingora-core/src/protocols/l4/ext.rs, pingora-core/src/protocols/l4/stream.rs, and pingora-core/src/protocols/raw_connect.rs.

The connect method in l4.rs takes two parameters, a Peer and an optional bind_to SocketAddr, and ultimately returns an L4 Stream.

connect supports both Inet and Unix domain sockets. We will start from the Inet path to walk through the basic connection flow; the Unix domain socket implementation is largely similar and, overall, simpler than the Inet one, so we won't go through it in detail.

The connection code is shown below:

pingora-core/src/connectors/l4.rs

```rust
/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
where
    P: Peer + Send + Sync,
{
    if peer.get_proxy().is_some() {
        return proxy_connect(peer)
            .await
            .err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
    }
    let peer_addr = peer.address();
    let mut stream: Stream = match peer_addr {
        SocketAddr::Inet(addr) => {
            let connect_future = tcp_connect(addr, bind_to.as_ref());
            let conn_res = match peer.connection_timeout() {
                Some(t) => pingora_timeout::timeout(t, connect_future)
                    .await
                    .explain_err(ConnectTimedout, |_| {
                        format!("timeout {t:?} connecting to server {peer}")
                    })?,
                None => connect_future.await,
            };
            match conn_res {
                Ok(socket) => {
                    debug!("connected to new server: {}", peer.address());
                    if let Some(ka) = peer.tcp_keepalive() {
                        debug!("Setting tcp keepalive");
                        set_tcp_keepalive(&socket, ka)?;
                    }
                    Ok(socket.into())
                }
                Err(e) => {
                    let c = format!("Fail to connect to {peer}");
                    match e.etype() {
                        SocketError | BindError => Error::e_because(InternalError, c, e),
                        _ => Err(e.more_context(c)),
                    }
                }
            }
        }
        SocketAddr::Unix(addr) => {
            let connect_future = connect_uds(
                addr.as_pathname()
                    .expect("non-pathname unix sockets not supported as peer"),
            );
            let conn_res = match peer.connection_timeout() {
                Some(t) => pingora_timeout::timeout(t, connect_future)
                    .await
                    .explain_err(ConnectTimedout, |_| {
                        format!("timeout {t:?} connecting to server {peer}")
                    })?,
                None => connect_future.await,
            };
            match conn_res {
                Ok(socket) => {
                    debug!("connected to new server: {}", peer.address());
                    // no SO_KEEPALIVE for UDS
                    Ok(socket.into())
                }
                Err(e) => {
                    let c = format!("Fail to connect to {peer}");
                    match e.etype() {
                        SocketError | BindError => Error::e_because(InternalError, c, e),
                        _ => Err(e.more_context(c)),
                    }
                }
            }
        }
    }?;
    let tracer = peer.get_tracer();
    if let Some(t) = tracer {
        t.0.on_connected();
        stream.tracer = Some(t);
    }

    stream.set_nodelay()?;

    let digest = SocketDigest::from_raw_fd(stream.as_raw_fd());
    digest
        .peer_addr
        .set(Some(peer_addr.clone()))
        .expect("newly created OnceCell must be empty");
    stream.set_socket_digest(digest);

    Ok(stream)
}
```

Following the code above, the logic for establishing an l4 connection breaks down roughly as follows (a usage sketch follows the list):

  1. First, the connection itself: tcp_connect, i.e. l4::ext::connect, is called to establish the connection.
  2. Next, tcp_keepalive: if the Peer asks for this option, it is applied via l4::ext::set_tcp_keepalive.
  3. Finally, set_nodelay: because Pingora implements a userspace write buffer on top of Tokio, Nagle's algorithm has to be disabled here.
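To make the entry point concrete, here is a minimal, hedged sketch of driving this connector. It assumes pingora-core's public API surface (TransportConnector and BasicPeer; the address is made up); application code normally goes through TransportConnector, which invokes l4::connect internally, rather than calling l4::connect directly:

```rust
use pingora_core::connectors::TransportConnector;
use pingora_core::upstreams::peer::BasicPeer;

// Sketch only: establish an L4 stream to an upstream peer.
// TransportConnector::new_stream() runs the walkthrough above
// (connect -> keepalive -> nodelay) using the settings carried
// by the Peer.
async fn l4_connect_demo() -> pingora_core::Result<()> {
    let connector = TransportConnector::new(None);
    let peer = BasicPeer::new("127.0.0.1:8080");
    let stream = connector.new_stream(&peer).await?;
    // established_ts is set when the connection was created
    debug_assert!(stream.established_ts.elapsed().is_ok());
    Ok(())
}
```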

tcp_connect

The connection process in l4::ext::connect is basically the same as connecting with plain Tokio; the difference is the added bind_to support.

pingora-core/src/protocols/l4/ext.rs

```rust
/*
 * this extension is needed until the following are addressed
 * https://github.com/tokio-rs/tokio/issues/1543
 * https://github.com/tokio-rs/mio/issues/1257
 * https://github.com/tokio-rs/mio/issues/1211
 */
/// connect() to the given address while optionally bind to the specific source address
///
/// `IP_BIND_ADDRESS_NO_PORT` is used.
pub async fn connect(addr: &SocketAddr, bind_to: Option<&SocketAddr>) -> Result<TcpStream> {
    let socket = if addr.is_ipv4() {
        TcpSocket::new_v4()
    } else {
        TcpSocket::new_v6()
    }
    .or_err(SocketError, "failed to create socket")?;

    if cfg!(target_os = "linux") {
        ip_bind_addr_no_port(socket.as_raw_fd(), true)
            .or_err(SocketError, "failed to set socket opts")?;

        if let Some(baddr) = bind_to {
            socket
                .bind(*baddr)
                .or_err_with(BindError, || format!("failed to bind to socket {}", *baddr))?;
        };
    }
    // TODO: add support for bind on other platforms

    socket
        .connect(*addr)
        .await
        .map_err(|e| wrap_os_connect_error(e, format!("Fail to connect to {}", *addr)))
}
```

First, Pingora calls ip_bind_addr_no_port, asking the OS not to assign a port automatically.

pingora-core/src/protocols/l4/ext.rs

```rust
#[cfg(target_os = "linux")]
fn ip_bind_addr_no_port(fd: RawFd, val: bool) -> io::Result<()> {
    const IP_BIND_ADDRESS_NO_PORT: i32 = 24;

    set_opt(fd, libc::IPPROTO_IP, IP_BIND_ADDRESS_NO_PORT, val as c_int)
}

#[cfg(not(target_os = "linux"))]
fn ip_bind_addr_no_port(_fd: RawFd, _val: bool) -> io::Result<()> {
    Ok(())
}
```

Pingora currently implements ip_bind_addr_no_port properly only on Linux; other platforms get a dummy implementation. On Linux, Pingora sets the IP_BIND_ADDRESS_NO_PORT option via setsockopt.

Having made sure the OS will not auto-assign a port at bind time, Pingora then manually binds the socket to the SocketAddr requested by bind_to.
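For illustration, this is roughly what that setsockopt call looks like when spelled out with libc directly (a standalone, Linux-only sketch, not Pingora's set_opt helper). With the option enabled, bind() records the source address but defers port selection until connect(), so an ephemeral port is consumed per (source, destination) four-tuple rather than per bound socket:

```rust
use std::os::fd::RawFd;

use libc::{c_int, c_void, setsockopt, socklen_t, IPPROTO_IP};

// Linux defines IP_BIND_ADDRESS_NO_PORT = 24 (see ip(7)).
const IP_BIND_ADDRESS_NO_PORT: c_int = 24;

// Enable/disable IP_BIND_ADDRESS_NO_PORT on a raw socket fd.
fn ip_bind_addr_no_port(fd: RawFd, enable: bool) -> std::io::Result<()> {
    let val: c_int = enable as c_int;
    // SAFETY: fd must be a valid socket; val lives across the call.
    let ret = unsafe {
        setsockopt(
            fd,
            IPPROTO_IP,
            IP_BIND_ADDRESS_NO_PORT,
            &val as *const c_int as *const c_void,
            std::mem::size_of::<c_int>() as socklen_t,
        )
    };
    if ret == 0 {
        Ok(())
    } else {
        Err(std::io::Error::last_os_error())
    }
}
```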

Stream

Once l4::ext::connect succeeds, what we have is a tokio::net::tcp::TcpStream, but connect must return a Stream. The conversion from TcpStream to Stream is defined alongside protocols::l4::Stream.

The l4::Stream Pingora defines is a BufStream with both read and write buffering, where the write buffering can optionally be disabled. As the code below shows:

pingora-core/src/protocols/l4/stream.rs

```rust
/// A concrete type for transport layer connection + extra fields for logging
#[derive(Debug)]
pub struct Stream {
    stream: BufStream<RawStream>,
    buffer_write: bool,
    proxy_digest: Option<Arc<ProxyDigest>>,
    socket_digest: Option<Arc<SocketDigest>>,
    /// When this connection is established
    pub established_ts: SystemTime,
    /// The distributed tracing object for this stream
    pub tracer: Option<Tracer>,
}

impl Stream {
    /// set TCP nodelay for this connection if `self` is TCP
    pub fn set_nodelay(&mut self) -> Result<()> {
        if let RawStream::Tcp(s) = &self.stream.get_ref() {
            s.set_nodelay(true)
                .or_err(ConnectError, "failed to set_nodelay")?;
        }
        Ok(())
    }
}
```

TcpStream -> Stream

Pingora implements From, so both TcpStream and UnixStream can be converted into a Stream via into():

pingora-core/src/protocols/l4/stream.rs

```rust
// Large read buffering helps reducing syscalls with little trade-off
// Ssl layer always does "small" reads in 16k (TLS record size) so L4 read buffer helps a lot.
const BUF_READ_SIZE: usize = 64 * 1024;
// Small write buf to match MSS. Too large write buf delays real time communication.
// This buffering effectively implements something similar to Nagle's algorithm.
// The benefit is that user space can control when to flush, where Nagle's can't be controlled.
// And userspace buffering reduce both syscalls and small packets.
const BUF_WRITE_SIZE: usize = 1460;

// NOTE: with writer buffering, users need to call flush() to make sure the data is actually
// sent. Otherwise data could be stuck in the buffer forever or get lost when stream is closed.

// ... (the Stream struct and its set_nodelay impl, shown above) ...

impl From<TcpStream> for Stream {
    fn from(s: TcpStream) -> Self {
        Stream {
            stream: BufStream::with_capacity(BUF_READ_SIZE, BUF_WRITE_SIZE, RawStream::Tcp(s)),
            buffer_write: true,
            established_ts: SystemTime::now(),
            proxy_digest: None,
            socket_digest: None,
            tracer: None,
        }
    }
}

impl From<UnixStream> for Stream {
    fn from(s: UnixStream) -> Self {
        Stream {
            stream: BufStream::with_capacity(BUF_READ_SIZE, BUF_WRITE_SIZE, RawStream::Unix(s)),
            buffer_write: true,
            established_ts: SystemTime::now(),
            proxy_digest: None,
            socket_digest: None,
            tracer: None,
        }
    }
}
```

The key points here are the values chosen for the two constants:

  1. BUF_READ_SIZE: 64 KB. Since the TLS record size is 16 KB, reading in 64 KB chunks aligns with it and reduces the number of syscalls.
  2. BUF_WRITE_SIZE: 1460, matching the MSS. Together with Tokio's userspace write buffering, this effectively replaces the kernel's Nagle's algorithm (see the flush sketch below).
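The NOTE in the excerpt above matters in practice: with write buffering, bytes sit in userspace until flushed. A small sketch with plain tokio::io::BufStream, the same type Pingora wraps (the address and payload here are made up):

```rust
use tokio::io::{AsyncWriteExt, BufStream};
use tokio::net::TcpStream;

// Demonstrates the flush() contract of a buffered writer: write_all()
// may leave everything in the userspace buffer; flush() pushes it to
// the kernel. Pingora's Stream inherits this behavior from BufStream.
async fn buffered_write_demo() -> std::io::Result<()> {
    let tcp = TcpStream::connect("127.0.0.1:8080").await?;
    let mut stream = BufStream::with_capacity(64 * 1024, 1460, tcp);
    stream.write_all(b"ping").await?; // likely still buffered here
    stream.flush().await?; // now the bytes actually hit the socket
    Ok(())
}
```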

Stream::drop

Stream's Drop implementation is also quite interesting. Let's take a look:

pingora-core/src/protocols/l4/stream.rs

```rust
impl Drop for Stream {
    fn drop(&mut self) {
        if let Some(t) = self.tracer.as_ref() {
            t.0.on_disconnected();
        }
        /* use nodelay/local_addr function to detect socket status */
        let ret = match &self.stream.get_ref() {
            RawStream::Tcp(s) => s.nodelay().err(),
            RawStream::Unix(s) => s.local_addr().err(),
        };
        if let Some(e) = ret {
            match e.kind() {
                tokio::io::ErrorKind::Other => {
                    if let Some(ecode) = e.raw_os_error() {
                        if ecode == 9 {
                            // Or we could panic here
                            error!("Crit: socket {:?} is being double closed", self.stream);
                        }
                    }
                }
                _ => {
                    debug!("Socket is already broken {:?}", e);
                }
            }
        } else {
            // try flush the write buffer. We use now_or_never() because
            // 1. Drop cannot be async
            // 2. write should usually be ready, unless the buf is full.
            let _ = self.flush().now_or_never();
        }
        debug!("Dropping socket {:?}", self.stream);
    }
}
```

[1]Pingora 分别通过 nodelaylocal_addr 尝试获取 Stream 的状态。而在 [2],当 Stream 仍然可写时,则调用了自身的 flush(),并且通过 now_or_never() 尝试立即将缓存清空。

Stream::poll_write

Another interesting detail is how Pingora bypasses the write buffer:

pingora-core/src/protocols/l4/stream.rs

```rust
impl AsyncWrite for Stream {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        if self.buffer_write {
            Pin::new(&mut self.stream).poll_write(cx, buf)
        } else {
            Pin::new(&mut self.stream.get_mut()).poll_write(cx, buf)
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Pin::new(&mut self.stream).poll_flush(cx)
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Pin::new(&mut self.stream).poll_shutdown(cx)
    }

    fn poll_write_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        if self.buffer_write {
            Pin::new(&mut self.stream).poll_write_vectored(cx, bufs)
        } else {
            Pin::new(&mut self.stream.get_mut()).poll_write_vectored(cx, bufs)
        }
    }

    fn is_write_vectored(&self) -> bool {
        if self.buffer_write {
            self.stream.is_write_vectored() // it is true
        } else {
            self.stream.get_ref().is_write_vectored()
        }
    }
}
```

When buffer_write is enabled, poll_write is simply called through self.stream. When buffering is not wanted, get_mut() is used to reach the underlying I/O object directly, and writes to the underlying stream naturally skip BufStream's buffer.
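The same trick works on any BufStream. A hedged sketch with generic tokio types (not Pingora's RawStream wrapper):

```rust
use tokio::io::{AsyncWriteExt, BufStream};
use tokio::net::TcpStream;

// Writing through get_mut() reaches the inner TcpStream directly and
// therefore skips BufStream's write buffer, just like poll_write does
// above when buffer_write is false.
async fn unbuffered_write(stream: &mut BufStream<TcpStream>) -> std::io::Result<()> {
    stream.get_mut().write_all(b"bypass the buffer").await?;
    Ok(())
}
```

Mixing buffered and unbuffered writes on the same stream is only safe when the write buffer is empty; otherwise bytes can reach the wire out of order, so the buffer should be flushed before switching paths.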

L4 proxying over an HTTP tunnel

When describing the implementation of l4::connect above, we skipped over the proxy_connect part. The proxy branch looks like this:

pingora-core/src/connectors/l4.rs

```rust
/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
where
    P: Peer + Send + Sync,
{
    if peer.get_proxy().is_some() {
        return proxy_connect(peer)
            .await
            .err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
    }
    let peer_addr = peer.address();
    let mut stream: Stream = match peer_addr {
    // ...
```

In Pingora, the L4 proxy is implemented over an HTTP tunnel. In short, the proxy server exposes an HTTP/1.1 proxy over a Unix domain socket, and the proxy client (i.e. Pingora) issues a CONNECT request to reach the specified remote ip:port.
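As a refresher on plain CONNECT semantics (generic HTTP here, not Pingora-specific), the exchange over the proxy connection looks roughly like this; once a 2xx response arrives, the connection becomes an opaque byte tunnel:

```
CONNECT 1.2.3.4:443 HTTP/1.1
Host: 1.2.3.4:443

HTTP/1.1 200 OK

<from here on, raw bytes are relayed between the client and 1.2.3.4:443>
```

With that picture in mind, the full proxy_connect code is shown below: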

pingora-core/src/connectors/l4.rs

```rust
async fn proxy_connect<P: Peer>(peer: &P) -> Result<Stream> {
    // safe to unwrap
    let proxy = peer.get_proxy().unwrap();
    let options = peer.get_peer_options().unwrap();

    // combine required and optional headers
    let mut headers = proxy
        .headers
        .iter()
        .chain(options.extra_proxy_headers.iter());

    // not likely to timeout during connect() to UDS
    let stream: Box<Stream> = Box::new(
        connect_uds(&proxy.next_hop)
            .await
            .or_err_with(ConnectError, || {
                format!("CONNECT proxy connect() error to {:?}", &proxy.next_hop)
            })?
            .into(),
    );

    let req_header = raw_connect::generate_connect_header(&proxy.host, proxy.port, &mut headers)?;
    let fut = raw_connect::connect(stream, &req_header);
    let (mut stream, digest) = match peer.connection_timeout() {
        Some(t) => pingora_timeout::timeout(t, fut)
            .await
            .explain_err(ConnectTimedout, |_| "establishing CONNECT proxy")?,
        None => fut.await,
    }
    .map_err(|mut e| {
        // http protocol may ask to retry if reused client
        e.retry.decide_reuse(false);
        e
    })?;
    debug!("CONNECT proxy established: {:?}", proxy);
    stream.set_proxy_digest(digest);
    let stream = stream.into_any().downcast::<Stream>().unwrap(); // safe, it is Stream from above
    Ok(*stream)
}
```

The whole flow has three basic parts:

  1. First, Pingora connects to the configured Unix domain socket path via connect_uds().
  2. Then, raw_connect::generate_connect_header() builds the req_header.
  3. Finally, raw_connect::connect() establishes the tunnel.

Let's walk through them one by one.

connect_uds

Connecting to a Unix domain socket is simple: just call tokio's UnixStream::connect():

pingora-core/src/protocols/l4/ext.rs

```rust
/// connect() to the given Unix domain socket
pub async fn connect_uds(path: &std::path::Path) -> Result<UnixStream> {
    UnixStream::connect(path)
        .await
        .map_err(|e| wrap_os_connect_error(e, format!("Fail to connect to {}", path.display())))
}
```

raw_connect::generate_connect_header

Next up is generating the ReqHeader.

pingora-core/src/protocols/raw_connect.rs

```rust
/// Generate the CONNECT header for the given destination
pub fn generate_connect_header<'a, H, S>(
    host: &str,
    port: u16,
    headers: H,
) -> Result<Box<ReqHeader>>
where
    S: AsRef<[u8]>,
    H: Iterator<Item = (S, &'a Vec<u8>)>,
{
    // TODO: valid that host doesn't have port
    // TODO: support adding ad-hoc headers

    let authority = format!("{host}:{port}");
    let req = http::request::Builder::new()
        .version(http::Version::HTTP_11)
        .method(http::method::Method::CONNECT)
        .uri(format!("https://{authority}/")) // scheme doesn't matter
        .header(http::header::HOST, &authority);

    let (mut req, _) = match req.body(()) {
        Ok(r) => r.into_parts(),
        Err(e) => {
            return Err(e).or_err(InvalidHTTPHeader, "Invalid CONNECT request");
        }
    };

    for (k, v) in headers {
        let header_name = http::header::HeaderName::from_bytes(k.as_ref())
            .or_err(InvalidHTTPHeader, "Invalid CONNECT request")?;
        let header_value = http::header::HeaderValue::from_bytes(v.as_slice())
            .or_err(InvalidHTTPHeader, "Invalid CONNECT request")?;
        req.headers.insert(header_name, header_value);
    }

    Ok(Box::new(req))
}
```

Here Pingora uses the http crate's Builder to construct a valid HTTP CONNECT request, then fills in the proxy headers specified on the Peer.
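A hedged usage sketch of this function, matching the signature above (the host, port, and extra header are invented for illustration; note the iterator item type must be (impl AsRef<[u8]>, &Vec<u8>)):

```rust
use pingora_core::protocols::raw_connect::generate_connect_header;

// Build a CONNECT request for example.org:443 with one extra header.
fn build_connect_request() -> pingora_core::Result<()> {
    let extra: Vec<(String, Vec<u8>)> =
        vec![("x-tunnel-id".to_string(), b"demo".to_vec())];
    let req = generate_connect_header(
        "example.org",
        443,
        extra.iter().map(|(k, v)| (k.as_str(), v)),
    )?;
    assert_eq!(req.method, http::Method::CONNECT);
    assert_eq!(req.uri.authority().unwrap().as_str(), "example.org:443");
    Ok(())
}
```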

raw_connect::connect

With everything in place, all that's left is to establish the connection.

pingora-core/src/protocols/raw_connect.rs

```rust
/// Try to establish a CONNECT proxy via the given `stream`.
///
/// `request_header` should include the necessary request headers for the CONNECT protocol.
///
/// When successful, a [`Stream`] will be returned which is the established CONNECT proxy connection.
pub async fn connect(stream: Stream, request_header: &ReqHeader) -> Result<(Stream, ProxyDigest)> {
    let mut http = HttpSession::new(stream);

    // We write to stream directly because HttpSession doesn't write req header in auth form
    let to_wire = http_req_header_to_wire_auth_form(request_header);
    http.underlying_stream
        .write_all(to_wire.as_ref())
        .await
        .or_err(WriteError, "while writing request headers")?;
    http.underlying_stream
        .flush()
        .await
        .or_err(WriteError, "while flushing request headers")?;

    // TODO: set http.read_timeout
    let resp_header = http.read_resp_header_parts().await?;
    Ok((
        http.underlying_stream,
        validate_connect_response(resp_header)?,
    ))
}
```

The connect call itself has three parts. First, request_header is serialized to a wire buffer via http_req_header_to_wire_auth_form:

pingora-core/src/protocols/raw_connect.rs

```rust
#[inline]
fn http_req_header_to_wire_auth_form(req: &ReqHeader) -> BytesMut {
    let mut buf = BytesMut::with_capacity(512);

    // Request-Line
    let method = req.method.as_str().as_bytes();
    buf.put_slice(method);
    buf.put_u8(b' ');
    // NOTE: CONNECT doesn't need URI path so we just skip that
    if let Some(path) = req.uri.authority() {
        buf.put_slice(path.as_str().as_bytes());
    }
    buf.put_u8(b' ');

    let version = match req.version {
        Version::HTTP_09 => "HTTP/0.9",
        Version::HTTP_10 => "HTTP/1.0",
        Version::HTTP_11 => "HTTP/1.1",
        _ => "HTTP/0.9",
    };
    buf.put_slice(version.as_bytes());
    buf.put_slice(CRLF);

    // headers
    let headers = &req.headers;
    for (key, value) in headers.iter() {
        buf.put_slice(key.as_ref());
        buf.put_slice(HEADER_KV_DELIMITER);
        buf.put_slice(value.as_ref());
        buf.put_slice(CRLF);
    }

    buf.put_slice(CRLF);
    buf
}
```

Then the HTTP server's response status and headers are read and validated:

pingora-core/src/protocols/raw_connect.rs

```rust
#[inline]
fn validate_connect_response(resp: Box<ResponseHeader>) -> Result<ProxyDigest> {
    if !resp.status.is_success() {
        return Error::e_because(
            ConnectProxyFailure,
            "None 2xx code",
            ConnectProxyError::boxed_new(resp),
        );
    }

    // Checking Content-Length and Transfer-Encoding is optional because we already ignore them.
    // We choose to do so because we want to be strict for internal use of CONNECT.
    // Ignore Content-Length header because our internal CONNECT server is coded to send it.
    if resp.headers.get(http::header::TRANSFER_ENCODING).is_some() {
        return Error::e_because(
            ConnectProxyFailure,
            "Invalid Transfer-Encoding presents",
            ConnectProxyError::boxed_new(resp),
        );
    }
    Ok(ProxyDigest::new(resp))
}
```

Finally, the http session's underlying_stream is returned. At this point the TCP tunnel is established, and Pingora can carry on its TCP traffic over this channel.