From 0871adc10eea92cd9f62cca347bed9df139253e9 Mon Sep 17 00:00:00 2001 From: Jerry Date: Wed, 6 May 2020 12:25:04 +0800 Subject: [PATCH] add more doc and checks --- scripts/roa.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/scripts/roa.py b/scripts/roa.py index dc351f1..7c87e05 100755 --- a/scripts/roa.py +++ b/scripts/roa.py @@ -9,6 +9,8 @@ import re class BashParser: def __init__(self): self.__pa = None # are we parsing bash array? + def end(self): + assert not self.__pa # check if array ends properly def parseline(self, line): repl_quotes = lambda t: t.replace('"', '').replace('\'', '') line = line.strip() @@ -30,7 +32,7 @@ class BashParser: if not line or line.startswith('#'): return None l = line.split('=') - assert len(l) >= 2 + assert len(l) >= 2 # is this line key=value syntax? return [l[0], '='.join([repl_quotes(i) for i in l[1:]])] bp = BashParser() @@ -41,6 +43,7 @@ def shell2dict(shellscript): if r: key, val = r fc[key.lower()] = val + bp.end() return fc cwd = Path() @@ -73,7 +76,7 @@ def neoneo_get_people(): assert f.name people[f.name] = {k: fc.get(k) for k in present_keys} nic_hdl = name2nichdl(f.name) - assert nic_hdl not in nic_hdl_names + assert nic_hdl not in nic_hdl_names # nic_hdl collision nic_hdl_names.add(nic_hdl) people[f.name]['nic_hdl'] = nic_hdl for v in people[f.name].values(): @@ -129,14 +132,14 @@ def neonet_route2roa(dirname, is_ipv6=False): roa_entries_key = ("asn", "prefix", "supernet", "netname") if fc.get('type').lower() in ('lo', 'subnet'): asn = str2asn(fc.get('asn')) - assert asn in ASNS + assert asn in ASNS # asn not in as-dir route = f.name.replace(',', '/') supernet = get_supernet(fc.get('supernet')) netname = fc.get('name') assert netname roa_entries.append(dict(zip(roa_entries_key, [asn, nettype(route, strict=True), supernet, netname]))) elif fc.get('type').lower().startswith('tun'): - assert NODE_TABLE[fc.get('downstream')] # extra check for downstream + assert NODE_TABLE[fc.get('downstream')] # not in node-dir asn = NODE_TABLE[fc.get('upstream')] assert asn in ASNS route = f.name.replace(',', '/') @@ -144,10 +147,10 @@ def neonet_route2roa(dirname, is_ipv6=False): netname = "%s-%s" % (fc.get('type'), route) roa_entries.append(dict(zip(roa_entries_key, [asn, nettype(route, strict=True), supernet, netname]))) elif fc.get('type').lower() == 'ptp': - assert NODE_TABLE[fc.get('upstream')] # extra check for upstream - assert NODE_TABLE[fc.get('downstream')] # extra check for downstream + assert NODE_TABLE[fc.get('upstream')] # not in node-dir + assert NODE_TABLE[fc.get('downstream')] # not in node-dir else: - raise AssertionError + raise AssertionError # unknown type except Exception: print("[!] Error while processing file", f) raise @@ -161,7 +164,7 @@ def neonet_route2roa(dirname, is_ipv6=False): pass else: print("[!] Error: found", net2, "overlaps", net1) - raise AssertionError + raise AssertionError # if this is intended, please include SUPERNET= in your route return roa_entries if __name__ == "__main__":