如何解決“Future不能安全地在線程之間發(fā)送”的問題?
Rust應(yīng)用程序通常使用異步庫,如Tokio和Actix。這些庫為異步I/O和并行計算等提供了有力的支持。然而,不同的異步庫在一起使用時,有時會出現(xiàn)問題。
當(dāng)在Tokio運(yùn)行的異步函數(shù)中使用Actix client時,可能會發(fā)生“error: future不能安全地在線程之間發(fā)送”的錯誤,這在使用Tokio和Actix庫時是一個常見的問題。今天,我們來看看如何解決這個問題。
讓我們從一個簡單的代碼示例開始,它只適用于Actix,不會產(chǎn)生任何問題:
use actix_web::{web, App, HttpResponse, HttpServer};
use awc::Client;
#[actix_rt::main]
async fn main() {
actix_rt::spawn(async {
HttpServer::new(|| {
App::new()
.service(web::resource("/hello").route(web::get().to(ok)))
})
.bind("127.0.0.1:8080")?
.run()
.await
});
let client = Client::new();
let url = "http://127.0.0.1:8080/hello";
let ret = client.get(url).send().await.unwrap().body().await.unwrap();
println!("{:?}", ret);
}
async fn ok() -> HttpResponse {
HttpResponse::Ok()
.content_type("text/html; charset=utf-8")
.body("OK")
}
在這段代碼中,我們使用Actix創(chuàng)建一個HTTP服務(wù)器,并使用Actix client向它發(fā)出GET請求。一切都很順利,但是當(dāng)我們試圖在Tokio運(yùn)行的異步函數(shù)中使用Actix client時,問題就開始了。
當(dāng)我們嘗試在Tokio運(yùn)行時中調(diào)用Actix client時,我們會遇到“error: future不能安全地在線程之間發(fā)送的錯誤。async block創(chuàng)建的future不是Send。類型 awc::Client 不是Send”。這是因為Actix client不是Send,這意味著它不能在線程之間安全地傳遞。
下面是導(dǎo)致此錯誤的示例代碼:
use actix_web::{web, App, HttpResponse, HttpServer};
use awc::Client;
#[actix_rt::main]
async fn main() {
actix_rt::spawn(async {
HttpServer::new(|| {
App::new()
.service(web::resource("/hello").route(web::get().to(ok)))
})
.bind("127.0.0.1:8080")?
.run()
.await
});
let r = tokio::spawn(async move {
let client = Client::new();
let url = "http://127.0.0.1:8080/hello";
client.get(url).send().await.unwrap().body().await.unwrap()
}).await.unwrap();
println!("{:?}", r);
}
async fn ok() -> HttpResponse {
HttpResponse::Ok()
.content_type("text/html; charset=utf-8")
.body("OK")
}
為了解決這個問題并使代碼在Tokio中安全使用,我們可以使用來自Tokio的Oneshot機(jī)制。這種機(jī)制允許我們封裝Actix client的輸出,并在線程之間安全地傳遞它。
下面是用Oneshot用來解決這個問題的示例代碼:
use actix_web::{web, App, HttpResponse, HttpServer};
use awc::Client;
#[actix_rt::main]
async fn main() {
actix_rt::spawn(async {
HttpServer::new(|| {
App::new()
.service(web::resource("/hello").route(web::get().to(ok)))
})
.bind("127.0.0.1:8080")?
.run()
.await
});
let (sender, receiver) = tokio::sync::oneshot::channel();
actix_rt::spawn(async move {
let client = Client::new();
let url = "http://127.0.0.1:8080/hello";
let _ = sender.send(client.get(url).send().await.unwrap().body().await.unwrap());
});
let r = tokio::spawn(async move {
receiver.await.unwrap()
}).await.unwrap();
println!("{:?}", r);
std::mem::forget(runtime);
}
async fn ok() -> HttpResponse {
HttpResponse::Ok()
.content_type("text/html; charset=utf-8")
.body("OK")
}