大家好,好久不见,我是某昨。
前两天读了院士关于 Showroom
录制的文章,其中涉及到的一些问题其实借助 iori
的架构设计很容易解决。但由于估计没有第二个人真正读过 iori
的源码,于是正好借这个机会,想来简单讲讲 iori
的架构设计,以及通过一个比较复杂的下载源的例子 iori-niconico
,讲讲这类复杂视频源在 iori
中的实现方式。
ToC
开始
从设计之初,iori
就希望成为一个通用但不失定制性的下载库。为了通用,我们实现了最常见的 HLS
和部分 DASH
下载能力。那如何又能够定制呢?这就要讲到 iori
的架构设计了。
iori
将整个下载器分成了四个部分:
- StreamingSource:一个下载源。它负责获取有哪些块需要下载,以及每个块具体如何下载。
- Cache:负责缓存下载好的块。目前 Iori 实现了内存中和磁盘上的缓存,如果之后有需求也可以实现诸如 S3 等远程缓存。
- Merger:在下载完成后/进行时,从缓存中读取下载好的块,并合并成完整的文件。
- Downloader:一个抽象的并行下载器,可以从任意 Source 下载块,经由 Cache 缓存,最后通过 Merger 合并,完成下载。
StreamingSource
iori
将每一个下载源抽象成了 StreamingSource
,定义如下(为方便展示,简化成了 async trait
的形式):
pub trait StreamingSource { type Segment: StreamingSegment + Send + 'static;
async fn fetch_info(&self) -> IoriResult<UnboundedReceiver<IoriResult<Vec<Self::Segment>>>>;
async fn fetch_segment<W>(&self, segment: &Self::Segment, writer: &mut W) -> IoriResult<()> where W: tokio::io::AsyncWrite + Unpin + Send + Sync + 'static;}
每个 StreamingSource
会和与之匹配的 Segment
对应。对于一个 Source
,其需要实现的其实只有两个关键部分:
- 获取需要下载的 Segment 流
- 下载 Segment
对应了 trait StreamingSource
中的两个 method
。fetch_info
会返回一个 UnboundedReceiver
,可以理解为一条管道,管道内流淌着 Self::Segment
的数组。每次下载时,iori
会试图从这个 Segment
源中获取需要下载的部分,然后将其喂给 fetch_segment
进行下载。
讲点不一样的:Niconico Timeshift
相信大家对最常见的 HLS
都已经习以为常了,那我们就来讲点不一样的,Niconico 的 Timeshift。
Nico 的下载方式在空空的博客里早有记载。简单来说,我们扮演的是一个不断拉动进度条的用户,在每次拉动进度条之后 nico 会允许我们下载进度条当前进度附近的一些块。通过不断的试探,我们最终将整个视频下载完成。在这个过程中,我们需要通过“拉动进度条”来激活可以下载的 ts
列表,并通过当前下载进度的 time、offset、audience_token 等一系列参数拼凑出最终 ts
块的下载链接。
fetch_info
在有了下载的整体思路之后,我们需要将其与 iori
的架构匹配起来。首先,我们需要有一个 NicoTimeshitSource
:
pub struct NicoTimeshiftSource { client: Client,
m3u8_url: String, sequence: Arc<AtomicU64>,
host: Arc<RwLock<Url>>, token: Arc<RwLock<String>>, retry: u32,}
在下载过程中,audience_token
可能会随时失效,所以我们不能一直复用最开始获得的 token
,而是要在实际获取 Segment 内容的时候去做一次替换。所以这里用到了 Arc<RwLock<String>>
,通过运行一个 tokio
的后台任务,实时地将最新的 Token
放到这个可以多进程间共享的 RwLock
中。
在准备好了 Source
的结构之后,接下来就是获取 Segment
流了。我们先定义一个 mpsc
通道(虽然这里其实是当 spsc
用了),然后通过 iori::hls
中提供的 load_m3u8
函数获取 m3u8
文件。此外,我们还需要计算出每个 Segment
的时长,供后续获取各个块的名称时使用:
let (sender, receiver) = mpsc::unbounded_channel();
let (playlist_url, playlist) = load_m3u8(&self.client, Url::parse(&self.m3u8_url)?, self.retry).await?;let chunk_length = (playlist.segments.iter().map(|s| s.duration).sum::<f32>() / playlist.segments.len() as f32) as u64;
然后是通过 #DMC-STREAM-DURATION
获取整个视频的时间:
let playlist_text = self .client .get(playlist_url.clone()) .send() .await? .text() .await?;let regex = Regex::new(r#"#DMC-STREAM-DURATION:(.+)"#).unwrap();let video_length = regex .captures(&playlist_text) .and_then(|cap| cap.get(1)) .and_then(|d| d.as_str().parse().ok()) .ok_or_else(|| anyhow::anyhow!("{playlist_text}")) .expect("Failed to parse video length");
log::info!("video_length: {video_length}, chunk_length: {chunk_length}");
然后是通过获取前两个块的链接,计算出 offset:
let first_chunk_url = &playlist.segments[0].uri; let second_chunk_url = &playlist.segments[1].uri; let offset = NICO_SEGMENT_OFFSET_REGEXP .captures(if first_chunk_url.starts_with("0.ts") { second_chunk_url } else { first_chunk_url }) .unwrap() .get(1) .unwrap() .as_str() .to_string(); log::debug!("offset: {offset}");
最后则是通过这个 playlist 的 segment 数量,推断我们每波需要下载的块数量,接下来就是准备进入正题了:
let limit = playlist.segments.len();
let sequence = self.sequence.clone(); let client = self.client.clone();
let host = self.host.clone(); let token = self.token.clone(); tokio::spawn(async move {
还记得上面拿到的 limit
吗,它记录了我们每轮需要下载多少个块。我们需要通过这个 limit
对 Stream Source
进行一定的限流,确保一波获取到的所有块都下载完成之后,再去“拉动进度条”,获取下一波的待下载块。为此,这里的实现方式是定义一个信号量,并将信号量的 Permit
作为 NicoTimeshiftSegment
的成员传递。Segment
下载完成之后会被释放,而对应地,其获取包含的信号量也会被同时回收。
此外,我们在开始处定义一个 time,进入循环:
let permits = Arc::new(Semaphore::new(limit));
let mut time = 0.;
while time < video_length {
当不足 1s 时,跳出循环,表示所有块都已经下载完成。
if video_length - format!("{time}.{offset}").parse::<f32>().unwrap() < 1. { break; }
构造并请求 m3u8
的地址。对这个地址的请求就是我们“拉动进度条”的动作。这里需要删掉 url 中原有的 start,并修改为新的 time:
let mut url = playlist_url.clone(); // replace `start` with the current time let query: Vec<(_, _)> = playlist_url .query_pairs() .filter(|(name, _)| name != "start") .collect(); url.query_pairs_mut() .clear() .extend_pairs(query) .append_pair("start", &format!("{time}")); log::debug!("ping {url}");
// fetch url(ping), ignore the result let _ = client.get(url.clone()).send().await;
接下来要取 m3u8
地址中 1/ts
的部分,这是 ts
地址的一部分:
// https://liveedge265.dmc.nico/hlsarchive/ht2_nicolive/nicolive-production-pg41793477411455_4a94f2f2a857a6bf7dca13d2825bf5acef5c8c77fedf0dd83912367632a4c7b1/1/ts/playlist.m3u8?start_time=-575435206444&ht2_nicolive=86127604.knv7k8rg2e_sa5alt_3rt0vxccmbc1b&start=15.114 // Extract the 1/ts part let regex = Regex::new(r#"(?:http(?:s):\/\/.+\/)(\d\/ts)"#).unwrap(); let ts = regex .captures(&url.to_string()) .and_then(|cap| cap.get(1)) .map(|r| r.as_str().to_string()) .unwrap();
好,万事就绪,可以开始获取这个 m3u8
地址中的 ts
块了。我们给每个 segment
分配一个 permit
,并通过 time
和 offset
构造出 filename
。
// 0-<limit>, <limit> chunks per list // fetch the next <limit> chunks let mut segments = Vec::new(); for _ in 0..limit { let permit = permits.clone().acquire_owned().await.unwrap(); let filename = if time == 0. { format!("0.ts") } else { format!("{time}{offset}.ts") }; let mut segment_url = url.join(&filename).unwrap(); segment_url.set_query(url.query());
segments.push(NicoTimeshiftSegment { host: host.clone(), token: token.clone(), _permit: permit,
ts: ts.clone(), file_name: filename, query: url.query().map(|q| q.to_string()), sequence: sequence.fetch_add(1, Ordering::Relaxed), });
最后别忘了把 time
加上 chunk
的时间,并加上提前退出循环的条件:
time += chunk_length as f32; if video_length - format!("{time}.{offset}").parse::<f32>().unwrap() < 1. { break; } }
好,现在就可以把这些 segment
发给 Downloader
了:
// send segments if let Err(_) = sender.send(Ok(segments)) { break; }
最后是等待这一波下载完成再继续下一次的进度条拖动。这里我们尝试获取 limit
个 permit
,也就是等待所有 Segment
释放:
// wait for all segments to be fetched let _ = permits.acquire_many(limit as u32).await; } }); Ok(receiver) }
至此,整个 Segment
流的实现就完成了。
NicoTimeshiftSegment
在 iori
中,我们定义了一个 trait RemoteStreamingSegment
以简化 Segment
的实现。这个 trait
非常简单,只接收一个 url
。对于 Nico Timeshift,我们可以在这一层进行实际的 URL
拼接。这样也确保了能够在下载前拿到最新的 token
。
具体的拼接规则是这样:
impl RemoteStreamingSegment for NicoTimeshiftSegment { fn url(&self) -> reqwest::Url { let host = self.host.read().clone(); let token = self.token.read().clone();
let mut url = host .join(&format!("{}/{}", self.ts, self.file_name)) .unwrap(); url.set_query(self.query.as_deref());
// remove ht2_nicolive first let query: Vec<(_, _)> = url .query_pairs() .filter(|(name, _)| name != "ht2_nicolive") .map(|r| (r.0.to_string(), r.1.to_string())) .collect(); // add new ht2_nicolive token then url.query_pairs_mut() .clear() .extend_pairs(query) .append_pair("ht2_nicolive", token.as_str());
url }}
说回 Showroom
由于 Iori 的设计本意就是希望能够兼容更多的网站,因此对于院士博客中提到的 Showroom
的各种问题也是有相应的解决办法的。
首先是 m3u8
的超时机制。获取 m3u8
的行为对应的是 StreamingSource
中的 fetch_info
部分,我们只需要在这里对 ShowroomSource
做一定的特殊超时处理,就可以比较轻松地解决这个问题。
然后是 Segment
分片的跳变。在目前的 Iori
架构中,需要稍微绕一下远路(之后可能可以在 fetch_info
层直接搞定)。比如,我们需要在 ShowroomSegment
中增加一个 fail
字段:
struct ShowroomSegment { fail: bool,}
在出现 Segment
跳变时,我们需要将不存在的 Segment 也根据其顺序吐给 mpsc
,然后在 fetch_segment
的时候直接失败:
async fn fetch_segment<W>(&self, segment: &Self::Segment, writer: &mut W) -> IoriResult<()>where W: tokio::io::AsyncWrite + Unpin + Send + Sync + 'static;{ if segment.fail { return Err(IoriError::HttpError(reqwest::StatusCode::NOT_FOUND)); }
fetch_segment(self.client.clone(), segment, writer).await?;}
这样,在 Downloader
确定下载失败后,Merger
就可以获得这个失败信息,并进行一定的处理了。在 iori
中,如果下载过程中出现缺块的情况,ConcatMerger
会根据 fail
的情况自动生成 output.1.ts
、output.2.ts
等一系列文件,也就不会有直接合并导致的 Timestamp
问题了。