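After a few digressions, we return to the main thread of Pingora. In this post we look at `Peer`, that is, the structures and implementations found in the `upstreams` module of `pingora-core`.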
Continuing from the Example
Let's go back to the example at the beginning of Getting Started. This time, our attention is on `HttpPeer`:
```rust
use async_trait::async_trait;
use pingora::prelude::*;
use std::sync::Arc;

pub struct LB(Arc<LoadBalancer<RoundRobin>>);

#[async_trait]
impl ProxyHttp for LB {
    /// For this small example, we don't need context storage
    type CTX = ();

    fn new_ctx(&self) -> () {
        ()
    }

    async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
        let upstream = self
            .0
            .select(b"", 256) // hash doesn't matter for round robin
            .unwrap();

        println!("upstream peer is: {upstream:?}");

        // Set SNI to one.one.one.one
        let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
        Ok(peer)
    }

    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        upstream_request: &mut RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        upstream_request
            .insert_header("Host", "one.one.one.one")
            .unwrap();
        Ok(())
    }
}

fn main() {
    let mut my_server = Server::new(None).unwrap();
    my_server.bootstrap();

    let upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
    let mut lb = http_proxy_service(&my_server.configuration, LB(Arc::new(upstreams)));
    lb.add_tcp("0.0.0.0:6188");

    my_server.add_service(lb);
    my_server.run_forever();
}
```
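As you can see, implementing `ProxyHttp` requires us to return an `HttpPeer` that describes the upstream to connect to. Both `HttpPeer` and the `Peer` trait it implements are defined in `pingora-core`. Let's take a closer look.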
trait Peer
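`Peer` is what a `Connector` uses to carry the connection parameters when it connects: where to connect to and how. The trait is defined as follows: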
```rust
/// [`Peer`] defines the interface to communicate with the [`crate::connectors`] regarding where to
/// connect to and how to connect to it.
pub trait Peer: Display + Clone {
    /// The remote address to connect to
    fn address(&self) -> &SocketAddr;
    /// If TLS should be used;
    fn tls(&self) -> bool;
    /// The SNI to send, if TLS is used
    fn sni(&self) -> &str;
    /// To decide whether a [`Peer`] can use the connection established by another [`Peer`].
    ///
    /// The connection to two peers are considered reusable to each other if their reuse hashes are
    /// the same
    fn reuse_hash(&self) -> u64;
    /// Get the proxy setting to connect to the remote server
    fn get_proxy(&self) -> Option<&Proxy> {
        None
    }
    /// Get the additional options to connect to the peer.
    ///
    /// See [`PeerOptions`] for more details
    fn get_peer_options(&self) -> Option<&PeerOptions> {
        None
    }
    /// Get the additional options for modification.
    fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
        None
    }
    /// Whether the TLS handshake should validate the cert of the server.
    fn verify_cert(&self) -> bool {
        match self.get_peer_options() {
            Some(opt) => opt.verify_cert,
            None => false,
        }
    }
    /// Whether the TLS handshake should verify that the server cert matches the SNI.
    fn verify_hostname(&self) -> bool {
        match self.get_peer_options() {
            Some(opt) => opt.verify_hostname,
            None => false,
        }
    }
    /// The alternative common name to use to verify the server cert.
    ///
    /// If the server cert doesn't match the SNI, this name will be used to
    /// verify the cert.
    fn alternative_cn(&self) -> Option<&String> {
        match self.get_peer_options() {
            Some(opt) => opt.alternative_cn.as_ref(),
            None => None,
        }
    }
    /// Which local source address this connection should be bind to.
    fn bind_to(&self) -> Option<&InetSocketAddr> {
        match self.get_peer_options() {
            Some(opt) => opt.bind_to.as_ref(),
            None => None,
        }
    }
    /// How long connect() call should be wait before it returns a timeout error.
    fn connection_timeout(&self) -> Option<Duration> {
        match self.get_peer_options() {
            Some(opt) => opt.connection_timeout,
            None => None,
        }
    }
    /// How long the overall connection establishment should take before a timeout error is returned.
    fn total_connection_timeout(&self) -> Option<Duration> {
        match self.get_peer_options() {
            Some(opt) => opt.total_connection_timeout,
            None => None,
        }
    }
    /// If the connection can be reused, how long the connection should wait to be reused before it
    /// shuts down.
    fn idle_timeout(&self) -> Option<Duration> {
        self.get_peer_options().and_then(|o| o.idle_timeout)
    }

    /// Get the ALPN preference.
    fn get_alpn(&self) -> Option<&ALPN> {
        self.get_peer_options().map(|opt| &opt.alpn)
    }

    /// Get the CA cert to use to validate the server cert.
    ///
    /// If not set, the default CAs will be used.
    fn get_ca(&self) -> Option<&Arc<Box<[X509]>>> {
        match self.get_peer_options() {
            Some(opt) => opt.ca.as_ref(),
            None => None,
        }
    }

    /// Get the client cert and key for mutual TLS if any
    fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
        None
    }

    /// The TCP keepalive setting that should be applied to this connection
    fn tcp_keepalive(&self) -> Option<&TcpKeepalive> {
        self.get_peer_options()
            .and_then(|o| o.tcp_keepalive.as_ref())
    }

    /// The interval H2 pings to send to the server if any
    fn h2_ping_interval(&self) -> Option<Duration> {
        self.get_peer_options().and_then(|o| o.h2_ping_interval)
    }

    fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
        self.address().check_fd_match(fd)
    }

    fn get_tracer(&self) -> Option<Tracer> {
        None
    }
}
```
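Most of the methods in this trait come with a default implementation, so an implementor only has to write a handful of them. At its core, the methods a `Peer` must implement, that is, the properties the `Connector` has to be told when connecting, are: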
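Method | Description |
---|---|
`address()` | The remote address to connect to. |
`tls()` | Whether to connect over TLS. |
`sni()` | The SNI to send when TLS is used. |
`reuse_hash()` | Decides connection reuse: connections to two peers are reusable by each other when their hashes are equal. |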
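
To make the four required methods concrete, here is a minimal, std-only sketch. `MiniPeer`, `PlainPeer` and `can_share_connection` are hypothetical names invented for illustration (they are not part of Pingora), and `std::net::SocketAddr` stands in for Pingora's own `SocketAddr` type.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::net::SocketAddr;

/// Hypothetical, trimmed-down mirror of the four required `Peer` methods.
trait MiniPeer {
    fn address(&self) -> &SocketAddr;
    fn tls(&self) -> bool;
    fn sni(&self) -> &str;
    fn reuse_hash(&self) -> u64;
}

/// Hypothetical peer that only knows an address and an optional SNI.
struct PlainPeer {
    address: SocketAddr,
    sni: String,
}

impl PlainPeer {
    /// Panics on a malformed address; acceptable for a sketch only.
    fn new(address: &str, sni: &str) -> Self {
        PlainPeer {
            address: address.parse().expect("valid socket address"),
            sni: sni.to_string(),
        }
    }
}

impl MiniPeer for PlainPeer {
    fn address(&self) -> &SocketAddr {
        &self.address
    }

    /// Assumption for this sketch: TLS is used whenever an SNI is set.
    fn tls(&self) -> bool {
        !self.sni.is_empty()
    }

    fn sni(&self) -> &str {
        &self.sni
    }

    /// Hash everything that makes two connections interchangeable.
    fn reuse_hash(&self) -> u64 {
        let mut hasher = DefaultHasher::new();
        self.address.hash(&mut hasher);
        self.sni.hash(&mut hasher);
        hasher.finish()
    }
}

/// Two peers may share a pooled connection when their reuse hashes match.
fn can_share_connection(a: &impl MiniPeer, b: &impl MiniPeer) -> bool {
    a.reuse_hash() == b.reuse_hash()
}
```

With this sketch, two `PlainPeer` values built from the same address and SNI are considered interchangeable, while changing the address yields a different hash and therefore a separate connection.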
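Beyond these, there are some other useful properties, including:
- `get_proxy()`: the proxy to connect through, if any
- `verify_cert()`: whether to validate the server certificate
- `verify_hostname()`: whether to verify that the server certificate matches the hostname (SNI)
- `connection_timeout()`: the timeout for establishing the connection via `connect()`
- `total_connection_timeout()`: the timeout for the overall connection establishment
- `matches_fd()`: whether a given FD matches this peer's address
- `get_tracer()`: the `Tracer` associated with this connection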
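
Most of these optional methods follow one pattern: ask for the peer's options and fall back to a conservative value when there are none. The sketch below reproduces that pattern with std only. `MiniOptions`, `WithOptions`, `Bare` and `Configured` are hypothetical names for illustration, and `MiniOptions` carries just three of the fields that `PeerOptions` has.

```rust
use std::time::Duration;

/// Hypothetical stand-in for `PeerOptions`, holding only three fields.
#[derive(Default)]
struct MiniOptions {
    verify_cert: bool,
    verify_hostname: bool,
    connection_timeout: Option<Duration>,
}

/// Hypothetical trait mirroring how the default methods delegate to the options.
trait WithOptions {
    /// No options unless the implementor overrides this method.
    fn options(&self) -> Option<&MiniOptions> {
        None
    }

    /// Without options, certificate validation is reported as disabled.
    fn verify_cert(&self) -> bool {
        self.options().map_or(false, |o| o.verify_cert)
    }

    /// Without options, hostname verification is reported as disabled.
    fn verify_hostname(&self) -> bool {
        self.options().map_or(false, |o| o.verify_hostname)
    }

    /// Without options, no connect timeout is configured.
    fn connection_timeout(&self) -> Option<Duration> {
        self.options().and_then(|o| o.connection_timeout)
    }
}

/// A peer that overrides nothing and therefore gets every default.
struct Bare;

impl WithOptions for Bare {}

/// A peer that exposes its options, which switches on all delegating defaults.
struct Configured {
    options: MiniOptions,
}

impl WithOptions for Configured {
    fn options(&self) -> Option<&MiniOptions> {
        Some(&self.options)
    }
}
```

The design choice worth noting is that an implementor only overrides the single accessor, and every derived getter starts returning the configured value.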
Tracing and Tracer
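The `Peer` trait refers to a `Tracer` type, returned by `get_tracer()`. This is the mechanism a `Peer` uses for connection tracing. A `Tracer` wraps an object that implements the `Tracing` trait, whose methods are called when the connection is established and when it is disconnected. The definitions are as follows: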
```rust
/// The interface to trace the connection
pub trait Tracing: Send + Sync + std::fmt::Debug {
    /// This method is called when successfully connected to a remote server
    fn on_connected(&self);
    /// This method is called when the connection is disconnected.
    fn on_disconnected(&self);
    /// A way to clone itself
    fn boxed_clone(&self) -> Box<dyn Tracing>;
}

/// An object-safe version of Tracing object that can use Clone
#[derive(Debug)]
pub struct Tracer(pub Box<dyn Tracing>);
```
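
As an illustration of how these two types might be used, here is a std-only sketch of a tracer that counts open connections. The `Tracing` and `Tracer` definitions are repeated so that the snippet compiles on its own. `CountingTracer` is a hypothetical name, and the `Clone` implementation for `Tracer` is an assumption based on the "A way to clone itself" comment, not something shown in the excerpt above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Same shape as the `Tracing` trait quoted in this post.
pub trait Tracing: Send + Sync + std::fmt::Debug {
    fn on_connected(&self);
    fn on_disconnected(&self);
    fn boxed_clone(&self) -> Box<dyn Tracing>;
}

#[derive(Debug)]
pub struct Tracer(pub Box<dyn Tracing>);

// Assumption: cloning a `Tracer` goes through `boxed_clone()`, because
// `Box<dyn Tracing>` cannot derive `Clone` by itself.
impl Clone for Tracer {
    fn clone(&self) -> Self {
        Tracer(self.0.boxed_clone())
    }
}

/// Hypothetical tracer that counts the currently open connections.
#[derive(Debug, Clone, Default)]
pub struct CountingTracer {
    open: Arc<AtomicUsize>,
}

impl CountingTracer {
    pub fn open_connections(&self) -> usize {
        self.open.load(Ordering::SeqCst)
    }
}

impl Tracing for CountingTracer {
    fn on_connected(&self) {
        self.open.fetch_add(1, Ordering::SeqCst);
    }

    fn on_disconnected(&self) {
        self.open.fetch_sub(1, Ordering::SeqCst);
    }

    /// Clones share the same counter through the `Arc`.
    fn boxed_clone(&self) -> Box<dyn Tracing> {
        Box::new(self.clone())
    }
}
```

Because every clone shares one `Arc<AtomicUsize>`, the counter stays consistent no matter how many copies of the `Tracer` are handed out.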
BasicPeer
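Now that we know how the `Peer` trait is defined, let's look at a simple implementation: `BasicPeer`. Pingora uses `BasicPeer` in a few simple scenarios, such as `TcpHealthCheck` and unit tests. The implementation is fairly rough: `BasicPeer::new()` only produces a non-TLS peer for an inet `address`, because it leaves the SNI empty and `tls()` is derived from whether the SNI is set. The source is very simple and does little more than fill in the fields: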
```rust
/// A simple TCP or TLS peer without many complicated settings.
#[derive(Debug, Clone)]
pub struct BasicPeer {
    pub _address: SocketAddr,
    pub sni: String,
    pub options: PeerOptions,
}

impl BasicPeer {
    /// Create a new [`BasicPeer`]
    pub fn new(address: &str) -> Self {
        BasicPeer {
            _address: SocketAddr::Inet(address.parse().unwrap()), // TODO: check error, add support
            // for UDS
            sni: "".to_string(), // TODO: add support for SNI
            options: PeerOptions::new(),
        }
    }
}

impl Display for BasicPeer {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "{:?}", self)
    }
}

impl Peer for BasicPeer {
    fn address(&self) -> &SocketAddr {
        &self._address
    }

    fn tls(&self) -> bool {
        !self.sni.is_empty()
    }

    fn bind_to(&self) -> Option<&InetSocketAddr> {
        None
    }

    fn sni(&self) -> &str {
        &self.sni
    }

    // TODO: change connection pool to accept u64 instead of String
    fn reuse_hash(&self) -> u64 {
        let mut hasher = AHasher::default();
        self._address.hash(&mut hasher);
        hasher.finish()
    }

    fn get_peer_options(&self) -> Option<&PeerOptions> {
        Some(&self.options)
    }
}
```
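
The `TODO: check error` note in `BasicPeer::new()` hints that the `unwrap()` on the parsed address is a known rough edge. As a sketch of one possible alternative (not something Pingora provides), the hypothetical `SimplePeer` below offers a fallible `try_new()` that returns the parse error to the caller. It uses `std::net::SocketAddr` instead of Pingora's address type.

```rust
use std::net::{AddrParseError, SocketAddr};

/// Hypothetical, trimmed-down peer: just an address and an SNI.
#[derive(Debug, Clone)]
pub struct SimplePeer {
    pub address: SocketAddr,
    pub sni: String,
}

impl SimplePeer {
    /// Like `BasicPeer::new`, but reports a malformed address instead of panicking.
    pub fn try_new(address: &str) -> Result<Self, AddrParseError> {
        Ok(SimplePeer {
            address: address.parse()?,
            sni: String::new(),
        })
    }

    /// Same rule as `BasicPeer::tls()`: TLS is used only when an SNI is set.
    pub fn tls(&self) -> bool {
        !self.sni.is_empty()
    }
}
```

A caller can then decide what a bad address means in context, for example skipping a misconfigured health check target rather than aborting the process.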
HttpPeer
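Having covered the simple case, let's move on to a more involved one: `HttpPeer`, the struct we used in the example. Compared with `BasicPeer`, it additionally supports HTTPS, SNI, proxies, client certificates, and UDS (Unix Domain Socket) addresses.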
```rust
/// A peer representing the remote HTTP server to connect to
#[derive(Debug, Clone)]
pub struct HttpPeer {
    pub _address: SocketAddr,
    pub scheme: Scheme,
    pub sni: String,
    pub proxy: Option<Proxy>,
    pub client_cert_key: Option<Arc<CertKey>>,
    pub options: PeerOptions,
}

impl HttpPeer {
    // These methods are pretty ad-hoc
    pub fn is_tls(&self) -> bool {
        match self.scheme {
            Scheme::HTTP => false,
            Scheme::HTTPS => true,
        }
    }

    fn new_from_sockaddr(address: SocketAddr, tls: bool, sni: String) -> Self {
        HttpPeer {
            _address: address,
            scheme: Scheme::from_tls_bool(tls),
            sni,
            proxy: None,
            client_cert_key: None,
            options: PeerOptions::new(),
        }
    }

    /// Create a new [`HttpPeer`] with the given socket address and TLS settings.
    pub fn new<A: ToInetSocketAddrs>(address: A, tls: bool, sni: String) -> Self {
        let mut addrs_iter = address.to_socket_addrs().unwrap(); //TODO: handle error
        let addr = addrs_iter.next().unwrap();
        Self::new_from_sockaddr(SocketAddr::Inet(addr), tls, sni)
    }

    /// Create a new [`HttpPeer`] with the given path to Unix domain socket and TLS settings.
    pub fn new_uds(path: &str, tls: bool, sni: String) -> Self {
        let addr = SocketAddr::Unix(UnixSocketAddr::from_pathname(Path::new(path)).unwrap()); //TODO: handle error
        Self::new_from_sockaddr(addr, tls, sni)
    }

    /// Create a new [`HttpPeer`] that uses a proxy to connect to the upstream IP and port
    /// combination.
    pub fn new_proxy(
        next_hop: &str,
        ip_addr: IpAddr,
        port: u16,
        tls: bool,
        sni: &str,
        headers: BTreeMap<String, Vec<u8>>,
    ) -> Self {
        HttpPeer {
            _address: SocketAddr::Inet(InetSocketAddr::new(ip_addr, port)),
            scheme: Scheme::from_tls_bool(tls),
            sni: sni.to_string(),
            proxy: Some(Proxy {
                next_hop: PathBuf::from(next_hop).into(),
                host: ip_addr.to_string(),
                port,
                headers,
            }),
            client_cert_key: None,
            options: PeerOptions::new(),
        }
    }

    fn peer_hash(&self) -> u64 {
        let mut hasher = AHasher::default();
        self.hash(&mut hasher);
        hasher.finish()
    }
}

impl Hash for HttpPeer {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self._address.hash(state);
        self.scheme.hash(state);
        self.proxy.hash(state);
        self.sni.hash(state);
        // client cert serial
        self.client_cert_key.hash(state);
        // origin server cert verification
        self.verify_cert().hash(state);
        self.verify_hostname().hash(state);
        self.alternative_cn().hash(state);
    }
}

impl Display for HttpPeer {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "addr: {}, scheme: {},", self._address, self.scheme)?;
        if !self.sni.is_empty() {
            write!(f, "sni: {},", self.sni)?;
        }
        if let Some(p) = self.proxy.as_ref() {
            write!(f, "proxy: {p},")?;
        }
        if let Some(cert) = &self.client_cert_key {
            write!(f, "client cert: {},", cert)?;
        }
        Ok(())
    }
}

impl Peer for HttpPeer {
    fn address(&self) -> &SocketAddr {
        &self._address
    }

    fn tls(&self) -> bool {
        self.is_tls()
    }

    fn sni(&self) -> &str {
        &self.sni
    }

    // TODO: change connection pool to accept u64 instead of String
    fn reuse_hash(&self) -> u64 {
        self.peer_hash()
    }

    fn get_peer_options(&self) -> Option<&PeerOptions> {
        Some(&self.options)
    }

    fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
        Some(&mut self.options)
    }

    fn get_proxy(&self) -> Option<&Proxy> {
        self.proxy.as_ref()
    }

    fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
        if let Some(proxy) = self.get_proxy() {
            proxy.next_hop.check_fd_match(fd)
        } else {
            self.address().check_fd_match(fd)
        }
    }

    fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
        self.client_cert_key.as_ref()
    }

    fn get_tracer(&self) -> Option<Tracer> {
        self.options.tracer.clone()
    }
}
```
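`HttpPeer` can be constructed in three ways:
1. `new()` creates a peer that connects to a given socket address.
2. `new_uds()` creates a peer that connects through the path of a Unix Domain Socket.
3. `new_proxy()` creates a peer that uses an HTTP proxy as an intermediate hop to reach the upstream.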
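
Whichever constructor is used, the `Hash` implementation of `HttpPeer` feeds the address, scheme, proxy, SNI, client certificate and verification settings into `reuse_hash()`, so connections are only shared between peers that agree on all of them. The std-only sketch below illustrates the idea with a hypothetical `PeerKey` that holds a small subset of those fields. It uses `DefaultHasher` in place of `AHasher` to stay dependency-free.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::net::SocketAddr;

/// Hypothetical subset of the fields that `HttpPeer` feeds into its hash.
#[derive(Debug, Clone, Hash)]
struct PeerKey {
    address: SocketAddr,
    tls: bool,
    sni: String,
    verify_cert: bool,
}

impl PeerKey {
    /// Panics on a malformed address; acceptable for a sketch only.
    fn new(address: &str, tls: bool, sni: &str) -> Self {
        PeerKey {
            address: address.parse().expect("valid socket address"),
            tls,
            sni: sni.to_string(),
            verify_cert: true,
        }
    }

    /// Hash every field, so any difference in configuration changes the result.
    fn reuse_hash(&self) -> u64 {
        let mut hasher = DefaultHasher::new();
        self.hash(&mut hasher);
        hasher.finish()
    }
}
```

In this sketch, two keys for the same address stop sharing a connection as soon as the SNI or the `verify_cert` flag differs, which is the property that keeps a connection with relaxed verification from being handed to a peer that expects strict verification.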
Proxy
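The `Proxy` used by `HttpPeer` describes a CONNECT proxy: `next_hop` is the path to the proxy itself (for now, a UDS path), `host` and `port` identify the proxied destination, which can be any host and port, and `headers` holds additional HTTP headers to add to the CONNECT request. It is defined as follows: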
```rust
/// The proxy settings to connect to the remote server, CONNECT only for now
#[derive(Debug, Hash, Clone)]
pub struct Proxy {
    pub next_hop: Box<Path>, // for now this will be the path to the UDS
    pub host: String,        // the proxied host. Could be either IP addr or hostname.
    pub port: u16,           // the port to proxy to
    pub headers: BTreeMap<String, Vec<u8>>, // the additional headers to add to CONNECT
}

impl Display for Proxy {
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        write!(
            f,
            "next_hop: {}, host: {}, port: {}",
            self.next_hop.display(),
            self.host,
            self.port
        )
    }
}
```
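
To show how these fields could map onto an actual CONNECT request, here is a std-only sketch. The `Proxy` struct is repeated so the snippet compiles on its own. `connect_request` is a hypothetical helper written for this post, and the exact bytes Pingora sends may well differ. The request would be written to the socket at `next_hop`.

```rust
use std::collections::BTreeMap;
use std::path::Path;

/// Same fields as the `Proxy` struct quoted in this post.
#[derive(Debug, Hash, Clone)]
pub struct Proxy {
    pub next_hop: Box<Path>,
    pub host: String,
    pub port: u16,
    pub headers: BTreeMap<String, Vec<u8>>,
}

/// Hypothetical helper: renders an HTTP/1.1 CONNECT request for `proxy`.
///
/// Simplification: an IPv6 literal in `host` would need square brackets,
/// which this sketch does not add.
pub fn connect_request(proxy: &Proxy) -> Vec<u8> {
    let target = format!("{}:{}", proxy.host, proxy.port);
    let mut out = format!("CONNECT {target} HTTP/1.1\r\nHost: {target}\r\n").into_bytes();
    // `BTreeMap` iterates in key order, so the header order is deterministic.
    for (name, value) in &proxy.headers {
        out.extend_from_slice(name.as_bytes());
        out.extend_from_slice(b": ");
        out.extend_from_slice(value);
        out.extend_from_slice(b"\r\n");
    }
    // An empty line terminates the header section.
    out.extend_from_slice(b"\r\n");
    out
}
```

Header values are `Vec<u8>` rather than `String`, so they are copied into the request as raw bytes without any UTF-8 requirement.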