refactor: Integrate with opendal for s3 (#1412)

Signed-off-by: Xuanwo <github@xuanwo.io>
Co-authored-by: Sylvestre Ledru <sylvestre@debian.org>
This commit is contained in:
Xuanwo 2022-12-11 20:23:27 +08:00 коммит произвёл GitHub
Родитель a31cf6491e
Коммит 3716030d47
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 298 добавлений и 502 удалений

580
Cargo.lock сгенерированный
Просмотреть файл

@ -8,6 +8,17 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "ahash"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
dependencies = [
"getrandom 0.2.8",
"once_cell",
"version_check",
]
[[package]]
name = "aho-corasick"
version = "0.7.20"
@ -97,6 +108,19 @@ dependencies = [
"futures-core",
]
[[package]]
name = "async-compat"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b48b4ff0c2026db683dea961cd8ea874737f56cffca86fa84415eaddc51c00d"
dependencies = [
"futures-core",
"futures-io",
"once_cell",
"pin-project-lite",
"tokio",
]
[[package]]
name = "async-lock"
version = "2.6.0"
@ -136,330 +160,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "aws-config"
version = "0.51.0"
name = "backon"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56a636c44c77fa18bdba56126a34d30cfe5538fe88f7d34988fa731fee143ddd"
checksum = "6cd1a59bc091e593ee9ed62df4e4a07115e00a0e0a52fd7e0e04540773939b80"
dependencies = [
"aws-http",
"aws-sdk-sso",
"aws-sdk-sts",
"aws-smithy-async",
"aws-smithy-client",
"aws-smithy-http",
"aws-smithy-http-tower",
"aws-smithy-json",
"aws-smithy-types",
"aws-types",
"bytes",
"hex",
"http",
"hyper",
"ring",
"time 0.3.17",
"futures",
"pin-project",
"rand 0.8.5",
"tokio",
"tower",
"tracing",
"zeroize",
]
[[package]]
name = "aws-endpoint"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ca8f374874f6459aaa88dc861d7f5d834ca1ff97668eae190e97266b5f6c3fb"
dependencies = [
"aws-smithy-http",
"aws-smithy-types",
"aws-types",
"http",
"regex",
"tracing",
]
[[package]]
name = "aws-http"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78d41e19e779b73463f5f0c21b3aacc995f4ba783ab13a7ae9f5dfb159a551b4"
dependencies = [
"aws-smithy-http",
"aws-smithy-types",
"aws-types",
"bytes",
"http",
"http-body",
"lazy_static",
"percent-encoding",
"pin-project-lite",
"tracing",
]
[[package]]
name = "aws-sdk-s3"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9f08665c8e03aca8cb092ef01e617436ebfa977fddc1240e1b062488ab5d48a"
dependencies = [
"aws-endpoint",
"aws-http",
"aws-sig-auth",
"aws-sigv4",
"aws-smithy-async",
"aws-smithy-checksums",
"aws-smithy-client",
"aws-smithy-eventstream",
"aws-smithy-http",
"aws-smithy-http-tower",
"aws-smithy-types",
"aws-smithy-xml",
"aws-types",
"bytes",
"bytes-utils",
"http",
"http-body",
"tokio-stream",
"tower",
"tracing",
]
[[package]]
name = "aws-sdk-sso"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86dcb1cb71aa8763b327542ead410424515cff0cde5b753eedd2917e09c63734"
dependencies = [
"aws-endpoint",
"aws-http",
"aws-sig-auth",
"aws-smithy-async",
"aws-smithy-client",
"aws-smithy-http",
"aws-smithy-http-tower",
"aws-smithy-json",
"aws-smithy-types",
"aws-types",
"bytes",
"http",
"tokio-stream",
"tower",
]
[[package]]
name = "aws-sdk-sts"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdfcf584297c666f6b472d5368a78de3bc714b6e0a53d7fbf76c3e347c292ab1"
dependencies = [
"aws-endpoint",
"aws-http",
"aws-sig-auth",
"aws-smithy-async",
"aws-smithy-client",
"aws-smithy-http",
"aws-smithy-http-tower",
"aws-smithy-query",
"aws-smithy-types",
"aws-smithy-xml",
"aws-types",
"bytes",
"http",
"tower",
]
[[package]]
name = "aws-sig-auth"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12cbe7b2be9e185c1fbce27fc9c41c66b195b32d89aa099f98768d9544221308"
dependencies = [
"aws-sigv4",
"aws-smithy-eventstream",
"aws-smithy-http",
"aws-types",
"http",
"tracing",
]
[[package]]
name = "aws-sigv4"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03ff4cff8c4a101962d593ba94e72cd83891aecd423f0c6e3146bff6fb92c9e3"
dependencies = [
"aws-smithy-eventstream",
"aws-smithy-http",
"bytes",
"form_urlencoded",
"hex",
"http",
"once_cell",
"percent-encoding",
"regex",
"ring",
"time 0.3.17",
"tracing",
]
[[package]]
name = "aws-smithy-async"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b3442b4c5d3fc39891a2e5e625735fba6b24694887d49c6518460fde98247a9"
dependencies = [
"futures-util",
"pin-project-lite",
"tokio",
"tokio-stream",
]
[[package]]
name = "aws-smithy-checksums"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc227e36e346f45298288359f37123e1a92628d1cec6b11b5eb335553278bd9e"
dependencies = [
"aws-smithy-http",
"aws-smithy-types",
"bytes",
"crc32c",
"crc32fast",
"hex",
"http",
"http-body",
"md-5",
"pin-project-lite",
"sha1",
"sha2",
"tracing",
]
[[package]]
name = "aws-smithy-client"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff28d553714f8f54cd921227934fc13a536a1c03f106e56b362fd57e16d450ad"
dependencies = [
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-http-tower",
"aws-smithy-types",
"bytes",
"fastrand",
"http",
"http-body",
"hyper",
"hyper-rustls",
"lazy_static",
"pin-project-lite",
"tokio",
"tower",
"tracing",
]
[[package]]
name = "aws-smithy-eventstream"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7ea0df7161ce65b5c8ca6eb709a1a907376fa18226976e41c748ce02ccccf24"
dependencies = [
"aws-smithy-types",
"bytes",
"crc32fast",
]
[[package]]
name = "aws-smithy-http"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf58ed4fefa61dbf038e5421a521cbc2c448ef69deff0ab1d915d8a10eda5664"
dependencies = [
"aws-smithy-eventstream",
"aws-smithy-types",
"bytes",
"bytes-utils",
"futures-core",
"http",
"http-body",
"hyper",
"once_cell",
"percent-encoding",
"pin-project-lite",
"pin-utils",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "aws-smithy-http-tower"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20c96d7bd35e7cf96aca1134b2f81b1b59ffe493f7c6539c051791cbbf7a42d3"
dependencies = [
"aws-smithy-http",
"bytes",
"http",
"http-body",
"pin-project-lite",
"tower",
"tracing",
]
[[package]]
name = "aws-smithy-json"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8324ba98c8a94187723cc16c37aefa09504646ee65c3d2c3af495bab5ea701b"
dependencies = [
"aws-smithy-types",
]
[[package]]
name = "aws-smithy-query"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83834ed2ff69ea6f6657baf205267dc2c0abe940703503a3e5d60ce23be3d306"
dependencies = [
"aws-smithy-types",
"urlencoding",
]
[[package]]
name = "aws-smithy-types"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b02e06ea63498c43bc0217ea4d16605d4e58d85c12fc23f6572ff6d0a840c61"
dependencies = [
"itoa",
"num-integer",
"ryu",
"time 0.3.17",
]
[[package]]
name = "aws-smithy-xml"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "246e9f83dd1fdf5d347fa30ae4ad30a9d1d42ce4cd74a93d94afa874646f94cd"
dependencies = [
"xmlparser",
]
[[package]]
name = "aws-types"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05701d32da168b44f7ee63147781aed8723e792cc131cb9b18363b5393f17f70"
dependencies = [
"aws-smithy-async",
"aws-smithy-client",
"aws-smithy-http",
"aws-smithy-types",
"http",
"rustc_version",
"tracing",
"zeroize",
]
[[package]]
@ -477,6 +186,25 @@ dependencies = [
"serde",
]
[[package]]
name = "bincode"
version = "2.0.0-rc.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bb50c5a2ef4b9b1e7ae73e3a73b52ea24b20312d629f9c4df28260b7ad2c3c4"
dependencies = [
"bincode_derive",
"serde",
]
[[package]]
name = "bincode_derive"
version = "2.0.0-rc.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a45a23389446d2dd25dc8e73a7a3b3c43522b630cac068927f0649d43d719d2"
dependencies = [
"virtue",
]
[[package]]
name = "bitflags"
version = "1.3.2"
@ -558,16 +286,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "bytes-utils"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1934a3ef9cac8efde4966a92781e77713e1ba329f1d42e446c7d7eba340d8ef1"
dependencies = [
"bytes",
"either",
]
[[package]]
name = "cache-padded"
version = "1.2.0"
@ -732,15 +450,6 @@ dependencies = [
"libc",
]
[[package]]
name = "crc32c"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dfea2db42e9927a3845fb268a10a72faed6d416065f77873f05e411457c363e"
dependencies = [
"rustc_version",
]
[[package]]
name = "crc32fast"
version = "1.3.2"
@ -837,6 +546,15 @@ dependencies = [
"dirs-sys",
]
[[package]]
name = "dirs"
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059"
dependencies = [
"dirs-sys",
]
[[package]]
name = "dirs-sys"
version = "0.3.7"
@ -859,6 +577,12 @@ dependencies = [
"syn",
]
[[package]]
name = "dlv-list"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257"
[[package]]
name = "doc-comment"
version = "0.3.3"
@ -950,6 +674,12 @@ dependencies = [
"windows-sys 0.42.0",
]
[[package]]
name = "flagset"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda653ca797810c02f7ca4b804b40b8b95ae046eb989d356bce17919a8c25499"
[[package]]
name = "flate2"
version = "1.0.25"
@ -1235,6 +965,9 @@ name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
dependencies = [
"ahash",
]
[[package]]
name = "heck"
@ -1379,9 +1112,7 @@ checksum = "59df7c4e19c950e6e0e868dcc0a300b09a9b88e9ec55bd879ca819087a77355d"
dependencies = [
"http",
"hyper",
"log",
"rustls",
"rustls-native-certs",
"tokio",
"tokio-rustls",
]
@ -1845,6 +1576,39 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860"
[[package]]
name = "opendal"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23c49b040a9a4357d22f0b5f637214ab759d8e19fea2d90f42844ac44249ea5b"
dependencies = [
"anyhow",
"async-compat",
"async-trait",
"backon",
"base64",
"bincode 2.0.0-rc.2",
"bytes",
"flagset",
"futures",
"http",
"log",
"md-5",
"once_cell",
"parking_lot",
"percent-encoding",
"pin-project",
"quick-xml",
"reqsign",
"reqwest",
"serde",
"serde_json",
"time 0.3.17",
"tokio",
"ureq",
"uuid",
]
[[package]]
name = "openssl"
version = "0.10.43"
@ -1890,6 +1654,16 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "ordered-multimap"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccd746e37177e1711c20dd619a1620f34f5c8b569c53590a72dedd5344d8924a"
dependencies = [
"dlv-list",
"hashbrown",
]
[[package]]
name = "os_str_bytes"
version = "6.4.1"
@ -2067,6 +1841,16 @@ version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quick-xml"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "quote"
version = "1.0.21"
@ -2220,6 +2004,35 @@ dependencies = [
"winapi",
]
[[package]]
name = "reqsign"
version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d544686c7cb5a31f085ae9f5c4e40a30dcba72d0b1d77656329dc4f90433cd6"
dependencies = [
"anyhow",
"backon",
"base64",
"bytes",
"dirs",
"form_urlencoded",
"hex",
"hmac",
"http",
"jsonwebtoken",
"log",
"once_cell",
"percent-encoding",
"quick-xml",
"rust-ini",
"serde",
"serde_json",
"sha1",
"sha2",
"time 0.3.17",
"ureq",
]
[[package]]
name = "reqwest"
version = "0.11.13"
@ -2235,6 +2048,7 @@ dependencies = [
"http",
"http-body",
"hyper",
"hyper-rustls",
"hyper-tls",
"ipnet",
"js-sys",
@ -2245,11 +2059,15 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls",
"rustls-native-certs",
"rustls-pemfile",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-native-tls",
"tokio-rustls",
"tokio-util",
"tower-service",
"url",
@ -2387,12 +2205,13 @@ dependencies = [
]
[[package]]
name = "rustc_version"
version = "0.4.0"
name = "rust-ini"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
checksum = "f6d5f2436026b4f6e79dc829837d467cc7e9a55ee40e750d716713540715a2df"
dependencies = [
"semver 1.0.14",
"cfg-if 1.0.0",
"ordered-multimap",
]
[[package]]
@ -2486,12 +2305,8 @@ dependencies = [
"assert_cmd",
"async-trait",
"atty",
"aws-config",
"aws-sdk-s3",
"aws-sig-auth",
"aws-smithy-client",
"base64",
"bincode",
"bincode 1.3.3",
"blake3",
"byteorder",
"bytes",
@ -2526,6 +2341,7 @@ dependencies = [
"num_cpus",
"number_prefix",
"once_cell",
"opendal",
"openssl",
"parity-tokio-ipc",
"percent-encoding",
@ -2548,6 +2364,7 @@ dependencies = [
"tar",
"tempfile",
"thirtyfour_sync",
"time 0.3.17",
"tokio",
"tokio-serde",
"tokio-util",
@ -3142,17 +2959,6 @@ dependencies = [
"pin-project",
]
[[package]]
name = "tokio-stream"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.3"
@ -3182,11 +2988,6 @@ version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tokio",
"tower-layer",
"tower-service",
"tracing",
@ -3304,6 +3105,23 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "ureq"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b97acb4c28a254fd7a4aeec976c46a7fa404eac4d7c134b30c75144846d7cb8f"
dependencies = [
"base64",
"chunked_transfer",
"log",
"once_cell",
"rustls",
"rustls-native-certs",
"url",
"webpki",
"webpki-roots",
]
[[package]]
name = "url"
version = "2.3.0"
@ -3316,12 +3134,6 @@ dependencies = [
"serde",
]
[[package]]
name = "urlencoding"
version = "2.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8db7427f936968176eaa7cdf81b7f98b980b18495ec28f1b5791ac3bfe3eea9"
[[package]]
name = "urlparse"
version = "0.7.3"
@ -3341,6 +3153,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c"
dependencies = [
"getrandom 0.2.8",
"serde",
]
[[package]]
@ -3361,6 +3174,12 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "virtue"
version = "0.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b60dcd6a64dd45abf9bd426970c9843726da7fc08f44cd6fcebf68c21220a63"
[[package]]
name = "void"
version = "1.0.2"
@ -3522,6 +3341,15 @@ dependencies = [
"untrusted",
]
[[package]]
name = "webpki-roots"
version = "0.22.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "368bfe657969fb01238bb756d351dcade285e0f6fcbd36dcb23359a5169975be"
dependencies = [
"webpki",
]
[[package]]
name = "which"
version = "4.3.0"
@ -3704,12 +3532,6 @@ dependencies = [
"libc",
]
[[package]]
name = "xmlparser"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d25c75bf9ea12c4040a97f829154768bbbce366287e2dc044af160cd79a13fd"
[[package]]
name = "zeroize"
version = "1.5.7"

Просмотреть файл

@ -28,6 +28,7 @@ bincode = "1"
blake3 = "1"
byteorder = "1.0"
bytes = "1"
opendal = { version= "0.22", optional=true }
chrono = { version = "0.4.23", optional = true }
clap = { version = "3.2.22", features = ["derive", "env", "wrap_help"] }
directories = "4.0.1"
@ -72,6 +73,7 @@ tokio = { version = "1", features = ["rt-multi-thread", "io-util", "time", "net"
tokio-serde = "0.8"
tokio-util = { version = "0.7", features = ["codec", "io"] }
tower = "0.4"
time = "0.3.15"
toml = "0.5"
url = { version = "2", optional = true }
uuid = { version = "1.2", features = ["v4"] }
@ -90,11 +92,6 @@ syslog = { version = "6", optional = true }
void = { version = "1", optional = true }
version-compare = { version = "0.1.1", optional = true }
aws-config = { version = "0.51", optional = true }
aws-sdk-s3 = { version = "0.21", optional = true }
aws-smithy-client = { version = "0.51", features = ["rustls"], optional = true }
aws-sig-auth = { version = "0.51", optional = true }
[dev-dependencies]
assert_cmd = "=2.0.7"
cc = "1.0"
@ -125,7 +122,7 @@ features = [
default = ["all"]
all = ["dist-client", "redis", "s3", "memcached", "gcs", "azure", "gha"]
azure = ["chrono", "hyper", "hyperx", "reqwest", "url", "hmac", "md-5", "sha2"]
s3 = ["aws-config", "aws-sdk-s3", "aws-smithy-client", "aws-sig-auth"]
s3 = ["opendal"]
gcs = ["chrono", "hyper", "hyperx", "percent-encoding", "reqwest", "ring", "url"]
gha = ["gha-toolkit"]
memcached = ["memcached-rs"]

211
src/cache/s3.rs поставляемый
Просмотреть файл

@ -10,13 +10,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::middleware::DefaultMiddleware;
use aws_sdk_s3::operation::{GetObject, PutObject};
use aws_sdk_s3::output::{GetObjectOutput, PutObjectOutput};
use aws_sdk_s3::{Config, Endpoint, Region};
use aws_sig_auth::signer::{OperationSigningConfig, SigningRequirements};
use aws_smithy_client::erase::DynConnector;
use opendal::services::s3;
use opendal::Operator;
use crate::cache::{Cache, CacheRead, CacheWrite, Storage};
use std::convert::TryInto;
@ -26,9 +21,7 @@ use std::time::{Duration, Instant};
use crate::errors::*;
pub struct S3Cache {
client: S3Client,
no_credentials: bool,
key_prefix: String,
client: Operator,
}
impl S3Cache {
@ -40,10 +33,21 @@ impl S3Cache {
endpoint: Option<&str>,
use_ssl: Option<bool>,
) -> Result<S3Cache> {
let mut builder = s3::Builder::default();
builder.bucket(bucket);
if let Some(region) = region {
builder.region(region);
}
builder.root(key_prefix);
if no_credentials {
builder.disable_credential_loader();
}
if let Some(endpoint) = endpoint {
builder.endpoint(&endpoint_resolver(endpoint, use_ssl)?);
}
Ok(S3Cache {
key_prefix: key_prefix.to_owned(),
no_credentials,
client: S3Client::new(bucket, region, endpoint, use_ssl).await?,
client: builder.build()?.into(),
})
}
}
@ -51,14 +55,9 @@ impl S3Cache {
#[async_trait]
impl Storage for S3Cache {
async fn get(&self, key: &str) -> Result<Cache> {
let response = self
.client
.get_object(&normalize_key(&self.key_prefix, key), self.no_credentials)
.await;
match response {
match self.client.object(&normalize_key(key)).read().await {
Ok(res) => {
let hit = CacheRead::from(io::Cursor::new(res.body.collect().await?.into_bytes()))?;
let hit = CacheRead::from(io::Cursor::new(res))?;
Ok(Cache::Hit(hit))
}
Err(e) => {
@ -72,14 +71,15 @@ impl Storage for S3Cache {
let start = Instant::now();
self.client
.put_object(&normalize_key(&self.key_prefix, key), entry.finish()?)
.object(&normalize_key(key))
.write(entry.finish()?)
.await?;
Ok(start.elapsed())
}
fn location(&self) -> String {
format!("S3, bucket: {}", self.client.bucket)
format!("S3, bucket: {}", self.client.metadata().name())
}
async fn current_size(&self) -> Result<Option<u64>> {
@ -91,19 +91,15 @@ impl Storage for S3Cache {
}
}
fn normalize_key(prefix: &str, key: &str) -> String {
format!(
"{}{}/{}/{}/{}",
prefix,
&key[0..1],
&key[1..2],
&key[2..3],
&key
)
fn normalize_key(key: &str) -> String {
format!("{}/{}/{}/{}", &key[0..1], &key[1..2], &key[2..3], &key)
}
fn endpoint_resolver(endpoint: &str, use_ssl: Option<bool>) -> Endpoint {
let endpoint_uri: http::Uri = endpoint.try_into().unwrap();
/// Resolve given endpoint along with use_ssl settings.
fn endpoint_resolver(endpoint: &str, use_ssl: Option<bool>) -> Result<String> {
let endpoint_uri: http::Uri = endpoint
.try_into()
.map_err(|err| anyhow!("input endpoint {endpoint} is invalid: {:?}", err))?;
let mut parts = endpoint_uri.into_parts();
match use_ssl {
Some(true) => {
@ -122,78 +118,8 @@ fn endpoint_resolver(endpoint: &str, use_ssl: Option<bool>) -> Endpoint {
if parts.path_and_query.is_none() {
parts.path_and_query = Some(http::uri::PathAndQuery::from_static("/"));
}
Endpoint::mutable(http::Uri::from_parts(parts).unwrap())
}
struct S3Client {
client: aws_smithy_client::Client<DynConnector, DefaultMiddleware>,
bucket: String,
config: Config,
}
impl S3Client {
async fn new(
bucket: &str,
region: Option<&str>,
endpoint: Option<&str>,
use_ssl: Option<bool>,
) -> Result<S3Client> {
let region_provider =
RegionProviderChain::first_try(region.map(|r| Region::new(r.to_owned())))
.or_default_provider();
let shared_config = aws_config::from_env().region(region_provider).load().await;
let mut builder = aws_sdk_s3::config::Builder::from(&shared_config);
if let Some(endpoint) = endpoint {
builder = builder.endpoint_resolver(endpoint_resolver(endpoint, use_ssl));
}
let config = builder.build();
// Keep the client around for connection reuse
let client = aws_smithy_client::Builder::new()
.dyn_https_connector(
aws_smithy_client::http_connector::ConnectorSettings::builder().build(),
)
.middleware(DefaultMiddleware::new())
.build();
Ok(S3Client {
client,
bucket: bucket.to_owned(),
config,
})
}
async fn get_object(&self, key: &str, no_credentials: bool) -> Result<GetObjectOutput> {
let mut op = GetObject::builder()
.bucket(&self.bucket)
.key(key)
.build()
.unwrap()
.make_operation(&self.config)
.await?;
if no_credentials {
let mut signing_config = OperationSigningConfig::default_config();
signing_config.signing_requirements = SigningRequirements::Disabled;
op.properties_mut().insert(signing_config);
}
Ok(self.client.call(op).await?)
}
async fn put_object(&self, key: &str, data: Vec<u8>) -> Result<PutObjectOutput> {
let op = PutObject::builder()
.bucket(&self.bucket)
.key(key)
.body(data.into())
.build()
.unwrap()
.make_operation(&self.config)
.await?;
Ok(self.client.call(op).await?)
}
Ok(http::Uri::from_parts(parts)?.to_string())
}
#[cfg(test)]
@ -203,24 +129,75 @@ mod test {
#[test]
fn test_normalize_key() {
assert_eq!(
normalize_key("prefix", "0123456789abcdef0123456789abcdef"),
"prefix0/1/2/0123456789abcdef0123456789abcdef"
);
assert_eq!(
normalize_key("prefix/", "0123456789abcdef0123456789abcdef"),
"prefix/0/1/2/0123456789abcdef0123456789abcdef"
normalize_key("0123456789abcdef0123456789abcdef"),
"0/1/2/0123456789abcdef0123456789abcdef"
);
}
#[test]
fn test_endpoint_resolver() {
let endpoint = endpoint_resolver("s3-us-east-1.amazonaws.com", None);
assert_eq!(endpoint.uri().scheme_str(), Some("http"));
fn test_endpoint_resolver() -> Result<()> {
let cases = vec![
(
"no scheme without use_ssl",
"s3-us-east-1.amazonaws.com",
None,
"http://s3-us-east-1.amazonaws.com/",
),
(
"http without use_ssl",
"http://s3-us-east-1.amazonaws.com",
None,
"http://s3-us-east-1.amazonaws.com/",
),
(
"https without use_ssl",
"https://s3-us-east-1.amazonaws.com",
None,
"https://s3-us-east-1.amazonaws.com/",
),
(
"no scheme with use_ssl",
"s3-us-east-1.amazonaws.com",
Some(true),
"https://s3-us-east-1.amazonaws.com/",
),
(
"http with use_ssl",
"http://s3-us-east-1.amazonaws.com",
Some(true),
"https://s3-us-east-1.amazonaws.com/",
),
(
"https with use_ssl",
"https://s3-us-east-1.amazonaws.com",
Some(true),
"https://s3-us-east-1.amazonaws.com/",
),
(
"no scheme with not use_ssl",
"s3-us-east-1.amazonaws.com",
Some(false),
"http://s3-us-east-1.amazonaws.com/",
),
(
"http with not use_ssl",
"http://s3-us-east-1.amazonaws.com",
Some(false),
"http://s3-us-east-1.amazonaws.com/",
),
(
"https with not use_ssl",
"https://s3-us-east-1.amazonaws.com",
Some(false),
"http://s3-us-east-1.amazonaws.com/",
),
];
let endpoint = endpoint_resolver("s3-us-east-1.amazonaws.com", Some(true));
assert_eq!(endpoint.uri().scheme_str(), Some("https"));
for (name, endpoint, use_ssl, expected) in cases {
let actual = endpoint_resolver(endpoint, use_ssl)?;
assert_eq!(actual, expected, "{}", name);
}
let endpoint = endpoint_resolver("s3-us-east-1.amazonaws.com", Some(false));
assert_eq!(endpoint.uri().scheme_str(), Some("http"));
Ok(())
}
}