berechnung_todesfaelle/parse_todesfaelle.py

258 lines
7.9 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
:author: Maximilian Golla
:contact: maximilian.golla@rub.de
:version: 0.0.7, 2022-02-03
:description: Parses and formats RKI Todesfaelle nach Sterbedatum
:data: https://www.rki.de/DE/Content/InfAZ/N/Neuartiges_Coronavirus/Projekte_RKI/COVID-19_Todesfaelle.xlsx
'''
import copy # deep copy dicts
import sys
import math
from collections import OrderedDict
# Per-state divisor used when normalising death counts, keyed by the
# two-letter state code. Iteration order of this dict also determines the
# column order of the printed tables.
# NOTE(review): the values look like population figures in thousands of
# inhabitants -- confirm against the original data source.
STATES = {
    'BB': 2512,   # Brandenburg
    'BE': 3645,   # Berlin
    'BW': 11070,  # Baden-Wuerttemberg
    'BY': 13077,  # Bavaria
    'HB': 683,    # Bremen
    'HE': 6266,   # Hesse
    'HH': 1841,   # Hamburg
    'MV': 1610,   # Mecklenburg-Western Pomerania
    'NI': 7982,   # Lower Saxony
    'NW': 17933,  # North Rhine-Westphalia
    'RP': 4085,   # Rhineland-Palatinate
    'SH': 2897,   # Schleswig-Holstein
    'SL': 991,    # Saarland
    'SN': 4078,   # Saxony
    'ST': 2208,   # Saxony-Anhalt
    'TH': 2143,   # Thuringia
}
def read_file(filename):
    """Return the lines of *filename* without the first (header) line.

    Trailing carriage-return / newline characters are removed from every
    line; all other content, including empty lines, is kept as-is.

    :param filename: path of the text file to read
    :return: list of the remaining lines as strings
    """
    with open(filename, 'r') as inputfile:
        # The first line is the RKI header and carries no data.
        inputfile.readline()
        return [raw_line.rstrip('\r\n') for raw_line in inputfile]
def parse(data):
    """Turn raw CSV rows into normalised weekly values per state.

    Every row is split on ``,`` and read as ``state, week, year, deaths``.
    A death count written as ``<N`` is treated as ``N - 1``. Each count is
    then scaled to ``count * 100 / STATES[state]`` and rounded to two
    decimals.

    :param data: iterable of CSV row strings (header already removed);
        blank rows are ignored
    :return: ``OrderedDict`` mapping year (``"2020"``..``"2022"``) to an
        ``OrderedDict`` mapping two-digit week to a dict ``state -> value``;
        weeks without a row keep the value ``0``
    :raises KeyError: if a row names a state, year or week that is not part
        of the pre-initialised structure
    :raises SystemExit: (after writing to stderr) if the same
        year/week/state combination occurs more than once
    """
    # (year, first week, last week) reported by the RKI:
    # 2020 covers weeks 10-53, 2021 weeks 1-52 and 2022 only week 1.
    reported_weeks = (
        ("2020", 10, 53),
        ("2021", 1, 52),
        ("2022", 1, 1),
    )

    # Initialize the data structure with 0 for every state and week.
    result = OrderedDict()
    for year, first_week, last_week in reported_weeks:
        result[year] = OrderedDict()
        for week_number in range(first_week, last_week + 1):
            result[year][str(week_number).zfill(2)] = dict.fromkeys(STATES, 0)

    # Track which cells were filled explicitly. Comparing the stored value
    # with 0 would miss a duplicate whose first occurrence normalised to 0.
    seen = set()

    # Parse the actual data
    for line in data:
        if not line.strip():
            continue  # tolerate blank lines, e.g. at the end of the file

        fields = line.split(',')
        state = fields[0]
        week = str(fields[1]).zfill(2)
        year = str(fields[2])
        raw_count = fields[3]

        # Special treatment of masked counts such as "<4": use N - 1
        if '<' in raw_count:
            dead = int(raw_count.replace('<', '')) - 1
        else:
            dead = int(raw_count)

        # Normalize the value by the per-state divisor.
        # NOTE(review): presumably yields deaths per 100,000 inhabitants if
        # STATES holds the population in thousands -- confirm.
        dead = round((dead * 100.0) / STATES[state], 2)

        # Detect possible data issues in RKI data
        cell = (year, week, state)
        if cell in seen:
            sys.stderr.write("Error in RKI data: Year {} Week {} State {}\n".format(year, week, state))
            sys.exit(-1)
        seen.add(cell)
        result[year][week][state] = dead

    return result
def _percent_change(diff, base):
    """Return *diff* as a percentage of *base*, rounded to two decimals.

    A zero *base* has no defined percentage: no change yields ``0.0``, any
    other change yields positive or negative infinity so that it still
    sorts sensibly in the ranking.
    """
    if base == 0:
        if diff == 0:
            return 0.0
        return float("inf") if diff > 0 else float("-inf")
    return round((diff * 100.0) / base, 2)


def _series_for_states(data, entries, states):
    """Collect ``data[year][week][state]`` for the given weeks and states."""
    series = dict()
    for entry in entries:
        year, week = entry[0:4], entry[4:6]
        for state in states:
            series.setdefault(year, dict()).setdefault(week, dict())[state] = data[year][week][state]
    return series


def query(data, base, compare, n):
    """Compare cumulative values between two weeks and print the result.

    For every state in ``STATES`` the values in *data* are summed from the
    first week in *data* up to the base week and up to the compare week.
    The absolute and percentage change between both sums is printed, as are
    the *n* states with the largest ("Top") and smallest ("Flop") percentage
    change and their weekly values between base and compare week.

    :param data: nested mapping ``year -> week -> state -> value`` as
        produced by ``parse``
    :param base: base week as ``"YYYYWW"`` string, e.g. ``"202130"``
    :param compare: compare week as ``"YYYYWW"`` string; must not be
        earlier than *base*
    :param n: number of states to list as Top / Flop; at most
        ``len(STATES)``
    :return: ``None``; all results are printed to stdout
    :raises SystemExit: (after writing to stderr) if the query is invalid
    """
    b_year, b_week = base[0:4], base[4:6]
    c_year, c_week = compare[0:4], compare[4:6]

    # The base week must not be later than the compare week (base <= compare)
    if b_year > c_year:
        sys.stderr.write("Invalid query: b_year {} > c_year {}\n".format(b_year, c_year))
        sys.exit(-1)
    if b_year == c_year and b_week > c_week:
        sys.stderr.write("Invalid query: b_week {} > c_week {}\n".format(b_week, c_week))
        sys.exit(-1)

    # We can not determine more than top / flop len(STATES):
    if n > len(STATES):
        sys.stderr.write("Invalid query: n {} > no. of states {}\n".format(n, len(STATES)))
        sys.exit(-1)

    # The number of reported weeks differs per year, so positions are
    # looked up in a flat, ordered list of "YYYYWW" labels instead of being
    # computed arithmetically.
    all_entries = [year + week for year in data for week in data[year]]

    # Reject weeks that are not part of the data instead of failing with a
    # KeyError or silently falling back to the first week.
    for q_year, q_week in ((b_year, b_week), (c_year, c_week)):
        if q_year + q_week not in all_entries:
            sys.stderr.write("Invalid query: week {}-{} not found in data\n".format(q_year, q_week))
            sys.exit(-1)

    print("Base week: {}-{}: {}".format(b_year, b_week, data[b_year][b_week]))
    print("Compare week: {}-{}: {}\n".format(c_year, c_week, data[c_year][c_week]))

    # Index in our list which marks the base and compare week
    start = all_entries.index(b_year + b_week)
    end = all_entries.index(c_year + c_week)

    # Sum the values from the first week to the base week, and from the
    # first week to the compare week
    base_value = dict.fromkeys(STATES, 0)
    compare_value = dict.fromkeys(STATES, 0)
    for position, entry in enumerate(all_entries[:end + 1]):
        week_values = data[entry[0:4]][entry[4:6]]
        for state in STATES:
            if position <= start:
                base_value[state] += week_values[state]
            compare_value[state] += week_values[state]
    print("Beginning to Base Week: ", base_value)
    print("Beginning to Compare Week:", compare_value, "\n")

    # Change from the base week to the compare week in absolute and percent
    differences = dict()
    difference_percent = dict()
    for state in STATES:
        diff = round(compare_value[state] - base_value[state], 2)
        differences[state] = diff
        difference_percent[state] = _percent_change(diff, base_value[state])
    print("Change (Absolute):")
    print(differences)
    print("Change (Percent):")
    print(difference_percent)

    # Get Top and Flop N entries. sorted() is stable (also with
    # reverse=True), so ties keep the order of STATES, exactly like
    # repeatedly taking max() / min() would.
    count = max(n, 0)
    ranked_desc = sorted(difference_percent, key=difference_percent.get, reverse=True)
    ranked_asc = sorted(difference_percent, key=difference_percent.get)
    top = {state: difference_percent[state] for state in ranked_desc[:count]}
    flop = {state: difference_percent[state] for state in ranked_asc[:count]}
    print("\nTop {}:".format(n))
    print(top)
    print("\nFlop {}:".format(n))
    print(flop)

    # Weekly values between base and compare week (both inclusive)
    window = all_entries[start:end + 1]
    print("\nTime Series Top:")
    output(_series_for_states(data, window, top), top)
    print("\nTime Series Flop:")
    output(_series_for_states(data, window, flop), flop)
def output(data, states):
    """Print *data* as a tab-separated table, one row per year and week.

    The first row is the header ``Jahr``, ``Woche`` followed by one column
    per entry of *states*; every following row holds the year, the week and
    the value of each state in the same order.

    :param data: nested mapping ``year -> week -> state -> value``
    :param states: iterable of state codes selecting the columns
    """
    # Print the header
    print("\t".join(["Jahr", "Woche"] + [state for state in states]))

    # Print the main data
    for year in data:
        weeks = data[year]
        for week in weeks:
            cells = [str(year), str(week)]
            cells.extend(str(weeks[week][state]) for state in states)
            print("\t".join(cells))
def main():
    """Read the RKI CSV export, print the full table and run one query.

    The input file name, the compared weeks (``202130`` vs. ``202139``) and
    the Top/Flop size (5) are fixed in this function.
    """
    raw_rows = read_file('COVID-19_Todesfaelle.csv')
    weekly_values = parse(raw_rows)
    output(weekly_values, STATES)

    base_week, compare_week = "202130", "202139"
    query(weekly_values, base_week, compare_week, 5)


if __name__ == '__main__':
    main()