Skip to content

Commit

Permalink
Updated vectorscan implementation to use streaming mode instead of bl…
Browse files Browse the repository at this point in the history
…ock mode
  • Loading branch information
akenion committed Mar 22, 2024
1 parent 6247459 commit 173e16b
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 50 deletions.
23 changes: 17 additions & 6 deletions wordfence/scanning/matching/matching.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,31 @@ def __init__(
self.timeout = timeout
self.match_all = match_all
self.prepared = False
self.prepared_thread = False
if not lazy:
self.prepare()

def get_cacheable(self) -> Optional[Cacheable]:
return None

def prepare(self) -> None:
if self.prepared:
return
self._prepare()
self.prepared = True
def prepare(self, thread: bool = False) -> None:
if not self.prepared:
self._prepare()
self.prepared = True
if thread:
self.prepare_thread()

def _prepare(self) -> None:
raise NotImplementedError()
pass

def prepare_thread(self) -> None:
if self.prepared_thread:
return
self._prepare_thread()
self.prepared_thread = True

def _prepare_thread(self) -> None:
pass

def create_workspace(self) -> Optional[MatchWorkspace]:
return MatchWorkspace()
Expand Down
29 changes: 20 additions & 9 deletions wordfence/scanning/matching/vectorscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
raise RuntimeError('Vectorscan is not available')


from ...util.vectorscan import VectorscanScanner, VectorscanMatch, \
from ...util.vectorscan import VectorscanStreamScanner, VectorscanMatch, \
VectorscanFlags, VectorscanDatabase, VectorscanScanTerminated, \
vectorscan_compile, vectorscan_deserialize
VectorscanMode, vectorscan_compile, vectorscan_deserialize


class VectorscanMatcherContext(BaseMatcherContext):
Expand All @@ -39,15 +39,18 @@ def process_chunk(
) -> bool:
self.matched = False
try:
self.matcher.scanner.scan(
chunk,
self._match_callback,
context=chunk
)
self.matcher.scanner.scan(chunk)
except VectorscanScanTerminated:
return True
return self.matched

def __enter__(self):
self.matcher.scanner.set_callback(self._match_callback)
return super().__enter__()

def __exit__(self, exc_type, exc_value, traceback) -> None:
self.matcher.scanner.reset()


class VectorscanCompiler(Compiler):

Expand All @@ -66,7 +69,11 @@ def compile(self, signature_set: SignatureSet) -> bytes:
VectorscanFlags.SINGLEMATCH |
VectorscanFlags.ALLOWEMPTY
)
database = vectorscan_compile(patterns, flags=flags)
database = vectorscan_compile(
patterns,
mode=VectorscanMode.STREAM,
flags=flags
)
log.debug('Successfully compiled vectorscan database')
return database

Expand Down Expand Up @@ -112,9 +119,13 @@ def _initialize_database(self) -> VectorscanDatabase:
def _prepare(self) -> None:
log.debug('Preparing vectorscan matcher...')
self.database = self._initialize_database()
self.scanner = VectorscanScanner(self.database)
log.debug('Successfully prepared vectorscan matcher')

def _prepare_thread(self) -> None:
log.debug('Preparding thread-specific vectorscan scanner...')
self.scanner = VectorscanStreamScanner(self.database)
log.debug('Successfully prepared vectorscan scanner')

def create_context(self) -> VectorscanMatcherContext:
return VectorscanMatcherContext(
self
Expand Down
57 changes: 30 additions & 27 deletions wordfence/scanning/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,34 +354,37 @@ def _generate_name(self) -> str:
return 'worker-' + str(self.index)

def work(self):
self._working = True
self._matcher.prepare()
log.debug(f'Worker {self.index} started, PID:' + str(os.getpid()))
with self._matcher.create_workspace() as workspace:
while self._working:
try:
item = self._work_queue.get(timeout=QUEUE_READ_TIMEOUT)
if item is None:
self._put_event(ScanEventType.FILE_QUEUE_EMPTIED)
self._complete()
elif isinstance(item, ExceptionContainer):
if isinstance(item.exception, ScanningIoException):
self._put_io_error(item)
try:
self._working = True
self._matcher.prepare(thread=True)
log.debug(f'Worker {self.index} started, PID:' + str(os.getpid()))
with self._matcher.create_workspace() as workspace:
while self._working:
try:
item = self._work_queue.get(timeout=QUEUE_READ_TIMEOUT)
if item is None:
self._put_event(ScanEventType.FILE_QUEUE_EMPTIED)
self._complete()
elif isinstance(item, ExceptionContainer):
if isinstance(item.exception, ScanningIoException):
self._put_io_error(item)
else:
self._put_event(
ScanEventType.FATAL_EXCEPTION,
{'exception': item}
)
else:
self._put_event(
ScanEventType.FATAL_EXCEPTION,
{'exception': item}
)
else:
try:
self._process_file(item, workspace)
except OSError as error:
self._put_io_error(ExceptionContainer(error))
except Exception as error:
self._put_error(ExceptionContainer(error))
except queue.Empty:
if self._status.value == Status.PROCESSING_FILES:
self._complete()
try:
self._process_file(item, workspace)
except OSError as error:
self._put_io_error(ExceptionContainer(error))
except Exception as error:
self._put_error(ExceptionContainer(error))
except queue.Empty:
if self._status.value == Status.PROCESSING_FILES:
self._complete()
except Exception as error:
self._put_error(ExceptionContainer(error))

def _put_event(self, event_type: ScanEventType, data: dict = None) -> None:
if data is None:
Expand Down
Loading

0 comments on commit 173e16b

Please sign in to comment.