在圣诞节的时候一个人回顾了一下 LoveLive μ's 3rd Live,然后就做了这个~从视频里提取每个时间点的主要色彩并生成一张大的图。
假如视频时长为 01:43:25,每隔 1 秒计算一次其画面的主要色彩,并且在画出高度为 120 像素,宽度为 1 像素的 colorline 的话,就可以组成下面这样图的啦~
那么理论上长度就是 103 * 60 + 25 = 6205 秒,也就是 6205 像素。但是实际上需要注意的是 FPS 的获取,视频时长的计算和如何选择帧。因为视频的 FPS 可能并不是一个整数,而是类似 29.97
这样的浮点数,可是在 OpenCV 里并不支持按秒读取。于是只好先获取视频的 FPS 与 FRAME_COUNT。
但是如果直接 as i32
的话,原本 29.97
的 FPS 就会变成 29
,那么显然对视频时长的计算就会出错,6205 * 29.97 / 29 = 6412 秒,足足多了 207 秒!(那么问题来了)同时也会导致生存的图片在时间轴上不够精确。因此在代码上需要注意一下~
那比如在 3135 像素附近,开始出现了很多蓝色的条~粗略算一下也就是视频的 52:15 附近,这个显然就是「賢い、かわいい —— エリーチカ!」
(下面两张截图是在对应的区段里随便找的~并不是严格对应到起点的)
然后到了 3310 像素附近是大段连续紫色出现的起点~也就是视频里 55:10 的地方~「希パワーたーっぷり注入 はーいっプシュ!」
那么实现起来的话,也并不算是很复杂~项目的源代码在 GitHub 上~#/colorline
1. 整体思路
首先需要视频文件的路径,然后用户要求每隔多少秒计算一次画面的主要色彩。接下来打开文件之后,计算视频以秒为单位的长度以及按照要求的话,需要产生多少根 Colorline。
随后启动一个抽取视频每一帧的线程,Video Extraction Thread
。因为视频并不能真正的随机访问某一帧(可参考视频编码原理, P 帧、I 帧的概念等),故将会顺序遍历一次所有的 frame。
接下来到了应该被抽取出来计算主要彩色的帧的时候,为了利用好 CPU 资源,肯定会放到别的线程上去计算,不会放在 Video Extraction Thread
里做 。但是每到一个都另起一个线程的话,显然一会儿线程的数量可能就爆炸了~
那么我们就用一个线程池来做。这个线程池里的每一个 worker 接收的参数是一个需要计算的画面,以及这个画面对应的 Colorline 的 index。一个 worker 一次只负责计算某一帧的主要色彩,然后将计算结果与对应的 Colorline 的 index用 channel
送回。
为什么不直接画在 Mat
上?理论上这些线程访问的内存资源都不会有冲突,但是 Rust 里 OpenCV bindings 的 Mat
是不能在线程之间共享的(要么就一路 unsafe 走起,但那样似乎不如直接用 C++ 写了)。
所以对于线程池 worker threads 的计算结果,我们需要另外 spawn 一个 Worker-Thread Gather Thread
,它负责收集所有 worker 计算的结果。如何判断收集完呢?我们已经有了视频长度与用户希望的间隔时间,那么提前就可以计算出一共会从 channel
中收到多少 message。
最后,用户可以 poll 我们的计算结果。如果已经计算完成,但是还没生成最后的图,那么就生成好 Mat
然后返回;如果还没好的话,就告知 InProgress
;要是中间什么环节出错的话,就返回 Err
好了,基本的想法确定下来了,下面就可以开始写大坑了(*^3^)
2. K-Means 聚类计算图片主要颜色
这个其实蛮久以前写过 C++ 版本的 2333333,K-means 聚类算法计算给定图像中主要颜色。那么这里基本上可以是直接翻译到 Rust 上
那么我们来拆解一下这句话「K-Means 聚类计算图片主要颜色」—— K-Means 聚类计算
图片主要
颜色
2.1 Color
首先我们关注到「颜色」。那肯定给「颜色」一个 struct 吧,BGR 三个属性跑不了。然后「颜色」之间至少可以比较相同还是不同,于是需要实现 std::cmp::{PartialEq, Eq}
这两个 traits。
再接着关注到「主要」,那哪怕使用统计的方法,也要知道图片上有的每一个不同的颜色被多少个 Pixel 使用了吧。因此需要一个 HashMap 来做 counter。这样的话,「颜色」就还需要实现一下 std::hash::Hash
的 trait。
那么完整的 src/colorline/color.rs
如下
use std::cmp::{Eq, PartialEq}; use std::hash::{Hash, Hasher}; #[derive(Debug, Clone, Copy)] pub struct Color { pub b: f64, pub g: f64, pub r: f64, } impl Color { pub fn new(b: f64, g: f64, r: f64) -> Self { Color {b,g,r} } } impl PartialEq for Color { fn eq(&self, other: &Self) -> bool { (self.b == other.b) && (self.g == other.g) && (self.r == other.r) } } impl Eq for Color { } impl Hash for Color { fn hash<H: Hasher>(&self, state: &mut H) { self.b.to_string().hash(state); self.g.to_string().hash(state); self.r.to_string().hash(state); } }
2.2 K-means 聚类
这个其实就很简单了,随机选取 k
个点作为 k
个类的中心,然后对于所有的点,都 assign 给离它最近的类,然后重新计算每个类的中心,以及 k
个类中,最大的新旧中心的距离 $d$。如果 $d$ 小于一个预设的值,比如 $1.0$,那么视为“稳定”,返回 $k$ 个类的中心。否则则迭代上述过程。
重新计算每个类的中心则是,用每个颜色 $C_i$ 的 BGR 分量,乘上这个颜色出现的次数 $C^i_c$,然后各分量相加的结果,除以出现次数平方的和 $\sum^{i=0}_n (C^i_c)$。
完整的 src/colorline/dominant_color.rs
如下,
use opencv::core::Vec3b; use opencv::core::Mat; use super::color::Color; use super::error::ColorlineError; use std::collections::HashMap; // (Color, count) type ColorCount = (Color, u64); type Cluster = Vec<Color>; type ClusteredPoints = HashMap<u32, Vec<ColorCount>>; /// Calculate `k` dominant color of given image /// /// # Example /// ``` /// let image = opencv::imgcodecs::imread("/PATH/TO/A/IMAGE", 1).expect("Invalid image file"); /// let color = dominant_color(&image, 3); /// ``` pub fn dominant_color(image: &Mat, k: u32) -> Result<Cluster, ColorlineError> { // try to get cols and rows of given image let cols = image.cols()?; let rows = image.rows()?; // color counter let mut color_counter: HashMap<Color, u64> = HashMap::new(); // calculate appeared times of each existing color // and given the memory layout // we access cols in outter loop for col in 0..cols { // and access rows in inner loop for row in 0..rows { // assuming CV_8U3C for each pixel let pixel: &Vec3b = image.at_2d(row, col)?; // new `Color` from pixel // using f64 (f32 will do too) for k-means calculation // otherwise the calculation will be divergent let color = Color::new(pixel[0] as f64, pixel[1] as f64, pixel[2] as f64); // and increase the corresponding entry by 1 if let Some(color_entry) = color_counter.get_mut(&color) { *color_entry += 1; } else { color_counter.insert(color, 1); } } } // transform HashMap to Vec for k-means algorithm let pixels: Vec<ColorCount> = color_counter.iter().map(|(&color, &count)| (color, count)).collect(); // k-means Ok(kmeans(&pixels, k)) } /// K-means fn kmeans(pixels: &Vec<ColorCount>, k: u32) -> Cluster { let mut clusters: Cluster = vec![]; let randmax: usize = pixels.len(); // randomly choose k points as initial cluster center for _ in 0..k { clusters.push(pixels[rand::random::<usize>() % randmax].0); } loop { // initialize k clusters in this round let mut points: ClusteredPoints = HashMap::new(); for i in 0..k { let value: Vec<ColorCount> = vec![]; points.insert(i, value); } // find the nearest cluster for each pixel for pixel in pixels { // get current pixel color let color = pixel.0; let mut nearest_distance = std::f64::MAX; let mut nearest_index: u32 = 0; // calcuate the distance to each cluster for i in 0..k { // calculate the distance between current pixel from i-th clsuter let distance = euclidean_distance(&color, &clusters[i as usize]); // if the distance is nearer if distance < nearest_distance { // update neatest distance nearest_distance = distance; // update the cluster id to current pixel nearest_index = i; } } // assign current pixel to its nearest cluster if let Some(points_entry) = points.get_mut(&nearest_index) { (*points_entry).push((pixel.0, pixel.1)); } } // recalculate center for each cluster let mut diff: f64 = 0.0; for i in 0..k { // store old center let old_center = clusters[i as usize]; // compute new center let new_center = compute_center(&points[&i]); // the distance that center moved let dist = euclidean_distance(&old_center, &new_center); // assign new center to cluster[i] clusters[i as usize] = new_center; // record max moved distance among `k` clusters if dist > diff { diff = dist; } } // if it's "stable" -- // we assuming it's stable if the max moved distance is below 1.0 if diff < 1.0 { break; } } clusters } fn euclidean_distance(a: &Color, b: &Color) -> f64 { let mut distance: f64 = 0.0; distance += f64::powf((a.b - b.b) as f64, 2.0); distance += f64::powf((a.g - b.g) as f64, 2.0); distance += f64::powf((a.r - b.r) as f64, 2.0); f64::sqrt(distance) } fn compute_center(colors: &Vec<ColorCount>) -> Color { let mut total_count: f64 = 0.0; let mut vals: (f64, f64, f64) = (0.0, 0.0, 0.0); colors.iter().for_each(|(color, count)| { let count = *count as f64; total_count += count; vals.0 += color.b * count; vals.1 += color.g * count; vals.2 += color.r * count; }); vals.0 /= total_count; vals.1 /= total_count; vals.2 /= total_count; Color::new(vals.0, vals.1, vals.2) }
3. Colorline
Colorline 这边就选一些重点的来说吧~按照前面的思路 ——
接下来到了应该被抽取出来计算主要彩色的帧的时候,为了利用好 CPU 资源,肯定会放到别的线程上去计算,不会放在 Video Extraction Thread
里做 。但是每到一个都另起一个线程的话,显然一会儿线程的数量可能就爆炸了~
那么我们就用一个线程池来做。这个线程池里的每一个 worker 接收的参数是一个需要计算的画面,以及这个画面对应的 Colorline 的 index。一个 worker 一次只负责计算某一帧的主要色彩,然后将计算结果与对应的 Colorline 的 index用 channel
送回。
同时,因为 extraction 的速度可能快于从画面中提取主要色彩的速度,因此需要在增加到 ThreadPool 之前判断一下正在队列中的任务有多少。这里的话,我在等待的队列数是用户要求的线程数的 3 倍的时候暂停,等到 queued tasks 降到少于用户要求的线程数时就可以继续了~
fn start_video_extraction_thread(&mut self) -> Result<(), ColorlineError> { // try to get frame count of requested video let total_frames = self.video_capture.lock().unwrap().get(CAP_PROP_FRAME_COUNT)? as f64; // try to get the fps let fps = self.video_capture.lock().unwrap().get(CAP_PROP_FPS)? as f64; // calculate video duration in seconds let seconds = (total_frames / fps) as i32; // generate a colorline with every `interval` of orginal video let interval = self.option.interval(); // calculate number of messages it should produce let expected_num_message = (seconds / interval) as u32; self.num_message.store(expected_num_message, Ordering::Relaxed); // user requested colorline width let colorline_width = self.option.pixels() as u32; // create ARC copy so that `video_caputure` can be moved to sub-thread of extracting frames let video_capture = Arc::clone(&self.video_capture); // create a clone of `should_stop` let should_stop = Arc::clone(&self.should_stop); // create a clone of `is_processing` let is_processing = Arc::clone(&self.is_processing); // copy number of threads let num_threads = self.num_threads; // clone threaded sender for workers let sender = self.worker_thread_sender.clone(); // video extraction self.extraction_thread = Some(thread::spawn(move|| -> Result<(), ColorlineError> { // create threadpool let pool = ThreadPool::new(num_threads); // initialize colorline index let mut colorline_index: u32 = 0; // and current frame to 1.0 as f64 let mut current_frame: f64 = 1.0; let mut expected_frame_index: i32 = 0; while current_frame <= total_frames { // try to read the next frame from video let mut frame = Mat::default()?; video_capture.lock().unwrap().read(&mut frame)?; // if successfully read if frame.size()?.width > 0 { // calculate current frame index // in Rust, when cast float to integer, // the operation is equivalent to floor() let floor_frame_index = (current_frame / fps) as i32; // if it's the first time it reaches to the next frame if floor_frame_index == expected_frame_index { // we will calculate the dominant color of this frame // also increase the next expected frame index by 1 expected_frame_index += 1; // if the number of queued tasks is 3 times as the number of threads if pool.queued_count() > num_threads * 3 { // then we wait until queued tasks is less than the number of threads println!("[INFO] waiting for queued tasks: {}", pool.queued_count()); let ticker = tick(Duration::from_millis(1000)); loop { ticker.recv().unwrap(); if pool.queued_count() < num_threads { break; } } } // worker context let worker_colorline_index = colorline_index; let worker_current_frame = floor_frame_index; let worker_sender = (&sender).clone(); let mut copied_frame = Mat::default()?; frame.copy_to(&mut copied_frame)?; // calculate dominant color pool.execute(move|| { let colors = dominant_color(&copied_frame, 3).expect(&*format!("Error occurred at frame: {}", worker_current_frame)); worker_sender.send(ColorlineWorkerMessage::new(worker_colorline_index, colors[0])).unwrap(); }); // move to the next colorline colorline_index += colorline_width; } } else { return Err(ColorlineError {reason: format!("Cannot read frame at {}", current_frame)}); } // check whether received stop event if should_stop.load(Ordering::Relaxed) { // if received // then stop processing is_processing.store(false, Ordering::Relaxed); // and just return ok return Ok(()) } // incr frame index by 1 current_frame += 1.0; } Ok(()) })); Ok(()) }
接下来对于线程池 worker threads 的计算结果,我们需要另外 spawn 一个 Worker-Thread Gather Thread
,它负责收集所有 worker 计算的结果。如何判断收集完呢?我们已经有了视频长度与用户希望的间隔时间,那么提前就可以计算出一共会从 channel
中收到多少 message。
fn start_worker_gather_thread(&mut self) { // create a clone of `should_stop` let should_stop = Arc::clone(&self.should_stop); // create a clone of `is_processing` let is_processing = Arc::clone(&self.is_processing); // create a clone of `num_message` let num_message = Arc::clone(&self.num_message); // clone threaded sender for workers let receiver = self.worker_thread_receiver.clone(); // clone internal result for workers let internal_result = Arc::clone(&self.internal_result); // clone internal result for workers let is_done = Arc::clone(&self.is_done); // gather worker result self.worker_gather_thread = Some(thread::spawn(move|| -> Result<(), ColorlineError> { let timeout = Duration::from_millis(500); loop { match receiver.recv_timeout(timeout) { Ok(message) => { let mut guarded_internal_result = internal_result.lock().unwrap(); guarded_internal_result.push(message); if num_message.load(Ordering::Relaxed) as usize == guarded_internal_result.len() { is_done.store(true, Ordering::Relaxed); should_stop.store(true, Ordering::Relaxed); break; } }, Err(_) => (), } // check whether received stop event if should_stop.load(Ordering::Relaxed) { // if received // then stop processing is_processing.store(false, Ordering::Relaxed); // and just break from loop break; } } Ok(()) })); }
最后,用户可以 poll 我们的计算结果。这里就定义一个 ColorlinePoll
的枚举类型
pub enum ColorlinePoll<'a> { // something went erong Err(ColorlineError), // proceeded colorlines, total colorlines InProgress(usize, usize), // oh yeah! Ready(&'a Mat), }
当用户 poll 的时候,如果还没好的话,在还没超时之前,每过 100 ms 就去查询一次是否计算完成;
如果超时了还没计算完成的话,则就告知 InProgress
;如果在超时前计算完成,或者已经计算完成,但是还没生成最后的图,那么就生成好 Mat
然后返回;要是中间什么环节出错的话,就返回 Err
。
pub fn recv_timeout(&mut self, timeout: Duration) -> ColorlinePoll { let has_result_generated = self.has_result_generated.lock().unwrap(); if has_result_generated.load(Ordering::Relaxed) { return self.get_result(); } let ticker = tick(Duration::from_millis(100)); let now = Instant::now(); loop { // query `is_done` if we are not yet timeout if now.elapsed() < timeout { // if it's not done yet if self.is_done.load(Ordering::Relaxed) == false { // then we wait for 100ms ticker.recv().unwrap(); } else { // otherwise, it's done now // then we calculate image size let colorline_width = self.option.pixels() as i32; let num_message = self.num_message.load(Ordering::Relaxed) as i32; // the rows / height should be self.option.height() // the cols / width should be colorline_width * num_message // however, because of the memory layout (and also partly due to the limitation of Rust OpenCV bindings) // we have to write every single pixel by ourselves // that's to say, to have a better performance as much as we can // we will access the image firstly by rows in the outter loop // and then by cols in the inner loop (for each column at i-th row...) // the image will be "rotated 90 degrees clockwise" when we allocate it let rows = colorline_width * num_message; let cols = self.option.height() as i32; let mut result = match Mat::new_rows_cols_with_default(rows, cols, CV_8UC3, Scalar::new(0.0, 0.0, 0.0, 0.0)) { Ok(mat) => mat, Err(e) => return ColorlinePoll::Err(ColorlineError { reason: format!("Cannot allocate new opencv::core::Mat: {}", e) }), }; let colorlines = self.internal_result.lock().unwrap(); let height = self.option.height(); for colorline in colorlines.iter() { let row_start = colorline.index() as i32; let row_end = row_start + colorline_width; let color = colorline.color(); for row in row_start..row_end { for col in 0..height { // for each column at i-th row let pixel: &mut Vec3b = match result.at_2d_mut(row, col) { Ok(pixel) => pixel, Err(e) => return ColorlinePoll::Err(ColorlineError { reason: format!("Cannot access pixel at ({}, {}): {}", row, col, e) }), }; // write calculated color pixel[0] = color.b as u8; pixel[1] = color.g as u8; pixel[2] = color.r as u8; } } } // now we try to rotate the image 90 degrees counter clockwise let mut rotated_result = match Mat::default() { Ok(mat) => mat, Err(e) => return ColorlinePoll::Err(ColorlineError { reason: format!("Cannot allocate new opencv::core::Mat: {}", e) }), }; let _ = rotate(&result, &mut rotated_result, ROTATE_90_COUNTERCLOCKWISE); // store the result self.result = Some(rotated_result); has_result_generated.store(true, Ordering::Relaxed); return self.get_result(); } } else { // if we're timeout // report that we're in progress return ColorlinePoll::InProgress(self.internal_result.lock().unwrap().len(), self.num_message.load(Ordering::Relaxed) as usize) } } }