
Learning Pingora 02 - A Simple HTTP Server

Published: at 20:28

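Interestingly, although Pingora is usually thought of as a reverse-proxy tool, it also contains the pieces needed to implement an HTTP server. It resembles most HTTP servers, but with a distinctly Pingora flavor. Enough talk, let's serve it up (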


Example: Logging

The official examples include a logging section, with the following code:

pub struct MyGateway {
    req_metric: prometheus::IntCounter,
}

#[async_trait]
impl ProxyHttp for MyGateway {
    ...
    async fn logging(
        &self,
        session: &mut Session,
        _e: Option<&pingora::Error>,
        ctx: &mut Self::CTX,
    ) {
        let response_code = session
            .response_written()
            .map_or(0, |resp| resp.status.as_u16());
        // access log
        info!(
            "{} response code: {response_code}",
            self.request_summary(session, ctx)
        );
        self.req_metric.inc();
    }
}

fn main() {
    ...
    let mut prometheus_service_http =
        pingora::services::listening::Service::prometheus_http_service();
    prometheus_service_http.add_tcp("127.0.0.1:6192");
    my_server.add_service(prometheus_service_http);
    my_server.run_forever();
}

As the prometheus_http_service() lines in main() show, this logging example starts a Prometheus metrics server on 127.0.0.1:6192. What logic is this server built on?

trait ServeHttp

Let's break it down from the bottom up, starting with ServeHttp:

pingora-core/src/apps/http_app.rs
/// This trait defines how to map a request to a response
#[cfg_attr(not(doc_async_trait), async_trait)]
pub trait ServeHttp {
    /// Define the mapping from a request to a response.
    /// Note that the request header is already read, but the implementation needs to read the
    /// request body if any.
    ///
    /// # Limitation
    /// In this API, the entire response has to be generated before the end of this call.
    /// So it is not suitable for streaming response or interactive communications.
    /// Users need to implement their own [`super::HttpServerApp`] for those use cases.
    async fn response(&self, http_session: &mut ServerSession) -> Response<Vec<u8>>;
}

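This is a standard, simple HTTP server interface that maps a request to a response. The trait definition is simple as well: it does not support streaming, so the entire body has to be returned in one go.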
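To make the "one request in, one complete response out" shape concrete, here is a minimal sketch using only the standard library. `Request`, `Response`, `ServeHttpModel`, and `HelloApp` are hypothetical, synchronous stand-ins invented for illustration; they are not Pingora's real types, and the real trait method is async.

```rust
// Simplified stand-ins for illustration only; these are NOT Pingora's types.
pub struct Request {
    pub path: String,
}

pub struct Response {
    pub status: u16,
    pub body: Vec<u8>,
}

/// Model of the `ServeHttp` idea: the whole response is built before returning,
/// so there is no way to stream chunks from inside `response`.
pub trait ServeHttpModel {
    fn response(&self, req: &Request) -> Response;
}

/// Hypothetical app that answers one path and rejects everything else.
pub struct HelloApp;

impl ServeHttpModel for HelloApp {
    fn response(&self, req: &Request) -> Response {
        if req.path == "/hello" {
            Response { status: 200, body: b"hello".to_vec() }
        } else {
            Response { status: 404, body: b"not found".to_vec() }
        }
    }
}
```

Because the return value is a finished `Response`, anything that needs streaming or interactive communication has to go through a lower-level interface, which matches the limitation noted in the doc comment above.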

struct HttpServer

HttpServer, in turn, bundles a ServeHttp implementation together with a set of HttpModules middleware:

pingora-core/src/apps/http_app.rs
/// A helper struct for HTTP server with http modules embedded
pub struct HttpServer<SV> {
    app: SV,
    modules: HttpModules,
}

For SV: ServeHttp, it implements HttpServerApp. In effect, this also requires app to be an implementation of ServeHttp:

pingora-core/src/apps/http_app.rs
#[cfg_attr(not(doc_async_trait), async_trait)]
impl<SV> HttpServerApp for HttpServer<SV>
where
    SV: ServeHttp + Send + Sync,
{
    async fn process_new_http(
        self: &Arc<Self>,
        mut http: ServerSession,
        shutdown: &ShutdownWatch,
    ) -> Option<Stream> {
        // ...
    }
}

HttpModule

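An HttpModule can be thought of as middleware wrapped around ServeHttp that modifies the incoming request and the outgoing response. Pingora provides a set of types for building and using HttpModules.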

trait HttpModule

Let's start with the most basic trait definition:

pingora-core/src/modules/http/mod.rs
pub trait HttpModule {
    fn request_header_filter(&mut self, _req: &mut RequestHeader) -> Result<()> {
        Ok(())
    }
    fn request_body_filter(&mut self, body: Option<Bytes>) -> Result<Option<Bytes>> {
        Ok(body)
    }
    fn response_filter(&mut self, _t: &mut HttpTask) -> Result<()> {
        Ok(())
    }
    fn as_any(&self) -> &dyn Any;
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
type Module = Box<dyn HttpModule + 'static + Send + Sync>;

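As you can see, an HttpModule can modify both the request and the response. For the request, the trait defines request_header_filter() to modify the headers and request_body_filter() to modify the body; for the response, every part of the HttpTask can be changed. HttpTask is defined as follows: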

pingora-core/src/protocols/http/mod.rs
/// An enum to hold all possible HTTP response events.
#[derive(Debug)]
pub enum HttpTask {
    /// the response header and the boolean end of response flag
    Header(Box<pingora_http::ResponseHeader>, bool),
    /// A piece of response header and the end of response boolean flag
    Body(Option<bytes::Bytes>, bool),
    /// HTTP response trailer
    Trailer(Option<Box<http::HeaderMap>>),
    /// Signal that the response is already finished
    Done,
    /// Signal that the reading of the response encounters errors.
    Failed(pingora_error::BError),
}

This covers essentially everything a response can contain.
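As a rough illustration of how a module reacts to the different response events, the sketch below models the trait and the task enum with standard-library types only. `Task`, `Module`, and `TagAndCount` are hypothetical names made up for this example; the real HttpTask carries Pingora types such as ResponseHeader and Bytes, and the real trait returns Pingora's Result.

```rust
/// Simplified model of `HttpTask` (assumption: plain std types instead of Pingora's).
#[derive(Debug, PartialEq)]
pub enum Task {
    Header(Vec<(String, String)>, bool),
    Body(Option<Vec<u8>>, bool),
    Done,
}

/// Simplified model of `HttpModule`: every filter defaults to a no-op,
/// so a module only overrides the hooks it cares about.
pub trait Module {
    fn request_body_filter(&mut self, body: Option<Vec<u8>>) -> Result<Option<Vec<u8>>, String> {
        Ok(body)
    }
    fn response_filter(&mut self, _t: &mut Task) -> Result<(), String> {
        Ok(())
    }
}

/// Hypothetical module: tags the response header and counts body bytes.
#[derive(Default)]
pub struct TagAndCount {
    pub body_bytes: usize,
}

impl Module for TagAndCount {
    fn response_filter(&mut self, t: &mut Task) -> Result<(), String> {
        match t {
            // Header event: mutate the header list in place.
            Task::Header(headers, _end) => {
                headers.push(("x-filtered".to_string(), "1".to_string()));
            }
            // Body event: keep per-request state inside the module itself.
            Task::Body(Some(chunk), _end) => {
                self.body_bytes += chunk.len();
            }
            // Everything else passes through untouched.
            _ => {}
        }
        Ok(())
    }
}
```

The `&mut self` receiver is what allows a module to carry state across the header and body events of a single request.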

trait HttpModuleBuilder

Defining a module is not enough to use it directly. We also need to create a Builder for the module that satisfies the following trait:

pingora-core/src/modules/http/mod.rs
/// Trait to init the http module ctx for each request
pub trait HttpModuleBuilder {
    /// The order the module will run
    ///
    /// The lower the value, the later it runs relative to other filters.
    /// If the order of the filter is not important, leave it to the default 0.
    fn order(&self) -> i16 {
        0
    }
    /// Initialize and return the per request module context
    fn init(&self) -> Module;
}
pub type ModuleBuilder = Box<dyn HttpModuleBuilder + 'static + Send + Sync>;

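Through order(), HttpModuleBuilder defines the priority with which HttpModules are loaded and applied: as the doc comment says, the lower the value, the later the module runs.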
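The "lower value runs later" rule is easy to get backwards, so here is a small standard-library sketch of the ordering. The function `execution_order` and the module names are hypothetical. It sorts with `std::cmp::Reverse` rather than negating the key; both give a descending order, but negating an `i16` overflows when the value is `i16::MIN`, which `Reverse` avoids.

```rust
use std::cmp::Reverse;

/// Returns module names in execution order: largest `order` first.
/// (Hypothetical helper; names and orders are supplied by the caller.)
pub fn execution_order(mut builders: Vec<(&'static str, i16)>) -> Vec<&'static str> {
    // Stable sort: modules with equal order keep their insertion order.
    builders.sort_by_key(|&(_, order)| Reverse(order));
    builders.into_iter().map(|(name, _)| name).collect()
}
```

With a default-order module (0), a module at `i16::MIN / 2`, and a module at 100, the one at 100 runs first and the one at `i16::MIN / 2` runs last.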

HttpModules

Finally, back to the HttpModules type used in HttpServer. It is a structure that looks a lot like a Builder, defined as follows:

pingora-core/src/modules/http/mod.rs
/// The object to hold multiple http modules
pub struct HttpModules {
    modules: Vec<ModuleBuilder>,
    module_index: OnceCell<Arc<HashMap<TypeId, usize>>>,
}
impl HttpModules {
    /// Create a new [HttpModules]
    pub fn new() -> Self {
        HttpModules {
            modules: vec![],
            module_index: OnceCell::new(),
        }
    }
    /// Add a new [ModuleBuilder] to [HttpModules]
    ///
    /// Each type of [HttpModule] can be only added once.
    /// # Panic
    /// Panic if any [HttpModule] is added more than once.
    pub fn add_module(&mut self, builder: ModuleBuilder) {
        if self.module_index.get().is_some() {
            // We use a shared module_index the index would be out of sync if we
            // add more modules.
            panic!("cannot add module after ctx is already built")
        }
        self.modules.push(builder);
        // not the most efficient way but should be fine
        // largest order first
        self.modules.sort_by_key(|m| -m.order());
    }
    /// Build the contexts of all the modules added to this [HttpModules]
    pub fn build_ctx(&self) -> HttpModuleCtx {
        let module_ctx: Vec<_> = self.modules.iter().map(|b| b.init()).collect();
        let module_index = self
            .module_index
            .get_or_init(|| {
                let mut module_index = HashMap::with_capacity(self.modules.len());
                for (i, c) in module_ctx.iter().enumerate() {
                    let exist = module_index.insert(c.as_any().type_id(), i);
                    if exist.is_some() {
                        panic!("duplicated filters found")
                    }
                }
                Arc::new(module_index)
            })
            .clone();
        HttpModuleCtx {
            module_ctx,
            module_index,
        }
    }
}

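During construction, new ModuleBuilders are added through HttpModules::add_module. Every addition re-sorts the list by HttpModuleBuilder::order() (the sort_by_key call), which fixes the final execution order of the modules.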

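Every time an HTTP request arrives, build_ctx() is called. In the process, Pingora creates the corresponding HttpModule for each builder through HttpModuleBuilder::init(), and records each module's position in a HashMap keyed by TypeId to produce an index. The index is built only once; after that it is kept in module_index through OnceCell and reused by later requests. This is also why add_module panics once the index exists: adding a module afterwards would leave the shared index out of sync.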
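The "fresh contexts per request, index built once" pattern can be sketched with the standard library alone. This is a simplified model, not Pingora's code: `Modules`, `ModuleCtx`, `Ctx`, and `Builder` are hypothetical names, closures stand in for HttpModuleBuilder::init(), and `std::sync::OnceLock` stands in for the OnceCell used in the quoted source.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

pub type Ctx = Box<dyn Any + Send + Sync>;
pub type Builder = Box<dyn Fn() -> Ctx + Send + Sync>;

pub struct Modules {
    builders: Vec<Builder>,
    // Built lazily on the first `build_ctx` call, then shared by every request.
    index: OnceLock<Arc<HashMap<TypeId, usize>>>,
}

pub struct ModuleCtx {
    pub ctxs: Vec<Ctx>,
    pub index: Arc<HashMap<TypeId, usize>>,
}

impl Modules {
    pub fn new(builders: Vec<Builder>) -> Self {
        Modules { builders, index: OnceLock::new() }
    }

    pub fn build_ctx(&self) -> ModuleCtx {
        // A fresh context per module for every request.
        let ctxs: Vec<Ctx> = self.builders.iter().map(|b| b()).collect();
        let index = self
            .index
            .get_or_init(|| {
                let mut map = HashMap::with_capacity(ctxs.len());
                for (i, c) in ctxs.iter().enumerate() {
                    // `(**c)` reaches the value inside the box, so this is the
                    // TypeId of the concrete type, not of the Box itself.
                    let exist = map.insert((**c).type_id(), i);
                    assert!(exist.is_none(), "duplicated module type");
                }
                Arc::new(map)
            })
            .clone(); // cheap: only bumps the Arc reference count
        ModuleCtx { ctxs, index }
    }
}
```

Two calls to `build_ctx()` return distinct `ctxs` vectors but the same `Arc` for the index, so the per-request cost of the lookup table is a reference-count increment.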

HttpModuleCtx

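The last link in the HttpModule chain is HttpModuleCtx. There is not much left to explain here: it simply runs the modules one after another in order, and the code speaks for itself (x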

pingora-core/src/modules/http/mod.rs
/// The Contexts of multiple modules
///
/// This is the object that will apply all the included modules to a certain HTTP request.
/// The modules are ordered according to their `order()`.
pub struct HttpModuleCtx {
    // the modules in the order of execution
    module_ctx: Vec<Module>,
    // find the module in the vec with its type ID
    module_index: Arc<HashMap<TypeId, usize>>,
}
impl HttpModuleCtx {
    /// Create a placeholder empty [HttpModuleCtx].
    ///
    /// [HttpModules] should be used to create nonempty [HttpModuleCtx].
    pub fn empty() -> Self {
        HttpModuleCtx {
            module_ctx: vec![],
            module_index: Arc::new(HashMap::new()),
        }
    }
    /// Get a ref to [HttpModule] if any.
    pub fn get<T: 'static>(&self) -> Option<&T> {
        let idx = self.module_index.get(&TypeId::of::<T>())?;
        let ctx = &self.module_ctx[*idx];
        Some(
            ctx.as_any()
                .downcast_ref::<T>()
                .expect("type should always match"),
        )
    }
    /// Get a mut ref to [HttpModule] if any.
    pub fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
        let idx = self.module_index.get(&TypeId::of::<T>())?;
        let ctx = &mut self.module_ctx[*idx];
        Some(
            ctx.as_any_mut()
                .downcast_mut::<T>()
                .expect("type should always match"),
        )
    }
    /// Run the `request_header_filter` for all the modules according to their orders.
    pub fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
        for filter in self.module_ctx.iter_mut() {
            filter.request_header_filter(req)?;
        }
        Ok(())
    }
    /// Run the `request_body_filter` for all the modules according to their orders.
    pub fn request_body_filter(&mut self, mut body: Option<Bytes>) -> Result<Option<Bytes>> {
        for filter in self.module_ctx.iter_mut() {
            body = filter.request_body_filter(body)?;
        }
        Ok(body)
    }
    /// Run the `response_filter` for all the modules according to their orders.
    pub fn response_filter(&mut self, t: &mut HttpTask) -> Result<()> {
        for filter in self.module_ctx.iter_mut() {
            filter.response_filter(t)?;
        }
        Ok(())
    }
}
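The sequential execution in request_body_filter has two properties worth spelling out: each module receives the output of the previous one, and the first error stops the chain because of `?`. A minimal standard-library sketch of that behavior follows; `BodyFilter`, `Upper`, `MaxLen`, and `run_filters` are hypothetical names, and `String` stands in for Pingora's error type.

```rust
pub trait BodyFilter {
    fn request_body_filter(&mut self, body: Option<Vec<u8>>) -> Result<Option<Vec<u8>>, String>;
}

/// Hypothetical filter: upper-cases ASCII bytes in the body.
pub struct Upper;

/// Hypothetical filter: rejects bodies longer than the given limit.
pub struct MaxLen(pub usize);

impl BodyFilter for Upper {
    fn request_body_filter(&mut self, body: Option<Vec<u8>>) -> Result<Option<Vec<u8>>, String> {
        Ok(body.map(|b| b.to_ascii_uppercase()))
    }
}

impl BodyFilter for MaxLen {
    fn request_body_filter(&mut self, body: Option<Vec<u8>>) -> Result<Option<Vec<u8>>, String> {
        match &body {
            Some(b) if b.len() > self.0 => Err(format!("body longer than {} bytes", self.0)),
            _ => Ok(body),
        }
    }
}

/// Runs the filters in slice order, threading the body through each one.
pub fn run_filters(
    filters: &mut [Box<dyn BodyFilter>],
    mut body: Option<Vec<u8>>,
) -> Result<Option<Vec<u8>>, String> {
    for filter in filters.iter_mut() {
        // `?` returns early, so later filters never see a rejected body.
        body = filter.request_body_filter(body)?;
    }
    Ok(body)
}
```

Since the body is threaded through by value, the order of the modules changes the result whenever two of them touch the same data, which is why order() matters.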

Back to Prometheus

Having toured the underlying implementation, let's return to Prometheus. The implementation of PrometheusServer now reads as simple and clear:

pingora-core/src/apps/prometheus_http_app.rs
/// A HTTP application that reports Prometheus metrics.
///
/// This application will report all the [static metrics](https://docs.rs/prometheus/latest/prometheus/index.html#static-metrics)
/// collected via the [Prometheus](https://docs.rs/prometheus/) crate;
pub struct PrometheusHttpApp;
#[cfg_attr(not(doc_async_trait), async_trait)]
impl ServeHttp for PrometheusHttpApp {
    async fn response(&self, _http_session: &mut ServerSession) -> Response<Vec<u8>> {
        let encoder = TextEncoder::new();
        let metric_families = prometheus::gather();
        let mut buffer = vec![];
        encoder.encode(&metric_families, &mut buffer).unwrap();
        Response::builder()
            .status(200)
            .header(http::header::CONTENT_TYPE, encoder.format_type())
            .header(http::header::CONTENT_LENGTH, buffer.len())
            .body(buffer)
            .unwrap()
    }
}
/// The [HttpServer] for [PrometheusHttpApp]
///
/// This type provides the functionality of [PrometheusHttpApp] with compression enabled
pub type PrometheusServer = HttpServer<PrometheusHttpApp>;
impl PrometheusServer {
    pub fn new() -> Self {
        let mut server = Self::new_app(PrometheusHttpApp);
        // enable gzip level 7 compression
        server.add_module(ResponseCompressionBuilder::enable(7));
        server
    }
}

The whole process boils down to just two main steps:

  1. PrometheusHttpApp implements ServeHttp
  2. Define a PrometheusServer that wraps PrometheusHttpApp and adds the required HttpModuleBuilder
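The two steps above can be sketched as a standard-library model of the same composition. All names here (`App`, `Filter`, `Server`, `MetricsApp`, `Footer`, `MetricsServer`) are hypothetical stand-ins, and `Footer` is a trivial filter used in place of real compression so the output stays readable.

```rust
pub trait App {
    fn response(&self, path: &str) -> Vec<u8>;
}

pub trait Filter {
    fn response_filter(&mut self, body: &mut Vec<u8>);
}

pub type FilterBuilder = Box<dyn Fn() -> Box<dyn Filter>>;

/// Model of `HttpServer<SV>`: an app plus the builders of its modules.
pub struct Server<SV> {
    app: SV,
    builders: Vec<FilterBuilder>,
}

impl<SV: App> Server<SV> {
    pub fn new_app(app: SV) -> Self {
        Server { app, builders: Vec::new() }
    }

    pub fn add_module(&mut self, builder: FilterBuilder) {
        self.builders.push(builder);
    }

    pub fn handle(&self, path: &str) -> Vec<u8> {
        let mut body = self.app.response(path);
        for build in &self.builders {
            // A fresh filter instance per request, as with `init()`.
            let mut filter = build();
            filter.response_filter(&mut body);
        }
        body
    }
}

// Step 1: the app implements the request-to-response mapping.
pub struct MetricsApp;

impl App for MetricsApp {
    fn response(&self, _path: &str) -> Vec<u8> {
        b"requests_total 1\n".to_vec()
    }
}

/// Stand-in for a compression module: appends a marker line.
pub struct Footer;

impl Filter for Footer {
    fn response_filter(&mut self, body: &mut Vec<u8>) {
        body.extend_from_slice(b"# EOF\n");
    }
}

// Step 2: a type alias plus a constructor that wires in the module.
pub type MetricsServer = Server<MetricsApp>;

impl MetricsServer {
    pub fn new() -> Self {
        let mut server = Self::new_app(MetricsApp);
        server.add_module(Box::new(|| Box::new(Footer) as Box<dyn Filter>));
        server
    }
}
```

The type alias with its own `new()` mirrors the shape of PrometheusServer: callers get a ready-made server without knowing which modules were attached.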

Honestly, it is simple enough that even ⑨ could follow it, so I won't belabor the point.

Finally, a look at... ResponseCompression

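PrometheusServer uses ResponseCompressionBuilder, which is currently the only HttpModule implemented inside the Pingora repository. Here we take a brief look at its basic implementation in pingora-core and leave the details for a later post (no promises on when):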

pingora-core/src/modules/http/compression.rs
/// HTTP response compression module
pub struct ResponseCompression(ResponseCompressionCtx);
impl HttpModule for ResponseCompression {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }
    fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
        self.0.request_filter(req);
        Ok(())
    }
    fn response_filter(&mut self, t: &mut HttpTask) -> Result<()> {
        self.0.response_filter(t);
        Ok(())
    }
}
/// The builder for HTTP response compression module
pub struct ResponseCompressionBuilder {
    level: u32,
}
impl ResponseCompressionBuilder {
    /// Return a [ModuleBuilder] for [ResponseCompression] with the given compression level
    pub fn enable(level: u32) -> ModuleBuilder {
        Box::new(ResponseCompressionBuilder { level })
    }
}
impl HttpModuleBuilder for ResponseCompressionBuilder {
    fn init(&self) -> Module {
        Box::new(ResponseCompression(ResponseCompressionCtx::new(
            self.level, false,
        )))
    }
    fn order(&self) -> i16 {
        // run the response filter later than most others filters
        i16::MIN / 2
    }
}

Well, that's about it (x

