import re
from typing import Union
import numpy as np
import pandas as pd
from pandas.core.dtypes.common import is_integer_dtype
from iccas.queries import cols, only_counts
from iccas.types import PandasObj
def reindex_by_interpolating(
    data: PandasObj,
    new_index: pd.DatetimeIndex,
    preserve_ints: bool = True,
    method="pchip",
    **interpolation,
) -> PandasObj:
    """
    Reindexes `data` and fills new values by interpolation (PCHIP, by default).

    This function was motivated by the fact that :meth:`pandas.DataFrame.resample`
    followed by :meth:`pandas.DataFrame.interpolate` doesn't take into account
    misaligned datetimes.

    Args:
        data:
            a DataFrame or Series with a datetime index
        new_index:
            the index of the returned object; its datetimes do not need to
            appear in ``data.index``
        preserve_ints:
            after interpolation, columns containing integers in the original
            dataframe are rounded and converted back to int
        method:
            interpolation method (see :meth:`pandas.DataFrame.interpolate`)
        **interpolation:
            other interpolation keyword argument different from `method` passed
            to :meth:`pandas.DataFrame.interpolate`

    Returns:
        a new Dataframe/Series indexed by `new_index`, with the index named
        ``"date"``

    See Also:
        :func:`resample`
    """
    # Interpolate over the union of both indexes so that the original samples
    # act as anchor points for the new datetimes, then keep only the new ones.
    extended_index = data.index.union(new_index)
    out = data.reindex(extended_index)
    out = out.interpolate(method=method, **interpolation)  # fill NaNs
    out = out.loc[new_index]

    if preserve_ints:
        # NOTE(review): if interpolation leaves NaNs (presumably possible when
        # `new_index` reaches outside the range of `data.index`), the integer
        # conversion below cannot succeed -- confirm with callers.
        if isinstance(data, pd.DataFrame):
            int_cols = [
                col for col, dtype in data.dtypes.items() if is_integer_dtype(dtype)
            ]
            # Reassign column by column: `out.loc[:, int_cols] = ...` writes
            # into the existing float columns and would keep the float dtype.
            for col in int_cols:
                out[col] = out[col].round().astype(int)
        elif is_integer_dtype(data):
            out = out.round().astype(int)

    # Assign a renamed copy instead of renaming in place, so that an Index
    # object possibly shared with the caller is never mutated.
    out.index = out.index.rename("date")
    return out
def resample(
    data: PandasObj,
    freq: Union[int, str] = "1D",
    hour: int = 18,
    preserve_ints: bool = True,
    method="pchip",
    **interpolation,
) -> PandasObj:
    """
    Resamples `data` and fills missing values by interpolation.

    The resulting index is a `pandas.DatetimeIndex` whose elements are spaced
    accordingly to `freq`.

    In the case of "day frequencies" ('{num}D'), the new index is a datetime
    range built going backwards from the latest date (`data.index[-1]`) with
    the time set to `{hour}:00`, so the latest date is always included.
    For any other frequency the range starts at `data.index[0]` (time
    unchanged) and only the end bound is moved to `{hour}:00`.

    This function was motivated by the fact that :meth:`pandas.DataFrame.resample`
    followed by :meth:`pandas.DataFrame.interpolate` doesn't take into account
    misaligned datetimes. If you want to back-fill or forward-fill, just use
    :meth:`DataFrame.resample`.

    Args:
        data:
            a DataFrame or Series with a datetime index
        freq:
            resampling frequency in `pandas` notation; an integer ``n`` is
            interpreted as ``"{n}D"``
        hour:
            reference hour; for day frequencies all datetimes in the new index
            will have this hour
        preserve_ints:
            after interpolation, columns containing integers in the original
            dataframe are rounded and converted back to int
        method:
            interpolation method (see :meth:`pandas.DataFrame.interpolate`)
        **interpolation:
            other interpolation keyword argument different from `method` passed
            to :meth:`pandas.DataFrame.interpolate`

    Returns:
        a new Dataframe/Series with index elements spaced according to ``freq``

    See Also:
        :func:`reindex_by_interpolating`
    """
    if isinstance(freq, int):
        freq = f"{freq}D"
    else:
        freq = freq.upper()

    def at_reference_hour(timestamp):
        # Zero every sub-hour component: resetting only the minutes would let
        # seconds/microseconds/nanoseconds of the data leak into the new index.
        return timestamp.replace(
            hour=hour, minute=0, second=0, microsecond=0, nanosecond=0
        )

    is_day_frequency = re.fullmatch(r"(\d*)D", freq)
    if is_day_frequency:
        # Build the range backwards (negative frequency) from the latest date
        # so that it is always part of the result, then restore ascending order.
        new_index = pd.date_range(
            at_reference_hour(data.index[-1]),
            at_reference_hour(data.index[0]),
            freq="-1D" if freq == "D" else f"-{freq}",
        )[::-1]
    else:
        new_index = pd.date_range(
            data.index[0],
            at_reference_hour(data.index[-1]),
            freq=freq,
        )

    return reindex_by_interpolating(
        data, new_index, preserve_ints=preserve_ints, method=method, **interpolation
    )
# ===========================
# DATA CORRECTION
# ===========================
def nullify_series_local_bumps(series: pd.Series):
    """
    Set to NaN all elements s[i] such that s[i] > s[i+k] for some k > 0.

    In other words, every value that is greater than any later value is
    masked, which leaves the remaining (non-NaN) values non-decreasing.
    Elements are compared by position, so the index may contain duplicate
    labels. NaNs already present in `series` are ignored when computing the
    minimum of the later values and are returned unchanged.

    Args:
        series: the series to clean

    Returns:
        a new Series with the same index where the offending values are NaN
    """
    # Minimum of each element and everything after it: a cumulative minimum
    # computed on the reversed series, flipped back to the original order.
    suffix_min = series.iloc[::-1].cummin().iloc[::-1]
    # An element strictly greater than that minimum has a smaller successor.
    is_bump = (series > suffix_min).to_numpy()
    return series.mask(is_bump)
def nullify_local_bumps(df: pd.DataFrame):
    """
    Apply :func:`nullify_series_local_bumps` to the columns of `df`.

    Columns whose name contains ``"unknown"`` are passed through untouched;
    every other column has its local bumps replaced by NaN.

    Args:
        df: the DataFrame to clean

    Returns:
        the result of applying the per-column cleaning with ``df.apply``
    """

    def _clean_column(column):
        # Columns flagged as "unknown" are deliberately left as they are.
        skip = "unknown" in column.name
        return column if skip else nullify_series_local_bumps(column)

    return df.apply(_clean_column)
def fix_monotonicity(data: pd.DataFrame, method="pchip", **interpolation):
    """
    Replaces tracts of all cases and deaths time series that break the non-decreasing
    trend of the series with interpolated data.

    This function also ensures that the following conditions are still satisfied
    even after the "correction"::

        male_cases + female_cases <= cases
        male_deaths + female_deaths <= deaths

    Only the columns selected by ``only_counts(data)`` are processed and
    returned.

    Args:
        data: a DataFrame containing all integer columns about cases and deaths
        method: interpolation method
        **interpolation:
            other keyword arguments passed to
            :meth:`pandas.DataFrame.interpolate`

    Returns:
        a DataFrame with all integer time series (columns) modified so that they
        are non-decreasing time series
    """
    # Fix all individual series independently: values breaking the trend are
    # set to NaN and then re-created by interpolation.
    # NOTE(review): NaNs that interpolation cannot fill would make the integer
    # conversion fail -- presumably this can happen when the bump is at the
    # very start of a series; confirm.
    orig = only_counts(data)
    fixed = (
        nullify_local_bumps(orig)
        .interpolate(method=method, **interpolation)
        .round()
        .astype(int)
    )

    # Ensure that (males + females <= total) for each age group (including
    # "unknown age" group) taking the maximum between the fixed totals and the
    # fixed males+females
    for variable in ["deaths", "cases"]:
        # Sum the male and female columns that share the same label on the
        # second column level. `DataFrame.sum(axis=1, level=1)` was removed in
        # pandas 2.0; grouping the transposed frame is the equivalent form
        # (`sort=False` keeps the original order of the labels).
        males_plus_females = (
            fixed[cols("mf", variable)].T.groupby(level=1, sort=False).sum().T
        )
        totals = fixed[variable]
        # Keep a total where it is large enough, otherwise raise it to the sum.
        totals_fixed = (
            totals
            .where(totals >= males_plus_females)
            .fillna(males_plus_females)
        )
        fixed[variable] = totals_fixed

    # `where`/`fillna` may have introduced floats: convert back to integers.
    return fixed.astype(int)