From d53514790033fe0efa997dbdae0606db483696e8 Mon Sep 17 00:00:00 2001
From: Antonio Fresneda
Date: Tue, 2 Mar 2021 13:13:01 +0100
Subject: [PATCH] Apply style corrections: - Fix wrong tab size in parameters descriptions. - Change imports in test_follow_symbolic_links and test_audit. - Fix adds missing documentation to follow_symbolic_links tests and test_audit.
---
 .../test_files/test_audit/test_audit.py | 112 +++++++++++-------
 .../test_audit_after_initial_scan.py | 37 +++---
 .../test_audit/test_audit_no_dir.py | 23 ++--
 .../test_audit/test_remove_audit.py | 18 ++-
 .../test_audit/test_remove_rule_five_times.py | 32 ++---
 .../test_follow_symbolic_link/common.py | 3 +-
 ...t_audit_rules_removed_after_change_link.py | 29 +++--
 .../test_change_target.py | 39 +++---
 .../test_change_target_inside_folder.py | 33 +++---
 ...est_change_target_with_nested_directory.py | 25 ++--
 .../test_delete_symlink.py | 36 +++---
 .../test_delete_target.py | 48 ++++----
 .../test_follow_symbolic_disabled.py | 30 ++---
 .../test_monitor_symlink.py | 29 +++--
 .../test_not_following_symbolic_link.py | 44 +++----
 .../test_revert_symlink.py | 37 +++---
 .../test_symlink_and_dir.py | 32 ++---
 .../test_symlink_dir_inside_monitored_dir.py | 26 ++--
 .../test_symlink_to_dir_between_scans.py | 29 +++--
 .../test_symlink_within_dir.py | 29 +++--
 20 files changed, 393 insertions(+), 298 deletions(-)
diff --git a/tests/integration/test_fim/test_files/test_audit/test_audit.py b/tests/integration/test_fim/test_files/test_audit/test_audit.py
index b1521e18e5..8e77f23ab8 100644
--- a/tests/integration/test_fim/test_files/test_audit/test_audit.py
+++ b/tests/integration/test_fim/test_files/test_audit/test_audit.py
@@ -7,16 +7,9 @@ import psutil import pytest +import wazuh_testing.fim as fim + from wazuh_testing import logger -from wazuh_testing.fim import (LOG_FILE_PATH, callback_audit_added_rule, - callback_audit_connection, - callback_audit_health_check, - callback_audit_reloaded_rule, - callback_audit_rules_manipulation, - callback_realtime_added_directory, - callback_audit_key, - create_file, REGULAR, - detect_initial_scan) from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.file import truncate_file from wazuh_testing.tools.monitoring import FileMonitor @@ -33,7 +26,7 @@ test_directories = [os.path.join('/', 'testdir1'), os.path.join('/', 'testdir2'), os.path.join('/', 'testdir3')] testdir1, testdir2, testdir3 = test_directories -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # configurations @@ -58,16 +51,20 @@ def test_audit_health_check(tags_to_apply, get_configuration, """Check if the health check is passed. Args: - tags_to_apply (set): Configuration tag to apply in the test. + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: - TimeoutError: If an expected event couldn't be captured. + TimeoutError: If an expected event couldn't be captured. """ logger.info('Applying the test configuration') check_apply_test(tags_to_apply, get_configuration['tags']) - wazuh_log_monitor.start(timeout=20, callback=callback_audit_health_check, + wazuh_log_monitor.start(timeout=20, callback=fim.callback_audit_health_check, error_message='Health check failed') 
@@ -79,18 +76,22 @@ def test_added_rules(tags_to_apply, get_configuration, """Check if the specified folders are added to Audit rules list. Args: - tags_to_apply (set): Configuration tag to apply in the test. + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: - TimeoutError: If an expected event couldn't be captured. - ValueError: If the path of the event is wrong. + TimeoutError: If an expected event couldn't be captured. + ValueError: If the path of the event is wrong. """ logger.info('Applying the test configuration') check_apply_test(tags_to_apply, get_configuration['tags']) logger.info('Checking the event...') events = wazuh_log_monitor.start(timeout=20, - callback=callback_audit_added_rule, + callback=fim.callback_audit_added_rule, accum_results=3, error_message='Folders were not added to Audit rules list' ).result() 
@@ -108,11 +109,15 @@ def test_readded_rules(tags_to_apply, get_configuration, """Check if the removed rules are added to Audit rules list. Args: - tags_to_apply (set): Configuration tag to apply in the test. + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: - TimeoutError: If an expected event couldn't be captured. - ValueError: If the path of the event is wrong. + TimeoutError: If an expected event couldn't be captured. + ValueError: If the path of the event is wrong. """ logger.info('Applying the test configuration') @@ -124,12 +129,12 @@ def test_readded_rules(tags_to_apply, get_configuration, os.system(command) wazuh_log_monitor.start(timeout=20, - callback=callback_audit_rules_manipulation, + callback=fim.callback_audit_rules_manipulation, error_message=f'Did not receive expected "manipulation" event with the ' f'command {command}') events = wazuh_log_monitor.start(timeout=10, - callback=callback_audit_added_rule, + callback=fim.callback_audit_added_rule, error_message='Did not receive expected "added" event with the rule ' 'modification').result() 
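Review bot: `os.system(command)` in the second hunk of test_readded_rules discards the exit status, so if the rule-manipulation command fails the test reports only a 20-second timeout on the "manipulation" event instead of the real cause. Other audit tests touched by this commit already use `subprocess.run([...], check=True)`; building `command` as an argument list and running it the same way would surface the failure immediately and avoid passing a string through the shell. The `p.wait()` calls in test_readded_rules_on_restart and test_move_rules_realtime likewise appear to ignore the return code. In all three cases the command is constructed outside the hunk, so confirm its shape before changing the call.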
@@ -144,11 +149,15 @@ def test_readded_rules_on_restart(tags_to_apply, get_configuration, """Check if the rules are added to Audit when it restarts. Args: - tags_to_apply (set): Configuration tag to apply in the test. + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: - TimeoutError: If an expected event couldn't be captured. - ValueError: If the path of the event is wrong. + TimeoutError: If an expected event couldn't be captured. + ValueError: If the path of the event is wrong. """ logger.info('Applying the test configuration') @@ -160,12 +169,12 @@ def test_readded_rules_on_restart(tags_to_apply, get_configuration, p.wait() wazuh_log_monitor.start(timeout=10, - callback=callback_audit_connection, + callback=fim.callback_audit_connection, error_message=f'Did not receive expected "connect" event with the command ' f'{" ".join(restart_command)}') events = wazuh_log_monitor.start(timeout=30, - callback=callback_audit_added_rule, + callback=fim.callback_audit_added_rule, accum_results=3, error_message=f'Did not receive expected "load" event with the command ' f'{" ".join(restart_command)}').result() 
@@ -183,11 +192,14 @@ def test_move_rules_realtime(tags_to_apply, get_configuration, """Check if the rules are changed to realtime when Audit stops. Args: - tags_to_apply (set): Configuration tag to apply in the test. - + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: - TimeoutError: If an expected event couldn't be captured. - ValueError: If the path of the event is wrong. + TimeoutError: If an expected event couldn't be captured. + ValueError: If the path of the event is wrong. """ logger.info('Applying the test configuration') @@ -199,7 +211,7 @@ def test_move_rules_realtime(tags_to_apply, get_configuration, p.wait() events = wazuh_log_monitor.start(timeout=30, - callback=callback_realtime_added_directory, + callback=fim.callback_realtime_added_directory, accum_results=3, error_message=f'Did not receive expected "directory added" for monitoring ' f'with the command {" ".join(stop_command)}').result() 
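Review bot: The rewritten docstring of test_move_rules_realtime appears to drop the blank line between `Args:` and `Raises:`; the hunk removes an empty line after the old `tags_to_apply` entry and does not add one back after `wait_for_fim_start (fixture)`. The other docstrings in this file keep that separator, so this one should too. Add an empty line before `Raises:`. test_move_folders_to_realtime in test_remove_audit.py shows the same omission.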
@@ -221,12 +233,16 @@ def test_audit_key(audit_key, path, get_configuration, configure_environment, re a file is created. Args: - audit_key (str): Name of the audit_key to monitor. - tags_to_apply (set): Configuration tag to apply in the test. + audit_key (str): Name of the audit_key to monitor. + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: - TimeoutError: If an expected event couldn't be captured. - ValueError: If the path of the event is wrong. + TimeoutError: If an expected event couldn't be captured. + ValueError: If the path of the event is wrong. """ logger.info('Applying the test configuration') @@ -238,15 +254,15 @@ def test_audit_key(audit_key, path, get_configuration, configure_environment, re # Restart and for wazuh control_service('stop') - truncate_file(LOG_FILE_PATH) - wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) + truncate_file(fim.LOG_FILE_PATH) + wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) control_service('start') - detect_initial_scan(wazuh_log_monitor) + fim.detect_initial_scan(wazuh_log_monitor) # Look for audit_key word - create_file(REGULAR, path, "testfile") + fim.create_file(fim.REGULAR, path, "testfile") events = wazuh_log_monitor.start(timeout=30, - callback=callback_audit_key, + callback=fim.callback_audit_key, accum_results=1, error_message=f'Did not receive expected "Match audit_key ..." event ' f'with the command {" ".join(add_rule_command)}').result() 
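Review bot: The Args section of test_audit_key documents `tags_to_apply`, but the part of the signature visible in the hunk header begins `audit_key, path, get_configuration, ...` and `path` has no entry at all. The second hunk passes `path` to `fim.create_file`, so an entry such as `path (str): Directory in which the test file is created.` is warranted, and the `tags_to_apply` line should go if that name is not a parameter here. The signature is truncated in the header, so check against the full definition. The nearby comment `# Restart and for wazuh` is also unclear and could read `# Restart Wazuh and wait for the initial scan`.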
@@ -265,13 +281,17 @@ def test_restart_audit(tags_to_apply, should_restart, get_configuration, configu the file again. Args: - tags_to_apply (set): Configuration tag to apply in the test. - should_restart (boolean): True if Auditd should restart, False otherwise + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + should_restart (boolean): True if Auditd should restart, False otherwise + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: - TimeoutError: If an expected event couldn't be captured. - ValueError: If the time before the and after the restart are equal when auditd has been restarted or if the time - before and after the restart are different when auditd hasn't been restarted + TimeoutError: If an expected event couldn't be captured. + ValueError: If the time before the and after the restart are equal when auditd has been restarted or if the time + before and after the restart are different when auditd hasn't been restarted """ def get_audit_creation_time(): diff --git a/tests/integration/test_fim/test_files/test_audit/test_audit_after_initial_scan.py b/tests/integration/test_fim/test_files/test_audit/test_audit_after_initial_scan.py index ad89fdd3f1..52f3a1a130 100644 --- a/tests/integration/test_fim/test_files/test_audit/test_audit_after_initial_scan.py +++ b/tests/integration/test_fim/test_files/test_audit/test_audit_after_initial_scan.py 
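Review bot: The re-indented `ValueError` entry in test_restart_audit carries over the slip "the time before the and after the restart". As the lines are being rewritten anyway, it could read `ValueError: If the creation time is unchanged after auditd was restarted, or changed when it was not restarted.`, assuming that is what `get_audit_creation_time` compares; its body is outside the hunk. The `should_restart (boolean)` entry would match the others with `bool` and a closing period.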
@@ -8,11 +8,8 @@ import subprocess import pytest -from wazuh_testing.fim import (LOG_FILE_PATH, - callback_audit_added_rule, - callback_audit_removed_rule, - callback_audit_connection_close, - callback_audit_connection, wait_for_audit) +import wazuh_testing.fim as fim + from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor from wazuh_testing import global_parameters @@ -28,7 +25,7 @@ test_directories = [os.path.join('/', 'testdir1'), os.path.join('/', 'testdir2'), os.path.join('/', 'testdir3')] testdir1, testdir2, testdir3 = test_directories -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # Configurations 
@@ -56,23 +53,27 @@ def test_remove_and_read_folder(tags_to_apply, folder, get_configuration, """Remove folder which is monitored with auditd and then create it again. Args: - tags_to_apply (set): Configuration tag to apply in the test. - folder (str): The folder to remove and read. + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + folder (str): The folder to remove and read. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: - TimeoutError: If an expected event couldn't be captured. + TimeoutError: If an expected event couldn't be captured. """ check_apply_test(tags_to_apply, get_configuration['tags']) shutil.rmtree(folder, ignore_errors=True) - wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_audit_removed_rule, + wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_audit_removed_rule, error_message=f'Did not receive expected "removed" event ' f'removing the folder {folder}') os.makedirs(folder, mode=0o777) - wait_for_audit(True, wazuh_log_monitor) - wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_audit_added_rule, + fim.wait_for_audit(True, wazuh_log_monitor) + wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_audit_added_rule, error_message='Did not receive expected "added" event') 
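Review bot: `shutil.rmtree(folder, ignore_errors=True)` hides a failed removal, so a permission or busy-directory error shows up only as a timeout blamed on the missing "removed" event. Dropping `ignore_errors=True`, or adding `assert not os.path.exists(folder)` before waiting on the monitor, would make the real cause visible. The explicit `mode=0o777` on `os.makedirs` is that function's default and can be omitted.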
@@ -84,10 +85,14 @@ def test_reconnect_to_audit(tags_to_apply, get_configuration, configure_environm """Restart auditd and check Wazuh reconnect to auditd Args: - tags_to_apply (set): Configuration tag to apply in the test + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: - TimeoutError: If an expected event couldn't be captured. + TimeoutError: If an expected event couldn't be captured. """ check_apply_test(tags_to_apply, get_configuration['tags']) @@ -95,7 +100,7 @@ def test_reconnect_to_audit(tags_to_apply, get_configuration, configure_environm restart_command = ["service", "auditd", "restart"] subprocess.run(restart_command, check=True) - wazuh_log_monitor.start(timeout=20, callback=callback_audit_connection_close, + wazuh_log_monitor.start(timeout=20, callback=fim.callback_audit_connection_close, error_message='Did not receive expected "audit connection close" event') - wazuh_log_monitor.start(timeout=20, callback=callback_audit_connection, + wazuh_log_monitor.start(timeout=20, callback=fim.callback_audit_connection, error_message='Did not receive expected "audit connection" event') diff --git a/tests/integration/test_fim/test_files/test_audit/test_audit_no_dir.py b/tests/integration/test_fim/test_files/test_audit/test_audit_no_dir.py index ee59352d29..0966333996 100644 --- a/tests/integration/test_fim/test_files/test_audit/test_audit_no_dir.py +++ b/tests/integration/test_fim/test_files/test_audit/test_audit_no_dir.py 
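Review bot: Both `wazuh_log_monitor.start(...)` calls in test_reconnect_to_audit keep a literal `timeout=20`, while test_remove_and_read_folder in the same file uses `global_parameters.default_timeout`. Unless 20 seconds is deliberate to cover the auditd restart, `timeout=global_parameters.default_timeout` would keep the module consistent and let the timeout be tuned from the command line. If it is deliberate, a short comment such as `# auditd restart can take longer than the default timeout` would record why.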
@@ -8,8 +8,9 @@ import sys import pytest +import wazuh_testing.fim as fim + from wazuh_testing import global_parameters -from wazuh_testing.fim import wait_for_audit, generate_params, callback_audit_unable_dir, callback_audit_added_rule from wazuh_testing.tools import PREFIX, LOG_FILE_PATH, ALERT_FILE_PATH from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -25,12 +26,12 @@ filename = 'testfile' test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml') -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) wazuh_alert_monitor = FileMonitor(ALERT_FILE_PATH) # Configurations -p, m = generate_params(extra_params={'TEST_DIRECTORIES': testdir}, modes=['whodata']) +p, m = fim.generate_params(extra_params={'TEST_DIRECTORIES': testdir}, modes=['whodata']) configurations = load_wazuh_configurations(configurations_path, __name__, params=p, metadata=m) 
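Review bot: `wazuh_log_monitor` now reads `fim.LOG_FILE_PATH`, yet the unchanged line `from wazuh_testing.tools import PREFIX, LOG_FILE_PATH, ALERT_FILE_PATH` still imports the same name, and `ALERT_FILE_PATH` on the next line is still used bare. If nothing else in the file references the bare `LOG_FILE_PATH`, trim the import to `from wazuh_testing.tools import PREFIX, ALERT_FILE_PATH`. The change also moves the source of the path from `wazuh_testing.tools` to `wazuh_testing.fim`. The two are presumably the same value, but that is not visible in these hunks and is worth confirming.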
@@ -70,24 +71,28 @@ def test_audit_no_dir(tags_to_apply, get_configuration, configure_environment, r when the directory is created, it starts to be monitored. Args: - tags_to_apply (set): Configuration tag to apply in the test. + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: - TimeoutError: If an expected event couldn't be captured. - ValueError: If the path of the event is wrong. + TimeoutError: If an expected event couldn't be captured. + ValueError: If the path of the event is wrong. """ check_apply_test(tags_to_apply, get_configuration['tags']) # Assert message is generated: Unable to add audit rule for .... - result = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_audit_unable_dir, + result = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_audit_unable_dir, error_message='Did not receive message "Unable to add audit rule for ..."' ).result() assert result == testdir, f'{testdir} not in "Unable to add audit rule for {result}" message' # Create the directory and verify that it is added to the audit rules. It is checked every 30 seconds. os.makedirs(testdir) - wait_for_audit(True, wazuh_log_monitor) - result = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_audit_added_rule, + fim.wait_for_audit(True, wazuh_log_monitor) + result = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_audit_added_rule, error_message='Folders were not added to Audit rules list').result() assert result == testdir, f'{testdir} not in "Added audit rule for monitoring directory: {result}" message' 
diff --git a/tests/integration/test_fim/test_files/test_audit/test_remove_audit.py b/tests/integration/test_fim/test_files/test_audit/test_remove_audit.py index 08a3994384..574685fe52 100644 --- a/tests/integration/test_fim/test_files/test_audit/test_remove_audit.py +++ b/tests/integration/test_fim/test_files/test_audit/test_remove_audit.py @@ -8,8 +8,9 @@ import subprocess import pytest +import wazuh_testing.fim as fim + from distro import id -from wazuh_testing.fim import LOG_FILE_PATH, callback_audit_cannot_start from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -24,7 +25,7 @@ test_directories = [os.path.join('/', 'testdir1'), os.path.join('/', 'testdir2'), os.path.join('/', 'testdir3')] testdir1, testdir2, testdir3 = test_directories -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # Configurations 
@@ -77,14 +78,19 @@ def test_move_folders_to_realtime(tags_to_apply, get_configuration, uninstall_in """Check folders monitored with Whodata change to Real-time if auditd is not installed Args: - tags_to_apply (set): Configuration tag to apply in the test. - + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + uninstall_install_audit (fixture): Uninstall auditd before the test and install auditd again after the test is + executed. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: - TimeoutError: If an expected event couldn't be captured. + TimeoutError: If an expected event couldn't be captured. """ check_apply_test(tags_to_apply, get_configuration['tags']) - wazuh_log_monitor.start(timeout=20, callback=callback_audit_cannot_start, + wazuh_log_monitor.start(timeout=20, callback=fim.callback_audit_cannot_start, error_message='Did not receive expected "Who-data engine could not start. ' 'Switching who-data to real-time" event') diff --git a/tests/integration/test_fim/test_files/test_audit/test_remove_rule_five_times.py b/tests/integration/test_fim/test_files/test_audit/test_remove_rule_five_times.py index a07cb2a15c..c29019ce82 100644 --- a/tests/integration/test_fim/test_files/test_audit/test_remove_rule_five_times.py +++ b/tests/integration/test_fim/test_files/test_audit/test_remove_rule_five_times.py @@ -7,9 +7,8 @@ import subprocess import pytest -from wazuh_testing.fim import (LOG_FILE_PATH, - callback_audit_rules_manipulation, - callback_audit_deleting_rule) +import wazuh_testing.fim as fim + from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor 
@@ -24,7 +23,7 @@ test_directories = [os.path.join('/', 'testdir1'), os.path.join('/', 'testdir2'), os.path.join('/', 'testdir3')] testdir1, testdir2, testdir3 = test_directories -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # Configurations @@ -35,14 +34,7 @@ @pytest.fixture(scope='module', params=configurations) def get_configuration(request): - """Get configurations from the module. - - Args: - request: - - Returns: - - """ + """Get configurations from the module.""" return request.param 
@@ -56,19 +48,27 @@ def test_remove_rule_five_times(tags_to_apply, folder, audit_key, """Remove auditd rule using auditctl five times and check Wazuh ignores folder. Args: - tags_to_apply (set): Configuration tag to apply in the test. + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + folder (str): Path whose rule will be removed. + audit_key (str): Name of the configured audit key. + get_configuration (fixture): Gets the current configuration of the test. + uninstall_install_audit (fixture): Uninstall auditd before the test and install auditd again after the test is + executed. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: - TimeoutError: If an expected event couldn't be captured. + TimeoutError: If an expected event couldn't be captured. """ check_apply_test(tags_to_apply, get_configuration['tags']) for _ in range(0, 5): subprocess.run(["auditctl", "-W", folder, "-p", "wa", "-k", audit_key], check=True) - wazuh_log_monitor.start(timeout=20, callback=callback_audit_rules_manipulation, + wazuh_log_monitor.start(timeout=20, callback=fim.callback_audit_rules_manipulation, error_message='Did not receive expected ' '"Detected Audit rules manipulation" event') - wazuh_log_monitor.start(timeout=20, callback=callback_audit_deleting_rule, + wazuh_log_monitor.start(timeout=20, callback=fim.callback_audit_deleting_rule, error_message='Did not receive expected "Deleting Audit rules" event') diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/common.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/common.py index 5a23068748..df1b5bc258 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/common.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/common.py 
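Review bot: The new Args list for test_remove_rule_five_times documents `uninstall_install_audit`, which looks copied from test_move_folders_to_realtime. The signature in the hunk header is cut off after `audit_key,`, so check that this fixture is really a parameter here and drop the entry if it is not. The `folder` entry would also be clearer as `folder (str): Monitored directory whose audit rule is removed with auditctl.`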
@@ -4,8 +4,7 @@ import subprocess import sys -from wazuh_testing.fim import callback_end_audit_reload_rules, create_file, REGULAR, SYMLINK, \ - callback_symlink_scan_ended, change_internal_options +from wazuh_testing.fim import create_file, REGULAR, SYMLINK, callback_symlink_scan_ended, change_internal_options from wazuh_testing.tools import PREFIX # variables diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_audit_rules_removed_after_change_link.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_audit_rules_removed_after_change_link.py index 03906f156e..1875e105bc 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_audit_rules_removed_after_change_link.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_audit_rules_removed_after_change_link.py @@ -5,10 +5,8 @@ import subprocess import pytest +import wazuh_testing.fim as fim - -from wazuh_testing.fim import generate_params, create_file, REGULAR, SYMLINK, callback_detect_event, \ - LOG_FILE_PATH, change_internal_options, wait_for_audit from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor from wazuh_testing import global_parameters @@ -21,7 +19,7 @@ pytestmark = [pytest.mark.linux, pytest.mark.tier(level=1)] -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # Variables @@ -38,7 +36,7 @@ # Configurations -conf_params, conf_metadata = generate_params(extra_params=param_dir, modes=['whodata']) +conf_params, conf_metadata = fim.generate_params(extra_params=param_dir, modes=['whodata']) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata) # Functions 
@@ -49,9 +47,9 @@ def extra_configuration_before_yield(): Setup the symlink to one folder """ # Symlink pointing to testdir1 - create_file(SYMLINK, symlink_root_path, symlink_name, target=testdir1) + fim.create_file(fim.SYMLINK, symlink_root_path, symlink_name, target=testdir1) # Set symlink_scan_interval to a given value - change_internal_options(param='syscheck.symlink_scan_interval', value=link_interval) + fim.change_internal_options(param='syscheck.symlink_scan_interval', value=link_interval) def extra_configuration_after_yield(): @@ -60,7 +58,7 @@ def extra_configuration_after_yield(): """ # Symlink pointing to testdir1 os.remove(symlink_path) - change_internal_options(param='syscheck.symlink_scan_interval', value=600) + fim.change_internal_options(param='syscheck.symlink_scan_interval', value=600) # fixtures @@ -85,6 +83,11 @@ def test_audit_rules_removed_after_change_link(replaced_target, new_target, file replaced_target (str): Directory where the link is pointing. new_target (str): Directory where the link will be pointed after it's updated. file_name (str): Name of the file that will be created inside the folders. + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: TimeoutError: If an expected event couldn't be captured. 
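Review bot: `extra_configuration_after_yield` resets `syscheck.symlink_scan_interval` to the literal `600` rather than to the value that was in place before `extra_configuration_before_yield` changed it, so a host configured differently is left altered after the module runs. If 600 is intended as the default, say so with a named constant or a comment on that line. Otherwise, read and save the previous value in the setup function and restore it here. The comment `# Symlink pointing to testdir1` above `os.remove(symlink_path)` describes the creation step and should read `# Remove the symlink created for the test`.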
@@ -92,8 +95,8 @@ def test_audit_rules_removed_after_change_link(replaced_target, new_target, file """ check_apply_test(tags_to_apply, get_configuration['tags']) - create_file(REGULAR, replaced_target, file_name) - ev = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event, + fim.create_file(fim.REGULAR, replaced_target, file_name) + ev = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event').result() assert ev['data']['type'] == 'added' and ev['data']['path'] == os.path.join(replaced_target, file_name) @@ -102,11 +105,11 @@ def test_audit_rules_removed_after_change_link(replaced_target, new_target, file modify_symlink(new_target, symlink_path) wait_for_symlink_check(wazuh_log_monitor) - wait_for_audit(True, wazuh_log_monitor) + fim.wait_for_audit(True, wazuh_log_monitor) rules_paths = str(subprocess.check_output(['auditctl', '-l'])) - create_file(REGULAR, new_target, file_name) - ev = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event, + fim.create_file(fim.REGULAR, new_target, file_name) + ev = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event').result() assert ev['data']['type'] == 'added' and ev['data']['path'] == os.path.join(new_target, file_name) diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target.py index 6dad95c87a..30c88dab50 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target.py 
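Review bot: `rules_paths = str(subprocess.check_output(['auditctl', '-l']))` stringifies a `bytes` object, so `rules_paths` holds the `b'...'` repr with newlines and any non-ASCII bytes escaped. Any later substring check therefore runs against that repr rather than the real rule text. Decoding is the clearer form: `rules_paths = subprocess.check_output(['auditctl', '-l']).decode()`. The assertions that consume `rules_paths` are outside this hunk, so confirm they still match after the change.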
@@ -4,24 +4,24 @@ import os import pytest +import wazuh_testing.fim as fim + from test_fim.test_files.test_follow_symbolic_link.common import configurations_path, testdir1, \ modify_symlink, testdir_link, wait_for_symlink_check, testdir_target, testdir_not_target # noinspection PyUnresolvedReferences from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_after_yield, \ extra_configuration_before_yield from wazuh_testing import logger -from wazuh_testing.fim import (generate_params, create_file, REGULAR, callback_detect_event, - check_time_travel, modify_file_content, LOG_FILE_PATH, wait_for_audit) from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor # All tests in this module apply to linux only pytestmark = [pytest.mark.linux, pytest.mark.sunos5, pytest.mark.darwin, pytest.mark.tier(level=1)] -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata 
@@ -51,8 +51,13 @@ def test_symbolic_change_target(tags_to_apply, main_folder, aux_folder, get_conf Wait until symlink_checker runs and ensure that the new file is being monitored and the old one is not. Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. main_folder (str): Directory that is being pointed at or contains the pointed file. aux_folder (str): Directory that will be pointed at or will contain the future pointed file. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: TimeoutError: If a expected event wasn't triggered. 
@@ -64,16 +69,16 @@ def modify_and_check_events(f1, f2, text): Modify the content of 2 given files. We assume the first one is being monitored and the other one is not. We expect a 'modified' event for the first one and a timeout for the second one. """ - modify_file_content(f1, file1, text) - modify_file_content(f2, file1, text) - check_time_travel(scheduled, monitor=wazuh_log_monitor) - modify = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.modify_file_content(f1, file1, text) + fim.modify_file_content(f2, file1, text) + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + modify = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event' ).result() assert 'modified' in modify['data']['type'] and f1 in modify['data']['path'], \ f"'modified' event not matching for {file1}" with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') 
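Review bot: In `modify_and_check_events`, the negative check reports an unexpected FIM event by raising `AttributeError` inside `with pytest.raises(TimeoutError):`, so a regression in symlink handling surfaces as an attribute error rather than as a test failure with a clear reason. As the commit already touches the `event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event)` line, I would replace the `raise AttributeError(...)` below it with `pytest.fail(f'Unexpected event {event.result()}')`. The same pattern recurs in the other hunks of this patch that expect a timeout, for example in `test_symbolic_delete_symlink` and `test_follow_symbolic_disabled`. The function body starts above the hunk.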
@@ -85,22 +90,22 @@ def modify_and_check_events(f1, f2, text): # If symlink is pointing to a directory, we need to add files and expect their 'added' event (only if the file # is being created withing the pointed directory if main_folder == testdir_target: - create_file(REGULAR, main_folder, file1, content='') - create_file(REGULAR, aux_folder, file1, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - add = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.create_file(fim.REGULAR, main_folder, file1, content='') + fim.create_file(fim.REGULAR, aux_folder, file1, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + add = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event' ).result() assert 'added' in add['data']['type'] and file1 in add['data']['path'], \ f"'added' event not matching for {file1}" with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') else: - create_file(REGULAR, aux_folder, file1, content='') + fim.create_file(fim.REGULAR, aux_folder, file1, content='') with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') 
@@ -113,7 +118,7 @@ def modify_and_check_events(f1, f2, text): modify_and_check_events(main_folder, aux_folder, 'Sample number one') wait_for_symlink_check(wazuh_log_monitor) - wait_for_audit(whodata, wazuh_log_monitor) + fim.wait_for_audit(whodata, wazuh_log_monitor) # Expect events the other way around now modify_and_check_events(aux_folder, main_folder, 'Sample number two') diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_inside_folder.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_inside_folder.py index e20b1b2056..c45fb6b4f8 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_inside_folder.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_inside_folder.py @@ -4,14 +4,14 @@ import os import pytest +import wazuh_testing.fim as fim + from test_fim.test_files.test_follow_symbolic_link.common import configurations_path, testdir1, \ modify_symlink, testdir_link, wait_for_symlink_check, testdir_target, testdir2 # noinspection PyUnresolvedReferences from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_before_yield, \ extra_configuration_after_yield from wazuh_testing import logger -from wazuh_testing.fim import (generate_params, create_file, REGULAR, callback_detect_event, - check_time_travel, modify_file_content, LOG_FILE_PATH, wait_for_audit) from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor 
@@ -21,13 +21,13 @@ # configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata ) -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # fixtures @@ -53,8 +53,13 @@ def test_symbolic_change_target_inside_folder(tags_to_apply, previous_target, ne the new target are still being raised. Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. previous_target (str): Previous symlink target. new_target (str): New symlink target (path). + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: TimeoutError: If a expected event wasn't triggered. 
@@ -70,27 +75,27 @@ def test_symbolic_change_target_inside_folder(tags_to_apply, previous_target, ne # Check create event if it's pointing to a directory if tags_to_apply == {'monitored_dir'}: - create_file(REGULAR, previous_target, file1, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.create_file(fim.REGULAR, previous_target, file1, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event') # Change the target to another file and wait the symcheck to update the link information modify_symlink(new_target, os.path.join(testdir_link, symlink)) wait_for_symlink_check(wazuh_log_monitor) - wait_for_audit(whodata, wazuh_log_monitor) + fim.wait_for_audit(whodata, wazuh_log_monitor) # Modify the content of the previous target and don't expect events. 
Modify the new target and expect an event - modify_file_content(previous_target, file1, new_content='Sample modification') - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.modify_file_content(previous_target, file1, new_content='Sample modification') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') - modify_file_content(testdir2, file1, new_content='Sample modification') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - modify = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.modify_file_content(testdir2, file1, new_content='Sample modification') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + modify = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected ' '"Sending FIM event: ..." event').result() assert 'modified' in modify['data']['type'] and os.path.join(testdir2, file1) in modify['data']['path'], \ diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.py index 8b647daae3..58cf64ee6e 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.py 
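Review bot: The create-event step under `if tags_to_apply == {'monitored_dir'}:` waits for anything matched by `fim.callback_detect_event` and discards the result, so it would also pass on a stale or unrelated FIM event. The modification check later in the same hunk asserts on both type and path. The create step could do the same by keeping the result, `add = wazuh_log_monitor.start(...).result()`, and adding `assert 'added' in add['data']['type'] and file1 in add['data']['path']`. The same unasserted wait appears in the create steps of the nested-directory, `test_symbolic_delete_symlink` and `test_symbolic_delete_target` hunks. The test body starts and ends outside this hunk.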
@@ -4,14 +4,14 @@ import os import pytest +import wazuh_testing.fim as fim + from test_fim.test_files.test_follow_symbolic_link.common import configurations_path, testdir1, \ modify_symlink, testdir_link, wait_for_symlink_check, testdir2 # noinspection PyUnresolvedReferences from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_before_yield, \ extra_configuration_after_yield from wazuh_testing import logger, global_parameters -from wazuh_testing.fim import (generate_params, create_file, REGULAR, callback_detect_event, - check_time_travel, LOG_FILE_PATH, wait_for_audit) from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -21,14 +21,14 @@ # configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}, +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}, modes=['scheduled']) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata ) -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # fixtures @@ -52,8 +52,13 @@ def test_symbolic_change_target_inside_folder(tags_to_apply, previous_target, ne should not trigger 'added' events for the monitored subdirectory on the next scan. Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. previous_target (str): Previous symlink target (path) new_target (str): New symlink target (path). + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: TimeoutError: If a expected event wasn't triggered. 
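Review bot: In the `@@ -21,14 +21,14 @@` hunk, the continuation line `modes=['scheduled'])` is an unchanged context line, so it presumably keeps its old indentation while the opening parenthesis of `fim.generate_params(` has moved four columns to the right. For a commit that applies style corrections I would re-align it, i.e. indent `modes=['scheduled'])` by four more spaces so that it lines up with `extra_params=`. The whitespace on this page is collapsed, so the exact columns should be checked in the file.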
@@ -67,19 +72,19 @@ def test_symbolic_change_target_inside_folder(tags_to_apply, previous_target, ne symlink = 'symlink3' # Check create event - create_file(REGULAR, previous_target, file1, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event, + fim.create_file(fim.REGULAR, previous_target, file1, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event') # Change the target to another file and wait the symcheck to update the link information modify_symlink(new_target, os.path.join(testdir_link, symlink)) wait_for_symlink_check(wazuh_log_monitor) - wait_for_audit(whodata, wazuh_log_monitor) + fim.wait_for_audit(whodata, wazuh_log_monitor) # Verify that no events are generated - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_symlink.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_symlink.py index aa8783b86d..ede91143b7 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_symlink.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_symlink.py 
@@ -4,14 +4,14 @@ import os import pytest +import wazuh_testing.fim as fim + from test_fim.test_files.test_follow_symbolic_link.common import configurations_path, testdir1, \ testdir_link, wait_for_symlink_check, testdir_target, testdir_not_target, delete_f # noinspection PyUnresolvedReferences from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_before_yield, \ extra_configuration_after_yield from wazuh_testing import logger -from wazuh_testing.fim import (generate_params, create_file, REGULAR, SYMLINK, callback_detect_event, - check_time_travel, modify_file_content, LOG_FILE_PATH, wait_for_audit) from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -21,13 +21,13 @@ # configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata ) -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # fixtures 
@@ -53,9 +53,13 @@ def test_symbolic_delete_symlink(tags_to_apply, main_folder, aux_folder, get_con the target file again once symlink checker runs. Events should be detected now. Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. main_folder (str): Directory that is being pointed at or contains the pointed file. aux_folder (str): Directory that will be pointed at or will contain the future pointed file. - + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: TimeoutError: If a expected event wasn't triggered. 
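Review bot: In the docstring of `test_symbolic_delete_symlink`, the hunk removes the empty line that followed the `aux_folder (str): ...` entry and does not add one back, so `Raises:` now appears to follow the `wait_for_fim_start` entry directly. The other docstrings in this patch keep an empty line between the `Args:` and `Raises:` sections; I would restore it after the `wait_for_fim_start (fixture): ...` line for consistency. This is inferred from the hunk's line counts (9 old, 13 new), because the whitespace on this page is collapsed.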
@@ -67,30 +71,30 @@ def test_symbolic_delete_symlink(tags_to_apply, main_folder, aux_folder, get_con scheduled = get_configuration['metadata']['fim_mode'] == 'scheduled' file1 = 'regular1' if tags_to_apply == {'monitored_dir'}: - create_file(REGULAR, main_folder, file1, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.create_file(fim.REGULAR, main_folder, file1, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event') # Remove symlink and don't expect events symlink = 'symlink' if tags_to_apply == {'monitored_file'} else 'symlink2' delete_f(testdir_link, symlink) wait_for_symlink_check(wazuh_log_monitor) - modify_file_content(main_folder, file1, new_content='Sample modification') - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.modify_file_content(main_folder, file1, new_content='Sample modification') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') # Restore symlink and modify the target again. 
Expect events now - create_file(SYMLINK, testdir_link, symlink, target=os.path.join(main_folder, file1)) + fim.create_file(fim.SYMLINK, testdir_link, symlink, target=os.path.join(main_folder, file1)) wait_for_symlink_check(wazuh_log_monitor) # Wait unitl the audit rule of the link's target is loaded again - wait_for_audit(get_configuration['metadata']['fim_mode'] == "whodata", wazuh_log_monitor) + fim.wait_for_audit(get_configuration['metadata']['fim_mode'] == "whodata", wazuh_log_monitor) - modify_file_content(main_folder, file1, new_content='Sample modification 2') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - modify = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event).result() + fim.modify_file_content(main_folder, file1, new_content='Sample modification 2') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + modify = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event).result() assert 'modified' in modify['data']['type'] and file1 in modify['data']['path'], \ f"'modified' event not matching for {file1}" diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_target.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_target.py index 96863f7455..4e35ab88be 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_target.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_target.py 
@@ -5,15 +5,14 @@ import os import pytest +import wazuh_testing.fim as fim + from test_fim.test_files.test_follow_symbolic_link.common import configurations_path, testdir1, \ wait_for_symlink_check, testdir_target, testdir_not_target, delete_f # noinspection PyUnresolvedReferences from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_before_yield, \ extra_configuration_after_yield from wazuh_testing import logger -from wazuh_testing.fim import generate_params, create_file, REGULAR, callback_detect_event, \ - callback_audit_removed_rule, callback_audit_added_rule, callback_audit_reloading_rules, check_time_travel, \ - modify_file_content, LOG_FILE_PATH, wait_for_audit from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -23,13 +22,13 @@ # configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata ) -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # fixtures 
@@ -57,8 +56,13 @@ def test_symbolic_delete_target(tags_to_apply, main_folder, aux_folder, get_conf and modify the file. Modification event must be detected this time. Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. main_folder (str): Directory that is being pointed at or contains the pointed file. aux_folder (str): Directory that will be pointed at or will contain the future pointed file. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: TimeoutError: If a expected event wasn't triggered. 
@@ -74,58 +78,58 @@ def test_symbolic_delete_target(tags_to_apply, main_folder, aux_folder, get_conf # If symlink is pointing to a directory, we need to add files and expect their 'added' event (only if the file # is being created withing the pointed directory. Then, delete the pointed file or directory if tags_to_apply == {'monitored_dir'}: - create_file(REGULAR, main_folder, file1, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.create_file(fim.REGULAR, main_folder, file1, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event') delete_f(main_folder) else: delete_f(main_folder, file1) - check_time_travel(scheduled, monitor=wazuh_log_monitor) - delete = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + delete = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." 
event').result() assert 'deleted' in delete['data']['type'] and file1 in delete['data']['path'], \ f"'deleted' event not matching for {file1}" if tags_to_apply == {'monitored_dir'} and whodata: - wazuh_log_monitor.start(timeout=3, callback=callback_audit_removed_rule, + wazuh_log_monitor.start(timeout=3, callback=fim.callback_audit_removed_rule, error_message='Did not receive expected "Monitored directory \'{main_folder}\' was' 'removed: Audit rule removed') os.makedirs(main_folder, exist_ok=True, mode=0o777) - wazuh_log_monitor.start(timeout=RELOAD_RULES_INTERVAL, callback=callback_audit_reloading_rules, + wazuh_log_monitor.start(timeout=RELOAD_RULES_INTERVAL, callback=fim.callback_audit_reloading_rules, error_message='Did not receive expected "Reloading Audit rules" event') - wazuh_log_monitor.start(timeout=RELOAD_RULES_INTERVAL, callback=callback_audit_added_rule, + wazuh_log_monitor.start(timeout=RELOAD_RULES_INTERVAL, callback=fim.callback_audit_added_rule, error_message='Did not receive expected "Added audit rule... ' '\'{main_folder}\'" event') else: # If syscheck is monitoring with whodata, wait for audit to reload rules - wait_for_audit(whodata, wazuh_log_monitor) + fim.wait_for_audit(whodata, wazuh_log_monitor) wait_for_symlink_check(wazuh_log_monitor) # Restore the target - create_file(REGULAR, main_folder, file1, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.create_file(fim.REGULAR, main_folder, file1, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) if tags_to_apply == {'monitored_dir'} and whodata: - wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." 
event') else: # We don't expect any event since symlink hasn't updated the link information with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event) logger.error('A "Sending FIM event: ..." event has been detected. No event should be detected as symlink ' 'has not updated the link information yet.') logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') wait_for_symlink_check(wazuh_log_monitor) - wait_for_audit(whodata, wazuh_log_monitor) + fim.wait_for_audit(whodata, wazuh_log_monitor) # Modify the files and expect events since symcheck has updated now - modify_file_content(main_folder, file1, 'Sample modification') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - modify = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.modify_file_content(main_folder, file1, 'Sample modification') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + modify = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event').result() assert 'modified' in modify['data']['type'] and file1 in modify['data']['path'], \ f"'modified' event not matching for {file1}" diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_follow_symbolic_disabled.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_follow_symbolic_disabled.py index c03cd31bfb..d6ccfd9954 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_follow_symbolic_disabled.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_follow_symbolic_disabled.py 
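Review bot: The `error_message` literals passed alongside `fim.callback_audit_removed_rule` and `fim.callback_audit_added_rule` contain `{main_folder}` but are plain strings, so a failure prints the literal placeholder instead of the directory. The first pair also joins `was` and `removed` without a space. I would prefix the literals with `f` and add the space, e.g. `error_message=f'Did not receive expected "Monitored directory \'{main_folder}\' was '`. Separately, `os.makedirs(main_folder, exist_ok=True, mode=0o777)` requests world-writable permissions (subject to the umask) for a directory the test then monitors; a narrower mode such as `0o755` is probably sufficient, to be confirmed against how the target is used. The test body starts outside this hunk.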
@@ -5,14 +5,12 @@ import os import pytest +import wazuh_testing.fim as fim from test_fim.test_files.test_follow_symbolic_link.common import testdir_target, testdir1 # noinspection PyUnresolvedReferences from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_before_yield, \ extra_configuration_after_yield from wazuh_testing import logger -from wazuh_testing.fim import (LOG_FILE_PATH, - generate_params, create_file, REGULAR, callback_detect_event, - modify_file, delete_file, check_time_travel) from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -24,11 +22,11 @@ test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml') -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'no'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'no'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata @@ -58,6 +56,10 @@ def test_follow_symbolic_disabled(path, tags_to_apply, get_configuration, config Args: path (str): Path of the target file or directory + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: TimeoutError: If a expected event wasn't triggered. 
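Review bot: The `Args:` section of `test_follow_symbolic_disabled` gains the four fixture entries but still has no entry for `tags_to_apply`, although it is a parameter of the test and the other docstrings in this patch document it. I would add `tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise.` after the `path (str)` entry. The docstring hunk for `test_symbolic_monitor_symlink` (`@@ -52,6 +51,10 @@`) has the same gap.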
@@ -70,25 +72,25 @@ def test_follow_symbolic_disabled(path, tags_to_apply, get_configuration, config # If the symlink targets to a directory, create a file in it and ensure no event is raised. if tags_to_apply == {'monitored_dir'}: - create_file(REGULAR, path, regular_file) - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.create_file(fim.REGULAR, path, regular_file) + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - wazuh_log_monitor.start(timeout=5, callback=callback_detect_event) + wazuh_log_monitor.start(timeout=5, callback=fim.callback_detect_event) logger.error(error_msg) raise AttributeError(error_msg) # Modify the target file and don't expect any events - modify_file(path, regular_file, new_content='Modify sample') - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.modify_file(path, regular_file, new_content='Modify sample') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - wazuh_log_monitor.start(timeout=5, callback=callback_detect_event) + wazuh_log_monitor.start(timeout=5, callback=fim.callback_detect_event) logger.error(error_msg) raise AttributeError(error_msg) # Delete the target file and don't expect any events - delete_file(path, regular_file) - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.delete_file(path, regular_file) + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - wazuh_log_monitor.start(timeout=5, callback=callback_detect_event) + wazuh_log_monitor.start(timeout=5, callback=fim.callback_detect_event) logger.error(error_msg) raise AttributeError(error_msg) diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_monitor_symlink.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_monitor_symlink.py index dcb1d5ca2e..a2b291fd67 100644 
--- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_monitor_symlink.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_monitor_symlink.py @@ -3,14 +3,13 @@ # This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 import pytest +import wazuh_testing.fim as fim + from test_fim.test_files.test_follow_symbolic_link.common import configurations_path, testdir1, \ testdir_target, delete_f # noinspection PyUnresolvedReferences from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_before_yield, \ extra_configuration_after_yield - -from wazuh_testing.fim import (generate_params, create_file, REGULAR, callback_detect_event, - check_time_travel, modify_file_content, LOG_FILE_PATH) from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -20,13 +19,13 @@ # configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata ) -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # fixtures @@ -52,6 +51,10 @@ def test_symbolic_monitor_symlink(tags_to_apply, main_folder, get_configuration, Args: main_folder (str): Directory that is being pointed at or contains the pointed file. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: TimeoutError: If a expected event wasn't triggered. 
@@ -63,16 +66,16 @@ def test_symbolic_monitor_symlink(tags_to_apply, main_folder, get_configuration, # Add creation if symlink is pointing to a folder if tags_to_apply == {'monitored_dir'}: - create_file(REGULAR, main_folder, file1, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - add = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event).result() + fim.create_file(fim.REGULAR, main_folder, file1, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + add = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event).result() assert 'added' in add['data']['type'] and file1 in add['data']['path'], \ "'added' event not matching" # Modify the linked file and expect an event - modify_file_content(main_folder, file1, 'Sample modification') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - modify = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.modify_file_content(main_folder, file1, 'Sample modification') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + modify = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected ' '"Sending FIM event: ..." event').result() assert 'modified' in modify['data']['type'] and file1 in modify['data']['path'], \ @@ -80,8 +83,8 @@ def test_symbolic_monitor_symlink(tags_to_apply, main_folder, get_configuration, # Delete the linked file and expect an event delete_f(main_folder, file1) - check_time_travel(scheduled, monitor=wazuh_log_monitor) - delete = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + delete = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected ' '"Sending FIM event: ..." event').result() assert 'deleted' in delete['data']['type'] and file1 in delete['data']['path'], \ 
diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_not_following_symbolic_link.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_not_following_symbolic_link.py index 96f64525d7..8d77f90611 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_not_following_symbolic_link.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_not_following_symbolic_link.py @@ -5,11 +5,10 @@ import os import pytest +import wazuh_testing.fim as fim + from test_fim.test_files.test_follow_symbolic_link.common import modify_symlink from wazuh_testing import global_parameters, logger -from wazuh_testing.fim import (LOG_FILE_PATH, - generate_params, create_file, REGULAR, SYMLINK, callback_detect_event, - modify_file, delete_file, check_time_travel) from wazuh_testing.tools import PREFIX from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -26,11 +25,11 @@ os.path.join(PREFIX, 'testdir2')] testdir_link, testdir1, testdir2 = test_directories -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata 
@@ -65,6 +64,11 @@ def test_symbolic_monitor_directory_with_symlink(monitored_dir, non_monitored_di monitored_dir (str): Monitored directory. non_monitored_dir1 (str): Non-monitored directory. non_monitored_dir2 (str): Non-monitored directory. + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: TimeoutError: If a expected event wasn't triggered. 
@@ -81,35 +85,35 @@ def test_symbolic_monitor_directory_with_symlink(monitored_dir, non_monitored_di scheduled = get_configuration['metadata']['fim_mode'] == 'scheduled' # Create regular files out of the monitored directory and don't expect its event - create_file(REGULAR, non_monitored_dir1, name1, content='') - create_file(REGULAR, non_monitored_dir1, name2, content='') + fim.create_file(fim.REGULAR, non_monitored_dir1, name1, content='') + fim.create_file(fim.REGULAR, non_monitored_dir1, name2, content='') target = a_path if sym_target == 'file' else non_monitored_dir1 - create_file(SYMLINK, monitored_dir, sl_name, target=target) + fim.create_file(fim.SYMLINK, monitored_dir, sl_name, target=target) # Create the syslink and expect its event, since it's withing the monitored directory - check_time_travel(scheduled, monitor=wazuh_log_monitor) - wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event, + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." 
event') # Modify the target file and don't expect any event - modify_file(non_monitored_dir1, name1, new_content='Modify sample') - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.modify_file(non_monitored_dir1, name1, new_content='Modify sample') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=5, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=5, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') # Modify the target of the symlink and expect the modify event modify_symlink(target=b_path, path=sl_path) - check_time_travel(scheduled, monitor=wazuh_log_monitor) - result = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event, + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + result = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event, error_message='Did not receive expected ' '"Sending FIM event: ..." event').result() if 'modified' in result['data']['type']: logger.info("Received modified event. No more events will be expected.") elif 'deleted' in result['data']['type']: logger.info("Received deleted event. Now an added event will be expected.") - result = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event, + result = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event, error_message='Did not receive expected ' '"Sending FIM event: ..." event').result() assert 'added' in result['data']['type'], f"The event {result} should be of type 'added'" 
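Review bot: The positive checks in this hunk accept any FIM event rather than the one the step should produce, which weakens a test whose purpose is to show that the non-monitored target stays silent. The wait after `fim.create_file(fim.SYMLINK, ...)` never inspects its result, and the `'modified'`/`'deleted'` branch checks only `result['data']['type']`. An event raised for `non_monitored_dir1` would therefore satisfy both. Binding the result and asserting the path closes that, for example `assert sl_path in result['data']['path']` (presumably the link path is what the event reports; confirm against the event format). The final wait in test_symlink_and_dir.py also discards its result. The test body starts before this hunk and continues after it.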
@@ -117,10 +121,10 @@ def test_symbolic_monitor_directory_with_symlink(monitored_dir, non_monitored_di assert False, f"Detected event {result} should be of type 'modified' or 'deleted'" # Remove and restore the target file. Don't expect any events - delete_file(b_path, name2) - create_file(REGULAR, non_monitored_dir1, name2, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.delete_file(b_path, name2) + fim.create_file(fim.REGULAR, non_monitored_dir1, name2, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=5, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=5, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_revert_symlink.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_revert_symlink.py index 1163b963e8..e0ace73547 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_revert_symlink.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_revert_symlink.py 
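Review bot: The "remove and restore" step deletes under `b_path` but recreates under `non_monitored_dir1`, so the two calls may not address the same file. `b_path` is also what the previous hunk passes to `modify_symlink` as the link target, and its definition is outside the hunk. If it is a file path in the `'file'` case, `fim.delete_file(b_path, name2)` points at a path that does not exist and the removal is never exercised. If so, `fim.delete_file(non_monitored_dir1, name2)` would match the create call. Either way, a comment naming which file is removed would help: `# b_path is the directory holding name2 in the 'dir' case`.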
@@ -4,14 +4,14 @@ import os import pytest +import wazuh_testing.fim as fim + from test_fim.test_files.test_follow_symbolic_link.common import configurations_path, testdir1, \ modify_symlink, testdir_link, wait_for_symlink_check # noinspection PyUnresolvedReferences from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_before_yield, \ extra_configuration_after_yield from wazuh_testing import logger -from wazuh_testing.fim import (generate_params, callback_detect_event, - check_time_travel, modify_file_content, LOG_FILE_PATH, wait_for_audit) from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -19,11 +19,11 @@ pytestmark = [pytest.mark.linux, pytest.mark.sunos5, pytest.mark.darwin, pytest.mark.tier(level=1)] -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata 
@@ -51,6 +51,13 @@ def test_symbolic_revert_symlink(tags_to_apply, get_configuration, configure_env is not being monitored anymore and the new folder is. Revert the target change and ensure the file is being monitored and the folder is not. + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + Raises: TimeoutError: If a expected event wasn't triggered. AttributeError: If a unexpected event was captured. @@ -58,9 +65,9 @@ def test_symbolic_revert_symlink(tags_to_apply, get_configuration, configure_env """ def modify_and_assert(file): - modify_file_content(testdir1, file, new_content='Sample modification') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - ev = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event).result() + fim.modify_file_content(testdir1, file, new_content='Sample modification') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + ev = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event).result() assert 'modified' in ev['data']['type'] and os.path.join(testdir1, file) in ev['data']['path'], \ f"'modified' event not matching for {testdir1} {file}" 
@@ -71,29 +78,29 @@ def modify_and_assert(file): file2 = 'regular2' # Don't expect an event since it is not being monitored yet - modify_file_content(testdir1, file2, new_content='Sample modification') - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.modify_file_content(testdir1, file2, new_content='Sample modification') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') # Change the target to the folder and now expect an event modify_symlink(testdir1, os.path.join(testdir_link, 'symlink')) wait_for_symlink_check(wazuh_log_monitor) - wait_for_audit(whodata, wazuh_log_monitor) + fim.wait_for_audit(whodata, wazuh_log_monitor) modify_and_assert(file2) # Modify symlink target, wait for sym_check to update it modify_symlink(os.path.join(testdir1, file1), os.path.join(testdir_link, 'symlink')) wait_for_symlink_check(wazuh_log_monitor) # Wait for audit to reload the rules - wait_for_audit(whodata, wazuh_log_monitor) + fim.wait_for_audit(whodata, wazuh_log_monitor) - modify_file_content(testdir1, file2, new_content='Sample modification2') - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.modify_file_content(testdir1, file2, new_content='Sample modification2') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') modify_and_assert(file1) 
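Review bot: The two "no event expected" blocks treat three quiet seconds (`timeout=3`) as proof that the path is not monitored, a shorter window than the `global_parameters.default_timeout` used by the equivalent negative check in test_symlink_and_dir.py. An event that arrives after the window lets the block pass and is presumably read by the next wait. For the first block that next wait is `modify_and_assert(file2)`, which would accept a late 'modified' event for the same path. Consider `timeout=global_parameters.default_timeout` here and for the `timeout=5` blocks in test_not_following_symbolic_link.py; this file would then need `from wazuh_testing import global_parameters, logger`. The test function starts outside this hunk.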
diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_and_dir.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_and_dir.py index 1113a153c2..3a03646a35 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_and_dir.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_and_dir.py @@ -5,12 +5,12 @@ import os import pytest +import wazuh_testing.fim as fim + from test_fim.test_files.test_follow_symbolic_link.common import wait_for_symlink_check, \ symlink_interval, \ modify_symlink from wazuh_testing import global_parameters, logger -from wazuh_testing.fim import SYMLINK, REGULAR, LOG_FILE_PATH, generate_params, create_file, change_internal_options, \ - callback_detect_event, check_time_travel, wait_for_audit from wazuh_testing.tools import PREFIX from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -27,11 +27,11 @@ testdir_target = test_directories[1] test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml') -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # Configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata) 
@@ -47,16 +47,16 @@ def get_configuration(request): def extra_configuration_before_yield(): """Create files and symlinks""" - create_file(REGULAR, testdir_target, 'regular1') - create_file(SYMLINK, PREFIX, 'testdir_link', target=testdir) + fim.create_file(fim.REGULAR, testdir_target, 'regular1') + fim.create_file(fim.SYMLINK, PREFIX, 'testdir_link', target=testdir) # Set symlink_scan_interval to a given value - change_internal_options(param='syscheck.symlink_scan_interval', value=symlink_interval) + fim.change_internal_options(param='syscheck.symlink_scan_interval', value=symlink_interval) def extra_configuration_after_yield(): """Set symlink_scan_interval to default value and remove symbolic link""" os.remove(testdir_link) - change_internal_options(param='syscheck.symlink_scan_interval', value=600) + fim.change_internal_options(param='syscheck.symlink_scan_interval', value=600) # Tests @@ -72,6 +72,10 @@ def test_symlink_dir_inside_monitored_dir(tags_to_apply, get_configuration, conf Args: tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: TimeoutError: If a expected event wasn't triggered. 
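Review bot: `extra_configuration_after_yield` restores `syscheck.symlink_scan_interval` only if `os.remove(testdir_link)` succeeds, so a missing link (for example when setup failed part-way) raises and leaves the shortened interval in place for later tests. Restoring the option first avoids that: `fim.change_internal_options(param='syscheck.symlink_scan_interval', value=600)` followed by `os.remove(testdir_link)`. Only the docstring identifies the literal `600` as the default; a named constant shared with test_symlink_to_dir_between_scans.py would keep the two teardown functions in step.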
@@ -86,20 +90,20 @@ def test_symlink_dir_inside_monitored_dir(tags_to_apply, get_configuration, conf # Wait for both audit and the symlink check to run wait_for_symlink_check(wazuh_log_monitor) - wait_for_audit(whodata, wazuh_log_monitor) + fim.wait_for_audit(whodata, wazuh_log_monitor) - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') # Create a file in the pointed folder and expect events - create_file(REGULAR, testdir_link, 'regular2') + fim.create_file(fim.REGULAR, testdir_link, 'regular2') - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) - wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event, + wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event, error_message='Did not receive expected ' '"Sending FIM event: ..." event') diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_dir_inside_monitored_dir.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_dir_inside_monitored_dir.py index 776020bb3d..22a1d8dcad 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_dir_inside_monitored_dir.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_dir_inside_monitored_dir.py 
@@ -5,9 +5,9 @@ import os import pytest +import wazuh_testing.fim as fim + from wazuh_testing import global_parameters -from wazuh_testing.fim import SYMLINK, REGULAR, LOG_FILE_PATH, generate_params, create_file, \ - REQUIRED_ATTRIBUTES, CHECK_ALL, CHECK_SIZE, regular_file_cud from wazuh_testing.tools import PREFIX from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -24,11 +24,11 @@ testdir_target = os.path.join(testdir, 'testdir_target') test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml') -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # Configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata) @@ -45,8 +45,8 @@ def get_configuration(request): def extra_configuration_before_yield(): """Create files and symlinks""" os.makedirs(testdir_target, exist_ok=True, mode=0o777) - create_file(REGULAR, testdir_target, 'regular1') - create_file(SYMLINK, PREFIX, 'testdir_link', target=testdir_target) + fim.create_file(fim.REGULAR, testdir_target, 'regular1') + fim.create_file(fim.SYMLINK, PREFIX, 'testdir_link', target=testdir_target) def extra_configuration_after_yield(): 
@@ -57,7 +57,7 @@ def extra_configuration_after_yield(): # Tests @pytest.mark.parametrize('tags_to_apply, checkers', [ - ({'symlink_dir_inside_monitored_dir'}, REQUIRED_ATTRIBUTES[CHECK_ALL] - {CHECK_SIZE}), + ({'symlink_dir_inside_monitored_dir'}, fim.REQUIRED_ATTRIBUTES[fim.CHECK_ALL] - {fim.CHECK_SIZE}), ]) def test_symlink_dir_inside_monitored_dir(tags_to_apply, checkers, get_configuration, configure_environment, restart_syscheckd, wait_for_fim_start): @@ -67,8 +67,12 @@ def test_symlink_dir_inside_monitored_dir(tags_to_apply, checkers, get_configura follow_symbolic_link, etc...) Args: - tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. - checkers (dict): Check options to be used. + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + checkers (dict): Check options to be used. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: TimeoutError: If a expected event wasn't triggered. 
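Review bot: The re-indented docstring line still documents `checkers` as a `dict`, but the parametrize entry in the hunk above builds it with a set difference, `fim.REQUIRED_ATTRIBUTES[fim.CHECK_ALL] - {fim.CHECK_SIZE}`. The value passed is therefore presumably a set. Suggested wording: `checkers (set): Check options expected in the events (all checks except size).` The same line appears in the docstring of `test_symlink_within_dir`.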
@@ -79,7 +83,7 @@ def test_symlink_dir_inside_monitored_dir(tags_to_apply, checkers, get_configura scheduled = get_configuration['metadata']['fim_mode'] == 'scheduled' # Alerts from the pointed directory should have all checks except size - regular_file_cud(testdir_target, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, options=checkers, + fim.regular_file_cud(testdir_target, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, options=checkers, time_travel=scheduled) # Alerts from the main directory should have all checks - regular_file_cud(testdir, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, time_travel=scheduled) + fim.regular_file_cud(testdir, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, time_travel=scheduled) diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_to_dir_between_scans.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_to_dir_between_scans.py index ac3cc13dc6..31d45be56f 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_to_dir_between_scans.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_to_dir_between_scans.py @@ -6,11 +6,12 @@ from shutil import rmtree import pytest +import wazuh_testing.fim as fim + from test_fim.test_files.test_follow_symbolic_link.common import wait_for_symlink_check, symlink_interval, \ testdir_link, testdir_target from wazuh_testing import global_parameters -from wazuh_testing.fim import SYMLINK, REGULAR, LOG_FILE_PATH, generate_params, create_file, change_internal_options, \ - check_time_travel, callback_detect_event + from wazuh_testing.tools import PREFIX from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor 
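Review bot: Adding the `fim.` prefix pushes both `fim.regular_file_cud(...)` calls past the width the same commit wraps to in test_symlink_within_dir.py. The unchanged continuation line `time_travel=scheduled)` is also no longer aligned with the opening parenthesis. Wrapping as in the sibling file keeps the two tests identical: break after `min_timeout=global_parameters.default_timeout,` and continue with `options=checkers, time_travel=scheduled)` on the aligned next line. Do the same for the second call.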
@@ -23,11 +24,11 @@ test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml') -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # Configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}, modes=['scheduled']) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}, modes=['scheduled']) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata) @@ -46,17 +47,17 @@ def extra_configuration_before_yield(): symlinkdir = testdir_link os.makedirs(testdir_target, exist_ok=True, mode=0o777) - create_file(REGULAR, testdir_target, 'regular1') - create_file(SYMLINK, PREFIX, symlinkdir, target=testdir_target) + fim.create_file(fim.REGULAR, testdir_target, 'regular1') + fim.create_file(fim.SYMLINK, PREFIX, symlinkdir, target=testdir_target) # Set symlink_scan_interval to a given value - change_internal_options(param='syscheck.symlink_scan_interval', value=symlink_interval) + fim.change_internal_options(param='syscheck.symlink_scan_interval', value=symlink_interval) def extra_configuration_after_yield(): """Set symlink_scan_interval to default value""" rmtree(testdir_link, ignore_errors=True) rmtree(testdir_target, ignore_errors=True) - change_internal_options(param='syscheck.symlink_scan_interval', value=600) + fim.change_internal_options(param='syscheck.symlink_scan_interval', value=600) # Tests 
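Review bot: In `extra_configuration_after_yield`, `rmtree(testdir_link, ignore_errors=True)` does nothing while `testdir_link` is still a symbolic link. `shutil.rmtree` refuses symlinks, and `ignore_errors=True` hides that error. The test body replaces the link with a real directory, so the usual path is cleaned up, but a run that stops before that replacement leaves the link behind for the next test. A guard before the `rmtree` call covers both cases: `if os.path.islink(testdir_link): os.remove(testdir_link)`.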
@@ -72,7 +73,11 @@ def test_symlink_to_dir_between_scans(tags_to_apply, get_configuration, configur the new directory should send alerts during a second scan. Args: - tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: TimeoutError: If a expected event wasn't triggered. @@ -86,13 +91,13 @@ def test_symlink_to_dir_between_scans(tags_to_apply, get_configuration, configur # Delete symbolic link and create a folder with the same name os.remove(testdir_link) os.makedirs(testdir_link, exist_ok=True, mode=0o777) - create_file(REGULAR, testdir_link, regular2) + fim.create_file(fim.REGULAR, testdir_link, regular2) # Wait for both audit and the symlink check to run wait_for_symlink_check(wazuh_log_monitor) - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) - event = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event, + event = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event, error_message='Did not receive expected ' '"Sending FIM event: ..." event').result() diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_within_dir.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_within_dir.py index 2126fe0bfd..b73dcbca9c 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_within_dir.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_within_dir.py 
@@ -6,9 +6,9 @@ from shutil import rmtree import pytest +import wazuh_testing.fim as fim + from wazuh_testing import global_parameters -from wazuh_testing.fim import SYMLINK, REGULAR, LOG_FILE_PATH, generate_params, create_file, \ - REQUIRED_ATTRIBUTES, CHECK_ALL, CHECK_SIZE, regular_file_cud from wazuh_testing.tools import PREFIX from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -25,11 +25,11 @@ testdir_target = os.path.join(PREFIX, 'testdir_target') test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml') -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # Configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata) @@ -46,8 +46,8 @@ def get_configuration(request): def extra_configuration_before_yield(): """Create files and symlinks""" os.makedirs(testdir_target, exist_ok=True, mode=0o777) - create_file(REGULAR, testdir_target, 'regular1') - create_file(SYMLINK, testdir, 'testdir_link', target=testdir_target) + fim.create_file(fim.REGULAR, testdir_target, 'regular1') + fim.create_file(fim.SYMLINK, testdir, 'testdir_link', target=testdir_target) def extra_configuration_after_yield(): 
@@ -58,7 +58,7 @@ def extra_configuration_after_yield(): # Tests @pytest.mark.parametrize('tags_to_apply, checkers', [ - ({'symlink_within_directory'}, REQUIRED_ATTRIBUTES[CHECK_ALL] - {CHECK_SIZE}), + ({'symlink_within_directory'}, fim.REQUIRED_ATTRIBUTES[fim.CHECK_ALL] - {fim.CHECK_SIZE}), ]) def test_symlink_within_dir(tags_to_apply, checkers, get_configuration, configure_environment, restart_syscheckd, wait_for_fim_start): @@ -67,8 +67,12 @@ def test_symlink_within_dir(tags_to_apply, checkers, get_configuration, configur The link configuration should prevail over the monitored directory (checks, follow_symbolic_link, etc...). Args: - tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. - checkers (dict): Check options to be used. + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + checkers (dict): Check options to be used. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. Raises: TimeoutError: If a expected event wasn't triggered. 
@@ -79,7 +83,8 @@ def test_symlink_within_dir(tags_to_apply, checkers, get_configuration, configur scheduled = get_configuration['metadata']['fim_mode'] == 'scheduled' # Alerts from the pointed directory should have all checks except size - regular_file_cud(testdir_target, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, options=checkers, - time_travel=scheduled) + fim.regular_file_cud(testdir_target, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, + options=checkers, time_travel=scheduled) # Alerts from the main directory should have all checks - regular_file_cud(testdir, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, time_travel=scheduled) + fim.regular_file_cud(testdir, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, + time_travel=scheduled)