PipelineGuard
- class scikitplot.corpus.PipelineGuard(policy=None, *, dedup=True, checkpoint_path=None, checkpoint_every=500, max_retries=3, retry_delay=1.0)
Wrap any document stream with resilience, deduplication, and checkpointing.
PipelineGuard is a thin, composable layer you place around any DocumentReader.get_documents call (or any Iterable[CorpusDocument]) to get:
- Error isolation: per-document failures are handled according to ErrorPolicy instead of crashing the whole pipeline.
- Content deduplication: documents with an identical content_hash are dropped after the first occurrence.
- Checkpoint / resume: progress is periodically saved to a JSONL file so that a failed pipeline can resume from the last safe point.
- Retry with back-off: under the RETRY policy, transient errors (I/O, network) are retried up to max_retries times with exponential back-off.
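The deduplication rule (keep the first document for each content_hash, drop every later one) can be sketched as a plain generator. This is a minimal illustration, not PipelineGuard's own code: dedup_by_content_hash is a hypothetical helper, and the documents are SimpleNamespace stand-ins that only carry doc_id and content_hash attributes.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Iterator


def dedup_by_content_hash(docs: Iterable[Any]) -> Iterator[Any]:
    """Yield each document whose content_hash has not been seen before.

    Hypothetical helper illustrating first-occurrence deduplication;
    it is not part of scikitplot.corpus.
    """
    seen: set = set()
    for doc in docs:
        key = doc.content_hash  # read as a plain attribute, no schema import
        if key in seen:
            continue  # duplicate content: drop it
        seen.add(key)
        yield doc


docs = [
    SimpleNamespace(doc_id="a", content_hash="h1"),
    SimpleNamespace(doc_id="b", content_hash="h2"),
    SimpleNamespace(doc_id="c", content_hash="h1"),  # same content as "a"
]
kept = [d.doc_id for d in dedup_by_content_hash(docs)]
print(kept)  # ['a', 'b']
```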
- Parameters:
- policy : ErrorPolicy, optional
How to handle per-document exceptions. Default: LOG (log and skip).
- dedup : bool, optional
Drop documents with a duplicate content_hash. Default: True.
- checkpoint_path : pathlib.Path or None, optional
Path to a JSONL file for checkpoint/resume. When set, progress is written to this file every checkpoint_every documents; on restart, documents whose doc_id was already recorded are skipped. Default: None (no checkpoint).
- checkpoint_every : int, optional
Flush the checkpoint every N yielded documents. Default: 500.
- max_retries : int, optional
Maximum number of retry attempts under the RETRY policy. Default: 3.
- retry_delay : float, optional
Initial back-off in seconds between retries (doubles after each attempt). Default: 1.0.
- Parameters:
policy (Any | None)
dedup (bool)
checkpoint_path (pathlib.Path | None)
checkpoint_every (int)
max_retries (int)
retry_delay (float)
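The two retry parameters combine into a doubling schedule. Treating max_retries as the number of retries after the first attempt (an assumption), the defaults would wait 1.0, 2.0 and 4.0 seconds before giving up. The sketch below shows exponential back-off in isolation: retry_with_backoff is a hypothetical helper rather than the library's code, and retrying only on OSError is an assumed stand-in for "transient errors". The sleep argument is injectable so the schedule can be inspected without actually waiting.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    max_retries: int = 3,
    retry_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on OSError with exponentially growing waits.

    Hypothetical helper. Assumption: max_retries counts retries after the
    first call, so the waits are retry_delay, 2 * retry_delay, 4 * retry_delay, ...
    """
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    delay = retry_delay
    attempt = 0
    while True:
        try:
            return func()
        except OSError:  # assumed stand-in for "transient" errors
            if attempt >= max_retries:
                raise  # retries exhausted: propagate the last error
            sleep(delay)
            delay *= 2  # double the wait after each failed attempt
            attempt += 1


# Usage: a flaky call that fails twice, then succeeds.
calls = {"n": 0}
waits: List[float] = []


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise OSError("transient I/O error")
    return "ok"


result = retry_with_backoff(flaky, max_retries=3, retry_delay=1.0, sleep=waits.append)
print(result, waits)  # ok [1.0, 2.0]
```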
Notes
Zero-dependency design: PipelineGuard uses only pathlib, json, time, and hashlib from the standard library. It does not require the corpus schema module at import time (it reads content_hash and doc_id as plain attributes).
Thread safety: PipelineGuard is not thread-safe. Use one guard per thread when processing sources in parallel.
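A guard carries mutable state between documents (at least the hashes it has already seen and its checkpoint handle), so parallel workers should each build their own instance instead of sharing one. The sketch below illustrates that one-instance-per-worker pattern with concurrent.futures.ThreadPoolExecutor. SeenFilter is a hypothetical, deliberately unsynchronized stand-in for a guard, and each source is a plain list of stand-in documents.

```python
from concurrent.futures import ThreadPoolExecutor
from types import SimpleNamespace
from typing import Any, Iterable, Iterator, List


class SeenFilter:
    """Hypothetical stand-in for a guard: holds unsynchronized mutable state."""

    def __init__(self) -> None:
        self.seen: set = set()

    def iter(self, source: Iterable[Any]) -> Iterator[Any]:
        for doc in source:
            if doc.content_hash not in self.seen:
                self.seen.add(doc.content_hash)
                yield doc


def process_source(source: List[Any]) -> List[str]:
    # A fresh filter per call, so each worker thread owns its own state.
    guard = SeenFilter()
    return [doc.doc_id for doc in guard.iter(source)]


sources = [
    [SimpleNamespace(doc_id="a1", content_hash="x"),
     SimpleNamespace(doc_id="a2", content_hash="x")],
    [SimpleNamespace(doc_id="b1", content_hash="x"),
     SimpleNamespace(doc_id="b2", content_hash="y")],
]

with ThreadPoolExecutor(max_workers=2) as pool:
    # pool.map returns results in the order of the inputs.
    results = list(pool.map(process_source, sources))

print(results)  # [['a1'], ['b1', 'b2']]
```

With this layout each worker only detects duplicates within its own source; "b1" survives even though it repeats the hash of "a1". Cross-source deduplication would need a separate step after the threads finish.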
Examples
Basic error isolation — skip broken documents:
>>> guard = PipelineGuard(policy=ErrorPolicy.SKIP)
>>> docs = list(guard.iter(reader.get_documents()))
Full pipeline with dedup and checkpoint:
>>> from pathlib import Path
>>> guard = PipelineGuard(
...     policy=ErrorPolicy.LOG,
...     dedup=True,
...     checkpoint_path=Path("corpus.ckpt.jsonl"),
...     checkpoint_every=200,
... )
>>> for doc in guard.iter(reader.get_documents()):
...     process(doc)
>>> guard.close()
Context manager (auto-close):
>>> with PipelineGuard(checkpoint_path=Path("run.ckpt")) as guard:
...     docs = list(guard.iter(reader.get_documents()))
Wrap a _MultiSourceReader:
>>> reader = DocumentReader.create(Path("a.mp3"), Path("b.pdf"))
>>> guard = PipelineGuard(policy=ErrorPolicy.SKIP)
>>> docs = list(guard.iter(reader.get_documents()))
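When a checkpointed run is restarted, documents whose doc_id is already recorded are skipped. The checkpoint's on-disk layout is not documented on this page, so the sketch below assumes one JSON object per line with a "doc_id" key. load_seen_ids and skip_seen are hypothetical helpers that show the resume logic using only pathlib and json; they are not PipelineGuard's internals.

```python
import json
import tempfile
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Iterable, Iterator, Set


def load_seen_ids(path: Path) -> Set[str]:
    """Read doc_id values from a JSONL checkpoint.

    Assumed layout (hypothetical): one {"doc_id": ...} object per line.
    """
    if not path.exists():
        return set()  # first run: nothing to skip
    seen: Set[str] = set()
    with path.open("r", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if line:
                seen.add(json.loads(line)["doc_id"])
    return seen


def skip_seen(docs: Iterable[Any], seen: Set[str]) -> Iterator[Any]:
    """Yield only documents whose doc_id a previous run did not record."""
    for doc in docs:
        if doc.doc_id not in seen:
            yield doc


with tempfile.TemporaryDirectory() as tmp:
    ckpt = Path(tmp) / "corpus.ckpt.jsonl"
    # Simulate a previous run that recorded "d1" and "d2" before failing.
    ckpt.write_text('{"doc_id": "d1"}\n{"doc_id": "d2"}\n', encoding="utf-8")

    docs = [SimpleNamespace(doc_id=f"d{i}") for i in range(1, 5)]
    resumed = [d.doc_id for d in skip_seen(docs, load_seen_ids(ckpt))]

print(resumed)  # ['d3', 'd4']
```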
- close()
Flush and close the checkpoint file handle.
Notes
Called automatically when the guard is used as a context manager. Call it manually when using iter without a with block.
- Return type:
None
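The flush-and-close contract of close() can be illustrated with a small writer class. This is a hypothetical sketch, not PipelineGuard's implementation: CheckpointWriter and record() are invented names and the JSON-lines record layout is an assumption. The file is opened lazily on the first record, flushed every `every` records, and closed either by close() or automatically by __exit__ at the end of a with block.

```python
import json
import tempfile
from pathlib import Path
from typing import IO, Optional


class CheckpointWriter:
    """Hypothetical JSONL checkpoint writer with lazy open and periodic flush."""

    def __init__(self, path: Path, every: int = 500) -> None:
        self.path = path
        self.every = every
        self._fh: Optional[IO[str]] = None
        self._pending = 0

    def record(self, doc_id: str) -> None:
        if self._fh is None:
            # Lazy open: no file is created until the first record arrives.
            self._fh = self.path.open("a", encoding="utf-8")
        self._fh.write(json.dumps({"doc_id": doc_id}) + "\n")
        self._pending += 1
        if self._pending >= self.every:
            self._fh.flush()  # periodic flush every `every` records
            self._pending = 0

    def close(self) -> None:
        # Flush anything still buffered and release the handle; safe to call twice.
        if self._fh is not None:
            self._fh.flush()
            self._fh.close()
            self._fh = None

    def __enter__(self) -> "CheckpointWriter":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()  # runs even if the with-body raised; exceptions still propagate


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "run.ckpt"
    with CheckpointWriter(path, every=2) as writer:
        for doc_id in ["d1", "d2", "d3"]:
            writer.record(doc_id)
    lines = path.read_text(encoding="utf-8").splitlines()

print(lines)  # ['{"doc_id": "d1"}', '{"doc_id": "d2"}', '{"doc_id": "d3"}']
```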
- iter(source)
Iterate over source with resilience, deduplication, and checkpointing.
- Parameters:
- source : Iterable[CorpusDocument]
Any document iterable, typically reader.get_documents(), a list, or another iter() call.
- Yields:
- CorpusDocument
Documents that passed dedup and error policy filters.
- Parameters:
source (Any)
- Return type:
Notes
The guard opens the checkpoint file lazily on the first call to iter. Call close (or use the guard as a context manager) to ensure the file is flushed and closed.
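Because iter accepts any iterable, including the output of another iter() call, and yields documents one at a time, stages can be stacked, and nothing is pulled from the underlying reader until the final consumer asks for a document. The sketch below shows that lazy-composition property with plain generators only. source, drop_empty and take are hypothetical stand-ins, not scikitplot APIs, and the example does not use PipelineGuard itself.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Iterator, List

produced: List[str] = []  # records which documents the source has emitted


def source() -> Iterator[Any]:
    """Hypothetical stand-in for reader.get_documents(); logs each document."""
    for doc_id, text in [("a", "x"), ("b", ""), ("c", "y"), ("d", "z")]:
        produced.append(doc_id)
        yield SimpleNamespace(doc_id=doc_id, text=text)


def drop_empty(docs: Iterable[Any]) -> Iterator[Any]:
    """Hypothetical stage: skip documents with empty text."""
    for doc in docs:
        if doc.text:
            yield doc


def take(docs: Iterable[Any], n: int) -> Iterator[Any]:
    """Hypothetical stage: stop after n documents without reading further."""
    if n <= 0:
        return
    for count, doc in enumerate(docs, start=1):
        yield doc
        if count >= n:
            return


pipeline = take(drop_empty(source()), 2)
print(produced)  # []

result = [doc.doc_id for doc in pipeline]
print(result, produced)  # ['a', 'c'] ['a', 'b', 'c']
```

Building the chain reads nothing, and stopping early at the outermost stage means document "d" is never produced by the source.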