Add GenericLocation and fix secret passing logic

Chen Xu 2022-07-18 03:53:39 +08:00
Parent a730e8d4e9
Commit 1366e4f232
16 changed files with 638 additions and 96 deletions

View file

@ -1,6 +1,6 @@
[package]
name = "feathr"
version = "0.2.10"
version = "0.2.12"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View file

@ -1,4 +1,4 @@
use std::{path::Path, sync::Arc, collections::HashMap};
use std::{collections::HashMap, path::Path, sync::Arc};
use chrono::Duration;
use futures::future::join_all;
@ -7,8 +7,9 @@ use tokio::sync::RwLock;
use uuid::Uuid;
use crate::{
job_client, load_var_source, new_var_source, Error, FeathrApiClient, FeathrProject, JobClient,
JobId, JobStatus, SubmitJobRequest, VarSource, FeatureRegistry, registry_client::api_models, project::FeathrProjectImpl,
job_client, load_var_source, new_var_source, project::FeathrProjectImpl,
registry_client::api_models, Error, FeathrApiClient, FeathrProject, FeatureRegistry, JobClient,
JobId, JobStatus, SubmitJobRequest, VarSource,
};
#[derive(Clone, Debug)]
@ -50,7 +51,11 @@ impl FeathrClient {
self.new_project_with_tags(name, Default::default()).await
}
pub async fn new_project_with_tags(&self, name: &str, tags: HashMap<String, String>) -> Result<FeathrProject, Error> {
pub async fn new_project_with_tags(
&self,
name: &str,
tags: HashMap<String, String>,
) -> Result<FeathrProject, Error> {
let (id, version) = if let Some(r) = self.inner.get_registry_client() {
let def = api_models::ProjectDef {
name: name.to_string(),
@ -301,6 +306,17 @@ mod tests {
.await
.unwrap();
// let batch_source = proj.jdbc_source("nycTaxiBatchSource", "jdbc:sqlserver://feathrtestsql4.database.windows.net:1433;database=testsql;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;")
// .dbtable("green_tripdata_2020_04")
// .auth(JdbcSourceAuth::Userpass)
// .time_window(
// "lpep_dropoff_datetime",
// "yyyy-MM-dd HH:mm:ss"
// )
// .preprocessing("testudf.add_new_fare_amount")
// .build()
// .await
// .unwrap();
let request_features = proj
.anchor_group("request_features", proj.INPUT_CONTEXT().await)
.build()
@ -410,9 +426,12 @@ mod tests {
.await
.unwrap();
println!("features.conf:\n{}", proj.get_feature_config().await.unwrap());
println!(
"features.conf:\n{}",
proj.get_feature_config().await.unwrap()
);
let output = client.get_remote_url("output.bin");
let output = client.get_remote_url("a-output.bin");
let anchor_query = FeatureQuery::new(
&[
&f_trip_distance,
@ -429,7 +448,7 @@ mod tests {
&[&f_trip_time_distance, &f_trip_time_rounded],
&[&location_id],
);
let ob = ObservationSettings::new("wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", "lpep_dropoff_datetime", "yyyy-MM-dd HH:mm:ss");
let ob = ObservationSettings::new("wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", "lpep_dropoff_datetime", "yyyy-MM-dd HH:mm:ss").unwrap();
println!(
"features_join.conf:\n{}",
@ -442,7 +461,8 @@ mod tests {
.await
.unwrap()
.python_file("test-script/testudf.py")
.output_path(&output)
.output_location(&output)
.unwrap()
.build();
println!("Request: {:#?}", req);
@ -471,7 +491,10 @@ mod tests {
async fn test_load() {
let client = init().await;
let proj = client.load_project("p1").await.unwrap();
println!("features.conf:\n{}", proj.get_feature_config().await.unwrap());
println!(
"features.conf:\n{}",
proj.get_feature_config().await.unwrap()
);
let location_id = TypedKey::new("DOLocationID", ValueType::INT32)
.full_name("nyc_taxi.location_id")
@ -494,7 +517,7 @@ mod tests {
&["f_trip_time_distance", "f_trip_time_rounded"],
&[&location_id],
);
let ob = ObservationSettings::new("wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", "lpep_dropoff_datetime", "yyyy-MM-dd HH:mm:ss");
let ob = ObservationSettings::new("wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", "lpep_dropoff_datetime", "yyyy-MM-dd HH:mm:ss").unwrap();
println!(
"features_join.conf:\n{}",
@ -507,7 +530,8 @@ mod tests {
.await
.unwrap()
.python_file("test-script/testudf.py")
.output_path(&output)
.output_location(&output)
.unwrap()
.build();
println!("Request: {:#?}", req);

View file

@ -97,6 +97,15 @@ pub enum Error {
#[error("Entity({0}) has invalid type {1:?}")]
InvalidEntityType(String, EntityType),
#[error("Invalid value `{1}` for option `{0}`")]
InvalidOption(String, String),
#[error("Missing option `{0}`")]
MissingOption(String),
#[error("{0}")]
InvalidArgument(String),
#[error("Feathr client is not connected to the registry")]
DetachedClient,
}

View file

@ -30,6 +30,15 @@ pub struct AnchorFeature {
pub(crate) inner: Arc<AnchorFeatureImpl>,
}
impl Serialize for AnchorFeature {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.inner.serialize(serializer)
}
}
impl AnchorFeature {
pub async fn with_key(&self, group: &str, key_alias: &[&str]) -> Result<Self, Error> {
self.owner
@ -105,6 +114,15 @@ pub struct DerivedFeature {
pub(crate) inner: Arc<DerivedFeatureImpl>,
}
impl Serialize for DerivedFeature {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.inner.serialize(serializer)
}
}
impl DerivedFeature {
pub async fn with_key(&self, key_alias: &[&str]) -> Result<Self, Error> {
self.owner

View file

@ -251,6 +251,10 @@ where
var_source: Arc<dyn VarSource + Send + Sync>,
request: &SubmitJobRequest,
) -> Result<Vec<String>, crate::Error> {
let mut secrets: HashMap<String, String> = Default::default();
for secret in request.secret_key.iter() {
secrets.insert(secret.to_string(), var_source.get_environment_variable(&[secret]).await?);
}
let mut ret: Vec<String> = vec![
"--s3-config".to_string(),
self.get_s3_config(var_source.clone()).await?,
@ -262,6 +266,8 @@ where
self.get_sql_config(var_source.clone()).await?,
"--snowflake-config".to_string(),
self.get_snowflake_config(var_source.clone()).await?,
"--system-properties".to_string(),
serde_json::to_string(&secrets)?,
];
let feature_config_url = self.get_remote_url(&format!(
@ -584,8 +590,8 @@ impl SubmitJoiningJobRequestBuilder {
configuration: Default::default(),
feature_config,
feature_join_config: job_config,
secret_keys: secret_keys,
user_functions: user_functions,
secret_keys,
user_functions,
}
}
@ -603,9 +609,12 @@ impl SubmitJoiningJobRequestBuilder {
/**
* Set output path for the Spark job
*/
pub fn output_path(&mut self, output_path: &str) -> &mut Self {
pub fn output_location<T>(&mut self, output_path: T) -> Result<&mut Self, crate::Error>
where
T: ToString
{
self.output_path = Some(output_path.to_string());
self
Ok(self)
}
/**

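A minimal sketch of how the new secret-passing argument is assembled (not part of this diff; the key names are illustrative and follow the {source}_USER / {source}_PASSWORD convention used by get_secret_keys): the secrets resolved through the VarSource are serialized into one JSON map and handed to Spark via --system-properties.

use std::collections::HashMap;

// Hypothetical secret keys resolved through the VarSource (e.g. from environment variables).
let mut secrets: HashMap<String, String> = HashMap::new();
secrets.insert("nycTaxiBatchSource_USER".to_string(), "feathr_user".to_string());
secrets.insert("nycTaxiBatchSource_PASSWORD".to_string(), "resolved-secret".to_string());
// Serialized into a single Spark argument, e.g.
//   --system-properties {"nycTaxiBatchSource_USER":"feathr_user","nycTaxiBatchSource_PASSWORD":"resolved-secret"}
let arg = serde_json::to_string(&secrets).unwrap();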
View file

@ -1,7 +1,7 @@
use chrono::{DateTime, Duration, Utc};
use serde::Serialize;
use crate::Error;
use crate::{Error, DataLocation, GetSecretKeys};
const END_TIME_FORMAT: &str = "yyyy-MM-dd HH:mm:ss";
@ -58,6 +58,16 @@ impl RedisSink {
#[serde(tag = "name", content = "params", rename_all = "SCREAMING_SNAKE_CASE")]
pub enum OutputSink {
Redis(RedisSink),
Hdfs(DataLocation),
}
impl GetSecretKeys for OutputSink {
fn get_secret_keys(&self) -> Vec<String> {
match &self {
OutputSink::Redis(_) => vec![],
OutputSink::Hdfs(l) => l.get_secret_keys(),
}
}
}
impl From<&OutputSink> for OutputSink {
@ -78,6 +88,18 @@ impl From<&RedisSink> for OutputSink {
}
}
impl From<DataLocation> for OutputSink {
fn from(s: DataLocation) -> Self {
Self::Hdfs(s)
}
}
impl From<&DataLocation> for OutputSink {
fn from(s: &DataLocation) -> Self {
Self::Hdfs(s.to_owned())
}
}
fn ser_timeout<S>(v: &Option<Duration>, ser: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,

View file

@ -3,6 +3,10 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::utils::{dur_to_string, str_to_dur};
pub trait GetSecretKeys {
fn get_secret_keys(&self) -> Vec<String>;
}
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ValueType {

View file

@ -1,17 +1,24 @@
use serde::Serialize;
use serde::{ser::SerializeStruct, Serialize};
#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
use crate::{DataLocation, GetSecretKeys};
#[derive(Clone, Debug)]
pub struct ObservationSettings {
pub observation_path: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub observation_path: DataLocation,
pub settings: Option<ObservationInnerSettings>,
}
impl ObservationSettings {
pub fn new(observation_path: &str, timestamp_column: &str, format: &str) -> Self {
Self {
observation_path: observation_path.to_string(),
pub fn new<T>(
observation_path: T,
timestamp_column: &str,
format: &str,
) -> Result<Self, crate::Error>
where
T: AsRef<str>,
{
Ok(Self {
observation_path: observation_path.as_ref().parse()?,
settings: Some(ObservationInnerSettings {
join_time_settings: JoinTimeSettings {
timestamp_column: TimestampColumn {
@ -20,23 +27,47 @@ impl ObservationSettings {
},
},
}),
}
})
}
pub fn from_path(observation_path: &str) -> Self {
Self {
observation_path: observation_path.to_string(),
pub fn from_path<T>(observation_path: T) -> Result<Self, crate::Error>
where
T: AsRef<str>,
{
Ok(Self {
observation_path: observation_path.as_ref().parse()?,
settings: None,
}
})
}
}
impl<T> From<T> for ObservationSettings
where
T: AsRef<str>,
{
fn from(s: T) -> Self {
Self::from_path(s.as_ref())
impl GetSecretKeys for ObservationSettings {
fn get_secret_keys(&self) -> Vec<String> {
self.observation_path.get_secret_keys()
}
}
impl Serialize for ObservationSettings {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut state = serializer.serialize_struct(
"ObservationSettings",
if self.settings.is_none() { 1 } else { 2 },
)?;
match &self.observation_path {
DataLocation::Hdfs { path } => {
state.serialize_field("observationPath", path)?;
}
_ => {
state.serialize_field("observationPath", &self.observation_path)?;
}
}
if let Some(s) = &self.settings {
state.serialize_field("settings", s)?;
}
state.end()
}
}

View file

@ -1,3 +1,4 @@
use std::str::FromStr;
use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
@ -14,10 +15,10 @@ use crate::feature::{
use crate::feature_builder::{AnchorFeatureBuilder, DerivedFeatureBuilder};
use crate::registry_client::api_models::{EdgeType, EntityLineage, EntityType};
use crate::{
DateTimeResolution, Error, Feature, FeatureQuery, FeatureRegistry, FeatureType,
HdfsSourceBuilder, JdbcSourceBuilder, KafkaSourceBuilder, ObservationSettings, Source,
SourceImpl, SourceLocation, SubmitGenerationJobRequestBuilder, SubmitJoiningJobRequestBuilder,
TypedKey,
DataLocation, DateTimeResolution, Error, Feature, FeatureQuery, FeatureRegistry, FeatureType,
GenericSourceBuilder, GetSecretKeys, HdfsSourceBuilder, JdbcSourceBuilder, KafkaSourceBuilder,
ObservationSettings, Source, SourceImpl, SubmitGenerationJobRequestBuilder,
SubmitJoiningJobRequestBuilder, TypedKey,
};
/**
@ -227,6 +228,10 @@ impl FeathrProject {
KafkaSourceBuilder::new(self.inner.clone(), name)
}
pub fn generic_source(&self, name: &str, format: &str) -> GenericSourceBuilder {
GenericSourceBuilder::new(self.inner.clone(), name, format)
}
/**
* Returns the placeholder data source
*/
@ -240,15 +245,16 @@ impl FeathrProject {
/**
* Creates the Spark job request for a feature-joining job
*/
pub async fn feature_join_job<O, Q>(
pub async fn feature_join_job<O, Q, L>(
&self,
observation_settings: O,
feature_query: &[&Q],
output: &str,
output: L,
) -> Result<SubmitJoiningJobRequestBuilder, Error>
where
O: Into<ObservationSettings>,
Q: Into<FeatureQuery> + Clone,
L: AsRef<str>,
{
let fq: Vec<FeatureQuery> = feature_query.iter().map(|&q| q.clone().into()).collect();
let feature_names: Vec<String> = fq
@ -256,13 +262,17 @@ impl FeathrProject {
.flat_map(|q| q.feature_list.into_iter())
.collect();
let mut secret_keys = self.get_secret_keys().await?;
let output_location = DataLocation::from_str(output.as_ref())?;
secret_keys.extend(output_location.get_secret_keys());
let ob = observation_settings.into();
Ok(SubmitJoiningJobRequestBuilder::new_join(
format!("{}_feathr_feature_join_job", self.inner.read().await.name),
ob.observation_path.to_string(),
self.get_feature_config().await?,
self.get_feature_join_config(ob, feature_query, output)?,
self.get_secret_keys().await?,
self.get_feature_join_config(ob, feature_query, output_location.to_argument()?)?,
secret_keys,
self.get_user_functions(&feature_names).await?,
))
}
@ -314,15 +324,16 @@ impl FeathrProject {
Ok(s)
}
pub(crate) fn get_feature_join_config<O, Q>(
pub(crate) fn get_feature_join_config<O, Q, T>(
&self,
observation_settings: O,
feature_query: &[&Q],
output: &str,
output: T,
) -> Result<String, Error>
where
O: Into<ObservationSettings>,
Q: Into<FeatureQuery> + Clone,
T: ToString,
{
// TODO: Validate feature names
@ -340,7 +351,7 @@ impl FeathrProject {
.into_iter()
.map(|&q| q.to_owned().into())
.collect(),
output_path: output.to_string(),
output_path: output.to_string().parse::<DataLocation>()?.to_argument()?,
};
Ok(serde_json::to_string_pretty(&cfg)?)
}
@ -508,7 +519,7 @@ impl FeathrProjectImpl {
.await?;
}
if !matches!(g.source.inner.location, SourceLocation::InputContext)
if !matches!(g.source.inner.location, DataLocation::InputContext)
&& (f.get_key().is_empty() || f.get_key() == vec![TypedKey::DUMMY_KEY()])
{
return Err(Error::DummyKeyUsedWithoutInputContext(f.get_name()));

View file

@ -144,7 +144,8 @@ impl TryInto<crate::project::FeathrProjectImpl> for (Uuid, u64, ProjectAttribute
pub struct SourceAttributes {
pub qualified_name: String,
pub name: String,
pub path: String,
#[serde(flatten, default)]
pub options: HashMap<String, String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub preprocessing: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
@ -165,25 +166,104 @@ impl TryInto<crate::source::SourceImpl> for (Uuid, u64, SourceAttributes) {
id: self.0,
version: 1,
name: self.2.name,
location: crate::SourceLocation::InputContext,
location: crate::DataLocation::InputContext,
time_window_parameters: None,
preprocessing: None,
registry_tags: Default::default(),
}
} else {
SourceImpl {
id: self.0,
version: self.1,
name: self.2.name,
location: crate::SourceLocation::Hdfs { path: self.2.path },
time_window_parameters: self.2.event_timestamp_column.map(|c| {
crate::TimeWindowParameters {
timestamp_column: c,
timestamp_column_format: self.2.timestamp_format.unwrap_or_default(),
}
}),
preprocessing: self.2.preprocessing,
registry_tags: self.2.tags,
match self.2.type_.to_lowercase().as_str() {
"jdbc" => SourceImpl {
id: self.0,
version: self.1,
name: self.2.name.clone(),
location: crate::DataLocation::Jdbc {
url: self
.2
.options
.get("url")
.ok_or(crate::Error::MissingOption("url".to_string()))?
.to_owned(),
dbtable: self.2.options.get("dbtable").cloned(),
query: self.2.options.get("query").cloned(),
auth: match self.2.options.get("auth") {
Some(auth) => match auth.as_str().to_lowercase().as_str() {
"userpass" => crate::JdbcAuth::Userpass {
user: format!("${{{}_USER}}", self.2.name),
password: format!("${{{}_PASSWORD}}", self.2.name),
},
"token" => crate::JdbcAuth::Token {
token: format!("${{{}_TOKEN}}", self.2.name),
},
_ => {
return Err(crate::Error::InvalidOption(
"auth".to_string(),
auth.to_owned(),
))
}
},
None => crate::JdbcAuth::Anonymous,
},
},
time_window_parameters: self.2.event_timestamp_column.map(|c| {
crate::TimeWindowParameters {
timestamp_column: c,
timestamp_column_format: self.2.timestamp_format.unwrap_or_default(),
}
}),
preprocessing: self.2.preprocessing,
registry_tags: self.2.tags,
},
"generic" => SourceImpl {
id: self.0,
version: self.1,
name: self.2.name,
location: crate::DataLocation::Generic {
format: self
.2
.options
.get("format")
.ok_or(crate::Error::MissingOption("format".to_string()))?
.to_owned(),
mode: self.2.options.get("mode").cloned(),
options: self.2.options.clone(),
},
time_window_parameters: self.2.event_timestamp_column.map(|c| {
crate::TimeWindowParameters {
timestamp_column: c,
timestamp_column_format: self.2.timestamp_format.unwrap_or_default(),
}
}),
preprocessing: self.2.preprocessing,
registry_tags: self.2.tags,
},
"hdfs" | "wasb" | "wasbs" | "dbfs" | "s3" => SourceImpl {
id: self.0,
version: self.1,
name: self.2.name,
location: crate::DataLocation::Hdfs {
path: self
.2
.options
.get("path")
.ok_or(crate::Error::MissingOption("path".to_string()))?
.to_owned(),
},
time_window_parameters: self.2.event_timestamp_column.map(|c| {
crate::TimeWindowParameters {
timestamp_column: c,
timestamp_column_format: self.2.timestamp_format.unwrap_or_default(),
}
}),
preprocessing: self.2.preprocessing,
registry_tags: self.2.tags,
},
_ => {
return Err(crate::Error::InvalidOption(
"type".to_string(),
self.2.type_.to_owned(),
))
},
}
})
}

View file

@ -32,7 +32,8 @@ pub struct SourceDef {
pub name: String,
#[serde(rename = "type")]
pub source_type: String,
pub path: String,
#[serde(default)]
pub options: HashMap<String, String>,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub event_timestamp_column: Option<String>,
#[serde(skip_serializing_if = "Option::is_none", default)]
@ -45,17 +46,58 @@ pub struct SourceDef {
impl From<SourceImpl> for SourceDef {
fn from(s: SourceImpl) -> Self {
let (t, path) = match s.location {
crate::SourceLocation::Hdfs { path } => ("hdfs".to_string(), path),
crate::SourceLocation::InputContext => {
("PASSTHROUGH".to_string(), "PASSTHROUGH".to_string())
let (source_type, options) = match s.location {
crate::DataLocation::InputContext => {
("PASSTHROUGH", HashMap::new())
}
crate::DataLocation::Hdfs { path } => ("hdfs", {
let mut options = HashMap::new();
options.insert("path".to_string(), path);
options
}),
crate::DataLocation::Jdbc {
url,
dbtable,
query,
auth,
..
} => {
let mut options = HashMap::new();
options.insert("url".to_string(), url);
if let Some(dbtable) = dbtable {
options.insert("dbtable".to_string(), dbtable);
}
if let Some(query) = query {
options.insert("query".to_string(), query);
}
match auth {
crate::JdbcAuth::Userpass { .. } => {
options.insert("auth".to_string(), "userpass".to_string());
}
crate::JdbcAuth::Token { .. } => {
options.insert("auth".to_string(), "token".to_string());
}
crate::JdbcAuth::Anonymous => {}
}
("jdbc", options)
}
crate::DataLocation::Generic {
format,
mode,
mut options,
} => {
options.insert("format".to_string(), format);
if let Some(mode) = mode {
options.insert("mode".to_string(), mode);
}
("generic", options)
}
_ => todo!(),
};
Self {
name: s.name,
source_type: t,
path,
source_type: source_type.to_string(),
options,
event_timestamp_column: s.time_window_parameters.clone().map(|t| t.timestamp_column),
timestamp_format: s.time_window_parameters.map(|t| t.timestamp_column_format),
preprocessing: s.preprocessing,

View file

@ -1,4 +1,4 @@
use std::{sync::Arc, collections::HashMap};
use std::{collections::HashMap, str::FromStr, sync::Arc};
use serde::{ser::SerializeStruct, Deserialize, Serialize};
use tokio::sync::RwLock;
@ -6,12 +6,12 @@ use uuid::Uuid;
use crate::{
project::{FeathrProjectImpl, FeathrProjectModifier},
Error,
Error, utils::parse_secret, GetSecretKeys,
};
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
#[serde(untagged)]
pub(crate) enum JdbcAuth {
pub enum JdbcAuth {
Userpass { user: String, password: String },
Token { token: String },
Anonymous,
@ -32,14 +32,14 @@ impl Serialize for JdbcAuth {
JdbcAuth::Userpass { user, password } => {
let mut state = serializer.serialize_struct("JdbcAuth", 3)?;
state.serialize_field("type", "jdbc")?;
state.serialize_field("user", &user)?;
state.serialize_field("password", &password)?;
state.serialize_field("user", user)?;
state.serialize_field("password", password)?;
state.end()
}
JdbcAuth::Token { token } => {
let mut state = serializer.serialize_struct("JdbcAuth", 4)?;
state.serialize_field("type", "jdbc")?;
state.serialize_field("token", &token)?;
state.serialize_field("token", token)?;
state.serialize_field("useToken", &true)?;
state.end()
}
@ -58,7 +58,7 @@ pub struct KafkaSchema {
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
#[serde(rename_all = "camelCase")]
pub(crate) enum SourceLocation {
pub enum DataLocation {
Hdfs {
path: String,
},
@ -76,9 +76,100 @@ pub(crate) enum SourceLocation {
topics: Vec<String>,
schema: KafkaSchema,
},
Generic {
format: String,
#[serde(skip_serializing_if = "Option::is_none", default)]
mode: Option<String>,
#[serde(flatten, default)]
options: HashMap<String, String>,
},
InputContext,
}
impl FromStr for DataLocation {
type Err = crate::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let s = s.trim();
Ok(if s.starts_with('{') && s.ends_with('}') {
serde_json::from_str(s)?
} else {
DataLocation::Hdfs {
path: s.to_string(),
}
})
}
}
impl ToString for DataLocation {
fn to_string(&self) -> String {
match &self {
DataLocation::Hdfs { path } => path.to_owned(),
_ => serde_json::to_string(&self).unwrap(),
}
}
}
impl DataLocation {
pub fn to_argument(&self) -> Result<String, crate::Error> {
match &self {
DataLocation::Hdfs { path } => Ok(path.to_owned()),
DataLocation::Jdbc { .. } | DataLocation::Generic { .. } => {
Ok(serde_json::to_string(&self)?)
}
DataLocation::Kafka { .. } => Err(crate::Error::InvalidArgument(
"Kafka cannot be used as output target".to_string(),
)),
DataLocation::InputContext => Err(crate::Error::InvalidArgument(
"INPUT_CONTEXT cannot be used as output target".to_string(),
)),
}
}
pub fn get_type(&self) -> String {
match &self {
DataLocation::Hdfs { .. } => "hdfs".to_string(),
DataLocation::Jdbc { .. } => "jdbc".to_string(),
DataLocation::Kafka { .. } => "kafka".to_string(),
DataLocation::Generic { .. } => "generic".to_string(),
DataLocation::InputContext => "INPUT_CONTEXT".to_string(),
}
}
}
impl GetSecretKeys for DataLocation {
fn get_secret_keys(&self) -> Vec<String> {
let mut secrets = vec![];
match &self {
DataLocation::Jdbc { auth, .. } => match auth {
JdbcAuth::Userpass { user, password } => {
if let Some(s) = parse_secret(&user) {
secrets.push(s);
}
if let Some(s) = parse_secret(&password) {
secrets.push(s);
}
}
JdbcAuth::Token { token } => {
if let Some(s) = parse_secret(&token) {
secrets.push(s);
}
}
JdbcAuth::Anonymous => (),
},
DataLocation::Generic { options, .. } => {
for (_, v) in options {
if let Some(s) = parse_secret(v) {
secrets.push(s);
}
}
}
_ => (),
}
secrets
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct TimeWindowParameters {
@ -95,7 +186,7 @@ pub(crate) struct SourceImpl {
pub(crate) version: u64,
#[serde(skip)]
pub(crate) name: String,
pub(crate) location: SourceLocation,
pub(crate) location: DataLocation,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) time_window_parameters: Option<TimeWindowParameters>,
#[serde(skip)]
@ -117,7 +208,7 @@ impl SourceImpl {
id: Uuid::new_v4(),
version: 1,
name: "PASSTHROUGH".to_string(),
location: SourceLocation::InputContext,
location: DataLocation::InputContext,
time_window_parameters: None,
preprocessing: None,
registry_tags: Default::default(),
@ -125,12 +216,12 @@ impl SourceImpl {
}
pub(crate) fn is_input_context(&self) -> bool {
matches!(self.location, SourceLocation::InputContext)
matches!(self.location, DataLocation::InputContext)
}
pub(crate) fn get_secret_keys(&self) -> Vec<String> {
match &self.location {
SourceLocation::Jdbc { auth, .. } => match auth {
DataLocation::Jdbc { auth, .. } => match auth {
JdbcAuth::Userpass { .. } => vec![
format!("{}_USER", self.name),
format!("{}_PASSWORD", self.name),
@ -138,16 +229,51 @@ impl SourceImpl {
JdbcAuth::Token { .. } => vec![format!("{}_TOKEN", self.name)],
_ => vec![],
},
DataLocation::Generic { options, .. } => options
.keys()
.filter_map(|k| {
if let Some(start) = k.find("${") {
if let Some(end) = k[start..].find("}") {
Some(k[start + 2..start + end].to_string())
} else {
None
}
} else {
None
}
})
.collect(),
_ => vec![],
}
}
}
impl GetSecretKeys for SourceImpl {
fn get_secret_keys(&self) -> Vec<String> {
self.location.get_secret_keys()
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Source {
pub(crate) inner: Arc<SourceImpl>,
}
impl GetSecretKeys for Source {
fn get_secret_keys(&self) -> Vec<String> {
self.inner.get_secret_keys()
}
}
impl Serialize for Source {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.inner.serialize(serializer)
}
}
impl Default for Source {
fn default() -> Self {
Self {
@ -169,6 +295,14 @@ impl Source {
self.inner.name.clone()
}
pub fn get_type(&self) -> String {
self.inner.location.get_type()
}
pub fn get_location(&self) -> DataLocation {
self.inner.location.clone()
}
pub fn get_secret_keys(&self) -> Vec<String> {
self.inner.get_secret_keys()
}
@ -226,7 +360,7 @@ impl HdfsSourceBuilder {
id: Uuid::new_v4(),
version: 1,
name: self.name.to_string(),
location: SourceLocation::Hdfs {
location: DataLocation::Hdfs {
path: self.path.clone(),
},
time_window_parameters: self.time_window_parameters.clone(),
@ -314,15 +448,16 @@ impl JdbcSourceBuilder {
}
pub async fn build(&self) -> Result<Source, Error> {
let auth = self.auth.clone().unwrap_or(JdbcAuth::Anonymous);
let imp = SourceImpl {
id: Uuid::new_v4(),
version: 1,
name: self.name.to_string(),
location: SourceLocation::Jdbc {
location: DataLocation::Jdbc {
url: self.url.clone(),
dbtable: self.dbtable.to_owned(),
query: self.query.to_owned(),
auth: self.auth.clone().unwrap_or(JdbcAuth::Anonymous),
auth,
},
time_window_parameters: self.time_window_parameters.clone(),
preprocessing: self.preprocessing.clone(),
@ -405,7 +540,7 @@ impl KafkaSourceBuilder {
id: Uuid::new_v4(),
version: 1,
name: self.name.to_string(),
location: SourceLocation::Kafka {
location: DataLocation::Kafka {
brokers: self.brokers.clone(),
topics: self.topics.clone(),
schema: KafkaSchema {
@ -420,3 +555,146 @@ impl KafkaSourceBuilder {
self.owner.insert_source(imp).await
}
}
pub struct GenericSourceBuilder {
owner: Arc<RwLock<FeathrProjectImpl>>,
name: String,
format: String,
mode: Option<String>,
options: HashMap<String, String>,
time_window_parameters: Option<TimeWindowParameters>,
preprocessing: Option<String>,
}
impl GenericSourceBuilder {
pub(crate) fn new<T>(owner: Arc<RwLock<FeathrProjectImpl>>, name: &str, format: T) -> Self
where
T: ToString,
{
Self {
owner,
name: name.to_string(),
format: format.to_string(),
mode: None,
options: Default::default(),
time_window_parameters: None,
preprocessing: None,
}
}
pub fn mode<T>(&mut self, mode: T) -> &mut Self
where
T: ToString,
{
self.mode = Some(mode.to_string());
self
}
pub fn option<T1, T2>(&mut self, key: T1, value: T2) -> &mut Self
where
T1: ToString,
T2: ToString,
{
self.options
.insert(key.to_string().replace('.', "__"), value.to_string());
self
}
pub fn options<I, K, V>(&mut self, iter: I) -> &mut Self
where
I: IntoIterator<Item = (K, V)>,
K: ToString,
V: ToString,
{
iter.into_iter().for_each(|(key, value)| {
self.options
.insert(key.to_string().replace('.', "__"), value.to_string());
});
self
}
pub fn time_window(
&mut self,
timestamp_column: &str,
timestamp_column_format: &str,
) -> &mut Self {
self.time_window_parameters = Some(TimeWindowParameters {
timestamp_column: timestamp_column.to_string(),
timestamp_column_format: timestamp_column_format.to_string(),
});
self
}
pub fn preprocessing(&mut self, preprocessing: &str) -> &mut Self {
self.preprocessing = Some(preprocessing.to_string());
self
}
pub async fn build(&self) -> Result<Source, Error> {
let imp = SourceImpl {
id: Uuid::new_v4(),
version: 1,
name: self.name.to_string(),
location: DataLocation::Generic {
format: self.format.clone(),
mode: self.mode.clone(),
options: self.options.clone(),
},
time_window_parameters: self.time_window_parameters.clone(),
preprocessing: self.preprocessing.clone(),
registry_tags: Default::default(),
};
self.owner.insert_source(imp).await
}
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, str::FromStr};
use crate::DataLocation;
#[test]
fn data_location() {
let loc = DataLocation::from_str("s3://bucket/key").unwrap();
assert_eq!(
loc,
DataLocation::Hdfs {
path: "s3://bucket/key".to_string()
}
);
assert_eq!(loc.to_argument().unwrap(), "s3://bucket/key");
let loc: DataLocation = "s3://bucket/key".parse().unwrap();
assert_eq!(
loc,
DataLocation::Hdfs {
path: "s3://bucket/key".to_string()
}
);
assert_eq!(loc.to_argument().unwrap(), "s3://bucket/key");
let loc: DataLocation = r#"{"format": "cosmos.oltp", "mode": "APPEND", "spark__cosmos__accountEndpoint": "https://xchcosmos1.documents.azure.com:443/", "spark__cosmos__accountKey": "${cosmos1_KEY}", "spark__cosmos__database": "feathr", "spark__cosmos__container": "abcde"}"#.parse().unwrap();
assert_eq!(
loc,
DataLocation::Generic {
format: "cosmos.oltp".to_string(),
mode: Some("APPEND".to_string()),
options: {
let mut options = HashMap::new();
options.insert(
"spark__cosmos__accountEndpoint".to_string(),
"https://xchcosmos1.documents.azure.com:443/".to_string(),
);
options.insert(
"spark__cosmos__accountKey".to_string(),
"${cosmos1_KEY}".to_string(),
);
options.insert("spark__cosmos__database".to_string(), "feathr".to_string());
options.insert("spark__cosmos__container".to_string(), "abcde".to_string());
options
}
}
);
}
}

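A minimal usage sketch for the new GenericSourceBuilder (not part of this diff; the project handle proj, the source name, and the endpoint are illustrative). Dots in option keys are stored as __ to match the cosmos keys used in the test above, and option values of the form ${NAME} are later collected as secret keys through DataLocation::get_secret_keys.

let cosmos = proj
    .generic_source("cosmosSource", "cosmos.oltp")
    .mode("APPEND")
    // keys are normalized to spark__cosmos__accountEndpoint, etc.
    .option("spark.cosmos.accountEndpoint", "https://example.documents.azure.com:443/")
    .option("spark.cosmos.accountKey", "${cosmosSource_KEY}")
    .option("spark.cosmos.database", "feathr")
    .option("spark.cosmos.container", "table1")
    .build()
    .await
    .unwrap();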
View file

@ -111,6 +111,17 @@ pub(crate) fn dur_to_string(d: Duration) -> String {
}
}
pub fn parse_secret(s: &str) -> Option<String> {
if let Some(start) = s.find("${") {
if let Some(end) = s[start..].find("}") {
if start < end {
return Some(s[start + 2..start+end].to_string());
}
}
}
None
}
#[cfg(test)]
mod tests {
use chrono::Duration;

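An illustrative check of parse_secret (not among the commit's tests): it extracts the key name from a ${...} placeholder and returns None when the input contains no placeholder.

assert_eq!(parse_secret("${cosmos1_KEY}"), Some("cosmos1_KEY".to_string()));
assert_eq!(parse_secret("plain-value"), None);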
View file

@ -22,7 +22,10 @@ struct EnvVarSource;
impl VarSource for EnvVarSource {
async fn get_environment_variable(&self, name: &[&str]) -> Result<String, crate::Error> {
let name: Vec<&str> = name.into_iter().map(|s| s.as_ref()).collect();
Ok(std::env::var(name.join("__").to_uppercase())?)
Ok(
std::env::var(name.join("__"))
.or_else(|_| std::env::var(name.join("__").to_uppercase()))?,
)
}
}
@ -151,13 +154,9 @@ where
T: AsRef<str>,
{
match YamlSource::from_str(content.as_ref()) {
Ok(src) => {
Arc::new(src)
}
Ok(src) => Arc::new(src),
Err(_) => {
warn!(
"Failed to read Feathr config, using environment variables."
);
warn!("Failed to read Feathr config, using environment variables.");
Arc::new(EnvVarSource)
}
}

View file

@ -116,4 +116,8 @@ feature_names_funcs = {
{{#each user_functions}}
"{{@key}}": {{this}},
{{/each}}
}
}
print("pyspark_client.py: Preprocessing via UDFs and submit Spark job.")
submit_spark_job(feature_names_funcs)
print("pyspark_client.py: Feathr Pyspark job completed.")

View file

@ -70,7 +70,7 @@ offline_store:
spark_config:
# choice for spark runtime. Currently support: azure_synapse, databricks
# The `databricks` configs will be ignored if `azure_synapse` is set and vice versa.
spark_cluster: 'azure_synapse'
spark_cluster: 'databricks'
# configure number of parts for the spark output for feature generation job
spark_result_output_parts: '1'
@ -84,7 +84,7 @@ spark_config:
# Feathr Job configuration. Support local paths, path start with http(s)://, and paths start with abfs(s)://
# this is the default location so end users don't have to compile the runtime again.
# feathr_runtime_location: wasbs://public@azurefeathrstorage.blob.core.windows.net/feathr-assembly-0.1.0-SNAPSHOT.jar
feathr_runtime_location: "wasbs://public@xchfeathrtest4sto.blob.core.windows.net/feathr-assembly-0.4.0.jar"
feathr_runtime_location: "wasbs://public@xchfeathrtest4sto.blob.core.windows.net/feathr-assembly-0.5.0.jar"
databricks:
# workspace instance
workspace_instance_url: 'https://adb-5638037984879289.9.azuredatabricks.net/'
@ -96,7 +96,7 @@ spark_config:
work_dir: 'dbfs:/feathr_getting_started'
# this is the default location so end users don't have to compile the runtime again.
# feathr_runtime_location: "../../target/scala-2.12/feathr-assembly-0.1.0.jar"
# feathr_runtime_location: "dbfs:/feathr-assembly-0.4.0.jar"
feathr_runtime_location: "dbfs:/feathr-assembly-0.5.0.jar"
online_store:
redis: