Example Code
Let's start from Pingora's own example. First, create a project:
cargo init pingora-learning
Then add the pingora and async-trait dependencies and follow the template:
use async_trait::async_trait;
use pingora::prelude::*;
use std::sync::Arc;

pub struct LB(Arc<LoadBalancer<RoundRobin>>);

#[async_trait]
impl ProxyHttp for LB {
    /// For this small example, we don't need context storage
    type CTX = ();
    fn new_ctx(&self) -> () {}

    async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
        let upstream = self
            .0
            .select(b"", 256) // hash doesn't matter for round robin
            .unwrap();
        println!("upstream peer is: {upstream:?}");
        // Set SNI to one.one.one.one
        let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
        Ok(peer)
    }

    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        upstream_request: &mut RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        upstream_request
            .insert_header("Host", "one.one.one.one")
            .unwrap();
        Ok(())
    }
}

fn main() {
    let mut my_server = Server::new(None).unwrap(); // [1]
    my_server.bootstrap();                          // [1]

    let upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap(); // [2]
    let mut lb = http_proxy_service(&my_server.configuration, LB(Arc::new(upstreams)));   // [2]
    lb.add_tcp("0.0.0.0:6188");                                                           // [2]

    my_server.add_service(lb); // [3]
    my_server.run_forever();   // [3]
}
This example is simple enough; let's see how Pingora actually makes it work.
Server, Service, and Http
Let's start by looking at the implementation of fn main(). Running through the whole of main is the Server, i.e. the parts marked [1] and [3] above. From bootstrap to add_service and finally run_forever, it is the backbone of the program. The logic the Server actually runs, such as the load balancer in this example, is registered with the Server in the form of a Service.
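To make that division of labor concrete, here is a sketch in which the same Server hosts two independent proxy services. It reuses only the LB type and the use lines from the example above; the second port 6189 is made up for illustration:
fn main() {
    let mut my_server = Server::new(None).unwrap();
    my_server.bootstrap();

    // Two completely independent services, each with its own balancer and listening port.
    let upstreams_a = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
    let mut proxy_a = http_proxy_service(&my_server.configuration, LB(Arc::new(upstreams_a)));
    proxy_a.add_tcp("0.0.0.0:6188");

    let upstreams_b = LoadBalancer::try_from_iter(["1.1.1.1:443"]).unwrap();
    let mut proxy_b = http_proxy_service(&my_server.configuration, LB(Arc::new(upstreams_b)));
    proxy_b.add_tcp("0.0.0.0:6189"); // port chosen arbitrarily for this sketch

    // The Server does not know or care what the services do; it just runs them.
    my_server.add_service(proxy_a);
    my_server.add_service(proxy_b);
    my_server.run_forever();
}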
Service is defined in pingora-core as follows:
/// The service interface
#[async_trait]
pub trait Service: Sync + Send {
    /// This function will be called when the server is ready to start the service.
    /// - `fds`: a collection of listening file descriptors. During zero downtime restart
    /// the `fds` would contain the listening sockets passed from the old service, services should
    /// take the sockets they need to use then. If the sockets the service looks for don't appear in
    /// the collection, the service should create its own listening sockets and then put them into
    /// the collection in order for them to be passed to the next server.
    /// - `shutdown`: the shutdown signal this server would receive.
    async fn start_service(&mut self, fds: Option<ListenFds>, mut shutdown: ShutdownWatch);

    /// The name of the service, just for logging and naming the threads assigned to this service
    /// Note that due to the limit of the underlying system, only the first 16 chars will be used
    fn name(&self) -> &str;

    /// The preferred number of threads to run this service
    /// If `None`, the global setting will be used
    fn threads(&self) -> Option<usize> {
        None
    }
}
The heart of the trait is start_service. A Service has to cooperate with zero-downtime restarts, which is why start_service receives fds as a parameter: when the server is upgraded, the new process inherits the old service's listening sockets through those file descriptors. The shutdown watch, in turn, lets the service notice that it is being stopped and hand things over gracefully.
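To see how those two parameters are meant to be used, here is a minimal hand-written Service. It is a hypothetical sketch (TickService is not part of Pingora); it assumes only the trait exactly as quoted above, ShutdownWatch being a tokio watch::Receiver<bool>, and the pingora-core paths below matching the version discussed here:
use async_trait::async_trait;
use pingora_core::server::{ListenFds, ShutdownWatch};
use pingora_core::services::Service;

/// A service that does nothing but tick once per second until shutdown.
struct TickService;

#[async_trait]
impl Service for TickService {
    async fn start_service(&mut self, _fds: Option<ListenFds>, mut shutdown: ShutdownWatch) {
        // A listener-based service would pull the sockets it needs out of `fds` here
        // (or create its own and put them back for the next upgrade).
        loop {
            tokio::select! {
                _ = shutdown.changed() => {
                    // The server flipped the shutdown signal: hand over and return.
                    break;
                }
                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
                    println!("tick");
                }
            }
        }
    }

    fn name(&self) -> &str {
        // Only the first 16 chars end up as the thread name.
        "tick service"
    }
}
Registering it is the same as for the proxy service: my_server.add_service(TickService).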
HTTP Proxy Service
In main() of the example we call http_proxy_service as a wrapper around our LB. It is really just a shortcut for constructing a Service:
/// Create a [Service] from the user implemented [ProxyHttp].
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>> {
    Service::new(
        "Pingora HTTP Proxy Service".into(),
        HttpProxy::new(inner, conf.clone()),
    )
}
Under the hood this creates a struct Service that wraps an HttpProxy. struct Service implements trait Service:
#[async_trait]
impl<A: ServerApp + Send + Sync + 'static> ServiceTrait for Service<A> {
    async fn start_service(&mut self, fds: Option<ListenFds>, shutdown: ShutdownWatch) {
        let runtime = current_handle();
        let endpoints = self.listeners.build(fds);

        let handlers = endpoints.into_iter().map(|endpoint| {
            let app_logic = self.app_logic.clone();
            let shutdown = shutdown.clone();
            runtime.spawn(async move {
                Self::run_endpoint(app_logic, endpoint, shutdown).await;
            })
        });

        futures::future::join_all(handlers).await;
        self.listeners.cleanup();
        self.app_logic.cleanup();
    }

    fn threads(&self) -> Option<usize> {
        self.threads
    }
}
This is what allows the HttpProxy built by http_proxy_service to be loaded into the Server as a Service. HttpProxy itself implements HttpServerApp:
#[async_trait]
impl<SV> HttpServerApp for HttpProxy<SV>
where
    SV: ProxyHttp + Send + Sync + 'static,
    <SV as ProxyHttp>::CTX: Send + Sync,
{
    async fn process_new_http(
        self: &Arc<Self>,
        session: ServerSession,
        shutdown: &ShutdownWatch,
    ) -> Option<Stream> {
        let session = Box::new(session);

        // TODO: keepalive pool, use stack
        let mut session = match self.handle_new_request(session).await {
            Some(downstream_session) => Session::new(downstream_session),
            None => return None, // bad request
        };

        if *shutdown.borrow() {
            // stop downstream from reusing if this service is shutting down soon
            session.set_keepalive(None);
        } else {
            session.set_keepalive(Some(60));
        }

        let ctx = self.inner.new_ctx();
        self.process_request(session, ctx).await
    }

    fn http_cleanup(&self) {
        // Notify all keepalived request blocking on read_request() to abort
        self.shutdown.notify_waiters();
        // TODO: impl shutting down flag so that we don't need to read stack.is_shutting_down()
    }

    // TODO implement h2_options
}
Let's see what HttpServerApp is.
ServerApp and HttpServerApp
You cannot talk about HttpServerApp without talking about ServerApp, so let's look at ServerApp first:
#[cfg_attr(not(doc_async_trait), async_trait)]
/// This trait defines the interface of a transport layer (TCP or TLS) application.
pub trait ServerApp {
    /// Whenever a new connection is established, this function will be called with the established
    /// [`Stream`] object provided.
    ///
    /// The application can do whatever it wants with the `session`.
    ///
    /// After processing the `session`, if the `session`'s connection is reusable, This function
    /// can return it to the service by returning `Some(session)`. The returned `session` will be
    /// fed to another [`Self::process_new()`] for another round of processing.
    /// If not reusable, `None` should be returned.
    ///
    /// The `shutdown` argument will change from `false` to `true` when the server receives a
    /// signal to shutdown. This argument allows the application to react accordingly.
    async fn process_new(
        self: &Arc<Self>,
        mut session: Stream,
        // TODO: make this ShutdownWatch so that all task can await on this event
        shutdown: &ShutdownWatch,
    ) -> Option<Stream>;

    /// This callback will be called once after the service stops listening to its endpoints.
    fn cleanup(&self) {}
}
Simple enough: when a new connection arrives, Pingora calls process_new() to handle it, and when the Service stops, cleanup() is used to clean things up.
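As a concrete illustration of that contract, here is a tiny echo application over the raw Stream. It is a hypothetical example, not taken from Pingora; it assumes the pingora-core paths below and the fact that Stream implements tokio's AsyncRead/AsyncWrite:
use std::sync::Arc;

use async_trait::async_trait;
use pingora_core::apps::ServerApp;
use pingora_core::protocols::Stream;
use pingora_core::server::ShutdownWatch;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

/// Echo one chunk of whatever the client sends, then offer the connection back.
struct EchoApp;

#[async_trait]
impl ServerApp for EchoApp {
    async fn process_new(
        self: &Arc<Self>,
        mut session: Stream,
        _shutdown: &ShutdownWatch,
    ) -> Option<Stream> {
        let mut buf = [0u8; 1024];
        let n = session.read(&mut buf).await.ok()?;
        if n == 0 {
            return None; // client closed the connection; nothing to reuse
        }
        session.write_all(&buf[..n]).await.ok()?;
        session.flush().await.ok()?;
        // The connection is still healthy, so return it for another round of process_new().
        Some(session)
    }
}
The whole contract is in the return value: Some(session) offers the connection back for reuse, None tells the service to drop it.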
Now let's look at HttpServerApp:
/// This trait defines the interface of a HTTP application.
#[cfg_attr(not(doc_async_trait), async_trait)]
pub trait HttpServerApp {
    /// Similar to the [`ServerApp`], this function is called whenever a new HTTP session is established.
    ///
    /// After successful processing, [`ServerSession::finish()`] can be called to return an optionally reusable
    /// connection back to the service. The caller needs to make sure that the connection is in a reusable state
    /// i.e., no error or incomplete read or write headers or bodies. Otherwise a `None` should be returned.
    async fn process_new_http(
        self: &Arc<Self>,
        mut session: ServerSession,
        // TODO: make this ShutdownWatch so that all task can await on this event
        shutdown: &ShutdownWatch,
    ) -> Option<Stream>;

    /// Provide options on how HTTP/2 connection should be established. This function will be called
    /// every time a new HTTP/2 **connection** needs to be established.
    /// A `None` means to use the built-in default options. See [`server::H2Options`] for more details.
    fn h2_options(&self) -> Option<server::H2Options> {
        None
    }

    fn http_cleanup(&self) {}
}
First, process_new_http. It is not merely similar to process_new, it is practically identical; the only difference is that the type of the second parameter, session, changes from Stream to ServerSession. ServerSession is the HTTP-specific context, and it stores different things for http1 and http2.
Pingora provides a blanket implementation of trait ServerApp for everything that implements trait HttpServerApp. So the HttpProxy above, having implemented HttpServerApp, automatically implements ServerApp as well.
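The mechanism behind that "automatically implements" is an ordinary Rust blanket impl. Stripped of all HTTP details, the pattern looks like this (toy traits, nothing from Pingora):
/// The transport-level interface (standing in for ServerApp).
trait GenericApp {
    fn run(&self);
}

/// The higher-level interface (standing in for HttpServerApp).
trait SpecializedApp {
    fn run_specialized(&self);
}

/// Blanket impl: every SpecializedApp is automatically a GenericApp.
impl<T: SpecializedApp> GenericApp for T {
    fn run(&self) {
        // Pingora's blanket impl additionally turns the raw Stream into an HTTP
        // ServerSession before delegating; the toy version just delegates.
        self.run_specialized();
    }
}

struct MyApp;

impl SpecializedApp for MyApp {
    fn run_specialized(&self) {
        println!("handling");
    }
}

fn main() {
    // MyApp never implements GenericApp by hand, yet it can be used as one,
    // just like HttpProxy can be used as a ServerApp.
    MyApp.run();
}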