Skip to content

Learning Pingora 03 - Upstreams and Peers

Updated: at 17:20

在讲完一些有的没的(?)之后,我们继续回到 Pingora 的主线。这一篇我们来看一看 Peer,也就是 pingora-core::upstreams 中的一些结构与实现。

ToC

从 Example 继续

回到 Getting Started 开头的 Example。这次我们的注意力放在 HttpPeer 上:

1
use async_trait::async_trait;
2
use pingora::prelude::*;
3
use std::sync::Arc;
4
5
pub struct LB(Arc<LoadBalancer<RoundRobin>>);
6
7
#[async_trait]
8
impl ProxyHttp for LB {
7 collapsed lines
9
/// For this small example, we don't need context storage
10
type CTX = ();
11
12
fn new_ctx(&self) -> () {
13
()
14
}
15
16
async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
17
let upstream = self
18
.0
19
.select(b"", 256) // hash doesn't matter for round robin
20
.unwrap();
21
22
println!("upstream peer is: {upstream:?}");
23
24
// Set SNI to one.one.one.one
25
let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
26
Ok(peer)
27
}
25 collapsed lines
28
29
async fn upstream_request_filter(
30
&self,
31
_session: &mut Session,
32
upstream_request: &mut RequestHeader,
33
_ctx: &mut Self::CTX,
34
) -> Result<()> {
35
upstream_request
36
.insert_header("Host", "one.one.one.one")
37
.unwrap();
38
Ok(())
39
}
40
}
41
42
fn main() {
43
let mut my_server = Server::new(None).unwrap();
44
my_server.bootstrap();
45
46
let upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
47
let mut lb = http_proxy_service(&my_server.configuration, LB(Arc::new(upstreams)));
48
lb.add_tcp("0.0.0.0:6188");
49
50
my_server.add_service(lb);
51
my_server.run_forever();
52
}

可以看到,在实现 ProxyHttp 的过程中,我们需要返回一个 HttpPeer 用于连接。HttpPeer,以及它实现了的 trait Peer,都是定义在 pingora-core 中的结构。让我们深入看看。

trait Peer

PeerConnector 在连接时用于承载连接参数的结构。它的 Trait 定义如下:

pingora-core/src/upstreams/peer.rs
56
/// [`Peer`] defines the interface to communicate with the [`crate::connectors`] regarding where to
57
/// connect to and how to connect to it.
58
pub trait Peer: Display + Clone {
59
/// The remote address to connect to
60
fn address(&self) -> &SocketAddr;
61
/// If TLS should be used;
62
fn tls(&self) -> bool;
63
/// The SNI to send, if TLS is used
64
fn sni(&self) -> &str;
65
/// To decide whether a [`Peer`] can use the connection established by another [`Peer`].
66
///
67
/// The connection to two peers are considered reusable to each other if their reuse hashes are
68
/// the same
69
fn reuse_hash(&self) -> u64;
103 collapsed lines
70
/// Get the proxy setting to connect to the remote server
71
fn get_proxy(&self) -> Option<&Proxy> {
72
None
73
}
74
/// Get the additional options to connect to the peer.
75
///
76
/// See [`PeerOptions`] for more details
77
fn get_peer_options(&self) -> Option<&PeerOptions> {
78
None
79
}
80
/// Get the additional options for modification.
81
fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
82
None
83
}
84
/// Whether the TLS handshake should validate the cert of the server.
85
fn verify_cert(&self) -> bool {
86
match self.get_peer_options() {
87
Some(opt) => opt.verify_cert,
88
None => false,
89
}
90
}
91
/// Whether the TLS handshake should verify that the server cert matches the SNI.
92
fn verify_hostname(&self) -> bool {
93
match self.get_peer_options() {
94
Some(opt) => opt.verify_hostname,
95
None => false,
96
}
97
}
98
/// The alternative common name to use to verify the server cert.
99
///
100
/// If the server cert doesn't match the SNI, this name will be used to
101
/// verify the cert.
102
fn alternative_cn(&self) -> Option<&String> {
103
match self.get_peer_options() {
104
Some(opt) => opt.alternative_cn.as_ref(),
105
None => None,
106
}
107
}
108
/// Which local source address this connection should be bind to.
109
fn bind_to(&self) -> Option<&InetSocketAddr> {
110
match self.get_peer_options() {
111
Some(opt) => opt.bind_to.as_ref(),
112
None => None,
113
}
114
}
115
/// How long connect() call should be wait before it returns a timeout error.
116
fn connection_timeout(&self) -> Option<Duration> {
117
match self.get_peer_options() {
118
Some(opt) => opt.connection_timeout,
119
None => None,
120
}
121
}
122
/// How long the overall connection establishment should take before a timeout error is returned.
123
fn total_connection_timeout(&self) -> Option<Duration> {
124
match self.get_peer_options() {
125
Some(opt) => opt.total_connection_timeout,
126
None => None,
127
}
128
}
129
/// If the connection can be reused, how long the connection should wait to be reused before it
130
/// shuts down.
131
fn idle_timeout(&self) -> Option<Duration> {
132
self.get_peer_options().and_then(|o| o.idle_timeout)
133
}
134
135
/// Get the ALPN preference.
136
fn get_alpn(&self) -> Option<&ALPN> {
137
self.get_peer_options().map(|opt| &opt.alpn)
138
}
139
140
/// Get the CA cert to use to validate the server cert.
141
///
142
/// If not set, the default CAs will be used.
143
fn get_ca(&self) -> Option<&Arc<Box<[X509]>>> {
144
match self.get_peer_options() {
145
Some(opt) => opt.ca.as_ref(),
146
None => None,
147
}
148
}
149
150
/// Get the client cert and key for mutual TLS if any
151
fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
152
None
153
}
154
155
/// The TCP keepalive setting that should be applied to this connection
156
fn tcp_keepalive(&self) -> Option<&TcpKeepalive> {
157
self.get_peer_options()
158
.and_then(|o| o.tcp_keepalive.as_ref())
159
}
160
161
/// The interval H2 pings to send to the server if any
162
fn h2_ping_interval(&self) -> Option<Duration> {
163
self.get_peer_options().and_then(|o| o.h2_ping_interval)
164
}
165
166
fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
167
self.address().check_fd_match(fd)
168
}
169
170
fn get_tracer(&self) -> Option<Tracer> {
171
None
172
}
173
}

为了节省篇幅,这里将大量拥有默认实现的 trait Method 折叠了。有需要可以自行展开观看。从最核心的角度出发,Peer 必须实现的,也就是连接的过程中必须告知 Connector 的属性有:

方法简介
address()连接的地址。
tls()是否要以 TLS 连接。
sni()当使用 TLS 时的 SNI
reuse_hash()是否应该复用连接。当这个值相等时,则可以复用。

初次之外,还有一些有用的属性,包括:

Tracing 与 Tracer

trait Peer 中,定义了一种 Tracer 类型。这也是 Peer 用于追踪的手段。Tracer 需要实现 trace Tracing,负责在连接成功失败时被调用。定义如下:

pingora-core/src/upstreams/peer.rs
36
/// The interface to trace the connection
37
pub trait Tracing: Send + Sync + std::fmt::Debug {
38
/// This method is called when successfully connected to a remote server
39
fn on_connected(&self);
40
/// This method is called when the connection is disconnected.
41
fn on_disconnected(&self);
42
/// A way to clone itself
43
fn boxed_clone(&self) -> Box<dyn Tracing>;
44
}
45
46
/// An object-safe version of Tracing object that can use Clone
47
#[derive(Debug)]
48
pub struct Tracer(pub Box<dyn Tracing>);

BasicPeer

在了解完 trait Peer 的定义后,我们来看一个简单的 Peer 实现:BasicPeerPingora 在一些简单的场景中会用到 BasicPeer,比如 TcpHealthCheck 和单元测试。这个实现也比较粗糙,目前只支持建立到 address 的非 TLS 连接。源码非常简单,基本就是把 field 填了一下。如下所示:

pingora-core/src/upstreams/peer.rs
75
/// A simple TCP or TLS peer without many complicated settings.
76
#[derive(Debug, Clone)]
77
pub struct BasicPeer {
78
pub _address: SocketAddr,
79
pub sni: String,
80
pub options: PeerOptions,
81
}
82
83
impl BasicPeer {
84
/// Create a new [`BasicPeer`]
85
pub fn new(address: &str) -> Self {
86
BasicPeer {
87
_address: SocketAddr::Inet(address.parse().unwrap()), // TODO: check error, add support
88
// for UDS
89
sni: "".to_string(), // TODO: add support for SNI
90
options: PeerOptions::new(),
91
}
92
}
93
}
7 collapsed lines
94
95
impl Display for BasicPeer {
96
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
97
write!(f, "{:?}", self)
98
}
99
}
100
101
impl Peer for BasicPeer {
102
fn address(&self) -> &SocketAddr {
103
&self._address
104
}
105
106
fn tls(&self) -> bool {
107
!self.sni.is_empty()
108
}
109
110
fn bind_to(&self) -> Option<&InetSocketAddr> {
111
None
112
}
113
114
fn sni(&self) -> &str {
115
&self.sni
116
}
117
118
// TODO: change connection pool to accept u64 instead of String
119
fn reuse_hash(&self) -> u64 {
120
let mut hasher = AHasher::default();
121
self._address.hash(&mut hasher);
122
hasher.finish()
123
}
124
125
fn get_peer_options(&self) -> Option<&PeerOptions> {
126
Some(&self.options)
127
}
128
}

HttpPeer

看完了比较简单的,再来看看相对复杂一些的。HttpPeer,也就是我们在 Example 中用到的结构。相比 BasicPeer,它额外支持了 httpsSNI、代理、客户端证书,还有诸如 UDS(Unix Domain Socket) 的使用。

pingora-core/src/upstreams/peer.rs
364
/// A peer representing the remote HTTP server to connect to
365
#[derive(Debug, Clone)]
366
pub struct HttpPeer {
367
pub _address: SocketAddr,
368
pub scheme: Scheme,
369
pub sni: String,
370
pub proxy: Option<Proxy>,
371
pub client_cert_key: Option<Arc<CertKey>>,
372
pub options: PeerOptions,
373
}
374
375
impl HttpPeer {
376
// These methods are pretty ad-hoc
377
pub fn is_tls(&self) -> bool {
378
match self.scheme {
379
Scheme::HTTP => false,
380
Scheme::HTTPS => true,
381
}
382
}
383
384
fn new_from_sockaddr(address: SocketAddr, tls: bool, sni: String) -> Self {
385
HttpPeer {
386
_address: address,
387
scheme: Scheme::from_tls_bool(tls),
388
sni,
389
proxy: None,
390
client_cert_key: None,
391
options: PeerOptions::new(),
392
}
393
}
394
395
/// Create a new [`HttpPeer`] with the given socket address and TLS settings.
396
pub fn new<A: ToInetSocketAddrs>(address: A, tls: bool, sni: String) -> Self {
397
let mut addrs_iter = address.to_socket_addrs().unwrap(); //TODO: handle error
398
let addr = addrs_iter.next().unwrap();
399
Self::new_from_sockaddr(SocketAddr::Inet(addr), tls, sni)
400
}
401
402
/// Create a new [`HttpPeer`] with the given path to Unix domain socket and TLS settings.
403
pub fn new_uds(path: &str, tls: bool, sni: String) -> Self {
404
let addr = SocketAddr::Unix(UnixSocketAddr::from_pathname(Path::new(path)).unwrap()); //TODO: handle error
405
Self::new_from_sockaddr(addr, tls, sni)
406
}
407
408
/// Create a new [`HttpPeer`] that uses a proxy to connect to the upstream IP and port
409
/// combination.
410
pub fn new_proxy(
411
next_hop: &str,
412
ip_addr: IpAddr,
413
port: u16,
414
tls: bool,
415
sni: &str,
416
headers: BTreeMap<String, Vec<u8>>,
417
) -> Self {
418
HttpPeer {
419
_address: SocketAddr::Inet(InetSocketAddr::new(ip_addr, port)),
420
scheme: Scheme::from_tls_bool(tls),
421
sni: sni.to_string(),
422
proxy: Some(Proxy {
423
next_hop: PathBuf::from(next_hop).into(),
424
host: ip_addr.to_string(),
425
port,
426
headers,
427
}),
428
client_cert_key: None,
429
options: PeerOptions::new(),
430
}
431
}
432
433
fn peer_hash(&self) -> u64 {
434
let mut hasher = AHasher::default();
435
self.hash(&mut hasher);
436
hasher.finish()
437
}
438
}
32 collapsed lines
439
440
impl Hash for HttpPeer {
441
fn hash<H: Hasher>(&self, state: &mut H) {
442
self._address.hash(state);
443
self.scheme.hash(state);
444
self.proxy.hash(state);
445
self.sni.hash(state);
446
// client cert serial
447
self.client_cert_key.hash(state);
448
// origin server cert verification
449
self.verify_cert().hash(state);
450
self.verify_hostname().hash(state);
451
self.alternative_cn().hash(state);
452
}
453
}
454
455
impl Display for HttpPeer {
456
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
457
write!(f, "addr: {}, scheme: {},", self._address, self.scheme)?;
458
if !self.sni.is_empty() {
459
write!(f, "sni: {},", self.sni)?;
460
}
461
if let Some(p) = self.proxy.as_ref() {
462
write!(f, "proxy: {p},")?;
463
}
464
if let Some(cert) = &self.client_cert_key {
465
write!(f, "client cert: {},", cert)?;
466
}
467
Ok(())
468
}
469
}
470
471
impl Peer for HttpPeer {
472
fn address(&self) -> &SocketAddr {
473
&self._address
474
}
475
476
fn tls(&self) -> bool {
477
self.is_tls()
478
}
479
480
fn sni(&self) -> &str {
481
&self.sni
482
}
483
484
// TODO: change connection pool to accept u64 instead of String
485
fn reuse_hash(&self) -> u64 {
486
self.peer_hash()
487
}
488
489
fn get_peer_options(&self) -> Option<&PeerOptions> {
490
Some(&self.options)
491
}
492
493
fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
494
Some(&mut self.options)
495
}
496
497
fn get_proxy(&self) -> Option<&Proxy> {
498
self.proxy.as_ref()
499
}
500
501
fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
502
if let Some(proxy) = self.get_proxy() {
503
proxy.next_hop.check_fd_match(fd)
504
} else {
505
self.address().check_fd_match(fd)
506
}
507
}
508
509
fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
510
self.client_cert_key.as_ref()
511
}
512
513
fn get_tracer(&self) -> Option<Tracer> {
514
self.options.tracer.clone()
515
}
516
}

HttpPeer 支持三种构造形式:

Proxy

HttpProxy 中使用的 Proxy 支持连接任意 host:portUDS 路径,并可以指定额外的 HTTP Header,定义如下:

pingora-core/src/upstreams/peer.rs
518
/// The proxy settings to connect to the remote server, CONNECT only for now
519
#[derive(Debug, Hash, Clone)]
520
pub struct Proxy {
521
pub next_hop: Box<Path>, // for now this will be the path to the UDS
522
pub host: String, // the proxied host. Could be either IP addr or hostname.
523
pub port: u16, // the port to proxy to
524
pub headers: BTreeMap<String, Vec<u8>>, // the additional headers to add to CONNECT
525
}
12 collapsed lines
526
527
impl Display for Proxy {
528
fn fmt(&self, f: &mut Formatter) -> FmtResult {
529
write!(
530
f,
531
"next_hop: {}, host: {}, port: {}",
532
self.next_hop.display(),
533
self.host,
534
self.port
535
)
536
}
537
}