Learning Pingora 03 - Upstreams and Peers

Published: at 22:52
Updated: at 17:20

After a few digressions (?), we return to the main thread of Pingora. In this post we look at Peer, that is, some of the structures and implementations in pingora-core::upstreams.

Continuing from the Example

Let's go back to the example at the beginning of Getting Started. This time our attention is on HttpPeer:

use async_trait::async_trait;
use pingora::prelude::*;
use std::sync::Arc;

pub struct LB(Arc<LoadBalancer<RoundRobin>>);

#[async_trait]
impl ProxyHttp for LB {
    /// For this small example, we don't need context storage
    type CTX = ();
    fn new_ctx(&self) -> () {
        ()
    }

    async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
        let upstream = self
            .0
            .select(b"", 256) // hash doesn't matter for round robin
            .unwrap();

        println!("upstream peer is: {upstream:?}");

        // Set SNI to one.one.one.one
        let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
        Ok(peer)
    }

    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        upstream_request: &mut RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        upstream_request
            .insert_header("Host", "one.one.one.one")
            .unwrap();
        Ok(())
    }
}

fn main() {
    let mut my_server = Server::new(None).unwrap();
    my_server.bootstrap();

    let upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();

    let mut lb = http_proxy_service(&my_server.configuration, LB(Arc::new(upstreams)));
    lb.add_tcp("0.0.0.0:6188");

    my_server.add_service(lb);
    my_server.run_forever();
}

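As we can see, implementing ProxyHttp requires us to return an HttpPeer that is used to make the connection. Both HttpPeer and the trait Peer it implements are defined in pingora-core. Let's take a closer look.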

trait Peer

Peer is the structure a Connector uses to carry connection parameters when it connects. Its trait definition is as follows:

pingora-core/src/upstreams/peer.rs
/// [`Peer`] defines the interface to communicate with the [`crate::connectors`] regarding where to
/// connect to and how to connect to it.
pub trait Peer: Display + Clone {
    /// The remote address to connect to
    fn address(&self) -> &SocketAddr;
    /// If TLS should be used;
    fn tls(&self) -> bool;
    /// The SNI to send, if TLS is used
    fn sni(&self) -> &str;
    /// To decide whether a [`Peer`] can use the connection established by another [`Peer`].
    ///
    /// The connection to two peers are considered reusable to each other if their reuse hashes are
    /// the same
    fn reuse_hash(&self) -> u64;
    /// Get the proxy setting to connect to the remote server
    fn get_proxy(&self) -> Option<&Proxy> {
        None
    }
    /// Get the additional options to connect to the peer.
    ///
    /// See [`PeerOptions`] for more details
    fn get_peer_options(&self) -> Option<&PeerOptions> {
        None
    }
    /// Get the additional options for modification.
    fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
        None
    }
    /// Whether the TLS handshake should validate the cert of the server.
    fn verify_cert(&self) -> bool {
        match self.get_peer_options() {
            Some(opt) => opt.verify_cert,
            None => false,
        }
    }
    /// Whether the TLS handshake should verify that the server cert matches the SNI.
    fn verify_hostname(&self) -> bool {
        match self.get_peer_options() {
            Some(opt) => opt.verify_hostname,
            None => false,
        }
    }
    /// The alternative common name to use to verify the server cert.
    ///
    /// If the server cert doesn't match the SNI, this name will be used to
    /// verify the cert.
    fn alternative_cn(&self) -> Option<&String> {
        match self.get_peer_options() {
            Some(opt) => opt.alternative_cn.as_ref(),
            None => None,
        }
    }
    /// Which local source address this connection should be bind to.
    fn bind_to(&self) -> Option<&InetSocketAddr> {
        match self.get_peer_options() {
            Some(opt) => opt.bind_to.as_ref(),
            None => None,
        }
    }
    /// How long connect() call should be wait before it returns a timeout error.
    fn connection_timeout(&self) -> Option<Duration> {
        match self.get_peer_options() {
            Some(opt) => opt.connection_timeout,
            None => None,
        }
    }
    /// How long the overall connection establishment should take before a timeout error is returned.
    fn total_connection_timeout(&self) -> Option<Duration> {
        match self.get_peer_options() {
            Some(opt) => opt.total_connection_timeout,
            None => None,
        }
    }
    /// If the connection can be reused, how long the connection should wait to be reused before it
    /// shuts down.
    fn idle_timeout(&self) -> Option<Duration> {
        self.get_peer_options().and_then(|o| o.idle_timeout)
    }
    /// Get the ALPN preference.
    fn get_alpn(&self) -> Option<&ALPN> {
        self.get_peer_options().map(|opt| &opt.alpn)
    }
    /// Get the CA cert to use to validate the server cert.
    ///
    /// If not set, the default CAs will be used.
    fn get_ca(&self) -> Option<&Arc<Box<[X509]>>> {
        match self.get_peer_options() {
            Some(opt) => opt.ca.as_ref(),
            None => None,
        }
    }
    /// Get the client cert and key for mutual TLS if any
    fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
        None
    }
    /// The TCP keepalive setting that should be applied to this connection
    fn tcp_keepalive(&self) -> Option<&TcpKeepalive> {
        self.get_peer_options()
            .and_then(|o| o.tcp_keepalive.as_ref())
    }
    /// The interval H2 pings to send to the server if any
    fn h2_ping_interval(&self) -> Option<Duration> {
        self.get_peer_options().and_then(|o| o.h2_ping_interval)
    }
    fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
        self.address().check_fd_match(fd)
    }
    fn get_tracer(&self) -> Option<Tracer> {
        None
    }
}

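Most of the trait's methods come with default implementations, so they can be skimmed on a first read. At its core, what a Peer must implement, that is, the properties it must tell the Connector when connecting, are: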

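| Method | Description |
| --- | --- |
| address() | The address to connect to. |
| tls() | Whether to connect over TLS. |
| sni() | The SNI to send when TLS is used. |
| reuse_hash() | Decides whether a connection can be reused: peers whose values are equal can share a connection. |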
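
To make the split between required and defaulted methods concrete, here is a minimal sketch. `MiniPeer`, `MiniOptions` and `PlainPeer` are hypothetical, heavily reduced stand-ins rather than Pingora's types, and std's `DefaultHasher` and `std::net::SocketAddr` replace `AHasher` and Pingora's own `SocketAddr` so that the sketch compiles without any crates.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::net::SocketAddr;

/// Stand-in for Pingora's `PeerOptions`; only one field is modeled here.
#[derive(Debug, Clone, Default)]
pub struct MiniOptions {
    pub verify_cert: bool,
}

/// Simplified, hypothetical mirror of `Peer`: four required methods plus
/// defaults that fall back when no options are provided.
pub trait MiniPeer {
    fn address(&self) -> &SocketAddr;
    fn tls(&self) -> bool;
    fn sni(&self) -> &str;
    fn reuse_hash(&self) -> u64;

    /// Default: no extra options.
    fn get_peer_options(&self) -> Option<&MiniOptions> {
        None
    }

    /// Default: derived from the options, `false` when there are none.
    fn verify_cert(&self) -> bool {
        self.get_peer_options().map_or(false, |o| o.verify_cert)
    }
}

/// A peer that implements only the required methods.
pub struct PlainPeer {
    pub addr: SocketAddr,
}

impl MiniPeer for PlainPeer {
    fn address(&self) -> &SocketAddr {
        &self.addr
    }
    fn tls(&self) -> bool {
        false
    }
    fn sni(&self) -> &str {
        ""
    }
    // Only the address is hashed, so two peers with the same address
    // are considered interchangeable for connection reuse.
    fn reuse_hash(&self) -> u64 {
        let mut hasher = DefaultHasher::new();
        self.addr.hash(&mut hasher);
        hasher.finish()
    }
}
```

An implementor that wants non-default behavior only has to override `get_peer_options()`; every getter derived from it then picks up the new values.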

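Beyond these, the trait exposes a number of other useful properties through its defaulted methods, such as get_proxy(), get_peer_options(), verify_cert(), verify_hostname(), the various timeouts, and get_tracer().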

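Tracing and Tracer

trait Peer refers to a Tracer type, which is how a Peer traces its connections. A Tracer wraps a value that implements trait Tracing, whose methods are called when a connection is established and when it is disconnected. The definitions are as follows: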

pingora-core/src/upstreams/peer.rs
/// The interface to trace the connection
pub trait Tracing: Send + Sync + std::fmt::Debug {
    /// This method is called when successfully connected to a remote server
    fn on_connected(&self);
    /// This method is called when the connection is disconnected.
    fn on_disconnected(&self);
    /// A way to clone itself
    fn boxed_clone(&self) -> Box<dyn Tracing>;
}

/// An object-safe version of Tracing object that can use Clone
#[derive(Debug)]
pub struct Tracer(pub Box<dyn Tracing>);
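
As a rough illustration of why `boxed_clone` is there, the sketch below re-declares the two definitions locally (so it compiles without pingora-core) and adds a hypothetical `OpenConnCounter` tracer. The `Clone` impl for `Tracer` is written the way we would expect it to look given `boxed_clone`; treat it as an assumption rather than a quote from the source.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Local copies of the two definitions quoted above.
pub trait Tracing: Send + Sync + std::fmt::Debug {
    fn on_connected(&self);
    fn on_disconnected(&self);
    fn boxed_clone(&self) -> Box<dyn Tracing>;
}

#[derive(Debug)]
pub struct Tracer(pub Box<dyn Tracing>);

// `Box<dyn Tracing>` cannot derive `Clone`, so cloning goes through `boxed_clone`.
impl Clone for Tracer {
    fn clone(&self) -> Self {
        Tracer(self.0.boxed_clone())
    }
}

/// Hypothetical tracer that counts currently open connections.
/// All clones share the same counter through the `Arc`.
#[derive(Debug, Clone, Default)]
pub struct OpenConnCounter {
    pub open: Arc<AtomicUsize>,
}

impl Tracing for OpenConnCounter {
    fn on_connected(&self) {
        self.open.fetch_add(1, Ordering::SeqCst);
    }
    fn on_disconnected(&self) {
        self.open.fetch_sub(1, Ordering::SeqCst);
    }
    fn boxed_clone(&self) -> Box<dyn Tracing> {
        Box::new(self.clone())
    }
}
```

Wrapping the counter as `Tracer(Box::new(counter.clone()))` and cloning that tracer yields handles that all update the same counter, which is what a peer cloned per request would need.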

BasicPeer

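Now that we know how trait Peer is defined, let's look at a simple Peer implementation: BasicPeer. Pingora uses BasicPeer in a few simple scenarios, such as TcpHealthCheck and unit tests. The implementation is fairly rough: as constructed by new(), it only establishes a non-TLS connection to address, because sni is left empty and tls() returns !self.sni.is_empty(). The source is very simple and mostly just fills in the fields, as shown below: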

pingora-core/src/upstreams/peer.rs
/// A simple TCP or TLS peer without many complicated settings.
#[derive(Debug, Clone)]
pub struct BasicPeer {
    pub _address: SocketAddr,
    pub sni: String,
    pub options: PeerOptions,
}

impl BasicPeer {
    /// Create a new [`BasicPeer`]
    pub fn new(address: &str) -> Self {
        BasicPeer {
            _address: SocketAddr::Inet(address.parse().unwrap()), // TODO: check error, add support
            // for UDS
            sni: "".to_string(), // TODO: add support for SNI
            options: PeerOptions::new(),
        }
    }
}

impl Display for BasicPeer {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "{:?}", self)
    }
}

impl Peer for BasicPeer {
    fn address(&self) -> &SocketAddr {
        &self._address
    }

    fn tls(&self) -> bool {
        !self.sni.is_empty()
    }

    fn bind_to(&self) -> Option<&InetSocketAddr> {
        None
    }

    fn sni(&self) -> &str {
        &self.sni
    }

    // TODO: change connection pool to accept u64 instead of String
    fn reuse_hash(&self) -> u64 {
        let mut hasher = AHasher::default();
        self._address.hash(&mut hasher);
        hasher.finish()
    }

    fn get_peer_options(&self) -> Option<&PeerOptions> {
        Some(&self.options)
    }
}
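
The `TODO: check error` in `BasicPeer::new` is worth a second look: `address.parse().unwrap()` panics on anything that is not an IP:port literal. A minimal sketch of a fallible variant could look like the following. `SimplePeer` and `try_new` are hypothetical names, and `std::net::SocketAddr` stands in for Pingora's `SocketAddr` enum.

```rust
use std::net::{AddrParseError, SocketAddr};

/// Hypothetical stand-in for `BasicPeer`, using `std::net::SocketAddr`
/// rather than Pingora's own `SocketAddr` enum.
#[derive(Debug, Clone)]
pub struct SimplePeer {
    pub address: SocketAddr,
    pub sni: String,
}

impl SimplePeer {
    /// Like `BasicPeer::new`, but returns the parse error instead of panicking.
    pub fn try_new(address: &str) -> Result<Self, AddrParseError> {
        Ok(SimplePeer {
            // `parse` accepts only IP literals with a port; it never does DNS.
            address: address.parse()?,
            sni: String::new(),
        })
    }

    /// Same rule as `BasicPeer::tls`: TLS is on only when an SNI is set.
    pub fn tls(&self) -> bool {
        !self.sni.is_empty()
    }
}
```

Note that a hostname such as `example.com:80` is rejected here as well, since parsing a `SocketAddr` from a string does not resolve names.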

HttpPeer

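Having seen the simple one, let's look at something a bit more complex: HttpPeer, the structure we used in the example. Compared with BasicPeer, it additionally supports HTTPS, SNI, proxies, client certificates, and UDS (Unix Domain Socket) addresses.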

pingora-core/src/upstreams/peer.rs
/// A peer representing the remote HTTP server to connect to
#[derive(Debug, Clone)]
pub struct HttpPeer {
    pub _address: SocketAddr,
    pub scheme: Scheme,
    pub sni: String,
    pub proxy: Option<Proxy>,
    pub client_cert_key: Option<Arc<CertKey>>,
    pub options: PeerOptions,
}

impl HttpPeer {
    // These methods are pretty ad-hoc
    pub fn is_tls(&self) -> bool {
        match self.scheme {
            Scheme::HTTP => false,
            Scheme::HTTPS => true,
        }
    }

    fn new_from_sockaddr(address: SocketAddr, tls: bool, sni: String) -> Self {
        HttpPeer {
            _address: address,
            scheme: Scheme::from_tls_bool(tls),
            sni,
            proxy: None,
            client_cert_key: None,
            options: PeerOptions::new(),
        }
    }

    /// Create a new [`HttpPeer`] with the given socket address and TLS settings.
    pub fn new<A: ToInetSocketAddrs>(address: A, tls: bool, sni: String) -> Self {
        let mut addrs_iter = address.to_socket_addrs().unwrap(); //TODO: handle error
        let addr = addrs_iter.next().unwrap();
        Self::new_from_sockaddr(SocketAddr::Inet(addr), tls, sni)
    }

    /// Create a new [`HttpPeer`] with the given path to Unix domain socket and TLS settings.
    pub fn new_uds(path: &str, tls: bool, sni: String) -> Self {
        let addr = SocketAddr::Unix(UnixSocketAddr::from_pathname(Path::new(path)).unwrap()); //TODO: handle error
        Self::new_from_sockaddr(addr, tls, sni)
    }

    /// Create a new [`HttpPeer`] that uses a proxy to connect to the upstream IP and port
    /// combination.
    pub fn new_proxy(
        next_hop: &str,
        ip_addr: IpAddr,
        port: u16,
        tls: bool,
        sni: &str,
        headers: BTreeMap<String, Vec<u8>>,
    ) -> Self {
        HttpPeer {
            _address: SocketAddr::Inet(InetSocketAddr::new(ip_addr, port)),
            scheme: Scheme::from_tls_bool(tls),
            sni: sni.to_string(),
            proxy: Some(Proxy {
                next_hop: PathBuf::from(next_hop).into(),
                host: ip_addr.to_string(),
                port,
                headers,
            }),
            client_cert_key: None,
            options: PeerOptions::new(),
        }
    }

    fn peer_hash(&self) -> u64 {
        let mut hasher = AHasher::default();
        self.hash(&mut hasher);
        hasher.finish()
    }
}

impl Hash for HttpPeer {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self._address.hash(state);
        self.scheme.hash(state);
        self.proxy.hash(state);
        self.sni.hash(state);
        // client cert serial
        self.client_cert_key.hash(state);
        // origin server cert verification
        self.verify_cert().hash(state);
        self.verify_hostname().hash(state);
        self.alternative_cn().hash(state);
    }
}

impl Display for HttpPeer {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "addr: {}, scheme: {},", self._address, self.scheme)?;
        if !self.sni.is_empty() {
            write!(f, "sni: {},", self.sni)?;
        }
        if let Some(p) = self.proxy.as_ref() {
            write!(f, "proxy: {p},")?;
        }
        if let Some(cert) = &self.client_cert_key {
            write!(f, "client cert: {},", cert)?;
        }
        Ok(())
    }
}

impl Peer for HttpPeer {
    fn address(&self) -> &SocketAddr {
        &self._address
    }

    fn tls(&self) -> bool {
        self.is_tls()
    }

    fn sni(&self) -> &str {
        &self.sni
    }

    // TODO: change connection pool to accept u64 instead of String
    fn reuse_hash(&self) -> u64 {
        self.peer_hash()
    }

    fn get_peer_options(&self) -> Option<&PeerOptions> {
        Some(&self.options)
    }

    fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
        Some(&mut self.options)
    }

    fn get_proxy(&self) -> Option<&Proxy> {
        self.proxy.as_ref()
    }

    fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
        if let Some(proxy) = self.get_proxy() {
            proxy.next_hop.check_fd_match(fd)
        } else {
            self.address().check_fd_match(fd)
        }
    }

    fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
        self.client_cert_key.as_ref()
    }

    fn get_tracer(&self) -> Option<Tracer> {
        self.options.tracer.clone()
    }
}
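
Unlike BasicPeer, whose reuse_hash() covers only the address, the Hash implementation above folds in the scheme, proxy, SNI, client certificate and verification settings, so a connection is only reused by a peer that would have negotiated it the same way. The following sketch shows the effect on a reduced struct. `MiniHttpPeer` is hypothetical, keeps only three fields, and uses std's `DefaultHasher` and `ToSocketAddrs` in place of `AHasher` and Pingora's `ToInetSocketAddrs`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::net::{SocketAddr, ToSocketAddrs};

/// Hypothetical, reduced version of `HttpPeer`: only three fields, all hashed.
#[derive(Debug, Clone, Hash)]
pub struct MiniHttpPeer {
    pub address: SocketAddr,
    pub tls: bool,
    pub sni: String,
}

impl MiniHttpPeer {
    /// Resolves the address like `HttpPeer::new`, but reports failure
    /// instead of panicking on the two `unwrap()` calls.
    pub fn new(address: &str, tls: bool, sni: &str) -> std::io::Result<Self> {
        let address = address
            .to_socket_addrs()?
            .next()
            .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, "no address"))?;
        Ok(MiniHttpPeer {
            address,
            tls,
            sni: sni.to_string(),
        })
    }

    /// Hashes every field, so peers that differ in any of them get
    /// different hashes (barring collisions) and do not share connections.
    pub fn reuse_hash(&self) -> u64 {
        let mut hasher = DefaultHasher::new();
        self.hash(&mut hasher);
        hasher.finish()
    }
}
```

Two peers pointing at the same IP and port but carrying different SNIs end up with different hashes here, which is the behavior one would want when several virtual hosts sit behind one address.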

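HttpPeer supports three forms of construction: new() for an address that resolves to an IP and port, new_uds() for a path to a Unix domain socket, and new_proxy() for reaching the upstream through a proxy.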

Proxy

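The Proxy used by HttpPeer describes a CONNECT proxy: next_hop is the hop to go through (for now, the path to a UDS), host and port name the target to reach, and additional HTTP headers can be attached to the CONNECT request. It is defined as follows: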

pingora-core/src/upstreams/peer.rs
/// The proxy settings to connect to the remote server, CONNECT only for now
#[derive(Debug, Hash, Clone)]
pub struct Proxy {
    pub next_hop: Box<Path>, // for now this will be the path to the UDS
    pub host: String,        // the proxied host. Could be either IP addr or hostname.
    pub port: u16,           // the port to proxy to
    pub headers: BTreeMap<String, Vec<u8>>, // the additional headers to add to CONNECT
}

impl Display for Proxy {
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        write!(
            f,
            "next_hop: {}, host: {}, port: {}",
            self.next_hop.display(),
            self.host,
            self.port
        )
    }
}
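
To get a feel for how these fields fit together, here is a sketch of the kind of CONNECT request they could translate into on the wire. `render_connect` is a hypothetical helper written for this post, not Pingora's implementation, and the exact header set Pingora sends may differ.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: render a CONNECT request for the given target and
/// extra headers. This only illustrates the shape of the request.
pub fn render_connect(host: &str, port: u16, headers: &BTreeMap<String, Vec<u8>>) -> Vec<u8> {
    // Request line and Host header both name the proxied target.
    let mut req =
        format!("CONNECT {host}:{port} HTTP/1.1\r\nHost: {host}:{port}\r\n").into_bytes();
    // Header values are raw bytes, so they are appended without conversion.
    for (name, value) in headers {
        req.extend_from_slice(name.as_bytes());
        req.extend_from_slice(b": ");
        req.extend_from_slice(value);
        req.extend_from_slice(b"\r\n");
    }
    // An empty line terminates the header section.
    req.extend_from_slice(b"\r\n");
    req
}
```

One plausible reason for choosing BTreeMap over HashMap in Proxy is that BTreeMap implements Hash and iterates in key order, which lets the struct derive Hash and keeps the peer's reuse hash stable; std's HashMap does not implement Hash.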