Skip to content

Backends

backends

Classes

ParquetBackend

ParquetBackend(
    file_dir: PathLike,
    partition_by: Sequence[str] | None = None,
    purge_existing: bool = False,
)

Bases: BaseBackend

Backend for storing and retrieving SHAP explanations using Parquet files.

Parameters:

Name Type Description Default
file_dir PathLike

Directory where Parquet files will be stored.

required
partition_by Sequence[str] | None

List of columns to partition the data by (default is ("date",)). Other supported values: "batch_id", "model_version".

None
purge_existing bool

If True, existing files in the directory will be deleted (default is False).

False

Raises:

Type Description
ValueError

If invalid partition_by values are provided.

NotADirectoryError

If the provided file_dir is not a valid directory.

Source code in shapmonitor/backends/_parquet.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def __init__(
    self,
    file_dir: PathLike,
    partition_by: Sequence[str] | None = None,
    purge_existing: bool = False,
) -> None:
    """
    Initialize the Parquet backend.

    Parameters
    ----------
    file_dir : PathLike
        Directory where Parquet files will be stored.
    partition_by : Sequence[str], optional
        Columns to partition the data by (default is ["date"]).
    purge_existing : bool, optional
        If True, existing files in the directory are deleted first
        (default is False).

    Raises
    ------
    ValueError
        If invalid partition_by values are provided.
    NotADirectoryError
        If the provided file_dir is not a valid directory.
    """
    self._file_dir = Path(file_dir)

    # Normalize to a concrete list so an empty/None input falls back to
    # the default partitioning scheme and callers always see a list.
    self.partition_by = list(partition_by) if partition_by else ["date"]

    # Validate before touching the filesystem so a bad configuration
    # never purges or creates anything.
    invalid = set(self.partition_by) - _SUPPORTED_PARTITION_BY_VALUES
    if invalid:
        raise ValueError(
            f"Invalid partition_by value(s): {sorted(invalid)}. "
            f"Supported values: {sorted(_SUPPORTED_PARTITION_BY_VALUES)}."
        )

    if purge_existing and self._file_dir.exists():
        shutil.rmtree(self._file_dir)
        # Log the normalized path, consistent with the other log lines.
        _logger.warning("Purged existing files in directory: %s", self._file_dir)

    self._file_dir.mkdir(parents=True, exist_ok=True)

    if not self._file_dir.is_dir():
        raise NotADirectoryError(
            f"{self._file_dir} is not a valid directory."
        )  # pragma: no cover

    # Only announce success once validation and setup have completed.
    _logger.info("ParquetBackend initialized at: %s", self._file_dir)
Attributes
file_dir property
file_dir: Path

Get the directory where Parquet files are stored.

Functions
read
read(
    start_dt: datetime | date | None = None,
    end_dt: datetime | date | None = None,
    batch_id: str | None = None,
    model_version: str | None = None,
) -> DFrameLike

Read explanations from Parquet files within a specified date range.

Parameters:

Name Type Description Default
start_dt datetime | date | None

Start datetime for filtering explanations.

None
end_dt datetime | date | None

End datetime for filtering explanations.

None
batch_id str

Batch ID to filter explanations.

None
model_version str

Model version to filter explanations.

None

Returns:

Type Description
DataFrame

A DataFrame containing the explanations within the specified range.

Raises:

Type Description
ValueError

If no filters are provided.

Source code in shapmonitor/backends/_parquet.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def read(
    self,
    start_dt: datetime | date | None = None,
    end_dt: datetime | date | None = None,
    batch_id: str | None = None,
    model_version: str | None = None,
) -> DFrameLike:
    """
    Read explanations from Parquet files within a specified date range.

    Parameters
    ----------
    start_dt : datetime | date, optional
        Start datetime (inclusive) for filtering explanations.
    end_dt : datetime | date, optional
        End datetime (inclusive) for filtering explanations. If given
        without `start_dt`, acts as an upper bound only.
    batch_id : str, optional
        Batch ID to filter explanations.
    model_version : str, optional
        Model version to filter explanations.

    Returns
    -------
    DataFrame
        A DataFrame containing the explanations within the specified range.

    Raises
    ------
    ValueError
        If no filters are provided.
    """

    def _fmt(d: datetime | date) -> str:
        # Accept both datetime and plain date inputs: a plain date has no
        # .date() method, so calling it unconditionally would raise
        # AttributeError despite the declared `datetime | date` hint.
        return (d.date() if isinstance(d, datetime) else d).strftime("%Y-%m-%d")

    filters = []

    if start_dt:
        start_date = _fmt(start_dt)
        end_date = _fmt(end_dt) if end_dt else start_date

        filters.append(("date", ">=", start_date))
        filters.append(("date", "<=", end_date))
    elif end_dt:
        # An end bound without a start bound was previously ignored
        # silently; treat it as "everything up to end_dt".
        filters.append(("date", "<=", _fmt(end_dt)))

    if batch_id:
        filters.append(("batch_id", "==", batch_id))
    if model_version:
        filters.append(("model_version", "==", model_version))

    if not filters:
        raise ValueError(
            "At least one filter (date range, batch_id, model_version) must be provided."
        )

    _logger.debug("Reading data with filters: %s", filters)

    try:
        return pd.read_parquet(self._file_dir, filters=filters)
    except FileNotFoundError:
        _logger.warning(
            "No Parquet files found in directory: %s with filters: %s",
            self._file_dir,
            filters,
        )
        return pd.DataFrame()
    except pyarrow.lib.ArrowInvalid as e:
        # Filter matched no rows - expected
        if "No match" in str(e) or "empty" in str(e).lower():
            _logger.debug("No data matched filters: %s", filters)
            return pd.DataFrame()
        raise
write
write(batch: ExplanationBatch) -> Path

Write a batch of explanations to a Parquet file.

Parameters:

Name Type Description Default
batch ExplanationBatch

The batch of explanations to write.

required

Returns:

Type Description
Path

The path to the written Parquet file.

Source code in shapmonitor/backends/_parquet.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def write(self, batch: ExplanationBatch) -> Path:
    """
    Persist a batch of explanations as a Parquet file.

    Parameters
    ----------
    batch : ExplanationBatch
        The batch of explanations to write.

    Returns
    -------
    Path
        The path to the written Parquet file.
    """
    target = self._get_partition_path(batch)
    target.parent.mkdir(parents=True, exist_ok=True)

    frame = batch.to_dataframe()

    # Hive-style partitioning reconstructs partition columns from the
    # directory names on read, so keeping them in the file itself would
    # cause type conflicts; strip them before writing.
    partition_cols = [c for c in self.partition_by if c in frame.columns]
    if partition_cols:
        frame = frame.drop(columns=partition_cols)

    frame.to_parquet(target, index=False)

    _logger.info("Wrote batch %s to %s", batch.batch_id, target)
    return target
delete
delete(cutoff_dt: datetime | date) -> int

Delete Parquet files containing explanations before a specified datetime.

Parameters:

Name Type Description Default
cutoff_dt datetime | date

Datetime before which files will be deleted.

required

Returns:

Type Description
int

Number of partitions deleted.

Source code in shapmonitor/backends/_parquet.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def delete(self, cutoff_dt: datetime | date) -> int:
    """
    Delete Parquet files containing explanations before a specified datetime.

    Parameters
    ----------
    cutoff_dt : datetime
        Datetime before which files will be deleted.

    Returns
    -------
    int
        Number of partitions deleted.
    """
    cutoff_date = cutoff_dt.date()
    deleted_count = 0

    # Find all date= partition directories (Hive-style)
    for date_dir in self.file_dir.rglob("date=*"):
        if not date_dir.is_dir():
            continue

        try:
            date_str = date_dir.name.split("=", 1)[1]
            partition_date = datetime.strptime(date_str, "%Y-%m-%d").date()
        except (ValueError, IndexError):
            _logger.debug("Skipping invalid date directory: %s", date_dir.name)
            continue

        if partition_date < cutoff_date:
            shutil.rmtree(date_dir)
            _logger.info("Deleted partition: %s", date_dir)
            deleted_count += 1

    return deleted_count

BaseBackend

Abstract base class for all backends.

Functions
read abstractmethod
read(start_dt: datetime, end_dt: datetime) -> DFrameLike

Read data from the backend.

Source code in shapmonitor/backends/_base.py
11
12
13
14
15
16
17
18
@abstractmethod
def read(self, start_dt: datetime, end_dt: datetime) -> DFrameLike:
    """Read data from the backend for the given datetime range."""
    pass  # pragma: no cover
write abstractmethod
write(batch: ExplanationBatch) -> None

Write data to the backend.

Source code in shapmonitor/backends/_base.py
20
21
22
23
@abstractmethod
def write(self, batch: ExplanationBatch) -> None:
    """Write a batch of data to the backend."""
    pass  # pragma: no cover
delete abstractmethod
delete(cutoff_dt: datetime) -> None

Delete data before a certain datetime.

Source code in shapmonitor/backends/_base.py
25
26
27
28
@abstractmethod
def delete(self, cutoff_dt: datetime) -> None:
    """Delete all data recorded before `cutoff_dt`."""
    pass  # pragma: no cover

BackendFactory

Factory class for creating backend instances.

Functions
get_backend classmethod
get_backend(
    backend_name: str, *args, **kwargs
) -> BaseBackend

Get an instance of the specified backend.

Parameters:

Name Type Description Default
backend_name str

Name of the backend to instantiate.

required
*args

Positional arguments to pass to the backend constructor.

()
**kwargs

Keyword arguments to pass to the backend constructor.

{}

Returns:

Type Description
BaseBackend

An instance of the requested backend.

Raises:

Type Description
ValueError

If the specified backend is not supported.

Source code in shapmonitor/backends/__init__.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@classmethod
def get_backend(cls, backend_name: str, *args, **kwargs) -> BaseBackend:
    """Instantiate and return the backend registered under `backend_name`.

    Parameters
    ----------
    backend_name : str
        Name of the backend to instantiate.
    *args
        Positional arguments to pass to the backend constructor.
    **kwargs
        Keyword arguments to pass to the backend constructor.

    Returns
    -------
    BaseBackend
        An instance of the requested backend.

    Raises
    ------
    ValueError
        If the specified backend is not supported.
    """
    registry = cls._backends
    if backend_name not in registry:
        raise ValueError(f"Unsupported backend: {backend_name}")

    return registry[backend_name](*args, **kwargs)