Add geomodel test case where incoming events are older than recorded state

This commit is contained in:
Brandon Myers 2019-09-30 13:47:30 -05:00
Родитель b765b6af23
Коммит de2f19a581
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 8AA79AD83045BBC7
1 изменённых файлов: 77 добавлений и 5 удалений

Просмотреть файл

@ -1014,16 +1014,15 @@ class TestDifferentIPsSameLocation(AlertTestSuite):
events=[default_event, another_event]),
]
@freeze_time('2017-01-01 01:00:00', tz_offset=0)
def setup(self):
super().setup()
index = 'localities'
if self.config_delete_indexes:
self.es_client.delete_index(index, True)
self.es_client.create_index(index)
journal = geomodel.wrap_journal(self.es_client)
def state(username, locs):
@ -1056,11 +1055,84 @@ class TestDifferentIPsSameLocation(AlertTestSuite):
}
)
])
journal(geomodel.Entry.new(test_state), index)
self.refresh(index)
def teardown(self):
if self.config_delete_indexes:
self.es_client.delete_index('localities', True)
super().teardown()
class TestAlreadyProcessedEvents(AlertTestSuite):
alert_filename = 'geomodel_location'
alert_classname = 'AlertGeoModel'
default_event = {
'_source': {
'details': {
'sourceipaddress': '1.2.3.4',
'username': 'tester1',
'sourceipgeolocation': {
'city': 'Portland',
'country_code': 'US',
'latitude': 45.5234,
'longitude': -122.6762,
},
},
'tags': ['auth0'],
}
}
events = [
AlertTestSuite.create_event(default_event),
]
events[0]['_source']['utctimestamp'] = AlertTestSuite.subtract_from_timestamp_lambda({'minutes': 4})
events[0]['_source']['receivedtimestamp'] = AlertTestSuite.subtract_from_timestamp_lambda({'minutes': 4})
test_cases = [
NegativeAlertTestCase(
description='Should not fire if encounters older events than state file',
events=events
)
]
@freeze_time('2017-01-01 01:00:00', tz_offset=0)
def setup(self):
super().setup()
index = 'localities'
if self.config_delete_indexes:
self.es_client.delete_index(index, True)
self.es_client.create_index(index)
journal = geomodel.wrap_journal(self.es_client)
def state(username, locs):
return geomodel.State('locality', username, locs)
def locality(cfg):
return geomodel.Locality(**cfg)
test_state = state('tester1', [
locality(
{
'sourceipaddress': '1.2.3.4',
'city': 'Portland',
'country': 'US',
'lastaction': toUTC(datetime.now()),
'latitude': 45.5234,
'longitude': -122.6762,
'radius': 50,
}
)
])
journal(geomodel.Entry.new(test_state), index)
self.refresh(index)
def teardown(self):
if self.config_delete_indexes:
self.es_client.delete_index('localities', True)
super().teardown()