Rust语言的多线程编程

时间:2022-06-10 05:33:54

我写这篇短文的时候,正值Rust1.0发布不久,严格来说这是一门兼具C语言的执行效率和Java的开发效率的强大语言,它的所有权机制竟然让你无法写出线程不安全的代码,它是一门可以用来写操作系统的系统级语言,如果说新一代编程语言是什么,那就Rust了。

下面我注重介绍Rust的多线程编程是怎样,其中大部分内容参考翻译自Rust的官方文档,请看:

Concurrency并发

在计算机科学上,并发Concurrency 和并行 parallelism是非常重要的话题,也是软件产业一个热门的话题。计算机CPU有了越来越多的的核,但很多程序员没有准备好充分利用它们。

Rust的内存安全特性也应用于并发。Rust程序必须内存安全,没有数据竞争。Rust的类型系统 很胜任这工作,很容易让你理解在编译时的并行代码。

在谈论Rust并发特色之前,了解一些东西很重要:Rust是一门足够低级的语言,所有这些都由标准库提供,而不是语言本身。这意味着如果你你不喜欢Rust处理并发的某些方面,你可以用其它方式来实现。 mio 就是这方面实践的一个真实的例子。

背景: Send  Sync

并发很难解释清楚。在Rust中,我们由一个强大的静态的类型系统来帮助我们理解我们的代码。Rust本身给我们两个特性,帮助我们实现并发编程。

Send

第一个类是 Send. 当类型 T 实现了 Send, 它告诉编译器这个类型的实例的所有权可以在多个线程之间安全传递。

实施强制的限制条件是很重要的。例如,如果我们由一个通道在两个线程之间,我们可能会想在两个线程之间传递数据。因此,我们要保证传递的数据类型要实现 Send 特性。

相反,如果我我们用FFI包裹了一个线程不安全的类库,我们不会去实现 Send, 编译器会帮助我们确保它不会离开当前线程。

Sync

第二个特性是 Sync. 当一个类型实现了 Sync, 它向编译器表明这个类型的数据在多线程并发是不可能导致内存的不安全。例如,由原子引用计数的不可变数据的共享是线程安全的。Rust提供了一个类型 Arc<T>, 它实现了 Sync, 因此它可以在线程之间共享。

这两个特性运行你使用类型系统来保证你代码在并发情况下的所有权。在解释为什么之前,让我们先创建一段并行的Rust代码。

线程Threads

Rust标准库中的线程,允许你并行的运行Rust代码。下面是一个使用了 std::thread的例子:

use std::thread;

fn main() {
thread::spawn(
|| {
println
!("Hello from a thread!");
});
}

 

 thread::spawn() 方法接受一个在新线程运行的闭包。spawn方法返回一个线程的处理对象,可以用来等待子线程结束和取得线程返回结果:

use std::thread;

fn main() {
let handle
=thread::spawn(|| {
"Hello from a thread!"
});

println
!("{}", handle.join().unwrap());
}

 

很多语言有执行多线程的能力,但是非常不安全。有很多关于如何防止共享状态数据导致错误的书籍。Rust通过在编译时防止数据竞争来解决这个问题。让我们谈论一下怎样真正地在线程之间安全地共享数据。

安全地共享可变状态

归功于Rust的类型系统,我们与一个看似谎言的概念:安全地共享可变状态。很多程序员都认同共享可变状态是非常非常糟糕的。

有人曾经说过:

共享可变状态是万恶的根源。大多数语言尝试从“可变”这个方向来解决这个问题,但Rust通过“共享”这方面来解决这个问题。

 ownership system 帮助我们防止错误地使用指针,同样也帮助我们排除数据竞争。数据竞争是并发编程中最恐怖地bug之一。

举例说明,下面是一个Rust程序,里面有一个其它语言经常会出现地数据竞争。但是在Rust中是无法编译通过地:

use std::thread;

fn main() {
let mut data
=vec![1u32, 2, 3];

for i in 0..3 {
thread::spawn(move
|| {
data[i]
+=1;
});
}

thread::sleep_ms(
50);
}

 

编译时,提示错误如下:

8:17 error: capture of moved value: `data`
data[i] += 1;
^~~~

在这种情况,从代码我们知道我们的代码应该是安全的,但是Rust不确定。事实上是不安全地,如果我们在多个线程中有 data 的引用,线程拿走了引用的所有权,我们就有了三个拥有者了。这是不行的。我们可以通过 Arc<T> 来改正,它是一个原子引用计数器指针。原子意思是在多线程共享是安全的。

Arc<T> 假定一个它的内容有多个所有权,但是仍然可以安全地共享。它假定它地内容是线程同步的。但在我们这中情况,我们向改变里面的数据。我们需要一个可以保证一次只能由一个线程改数据的类型。这个类型就是 Mutex<T> 。下面老师第二个版本的代码。虽然依然编译不通过,但是是不同原因:

use std::thread;
use std::sync::Mutex;

fnmain() {
let mut data
=Mutex::new(vec![1u32, 2, 3]);

for i in 0..
3 {
let data
=data.lock().unwrap();
thread::spawn(move
|| {
data[i]
+=1;
});
}

thread::sleep_ms(
50);
}

 

下面是错误信息:

<anon>:9:9: 9:22 error: the trait `core::marker::Send` is not implemented for the type `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` [E0277]
<anon>:11 thread::spawn(move || {
^~~~~~~~~~~~~
<anon>:9:9: 9:22 note: `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` cannot be sent between threads safely
<anon>:11 thread::spawn(move || {
^~~~~~~~~~~~~

你看看, Mutex 由一个 lock 方法,方法的签名是:

fn lock(&self) ->LockResult<MutexGuard<T>>

因为   MutexGuard<T>没有实现Send,我们不可以不能在线程之间传输这个对象,所以报错。

我们可以使用 Arc<T> 来修正这个错误。下面是可以编译通过的版本:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
let data
=Arc::new(Mutex::new(vec![1u32, 2, 3]));

for i in 0..
3 {
let data
=data.clone();
thread::spawn(move
|| {
let mut data
=data.lock().unwrap();
data[i]
+=1;
});
}

thread::sleep_ms(
50);
}

 

我们调用了Arc的 clone() 方法 ,增加了内部的引用计数。它们返回只移动到了一个新的线程。我们细看一下线程的主体:

thread::spawn(move|| {
let mut data
=data.lock().unwrap();
data[i]
+=1;
});

 

首先,我们调用 lock(), 取得了一个互斥锁。因为可能会失败,它返回一个结果Result<T, E>, 因为只是举例说明,我们直接调用 unwrap() 来获取data的一个引用。真实代码可能要写更全面的代码来作错误处理。因为我们现在有一个互斥锁了,所以可以*地改变数据。

最后,当线程运行,我们等一段时间,但是这是有点不切实际:等多久合适呢,很难猜测,这是程序运行CPU执行情况决定的。

一种更加精准的计时器是使用Rust标准库提供的机制来实现线程同步。让我们讲一下这中机制: channels.

通道Channels

下面代码是使用channel来同步,而不是漫无目的地等待:

 1 use std::sync::{Arc, Mutex};
2 use std::thread;
3 use std::sync::mpsc;
4
5 fn main() {
6 let data=Arc::new(Mutex::new(0u32));
7
8 let (tx, rx) =mpsc::channel();
9
10 for _ in 0..10 {
11 let (data, tx) = (data.clone(), tx.clone());
12
13 thread::spawn(move|| {
14 letmutdata=data.lock().unwrap();
15 *data+=1;
16
17 tx.send(());
18 });
19 }
20
21 for _ in 0..10 {
22 rx.recv();
23 }
24 }

 

我们使用mpsc::channel() 方法类构造一个channel。我们用10个线程分别向通道发送一个简单地() 然后在主线程接收。

send方法是泛型地,我们可以向通道发送任何类型地数据。

use std::thread;
use std::sync::mpsc;

fn main() {
let (tx, rx)
=mpsc::channel();

for _ in0..10 {
let tx
=tx.clone();

thread::spawn(move
|| {
let answer
=42u32;

tx.send(answer);
});
}

rx.recv().ok().expect(
"Could not receive answer");
}

 

一个 u32 数据被发送,因为我们可以复制一份。因此我们可以创建一个线程,叫它计算答案,然后通过将答案通过channel发送给我们。

Panics致命异常

一个 panic! 会使执行中地线程崩溃。你可以这样写:

use std::thread;

let result
=thread::spawn(move|| {
panic
!("oops!");
}).join();

assert
!(result.is_err());

 

我们的线程返回了一个结果,我们可以通过这个返回结果检查线程师父抛异常。