kohlmajer.de



DWD CDC (Climate Data Center)

— letzte Aktualisierung am

1932 Wörter, Lesezeit: 11 min

Der Deutsche Wetterdienst bietet über das CDC (Climate Data Center) offenen Zugang zu vielfältigen Klimadaten an. Glücklicherweise befindet sich eine Wetterstation des DWD im Norden von Nienburg (ID 3612, Lage 52.671° N, 9.223° E). Die Stationsliste des DWD kann hier eingesehen werden.

Auf dem CDC-Server gibt es einen guten Hilfebereich. Dort gibt es z. B. eine Excel-Liste für die erfassten Parameter. Die Datenqualität ist sehr gut. Für die Jahre 2008 bis 2023 liegen etwa 836 k Datensätze mit einer Datenverfügbarkeit von mindestens 99 % vor 😌.

Update! Der DWD hat den Betrieb der Wetterstation 3612 Nienburg zum 01. Juli 2025 eingestellt. Das private Grundstück, auf dem die Station steht, wird verkauft und daher wurde der Vertrag gekündigt. Sehr schade 😞!

Ich frage diese Daten aus dem 10-Minuten-Messarchiv ab; jeweils aus dem recent-Unterordner d. h. die Daten vom Vortag:

Apache IoTDB

Zum Speichern habe ich mich für Apache IoTDB (v2.0.3) als Open-Source Zeitreihendatenbank entschieden; zuvor hatte ich InfluxDB (v2) im Einsatz.

Das ganze wird über ein Python-Skript abgearbeitet, das direkt auf meinem IoT-Server läuft. Die Daten werden als Pandas-Dataframe aufbereitet und als CSV-Datei exportiert. Die CSV-Datei kann sehr leicht direkt über ein Skript in Apache IoTDB geladen werden. Der Cronjob läuft einmal morgens und braucht etwas weniger als zehn Sekunden:

################################
##
## cronjob.py
##
## import DWD CDC data for weather station Nienburg (3612)
## data will be added to IoTDB device root.dwd.id3612
##
## timeseries:
## root.dwd.id3612.tt_10: momentane Lufttemperatur in 2 m Höhe [°C]
## root.dwd.id3612.tm5_10: momentane Temperatur in 5 cm Höhe [°C]
## root.dwd.id3612.rws_10: Summe der Niederschlagshöhe der vorangegangenen 10 min [mm]
## root.dwd.id3612.rf_10: relative Feuchte in 2 m Höhe [%]
## root.dwd.id3612.sd_10: Sonnenscheindauer der vorangegangenen 10 min [min]
## root.dwd.id3612.td_10: Taupunkttemperatur in 2 m Höhe [°C]
##
## 03-NOV-2024
##
################################

# packages
import datetime
import pandas as pd
import logging
import os
import sys
import platform
import time
import io{#alpha-101}
from zipfile import ZipFile
from urllib.request import urlopen

# init logger
ts = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
logging.basicConfig(
    filename="cronjob.log",
    filemode="a",
    format="%(asctime)s.%(msecs)d %(levelname)s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
    level=logging.DEBUG,
)

# set session to UTC
os.environ["TZ"] = "UTC"
time.tzset()

logging.debug("################")
logging.debug(sys.executable)
logging.debug(sys.version)

# DWD URLs
root = "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/10_minutes"
url = {
    "tu": root + "/air_temperature/recent/10minutenwerte_TU_03612_akt.zip",
    "rr": root + "/precipitation/recent/10minutenwerte_nieder_03612_akt.zip",
    "sd": root + "/solar/recent/10minutenwerte_SOLAR_03612_akt.zip",
}
df = pd.DataFrame([])

try:
    resp = urlopen(url["tu"])
    zip = ZipFile(io.BytesIO(resp.read()))
    f = zip.namelist()[0]
    logging.info("open URL " + url["tu"])
    logging.info("read " + f)
    data = io.TextIOWrapper(zip.open(f))
    tu = pd.read_csv(
        data,
        sep=";",
        na_values="-999",
        index_col="MESS_DATUM",
        parse_dates=True,
        date_format="%Y%m%d%H%M",
    )
    tu.drop(["STATIONS_ID", "  QN", "PP_10", "eor"], axis=1, inplace=True)
    tu.rename(
        {"TT_10": "tt_10", "TM5_10": "tm5_10", "RF_10": "rf_10", "TD_10": "td_10"},
        axis=1,
        inplace=True,
    )
    tu.index.names = ["timestamp"]
    tu.sort_index(inplace=True)
    tu.dropna(how="all", inplace=True)
    tu = tu[~tu.index.duplicated(keep="first")]
    df = tu
    logging.info("processed TT_10, TM5_10, RF_10, TD_10")
except:
    logging.error("cannot open URL " + url["tu"])

try:
    resp = urlopen(url["rr"])
    zip = ZipFile(io.BytesIO(resp.read()))
    f = zip.namelist()[0]
    logging.info("open URL " + url["rr"])
    logging.info("read " + f)
    data = io.TextIOWrapper(zip.open(f))
    rr = pd.read_csv(
        data,
        sep=";",
        na_values="-999",
        index_col="MESS_DATUM",
        parse_dates=True,
        date_format="%Y%m%d%H%M",
    )
    rr.drop(
        ["STATIONS_ID", "  QN", "RWS_DAU_10", "RWS_IND_10", "eor"], axis=1, inplace=True
    )
    rr.rename({"RWS_10": "rws_10"}, axis=1, inplace=True)
    rr.index.names = ["timestamp"]
    rr.sort_index(inplace=True)
    rr = rr[~rr.index.duplicated(keep="first")]
    df = pd.concat([df, rr], axis=1)
    df.dropna(how="all", inplace=True)
    logging.info("processed RWS_10")
except:
    logging.error("cannot open URL " + url["rr"])

try:
    resp = urlopen(url["sd"])
    zip = ZipFile(io.BytesIO(resp.read()))
    f = zip.namelist()[0]
    logging.info("open URL " + url["sd"])
    logging.info("read " + f)
    data = io.TextIOWrapper(zip.open(f))
    sd = pd.read_csv(
        data,
        sep=";",
        na_values="-999",
        index_col="MESS_DATUM",
        parse_dates=True,
        date_format="%Y%m%d%H%M",
    )
    sd.drop(
        ["STATIONS_ID", "  QN", "DS_10", "GS_10", "LS_10", "eor"], axis=1, inplace=True
    )
    sd.rename({"SD_10": "sd_10"}, axis=1, inplace=True)
    sd.index.names = ["timestamp"]
    sd.sort_index(inplace=True)
    sd = sd[~sd.index.duplicated(keep="first")]
    df = pd.concat([df, sd], axis=1)
    df["sd_10"] = 60.0 * df["sd_10"]
    df.dropna(how="all", inplace=True)
    logging.info("processed SD_10")
except:
    logging.error("cannot open URL " + url["sd"])

# convert index to column
df.reset_index(inplace=True)

# take last 1000 records=~ 1 week
# rename to match IoTDB
logging.info("prepare dataframe and take last 1000 records (approx. 1 week)")
df = df.tail(1000)
df.rename(
    {
        "timestamp": "time",
        "tt_10": "root.dwd.id3612.tt_10",
        "tm5_10": "root.dwd.id3612.tm5_10",
        "rf_10": "root.dwd.id3612.rf_10",
        "td_10": "root.dwd.id3612.td_10",
        "rws_10": "root.dwd.id3612.rws_10",
        "sd_10": "root.dwd.id3612.sd_10",
    },
    axis=1,
    inplace=True,
)

try:
    df.to_csv("./cronjob.csv", na_rep="null", index=False)
    logging.info("export to cronjob.csv")
except:
    logging.error("failed to export to cronjob.csv")

# import data to iotdb
try:
    os.system("/bin/sh -c ./import.sh")
    logging.info("import data to iotdb using ./import.sh")
except:
    logging.error("failed to import data to iotdb")

Import nach IoTDB:

#!/bin/sh
# file import.sh

# set variables
v=2.0.3
data=/home/andreas/python-venvs/dwd-iotdb

cd /opt/apache-iotdb-$v-all-bin/confgzip
/opt/apache-iotdb-$v-all-bin/tools/import-data.sh \
--host localhost --port 6667 \
--username xxx --password 'yyy' \
--source $data/cronjob.csv -aligned true --timestamp_precision ms -lpf 1000 -ft csv

Auswertung

Die 10-Minuten-Werte werden zunächst auf Tagesbasis aggregiert (Minimum, Maximum, Mittelwert) und dann weiter verarbeitet. Aus den aggregierten Temperaturdaten leite ich die folgenden Parameter ab:

Beispiel SQL-Abfrage für die Frosttage — Apache IoTDB SQL-Manual:

/* drop aggregated timeseries */ drop timeseries root.dwd.id3612.agg.*
/* aggregate minimum */ select min_value(tt_10) into root.dwd.id3612.agg(mintemp) from root.dwd.id3612 group by ([2008-01-01T00:00:00+00:00,2025-01-01T00:00:00+00:00),1d)
/* Frosttag */ select count(mintemp) from root.dwd.id3612.agg where mintemp < 0 group by ([2008-01-01T00:00:00+00:00,2025-01-01T00:00:00+00:00),1y)

Tabellarische Zusammenfassung


Wetterstatistik DWD-Station 3612 (Nienburg/Weser)
JahrDatenpunkteMin [°C]Avg [°C]Max [°C]EistagFrosttagVegetationstagHeiztagSommertagTropennachtTropentagWüstentagHeizgradtag
200852704-10.110.333.05502772643201002070
200952560-17.010.236.61667271262430712098
201052560-18.98.535.6501012482773221242733
201152560-9.910.332.1966276257380302017
201252704-18.19.836.41658287268360712211
201352560-12.99.435.319842592673201112380
201452560-10.111.133.11143302265330301769
201552560-6.310.537.6149286281360811969
201652704-12.310.334.79662732444501102161
201752560-10.310.333.2940281255250402019
201847829-11.810.334.97642352266001101990
201951665-8.811.237.43462962485021931840
202052704-5.211.435.60243052614011111718
202152560-18.610.032.61451275252390202181
202252560-11.911.237.96512932475902131872
202352560-6.111.233.84502962415301001808
202452704-9.011.634.3532308235490901711
202525768-9.89.133.6657124145160301199

Die Daten werden ebenfalls über ein kleines Python-Skript zusammengestellt und sind hier ( Datenstand: Tue, 01 Jul 2025 07:50:00 UTC ) als CSV-Datei verfügbar. Die Rohdaten der mittleren Tagestemperatur sind hier als gzip-CSV-Datei verfügbar.

Die Daten werden aus IoTDB per REST API abgefragt. Das Python-Skript:

# packages
from configparser import ConfigParser
import pandas as pd
import numpy as np
import os
import time
import requests
import base64
import json
from datetime import datetime


# functions
def b64(x: str):
    return base64.b64encode(x.encode("ascii")).decode("ascii")


# load config
config = ConfigParser()
config.read("secrets.ini")

# set session to UTC
os.environ["TZ"] = "UTC"
time.tzset()

# post request skeleton
user = config["iotdb"]["user"]
pwd = config["iotdb"]["pwd"]
url = "https://" + config["iotdb"]["server"] + config["iotdb"]["url"] + "/query"
headers = {
    "Authorization": "Basic " + b64(user + ":" + pwd),
    "Content-Type": "application/json",
}

# initial df
endyear = str(int(datetime.now().strftime("%Y")) + 1)
range = "[2008-01-01T00:00:00+00:00," + endyear + "-01-01T00:00:00+00:00)"
sql = {
    "sql": "select count(tt_10) as 'records',min_value(tt_10) as 'min_t',max_value(tt_10) as 'max_t',round(avg(tt_10),1) as 'avg_t' from root.dwd.id3612 group by ("
    + range
    + ",1y)"
}
post = requests.post(url, json=sql, headers=headers)
data = json.loads(post.text)
df = pd.DataFrame(
    data=np.array(data["values"]).T,
    columns=["Datenpunkte", "Min [°C]", "Max [°C]", "Avg [°C]"],
)
df["Jahr"] = np.float64(data["timestamps"]) / 1000
df["Jahr"] = df["Jahr"].map(lambda x: datetime.fromtimestamp(x).strftime("%Y"))
df = df[["Jahr", "Datenpunkte", "Min [°C]", "Avg [°C]", "Max [°C]"]]

# build aggregation
build = True

if build:

    # remove timeseries
    sql = {"sql": "drop timeseries root.dwd.id3612.agg.*"}
    post = requests.post(url, json=sql, headers=headers)
    if not post.status_code == 200:
        print("failed!")

    # agg mintemp
    sql = {
        "sql": "select min_value(tt_10) into root.dwd.id3612.agg(mintemp) from root.dwd.id3612 group by ("
        + range
        + ",1d)"
    }
    post = requests.post(url, json=sql, headers=headers)
    if not post.status_code == 200:
        print("failed!")

    # agg avgtemp
    sql = {
        "sql": "select avg(tt_10) into root.dwd.id3612.agg(avgtemp) from root.dwd.id3612 group by ("
        + range
        + ",1d)"
    }
    post = requests.post(url, json=sql, headers=headers)
    if not post.status_code == 200:
        print("failed!")

    # agg maxtemp
    sql = {
        "sql": "select max_value(tt_10) into root.dwd.id3612.agg(maxtemp) from root.dwd.id3612 group by ("
        + range
        + ",1d)"
    }
    post = requests.post(url, json=sql, headers=headers)
    if not post.status_code == 200:
        print("failed!")

# Eistag
sql = {
    "sql": "select count(maxtemp) from root.dwd.id3612.agg where maxtemp < 0 group by ("
    + range
    + ",1y)"
}
post = requests.post(url, json=sql, headers=headers)
data = json.loads(post.text)
df["Eistag"] = np.array(data["values"]).T

# Frosttag
sql = {
    "sql": "select count(mintemp) from root.dwd.id3612.agg where mintemp < 0 group by ("
    + range
    + ",1y)"
}
post = requests.post(url, json=sql, headers=headers)
data = json.loads(post.text)
df["Frosttag"] = np.array(data["values"]).T

# Vegetationstag
sql = {
    "sql": "select count(avgtemp) from root.dwd.id3612.agg where avgtemp >= 5 group by ("
    + range
    + ",1y)"
}
post = requests.post(url, json=sql, headers=headers)
data = json.loads(post.text)
df["Vegetationstag"] = np.array(data["values"]).T

# Heiztag
sql = {
    "sql": "select count(avgtemp) from root.dwd.id3612.agg where avgtemp < 15 group by ("
    + range
    + ",1y)"
}
post = requests.post(url, json=sql, headers=headers)
data = json.loads(post.text)
df["Heiztag"] = np.array(data["values"]).T

# Sommertag
sql = {
    "sql": "select count(maxtemp) from root.dwd.id3612.agg where maxtemp >= 25 group by ("
    + range
    + ",1y)"
}
post = requests.post(url, json=sql, headers=headers)
data = json.loads(post.text)
df["Sommertag"] = np.array(data["values"]).T

# Tropennacht
sql = {
    "sql": "select count(mintemp) from root.dwd.id3612.agg where mintemp >= 20 group by ("
    + range
    + ",1y)"
}
post = requests.post(url, json=sql, headers=headers)
data = json.loads(post.text)
df["Tropennacht"] = np.array(data["values"]).T

# Tropentag
sql = {
    "sql": "select count(maxtemp) from root.dwd.id3612.agg where maxtemp >= 30 group by ("
    + range
    + ",1y)"
}
post = requests.post(url, json=sql, headers=headers)
data = json.loads(post.text)
df["Tropentag"] = np.array(data["values"]).T

# Wüstentag
sql = {
    "sql": "select count(maxtemp) from root.dwd.id3612.agg where maxtemp >= 35 group by ("
    + range
    + ",1y)"
}
post = requests.post(url, json=sql, headers=headers)
data = json.loads(post.text)
df["Wüstentag"] = np.array(data["values"]).T

# Heizgradtag
sql = {
    "sql": "select round(sum(15-avgtemp),0) from root.dwd.id3612.agg where avgtemp < 15 group by ("
    + range
    + ",1y)"
}
post = requests.post(url, json=sql, headers=headers)
data = json.loads(post.text)
df["Heizgradtag"] = np.array(data["values"]).T

# last
sql = {"sql": "select last tt_10 from root.dwd.id3612"}
post = requests.post(url, json=sql, headers=headers)
last = datetime.fromtimestamp(
    np.float64(json.loads(post.text)["timestamps"][0]) / 1000
).strftime("%a, %d %b %Y %H:%M:%S UTC")

# JSON output
df = df.astype({"Datenpunkte": "int32", "Jahr": "int32", "Heizgradtag": "int32"})
df.to_csv("./34534878-937d-4077-8bb7-60904025b6e0.csv", index=False)

# daily values csv.gz
sql = {"sql": "select round(avgtemp,1) from root.dwd.id3612.agg"}
post = requests.post(url, json=sql, headers=headers)
data = json.loads(post.text)
df = pd.DataFrame(data=np.array(data["values"]).T, columns=["Avg [°C]"])
df["Tag"] = np.float64(data["timestamps"]) / 1000
df["Tag"] = df["Tag"].map(lambda x: datetime.fromtimestamp(x).strftime("%Y-%m-%d"))
df = df[["Tag", "Avg [°C]"]]
df.to_csv(
    "./8e364833-4d53-4592-97d9-428d4fd3b670.csv.gz", index=False, compression="gzip"
)

Der Klimawandel

Wie sieht es mit dem Klimawandel aus? Die Statistik ist eindeutig! (Daten für 2008 bis einschließlich 2024)

# from scipy import stats
from statsmodels.formula.api import ols
import pandas as pd

df = pd.read_csv("8e364833-4d53-4592-97d9-428d4fd3b670.csv.gz")
df.rename(columns={"Avg [°C]": "y"}, inplace=True)  # dependend variable
df["Tag"] = pd.to_datetime(df["Tag"])
df.set_index("Tag", inplace=True)
df = df.loc["2008-01-01":"2024-12-31"]  # select 2008 -- 2024

# set x variable as factional index per year, so slope is average increase per year
df["x"] = range(len(df))
df["x"] = df["x"] / 365.0

# fit and print regression report
model = ols("y ~ x", df).fit()
print(model.summary())

# print confidence intervals for 99.9%
print("\nLower and upper bound for alpha = 0.001:")
print(model.conf_int(alpha=0.001, cols=None))
                            OLS Regression Results                            
==============================================================================
Dep. Variable:                      y   R-squared:                       0.008
Model:                            OLS   Adj. R-squared:                  0.008
Method:                 Least Squares   F-statistic:                     47.89
Date:                Wed, 02 Jul 2025   Prob (F-statistic):           4.97e-12
Time:                        20:25:42   Log-Likelihood:                -20631.
No. Observations:                6173   AIC:                         4.127e+04
Df Residuals:                    6171   BIC:                         4.128e+04
Df Model:                           1                                         
Covariance Type:            nonrobust                                         
==============================================================================
                 coef    std err          t      P>|t|      [0.025      0.975]
------------------------------------------------------------------------------
Intercept      9.4041      0.174     53.989      0.000       9.063       9.746
x              0.1235      0.018      6.920      0.000       0.088       0.158
==============================================================================
Omnibus:                      159.637   Durbin-Watson:                   0.106
Prob(Omnibus):                  0.000   Jarque-Bera (JB):               89.436
Skew:                          -0.120   Prob(JB):                     3.80e-20
Kurtosis:                       2.461   Cond. No.                         19.7
==============================================================================

Notes:
[1] Standard Errors assume that the covariance matrix of the errors is correctly specified.

Lower and upper bound for alpha = 0.001:
                  0         1
Intercept  8.830636  9.977507
x          0.064729  0.182199

Die mittlere Tagestemperatur über den Zeitraum von Anfang 2008 bis Ende 2024 (17 Jahre; mehr als 6.1 k Datenpunkte) beträgt 9.4 °C. Das Konfidenzintervall für ein Konfidenzniveau von 99.9 % ist {8.8 ; 10.0}. Die Steigung beträgt im Mittel 0.12 °C/Jahr mit einem Konfidenzintervall von {0.06 ; 0.18} °C/Jahr und ist somit positiv! Dieses Intervall enthält den wahren Wert mit 99.9 % Wahrscheinlichkeit! Der mittlere Anstieg beträgt etwa 1.3 % pro Jahr 😢.