Skip to content

Commit

Permalink
[TACACS] Improve TACACS per-command authorization UT coverage (sonic-…
Browse files Browse the repository at this point in the history
…net#8115)

[TACACS] Improve TACACS per-command  authorization UT coverage

### Description of PR
Improve TACACS per-command  authorization UT coverage

Summary:
Fixes # (issue)

### Type of change

<!--
- Fill x for your type of change.
- e.g.
- [x] Bug fix
-->

- [ ] Bug fix
- [ ] Testbed and Framework(new/improvement)
- [X] Test case(new/improvement)


### Back port request
- [ ] 201911
- [ ] 202012
- [x] 202205

### Approach
#### What is the motivation for this PR?
Improve TACACS per-command  authorization UT coverage.

#### How did you do it?
Add new UT to cover 'run command with wildcard' scenario.
Improve exist UT to cover more commands.

#### How did you verify/test it?
Manually test with latest master branch image and 202205/202211 branch image.

#### Any platform specific information?

#### Supported testbed topology if it's a new test case?

### Documentation
<!--
(If it's a new feature, new test case)
Did you update documentation/Wiki relevant to your implementation?
Link to the wiki page?
-->
  • Loading branch information
liuh-80 authored and parmarkj committed Oct 3, 2023
1 parent 79fdcb3 commit f94a91c
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 42 deletions.
37 changes: 1 addition & 36 deletions tests/tacacs/test_accounting.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import logging
import time
from tests.common.devices.ptf import PTFHost
import binascii


import pytest


from .test_authorization import ssh_connect_remote, ssh_run_command, \
per_command_check_skip_versions, remove_all_tacacs_server
from .utils import stop_tacacs_server, start_tacacs_server
from .utils import stop_tacacs_server, start_tacacs_server, check_server_received
from tests.common.errors import RunAnsibleModuleFail
from tests.common.helpers.assertions import pytest_assert
from tests.common.utilities import skip_release
Expand Down Expand Up @@ -123,40 +122,6 @@ def check_local_no_other_user_log(duthost, tacacs_creds):
pytest_assert(len(logs) == 0, "Expected to find no accounting logs but found: {}".format(logs))


def check_server_received(ptfhost, data):
"""
Check if tacacs server received the data.
"""
hex = binascii.hexlify(data.encode('ascii'))
hex_string = hex.decode()

"""
Extract received data from tac_plus.log, then use grep to check if the received data contains hex_string:
1. tac_plus server start with '-d 2058' parameter to log received data in following format in tac_plus.log:
Thu Mar 9 06:26:16 2023 [75483]: data[140] = 0xf8, xor'ed with hash[12] = 0xab -> 0x53
Thu Mar 9 06:26:16 2023 [75483]: data[141] = 0x8d, xor'ed with hash[13] = 0xc2 -> 0x4f
In above log, the 'data[140] = 0xf8' is received data.
2. Following sed command will extract the received data from tac_plus.log:
sed -n 's/.*-> 0x\(..\).*/\\1/p' /var/log/tac_plus.log # noqa W605
3. Following set command will join all received data to hex string:
sed ':a; N; $!ba; s/\\n//g'
4. Then the grep command will check if the received hex data containes expected hex string.
grep '{0}'".format(hex_string)
Also suppress following Flake8 error/warning:
W605 : Invalid escape sequence. Flake8 can't handle sed command escape sequence, so will report false alert.
E501 : Line too long. Following sed command difficult to split to multiple line.
"""
sed_command = "sed -n 's/.*-> 0x\(..\).*/\\1/p' /var/log/tac_plus.log | sed ':a; N; $!ba; s/\\n//g' | grep '{0}'".format(hex_string) # noqa W605 E501
res = ptfhost.shell(sed_command)
logger.info(sed_command)
logger.info(res["stdout_lines"])
pytest_assert(len(res["stdout_lines"]) > 0)


@pytest.fixture
def rw_user_client(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds):
duthost = duthosts[enum_rand_one_per_hwsku_hostname]
Expand Down
157 changes: 151 additions & 6 deletions tests/tacacs/test_authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from tests.tacacs.utils import per_command_check_skip_versions, remove_all_tacacs_server, get_ld_path
from tests.common.helpers.assertions import pytest_assert
from tests.common.utilities import skip_release, wait_until
from .utils import check_server_received

pytestmark = [
pytest.mark.disable_loganalyzer,
Expand Down Expand Up @@ -79,6 +80,18 @@ def remote_user_client(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds)
yield ssh_client


@pytest.fixture
def remote_rw_user_client(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds):
duthost = duthosts[enum_rand_one_per_hwsku_hostname]
dutip = duthost.mgmt_ip
with ssh_connect_remote(
dutip,
tacacs_creds['tacacs_rw_user'],
tacacs_creds['tacacs_rw_user_passwd']
) as ssh_client:
yield ssh_client


@pytest.fixture
def local_user_client():
with paramiko.SSHClient() as ssh_client:
Expand Down Expand Up @@ -125,7 +138,11 @@ def verify_show_aaa(remote_user_client):
return False


def check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, remote_user_client):
def check_authorization_tacacs_only(
duthosts,
enum_rand_one_per_hwsku_hostname,
tacacs_creds,
remote_user_client):
duthost = duthosts[enum_rand_one_per_hwsku_hostname]
"""
Verify TACACS+ user run command in server side whitelist:
Expand Down Expand Up @@ -154,14 +171,74 @@ def check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname,
)


def test_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, setup_authorization_tacacs,
tacacs_creds, check_tacacs, remote_user_client):
check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, remote_user_client)
def test_authorization_tacacs_only(
duthosts,
enum_rand_one_per_hwsku_hostname,
setup_authorization_tacacs,
tacacs_creds,
check_tacacs,
remote_user_client,
remote_rw_user_client):

check_authorization_tacacs_only(
duthosts,
enum_rand_one_per_hwsku_hostname,
tacacs_creds,
remote_user_client)

# check commands used by scripts
commands = [
"show interfaces counters -a -p 3",
"show ip bgp neighbor",
"show ipv6 bgp neighbor",
"show feature status telemetry",
"touch testfile",
"chmod +w testfile",
"echo \"test\" > testfile",
"ls -l testfile | egrep -v -i '^total'",
"/bin/sed -i '$d' testfile",
"find -type f -name testfile -print | xargs /bin/rm -f",
"touch testfile",
"rm -f testfi*",
"mkdir -p test",
"portstat -c",
"show ip bgp summary",
"show ipv6 bgp summary",
"show interfaces portchannel",
"show muxcable firmware",
"show platform summary",
"show version",
"show lldp table",
"show reboot-cause",
"configlet --help",
"sonic-db-cli CONFIG_DB HGET \"FEATURE|macsec\" state"
]

for subcommand in commands:
exit_code, stdout, stderr = ssh_run_command(remote_user_client, subcommand)
pytest_assert(exit_code == 0)

rw_commands = [
"sudo config interface",
"sudo route_check.py | head -n 100",
"sudo dmesg -D",
"sudo sonic-cfggen --print-data",
"sudo config list-checkpoints",
"redis-cli -n 4 keys \\*"
]

for subcommand in rw_commands:
exit_code, stdout, stderr = ssh_run_command(remote_rw_user_client, subcommand)
pytest_assert(exit_code == 0)


def test_authorization_tacacs_only_some_server_down(
duthosts, enum_rand_one_per_hwsku_hostname,
setup_authorization_tacacs, tacacs_creds, ptfhost, check_tacacs, remote_user_client):
setup_authorization_tacacs,
tacacs_creds,
ptfhost,
check_tacacs,
remote_user_client):
"""
Setup multiple tacacs server for this UT.
Tacacs server 127.0.0.1 not accessible.
Expand All @@ -184,7 +261,11 @@ def test_authorization_tacacs_only_some_server_down(
Verify TACACS+ user can't run command not in server side whitelist.
Verify Local user can't login.
"""
check_authorization_tacacs_only(duthosts, enum_rand_one_per_hwsku_hostname, tacacs_creds, remote_user_client)
check_authorization_tacacs_only(
duthosts,
enum_rand_one_per_hwsku_hostname,
tacacs_creds,
remote_user_client)

# Cleanup
duthost.shell("sudo config tacacs delete %s" % invalid_tacacs_server_ip)
Expand Down Expand Up @@ -419,6 +500,70 @@ def test_backward_compatibility_disable_authorization(
start_tacacs_server(ptfhost)


def create_test_files(remote_client):
exit_code, stdout, stderr = ssh_run_command(remote_client, "touch testfile.1")
pytest_assert(exit_code == 0)

exit_code, stdout, stderr = ssh_run_command(remote_client, "touch testfile.2")
pytest_assert(exit_code == 0)

exit_code, stdout, stderr = ssh_run_command(remote_client, "touch testfile.3")
pytest_assert(exit_code == 0)


def test_tacacs_authorization_wildcard(
ptfhost,
duthosts,
enum_rand_one_per_hwsku_hostname,
tacacs_creds,
check_tacacs,
remote_user_client,
remote_rw_user_client):
duthost = duthosts[enum_rand_one_per_hwsku_hostname]
duthost.shell("sudo config aaa authorization tacacs+")

# Create files for command with wildcards
create_test_files(remote_user_client)

# Verify command with wildcard been send to TACACS server side correctly.
exit_code, stdout, stderr = ssh_run_command(remote_user_client, "ls *")
pytest_assert(exit_code == 0)
check_server_received(ptfhost, "cmd=/usr/bin/ls")
check_server_received(ptfhost, "cmd-arg=*")

exit_code, stdout, stderr = ssh_run_command(remote_user_client, "ls testfile.?")
pytest_assert(exit_code == 0)
check_server_received(ptfhost, "cmd=/usr/bin/ls")
check_server_received(ptfhost, "cmd-arg=testfile.?")

exit_code, stdout, stderr = ssh_run_command(remote_user_client, "ls testfile*")
pytest_assert(exit_code == 0)
check_server_received(ptfhost, "cmd=/usr/bin/ls")
check_server_received(ptfhost, "cmd-arg=testfile*")

exit_code, stdout, stderr = ssh_run_command(remote_user_client, "ls test*.?")
pytest_assert(exit_code == 0)
check_server_received(ptfhost, "cmd=/usr/bin/ls")
check_server_received(ptfhost, "cmd-arg=test*.?")

# Create files for command with wildcards
create_test_files(remote_rw_user_client)

# Verify sudo command with * been send to TACACS server side correctly.
exit_code, stdout, stderr = ssh_run_command(remote_rw_user_client, "sudo ls test*.?")
pytest_assert(exit_code == 0)
check_server_received(ptfhost, "cmd=/usr/bin/sudo")
check_server_received(ptfhost, "cmd-arg=ls")
check_server_received(ptfhost, "cmd-arg=test*.?")

exit_code, stdout, stderr = ssh_run_command(remote_rw_user_client, "sudo zgrep pfcwd /var/log/syslog*")
pytest_assert(exit_code == 0)
check_server_received(ptfhost, "cmd=/usr/bin/sudo")
check_server_received(ptfhost, "cmd-arg=zgrep")
check_server_received(ptfhost, "cmd-arg=pfcwd")
check_server_received(ptfhost, "cmd-arg=/var/log/syslog*")


def test_stop_request_next_server_after_reject(
duthosts, enum_rand_one_per_hwsku_hostname,
tacacs_creds, ptfhost, check_tacacs, remote_user_client, local_user_client):
Expand Down
35 changes: 35 additions & 0 deletions tests/tacacs/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import crypt
import logging
import re
import binascii

from tests.common.errors import RunAnsibleModuleFail
from tests.common.utilities import wait_until, check_skip_release
Expand Down Expand Up @@ -244,3 +245,37 @@ def remove_all_tacacs_server(duthost):
tacacs_server = tacacs_server.rstrip()
if tacacs_server:
duthost.shell("sudo config tacacs delete %s" % tacacs_server)


def check_server_received(ptfhost, data):
"""
Check if tacacs server received the data.
"""
hex = binascii.hexlify(data.encode('ascii'))
hex_string = hex.decode()

"""
Extract received data from tac_plus.log, then use grep to check if the received data contains hex_string:
1. tac_plus server start with '-d 2058' parameter to log received data in following format in tac_plus.log:
Thu Mar 9 06:26:16 2023 [75483]: data[140] = 0xf8, xor'ed with hash[12] = 0xab -> 0x53
Thu Mar 9 06:26:16 2023 [75483]: data[141] = 0x8d, xor'ed with hash[13] = 0xc2 -> 0x4f
In above log, the 'data[140] = 0xf8' is received data.
2. Following sed command will extract the received data from tac_plus.log:
sed -n 's/.*-> 0x\(..\).*/\\1/p' /var/log/tac_plus.log # noqa W605
3. Following set command will join all received data to hex string:
sed ':a; N; $!ba; s/\\n//g'
4. Then the grep command will check if the received hex data containes expected hex string.
grep '{0}'".format(hex_string)
Also suppress following Flake8 error/warning:
W605 : Invalid escape sequence. Flake8 can't handle sed command escape sequence, so will report false alert.
E501 : Line too long. Following sed command difficult to split to multiple line.
"""
sed_command = "sed -n 's/.*-> 0x\(..\).*/\\1/p' /var/log/tac_plus.log | sed ':a; N; $!ba; s/\\n//g' | grep '{0}'".format(hex_string) # noqa W605 E501
res = ptfhost.shell(sed_command)
logger.info(sed_command)
logger.info(res["stdout_lines"])
pytest_assert(len(res["stdout_lines"]) > 0)

0 comments on commit f94a91c

Please sign in to comment.