Added QueueBase and SenderBase

This commit is contained in:
Bogdan Berce 2014-12-20 19:41:16 -08:00
Родитель 62157104b4
Коммит 275f2919af
4 изменённых файлов: 187 добавлений и 10 удалений

11
.gitignore поставляемый
Просмотреть файл

@ -1,10 +1,8 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.Python
env/
@ -21,17 +19,14 @@ var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
@ -39,21 +34,17 @@ htmlcov/
.cache
nosetests.xml
coverage.xml
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# User-specific files
*.suo
*.user
*.sln.docstates
.idea/

Просмотреть файл

@ -0,0 +1,74 @@
try:
# Python 2.x
from Queue import Queue
except ImportError:
# Python 3.x
from queue import Queue
class QueueBase(object):
"""The base class for all types of queues for use in conjunction with an implementation of :class:`SenderBase`.
The queue will notify the sender that it needs to pick up items when it reaches :func:`max_queue_length`,
or when the consumer calls :func:`flush`.
"""
def __init__(self, sender):
"""Initializes a new instance of the class.
Args:
sender (:class:`SenderBase`) the sender object that will be used in conjunction with this queue.
"""
self._queue = Queue()
self._max_queue_length = 500
self._sender = sender
if sender:
self._sender.queue = self
@property
def max_queue_length(self):
"""The maximum number of items that will be held by the queue before the queue will call the :func:`flush`
method.
Returns:
int. the maximum queue size. (defaults to: 500)
"""
return self._max_queue_length
@max_queue_length.setter
def max_queue_length(self, value):
"""The maximum number of items that will be held by the queue before the queue will call the :func:`flush`
method.
Args:
value (int): The value for the maximum queue length. The minimum allowed value is 1.
"""
if value < 1:
value = 1
self._max_queue_length = value
@property
def sender(self):
"""The sender that is associated with this queue that this queue will use to send data to the service.
Returns:
:class:`SenderBase`. the sender object.
"""
return self._max_queue_length
def put(self, item):
"""Adds the passed in item object to the queue and calls :func:`flush` if the size of the queue is larger
than :func:`max_queue_length`. This method does nothing if the passed in item is None.
Args:
item (:class:`contracts.Envelope`) item the telemetry envelope object to send to the service.
"""
if not item:
return
self._queue.put(item)
if self._queue.qsize() >= self._max_queue_length:
self.flush()
def flush(self):
"""Flushes the current queue by notifying the {#sender}. This method needs to be overridden by a concrete
implementations of the queue class.
"""
pass

Просмотреть файл

@ -0,0 +1,110 @@
import json
try:
# Python 2.x
import urllib2 as HTTPClient
from urllib2 import HTTPError
except ImportError:
# Python 3.x
import urllib.request as HTTPClient
from urllib.error import HTTPError
class SenderBase(object):
"""The base class for all types of senders for use in conjunction with an implementation of :class:`QueueBase`.
The queue will notify the sender that it needs to pick up items. The concrete sender implementation will
listen to these notifications and will pull items from the queue getting at most :func:`send_buffer_size` items.
It will then call :func:`send` using the list of items pulled from the queue.
"""
def __init__(self, service_endpoint_uri):
"""Initializes a new instance of the class.
Args:
service_endpoint_uri (str) the address of the service to send telemetry data to.
"""
self._service_endpoint_uri = service_endpoint_uri
self._queue = None
self._send_buffer_size = 100
@property
def service_endpoint_uri(self):
"""The HTTP or HTTPS endpoint that this sender will send data to.
Returns:
str. the service endpoint URI.
"""
return self._service_endpoint_uri
@service_endpoint_uri.setter
def service_endpoint_uri(self, value):
"""The service endpoint URI where this sender will send data to.
Args:
value (str): the service endpoint URI.
"""
self._service_endpoint_uri = value
@property
def queue(self):
"""The queue that this sender is draining. While :class:`SenderBase` doesn't implement any means of doing
so, derivations of this class do.
Returns:
:class:`QueueBase`. the queue instance that this sender is draining.
"""
return self._queue
@queue.setter
def queue(self, value):
"""The queue that this sender is draining. While :class:`SenderBase` doesn't implement any means of doing
so, derivations of this class do.
Args:
value (:class:`QueueBase`). the queue instance that this sender is draining.
"""
self._queue = value
@property
def send_buffer_size(self):
"""The buffer size for a single batch of telemetry. This is the maximum number of items in a single service
request that this sender is going to send.
Returns:
int. the maximum number of items in a telemetry batch.
"""
return self._send_buffer_size
@service_endpoint_uri.setter
def send_buffer_size(self, value):
"""The buffer size for a single batch of telemetry. This is the maximum number of items in a single service
request that this sender is going to send.
Args:
value (int): the maximum number of items in a telemetry batch.
"""
self._send_buffer_size = value
def send(self, data_to_send):
""" Immediately sends the data passed in to :func:`service_endpoint_uri`. If the service request fails, the
passed in items are pushed back to the :func:`queue`.
Args:
data_to_send (Array): an array of :class:`contracts.Envelope` objects to send to the service.
"""
request_payload = json.dumps([ a.write() for a in data_to_send ])
request = HTTPClient.Request(self._service_endpoint_uri, bytearray(request_payload, 'utf-8'), { 'Accept': 'application/json', 'Content-Type' : 'application/json; charset=utf-8' })
try:
response = HTTPClient.urlopen(request)
status_code = response.getcode()
if 200 <= status_code < 300:
return
except HTTPError as e:
if e.getcode() == 400:
return
except Exception as e:
pass
# Add our unsent data back on to the queue
for data in data_to_send:
self._queue.put(data)

Просмотреть файл

@ -1,3 +1,5 @@
from .SenderBase import SenderBase
from .QueueBase import QueueBase
from .TelemetryChannel import TelemetryChannel
from .TelemetryContext import TelemetryContext
from . import contracts