rust async

发布时间 2023-11-04 14:05:12作者: liuzonglin

async

  • Send 与 Sync:在跨线程时需要关注

    • Send: ownership(所有权) 可以 send 到其他线程
      • A type is Send if it is safe to send it to another thread.
    • Sync: 可以并发,无线程安全问题
      • A type is Sync if it is safe to share between threads (T is Sync if and only if &T is Send).
  • Why important: 因为有些异步执行引擎是可以多线程执行的(比如 Tokio)

  • Rc<T>: !Send, !Sync

    use std::rc::Rc;
    
    #[derive(Debug)]
    struct Handler; // field-less unit struct; it only serves as the value wrapped by `Rc` in `main`
    
    #[tokio::main]
    async fn main() {
        let handler = Rc::new(Handler); // reference-counted pointer from `std::rc`
        let handler_clone = Rc::clone(&handler); // second handle to the same `Handler`
        tokio::spawn(process(handler_clone)); // rejected by the compiler: `tokio::spawn` requires a `Send` future, and this one holds an `Rc`
    }
    
    async fn process(_handler: Rc<Handler>) { // the `Rc` argument is captured by the returned future, so that future is not `Send`
        unimplemented!() // placeholder body: panics if the future is ever polled
    }
    

    Rc 不是 Send

    future returned by process is not Send

    error: future cannot be sent between threads safely
    --> src\main.rs:10:18
        |
    10  |     tokio::spawn(process(handler_clone));
        |                  ^^^^^^^^^^^^^^^^^^^^^^ future returned by `process` is not `Send`
        |
        = help: within `impl Future<Output = ()>`, the trait `Send` is not implemented for `Rc<Handler>`
    note: captured value is not `Send`
    --> src\main.rs:13:18
        |
    13  | async fn process(handler: Rc<Handler>) {
        |                  ^^^^^^^ has type `Rc<Handler>` which is not `Send`
    note: required by a bound in `tokio::spawn`
    --> C:\Users\liuzonglin\.cargo\registry\src\mirrors.ustc.edu.cn-61ef6e0cd06fb9b8\tokio-1.29.1\src\task\spawn.rs:166:21
        |
    164 |     pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
        |            ----- required by a bound in this function
    165 |     where
    166 |         T: Future + Send + 'static,
        |                     ^^^^ required by this bound in `spawn`
    
  • Arc<T>: Send & Sync if : T is Send & Sync

    将 Rc 替换为 Arc

    示例:_01_example.rs

    use std::sync::Arc;
    
    /// Field-less unit struct; in this example it is only the value wrapped by `Arc`.
    #[derive(Debug)]
    struct Handler;
    
    /// Entry point: wraps a `Handler` in an `Arc` and hands a second handle to
    /// a spawned task.
    #[tokio::main]
    async fn main() {
        let shared = Arc::new(Handler);
        // The task future owns its own `Arc` handle, which is what lets it
        // meet the `Send` bound on `tokio::spawn`.
        let task = process(Arc::clone(&shared));
        tokio::spawn(task);
    }
    
    /// Stub task that takes ownership of one `Arc<Handler>` handle.
    ///
    /// # Panics
    /// Always panics through `unimplemented!()` once the future is polled.
    async fn process(_handler: Arc<Handler>) {
        unimplemented!()
    }
    

async fn

  • 异步函数语法:
    • async fn do_something() {/* ... */}
    • async fn 返回的是 Future,Future 需要由一个执行者来运行
  • futures::executor::block_on;
    • block_on 阻塞当前线程,直到提供的 Future 运行完成
    • 其它执行者提供更复杂的行为,例如将多个 Future 安排到同一个线程上

示例:async_example_01_.rs

use futures::executor::block_on;
use std::future::Future;

/// Prints a greeting once the returned future is run by an executor.
async fn hello_world1() {   // equivalent to `hello_world2`
    println!("你好,世界!");
}

/// Same greeting as `hello_world1`, written as a plain function that returns
/// an `async` block instead of using `async fn`.
fn hello_world2() -> impl Future<Output = ()> {
    // Nothing is printed here; the block only runs when the future is polled.
    async {
        println!("你好,世界!");
    }
}

/// Runs both greeting futures to completion, one after the other.
fn main() {
    // Each `block_on` call blocks this thread until its future has finished,
    // so the greeting is printed twice, in this order.
    block_on(hello_world1());
    block_on(hello_world2());
}

block_on 会阻塞当前线程,直到提供的 Future 执行完成。其他执行器提供更复杂的行为,例如将多个 Future 调度到同一个线程上。

Await

  • 在 async fn 中,可以使用 .await 来等待另一个实现 Future trait 的完成
  • 与 block_on 不同,.await不会阻塞当前线程,而是异步的等待 Future 的完成(如果该 Future 目前无法取得进展,就允许其他任务执行)
use futures::executor::block_on;

/// Learns a song and then sings it, strictly in that order.
async fn learn_and_sing() {
    // The song has to be learned before it can be sung. `.await` is used
    // instead of `block_on` so the thread is not blocked while waiting,
    // which leaves room for dancing at the same time.
    let title = "hello";
    let lyrics = learn_song(title).await;
    sing_song(lyrics).await;
}

/// Placeholder for dancing; the body contains no statements.
async fn dance() {
    // Dance along to the rhythm of the song.
}

/// Returns the lyrics of `song` as an owned `String`.
///
/// This is a toy implementation that just copies its input; a real program
/// would probably fetch the lyrics through an asynchronous API.
async fn learn_song(song: &'static str) -> String {
    song.to_owned()
}

/// Sings `song` by printing it to standard output.
///
/// This is a toy implementation; a real program would probably play the
/// music through an asynchronous API.
async fn sing_song(song: String) {
    println!("Singing: {song}");
}

async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();

    // `join!` 像 `.await` 一样,但可以同时等待多个未来。
    // 如果我们在 `learn_and_sing` 未来中暂时阻塞,那么 `dance`
    // 未来将接管当前线程。如果 `dance` 变得阻塞,
    // `learn_and_sing` 可以重新接管。如果两个未来都被阻塞,
    // 那么 `async_main` 就被阻塞,并会交给执行器。
    futures::join!(f1, f2);
}

/// Program entry point: blocks the current thread until `async_main` completes.
fn main() {
    block_on(async_main());
}

上面的示例定义了名为 learn_and_sing 的异步函数和名为 async_main 的异步主函数。learn_and_sing 函数异步地学习一首歌并演唱,dance 函数用于跳舞。async_main 函数启动了 learn_and_sing 和 dance 两个异步函数,并使用 futures::join! 宏等待这两个函数的完成。
main 函数是程序的入口点,它使用 block_on 函数阻塞当前线程,等待 async_main 函数的完成。

配合多线程执行器,异步编程可以充分利用多核 CPU 的性能:任务可以在不同的线程之间切换执行,从而提高程序的性能和响应能力。

示例:async_example_02_.rs

use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    let tcp_listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    for stream in tcp_listener.incoming() {
        let stream = stream.unwrap();
        handle_connection(stream);
    }
}

/// Serves a single HTTP request on `stream`.
///
/// A request whose first line is exactly `GET / HTTP/1.1` is answered with
/// `200 OK` and the contents of `hello.html`; every other request is answered
/// with `404 NOT FOUND` and the contents of `404.html`. Both files are looked
/// up relative to the current working directory.
///
/// # Panics
/// Panics if reading the request, reading the HTML file, or writing the
/// response fails.
fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    // Only the bytes that were actually received are inspected.
    let received = stream.read(&mut buffer).unwrap();
    let request = &buffer[..received];

    let (status_line, filename) = if request.starts_with(b"GET / HTTP/1.1\r\n") {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();
    // The status line already ends with the blank line that terminates the
    // header, so the body is appended directly. Any separator placed here
    // (such as ", ") would be sent to the client as the start of the body.
    let response = format!("{status_line}{contents}");
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

~> curl -v -X GET http://127.0.0.1:7878/random_url
Note: Unnecessary use of -X or --request, GET is already inferred.
*   Trying 127.0.0.1:7878...
* Connected to 127.0.0.1 (127.0.0.1) port 7878 (#0)
> GET /random_url HTTP/1.1
> Host: 127.0.0.1:7878
> User-Agent: curl/8.0.1
> Accept: */*
>
< HTTP/1.1 404 NOT FOUND
* no chunk, no close, no size. Assume close to signal end
<
, <!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="utf-8">
    <title>Hello!</title>
</head>

<body>
    <h1>Oops!</h1>
    <p>Sorry, I don't know what you're asking for.</p>
</body>

</html>* Closing connection 0

~> curl -X GET http://127.0.0.1:7878/random_url
, <!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="utf-8">
    <title>Hello!</title>
</head>

<body>
    <h1>Oops!</h1>
    <p>Sorry, I don't know what you're asking for.</p>
</body>

</html>

并发地处理连接

use async_std::net::TcpListener;
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::task;
use futures::stream::StreamExt;
use std::fs;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let tcp_listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    tcp_listener
        .incoming()
        .for_each_concurrent(None, |tcp_stream| async move {
            let tcp_stream = tcp_stream.unwrap();
            handle_connection(tcp_stream).await;
        })
        .await
}

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        task::sleep(Duration::from_secs(5)).await;
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contens = fs::read_to_string(filename).unwrap();
    let response = format!("{}, {}", status_line, contens);
    stream.write_all(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

使用多线程并行处理请求

use async_std::net::TcpListener;
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::task;
use async_std::task::spawn;
use futures::stream::StreamExt;
use std::fs;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let tcp_listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    tcp_listener
        .incoming()
        .for_each_concurrent(None, |tcpstream| async move {
            let tcpstream = tcpstream.unwrap();
            spawn(handle_connection(tcpstream));
        })
        .await;
}

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        task::sleep(Duration::from_secs(5)).await;
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contens = fs::read_to_string(filename).unwrap();
    let response = format!("{}, {}", status_line, contens);
    stream.write_all(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

use async_std::io::{Read, Write};
use async_std::net::TcpListener;
use async_std::prelude::*;
use async_std::task;
use async_std::task::spawn;
use futures::stream::StreamExt;
use std::fs;
use std::marker::Unpin;
use std::time::Duration;

/// Listens on 127.0.0.1:7878 and spawns one task per incoming connection.
///
/// # Panics
/// Panics if the address cannot be bound or if accepting a connection fails.
#[async_std::main]
async fn main() {
    let server = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    let connections = server.incoming();

    connections
        .for_each_concurrent(None, |incoming| async move {
            // The task handle is dropped, so each connection runs detached.
            spawn(handle_connection(incoming.unwrap()));
        })
        .await;
}

async fn handle_connection(mut stream: impl Read + Write + Unpin) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        task::sleep(Duration::from_secs(5)).await;
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let contents = fs::read_to_string(filename).unwrap();

    let response = format!("{status_line}{contents}");
    stream.write(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::io::Error;
    use futures::task::{Context, Poll};

    use std::cmp::min;
    use std::fs;
    use std::marker::Unpin;
    use std::pin::Pin;

    /// In-memory stand-in for a TCP stream.
    struct MockTcpStream {
        /// Bytes handed out to the code under test by `poll_read`.
        read_data: Vec<u8>,
        /// Bytes the code under test has written through `poll_write`.
        write_data: Vec<u8>,
    }

    impl Read for MockTcpStream {
        /// Copies as much of `read_data` as fits into `buf` and completes
        /// immediately.
        fn poll_read(
            self: Pin<&mut Self>,
            _: &mut Context,
            buf: &mut [u8],
        ) -> Poll<Result<usize, Error>> {
            let size: usize = min(self.read_data.len(), buf.len());
            buf[..size].copy_from_slice(&self.read_data[..size]);
            Poll::Ready(Ok(size))
        }
    }

    impl Write for MockTcpStream {
        /// Records `buf` and reports that all of it was written.
        fn poll_write(
            mut self: Pin<&mut Self>,
            _: &mut Context,
            buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            // Append rather than overwrite, so a response delivered over
            // several writes is captured in full.
            self.write_data.extend_from_slice(buf);

            Poll::Ready(Ok(buf.len()))
        }

        /// Nothing is buffered, so flushing always succeeds immediately.
        fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Error>> {
            Poll::Ready(Ok(()))
        }

        /// Closing the mock always succeeds immediately.
        fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Error>> {
            Poll::Ready(Ok(()))
        }
    }

    // `handle_connection` requires its stream to be `Unpin`.
    impl Unpin for MockTcpStream {}

    /// A `GET /` request must produce a `200 OK` header followed directly by
    /// the contents of `hello.html`.
    #[async_std::test]
    async fn test_handle_connection() {
        let input_bytes = b"GET / HTTP/1.1\r\n";
        // Zero-padded 1024-byte request that starts with the request line.
        let mut contents = vec![0u8; 1024];
        contents[..input_bytes.len()].copy_from_slice(input_bytes);
        let mut stream = MockTcpStream {
            read_data: contents,
            write_data: Vec::new(),
        };

        handle_connection(&mut stream).await;

        let expected_contents = fs::read_to_string("hello.html").unwrap();
        let expected_response = format!("HTTP/1.1 200 OK\r\n\r\n{expected_contents}");
        assert_eq!(stream.write_data, expected_response.as_bytes());
    }
}