fix ignore file to include more

This commit is contained in:
Kyle Lahnakoski 2016-01-09 08:06:24 -05:00
Parent bb789c4b8b
Commit 319771f10b
11 changed files: 2591 additions and 53 deletions

53
.gitignore vendored

@@ -2,59 +2,6 @@
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
/.idea
/MoDataSubmission.iml
/pyLibrary/.svn

64
pyLibrary/env/README.md vendored Normal file

@@ -0,0 +1,64 @@
Environment
===========
This directory is for connecting to other systems. Generally, these
classes are facades that assume content is UTF-8 encoded JSON.
files
-----
The `File` class assumes, by default, that all files contain newline-delimited
unicode content that is UTF-8 encoded. This is great for JSON files.
It also provides a more object-oriented interface to common file manipulations.
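A minimal sketch of typical use (method names are those defined in
`files.py` in this commit):

    from pyLibrary.env.files import File

    config = File("settings.json").read_json()  # JSON, with references expanded
    log = File("output/test.log")
    log.write(u"starting\n")                    # unicode in, UTF-8 bytes on disk
    for line in log:                            # iterate lines as unicode
        print line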
emailer
-------
A simple emailer, the primary purpose is to accept a [Dict](../dot/README.md)
of settings.
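A minimal sketch, with placeholder values (`Emailer` is a context manager
that opens the SMTP connection on entry):

    from pyLibrary.dot import wrap
    from pyLibrary.env.emailer import Emailer

    settings = wrap({
        "from_address": "me@example.com",
        "to_address": "you@example.com",
        "host": "smtp.example.com",
        "username": "me",
        "password": "secret"
    })
    with Emailer(settings=settings) as emailer:
        emailer.send_email(subject="test", text_data="hello")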
pulse
-----
For connecting clients to [Mozilla's Pulse](https://pulse.mozilla.org/).
elasticsearch
-------------
This module handles the lifecycle of an Elasticsearch index in the context of
ETL. You only need this module if you are creating and retiring indexes. You
do not need this module for simply searching; for that, I suggest using the
REST API directly.
### Settings

Both `Cluster` and `Index` objects accept the same settings dict, each
selecting only the properties it requires:
{
"host" : "http://192.168.0.98",
"port" : 9200,
"index" : "b2g_tests",
"type" : "test_result",
"debug" : true,
"limit_replicas" : true,
"schema_file" : "./resources/schema/test_schema.json"
}
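A sketch of how such settings are consumed during ETL. The diff for
`elasticsearch.py` is suppressed below, so `get_or_create_index` and the
`{"value": ...}` record shape are assumptions based on this module's
conventions; check `elasticsearch.py` for the exact API:

    from pyLibrary.dot import wrap
    from pyLibrary.env import elasticsearch

    settings = wrap({"host": "http://192.168.0.98", "index": "b2g_tests"})
    cluster = elasticsearch.Cluster(settings=settings)
    index = cluster.get_or_create_index(settings=settings)
    index.add({"value": {"test": "example", "ok": True}})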
Cluster
-------
Index
-----

0
pyLibrary/env/__init__.py vendored Normal file

311
pyLibrary/env/big_data.py vendored Normal file

@@ -0,0 +1,311 @@
# encoding: utf-8
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
import gzip
from io import BytesIO
from tempfile import TemporaryFile
import zipfile
import zlib
from pyLibrary.debugs.logs import Log
from pyLibrary.maths import Math
# LIBRARY TO DEAL WITH BIG DATA ARRAYS AS ITERATORS OVER (IR)REGULAR SIZED
# BLOCKS, OR AS ITERATORS OVER LINES
MIN_READ_SIZE = 8 * 1024
MAX_STRING_SIZE = 1 * 1024 * 1024
class FileString(object):
"""
ACTS LIKE A STRING, BUT IS A FILE
"""
def __init__(self, file):
    self.file = file
    self.encoding = "utf8"  # DEFAULT, SO SLICING WORKS EVEN IF decode() IS NEVER CALLED
def decode(self, encoding):
    if encoding != "utf8":
        Log.error("can not handle {{encoding}}", encoding=encoding)
    self.encoding = encoding
    return self
def split(self, sep):
if sep != "\n":
Log.error("Can only split by lines")
self.file.seek(0)
return LazyLines(self.file)
def __len__(self):
temp = self.file.tell()
self.file.seek(0, 2)
file_length = self.file.tell()
self.file.seek(temp)
return file_length
def __getslice__(self, i, j):
self.file.seek(i)
output = self.file.read(j - i).decode(self.encoding)
return output
def __add__(self, other):
    self.file.seek(0, 2)
    self.file.write(other)
    return self
def __radd__(self, other):
new_file = TemporaryFile()
new_file.write(other)
self.file.seek(0)
for l in self.file:
new_file.write(l)
new_file.seek(0)
return FileString(new_file)
def __getattr__(self, attr):
return getattr(self.file, attr)
def __del__(self):
self.file, temp = None, self.file
if temp:
temp.close()
def __iter__(self):
self.file.seek(0)
return self.file
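def _example_filestring():
    # ILLUSTRATIVE SKETCH, NOT CALLED BY LIBRARY CODE: FileString STANDS IN
    # FOR A str WHEN THE CONTENT IS TOO BIG TO KEEP IN MEMORY
    temp = TemporaryFile()
    temp.write(b"line 1\nline 2\n")
    big = FileString(temp)
    assert len(big) == 14          # MEASURED BY SEEKING TO THE END OF THE FILE
    for line in big.split("\n"):   # RETURNS A LazyLines ITERATOR OF unicode
        pass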
def safe_size(source):
"""
READ THE source UP TO SOME LIMIT, THEN COPY TO A FILE IF TOO BIG
RETURN A str() OR A FileString()
"""
if source is None:
return None
total_bytes = 0
bytes = []
b = source.read(MIN_READ_SIZE)
while b:
total_bytes += len(b)
bytes.append(b)
if total_bytes > MAX_STRING_SIZE:
try:
data = FileString(TemporaryFile())
for bb in bytes:
data.write(bb)
del bytes
del bb
b = source.read(MIN_READ_SIZE)
while b:
total_bytes += len(b)
data.write(b)
b = source.read(MIN_READ_SIZE)
data.seek(0)
Log.note("Using file of size {{length}} instead of str()", length= total_bytes)
return data
except Exception, e:
Log.error("Could not write file > {{num}} bytes", num= total_bytes, cause=e)
b = source.read(MIN_READ_SIZE)
data = b"".join(bytes)
del bytes
return data
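def _example_safe_size():
    # ILLUSTRATIVE SKETCH, NOT CALLED BY LIBRARY CODE: SMALL SOURCES COME
    # BACK AS str; SOURCES OVER MAX_STRING_SIZE ARE SPOOLED TO A
    # TemporaryFile AND RETURNED AS A FileString
    source = BytesIO(b"x" * 100)
    content = safe_size(source)    # A str HERE, SINCE 100 < MAX_STRING_SIZE
    return content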
class LazyLines(object):
"""
SIMPLE LINE ITERATOR, BUT WITH A BIT OF CACHING TO LOOK LIKE AN ARRAY
"""
def __init__(self, source, encoding="utf8"):
"""
ASSUME source IS A LINE ITERATOR OVER utf8 ENCODED BYTE STREAM
"""
self.source = source
self.encoding = encoding
self._iter = self.__iter__()
self._last = None
self._next = 0
def __getslice__(self, i, j):
if i == self._next:
return self._iter
Log.error("Do not know how to slice this generator")
def __iter__(self):
def output(encoding):
for v in self.source:
if not encoding:
self._last = v
else:
self._last = v.decode(encoding)
self._next += 1
yield self._last
return output(self.encoding)
def __getitem__(self, item):
try:
if item == self._next:
return self._iter.next()
elif item == self._next - 1:
return self._last
else:
Log.error("can not index out-of-order too much")
except Exception, e:
Log.error("Problem indexing", e)
class CompressedLines(LazyLines):
"""
KEEP COMPRESSED HTTP (content-type: gzip) IN BYTES ARRAY
WHILE PULLING OUT ONE LINE AT A TIME FOR PROCESSING
"""
def __init__(self, compressed, encoding="utf8"):
"""
USED compressed BYTES TO DELIVER LINES OF TEXT
LIKE LazyLines, BUT HAS POTENTIAL TO seek()
"""
self.compressed = compressed
LazyLines.__init__(self, None, encoding=encoding)
self._iter = self.__iter__()
def __iter__(self):
return LazyLines(ibytes2ilines(compressed_bytes2ibytes(self.compressed, MIN_READ_SIZE)), self.encoding).__iter__()
def __getslice__(self, i, j):
if i == self._next:
return self._iter
if i == 0:
return self.__iter__()
if i == self._next - 1:
def output():
yield self._last
for v in self._iter:
yield v
return output()
Log.error("Do not know how to slice this generator")
def __getitem__(self, item):
try:
if item == self._next:
self._last = self._iter.next()
self._next += 1
return self._last
elif item == self._next - 1:
return self._last
else:
Log.error("can not index out-of-order too much")
except Exception, e:
Log.error("Problem indexing", e)
def __radd__(self, other):
    new_file = TemporaryFile()
    new_file.write(other)
    for l in self:
        new_file.write(l.encode(self.encoding))
    new_file.seek(0)
    return FileString(new_file)
def compressed_bytes2ibytes(compressed, size):
"""
CONVERT AN ARRAY OF BYTES TO A BYTE-BLOCK GENERATOR
USEFUL IN THE CASE WHEN WE WANT TO LIMIT HOW MUCH WE FEED ANOTHER
GENERATOR (LIKE A DECOMPRESSOR)
"""
decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
for i in range(0, Math.ceiling(len(compressed), size), size):
try:
block = compressed[i: i + size]
yield decompressor.decompress(block)
except Exception, e:
Log.error("Not expected", e)
def ibytes2ilines(stream):
    """
    CONVERT A GENERATOR OF (ARBITRARY-SIZED) byte BLOCKS
    TO A LINE ("\n"-DELIMITED) GENERATOR
    """
_buffer = stream.next()
s = 0
e = _buffer.find(b"\n")
while True:
while e == -1:
try:
next_block = stream.next()
_buffer = _buffer[s:] + next_block
s = 0
e = _buffer.find(b"\n")
except StopIteration:
_buffer = _buffer[s:]
del stream
yield _buffer
return
yield _buffer[s:e]
s = e + 1
e = _buffer.find(b"\n", s)
def sbytes2ilines(stream):
    """
    CONVERT A STREAM OF (ARBITRARY-SIZED) byte BLOCKS
    TO A LINE ("\n"-DELIMITED) GENERATOR
    """
    def read():
        # YIELD BLOCKS UNTIL THE STREAM IS EXHAUSTED; ibytes2ilines EXPECTS
        # A GENERATOR, NOT AN OBJECT WITH A read() METHOD
        while True:
            output = stream.read(MIN_READ_SIZE)
            if not output:
                return
            yield output
    return ibytes2ilines(read())
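def _example_ilines():
    # ILLUSTRATIVE SKETCH, NOT CALLED BY LIBRARY CODE: TURN gzip BYTES INTO
    # A LINE ITERATOR WITHOUT HOLDING ALL THE DECOMPRESSED TEXT IN MEMORY
    buff = BytesIO()
    archive = gzip.GzipFile(fileobj=buff, mode='w')
    archive.write(b"line 1\nline 2\n")
    archive.close()  # FLUSHES THE gzip TRAILER
    for line in CompressedLines(buff.getvalue()):
        pass  # EACH line IS unicode, WITH THE TRAILING "\n" STRIPPED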
class GzipLines(CompressedLines):
"""
SAME AS CompressedLines, BUT USING THE GzipFile FORMAT FOR COMPRESSED BYTES
"""
def __init__(self, compressed, encoding="utf8"):
CompressedLines.__init__(self, compressed, encoding=encoding)
def __iter__(self):
buff = BytesIO(self.compressed)
return LazyLines(gzip.GzipFile(fileobj=buff, mode='r'), encoding=self.encoding).__iter__()
class ZipfileLines(CompressedLines):
"""
SAME AS CompressedLines, BUT USING THE ZipFile FORMAT FOR COMPRESSED BYTES
"""
def __init__(self, compressed, encoding="utf8"):
CompressedLines.__init__(self, compressed, encoding=encoding)
def __iter__(self):
buff = BytesIO(self.compressed)
archive = zipfile.ZipFile(buff, mode='r')
names = archive.namelist()
if len(names) != 1:
Log.error("*.zip file has {{num}} files, expecting only one.", num= len(names))
stream = archive.open(names[0], "r")
return LazyLines(sbytes2ilines(stream), encoding=self.encoding).__iter__()

1154
pyLibrary/env/elasticsearch.py vendored Normal file

File diff suppressed because it is too large.

137
pyLibrary/env/emailer.py vendored Normal file

@@ -0,0 +1,137 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
import smtplib
import sys
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pyLibrary.debugs.logs import Log
from pyLibrary.dot import listwrap
from pyLibrary.dot import coalesce
from pyLibrary.meta import use_settings
class Emailer:
@use_settings
def __init__(
self,
from_address,
to_address,
host,
username,
password,
subject="catchy title",
port=465,
use_ssl=1,
settings=None
):
self.settings = settings
self.server = None
def __enter__(self):
if self.server is not None:
Log.error("Got a problem")
if self.settings.use_ssl:
self.server = smtplib.SMTP_SSL(self.settings.host, self.settings.port)
else:
self.server = smtplib.SMTP(self.settings.host, self.settings.port)
if self.settings.username and self.settings.password:
self.server.login(self.settings.username, self.settings.password)
return self
def __exit__(self, type, value, traceback):
try:
self.server.quit()
except Exception, e:
Log.warning("Problem with smtp server quit(), ignoring problem", e)
self.server = None
def send_email(self,
from_address=None,
to_address=None,
subject=None,
text_data=None,
html_data=None
):
"""Sends an email.
from_addr is an email address; to_addrs is a list of email adresses.
Addresses can be plain (e.g. "jsmith@example.com") or with real names
(e.g. "John Smith <jsmith@example.com>").
text_data and html_data are both strings. You can specify one or both.
If you specify both, the email will be sent as a MIME multipart
alternative, i.e., the recipient will see the HTML content if his
viewer supports it; otherwise he'll see the text content.
"""
settings = self.settings
from_address = coalesce(from_address, settings["from"], settings.from_address)
to_address = listwrap(coalesce(to_address, settings.to_address, settings.to_addrs))
if not from_address or not to_address:
raise Exception("Both from_addr and to_addrs must be specified")
if not text_data and not html_data:
raise Exception("Must specify either text_data or html_data")
if not html_data:
msg = MIMEText(text_data)
elif not text_data:
msg = MIMEText(html_data, 'html')
else:
msg = MIMEMultipart('alternative')
msg.attach(MIMEText(text_data, 'plain'))
msg.attach(MIMEText(html_data, 'html'))
msg['Subject'] = coalesce(subject, settings.subject)
msg['From'] = from_address
msg['To'] = ', '.join(to_address)
if self.server:
# CALL AS PART OF A SMTP SESSION
self.server.sendmail(from_address, to_address, msg.as_string())
else:
# CALL AS STAND-ALONE
with self:
self.server.sendmail(from_address, to_address, msg.as_string())
if sys.hexversion < 0x020603f0:
# versions earlier than 2.6.3 have a bug in smtplib when sending over SSL:
# http://bugs.python.org/issue4066
# Unfortunately the stock version of Python in Snow Leopard is 2.6.1, so
# we patch it here to avoid having to install an updated Python version.
import socket
import ssl
def _get_socket_fixed(self, host, port, timeout):
    if self.debuglevel > 0:
        print >> sys.stderr, 'connect:', (host, port)
new_socket = socket.create_connection((host, port), timeout)
new_socket = ssl.wrap_socket(new_socket, self.keyfile, self.certfile)
self.file = smtplib.SSLFakeFile(new_socket)
return new_socket
smtplib.SMTP_SSL._get_socket = _get_socket_fixed

331
pyLibrary/env/files.py vendored Normal file

@@ -0,0 +1,331 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from datetime import datetime
import io
import os
import shutil
from pyLibrary.strings import utf82unicode
from pyLibrary.maths import crypto
from pyLibrary.dot import coalesce, set_default, split_field, join_field
from pyLibrary.dot import listwrap, wrap
from pyLibrary import convert
class File(object):
"""
ASSUMES ALL FILE CONTENT IS UTF8 ENCODED STRINGS
"""
def __init__(self, filename, buffering=2 ** 14, suffix=None):
"""
YOU MAY SET filename TO {"path":p, "key":k} FOR CRYPTO FILES
"""
if filename == None:
from pyLibrary.debugs.logs import Log
Log.error("File must be given a filename")
elif isinstance(filename, basestring):
self.key = None
self._filename = "/".join(filename.split(os.sep)) # USE UNIX STANDARD
else:
self.key = convert.base642bytearray(filename.key)
self._filename = "/".join(filename.path.split(os.sep)) # USE UNIX STANDARD
while self._filename.find(".../") >= 0:
# LET ... REFER TO GRANDPARENT, .... REFER TO GREAT-GRAND-PARENT, etc...
self._filename = self._filename.replace(".../", "../../")
self.buffering = buffering
if suffix:
self._filename = File.add_suffix(self._filename, suffix)
@classmethod
def new_instance(cls, *path):
def scrub(i, p):
if isinstance(p, File):
p = p.abspath
p = p.replace(os.sep, "/")
if p[-1] == '/':
p = p[:-1]
if i > 0 and p[0] == '/':
p = p[1:]
return p
return File('/'.join(scrub(i, p) for i, p in enumerate(path)))
@property
def filename(self):
return self._filename.replace("/", os.sep)
@property
def abspath(self):
if self._filename.startswith("~"):
home_path = os.path.expanduser("~")
if os.sep == "\\":
home_path = home_path.replace(os.sep, "/")
if home_path.endswith("/"):
home_path = home_path[:-1]
return home_path + self._filename[1::]
else:
if os.sep == "\\":
return os.path.abspath(self._filename).replace(os.sep, "/")
else:
return os.path.abspath(self._filename)
@staticmethod
def add_suffix(filename, suffix):
"""
ADD suffix TO THE filename (NOT INCLUDING THE FILE EXTENSION)
"""
path = filename.split("/")
parts = path[-1].split(".")
i = max(len(parts) - 2, 0)
parts[i] = parts[i] + suffix
path[-1] = ".".join(parts)
return "/".join(path)
@property
def extension(self):
parts = self._filename.split("/")[-1].split(".")
if len(parts) == 1:
return ""
else:
return parts[-1]
@property
def name(self):
parts = self._filename.split("/")[-1].split(".")
if len(parts) == 1:
return parts[0]
else:
return ".".join(parts[0:-1])
def set_extension(self, ext):
"""
RETURN NEW FILE WITH GIVEN EXTENSION
"""
path = self._filename.split("/")
parts = path[-1].split(".")
if len(parts) == 1:
parts.append(ext)
else:
parts[-1] = ext
path[-1] = ".".join(parts)
return File("/".join(path))
def set_name(self, name):
    """
    RETURN NEW FILE WITH GIVEN NAME (EXTENSION UNCHANGED)
    """
path = self._filename.split("/")
parts = path[-1].split(".")
if len(parts) == 1:
path[-1] = name
else:
path[-1] = name + "." + parts[-1]
return File("/".join(path))
def backup_name(self, timestamp=None):
"""
RETURN A FILENAME THAT CAN SERVE AS A BACKUP FOR THIS FILE
"""
suffix = convert.datetime2string(coalesce(timestamp, datetime.now()), "%Y%m%d_%H%M%S")
return File.add_suffix(self._filename, suffix)
def read(self, encoding="utf8"):
with open(self._filename, "rb") as f:
content = f.read().decode(encoding)
if self.key:
return crypto.decrypt(content, self.key)
else:
return content
def read_json(self, encoding="utf8"):
from pyLibrary.jsons import ref
content = self.read(encoding=encoding)
value = convert.json2value(content, flexible=True, leaves=True)
abspath = self.abspath
if os.sep == "\\":
abspath = "/" + abspath.replace(os.sep, "/")
return ref.expand(value, "file://" + abspath)
def is_directory(self):
return os.path.isdir(self._filename)
def read_bytes(self):
try:
if not self.parent.exists:
self.parent.create()
with open(self._filename, "rb") as f:
return f.read()
except Exception, e:
    from pyLibrary.debugs.logs import Log
    Log.error("Problem reading file {{filename}}", filename=self.abspath, cause=e)
def write_bytes(self, content):
if not self.parent.exists:
self.parent.create()
with open(self._filename, "wb") as f:
f.write(content)
def write(self, data):
if not self.parent.exists:
self.parent.create()
with open(self._filename, "wb") as f:
if isinstance(data, list) and self.key:
from pyLibrary.debugs.logs import Log
Log.error("list of data and keys are not supported, encrypt before sending to file")
if isinstance(data, list):
pass
elif isinstance(data, basestring):
data = [data]
elif hasattr(data, "__iter__"):
pass
for d in data:
if not isinstance(d, unicode):
from pyLibrary.debugs.logs import Log
Log.error("Expecting unicode data only")
if self.key:
f.write(crypto.encrypt(d, self.key).encode("utf8"))
else:
f.write(d.encode("utf8"))
def __iter__(self):
# NOT SURE HOW TO MAXIMIZE FILE READ SPEED
# http://stackoverflow.com/questions/8009882/how-to-read-large-file-line-by-line-in-python
# http://effbot.org/zone/wide-finder.htm
def output():
try:
path = self._filename
if path.startswith("~"):
home_path = os.path.expanduser("~")
path = home_path + path[1::]
with io.open(path, "rb") as f:
for line in f:
yield utf82unicode(line)
except Exception, e:
from pyLibrary.debugs.logs import Log
Log.error("Can not read line from {{filename}}", filename= self._filename, cause=e)
return output()
def append(self, content):
"""
add a line to file
"""
if not self.parent.exists:
self.parent.create()
with open(self._filename, "ab") as output_file:
if isinstance(content, str):
from pyLibrary.debugs.logs import Log
Log.error("expecting to write unicode only")
output_file.write(content.encode("utf-8"))
output_file.write(b"\n")
def add(self, content):
return self.append(content)
def extend(self, content):
try:
if not self.parent.exists:
self.parent.create()
with open(self._filename, "ab") as output_file:
for c in content:
if isinstance(c, str):
from pyLibrary.debugs.logs import Log
Log.error("expecting to write unicode only")
output_file.write(c.encode("utf-8"))
output_file.write(b"\n")
except Exception, e:
from pyLibrary.debugs.logs import Log
Log.error("Could not write to file", e)
def delete(self):
try:
if os.path.isdir(self._filename):
shutil.rmtree(self._filename)
elif os.path.isfile(self._filename):
os.remove(self._filename)
return self
except Exception, e:
if e.strerror == "The system cannot find the path specified":
return
from pyLibrary.debugs.logs import Log
Log.error("Could not remove file", e)
def backup(self):
    """
    COPY THIS FILE TO ONE WITH A TIMESTAMP SUFFIX, RETURN THE BACKUP
    """
    backup = File(self.backup_name())
    File.copy(self, backup)
    return backup
def create(self):
try:
os.makedirs(self._filename)
except Exception, e:
from pyLibrary.debugs.logs import Log
Log.error("Could not make directory {{dir_name}}", dir_name= self._filename, cause=e)
@property
def children(self):
return [File(self._filename + "/" + c) for c in os.listdir(self.filename)]
@property
def parent(self):
return File("/".join(self._filename.split("/")[:-1]))
@property
def exists(self):
if self._filename in ["", "."]:
return True
try:
return os.path.exists(self._filename)
except Exception, e:
return False
def __bool__(self):
return self.__nonzero__()
def __nonzero__(self):
"""
USED FOR FILE EXISTENCE TESTING
"""
if self._filename in ["", "."]:
return True
try:
return os.path.exists(self._filename)
except Exception, e:
return False
@classmethod
def copy(cls, from_, to_):
File.new_instance(to_).write_bytes(File.new_instance(from_).read_bytes())

60
pyLibrary/env/git.py vendored Normal file

@@ -0,0 +1,60 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from pyLibrary.meta import cache
from pyLibrary.thread.multiprocess import Process
@cache
def get_git_revision():
"""
GET THE CURRENT GIT REVISION
"""
proc = Process("git log", ["git", "log", "-1"])
try:
while True:
line = proc.stdout.pop().strip()
if not line:
continue
if line.startswith("commit "):
return line[7:]
finally:
try:
proc.join()
except Exception:
pass
@cache
def get_remote_revision(url, branch):
"""
GET REVISION OF A REMOTE BRANCH
"""
proc = Process("git remote revision", ["git", "ls-remote", url, "refs/heads/" + branch])
try:
while True:
line = proc.stdout.pop().strip()
if not line:
continue
return line.split("\t")[0]
finally:
try:
proc.join()
except Exception:
pass
return None
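def _example_revisions():
    # ILLUSTRATIVE SKETCH, URL IS A PLACEHOLDER: BOTH HELPERS RETURN THE
    # HEX REVISION STRING; RESULTS ARE @cache'D PER SET OF ARGUMENTS
    local = get_git_revision()
    remote = get_remote_revision("https://github.com/example/repo.git", "master")
    return local, remote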

264
pyLibrary/env/http.py vendored Normal file

@@ -0,0 +1,264 @@
# encoding: utf-8
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
# MIMICS THE requests API (http://docs.python-requests.org/en/latest/)
# DEMANDS data IS A JSON-SERIALIZABLE STRUCTURE
# WITH ADDED default_headers THAT CAN BE SET USING pyLibrary.debugs.settings
# EG
# {"debug.constants": {
#     "pyLibrary.env.http.default_headers": {
#         "From": "klahnakoski@mozilla.com"
#     }
# }}
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from copy import copy
from numbers import Number
from requests import sessions, Response
from pyLibrary import convert
from pyLibrary.debugs.exceptions import Except
from pyLibrary.debugs.logs import Log
from pyLibrary.dot import Dict, coalesce, wrap, set_default
from pyLibrary.env.big_data import safe_size, CompressedLines, ZipfileLines, GzipLines
from pyLibrary.maths import Math
from pyLibrary.queries import qb
from pyLibrary.thread.threads import Thread
from pyLibrary.times.durations import SECOND
FILE_SIZE_LIMIT = 100 * 1024 * 1024
MIN_READ_SIZE = 8 * 1024
ZIP_REQUEST = False
default_headers = Dict() # TODO: MAKE THIS VARIABLE A SPECIAL TYPE OF EXPECTED MODULE PARAMETER SO IT COMPLAINS IF NOT SET
default_timeout = 600
_warning_sent = False
def request(method, url, zip=None, retry=None, **kwargs):
"""
JUST LIKE requests.request() BUT WITH DEFAULT HEADERS AND FIXES
DEMANDS data IS ONE OF:
* A JSON-SERIALIZABLE STRUCTURE, OR
* LIST OF JSON-SERIALIZABLE STRUCTURES, OR
* None
Parameters
* zip - ZIP THE REQUEST BODY, IF BIG ENOUGH
* json - JSON-SERIALIZABLE STRUCTURE
* retry - {"times": x, "sleep": y} STRUCTURE
THE BYTE_STRINGS (b"") ARE NECESSARY TO PREVENT httplib.py FROM **FREAKING OUT**
IT APPEARS requests AND httplib.py SIMPLY CONCATENATE STRINGS BLINDLY, WHICH
INCLUDES url AND headers
"""
global _warning_sent
if not default_headers and not _warning_sent:
_warning_sent = True
Log.warning(
"The pyLibrary.env.http module was meant to add extra "
"default headers to all requests, specifically the 'Referer' "
"header with a URL to the project. Use the `pyLibrary.debug.constants.set()` "
"function to set `pyLibrary.env.http.default_headers`"
)
if isinstance(url, list):
# TRY MANY URLS
failures = []
for remaining, u in qb.countdown(url):
try:
response = request(method, u, zip=zip, retry=retry, **kwargs)
if Math.round(response.status_code, decimal=-2) not in [400, 500]:
return response
if not remaining:
return response
except Exception, e:
e = Except.wrap(e)
failures.append(e)
Log.error("Tried {{num}} urls", num=len(url), cause=failures)
session = sessions.Session()
session.headers.update(default_headers)
if zip is None:
zip = ZIP_REQUEST
if isinstance(url, unicode):
# httplib.py WILL **FREAK OUT** IF IT SEES ANY UNICODE
url = url.encode("ascii")
_to_ascii_dict(kwargs)
timeout = kwargs[b'timeout'] = coalesce(kwargs.get(b'timeout'), default_timeout)
if retry is None:
retry = Dict(times=1, sleep=0)
elif isinstance(retry, Number):
retry = Dict(times=retry, sleep=SECOND)
else:
retry = wrap(retry)
set_default(retry, {"times": 1, "sleep": 0})
if b'json' in kwargs:
kwargs[b'data'] = convert.value2json(kwargs[b'json']).encode("utf8")
del kwargs[b'json']
try:
if zip and len(coalesce(kwargs.get(b"data"))) > 1000:
compressed = convert.bytes2zip(kwargs[b"data"])
if b"headers" not in kwargs:
kwargs[b"headers"] = {}
kwargs[b"headers"][b'content-encoding'] = b'gzip'
kwargs[b"data"] = compressed
_to_ascii_dict(kwargs[b"headers"])
else:
_to_ascii_dict(kwargs.get(b"headers"))
except Exception, e:
Log.error("Request setup failure on {{url}}", url=url, cause=e)
errors = []
for r in range(retry.times):
if r:
Thread.sleep(retry.sleep)
try:
return session.request(method=method, url=url, **kwargs)
except Exception, e:
errors.append(Except.wrap(e))
if " Read timed out." in errors[0]:
Log.error("Tried {{times}} times: Timeout failure (timeout was {{timeout}}", timeout=timeout, times=retry.times, cause=errors[0])
else:
Log.error("Tried {{times}} times: Request failure of {{url}}", url=url, times=retry.times, cause=errors[0])
def _to_ascii_dict(headers):
if headers is None:
return
for k, v in copy(headers).items():
if isinstance(k, unicode):
del headers[k]
if isinstance(v, unicode):
headers[k.encode("ascii")] = v.encode("ascii")
else:
headers[k.encode("ascii")] = v
elif isinstance(v, unicode):
headers[k] = v.encode("ascii")
def get(url, **kwargs):
kwargs.setdefault(b'allow_redirects', True)
kwargs[b"stream"] = True
return HttpResponse(request(b'get', url, **kwargs))
def get_json(url, **kwargs):
"""
ASSUME RESPONSE IS IN JSON
"""
response = get(url, **kwargs)
c = response.all_content
return convert.json2value(convert.utf82unicode(c))
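def _example_get_json():
    # ILLUSTRATIVE SKETCH, URL IS A PLACEHOLDER: PULL THE WHOLE RESPONSE
    # (SPOOLED TO DISK IF HUGE) AND PARSE IT AS JSON, RETRYING ON FAILURE
    return get_json("http://example.com/api", retry={"times": 3, "sleep": 1})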
def options(url, **kwargs):
kwargs.setdefault(b'allow_redirects', True)
kwargs[b"stream"] = True
return HttpResponse(request(b'options', url, **kwargs))
def head(url, **kwargs):
kwargs.setdefault(b'allow_redirects', False)
kwargs[b"stream"] = True
return HttpResponse(request(b'head', url, **kwargs))
def post(url, **kwargs):
kwargs[b"stream"] = True
return HttpResponse(request(b'post', url, **kwargs))
def post_json(url, **kwargs):
"""
ASSUME RESPONSE IS IN JSON
"""
kwargs["data"] = convert.unicode2utf8(convert.value2json(kwargs["data"]))
response = post(url, **kwargs)
c = response.content
return convert.json2value(convert.utf82unicode(c))
def put(url, **kwargs):
return HttpResponse(request(b'put', url, **kwargs))
def patch(url, **kwargs):
kwargs[b"stream"] = True
return HttpResponse(request(b'patch', url, **kwargs))
def delete(url, **kwargs):
kwargs[b"stream"] = True
return HttpResponse(request(b'delete', url, **kwargs))
class HttpResponse(Response):
def __new__(cls, resp):
resp.__class__ = HttpResponse
return resp
def __init__(self, resp):
    self._cached_content = None
@property
def all_content(self):
# response.content WILL LEAK MEMORY (?BECAUSE OF PYPY'S POOR HANDLING OF GENERATORS?)
# THE TIGHT, SIMPLE, LOOP TO FILL blocks PREVENTS THAT LEAK
if self._content is not False:
self._cached_content = self._content
elif self._cached_content is None:
def read(size):
if self.raw._fp.fp is not None:
return self.raw.read(amt=size, decode_content=True)
else:
self.close()
return None
self._cached_content = safe_size(Dict(read=read))
if hasattr(self._cached_content, "read"):
self._cached_content.seek(0)
return self._cached_content
@property
def all_lines(self):
return self._all_lines()
def _all_lines(self, encoding="utf8"):
try:
content = self.raw.read(decode_content=False)
if self.headers.get('content-encoding') == 'gzip':
return CompressedLines(content, encoding=encoding)
elif self.headers.get('content-type') == 'application/zip':
return ZipfileLines(content, encoding=encoding)
elif self.url.endswith(".gz"):
return GzipLines(content, encoding)
else:
return content.decode(encoding).split("\n")
except Exception, e:
Log.error("Can not read content", cause=e)
finally:
self.close()

53
pyLibrary/env/mozlog.py vendored Normal file

@@ -0,0 +1,53 @@
from pyLibrary.debugs.logs import Log
from pyLibrary.strings import expand_template
_using_mozlog = False
def use():
if _using_mozlog:
return
globals()["_using_mozlog"] = True
try:
from mozlog.structured import structuredlog
global logger
logger = structuredlog.get_default_logger()
ToMozLog.logger = logger
ToMozLog.old_class = Log
globals()["Log"] = ToMozLog
except Exception:
    pass  # mozlog NOT INSTALLED; CONTINUE USING pyLibrary's Log
class ToMozLog(object):
"""
MAP CALLS pyLibrary.debugs.logs.Log TO mozlog.structured.structuredlog.StructuredLogger
"""
logger = None
old_class = None
@classmethod
def debug(cls, template=None, params=None):
cls.logger.debug(expand_template(template, params))
@classmethod
def println(cls, template, params=None):
cls.logger.debug(expand_template(template, params))
@classmethod
def note(cls, template, params=None, stack_depth=0):
cls.logger.debug(expand_template(template, params))
@classmethod
def unexpected(cls, template, params=None, cause=None):
cls.logger.error(expand_template(template, params))
@classmethod
def warning(cls, template, params=None, *args, **kwargs):
cls.logger.warn(expand_template(template, params))
@classmethod
def error(cls, template, params=None, cause=None, stack_depth=0):
cls.logger.error(expand_template(template, params))
cls.old_class.error(template, params, cause, stack_depth)

217
pyLibrary/env/pulse.py vendored Normal file

@@ -0,0 +1,217 @@
# encoding: utf-8
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
import datetime
from socket import timeout as socket_timeout
from kombu import Connection, Producer, Exchange
from pytz import timezone
from mozillapulse.utils import time_to_string
from pyLibrary.debugs import constants
from pyLibrary import jsons
from pyLibrary.debugs.exceptions import Except
from pyLibrary.debugs.logs import Log
from pyLibrary.dot import wrap, coalesce, Dict, set_default
from pyLibrary.meta import use_settings
from pyLibrary.thread.threads import Thread
from mozillapulse.consumers import GenericConsumer
class Consumer(Thread):
@use_settings
def __init__(
self,
exchange, # name of the Pulse exchange
topic, # message name pattern to subscribe to ('#' is wildcard)
target=None, # WILL BE CALLED WITH PULSE PAYLOADS AND ack() IF COMPLETED WITHOUT EXCEPTION
target_queue=None, # (aka self.queue) WILL BE FILLED WITH PULSE PAYLOADS
host='pulse.mozilla.org', # url to connect,
port=5671, # tcp port
user=None,
password=None,
vhost="/",
start=0, # USED AS STARTING POINT FOR ASSIGNING THE _meta.count ATTRIBUTE
ssl=True,
applabel=None,
heartbeat=False, # True to also get the Pulse heartbeat message
durable=False, # True to keep queue after shutdown
serializer='json',
broker_timezone='GMT',
settings=None
):
self.target_queue = target_queue
self.pulse_target = target
if (target_queue == None and target == None) or (target_queue != None and target != None):
Log.error("Expecting a queue (for fast digesters) or a target (for slow digesters)")
Thread.__init__(self, name="Pulse consumer for " + settings.exchange, target=self._worker)
self.settings = settings
settings.callback = self._got_result
settings.user = coalesce(settings.user, settings.username)
settings.applabel = coalesce(settings.applabel, settings.queue, settings.queue_name)
settings.topic = topic
self.pulse = ModifiedGenericConsumer(settings, connect=True, **settings)
self.count = coalesce(start, 0)
self.start()
def _got_result(self, data, message):
data = wrap(data)
data._meta.count = self.count
self.count += 1
if self.settings.debug:
Log.note("{{data}}", data= data)
if self.target_queue != None:
try:
self.target_queue.add(data)
message.ack()
except Exception, e:
e = Except.wrap(e)
if not self.target_queue.closed: # EXPECTED TO HAPPEN, THIS THREAD MAY HAVE BEEN AWAY FOR A WHILE
raise e
else:
try:
self.pulse_target(data)
message.ack()
except Exception, e:
Log.warning("Problem processing pulse (see `data` in structured log)", data=data, cause=e)
def _worker(self, please_stop):
def disconnect():
try:
self.target_queue.close()
Log.note("stop put into queue")
except:
pass
self.pulse.disconnect()
Log.note("pulse listener was given a disconnect()")
please_stop.on_go(disconnect)
while not please_stop:
try:
self.pulse.listen()
except Exception, e:
if not please_stop:
Log.warning("pulse had problem", e)
Log.note("pulse listener is done")
def __exit__(self, exc_type, exc_val, exc_tb):
Log.note("clean pulse exit")
self.please_stop.go()
try:
self.target_queue.close()
Log.note("stop put into queue")
except:
pass
try:
self.pulse.disconnect()
except Exception, e:
Log.warning("Can not disconnect during pulse exit, ignoring", e)
Thread.__exit__(self, exc_type, exc_val, exc_tb)
class Publisher(object):
"""
Mimic GenericPublisher https://github.com/bhearsum/mozillapulse/blob/master/mozillapulse/publishers.py
"""
@use_settings
def __init__(
self,
exchange, # name of the Pulse exchange
host='pulse.mozilla.org', # url to connect,
port=5671, # tcp port
user=None,
password=None,
vhost="/",
start=0, # USED AS STARTING POINT FOR ASSIGNING THE _meta.count ATTRIBUTE
ssl=True,
applabel=None,
heartbeat=False, # True to also get the Pulse heartbeat message
durable=False, # True to keep queue after shutdown
serializer='json',
broker_timezone='GMT',
settings=None
):
self.settings = settings
self.connection = None
self.count = 0
def connect(self):
if not self.connection:
self.connection = Connection(
hostname=self.settings.host,
port=self.settings.port,
userid=self.settings.user,
password=self.settings.password,
virtual_host=self.settings.vhost,
ssl=self.settings.ssl
)
def disconnect(self):
if self.connection:
self.connection.release()
self.connection = None
def send(self, topic, message):
"""Publishes a pulse message to the proper exchange."""
if not message:
Log.error("Expecting a message")
message._prepare()
if not self.connection:
self.connect()
producer = Producer(
channel=self.connection,
exchange=Exchange(self.settings.exchange, type='topic'),
routing_key=topic
)
# The message is actually a simple envelope format with a payload and
# some metadata.
final_data = Dict(
payload=message.data,
_meta=set_default({
'exchange': self.settings.exchange,
'routing_key': message.routing_key,
'serializer': self.settings.serializer,
'sent': time_to_string(datetime.datetime.now(timezone(self.settings.broker_timezone))),
'count': self.count
}, message.metadata)
)
producer.publish(jsons.scrub(final_data), serializer=self.settings.serializer)
self.count += 1
class ModifiedGenericConsumer(GenericConsumer):
def _drain_events_loop(self):
while True:
try:
self.connection.drain_events(timeout=self.timeout)
except socket_timeout, e:
Log.warning("timeout! Restarting pulse consumer.", cause=e)
try:
self.disconnect()
except Exception, f:
Log.warning("Problem with disconnect()", cause=f)
break
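def _example_consumer():
    # ILLUSTRATIVE SKETCH, CREDENTIALS ARE PLACEHOLDERS: A SLOW DIGESTER
    # PASSES target=; A FAST DIGESTER PASSES target_queue= INSTEAD
    def process(payload):
        Log.note("got {{data}}", data=payload)
    return Consumer(
        exchange="exchange/build/",
        topic="#",
        target=process,
        user="pulseguest",
        password="pulseguest"
    )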