With tokio-compat being used it is now possible to use dependencies that
use tokio-0.2
This commit is contained in:
Markus Westerlind 2020-04-29 10:59:38 +02:00 коммит произвёл Nathan Froyd
Родитель c0209bb270
Коммит 34f391fe4f
3 изменённых файлов: 71 добавлений и 46 удалений

39
Cargo.lock сгенерированный
Просмотреть файл

@ -1918,19 +1918,22 @@ dependencies = [
[[package]]
name = "redis"
version = "0.9.1"
version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0c747d743d48233f9bc3ed3fb00cb84c1d98d8c7f54ed2d4cca9adf461a7ef3"
checksum = "3eeb1fe3fc011cde97315f370bc88e4db3c23b08709a04915921e02b1d363b20"
dependencies = [
"bytes 0.4.12",
"bytes 0.5.4",
"combine",
"futures 0.1.29",
"dtoa",
"futures-executor",
"futures-util",
"itoa",
"percent-encoding 2.1.0",
"pin-project-lite",
"sha1",
"tokio-codec",
"tokio-executor",
"tokio-io",
"tokio-tcp",
"url 1.7.2",
"tokio 0.2.19",
"tokio-util",
"url 2.1.1",
]
[[package]]
@ -2627,8 +2630,12 @@ dependencies = [
"bytes 0.5.4",
"fnv",
"futures-core",
"iovec",
"lazy_static",
"libc",
"memchr 2.3.3",
"mio",
"mio-uds",
"num_cpus",
"pin-project-lite",
"slab",
@ -2893,6 +2900,20 @@ dependencies = [
"tokio-reactor",
]
[[package]]
name = "tokio-util"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "571da51182ec208780505a32528fc5512a8fe1443ab960b3f2f3ef093cd16930"
dependencies = [
"bytes 0.5.4",
"futures-core",
"futures-sink",
"log 0.4.8",
"pin-project-lite",
"tokio 0.2.19",
]
[[package]]
name = "toml"
version = "0.4.10"

Просмотреть файл

@ -57,7 +57,7 @@ num_cpus = "1.0"
number_prefix = "0.2.5"
openssl = { version = "0.10", optional = true }
rand = "0.5"
redis = { version = "0.9.0", optional = true }
redis = { version = "0.15.0", optional = true }
regex = "1"
reqwest = { version = "0.9.11", optional = true }
retry = "0.4.0"

58
src/cache/redis.rs поставляемый
Просмотреть файл

@ -15,8 +15,8 @@
use crate::cache::{Cache, CacheRead, CacheWrite, Storage};
use crate::errors::*;
use futures::{future, Future};
use redis::r#async::Connection;
use futures_03::prelude::*;
use redis::aio::Connection;
use redis::{cmd, Client, InfoDict};
use std::collections::HashMap;
use std::io::Cursor;
@ -39,8 +39,8 @@ impl RedisCache {
}
/// Returns a connection with configured read and write timeouts.
fn connect(&self) -> impl Future<Item = Connection, Error = Error> {
self.client.get_async_connection().map_err(|e| e.into())
async fn connect(self) -> Result<Connection> {
Ok(self.client.get_async_connection().await?)
}
}
@ -50,15 +50,16 @@ impl Storage for RedisCache {
let key = key.to_owned();
let me = self.clone();
Box::new(
me.connect()
.and_then(|c| cmd("GET").arg(key).query_async(c).from_err())
.and_then(|(_, d): (_, Vec<u8>)| {
Box::pin(async move {
let mut c = me.connect().await?;
let d: Vec<u8> = cmd("GET").arg(key).query_async(&mut c).await?;
if d.is_empty() {
Ok(Cache::Miss)
} else {
CacheRead::from(Cursor::new(d)).map(Cache::Hit)
}
}),
})
.compat(),
)
}
@ -68,12 +69,13 @@ impl Storage for RedisCache {
let me = self.clone();
let start = Instant::now();
Box::new(
me.connect()
.and_then(move |c| {
future::result(entry.finish())
.and_then(|d| cmd("SET").arg(key).arg(d).query_async(c).from_err())
Box::pin(async move {
let mut c = me.connect().await?;
let d = entry.finish()?;
cmd("SET").arg(key).arg(d).query_async(&mut c).await?;
Ok(start.elapsed())
})
.map(move |(_, ())| start.elapsed()),
.compat(),
)
}
@ -85,11 +87,14 @@ impl Storage for RedisCache {
/// Returns the current cache size. This value is aquired via
/// the Redis INFO command (used_memory).
fn current_size(&self) -> SFuture<Option<u64>> {
let me = self.clone(); // TODO Remove clone
Box::new(
self.connect()
.and_then(|c| cmd("INFO").query_async(c).from_err())
.map(|(_, v)| v)
.map(|i: InfoDict| i.get("used_memory")),
Box::pin(async move {
let mut c = me.connect().await?;
let v: InfoDict = cmd("INFO").query_async(&mut c).await?;
Ok(v.get("used_memory"))
})
.compat(),
)
}
@ -97,20 +102,19 @@ impl Storage for RedisCache {
/// the Redis CONFIG command (maxmemory). If the server has no
/// configured limit, the result is None.
fn max_size(&self) -> SFuture<Option<u64>> {
let me = self.clone(); // TODO Remove clone
Box::new(
self.connect()
.and_then(|c| {
cmd("CONFIG")
Box::pin(async move {
let mut c = me.connect().await?;
let h: HashMap<String, usize> = cmd("CONFIG")
.arg("GET")
.arg("maxmemory")
.query_async(c)
.from_err()
.query_async(&mut c)
.await?;
Ok(h.get("maxmemory")
.and_then(|&s| if s != 0 { Some(s as u64) } else { None }))
})
.map(|(_, v)| v)
.map(|h: HashMap<String, usize>| {
h.get("maxmemory")
.and_then(|&s| if s != 0 { Some(s as u64) } else { None })
}),
.compat(),
)
}
}