diff --git a/mq/esworker_papertrail.conf b/mq/esworker_papertrail.conf index e3afcc1d..3d5a379e 100644 --- a/mq/esworker_papertrail.conf +++ b/mq/esworker_papertrail.conf @@ -5,4 +5,3 @@ esservers=http://localhost:9200 papertrailinterval=60 papertrailbackoff=300 papertrailaccount= -papertrailmaxevents=2000 diff --git a/mq/esworker_papertrail.py b/mq/esworker_papertrail.py index caed85c3..4e8158fa 100755 --- a/mq/esworker_papertrail.py +++ b/mq/esworker_papertrail.py @@ -78,7 +78,7 @@ class PTRequestor(object): if resp.status_code == 200: break else: - logger.debug("Received invalid status code: {0}: {1}".format(resp.status_code, resp.text)) + logger.error("Received invalid status code: {0}: {1}".format(resp.status_code, resp.text)) total_retries += 1 if total_retries < max_retries: logger.debug("Sleeping a bit then retrying") @@ -97,9 +97,6 @@ class PTRequestor(object): maxid = self.makerequest(query, stime, etime, maxid) if maxid is None: break - if len(self._events.keys()) > self._evmax: - logger.warning("papertrail esworker hitting event request limit") - break # cache event ids we return to allow for some duplicate filtering checks # during next run self._evidcache = list(self._events.keys()) @@ -235,10 +232,12 @@ class taskConsumer(object): def run(self): while True: try: - curRequestTime = toUTC(datetime.now()) - timedelta(seconds=options.ptbackoff) + curRequestTime = toUTC(datetime.now()) + logger.debug("Looking at {} : {}".format(self.lastRequestTime, curRequestTime)) records = self.ptrequestor.request(options.ptquery, self.lastRequestTime, curRequestTime) # update last request time for the next request self.lastRequestTime = curRequestTime + logger.debug("Num of events received: {}".format(len(records))) for msgid in records: msgdict = records[msgid] @@ -352,7 +351,7 @@ def main(): logger.info("started without uwsgi") # establish api interface with papertrail - ptRequestor = PTRequestor(options.ptapikey, evmax=options.ptquerymax) + ptRequestor = PTRequestor(options.ptapikey) # consume our queue taskConsumer(ptRequestor, es).run() @@ -372,7 +371,6 @@ def initConfig(): options.ptquery = getConfig("papertrailquery", "", options.configfile) options.ptbackoff = getConfig("papertrailbackoff", 300, options.configfile) options.ptacctname = getConfig("papertrailaccount", "unset", options.configfile) - options.ptquerymax = getConfig("papertrailmaxevents", 2000, options.configfile) # How long to sleep between polling options.sleep_time = getConfig("sleep_time", 60, options.configfile)