Rust Learning Notes from Scratch (13): YouTube Playlist Watcher & Downloader

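Lately I have been listening to a lot of Harutya's work on YouTube and watching all kinds of talks. On a computer it is easy to grab the videos I like with Python's youtube-dl, but on an iPad or a phone it is far less convenient.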

  1. Overall idea
  2. Creating a public YouTube Playlist
  3. Playlist Watcher Config file
  4. Rust module layout
  5. PlaylistWatcherConfig
  6. YouTubeError
  7. Decode Percentage Encoded URL
  8. Access YouTube API
    1. HTTP Client
    2. Get All Video ID in Playlist
    3. Get Video Info
    4. Code of src/youtube/api.rs
  9. Stream
    1. Stream utility functions
    2. DownloadableStream Trait
    3. Code of src/youtube/stream.rs
  10. Video
    1. VideoStream
    2. AudioStream
    3. The Video struct
  11. Playlist
  12. Downloader
  13. Watch Playlist and Download
    1. VideoConsumerMessage
    2. VideoConsumer
    3. PlaylistWatcher
    4. Code of src/youtube/video_consumer.rs
    5. Code of src/youtube/playlist_watcher.rs
  14. YouTube module / crate
  15. YouTube Playlist Watcher & Downloader
  16. Afterword

The source code of this project is also on my GitHub~ #/watchyou

1. Overall idea

So I came up with a workaround:

First create a public Playlist on YouTube, and whenever I see a video I want to download, add it to that Playlist. Suppose the URL of this Playlist is

https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G

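Next, let the code fetch the web page of this Playlist at a fixed interval and parse out the IDs of all Videos in it.

The step after that is to ask the YouTube API for the info of the video behind each Video ID.

Finally, parse the download URLs of every Video's audio and video streams out of the API response, and download them~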

2. Creating a public YouTube Playlist

For example, here we create a public Playlist named save2disk

Then we can get the URL of this Playlist~

The URL here is

https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G

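3. Playlist Watcher Config file

Since the code fetches the Playlist page at a fixed interval anyway, let's make it a bit more general~ For example, it can support multiple Playlists, download different Playlists into different directories, use a different fetch interval per Playlist, and so on~ The config file then looks roughly like this~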

[
    {
        "playlist": "https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G",
        "interval": 10,
        "saveto": "/Users/cocoa/YouTube",
        "downloader": "default",
        "post_download": "export videofile=$videofile; export audiofile=$audiofile; export videoext=$videoext; export audioext=$audioext; ffmpeg -i $videofile -i $audiofile -acodec aac -vcodec copy ${videofile%%$extension}.mkv",
        "proxy": "http://127.0.0.1:1087",
        "allow_fallback": true
    }
]

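Because multiple Playlists are supported, the root object is an array. Each element of the array is one Playlist Watcher Config instance. The meaning of each key is as follows~

  1. playlist: String. The URL of the Playlist. I did not want to do much handling in the code, so the URL is assumed to be correct and complete.
  2. interval: u64. YouTube may rate-limit requests, so the interval between two checks of a Playlist is measured in minutes (which should be plenty for most cases).
  3. saveto: String. The directory the Videos of this Playlist are downloaded to. Again, the directory is assumed to exist and to be readable and writable (although failing to create or write a file is still handled as an error).
  4. downloader: String. Not used yet; the default Downloader is the one I am about to write myself _(:3」∠)_. Later on I am considering "outsourcing" this to Python's youtube-dl.
  5. post_download: String. Not used yet either, but written down in advance~ High-quality video and audio streams on YouTube are served separately, so some processing is needed once the download finishes. Calling ffmpeg directly from the code would work too, but keeping it as a separate command leaves more freedom. The plan is to replace $videofile with the path of the downloaded video stream file (wrapped in double quotes), $audiofile with the path of the downloaded audio stream file, and $audioext and $videoext with the file extensions of the audio and video streams.
  6. proxy: String. The proxy used to access the YouTube API. The HTTP library used here is reqwest, which for now seems to support only HTTP proxies; SOCKS5 is not supported yet. If the YouTube API is reachable without a proxy, just leave an empty string "" here (●°u°●)
  7. allow_fallback: bool. Whether to fall back to accessing the YouTube API directly when reqwest cannot handle the proxy. If set to false, the program panics as soon as reqwest cannot handle the proxy.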
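As a minimal sketch of the placeholder substitution described for post_download (which, as noted, is not wired up yet), plain string replacement would be enough. The names `render_post_download` and `interval_to_duration`, and the choice to quote both file paths, are assumptions made for illustration, not code from the project.

```rust
use std::time::Duration;

/// Hypothetical helper: expand the `post_download` placeholders.
/// File paths are wrapped in double quotes; extensions are inserted as-is.
fn render_post_download(
    template: &str,
    videofile: &str,
    audiofile: &str,
    videoext: &str,
    audioext: &str,
) -> String {
    template
        .replace("$videofile", &format!("\"{}\"", videofile))
        .replace("$audiofile", &format!("\"{}\"", audiofile))
        .replace("$videoext", videoext)
        .replace("$audioext", audioext)
}

/// Hypothetical helper: `interval` is given in minutes,
/// so it is converted to a `Duration` before sleeping.
fn interval_to_duration(interval_minutes: u64) -> Duration {
    Duration::from_secs(interval_minutes * 60)
}
```

Calling `render_post_download` with a template such as `ffmpeg -i $videofile -i $audiofile out.mkv` yields a command line with both paths quoted. A path that itself contains a placeholder name would be substituted again, so a real implementation may want a single-pass replacement.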

4. Rust module layout

This project involves quite a few modules, so here is the module layout of the code first~

.
├── Cargo.toml
└── src
    ├── config.json
    ├── main.rs                          # main app
    └── youtube
        ├── api.rs                       # YouTube API Access
        ├── audio_stream.rs              # YouTube Audio Stream Model
        ├── downloader.rs                # YouTube Video Downloader
        ├── error.rs                     # YouTubeError
        ├── mod.rs                       # To export public mod of youtube 
        ├── playlist.rs                  # YouTube Playlist Model
        ├── playlist_watcher.rs          # YouTube Playlist Watcher
        ├── playlist_watcher_config.rs   # YouTube Playlist Watcher Config
        ├── stream.rs                    # Audio Stream and Video Stream related functions and trait
        ├── urldecode.rs                 # Decode percentage encoded URL function
        ├── video.rs                     # YouTube Video Model
        ├── video_consumer.rs            # Consume Video Download Request
        └── video_stream.rs              # YouTube Video Stream Model

5. PlaylistWatcherConfig

What we need to implement now is the PlaylistWatcherConfig struct, in src/youtube/playlist_watcher_config.rs

The config file is stored as JSON, and we derive serde's Deserialize trait so that it can be read straight from the file into PlaylistWatcherConfig instances.

use serde::Deserialize;

#[derive(Deserialize, Debug, Clone)]
pub struct PlaylistWatcherConfig {
    pub playlist: String,
    pub interval: u64,
    pub saveto: String,
    pub downloader: String,
    pub post_download: String,
    pub proxy: String,
    pub allow_fallback: bool,
}

With that in place, src/main.rs can try to load the config file directly~

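use lazy_static::lazy_static;
use std::error::Error;
use std::fs::File;
use std::io::BufReader;
use std::path::Path;
// `PlaylistWatcherConfig` also has to be imported from the `youtube` module.

lazy_static! {
    static ref CONFIG: Vec<PlaylistWatcherConfig> = from_cli_args();
}

fn from_cli_args() -> Vec<PlaylistWatcherConfig> {
    let args: Vec<String> = std::env::args().collect();
    if args.len() < 2 {
        panic!("[ERROR] no config file provided.");
    }

    match load_config(&args[1]) {
        Ok(value) => value,
        Err(e) => panic!("[ERROR] Cannot read config file: {}", e),
    }
}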

fn load_config<P: AsRef<Path>>(conf_path: P) -> Result<Vec<PlaylistWatcherConfig>, Box<dyn Error>> {
    // https://docs.serde.rs/serde_json/fn.from_reader.html#example

    // Open the file in read-only mode with buffer.
    let file = File::open(conf_path)?;
    let reader = BufReader::new(file);

    // Read the JSON contents of the file as an instance of `Vec<PlaylistWatcherConfig>`.
    let u = serde_json::from_reader(reader)?;

    // Return the `Vec<PlaylistWatcherConfig>`
    Ok(u)
}

6. YouTubeError

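Obviously things can go wrong in many places in this project, so we write our own YouTubeError type. It also lets the functions we write later pass a YouTubeError up the call stack, layer by layer.

The type can be kept simple: it just records the error message as a string in reason, and implements the two traits std::fmt::Display and std::error::Error. It is simple enough that I skipped the comments ╮(╯▽╰)╭

The complete src/youtube/error.rs is as follows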

use std::error::Error;
use std::fmt::{self, Display, Formatter};
use std::string::String;

#[derive(Debug)]
pub struct YouTubeError {
    pub reason: String,
}

impl Display for YouTubeError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.reason)
    }
}

impl Error for YouTubeError {
    fn description(&self) -> &str {
        &*self.reason
    }
}
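To illustrate the "pass it up layer by layer" idea, here is a minimal sketch of how `?` carries a YouTubeError through nested calls. The functions `parse_interval`, `interval_in_seconds` and `run` are hypothetical and exist only for this example. The sketch also relies on Display alone, since `Error::description` has been deprecated since Rust 1.42.

```rust
use std::error::Error;
use std::fmt::{self, Display, Formatter};

#[derive(Debug)]
pub struct YouTubeError {
    pub reason: String,
}

impl Display for YouTubeError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.reason)
    }
}

// The default methods of `Error` are enough once `Display` is implemented.
impl Error for YouTubeError {}

/// Hypothetical leaf function: fails with a `YouTubeError`.
fn parse_interval(raw: &str) -> Result<u64, YouTubeError> {
    raw.trim().parse::<u64>().map_err(|e| YouTubeError {
        reason: format!("`{}` is not a valid interval: {}", raw, e),
    })
}

/// Hypothetical caller: `?` hands the `YouTubeError` up unchanged.
fn interval_in_seconds(raw: &str) -> Result<u64, YouTubeError> {
    let minutes = parse_interval(raw)?;
    Ok(minutes * 60)
}

/// At the top level, `?` can also convert it into `Box<dyn Error>`.
fn run(raw: &str) -> Result<u64, Box<dyn Error>> {
    Ok(interval_in_seconds(raw)?)
}
```

Because every layer returns `Result<_, YouTubeError>`, no layer needs its own match on the error; only the outermost caller decides whether to print a warning or panic.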

7. Decode Percentage Encoded URL

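A percent-encoded URL looks like https%3A%2F%2Fgithub.com and is mostly used to pass a URL as a value nested inside another URL. The encoding rule is simple: for example, : is 3A in hexadecimal, so it is encoded as %3A

To decode, whenever a % is read, parse the two characters that follow as one hexadecimal byte, that is a u8 (the counterpart of C's char; Rust's own char is a 4-byte Unicode scalar value). The collected bytes are then decoded as UTF-8.

Being a bit lazy here, urldecode is taken straight from Rosetta Code ( ´▽`)

The complete src/youtube/urldecode.rs is as follows~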

/// https://www.rosettacode.org/wiki/URL_decoding#Rust
fn append_frag(text: &mut String, frag: &mut String) {
    if !frag.is_empty() {
        let encoded = frag.chars()
            .collect::<Vec<char>>()
            .chunks(2)
            .map(|ch| {
                u8::from_str_radix(&ch.iter().collect::<String>(), 16).unwrap()
            }).collect::<Vec<u8>>();
        text.push_str(&std::str::from_utf8(&encoded).unwrap());
        frag.clear();
    }
}
 
pub fn urldecode(text: &str) -> String {
    let mut output = String::new();
    let mut encoded_ch = String::new();
    let mut iter = text.chars();
    while let Some(ch) = iter.next() {
        if ch == '%' {
            encoded_ch.push_str(&format!("{}{}", iter.next().unwrap(), iter.next().unwrap()));
        } else {
            append_frag(&mut output, &mut encoded_ch);
            output.push(ch);
        }
    }
    append_frag(&mut output, &mut encoded_ch);
    output
}
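The Rosetta Code version above panics on malformed input, for example a trailing `%` or non-hex digits, because of its `unwrap()` calls. As a sketch of an alternative, and not what the project uses, a variant could report such input through `Option`. The name `try_urldecode` is made up for this example.

```rust
/// Hypothetical variant of `urldecode` that returns `None` on malformed
/// input instead of panicking.
fn try_urldecode(text: &str) -> Option<String> {
    let bytes = text.as_bytes();
    let mut decoded = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' {
            // A `%` must be followed by exactly two hex digits.
            let hex = text.get(i + 1..i + 3)?;
            if !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
                return None;
            }
            decoded.push(u8::from_str_radix(hex, 16).ok()?);
            i += 3;
        } else {
            // Any other byte is copied through unchanged.
            decoded.push(bytes[i]);
            i += 1;
        }
    }
    // The decoded bytes must form valid UTF-8.
    String::from_utf8(decoded).ok()
}
```

Working on bytes rather than chars keeps multi-byte sequences such as `%E6%98%A5` together until the final UTF-8 check.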

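8. Access YouTube API

Now we can write the part that fetches all kinds of information from the YouTube API, namely src/youtube/api.rs

8.1 HTTP Client

In short, we build an HTTP Client according to proxy and allow_fallback in our PlaylistWatcherConfig, with support for setting HTTP request headers, and then use this HTTP Client for every request~

8.2 Get All Video ID in Playlist

To grab all the Video IDs in a Playlist, we first have to look at the source of the Playlist page~

After some testing, it turns out that every Video ID sits in the data-video-id attribute of a tr element~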

<tr class="pl-video yt-uix-tile "
    data-video-id="bu_86J8gbmA"
    data-set-video-id=""
    data-title="Harutya 春茶 best cover playlist - Harutya 春茶 best songs of all time - Best cover of Harutya 春茶">
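To make the extraction concrete without pulling in an HTML parser, here is a std-only sketch that scans for the attribute as a plain substring. The actual api.rs uses the select crate instead; `extract_video_ids` is a hypothetical name for this illustration.

```rust
/// Collect every value of `data-video-id="..."` found in `html`.
/// This is a plain substring scan, not an HTML parser.
fn extract_video_ids(html: &str) -> Vec<String> {
    let needle = "data-video-id=\"";
    let mut ids = Vec::new();
    let mut rest = html;
    while let Some(start) = rest.find(needle) {
        // Everything after the opening quote of the attribute value.
        let after = &rest[start + needle.len()..];
        match after.find('"') {
            Some(end) => {
                ids.push(after[..end].to_string());
                rest = &after[end + 1..];
            }
            // Unterminated attribute: stop scanning.
            None => break,
        }
    }
    ids
}
```

The needle does not match the neighbouring `data-set-video-id` attribute, since in that name `data-` is followed by `set-` rather than `video-id`.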

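8.3 Get Video Info

To get the Video Info we need a Video ID from above, say bu_86J8gbmA, and can then send a request to this YouTube API

https://www.youtube.com/get_video_info?video_id=bu_86J8gbmA

What comes back is actually a string in URL query format that contains many parameters. Among them, player_response is the one we care about~ Let's verify this with PHP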

<?php
    $aContext = array(
        'http' => array(
            'proxy'           => 'tcp://127.0.0.1:1087',
            'request_fulluri' => true,
        ),
    );
    $cxContext = stream_context_create($aContext);
    $content = file_get_contents('https://www.youtube.com/get_video_info?video_id=bu_86J8gbmA', false, $cxContext);
    parse_str($content, $query);
    printf("query parameters: \n - %s\n\n\n", join("\n - ", array_keys($query)));
    printf("parsed player_response: \n");
    print_r(json_decode($query["player_response"], true));
?>

Part of the output is shown below~

query parameters: 
 - c
 - player_response
 - hl
 - innertube_context_client_version
 - innertube_api_key
 - csn
 - innertube_api_version
 - host_language
 - watermark
 - cver
 - csi_page_type
 - vss_host
 - enablecsi
 - gapi_hint_params
 - status
 - cr
 - fexp
 - root_ve_type
 - fflags

 
parsed player_response: 
Array
(
    [playabilityStatus] => Array
        (
            [status] => OK
            [playableInEmbed] => 1
        )

    [streamingData] => Array
        (
            [expiresInSeconds] => 21540
            [formats] => Array
                (
...

Yes~ player_response holds a JSON string, and after running json_decode on it we can see streamingData and the rest~ Its formats and adaptiveFormats arrays contain the various audio and video streams~
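For comparison with the PHP `parse_str` call, here is a small std-only sketch of looking up one key in such a query string. The api.rs listing later in this section takes a shortcut and searches for the substring `player_response=` instead; `query_value` is a hypothetical helper, not part of the project.

```rust
/// Return the raw (still percent-encoded) value of `key` in a URL query string.
fn query_value<'a>(query: &'a str, key: &str) -> Option<&'a str> {
    query.split('&').find_map(|pair| {
        // Split each `key=value` pair at the first `=` only.
        let mut parts = pair.splitn(2, '=');
        match (parts.next(), parts.next()) {
            (Some(k), Some(v)) if k == key => Some(v),
            _ => None,
        }
    })
}
```

Comparing whole keys avoids a false hit on a parameter whose name merely ends in `player_response`, which a substring search would accept. The returned value still has to be percent-decoded before it can be parsed as JSON.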

Take one of the video streams as an example~

{
    "itag": 398,
    "url": "https://r1---sn-4jpm0n-nwje.googlevideo.com/videoplayback?expire=1577022319&ei=Dx__XfuPIc-UkwbT5Ltg&ip=...",
    "mimeType": "video\/mp4; codecs=\"av01.0.05M.08\"",
    "bitrate": 990238,
    "width": 1280,
    "height": 720,
    "initRange": {
        "start": "0",
        "end": "699"
    },
    "indexRange": {
        "start": "700",
        "end": "11123"
    },
    "lastModified": "1575857942294141",
    "contentLength": "179709401",
    "quality": "hd720",
    "fps": 6,
    "qualityLabel": "720p",
    "projectionType": "RECTANGULAR",
    "averageBitrate": 311522,
    "colorInfo": {
        "primaries": "COLOR_PRIMARIES_BT709",
        "transferCharacteristics": "COLOR_TRANSFER_CHARACTERISTICS_BT709",
        "matrixCoefficients": "COLOR_MATRIX_COEFFICIENTS_BT709"
    },
    "approxDurationMs": "4614999"
},

And here is an audio stream~

{
    "itag": 249,
    "url": "https://r1---sn-4jpm0n-nwje.googlevideo.com/videoplayback?expire=1576848511&ei=H3j8XencDpaukwbd1o3YDg&ip=...",
    "mimeType": "audio/webm; codecs=\"opus\"",
    "bitrate": 71995,
    "initRange": {
        "start": "0",
        "end": "265"
    },
    "indexRange": {
        "start": "266",
        "end": "8325"
    },
    "lastModified": "1555729332528048",
    "contentLength": "30486124",
    "quality": "tiny",
    "projectionType": "RECTANGULAR",
    "averageBitrate": 52846,
    "audioQuality": "AUDIO_QUALITY_LOW",
    "approxDurationMs": "4615081",
    "audioSampleRate": "48000",
    "audioChannels": 2
},

8.4 Code of src/youtube/api.rs

The complete src/youtube/api.rs follows~ Note that it does not actually parse the URL query; it simply looks for the position of player_response and cuts everything up to the next &.

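// NOTE: this file appears to target the synchronous `Client` that reqwest 0.9
// exposes at the crate root; from 0.10 on the equivalent is `reqwest::blocking`.
extern crate reqwest;

use reqwest::{Client, Proxy, header};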
use select::document::Document;
use select::predicate::Name;
use std::error::Error;
use std::string::String;
use super::urldecode::urldecode;
use super::error::YouTubeError;

/// get http client with proxy and optional header
///
/// # Example
///
/// ```
/// // With proxy enabled, allow fallback, and custom header
/// let mut headers = header::HeaderMap::new();
/// let field = "x-some-header";
/// let value = "SOME VALUE";
/// headers.insert(field, header::HeaderValue::from_str(value).unwrap());
/// get_client("http://127.0.0.1:1087", true, Some(headers));
///
/// // With proxy enabled and allow fallback
/// get_client("http://127.0.0.1:1087", true, None);
///
/// // Without proxy
/// get_client("", true, None);
/// ```
pub fn get_client(with_proxy: &str, allow_fallback: bool, header: Option<header::HeaderMap>) -> Client {
    let mut client_builder = Client::builder();
    
    // try to add proxy for http client
    match Proxy::all(&*with_proxy) {
        Ok(proxy) => client_builder = client_builder.proxy(proxy),
        Err(e) => match allow_fallback {
            // if fallback is prohibited
            // then panic with error message
            false => panic!("[ERROR] Proxy `{}` is invalid and fallback is disabled: {}\n", with_proxy, e),
            true => (),
        }
    };
    
    // also check for headers
    if let Some(header_map) = header {
        // if we have some headers
        client_builder = client_builder.default_headers(header_map);
    }
    
    // build http client
    match client_builder.build() {
        Ok(client) => client,
        Err(e) => panic!("[ERROR] Error occurred while building http client: {}\n", e),
    }
}

/// get sts param with client
///
/// # Example
///
/// ```
/// let client = get_client("http://127.0.0.1:1087", true, None);
/// get_sts(&client);
/// ```
///
/// # Note
/// this function will be used in `get_video_info(&Client, &str) -> Result<serde_json::Value, YouTubeError>`
pub fn get_sts(client: &Client) -> Result<String, Box<dyn Error>> {
    // get `sts` from https://www.youtube.com/yts/jsbin/player-vflYXLM5n/en_US/base.js
    // try to grab the player script
    let mut resp = client.get("https://www.youtube.com/yts/jsbin/player-vflYXLM5n/en_US/base.js").send()?;
    match resp.status().is_success() {
        // if succeeded
        true => {
            // try to get the content of response
            let content = resp.text()?;
            // then try to extract `,sts:{:token},`
            let sts_param = ",sts:";
            if let Some(mut sts_param_index) = content.find(sts_param) {
                // `{:token}` starts after the `,sts:`
                sts_param_index += sts_param.len();

                // get everything since `{:token}`
                let partial_sts_string = &content[sts_param_index..];

                // expecting another comma behind `{:token}`
                if let Some(index) = partial_sts_string.find(",") {
                    // this suggests that a `,` does follow `{:token}`
                    return Ok(String::from(&partial_sts_string[..index]))
                }
            }
            println!("[WARNING] Cannot fetch `sts` at the moment");
            Ok(String::new())
        },
        // otherwise
        false => {
            // don't panic, just print warning
            println!("[WARNING] Cannot fetch `sts` at the moment");
            Ok(String::new())
        }
    }
}

/// get all video ids from playlist with client and url
///
/// # Example
///
/// ```
/// let client = get_client("http://127.0.0.1:1087", true, None);
/// get_playlist_from_url(&client, "https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G");
/// ```
pub fn get_playlist_from_url(client: &Client, playlist_url: &str) -> Result<Vec<String>, YouTubeError> {
    // try to grab the playlist webpage
    let resp = match client.get(&*playlist_url).send() {
        Ok(resp) => resp,
        Err(e) => return Err(YouTubeError { reason: format!("Error occurred while sending GET request to {}: {}", playlist_url, e) }),
    };
    
    match resp.status().is_success() {
        // if succeeded
        true => {
            // then parse the response HTML
            match Document::from_read(resp) {
                // if successfully parsed
                Ok(parsed) => {
                    // given that the videos are listed as
                    // ```
                    //    <tr class="pl-video yt-uix-tile " 
                    //        data-title="(女性声音)LEMON(柠檬)/米津玄師(完整版/KOBASOLO和春茶翻唱)" 
                    //        data-video-id="clU8c2fpk2s" 
                    //        data-set-video-id="">
                    // ```
                    // so we need to find all `<tr></tr>` with attribute `data-video-id="{:video_id}"`
                    let videos: Vec<String> = parsed.find(Name("tr"))
                        .filter_map(|n| n.attr("data-video-id"))
                        .map(|video_id| String::from(video_id))
                        .collect();
                    Ok(videos)
                },
                // if failed in parsing
                Err(e) => Err(YouTubeError { reason: format!("Error occurred while parsing HTML dom: {}", e) }),
            }
        },
        // if cannot grab the webpage of given playlist
        false => Err(YouTubeError { reason: format!("Cannot fetch playlist `{}` at the moment", playlist_url) })
    }
}

/// get video info from YouTube API with client and id
///
/// # Example
///
/// ```
/// let client = get_client("http://127.0.0.1:1087", true, None);
/// get_video_info(&client, "clU8c2fpk2s");
/// ```
pub fn get_video_info(client: &Client, video_id: &str) -> Result<serde_json::Value, YouTubeError> {
    // get `sts` param
    let sts = match get_sts(&client) {
        Ok(sts) => sts,
        Err(e) => {
            println!("[WARNING] Cannot get `sts` param at the moment: {}", e);
            String::from("")
        },
    };
    
    // build video info url
    let video_info_url = format!("https://www.youtube.com/get_video_info?video_id={}&sts={}", video_id, sts);
    
    // send request
    let mut resp = match client.get(&*video_info_url).send() {
        Ok(resp) => resp,
        Err(e) => return Err(YouTubeError { reason: format!("Error occurred while sending GET request to {}: {}", video_info_url, e) }),
    };
    match resp.status().is_success() {
        // if the server did answer to the request
        true => {
            // try to get the content of response
            let content = match resp.text() {
                Ok(content) => content,
                Err(e) => return Err(YouTubeError { reason: format!("Error occurred while reading response body of {}: {}", video_info_url, e)}),
            };
            // then try to extract `player_response={:json_string}`
            let player_response_param = "player_response=";
            if let Some(mut player_response_index) = content.find(player_response_param) {
                // if we did find `player_response={:json_string}` in response
                // that suggests YouTube did answer the request without any issue
                
                // the `{:json_string}` should start at `player_response_param + player_response_param.len()`
                player_response_index += player_response_param.len();
                let partial_json_string = &content[player_response_index..];
                
                // however, the whole response body is a URI query
                //  ```status=ok&c=WEB&player_response=......&...```
                // so we need to find where `{:json_string}` stops
                // which means finding the next `&`
                let delimiter_index;
                if let Some(index) = partial_json_string.find("&") {
                    // this suggests that another `&` follows `{:json_string}`
                    delimiter_index = index;
                } else {
                    // but chances are that `player_response` is the last parameter of the query string
                    delimiter_index = partial_json_string.len();
                }

                // then we can get the whole `{:json_string}` without other uri query string
                let json_string = &partial_json_string[0..delimiter_index];

                // in a form-encoded query `+` stands for a space, so replace it
                // before percent-decoding; a literal plus arrives as `%2B` and
                // must survive the decoding step
                let json_string = json_string.replace("+", " ");

                // then do a urldecode on the `{:json_string}` part
                // to convert all percent-encoded chars to their original form
                let mut json_string = urldecode(&json_string);

                // and also replace all `\u0026` with `&`
                json_string = json_string.replace("\\u0026", "&");
                
                // try to parse `json_string` into object
                match serde_json::from_str(&json_string) {
                    // if `json_string` can be parsed into json object
                    // then return it
                    Ok(json) => Ok(json),
                    // otherwise throw error message
                    Err(e) => Err(YouTubeError { reason: format!("Cannot parse JSON for video `{}`: {}", video_info_url, e) })
                }
            } else {
                // if no anchor string found in response
                // then throw error message
                Err(YouTubeError { reason: format!("No anchor string `{}` found in response", player_response_param) })
            }
        },
        // if the server did NOT answer the request successfully
        // don't panic, just return an error
        false => Err(YouTubeError { reason: format!("Error occurred while fetching video info: {}", video_info_url) }),
    }
}

9. Stream

9.1 Stream utility functions

In the YouTube API response, a number sometimes appears directly as a value, such as 23333, and sometimes inside a string, such as "233333". So we need utility functions that check, cast or parse for us.
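The idea can be sketched without serde_json by modelling the two shapes a numeric field can take. The `Field` enum and `to_u64` below are stand-ins for illustration only; the real helpers shown later inspect `serde_json::Value`.

```rust
/// Stand-in for the two shapes a numeric field can take in the response.
/// (The real code inspects `serde_json::Value`; this enum is only for illustration.)
#[derive(Debug)]
enum Field {
    Number(u64),
    Text(String),
}

/// Accept either a bare number or a number written inside a string.
fn to_u64(field: &Field) -> Result<u64, String> {
    match field {
        Field::Number(n) => Ok(*n),
        Field::Text(s) => s
            .parse::<u64>()
            .map_err(|e| format!("`{}` cannot be converted to u64: {}", s, e)),
    }
}
```

This mirrors what `check_null_or_parse_u64` does for keys such as contentLength: use the number directly when there is one, otherwise try to parse the string.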

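9.2 DownloadableStream Trait

The YouTube API response above contains both Audio Streams and Video Streams. To make the later code easier to write, I added a DownloadableStream trait. Its functions return the download URL, the file size, and a file extension guessed from the MIME type~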
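As a sketch of why the trait is convenient, a function that is generic over DownloadableStream works for audio and video alike. `DummyStream`, `output_filename` and the `"bin"` fallback are assumptions made for this example; the trait itself has the same shape as the one in src/youtube/stream.rs.

```rust
/// Same shape as the trait in src/youtube/stream.rs.
pub trait DownloadableStream {
    fn url(&self) -> &str;
    fn content_length(&self) -> usize;
    fn extension(&self) -> &str;
}

/// Hypothetical stream type used only for this sketch.
struct DummyStream {
    url: String,
    content_length: usize,
    mime_type: String,
}

impl DownloadableStream for DummyStream {
    fn url(&self) -> &str {
        &self.url
    }

    fn content_length(&self) -> usize {
        self.content_length
    }

    fn extension(&self) -> &str {
        // Take the part before `;`, then fall back to the subtype after `/`.
        let essence = self.mime_type.split(';').next().unwrap_or("").trim();
        match essence {
            "audio/mp4" => "m4a",
            "audio/webm" => "opus",
            other => other.split('/').nth(1).unwrap_or("bin"),
        }
    }
}

/// Works for any stream type, audio or video.
fn output_filename<S: DownloadableStream>(title: &str, stream: &S) -> String {
    format!("{}.{}", title, stream.extension())
}
```

A downloader written against the trait only needs `url()`, `content_length()` and `extension()`, so it never has to know which kind of stream it was handed.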

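9.3 Code of src/youtube/stream.rs

This part is fairly straightforward: it declares a trait (honestly, it feels a lot like a protocol in Objective-C).

The rest are helper functions that consist almost entirely of checks, and I did not find a way to fold them into a single generic function _(:3」∠)_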

use serde_json;
use super::error::YouTubeError;

pub trait DownloadableStream {
    fn url(&self) -> &str;
    fn content_length(&self) -> usize;
    fn extension(&self) -> &str;
}

pub fn check_null_or_i64(info: &serde_json::Value, property_name: &str) -> Result<i64, YouTubeError> {
    match info[property_name].is_null() {
        true => Err(YouTubeError{ reason: format!("No `{}` found", property_name) }),
        false => {
            match info[property_name].is_i64() {
                true => Ok(info[property_name].as_i64().unwrap()),
                false => Err(YouTubeError{ reason: format!("`{}` cannot be converted to i64", property_name) }),
            }
        }
    }
}

pub fn check_null_or_str(info: &serde_json::Value, property_name: &str) -> Result<String, YouTubeError> {
    match info[property_name].is_null() {
        true => Err(YouTubeError{ reason: format!("No `{}` found", property_name) }),
        false => {
            match info[property_name].is_string() {
                true => Ok(String::from(info[property_name].as_str().unwrap())),
                false => Err(YouTubeError{ reason: format!("`{}` cannot be converted to String", property_name) }),
            }
        }
    }
}

pub fn check_null_or_u64(info: &serde_json::Value, property_name: &str) -> Result<u64, YouTubeError> {
    match info[property_name].is_null() {
        true => Err(YouTubeError{ reason: format!("No `{}` found", property_name) }),
        false => {
            match info[property_name].is_u64() {
                true => Ok(info[property_name].as_u64().unwrap()),
                false => Err(YouTubeError{ reason: format!("`{}` cannot be converted to u64", property_name) }),
            }
        }
    }
}

pub fn check_null_or_parse_u64(info: &serde_json::Value, property_name: &str) -> Result<u64, YouTubeError> {
    match info[property_name].is_null() {
        true => Err(YouTubeError{ reason: format!("No `{}` found", property_name) }),
        false => {
            match info[property_name].is_u64() {
                true => Ok(info[property_name].as_u64().unwrap()),
                false => {
                    let string = check_null_or_str(info, property_name)?;
                    match string.parse::<u64>() {
                        Ok(value) => Ok(value),
                        Err(e) => Err(YouTubeError{ reason: format!("`{}` cannot be converted to u64: {}", property_name, e) }),
                    }
                },
            }
        }
    }
}

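10. Video

As we saw earlier, one YouTube Video can offer several Video Streams and Audio Streams of different resolution and quality

Let's look at the Video Stream first~

10.1 VideoStream

How to put it... this is basically translating the JSON into the corresponding Rust struct. Some keys are omitted (not every video stream has them, and some Video Streams are "encrypted"; handling those is left for next time)

Every key needs its type checked, which is where the utility functions from the Stream module come in~

For easier comparison, here is a copy of the JSON from before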

{
    "itag": 398,
    "url": "https://r1---sn-4jpm0n-nwje.googlevideo.com/videoplayback?expire=1577022319&ei=Dx__XfuPIc-UkwbT5Ltg&ip=...",
    "mimeType": "video\/mp4; codecs=\"av01.0.05M.08\"",
    "bitrate": 990238,
    "width": 1280,
    "height": 720,
    "initRange": {
        "start": "0",
        "end": "699"
    },
    "indexRange": {
        "start": "700",
        "end": "11123"
    },
    "lastModified": "1575857942294141",
    "contentLength": "179709401",
    "quality": "hd720",
    "fps": 6,
    "qualityLabel": "720p",
    "projectionType": "RECTANGULAR",
    "averageBitrate": 311522,
    "colorInfo": {
        "primaries": "COLOR_PRIMARIES_BT709",
        "transferCharacteristics": "COLOR_TRANSFER_CHARACTERISTICS_BT709",
        "matrixCoefficients": "COLOR_MATRIX_COEFFICIENTS_BT709"
    },
    "approxDurationMs": "4614999"
},

The complete src/youtube/video_stream.rs is as follows~

use super::error::YouTubeError;
use super::stream::{self, DownloadableStream};

#[derive(Debug, Clone)]
pub struct VideoStream {
    pub video_id: String,
    pub video_title: String,
    pub itag: i64,
    pub url: String,
    pub mime_type: String,
    pub bitrate: i64,
    pub last_modified: u64,
    pub content_length: u64,
    pub width: u64,
    pub height: u64,
    pub quality: String,
}

impl VideoStream {
    pub fn from_json(video_info: &serde_json::Value, video_id: &String, video_title: &String) -> Result<VideoStream, YouTubeError> {
        Ok(VideoStream {
            video_id:           video_id.clone(),
            video_title:        video_title.clone(),
            itag:               stream::check_null_or_i64(&video_info, "itag")?,
            url:                stream::check_null_or_str(&video_info, "url")?,
            mime_type:          stream::check_null_or_str(&video_info, "mimeType")?,
            bitrate:            stream::check_null_or_i64(&video_info, "bitrate")?,
            last_modified:      stream::check_null_or_parse_u64(&video_info, "lastModified")?,
            content_length:     stream::check_null_or_parse_u64(&video_info, "contentLength")?,
            width:              stream::check_null_or_u64(&video_info, "width")?,
            height:             stream::check_null_or_u64(&video_info, "height")?,
            quality:            stream::check_null_or_str(&video_info, "quality")?,
        })
    }
}

impl DownloadableStream for VideoStream {
    fn url(&self) -> &str {
        &*self.url
    }
    
    fn content_length(&self) -> usize {
        self.content_length as usize
    }
    
    fn extension(&self) -> &str {
        match self.mime_type.trim().split(';').nth(0) {
            Some(extension) => match extension {
                "video/mp4" => "mp4",
                "video/webm" => "webm",
                _ => match extension.split('/').nth(1) {
                        Some(hint) => hint,
                        _ => "webm",
                     },
            },
            _ => "webm",
        }
    }
}

10.2 AudioStream

Next up is the Audio Stream (src/youtube/audio_stream.rs), handled almost exactly like VideoStream~

use super::error::YouTubeError;
use super::stream::{self, DownloadableStream};

#[derive(Debug, Clone)]
pub struct AudioStream {
    pub video_id: String,
    pub video_title: String,
    pub itag: i64,
    pub url: String,
    pub mime_type: String,
    pub bitrate: i64,
    pub last_modified: u64,
    pub content_length: u64,
    pub quality: String,
    pub projection_type: String,
    pub average_bitrate: u64,
    pub audio_quality: String,
    pub approx_duration_ms: u64,
    pub audio_sample_rate: u64,
    pub audio_channels: u64,
}

impl AudioStream {
    pub fn from_json(video_info: &serde_json::Value, video_id: &String, video_title: &String) -> Result<AudioStream, YouTubeError> {
        Ok(AudioStream {
            video_id:           video_id.clone(),
            video_title:        video_title.clone(),
            itag:               stream::check_null_or_i64(&video_info, "itag")?,
            url:                stream::check_null_or_str(&video_info, "url")?,
            mime_type:          stream::check_null_or_str(&video_info, "mimeType")?,
            bitrate:            stream::check_null_or_i64(&video_info, "bitrate")?,
            last_modified:      stream::check_null_or_parse_u64(&video_info, "lastModified")?,
            content_length:     stream::check_null_or_parse_u64(&video_info, "contentLength")?,
            quality:            stream::check_null_or_str(&video_info, "quality")?,
            projection_type:    stream::check_null_or_str(&video_info, "projectionType")?,
            average_bitrate:    stream::check_null_or_u64(&video_info, "averageBitrate")?,
            audio_quality:      stream::check_null_or_str(&video_info, "audioQuality")?,
            approx_duration_ms: stream::check_null_or_parse_u64(&video_info, "approxDurationMs")?,
            audio_sample_rate:  stream::check_null_or_parse_u64(&video_info, "audioSampleRate")?,
            audio_channels:     stream::check_null_or_parse_u64(&video_info, "audioChannels")?,
        })
    }
}

impl DownloadableStream for AudioStream {
    fn url(&self) -> &str {
        &*self.url
    }
    
    fn content_length(&self) -> usize {
        self.content_length as usize
    }
    
    fn extension(&self) -> &str {
        match self.mime_type.trim().split(';').nth(0) {
            Some(extension) => match extension {
                "audio/mp4" => "m4a",
                "audio/webm" => "opus",
                _ => match extension.split('/').nth(1) {
                        Some(hint) => hint,
                        _ => "opus",
                    },
            },
            _ => "m4a",
        }
    }
}

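10.3 The Video struct

With VideoStream and AudioStream in place, we can write the Video struct~

1) First, a Video certainly has an id and a title, followed by a list of VideoStream and a list of AudioStream

2) To construct a Video instance, there are two cases to consider

2.a) Given the JSON of the Video Info already fetched from the YouTube API, parse it into a list of VideoStream and a list of AudioStream, then create the Video instance

2.b) Given only the Video ID, fetch the Video Info through the YouTube API, then pass it on to 2.a)~

3) Finally, add one more function that picks the best-quality VideoStream and AudioStream~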
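Step 3) can be sketched with `max_by_key`. What counts as "best" is an assumption here: this version prefers the greatest height and breaks ties by bitrate, which may differ from the ranking the project uses. `StreamInfo` is a cut-down stand-in for VideoStream.

```rust
/// Minimal stand-in for the fields of `VideoStream` used here.
#[derive(Debug, Clone, PartialEq)]
struct StreamInfo {
    itag: i64,
    height: u64,
    bitrate: i64,
}

/// Pick the stream with the greatest height, breaking ties by bitrate.
/// Returns `None` when the list is empty.
fn best_stream(streams: &[StreamInfo]) -> Option<&StreamInfo> {
    streams.iter().max_by_key(|s| (s.height, s.bitrate))
}
```

For audio the same pattern applies with a different key, for example the bitrate alone.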

The complete src/youtube/video.rs is as follows~

use serde_json;
use std::vec::Vec;
use super::api::{get_client, get_video_info};
use super::error::YouTubeError;
use super::audio_stream::AudioStream;
use super::video_stream::VideoStream;

#[derive(Debug)]
pub struct Video {
    pub id: String,
    pub title: String,
    pub audio_stream: Vec<AudioStream>,
    pub video_stream: Vec<VideoStream>,
}

impl Video {
    /// get video from its id
    ///
    /// # Example
    ///
    /// ```
    /// // With proxy enabled
    /// Video::from_video_id("Z8oxYSEUnuU", "http://127.0.0.1:1087", true);
    ///
    /// // Without proxy
    /// Video::from_video_id("Z8oxYSEUnuU", "", true);
    /// ```
    pub fn from_video_id(id: &str, proxy: &str, allow_fallback: bool) -> Result<Video, YouTubeError> {
        // get http client
        let client = get_client(proxy, allow_fallback, None);
        
        // try to get its info
        let video_info_json = get_video_info(&client, id)?;
        // try to instantiate Video from the video info json
        Video::from_video_info_json(&video_info_json)
    }
    
    pub fn from_video_info_json(video_info: &serde_json::Value) -> Result<Video, YouTubeError> {
        // the json object should contain "videoDetails" and "streamingData"
        //
        // ```
        //     "videoDetails": {
        //       "videoId": "bu_86J8gbmA",
        //       "title": "Harutya \u6625\u8336 best cover playlist - Harutya \u6625\u8336 best songs of all time - Best cover of Harutya \u6625\u8336",
        //       ...
        //     },
        //     "streamingData": {
        //       "expiresInSeconds": "21540",
        //         "adaptiveFormats": [
        //            {
        //               ...
        //            },
        //         ],
        //         "formats": [
        //           {
        //             ...
        //           },
        //        ],
        //     }
        // ```
        
        // get video details
        let video_details = &video_info["videoDetails"];
        
        // get video id and title
        let (video_id, video_title) = match video_details.is_null() {
            false => {
                let video_id = &video_details["videoId"];
                let video_title = &video_details["title"];
                match (video_id.is_string(), video_title.is_string()) {
                    (true, true) => (video_id.as_str().unwrap().to_string(), video_title.as_str().unwrap().to_string()),
                    (true, false) => (video_id.as_str().unwrap().to_string(), String::from("unknown-video-title")),
                    (false, true) => (String::from("unknown-video-id"), video_title.as_str().unwrap().to_string()),
                    (false, false) => (String::from("unknown-video-id"), String::from("unknown-video-title")),
                }
            },
            true => (String::from("unknown-video-id"), String::from("unknown-video-title")),
        };
        
        // we will also try to get `streamingData`
        let streaming_data = &video_info["streamingData"];
        match streaming_data.is_null() {
            // if there is `streamingData`
            false => {
                // then try to extract `adaptiveFormats` and `formats`
                let adaptive_formats = &streaming_data["adaptiveFormats"];
                let formats = &streaming_data["formats"];
                
                // merge all formats into a single array
                let all_formats = match (adaptive_formats.is_array(), formats.is_array()) {
                    (true, true)   => {
                        let mut all_formats_array = adaptive_formats.as_array().unwrap().clone();
                        all_formats_array.extend(formats.as_array().unwrap().iter().cloned());
                        Ok(all_formats_array)
                    },
                    // only `adaptiveFormats` is an array
                    (true, false)  => Ok(adaptive_formats.as_array().unwrap().clone()),
                    // only `formats` is an array
                    (false, true)  => Ok(formats.as_array().unwrap().clone()),
                    (false, false) => Err(YouTubeError{ reason: "No downloadable formats available".to_string() }),
                }?;
                
                let mut audio_streams: Vec<AudioStream> = Vec::new();
                let mut video_streams: Vec<VideoStream> = Vec::new();
                
                all_formats.iter().for_each(|format| {
                    let mime_type = &format["mimeType"];
                    if mime_type.is_string() {
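                        // match on the top-level media type; unlike slicing `[..5]`, this cannot panic on short strings
                        match mime_type.as_str().unwrap().split('/').next().unwrap_or("") {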
                            "audio" => {
                                match AudioStream::from_json(format, &video_id, &video_title) {
                                    Ok(audio_stream) => {
                                        audio_streams.push(audio_stream);
                                    },
                                    Err(_) => (),
                                }
                            },
                            "video" => {
                                match VideoStream::from_json(format, &video_id, &video_title) {
                                    Ok(video_stream) => {
                                        video_streams.push(video_stream);
                                    },
                                    Err(_) => (),
                                }
                            },
                            _ => println!("[WARNING] Unknown mime type: {}", mime_type),
                        }
                    }
                });
                
                match (audio_streams.len(), video_streams.len()) {
                    (0, 0) => Err(YouTubeError{ reason: "Neither an audio stream nor a video stream is available".to_string() }),
                    (_, _) => Ok(Video {
                        id: video_id,
                        title: video_title,
                        audio_stream: audio_streams,
                        video_stream: video_streams,
                    })
                }
            },
            // otherwise
            true => Err(YouTubeError{ reason: "No `streamingData` found".to_string() }),
        }
    }
    
    pub fn pick_best_stream(&self) -> (Option<AudioStream>, Option<VideoStream>) {
        let mut best_audio: Option<AudioStream> = None;
        let mut best_video: Option<VideoStream> = None;
        let mut current_max_size = 0;
        let mut current_max_audio_bitrate = 0;
        let mut current_max_video_bitrate = 0;
        
        for audio_stream in &self.audio_stream {
            if audio_stream.bitrate > current_max_audio_bitrate {
                current_max_audio_bitrate = audio_stream.bitrate;
                best_audio = Some(audio_stream.clone());
            }
        }
        
        for video_stream in &self.video_stream {
            if video_stream.mime_type.starts_with("video/webm") {
                let size = video_stream.width * video_stream.height;
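                // prefer the higher resolution; on equal resolution prefer the higher bitrate.
                // size and bitrate are always updated together with `best_video`
                if size > current_max_size || (size == current_max_size && video_stream.bitrate > current_max_video_bitrate) {
                    current_max_size = size;
                    current_max_video_bitrate = video_stream.bitrate;
                    best_video = Some(video_stream.clone());
                }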
            }
        }
        
        (best_audio, best_video)
    }
}

11. Playlist

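A playlist can contain multiple Videos, so with the Video struct done we can move on to the Playlist struct.

There are two associated functions here: one returns all the Video IDs in a given playlist; the other takes a Playlist URL and returns a Playlist instance.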
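Building a Playlist means loading many videos, some of which may fail. A small sketch of the "keep the successes, log and skip the failures" pattern follows. `load_video` is a hypothetical stand-in for the real lookup, and it fails on an empty id purely so the example has something to skip.

```rust
/// Hypothetical loader used only for this sketch: fails on an empty id.
fn load_video(id: &str) -> Result<String, String> {
    if id.is_empty() {
        Err("empty video id".to_string())
    } else {
        Ok(format!("video:{}", id))
    }
}

/// Load every id, log failures, and keep only the successful results.
fn load_all(ids: &[&str]) -> Vec<String> {
    ids.iter()
        .map(|id| load_video(id))
        .filter_map(|result| match result {
            Ok(video) => Some(video),
            Err(e) => {
                // One bad video should not abort the whole playlist.
                println!("[ERROR] skipping video: {}", e);
                None
            }
        })
        .collect()
}
```

With this shape, `load_all(&["a", "", "b"])` returns the two videos that loaded and prints one error line for the empty id.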

The complete src/youtube/playlist.rs is as follows:

use super::api::{get_client, get_playlist_from_url};
use super::error::YouTubeError;
use super::video::Video;

#[derive(Debug)]
pub struct Playlist {
    pub video: Vec<Video>,
}

impl Playlist {
    /// get all video ids in the playlist with proxy
    ///
    /// # Example
    ///
    /// ```
    /// // With proxy enabled
    /// Playlist::get_all_video_ids("https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G", "http://127.0.0.1:1087", true);
    /// 
    /// // Without proxy
    /// Playlist::get_all_video_ids("https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G", "", true);
    /// ```
    pub fn get_all_video_ids(url: &str, proxy: &str, allow_fallback: bool) -> Result<Vec<String>, YouTubeError> {
        // get http client
        let client = get_client(proxy, allow_fallback, None);
        get_playlist_from_url(&client, url)
    }
    
    /// get playlist from url with proxy
    ///
    /// # Example
    ///
    /// ```
    /// // With proxy enabled
    /// Playlist::from_url("https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G", "http://127.0.0.1:1087", true);
    /// 
    /// // Without proxy
    /// Playlist::from_url("https://www.youtube.com/playlist?list=PLmPVZgHRcD6ZzbLAxHcP5FJRUOsUP5g4G", "", true);
    /// ```
    pub fn from_url(url: &str, proxy: &str, allow_fallback: bool) -> Result<Playlist, YouTubeError> {
        let video: Vec<Video> = Playlist::get_all_video_ids(url, proxy, allow_fallback)?.iter().map(|video_id| {
            // try to get Video from its video id
            Video::from_video_id(video_id, proxy, allow_fallback)
        }).filter_map(|maybe_video| {
            // filter all unsuccessful request
            match maybe_video {
                Ok(video) => Some(video),
                Err(e) => {
                    println!("[ERROR] Error occurred while instantiating `Video`: {}", e);
                    None
                },
            }
        }).collect();
        
        Ok(Playlist {
            video: video
        })
    }
}

12. Downloader

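We've finally reached one of the highlights (*^3^)

With everything above in place, we could actually build a Rust version of youtube-dl. Of course, that is only once this Downloader is written, and some "encrypted" video streams still need extra handling ╮( ̄▽ ̄"")╭

Handling for "encrypted" videos is not done yet, but the Downloader can be written first anyway, because downloading an "encrypted" video only adds one decrypted signature parameter to the url.

In theory the Downloader should take a DownloadableStream and a PlaylistWatcherConfig, but since I want to merge the AudioStream and VideoStream afterwards, it currently takes both an AudioStream and a VideoStream. It did feel a bit off while writing it, so the next version will probably refactor this.

When a download starts, an empty file is created in the target directory to mark it as in progress, for example clU8c2fpk2s.downloading

Also, it seems that YouTube does not let you fetch an AudioStream or a VideoStream in a single request, so the buffer is set to 10 MB and each request specifies its byte range through the HTTP Range header.

Finally, each chunk that comes back is written to the file. After a successful write, check whether the end of the requested Range equals the content_length of the AudioStream or VideoStream.

If they are equal, the download is complete, and a hidden empty file is created in the same directory, for example .clU8c2fpk2s.audio-session or .clU8c2fpk2s.video-session, to record that the stream has finished downloading.

If not, build the next request and continue.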
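The range arithmetic described above can be sketched in isolation. One detail worth keeping in mind is that an HTTP byte range is inclusive on both ends, so `bytes=0-9` asks for 10 bytes. `chunk_ranges` and `range_header` are hypothetical helper names for this illustration. Unlike the approach described above, which advances one request at a time, this sketch computes all ranges up front.

```rust
/// Split `content_length` bytes into inclusive `(start, end)` byte ranges of at
/// most `chunk_size` bytes each, matching the semantics of the HTTP `Range` header.
fn chunk_ranges(content_length: usize, chunk_size: usize) -> Vec<(usize, usize)> {
    let mut ranges = Vec::new();
    if chunk_size == 0 {
        // A zero-sized chunk would never make progress.
        return ranges;
    }
    let mut start = 0;
    while start < content_length {
        // `end` is inclusive, so a chunk covers `end - start + 1` bytes.
        let end = usize::min(start + chunk_size, content_length) - 1;
        ranges.push((start, end));
        start = end + 1;
    }
    ranges
}

/// Format one range as the value of a `Range` request header.
fn range_header(range: (usize, usize)) -> String {
    format!("bytes={}-{}", range.0, range.1)
}
```

For a 25-byte stream and a 10-byte chunk this gives three ranges, `0-9`, `10-19` and `20-24`, and the download is complete once the last byte requested is `content_length - 1`.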

use reqwest::{Client, header};
use std::error::Error;
use std::fs::File;
use std::io::prelude::*;
use std::sync::Arc;
use std::thread;
use super::api::get_client;
use super::audio_stream::AudioStream;
use super::error::YouTubeError;
use super::playlist_watcher_config::PlaylistWatcherConfig;
use super::stream::DownloadableStream;
use super::video_stream::VideoStream;

#[derive(Debug)]
pub struct Downloader {
    pub audio: AudioStream,
    pub video: VideoStream,
    pub config: PlaylistWatcherConfig,
}

impl Downloader {
    pub fn new(audio: AudioStream, video: VideoStream, config: &PlaylistWatcherConfig) -> Self {
        Downloader {
            audio: audio,
            video: video,
            config: config.clone(),
        }
    }
    
    pub fn start(mut self) {
        thread::spawn(move || {
            println!("[INFO] start downloading `[{}] {}`", self.video.video_id, self.video.video_title);
            self.download()
        });
    }
    
    fn download(&mut self) {
        let _ = File::create(format!("{}/{}.downloading", self.config.saveto, self.video.video_id));
        match self.download_impl(true) {
            Ok(()) => match self.download_impl(false) {
                Ok(()) => (),
                Err(e) => println!("[ERROR] Cannot download audio stream: {}", e),
            },
            Err(e) => println!("[ERROR] Cannot download video stream: {}", e),
        }
    }
    
    fn download_partial(client: &Client, url: &str) -> Result<Arc<Vec<u8>>, Box<dyn Error>> {
        let mut res = client.get(url).send()?;
        let mut buf: Vec<u8> = vec![];
        res.copy_to(&mut buf)?;
        Ok(Arc::new(buf))
    }
    
    fn download_impl(&mut self, is_video: bool) -> Result<(), YouTubeError> {
        // generate file path
        let extension = match is_video {
            true => self.video.extension(),
            false => self.audio.extension(),
        };

        let filename = format!("{}/{}-{}.{}", self.config.saveto, self.video.video_title, self.video.video_id, extension);
        let mut file = match File::create(&*filename) {
            Ok(fc) => fc,
            Err(e) => return Err(YouTubeError { reason: format!("Cannot create file `{}`: {}", &*filename, e)} ),
        };
        
        // buffer 10 MB
        let buffer_size: usize = 1024 * 1024 * 10;
        let mut start: usize = 0;
        let mut end: usize = start + buffer_size;
        let mut retry = 0;
        let max_retry = 3;
        
        let content_length = match is_video {
            true => self.video.content_length(),
            false => self.audio.content_length(),
        };
        
        while retry < max_retry {
            if end > content_length {
                end = content_length;
            }
            
            println!("[INFO] {}: [{}, {}/{}]", self.video.video_id, start, end, content_length);

            let mut headers = header::HeaderMap::new();
            // HTTP byte ranges are inclusive, so the last byte of this chunk is `end - 1`
            let range = format!("bytes={}-{}", start, end.saturating_sub(1));
            headers.insert("Range", header::HeaderValue::from_str(&*range).unwrap());
            let client = get_client(&self.config.proxy, self.config.allow_fallback, Some(headers));
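            // download from the stream that was asked for
            // (assumes `AudioStream` exposes a `url` field just like `VideoStream`)
            let url = match is_video {
                true => &self.video.url,
                false => &self.audio.url,
            };
            match Downloader::download_partial(&client, url) {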
                Ok(buf) => {
                    // write to file
                    match file.write_all(&buf) {
                        Ok(()) => {
                            if end == content_length {
                                let filepath = match is_video {
                                    true => {
                                        println!("[INFO] Downloaded video stream of `[{}] {}`", self.video.video_id, self.video.video_title);
                                        format!("{}/.{}.video-session", self.config.saveto, self.video.video_id)
                                    },
                                    false => {
                                        println!("[INFO] Downloaded audio stream of `[{}] {}`", self.video.video_id, self.video.video_title);
                                        format!("{}/.{}.audio-session", self.config.saveto, self.video.video_id)
                                    }
                                };
                                let _ = File::create(&*filepath);
                                return Ok(())
                            }
                        },
                        Err(e) => return Err(YouTubeError {reason: format!("Cannot write to file `{}`: {}", &*filename, e)} )
                    };
                    start += buf.len();
                    end += buf.len();
                },
                Err(e) => {
                    retry += 1;
                    println!("[ERROR] Error occurred while downloading, will retry: {}", e);
                },
            };
        }
        Err(YouTubeError {reason: "Download failed, exceeded max retries".to_string() } )
    }
}

13. Watch Playlist and Download

This is the classic Producer and Consumer setup.

Here the Producer is PlaylistWatcher and the Consumer is VideoConsumer.

We also want to be able to watch several different playlists at once, so this is Multiple Producer, Single Consumer, and for that kind of communication Rust offers exactly std::sync::mpsc. In fact, mpsc is simply short for Multiple Producer, Single Consumer.
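As a standalone illustration of the mpsc pattern, the sketch below spawns several producer threads that all feed one receiver. `run_producers` is a hypothetical helper, and plain `String` messages stand in for the real message type. It relies on `Sender` being `Clone`, which is one way to give each producer its own handle.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawn `producers` threads that each send `per_producer` messages to a
/// single receiver, then return everything the receiver collected.
fn run_producers(producers: usize, per_producer: usize) -> Vec<String> {
    let (sender, receiver) = mpsc::channel();
    let mut handles = Vec::new();
    for p in 0..producers {
        // `Sender` is `Clone`: every producer thread gets its own handle.
        let sender = sender.clone();
        handles.push(thread::spawn(move || {
            for i in 0..per_producer {
                sender.send(format!("producer-{} message-{}", p, i)).unwrap();
            }
        }));
    }
    // Drop the original sender so the channel closes once all clones are gone.
    drop(sender);
    for handle in handles {
        handle.join().unwrap();
    }
    // Iteration ends when every sender has been dropped.
    receiver.iter().collect()
}
```

Messages from a single producer arrive in the order they were sent, but messages from different producers can interleave, so a caller that needs a stable order has to sort the result.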

13.1 VideoConsumerMessage

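Let's set aside how the Producer is written for now and look at the Consumer side first.

The first thing to decide is what each message the Consumer receives, a VideoConsumerMessage, should contain. Since the goal is to download, the Video ID is a must; where to save the download is needed too, and so are the proxy settings. That gives a video_id of type String and a config of type PlaylistWatcherConfig.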

13.2 VideoConsumer

Once a VideoConsumerMessage arrives we have the video_id, so Video::from_video_id gives us a Video instance, and its pick_best_stream method then returns the best-quality VideoStream and AudioStream for it.

Those can then be handed to the Downloader from earlier.
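The consumer needs to keep waiting for messages while still being able to shut down. A minimal sketch of such a loop, combining `recv_timeout` with a shared stop flag, could look like this. `spawn_consumer` is a hypothetical name, the 50 ms timeout is an arbitrary choice, and the actual download is replaced by a log line.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Spawn a worker that handles messages until `should_stop` is set or every
/// sender is dropped. The worker returns how many messages it handled.
fn spawn_consumer(receiver: Receiver<String>, should_stop: Arc<AtomicBool>) -> JoinHandle<usize> {
    thread::spawn(move || {
        let timeout = Duration::from_millis(50);
        let mut handled = 0;
        while !should_stop.load(Ordering::Relaxed) {
            match receiver.recv_timeout(timeout) {
                Ok(video_id) => {
                    // The real consumer would look up the video and start a download here.
                    println!("[INFO] would download {}", video_id);
                    handled += 1;
                }
                // Nothing arrived in time: loop around and re-check the stop flag.
                Err(RecvTimeoutError::Timeout) => continue,
                // All senders are gone, so no further message can arrive.
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }
        handled
    })
}
```

Distinguishing `Timeout` from `Disconnected` lets the worker exit on its own when no producer is left, instead of polling forever.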

13.3 PlaylistWatcher

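So what does a PlaylistWatcher need?

First, it needs its PlaylistWatcherConfig; next, it needs the sender that belongs to the Single Consumer, since without a sender it cannot send any messages over.

After it starts watching, it uses Playlist::get_all_video_ids to find out which Video IDs the playlist currently contains.

Then it checks which Video IDs have not yet started downloading into the target directory. Each Video ID that passes the filter is bundled with the PlaylistWatcherConfig into a VideoConsumerMessage and sent through the sender.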
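The filtering step can be sketched on its own: a Video ID counts as pending when the target directory holds no `<id>.downloading` marker file for it. `pending_video_ids` is a hypothetical helper name used for this illustration.

```rust
use std::path::Path;

/// Keep only the ids that have no `<id>.downloading` marker inside `saveto`.
fn pending_video_ids(video_ids: &[String], saveto: &Path) -> Vec<String> {
    video_ids
        .iter()
        .filter(|video_id| {
            // Build the marker path and keep the id only if the file is absent.
            !saveto.join(format!("{}.downloading", video_id)).exists()
        })
        .cloned()
        .collect()
}
```

Using `Path::join` instead of string formatting for the directory part avoids having to think about trailing slashes in the configured save directory.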

13.4 Code of src/youtube/video_consumer.rs

use std::option::Option;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Sender, Receiver};
use std::time::Duration;
use std::thread::{self, JoinHandle};
use super::downloader::Downloader;
use super::playlist_watcher_config::PlaylistWatcherConfig;
use super::video::Video;

#[derive(Debug, Clone)]
pub struct VideoConsumerMessage {
    pub video_id: String,
    pub config: PlaylistWatcherConfig,
}

#[derive(Debug)]
pub struct VideoConsumer {
    sender: Arc<Mutex<Sender<VideoConsumerMessage>>>,
    receiver: Arc<Mutex<Receiver<VideoConsumerMessage>>>,
    should_stop: Arc<AtomicBool>,
    thread: Option<JoinHandle<()>>,
}

impl Drop for VideoConsumer {
    fn drop(&mut self) {
        self.should_stop.store(true, Ordering::Relaxed);
        if let Some(thread) = self.thread.take() {
            thread.join().unwrap();
        }
    }
}

impl VideoConsumer {
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::channel();
        VideoConsumer {
            should_stop: Arc::new(AtomicBool::new(false)),
            thread: None,
            sender: Arc::new(Mutex::new(sender)),
            receiver: Arc::new(Mutex::new(receiver)),
        }
    }
    
    pub fn consume(&mut self) {
        // consume in new thread
        // listen for stop
        let receiver = Arc::clone(&self.receiver);
        let should_stop = Arc::clone(&self.should_stop);
        self.thread = Some(thread::spawn(move || {
            let timeout = Duration::from_secs(1);
            loop {
                // break if should stop
                if should_stop.load(Ordering::Relaxed) { break; }
                
                // wait for a message from any sender, for at most `timeout`
                match receiver.lock().unwrap().recv_timeout(timeout) {
                    // this will be an instance of VideoConsumerMessage
                    Ok(message) => {
                        match Video::from_video_id(&message.video_id, &message.config.proxy, message.config.allow_fallback) {
                            Ok(video) => {
                                let (best_audio, best_video) = video.pick_best_stream();
                                if let Some(video_info) = best_video {
                                    if let Some(audio_info) = best_audio {
                                        // todo: find downloader
                                        let downloader = Downloader::new(audio_info, video_info, &message.config);
                                        downloader.start();
                                    } else {
                                        println!("[ERROR] Cannot find any audio track for: {}", message.video_id);
                                    }
                                } else {
                                    println!("[ERROR] Cannot find any video track for: {}", message.video_id);
                                }
                            },
                            Err(e) => println!("[ERROR] Cannot get video from video id while preparing download: {}", e),
                        }
                    },
                    Err(_) => (),
                }
            }
        }));
    }
    
    pub fn get_sender(&self) -> Arc<Mutex<Sender<VideoConsumerMessage>>> {
        self.sender.clone()
    }
}

13.5 Code of src/youtube/playlist_watcher.rs

extern crate reqwest;
extern crate select;

use crossbeam_channel::tick;
use std::option::Option;
use std::path::Path;
use std::string::String;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::time::Duration;
use std::thread;
use super::playlist::Playlist;
use super::playlist_watcher_config::PlaylistWatcherConfig;
use super::video_consumer::VideoConsumerMessage;

pub struct PlaylistWatcher {
    config: PlaylistWatcherConfig,
    sender: Arc<Mutex<Sender<VideoConsumerMessage>>>,
    should_stop: Arc<AtomicBool>,
    thread: Option<thread::JoinHandle<()>>,
}

impl Drop for PlaylistWatcher {
    fn drop(&mut self) {
        self.should_stop.store(true, Ordering::Relaxed);
        if let Some(thread) = self.thread.take() {
            thread.join().unwrap();
        }
    }
}

impl PlaylistWatcher {
    pub fn new(playlist_watcher_config: &PlaylistWatcherConfig, sender: Arc<Mutex<Sender<VideoConsumerMessage>>>) -> Self {
        PlaylistWatcher { 
            config: playlist_watcher_config.clone(),
            sender: sender,
            should_stop: Arc::new(AtomicBool::new(false)),
            thread: None,
        }
    }

    pub fn start(mut self) -> Self {
        let config = self.config.clone();
        let should_stop = Arc::clone(&self.should_stop);
        let sender = Arc::clone(&self.sender);
        self.thread = Some(thread::spawn(move || {
            let one_second = tick(Duration::from_secs(1));
            // convert minute to second
            let interval = config.interval.clone() * 60;
            loop {
                // wait for user defined intervals
                for _ in 0..interval {
                    // tick 1 sec at a time and check for exit
                    one_second.recv().unwrap();
                    if should_stop.load(Ordering::Relaxed) { return; }
                }
                
                match Playlist::get_all_video_ids(&config.playlist, &config.proxy, config.allow_fallback) {
                    // if successfully get ids of corresponding videos
                    Ok(video_ids) => {
                        // then for each of them
                        video_ids.iter().filter(|video_id| {
                            // build session file path for the video
                            let video_download_session = format!("{}/{}.downloading", config.saveto, video_id);
                            // check whether we already got the session file for the video or not
                            !Path::new(&*video_download_session).exists()
                        })
                        .map(|video_id| {
                            // for every video that has no session file,
                            // generate a VideoConsumerMessage
                            VideoConsumerMessage {
                                video_id: String::from(video_id),
                                config: config.clone(),
                            }
                        }).for_each(|video_consumer_message| {
                            // send each VideoConsumerMessage via the sender
                            sender.lock().unwrap().send(video_consumer_message).unwrap();
                        });
                    },
                    // if failed in fetching the webpage of that playlist
                    // don't panic, just print error message
                    Err(e) => println!("[ERROR] Error occurred while fetching playlist: {}", e),
                }
            }
        }));
        self
    }
    
    pub fn stop(&mut self) {
        self.should_stop.store(true, Ordering::Relaxed);
    }
}

14. YouTube module / crate

Once all the types above are implemented, they can be combined into a module or crate!

The complete src/youtube/mod.rs is as follows:

mod urldecode;
mod stream;

pub mod api;
pub mod audio_stream;
pub mod downloader;
pub mod error;
pub mod playlist;
pub mod playlist_watcher;
pub mod playlist_watcher_config;
pub mod video;
pub mod video_stream;
pub mod video_consumer;

pub use downloader::Downloader;
pub use error::YouTubeError;
pub use playlist::Playlist;
pub use playlist_watcher::PlaylistWatcher;
pub use playlist_watcher_config::PlaylistWatcherConfig;
pub use video::Video;
pub use video_stream::VideoStream;
pub use video_consumer::{VideoConsumer, VideoConsumerMessage};

15. YouTube Playlist Watcher & Downloader

Finally, put everything together in src/main.rs and we're pretty much done!

To respond to Ctrl+C, I additionally used the ctrlc crate.

The complete src/main.rs is as follows:

#[macro_use]
extern crate lazy_static;

extern crate ctrlc;
extern crate chrono;
extern crate reqwest;
extern crate select;

mod youtube;

use crate::youtube::{PlaylistWatcher, PlaylistWatcherConfig, VideoConsumer};
use std::error::Error;
use std::fs::File;
use std::io::BufReader;
use std::panic;
use std::path::Path;
use std::sync::mpsc;
use std::vec::Vec;

lazy_static! {
    static ref CONFIG: Vec<PlaylistWatcherConfig> = from_cli_args();
}

fn main() {
    // this channel will communicate between main thread and ctrlc handler
    let (exit_sender, exit_event_receiver) = mpsc::channel();
    
    // set handler for Ctrl+C
    ctrlc::set_handler(move || {
        // which will notify the receiver on main thread
        exit_sender.send(()).unwrap();
        println!("[INFO] Ctrl+C event received");
    }).expect("Error setting Ctrl-C handler");
    
    // create global youtube video consumer
    let mut global_youtube_video_consumer = VideoConsumer::new();
    let video_sender = global_youtube_video_consumer.get_sender();
    
    // lazy load config files
    let mut watchers: Vec<PlaylistWatcher> = CONFIG.iter().map(|playlist_watcher_config| {
        // then maps every playlist to a watcher
        let watcher = PlaylistWatcher::new(playlist_watcher_config, video_sender.clone());
        // and start watching
        watcher.start()
    }).collect::<Vec<PlaylistWatcher>>();
    
    // start consuming message
    global_youtube_video_consumer.consume();
    
    // block until we receive the Ctrl+C event
    let _ = exit_event_receiver.recv().unwrap();
    
    // then we shutdown all watcher
    println!("[INFO] Shutting down...");
    for watcher in &mut watchers { watcher.stop() }
    println!("[INFO] Done");
}

fn from_cli_args() -> Vec<PlaylistWatcherConfig> {
    let args: Vec<String> = std::env::args().collect();
    if args.len() < 2 {
        panic!("[ERROR] no config file provided.");
    }
    
    match load_config(&*args[1]) {
        Ok(value) => value,
        Err(e) => panic!("[ERROR] Cannot read config file: {}", e),
    }
}

fn load_config<P: AsRef<Path>>(conf_path: P) -> Result<Vec<PlaylistWatcherConfig>, Box<dyn Error>> {
    // https://docs.serde.rs/serde_json/fn.from_reader.html#example

    // Open the file in read-only mode with buffer.
    let file = File::open(conf_path)?;
    let reader = BufReader::new(file);

    // Read the JSON contents of the file as an instance of `Vec<PlaylistWatcherConfig>`.
    let u = serde_json::from_reader(reader)?;

    // Return the `Vec<PlaylistWatcherConfig>`
    Ok(u)
}

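16. Afterword

The Downloader and VideoConsumer parts could still be refactored. The idea is to split out a VideoDownloader and add a PostDownload:

  1. VideoConsumer sends the Video instance straight to VideoDownloader
  2. VideoDownloader then picks the DownloadableStream the user asked for
  3. Downloader is then used to download the VideoStream and AudioStream
  4. Finally, PostDownload runs the user-defined post_download

But! After writing this big pile over the past couple of days, I think I might not feel like writing Rust for a little while hahahaha ♪(´ε` )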
