After a few digressions, let's return to the main thread of Pingora. In this post we look at `Peer`, that is, the structures and implementations in `pingora-core::upstreams`.
Continuing from the Example
Let's go back to the Example at the top of Getting Started. This time our attention is on `HttpPeer`:
```rust
use async_trait::async_trait;
use pingora::prelude::*;
use std::sync::Arc;

pub struct LB(Arc<LoadBalancer<RoundRobin>>);

#[async_trait]
impl ProxyHttp for LB {
    /// For this small example, we don't need context storage
    type CTX = ();

    fn new_ctx(&self) -> () {
        ()
    }

    async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
        let upstream = self
            .0
            .select(b"", 256) // hash doesn't matter for round robin
            .unwrap();

        println!("upstream peer is: {upstream:?}");

        // Set SNI to one.one.one.one
        let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
        Ok(peer)
    }

    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        upstream_request: &mut RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        upstream_request
            .insert_header("Host", "one.one.one.one")
            .unwrap();
        Ok(())
    }
}

fn main() {
    let mut my_server = Server::new(None).unwrap();
    my_server.bootstrap();

    let upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
    let mut lb = http_proxy_service(&my_server.configuration, LB(Arc::new(upstreams)));
    lb.add_tcp("0.0.0.0:6188");

    my_server.add_service(lb);
    my_server.run_forever();
}
```
As you can see, while implementing `ProxyHttp` we have to return an `HttpPeer` to connect with. Both `HttpPeer` and the `Peer` trait it implements are defined in `pingora-core`. Let's take a closer look.
trait Peer
`Peer` is the structure that carries the connection parameters for the `Connector` when it connects. The trait is defined as follows:
```rust
/// [`Peer`] defines the interface to communicate with the [`crate::connectors`] regarding where to
/// connect to and how to connect to it.
pub trait Peer: Display + Clone {
    /// The remote address to connect to
    fn address(&self) -> &SocketAddr;
    /// If TLS should be used;
    fn tls(&self) -> bool;
    /// The SNI to send, if TLS is used
    fn sni(&self) -> &str;
    /// To decide whether a [`Peer`] can use the connection established by another [`Peer`].
    ///
    /// The connection to two peers are considered reusable to each other if their reuse hashes are
    /// the same
    fn reuse_hash(&self) -> u64;
    /// Get the proxy setting to connect to the remote server
    fn get_proxy(&self) -> Option<&Proxy> {
        None
    }
    /// Get the additional options to connect to the peer.
    ///
    /// See [`PeerOptions`] for more details
    fn get_peer_options(&self) -> Option<&PeerOptions> {
        None
    }
    /// Get the additional options for modification.
    fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
        None
    }
    /// Whether the TLS handshake should validate the cert of the server.
    fn verify_cert(&self) -> bool {
        match self.get_peer_options() {
            Some(opt) => opt.verify_cert,
            None => false,
        }
    }
    /// Whether the TLS handshake should verify that the server cert matches the SNI.
    fn verify_hostname(&self) -> bool {
        match self.get_peer_options() {
            Some(opt) => opt.verify_hostname,
            None => false,
        }
    }
    /// The alternative common name to use to verify the server cert.
    ///
    /// If the server cert doesn't match the SNI, this name will be used to
    /// verify the cert.
    fn alternative_cn(&self) -> Option<&String> {
        match self.get_peer_options() {
            Some(opt) => opt.alternative_cn.as_ref(),
            None => None,
        }
    }
    /// Which local source address this connection should be bind to.
    fn bind_to(&self) -> Option<&InetSocketAddr> {
        match self.get_peer_options() {
            Some(opt) => opt.bind_to.as_ref(),
            None => None,
        }
    }
    /// How long connect() call should be wait before it returns a timeout error.
    fn connection_timeout(&self) -> Option<Duration> {
        match self.get_peer_options() {
            Some(opt) => opt.connection_timeout,
            None => None,
        }
    }
    /// How long the overall connection establishment should take before a timeout error is returned.
    fn total_connection_timeout(&self) -> Option<Duration> {
        match self.get_peer_options() {
            Some(opt) => opt.total_connection_timeout,
            None => None,
        }
    }
    /// If the connection can be reused, how long the connection should wait to be reused before it
    /// shuts down.
    fn idle_timeout(&self) -> Option<Duration> {
        self.get_peer_options().and_then(|o| o.idle_timeout)
    }

    /// Get the ALPN preference.
    fn get_alpn(&self) -> Option<&ALPN> {
        self.get_peer_options().map(|opt| &opt.alpn)
    }

    /// Get the CA cert to use to validate the server cert.
    ///
    /// If not set, the default CAs will be used.
    fn get_ca(&self) -> Option<&Arc<Box<[X509]>>> {
        match self.get_peer_options() {
            Some(opt) => opt.ca.as_ref(),
            None => None,
        }
    }

    /// Get the client cert and key for mutual TLS if any
    fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
        None
    }

    /// The TCP keepalive setting that should be applied to this connection
    fn tcp_keepalive(&self) -> Option<&TcpKeepalive> {
        self.get_peer_options()
            .and_then(|o| o.tcp_keepalive.as_ref())
    }

    /// The interval H2 pings to send to the server if any
    fn h2_ping_interval(&self) -> Option<Duration> {
        self.get_peer_options().and_then(|o| o.h2_ping_interval)
    }

    fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
        self.address().check_fd_match(fd)
    }

    fn get_tracer(&self) -> Option<Tracer> {
        None
    }
}
```
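Many of these trait methods come with default implementations, so an implementor can skip them. At the core, what a `Peer` must implement, i.e. the properties the `Connector` has to be told while connecting, are: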
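Method | Description |
---|---|
`address()` | The address to connect to. |
`tls()` | Whether to connect over TLS. |
`sni()` | The SNI to send when TLS is used. |
`reuse_hash()` | Decides connection reuse. Two peers whose hashes are equal can reuse each other's connections. |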
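To make the role of `reuse_hash()` more concrete, here is a minimal, std-only sketch of a connection pool keyed by that hash. `Pool` and `Conn` are hypothetical names made up for this illustration; this is not how Pingora's own connection pool is implemented, only the idea that "equal hash means reusable".

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an established connection.
#[derive(Debug, PartialEq)]
struct Conn {
    id: u32,
}

/// Toy pool: idle connections grouped by the reuse hash of the peer that opened them.
#[derive(Default)]
struct Pool {
    idle: HashMap<u64, Vec<Conn>>,
}

impl Pool {
    /// Return a connection to the pool under the given reuse hash.
    fn put(&mut self, reuse_hash: u64, conn: Conn) {
        self.idle.entry(reuse_hash).or_default().push(conn);
    }

    /// Take an idle connection whose reuse hash matches, if any.
    fn get(&mut self, reuse_hash: u64) -> Option<Conn> {
        self.idle.get_mut(&reuse_hash)?.pop()
    }
}

fn main() {
    let mut pool = Pool::default();
    pool.put(42, Conn { id: 1 });

    // A peer with a different hash cannot take the connection.
    assert_eq!(pool.get(7), None);
    // A peer with the same hash reuses it.
    assert_eq!(pool.get(42), Some(Conn { id: 1 }));
    // The connection is checked out now, so nothing is left for that hash.
    assert_eq!(pool.get(42), None);
}
```

The consequence for implementors: everything that makes two connections non-interchangeable should feed into the hash.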
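Beyond these, there are some other useful properties, including:
- `get_proxy()`: whether to connect through a proxy
- `verify_cert()`: whether to verify the certificate
- `verify_hostname()`: whether to verify the hostname
- `connection_timeout()`: the timeout for establishing a connection via `connect`
- `total_connection_timeout()`: the total timeout for connection establishment
- `matches_fd()`: whether a given FD matches this connection
- `get_tracer()`: get the `Tracer` associated with this connection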
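Most of these accessors follow the same pattern: a default method that reads from `get_peer_options()` and falls back to a neutral value. The sketch below reproduces that pattern with a trimmed-down trait. `MiniPeer`, `Options`, `Bare` and `Configured` are hypothetical names for this illustration, and the options struct only has two made-up fields.

```rust
/// Hypothetical, trimmed-down options struct (the real `PeerOptions` has many more fields).
struct Options {
    verify_cert: bool,
    connection_timeout_ms: Option<u64>,
}

trait MiniPeer {
    /// Implementors without options keep the default `None`.
    fn get_options(&self) -> Option<&Options> {
        None
    }

    /// Derived accessor: falls back to `false` when there are no options.
    fn verify_cert(&self) -> bool {
        self.get_options().map_or(false, |o| o.verify_cert)
    }

    /// Derived accessor: `None` means no timeout is configured.
    fn connection_timeout_ms(&self) -> Option<u64> {
        self.get_options().and_then(|o| o.connection_timeout_ms)
    }
}

/// A peer that overrides nothing.
struct Bare;
impl MiniPeer for Bare {}

/// A peer that only overrides the options accessor.
struct Configured {
    options: Options,
}
impl MiniPeer for Configured {
    fn get_options(&self) -> Option<&Options> {
        Some(&self.options)
    }
}

fn main() {
    let configured = Configured {
        options: Options { verify_cert: true, connection_timeout_ms: Some(500) },
    };

    // Without options, every derived accessor returns its fallback.
    assert!(!Bare.verify_cert());
    assert_eq!(Bare.connection_timeout_ms(), None);

    // Overriding one method is enough to light up all derived accessors.
    assert!(configured.verify_cert());
    assert_eq!(configured.connection_timeout_ms(), Some(500));
}
```

This is why a concrete peer usually only needs to return its options struct instead of overriding each accessor one by one.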
Tracing and Tracer
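The `Peer` trait also refers to a `Tracer` type, which is how a `Peer` traces its connections. A `Tracer` wraps a value that implements the `Tracing` trait, whose methods are called when a connection is established and when it is disconnected. The definitions are: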
```rust
/// The interface to trace the connection
pub trait Tracing: Send + Sync + std::fmt::Debug {
    /// This method is called when successfully connected to a remote server
    fn on_connected(&self);
    /// This method is called when the connection is disconnected.
    fn on_disconnected(&self);
    /// A way to clone itself
    fn boxed_clone(&self) -> Box<dyn Tracing>;
}

/// An object-safe version of Tracing object that can use Clone
#[derive(Debug)]
pub struct Tracer(pub Box<dyn Tracing>);
```
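As an illustration of how such a tracer might be written, the following std-only sketch redeclares the two types locally so that it compiles on its own, then implements a counter of open connections. `OpenConnections` is a hypothetical name, and the `Clone` impl that delegates to `boxed_clone()` is an assumption about how the wrapper is meant to be cloned, not a quote from the Pingora source.

```rust
use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Local copy of the trait shape shown above.
trait Tracing: Send + Sync + Debug {
    fn on_connected(&self);
    fn on_disconnected(&self);
    fn boxed_clone(&self) -> Box<dyn Tracing>;
}

#[derive(Debug)]
struct Tracer(Box<dyn Tracing>);

// Assumption: cloning a `Tracer` delegates to `boxed_clone()`,
// because `Box<dyn Tracing>` cannot derive `Clone` by itself.
impl Clone for Tracer {
    fn clone(&self) -> Self {
        Tracer(self.0.boxed_clone())
    }
}

/// Hypothetical tracer that counts currently open connections.
#[derive(Debug, Clone, Default)]
struct OpenConnections(Arc<AtomicUsize>);

impl Tracing for OpenConnections {
    fn on_connected(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
    fn on_disconnected(&self) {
        self.0.fetch_sub(1, Ordering::Relaxed);
    }
    fn boxed_clone(&self) -> Box<dyn Tracing> {
        // The clone shares the same counter through the `Arc`.
        Box::new(self.clone())
    }
}

fn main() {
    let counter = OpenConnections::default();
    let tracer = Tracer(Box::new(counter.clone()));
    let copy = tracer.clone();

    tracer.0.on_connected();
    copy.0.on_connected();
    copy.0.on_disconnected();

    // Two connects and one disconnect leave one open connection.
    assert_eq!(counter.0.load(Ordering::Relaxed), 1);
}
```

The `boxed_clone()` method exists because a trait with a `Clone` supertrait would not be object safe, so the trait offers its own cloning hook instead.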
BasicPeer
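Now that we know the definition of the `Peer` trait, let's look at a simple implementation: `BasicPeer`. Pingora uses `BasicPeer` in a few simple scenarios, such as `TcpHealthCheck` and unit tests. The implementation is fairly rough: as built by `new()`, the SNI is empty and `tls()` returns `false`, so it only supports non-TLS connections to `address`. The source is very simple and mostly just fills in the fields: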
```rust
/// A simple TCP or TLS peer without many complicated settings.
#[derive(Debug, Clone)]
pub struct BasicPeer {
    pub _address: SocketAddr,
    pub sni: String,
    pub options: PeerOptions,
}

impl BasicPeer {
    /// Create a new [`BasicPeer`]
    pub fn new(address: &str) -> Self {
        BasicPeer {
            _address: SocketAddr::Inet(address.parse().unwrap()), // TODO: check error, add support
            // for UDS
            sni: "".to_string(), // TODO: add support for SNI
            options: PeerOptions::new(),
        }
    }
}

impl Display for BasicPeer {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "{:?}", self)
    }
}

impl Peer for BasicPeer {
    fn address(&self) -> &SocketAddr {
        &self._address
    }

    fn tls(&self) -> bool {
        !self.sni.is_empty()
    }

    fn bind_to(&self) -> Option<&InetSocketAddr> {
        None
    }

    fn sni(&self) -> &str {
        &self.sni
    }

    // TODO: change connection pool to accept u64 instead of String
    fn reuse_hash(&self) -> u64 {
        let mut hasher = AHasher::default();
        self._address.hash(&mut hasher);
        hasher.finish()
    }

    fn get_peer_options(&self) -> Option<&PeerOptions> {
        Some(&self.options)
    }
}
```
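`BasicPeer::new()` panics on a malformed address (note the `TODO: check error`). As a sketch of what a fallible variant could look like, here is a std-only version. `SimplePeer` and `try_new` are hypothetical names for this illustration and do not exist in Pingora; the struct uses `std::net::SocketAddr` directly instead of Pingora's own `SocketAddr` enum.

```rust
use std::net::{AddrParseError, SocketAddr};

/// Hypothetical, simplified peer: only the fields needed for this sketch.
#[derive(Debug)]
struct SimplePeer {
    address: SocketAddr,
    sni: String,
}

impl SimplePeer {
    /// Fallible variant of `new()`: surfaces the parse error instead of panicking.
    fn try_new(address: &str) -> Result<Self, AddrParseError> {
        Ok(SimplePeer {
            address: address.parse()?,
            sni: String::new(),
        })
    }

    /// Same rule as `BasicPeer::tls()`: TLS is used only when an SNI is set.
    fn tls(&self) -> bool {
        !self.sni.is_empty()
    }
}

fn main() {
    let peer = SimplePeer::try_new("127.0.0.1:8080").expect("valid address");
    assert_eq!(peer.address.port(), 8080);
    // `try_new` leaves the SNI empty, so the peer is plain TCP.
    assert!(!peer.tls());

    // A hostname is not a socket address literal, so parsing fails cleanly.
    assert!(SimplePeer::try_new("example.com:80").is_err());
}
```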
HttpPeer
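With the simple one out of the way, let's look at something a bit more involved: `HttpPeer`, the structure we used in the Example. Compared with `BasicPeer`, it additionally supports HTTPS, SNI, proxies, client certificates, and UDS (Unix Domain Socket).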
```rust
/// A peer representing the remote HTTP server to connect to
#[derive(Debug, Clone)]
pub struct HttpPeer {
    pub _address: SocketAddr,
    pub scheme: Scheme,
    pub sni: String,
    pub proxy: Option<Proxy>,
    pub client_cert_key: Option<Arc<CertKey>>,
    pub options: PeerOptions,
}

impl HttpPeer {
    // These methods are pretty ad-hoc
    pub fn is_tls(&self) -> bool {
        match self.scheme {
            Scheme::HTTP => false,
            Scheme::HTTPS => true,
        }
    }

    fn new_from_sockaddr(address: SocketAddr, tls: bool, sni: String) -> Self {
        HttpPeer {
            _address: address,
            scheme: Scheme::from_tls_bool(tls),
            sni,
            proxy: None,
            client_cert_key: None,
            options: PeerOptions::new(),
        }
    }

    /// Create a new [`HttpPeer`] with the given socket address and TLS settings.
    pub fn new<A: ToInetSocketAddrs>(address: A, tls: bool, sni: String) -> Self {
        let mut addrs_iter = address.to_socket_addrs().unwrap(); //TODO: handle error
        let addr = addrs_iter.next().unwrap();
        Self::new_from_sockaddr(SocketAddr::Inet(addr), tls, sni)
    }

    /// Create a new [`HttpPeer`] with the given path to Unix domain socket and TLS settings.
    pub fn new_uds(path: &str, tls: bool, sni: String) -> Self {
        let addr = SocketAddr::Unix(UnixSocketAddr::from_pathname(Path::new(path)).unwrap()); //TODO: handle error
        Self::new_from_sockaddr(addr, tls, sni)
    }

    /// Create a new [`HttpPeer`] that uses a proxy to connect to the upstream IP and port
    /// combination.
    pub fn new_proxy(
        next_hop: &str,
        ip_addr: IpAddr,
        port: u16,
        tls: bool,
        sni: &str,
        headers: BTreeMap<String, Vec<u8>>,
    ) -> Self {
        HttpPeer {
            _address: SocketAddr::Inet(InetSocketAddr::new(ip_addr, port)),
            scheme: Scheme::from_tls_bool(tls),
            sni: sni.to_string(),
            proxy: Some(Proxy {
                next_hop: PathBuf::from(next_hop).into(),
                host: ip_addr.to_string(),
                port,
                headers,
            }),
            client_cert_key: None,
            options: PeerOptions::new(),
        }
    }

    fn peer_hash(&self) -> u64 {
        let mut hasher = AHasher::default();
        self.hash(&mut hasher);
        hasher.finish()
    }
}

impl Hash for HttpPeer {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self._address.hash(state);
        self.scheme.hash(state);
        self.proxy.hash(state);
        self.sni.hash(state);
        // client cert serial
        self.client_cert_key.hash(state);
        // origin server cert verification
        self.verify_cert().hash(state);
        self.verify_hostname().hash(state);
        self.alternative_cn().hash(state);
    }
}

impl Display for HttpPeer {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "addr: {}, scheme: {},", self._address, self.scheme)?;
        if !self.sni.is_empty() {
            write!(f, "sni: {},", self.sni)?;
        }
        if let Some(p) = self.proxy.as_ref() {
            write!(f, "proxy: {p},")?;
        }
        if let Some(cert) = &self.client_cert_key {
            write!(f, "client cert: {},", cert)?;
        }
        Ok(())
    }
}

impl Peer for HttpPeer {
    fn address(&self) -> &SocketAddr {
        &self._address
    }

    fn tls(&self) -> bool {
        self.is_tls()
    }

    fn sni(&self) -> &str {
        &self.sni
    }

    // TODO: change connection pool to accept u64 instead of String
    fn reuse_hash(&self) -> u64 {
        self.peer_hash()
    }

    fn get_peer_options(&self) -> Option<&PeerOptions> {
        Some(&self.options)
    }

    fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
        Some(&mut self.options)
    }

    fn get_proxy(&self) -> Option<&Proxy> {
        self.proxy.as_ref()
    }

    fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
        if let Some(proxy) = self.get_proxy() {
            proxy.next_hop.check_fd_match(fd)
        } else {
            self.address().check_fd_match(fd)
        }
    }

    fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
        self.client_cert_key.as_ref()
    }

    fn get_tracer(&self) -> Option<Tracer> {
        self.options.tracer.clone()
    }
}
```
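Unlike `BasicPeer`, whose reuse hash covers only the address, `HttpPeer` hashes the address together with the scheme, proxy, SNI, client certificate and verification settings. The std-only sketch below shows the effect with a reduced key. `PeerKey` and its fields are hypothetical, and it uses the standard library's `DefaultHasher` in place of `AHasher` so that it needs no external crate.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::net::SocketAddr;

/// Hypothetical, reduced set of the fields that feed the reuse hash.
#[derive(Hash)]
struct PeerKey {
    address: SocketAddr,
    tls: bool,
    sni: String,
    verify_cert: bool,
}

/// Hash every field of the key into a single u64.
fn reuse_hash(key: &PeerKey) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish()
}

/// Helper for the demo: same address and TLS settings, variable SNI.
fn key_with_sni(sni: &str) -> PeerKey {
    PeerKey {
        address: "1.1.1.1:443".parse().expect("valid address"),
        tls: true,
        sni: sni.to_string(),
        verify_cert: true,
    }
}

fn main() {
    let a = key_with_sni("one.one.one.one");
    let b = key_with_sni("one.one.one.one");
    let c = key_with_sni("example.org");

    // Identical settings hash identically, so the connections are interchangeable.
    assert_eq!(reuse_hash(&a), reuse_hash(&b));
    // Same address but a different SNI: the connections must not be shared.
    assert_ne!(reuse_hash(&a), reuse_hash(&c));
}
```

Including the TLS-related fields matters because a connection negotiated for one SNI or one verification policy is not a safe substitute for another, even when the socket address is the same.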
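`HttpPeer` supports three ways of construction:
- `new()` builds a peer that connects to a given socket address.
- `new_uds()` builds a peer that connects through the path of a Unix Domain Socket.
- `new_proxy()` builds a peer that uses an HTTP proxy as a jump host to reach the upstream.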
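The three forms differ mainly in what kind of target they describe. As a rough, std-only model of that difference (not Pingora's types), the sketch below uses a hypothetical `Target` enum with one constructor function per form. The `inet` function also shows one way to handle the two `unwrap()` calls that `new()` currently leaves as a TODO.

```rust
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::path::PathBuf;

/// Hypothetical model of the three kinds of connection target.
#[derive(Debug, PartialEq)]
enum Target {
    Inet(SocketAddr),
    Unix(PathBuf),
    ViaProxy { next_hop: PathBuf, host: String, port: u16 },
}

/// Like `new()`: resolve the input and take the first address, without panicking.
fn inet(address: &str) -> io::Result<Target> {
    let addr = address
        .to_socket_addrs()?
        .next()
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "no address resolved"))?;
    Ok(Target::Inet(addr))
}

/// Like `new_uds()`: the target is a filesystem path.
fn uds(path: &str) -> Target {
    Target::Unix(PathBuf::from(path))
}

/// Like `new_proxy()`: reach `host:port` through the proxy at `next_hop`.
fn via_proxy(next_hop: &str, host: &str, port: u16) -> Target {
    Target::ViaProxy {
        next_hop: PathBuf::from(next_hop),
        host: host.to_string(),
        port,
    }
}

fn main() {
    // An IP literal resolves locally, no DNS lookup involved.
    let direct = inet("127.0.0.1:443").expect("literal address");
    assert_eq!(direct, Target::Inet("127.0.0.1:443".parse().unwrap()));

    assert_eq!(uds("/tmp/app.sock"), Target::Unix(PathBuf::from("/tmp/app.sock")));

    let proxied = via_proxy("/tmp/proxy.sock", "1.1.1.1", 443);
    println!("{proxied:?}");
}
```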
Proxy
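The `Proxy` used by `HttpPeer` describes the proxied target as a `host:port` pair, the next hop to reach the proxy (for now, the path to a UDS), and any additional HTTP headers to add to the CONNECT request. It is defined as follows: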
```rust
/// The proxy settings to connect to the remote server, CONNECT only for now
#[derive(Debug, Hash, Clone)]
pub struct Proxy {
    pub next_hop: Box<Path>, // for now this will be the path to the UDS
    pub host: String,        // the proxied host. Could be either IP addr or hostname.
    pub port: u16,           // the port to proxy to
    pub headers: BTreeMap<String, Vec<u8>>, // the additional headers to add to CONNECT
}

impl Display for Proxy {
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        write!(
            f,
            "next_hop: {}, host: {}, port: {}",
            self.next_hop.display(),
            self.host,
            self.port
        )
    }
}
```
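To see how these fields could map onto the wire, here is a std-only sketch that serializes an HTTP/1.1 CONNECT request from a host, a port and a header map of the same type as `Proxy::headers`. The function `connect_request` is hypothetical and only shows the general shape of a CONNECT request; the exact bytes Pingora sends may differ.

```rust
use std::collections::BTreeMap;

/// Build the bytes of a CONNECT request for `host:port`, followed by the extra headers.
fn connect_request(host: &str, port: u16, headers: &BTreeMap<String, Vec<u8>>) -> Vec<u8> {
    let mut req =
        format!("CONNECT {host}:{port} HTTP/1.1\r\nHost: {host}:{port}\r\n").into_bytes();
    // BTreeMap iterates in key order, so the output is deterministic.
    for (name, value) in headers {
        req.extend_from_slice(name.as_bytes());
        req.extend_from_slice(b": ");
        req.extend_from_slice(value);
        req.extend_from_slice(b"\r\n");
    }
    // An empty line terminates the header section.
    req.extend_from_slice(b"\r\n");
    req
}

fn main() {
    let mut headers = BTreeMap::new();
    headers.insert("X-Trace".to_string(), b"abc".to_vec());

    let req = connect_request("1.1.1.1", 443, &headers);
    assert_eq!(
        req,
        b"CONNECT 1.1.1.1:443 HTTP/1.1\r\nHost: 1.1.1.1:443\r\nX-Trace: abc\r\n\r\n".to_vec()
    );
}
```

Header values are `Vec<u8>` rather than `String` because HTTP header values are not required to be valid UTF-8.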