Skip to content

Commit

Permalink
Added support for using direct IO to read files when scanning for mal…
Browse files Browse the repository at this point in the history
…ware
  • Loading branch information
akenion committed Apr 4, 2024
1 parent 4567977 commit ed25217
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 8 deletions.
7 changes: 7 additions & 0 deletions wordfence/cli/malwarescan/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@
"argument_type": "OPTION",
"default": None,
"hidden": True
},
"direct-io": {
"short_name": "D",
"description": "Use direct IO when opening files to avoid caching",
"context": "ALL",
"argument_type": "FLAG",
"default": False
}
}

Expand Down
3 changes: 2 additions & 1 deletion wordfence/cli/malwarescan/malwarescan.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ def invoke(self) -> int:
logging_initializer=self.context.get_log_settings().apply,
match_engine=match_engine,
profile=self.config.profile,
profile_path=self.config.profile_path
profile_path=self.config.profile_path,
direct_io=self.config.direct_io
)
if io_manager.should_read_stdin():
options.path_source = io_manager.get_input_reader()
Expand Down
30 changes: 23 additions & 7 deletions wordfence/scanning/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from multiprocessing import Queue, Process, Value, get_start_method
from dataclasses import dataclass
from typing import Set, Optional, Callable, Dict, NamedTuple, Tuple, List, \
Union
Union, BinaryIO
from logging import Handler

from .exceptions import ScanningException, ScanningIoException
Expand All @@ -17,6 +17,7 @@
from ..util import timing
from ..util.io import StreamReader, is_symlink_loop, is_symlink_and_loop, \
get_all_parents, PathSet
from ..util.direct_io import DirectIoBuffer, DirectIoReader
from ..util.units import scale_byte_unit
from ..logging import log, remove_initial_handler, VERBOSE
from ..util.profiling import Profiler, ProfileEvent, EventTimer, \
Expand Down Expand Up @@ -163,7 +164,8 @@ class Options:
logging_initializer: Callable[[], None] = None
match_engine: MatchEngine = MatchEngine.get_default()
profile: bool = False,
profile_path: Optional[str] = None
profile_path: Optional[str] = None,
direct_io: bool = False


class Status(IntEnum):
Expand Down Expand Up @@ -390,7 +392,8 @@ def __init__(
use_log_events: bool = False,
allow_io_errors: bool = False,
logging_initializer: Callable[[], None] = None,
profile: bool = False
profile: bool = False,
direct_io: bool = False
):
self.index = index
self._status = status
Expand All @@ -404,13 +407,22 @@ def __init__(
self._allow_io_errors = allow_io_errors
self._logging_initializer = logging_initializer
self._profile = profile
self._opener = self._open_direct if direct_io else self._open
if direct_io:
self._direct_io_buffer = DirectIoBuffer(self._chunk_size)
self.complete = Value(c_bool, False)
self._timer = None
super().__init__(name=self._generate_name())

def _generate_name(self) -> str:
return 'worker-' + str(self.index)

def _open(self, path: str) -> BinaryIO:
return open(path, 'rb')

def _open_direct(self, path: str) -> DirectIoReader:
return DirectIoReader(path, self._direct_io_buffer)

def work(self):
self._timer = _event_timer(
self._profile,
Expand Down Expand Up @@ -496,7 +508,7 @@ def _get_next_chunk_size(self, length: int) -> int:
def _process_file(self, path: str, workspace: Optional[MatchWorkspace]):
log.log(VERBOSE, f'Processing file: {path}')
open_timer = _event_timer(self._profile, 'open_file')
with open(path, mode='rb') as file, \
with self._opener(path) as file, \
self._matcher.create_context() as context:
self._put_profile_event(open_timer)
length = 0
Expand Down Expand Up @@ -687,7 +699,8 @@ def __init__(
allow_io_errors: bool = False,
debug: bool = False,
logging_initializer: Callable[[], None] = False,
profiler: Optional[Profiler] = None
profiler: Optional[Profiler] = None,
direct_io: bool = False
):
self.size = size
self._matcher = matcher
Expand All @@ -704,6 +717,7 @@ def __init__(
self._debug = debug
self._logging_initializer = logging_initializer
self._profiler = profiler
self._direct_io = direct_io
self._completed = False

def __enter__(self):
Expand Down Expand Up @@ -764,7 +778,8 @@ def start(self):
self._use_log_events,
self._allow_io_errors,
self._logging_initializer,
self._profiler is not None
self._profiler is not None,
self._direct_io
)
worker.start()
self._workers.append(worker)
Expand Down Expand Up @@ -933,7 +948,8 @@ def scan(
allow_io_errors=self.options.allow_io_errors,
debug=self.options.debug,
logging_initializer=self.options.logging_initializer,
profiler=profiler
profiler=profiler,
direct_io=self.options.direct_io
) as worker_pool:
def add_path(path: str):
while not file_locator_process.add_path(path):
Expand Down
47 changes: 47 additions & 0 deletions wordfence/util/direct_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import os
import mmap
import math

from typing import Optional


class DirectIoBuffer:

def __init__(self, max_chunk_size: int = mmap.PAGESIZE):
self.max_chunk_size = max_chunk_size
self.buffer_size = (
math.ceil(max_chunk_size / mmap.PAGESIZE) * mmap.PAGESIZE
)
self.buffer = mmap.mmap(-1, self.buffer_size)
self.buffer_view = memoryview(self.buffer)
self.buffers = [self.buffer]

def seek(self, position: int = 0) -> list:
self.buffer.seek(position)

def read(self, length: int) -> bytes:
return self.buffer.read(length)


class DirectIoReader:

def __init__(self, path: str, buffer: DirectIoBuffer):
self.fd = os.open(path, os.O_RDONLY | os.O_DIRECT)
self.buffer = buffer
self.offset = 0

def read(self, limit: Optional[int] = None) -> bytes:
read_offset = math.floor(self.offset / mmap.PAGESIZE)
skip = self.offset % mmap.PAGESIZE
read_length = os.preadv(self.fd, self.buffer.buffers, read_offset)
read_length -= skip
read_length = min(read_length, limit)
self.offset += read_length
self.buffer.seek(skip)
return self.buffer.read(read_length)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
os.close(self.fd)

0 comments on commit ed25217

Please sign in to comment.