Rust Learning Notes from Scratch (15): Colorline

On Christmas I rewatched LoveLive μ's 3rd Live on my own, and then made this~ It extracts the dominant color at each point in time in a video and stitches them into one big image.

Suppose the video is 01:43:25 long. If we compute the dominant color of the picture once every second, and draw each result as a colorline 120 pixels tall and 1 pixel wide, we can compose an image like the one below~

In theory the width is then 103 * 60 + 25 = 6205 seconds, i.e. 6205 pixels. In practice, though, some care is needed around getting the FPS, computing the video duration, and choosing which frames to sample. The video's FPS may not be an integer but a float like 29.97, and OpenCV has no way to read frames by the second, so we first have to fetch the video's FPS and FRAME_COUNT.

But if we simply cast it with `as i32`, an FPS of 29.97 becomes 29, and the duration calculation goes visibly wrong: 6205 * 29.97 / 29 ≈ 6412 seconds, a whole 207 seconds too long! It also makes the generated image imprecise along the time axis. So the code needs to be a little careful here~
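For reference, a minimal sketch of that duration calculation (assuming a recent version of the opencv crate; the path is a placeholder). The point is to keep everything in f64 and only truncate at the very end:

use opencv::prelude::*;
use opencv::videoio::{VideoCapture, CAP_ANY, CAP_PROP_FPS, CAP_PROP_FRAME_COUNT};

fn video_duration_secs(path: &str) -> opencv::Result<f64> {
    let cap = VideoCapture::from_file(path, CAP_ANY)?;
    let fps = cap.get(CAP_PROP_FPS)?;                   // f64, e.g. 29.97
    let total_frames = cap.get(CAP_PROP_FRAME_COUNT)?;  // also f64
    // dividing the f64s directly avoids the truncation error of `fps as i32`
    Ok(total_frames / fps)
}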

For example, around pixel 3135 a run of blue bars starts to appear~ A rough calculation puts that near 52:15 in the video, which is obviously 「賢い、かわいい —— エリーチカ!」

(The two screenshots below were just picked from somewhere inside the corresponding segments~ they don't line up exactly with the starting points.)

Then around pixel 3310 is the start of a long continuous stretch of purple~ that's at 55:10 in the video~ 「希パワーたーっぷり注入 はーいっプシュ!」

Implementation-wise it isn't all that complicated~ The project's source code is on GitHub~ #/colorline

1. The Overall Idea

First we need the path to the video file, plus how many seconds the user wants between each dominant-color calculation. After opening the file, we compute the video's length in seconds and, from the requested interval, how many colorlines will be produced.

Next we start a thread that extracts every frame of the video: the Video Extraction Thread. A video can't really be randomly accessed at an arbitrary frame (see how video codecs work, e.g. the concepts of P-frames and I-frames), so we walk through all frames sequentially in a single pass.

When we reach a frame whose dominant color should be computed, we naturally want to push the computation onto other threads to make good use of the CPU, rather than doing it inside the Video Extraction Thread. But spawning a fresh thread for every such frame would obviously make the thread count explode in no time~

So we use a thread pool instead. Each worker in the pool receives the frame whose color needs computing, plus the index of the colorline that frame maps to. A worker computes the dominant color of one frame at a time, then sends the result together with the colorline index back through a channel.
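Concretely, each message carries the colorline index and the computed color, something like this (an illustrative sketch only; the repo's actual type is ColorlineWorkerMessage, and Color is the struct we'll build in section 2):

use super::color::Color;

// the payload a worker sends back over the channel
struct WorkerMessage {
    colorline_index: u32, // which colorline this result belongs to
    color: Color,         // the dominant color computed for that frame
}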

Why not draw straight onto the Mat? In theory these threads would never touch conflicting memory, but the Mat in the Rust OpenCV bindings can't be shared between threads (unless you go all-in on unsafe, at which point you might as well just write it in C++).

So, for the results computed by the pool's worker threads, we spawn one more thread, the Worker-Thread Gather Thread, which collects everything the workers produce. How do we know when we're done collecting? We already know the video length and the user's requested interval, so we can work out ahead of time how many messages will arrive on the channel.

Finally, the user can poll for the result. If the computation has finished but the final image hasn't been generated yet, we build the Mat and return it; if it isn't done yet, we answer InProgress; and if anything failed along the way, we return Err.

Alright, with the basic idea pinned down, it's time to start digging this big hole (*^3^)

2. Computing an Image's Dominant Colors with K-Means Clustering

I actually wrote a C++ version of this ages ago (lol): K-means clustering to compute the dominant colors of a given image. So here it can basically be translated straight into Rust.

Let's break that phrase down, 「K-Means clustering to compute an image's dominant colors」: colors, dominant, and the K-Means clustering itself.

2.1 Color

First, 「colors」. A color obviously deserves its own struct, and the three BGR components are a given. Colors should at least be comparable for equality, so we need to implement the std::cmp::{PartialEq, Eq} traits.

Next, 「dominant」: even with a plain statistical approach, we need to know how many pixels use each distinct color in the image, so we want a HashMap as a counter. For that, 「colors」 also has to implement the std::hash::Hash trait.

The complete src/colorline/color.rs looks like this:

use std::cmp::{Eq, PartialEq};
use std::hash::{Hash, Hasher};

#[derive(Debug, Clone, Copy)]
pub struct Color {
    pub b: f64,
    pub g: f64,
    pub r: f64,
}

impl Color {
    pub fn new(b: f64, g: f64, r: f64) -> Self {
        Color { b, g, r }
    }
}

impl PartialEq for Color {
    fn eq(&self, other: &Self) -> bool {
        (self.b == other.b) && (self.g == other.g) && (self.r == other.r)
    }
}

impl Eq for Color {}

impl Hash for Color {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // f64 doesn't implement Hash; every value here comes from a u8
        // cast (so no NaN or -0.0), which keeps this string-based hash
        // consistent with the PartialEq impl above
        self.b.to_string().hash(state);
        self.g.to_string().hash(state);
        self.r.to_string().hash(state);
    }
}
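As a quick sanity check (a hypothetical snippet, not part of the repo), Color can now be used as a HashMap key:

use std::collections::HashMap;

// two inserts of the same (made-up) color should land in the same
// bucket thanks to the Hash / Eq impls above
let mut counter: HashMap<Color, u64> = HashMap::new();
*counter.entry(Color::new(12.0, 34.0, 56.0)).or_insert(0) += 1;
*counter.entry(Color::new(12.0, 34.0, 56.0)).or_insert(0) += 1;
assert_eq!(counter[&Color::new(12.0, 34.0, 56.0)], 2);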

2.2 K-Means Clustering

This part is straightforward: randomly pick k points as the centers of k clusters; assign every point to the cluster whose center is nearest; recompute each cluster's center; and track $d$, the largest distance any of the k centers moved. If $d$ falls below a preset threshold, say $1.0$, we call it "stable" and return the $k$ cluster centers. Otherwise, iterate the above.

Recomputing a cluster's center works like this: take each color $C_i$'s BGR components, multiply them by the number of times that color appears, $c_i$; sum each component over the cluster; then divide by the total count $\sum_{i=1}^{n} c_i$. In other words the new center is the count-weighted mean $\bar{C} = \frac{\sum_{i=1}^{n} c_i \, C_i}{\sum_{i=1}^{n} c_i}$. For example, if blue values 10 (count 2) and 40 (count 1) fall in a cluster, the blue component of its center is $(10 \cdot 2 + 40 \cdot 1) / 3 = 20$.

The complete src/colorline/dominant_color.rs follows.

use opencv::core::Vec3b;
use opencv::core::Mat;
use super::color::Color;
use super::error::ColorlineError;
use std::collections::HashMap;

// (Color, count)
type ColorCount = (Color, u64);
type Cluster = Vec<Color>;
type ClusteredPoints = HashMap<u32, Vec<ColorCount>>;

/// Calculate the `k` dominant colors of the given image
///
/// # Example
/// ```
/// let image = opencv::imgcodecs::imread("/PATH/TO/A/IMAGE", 1).expect("Invalid image file");
/// let color = dominant_color(&image, 3);
/// ```
pub fn dominant_color(image: &Mat, k: u32) -> Result<Cluster, ColorlineError> {
    // try to get cols and rows of given image
    let cols = image.cols()?;
    let rows = image.rows()?;
    
    // color counter
    let mut color_counter: HashMap<Color, u64> = HashMap::new();
    
    // count how many times each distinct color appears;
    // the Mat data is row-major, so iterate rows in the outer loop
    // for cache-friendly access
    for row in 0..rows {
        // and columns in the inner loop
        for col in 0..cols {
            // assuming CV_8UC3 for each pixel
            let pixel: &Vec3b = image.at_2d(row, col)?;
            // build a `Color` from the pixel,
            // using f64 (f32 would work too) so the k-means averaging
            // doesn't truncate and fail to converge
            let color = Color::new(pixel[0] as f64, pixel[1] as f64, pixel[2] as f64);
            
            // and bump the counter for this color
            *color_counter.entry(color).or_insert(0) += 1;
        }
    }
    
    // flatten the HashMap into a Vec for the k-means pass
    let pixels: Vec<ColorCount> = color_counter.into_iter().collect();
    
    // k-means
    Ok(kmeans(&pixels, k))
}

/// K-means
fn kmeans(pixels: &[ColorCount], k: u32) -> Cluster {
    let mut clusters: Cluster = vec![];
    let randmax: usize = pixels.len();

    // randomly choose k points as initial cluster center
    for _ in 0..k {
        clusters.push(pixels[rand::random::<usize>() % randmax].0);
    }
    
    loop {
        // initialize k clusters in this round
        let mut points: ClusteredPoints = HashMap::new();
        for i in 0..k {
            points.insert(i, vec![]);
        }
        
        // find the nearest cluster for each pixel
        for pixel in pixels {
            // get current pixel color
            let color = pixel.0;
            let mut nearest_distance = std::f64::MAX;
            let mut nearest_index: u32 = 0;
            
            // calculate the distance to each cluster
            for i in 0..k {
                // distance between the current pixel and the i-th cluster
                let distance = euclidean_distance(&color, &clusters[i as usize]);
                // if this cluster is nearer
                if distance < nearest_distance {
                    // update the nearest distance
                    nearest_distance = distance;
                    // and remember this cluster for the current pixel
                    nearest_index = i;
                }
            }
            
            // assign current pixel to its nearest cluster
            if let Some(points_entry) = points.get_mut(&nearest_index) {
                points_entry.push(*pixel);
            }
        }
        
        // recalculate center for each cluster
        let mut diff: f64 = 0.0;
        for i in 0..k {
            // a cluster can end up empty (e.g. two initial centers landed
            // on the same point); keep the old center to avoid a division
            // by zero (NaN) in compute_center
            if points[&i].is_empty() {
                continue;
            }
            // store old center
            let old_center = clusters[i as usize];
            // compute new center
            let new_center = compute_center(&points[&i]);
            // the distance the center moved
            let dist = euclidean_distance(&old_center, &new_center);
            // assign new center to clusters[i]
            clusters[i as usize] = new_center;
            // record the max moved distance among the `k` clusters
            if dist > diff {
                diff = dist;
            }
        }
        
        // if it's "stable" --
        //     we consider it stable once the max moved distance drops below 1.0
        if diff < 1.0 {
            break;
        }
    }
    
    clusters
}

fn euclidean_distance(a: &Color, b: &Color) -> f64 {
    let mut distance: f64 = 0.0;
    distance += (a.b - b.b).powi(2);
    distance += (a.g - b.g).powi(2);
    distance += (a.r - b.r).powi(2);
    distance.sqrt()
}

fn compute_center(colors: &[ColorCount]) -> Color {
    let mut total_count: f64 = 0.0;
    let mut vals: (f64, f64, f64) = (0.0, 0.0, 0.0);
    
    colors.iter().for_each(|(color, count)| {
        let count = *count as f64;
        total_count += count;
    
        vals.0 += color.b * count;
        vals.1 += color.g * count;
        vals.2 += color.r * count;
    });
    
    vals.0 /= total_count;
    vals.1 /= total_count;
    vals.2 /= total_count;
    
    Color::new(vals.0, vals.1, vals.2)
}
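To sanity-check kmeans we can feed it synthetic data (a hypothetical test, not in the repo; with random initialization the exact centers vary, so only coarse properties are asserted):

#[test]
fn kmeans_returns_k_centers() {
    // two well-separated groups of colors with equal weights
    let pixels = vec![
        (Color::new(10.0, 10.0, 10.0), 100),
        (Color::new(12.0, 12.0, 12.0), 100),
        (Color::new(200.0, 200.0, 200.0), 100),
        (Color::new(202.0, 202.0, 202.0), 100),
    ];
    let centers = kmeans(&pixels, 2);
    // we always get k centers back, each within the input value range
    assert_eq!(centers.len(), 2);
    for c in &centers {
        assert!(c.b >= 10.0 && c.b <= 202.0);
    }
}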

3. Colorline

For the Colorline side I'll just pick out the key parts~ Following the idea from earlier:

When we reach a frame whose dominant color should be computed, we naturally want to push the computation onto other threads to make good use of the CPU, rather than doing it inside the Video Extraction Thread. But spawning a fresh thread for every such frame would obviously make the thread count explode in no time~

So we use a thread pool instead. Each worker in the pool receives the frame whose color needs computing, plus the index of the colorline that frame maps to. A worker computes the dominant color of one frame at a time, then sends the result together with the colorline index back through a channel.

At the same time, since extraction may run faster than the dominant-color computation, we need to check how many tasks are already queued before handing more to the ThreadPool. Here I pause once the queue reaches 3 times the user-requested thread count, and resume as soon as the queued tasks drop back below the thread count~

fn start_video_extraction_thread(&mut self) -> Result<(), ColorlineError> {
    // try to get the frame count of the requested video
    // (`get` already returns f64, so no lossy cast is involved)
    let total_frames = self.video_capture.lock().unwrap().get(CAP_PROP_FRAME_COUNT)?;
    // try to get the fps, kept as f64 (e.g. 29.97) to avoid truncation
    let fps = self.video_capture.lock().unwrap().get(CAP_PROP_FPS)?;
    // calculate video duration in seconds
    let seconds = (total_frames / fps) as i32;
    // the user wants one colorline for every `interval` seconds of the original video
    let interval = self.option.interval();
    // calculate number of messages it should produce
    let expected_num_message = (seconds / interval) as u32;
    self.num_message.store(expected_num_message, Ordering::Relaxed);
    // user requested colorline width
    let colorline_width = self.option.pixels() as u32;
    
    // create an Arc clone so that `video_capture` can be moved into the frame-extraction sub-thread
    let video_capture = Arc::clone(&self.video_capture);
    // create a clone of `should_stop`
    let should_stop = Arc::clone(&self.should_stop);
    // create a clone of `is_processing`
    let is_processing = Arc::clone(&self.is_processing);
    // copy number of threads
    let num_threads = self.num_threads;
    // clone threaded sender for workers
    let sender = self.worker_thread_sender.clone();
    
    // video extraction
    self.extraction_thread = Some(thread::spawn(move || -> Result<(), ColorlineError> {
        // create threadpool
        let pool = ThreadPool::new(num_threads);
        
        // initialize colorline index
        let mut colorline_index: u32 = 0;
        // and the current frame counter, starting from 1.0 (f64)
        let mut current_frame: f64 = 1.0;
        let mut expected_frame_index: i32 = 0;
        
        while current_frame <= total_frames {
            // try to read the next frame from video
            let mut frame = Mat::default()?;
            video_capture.lock().unwrap().read(&mut frame)?;
            
            // if successfully read
            if frame.size()?.width > 0 {
                // figure out which second the current frame falls in;
                // in Rust, casting a non-negative float to an integer
                // truncates, which is equivalent to floor() here
                let floor_frame_index = (current_frame / fps) as i32;
                // if this is the first frame we've seen for that second
                if floor_frame_index == expected_frame_index {
                    // we will calculate the dominant color of this frame
                    
                    // also increase the next expected frame index by 1
                    expected_frame_index += 1;
                    
                    // if the queued tasks exceed 3 times the number of threads
                    if pool.queued_count() > num_threads * 3 {
                        // then we wait until queued tasks is less than the number of threads
                        println!("[INFO] waiting for queued tasks: {}", pool.queued_count());
                        let ticker = tick(Duration::from_millis(1000));
                        loop {
                            ticker.recv().unwrap();
                            if pool.queued_count() < num_threads {
                                break;
                            }
                        }
                    }
                    
                    // worker context
                    let worker_colorline_index = colorline_index;
                    let worker_current_frame = floor_frame_index;
                    let worker_sender = sender.clone();
                    
                    let mut copied_frame = Mat::default()?;
                    frame.copy_to(&mut copied_frame)?;
                    // calculate dominant color
                    pool.execute(move || {
                        let colors = dominant_color(&copied_frame, 3).expect(&format!("Error occurred at frame: {}", worker_current_frame));
                        worker_sender.send(ColorlineWorkerMessage::new(worker_colorline_index, colors[0])).unwrap();
                    });
                    
                    // move to the next colorline
                    colorline_index += colorline_width;
                }
            } else {
                return Err(ColorlineError {reason: format!("Cannot read frame at {}", current_frame)});
            }
            
            // check whether received stop event
            if should_stop.load(Ordering::Relaxed) {
                // if received
                // then stop processing
                is_processing.store(false, Ordering::Relaxed);
                // and just return ok
                return Ok(())
            }
            
            // incr frame index by 1
            current_frame += 1.0;
        }
        Ok(())
    }));
    Ok(())
}
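A quick worked example of why that floor comparison samples exactly one frame per second (fps = 29.97, numbers illustrative):

// current_frame / fps, truncated toward zero:
//   frames  1..=29 -> 0   first hit matches expected_frame_index = 0,
//                         so we compute and bump the expectation to 1
//   frames 30..=59 -> 1   first hit is frame 30: compute, expect 2
//   frames 60..=89 -> 2   and so on
// each second contributes exactly one sampled frame, and because the
// division stays in f64 no drift accumulates from the fractional fps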

Next, for the results computed by the pool's worker threads, we spawn one more thread, the Worker-Thread Gather Thread, which collects everything the workers produce. How do we know when we're done collecting? We already know the video length and the user's requested interval, so we can work out ahead of time how many messages will arrive on the channel.

fn start_worker_gather_thread(&mut self) {
    // create a clone of `should_stop`
    let should_stop = Arc::clone(&self.should_stop);
    // create a clone of `is_processing`
    let is_processing = Arc::clone(&self.is_processing);
    // create a clone of `num_message`
    let num_message = Arc::clone(&self.num_message);
    // clone the threaded receiver for gathering worker results
    let receiver = self.worker_thread_receiver.clone();
    // clone internal result for workers
    let internal_result = Arc::clone(&self.internal_result);
    // create a clone of `is_done`
    let is_done = Arc::clone(&self.is_done);
    // gather worker result
    self.worker_gather_thread = Some(thread::spawn(move || -> Result<(), ColorlineError> {
        let timeout = Duration::from_millis(500);
        loop {
            match receiver.recv_timeout(timeout) {
                Ok(message) => {
                    let mut guarded_internal_result = internal_result.lock().unwrap();
                    guarded_internal_result.push(message);
                    if num_message.load(Ordering::Relaxed) as usize == guarded_internal_result.len() {
                        is_done.store(true, Ordering::Relaxed);
                        should_stop.store(true, Ordering::Relaxed);
                        break;
                    }
                },
                Err(_) => (), // timed out; fall through and check the stop flag
            }
            // check whether received stop event
            if should_stop.load(Ordering::Relaxed) {
                // if received
                // then stop processing
                is_processing.store(false, Ordering::Relaxed);
                // and just break from loop
                break;
            }
        }
        
        Ok(())
    }));
}

Finally, the user can poll for the result. For that we define a ColorlinePoll enum:

pub enum ColorlinePoll<'a> {
    // something went wrong
    Err(ColorlineError),
    // processed colorlines, total colorlines
    InProgress(usize, usize),
    // oh yeah!
    Ready(&'a Mat),
}

When the user polls and the result isn't ready yet, we check once every 100 ms whether the computation has finished, for as long as the timeout hasn't elapsed;

if the timeout elapses before the computation finishes, we answer InProgress; if it finishes before the timeout, or had already finished but the final image hasn't been generated yet, we build the Mat and return it; and if anything failed along the way, we return Err.

pub fn recv_timeout(&mut self, timeout: Duration) -> ColorlinePoll {
    let has_result_generated = self.has_result_generated.lock().unwrap();
    if has_result_generated.load(Ordering::Relaxed) {
        return self.get_result();
    }

    let ticker = tick(Duration::from_millis(100));
    let now = Instant::now();
    loop {
        // query `is_done` if we are not yet timeout
        if now.elapsed() < timeout {
            // if it's not done yet
            if !self.is_done.load(Ordering::Relaxed) {
                // then we wait for 100ms
                ticker.recv().unwrap();
            } else {
                // otherwise, it's done now
                // then we calculate image size
                let colorline_width = self.option.pixels() as i32;
                let num_message = self.num_message.load(Ordering::Relaxed) as i32;
                
                // the rows / height should be self.option.height()
                // the cols / width should be colorline_width * num_message
                // however, because of the memory layout (and partly due to limitations
                // of the Rust OpenCV bindings) we have to write every pixel ourselves;
                // to keep that as fast as possible we access the image by rows in the
                // outer loop and by cols in the inner loop (each column of the i-th row),
                // so the image is allocated "rotated 90 degrees clockwise"
                let rows = colorline_width * num_message;
                let cols = self.option.height() as i32;
                
                let mut result = match Mat::new_rows_cols_with_default(rows, cols, CV_8UC3, Scalar::new(0.0, 0.0, 0.0, 0.0)) {
                    Ok(mat) => mat,
                    Err(e) => return ColorlinePoll::Err(ColorlineError { reason: format!("Cannot allocate new opencv::core::Mat: {}", e) }),
                };
                
                let colorlines = self.internal_result.lock().unwrap();
                let height = self.option.height();
                for colorline in colorlines.iter() {
                    let row_start = colorline.index() as i32;
                    let row_end = row_start + colorline_width;
                    let color = colorline.color();
                    for row in row_start..row_end {
                        for col in 0..height {
                            // for each column at i-th row
                            let pixel: &mut Vec3b = match result.at_2d_mut(row, col) {
                                Ok(pixel) => pixel,
                                Err(e) => return ColorlinePoll::Err(ColorlineError { reason: format!("Cannot access pixel at ({}, {}): {}", row, col, e) }),
                            };
                            
                            // write calculated color
                            pixel[0] = color.b as u8;
                            pixel[1] = color.g as u8;
                            pixel[2] = color.r as u8;
                        }
                    }
                }
                
                // now rotate the image 90 degrees counter-clockwise
                let mut rotated_result = match Mat::default() {
                    Ok(mat) => mat,
                    Err(e) => return ColorlinePoll::Err(ColorlineError { reason: format!("Cannot allocate new opencv::core::Mat: {}", e) }),
                };
                // don't silently swallow a rotation failure
                if let Err(e) = rotate(&result, &mut rotated_result, ROTATE_90_COUNTERCLOCKWISE) {
                    return ColorlinePoll::Err(ColorlineError { reason: format!("Cannot rotate image: {}", e) });
                }
                
                // store the result
                self.result = Some(rotated_result);
                has_result_generated.store(true, Ordering::Relaxed);
                return self.get_result();
            }
        } else {
            // if we're timeout
            // report that we're in progress
            return ColorlinePoll::InProgress(self.internal_result.lock().unwrap().len(), self.num_message.load(Ordering::Relaxed) as usize)
        }
    }
}
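On the calling side, usage looks roughly like this (a hypothetical sketch: `colorline` stands for whatever handle exposes recv_timeout, the output path is made up, and ColorlineError is assumed to derive Debug):

use std::time::Duration;
use opencv::core::Vector;
use opencv::imgcodecs::imwrite;

loop {
    match colorline.recv_timeout(Duration::from_secs(1)) {
        // done: write the final image to disk
        ColorlinePoll::Ready(mat) => {
            imwrite("colorline.png", mat, &Vector::new()).unwrap();
            break;
        }
        // not done yet: report progress and poll again
        ColorlinePoll::InProgress(done, total) => println!("[INFO] {}/{} colorlines", done, total),
        // something failed along the way
        ColorlinePoll::Err(e) => panic!("colorline failed: {:?}", e),
    }
}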
