Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TACACS] Improve TACACS per-command authorization UT coverage #8115

Merged
merged 6 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 1 addition & 36 deletions tests/tacacs/test_accounting.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import logging
import time
from tests.common.devices.ptf import PTFHost
import binascii


import pytest


from .test_authorization import ssh_connect_remote, ssh_run_command, \
per_command_check_skip_versions, remove_all_tacacs_server
from .utils import stop_tacacs_server, start_tacacs_server
from .utils import stop_tacacs_server, start_tacacs_server, check_server_received
from tests.common.errors import RunAnsibleModuleFail
from tests.common.helpers.assertions import pytest_assert
from tests.common.utilities import skip_release
Expand Down Expand Up @@ -123,40 +122,6 @@ def check_local_no_other_user_log(duthost, tacacs_creds):
pytest_assert(len(logs) == 0, "Expected to find no accounting logs but found: {}".format(logs))


def check_server_received(ptfhost, data):
"""
Check if tacacs server received the data.
"""
hex = binascii.hexlify(data.encode('ascii'))
hex_string = hex.decode()

"""
Extract received data from tac_plus.log, then use grep to check if the received data contains hex_string:
1. tac_plus server start with '-d 2058' parameter to log received data in following format in tac_plus.log:
Thu Mar 9 06:26:16 2023 [75483]: data[140] = 0xf8, xor'ed with hash[12] = 0xab -> 0x53
Thu Mar 9 06:26:16 2023 [75483]: data[141] = 0x8d, xor'ed with hash[13] = 0xc2 -> 0x4f
In above log, the 'data[140] = 0xf8' is received data.

2. Following sed command will extract the received data from tac_plus.log:
sed -n 's/.*-> 0x\(..\).*/\\1/p' /var/log/tac_plus.log # noqa W605

3. Following set command will join all received data to hex string:
sed ':a; N; $!ba; s/\\n//g'

4. Then the grep command will check if the received hex data containes expected hex string.
grep '{0}'".format(hex_string)

Also suppress following Flake8 error/warning:
W605 : Invalid escape sequence. Flake8 can't handle sed command escape sequence, so will report false alert.
E501 : Line too long. Following sed command difficult to split to multiple line.
"""
sed_command = "sed -n 's/.*-> 0x\(..\).*/\\1/p' /var/log/tac_plus.log | sed ':a; N; $!ba; s/\\n//g' | grep '{0}'".format(hex_string) # noqa W605 E501
res = ptfhost.shell(sed_command)
logger.info(sed_command)
logger.info(res["stdout_lines"])
pytest_assert(len(res["stdout_lines"]) > 0)


@pytest.fixture
def rw_user_client(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds):
duthost = duthosts[enum_rand_one_per_hwsku_hostname]
Expand Down
147 changes: 142 additions & 5 deletions tests/tacacs/test_authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from tests.tacacs.utils import per_command_check_skip_versions, remove_all_tacacs_server, get_ld_path
from tests.common.helpers.assertions import pytest_assert
from tests.common.utilities import skip_release, wait_until
from .utils import check_server_received

pytestmark = [
pytest.mark.disable_loganalyzer,
Expand Down Expand Up @@ -78,6 +79,16 @@ def remote_user_client(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds)
) as ssh_client:
yield ssh_client

@pytest.fixture
def remote_rw_user_client(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds):
duthost = duthosts[enum_rand_one_per_hwsku_hostname]
dutip = duthost.mgmt_ip
with ssh_connect_remote(
dutip,
tacacs_creds['tacacs_rw_user'],
tacacs_creds['tacacs_rw_user_passwd']
) as ssh_client:
yield ssh_client

@pytest.fixture
def local_user_client():
Expand Down Expand Up @@ -125,7 +136,12 @@ def verify_show_aaa(remote_user_client):
return False


def check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, remote_user_client):
def check_authorization_tacacs_only(
duthosts,
enum_rand_one_per_hwsku_hostname,
tacacs_creds,
remote_user_client,
remote_rw_user_client):
duthost = duthosts[enum_rand_one_per_hwsku_hostname]
"""
Verify TACACS+ user run command in server side whitelist:
Expand Down Expand Up @@ -153,10 +169,67 @@ def check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname,
tacacs_creds['local_user_passwd']
)


def test_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, setup_authorization_tacacs,
tacacs_creds, check_tacacs, remote_user_client):
check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, remote_user_client)
# check commands used by scripts
commands = [
"show interfaces counters -a -p 3",
"show ip bgp neighbor",
"show ipv6 bgp neighbor",
"show feature status telemetry",
"touch testfile",
"chmod +w testfile",
"echo \"test\" > testfile",
"ls -l testfile | egrep -v -i '^total'",
"/bin/sed -i '$d' testfile",
"find -type f -name testfile -print | xargs /bin/rm -f",
"touch testfile",
"rm -f testfi*",
"mkdir -p test",
"portstat -c",
"show ip bgp summary",
"show ipv6 bgp summary",
"show interfaces portchannel",
"show muxcable firmware",
"show platform summary",
"show version",
"show lldp table",
"show reboot-cause",
"configlet --help",
"sonic-db-cli CONFIG_DB HGET \"FEATURE|macsec\" state"
]

for subcommand in commands:
exit_code, stdout, stderr = ssh_run_command(remote_user_client, subcommand)
pytest_assert(exit_code == 0)

rw_commands = [
"sudo config interface",
"sudo route_check.py | head -n 100",
"sudo dmesg -D",
"sudo sonic-cfggen --print-data",
"sudo config list-checkpoints",
"redis-cli -n 4 keys \\*"
]

for subcommand in rw_commands:
exit_code, stdout, stderr = ssh_run_command(remote_rw_user_client, subcommand)
pytest_assert(exit_code == 0)


def test_authorization_tacacs_only(
duthosts,
enum_rand_one_per_hwsku_hostname,
setup_authorization_tacacs,
tacacs_creds,
check_tacacs,
remote_user_client,
remote_rw_user_client):

check_authorization_tacacs_only(
duthosts,
enum_rand_one_per_hwsku_hostname,
tacacs_creds,
remote_user_client,
remote_rw_user_client)


def test_authorization_tacacs_only_some_server_down(
Expand Down Expand Up @@ -417,3 +490,67 @@ def test_backward_compatibility_disable_authorization(

# cleanup
start_tacacs_server(ptfhost)


def create_test_files(remote_client):
exit_code, stdout, stderr = ssh_run_command(remote_client, "touch testfile.1")
pytest_assert(exit_code == 0)

exit_code, stdout, stderr = ssh_run_command(remote_client, "touch testfile.2")
pytest_assert(exit_code == 0)

exit_code, stdout, stderr = ssh_run_command(remote_client, "touch testfile.3")
pytest_assert(exit_code == 0)


def test_tacacs_authorization_wildcard(
ptfhost,
duthosts,
enum_rand_one_per_hwsku_hostname,
tacacs_creds,
check_tacacs,
remote_user_client,
remote_rw_user_client):
duthost = duthosts[enum_rand_one_per_hwsku_hostname]
duthost.shell("sudo config aaa authorization tacacs+")

# Create files for command with wildcards
create_test_files(remote_user_client)

# Verify command with wildcard been send to TACACS server side correctly.
exit_code, stdout, stderr = ssh_run_command(remote_user_client, "ls *")
pytest_assert(exit_code == 0)
check_server_received(ptfhost, "cmd=/usr/bin/ls")
check_server_received(ptfhost, "cmd-arg=*")

exit_code, stdout, stderr = ssh_run_command(remote_user_client, "ls testfile.?")
pytest_assert(exit_code == 0)
check_server_received(ptfhost, "cmd=/usr/bin/ls")
check_server_received(ptfhost, "cmd-arg=testfile.?")

exit_code, stdout, stderr = ssh_run_command(remote_user_client, "ls testfile*")
pytest_assert(exit_code == 0)
check_server_received(ptfhost, "cmd=/usr/bin/ls")
check_server_received(ptfhost, "cmd-arg=testfile*")

exit_code, stdout, stderr = ssh_run_command(remote_user_client, "ls test*.?")
pytest_assert(exit_code == 0)
check_server_received(ptfhost, "cmd=/usr/bin/ls")
check_server_received(ptfhost, "cmd-arg=test*.?")

# Create files for command with wildcards
create_test_files(remote_rw_user_client)

# Verify sudo command with * been send to TACACS server side correctly.
exit_code, stdout, stderr = ssh_run_command(remote_rw_user_client, "sudo ls test*.?")
pytest_assert(exit_code == 0)
check_server_received(ptfhost, "cmd=/usr/bin/sudo")
check_server_received(ptfhost, "cmd-arg=ls")
check_server_received(ptfhost, "cmd-arg=test*.?")

exit_code, stdout, stderr = ssh_run_command(remote_rw_user_client, "sudo zgrep pfcwd /var/log/syslog*")
pytest_assert(exit_code == 0)
check_server_received(ptfhost, "cmd=/usr/bin/sudo")
check_server_received(ptfhost, "cmd-arg=zgrep")
check_server_received(ptfhost, "cmd-arg=pfcwd")
check_server_received(ptfhost, "cmd-arg=/var/log/syslog*")
2 changes: 1 addition & 1 deletion tests/tacacs/test_rw_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ def test_rw_user_ipv6(localhost, duthosts, enum_rand_one_per_hwsku_hostname, tac
res = ssh_remote_run(localhost, dutip, tacacs_creds['tacacs_rw_user'],
tacacs_creds['tacacs_rw_user_passwd'], "cat /etc/passwd")

check_output(res, 'testadmin', 'remote_user_su')
check_output(res, 'testadmin', 'remote_user_su')
35 changes: 35 additions & 0 deletions tests/tacacs/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import crypt
import logging
import re
import binascii

from tests.common.errors import RunAnsibleModuleFail
from tests.common.utilities import wait_until, check_skip_release
Expand Down Expand Up @@ -244,3 +245,37 @@ def remove_all_tacacs_server(duthost):
tacacs_server = tacacs_server.rstrip()
if tacacs_server:
duthost.shell("sudo config tacacs delete %s" % tacacs_server)


def check_server_received(ptfhost, data):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No any code change, for fix the circle import issue, move code from test_accounting.py.

"""
Check if tacacs server received the data.
"""
hex = binascii.hexlify(data.encode('ascii'))
hex_string = hex.decode()

"""
Extract received data from tac_plus.log, then use grep to check if the received data contains hex_string:
1. tac_plus server start with '-d 2058' parameter to log received data in following format in tac_plus.log:
Thu Mar 9 06:26:16 2023 [75483]: data[140] = 0xf8, xor'ed with hash[12] = 0xab -> 0x53
Thu Mar 9 06:26:16 2023 [75483]: data[141] = 0x8d, xor'ed with hash[13] = 0xc2 -> 0x4f
In above log, the 'data[140] = 0xf8' is received data.

2. Following sed command will extract the received data from tac_plus.log:
sed -n 's/.*-> 0x\(..\).*/\\1/p' /var/log/tac_plus.log # noqa W605

3. Following set command will join all received data to hex string:
sed ':a; N; $!ba; s/\\n//g'

4. Then the grep command will check if the received hex data containes expected hex string.
grep '{0}'".format(hex_string)

Also suppress following Flake8 error/warning:
W605 : Invalid escape sequence. Flake8 can't handle sed command escape sequence, so will report false alert.
E501 : Line too long. Following sed command difficult to split to multiple line.
"""
sed_command = "sed -n 's/.*-> 0x\(..\).*/\\1/p' /var/log/tac_plus.log | sed ':a; N; $!ba; s/\\n//g' | grep '{0}'".format(hex_string) # noqa W605 E501
res = ptfhost.shell(sed_command)
logger.info(sed_command)
logger.info(res["stdout_lines"])
pytest_assert(len(res["stdout_lines"]) > 0)