This commit is contained in:
Chen Xu 2022-07-13 21:11:25 +08:00
Родитель 0e68de1427
Коммит 4de1d30634
6 изменённых файлов: 276 добавлений и 324 удалений

Просмотреть файл

@ -1,6 +1,6 @@
[package]
name = "feathr"
version = "0.2.9"
version = "0.2.10"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Просмотреть файл

@ -1,26 +1,45 @@
use std::sync::Arc;
use async_trait::async_trait;
use azure_core::auth::TokenCredential;
use azure_identity::{DefaultAzureCredential, DefaultAzureCredentialBuilder};
use log::debug;
use reqwest::RequestBuilder;
use uuid::Uuid;
use crate::{Error, FeatureRegistry, VarSource};
use super::api_models::{self, CreationResponse};
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct FeathrApiClient {
registry_endpoint: String,
client: reqwest::Client,
version: usize,
credential: Option<Arc<DefaultAzureCredential>>,
}
impl std::fmt::Debug for FeathrApiClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FeathrApiClient")
.field("registry_endpoint", &self.registry_endpoint)
.field("client", &self.client)
.field("version", &self.version)
.finish()
}
}
impl FeathrApiClient {
pub fn new(registry_url: &str, version: usize) -> Self {
pub fn new(registry_url: &str, version: usize, auth: bool) -> Self {
Self {
registry_endpoint: registry_url.to_string(),
client: Default::default(),
version,
credential: if auth {
Some(Arc::new(DefaultAzureCredentialBuilder::new().build()))
} else {
None
},
}
}
/**
@ -29,6 +48,12 @@ impl FeathrApiClient {
pub async fn from_var_source(
var_source: Arc<dyn VarSource + Send + Sync>,
) -> Result<Self, crate::Error> {
let auth: bool = var_source
.get_environment_variable(&["feature_registry", "auth"])
.await
.unwrap_or("true".to_string())
.parse()
.map_err(|e| crate::Error::InvalidConfig(format!("Invalid api_version, {}", e)))?;
Ok(Self {
registry_endpoint: var_source
.get_environment_variable(&["feature_registry", "api_endpoint"])
@ -40,6 +65,29 @@ impl FeathrApiClient {
.unwrap_or("1".to_string())
.parse()
.map_err(|e| crate::Error::InvalidConfig(format!("Invalid api_version, {}", e)))?,
credential: if auth {
Some(Arc::new(DefaultAzureCredentialBuilder::new().build()))
} else {
None
},
})
}
async fn auth(&self, builder: RequestBuilder) -> Result<RequestBuilder, Error> {
Ok(if let Some(cred) = self.credential.clone() {
debug!("Acquiring token");
match cred.get_token("https://graph.microsoft.com/").await {
Ok(res) => {
debug!("Token acquired");
builder.bearer_auth(res.token.secret())
}
Err(e) => {
debug!("Failed to acquire token, error is {}", e);
builder
}
}
} else {
builder
})
}
}
@ -51,10 +99,20 @@ impl FeatureRegistry for FeathrApiClient {
let url = match self.version {
1 => format!("{}/projects/{}", self.registry_endpoint, name),
2 => format!("{}/projects/{}/lineage", self.registry_endpoint, name),
_ => Err(crate::Error::InvalidConfig(format!("Unsupported api_version {}", self.version)))?,
_ => Err(crate::Error::InvalidConfig(format!(
"Unsupported api_version {}",
self.version
)))?,
};
debug!("URL: {}", url);
Ok(self.client.get(url).send().await?.json().await?)
Ok(self
.auth(self.client.get(url))
.await?
.send()
.await?
.error_for_status()?
.json()
.await?)
}
async fn new_project(&self, definition: api_models::ProjectDef) -> Result<(Uuid, u64), Error> {
@ -64,8 +122,8 @@ impl FeatureRegistry for FeathrApiClient {
serde_json::to_string(&definition).unwrap()
);
let r: CreationResponse = self
.client
.post(url)
.auth(self.client.post(url))
.await?
.json(&definition)
.send()
.await?
@ -87,8 +145,8 @@ impl FeatureRegistry for FeathrApiClient {
);
debug!("SourceDef: {}", serde_json::to_string(&definition).unwrap());
let r: CreationResponse = self
.client
.post(url)
.auth(self.client.post(url))
.await?
.json(&definition)
.send()
.await?
@ -107,8 +165,8 @@ impl FeatureRegistry for FeathrApiClient {
let url = format!("{}/projects/{}/anchors", self.registry_endpoint, project_id);
debug!("AnchorDef: {}", serde_json::to_string(&definition).unwrap());
let r: CreationResponse = self
.client
.post(url)
.auth(self.client.post(url))
.await?
.json(&definition)
.send()
.await?
@ -134,8 +192,8 @@ impl FeatureRegistry for FeathrApiClient {
serde_json::to_string(&definition).unwrap()
);
let r: CreationResponse = self
.client
.post(url)
.auth(self.client.post(url))
.await?
.json(&definition)
.send()
.await?
@ -160,8 +218,8 @@ impl FeatureRegistry for FeathrApiClient {
serde_json::to_string(&definition).unwrap()
);
let r: CreationResponse = self
.client
.post(url)
.auth(self.client.post(url))
.await?
.json(&definition)
.send()
.await?

4
python/Cargo.lock сгенерированный
Просмотреть файл

@ -421,7 +421,7 @@ dependencies = [
[[package]]
name = "feathr"
version = "0.2.9"
version = "0.2.10"
dependencies = [
"async-trait",
"azure_core",
@ -453,7 +453,7 @@ dependencies = [
[[package]]
name = "feathrs"
version = "0.2.9"
version = "0.2.10"
dependencies = [
"chrono",
"feathr",

Просмотреть файл

@ -1,6 +1,6 @@
[package]
name = "feathrs"
version = "0.2.9"
version = "0.2.10"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Просмотреть файл

@ -37,6 +37,14 @@ where
}
}
fn block_on<F: Future>(future: F) -> F::Output {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(future)
}
#[pyclass]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum ValueType {
@ -568,11 +576,12 @@ impl Source {
}
fn __repr__(&self) -> String {
format!("Source(id='{}', name='{}', version={})",
self.get_id(),
self.get_name(),
self.get_version()
)
format!(
"Source(id='{}', name='{}', version={})",
self.get_id(),
self.get_name(),
self.get_version()
)
}
fn __richcmp__(&self, other: &Self, op: CompareOp) -> PyResult<bool> {
@ -837,33 +846,25 @@ impl AnchorFeature {
}
fn with_key(&self, group: &str, key_alias: Vec<&str>) -> PyResult<Self> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
Ok(self
.0
.with_key(group, &key_alias)
.await
.map_err(|e| PyValueError::new_err(format!("{}", e)))?
.into())
})
block_on(async {
Ok(self
.0
.with_key(group, &key_alias)
.await
.map_err(|e| PyValueError::new_err(format!("{}", e)))?
.into())
})
}
fn as_feature(&self, group: &str, feature_alias: &str) -> PyResult<Self> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
Ok(self
.0
.as_feature(group, feature_alias)
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?
.into())
})
block_on(async {
Ok(self
.0
.as_feature(group, feature_alias)
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?
.into())
})
}
fn __repr__(&self) -> String {
@ -946,18 +947,14 @@ impl DerivedFeature {
}
fn as_feature(&self, feature_alias: &str) -> PyResult<Self> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
Ok(self
.0
.as_feature(feature_alias)
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?
.into())
})
block_on(async {
Ok(self
.0
.as_feature(feature_alias)
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?
.into())
})
}
fn __repr__(&self) -> String {
format!(
@ -1001,11 +998,7 @@ impl AnchorGroup {
}
#[getter]
fn get_anchor_features(&self) -> Vec<String> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { self.0.get_anchor_features().await })
block_on(async { self.0.get_anchor_features().await })
}
#[args(keys = "None", registry_tags = "None")]
@ -1040,32 +1033,24 @@ impl AnchorGroup {
builder.add_tag(&key, &value);
}
}
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
Ok(builder
.build()
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?
.into())
})
block_on(async {
Ok(builder
.build()
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?
.into())
})
}
fn __getitem__(&self, key: &str) -> PyResult<AnchorFeature> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
Ok(self
.0
.get_anchor(key)
.await
.map_err(|_| PyKeyError::new_err(key.to_string()))?
.into())
})
block_on(async {
Ok(self
.0
.get_anchor(key)
.await
.map_err(|_| PyKeyError::new_err(key.to_string()))?
.into())
})
}
fn __repr__(&self) -> String {
format!(
@ -1096,101 +1081,61 @@ struct FeathrProject(feathr::FeathrProject, FeathrClient);
impl FeathrProject {
#[getter]
pub fn get_id(&self) -> String {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { self.0.get_id().await.to_string() })
block_on(async { self.0.get_id().await.to_string() })
}
#[getter]
pub fn get_version(&self) -> u64 {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { self.0.get_version().await })
block_on(async { self.0.get_version().await })
}
#[getter]
pub fn get_name(&self) -> String {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { self.0.get_name().await.to_string() })
block_on(async { self.0.get_name().await.to_string() })
}
#[getter]
pub fn get_input_context(&self) -> Source {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { self.0.INPUT_CONTEXT().await.into() })
block_on(async { self.0.INPUT_CONTEXT().await.into() })
}
#[getter]
pub fn get_sources(&self) -> PyResult<Vec<String>> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { Ok(self.0.get_sources().await) })
block_on(async { Ok(self.0.get_sources().await) })
}
#[getter]
pub fn get_anchor_groups(&self) -> PyResult<Vec<String>> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { Ok(self.0.get_anchor_groups().await) })
block_on(async { Ok(self.0.get_anchor_groups().await) })
}
#[getter]
pub fn get_anchor_features(&self) -> PyResult<Vec<String>> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { Ok(self.0.get_anchor_features().await) })
block_on(async { Ok(self.0.get_anchor_features().await) })
}
#[getter]
pub fn get_derived_features(&self) -> PyResult<Vec<String>> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { Ok(self.0.get_derived_features().await) })
block_on(async { Ok(self.0.get_derived_features().await) })
}
pub fn get_anchor_group(&self, name: &str) -> PyResult<AnchorGroup> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
Ok(self
.0
.get_anchor_group(name)
.await
.map_err(|_| PyKeyError::new_err(name.to_string()))?
.into())
})
block_on(async {
Ok(self
.0
.get_anchor_group(name)
.await
.map_err(|_| PyKeyError::new_err(name.to_string()))?
.into())
})
}
pub fn get_derived_feature(&self, name: &str) -> PyResult<DerivedFeature> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
Ok(self
.0
.get_derived_feature(name)
.await
.map_err(|_| PyKeyError::new_err(name.to_string()))?
.into())
})
block_on(async {
Ok(self
.0
.get_derived_feature(name)
.await
.map_err(|_| PyKeyError::new_err(name.to_string()))?
.into())
})
}
#[args(registry_tags = "None")]
@ -1206,17 +1151,13 @@ impl FeathrProject {
builder.add_registry_tag(&key, &value);
}
}
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
Ok(builder
.build()
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?
.into())
})
block_on(async {
Ok(builder
.build()
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?
.into())
})
}
#[args(keys = "None", registry_tags = "None")]
@ -1262,17 +1203,13 @@ impl FeathrProject {
builder.add_tag(&key, &value);
}
}
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
Ok(builder
.build()
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?
.into())
})
block_on(async {
Ok(builder
.build()
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?
.into())
})
}
#[args(
@ -1303,17 +1240,13 @@ impl FeathrProject {
builder.preprocessing(&preprocessing);
}
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
Ok(builder
.build()
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?
.into())
})
block_on(async {
Ok(builder
.build()
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?
.into())
})
}
#[args(
@ -1367,17 +1300,13 @@ impl FeathrProject {
builder.preprocessing(&preprocessing);
}
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
Ok(builder
.build()
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?
.into())
})
block_on(async {
Ok(builder
.build()
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?
.into())
})
}
// pub fn kafka_source(&self, name: &str, brokers: &PyList, topics: &PyList, avro_json: &PyAny) {}
@ -1405,25 +1334,21 @@ impl FeathrProject {
}
let queries: Vec<&feathr::FeatureQuery> = queries.iter().map(|q| q).collect();
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
let request = self
.0
.feature_join_job(observation, &queries, output)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))?
.output_path(output)
.build();
let client = self.1 .0.clone();
Ok(client
.submit_job(request)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))?
.0)
})
block_on(async {
let request = self
.0
.feature_join_job(observation, &queries, output)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))?
.output_path(output)
.build();
let client = self.1 .0.clone();
Ok(client
.submit_job(request)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))?
.0)
})
}
fn get_offline_features_async<'p>(
@ -1509,33 +1434,29 @@ impl FeathrProject {
);
let sink = sink.map(|s| feathr::OutputSink::Redis(s.0));
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
let mut builder = self
.0
.feature_gen_job(&feature_names, start, end, step.into())
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?;
if let Some(sink) = sink {
builder.sink(sink);
}
block_on(async {
let mut builder = self
.0
.feature_gen_job(&feature_names, start, end, step.into())
.await
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?;
if let Some(sink) = sink {
builder.sink(sink);
}
let request = builder
.build()
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?;
let client = self.1 .0.clone();
let jobs_ids: Vec<u64> = client
.submit_jobs(request)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))?
.into_iter()
.map(|job_id| job_id.0)
.collect();
Ok(jobs_ids)
})
let request = builder
.build()
.map_err(|e| PyValueError::new_err(format!("{:#?}", e)))?;
let client = self.1 .0.clone();
let jobs_ids: Vec<u64> = client
.submit_jobs(request)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))?
.into_iter()
.map(|job_id| job_id.0)
.collect();
Ok(jobs_ids)
})
}
#[args(step = "DateTimeResolution::Daily", sink = "None")]
@ -1606,11 +1527,7 @@ impl FeathrProject {
#[allow(non_snake_case)]
#[getter]
pub fn INPUT_CONTEXT(&self) -> Source {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { self.0.INPUT_CONTEXT().await.into() })
block_on(async { self.0.INPUT_CONTEXT().await.into() })
}
fn __repr__(&self) -> String {
@ -1637,16 +1554,12 @@ struct FeathrClient(feathr::FeathrClient);
impl FeathrClient {
#[new]
fn load(config_file: String) -> PyResult<Self> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
feathr::FeathrClient::load(config_file)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))
.map(|c| FeathrClient(c))
})
block_on(async {
feathr::FeathrClient::load(config_file)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))
.map(|c| FeathrClient(c))
})
}
#[staticmethod]
@ -1662,16 +1575,12 @@ impl FeathrClient {
#[staticmethod]
fn loads(content: &str) -> PyResult<Self> {
let content = content.to_string();
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async move {
feathr::FeathrClient::from_str(&content)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))
.map(|c| FeathrClient(c))
})
block_on(async move {
feathr::FeathrClient::from_str(&content)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))
.map(|c| FeathrClient(c))
})
}
#[staticmethod]
@ -1686,30 +1595,22 @@ impl FeathrClient {
}
fn load_project(&self, name: &str) -> PyResult<FeathrProject> {
let project = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async move {
self.0
.load_project(name)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))
})?;
let project = block_on(async move {
self.0
.load_project(name)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))
})?;
Ok(FeathrProject(project, self.clone()))
}
fn new_project(&self, name: &str) -> PyResult<FeathrProject> {
let project = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async move {
self.0
.new_project(name)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))
})?;
let project = block_on(async move {
self.0
.new_project(name)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))
})?;
Ok(FeathrProject(project, self.clone()))
}
@ -1722,16 +1623,12 @@ impl FeathrClient {
) -> PyResult<String> {
let client = self.0.clone();
let timeout = timeout.map(|s| Duration::seconds(s));
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(cancelable_wait(py, async {
Ok(client
.wait_for_job(feathr::JobId(job_id), timeout)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))?)
}))
block_on(cancelable_wait(py, async {
Ok(client
.wait_for_job(feathr::JobId(job_id), timeout)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))?)
}))
}
#[args(timeout = "None")]
@ -1760,21 +1657,17 @@ impl FeathrClient {
) -> PyResult<Vec<String>> {
let client = self.0.clone();
let timeout = timeout.map(|s| Duration::seconds(s));
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(cancelable_wait(py, async {
let jobs = job_id
.into_iter()
.map(|job_id| client.wait_for_job(feathr::JobId(job_id), timeout));
let complete: Vec<String> = join_all(jobs)
.await
.into_iter()
.map(|r| r.unwrap_or_default())
.collect();
Ok(complete)
}))
block_on(cancelable_wait(py, async {
let jobs = job_id
.into_iter()
.map(|job_id| client.wait_for_job(feathr::JobId(job_id), timeout));
let complete: Vec<String> = join_all(jobs)
.await
.into_iter()
.map(|r| r.unwrap_or_default())
.collect();
Ok(complete)
}))
}
#[args(timeout = "None")]
@ -1801,18 +1694,14 @@ impl FeathrClient {
pub fn get_job_status(&self, job_id: u64) -> PyResult<JobStatus> {
let client = self.0.clone();
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
let status: JobStatus = client
.get_job_status(feathr::JobId(job_id))
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))?
.into();
Ok(status)
})
block_on(async {
let status: JobStatus = client
.get_job_status(feathr::JobId(job_id))
.await
.map_err(|e| PyRuntimeError::new_err(format!("{:#?}", e)))?
.into();
Ok(status)
})
}
pub fn get_job_status_async<'p>(&'p self, job_id: u64, py: Python<'p>) -> PyResult<&'p PyAny> {

Просмотреть файл

@ -108,12 +108,17 @@ online_store:
ssl_enabled: True
feature_registry:
# For testing
api_endpoint: 'http://localhost:8000/api/v1'
# For testing, `auth` is set to `true` by default
# api_endpoint: 'http://localhost:8000/api/v1'
# auth: "false"
# Official Feathr registry supports API version 1
# api_endpoint: 'https://feathr-sql-registry.azurewebsites.net/api/v1'
# auth: false
api_endpoint: 'https://feathr-rbac-registry.azurewebsites.net/api/v1'
auth: "true"
# Registry in this repo supports API version 2
# api_endpoint: 'https://feathrregistry.azurewebsites.net/api/v2'
# auth: false
# api_version: 2