This commit is contained in:
annatisch 2018-09-21 14:51:16 -07:00
Родитель b391045585
Коммит da94d065bb
10 изменённых файлов: 40 добавлений и 50 удалений

Просмотреть файл

@ -173,12 +173,11 @@ class AsyncReceiver(Receiver):
timeout, auth_in_progress = await self._handler._auth.handle_token_async()
if timeout:
raise EventHubError("Authorization timeout.")
elif auth_in_progress:
if auth_in_progress:
return False
elif not await self._handler._client_ready_async():
if not await self._handler._client_ready_async():
return False
else:
return True
return True
async def close_async(self, exception=None):
"""
@ -193,7 +192,7 @@ class AsyncReceiver(Receiver):
self.running = False
if self.error:
return
elif isinstance(exception, errors.LinkRedirect):
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
@ -237,21 +236,19 @@ class AsyncReceiver(Receiver):
log.info("AsyncReceiver detached. Attempting reconnect.")
await self.reconnect_async()
return data_batch
else:
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
await self.reconnect_async()
return data_batch
else:
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except Exception as e:
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Receive failed: {}".format(e))

Просмотреть файл

@ -158,12 +158,11 @@ class AsyncSender(Sender):
timeout, auth_in_progress = await self._handler._auth.handle_token_async()
if timeout:
raise EventHubError("Authorization timeout.")
elif auth_in_progress:
if auth_in_progress:
return False
elif not await self._handler._client_ready_async():
if not await self._handler._client_ready_async():
return False
else:
return True
return True
async def close_async(self, exception=None):
"""
@ -178,7 +177,7 @@ class AsyncSender(Sender):
self.running = False
if self.error:
return
elif isinstance(exception, errors.LinkRedirect):
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception

Просмотреть файл

@ -330,8 +330,6 @@ class EventHubClient(object):
output['partition_count'] = eh_info[b'partition_count']
output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']]
return output
except:
raise
finally:
mgmt_client.close()

Просмотреть файл

@ -32,13 +32,13 @@ def _error_handler(error):
"""
if error.condition == b'com.microsoft:server-busy':
return errors.ErrorAction(retry=True, backoff=4)
elif error.condition == b'com.microsoft:timeout':
if error.condition == b'com.microsoft:timeout':
return errors.ErrorAction(retry=True, backoff=2)
elif error.condition == b'com.microsoft:operation-cancelled':
if error.condition == b'com.microsoft:operation-cancelled':
return errors.ErrorAction(retry=True)
elif error.condition == b"com.microsoft:container-close":
if error.condition == b"com.microsoft:container-close":
return errors.ErrorAction(retry=True, backoff=4)
elif error.condition in _NO_RETRY_ERRORS:
if error.condition in _NO_RETRY_ERRORS:
return errors.ErrorAction(retry=False)
return errors.ErrorAction(retry=True)
@ -269,7 +269,7 @@ class Offset(object):
if isinstance(self.value, datetime.datetime):
timestamp = (time.mktime(self.value.timetuple()) * 1000) + (self.value.microsecond/1000)
return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8')
elif isinstance(self.value, int):
if isinstance(self.value, int):
return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8')
return ("amqp.annotation.x-opt-offset {} '{}'".format(operator, self.value)).encode('utf-8')

Просмотреть файл

@ -170,12 +170,11 @@ class Receiver:
timeout, auth_in_progress = self._handler._auth.handle_token()
if timeout:
raise EventHubError("Authorization timeout.")
elif auth_in_progress:
if auth_in_progress:
return False
elif not self._handler._client_ready():
if not self._handler._client_ready():
return False
else:
return True
return True
def close(self, exception=None):
"""
@ -190,7 +189,7 @@ class Receiver:
self.running = False
if self.error:
return
elif isinstance(exception, errors.LinkRedirect):
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
@ -243,18 +242,16 @@ class Receiver:
if shutdown.action.retry and self.auto_reconnect:
self.reconnect()
return data_batch
else:
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
self.reconnect()
return data_batch
else:
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except Exception as e:
error = EventHubError("Receive failed: {}".format(e))
self.close(exception=error)

Просмотреть файл

@ -154,12 +154,11 @@ class Sender:
timeout, auth_in_progress = self._handler._auth.handle_token()
if timeout:
raise EventHubError("Authorization timeout.")
elif auth_in_progress:
if auth_in_progress:
return False
elif not self._handler._client_ready():
if not self._handler._client_ready():
return False
else:
return True
return True
def close(self, exception=None):
"""
@ -174,7 +173,7 @@ class Sender:
self.running = False
if self.error:
return
elif isinstance(exception, errors.LinkRedirect):
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception

Просмотреть файл

@ -36,8 +36,8 @@ class EventHubPartitionPump(PartitionPump):
_opened_ok = True
except Exception as err: # pylint: disable=broad-except
_logger.warning(
"%r,%r PartitionPumpWarning: Failure creating client or receiver, " +
"retrying: %r", self.host.guid, self.partition_context.partition_id, err)
"%r,%r PartitionPumpWarning: Failure creating client or receiver, retrying: %r",
self.host.guid, self.partition_context.partition_id, err)
last_exception = err
_retry_count += 1

Просмотреть файл

@ -121,7 +121,7 @@ class PartitionContext:
self.lease.offset = checkpoint.offset
self.lease.sequence_number = checkpoint.sequence_number
else:
_logger.error(
_logger.error( # pylint: disable=logging-not-lazy
"Ignoring out of date checkpoint with offset %r/sequence number %r because " +
"current persisted checkpoint has higher offset %r/sequence number %r",
checkpoint.offset,

Просмотреть файл

@ -143,7 +143,7 @@ class PartitionPump():
# CloseAsync are protected by synchronizing too.
try:
last = events[-1]
if last != None:
if last is not None:
self.partition_context.set_offset_and_sequence_number(last)
await self.processor.process_events_async(self.partition_context, events)
except Exception as err: # pylint: disable=broad-except

Просмотреть файл

@ -6,7 +6,7 @@ reports=no
# For all codes, run 'pylint --list-msgs' or go to 'https://pylint.readthedocs.io/en/latest/reference_guide/features.html'
# locally-disabled: Warning locally suppressed using disable-msg
# cyclic-import: because of https://github.com/PyCQA/pylint/issues/850
disable=raising-bad-type,missing-docstring,locally-disabled,fixme,cyclic-import,too-many-arguments,invalid-name,duplicate-code,logging-format-interpolation,too-many-instance-attributes,too-few-public-methods
disable=useless-object-inheritance,raising-bad-type,missing-docstring,locally-disabled,fixme,cyclic-import,too-many-arguments,invalid-name,duplicate-code,logging-format-interpolation,too-many-instance-attributes,too-few-public-methods
[FORMAT]
max-line-length=120