Remove pyes_enabled references in codebase

Signed-off-by: Brandon Myers <bmyers@mozilla.com>
Brandon Myers 2016-11-29 12:43:00 -06:00
Parent b12cd5583e
Commit 8895fed1e9
No known key found for this signature
GPG key ID: 8AA79AD83045BBC7
5 changed files with 125 additions and 328 deletions
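
Note: this commit collapses a dual-backend feature toggle into a single code path. Every branch on pyes_enabled.pyes_on is deleted, and only the elasticsearch / elasticsearch_dsl side survives. Condensed from the diffs below, the pattern being removed looks like this (a before/after sketch, not a literal hunk):

    # Before: every call site branched on a module-level flag.
    import pyes_enabled

    if pyes_enabled.pyes_on is True:
        import pyes                                  # legacy ES v1 client
    else:
        from elasticsearch import Elasticsearch      # ES v2+ client

    # After: one backend, no flag.
    from elasticsearch import Elasticsearch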

View file

@@ -3,21 +3,10 @@ from query_models import SearchQuery, TermMatch, AggregatedResults, SimpleResults
 import json
 import logging

-# Remove this code when pyes is gone!
-import os
-import sys
-sys.path.append(os.path.join(os.path.dirname(__file__), "../alerts/lib"))
-import pyes_enabled
-# Remove this code when pyes is gone!
-
-if pyes_enabled.pyes_on is True:
-    import pyes
-else:
-    from elasticsearch import Elasticsearch
-    from elasticsearch_dsl import Search
-    from elasticsearch.exceptions import NotFoundError
-    from elasticsearch.helpers import bulk, BulkIndexError
+from elasticsearch import Elasticsearch
+from elasticsearch_dsl import Search
+from elasticsearch.exceptions import NotFoundError
+from elasticsearch.helpers import bulk, BulkIndexError

 from bulk_queue import BulkQueue
@@ -47,118 +36,65 @@ class ElasticsearchInvalidIndex(Exception):

 class ElasticsearchClient():
     def __init__(self, servers, bulk_amount=100, bulk_refresh_time=30):
-        if pyes_enabled.pyes_on is True:
-            # ES v1
-            self.es_connection = pyes.ES(servers, bulk_size=bulk_amount)
-        else:
-            # ES v2 and up
-            self.es_connection = Elasticsearch(servers)
-            self.es_connection.ping()
+        self.es_connection = Elasticsearch(servers)
+        self.es_connection.ping()
         self.bulk_queue = BulkQueue(self, threshold=bulk_amount, flush_time=bulk_refresh_time)

     def delete_index(self, index_name, ignore_fail=False):
-        if pyes_enabled.pyes_on is True:
-            if ignore_fail is True:
-                self.es_connection.indices.delete_index_if_exists(index_name)
-            else:
-                self.es_connection.indices.delete_index(index_name)
-        else:
-            ignore_codes = []
-            if ignore_fail is True:
-                ignore_codes = [400, 404]
+        ignore_codes = []
+        if ignore_fail is True:
+            ignore_codes = [400, 404]

-            self.es_connection.indices.delete(index=index_name, ignore=ignore_codes)
+        self.es_connection.indices.delete(index=index_name, ignore=ignore_codes)

     def get_indices(self):
-        if pyes_enabled.pyes_on is True:
-            return self.es_connection.indices.stats()['indices'].keys()
-        else:
-            return self.es_connection.indices.stats()['indices'].keys()
+        return self.es_connection.indices.stats()['indices'].keys()

     def create_index(self, index_name, ignore_fail=False, mapping=None):
-        if pyes_enabled.pyes_on is True:
-            self.es_connection.indices.create_index(index_name)
-        else:
-            if not mapping:
-                mapping = '''
-                {
-                    "mappings":{}
-                }'''
-            self.es_connection.indices.create(index=index_name, update_all_types='true', body=mapping)
+        if not mapping:
+            mapping = '''
+            {
+                "mappings":{}
+            }'''
+        self.es_connection.indices.create(index=index_name, update_all_types='true', body=mapping)

     def create_alias(self, alias_name, index_name):
-        if pyes_enabled.pyes_on is True:
-            self.es_connection.indices.set_alias(alias_name, index_name)
-        else:
-            if self.es_connection.indices.exists_alias(index='*', name=alias_name):
-                self.es_connection.indices.delete_alias(index='*', name=alias_name)
+        if self.es_connection.indices.exists_alias(index='*', name=alias_name):
+            self.es_connection.indices.delete_alias(index='*', name=alias_name)

-            self.es_connection.indices.put_alias(index=index_name, name=alias_name)
+        self.es_connection.indices.put_alias(index=index_name, name=alias_name)

     def get_alias(self, alias_name):
-        if pyes_enabled.pyes_on is True:
-            return self.es_connection.indices.get_alias(alias_name)
-        else:
-            return self.es_connection.indices.get_alias(index='*', name=alias_name).keys()
+        return self.es_connection.indices.get_alias(index='*', name=alias_name).keys()

     def flush(self, index_name):
-        if pyes_enabled.pyes_on is True:
-            self.es_connection.indices.flush()
-        else:
-            self.es_connection.indices.flush(index=index_name)
+        self.es_connection.indices.flush(index=index_name)

     def search(self, search_query, indices, size):
         results = []
-        if pyes_enabled.pyes_on is True:
-            try:
-                esresults = self.es_connection.search(search_query, size=size, indices=','.join(map(str, indices)))
-                results = esresults._search_raw()
-            except pyes.exceptions.IndexMissingException:
-                raise ElasticsearchInvalidIndex(indices)
-        else:
-            try:
-                results = Search(using=self.es_connection, index=indices).params(size=size).filter(search_query).execute()
-            except NotFoundError:
-                raise ElasticsearchInvalidIndex(indices)
+        try:
+            results = Search(using=self.es_connection, index=indices).params(size=size).filter(search_query).execute()
+        except NotFoundError:
+            raise ElasticsearchInvalidIndex(indices)

         result_set = SimpleResults(results)
         return result_set

     def aggregated_search(self, search_query, indices, aggregations, size):
-        if pyes_enabled.pyes_on is True:
-            query = search_query.search()
-            for field_name, aggregation_size in aggregations:
-                query.facet.add_term_facet(field_name, size=aggregation_size)
-
-            esresults = self.es_connection.search(query, size=size, indices=','.join(map(str, indices)))
-            results = esresults._search_raw()
-        else:
-            search_obj = Search(using=self.es_connection, index=indices).params(size=size)
-            query_obj = search_obj.filter(search_query)
-            for aggregation in aggregations:
-                query_obj.aggs.bucket(name=aggregation.to_dict()['terms']['field'], agg_type=aggregation)
-            results = query_obj.execute()
+        search_obj = Search(using=self.es_connection, index=indices).params(size=size)
+        query_obj = search_obj.filter(search_query)
+        for aggregation in aggregations:
+            query_obj.aggs.bucket(name=aggregation.to_dict()['terms']['field'], agg_type=aggregation)
+        results = query_obj.execute()

         result_set = AggregatedResults(results)
         return result_set

     def save_documents(self, documents):
-        if pyes_enabled.pyes_on is True:
-            for document in documents:
-                try:
-                    self.es_connection.index(index=document['_index'], doc_type=document['_type'], doc=document, id=document['_id'], bulk=True)
-                except pyes.exceptions.NoServerAvailable:
-                    raise ElasticsearchBadServer()
-                except pyes.exceptions.InvalidIndexNameException:
-                    raise ElasticsearchInvalidIndex(document['_index'])
-                except pyes.exceptions.ElasticSearchException as e:
-                    raise ElasticsearchException(e.message)
-            self.es_connection.flush_bulk(True)
-        else:
-            try:
-                bulk(self.es_connection, documents)
-            except BulkIndexError as e:
-                logger.error("Error bulk indexing: " + str(e))
+        try:
+            bulk(self.es_connection, documents)
+        except BulkIndexError as e:
+            logger.error("Error bulk indexing: " + str(e))

     def start_bulk_timer(self):
         if not self.bulk_queue.started():
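
Note: the surviving search() path is pure elasticsearch_dsl. For reference, the same call shape in isolation (a minimal sketch; the server URL, index, and filter are illustrative, not from the commit):

    from elasticsearch import Elasticsearch
    from elasticsearch_dsl import Q, Search

    es = Elasticsearch(['http://localhost:9200'])   # illustrative server
    query = Q('match', category='syslog')           # illustrative filter
    # Same chain ElasticsearchClient.search() uses above:
    results = Search(using=es, index=['events']).params(size=1000).filter(query).execute()
    for hit in results.hits:
        print hit.meta.id
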
@@ -179,20 +115,10 @@ class ElasticsearchClient():
         if bulk:
             self.bulk_save_object(index=index, doc_type=doc_type, body=body, doc_id=doc_id)
         else:
-            if pyes_enabled.pyes_on is True:
-                try:
-                    return self.es_connection.index(index=index, doc_type=doc_type, doc=body, id=doc_id)
-                except pyes.exceptions.NoServerAvailable:
-                    raise ElasticsearchBadServer()
-                except pyes.exceptions.InvalidIndexNameException:
-                    raise ElasticsearchInvalidIndex(index)
-                except pyes.exceptions.ElasticSearchException as e:
-                    raise ElasticsearchException(e.message)
-            else:
-                doc_body = body
-                if '_source' in body:
-                    doc_body = body['_source']
-                return self.es_connection.index(index=index, doc_type=doc_type, id=doc_id, body=doc_body)
+            doc_body = body
+            if '_source' in body:
+                doc_body = body['_source']
+            return self.es_connection.index(index=index, doc_type=doc_type, id=doc_id, body=doc_body)

     def save_alert(self, body, index='alerts', doc_type='alert', doc_id=None, bulk=False):
         return self.save_object(index=index, doc_type=doc_type, body=body, doc_id=doc_id, bulk=bulk)
@@ -230,23 +156,16 @@ class ElasticsearchClient():
             "dashboard": json.dumps(dashboardjson)
         }

-        if pyes_enabled.pyes_on is True:
-            return self.es_connection.index(index='kibana-int', doc_type='dashboard', doc=dashboarddata)
-        else:
-            return self.es_connection.index(index='.kibana', doc_type='dashboard', body=dashboarddata)
+        return self.es_connection.index(index='.kibana', doc_type='dashboard', body=dashboarddata)

     def get_cluster_health(self):
-        if pyes_enabled.pyes_on is True:
-            escluster = pyes.managers.Cluster(self.es_connection)
-            return escluster.health()
-        else:
-            health_dict = self.es_connection.cluster.health()
-
-            # To line up with the health stats from ES1, we're
-            # removing certain keys
-            health_dict.pop('active_shards_percent_as_number', None)
-            health_dict.pop('delayed_unassigned_shards', None)
-            health_dict.pop('number_of_in_flight_fetch', None)
-            health_dict.pop('number_of_pending_tasks', None)
-            health_dict.pop('task_max_waiting_in_queue_millis', None)
-
-            return health_dict
+        health_dict = self.es_connection.cluster.health()
+
+        # To line up with the health stats from ES1, we're
+        # removing certain keys
+        health_dict.pop('active_shards_percent_as_number', None)
+        health_dict.pop('delayed_unassigned_shards', None)
+        health_dict.pop('number_of_in_flight_fetch', None)
+        health_dict.pop('number_of_pending_tasks', None)
+        health_dict.pop('task_max_waiting_in_queue_millis', None)
+
+        return health_dict
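
Note: the key-popping in get_cluster_health() pins the ES2 health response to the key set ES1 consumers already expect. The normalization in isolation (a sketch; the dict values are illustrative):

    # ES2 /_cluster/health returns a superset of the ES1 keys.
    es2_only_keys = (
        'active_shards_percent_as_number',
        'delayed_unassigned_shards',
        'number_of_in_flight_fetch',
        'number_of_pending_tasks',
        'task_max_waiting_in_queue_millis',
    )
    health = {'cluster_name': 'mozdef', 'status': 'green',
              'active_shards_percent_as_number': 100.0}
    for key in es2_only_keys:
        health.pop(key, None)   # drop keys ES1 never reported
    assert 'active_shards_percent_as_number' not in health
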

View file

@@ -1,13 +0,0 @@
-import os
-
-global pyes_on
-
-pyes_off = os.environ.get('DSL')
-# We're gonna short circuit and turn off pyes
-pyes_off = 'True'
-if pyes_off == 'True':
-    pyes_on = False
-    # print "\nUsing Elasticsearch DSL\n"
-else:
-    pyes_on = True
-    print "\nUsing PYES\n"
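
Note: the deleted module read the DSL environment variable and then immediately clobbered the result with pyes_off = 'True', so the pyes branch was already unreachable before this commit; removing the flag is behavior-preserving dead-code deletion. For comparison, a toggle that actually honored the variable would look like this (a sketch, not part of the commit):

    import os

    # True unless the DSL env var opts in to the elasticsearch_dsl backend.
    pyes_on = os.environ.get('DSL') != 'True'
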

View file

@@ -1,9 +1,4 @@
-import pyes_enabled
-
-if pyes_enabled.pyes_on is True:
-    import pyes
-else:
-    from elasticsearch_dsl import Q, Search, A
+from elasticsearch_dsl import Q, A

 from utilities.toUTC import toUTC
@@ -12,155 +7,92 @@ from datetime import timedelta

 def ExistsMatch(field_name):
-    if pyes_enabled.pyes_on is True:
-        return pyes.ExistsFilter(field_name)
     return Q('exists', field=field_name)

 def TermMatch(key, value):
-    if pyes_enabled.pyes_on is True:
-        return pyes.TermFilter(key, value)
     return Q('match', **{key: value})

 def TermsMatch(key, value):
-    if pyes_enabled.pyes_on is True:
-        return pyes.TermsFilter(key, value)
     return Q('terms', **{key: value})

 def WildcardMatch(key, value):
-    if pyes_enabled.pyes_on is True:
-        return pyes.QueryFilter(pyes.WildcardQuery(key, value))
     return Q('wildcard', **{key: value})

 def PhraseMatch(key, value):
-    if pyes_enabled.pyes_on is True:
-        return pyes.QueryFilter(pyes.MatchQuery(key, value, 'phrase'))
     return Q('match_phrase', **{key: value})

 def BooleanMatch(must=[], must_not=[], should=[]):
-    if pyes_enabled.pyes_on is True:
-        return pyes.BoolFilter(must=must, should=should, must_not=must_not)
     return Q('bool', must=must, must_not=must_not, should=should)

 def MissingMatch(field_name):
-    if pyes_enabled.pyes_on is True:
-        return pyes.filters.MissingFilter(field_name)
     return Q('missing', field=field_name)

 def RangeMatch(field_name, from_value, to_value):
-    if pyes_enabled.pyes_on is True:
-        return pyes.RangeQuery(qrange=pyes.ESRange(field_name, from_value=from_value, to_value=to_value))
     return Q('range', **{field_name: {'gte': from_value, 'lte': to_value}})

 def QueryStringMatch(query_str):
-    if pyes_enabled.pyes_on is True:
-        return pyes.QueryFilter(pyes.QueryStringQuery(query_str))
     return Q('query_string', query=query_str)

 def Aggregation(field_name, aggregation_size=20):
-    if pyes_enabled.pyes_on is True:
-        return field_name, aggregation_size
     return A('terms', field=field_name, size=aggregation_size)

 def AggregatedResults(input_results):
-    if pyes_enabled.pyes_on is True:
-        converted_results = {
-            'meta': {
-                'timed_out': input_results.timed_out
-            },
-            'hits': [],
-            'aggregations': {}
-        }
-        for hit in input_results.hits.hits:
-            hit_dict = {
-                '_id': unicode(hit['_id']),
-                '_type': hit['_type'],
-                '_index': hit['_index'],
-                '_score': hit['_score'],
-                '_source': hit['_source'],
-            }
-            converted_results['hits'].append(hit_dict)
-        for facet_name, facet_value in input_results.facets.iteritems():
-            aggregation_dict = {
-                'terms': []
-            }
-            for term in facet_value.terms:
-                aggregation_dict['terms'].append({'count': term.count, 'key': term.term})
-            converted_results['aggregations'][facet_name] = aggregation_dict
-    else:
-        converted_results = {
-            'meta': {
-                'timed_out': input_results.timed_out
-            },
-            'hits': [],
-            'aggregations': {}
-        }
-        for hit in input_results.hits:
-            hit_dict = {
-                '_id': hit.meta.id,
-                '_type': hit.meta.doc_type,
-                '_index': hit.meta.index,
-                '_score': hit.meta.score,
-                '_source': hit.to_dict()
-            }
-            converted_results['hits'].append(hit_dict)
-        for agg_name, aggregation in input_results.aggregations.to_dict().iteritems():
-            aggregation_dict = {
-                'terms': []
-            }
-            for bucket in aggregation['buckets']:
-                aggregation_dict['terms'].append({'count': bucket['doc_count'], 'key': bucket['key']})
-            converted_results['aggregations'][agg_name] = aggregation_dict
+    converted_results = {
+        'meta': {
+            'timed_out': input_results.timed_out
+        },
+        'hits': [],
+        'aggregations': {}
+    }
+    for hit in input_results.hits:
+        hit_dict = {
+            '_id': hit.meta.id,
+            '_type': hit.meta.doc_type,
+            '_index': hit.meta.index,
+            '_score': hit.meta.score,
+            '_source': hit.to_dict()
+        }
+        converted_results['hits'].append(hit_dict)
+    for agg_name, aggregation in input_results.aggregations.to_dict().iteritems():
+        aggregation_dict = {
+            'terms': []
+        }
+        for bucket in aggregation['buckets']:
+            aggregation_dict['terms'].append({'count': bucket['doc_count'], 'key': bucket['key']})
+        converted_results['aggregations'][agg_name] = aggregation_dict
     return converted_results

 def SimpleResults(input_results):
-    if pyes_enabled.pyes_on is True:
-        converted_results = {
-            'meta': {
-                'timed_out': input_results.timed_out
-            },
-            'hits': []
-        }
-        for hit in input_results.hits.hits:
-            hit_dict = {
-                '_id': unicode(hit['_id']),
-                '_type': hit['_type'],
-                '_index': hit['_index'],
-                '_score': hit['_score'],
-                '_source': hit['_source'],
-            }
-            converted_results['hits'].append(hit_dict)
-    else:
-        converted_results = {
-            'meta': {
-                'timed_out': input_results.timed_out,
-            },
-            'hits': []
-        }
-        for hit in input_results.hits:
-            hit_dict = {
-                '_id': hit.meta.id,
-                '_type': hit.meta.doc_type,
-                '_index': hit.meta.index,
-                '_score': hit.meta.score,
-                '_source': hit.to_dict()
-            }
-            converted_results['hits'].append(hit_dict)
+    converted_results = {
+        'meta': {
+            'timed_out': input_results.timed_out,
+        },
+        'hits': []
+    }
+    for hit in input_results.hits:
+        hit_dict = {
+            '_id': hit.meta.id,
+            '_type': hit.meta.doc_type,
+            '_index': hit.meta.index,
+            '_score': hit.meta.score,
+            '_source': hit.to_dict()
+        }
+        converted_results['hits'].append(hit_dict)
     return converted_results
@@ -207,12 +139,7 @@ class SearchQuery():
             self.add_must(range_query)

         search_query = None
-        if pyes_enabled.pyes_on is True:
-            search_query = pyes.ConstantScoreQuery(pyes.MatchAllQuery())
-            search_query.filters.append(BooleanMatch(must=self.must, should=self.should, must_not=self.must_not))
-        else:
-            search_query = BooleanMatch(
-                must=self.must, must_not=self.must_not, should=self.should)
+        search_query = BooleanMatch(must=self.must, must_not=self.must_not, should=self.should)

         results = []
         if len(self.aggregation) == 0:
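
Note: with the pyes arms gone, every helper above returns a plain elasticsearch_dsl object, so queries compose directly. A composition sketch using the surviving helpers (field names and values are illustrative):

    from elasticsearch_dsl import Q

    def TermMatch(key, value):
        return Q('match', **{key: value})

    def RangeMatch(field_name, from_value, to_value):
        return Q('range', **{field_name: {'gte': from_value, 'lte': to_value}})

    def BooleanMatch(must=[], must_not=[], should=[]):
        return Q('bool', must=must, must_not=must_not, should=should)

    # Equivalent of a SearchQuery with one must and one must_not clause:
    query = BooleanMatch(
        must=[TermMatch('category', 'event'),
              RangeMatch('utctimestamp', '2016-08-12T00:00:00', '2016-08-13T00:00:00')],
        must_not=[TermMatch('severity', 'DEBUG')])
    print query.to_dict()
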

View file

@@ -6,10 +6,6 @@ import sys
 sys.path.append(os.path.join(os.path.dirname(__file__), "../../lib"))
 from query_models import RangeMatch

-# The weird thing here is that events that have the exact same utctimestamp
-# will not show up in either must or must_not. Updating to elasticsearch_dsl, we're
-# going to use gte and lte, so inclusive, compared to exclusion in pyes.

 class TestRangeMatchPositiveTestSuite(PositiveTestSuite):
     def query_tests(self):
@@ -17,12 +13,11 @@ class TestRangeMatchPositiveTestSuite(PositiveTestSuite):
         end_date = "2016-08-13T21:07:12.316450+00:00"
         tests = {
             RangeMatch('utctimestamp', begin_date, end_date): [
-                # {'utctimestamp': '2016-08-12T21:07:12.316450+00:00'},  # uncomment when switched from pyes
+                {'utctimestamp': '2016-08-12T21:07:12.316450+00:00'},
                 {'utctimestamp': '2016-08-12T21:07:13.316450+00:00'},
                 {'utctimestamp': '2016-08-12T23:04:12.316450+00:00'},
                 {'utctimestamp': '2016-08-13T21:07:11.316450+00:00'},
-                # {'utctimestamp': '2016-08-13T21:07:12.316450+00:00'},  # uncomment when switched from pyes
+                # this is because we are now including results that have the same value as either from or to
+                {'utctimestamp': '2016-08-13T21:07:12.316450+00:00'},
             ],
         }
         return tests
@@ -35,8 +30,8 @@ class TestTermMatchNegativeTestSuite(NegativeTestSuite):
         tests = {
             RangeMatch('utctimestamp', begin_date, end_date): [
                 {'utctimestamp': '2016-08-12T21:07:11.316450+00:00'},
-                # {'utctimestamp': '2016-08-12T21:07:12.316450+00:00'},
-                # {'utctimestamp': '2016-08-13T21:07:12.316450+00:00'},
+                {'utctimestamp': '2016-08-12T21:07:12.316450+00:00'},
+                {'utctimestamp': '2016-08-13T21:07:12.316450+00:00'},
                 {'utctimestamp': '2016-08-13T21:07:13.316450+00:00'},
             ],
         }
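
Note: these test edits encode the semantic shift the deleted comment described: pyes range filters excluded the endpoints, while the DSL-backed RangeMatch uses gte/lte and includes them, so documents stamped exactly at begin_date or end_date switch from never-matching to matching. Illustrated with the surviving helper:

    from elasticsearch_dsl import Q

    begin_date = '2016-08-12T21:07:12.316450+00:00'
    end_date = '2016-08-13T21:07:12.316450+00:00'

    # gte/lte: both endpoint timestamps now satisfy the range.
    range_query = Q('range', **{'utctimestamp': {'gte': begin_date, 'lte': end_date}})
    print range_query.to_dict()
    # {'range': {'utctimestamp': {'gte': '2016-08-12T21:07:12.316450+00:00',
    #                             'lte': '2016-08-13T21:07:12.316450+00:00'}}}
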

View file

@@ -15,13 +15,6 @@ import time
 from elasticsearch_client import ElasticsearchClient, ElasticsearchInvalidIndex
 import pytest

-# Remove this code when pyes is gone!
-import os
-import sys
-sys.path.append(os.path.join(os.path.dirname(__file__), "../../lib"))
-import pyes_enabled
-# Remove this code when pyes is gone!

 class ElasticsearchClientTest(UnitTestSuite):
     def setup(self):
@@ -54,11 +47,6 @@ class MockTransportClass:
         "/events%2Cevents-previous/_search"
     ]

-    def _send_request(self, method, path, body=None, params=None, headers=None, raw=False, return_response=False):
-        if path not in self.exclude_paths:
-            self.request_counts += 1
-        return self.original_function(method, path, body, params)
-
     def backup_function(self, orig_function):
         self.original_function = orig_function
@@ -135,13 +123,8 @@ class TestSimpleWrites(ElasticsearchClientTest):
     def test_simple_writing(self):
         mock_class = MockTransportClass()

-        if pyes_enabled.pyes_on is True:
-            mock_class.backup_function(self.es_client.es_connection._send_request)
-            self.es_client.es_connection._send_request = mock_class._send_request
-        else:
-            mock_class.backup_function(self.es_client.es_connection.transport.perform_request)
-            self.es_client.es_connection.transport.perform_request = mock_class.perform_request
+        mock_class.backup_function(self.es_client.es_connection.transport.perform_request)
+        self.es_client.es_connection.transport.perform_request = mock_class.perform_request

         event_length = 10000
         events = []
@@ -162,13 +145,8 @@ class BulkTest(ElasticsearchClientTest):
     def setup(self):
         super(BulkTest, self).setup()
         self.mock_class = MockTransportClass()

-        if pyes_enabled.pyes_on is True:
-            self.mock_class.backup_function(self.es_client.es_connection._send_request)
-            self.es_client.es_connection._send_request = self.mock_class._send_request
-        else:
-            self.mock_class.backup_function(self.es_client.es_connection.transport.perform_request)
-            self.es_client.es_connection.transport.perform_request = self.mock_class.perform_request
+        self.mock_class.backup_function(self.es_client.es_connection.transport.perform_request)
+        self.es_client.es_connection.transport.perform_request = self.mock_class.perform_request

     def teardown(self):
         super(BulkTest, self).teardown()
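
Note: both test classes now instrument the client the same way: back up transport.perform_request, then swap in a counting wrapper. The shape of that mock, condensed from MockTransportClass above (the perform_request signature shown is the elasticsearch-py 2.x one; treat it as an assumption):

    class CountingTransport(object):
        # Counts requests while delegating to the real transport method.
        def __init__(self):
            self.request_counts = 0
            self.original_function = None

        def backup_function(self, orig_function):
            self.original_function = orig_function

        def perform_request(self, method, url, params=None, body=None):
            self.request_counts += 1
            return self.original_function(method, url, params=params, body=body)

    # Usage, as in the tests above:
    #   mock = CountingTransport()
    #   mock.backup_function(es_client.es_connection.transport.perform_request)
    #   es_client.es_connection.transport.perform_request = mock.perform_request
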
@@ -275,18 +253,12 @@ class TestClusterHealth(ElasticsearchClientTest):
         assert health_keys == ['active_primary_shards', 'active_shards', 'cluster_name', 'initializing_shards', 'number_of_data_nodes', 'number_of_nodes', 'relocating_shards', 'status', 'timed_out', 'unassigned_shards']
         assert type(health_results['active_primary_shards']) is int
         assert type(health_results['active_shards']) is int
-        if pyes_enabled.pyes_on is True:
-            assert type(health_results['cluster_name']) is str
-        else:
-            assert type(health_results['cluster_name']) is unicode
+        assert type(health_results['cluster_name']) is unicode
         assert type(health_results['initializing_shards']) is int
         assert type(health_results['number_of_data_nodes']) is int
         assert type(health_results['number_of_nodes']) is int
         assert type(health_results['relocating_shards']) is int
-        if pyes_enabled.pyes_on is True:
-            assert type(health_results['status']) is str
-        else:
-            assert type(health_results['status']) is unicode
+        assert type(health_results['status']) is unicode
         assert type(health_results['timed_out']) is bool
         assert type(health_results['unassigned_shards']) is int
@@ -325,44 +297,41 @@ class TestCreatingAlias(ElasticsearchClientTest):
         assert 'index2' in indices

-if pyes_enabled.pyes_on is not True:
-    # Instead of trying to figure out how to update mappings via pyes, I decided
-    # to just skip this unit test since we'll be ripping it out soon
-    class TestBulkInvalidFormatProblem(BulkTest):
+class TestBulkInvalidFormatProblem(BulkTest):

-        def setup(self):
-            super(TestBulkInvalidFormatProblem, self).setup()
+    def setup(self):
+        super(TestBulkInvalidFormatProblem, self).setup()

-            mapping = {
-                "mappings": {
-                    "event": {
-                        "properties": {
-                            "utcstamp": {
-                                "type": "date",
-                                "format": "dateOptionalTime"
-                            }
-                        }
-                    }
-                }
-            }
+        mapping = {
+            "mappings": {
+                "event": {
+                    "properties": {
+                        "utcstamp": {
+                            "type": "date",
+                            "format": "dateOptionalTime"
+                        }
+                    }
+                }
+            }
+        }

-            # Recreate the test indexes with a custom mapping to throw
-            # parsing errors
-            self.es_client.delete_index("events", True)
-            self.es_client.delete_index(self.event_index_name, True)
-            self.es_client.create_index(self.event_index_name, mapping=mapping)
-            self.es_client.create_alias('events', self.event_index_name)
-            self.es_client.create_alias('events-previous', self.event_index_name)
+        # Recreate the test indexes with a custom mapping to throw
+        # parsing errors
+        self.es_client.delete_index("events", True)
+        self.es_client.delete_index(self.event_index_name, True)
+        self.es_client.create_index(self.event_index_name, mapping=mapping)
+        self.es_client.create_alias('events', self.event_index_name)
+        self.es_client.create_alias('events-previous', self.event_index_name)

-        def test_bulk_problems(self):
-            event = {
-                "utcstamp": "2016-11-08T14:13:01.250631+00:00"
-            }
-            malformed_event = {
-                "utcstamp": "abc",
-            }
+    def test_bulk_problems(self):
+        event = {
+            "utcstamp": "2016-11-08T14:13:01.250631+00:00"
+        }
+        malformed_event = {
+            "utcstamp": "abc",
+        }

-            self.es_client.save_object(index='events', doc_type='event', body=event, bulk=True)
-            self.es_client.save_object(index='events', doc_type='event', body=malformed_event, bulk=True)
-            time.sleep(5)
-            assert self.get_num_events() == 1
+        self.es_client.save_object(index='events', doc_type='event', body=event, bulk=True)
+        self.es_client.save_object(index='events', doc_type='event', body=malformed_event, bulk=True)
+        time.sleep(5)
+        assert self.get_num_events() == 1
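
Note: the final test leans on the strict dateOptionalTime mapping: the malformed 'abc' timestamp fails date parsing at index time, bulk() raises BulkIndexError for that document, save_documents() logs it, and only the well-formed event is counted. The failure path in isolation (a sketch; the server URL and index are illustrative):

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk, BulkIndexError

    es = Elasticsearch(['http://localhost:9200'])   # illustrative server
    documents = [
        {'_index': 'events', '_type': 'event',
         '_source': {'utcstamp': '2016-11-08T14:13:01.250631+00:00'}},
        {'_index': 'events', '_type': 'event',
         '_source': {'utcstamp': 'abc'}},           # fails date parsing
    ]
    try:
        bulk(es, documents)
    except BulkIndexError as e:
        # The valid document is still indexed; only the bad one is reported.
        print 'Error bulk indexing: ' + str(e)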