译者 | 弯月
译者 | 弯月 责编 | 欧阳姝黎
出品 | CSDN(ID:CSDNnews)
Rust的异步功能很强大,但也以晦涩难懂著称。在本文中,我将总结之前提过的一些想法,并给出一些新的点子,看看这些想法放在一起能产生什么效果。
本文只是一个思想实验。对Rust进行大改造很麻烦,因此我们需要一个精确的方法来找出优缺点,并确定某个改动是否值得。我知道一些观点会产生完全相反的看法,所以我建议你用一种开放的心态阅读本文。
在对Rust中实现异步的不同方式进行探索之前,我们应该首先了解何时应该使用异步编程。毕竟,异步编程并不像仅仅使用线程那么容易。那么异步的好处是什么?有人会说是性能原因,异步代码更快,因为线程的开销太大了。实际情况更复杂。根据具体情况不同,在以I/O为主的应用程序中使用线程有可能更快。例如,一个基于线程的echo服务器在并发数小于100的时候比异步更快。但在并发数超过100之后,线程的性能就会下降,但也不是急剧下降。
我认为,使用异步的更好的理由是可以更有效地针对复杂的流程控制进行建模。例如,如果不适用异步编程,那么暂停或取消一个正在进行的操作就会非常困难。而且,使用线程时,在各个连接之间进行协调需要使用同步原语,这就会导致竞争。使用异步编程,可以在同一个线程中对多个连接进行操作,从而避免了同步原语。
Rust的异步模型能够非常好地对复杂流程控制进行建模。例如,mini-redis的subscribe命令(
https://github.com/tokio-rs/mini-redis/blob/master/src/cmd/subscribe.rs#L94-L156)就非常精练、非常优雅。但异步也不是万能灵药。许多人都认为异步Rust的学习曲线非常复杂。尽管入门很容易,但很快就会遇到陡峭的曲线。很多人付出了很多努力,尽管有几个方面有待改进,但我相信,异步Rust最大的问题就在于会违反“最小惊讶原则”。
举个例子。同学A在学习Rust时阅读了Rust的教科书和Tokio的指南,打算写一个聊天服务器作为练习。他选了一个基于行的简单协议,将每一行编码,添加前缀表示行的长度。解析行的函数如下:
let len = socket.read_u32.await?;
let mut line = vec![0; len];
socket.read_exact(&mut line).await?;
let line = str::from_utf8(line)?;
Ok(line)
}
这段代码除了async和await关键字之外,跟阻塞的Rust代码没有什么两样。尽管同学A从来没有写过Rust,但阅读并理解这个函数完全没问题,至少从他自己的角度看如此。在本地测试时,聊天服务器似乎也能正常工作,于是他给同学B发送了一个链接。但很不幸,在进行了一些聊天后,服务器崩溃了,并返回了“invalid UTF-8”的错误。同学A很迷惑,他检查了代码,但并没有发现什么错误。
那么问题在哪儿?似乎该任务在调用栈的更高层的位置使用了一个select!:
loop {
select! {
line_in = parse_line(&socket) => {
if let Some(line_in) = line_in {
broadcast_line(line_in);
} else {
// connection closed, exit loop
break;
}
}
line_out = channel.recv => {
write_line(&socket, line_out).await;
}
}
}
假设channel上收到了一条消息,而此时parse_line在等待更多数据,那么select!就会放弃parse_line操作,从而导致丢失解析中的状态。在后面的循环迭代中,parse_line再次被调用,从一帧的中间开始,从而导致读入了错误数据。
问题在此:任何Rust异步函数都可能被调用者随时取消,而且与阻塞Rust不同,这里的取消是一个常见的异步操作。更糟糕的是,没有任何新手教程提到了这一点。
如果能改变这一点,让异步Rust每一步的行为符合初学者预期呢?如果行为必须根据预期得到,那么必然有一个能接受的点,为初学者指引正确的方向。此外,我们还希望最大程度地减少学习过程中的意料之外,特别是刚开始的时候。
我们先来改变意料之外的取消问题,即让异步函数总是能够完成执行。当future能够保证完成后,同学A发现异步Rust的行为跟阻塞Rust完全相同,只不过是多了两个关键字async和await而已。生成新任务会增加并发,也会增加任务之间的协调通道数量。select!不再能够接受任意异步语句,而只能与通道或类似通道的类型(例如JoinHandle)一起使用。
使用能保证完成的future后,同学A的聊天服务器如下:
async fn handle_connection(socket: TcpStream, channel: Channel) {
let reader = Arc::new(socket);
let writer = reader.clone;
let read_task = task::spawn(async move {
while let Some(line_in) in parse_line(&reader).await {
broadcast_line(line_in);
}
});
loop {
// `channel` and JoinHandle are both "channel-like" types.
select! {
res = read_task.join => {
// The connection closed, exit loop
break;
}
line_out = channel.recv => {
write_line(&writer, line_out).await;
}
}
}
}
这段代码与前面的示例很相似,但由于所有异步语句必然会完成,而且select!只接受类似于通道的类型,因此parse_line的调用被移动到了一个生成的任务中。select要求类似于通道的类型,这能够保证放弃丢失的分支是安全的。通道可以存储值,而且接收值是原子操作。丢失select的分支并不会导致取消时丢失数据。
如果写入时发生错误会怎样?现状下read_task会继续执行。然而,同学A希望它能出错,并优雅地关闭连接和所有任务。不幸的是,这里就会遇到设计上的难题。如果我们能够随时放弃任何异步语句,那么取消就非常容易了,只需要放弃future就可以。我们需要一种方法来取消正在执行的操作,因为这是使用异步编程的主要目的之一。为了实现这一点,JoinHandle提供了cancel方法:
async fn handle_connection(socket: TcpStream, channel: Channel) {
let reader = Arc::new(socket);
let writer = reader.clone;
let read_task = task::spawn(async move {
while let Some(line_in) in parse_line(&reader).await? {
broadcast_line(line_in)?;
}
Ok()
});
loop {
// `channel` and JoinHandle are both "channel-like" types.
select! {
_ = read_task.join => {
// The connection closed or we encountered an error,
// exit the loop
break;
}
line_out = channel.recv => {
if write_line(&writer, line_out).await.is_err {
read_task.cancel;
read_task.join;
}
}
}
}
}
但是cancel能做什么呢?它并不能立即终止任务,因为现在异步语句是保证能够执行完成的。但我们的确需要停止处理并尽快返回。相反,被取消的任务中的所有资源类型都应该停止执行,并返回“被中断”的错误。进一步的尝试也应该返回错误。这种策略与Kotlin很相似,只不过Kotlin会抛出异常而已。如果在任务取消时,read_task正在parse_line中等待socket.read_u32,那么read_u32函数会立即返回Err(
io::ErrorKind::Interrupted)。操作符?会在任务中向上冒泡,导致整个任务中断。
乍一看,这种行为非常像其他任务停止的行为,但其实不一样。对于同学A而言,当前的异步Rust的终止行为看起来就像任务不确定地发生挂起一样。如果能强制资源(例如套接字)在取消时返回错误,就能跟踪取消的流程。同学A可以添加println!语句或使用其他调试策略来调查什么导致了任务中断。
AsyncDrop
然而,同学A并不知道,他的聊天服务器使用了io-uring来避免了绝大部分系统调用。由于future能保证完成,再加上AsyncDrop,就可以透明底使用io-uring API。当同学A在handle_connection的末尾drop TcpStream时,套接字会异步地关闭。为了实现这一点,TcpStream的AsyncDrop实现如下:
impl AsyncDrop for TcpStream {
async fn drop(&mut self) {
self.uring.close(self.fd).await;
}
}
有人提出了一个绝妙的方法在traits中使用async(
https://hackmd.io/bKfiVPRpTvyX8JK_Ng2EWA?view)。唯一的问题就是如何处理隐含的.await点。目前,异步地等待一个future需要进行一次.await调用。而当一个值离开async上下文的范围时,编译器会为AsyncDrop trait添加一个隐藏的yield点。这个行为违反了最少意料之外原则。那么,既然其他的点都是明示的,为何此处需要一个隐含的await点?
解决“有时需要隐含drop”的问题的提议之一就是,要求使用明示的函数调用执行异步的drop:
my_tcp_stream.read(&mut buf).await?;
async_drop(my_tcp_stream).await;
当然,如果用户忘记调用async drop怎么办?毕竟,编译器在阻塞Rust中会自动处理drop,而且这是个非常强大的功能。而且,注意上述代码有一个小问题:?操作符在读取错误时会跳过async_drop调用。Rust编译器能对此问题给出警告,但怎么修复呢?有办法让?与明示的async_drop兼容吗?
如果不要求明示的async drop调用,而是去掉await关键字怎么样?同学A就不需要在调用异步函数(如socket.read_u32().await)之后使用.await了。在异步上下文中调用异步函数时,.await就变成了隐含的。
似乎这是如今Rust的一大进步。但我们依然可以对这个假设提出质疑。隐含的.await只能在异步语句中发生,因此它的应用比较有限,而且依赖于上下文。同学A只有通过查看函数定义,才能知道自己位于某个异步上下文中。此外,如果IDE能高亮显示某个yield点,就会非常方便。
去掉.await还有另一个好处:它能让异步Rust与阻塞Rust一直。阻塞的概念已经是隐含的了。在阻塞Rust中,我们并不会写my_socket.read(buffer).block?。当同学A编写异步聊天服务器时,他注意到的唯一区别就是必须用async关键字来标记函数。同学A可以凭直觉想象异步代码的执行。“懒future”的问题不再出现,而同学A也不能无意间做下面的事,并对先输出“two”的情况感到困惑。
async fn my_fn_one {
println!("one");
}
async fn my_fn_two {
println!("two");
}
async fn mixup {
let one = my_fn_one;
let two = my_fn_two;
join!(two, one);
}
.await的RFC中的确有一些对于隐含await的讨论。当时,反对隐含await的最有力的观点是,await调用正好标记了async语句可以被中止的点。如果采用保证完成的future,这个观点就不那么有力了。当然,对于可以安全中止的异步语句,我们还应该保留await关键字吗?这个问题需要一个答案。但无论如何,去掉“.await”是一个非常大的挑战,必须谨慎行事。需要进行易用性研究,表明其优点大于缺点才行。
到目前为止,同学A已经可以使用异步Rust构建聊天服务器,而且不需要学习太多新概念,也不会遇到无法预测的行为。他了解了select!,但编译器会强制在类似于通道的类型中进行选择。除此之外,同学A还给函数添加了async,而且运行良好。他把代码展示给同学B看,并询问是否需要将套接字放在一个Arc中。同学B建议他阅读一下带有作用域的任务(scoped tasks),借此避免分配。
带有作用域的任务等价于crossbeam的“带有作用域的线程”,只不过是异步的。这个任务可以通过生成者借用数据。同学A可以使用带有作用域的任务来避免在连接处理函数中使用Arc:
async fn handle_connection(socket: TcpStream, channel: Channel) {
task::scope(async |scope| {
let read_task = scope.spawn(async || {
while let Some(line_in) in parse_line(&socket)? {
broadcast_line(line_in)?;
}
Ok()
});
loop {
// `channel` and JoinHandle are both "channel-like" types.
select! {
_ = read_task.join => {
// The connection closed or we encountered an error,
// exit the loop
break;
}
line_out = channel.recv => {
if write_line(&writer, line_out).is_err {
break;
}
}
}
}
});
}
保证安全的关键是要保证,作用域的生存周期要大于在该作用域范围内生成的所有任务,换句话说,确保异步语句能够完成。但有一个缺点。启用带有作用域的任务会使“Future::poll”变得不安全,因为必须对future的完成情况进行轮询,以保证内存安全性。这种不安全性会导致Future的实现更难。为了降低难度,我们需要尽可能避免用户自己实现Future,包括实现类似于AsyncRead、AsyncIterator等traits。我相信这是一个可以达到的目标。
除了带有作用域的任务之外,保证异步语句的完成,还可以在使用io-uring或与C++ future集成时,让指针能正确地从任务传递到内核。某些情况下,还可能在生成子任务时避免分配,对于某些嵌入式环境非常有用,尽管这种情况需要一个略微不同的API。
利用今天的异步Rust,应用程序可以通过利用select!或FutureUnordered生成新任务的方式增加并发。到目前为止,我们讨论了任务生成和select!。我建议去掉FuturesUnordered,因为它经常会导致bug。在使用FutureUnordered时,很容易认为生成的任务会在后台执行,然后出乎意料地发现这些任务不会有任何进展。
相反,我们可以利用带有作用域的任务实现类似的方案:
let greeting = "Hello".to_string;
task::scope(async |scope| {
let mut task_set = scope.task_set;
for i in 0..10 {
task_set.spawn(async {
println!("{} from task {}", greeting, i);
i
});
}
async for res in task_set {
println!("task completed {:?}", res);
}
});
每个生成的任务都会并发执行,从生成者那里借用数据,而TaskSet能提供一个类似于FuturesUnordered,但不会导致灾难的API。至于缓存流等其他原语也可以在带有作用域的任务上实现。
还可以在这些原语之上实现一些新的并发原语。例如,可以实现类似于Kotlin的结构化并发。之前有人曾讨论过这个问题(
https://github.com/tokio-rs/tokio/issues/1879),但异步Rust的当前模型无法实现这一点。而将异步Rust改为保证完成,就能解锁这一领域。
本文开头我说过,使用异步编程可以更有效地对复杂的流程控制进行建模。目前最有效的原语为select!。我还提议,将select!改为只接受类似于通道的类型,这样可以强制同学A为每个连接生成两个任务,实现读写的并发性。生成任务能防止在取消读操作的时候出现bug,还能重写读操作,以处理意料之外的取消。例如,mini-redis在解析帧的时候,我们首先将接收到的数据保存到缓冲区中。当读操作被取消时,位于缓冲区中的数据不会丢失。下次调用读操作会从中断的地方继续。因此Mini-redis的读操作对于中止是安全的(abort-safe)。
如果不将select!限制在类似于通道的类型上,而是将其限制在对于中止是安全的操作上,会怎样?从通道中接收数据是中止安全的,但从带有缓冲区的I/O处理函数中读取也是中止安全的。这里的关键是,不应该假设所有异步操作都是中止安全的,而是应该要求开发者向函数定义中添加#[abort_safe](或async(abort))。这种策略有几个好处。首先,当同学A学习异步Rust时,它不需要知道任何有关安全性的概念。即使不理解这个概念,仅通过生成任务来获得并发性,也可以实现一切:
#[abort_safe]
async fn read_line(&mut self) -> io::Result<Option<String>> {
loop {
// Consume a full line from the buffer
if let Some(line) = self.parse_line? {
return Ok(line);
}
// Not enough data has been buffered to parse a full line
if 0 == self.socket.read_buf(&mut self.buffer)? {
// The remote closed the connection.
if self.buffer.is_empty {
return Ok(None);
} else {
return Err("connection reset by peer".into);
}
}
}
}
不再默认要求中止安全语句,而是由开发者自行标注。这种自行标注的策略符合撤销安全性的模式。当新的开发者阅读代码时,这个标注会告诉他们该函数必须保证中止安全。rust编译器甚至可以对于标注了#[abort_safe]的函数提供额外的检查和警告。
现在同学A可以在select!的循环中使用read_line了:
loop {
select! {
line_in = connection.read_line? => {
if let Some(line_in) = line_in {
broadcast_line(line_in);
} else {
// connection closed, exit loop
break;
}
}
line_out = channel.recv => {
connection.write_line(line_out)?;
}
}
}
#[abort_safe]注释引入了两个异步语句的变种。混合使用中止安全和非中止安全需要特别考虑。不论从中止安全还是从非中止安全的上下文中,都可以调用一个中止安全的函数。然而,Rust编译器会阻止从中止安全的上下文中调用非中止安全的函数,并提供一个有帮助的错误信息:
async fn must_complete { ... }
#[abort_safe]
async fn can_abort {
// Invalid call => compiler error
must_complete;
}
async fn must_complete { ... }
#[abort_safe]
async fn can_abort {
// Valid call
spawn(async { must_complete }).join;
}
开发者可以通过生成新任务的方式,从非中止安全函数中获得中止安全的上下文。
异步语句的两个新变种会增加语言的复杂性,但这个复杂性仅在学习曲线的后期才出现。在刚开始学习Rust时,默认的异步语句是非中止安全的。从这个上下文中,学习者可以不用关心中止安全性而直接调用异步函数。中止安全会在异步Rust的教程的后期作为一个可选的话题出现。
从目前的默认要求中止安全的异步模型改变成保证完成的模型,需要一个全新的Rust版本。处于讨论的目的,我们假设Rust 2026版引入了该变动。那么该版本中的Future trait将改变为保证完成的future,因此无法与老版本的trait兼容。相反,2026版中的旧trait将改名为AbortSafeFuture(名称暂定)。
在2026版中,给异步语句添加#[abort_safe]会生成一个AbortSafeFuture实现,而不是Future。2026之前版本中编写的任何异步函数都实现了AbortSafeFuture trait,因此任何已有的异步代码都能与新版本兼容(别忘了,中止安全的函数可以从任何上下文中调用)。
我讨论了Rust可能出现的一些改动。简单地总结一下:
异步函数保证完成
去掉await关键字
引入#[abort_safe]标注,表示异步函数是中止安全的
限制select!,仅接受中止安全的分支
取消已生成的任务的方式是禁止资源完成
支持带有作用域的任务
我相信,这些改动可以极大地简化Rust异步,尽管进行这些改动会影响现状。在进行决定之前,我们还需要更多数据。如今的异步代码有多少是中止安全的?我们能否进行易用性研究,以评价这些改动的好处?Rust拥有两种风格的异步语句,会带来多少认知上的困难?
我也希望本文能抛砖引玉,期待其他人能提出别的观点。现在Rust需要许多观点来决定其未来。
原文链接:
https://carllerche.com/2021/06/17/six-ways-to-make-async-rust-easier/
声明:本文由CSDN翻译,转载请注明来源。