Add featureset spec for test (#2960)
* Add featureset spec for test * Update overwrite true * Update change checking path * Merge master
This commit is contained in:
Родитель
619b9dab6c
Коммит
dd15e3f7d6
|
@ -19,7 +19,7 @@ on:
|
|||
- sdk/python/dev-requirements.txt
|
||||
- infra/bootstrapping/**
|
||||
- sdk/python/setup.sh
|
||||
- sdk/python/featurestore_sample
|
||||
- sdk/python/featurestore_sample/**
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
|
|
@ -19,7 +19,7 @@ on:
|
|||
- sdk/python/dev-requirements.txt
|
||||
- infra/bootstrapping/**
|
||||
- sdk/python/setup.sh
|
||||
- sdk/python/featurestore_sample
|
||||
- sdk/python/featurestore_sample/**
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
|
|
@ -19,7 +19,7 @@ on:
|
|||
- sdk/python/dev-requirements.txt
|
||||
- infra/bootstrapping/**
|
||||
- sdk/python/setup.sh
|
||||
- sdk/python/featurestore_sample
|
||||
- sdk/python/featurestore_sample/**
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
@ -32,7 +32,7 @@ jobs:
|
|||
- name: setup python
|
||||
uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: "3.8"
|
||||
python-version: "3.10"
|
||||
- name: pip install notebook reqs
|
||||
run: pip install -r sdk/python/dev-requirements.txt
|
||||
- name: azure login
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
feature_transformation:
|
||||
transformation_code:
|
||||
path: ./transformation_code
|
||||
transformer_class: transaction_transform.TransactionFeatureTransformer
|
||||
features:
|
||||
- name: transaction_3d_count
|
||||
type: long
|
||||
- name: transaction_amount_3d_sum
|
||||
type: double
|
||||
- name: transaction_amount_3d_avg
|
||||
type: double
|
||||
- name: transaction_7d_count
|
||||
type: long
|
||||
- name: transaction_amount_7d_sum
|
||||
type: double
|
||||
- name: transaction_amount_7d_avg
|
||||
type: double
|
||||
index_columns:
|
||||
- name: accountID
|
||||
type: string
|
||||
source:
|
||||
path: wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet
|
||||
source_delay:
|
||||
days: 0
|
||||
hours: 0
|
||||
minutes: 20
|
||||
timestamp_column:
|
||||
name: timestamp
|
||||
type: parquet
|
||||
source_lookback:
|
||||
days: 7
|
||||
hours: 0
|
||||
minutes: 0
|
||||
temporal_join_lookback:
|
||||
days: 1
|
||||
hours: 0
|
||||
minutes: 0
|
|
@ -0,0 +1,46 @@
|
|||
from pyspark.sql import functions as F
|
||||
from pyspark.sql.window import Window
|
||||
from pyspark.ml import Transformer
|
||||
from pyspark.sql.dataframe import DataFrame
|
||||
|
||||
|
||||
class TransactionFeatureTransformer(Transformer):
    """Add rolling 3-day and 7-day transaction aggregates per account.

    For each input row the transformer computes, over a trailing window
    partitioned by ``accountID`` and ordered by ``timestamp`` cast to
    ``long``, the count of ``transactionID`` plus the sum and average of
    ``transactionAmount``.
    """

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return ``df`` reduced to the key columns and six rolling features.

        Args:
            df: Input frame. This method reads the columns ``accountID``,
                ``timestamp``, ``transactionID`` and ``transactionAmount``.

        Returns:
            A frame with ``accountID``, ``timestamp`` and the 3-day / 7-day
            count, sum and average columns, in that order.
        """
        # Window bounds are expressed in the unit of the ordering key
        # (timestamp cast to long -- presumably epoch seconds; confirm
        # against the Spark cast semantics).
        seconds_per_day = 86400
        order_key = F.col("timestamp").cast("long")
        per_account = Window.partitionBy("accountID").orderBy(order_key)

        # (span in days, count column, sum column, avg column); the 7-day
        # features are attached before the 3-day ones.
        feature_table = (
            (
                7,
                "transaction_7d_count",
                "transaction_amount_7d_sum",
                "transaction_amount_7d_avg",
            ),
            (
                3,
                "transaction_3d_count",
                "transaction_amount_3d_sum",
                "transaction_amount_3d_avg",
            ),
        )

        enriched = df
        for span_days, count_col, sum_col, avg_col in feature_table:
            # Trailing range: from `span_days` before the current row's
            # ordering value up to and including the current value.
            trailing = per_account.rangeBetween(-(span_days * seconds_per_day), 0)
            enriched = enriched.withColumn(
                count_col, F.count("transactionID").over(trailing)
            )
            enriched = enriched.withColumn(
                sum_col, F.sum("transactionAmount").over(trailing)
            )
            enriched = enriched.withColumn(
                avg_col, F.avg("transactionAmount").over(trailing)
            )

        output_columns = [
            "accountID",
            "timestamp",
            "transaction_3d_count",
            "transaction_amount_3d_sum",
            "transaction_amount_3d_avg",
            "transaction_7d_count",
            "transaction_amount_7d_sum",
            "transaction_amount_7d_avg",
        ]
        return enriched.select(*output_columns)
|
|
@ -728,7 +728,7 @@
|
|||
"if not os.path.exists(transactions_featureset_spec_folder):\n",
|
||||
" os.makedirs(transactions_featureset_spec_folder)\n",
|
||||
"\n",
|
||||
"transactions_featureset_spec.dump(transactions_featureset_spec_folder)"
|
||||
"transactions_featureset_spec.dump(transactions_featureset_spec_folder, overwrite=True)"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
|
|
@ -2132,7 +2132,7 @@
|
|||
"if not os.path.exists(transactions_featureset_spec_folder):\n",
|
||||
" os.makedirs(transactions_featureset_spec_folder)\n",
|
||||
"\n",
|
||||
"transactions_featureset_spec.dump(transactions_featureset_spec_folder)"
|
||||
"transactions_featureset_spec.dump(transactions_featureset_spec_folder, overwrite=True)"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
|
|
@ -593,7 +593,7 @@
|
|||
"if not os.path.exists(transactions_featureset_spec_folder):\n",
|
||||
" os.makedirs(transactions_featureset_spec_folder)\n",
|
||||
"\n",
|
||||
"transactions_featureset_spec.dump(transactions_featureset_spec_folder, overwrite=False)"
|
||||
"transactions_featureset_spec.dump(transactions_featureset_spec_folder, overwrite=True)"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
|
|
@ -239,7 +239,7 @@ on:\n"""
|
|||
- infra/bootstrapping/**
|
||||
- sdk/python/setup.sh\n"""
|
||||
if is_featurestore_sample:
|
||||
workflow_yaml += f""" - sdk/python/featurestore_sample"""
|
||||
workflow_yaml += f""" - sdk/python/featurestore_sample/**"""
|
||||
workflow_yaml += f"""
|
||||
concurrency:
|
||||
group: {GITHUB_CONCURRENCY_GROUP}
|
||||
|
|
Загрузка…
Ссылка в новой задаче