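Interestingly, although Pingora is generally regarded as a "reverse proxy" tool, it also contains the parts needed to implement an HTTP server. It resembles most HTTP servers, but it still has that distinct Pingora flavor. Without further ado, let's serve it up.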
ToC
- Example: Logging
- trait ServeHttp
- struct HttpServer
- HttpModule
- Back to Prometheus
- Finally, a look at… ResponseCompression?
Example: Logging
The official examples include a logging part, with the following code:
```rust
pub struct MyGateway {
    req_metric: prometheus::IntCounter,
}

#[async_trait]
impl ProxyHttp for MyGateway {
    // ...
    async fn logging(
        &self,
        session: &mut Session,
        _e: Option<&pingora::Error>,
        ctx: &mut Self::CTX,
    ) {
        let response_code = session
            .response_written()
            .map_or(0, |resp| resp.status.as_u16());
        // access log
        info!(
            "{} response code: {response_code}",
            self.request_summary(session, ctx)
        );

        self.req_metric.inc();
    }
}

fn main() {
    // ...
    let mut prometheus_service_http =
        pingora::services::listening::Service::prometheus_http_service();
    prometheus_service_http.add_tcp("127.0.0.1:6192");
    my_server.add_service(prometheus_service_http);

    my_server.run_forever();
}
```
As you can see, the last few lines of main() make this logging example start a Prometheus metrics server on 127.0.0.1:6192. So what logic is this server built on?
trait ServeHttp
Let's break it down from the bottom up. First, ServeHttp:
```rust
/// This trait defines how to map a request to a response
#[cfg_attr(not(doc_async_trait), async_trait)]
pub trait ServeHttp {
    /// Define the mapping from a request to a response.
    /// Note that the request header is already read, but the implementation needs to read the
    /// request body if any.
    ///
    /// # Limitation
    /// In this API, the entire response has to be generated before the end of this call.
    /// So it is not suitable for streaming response or interactive communications.
    /// Users need to implement their own [`super::HttpServerApp`] for those use cases.
    async fn response(&self, http_session: &mut ServerSession) -> Response<Vec<u8>>;
}
```
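This is a standard, simple HTTP server abstraction that maps a Request to a Response. The trait itself is minimal. It does not support streaming, and the entire body must be returned in one go. As its doc comment says, streaming or interactive use cases need their own HttpServerApp implementation.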
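To make the request-to-response contract concrete, here is a minimal std-only sketch. It is not Pingora's API. `Request`, `Response` and `EchoApp` are hypothetical stand-ins, and the trait is synchronous (no async_trait, no ServerSession) so that it compiles without extra crates. The property it mirrors is that the whole response, body included, is built before `response()` returns.

```rust
/// Hypothetical stand-in for the request side of `ServerSession`.
pub struct Request {
    pub path: String,
}

/// Hypothetical stand-in for `http::Response<Vec<u8>>`.
pub struct Response {
    pub status: u16,
    pub headers: Vec<(String, String)>,
    pub body: Vec<u8>,
}

/// Synchronous analogue of `ServeHttp`: one request in, one complete response out.
pub trait ServeHttp {
    fn response(&self, req: &Request) -> Response;
}

/// A toy app that echoes the requested path back as the body.
pub struct EchoApp;

impl ServeHttp for EchoApp {
    fn response(&self, req: &Request) -> Response {
        // The whole body is built before returning, so there is no streaming.
        let body = format!("you asked for {}", req.path).into_bytes();
        Response {
            status: 200,
            headers: vec![("content-length".to_string(), body.len().to_string())],
            body,
        }
    }
}
```

Because the body is a finished `Vec<u8>`, setting an exact content-length is trivial. That convenience is what the streaming limitation buys.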
struct HttpServer
HttpServer, in turn, bundles a ServeHttp app together with a set of middleware, HttpModules:
```rust
/// A helper struct for HTTP server with http modules embedded
pub struct HttpServer<SV> {
    app: SV,
    modules: HttpModules,
}
```
HttpServer implements HttpServerApp whenever SV: ServeHttp (plus Send + Sync). In effect, this constrains app to be a server implementation of ServeHttp:
```rust
#[cfg_attr(not(doc_async_trait), async_trait)]
impl<SV> HttpServerApp for HttpServer<SV>
where
    SV: ServeHttp + Send + Sync,
{
    async fn process_new_http(
        self: &Arc<Self>,
        mut http: ServerSession,
        shutdown: &ShutdownWatch,
    ) -> Option<Stream> {
        // ...
    }
}
```
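The `impl<SV> ... where SV: ServeHttp` pattern can be reproduced in a few lines. This is a hypothetical sketch. ServeHttp and HttpServerApp are simplified synchronous stand-ins, the modules field is omitted, and process_new_http only serializes a minimal HTTP/1.1 response rather than driving a real session.

```rust
/// Hypothetical inner trait, a stand-in for `ServeHttp`.
pub trait ServeHttp {
    fn response(&self, path: &str) -> (u16, Vec<u8>);
}

/// Hypothetical outer trait, a stand-in for `HttpServerApp`.
pub trait HttpServerApp {
    fn process_new_http(&self, path: &str) -> Vec<u8>;
}

/// Wrapper around an app; the modules field is left out of this sketch.
pub struct HttpServer<SV> {
    app: SV,
}

impl<SV> HttpServer<SV> {
    pub fn new_app(app: SV) -> Self {
        HttpServer { app }
    }
}

// Only HttpServer<SV> with SV: ServeHttp (+ Send + Sync) becomes an HttpServerApp.
impl<SV> HttpServerApp for HttpServer<SV>
where
    SV: ServeHttp + Send + Sync,
{
    fn process_new_http(&self, path: &str) -> Vec<u8> {
        let (status, body) = self.app.response(path);
        // Serialize a minimal HTTP/1.1 response (empty reason phrase).
        let mut out = format!(
            "HTTP/1.1 {} \r\nContent-Length: {}\r\n\r\n",
            status,
            body.len()
        )
        .into_bytes();
        out.extend_from_slice(&body);
        out
    }
}

/// A toy app used to instantiate the wrapper.
pub struct Hello;

impl ServeHttp for Hello {
    fn response(&self, _path: &str) -> (u16, Vec<u8>) {
        (200, b"hello".to_vec())
    }
}
```

Since new_app carries no bound, `HttpServer::new_app(42u8)` still compiles. Only calling process_new_http on that value is rejected, because u8 does not implement ServeHttp.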
HttpModule
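An HttpModule can be thought of as middleware wrapped around ServeHttp. It modifies the requests the app receives and the responses it returns. Pingora offers a set of pieces for building and using HttpModules.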
trait HttpModule
Let's start with the most basic piece, the trait definition:
```rust
pub trait HttpModule {
    fn request_header_filter(&mut self, _req: &mut RequestHeader) -> Result<()> {
        Ok(())
    }

    fn request_body_filter(&mut self, body: Option<Bytes>) -> Result<Option<Bytes>> {
        Ok(body)
    }

    fn response_filter(&mut self, _t: &mut HttpTask) -> Result<()> {
        Ok(())
    }

    fn as_any(&self) -> &dyn Any;
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

type Module = Box<dyn HttpModule + 'static + Send + Sync>;
```
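As you can see, an HttpModule can modify both requests and responses. On the request side, the trait defines request_header_filter() for modifying the header and request_body_filter() for modifying the body. On the response side, every part of an HttpTask can be changed. HttpTask is defined as follows: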
```rust
/// An enum to hold all possible HTTP response events.
#[derive(Debug)]
pub enum HttpTask {
    /// the response header and the boolean end of response flag
    Header(Box<pingora_http::ResponseHeader>, bool),
    /// A piece of response header and the end of response boolean flag
    Body(Option<bytes::Bytes>, bool),
    /// HTTP response trailer
    Trailer(Option<Box<http::HeaderMap>>),
    /// Signal that the response is already finished
    Done,
    /// Signal that the reading of the response encounters errors.
    Failed(pingora_error::BError),
}
```
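This covers essentially everything a Response can carry: the header, body chunks, trailers, and the signals for completion and failure.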
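Here is a hypothetical module written against a trimmed-down copy of the trait. In this copy, std types replace RequestHeader and Bytes, errors are plain Strings, and HttpTask keeps only Header, Body and Done. The module's response_filter edits the header event and the body chunks in place, and as_any allows downcasting back to the concrete type.

```rust
use std::any::Any;

/// Hypothetical, trimmed-down HttpTask (no Trailer/Failed, Vec instead of Bytes).
pub enum HttpTask {
    Header(Vec<(String, String)>, bool),
    Body(Option<Vec<u8>>, bool),
    Done,
}

/// Same shape as HttpModule, with std types and String errors.
pub trait HttpModule {
    fn request_header_filter(&mut self, _req: &mut Vec<(String, String)>) -> Result<(), String> {
        Ok(())
    }
    fn request_body_filter(&mut self, body: Option<Vec<u8>>) -> Result<Option<Vec<u8>>, String> {
        Ok(body)
    }
    fn response_filter(&mut self, _t: &mut HttpTask) -> Result<(), String> {
        Ok(())
    }
    fn as_any(&self) -> &dyn Any;
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

/// Hypothetical module: tags the response header and upper-cases body chunks.
pub struct Shout {
    pub chunks_seen: usize,
}

impl HttpModule for Shout {
    fn response_filter(&mut self, t: &mut HttpTask) -> Result<(), String> {
        match t {
            HttpTask::Header(headers, _end) => {
                headers.push(("x-shout".to_string(), "1".to_string()))
            }
            HttpTask::Body(Some(bytes), _end) => {
                bytes.make_ascii_uppercase();
                self.chunks_seen += 1;
            }
            // Empty body chunks and Done pass through untouched.
            _ => {}
        }
        Ok(())
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
```

The module keeps its own state (`chunks_seen`). That only works because each request gets a fresh instance, which is exactly what the builder below is for.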
trait HttpModuleBuilder
Once a module is defined, we still can't use it directly. We also need to create a Builder for it that conforms to the following trait:
```rust
/// Trait to init the http module ctx for each request
pub trait HttpModuleBuilder {
    /// The order the module will run
    ///
    /// The lower the value, the later it runs relative to other filters.
    /// If the order of the filter is not important, leave it to the default 0.
    fn order(&self) -> i16 {
        0
    }

    /// Initialize and return the per request module context
    fn init(&self) -> Module;
}

pub type ModuleBuilder = Box<dyn HttpModuleBuilder + 'static + Send + Sync>;
```
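Through HttpModuleBuilder, Pingora fixes the priority with which an HttpModule is loaded and applied. order() returns that priority (lower values run later, 0 by default), and init() creates a fresh module context for each request.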
HttpModules
Finally, back to the HttpModules type used inside HttpServer. It is actually a very Builder-like structure, defined as follows:
```rust
/// The object to hold multiple http modules
pub struct HttpModules {
    modules: Vec<ModuleBuilder>,
    module_index: OnceCell<Arc<HashMap<TypeId, usize>>>,
}

impl HttpModules {
    /// Create a new [HttpModules]
    pub fn new() -> Self {
        HttpModules {
            modules: vec![],
            module_index: OnceCell::new(),
        }
    }

    /// Add a new [ModuleBuilder] to [HttpModules]
    ///
    /// Each type of [HttpModule] can be only added once.
    /// # Panic
    /// Panic if any [HttpModule] is added more than once.
    pub fn add_module(&mut self, builder: ModuleBuilder) {
        if self.module_index.get().is_some() {
            // We use a shared module_index the index would be out of sync if we
            // add more modules.
            panic!("cannot add module after ctx is already built")
        }
        self.modules.push(builder);
        // not the most efficient way but should be fine
        // largest order first
        self.modules.sort_by_key(|m| -m.order()); // [2]
    }

    /// Build the contexts of all the modules added to this [HttpModules]
    pub fn build_ctx(&self) -> HttpModuleCtx {
        let module_ctx: Vec<_> = self.modules.iter().map(|b| b.init()).collect(); // [3]
        let module_index = self
            .module_index
            .get_or_init(|| { // [1]
                let mut module_index = HashMap::with_capacity(self.modules.len());
                for (i, c) in module_ctx.iter().enumerate() {
                    let exist = module_index.insert(c.as_any().type_id(), i); // [4]
                    if exist.is_some() {
                        panic!("duplicated filters found")
                    }
                }
                Arc::new(module_index)
            })
            .clone();

        HttpModuleCtx {
            module_ctx,
            module_index,
        }
    }
}
```
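During construction, new ModuleBuilders are added through HttpModules::add_module. Every addition re-sorts the builders by HttpModuleBuilder::order() (largest first), which fixes the final execution order of the modules ([2]).

build_ctx() is called for every incoming HTTP request. In it, Pingora creates the corresponding HttpModule instances through HttpModuleBuilder::init() ([3]). It then records each module's position in a HashMap keyed by the module's TypeId, producing an index ([4]). Registering the same module type twice panics with "duplicated filters found". The index is built only once, after which it is kept in module_index via OnceCell and reused by later requests ([1]).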
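The add_module / build_ctx flow can be sketched with std only. This is a simplified model, not Pingora's code. std::sync::OnceLock stands in for the OnceCell used above, modules expose only a hypothetical name(), and build_ctx returns a tuple instead of an HttpModuleCtx. It keeps the three behaviors described above:

- re-sort by descending order() on every insert;
- create fresh modules via init() on every call;
- build the TypeId-to-position index only once.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

/// Trimmed-down module trait (hypothetical): a name for inspection plus as_any.
pub trait HttpModule {
    fn name(&self) -> &'static str;
    fn as_any(&self) -> &dyn Any;
}

/// Same shape as HttpModuleBuilder: a priority and a per-request constructor.
pub trait HttpModuleBuilder {
    fn order(&self) -> i16 {
        0
    }
    fn init(&self) -> Box<dyn HttpModule>;
}

pub struct HttpModules {
    modules: Vec<Box<dyn HttpModuleBuilder>>,
    module_index: OnceLock<Arc<HashMap<TypeId, usize>>>,
}

impl HttpModules {
    pub fn new() -> Self {
        HttpModules { modules: vec![], module_index: OnceLock::new() }
    }

    pub fn add_module(&mut self, builder: Box<dyn HttpModuleBuilder>) {
        // Once the shared index exists, adding modules would desync it.
        assert!(self.module_index.get().is_none(), "cannot add module after ctx is already built");
        self.modules.push(builder);
        // Largest order first, so lower values run later.
        self.modules.sort_by_key(|m| -m.order());
    }

    /// Fresh module instances for one request, plus the shared TypeId -> position index.
    pub fn build_ctx(&self) -> (Vec<Box<dyn HttpModule>>, Arc<HashMap<TypeId, usize>>) {
        let ctx: Vec<Box<dyn HttpModule>> = self.modules.iter().map(|b| b.init()).collect();
        let index = self
            .module_index
            .get_or_init(|| {
                // Runs only on the first call; later calls reuse the stored Arc.
                let mut idx = HashMap::new();
                for (i, m) in ctx.iter().enumerate() {
                    if idx.insert(m.as_any().type_id(), i).is_some() {
                        panic!("duplicated filters found");
                    }
                }
                Arc::new(idx)
            })
            .clone();
        (ctx, index)
    }
}

pub struct Early;
pub struct Late;

impl HttpModule for Early {
    fn name(&self) -> &'static str { "early" }
    fn as_any(&self) -> &dyn Any { self }
}
impl HttpModule for Late {
    fn name(&self) -> &'static str { "late" }
    fn as_any(&self) -> &dyn Any { self }
}

pub struct EarlyBuilder;
pub struct LateBuilder;

impl HttpModuleBuilder for EarlyBuilder {
    fn order(&self) -> i16 { 10 }
    fn init(&self) -> Box<dyn HttpModule> { Box::new(Early) }
}
impl HttpModuleBuilder for LateBuilder {
    fn order(&self) -> i16 { i16::MIN / 2 }
    fn init(&self) -> Box<dyn HttpModule> { Box::new(Late) }
}
```

Adding LateBuilder first and EarlyBuilder second still yields the order early, late. Calling build_ctx twice returns two independent module lists that share one index Arc.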
HttpModuleCtx
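The last link in the HttpModule chain is HttpModuleCtx. There isn't much left to say about it. Each filter method simply runs the modules one after another in order, and get()/get_mut() use the TypeId index to look up a specific module. The code speaks for itself: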
```rust
/// The Contexts of multiple modules
///
/// This is the object that will apply all the included modules to a certain HTTP request.
/// The modules are ordered according to their `order()`.
pub struct HttpModuleCtx {
    // the modules in the order of execution
    module_ctx: Vec<Module>,
    // find the module in the vec with its type ID
    module_index: Arc<HashMap<TypeId, usize>>,
}

impl HttpModuleCtx {
    /// Create a placeholder empty [HttpModuleCtx].
    ///
    /// [HttpModules] should be used to create nonempty [HttpModuleCtx].
    pub fn empty() -> Self {
        HttpModuleCtx {
            module_ctx: vec![],
            module_index: Arc::new(HashMap::new()),
        }
    }

    /// Get a ref to [HttpModule] if any.
    pub fn get<T: 'static>(&self) -> Option<&T> {
        let idx = self.module_index.get(&TypeId::of::<T>())?;
        let ctx = &self.module_ctx[*idx];
        Some(
            ctx.as_any()
                .downcast_ref::<T>()
                .expect("type should always match"),
        )
    }

    /// Get a mut ref to [HttpModule] if any.
    pub fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
        let idx = self.module_index.get(&TypeId::of::<T>())?;
        let ctx = &mut self.module_ctx[*idx];
        Some(
            ctx.as_any_mut()
                .downcast_mut::<T>()
                .expect("type should always match"),
        )
    }

    /// Run the `request_header_filter` for all the modules according to their orders.
    pub fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
        for filter in self.module_ctx.iter_mut() {
            filter.request_header_filter(req)?;
        }
        Ok(())
    }

    /// Run the `request_body_filter` for all the modules according to their orders.
    pub fn request_body_filter(&mut self, mut body: Option<Bytes>) -> Result<Option<Bytes>> {
        for filter in self.module_ctx.iter_mut() {
            body = filter.request_body_filter(body)?;
        }
        Ok(body)
    }

    /// Run the `response_filter` for all the modules according to their orders.
    pub fn response_filter(&mut self, t: &mut HttpTask) -> Result<()> {
        for filter in self.module_ctx.iter_mut() {
            filter.response_filter(t)?;
        }
        Ok(())
    }
}
```
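Here is a reduced HttpModuleCtx that keeps only request_body_filter and get_mut. The TrimNewline and ByteCounter modules are hypothetical. The sketch shows two things. First, the body is threaded through the modules in order, so a later module sees the earlier one's output. Second, a module can be fetched back by its concrete type.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Trimmed-down module trait: only the body filter plus the Any accessors.
pub trait HttpModule {
    fn request_body_filter(&mut self, body: Option<Vec<u8>>) -> Result<Option<Vec<u8>>, String> {
        Ok(body)
    }
    fn as_any(&self) -> &dyn Any;
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

/// Simplified HttpModuleCtx: modules in execution order plus a TypeId -> position index.
pub struct HttpModuleCtx {
    module_ctx: Vec<Box<dyn HttpModule>>,
    module_index: HashMap<TypeId, usize>,
}

impl HttpModuleCtx {
    pub fn new(module_ctx: Vec<Box<dyn HttpModule>>) -> Self {
        let module_index = module_ctx
            .iter()
            .enumerate()
            .map(|(i, m)| (m.as_any().type_id(), i))
            .collect();
        HttpModuleCtx { module_ctx, module_index }
    }

    /// Look a module up by its concrete type.
    pub fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
        let idx = *self.module_index.get(&TypeId::of::<T>())?;
        self.module_ctx[idx].as_any_mut().downcast_mut::<T>()
    }

    /// Feed the body through every module in order; each sees the previous output.
    pub fn request_body_filter(&mut self, mut body: Option<Vec<u8>>) -> Result<Option<Vec<u8>>, String> {
        for filter in self.module_ctx.iter_mut() {
            body = filter.request_body_filter(body)?;
        }
        Ok(body)
    }
}

/// Hypothetical module that strips trailing newlines from the body.
pub struct TrimNewline;

impl HttpModule for TrimNewline {
    fn request_body_filter(&mut self, body: Option<Vec<u8>>) -> Result<Option<Vec<u8>>, String> {
        Ok(body.map(|mut b| {
            while b.last() == Some(&b'\n') {
                b.pop();
            }
            b
        }))
    }
    fn as_any(&self) -> &dyn Any { self }
    fn as_any_mut(&mut self) -> &mut dyn Any { self }
}

/// Hypothetical module that counts the body bytes it has seen.
pub struct ByteCounter {
    pub total: usize,
}

impl HttpModule for ByteCounter {
    fn request_body_filter(&mut self, body: Option<Vec<u8>>) -> Result<Option<Vec<u8>>, String> {
        if let Some(b) = &body {
            self.total += b.len();
        }
        Ok(body)
    }
    fn as_any(&self) -> &dyn Any { self }
    fn as_any_mut(&mut self) -> &mut dyn Any { self }
}
```

With TrimNewline placed before ByteCounter, the counter sees "hello" (5 bytes) rather than "hello\n". Swapping the two would change the count, which is why order() matters.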
Back to Prometheus
After a tour of the underlying implementation, let's return to Prometheus. The implementation of PrometheusServer is now simple and clear:
```rust
/// A HTTP application that reports Prometheus metrics.
///
/// This application will report all the [static metrics](https://docs.rs/prometheus/latest/prometheus/index.html#static-metrics)
/// collected via the [Prometheus](https://docs.rs/prometheus/) crate;
pub struct PrometheusHttpApp;

#[cfg_attr(not(doc_async_trait), async_trait)]
impl ServeHttp for PrometheusHttpApp {
    async fn response(&self, _http_session: &mut ServerSession) -> Response<Vec<u8>> {
        let encoder = TextEncoder::new();
        let metric_families = prometheus::gather();
        let mut buffer = vec![];
        encoder.encode(&metric_families, &mut buffer).unwrap();
        Response::builder()
            .status(200)
            .header(http::header::CONTENT_TYPE, encoder.format_type())
            .header(http::header::CONTENT_LENGTH, buffer.len())
            .body(buffer)
            .unwrap()
    }
}

/// The [HttpServer] for [PrometheusHttpApp]
///
/// This type provides the functionality of [PrometheusHttpApp] with compression enabled
pub type PrometheusServer = HttpServer<PrometheusHttpApp>;

impl PrometheusServer {
    pub fn new() -> Self {
        let mut server = Self::new_app(PrometheusHttpApp);
        // enable gzip level 7 compression
        server.add_module(ResponseCompressionBuilder::enable(7));
        server
    }
}
```
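The essential steps boil down to:
- implement ServeHttp for PrometheusHttpApp;
- define PrometheusServer (an alias of HttpServer<PrometheusHttpApp>), construct it around PrometheusHttpApp, and add the needed HttpModuleBuilder (here ResponseCompressionBuilder::enable(7), i.e. gzip at level 7).

It's simple enough that even ⑨ could follow it, so I won't belabor it here.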
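The two steps can be replayed end to end in a self-contained sketch. Everything here is a hypothetical, synchronous stand-in. MetricsApp renders a counter by hand in the Prometheus text exposition format instead of calling prometheus::gather(). ContentLength, which appends a content-length header, replaces ResponseCompression so that no gzip crate is needed.

```rust
use std::cmp::Reverse;
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical simplified response: header list plus a complete body.
pub struct Response {
    pub headers: Vec<(String, String)>,
    pub body: Vec<u8>,
}

pub trait ServeHttp {
    fn response(&self) -> Response;
}

pub trait HttpModule {
    fn response_filter(&mut self, resp: &mut Response);
}

pub trait HttpModuleBuilder {
    fn order(&self) -> i16 {
        0
    }
    fn init(&self) -> Box<dyn HttpModule>;
}

pub struct HttpServer<SV> {
    app: SV,
    modules: Vec<Box<dyn HttpModuleBuilder>>,
}

impl<SV: ServeHttp> HttpServer<SV> {
    pub fn new_app(app: SV) -> Self {
        HttpServer { app, modules: vec![] }
    }

    pub fn add_module(&mut self, builder: Box<dyn HttpModuleBuilder>) {
        self.modules.push(builder);
        // Largest order first, as in HttpModules::add_module.
        self.modules.sort_by_key(|m| Reverse(m.order()));
    }

    /// Handle one request: fresh modules, full app response, then response filters in order.
    pub fn handle(&self) -> Response {
        let mut ctx: Vec<Box<dyn HttpModule>> = self.modules.iter().map(|b| b.init()).collect();
        let mut resp = self.app.response();
        for module in ctx.iter_mut() {
            module.response_filter(&mut resp);
        }
        resp
    }
}

/// Step 1: an app that renders one counter in the Prometheus text format.
pub struct MetricsApp {
    pub requests: AtomicU64,
}

impl ServeHttp for MetricsApp {
    fn response(&self) -> Response {
        let n = self.requests.fetch_add(1, Ordering::Relaxed) + 1;
        let body = format!("# TYPE requests_total counter\nrequests_total {n}\n").into_bytes();
        Response {
            headers: vec![("content-type".to_string(), "text/plain; version=0.0.4".to_string())],
            body,
        }
    }
}

/// Step 2: a stand-in module that appends a content-length header.
pub struct ContentLength;

impl HttpModule for ContentLength {
    fn response_filter(&mut self, resp: &mut Response) {
        let len = resp.body.len().to_string();
        resp.headers.push(("content-length".to_string(), len));
    }
}

pub struct ContentLengthBuilder;

impl HttpModuleBuilder for ContentLengthBuilder {
    fn order(&self) -> i16 {
        i16::MIN / 2
    }
    fn init(&self) -> Box<dyn HttpModule> {
        Box::new(ContentLength)
    }
}

/// Like PrometheusServer: a type alias over the generic server with its own constructor.
pub type MetricsServer = HttpServer<MetricsApp>;

impl MetricsServer {
    pub fn new() -> Self {
        let mut server = Self::new_app(MetricsApp { requests: AtomicU64::new(0) });
        server.add_module(Box::new(ContentLengthBuilder));
        server
    }
}
```

As in Pingora, the concrete server is just a type alias plus an inherent `impl` on that specific instantiation, so no new struct is needed.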
Finally, a look at… ResponseCompression?
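PrometheusServer uses ResponseCompressionBuilder, which is also the only HttpModule that Pingora currently implements in its own repository. Here we just take a quick look at its basic implementation in pingora-core and leave the details for another time (no promises):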
```rust
/// HTTP response compression module
pub struct ResponseCompression(ResponseCompressionCtx);

impl HttpModule for ResponseCompression {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }

    fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
        self.0.request_filter(req);
        Ok(())
    }

    fn response_filter(&mut self, t: &mut HttpTask) -> Result<()> {
        self.0.response_filter(t);
        Ok(())
    }
}

/// The builder for HTTP response compression module
pub struct ResponseCompressionBuilder {
    level: u32,
}

impl ResponseCompressionBuilder {
    /// Return a [ModuleBuilder] for [ResponseCompression] with the given compression level
    pub fn enable(level: u32) -> ModuleBuilder {
        Box::new(ResponseCompressionBuilder { level })
    }
}

impl HttpModuleBuilder for ResponseCompressionBuilder {
    fn init(&self) -> Module {
        Box::new(ResponseCompression(ResponseCompressionCtx::new(
            self.level, false,
        )))
    }

    fn order(&self) -> i16 {
        // run the response filter later than most others filters
        i16::MIN / 2
    }
}
```
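To see where `i16::MIN / 2` lands, here is a small sketch of the ordering rule from add_module. The rule is largest order first, and ties keep their insertion order because the sort is stable. The module names are made up. One deviation from the code above: the sketch sorts by std::cmp::Reverse(order) rather than -order. This gives the same ordering but never negates, whereas negating i16::MIN overflows an i16 (and panics in debug builds).

```rust
use std::cmp::Reverse;

/// The order value ResponseCompressionBuilder::order() returns.
pub const COMPRESSION_ORDER: i16 = i16::MIN / 2;

/// Sort (name, order) pairs the way HttpModules::add_module does:
/// largest order first; the stable sort keeps insertion order for ties.
pub fn execution_order(mut mods: Vec<(&'static str, i16)>) -> Vec<&'static str> {
    // Same ordering as sorting by `-order`, but Reverse never negates,
    // so it cannot overflow on i16::MIN.
    mods.sort_by_key(|&(_, order)| Reverse(order));
    mods.into_iter().map(|(name, _)| name).collect()
}
```

With `COMPRESSION_ORDER` equal to -16384, any module left at the default order 0 (or anything above -16384) runs its response filter before compression. Compression therefore sees the response after the other modules have finished editing it.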
Well, that's it for now.