实现线程调度
本章代码对应 commit :65ccec7b180cd56e645f359343a793e75a49eb98
概要
多线程并发执行需要调度器的辅助。调度器的作用是在合适的时刻选择线程执行,并在合适的时候切换线程,防止一个线程占用或多的资源或阻塞,从而实现 cpu 资源分配相对“公平”。本章我们将:
假想我们已经有了一个调度算法。
算法和数据结构是分离的,所以在实现调度器的时候不依赖于算法
创建线程池(thread pool)用于保存所有线程。
- 创建线程管理器(processor)通过调度算法管理 thread pool。
- 实现线程调度相关函数。
- 引入 Round Robin 调度算法。
调度算法
//in process/scheduler.rs
impl Scheduler {
pub fn new(max_time_slice: usize) -> Scheduler { /* TODO */ }
pub fn push(&mut self, tid: Tid) { /* TODO */ }
pub fn pop(&mut self) -> Option<Tid> { /* TODO */ }
pub fn tick(&mut self) -> bool { /* TODO */ }
pub fn exit(&mut self, tid: Tid) { /* TODO */ }
}
这是本章我们使用的调度算法的对外接口。功能包括创建(new)、添加线程(push)、获取即将被调用的线程(pop)、提醒算法时钟周期的到来(tick)和退出线程时将线程从调度算法中移出(exit)。
线程池(ThreadPool)
线程的运行状态包括但不限于:等待运行、运行中、睡眠和等待退出。这里我们创建一个枚举类型作为进程的状态类型:
// in process/mod.rs
pub type Tid = usize;
pub type ExitCode = usize;
// in process/structs.rs
use crate::process::{ Tid, ExitCode };
#[derive(Clone)]
pub enum Status {
Ready,
Running(Tid),
Sleeping,
Exited(ExitCode),
}
Tid 是线程 id 。就像每个人的身份证号都是不一样的,每一个线程都有独一无二的 id ,这时线程的标识。
线程池是用于存放线程的容器,只需要包含线程的信息和线程调度算法。创建 process/thread_pool.rs :
// in process/thread_pool.rs
use crate::process::scheduler::Scheduler;
use crate::process::structs::*;
use alloc::{ vec::Vec, boxed::Box };
struct ThreadInfo {
status: Status,
present: bool,
thread: Option<Box<Thread>>,
}
pub struct ThreadPool {
threads: Vec<Option<ThreadInfo>>, // 线程信号量的向量
scheduler: Box<Scheduler>, // 调度算法
}
Box 允许我们将一个值放在堆上而不是栈上,留在栈上的则是指向堆数据的指针。除了数据被储存在堆上而不是栈上之外,Box 没有额外的性能损失,不过也没有额外的功能。
让我们先来实现他的构造函数和向线程池中加入线程的功能:
// in process/thread_pool.rs
use crate::process::Tid;
impl ThreadPool {
pub fn new(size: usize, scheduler: Scheduler) -> ThreadPool {
ThreadPool {
threads: {
let mut th = Vec::new();
th.resize_with(size, Default::default);
th
},
scheduler: Box::new(scheduler),
}
}
fn alloc_tid(&self) -> Tid {
for (i, info) in self.threads.iter().enumerate() {
if info.is_none() {
return i;
}
}
panic!("alloc tid failed !");
}
pub fn add(&mut self, _thread: Box<Thread>) {
let tid = self.alloc_tid();
self.threads[tid] = Some(ThreadInfo{
status: Status::Ready,
present: true,
thread: Some(_thread),
});
self.scheduler.push(tid);
println!("tid to alloc: {}", tid);
}
}
构造函数规定了线程的最大数量 size 和调度算法。由于线程数组是已经创建好的,但是默认内容为 None ,所以在添加线程的时候只需要将从 Vec 中找到一个未使用的位置,把新线程的信息传递过去就可以了。同时,不要忘记为调度算法传入线程 id 。
调度器
由于创建的调度器是全局的,需要考虑一些安全问题和异步问题。为此需要对成员进行一些包装,不过这不影响实现思路。创建 processor.rs :
// in process/processor.rs
use core::cell::UnsafeCell;
use alloc::boxed::Box;
use crate::process::Tid;
use crate::process::structs::*;
use crate::process::thread_pool::ThreadPool;
pub struct ProcessorInner {
pool: Box<ThreadPool>,
idle: Box<Thread>,
current: Option<(Tid, Box<Thread>)>,
}
pub struct Processor {
inner: UnsafeCell<Option<ProcessorInner>>,
}
unsafe impl Sync for Processor {}
一个实现了 Sync trait 的类型可以安全的在多个线程中拥有其值的引用。因为他是标记 trait ,所以不需要手动实现。 UnsafeCell 内的元素不严格区分 immutable 和 mutable 。
调度器接口实现
这里先列出功能简单明了的几个函数:
// in process/processor.rs
impl Processor {
pub const fn new() -> Processor {
Processor {
inner: UnsafeCell::new(None),
}
}
pub fn init(&self, idle: Box<Thread>, pool: Box<ThreadPool> ) {
unsafe {
*self.inner.get() = Some(ProcessorInner{
pool,
idle,
current: None,
});
}
}
fn inner(&self) -> &mut ProcessorInner {
unsafe { &mut *self.inner.get() }
.as_mut()
.expect("Processor is not initialized")
}
pub fn add_thread(&self, thread: Box<Thread>) {
self.inner().pool.add(thread);
}
}
在进行线程切换时,为了防止因为中断引起线程切换出错,需要关闭中断,之后再恢复到原先的中断状态。这里我们先实现三个与中断控制相关的函数:
// in interrupt.rs
#[inline(always)]
pub fn enable_and_wfi() { // 使能中断并等待中断
unsafe {
asm!("csrsi sstatus, 1 << 1; wfi" :::: "volatile");
}
}
#[inline(always)]
pub fn disable_and_store() -> usize { // 禁用中断并返回当前中断状态
let sstatus: usize;
unsafe {
asm!("csrci sstatus, 1 << 1" : "=r"(sstatus) ::: "volatile");
}
sstatus & (1 << 1)
}
#[inline(always)]
pub fn restore(flags: usize) { // 根据 flag 设置中断
unsafe {
asm!("csrs sstatus, $0" :: "r"(flags) :: "volatile");
}
}
接下来开始实现三个与调度直接相关的重要函数。
其中调用了一些 ThreadPool 中尚未实现的函数,将于之后实现
run
这是整个调度过程最核心的函数,由 idle 线程调用。具体实现如下:
// in process/processor.rs
use crate::interrupt::{ disable_and_store, enable_and_wfi };
impl Processor {
pub fn run(&self) -> !{
let inner = self.inner();
// 关闭中断,防止此时产生中断异常导致线程切换出错。
disable_and_store();
// 循环从线程池中寻找可调度线程
loop {
// 如果存在需要被调度的线程
if let Some(thread) = inner.pool.acquire() {
inner.current = Some(thread);
// 切换至需要被调度的线程
inner.idle.switch_to(&mut *inner.current.as_mut().unwrap().1);
// 上一个线程已经结束或时间片用完,切换回 idle 线程
let (tid, thread) = inner.current.take().unwrap();
println!("thread {} ran just now", tid);
// 将上一个线程放回线程池中
inner.pool.retrieve(tid, thread);
} else {
// 开启中断并等待中断产生
enable_and_wfi();
// 关闭中断,从线程池中寻找可调度线程
disable_and_store();
}
}
}
}
tick
每产生一次时钟中断(即经过一个时钟周期),就需要通知线程池,让他通过调度算法判断是否需要切换线程:
// in process/mod.rs
static CPU: Processor = Processor::new();
pub fn tick() {
CPU.tick();
}
// in interrupt.rs
use crate::process::tick;
fn super_timer() {
clock_set_next_event();
unsafe{
TICK = TICK + 1;
if TICK % 100 == 0 {
println!("100 ticks!");
}
}
tick();
}
// in process/processor.rs
use crate::interrupt::restore;
impl Processor {
pub fn tick(&self) {
let inner = self.inner();
if !inner.current.is_none() {
if inner.pool.tick() {
let flags = disable_and_store();
inner
.current
.as_mut()
.unwrap()
.1
.switch_to(&mut inner.idle);
// 恢复原先的中断状态
restore(flags);
}
}
}
}
inner.pool.tick
会通知线程池和调度算法已经过了一个时钟周期,同时返回一个布尔值:是否需要进行线程切换。如果需要切换至其他线程,则先切换至 idle 线程,然后由 idle 进行调度(回到 Processer.run
)。
exit
当线程任务完成之后,就可以通过 Processor.exit
结束自己(结束当前线程):
// in process/processor.rs
impl Processor {
pub fn exit(&self, code: usize) -> ! {
let inner = self.inner();
let tid = inner.current.as_ref().unwrap().0;
// 通知线程池该线程即将退出
inner.pool.exit(tid, code);
// 切换至 idle 线程,进入调度
inner
.current
.as_mut()
.unwrap()
.1
.switch_to(&mut inner.idle);
loop {}
}
}
线程池接口实现
线程池接口的功能在前文已经提及,也可由函数名判断函数功能。具体实现也较为简单,所以直接给出实现:
// in process/thread_pool.rs
impl ThreadPool {
pub fn acquire(&mut self) -> Option<(Tid, Box<Thread>)> {
if let Some(tid) = self.scheduler.pop() {
let mut thread_info = self.threads[tid].as_mut().expect("thread not exist !");
thread_info.status = Status::Running(tid);
return Some((tid, thread_info.thread.take().expect("thread not exist ")));
} else {
return None;
}
}
pub fn retrieve(&mut self, tid: Tid, thread: Box<Thread> ) {
let mut thread_info = self.threads[tid].as_mut().expect("thread not exist !");
if thread_info.present {
thread_info.thread = Some(thread);
thread_info.status = Status::Ready;
self.scheduler.push(tid);
}
}
pub fn tick(&mut self) -> bool {
// 通知调度算法时钟周期加一,询问是否需要调度
self.scheduler.tick()
}
pub fn exit(&mut self, tid: Tid, code: usize) {
self.threads[tid] = Some(ThreadInfo{
status: Status::Ready,
present: false,
thread: None,
});
self.scheduler.exit(tid);
println!("exit code: {}", code);
}
}
这里用到了 Option.take ,功能与所有权转移或浅拷贝相似
引入 Round Robin 调度算法
在 Cargo.toml 中加入:
RoundRobinScheduler = { path = "crate/RoundRobinScheduler" }
创建 process/scheduler.rs :
use crate::process::Tid;
use RoundRobinScheduler::RRScheduler;
pub struct Scheduler {
scheduler: RRScheduler,
}
impl Scheduler {
pub fn new(max_time_slice: usize) -> Scheduler {
let s = Scheduler {
scheduler: RRScheduler::new(max_time_slice),
};
s
}
pub fn push(&mut self, tid: Tid) {
self.scheduler.push(tid);
}
pub fn pop(&mut self) -> Option<Tid> {
self.scheduler.pop()
}
pub fn tick(&mut self) -> bool {
self.scheduler.tick()
}
pub fn exit(&mut self, tid: Tid) {
self.scheduler.exit(tid);
}
}
最后,由于 Processor.add_thread
需要 Box<Thread>
类型的参数,所以我们修改一下 struct Thread 的构造函数:
// in process/structs.rs
use alloc::boxed::Box;
use riscv::register::satp;
impl Thread {
pub fn new_idle() -> Box<Thread> {
unsafe {
Box::new(Thread {
context: Context::null(),
kstack: KernelStack::new(),
})
}
}
pub fn new_kernel(entry: extern "C" fn(usize) -> !, arg: usize) -> Box<Thread> {
unsafe {
let _kstack = KernelStack::new();
Box::new(Thread {
context: Context::new_kernel_thread(entry, arg, _kstack.top(), satp::read().bits()),
kstack: _kstack,
})
}
}
}
至此我们的调度器已经全部完成,让我们来测试一下他吧:
// in process/mod.rs
mod structs;
mod scheduler;
mod processor;
mod thread_pool;
use structs::Thread;
use alloc::boxed::Box;
use processor::Processor;
use thread_pool::ThreadPool;
use self::scheduler::Scheduler;
pub type Tid = usize;
pub type ExitCode = usize;
static CPU: Processor = Processor::new();
pub fn tick() {
CPU.tick();
}
pub fn init() {
println!("+------ now to initialize process ------+");
let scheduler = Scheduler::new(1);
let thread_pool = ThreadPool::new(100, scheduler);
CPU.init(Thread::new_idle(), Box::new(thread_pool));
let thread0 = Thread::new_kernel(hello_thread, 0);
CPU.add_thread(thread0);
let thread1 = Thread::new_kernel(hello_thread, 1);
CPU.add_thread(thread1);
let thread2 = Thread::new_kernel(hello_thread, 2);
CPU.add_thread(thread2);
let thread3 = Thread::new_kernel(hello_thread, 3);
CPU.add_thread(thread3);
let thread4 = Thread::new_kernel(hello_thread, 4);
CPU.add_thread(thread4);
CPU.run();
}
#[no_mangle]
pub extern "C" fn hello_thread(arg: usize) -> ! {
println!("hello thread");
println!("arg is {}", arg);
for i in 0..100 {
println!("{}{}{}{}{}{}{}{}", arg, arg, arg, arg, arg, arg, arg, arg);
for j in 0..1000 {
}
}
println!("end of thread {}", arg);
CPU.exit(0)
}
FIX runtime error: panicked at 'Processor is not initialized'
执行 make run
,发现kernel出现了运行时错误:
...
kernel_end:0x80c01000: kernel_size:0xc01000
+------ now to initialize process ------+
panicked at 'Processor is not initialized', /home/chyyuu/.rustup/toolchains/nightly-2019-03-05-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/option.rs:1038:5
qemu-system-riscv32: terminating on signal 15 from pid 31872 ()
从字面意思上看,是Processor
这个结构没有初始化!但仔细检查代码,在如下代码中有new
和记忆棒初始化的过程:
// in process/mod.rs
...
static CPU: Processor = Processor::new();
pub fn init() {
println!("+------ now to initialize process ------+");
let scheduler = Scheduler::new(1);
let thread_pool = ThreadPool::new(100, scheduler);
println!("+------ now to initialize processor ------+");
CPU.init(Thread::new_idle(), Box::new(thread_pool));
...
所以,不应该是这部分的问题。再进一步检查,在init.rs
中的rust_main
函数在调用process_init()
之前,有一些unsafe代码,怀疑是它造成的???。试着把代码删除。再执行 make run
,发现我们的调度器已经能够自动切换线程,线程来回切换也可以正常恢复原先的工作环境,并且在线程结束后能够正常结束退出。