Merge branch 'runhli/update_feast' of https://github.com/runhli/feast-azure into runhli-runhli/update_feast

This commit is contained in:
Sam Kemp 2022-06-27 14:23:34 +01:00
Родитель 2cf7c4e3d9 591411d871
Коммит abefd9573e
7 изменённых файлов: 99 добавлений и 20 удалений

11
.github/workflows/test.yaml поставляемый
Просмотреть файл

@ -18,6 +18,11 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
architecture: x64
- name: Setup Go
id: setup-go
uses: actions/setup-go@v2
with:
go-version: 1.17.7
- name: include submodule
run: git submodule update --init
# - name: Get pip cache dir
@ -34,11 +39,15 @@ jobs:
# key: ${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-pip-${{ hashFiles('**/setup.py') }}
# restore-keys: |
# ${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-pip-
- name: Install pip-tools
run: pip install pip-tools
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install 'protobuf>=3.10,<3.20'
python -m pip install db-dtypes
python -m pip install -e feast/sdk/python[ci]
cd feast
make install-python-ci-dependencies
cd ..
- name: Run Feast unit tests
run: FEAST_USAGE=False IS_TEST=True python -m pytest -n 8 feast/sdk/python/tests

1
.gitmodules поставляемый
Просмотреть файл

@ -1,3 +1,4 @@
[submodule "feast"]
path = feast
url = https://github.com/feast-dev/feast.git
branch = v0.21-branch

2
feast

@ -1 +1 @@
Subproject commit b1ccf8dd1535f721aee8bea937ee38feff80bec5
Subproject commit bdd47d041c7c8d5f71cb8bcd7fa0c24a45e8eb0f

Просмотреть файл

@ -1,14 +1,16 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from datetime import datetime
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
import pandas
import pyarrow as pa
from tqdm import tqdm
from feast import FeatureService
from feast.entity import Entity
from feast.feature_logging import FeatureServiceLoggingSource
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.offline_stores.offline_utils import get_offline_store_from_config
@ -24,6 +26,8 @@ from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import RepoConfig
from feast.saved_dataset import SavedDataset
from feast.usage import RatioSampler, log_exceptions_and_usage, set_usage_attribute
from feast.utils import make_tzaware
DEFAULT_BATCH_SIZE = 10_000
@ -78,6 +82,7 @@ class AzureProvider(Provider):
if self.online_store:
self.online_store.online_write_batch(config, table, data, progress)
@log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001))
def online_read(
self,
config: RepoConfig,
@ -198,3 +203,49 @@ class AzureProvider(Provider):
end_date=make_tzaware(dataset.max_event_timestamp + timedelta(seconds=1)), # type: ignore
)
def write_feature_service_logs(
self,
feature_service: FeatureService,
logs: Union[pa.Table, str],
config: RepoConfig,
registry: Registry,
):
assert (
feature_service.logging_config is not None
), "Logging should be configured for the feature service before calling this function"
self.offline_store.write_logged_features(
config=config,
data=logs,
source=FeatureServiceLoggingSource(feature_service, config.project),
logging_config=feature_service.logging_config,
registry=registry,
)
def retrieve_feature_service_logs(
self,
feature_service: FeatureService,
start_date: datetime,
end_date: datetime,
config: RepoConfig,
registry: Registry,
) -> RetrievalJob:
assert (
feature_service.logging_config is not None
), "Logging should be configured for the feature service before calling this function"
logging_source = FeatureServiceLoggingSource(feature_service, config.project)
schema = logging_source.get_schema(registry)
logging_config = feature_service.logging_config
ts_column = logging_source.get_log_timestamp_column()
columns = list(set(schema.names) - {ts_column})
return self.offline_store.pull_all_from_table_or_query(
config=config,
data_source=logging_config.destination.to_data_source(),
join_key_columns=[],
feature_name_columns=columns,
timestamp_field=ts_column,
start_date=make_tzaware(start_date),
end_date=make_tzaware(end_date),
)

Просмотреть файл

@ -26,8 +26,10 @@ from sqlalchemy.orm import Session, sessionmaker
from feast import errors
from feast.data_source import DataSource
from .mssqlserver_source import MsSqlServerSource
from feast.feature_view import FeatureView
from feast.infra.offline_stores.file_source import SavedDatasetFileStorage
from feast.infra.offline_stores.offline_store import (
OfflineStore,
RetrievalJob,
@ -135,7 +137,7 @@ class MsSqlServerOfflineStore(OfflineStore):
== "feast_azure_provider.mssqlserver.MsSqlServerOfflineStore"
)
from_expression = data_source.get_table_query_string().replace("`", "")
timestamps = [event_timestamp_column]
field_string = ", ".join(join_key_columns + feature_name_columns + timestamps)
query = f"""
@ -414,7 +416,7 @@ def get_feature_view_query_context(
join_keys = []
entity_selections = []
reverse_field_mapping = {
v: k for k, v in feature_view.input.field_mapping.items()
v: k for k, v in feature_view.source.field_mapping.items()
}
for entity_name in feature_view.entities:
entity = registry.get_entity(entity_name, project)
@ -429,17 +431,17 @@ def get_feature_view_query_context(
else:
ttl_seconds = 0
assert isinstance(feature_view.input, MsSqlServerSource)
assert isinstance(feature_view.source, MsSqlServerSource)
event_timestamp_column = feature_view.input.event_timestamp_column
created_timestamp_column = feature_view.input.created_timestamp_column
event_timestamp_column = feature_view.source.event_timestamp_column
created_timestamp_column = feature_view.source.created_timestamp_column
context = FeatureViewQueryContext(
name=feature_view.name,
ttl=ttl_seconds,
entities=join_keys,
features=features,
table_ref=feature_view.input.table_ref,
table_ref=feature_view.source.table_ref,
event_timestamp_column=reverse_field_mapping.get(
event_timestamp_column, event_timestamp_column
),
@ -447,7 +449,7 @@ def get_feature_view_query_context(
created_timestamp_column, created_timestamp_column
),
# TODO: Make created column optional and not hardcoded
table_subquery=feature_view.input.get_table_query_string().replace("`", ""),
table_subquery=feature_view.source.get_table_query_string().replace("`", ""),
entity_selections=entity_selections,
)
query_context.append(context)

Просмотреть файл

@ -100,6 +100,10 @@ class MsSqlServerSource(DataSource):
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = "",
connection_str: Optional[str] = "",
description: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = None,
name: Optional[str] = None
):
self._mssqlserver_options = MsSqlServerOptions(
connection_str=connection_str, table_ref=table_ref
@ -107,11 +111,15 @@ class MsSqlServerSource(DataSource):
self._connection_str = connection_str
super().__init__(
event_timestamp_column
or self._infer_event_timestamp_column("TIMESTAMP|DATETIME"),
created_timestamp_column,
field_mapping,
date_partition_column,
created_timestamp_column = created_timestamp_column,
field_mapping = field_mapping,
date_partition_column = date_partition_column,
description = description,
tags = tags,
owner = owner,
name = name,
timestamp_field = event_timestamp_column
or self._infer_event_timestamp_column("TIMESTAMP|DATETIME")
)
def __eq__(self, other):
@ -121,12 +129,20 @@ class MsSqlServerSource(DataSource):
)
return (
self.mssqlserver_options.connection_str
self.name == other.name
and self.mssqlserver_options.connection_str
== other.mssqlserver_options.connection_str
and self.event_timestamp_column == other.event_timestamp_column
and self.timestamp_field == other.timestamp_field
and self.created_timestamp_column == other.created_timestamp_column
and self.field_mapping == other.field_mapping
)
def __hash__(self):
return hash((
self.name,
self.mssqlserver_options.connection_str,
self.timestamp_field,
self.created_timestamp_column))
@property
def table_ref(self):
@ -153,7 +169,7 @@ class MsSqlServerSource(DataSource):
field_mapping=dict(data_source.field_mapping),
table_ref=options["table_ref"],
connection_str=options["connection_string"],
event_timestamp_column=data_source.event_timestamp_column,
event_timestamp_column=data_source.timestamp_field,
created_timestamp_column=data_source.created_timestamp_column,
date_partition_column=data_source.date_partition_column,
)
@ -165,7 +181,7 @@ class MsSqlServerSource(DataSource):
custom_options=self.mssqlserver_options.to_proto(),
)
data_source_proto.event_timestamp_column = self.event_timestamp_column
data_source_proto.timestamp_field = self.timestamp_field
data_source_proto.created_timestamp_column = self.created_timestamp_column
data_source_proto.date_partition_column = self.date_partition_column

Просмотреть файл

@ -29,7 +29,7 @@ setup(
python_requires=">=3.7.0",
packages=find_packages(exclude=("tests",)),
install_requires=[
"feast[redis]==0.18.1",
"feast[redis]==0.21.3",
"azure-storage-blob>=0.37.0",
"azure-identity>=1.6.1",
"SQLAlchemy>=1.4.19",