diff --git a/deps/wazuh_testing/wazuh_testing/fim.py b/deps/wazuh_testing/wazuh_testing/fim.py
index 6c7295dd55..6cee79961b 100644
--- a/deps/wazuh_testing/wazuh_testing/fim.py
+++ b/deps/wazuh_testing/wazuh_testing/fim.py
@@ -1096,6 +1096,13 @@ def callback_audit_loaded_rule(line):
 return None
+def callback_end_audit_reload_rules(line):
+ match = re.match(r'.*Audit rules reloaded\. Rules loaded: (.+)', line)
+ if match:
+ return match.group(1)
+ return None
+
+
 def callback_audit_event_too_long(line):
 if 'Caching Audit message: event too long' in line:
 return True
@@ -1109,7 +1116,7 @@ def callback_audit_reloading_rules(line):
 def callback_audit_reloaded_rule(line):
- match = re.match(r'.*Reloaded audit rule for monitoring directory: \'(.+)\'', line)
+ match = re.match(r'.*Already added audit rule for monitoring directory: \'(.+)\'', line)
 if match:
 return match.group(1)
 return None
@@ -2185,6 +2192,17 @@ def detect_whodata_start(file_monitor):
 '"File integrity monitoring real-time Whodata engine started" event')
+def wait_for_audit(whodata, monitor):
+ """Wait for the audit callback if we are using whodata monitoring.
+ Args:
+ whodata (boolean): True if whodata is active.
+ monitor (FileMonitor): LogMonitor to use.
+ """
+ if whodata:
+ monitor.start(timeout=35, callback=callback_end_audit_reload_rules, update_position=False,
+ error_message='Did not receive expected "Audit rules reloaded..." event')
+
+
 def generate_params(extra_params: dict = None, apply_to_all: Union[Sequence[Any], Generator[dict, None, None]] = None, modes: list = None):
 """
diff --git a/docs/tests/integration/test_fim/test_files/test_audit/test_audit.md b/docs/tests/integration/test_fim/test_files/test_audit/test_audit.md
new file mode 100644
index 0000000000..5b2f9b503d
--- /dev/null
+++ b/docs/tests/integration/test_fim/test_files/test_audit/test_audit.md
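Review bot: `callback_end_audit_reload_rules` carries no docstring, and the greedy `(.+)` in `r'.*Audit rules reloaded\. Rules loaded: (.+)'` hands back everything after the prefix as a raw string, so trailing whitespace or a `\r` ends up in the return value unless the line was already stripped before reaching the callback (not visible in this diff). If the agent always logs an integer there, `(\d+)` would be both stricter and self-documenting, but the message format cannot be confirmed from here. At minimum add `"""Match the 'Audit rules reloaded. Rules loaded: N' log line and return N as a string, or None when the line does not match."""` so callers such as `wait_for_audit` know what the callback yields.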
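Review bot: The name `callback_audit_reloaded_rule` no longer describes what the function matches once the pattern becomes `Already added audit rule for monitoring directory: '...'` rather than a "Reloaded" message, so a test author choosing this callback by name will expect the old semantics. Renaming would touch every test that imports it, so at minimum put a comment above the `re.match` line, e.g. `# The agent presumably now logs "Already added audit rule ..." when a rule is re-added on reload; the function name predates that message.` The greedy `'(.+)'` also captures up to the last quote on the line, which only matters if a directory name can itself contain a quote. The remaining lines of the function are unchanged context.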
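Review bot: The 35-second wait in `monitor.start(timeout=35, ...)` is a magic number baked into a shared helper, so every whodata test gets the same ceiling regardless of how slow the host is; expose it as a keyword with the same default, `def wait_for_audit(whodata, monitor, timeout=35):` and `monitor.start(timeout=timeout, callback=callback_end_audit_reload_rules, update_position=False, ...)`, which keeps existing callers working. The docstring also needs a blank line before `Args:` for Google-style parsers, and should state that when `whodata` is false the function returns immediately without touching `monitor`. `update_position=False` leaves the monitor's read position where it was, so (if I read the FileMonitor API right) a later `monitor.start` on the same instance will rescan the same log region — worth saying in the docstring if that is intended. `generate_params`, whose signature closes the hunk, continues outside it.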
@@ -0,0 +1,75 @@ +# Test audit +This test file has several tests: +- `test_audit_health_check`: Checks the behavior of the FIM audit health check. +- `test_added_rules`: Checks if FIM adds the rules for monitored directories using whodata. +- `test_readded_rules`: Checks that FIM is able to re-add the rule of a directory if it's removed. +- `test_readded_rules_on_restart`: Check if FIM is able to add the audit rules when auditd is restarted. +- `test_move_rules_to_realtime`: Checks that FIM moves the monitored directories using `whodata` to realtime when auditd is stopped. +- `test_audit_key`: Checks that the `` functionality works. +- `test_restart_audit`: Checks that the `` functionality works. +## General info + +| Tier | Platforms | Time spent| Test file | +|:--:|:--:|:--:|:--:| +| 1 | Linux | 00:01:20 | [test_audit.py](../../../../../../tests/integration/test_fim/test_files/test_audit/test_audit.py)| + +## Test logic + +### test_audit_health_check +- The test will monitor a folder using `whodata` +- It will check that the health check passed. + +### test_added_rules +- The test will monitor several folders using `whodata` +- Once FIM starts, the test will check if the a rule for every monitored directory is added + +### test_readded_rules +- The test will monitor a folder using `whodata`. +- Once FIM starts, the test will remove the audit rule (using `auditctl`) and will wait until the manipulation event is triggered. +- Finally, the test will check that the audit rule is added again. + +### test_readded_rules_on_restart +- The test will monitor a folder using `whodata`. +- Once FIM starts, the test will restart auditd and it will wait until auditd has started. +- After auditd is running, he test will wait for the `connect` and the `load rule` events. + +### test_move_rules_realtime +- The test will monitor several folders using `whodata` +- Once FIM starts, the test will stop the auditd service. 
+- Then it will wait until the monitored directories using `whodata` are monitored with `realtime` + +### test_audit_key +- The test will manually add a rule for a monitored path using a custom audit key. +- After FIM starts, the test will check that the events that are generated with the custom key are processed. + +### test_restart_audit +- The test removes the audit plugin file. +- Then it will check the audit creation time. +## Checks + +- [x] Checks that FIM audit health check works. +- [X] Checks that FIM adds audit rules for monitored directories. +- [X] Checks that FIM is able to re-add audit rules. +- [X] Checks that FIM moves the directories to realtime when whodata is not available. +- [X] Checks the FIM behavior of the `audit_key` and `restart_audit` options. + + +## Execution result + +``` +python3 -m pytest test_files/test_audit/test_audit.py +======================================================= test session starts ======================================================== +platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1 +rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini +plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0 +collected 24 items + +test_files/test_audit/test_audit.py .....sssssssss.ssssssss. [100%] + +============================================= 7 passed, 17 skipped in 80.86s (0:01:20) ============================================= + +``` + +## Code documentation + +::: tests.integration.test_fim.test_files.test_audit.test_audit diff --git a/docs/tests/integration/test_fim/test_files/test_audit/test_audit_after_initial_scan.md b/docs/tests/integration/test_fim/test_files/test_audit/test_audit_after_initial_scan.md index e69de29bb2..b9a0e381ce 100644 --- a/docs/tests/integration/test_fim/test_files/test_audit/test_audit_after_initial_scan.md +++ b/docs/tests/integration/test_fim/test_files/test_audit/test_audit_after_initial_scan.md 
@@ -0,0 +1,47 @@ +# Test audit after initial scan +This test file has two tests: + +The first one, called `test_remove_and_read_folder` checks that FIM monitors a folder if it's removed and created. +The second one, restarts `auditd` and checks if `whodata` works. + +## General info + +| Tier | Platforms | Time spent| Test file | +|:--:|:--:|:--:|:--:| +| 1 | Linux | 00:01:33 | [test_audit_after_initial_scan.py](../../../../../../tests/integration/test_fim/test_files/test_audit/test_audit_after_initial_scan.py)| + +## Test logic + +### test_remove_and_read_folder +- The test will monitor a folder using `whodata` +- Once FIM starts, the test will remove the folder and checks if the audit rule associated to that folder has been removed. +- Finally, it creates again the same folder and checks that the audit rule is added. + +### test_reconnect_to_audit +- The test will monitor a folder using `whodata` +- Then it will restart the `auditd` daemon. +- Finally, the test waits until FIM is able connect to audit. + +## Checks + +- [x] Checks that FIM can recover from loosing it's connection to audit. +- [x] Checks that FIM is able to monitor the folders using whodata after they are removed and created again. + +## Execution result + +``` +python3 -m pytest test_files/test_audit/test_audit_after_initial_scan.py +======================================================= test session starts ======================================================== +platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1 +rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini +plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0 +collected 4 items + +test_files/test_audit/test_audit_after_initial_scan.py .... [100%] + +=================================================== 4 passed in 93.54s (0:01:33) =================================================== +``` + +## Code documentation + +::: tests.integration.test_fim.test_files.test_audit.test_audit_after_initial_scan 
diff --git a/docs/tests/integration/test_fim/test_files/test_audit/test_audit_no_dir.md b/docs/tests/integration/test_fim/test_files/test_audit/test_audit_no_dir.md new file mode 100644 index 0000000000..b2ad324ae5 --- /dev/null +++ b/docs/tests/integration/test_fim/test_files/test_audit/test_audit_no_dir.md @@ -0,0 +1,38 @@ +# Test audit no dir +This test checks that FIM doesn't add audit rules for non-existing directories. + +## General info + +| Tier | Platforms | Time spent| Test file | +|:--:|:--:|:--:|:--:| +| 1 | Linux | 00:00:32 | [test_audit_no_dir.py](../../../../../../tests/integration/test_fim/test_files/test_audit/test_audit_no_dir.py)| + +## Test logic + +### test_remove_and_read_folder +- The test will monitor a non-existing folder using `whodata` +- Once FIM starts, the test will check that the audit rule is not added. +- Then, it will create the folder and wait until the rule is added again. + +## Checks + +- [x] Checks that FIM doesn't add rules for non-existing directories. +- [x] Checks that FIM is able to monitor a folder after it's creation. +## Execution result + +``` +python3 -m pytest test_files/test_audit/test_audit_no_dir.py +======================================================= test session starts ======================================================== +platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1 +rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini +plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0 +collected 1 item + +test_files/test_audit/test_audit_no_dir.py . [100%] + +======================================================== 1 passed in 31.96s ======================================================== +``` + +## Code documentation + + diff --git a/docs/tests/integration/test_fim/test_files/test_audit/test_remove_audit.md b/docs/tests/integration/test_fim/test_files/test_audit/test_remove_audit.md new file mode 100644 index 0000000000..0b28f8a806 --- /dev/null 
+++ b/docs/tests/integration/test_fim/test_files/test_audit/test_remove_audit.md @@ -0,0 +1,38 @@ +# Test remove audit + +The test checks that if audit is not installed, FIM switches from `whodata` to `realtime`. + +## General info + +| Tier | Platforms | Time spent| Test file | +|:--:|:--:|:--:|:--:| +| 1 | Linux | 00:00:05 | [test_remove_rule_five_times.py](../../../../../../tests/integration/test_fim/test_files/test_audit/test_remove_rule_five_times.py)| + +## Test logic + +- The test will uninstall `auditd`. +- The test will check that FIM is able to switch from `whodata` to `realtime`. +- Finally, the test will install again `auditd` + +## Checks + +- [x] Checks that FIM is able to switch from `whodata` to `realtime` if auditd is not installed. + +## Execution result + +``` +python3 -m pytest test_files/test_audit/test_remove_audit.py +======================================================= test session starts ======================================================== +platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1 +rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini +plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0 +collected 1 item + +test_files/test_audit/test_remove_audit.py . [100%] + +======================================================== 1 passed in 5.95s ========================================================= +``` + +## Code documentation + +::: tests.integration.test_fim.test_files.test_audit.test_remove_audit diff --git a/docs/tests/integration/test_fim/test_files/test_audit/test_remove_rule_five_times.md b/docs/tests/integration/test_fim/test_files/test_audit/test_remove_rule_five_times.md new file mode 100644 index 0000000000..c11cf2ca70 --- /dev/null +++ b/docs/tests/integration/test_fim/test_files/test_audit/test_remove_rule_five_times.md 
@@ -0,0 +1,37 @@ +# Test audit remove rule five times + +The test checks that FIM stops monitoring with `whodata` when at least 5 manipulation in the audit rules has been done by a user. + +## General info + +| Tier | Platforms | Time spent| Test file | +|:--:|:--:|:--:|:--:| +| 1 | Linux | 00:00:06 | [test_remove_rule_five_times.py](../../../../../../tests/integration/test_fim/test_files/test_audit/test_remove_rule_five_times.py)| + +## Test logic + +- The test will monitor a folder using `whodata`. +- The test will modify five times the audit rules and it will check that `whodata` switches to `realtime` . + +## Checks + +- [x] Checks that FIM is able to switch from `whodata` to `realtime` when an user edits the audit rules. + +## Execution result + +``` +python3 -m pytest test_files/test_audit/test_remove_rule_five_times.py +======================================================= test session starts ======================================================== +platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1 +rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini +plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0 +collected 1 item + +test_files/test_audit/test_remove_rule_five_times.py . [100%] + +======================================================== 1 passed in 4.34s ========================================================= +``` + +## Code documentation + +::: tests.integration.test_fim.test_files.test_audit.test_remove_rule_five_times diff --git a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_audit_rules_removed_after_change_link.md b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_audit_rules_removed_after_change_link.md new file mode 100644 index 0000000000..0bc3330105 --- /dev/null +++ b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_audit_rules_removed_after_change_link.md 
@@ -0,0 +1,40 @@ +# Test audit rules removed after change link + +This test checks that FIM removes automatically the audit rule of the target of a monitored symbolic link when the link's target is replaced. +## General info + +| Tier | Platforms | Time spent| Test file | +|:--:|:--:|:--:|:--:| +| 1 | Linux/UNIX | 00:00:33 | [test_audit_rules_removed_after_change_link.py](../../../../../../tests/integration/test_fim/test_files/test_follow_symbolic_link/test_audit_rules_removed_after_change_link.py)| + +## Test logic + + +- The test will monitor a symbolic link pointing to a directory using `whodata`. +- Once FIM starts, it will create and expect events inside the pointed folder. +- After the events are processed, the test will change the target of the link to another folder, it will wait until the thread that checks the symbolic links updates the link's target. +- Finally, it will generate some events inside the new target and it will check that the audit rule of the previous target folder has been removed (by using `auditctl -l`). + +## Checks + +- [x] The rule is removed. +- [x] The events are triggered for all the link's targets + +## Execution result + +``` + python3 -m pytest test_files/test_follow_symbolic_link/test_audit_rules_removed_after_change_link.py +===================================================== test session starts ====================================================== +platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1 +rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini +plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0 +collected 1 item + +test_files/test_follow_symbolic_link/test_audit_rules_removed_after_change_link.py . [100%] + +====================================================== 1 passed in 33.48s ====================================================== +``` + +## Code documentation + + 
diff --git a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target.md b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target.md index e69de29bb2..9ffadaa74e 100644 --- a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target.md +++ b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target.md 
@@ -0,0 +1,41 @@ +# Test change target + +Checks if FIM updates the symbolic link's target properly. + +## General info + +| Tier | Platforms | Time spent| Test file | +|:--:|:--:|:--:|:--:| +| 1 | Linux/UNIX | 00:03:00 | [test_change_target.py](../../../../../../tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target.py)| + +## Test logic + +- The test will monitor a symbolic link pointing to a file/directory. +- Once FIM starts, it will create and expect events inside the pointed folder and will create files inside the new target making sure that it won't generate any alerts. +- After the events are processed, the test will change the target of the link to another folder, it will wait until the thread that checks the symbolic links updates the link's target. +- Then, the test checks the new file is being monitored and the old one is not. + +## Checks + +- [x] The rule is removed. +- [x] The events are triggered for all the link's targets + +## Execution result + +``` +python3 -m pytest test_files/test_follow_symbolic_link/test_change_target.py +===================================================== test session starts ====================================================== +platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1 +rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini +plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0 +collected 12 items + +test_files/test_follow_symbolic_link/test_change_target.py .ss..ss..ss. [100%] + +=========================================== 6 passed, 6 skipped in 176.83s (0:02:56) =========================================== + +``` + +## Code documentation + + diff --git a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_inside_folder.md b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_inside_folder.md index e69de29bb2..a90bfc3154 100644 
--- a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_inside_folder.md +++ b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_inside_folder.md @@ -0,0 +1,39 @@ +# Test change target inside folder + +Check if FIM stops detecting events from previous target when pointing to a new folder. +## General info + +| Tier | Platforms | Time spent| Test file | +|:--:|:--:|:--:|:--:| +| 1 | Linux/UNIX | 10 | 00:02 | [test_change_target_inside_folder.py](../../../../../../tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_inside_folder.py)| + +## Test logic + +- The test will monitor a symbolic link pointing to a file/folder. +- Once FIM starts, the test will change the link's target to another file/folder inside a monitored folder. +- It will wait until the thread that checks the symbolic links updates the link's target. +- Finally, it will generate some events inside the new target and it will check that the events are triggered + +## Checks + +- [x] The events are triggered for all the link's targets +- [X] No events are triggered for all link's targets + +## Execution result + +``` +python3 -m pytest test_files/test_follow_symbolic_link/test_change_target_inside_folder.py +===================================================== test session starts ====================================================== +platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1 +rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini +plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0 +collected 12 items + +test_files/test_follow_symbolic_link/test_change_target_inside_folder.py .ss..ss..ss. [100%] + +=========================================== 6 passed, 6 skipped in 178.70s (0:02:58) =========================================== +``` + +## Code documentation + + 
diff --git a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.md b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.md index e69de29bb2..df8b6f8b3c 100644 --- a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.md +++ b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.md 
@@ -0,0 +1,39 @@ +# Test change target with nested directory + +This test checks that FIM doesn't trigger any alerts for directories within the target of a monitored symbolic link when the link is changed. + +## General info + +| Tier | Platforms | Time spent| Test file | +|:--:|:--:|:--:|:--:| +| 1 | Linux/UNIX | 00:00:30 | [test_change_target_with_nested_directory.py](../../../../../../tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.py)| + +## Test logic + +- The test will monitor a symbolic link pointing to a directory which contains a monitored subdirectory +- Once FIM starts, it will create and expect events inside the pointed folder. +- After the events are processed, the test will change the target of the link to another folder, it will wait until the thread that checks the symbolic links updates the link's target. +- Finally, it checks that no events are triggered inside the monitored subdirectory. + +## Checks + +- [x] No events are triggered inside the monitored subdirectory. + +## Execution result + +``` +python3 -m pytest test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.py +===================================================== test session starts ====================================================== +platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1 +rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini +plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0 +collected 1 item + +test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.py . [100%] + +====================================================== 1 passed in 27.86s ====================================================== +``` + +## Code documentation + + 
diff --git a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_symlink.md b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_symlink.md index e69de29bb2..4c04a9947d 100644 --- a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_symlink.md +++ b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_symlink.md 
@@ -0,0 +1,42 @@ +# Test change target +Check if FIM stops detecting events when deleting the monitored symbolic link. + +## General info + +| Tier | Platforms | Time spent| Test file | +|:--:|:--:|:--:|:--:| +| 1 | Linux/UNIX | 00:05:00 | [test_delete_symlink.py](../../../../../../tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_symlink.py)| + +## Test logic + +- The test will monitor a symbolic link pointing to a file/directory. +- Once FIM starts, it will create and expect events inside the pointed folder. +- After the events are processed, the test will remove the symbolic link, wait until the links are reloaded and will create files inside the target that the link was pointing to and check that no alerts are triggered. +- Then, the test will restore the link, it will wait until the link is updated and it will generate events inside the target folder and check that the alerts are triggered. +## Checks + +- [x] FIM stops monitoring the link's target if the link was removed. +- [x] FIM will monitor again the target directory/file if the link is restored. + +## Execution result + +``` +python3 -m pytest test_files/test_follow_symbolic_link/test_delete_ +test_delete_symlink.py test_delete_target.py +root@ubuntu1:/vagrant/wazuh-qa/tests/integration/test_fim# python3 -m pytest test_files/test_follow_symbolic_link/test_delete_symlink.py +===================================================== test session starts ====================================================== +platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1 +rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini +plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0 +collected 12 items + +test_files/test_follow_symbolic_link/test_delete_symlink.py .ss..ss..ss. [100%] + +=========================================== 6 passed, 6 skipped in 296.78s (0:04:56) =========================================== + + +``` + +## Code documentation + + 
diff --git a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_target.md b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_target.md index e69de29bb2..9cce1c929e 100644 --- a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_target.md +++ b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_target.md 
@@ -0,0 +1,38 @@ +# Test change target +Check if FIM stops detecting events when deleting the target of a monitored symbolic link. + +## General info + +| Tier | Platforms | Time spent| Test file | +|:--:|:--:|:--:|:--:| +| 1 | Linux/UNIX | 00:03:00 | [test_delete_target.py](../../../../../../tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_target.py)| + +## Test logic + +- The test will monitor a symbolic link pointing to a file/directory. +- Once FIM starts, it will create and expect events inside the pointed folder. +- After the events are processed, the test will remove the link's target, wait until the links are reloaded. Before the next link reload, the test will create again the file/directory and will generate events inside the target that the link was pointing to and check that no alerts are triggered. +- Then, the test will wait until the links are reloaded, it will generate and checks the events with the uploaded link. +## Checks + +- [x] FIM stops monitoring the link's target if the target was removed. +- [x] FIM will monitor again the target directory/file if the target is restored. + +## Execution result + +``` +python3 -m pytest test_files/test_follow_symbolic_link/test_delete_target.py +===================================================== test session starts ====================================================== +platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1 +rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini +plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0 +collected 12 items + +test_files/test_follow_symbolic_link/test_delete_target.py .ss..ss..ss. [100%] + +=========================================== 6 passed, 6 skipped in 357.27s (0:05:57) =========================================== +``` + +## Code documentation + + 
diff --git a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_follow_symbolic_disabled.md b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_follow_symbolic_disabled.md index e69de29bb2..c445a6d2c6 100644 --- a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_follow_symbolic_disabled.md +++ b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_follow_symbolic_disabled.md 
@@ -0,0 +1,38 @@
+# Test change target
+
+Check the FIM behavior when the option `follow_symbolic_link` is set to `no`.
+
+## General info
+
+| Tier | Platforms | Time spent| Test file |
+|:--:|:--:|:--:|:--:|
+| 1 | Linux/UNIX | 00:02:00 | [test_follow_symbolic_disabled.py](../../../../../../tests/integration/test_fim/test_files/test_follow_symbolic_link/test_follow_symbolic_disabled.py)|
+
+## Test logic
+
+- The test will monitor a symbolic link pointing to a file/directory.
+- Once FIM starts, it will create and won't expect events inside the pointed folder.
+- Then, the test will modify the link's target, and check that no alerts are triggered.
+- Finally, the test will remove the link's target, and check that no alerts are triggered.
+## Checks
+
+- [x] FIM stops monitoring the link's target if the option `follow_symbolic_link` is disabled.
+
+## Execution result
+
+```
+python3 -m pytest test_files/test_follow_symbolic_link/test_follow_symbolic_disabled.py
+===================================================== test session starts ======================================================
+platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
+rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini
+plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0
+collected 12 items
+
+test_files/test_follow_symbolic_link/test_follow_symbolic_disabled.py .ss..ss..ss. [100%]
+
+=========================================== 6 passed, 6 skipped in 95.89s (0:01:35) ============================================
+```
+
+## Code documentation
+
+
diff --git a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_monitor_symlink.md b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_monitor_symlink.md
index e69de29bb2..7d7655a06c 100644
--- a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_monitor_symlink.md
+++ b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_monitor_symlink.md
@@ -0,0 +1,38 @@
+# Test change target
+Checks the behavior when monitoring a link that points to a file or a directory.
+
+## General info
+
+| Tier | Platforms | Time spent| Test file |
+|:--:|:--:|:--:|:--:|
+| 1 | Linux/UNIX | 00:02:00 | [test_monitor_symlink.py](../../../../../../tests/integration/test_fim/test_files/test_follow_symbolic_link/test_monitor_symlink.py)|
+
+## Test logic
+
+- The test will monitor a symbolic link pointing to a file/directory.
+- Once FIM starts, if the link is a folder, creates a file and checks the expect added event.
+- Then, it will modify and expect modified event.
+- Finally, the test will remove the link's target and check the delete event.
+## Checks
+
+- [x] FIM monitors the target of the link.
+
+## Execution result
+
+```
+python3 -m pytest test_files/test_follow_symbolic_link/test_monitor_symlink.py
+===================================================== test session starts ======================================================
+platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
+rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini
+plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0
+collected 12 items
+
+test_files/test_follow_symbolic_link/test_monitor_symlink.py .ss..ss..ss. [100%]
+
+================================================ 6 passed, 6 skipped in 27.04s =================================================
+root@ubuntu1:/vagrant/wazuh-qa/tests/integration/test_fim#
+```
+
+## Code documentation
+
+
diff --git a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_not_following_symbolic_link.md b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_not_following_symbolic_link.md
index e69de29bb2..c4b9f3a28a 100644
--- a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_not_following_symbolic_link.md
+++ b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_not_following_symbolic_link.md
@@ -0,0 +1,39 @@
+# Test change target
+Checks the behavior when monitoring a link that points to a file or a directory with the option `follow_symbolic_link` disabled.
+
+## General info
+
+| Tier | Platforms | Time spent| Test file |
+|:--:|:--:|:--:|:--:|
+| 1 | Linux/UNIX | 00:02:00 | [test_not_following_symbolic_link.py](../../../../../../tests/integration/test_fim/test_files/test_follow_symbolic_link/test_not_following_symbolic_link.py)|
+
+## Test logic
+
+- The test will create some files in a non monitored folder and won't expect any events.
+- Then it will create a symbolic link inside a monitored folder and pointing to the non monitored folder.
+- It will expect a `added` event with the path of the symbolic link, as it within a monitored directory.
+- It will create some events in the symbolic link's target and won't expect any events.
+- Then it will change the link's target, and it will expect a `modified` event.
+
+## Checks
+
+- [x] FIM doesn't monitor the link's target when `follow_symbolic_link` is disabled.
+## Execution result
+
+```
+python3 -m pytest test_files/test_follow_symbolic_link/test_not_following_symbolic_link.py
+============================= test session starts ==============================
+platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
+rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini
+plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0
+collected 6 items
+
+test_files/test_follow_symbolic_link/test_not_following_symbolic_link.py . [ 16%]
+..... [100%]
+
+=========================================== 6 passed in 87.60s (0:01:27) ===========================================
+```
+
+## Code documentation
+
+
diff --git a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_revert_symlink.md b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_revert_symlink.md
index e69de29bb2..5b0b3ea9d8 100644
--- a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_revert_symlink.md
+++ b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_revert_symlink.md
@@ -0,0 +1,38 @@
+# Test change target
+
+Check if FIM detects changes in the symbolic links targets properly.
+
+## General info
+
+| Tier | Platforms | Time spent| Test file |
+|:--:|:--:|:--:|:--:|
+| 1 | Linux/UNIX | 00:02:00 | [test_revert_symlink.py](../../../../../../tests/integration/test_fim/test_files/test_follow_symbolic_link/test_revert_symlink.py)|
+
+## Test logic
+
+- The test will create a link to a file/directory.
+- Then, it will change the target to a directory and will create some files inside, expecting all the alerts.
+- After the events are processed, the test will change the link to it's previous target.
+- The test will generate events and expect alerts.
+## Checks
+
+- [x] FIM monitors the target of the link when is changed and when the change is reverted.
+
+## Execution result
+
+```
+python3 -m pytest test_files/test_follow_symbolic_link/test_revert_symlink.py
+=============================================== test session starts ================================================
+platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
+rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini
+plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0
+collected 3 items
+
+test_files/test_follow_symbolic_link/test_revert_symlink.py ... [100%]
+
+========================================== 3 passed in 159.54s (0:02:39) ===========================================
+```
+
+## Code documentation
+
+
diff --git a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_and_dir.md b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_and_dir.md
index e69de29bb2..fe4791b4fc 100644
--- a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_and_dir.md
+++ b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_and_dir.md
@@ -0,0 +1,36 @@
+# Test change target
+
+Check if FIM scans a directory silently when a link is changed, preventing events from triggering until it has finished.
+## General info
+
+| Tier | Platforms | Time spent| Test file |
+|:--:|:--:|:--:|:--:|
+| 1 | Linux/UNIX | 00:02:00 | [test_symlink_and_dir.py](../../../../../../tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_and_dir.py)|
+
+## Test logic
+
+- The test will create a link to a file/directory.
+- Then, it will change the target to non empty directory, checking that no events are triggered for the files already in the directory.
+- Finally, the test generates events and checks that alerts are triggered.
+## Checks
+
+- [x] FIM doesn't trigger alerts for already existing files when a link is changed to a non empty directory.
+
+## Execution result
+
+```
+python3 -m pytest test_files/test_follow_symbolic_link/test_symlink_and_dir.py
+=============================================== test session starts ================================================
+platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
+rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini
+plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0
+collected 3 items
+
+test_files/test_follow_symbolic_link/test_symlink_and_dir.py ... [100%]
+
+=========================================== 3 passed in 95.67s (0:01:35) ===========================================
+```
+
+## Code documentation
+
+
diff --git a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_dir_inside_monitored_dir.md b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_dir_inside_monitored_dir.md
index e69de29bb2..19dfa9599c 100644
--- a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_dir_inside_monitored_dir.md
+++ b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_dir_inside_monitored_dir.md
@@ -0,0 +1,36 @@
+# Test change target
+
+Check the precedence of monitoring options when there is a subdirectory within monitored directory through a symbolic link.
+## General info
+
+| Tier | Platforms | Time spent| Test file |
+|:--:|:--:|:--:|:--:|
+| 1 | Linux/UNIX | 00:01:00 | [test_symlink_dir_inside_monitored_dir.py](../../../../../../tests/integration/test_fim/test_files/test_follow_symbolic_link/ test_symlink_dir_inside_monitored_dir.py)|
+
+## Test logic
+
+- The test will create a directory, a symbolic link to that directory and a subdirectory. The directory and the symbolic link are monitored with different options.
+- Then, it will generate events inside the directory and will check the alerts fields matches the ones that are configured for the symbolic link.
+- Finally, the test will generate events in the subdirectory and check the alerts fields matches the ones that are configured for the link.
+## Checks
+
+- [x] FIM processes correctly the precedence in the configuration when a directory is monitored inside a monitored symbolic link with the option `follow_symbolic_link` enabled.
+
+## Execution result
+
+```
+python3 -m pytest test_files/test_follow_symbolic_link/test_symlink_dir_inside_monitored_dir.py
+=============================================== test session starts ================================================
+platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
+rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini
+plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0
+collected 3 items
+
+test_files/test_follow_symbolic_link/test_symlink_dir_inside_monitored_dir.py ... [100%]
+
+================================================ 3 passed in 38.65s ================================================
+```
+
+## Code documentation
+
+
diff --git a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_to_dir_between_scans.md b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_to_dir_between_scans.md
index e69de29bb2..b4d76b0d2c 100644
--- a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_to_dir_between_scans.md
+++ b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_to_dir_between_scans.md
@@ -0,0 +1,36 @@
+# Test change target
+
+Check that FIM correctly monitors folders that replaced monitored symbolic links when the option `follow_symbolic_link` is enabled.
+## General info
+
+| Tier | Platforms | Time spent| Test file |
+|:--:|:--:|:--:|:--:|
+| 1 | Linux/UNIX | 00:00:20 | [test_symlink_to_dir_between_scans.py](../../../../../../tests/integration/test_fim/test_files/test_follow_symbolic_link/ test_symlink_dir_inside_monitored_dir.py)|
+
+## Test logic
+
+- The test will create a directory with some files and a symbolic link.
+- Then, it will remove the link and will create a directory with the same path.
+- Then, it will wait until the next scheduled scan and will check that new files triggers events.
+## Checks
+
+- [x] FIM monitors directories that have replaced symbolic links.
+
+## Execution result
+
+```
+python3 -m pytest test_files/test_follow_symbolic_link/test_symlink_to_dir_between_scans.py
+=============================================== test session starts ================================================
+platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
+rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini
+plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0
+collected 1 item
+
+test_files/test_follow_symbolic_link/test_symlink_to_dir_between_scans.py . [100%]
+
+================================================ 1 passed in 22.95s ================================================
+```
+
+## Code documentation
+
+
diff --git a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_within_dir.md b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_within_dir.md
index e69de29bb2..3a344708b5 100644
--- a/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_within_dir.md
+++ b/docs/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_within_dir.md
@@ -0,0 +1,36 @@
+# Test change target
+
+Check the precedence of monitoring options when there is a symbolic link within monitored directory and `follow_symbolic_link` is enabled.
+## General info
+
+| Tier | Platforms | Time spent| Test file |
+|:--:|:--:|:--:|:--:|
+| 1 | Linux | 00:01:00 | [test_symlink_within_dir.py](../../../../../../tests/integration/test_fim/test_files/test_follow_symbolic_link/ test_symlink_within_dir.py)|
+
+## Test logic
+
+- The test will create a directory, a subdirectory and a symbolic link to the subdirectory. The directory and the symbolic link are monitored with different options.
+- Then, it will generate events inside the directory and will check the alerts fields matches the ones that are configured for the directory.
+- Finally, the test will generate events in the subdirectory and check the alerts fields matches the ones that are configured for the link.
+## Checks
+
+- [x] FIM processes correctly the precedence in the configuration when a symbolic link is inside a monitored directory.
+
+## Execution result
+
+```
+python3 -m pytest test_files/test_follow_symbolic_link/test_symlink_within_dir.py
+======================================================= test session starts ========================================================
+platform linux -- Python 3.8.5, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
+rootdir: /vagrant/wazuh-qa/tests/integration, configfile: pytest.ini
+plugins: html-2.0.1, testinfra-5.0.0, metadata-1.11.0
+collected 3 items
+
+test_files/test_follow_symbolic_link/test_symlink_within_dir.py ... [100%]
+
+======================================================== 3 passed in 39.55s ========================================================
+```
+
+## Code documentation
+
+
diff --git a/mkdocs.yml b/mkdocs.yml
index d2e36a47f3..84fd5d7395 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -171,6 +171,10 @@ nav: - Test audit: - tests/integration/test_fim/test_files/test_audit/index.md - tests/integration/test_fim/test_files/test_audit/test_audit_after_initial_scan.md + - tests/integration/test_fim/test_files/test_audit/test_audit_no_dir.md + - tests/integration/test_fim/test_files/test_audit/test_audit.md + - tests/integration/test_fim/test_files/test_audit/test_remove_audit.md + - tests/integration/test_fim/test_files/test_audit/test_remove_rule_five_times.md - Test basic usage: - tests/integration/test_fim/test_files/test_basic_usage/index.md - Test basic usage baseline generation: tests/integration/test_fim/test_files/test_basic_usage/test_basic_usage_baseline_generation.md @@ -215,8 +219,9 @@ nav: - Test file limit values: tests/integration/test_fim/test_files/test_file_limit/test_file_limit_values.md - Test follow symbolic link: - tests/integration/test_fim/test_files/test_follow_symbolic_link/index.md + - Test audit rules removed after link update: tests/integration/test_fim/test_files/test_follow_symbolic_link/test_audit_rules_removed_after_change_link.md - Test change target inside folder: tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_inside_folder.md - - Test cahnge target with nested directory: tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.md + - Test change target with nested directory: tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.md - Test change target: tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target.md - Test delete symlink: tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_symlink.md - Test delete target: tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_target.md 
diff --git a/tests/integration/test_fim/test_files/test_audit/test_audit.py b/tests/integration/test_fim/test_files/test_audit/test_audit.py index 70cfcc8cd3..8e77f23ab8 100644 --- a/tests/integration/test_fim/test_files/test_audit/test_audit.py +++ b/tests/integration/test_fim/test_files/test_audit/test_audit.py @@ -7,16 +7,9 @@ import psutil import pytest +import wazuh_testing.fim as fim + from wazuh_testing import logger -from wazuh_testing.fim import (LOG_FILE_PATH, callback_audit_added_rule, - callback_audit_connection, - callback_audit_health_check, - callback_audit_reloaded_rule, - callback_audit_rules_manipulation, - callback_realtime_added_directory, - callback_audit_key, - create_file, REGULAR, - detect_initial_scan) from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.file import truncate_file from wazuh_testing.tools.monitoring import FileMonitor @@ -33,7 +26,7 @@ test_directories = [os.path.join('/', 'testdir1'), os.path.join('/', 'testdir2'), os.path.join('/', 'testdir3')] testdir1, testdir2, testdir3 = test_directories -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # configurations 
@@ -55,11 +48,23 @@ def get_configuration(request): ]) def test_audit_health_check(tags_to_apply, get_configuration, configure_environment, restart_syscheckd): - """Check if the health check is passed.""" + """Check if the health check is passed. + + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If an expected event couldn't be captured. + """ + logger.info('Applying the test configuration') check_apply_test(tags_to_apply, get_configuration['tags']) - wazuh_log_monitor.start(timeout=20, callback=callback_audit_health_check, + wazuh_log_monitor.start(timeout=20, callback=fim.callback_audit_health_check, error_message='Health check failed') 
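Review bot: The new docstring lists `wait_for_fim_start (fixture)` under Args, but the signature of `test_audit_health_check` ends at `restart_syscheckd`, so the docstring documents a dependency the test never declares; drop that entry, or add the fixture to the signature if the health check genuinely requires the first scan to finish. The `Raises: TimeoutError` entry is consistent with the `wazuh_log_monitor.start(..., error_message='Health check failed')` call, assuming FileMonitor raises on expiry as the rest of the suite expects. Note that `timeout=20` stays a literal here while this same commit moves test_audit_after_initial_scan.py to `global_parameters.default_timeout`.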
@@ -68,12 +73,25 @@ def test_audit_health_check(tags_to_apply, get_configuration, ]) def test_added_rules(tags_to_apply, get_configuration, configure_environment, restart_syscheckd): - """Check if the specified folders are added to Audit rules list.""" + """Check if the specified folders are added to Audit rules list. + + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If an expected event couldn't be captured. + ValueError: If the path of the event is wrong. + """ + logger.info('Applying the test configuration') check_apply_test(tags_to_apply, get_configuration['tags']) logger.info('Checking the event...') events = wazuh_log_monitor.start(timeout=20, - callback=callback_audit_added_rule, + callback=fim.callback_audit_added_rule, accum_results=3, error_message='Folders were not added to Audit rules list' ).result() 
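Review bot: `wait_for_fim_start (fixture)` in the new Args list is not a parameter of `test_added_rules` (its signature stops at `restart_syscheckd`), and `Raises: ValueError` is not backed by any raise visible in the hunk — the remainder of the body lies outside it, so confirm the path check there really raises ValueError rather than AssertionError before keeping that line. Remove the fixture entry, or add it to the signature if the initial scan must complete before the rules are checked. The `timeout=20` on `wazuh_log_monitor.start` is also still hard-coded while the sibling file adopts `global_parameters.default_timeout` in this commit.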
@@ -88,7 +106,20 @@ def test_added_rules(tags_to_apply, get_configuration, ]) def test_readded_rules(tags_to_apply, get_configuration, configure_environment, restart_syscheckd): - """Check if the removed rules are added to Audit rules list.""" + """Check if the removed rules are added to Audit rules list. + + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If an expected event couldn't be captured. + ValueError: If the path of the event is wrong. + """ + logger.info('Applying the test configuration') check_apply_test(tags_to_apply, get_configuration['tags']) @@ -98,13 +129,13 @@ def test_readded_rules(tags_to_apply, get_configuration, os.system(command) wazuh_log_monitor.start(timeout=20, - callback=callback_audit_rules_manipulation, + callback=fim.callback_audit_rules_manipulation, error_message=f'Did not receive expected "manipulation" event with the ' f'command {command}') events = wazuh_log_monitor.start(timeout=10, - callback=callback_audit_reloaded_rule, - error_message='Did not receive expected "reload" event with the rule ' + callback=fim.callback_audit_added_rule, + error_message='Did not receive expected "added" event with the rule ' 'modification').result() assert dir_ in events, f'{dir_} not in {events}' 
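Review bot: In the second hunk the callback is switched to `fim.callback_audit_added_rule` with a literal `timeout=10`, whereas `test_remove_and_read_folder` in this same commit first waits for the "Audit rules reloaded" line via `fim.wait_for_audit` and then uses `global_parameters.default_timeout`; if FIM only re-adds a rule after a reload cycle, 10 s may be tight and the two tests should share one wait sequence. The preceding `os.system(command)` (context; `command` is built outside the hunk) discards the exit status, so a failed `auditctl` only surfaces later as a "manipulation" timeout — `subprocess.run(shlex.split(command), check=True)` would fail fast and match `test_reconnect_to_audit`. The first hunk's docstring also lists `wait_for_fim_start`, which `test_readded_rules` does not take.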
@@ -115,7 +146,20 @@ def test_readded_rules(tags_to_apply, get_configuration, ]) def test_readded_rules_on_restart(tags_to_apply, get_configuration, configure_environment, restart_syscheckd): - """Check if the rules are added to Audit when it restarts.""" + """Check if the rules are added to Audit when it restarts. + + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If an expected event couldn't be captured. + ValueError: If the path of the event is wrong. + """ + logger.info('Applying the test configuration') check_apply_test(tags_to_apply, get_configuration['tags']) @@ -125,12 +169,12 @@ def test_readded_rules_on_restart(tags_to_apply, get_configuration, p.wait() wazuh_log_monitor.start(timeout=10, - callback=callback_audit_connection, + callback=fim.callback_audit_connection, error_message=f'Did not receive expected "connect" event with the command ' f'{" ".join(restart_command)}') events = wazuh_log_monitor.start(timeout=30, - callback=callback_audit_reloaded_rule, + callback=fim.callback_audit_added_rule, accum_results=3, error_message=f'Did not receive expected "load" event with the command ' f'{" ".join(restart_command)}').result() 
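Review bot: In the second hunk the callback becomes `fim.callback_audit_added_rule`, but the error text still reads `"load" event`, so a failure would report the wrong expectation; change it to `error_message=f'Did not receive expected "added" event with the command ' f'{" ".join(restart_command)}'`, as was done in `test_readded_rules`. The docstring hunk also documents `wait_for_fim_start (fixture)`, which is not in `test_readded_rules_on_restart`'s signature. `timeout=10` and `timeout=30` remain literals here while the sibling file moves to `global_parameters.default_timeout`.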
@@ -145,7 +189,19 @@ def test_readded_rules_on_restart(tags_to_apply, get_configuration, ]) def test_move_rules_realtime(tags_to_apply, get_configuration, configure_environment, restart_syscheckd): - """Check if the rules are changed to realtime when Audit stops.""" + """Check if the rules are changed to realtime when Audit stops. + + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + Raises: + TimeoutError: If an expected event couldn't be captured. + ValueError: If the path of the event is wrong. + """ + logger.info('Applying the test configuration') check_apply_test(tags_to_apply, get_configuration['tags']) @@ -155,7 +211,7 @@ def test_move_rules_realtime(tags_to_apply, get_configuration, p.wait() events = wazuh_log_monitor.start(timeout=30, - callback=callback_realtime_added_directory, + callback=fim.callback_realtime_added_directory, accum_results=3, error_message=f'Did not receive expected "directory added" for monitoring ' f'with the command {" ".join(stop_command)}').result() 
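Review bot: The new docstring for `test_move_rules_realtime` runs the `wait_for_fim_start (fixture): ...` line straight into `Raises:` with no blank line between the sections, unlike every other docstring in this commit; insert the blank line so the Google-style sections parse consistently. `wait_for_fim_start` is also not a parameter of this test (the signature ends at `restart_syscheckd`), so that Args entry should be removed rather than kept. The second hunk only re-prefixes `callback_realtime_added_directory`; the 30 s literal stays.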
@@ -173,16 +229,22 @@ def test_move_rules_realtime(tags_to_apply, get_configuration, ("custom_audit_key", "/testdir1") ]) def test_audit_key(audit_key, path, get_configuration, configure_environment, restart_syscheckd): - """Check functionality by adding a audit rule and checking if alerts with that key are triggered when + """Check `` functionality by adding a audit rule and checking if alerts with that key are triggered when a file is created. - Parameters - ---------- - audit_key : str - Name of the audit_key to monitor - path : str - Path of the folder to be monitored + Args: + audit_key (str): Name of the audit_key to monitor. + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If an expected event couldn't be captured. + ValueError: If the path of the event is wrong. """ + logger.info('Applying the test configuration') check_apply_test({audit_key}, get_configuration['tags']) 
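Review bot: The new Args list for `test_audit_key` documents `tags_to_apply (set)`, which this test does not take, and drops the `path (str)` entry the previous docstring had — replace the former with `path (str): Path of the folder to be monitored.`. `wait_for_fim_start (fixture)` is likewise absent from the signature. The summary now reads "Check `` functionality" with nothing between the backticks, at least as rendered here; verify that the option name is actually present in the source file and was not lost.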
@@ -192,15 +254,15 @@ def test_audit_key(audit_key, path, get_configuration, configure_environment, re # Restart and for wazuh control_service('stop') - truncate_file(LOG_FILE_PATH) - wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) + truncate_file(fim.LOG_FILE_PATH) + wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) control_service('start') - detect_initial_scan(wazuh_log_monitor) + fim.detect_initial_scan(wazuh_log_monitor) # Look for audit_key word - create_file(REGULAR, path, "testfile") + fim.create_file(fim.REGULAR, path, "testfile") events = wazuh_log_monitor.start(timeout=30, - callback=callback_audit_key, + callback=fim.callback_audit_key, accum_results=1, error_message=f'Did not receive expected "Match audit_key ..." event ' f'with the command {" ".join(add_rule_command)}').result() 
@@ -215,15 +277,21 @@ def test_audit_key(audit_key, path, get_configuration, configure_environment, re ({'restart_audit_false'}, False) ]) def test_restart_audit(tags_to_apply, should_restart, get_configuration, configure_environment, restart_syscheckd): - """Check functionality by removing the plugin and monitoring audit to see if it restart and create + """Check `` functionality by removing the plugin and monitoring audit to see if it restart and create the file again. - Parameters - ---------- - tags_to_apply : set - Run test if matches with a configuration identifier, skip otherwise - should_restart : boolean - True if Auditd should restart, False otherwise + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + should_restart (boolean): True if Auditd should restart, False otherwise + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If an expected event couldn't be captured. + ValueError: If the time before the and after the restart are equal when auditd has been restarted or if the time + before and after the restart are different when auditd hasn't been restarted """ def get_audit_creation_time(): diff --git a/tests/integration/test_fim/test_files/test_audit/test_audit_after_initial_scan.py b/tests/integration/test_fim/test_files/test_audit/test_audit_after_initial_scan.py index 6a015616ce..52f3a1a130 100644 --- a/tests/integration/test_fim/test_files/test_audit/test_audit_after_initial_scan.py +++ b/tests/integration/test_fim/test_files/test_audit/test_audit_after_initial_scan.py 
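Review bot: The `Raises: ValueError` sentence in the new `test_restart_audit` docstring reads "If the time before the and after the restart are equal", which has a stray "the"; rewrite it as `ValueError: If the creation time is unchanged after a restart that should have happened, or changed when no restart was expected.`. `wait_for_fim_start (fixture)` is listed but not in the signature, and `should_restart (boolean): ...` lacks the trailing period the other entries carry. The summary's "Check `` functionality" shows nothing between the backticks here, so confirm the option name survives in the source. `get_audit_creation_time` and the rest of the body continue outside the hunk.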
@@ -8,13 +8,11 @@ import subprocess import pytest -from wazuh_testing.fim import (LOG_FILE_PATH, - callback_audit_reloaded_rule, - callback_audit_removed_rule, - callback_audit_connection_close, - callback_audit_connection) +import wazuh_testing.fim as fim + from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor +from wazuh_testing import global_parameters # Marks @@ -27,7 +25,7 @@ test_directories = [os.path.join('/', 'testdir1'), os.path.join('/', 'testdir2'), os.path.join('/', 'testdir3')] testdir1, testdir2, testdir3 = test_directories -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # Configurations 
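Review bot: The added `from wazuh_testing import global_parameters` is placed after the `wazuh_testing.tools.*` imports instead of next to `import wazuh_testing.fim as fim`; group the package-level imports together (`import wazuh_testing.fim as fim`, then `from wazuh_testing import global_parameters`, then the `wazuh_testing.tools` lines) so the block reads the same way as the one in test_audit.py. The second hunk is a plain re-prefix of `LOG_FILE_PATH` and needs nothing further.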
@@ -54,24 +52,29 @@ def test_remove_and_read_folder(tags_to_apply, folder, get_configuration, wait_for_fim_start): """Remove folder which is monitored with auditd and then create it again. - Parameters - ---------- - tags_to_apply : set - Configuration tag to apply in the test - folder : str - The folder to remove and read + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + folder (str): The folder to remove and read. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If an expected event couldn't be captured. """ check_apply_test(tags_to_apply, get_configuration['tags']) shutil.rmtree(folder, ignore_errors=True) - wazuh_log_monitor.start(timeout=20, callback=callback_audit_removed_rule, + wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_audit_removed_rule, error_message=f'Did not receive expected "removed" event ' f'removing the folder {folder}') os.makedirs(folder, mode=0o777) - wazuh_log_monitor.start(timeout=30, callback=callback_audit_reloaded_rule, - error_message='Did not receive expected "reload" event') + fim.wait_for_audit(True, wazuh_log_monitor) + wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_audit_added_rule, + error_message='Did not receive expected "added" event') @pytest.mark.parametrize('tags_to_apply', [ 
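Review bot: `fim.wait_for_audit(True, wazuh_log_monitor)` passes a literal `True`, so the `whodata` guard inside the helper never varies here; either derive the flag from the test's configuration metadata (if the fixture exposes the FIM mode) or call the monitor directly with `fim.callback_end_audit_reload_rules` so the intent is explicit. The helper also brings its own fixed 35 s timeout and `update_position=False`, while the two `start` calls around it were just moved to `global_parameters.default_timeout` — a short comment on why the reload wait is the one exception would help the next reader. `os.makedirs(folder, mode=0o777)` is context, but recreating a monitored directory world-writable (subject to umask) is a questionable default for a FIM fixture and worth tightening to `0o755` in a follow-up.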
@@ -81,10 +84,15 @@ def test_reconnect_to_audit(tags_to_apply, get_configuration, configure_environm restart_syscheckd, wait_for_fim_start): """Restart auditd and check Wazuh reconnect to auditd - Parameters - ---------- - tags_to_apply : set - Configuration tag to apply in the test + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If an expected event couldn't be captured. """ check_apply_test(tags_to_apply, get_configuration['tags']) @@ -92,7 +100,7 @@ def test_reconnect_to_audit(tags_to_apply, get_configuration, configure_environm restart_command = ["service", "auditd", "restart"] subprocess.run(restart_command, check=True) - wazuh_log_monitor.start(timeout=20, callback=callback_audit_connection_close, + wazuh_log_monitor.start(timeout=20, callback=fim.callback_audit_connection_close, error_message='Did not receive expected "audit connection close" event') - wazuh_log_monitor.start(timeout=20, callback=callback_audit_connection, + wazuh_log_monitor.start(timeout=20, callback=fim.callback_audit_connection, error_message='Did not receive expected "audit connection" event') diff --git a/tests/integration/test_fim/test_files/test_audit/test_audit_no_dir.py b/tests/integration/test_fim/test_files/test_audit/test_audit_no_dir.py index e0d8b9a832..0966333996 100644 --- a/tests/integration/test_fim/test_files/test_audit/test_audit_no_dir.py +++ b/tests/integration/test_fim/test_files/test_audit/test_audit_no_dir.py 
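Review bot: The two `wazuh_log_monitor.start` calls in the second hunk keep `timeout=20` while `test_remove_and_read_folder` in this same file was moved to `global_parameters.default_timeout`, so the file now mixes both conventions; `global_parameters` is already imported here, and `timeout=global_parameters.default_timeout` on both calls would make them consistent. The same hard-coded 20-second timeouts remain on the prefixed calls in test_remove_audit.py (`test_move_folders_to_realtime`) and test_remove_rule_five_times.py (`test_remove_rule_five_times`), which would need the `global_parameters` import added. `test_reconnect_to_audit` starts in the first hunk and its `restart_command` lines are context in the second, so the two hunks cover one function body.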
@@ -8,8 +8,9 @@ import sys
 import pytest
+import wazuh_testing.fim as fim
+
 from wazuh_testing import global_parameters
-from wazuh_testing.fim import generate_params, callback_audit_unable_dir, callback_audit_added_rule
 from wazuh_testing.tools import PREFIX, LOG_FILE_PATH, ALERT_FILE_PATH
 from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test
 from wazuh_testing.tools.monitoring import FileMonitor
@@ -25,12 +26,12 @@ filename = 'testfile'
 test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')
 configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml')
-wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
+wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH)
 wazuh_alert_monitor = FileMonitor(ALERT_FILE_PATH)
 # Configurations
-p, m = generate_params(extra_params={'TEST_DIRECTORIES': testdir}, modes=['whodata'])
+p, m = fim.generate_params(extra_params={'TEST_DIRECTORIES': testdir}, modes=['whodata'])
 configurations = load_wazuh_configurations(configurations_path, __name__, params=p, metadata=m)
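Review bot: `LOG_FILE_PATH` is still imported from `wazuh_testing.tools` (a context line in the first hunk) while the only visible use, `FileMonitor(LOG_FILE_PATH)` in the second hunk, was switched to `fim.LOG_FILE_PATH`; unless the name is used elsewhere in the file, it is now an unused import and the line should become `from wazuh_testing.tools import PREFIX, ALERT_FILE_PATH`. If `wazuh_testing.tools.LOG_FILE_PATH` and `fim.LOG_FILE_PATH` are the same constant the switch is harmless; if they can differ, the log monitor now watches a different file than before, which is worth confirming.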
@@ -69,21 +70,29 @@ def test_audit_no_dir(tags_to_apply, get_configuration, configure_environment, r is up, the audit rules are reloaded every 30 seconds (not configurable), so when the directory is created, it starts to be monitored. - Parameters - ---------- - tags_to_apply : set - Configuration tag to apply in the test + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If an expected event couldn't be captured. + ValueError: If the path of the event is wrong. """ + check_apply_test(tags_to_apply, get_configuration['tags']) # Assert message is generated: Unable to add audit rule for .... - result = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_audit_unable_dir, + result = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_audit_unable_dir, error_message='Did not receive message "Unable to add audit rule for ..."' ).result() assert result == testdir, f'{testdir} not in "Unable to add audit rule for {result}" message' # Create the directory and verify that it is added to the audit rules. It is checked every 30 seconds. os.makedirs(testdir) - result = wazuh_log_monitor.start(timeout=30, callback=callback_audit_added_rule, + fim.wait_for_audit(True, wazuh_log_monitor) + result = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_audit_added_rule, error_message='Folders were not added to Audit rules list').result() assert result == testdir, f'{testdir} not in "Added audit rule for monitoring directory: {result}" message' 
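Review bot: The new docstring lists `ValueError: If the path of the event is wrong` under `Raises:`, but both checks below are plain `assert result == testdir` statements, which raise `AssertionError`; either document `AssertionError` or raise `ValueError` explicitly. The same mismatch is in the new test_audit_rules_removed_after_change_link.py (`ValueError: If the event type isn't added ...`) and in test_change_target_inside_folder.py (`ValueError: If the event's type and path are not the expected.`), both of which also use `assert`. The second wait is now two sequential log waits (`fim.wait_for_audit` followed by `callback_audit_added_rule`), which presumably relies on the helper not advancing the monitor's read position past the "Added audit rule" line; a comment stating the expected log order, e.g. `# "Audit rules reloaded" precedes the per-directory "Added audit rule" lines`, would make that dependency visible.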
diff --git a/tests/integration/test_fim/test_files/test_audit/test_remove_audit.py b/tests/integration/test_fim/test_files/test_audit/test_remove_audit.py
index 9eee335787..574685fe52 100644
--- a/tests/integration/test_fim/test_files/test_audit/test_remove_audit.py
+++ b/tests/integration/test_fim/test_files/test_audit/test_remove_audit.py
@@ -8,8 +8,9 @@ import subprocess
 import pytest
+import wazuh_testing.fim as fim
+
 from distro import id
-from wazuh_testing.fim import LOG_FILE_PATH, callback_audit_cannot_start
 from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test
 from wazuh_testing.tools.monitoring import FileMonitor
@@ -24,7 +25,7 @@ test_directories = [os.path.join('/', 'testdir1'), os.path.join('/', 'testdir2'), os.path.join('/', 'testdir3')]
 testdir1, testdir2, testdir3 = test_directories
-wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
+wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH)
 # Configurations
@@ -76,14 +77,20 @@ def test_move_folders_to_realtime(tags_to_apply, get_configuration, uninstall_in configure_environment, restart_syscheckd): """Check folders monitored with Whodata change to Real-time if auditd is not installed - Parameters - ---------- - tags_to_apply : set - Configuration tag to apply + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + uninstall_install_audit (fixture): Uninstall auditd before the test and install auditd again after the test is + executed. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + Raises: + TimeoutError: If an expected event couldn't be captured. """ check_apply_test(tags_to_apply, get_configuration['tags']) - wazuh_log_monitor.start(timeout=20, callback=callback_audit_cannot_start, + wazuh_log_monitor.start(timeout=20, callback=fim.callback_audit_cannot_start, error_message='Did not receive expected "Who-data engine could not start. ' 'Switching who-data to real-time" event') diff --git a/tests/integration/test_fim/test_files/test_audit/test_remove_rule_five_times.py b/tests/integration/test_fim/test_files/test_audit/test_remove_rule_five_times.py index 4c41418948..c29019ce82 100644 --- a/tests/integration/test_fim/test_files/test_audit/test_remove_rule_five_times.py +++ b/tests/integration/test_fim/test_files/test_audit/test_remove_rule_five_times.py @@ -7,9 +7,8 @@ import subprocess import pytest -from wazuh_testing.fim import (LOG_FILE_PATH, - callback_audit_rules_manipulation, - callback_audit_deleting_rule) +import wazuh_testing.fim as fim + from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor 
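Review bot: The new docstring of `test_move_folders_to_realtime` documents a `wait_for_fim_start (fixture)` parameter that the signature (`tags_to_apply, get_configuration, uninstall_install_audit, configure_environment, restart_syscheckd`) does not take, and `Raises:` follows the last argument with no blank line, so Google-style docstring parsers will fold it into the `wait_for_fim_start` entry. Drop the `wait_for_fim_start` line and insert a blank line before `Raises:`. The mirror-image copy-paste defect is in test_remove_rule_five_times.py, whose docstring documents `uninstall_install_audit (fixture)` although `test_remove_rule_five_times` does not receive that fixture.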
@@ -24,7 +23,7 @@ test_directories = [os.path.join('/', 'testdir1'), os.path.join('/', 'testdir2'), os.path.join('/', 'testdir3')]
 testdir1, testdir2, testdir3 = test_directories
-wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
+wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH)
 # Configurations
@@ -48,23 +47,28 @@ def test_remove_rule_five_times(tags_to_apply, folder, audit_key, get_configuration, configure_environment, restart_syscheckd, wait_for_fim_start): """Remove auditd rule using auditctl five times and check Wazuh ignores folder. - Parameters - ---------- - tags_to_apply : set - Configuration tag to apply in the test - folder : str - The folder to remove and read - audit_key : str - The key which Wazuh put. + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + folder (str): Path whose rule will be removed. + audit_key (str): Name of the configured audit key. + get_configuration (fixture): Gets the current configuration of the test. + uninstall_install_audit (fixture): Uninstall auditd before the test and install auditd again after the test is + executed. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If an expected event couldn't be captured. """ check_apply_test(tags_to_apply, get_configuration['tags']) for _ in range(0, 5): subprocess.run(["auditctl", "-W", folder, "-p", "wa", "-k", audit_key], check=True) - wazuh_log_monitor.start(timeout=20, callback=callback_audit_rules_manipulation, + wazuh_log_monitor.start(timeout=20, callback=fim.callback_audit_rules_manipulation, error_message='Did not receive expected ' '"Detected Audit rules manipulation" event') - wazuh_log_monitor.start(timeout=20, callback=callback_audit_deleting_rule, + wazuh_log_monitor.start(timeout=20, callback=fim.callback_audit_deleting_rule, error_message='Did not receive expected "Deleting Audit rules" event') diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/common.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/common.py index 90c587003f..df1b5bc258 100644 
--- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/common.py
+++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/common.py
@@ -4,8 +4,7 @@ import subprocess
 import sys
-from wazuh_testing.fim import callback_audit_loaded_rule, create_file, REGULAR, SYMLINK, callback_symlink_scan_ended, \
- change_internal_options
+from wazuh_testing.fim import create_file, REGULAR, SYMLINK, callback_symlink_scan_ended, change_internal_options
 from wazuh_testing.tools import PREFIX
 # variables
@@ -43,13 +42,6 @@ def modify_symlink(target, path, file=None):
 subprocess.call(['ln', '-sfn', target, path])
-def wait_for_audit(whodata, monitor):
- """Wait for the audit callback if we are using whodata monitoring"""
- if whodata:
- monitor.start(timeout=30, callback=callback_audit_loaded_rule,
- error_message='Did not receive expected "Audit rule loaded: -w ... -p" event')
-
-
 def delete_f(path, file=None):
 """Delete given path. Directory or file"""
 if file is None:
diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/data/wazuh_conf.yaml b/tests/integration/test_fim/test_files/test_follow_symbolic_link/data/wazuh_conf.yaml
index 4b6ea0888b..8afff22cef 100644
--- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/data/wazuh_conf.yaml
+++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/data/wazuh_conf.yaml
@@ -155,3 +155,18 @@
 attributes:
 - FIM_MODE
 - follow_symbolic_link: FOLLOW_MODE
+# conf 10
+- tags:
+ - check_audit_removed_rules
+ apply_to_modules:
+ - test_audit_rules_removed_after_change_link
+ sections:
+ - section: syscheck
+ elements:
+ - disabled:
+ value: 'no'
+ - directories:
+ value: LINK_PATH
+ attributes:
+ - FIM_MODE
+ - follow_symbolic_link: FOLLOW_MODE
diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_audit_rules_removed_after_change_link.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_audit_rules_removed_after_change_link.py
new file mode 100644
index 0000000000..1875e105bc
--- /dev/null
+++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_audit_rules_removed_after_change_link.py
@@ -0,0 +1,117 @@
+# Copyright (C) 2015-2021, Wazuh Inc.
+# Created by Wazuh, Inc. .
+# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
+import os
+import subprocess
+
+import pytest
+import wazuh_testing.fim as fim
+
+from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test
+from wazuh_testing.tools.monitoring import FileMonitor
+from wazuh_testing import global_parameters
+from wazuh_testing.tools import PREFIX
+
+from test_fim.test_files.test_follow_symbolic_link.common import configurations_path, testdir1, testdir_not_target, \
+ wait_for_symlink_check, modify_symlink
+# noinspection PyUnresolvedReferences
+from test_fim.test_files.test_follow_symbolic_link.common import test_directories
+
+pytestmark = [pytest.mark.linux, pytest.mark.tier(level=1)]
+
+wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH)
+
+# Variables
+
+fname = "testfile"
+symlink_root_path = PREFIX
+symlink_name = "symlink"
+symlink_path = os.path.join(symlink_root_path, symlink_name)
+link_interval = 2
+
+param_dir = {
+ 'FOLLOW_MODE': 'yes',
+ 'LINK_PATH': symlink_path
+}
+
+# Configurations
+
+conf_params, conf_metadata = fim.generate_params(extra_params=param_dir, modes=['whodata'])
+configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata)
+
+# Functions
+
+
+def extra_configuration_before_yield():
+ """
+ Setup the symlink to one folder
+ """
+ # Symlink pointing to testdir1
+ fim.create_file(fim.SYMLINK, symlink_root_path, symlink_name, target=testdir1)
+ # Set symlink_scan_interval to a given value
+ fim.change_internal_options(param='syscheck.symlink_scan_interval', value=link_interval)
+
+
+def extra_configuration_after_yield():
+ """
+ Setup the symlink to one folder
+ """
+ # Symlink pointing to testdir1
+ os.remove(symlink_path)
+ fim.change_internal_options(param='syscheck.symlink_scan_interval', value=600)
+
+
+# fixtures
+
+@pytest.fixture(scope='module', params=configurations)
+def get_configuration(request):
+ """Get configurations from the module."""
+ return request.param
+
+
+# tests
+
+@pytest.mark.parametrize('replaced_target, new_target, file_name, tags_to_apply', [
+ (testdir1, testdir_not_target, f'{fname}_1', {'check_audit_removed_rules'})
+ ])
+def test_audit_rules_removed_after_change_link(replaced_target, new_target, file_name, tags_to_apply,
+ get_configuration, configure_environment,
+ restart_syscheckd, wait_for_fim_start):
+ """ Test that checks if the audit rules are removed when the symlink target's is changed.
+
+ Args:
+ replaced_target (str): Directory where the link is pointing.
+ new_target (str): Directory where the link will be pointed after it's updated.
+ file_name (str): Name of the file that will be created inside the folders.
+ tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise.
+ get_configuration (fixture): Gets the current configuration of the test.
+ configure_environment (fixture): Configure the environment for the execution of the test.
+ restart_syscheckd (fixture): Restarts syscheck.
+ wait_for_fim_start (fixture): Waits until the first FIM scan is completed.
+
+ Raises:
+ TimeoutError: If an expected event couldn't be captured.
+ ValueError: If the event type isn't added or if the audit rule for ``replaced_target`` isn't removed.
+
+ """
+ check_apply_test(tags_to_apply, get_configuration['tags'])
+ fim.create_file(fim.REGULAR, replaced_target, file_name)
+ ev = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event,
+ error_message='Did not receive expected "Sending FIM event: ..." event').result()
+
+ assert ev['data']['type'] == 'added' and ev['data']['path'] == os.path.join(replaced_target, file_name)
+
+ # Change the target of the symlink and expect events while there's no syscheck scan
+
+ modify_symlink(new_target, symlink_path)
+ wait_for_symlink_check(wazuh_log_monitor)
+ fim.wait_for_audit(True, wazuh_log_monitor)
+
+ rules_paths = str(subprocess.check_output(['auditctl', '-l']))
+ fim.create_file(fim.REGULAR, new_target, file_name)
+ ev = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event,
+ error_message='Did not receive expected "Sending FIM event: ..." event').result()
+
+ assert ev['data']['type'] == 'added' and ev['data']['path'] == os.path.join(new_target, file_name)
+
+ assert replaced_target not in rules_paths, f'The audit rule has been reloaded for {replaced_target}'
diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target.py
index 5dd43834b6..30c88dab50 100644
--- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target.py
+++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target.py
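Review bot: `rules_paths = str(subprocess.check_output(['auditctl', '-l']))` stringifies the raw `bytes`, so `rules_paths` is the `b'...'` repr with `\n` escapes and backslash-escaped non-ASCII bytes rather than the rule listing; `rules_paths = subprocess.check_output(['auditctl', '-l'], text=True)` gives the decoded text the final assertion intends to search. That final `assert replaced_target not in rules_paths` is a substring check, so it fails spuriously whenever any other rule path merely starts with `replaced_target` (a sibling directory with a longer name); matching the `-w <path>` token per line of the output would be exact. `extra_configuration_after_yield` carries the copied docstring `Setup the symlink to one folder` and the comment `# Symlink pointing to testdir1` although it removes the link and resets `symlink_scan_interval`; `"""Remove the symlink and restore symlink_scan_interval."""` describes what it does.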
@@ -4,24 +4,24 @@ import os
 import pytest
+import wazuh_testing.fim as fim
+
 from test_fim.test_files.test_follow_symbolic_link.common import configurations_path, testdir1, \
- modify_symlink, testdir_link, wait_for_symlink_check, wait_for_audit, testdir_target, testdir_not_target
+ modify_symlink, testdir_link, wait_for_symlink_check, testdir_target, testdir_not_target
 # noinspection PyUnresolvedReferences
 from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_after_yield, \
 extra_configuration_before_yield
 from wazuh_testing import logger
-from wazuh_testing.fim import (generate_params, create_file, REGULAR, callback_detect_event,
- check_time_travel, modify_file_content, LOG_FILE_PATH)
 from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test
 from wazuh_testing.tools.monitoring import FileMonitor
 # All tests in this module apply to linux only
 pytestmark = [pytest.mark.linux, pytest.mark.sunos5, pytest.mark.darwin, pytest.mark.tier(level=1)]
-wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
+wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH)
 # configurations
-conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'})
+conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'})
 configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata
@@ -44,19 +44,24 @@ def get_configuration(request): ]) def test_symbolic_change_target(tags_to_apply, main_folder, aux_folder, get_configuration, configure_environment, restart_syscheckd, wait_for_fim_start): - """ - Check if syscheck updates the symlink target properly + """Check if syscheck updates the symlink target properly - CHECK: Having a symbolic link pointing to a file/folder, change the target of the link to another file/folder. + Having a symbolic link pointing to a file/folder, change the target of the link to another file/folder. Ensure that the old file is being monitored and the new one is not before symlink_checker runs. Wait until symlink_checker runs and ensure that the new file is being monitored and the old one is not. - Parameters - ---------- - main_folder : str - Directory that is being pointed at or contains the pointed file. - aux_folder : str - Directory that will be pointed at or will contain the future pointed file. + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + main_folder (str): Directory that is being pointed at or contains the pointed file. + aux_folder (str): Directory that will be pointed at or will contain the future pointed file. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If a expected event wasn't triggered. + AttributeError: If a unexpected event was captured. """ def modify_and_check_events(f1, f2, text): 
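Review bot: `TimeoutError: If a expected event wasn't triggered.` and `AttributeError: If a unexpected event was captured.` should read "an expected" and "an unexpected". The same two lines are copied into the new docstrings of test_change_target_inside_folder.py and test_change_target_with_nested_directory.py, and the latter also leaves a stray blank line between the `Raises:` entries and the closing `"""`. The body of `test_symbolic_change_target` continues in the following hunks.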
@@ -64,16 +69,16 @@ def modify_and_check_events(f1, f2, text): Modify the content of 2 given files. We assume the first one is being monitored and the other one is not. We expect a 'modified' event for the first one and a timeout for the second one. """ - modify_file_content(f1, file1, text) - modify_file_content(f2, file1, text) - check_time_travel(scheduled, monitor=wazuh_log_monitor) - modify = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.modify_file_content(f1, file1, text) + fim.modify_file_content(f2, file1, text) + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + modify = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event' ).result() assert 'modified' in modify['data']['type'] and f1 in modify['data']['path'], \ f"'modified' event not matching for {file1}" with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') 
@@ -85,22 +90,22 @@ def modify_and_check_events(f1, f2, text): # If symlink is pointing to a directory, we need to add files and expect their 'added' event (only if the file # is being created withing the pointed directory if main_folder == testdir_target: - create_file(REGULAR, main_folder, file1, content='') - create_file(REGULAR, aux_folder, file1, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - add = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.create_file(fim.REGULAR, main_folder, file1, content='') + fim.create_file(fim.REGULAR, aux_folder, file1, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + add = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event' ).result() assert 'added' in add['data']['type'] and file1 in add['data']['path'], \ f"'added' event not matching for {file1}" with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') else: - create_file(REGULAR, aux_folder, file1, content='') + fim.create_file(fim.REGULAR, aux_folder, file1, content='') with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') 
@@ -113,7 +118,7 @@ def modify_and_check_events(f1, f2, text):
 modify_and_check_events(main_folder, aux_folder, 'Sample number one')
 wait_for_symlink_check(wazuh_log_monitor)
- wait_for_audit(whodata, wazuh_log_monitor)
+ fim.wait_for_audit(whodata, wazuh_log_monitor)
 # Expect events the other way around now
 modify_and_check_events(aux_folder, main_folder, 'Sample number two')
diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_inside_folder.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_inside_folder.py
index d7871cb206..c45fb6b4f8 100644
--- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_inside_folder.py
+++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_inside_folder.py
@@ -4,14 +4,14 @@ import os
 import pytest
+import wazuh_testing.fim as fim
+
 from test_fim.test_files.test_follow_symbolic_link.common import configurations_path, testdir1, \
- modify_symlink, testdir_link, wait_for_symlink_check, wait_for_audit, testdir_target, testdir2
+ modify_symlink, testdir_link, wait_for_symlink_check, testdir_target, testdir2
 # noinspection PyUnresolvedReferences
 from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_before_yield, \
 extra_configuration_after_yield
 from wazuh_testing import logger
-from wazuh_testing.fim import (generate_params, create_file, REGULAR, callback_detect_event,
- check_time_travel, modify_file_content, LOG_FILE_PATH)
 from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test
 from wazuh_testing.tools.monitoring import FileMonitor
@@ -21,13 +21,13 @@
 # configurations
-conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'})
+conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'})
 configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata
 )
-wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
+wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH)
 # fixtures
@@ -46,20 +46,27 @@ def get_configuration(request): ]) def test_symbolic_change_target_inside_folder(tags_to_apply, previous_target, new_target, get_configuration, configure_environment, restart_syscheckd, wait_for_fim_start): - """ - Check if syscheck stops detecting events from previous target when pointing to a new folder + """Check if syscheck stops detecting events from previous target when pointing to a new folder - CHECK: Having a symbolic link pointing to a file/folder, change its target to another file/folder inside a monitored + Having a symbolic link pointing to a file/folder, change its target to another file/folder inside a monitored folder. After symlink_checker runs check that no events for the previous target file are detected while events for the new target are still being raised. - Parameters - ---------- - previous_target : str - Previous symlink target (path) - new_target : str - New symlink target (path). + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + previous_target (str): Previous symlink target. + new_target (str): New symlink target (path). + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If a expected event wasn't triggered. + AttributeError: If a unexpected event was captured. + ValueError: If the event's type and path are not the expected. """ + check_apply_test(tags_to_apply, get_configuration['tags']) scheduled = get_configuration['metadata']['fim_mode'] == 'scheduled' whodata = get_configuration['metadata']['fim_mode'] == 'whodata' 
@@ -68,27 +75,27 @@ def test_symbolic_change_target_inside_folder(tags_to_apply, previous_target, ne # Check create event if it's pointing to a directory if tags_to_apply == {'monitored_dir'}: - create_file(REGULAR, previous_target, file1, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.create_file(fim.REGULAR, previous_target, file1, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event') # Change the target to another file and wait the symcheck to update the link information modify_symlink(new_target, os.path.join(testdir_link, symlink)) wait_for_symlink_check(wazuh_log_monitor) - wait_for_audit(whodata, wazuh_log_monitor) + fim.wait_for_audit(whodata, wazuh_log_monitor) # Modify the content of the previous target and don't expect events. 
Modify the new target and expect an event - modify_file_content(previous_target, file1, new_content='Sample modification') - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.modify_file_content(previous_target, file1, new_content='Sample modification') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') - modify_file_content(testdir2, file1, new_content='Sample modification') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - modify = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.modify_file_content(testdir2, file1, new_content='Sample modification') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + modify = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected ' '"Sending FIM event: ..." event').result() assert 'modified' in modify['data']['type'] and os.path.join(testdir2, file1) in modify['data']['path'], \ diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.py index a30ee8adb1..58cf64ee6e 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_change_target_with_nested_directory.py 
@@ -4,14 +4,14 @@ import os
 import pytest
+import wazuh_testing.fim as fim
+
 from test_fim.test_files.test_follow_symbolic_link.common import configurations_path, testdir1, \
- modify_symlink, testdir_link, wait_for_symlink_check, wait_for_audit, testdir2
+ modify_symlink, testdir_link, wait_for_symlink_check, testdir2
 # noinspection PyUnresolvedReferences
 from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_before_yield, \
 extra_configuration_after_yield
 from wazuh_testing import logger, global_parameters
-from wazuh_testing.fim import (generate_params, create_file, REGULAR, callback_detect_event,
- check_time_travel, LOG_FILE_PATH)
 from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test
 from wazuh_testing.tools.monitoring import FileMonitor
@@ -21,14 +21,14 @@
 # configurations
-conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'},
+conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'},
 modes=['scheduled'])
 configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata
 )
-wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
+wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH)
 # fixtures
@@ -46,18 +46,24 @@ def get_configuration(request): ]) def test_symbolic_change_target_inside_folder(tags_to_apply, previous_target, new_target, get_configuration, configure_environment, restart_syscheckd, wait_for_fim_start): - """ - Check if syscheck stops detecting events from previous target when pointing to a new folder + """Check if syscheck stops detecting events from previous target when pointing to a new folder - CHECK: Having a symbolic link pointing to a folder which contains another monitored directory. Changing the target + Having a symbolic link pointing to a folder which contains another monitored directory. Changing the target should not trigger 'added' events for the monitored subdirectory on the next scan. - Parameters - ---------- - previous_target : str - Previous symlink target (path) - new_target : str - New symlink target (path). + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + previous_target (str): Previous symlink target (path) + new_target (str): New symlink target (path). + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If a expected event wasn't triggered. + AttributeError: If a unexpected event was captured. + """ check_apply_test(tags_to_apply, get_configuration['tags']) scheduled = get_configuration['metadata']['fim_mode'] == 'scheduled' 
@@ -66,19 +72,19 @@ def test_symbolic_change_target_inside_folder(tags_to_apply, previous_target, ne symlink = 'symlink3' # Check create event - create_file(REGULAR, previous_target, file1, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event, + fim.create_file(fim.REGULAR, previous_target, file1, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event') # Change the target to another file and wait the symcheck to update the link information modify_symlink(new_target, os.path.join(testdir_link, symlink)) wait_for_symlink_check(wazuh_log_monitor) - wait_for_audit(whodata, wazuh_log_monitor) + fim.wait_for_audit(whodata, wazuh_log_monitor) # Verify that no events are generated - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_symlink.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_symlink.py index 617de4540f..ede91143b7 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_symlink.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_symlink.py 
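Review bot: The call `fim.wait_for_audit(whodata, wazuh_log_monitor)` is what gates the "no events" check that follows, but nothing in the hunk says why an audit wait belongs between the symlink check and the negative assertion, and the helper replaced here (the `wait_for_audit` previously imported from `common`, dropped in the import hunk above) now lives in `wazuh_testing.fim`, so its timeout and the log line it waits for are no longer visible in this test. A one-line comment would make the ordering intelligible: `# Under whodata the audit rules are reloaded when the link target changes; wait for that before asserting that no event is sent`. If the shared helper only waits for the reload message and does not report which rules were loaded, this test presumably cannot distinguish a reload that picked up the new target from one that did not — worth confirming that is the intended level of coverage. `whodata` is assigned before the hunk begins.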
@@ -4,14 +4,14 @@ import os import pytest +import wazuh_testing.fim as fim + from test_fim.test_files.test_follow_symbolic_link.common import configurations_path, testdir1, \ testdir_link, wait_for_symlink_check, testdir_target, testdir_not_target, delete_f # noinspection PyUnresolvedReferences from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_before_yield, \ extra_configuration_after_yield from wazuh_testing import logger -from wazuh_testing.fim import (generate_params, create_file, REGULAR, SYMLINK, callback_detect_event, - check_time_travel, modify_file_content, LOG_FILE_PATH) from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -21,13 +21,13 @@ # configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata ) -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # fixtures 
@@ -46,45 +46,55 @@ def get_configuration(request): ]) def test_symbolic_delete_symlink(tags_to_apply, main_folder, aux_folder, get_configuration, configure_environment, restart_syscheckd, wait_for_fim_start): - """ - Check if syscheck stops detecting events when deleting the monitored symlink. + """Check if syscheck stops detecting events when deleting the monitored symlink. - CHECK: Having a symbolic link pointing to a file/folder, remove that symbolic link file, wait for the symlink + Having a symbolic link pointing to a file/folder, remove that symbolic link file, wait for the symlink checker runs and modify the target file. No events should be detected. Restore the symbolic link and modify the target file again once symlink checker runs. Events should be detected now. - Parameters - ---------- - main_folder : str - Directory that is being pointed at or contains the pointed file. - aux_folder : str - Directory that will be pointed at or will contain the future pointed file. + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + main_folder (str): Directory that is being pointed at or contains the pointed file. + aux_folder (str): Directory that will be pointed at or will contain the future pointed file. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If a expected event wasn't triggered. + AttributeError: If a unexpected event was captured. + ValueError: If the event's type and path are not the expected. 
""" check_apply_test(tags_to_apply, get_configuration['tags']) + scheduled = get_configuration['metadata']['fim_mode'] == 'scheduled' file1 = 'regular1' if tags_to_apply == {'monitored_dir'}: - create_file(REGULAR, main_folder, file1, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.create_file(fim.REGULAR, main_folder, file1, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event') # Remove symlink and don't expect events symlink = 'symlink' if tags_to_apply == {'monitored_file'} else 'symlink2' delete_f(testdir_link, symlink) wait_for_symlink_check(wazuh_log_monitor) - modify_file_content(main_folder, file1, new_content='Sample modification') - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.modify_file_content(main_folder, file1, new_content='Sample modification') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') # Restore symlink and modify the target again. 
Expect events now - create_file(SYMLINK, testdir_link, symlink, target=os.path.join(main_folder, file1)) + fim.create_file(fim.SYMLINK, testdir_link, symlink, target=os.path.join(main_folder, file1)) wait_for_symlink_check(wazuh_log_monitor) - modify_file_content(main_folder, file1, new_content='Sample modification 2') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - modify = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event).result() + # Wait unitl the audit rule of the link's target is loaded again + fim.wait_for_audit(get_configuration['metadata']['fim_mode'] == "whodata", wazuh_log_monitor) + + fim.modify_file_content(main_folder, file1, new_content='Sample modification 2') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + modify = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event).result() assert 'modified' in modify['data']['type'] and file1 in modify['data']['path'], \ f"'modified' event not matching for {file1}" diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_target.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_target.py index ad4596f4ee..4e35ab88be 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_target.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_delete_target.py 
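Review bot: The new `Raises:` entry `ValueError: If the event's type and path are not the expected.` does not match the code — the type/path check at the end of the hunk is a plain `assert`, which raises `AssertionError`; the entry should read `AssertionError: If the event's type and path are not the expected.`. The whodata wait added near the end inlines `get_configuration['metadata']['fim_mode'] == "whodata"` while the hunk introduces `scheduled` as a local a few lines up; a matching `whodata = get_configuration['metadata']['fim_mode'] == 'whodata'` next to it keeps both modes symmetric and mirrors the sibling tests that pass a `whodata` variable. The new comment also has a typo: `# Wait until the audit rule of the link's target is loaded again`.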
@@ -5,15 +5,14 @@ import os import pytest +import wazuh_testing.fim as fim + from test_fim.test_files.test_follow_symbolic_link.common import configurations_path, testdir1, \ - wait_for_symlink_check, wait_for_audit, testdir_target, testdir_not_target, delete_f + wait_for_symlink_check, testdir_target, testdir_not_target, delete_f # noinspection PyUnresolvedReferences from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_before_yield, \ extra_configuration_after_yield from wazuh_testing import logger -from wazuh_testing.fim import generate_params, create_file, REGULAR, callback_detect_event, \ - callback_audit_removed_rule, callback_audit_reloaded_rule, callback_audit_reloading_rules, check_time_travel, \ - modify_file_content, LOG_FILE_PATH from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -23,13 +22,13 @@ # configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata ) -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # fixtures 
@@ -48,21 +47,27 @@ def get_configuration(request): ]) def test_symbolic_delete_target(tags_to_apply, main_folder, aux_folder, get_configuration, configure_environment, restart_syscheckd, wait_for_fim_start): - """ - Check if syscheck detects events properly when removing a target, have the symlink updated and + """Check if syscheck detects events properly when removing a target, have the symlink updated and then recreating the target - CHECK: Having a symbolic link pointing to a file/folder, remove that file/folder and check that deleted event is + Having a symbolic link pointing to a file/folder, remove that file/folder and check that deleted event is detected. Once symlink_checker runs create the same file. No events should be raised. Wait again for symlink_checker run and modify the file. Modification event must be detected this time. - Parameters - ---------- - main_folder : str - Directory that is being pointed at or contains the pointed file. - aux_folder : str - Directory that will be pointed at or will contain the future pointed file. + Args: + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + main_folder (str): Directory that is being pointed at or contains the pointed file. + aux_folder (str): Directory that will be pointed at or will contain the future pointed file. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If a expected event wasn't triggered. + AttributeError: If a unexpected event was captured. + ValueError: If the event's type and path are not the expected. """ check_apply_test(tags_to_apply, get_configuration['tags']) scheduled = get_configuration['metadata']['fim_mode'] == 'scheduled' 
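Review bot: `ValueError: If the event's type and path are not the expected.` in the new Raises section is not what `test_symbolic_delete_target` raises; its type/path checks in the next hunk are `assert` statements, so the entry should be `AssertionError: If the event's type and path are not the expected.`. As in the other docstrings of this commit, `a expected` / `a unexpected` should read `an`. The function body continues past the end of this hunk.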
@@ -73,58 +78,58 @@ def test_symbolic_delete_target(tags_to_apply, main_folder, aux_folder, get_conf # If symlink is pointing to a directory, we need to add files and expect their 'added' event (only if the file # is being created withing the pointed directory. Then, delete the pointed file or directory if tags_to_apply == {'monitored_dir'}: - create_file(REGULAR, main_folder, file1, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.create_file(fim.REGULAR, main_folder, file1, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event') delete_f(main_folder) else: delete_f(main_folder, file1) - check_time_travel(scheduled, monitor=wazuh_log_monitor) - delete = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + delete = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." 
event').result() assert 'deleted' in delete['data']['type'] and file1 in delete['data']['path'], \ f"'deleted' event not matching for {file1}" if tags_to_apply == {'monitored_dir'} and whodata: - os.makedirs(main_folder, exist_ok=True, mode=0o777) - wazuh_log_monitor.start(timeout=3, callback=callback_audit_removed_rule, + wazuh_log_monitor.start(timeout=3, callback=fim.callback_audit_removed_rule, error_message='Did not receive expected "Monitored directory \'{main_folder}\' was' 'removed: Audit rule removed') - wazuh_log_monitor.start(timeout=RELOAD_RULES_INTERVAL, callback=callback_audit_reloading_rules, + os.makedirs(main_folder, exist_ok=True, mode=0o777) + wazuh_log_monitor.start(timeout=RELOAD_RULES_INTERVAL, callback=fim.callback_audit_reloading_rules, error_message='Did not receive expected "Reloading Audit rules" event') - wazuh_log_monitor.start(timeout=RELOAD_RULES_INTERVAL, callback=callback_audit_reloaded_rule, - error_message='Did not receive expected "Reloaded audit rule for monitoring directory: ' - '\'{main_folder}\'" event') + wazuh_log_monitor.start(timeout=RELOAD_RULES_INTERVAL, callback=fim.callback_audit_added_rule, + error_message='Did not receive expected "Added audit rule... ' + '\'{main_folder}\'" event') else: # If syscheck is monitoring with whodata, wait for audit to reload rules - wait_for_audit(whodata, wazuh_log_monitor) + fim.wait_for_audit(whodata, wazuh_log_monitor) wait_for_symlink_check(wazuh_log_monitor) # Restore the target - create_file(REGULAR, main_folder, file1, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.create_file(fim.REGULAR, main_folder, file1, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) if tags_to_apply == {'monitored_dir'} and whodata: - wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." 
event') else: # We don't expect any event since symlink hasn't updated the link information with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event) logger.error('A "Sending FIM event: ..." event has been detected. No event should be detected as symlink ' 'has not updated the link information yet.') logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') wait_for_symlink_check(wazuh_log_monitor) - wait_for_audit(whodata, wazuh_log_monitor) + fim.wait_for_audit(whodata, wazuh_log_monitor) # Modify the files and expect events since symcheck has updated now - modify_file_content(main_folder, file1, 'Sample modification') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - modify = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.modify_file_content(main_folder, file1, 'Sample modification') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + modify = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." event').result() assert 'modified' in modify['data']['type'] and file1 in modify['data']['path'], \ f"'modified' event not matching for {file1}" diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_follow_symbolic_disabled.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_follow_symbolic_disabled.py index 6890d42449..d6ccfd9954 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_follow_symbolic_disabled.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_follow_symbolic_disabled.py 
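Review bot: The new `error_message` for the added-rule wait is not an f-string, so on timeout the report shows the literal text `{main_folder}` rather than the path; the fix is `error_message=f'Did not receive expected "Added audit rule... ' f'\'{main_folder}\'" event')`. The unchanged removed-rule message just above it has the same missing prefix and additionally concatenates `'... was' 'removed: ...'` into `wasremoved`, so both messages are worth fixing while this block is being touched. Moving `os.makedirs(main_folder, ...)` to after the removed-rule wait looks like the right order — recreating the directory before FIM has reported the removal presumably lets the reload race the check — and a one-line comment stating that (`# Recreate the directory only after FIM has seen the rule removal, so the reload below is for the new directory`) would stop a later cleanup from reordering it. `RELOAD_RULES_INTERVAL`, `whodata` and `file1` are defined outside the hunk.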
@@ -5,14 +5,12 @@ import os import pytest +import wazuh_testing.fim as fim from test_fim.test_files.test_follow_symbolic_link.common import testdir_target, testdir1 # noinspection PyUnresolvedReferences from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_before_yield, \ extra_configuration_after_yield from wazuh_testing import logger -from wazuh_testing.fim import (LOG_FILE_PATH, - generate_params, create_file, REGULAR, callback_detect_event, - modify_file, delete_file, check_time_travel) from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -24,11 +22,11 @@ test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml') -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'no'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'no'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata 
@@ -56,10 +54,16 @@ def test_follow_symbolic_disabled(path, tags_to_apply, get_configuration, config Ensure that the monitored symbolic link is considered a regular file and it will not follow its target path. It will only generate events if it changes somehow, not its target (file or directory) - Parameters - ---------- - path : str - Path of the target file or directory + Args: + path (str): Path of the target file or directory + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If a expected event wasn't triggered. + AttributeError: If a unexpected event was captured. """ check_apply_test(tags_to_apply, get_configuration['tags']) scheduled = get_configuration['metadata']['fim_mode'] == 'scheduled' 
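Review bot: The new Args section omits `tags_to_apply`, the second parameter in `test_follow_symbolic_disabled(path, tags_to_apply, ...)` and the one consumed by `check_apply_test` at the bottom of the hunk; add `tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise.` as the other rewritten docstrings do. `If a expected event` / `If a unexpected event` should read `an`. The function body continues in the next hunk.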
@@ -68,25 +72,25 @@ def test_follow_symbolic_disabled(path, tags_to_apply, get_configuration, config # If the symlink targets to a directory, create a file in it and ensure no event is raised. if tags_to_apply == {'monitored_dir'}: - create_file(REGULAR, path, regular_file) - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.create_file(fim.REGULAR, path, regular_file) + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - wazuh_log_monitor.start(timeout=5, callback=callback_detect_event) + wazuh_log_monitor.start(timeout=5, callback=fim.callback_detect_event) logger.error(error_msg) raise AttributeError(error_msg) # Modify the target file and don't expect any events - modify_file(path, regular_file, new_content='Modify sample') - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.modify_file(path, regular_file, new_content='Modify sample') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - wazuh_log_monitor.start(timeout=5, callback=callback_detect_event) + wazuh_log_monitor.start(timeout=5, callback=fim.callback_detect_event) logger.error(error_msg) raise AttributeError(error_msg) # Delete the target file and don't expect any events - delete_file(path, regular_file) - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.delete_file(path, regular_file) + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - wazuh_log_monitor.start(timeout=5, callback=callback_detect_event) + wazuh_log_monitor.start(timeout=5, callback=fim.callback_detect_event) logger.error(error_msg) raise AttributeError(error_msg) diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_monitor_symlink.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_monitor_symlink.py index 70807d7e61..a2b291fd67 100644 
--- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_monitor_symlink.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_monitor_symlink.py @@ -3,14 +3,13 @@ # This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 import pytest +import wazuh_testing.fim as fim + from test_fim.test_files.test_follow_symbolic_link.common import configurations_path, testdir1, \ testdir_target, delete_f # noinspection PyUnresolvedReferences from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_before_yield, \ extra_configuration_after_yield - -from wazuh_testing.fim import (generate_params, create_file, REGULAR, callback_detect_event, - check_time_travel, modify_file_content, LOG_FILE_PATH) from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -20,13 +19,13 @@ # configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata ) -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # fixtures 
@@ -45,16 +44,21 @@ def get_configuration(request): ]) def test_symbolic_monitor_symlink(tags_to_apply, main_folder, get_configuration, configure_environment, restart_syscheckd, wait_for_fim_start): - """ - Check what happens with a symlink and its target when syscheck monitors it. + """Check what happens with a symlink and its target when syscheck monitors it. CHECK: Having a symbolic link pointing to a file/folder, modify and delete the file. Check that alerts are being raised. - Parameters - ---------- - main_folder : str - Directory that is being pointed at or contains the pointed file. + Args: + main_folder (str): Directory that is being pointed at or contains the pointed file. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If a expected event wasn't triggered. + ValueError: If the event's type and path are not the expected. """ check_apply_test(tags_to_apply, get_configuration['tags']) scheduled = get_configuration['metadata']['fim_mode'] == 'scheduled' 
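Review bot: The new Args section omits `tags_to_apply`, the first parameter of `test_symbolic_monitor_symlink` and the one passed to `check_apply_test` at the end of the hunk; add `tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise.`. The `ValueError` entry under Raises does not match the `assert` statements the function uses for its type/path checks, which raise `AssertionError`. The `CHECK:` prefix on the second paragraph was dropped from the other docstrings in this commit and could be dropped here for consistency. The function body continues in the next hunk.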
@@ -62,26 +66,26 @@ def test_symbolic_monitor_symlink(tags_to_apply, main_folder, get_configuration, # Add creation if symlink is pointing to a folder if tags_to_apply == {'monitored_dir'}: - create_file(REGULAR, main_folder, file1, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - add = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event).result() + fim.create_file(fim.REGULAR, main_folder, file1, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + add = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event).result() assert 'added' in add['data']['type'] and file1 in add['data']['path'], \ - f"'added' event not matching" + "'added' event not matching" # Modify the linked file and expect an event - modify_file_content(main_folder, file1, 'Sample modification') - check_time_travel(scheduled, monitor=wazuh_log_monitor) - modify = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.modify_file_content(main_folder, file1, 'Sample modification') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + modify = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected ' '"Sending FIM event: ..." event').result() assert 'modified' in modify['data']['type'] and file1 in modify['data']['path'], \ - f"'modified' event not matching" + "'modified' event not matching" # Delete the linked file and expect an event delete_f(main_folder, file1) - check_time_travel(scheduled, monitor=wazuh_log_monitor) - delete = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event, + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + delete = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event, error_message='Did not receive expected ' '"Sending FIM event: ..." 
event').result() assert 'deleted' in delete['data']['type'] and file1 in delete['data']['path'], \ - f"'deleted' event not matching" + "'deleted' event not matching" diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_not_following_symbolic_link.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_not_following_symbolic_link.py index 015059a73a..8d77f90611 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_not_following_symbolic_link.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_not_following_symbolic_link.py @@ -5,11 +5,10 @@ import os import pytest +import wazuh_testing.fim as fim + from test_fim.test_files.test_follow_symbolic_link.common import modify_symlink from wazuh_testing import global_parameters, logger -from wazuh_testing.fim import (LOG_FILE_PATH, - generate_params, create_file, REGULAR, SYMLINK, callback_detect_event, - modify_file, delete_file, check_time_travel) from wazuh_testing.tools import PREFIX from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor @@ -26,11 +25,11 @@ os.path.join(PREFIX, 'testdir2')] testdir_link, testdir1, testdir2 = test_directories -wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) +wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH) # configurations -conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}) +conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}) configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata 
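Review bot: The `added` wait at the top of the hunk, `add = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event).result()`, is the only one of the three waits that passes no `error_message`, so a timeout there fails with a bare TimeoutError while the modify and delete waits say what was expected; pass the same `error_message='Did not receive expected "Sending FIM event: ..." event'`. Dropping the unused `f` prefix from the three assertion messages is correct, but the messages would be more useful if they named the file, e.g. `f"'added' event not matching for {file1}"`, which is what test_delete_target does. `file1` and `scheduled` are assigned before the hunk begins.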
@@ -54,21 +53,27 @@ def get_configuration(request): def test_symbolic_monitor_directory_with_symlink(monitored_dir, non_monitored_dir1, non_monitored_dir2, sym_target, tags_to_apply, get_configuration, configure_environment, restart_syscheckd, wait_for_fim_start): - """ - Check what happens with a symlink and its target when syscheck monitors a directory with a symlink + """Check what happens with a symlink and its target when syscheck monitors a directory with a symlink and not the symlink itself. When this happens, the symbolic link is considered a regular file and it will not follow its target path. It will only generate events if it changes somehow, not its target (file or directory) - Parameters - ---------- - monitored_dir : str - Monitored directory. - non_monitored_dir1 : str - Non-monitored directory. - non_monitored_dir2 : str - Non-monitored directory. + + Args: + monitored_dir (str): Monitored directory. + non_monitored_dir1 (str): Non-monitored directory. + non_monitored_dir2 (str): Non-monitored directory. + tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise. + get_configuration (fixture): Gets the current configuration of the test. + configure_environment (fixture): Configure the environment for the execution of the test. + restart_syscheckd (fixture): Restarts syscheck. + wait_for_fim_start (fixture): Waits until the first FIM scan is completed. + + Raises: + TimeoutError: If a expected event wasn't triggered. + AttributeError: If a unexpected event was captured. + ValueError: If the event's type and path are not the expected. """ check_apply_test(tags_to_apply, get_configuration['tags']) name1 = f'{sym_target}regular1' 
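Review bot: The new Args section omits `sym_target`, the fourth parameter in `test_symbolic_monitor_directory_with_symlink(monitored_dir, non_monitored_dir1, non_monitored_dir2, sym_target, tags_to_apply, ...)` and the value used at the end of the hunk to build `name1`; add an entry for it, described from the `sym_target == 'file'` check in the next hunk (e.g. `sym_target (str): 'file' when the symlink must point to a file; otherwise it points to a directory.`). The `ValueError` entry under Raises does not match the `assert` statements used for the type checks, which raise `AssertionError`, and `a expected` / `a unexpected` should read `an`. The function body continues in the next hunk.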
@@ -80,35 +85,35 @@ def test_symbolic_monitor_directory_with_symlink(monitored_dir, non_monitored_di scheduled = get_configuration['metadata']['fim_mode'] == 'scheduled' # Create regular files out of the monitored directory and don't expect its event - create_file(REGULAR, non_monitored_dir1, name1, content='') - create_file(REGULAR, non_monitored_dir1, name2, content='') + fim.create_file(fim.REGULAR, non_monitored_dir1, name1, content='') + fim.create_file(fim.REGULAR, non_monitored_dir1, name2, content='') target = a_path if sym_target == 'file' else non_monitored_dir1 - create_file(SYMLINK, monitored_dir, sl_name, target=target) + fim.create_file(fim.SYMLINK, monitored_dir, sl_name, target=target) # Create the syslink and expect its event, since it's withing the monitored directory - check_time_travel(scheduled, monitor=wazuh_log_monitor) - wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event, + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event, error_message='Did not receive expected "Sending FIM event: ..." 
event') # Modify the target file and don't expect any event - modify_file(non_monitored_dir1, name1, new_content='Modify sample') - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.modify_file(non_monitored_dir1, name1, new_content='Modify sample') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=5, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=5, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') # Modify the target of the symlink and expect the modify event modify_symlink(target=b_path, path=sl_path) - check_time_travel(scheduled, monitor=wazuh_log_monitor) - result = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event, + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) + result = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event, error_message='Did not receive expected ' '"Sending FIM event: ..." event').result() if 'modified' in result['data']['type']: logger.info("Received modified event. No more events will be expected.") elif 'deleted' in result['data']['type']: logger.info("Received deleted event. Now an added event will be expected.") - result = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event, + result = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event, error_message='Did not receive expected ' '"Sending FIM event: ..." event').result() assert 'added' in result['data']['type'], f"The event {result} should be of type 'added'" 
@@ -116,10 +121,10 @@ def test_symbolic_monitor_directory_with_symlink(monitored_dir, non_monitored_di assert False, f"Detected event {result} should be of type 'modified' or 'deleted'" # Remove and restore the target file. Don't expect any events - delete_file(b_path, name2) - create_file(REGULAR, non_monitored_dir1, name2, content='') - check_time_travel(scheduled, monitor=wazuh_log_monitor) + fim.delete_file(b_path, name2) + fim.create_file(fim.REGULAR, non_monitored_dir1, name2, content='') + fim.check_time_travel(scheduled, monitor=wazuh_log_monitor) with pytest.raises(TimeoutError): - event = wazuh_log_monitor.start(timeout=5, callback=callback_detect_event) + event = wazuh_log_monitor.start(timeout=5, callback=fim.callback_detect_event) logger.error(f'Unexpected event {event.result()}') raise AttributeError(f'Unexpected event {event.result()}') diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_revert_symlink.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_revert_symlink.py index 11c6cceb02..e0ace73547 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_revert_symlink.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_revert_symlink.py 
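Review bot: The fallback branch `assert False, f"Detected event {result} should be of type 'modified' or 'deleted'"` (an unchanged line at the start of the hunk) is the idiomatic place for `pytest.fail(f"Detected event {result} should be of type 'modified' or 'deleted'")`, which states the intent directly and does not depend on assertion handling. The negative checks in this hunk use a hard-coded `timeout=5` while the positive waits use `global_parameters.default_timeout`; if the shorter value is deliberate, a comment or a single named constant would make that clear. `b_path`, `name2` and `scheduled` are assigned before the hunk begins.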
@@ -4,14 +4,14 @@ import os
 import pytest
+import wazuh_testing.fim as fim
+
 from test_fim.test_files.test_follow_symbolic_link.common import configurations_path, testdir1, \
- modify_symlink, testdir_link, wait_for_symlink_check, wait_for_audit
+ modify_symlink, testdir_link, wait_for_symlink_check
 # noinspection PyUnresolvedReferences
 from test_fim.test_files.test_follow_symbolic_link.common import test_directories, extra_configuration_before_yield, \
 extra_configuration_after_yield
 from wazuh_testing import logger
-from wazuh_testing.fim import (generate_params, callback_detect_event,
- check_time_travel, modify_file_content, LOG_FILE_PATH)
 from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test
 from wazuh_testing.tools.monitoring import FileMonitor
@@ -19,11 +19,11 @@ pytestmark = [pytest.mark.linux, pytest.mark.sunos5, pytest.mark.darwin, pytest.mark.tier(level=1)]
-wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
+wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH)
 # configurations
-conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'})
+conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'})
 configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata
@@ -45,18 +45,29 @@ def get_configuration(request):
 ])
 def test_symbolic_revert_symlink(tags_to_apply, get_configuration, configure_environment, restart_syscheckd, wait_for_fim_start):
- """
- Check if syscheck detects new targets properly
+ """Check if syscheck detects new targets properly
- CHECK: Having a symbolic link pointing to a file/folder, change its target to a folder. Check that the old file
+ Having a symbolic link pointing to a file/folder, change its target to a folder. Check that the old file
 is not being monitored anymore and the new folder is. Revert the target change and ensure the file is being monitored and the folder is not.
+
+ Args:
+ tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise.
+ get_configuration (fixture): Gets the current configuration of the test.
+ configure_environment (fixture): Configure the environment for the execution of the test.
+ restart_syscheckd (fixture): Restarts syscheck.
+ wait_for_fim_start (fixture): Waits until the first FIM scan is completed.
+
+ Raises:
+ TimeoutError: If a expected event wasn't triggered.
+ AttributeError: If a unexpected event was captured.
+ ValueError: If the event's type and path are not the expected.
 """
 def modify_and_assert(file):
- modify_file_content(testdir1, file, new_content='Sample modification')
- check_time_travel(scheduled, monitor=wazuh_log_monitor)
- ev = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event).result()
+ fim.modify_file_content(testdir1, file, new_content='Sample modification')
+ fim.check_time_travel(scheduled, monitor=wazuh_log_monitor)
+ ev = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event).result()
 assert 'modified' in ev['data']['type'] and os.path.join(testdir1, file) in ev['data']['path'], \
 f"'modified' event not matching for {testdir1} {file}"
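Review bot: The new docstring's `Raises:` section lists `ValueError` for a mismatched event type or path, but the only such check in this hunk is the bare `assert` in `modify_and_assert`, which raises `AssertionError`; either change the entry to `AssertionError: If the event's type and path are not the expected.` or make the helper raise `ValueError` explicitly. The same `ValueError` entry is written into the new docstrings of test_symlink_dir_inside_monitored_dir, test_symlink_to_dir_between_scans and test_symlink_within_dir, whose checks are outside their hunks, so each should be verified against what actually raises. The article slips `a expected` / `a unexpected` in the Raises lines recur in every converted docstring and should read `an expected` / `an unexpected`. The body of test_symbolic_revert_symlink continues outside this hunk.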
@@ -67,26 +78,29 @@ def modify_and_assert(file):
 file2 = 'regular2'
 # Don't expect an event since it is not being monitored yet
- modify_file_content(testdir1, file2, new_content='Sample modification')
- check_time_travel(scheduled, monitor=wazuh_log_monitor)
+ fim.modify_file_content(testdir1, file2, new_content='Sample modification')
+ fim.check_time_travel(scheduled, monitor=wazuh_log_monitor)
 with pytest.raises(TimeoutError):
- event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event)
+ event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event)
 logger.error(f'Unexpected event {event.result()}')
 raise AttributeError(f'Unexpected event {event.result()}')
 # Change the target to the folder and now expect an event
 modify_symlink(testdir1, os.path.join(testdir_link, 'symlink'))
 wait_for_symlink_check(wazuh_log_monitor)
- wait_for_audit(whodata, wazuh_log_monitor)
+ fim.wait_for_audit(whodata, wazuh_log_monitor)
 modify_and_assert(file2)
 # Modify symlink target, wait for sym_check to update it
 modify_symlink(os.path.join(testdir1, file1), os.path.join(testdir_link, 'symlink'))
 wait_for_symlink_check(wazuh_log_monitor)
- modify_file_content(testdir1, file2, new_content='Sample modification2')
- check_time_travel(scheduled, monitor=wazuh_log_monitor)
+ # Wait for audit to reload the rules
+ fim.wait_for_audit(whodata, wazuh_log_monitor)
+
+ fim.modify_file_content(testdir1, file2, new_content='Sample modification2')
+ fim.check_time_travel(scheduled, monitor=wazuh_log_monitor)
 with pytest.raises(TimeoutError):
- event = wazuh_log_monitor.start(timeout=3, callback=callback_detect_event)
+ event = wazuh_log_monitor.start(timeout=3, callback=fim.callback_detect_event)
 logger.error(f'Unexpected event {event.result()}')
 raise AttributeError(f'Unexpected event {event.result()}')
 modify_and_assert(file1)
diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_and_dir.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_and_dir.py
index b90c23c77c..3a03646a35 100644
--- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_and_dir.py
+++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_and_dir.py
@@ -5,12 +5,12 @@ import os
 import pytest
-from test_fim.test_files.test_follow_symbolic_link.common import wait_for_symlink_check, wait_for_audit, \
+import wazuh_testing.fim as fim
+
+from test_fim.test_files.test_follow_symbolic_link.common import wait_for_symlink_check, \
 symlink_interval, \
 modify_symlink
 from wazuh_testing import global_parameters, logger
-from wazuh_testing.fim import SYMLINK, REGULAR, LOG_FILE_PATH, generate_params, create_file, change_internal_options, \
- callback_detect_event, check_time_travel
 from wazuh_testing.tools import PREFIX
 from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test
 from wazuh_testing.tools.monitoring import FileMonitor
@@ -27,11 +27,11 @@ testdir_target = test_directories[1]
 test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')
 configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml')
-wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
+wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH)
 # Configurations
-conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'})
+conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'})
 configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata)
@@ -47,16 +47,16 @@ def get_configuration(request):
 def extra_configuration_before_yield():
 """Create files and symlinks"""
- create_file(REGULAR, testdir_target, 'regular1')
- create_file(SYMLINK, PREFIX, 'testdir_link', target=testdir)
+ fim.create_file(fim.REGULAR, testdir_target, 'regular1')
+ fim.create_file(fim.SYMLINK, PREFIX, 'testdir_link', target=testdir)
 # Set symlink_scan_interval to a given value
- change_internal_options(param='syscheck.symlink_scan_interval', value=symlink_interval)
+ fim.change_internal_options(param='syscheck.symlink_scan_interval', value=symlink_interval)
 def extra_configuration_after_yield():
 """Set symlink_scan_interval to default value and remove symbolic link"""
 os.remove(testdir_link)
- change_internal_options(param='syscheck.symlink_scan_interval', value=600)
+ fim.change_internal_options(param='syscheck.symlink_scan_interval', value=600)
 # Tests
@@ -66,15 +66,20 @@ def extra_configuration_after_yield():
 ])
 def test_symlink_dir_inside_monitored_dir(tags_to_apply, get_configuration, configure_environment, restart_syscheckd, wait_for_fim_start):
- """
- Monitor a directory and a symbolic link to it, change the target of the symbolic link.
+ """Monitor a directory and a symbolic link to it, change the target of the symbolic link.
 The directory must be scanned silently, preventing events from triggering until it has finished.
- Parameters
- ----------
- tags_to_apply : set
- Run test if matches with a configuration identifier, skip otherwise.
+ Args:
+ tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise.
+ get_configuration (fixture): Gets the current configuration of the test.
+ configure_environment (fixture): Configure the environment for the execution of the test.
+ restart_syscheckd (fixture): Restarts syscheck.
+ wait_for_fim_start (fixture): Waits until the first FIM scan is completed.
+
+ Raises:
+ TimeoutError: If a expected event wasn't triggered.
+ AttributeError: If a unexpected event was captured.
 """
 check_apply_test(tags_to_apply, get_configuration['tags'])
 scheduled = get_configuration['metadata']['fim_mode'] == 'scheduled'
@@ -85,20 +90,20 @@ def test_symlink_dir_inside_monitored_dir(tags_to_apply, get_configuration, conf
 # Wait for both audit and the symlink check to run
 wait_for_symlink_check(wazuh_log_monitor)
- wait_for_audit(whodata, wazuh_log_monitor)
+ fim.wait_for_audit(whodata, wazuh_log_monitor)
- check_time_travel(scheduled, monitor=wazuh_log_monitor)
+ fim.check_time_travel(scheduled, monitor=wazuh_log_monitor)
 with pytest.raises(TimeoutError):
- event = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event)
+ event = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event)
 logger.error(f'Unexpected event {event.result()}')
 raise AttributeError(f'Unexpected event {event.result()}')
 # Create a file in the pointed folder and expect events
- create_file(REGULAR, testdir_link, 'regular2')
+ fim.create_file(fim.REGULAR, testdir_link, 'regular2')
- check_time_travel(scheduled, monitor=wazuh_log_monitor)
+ fim.check_time_travel(scheduled, monitor=wazuh_log_monitor)
- wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event,
+ wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event,
 error_message='Did not receive expected ' '"Sending FIM event: ..." event')
diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_dir_inside_monitored_dir.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_dir_inside_monitored_dir.py
index dadec95787..22a1d8dcad 100644
--- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_dir_inside_monitored_dir.py
+++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_dir_inside_monitored_dir.py
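Review bot: The closing `wazuh_log_monitor.start(...)` in this hunk discards its result, so after creating `regular2` the test only proves that some FIM event was logged, not that it was an `added` event for the file under `testdir_link`; the positive case in test_revert_symlink's `modify_and_assert` checks both type and path. Capture and assert on it: `event = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event, error_message='Did not receive expected ' '"Sending FIM event: ..." event').result()` followed by `assert 'added' in event['data']['type'] and os.path.join(testdir_link, 'regular2') in event['data']['path']`. Whether the event reports the link path or the target path for a file reached through the link is not visible here, so confirm the path check against the actual log before adopting it. The enclosing function starts outside this hunk.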
@@ -5,9 +5,9 @@ import os
 import pytest
+import wazuh_testing.fim as fim
+
 from wazuh_testing import global_parameters
-from wazuh_testing.fim import SYMLINK, REGULAR, LOG_FILE_PATH, generate_params, create_file, \
- REQUIRED_ATTRIBUTES, CHECK_ALL, CHECK_SIZE, regular_file_cud
 from wazuh_testing.tools import PREFIX
 from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test
 from wazuh_testing.tools.monitoring import FileMonitor
@@ -24,11 +24,11 @@ testdir_target = os.path.join(testdir, 'testdir_target')
 test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')
 configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml')
-wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
+wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH)
 # Configurations
-conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'})
+conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'})
 configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata)
@@ -45,8 +45,8 @@ def get_configuration(request):
 def extra_configuration_before_yield():
 """Create files and symlinks"""
 os.makedirs(testdir_target, exist_ok=True, mode=0o777)
- create_file(REGULAR, testdir_target, 'regular1')
- create_file(SYMLINK, PREFIX, 'testdir_link', target=testdir_target)
+ fim.create_file(fim.REGULAR, testdir_target, 'regular1')
+ fim.create_file(fim.SYMLINK, PREFIX, 'testdir_link', target=testdir_target)
 def extra_configuration_after_yield():
@@ -57,28 +57,33 @@ def extra_configuration_after_yield():
 # Tests
 @pytest.mark.parametrize('tags_to_apply, checkers', [
- ({'symlink_dir_inside_monitored_dir'}, REQUIRED_ATTRIBUTES[CHECK_ALL] - {CHECK_SIZE}),
+ ({'symlink_dir_inside_monitored_dir'}, fim.REQUIRED_ATTRIBUTES[fim.CHECK_ALL] - {fim.CHECK_SIZE}),
 ])
 def test_symlink_dir_inside_monitored_dir(tags_to_apply, checkers, get_configuration, configure_environment, restart_syscheckd, wait_for_fim_start):
- """
- Monitor a directory within a directory monitored through a symbolic link with `follow_symbolic_link` enabled.
+ """Monitor a directory within a directory monitored through a symbolic link with `follow_symbolic_link` enabled.
 The monitored directory configuration should prevail over the configuration of the symbolic link (checks, follow_symbolic_link, etc...)
- Parameters
- ----------
- tags_to_apply : set
- Run test if matches with a configuration identifier, skip otherwise.
- checkers : dict
- Check options to be used.
+ Args:
+ tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise.
+ checkers (dict): Check options to be used.
+ get_configuration (fixture): Gets the current configuration of the test.
+ configure_environment (fixture): Configure the environment for the execution of the test.
+ restart_syscheckd (fixture): Restarts syscheck.
+ wait_for_fim_start (fixture): Waits until the first FIM scan is completed.
+
+ Raises:
+ TimeoutError: If a expected event wasn't triggered.
+ AttributeError: If a unexpected event was captured.
+ ValueError: If the event's type and path are not the expected.
""" check_apply_test(tags_to_apply, get_configuration['tags']) scheduled = get_configuration['metadata']['fim_mode'] == 'scheduled' # Alerts from the pointed directory should have all checks except size - regular_file_cud(testdir_target, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, options=checkers, + fim.regular_file_cud(testdir_target, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, options=checkers, time_travel=scheduled) # Alerts from the main directory should have all checks - regular_file_cud(testdir, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, time_travel=scheduled) + fim.regular_file_cud(testdir, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, time_travel=scheduled) diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_to_dir_between_scans.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_to_dir_between_scans.py index 59957c15f0..31d45be56f 100644 --- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_to_dir_between_scans.py +++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_to_dir_between_scans.py @@ -6,11 +6,12 @@ from shutil import rmtree import pytest +import wazuh_testing.fim as fim + from test_fim.test_files.test_follow_symbolic_link.common import wait_for_symlink_check, symlink_interval, \ testdir_link, testdir_target from wazuh_testing import global_parameters -from wazuh_testing.fim import SYMLINK, REGULAR, LOG_FILE_PATH, generate_params, create_file, change_internal_options, \ - check_time_travel, callback_detect_event + from wazuh_testing.tools import PREFIX from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test from wazuh_testing.tools.monitoring import FileMonitor 
@@ -23,11 +24,11 @@ test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')
 configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml')
-wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
+wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH)
 # Configurations
-conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'}, modes=['scheduled'])
+conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'}, modes=['scheduled'])
 configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata)
@@ -46,17 +47,17 @@ def extra_configuration_before_yield():
 symlinkdir = testdir_link
 os.makedirs(testdir_target, exist_ok=True, mode=0o777)
- create_file(REGULAR, testdir_target, 'regular1')
- create_file(SYMLINK, PREFIX, symlinkdir, target=testdir_target)
+ fim.create_file(fim.REGULAR, testdir_target, 'regular1')
+ fim.create_file(fim.SYMLINK, PREFIX, symlinkdir, target=testdir_target)
 # Set symlink_scan_interval to a given value
- change_internal_options(param='syscheck.symlink_scan_interval', value=symlink_interval)
+ fim.change_internal_options(param='syscheck.symlink_scan_interval', value=symlink_interval)
 def extra_configuration_after_yield():
 """Set symlink_scan_interval to default value"""
 rmtree(testdir_link, ignore_errors=True)
 rmtree(testdir_target, ignore_errors=True)
- change_internal_options(param='syscheck.symlink_scan_interval', value=600)
+ fim.change_internal_options(param='syscheck.symlink_scan_interval', value=600)
 # Tests
@@ -66,16 +67,22 @@ def extra_configuration_after_yield():
 ])
 def test_symlink_to_dir_between_scans(tags_to_apply, get_configuration, configure_environment, restart_syscheckd, wait_for_fim_start):
- """
- Replace a link with a directory between scans.
+ """Replace a link with a directory between scans.
- This test monitors a link with `follow_symblic_link` enabled. After the first scan, it is replaced with a directory,
+ This test monitors a link with `follow_symbolic_link` enabled. After the first scan, it is replaced with a directory,
 the new directory should send alerts during a second scan.
- Parameters
- ----------
- tags_to_apply : set
- Run test if matches with a configuration identifier, skip otherwise.
+ Args:
+ tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise.
+ get_configuration (fixture): Gets the current configuration of the test.
+ configure_environment (fixture): Configure the environment for the execution of the test.
+ restart_syscheckd (fixture): Restarts syscheck.
+ wait_for_fim_start (fixture): Waits until the first FIM scan is completed.
+
+ Raises:
+ TimeoutError: If a expected event wasn't triggered.
+ AttributeError: If a unexpected event was captured.
+ ValueError: If the event's type and path are not the expected.
 """
 check_apply_test(tags_to_apply, get_configuration['tags'])
 scheduled = get_configuration['metadata']['fim_mode'] == 'scheduled'
@@ -84,13 +91,13 @@ def test_symlink_to_dir_between_scans(tags_to_apply, get_configuration, configur
 # Delete symbolic link and create a folder with the same name
 os.remove(testdir_link)
 os.makedirs(testdir_link, exist_ok=True, mode=0o777)
- create_file(REGULAR, testdir_link, regular2)
+ fim.create_file(fim.REGULAR, testdir_link, regular2)
 # Wait for both audit and the symlink check to run
 wait_for_symlink_check(wazuh_log_monitor)
- check_time_travel(scheduled, monitor=wazuh_log_monitor)
+ fim.check_time_travel(scheduled, monitor=wazuh_log_monitor)
- event = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=callback_detect_event,
+ event = wazuh_log_monitor.start(timeout=global_parameters.default_timeout, callback=fim.callback_detect_event,
 error_message='Did not receive expected ' '"Sending FIM event: ..." event').result()
diff --git a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_within_dir.py b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_within_dir.py
index 642a964f0d..b73dcbca9c 100644
--- a/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_within_dir.py
+++ b/tests/integration/test_fim/test_files/test_follow_symbolic_link/test_symlink_within_dir.py
@@ -6,9 +6,9 @@ from shutil import rmtree
 import pytest
+import wazuh_testing.fim as fim
+
 from wazuh_testing import global_parameters
-from wazuh_testing.fim import SYMLINK, REGULAR, LOG_FILE_PATH, generate_params, create_file, \
- REQUIRED_ATTRIBUTES, CHECK_ALL, CHECK_SIZE, regular_file_cud
 from wazuh_testing.tools import PREFIX
 from wazuh_testing.tools.configuration import load_wazuh_configurations, check_apply_test
 from wazuh_testing.tools.monitoring import FileMonitor
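Review bot: The context comment `# Wait for both audit and the symlink check to run` in this hunk no longer matches the code under it: only `wait_for_symlink_check` is called, no audit wait follows, and this module generates its configurations with `modes=['scheduled']`, so there is no whodata mode for an audit reload to matter. Update it to `# Wait for the symlink check to run` so a reader does not go looking for a missing `fim.wait_for_audit` call. The `event` captured on the last line of the hunk is consumed outside it, where test_symlink_to_dir_between_scans continues.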
@@ -25,11 +25,11 @@ testdir_target = os.path.join(PREFIX, 'testdir_target')
 test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')
 configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml')
-wazuh_log_monitor = FileMonitor(LOG_FILE_PATH)
+wazuh_log_monitor = FileMonitor(fim.LOG_FILE_PATH)
 # Configurations
-conf_params, conf_metadata = generate_params(extra_params={'FOLLOW_MODE': 'yes'})
+conf_params, conf_metadata = fim.generate_params(extra_params={'FOLLOW_MODE': 'yes'})
 configurations = load_wazuh_configurations(configurations_path, __name__, params=conf_params, metadata=conf_metadata)
@@ -46,8 +46,8 @@ def get_configuration(request):
 def extra_configuration_before_yield():
 """Create files and symlinks"""
 os.makedirs(testdir_target, exist_ok=True, mode=0o777)
- create_file(REGULAR, testdir_target, 'regular1')
- create_file(SYMLINK, testdir, 'testdir_link', target=testdir_target)
+ fim.create_file(fim.REGULAR, testdir_target, 'regular1')
+ fim.create_file(fim.SYMLINK, testdir, 'testdir_link', target=testdir_target)
 def extra_configuration_after_yield():
@@ -58,27 +58,33 @@ def extra_configuration_after_yield():
 # Tests
 @pytest.mark.parametrize('tags_to_apply, checkers', [
- ({'symlink_within_directory'}, REQUIRED_ATTRIBUTES[CHECK_ALL] - {CHECK_SIZE}),
+ ({'symlink_within_directory'}, fim.REQUIRED_ATTRIBUTES[fim.CHECK_ALL] - {fim.CHECK_SIZE}),
 ])
 def test_symlink_within_dir(tags_to_apply, checkers, get_configuration, configure_environment, restart_syscheckd, wait_for_fim_start):
- """
- Monitor a link within a monitored directory.
+ """Monitor a link within a monitored directory.
 The link configuration should prevail over the monitored directory (checks, follow_symbolic_link, etc...).
- Parameters
- ----------
- tags_to_apply : set
- Run test if matches with a configuration identifier, skip otherwise.
- checkers : dict
- Check options to be used.
+ Args:
+ tags_to_apply (set): Run test if matches with a configuration identifier, skip otherwise.
+ checkers (dict): Check options to be used.
+ get_configuration (fixture): Gets the current configuration of the test.
+ configure_environment (fixture): Configure the environment for the execution of the test.
+ restart_syscheckd (fixture): Restarts syscheck.
+ wait_for_fim_start (fixture): Waits until the first FIM scan is completed.
+
+ Raises:
+ TimeoutError: If a expected event wasn't triggered.
+ AttributeError: If a unexpected event was captured.
+ ValueError: If the event's type and path are not the expected.
""" check_apply_test(tags_to_apply, get_configuration['tags']) scheduled = get_configuration['metadata']['fim_mode'] == 'scheduled' # Alerts from the pointed directory should have all checks except size - regular_file_cud(testdir_target, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, options=checkers, - time_travel=scheduled) + fim.regular_file_cud(testdir_target, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, + options=checkers, time_travel=scheduled) # Alerts from the main directory should have all checks - regular_file_cud(testdir, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, time_travel=scheduled) + fim.regular_file_cud(testdir, wazuh_log_monitor, min_timeout=global_parameters.default_timeout, + time_travel=scheduled)