PipelineGuard

class scikitplot.corpus.PipelineGuard(policy=None, *, dedup=True, checkpoint_path=None, checkpoint_every=500, max_retries=3, retry_delay=1.0)

Wrap any document stream with resilience, deduplication, and checkpointing.

PipelineGuard is a thin, composable layer you place around any DocumentReader.get_documents call (or any Iterable[CorpusDocument]) to get:

  • Error isolation — per-document failures are handled according to ErrorPolicy instead of crashing the whole pipeline.

  • Content deduplication — documents with identical content_hash are dropped after the first occurrence.

  • Checkpoint / resume — progress is periodically saved to a JSONL file so that a failed pipeline can resume from the last safe point.

  • Retry with back-off — transient errors (I/O, network) are retried up to max_retries times with exponential back-off.
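The deduplication rule in the list above can be illustrated with a minimal sketch. It assumes only that each document exposes a content_hash attribute; the helper name dedup_by_hash and the SimpleNamespace stand-in documents are hypothetical and are not part of the library.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Iterator


def dedup_by_hash(docs: Iterable[Any]) -> Iterator[Any]:
    """Yield each document whose content_hash has not been seen before."""
    seen = set()
    for doc in docs:
        key = doc.content_hash  # read as a plain attribute
        if key in seen:
            continue  # drop every occurrence after the first
        seen.add(key)
        yield doc


# Stand-in documents (hypothetical); real CorpusDocument objects carry more fields.
docs = [
    SimpleNamespace(doc_id="a", content_hash="h1"),
    SimpleNamespace(doc_id="b", content_hash="h2"),
    SimpleNamespace(doc_id="c", content_hash="h1"),  # same content as "a"
]
kept_ids = [d.doc_id for d in dedup_by_hash(docs)]
```

Here kept_ids contains "a" and "b" only, because "c" shares its hash with "a". The set of seen hashes grows with the number of distinct documents, which is worth keeping in mind for very large streams.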

Parameters:
policy : ErrorPolicy, optional

How to handle per-document exceptions. Default: LOG (log and skip).

dedup : bool, optional

Drop documents with duplicate content_hash. Default: True.

checkpoint_path : pathlib.Path or None, optional

Path to a JSONL file for checkpoint/resume. When set, the checkpoint is written every checkpoint_every documents; on restart, documents whose doc_id was already seen are skipped. Default: None (no checkpointing).

checkpoint_every : int, optional

Flush checkpoint every N yielded documents. Default: 500.

max_retries : int, optional

Maximum retry attempts for RETRY policy. Default: 3.

retry_delay : float, optional

Initial back-off in seconds between retries (doubles each attempt). Default: 1.0.
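As an illustration of how max_retries and retry_delay interact, here is a sketch of a doubling back-off loop. It is not the library's implementation: the helper retry_with_backoff, the choice of OSError as the transient error, and the injectable sleep argument are all assumptions made for the example. It also assumes that max_retries counts retries after one initial attempt, which the documentation does not state.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    max_retries: int = 3,
    retry_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on OSError with a delay that doubles each time."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    delay = retry_delay
    for attempt in range(max_retries + 1):  # one initial try plus the retries
        try:
            return fn()
        except OSError:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            sleep(delay)
            delay *= 2  # 1.0, 2.0, 4.0, ... with the default retry_delay
    raise AssertionError("unreachable")


# Demo: fail twice, then succeed. Delays are recorded instead of slept.
calls = {"n": 0}
waits: List[float] = []


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise OSError("transient failure")
    return "ok"


result = retry_with_backoff(flaky, sleep=waits.append)
```

With the defaults, the demo waits 1.0 s and then 2.0 s before the third call succeeds. Passing sleep as a parameter keeps the sketch testable without real delays.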

Notes

Zero-dependency design: PipelineGuard uses only pathlib, json, time, and hashlib from the stdlib. It does not require the corpus schema module at import time (it reads content_hash and doc_id as plain attributes).

Thread safety: Not thread-safe. Use one guard per thread when processing sources in parallel.

Examples

Basic error isolation — skip broken documents:

>>> guard = PipelineGuard(policy=ErrorPolicy.SKIP)
>>> docs = list(guard.iter(reader.get_documents()))

Full pipeline with dedup and checkpoint:

>>> from pathlib import Path
>>> guard = PipelineGuard(
...     policy=ErrorPolicy.LOG,
...     dedup=True,
...     checkpoint_path=Path("corpus.ckpt.jsonl"),
...     checkpoint_every=200,
... )
>>> for doc in guard.iter(reader.get_documents()):
...     process(doc)
>>> guard.close()

Context manager (auto-close):

>>> with PipelineGuard(checkpoint_path=Path("run.ckpt")) as guard:
...     docs = list(guard.iter(reader.get_documents()))

Wrap _MultiSourceReader:

>>> reader = DocumentReader.create(Path("a.mp3"), Path("b.pdf"))
>>> guard = PipelineGuard(policy=ErrorPolicy.SKIP)
>>> docs = list(guard.iter(reader.get_documents()))
close()

Flush and close the checkpoint file handle.

Notes

Called automatically when the guard is used as a context manager. Call it manually when using iter outside a with block.

Return type:

None

iter(source)

Iterate source with resilience, dedup, and checkpoint.

Parameters:
source : Iterable[CorpusDocument]

Any document iterable — typically reader.get_documents(), a list, or another iter() call.

Yields:
CorpusDocument

Documents that passed dedup and error policy filters.

Parameters:

source (Any)

Return type:

Generator[Any, None, None]

Notes

The guard opens the checkpoint file lazily on the first call to iter. Call close (or use the guard as a context manager) to ensure the file is flushed and closed.
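To make the checkpoint/resume behaviour concrete, here is a rough sketch of a JSONL checkpoint keyed on doc_id. The record layout ({"doc_id": ...} per line) and the helpers load_seen_ids and iter_with_checkpoint are assumptions for illustration; the actual file format written by PipelineGuard is not documented here.

```python
import json
import tempfile
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Iterable, Iterator, Set


def load_seen_ids(path: Path) -> Set[str]:
    """Read doc_id values recorded by a previous run, if any."""
    if not path.exists():
        return set()
    with path.open(encoding="utf-8") as fh:
        return {json.loads(line)["doc_id"] for line in fh if line.strip()}


def iter_with_checkpoint(
    docs: Iterable[Any], path: Path, checkpoint_every: int = 500
) -> Iterator[Any]:
    """Yield unseen documents and append their doc_id to a JSONL file."""
    seen = load_seen_ids(path)
    pending = 0
    with path.open("a", encoding="utf-8") as fh:  # opened on first next()
        for doc in docs:
            if doc.doc_id in seen:
                continue  # already handled in an earlier run
            yield doc
            # Recorded only after the consumer asks for the next document.
            fh.write(json.dumps({"doc_id": doc.doc_id}) + "\n")
            pending += 1
            if pending >= checkpoint_every:
                fh.flush()
                pending = 0
    # Leaving the with block flushes and closes the file.


# Demo with stand-in documents: the second run skips what the first recorded.
docs = [SimpleNamespace(doc_id=str(i)) for i in range(5)]
with tempfile.TemporaryDirectory() as tmp:
    ckpt = Path(tmp) / "run.ckpt.jsonl"
    first = [d.doc_id for d in iter_with_checkpoint(docs[:3], ckpt, checkpoint_every=2)]
    second = [d.doc_id for d in iter_with_checkpoint(docs, ckpt, checkpoint_every=2)]
```

The first run yields documents "0" to "2"; the second run sees all five but yields only "3" and "4". Because this sketch is a generator, the file is opened lazily when iteration starts, which mirrors the lazy opening described in the note above.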

property stats: dict[str, int]

Runtime statistics: yielded, skipped (dedup), errors.