From 7317274c032bc18dd652717b33784e2ea5a1657f Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Fri, 29 Jul 2022 16:55:35 +0800 Subject: [PATCH] RBAC --- registry/Cargo.lock | 218 ++++++++++++- registry/Cargo.toml | 1 + registry/auth/Cargo.toml | 22 ++ registry/auth/src/lib.rs | 38 +++ registry/auth/src/token.rs | 158 ++++++++++ registry/common-utils/Cargo.toml | 1 + registry/common-utils/src/lib.rs | 14 + registry/feathr-registry/src/main.rs | 5 +- registry/raft-registry/Cargo.toml | 4 +- registry/raft-registry/src/app.rs | 92 ++++-- registry/raft-registry/src/lib.rs | 2 + registry/raft-registry/src/network/api_v1.rs | 206 ++++++++++++- registry/raft-registry/src/network/api_v2.rs | 288 +++++++++++++++++- registry/raft-registry/src/rbac_middleware.rs | 102 +++++++ registry/registry-api/Cargo.toml | 1 + registry/registry-api/src/api_models/mod.rs | 2 + registry/registry-api/src/api_models/rbac.rs | 52 ++++ registry/registry-api/src/api_provider.rs | 93 +++++- registry/registry-api/src/error.rs | 5 + registry/registry-provider/Cargo.toml | 2 +- registry/registry-provider/src/error.rs | 5 +- registry/registry-provider/src/lib.rs | 2 + .../registry-provider/src/rbac_provider.rs | 131 ++++++++ registry/registry-provider/src/registry.rs | 16 +- registry/sql-provider/Cargo.toml | 10 +- registry/sql-provider/src/database/mod.rs | 8 +- registry/sql-provider/src/database/mssql.rs | 164 +++++++++- registry/sql-provider/src/database/sqlx.rs | 141 ++++++++- registry/sql-provider/src/db_registry.rs | 41 ++- registry/sql-provider/src/lib.rs | 146 ++++++++- registry/sql-provider/src/mock.rs | 1 + registry/sql-provider/src/rbac_map.rs | 103 +++++++ registry/sql-provider/src/serdes.rs | 23 +- 33 files changed, 1993 insertions(+), 104 deletions(-) create mode 100644 registry/auth/Cargo.toml create mode 100644 registry/auth/src/lib.rs create mode 100644 registry/auth/src/token.rs create mode 100644 registry/raft-registry/src/rbac_middleware.rs create mode 100644 registry/registry-api/src/api_models/rbac.rs create mode 100644 registry/registry-provider/src/rbac_provider.rs create mode 100644 registry/sql-provider/src/rbac_map.rs diff --git a/registry/Cargo.lock b/registry/Cargo.lock index 968b01d..24ec331 100644 --- a/registry/Cargo.lock +++ b/registry/Cargo.lock @@ -91,6 +91,18 @@ version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb07d2053ccdbe10e2af2995a2f116c1330396493dc1269f6a91d0ae82e19704" +[[package]] +name = "async-native-tls" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d57d4cec3c647232e1094dc013546c0b33ce785d8aeb251e1f20dfaf8a9a13fe" +dependencies = [ + "futures-util", + "native-tls", + "thiserror", + "url", +] + [[package]] name = "async-trait" version = "0.1.56" @@ -135,6 +147,26 @@ dependencies = [ "winapi", ] +[[package]] +name = "auth" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "common-utils", + "futures", + "jsonwebtoken", + "lazy_static", + "log", + "openssl", + "reqwest", + "serde", + "serde_json", + "serde_with", + "thiserror", + "tokio", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -174,15 +206,15 @@ dependencies = [ [[package]] name = "bb8-tiberius" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61780ef76db8989f8fd30d9a63fcb1d7d1b8d7df3b7d4662c09e66474877b0e6" +checksum = "ba49af39f809f71aeec44467238c27b34d1caecc8a83a73906c006b7f1138ca3" dependencies = [ "async-trait", "bb8", "futures", "thiserror", - "tiberius", + "tiberius 0.10.0", "tokio", "tokio-util 0.6.10", ] @@ -339,6 +371,7 @@ version = "0.1.0" dependencies = [ "dotenv", "log", + "thiserror", "tracing-subscriber", ] @@ -516,14 +549,38 @@ dependencies = [ "cipher", ] +[[package]] +name = "darling" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" +dependencies = [ + "darling_core 0.13.4", + "darling_macro 0.13.4", +] + [[package]] name = "darling" version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4529658bdda7fd6769b8614be250cdcfc3aeb0ee72fe66f9e41e5e5eb73eac02" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.14.1", + "darling_macro 0.14.1", +] + +[[package]] +name = "darling_core" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", ] [[package]] @@ -540,13 +597,24 @@ dependencies = [ "syn", ] +[[package]] +name = "darling_macro" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" +dependencies = [ + "darling_core 0.13.4", + "quote", + "syn", +] + [[package]] name = "darling_macro" version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddfc69c5bfcbd2fc09a0f38451d2daf0e372e367986a83906d1b0dbc88134fb5" dependencies = [ - "darling_core", + "darling_core 0.14.1", "quote", "syn", ] @@ -1241,6 +1309,7 @@ checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" dependencies = [ "autocfg", "hashbrown", + "serde", ] [[package]] @@ -1285,6 +1354,20 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonwebtoken" +version = "8.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aa4b4af834c6cfd35d8763d359661b90f2e45d8f750a0849156c7f4671af09c" +dependencies = [ + "base64 0.13.0", + "pem", + "ring", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1711,6 +1794,20 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentls" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f561874f8d6ecfb674fc08863414040c93cc90c0b6963fe679895fab8b65560" +dependencies = [ + "futures-util", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "url", +] + [[package]] name = "os_str_bytes" version = "6.2.0" @@ -1789,6 +1886,15 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c520e05135d6e763148b6426a837e239041653ba7becd2e538c076c738025fc" +[[package]] +name = "pem" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c64931a1a212348ec4f3b4362585eca7159d0d09cbdf4a7f74f02173596fd4" +dependencies = [ + "base64 0.13.0", +] + [[package]] name = "pem-rfc7468" version = "0.3.1" @@ -1957,7 +2063,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5105e426a8a747fefc4be06ab7705cb63bc3ab1b2db373867e1d83b1d95877a" dependencies = [ "Inflector", - "darling", + "darling 0.14.1", "http", "indexmap", "mime", @@ -2063,6 +2169,8 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "auth", + "chrono", "clap", "common-utils", "env_logger", @@ -2242,6 +2350,7 @@ dependencies = [ "async-trait", "chrono", "common-utils", + "itertools", "log", "poem", "poem-openapi", @@ -2548,6 +2657,34 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89df7a26519371a3cce44fbb914c2819c84d9b897890987fa3ab096491cc0ea8" +dependencies = [ + "base64 0.13.0", + "chrono", + "hex", + "indexmap", + "serde", + "serde_json", + "serde_with_macros", + "time 0.3.11", +] + +[[package]] +name = "serde_with_macros" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de337f322382fcdfbb21a014f7c224ee041a23785651db67b9827403178f698f" +dependencies = [ + "darling 0.14.1", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_yaml" version = "0.8.26" @@ -2600,6 +2737,18 @@ dependencies = [ "libc", ] +[[package]] +name = "simple_asn1" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adc4e5204eb1910f40f9cfa375f6f05b68c3abac4b6fd879c8ff5e7ae8a0a085" +dependencies = [ + "num-bigint", + "num-traits", + "thiserror", + "time 0.3.11", +] + [[package]] name = "slab" version = "0.4.7" @@ -2674,6 +2823,7 @@ dependencies = [ "async-trait", "bb8", "bb8-tiberius", + "chrono", "common-utils", "itertools", "log", @@ -2686,9 +2836,12 @@ dependencies = [ "sqlx", "tantivy", "thiserror", - "tiberius", + "tiberius 0.10.0", + "tiberius-derive", "tokio", "tokio-stream", + "tracing", + "tracing-futures", "uuid 1.1.2", ] @@ -2725,6 +2878,7 @@ dependencies = [ "bitflags", "byteorder", "bytes 1.2.0", + "chrono", "crossbeam-queue", "digest", "dirs", @@ -2989,9 +3143,37 @@ dependencies = [ [[package]] name = "tiberius" -version = "0.9.4" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1c6d3d8b3740376841aed0deeca9f98b0607433a213727c0b7c132b20b4ad3" +checksum = "833311bc8e26e96c73ad1b5c1f488c588808c747a318905ec67e43d422ea2c08" +dependencies = [ + "async-native-tls", + "async-trait", + "asynchronous-codec", + "byteorder", + "bytes 1.2.0", + "connection-string", + "encoding", + "enumflags2", + "futures", + "futures-sink", + "futures-util", + "num-traits", + "once_cell", + "opentls", + "pin-project-lite", + "pretty-hex", + "thiserror", + "tracing", + "uuid 0.8.2", + "winauth", +] + +[[package]] +name = "tiberius" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "332ebb88fba7df15f54c9f1332baf284d6c13c6afa3ff612e924db07a5c1d9ce" dependencies = [ "async-trait", "asynchronous-codec", @@ -3015,10 +3197,24 @@ dependencies = [ "tokio-rustls", "tokio-util 0.6.10", "tracing", - "uuid 0.8.2", + "uuid 1.1.2", "winauth", ] +[[package]] +name = "tiberius-derive" +version = "0.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8097a74621f85df93f8727700bedbc88426107f9a677e9ca027e056a2a54308f" +dependencies = [ + "darling 0.13.4", + "ident_case", + "proc-macro2", + "quote", + "syn", + "tiberius 0.7.3", +] + [[package]] name = "time" version = "0.1.44" diff --git a/registry/Cargo.toml b/registry/Cargo.toml index a6309b4..aec6150 100644 --- a/registry/Cargo.toml +++ b/registry/Cargo.toml @@ -8,5 +8,6 @@ members = [ "registry-api", "registry-cli", "raft-registry", + "auth", ] diff --git a/registry/auth/Cargo.toml b/registry/auth/Cargo.toml new file mode 100644 index 0000000..d739bdf --- /dev/null +++ b/registry/auth/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "auth" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +log = "0.4" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +serde_with = { version = "2", features = ["chrono"] } +thiserror = "1" +futures = "0.3" +tokio = "1" +chrono = { version = "0.4", features = ["serde"] } +reqwest = { version = "0.11", features = ["json"], default-features = false } +async-trait = "0.1" +lazy_static = "1" +openssl = "0.10" +jsonwebtoken = "8" +common-utils = { path = "../common-utils" } diff --git a/registry/auth/src/lib.rs b/registry/auth/src/lib.rs new file mode 100644 index 0000000..06fb063 --- /dev/null +++ b/registry/auth/src/lib.rs @@ -0,0 +1,38 @@ +use thiserror::Error; + +mod token; + +#[derive(Error, Debug)] +pub enum AuthError { + #[error("{0}")] + ReqwestError(String), + + #[error("{0}")] + JwtError(String), + + #[error(transparent)] + CertError(#[from] openssl::error::ErrorStack), + + #[error("Token format is invalid.")] + InvalidToken, + + #[error("Token timestamp is invalid.")] + InvalidTimestamp, + + #[error("Key('{0}') is not found.")] + KeyNotFound(String), + + #[error("Failed to initialize auth lib")] + InitializationError, +} + +pub use token::decode_token; + +#[cfg(test)] +mod tests { + #[test] + fn it_works() { + let result = 2 + 2; + assert_eq!(result, 4); + } +} diff --git a/registry/auth/src/token.rs b/registry/auth/src/token.rs new file mode 100644 index 0000000..7b4e303 --- /dev/null +++ b/registry/auth/src/token.rs @@ -0,0 +1,158 @@ +use std::{collections::HashMap, sync::Arc}; + +use chrono::{DateTime, Utc}; +use common_utils::Logged; +use jsonwebtoken::{decode, decode_header, Algorithm, DecodingKey, Validation}; +use openssl::x509::X509; +use serde::{de::DeserializeOwned, Deserialize}; +use serde_with::TimestampSeconds; +use tokio::sync::{OnceCell, RwLock}; + +use crate::AuthError; + +impl From for AuthError { + fn from(e: reqwest::Error) -> Self { + Self::ReqwestError(e.to_string()) + } +} + +impl From for AuthError { + fn from(e: jsonwebtoken::errors::Error) -> Self { + Self::JwtError(e.to_string()) + } +} + +pub struct TokenDecoder { + // TODO: Refresh periodically, daily maybe? + keys: HashMap, +} + +impl TokenDecoder { + pub async fn new(base_url: &str) -> Result { + let resp: OpenIdConfiguration = reqwest::get(format!( + "{}/v2.0/.well-known/openid-configuration", + base_url + )) + .await? + .json() + .await?; + let cfg: AadKeyConfiguration = reqwest::get(resp.jwks_uri).await?.json().await?; + Ok(Self { + keys: cfg + .keys + .into_iter() + .filter_map(|k| k.into_decoding_key().log().ok()) + .collect::>(), + }) + } + + pub fn decode_token(&self, token: &str, check_expiration: bool) -> Result + where + T: DeserializeOwned, + { + let now = chrono::Utc::now(); + + #[serde_with::serde_as] + #[derive(Clone, Debug, Deserialize)] + struct Claims { + #[serde_as(as = "TimestampSeconds")] + nbf: DateTime, + #[serde_as(as = "TimestampSeconds")] + exp: DateTime, + #[serde(flatten)] + user_claims: U, + } + let claims: Claims = self.decode_token_claims_no_validation(token.trim())?; + if check_expiration && ((claims.nbf > now) || (claims.exp < now)) { + return Err(AuthError::InvalidTimestamp); + } + Ok(claims.user_claims) + } + + fn get_decoding_key(&self, kid: &str) -> Result<&DecodingKey, AuthError> { + let key = self + .keys + .get(kid) + .ok_or_else(|| AuthError::KeyNotFound(kid.to_owned()))?; + Ok(key) + } + + fn decode_token_claims_no_validation(&self, token: &str) -> Result + where + T: DeserializeOwned, + { + let header = decode_header(token)?; + let kid = &header.kid.or(header.x5t).ok_or(AuthError::InvalidToken)?; + let key = self.get_decoding_key(kid)?; + // TODO: Use 'alg' header + let mut validation = Validation::new(Algorithm::RS256); + validation.validate_exp = false; + let decoded = decode(token, key, &validation)?; + Ok(decoded.claims) + } +} + +#[derive(Clone, Debug, Deserialize)] +struct OpenIdConfiguration { + jwks_uri: String, +} + +#[allow(dead_code)] +#[derive(Clone, Debug, Deserialize)] +struct AadKey { + kty: String, + #[serde(rename = "use")] + use_: String, + kid: String, + x5t: String, + n: String, + e: String, + x5c: Vec, + issuer: String, +} + +impl AadKey { + fn into_decoding_key(self) -> Result<(String, DecodingKey), AuthError> { + let x509 = X509::from_pem( + format!( + "-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----", + self.x5c + .get(0) + .ok_or_else(|| AuthError::KeyNotFound(self.kid.to_owned()))? + ) + .as_bytes(), + )?; + let pk = x509.public_key()?.public_key_to_pem()?; + let key = DecodingKey::from_rsa_pem(pk.as_slice())?; + Ok((self.kid, key)) + } +} + +#[derive(Clone, Debug, Deserialize)] +struct AadKeyConfiguration { + keys: Vec, +} + +static DECODER: OnceCell>>> = OnceCell::const_new(); + +pub async fn decode_token(token: &str) -> Result +where + C: DeserializeOwned, +{ + DECODER + .get_or_init(|| async { + let base_url = std::env::var("OPENID_BASE_URL") + .unwrap_or("https://login.microsoftonline.com/common".to_string()); + TokenDecoder::new(&base_url) + .await + .ok() + .map(|d| Arc::new(RwLock::new(d))) + }) + .await + .as_ref() + .ok_or_else(|| AuthError::InitializationError)? + .read() + .await + .decode_token(token, true) + .map_err(|e| e.into()) +} diff --git a/registry/common-utils/Cargo.toml b/registry/common-utils/Cargo.toml index 10e03a3..4934b4c 100644 --- a/registry/common-utils/Cargo.toml +++ b/registry/common-utils/Cargo.toml @@ -7,5 +7,6 @@ edition = "2021" [dependencies] log = "0.4" +thiserror = "1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } dotenv = "0.15" diff --git a/registry/common-utils/src/lib.rs b/registry/common-utils/src/lib.rs index 76c0ddc..2a2c00e 100644 --- a/registry/common-utils/src/lib.rs +++ b/registry/common-utils/src/lib.rs @@ -1,6 +1,7 @@ use std::fmt::Debug; use log::trace; +use thiserror::Error; /// `set!` macro works like `vec!`, but generates a HashSet. #[macro_export] @@ -14,6 +15,19 @@ macro_rules! set { }; } +#[derive(Error, Debug)] +#[error("{0}")] +pub struct StringError(String); + +impl StringError { + pub fn new(s: T) -> Self + where + T: ToString, + { + StringError(s.to_string()) + } +} + /// Log if `Result` is an error pub trait Logged { fn log(self) -> Self; diff --git a/registry/feathr-registry/src/main.rs b/registry/feathr-registry/src/main.rs index e8ef96b..d2729bd 100644 --- a/registry/feathr-registry/src/main.rs +++ b/registry/feathr-registry/src/main.rs @@ -20,7 +20,7 @@ use poem::{ use poem_openapi::OpenApiService; use raft_registry::{ management_routes, raft_routes, FeathrApiV1, FeathrApiV2, NodeConfig, RaftRegistryApp, - RaftSequencer, + RaftSequencer, RbacMiddleware, }; use sql_provider::attach_storage; @@ -176,7 +176,8 @@ async fn main() -> Result<(), anyhow::Error> { .nest("/v2", api_service_v2) .with(Tracing) .with(RaftSequencer::new(app.store.clone())) - .with(Cors::new()); + .with(Cors::new()) + .with(RbacMiddleware); let docs_route = Route::new().nest("/v1", ui_v1).nest("/v2", ui_v2); diff --git a/registry/raft-registry/Cargo.toml b/registry/raft-registry/Cargo.toml index 1fa60d8..7efb0aa 100644 --- a/registry/raft-registry/Cargo.toml +++ b/registry/raft-registry/Cargo.toml @@ -11,6 +11,7 @@ anyhow = "1" async-trait = "0.1" futures-util = "0.3" thiserror = "1" +chrono = { version = "0.4", features = ["serde"] } env_logger = "0.9.0" serde = { version = "1", features = ["derive", "rc"] } serde_json = "1" @@ -19,7 +20,7 @@ tokio = { version="1.0", default-features=false, features=["sync"] } tracing = "0.1" tracing-futures = "0.2" sled = "0.34" -uuid = { version = "1", features = ["v4"] } +uuid = { version = "1", features = ["v4", "serde"] } walkdir = "2.3" rand = "0.8" reqwest = { version = "0.11", features = ["json"] } @@ -30,3 +31,4 @@ common-utils = { path = "../common-utils" } registry-provider = { path = "../registry-provider" } sql-provider = { path = "../sql-provider" } registry-api = { path = "../registry-api" } +auth = { path = "../auth" } \ No newline at end of file diff --git a/registry/raft-registry/src/app.rs b/registry/raft-registry/src/app.rs index 63e7075..522f89e 100644 --- a/registry/raft-registry/src/app.rs +++ b/registry/raft-registry/src/app.rs @@ -1,30 +1,26 @@ -use std::collections::BTreeMap; -use std::collections::BTreeSet; -use std::sync::Arc; +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; -use log::debug; -use log::trace; -use openraft::error::CheckIsLeaderError; -use openraft::error::InitializeError; -use openraft::raft::ClientWriteRequest; -use openraft::Config; -use openraft::EntryPayload; -use openraft::Node; -use openraft::Raft; -use registry_api::ApiError; -use registry_api::FeathrApiProvider; -use registry_api::FeathrApiRequest; -use registry_api::FeathrApiResponse; +use log::{debug, trace}; +use openraft::{ + error::{CheckIsLeaderError, InitializeError}, + raft::ClientWriteRequest, + Config, EntryPayload, Node, Raft, +}; +use poem::error::Forbidden; +use registry_api::{ + ApiError, FeathrApiProvider, FeathrApiRequest, FeathrApiResponse, IntoApiResult, +}; +use registry_provider::{Credential, Permission, RbacError, RbacProvider}; use sql_provider::load_content; use tokio::net::ToSocketAddrs; -use crate::ManagementCode; -use crate::RegistryClient; -use crate::RegistryNetwork; -use crate::RegistryNodeId; -use crate::RegistryRaft; -use crate::RegistryStore; -use crate::Restore; +use crate::{ + ManagementCode, RegistryClient, RegistryNetwork, RegistryNodeId, RegistryRaft, RegistryStore, + Restore, +}; // Representation of an application state. This struct can be shared around to share // instances of raft, store and more. @@ -73,6 +69,37 @@ impl RaftRegistryApp { } } + pub async fn check_permission( + &self, + credential: &Credential, + resource: Option<&str>, + permission: Permission, + ) -> poem::Result<()> { + let resource = match resource { + Some(s) => s.parse().map_api_error()?, + None => { + // Read/write project list works as long as there is an identity + return Ok(()); + } + }; + if !self + .store + .state_machine + .read() + .await + .registry + .check_permission(credential, &resource, permission) + .map_api_error()? + { + return Err(Forbidden(RbacError::PermissionDenied( + credential.to_string(), + resource, + permission, + ))); + } + Ok(()) + } + pub async fn check_code(&self, code: Option) -> poem::Result<()> { trace!("Checking code {:?}", code); match self.store.get_management_code() { @@ -103,9 +130,16 @@ impl RaftRegistryApp { } pub async fn load_data(&self) -> anyhow::Result<()> { - let (entities, edges) = load_content().await?; + let (entities, edges, permission_map) = load_content().await?; match self - .request(None, FeathrApiRequest::BatchLoad { entities, edges }) + .request( + None, + FeathrApiRequest::BatchLoad { + entities, + edges, + permissions: permission_map, + }, + ) .await { FeathrApiResponse::Error(e) => Err(e)?, @@ -204,12 +238,8 @@ impl RaftRegistryApp { ); // Remove stale old instance of this node if let Ok(m) = client.metrics().await { - let mut nodes: BTreeSet = m - .membership_config - .get_nodes() - .keys() - .copied() - .collect(); + let mut nodes: BTreeSet = + m.membership_config.get_nodes().keys().copied().collect(); debug!("Found nodes: {:?}", nodes); if nodes.contains(&self.id) { debug!("Node with id {} exists in the cluster, clean up stale instance", self.id); diff --git a/registry/raft-registry/src/lib.rs b/registry/raft-registry/src/lib.rs index 77426d7..df409c9 100644 --- a/registry/raft-registry/src/lib.rs +++ b/registry/raft-registry/src/lib.rs @@ -8,6 +8,7 @@ mod store; mod network; mod app; mod client; +mod rbac_middleware; pub type RegistryNodeId = u64; @@ -32,3 +33,4 @@ pub use store::*; pub use network::*; pub use app::*; pub use client::RegistryClient; +pub use rbac_middleware::RbacMiddleware; diff --git a/registry/raft-registry/src/network/api_v1.rs b/registry/raft-registry/src/network/api_v1.rs index 6171cc0..ccc43fb 100644 --- a/registry/raft-registry/src/network/api_v1.rs +++ b/registry/raft-registry/src/network/api_v1.rs @@ -1,4 +1,8 @@ -use poem::web::Data; +use common_utils::StringError; +use poem::{ + error::{BadRequest, InternalServerError}, + web::Data, +}; use poem_openapi::{ param::{Header, Path, Query}, payload::Json, @@ -6,8 +10,9 @@ use poem_openapi::{ }; use registry_api::{ AnchorDef, AnchorFeatureDef, CreationResponse, DerivedFeatureDef, Entity, EntityLineage, - FeathrApiRequest, ProjectDef, SourceDef, + FeathrApiRequest, ProjectDef, RbacResponse, SourceDef, }; +use registry_provider::{Credential, Permission}; use uuid::Uuid; use crate::RaftRegistryApp; @@ -20,6 +25,7 @@ enum ApiTags { AnchorFeature, DerivedFeature, Feature, + Rbac, } pub struct FeathrApiV1; @@ -28,12 +34,16 @@ impl FeathrApiV1 { #[oai(path = "/projects", method = "get", tag = "ApiTags::Project")] async fn get_projects( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, keyword: Query>, page: Query>, limit: Query>, ) -> poem::Result>> { + data.0 + .check_permission(credential.0, None, Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -51,10 +61,14 @@ impl FeathrApiV1 { #[oai(path = "/projects", method = "post", tag = "ApiTags::Project")] async fn new_project( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-requestor")] creator: Header>, def: Json, ) -> poem::Result> { + data.0 + .check_permission(credential.0, None, Permission::Write) + .await?; let mut definition = def.0; if definition.id.is_empty() { definition.id = Uuid::new_v4().to_string(); @@ -62,20 +76,45 @@ impl FeathrApiV1 { if definition.created_by.is_empty() { definition.created_by = creator.0.unwrap_or_default(); } - data.0 + let ret = data + .0 .request(None, FeathrApiRequest::CreateProject { definition }) .await - .into_uuid_and_version() - .map(|v| Json(v.into())) + .into_uuid_and_version(); + // Grant project admin permission to the creator of the project. + if let Ok((uuid, _)) = &ret { + let ret = data + .0 + .request( + None, + FeathrApiRequest::AddUserRole { + project_id_or_name: uuid.to_string(), + user: credential.0.clone(), + role: Permission::Admin, + requestor: credential.0.clone(), + reason: "Created project".to_string(), + }, + ) + .await; + match ret { + registry_api::FeathrApiResponse::Error(e) => return Err(e.into()), + _ => {} + } + } + ret.map(|v| Json(v.into())) } #[oai(path = "/projects/:project", method = "get", tag = "ApiTags::Project")] async fn get_project_lineage( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -95,6 +134,7 @@ impl FeathrApiV1 { )] async fn get_project_features( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, @@ -102,6 +142,9 @@ impl FeathrApiV1 { page: Query>, limit: Query>, ) -> poem::Result>> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -125,6 +168,7 @@ impl FeathrApiV1 { )] async fn get_project_datasources( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, @@ -132,6 +176,9 @@ impl FeathrApiV1 { page: Query>, limit: Query>, ) -> poem::Result>> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -155,11 +202,15 @@ impl FeathrApiV1 { )] async fn new_datasource( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-requestor")] creator: Header>, project: Path, def: Json, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Write) + .await?; let mut definition = def.0; if definition.id.is_empty() { definition.id = Uuid::new_v4().to_string(); @@ -187,11 +238,15 @@ impl FeathrApiV1 { )] async fn new_derived_feature( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-requestor")] creator: Header>, project: Path, def: Json, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Write) + .await?; let mut definition = def.0; if definition.id.is_empty() { definition.id = Uuid::new_v4().to_string(); @@ -219,6 +274,7 @@ impl FeathrApiV1 { )] async fn get_project_anchors( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, @@ -226,6 +282,9 @@ impl FeathrApiV1 { page: Query>, limit: Query>, ) -> poem::Result>> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -249,11 +308,15 @@ impl FeathrApiV1 { )] async fn new_anchor( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-requestor")] creator: Header>, project: Path, def: Json, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Write) + .await?; let mut definition = def.0; if definition.id.is_empty() { definition.id = Uuid::new_v4().to_string(); @@ -281,12 +344,16 @@ impl FeathrApiV1 { )] async fn new_anchor_feature( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-requestor")] creator: Header>, project: Path, anchor: Path, def: Json, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Write) + .await?; let mut definition = def.0; if definition.id.is_empty() { definition.id = Uuid::new_v4().to_string(); @@ -311,10 +378,14 @@ impl FeathrApiV1 { #[oai(path = "/features/:feature", method = "get", tag = "ApiTags::Feature")] async fn get_feature( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, feature: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&feature), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -334,10 +405,14 @@ impl FeathrApiV1 { )] async fn get_feature_lineage( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, feature: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&feature), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -357,10 +432,14 @@ impl FeathrApiV1 { )] async fn get_feature_project( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, feature: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&feature), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -372,4 +451,121 @@ impl FeathrApiV1 { .into_entity() .map(Json) } + + #[oai(path = "/userroles", method = "get", tag = "ApiTags::Rbac")] + async fn get_user_roles( + &self, + credential: Data<&Credential>, + data: Data<&RaftRegistryApp>, + #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, + ) -> poem::Result>> { + data.0 + .check_permission(credential.0, Some("global"), Permission::Admin) + .await?; + data.0 + .request(opt_seq.0, FeathrApiRequest::GetUserRoles) + .await + .into_user_roles() + .map(Json) + } + + #[oai( + path = "/users/:user/userroles/add", + method = "post", + tag = "ApiTags::Rbac" + )] + async fn add_user_role( + &self, + credential: Data<&Credential>, + data: Data<&RaftRegistryApp>, + #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, + user: Path, + project: Query, + role: Query, + reason: Query, + ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Admin) + .await?; + let resp = data + .0 + .request( + opt_seq.0, + FeathrApiRequest::AddUserRole { + user: user.0.parse().map_err(|e| BadRequest(e))?, + project_id_or_name: project.0, + role: match role.0.to_lowercase().as_str() { + "admin" => Permission::Admin, + "consumer" => Permission::Read, + "producer" => Permission::Write, + _ => { + return Err(BadRequest(StringError::new(format!( + "invalid role {}", + role.0 + )))) + } + }, + requestor: credential.0.to_owned(), + reason: reason.0, + }, + ) + .await; + match resp { + registry_api::FeathrApiResponse::Unit => Ok(Json("OK".to_string())), + registry_api::FeathrApiResponse::Error(e) => Err(e.into()), + _ => Err(InternalServerError(StringError::new( + "Internal Server Error", + ))), + } + } + + #[oai( + path = "/users/:user/userroles/delete", + method = "delete", + tag = "ApiTags::Rbac" + )] + async fn delete_user_role( + &self, + credential: Data<&Credential>, + data: Data<&RaftRegistryApp>, + #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, + user: Path, + project: Query, + role: Query, + reason: Query, + ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Admin) + .await?; + let resp = data + .0 + .request( + opt_seq.0, + FeathrApiRequest::DeleteUserRole { + user: user.0.parse().map_err(|e| BadRequest(e))?, + project_id_or_name: project.0, + role: match role.0.to_lowercase().as_str() { + "admin" => Permission::Admin, + "consumer" => Permission::Read, + "producer" => Permission::Write, + _ => { + return Err(BadRequest(StringError::new(format!( + "invalid role {}", + role.0 + )))) + } + }, + requestor: credential.0.to_owned(), + reason: reason.0, + }, + ) + .await; + match resp { + registry_api::FeathrApiResponse::Unit => Ok(Json("OK".to_string())), + registry_api::FeathrApiResponse::Error(e) => Err(e.into()), + _ => Err(InternalServerError(StringError::new( + "Internal Server Error", + ))), + } + } } diff --git a/registry/raft-registry/src/network/api_v2.rs b/registry/raft-registry/src/network/api_v2.rs index 8fbd31e..6737871 100644 --- a/registry/raft-registry/src/network/api_v2.rs +++ b/registry/raft-registry/src/network/api_v2.rs @@ -1,13 +1,18 @@ -use poem::web::Data; +use common_utils::StringError; +use poem::{ + error::{BadRequest, InternalServerError}, + web::Data, +}; use poem_openapi::{ param::{Header, Path, Query}, payload::Json, OpenApi, Tags, }; use registry_api::{ - AnchorDef, AnchorFeatureDef, CreationResponse, DerivedFeatureDef, Entities, Entity, - EntityLineage, FeathrApiRequest, ProjectDef, SourceDef, ApiError, + AnchorDef, AnchorFeatureDef, ApiError, CreationResponse, DerivedFeatureDef, Entities, Entity, + EntityLineage, FeathrApiRequest, ProjectDef, RbacResponse, SourceDef, }; +use registry_provider::{Credential, Permission}; use uuid::Uuid; use crate::RaftRegistryApp; @@ -20,6 +25,7 @@ enum ApiTags { AnchorFeature, DerivedFeature, Feature, + Rbac, } pub struct FeathrApiV2; @@ -29,12 +35,16 @@ impl FeathrApiV2 { #[oai(path = "/projects", method = "get", tag = "ApiTags::Project")] async fn get_projects( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, keyword: Query>, size: Query>, offset: Query>, ) -> poem::Result>> { + data.0 + .check_permission(credential.0, None, Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -52,10 +62,14 @@ impl FeathrApiV2 { #[oai(path = "/projects", method = "post", tag = "ApiTags::Project")] async fn new_project( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-requestor")] creator: Header>, def: Json, ) -> poem::Result> { + data.0 + .check_permission(credential.0, None, Permission::Write) + .await?; let mut definition = def.0; if definition.id.is_empty() { definition.id = Uuid::new_v4().to_string(); @@ -63,23 +77,46 @@ impl FeathrApiV2 { if definition.created_by.is_empty() { definition.created_by = creator.0.unwrap_or_default(); } - data.0 - .request( - None, - FeathrApiRequest::CreateProject { definition }, - ) + let ret = data + .0 + .request(None, FeathrApiRequest::CreateProject { definition }) .await - .into_uuid_and_version() - .map(|v| Json(v.into())) + .into_uuid_and_version(); + // Grant project admin permission to the creator of the project. + if let Ok((uuid, _)) = &ret { + let ret = data + .0 + .request( + None, + FeathrApiRequest::AddUserRole { + project_id_or_name: uuid.to_string(), + user: credential.0.clone(), + role: Permission::Admin, + requestor: credential.0.clone(), + reason: "Created project".to_string(), + }, + ) + .await; + match ret { + registry_api::FeathrApiResponse::Error(e) => return Err(e.into()), + _ => {} + } + } + + ret.map(|v| Json(v.into())) } #[oai(path = "/projects/:project", method = "get", tag = "ApiTags::Project")] async fn get_project( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -99,10 +136,14 @@ impl FeathrApiV2 { )] async fn get_project_lineage( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -122,6 +163,7 @@ impl FeathrApiV2 { )] async fn get_project_features( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, @@ -129,6 +171,9 @@ impl FeathrApiV2 { size: Query>, offset: Query>, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -151,6 +196,7 @@ impl FeathrApiV2 { )] async fn get_datasources( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, @@ -158,6 +204,9 @@ impl FeathrApiV2 { size: Query>, offset: Query>, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -180,11 +229,15 @@ impl FeathrApiV2 { )] async fn new_datasource( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-requestor")] creator: Header>, project: Path, def: Json, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Write) + .await?; let mut definition = def.0; if definition.id.is_empty() { definition.id = Uuid::new_v4().to_string(); @@ -212,11 +265,15 @@ impl FeathrApiV2 { )] async fn get_datasource( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, source: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -237,11 +294,15 @@ impl FeathrApiV2 { )] async fn get_datasource_versions( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, source: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -262,12 +323,16 @@ impl FeathrApiV2 { )] async fn get_datasource_version( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, source: Path, version: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -289,6 +354,7 @@ impl FeathrApiV2 { )] async fn get_project_derived_features( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, @@ -296,6 +362,9 @@ impl FeathrApiV2 { size: Query>, offset: Query>, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -318,11 +387,15 @@ impl FeathrApiV2 { )] async fn new_derived_feature( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-requestor")] creator: Header>, project: Path, def: Json, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Write) + .await?; let mut definition = def.0; if definition.id.is_empty() { definition.id = Uuid::new_v4().to_string(); @@ -350,11 +423,15 @@ impl FeathrApiV2 { )] async fn get_project_derived_feature( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, feature: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -375,11 +452,15 @@ impl FeathrApiV2 { )] async fn get_project_derived_feature_versions( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, feature: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -400,12 +481,16 @@ impl FeathrApiV2 { )] async fn get_project_derived_feature_version( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, feature: Path, - version: Path + version: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -427,6 +512,7 @@ impl FeathrApiV2 { )] async fn get_project_anchors( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, @@ -434,6 +520,9 @@ impl FeathrApiV2 { size: Query>, offset: Query>, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -456,11 +545,15 @@ impl FeathrApiV2 { )] async fn new_anchor( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-requestor")] creator: Header>, project: Path, def: Json, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Write) + .await?; let mut definition = def.0; if definition.id.is_empty() { definition.id = Uuid::new_v4().to_string(); @@ -488,11 +581,15 @@ impl FeathrApiV2 { )] async fn get_anchor( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, anchor: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -513,11 +610,15 @@ impl FeathrApiV2 { )] async fn get_anchor_versions( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, anchor: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -538,12 +639,16 @@ impl FeathrApiV2 { )] async fn get_anchor_version( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, anchor: Path, - version: Path + version: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -565,6 +670,7 @@ impl FeathrApiV2 { )] async fn get_anchor_features( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, @@ -573,6 +679,9 @@ impl FeathrApiV2 { size: Query>, offset: Query>, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -596,12 +705,16 @@ impl FeathrApiV2 { )] async fn new_anchor_feature( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-requestor")] creator: Header>, project: Path, anchor: Path, def: Json, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Write) + .await?; let mut definition = def.0; if definition.id.is_empty() { definition.id = Uuid::new_v4().to_string(); @@ -630,12 +743,16 @@ impl FeathrApiV2 { )] async fn get_project_anchor_feature( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, anchor: Path, feature: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -657,12 +774,16 @@ impl FeathrApiV2 { )] async fn get_project_anchor_feature_versions( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, anchor: Path, feature: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -684,6 +805,7 @@ impl FeathrApiV2 { )] async fn get_project_anchor_feature_version( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, project: Path, @@ -691,6 +813,9 @@ impl FeathrApiV2 { feature: Path, version: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&project), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -709,10 +834,14 @@ impl FeathrApiV2 { #[oai(path = "/features/:feature", method = "get", tag = "ApiTags::Feature")] async fn get_feature( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, feature: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&feature), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -732,10 +861,14 @@ impl FeathrApiV2 { )] async fn get_feature_lineage( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, feature: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&feature), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -755,10 +888,14 @@ impl FeathrApiV2 { )] async fn get_feature_project( &self, + credential: Data<&Credential>, data: Data<&RaftRegistryApp>, #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, feature: Path, ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some(&feature), Permission::Read) + .await?; data.0 .request( opt_seq.0, @@ -770,16 +907,135 @@ impl FeathrApiV2 { .into_entity() .map(Json) } + + #[oai(path = "/userroles", method = "get", tag = "ApiTags::Rbac")] + async fn get_user_roles( + &self, + credential: Data<&Credential>, + data: Data<&RaftRegistryApp>, + #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, + ) -> poem::Result>> { + data.0 + .check_permission(credential.0, Some("global"), Permission::Admin) + .await?; + data.0 + .request(opt_seq.0, FeathrApiRequest::GetUserRoles) + .await + .into_user_roles() + .map(Json) + } + + #[oai( + path = "/users/:user/userroles/add", + method = "post", + tag = "ApiTags::Rbac" + )] + async fn add_user_role( + &self, + credential: Data<&Credential>, + data: Data<&RaftRegistryApp>, + #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, + user: Path, + project: Query, + role: Query, + reason: Query, + ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some("global"), Permission::Admin) + .await?; + let resp = data + .0 + .request( + opt_seq.0, + FeathrApiRequest::AddUserRole { + user: user.0.parse().map_err(|e| BadRequest(e))?, + project_id_or_name: project.0, + role: match role.0.to_lowercase().as_str() { + "admin" => Permission::Admin, + "consumer" => Permission::Read, + "producer" => Permission::Write, + _ => { + return Err(BadRequest(StringError::new(format!( + "invalid role {}", + role.0 + )))) + } + }, + requestor: credential.0.to_owned(), + reason: reason.0, + }, + ) + .await; + match resp { + registry_api::FeathrApiResponse::Unit => Ok(Json("OK".to_string())), + registry_api::FeathrApiResponse::Error(e) => Err(e.into()), + _ => Err(InternalServerError(StringError::new( + "Internal Server Error", + ))), + } + } + + #[oai( + path = "/users/:user/userroles/add", + method = "delete", + tag = "ApiTags::Rbac" + )] + async fn delete_user_role( + &self, + credential: Data<&Credential>, + data: Data<&RaftRegistryApp>, + #[oai(name = "x-registry-opt-seq")] opt_seq: Header>, + user: Path, + project: Query, + role: Query, + reason: Query, + ) -> poem::Result> { + data.0 + .check_permission(credential.0, Some("global"), Permission::Admin) + .await?; + let resp = data + .0 + .request( + opt_seq.0, + FeathrApiRequest::DeleteUserRole { + user: user.0.parse().map_err(|e| BadRequest(e))?, + project_id_or_name: project.0, + role: match role.0.to_lowercase().as_str() { + "admin" => Permission::Admin, + "consumer" => Permission::Read, + "producer" => Permission::Write, + _ => { + return Err(BadRequest(StringError::new(format!( + "invalid role {}", + role.0 + )))) + } + }, + requestor: credential.0.to_owned(), + reason: reason.0, + }, + ) + .await; + match resp { + registry_api::FeathrApiResponse::Unit => Ok(Json("OK".to_string())), + registry_api::FeathrApiResponse::Error(e) => Err(e.into()), + _ => Err(InternalServerError(StringError::new( + "Internal Server Error", + ))), + } + } } fn parse_version(v: T) -> Result, ApiError> where - T: AsRef + T: AsRef, { - if v.as_ref() =="latest" { + if v.as_ref() == "latest" { return Ok(None); } - Ok(Some(v.as_ref().parse().map_err(|_| ApiError::BadRequest(format!("Invalid version spec {}", v.as_ref())))?)) + Ok(Some(v.as_ref().parse().map_err(|_| { + ApiError::BadRequest(format!("Invalid version spec {}", v.as_ref())) + })?)) } #[cfg(test)] @@ -795,4 +1051,4 @@ mod tests { assert_eq!(parse_version("1").unwrap(), Some(1)); assert_eq!(parse_version("42").unwrap(), Some(42)); } -} \ No newline at end of file +} diff --git a/registry/raft-registry/src/rbac_middleware.rs b/registry/raft-registry/src/rbac_middleware.rs new file mode 100644 index 0000000..2b59a50 --- /dev/null +++ b/registry/raft-registry/src/rbac_middleware.rs @@ -0,0 +1,102 @@ +use std::str::FromStr; + +use auth::decode_token; +use common_utils::StringError; +use log::warn; +use poem::{error::{BadRequest, Forbidden}, Endpoint, Middleware, Request, Result}; +use registry_provider::Credential; +use serde::Deserialize; +use uuid::Uuid; + +pub struct RbacMiddleware; + +impl Middleware for RbacMiddleware { + type Output = RbacMiddlewareImpl; + + fn transform(&self, ep: E) -> Self::Output { + RbacMiddlewareImpl { ep } + } +} + +/// The new endpoint type generated by the TokenMiddleware. +pub struct RbacMiddlewareImpl { + ep: E, +} + +const TOKEN_HEADER: &str = "Authorization"; +const DEBUG_TOKEN_HEADER: &str = "x-feathr-debug-token"; + +#[derive(Default, Deserialize)] +#[serde(default)] +struct Claims { + app_id: Option, + preferred_username: Option, + email: Option, +} + +impl Claims { + fn get_credential(&self) -> Result { + match &self.app_id { + Some(s) => { + let id: Uuid = s.parse().map_err(|e| BadRequest(e))?; + Ok(Credential::App(id)) + } + None => match &self.preferred_username { + Some(s) => Ok(Credential::User(s.to_owned())), + None => match &self.email { + Some(s) => Ok(Credential::User(s.to_owned())), + None => Err(BadRequest(StringError::new("Invalid token claims"))), + }, + }, + } + } +} + +#[poem::async_trait] +impl Endpoint for RbacMiddlewareImpl { + type Output = E::Output; + + async fn call(&self, mut req: Request) -> Result { + if std::env::var("ENABLE_RBAC").is_err() { + req.extensions_mut().insert(Credential::RbacDisabled); + } else if std::env::var("FEATHR_ENABLE_RBAC_DEBUG_MUST_NOT_USE_IN_PROD").unwrap_or_default() + == "feathr_rbac_debug_enabled" + { + warn!("RBAC debug enabled"); + if let Some(value) = req + .headers() + .get(DEBUG_TOKEN_HEADER) + .and_then(|value| value.to_str().ok()) + { + if let Ok(credential) = Credential::from_str(value) { + warn!( + "RBAC debug enabled, got credential from debug header: {:?}", + credential + ); + req.extensions_mut().insert(credential); + } + } else if let Some(value) = req + .headers() + .get(TOKEN_HEADER) + .and_then(|value| value.to_str().ok()) + { + let value = value.trim_start_matches("Bearer"); + let claims: Claims = decode_token(value).await.map_err(|e| BadRequest(e))?; + req.extensions_mut().insert(claims.get_credential()?); + } + } else if let Some(value) = req + .headers() + .get(TOKEN_HEADER) + .and_then(|value| value.to_str().ok()) + { + let value = value.trim_start_matches("Bearer"); + let claims: Claims = decode_token(value).await.map_err(|e| BadRequest(e))?; + req.extensions_mut().insert(claims.get_credential()?); + } else { + return Err(Forbidden(StringError::new("Missing token"))); + } + + // call the next endpoint. + self.ep.call(req).await + } +} diff --git a/registry/registry-api/Cargo.toml b/registry/registry-api/Cargo.toml index c951e8d..d7cb31d 100644 --- a/registry/registry-api/Cargo.toml +++ b/registry/registry-api/Cargo.toml @@ -11,6 +11,7 @@ anyhow = "1" chrono = { version = "0.4", features = ["serde"] } async-trait = "0.1" thiserror = "1" +itertools = "0.10" serde = { version = "1", features = ["derive", "rc"] } serde_json = "1" tokio = { version = "1", features = ["full"] } diff --git a/registry/registry-api/src/api_models/mod.rs b/registry/registry-api/src/api_models/mod.rs index c5a85c5..c2d83e3 100644 --- a/registry/registry-api/src/api_models/mod.rs +++ b/registry/registry-api/src/api_models/mod.rs @@ -9,10 +9,12 @@ use crate::error::ApiError; mod attributes; mod edge; mod entity; +mod rbac; pub use attributes::*; pub use edge::*; pub use entity::*; +pub use rbac::*; fn parse_uuid(s: &str) -> Result { Uuid::parse_str(s).map_err(|_| ApiError::BadRequest(format!("Invalid GUID `{}`", s))) diff --git a/registry/registry-api/src/api_models/rbac.rs b/registry/registry-api/src/api_models/rbac.rs new file mode 100644 index 0000000..5741c8a --- /dev/null +++ b/registry/registry-api/src/api_models/rbac.rs @@ -0,0 +1,52 @@ +use chrono::{DateTime, Utc}; +use poem_openapi::Object; +use registry_provider::{Permission, RbacRecord}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize, Object)] +#[oai(rename_all = "camelCase")] +#[serde(rename_all = "camelCase")] +pub struct RbacResponse { + pub scope: String, + pub user_name: String, + pub role_name: String, + pub create_by: String, + pub create_reason: String, + pub create_time: DateTime, + pub delete_by: Option, + pub delete_reason: Option, + pub delete_time: Option>, + pub access: Vec, +} + +pub fn into_user_roles(permissions: impl IntoIterator) -> Vec { + permissions + .into_iter() + .map(|record| { + RbacResponse { + scope: record.resource.to_string(), + user_name: record.credential.to_string(), + role_name: match record.permission { + Permission::Read => "consumer", + Permission::Write => "producer", + Permission::Admin => "admin", + } + .to_string(), + create_by: record.requestor.to_string(), + create_reason: record.reason, + create_time: record.time, + delete_by: None, + delete_reason: None, + delete_time: None, + access: match record.permission { + Permission::Read => vec!["read"], + Permission::Write => vec!["read", "write"], + Permission::Admin => vec!["read", "write", "manage"], + } + .into_iter() + .map(ToString::to_string) + .collect(), + } + }) + .collect() +} diff --git a/registry/registry-api/src/api_provider.rs b/registry/registry-api/src/api_provider.rs index 5768986..ec632bc 100644 --- a/registry/registry-api/src/api_provider.rs +++ b/registry/registry-api/src/api_provider.rs @@ -1,17 +1,19 @@ use std::collections::HashSet; use async_trait::async_trait; +use chrono::Utc; use common_utils::{set, Blank}; use log::debug; use registry_provider::{ - Edge, EdgeType, EntityProperty, EntityType, RegistryError, RegistryProvider, + Credential, Edge, EdgeType, EntityProperty, EntityType, Permission, RbacProvider, RbacRecord, + RegistryError, RegistryProvider, }; use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::{ - AnchorDef, AnchorFeatureDef, ApiError, DerivedFeatureDef, Entities, Entity, EntityAttributes, - EntityLineage, EntityRef, IntoApiResult, ProjectDef, SourceDef, + into_user_roles, AnchorDef, AnchorFeatureDef, ApiError, DerivedFeatureDef, Entities, Entity, + EntityAttributes, EntityLineage, EntityRef, IntoApiResult, ProjectDef, RbacResponse, SourceDef, }; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -142,9 +144,27 @@ pub enum FeathrApiRequest { GetEntityProject { id_or_name: String, }, + // Raft specific BatchLoad { entities: Vec>, edges: Vec, + permissions: Vec, + }, + // RBAC + GetUserRoles, + AddUserRole { + project_id_or_name: String, + user: Credential, + role: Permission, + requestor: Credential, + reason: String, + }, + DeleteUserRole { + project_id_or_name: String, + user: Credential, + role: Permission, + requestor: Credential, + reason: String, }, } @@ -158,6 +178,8 @@ impl FeathrApiRequest { | Self::CreateAnchorFeature { .. } | Self::CreateProjectDerivedFeature { .. } | Self::BatchLoad { .. } + | Self::AddUserRole { .. } + | Self::DeleteUserRole { .. } ) } } @@ -172,6 +194,7 @@ pub enum FeathrApiResponse { Entity(Entity), Entities(Entities), EntityLineage(EntityLineage), + UserRoles(Vec), } impl FeathrApiResponse { @@ -213,6 +236,14 @@ impl FeathrApiResponse { _ => panic!("Shouldn't reach here"), } } + + pub fn into_user_roles(self) -> poem::Result> { + match self { + FeathrApiResponse::Error(e) => Err(e.into()), + FeathrApiResponse::UserRoles(v) => Ok(v), + _ => panic!("Shouldn't reach here"), + } + } } impl From for FeathrApiResponse { @@ -281,6 +312,12 @@ impl From for FeathrApiResponse { } } +impl From> for FeathrApiResponse { + fn from(v: Vec) -> Self { + Self::UserRoles(into_user_roles(v)) + } +} + impl From> for FeathrApiResponse where FeathrApiResponse: From + From, @@ -301,7 +338,7 @@ pub trait FeathrApiProvider: Sync + Send { #[async_trait] impl FeathrApiProvider for T where - T: RegistryProvider + Sync + Send, + T: RegistryProvider + RbacProvider + Sync + Send, { async fn request(&mut self, request: FeathrApiRequest) -> FeathrApiResponse { fn get_id(t: &T, id_or_name: String) -> Result @@ -496,7 +533,7 @@ where request: FeathrApiRequest, ) -> Result where - T: RegistryProvider, + T: RegistryProvider + RbacProvider, { Ok(match request { FeathrApiRequest::GetProjects { @@ -838,9 +875,11 @@ where ) .into() } - FeathrApiRequest::BatchLoad { entities, edges } => { - this.load_data(entities, edges).await.into() - } + FeathrApiRequest::BatchLoad { + entities, + edges, + permissions, + } => this.load_data(entities, edges, permissions).await.into(), FeathrApiRequest::GetEntityProject { id_or_name } => { let entity = this.get_entity_by_id_or_qualified_name(&id_or_name)?; if entity.entity_type == EntityType::Project { @@ -859,6 +898,44 @@ where .into() } } + FeathrApiRequest::GetUserRoles => this + .get_permissions() + .map_api_error()? + .into(), + FeathrApiRequest::AddUserRole { + project_id_or_name, + user, + role, + requestor, + reason, + } => { + let grant = RbacRecord{ + credential: user, + resource: project_id_or_name.parse()?, + permission: role, + requestor, + reason, + time: Utc::now(), + }; + this.grant_permission(&grant).await.into() + } + FeathrApiRequest::DeleteUserRole { + project_id_or_name, + user, + role, + requestor, + reason, + } => { + let revoke = RbacRecord{ + credential: user, + resource: project_id_or_name.parse()?, + permission: role, + requestor, + reason, + time: Utc::now(), + }; + this.revoke_permission(&revoke).await.into() + } }) } diff --git a/registry/registry-api/src/error.rs b/registry/registry-api/src/error.rs index f372382..8c5b1c1 100644 --- a/registry/registry-api/src/error.rs +++ b/registry/registry-api/src/error.rs @@ -47,6 +47,11 @@ impl From for ApiError { RegistryError::DeleteInUsed(_) => ApiError::BadRequest(format!("{:?}", e)), RegistryError::FtsError(_) => ApiError::InternalError(format!("{:?}", e)), RegistryError::ExternalStorageError(_) => ApiError::InternalError(format!("{:?}", e)), + RegistryError::RbacError(e) => match e { + registry_provider::RbacError::CredentialNotFound(_) => ApiError::BadRequest(format!("{:?}", e)), + registry_provider::RbacError::ResourceNotFound(e) => ApiError::NotFoundError(e), + registry_provider::RbacError::PermissionDenied(_, _, _) => ApiError::Forbidden(format!("{:?}", e)), + } } } } diff --git a/registry/registry-provider/Cargo.toml b/registry/registry-provider/Cargo.toml index 86f14a8..3a4aaed 100644 --- a/registry/registry-provider/Cargo.toml +++ b/registry/registry-provider/Cargo.toml @@ -13,4 +13,4 @@ tokio = "1" serde = { version = "1", features = ["derive", "rc"] } serde_json = "1" uuid = { version = "1", features = ["v4", "serde"] } -chrono = { version = "0.4", features = ["serde"] } +chrono = { version = "0.4.10", features = ["serde"] } diff --git a/registry/registry-provider/src/error.rs b/registry/registry-provider/src/error.rs index 478ea9a..4c04d49 100644 --- a/registry/registry-provider/src/error.rs +++ b/registry/registry-provider/src/error.rs @@ -4,7 +4,7 @@ use serde::{Serialize, Deserialize}; use thiserror::Error; use uuid::Uuid; -use crate::EntityType; +use crate::{EntityType, RbacError}; #[derive(Clone, Debug, Error, Serialize, Deserialize)] pub enum RegistryError { @@ -34,5 +34,8 @@ pub enum RegistryError { #[error("{0}")] ExternalStorageError(String), + + #[error(transparent)] + RbacError(#[from] RbacError), } diff --git a/registry/registry-provider/src/lib.rs b/registry/registry-provider/src/lib.rs index cd752ce..fed87bc 100644 --- a/registry/registry-provider/src/lib.rs +++ b/registry/registry-provider/src/lib.rs @@ -2,11 +2,13 @@ mod error; mod fts; mod models; mod registry; +mod rbac_provider; pub use error::RegistryError; pub use fts::*; pub use models::*; pub use registry::*; +pub use rbac_provider::*; pub trait SerializableRegistry<'de> { fn take_snapshot(&self) -> Result, RegistryError>; diff --git a/registry/registry-provider/src/rbac_provider.rs b/registry/registry-provider/src/rbac_provider.rs new file mode 100644 index 0000000..d1942c0 --- /dev/null +++ b/registry/registry-provider/src/rbac_provider.rs @@ -0,0 +1,131 @@ +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +use crate::RegistryError; + +use std::str::FromStr; + +use uuid::Uuid; + +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] +pub enum Credential { + RbacDisabled, + User(String), + App(Uuid), +} + +impl ToString for Credential { + fn to_string(&self) -> String { + match self { + Credential::RbacDisabled => "*".to_string(), + Credential::User(user) => user.clone(), + Credential::App(app) => app.to_string(), + } + } +} + +impl FromStr for Credential { + type Err = RegistryError; + + fn from_str(s: &str) -> Result { + if let Ok(uuid) = Uuid::from_str(s) { + Ok(Credential::App(uuid)) + } else { + Ok(Credential::User(s.to_string())) + } + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] +pub enum Permission { + Read, + Write, + Admin, +} + +impl ToString for Permission { + fn to_string(&self) -> String { + match self { + Permission::Read => "consumer", + Permission::Write => "producer", + Permission::Admin => "admin", + } + .to_string() + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] +pub enum Resource { + Global, + // So far only project is used + NamedEntity(String), + Entity(Uuid), +} + +impl ToString for Resource { + fn to_string(&self) -> String { + match self { + Resource::Global => "global".to_string(), + Resource::NamedEntity(name) => name.clone(), + Resource::Entity(uuid) => uuid.to_string(), + } + } +} + +impl FromStr for Resource { + type Err = RegistryError; + + fn from_str(s: &str) -> Result { + if s.to_lowercase() == "global" { + Ok(Resource::Global) + } else if let Ok(uuid) = Uuid::from_str(s) { + Ok(Resource::Entity(uuid)) + } else { + Ok(Resource::NamedEntity(s.to_string())) + } + } +} + +#[derive(Error, Debug, Clone, Serialize, Deserialize)] +pub enum RbacError { + #[error("Credential {0} not found")] + CredentialNotFound(String), + + #[error("Resource {0} not found")] + ResourceNotFound(String), + + #[error("Credential {0} doesn't have {2:?} permission to resource {1:?}")] + PermissionDenied(String, Resource, Permission), +} + +#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub struct RbacRecord { + pub credential: Credential, + pub resource: Resource, + pub permission: Permission, + pub requestor: Credential, + pub reason: String, + pub time: DateTime, +} + +#[async_trait] +pub trait RbacProvider: Send + Sync { + fn check_permission( + &self, + credential: &Credential, + resource: &Resource, + permission: Permission, + ) -> Result; + + fn load_permissions(&mut self, permissions: RI) -> Result<(), RegistryError> + where + RI: Iterator; + + fn get_permissions(&self) -> Result, RegistryError>; + + async fn grant_permission(&mut self, grant: &RbacRecord) -> Result<(), RegistryError>; + + async fn revoke_permission(&mut self, revoke: &RbacRecord) -> Result<(), RegistryError>; +} diff --git a/registry/registry-provider/src/registry.rs b/registry/registry-provider/src/registry.rs index a05493c..42e76aa 100644 --- a/registry/registry-provider/src/registry.rs +++ b/registry/registry-provider/src/registry.rs @@ -5,7 +5,7 @@ use uuid::Uuid; use crate::{ AnchorDef, AnchorFeatureDef, DerivedFeatureDef, Edge, EdgeType, Entity, EntityPropMutator, - EntityType, ProjectDef, RegistryError, SourceDef, ToDocString, + EntityType, ProjectDef, RbacRecord, RegistryError, SourceDef, ToDocString, }; pub fn extract_version(name: &str) -> (&str, Option) { @@ -37,6 +37,7 @@ where &mut self, entities: Vec>, edges: Vec, + permissions: Vec, ) -> Result<(), RegistryError>; /** @@ -229,6 +230,19 @@ where .collect()) } + fn get_entity_project_id(&self, id: Uuid) -> Result { + if let Ok(e) = self.get_entity(id) { + if e.entity_type == EntityType::Project { + return Ok(e.id); + } + } + self.get_neighbors(id, EdgeType::BelongsTo)? + .into_iter() + .find(|e| e.entity_type == EntityType::Project) + .ok_or_else(|| RegistryError::InvalidEntity(id)) + .map(|e| e.id) + } + /** * Returns all entities that depend on this one and vice versa, directly and indirectly */ diff --git a/registry/sql-provider/Cargo.toml b/registry/sql-provider/Cargo.toml index f378c03..eeb702b 100644 --- a/registry/sql-provider/Cargo.toml +++ b/registry/sql-provider/Cargo.toml @@ -11,21 +11,26 @@ async-trait = "0.1" anyhow = "1" thiserror = "1" itertools = "0.10" +chrono = { version = "0.4", features = ["serde"] } serde = { version = "1", features = ["derive", "rc"] } serde_json = "1" uuid = { version = "1", features = ["v4", "serde"] } tokio = { version = "1", features = ["full"] } tokio-stream = "0.1.8" +tracing = "0.1" +tracing-futures = "0.2" petgraph = { version = "0.6", features = ["default", "serde-1"] } regex = "1" tantivy = "0.18" -tiberius = { version = "0.9", features = [ +tiberius = { version = "0.10", features = [ + "chrono", "tds73", "tokio-rustls", "sql-browser-tokio", ], default-features = false, optional = true } +tiberius-derive = "0.0.2" bb8 = { version = "0.8", optional = true } -bb8-tiberius = { version = "0.11", features = [ +bb8-tiberius = { version = "0.12", features = [ "default", "tls", ], default-features = false, optional = true } @@ -34,6 +39,7 @@ sqlx = { version = "0.6.0", features = [ "any", "uuid", "macros", + "chrono", ], default-features = false, optional = true } common-utils = { path = "../common-utils" } diff --git a/registry/sql-provider/src/database/mod.rs b/registry/sql-provider/src/database/mod.rs index 35b2e15..414c154 100644 --- a/registry/sql-provider/src/database/mod.rs +++ b/registry/sql-provider/src/database/mod.rs @@ -1,4 +1,4 @@ -use registry_provider::{EntityProperty, Entity, Edge}; +use registry_provider::{EntityProperty, Entity, Edge, RbacRecord}; use crate::Registry; @@ -16,6 +16,10 @@ fn get_edge_table() -> String { std::env::var("EDGE_TABLE").unwrap_or_else(|_| "edges".to_string()) } +fn get_rbac_table() -> String { + std::env::var("RBAC_TABLE").unwrap_or_else(|_| "userroles".to_string()) +} + pub fn attach_storage(registry: &mut Registry) { #[cfg(feature = "mssql")] if mssql::validate_condition() { @@ -29,7 +33,7 @@ pub fn attach_storage(registry: &mut Registry) { } pub async fn load_content( -) -> Result<(Vec>, Vec), anyhow::Error> { +) -> Result<(Vec>, Vec, Vec), anyhow::Error> { #[cfg(feature = "mssql")] if mssql::validate_condition() { return mssql::load_content().await; diff --git a/registry/sql-provider/src/database/mssql.rs b/registry/sql-provider/src/database/mssql.rs index 07bfeff..03cf770 100644 --- a/registry/sql-provider/src/database/mssql.rs +++ b/registry/sql-provider/src/database/mssql.rs @@ -3,15 +3,24 @@ use std::sync::Arc; use async_trait::async_trait; use bb8::{Pool, PooledConnection}; use bb8_tiberius::ConnectionManager; +use chrono::{DateTime, Utc}; use common_utils::{Appliable, Logged}; -use log::debug; use tiberius::{FromSql, Row}; +use tiberius_derive::FromRow; use tokio::sync::{OnceCell, RwLock}; +use tracing::{debug, warn}; use uuid::Uuid; -use registry_provider::{Edge, EdgeType, Entity, EntityProperty, RegistryError}; +use registry_provider::{ + Credential, Edge, EdgeType, Entity, EntityProperty, Permission, RbacRecord, RegistryError, + Resource, +}; -use crate::{db_registry::ExternalStorage, Registry, database::get_entity_table}; +use crate::{ + database::{get_entity_table, get_rbac_table}, + db_registry::ExternalStorage, + Registry, +}; use super::get_edge_table; @@ -51,6 +60,17 @@ impl<'a> FromSql<'a> for EntityPropertyWrapper { } } +#[derive(FromRow)] +#[tiberius_derive(owned)] +struct RbacEntry { + user: String, + resource: String, + permission: String, + requestor: String, + reason: String, + time: String, +} + async fn load_entities( conn: &mut PooledConnection<'static, ConnectionManager>, ) -> Result, anyhow::Error> { @@ -88,6 +108,71 @@ async fn load_edges( Ok(x) } +async fn load_permissions( + conn: &mut PooledConnection<'static, ConnectionManager>, +) -> Result, anyhow::Error> { + let permissions_table = get_rbac_table(); + { + let check = conn + .simple_query(format!("select 1 from {}", permissions_table)) + .await; + if check.is_err() { + warn!("Permissions table not found, RBAC is disabled"); + std::env::remove_var("ENABLE_RBAC"); + return Ok(vec![]); + } + } + debug!("Loading RBAC from {}", permissions_table); + let x: Vec = conn + .simple_query(format!( + r#"SELECT user_name, project_name, role_name, create_by, create_reason, CONVERT(NVARCHAR(max), create_time, 20) from {} + where delete_by is null + order by record_id"#, + permissions_table + )) + .await? + .into_first_result() + .await? + .into_iter() + .map(RbacEntry::from_row) + .collect::, _>>()?; + debug!("{} permissions loaded", x.len()); + x.into_iter() + .map(|entry| { + let credential = match entry.user.parse::() { + Ok(id) => Credential::App(id), + Err(_) => Credential::User(entry.user), + }; + let resource = match entry.resource.as_str() { + "global" => Resource::Global, + _ => Resource::NamedEntity(entry.resource), + }; + let permission = match entry.permission.to_lowercase().as_str() { + "consumer" => Permission::Read, + "producer" => Permission::Write, + "admin" => Permission::Admin, + _ => Permission::Read, + }; + let requestor = match entry.requestor.parse::() { + Ok(id) => Credential::App(id), + Err(_) => Credential::User(entry.requestor), + }; + let reason = entry.reason; + let time: DateTime = DateTime::parse_from_str(&entry.time, "%Y-%m-%d %H:%M:%S") + .map(|t| t.with_timezone(&Utc)) + .unwrap_or_else(|_| Utc::now()); + Ok(RbacRecord { + credential, + resource, + permission, + requestor, + reason, + time, + }) + }) + .collect() +} + static POOL: OnceCell>>>> = OnceCell::const_new(); async fn init_pool() -> anyhow::Result>>> { @@ -120,11 +205,13 @@ pub fn validate_condition() -> bool { } } -pub async fn load_content() -> Result<(Vec>, Vec), anyhow::Error> { +pub async fn load_content( +) -> Result<(Vec>, Vec, Vec), anyhow::Error> { debug!("Loading registry data from database"); let mut conn = connect().await?; let edges = load_edges(&mut conn).await?; let entities = load_entities(&mut conn).await?; + let permissions = load_permissions(&mut conn).await?; debug!( "{} entities and {} edges loaded", entities.len(), @@ -133,6 +220,7 @@ pub async fn load_content() -> Result<(Vec>, Vec), Ok(( entities.into_iter().map(|e| e.into()).collect(), edges.into_iter().map(|e| e.into()).collect(), + permissions, )) } @@ -159,10 +247,7 @@ impl MsSqlStorage { impl Default for MsSqlStorage { fn default() -> Self { - Self::new( - &get_entity_table(), - &get_edge_table(), - ) + Self::new(&get_entity_table(), &get_edge_table()) } } @@ -289,6 +374,68 @@ impl ExternalStorage for MsSqlStorage { .map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?; Ok(()) } + + async fn grant_permission(&mut self, grant: &RbacRecord) -> Result<(), RegistryError> { + let mut conn = connect() + .await + .map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?; + conn.execute( + format!( + "INSERT INTO {} + (user_name, role_name, project_name, create_by, create_reason, create_time) + values + (@P1, @P2, @P3, @P4, @P5, SYSUTCDATETIME())", + get_rbac_table() + ) + .apply(|s| { + debug!("SQL is: {}", s); + s + }), + &[ + &grant.credential.to_string(), + &grant.permission.to_string(), + &grant.resource.to_string(), + &grant.requestor.to_string(), + &grant.reason, + ], + ) + .await + .map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?; + Ok(()) + } + + async fn revoke_permission(&mut self, revoke: &RbacRecord) -> Result<(), RegistryError> { + let mut conn = connect() + .await + .map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?; + conn.execute( + format!( + "UPDATE {} + SET delete_by=@P1, delete_reason=@P2, delete_time=SYSUTCDATETIME() + WHERE user_name = @P3 and role_name = @P4 and project_name = @P5 and delete_reason is null", + get_rbac_table() + ) + .apply(|s| { + debug!("SQL is: {}", s); + debug!("P1={}", &revoke.requestor.to_string()); + debug!("P2={}", &revoke.reason); + debug!("P3={}", &revoke.credential.to_string()); + debug!("P4={}", &revoke.permission.to_string()); + debug!("P5={}", &revoke.resource.to_string()); + s + }), + &[ + &revoke.requestor.to_string(), + &revoke.reason, + &revoke.credential.to_string(), + &revoke.permission.to_string(), + &revoke.resource.to_string(), + ], + ) + .await + .map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?; + Ok(()) + } } #[cfg(test)] @@ -313,6 +460,7 @@ mod tests { let mut r = Registry::::load( data.guid_entity_map.into_iter().map(|(_, i)| i.into()), data.relations.into_iter().map(|i| i.into()), + vec![].into_iter(), ) .await .unwrap(); diff --git a/registry/sql-provider/src/database/sqlx.rs b/registry/sql-provider/src/database/sqlx.rs index 3d6c111..4d7fe34 100644 --- a/registry/sql-provider/src/database/sqlx.rs +++ b/registry/sql-provider/src/database/sqlx.rs @@ -1,15 +1,23 @@ use std::{str::FromStr, sync::Arc}; use async_trait::async_trait; +use chrono::{DateTime, Utc}; use log::debug; use sqlx::{ any::AnyKind, pool::PoolConnection, Any, AnyConnection, AnyPool, ConnectOptions, Connection, - Executor, + Executor, FromRow, }; -use crate::{database::get_entity_table, db_registry::ExternalStorage, Registry}; +use crate::{ + database::{get_entity_table, get_rbac_table}, + db_registry::ExternalStorage, + Registry, +}; use common_utils::Logged; -use registry_provider::{Edge, EdgeType, Entity, EntityProperty, RegistryError}; +use registry_provider::{ + Credential, Edge, EdgeType, Entity, EntityProperty, Permission, RbacRecord, RegistryError, + Resource, +}; use tokio::sync::{OnceCell, RwLock}; use uuid::Uuid; @@ -122,7 +130,73 @@ async fn load_edges() -> Result, anyhow::Error> { Ok(x) } -pub async fn load_content() -> Result<(Vec>, Vec), anyhow::Error> { +#[derive(FromRow)] +struct RbacEntry { + user: String, + resource: String, + permission: String, + requestor: String, + reason: String, + time: String, +} + +async fn load_permissions() -> Result, anyhow::Error> { + let permissions_table = get_rbac_table(); + debug!("Loading RBAC from {}", permissions_table); + let pool = POOL + .get_or_init(|| async { init_pool().await.ok() }) + .await + .clone() + .ok_or_else(|| anyhow::Error::msg("Environment variable 'CONNECTION_STR' is not set."))?; + debug!("SQLx connection pool acquired, connecting to database"); + let sql = format!( + r#"SELECT project_name, user_name, role_name from {} + where delete_by is null + "#, + permissions_table + ); + let rows: Vec = sqlx::query_as::<_, RbacEntry>(&sql) + .fetch_all(&pool) + .await?; + debug!("{} rows loaded", rows.len()); + rows.into_iter() + .map(|entry| { + let credential = match entry.user.parse::() { + Ok(id) => Credential::App(id), + Err(_) => Credential::User(entry.user), + }; + let resource = match entry.resource.as_str() { + "global" => Resource::Global, + _ => Resource::NamedEntity(entry.resource), + }; + let permission = match entry.permission.as_str() { + "consumer" => Permission::Read, + "producer" => Permission::Write, + "admin" => Permission::Admin, + _ => Permission::Read, + }; + let requestor = match entry.requestor.parse::() { + Ok(id) => Credential::App(id), + Err(_) => Credential::User(entry.requestor), + }; + let reason = entry.reason; + let time: DateTime = DateTime::parse_from_str(&entry.time, "%Y-%m-%d %H:%M:%S") + .map_err(|e| anyhow::Error::new(e))? + .with_timezone(&Utc); + Ok(RbacRecord { + credential, + resource, + permission, + requestor, + reason, + time, + }) + }) + .collect() +} + +pub async fn load_content( +) -> Result<(Vec>, Vec, Vec), anyhow::Error> { let conn_str = std::env::var("CONNECTION_STR")?; if conn_str .parse::<::Options>()? @@ -164,6 +238,7 @@ pub async fn load_content() -> Result<(Vec>, Vec), debug!("Loading registry data from database"); let edges = load_edges().await?; let entities = load_entities().await?; + let permissions = load_permissions().await?; debug!( "{} entities and {} edges loaded", entities.len(), @@ -172,6 +247,7 @@ pub async fn load_content() -> Result<(Vec>, Vec), Ok(( entities.into_iter().map(|e| e.into()).collect(), edges.into_iter().map(|e| e.into()).collect(), + permissions, )) } @@ -420,4 +496,61 @@ impl ExternalStorage for SqlxStorage { .map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?; Ok(()) } + + async fn grant_permission(&mut self, grant: &RbacRecord) -> Result<(), RegistryError> { + let mut conn = connect() + .await + .map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?; + let now = match conn.kind() { + AnyKind::Postgres => "NOW()", + AnyKind::MySql => "NOW()", + AnyKind::Sqlite => "datetime('now')", + }; + let sql = format!( + "INSERT INTO {} + (user_name, role_name, project_name, create_by, create_reason, create_time) + values + (?, ?, ?, ?, ?, {})", + get_rbac_table(), + now, + ); + let query = sqlx::query(&sql) + .bind(grant.credential.to_string()) + .bind(grant.permission.to_string()) + .bind(grant.resource.to_string()) + .bind(grant.requestor.to_string()) + .bind(grant.reason.clone()); + conn.execute(query) + .await + .map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?; + Ok(()) + } + + async fn revoke_permission(&mut self, revoke: &RbacRecord) -> Result<(), RegistryError> { + let mut conn = connect() + .await + .map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?; + let now = match conn.kind() { + AnyKind::Postgres => "NOW()", + AnyKind::MySql => "NOW()", + AnyKind::Sqlite => "datetime('now')", + }; + let sql = format!( + "UPDATE {} + SET delete_by=?, delete_reason=?, delete_time={} + WHERE user_name = ? and role_name = ? and project_name = ? and deleted_reason is null", + get_rbac_table(), + now, + ); + let query = sqlx::query(&sql) + .bind(revoke.requestor.to_string()) + .bind(revoke.reason.clone()) + .bind(revoke.credential.to_string()) + .bind(revoke.permission.to_string()) + .bind(revoke.resource.to_string()); + conn.execute(query) + .await + .map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?; + Ok(()) + } } diff --git a/registry/sql-provider/src/db_registry.rs b/registry/sql-provider/src/db_registry.rs index 6ad3314..5b7f134 100644 --- a/registry/sql-provider/src/db_registry.rs +++ b/registry/sql-provider/src/db_registry.rs @@ -16,6 +16,7 @@ use tokio::sync::RwLock; use uuid::Uuid; use crate::fts::{FtsError, FtsIndex}; +use crate::rbac_map::RbacMap; const NODE_CAPACITY: usize = 1000; @@ -80,6 +81,16 @@ where edge_type: EdgeType, edge_id: Uuid, ) -> Result<(), RegistryError>; + + async fn grant_permission( + &mut self, + grant: &RbacRecord, + ) -> Result<(), RegistryError>; + + async fn revoke_permission( + &mut self, + revoke: &RbacRecord, + ) -> Result<(), RegistryError>; } #[derive(Debug)] @@ -105,6 +116,8 @@ where // FTS support pub(crate) fts_index: FtsIndex, + pub(crate) permission_map: RbacMap, + // TODO: pub external_storage: Vec>>>, } @@ -121,6 +134,7 @@ where deleted: Default::default(), entry_points: Default::default(), fts_index: Default::default(), + permission_map: Default::default(), external_storage: Default::default(), } } @@ -142,6 +156,7 @@ where pub fn from_content( graph: Graph, Edge, Directed>, deleted: HashSet, + permissions: Vec, ) -> Self { let fts_index = FtsIndex::new(); let node_id_map = graph @@ -171,6 +186,7 @@ where deleted, entry_points, fts_index, + permission_map: Default::default(), external_storage: Default::default(), }; let ids: Vec<_> = ret.node_id_map.keys().copied().collect(); @@ -180,6 +196,7 @@ where }); ret.fts_index.commit().ok(); + ret.load_permissions(permissions.into_iter()).ok(); ret } } @@ -197,6 +214,7 @@ where deleted: Default::default(), entry_points: Default::default(), fts_index: FtsIndex::new(), + permission_map: Default::default(), external_storage: Default::default(), } } @@ -257,10 +275,15 @@ where Ok(()) } - pub(crate) async fn load(entities: NI, edges: EI) -> Result + pub(crate) async fn load( + entities: NI, + edges: EI, + permissions: RI, + ) -> Result where NI: Iterator>, EI: Iterator, + RI: Iterator, { let mut ret = Self { graph: Graph::with_capacity(NODE_CAPACITY * 10, NODE_CAPACITY), @@ -269,9 +292,11 @@ where deleted: HashSet::with_capacity(NODE_CAPACITY), entry_points: Vec::with_capacity(NODE_CAPACITY), fts_index: FtsIndex::new(), + permission_map: Default::default(), external_storage: Default::default(), }; ret.batch_load(entities, edges).await?; + ret.load_permissions(permissions)?; Ok(ret) } @@ -866,6 +891,20 @@ mod tests { ); Ok(()) } + + async fn grant_permission( + &mut self, + _grant: &RbacRecord, + ) -> Result<(), RegistryError> { + Ok(()) + } + + async fn revoke_permission( + &mut self, + _revoke: &RbacRecord, + ) -> Result<(), RegistryError> { + Ok(()) + } } async fn init() -> Registry { diff --git a/registry/sql-provider/src/lib.rs b/registry/sql-provider/src/lib.rs index dc32716..915c8f4 100644 --- a/registry/sql-provider/src/lib.rs +++ b/registry/sql-provider/src/lib.rs @@ -1,6 +1,7 @@ mod database; mod db_registry; mod fts; +mod rbac_map; mod serdes; #[cfg(any(mock, test))] @@ -12,11 +13,11 @@ use std::fmt::Debug; use async_trait::async_trait; pub use database::{attach_storage, load_content}; pub use db_registry::Registry; -use log::debug; +use log::{debug, warn}; use registry_provider::{ - extract_version, AnchorDef, AnchorFeatureDef, DerivedFeatureDef, Edge, EdgeType, Entity, - EntityPropMutator, EntityType, ProjectDef, RegistryError, RegistryProvider, SourceDef, - ToDocString, + extract_version, AnchorDef, AnchorFeatureDef, Credential, DerivedFeatureDef, Edge, EdgeType, + Entity, EntityPropMutator, EntityType, Permission, ProjectDef, RbacError, RbacProvider, + RbacRecord, RegistryError, RegistryProvider, Resource, SourceDef, ToDocString, }; use uuid::Uuid; @@ -32,9 +33,12 @@ where &mut self, entities: Vec>, edges: Vec, + permissions: Vec, ) -> Result<(), RegistryError> { self.batch_load(entities.into_iter(), edges.into_iter()) - .await + .await?; + self.load_permissions(permissions.into_iter())?; + Ok(()) } /** @@ -440,3 +444,135 @@ where + 1 } } + +#[async_trait] +impl RbacProvider for Registry +where + EntityProp: Clone + Debug + PartialEq + Eq + EntityPropMutator + ToDocString + Send + Sync, +{ + #[tracing::instrument(level = "trace", skip(self))] + fn check_permission( + &self, + credential: &Credential, + resource: &Resource, + permission: Permission, + ) -> Result { + if credential == &Credential::RbacDisabled { + return Ok(true); + } + // Get corresponding project to the resource + let resource = match resource { + Resource::NamedEntity(name) => { + let id = self.get_entity_id(name)?; + let proj_id = self.get_entity_project_id(id)?; + Resource::Entity(proj_id) + } + Resource::Entity(id) => { + let proj_id = self.get_entity_project_id(*id)?; + Resource::Entity(proj_id) + } + Resource::Global => Resource::Global, + }; + // User must be either Global Admin or Project Admin or having the permission on the resource + Ok(self + .permission_map + .check_permission(credential, &Resource::Global, Permission::Admin) + || self + .permission_map + .check_permission(credential, &resource, Permission::Admin) + || self + .permission_map + .check_permission(credential, &resource, permission)) + } + + fn load_permissions(&mut self, permissions: RI) -> Result<(), RegistryError> + where + RI: Iterator, + { + for mut record in permissions { + let resource = match &record.resource { + Resource::NamedEntity(name) => match name.parse::() { + Ok(id) => Resource::Entity(id), + Err(_) => Resource::Entity(match self.get_entity_by_name(&name, None) { + Some(e) => e.id, + None => { + warn!("Entity {} not found, skipped", name); + continue; + } + }), + }, + _ => record.resource, + }; + record.resource = resource; + self.permission_map.grant_permission(&record); + } + Ok(()) + } + + fn get_permissions(&self) -> Result, RegistryError> { + Ok(self + .permission_map + .iter() + .map(|(credential, permission, resource)| RbacRecord { + credential: credential.to_owned(), + resource: resource.resource.to_owned(), + permission: permission.to_owned(), + requestor: resource.granted_by.to_owned(), + reason: resource.reason.to_owned(), + time: resource.granted_time, + }) + .collect()) + } + + async fn grant_permission(&mut self, grant: &RbacRecord) -> Result<(), RegistryError> { + // User `granted_by` must have the permission to grant the permission + if !self.check_permission(&grant.requestor, &grant.resource, Permission::Admin)? { + return Err(RbacError::PermissionDenied( + grant.requestor.to_string(), + grant.resource.to_owned(), + grant.permission, + ) + .into()); + } + + // Permission already granted, no need to do anything + if self.check_permission(&grant.credential, &grant.resource, grant.permission)? { + return Ok(()); + } + + // Record permission granting info to the external storages + for storage in self.external_storage.iter() { + storage.write().await.grant_permission(&grant).await?; + } + + // Update local data structure + self.permission_map.grant_permission(&grant); + Ok(()) + } + + async fn revoke_permission(&mut self, revoke: &RbacRecord) -> Result<(), RegistryError> { + // User `revoked_by` must have the permission to grant the permission + if !self.check_permission(&revoke.requestor, &revoke.resource, Permission::Admin)? { + return Err(RbacError::PermissionDenied( + revoke.requestor.to_string(), + revoke.resource.to_owned(), + revoke.permission, + ) + .into()); + } + + // Permission not granted, no need to do anything + if !self.check_permission(&revoke.credential, &revoke.resource, revoke.permission)? { + return Ok(()); + } + + // Record permission revoking info to the external storages + for storage in self.external_storage.iter() { + storage.write().await.revoke_permission(&revoke).await?; + } + + // Update local data structure + self.permission_map.revoke_permission(&revoke); + Ok(()) + } +} diff --git a/registry/sql-provider/src/mock.rs b/registry/sql-provider/src/mock.rs index c45d21c..2477110 100644 --- a/registry/sql-provider/src/mock.rs +++ b/registry/sql-provider/src/mock.rs @@ -23,6 +23,7 @@ pub async fn load() -> crate::Registry { let mut r = Registry::::load( data.guid_entity_map.into_iter().map(|(_, i)| i.into()), data.relations.into_iter().map(|i| i.into()), + vec![].into_iter(), ) .await .unwrap(); diff --git a/registry/sql-provider/src/rbac_map.rs b/registry/sql-provider/src/rbac_map.rs new file mode 100644 index 0000000..c9ea2f9 --- /dev/null +++ b/registry/sql-provider/src/rbac_map.rs @@ -0,0 +1,103 @@ +use std::collections::{HashMap, HashSet}; +use std::hash::{Hash, Hasher}; + +use chrono::{DateTime, Utc}; +use registry_provider::{Credential, Permission, RbacRecord, Resource}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Eq, Serialize, Deserialize)] +pub(crate) struct RbacResource { + pub(crate) resource: Resource, + pub(crate) granted_by: Credential, + pub(crate) granted_time: DateTime, + pub(crate) reason: String, +} + +impl RbacResource { + pub fn new(resource: Resource, granted_by: Credential, reason: String) -> Self { + RbacResource { + resource, + granted_by, + granted_time: Utc::now(), + reason, + } + } +} + +impl From<&Resource> for RbacResource { + fn from(resource: &Resource) -> Self { + Self { + resource: resource.to_owned(), + granted_by: Credential::RbacDisabled, + granted_time: Utc::now(), + reason: Default::default(), + } + } +} + +impl PartialEq for RbacResource { + fn eq(&self, other: &Self) -> bool { + self.resource == other.resource + } +} + +impl PartialEq for RbacResource { + fn eq(&self, other: &Resource) -> bool { + &self.resource == other + } +} + +impl Hash for RbacResource { + fn hash(&self, state: &mut H) { + self.resource.hash(state); + } +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct RbacMap { + map: HashMap>>, +} + +impl RbacMap { + pub fn check_permission( + &self, + credential: &Credential, + resource: &Resource, + permission: Permission, + ) -> bool { + self.map + .get(&credential) + .and_then(|map| map.get(&permission)) + .map(|set| set.contains(&resource.into())) + .unwrap_or(false) + } + + pub fn grant_permission(&mut self, grant: &RbacRecord) { + self.map + .entry(grant.credential.clone()) + .or_insert_with(HashMap::new) + .entry(grant.permission) + .or_insert_with(HashSet::new) + .insert(RbacResource::new( + grant.resource.clone(), + grant.requestor.clone(), + grant.reason.clone(), + )); + } + + pub fn revoke_permission(&mut self, revoke: &RbacRecord) { + self.map + .entry(revoke.credential.clone()) + .or_insert_with(HashMap::new) + .entry(revoke.permission) + .or_insert_with(HashSet::new) + .remove(&(&revoke.resource).into()); + } + + pub(crate) fn iter(&self) -> impl Iterator { + self.map.iter().flat_map(|(c, pr)| { + pr.iter() + .flat_map(move |(p, r)| r.iter().map(move |r| (c, p, r))) + }) + } +} diff --git a/registry/sql-provider/src/serdes.rs b/registry/sql-provider/src/serdes.rs index 5e78bb4..0fee633 100644 --- a/registry/sql-provider/src/serdes.rs +++ b/registry/sql-provider/src/serdes.rs @@ -16,9 +16,10 @@ where where S: serde::Serializer, { - let mut entity = serializer.serialize_struct("Registry", 2)?; + let mut entity = serializer.serialize_struct("Registry", 3)?; entity.serialize_field("graph", &self.graph)?; entity.serialize_field("deleted", &self.deleted)?; + entity.serialize_field("permission_map", &self.permission_map.iter().collect::>())?; entity.end() } } @@ -44,6 +45,7 @@ EntityProp: Clone enum Field { Graph, Deleted, + PermissionMap, } struct RegistryVisitor { _t1: std::marker::PhantomData, @@ -77,8 +79,11 @@ EntityProp: Clone let deleted = seq .next_element()? .ok_or_else(|| de::Error::invalid_length(1, &self))?; - Ok(Registry::::from_content( - graph, deleted, + let permission_map = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(2, &self))?; + Ok(Registry::::from_content( + graph, deleted, permission_map, )) } @@ -88,6 +93,7 @@ EntityProp: Clone { let mut graph = None; let mut deleted = None; + let mut permission_map = None; while let Some(key) = map.next_key()? { match key { Field::Graph => { @@ -102,17 +108,24 @@ EntityProp: Clone } deleted = Some(map.next_value()?); } + Field::PermissionMap => { + if permission_map.is_some() { + return Err(de::Error::duplicate_field("permission_map")); + } + permission_map = Some(map.next_value()?); + } } } let graph = graph.ok_or_else(|| de::Error::missing_field("graph"))?; let deleted = deleted.ok_or_else(|| de::Error::missing_field("deleted"))?; + let permission_map = permission_map.ok_or_else(|| de::Error::missing_field("permission_map"))?; Ok(Registry::::from_content( - graph, deleted, + graph, deleted, permission_map, )) } } - const FIELDS: &[&str] = &["graph", "deleted"]; + const FIELDS: &[&str] = &["graph", "deleted", "permission_map"]; deserializer.deserialize_struct( "Registry", FIELDS,