一、问题:mpsc如何发送各类不同的函数?
use std::sync::{mpsc,Arc,Mutex};
use std::thread;
fn process<F>(old:f32,name:String,f:F) where F: FnOnce(f32,String) {
f(old,name);
}
fn add_f32(a:f32,b:f32) ->f32{
a+b
}
fn doit(amount: f32,code:String){
println!("amount:{:?} code:{:?}",amount,code);
}
fn workit(amount:f32){
println!("amount:{:?}",amount);
}
fn test(){
process(2.0, "b".into(), doit);
let fn1: Box<dyn FnOnce()> = Box::new(move || doit(2.0,"hello cat!".into()));
fn1();
let fn2: Box<dyn FnOnce(f32,String)> = Box::new(move |amount:f32,code:String| doit(amount,code));
fn2(2.0,"hello john!".into());
let fn3:Box<dyn FnOnce(f32,String)> = Box::new(move |amount:f32,code:String| process(amount,code,doit));
fn3(2.0,"hello rose!".into());
let fn4: Box<dyn FnOnce()> = Box::new(move ||workit(3.0));
fn4();
let fn5: Box<dyn FnOnce()> = Box::new(move ||println!("hello world!"));
fn5();
// 带类型返还的结构
let fn6:Box<dyn FnOnce()->f32> = Box::new(move ||add_f32(1.0,2.0));
fn6();
}
// 定义闭包中没有参数输入的函数类型,做为发送对象
type box_fn = Box<dyn FnOnce() + Send>;
fn main(){
//test();
let (sender, receiver) = mpsc::channel::<box_fn>();
let receiver = Arc::new(Mutex::new(receiver));
// 注意:||中均没有参数,故是FnOnce(); 具体参考test()
let vec_fn:Vec<Box<dyn FnOnce()+Send>> = vec![Box::new(move ||println!("hello world!")),
Box::new(move ||workit(3.0)),
Box::new(move || doit(2.0,"hello cat!".into()))];
for f in vec_fn {
sender.send(f).unwrap();
}
let rx = receiver.clone();
let handle = thread::spawn(move ||{
loop{
let box_fn = rx.lock().unwrap().recv().unwrap() ;
println!("from son thread!");
box_fn();
}
});
println!("main thread sended all box_fn!");
handle.join().unwrap();
}
二、线程池的应用:发送函数有什么用处?
如果需要让每一个函数都分配一个线程来执行这些函数(任务),或者用一个线程池来执行函数,这个时侯就可以用上场了。
在线程池中,FnOnce是一个其中的灵魂。他可以把所有的函数进行抽象统一,便一管理和执行。
use std::sync::atomic::AtomicU32;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
// 存在不均衡的情况 => steal work todo!
pub struct ThreadPool {
threads_num: usize,
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
send_num: Arc<Mutex<usize>>,
}
enum StrategyFn
{
T(Box<dyn FnOnce() + Send +'static>),
U(Box<dyn FnOnce(f32, f32) + Send +'static>,Arc<f32>,Arc<f32>),
W(Box<dyn FnOnce(Vec<f32>, f32) + Send +'static>,Arc<Vec<f32>>,Arc<f32>),
}
//fn get_fn(strategy_fn: StrategyFn) {}
enum Message{
Task(Task),
Shutdown,
}
struct Task{
job :Job,
task_id :usize,
}
type Job = Box<dyn FnOnce() + Send +'static>;
impl ThreadPool {
pub fn new(threads_num: usize) -> ThreadPool {
assert!(threads_num > 0);
let (sender, receiver) = mpsc::channel::<Message>();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(threads_num);
let send_num = Arc::new(Mutex::new(0_usize));
let execute_num = Arc::new(Mutex::new(0_usize));
for id in 0..threads_num {
let mut worker = Worker::new(id);
worker.run(&receiver, &execute_num);
workers.push(worker);
}
ThreadPool { threads_num,workers, sender,send_num}
}
pub fn execute(&mut self, task_id:usize,strategy_fn: StrategyFn)
{
let mut job:Box<dyn FnOnce()+Send +'static> = Box::new(move||{});
match strategy_fn{
StrategyFn::U(f, input1, input2) => {
job = Box::new(move || f(input1.as_ref().clone(),input2.as_ref().clone()));
},
StrategyFn::T(f) =>{
job = Box::new(move || f());
}
StrategyFn::W(f,input1,input2)=> {
job = Box::new(move || f(input1.as_ref().clone(),input2.as_ref().clone()));
},
}
let task = Task{job:job,task_id:task_id};
let mut send_num = self.send_num.lock().unwrap();
*send_num = *send_num +1;
self.sender.send(Message::Task(task)).unwrap();
}
}
pub struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
pub fn new(id: usize)-> Self{
Worker {
id:id,
thread: None,
}
}
pub fn run(&mut self, receiver: &Arc<Mutex<mpsc::Receiver<Message>>>,execute_num: &Arc<Mutex<usize>>){
let id = self.id;
let receiver = receiver.clone();
let execute_num = execute_num.clone();
let thread = thread::spawn(move || loop {
let msg = receiver.lock().unwrap().recv().unwrap();
match msg{
Message::Task(task) => {
println!("Worker {} got a task; executing task {}.", id,task.task_id);
let mut execute_num = execute_num.lock().unwrap();
*execute_num = *execute_num + 1;
println!("work {} = >execute_num: {}", id,*execute_num);
(task.job)();
},
Message::Shutdown => {
println!("Worker {} received shutdown message.", id);
break;//很关键
}
}
});
self.thread = Some(thread);
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for _ in 0..self.threads_num{
self.sender.send(Message::Shutdown).unwrap();
}
println!("drop worker :{:?}",self.workers.len());
for (i,worker) in (&mut self.workers).into_iter().enumerate() {
println!("------Shutting down worker {} i:{} -----------", worker.id,i);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
macro_rules! create_strategy{
($($s:ident),*) => (
$(
pub fn $s(_a:f32,_b:f32){
println!("run strategy {:?}",stringify!($s));
//thread::sleep(Duration::from_millis(1));
}
)*
);
}
create_strategy!(A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q);
pub fn main() {
let mut pool = ThreadPool::new(2);
let input1 = Arc::new(1.0);
let input2 = Arc::new(2.0);
let strategies = vec![A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q];
println!("strategies num : {:?}",strategies.len());
for (task_id,strategy) in strategies.into_iter().enumerate(){
let strategy_fn = StrategyFn::U(Box::new(strategy),input1.clone(), input2.clone());
pool.execute(task_id as usize,strategy_fn);
}
println!("send_num : {:?}",*pool.send_num.lock().unwrap());
}
输出:
strategies num : 17
send_num : 17
drop worker :2
------Shutting down worker 0 i:0 -----------
Worker 0 got a task; executing task 0.
work 0 = >execute_num: 1
Worker 1 got a task; executing task 1.
run strategy "A"
Worker 0 got a task; executing task 2.
work 1 = >execute_num: 2
run strategy "B"
work 0 = >execute_num: 3
run strategy "C"
Worker 0 got a task; executing task 4.
work 0 = >execute_num: 4
run strategy "E"
Worker 0 got a task; executing task 5.
work 0 = >execute_num: 5
run strategy "F"
Worker 0 got a task; executing task 6.
work 0 = >execute_num: 6
run strategy "G"
Worker 0 got a task; executing task 7.
work 0 = >execute_num: 7
run strategy "H"
Worker 0 got a task; executing task 8.
work 0 = >execute_num: 8
run strategy "I"
Worker 0 got a task; executing task 9.
work 0 = >execute_num: 9
run strategy "J"
Worker 0 got a task; executing task 10.
work 0 = >execute_num: 10
run strategy "K"
Worker 0 got a task; executing task 11.
work 0 = >execute_num: 11
run strategy "L"
Worker 0 got a task; executing task 12.
work 0 = >execute_num: 12
run strategy "M"
Worker 0 got a task; executing task 13.
work 0 = >execute_num: 13
run strategy "N"
Worker 0 got a task; executing task 14.
work 0 = >execute_num: 14
run strategy "O"
Worker 0 got a task; executing task 15.
work 0 = >execute_num: 15
run strategy "P"
Worker 0 got a task; executing task 16.
work 0 = >execute_num: 16
run strategy "Q"
Worker 0 received shutdown message.
Worker 1 got a task; executing task 3.
work 1 = >execute_num: 17
run strategy "D"
------Shutting down worker 1 i:1 -----------
Worker 1 received shutdown message.