diff --git a/.gitignore b/.gitignore index 235337f..cec5637 100644 --- a/.gitignore +++ b/.gitignore @@ -2,59 +2,6 @@ __pycache__/ *.py[cod] -# C extensions -*.so - -# Distribution / packaging -.Python -env/ -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -*.egg-info/ -.installed.cfg -*.egg - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*,cover - -# Translations -*.mo -*.pot - -# Django stuff: -*.log - -# Sphinx documentation -docs/_build/ - -# PyBuilder -target/ /.idea /MoDataSubmission.iml /pyLibrary/.svn diff --git a/pyLibrary/env/README.md b/pyLibrary/env/README.md new file mode 100644 index 0000000..07a02eb --- /dev/null +++ b/pyLibrary/env/README.md @@ -0,0 +1,64 @@ + +Environment +=========== + +This directory is for connecting to other systems. Generally, these +classes are facades that assume content is UTF-8 encoded JSON. + + +files +----- + +The `File` class makes the default assumption all files have cr-delimited +unicode content that is UTF-8 encoded. This is great for json files. +It also provides better OO over some common file manipulations. + + +emailer +------- + +A simple emailer, the primary purpose is to accept a [Dict](../dot/README.md) +of settings. + + +pulse +----- + +For connecting clients to [Mozilla's Pulse](https://pulse.mozilla.org/). + + +elasticsearch +------------- + +This module handles the lifecycle of an Elasticsearch index in the context of +ETL. You only need this module if you are creating and retiring indexes. You +do not need this module for simply searching; for that I suggest using the +rest API directly. + +###Settings### + +Both ```Cluster``` and ```Index``` objects accept the same settings dict, +selecting only the properties it requires. + + { + "host" : "http://192.168.0.98", + "port" : 9200, + "index" : "b2g_tests", + "type" : "test_result", + "debug" : true, + "limit_replicas" : true, + "schema_file" : "./resources/schema/test_schema.json" + }, + + + + + + + +Cluster +------- + + +Index +----- diff --git a/pyLibrary/env/__init__.py b/pyLibrary/env/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyLibrary/env/big_data.py b/pyLibrary/env/big_data.py new file mode 100644 index 0000000..4db1fa6 --- /dev/null +++ b/pyLibrary/env/big_data.py @@ -0,0 +1,311 @@ +# encoding: utf-8 +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. 
+# +# Author: Kyle Lahnakoski (kyle@lahnakoski.com) +# +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import + +import gzip +from io import BytesIO +from tempfile import TemporaryFile +import zipfile +import zlib + +from pyLibrary.debugs.logs import Log +from pyLibrary.maths import Math + +# LIBRARY TO DEAL WITH BIG DATA ARRAYS AS ITERATORS OVER (IR)REGULAR SIZED +# BLOCKS, OR AS ITERATORS OVER LINES + + +MIN_READ_SIZE = 8 * 1024 +MAX_STRING_SIZE = 1 * 1024 * 1024 + + +class FileString(object): + """ + ACTS LIKE A STRING, BUT IS A FILE + """ + + def __init__(self, file): + self.file = file + + def decode(self, encoding): + if encoding != "utf8": + Log.error("can not handle {{encoding}}", encoding= encoding) + self.encoding = encoding + return self + + def split(self, sep): + if sep != "\n": + Log.error("Can only split by lines") + self.file.seek(0) + return LazyLines(self.file) + + def __len__(self): + temp = self.file.tell() + self.file.seek(0, 2) + file_length = self.file.tell() + self.file.seek(temp) + return file_length + + def __getslice__(self, i, j): + self.file.seek(i) + output = self.file.read(j - i).decode(self.encoding) + return output + + def __add__(self, other): + self.file.seek(0, 2) + self.file.write(other) + + def __radd__(self, other): + new_file = TemporaryFile() + new_file.write(other) + self.file.seek(0) + for l in self.file: + new_file.write(l) + new_file.seek(0) + return FileString(new_file) + + def __getattr__(self, attr): + return getattr(self.file, attr) + + def __del__(self): + self.file, temp = None, self.file + if temp: + temp.close() + + def __iter__(self): + self.file.seek(0) + return self.file + + +def safe_size(source): + """ + READ THE source UP TO SOME LIMIT, THEN COPY TO A FILE IF TOO BIG + RETURN A str() OR A FileString() + """ + + if source is None: + return None + + total_bytes = 0 + bytes = [] + b = source.read(MIN_READ_SIZE) + while b: + total_bytes += len(b) + bytes.append(b) + if total_bytes > MAX_STRING_SIZE: + try: + data = FileString(TemporaryFile()) + for bb in bytes: + data.write(bb) + del bytes + del bb + b = source.read(MIN_READ_SIZE) + while b: + total_bytes += len(b) + data.write(b) + b = source.read(MIN_READ_SIZE) + data.seek(0) + Log.note("Using file of size {{length}} instead of str()", length= total_bytes) + + return data + except Exception, e: + Log.error("Could not write file > {{num}} bytes", num= total_bytes, cause=e) + b = source.read(MIN_READ_SIZE) + + data = b"".join(bytes) + del bytes + return data + + +class LazyLines(object): + """ + SIMPLE LINE ITERATOR, BUT WITH A BIT OF CACHING TO LOOK LIKE AN ARRAY + """ + + def __init__(self, source, encoding="utf8"): + """ + ASSUME source IS A LINE ITERATOR OVER utf8 ENCODED BYTE STREAM + """ + self.source = source + self.encoding = encoding + self._iter = self.__iter__() + self._last = None + self._next = 0 + + def __getslice__(self, i, j): + if i == self._next: + return self._iter + Log.error("Do not know how to slice this generator") + + def __iter__(self): + def output(encoding): + for v in self.source: + if not encoding: + self._last = v + else: + self._last = v.decode(encoding) + self._next += 1 + yield self._last + + return output(self.encoding) + + def __getitem__(self, item): + try: + if item == self._next: + return self._iter.next() + elif item == self._next - 1: + return self._last + else: + Log.error("can not index out-of-order too much") + except Exception, e: + Log.error("Problem indexing", e) + + +class 
CompressedLines(LazyLines): + """ + KEEP COMPRESSED HTTP (content-type: gzip) IN BYTES ARRAY + WHILE PULLING OUT ONE LINE AT A TIME FOR PROCESSING + """ + + def __init__(self, compressed, encoding="utf8"): + """ + USED compressed BYTES TO DELIVER LINES OF TEXT + LIKE LazyLines, BUT HAS POTENTIAL TO seek() + """ + self.compressed = compressed + LazyLines.__init__(self, None, encoding=encoding) + self._iter = self.__iter__() + + def __iter__(self): + return LazyLines(ibytes2ilines(compressed_bytes2ibytes(self.compressed, MIN_READ_SIZE)), self.encoding).__iter__() + + def __getslice__(self, i, j): + if i == self._next: + return self._iter + + if i == 0: + return self.__iter__() + + if i == self._next - 1: + def output(): + yield self._last + for v in self._iter: + yield v + + return output() + Log.error("Do not know how to slice this generator") + + def __getitem__(self, item): + try: + if item == self._next: + self._last = self._iter.next() + self._next += 1 + return self._last + elif item == self._next - 1: + return self._last + else: + Log.error("can not index out-of-order too much") + except Exception, e: + Log.error("Problem indexing", e) + + + def __radd__(self, other): + new_file = TemporaryFile() + new_file.write(other) + self.file.seek(0) + for l in self.file: + new_file.write(l) + new_file.seek(0) + return FileString(new_file) + + +def compressed_bytes2ibytes(compressed, size): + """ + CONVERT AN ARRAY OF BYTES TO A BYTE-BLOCK GENERATOR + USEFUL IN THE CASE WHEN WE WANT TO LIMIT HOW MUCH WE FEED ANOTHER + GENERATOR (LIKE A DECOMPRESSOR) + """ + + decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS) + + for i in range(0, Math.ceiling(len(compressed), size), size): + try: + block = compressed[i: i + size] + yield decompressor.decompress(block) + except Exception, e: + Log.error("Not expected", e) + +def ibytes2ilines(stream): + """ + CONVERT A GENERATOR OF (ARBITRARY-SIZED) byte BLOCKS + TO A LINE (CR-DELIMITED) GENERATOR + """ + _buffer = stream.next() + s = 0 + e = _buffer.find(b"\n") + while True: + while e == -1: + try: + next_block = stream.next() + _buffer = _buffer[s:] + next_block + s = 0 + e = _buffer.find(b"\n") + except StopIteration: + _buffer = _buffer[s:] + del stream + yield _buffer + return + + yield _buffer[s:e] + s = e + 1 + e = _buffer.find(b"\n", s) + +def sbytes2ilines(stream): + """ + CONVERT A STREAM OF (ARBITRARY-SIZED) byte BLOCKS + TO A LINE (CR-DELIMITED) GENERATOR + """ + def read(): + output = stream.read(MIN_READ_SIZE) + return output + + return ibytes2ilines({"next": read}) + + +class GzipLines(CompressedLines): + """ + SAME AS CompressedLines, BUT USING THE GzipFile FORMAT FOR COMPRESSED BYTES + """ + + def __init__(self, compressed, encoding="utf8"): + CompressedLines.__init__(self, compressed, encoding=encoding) + + def __iter__(self): + buff = BytesIO(self.compressed) + return LazyLines(gzip.GzipFile(fileobj=buff, mode='r'), encoding=self.encoding).__iter__() + + +class ZipfileLines(CompressedLines): + """ + SAME AS CompressedLines, BUT USING THE ZipFile FORMAT FOR COMPRESSED BYTES + """ + + def __init__(self, compressed, encoding="utf8"): + CompressedLines.__init__(self, compressed, encoding=encoding) + + def __iter__(self): + buff = BytesIO(self.compressed) + archive = zipfile.ZipFile(buff, mode='r') + names = archive.namelist() + if len(names) != 1: + Log.error("*.zip file has {{num}} files, expecting only one.", num= len(names)) + stream = archive.open(names[0], "r") + return LazyLines(sbytes2ilines(stream), encoding=self.encoding).__iter__() 
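
A minimal usage sketch (not part of the diff) may help orient readers of `big_data.py`: it shows gzip-compressed, line-oriented content being consumed one line at a time. The `records.json.gz` filename is a hypothetical stand-in for any gzip payload, such as a `Content-Encoding: gzip` HTTP body.

    # usage sketch for pyLibrary.env.big_data (Python 2, like the module itself)
    from pyLibrary.env.big_data import CompressedLines, compressed_bytes2ibytes, ibytes2ilines

    with open("records.json.gz", "rb") as f:  # hypothetical input file
        compressed = f.read()

    # CompressedLines keeps the compressed bytes and inflates them on demand,
    # yielding decoded unicode lines one at a time
    line_count = 0
    for line in CompressedLines(compressed, encoding="utf8"):
        line_count += 1

    # the same pipeline spelled out: fixed-size blocks -> decompressed blocks -> lines
    blocks = compressed_bytes2ibytes(compressed, 8 * 1024)
    for raw_line in ibytes2ilines(blocks):
        pass  # raw_line is a utf8-encoded byte string with the newline stripped

`GzipLines` and `ZipfileLines` follow the same pattern for `GzipFile` and `.zip` payloads respectively.
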
diff --git a/pyLibrary/env/elasticsearch.py b/pyLibrary/env/elasticsearch.py new file mode 100644 index 0000000..55cb8d0 --- /dev/null +++ b/pyLibrary/env/elasticsearch.py @@ -0,0 +1,1154 @@ +# encoding: utf-8 +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. +# +# Author: Kyle Lahnakoski (kyle@lahnakoski.com) +# + +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from collections import Mapping +from copy import deepcopy +from datetime import datetime +import re +import time + +from pyLibrary import convert, strings +from pyLibrary.debugs.logs import Log +from pyLibrary.dot import coalesce, Null, Dict, set_default, join_field, split_field, unwraplist, listwrap, literal_field +from pyLibrary.dot.lists import DictList +from pyLibrary.dot import wrap +from pyLibrary.env import http +from pyLibrary.jsons.typed_encoder import json2typed +from pyLibrary.maths.randoms import Random +from pyLibrary.maths import Math +from pyLibrary.meta import use_settings +from pyLibrary.queries import qb +from pyLibrary.strings import utf82unicode +from pyLibrary.thread.threads import ThreadedQueue, Thread, Lock +from pyLibrary.times.durations import MINUTE + + +ES_NUMERIC_TYPES = ["long", "integer", "double", "float"] +ES_PRIMITIVE_TYPES = ["string", "boolean", "integer", "date", "long", "double"] + + +class Features(object): + pass + + +class Index(Features): + """ + AN ElasticSearch INDEX LIFETIME MANAGEMENT TOOL + + ElasticSearch'S REST INTERFACE WORKS WELL WITH PYTHON AND JAVASCRIPT + SO HARDLY ANY LIBRARY IS REQUIRED. IT IS SIMPLER TO MAKE HTTP CALLS + DIRECTLY TO ES USING YOUR FAVORITE HTTP LIBRARY. I HAVE SOME + CONVENIENCE FUNCTIONS HERE, BUT IT'S BETTER TO MAKE YOUR OWN. + + THIS CLASS IS TO HELP DURING ETL, CREATING INDEXES, MANAGING ALIASES + AND REMOVING INDEXES WHEN THEY HAVE BEEN REPLACED. IT USES A STANDARD + SUFFIX (YYYYMMDD-HHMMSS) TO TRACK AGE AND RELATIONSHIP TO THE ALIAS, + IF ANY YET. + + """ + @use_settings + def __init__( + self, + index, # NAME OF THE INDEX, EITHER ALIAS NAME OR FULL VERSION NAME + type=None, # SCHEMA NAME, (DEFAULT TO TYPE IN INDEX, IF ONLY ONE) + alias=None, + explore_metadata=True, # PROBING THE CLUSTER FOR METADATA IS ALLOWED + read_only=True, + tjson=False, # STORED AS TYPED JSON + timeout=None, # NUMBER OF SECONDS TO WAIT FOR RESPONSE, OR SECONDS TO WAIT FOR DOWNLOAD (PASSED TO requests) + debug=False, # DO NOT SHOW THE DEBUG STATEMENTS + settings=None + ): + if index==None: + Log.error("not allowed") + if index == alias: + Log.error("must have a unique index name") + + self.cluster_state = None + self.debug = debug + self.settings = settings + self.cluster = Cluster(settings) + + try: + full_index = self.get_index(index) + if full_index and alias==None: + settings.alias = settings.index + settings.index = full_index + if full_index==None: + Log.error("not allowed") + if type == None: + # NO type PROVIDED, MAYBE THERE IS A SUITABLE DEFAULT? 
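+                    # CHECK THE CACHED CLUSTER METADATA FIRST, FALLING BACK TO A FRESH
+                    # get_metadata() CALL; IF THE INDEX HAS EXACTLY ONE MAPPING TYPE,
+                    # USE IT AS THE DEFAULT type, OTHERWISE DEMAND AN EXPLICIT ONE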
+ with self.cluster.metadata_locker: + index_ = self.cluster._metadata.indices[self.settings.index] + if not index_: + indices = self.cluster.get_metadata(index=self.settings.index).indices + index_ = indices[self.settings.index] + + candidate_types = list(index_.mappings.keys()) + if len(candidate_types) != 1: + Log.error("Expecting `type` parameter") + self.settings.type = type = candidate_types[0] + except Exception, e: + # EXPLORING (get_metadata()) IS NOT ALLOWED ON THE PUBLIC CLUSTER + Log.error("not expected", cause=e) + + if not type: + Log.error("not allowed") + + self.path = "/" + full_index + "/" + type + + if self.debug: + Log.alert("elasticsearch debugging for {{url}} is on", url=self.url) + + @property + def url(self): + return self.cluster.path.rstrip("/") + "/" + self.path.lstrip("/") + + def get_schema(self, retry=True): + if self.settings.explore_metadata: + indices = self.cluster.get_metadata().indices + index = indices[self.settings.index] + + if index == None and retry: + #TRY AGAIN, JUST IN CASE + self.cluster.cluster_state = None + return self.get_schema(retry=False) + + if not index.mappings[self.settings.type]: + Log.error("ElasticSearch index ({{index}}) does not have type ({{type}})", self.settings) + return index.mappings[self.settings.type] + else: + mapping = self.cluster.get(self.path + "/_mapping") + if not mapping[self.settings.type]: + Log.error("{{index}} does not have type {{type}}", self.settings) + return wrap({"mappings": mapping[self.settings.type]}) + + def delete_all_but_self(self): + """ + DELETE ALL INDEXES WITH GIVEN PREFIX, EXCEPT name + """ + prefix = self.settings.alias + name = self.settings.index + + if prefix == name: + Log.note("{{index_name}} will not be deleted", index_name= prefix) + for a in self.cluster.get_aliases(): + # MATCH YYMMDD_HHMMSS FORMAT + if re.match(re.escape(prefix) + "\\d{8}_\\d{6}", a.index) and a.index != name: + self.cluster.delete_index(a.index) + + def add_alias(self, alias=None): + if alias: + self.cluster_state = None + self.cluster.post( + "/_aliases", + data={ + "actions": [ + {"add": {"index": self.settings.index, "alias": alias}} + ] + }, + timeout=coalesce(self.settings.timeout, 30) + ) + else: + # SET ALIAS ACCORDING TO LIFECYCLE RULES + self.cluster_state = None + self.cluster.post( + "/_aliases", + data={ + "actions": [ + {"add": {"index": self.settings.index, "alias": self.settings.alias}} + ] + }, + timeout=coalesce(self.settings.timeout, 30) + ) + + def get_index(self, alias): + """ + RETURN THE INDEX USED BY THIS alias + """ + alias_list = self.cluster.get_aliases() + output = sort([ + a.index + for a in alias_list + if a.alias == alias or + a.index == alias or + (re.match(re.escape(alias) + "\\d{8}_\\d{6}", a.index) and a.index != alias) + ]) + + if len(output) > 1: + Log.error("only one index with given alias==\"{{alias}}\" expected", alias= alias) + + if not output: + return Null + + return output.last() + + def is_proto(self, index): + """ + RETURN True IF THIS INDEX HAS NOT BEEN ASSIGNED ITS ALIAS + """ + for a in self.cluster.get_aliases(): + if a.index == index and a.alias: + return False + return True + + def flush(self): + self.cluster.post("/" + self.settings.index + "/_refresh") + + def delete_record(self, filter): + if self.settings.read_only: + Log.error("Index opened in read only mode, no changes allowed") + self.cluster.get_metadata() + + if self.cluster.cluster_state.version.number.startswith("0.90"): + query = {"filtered": { + "query": {"match_all": {}}, + "filter": filter + }} + 
elif self.cluster.cluster_state.version.number.startswith("1.0"): + query = {"query": {"filtered": { + "query": {"match_all": {}}, + "filter": filter + }}} + else: + raise NotImplementedError + + if self.debug: + Log.note("Delete bugs:\n{{query}}", query= query) + + result = self.cluster.delete( + self.path + "/_query", + data=convert.value2json(query), + timeout=60 + ) + + for name, status in result._indices.items(): + if status._shards.failed > 0: + Log.error("Failure to delete from {{index}}", index=name) + + + def extend(self, records): + """ + records - MUST HAVE FORM OF + [{"value":value}, ... {"value":value}] OR + [{"json":json}, ... {"json":json}] + OPTIONAL "id" PROPERTY IS ALSO ACCEPTED + """ + if self.settings.read_only: + Log.error("Index opened in read only mode, no changes allowed") + lines = [] + try: + for r in records: + id = r.get("id") + + if id == None: + id = random_id() + + if "json" in r: + # if id != coalesce(wrap(convert.json2value(r["json"])).value._id, id): + # Log.error("expecting _id to match") + json = r["json"] + elif "value" in r: + # if id != coalesce(wrap(r).value._id, id): + # Log.error("expecting _id to match") + json = convert.value2json(r["value"]) + else: + json = None + Log.error("Expecting every record given to have \"value\" or \"json\" property") + + lines.append('{"index":{"_id": ' + convert.value2json(id) + '}}') + if self.settings.tjson: + lines.append(json2typed(json)) + else: + lines.append(json) + del records + + if not lines: + return + + try: + data_bytes = "\n".join(lines) + "\n" + data_bytes = data_bytes.encode("utf8") + except Exception, e: + Log.error("can not make request body from\n{{lines|indent}}", lines=lines, cause=e) + + + response = self.cluster.post( + self.path + "/_bulk", + data=data_bytes, + headers={"Content-Type": "text"}, + timeout=self.settings.timeout + ) + items = response["items"] + + for i, item in enumerate(items): + if self.cluster.version.startswith("0.90."): + if not item.index.ok: + Log.error( + "{{error}} while loading line:\n{{line}}", + error=item.index.error, + line=lines[i * 2 + 1] + ) + elif any(map(self.cluster.version.startswith, ["1.4.", "1.5.", "1.6.", "1.7."])): + if item.index.status not in [200, 201]: + Log.error( + "{{num}} {{error}} while loading line id={{id}} into index {{index|quote}}:\n{{line}}", + num=item.index.status, + error=item.index.error, + line=strings.limit(lines[i * 2 + 1], 300), + index=self.settings.index, + id=item.index._id + ) + else: + Log.error("version not supported {{version}}", version=self.cluster.version) + + if self.debug: + Log.note("{{num}} documents added", num=len(items)) + except Exception, e: + if e.message.startswith("sequence item "): + Log.error("problem with {{data}}", data=repr(lines[int(e.message[14:16].strip())]), cause=e) + Log.error("problem sending to ES", e) + + # RECORDS MUST HAVE id AND json AS A STRING OR + # HAVE id AND value AS AN OBJECT + def add(self, record): + if self.settings.read_only: + Log.error("Index opened in read only mode, no changes allowed") + if isinstance(record, list): + Log.error("add() has changed to only accept one record, no lists") + self.extend([record]) + + # -1 FOR NO REFRESH + def set_refresh_interval(self, seconds): + if seconds <= 0: + interval = -1 + else: + interval = unicode(seconds) + "s" + + if self.cluster.version.startswith("0.90."): + response = self.cluster.put( + "/" + self.settings.index + "/_settings", + data='{"index":{"refresh_interval":' + convert.value2json(interval) + '}}' + ) + + result = 
convert.json2value(utf82unicode(response.all_content)) + if not result.ok: + Log.error("Can not set refresh interval ({{error}})", { + "error": utf82unicode(response.all_content) + }) + elif any(map(self.cluster.version.startswith, ["1.4.", "1.5.", "1.6.", "1.7."])): + response = self.cluster.put( + "/" + self.settings.index + "/_settings", + data=convert.unicode2utf8('{"index":{"refresh_interval":' + convert.value2json(interval) + '}}') + ) + + result = convert.json2value(utf82unicode(response.all_content)) + if not result.acknowledged: + Log.error("Can not set refresh interval ({{error}})", { + "error": utf82unicode(response.all_content) + }) + else: + Log.error("Do not know how to handle ES version {{version}}", version=self.cluster.version) + + def search(self, query, timeout=None, retry=None): + query = wrap(query) + try: + if self.debug: + if len(query.facets.keys()) > 20: + show_query = query.copy() + show_query.facets = {k: "..." for k in query.facets.keys()} + else: + show_query = query + Log.note("Query:\n{{query|indent}}", query=show_query) + return self.cluster.post( + self.path + "/_search", + data=query, + timeout=coalesce(timeout, self.settings.timeout), + retry=retry + ) + except Exception, e: + Log.error( + "Problem with search (path={{path}}):\n{{query|indent}}", + path=self.path + "/_search", + query=query, + cause=e + ) + + def threaded_queue(self, batch_size=None, max_size=None, period=None, silent=False): + return ThreadedQueue( + "push to elasticsearch: " + self.settings.index, + self, + batch_size=batch_size, + max_size=max_size, + period=period, + silent=silent + ) + + def delete(self): + self.cluster.delete_index(index=self.settings.index) + + +known_clusters = {} + +class Cluster(object): + + @use_settings + def __new__(cls, host, port=9200, settings=None): + if not isinstance(port, int): + Log.error("port must be integer") + cluster = known_clusters.get((host, port)) + if cluster: + return cluster + + cluster = object.__new__(cls) + known_clusters[(host, port)] = cluster + return cluster + + @use_settings + def __init__(self, host, port=9200, explore_metadata=True, settings=None): + """ + settings.explore_metadata == True - IF PROBING THE CLUSTER FOR METADATA IS ALLOWED + settings.timeout == NUMBER OF SECONDS TO WAIT FOR RESPONSE, OR SECONDS TO WAIT FOR DOWNLOAD (PASSED TO requests) + """ + if hasattr(self, "settings"): + return + + self.settings = settings + self.cluster_state = None + self._metadata = None + self.metadata_locker = Lock() + self.debug = settings.debug + self.version = None + self.path = settings.host + ":" + unicode(settings.port) + + self.get_metadata() + + @use_settings + def get_or_create_index( + self, + index, + alias=None, + schema=None, + limit_replicas=None, + read_only=False, + tjson=False, + settings=None + ): + best = self._get_best(settings) + if not best: + output = self.create_index(settings=settings, schema=schema, limit_replicas=limit_replicas) + return output + elif best.alias != None: + settings.alias = best.alias + settings.index = best.index + elif settings.alias == None: + settings.alias = settings.index + settings.index = best.index + + index = settings.index + meta = self.get_metadata(index=index) + columns = parse_properties(index, [], meta.indices[index].mappings.values()[0].properties) + if len(columns)!=0: + settings.tjson = any(c.name.endswith("$value") for c in columns) + + return Index(settings) + + def _get_best(self, settings): + from pyLibrary.queries import qb + aliases = self.get_aliases() + indexes = 
qb.sort([ + a + for a in aliases + if (a.alias == settings.index and settings.alias == None) or + (re.match(re.escape(settings.index) + r'\d{8}_\d{6}', a.index) and settings.alias == None) or + (a.index == settings.index and (a.alias == None or a.alias == settings.alias)) + ], "index") + return indexes.last() + + @use_settings + def get_index(self, index, type=None, alias=None, read_only=True, settings=None): + """ + TESTS THAT THE INDEX EXISTS BEFORE RETURNING A HANDLE + """ + if read_only: + # GET EXACT MATCH, OR ALIAS + aliases = self.get_aliases() + if index in aliases.index: + return Index(settings) + if index in aliases.alias: + match = [a for a in aliases if a.alias == index][0] + settings.alias = match.alias + settings.index = match.index + return Index(settings) + Log.error("Can not find index {{index_name}}", index_name=settings.index) + else: + # GET BEST MATCH, INCLUDING PROTOTYPE + best = self._get_best(settings) + if not best: + Log.error("Can not find index {{index_name}}", index_name=settings.index) + + if best.alias != None: + settings.alias = best.alias + settings.index = best.index + elif settings.alias == None: + settings.alias = settings.index + settings.index = best.index + return Index(settings) + + def get_alias(self, alias): + """ + RETURN REFERENCE TO ALIAS (MANY INDEXES) + USER MUST BE SURE NOT TO SEND UPDATES + """ + aliases = self.get_aliases() + if alias in aliases.alias: + settings = self.settings.copy() + settings.alias = alias + settings.index = alias + return Index(read_only=True, settings=settings) + Log.error("Can not find any index with alias {{alias_name}}", alias_name= alias) + + def get_prototype(self, alias): + """ + RETURN ALL INDEXES THAT ARE INTENDED TO BE GIVEN alias, BUT HAVE NO + ALIAS YET BECAUSE INCOMPLETE + """ + output = sort([ + a.index + for a in self.get_aliases() + if re.match(re.escape(alias) + "\\d{8}_\\d{6}", a.index) and not a.alias + ]) + return output + + @use_settings + def create_index( + self, + index, + alias=None, + schema=None, + limit_replicas=None, + read_only=False, + tjson=False, + settings=None + ): + if not settings.alias: + settings.alias = settings.index + settings.index = proto_name(settings.alias) + + if settings.alias == settings.index: + Log.error("Expecting index name to conform to pattern") + + if settings.schema_file: + Log.error('schema_file attribute not supported. 
Use {"$ref":} instead') + + if schema == None: + Log.error("Expecting a schema") + elif isinstance(schema, basestring): + schema = convert.json2value(schema, leaves=True) + else: + schema = convert.json2value(convert.value2json(schema), leaves=True) + + if limit_replicas: + # DO NOT ASK FOR TOO MANY REPLICAS + health = self.get("/_cluster/health") + if schema.settings.index.number_of_replicas >= health.number_of_nodes: + Log.warning("Reduced number of replicas: {{from}} requested, {{to}} realized", + {"from": schema.settings.index.number_of_replicas}, + to= health.number_of_nodes - 1 + ) + schema.settings.index.number_of_replicas = health.number_of_nodes - 1 + + self.post( + "/" + settings.index, + data=schema, + headers={"Content-Type": "application/json"} + ) + while True: + time.sleep(1) + try: + self.head("/" + settings.index) + break + except Exception: + Log.note("{{index}} does not exist yet", index=settings.index) + + es = Index(settings=settings) + return es + + def delete_index(self, index=None): + self.delete("/" + index) + + def get_aliases(self): + """ + RETURN LIST OF {"alias":a, "index":i} PAIRS + ALL INDEXES INCLUDED, EVEN IF NO ALIAS {"alias":Null} + """ + data = self.get("/_cluster/state") + output = [] + for index, desc in data.metadata.indices.items(): + if not desc["aliases"]: + output.append({"index": index, "alias": None}) + else: + for a in desc["aliases"]: + output.append({"index": index, "alias": a}) + return wrap(output) + + def get_metadata(self, index=None, force=False): + with self.metadata_locker: + if self.settings.explore_metadata: + if not self._metadata or (force and index is None): + response = self.get("/_cluster/state") + self._metadata = wrap(response.metadata) + self.cluster_state = wrap(self.get("/")) + self.version = self.cluster_state.version.number + elif index: # UPDATE THE MAPPING FOR ONE INDEX ONLY + response = self.get("/"+index+"/_mapping") + if self.version.startswith("0.90."): + best = qb.sort(response.items(), 0).last() + self._metadata.indices[index].mappings = best[1] + else: + self._metadata.indices[index].mappings = qb.sort(response.items(), 0).last()[1].mappings + return Dict(indices={index: self._metadata.indices[index]}) + else: + Log.error("Metadata exploration has been disabled") + return self._metadata + + def post(self, path, **kwargs): + url = self.settings.host + ":" + unicode(self.settings.port) + path + + try: + wrap(kwargs).headers["Accept-Encoding"] = "gzip,deflate" + + data = kwargs.get(b'data') + if data == None: + pass + elif isinstance(data, Mapping): + kwargs[b'data'] = data =convert.unicode2utf8(convert.value2json(data)) + elif not isinstance(kwargs["data"], str): + Log.error("data must be utf8 encoded string") + + if self.debug: + sample = kwargs.get(b'data', "")[:300] + Log.note("{{url}}:\n{{data|indent}}", url=url, data=sample) + + response = http.post(url, **kwargs) + if response.status_code not in [200, 201]: + Log.error(response.reason + ": " + response.content) + if self.debug: + Log.note("response: {{response}}", response=utf82unicode(response.content)[:130]) + details = convert.json2value(utf82unicode(response.content)) + if details.error: + Log.error(convert.quote2string(details.error)) + if details._shards.failed > 0: + Log.error("Shard failures {{failures|indent}}", + failures="---\n".join(r.replace(";", ";\n") for r in details._shards.failures.reason) + ) + return details + except Exception, e: + if url[0:4] != "http": + suggestion = " (did you forget \"http://\" prefix on the host name?)" + else: + 
suggestion = "" + + if kwargs.get("data"): + Log.error( + "Problem with call to {{url}}" + suggestion + "\n{{body|left(10000)}}", + url=url, + body=kwargs["data"][0:10000] if self.debug else kwargs["data"][0:100], + cause=e + ) + else: + Log.error("Problem with call to {{url}}" + suggestion, url=url, cause=e) + + + + def get(self, path, **kwargs): + url = self.settings.host + ":" + unicode(self.settings.port) + path + try: + response = http.get(url, **kwargs) + if response.status_code not in [200]: + Log.error(response.reason+": "+response.all_content) + if self.debug: + Log.note("response: {{response}}", response=utf82unicode(response.all_content)[:130]) + details = wrap(convert.json2value(utf82unicode(response.all_content))) + if details.error: + Log.error(details.error) + return details + except Exception, e: + Log.error("Problem with call to {{url}}", url=url, cause=e) + + def head(self, path, **kwargs): + url = self.settings.host + ":" + unicode(self.settings.port) + path + try: + response = http.head(url, **kwargs) + if response.status_code not in [200]: + Log.error(response.reason+": "+response.all_content) + if self.debug: + Log.note("response: {{response}}", response= utf82unicode(response.all_content)[:130]) + if response.all_content: + details = wrap(convert.json2value(utf82unicode(response.all_content))) + if details.error: + Log.error(details.error) + return details + else: + return None # WE DO NOT EXPECT content WITH HEAD REQUEST + except Exception, e: + Log.error("Problem with call to {{url}}", url= url, cause=e) + + def put(self, path, **kwargs): + url = self.settings.host + ":" + unicode(self.settings.port) + path + + if self.debug: + sample = kwargs["data"][:300] + Log.note("PUT {{url}}:\n{{data|indent}}", url= url, data= sample) + try: + response = http.put(url, **kwargs) + if response.status_code not in [200]: + Log.error(response.reason+": "+response.all_content) + if self.debug: + Log.note("response: {{response}}", response= utf82unicode(response.all_content)[0:300:]) + return response + except Exception, e: + Log.error("Problem with call to {{url}}", url= url, cause=e) + + def delete(self, path, **kwargs): + url = self.settings.host + ":" + unicode(self.settings.port) + path + try: + response = convert.json2value(utf82unicode(http.delete(url, **kwargs).content)) + if self.debug: + Log.note("delete response {{response}}", response= response) + return response + except Exception, e: + Log.error("Problem with call to {{url}}", url= url, cause=e) + + +def proto_name(prefix, timestamp=None): + if not timestamp: + timestamp = datetime.utcnow() + return prefix + convert.datetime2string(timestamp, "%Y%m%d_%H%M%S") + + +def sort(values): + return wrap(sorted(values)) + + +def scrub(r): + """ + REMOVE KEYS OF DEGENERATE VALUES (EMPTY STRINGS, EMPTY LISTS, AND NULLS) + CONVERT STRINGS OF NUMBERS TO NUMBERS + RETURNS **COPY**, DOES NOT CHANGE ORIGINAL + """ + return wrap(_scrub(r)) + + +def _scrub(r): + try: + if r == None: + return None + elif isinstance(r, basestring): + if r == "": + return None + return r + elif Math.is_number(r): + return convert.value2number(r) + elif isinstance(r, Mapping): + if isinstance(r, Dict): + r = object.__getattribute__(r, "_dict") + output = {} + for k, v in r.items(): + v = _scrub(v) + if v != None: + output[k.lower()] = v + if len(output) == 0: + return None + return output + elif hasattr(r, '__iter__'): + if isinstance(r, DictList): + r = r.list + output = [] + for v in r: + v = _scrub(v) + if v != None: + output.append(v) + if not output: + 
return None + if len(output) == 1: + return output[0] + try: + return sort(output) + except Exception: + return output + else: + return r + except Exception, e: + Log.warning("Can not scrub: {{json}}", json= r) + + + +class Alias(Features): + @use_settings + def __init__( + self, + alias, # NAME OF THE ALIAS + type=None, # SCHEMA NAME, WILL HUNT FOR ONE IF None + explore_metadata=True, # IF PROBING THE CLUSTER FOR METADATA IS ALLOWED + debug=False, + timeout=None, # NUMBER OF SECONDS TO WAIT FOR RESPONSE, OR SECONDS TO WAIT FOR DOWNLOAD (PASSED TO requests) + settings=None + ): + self.debug = debug + if self.debug: + Log.alert("Elasticsearch debugging on {{index|quote}} is on", index= settings.index) + + self.settings = settings + self.cluster = Cluster(settings) + + if type == None: + if not explore_metadata: + Log.error("Alias() was given no `type` (aka schema) and not allowed to explore metadata. Do not know what to do now.") + + indices = self.cluster.get_metadata().indices + if not self.settings.alias or self.settings.alias==self.settings.index: + alias_list = self.cluster.get("/_alias/"+self.settings.index) + candidates = [(name, i) for name, i in alias_list.items() if self.settings.index in i.aliases.keys()] + full_name = qb.sort(candidates, 0).last()[0] + index = self.cluster.get("/" + full_name + "/_mapping")[full_name] + else: + index = self.cluster.get("/"+self.settings.index+"/_mapping")[self.settings.index] + + # FIND MAPPING WITH MOST PROPERTIES (AND ASSUME THAT IS THE CANONICAL TYPE) + max_prop = -1 + for _type, mapping in index.mappings.items(): + if _type == "_default_": + continue + num_prop = len(mapping.properties.keys()) + if max_prop < num_prop: + max_prop = num_prop + self.settings.type = _type + type = _type + + if type == None: + Log.error("Can not find schema type for index {{index}}", index=coalesce(self.settings.alias, self.settings.index)) + + self.path = "/" + alias + "/" + type + + @property + def url(self): + return self.cluster.path.rstrip("/") + "/" + self.path.lstrip("/") + + def get_schema(self, retry=True): + if self.settings.explore_metadata: + indices = self.cluster.get_metadata().indices + if not self.settings.alias or self.settings.alias==self.settings.index: + #PARTIALLY DEFINED settings + candidates = [(name, i) for name, i in indices.items() if self.settings.index in i.aliases] + # TODO: MERGE THE mappings OF ALL candidates, DO NOT JUST PICK THE LAST ONE + + index = "dummy value" + schema = wrap({"_routing": {}, "properties": {}}) + for _, ind in qb.sort(candidates, {"value": 0, "sort": -1}): + mapping = ind.mappings[self.settings.type] + set_default(schema._routing, mapping._routing) + schema.properties = _merge_mapping(schema.properties, mapping.properties) + else: + #FULLY DEFINED settings + index = indices[self.settings.index] + schema = index.mappings[self.settings.type] + + if index == None and retry: + #TRY AGAIN, JUST IN CASE + self.cluster.cluster_state = None + return self.get_schema(retry=False) + + #TODO: REMOVE THIS BUG CORRECTION + if not schema and self.settings.type == "test_result": + schema = index.mappings["test_results"] + # DONE BUG CORRECTION + + if not schema: + Log.error( + "ElasticSearch index ({{index}}) does not have type ({{type}})", + index=self.settings.index, + type=self.settings.type + ) + return schema + else: + mapping = self.cluster.get(self.path + "/_mapping") + if not mapping[self.settings.type]: + Log.error("{{index}} does not have type {{type}}", self.settings) + return wrap({"mappings": 
mapping[self.settings.type]}) + + def delete(self, filter): + self.cluster.get_metadata() + + if self.cluster.cluster_state.version.number.startswith("0.90"): + query = {"filtered": { + "query": {"match_all": {}}, + "filter": filter + }} + elif self.cluster.cluster_state.version.number.startswith("1."): + query = {"query": {"filtered": { + "query": {"match_all": {}}, + "filter": filter + }}} + else: + raise NotImplementedError + + if self.debug: + Log.note("Delete bugs:\n{{query}}", query= query) + + keep_trying = True + while keep_trying: + result = self.cluster.delete( + self.path + "/_query", + data=convert.value2json(query), + timeout=60 + ) + keep_trying = False + for name, status in result._indices.items(): + if status._shards.failed > 0: + if status._shards.failures[0].reason.find("rejected execution (queue capacity ") >= 0: + keep_trying = True + Thread.sleep(seconds=5) + break + + if not keep_trying: + for name, status in result._indices.items(): + if status._shards.failed > 0: + Log.error( + "ES shard(s) report Failure to delete from {{index}}: {{message}}. Query was {{query}}", + index=name, + query=query, + message=status._shards.failures[0].reason + ) + + def search(self, query, timeout=None): + query = wrap(query) + try: + if self.debug: + if len(query.facets.keys()) > 20: + show_query = query.copy() + show_query.facets = {k: "..." for k in query.facets.keys()} + else: + show_query = query + Log.note("Query {{path}}\n{{query|indent}}", path=self.path + "/_search", query=show_query) + return self.cluster.post( + self.path + "/_search", + data=query, + timeout=coalesce(timeout, self.settings.timeout) + ) + except Exception, e: + Log.error( + "Problem with search (path={{path}}):\n{{query|indent}}", + path=self.path + "/_search", + query=query, + cause=e + ) + + +def parse_properties(parent_index_name, parent_query_path, esProperties): + """ + RETURN THE COLUMN DEFINITIONS IN THE GIVEN esProperties OBJECT + """ + from pyLibrary.queries.meta import Column + + columns = DictList() + for name, property in esProperties.items(): + if parent_query_path: + index_name, query_path = parent_index_name, join_field(split_field(parent_query_path) + [name]) + else: + index_name, query_path = parent_index_name, name + + if property.type == "nested" and property.properties: + # NESTED TYPE IS A NEW TYPE DEFINITION + # MARKUP CHILD COLUMNS WITH THE EXTRA DEPTH + self_columns = parse_properties(index_name, query_path, property.properties) + for c in self_columns: + c.nested_path = unwraplist([query_path] + listwrap(c.nested_path)) + columns.extend(self_columns) + columns.append(Column( + table=index_name, + name=query_path, + abs_name=query_path, + type="nested", + nested_path=query_path + )) + + continue + + if property.properties: + child_columns = parse_properties(index_name, query_path, property.properties) + columns.extend(child_columns) + columns.append(Column( + table=index_name, + name=query_path, + abs_name=query_path, + type="source" if property.enabled == False else "object" + )) + + if property.dynamic: + continue + if not property.type: + continue + if property.type == "multi_field": + property.type = property.fields[name].type # PULL DEFAULT TYPE + for i, (n, p) in enumerate(property.fields.items()): + if n == name: + # DEFAULT + columns.append(Column( + table=index_name, + name=query_path, + abs_name=query_path, + type=p.type + )) + else: + columns.append(Column( + table=index_name, + name=query_path + "\\." + n, + abs_name=query_path + "\\." 
+ n, + type=p.type + )) + continue + + if property.type in ["string", "boolean", "integer", "date", "long", "double"]: + columns.append(Column( + table=index_name, + name=query_path, + abs_name=query_path, + type=property.type + )) + if property.index_name and name != property.index_name: + columns.append(Column( + table=index_name, + abs_name=query_path, + name=query_path, + type=property.type + )) + elif property.enabled == None or property.enabled == False: + columns.append(Column( + table=index_name, + name=query_path, + abs_name=query_path, + type="source" if property.enabled==False else "object" + )) + else: + Log.warning("unknown type {{type}} for property {{path}}", type=property.type, path=query_path) + + return columns + + +def random_id(): + return Random.hex(40) + +def _merge_mapping(a, b): + """ + MERGE TWO MAPPINGS, a TAKES PRECEDENCE + """ + for name, b_details in b.items(): + a_details = a[literal_field(name)] + if a_details.properties and not a_details.type: + a_details.type = "object" + if b_details.properties and not b_details.type: + b_details.type = "object" + + if a_details: + a_details.type = _merge_type[a_details.type][b_details.type] + + if b_details.type in ["object", "nested"]: + _merge_mapping(a_details.properties, b_details.properties) + else: + a[literal_field(name)] = deepcopy(b_details) + + return a + +_merge_type = { + "boolean": { + "boolean": "boolean", + "integer": "integer", + "long": "long", + "float": "float", + "double": "double", + "string": "string", + "object": None, + "nested": None + }, + "integer": { + "boolean": "integer", + "integer": "integer", + "long": "long", + "float": "float", + "double": "double", + "string": "string", + "object": None, + "nested": None + }, + "long": { + "boolean": "long", + "integer": "long", + "long": "long", + "float": "double", + "double": "double", + "string": "string", + "object": None, + "nested": None + }, + "float": { + "boolean": "float", + "integer": "float", + "long": "double", + "float": "float", + "double": "double", + "string": "string", + "object": None, + "nested": None + }, + "double": { + "boolean": "double", + "integer": "double", + "long": "double", + "float": "double", + "double": "double", + "string": "string", + "object": None, + "nested": None + }, + "string": { + "boolean": "string", + "integer": "string", + "long": "string", + "float": "string", + "double": "string", + "string": "string", + "object": None, + "nested": None + }, + "object": { + "boolean": None, + "integer": None, + "long": None, + "float": None, + "double": None, + "string": None, + "object": "object", + "nested": "nested" + }, + "nested": { + "boolean": None, + "integer": None, + "long": None, + "float": None, + "double": None, + "string": None, + "object": "nested", + "nested": "nested" + } +} + diff --git a/pyLibrary/env/emailer.py b/pyLibrary/env/emailer.py new file mode 100644 index 0000000..f901872 --- /dev/null +++ b/pyLibrary/env/emailer.py @@ -0,0 +1,137 @@ +# encoding: utf-8 +# +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. 
+# +# Author: Kyle Lahnakoski (kyle@lahnakoski.com) +# + +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import + +import smtplib +import sys + +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + +from pyLibrary.debugs.logs import Log +from pyLibrary.dot import listwrap +from pyLibrary.dot import coalesce +from pyLibrary.meta import use_settings + + +class Emailer: + @use_settings + def __init__( + self, + from_address, + to_address, + host, + username, + password, + subject="catchy title", + port=465, + use_ssl=1, + settings=None + ): + self.settings = settings + self.server = None + + def __enter__(self): + if self.server is not None: + Log.error("Got a problem") + + if self.settings.use_ssl: + self.server = smtplib.SMTP_SSL(self.settings.host, self.settings.port) + else: + self.server = smtplib.SMTP(self.settings.host, self.settings.port) + + if self.settings.username and self.settings.password: + self.server.login(self.settings.username, self.settings.password) + + return self + + def __exit__(self, type, value, traceback): + try: + self.server.quit() + except Exception, e: + Log.warning("Problem with smtp server quit(), ignoring problem", e) + + self.server = None + + def send_email(self, + from_address=None, + to_address=None, + subject=None, + text_data=None, + html_data=None + ): + """Sends an email. + + from_addr is an email address; to_addrs is a list of email adresses. + Addresses can be plain (e.g. "jsmith@example.com") or with real names + (e.g. "John Smith "). + + text_data and html_data are both strings. You can specify one or both. + If you specify both, the email will be sent as a MIME multipart + alternative, i.e., the recipient will see the HTML content if his + viewer supports it; otherwise he'll see the text content. + """ + + settings = self.settings + + from_address = coalesce(from_address, settings["from"], settings.from_address) + to_address = listwrap(coalesce(to_address, settings.to_address, settings.to_addrs)) + + if not from_address or not to_address: + raise Exception("Both from_addr and to_addrs must be specified") + if not text_data and not html_data: + raise Exception("Must specify either text_data or html_data") + + if not html_data: + msg = MIMEText(text_data) + elif not text_data: + msg = MIMEText(html_data, 'html') + else: + msg = MIMEMultipart('alternative') + msg.attach(MIMEText(text_data, 'plain')) + msg.attach(MIMEText(html_data, 'html')) + + msg['Subject'] = coalesce(subject, settings.subject) + msg['From'] = from_address + msg['To'] = ', '.join(to_address) + + if self.server: + # CALL AS PART OF A SMTP SESSION + self.server.sendmail(from_address, to_address, msg.as_string()) + else: + # CALL AS STAND-ALONE + with self: + self.server.sendmail(from_address, to_address, msg.as_string()) + + + +if sys.hexversion < 0x020603f0: + # versions earlier than 2.6.3 have a bug in smtplib when sending over SSL: + # http://bugs.python.org/issue4066 + + # Unfortunately the stock version of Python in Snow Leopard is 2.6.1, so + # we patch it here to avoid having to install an updated Python version. 
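+    # THE PATCH BELOW REPLACES smtplib.SMTP_SSL._get_socket: IT OPENS THE CONNECTION,
+    # WRAPS IT WITH ssl.wrap_socket, AND EXPOSES IT THROUGH smtplib.SSLFakeFile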
+ import socket + import ssl + + def _get_socket_fixed(self, host, port, timeout): + if self.debuglevel > 0: + print>> sys.stderr, 'connect:', (host, port) + new_socket = socket.create_connection((host, port), timeout) + new_socket = ssl.wrap_socket(new_socket, self.keyfile, self.certfile) + self.file = smtplib.SSLFakeFile(new_socket) + return new_socket + + smtplib.SMTP_SSL._get_socket = _get_socket_fixed + + diff --git a/pyLibrary/env/files.py b/pyLibrary/env/files.py new file mode 100644 index 0000000..3305b16 --- /dev/null +++ b/pyLibrary/env/files.py @@ -0,0 +1,331 @@ +# encoding: utf-8 +# +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. +# +# Author: Kyle Lahnakoski (kyle@lahnakoski.com) +# + + +from datetime import datetime +import io +import os +import shutil + +from pyLibrary.strings import utf82unicode +from pyLibrary.maths import crypto +from pyLibrary.dot import coalesce, set_default, split_field, join_field +from pyLibrary.dot import listwrap, wrap +from pyLibrary import convert + + +class File(object): + """ + ASSUMES ALL FILE CONTENT IS UTF8 ENCODED STRINGS + """ + + def __init__(self, filename, buffering=2 ** 14, suffix=None): + """ + YOU MAY SET filename TO {"path":p, "key":k} FOR CRYPTO FILES + """ + if filename == None: + from pyLibrary.debugs.logs import Log + + Log.error("File must be given a filename") + elif isinstance(filename, basestring): + self.key = None + self._filename = "/".join(filename.split(os.sep)) # USE UNIX STANDARD + else: + self.key = convert.base642bytearray(filename.key) + self._filename = "/".join(filename.path.split(os.sep)) # USE UNIX STANDARD + + while self._filename.find(".../") >= 0: + # LET ... REFER TO GRANDPARENT, .... REFER TO GREAT-GRAND-PARENT, etc... 
+ self._filename = self._filename.replace(".../", "../../") + self.buffering = buffering + + + if suffix: + self._filename = File.add_suffix(self._filename, suffix) + + @classmethod + def new_instance(cls, *path): + def scrub(i, p): + if isinstance(p, File): + p = p.abspath + p = p.replace(os.sep, "/") + if p[-1] == '/': + p = p[:-1] + if i > 0 and p[0] == '/': + p = p[1:] + return p + + return File('/'.join(scrub(i, p) for i, p in enumerate(path))) + + + @property + def filename(self): + return self._filename.replace("/", os.sep) + + @property + def abspath(self): + if self._filename.startswith("~"): + home_path = os.path.expanduser("~") + if os.sep == "\\": + home_path = home_path.replace(os.sep, "/") + if home_path.endswith("/"): + home_path = home_path[:-1] + + return home_path + self._filename[1::] + else: + if os.sep == "\\": + return os.path.abspath(self._filename).replace(os.sep, "/") + else: + return os.path.abspath(self._filename) + + @staticmethod + def add_suffix(filename, suffix): + """ + ADD suffix TO THE filename (NOT INCLUDING THE FILE EXTENSION) + """ + path = filename.split("/") + parts = path[-1].split(".") + i = max(len(parts) - 2, 0) + parts[i] = parts[i] + suffix + path[-1] = ".".join(parts) + return "/".join(path) + + @property + def extension(self): + parts = self._filename.split("/")[-1].split(".") + if len(parts) == 1: + return "" + else: + return parts[-1] + + @property + def name(self): + parts = self._filename.split("/")[-1].split(".") + if len(parts) == 1: + return parts[0] + else: + return ".".join(parts[0:-1]) + + def set_extension(self, ext): + """ + RETURN NEW FILE WITH GIVEN EXTENSION + """ + path = self._filename.split("/") + parts = path[-1].split(".") + if len(parts) == 1: + parts.append(ext) + else: + parts[-1] = ext + + path[-1] = ".".join(parts) + return File("/".join(path)) + + def set_name(self, name): + """ + RETURN NEW FILE WITH GIVEN EXTENSION + """ + path = self._filename.split("/") + parts = path[-1].split(".") + if len(parts) == 1: + path[-1] = name + else: + path[-1] = name + "." 
+ parts[-1] + return File("/".join(path)) + + def backup_name(self, timestamp=None): + """ + RETURN A FILENAME THAT CAN SERVE AS A BACKUP FOR THIS FILE + """ + suffix = convert.datetime2string(coalesce(timestamp, datetime.now()), "%Y%m%d_%H%M%S") + return File.add_suffix(self._filename, suffix) + + def read(self, encoding="utf8"): + with open(self._filename, "rb") as f: + content = f.read().decode(encoding) + if self.key: + return crypto.decrypt(content, self.key) + else: + return content + + def read_json(self, encoding="utf8"): + from pyLibrary.jsons import ref + + content = self.read(encoding=encoding) + value = convert.json2value(content, flexible=True, leaves=True) + abspath = self.abspath + if os.sep == "\\": + abspath = "/" + abspath.replace(os.sep, "/") + return ref.expand(value, "file://" + abspath) + + def is_directory(self): + return os.path.isdir(self._filename) + + def read_bytes(self): + try: + if not self.parent.exists: + self.parent.create() + with open(self._filename, "rb") as f: + return f.read() + except Exception, e: + from pyLibrary.debugs.logs import Log + + Log.error("Problem reading file {{filename}}", self.abspath) + + def write_bytes(self, content): + if not self.parent.exists: + self.parent.create() + with open(self._filename, "wb") as f: + f.write(content) + + def write(self, data): + if not self.parent.exists: + self.parent.create() + with open(self._filename, "wb") as f: + if isinstance(data, list) and self.key: + from pyLibrary.debugs.logs import Log + + Log.error("list of data and keys are not supported, encrypt before sending to file") + + if isinstance(data, list): + pass + elif isinstance(data, basestring): + data=[data] + elif hasattr(data, "__iter__"): + pass + + for d in data: + if not isinstance(d, unicode): + from pyLibrary.debugs.logs import Log + + Log.error("Expecting unicode data only") + if self.key: + f.write(crypto.encrypt(d, self.key).encode("utf8")) + else: + f.write(d.encode("utf8")) + + def __iter__(self): + # NOT SURE HOW TO MAXIMIZE FILE READ SPEED + # http://stackoverflow.com/questions/8009882/how-to-read-large-file-line-by-line-in-python + # http://effbot.org/zone/wide-finder.htm + def output(): + try: + path = self._filename + if path.startswith("~"): + home_path = os.path.expanduser("~") + path = home_path + path[1::] + + with io.open(path, "rb") as f: + for line in f: + yield utf82unicode(line) + except Exception, e: + from pyLibrary.debugs.logs import Log + + Log.error("Can not read line from {{filename}}", filename= self._filename, cause=e) + + return output() + + def append(self, content): + """ + add a line to file + """ + if not self.parent.exists: + self.parent.create() + with open(self._filename, "ab") as output_file: + if isinstance(content, str): + from pyLibrary.debugs.logs import Log + + Log.error("expecting to write unicode only") + output_file.write(content.encode("utf-8")) + output_file.write(b"\n") + + def add(self, content): + return self.append(content) + + def extend(self, content): + try: + if not self.parent.exists: + self.parent.create() + with open(self._filename, "ab") as output_file: + for c in content: + if isinstance(c, str): + from pyLibrary.debugs.logs import Log + + Log.error("expecting to write unicode only") + + output_file.write(c.encode("utf-8")) + output_file.write(b"\n") + except Exception, e: + from pyLibrary.debugs.logs import Log + + Log.error("Could not write to file", e) + + def delete(self): + try: + if os.path.isdir(self._filename): + shutil.rmtree(self._filename) + elif 
os.path.isfile(self._filename): + os.remove(self._filename) + return self + except Exception, e: + if e.strerror == "The system cannot find the path specified": + return + from pyLibrary.debugs.logs import Log + + Log.error("Could not remove file", e) + + def backup(self): + names = self._filename.split("/")[-1].split(".") + if len(names) == 1: + backup = File(self._filename + ".backup " + datetime.utcnow().strftime("%Y%m%d %H%i%s")) + + + def create(self): + try: + os.makedirs(self._filename) + except Exception, e: + from pyLibrary.debugs.logs import Log + + Log.error("Could not make directory {{dir_name}}", dir_name= self._filename, cause=e) + + @property + def children(self): + return [File(self._filename + "/" + c) for c in os.listdir(self.filename)] + + @property + def parent(self): + return File("/".join(self._filename.split("/")[:-1])) + + @property + def exists(self): + if self._filename in ["", "."]: + return True + try: + return os.path.exists(self._filename) + except Exception, e: + return False + + def __bool__(self): + return self.__nonzero__() + + + def __nonzero__(self): + """ + USED FOR FILE EXISTENCE TESTING + """ + if self._filename in ["", "."]: + return True + try: + return os.path.exists(self._filename) + except Exception, e: + return False + + @classmethod + def copy(cls, from_, to_): + File.new_instance(to_).write_bytes(File.new_instance(from_).read_bytes()) diff --git a/pyLibrary/env/git.py b/pyLibrary/env/git.py new file mode 100644 index 0000000..55a8adc --- /dev/null +++ b/pyLibrary/env/git.py @@ -0,0 +1,60 @@ +# encoding: utf-8 +# +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. +# +# Author: Kyle Lahnakoski (kyle@lahnakoski.com) +# + +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import + +from pyLibrary.meta import cache +from pyLibrary.thread.multiprocess import Process + + +@cache +def get_git_revision(): + """ + GET THE CURRENT GIT REVISION + """ + proc = Process("git log", ["git", "log", "-1"]) + + try: + while True: + line = proc.stdout.pop().strip() + if not line: + continue + if line.startswith("commit "): + return line[7:] + finally: + try: + proc.join() + except Exception: + pass + +@cache +def get_remote_revision(url, branch): + """ + GET REVISION OF A REMOTE BRANCH + """ + + proc = Process("git remote revision", ["git", "ls-remote", url, "refs/heads/" + branch]) + + try: + while True: + line = proc.stdout.pop().strip() + if not line: + continue + return line.split("\t")[0] + finally: + try: + proc.join() + except Exception: + pass + + return None + diff --git a/pyLibrary/env/http.py b/pyLibrary/env/http.py new file mode 100644 index 0000000..51fa005 --- /dev/null +++ b/pyLibrary/env/http.py @@ -0,0 +1,264 @@ +# encoding: utf-8 +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. 
diff --git a/pyLibrary/env/http.py b/pyLibrary/env/http.py
new file mode 100644
index 0000000..51fa005
--- /dev/null
+++ b/pyLibrary/env/http.py
@@ -0,0 +1,264 @@
+# encoding: utf-8
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+#
+# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
+#
+
+# MIMICS THE requests API (http://docs.python-requests.org/en/latest/)
+# DEMANDS data IS A JSON-SERIALIZABLE STRUCTURE
+# WITH ADDED default_headers THAT CAN BE SET USING pyLibrary.debugs.settings
+# EG
+# {"debug.constants": {
+#     "pyLibrary.env.http.default_headers": {
+#         "From": "klahnakoski@mozilla.com"
+#     }
+# }}
+
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from copy import copy
+from numbers import Number
+
+from requests import sessions, Response
+
+from pyLibrary import convert
+from pyLibrary.debugs.exceptions import Except
+from pyLibrary.debugs.logs import Log
+from pyLibrary.dot import Dict, coalesce, wrap, set_default
+from pyLibrary.env.big_data import safe_size, CompressedLines, ZipfileLines, GzipLines
+from pyLibrary.maths import Math
+from pyLibrary.queries import qb
+from pyLibrary.thread.threads import Thread
+from pyLibrary.times.durations import SECOND
+
+
+FILE_SIZE_LIMIT = 100 * 1024 * 1024
+MIN_READ_SIZE = 8 * 1024
+ZIP_REQUEST = False
+default_headers = Dict()  # TODO: MAKE THIS VARIABLE A SPECIAL TYPE OF EXPECTED MODULE PARAMETER SO IT COMPLAINS IF NOT SET
+default_timeout = 600
+
+_warning_sent = False
+
+
+def request(method, url, zip=None, retry=None, **kwargs):
+    """
+    JUST LIKE requests.request() BUT WITH DEFAULT HEADERS AND FIXES
+    DEMANDS data IS ONE OF:
+    * A JSON-SERIALIZABLE STRUCTURE, OR
+    * LIST OF JSON-SERIALIZABLE STRUCTURES, OR
+    * None
+
+    Parameters
+    * zip - ZIP THE REQUEST BODY, IF BIG ENOUGH
+    * json - JSON-SERIALIZABLE STRUCTURE
+    * retry - {"times": x, "sleep": y} STRUCTURE
+
+    THE BYTE_STRINGS (b"") ARE NECESSARY TO PREVENT httplib.py FROM **FREAKING OUT**
+    IT APPEARS requests AND httplib.py SIMPLY CONCATENATE STRINGS BLINDLY, WHICH
+    INCLUDES url AND headers
+    """
+    global _warning_sent
+    if not default_headers and not _warning_sent:
+        _warning_sent = True
+        Log.warning(
+            "The pyLibrary.env.http module was meant to add extra "
+            "default headers to all requests, specifically the 'Referer' "
+            "header with a URL to the project. Use the `pyLibrary.debugs.constants.set()` "
+            "function to set `pyLibrary.env.http.default_headers`"
+        )
+
+    if isinstance(url, list):
+        # TRY MANY URLS
+        failures = []
+        for remaining, u in qb.countdown(url):
+            try:
+                response = request(method, u, zip=zip, retry=retry, **kwargs)
+                if Math.round(response.status_code, decimal=-2) not in [400, 500]:
+                    return response
+                if not remaining:
+                    return response
+            except Exception, e:
+                e = Except.wrap(e)
+                failures.append(e)
+        Log.error("Tried {{num}} urls", num=len(url), cause=failures)
+
+    session = sessions.Session()
+    session.headers.update(default_headers)
+
+    if zip is None:
+        zip = ZIP_REQUEST
+
+    if isinstance(url, unicode):
+        # httplib.py WILL **FREAK OUT** IF IT SEES ANY UNICODE
+        url = url.encode("ascii")
+
+    _to_ascii_dict(kwargs)
+    timeout = kwargs[b'timeout'] = coalesce(kwargs.get(b'timeout'), default_timeout)
+
+    if retry is None:
+        retry = Dict(times=1, sleep=0)
+    elif isinstance(retry, Number):
+        retry = Dict(times=retry, sleep=SECOND)
+    else:
+        retry = wrap(retry)
+        set_default(retry, {"times": 1, "sleep": 0})
+
+    if b'json' in kwargs:
+        kwargs[b'data'] = convert.value2json(kwargs[b'json']).encode("utf8")
+        del kwargs[b'json']
+
+    try:
+        if zip and len(coalesce(kwargs.get(b"data"))) > 1000:
+            compressed = convert.bytes2zip(kwargs[b"data"])
+            if b"headers" not in kwargs:
+                kwargs[b"headers"] = {}
+            kwargs[b"headers"][b'content-encoding'] = b'gzip'
+            kwargs[b"data"] = compressed
+
+            _to_ascii_dict(kwargs[b"headers"])
+        else:
+            _to_ascii_dict(kwargs.get(b"headers"))
+    except Exception, e:
+        Log.error("Request setup failure on {{url}}", url=url, cause=e)
+
+    errors = []
+    for r in range(retry.times):
+        if r:
+            Thread.sleep(retry.sleep)
+
+        try:
+            return session.request(method=method, url=url, **kwargs)
+        except Exception, e:
+            errors.append(Except.wrap(e))
+
+    if " Read timed out." in errors[0]:
+        Log.error("Tried {{times}} times: Timeout failure (timeout was {{timeout}})", timeout=timeout, times=retry.times, cause=errors[0])
+    else:
+        Log.error("Tried {{times}} times: Request failure of {{url}}", url=url, times=retry.times, cause=errors[0])
+
+
+def _to_ascii_dict(headers):
+    if headers is None:
+        return
+    for k, v in copy(headers).items():
+        if isinstance(k, unicode):
+            del headers[k]
+            if isinstance(v, unicode):
+                headers[k.encode("ascii")] = v.encode("ascii")
+            else:
+                headers[k.encode("ascii")] = v
+        elif isinstance(v, unicode):
+            headers[k] = v.encode("ascii")
+
+
+def get(url, **kwargs):
+    kwargs.setdefault(b'allow_redirects', True)
+    kwargs[b"stream"] = True
+    return HttpResponse(request(b'get', url, **kwargs))
+
+
+def get_json(url, **kwargs):
+    """
+    ASSUME RESPONSE IS JSON
+    """
+    response = get(url, **kwargs)
+    c = response.all_content
+    return convert.json2value(convert.utf82unicode(c))
+
+
+def options(url, **kwargs):
+    kwargs.setdefault(b'allow_redirects', True)
+    kwargs[b"stream"] = True
+    return HttpResponse(request(b'options', url, **kwargs))
+
+
+def head(url, **kwargs):
+    kwargs.setdefault(b'allow_redirects', False)
+    kwargs[b"stream"] = True
+    return HttpResponse(request(b'head', url, **kwargs))
+
+
+def post(url, **kwargs):
+    kwargs[b"stream"] = True
+    return HttpResponse(request(b'post', url, **kwargs))
+
+
+def post_json(url, **kwargs):
+    """
+    ASSUME RESPONSE IS JSON
+    """
+    kwargs["data"] = convert.unicode2utf8(convert.value2json(kwargs["data"]))
+
+    response = post(url, **kwargs)
+    c = response.content
+    return convert.json2value(convert.utf82unicode(c))
+
+
+def put(url, **kwargs):
+    return HttpResponse(request(b'put', url, **kwargs))
+
+
+def patch(url, **kwargs):
+    kwargs[b"stream"] = True
+    return HttpResponse(request(b'patch', url, **kwargs))
+
+
+def delete(url, **kwargs):
+    kwargs[b"stream"] = True
+    return HttpResponse(request(b'delete', url, **kwargs))
+
+
+class HttpResponse(Response):
+    def __new__(cls, resp):
+        resp.__class__ = HttpResponse
+        return resp
+
+    def __init__(self, resp):
+        self._cached_content = None
+
+    @property
+    def all_content(self):
+        # response.content WILL LEAK MEMORY (?BECAUSE OF PYPY'S POOR HANDLING OF GENERATORS?)
+        # THE TIGHT, SIMPLE, LOOP TO FILL blocks PREVENTS THAT LEAK
+        if self._content is not False:
+            self._cached_content = self._content
+        elif self._cached_content is None:
+            def read(size):
+                if self.raw._fp.fp is not None:
+                    return self.raw.read(amt=size, decode_content=True)
+                else:
+                    self.close()
+                    return None
+
+            self._cached_content = safe_size(Dict(read=read))
+
+        if hasattr(self._cached_content, "read"):
+            self._cached_content.seek(0)
+
+        return self._cached_content
+
+    @property
+    def all_lines(self):
+        return self._all_lines()
+
+    def _all_lines(self, encoding="utf8"):
+        try:
+            content = self.raw.read(decode_content=False)
+            if self.headers.get('content-encoding') == 'gzip':
+                return CompressedLines(content, encoding=encoding)
+            elif self.headers.get('content-type') == 'application/zip':
+                return ZipfileLines(content, encoding=encoding)
+            elif self.url.endswith(".gz"):
+                return GzipLines(content, encoding)
+            else:
+                return content.decode(encoding).split("\n")
+        except Exception, e:
+            Log.error("Can not read content", cause=e)
+        finally:
+            self.close()
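In practice the module is used like `requests`, with `default_headers` filled in once at startup and `retry` passed per call; a small sketch (the URL and header value are illustrative, and setting the module attribute directly is an alternative to the `debug.constants` settings fragment shown in the header comment):

    from pyLibrary.env import http

    http.default_headers["Referer"] = "https://github.com/klahnakoski/MoDataSubmission"   # hypothetical project URL

    # retry is a {"times": x, "sleep": y} structure, per the request() docstring
    result = http.get_json("http://example.com/data.json", retry={"times": 3, "sleep": 1})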
diff --git a/pyLibrary/env/mozlog.py b/pyLibrary/env/mozlog.py
new file mode 100644
index 0000000..d21c5a7
--- /dev/null
+++ b/pyLibrary/env/mozlog.py
@@ -0,0 +1,53 @@
+from pyLibrary.debugs.logs import Log
+from pyLibrary.strings import expand_template
+
+_using_mozlog = False
+
+
+def use():
+    if _using_mozlog:
+        return
+
+    globals()["_using_mozlog"] = True
+    try:
+        from mozlog.structured import structuredlog
+
+        global logger
+        logger = structuredlog.get_default_logger()
+        ToMozLog.logger = logger
+        ToMozLog.old_class = Log
+        globals()["Log"] = ToMozLog
+    except:
+        pass
+
+
+class ToMozLog(object):
+    """
+    MAP CALLS pyLibrary.debugs.logs.Log TO mozlog.structured.structuredlog.StructuredLogger
+    """
+    logger = None
+    old_class = None
+
+    @classmethod
+    def debug(cls, template=None, params=None):
+        cls.logger.debug(expand_template(template, params))
+
+    @classmethod
+    def println(cls, template, params=None):
+        cls.logger.debug(expand_template(template, params))
+
+    @classmethod
+    def note(cls, template, params=None, stack_depth=0):
+        cls.logger.debug(expand_template(template, params))
+
+    @classmethod
+    def unexpected(cls, template, params=None, cause=None):
+        cls.logger.error(expand_template(template, params))
+
+    @classmethod
+    def warning(cls, template, params=None, *args, **kwargs):
+        cls.logger.warn(expand_template(template, params))
+
+    @classmethod
+    def error(cls, template, params=None, cause=None, stack_depth=0):
+        cls.logger.error(expand_template(template, params))
+        cls.old_class.error(template, params, cause, stack_depth)
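The switch-over is a single call; note that `use()` only rebinds the `Log` name inside this module and silently does nothing when `mozlog` is not importable:

    from pyLibrary.env import mozlog

    mozlog.use()    # ToMozLog now forwards Log-style calls to the default StructuredLogger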
diff --git a/pyLibrary/env/pulse.py b/pyLibrary/env/pulse.py
new file mode 100644
index 0000000..b69cdc5
--- /dev/null
+++ b/pyLibrary/env/pulse.py
@@ -0,0 +1,217 @@
+# encoding: utf-8
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+#
+# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+
+import datetime
+from socket import timeout as socket_timeout
+
+from kombu import Connection, Producer, Exchange
+from pytz import timezone
+from mozillapulse.utils import time_to_string
+
+from pyLibrary.debugs import constants
+from pyLibrary import jsons
+from pyLibrary.debugs.exceptions import Except
+from pyLibrary.debugs.logs import Log
+from pyLibrary.dot import wrap, coalesce, Dict, set_default
+from pyLibrary.meta import use_settings
+from pyLibrary.thread.threads import Thread
+from mozillapulse.consumers import GenericConsumer
+
+
+class Consumer(Thread):
+    @use_settings
+    def __init__(
+        self,
+        exchange,  # name of the Pulse exchange
+        topic,  # message name pattern to subscribe to ('#' is wildcard)
+        target=None,  # WILL BE CALLED WITH PULSE PAYLOADS AND ack() IF COMPLETED WITHOUT EXCEPTION
+        target_queue=None,  # (aka self.queue) WILL BE FILLED WITH PULSE PAYLOADS
+        host='pulse.mozilla.org',  # url to connect,
+        port=5671,  # tcp port
+        user=None,
+        password=None,
+        vhost="/",
+        start=0,  # USED AS STARTING POINT FOR ASSIGNING THE _meta.count ATTRIBUTE
+        ssl=True,
+        applabel=None,
+        heartbeat=False,  # True to also get the Pulse heartbeat message
+        durable=False,  # True to keep queue after shutdown
+        serializer='json',
+        broker_timezone='GMT',
+        settings=None
+    ):
+        self.target_queue = target_queue
+        self.pulse_target = target
+        if (target_queue == None and target == None) or (target_queue != None and target != None):
+            Log.error("Expecting a queue (for fast digesters) or a target (for slow digesters)")
+
+        Thread.__init__(self, name="Pulse consumer for " + settings.exchange, target=self._worker)
+        self.settings = settings
+        settings.callback = self._got_result
+        settings.user = coalesce(settings.user, settings.username)
+        settings.applabel = coalesce(settings.applabel, settings.queue, settings.queue_name)
+        settings.topic = topic
+
+        self.pulse = ModifiedGenericConsumer(settings, connect=True, **settings)
+        self.count = coalesce(start, 0)
+        self.start()
+
+    def _got_result(self, data, message):
+        data = wrap(data)
+        data._meta.count = self.count
+        self.count += 1
+
+        if self.settings.debug:
+            Log.note("{{data}}", data=data)
+        if self.target_queue != None:
+            try:
+                self.target_queue.add(data)
+                message.ack()
+            except Exception, e:
+                e = Except.wrap(e)
+                if not self.target_queue.closed:  # EXPECTED TO HAPPEN, THIS THREAD MAY HAVE BEEN AWAY FOR A WHILE
+                    raise e
+        else:
+            try:
+                self.pulse_target(data)
+                message.ack()
+            except Exception, e:
+                Log.warning("Problem processing pulse (see `data` in structured log)", data=data, cause=e)
+
+    def _worker(self, please_stop):
+        def disconnect():
+            try:
+                self.target_queue.close()
+                Log.note("stop put into queue")
+            except:
+                pass
+
+            self.pulse.disconnect()
+            Log.note("pulse listener was given a disconnect()")
+
+        please_stop.on_go(disconnect)
+
+        while not please_stop:
+            try:
+                self.pulse.listen()
+            except Exception, e:
+                if not please_stop:
+                    Log.warning("pulse had problem", cause=e)
+        Log.note("pulse listener is done")
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        Log.note("clean pulse exit")
+        self.please_stop.go()
+        try:
+            self.target_queue.close()
+            Log.note("stop put into queue")
+        except:
+            pass
+
+        try:
+            self.pulse.disconnect()
+        except Exception, e:
+            Log.warning("Can not disconnect during pulse exit, ignoring", cause=e)
+        Thread.__exit__(self, exc_type, exc_val, exc_tb)
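+# EDITORIAL NOTE (usage sketch): a Consumer takes EITHER target_queue (fast digesters poll the
+# queue) OR target (a callable invoked per payload), never both; the exchange, topic, and
+# credentials below are illustrative only:
+#
+#     consumer = Consumer(
+#         exchange="exchange/build",          # hypothetical exchange
+#         topic="#",
+#         target=lambda payload: Log.note("got message {{count}}", count=payload._meta.count),
+#         user="guest",
+#         password="guest"
+#     )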
+
+
+class Publisher(object):
+    """
+    Mimic GenericPublisher https://github.com/bhearsum/mozillapulse/blob/master/mozillapulse/publishers.py
+    """
+
+    @use_settings
+    def __init__(
+        self,
+        exchange,  # name of the Pulse exchange
+        host='pulse.mozilla.org',  # url to connect,
+        port=5671,  # tcp port
+        user=None,
+        password=None,
+        vhost="/",
+        start=0,  # USED AS STARTING POINT FOR ASSIGNING THE _meta.count ATTRIBUTE
+        ssl=True,
+        applabel=None,
+        heartbeat=False,  # True to also get the Pulse heartbeat message
+        durable=False,  # True to keep queue after shutdown
+        serializer='json',
+        broker_timezone='GMT',
+        settings=None
+    ):
+        self.settings = settings
+        self.connection = None
+        self.count = 0
+
+    def connect(self):
+        if not self.connection:
+            self.connection = Connection(
+                hostname=self.settings.host,
+                port=self.settings.port,
+                userid=self.settings.user,
+                password=self.settings.password,
+                virtual_host=self.settings.vhost,
+                ssl=self.settings.ssl
+            )
+
+    def disconnect(self):
+        if self.connection:
+            self.connection.release()
+            self.connection = None
+
+    def send(self, topic, message):
+        """Publishes a pulse message to the proper exchange."""
+
+        if not message:
+            Log.error("Expecting a message")
+
+        message._prepare()
+
+        if not self.connection:
+            self.connect()
+
+        producer = Producer(
+            channel=self.connection,
+            exchange=Exchange(self.settings.exchange, type='topic'),
+            routing_key=topic
+        )
+
+        # The message is actually a simple envelope format with a payload and
+        # some metadata.
+        final_data = Dict(
+            payload=message.data,
+            _meta=set_default({
+                'exchange': self.settings.exchange,
+                'routing_key': message.routing_key,
+                'serializer': self.settings.serializer,
+                'sent': time_to_string(datetime.datetime.now(timezone(self.settings.broker_timezone))),
+                'count': self.count
+            }, message.metadata)
+        )
+
+        producer.publish(jsons.scrub(final_data), serializer=self.settings.serializer)
+        self.count += 1
+
+
+class ModifiedGenericConsumer(GenericConsumer):
+    def _drain_events_loop(self):
+        while True:
+            try:
+                self.connection.drain_events(timeout=self.timeout)
+            except socket_timeout, e:
+                Log.warning("timeout! Restarting pulse consumer.", cause=e)
+                try:
+                    self.disconnect()
+                except Exception, f:
+                    Log.warning("Problem with disconnect()", cause=f)
+                break