Add type annotations for redis provider (#9815)

This commit is contained in:
Alexander Sutcliffe 2020-07-14 15:52:38 +02:00 коммит произвёл GitHub
Родитель 0eb5020fda
Коммит 0a2acf0b65
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 16 добавлений и 10 удалений

Просмотреть файл

@ -33,7 +33,7 @@ class RedisHook(BaseHook):
Also you can set ssl parameters as:
``{"ssl": true, "ssl_cert_reqs": "require", "ssl_cert_file": "/path/to/cert.pem", etc}``.
"""
def __init__(self, redis_conn_id='redis_default'):
def __init__(self, redis_conn_id: str = 'redis_default') -> None:
"""
Prepares hook to connect to a Redis database.

Просмотреть файл

@ -16,6 +16,8 @@
# specific language governing permissions and limitations
# under the License.
from typing import Dict
from airflow.models import BaseOperator
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.utils.decorators import apply_defaults
@ -38,17 +40,17 @@ class RedisPublishOperator(BaseOperator):
@apply_defaults
def __init__(
self,
channel,
message,
redis_conn_id='redis_default',
*args, **kwargs):
channel: str,
message: str,
redis_conn_id: str = 'redis_default',
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.redis_conn_id = redis_conn_id
self.channel = channel
self.message = message
def execute(self, context):
def execute(self, context: Dict) -> None:
"""
Publish the message to Redis channel

Просмотреть файл

@ -15,6 +15,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Dict
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
@ -28,11 +30,11 @@ class RedisKeySensor(BaseSensorOperator):
ui_color = '#f0eee4'
@apply_defaults
def __init__(self, key, redis_conn_id, *args, **kwargs):
def __init__(self, key: str, redis_conn_id: str, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.redis_conn_id = redis_conn_id
self.key = key
def poke(self, context):
def poke(self, context: Dict) -> bool:
self.log.info('Sensor checks for existence of key: %s', self.key)
return RedisHook(self.redis_conn_id).get_conn().exists(self.key)

Просмотреть файл

@ -16,6 +16,8 @@
# specific language governing permissions and limitations
# under the License.
from typing import Dict, List, Union
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
@ -34,14 +36,14 @@ class RedisPubSubSensor(BaseSensorOperator):
ui_color = '#f0eee4'
@apply_defaults
def __init__(self, channels, redis_conn_id, *args, **kwargs):
def __init__(self, channels: Union[List[str], str], redis_conn_id: str, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.channels = channels
self.redis_conn_id = redis_conn_id
self.pubsub = RedisHook(redis_conn_id=self.redis_conn_id).get_conn().pubsub()
self.pubsub.subscribe(self.channels)
def poke(self, context):
def poke(self, context: Dict) -> bool:
"""
Check for message on subscribed channels and write to xcom the message with key ``message``