Bugzilla-ETL/vendor/jx_python/group_by.py

181 строка
5.2 KiB
Python

# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from __future__ import absolute_import, division, unicode_literals
import math
import sys
from jx_base.container import Container
from jx_base.expressions import jx_expression
from jx_base.language import is_expression
from jx_python.expressions import jx_expression_to_function
from mo_collections.multiset import Multiset
from mo_dots import Data, FlatList, Null, listwrap
from mo_future import binary_type, text_type
from mo_logs import Log
from mo_logs.exceptions import Except
def groupby(data, keys=None, size=None, min_size=None, max_size=None, contiguous=False):
    """
    Split `data` into groups, either by the value of `keys` or by chunk size.

    :param data: the rows to group; a Container is asked to group itself by `keys`
    :param keys: property name (or list of names) whose values define the groups
    :param size: chunk size; when given it replaces max_size
    :param min_size: lower bound on chunk size, passed to groupby_min_max_size
    :param max_size: upper bound on chunk size, passed to groupby_min_max_size
    :param contiguous: MAINTAIN THE ORDER OF THE DATA, STARTING A NEW GROUP
                       WHENEVER THE KEY VALUE CHANGES (no sorting is done)
    :return: when any size limit is given, whatever groupby_min_max_size returns;
             otherwise a generator of (keys, values) PAIRS, WHERE
             keys IS A Data IN LEAF FORM (FOR USE WITH THE {"eq": terms} OPERATOR)
             values IS THE SLICE OF data WHOSE ROWS SHARE THOSE keys
             Null is returned when there is no data to group.
    """
    if isinstance(data, Container):
        return data.groupby(keys)

    # `!= None` IS KEPT AS-IS (NOT `is not None`) SO VALUES THAT MERELY COMPARE
    # EQUAL TO None ARE ALSO TREATED AS MISSING - PRESUMABLY mo_dots Null; CONFIRM
    by_size = size != None or min_size != None or max_size != None
    if by_size:
        if size != None:
            max_size = size
        return groupby_min_max_size(data, min_size=min_size, max_size=max_size)

    try:
        keys = listwrap(keys)
        if not contiguous:
            # SORTING BRINGS ROWS WITH EQUAL KEYS TOGETHER, SO ONE PASS CAN FIND THE GROUPS
            from jx_python import jx

            data = jx.sort(data, keys)

        if not data:
            return Null

        if any(is_expression(k) for k in keys):
            Log.error("can not handle expressions")
        else:
            # CAN RETURN Null, WHICH DOES NOT PLAY WELL WITH __cmp__
            accessor = jx_expression_to_function(jx_expression({"tuple": keys}))

        def _label(values):
            # PAIR EACH KEY NAME WITH ITS VALUE FOR THIS GROUP
            return Data(dict(zip(keys, values)))

        def _output():
            # WALK THE ROWS ONCE, EMITTING A GROUP EVERY TIME THE KEY VALUE CHANGES
            begin = 0
            last = accessor(data[0])
            for index, row in enumerate(data):
                current = accessor(row)
                if current != last:
                    yield _label(last), data[begin:index]
                    begin = index
                    last = current
            # THE FINAL RUN HAS NO FOLLOWING CHANGE TO TRIGGER IT
            yield _label(last), data[begin:]

        return _output()
    except Exception as e:
        Log.error("Problem grouping", cause=e)
def groupby_size(data, size):
    """
    Lazily cut an iterator (or iterable) into consecutive chunks of `size` items.

    :param data: an iterator (Python 2 `next` or Python 3 `__next__` protocol),
                 or any object with `__iter__`
    :param size: maximum number of items in each chunk
    :return: generator of (chunk_index, FlatList) pairs; the last chunk holds
             whatever was left when the source ran dry, so it may be short, and
             is empty when the item count is an exact multiple of `size`
    """
    # ACCEPT BOTH ITERATOR PROTOCOLS: `next` (PYTHON 2) AND `__next__` (PYTHON 3)
    if hasattr(data, "next") or hasattr(data, "__next__"):
        iterator = data
    elif hasattr(data, "__iter__"):
        iterator = iter(data)
    else:
        Log.error("do not know how to handle this type")

    # BECOMES NON-EMPTY ONCE THE SOURCE IS EXHAUSTED (more() APPENDS TO IT)
    done = FlatList()

    def more():
        # PULL UP TO `size` ITEMS; STOP EARLY, AND FLAG IT, WHEN THE SOURCE ENDS
        output = FlatList()
        for _ in range(size):
            try:
                # BUILTIN next() WORKS WITH BOTH PYTHON 2 AND PYTHON 3 ITERATORS
                output.append(next(iterator))
            except StopIteration:
                done.append(True)
                break
        return output

    # THIS IS LAZY
    i = 0
    while True:
        output = more()
        yield (i, output)
        if len(done) > 0:
            break
        i += 1
def groupby_Multiset(data, min_size, max_size):
    """
    GROUP multiset KEYS BASED ON THE POPULATION OF EACH KEY, TRYING TO STAY
    IN THE min/max LIMITS

    :param data: object whose items() gives (key, count) pairs
    :param min_size: keys keep joining the current group while its total
                     count is below this; a missing value is treated as 0
    :param max_size: the total count of a group must stay below this
    :return: generator of (group_index, list_of_keys) pairs
    """
    # `== None` IS KEPT (NOT `is None`) SO VALUES THAT COMPARE EQUAL TO None
    # ALSO COUNT AS MISSING - PRESUMABLY mo_dots Null; CONFIRM
    if min_size == None:
        min_size = 0

    index = 0
    running = 0
    members = []
    for key, count in data.items():
        if running < min_size or running + count < max_size:
            # STILL ROOM (OR NOT YET AT THE MINIMUM): EXTEND THE CURRENT GROUP
            running += count
            members.append(key)
        elif running < max_size:
            # THIS KEY WOULD OVERFLOW THE GROUP: EMIT IT AND START A NEW ONE
            yield (index, members)
            index += 1
            running = count
            members = [key]

        if running >= max_size:
            # THE GROUP CAN NOT SATISFY BOTH LIMITS
            Log.error("({{min}}, {{max}}) range is too strict given step of {{increment}}",
                min=min_size,
                max=max_size,
                increment=count
            )

    if members:
        yield (index, members)
def groupby_min_max_size(data, min_size=0, max_size=None, ):
    """
    Cut `data` into consecutive chunks of at most `max_size` items.

    :param data: a sliceable sequence (bytearray, text, bytes, list, FlatList),
                 any other object with `__iter__`, or something handed on to
                 groupby_size / groupby_Multiset
    :param min_size: only passed on to groupby_Multiset; the other branches
                     do not use it
    :param max_size: largest chunk allowed; when missing, sys.maxsize is used,
                     which makes the limit effectively unbounded
    :return: generator of (chunk_index, chunk) pairs
    """
    if max_size == None:
        # sys.maxint WAS REMOVED IN PYTHON 3; sys.maxsize EXISTS IN 2.6+ AND 3
        max_size = sys.maxsize

    if data.__class__ in (bytearray, text_type, binary_type, list, FlatList):
        # LENGTH IS KNOWN AND SLICING IS AVAILABLE: SLICE EACH CHUNK DIRECTLY
        def _slices():
            # TRUE DIVISION (SEE `from __future__ import division`) ROUNDED UP
            num = int(math.ceil(len(data) / max_size))
            for i in range(num):
                begin = i * max_size
                yield (i, data[begin:begin + max_size])

        return _slices()
    elif hasattr(data, "__iter__"):
        # UNKNOWN LENGTH: ACCUMULATE ITEMS AND EMIT A CHUNK EVERY max_size ITEMS
        def _chunks():
            g = 0
            out = FlatList()
            try:
                for i, d in enumerate(data):
                    out.append(d)
                    if (i + 1) % max_size == 0:
                        yield g, out
                        g += 1
                        out = FlatList()
                if out:
                    yield g, out
            except Exception as e:
                e = Except.wrap(e)
                if out:
                    # AT LEAST TRY TO RETURN WHAT HAS BEEN PROCESSED SO FAR
                    yield g, out
                Log.error("Problem inside jx.groupby", e)

        return _chunks()
    elif not isinstance(data, Multiset):
        return groupby_size(data, max_size)
    else:
        # NOTE(review): A Multiset THAT DEFINES __iter__ IS TAKEN BY THE BRANCH
        # ABOVE AND NEVER REACHES HERE - CONFIRM THE INTENDED BRANCH ORDER
        return groupby_Multiset(data, min_size, max_size)