Learning Pingora 02 - A Simple HTTP Server

Published: at 20:28

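Interestingly, although Pingora is usually thought of as a reverse-proxy tool, it also contains the pieces needed to build an HTTP server. It works much like most HTTP servers, but with a distinctly Pingora flavor. Enough talk, let's serve it up (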

Example: Logging

The official examples include a logging section, with the following code:

pub struct MyGateway {
    req_metric: prometheus::IntCounter,
}

#[async_trait]
impl ProxyHttp for MyGateway {
    // ...
    async fn logging(
        &self,
        session: &mut Session,
        _e: Option<&pingora::Error>,
        ctx: &mut Self::CTX,
    ) {
        let response_code = session
            .response_written()
            .map_or(0, |resp| resp.status.as_u16());
        // access log
        info!(
            "{} response code: {response_code}",
            self.request_summary(session, ctx)
        );

        self.req_metric.inc();
    }
}

fn main() {
    // ...
    // Prometheus metrics endpoint
    let mut prometheus_service_http =
        pingora::services::listening::Service::prometheus_http_service();
    prometheus_service_http.add_tcp("127.0.0.1:6192");
    my_server.add_service(prometheus_service_http);

    my_server.run_forever();
}

As you can see, the last few lines of main(), starting at prometheus_http_service(), bring up a Prometheus metrics server on 127.0.0.1:6192. So how is this server implemented?

trait ServeHttp

Let's break it down from the bottom up. First, ServeHttp:

pingora-core/src/apps/http_app.rs
/// This trait defines how to map a request to a response
#[cfg_attr(not(doc_async_trait), async_trait)]
pub trait ServeHttp {
    /// Define the mapping from a request to a response.
    /// Note that the request header is already read, but the implementation needs to read the
    /// request body if any.
    ///
    /// # Limitation
    /// In this API, the entire response has to be generated before the end of this call.
    /// So it is not suitable for streaming response or interactive communications.
    /// Users need to implement their own [`super::HttpServerApp`] for those use cases.
    async fn response(&self, http_session: &mut ServerSession) -> Response<Vec<u8>>;
}

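This is about as standard and simple as an HTTP server interface gets: it maps a request to a response. The trait definition is equally simple: it does not support streaming, and the whole body has to be returned in one go.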
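To make the shape of this contract concrete, here is a minimal std-only sketch. `Request`, `Response`, `Serve`, and `Hello` are hypothetical stand-ins rather than Pingora types, and the method is synchronous for brevity (the real trait is async and takes a `ServerSession`).

```rust
/// Hypothetical, simplified stand-in for the request side of a session.
pub struct Request {
    pub path: String,
}

/// Hypothetical stand-in for `Response<Vec<u8>>`: the body is fully buffered.
pub struct Response {
    pub status: u16,
    pub body: Vec<u8>,
}

/// Simplified analogue of `ServeHttp`: one request in, one complete response out.
pub trait Serve {
    fn response(&self, req: &Request) -> Response;
}

pub struct Hello;

impl Serve for Hello {
    fn response(&self, req: &Request) -> Response {
        // The whole body must exist before returning, so streaming is not possible here.
        if req.path == "/hello" {
            Response { status: 200, body: b"hello".to_vec() }
        } else {
            Response { status: 404, body: b"not found".to_vec() }
        }
    }
}
```

Because the return type owns the entire body, there is no point at which a partial response could be flushed, which is the limitation the doc comment warns about.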

struct HttpServer

HttpServer, in turn, bundles a ServeHttp implementation together with a set of HttpModules middleware:

pingora-core/src/apps/http_app.rs
/// A helper struct for HTTP server with http modules embedded
pub struct HttpServer<SV> {
    app: SV,
    modules: HttpModules,
}

HttpServerApp is implemented for HttpServer<SV> where SV: ServeHttp. In effect, this constrains app to be a type that implements ServeHttp:

pingora-core/src/apps/http_app.rs
#[cfg_attr(not(doc_async_trait), async_trait)]
impl<SV> HttpServerApp for HttpServer<SV>
where
    SV: ServeHttp + Send + Sync,
{
    async fn process_new_http(
        self: &Arc<Self>,
        mut http: ServerSession,
        shutdown: &ShutdownWatch,
    ) -> Option<Stream> {
        // ...

HttpModule

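An HttpModule can be thought of as middleware wrapped around ServeHttp: it modifies the incoming request and the outgoing response. Pingora provides several pieces for building and using HttpModules.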

trait HttpModule

Let's start with the basic trait definition:

pingora-core/src/modules/http/mod.rs
pub trait HttpModule {
    fn request_header_filter(&mut self, _req: &mut RequestHeader) -> Result<()> {
        Ok(())
    }

    fn request_body_filter(&mut self, body: Option<Bytes>) -> Result<Option<Bytes>> {
        Ok(body)
    }

    fn response_filter(&mut self, _t: &mut HttpTask) -> Result<()> {
        Ok(())
    }

    fn as_any(&self) -> &dyn Any;
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

type Module = Box<dyn HttpModule + 'static + Send + Sync>;

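As you can see, an HttpModule can modify both the request and the response. For the request, the trait defines request_header_filter() for changing the headers and request_body_filter() for changing the body. For the response, response_filter() receives a mutable HttpTask, so every part of it can be changed. HttpTask is defined as follows: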

pingora-core/src/protocols/http/mod.rs
/// An enum to hold all possible HTTP response events.
#[derive(Debug)]
pub enum HttpTask {
    /// the response header and the boolean end of response flag
    Header(Box<pingora_http::ResponseHeader>, bool),
    /// A piece of response header and the end of response boolean flag
    Body(Option<bytes::Bytes>, bool),
    /// HTTP response trailer
    Trailer(Option<Box<http::HeaderMap>>),
    /// Signal that the response is already finished
    Done,
    /// Signal that the reading of the response encounters errors.
    Failed(pingora_error::BError),
}

This covers essentially everything a response can contain.
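Since a response filter sees one event at a time, writing one mostly comes down to matching on the variant you care about. The sketch below illustrates that idea with a hypothetical, trimmed-down `Task` enum and a free function `add_server_header`; neither is part of Pingora, and the header is modeled as a plain list of string pairs.

```rust
/// Hypothetical, trimmed-down analogue of `HttpTask` (not the Pingora type).
#[derive(Debug, PartialEq)]
pub enum Task {
    /// Header fields plus an end-of-response flag.
    Header(Vec<(String, String)>, bool),
    /// An optional body chunk plus an end-of-response flag.
    Body(Option<Vec<u8>>, bool),
    /// The response is finished.
    Done,
}

/// A response filter in the style of `HttpModule::response_filter`:
/// it inspects one event at a time and mutates it in place.
pub fn add_server_header(task: &mut Task) {
    if let Task::Header(fields, _end) = task {
        fields.push(("server".to_string(), "sketch".to_string()));
    }
    // Body and Done events pass through untouched.
}
```

A compression-style module would follow the same pattern, but it would also rewrite `Body` chunks and adjust the header to match.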

trait HttpModuleBuilder

Defining a module is not enough to use it. We also need to create a builder for the module that satisfies the following trait:

pingora-core/src/modules/http/mod.rs
/// Trait to init the http module ctx for each request
pub trait HttpModuleBuilder {
    /// The order the module will run
    ///
    /// The lower the value, the later it runs relative to other filters.
    /// If the order of the filter is not important, leave it to the default 0.
    fn order(&self) -> i16 {
        0
    }

    /// Initialize and return the per request module context
    fn init(&self) -> Module;
}

pub type ModuleBuilder = Box<dyn HttpModuleBuilder + 'static + Send + Sync>;

Through order(), HttpModuleBuilder lets Pingora decide the priority in which HttpModules are loaded and applied.

HttpModules

Finally, back to the HttpModules type used by HttpServer. It is structured much like a builder and is defined as follows:

pingora-core/src/modules/http/mod.rs
/// The object to hold multiple http modules
pub struct HttpModules {
    modules: Vec<ModuleBuilder>,
    module_index: OnceCell<Arc<HashMap<TypeId, usize>>>,
}

impl HttpModules {
    /// Create a new [HttpModules]
    pub fn new() -> Self {
        HttpModules {
            modules: vec![],
            module_index: OnceCell::new(),
        }
    }

    /// Add a new [ModuleBuilder] to [HttpModules]
    ///
    /// Each type of [HttpModule] can be only added once.
    /// # Panic
    /// Panic if any [HttpModule] is added more than once.
    pub fn add_module(&mut self, builder: ModuleBuilder) {
        if self.module_index.get().is_some() {
            // We use a shared module_index the index would be out of sync if we
            // add more modules.
            panic!("cannot add module after ctx is already built")
        }
        self.modules.push(builder);
        // not the most efficient way but should be fine
        // largest order first
        self.modules.sort_by_key(|m| -m.order());
    }

    /// Build the contexts of all the modules added to this [HttpModules]
    pub fn build_ctx(&self) -> HttpModuleCtx {
        let module_ctx: Vec<_> = self.modules.iter().map(|b| b.init()).collect();
        let module_index = self
            .module_index
            .get_or_init(|| {
                let mut module_index = HashMap::with_capacity(self.modules.len());
                for (i, c) in module_ctx.iter().enumerate() {
                    let exist = module_index.insert(c.as_any().type_id(), i);
                    if exist.is_some() {
                        panic!("duplicated filters found")
                    }
                }
                Arc::new(module_index)
            })
            .clone();

        HttpModuleCtx {
            module_ctx,
            module_index,
        }
    }
}

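During construction, new ModuleBuilders are added through HttpModules::add_module. Every call re-sorts the list by HttpModuleBuilder::order(), largest value first (the sort_by_key call), which fixes the final execution order of the modules.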
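The effect of that sort is easiest to see on a small list. The sketch below uses a hypothetical `Builder` struct (just a name and an order) in place of real module builders. It sorts with `std::cmp::Reverse` rather than the negation used in the source, a substitution made here so that `i16::MIN` could not overflow. The `i16::MIN / 2` value mirrors the order ResponseCompressionBuilder returns.

```rust
/// Hypothetical stand-in for a module builder: just a name and an order.
pub struct Builder {
    pub name: &'static str,
    pub order: i16,
}

/// Mirrors the intent of the sort in `add_module`: the largest `order` comes first.
/// `Reverse` is used here instead of negation so `i16::MIN` cannot overflow.
pub fn execution_order(mut builders: Vec<Builder>) -> Vec<&'static str> {
    builders.sort_by_key(|b| std::cmp::Reverse(b.order));
    builders.iter().map(|b| b.name).collect()
}
```

With orders `i16::MIN / 2`, `0`, and `10`, the module with order `10` runs first and the one with `i16::MIN / 2` runs last, matching "the lower the value, the later it runs".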

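Every time an HTTP request arrives, build_ctx() is called. It creates the corresponding HttpModule for each builder through HttpModuleBuilder::init(), and builds an index: a HashMap from each module's TypeId to its position in the list (the get_or_init closure). The index is only built once; after that it is kept in module_index through a OnceCell and reused by later requests. This is also why add_module panics once a context has been built: the shared index would fall out of sync.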
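The "fresh modules per request, one shared index" pattern can be sketched with only the standard library. Everything here is hypothetical scaffolding: `Registry`, `Gzip`, `Auth`, and the factory functions are made up for illustration, `Box<dyn Any>` replaces the `Module` type, and std's `OnceLock` stands in for the `OnceCell` used in the source.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

pub struct Gzip;
pub struct Auth;

pub fn make_gzip() -> Box<dyn Any> { Box::new(Gzip) }
pub fn make_auth() -> Box<dyn Any> { Box::new(Auth) }

pub type Factory = fn() -> Box<dyn Any>;
pub type Index = Arc<HashMap<TypeId, usize>>;

/// Hypothetical stand-in for `HttpModules`: factories plus a lazily built index.
pub struct Registry {
    factories: Vec<Factory>,
    index: OnceLock<Index>,
}

impl Registry {
    pub fn new() -> Self {
        Registry { factories: Vec::new(), index: OnceLock::new() }
    }

    pub fn add(&mut self, factory: Factory) {
        self.factories.push(factory);
    }

    /// Per request: create fresh module instances, but reuse the shared index.
    pub fn build_ctx(&self) -> (Vec<Box<dyn Any>>, Index) {
        let modules: Vec<Box<dyn Any>> = self.factories.iter().map(|make| make()).collect();
        let index = self
            .index
            .get_or_init(|| {
                // Runs only on the first call: map each concrete type to its slot.
                let mut map = HashMap::with_capacity(modules.len());
                for (slot, module) in modules.iter().enumerate() {
                    // Deref through the Box so we get the concrete type's id,
                    // not the id of `Box<dyn Any>` itself.
                    map.insert((**module).type_id(), slot);
                }
                Arc::new(map)
            })
            .clone();
        (modules, index)
    }
}
```

Cloning the `Arc` is cheap, so every request context can carry the index without rebuilding the map.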

HttpModuleCtx

The last link in the HttpModule chain is HttpModuleCtx. There is not much left to explain here: it simply runs the modules one after another, and the code speaks for itself x

pingora-core/src/modules/http/mod.rs
/// The Contexts of multiple modules
///
/// This is the object that will apply all the included modules to a certain HTTP request.
/// The modules are ordered according to their `order()`.
pub struct HttpModuleCtx {
    // the modules in the order of execution
    module_ctx: Vec<Module>,
    // find the module in the vec with its type ID
    module_index: Arc<HashMap<TypeId, usize>>,
}

impl HttpModuleCtx {
    /// Create a placeholder empty [HttpModuleCtx].
    ///
    /// [HttpModules] should be used to create nonempty [HttpModuleCtx].
    pub fn empty() -> Self {
        HttpModuleCtx {
            module_ctx: vec![],
            module_index: Arc::new(HashMap::new()),
        }
    }

    /// Get a ref to [HttpModule] if any.
    pub fn get<T: 'static>(&self) -> Option<&T> {
        let idx = self.module_index.get(&TypeId::of::<T>())?;
        let ctx = &self.module_ctx[*idx];
        Some(
            ctx.as_any()
                .downcast_ref::<T>()
                .expect("type should always match"),
        )
    }

    /// Get a mut ref to [HttpModule] if any.
    pub fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
        let idx = self.module_index.get(&TypeId::of::<T>())?;
        let ctx = &mut self.module_ctx[*idx];
        Some(
            ctx.as_any_mut()
                .downcast_mut::<T>()
                .expect("type should always match"),
        )
    }

    /// Run the `request_header_filter` for all the modules according to their orders.
    pub fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
        for filter in self.module_ctx.iter_mut() {
            filter.request_header_filter(req)?;
        }
        Ok(())
    }

    /// Run the `request_body_filter` for all the modules according to their orders.
    pub fn request_body_filter(&mut self, mut body: Option<Bytes>) -> Result<Option<Bytes>> {
        for filter in self.module_ctx.iter_mut() {
            body = filter.request_body_filter(body)?;
        }
        Ok(body)
    }

    /// Run the `response_filter` for all the modules according to their orders.
    pub fn response_filter(&mut self, t: &mut HttpTask) -> Result<()> {
        for filter in self.module_ctx.iter_mut() {
            filter.response_filter(t)?;
        }
        Ok(())
    }
}
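One detail of those loops is worth spelling out: because each call ends in `?`, the first module that returns an error stops the chain, and later modules never see the request. Here is a minimal sketch of that behavior, assuming a hypothetical `Filter` trait with `String` errors and a header modeled as a list of strings (none of these names come from Pingora).

```rust
/// Hypothetical stand-in for a per-request module with one header hook.
pub trait Filter {
    fn request_header_filter(&mut self, headers: &mut Vec<String>) -> Result<(), String>;
}

/// Appends its tag to the header list and succeeds.
pub struct Tag(pub &'static str);

impl Filter for Tag {
    fn request_header_filter(&mut self, headers: &mut Vec<String>) -> Result<(), String> {
        headers.push(self.0.to_string());
        Ok(())
    }
}

/// Always fails, to show the short-circuit.
pub struct Reject;

impl Filter for Reject {
    fn request_header_filter(&mut self, _headers: &mut Vec<String>) -> Result<(), String> {
        Err("rejected".to_string())
    }
}

/// Same loop shape as `HttpModuleCtx::request_header_filter`:
/// run every filter in vec order and stop at the first error.
pub fn run_filters(
    chain: &mut [Box<dyn Filter>],
    headers: &mut Vec<String>,
) -> Result<(), String> {
    for filter in chain.iter_mut() {
        filter.request_header_filter(headers)?;
    }
    Ok(())
}
```

Note that changes made by earlier filters are not rolled back when a later one fails; the caller only learns that the chain was interrupted.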

Back to Prometheus

Having toured the underlying implementation, let's return to Prometheus. The implementation of PrometheusServer now looks very simple and clear:

pingora-core/src/apps/prometheus_http_app.rs
/// A HTTP application that reports Prometheus metrics.
///
/// This application will report all the [static metrics](https://docs.rs/prometheus/latest/prometheus/index.html#static-metrics)
/// collected via the [Prometheus](https://docs.rs/prometheus/) crate;
pub struct PrometheusHttpApp;

#[cfg_attr(not(doc_async_trait), async_trait)]
impl ServeHttp for PrometheusHttpApp {
    async fn response(&self, _http_session: &mut ServerSession) -> Response<Vec<u8>> {
        let encoder = TextEncoder::new();
        let metric_families = prometheus::gather();
        let mut buffer = vec![];
        encoder.encode(&metric_families, &mut buffer).unwrap();
        Response::builder()
            .status(200)
            .header(http::header::CONTENT_TYPE, encoder.format_type())
            .header(http::header::CONTENT_LENGTH, buffer.len())
            .body(buffer)
            .unwrap()
    }
}

/// The [HttpServer] for [PrometheusHttpApp]
///
/// This type provides the functionality of [PrometheusHttpApp] with compression enabled
pub type PrometheusServer = HttpServer<PrometheusHttpApp>;

impl PrometheusServer {
    pub fn new() -> Self {
        let mut server = Self::new_app(PrometheusHttpApp);
        // enable gzip level 7 compression
        server.add_module(ResponseCompressionBuilder::enable(7));
        server
    }
}

The whole thing boils down to two main steps:

  1. PrometheusHttpApp implements ServeHttp
  2. Define PrometheusServer, which wraps PrometheusHttpApp, and add the HttpModuleBuilders it needs
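The two steps above can be sketched in miniature without Pingora. All names below (`Serve`, `MetricsApp`, `Server`, `uppercase`, `new_metrics_server`) are hypothetical, modules are reduced to plain functions over the body, and `uppercase` is only a placeholder for a real transformation such as compression.

```rust
/// Hypothetical stand-in for `ServeHttp`: produce a fully buffered body.
pub trait Serve {
    fn response(&self) -> Vec<u8>;
}

/// Step 1: the app only knows how to render metrics text.
pub struct MetricsApp;

impl Serve for MetricsApp {
    fn response(&self) -> Vec<u8> {
        b"requests_total 1\n".to_vec()
    }
}

/// Hypothetical stand-in for `HttpServer<SV>`: an app plus response hooks.
pub struct Server<SV> {
    app: SV,
    modules: Vec<fn(Vec<u8>) -> Vec<u8>>,
}

impl<SV: Serve> Server<SV> {
    pub fn new_app(app: SV) -> Self {
        Server { app, modules: Vec::new() }
    }

    pub fn add_module(&mut self, module: fn(Vec<u8>) -> Vec<u8>) {
        self.modules.push(module);
    }

    /// Ask the app for a response, then pass it through every module in order.
    pub fn handle(&self) -> Vec<u8> {
        self.modules.iter().fold(self.app.response(), |body, m| m(body))
    }
}

/// Placeholder for a real transformation such as compression.
pub fn uppercase(body: Vec<u8>) -> Vec<u8> {
    body.to_ascii_uppercase()
}

/// Step 2: a type alias plus a constructor that pre-installs the modules.
pub type MetricsServer = Server<MetricsApp>;

pub fn new_metrics_server() -> MetricsServer {
    let mut server = Server::new_app(MetricsApp);
    server.add_module(uppercase);
    server
}
```

The app stays unaware of the modules, so the same `MetricsApp` could be served with or without post-processing just by changing the constructor.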

It really is simple enough that even ⑨ could follow it, so I won't go on about it here.

Finally, a look at... ResponseCompression

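PrometheusServer uses ResponseCompressionBuilder, which at the time of writing is also the only HttpModule implemented inside the Pingora repository. Here we take a quick look at its basic implementation in pingora-core and leave the details for a later post (if I ever get around to it):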

pingora-core/src/modules/http/compression.rs
/// HTTP response compression module
pub struct ResponseCompression(ResponseCompressionCtx);

impl HttpModule for ResponseCompression {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }

    fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
        self.0.request_filter(req);
        Ok(())
    }

    fn response_filter(&mut self, t: &mut HttpTask) -> Result<()> {
        self.0.response_filter(t);
        Ok(())
    }
}

/// The builder for HTTP response compression module
pub struct ResponseCompressionBuilder {
    level: u32,
}

impl ResponseCompressionBuilder {
    /// Return a [ModuleBuilder] for [ResponseCompression] with the given compression level
    pub fn enable(level: u32) -> ModuleBuilder {
        Box::new(ResponseCompressionBuilder { level })
    }
}

impl HttpModuleBuilder for ResponseCompressionBuilder {
    fn init(&self) -> Module {
        Box::new(ResponseCompression(ResponseCompressionCtx::new(
            self.level, false,
        )))
    }

    fn order(&self) -> i16 {
        // run the response filter later than most others filters
        i16::MIN / 2
    }
}

Well, that's about it (x