optimize OutputEventFilter for large stdout streams

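update our event data search algorithm to be lazier about event data
discovery: the full buffer is only searched when a recently written chunk
contains an event start/end token; this drastically improves processing
speed for stdout larger than 5MB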

see: https://github.com/ansible/awx/issues/417
This commit is contained in:
Ryan Petrello
2018-01-16 10:52:12 -05:00
parent 2b1d2b2976
commit 51f7907a01
4 changed files with 71 additions and 10 deletions

View File

@@ -18,6 +18,7 @@ import contextlib
import tempfile
import six
import psutil
from StringIO import StringIO
# Decorator
from decorator import decorator
@@ -867,7 +868,8 @@ class OutputEventFilter(object):
self._event_ct = 0
self._counter = 1
self._start_line = 0
self._buffer = ''
self._buffer = StringIO()
self._last_chunk = ''
self._current_event_data = None
def flush(self):
@@ -878,9 +880,19 @@ class OutputEventFilter(object):
pass
def write(self, data):
self._buffer += data
while True:
match = self.EVENT_DATA_RE.search(self._buffer)
self._buffer.write(data)
# keep a sliding window of the last chunk written so we can detect
# event tokens and determine if we need to perform a search of the full
# buffer
should_search = '\x1b[K' in (self._last_chunk + data)
self._last_chunk = data
# Only bother searching the buffer if we recently saw a start/end
# token (\x1b[K)
while should_search:
value = self._buffer.getvalue()
match = self.EVENT_DATA_RE.search(value)
if not match:
break
try:
@@ -888,13 +900,17 @@ class OutputEventFilter(object):
event_data = json.loads(base64.b64decode(base64_data))
except ValueError:
event_data = {}
self._emit_event(self._buffer[:match.start()], event_data)
self._buffer = self._buffer[match.end():]
self._emit_event(value[:match.start()], event_data)
remainder = value[match.end():]
self._buffer = StringIO()
self._buffer.write(remainder)
self._last_chunk = remainder
def close(self):
if self._buffer:
self._emit_event(self._buffer)
self._buffer = ''
value = self._buffer.getvalue()
if value:
self._emit_event(value)
self._buffer = StringIO()
self._event_callback(dict(event='EOF'))
def _emit_event(self, buffered_stdout, next_event_data=None):