Adding Online evaluation dependencies in eval environment (#3492)

* Adding Online evaluation dependencies in model_eval environment

* pinning library versions

* changed python dependencies to latest-pypi-version

* Removing managed identity

* Change evaluators format

* Removing azureml-core and azureml-automl-core dependencies

* Adding latest image tag

* Changing contracts to use DataMapping and InitParams

* Adding logging level

* Pinning azure.ai.evaluation as done previously by remote eval

* Pinning azure.ai.evaluation as done previously by remote eval

* Adding copyright headers

* Adding codehealth fix

* Adding codehealth fix

* Adding codehealth fix

* Adding codehealth fix

---------

Co-authored-by: apeddauppari <apeddauppari@microsoft.com>
ghyadav committed 2024-10-23 23:28:21 +05:30 (committed by GitHub)
Parent: 2dbc9107fc
Commit: e3559856f5
No known key found for this signature
GPG key ID: B5690EEEBB952194
8 changed files: 575 additions and 2 deletions

View file

@@ -5,4 +5,5 @@ RUN pip install -r /app/requirements.txt
# Copy your Python file into the image
COPY evaluate_on_data.py /app/evaluate_on_data.py
COPY save_evaluation.py /app/save_evaluation.py
ADD online_eval /app/online_eval

View file

@@ -0,0 +1,159 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Main script for the evaluate context."""
import argparse
import importlib.util
import json
import logging
import os
import sys
from collections import defaultdict

import pandas as pd
from azure.ai.evaluation import evaluate
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
from promptflow.client import load_flow

from utils import get_mlclient, extract_model_info

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


def get_args():
    """Get arguments from the command line."""
    parser = argparse.ArgumentParser()
    # Inputs
    parser.add_argument("--preprocessed_data", type=str, dest="preprocessed_data",
                        default="./preprocessed_data_output.jsonl")
    parser.add_argument("--evaluated_data", type=str, dest="evaluated_data", default="./evaluated_data_output.jsonl")
    parser.add_argument("--evaluators", type=str, dest="evaluators")
    parser.add_argument("--sampling_rate", type=str, dest="sampling_rate", default="1")
    args, _ = parser.parse_known_args()
    return vars(args)


def load_evaluator(evaluator):
    """Load the evaluator from the given path."""
    logger.info(f"Loading evaluator {evaluator}")
    loaded_evaluator = load_flow(evaluator)
    logger.info(loaded_evaluator)
    logger.info(
        f"Loading module {os.getcwd()} {loaded_evaluator.entry.split(':')[0]} from {loaded_evaluator.path.parent.name}"
    )
    module_path = os.path.join(
        loaded_evaluator.path.parent, loaded_evaluator.entry.split(":")[0] + ".py"
    )
    module_name = loaded_evaluator.entry.split(":")[0]
    logger.info(f"Loading module {module_name} from {module_path}")
    spec = importlib.util.spec_from_file_location(module_name, module_path)
    mod = importlib.util.module_from_spec(spec)
    logger.info(f"Loaded module {mod}")
    sys.modules[module_name] = mod
    spec.loader.exec_module(mod)
    eval_class = getattr(mod, loaded_evaluator.entry.split(":")[1])
    return eval_class


def update_value_in_dict(d, key_substring, new_func):
    """Recursively search for a value containing 'key_substring' and apply 'new_func' to modify it."""
    for key, value in d.items():
        if isinstance(value, dict):
            update_value_in_dict(value, key_substring, new_func)
        elif isinstance(value, str) and key_substring in value:
            d[key] = new_func(value)
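
# Example (assumed input shape, not from this PR): calling
# update_value_in_dict({"api_key": "AZURE_OPENAI_API_KEY"}, "AZURE_OPENAI_API_KEY",
#                      lambda x: os.environ[x.upper()])
# replaces the placeholder string with the value of the AZURE_OPENAI_API_KEY
# environment variable, in place. This mirrors the call in load_evaluators below.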


def find_file_and_get_parent_dir(root_dir, file_name="flow.flex.yaml"):
    """Find the flex flow or any given file in a directory and return the parent directory."""
    for dirpath, _, filenames in os.walk(root_dir):
        if file_name in filenames:
            logger.info(f"Found {file_name} in {dirpath}")
            return os.path.abspath(dirpath)
    logger.warning(f"{file_name} not found under {root_dir}")
    return None


# Todo: We should not load evaluators every time the component runs
def download_evaluators_and_update_local_path(evaluators):
    """Download each evaluator's model asset and record its local path."""
    for evaluator_name, evaluator in evaluators.items():
        try:
            root_dir = evaluator["Id"]
            download_path = f"./{evaluator_name}"
            if root_dir.startswith("azureml://"):
                model_info = extract_model_info(root_dir)
                if model_info is None:
                    logger.error(f"Invalid model asset id: {root_dir}")
                    continue
                if model_info['type'] == "workspace_registered":
                    mlclient = get_mlclient()
                elif model_info['type'] == "registry_registered":
                    mlclient = get_mlclient(registry_name=model_info['registry'])
                mlclient.models.download(name=model_info["model_name"], version=model_info["version"],
                                         download_path=download_path)
                evaluators[evaluator_name]["local_path"] = find_file_and_get_parent_dir(download_path)
            else:
                raise ValueError(f"Invalid model asset id: {root_dir}")
        except Exception as e:
            logger.error(f"Error downloading evaluator {evaluator['Id']}: {e}")
    return evaluators


def load_evaluators(input_evaluators):
    """Initialize the evaluators using correct parameters and credentials for rai evaluators."""
    loaded_evaluators, loaded_evaluator_configs = {}, {}
    for evaluator_name, evaluator in input_evaluators.items():
        init_params = evaluator.get("InitParams", {})
        update_value_in_dict(init_params, "AZURE_OPENAI_API_KEY", lambda x: os.environ[x.upper()])
        flow = load_evaluator(evaluator["local_path"])
        if any(rai_eval in evaluator["Id"] for rai_eval in rai_evaluators):
            init_params["credential"] = AzureMLOnBehalfOfCredential()
        loaded_evaluators[evaluator_name] = flow(**init_params)
        loaded_evaluator_configs[evaluator_name] = {"column_mapping": evaluator.get("DataMapping", {})}
    return loaded_evaluators, loaded_evaluator_configs


def run_evaluation(command_line_args, evaluators, evaluator_configs):
    """Run the evaluation."""
    # Todo: can we get only results back instead of the whole response?
    results = evaluate(data=command_line_args["preprocessed_data"], evaluators=evaluators,
                       evaluator_config=evaluator_configs)
    logger.info("Evaluation Completed")
    logger.info(f"Results: {results}")
    final_results = defaultdict(list)
    for result in results["rows"]:
        for evaluator_name in evaluators:
            result_key = f"outputs.{evaluator_name}"
            filtered_result = {k: v for k, v in result.items() if k.startswith(result_key)}
            if len(filtered_result) == 1:
                final_results[evaluator_name].append(filtered_result[list(filtered_result.keys())[0]])
            else:
                logger.info(f"Found multiple results for {evaluator_name}. Adding as json string.")
                final_results[evaluator_name].append(json.dumps(filtered_result))
    final_results = pd.DataFrame(final_results)
    logger.info(final_results)
    final_results.to_json(command_line_args["evaluated_data"], orient="records", lines=True)


rai_evaluators = [
    "HateUnfairnessEvaluator",
    "Sexual-Content-Evaluator",
    "Hate-and-Unfairness-Evaluator",
    "Violent-Content-Evaluator",
    "Self-Harm-Related-Content-Evaluator",
]


def run(args):
    """Entry point of the evaluate script."""
    evaluators = json.loads(args["evaluators"])
    evaluators = download_evaluators_and_update_local_path(evaluators)
    evaluators, evaluator_configs = load_evaluators(evaluators)
    run_evaluation(args, evaluators, evaluator_configs)


if __name__ == "__main__":
    args = get_args()
    run(args)
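
For reference, a hypothetical --evaluators payload matching the contract this script reads (per-evaluator keys Id, InitParams, and DataMapping, as accessed above; the asset id, init params, and mappings are illustrative, not taken from this PR):

{
    "coherence": {
        "Id": "azureml://registries/azureml/models/Coherence-Evaluator/versions/1",
        "InitParams": {"model_config": {"api_key": "AZURE_OPENAI_API_KEY"}},
        "DataMapping": {"query": "${data.query}", "response": "${data.response}"}
    }
}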

View file

@@ -0,0 +1,42 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Main script for the online evaluation context."""
import argparse

import evaluate
import postprocess
import preprocess


def get_args():
    """Get arguments from the command line."""
    parser = argparse.ArgumentParser()
    # Inputs
    parser.add_argument("--connection_string", type=str, dest="connection_string")
    parser.add_argument("--resource_id", type=str, dest="resource_id")
    parser.add_argument("--query", type=str, dest="query")
    parser.add_argument("--sampling_rate", type=str, dest="sampling_rate", default="1")
    parser.add_argument("--preprocessor_connection_type", type=str, dest="preprocessor_connection_type",
                        default="user-identity")
    parser.add_argument("--cron_expression", type=str, dest="cron_expression", default="0 0 * * *")
    parser.add_argument("--preprocessed_data", type=str, dest="preprocessed_data",
                        default="./preprocessed_data_output.jsonl")
    parser.add_argument("--evaluated_data", type=str, dest="evaluated_data", default="./evaluated_data_output.jsonl")
    parser.add_argument("--evaluators", type=str, dest="evaluators")
    args, _ = parser.parse_known_args()
    return vars(args)


def run():
    """Entry point for the script: preprocess, evaluate, then postprocess."""
    args = get_args()
    preprocess.run(args)
    evaluate.run(args)
    postprocess.run(args)


if __name__ == "__main__":
    run()
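
A hypothetical invocation of this entry script (the file name and all argument values are illustrative placeholders, not from this PR; the online_eval directory comes from the Dockerfile's ADD instruction):

python /app/online_eval/main.py \
    --resource_id "/subscriptions/<sub>/resourceGroups/<rg>/providers/microsoft.insights/components/<app-insights>" \
    --query "traces | take 100" \
    --evaluators '{"coherence": {"Id": "azureml://...", "InitParams": {}, "DataMapping": {}}}' \
    --connection_string "<application-insights-connection-string>" \
    --cron_expression "0 0 * * *" \
    --preprocessor_connection_type "user-identity"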

View file

@@ -0,0 +1,113 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Postprocess script for the online evaluation context."""
import json
import logging
from argparse import ArgumentParser
from time import time_ns

import pandas as pd
from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter
from opentelemetry import _logs
from opentelemetry.sdk._logs import LoggerProvider, LogRecord
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter
from opentelemetry.trace.span import TraceFlags

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


def get_args():
    """Get arguments from the command line."""
    parser = ArgumentParser()
    # Inputs
    parser.add_argument("--preprocessed_data", type=str, dest="preprocessed_data",
                        default="./preprocessed_data_output.jsonl")
    parser.add_argument("--evaluated_data", type=str, dest="evaluated_data", default="./evaluated_data_output.jsonl")
    parser.add_argument("--connection_string", type=str, dest="connection_string", default=None)
    parser.add_argument("--sampling_rate", type=str, dest="sampling_rate", default="1")
    args, _ = parser.parse_known_args()
    return vars(args)


def configure_logging(args) -> LoggerProvider:
    """Configure logging."""
    logger.info("Configuring logging")
    provider = LoggerProvider()
    _logs.set_logger_provider(provider)
    provider.add_log_record_processor(BatchLogRecordProcessor(ConsoleLogExporter()))
    args["connection_string"] = None if args["connection_string"] == "" else args["connection_string"]
    provider.add_log_record_processor(
        BatchLogRecordProcessor(AzureMonitorLogExporter(connection_string=args["connection_string"])))
    logger.info("Logging configured")
    return provider


def log_evaluation_event_single(trace_id, span_id, trace_flags, response_id, evaluation):
    """Log one evaluation event per evaluator result."""
    for name, value in evaluation.items():
        attributes = {"event.name": f"gen_ai.evaluation.{name}", "gen_ai.evaluation.score": json.dumps(value),
                      "gen_ai.response_id": response_id}
        body = f"gen_ai.evaluation for response_id: {response_id}"
        event = LogRecord(
            timestamp=time_ns(),
            observed_timestamp=time_ns(),
            trace_id=trace_id,
            span_id=span_id,
            trace_flags=trace_flags,
            severity_text=None,
            severity_number=_logs.SeverityNumber.UNSPECIFIED,
            body=body,
            attributes=attributes
        )
        _logs.get_logger(__name__).emit(event)


def log_evaluation_event(row) -> None:
    """Log evaluation events for one row of combined data."""
    if "trace_id" not in row or "span_id" not in row or "evaluation" not in row:
        logger.warning("Missing required fields in the row: trace_id, span_id, evaluation")
        return
    trace_id = int(row.get("trace_id", "0"), 16)
    span_id = int(row.get("span_id", "0"), 16)
    trace_flags = TraceFlags(TraceFlags.SAMPLED)
    response_id = row.get("gen_ai_response_id", "")
    evaluation_results = row.get("evaluation", {})
    if isinstance(evaluation_results, dict):
        evaluation_results = [evaluation_results]
    for evaluation in evaluation_results:
        log_evaluation_event_single(trace_id, span_id, trace_flags, response_id, evaluation)


def get_combined_data(preprocessed_data, evaluated_data):
    """Combine preprocessed and evaluated data."""
    logger.info("Combining preprocessed and evaluated data.")
    preprocessed_df = pd.read_json(preprocessed_data, lines=True)
    evaluation_data = []
    with open(evaluated_data, 'r') as file:
        for line in file:
            evaluation_data.append(json.loads(line))
    preprocessed_df["evaluation"] = evaluation_data
    return preprocessed_df


def run(args):
    """Entry point of the postprocess script."""
    # Avoid logging the connection string itself; it is a credential.
    logger.info(f"Sampling Rate: {args['sampling_rate']}")
    provider = configure_logging(args)
    data = get_combined_data(args["preprocessed_data"], args["evaluated_data"])
    for _, row in data.iterrows():
        log_evaluation_event(row)
    provider.force_flush()


if __name__ == "__main__":
    args = get_args()
    run(args)

View file

@@ -0,0 +1,101 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Preprocess script for the online evaluation context."""
import logging
from argparse import ArgumentParser
from datetime import datetime

import pandas as pd
from azure.monitor.query import LogsQueryStatus
from croniter import croniter

from utils import get_app_insights_client

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


def get_args():
    """Get arguments from the command line."""
    parser = ArgumentParser()
    # Inputs
    parser.add_argument("--resource_id", type=str, dest="resource_id")
    parser.add_argument("--query", type=str, dest="query")
    parser.add_argument("--sampling_rate", type=str, dest="sampling_rate", default="1")
    parser.add_argument("--preprocessor_connection_type", type=str, dest="preprocessor_connection_type",
                        default="user-identity")
    parser.add_argument("--cron_expression", type=str, dest="cron_expression", default="0 0 * * *")
    parser.add_argument("--preprocessed_data", type=str, dest="preprocessed_data",
                        default="./preprocessed_data_output.jsonl")
    args, _ = parser.parse_known_args()
    return vars(args)


def calculate_time_window(cron_expression: str):
    """Calculate the time window for a job instance based on the system's current time and its cron schedule."""
    logger.info(f"Calculating time window for cron expression: {cron_expression}")
    # Truncate to whole seconds so the window bounds are stable.
    current_time = datetime.now().replace(microsecond=0)
    # Initialize croniter based on the cron expression and current time
    cron = croniter(cron_expression, current_time)
    # The first get_prev() returns the trigger that started this run; the second
    # returns the trigger before it, so the window spans one full schedule interval.
    _ = cron.get_prev(datetime)
    return cron.get_prev(datetime), current_time
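
# Example (assumed schedule, not from this PR): with cron_expression "0 0 * * *"
# and a run starting at 2024-10-23 00:00:05, the first get_prev() consumes the
# trigger that started this run (2024-10-23 00:00), and the returned window is
# (2024-10-22 00:00:00, 2024-10-23 00:00:05).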


def get_logs(client, resource_id: str, query: str, start_time: datetime, end_time: datetime):
    """Get logs from the resource."""
    try:
        logger.info(f"Querying resource: {resource_id}")
        response = client.query_resource(resource_id, query, timespan=(start_time, end_time))
        if response.status == LogsQueryStatus.SUCCESS:
            data = response.tables
        else:
            # LogsQueryPartialResult
            error = response.partial_error
            data = response.partial_data
            logger.warning(error)
        if len(data) != 1:
            raise Exception(f"Unable to parse query results. Unexpected number of tables: {len(data)}.")
        table = data[0]
        df = pd.DataFrame(data=table.rows, columns=table.columns)
        return df
    except Exception as e:
        logger.error(f"Failed to query logs: {e}")
        raise


def save_output(result, args):
    """Save output."""
    try:
        logger.info("Saving output.")
        # Todo: One conversation will be split across multiple rows. How to combine them?
        result.to_json(args["preprocessed_data"], orient="records", lines=True)
    except Exception as e:
        logger.error("Unable to save output.")
        raise e


def run(args):
    """Entry point of the preprocess script."""
    logger.info(
        f"Connection type: {args['preprocessor_connection_type']}, Resource ID: {args['resource_id']}, Cron "
        f"Expression: {args['cron_expression']}, Sampling Rate: {args['sampling_rate']}")
    client = get_app_insights_client(use_managed_identity=args["preprocessor_connection_type"] == "managed-identity")
    start_time, end_time = calculate_time_window(args["cron_expression"])
    logger.info(f"Start Time: {start_time}, End Time: {end_time}")
    result = get_logs(client, args["resource_id"], args["query"], start_time, end_time)
    save_output(result, args)


if __name__ == "__main__":
    args = get_args()
    run(args)

View file

@@ -0,0 +1,31 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Utility functions for the online evaluation context."""
import importlib.util
import os
import sys

from promptflow.client import load_flow


def load_evaluator(evaluator):
    """Load the evaluator from the given path."""
    print(f"Loading evaluator {evaluator}")
    loaded_evaluator = load_flow(evaluator)
    print(loaded_evaluator)
    print(
        f"Loading module {os.getcwd()} {loaded_evaluator.entry.split(':')[0]} from {loaded_evaluator.path.parent.name}"
    )
    module_path = os.path.join(
        loaded_evaluator.path.parent, loaded_evaluator.entry.split(":")[0] + ".py"
    )
    module_name = loaded_evaluator.entry.split(":")[0]
    print(f"Loading module {module_name} from {module_path}")
    spec = importlib.util.spec_from_file_location(module_name, module_path)
    mod = importlib.util.module_from_spec(spec)
    print(f"Loaded module {mod}")
    sys.modules[module_name] = mod
    spec.loader.exec_module(mod)
    eval_class = getattr(mod, loaded_evaluator.entry.split(":")[1])
    return eval_class

View file

@@ -0,0 +1,118 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Utility functions for the online evaluation context."""
import logging
import os
import re

from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
from azure.identity import ManagedIdentityCredential
from azure.monitor.query import LogsQueryClient

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


def get_managed_identity_credentials():
    """Get the managed identity credentials."""
    client_id = os.environ.get('DEFAULT_IDENTITY_CLIENT_ID', None)
    credential = ManagedIdentityCredential(client_id=client_id)
    logger.info("ManagedIdentityCredential successfully loaded.")
    return credential


def get_user_identity_credentials():
    """Get the user identity or default credentials."""
    logger.info("Trying to load AzureMLOnBehalfOfCredential")
    credential = AzureMLOnBehalfOfCredential()
    logger.info("AzureMLOnBehalfOfCredential successfully loaded.")
    return credential


def get_credentials(use_managed_identity=True):
    """Get the credentials."""
    try:
        if use_managed_identity:
            logger.info("Initializing managed identity")
            credential = get_managed_identity_credentials()
        else:
            credential = get_user_identity_credentials()
        logger.info("Trying to fetch token for credentials")
    except Exception as e:
        logger.error("Error while loading credentials")
        raise e
    return credential


def get_app_insights_client(use_managed_identity):
    """Get the AppInsights client."""
    try:
        credential = get_credentials(use_managed_identity=use_managed_identity)
        logs_query_client = LogsQueryClient(credential)
    except Exception as e:
        safe_message = (
            "Not able to initialize AppInsights client. Please verify that the correct credentials have been provided")
        raise Exception(safe_message) from e
    return logs_query_client


def extract_model_info(model_asset_id):
    """Extract model details from asset id."""
    # Define regular expressions for extracting information
    workspace_pattern = re.compile(
        r"azureml://locations/(?P<location>\w+)/workspaces/(?P<workspace>[\w-]+)/models/(?P<model_name>[\w-]+)/"
        r"versions/(?P<version>\d+)")
    registry_pattern = re.compile(
        r"azureml://registries/(?P<registry>[\w-]+)/models/(?P<model_name>[\w-]+)/versions/(?P<version>\d+)")
    # Try to match the input model asset ID with the patterns
    workspace_match = workspace_pattern.match(model_asset_id)
    registry_match = registry_pattern.match(model_asset_id)
    if workspace_match:
        # Extract information for workspace registered model
        info = workspace_match.groupdict()
        info['type'] = 'workspace_registered'
    elif registry_match:
        # Extract information for registry registered model
        info = registry_match.groupdict()
        info['type'] = 'registry_registered'
    else:
        # If neither pattern matches, return None
        return None
    return info
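
# Example (hypothetical asset id, not from this PR):
# extract_model_info("azureml://registries/my-registry/models/my-evaluator/versions/1")
# returns {'registry': 'my-registry', 'model_name': 'my-evaluator',
#          'version': '1', 'type': 'registry_registered'}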


def get_mlclient(
    workspace_name: str = None, resource_group_name: str = None, subscription_id: str = None,
    registry_name: str = None
):
    """Return ML Client.

    :param workspace_name: Workspace name
    :type workspace_name: str
    :param resource_group_name: resource group
    :type resource_group_name: str
    :param subscription_id: subscription ID
    :type subscription_id: str
    :param registry_name: registry name
    :type registry_name: str
    :return: MLClient object for workspace or registry
    :rtype: MLClient
    """
    credential = get_credentials(use_managed_identity=True)
    if registry_name is None:
        logger.info(f"Creating MLClient with sub: {subscription_id}, rg: {resource_group_name}, ws: {workspace_name}")
        return MLClient(
            credential=credential,
            subscription_id=subscription_id,
            resource_group_name=resource_group_name,
            workspace_name=workspace_name,
        )
    logger.info(f"Creating MLClient with registry name {registry_name}")
    return MLClient(credential=credential, registry_name=registry_name)

View file

@@ -1,3 +1,11 @@
azure-ai-evaluation~=1.0.0b4
openai=={{latest-pypi-version}}
azureml-mlflow=={{latest-pypi-version}}
azure-ai-ml=={{latest-pypi-version}}
opentelemetry-api=={{latest-pypi-version}}
opentelemetry-sdk=={{latest-pypi-version}}
azure-monitor-query=={{latest-pypi-version}}
croniter=={{latest-pypi-version}}
azure-monitor-opentelemetry=={{latest-pypi-version}}
promptflow-azure=={{latest-pypi-version}}
azure-identity=={{latest-pypi-version}}