This commit is contained in:
Chen Xu 2022-07-29 16:55:35 +08:00
Родитель bf9a70c746
Коммит 7317274c03
33 изменённых файлов: 1993 добавлений и 104 удалений

218
registry/Cargo.lock сгенерированный
Просмотреть файл

@ -91,6 +91,18 @@ version = "1.0.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb07d2053ccdbe10e2af2995a2f116c1330396493dc1269f6a91d0ae82e19704"
[[package]]
name = "async-native-tls"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d57d4cec3c647232e1094dc013546c0b33ce785d8aeb251e1f20dfaf8a9a13fe"
dependencies = [
"futures-util",
"native-tls",
"thiserror",
"url",
]
[[package]]
name = "async-trait"
version = "0.1.56"
@ -135,6 +147,26 @@ dependencies = [
"winapi",
]
[[package]]
name = "auth"
version = "0.1.0"
dependencies = [
"async-trait",
"chrono",
"common-utils",
"futures",
"jsonwebtoken",
"lazy_static",
"log",
"openssl",
"reqwest",
"serde",
"serde_json",
"serde_with",
"thiserror",
"tokio",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@ -174,15 +206,15 @@ dependencies = [
[[package]]
name = "bb8-tiberius"
version = "0.11.0"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61780ef76db8989f8fd30d9a63fcb1d7d1b8d7df3b7d4662c09e66474877b0e6"
checksum = "ba49af39f809f71aeec44467238c27b34d1caecc8a83a73906c006b7f1138ca3"
dependencies = [
"async-trait",
"bb8",
"futures",
"thiserror",
"tiberius",
"tiberius 0.10.0",
"tokio",
"tokio-util 0.6.10",
]
@ -339,6 +371,7 @@ version = "0.1.0"
dependencies = [
"dotenv",
"log",
"thiserror",
"tracing-subscriber",
]
@ -516,14 +549,38 @@ dependencies = [
"cipher",
]
[[package]]
name = "darling"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c"
dependencies = [
"darling_core 0.13.4",
"darling_macro 0.13.4",
]
[[package]]
name = "darling"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4529658bdda7fd6769b8614be250cdcfc3aeb0ee72fe66f9e41e5e5eb73eac02"
dependencies = [
"darling_core",
"darling_macro",
"darling_core 0.14.1",
"darling_macro 0.14.1",
]
[[package]]
name = "darling_core"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim",
"syn",
]
[[package]]
@ -540,13 +597,24 @@ dependencies = [
"syn",
]
[[package]]
name = "darling_macro"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835"
dependencies = [
"darling_core 0.13.4",
"quote",
"syn",
]
[[package]]
name = "darling_macro"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddfc69c5bfcbd2fc09a0f38451d2daf0e372e367986a83906d1b0dbc88134fb5"
dependencies = [
"darling_core",
"darling_core 0.14.1",
"quote",
"syn",
]
@ -1241,6 +1309,7 @@ checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e"
dependencies = [
"autocfg",
"hashbrown",
"serde",
]
[[package]]
@ -1285,6 +1354,20 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "jsonwebtoken"
version = "8.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aa4b4af834c6cfd35d8763d359661b90f2e45d8f750a0849156c7f4671af09c"
dependencies = [
"base64 0.13.0",
"pem",
"ring",
"serde",
"serde_json",
"simple_asn1",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -1711,6 +1794,20 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "opentls"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f561874f8d6ecfb674fc08863414040c93cc90c0b6963fe679895fab8b65560"
dependencies = [
"futures-util",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"url",
]
[[package]]
name = "os_str_bytes"
version = "6.2.0"
@ -1789,6 +1886,15 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c520e05135d6e763148b6426a837e239041653ba7becd2e538c076c738025fc"
[[package]]
name = "pem"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03c64931a1a212348ec4f3b4362585eca7159d0d09cbdf4a7f74f02173596fd4"
dependencies = [
"base64 0.13.0",
]
[[package]]
name = "pem-rfc7468"
version = "0.3.1"
@ -1957,7 +2063,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5105e426a8a747fefc4be06ab7705cb63bc3ab1b2db373867e1d83b1d95877a"
dependencies = [
"Inflector",
"darling",
"darling 0.14.1",
"http",
"indexmap",
"mime",
@ -2063,6 +2169,8 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"auth",
"chrono",
"clap",
"common-utils",
"env_logger",
@ -2242,6 +2350,7 @@ dependencies = [
"async-trait",
"chrono",
"common-utils",
"itertools",
"log",
"poem",
"poem-openapi",
@ -2548,6 +2657,34 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_with"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89df7a26519371a3cce44fbb914c2819c84d9b897890987fa3ab096491cc0ea8"
dependencies = [
"base64 0.13.0",
"chrono",
"hex",
"indexmap",
"serde",
"serde_json",
"serde_with_macros",
"time 0.3.11",
]
[[package]]
name = "serde_with_macros"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de337f322382fcdfbb21a014f7c224ee041a23785651db67b9827403178f698f"
dependencies = [
"darling 0.14.1",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_yaml"
version = "0.8.26"
@ -2600,6 +2737,18 @@ dependencies = [
"libc",
]
[[package]]
name = "simple_asn1"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adc4e5204eb1910f40f9cfa375f6f05b68c3abac4b6fd879c8ff5e7ae8a0a085"
dependencies = [
"num-bigint",
"num-traits",
"thiserror",
"time 0.3.11",
]
[[package]]
name = "slab"
version = "0.4.7"
@ -2674,6 +2823,7 @@ dependencies = [
"async-trait",
"bb8",
"bb8-tiberius",
"chrono",
"common-utils",
"itertools",
"log",
@ -2686,9 +2836,12 @@ dependencies = [
"sqlx",
"tantivy",
"thiserror",
"tiberius",
"tiberius 0.10.0",
"tiberius-derive",
"tokio",
"tokio-stream",
"tracing",
"tracing-futures",
"uuid 1.1.2",
]
@ -2725,6 +2878,7 @@ dependencies = [
"bitflags",
"byteorder",
"bytes 1.2.0",
"chrono",
"crossbeam-queue",
"digest",
"dirs",
@ -2989,9 +3143,37 @@ dependencies = [
[[package]]
name = "tiberius"
version = "0.9.4"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b1c6d3d8b3740376841aed0deeca9f98b0607433a213727c0b7c132b20b4ad3"
checksum = "833311bc8e26e96c73ad1b5c1f488c588808c747a318905ec67e43d422ea2c08"
dependencies = [
"async-native-tls",
"async-trait",
"asynchronous-codec",
"byteorder",
"bytes 1.2.0",
"connection-string",
"encoding",
"enumflags2",
"futures",
"futures-sink",
"futures-util",
"num-traits",
"once_cell",
"opentls",
"pin-project-lite",
"pretty-hex",
"thiserror",
"tracing",
"uuid 0.8.2",
"winauth",
]
[[package]]
name = "tiberius"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "332ebb88fba7df15f54c9f1332baf284d6c13c6afa3ff612e924db07a5c1d9ce"
dependencies = [
"async-trait",
"asynchronous-codec",
@ -3015,10 +3197,24 @@ dependencies = [
"tokio-rustls",
"tokio-util 0.6.10",
"tracing",
"uuid 0.8.2",
"uuid 1.1.2",
"winauth",
]
[[package]]
name = "tiberius-derive"
version = "0.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8097a74621f85df93f8727700bedbc88426107f9a677e9ca027e056a2a54308f"
dependencies = [
"darling 0.13.4",
"ident_case",
"proc-macro2",
"quote",
"syn",
"tiberius 0.7.3",
]
[[package]]
name = "time"
version = "0.1.44"

Просмотреть файл

@ -8,5 +8,6 @@ members = [
"registry-api",
"registry-cli",
"raft-registry",
"auth",
]

22
registry/auth/Cargo.toml Normal file
Просмотреть файл

@ -0,0 +1,22 @@
[package]
name = "auth"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
log = "0.4"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_with = { version = "2", features = ["chrono"] }
thiserror = "1"
futures = "0.3"
tokio = "1"
chrono = { version = "0.4", features = ["serde"] }
reqwest = { version = "0.11", features = ["json"], default-features = false }
async-trait = "0.1"
lazy_static = "1"
openssl = "0.10"
jsonwebtoken = "8"
common-utils = { path = "../common-utils" }

38
registry/auth/src/lib.rs Normal file
Просмотреть файл

@ -0,0 +1,38 @@
use thiserror::Error;
mod token;
#[derive(Error, Debug)]
pub enum AuthError {
#[error("{0}")]
ReqwestError(String),
#[error("{0}")]
JwtError(String),
#[error(transparent)]
CertError(#[from] openssl::error::ErrorStack),
#[error("Token format is invalid.")]
InvalidToken,
#[error("Token timestamp is invalid.")]
InvalidTimestamp,
#[error("Key('{0}') is not found.")]
KeyNotFound(String),
#[error("Failed to initialize auth lib")]
InitializationError,
}
pub use token::decode_token;
#[cfg(test)]
mod tests {
#[test]
fn it_works() {
let result = 2 + 2;
assert_eq!(result, 4);
}
}

158
registry/auth/src/token.rs Normal file
Просмотреть файл

@ -0,0 +1,158 @@
use std::{collections::HashMap, sync::Arc};
use chrono::{DateTime, Utc};
use common_utils::Logged;
use jsonwebtoken::{decode, decode_header, Algorithm, DecodingKey, Validation};
use openssl::x509::X509;
use serde::{de::DeserializeOwned, Deserialize};
use serde_with::TimestampSeconds;
use tokio::sync::{OnceCell, RwLock};
use crate::AuthError;
impl From<reqwest::Error> for AuthError {
fn from(e: reqwest::Error) -> Self {
Self::ReqwestError(e.to_string())
}
}
impl From<jsonwebtoken::errors::Error> for AuthError {
fn from(e: jsonwebtoken::errors::Error) -> Self {
Self::JwtError(e.to_string())
}
}
pub struct TokenDecoder {
// TODO: Refresh periodically, daily maybe?
keys: HashMap<String, DecodingKey>,
}
impl TokenDecoder {
pub async fn new(base_url: &str) -> Result<Self, AuthError> {
let resp: OpenIdConfiguration = reqwest::get(format!(
"{}/v2.0/.well-known/openid-configuration",
base_url
))
.await?
.json()
.await?;
let cfg: AadKeyConfiguration = reqwest::get(resp.jwks_uri).await?.json().await?;
Ok(Self {
keys: cfg
.keys
.into_iter()
.filter_map(|k| k.into_decoding_key().log().ok())
.collect::<HashMap<_, _>>(),
})
}
pub fn decode_token<T>(&self, token: &str, check_expiration: bool) -> Result<T, AuthError>
where
T: DeserializeOwned,
{
let now = chrono::Utc::now();
#[serde_with::serde_as]
#[derive(Clone, Debug, Deserialize)]
struct Claims<U> {
#[serde_as(as = "TimestampSeconds<i64>")]
nbf: DateTime<Utc>,
#[serde_as(as = "TimestampSeconds<i64>")]
exp: DateTime<Utc>,
#[serde(flatten)]
user_claims: U,
}
let claims: Claims<T> = self.decode_token_claims_no_validation(token.trim())?;
if check_expiration && ((claims.nbf > now) || (claims.exp < now)) {
return Err(AuthError::InvalidTimestamp);
}
Ok(claims.user_claims)
}
fn get_decoding_key(&self, kid: &str) -> Result<&DecodingKey, AuthError> {
let key = self
.keys
.get(kid)
.ok_or_else(|| AuthError::KeyNotFound(kid.to_owned()))?;
Ok(key)
}
fn decode_token_claims_no_validation<T>(&self, token: &str) -> Result<T, AuthError>
where
T: DeserializeOwned,
{
let header = decode_header(token)?;
let kid = &header.kid.or(header.x5t).ok_or(AuthError::InvalidToken)?;
let key = self.get_decoding_key(kid)?;
// TODO: Use 'alg' header
let mut validation = Validation::new(Algorithm::RS256);
validation.validate_exp = false;
let decoded = decode(token, key, &validation)?;
Ok(decoded.claims)
}
}
#[derive(Clone, Debug, Deserialize)]
struct OpenIdConfiguration {
jwks_uri: String,
}
#[allow(dead_code)]
#[derive(Clone, Debug, Deserialize)]
struct AadKey {
kty: String,
#[serde(rename = "use")]
use_: String,
kid: String,
x5t: String,
n: String,
e: String,
x5c: Vec<String>,
issuer: String,
}
impl AadKey {
fn into_decoding_key(self) -> Result<(String, DecodingKey), AuthError> {
let x509 = X509::from_pem(
format!(
"-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----",
self.x5c
.get(0)
.ok_or_else(|| AuthError::KeyNotFound(self.kid.to_owned()))?
)
.as_bytes(),
)?;
let pk = x509.public_key()?.public_key_to_pem()?;
let key = DecodingKey::from_rsa_pem(pk.as_slice())?;
Ok((self.kid, key))
}
}
#[derive(Clone, Debug, Deserialize)]
struct AadKeyConfiguration {
keys: Vec<AadKey>,
}
static DECODER: OnceCell<Option<Arc<RwLock<TokenDecoder>>>> = OnceCell::const_new();
pub async fn decode_token<C>(token: &str) -> Result<C, AuthError>
where
C: DeserializeOwned,
{
DECODER
.get_or_init(|| async {
let base_url = std::env::var("OPENID_BASE_URL")
.unwrap_or("https://login.microsoftonline.com/common".to_string());
TokenDecoder::new(&base_url)
.await
.ok()
.map(|d| Arc::new(RwLock::new(d)))
})
.await
.as_ref()
.ok_or_else(|| AuthError::InitializationError)?
.read()
.await
.decode_token(token, true)
.map_err(|e| e.into())
}

Просмотреть файл

@ -7,5 +7,6 @@ edition = "2021"
[dependencies]
log = "0.4"
thiserror = "1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
dotenv = "0.15"

Просмотреть файл

@ -1,6 +1,7 @@
use std::fmt::Debug;
use log::trace;
use thiserror::Error;
/// `set!` macro works like `vec!`, but generates a HashSet.
#[macro_export]
@ -14,6 +15,19 @@ macro_rules! set {
};
}
#[derive(Error, Debug)]
#[error("{0}")]
pub struct StringError(String);
impl StringError {
pub fn new<T>(s: T) -> Self
where
T: ToString,
{
StringError(s.to_string())
}
}
/// Log if `Result` is an error
pub trait Logged {
fn log(self) -> Self;

Просмотреть файл

@ -20,7 +20,7 @@ use poem::{
use poem_openapi::OpenApiService;
use raft_registry::{
management_routes, raft_routes, FeathrApiV1, FeathrApiV2, NodeConfig, RaftRegistryApp,
RaftSequencer,
RaftSequencer, RbacMiddleware,
};
use sql_provider::attach_storage;
@ -176,7 +176,8 @@ async fn main() -> Result<(), anyhow::Error> {
.nest("/v2", api_service_v2)
.with(Tracing)
.with(RaftSequencer::new(app.store.clone()))
.with(Cors::new());
.with(Cors::new())
.with(RbacMiddleware);
let docs_route = Route::new().nest("/v1", ui_v1).nest("/v2", ui_v2);

Просмотреть файл

@ -11,6 +11,7 @@ anyhow = "1"
async-trait = "0.1"
futures-util = "0.3"
thiserror = "1"
chrono = { version = "0.4", features = ["serde"] }
env_logger = "0.9.0"
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
@ -19,7 +20,7 @@ tokio = { version="1.0", default-features=false, features=["sync"] }
tracing = "0.1"
tracing-futures = "0.2"
sled = "0.34"
uuid = { version = "1", features = ["v4"] }
uuid = { version = "1", features = ["v4", "serde"] }
walkdir = "2.3"
rand = "0.8"
reqwest = { version = "0.11", features = ["json"] }
@ -30,3 +31,4 @@ common-utils = { path = "../common-utils" }
registry-provider = { path = "../registry-provider" }
sql-provider = { path = "../sql-provider" }
registry-api = { path = "../registry-api" }
auth = { path = "../auth" }

Просмотреть файл

@ -1,30 +1,26 @@
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::sync::Arc;
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};
use log::debug;
use log::trace;
use openraft::error::CheckIsLeaderError;
use openraft::error::InitializeError;
use openraft::raft::ClientWriteRequest;
use openraft::Config;
use openraft::EntryPayload;
use openraft::Node;
use openraft::Raft;
use registry_api::ApiError;
use registry_api::FeathrApiProvider;
use registry_api::FeathrApiRequest;
use registry_api::FeathrApiResponse;
use log::{debug, trace};
use openraft::{
error::{CheckIsLeaderError, InitializeError},
raft::ClientWriteRequest,
Config, EntryPayload, Node, Raft,
};
use poem::error::Forbidden;
use registry_api::{
ApiError, FeathrApiProvider, FeathrApiRequest, FeathrApiResponse, IntoApiResult,
};
use registry_provider::{Credential, Permission, RbacError, RbacProvider};
use sql_provider::load_content;
use tokio::net::ToSocketAddrs;
use crate::ManagementCode;
use crate::RegistryClient;
use crate::RegistryNetwork;
use crate::RegistryNodeId;
use crate::RegistryRaft;
use crate::RegistryStore;
use crate::Restore;
use crate::{
ManagementCode, RegistryClient, RegistryNetwork, RegistryNodeId, RegistryRaft, RegistryStore,
Restore,
};
// Representation of an application state. This struct can be shared around to share
// instances of raft, store and more.
@ -73,6 +69,37 @@ impl RaftRegistryApp {
}
}
pub async fn check_permission(
&self,
credential: &Credential,
resource: Option<&str>,
permission: Permission,
) -> poem::Result<()> {
let resource = match resource {
Some(s) => s.parse().map_api_error()?,
None => {
// Read/write project list works as long as there is an identity
return Ok(());
}
};
if !self
.store
.state_machine
.read()
.await
.registry
.check_permission(credential, &resource, permission)
.map_api_error()?
{
return Err(Forbidden(RbacError::PermissionDenied(
credential.to_string(),
resource,
permission,
)));
}
Ok(())
}
pub async fn check_code(&self, code: Option<ManagementCode>) -> poem::Result<()> {
trace!("Checking code {:?}", code);
match self.store.get_management_code() {
@ -103,9 +130,16 @@ impl RaftRegistryApp {
}
pub async fn load_data(&self) -> anyhow::Result<()> {
let (entities, edges) = load_content().await?;
let (entities, edges, permission_map) = load_content().await?;
match self
.request(None, FeathrApiRequest::BatchLoad { entities, edges })
.request(
None,
FeathrApiRequest::BatchLoad {
entities,
edges,
permissions: permission_map,
},
)
.await
{
FeathrApiResponse::Error(e) => Err(e)?,
@ -204,12 +238,8 @@ impl RaftRegistryApp {
);
// Remove stale old instance of this node
if let Ok(m) = client.metrics().await {
let mut nodes: BTreeSet<RegistryNodeId> = m
.membership_config
.get_nodes()
.keys()
.copied()
.collect();
let mut nodes: BTreeSet<RegistryNodeId> =
m.membership_config.get_nodes().keys().copied().collect();
debug!("Found nodes: {:?}", nodes);
if nodes.contains(&self.id) {
debug!("Node with id {} exists in the cluster, clean up stale instance", self.id);

Просмотреть файл

@ -8,6 +8,7 @@ mod store;
mod network;
mod app;
mod client;
mod rbac_middleware;
pub type RegistryNodeId = u64;
@ -32,3 +33,4 @@ pub use store::*;
pub use network::*;
pub use app::*;
pub use client::RegistryClient;
pub use rbac_middleware::RbacMiddleware;

Просмотреть файл

@ -1,4 +1,8 @@
use poem::web::Data;
use common_utils::StringError;
use poem::{
error::{BadRequest, InternalServerError},
web::Data,
};
use poem_openapi::{
param::{Header, Path, Query},
payload::Json,
@ -6,8 +10,9 @@ use poem_openapi::{
};
use registry_api::{
AnchorDef, AnchorFeatureDef, CreationResponse, DerivedFeatureDef, Entity, EntityLineage,
FeathrApiRequest, ProjectDef, SourceDef,
FeathrApiRequest, ProjectDef, RbacResponse, SourceDef,
};
use registry_provider::{Credential, Permission};
use uuid::Uuid;
use crate::RaftRegistryApp;
@ -20,6 +25,7 @@ enum ApiTags {
AnchorFeature,
DerivedFeature,
Feature,
Rbac,
}
pub struct FeathrApiV1;
@ -28,12 +34,16 @@ impl FeathrApiV1 {
#[oai(path = "/projects", method = "get", tag = "ApiTags::Project")]
async fn get_projects(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
keyword: Query<Option<String>>,
page: Query<Option<usize>>,
limit: Query<Option<usize>>,
) -> poem::Result<Json<Vec<String>>> {
data.0
.check_permission(credential.0, None, Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -51,10 +61,14 @@ impl FeathrApiV1 {
#[oai(path = "/projects", method = "post", tag = "ApiTags::Project")]
async fn new_project(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-requestor")] creator: Header<Option<String>>,
def: Json<ProjectDef>,
) -> poem::Result<Json<CreationResponse>> {
data.0
.check_permission(credential.0, None, Permission::Write)
.await?;
let mut definition = def.0;
if definition.id.is_empty() {
definition.id = Uuid::new_v4().to_string();
@ -62,20 +76,45 @@ impl FeathrApiV1 {
if definition.created_by.is_empty() {
definition.created_by = creator.0.unwrap_or_default();
}
data.0
let ret = data
.0
.request(None, FeathrApiRequest::CreateProject { definition })
.await
.into_uuid_and_version()
.map(|v| Json(v.into()))
.into_uuid_and_version();
// Grant project admin permission to the creator of the project.
if let Ok((uuid, _)) = &ret {
let ret = data
.0
.request(
None,
FeathrApiRequest::AddUserRole {
project_id_or_name: uuid.to_string(),
user: credential.0.clone(),
role: Permission::Admin,
requestor: credential.0.clone(),
reason: "Created project".to_string(),
},
)
.await;
match ret {
registry_api::FeathrApiResponse::Error(e) => return Err(e.into()),
_ => {}
}
}
ret.map(|v| Json(v.into()))
}
#[oai(path = "/projects/:project", method = "get", tag = "ApiTags::Project")]
async fn get_project_lineage(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
) -> poem::Result<Json<EntityLineage>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -95,6 +134,7 @@ impl FeathrApiV1 {
)]
async fn get_project_features(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
@ -102,6 +142,9 @@ impl FeathrApiV1 {
page: Query<Option<usize>>,
limit: Query<Option<usize>>,
) -> poem::Result<Json<Vec<Entity>>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -125,6 +168,7 @@ impl FeathrApiV1 {
)]
async fn get_project_datasources(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
@ -132,6 +176,9 @@ impl FeathrApiV1 {
page: Query<Option<usize>>,
limit: Query<Option<usize>>,
) -> poem::Result<Json<Vec<Entity>>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -155,11 +202,15 @@ impl FeathrApiV1 {
)]
async fn new_datasource(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-requestor")] creator: Header<Option<String>>,
project: Path<String>,
def: Json<SourceDef>,
) -> poem::Result<Json<CreationResponse>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Write)
.await?;
let mut definition = def.0;
if definition.id.is_empty() {
definition.id = Uuid::new_v4().to_string();
@ -187,11 +238,15 @@ impl FeathrApiV1 {
)]
async fn new_derived_feature(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-requestor")] creator: Header<Option<String>>,
project: Path<String>,
def: Json<DerivedFeatureDef>,
) -> poem::Result<Json<CreationResponse>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Write)
.await?;
let mut definition = def.0;
if definition.id.is_empty() {
definition.id = Uuid::new_v4().to_string();
@ -219,6 +274,7 @@ impl FeathrApiV1 {
)]
async fn get_project_anchors(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
@ -226,6 +282,9 @@ impl FeathrApiV1 {
page: Query<Option<usize>>,
limit: Query<Option<usize>>,
) -> poem::Result<Json<Vec<Entity>>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -249,11 +308,15 @@ impl FeathrApiV1 {
)]
async fn new_anchor(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-requestor")] creator: Header<Option<String>>,
project: Path<String>,
def: Json<AnchorDef>,
) -> poem::Result<Json<CreationResponse>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Write)
.await?;
let mut definition = def.0;
if definition.id.is_empty() {
definition.id = Uuid::new_v4().to_string();
@ -281,12 +344,16 @@ impl FeathrApiV1 {
)]
async fn new_anchor_feature(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-requestor")] creator: Header<Option<String>>,
project: Path<String>,
anchor: Path<String>,
def: Json<AnchorFeatureDef>,
) -> poem::Result<Json<CreationResponse>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Write)
.await?;
let mut definition = def.0;
if definition.id.is_empty() {
definition.id = Uuid::new_v4().to_string();
@ -311,10 +378,14 @@ impl FeathrApiV1 {
#[oai(path = "/features/:feature", method = "get", tag = "ApiTags::Feature")]
async fn get_feature(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
feature: Path<String>,
) -> poem::Result<Json<Entity>> {
data.0
.check_permission(credential.0, Some(&feature), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -334,10 +405,14 @@ impl FeathrApiV1 {
)]
async fn get_feature_lineage(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
feature: Path<String>,
) -> poem::Result<Json<EntityLineage>> {
data.0
.check_permission(credential.0, Some(&feature), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -357,10 +432,14 @@ impl FeathrApiV1 {
)]
async fn get_feature_project(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
feature: Path<String>,
) -> poem::Result<Json<Entity>> {
data.0
.check_permission(credential.0, Some(&feature), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -372,4 +451,121 @@ impl FeathrApiV1 {
.into_entity()
.map(Json)
}
#[oai(path = "/userroles", method = "get", tag = "ApiTags::Rbac")]
async fn get_user_roles(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
) -> poem::Result<Json<Vec<RbacResponse>>> {
data.0
.check_permission(credential.0, Some("global"), Permission::Admin)
.await?;
data.0
.request(opt_seq.0, FeathrApiRequest::GetUserRoles)
.await
.into_user_roles()
.map(Json)
}
#[oai(
path = "/users/:user/userroles/add",
method = "post",
tag = "ApiTags::Rbac"
)]
async fn add_user_role(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
user: Path<String>,
project: Query<String>,
role: Query<String>,
reason: Query<String>,
) -> poem::Result<Json<String>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Admin)
.await?;
let resp = data
.0
.request(
opt_seq.0,
FeathrApiRequest::AddUserRole {
user: user.0.parse().map_err(|e| BadRequest(e))?,
project_id_or_name: project.0,
role: match role.0.to_lowercase().as_str() {
"admin" => Permission::Admin,
"consumer" => Permission::Read,
"producer" => Permission::Write,
_ => {
return Err(BadRequest(StringError::new(format!(
"invalid role {}",
role.0
))))
}
},
requestor: credential.0.to_owned(),
reason: reason.0,
},
)
.await;
match resp {
registry_api::FeathrApiResponse::Unit => Ok(Json("OK".to_string())),
registry_api::FeathrApiResponse::Error(e) => Err(e.into()),
_ => Err(InternalServerError(StringError::new(
"Internal Server Error",
))),
}
}
#[oai(
path = "/users/:user/userroles/delete",
method = "delete",
tag = "ApiTags::Rbac"
)]
async fn delete_user_role(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
user: Path<String>,
project: Query<String>,
role: Query<String>,
reason: Query<String>,
) -> poem::Result<Json<String>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Admin)
.await?;
let resp = data
.0
.request(
opt_seq.0,
FeathrApiRequest::DeleteUserRole {
user: user.0.parse().map_err(|e| BadRequest(e))?,
project_id_or_name: project.0,
role: match role.0.to_lowercase().as_str() {
"admin" => Permission::Admin,
"consumer" => Permission::Read,
"producer" => Permission::Write,
_ => {
return Err(BadRequest(StringError::new(format!(
"invalid role {}",
role.0
))))
}
},
requestor: credential.0.to_owned(),
reason: reason.0,
},
)
.await;
match resp {
registry_api::FeathrApiResponse::Unit => Ok(Json("OK".to_string())),
registry_api::FeathrApiResponse::Error(e) => Err(e.into()),
_ => Err(InternalServerError(StringError::new(
"Internal Server Error",
))),
}
}
}

Просмотреть файл

@ -1,13 +1,18 @@
use poem::web::Data;
use common_utils::StringError;
use poem::{
error::{BadRequest, InternalServerError},
web::Data,
};
use poem_openapi::{
param::{Header, Path, Query},
payload::Json,
OpenApi, Tags,
};
use registry_api::{
AnchorDef, AnchorFeatureDef, CreationResponse, DerivedFeatureDef, Entities, Entity,
EntityLineage, FeathrApiRequest, ProjectDef, SourceDef, ApiError,
AnchorDef, AnchorFeatureDef, ApiError, CreationResponse, DerivedFeatureDef, Entities, Entity,
EntityLineage, FeathrApiRequest, ProjectDef, RbacResponse, SourceDef,
};
use registry_provider::{Credential, Permission};
use uuid::Uuid;
use crate::RaftRegistryApp;
@ -20,6 +25,7 @@ enum ApiTags {
AnchorFeature,
DerivedFeature,
Feature,
Rbac,
}
pub struct FeathrApiV2;
@ -29,12 +35,16 @@ impl FeathrApiV2 {
#[oai(path = "/projects", method = "get", tag = "ApiTags::Project")]
async fn get_projects(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
keyword: Query<Option<String>>,
size: Query<Option<usize>>,
offset: Query<Option<usize>>,
) -> poem::Result<Json<Vec<String>>> {
data.0
.check_permission(credential.0, None, Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -52,10 +62,14 @@ impl FeathrApiV2 {
#[oai(path = "/projects", method = "post", tag = "ApiTags::Project")]
async fn new_project(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-requestor")] creator: Header<Option<String>>,
def: Json<ProjectDef>,
) -> poem::Result<Json<CreationResponse>> {
data.0
.check_permission(credential.0, None, Permission::Write)
.await?;
let mut definition = def.0;
if definition.id.is_empty() {
definition.id = Uuid::new_v4().to_string();
@ -63,23 +77,46 @@ impl FeathrApiV2 {
if definition.created_by.is_empty() {
definition.created_by = creator.0.unwrap_or_default();
}
data.0
.request(
None,
FeathrApiRequest::CreateProject { definition },
)
let ret = data
.0
.request(None, FeathrApiRequest::CreateProject { definition })
.await
.into_uuid_and_version()
.map(|v| Json(v.into()))
.into_uuid_and_version();
// Grant project admin permission to the creator of the project.
if let Ok((uuid, _)) = &ret {
let ret = data
.0
.request(
None,
FeathrApiRequest::AddUserRole {
project_id_or_name: uuid.to_string(),
user: credential.0.clone(),
role: Permission::Admin,
requestor: credential.0.clone(),
reason: "Created project".to_string(),
},
)
.await;
match ret {
registry_api::FeathrApiResponse::Error(e) => return Err(e.into()),
_ => {}
}
}
ret.map(|v| Json(v.into()))
}
#[oai(path = "/projects/:project", method = "get", tag = "ApiTags::Project")]
async fn get_project(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
) -> poem::Result<Json<Entity>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -99,10 +136,14 @@ impl FeathrApiV2 {
)]
async fn get_project_lineage(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
) -> poem::Result<Json<EntityLineage>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -122,6 +163,7 @@ impl FeathrApiV2 {
)]
async fn get_project_features(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
@ -129,6 +171,9 @@ impl FeathrApiV2 {
size: Query<Option<usize>>,
offset: Query<Option<usize>>,
) -> poem::Result<Json<Entities>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -151,6 +196,7 @@ impl FeathrApiV2 {
)]
async fn get_datasources(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
@ -158,6 +204,9 @@ impl FeathrApiV2 {
size: Query<Option<usize>>,
offset: Query<Option<usize>>,
) -> poem::Result<Json<Entities>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -180,11 +229,15 @@ impl FeathrApiV2 {
)]
async fn new_datasource(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-requestor")] creator: Header<Option<String>>,
project: Path<String>,
def: Json<SourceDef>,
) -> poem::Result<Json<CreationResponse>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Write)
.await?;
let mut definition = def.0;
if definition.id.is_empty() {
definition.id = Uuid::new_v4().to_string();
@ -212,11 +265,15 @@ impl FeathrApiV2 {
)]
async fn get_datasource(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
source: Path<String>,
) -> poem::Result<Json<Entity>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -237,11 +294,15 @@ impl FeathrApiV2 {
)]
async fn get_datasource_versions(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
source: Path<String>,
) -> poem::Result<Json<Entities>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -262,12 +323,16 @@ impl FeathrApiV2 {
)]
async fn get_datasource_version(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
source: Path<String>,
version: Path<String>,
) -> poem::Result<Json<Entity>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -289,6 +354,7 @@ impl FeathrApiV2 {
)]
async fn get_project_derived_features(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
@ -296,6 +362,9 @@ impl FeathrApiV2 {
size: Query<Option<usize>>,
offset: Query<Option<usize>>,
) -> poem::Result<Json<Entities>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -318,11 +387,15 @@ impl FeathrApiV2 {
)]
async fn new_derived_feature(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-requestor")] creator: Header<Option<String>>,
project: Path<String>,
def: Json<DerivedFeatureDef>,
) -> poem::Result<Json<CreationResponse>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Write)
.await?;
let mut definition = def.0;
if definition.id.is_empty() {
definition.id = Uuid::new_v4().to_string();
@ -350,11 +423,15 @@ impl FeathrApiV2 {
)]
async fn get_project_derived_feature(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
feature: Path<String>,
) -> poem::Result<Json<Entity>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -375,11 +452,15 @@ impl FeathrApiV2 {
)]
async fn get_project_derived_feature_versions(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
feature: Path<String>,
) -> poem::Result<Json<Entities>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -400,12 +481,16 @@ impl FeathrApiV2 {
)]
async fn get_project_derived_feature_version(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
feature: Path<String>,
version: Path<String>
version: Path<String>,
) -> poem::Result<Json<Entity>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -427,6 +512,7 @@ impl FeathrApiV2 {
)]
async fn get_project_anchors(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
@ -434,6 +520,9 @@ impl FeathrApiV2 {
size: Query<Option<usize>>,
offset: Query<Option<usize>>,
) -> poem::Result<Json<Entities>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -456,11 +545,15 @@ impl FeathrApiV2 {
)]
async fn new_anchor(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-requestor")] creator: Header<Option<String>>,
project: Path<String>,
def: Json<AnchorDef>,
) -> poem::Result<Json<CreationResponse>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Write)
.await?;
let mut definition = def.0;
if definition.id.is_empty() {
definition.id = Uuid::new_v4().to_string();
@ -488,11 +581,15 @@ impl FeathrApiV2 {
)]
async fn get_anchor(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
anchor: Path<String>,
) -> poem::Result<Json<Entity>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -513,11 +610,15 @@ impl FeathrApiV2 {
)]
async fn get_anchor_versions(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
anchor: Path<String>,
) -> poem::Result<Json<Entities>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -538,12 +639,16 @@ impl FeathrApiV2 {
)]
async fn get_anchor_version(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
anchor: Path<String>,
version: Path<String>
version: Path<String>,
) -> poem::Result<Json<Entity>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -565,6 +670,7 @@ impl FeathrApiV2 {
)]
async fn get_anchor_features(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
@ -573,6 +679,9 @@ impl FeathrApiV2 {
size: Query<Option<usize>>,
offset: Query<Option<usize>>,
) -> poem::Result<Json<Entities>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -596,12 +705,16 @@ impl FeathrApiV2 {
)]
async fn new_anchor_feature(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-requestor")] creator: Header<Option<String>>,
project: Path<String>,
anchor: Path<String>,
def: Json<AnchorFeatureDef>,
) -> poem::Result<Json<CreationResponse>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Write)
.await?;
let mut definition = def.0;
if definition.id.is_empty() {
definition.id = Uuid::new_v4().to_string();
@ -630,12 +743,16 @@ impl FeathrApiV2 {
)]
async fn get_project_anchor_feature(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
anchor: Path<String>,
feature: Path<String>,
) -> poem::Result<Json<Entity>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -657,12 +774,16 @@ impl FeathrApiV2 {
)]
async fn get_project_anchor_feature_versions(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
anchor: Path<String>,
feature: Path<String>,
) -> poem::Result<Json<Entities>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -684,6 +805,7 @@ impl FeathrApiV2 {
)]
async fn get_project_anchor_feature_version(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
project: Path<String>,
@ -691,6 +813,9 @@ impl FeathrApiV2 {
feature: Path<String>,
version: Path<String>,
) -> poem::Result<Json<Entity>> {
data.0
.check_permission(credential.0, Some(&project), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -709,10 +834,14 @@ impl FeathrApiV2 {
#[oai(path = "/features/:feature", method = "get", tag = "ApiTags::Feature")]
async fn get_feature(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
feature: Path<String>,
) -> poem::Result<Json<Entity>> {
data.0
.check_permission(credential.0, Some(&feature), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -732,10 +861,14 @@ impl FeathrApiV2 {
)]
async fn get_feature_lineage(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
feature: Path<String>,
) -> poem::Result<Json<EntityLineage>> {
data.0
.check_permission(credential.0, Some(&feature), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -755,10 +888,14 @@ impl FeathrApiV2 {
)]
async fn get_feature_project(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
feature: Path<String>,
) -> poem::Result<Json<Entity>> {
data.0
.check_permission(credential.0, Some(&feature), Permission::Read)
.await?;
data.0
.request(
opt_seq.0,
@ -770,16 +907,135 @@ impl FeathrApiV2 {
.into_entity()
.map(Json)
}
#[oai(path = "/userroles", method = "get", tag = "ApiTags::Rbac")]
async fn get_user_roles(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
) -> poem::Result<Json<Vec<RbacResponse>>> {
data.0
.check_permission(credential.0, Some("global"), Permission::Admin)
.await?;
data.0
.request(opt_seq.0, FeathrApiRequest::GetUserRoles)
.await
.into_user_roles()
.map(Json)
}
#[oai(
path = "/users/:user/userroles/add",
method = "post",
tag = "ApiTags::Rbac"
)]
async fn add_user_role(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
user: Path<String>,
project: Query<String>,
role: Query<String>,
reason: Query<String>,
) -> poem::Result<Json<String>> {
data.0
.check_permission(credential.0, Some("global"), Permission::Admin)
.await?;
let resp = data
.0
.request(
opt_seq.0,
FeathrApiRequest::AddUserRole {
user: user.0.parse().map_err(|e| BadRequest(e))?,
project_id_or_name: project.0,
role: match role.0.to_lowercase().as_str() {
"admin" => Permission::Admin,
"consumer" => Permission::Read,
"producer" => Permission::Write,
_ => {
return Err(BadRequest(StringError::new(format!(
"invalid role {}",
role.0
))))
}
},
requestor: credential.0.to_owned(),
reason: reason.0,
},
)
.await;
match resp {
registry_api::FeathrApiResponse::Unit => Ok(Json("OK".to_string())),
registry_api::FeathrApiResponse::Error(e) => Err(e.into()),
_ => Err(InternalServerError(StringError::new(
"Internal Server Error",
))),
}
}
#[oai(
path = "/users/:user/userroles/add",
method = "delete",
tag = "ApiTags::Rbac"
)]
async fn delete_user_role(
&self,
credential: Data<&Credential>,
data: Data<&RaftRegistryApp>,
#[oai(name = "x-registry-opt-seq")] opt_seq: Header<Option<u64>>,
user: Path<String>,
project: Query<String>,
role: Query<String>,
reason: Query<String>,
) -> poem::Result<Json<String>> {
data.0
.check_permission(credential.0, Some("global"), Permission::Admin)
.await?;
let resp = data
.0
.request(
opt_seq.0,
FeathrApiRequest::DeleteUserRole {
user: user.0.parse().map_err(|e| BadRequest(e))?,
project_id_or_name: project.0,
role: match role.0.to_lowercase().as_str() {
"admin" => Permission::Admin,
"consumer" => Permission::Read,
"producer" => Permission::Write,
_ => {
return Err(BadRequest(StringError::new(format!(
"invalid role {}",
role.0
))))
}
},
requestor: credential.0.to_owned(),
reason: reason.0,
},
)
.await;
match resp {
registry_api::FeathrApiResponse::Unit => Ok(Json("OK".to_string())),
registry_api::FeathrApiResponse::Error(e) => Err(e.into()),
_ => Err(InternalServerError(StringError::new(
"Internal Server Error",
))),
}
}
}
fn parse_version<T>(v: T) -> Result<Option<u64>, ApiError>
where
T: AsRef<str>
T: AsRef<str>,
{
if v.as_ref() =="latest" {
if v.as_ref() == "latest" {
return Ok(None);
}
Ok(Some(v.as_ref().parse().map_err(|_| ApiError::BadRequest(format!("Invalid version spec {}", v.as_ref())))?))
Ok(Some(v.as_ref().parse().map_err(|_| {
ApiError::BadRequest(format!("Invalid version spec {}", v.as_ref()))
})?))
}
#[cfg(test)]
@ -795,4 +1051,4 @@ mod tests {
assert_eq!(parse_version("1").unwrap(), Some(1));
assert_eq!(parse_version("42").unwrap(), Some(42));
}
}
}

Просмотреть файл

@ -0,0 +1,102 @@
use std::str::FromStr;
use auth::decode_token;
use common_utils::StringError;
use log::warn;
use poem::{error::{BadRequest, Forbidden}, Endpoint, Middleware, Request, Result};
use registry_provider::Credential;
use serde::Deserialize;
use uuid::Uuid;
pub struct RbacMiddleware;
impl<E: Endpoint> Middleware<E> for RbacMiddleware {
type Output = RbacMiddlewareImpl<E>;
fn transform(&self, ep: E) -> Self::Output {
RbacMiddlewareImpl { ep }
}
}
/// The new endpoint type generated by the TokenMiddleware.
pub struct RbacMiddlewareImpl<E> {
ep: E,
}
const TOKEN_HEADER: &str = "Authorization";
const DEBUG_TOKEN_HEADER: &str = "x-feathr-debug-token";
#[derive(Default, Deserialize)]
#[serde(default)]
struct Claims {
app_id: Option<String>,
preferred_username: Option<String>,
email: Option<String>,
}
impl Claims {
fn get_credential(&self) -> Result<Credential> {
match &self.app_id {
Some(s) => {
let id: Uuid = s.parse().map_err(|e| BadRequest(e))?;
Ok(Credential::App(id))
}
None => match &self.preferred_username {
Some(s) => Ok(Credential::User(s.to_owned())),
None => match &self.email {
Some(s) => Ok(Credential::User(s.to_owned())),
None => Err(BadRequest(StringError::new("Invalid token claims"))),
},
},
}
}
}
#[poem::async_trait]
impl<E: Endpoint> Endpoint for RbacMiddlewareImpl<E> {
type Output = E::Output;
async fn call(&self, mut req: Request) -> Result<Self::Output> {
if std::env::var("ENABLE_RBAC").is_err() {
req.extensions_mut().insert(Credential::RbacDisabled);
} else if std::env::var("FEATHR_ENABLE_RBAC_DEBUG_MUST_NOT_USE_IN_PROD").unwrap_or_default()
== "feathr_rbac_debug_enabled"
{
warn!("RBAC debug enabled");
if let Some(value) = req
.headers()
.get(DEBUG_TOKEN_HEADER)
.and_then(|value| value.to_str().ok())
{
if let Ok(credential) = Credential::from_str(value) {
warn!(
"RBAC debug enabled, got credential from debug header: {:?}",
credential
);
req.extensions_mut().insert(credential);
}
} else if let Some(value) = req
.headers()
.get(TOKEN_HEADER)
.and_then(|value| value.to_str().ok())
{
let value = value.trim_start_matches("Bearer");
let claims: Claims = decode_token(value).await.map_err(|e| BadRequest(e))?;
req.extensions_mut().insert(claims.get_credential()?);
}
} else if let Some(value) = req
.headers()
.get(TOKEN_HEADER)
.and_then(|value| value.to_str().ok())
{
let value = value.trim_start_matches("Bearer");
let claims: Claims = decode_token(value).await.map_err(|e| BadRequest(e))?;
req.extensions_mut().insert(claims.get_credential()?);
} else {
return Err(Forbidden(StringError::new("Missing token")));
}
// call the next endpoint.
self.ep.call(req).await
}
}

Просмотреть файл

@ -11,6 +11,7 @@ anyhow = "1"
chrono = { version = "0.4", features = ["serde"] }
async-trait = "0.1"
thiserror = "1"
itertools = "0.10"
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }

Просмотреть файл

@ -9,10 +9,12 @@ use crate::error::ApiError;
mod attributes;
mod edge;
mod entity;
mod rbac;
pub use attributes::*;
pub use edge::*;
pub use entity::*;
pub use rbac::*;
fn parse_uuid(s: &str) -> Result<Uuid, ApiError> {
Uuid::parse_str(s).map_err(|_| ApiError::BadRequest(format!("Invalid GUID `{}`", s)))

Просмотреть файл

@ -0,0 +1,52 @@
use chrono::{DateTime, Utc};
use poem_openapi::Object;
use registry_provider::{Permission, RbacRecord};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize, Object)]
#[oai(rename_all = "camelCase")]
#[serde(rename_all = "camelCase")]
pub struct RbacResponse {
pub scope: String,
pub user_name: String,
pub role_name: String,
pub create_by: String,
pub create_reason: String,
pub create_time: DateTime<Utc>,
pub delete_by: Option<String>,
pub delete_reason: Option<String>,
pub delete_time: Option<DateTime<Utc>>,
pub access: Vec<String>,
}
pub fn into_user_roles(permissions: impl IntoIterator<Item = RbacRecord>) -> Vec<RbacResponse> {
permissions
.into_iter()
.map(|record| {
RbacResponse {
scope: record.resource.to_string(),
user_name: record.credential.to_string(),
role_name: match record.permission {
Permission::Read => "consumer",
Permission::Write => "producer",
Permission::Admin => "admin",
}
.to_string(),
create_by: record.requestor.to_string(),
create_reason: record.reason,
create_time: record.time,
delete_by: None,
delete_reason: None,
delete_time: None,
access: match record.permission {
Permission::Read => vec!["read"],
Permission::Write => vec!["read", "write"],
Permission::Admin => vec!["read", "write", "manage"],
}
.into_iter()
.map(ToString::to_string)
.collect(),
}
})
.collect()
}

Просмотреть файл

@ -1,17 +1,19 @@
use std::collections::HashSet;
use async_trait::async_trait;
use chrono::Utc;
use common_utils::{set, Blank};
use log::debug;
use registry_provider::{
Edge, EdgeType, EntityProperty, EntityType, RegistryError, RegistryProvider,
Credential, Edge, EdgeType, EntityProperty, EntityType, Permission, RbacProvider, RbacRecord,
RegistryError, RegistryProvider,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::{
AnchorDef, AnchorFeatureDef, ApiError, DerivedFeatureDef, Entities, Entity, EntityAttributes,
EntityLineage, EntityRef, IntoApiResult, ProjectDef, SourceDef,
into_user_roles, AnchorDef, AnchorFeatureDef, ApiError, DerivedFeatureDef, Entities, Entity,
EntityAttributes, EntityLineage, EntityRef, IntoApiResult, ProjectDef, RbacResponse, SourceDef,
};
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -142,9 +144,27 @@ pub enum FeathrApiRequest {
GetEntityProject {
id_or_name: String,
},
// Raft specific
BatchLoad {
entities: Vec<registry_provider::Entity<EntityProperty>>,
edges: Vec<Edge>,
permissions: Vec<RbacRecord>,
},
// RBAC
GetUserRoles,
AddUserRole {
project_id_or_name: String,
user: Credential,
role: Permission,
requestor: Credential,
reason: String,
},
DeleteUserRole {
project_id_or_name: String,
user: Credential,
role: Permission,
requestor: Credential,
reason: String,
},
}
@ -158,6 +178,8 @@ impl FeathrApiRequest {
| Self::CreateAnchorFeature { .. }
| Self::CreateProjectDerivedFeature { .. }
| Self::BatchLoad { .. }
| Self::AddUserRole { .. }
| Self::DeleteUserRole { .. }
)
}
}
@ -172,6 +194,7 @@ pub enum FeathrApiResponse {
Entity(Entity),
Entities(Entities),
EntityLineage(EntityLineage),
UserRoles(Vec<RbacResponse>),
}
impl FeathrApiResponse {
@ -213,6 +236,14 @@ impl FeathrApiResponse {
_ => panic!("Shouldn't reach here"),
}
}
pub fn into_user_roles(self) -> poem::Result<Vec<RbacResponse>> {
match self {
FeathrApiResponse::Error(e) => Err(e.into()),
FeathrApiResponse::UserRoles(v) => Ok(v),
_ => panic!("Shouldn't reach here"),
}
}
}
impl From<RegistryError> for FeathrApiResponse {
@ -281,6 +312,12 @@ impl From<EntityLineage> for FeathrApiResponse {
}
}
impl From<Vec<RbacRecord>> for FeathrApiResponse {
fn from(v: Vec<RbacRecord>) -> Self {
Self::UserRoles(into_user_roles(v))
}
}
impl<T, E> From<Result<T, E>> for FeathrApiResponse
where
FeathrApiResponse: From<T> + From<E>,
@ -301,7 +338,7 @@ pub trait FeathrApiProvider: Sync + Send {
#[async_trait]
impl<T> FeathrApiProvider for T
where
T: RegistryProvider<EntityProperty> + Sync + Send,
T: RegistryProvider<EntityProperty> + RbacProvider + Sync + Send,
{
async fn request(&mut self, request: FeathrApiRequest) -> FeathrApiResponse {
fn get_id<T>(t: &T, id_or_name: String) -> Result<Uuid, RegistryError>
@ -496,7 +533,7 @@ where
request: FeathrApiRequest,
) -> Result<FeathrApiResponse, ApiError>
where
T: RegistryProvider<EntityProperty>,
T: RegistryProvider<EntityProperty> + RbacProvider,
{
Ok(match request {
FeathrApiRequest::GetProjects {
@ -838,9 +875,11 @@ where
)
.into()
}
FeathrApiRequest::BatchLoad { entities, edges } => {
this.load_data(entities, edges).await.into()
}
FeathrApiRequest::BatchLoad {
entities,
edges,
permissions,
} => this.load_data(entities, edges, permissions).await.into(),
FeathrApiRequest::GetEntityProject { id_or_name } => {
let entity = this.get_entity_by_id_or_qualified_name(&id_or_name)?;
if entity.entity_type == EntityType::Project {
@ -859,6 +898,44 @@ where
.into()
}
}
FeathrApiRequest::GetUserRoles => this
.get_permissions()
.map_api_error()?
.into(),
FeathrApiRequest::AddUserRole {
project_id_or_name,
user,
role,
requestor,
reason,
} => {
let grant = RbacRecord{
credential: user,
resource: project_id_or_name.parse()?,
permission: role,
requestor,
reason,
time: Utc::now(),
};
this.grant_permission(&grant).await.into()
}
FeathrApiRequest::DeleteUserRole {
project_id_or_name,
user,
role,
requestor,
reason,
} => {
let revoke = RbacRecord{
credential: user,
resource: project_id_or_name.parse()?,
permission: role,
requestor,
reason,
time: Utc::now(),
};
this.revoke_permission(&revoke).await.into()
}
})
}

Просмотреть файл

@ -47,6 +47,11 @@ impl From<RegistryError> for ApiError {
RegistryError::DeleteInUsed(_) => ApiError::BadRequest(format!("{:?}", e)),
RegistryError::FtsError(_) => ApiError::InternalError(format!("{:?}", e)),
RegistryError::ExternalStorageError(_) => ApiError::InternalError(format!("{:?}", e)),
RegistryError::RbacError(e) => match e {
registry_provider::RbacError::CredentialNotFound(_) => ApiError::BadRequest(format!("{:?}", e)),
registry_provider::RbacError::ResourceNotFound(e) => ApiError::NotFoundError(e),
registry_provider::RbacError::PermissionDenied(_, _, _) => ApiError::Forbidden(format!("{:?}", e)),
}
}
}
}

Просмотреть файл

@ -13,4 +13,4 @@ tokio = "1"
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
uuid = { version = "1", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
chrono = { version = "0.4.10", features = ["serde"] }

Просмотреть файл

@ -4,7 +4,7 @@ use serde::{Serialize, Deserialize};
use thiserror::Error;
use uuid::Uuid;
use crate::EntityType;
use crate::{EntityType, RbacError};
#[derive(Clone, Debug, Error, Serialize, Deserialize)]
pub enum RegistryError {
@ -34,5 +34,8 @@ pub enum RegistryError {
#[error("{0}")]
ExternalStorageError(String),
#[error(transparent)]
RbacError(#[from] RbacError),
}

Просмотреть файл

@ -2,11 +2,13 @@ mod error;
mod fts;
mod models;
mod registry;
mod rbac_provider;
pub use error::RegistryError;
pub use fts::*;
pub use models::*;
pub use registry::*;
pub use rbac_provider::*;
pub trait SerializableRegistry<'de> {
fn take_snapshot(&self) -> Result<Vec<u8>, RegistryError>;

Просмотреть файл

@ -0,0 +1,131 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::RegistryError;
use std::str::FromStr;
use uuid::Uuid;
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum Credential {
RbacDisabled,
User(String),
App(Uuid),
}
impl ToString for Credential {
fn to_string(&self) -> String {
match self {
Credential::RbacDisabled => "*".to_string(),
Credential::User(user) => user.clone(),
Credential::App(app) => app.to_string(),
}
}
}
impl FromStr for Credential {
type Err = RegistryError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Ok(uuid) = Uuid::from_str(s) {
Ok(Credential::App(uuid))
} else {
Ok(Credential::User(s.to_string()))
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum Permission {
Read,
Write,
Admin,
}
impl ToString for Permission {
fn to_string(&self) -> String {
match self {
Permission::Read => "consumer",
Permission::Write => "producer",
Permission::Admin => "admin",
}
.to_string()
}
}
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum Resource {
Global,
// So far only project is used
NamedEntity(String),
Entity(Uuid),
}
impl ToString for Resource {
fn to_string(&self) -> String {
match self {
Resource::Global => "global".to_string(),
Resource::NamedEntity(name) => name.clone(),
Resource::Entity(uuid) => uuid.to_string(),
}
}
}
impl FromStr for Resource {
type Err = RegistryError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.to_lowercase() == "global" {
Ok(Resource::Global)
} else if let Ok(uuid) = Uuid::from_str(s) {
Ok(Resource::Entity(uuid))
} else {
Ok(Resource::NamedEntity(s.to_string()))
}
}
}
#[derive(Error, Debug, Clone, Serialize, Deserialize)]
pub enum RbacError {
#[error("Credential {0} not found")]
CredentialNotFound(String),
#[error("Resource {0} not found")]
ResourceNotFound(String),
#[error("Credential {0} doesn't have {2:?} permission to resource {1:?}")]
PermissionDenied(String, Resource, Permission),
}
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct RbacRecord {
pub credential: Credential,
pub resource: Resource,
pub permission: Permission,
pub requestor: Credential,
pub reason: String,
pub time: DateTime<Utc>,
}
#[async_trait]
pub trait RbacProvider: Send + Sync {
fn check_permission(
&self,
credential: &Credential,
resource: &Resource,
permission: Permission,
) -> Result<bool, RegistryError>;
fn load_permissions<RI>(&mut self, permissions: RI) -> Result<(), RegistryError>
where
RI: Iterator<Item = RbacRecord>;
fn get_permissions(&self) -> Result<Vec<RbacRecord>, RegistryError>;
async fn grant_permission(&mut self, grant: &RbacRecord) -> Result<(), RegistryError>;
async fn revoke_permission(&mut self, revoke: &RbacRecord) -> Result<(), RegistryError>;
}

Просмотреть файл

@ -5,7 +5,7 @@ use uuid::Uuid;
use crate::{
AnchorDef, AnchorFeatureDef, DerivedFeatureDef, Edge, EdgeType, Entity, EntityPropMutator,
EntityType, ProjectDef, RegistryError, SourceDef, ToDocString,
EntityType, ProjectDef, RbacRecord, RegistryError, SourceDef, ToDocString,
};
pub fn extract_version(name: &str) -> (&str, Option<u64>) {
@ -37,6 +37,7 @@ where
&mut self,
entities: Vec<Entity<EntityProp>>,
edges: Vec<Edge>,
permissions: Vec<RbacRecord>,
) -> Result<(), RegistryError>;
/**
@ -229,6 +230,19 @@ where
.collect())
}
fn get_entity_project_id(&self, id: Uuid) -> Result<Uuid, RegistryError> {
if let Ok(e) = self.get_entity(id) {
if e.entity_type == EntityType::Project {
return Ok(e.id);
}
}
self.get_neighbors(id, EdgeType::BelongsTo)?
.into_iter()
.find(|e| e.entity_type == EntityType::Project)
.ok_or_else(|| RegistryError::InvalidEntity(id))
.map(|e| e.id)
}
/**
* Returns all entities that depend on this one and vice versa, directly and indirectly
*/

Просмотреть файл

@ -11,21 +11,26 @@ async-trait = "0.1"
anyhow = "1"
thiserror = "1"
itertools = "0.10"
chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
uuid = { version = "1", features = ["v4", "serde"] }
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1.8"
tracing = "0.1"
tracing-futures = "0.2"
petgraph = { version = "0.6", features = ["default", "serde-1"] }
regex = "1"
tantivy = "0.18"
tiberius = { version = "0.9", features = [
tiberius = { version = "0.10", features = [
"chrono",
"tds73",
"tokio-rustls",
"sql-browser-tokio",
], default-features = false, optional = true }
tiberius-derive = "0.0.2"
bb8 = { version = "0.8", optional = true }
bb8-tiberius = { version = "0.11", features = [
bb8-tiberius = { version = "0.12", features = [
"default",
"tls",
], default-features = false, optional = true }
@ -34,6 +39,7 @@ sqlx = { version = "0.6.0", features = [
"any",
"uuid",
"macros",
"chrono",
], default-features = false, optional = true }
common-utils = { path = "../common-utils" }

Просмотреть файл

@ -1,4 +1,4 @@
use registry_provider::{EntityProperty, Entity, Edge};
use registry_provider::{EntityProperty, Entity, Edge, RbacRecord};
use crate::Registry;
@ -16,6 +16,10 @@ fn get_edge_table() -> String {
std::env::var("EDGE_TABLE").unwrap_or_else(|_| "edges".to_string())
}
fn get_rbac_table() -> String {
std::env::var("RBAC_TABLE").unwrap_or_else(|_| "userroles".to_string())
}
pub fn attach_storage(registry: &mut Registry<EntityProperty>) {
#[cfg(feature = "mssql")]
if mssql::validate_condition() {
@ -29,7 +33,7 @@ pub fn attach_storage(registry: &mut Registry<EntityProperty>) {
}
pub async fn load_content(
) -> Result<(Vec<Entity<EntityProperty>>, Vec<Edge>), anyhow::Error> {
) -> Result<(Vec<Entity<EntityProperty>>, Vec<Edge>, Vec<RbacRecord>), anyhow::Error> {
#[cfg(feature = "mssql")]
if mssql::validate_condition() {
return mssql::load_content().await;

Просмотреть файл

@ -3,15 +3,24 @@ use std::sync::Arc;
use async_trait::async_trait;
use bb8::{Pool, PooledConnection};
use bb8_tiberius::ConnectionManager;
use chrono::{DateTime, Utc};
use common_utils::{Appliable, Logged};
use log::debug;
use tiberius::{FromSql, Row};
use tiberius_derive::FromRow;
use tokio::sync::{OnceCell, RwLock};
use tracing::{debug, warn};
use uuid::Uuid;
use registry_provider::{Edge, EdgeType, Entity, EntityProperty, RegistryError};
use registry_provider::{
Credential, Edge, EdgeType, Entity, EntityProperty, Permission, RbacRecord, RegistryError,
Resource,
};
use crate::{db_registry::ExternalStorage, Registry, database::get_entity_table};
use crate::{
database::{get_entity_table, get_rbac_table},
db_registry::ExternalStorage,
Registry,
};
use super::get_edge_table;
@ -51,6 +60,17 @@ impl<'a> FromSql<'a> for EntityPropertyWrapper {
}
}
#[derive(FromRow)]
#[tiberius_derive(owned)]
struct RbacEntry {
user: String,
resource: String,
permission: String,
requestor: String,
reason: String,
time: String,
}
async fn load_entities(
conn: &mut PooledConnection<'static, ConnectionManager>,
) -> Result<Vec<EntityProperty>, anyhow::Error> {
@ -88,6 +108,71 @@ async fn load_edges(
Ok(x)
}
async fn load_permissions(
conn: &mut PooledConnection<'static, ConnectionManager>,
) -> Result<Vec<RbacRecord>, anyhow::Error> {
let permissions_table = get_rbac_table();
{
let check = conn
.simple_query(format!("select 1 from {}", permissions_table))
.await;
if check.is_err() {
warn!("Permissions table not found, RBAC is disabled");
std::env::remove_var("ENABLE_RBAC");
return Ok(vec![]);
}
}
debug!("Loading RBAC from {}", permissions_table);
let x: Vec<RbacEntry> = conn
.simple_query(format!(
r#"SELECT user_name, project_name, role_name, create_by, create_reason, CONVERT(NVARCHAR(max), create_time, 20) from {}
where delete_by is null
order by record_id"#,
permissions_table
))
.await?
.into_first_result()
.await?
.into_iter()
.map(RbacEntry::from_row)
.collect::<Result<Vec<_>, _>>()?;
debug!("{} permissions loaded", x.len());
x.into_iter()
.map(|entry| {
let credential = match entry.user.parse::<Uuid>() {
Ok(id) => Credential::App(id),
Err(_) => Credential::User(entry.user),
};
let resource = match entry.resource.as_str() {
"global" => Resource::Global,
_ => Resource::NamedEntity(entry.resource),
};
let permission = match entry.permission.to_lowercase().as_str() {
"consumer" => Permission::Read,
"producer" => Permission::Write,
"admin" => Permission::Admin,
_ => Permission::Read,
};
let requestor = match entry.requestor.parse::<Uuid>() {
Ok(id) => Credential::App(id),
Err(_) => Credential::User(entry.requestor),
};
let reason = entry.reason;
let time: DateTime<Utc> = DateTime::parse_from_str(&entry.time, "%Y-%m-%d %H:%M:%S")
.map(|t| t.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now());
Ok(RbacRecord {
credential,
resource,
permission,
requestor,
reason,
time,
})
})
.collect()
}
static POOL: OnceCell<Option<Arc<RwLock<Pool<ConnectionManager>>>>> = OnceCell::const_new();
async fn init_pool() -> anyhow::Result<Arc<RwLock<Pool<ConnectionManager>>>> {
@ -120,11 +205,13 @@ pub fn validate_condition() -> bool {
}
}
pub async fn load_content() -> Result<(Vec<Entity<EntityProperty>>, Vec<Edge>), anyhow::Error> {
pub async fn load_content(
) -> Result<(Vec<Entity<EntityProperty>>, Vec<Edge>, Vec<RbacRecord>), anyhow::Error> {
debug!("Loading registry data from database");
let mut conn = connect().await?;
let edges = load_edges(&mut conn).await?;
let entities = load_entities(&mut conn).await?;
let permissions = load_permissions(&mut conn).await?;
debug!(
"{} entities and {} edges loaded",
entities.len(),
@ -133,6 +220,7 @@ pub async fn load_content() -> Result<(Vec<Entity<EntityProperty>>, Vec<Edge>),
Ok((
entities.into_iter().map(|e| e.into()).collect(),
edges.into_iter().map(|e| e.into()).collect(),
permissions,
))
}
@ -159,10 +247,7 @@ impl MsSqlStorage {
impl Default for MsSqlStorage {
fn default() -> Self {
Self::new(
&get_entity_table(),
&get_edge_table(),
)
Self::new(&get_entity_table(), &get_edge_table())
}
}
@ -289,6 +374,68 @@ impl ExternalStorage<EntityProperty> for MsSqlStorage {
.map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?;
Ok(())
}
async fn grant_permission(&mut self, grant: &RbacRecord) -> Result<(), RegistryError> {
let mut conn = connect()
.await
.map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?;
conn.execute(
format!(
"INSERT INTO {}
(user_name, role_name, project_name, create_by, create_reason, create_time)
values
(@P1, @P2, @P3, @P4, @P5, SYSUTCDATETIME())",
get_rbac_table()
)
.apply(|s| {
debug!("SQL is: {}", s);
s
}),
&[
&grant.credential.to_string(),
&grant.permission.to_string(),
&grant.resource.to_string(),
&grant.requestor.to_string(),
&grant.reason,
],
)
.await
.map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?;
Ok(())
}
async fn revoke_permission(&mut self, revoke: &RbacRecord) -> Result<(), RegistryError> {
let mut conn = connect()
.await
.map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?;
conn.execute(
format!(
"UPDATE {}
SET delete_by=@P1, delete_reason=@P2, delete_time=SYSUTCDATETIME()
WHERE user_name = @P3 and role_name = @P4 and project_name = @P5 and delete_reason is null",
get_rbac_table()
)
.apply(|s| {
debug!("SQL is: {}", s);
debug!("P1={}", &revoke.requestor.to_string());
debug!("P2={}", &revoke.reason);
debug!("P3={}", &revoke.credential.to_string());
debug!("P4={}", &revoke.permission.to_string());
debug!("P5={}", &revoke.resource.to_string());
s
}),
&[
&revoke.requestor.to_string(),
&revoke.reason,
&revoke.credential.to_string(),
&revoke.permission.to_string(),
&revoke.resource.to_string(),
],
)
.await
.map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?;
Ok(())
}
}
#[cfg(test)]
@ -313,6 +460,7 @@ mod tests {
let mut r = Registry::<EntityProperty>::load(
data.guid_entity_map.into_iter().map(|(_, i)| i.into()),
data.relations.into_iter().map(|i| i.into()),
vec![].into_iter(),
)
.await
.unwrap();

Просмотреть файл

@ -1,15 +1,23 @@
use std::{str::FromStr, sync::Arc};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use log::debug;
use sqlx::{
any::AnyKind, pool::PoolConnection, Any, AnyConnection, AnyPool, ConnectOptions, Connection,
Executor,
Executor, FromRow,
};
use crate::{database::get_entity_table, db_registry::ExternalStorage, Registry};
use crate::{
database::{get_entity_table, get_rbac_table},
db_registry::ExternalStorage,
Registry,
};
use common_utils::Logged;
use registry_provider::{Edge, EdgeType, Entity, EntityProperty, RegistryError};
use registry_provider::{
Credential, Edge, EdgeType, Entity, EntityProperty, Permission, RbacRecord, RegistryError,
Resource,
};
use tokio::sync::{OnceCell, RwLock};
use uuid::Uuid;
@ -122,7 +130,73 @@ async fn load_edges() -> Result<Vec<Edge>, anyhow::Error> {
Ok(x)
}
pub async fn load_content() -> Result<(Vec<Entity<EntityProperty>>, Vec<Edge>), anyhow::Error> {
#[derive(FromRow)]
struct RbacEntry {
user: String,
resource: String,
permission: String,
requestor: String,
reason: String,
time: String,
}
async fn load_permissions() -> Result<Vec<RbacRecord>, anyhow::Error> {
let permissions_table = get_rbac_table();
debug!("Loading RBAC from {}", permissions_table);
let pool = POOL
.get_or_init(|| async { init_pool().await.ok() })
.await
.clone()
.ok_or_else(|| anyhow::Error::msg("Environment variable 'CONNECTION_STR' is not set."))?;
debug!("SQLx connection pool acquired, connecting to database");
let sql = format!(
r#"SELECT project_name, user_name, role_name from {}
where delete_by is null
"#,
permissions_table
);
let rows: Vec<RbacEntry> = sqlx::query_as::<_, RbacEntry>(&sql)
.fetch_all(&pool)
.await?;
debug!("{} rows loaded", rows.len());
rows.into_iter()
.map(|entry| {
let credential = match entry.user.parse::<Uuid>() {
Ok(id) => Credential::App(id),
Err(_) => Credential::User(entry.user),
};
let resource = match entry.resource.as_str() {
"global" => Resource::Global,
_ => Resource::NamedEntity(entry.resource),
};
let permission = match entry.permission.as_str() {
"consumer" => Permission::Read,
"producer" => Permission::Write,
"admin" => Permission::Admin,
_ => Permission::Read,
};
let requestor = match entry.requestor.parse::<Uuid>() {
Ok(id) => Credential::App(id),
Err(_) => Credential::User(entry.requestor),
};
let reason = entry.reason;
let time: DateTime<Utc> = DateTime::parse_from_str(&entry.time, "%Y-%m-%d %H:%M:%S")
.map_err(|e| anyhow::Error::new(e))?
.with_timezone(&Utc);
Ok(RbacRecord {
credential,
resource,
permission,
requestor,
reason,
time,
})
})
.collect()
}
pub async fn load_content(
) -> Result<(Vec<Entity<EntityProperty>>, Vec<Edge>, Vec<RbacRecord>), anyhow::Error> {
let conn_str = std::env::var("CONNECTION_STR")?;
if conn_str
.parse::<<AnyConnection as Connection>::Options>()?
@ -164,6 +238,7 @@ pub async fn load_content() -> Result<(Vec<Entity<EntityProperty>>, Vec<Edge>),
debug!("Loading registry data from database");
let edges = load_edges().await?;
let entities = load_entities().await?;
let permissions = load_permissions().await?;
debug!(
"{} entities and {} edges loaded",
entities.len(),
@ -172,6 +247,7 @@ pub async fn load_content() -> Result<(Vec<Entity<EntityProperty>>, Vec<Edge>),
Ok((
entities.into_iter().map(|e| e.into()).collect(),
edges.into_iter().map(|e| e.into()).collect(),
permissions,
))
}
@ -420,4 +496,61 @@ impl ExternalStorage<EntityProperty> for SqlxStorage {
.map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?;
Ok(())
}
async fn grant_permission(&mut self, grant: &RbacRecord) -> Result<(), RegistryError> {
let mut conn = connect()
.await
.map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?;
let now = match conn.kind() {
AnyKind::Postgres => "NOW()",
AnyKind::MySql => "NOW()",
AnyKind::Sqlite => "datetime('now')",
};
let sql = format!(
"INSERT INTO {}
(user_name, role_name, project_name, create_by, create_reason, create_time)
values
(?, ?, ?, ?, ?, {})",
get_rbac_table(),
now,
);
let query = sqlx::query(&sql)
.bind(grant.credential.to_string())
.bind(grant.permission.to_string())
.bind(grant.resource.to_string())
.bind(grant.requestor.to_string())
.bind(grant.reason.clone());
conn.execute(query)
.await
.map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?;
Ok(())
}
async fn revoke_permission(&mut self, revoke: &RbacRecord) -> Result<(), RegistryError> {
let mut conn = connect()
.await
.map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?;
let now = match conn.kind() {
AnyKind::Postgres => "NOW()",
AnyKind::MySql => "NOW()",
AnyKind::Sqlite => "datetime('now')",
};
let sql = format!(
"UPDATE {}
SET delete_by=?, delete_reason=?, delete_time={}
WHERE user_name = ? and role_name = ? and project_name = ? and deleted_reason is null",
get_rbac_table(),
now,
);
let query = sqlx::query(&sql)
.bind(revoke.requestor.to_string())
.bind(revoke.reason.clone())
.bind(revoke.credential.to_string())
.bind(revoke.permission.to_string())
.bind(revoke.resource.to_string());
conn.execute(query)
.await
.map_err(|e| RegistryError::ExternalStorageError(format!("{:?}", e)))?;
Ok(())
}
}

Просмотреть файл

@ -16,6 +16,7 @@ use tokio::sync::RwLock;
use uuid::Uuid;
use crate::fts::{FtsError, FtsIndex};
use crate::rbac_map::RbacMap;
const NODE_CAPACITY: usize = 1000;
@ -80,6 +81,16 @@ where
edge_type: EdgeType,
edge_id: Uuid,
) -> Result<(), RegistryError>;
async fn grant_permission(
&mut self,
grant: &RbacRecord,
) -> Result<(), RegistryError>;
async fn revoke_permission(
&mut self,
revoke: &RbacRecord,
) -> Result<(), RegistryError>;
}
#[derive(Debug)]
@ -105,6 +116,8 @@ where
// FTS support
pub(crate) fts_index: FtsIndex,
pub(crate) permission_map: RbacMap,
// TODO:
pub external_storage: Vec<Arc<RwLock<dyn ExternalStorage<EntityProp>>>>,
}
@ -121,6 +134,7 @@ where
deleted: Default::default(),
entry_points: Default::default(),
fts_index: Default::default(),
permission_map: Default::default(),
external_storage: Default::default(),
}
}
@ -142,6 +156,7 @@ where
pub fn from_content(
graph: Graph<Entity<EntityProp>, Edge, Directed>,
deleted: HashSet<Uuid>,
permissions: Vec<RbacRecord>,
) -> Self {
let fts_index = FtsIndex::new();
let node_id_map = graph
@ -171,6 +186,7 @@ where
deleted,
entry_points,
fts_index,
permission_map: Default::default(),
external_storage: Default::default(),
};
let ids: Vec<_> = ret.node_id_map.keys().copied().collect();
@ -180,6 +196,7 @@ where
});
ret.fts_index.commit().ok();
ret.load_permissions(permissions.into_iter()).ok();
ret
}
}
@ -197,6 +214,7 @@ where
deleted: Default::default(),
entry_points: Default::default(),
fts_index: FtsIndex::new(),
permission_map: Default::default(),
external_storage: Default::default(),
}
}
@ -257,10 +275,15 @@ where
Ok(())
}
pub(crate) async fn load<NI, EI>(entities: NI, edges: EI) -> Result<Self, RegistryError>
pub(crate) async fn load<NI, EI, RI>(
entities: NI,
edges: EI,
permissions: RI,
) -> Result<Self, RegistryError>
where
NI: Iterator<Item = Entity<EntityProp>>,
EI: Iterator<Item = Edge>,
RI: Iterator<Item = RbacRecord>,
{
let mut ret = Self {
graph: Graph::with_capacity(NODE_CAPACITY * 10, NODE_CAPACITY),
@ -269,9 +292,11 @@ where
deleted: HashSet::with_capacity(NODE_CAPACITY),
entry_points: Vec::with_capacity(NODE_CAPACITY),
fts_index: FtsIndex::new(),
permission_map: Default::default(),
external_storage: Default::default(),
};
ret.batch_load(entities, edges).await?;
ret.load_permissions(permissions)?;
Ok(ret)
}
@ -866,6 +891,20 @@ mod tests {
);
Ok(())
}
async fn grant_permission(
&mut self,
_grant: &RbacRecord,
) -> Result<(), RegistryError> {
Ok(())
}
async fn revoke_permission(
&mut self,
_revoke: &RbacRecord,
) -> Result<(), RegistryError> {
Ok(())
}
}
async fn init() -> Registry<DummyEntityProp> {

Просмотреть файл

@ -1,6 +1,7 @@
mod database;
mod db_registry;
mod fts;
mod rbac_map;
mod serdes;
#[cfg(any(mock, test))]
@ -12,11 +13,11 @@ use std::fmt::Debug;
use async_trait::async_trait;
pub use database::{attach_storage, load_content};
pub use db_registry::Registry;
use log::debug;
use log::{debug, warn};
use registry_provider::{
extract_version, AnchorDef, AnchorFeatureDef, DerivedFeatureDef, Edge, EdgeType, Entity,
EntityPropMutator, EntityType, ProjectDef, RegistryError, RegistryProvider, SourceDef,
ToDocString,
extract_version, AnchorDef, AnchorFeatureDef, Credential, DerivedFeatureDef, Edge, EdgeType,
Entity, EntityPropMutator, EntityType, Permission, ProjectDef, RbacError, RbacProvider,
RbacRecord, RegistryError, RegistryProvider, Resource, SourceDef, ToDocString,
};
use uuid::Uuid;
@ -32,9 +33,12 @@ where
&mut self,
entities: Vec<Entity<EntityProp>>,
edges: Vec<Edge>,
permissions: Vec<RbacRecord>,
) -> Result<(), RegistryError> {
self.batch_load(entities.into_iter(), edges.into_iter())
.await
.await?;
self.load_permissions(permissions.into_iter())?;
Ok(())
}
/**
@ -440,3 +444,135 @@ where
+ 1
}
}
#[async_trait]
impl<EntityProp> RbacProvider for Registry<EntityProp>
where
EntityProp: Clone + Debug + PartialEq + Eq + EntityPropMutator + ToDocString + Send + Sync,
{
#[tracing::instrument(level = "trace", skip(self))]
fn check_permission(
&self,
credential: &Credential,
resource: &Resource,
permission: Permission,
) -> Result<bool, RegistryError> {
if credential == &Credential::RbacDisabled {
return Ok(true);
}
// Get corresponding project to the resource
let resource = match resource {
Resource::NamedEntity(name) => {
let id = self.get_entity_id(name)?;
let proj_id = self.get_entity_project_id(id)?;
Resource::Entity(proj_id)
}
Resource::Entity(id) => {
let proj_id = self.get_entity_project_id(*id)?;
Resource::Entity(proj_id)
}
Resource::Global => Resource::Global,
};
// User must be either Global Admin or Project Admin or having the permission on the resource
Ok(self
.permission_map
.check_permission(credential, &Resource::Global, Permission::Admin)
|| self
.permission_map
.check_permission(credential, &resource, Permission::Admin)
|| self
.permission_map
.check_permission(credential, &resource, permission))
}
fn load_permissions<RI>(&mut self, permissions: RI) -> Result<(), RegistryError>
where
RI: Iterator<Item = RbacRecord>,
{
for mut record in permissions {
let resource = match &record.resource {
Resource::NamedEntity(name) => match name.parse::<Uuid>() {
Ok(id) => Resource::Entity(id),
Err(_) => Resource::Entity(match self.get_entity_by_name(&name, None) {
Some(e) => e.id,
None => {
warn!("Entity {} not found, skipped", name);
continue;
}
}),
},
_ => record.resource,
};
record.resource = resource;
self.permission_map.grant_permission(&record);
}
Ok(())
}
fn get_permissions(&self) -> Result<Vec<RbacRecord>, RegistryError> {
Ok(self
.permission_map
.iter()
.map(|(credential, permission, resource)| RbacRecord {
credential: credential.to_owned(),
resource: resource.resource.to_owned(),
permission: permission.to_owned(),
requestor: resource.granted_by.to_owned(),
reason: resource.reason.to_owned(),
time: resource.granted_time,
})
.collect())
}
async fn grant_permission(&mut self, grant: &RbacRecord) -> Result<(), RegistryError> {
// User `granted_by` must have the permission to grant the permission
if !self.check_permission(&grant.requestor, &grant.resource, Permission::Admin)? {
return Err(RbacError::PermissionDenied(
grant.requestor.to_string(),
grant.resource.to_owned(),
grant.permission,
)
.into());
}
// Permission already granted, no need to do anything
if self.check_permission(&grant.credential, &grant.resource, grant.permission)? {
return Ok(());
}
// Record permission granting info to the external storages
for storage in self.external_storage.iter() {
storage.write().await.grant_permission(&grant).await?;
}
// Update local data structure
self.permission_map.grant_permission(&grant);
Ok(())
}
async fn revoke_permission(&mut self, revoke: &RbacRecord) -> Result<(), RegistryError> {
// User `revoked_by` must have the permission to grant the permission
if !self.check_permission(&revoke.requestor, &revoke.resource, Permission::Admin)? {
return Err(RbacError::PermissionDenied(
revoke.requestor.to_string(),
revoke.resource.to_owned(),
revoke.permission,
)
.into());
}
// Permission not granted, no need to do anything
if !self.check_permission(&revoke.credential, &revoke.resource, revoke.permission)? {
return Ok(());
}
// Record permission revoking info to the external storages
for storage in self.external_storage.iter() {
storage.write().await.revoke_permission(&revoke).await?;
}
// Update local data structure
self.permission_map.revoke_permission(&revoke);
Ok(())
}
}

Просмотреть файл

@ -23,6 +23,7 @@ pub async fn load() -> crate::Registry<EntityProperty> {
let mut r = Registry::<EntityProperty>::load(
data.guid_entity_map.into_iter().map(|(_, i)| i.into()),
data.relations.into_iter().map(|i| i.into()),
vec![].into_iter(),
)
.await
.unwrap();

Просмотреть файл

@ -0,0 +1,103 @@
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use chrono::{DateTime, Utc};
use registry_provider::{Credential, Permission, RbacRecord, Resource};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, Serialize, Deserialize)]
pub(crate) struct RbacResource {
pub(crate) resource: Resource,
pub(crate) granted_by: Credential,
pub(crate) granted_time: DateTime<Utc>,
pub(crate) reason: String,
}
impl RbacResource {
pub fn new(resource: Resource, granted_by: Credential, reason: String) -> Self {
RbacResource {
resource,
granted_by,
granted_time: Utc::now(),
reason,
}
}
}
impl From<&Resource> for RbacResource {
fn from(resource: &Resource) -> Self {
Self {
resource: resource.to_owned(),
granted_by: Credential::RbacDisabled,
granted_time: Utc::now(),
reason: Default::default(),
}
}
}
impl PartialEq for RbacResource {
fn eq(&self, other: &Self) -> bool {
self.resource == other.resource
}
}
impl PartialEq<Resource> for RbacResource {
fn eq(&self, other: &Resource) -> bool {
&self.resource == other
}
}
impl Hash for RbacResource {
fn hash<H: Hasher>(&self, state: &mut H) {
self.resource.hash(state);
}
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct RbacMap {
map: HashMap<Credential, HashMap<Permission, HashSet<RbacResource>>>,
}
impl RbacMap {
pub fn check_permission(
&self,
credential: &Credential,
resource: &Resource,
permission: Permission,
) -> bool {
self.map
.get(&credential)
.and_then(|map| map.get(&permission))
.map(|set| set.contains(&resource.into()))
.unwrap_or(false)
}
pub fn grant_permission(&mut self, grant: &RbacRecord) {
self.map
.entry(grant.credential.clone())
.or_insert_with(HashMap::new)
.entry(grant.permission)
.or_insert_with(HashSet::new)
.insert(RbacResource::new(
grant.resource.clone(),
grant.requestor.clone(),
grant.reason.clone(),
));
}
pub fn revoke_permission(&mut self, revoke: &RbacRecord) {
self.map
.entry(revoke.credential.clone())
.or_insert_with(HashMap::new)
.entry(revoke.permission)
.or_insert_with(HashSet::new)
.remove(&(&revoke.resource).into());
}
pub(crate) fn iter(&self) -> impl Iterator<Item = (&Credential, &Permission, &RbacResource)> {
self.map.iter().flat_map(|(c, pr)| {
pr.iter()
.flat_map(move |(p, r)| r.iter().map(move |r| (c, p, r)))
})
}
}

Просмотреть файл

@ -16,9 +16,10 @@ where
where
S: serde::Serializer,
{
let mut entity = serializer.serialize_struct("Registry", 2)?;
let mut entity = serializer.serialize_struct("Registry", 3)?;
entity.serialize_field("graph", &self.graph)?;
entity.serialize_field("deleted", &self.deleted)?;
entity.serialize_field("permission_map", &self.permission_map.iter().collect::<Vec<_>>())?;
entity.end()
}
}
@ -44,6 +45,7 @@ EntityProp: Clone
enum Field {
Graph,
Deleted,
PermissionMap,
}
struct RegistryVisitor<EntityProp> {
_t1: std::marker::PhantomData<EntityProp>,
@ -77,8 +79,11 @@ EntityProp: Clone
let deleted = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(1, &self))?;
Ok(Registry::<EntityProp>::from_content(
graph, deleted,
let permission_map = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(2, &self))?;
Ok(Registry::<EntityProp>::from_content(
graph, deleted, permission_map,
))
}
@ -88,6 +93,7 @@ EntityProp: Clone
{
let mut graph = None;
let mut deleted = None;
let mut permission_map = None;
while let Some(key) = map.next_key()? {
match key {
Field::Graph => {
@ -102,17 +108,24 @@ EntityProp: Clone
}
deleted = Some(map.next_value()?);
}
Field::PermissionMap => {
if permission_map.is_some() {
return Err(de::Error::duplicate_field("permission_map"));
}
permission_map = Some(map.next_value()?);
}
}
}
let graph = graph.ok_or_else(|| de::Error::missing_field("graph"))?;
let deleted = deleted.ok_or_else(|| de::Error::missing_field("deleted"))?;
let permission_map = permission_map.ok_or_else(|| de::Error::missing_field("permission_map"))?;
Ok(Registry::<EntityProp>::from_content(
graph, deleted,
graph, deleted, permission_map,
))
}
}
const FIELDS: &[&str] = &["graph", "deleted"];
const FIELDS: &[&str] = &["graph", "deleted", "permission_map"];
deserializer.deserialize_struct(
"Registry",
FIELDS,