Conformal Prediction Beyond Exchangeability
Replicating empirical examples from Conformal Prediction Beyond Exchangeability by Barber et al.
Dependencies
from great_tables import GT, md
import numpy as np
import numpy.typing as npt
import pandas as pd
from pathlib import Path
import plotnine as pn
import statsmodels.api as sm
from tqdm import tqdm
from typing import Any, Callable, List

base_dir = Path().cwd()
generator = np.random.default_rng()
Data
# Electricity data
electricity = pd.read_csv(base_dir / "data" / "electricity-normalized.csv")
electricity = (
    electricity
    .iloc[17760:]  # drop the first 17,760 rows
    .assign(period=lambda x: x.period*24)  # convert normalized period to hour of day
    .loc[lambda df: (df["period"] >= 9) & (df["period"] <= 12)]  # keep 9:00am-12:00pm
    [["transfer", "nswprice", "vicprice", "nswdemand", "vicdemand"]]
    .reset_index(drop=True)
)
# Permuted copy: shuffling the rows makes the data (approximately) exchangeable
permuted = electricity.sample(frac=1).reset_index(drop=True)
# Function to generate simulated data
def sim_data(N: int, d: int, setting: int) -> tuple[pd.DataFrame, npt.NDArray]:
    X = np.random.multivariate_normal(mean=np.zeros(d), cov=np.eye(d), size=N)
    if setting == 1:
        beta = np.array([2, 1, 0, 0])
        y = X @ beta + np.random.normal(0, 1, N)
        X = pd.DataFrame(X, columns=[f"feature_{i+1}" for i in range(d)])
    elif setting == 2:
        beta_1 = np.array([2, 1, 0, 0])
        beta_2 = np.array([0, -2, -1, 0])
        beta_3 = np.array([0, 0, 2, 1])
        y = np.zeros(N)
        # Generate y for different segments
        y[:500] = X[:500] @ beta_1 + np.random.normal(0, 1, 500)
        y[500:1500] = X[500:1500] @ beta_2 + np.random.normal(0, 1, 1000)
        y[1500:] = X[1500:] @ beta_3 + np.random.normal(0, 1, 500)
        X = pd.DataFrame(X, columns=[f"feature_{i+1}" for i in range(d)])
    else:
        beta_start = np.array([2, 1, 0, 0])
        beta_end = np.array([0, 0, 2, 1])
        beta = np.linspace(beta_start, beta_end, N)
        y = np.array([X[i] @ beta[i] + np.random.normal(0, 1) for i in range(N)])
        X = pd.DataFrame(X, columns=[f"feature_{i+1}" for i in range(d)])
    return (X, y)
Functions
The nexcp_split function implements non-exchangeable split conformal prediction (CP). However, we can force it to also implement standard CP, which assumes exchangeability, by setting uniform weights. So we only need one function to replicate the results!
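As a quick sanity check of that claim, here is a minimal sketch of my own (not from the original post): NumPy's weighted quantile with uniform weights matches the plain empirical quantile, which is exactly why uniform weights recover standard split CP. It assumes NumPy >= 2.0, which the weighted np.quantile call in nexcp_split below already requires.
# Minimal sketch: with uniform weights, the weighted "inverted_cdf" quantile
# equals the plain inverted_cdf quantile, so the weighted cutoff in
# nexcp_split collapses to the standard split-CP cutoff.
rng = np.random.default_rng(0)
resid = rng.exponential(size=50)  # stand-in calibration residuals
uniform = np.full(50, 1 / 50)     # normalized uniform weights
plain = np.quantile(resid, 0.9, method="inverted_cdf")
weighted = np.quantile(resid, 0.9, weights=uniform, method="inverted_cdf")
assert np.isclose(plain, weighted)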
def normalize_weights(weights: npt.NDArray):
    return weights / weights.sum()

def nexcp_split(
    model: Callable[[npt.NDArray, pd.DataFrame, npt.NDArray], Any],
    split_function: Callable[[int], npt.NDArray],
    y: npt.NDArray,
    X: pd.DataFrame,
    tag_function: Callable[[int], npt.NDArray],
    weight_function: Callable[[int], npt.NDArray],
    alpha: float,
    test_index: int
):
    """Implements non-exchangeable split conformal prediction"""
    # Pull test observation from data
    y_test = y[test_index]
    X_test = X.iloc[[test_index]]
    # Select all observations up to that point
    y = y[:test_index]
    X = X.iloc[:test_index]
    # Generate indices for train/calibration split
    split_indices = split_function(test_index)
    # Split data, tags, and weights
    X_train = X.iloc[split_indices]
    y_train = y[split_indices]
    X_calib = X.drop(split_indices)
    y_calib = np.delete(y, split_indices)
    # Generate tags and weights
    tags = tag_function(test_index)
    weights = weight_function(test_index)
    # Train model
    model_base = model(y_train, X_train, weights=tags[split_indices])
    model_fitted = model_base.fit()
    # Generate residuals
    residuals = np.abs(y_calib - model_fitted.predict(X_calib))
    # Calculate weighted quantile of residuals
    weights_calib = normalize_weights(np.delete(weights[:test_index], split_indices))
    q_hat = np.quantile(
        residuals,
        1 - alpha,
        weights=weights_calib,
        method="inverted_cdf"
    )
    # Calculate predicted value
    y_hat = model_fitted.predict(X_test).iloc[0]
    # Generate CI
    lb = y_hat - q_hat
    ub = y_hat + q_hat
    covered = lb <= y_test <= ub
    return {"ci": np.array([lb, y_hat, ub]), "covered": covered, "width": ub - lb}
def plot_rolling_coverage(
    results: List[dict],
    alpha: float = 0.1,
    window: int = 300,
    rows: int = 2,
    repeated: bool = False
):
    """Plot the algorithm's mean coverage over a sliding window"""
    coverage_df = pd.DataFrame(results)
    if repeated:
        # Average coverage across repetitions at each time index first
        coverage_df = (
            coverage_df
            .groupby(["method", "dataset", "index"])["covered"]
            .mean()
            .reset_index()
        )
    coverage_df["coverage_mean"] = (
        coverage_df
        .groupby(["method", "dataset"])["covered"]
        .transform(lambda x: x.rolling(window=window).mean())
    )
    coverage_df["time"] = coverage_df.groupby(["method", "dataset"]).cumcount() + 1
    coverage_df = coverage_df.dropna(subset=["coverage_mean"])
    coverage_plot = (
        pn.ggplot(
            coverage_df,
            pn.aes(x="time", y="coverage_mean", color="method", group="method")
        )
        + pn.geom_line()
        + pn.geom_hline(yintercept=1-alpha, linetype="solid")
        + pn.scale_y_continuous(limits=(0, 1))
        + pn.facet_wrap("~ dataset", nrow=rows, scales="free")
        + pn.theme_538()
        + pn.labs(x="Time", y="Coverage", color="Method")
    )
    return coverage_plot
def plot_rolling_width(
    results: List[dict],
    window: int = 300,
    rows: int = 2,
    repeated: bool = False
):
    """Plot the algorithm's mean prediction interval width over a sliding window"""
    width_df = pd.DataFrame(results)
    if repeated:
        # Average width across repetitions at each time index first
        width_df = (
            width_df
            .groupby(["method", "dataset", "index"])["width"]
            .mean()
            .reset_index()
        )
    width_df["width_mean"] = (
        width_df
        .groupby(["method", "dataset"])["width"]
        .transform(lambda x: x.rolling(window=window).mean())
    )
    width_df["time"] = width_df.groupby(["method", "dataset"]).cumcount() + 1
    width_df = width_df.dropna(subset=["width_mean"])
    width_plot = (
        pn.ggplot(
            width_df,
            pn.aes(x="time", y="width_mean", color="method", group="method")
        )
        + pn.geom_line()
        + pn.facet_wrap("~ dataset", nrow=rows, scales="free")
        + pn.theme_538()
        + pn.labs(x="Time", y="Width", color="Method")
    )
    return width_plot
Electricity example
Note: I implement non-exchangeable split CP with least squares by using WLS and setting all the tags to a uniform value. Standard CP is implemented by setting all the weights to a uniform value, as mentioned above.
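For intuition, here is a small illustration of my own showing what the 0.99 decay used below does: the most recent observation gets weight 1 and older observations are down-weighted geometrically.
# Illustration: geometrically decaying weights for test_index = 4; the most
# recent observation receives the largest weight.
print(0.99**np.arange(4, -1, -1))  # approximately [0.961, 0.970, 0.980, 0.990, 1.0]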
split_fn = lambda x: np.sort(generator.choice(x, int(np.floor(x*0.3)), replace=False))
results = []

# Create X and y for the normal and permuted data
X, y = (electricity.drop("transfer", axis=1), electricity["transfer"].to_numpy())
X_perm, y_perm = (permuted.drop("transfer", axis=1), permuted["transfer"].to_numpy())

# Predict for each observation from N=100 to N=len(electricity)
for i in tqdm(range(100, len(electricity)), total=len(electricity)-100):
    for method in ["NexCP+LS", "NexCP+WLS", "CP+LS"]:
        for dataset in ["Electricity", "Permuted"]:
            if dataset == "Electricity":
                X_model, y_model = (X, y)
            else:
                X_model, y_model = (X_perm, y_perm)
            if method == "NexCP+LS":
                tag_fn = lambda x: np.array([1.]*(x + 1))
                weight_fn = lambda x: 0.99**np.arange(x, -1, -1)
            elif method == "NexCP+WLS":
                tag_fn = lambda x: 0.99**np.arange(x, -1, -1)
                weight_fn = tag_fn
            else:
                tag_fn = lambda x: np.array([1.]*(x + 1))
                weight_fn = tag_fn
            out = nexcp_split(
                model=sm.WLS,
                split_function=split_fn,
                y=y_model,
                X=X_model,
                tag_function=tag_fn,
                weight_function=weight_fn,
                alpha=0.1,
                test_index=i
            )
            out["method"] = method
            out["dataset"] = dataset
            out["index"] = i
            del out["ci"]
            results.append(out)
Plots
coverage_plot = plot_rolling_coverage(results, alpha=0.1, window=300)
coverage_plot.show()
width_plot = plot_rolling_width(results, window=300)
width_plot.show()
Table
table = (
    pd
    .DataFrame(results)
    .groupby(["method", "dataset"])
    .mean()
    .reset_index()
)
table = (
    table
    .pivot_table(
        index="method",
        columns="dataset",
        values=["covered", "width"]
    )
)
table.columns = [f"{col[0]}_{col[1].lower()}" for col in table.columns]
table = table.reset_index()
table = (
    GT(table, rowname_col="method")
    .tab_spanner(
        label="Electricity data",
        columns=["covered_electricity", "width_electricity"]
    )
    .tab_spanner(
        label="Permuted electricity data",
        columns=["covered_permuted", "width_permuted"]
    )
    .fmt_number(
        columns=[
            "covered_electricity",
            "width_electricity",
            "covered_permuted",
            "width_permuted"
        ],
        decimals=3
    )
    .cols_label(
        covered_electricity="Coverage",
        width_electricity="Width",
        covered_permuted="Coverage",
        width_permuted="Width"
    )
)
table.show()
          | Electricity data  | Permuted electricity data
          | Coverage | Width  | Coverage | Width
CP+LS     | 0.859    | 0.558  | 0.895    | 0.628
NexCP+LS  | 0.878    | 0.581  | 0.902    | 0.638
NexCP+WLS | 0.880    | 0.497  | 0.895    | 0.629
Simulated example
This demonstrates the conformal prediction algorithm in three data settings: i.i.d. data, a data-generating process with changepoints, and data with distribution drift. In the paper the authors repeat this 200 times to smooth the estimates, but for computational reasons I repeat it only 50 times here.
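Before running the full experiment, here is a small illustrative check of my own (not in the original) that draws one dataset per setting:
# Illustrative sanity check: one draw from each simulated setting
for s in (1, 2, 3):
    X_s, y_s = sim_data(2000, 4, setting=s)
    print(f"setting {s}: X {X_s.shape}, y mean {y_s.mean():.2f}, y sd {y_s.std():.2f}")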
split_fn = lambda x: np.sort(generator.choice(x, int(np.floor(x*0.3)), replace=False))
results = []

# Predict for each observation from N=100 to N=2000
for i in tqdm(range(100, 2000), total=2000-100):
    for rep in range(50):
        for method in ["NexCP+LS", "NexCP+WLS", "CP+LS"]:
            for dataset in ["setting_1", "setting_2", "setting_3"]:
                if dataset == "setting_1":
                    X_model, y_model = sim_data(2000, 4, setting=1)
                elif dataset == "setting_2":
                    X_model, y_model = sim_data(2000, 4, setting=2)
                else:
                    X_model, y_model = sim_data(2000, 4, setting=3)
                if method == "NexCP+LS":
                    tag_fn = lambda x: np.array([1.]*(x + 1))
                    weight_fn = lambda x: 0.99**np.arange(x, -1, -1)
                elif method == "NexCP+WLS":
                    tag_fn = lambda x: 0.99**np.arange(x, -1, -1)
                    weight_fn = tag_fn
                else:
                    tag_fn = lambda x: np.array([1.]*(x + 1))
                    weight_fn = tag_fn
                out = nexcp_split(
                    model=sm.WLS,
                    split_function=split_fn,
                    y=y_model,
                    X=X_model,
                    tag_function=tag_fn,
                    weight_function=weight_fn,
                    alpha=0.1,
                    test_index=i
                )
                out["method"] = method
                out["dataset"] = dataset
                out["index"] = i
                del out["ci"]
                results.append(out)
Plots
coverage_plot = plot_rolling_coverage(
    results,
    alpha=0.1,
    window=10,
    rows=3,
    repeated=True
)
coverage_plot.show()
width_plot = plot_rolling_width(results, window=10, rows=3, repeated=True)
width_plot.show()
Table
table = (
    pd
    .DataFrame(results)
    .groupby(["method", "dataset", "index"])
    .mean()
    .reset_index()
    .drop(labels=["index"], axis=1)
    .groupby(["method", "dataset"])
    .mean()
    .reset_index()
)
table = (
    table
    .pivot_table(
        index="method",
        columns="dataset",
        values=["covered", "width"]
    )
)
table.columns = [f"{col[0]}_{col[1].lower()}" for col in table.columns]
table = table.reset_index()
table = (
    GT(table, rowname_col="method")
    .tab_spanner(
        label="Setting 1 (i.i.d. data)",
        columns=["covered_setting_1", "width_setting_1"]
    )
    .tab_spanner(
        label="Setting 2 (changepoints)",
        columns=["covered_setting_2", "width_setting_2"]
    )
    .tab_spanner(
        label="Setting 3 (drift)",
        columns=["covered_setting_3", "width_setting_3"]
    )
    .fmt_number(
        columns=[
            "covered_setting_1",
            "width_setting_1",
            "covered_setting_2",
            "width_setting_2",
            "covered_setting_3",
            "width_setting_3"
        ],
        decimals=3
    )
    .cols_label(
        covered_setting_1="Coverage",
        width_setting_1="Width",
        covered_setting_2="Coverage",
        width_setting_2="Width",
        covered_setting_3="Coverage",
        width_setting_3="Width"
    )
)
table.show()
          | Setting 1 (i.i.d. data) | Setting 2 (changepoints) | Setting 3 (drift)
          | Coverage | Width        | Coverage | Width         | Coverage | Width
CP+LS     | 0.899    | 3.325        | 0.832    | 6.022         | 0.836    | 3.754
NexCP+LS  | 0.898    | 3.324        | 0.874    | 6.692         | 0.880    | 4.208
NexCP+WLS | 0.897    | 3.403        | 0.896    | 4.114         | 0.897    | 3.440