Skip to content

Learning Pingora 01 - Getting Started

Published: at 18:36

ToC

示例代码

让我们首先按照 Pingora 的 example 来看吧。首先是建立一个项目:

Terminal window
1
# 初始化项目
2
cargo init pingora-learning
3
cd pingora-learning
4
5
# 加入引用
6
cargo add pingora -F lb
7
cargo add async-trait

然后是照葫芦画瓢:

1
use async_trait::async_trait;
2
use pingora::prelude::*;
3
use std::sync::Arc;
4
5
pub struct LB(Arc<LoadBalancer<RoundRobin>>);
6
7
#[async_trait]
8
impl ProxyHttp for LB {
9
/// For this small example, we don't need context storage
10
type CTX = ();
11
12
fn new_ctx(&self) -> () {
13
()
14
}
15
16
async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
17
let upstream = self
18
.0
19
.select(b"", 256) // hash doesn't matter for round robin
20
.unwrap();
21
22
println!("upstream peer is: {upstream:?}");
23
24
// Set SNI to one.one.one.one
25
let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
26
Ok(peer)
27
}
28
29
async fn upstream_request_filter(
30
&self,
31
_session: &mut Session,
32
upstream_request: &mut RequestHeader,
33
_ctx: &mut Self::CTX,
34
) -> Result<()> {
35
upstream_request
36
.insert_header("Host", "one.one.one.one")
37
.unwrap();
38
Ok(())
39
}
40
}
41
42
fn main() {
43
let mut my_server = Server::new(None).unwrap();
44
my_server.bootstrap();
45
46
let upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
47
let mut lb = http_proxy_service(&my_server.configuration, LB(Arc::new(upstreams)));
48
lb.add_tcp("0.0.0.0:6188");
49
50
my_server.add_service(lb);
51
my_server.run_forever();
52
}

这个案例足够简单,让我们看看 Pingora 实际是怎么操作的。

Server、Service 与 Http

让我们首先观察一下 fn main() 的实现。

在整个 main 中,贯彻始终的是 Server,也就是上面高亮的 [1][3] 部分。从 bootstrapadd_service,以及最后的 run_forever,它都是最主要的服务组成部分。而 Server 真正需要的内部逻辑,比如 example 中的 Load Balancer,则是通过 Service 的方式向 Server 注册的。

Servicepingora-core 中的定义如下:

pingora-core/src/services/mod.rs
31
/// The service interface
32
#[async_trait]
33
pub trait Service: Sync + Send {
34
/// This function will be called when the server is ready to start the service.
35
///
36
/// - `fds`: a collection of listening file descriptors. During zero downtime restart
37
/// the `fds` would contain the listening sockets passed from the old service, services should
38
/// take the sockets they need to use then. If the sockets the service looks for don't appear in
39
/// the collection, the service should create its own listening sockets and then put them into
40
/// the collection in order for them to be passed to the next server.
41
/// - `shutdown`: the shutdown signal this server would receive.
42
async fn start_service(&mut self, fds: Option<ListenFds>, mut shutdown: ShutdownWatch);
43
44
/// The name of the service, just for logging and naming the threads assigned to this service
45
///
46
/// Note that due to the limit of the underlying system, only the first 16 chars will be used
47
fn name(&self) -> &str;
48
49
/// The preferred number of threads to run this service
50
///
51
/// If `None`, the global setting will be used
52
fn threads(&self) -> Option<usize> {
53
None
54
}
55
}

最核心的就是 start_service 了。Service 需要负责热更新相关的事项,因此 start_service 接收了 fds 作为参数,用于在 server 更新时通过 fd 继承旧服务的 service。此外,shutdown 也可以在停止时监听并作一些移交的事项。

HTTP Proxy Service

在 example 的 47 行,我们调用了 http_proxy_service 作为我们 LB 的 Wrapper。这其实是一个 Service 初始化的快捷手段:

pingora-proxy/src/lib.rs
620
/// Create a [Service] from the user implemented [ProxyHttp].
621
///
622
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
623
pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>> {
624
Service::new(
625
"Pingora HTTP Proxy Service".into(),
626
HttpProxy::new(inner, conf.clone()),
627
)
628
}

在内部,我们实际上是创建了一个 struct Service,并且在这个 Service 里包了一个 HttpProxystruct Service 实现了 trait Service

pingora-proxy/src/lib.rs
192
#[async_trait]
193
impl<A: ServerApp + Send + Sync + 'static> ServiceTrait for Service<A> {
24 collapsed lines
194
async fn start_service(&mut self, fds: Option<ListenFds>, shutdown: ShutdownWatch) {
195
let runtime = current_handle();
196
let endpoints = self.listeners.build(fds);
197
198
let handlers = endpoints.into_iter().map(|endpoint| {
199
let app_logic = self.app_logic.clone();
200
let shutdown = shutdown.clone();
201
runtime.spawn(async move {
202
Self::run_endpoint(app_logic, endpoint, shutdown).await;
203
})
204
});
205
206
futures::future::join_all(handlers).await;
207
self.listeners.cleanup();
208
self.app_logic.cleanup();
209
}
210
211
fn name(&self) -> &str {
212
&self.name
213
}
214
215
fn threads(&self) -> Option<usize> {
216
self.threads
217
}
218
}

这使得通过 http_proxy_service 构造的 HttpProxy 能作为 ServiceServer 加载。而 HttpProxy 则实现了 HttpServerApp

pingora-proxy/src/lib.rs
577
#[async_trait]
578
impl<SV> HttpServerApp for HttpProxy<SV>
579
where
580
SV: ProxyHttp + Send + Sync + 'static,
581
<SV as ProxyHttp>::CTX: Send + Sync,
582
{
583
async fn process_new_http(
584
self: &Arc<Self>,
585
session: HttpSession,
586
shutdown: &ShutdownWatch,
587
) -> Option<Stream> {
18 collapsed lines
588
let session = Box::new(session);
589
590
// TODO: keepalive pool, use stack
591
let mut session = match self.handle_new_request(session).await {
592
Some(downstream_session) => Session::new(downstream_session),
593
None => return None, // bad request
594
};
595
596
if *shutdown.borrow() {
597
// stop downstream from reusing if this service is shutting down soon
598
session.set_keepalive(None);
599
} else {
600
// default 60s
601
session.set_keepalive(Some(60));
602
}
603
604
let ctx = self.inner.new_ctx();
605
self.process_request(session, ctx).await
606
}
607
608
fn http_cleanup(&self) {
4 collapsed lines
609
// Notify all keepalived request blocking on read_request() to abort
610
self.shutdown.notify_waiters();
611
612
// TODO: impl shutting down flag so that we don't need to read stack.is_shutting_down()
613
}
614
615
// TODO implement h2_options
616
}

让我们看看 HttpServerApp 是什么。

ServerApp 与 HttpServerApp

提到 HttpServerApp 绕不开的就是 ServerApp。我们先来看一下 ServerApp

pingora-core/src/apps/mod.rs
31
#[cfg_attr(not(doc_async_trait), async_trait)]
32
/// This trait defines the interface of a transport layer (TCP or TLS) application.
33
pub trait ServerApp {
34
/// Whenever a new connection is established, this function will be called with the established
35
/// [`Stream`] object provided.
36
///
37
/// The application can do whatever it wants with the `session`.
38
///
39
/// After processing the `session`, if the `session`'s connection is reusable, This function
40
/// can return it to the service by returning `Some(session)`. The returned `session` will be
41
/// fed to another [`Self::process_new()`] for another round of processing.
42
/// If not reusable, `None` should be returned.
43
///
44
/// The `shutdown` argument will change from `false` to `true` when the server receives a
45
/// signal to shutdown. This argument allows the application to react accordingly.
46
async fn process_new(
47
self: &Arc<Self>,
48
mut session: Stream,
49
// TODO: make this ShutdownWatch so that all task can await on this event
50
shutdown: &ShutdownWatch,
51
) -> Option<Stream>;
52
53
/// This callback will be called once after the service stops listening to its endpoints.
54
fn cleanup(&self) {}
55
}

简单易懂。在新的连接进入时,Pingora 会调用 process_new() 进行处理;而当 Service 停止时,则会使用 cleanup() 进行清理。

再看看 HttpServerApp

pingora-core/src/apps/mod.rs
57
/// This trait defines the interface of a HTTP application.
58
#[cfg_attr(not(doc_async_trait), async_trait)]
59
pub trait HttpServerApp {
60
/// Similar to the [`ServerApp`], this function is called whenever a new HTTP session is established.
61
///
62
/// After successful processing, [`ServerSession::finish()`] can be called to return an optionally reusable
63
/// connection back to the service. The caller needs to make sure that the connection is in a reusable state
64
/// i.e., no error or incomplete read or write headers or bodies. Otherwise a `None` should be returned.
65
async fn process_new_http(
66
self: &Arc<Self>,
67
mut session: ServerSession,
68
// TODO: make this ShutdownWatch so that all task can await on this event
69
shutdown: &ShutdownWatch,
70
) -> Option<Stream>;
71
72
/// Provide options on how HTTP/2 connection should be established. This function will be called
73
/// every time a new HTTP/2 **connection** needs to be established.
74
///
75
/// A `None` means to use the built-in default options. See [`server::H2Options`] for more details.
76
fn h2_options(&self) -> Option<server::H2Options> {
77
None
78
}
79
80
fn http_cleanup(&self) {}
81
}

首先是 process_new_http。与 process_new 不能说是一致,几乎可以说是完全一样了。唯一的区别是传入的第二个参数 session 的类型从 Stream 变成了 ServerSessionServerSession 是 HTTP 下特有的 context,针对 http1http2 分别存储了不同的内容。

Pingora 为所有 trait HttpServerApp 实现了 trait ServerApp。因此上面实现了 HttpServerAppHttpProxy 也就自动实现了 ServerApp