哈哈,主要是为了练习一下rust的语法,不喜勿喷。
一.Executor申明
struct AExecutor<T> {
results:Arc<Mutex<HashMap<u32,T>>>, //1
functions:Arc<Mutex<Vec<ATask<T>>>> //2
}
1.results:用来存放结果,其中u32是一个future的id,T表示存放的数据
2.functions: 存放所有submit的任务
二.Task申明
struct ATask<T> {
func:Box<dyn Fn()-> T + 'static + Send>, //1
cond:Arc<Condvar>,//2
id:u32//3
}
1.func:任务对应的lambda函数
2.cond:用来唤醒客户端(调用future.get等待结果的客户端)
3.id:future对应的id
三.Future申明
struct AFuture<T> {
id:u32, //1
map:Arc<Mutex<HashMap<u32,T>>>,//2
cond:Arc<Condvar>//3
}
1.id:这个future的id,通过这个id可以在map中找到结果
2.map:存放结果的map
3.cond:用来等待计算完成的condition,和Task结构体中的cond是同一个,这样task完成后就能唤醒等待了。
四.Future实现
impl <T>AFuture<T> {
fn get(&self)-> T {
loop {
let mut map = self.map.lock().unwrap();
let value = map.remove(&self.id);
match value {
Some(v) => {return v;}
None=> {
self.cond.wait(map);
}
}
}
}
}
只有一个get函数,如果有数据,就返回该数据,否则就等待唤醒
五.Executor提交任务
fn submit(&self,func:Box<dyn Fn()-> T + 'static + Send>)->AFuture<T> {
let cond = Arc::new(Condvar::new()); //1
let id:u32 = 1;
let task = ATask{
func:func,
cond:cond.clone(),
id:id.clone()
};
{
let mut ll = self.functions.lock().unwrap();
ll.push(task); //2
}
AFuture {
id:id.clone(),
map:self.results.clone(),
cond:cond.clone(),
}//3
}
1.创建一个Condition,用智能指针控制,这样计算和等待的线程都能使用。这个Condition会同时存放到task/future中。
2.将task存放到任务队列,计算线程就是从这个队列中获取任务的。
3.返回future,cond就是步骤1创建的cond
六:Executor执行任务
fn work(&self) {
loop {
println!("work start");
thread::sleep(Duration::from_secs(10)); //1
let mut ll: std::sync::MutexGuard<'_, Vec<ATask<T>>> = self.functions.lock().unwrap();
println!("len is {}",ll.len());
if ll.len() == 0 {
continue;
}
let task = ll.pop().unwrap(); //2
drop(ll);//3
let result = (task.func)();
{
let mut results = self.results.lock().unwrap();
results.insert(task.id,result);//4
}
task.cond.notify_all();//5
}
}
1.这里用来管理任务的队列没有做成blocking的,所以暂时用一个sleep代替
2.从任务队列里面取出task
3.释放ll(lockguard),这样锁就释放了
4.将结果存放到结果的表里
5.唤醒等待线程
运行结果: