|
#!/usr/bin/env python3 |
|
import subprocess |
|
import logging |
|
from itertools import chain |
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
|
|
|
def frr_execute(commands):
    """Run a sequence of commands through vtysh and return its output.

    Every command is handed to the vtysh binary at FRR_VTYSH_PATH as its own
    ``-c`` argument, so the whole sequence runs in one vtysh invocation.

    Args:
        commands: List of command strings, in execution order.

    Returns:
        A two-element list ``[stdout, stderr]`` holding the decoded output of
        the process. The list is also returned when vtysh exits with a
        non-zero status (after a warning has been logged), so callers can
        always unpack the result.
    """
    arguments = list(chain.from_iterable(('-c', command) for command in commands))

    logging.debug('Executing FRR commands: %s', ', '.join(commands))
    process = subprocess.run([FRR_VTYSH_PATH] + arguments, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Decode the captured bytes instead of calling str() on them: str(bytes)
    # yields the "b'...'" repr with escaped characters, not the actual text.
    stdout = process.stdout.decode('utf-8', errors='replace')
    stderr = process.stderr.decode('utf-8', errors='replace')

    if process.returncode != 0:
        logging.warning('command execution failed: %s', ', '.join(commands))
        logging.warning('> process stdout: %s', stdout)
        logging.warning('> process stderr: %s', stderr)

    return [stdout, stderr]
|
|
|
|
|
def frr_verify(matcher):
    """Check whether the FRR running configuration contains *matcher*.

    Args:
        matcher: Text to search for, e.g. a previously configured command.

    Returns:
        True if the output of ``show running-config`` contains *matcher*,
        False if it does not or if no output could be obtained.
    """
    result = frr_execute(['show running-config'])

    # frr_execute may hand back nothing usable; treat that as "not found"
    # instead of failing with a TypeError while unpacking the result.
    if not result:
        return False

    stdout = result[0]
    return matcher in str(stdout)
|
|
|
|
|
class BgpAttribute:
    """A BGP neighbor attribute, optionally scoped to an address family."""

    def __init__(self, name, address_family=None):
        """Store the attribute name and its address family (None = global)."""
        self.name = name
        self.address_family = address_family

    def description(self):
        """Return a label of the form ``<address family or 'global'> > <name>``."""
        scope = self.address_family if self.address_family else 'global'
        return '{} > {}'.format(scope, self.name)

    def configure(self, neighbor, params, invert=False):
        """Build and execute the ``neighbor`` command for this attribute.

        Args:
            neighbor: Neighbor (or peer-group) identifier placed in the command.
            params: Iterable of parameters; each one is converted with str()
                and appended to the command, separated by spaces.
            invert: When true, the command is prefixed with ``no``.

        Returns:
            The command string as returned by configure_raw().
        """
        joined_params = ' '.join(map(str, params))
        command = 'neighbor {} {} {}'.format(neighbor, self.name, joined_params)
        prefix = 'no ' if invert else ''
        return self.configure_raw(prefix + command)

    def configure_raw(self, cmd):
        """Execute *cmd* inside the BGP instance and return the stripped command.

        The command is run after entering ``configure terminal`` and
        ``router bgp FRR_BGP_INSTANCE``; when the attribute has an address
        family, the matching ``address-family`` context is entered as well.
        """
        # Remove surrounding whitespace (e.g. left over from empty parameters)
        command = cmd.strip()

        # Context commands leading into the BGP instance
        steps = [
            'configure terminal',
            'router bgp {}'.format(FRR_BGP_INSTANCE),
        ]

        # Address-family context is only needed for AFI-scoped attributes
        if self.address_family:
            steps.append('address-family {}'.format(self.address_family))

        # Run the context commands followed by the actual command
        steps.append(command)
        frr_execute(steps)

        return command
|
|
|
|
|
# Location of the vtysh binary used to talk to FRR
FRR_VTYSH_PATH = '/home/ppmathis/development/frr/vtysh/vtysh'

# BGP instance, neighbor and peer-group used by the test run
FRR_BGP_INSTANCE = 65500
FRR_BGP_NEIGHBOR = '1.1.1.1'
FRR_BGP_PEER_GROUP = 'PG-FUZZ'

# Attributes under test. Each entry is a three-element list:
#   [0] the BgpAttribute itself
#   [1] the first parameter set
#   [2] the second parameter set, or None if there is none
FRR_BGP_ATTRIBUTES = [
    # --- Global (no address family) ---
    [BgpAttribute('advertisement-interval'), [10], [20]],
    [BgpAttribute('bfd'), [10, 100, 1000], [20, 200, 2000]],
    [BgpAttribute('capability dynamic'), [], None],
    [BgpAttribute('capability extended-nexthop'), [], None],
    [BgpAttribute('description'), ['TEST #1'], ['TEST #2']],
    [BgpAttribute('disable-connected-check'), [], None],
    [BgpAttribute('dont-capability-negotiate'), [], None],
    [BgpAttribute('ebgp-multihop'), [10], [20]],
    [BgpAttribute('enforce-multihop'), [], None],
    [BgpAttribute('local-as'), [100], [200]],
    [BgpAttribute('override-capability'), [], None],
    [BgpAttribute('passive'), [], None],
    [BgpAttribute('password'), ['HelloWorld'], ['GoodbyeWorld']],
    # Disabled: [BgpAttribute('remote-as'), [100], [200]],
    #   FRR restriction: peers of a peer group may not have a different AS.
    # Disabled: [BgpAttribute('shutdown'), [], None],
    #   FRR restriction: peers of a shut-down peer group cannot be
    #   re-enabled individually.
    [BgpAttribute('solo'), [], None],
    [BgpAttribute('timers'), [10, 100], [20, 200]],
    [BgpAttribute('timers connect'), [10], [20]],
    [BgpAttribute('ttl-security hops'), [10], [20]],
    [BgpAttribute('update-source'), ['1.1.1.1'], ['2.2.2.2']],

    # --- IPv4 unicast ---
    [BgpAttribute('activate', 'ipv4 unicast'), [], None],
    [BgpAttribute('addpath-tx-all-paths', 'ipv4 unicast'), [], None],
    [BgpAttribute('addpath-tx-bestpath-per-AS', 'ipv4 unicast'), [], None],
    [BgpAttribute('allowas-in', 'ipv4 unicast'), [1], [2]],
    [BgpAttribute('allowas-in origin', 'ipv4 unicast'), [], None],
    [BgpAttribute('as-override', 'ipv4 unicast'), [], None],
    [BgpAttribute('attribute-unchanged', 'ipv4 unicast'), ['as-path', 'next-hop', 'med'], None],
    [BgpAttribute('capability orf prefix-list', 'ipv4 unicast'), ['both'], ['send']],
    [BgpAttribute('default-originate', 'ipv4 unicast'), [], None],
    [BgpAttribute('default-originate route-map', 'ipv4 unicast'), ['RM-DO1'], ['RM-DO2']],
    [BgpAttribute('distribute-list', 'ipv4 unicast'), ['DL1-IN', 'in'], ['DL2-IN', 'in']],
    [BgpAttribute('distribute-list', 'ipv4 unicast'), ['DL1-OUT', 'out'], ['DL2-OUT', 'out']],
    [BgpAttribute('filter-list', 'ipv4 unicast'), ['FL1-IN', 'in'], ['FL2-IN', 'in']],
    [BgpAttribute('filter-list', 'ipv4 unicast'), ['FL1-OUT', 'out'], ['FL2-OUT', 'out']],
    [BgpAttribute('maximum-prefix', 'ipv4 unicast'), [10, 11, 'restart', 1], [20, 22, 'restart', 2]],
    [BgpAttribute('next-hop-self', 'ipv4 unicast'), [], None],
    [BgpAttribute('next-hop-self force', 'ipv4 unicast'), [], None],
    [BgpAttribute('prefix-list', 'ipv4 unicast'), ['PL1-IN', 'in'], ['PL2-IN', 'in']],
    [BgpAttribute('prefix-list', 'ipv4 unicast'), ['PL1-OUT', 'out'], ['PL2-OUT', 'out']],
    [BgpAttribute('remove-private-AS', 'ipv4 unicast'), ['all', 'replace-AS'], None],
    [BgpAttribute('route-map', 'ipv4 unicast'), ['RM1-IN', 'in'], ['RM2-IN', 'in']],
    [BgpAttribute('route-map', 'ipv4 unicast'), ['RM1-OUT', 'out'], ['RM2-OUT', 'out']],
    # TODO: route-reflector-client (more complicated to check due to internal-only)
    [BgpAttribute('route-server-client', 'ipv4 unicast'), [], None],
    # TODO: send-community (more complicated to check due to implicit all)
    [BgpAttribute('soft-reconfiguration inbound', 'ipv4 unicast'), [], None],
    [BgpAttribute('unsuppress-map', 'ipv4 unicast'), ['RM1-US'], ['RM2-US']],
    [BgpAttribute('weight', 'ipv4 unicast'), [100], [200]],

    # --- IPv6 unicast only ---
    [BgpAttribute('nexthop-local unchanged', 'ipv6 unicast'), [], None],
]
|
|
|
# Clone attributes of IPv4 unicast AFI for IPv6 unicast. The clones are built
# into a separate list first, so FRR_BGP_ATTRIBUTES is never appended to
# while it is being iterated.
FRR_BGP_ATTRIBUTES.extend([
    [BgpAttribute(entry[0].name, 'ipv6 unicast'), entry[1], entry[2]]
    for entry in FRR_BGP_ATTRIBUTES
    if entry[0].address_family == 'ipv4 unicast'
])

# Sort attributes by AFI followed by name; attributes without an address
# family use '.' as their sort key so they come before the named AFIs.
FRR_BGP_ATTRIBUTES = sorted(FRR_BGP_ATTRIBUTES, key=lambda k: (k[0].address_family or '.', k[0].name))
|
|
|
# Check all attributes: for each one, configure it on the peer-group, then
# override (or negate) it on the peer, then re-apply the peer-group setting
# and confirm that the peer-specific setting is still present.
for attribute, group_params, peer_params in FRR_BGP_ATTRIBUTES:
    # (Re-)initialize FRR testing environment: recreate the BGP instance with
    # an empty peer-group and a single neighbor that is a member of it
    frr_execute([
        'configure terminal',
        'no router bgp {instance}'.format(instance=FRR_BGP_INSTANCE),
        'router bgp {instance}'.format(instance=FRR_BGP_INSTANCE),
        'no bgp default ipv4-unicast',
        'neighbor {neighbor} peer-group'.format(neighbor=FRR_BGP_PEER_GROUP),
        'neighbor {neighbor} peer-group {peer_group}'.format(neighbor=FRR_BGP_NEIGHBOR, peer_group=FRR_BGP_PEER_GROUP)
    ])

    # Attempt to set peer-group specific attribute
    cmd_peer_group = attribute.configure(FRR_BGP_PEER_GROUP, group_params)
    if not frr_verify(cmd_peer_group):
        logging.info('result: \u2716 {attribute} (failed peer-group config: {command})'
                     .format(attribute=attribute.description(), command=cmd_peer_group))
        continue

    # Attempt to set peer specific attribute: use the alternative parameters
    # when available, otherwise negate the peer-group setting on the peer
    if peer_params:
        cmd_peer = attribute.configure(FRR_BGP_NEIGHBOR, peer_params)
    else:
        cmd_peer = attribute.configure(FRR_BGP_NEIGHBOR, group_params, invert=True)

    if not frr_verify(cmd_peer):
        logging.info('result: \u2716 {attribute} (failed peer config: {command})'
                     .format(attribute=attribute.description(), command=cmd_peer))
        continue

    # Attempt to set peer-group specific attribute a second time
    attribute.configure_raw(cmd_peer_group)
    if not frr_verify(cmd_peer_group):
        logging.info('result: \u2716 {attribute} (failed peer-group re-config: {command})'
                     .format(attribute=attribute.description(), command=cmd_peer_group))
        continue

    # Verify that peer-specific attribute still exists; report the peer
    # command here, since that is the one that went missing
    if not frr_verify(cmd_peer):
        logging.info('result: \u2716 {attribute} (lost peer-specific attribute: {command})'
                     .format(attribute=attribute.description(), command=cmd_peer))
        continue

    # Everything looks good!
    logging.info('result: \u2714 {attribute}'.format(attribute=attribute.description()))