Example code
Let's start from the example that ships with Pingora. First, create a project:

    # Initialize the project
    cargo init pingora-learning
    cd pingora-learning

    # Add the dependencies
    cargo add pingora -F lb
    cargo add async-trait

Then we copy the example more or less verbatim:

    use async_trait::async_trait;
    use pingora::prelude::*;
    use std::sync::Arc;

    pub struct LB(Arc<LoadBalancer<RoundRobin>>);

    #[async_trait]
    impl ProxyHttp for LB {
        /// For this small example, we don't need context storage
        type CTX = ();

        fn new_ctx(&self) -> () {
            ()
        }

        async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
            let upstream = self
                .0
                .select(b"", 256) // hash doesn't matter for round robin
                .unwrap();

            println!("upstream peer is: {upstream:?}");

            // Set SNI to one.one.one.one
            let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
            Ok(peer)
        }

        async fn upstream_request_filter(
            &self,
            _session: &mut Session,
            upstream_request: &mut RequestHeader,
            _ctx: &mut Self::CTX,
        ) -> Result<()> {
            upstream_request
                .insert_header("Host", "one.one.one.one")
                .unwrap();
            Ok(())
        }
    }

    fn main() {
        let mut my_server = Server::new(None).unwrap();
        my_server.bootstrap();

        let upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
        let mut lb = http_proxy_service(&my_server.configuration, LB(Arc::new(upstreams)));
        lb.add_tcp("0.0.0.0:6188");

        my_server.add_service(lb);
        my_server.run_forever();
    }

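The select call in the example ignores its key because the selection is round robin. To make that concrete, here is a minimal std-only sketch of round-robin selection. RoundRobinSketch and its methods are hypothetical names for illustration and are not Pingora's LoadBalancer implementation.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for a load balancer; not Pingora's LoadBalancer.
pub struct RoundRobinSketch {
    backends: Vec<String>,
    next: AtomicUsize,
}

impl RoundRobinSketch {
    pub fn new(backends: &[&str]) -> Self {
        Self {
            backends: backends.iter().map(|b| b.to_string()).collect(),
            next: AtomicUsize::new(0),
        }
    }

    /// Returns the next backend in rotation, or None when there are no backends.
    /// The key is ignored, which is why the hash does not matter for round robin.
    pub fn select(&self, _key: &[u8]) -> Option<&str> {
        if self.backends.is_empty() {
            return None;
        }
        // A shared counter lets many callers rotate through the list without a lock.
        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.backends.len();
        Some(self.backends[i].as_str())
    }
}
```

Calling select repeatedly on a two-backend instance alternates between the two addresses, whatever key is passed.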
The example is simple enough, so let's look at what Pingora actually does behind it.

Server, Service, and HTTP
Let's first look at the implementation of fn main().
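The one thing that runs through the whole of main is Server: it is created at the top and used again at the bottom. From bootstrap to add_service and finally run_forever, it is the main component of the program. The logic that Server actually needs to run, such as the load balancer in the example, is registered with Server in the form of a Service.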
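The relationship can be sketched with a few lines of plain Rust. This is a simplified, synchronous model under my own assumptions: ServiceSketch, LoadBalancerSketch, and ServerSketch are hypothetical names, and the real Pingora types are asynchronous and do far more.

```rust
/// Hypothetical, synchronous stand-in for Pingora's Service trait.
pub trait ServiceSketch {
    /// Name used for logging.
    fn name(&self) -> &str;
    /// Runs the service logic and returns a log line for the demo.
    fn start(&mut self) -> String;
}

/// Hypothetical service that plays the role of the example's load balancer.
pub struct LoadBalancerSketch {
    pub listen_addr: String,
}

impl ServiceSketch for LoadBalancerSketch {
    fn name(&self) -> &str {
        "load balancer"
    }

    fn start(&mut self) -> String {
        format!("{} listening on {}", self.name(), self.listen_addr)
    }
}

/// Hypothetical server: it owns the lifecycle, the services own the logic.
#[derive(Default)]
pub struct ServerSketch {
    services: Vec<Box<dyn ServiceSketch>>,
}

impl ServerSketch {
    /// Registers a service; nothing runs until run() is called.
    pub fn add_service(&mut self, service: impl ServiceSketch + 'static) {
        self.services.push(Box::new(service));
    }

    /// Starts every registered service in registration order.
    pub fn run(&mut self) -> Vec<String> {
        self.services.iter_mut().map(|s| s.start()).collect()
    }
}
```

The point of the design is that the server only knows the trait, so any logic that implements it can be registered without the server changing.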
Service is defined in pingora-core as follows:

    /// The service interface
    #[async_trait]
    pub trait Service: Sync + Send {
        /// This function will be called when the server is ready to start the service.
        ///
        /// - `fds`: a collection of listening file descriptors. During zero downtime restart
        /// the `fds` would contain the listening sockets passed from the old service, services should
        /// take the sockets they need to use then. If the sockets the service looks for don't appear in
        /// the collection, the service should create its own listening sockets and then put them into
        /// the collection in order for them to be passed to the next server.
        /// - `shutdown`: the shutdown signal this server would receive.
        async fn start_service(&mut self, fds: Option<ListenFds>, mut shutdown: ShutdownWatch);

        /// The name of the service, just for logging and naming the threads assigned to this service
        ///
        /// Note that due to the limit of the underlying system, only the first 16 chars will be used
        fn name(&self) -> &str;

        /// The preferred number of threads to run this service
        ///
        /// If `None`, the global setting will be used
        fn threads(&self) -> Option<usize> {
            None
        }
    }

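The core of the trait is start_service. A Service is responsible for its part of a zero-downtime upgrade, so start_service receives fds as a parameter: when the server is upgraded, the new service inherits the old service's listening sockets through these file descriptors. The shutdown parameter, in turn, lets the service watch for the stop signal and do any hand-over work while it shuts down.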
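The rule described in the fds doc comment (reuse the inherited socket if it is there, otherwise create one and put it into the collection) can be sketched as follows. This is a simplified model under my own assumptions: ListenFdsSketch and listener_for are hypothetical, file descriptors are plain integers, and the open closure stands in for actually binding a socket.

```rust
use std::collections::HashMap;

/// Hypothetical model of the inherited listener table: address -> fd number.
#[derive(Default)]
pub struct ListenFdsSketch {
    fds: HashMap<String, i32>,
}

impl ListenFdsSketch {
    pub fn get(&self, addr: &str) -> Option<i32> {
        self.fds.get(addr).copied()
    }

    pub fn add(&mut self, addr: &str, fd: i32) {
        self.fds.insert(addr.to_string(), fd);
    }
}

/// Returns the fd to listen on and whether it was inherited.
/// An inherited socket is reused; otherwise a new one is "opened" and
/// registered so that the next server generation can inherit it.
pub fn listener_for(
    fds: &mut ListenFdsSketch,
    addr: &str,
    open: impl FnOnce() -> i32,
) -> (i32, bool) {
    if let Some(fd) = fds.get(addr) {
        return (fd, true);
    }
    let fd = open();
    fds.add(addr, fd);
    (fd, false)
}
```

On a cold start the table is empty and the socket is created; after an upgrade the same address resolves to the socket that the old process left behind, so no connection attempt hits a closed port in between.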

HTTP Proxy Service
In main of the example, we call http_proxy_service as a wrapper around our LB. This is really just a shortcut for initializing a Service:

    /// Create a [Service] from the user implemented [ProxyHttp].
    ///
    /// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
    pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>> {
        Service::new(
            "Pingora HTTP Proxy Service".into(),
            HttpProxy::new(inner, conf.clone()),
        )
    }

Internally, we create a struct Service and wrap an HttpProxy inside it. struct Service implements trait Service (referred to as ServiceTrait in the code below):

    #[async_trait]
    impl<A: ServerApp + Send + Sync + 'static> ServiceTrait for Service<A> {
        async fn start_service(&mut self, fds: Option<ListenFds>, shutdown: ShutdownWatch) {
            let runtime = current_handle();
            let endpoints = self.listeners.build(fds);

            let handlers = endpoints.into_iter().map(|endpoint| {
                let app_logic = self.app_logic.clone();
                let shutdown = shutdown.clone();
                runtime.spawn(async move {
                    Self::run_endpoint(app_logic, endpoint, shutdown).await;
                })
            });

            futures::future::join_all(handlers).await;
            self.listeners.cleanup();
            self.app_logic.cleanup();
        }

        fn name(&self) -> &str {
            &self.name
        }

        fn threads(&self) -> Option<usize> {
            self.threads
        }
    }

This is what allows the HttpProxy built by http_proxy_service to be loaded by Server as a Service. HttpProxy, in turn, implements HttpServerApp:

    #[async_trait]
    impl<SV> HttpServerApp for HttpProxy<SV>
    where
        SV: ProxyHttp + Send + Sync + 'static,
        <SV as ProxyHttp>::CTX: Send + Sync,
    {
        async fn process_new_http(
            self: &Arc<Self>,
            session: HttpSession,
            shutdown: &ShutdownWatch,
        ) -> Option<Stream> {
            let session = Box::new(session);

            // TODO: keepalive pool, use stack
            let mut session = match self.handle_new_request(session).await {
                Some(downstream_session) => Session::new(downstream_session),
                None => return None, // bad request
            };

            if *shutdown.borrow() {
                // stop downstream from reusing if this service is shutting down soon
                session.set_keepalive(None);
            } else {
                // default 60s
                session.set_keepalive(Some(60));
            }

            let ctx = self.inner.new_ctx();
            self.process_request(session, ctx).await
        }

        fn http_cleanup(&self) {
            // Notify all keepalived request blocking on read_request() to abort
            self.shutdown.notify_waiters();

            // TODO: impl shutting down flag so that we don't need to read stack.is_shutting_down()
        }

        // TODO implement h2_options
    }

Let's see what HttpServerApp is.

ServerApp and HttpServerApp
HttpServerApp cannot be discussed without ServerApp, so let's look at ServerApp first:

    #[cfg_attr(not(doc_async_trait), async_trait)]
    /// This trait defines the interface of a transport layer (TCP or TLS) application.
    pub trait ServerApp {
        /// Whenever a new connection is established, this function will be called with the established
        /// [`Stream`] object provided.
        ///
        /// The application can do whatever it wants with the `session`.
        ///
        /// After processing the `session`, if the `session`'s connection is reusable, This function
        /// can return it to the service by returning `Some(session)`. The returned `session` will be
        /// fed to another [`Self::process_new()`] for another round of processing.
        /// If not reusable, `None` should be returned.
        ///
        /// The `shutdown` argument will change from `false` to `true` when the server receives a
        /// signal to shutdown. This argument allows the application to react accordingly.
        async fn process_new(
            self: &Arc<Self>,
            mut session: Stream,
            // TODO: make this ShutdownWatch so that all task can await on this event
            shutdown: &ShutdownWatch,
        ) -> Option<Stream>;

        /// This callback will be called once after the service stops listening to its endpoints.
        fn cleanup(&self) {}
    }

This is easy to follow. When a new connection comes in, Pingora calls process_new() to handle it; when the Service stops, it calls cleanup() to clean up.
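The doc comment on process_new also describes a reuse contract: returning Some(session) hands the connection back for another round, and None ends it. A minimal synchronous sketch of that loop, under my own assumptions, follows. StreamSketch, ServerAppSketch, OneRequestAtATime, and drive are hypothetical names, and the real trait is async and takes a shutdown watch as well.

```rust
/// Hypothetical stand-in for a connection; it only counts requests still queued.
pub struct StreamSketch {
    pub pending_requests: u32,
}

/// Hypothetical, synchronous stand-in for ServerApp.
pub trait ServerAppSketch {
    /// Handles one round of work. Returns Some(stream) if the connection
    /// can be reused for another round, None if it is finished.
    fn process_new(&self, stream: StreamSketch) -> Option<StreamSketch>;
}

/// Hypothetical app that serves exactly one request per call.
pub struct OneRequestAtATime;

impl ServerAppSketch for OneRequestAtATime {
    fn process_new(&self, mut stream: StreamSketch) -> Option<StreamSketch> {
        stream.pending_requests = stream.pending_requests.saturating_sub(1);
        if stream.pending_requests > 0 {
            Some(stream) // more work queued: hand the connection back
        } else {
            None // nothing left: the connection is dropped
        }
    }
}

/// Feeds the connection back into process_new until the app returns None.
/// Returns how many rounds were run.
pub fn drive(app: &impl ServerAppSketch, stream: StreamSketch) -> u32 {
    let mut rounds = 0;
    let mut current = Some(stream);
    while let Some(stream) = current {
        current = app.process_new(stream);
        rounds += 1;
    }
    rounds
}
```

Passing ownership of the stream in and out makes the reuse decision explicit: the application cannot keep using a connection it has declared finished.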
Now let's look at HttpServerApp:

    /// This trait defines the interface of a HTTP application.
    #[cfg_attr(not(doc_async_trait), async_trait)]
    pub trait HttpServerApp {
        /// Similar to the [`ServerApp`], this function is called whenever a new HTTP session is established.
        ///
        /// After successful processing, [`ServerSession::finish()`] can be called to return an optionally reusable
        /// connection back to the service. The caller needs to make sure that the connection is in a reusable state
        /// i.e., no error or incomplete read or write headers or bodies. Otherwise a `None` should be returned.
        async fn process_new_http(
            self: &Arc<Self>,
            mut session: ServerSession,
            // TODO: make this ShutdownWatch so that all task can await on this event
            shutdown: &ShutdownWatch,
        ) -> Option<Stream>;

        /// Provide options on how HTTP/2 connection should be established. This function will be called
        /// every time a new HTTP/2 **connection** needs to be established.
        ///
        /// A `None` means to use the built-in default options. See [`server::H2Options`] for more details.
        fn h2_options(&self) -> Option<server::H2Options> {
            None
        }

        fn http_cleanup(&self) {}
    }

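First comes process_new_http. It is not merely similar to process_new; the two are almost identical. The only difference is that the type of the second parameter, session, changes from Stream to ServerSession. ServerSession is an HTTP-specific context that stores different content for http1 and http2.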
Pingora implements trait ServerApp for every type that implements trait HttpServerApp. As a result, HttpProxy, which implements HttpServerApp above, automatically implements ServerApp as well.
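In Rust this kind of "implement one trait for everything that implements another" is a blanket implementation. The following is a minimal synchronous sketch of the pattern, under my own assumptions: TransportApp, HttpApp, ProxySketch, and serve are hypothetical names, and taking the first line of the input stands in for real HTTP parsing.

```rust
/// Hypothetical stand-in for ServerApp: works on raw connection data.
pub trait TransportApp {
    fn process_new(&self, raw: &str) -> String;
}

/// Hypothetical stand-in for HttpServerApp: works on an HTTP-level view.
pub trait HttpApp {
    fn process_new_http(&self, request_line: &str) -> String;
}

/// Blanket implementation: every HttpApp is automatically a TransportApp.
impl<T: HttpApp> TransportApp for T {
    fn process_new(&self, raw: &str) -> String {
        // A real implementation would parse HTTP here; this sketch only
        // takes the first line of the input as the request line.
        let request_line = raw.lines().next().unwrap_or("");
        self.process_new_http(request_line)
    }
}

/// Hypothetical proxy that only implements the HTTP-level trait.
pub struct ProxySketch;

impl HttpApp for ProxySketch {
    fn process_new_http(&self, request_line: &str) -> String {
        format!("proxied: {request_line}")
    }
}

/// Accepts anything usable at the transport level.
pub fn serve(app: &impl TransportApp, raw: &str) -> String {
    app.process_new(raw)
}
```

ProxySketch never mentions TransportApp, yet serve accepts it. That is the same reason a Service, which only requires ServerApp, can host an HttpProxy.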