Implement storage upon opendal Operator

Signed-off-by: Xuanwo <github@xuanwo.io>
This commit is contained in:
Xuanwo 2022-12-11 22:49:48 +08:00 коммит произвёл Sylvestre Ledru
Родитель 4401cd9d09
Коммит 467ebbe6db
2 изменённых файлов: 70 добавлений и 64 удалений

68
src/cache/cache.rs поставляемый
Просмотреть файл

@ -327,6 +327,57 @@ pub trait Storage: Send + Sync {
async fn max_size(&self) -> Result<Option<u64>>;
}
/// Implement storage for operator.
#[cfg(any(feature = "s3", feature = "azure"))]
#[async_trait]
impl Storage for opendal::Operator {
async fn get(&self, key: &str) -> Result<Cache> {
match self.object(&normalize_key(key)).read().await {
Ok(res) => {
let hit = CacheRead::from(io::Cursor::new(res))?;
Ok(Cache::Hit(hit))
}
Err(e) => {
warn!("Got AWS error: {:?}", e);
Ok(Cache::Miss)
}
}
}
async fn put(&self, key: &str, entry: CacheWrite) -> Result<Duration> {
let start = std::time::Instant::now();
self.object(&normalize_key(key))
.write(entry.finish()?)
.await?;
Ok(start.elapsed())
}
fn location(&self) -> String {
let meta = self.metadata();
format!(
"{}, bucket: {}, prefix: {}",
meta.scheme(),
meta.name(),
meta.root()
)
}
async fn current_size(&self) -> Result<Option<u64>> {
Ok(None)
}
async fn max_size(&self) -> Result<Option<u64>> {
Ok(None)
}
}
/// Normalize key `abcdef` into `a/b/c/abcdef`
fn normalize_key(key: &str) -> String {
format!("{}/{}/{}/{}", &key[0..1], &key[1..2], &key[2..3], &key)
}
/// Get a suitable `Storage` implementation from configuration.
#[allow(clippy::cognitive_complexity)] // TODO simplify!
pub fn storage_from_config(config: &Config, pool: &tokio::runtime::Handle) -> Arc<dyn Storage> {
@ -463,14 +514,14 @@ pub fn storage_from_config(config: &Config, pool: &tokio::runtime::Handle) -> Ar
CacheType::S3(ref c) => {
debug!("Trying S3Cache({:?})", c);
#[cfg(feature = "s3")]
match pool.block_on(S3Cache::new(
match S3Cache::build(
&c.bucket,
c.region.as_deref(),
&c.key_prefix,
c.no_credentials,
c.endpoint.as_deref(),
c.use_ssl,
)) {
) {
Ok(s) => {
trace!("Using S3Cache");
return Arc::new(s);
@ -486,3 +537,16 @@ pub fn storage_from_config(config: &Config, pool: &tokio::runtime::Handle) -> Ar
trace!("Using DiskCache({:?}, {})", dir, size);
Arc::new(DiskCache::new(&dir, size, pool))
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_normalize_key() {
assert_eq!(
normalize_key("0123456789abcdef0123456789abcdef"),
"0/1/2/0123456789abcdef0123456789abcdef"
);
}
}

66
src/cache/s3.rs поставляемый
Просмотреть файл

@ -13,26 +13,21 @@
use opendal::services::s3;
use opendal::Operator;
use crate::cache::{Cache, CacheRead, CacheWrite, Storage};
use std::convert::TryInto;
use std::io;
use std::time::{Duration, Instant};
use crate::errors::*;
pub struct S3Cache {
client: Operator,
}
pub struct S3Cache;
impl S3Cache {
pub async fn new(
pub fn build(
bucket: &str,
region: Option<&str>,
key_prefix: &str,
no_credentials: bool,
endpoint: Option<&str>,
use_ssl: Option<bool>,
) -> Result<S3Cache> {
) -> Result<Operator> {
let mut builder = s3::Builder::default();
builder.bucket(bucket);
if let Some(region) = region {
@ -46,55 +41,10 @@ impl S3Cache {
builder.endpoint(&endpoint_resolver(endpoint, use_ssl)?);
}
Ok(S3Cache {
client: builder.build()?.into(),
})
Ok(builder.build()?.into())
}
}
#[async_trait]
impl Storage for S3Cache {
async fn get(&self, key: &str) -> Result<Cache> {
match self.client.object(&normalize_key(key)).read().await {
Ok(res) => {
let hit = CacheRead::from(io::Cursor::new(res))?;
Ok(Cache::Hit(hit))
}
Err(e) => {
warn!("Got AWS error: {:?}", e);
Ok(Cache::Miss)
}
}
}
async fn put(&self, key: &str, entry: CacheWrite) -> Result<Duration> {
let start = Instant::now();
self.client
.object(&normalize_key(key))
.write(entry.finish()?)
.await?;
Ok(start.elapsed())
}
fn location(&self) -> String {
format!("S3, bucket: {}", self.client.metadata().name())
}
async fn current_size(&self) -> Result<Option<u64>> {
Ok(None)
}
async fn max_size(&self) -> Result<Option<u64>> {
Ok(None)
}
}
fn normalize_key(key: &str) -> String {
format!("{}/{}/{}/{}", &key[0..1], &key[1..2], &key[2..3], &key)
}
/// Resolve given endpoint along with use_ssl settings.
fn endpoint_resolver(endpoint: &str, use_ssl: Option<bool>) -> Result<String> {
let endpoint_uri: http::Uri = endpoint
@ -126,14 +76,6 @@ fn endpoint_resolver(endpoint: &str, use_ssl: Option<bool>) -> Result<String> {
mod test {
use super::*;
#[test]
fn test_normalize_key() {
assert_eq!(
normalize_key("0123456789abcdef0123456789abcdef"),
"0/1/2/0123456789abcdef0123456789abcdef"
);
}
#[test]
fn test_endpoint_resolver() -> Result<()> {
let cases = vec![