#!/usr/bin/env python
# -*- coding: utf-8 -*-

'''
:author: Maximilian Golla
:contact: maximilian.golla@rub.de
:version: 0.0.7, 2022-02-03
:description: Parses and formats the RKI "Todesfaelle nach Sterbedatum"
              (deaths by date of death) data set
:data: https://www.rki.de/DE/Content/InfAZ/N/Neuartiges_Coronavirus/Projekte_RKI/COVID-19_Todesfaelle.xlsx
'''

import copy  # no longer used below; kept so the module namespace is unchanged
import sys
import math
from collections import OrderedDict

# Divisor used to normalize the death counts of each federal state.
# NOTE(review): presumably the population in thousands, which would make the
# normalized value "deaths per 100,000 inhabitants" -- confirm.
STATES = {
    'BB': 2512,   # Brandenburg
    'BE': 3645,   # Berlin
    'BW': 11070,  # Baden-Württemberg
    'BY': 13077,  # Bayern
    'HB': 683,    # Bremen
    'HE': 6266,   # Hessen
    'HH': 1841,   # Hamburg
    'MV': 1610,   # Mecklenburg-Vorpommern
    'NI': 7982,   # Niedersachsen
    'NW': 17933,  # Nordrhein-Westfalen
    'RP': 4085,   # Rheinland-Pfalz
    'SH': 2897,   # Schleswig-Holstein
    'SL': 991,    # Saarland
    'SN': 4078,   # Sachsen
    'ST': 2208,   # Sachsen-Anhalt
    'TH': 2143    # Thüringen
}

# First and last week (both inclusive) that parse() pre-initializes per year.
# Rows that reference any other year/week make parse() fail with a KeyError.
REPORTED_WEEKS = OrderedDict([
    ("2020", (10, 53)),  # 2020: weeks 10 to 53
    ("2021", (1, 52)),   # 2021: weeks 1 to 52
    ("2022", (1, 1)),    # 2022: only week 1
])


def read_file(filename):
    '''Return the data rows of the CSV file *filename* as a list of strings.

    The first line (the RKI header) is skipped and trailing line endings are
    stripped from every remaining line.
    '''
    with open(filename, 'r') as inputfile:
        inputfile.readline()  # Skip the RKI header
        return [line.rstrip('\r\n') for line in inputfile]


def parse(data):
    '''Turn raw CSV rows into a nested ``year -> week -> state -> value`` dict.

    :param data: iterable of rows of the form ``state,week,year,deaths``;
                 ``deaths`` may be censored as ``<N``, which is read as N - 1.
                 Blank rows are ignored.
    :returns: ``OrderedDict`` keyed by year ("2020", ...), then by zero-padded
              week ("01", ...), then by state code. Every state/week listed in
              ``REPORTED_WEEKS`` is present; missing rows stay at 0. Values
              are the death counts normalized by ``STATES[state]``.
    :raises KeyError: if a row names an unknown state, year or week.

    Exits the process with status -1 if a second row targets a
    year/week/state slot that already holds a non-zero value.
    '''
    result = OrderedDict()

    # Initialize the data structure with 0
    for year, (first_week, last_week) in REPORTED_WEEKS.items():
        result[year] = OrderedDict()
        for week in range(first_week, last_week + 1):
            result[year][str(week).zfill(2)] = dict.fromkeys(STATES, 0)

    # Parse the actual data
    for line in data:
        if not line.strip():
            # A blank row (e.g. a trailing empty line) carries no data
            continue
        fields = line.split(',')
        state = fields[0]
        week = fields[1].zfill(2)
        year = fields[2]
        count = fields[3]

        # Special treatment of "<4" cases
        if '<' in count:
            dead = int(count.replace('<', '')) - 1
        else:
            dead = int(count)

        # Normalize the data (comment this line out to keep absolute values)
        dead = round((dead * 100.0) / STATES[state], 2)

        # Detect possible data issues in RKI data
        if result[year][week][state] != 0:
            sys.stderr.write("Error in RKI data: Year {} Week {} State {}\n".format(year, week, state))
            sys.exit(-1)
        result[year][week][state] = dead

    return result


def _percent_change(diff, base):
    '''Return *diff* relative to *base* in percent, rounded to 2 digits.

    A zero *base* cannot be divided by: the result is 0.0 when nothing
    changed and a signed infinity otherwise, so the values stay sortable.
    '''
    if base == 0:
        return 0.0 if diff == 0 else math.copysign(math.inf, diff)
    return round((diff * 100.0) / base, 2)


def _time_series(data, entries, states):
    '''Copy the values of *states* for every "YYYYWW" key in *entries*.'''
    series = dict()
    for entry in entries:
        year, week = entry[0:4], entry[4:6]
        for state in states:
            weeks = series.setdefault(year, dict())
            weeks.setdefault(week, dict())[state] = data[year][week][state]
    return series


def query(data, base, compare, n):
    '''Print the n states with the largest and smallest relative increase.

    For every state the values are summed from the first week in *data* up to
    the base week and up to the compare week; the change between both sums is
    printed in absolute terms and in percent, followed by the top/flop *n*
    states (by percent) and their weekly time series from base to compare.

    :param data: nested dict as returned by ``parse``.
    :param base: base week as "YYYYWW"; must not be later than *compare*.
    :param compare: compare week as "YYYYWW".
    :param n: number of states to list; at most ``len(STATES)``.

    Invalid queries are reported on stderr and end the process with
    status -1.
    '''
    b_year, b_week = base[0:4], base[4:6]
    c_year, c_week = compare[0:4], compare[4:6]

    # Base must always be earlier than or equal to compare
    if b_year > c_year:
        sys.stderr.write("Invalid query: b_year {} > c_year {}\n".format(b_year, c_year))
        sys.exit(-1)
    if b_year == c_year and b_week > c_week:
        sys.stderr.write("Invalid query: b_week {} > c_week {}\n".format(b_week, c_week))
        sys.exit(-1)

    # We can not determine more than top / flop 16:
    if n > len(STATES):
        sys.stderr.write("Invalid query: n {} > no. of states {}\n".format(n, len(STATES)))
        sys.exit(-1)

    # There is no cool way to determine the number of weeks between 202033 and
    # 202119 because of RKI, so enumerate every known week in order instead
    all_entries = [year + week for year in data for week in data[year]]

    # Both weeks must exist, otherwise the sums below would be meaningless
    for label, key in (("base", b_year + b_week), ("compare", c_year + c_week)):
        if key not in all_entries:
            sys.stderr.write("Invalid query: {} week {} is not part of the data\n".format(label, key))
            sys.exit(-1)

    print("Base week: {}-{}: {}".format(b_year, b_week, data[b_year][b_week]))
    print("Compare week: {}-{}: {}\n".format(c_year, c_week, data[c_year][c_week]))

    # Determine the index in our list which marks the base and compare week
    start = all_entries.index(b_year + b_week)
    end = all_entries.index(c_year + c_week)

    # Init data structures
    base_value = dict.fromkeys(STATES, 0)
    differences = dict.fromkeys(STATES, 0)
    difference_percent = dict.fromkeys(STATES, 0)

    # Sum the dead from the beginning of the pandemic to the base week, and
    # from beginning of the pandemic to compare week
    for i, entry in enumerate(all_entries[:end + 1]):
        year, week = entry[0:4], entry[4:6]
        for state in STATES:
            if i <= start:
                base_value[state] += data[year][week][state]
            differences[state] += data[year][week][state]
    print("Beginning to Base Week:   ", base_value)
    print("Beginning to Compare Week:", differences, "\n")

    # Determine the change from the base week to the compare week in absolute
    # and percent
    for state in STATES:
        diff = round(differences[state] - base_value[state], 2)
        differences[state] = diff
        difference_percent[state] = _percent_change(diff, base_value[state])
    print("Change (Absolute):")
    print(differences)
    print("Change (Percent):")
    print(difference_percent)

    # Get Top and Flop N entries. sorted() is stable (also with reverse=True),
    # so ties keep the order of STATES.
    count = max(n, 0)
    descending = sorted(difference_percent, key=difference_percent.get, reverse=True)
    ascending = sorted(difference_percent, key=difference_percent.get)
    top = {state: difference_percent[state] for state in descending[:count]}
    flop = {state: difference_percent[state] for state in ascending[:count]}
    print("\nTop {}:".format(n))
    print(top)
    print("\nFlop {}:".format(n))
    print(flop)

    # Weekly values of the selected states from the base to the compare week
    window = all_entries[start:end + 1]
    print("\nTime Series Top:")
    output(_time_series(data, window, top), top)
    print("\nTime Series Flop:")
    output(_time_series(data, window, flop), flop)


def output(data, states):
    '''Print *data* as a tab-separated table with one row per year/week.

    :param data: nested ``year -> week -> state -> value`` dict.
    :param states: iterable of state codes; defines the columns and their
                   order.
    '''
    # Print the header
    print("\t".join(["Jahr", "Woche"] + list(states)))

    # Print the main data
    for year in data:
        for week in data[year]:
            line = [str(year), str(week)]
            line.extend(str(data[year][week][state]) for state in states)
            print("\t".join(line))


def main():
    '''Parse the RKI CSV export, print it and run one example query.'''
    data = read_file('COVID-19_Todesfaelle.csv')
    data = parse(data)
    output(data, STATES)
    base = "202130"
    compare = "202139"
    query(data, base, compare, 5)


if __name__ == '__main__':
    main()