Skip to content

谈谈 Iori 的设计思路(一):从 Nico Timeshift 说起

Published: at 23:41

大家好,好久不见,我是某昨。

前两天读了院士关于 Showroom 录制的文章,其中涉及到的一些问题其实借助 iori 的架构设计很容易解决。但由于估计没有第二个人真正读过 iori 的源码,于是正好借这个机会,想来简单讲讲 iori 的架构设计,以及通过一个比较复杂的下载源的例子 iori-niconico,讲讲这类复杂视频源在 iori 中的实现方式。

ToC

开始

从设计之初,iori 就希望成为一个通用但不失定制性的下载库。为了通用,我们实现了最常见的 HLS 和部分 DASH 下载能力。那如何又能够定制呢?这就要讲到 iori 的架构设计了。

iori 将整个下载器分成了四个部分:

  1. StreamingSource:一个下载源。它负责获取有哪些块需要下载,以及每个块具体如何下载。
  2. Cache:负责缓存下载好的块。目前 Iori 实现了内存中和磁盘上的缓存,如果之后有需求也可以实现诸如 S3 等远程缓存。
  3. Merger:在下载完成后/进行时,从缓存中读取下载好的块,并合并成完整的文件。
  4. Downloader:一个抽象的并行下载器,可以从任意 Source 下载块,经由 Cache 缓存,最后通过 Merger 合并,完成下载。

StreamingSource

iori 将每一个下载源抽象成了 StreamingSource,定义如下(为方便展示,简化成了 async trait 的形式):

pub trait StreamingSource {
type Segment: StreamingSegment + Send + 'static;
async fn fetch_info(&self) -> IoriResult<UnboundedReceiver<IoriResult<Vec<Self::Segment>>>>;
async fn fetch_segment<W>(&self, segment: &Self::Segment, writer: &mut W) -> IoriResult<()>
where
W: tokio::io::AsyncWrite + Unpin + Send + Sync + 'static;
}

每个 StreamingSource 会和与之匹配的 Segment 对应。对于一个 Source,其需要实现的其实只有两个关键部分:

  1. 获取需要下载的 Segment 流
  2. 下载 Segment

对应了 trait StreamingSource 中的两个 methodfetch_info 会返回一个 UnboundedReceiver,可以理解为一条管道,管道内流淌着 Self::Segment 的数组。每次下载时,iori 会试图从这个 Segment 源中获取需要下载的部分,然后将其喂给 fetch_segment 进行下载。

讲点不一样的:Niconico Timeshift

相信大家对最常见的 HLS 都已经习以为常了,那我们就来讲点不一样的,Niconico 的 Timeshift。

Nico 的下载方式在空空的博客里早有记载。简单来说,我们扮演的是一个不断拉动进度条的用户,在每次拉动进度条之后 nico 会允许我们下载进度条当前进度附近的一些块。通过不断的试探,我们最终将整个视频下载完成。在这个过程中,我们需要通过“拉动进度条”来激活可以下载的 ts 列表,并通过当前下载进度的 time、offset、audience_token 等一系列参数拼凑出最终 ts 块的下载链接。

fetch_info

在有了下载的整体思路之后,我们需要将其与 iori 的架构匹配起来。首先,我们需要有一个 NicoTimeshitSource

pub struct NicoTimeshiftSource {
client: Client,
m3u8_url: String,
sequence: Arc<AtomicU64>,
host: Arc<RwLock<Url>>,
token: Arc<RwLock<String>>,
retry: u32,
}

在下载过程中,audience_token 可能会随时失效,所以我们不能一直复用最开始获得的 token,而是要在实际获取 Segment 内容的时候去做一次替换。所以这里用到了 Arc<RwLock<String>>,通过运行一个 tokio 的后台任务,实时地将最新的 Token 放到这个可以多进程间共享的 RwLock 中。

在准备好了 Source 的结构之后,接下来就是获取 Segment 流了。我们先定义一个 mpsc 通道(虽然这里其实是当 spsc 用了),然后通过 iori::hls 中提供的 load_m3u8 函数获取 m3u8 文件。此外,我们还需要计算出每个 Segment 的时长,供后续获取各个块的名称时使用:

let (sender, receiver) = mpsc::unbounded_channel();
let (playlist_url, playlist) =
load_m3u8(&self.client, Url::parse(&self.m3u8_url)?, self.retry).await?;
let chunk_length = (playlist.segments.iter().map(|s| s.duration).sum::<f32>()
/ playlist.segments.len() as f32) as u64;

然后是通过 #DMC-STREAM-DURATION 获取整个视频的时间:

let playlist_text = self
.client
.get(playlist_url.clone())
.send()
.await?
.text()
.await?;
let regex = Regex::new(r#"#DMC-STREAM-DURATION:(.+)"#).unwrap();
let video_length = regex
.captures(&playlist_text)
.and_then(|cap| cap.get(1))
.and_then(|d| d.as_str().parse().ok())
.ok_or_else(|| anyhow::anyhow!("{playlist_text}"))
.expect("Failed to parse video length");
log::info!("video_length: {video_length}, chunk_length: {chunk_length}");

然后是通过获取前两个块的链接,计算出 offset:

let first_chunk_url = &playlist.segments[0].uri;
let second_chunk_url = &playlist.segments[1].uri;
let offset = NICO_SEGMENT_OFFSET_REGEXP
.captures(if first_chunk_url.starts_with("0.ts") {
second_chunk_url
} else {
first_chunk_url
})
.unwrap()
.get(1)
.unwrap()
.as_str()
.to_string();
log::debug!("offset: {offset}");

最后则是通过这个 playlist 的 segment 数量,推断我们每波需要下载的块数量,接下来就是准备进入正题了:

let limit = playlist.segments.len();
let sequence = self.sequence.clone();
let client = self.client.clone();
let host = self.host.clone();
let token = self.token.clone();
tokio::spawn(async move {

还记得上面拿到的 limit 吗,它记录了我们每轮需要下载多少个块。我们需要通过这个 limitStream Source 进行一定的限流,确保一波获取到的所有块都下载完成之后,再去“拉动进度条”,获取下一波的待下载块。为此,这里的实现方式是定义一个信号量,并将信号量的 Permit 作为 NicoTimeshiftSegment 的成员传递。Segment 下载完成之后会被释放,而对应地,其获取包含的信号量也会被同时回收。

此外,我们在开始处定义一个 time,进入循环:

let permits = Arc::new(Semaphore::new(limit));
let mut time = 0.;
while time < video_length {

当不足 1s 时,跳出循环,表示所有块都已经下载完成。

if video_length - format!("{time}.{offset}").parse::<f32>().unwrap() < 1. {
break;
}

构造并请求 m3u8 的地址。对这个地址的请求就是我们“拉动进度条”的动作。这里需要删掉 url 中原有的 start,并修改为新的 time:

let mut url = playlist_url.clone();
// replace `start` with the current time
let query: Vec<(_, _)> = playlist_url
.query_pairs()
.filter(|(name, _)| name != "start")
.collect();
url.query_pairs_mut()
.clear()
.extend_pairs(query)
.append_pair("start", &format!("{time}"));
log::debug!("ping {url}");
// fetch url(ping), ignore the result
let _ = client.get(url.clone()).send().await;

接下来要取 m3u8 地址中 1/ts 的部分,这是 ts 地址的一部分:

// https://liveedge265.dmc.nico/hlsarchive/ht2_nicolive/nicolive-production-pg41793477411455_4a94f2f2a857a6bf7dca13d2825bf5acef5c8c77fedf0dd83912367632a4c7b1/1/ts/playlist.m3u8?start_time=-575435206444&ht2_nicolive=86127604.knv7k8rg2e_sa5alt_3rt0vxccmbc1b&start=15.114
// Extract the 1/ts part
let regex = Regex::new(r#"(?:http(?:s):\/\/.+\/)(\d\/ts)"#).unwrap();
let ts = regex
.captures(&url.to_string())
.and_then(|cap| cap.get(1))
.map(|r| r.as_str().to_string())
.unwrap();

好,万事就绪,可以开始获取这个 m3u8 地址中的 ts 块了。我们给每个 segment 分配一个 permit,并通过 timeoffset 构造出 filename

// 0-<limit>, <limit> chunks per list
// fetch the next <limit> chunks
let mut segments = Vec::new();
for _ in 0..limit {
let permit = permits.clone().acquire_owned().await.unwrap();
let filename = if time == 0. {
format!("0.ts")
} else {
format!("{time}{offset}.ts")
};
let mut segment_url = url.join(&filename).unwrap();
segment_url.set_query(url.query());
segments.push(NicoTimeshiftSegment {
host: host.clone(),
token: token.clone(),
_permit: permit,
ts: ts.clone(),
file_name: filename,
query: url.query().map(|q| q.to_string()),
sequence: sequence.fetch_add(1, Ordering::Relaxed),
});

最后别忘了把 time 加上 chunk 的时间,并加上提前退出循环的条件:

time += chunk_length as f32;
if video_length - format!("{time}.{offset}").parse::<f32>().unwrap() < 1. {
break;
}
}

好,现在就可以把这些 segment 发给 Downloader 了:

// send segments
if let Err(_) = sender.send(Ok(segments)) {
break;
}

最后是等待这一波下载完成再继续下一次的进度条拖动。这里我们尝试获取 limitpermit,也就是等待所有 Segment 释放:

// wait for all segments to be fetched
let _ = permits.acquire_many(limit as u32).await;
}
});
Ok(receiver)
}

至此,整个 Segment 流的实现就完成了。

NicoTimeshiftSegment

iori 中,我们定义了一个 trait RemoteStreamingSegment 以简化 Segment 的实现。这个 trait 非常简单,只接收一个 url。对于 Nico Timeshift,我们可以在这一层进行实际的 URL 拼接。这样也确保了能够在下载前拿到最新的 token

具体的拼接规则是这样:

impl RemoteStreamingSegment for NicoTimeshiftSegment {
fn url(&self) -> reqwest::Url {
let host = self.host.read().clone();
let token = self.token.read().clone();
let mut url = host
.join(&format!("{}/{}", self.ts, self.file_name))
.unwrap();
url.set_query(self.query.as_deref());
// remove ht2_nicolive first
let query: Vec<(_, _)> = url
.query_pairs()
.filter(|(name, _)| name != "ht2_nicolive")
.map(|r| (r.0.to_string(), r.1.to_string()))
.collect();
// add new ht2_nicolive token then
url.query_pairs_mut()
.clear()
.extend_pairs(query)
.append_pair("ht2_nicolive", token.as_str());
url
}
}

说回 Showroom

由于 Iori 的设计本意就是希望能够兼容更多的网站,因此对于院士博客中提到的 Showroom 的各种问题也是有相应的解决办法的。

首先是 m3u8 的超时机制。获取 m3u8 的行为对应的是 StreamingSource 中的 fetch_info 部分,我们只需要在这里对 ShowroomSource 做一定的特殊超时处理,就可以比较轻松地解决这个问题。

然后是 Segment 分片的跳变。在目前的 Iori 架构中,需要稍微绕一下远路(之后可能可以在 fetch_info 层直接搞定)。比如,我们需要在 ShowroomSegment 中增加一个 fail 字段:

struct ShowroomSegment {
fail: bool,
}

在出现 Segment 跳变时,我们需要将不存在的 Segment 也根据其顺序吐给 mpsc,然后在 fetch_segment 的时候直接失败:

async fn fetch_segment<W>(&self, segment: &Self::Segment, writer: &mut W) -> IoriResult<()>
where
W: tokio::io::AsyncWrite + Unpin + Send + Sync + 'static;
{
if segment.fail {
return Err(IoriError::HttpError(reqwest::StatusCode::NOT_FOUND));
}
fetch_segment(self.client.clone(), segment, writer).await?;
}

这样,在 Downloader 确定下载失败后,Merger 就可以获得这个失败信息,并进行一定的处理了。在 iori 中,如果下载过程中出现缺块的情况,ConcatMerger 会根据 fail 的情况自动生成 output.1.tsoutput.2.ts 等一系列文件,也就不会有直接合并导致的 Timestamp 问题了。


Previous Post
Iori Minyami 0.1.0 发布