diff --git a/tests/e2e/advanced/roam_test/hard_roam/OTA/test_roam_ota.py b/tests/e2e/advanced/roam_test/hard_roam/OTA/test_roam_ota.py index 32de122e50..3aece8bd15 100644 --- a/tests/e2e/advanced/roam_test/hard_roam/OTA/test_roam_ota.py +++ b/tests/e2e/advanced/roam_test/hard_roam/OTA/test_roam_ota.py @@ -7,7 +7,7 @@ import pytest import allure import time - +import copy import requests pytestmark = [pytest.mark.roam_test, pytest.mark.bridge, pytest.mark.roam_ota] @@ -24,6 +24,7 @@ @allure.suite("11r Roaming over the air") @allure.feature("Roam Test") +@pytest.mark.ota class TestRoamOTA(object): @pytest.mark.same_channel @pytest.mark.wpa2_personal @@ -32,29 +33,35 @@ class TestRoamOTA(object): def test_roam_2g_to_2g_sc_psk_wpa2(self, get_target_object, get_test_library, get_lab_info, selected_testbed): """ Test Roaming between two APs, Same channel, 2G, WPA2 Personal - pytest -m "roam and twog and same_channel and wpa2_personal" + pytest -m "roam and twog and same_channel and wpa2_personal and ota" """ ap_data = dict() dut_list = [str(selected_testbed)] dut_names = list() bssid_list = list() + freqs_ = "" testbed_info = get_lab_info.CONFIGURATION - if str(selected_testbed + 'a') in testbed_info: - dut_list.append(str(selected_testbed + 'a')) - logging.info(f"dut list: {dut_list}--") - for i in range(len(config_data["radios"])): - if config_data['radios'][i]["band"] == "5G": - config_data['radios'].pop(i) - if "5G" in config_data['interfaces'][0]["ssids"][0]["wifi-bands"]: - config_data['interfaces'][0]["ssids"][0]["wifi-bands"].remove("5G") + config = copy.deepcopy(config_data) + temp_list = list() + for key, val in testbed_info.items(): + tb_type, tb_name = selected_testbed.split("-") + if tb_type in key and tb_name[0] in key: + temp_list.append(key) + temp_list.sort() + dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1] + logging.info(f"---dut list: {dut_list}---") + config['radios'] = [ + {"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 40, "country": "CA"}] + config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["2G"] if len(dut_list) < 2: - logging.error(f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") + logging.error( + f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}" for ap in range(len(dut_list)): serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier'] dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model']) - logging.info(config_data) - payload = {"configuration": json.dumps(config_data), "serialNumber": serial_number, "UUID": 2} + logging.info(config) + payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2} uri = get_target_object.controller_library_object.build_uri( "device/" + serial_number + "/configure") logging.info("Sending Command: " + "\n" + str(uri) + "\n" + @@ -74,7 +81,8 @@ def test_roam_2g_to_2g_sc_psk_wpa2(self, get_target_object, get_test_library, ge logging.info(resp.json()) allure.attach(name=f"Response - {resp.status_code} {resp.reason}", body=str(resp.json())) if resp.status_code != 200: - if resp.status_code == 400 and "Device is already executing a command. Please try later." in resp.json()["ErrorDescription"]: + if resp.status_code == 400 and "Device is already executing a command. Please try later." in \ + resp.json()["ErrorDescription"]: time.sleep(30) resp = requests.post(uri, data=json.dumps(payload, indent=2), headers=get_target_object.controller_library_object.make_headers(), @@ -83,38 +91,59 @@ def test_roam_2g_to_2g_sc_psk_wpa2(self, get_target_object, get_test_library, ge logging.info(resp.json()) else: assert False, f"push configuration to {serial_number} got failed" - get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]]["device_under_tests"] - ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=False) + get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][ + "device_under_tests"] + ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True) if str(ap_iwinfo) != "Error: pop from empty list": - interfaces = {} - interface_matches = re.finditer( - r'wlan\d\s+ESSID:\s+".*?"\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+([\d\s]+)', ap_iwinfo, - re.DOTALL) - logging.info(str(interface_matches)) + include_essid = config['interfaces'][0]["ssids"][0]["name"] + re_obj = re.compile( + rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+(' + r'\d+)\s+\(([\d.]+) GHz\)', + re.DOTALL + ) + # find all matches + interface_matches = re_obj.finditer(ap_iwinfo) if interface_matches: for match in interface_matches: - interface_name = f'wlan{match.group(0)[4]}' - access_point = match.group(1) - channel = match.group(2).strip() - interfaces[interface_name] = {'Access Point': access_point, 'Channel': channel} - ap_data.update({serial_number: {'Access Point': access_point, 'Channel': channel}}) - logging.info(interfaces) + interface_name = match.group(1) + access_point = match.group(2) + channel = match.group(3) + frequency = match.group(4).replace('.', '') + ap_data.update( + {serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}}) + logging.info(f"AP Data from iwinfo: {ap_data}") else: logging.error("Failed to get iwinfo") pytest.exit("Failed to get iwinfo") elif ap_iwinfo == {}: - assert False, "Empty iwinfo reponse from AP through minicom" + pytest.fail("Empty iwinfo reponse from AP through minicom") else: - assert False, "Failed to get iwinfo from minicom" - logging.info(f"AP Data from iwinfo: {ap_data}") + pytest.fail("Failed to get iwinfo from minicom") for serial in ap_data: bssid_list.append(ap_data[serial]['Access Point']) - ssid = config_data['interfaces'][0]["ssids"][0]["name"] - key = config_data['interfaces'][0]["ssids"][0]["encryption"]["key"] - pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], - band="twog", num_sta=1, security="wpa2", security_key=key, ssid=ssid, upstream="1.1.eth1", duration=None, - iteration=1, channel="11", option="ota", dut_name=dut_names, traffic_type="lf_udp", sta_type="11r") - assert pass_fail, message + if not ap_data[serial]['frequency'].endswith(","): + freqs_ = freqs_ + ap_data[serial]['frequency'] + "," + else: + freqs_ = freqs_ + ap_data[serial]['frequency'] + ssid = config['interfaces'][0]["ssids"][0]["name"] + key = config['interfaces'][0]["ssids"][0]["encryption"]["key"] + pass_fail, message = True, "Test Passed" + try: + pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], + scan_freq=freqs_, + band="twog", num_sta=1, security="wpa2", security_key=key, + ssid=ssid, upstream="1.1.eth1", duration=None, + iteration=1, channel="11", option="ota", dut_name=dut_names, + traffic_type="lf_udp", sta_type="11r") + except Exception as e: + logging.error(f"Exception in roam test : {e}") + pass_fail, message = False, e + finally: + get_target_object.dut_library_object.get_dut_logs(print_log=False) + if not pass_fail: + pytest.fail(f"Test failed with the following reasons: \n{message}") + else: + assert True @pytest.mark.different_channel @pytest.mark.wpa2_personal @@ -123,21 +152,25 @@ def test_roam_2g_to_2g_sc_psk_wpa2(self, get_target_object, get_test_library, ge def test_roam_2g_to_2g_dc_psk_wpa2(self, get_target_object, get_test_library, get_lab_info, selected_testbed): """ Test Roaming between two APs, Different channel, 2G, WPA2 Personal - pytest -m "roam and twog and same_channel and wpa2_personal" + pytest -m "roam and twog and different_channel and wpa2_personal and ota" """ ap_data = dict() dut_list = [str(selected_testbed)] dut_names = list() bssid_list = list() + freqs_ = "" testbed_info = get_lab_info.CONFIGURATION - if str(selected_testbed + 'a') in testbed_info: - dut_list.append(str(selected_testbed + 'a')) - logging.info(f"dut list: {dut_list}--") - for i in range(len(config_data["radios"])): - if config_data['radios'][i]["band"] == "5G": - config_data['radios'].pop(i) - if "5G" in config_data['interfaces'][0]["ssids"][0]["wifi-bands"]: - config_data['interfaces'][0]["ssids"][0]["wifi-bands"].remove("5G") + config = copy.deepcopy(config_data) + temp_list = list() + for key, val in testbed_info.items(): + tb_type, tb_name = selected_testbed.split("-") + if tb_type in key and tb_name[0] in key: + temp_list.append(key) + temp_list.sort() + dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1] + config['radios'] = [ + {"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 40, "country": "CA"}] + config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["2G"] if len(dut_list) < 2: logging.error( f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") @@ -146,11 +179,10 @@ def test_roam_2g_to_2g_dc_psk_wpa2(self, get_target_object, get_test_library, ge serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier'] dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model']) if ap == 1: - if ap == 1: - config_data['radios'] = [ - {"band": "2G", "channel": 1, "channel-mode": "HE", "channel-width": 20, "country": "CA"}] - logging.info(config_data) - payload = {"configuration": json.dumps(config_data), "serialNumber": serial_number, "UUID": 2} + config['radios'] = [ + {"band": "2G", "channel": 1, "channel-mode": "HE", "channel-width": 20, "country": "CA"}] + logging.info(config) + payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2} uri = get_target_object.controller_library_object.build_uri( "device/" + serial_number + "/configure") logging.info("Sending Command: " + "\n" + str(uri) + "\n" + @@ -182,39 +214,56 @@ def test_roam_2g_to_2g_dc_psk_wpa2(self, get_target_object, get_test_library, ge assert False, f"push configuration to {serial_number} got failed" get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][ "device_under_tests"] - ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=False) + ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True) if str(ap_iwinfo) != "Error: pop from empty list": - interfaces = {} - interface_matches = re.finditer( - r'wlan\d\s+ESSID:\s+".*?"\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+([\d\s]+)', ap_iwinfo, - re.DOTALL) - logging.info(str(interface_matches)) + include_essid = config['interfaces'][0]["ssids"][0]["name"] + re_obj = re.compile( + rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+(' + r'\d+)\s+\(([\d.]+) GHz\)', + re.DOTALL + ) + # find all matches + interface_matches = re_obj.finditer(ap_iwinfo) if interface_matches: for match in interface_matches: - interface_name = f'wlan{match.group(0)[4]}' - access_point = match.group(1) - channel = match.group(2).strip() - interfaces[interface_name] = {'Access Point': access_point, 'Channel': channel} - ap_data.update({serial_number: {'Access Point': access_point, 'Channel': channel}}) - logging.info(interfaces) + interface_name = match.group(1) + access_point = match.group(2) + channel = match.group(3) + frequency = match.group(4).replace('.', '') + ap_data.update( + {serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}}) + logging.info(f"AP Data from iwinfo: {ap_data}") else: logging.error("Failed to get iwinfo") pytest.exit("Failed to get iwinfo") elif ap_iwinfo == {}: - assert False, "Empty iwinfo reponse from AP through minicom" + pytest.fail("Empty iwinfo reponse from AP through minicom") else: - assert False, "Failed to get iwinfo from minicom" - logging.info(f"AP Data from iwinfo: {ap_data}") + pytest.fail("Failed to get iwinfo from minicom") for serial in ap_data: bssid_list.append(ap_data[serial]['Access Point']) - ssid = config_data['interfaces'][0]["ssids"][0]["name"] - key = config_data['interfaces'][0]["ssids"][0]["encryption"]["key"] - pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], - band="twog", num_sta=1, security="wpa2", security_key=key, - ssid=ssid, upstream="1.1.eth1", duration=None, - iteration=1, channel="11", option="ota", dut_name=dut_names, - traffic_type="lf_udp", sta_type="11r") - assert pass_fail, message + if not ap_data[serial]['frequency'].endswith(","): + freqs_ = freqs_ + ap_data[serial]['frequency'] + "," + else: + freqs_ = freqs_ + ap_data[serial]['frequency'] + ssid = config['interfaces'][0]["ssids"][0]["name"] + key = config['interfaces'][0]["ssids"][0]["encryption"]["key"] + try: + pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], + scan_freq=freqs_, + band="twog", num_sta=1, security="wpa2", security_key=key, + ssid=ssid, upstream="1.1.eth1", duration=None, + iteration=1, channel="1", option="ota", dut_name=dut_names, + traffic_type="lf_udp", sta_type="11r") + except Exception as e: + logging.error(f"Exception in roam test : {e}") + pass_fail, message = False, e + finally: + get_target_object.dut_library_object.get_dut_logs(print_log=False) + if not pass_fail: + pytest.fail(f"Test failed with the following reasons: \n{message}") + else: + assert True @pytest.mark.same_channel @pytest.mark.wpa2_personal @@ -223,29 +272,34 @@ def test_roam_2g_to_2g_dc_psk_wpa2(self, get_target_object, get_test_library, ge def test_roam_5g_to_5g_sc_psk_wpa2(self, get_target_object, get_test_library, get_lab_info, selected_testbed): """ Test Roaming between two APs, Same channel, 5G, WPA2 Personal - pytest -m "roam and fiveg and same_channel and wpa2_personal" + pytest -m "roam and fiveg and same_channel and wpa2_personal and ota" """ ap_data = dict() dut_list = [str(selected_testbed)] dut_names = list() bssid_list = list() + freqs_ = "" testbed_info = get_lab_info.CONFIGURATION - if str(selected_testbed + 'a') in testbed_info: - dut_list.append(str(selected_testbed + 'a')) - logging.info(f"dut list: {dut_list}--") - for i in range(len(config_data["radios"])): - if config_data['radios'][i]["band"] == "2G": - config_data['radios'].pop(i) - if "2G" in config_data['interfaces'][0]["ssids"][0]["wifi-bands"]: - config_data['interfaces'][0]["ssids"][0]["wifi-bands"].remove("2G") + config = copy.deepcopy(config_data) + temp_list = list() + for key, val in testbed_info.items(): + tb_type, tb_name = selected_testbed.split("-") + if tb_type in key and tb_name[0] in key: + temp_list.append(key) + temp_list.sort() + dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1] + config['radios'] = [ + {"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "CA"}] + config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["5G"] if len(dut_list) < 2: - logging.error(f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") + logging.error( + f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}" for ap in range(len(dut_list)): serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier'] dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model']) - logging.info(config_data) - payload = {"configuration": json.dumps(config_data), "serialNumber": serial_number, "UUID": 2} + logging.info(config) + payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2} uri = get_target_object.controller_library_object.build_uri( "device/" + serial_number + "/configure") logging.info("Sending Command: " + "\n" + str(uri) + "\n" + @@ -265,7 +319,8 @@ def test_roam_5g_to_5g_sc_psk_wpa2(self, get_target_object, get_test_library, ge logging.info(resp.json()) allure.attach(name=f"Response - {resp.status_code} {resp.reason}", body=str(resp.json())) if resp.status_code != 200: - if resp.status_code == 400 and "Device is already executing a command. Please try later." in resp.json()["ErrorDescription"]: + if resp.status_code == 400 and "Device is already executing a command. Please try later." in \ + resp.json()["ErrorDescription"]: time.sleep(30) resp = requests.post(uri, data=json.dumps(payload, indent=2), headers=get_target_object.controller_library_object.make_headers(), @@ -274,38 +329,59 @@ def test_roam_5g_to_5g_sc_psk_wpa2(self, get_target_object, get_test_library, ge logging.info(resp.json()) else: assert False, f"push configuration to {serial_number} got failed" - get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]]["device_under_tests"] - ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=False) + get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][ + "device_under_tests"] + ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True) if str(ap_iwinfo) != "Error: pop from empty list": - interfaces = {} - interface_matches = re.finditer( - r'wlan\d\s+ESSID:\s+".*?"\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+([\d\s]+)', ap_iwinfo, - re.DOTALL) - logging.info(str(interface_matches)) + include_essid = config['interfaces'][0]["ssids"][0]["name"] + re_obj = re.compile( + rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+(' + r'\d+)\s+\(([\d.]+) GHz\)', + re.DOTALL + ) + # find all matches + interface_matches = re_obj.finditer(ap_iwinfo) if interface_matches: for match in interface_matches: - interface_name = f'wlan{match.group(0)[4]}' - access_point = match.group(1) - channel = match.group(2).strip() - interfaces[interface_name] = {'Access Point': access_point, 'Channel': channel} - ap_data.update({serial_number: {'Access Point': access_point, 'Channel': channel}}) - logging.info(interfaces) + interface_name = match.group(1) + access_point = match.group(2) + channel = match.group(3) + frequency = match.group(4).replace('.', '') + ap_data.update( + {serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}}) + logging.info(f"AP Data from iwinfo: {ap_data}") else: logging.error("Failed to get iwinfo") pytest.exit("Failed to get iwinfo") elif ap_iwinfo == {}: - assert False, "Empty iwinfo reponse from AP through minicom" + pytest.fail("Empty iwinfo reponse from AP through minicom") else: - assert False, "Failed to get iwinfo from minicom" - logging.info(f"AP Data from iwinfo: {ap_data}") + pytest.fail("Failed to get iwinfo from minicom") for serial in ap_data: bssid_list.append(ap_data[serial]['Access Point']) - ssid = config_data['interfaces'][0]["ssids"][0]["name"] - key = config_data['interfaces'][0]["ssids"][0]["encryption"]["key"] - pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], - band="fiveg", num_sta=1, security="wpa2", security_key=key, ssid=ssid, upstream="1.1.eth1", duration=None, - iteration=1, channel="36", option="ota", dut_name=dut_names, traffic_type="lf_udp", sta_type="11r") - assert pass_fail, message + if not ap_data[serial]['frequency'].endswith(","): + freqs_ = freqs_ + ap_data[serial]['frequency'] + "," + else: + freqs_ = freqs_ + ap_data[serial]['frequency'] + ssid = config['interfaces'][0]["ssids"][0]["name"] + key = config['interfaces'][0]["ssids"][0]["encryption"]["key"] + pass_fail, message = True, "Test Passed" + try: + pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], + scan_freq=freqs_, + band="fiveg", num_sta=1, security="wpa2", security_key=key, + ssid=ssid, upstream="1.1.eth1", duration=None, + iteration=1, channel="36", option="ota", dut_name=dut_names, + traffic_type="lf_udp", sta_type="11r") + except Exception as e: + logging.error(f"Exception in roam test : {e}") + pass_fail, message = False, e + finally: + get_target_object.dut_library_object.get_dut_logs(print_log=False) + if not pass_fail: + pytest.fail(f"Test failed with the following reasons: \n{message}") + else: + assert True @pytest.mark.different_channel @pytest.mark.wpa2_personal @@ -314,31 +390,37 @@ def test_roam_5g_to_5g_sc_psk_wpa2(self, get_target_object, get_test_library, ge def test_roam_5g_to_5g_dc_psk_wpa2(self, get_target_object, get_test_library, get_lab_info, selected_testbed): """ Test Roaming between two APs, Different channel, 5G, WPA2 Personal - pytest -m "roam and fiveg and different_channel and wpa2_personal" + pytest -m "roam and fiveg and different_channel and wpa2_personal and ota" """ ap_data = dict() dut_list = [str(selected_testbed)] dut_names = list() bssid_list = list() + freqs_ = "" testbed_info = get_lab_info.CONFIGURATION - if str(selected_testbed + 'a') in testbed_info: - dut_list.append(str(selected_testbed + 'a')) - logging.info(f"dut list: {dut_list}--") - for i in range(len(config_data["radios"])): - if config_data['radios'][i]["band"] == "2G": - config_data['radios'].pop(i) - if "2G" in config_data['interfaces'][0]["ssids"][0]["wifi-bands"]: - config_data['interfaces'][0]["ssids"][0]["wifi-bands"].remove("2G") + config = copy.deepcopy(config_data) + temp_list = list() + for key, val in testbed_info.items(): + tb_type, tb_name = selected_testbed.split("-") + if tb_type in key and tb_name[0] in key: + temp_list.append(key) + temp_list.sort() + dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1] + config['radios'] = [ + {"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "CA"}] + config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["5G"] if len(dut_list) < 2: - logging.error(f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") + logging.error( + f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}" for ap in range(len(dut_list)): serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier'] dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model']) if ap == 1: - config_data['radios'] = [{"band": "5G", "channel": 149, "channel-mode": "HE", "channel-width": 80, "country": "CA"}] - logging.info(config_data) - payload = {"configuration": json.dumps(config_data), "serialNumber": serial_number, "UUID": 2} + config['radios'] = [ + {"band": "5G", "channel": 149, "channel-mode": "HE", "channel-width": 80, "country": "CA"}] + logging.info(config) + payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2} uri = get_target_object.controller_library_object.build_uri( "device/" + serial_number + "/configure") logging.info("Sending Command: " + "\n" + str(uri) + "\n" + @@ -358,7 +440,8 @@ def test_roam_5g_to_5g_dc_psk_wpa2(self, get_target_object, get_test_library, ge logging.info(resp.json()) allure.attach(name=f"Response - {resp.status_code} {resp.reason}", body=str(resp.json())) if resp.status_code != 200: - if resp.status_code == 400 and "Device is already executing a command. Please try later." in resp.json()["ErrorDescription"]: + if resp.status_code == 400 and "Device is already executing a command. Please try later." in \ + resp.json()["ErrorDescription"]: time.sleep(30) resp = requests.post(uri, data=json.dumps(payload, indent=2), headers=get_target_object.controller_library_object.make_headers(), @@ -367,38 +450,58 @@ def test_roam_5g_to_5g_dc_psk_wpa2(self, get_target_object, get_test_library, ge logging.info(resp.json()) else: assert False, f"push configuration to {serial_number} got failed" - get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]]["device_under_tests"] - ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=False) + get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][ + "device_under_tests"] + ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True) if str(ap_iwinfo) != "Error: pop from empty list": - interfaces = {} - interface_matches = re.finditer( - r'wlan\d\s+ESSID:\s+".*?"\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+([\d\s]+)', ap_iwinfo, - re.DOTALL) - logging.info(str(interface_matches)) + include_essid = config['interfaces'][0]["ssids"][0]["name"] + re_obj = re.compile( + rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+(' + r'\d+)\s+\(([\d.]+) GHz\)', + re.DOTALL + ) + # find all matches + interface_matches = re_obj.finditer(ap_iwinfo) if interface_matches: for match in interface_matches: - interface_name = f'wlan{match.group(0)[4]}' - access_point = match.group(1) - channel = match.group(2).strip() - interfaces[interface_name] = {'Access Point': access_point, 'Channel': channel} - ap_data.update({serial_number: {'Access Point': access_point, 'Channel': channel}}) - logging.info(interfaces) + interface_name = match.group(1) + access_point = match.group(2) + channel = match.group(3) + frequency = match.group(4).replace('.', '') + ap_data.update( + {serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}}) + logging.info(f"AP Data from iwinfo: {ap_data}") else: logging.error("Failed to get iwinfo") pytest.exit("Failed to get iwinfo") elif ap_iwinfo == {}: - assert False, "Empty iwinfo reponse from AP through minicom" + pytest.fail("Empty iwinfo reponse from AP through minicom") else: - assert False, "Failed to get iwinfo from minicom" - logging.info(f"AP Data from iwinfo: {ap_data}") + pytest.fail("Failed to get iwinfo from minicom") for serial in ap_data: bssid_list.append(ap_data[serial]['Access Point']) - ssid = config_data['interfaces'][0]["ssids"][0]["name"] - key = config_data['interfaces'][0]["ssids"][0]["encryption"]["key"] - pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], - band="fiveg", num_sta=1, security="wpa2", security_key=key, ssid=ssid, upstream="1.1.eth1", duration=None, - iteration=1, channel="36", option="ota", dut_name=dut_names, traffic_type="lf_udp", sta_type="11r") - assert pass_fail, message + if not ap_data[serial]['frequency'].endswith(","): + freqs_ = freqs_ + ap_data[serial]['frequency'] + "," + else: + freqs_ = freqs_ + ap_data[serial]['frequency'] + ssid = config['interfaces'][0]["ssids"][0]["name"] + key = config['interfaces'][0]["ssids"][0]["encryption"]["key"] + try: + pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], + scan_freq=freqs_, + band="fiveg", num_sta=1, security="wpa2", security_key=key, + ssid=ssid, upstream="1.1.eth1", duration=None, + iteration=1, channel="36", option="ota", dut_name=dut_names, + traffic_type="lf_udp", sta_type="11r") + except Exception as e: + logging.error(f"Exception in roam test : {e}") + pass_fail, message = False, e + finally: + get_target_object.dut_library_object.get_dut_logs(print_log=False) + if not pass_fail: + pytest.fail(f"Test failed with the following reasons: \n{message}") + else: + assert True @pytest.mark.same_channel @pytest.mark.wpa3_personal @@ -408,31 +511,36 @@ def test_roam_5g_to_5g_dc_psk_wpa2(self, get_target_object, get_test_library, ge def test_roam_2g_to_2g_sc_psk_wpa3(self, get_target_object, get_test_library, get_lab_info, selected_testbed): """ Test Roaming between two APs, Same channel, 2G, WPA3 Personal - pytest -m "roam and twog and same_channel and wpa3_personal" + pytest -m "roam and twog and same_channel and wpa3_personal and ota" """ ap_data = dict() dut_list = [str(selected_testbed)] dut_names = list() bssid_list = list() + freqs_ = "" testbed_info = get_lab_info.CONFIGURATION - if str(selected_testbed + 'a') in testbed_info: - dut_list.append(str(selected_testbed + 'a')) - logging.info(f"dut list: {dut_list}--") - for i in range(len(config_data["radios"])): - if config_data['radios'][i]["band"] == "5G": - config_data['radios'].pop(i) + config = copy.deepcopy(config_data) + temp_list = list() + for key, val in testbed_info.items(): + tb_type, tb_name = selected_testbed.split("-") + if tb_type in key and tb_name[0] in key: + temp_list.append(key) + temp_list.sort() + dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1] + config['radios'] = [ + {"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 40, "country": "CA"}] # change ssid config data to sae - config_data['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "sae" - if "5G" in config_data['interfaces'][0]["ssids"][0]["wifi-bands"]: - config_data['interfaces'][0]["ssids"][0]["wifi-bands"].remove("5G") + config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "sae" + config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["2G"] if len(dut_list) < 2: - logging.error(f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") + logging.error( + f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}" for ap in range(len(dut_list)): serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier'] dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model']) - logging.info(config_data) - payload = {"configuration": json.dumps(config_data), "serialNumber": serial_number, "UUID": 2} + logging.info(config) + payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2} uri = get_target_object.controller_library_object.build_uri( "device/" + serial_number + "/configure") logging.info("Sending Command: " + "\n" + str(uri) + "\n" + @@ -452,7 +560,8 @@ def test_roam_2g_to_2g_sc_psk_wpa3(self, get_target_object, get_test_library, ge logging.info(resp.json()) allure.attach(name=f"Response - {resp.status_code} {resp.reason}", body=str(resp.json())) if resp.status_code != 200: - if resp.status_code == 400 and "Device is already executing a command. Please try later." in resp.json()["ErrorDescription"]: + if resp.status_code == 400 and "Device is already executing a command. Please try later." in \ + resp.json()["ErrorDescription"]: time.sleep(30) resp = requests.post(uri, data=json.dumps(payload, indent=2), headers=get_target_object.controller_library_object.make_headers(), @@ -461,38 +570,59 @@ def test_roam_2g_to_2g_sc_psk_wpa3(self, get_target_object, get_test_library, ge logging.info(resp.json()) else: assert False, f"push configuration to {serial_number} got failed" - get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]]["device_under_tests"] - ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=False) + get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][ + "device_under_tests"] + ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True) if str(ap_iwinfo) != "Error: pop from empty list": - interfaces = {} - interface_matches = re.finditer( - r'wlan\d\s+ESSID:\s+".*?"\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+([\d\s]+)', ap_iwinfo, - re.DOTALL) - logging.info(str(interface_matches)) + include_essid = config['interfaces'][0]["ssids"][0]["name"] + re_obj = re.compile( + rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+(' + r'\d+)\s+\(([\d.]+) GHz\)', + re.DOTALL + ) + # find all matches + interface_matches = re_obj.finditer(ap_iwinfo) if interface_matches: for match in interface_matches: - interface_name = f'wlan{match.group(0)[4]}' - access_point = match.group(1) - channel = match.group(2).strip() - interfaces[interface_name] = {'Access Point': access_point, 'Channel': channel} - ap_data.update({serial_number: {'Access Point': access_point, 'Channel': channel}}) - logging.info(interfaces) + interface_name = match.group(1) + access_point = match.group(2) + channel = match.group(3) + frequency = match.group(4).replace('.', '') + ap_data.update( + {serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}}) + logging.info(f"AP Data from iwinfo: {ap_data}") else: logging.error("Failed to get iwinfo") pytest.exit("Failed to get iwinfo") elif ap_iwinfo == {}: - assert False, "Empty iwinfo reponse from AP through minicom" + pytest.fail("Empty iwinfo reponse from AP through minicom") else: - assert False, "Failed to get iwinfo from minicom" - logging.info(f"AP Data from iwinfo: {ap_data}") + pytest.fail("Failed to get iwinfo from minicom") for serial in ap_data: bssid_list.append(ap_data[serial]['Access Point']) - ssid = config_data['interfaces'][0]["ssids"][0]["name"] - key = config_data['interfaces'][0]["ssids"][0]["encryption"]["key"] - pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], - band="twog", num_sta=1, security="wpa3", security_key=key, ssid=ssid, upstream="1.1.eth1", duration=None, - iteration=1, channel="11", option="ota", dut_name=dut_names, traffic_type="lf_udp", sta_type="11r-sae") - assert pass_fail, message + if not ap_data[serial]['frequency'].endswith(","): + freqs_ = freqs_ + ap_data[serial]['frequency'] + "," + else: + freqs_ = freqs_ + ap_data[serial]['frequency'] + ssid = config['interfaces'][0]["ssids"][0]["name"] + key = config['interfaces'][0]["ssids"][0]["encryption"]["key"] + pass_fail, message = True, "Test Passed" + try: + pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], + scan_freq=freqs_, + band="twog", num_sta=1, security="wpa3", security_key=key, + ssid=ssid, upstream="1.1.eth1", duration=None, + iteration=1, channel="11", option="ota", dut_name=dut_names, + traffic_type="lf_udp", sta_type="11r-sae") + except Exception as e: + logging.error(f"Exception in roam test : {e}") + pass_fail, message = False, e + finally: + get_target_object.dut_library_object.get_dut_logs(print_log=False) + if not pass_fail: + pytest.fail(f"Test failed with the following reasons: \n{message}") + else: + assert True @pytest.mark.same_channel @pytest.mark.wpa3_personal @@ -502,31 +632,36 @@ def test_roam_2g_to_2g_sc_psk_wpa3(self, get_target_object, get_test_library, ge def test_roam_5g_to_5g_sc_psk_wpa3(self, get_target_object, get_test_library, get_lab_info, selected_testbed): """ Test Roaming between two APs, Same channel, 5G, WPA3 Personal - pytest -m "roam and fiveg and same_channel and wpa3_personal" + pytest -m "roam and fiveg and same_channel and wpa3_personal and ota" """ ap_data = dict() dut_list = [str(selected_testbed)] dut_names = list() bssid_list = list() + freqs_ = "" testbed_info = get_lab_info.CONFIGURATION - if str(selected_testbed + 'a') in testbed_info: - dut_list.append(str(selected_testbed + 'a')) - logging.info(f"dut list: {dut_list}--") - for i in range(len(config_data["radios"])): - if config_data['radios'][i]["band"] == "2G": - config_data['radios'].pop(i) + config = copy.deepcopy(config_data) + temp_list = list() + for key, val in testbed_info.items(): + tb_type, tb_name = selected_testbed.split("-") + if tb_type in key and tb_name[0] in key: + temp_list.append(key) + temp_list.sort() + dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1] + config['radios'] = [ + {"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "CA"}] # change ssid security type to sae - config_data['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "sae" - if "2G" in config_data['interfaces'][0]["ssids"][0]["wifi-bands"]: - config_data['interfaces'][0]["ssids"][0]["wifi-bands"].remove("2G") + config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "sae" + config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["5G"] if len(dut_list) < 2: - logging.error(f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") + logging.error( + f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}" for ap in range(len(dut_list)): serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier'] dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model']) - logging.info(config_data) - payload = {"configuration": json.dumps(config_data), "serialNumber": serial_number, "UUID": 2} + logging.info(config) + payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2} uri = get_target_object.controller_library_object.build_uri( "device/" + serial_number + "/configure") logging.info("Sending Command: " + "\n" + str(uri) + "\n" + @@ -546,7 +681,8 @@ def test_roam_5g_to_5g_sc_psk_wpa3(self, get_target_object, get_test_library, ge logging.info(resp.json()) allure.attach(name=f"Response - {resp.status_code} {resp.reason}", body=str(resp.json())) if resp.status_code != 200: - if resp.status_code == 400 and "Device is already executing a command. Please try later." in resp.json()["ErrorDescription"]: + if resp.status_code == 400 and "Device is already executing a command. Please try later." in \ + resp.json()["ErrorDescription"]: time.sleep(30) resp = requests.post(uri, data=json.dumps(payload, indent=2), headers=get_target_object.controller_library_object.make_headers(), @@ -555,38 +691,59 @@ def test_roam_5g_to_5g_sc_psk_wpa3(self, get_target_object, get_test_library, ge logging.info(resp.json()) else: assert False, f"push configuration to {serial_number} got failed" - get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]]["device_under_tests"] - ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=False) + get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][ + "device_under_tests"] + ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True) if str(ap_iwinfo) != "Error: pop from empty list": - interfaces = {} - interface_matches = re.finditer( - r'wlan\d\s+ESSID:\s+".*?"\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+([\d\s]+)', ap_iwinfo, - re.DOTALL) - logging.info(str(interface_matches)) + include_essid = config['interfaces'][0]["ssids"][0]["name"] + re_obj = re.compile( + rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+(' + r'\d+)\s+\(([\d.]+) GHz\)', + re.DOTALL + ) + # find all matches + interface_matches = re_obj.finditer(ap_iwinfo) if interface_matches: for match in interface_matches: - interface_name = f'wlan{match.group(0)[4]}' - access_point = match.group(1) - channel = match.group(2).strip() - interfaces[interface_name] = {'Access Point': access_point, 'Channel': channel} - ap_data.update({serial_number: {'Access Point': access_point, 'Channel': channel}}) - logging.info(interfaces) + interface_name = match.group(1) + access_point = match.group(2) + channel = match.group(3) + frequency = match.group(4).replace('.', '') + ap_data.update( + {serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}}) + logging.info(f"AP Data from iwinfo: {ap_data}") else: logging.error("Failed to get iwinfo") pytest.exit("Failed to get iwinfo") elif ap_iwinfo == {}: - assert False, "Empty iwinfo reponse from AP through minicom" + pytest.fail("Empty iwinfo reponse from AP through minicom") else: - assert False, "Failed to get iwinfo from minicom" - logging.info(f"AP Data from iwinfo: {ap_data}") + pytest.fail("Failed to get iwinfo from minicom") for serial in ap_data: bssid_list.append(ap_data[serial]['Access Point']) - ssid = config_data['interfaces'][0]["ssids"][0]["name"] - key = config_data['interfaces'][0]["ssids"][0]["encryption"]["key"] - pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], - band="fiveg", num_sta=1, security="wpa3", security_key=key, ssid=ssid, upstream="1.1.eth1", duration=None, - iteration=1, channel="36", option="ota", dut_name=dut_names, traffic_type="lf_udp", sta_type="11r-sae") - assert pass_fail, message + if not ap_data[serial]['frequency'].endswith(","): + freqs_ = freqs_ + ap_data[serial]['frequency'] + "," + else: + freqs_ = freqs_ + ap_data[serial]['frequency'] + ssid = config['interfaces'][0]["ssids"][0]["name"] + key = config['interfaces'][0]["ssids"][0]["encryption"]["key"] + pass_fail, message = True, "Test Passed" + try: + pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], + scan_freq=freqs_, + band="fiveg", num_sta=1, security="wpa3", security_key=key, + ssid=ssid, upstream="1.1.eth1", duration=None, + iteration=1, channel="11", option="ota", dut_name=dut_names, + traffic_type="lf_udp", sta_type="11r-sae") + except Exception as e: + logging.error(f"Exception in roam test : {e}") + pass_fail, message = False, e + finally: + get_target_object.dut_library_object.get_dut_logs(print_log=False) + if not pass_fail: + pytest.fail(f"Test failed with the following reasons: \n{message}") + else: + assert True @pytest.mark.same_channel @pytest.mark.wpa3_personal @@ -596,17 +753,23 @@ def test_roam_5g_to_5g_sc_psk_wpa3(self, get_target_object, get_test_library, ge def test_roam_6g_to_6g_sc_psk_wpa3(self, get_target_object, get_test_library, get_lab_info, selected_testbed): """ Test Roaming between two APs, Same channel, 6G, WPA3 Personal - pytest -m "roam and sixg and same_channel and wpa3_personal" + pytest -m "roam and sixg and same_channel and wpa3_personal and ota" """ ap_data = dict() dut_list = [str(selected_testbed)] dut_names = list() bssid_list = list() + freqs_ = "" testbed_info = get_lab_info.CONFIGURATION - if str(selected_testbed + 'a') in testbed_info: - dut_list.append(str(selected_testbed + 'a')) - logging.info(f"dut list: {dut_list}--") - config_data['radios'] = [ + config = copy.deepcopy(config_data) + temp_list = list() + for key, val in testbed_info.items(): + tb_type, tb_name = selected_testbed.split("-") + if tb_type in key and tb_name[0] in key: + temp_list.append(key) + temp_list.sort() + dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1] + config['radios'] = [ { "band": "6G", "channel": 161, @@ -616,8 +779,8 @@ def test_roam_6g_to_6g_sc_psk_wpa3(self, get_target_object, get_test_library, ge } ] # change wifi-band and security type to sae - config_data['interfaces'][0]["ssids"][0]["wifi-bands"] = ["6G"] - config_data['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "sae" + config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["6G"] + config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "sae" if len(dut_list) < 2: logging.error( f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") @@ -625,8 +788,8 @@ def test_roam_6g_to_6g_sc_psk_wpa3(self, get_target_object, get_test_library, ge for ap in range(len(dut_list)): serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier'] dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model']) - logging.info(config_data) - payload = {"configuration": json.dumps(config_data), "serialNumber": serial_number, "UUID": 2} + logging.info(config) + payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2} uri = get_target_object.controller_library_object.build_uri( "device/" + serial_number + "/configure") logging.info("Sending Command: " + "\n" + str(uri) + "\n" + @@ -658,58 +821,81 @@ def test_roam_6g_to_6g_sc_psk_wpa3(self, get_target_object, get_test_library, ge assert False, f"push configuration to {serial_number} got failed" get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][ "device_under_tests"] - ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=False) + ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True) if str(ap_iwinfo) != "Error: pop from empty list": - interfaces = {} - interface_matches = re.finditer( - r'wlan\d\s+ESSID:\s+".*?"\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+([\d\s]+)', ap_iwinfo, - re.DOTALL) - logging.info(str(interface_matches)) + include_essid = config['interfaces'][0]["ssids"][0]["name"] + re_obj = re.compile( + rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+(' + r'\d+)\s+\(([\d.]+) GHz\)', + re.DOTALL + ) + # find all matches + interface_matches = re_obj.finditer(ap_iwinfo) if interface_matches: for match in interface_matches: - interface_name = f'wlan{match.group(0)[4]}' - access_point = match.group(1) - channel = match.group(2).strip() - interfaces[interface_name] = {'Access Point': access_point, 'Channel': channel} - ap_data.update({serial_number: {'Access Point': access_point, 'Channel': channel}}) - logging.info(interfaces) - else: - logging.error("Failed to get iwinfo") - pytest.exit("Failed to get iwinfo") - elif ap_iwinfo == {}: - assert False, "Empty iwinfo reponse from AP through minicom" + interface_name = match.group(1) + access_point = match.group(2) + channel = match.group(3) + frequency = match.group(4).replace('.', '') + ap_data.update( + {serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}}) + logging.info(f"Extracted AP Data from iwinfo: {ap_data}") + if ap_data == {}: + logging.error("Failed to get required iwinfo from minicom") + pytest.fail("Failed to get required iwinfo from minicom") else: - assert False, "Failed to get iwinfo from minicom" - logging.info(f"AP Data from iwinfo: {ap_data}") + pytest.fail("Failed to get iwinfo from minicom") for serial in ap_data: bssid_list.append(ap_data[serial]['Access Point']) - ssid = config_data['interfaces'][0]["ssids"][0]["name"] - key = config_data['interfaces'][0]["ssids"][0]["encryption"]["key"] - pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], - band="sixg", num_sta=1, security="wpa3", security_key=key, - ssid=ssid, upstream="1.1.eth1", duration=None, - iteration=1, channel="161", option="ota", dut_name=dut_names, - traffic_type="lf_udp", sta_type="11r-sae") - assert pass_fail, message + if not ap_data[serial]['frequency'].endswith(","): + freqs_ = freqs_ + ap_data[serial]['frequency'] + "," + else: + freqs_ = freqs_ + ap_data[serial]['frequency'] + ssid = config['interfaces'][0]["ssids"][0]["name"] + key = config['interfaces'][0]["ssids"][0]["encryption"]["key"] + pass_fail, message = True, "Test Passed" + try: + pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], + scan_freq=freqs_, + band="sixg", num_sta=1, security="wpa3", security_key=key, + ssid=ssid, upstream="1.1.eth1", duration=None, + iteration=1, channel="161", option="ota", dut_name=dut_names, + traffic_type="lf_udp", sta_type="11r-sae") + except Exception as e: + logging.error(f"Exception in roam test : {e}") + pass_fail, message = False, e + finally: + get_target_object.dut_library_object.get_dut_logs(print_log=False) + if not pass_fail: + pytest.fail(f"Test failed with the following reasons: \n{message}") + else: + assert True @pytest.mark.same_channel @pytest.mark.wpa2_enterprise @pytest.mark.twog @pytest.mark.roam - def test_roam_2g_to_2g_sc_eap_wpa2(self, get_target_object, get_test_library, get_lab_info, selected_testbed, radius_info): + def test_roam_2g_to_2g_sc_eap_wpa2(self, get_target_object, get_test_library, get_lab_info, selected_testbed, + radius_info): """ Test Roaming between two APs, Same channel, 2G, WPA2 Enterprise - pytest -m "roam and twog and same_channel and wpa2_enterprise" + pytest -m "roam and twog and same_channel and wpa2_enterprise and ota" """ ap_data = dict() dut_list = [str(selected_testbed)] dut_names = list() bssid_list = list() + freqs_ = "" testbed_info = get_lab_info.CONFIGURATION - if str(selected_testbed + 'a') in testbed_info: - dut_list.append(str(selected_testbed + 'a')) - logging.info(f"dut list: {dut_list}--") - config_data['interfaces'][0]["ssids"][0]["radius"] = { + config = copy.deepcopy(config_data) + temp_list = list() + for key, val in testbed_info.items(): + tb_type, tb_name = selected_testbed.split("-") + if tb_type in key and tb_name[0] in key: + temp_list.append(key) + temp_list.sort() + dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1] + config['interfaces'][0]["ssids"][0]["radius"] = { "accounting": { "host": radius_info["ip"], "port": radius_info["port"], @@ -721,21 +907,22 @@ def test_roam_2g_to_2g_sc_eap_wpa2(self, get_target_object, get_test_library, ge "secret": radius_info["secret"] } } - for i in range(len(config_data["radios"])): - if config_data['radios'][i]["band"] == "5G": - config_data['radios'].pop(i) - if "5G" in config_data['interfaces'][0]["ssids"][0]["wifi-bands"]: - config_data['interfaces'][0]["ssids"][0]["wifi-bands"].remove("5G") - if "key" in config_data['interfaces'][0]["ssids"][0]["encryption"]: - config_data['interfaces'][0]["ssids"][0]["encryption"].pop("key") + config['radios'] = [ + {"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 40, "country": "CA"}] + config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["2G"] + if "proto" in config['interfaces'][0]["ssids"][0]["encryption"]: + config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "wpa2" + if "key" in config['interfaces'][0]["ssids"][0]["encryption"]: + config['interfaces'][0]["ssids"][0]["encryption"].pop("key") if len(dut_list) < 2: - logging.error(f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") + logging.error( + f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}" for ap in range(len(dut_list)): serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier'] dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model']) - logging.info(config_data) - payload = {"configuration": json.dumps(config_data), "serialNumber": serial_number, "UUID": 2} + logging.info(config) + payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2} uri = get_target_object.controller_library_object.build_uri( "device/" + serial_number + "/configure") logging.info("Sending Command: " + "\n" + str(uri) + "\n" + @@ -755,7 +942,8 @@ def test_roam_2g_to_2g_sc_eap_wpa2(self, get_target_object, get_test_library, ge logging.info(resp.json()) allure.attach(name=f"Response - {resp.status_code} {resp.reason}", body=str(resp.json())) if resp.status_code != 200: - if resp.status_code == 400 and "Device is already executing a command. Please try later." in resp.json()["ErrorDescription"]: + if resp.status_code == 400 and "Device is already executing a command. Please try later." in \ + resp.json()["ErrorDescription"]: time.sleep(30) resp = requests.post(uri, data=json.dumps(payload, indent=2), headers=get_target_object.controller_library_object.make_headers(), @@ -764,58 +952,88 @@ def test_roam_2g_to_2g_sc_eap_wpa2(self, get_target_object, get_test_library, ge logging.info(resp.json()) else: assert False, f"push configuration to {serial_number} got failed" - get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]]["device_under_tests"] - ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=False) + get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][ + "device_under_tests"] + ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True) if str(ap_iwinfo) != "Error: pop from empty list": - interfaces = {} - interface_matches = re.finditer( - r'wlan\d\s+ESSID:\s+".*?"\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+([\d\s]+)', ap_iwinfo, - re.DOTALL) - logging.info(str(interface_matches)) + include_essid = config['interfaces'][0]["ssids"][0]["name"] + re_obj = re.compile( + rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+(' + r'\d+)\s+\(([\d.]+) GHz\)', + re.DOTALL + ) + # find all matches + interface_matches = re_obj.finditer(ap_iwinfo) if interface_matches: for match in interface_matches: - interface_name = f'wlan{match.group(0)[4]}' - access_point = match.group(1) - channel = match.group(2).strip() - interfaces[interface_name] = {'Access Point': access_point, 'Channel': channel} - ap_data.update({serial_number: {'Access Point': access_point, 'Channel': channel}}) - logging.info(interfaces) - else: - logging.error("Failed to get iwinfo") - pytest.exit("Failed to get iwinfo") - elif ap_iwinfo == {}: - assert False, "Empty iwinfo reponse from AP through minicom" + interface_name = match.group(1) + access_point = match.group(2) + channel = match.group(3) + frequency = match.group(4).replace('.', '') + ap_data.update( + {serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}}) + logging.info(f"AP Data from iwinfo: {ap_data}") + if ap_data == {}: + logging.error("Failed to get required iwinfo from minicom") + pytest.fail("Failed to get required iwinfo from minicom") else: - assert False, "Failed to get iwinfo from minicom" - logging.info(f"AP Data from iwinfo: {ap_data}") + pytest.fail("Failed to get iwinfo from minicom") for serial in ap_data: bssid_list.append(ap_data[serial]['Access Point']) - ssid = config_data['interfaces'][0]["ssids"][0]["name"] - pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], - band="twog", num_sta=1, security="wpa2", ssid=ssid, upstream="1.1.eth1", eap_method="TLS", - eap_identity=radius_info["user"], eap_password=radius_info["password"], private_key="1234567890", - pk_passwd=radius_info["pk_password"], ca_cert='/home/lanforge/ca.pem', sta_type="11r-eap", - iteration=1, channel="11", option="ota", dut_name=dut_names, traffic_type="lf_udp") - assert pass_fail, message + if not ap_data[serial]['frequency'].endswith(","): + freqs_ = freqs_ + ap_data[serial]['frequency'] + "," + else: + freqs_ = freqs_ + ap_data[serial]['frequency'] + ssid = config['interfaces'][0]["ssids"][0]["name"] + pass_fail, message = True, "Test Passed" + try: + pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], + band="twog", num_sta=1, security="wpa2", ssid=ssid, + upstream="1.1.eth1", eap_method="TLS", + pairwise_cipher="DEFAULT ", + groupwise_cipher="DEFAULT ", + eap_identity=radius_info["user"], + eap_password=radius_info["password"], + private_key="/home/lanforge/client.p12", + pk_passwd=radius_info["pk_password"], + ca_cert='/home/lanforge/ca.pem', sta_type="11r-eap", + iteration=1, channel="11", option="otd", dut_name=dut_names, + traffic_type="lf_udp") + except Exception as e: + logging.error(f"Exception in roam test : {e}") + pass_fail, message = False, e + finally: + get_target_object.dut_library_object.get_dut_logs(print_log=False) + if not pass_fail: + pytest.fail(f"Test failed with the following reasons: \n{message}") + else: + assert True @pytest.mark.same_channel @pytest.mark.wpa2_enterprise @pytest.mark.fiveg @pytest.mark.roam - def test_roam_5g_to_5g_sc_eap_wpa2(self, get_target_object, get_test_library, get_lab_info, selected_testbed, radius_info): + def test_roam_5g_to_5g_sc_eap_wpa2(self, get_target_object, get_test_library, get_lab_info, selected_testbed, + radius_info): """ Test Roaming between two APs, Same channel, 5G, WPA2 Enterprise - pytest -m "roam and fiveg and same_channel and wpa2_enterprise" + pytest -m "roam and fiveg and same_channel and wpa2_enterprise and ota" """ ap_data = dict() dut_list = [str(selected_testbed)] dut_names = list() bssid_list = list() + freqs_ = "" testbed_info = get_lab_info.CONFIGURATION - if str(selected_testbed + 'a') in testbed_info: - dut_list.append(str(selected_testbed + 'a')) - logging.info(f"dut list: {dut_list}--") - config_data['interfaces'][0]["ssids"][0]["radius"] = { + config = copy.deepcopy(config_data) + temp_list = list() + for key, val in testbed_info.items(): + tb_type, tb_name = selected_testbed.split("-") + if tb_type in key and tb_name[0] in key: + temp_list.append(key) + temp_list.sort() + dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1] + config['interfaces'][0]["ssids"][0]["radius"] = { "accounting": { "host": radius_info["ip"], "port": radius_info["port"], @@ -827,21 +1045,22 @@ def test_roam_5g_to_5g_sc_eap_wpa2(self, get_target_object, get_test_library, ge "secret": radius_info["secret"] } } - for i in range(len(config_data["radios"])): - if config_data['radios'][i]["band"] == "2G": - config_data['radios'].pop(i) - if "2G" in config_data['interfaces'][0]["ssids"][0]["wifi-bands"]: - config_data['interfaces'][0]["ssids"][0]["wifi-bands"].remove("2G") - if "key" in config_data['interfaces'][0]["ssids"][0]["encryption"]: - config_data['interfaces'][0]["ssids"][0]["encryption"].pop("key") + config['radios'] = [ + {"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "CA"}] + config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["5G"] + if "proto" in config['interfaces'][0]["ssids"][0]["encryption"]: + config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "wpa2" + if "key" in config['interfaces'][0]["ssids"][0]["encryption"]: + config['interfaces'][0]["ssids"][0]["encryption"].pop("key") if len(dut_list) < 2: - logging.error(f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") + logging.error( + f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}" for ap in range(len(dut_list)): serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier'] dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model']) - logging.info(config_data) - payload = {"configuration": json.dumps(config_data), "serialNumber": serial_number, "UUID": 2} + logging.info(config) + payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2} uri = get_target_object.controller_library_object.build_uri( "device/" + serial_number + "/configure") logging.info("Sending Command: " + "\n" + str(uri) + "\n" + @@ -861,7 +1080,8 @@ def test_roam_5g_to_5g_sc_eap_wpa2(self, get_target_object, get_test_library, ge logging.info(resp.json()) allure.attach(name=f"Response - {resp.status_code} {resp.reason}", body=str(resp.json())) if resp.status_code != 200: - if resp.status_code == 400 and "Device is already executing a command. Please try later." in resp.json()["ErrorDescription"]: + if resp.status_code == 400 and "Device is already executing a command. Please try later." in \ + resp.json()["ErrorDescription"]: time.sleep(30) resp = requests.post(uri, data=json.dumps(payload, indent=2), headers=get_target_object.controller_library_object.make_headers(), @@ -870,58 +1090,88 @@ def test_roam_5g_to_5g_sc_eap_wpa2(self, get_target_object, get_test_library, ge logging.info(resp.json()) else: assert False, f"push configuration to {serial_number} got failed" - get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]]["device_under_tests"] - ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=False) + get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][ + "device_under_tests"] + ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True) if str(ap_iwinfo) != "Error: pop from empty list": - interfaces = {} - interface_matches = re.finditer( - r'wlan\d\s+ESSID:\s+".*?"\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+([\d\s]+)', ap_iwinfo, - re.DOTALL) - logging.info(str(interface_matches)) + include_essid = config['interfaces'][0]["ssids"][0]["name"] + re_obj = re.compile( + rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+(' + r'\d+)\s+\(([\d.]+) GHz\)', + re.DOTALL + ) + # find all matches + interface_matches = re_obj.finditer(ap_iwinfo) if interface_matches: for match in interface_matches: - interface_name = f'wlan{match.group(0)[4]}' - access_point = match.group(1) - channel = match.group(2).strip() - interfaces[interface_name] = {'Access Point': access_point, 'Channel': channel} - ap_data.update({serial_number: {'Access Point': access_point, 'Channel': channel}}) - logging.info(interfaces) - else: - logging.error("Failed to get iwinfo") - pytest.exit("Failed to get iwinfo") - elif ap_iwinfo == {}: - assert False, "Empty iwinfo reponse from AP through minicom" + interface_name = match.group(1) + access_point = match.group(2) + channel = match.group(3) + frequency = match.group(4).replace('.', '') + ap_data.update( + {serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}}) + logging.info(f"AP Data from iwinfo: {ap_data}") + if ap_data == {}: + logging.error("Failed to get required iwinfo from minicom") + pytest.fail("Failed to get required iwinfo from minicom") else: - assert False, "Failed to get iwinfo from minicom" - logging.info(f"AP Data from iwinfo: {ap_data}") + pytest.fail("Failed to get iwinfo from minicom") for serial in ap_data: bssid_list.append(ap_data[serial]['Access Point']) - ssid = config_data['interfaces'][0]["ssids"][0]["name"] - pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], - band="fiveg", num_sta=1, security="wpa2", ssid=ssid, upstream="1.1.eth1", eap_method="TLS", - eap_identity=radius_info["user"], eap_password=radius_info["password"], private_key="1234567890", - pk_passwd=radius_info["pk_password"], ca_cert='/home/lanforge/ca.pem', sta_type="11r-eap", - iteration=1, channel="11", option="ota", dut_name=dut_names, traffic_type="lf_udp") - assert pass_fail, message + if not ap_data[serial]['frequency'].endswith(","): + freqs_ = freqs_ + ap_data[serial]['frequency'] + "," + else: + freqs_ = freqs_ + ap_data[serial]['frequency'] + ssid = config['interfaces'][0]["ssids"][0]["name"] + pass_fail, message = True, "Test Passed" + try: + pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], + band="fiveg", num_sta=1, security="wpa2", ssid=ssid, + upstream="1.1.eth1", eap_method="TLS", + pairwise_cipher="DEFAULT ", + groupwise_cipher="DEFAULT ", + eap_identity=radius_info["user"], + eap_password=radius_info["password"], + private_key="/home/lanforge/client.p12", + pk_passwd=radius_info["pk_password"], + ca_cert='/home/lanforge/ca.pem', sta_type="11r-eap", + iteration=1, channel="11", option="ota", dut_name=dut_names, + traffic_type="lf_udp") + except Exception as e: + logging.error(f"Exception in roam test : {e}") + pass_fail, message = False, e + finally: + get_target_object.dut_library_object.get_dut_logs(print_log=False) + if not pass_fail: + pytest.fail(f"Test failed with the following reasons: \n{message}") + else: + assert True @pytest.mark.same_channel @pytest.mark.wpa3_enterprise @pytest.mark.twog @pytest.mark.roam - def test_roam_2g_to_2g_sc_eap_wpa3(self, get_target_object, get_test_library, get_lab_info, selected_testbed, radius_info): + def test_roam_2g_to_2g_sc_eap_wpa3(self, get_target_object, get_test_library, get_lab_info, selected_testbed, + radius_info): """ Test Roaming between two APs, Same channel, 2G, WPA3 Enterprise - pytest -m "roam and twog and same_channel and wpa3_enterprise" + pytest -m "roam and twog and same_channel and wpa3_enterprise and ota" """ ap_data = dict() dut_list = [str(selected_testbed)] dut_names = list() bssid_list = list() + freqs_ = "" testbed_info = get_lab_info.CONFIGURATION - if str(selected_testbed + 'a') in testbed_info: - dut_list.append(str(selected_testbed + 'a')) - logging.info(f"dut list: {dut_list}--") - config_data['interfaces'][0]["ssids"][0]["radius"] = { + config = copy.deepcopy(config_data) + temp_list = list() + for key, val in testbed_info.items(): + tb_type, tb_name = selected_testbed.split("-") + if tb_type in key and tb_name[0] in key: + temp_list.append(key) + temp_list.sort() + dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1] + config['interfaces'][0]["ssids"][0]["radius"] = { "accounting": { "host": radius_info["ip"], "port": radius_info["port"], @@ -933,23 +1183,22 @@ def test_roam_2g_to_2g_sc_eap_wpa3(self, get_target_object, get_test_library, ge "secret": radius_info["secret"] } } - for i in range(len(config_data["radios"])): - if config_data['radios'][i]["band"] == "5G": - config_data['radios'].pop(i) + config['radios'] = [ + {"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 40, "country": "CA"}] # change ssid security type to sae - config_data['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "wpa3" - if "5G" in config_data['interfaces'][0]["ssids"][0]["wifi-bands"]: - config_data['interfaces'][0]["ssids"][0]["wifi-bands"].remove("5G") - if "key" in config_data['interfaces'][0]["ssids"][0]["encryption"]: - config_data['interfaces'][0]["ssids"][0]["encryption"].pop("key") + config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "wpa3" + config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["2G"] + if "key" in config['interfaces'][0]["ssids"][0]["encryption"]: + config['interfaces'][0]["ssids"][0]["encryption"].pop("key") if len(dut_list) < 2: - logging.error(f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") + logging.error( + f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}" for ap in range(len(dut_list)): serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier'] dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model']) - logging.info(config_data) - payload = {"configuration": json.dumps(config_data), "serialNumber": serial_number, "UUID": 2} + logging.info(config) + payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2} uri = get_target_object.controller_library_object.build_uri( "device/" + serial_number + "/configure") logging.info("Sending Command: " + "\n" + str(uri) + "\n" + @@ -969,7 +1218,8 @@ def test_roam_2g_to_2g_sc_eap_wpa3(self, get_target_object, get_test_library, ge logging.info(resp.json()) allure.attach(name=f"Response - {resp.status_code} {resp.reason}", body=str(resp.json())) if resp.status_code != 200: - if resp.status_code == 400 and "Device is already executing a command. Please try later." in resp.json()["ErrorDescription"]: + if resp.status_code == 400 and "Device is already executing a command. Please try later." in \ + resp.json()["ErrorDescription"]: time.sleep(30) resp = requests.post(uri, data=json.dumps(payload, indent=2), headers=get_target_object.controller_library_object.make_headers(), @@ -978,58 +1228,88 @@ def test_roam_2g_to_2g_sc_eap_wpa3(self, get_target_object, get_test_library, ge logging.info(resp.json()) else: assert False, f"push configuration to {serial_number} got failed" - get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]]["device_under_tests"] - ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=False) + get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][ + "device_under_tests"] + ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True) if str(ap_iwinfo) != "Error: pop from empty list": - interfaces = {} - interface_matches = re.finditer( - r'wlan\d\s+ESSID:\s+".*?"\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+([\d\s]+)', ap_iwinfo, - re.DOTALL) - logging.info(str(interface_matches)) + include_essid = config['interfaces'][0]["ssids"][0]["name"] + re_obj = re.compile( + rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+(' + r'\d+)\s+\(([\d.]+) GHz\)', + re.DOTALL + ) + # find all matches + interface_matches = re_obj.finditer(ap_iwinfo) if interface_matches: for match in interface_matches: - interface_name = f'wlan{match.group(0)[4]}' - access_point = match.group(1) - channel = match.group(2).strip() - interfaces[interface_name] = {'Access Point': access_point, 'Channel': channel} - ap_data.update({serial_number: {'Access Point': access_point, 'Channel': channel}}) - logging.info(interfaces) - else: - logging.error("Failed to get iwinfo") - pytest.exit("Failed to get iwinfo") - elif ap_iwinfo == {}: - assert False, "Empty iwinfo reponse from AP through minicom" + interface_name = match.group(1) + access_point = match.group(2) + channel = match.group(3) + frequency = match.group(4).replace('.', '') + ap_data.update( + {serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}}) + logging.info(f"AP Data from iwinfo: {ap_data}") + if ap_data == {}: + logging.error("Failed to get required iwinfo from minicom") + pytest.fail("Failed to get required iwinfo from minicom") else: - assert False, "Failed to get iwinfo from minicom" - logging.info(f"AP Data from iwinfo: {ap_data}") + pytest.fail("Failed to get iwinfo from minicom") for serial in ap_data: bssid_list.append(ap_data[serial]['Access Point']) - ssid = config_data['interfaces'][0]["ssids"][0]["name"] - pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], - band="twog", num_sta=1, security="wpa3", ssid=ssid, upstream="1.1.eth1", eap_method="TLS", - eap_identity=radius_info["user"], eap_password=radius_info["password"], private_key="1234567890", - pk_passwd=radius_info["pk_password"], ca_cert='/home/lanforge/ca.pem', sta_type="11r-eap", - iteration=1, channel="11", option="ota", dut_name=dut_names, traffic_type="lf_udp") - assert pass_fail, message + if not ap_data[serial]['frequency'].endswith(","): + freqs_ = freqs_ + ap_data[serial]['frequency'] + "," + else: + freqs_ = freqs_ + ap_data[serial]['frequency'] + ssid = config['interfaces'][0]["ssids"][0]["name"] + pass_fail, message = True, "Test Passed" + try: + pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], + band="twog", num_sta=1, security="wpa3", ssid=ssid, + upstream="1.1.eth1", eap_method="TLS", + pairwise_cipher="DEFAULT ", + groupwise_cipher="DEFAULT ", + eap_identity=radius_info["user"], + eap_password=radius_info["password"], + private_key="/home/lanforge/client.p12", + pk_passwd=radius_info["pk_password"], + ca_cert='/home/lanforge/ca.pem', sta_type="11r-eap-sha384", + iteration=1, channel="11", option="ota", dut_name=dut_names, + traffic_type="lf_udp") + except Exception as e: + logging.error(f"Exception in roam test : {e}") + pass_fail, message = False, e + finally: + get_target_object.dut_library_object.get_dut_logs(print_log=False) + if not pass_fail: + pytest.fail(f"Test failed with the following reasons: \n{message}") + else: + assert True @pytest.mark.same_channel @pytest.mark.wpa3_enterprise @pytest.mark.fiveg @pytest.mark.roam - def test_roam_5g_to_5g_sc_eap_wpa3(self, get_target_object, get_test_library, get_lab_info, selected_testbed, radius_info): + def test_roam_5g_to_5g_sc_eap_wpa3(self, get_target_object, get_test_library, get_lab_info, selected_testbed, + radius_info): """ Test Roaming between two APs, Same channel, 5G, WPA3 Enterprise - pytest -m "roam and fiveg and same_channel and wpa3_enterprise" + pytest -m "roam and fiveg and same_channel and wpa3_enterprise and ota" """ ap_data = dict() dut_list = [str(selected_testbed)] dut_names = list() bssid_list = list() + freqs_ = "" testbed_info = get_lab_info.CONFIGURATION - if str(selected_testbed + 'a') in testbed_info: - dut_list.append(str(selected_testbed + 'a')) - logging.info(f"dut list: {dut_list}--") - config_data['interfaces'][0]["ssids"][0]["radius"] = { + config = copy.deepcopy(config_data) + temp_list = list() + for key, val in testbed_info.items(): + tb_type, tb_name = selected_testbed.split("-") + if tb_type in key and tb_name[0] in key: + temp_list.append(key) + temp_list.sort() + dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1] + config['interfaces'][0]["ssids"][0]["radius"] = { "accounting": { "host": radius_info["ip"], "port": radius_info["port"], @@ -1041,23 +1321,22 @@ def test_roam_5g_to_5g_sc_eap_wpa3(self, get_target_object, get_test_library, ge "secret": radius_info["secret"] } } - for i in range(len(config_data["radios"])): - if config_data['radios'][i]["band"] == "2G": - config_data['radios'].pop(i) + config['radios'] = [ + {"band": "5G", "channel": 36, "channel-mode": "HE", "channel-width": 80, "country": "CA"}] # change ssid security type to sae - config_data['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "wpa3" - if "2G" in config_data['interfaces'][0]["ssids"][0]["wifi-bands"]: - config_data['interfaces'][0]["ssids"][0]["wifi-bands"].remove("2G") - if "key" in config_data['interfaces'][0]["ssids"][0]["encryption"]: - config_data['interfaces'][0]["ssids"][0]["encryption"].pop("key") + config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "wpa3" + config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["5G"] + if "key" in config['interfaces'][0]["ssids"][0]["encryption"]: + config['interfaces'][0]["ssids"][0]["encryption"].pop("key") if len(dut_list) < 2: - logging.error(f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") + logging.error( + f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}" for ap in range(len(dut_list)): serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier'] dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model']) - logging.info(config_data) - payload = {"configuration": json.dumps(config_data), "serialNumber": serial_number, "UUID": 2} + logging.info(config) + payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2} uri = get_target_object.controller_library_object.build_uri( "device/" + serial_number + "/configure") logging.info("Sending Command: " + "\n" + str(uri) + "\n" + @@ -1077,7 +1356,8 @@ def test_roam_5g_to_5g_sc_eap_wpa3(self, get_target_object, get_test_library, ge logging.info(resp.json()) allure.attach(name=f"Response - {resp.status_code} {resp.reason}", body=str(resp.json())) if resp.status_code != 200: - if resp.status_code == 400 and "Device is already executing a command. Please try later." in resp.json()["ErrorDescription"]: + if resp.status_code == 400 and "Device is already executing a command. Please try later." in \ + resp.json()["ErrorDescription"]: time.sleep(30) resp = requests.post(uri, data=json.dumps(payload, indent=2), headers=get_target_object.controller_library_object.make_headers(), @@ -1086,58 +1366,89 @@ def test_roam_5g_to_5g_sc_eap_wpa3(self, get_target_object, get_test_library, ge logging.info(resp.json()) else: assert False, f"push configuration to {serial_number} got failed" - get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]]["device_under_tests"] - ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=False) + get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][ + "device_under_tests"] + ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True) if str(ap_iwinfo) != "Error: pop from empty list": - interfaces = {} - interface_matches = re.finditer( - r'wlan\d\s+ESSID:\s+".*?"\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+([\d\s]+)', ap_iwinfo, - re.DOTALL) - logging.info(str(interface_matches)) + include_essid = config['interfaces'][0]["ssids"][0]["name"] + re_obj = re.compile( + rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+(' + r'\d+)\s+\(([\d.]+) GHz\)', + re.DOTALL + ) + # find all matches + interface_matches = re_obj.finditer(ap_iwinfo) if interface_matches: for match in interface_matches: - interface_name = f'wlan{match.group(0)[4]}' - access_point = match.group(1) - channel = match.group(2).strip() - interfaces[interface_name] = {'Access Point': access_point, 'Channel': channel} - ap_data.update({serial_number: {'Access Point': access_point, 'Channel': channel}}) - logging.info(interfaces) - else: - logging.error("Failed to get iwinfo") - pytest.exit("Failed to get iwinfo") - elif ap_iwinfo == {}: - assert False, "Empty iwinfo reponse from AP through minicom" + interface_name = match.group(1) + access_point = match.group(2) + channel = match.group(3) + frequency = match.group(4).replace('.', '') + ap_data.update( + {serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}}) + logging.info(f"AP Data from iwinfo: {ap_data}") + if ap_data == {}: + logging.error("Failed to get required iwinfo from minicom") + pytest.fail("Failed to get required iwinfo from minicom") else: - assert False, "Failed to get iwinfo from minicom" - logging.info(f"AP Data from iwinfo: {ap_data}") + pytest.fail("Failed to get iwinfo from minicom") for serial in ap_data: bssid_list.append(ap_data[serial]['Access Point']) - ssid = config_data['interfaces'][0]["ssids"][0]["name"] - pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], - band="fiveg", num_sta=1, security="wpa3", ssid=ssid, upstream="1.1.eth1", eap_method="TLS", - eap_identity=radius_info["user"], eap_password=radius_info["password"], private_key="1234567890", - pk_passwd=radius_info["pk_password"], ca_cert='/home/lanforge/ca.pem', sta_type="11r-eap", - iteration=1, channel="11", option="ota", dut_name=dut_names, traffic_type="lf_udp") - assert pass_fail, message + if not ap_data[serial]['frequency'].endswith(","): + freqs_ = freqs_ + ap_data[serial]['frequency'] + "," + else: + freqs_ = freqs_ + ap_data[serial]['frequency'] + ssid = config['interfaces'][0]["ssids"][0]["name"] + pass_fail, message = True, "Test Passed" + try: + pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], + band="fiveg", num_sta=1, security="wpa3", ssid=ssid, + upstream="1.1.eth1", eap_method="TLS", + pairwise_cipher="DEFAULT ", + groupwise_cipher="DEFAULT ", + eap_identity=radius_info["user"], + eap_password=radius_info["password"], + private_key="/home/lanforge/client.p12", + pk_passwd=radius_info["pk_password"], + ca_cert='/home/lanforge/ca.pem', sta_type="11r-eap-sha384", + iteration=1, channel="36", option="ota", + dut_name=dut_names, + traffic_type="lf_udp") + except Exception as e: + logging.error(f"Exception in roam test : {e}") + pass_fail, message = False, e + finally: + get_target_object.dut_library_object.get_dut_logs(print_log=False) + if not pass_fail: + pytest.fail(f"Test failed with the following reasons: \n{message}") + else: + assert True @pytest.mark.same_channel @pytest.mark.wpa3_enterprise @pytest.mark.sixg @pytest.mark.roam - def test_roam_6g_to_6g_sc_eap_wpa3(self, get_target_object, get_test_library, get_lab_info, selected_testbed, radius_info): + def test_roam_6g_to_6g_sc_eap_wpa3(self, get_target_object, get_test_library, get_lab_info, selected_testbed, + radius_info): """ Test Roaming between two APs, Same channel, 6G, WPA3 Enterprise - pytest -m "roam and sixg and same_channel and wpa3_enterprise" + pytest -m "roam and sixg and same_channel and wpa3_enterprise and ota" """ ap_data = dict() dut_list = [str(selected_testbed)] dut_names = list() bssid_list = list() + freqs_ = "" testbed_info = get_lab_info.CONFIGURATION - if str(selected_testbed + 'a') in testbed_info: - dut_list.append(str(selected_testbed + 'a')) - logging.info(f"dut list: {dut_list}--") - config_data['interfaces'][0]["ssids"][0]["radius"] = { + config = copy.deepcopy(config_data) + temp_list = list() + for key, val in testbed_info.items(): + tb_type, tb_name = selected_testbed.split("-") + if tb_type in key and tb_name[0] in key: + temp_list.append(key) + temp_list.sort() + dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1] + config['interfaces'][0]["ssids"][0]["radius"] = { "accounting": { "host": radius_info["ip"], "port": radius_info["port"], @@ -1149,7 +1460,7 @@ def test_roam_6g_to_6g_sc_eap_wpa3(self, get_target_object, get_test_library, ge "secret": radius_info["secret"] } } - config_data['radios'] = [ + config['radios'] = [ { "band": "6G", "channel": 161, @@ -1159,18 +1470,19 @@ def test_roam_6g_to_6g_sc_eap_wpa3(self, get_target_object, get_test_library, ge } ] # change wifi-band and security type to wpa3 - config_data['interfaces'][0]["ssids"][0]["wifi-bands"] = ["6G"] - config_data['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "wpa3" - if "key" in config_data['interfaces'][0]["ssids"][0]["encryption"]: - config_data['interfaces'][0]["ssids"][0]["encryption"].pop("key") + config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["6G"] + config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "wpa3" + if "key" in config['interfaces'][0]["ssids"][0]["encryption"]: + config['interfaces'][0]["ssids"][0]["encryption"].pop("key") if len(dut_list) < 2: - logging.error(f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") + logging.error( + f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}") assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}" for ap in range(len(dut_list)): serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier'] dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model']) - logging.info(config_data) - payload = {"configuration": json.dumps(config_data), "serialNumber": serial_number, "UUID": 2} + logging.info(config) + payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2} uri = get_target_object.controller_library_object.build_uri( "device/" + serial_number + "/configure") logging.info("Sending Command: " + "\n" + str(uri) + "\n" + @@ -1190,7 +1502,8 @@ def test_roam_6g_to_6g_sc_eap_wpa3(self, get_target_object, get_test_library, ge logging.info(resp.json()) allure.attach(name=f"Response - {resp.status_code} {resp.reason}", body=str(resp.json())) if resp.status_code != 200: - if resp.status_code == 400 and "Device is already executing a command. Please try later." in resp.json()["ErrorDescription"]: + if resp.status_code == 400 and "Device is already executing a command. Please try later." in \ + resp.json()["ErrorDescription"]: time.sleep(30) resp = requests.post(uri, data=json.dumps(payload, indent=2), headers=get_target_object.controller_library_object.make_headers(), @@ -1199,37 +1512,59 @@ def test_roam_6g_to_6g_sc_eap_wpa3(self, get_target_object, get_test_library, ge logging.info(resp.json()) else: assert False, f"push configuration to {serial_number} got failed" - get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]]["device_under_tests"] - ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=False) + get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][ + "device_under_tests"] + ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True) if str(ap_iwinfo) != "Error: pop from empty list": - interfaces = {} - interface_matches = re.finditer( - r'wlan\d\s+ESSID:\s+".*?"\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+([\d\s]+)', ap_iwinfo, - re.DOTALL) - logging.info(str(interface_matches)) + include_essid = config['interfaces'][0]["ssids"][0]["name"] + re_obj = re.compile( + rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+(' + r'\d+)\s+\(([\d.]+) GHz\)', + re.DOTALL + ) + # find all matches + interface_matches = re_obj.finditer(ap_iwinfo) if interface_matches: for match in interface_matches: - interface_name = f'wlan{match.group(0)[4]}' - access_point = match.group(1) - channel = match.group(2).strip() - interfaces[interface_name] = {'Access Point': access_point, 'Channel': channel} - ap_data.update({serial_number: {'Access Point': access_point, 'Channel': channel}}) - logging.info(interfaces) - else: - logging.error("Failed to get iwinfo") - pytest.exit("Failed to get iwinfo") - elif ap_iwinfo == {}: - assert False, "Empty iwinfo reponse from AP through minicom" + interface_name = match.group(1) + access_point = match.group(2) + channel = match.group(3) + frequency = match.group(4).replace('.', '') + ap_data.update( + {serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}}) + logging.info(f"AP Data from iwinfo: {ap_data}") + if ap_data == {}: + logging.error("Failed to get required iwinfo from minicom") + pytest.fail("Failed to get required iwinfo from minicom") else: - assert False, "Failed to get iwinfo from minicom" - logging.info(f"AP Data from iwinfo: {ap_data}") + pytest.fail("Failed to get iwinfo from minicom") for serial in ap_data: bssid_list.append(ap_data[serial]['Access Point']) - ssid = config_data['interfaces'][0]["ssids"][0]["name"] - pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], - band="sixg", num_sta=1, security="wpa3", ssid=ssid, upstream="1.1.eth1", eap_method="TLS", - eap_identity=radius_info["user"], eap_password=radius_info["password"], private_key="1234567890", - pk_passwd=radius_info["pk_password"], ca_cert='/home/lanforge/ca.pem', sta_type="11r-eap", - iteration=1, channel="161", option="ota", dut_name=dut_names, traffic_type="lf_udp") - assert pass_fail, message - + if not ap_data[serial]['frequency'].endswith(","): + freqs_ = freqs_ + ap_data[serial]['frequency'] + "," + else: + freqs_ = freqs_ + ap_data[serial]['frequency'] + ssid = config['interfaces'][0]["ssids"][0]["name"] + pass_fail, message = True, "Test Passed" + try: + pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1], + band="fiveg", num_sta=1, security="wpa3", ssid=ssid, + upstream="1.1.eth1", eap_method="TLS", + pairwise_cipher="DEFAULT ", + groupwise_cipher="DEFAULT ", + eap_identity=radius_info["user"], + eap_password=radius_info["password"], + private_key="/home/lanforge/client.p12", + pk_passwd=radius_info["pk_password"], + ca_cert='/home/lanforge/ca.pem', sta_type="11r-eap-sha384", + iteration=1, channel="161", option="ota", dut_name=dut_names, + traffic_type="lf_udp") + except Exception as e: + logging.error(f"Exception in roam test : {e}") + pass_fail, message = False, e + finally: + get_target_object.dut_library_object.get_dut_logs(print_log=False) + if not pass_fail: + pytest.fail(f"Test failed with the following reasons: \n{message}") + else: + assert True