Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tests/vrf] Submit VRF testcases according to vrf test plan #1065

Merged
merged 6 commits into from
Dec 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 120 additions & 61 deletions ansible/roles/test/files/ptftests/fib_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,99 +61,142 @@ class FibTest(BaseTest):
DEFAULT_BALANCING_RANGE = 0.25
BALANCING_TEST_TIMES = 10000
DEFAULT_BALANCING_TEST_RATIO = 0.0001
ACTION_FWD = 'fwd'
ACTION_DROP = 'drop'

_required_params = [
'fib_info',
'router_mac',
]

def __init__(self):
'''
@summary: constructor
'''
BaseTest.__init__(self)
self.test_params = test_params_get()
self.check_required_params()

#---------------------------------------------------------------------

def setUp(self):
'''
@summary: Setup for the test
Two test parameters are used:
Some test parameters are used:
- fib_info: the FIB information generated according to the testbed
- router_mac: the MAC address of the DUT used to create the eth_dst
of the packet
- testbed_type: the type of the testbed used to determine the source
port
- src_port: this list should include all enabled ports, both up links
- src_ports: this list should include all enabled ports, both up links
and down links.
- pkt_action: expect to receive test traffic or not. Default: fwd
- ipv4/ipv6: enable ipv4/ipv6 tests

Other test parameters:
- ttl: ttl of test pkts. Auto decrease 1 for expected pkts.
- ip_options enable ip option header in ipv4 pkts. Default: False(disable)
- src_vid vlan tag id of src pkts. Default: None(untag)
- dst_vid vlan tag id of dst pkts. Default: None(untag)

TODO: Have a separate line in fib_info/file to indicate all UP ports
'''
self.dataplane = ptf.dataplane_instance
self.fib = fib.Fib(self.test_params['fib_info'])
self.router_mac = self.test_params['router_mac']
self.pktlen = self.test_params['testbed_mtu']
fib_info = self.test_params.get('fib_info', None)
self.fib = fib.Fib(self.test_params['fib_info']) if fib_info is not None else None
self.router_mac = self.test_params.get('router_mac', None)
self.pktlen = self.test_params.get('testbed_mtu', 1500)

self.test_ipv4 = self.test_params.get('ipv4', True)
self.test_ipv6 = self.test_params.get('ipv6', True)

self.test_balancing = self.test_params.get('test_balancing', True)
self.balancing_range = self.test_params.get('balancing_range', self.DEFAULT_BALANCING_RANGE)
self.balancing_test_times = self.test_params.get('balancing_test_times', self.BALANCING_TEST_TIMES)
self.balancing_test_ratio = self.test_params.get('balancing_test_ratio', self.DEFAULT_BALANCING_TEST_RATIO)

# Provide the list of all UP interfaces with index in sequence order starting from 0
if self.test_params['testbed_type'] == 't1' or self.test_params['testbed_type'] == 't1-lag' or self.test_params['testbed_type'] == 't0-64-32':
self.src_ports = range(0, 32)
if self.test_params['testbed_type'] == 't1-64-lag' or self.test_params['testbed_type'] == 't1-64-lag-clet':
self.src_ports = [0, 1, 4, 5, 16, 17, 20, 21, 34, 36, 37, 38, 39, 42, 44, 45, 46, 47, 50, 52, 53, 54, 55, 58, 60, 61, 62, 63]
if self.test_params['testbed_type'] == 't0':
self.src_ports = range(1, 25) + range(28, 32)
if self.test_params['testbed_type'] == 't0-52':
self.src_ports = range(0, 52)
if self.test_params['testbed_type'] == 't0-56':
self.src_ports = [0, 1, 4, 5, 8, 9] + range(12, 18) + [20, 21, 24, 25, 28, 29, 32, 33, 36, 37] + range(40, 46) + [48, 49, 52, 53]
if self.test_params['testbed_type'] == 't0-64':
self.src_ports = range(0, 2) + range(4, 18) + range(20, 33) + range(36, 43) + range(48, 49) + range(52, 59)
if self.test_params['testbed_type'] == 't0-116':
self.src_ports = range(0, 120)
self.pkt_action = self.test_params.get('pkt_action', self.ACTION_FWD)
self.ttl = self.test_params.get('ttl', 64)
self.ip_options = self.test_params.get('ip_options', False)
self.src_vid = self.test_params.get('src_vid', None)
self.dst_vid = self.test_params.get('dst_vid', None)

self.src_ports = self.test_params.get('src_ports', None)
if self.src_ports is None:
# Provide the list of all UP interfaces with index in sequence order starting from 0
if self.test_params['testbed_type'] == 't1' or self.test_params['testbed_type'] == 't1-lag' or self.test_params['testbed_type'] == 't0-64-32':
self.src_ports = range(0, 32)
if self.test_params['testbed_type'] == 't1-64-lag' or self.test_params['testbed_type'] == 't1-64-lag-clet':
self.src_ports = [0, 1, 4, 5, 16, 17, 20, 21, 34, 36, 37, 38, 39, 42, 44, 45, 46, 47, 50, 52, 53, 54, 55, 58, 60, 61, 62, 63]
if self.test_params['testbed_type'] == 't0':
self.src_ports = range(1, 25) + range(28, 32)
if self.test_params['testbed_type'] == 't0-52':
self.src_ports = range(0, 52)
if self.test_params['testbed_type'] == 't0-56':
self.src_ports = [0, 1, 4, 5, 8, 9] + range(12, 18) + [20, 21, 24, 25, 28, 29, 32, 33, 36, 37] + range(40, 46) + [48, 49, 52, 53]
if self.test_params['testbed_type'] == 't0-64':
self.src_ports = range(0, 2) + range(4, 18) + range(20, 33) + range(36, 43) + range(48, 49) + range(52, 59)
if self.test_params['testbed_type'] == 't0-116':
self.src_ports = range(0, 120)
#---------------------------------------------------------------------

def check_ip_range(self, ipv4=True):
def check_required_params(self):
for param in self._required_params:
if param not in self.test_params:
raise Exception("Missing required parameter {}".format(param))

def check_ip_ranges(self, ipv4=True):
if ipv4:
ip_ranges = self.fib.ipv4_ranges()
else:
ip_ranges = self.fib.ipv6_ranges()

for ip_range in ip_ranges:

# Get the expected list of ports that would receive the packets
exp_port_list = self.fib[ip_range.get_first_ip()].get_next_hop_list()
# Choose random one source port from all ports excluding the expected ones
src_port = random.choice([port for port in self.src_ports if port not in exp_port_list])

if not exp_port_list:
continue

logging.info("Check IP range:" + str(ip_range) + " on " + str(exp_port_list) + "...")

# Send a packet with the first IP in the range
self.check_ip_route(src_port, ip_range.get_first_ip(), exp_port_list, ipv4)
# Send a packet with the last IP in the range
if ip_range.length() > 1:
self.check_ip_route(src_port, ip_range.get_last_ip(), exp_port_list, ipv4)
# Send a packet with a random IP in the range
if ip_range.length() > 2:
self.check_ip_route(src_port, ip_range.get_random_ip(), exp_port_list, ipv4)

# Test traffic balancing across ECMP/LAG members
if len(exp_port_list) > 1 and random.random() < self.balancing_test_ratio:
logging.info("Check IP range balancing...")
dst_ip = ip_range.get_random_ip()
hit_count_map = {}
for i in range(0, self.BALANCING_TEST_TIMES):
(matched_index, received) = self.check_ip_route(src_port, dst_ip, exp_port_list, ipv4)
hit_count_map[matched_index] = hit_count_map.get(matched_index, 0) + 1
self.check_balancing(self.fib[dst_ip].get_next_hop(), hit_count_map)
next_hop = self.fib[ip_range.get_first_ip()]
self.check_ip_range(ip_range, next_hop, ipv4)

def check_ip_range(self, ip_range, next_hop, ipv4=True):
# Get the expected list of ports that would receive the packets
exp_port_list = next_hop.get_next_hop_list()
# Choose random one source port from all ports excluding the expected ones
src_port = random.choice([port for port in self.src_ports if port not in exp_port_list])

if not exp_port_list:
return

logging.info("Check IP range:" + str(ip_range) + " on " + str(exp_port_list) + "...")

# Send a packet with the first IP in the range
self.check_ip_route(src_port, ip_range.get_first_ip(), exp_port_list, ipv4)
# Send a packet with the last IP in the range
if ip_range.length() > 1:
self.check_ip_route(src_port, ip_range.get_last_ip(), exp_port_list, ipv4)
# Send a packet with a random IP in the range
if ip_range.length() > 2:
self.check_ip_route(src_port, ip_range.get_random_ip(), exp_port_list, ipv4)

# Test traffic balancing across ECMP/LAG members
if (self.test_balancing and self.pkt_action == self.ACTION_FWD
and len(exp_port_list) > 1
and random.random() < self.balancing_test_ratio):
logging.info("Check IP range balancing...")
dst_ip = ip_range.get_random_ip()
hit_count_map = {}
for i in range(0, self.balancing_test_times):
(matched_index, received) = self.check_ip_route(src_port, dst_ip, exp_port_list, ipv4)
hit_count_map[matched_index] = hit_count_map.get(matched_index, 0) + 1
self.check_balancing(next_hop.get_next_hop(), hit_count_map)

def check_ip_route(self, src_port, dst_ip_addr, dst_port_list, ipv4=True):
if ipv4:
(matched_index, received) = self.check_ipv4_route(src_port, dst_ip_addr, dst_port_list)
res = self.check_ipv4_route(src_port, dst_ip_addr, dst_port_list)
else:
(matched_index, received) = self.check_ipv6_route(src_port, dst_ip_addr, dst_port_list)
res = self.check_ipv6_route(src_port, dst_ip_addr, dst_port_list)

if self.pkt_action == self.ACTION_DROP:
return res

(matched_index, received) = res

assert received

Expand Down Expand Up @@ -183,22 +226,31 @@ def check_ipv4_route(self, src_port, dst_ip_addr, dst_port_list):
ip_dst=ip_dst,
tcp_sport=sport,
tcp_dport=dport,
ip_ttl=64)
ip_ttl=self.ttl,
ip_options=self.ip_options,
dl_vlan_enable=self.src_vid is not None,
vlan_vid=self.src_vid or 0)
exp_pkt = simple_tcp_packet(
self.pktlen,
eth_src=self.router_mac,
ip_src=ip_src,
ip_dst=ip_dst,
tcp_sport=sport,
tcp_dport=dport,
ip_ttl=63)
ip_ttl=max(self.ttl-1, 0),
ip_options=self.ip_options,
dl_vlan_enable=self.dst_vid is not None,
vlan_vid=self.dst_vid or 0)
masked_exp_pkt = Mask(exp_pkt)
masked_exp_pkt.set_do_not_care_scapy(scapy.Ether, "dst")

send_packet(self, src_port, pkt)
logging.info("Sending packet from port " + str(src_port) + " to " + ip_dst)

return verify_packet_any_port(self, masked_exp_pkt, dst_port_list)
if self.pkt_action == self.ACTION_FWD:
return verify_packet_any_port(self, masked_exp_pkt, dst_port_list)
elif self.pkt_action == self.ACTION_DROP:
return verify_no_packet_any(self, masked_exp_pkt, dst_port_list)
#---------------------------------------------------------------------

def check_ipv6_route(self, src_port, dst_ip_addr, dst_port_list):
Expand All @@ -223,22 +275,29 @@ def check_ipv6_route(self, src_port, dst_ip_addr, dst_port_list):
ipv6_src=ip_src,
tcp_sport=sport,
tcp_dport=dport,
ipv6_hlim=64)
ipv6_hlim=self.ttl,
dl_vlan_enable=self.src_vid is not None,
vlan_vid=self.src_vid or 0)
exp_pkt = simple_tcpv6_packet(
pktlen=self.pktlen,
eth_src=self.router_mac,
ipv6_dst=ip_dst,
ipv6_src=ip_src,
tcp_sport=sport,
tcp_dport=dport,
ipv6_hlim=63)
ipv6_hlim=max(self.ttl-1, 0),
dl_vlan_enable=self.dst_vid is not None,
vlan_vid=self.dst_vid or 0)
masked_exp_pkt = Mask(exp_pkt)
masked_exp_pkt.set_do_not_care_scapy(scapy.Ether,"dst")

send_packet(self, src_port, pkt)
logging.info("Sending packet from port " + str(src_port) + " to " + ip_dst)

return verify_packet_any_port(self, masked_exp_pkt, dst_port_list)
if self.pkt_action == self.ACTION_FWD:
return verify_packet_any_port(self, masked_exp_pkt, dst_port_list)
elif self.pkt_action == self.ACTION_DROP:
return verify_no_packet_any(self, masked_exp_pkt, dst_port_list)
#---------------------------------------------------------------------
def check_within_expected_range(self, actual, expected):
'''
Expand Down Expand Up @@ -290,7 +349,7 @@ def runTest(self):
"""
# IPv4 Test
if (self.test_ipv4):
self.check_ip_range()
self.check_ip_ranges()
# IPv6 Test
if (self.test_ipv6):
self.check_ip_range(ipv4=False)
self.check_ip_ranges(ipv4=False)
Loading