最近在 YouTube 上听了很多 Harutya 的作品,然后也看了各种 Talk 的视频。在电脑上看的时候,对感兴趣的视频可以很方便的用 Python 的 youtube-dl
直接下下来,但是在 iPad 或者手机上的时候就不那么方便了。
- 整体思路
- 创建公开的 YouTube Playlist
- Playlist Watcher Config 配置文件
- Rust 代码模块组织
- PlaylistWatcherConfig
- YouTubeError
- Decode Percentage Encoded URL
- Access YouTube API
- Stream
- Video
- Playlist
- Downloader
- Watch Playlist and Download
- YouTube module / crate
- YouTube Playlist Watcher & Downloader
- 后记
项目的源代码同时也在我的 GitHub 上~ #/watchyou
1. 整体思路
那么想了一个 workaround ——
先在 YouTube 上创建一个公开的 Playlist,然后看到有想要下载的视频之后,就把它放进这个 Playlist 里面。假设这个 Playlist 的 URL 是
https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G
接下来,用代码每隔一段时间抓取一次这个 Playlist 的网页,然后 parse 出在这个 Playlist 里的所有 Video 的 ID。
下一步的话,就是用 YouTube 的 API 去获取每个 Video ID 所对应的视频的信息。
最后 parse 出来 API 返回数据里面的每一个 Video 的音频流和视频流的下载地址,并且下载即可~
2. 创建公开的 YouTube Playlist
比如我们这里创建一个名为 save2disk
的公开 Playlist
然后就可以拿到这个 Playlist 对应的 URL 了~
这里的的 URL 是
https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G
3. Playlist Watcher Config 配置文件
既然是用代码每隔一段时间抓取一次这个 Playlist 的网页,那就写得通用一点~比如可以支持多个 Playlist,并且可以设置不同的 Playlist 下载到不同的目录里,以及每个 Playlist 抓取的时间间隔等等~那么相应的配置文件大概就是这样的~
[ { "playlist": "https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G", "interval": 10, "saveto": "/Users/cocoa/YouTube", "downloader": "default", "post_download": "export videofile=\$videofile; export audiofile=\$audiofile; export videoext=\$vidoext; export audioext=\$audioext; ffmpeg -i \$videofile -i \$audiofile -acodec aac -vcodec copy \${videofile%%\$extension}.mkv", "proxy": "http://127.0.0.1:1087", "allow_fallback": true } ]
因为要支持多个 Playlist,所以 Root Object 是一个数组。数组里的每一个元素则是一个 Playlist Watcher Config 的实例。这里每一个 Key 的含义如下~
playlist
,String
类型。这个就是 Playlist 所对应的 URL 了。然后因为懒得在代码做过多的 handling,所以这里默认 URL 是正确、完整的interval
,u64
类型。这里考虑了一下 YouTube API 也许会限制频率,所以默认的每次检查 Playlist 的时间单位是分钟(对绝大多数情况应该都蛮够用了)。saveto
,String
类型。指定了这个 Playlist 里包含的 Video 会被下载到哪个目录。同样的,这里会默认目录存在,并且有权限读写(当然,实际上要是不能创建文件或者不能写入的话,还是做了 error handling)downloader
,String
类型。其实暂时还没用到,默认的 Downloader 会用我自己一会写的坑_(:3」∠)_。然后在考虑 “外包” 给 Python 的youtube-dl
。post_download
,String
类型。暂时还没有用上,但是还是先写上了~因为 YouTube 上高质量的视频流和音频流是分开的,所以需要在下载完成之后再做一些处理。直接在代码里调用 ffmpeg 什么也行,但是这样分离出来的话自由度会更高一些。代码里设想的是把$videofile
替换为下载好的视频流文件路径(使用"
括起来),把$audiofile
替换为下载好的音频流文件路径,以及音频流和视频流的文件扩展名$audioext
和$videoext
。proxy
,String
类型。访问 YouTube API 的代理。不过这里因为我们会用到的 HTTP 库是reqwest
,所以能支持的代理似乎暂时只有HTTP
的,SOCKS5
类型的还没有支持。当然,如果不使用 Proxy 就可以访问 YouTube API 的话,那就留个空字符串""
即可(●°u°●) 」allow_fallback
,bool
类型。假如reqwest
不能正确处理proxy
的话,是否允许回退到直接访问 YouTube API。如果设置为false
的话,在reqwest
不能正确处理proxy
的时候就会直接panic
4. Rust 代码模块组织
因为这个坑涉及到的模块会蛮多的,因此这里先把代码的模块组织贴上来~
.
├── Cargo.toml
└── src
├── config.json
├── main.rs # main app
└── youtube
├── api.rs # YouTube API Access
├── audio_stream.rs # YouTube Audio Stream Model
├── downloader.rs # YouTube Video Downloader
├── error.rs # YouTubeError
├── mod.rs # To export public mod of youtube
├── playlist.rs # YouTube Playlist Model
├── playlist_watcher.rs # YouTube Playlist Watcher
├── playlist_watcher_config.rs # YouTube Playlist Watcher Config
├── stream.rs # Audio Stream and Video Stream related functions and trait
├── urldecode.rs # Decode percentage encoded URL function
├── video.rs # YouTube Video Model
├── video_consumer.rs # Consume Video Download Request
└── video_stream.rs # YouTube Video Stream Model
5. PlaylistWatcherConfig 类
我们现在需要实现的是 PlaylistWatcherConfig 类,src/youtube/playlist_watcher_config.rs
我们使用 JSON 格式保存配置文件,然后直接用 serde
的 Deserialize
trait,以便可以直接从文件里读出来然后给我们 PlaylistWatcherConfig
的实例。
use serde::Deserialize; #[derive(Deserialize, Debug, Clone)] pub struct PlaylistWatcherConfig { pub playlist: String, pub interval: u64, pub saveto: String, pub downloader: String, pub post_download: String, pub proxy: String, pub allow_fallback: bool, }
这样一来在 src/main.rs
里就可以直接尝试加载配置文件了~
lazy_static! { static ref CONFIG: Vec<PlaylistWatcherConfig> = from_cli_args(); } fn from_cli_args() -> Vec<PlaylistWatcherConfig> { let args: Vec<String> = std::env::args().collect(); if args.len() < 2 { panic!("[ERROR] no config file provided."); } match load_config(&*args[1]) { Ok(value) => value, Err(e) => panic!("[ERROR] Cannot read config file {}", e), } } fn load_config<P: AsRef<Path>>(conf_path: P) -> Result<Vec<PlaylistWatcherConfig>, Box<dyn Error>> { // https://docs.serde.rs/serde_json/fn.from_reader.html#example // Open the file in read-only mode with buffer. let file = File::open(conf_path)?; let reader = BufReader::new(file); // Read the JSON contents of the file as an instance of `Vec<PlaylistWatcherConfig>`. let u = serde_json::from_reader(reader)?; // Return the `Vec<PlaylistWatcherConfig>` Ok(u) }
6. YouTubeError
显然这个坑里各种地方都可能会有问题,于是我们可以写个自己的 YouTubeError
类。这也方便我们之后在写各种函数时直接一层一层的往上抛 YouTubeError
~
当然,这个类可以设计得比较简单,主要就是记录了字符串信息的报错在 reason
里。然后实现了 std::fmt::Display
和 std::error::Error
这两个 traits。还是很简单的,于是注释就不写了╮(╯▽╰)╭
完整的 src/youtube/error.rs
类如下
use std::error::Error; use std::fmt::{self, Display, Formatter}; use std::string::String; #[derive(Debug)] pub struct YouTubeError { pub reason: String, } impl Display for YouTubeError { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!(f, "{}", self.reason) } } impl Error for YouTubeError { fn description(&self) -> &str { &*self.reason } }
7. Decode Percentage Encoded URL
Percentage Encoded URL 比如 https%3A%2F%2Fgithub.com
,它多用来嵌套在 URL 里进行传值。编码规则也很简单啦,比如 :
的 16 进制为 3A
,那么编码之后就是 %3A
要 decode 的话,那就直接读到 %
之后,把接下来的两个字符 parse 成 1 个 byte,最后再转为 u8
(也就是 Rust 里的 char
啦)
这里偷个懒, urldecode
就直接拿了 Rosetta Code 上的代码( ´▽`)
完整的 src/youtube/urldecode.rs
如下~
/// https://www.rosettacode.org/wiki/URL_decoding#Rust fn append_frag(text: &mut String, frag: &mut String) { if !frag.is_empty() { let encoded = frag.chars() .collect::<Vec<char>>() .chunks(2) .map(|ch| { u8::from_str_radix(&ch.iter().collect::<String>(), 16).unwrap() }).collect::<Vec<u8>>(); text.push_str(&std::str::from_utf8(&encoded).unwrap()); frag.clear(); } } pub fn urldecode(text: &str) -> String { let mut output = String::new(); let mut encoded_ch = String::new(); let mut iter = text.chars(); while let Some(ch) = iter.next() { if ch == '%' { encoded_ch.push_str(&format!("{}{}", iter.next().unwrap(), iter.next().unwrap())); } else { append_frag(&mut output, &mut encoded_ch); output.push(ch); } } append_frag(&mut output, &mut encoded_ch); output }
8. Access YouTube API
那么现在就可以写从 YouTube API 获取各种信息的坑,也就是 src/youtube.api.rs
~
8.1 HTTP Client
总的来说就是根据我们 PlaylistWatcherConfig
里面的 proxy
和 allow_fallback
来生成一个 HTTP Client,同时也要支持设置 HTTP Request Header,最后请求不同的数据都用这个 HTTP Client 去做~
8.2 Get All Video ID in Playlist
抓取 Playlist 里所有的 Video ID 的话,就只能先看看 Playlist 页面的源码了~
在测试之后发现,所有的 Video ID 都在 tr
里的 data-video-id
属性里~
<tr class="pl-video yt-uix-tile " data-video-id="bu_86J8gbmA" data-set-video-id="" data-title="Harutya 春茶 best cover playlist - Harutya 春茶 best songs of all time - Best cover of Harutya 春茶">
8.3 Get Video Info
要获取 Video Info 的话,需要上面的 Video ID,比如说 bu_86J8gbmA
,然后可以通过这个 YouTube API 去请求
https://www.youtube.com/get_video_info?video_id=bu_86J8gbmA
返回回来的其实是一个符合 URL Query 规范的字符串,里面包含了很多参数,其中 playerResponse
是我们比较关心的~我们用 PHP 来验证一下
<?php $aContext = array( 'http' => array( 'proxy' => 'tcp://127.0.0.1:1087', 'request_fulluri' => true, ), ); $cxContext = stream_context_create($aContext); $content = file_get_contents('https://www.youtube.com/get_video_info?video_id=bu_86J8gbmA', false, $cxContext); parse_str($content, $query); printf("query parameters: \n - %s\n\n\n", join("\n - ", array_keys($query))); printf("parsed player_response: \n"); print_r(json_decode($query["player_response"], true)); ?>
输出的结果截取部分如下~
query parameters: - c - player_response - hl - innertube_context_client_version - innertube_api_key - csn - innertube_api_version - host_language - watermark - cver - csi_page_type - vss_host - enablecsi - gapi_hint_params - status - cr - fexp - root_ve_type - fflags parsed player_response: Array ( [playabilityStatus] => Array ( [status] => OK [playableInEmbed] => 1 ) [streamingData] => Array ( [expiresInSeconds] => 21540 [formats] => Array ( ...
是的~在 player_response
里,其实是一个 JSON 字符串,再对其 json_decode
之后,就能看到 streamingData
等内容了~其中的 formats
和 adaptiveFormats
里就包含了各种音频流和视频流~
随便拿一个视频流举例子?~
{ "itag": 398, "url": "https://r1---sn-4jpm0n-nwje.googlevideo.com/videoplayback?expire=1577022319&ei=Dx__XfuPIc-UkwbT5Ltg&ip=...", "mimeType": "video\/mp4; codecs=\"av01.0.05M.08\"", "bitrate": 990238, "width": 1280, "height": 720, "initRange": { "start": "0", "end": "699" }, "indexRange": { "start": "700", "end": "11123" }, "lastModified": "1575857942294141", "contentLength": "179709401", "quality": "hd720", "fps": 6, "qualityLabel": "720p", "projectionType": "RECTANGULAR", "averageBitrate": 311522, "colorInfo": { "primaries": "COLOR_PRIMARIES_BT709", "transferCharacteristics": "COLOR_TRANSFER_CHARACTERISTICS_BT709", "matrixCoefficients": "COLOR_MATRIX_COEFFICIENTS_BT709" }, "approxDurationMs": "4614999" },
以及来一个音频流~
{ "itag": 249, "url": "https://r1---sn-4jpm0n-nwje.googlevideo.com/videoplayback?expire=1576848511&ei=H3j8XencDpaukwbd1o3YDg&ip=...", "mimeType": "audio/webm; codecs=\"opus\"", "bitrate": 71995, "initRange": { "start": "0", "end": "265" }, "indexRange": { "start": "266", "end": "8325" }, "lastModified": "1555729332528048", "contentLength": "30486124", "quality": "tiny", "projectionType": "RECTANGULAR", "averageBitrate": 52846, "audioQuality": "AUDIO_QUALITY_LOW", "approxDurationMs": "4615081", "audioSampleRate": "48000", "audioChannels": 2 },
8.4 Code of src/youtube/api.rs
那么完整的 src/youtube/api.rs
如下~不过这里其实就没有做 URL Query Parsing,而是直接去找了 player_response
的位置,然后截取到下一个 &
之前。
extern crate reqwest; use reqwest::{Client, Proxy, header}; use select::document::Document; use select::predicate::Name; use std::error::Error; use std::string::String; use super::urldecode::urldecode; use super::error::YouTubeError; /// get http client with proxy and optinnal header /// /// # Example /// /// ``` /// // With proxy enabled, allow fallback, and custom header /// let mut headers = header::HeaderMap::new(); /// let field = "SOME HEADER"; /// let value = "SOME VALUE"; /// headers.insert(field, header::HeaderValue::from_str(value).unwrap()); /// get_client("http://127.0.0.1:1087", true, Some(header)); /// /// // With proxy enabled and allow fallback /// get_client("http://127.0.0.1:1087", true, None); /// /// // Without proxy /// get_client("", true); /// ``` pub fn get_client(with_proxy: &str, allow_fallback: bool, header: Option<header::HeaderMap>) -> Client { let mut client_builder = Client::builder(); // try to add proxy for http client match Proxy::all(&*with_proxy) { Ok(proxy) => client_builder = client_builder.proxy(proxy), Err(e) => match allow_fallback { // if fallback is prohibited // then panic with error message false => panic!(format!("[ERROR] Proxy `{}` is invalid and fallback is disabled: {}\n", with_proxy, e)), true => (), } }; // also check for headers if let Some(header_map) = header { // if we have some headers client_builder = client_builder.default_headers(header_map); } // build http client match client_builder.build() { Ok(client) => client, Err(e) => panic!(format!("[ERROR] Error occurred while build http client: {}\n", e)), } } /// get sts param with client /// /// # Example /// /// ``` /// let client = get_client("http://127.0.0.1:1087", true, None); /// get_sts(&client); /// ``` /// /// # Note /// this function will be used in `get_video_info(&Client, &str) -> Result<serde_json::Value, YouTubeError>` pub fn get_sts(client: &Client) -> Result<String, Box<dyn Error>> { // get `sts` from https://www.youtube.com/yts/jsbin/player-vflYXLM5n/en_US/base.js // try to grab the playlist webpage let mut resp = client.get("https://www.youtube.com/yts/jsbin/player-vflYXLM5n/en_US/base.js").send()?; match resp.status().is_success() { // if succeeded true => { // try to get the content of response let content = resp.text()?; // then try to extract `,sts={:token},` let sts_param = ",sts:"; if let Some(mut sts_param_index) = content.find(sts_param) { // `{:token}` starts after the `,sts=` sts_param_index += sts_param.len(); // get everything since `{:token}` let partial_sts_string = &content[sts_param_index..]; // expecting another comma behined `{:token}` if let Some(index) = partial_sts_string.find(",") { // this suggest that we do have a `,` followed by `{:token}` return Ok(String::from(&partial_sts_string[..index])) } } println!("[WARNING] Cannot fetch `sts` at the moment"); Ok(String::new()) }, // otherwise false => { // don't panic, just print warning println!("[WARNING] Cannot fetch `sts` at the moment"); Ok(String::new()) } } } /// get all video ids from playlist with client and url /// /// # Example /// /// ``` /// let client = get_client("http://127.0.0.1:1087", true, None); /// get_playlist_from_url(&client, "https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G"); /// ``` pub fn get_playlist_from_url(client: &Client, playlist_url: &str) -> Result<Vec<String>, YouTubeError> { // try to grab the playlist webpage let resp = match client.get(&*playlist_url).send() { Ok(resp) => resp, Err(e) => return Err(YouTubeError { reason: format!("Error occurred while sending GET request to {}: {}", playlist_url, e) }), }; match resp.status().is_success() { // if succeeded true => { // then parse the response HTML match Document::from_read(resp) { // if successfully parsed Ok(parsed) => { // given that the videos are listed as // ``` // <tr class="pl-video yt-uix-tile " // data-title="(女性声音)LEMON(柠檬)/米津玄師(完整版/由KOBASOLO和春茶翻唱)" // data-video-id="clU8c2fpk2s" // data-set-video-id=""> // ``` // so we need to find all `<tr></tr>` with attribute `data-video-id="{:video_id}"` let videos: Vec<String> = parsed.find(Name("tr")) .filter_map(|n| n.attr("data-video-id")) .map(|video_id| String::from(video_id)) .collect(); Ok(videos) }, // if failed in parsing Err(e) => Err(YouTubeError { reason: format!("Error occurred while parsing HTML dom: {}", e) }), } }, // if cannot grab the webpage of given playlist false => Err(YouTubeError { reason: format!("Cannot fetch playlist `{}` at the moment", playlist_url) }) } } /// get video info from YouTube API with client and id /// /// # Example /// /// ``` /// let client = get_client("http://127.0.0.1:1087", true, None); /// get_playlist_from_url(&client, "clU8c2fpk2s"); /// ``` pub fn get_video_info(client: &Client, video_id: &str) -> Result<serde_json::Value, YouTubeError> { // get `sts` param let sts = match get_sts(&client) { Ok(sts) => sts, Err(e) => { println!("[WARNING] Cannot get `sts` param at the moment: {}", e); String::from("") }, }; // build video info url let video_info_url = format!("https://www.youtube.com/get_video_info?video_id={}&sts={}", video_id, sts); // send request let mut resp = match client.get(&*video_info_url).send() { Ok(resp) => resp, Err(e) => return Err(YouTubeError { reason: format!("Error occurred while sending GET request to {}: {}", video_info_url, e) }), }; match resp.status().is_success() { // if the server did answer to the request true => { // try to get the content of response let content = match resp.text() { Ok(content) => content, Err(e) => return Err(YouTubeError { reason: format!("Error occurred while reading response body of {}: {}", video_info_url, e)}), }; // then try to extract `player_response={:json_string}` let player_response_param = "player_response="; if let Some(mut player_response_index) = content.find(player_response_param) { // if we did find `player_response={:json_string}` in response // that suggests YouTube did answer the request without any issue // the `{:json_string}` should start at `player_response_param + player_response_param.len()` player_response_index += player_response_param.len(); let partial_json_string = &content[player_response_index..]; // however, the whole repsonse body is a URI query // ```status=ok&c=WEB&player_response=......&...``` // so we need to where `{:json_string}` stops // which means to find out the next `&` let delimeter_index; if let Some(index) = partial_json_string.find("&") { // this suggest that we do have another `&` followed by `{:json_string}` delimeter_index = index; } else { // but chances are that `player_response` is the last parameter of the query string delimeter_index = partial_json_string.len(); } // then we can get the whole `{:json_string}` without other uri query string let json_string = &partial_json_string[0..delimeter_index]; // but we need to do a urldecode on `{:json_string}` part // to convert all percentage encoded char to its original form let mut json_string = urldecode(&json_string); // and also replace all `\u0026` to `&` json_string = json_string.replace("\\u0026", "&"); // and replace all `+` to ` ` json_string = json_string.replace("+", " "); // try to parse `json_string` into object match serde_json::from_str(&json_string) { // if `json_string` can be parsed into json object // then return it Ok(json) => Ok(json), // otherwise throw error message Err(e) => Err(YouTubeError { reason: format!("Cannot parsing json for video `{}`: {}", video_info_url, e) }) } } else { // if no anchor string found in response // then throw error message Err(YouTubeError { reason: format!("No anchor string `{}` found in response", player_response_param) }) } }, // if the server did NOT answer to the request // don't panic, just print warning false => Err(YouTubeError { reason: format!("Error occured while fetching video info: {}", video_info_url) }), } }
9. Stream
9.1 Stream utility functions
因为在 YouTube API 里面有时数字直接作为 Value 出现的,比如 23333
,有时又是在字符串里面,比如 "233333"
。因此就需要一个 utility function 来帮我们 check, cast 或者 parse.
9.2 DownloadableStream Trait
在上面 YouTube API 的返回中,既有 Audio Stream 也有 Video Stream,为了之后写起来方便,于是就写了个 DownloadableStream
的 trait。主要的函数是返回下载的 URL,文件大小以及根据 MIME Type 猜测的文件扩展名~
9.3 Code of src/youtube/stream.rs
这里似乎还是挺直观的,就是声明了一个 trait(说实话真的很像 Objective-C 里的 protocol)。
剩下的就是一些辅助函数了,里面几乎是各种 check,而且也没办法写成模版函数之类的_(:3」∠)_
use serde_json; use super::error::YouTubeError; pub trait DownloadableStream { fn url(&self) -> &str; fn content_length(&self) -> usize; fn extension(&self) -> &str; } pub fn check_null_or_i64(info: &serde_json::Value, property_name: &str) -> Result<i64, YouTubeError> { match info[property_name].is_null() { true => Err(YouTubeError{ reason: format!("No `{}` found", property_name) }), false => { match info[property_name].is_i64() { true => Ok(info[property_name].as_i64().unwrap()), false => Err(YouTubeError{ reason: format!("`{}` cannot be converted to i64", property_name) }), } } } } pub fn check_null_or_str(info: &serde_json::Value, property_name: &str) -> Result<String, YouTubeError> { match info[property_name].is_null() { true => Err(YouTubeError{ reason: format!("No `{}` found", property_name) }), false => { match info[property_name].is_string() { true => Ok(String::from(info[property_name].as_str().unwrap())), false => Err(YouTubeError{ reason: format!("`{}` cannot be converted to String", property_name) }), } } } } pub fn check_null_or_u64(info: &serde_json::Value, property_name: &str) -> Result<u64, YouTubeError> { match info[property_name].is_null() { true => Err(YouTubeError{ reason: format!("No `{}` found", property_name) }), false => { match info[property_name].is_u64() { true => Ok(info[property_name].as_u64().unwrap()), false => Err(YouTubeError{ reason: format!("`{}` cannot be converted to u64", property_name) }), } } } } pub fn check_null_or_parse_u64(info: &serde_json::Value, property_name: &str) -> Result<u64, YouTubeError> { match info[property_name].is_null() { true => Err(YouTubeError{ reason: format!("No `{}` found", property_name) }), false => { match info[property_name].is_u64() { true => Ok(info[property_name].as_u64().unwrap()), false => { let string = check_null_or_str(info, property_name)?; match string.parse::<u64>() { Ok(value) => Ok(value), Err(e) => Err(YouTubeError{ reason: format!("`{}` cannot be converted to u64: {}", property_name, e) }), } }, } } } }
10. Video
正如之前看到的那样,一个 YouTube Video 可以有多个不同清晰度和质量的 Video Stream 和 Audio Stream 提供
那么我们来先看看 Video Stream~
10.1 VideoStream
怎么说呢,基本上就是把 JSON 翻译成对应的 Rust struct 吧,这里省略了某些 key(因为不是每个视频流都有,而且有一些 Video Stream 是“加密”的,等到下一次来做加密的 Video Stream 的处理吧)
然后对于每一个 key 都需要 check 它的类型,这里当然就用到前面 Stream 里面的 utility functions 啦~
为了方便对比,把之前的 JSON 也复制一个过来/
{ "itag": 398, "url": "https://r1---sn-4jpm0n-nwje.googlevideo.com/videoplayback?expire=1577022319&ei=Dx__XfuPIc-UkwbT5Ltg&ip=...", "mimeType": "video\/mp4; codecs=\"av01.0.05M.08\"", "bitrate": 990238, "width": 1280, "height": 720, "initRange": { "start": "0", "end": "699" }, "indexRange": { "start": "700", "end": "11123" }, "lastModified": "1575857942294141", "contentLength": "179709401", "quality": "hd720", "fps": 6, "qualityLabel": "720p", "projectionType": "RECTANGULAR", "averageBitrate": 311522, "colorInfo": { "primaries": "COLOR_PRIMARIES_BT709", "transferCharacteristics": "COLOR_TRANSFER_CHARACTERISTICS_BT709", "matrixCoefficients": "COLOR_MATRIX_COEFFICIENTS_BT709" }, "approxDurationMs": "4614999" },
完整的 src/youtube/video_stream.rs
如下~
use super::error::YouTubeError; use super::stream::{self, DownloadableStream}; #[derive(Debug, Clone)] pub struct VideoStream { pub video_id: String, pub video_title: String, pub itag: i64, pub url: String, pub mime_type: String, pub bitrate: i64, pub last_modified: u64, pub content_length: u64, pub width: u64, pub height: u64, pub quality: String, } impl VideoStream { pub fn from_json(video_info: &serde_json::Value, video_id: &String, video_title: &String) -> Result<VideoStream, YouTubeError> { Ok(VideoStream { video_id: video_id.clone(), video_title: video_title.clone(), itag: stream::check_null_or_i64(&video_info, "itag")?, url: stream::check_null_or_str(&video_info, "url")?, mime_type: stream::check_null_or_str(&video_info, "mimeType")?, bitrate: stream::check_null_or_i64(&video_info, "bitrate")?, last_modified: stream::check_null_or_parse_u64(&video_info, "lastModified")?, content_length: stream::check_null_or_parse_u64(&video_info, "contentLength")?, width: stream::check_null_or_u64(&video_info, "width")?, height: stream::check_null_or_u64(&video_info, "height")?, quality: stream::check_null_or_str(&video_info, "quality")?, }) } } impl DownloadableStream for VideoStream { fn url(&self) -> &str { &*self.url } fn content_length(&self) -> usize { self.content_length as usize } fn extension(&self) -> &str { match self.mime_type.trim().split(';').nth(0) { Some(extension) => match extension { "video/mp4" => "mp4", "video/webm" => "webm", _ => match extension.split('/').nth(1) { Some(hint) => hint, _ => "webm", }, }, _ => "webm", } } }
10.2 AudioStream
那么下一个不就是 Audio Stream 了嘛,几乎和 VideoStream 一模一样的处理方式~
use super::error::YouTubeError; use super::stream::{self, DownloadableStream}; #[derive(Debug, Clone)] pub struct AudioStream { pub video_id: String, pub video_title: String, pub itag: i64, pub url: String, pub mime_type: String, pub bitrate: i64, pub last_modified: u64, pub content_length: u64, pub quality: String, pub projection_type: String, pub average_bitrate: u64, pub audio_quality: String, pub approx_duration_ms: u64, pub audio_sample_rate: u64, pub audio_channels: u64, } impl AudioStream { pub fn from_json(video_info: &serde_json::Value, video_id: &String, video_title: &String) -> Result<AudioStream, YouTubeError> { Ok(AudioStream { video_id: video_id.clone(), video_title: video_title.clone(), itag: stream::check_null_or_i64(&video_info, "itag")?, url: stream::check_null_or_str(&video_info, "url")?, mime_type: stream::check_null_or_str(&video_info, "mimeType")?, bitrate: stream::check_null_or_i64(&video_info, "bitrate")?, last_modified: stream::check_null_or_parse_u64(&video_info, "lastModified")?, content_length: stream::check_null_or_parse_u64(&video_info, "contentLength")?, quality: stream::check_null_or_str(&video_info, "quality")?, projection_type: stream::check_null_or_str(&video_info, "projectionType")?, average_bitrate: stream::check_null_or_u64(&video_info, "averageBitrate")?, audio_quality: stream::check_null_or_str(&video_info, "audioQuality")?, approx_duration_ms: stream::check_null_or_parse_u64(&video_info, "approxDurationMs")?, audio_sample_rate: stream::check_null_or_parse_u64(&video_info, "audioSampleRate")?, audio_channels: stream::check_null_or_parse_u64(&video_info, "audioChannels")?, }) } } impl DownloadableStream for AudioStream { fn url(&self) -> &str { &*self.url } fn content_length(&self) -> usize { self.content_length as usize } fn extension(&self) -> &str { match self.mime_type.trim().split(';').nth(0) { Some(extension) => match extension { "audio/mp4" => "m4a", "audio/webm" => "opus", _ => match extension.split('/').nth(1) { Some(hint) => hint, _ => "opus", }, }, _ => "m4a", } } }
10.3 Video 类
在有了 VideoStream 和 AudioStream 之后,就可以写 Video 类了~
1) 首先 Video 肯定有 id
和 title
,然后就是一组 VideoStream
和 一组 AudioStream
2) 然后构造 Video 的实例的话,则考虑
2.a) 在已经有 YouTube API 获取到的 Video Info 结果的 JSON 之后,parse 成相应的一组 VideoStream
和 一组 AudioStream
,然后创建 Video
实例
2.b) 直接传 Video ID,然后通过 YouTube API 去获取到 Video Info,接下来传入 2.a) 处理~
3) 最后的话,再多加一个选择最好质量的 VideoStream
和 AudioStream
的函数~
那么完整的 src/youtube/video.rs
如下~
use serde_json; use std::vec::Vec; use super::api::{get_client, get_video_info}; use super::error::YouTubeError; use super::audio_stream::AudioStream; use super::video_stream::VideoStream; #[derive(Debug)] pub struct Video { pub id: String, pub title: String, pub audio_stream: Vec<AudioStream>, pub video_stream: Vec<VideoStream>, } impl Video { /// get video from its id /// /// # Example /// /// ``` /// // With proxy enabled /// Video::from_video_id("https://www.youtube.com/watch?v=Z8oxYSEUnuU", "http://127.0.0.1:1087", true); /// /// // Withput proxy /// Video::from_video_id("https://www.youtube.com/watch?v=Z8oxYSEUnuU", "", true); /// ``` pub fn from_video_id(id: &str, proxy: &str, allow_fallback: bool) -> Result<Video, YouTubeError> { // get http client let client = get_client(proxy, allow_fallback, None); // try to get its info let video_info_json = get_video_info(&client, id)?; // try to instantiate Video from the video info json Video::from_video_info_json(&video_info_json) } pub fn from_video_info_json(video_info: &serde_json::Value) -> Result<Video, YouTubeError> { // the json object should contains "streamingData" // // ``` // "videoDetails": { // "videoId": "bu_86J8gbmA", // "title": "Harutya \u6625\u8336 best cover playlist - Harutya \u6625\u8336 best songs of all time - Best cover of Harutya \u6625\u8336", // ... // }, // "streamingData": { // "expiresInSeconds": "21540", // "adaptiveFormats": [ // { // ... // }, // ], // "formats": [ // { // ... // }, // ], // } // ``` // get video details let video_details = &video_info["videoDetails"]; // get video id and title let (video_id, video_title) = match video_details.is_null() { false => { let video_id = &video_details["videoId"]; let video_title = &video_details["title"]; match (video_id.is_string(), video_title.is_string()) { (true, true) => (video_id.as_str().unwrap().to_string(), video_title.as_str().unwrap().to_string()), (true, false) => (video_id.as_str().unwrap().to_string(), String::from("unknown-video-title")), (false, true) => (String::from("unknown-video-id"), video_title.as_str().unwrap().to_string()), (false, false) => (String::from("unknown-video-id"), String::from("unknown-video-title")), } }, true => (String::from("unknown-video-id"), String::from("unknown-video-title")), }; // we will also try to get `streamingData` let streaming_data = &video_info["streamingData"]; match streaming_data.is_null() { // if there is `streamingData` false => { // then try to extract `adaptiveFormats` and `formats` let adaptive_formats = &streaming_data["adaptiveFormats"]; let formats = &streaming_data["formats"]; // merge all formats into a single array let all_formats = match (adaptive_formats.is_array(), formats.is_array()) { (true, true) => { let mut all_formats_array = adaptive_formats.as_array().unwrap().clone(); all_formats_array.extend(formats.as_array().unwrap().iter().cloned()); Ok(all_formats_array) }, (true, false) => Ok(formats.as_array().unwrap().clone()), (false, true) => Ok(adaptive_formats.as_array().unwrap().clone()), (false, false) => Err(YouTubeError{ reason: "No downloadable formats available".to_string() }), }?; let mut audio_streams: Vec<AudioStream> = Vec::new(); let mut video_streams: Vec<VideoStream> = Vec::new(); all_formats.iter().for_each(|format| { let mime_type = &format["mimeType"]; if mime_type.is_string() { match &mime_type.as_str().unwrap()[..5] { "audio" => { match AudioStream::from_json(format, &video_id, &video_title) { Ok(audio_stream) => { audio_streams.push(audio_stream); }, Err(_) => (), } }, "video" => { match VideoStream::from_json(format, &video_id, &video_title) { Ok(video_stream) => { video_streams.push(video_stream); }, Err(_) => (), } }, _ => println!("[WARNING] Unknown mime type: {}", mime_type), } } }); match (audio_streams.len(), video_streams.len()) { (0, 0) => Err(YouTubeError{ reason: "Neither available audio stream nor video stream available".to_string() }), (_, _) => Ok(Video { id: video_id, title: video_title, audio_stream: audio_streams, video_stream: video_streams, }) } }, // otherwise true => Err(YouTubeError{ reason: "No `streamingData` found".to_string() }), } } pub fn pick_best_stream(&self) -> (Option<AudioStream>, Option<VideoStream>) { let mut best_audio: Option<AudioStream> = None; let mut best_video: Option<VideoStream> = None; let mut current_max_size = 0; let mut current_max_audio_bitrate = 0; let mut current_max_video_bitrate = 0; for audio_stream in &self.audio_stream { if audio_stream.bitrate > current_max_audio_bitrate { current_max_audio_bitrate = audio_stream.bitrate; best_audio = Some(audio_stream.clone()); } } for video_stream in &self.video_stream { if video_stream.mime_type.starts_with("video/webm") { let size = video_stream.width * video_stream.height; if size >= current_max_size { current_max_size = size; if video_stream.bitrate > current_max_video_bitrate { current_max_video_bitrate = video_stream.bitrate; best_video = Some(video_stream.clone()); } } } } (best_audio, best_video) } }
11. Playlist
因为一个播放列表里可以有多个 Video
嘛,所以在写好 Video
类之后,我们就可以来写 Playlist
类啦~
这里的话,一个是用于获取某个播放列表所有的 Video ID 的 associated function~另一个则是给出 Playlist 的 URL,然后返回一个 Playlist
的实例~
完整的 src/youtube/playlist.rs
如下~
use super::api::{get_client, get_playlist_from_url}; use super::error::YouTubeError; use super::video::Video; #[derive(Debug)] pub struct Playlist { pub video: Vec<Video>, } impl Playlist { /// get all video ids in the playlist with proxy /// /// # Example /// /// ``` /// // With proxy enabled /// Playlist::get_all_video_ids("https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G", "http://127.0.0.1:1087", true); /// /// // Withput proxy /// Playlist::get_all_video_ids("https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G", "", true); /// ``` pub fn get_all_video_ids(url: &str, proxy: &str, allow_fallback: bool) -> Result<Vec<String>, YouTubeError> { // get http client let client = get_client(proxy, allow_fallback, None); get_playlist_from_url(&client, url) } /// get playlist from url with proxy /// /// # Example /// /// ``` /// // With proxy enabled /// Playlist::from_url("https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G", "http://127.0.0.1:1087", true); /// /// // Withput proxy /// Playlist::from_url("https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G", "", true); /// ``` pub fn from_url(url: &str, proxy: &str, allow_fallback: bool) -> Result<Playlist, YouTubeError> { let video: Vec<Video> = Playlist::get_all_video_ids(url, proxy, allow_fallback)?.iter().map(|video_id| { // try to get Video from its video id Video::from_video_id(video_id, proxy, allow_fallback) }).filter_map(|maybe_video| { // filter all unsuccessful request match maybe_video { Ok(video) => Some(video), Err(e) => { println!("[ERROR] Error occurred while instantiating `Video`: {}", e); None }, } }).collect(); Ok(Playlist { video: video }) } }
12. Downloader
终于到了重点之一了(*^3^)
其实在有了前面的东西之后,我们完全可以实现一个 Rust 版的 youtube-dl
了~当然在我们写完这个 Downloader 类之后,以及部分“加密”的视频流需要额外处理一下╮( ̄▽ ̄"")╭
虽然暂时没做“加密”视频的处理,但是 Downloader 类还是可以先写起来的~因为“加密”视频要下载的话,只是在 url
里增加了一个解密之后的 signature
参数
这里 Downloader
理论上应该接收的是 DownloadableStream
和 PlaylistWatcherConfig
,但是现在考虑到想做 AudioStream
和 VideoStream
的合并,就同时接收了 AudioStream
和 VideoStream
。不过写起来的时候确实感觉到不太科学,因此这里下一版应该会再重构一下。
在开始下载的时候,在同目录下创建一个空白文件用于标示正在下载~比如 clU8c2fpk2s.downloading
然后 YouTube 上无论是下载 AudioStream
还是 VideoStream
,似乎都不能一次性请求完,于是 buffer 就设置成 10Mbytes,然后通过 HTTP 的 Range
header 来指定范围。
最后就是把每次下载回来的数据写入文件,当写入成功之后,判断 HTTP 请求的 Range 的 end 是否等于 AudioStream
或 VideoStream
的 content_length
相等就说明下载完成~并且在同目录下创建一个相应的隐藏的空白文件,比如 .clU8c2fpk2s.audio-session
或者 .clU8c2fpk2s.video-session
用于判断是否下载完成~
不相等的话,就继续构造下一个请求~
use reqwest::{Client, header}; use std::error::Error; use std::fs::File; use std::io::prelude::*; use std::sync::Arc; use std::thread; use super::api::get_client; use super::audio_stream::AudioStream; use super::error::YouTubeError; use super::playlist_watcher_config::PlaylistWatcherConfig; use super::stream::DownloadableStream; use super::video_stream::VideoStream; #[derive(Debug)] pub struct Downloader { pub audio: AudioStream, pub video: VideoStream, pub config: PlaylistWatcherConfig, } impl Downloader { pub fn new(audio: AudioStream, video: VideoStream, config: &PlaylistWatcherConfig) -> Self { Downloader { audio: audio, video: video, config: config.clone(), } } pub fn start(mut self) { thread::spawn(move || { println!("[INFO] start downloading `[{}] {}`", self.video.video_id, self.video.video_title); self.download() }); } fn download(&mut self) { let _ = File::create(format!("{}/{}.downloading", self.config.saveto, self.video.video_id)); match self.download_impl(true) { Ok(()) => match self.download_impl(false) { Ok(()) => (), Err(e) => println!("[ERROR] Cannot download audio stream: {}", e), }, Err(e) => println!("[ERROR] Cannot download video stream: {}", e), } } fn download_partial(client: &Client, url: &str) -> Result<Arc<Vec<u8>>, Box<dyn Error>> { let mut res = client.get(url).send()?; let mut buf: Vec<u8> = vec![]; res.copy_to(&mut buf)?; Ok(Arc::new(buf)) } fn download_impl(&mut self, is_video: bool) -> Result<(), YouTubeError> { // generate file path let extension = match is_video { true => self.video.extension(), false => self.audio.extension(), }; let filename = format!("{}/{}-{}.{}", self.config.saveto, self.video.video_title, self.video.video_id, extension); let mut file = match File::create(&*filename) { Ok(fc) => fc, Err(e) => return Err(YouTubeError { reason: format!("Cannot create file `{}`: {}", &*filename, e)} ), }; // buffer 10 MB let buffer_size: usize = 1024 * 1024 * 10; let mut start: usize = 0; let mut end: usize = start + buffer_size; let mut retry = 0; let max_retry = 3; let content_length = match is_video { true => self.video.content_length(), false => self.audio.content_length(), }; while retry < max_retry { if end > content_length { end = content_length; } println!("[INFO] {}: [{}, {}/{}]", self.video.video_id, start, end, content_length); let mut headers = header::HeaderMap::new(); let range = format!("bytes={}-{}", start, end); headers.insert("Range", header::HeaderValue::from_str(&*range).unwrap()); let client = get_client(&self.config.proxy, self.config.allow_fallback, Some(headers)); match Downloader::download_partial(&client, &self.video.url) { Ok(buf) => { // write to file match file.write_all(&buf) { Ok(()) => { if end == content_length { let filepath = match is_video { true => { println!("[INFO] Downloaded video stream of `[{}] {}`", self.video.video_id, self.video.video_title); format!("{}/.{}.video-session", self.config.saveto, self.video.video_id) }, false => { println!("[INFO] Downloaded audio stream of `[{}] {}`", self.video.video_id, self.video.video_title); format!("{}/.{}.audio-session", self.config.saveto, self.video.video_id) } }; let _ = File::create(&*filepath); return Ok(()) } }, Err(e) => return Err(YouTubeError {reason: format!("Cannot write to file `{}`: {}", &*filename, e)} ) }; start += buf.len(); end += buf.len(); }, Err(e) => { retry += 1; println!("[ERROR] Error occurred while downloading, will retry: {}", e); }, }; } Err(YouTubeError {reason: "Download failed, exceeded max retries".to_string() } ) } }
13. Watch Playlist and Download
这个就是典型的 Producer 和 Consumer 啦~
这里的 Producer 就是 PlaylistWatcher
,Consumer 则是 VideoConsumer
。
同时,我们设计允许同时 watch 多个不同的 playlist,因此是 Multiple Producer and Single Consumer,而这种模式在 Rust 里通信的话,正好就可以用到 std::sync::mpsc
。另外,mpsc
其实就是 Multiple Producer and Single Consumer 的缩写~
13.1 VideoConsumerMessage
我们先暂不考虑 Producer 怎么写,先来看看 Consumer 这边。
首先就是决定 Consumer 这边每次收到的消息 VideoConsumerMessage
,都应该包含什么。既然是要下载的话,那么 Video ID 少不了,其次下载到什么地方是需要的,同时 proxy 的设置也需要。也就是 String
类型的 video_id
和 PlaylistWatcherConfig
类型的 config
。
13.2 VideoConsumer
在收到 VideoConsumerMessage
之后,也就拿到了 video_id
,那么利用 Video::from_video_id
就可以获取到 Video
的实例,接下来再用 Video.pick_best_stream
方法就能拿到这个实例对应的最好质量的 VideoStream
和 AudioStream
。
于是就可以丢进前面的 Downloader
里下载啦~
13.3 PlaylistWatcher
那么对于 PlaylistWatcher
来说的话,它需要的是什么呢~
首先肯定要有对应的 PlaylistWatcherConfig
;接下来就是那个 Single Consumer 对应的 sender,有了 sender 才能给那边发消息~
在 start watching 之后,就可以利用 Playlist::get_all_video_ids
来获取那个 playlist 当前有哪些 Video ID;
接下来判断有哪些 Video ID 是还没有开始下载到指定目录的~过滤之后的 Video ID 就可以连带着 PlaylistWatcherConfig
组成一个 VideoConsumerMessage
,然后用 sender 发过去了~
13.4 Code of src/youtube/video_consumer.rs
use std::option::Option; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{self, Sender, Receiver}; use std::time::Duration; use std::thread::{self, JoinHandle}; use super::downloader::Downloader; use super::playlist_watcher_config::PlaylistWatcherConfig; use super::video::Video; #[derive(Debug, Clone)] pub struct VideoConsumerMessage { pub video_id: String, pub config: PlaylistWatcherConfig, } #[derive(Debug)] pub struct VideoConsumer { sender: Arc<Mutex<Sender<VideoConsumerMessage>>>, receiver: Arc<Mutex<Receiver<VideoConsumerMessage>>>, should_stop: Arc<AtomicBool>, thread: Option<JoinHandle<()>>, } impl Drop for VideoConsumer { fn drop(&mut self) { self.should_stop.store(true, Ordering::Relaxed); if let Some(thread) = self.thread.take() { thread.join().unwrap(); } } } impl VideoConsumer { pub fn new() -> Self { let (sender, receiver) = mpsc::channel(); VideoConsumer { should_stop: Arc::new(AtomicBool::new(false)), thread: None, sender: Arc::new(Mutex::new(sender)), receiver: Arc::new(Mutex::new(receiver)), } } pub fn consume(&mut self) { // consume in new thread // listen for stop let receiver = Arc::clone(&self.receiver); let should_stop = Arc::clone(&self.should_stop); self.thread = Some(thread::spawn(move || { let timeout = Duration::from_secs(1); loop { // break if should stop if should_stop.load(Ordering::Relaxed) { break; } // this wait messages from its sender match receiver.lock().unwrap().recv_timeout(timeout) { // this will be an instance of YouTubeConsumerMessage Ok(message) => { match Video::from_video_id(&message.video_id, &message.config.proxy, message.config.allow_fallback) { Ok(video) => { let (best_audio, best_video) = video.pick_best_stream(); if let Some(video_info) = best_video { if let Some(audio_info) = best_audio { // todo: find downloader let downloader = Downloader::new(audio_info, video_info, &message.config); downloader.start(); } else { println!("[ERROR] Cannot find any audio track for: {}", message.video_id); } } else { println!("[ERROR] Cannot find any video track for: {}", message.video_id); } }, Err(e) => println!("[ERROR] Cannot get video from video id while preparing download: {}", e), } }, Err(_) => (), } } })); } pub fn get_sender(&self) -> Arc<Mutex<Sender<VideoConsumerMessage>>> { self.sender.clone() } }
13.5 Code of src/youtube/playlist_watcher.rs
extern crate reqwest; extern crate select; use crossbeam_channel::tick; use std::option::Option; use std::path::Path; use std::string::String; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::Sender; use std::time::Duration; use std::thread; use super::playlist::Playlist; use super::playlist_watcher_config::PlaylistWatcherConfig; use super::video_consumer::VideoConsumerMessage; pub struct PlaylistWatcher { config: PlaylistWatcherConfig, sender: Arc<Mutex<Sender<VideoConsumerMessage>>>, should_stop: Arc<AtomicBool>, thread: Option<thread::JoinHandle<()>>, } impl Drop for PlaylistWatcher { fn drop(&mut self) { self.should_stop.store(true, Ordering::Relaxed); if let Some(thread) = self.thread.take() { thread.join().unwrap(); } } } impl PlaylistWatcher { pub fn new(playlist_watcher_config: &PlaylistWatcherConfig, sender: Arc<Mutex<Sender<VideoConsumerMessage>>>) -> Self { PlaylistWatcher { config: playlist_watcher_config.clone(), sender: sender, should_stop: Arc::new(AtomicBool::new(false)), thread: None, } } pub fn start(mut self) -> Self { let config = self.config.clone(); let should_stop = Arc::clone(&self.should_stop); let sender = Arc::clone(&self.sender); self.thread = Some(thread::spawn(move || { let one_second = tick(Duration::from_secs(1)); // convert minute to second let interval = config.interval.clone() * 60; loop { // wait for user defined intervals for _ in 0..interval { // tick 1 sec at a time and check for exit one_second.recv().unwrap(); if should_stop.load(Ordering::Relaxed) { return; } } match Playlist::get_all_video_ids(&config.playlist, &config.proxy, config.allow_fallback) { // if successfully get ids of corresponding videos Ok(video_ids) => { // then for each of them video_ids.iter().filter(|video_id| { // build session file path for the video let video_download_session = format!("{}/{}.downloading", config.saveto, video_id); // check whether we already got the session file for the video or not !Path::new(&*video_download_session).exists() }) .map(|video_id| { // for all videos that has no session file/ // generate a VideoConsumerMessage VideoConsumerMessage { video_id: String::from(video_id), config: config.clone(), } }).for_each(|video_consumer_message| { // send each VideoConsumerMessage via the sender sender.lock().unwrap().send(video_consumer_message).unwrap(); }); }, // if failed in fetching the webpage of that playlist // don't panic, just print error message Err(e) => println!("[ERROR] Error occurred while fetching playlist: {}", e), } } })); self } pub fn stop(&mut self) { self.should_stop.store(true, Ordering::Relaxed); } }
14. YouTube module / crate
把上面的各种类都实现好之后,就可以组合成一个 module 或者 crate 了!
完整的 src/youtube/mod.rs
如下~
mod urldecode; mod stream; pub mod api; pub mod audio_stream; pub mod downloader; pub mod error; pub mod playlist; pub mod playlist_watcher; pub mod playlist_watcher_config; pub mod video; pub mod video_stream; pub mod video_consumer; pub use downloader::Downloader; pub use error::YouTubeError; pub use playlist::Playlist; pub use playlist_watcher::PlaylistWatcher; pub use playlist_watcher_config::PlaylistWatcherConfig; pub use video::Video; pub use video_stream::VideoStream; pub use video_consumer::{VideoConsumer, VideoConsumerMessage};
15. YouTube Playlist Watcher & Downloader
最后再在 src/main.rs
里组合一下~就差不多成了!
当然,为了响应 Ctrl+C 事件,额外用了一个 ctrlc
的 crate~
完整的 src/main.rs
如下~
#[macro_use] extern crate lazy_static; extern crate ctrlc; extern crate chrono; extern crate reqwest; extern crate select; mod youtube; use crate::youtube::{PlaylistWatcher, PlaylistWatcherConfig, VideoConsumer}; use std::error::Error; use std::fs::File; use std::io::BufReader; use std::panic; use std::path::Path; use std::sync::mpsc; use std::vec::Vec; lazy_static! { static ref CONFIG: Vec<PlaylistWatcherConfig> = from_cli_args(); } fn main() { // this channel will communicate between main thread and ctrlc handler let (exit_sender, exit_event_receiver) = mpsc::channel(); // set handler for Ctrl+C ctrlc::set_handler(move || { // which will notify the receiver on main thread exit_sender.send(()).unwrap(); println!("[INFO] Ctrl+C event received"); }).expect("Error setting Ctrl-C handler"); // create global youtube video consumer let mut global_youtube_video_consumer = VideoConsumer::new(); let video_sender = global_youtube_video_consumer.get_sender(); // lazy load config files let mut watchers: Vec<PlaylistWatcher> = CONFIG.iter().map(|playlist_watcher_config| { // then maps every playlist to a watcher let watcher = PlaylistWatcher::new(playlist_watcher_config, video_sender.clone()); // and start watching watcher.start() }).collect::<Vec<PlaylistWatcher>>(); // start consuming message global_youtube_video_consumer.consume(); // if we recevied Ctrl+C event let _ = exit_event_receiver.recv().unwrap(); // then we shutdown all watcher println!("[INFO] Shutting down..."); for watcher in &mut watchers { watcher.stop() } println!("[INFO] Done"); } fn from_cli_args() -> Vec<PlaylistWatcherConfig> { let args: Vec<String> = std::env::args().collect(); if args.len() < 2 { panic!("[ERROR] no config file provided."); } match load_config(&*args[1]) { Ok(value) => value, Err(e) => panic!("[ERROR] Cannot read config file {}", e), } } fn load_config<P: AsRef<Path>>(conf_path: P) -> Result<Vec<PlaylistWatcherConfig>, Box<dyn Error>> { // https://docs.serde.rs/serde_json/fn.from_reader.html#example // Open the file in read-only mode with buffer. let file = File::open(conf_path)?; let reader = BufReader::new(file); // Read the JSON contents of the file as an instance of `Vec<Playlist>`. let u = serde_json::from_reader(reader)?; // Return the `Vec<Playlist>` Ok(u) }
16. 后记
其实还可以重构一下 Downloader
和 VideoConsumer
的部分。想法是拆出一个 VideoDownloader
,加一个 PostDownload
VideoConsumer
直接把Video
实例发给VideoDownloader
,- 再由
VideoDownloader
再选出用户要求的DownloadableStream
, - 然后使用
Downloader
下载好VideoStream
和AudioStream
, - 最后交给
PostDownload
去调用用户定义的post_download
但是!这两天写了这么大一堆之后,我觉得可能有一小段时间不想写 Rust 了哈哈哈哈哈哈哈♪(´ε` )