add debug logs for firing, emitting, execution (#256)

* add debug logs for firing, emitting, execution

* formatting
This commit is contained in:
Chris Trevino 2024-07-01 12:19:04 -07:00 коммит произвёл GitHub
Родитель 1668d4cc4f
Коммит c8e1eb6e1b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
4 изменённых файлов: 21 добавлений и 6 удалений

Просмотреть файл

@ -1,6 +1,6 @@
[tool.poetry]
name = "reactivedataflow"
version = "0.1.6"
version = "0.1.7"
description = "Reactive Dataflow Graphs"
license = "MIT"
authors = ["Chris Trevino <chtrevin@microsoft.com>"]

Просмотреть файл

@ -1,6 +1,7 @@
# Copyright (c) 2024 Microsoft Corporation.
"""reactivedataflow Emit Conditions Decorator."""
import logging
from collections.abc import Callable
from reactivedataflow.nodes import (
@ -10,6 +11,8 @@ from reactivedataflow.nodes import (
VerbOutput,
)
_log = logging.getLogger(__name__)
def emit_conditions(
*conditions: EmitCondition,
@ -23,8 +26,10 @@ def emit_conditions(
condition(inputs, result) for condition in conditions
)
if not are_conditions_met:
_log.debug("Emit conditions not met for %s", fn.__qualname__)
return VerbOutput(no_output=True)
_log.debug("Emit conditions met for %s", fn.__qualname__)
return result
wrapped_fn.__qualname__ = f"{fn.__qualname__}_wrapemitcond"

Просмотреть файл

@ -1,10 +1,13 @@
# Copyright (c) 2024 Microsoft Corporation.
"""reactivedataflow Firing Conditions Decorator."""
import logging
from collections.abc import Callable
from reactivedataflow.nodes import FireCondition, VerbFunction, VerbInput, VerbOutput
_log = logging.getLogger(__name__)
def fire_conditions(
*conditions: FireCondition,
@ -15,7 +18,10 @@ def fire_conditions(
async def wrapped_fn(inputs: VerbInput) -> VerbOutput:
are_conditions_met = all(condition(inputs) for condition in conditions)
if not are_conditions_met:
_log.debug("Firing conditions not met for %s", fn.__qualname__)
return VerbOutput(no_output=True)
_log.debug("Firing conditions met for %s", fn.__qualname__)
return await fn(inputs)
wrapped_fn.__qualname__ = f"{fn.__qualname__}_wrapfirecond"

Просмотреть файл

@ -2,6 +2,7 @@
"""The reactivedataflow ExecutionNode class."""
import asyncio
import logging
from typing import Any
import reactivex as rx
@ -12,6 +13,8 @@ from .io import VerbInput
from .node import Node
from .types import VerbFunction
_log = logging.getLogger(__name__)
class ExecutionNode(Node):
"""The ExecutionNode class for dynamic processing graphs."""
@ -57,7 +60,7 @@ class ExecutionNode(Node):
self._outputs = {}
self._tasks = []
# fire a recompute
self._schedule_recompute()
self._schedule_recompute("init")
def _output(self, name: str) -> rx.subject.BehaviorSubject:
"""Get the subject of a given output."""
@ -79,7 +82,7 @@ class ExecutionNode(Node):
def config(self, value: dict[str, Any]) -> None:
"""Set the configuration of the node."""
self._config = value
self._schedule_recompute()
self._schedule_recompute("config")
def output(self, name: str = default_output) -> rx.Observable[Any]:
"""Get the observable of a given output."""
@ -130,11 +133,11 @@ class ExecutionNode(Node):
def on_named_value(value: Any, name: str) -> None:
self._named_input_values[name] = value
self._schedule_recompute()
self._schedule_recompute("named_value")
def on_array_value(value: Any, i: int) -> None:
self._array_input_values[i] = value
self._schedule_recompute()
self._schedule_recompute("array_value")
# Detach from inputs
self.detach()
@ -152,7 +155,8 @@ class ExecutionNode(Node):
sub = source.subscribe(lambda v, i=i: on_array_value(v, i))
self._subscriptions.append(sub)
def _schedule_recompute(self) -> None:
def _schedule_recompute(self, cause: str | None) -> None:
_log.debug(f"recompute scheduled for {self._id} due to {cause or 'unknown'}")
task = asyncio.create_task(self._recompute())
task.add_done_callback(lambda _: self._tasks.remove(task))
self._tasks.append(task)