diff --git a/lib/elasticsearch_client.py b/lib/elasticsearch_client.py
index dca5ff23..59099248 100644
--- a/lib/elasticsearch_client.py
+++ b/lib/elasticsearch_client.py
@@ -3,21 +3,10 @@ from query_models import SearchQuery, TermMatch, AggregatedResults, SimpleResults
 import json
 import logging
-
-# Remove this code when pyes is gone!
-import os
-import sys
-sys.path.append(os.path.join(os.path.dirname(__file__), "../alerts/lib"))
-import pyes_enabled
-# Remove this code when pyes is gone!
-
-if pyes_enabled.pyes_on is True:
-    import pyes
-else:
-    from elasticsearch import Elasticsearch
-    from elasticsearch_dsl import Search
-    from elasticsearch.exceptions import NotFoundError
-    from elasticsearch.helpers import bulk, BulkIndexError
+from elasticsearch import Elasticsearch
+from elasticsearch_dsl import Search
+from elasticsearch.exceptions import NotFoundError
+from elasticsearch.helpers import bulk, BulkIndexError

 from bulk_queue import BulkQueue

@@ -47,118 +36,65 @@ class ElasticsearchInvalidIndex(Exception):

 class ElasticsearchClient():
     def __init__(self, servers, bulk_amount=100, bulk_refresh_time=30):
-        if pyes_enabled.pyes_on is True:
-            # ES v1
-            self.es_connection = pyes.ES(servers, bulk_size=bulk_amount)
-        else:
-            # ES v2 and up
-            self.es_connection = Elasticsearch(servers)
-            self.es_connection.ping()
+        self.es_connection = Elasticsearch(servers)
+        self.es_connection.ping()
         self.bulk_queue = BulkQueue(self, threshold=bulk_amount, flush_time=bulk_refresh_time)

     def delete_index(self, index_name, ignore_fail=False):
-        if pyes_enabled.pyes_on is True:
-            if ignore_fail is True:
-                self.es_connection.indices.delete_index_if_exists(index_name)
-            else:
-                self.es_connection.indices.delete_index(index_name)
-        else:
-            ignore_codes = []
-            if ignore_fail is True:
-                ignore_codes = [400, 404]
+        ignore_codes = []
+        if ignore_fail is True:
+            ignore_codes = [400, 404]

-            self.es_connection.indices.delete(index=index_name, ignore=ignore_codes)
+        self.es_connection.indices.delete(index=index_name, ignore=ignore_codes)

     def get_indices(self):
-        if pyes_enabled.pyes_on is True:
-            return self.es_connection.indices.stats()['indices'].keys()
-        else:
-            return self.es_connection.indices.stats()['indices'].keys()
+        return self.es_connection.indices.stats()['indices'].keys()

     def create_index(self, index_name, ignore_fail=False, mapping=None):
-        if pyes_enabled.pyes_on is True:
-            self.es_connection.indices.create_index(index_name)
-        else:
-            if not mapping:
-                mapping = '''
-                {
-                  "mappings":{}
-                }'''
-            self.es_connection.indices.create(index=index_name, update_all_types='true', body=mapping)
+        if not mapping:
+            mapping = '''
+            {
+              "mappings":{}
+            }'''
+        self.es_connection.indices.create(index=index_name, update_all_types='true', body=mapping)

     def create_alias(self, alias_name, index_name):
-        if pyes_enabled.pyes_on is True:
-            self.es_connection.indices.set_alias(alias_name, index_name)
-        else:
-            if self.es_connection.indices.exists_alias(index='*', name=alias_name):
-                self.es_connection.indices.delete_alias(index='*', name=alias_name)
+        if self.es_connection.indices.exists_alias(index='*', name=alias_name):
+            self.es_connection.indices.delete_alias(index='*', name=alias_name)

-            self.es_connection.indices.put_alias(index=index_name, name=alias_name)
+        self.es_connection.indices.put_alias(index=index_name, name=alias_name)

     def get_alias(self, alias_name):
-        if pyes_enabled.pyes_on is True:
-            return self.es_connection.indices.get_alias(alias_name)
-        else:
-            return self.es_connection.indices.get_alias(index='*', name=alias_name).keys()
+        return self.es_connection.indices.get_alias(index='*', name=alias_name).keys()

     def flush(self, index_name):
-        if pyes_enabled.pyes_on is True:
-            self.es_connection.indices.flush()
-        else:
-            self.es_connection.indices.flush(index=index_name)
+        self.es_connection.indices.flush(index=index_name)

     def search(self, search_query, indices, size):
         results = []
-        if pyes_enabled.pyes_on is True:
-            try:
-                esresults = self.es_connection.search(search_query, size=size, indices=','.join(map(str, indices)))
-                results = esresults._search_raw()
-            except pyes.exceptions.IndexMissingException:
-                raise ElasticsearchInvalidIndex(indices)
-        else:
-            try:
-                results = Search(using=self.es_connection, index=indices).params(size=size).filter(search_query).execute()
-            except NotFoundError:
-                raise ElasticsearchInvalidIndex(indices)
+        try:
+            results = Search(using=self.es_connection, index=indices).params(size=size).filter(search_query).execute()
+        except NotFoundError:
+            raise ElasticsearchInvalidIndex(indices)

         result_set = SimpleResults(results)
         return result_set

     def aggregated_search(self, search_query, indices, aggregations, size):
-        if pyes_enabled.pyes_on is True:
-            query = search_query.search()
-            for field_name, aggregation_size in aggregations:
-                query.facet.add_term_facet(field_name, size=aggregation_size)
-
-            esresults = self.es_connection.search(query, size=size, indices=','.join(map(str, indices)))
-            results = esresults._search_raw()
-        else:
-            search_obj = Search(using=self.es_connection, index=indices).params(size=size)
-            query_obj = search_obj.filter(search_query)
-            for aggregation in aggregations:
-                query_obj.aggs.bucket(name=aggregation.to_dict()['terms']['field'], agg_type=aggregation)
-            results = query_obj.execute()
+        search_obj = Search(using=self.es_connection, index=indices).params(size=size)
+        query_obj = search_obj.filter(search_query)
+        for aggregation in aggregations:
+            query_obj.aggs.bucket(name=aggregation.to_dict()['terms']['field'], agg_type=aggregation)
+        results = query_obj.execute()

         result_set = AggregatedResults(results)
         return result_set

     def save_documents(self, documents):
-        if pyes_enabled.pyes_on is True:
-            for document in documents:
-                try:
-                    self.es_connection.index(index=document['_index'], doc_type=document['_type'], doc=document, id=document['_id'], bulk=True)
-                except pyes.exceptions.NoServerAvailable:
-                    raise ElasticsearchBadServer()
-                except pyes.exceptions.InvalidIndexNameException:
-                    raise ElasticsearchInvalidIndex(document['_index'])
-                except pyes.exceptions.ElasticSearchException as e:
-                    raise ElasticsearchException(e.message)
-            self.es_connection.flush_bulk(True)
-        else:
-            try:
-                bulk(self.es_connection, documents)
-            except BulkIndexError as e:
-                logger.error("Error bulk indexing: " + str(e))
+        try:
+            bulk(self.es_connection, documents)
+        except BulkIndexError as e:
+            logger.error("Error bulk indexing: " + str(e))

     def start_bulk_timer(self):
         if not self.bulk_queue.started():
@@ -179,20 +115,10 @@ class ElasticsearchClient():
         if bulk:
             self.bulk_save_object(index=index, doc_type=doc_type, body=body, doc_id=doc_id)
         else:
-            if pyes_enabled.pyes_on is True:
-                try:
-                    return self.es_connection.index(index=index, doc_type=doc_type, doc=body, id=doc_id)
-                except pyes.exceptions.NoServerAvailable:
-                    raise ElasticsearchBadServer()
-                except pyes.exceptions.InvalidIndexNameException:
-                    raise ElasticsearchInvalidIndex(index)
-                except pyes.exceptions.ElasticSearchException as e:
-                    raise ElasticsearchException(e.message)
-            else:
-                doc_body = body
-                if '_source' in body:
-                    doc_body = body['_source']
-                return self.es_connection.index(index=index, doc_type=doc_type, id=doc_id, body=doc_body)
+            doc_body = body
+            if '_source' in body:
+                doc_body = body['_source']
+            return self.es_connection.index(index=index, doc_type=doc_type, id=doc_id, body=doc_body)

     def save_alert(self, body, index='alerts', doc_type='alert', doc_id=None, bulk=False):
         return self.save_object(index=index, doc_type=doc_type, body=body, doc_id=doc_id, bulk=bulk)
@@ -230,23 +156,16 @@ class ElasticsearchClient():
             "dashboard": json.dumps(dashboardjson)
         }

-        if pyes_enabled.pyes_on is True:
-            return self.es_connection.index(index='kibana-int', doc_type='dashboard', doc=dashboarddata)
-        else:
-            return self.es_connection.index(index='.kibana', doc_type='dashboard', body=dashboarddata)
+        return self.es_connection.index(index='.kibana', doc_type='dashboard', body=dashboarddata)

     def get_cluster_health(self):
-        if pyes_enabled.pyes_on is True:
-            escluster = pyes.managers.Cluster(self.es_connection)
-            return escluster.health()
-        else:
-            health_dict = self.es_connection.cluster.health()
-            # To line up with the health stats from ES1, we're
-            # removing certain keys
-            health_dict.pop('active_shards_percent_as_number', None)
-            health_dict.pop('delayed_unassigned_shards', None)
-            health_dict.pop('number_of_in_flight_fetch', None)
-            health_dict.pop('number_of_pending_tasks', None)
-            health_dict.pop('task_max_waiting_in_queue_millis', None)
+        health_dict = self.es_connection.cluster.health()
+        # To line up with the health stats from ES1, we're
+        # removing certain keys
+        health_dict.pop('active_shards_percent_as_number', None)
+        health_dict.pop('delayed_unassigned_shards', None)
+        health_dict.pop('number_of_in_flight_fetch', None)
+        health_dict.pop('number_of_pending_tasks', None)
+        health_dict.pop('task_max_waiting_in_queue_millis', None)

-            return health_dict
+        return health_dict
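For reference, the DSL-only client that remains after this change is driven like so. This is a minimal sketch, assuming lib/ is on the import path and a cluster is reachable; the URL and index name are illustrative, not values taken from the patch:

    from elasticsearch_client import ElasticsearchClient
    from query_models import TermMatch

    # Hypothetical local node; substitute your own cluster address.
    client = ElasticsearchClient(['http://localhost:9200'])

    # search() takes an elasticsearch_dsl filter (e.g. TermMatch), a list of
    # indices, and a size cap; SimpleResults reshapes the response into a dict.
    results = client.search(TermMatch('category', 'event'), indices=['events'], size=10)
    for hit in results['hits']:
        print hit['_source']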
diff --git a/lib/pyes_enabled.py b/lib/pyes_enabled.py
deleted file mode 100644
index 7276bb9a..00000000
--- a/lib/pyes_enabled.py
+++ /dev/null
@@ -1,13 +0,0 @@
-import os
-
-global pyes_on
-pyes_off = os.environ.get('DSL')
-
-# We're gonna short circuit and turn off pyes
-pyes_off = 'True'
-if pyes_off == 'True':
-    pyes_on = False
-    # print "\nUsing Elasticsearch DSL\n"
-else:
-    pyes_on = True
-    print "\nUsing PYES\n"
\ No newline at end of file
diff --git a/lib/query_models.py b/lib/query_models.py
index cfc6e812..e3bfe0aa 100644
--- a/lib/query_models.py
+++ b/lib/query_models.py
@@ -1,9 +1,4 @@
-import pyes_enabled
-
-if pyes_enabled.pyes_on is True:
-    import pyes
-else:
-    from elasticsearch_dsl import Q, Search, A
+from elasticsearch_dsl import Q, A

 from utilities.toUTC import toUTC

@@ -12,155 +7,92 @@
 from datetime import timedelta


 def ExistsMatch(field_name):
-    if pyes_enabled.pyes_on is True:
-        return pyes.ExistsFilter(field_name)
     return Q('exists', field=field_name)


 def TermMatch(key, value):
-    if pyes_enabled.pyes_on is True:
-        return pyes.TermFilter(key, value)
     return Q('match', **{key: value})


 def TermsMatch(key, value):
-    if pyes_enabled.pyes_on is True:
-        return pyes.TermsFilter(key, value)
     return Q('terms', **{key: value})


 def WildcardMatch(key, value):
-    if pyes_enabled.pyes_on is True:
-        return pyes.QueryFilter(pyes.WildcardQuery(key, value))
     return Q('wildcard', **{key: value})


 def PhraseMatch(key, value):
-    if pyes_enabled.pyes_on is True:
-        return pyes.QueryFilter(pyes.MatchQuery(key, value, 'phrase'))
     return Q('match_phrase', **{key: value})


 def BooleanMatch(must=[], must_not=[], should=[]):
-    if pyes_enabled.pyes_on is True:
-        return pyes.BoolFilter(must=must, should=should, must_not=must_not)
     return Q('bool', must=must, must_not=must_not, should=should)


 def MissingMatch(field_name):
-    if pyes_enabled.pyes_on is True:
-        return pyes.filters.MissingFilter(field_name)
     return Q('missing', field=field_name)


 def RangeMatch(field_name, from_value, to_value):
-    if pyes_enabled.pyes_on is True:
-        return pyes.RangeQuery(qrange=pyes.ESRange(field_name, from_value=from_value, to_value=to_value))
     return Q('range', **{field_name: {'gte': from_value, 'lte': to_value}})


 def QueryStringMatch(query_str):
-    if pyes_enabled.pyes_on is True:
-        return pyes.QueryFilter(pyes.QueryStringQuery(query_str))
     return Q('query_string', query=query_str)


 def Aggregation(field_name, aggregation_size=20):
-    if pyes_enabled.pyes_on is True:
-        return field_name, aggregation_size
     return A('terms', field=field_name, size=aggregation_size)


 def AggregatedResults(input_results):
-    if pyes_enabled.pyes_on is True:
-        converted_results = {
-            'meta': {
-                'timed_out': input_results.timed_out
-            },
-            'hits': [],
-            'aggregations': {}
+    converted_results = {
+        'meta': {
+            'timed_out': input_results.timed_out
+        },
+        'hits': [],
+        'aggregations': {}
+    }
+    for hit in input_results.hits:
+        hit_dict = {
+            '_id': hit.meta.id,
+            '_type': hit.meta.doc_type,
+            '_index': hit.meta.index,
+            '_score': hit.meta.score,
+            '_source': hit.to_dict()
         }
-        for hit in input_results.hits.hits:
-            hit_dict = {
-                '_id': unicode(hit['_id']),
-                '_type': hit['_type'],
-                '_index': hit['_index'],
-                '_score': hit['_score'],
-                '_source': hit['_source'],
-            }
-            converted_results['hits'].append(hit_dict)
+        converted_results['hits'].append(hit_dict)

-        for facet_name, facet_value in input_results.facets.iteritems():
-            aggregation_dict = {
-                'terms': []
-            }
-            for term in facet_value.terms:
-                aggregation_dict['terms'].append({'count': term.count, 'key': term.term})
-            converted_results['aggregations'][facet_name] = aggregation_dict
-    else:
-        converted_results = {
-            'meta': {
-                'timed_out': input_results.timed_out
-            },
-            'hits': [],
-            'aggregations': {}
+    for agg_name, aggregation in input_results.aggregations.to_dict().iteritems():
+        aggregation_dict = {
+            'terms': []
         }
-        for hit in input_results.hits:
-            hit_dict = {
-                '_id': hit.meta.id,
-                '_type': hit.meta.doc_type,
-                '_index': hit.meta.index,
-                '_score': hit.meta.score,
-                '_source': hit.to_dict()
-            }
-            converted_results['hits'].append(hit_dict)
+        for bucket in aggregation['buckets']:
+            aggregation_dict['terms'].append({'count': bucket['doc_count'], 'key': bucket['key']})

-        for agg_name, aggregation in input_results.aggregations.to_dict().iteritems():
-            aggregation_dict = {
-                'terms': []
-            }
-            for bucket in aggregation['buckets']:
-                aggregation_dict['terms'].append({'count': bucket['doc_count'], 'key': bucket['key']})
-
-            converted_results['aggregations'][agg_name] = aggregation_dict
+        converted_results['aggregations'][agg_name] = aggregation_dict

     return converted_results


 def SimpleResults(input_results):
-    if pyes_enabled.pyes_on is True:
-        converted_results = {
-            'meta': {
-                'timed_out': input_results.timed_out
-            },
-            'hits': []
+    converted_results = {
+        'meta': {
+            'timed_out': input_results.timed_out,
+        },
+        'hits': []
+    }
+    for hit in input_results.hits:
+        hit_dict = {
+            '_id': hit.meta.id,
+            '_type': hit.meta.doc_type,
+            '_index': hit.meta.index,
+            '_score': hit.meta.score,
+            '_source': hit.to_dict()
         }
-        for hit in input_results.hits.hits:
-            hit_dict = {
-                '_id': unicode(hit['_id']),
-                '_type': hit['_type'],
-                '_index': hit['_index'],
-                '_score': hit['_score'],
-                '_source': hit['_source'],
-            }
-            converted_results['hits'].append(hit_dict)
-    else:
-        converted_results = {
-            'meta': {
-                'timed_out': input_results.timed_out,
-            },
-            'hits': []
-        }
-        for hit in input_results.hits:
-            hit_dict = {
-                '_id': hit.meta.id,
-                '_type': hit.meta.doc_type,
-                '_index': hit.meta.index,
-                '_score': hit.meta.score,
-                '_source': hit.to_dict()
-            }
-            converted_results['hits'].append(hit_dict)
+        converted_results['hits'].append(hit_dict)

     return converted_results

@@ -207,12 +139,7 @@ class SearchQuery():
             self.add_must(range_query)

         search_query = None
-        if pyes_enabled.pyes_on is True:
-            search_query = pyes.ConstantScoreQuery(pyes.MatchAllQuery())
-            search_query.filters.append(BooleanMatch(must=self.must, should=self.should, must_not=self.must_not))
-        else:
-            search_query = BooleanMatch(
-                must=self.must, must_not=self.must_not, should=self.should)
+        search_query = BooleanMatch(must=self.must, must_not=self.must_not, should=self.should)

         results = []
         if len(self.aggregation) == 0:
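One behavioral change worth calling out: pyes's ESRange excluded the endpoints, while the Q('range', ...) built above uses gte/lte, so both boundary timestamps now match. A quick sketch of the filter RangeMatch emits (to_dict() is the standard elasticsearch_dsl serializer; the timestamps are taken from the tests below, and lib/ is assumed to be on the import path):

    from query_models import RangeMatch

    begin_date = '2016-08-12T21:07:12.316450+00:00'
    end_date = '2016-08-13T21:07:12.316450+00:00'

    # Prints the inclusive range filter sent to Elasticsearch:
    # {'range': {'utctimestamp': {'gte': begin_date, 'lte': end_date}}}
    print RangeMatch('utctimestamp', begin_date, end_date).to_dict()

The test changes that follow fall out of this directly: the boundary timestamps move from the negative suite to the positive one.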
diff --git a/tests/lib/query_models/test_range_match.py b/tests/lib/query_models/test_range_match.py
index 02e9c929..b760f6a4 100644
--- a/tests/lib/query_models/test_range_match.py
+++ b/tests/lib/query_models/test_range_match.py
@@ -6,10 +6,6 @@ import sys
 sys.path.append(os.path.join(os.path.dirname(__file__), "../../lib"))
 from query_models import RangeMatch

-# The weird thing here is that if events that have the exact same utctimestamp
-# will not show up in either must or must_not. Updating to elasticsearch_dsl, we're
-# going to use gte and lte, so inclusive, compared to exclusion in pyes.
-

 class TestRangeMatchPositiveTestSuite(PositiveTestSuite):
     def query_tests(self):
@@ -17,12 +13,11 @@ class TestRangeMatchPositiveTestSuite(PositiveTestSuite):
         end_date = "2016-08-13T21:07:12.316450+00:00"
         tests = {
             RangeMatch('utctimestamp', begin_date, end_date): [
-                # {'utctimestamp': '2016-08-12T21:07:12.316450+00:00'},  # uncomment when switched from pyes
+                {'utctimestamp': '2016-08-12T21:07:12.316450+00:00'},
                 {'utctimestamp': '2016-08-12T21:07:13.316450+00:00'},
                 {'utctimestamp': '2016-08-12T23:04:12.316450+00:00'},
                 {'utctimestamp': '2016-08-13T21:07:11.316450+00:00'},
-                # {'utctimestamp': '2016-08-13T21:07:12.316450+00:00'},  # uncomment when switched from pyes
-                # this is because we are now including results that have the same value as either from or to
+                {'utctimestamp': '2016-08-13T21:07:12.316450+00:00'},
             ],
         }
         return tests
@@ -35,8 +30,8 @@ class TestTermMatchNegativeTestSuite(NegativeTestSuite):
         tests = {
             RangeMatch('utctimestamp', begin_date, end_date): [
                 {'utctimestamp': '2016-08-12T21:07:11.316450+00:00'},
-                # {'utctimestamp': '2016-08-12T21:07:12.316450+00:00'},
-                # {'utctimestamp': '2016-08-13T21:07:12.316450+00:00'},
+                {'utctimestamp': '2016-08-12T21:07:12.316450+00:00'},
+                {'utctimestamp': '2016-08-13T21:07:12.316450+00:00'},
                 {'utctimestamp': '2016-08-13T21:07:13.316450+00:00'},
             ],
         }
diff --git a/tests/lib/test_elasticsearch_client.py b/tests/lib/test_elasticsearch_client.py
index 69dc0a74..ff45ce83 100644
--- a/tests/lib/test_elasticsearch_client.py
+++ b/tests/lib/test_elasticsearch_client.py
@@ -15,13 +15,6 @@ import time
 from elasticsearch_client import ElasticsearchClient, ElasticsearchInvalidIndex
 import pytest

-# Remove this code when pyes is gone!
-import os
-import sys
-sys.path.append(os.path.join(os.path.dirname(__file__), "../../lib"))
-import pyes_enabled
-# Remove this code when pyes is gone!
-

 class ElasticsearchClientTest(UnitTestSuite):
     def setup(self):
@@ -54,11 +47,6 @@ class MockTransportClass:
             "/events%2Cevents-previous/_search"
         ]

-    def _send_request(self, method, path, body=None, params=None, headers=None, raw=False, return_response=False):
-        if path not in self.exclude_paths:
-            self.request_counts += 1
-        return self.original_function(method, path, body, params)
-
     def backup_function(self, orig_function):
         self.original_function = orig_function

@@ -135,13 +123,8 @@ class TestSimpleWrites(ElasticsearchClientTest):

     def test_simple_writing(self):
         mock_class = MockTransportClass()
-
-        if pyes_enabled.pyes_on is True:
-            mock_class.backup_function(self.es_client.es_connection._send_request)
-            self.es_client.es_connection._send_request = mock_class._send_request
-        else:
-            mock_class.backup_function(self.es_client.es_connection.transport.perform_request)
-            self.es_client.es_connection.transport.perform_request = mock_class.perform_request
+        mock_class.backup_function(self.es_client.es_connection.transport.perform_request)
+        self.es_client.es_connection.transport.perform_request = mock_class.perform_request

         event_length = 10000
         events = []
@@ -162,13 +145,8 @@ class BulkTest(ElasticsearchClientTest):
     def setup(self):
         super(BulkTest, self).setup()
         self.mock_class = MockTransportClass()
-
-        if pyes_enabled.pyes_on is True:
-            self.mock_class.backup_function(self.es_client.es_connection._send_request)
-            self.es_client.es_connection._send_request = self.mock_class._send_request
-        else:
-            self.mock_class.backup_function(self.es_client.es_connection.transport.perform_request)
-            self.es_client.es_connection.transport.perform_request = self.mock_class.perform_request
+        self.mock_class.backup_function(self.es_client.es_connection.transport.perform_request)
+        self.es_client.es_connection.transport.perform_request = self.mock_class.perform_request

     def teardown(self):
         super(BulkTest, self).teardown()
@@ -275,18 +253,12 @@ class TestClusterHealth(ElasticsearchClientTest):
         assert health_keys == ['active_primary_shards', 'active_shards', 'cluster_name', 'initializing_shards', 'number_of_data_nodes', 'number_of_nodes', 'relocating_shards', 'status', 'timed_out', 'unassigned_shards']
         assert type(health_results['active_primary_shards']) is int
         assert type(health_results['active_shards']) is int
-        if pyes_enabled.pyes_on is True:
-            assert type(health_results['cluster_name']) is str
-        else:
-            assert type(health_results['cluster_name']) is unicode
+        assert type(health_results['cluster_name']) is unicode
         assert type(health_results['initializing_shards']) is int
         assert type(health_results['number_of_data_nodes']) is int
        assert type(health_results['number_of_nodes']) is int
         assert type(health_results['relocating_shards']) is int
-        if pyes_enabled.pyes_on is True:
-            assert type(health_results['status']) is str
-        else:
-            assert type(health_results['status']) is unicode
+        assert type(health_results['status']) is unicode
         assert type(health_results['timed_out']) is bool
         assert type(health_results['unassigned_shards']) is int

@@ -325,44 +297,41 @@ class TestCreatingAlias(ElasticsearchClientTest):

         assert 'index2' in indices

-if pyes_enabled.pyes_on is not True:
-    # Instead of trying to figure out how to update mappings via pyes, I decided
-    # to just skip this unit test since we'll be ripping it out soon
-    class TestBulkInvalidFormatProblem(BulkTest):
+class TestBulkInvalidFormatProblem(BulkTest):

-        def setup(self):
-            super(TestBulkInvalidFormatProblem, self).setup()
+    def setup(self):
+        super(TestBulkInvalidFormatProblem, self).setup()

-            mapping = {
-                "mappings": {
-                    "event": {
-                        "properties": {
-                            "utcstamp": {
-                                "type": "date",
-                                "format": "dateOptionalTime"
-                            }
+        mapping = {
+            "mappings": {
+                "event": {
+                    "properties": {
+                        "utcstamp": {
+                            "type": "date",
+                            "format": "dateOptionalTime"
                         }
                     }
                 }
             }
+        }

-            # Recreate the test indexes with a custom mapping to throw
-            # parsing errors
-            self.es_client.delete_index("events", True)
-            self.es_client.delete_index(self.event_index_name, True)
-            self.es_client.create_index(self.event_index_name, mapping=mapping)
-            self.es_client.create_alias('events', self.event_index_name)
-            self.es_client.create_alias('events-previous', self.event_index_name)
+        # Recreate the test indexes with a custom mapping to throw
+        # parsing errors
+        self.es_client.delete_index("events", True)
+        self.es_client.delete_index(self.event_index_name, True)
+        self.es_client.create_index(self.event_index_name, mapping=mapping)
+        self.es_client.create_alias('events', self.event_index_name)
+        self.es_client.create_alias('events-previous', self.event_index_name)

-        def test_bulk_problems(self):
-            event = {
-                "utcstamp": "2016-11-08T14:13:01.250631+00:00"
-            }
-            malformed_event = {
-                "utcstamp": "abc",
-            }
+    def test_bulk_problems(self):
+        event = {
+            "utcstamp": "2016-11-08T14:13:01.250631+00:00"
+        }
+        malformed_event = {
+            "utcstamp": "abc",
+        }

-            self.es_client.save_object(index='events', doc_type='event', body=event, bulk=True)
-            self.es_client.save_object(index='events', doc_type='event', body=malformed_event, bulk=True)
-            time.sleep(5)
-            assert self.get_num_events() == 1
+        self.es_client.save_object(index='events', doc_type='event', body=event, bulk=True)
+        self.es_client.save_object(index='events', doc_type='event', body=malformed_event, bulk=True)
+        time.sleep(5)
+        assert self.get_num_events() == 1
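For completeness, the bulk path these tests exercise looks roughly like this; a sketch assuming a reachable cluster (the URL and document contents are illustrative, and the flush behavior is inferred from the BulkQueue constructor arguments above):

    from elasticsearch_client import ElasticsearchClient

    client = ElasticsearchClient(['http://localhost:9200'])

    # bulk=True routes the write through the client's BulkQueue rather than
    # indexing immediately; queued documents are presumably flushed through
    # elasticsearch.helpers.bulk() via save_documents() once the queue's
    # threshold or flush timer (100 docs / 30 seconds by default) is hit.
    client.save_object(index='events', doc_type='event',
                       body={'utcstamp': '2016-11-08T14:13:01.250631+00:00'},
                       bulk=True)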