diff --git a/tests/tacacs/test_accounting.py b/tests/tacacs/test_accounting.py index a6d467faf1..3a2cef6df0 100644 --- a/tests/tacacs/test_accounting.py +++ b/tests/tacacs/test_accounting.py @@ -1,7 +1,6 @@ import logging import time from tests.common.devices.ptf import PTFHost -import binascii import pytest @@ -9,7 +8,7 @@ from .test_authorization import ssh_connect_remote, ssh_run_command, \ per_command_check_skip_versions, remove_all_tacacs_server -from .utils import stop_tacacs_server, start_tacacs_server +from .utils import stop_tacacs_server, start_tacacs_server, check_server_received from tests.common.errors import RunAnsibleModuleFail from tests.common.helpers.assertions import pytest_assert from tests.common.utilities import skip_release @@ -123,40 +122,6 @@ def check_local_no_other_user_log(duthost, tacacs_creds): pytest_assert(len(logs) == 0, "Expected to find no accounting logs but found: {}".format(logs)) -def check_server_received(ptfhost, data): - """ - Check if tacacs server received the data. - """ - hex = binascii.hexlify(data.encode('ascii')) - hex_string = hex.decode() - - """ - Extract received data from tac_plus.log, then use grep to check if the received data contains hex_string: - 1. tac_plus server start with '-d 2058' parameter to log received data in following format in tac_plus.log: - Thu Mar 9 06:26:16 2023 [75483]: data[140] = 0xf8, xor'ed with hash[12] = 0xab -> 0x53 - Thu Mar 9 06:26:16 2023 [75483]: data[141] = 0x8d, xor'ed with hash[13] = 0xc2 -> 0x4f - In above log, the 'data[140] = 0xf8' is received data. - - 2. Following sed command will extract the received data from tac_plus.log: - sed -n 's/.*-> 0x\(..\).*/\\1/p' /var/log/tac_plus.log # noqa W605 - - 3. Following set command will join all received data to hex string: - sed ':a; N; $!ba; s/\\n//g' - - 4. Then the grep command will check if the received hex data containes expected hex string. - grep '{0}'".format(hex_string) - - Also suppress following Flake8 error/warning: - W605 : Invalid escape sequence. Flake8 can't handle sed command escape sequence, so will report false alert. - E501 : Line too long. Following sed command difficult to split to multiple line. - """ - sed_command = "sed -n 's/.*-> 0x\(..\).*/\\1/p' /var/log/tac_plus.log | sed ':a; N; $!ba; s/\\n//g' | grep '{0}'".format(hex_string) # noqa W605 E501 - res = ptfhost.shell(sed_command) - logger.info(sed_command) - logger.info(res["stdout_lines"]) - pytest_assert(len(res["stdout_lines"]) > 0) - - @pytest.fixture def rw_user_client(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds): duthost = duthosts[enum_rand_one_per_hwsku_hostname] diff --git a/tests/tacacs/test_authorization.py b/tests/tacacs/test_authorization.py index 8ceda73810..b0cffe00c6 100644 --- a/tests/tacacs/test_authorization.py +++ b/tests/tacacs/test_authorization.py @@ -7,6 +7,7 @@ from tests.tacacs.utils import per_command_check_skip_versions, remove_all_tacacs_server, get_ld_path from tests.common.helpers.assertions import pytest_assert from tests.common.utilities import skip_release, wait_until +from .utils import check_server_received pytestmark = [ pytest.mark.disable_loganalyzer, @@ -79,6 +80,18 @@ def remote_user_client(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds) yield ssh_client +@pytest.fixture +def remote_rw_user_client(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds): + duthost = duthosts[enum_rand_one_per_hwsku_hostname] + dutip = duthost.mgmt_ip + with ssh_connect_remote( + dutip, + tacacs_creds['tacacs_rw_user'], + tacacs_creds['tacacs_rw_user_passwd'] + ) as ssh_client: + yield ssh_client + + @pytest.fixture def local_user_client(): with paramiko.SSHClient() as ssh_client: @@ -125,7 +138,11 @@ def verify_show_aaa(remote_user_client): return False -def check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, remote_user_client): +def check_authorization_tacacs_only( + duthosts, + enum_rand_one_per_hwsku_hostname, + tacacs_creds, + remote_user_client): duthost = duthosts[enum_rand_one_per_hwsku_hostname] """ Verify TACACS+ user run command in server side whitelist: @@ -154,14 +171,74 @@ def check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, ) -def test_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, setup_authorization_tacacs, - tacacs_creds, check_tacacs, remote_user_client): - check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, remote_user_client) +def test_authorization_tacacs_only( + duthosts, + enum_rand_one_per_hwsku_hostname, + setup_authorization_tacacs, + tacacs_creds, + check_tacacs, + remote_user_client, + remote_rw_user_client): + + check_authorization_tacacs_only( + duthosts, + enum_rand_one_per_hwsku_hostname, + tacacs_creds, + remote_user_client) + + # check commands used by scripts + commands = [ + "show interfaces counters -a -p 3", + "show ip bgp neighbor", + "show ipv6 bgp neighbor", + "show feature status telemetry", + "touch testfile", + "chmod +w testfile", + "echo \"test\" > testfile", + "ls -l testfile | egrep -v -i '^total'", + "/bin/sed -i '$d' testfile", + "find -type f -name testfile -print | xargs /bin/rm -f", + "touch testfile", + "rm -f testfi*", + "mkdir -p test", + "portstat -c", + "show ip bgp summary", + "show ipv6 bgp summary", + "show interfaces portchannel", + "show muxcable firmware", + "show platform summary", + "show version", + "show lldp table", + "show reboot-cause", + "configlet --help", + "sonic-db-cli CONFIG_DB HGET \"FEATURE|macsec\" state" + ] + + for subcommand in commands: + exit_code, stdout, stderr = ssh_run_command(remote_user_client, subcommand) + pytest_assert(exit_code == 0) + + rw_commands = [ + "sudo config interface", + "sudo route_check.py | head -n 100", + "sudo dmesg -D", + "sudo sonic-cfggen --print-data", + "sudo config list-checkpoints", + "redis-cli -n 4 keys \\*" + ] + + for subcommand in rw_commands: + exit_code, stdout, stderr = ssh_run_command(remote_rw_user_client, subcommand) + pytest_assert(exit_code == 0) def test_authorization_tacacs_only_some_server_down( duthosts, enum_rand_one_per_hwsku_hostname, - setup_authorization_tacacs, tacacs_creds, ptfhost, check_tacacs, remote_user_client): + setup_authorization_tacacs, + tacacs_creds, + ptfhost, + check_tacacs, + remote_user_client): """ Setup multiple tacacs server for this UT. Tacacs server 127.0.0.1 not accessible. @@ -184,7 +261,11 @@ def test_authorization_tacacs_only_some_server_down( Verify TACACS+ user can't run command not in server side whitelist. Verify Local user can't login. """ - check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, remote_user_client) + check_authorization_tacacs_only( + duthosts, + enum_rand_one_per_hwsku_hostname, + tacacs_creds, + remote_user_client) # Cleanup duthost.shell("sudo config tacacs delete %s" % invalid_tacacs_server_ip) @@ -419,6 +500,70 @@ def test_backward_compatibility_disable_authorization( start_tacacs_server(ptfhost) +def create_test_files(remote_client): + exit_code, stdout, stderr = ssh_run_command(remote_client, "touch testfile.1") + pytest_assert(exit_code == 0) + + exit_code, stdout, stderr = ssh_run_command(remote_client, "touch testfile.2") + pytest_assert(exit_code == 0) + + exit_code, stdout, stderr = ssh_run_command(remote_client, "touch testfile.3") + pytest_assert(exit_code == 0) + + +def test_tacacs_authorization_wildcard( + ptfhost, + duthosts, + enum_rand_one_per_hwsku_hostname, + tacacs_creds, + check_tacacs, + remote_user_client, + remote_rw_user_client): + duthost = duthosts[enum_rand_one_per_hwsku_hostname] + duthost.shell("sudo config aaa authorization tacacs+") + + # Create files for command with wildcards + create_test_files(remote_user_client) + + # Verify command with wildcard been send to TACACS server side correctly. + exit_code, stdout, stderr = ssh_run_command(remote_user_client, "ls *") + pytest_assert(exit_code == 0) + check_server_received(ptfhost, "cmd=/usr/bin/ls") + check_server_received(ptfhost, "cmd-arg=*") + + exit_code, stdout, stderr = ssh_run_command(remote_user_client, "ls testfile.?") + pytest_assert(exit_code == 0) + check_server_received(ptfhost, "cmd=/usr/bin/ls") + check_server_received(ptfhost, "cmd-arg=testfile.?") + + exit_code, stdout, stderr = ssh_run_command(remote_user_client, "ls testfile*") + pytest_assert(exit_code == 0) + check_server_received(ptfhost, "cmd=/usr/bin/ls") + check_server_received(ptfhost, "cmd-arg=testfile*") + + exit_code, stdout, stderr = ssh_run_command(remote_user_client, "ls test*.?") + pytest_assert(exit_code == 0) + check_server_received(ptfhost, "cmd=/usr/bin/ls") + check_server_received(ptfhost, "cmd-arg=test*.?") + + # Create files for command with wildcards + create_test_files(remote_rw_user_client) + + # Verify sudo command with * been send to TACACS server side correctly. + exit_code, stdout, stderr = ssh_run_command(remote_rw_user_client, "sudo ls test*.?") + pytest_assert(exit_code == 0) + check_server_received(ptfhost, "cmd=/usr/bin/sudo") + check_server_received(ptfhost, "cmd-arg=ls") + check_server_received(ptfhost, "cmd-arg=test*.?") + + exit_code, stdout, stderr = ssh_run_command(remote_rw_user_client, "sudo zgrep pfcwd /var/log/syslog*") + pytest_assert(exit_code == 0) + check_server_received(ptfhost, "cmd=/usr/bin/sudo") + check_server_received(ptfhost, "cmd-arg=zgrep") + check_server_received(ptfhost, "cmd-arg=pfcwd") + check_server_received(ptfhost, "cmd-arg=/var/log/syslog*") + + def test_stop_request_next_server_after_reject( duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, ptfhost, check_tacacs, remote_user_client, local_user_client): diff --git a/tests/tacacs/utils.py b/tests/tacacs/utils.py index 6cfa45537e..677de776ab 100644 --- a/tests/tacacs/utils.py +++ b/tests/tacacs/utils.py @@ -1,6 +1,7 @@ import crypt import logging import re +import binascii from tests.common.errors import RunAnsibleModuleFail from tests.common.utilities import wait_until, check_skip_release @@ -244,3 +245,37 @@ def remove_all_tacacs_server(duthost): tacacs_server = tacacs_server.rstrip() if tacacs_server: duthost.shell("sudo config tacacs delete %s" % tacacs_server) + + +def check_server_received(ptfhost, data): + """ + Check if tacacs server received the data. + """ + hex = binascii.hexlify(data.encode('ascii')) + hex_string = hex.decode() + + """ + Extract received data from tac_plus.log, then use grep to check if the received data contains hex_string: + 1. tac_plus server start with '-d 2058' parameter to log received data in following format in tac_plus.log: + Thu Mar 9 06:26:16 2023 [75483]: data[140] = 0xf8, xor'ed with hash[12] = 0xab -> 0x53 + Thu Mar 9 06:26:16 2023 [75483]: data[141] = 0x8d, xor'ed with hash[13] = 0xc2 -> 0x4f + In above log, the 'data[140] = 0xf8' is received data. + + 2. Following sed command will extract the received data from tac_plus.log: + sed -n 's/.*-> 0x\(..\).*/\\1/p' /var/log/tac_plus.log # noqa W605 + + 3. Following set command will join all received data to hex string: + sed ':a; N; $!ba; s/\\n//g' + + 4. Then the grep command will check if the received hex data containes expected hex string. + grep '{0}'".format(hex_string) + + Also suppress following Flake8 error/warning: + W605 : Invalid escape sequence. Flake8 can't handle sed command escape sequence, so will report false alert. + E501 : Line too long. Following sed command difficult to split to multiple line. + """ + sed_command = "sed -n 's/.*-> 0x\(..\).*/\\1/p' /var/log/tac_plus.log | sed ':a; N; $!ba; s/\\n//g' | grep '{0}'".format(hex_string) # noqa W605 E501 + res = ptfhost.shell(sed_command) + logger.info(sed_command) + logger.info(res["stdout_lines"]) + pytest_assert(len(res["stdout_lines"]) > 0)