Skip to content

Translator

exceptions

Define all of the exceptions we want to raise in our translators.

ASNError

Bases: TypeError

ASNError provides an error class to use when there is an issue with an Autonomous System Number.

Source code in translator/exceptions.py
4
5
class ASNError(TypeError):
    """ASNError provides an error class to use when there is an issue with an Autonomous System Number."""

gobgp

A translator interface for GoBGP (https://github.com/osrg/gobgp).

GoBGP

Represents a GoBGP instance.

Source code in translator/gobgp.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
class GoBGP:
    """Represents a GoBGP instance."""

    def __init__(self, url):
        """Configure the channel used for communication."""
        channel = grpc.insecure_channel(url)
        self.stub = gobgp_pb2_grpc.GobgpApiStub(channel)

    @staticmethod
    def _get_family_afi(ip_version):
        if ip_version == IPV6:
            return gobgp_pb2.Family.AFI_IP6
        return gobgp_pb2.Family.AFI_IP

    def _build_path(self, ip, event_data=None):  # noqa: PLR0914
        # Grab ASN and Community from our event_data, or use the defaults
        if not event_data:
            event_data = {}
        asn = event_data.get("asn", DEFAULT_ASN)
        community = event_data.get("community", DEFAULT_COMMUNITY)
        ip_version = ip.ip.version

        # Set the origin to incomplete (options are IGP, EGP, incomplete)
        # Incomplete means that BGP is unsure of exactly how the prefix was injected into the topology.
        # The most common scenario here is that the prefix was redistributed into Border Gateway Protocol
        # from some other protocol, typically an IGP. - https://www.kwtrain.com/blog/bgp-pt2
        origin = Any()
        origin.Pack(
            attribute_pb2.OriginAttribute(
                origin=2,
            ),
        )

        # IP prefix and its associated length
        nlri = Any()
        nlri.Pack(
            attribute_pb2.IPAddressPrefix(
                prefix_len=ip.network.prefixlen,
                prefix=str(ip.ip),
            ),
        )

        # Set the next hop to the correct value depending on IP family
        next_hop = Any()
        family_afi = self._get_family_afi(ip_version)
        if ip_version == IPV6:
            next_hops = event_data.get("next_hop", DEFAULT_V6_NEXTHOP)
            next_hop.Pack(
                attribute_pb2.MpReachNLRIAttribute(
                    family=gobgp_pb2.Family(afi=family_afi, safi=gobgp_pb2.Family.SAFI_UNICAST),
                    next_hops=[next_hops],
                    nlris=[nlri],
                ),
            )
        else:
            next_hops = event_data.get("next_hop", DEFAULT_V4_NEXTHOP)
            next_hop.Pack(
                attribute_pb2.NextHopAttribute(
                    next_hop=next_hops,
                ),
            )

        # Set our AS Path
        as_path = Any()
        as_segment = None

        # Make sure our asn is an acceptable value.
        asn_is_valid(asn)
        as_segment = [attribute_pb2.AsSegment(numbers=[asn])]
        as_segments = attribute_pb2.AsPathAttribute(segments=as_segment)
        as_path.Pack(as_segments)

        # Set our community number
        # The ASN gets packed into the community so we need to be careful about size to not overflow the structure
        communities = Any()
        # Standard community
        # Since we pack both into the community string we need to make sure they will both fit
        if asn < MAX_SMALL_ASN and community < MAX_SMALL_COMM:
            # We bitshift ASN left by 16 so that there is room to add the community on the end of it. This is because
            # GoBGP wants the community sent as a single integer.
            comm_id = (asn << 16) + community
            communities.Pack(attribute_pb2.CommunitiesAttribute(communities=[comm_id]))
        else:
            logger.info("LargeCommunity Used - ASN: %s. Community: %s", asn, community)
            global_admin = asn
            local_data1 = community
            # set to 0 because there's no use case for it, but we need a local_data2 for gobgp to read any of it
            local_data2 = 0
            large_community = attribute_pb2.LargeCommunity(
                global_admin=global_admin,
                local_data1=local_data1,
                local_data2=local_data2,
            )
            communities.Pack(attribute_pb2.LargeCommunitiesAttribute(communities=[large_community]))

        attributes = [origin, next_hop, as_path, communities]

        return gobgp_pb2.Path(
            nlri=nlri,
            pattrs=attributes,
            family=gobgp_pb2.Family(afi=family_afi, safi=gobgp_pb2.Family.SAFI_UNICAST),
        )

    def add_path(self, ip, event_data):
        """Announce a single route."""
        logger.info("Blocking %s", ip)
        try:
            path = self._build_path(ip, event_data)

            self.stub.AddPath(
                gobgp_pb2.AddPathRequest(table_type=gobgp_pb2.GLOBAL, path=path),
                _TIMEOUT_SECONDS,
            )
        except ASNError as e:
            logger.warning("ASN assertion failed with error: %s", e)

    def del_all_paths(self):
        """Remove all routes from being announced."""
        logger.warning("Withdrawing ALL routes")

        self.stub.DeletePath(gobgp_pb2.DeletePathRequest(table_type=gobgp_pb2.GLOBAL), _TIMEOUT_SECONDS)

    def del_path(self, ip, event_data):
        """Remove a single route from being announced."""
        logger.info("Unblocking %s", ip)
        try:
            path = self._build_path(ip, event_data)
            self.stub.DeletePath(
                gobgp_pb2.DeletePathRequest(table_type=gobgp_pb2.GLOBAL, path=path),
                _TIMEOUT_SECONDS,
            )
        except ASNError as e:
            logger.warning("ASN assertion failed with error: %s", e)

    def get_prefixes(self, ip):
        """Retrieve the routes that match a prefix and are announced.

        Returns:
            list: The routes that overlap with the prefix and are currently announced.
        """
        prefixes = [gobgp_pb2.TableLookupPrefix(prefix=str(ip.ip))]
        family_afi = self._get_family_afi(ip.ip.version)
        result = self.stub.ListPath(
            gobgp_pb2.ListPathRequest(
                table_type=gobgp_pb2.GLOBAL,
                prefixes=prefixes,
                family=gobgp_pb2.Family(afi=family_afi, safi=gobgp_pb2.Family.SAFI_UNICAST),
            ),
            _TIMEOUT_SECONDS,
        )
        return list(result)

    def is_blocked(self, ip):
        """Return True if at least one route matching the prefix is being announced."""
        return len(self.get_prefixes(ip)) > 0

__init__(url)

Configure the channel used for communication.

Source code in translator/gobgp.py
28
29
30
31
def __init__(self, url):
    """Configure the channel used for communication."""
    channel = grpc.insecure_channel(url)
    self.stub = gobgp_pb2_grpc.GobgpApiStub(channel)

add_path(ip, event_data)

Announce a single route.

Source code in translator/gobgp.py
128
129
130
131
132
133
134
135
136
137
138
139
def add_path(self, ip, event_data):
    """Announce a single route."""
    logger.info("Blocking %s", ip)
    try:
        path = self._build_path(ip, event_data)

        self.stub.AddPath(
            gobgp_pb2.AddPathRequest(table_type=gobgp_pb2.GLOBAL, path=path),
            _TIMEOUT_SECONDS,
        )
    except ASNError as e:
        logger.warning("ASN assertion failed with error: %s", e)

del_all_paths()

Remove all routes from being announced.

Source code in translator/gobgp.py
141
142
143
144
145
def del_all_paths(self):
    """Remove all routes from being announced."""
    logger.warning("Withdrawing ALL routes")

    self.stub.DeletePath(gobgp_pb2.DeletePathRequest(table_type=gobgp_pb2.GLOBAL), _TIMEOUT_SECONDS)

del_path(ip, event_data)

Remove a single route from being announced.

Source code in translator/gobgp.py
147
148
149
150
151
152
153
154
155
156
157
def del_path(self, ip, event_data):
    """Remove a single route from being announced."""
    logger.info("Unblocking %s", ip)
    try:
        path = self._build_path(ip, event_data)
        self.stub.DeletePath(
            gobgp_pb2.DeletePathRequest(table_type=gobgp_pb2.GLOBAL, path=path),
            _TIMEOUT_SECONDS,
        )
    except ASNError as e:
        logger.warning("ASN assertion failed with error: %s", e)

get_prefixes(ip)

Retrieve the routes that match a prefix and are announced.

Returns:

Name Type Description
list

The routes that overlap with the prefix and are currently announced.

Source code in translator/gobgp.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def get_prefixes(self, ip):
    """Retrieve the routes that match a prefix and are announced.

    Returns:
        list: The routes that overlap with the prefix and are currently announced.
    """
    prefixes = [gobgp_pb2.TableLookupPrefix(prefix=str(ip.ip))]
    family_afi = self._get_family_afi(ip.ip.version)
    result = self.stub.ListPath(
        gobgp_pb2.ListPathRequest(
            table_type=gobgp_pb2.GLOBAL,
            prefixes=prefixes,
            family=gobgp_pb2.Family(afi=family_afi, safi=gobgp_pb2.Family.SAFI_UNICAST),
        ),
        _TIMEOUT_SECONDS,
    )
    return list(result)

is_blocked(ip)

Return True if at least one route matching the prefix is being announced.

Source code in translator/gobgp.py
177
178
179
def is_blocked(self, ip):
    """Return True if at least one route matching the prefix is being announced."""
    return len(self.get_prefixes(ip)) > 0

shared

Provide a location for code that we want to share between all translators.

asn_is_valid(asn)

asn_is_valid makes sure that an ASN passed in is a valid 2 or 4 Byte ASN.

Parameters:

Name Type Description Default
asn int

The Autonomous System Number that we want to validate

required

Raises:

Type Description
ASNError

If the ASN is not between 0 and 4294967295 or is not an integer.

Returns:

Name Type Description
bool bool

description

Source code in translator/shared.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def asn_is_valid(asn: int) -> bool:
    """asn_is_valid makes sure that an ASN passed in is a valid 2 or 4 Byte ASN.

    Args:
        asn (int): The Autonomous System Number that we want to validate

    Raises:
        ASNError: If the ASN is not between 0 and 4294967295 or is not an integer.

    Returns:
        bool: _description_

    """
    if not isinstance(asn, int):
        msg = f"ASN {asn} is not an Integer, has type {type(asn)}"
        raise ASNError(msg)
    if not 0 < asn < MAX_ASN_VAL:
        # This is the max as stated in rfc6996
        msg = f"ASN {asn} is out of range. Must be between 0 and 4294967295"
        raise ASNError(msg)

    return True

tests

acceptance

environment

Configure the test environment before executing acceptance tests.

before_all(context)

Create a GoBGP object.

Source code in translator/tests/acceptance/environment.py
6
7
8
9
def before_all(context):
    """Create a GoBGP object."""
    context.gobgp = GoBGP("gobgp:50051")
    context.config.setup_logging()

steps

actions

Define the steps used by Behave.

add_block(context, route, asn, community)

Block a single IP.

Source code in translator/tests/acceptance/steps/actions.py
13
14
15
16
17
18
@when("we add {route} with {asn} and {community} to the block list")
def add_block(context, route, asn, community):
    """Block a single IP."""
    ip = ipaddress.ip_interface(route)
    event_data = {"asn": int(asn), "community": int(community)}
    context.gobgp.add_path(ip, event_data)
asn_validation_fails(context, route, asn, community)

Ensure the ASN was invalid.

Source code in translator/tests/acceptance/steps/actions.py
43
44
45
46
47
48
@capture
@when("{route} and {community} with invalid {asn} is sent")
def asn_validation_fails(context, route, asn, community):
    """Ensure the ASN was invalid."""
    add_block(context, route, asn, community)
    assert context.log_capture.find_event("ASN assertion failed")
check_block(context, ip)

Ensure that the IP is currently blocked.

Source code in translator/tests/acceptance/steps/actions.py
51
52
53
54
@then("{ip} is blocked")
def check_block(context, ip):
    """Ensure that the IP is currently blocked."""
    assert get_block_status(context, ip)
check_unblock(context, ip)

Ensure that the IP is currently unblocked.

Source code in translator/tests/acceptance/steps/actions.py
57
58
59
60
@then("{ip} is unblocked")
def check_unblock(context, ip):
    """Ensure that the IP is currently unblocked."""
    assert not get_block_status(context, ip)
del_block(context, route, asn, community)

Remove a single IP.

Source code in translator/tests/acceptance/steps/actions.py
21
22
23
24
25
26
@then("we delete {route} with {asn} and {community} from the block list")
def del_block(context, route, asn, community):
    """Remove a single IP."""
    ip = ipaddress.ip_interface(route)
    event_data = {"asn": int(asn), "community": int(community)}
    context.gobgp.del_path(ip, event_data)
get_block_status(context, ip)

Check if the IP is currently blocked.

Returns:

Name Type Description
bool

The return value. True if the IP is currently blocked, False otherwise.

Source code in translator/tests/acceptance/steps/actions.py
29
30
31
32
33
34
35
36
37
38
39
40
def get_block_status(context, ip):
    """Check if the IP is currently blocked.

    Returns:
        bool: The return value. True if the IP is currently blocked, False otherwise.
    """
    # Allow our add/delete requests to settle
    time.sleep(1)

    ip_obj = ipaddress.ip_interface(ip)

    return any(ip_obj in ipaddress.ip_network(path.destination.prefix) for path in context.gobgp.get_prefixes(ip_obj))

translator

Define the main event loop for the translator.

install_deps()

Install necessary dependencies for debuggers.

Because of how we build translator currently, we don't have a great way to selectively install things at build, so we just do it here! Right now this also includes base.txt, which is unecessary, but in the future when we build a little better, it'll already be setup.

Source code in translator/translator.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def install_deps():
    """Install necessary dependencies for debuggers.

    Because of how we build translator currently, we don't have a great way to selectively
    install things at build, so we just do it here! Right now this also includes base.txt,
    which is unecessary, but in the future when we build a little better, it'll already be
    setup.
    """
    logger.info("Installing dependencies for debuggers")

    import subprocess  # noqa: S404, PLC0415
    import sys  # noqa: PLC0415

    subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", "/requirements/local.txt"])  # noqa: S603 TODO: add this to the container build

    logger.info("Done installing dependencies for debuggers")

main() async

Connect to the websocket and start listening for messages.

Source code in translator/translator.py
119
120
121
122
123
124
125
126
async def main():
    """Connect to the websocket and start listening for messages."""
    while True:
        try:
            await websocket_loop()
        except RpcError as e:
            logger.warning("Encountered an error connecting to gobgp, retrying in 10s, error is: %s", e)
            await asyncio.sleep(10)

process(message, websocket, g) async

Take a single message form the websocket and hand it off to the appropriate function.

Source code in translator/translator.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
async def process(message, websocket, g):
    """Take a single message form the websocket and hand it off to the appropriate function."""
    json_message = json.loads(message)
    event_type = json_message.get("type")
    event_data = json_message.get("message")
    if event_type not in KNOWN_MESSAGES:
        logger.error("Unknown event type received: %s", event_type)
    # TODO: Maybe only allow this in testing?
    elif event_type == "translator_remove_all":
        g.del_all_paths()
    else:
        try:
            ip = ipaddress.ip_interface(event_data["route"])
        except:  # noqa E722
            logger.exception("Error parsing message: %s", message)
            return

        if event_type == "translator_add":
            g.add_path(ip, event_data)
        elif event_type == "translator_remove":
            g.del_path(ip, event_data)
        elif event_type == "translator_check":
            json_message["type"] = "translator_check_resp"
            json_message["message"]["is_blocked"] = g.is_blocked(ip)
            json_message["message"]["translator_name"] = hostname
            await websocket.send(json.dumps(json_message))

websocket_loop() async

Connect to the websocket and start listening for messages for Gobgp.

Source code in translator/translator.py
108
109
110
111
112
113
114
115
116
async def websocket_loop():
    """Connect to the websocket and start listening for messages for Gobgp."""
    g = GoBGP("gobgp:50051")
    async for websocket in websockets.connect(url):
        try:
            async for message in websocket:
                await process(message, websocket, g)
        except websockets.ConnectionClosed:
            continue