зеркало из https://github.com/mozilla/MozDef.git
Improve papertrail mq worker (#1720)
This commit is contained in:
Родитель
1a0b5afb25
Коммит
2c4108fb24
|
@ -5,4 +5,3 @@ esservers=http://localhost:9200
|
|||
papertrailinterval=60
|
||||
papertrailbackoff=300
|
||||
papertrailaccount=<add_papertrailaccount>
|
||||
papertrailmaxevents=2000
|
||||
|
|
|
@ -78,7 +78,7 @@ class PTRequestor(object):
|
|||
if resp.status_code == 200:
|
||||
break
|
||||
else:
|
||||
logger.debug("Received invalid status code: {0}: {1}".format(resp.status_code, resp.text))
|
||||
logger.error("Received invalid status code: {0}: {1}".format(resp.status_code, resp.text))
|
||||
total_retries += 1
|
||||
if total_retries < max_retries:
|
||||
logger.debug("Sleeping a bit then retrying")
|
||||
|
@ -97,9 +97,6 @@ class PTRequestor(object):
|
|||
maxid = self.makerequest(query, stime, etime, maxid)
|
||||
if maxid is None:
|
||||
break
|
||||
if len(self._events.keys()) > self._evmax:
|
||||
logger.warning("papertrail esworker hitting event request limit")
|
||||
break
|
||||
# cache event ids we return to allow for some duplicate filtering checks
|
||||
# during next run
|
||||
self._evidcache = list(self._events.keys())
|
||||
|
@ -235,10 +232,12 @@ class taskConsumer(object):
|
|||
def run(self):
|
||||
while True:
|
||||
try:
|
||||
curRequestTime = toUTC(datetime.now()) - timedelta(seconds=options.ptbackoff)
|
||||
curRequestTime = toUTC(datetime.now())
|
||||
logger.debug("Looking at {} : {}".format(self.lastRequestTime, curRequestTime))
|
||||
records = self.ptrequestor.request(options.ptquery, self.lastRequestTime, curRequestTime)
|
||||
# update last request time for the next request
|
||||
self.lastRequestTime = curRequestTime
|
||||
logger.debug("Num of events received: {}".format(len(records)))
|
||||
for msgid in records:
|
||||
msgdict = records[msgid]
|
||||
|
||||
|
@ -352,7 +351,7 @@ def main():
|
|||
logger.info("started without uwsgi")
|
||||
|
||||
# establish api interface with papertrail
|
||||
ptRequestor = PTRequestor(options.ptapikey, evmax=options.ptquerymax)
|
||||
ptRequestor = PTRequestor(options.ptapikey)
|
||||
|
||||
# consume our queue
|
||||
taskConsumer(ptRequestor, es).run()
|
||||
|
@ -372,7 +371,6 @@ def initConfig():
|
|||
options.ptquery = getConfig("papertrailquery", "", options.configfile)
|
||||
options.ptbackoff = getConfig("papertrailbackoff", 300, options.configfile)
|
||||
options.ptacctname = getConfig("papertrailaccount", "unset", options.configfile)
|
||||
options.ptquerymax = getConfig("papertrailmaxevents", 2000, options.configfile)
|
||||
|
||||
# How long to sleep between polling
|
||||
options.sleep_time = getConfig("sleep_time", 60, options.configfile)
|
||||
|
|
Загрузка…
Ссылка в новой задаче