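Interestingly, although Pingora is usually thought of as a reverse-proxy tool, it also contains the pieces needed to implement an HTTP server. It looks much like most HTTP servers, but with a distinct Pingora flavor of its own. Without further ado, let's serve it up.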
ToC
- Example: Logging
- trait ServeHttp
- struct HttpServer
- HttpModule
- Back to Prometheus
- Finally, a look at... ResponseCompression?
Example: Logging
The official examples include a logging section, with the following code:
    pub struct MyGateway {
        req_metric: prometheus::IntCounter,
    }

    #[async_trait]
    impl ProxyHttp for MyGateway {
        ...
        async fn logging(
            &self,
            session: &mut Session,
            _e: Option<&pingora::Error>,
            ctx: &mut Self::CTX,
        ) {
            let response_code = session
                .response_written()
                .map_or(0, |resp| resp.status.as_u16());
            // access log
            info!(
                "{} response code: {response_code}",
                self.request_summary(session, ctx)
            );

            self.req_metric.inc();
        }
    }

    fn main() {
        ...
        let mut prometheus_service_http =
            pingora::services::listening::Service::prometheus_http_service();
        prometheus_service_http.add_tcp("127.0.0.1:6192");
        my_server.add_service(prometheus_service_http);

        my_server.run_forever();
    }
As the code shows, the prometheus_service_http lines in main start a Prometheus metric server on 127.0.0.1:6192. What logic is this server built on?
trait ServeHttp
Let's take it apart from the bottom up. First, ServeHttp:
    /// This trait defines how to map a request to a response
    #[cfg_attr(not(doc_async_trait), async_trait)]
    pub trait ServeHttp {
        /// Define the mapping from a request to a response.
        /// Note that the request header is already read, but the implementation needs to read the
        /// request body if any.
        ///
        /// # Limitation
        /// In this API, the entire response has to be generated before the end of this call.
        /// So it is not suitable for streaming response or interactive communications.
        /// Users need to implement their own [`super::HttpServerApp`] for those use cases.
        async fn response(&self, http_session: &mut ServerSession) -> Response<Vec<u8>>;
    }
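This is a standard, simple HTTP server abstraction that maps a Request to a Response. The trait definition is equally simple: it does not support streaming and can only return the entire body in one go.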
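To make the "whole response at once" limitation concrete, here is a minimal sketch using only the standard library. ServeOnce, Request, Response, and Echo are hypothetical stand-ins, not Pingora types, and the trait is synchronous for brevity, whereas the real ServeHttp::response is async and takes a ServerSession.

```rust
/// Hypothetical stand-in for a parsed request (not a Pingora type).
pub struct Request {
    pub path: String,
}

/// Hypothetical stand-in for `http::Response<Vec<u8>>`.
pub struct Response {
    pub status: u16,
    pub body: Vec<u8>,
}

/// Simplified, synchronous analogue of `ServeHttp`:
/// one request in, one fully buffered response out.
pub trait ServeOnce {
    fn response(&self, req: &Request) -> Response;
}

/// Example app that echoes the request path back in the body.
pub struct Echo;

impl ServeOnce for Echo {
    fn response(&self, req: &Request) -> Response {
        // The whole body must exist before we return; this signature offers
        // no way to hand back part of the body and keep writing later.
        Response {
            status: 200,
            body: format!("you asked for {}", req.path).into_bytes(),
        }
    }
}
```

Because the return type is a complete value, anything that needs to stream or hold a conversation open has to go one level down, which is exactly what the doc comment on ServeHttp says about HttpServerApp.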
struct HttpServer
HttpServer, in turn, bundles a ServeHttp implementation together with a set of HttpModules middleware:
    /// A helper struct for HTTP server with http modules embedded
    pub struct HttpServer<SV> {
        app: SV,
        modules: HttpModules,
    }
HttpServerApp is implemented for it where SV: ServeHttp. In effect, this requires app to be a server implementation that implements ServeHttp:
    #[cfg_attr(not(doc_async_trait), async_trait)]
    impl<SV> HttpServerApp for HttpServer<SV>
    where
        SV: ServeHttp + Send + Sync,
    {
        async fn process_new_http(
            self: &Arc<Self>,
            mut http: ServerSession,
            shutdown: &ShutdownWatch,
        ) -> Option<Stream> {
            // ...
HttpModule
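An HttpModule can be thought of as middleware wrapped around ServeHttp: it modifies the incoming request and the outgoing response. Pingora provides several pieces for constructing and using an HttpModule.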
trait HttpModule
Let's start with the most basic trait definition:
    pub trait HttpModule {
        fn request_header_filter(&mut self, _req: &mut RequestHeader) -> Result<()> {
            Ok(())
        }

        fn request_body_filter(&mut self, body: Option<Bytes>) -> Result<Option<Bytes>> {
            Ok(body)
        }

        fn response_filter(&mut self, _t: &mut HttpTask) -> Result<()> {
            Ok(())
        }

        fn as_any(&self) -> &dyn Any;
        fn as_any_mut(&mut self) -> &mut dyn Any;
    }

    type Module = Box<dyn HttpModule + 'static + Send + Sync>;
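As you can see, an HttpModule can modify both the request and the response. For the request, the trait defines request_header_filter() to modify the header and request_body_filter() to modify the body. For the response, every part of an HttpTask can be changed. HttpTask is defined as follows: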
    /// An enum to hold all possible HTTP response events.
    #[derive(Debug)]
    pub enum HttpTask {
        /// the response header and the boolean end of response flag
        Header(Box<pingora_http::ResponseHeader>, bool),
        /// A piece of response header and the end of response boolean flag
        Body(Option<bytes::Bytes>, bool),
        /// HTTP response trailer
        Trailer(Option<Box<http::HeaderMap>>),
        /// Signal that the response is already finished
        Done,
        /// Signal that the reading of the response encounters errors.
        Failed(pingora_error::BError),
    }
It covers essentially everything that makes up a response.
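As a rough illustration of how a module reacts to the different response events, here is a sketch built only on the standard library. Task, Module, and Tagger are hypothetical, cut-down stand-ins for HttpTask, HttpModule, and a user-defined module; the x-tagged header is made up for the example, and errors are plain Strings rather than Pingora's Result.

```rust
/// Cut-down stand-in for `HttpTask` (only two of its variants).
#[derive(Debug, PartialEq)]
pub enum Task {
    /// Response headers as (name, value) pairs, plus the end-of-response flag.
    Header(Vec<(String, String)>, bool),
    /// A chunk of the response body, plus the end-of-response flag.
    Body(Option<Vec<u8>>, bool),
}

/// Cut-down stand-in for `HttpModule`, keeping the "default is a no-op" shape.
pub trait Module {
    fn response_filter(&mut self, _t: &mut Task) -> Result<(), String> {
        Ok(())
    }
}

/// Hypothetical module: marks every response header and counts body bytes.
pub struct Tagger {
    pub bytes_seen: usize,
}

impl Module for Tagger {
    fn response_filter(&mut self, t: &mut Task) -> Result<(), String> {
        match t {
            // The header event arrives first and can be edited in place.
            Task::Header(headers, _end) => {
                headers.push(("x-tagged".to_string(), "1".to_string()));
            }
            // Body chunks arrive afterwards, possibly several times.
            Task::Body(Some(chunk), _end) => self.bytes_seen += chunk.len(),
            Task::Body(None, _end) => {}
        }
        Ok(())
    }
}
```

The point of the sketch is that the filter is called once per event, so a module that needs state across events (here, bytes_seen) keeps it in self.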
trait HttpModuleBuilder
Defining a module is not enough to use it directly. We also need to create a Builder for it that satisfies the following trait:
    /// Trait to init the http module ctx for each request
    pub trait HttpModuleBuilder {
        /// The order the module will run
        ///
        /// The lower the value, the later it runs relative to other filters.
        /// If the order of the filter is not important, leave it to the default 0.
        fn order(&self) -> i16 {
            0
        }

        /// Initialize and return the per request module context
        fn init(&self) -> Module;
    }

    pub type ModuleBuilder = Box<dyn HttpModuleBuilder + 'static + Send + Sync>;
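Through order(), HttpModuleBuilder determines the priority with which each HttpModule is loaded and applied: a module with a larger value runs earlier, and the default is 0.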
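To see what that priority means in practice, here is a small sketch of the ordering rule, using the same negated-key sort that appears in HttpModules::add_module later in this post. The function execution_order and the module names passed to it are hypothetical; only the sort expression mirrors the Pingora source.

```rust
/// Given `(name, order)` pairs, return the names in execution order.
///
/// Sorting by the negated order puts the largest order first. The sort is
/// stable, so entries with equal order keep their insertion order.
/// Note: an order of exactly `i16::MIN` cannot be negated without overflow,
/// so this sketch assumes callers stay above that value.
pub fn execution_order(mut builders: Vec<(&'static str, i16)>) -> Vec<&'static str> {
    builders.sort_by_key(|&(_, order)| -order);
    builders.into_iter().map(|(name, _)| name).collect()
}
```

For example, with orders 10, 0, 0 and i16::MIN / 2 (the value used by ResponseCompressionBuilder), the module with order 10 runs first, the two default modules follow in the order they were added, and the compression-like module runs last.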
HttpModules
Finally, back to the HttpModules type used in HttpServer. It is a structure that looks a lot like a builder, defined as follows:
    /// The object to hold multiple http modules
    pub struct HttpModules {
        modules: Vec<ModuleBuilder>,
        module_index: OnceCell<Arc<HashMap<TypeId, usize>>>,
    }

    impl HttpModules {
        /// Create a new [HttpModules]
        pub fn new() -> Self {
            HttpModules {
                modules: vec![],
                module_index: OnceCell::new(),
            }
        }

        /// Add a new [ModuleBuilder] to [HttpModules]
        ///
        /// Each type of [HttpModule] can be only added once.
        /// # Panic
        /// Panic if any [HttpModule] is added more than once.
        pub fn add_module(&mut self, builder: ModuleBuilder) {
            if self.module_index.get().is_some() {
                // We use a shared module_index the index would be out of sync if we
                // add more modules.
                panic!("cannot add module after ctx is already built")
            }
            self.modules.push(builder);
            // not the most efficient way but should be fine
            // largest order first
            self.modules.sort_by_key(|m| -m.order());
        }

        /// Build the contexts of all the modules added to this [HttpModules]
        pub fn build_ctx(&self) -> HttpModuleCtx {
            let module_ctx: Vec<_> = self.modules.iter().map(|b| b.init()).collect();
            let module_index = self
                .module_index
                .get_or_init(|| {
                    let mut module_index = HashMap::with_capacity(self.modules.len());
                    for (i, c) in module_ctx.iter().enumerate() {
                        let exist = module_index.insert(c.as_any().type_id(), i);
                        if exist.is_some() {
                            panic!("duplicated filters found")
                        }
                    }
                    Arc::new(module_index)
                })
                .clone();

            HttpModuleCtx {
                module_ctx,
                module_index,
            }
        }
    }
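During construction, new ModuleBuilders are added through HttpModules::add_module. Every addition re-sorts the list by HttpModuleBuilder::order() (the sort_by_key call), which fixes the final execution order of the modules.
build_ctx() is called whenever a new HTTP request arrives. In this step Pingora creates each HttpModule through HttpModuleBuilder::init() (the map over self.modules) and builds an index that records each module's position in a HashMap keyed by TypeId (the get_or_init closure). The index is created only once; after that it is kept in the module_index field through a OnceCell and reused by later requests.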
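The "fresh modules per request, shared index built once" pattern can be sketched with the standard library alone. Everything below is hypothetical scaffolding: Registry, A, B, make_a, and make_b are invented for the example, plain function pointers stand in for ModuleBuilder::init, and std::sync::OnceLock is used in place of the OnceCell from the Pingora source.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

/// Hypothetical per-request module types.
pub struct A;
pub struct B;

fn make_a() -> Box<dyn Any> {
    Box::new(A)
}
fn make_b() -> Box<dyn Any> {
    Box::new(B)
}

/// Simplified stand-in for `HttpModules`.
pub struct Registry {
    factories: Vec<fn() -> Box<dyn Any>>,
    index: OnceLock<Arc<HashMap<TypeId, usize>>>,
}

impl Registry {
    /// A registry holding one factory for `A` and one for `B`.
    pub fn demo() -> Self {
        let factories: Vec<fn() -> Box<dyn Any>> = vec![make_a, make_b];
        Registry { factories, index: OnceLock::new() }
    }

    /// Build fresh module objects for one request and return them together
    /// with the shared type-to-position index.
    pub fn build_ctx(&self) -> (Vec<Box<dyn Any>>, Arc<HashMap<TypeId, usize>>) {
        // New module objects are created on every call.
        let modules: Vec<Box<dyn Any>> = self.factories.iter().map(|f| f()).collect();
        // The index is computed by the first call only; later calls clone the Arc.
        let index = self
            .index
            .get_or_init(|| {
                let mut map = HashMap::with_capacity(modules.len());
                for (i, m) in modules.iter().enumerate() {
                    // `&**m` looks through the Box so we get the concrete type's
                    // ID rather than the ID of `Box<dyn Any>` itself.
                    let dup = map.insert((&**m).type_id(), i);
                    assert!(dup.is_none(), "duplicated modules found");
                }
                Arc::new(map)
            })
            .clone();
        (modules, index)
    }
}
```

Calling build_ctx() twice on the same Registry yields two independent module vectors but the same Arc for the index, which is why add_module in the real code refuses to accept new builders once a context has been built.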
HttpModuleCtx
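The last link in the HttpModule chain is HttpModuleCtx. There is not much left to explain here: it simply runs the modules one after another in order, and the code speaks for itself (x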
    /// The Contexts of multiple modules
    ///
    /// This is the object that will apply all the included modules to a certain HTTP request.
    /// The modules are ordered according to their `order()`.
    pub struct HttpModuleCtx {
        // the modules in the order of execution
        module_ctx: Vec<Module>,
        // find the module in the vec with its type ID
        module_index: Arc<HashMap<TypeId, usize>>,
    }

    impl HttpModuleCtx {
        /// Create a placeholder empty [HttpModuleCtx].
        ///
        /// [HttpModules] should be used to create nonempty [HttpModuleCtx].
        pub fn empty() -> Self {
            HttpModuleCtx {
                module_ctx: vec![],
                module_index: Arc::new(HashMap::new()),
            }
        }

        /// Get a ref to [HttpModule] if any.
        pub fn get<T: 'static>(&self) -> Option<&T> {
            let idx = self.module_index.get(&TypeId::of::<T>())?;
            let ctx = &self.module_ctx[*idx];
            Some(
                ctx.as_any()
                    .downcast_ref::<T>()
                    .expect("type should always match"),
            )
        }

        /// Get a mut ref to [HttpModule] if any.
        pub fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
            let idx = self.module_index.get(&TypeId::of::<T>())?;
            let ctx = &mut self.module_ctx[*idx];
            Some(
                ctx.as_any_mut()
                    .downcast_mut::<T>()
                    .expect("type should always match"),
            )
        }

        /// Run the `request_header_filter` for all the modules according to their orders.
        pub fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
            for filter in self.module_ctx.iter_mut() {
                filter.request_header_filter(req)?;
            }
            Ok(())
        }

        /// Run the `request_body_filter` for all the modules according to their orders.
        pub fn request_body_filter(&mut self, mut body: Option<Bytes>) -> Result<Option<Bytes>> {
            for filter in self.module_ctx.iter_mut() {
                body = filter.request_body_filter(body)?;
            }
            Ok(body)
        }

        /// Run the `response_filter` for all the modules according to their orders.
        pub fn response_filter(&mut self, t: &mut HttpTask) -> Result<()> {
            for filter in self.module_ctx.iter_mut() {
                filter.response_filter(t)?;
            }
            Ok(())
        }
    }
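The two things HttpModuleCtx does, running filters in sequence and handing back a typed reference to one specific module, can be tried out in miniature. This is a sketch with only the standard library: Module, Upper, Counter, and Ctx are hypothetical stand-ins, bodies are Vec<u8> instead of Bytes, and the filter returns the body directly instead of a Result.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Cut-down stand-in for `HttpModule`: one filter plus the `as_any` hook.
pub trait Module {
    fn request_body_filter(&mut self, body: Option<Vec<u8>>) -> Option<Vec<u8>> {
        body
    }
    fn as_any(&self) -> &dyn Any;
}

/// Hypothetical module that upper-cases the body.
pub struct Upper;
impl Module for Upper {
    fn request_body_filter(&mut self, body: Option<Vec<u8>>) -> Option<Vec<u8>> {
        body.map(|b| b.to_ascii_uppercase())
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Hypothetical module that records how many bytes passed through it.
pub struct Counter {
    pub seen: usize,
}
impl Module for Counter {
    fn request_body_filter(&mut self, body: Option<Vec<u8>>) -> Option<Vec<u8>> {
        self.seen += body.as_ref().map_or(0, |b| b.len());
        body
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Cut-down stand-in for `HttpModuleCtx`.
pub struct Ctx {
    modules: Vec<Box<dyn Module>>,
    index: HashMap<TypeId, usize>,
}

impl Ctx {
    /// A context holding `Upper` followed by `Counter`.
    pub fn demo() -> Self {
        let modules: Vec<Box<dyn Module>> = vec![Box::new(Upper), Box::new(Counter { seen: 0 })];
        let index = modules
            .iter()
            .enumerate()
            .map(|(i, m)| (m.as_any().type_id(), i))
            .collect();
        Ctx { modules, index }
    }

    /// Look up a module by its concrete type, if it is registered.
    pub fn get<T: 'static>(&self) -> Option<&T> {
        let idx = self.index.get(&TypeId::of::<T>())?;
        self.modules[*idx].as_any().downcast_ref::<T>()
    }

    /// Feed the body through every module, in vector order.
    pub fn request_body_filter(&mut self, mut body: Option<Vec<u8>>) -> Option<Vec<u8>> {
        for m in self.modules.iter_mut() {
            body = m.request_body_filter(body);
        }
        body
    }
}
```

After pushing a body through Ctx::demo(), get::<Counter>() returns the same Counter instance that took part in the chain, so the caller can read the state it accumulated. This is the reason every HttpModule has to provide as_any and as_any_mut.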
Back to Prometheus
Having toured the underlying implementation, let's return to Prometheus. The implementation of PrometheusServer now reads as simple and clear:
    /// A HTTP application that reports Prometheus metrics.
    ///
    /// This application will report all the [static metrics](https://docs.rs/prometheus/latest/prometheus/index.html#static-metrics)
    /// collected via the [Prometheus](https://docs.rs/prometheus/) crate;
    pub struct PrometheusHttpApp;

    #[cfg_attr(not(doc_async_trait), async_trait)]
    impl ServeHttp for PrometheusHttpApp {
        async fn response(&self, _http_session: &mut ServerSession) -> Response<Vec<u8>> {
            let encoder = TextEncoder::new();
            let metric_families = prometheus::gather();
            let mut buffer = vec![];
            encoder.encode(&metric_families, &mut buffer).unwrap();
            Response::builder()
                .status(200)
                .header(http::header::CONTENT_TYPE, encoder.format_type())
                .header(http::header::CONTENT_LENGTH, buffer.len())
                .body(buffer)
                .unwrap()
        }
    }

    /// The [HttpServer] for [PrometheusHttpApp]
    ///
    /// This type provides the functionality of [PrometheusHttpApp] with compression enabled
    pub type PrometheusServer = HttpServer<PrometheusHttpApp>;

    impl PrometheusServer {
        pub fn new() -> Self {
            let mut server = Self::new_app(PrometheusHttpApp);
            // enable gzip level 7 compression
            server.add_module(ResponseCompressionBuilder::enable(7));
            server
        }
    }
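The whole process comes down to two main steps:
- Implement ServeHttp for PrometheusHttpApp
- Define a PrometheusServer that wraps PrometheusHttpApp and adds the required HttpModuleBuilder
It is simple enough that even ⑨ could follow it, so I won't belabor the point.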
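The same two steps can be replayed on a toy model to see how the pieces fit. The sketch below uses only the standard library and invents every name in it (Serve, BodyFilter, Server, MetricsApp, MetricsServer, trim_end, handle); modules are reduced to plain functions over the finished body, and a whitespace trimmer stands in for compression.

```rust
/// Simplified, synchronous stand-in for `ServeHttp`.
pub trait Serve {
    fn response(&self) -> Vec<u8>;
}

/// Simplified stand-in for a module: a function that rewrites the body.
pub type BodyFilter = fn(Vec<u8>) -> Vec<u8>;

/// Simplified stand-in for `HttpServer<SV>`: an app plus its modules.
pub struct Server<SV> {
    app: SV,
    filters: Vec<BodyFilter>,
}

impl<SV: Serve> Server<SV> {
    pub fn new_app(app: SV) -> Self {
        Server { app, filters: Vec::new() }
    }

    pub fn add_module(&mut self, f: BodyFilter) {
        self.filters.push(f);
    }

    /// Ask the app for a response, then pass it through each filter in order.
    pub fn handle(&self) -> Vec<u8> {
        self.filters.iter().fold(self.app.response(), |body, f| f(body))
    }
}

/// Step 1: a hypothetical app that renders one fixed metric line.
pub struct MetricsApp;

impl Serve for MetricsApp {
    fn response(&self) -> Vec<u8> {
        b"requests_total 1\n".to_vec()
    }
}

/// Hypothetical filter standing in for compression: trims trailing whitespace.
fn trim_end(mut body: Vec<u8>) -> Vec<u8> {
    while body.last().map_or(false, |b| b.is_ascii_whitespace()) {
        body.pop();
    }
    body
}

/// Step 2: name the concrete server type and give it a constructor
/// that wires the module in.
pub type MetricsServer = Server<MetricsApp>;

impl MetricsServer {
    pub fn new() -> Self {
        let mut server = Self::new_app(MetricsApp);
        server.add_module(trim_end);
        server
    }
}
```

The inherent impl on the type alias is the same trick PrometheusServer uses: the generic wrapper stays reusable, while the alias gets its own new() that bakes in the app and its modules.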
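Finally, a look at... ResponseCompression?
PrometheusServer uses ResponseCompressionBuilder, which is currently the only HttpModule implemented in the Pingora repository itself. Here is a quick look at its basic implementation in pingora-core; the details are left for a future post (if I ever get around to it):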
    /// HTTP response compression module
    pub struct ResponseCompression(ResponseCompressionCtx);

    impl HttpModule for ResponseCompression {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
        fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
            self
        }

        fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
            self.0.request_filter(req);
            Ok(())
        }

        fn response_filter(&mut self, t: &mut HttpTask) -> Result<()> {
            self.0.response_filter(t);
            Ok(())
        }
    }

    /// The builder for HTTP response compression module
    pub struct ResponseCompressionBuilder {
        level: u32,
    }

    impl ResponseCompressionBuilder {
        /// Return a [ModuleBuilder] for [ResponseCompression] with the given compression level
        pub fn enable(level: u32) -> ModuleBuilder {
            Box::new(ResponseCompressionBuilder { level })
        }
    }

    impl HttpModuleBuilder for ResponseCompressionBuilder {
        fn init(&self) -> Module {
            Box::new(ResponseCompression(ResponseCompressionCtx::new(
                self.level, false,
            )))
        }

        fn order(&self) -> i16 {
            // run the response filter later than most others filters
            i16::MIN / 2
        }
    }
Well, that's about it (x