Skip to content

Learning Pingora 01 - Getting Started

Published: at 18:36

ToC

示例代码

让我们首先按照 Pingora 的 example 来看吧。首先是建立一个项目:

Terminal window
# 初始化项目
cargo init pingora-learning
cd pingora-learning
# 加入引用
cargo add pingora -F lb
cargo add async-trait

然后是照葫芦画瓢:

use async_trait::async_trait;
use pingora::prelude::*;
use std::sync::Arc;
pub struct LB(Arc<LoadBalancer<RoundRobin>>);
#[async_trait]
impl ProxyHttp for LB {
/// For this small example, we don't need context storage
type CTX = ();
fn new_ctx(&self) -> () {
()
}
async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
let upstream = self
.0
.select(b"", 256) // hash doesn't matter for round robin
.unwrap();
println!("upstream peer is: {upstream:?}");
// Set SNI to one.one.one.one
let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
Ok(peer)
}
async fn upstream_request_filter(
&self,
_session: &mut Session,
upstream_request: &mut RequestHeader,
_ctx: &mut Self::CTX,
) -> Result<()> {
upstream_request
.insert_header("Host", "one.one.one.one")
.unwrap();
Ok(())
}
}
fn main() {
let mut my_server = Server::new(None).unwrap();
my_server.bootstrap();
let upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
let mut lb = http_proxy_service(&my_server.configuration, LB(Arc::new(upstreams)));
lb.add_tcp("0.0.0.0:6188");
my_server.add_service(lb);
my_server.run_forever();
}

这个案例足够简单,让我们看看 Pingora 实际是怎么操作的。

Server、Service 与 Http

让我们首先观察一下 fn main() 的实现。

在整个 main 中,贯彻始终的是 Server,也就是上面高亮的 [1][3] 部分。从 bootstrapadd_service,以及最后的 run_forever,它都是最主要的服务组成部分。而 Server 真正需要的内部逻辑,比如 example 中的 Load Balancer,则是通过 Service 的方式向 Server 注册的。

Servicepingora-core 中的定义如下:

pingora-core/src/services/mod.rs
/// The service interface
#[async_trait]
pub trait Service: Sync + Send {
/// This function will be called when the server is ready to start the service.
///
/// - `fds`: a collection of listening file descriptors. During zero downtime restart
/// the `fds` would contain the listening sockets passed from the old service, services should
/// take the sockets they need to use then. If the sockets the service looks for don't appear in
/// the collection, the service should create its own listening sockets and then put them into
/// the collection in order for them to be passed to the next server.
/// - `shutdown`: the shutdown signal this server would receive.
async fn start_service(&mut self, fds: Option<ListenFds>, mut shutdown: ShutdownWatch);
/// The name of the service, just for logging and naming the threads assigned to this service
///
/// Note that due to the limit of the underlying system, only the first 16 chars will be used
fn name(&self) -> &str;
/// The preferred number of threads to run this service
///
/// If `None`, the global setting will be used
fn threads(&self) -> Option<usize> {
None
}
}

最核心的就是 start_service 了。Service 需要负责热更新相关的事项,因此 start_service 接收了 fds 作为参数,用于在 server 更新时通过 fd 继承旧服务的 service。此外,shutdown 也可以在停止时监听并作一些移交的事项。

HTTP Proxy Service

在 example 的 47 行,我们调用了 http_proxy_service 作为我们 LB 的 Wrapper。这其实是一个 Service 初始化的快捷手段:

pingora-proxy/src/lib.rs
/// Create a [Service] from the user implemented [ProxyHttp].
///
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>> {
Service::new(
"Pingora HTTP Proxy Service".into(),
HttpProxy::new(inner, conf.clone()),
)
}

在内部,我们实际上是创建了一个 struct Service,并且在这个 Service 里包了一个 HttpProxystruct Service 实现了 trait Service

pingora-proxy/src/lib.rs
#[async_trait]
impl<A: ServerApp + Send + Sync + 'static> ServiceTrait for Service<A> {
24 collapsed lines
async fn start_service(&mut self, fds: Option<ListenFds>, shutdown: ShutdownWatch) {
let runtime = current_handle();
let endpoints = self.listeners.build(fds);
let handlers = endpoints.into_iter().map(|endpoint| {
let app_logic = self.app_logic.clone();
let shutdown = shutdown.clone();
runtime.spawn(async move {
Self::run_endpoint(app_logic, endpoint, shutdown).await;
})
});
futures::future::join_all(handlers).await;
self.listeners.cleanup();
self.app_logic.cleanup();
}
fn name(&self) -> &str {
&self.name
}
fn threads(&self) -> Option<usize> {
self.threads
}
}

这使得通过 http_proxy_service 构造的 HttpProxy 能作为 ServiceServer 加载。而 HttpProxy 则实现了 HttpServerApp

pingora-proxy/src/lib.rs
#[async_trait]
impl<SV> HttpServerApp for HttpProxy<SV>
where
SV: ProxyHttp + Send + Sync + 'static,
<SV as ProxyHttp>::CTX: Send + Sync,
{
async fn process_new_http(
self: &Arc<Self>,
session: HttpSession,
shutdown: &ShutdownWatch,
) -> Option<Stream> {
18 collapsed lines
let session = Box::new(session);
// TODO: keepalive pool, use stack
let mut session = match self.handle_new_request(session).await {
Some(downstream_session) => Session::new(downstream_session),
None => return None, // bad request
};
if *shutdown.borrow() {
// stop downstream from reusing if this service is shutting down soon
session.set_keepalive(None);
} else {
// default 60s
session.set_keepalive(Some(60));
}
let ctx = self.inner.new_ctx();
self.process_request(session, ctx).await
}
fn http_cleanup(&self) {
4 collapsed lines
// Notify all keepalived request blocking on read_request() to abort
self.shutdown.notify_waiters();
// TODO: impl shutting down flag so that we don't need to read stack.is_shutting_down()
}
// TODO implement h2_options
}

让我们看看 HttpServerApp 是什么。

ServerApp 与 HttpServerApp

提到 HttpServerApp 绕不开的就是 ServerApp。我们先来看一下 ServerApp

pingora-core/src/apps/mod.rs
#[cfg_attr(not(doc_async_trait), async_trait)]
/// This trait defines the interface of a transport layer (TCP or TLS) application.
pub trait ServerApp {
/// Whenever a new connection is established, this function will be called with the established
/// [`Stream`] object provided.
///
/// The application can do whatever it wants with the `session`.
///
/// After processing the `session`, if the `session`'s connection is reusable, This function
/// can return it to the service by returning `Some(session)`. The returned `session` will be
/// fed to another [`Self::process_new()`] for another round of processing.
/// If not reusable, `None` should be returned.
///
/// The `shutdown` argument will change from `false` to `true` when the server receives a
/// signal to shutdown. This argument allows the application to react accordingly.
async fn process_new(
self: &Arc<Self>,
mut session: Stream,
// TODO: make this ShutdownWatch so that all task can await on this event
shutdown: &ShutdownWatch,
) -> Option<Stream>;
/// This callback will be called once after the service stops listening to its endpoints.
fn cleanup(&self) {}
}

简单易懂。在新的连接进入时,Pingora 会调用 process_new() 进行处理;而当 Service 停止时,则会使用 cleanup() 进行清理。

再看看 HttpServerApp

pingora-core/src/apps/mod.rs
/// This trait defines the interface of a HTTP application.
#[cfg_attr(not(doc_async_trait), async_trait)]
pub trait HttpServerApp {
/// Similar to the [`ServerApp`], this function is called whenever a new HTTP session is established.
///
/// After successful processing, [`ServerSession::finish()`] can be called to return an optionally reusable
/// connection back to the service. The caller needs to make sure that the connection is in a reusable state
/// i.e., no error or incomplete read or write headers or bodies. Otherwise a `None` should be returned.
async fn process_new_http(
self: &Arc<Self>,
mut session: ServerSession,
// TODO: make this ShutdownWatch so that all task can await on this event
shutdown: &ShutdownWatch,
) -> Option<Stream>;
/// Provide options on how HTTP/2 connection should be established. This function will be called
/// every time a new HTTP/2 **connection** needs to be established.
///
/// A `None` means to use the built-in default options. See [`server::H2Options`] for more details.
fn h2_options(&self) -> Option<server::H2Options> {
None
}
fn http_cleanup(&self) {}
}

首先是 process_new_http。与 process_new 不能说是一致,几乎可以说是完全一样了。唯一的区别是传入的第二个参数 session 的类型从 Stream 变成了 ServerSessionServerSession 是 HTTP 下特有的 context,针对 http1http2 分别存储了不同的内容。

Pingora 为所有 trait HttpServerApp 实现了 trait ServerApp。因此上面实现了 HttpServerAppHttpProxy 也就自动实现了 ServerApp


Previous Post
2024 新年解密红包 / Melody Flag
Next Post
Learning Pingora 02 - A Simple HTTP Server