Python and data: slicing, caching, threading


In this blog post we'll explore some interesting implementation aspects of the analysis tool used in "The underlying problem of the Fediverse and other decentralised platforms". You could see it as the technical counterpart of the bundle. First, I'll recapitulate some points of the 2019 version. Then the 2020 version serves as the reference for the three highlighted topics: slicing, caching and threading. After reading this article you might also be able to speed up your Python apps (here: from around six minutes to four seconds). The source code of this tool is available in the bitkeks/fediverse-infra-analysis GitHub repository.

First, some words about the methodology applied in last year's analysis. The graphs in 2019 were created with LibreOffice Calc, based on an instances dump from instances.social. For each instance, the hostname was resolved to its IP addresses and then a whois call was issued for these IPs. The collected data was then put together in a CSV file, ready to be examined with Calc.

Since whois does not simply return the name of a hosting provider, but the name of an allocated network address range, a mapping from one to the other is needed. In the 2019 version, this was implemented with a dict, as shown below. Please note that querying the WHOIS databases might be subject to fair use policies. I solved this by reducing the number of queries with a cache file, in which successful WHOIS queries were stored so that they were not sent again (this mechanism was improved in the 2020 version, as you'll see later).

hoster_map = {
    "cloudflarenet": "cloudflare",
    "amazon technologies": "amazon",
    "amazon data services": "amazon",
    "amazon.com, inc.": "amazon",
    "hetzner-": "hetzner",
    "ovh": "ovh",
    "google": "google",
    "digitalocean": "digitalocean",
    "sakura-": "sakura",
    "us-linode-": "linode",
    "linode-us": "linode",
    ...
}

def map_whois_to_hoster(item: bytes) -> str:
    for hoster in hoster_map:
        if hoster.encode() in item:
            return hoster_map[hoster]

entry = whois[name].encode().lower().split(b"\n")
for item in entry:
    if item.startswith((b"netname:", b"organization:", b"organisation:", b"org:")):
        hoster = map_whois_to_hoster(item)
        if hoster:
            if hoster not in counters:
                counters[hoster] = []
            counters[hoster].append(name)
            break

Easy task! Collect the whois output, split it at each newline and then look for the given keys in each line. For example, the line netname: OVH-CLOUD-LIM would be matched, and the hoster_map key ovh would match in the substring check (if hoster.encode() in item) of map_whois_to_hoster. The value ovh would then be returned and used as the key in counters, adding this instance to the list of instances hosted at this provider.
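To make the matching step easier to try out, here is a minimal, self-contained sketch of the same idea. It is not the tool's code: the function name hoster_from_whois, the shortened HOSTER_MAP and the whois excerpt are made up for illustration, and it works on str instead of bytes.

```python
from typing import Optional

# Shortened copy of the mapping above; keys are lowercase substrings.
HOSTER_MAP = {
    "hetzner-": "hetzner",
    "ovh": "ovh",
    "us-linode-": "linode",
}

# Only lines starting with one of these keys are inspected.
WHOIS_KEYS = ("netname:", "organization:", "organisation:", "org:")


def hoster_from_whois(whois_text: str) -> Optional[str]:
    """Return the mapped hoster for the first matching whois line, else None."""
    for line in whois_text.lower().splitlines():
        if not line.startswith(WHOIS_KEYS):
            continue
        for needle, hoster in HOSTER_MAP.items():
            if needle in line:
                return hoster
    return None


# Made-up whois excerpt, for illustration only
sample = "inetnum: 192.0.2.0 - 192.0.2.255\nnetname: OVH-CLOUD-LIM\ncountry: FR"
print(hoster_from_whois(sample))  # prints "ovh"
```

Returning None for unknown networks makes the "no hoster found" case explicit, which the caller then has to handle.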

For the 2020 version, I created a new Python tool which has a lot more features than the last one and uses matplotlib for plots. The graph below shows the processing pipeline with file inputs and task chains.

Figure: Process 2020

It begins on the left, processing input files for IP-to-AS mappings and for the instances data. Both tasks use cache files, which we will explore in detail later. As the next step, a cleanup job is executed for all cache files, creating a clean base for the workers to run on. Then the pool of worker threads is started. Each worker has two assignments: get the IP addresses of the instance and then map these IP addresses to an AS. The topic of threading will be picked up in part three. When the workers finish, their results are collected. Lastly, graphs and exports are created.

Slicing

In 2020, IP-to-AS mapping (previously done with WHOIS) was vastly improved over the previous approach. When you're just hacking along, putting together some ideas, it might be okay to be lazy and use whois to gather the information you need. But hey, performance-wise this is pretty inefficient. Looking for an alternative source for these mappings, I stumbled upon iptoasn.com. A free IP address to ASN database that is downloadable, easy to parse and updated hourly was exactly what I was looking for. We now have a list of all AS with their name, IP range and ASN. The following code handles the new input.

def asnfile_init(filename: str) -> dict:
    ip_networks = {}

    if not os.path.exists(filename):
        raise FileNotFoundError

    fh = gzip.open(filename, "rt")
    tsv = csv.reader(fh, delimiter="\t")

    current_slice = None
    current_slice_size = 0

    for row in tsv:
        if current_slice is None:
            # Initialization
            current_slice = row[0]

        if current_slice_size == 1000:
            current_slice = row[0]
            current_slice_size = 0

        if current_slice not in ip_networks:
            ip_networks[current_slice] = []

        # entry = { [...] }

        ip_networks[current_slice].append(entry)
        current_slice_size += 1

    fh.close()
    return ip_networks

You might wonder what current_slice does exactly. It holds the start IP address of the slice that is currently being filled: every 1000 rows a new slice is opened, keyed by the first IP address in it. It is one of the many performance tuners built into the tool. In the first version of this implementation, each AS was placed as a top-level entry in the ip_networks dict. Simple, easy, what could go wrong? But this data structure quickly reaches its limits when it comes to search. Later in the process, IP addresses have to be mapped to their AS, based on the data in ip_networks.

def get_asn_of_ip(ip, ip_networks: dict) -> list:
    # Accept strings as well as ipaddress objects
    if not isinstance(ip, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
        ip = ipaddress.ip_address(ip)

    candidates = []
    # Iterate over slices to find the right network slice
    match = None
    for slice_start in ip_networks.keys():
        slice_start_ip = ipaddress.ip_address(slice_start)
        if ip == slice_start_ip:
            # we exactly matched the beginning IP of a slice, set it as match
            match = slice_start
            break
        elif ip < slice_start_ip:
            # Searched IP is in previous slice
            break
        # Set slice as possible match, meaning it can be accessed
        # as "previous" slice if the next iteration breaks
        match = slice_start

    if match is None:
        return []

    # The IP that is searched for is in the last matched slice
    for network in ip_networks[match]:
        # Using ipaddress objects as network start and end;
        # both range boundaries are inclusive
        if network["start"] <= ip <= network["end"]:
            candidates.append(network)

    return candidates

Compare this implementation to the first version in which all networks are iterated and converted on the fly:

def get_asn_of_ip(ip, ip_networks: dict) -> list:
    # [...]
    for network in ip_networks:
        # Both boundaries are converted on every single iteration
        if ipaddress.ip_address(network["start"]) <= ip <= ipaddress.ip_address(network["end"]):
            candidates.append(network)

The IPv4 data set has 418362 entries. Because the first implementation runs through all the entries, this means up to 418362 iterations per lookup. Each iteration executes two IP conversions with ipaddress.ip_address() for the comparison, which is expensive. In the new version there are only 419 keys in the ip_networks dict, with up to 1000 entries each, so a maximum of 1419 iterations is needed: at most 419 to find the slice plus at most 1000 inside it. Creating ipaddress objects still happens, but only for the entries of the matched slice (and the result is cached as well).
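The numbers above can be checked with a few lines of arithmetic. The entry count and slice size are taken from the text; the variable names are only for illustration.

```python
import math

TOTAL_ENTRIES = 418362  # IPv4 entries, as stated above
SLICE_SIZE = 1000       # entries per slice, as in asnfile_init

# The last slice is only partially filled, so round up.
num_slices = math.ceil(TOTAL_ENTRIES / SLICE_SIZE)

# Worst case: scan every slice key, then every entry of one full slice.
worst_case_sliced = num_slices + SLICE_SIZE
worst_case_flat = TOTAL_ENTRIES

print(num_slices)         # 419
print(worst_case_sliced)  # 1419
print(round(worst_case_flat / worst_case_sliced))  # 295
```

In the worst case the sliced lookup touches roughly 295 times fewer entries than the flat scan.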

ipaddressified_ip_networks = {}  # conversion cache

def ipaddressify_ip_networks(ip_networks: list) -> list:
    new_ip_networks = []
    for entry in ip_networks:
        entry["start"] = ipaddress.ip_address(entry["start"])
        entry["end"] = ipaddress.ip_address(entry["end"])
        new_ip_networks.append(entry)
    return new_ip_networks

def get_asn_of_ip(ip, ip_networks: dict) -> list:
    # [...]
    if match in ipaddressified_ip_networks:
        converted_network = ipaddressified_ip_networks[match]
    else:
        converted_network = ipaddressify_ip_networks(ip_networks[match])
        ipaddressified_ip_networks[match] = converted_network
    # [...]

I call this method of splitting a huge batch of items into smaller chunks slicing. In Python, slices are applied to lists to get only a subset of their items, e.g. list_with_items[1:5] returns the second to fifth item of the list. In data structure theory we'd now enter the realm of trees and partitions. More on that at Wikipedia. Since we already touched on caching, let's continue with this topic!
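As a small step in that direction, the linear scan over the slice keys could be replaced by a binary search with the bisect module from the standard library. This is a sketch of an alternative, not what the tool does; build_index, find_slice and the slice keys are hypothetical.

```python
import bisect
import ipaddress


def build_index(ip_networks: dict) -> list:
    """Return the slice start addresses as a sorted list of integers."""
    return sorted(int(ipaddress.ip_address(start)) for start in ip_networks)


def find_slice(ip: str, index: list):
    """Return the start (as int) of the slice that may contain ip, or None."""
    pos = bisect.bisect_right(index, int(ipaddress.ip_address(ip)))
    if pos == 0:
        return None  # ip is smaller than the first slice start
    return index[pos - 1]


# Made-up slice keys, for illustration only
slices = {"1.0.0.0": [], "10.0.0.0": [], "100.0.0.0": []}
index = build_index(slices)
hit = find_slice("10.1.2.3", index)
print(ipaddress.ip_address(hit))  # 10.0.0.0
```

With 419 slice keys, a binary search needs at most nine comparisons instead of up to 419, and the index of integers only has to be built once.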

Caching

Caching is an extremely efficient way to persist the results of a process for later re-use. Imagine you read an input file and apply a parsing task to all the entries in this file, resulting in some output like a dict. Your script exits and you wish to re-run it. Should it need to do everything again if the input has not changed? No, of course not. This is where caching comes in.

In my tool there are five caches: IP-to-ASN input file parsing, IP resolving (one for the IP cache and one for hostnames which have no IP address), IP-to-AS mapping and the ip_networks conversion as shown above. The first four caches are persisted to disk and read on subsequent runs of the program. In the code snippet below, the asnfile_init function from the slicing section is extended with caching functionality.

def asnfile_init(filename: str) -> dict:
    ip_networks = {}

    if not os.path.exists(filename):
        raise FileNotFoundError

    # Hash the input file so that the cache is keyed by its content
    filehash = hashlib.sha1()
    with open(filename, 'rb') as fh:
        while True:
            data = fh.read(65536)  # read in 64kb chunks
            if not data:
                break
            filehash.update(data)

    # construct file name from hash
    cachefile = ".asnfile_cached_{}.gz".format(filehash.hexdigest())

    if os.path.exists(cachefile):
        with gzip.open(cachefile, "rt") as fh:
            return json.load(fh)["ip_networks"]

    # [... continue if no cache exists ...]

The function receives the file name as a parameter, creates a hash of this file and uses the resulting hash value as an identifier. If a previous run has already done the work of parsing the file, the cache is read and returned directly. Otherwise, execution continues with the parsing code shown in the slicing section.
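The snippet above stops before the cache is written. A complete round trip could look like the following sketch. The helper names file_sha1 and load_or_parse, the cache_dir parameter and the fake parser are assumptions for illustration; only the cache file name pattern and the "ip_networks" key are taken from the code above.

```python
import gzip
import hashlib
import json
import os
import tempfile


def file_sha1(filename: str) -> str:
    """Hash a file in 64 KiB chunks and return the hex digest."""
    digest = hashlib.sha1()
    with open(filename, "rb") as fh:
        for chunk in iter(lambda: fh.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


def load_or_parse(filename: str, parse, cache_dir: str) -> dict:
    """Return the cached result for this file content, parsing only on a miss."""
    name = ".asnfile_cached_{}.gz".format(file_sha1(filename))
    cachefile = os.path.join(cache_dir, name)
    if os.path.exists(cachefile):
        with gzip.open(cachefile, "rt") as fh:
            return json.load(fh)["ip_networks"]
    ip_networks = parse(filename)
    with gzip.open(cachefile, "wt") as fh:
        json.dump({"ip_networks": ip_networks}, fh)
    return ip_networks


calls = []  # records how often the (fake) parser actually runs


def fake_parse(filename: str) -> dict:
    calls.append(filename)
    return {"1.0.0.0": [{"asn": 13335}]}


with tempfile.TemporaryDirectory() as tmp:
    source = os.path.join(tmp, "input.tsv")
    with open(source, "w") as fh:
        fh.write("dummy")
    first = load_or_parse(source, fake_parse, tmp)   # parses and writes the cache
    second = load_or_parse(source, fake_parse, tmp)  # served from the cache

print(len(calls))  # 1
```

Because the hash is part of the cache file name, a changed input file automatically results in a cache miss. Stale cache files of older inputs stay on disk until they are removed by other means.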

Another example is the resolution of hostnames to IP addresses. Sure enough, your DNS server already caches requests, but we can save even more time by not even contacting the DNS server. Note that DNS entries have their own lifetime (time to live, TTL), which we ignore in this implementation.

def hostname_to_ips(hostname: str) -> tuple:
    if hostname in ip_cache:
        # load IP addresses from cache
        return ip_cache[hostname]["v4"], ip_cache[hostname]["v6"]

    ipv4 = []
    ipv6 = []
    try:  # try to resolve the hostname
        for s in socket.getaddrinfo(hostname, None, proto=socket.IPPROTO_TCP):
            if s[0] == socket.AF_INET:  # AF_INET is IPv4
                ipv4.append(s[4][0])
            if s[0] == socket.AF_INET6:  # AF_INET6 is IPv6
                ipv6.append(s[4][0])
    except socket.gaierror:
        # [Errno -2] Name or service not known
        pass

    return ipv4, ipv6

Sometimes the tool was run ten times a minute for small adjustments. How much time do you think it saves not to iterate over the whole list of instances again, resolving IP addresses which surely did not change in the last minute?

And as a last example, the cleanup job.

def cleanup_cachefiles() -> CleanupStats:
    global ip_cache, no_ip_cache, asn_cache

    # temporary list of cleanup candidates
    deletion_candidates = []

    # load IP cache
    if os.path.exists(CACHEFILE_IP):
        with open(CACHEFILE_IP, "r") as fh:
            ip_cache = json.load(fh)
    # clean up IP cache
    for hostname in ip_cache:
        if ip_cache[hostname]["timestamp"] < (time.time() - 60 * 60):  # one hour
            # exceeded timeout
            deletion_candidates.append(hostname)
    for can in deletion_candidates:
        del ip_cache[can]

    deletion_candidates.clear()

    # [... continue with two other caches ...]

# [... near the end of the program ...]
# save caches to persistent files
with open(CACHEFILE_IP, "w") as fh:
    json.dump(ip_cache, fh)

Every cache entry has a timestamp field in addition to its value. At the start all entries are iterated through and each timestamp is evaluated, searching for expired entries. Expired? Removed from the dict! The dict is held in memory and used during the data processing. When the program finishes the dict is persisted, dropping the expired items which were removed in cleanup_cachefiles.
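The expiry logic can be tried in isolation with the following sketch. The function drop_expired and the sample entries are hypothetical; the one-hour limit and the timestamp field follow the code above. Passing the current time as an argument is a choice made here to keep the function easy to test.

```python
import time

MAX_AGE = 60 * 60  # one hour, as in cleanup_cachefiles


def drop_expired(cache: dict, max_age: float, now: float) -> int:
    """Delete entries older than max_age seconds; return how many were removed."""
    # Collect the keys first: a dict must not change size while it is iterated.
    expired = [key for key, entry in cache.items()
               if entry["timestamp"] < now - max_age]
    for key in expired:
        del cache[key]
    return len(expired)


now = time.time()
# Made-up cache content, for illustration only
ip_cache = {
    "fresh.example": {"v4": ["192.0.2.1"], "v6": [], "timestamp": now - 60},
    "stale.example": {"v4": ["192.0.2.2"], "v6": [], "timestamp": now - 2 * MAX_AGE},
}
removed = drop_expired(ip_cache, MAX_AGE, now)
print(removed, sorted(ip_cache))  # 1 ['fresh.example']
```

The intermediate list plays the same role as deletion_candidates in the tool: deleting from a dict while iterating over it raises a RuntimeError.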

Talking about IP address resolution, we will now come to the third and last part of this exercise: threading.

Threading

In the previous section the IP address resolution function was shown as an example. In it socket.getaddrinfo(hostname, None, proto=socket.IPPROTO_TCP) has the vital mission to fetch all available IP addresses that are stored for the corresponding hostname, for IPv4 as well as IPv6. What do you think might go wrong here?

Maybe you have guessed it already - it's the request timeout applied by your OS to getaddrinfo which can give us a hard time. What easily goes wrong is plain and simple: a DNS request can take a long time (in some configurations up to 60 seconds) to fail. These delays add up to an insane amount of waiting time, which is wasted because other hostnames could be resolved in the meantime. And this is where asynchronous workers come into play.

Please note that threading in Python is a limited endeavor. The so-called Global Interpreter Lock (GIL) prevents threads of a running Python program from executing Python code in parallel. The GIL is released while a thread waits on blocking I/O such as a DNS lookup, though, which is exactly the situation we have here. If you need full speed for CPU-bound work, use multiprocessing. If you prefer async/await, use asyncio. I opted for threading because it's very easy to implement with ThreadPool.

# Worker function executed in thread
def worker(hostnames: list) -> [WorkerResult]:
    results = []
    for hostname in hostnames:
        v4, v6 = hostname_to_ips(hostname)

        if hostname in asn_cache:
            asn: list = asn_cache[hostname]["asn"]
        else:
            asn = []
            for ip in v4:
                asn += ip2asn.get_asn_of_ip(ip, ip_networks_ipv4)
            for ip in v6:
                asn += ip2asn.get_asn_of_ip(ip, ip_networks_ipv6)

        results.append(WorkerResult(hostname, v4, v6, asn))
    return results

# Main program which creates the threads and runs them with the worker function
pool = ThreadPool(NUM_WORKERS)
worker_results: [WorkerResult] = []

# To batch the worker payload, more than one hostname is passed to a worker to be processed.
# This also helps reducing the overhead of threading and possible global locks (counters)
hostname_batch = []

for instance in sorted(instances, key=lambda x: int(x["users"]), reverse=True)[:limit]:
    hostname = instance["name"]

    if hostname in no_ip_cache:
        # Skip unresolvable hostnames, if they have failed in previous runs
        # and are within a timeout limit
        skipped_no_ip.append(instance)
        continue

    hostname_batch.append(hostname)

    if len(hostname_batch) >= 10:
        # Start full batch
        worker_results.append(pool.apply_async(worker, args=(hostname_batch,)))
        hostname_batch = []  # reset
        continue

if len(hostname_batch):
    # last items which do not fill a batch
    worker_results.append(pool.apply_async(worker, args=(hostname_batch,)))

pool.close()
pool.join()

# Re-struct the results, fetching and unpacking each WorkerResult list from the thread result
worker_results = [] + [item for r in worker_results for item in r.get()]

for wr in worker_results:
    hostname = wr.hostname
# [... program continues with the combined results of all workers ...]

Let's go through this step by step. The worker function comes first. Each worker has the task to a) get the IP addresses and b) get the corresponding AS for up to ten hostnames. After that, the main program runs. It defines a ThreadPool with NUM_WORKERS threads (for example NUM_WORKERS = 4), iterates over all instances (the for instance in sorted(...) loop) and fills batches of ten hostnames to process in a worker (the len(hostname_batch) >= 10 check). The last batch might not reach ten items, so it's handled separately (the if len(hostname_batch) block after the loop). Now that all instance hostnames are distributed, the program waits for all workers to finish (pool.close() followed by pool.join()). As a last step of threading, the returned lists of WorkerResult objects are unpacked and concatenated (the list comprehension calling r.get()).

Easy, right? Just define the ThreadPool, run a function via pool.apply_async and collect the results. In my case eight workers were used and they did a really good job. If only 12 of the 4233 entries in my instances data set were unresolvable, the sequential implementation would be delayed by two minutes on my setup (12 times ten seconds request timeout, which is low!). With workers, the failing requests still block, but other hostnames are resolved in the meantime, virtually parallelising the request pipeline. In the best case the longest waiting time for the whole operation would be one DNS timeout, with all other requests finishing during the waiting period of the blocking request.
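The effect can be reproduced without any network access. The sketch below replaces the DNS lookup with a function that sleeps, assuming ThreadPool is the one from multiprocessing.pool. The function fake_resolve, the hostnames and the shortened timeout are made up, and pool.map is used instead of batched apply_async calls to keep the example short.

```python
import time
from multiprocessing.pool import ThreadPool

TIMEOUT = 0.2  # stand-in for a slow, failing DNS lookup (seconds)


def fake_resolve(hostname: str) -> str:
    """Pretend to resolve a hostname; 'bad' hosts block for TIMEOUT seconds."""
    if hostname.startswith("bad"):
        time.sleep(TIMEOUT)  # sleeping releases the GIL, like blocking I/O
        return "unresolved"
    return "resolved"


# Made-up hostnames, for illustration only
hostnames = ["bad1.example", "ok1.example", "bad2.example", "ok2.example",
             "bad3.example", "ok3.example", "bad4.example", "ok4.example"]

start = time.perf_counter()
sequential = [fake_resolve(h) for h in hostnames]
sequential_time = time.perf_counter() - start

start = time.perf_counter()
with ThreadPool(4) as pool:
    threaded = pool.map(fake_resolve, hostnames)  # blocks until all are done
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```

The sequential run has to wait for all four fake timeouts one after another, while the threaded run overlaps them. The exact timings depend on the machine, so no expected output is given here.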

Threading concludes this blog post. In your next Python application, you'll now be able to search faster, re-use processed data and efficiently distribute workloads. For the whole code base, have a look at my GitHub repository. Thank you for reading and happy performance hacking!