ToC
示例代码
让我们首先按照 Pingora
的 example 来看吧。首先是建立一个项目:
# 初始化项目cargo init pingora-learningcd pingora-learning
# 加入引用cargo add pingora -F lbcargo add async-trait
然后是照葫芦画瓢:
use async_trait::async_trait;use pingora::prelude::*;use std::sync::Arc;
pub struct LB(Arc<LoadBalancer<RoundRobin>>);
#[async_trait]impl ProxyHttp for LB { /// For this small example, we don't need context storage type CTX = ();
fn new_ctx(&self) -> () { () }
async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> { let upstream = self .0 .select(b"", 256) // hash doesn't matter for round robin .unwrap();
println!("upstream peer is: {upstream:?}");
// Set SNI to one.one.one.one let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string())); Ok(peer) }
async fn upstream_request_filter( &self, _session: &mut Session, upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX, ) -> Result<()> { upstream_request .insert_header("Host", "one.one.one.one") .unwrap(); Ok(()) }}
fn main() { let mut my_server = Server::new(None).unwrap(); my_server.bootstrap();
let upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap(); let mut lb = http_proxy_service(&my_server.configuration, LB(Arc::new(upstreams))); lb.add_tcp("0.0.0.0:6188");
my_server.add_service(lb); my_server.run_forever();}
这个案例足够简单,让我们看看 Pingora
实际是怎么操作的。
Server、Service 与 Http
让我们首先观察一下 fn main()
的实现。
在整个 main
中,贯彻始终的是 Server
,也就是上面高亮的 [1]
和 [3]
部分。从 bootstrap
到 add_service
,以及最后的 run_forever
,它都是最主要的服务组成部分。而 Server
真正需要的内部逻辑,比如 example 中的 Load Balancer,则是通过 Service
的方式向 Server
注册的。
Service
在 pingora-core
中的定义如下:
/// The service interface#[async_trait]pub trait Service: Sync + Send { /// This function will be called when the server is ready to start the service. /// /// - `fds`: a collection of listening file descriptors. During zero downtime restart /// the `fds` would contain the listening sockets passed from the old service, services should /// take the sockets they need to use then. If the sockets the service looks for don't appear in /// the collection, the service should create its own listening sockets and then put them into /// the collection in order for them to be passed to the next server. /// - `shutdown`: the shutdown signal this server would receive. async fn start_service(&mut self, fds: Option<ListenFds>, mut shutdown: ShutdownWatch);
/// The name of the service, just for logging and naming the threads assigned to this service /// /// Note that due to the limit of the underlying system, only the first 16 chars will be used fn name(&self) -> &str;
/// The preferred number of threads to run this service /// /// If `None`, the global setting will be used fn threads(&self) -> Option<usize> { None }}
最核心的就是 start_service
了。Service
需要负责热更新相关的事项,因此 start_service
接收了 fds
作为参数,用于在 server
更新时通过 fd 继承旧服务的 service
。此外,shutdown
也可以在停止时监听并作一些移交的事项。
HTTP Proxy Service
在 example 的 47 行,我们调用了 http_proxy_service
作为我们 LB 的 Wrapper
。这其实是一个 Service 初始化的快捷手段:
/// Create a [Service] from the user implemented [ProxyHttp].////// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>> { Service::new( "Pingora HTTP Proxy Service".into(), HttpProxy::new(inner, conf.clone()), )}
在内部,我们实际上是创建了一个 struct Service
,并且在这个 Service 里包了一个 HttpProxy
。struct Service
实现了 trait Service
:
#[async_trait]impl<A: ServerApp + Send + Sync + 'static> ServiceTrait for Service<A> {24 collapsed lines
async fn start_service(&mut self, fds: Option<ListenFds>, shutdown: ShutdownWatch) { let runtime = current_handle(); let endpoints = self.listeners.build(fds);
let handlers = endpoints.into_iter().map(|endpoint| { let app_logic = self.app_logic.clone(); let shutdown = shutdown.clone(); runtime.spawn(async move { Self::run_endpoint(app_logic, endpoint, shutdown).await; }) });
futures::future::join_all(handlers).await; self.listeners.cleanup(); self.app_logic.cleanup(); }
fn name(&self) -> &str { &self.name }
fn threads(&self) -> Option<usize> { self.threads }}
这使得通过 http_proxy_service
构造的 HttpProxy 能作为 Service
被 Server
加载。而 HttpProxy
则实现了 HttpServerApp
:
#[async_trait]impl<SV> HttpServerApp for HttpProxy<SV>where SV: ProxyHttp + Send + Sync + 'static, <SV as ProxyHttp>::CTX: Send + Sync,{ async fn process_new_http( self: &Arc<Self>, session: HttpSession, shutdown: &ShutdownWatch, ) -> Option<Stream> {18 collapsed lines
let session = Box::new(session);
// TODO: keepalive pool, use stack let mut session = match self.handle_new_request(session).await { Some(downstream_session) => Session::new(downstream_session), None => return None, // bad request };
if *shutdown.borrow() { // stop downstream from reusing if this service is shutting down soon session.set_keepalive(None); } else { // default 60s session.set_keepalive(Some(60)); }
let ctx = self.inner.new_ctx(); self.process_request(session, ctx).await }
fn http_cleanup(&self) {4 collapsed lines
// Notify all keepalived request blocking on read_request() to abort self.shutdown.notify_waiters();
// TODO: impl shutting down flag so that we don't need to read stack.is_shutting_down() }
// TODO implement h2_options}
让我们看看 HttpServerApp
是什么。
ServerApp 与 HttpServerApp
提到 HttpServerApp
绕不开的就是 ServerApp
。我们先来看一下 ServerApp
:
#[cfg_attr(not(doc_async_trait), async_trait)]/// This trait defines the interface of a transport layer (TCP or TLS) application.pub trait ServerApp { /// Whenever a new connection is established, this function will be called with the established /// [`Stream`] object provided. /// /// The application can do whatever it wants with the `session`. /// /// After processing the `session`, if the `session`'s connection is reusable, This function /// can return it to the service by returning `Some(session)`. The returned `session` will be /// fed to another [`Self::process_new()`] for another round of processing. /// If not reusable, `None` should be returned. /// /// The `shutdown` argument will change from `false` to `true` when the server receives a /// signal to shutdown. This argument allows the application to react accordingly. async fn process_new( self: &Arc<Self>, mut session: Stream, // TODO: make this ShutdownWatch so that all task can await on this event shutdown: &ShutdownWatch, ) -> Option<Stream>;
/// This callback will be called once after the service stops listening to its endpoints. fn cleanup(&self) {}}
简单易懂。在新的连接进入时,Pingora
会调用 process_new()
进行处理;而当 Service
停止时,则会使用 cleanup()
进行清理。
再看看 HttpServerApp
:
/// This trait defines the interface of a HTTP application.#[cfg_attr(not(doc_async_trait), async_trait)]pub trait HttpServerApp { /// Similar to the [`ServerApp`], this function is called whenever a new HTTP session is established. /// /// After successful processing, [`ServerSession::finish()`] can be called to return an optionally reusable /// connection back to the service. The caller needs to make sure that the connection is in a reusable state /// i.e., no error or incomplete read or write headers or bodies. Otherwise a `None` should be returned. async fn process_new_http( self: &Arc<Self>, mut session: ServerSession, // TODO: make this ShutdownWatch so that all task can await on this event shutdown: &ShutdownWatch, ) -> Option<Stream>;
/// Provide options on how HTTP/2 connection should be established. This function will be called /// every time a new HTTP/2 **connection** needs to be established. /// /// A `None` means to use the built-in default options. See [`server::H2Options`] for more details. fn h2_options(&self) -> Option<server::H2Options> { None }
fn http_cleanup(&self) {}}
首先是 process_new_http
。与 process_new
不能说是一致,几乎可以说是完全一样了。唯一的区别是传入的第二个参数 session
的类型从 Stream
变成了 ServerSession
。ServerSession
是 HTTP 下特有的 context,针对 http1
和 http2
分别存储了不同的内容。
Pingora
为所有 trait HttpServerApp
实现了 trait ServerApp
。因此上面实现了 HttpServerApp
的 HttpProxy
也就自动实现了 ServerApp
。