CorpusPipeline#
- class scikitplot.corpus.CorpusPipeline(chunker=None, filter_=None, embedding_engine=None, output_dir=None, export_format=ExportFormat.CSV, normalizer=None, enricher=None, default_language=None, progress_callback=None, reader_kwargs=None)[source]#
Orchestrates the full corpus ingestion pipeline.
Instantiate once, then call run (single file), run_batch (multiple files), or run_url (URL source) any number of times. The pipeline is stateless between calls; all configuration is set at construction time.
- Parameters:
- chunker : ChunkerBase or None, optional
  Chunker to inject into every reader. None yields one CorpusDocument per raw chunk. Default: None.
- filter_ : FilterBase or None, optional
  Filter applied after chunking. None uses DefaultFilter. Default: None.
- embedding_engine : EmbeddingEngine or None, optional
  When provided, documents are embedded in batches after chunking/filtering. Embeddings are stored in embedding. Default: None (no embedding).
- output_dir : pathlib.Path or None, optional
  Directory where exported files are written. When None, export is skipped unless output_path is supplied explicitly in a run call. Default: None.
- export_format : ExportFormat or None, optional
  Default export format. Individual run calls can override. Default: CSV.
- normalizer : TextNormalizer or None, optional
  When provided, normalized_text is populated on every document after chunking/filtering and before embedding. Insert between the filter and embedding stages to clean OCR noise, collapsed whitespace, ligatures, and other artefacts. Default: None (skip).
- enricher : NLPEnricher or None, optional
  When provided, NLP enrichment fields (tokens, lemmas, stems, keywords, and optional metadata such as pos_tags, ner_entities, sentence_count, char_count, type_token_ratio, token_scores) are populated on every document after normalisation and before embedding. Supports 200+ world languages via the language parameter of EnricherConfig. Default: None (skip).
- default_language : str or list[str] or None, optional
  Language applied to all documents when the reader cannot detect one. Accepts ISO 639-1 two-letter codes ("en", "ar"), NLTK names ("english", "arabic"), lists of codes (["en", "ar"]), or None (auto-detect per document via detect_script). Forwarded to the reader; the enricher uses its own language config when set. Default: None.
- progress_callback : callable or None, optional
  Called after each batch of documents is processed. Signature: (source: str, n_done: int, n_total_estimate: int) → None. n_total_estimate is -1 when the total is unknown. Default: None. See the callback sketch after this list.
- reader_kwargs : dict or None, optional
  Extra keyword arguments forwarded to every reader constructed by this pipeline: both create (used by run and run_batch) and from_url (used by run_url). Default: None.

  Audio / video URL transcription: forward Whisper kwargs directly so run_url on an .mp3 URL transcribes it:

  pipeline = CorpusPipeline(
      reader_kwargs={
          "transcribe": True,
          "whisper_model": "small",  # "tiny" / "base" / "medium" / "large"
      },
  )
  result = pipeline.run_url("https://archive.org/details/.../episode.mp3")

  ZIP archive with per-extension overrides: when the source is a .zip file, reader_kwargs is forwarded to ZipReader. Pass a nested "reader_kwargs" key to control individual member types:

  pipeline = CorpusPipeline(
      reader_kwargs={
          "reader_kwargs": {
              ".mp3": {"transcribe": True, "whisper_model": "small"},
              ".jpg": {"backend": "easyocr"},
          },
      },
  )
  result = pipeline.run(Path("WHO-EURO-2025.zip"))

  Single-type files: for a pipeline that only processes audio files (no ZIP), pass the kwargs flat:

  pipeline = CorpusPipeline(
      reader_kwargs={"transcribe": True, "whisper_model": "base"},
  )
  result = pipeline.run(Path("podcast.mp3"))
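A minimal progress callback, as a sketch: the only contract assumed is the (source, n_done, n_total_estimate) signature documented above; the report format itself is illustrative.

def report(source, n_done, n_total_estimate):
    # n_total_estimate is -1 when the pipeline cannot estimate the total
    total = "?" if n_total_estimate == -1 else n_total_estimate
    print(f"{source}: {n_done}/{total} documents processed")

pipeline = CorpusPipeline(progress_callback=report)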
- Attributes:
- chunker : ChunkerBase or None
- filter_ : FilterBase or None
- embedding_engine : EmbeddingEngine or None
- output_dir : pathlib.Path or None
- export_format : ExportFormat or None
- default_language : str or list[str] or None
See also
scikitplot.corpus._export.export_documents : Low-level export function.
scikitplot.corpus._embeddings.EmbeddingEngine : Embedding backend.
Notes
Thread safety: CorpusPipeline is not thread-safe. Run one instance per thread, or use run_batch (which processes files sequentially, not in parallel).
Embedding and caching: When embedding_engine is provided, embeddings are cached to disk using the source file path and mtime as the cache key. URL sources disable caching (no stable mtime).
Examples
Basic single-file run:
>>> from pathlib import Path
>>> from scikitplot.corpus._pipeline import CorpusPipeline
>>> from scikitplot.corpus._chunkers import SentenceChunker
>>> pipeline = CorpusPipeline(
...     chunker=SentenceChunker("en_core_web_sm"),
...     output_dir=Path("output/"),
... )
>>> result = pipeline.run(Path("corpus.txt"))
>>> print(result)
Batch processing with embeddings:
>>> from scikitplot.corpus._embeddings import EmbeddingEngine
>>> engine = EmbeddingEngine(backend="sentence_transformers")
>>> pipeline = CorpusPipeline(
...     chunker=SentenceChunker("en_core_web_sm"),
...     embedding_engine=engine,
...     output_dir=Path("output/"),
...     export_format=ExportFormat.PARQUET,
... )
>>> results = pipeline.run_batch(list(Path("corpus/").glob("*.txt")))
URL ingestion:
>>> result = pipeline.run_url("https://en.wikipedia.org/wiki/Python")
Audio URL transcription via reader_kwargs:
>>> pipeline = CorpusPipeline(
...     reader_kwargs={"transcribe": True, "whisper_model": "small"},
...     output_dir=Path("output/"),
... )
>>> result = pipeline.run_url(
...     "https://archive.org/details/tale_two_cities_librivox/"
...     "tale_of_two_cities_01_dickens.mp3"
... )
ZIP archive with per-extension kwargs:
>>> pipeline = CorpusPipeline(
...     reader_kwargs={
...         "reader_kwargs": {
...             ".mp3": {"transcribe": True, "whisper_model": "small"},
...             ".jpg": {"backend": "easyocr"},
...         },
...     },
...     output_dir=Path("output/"),
... )
>>> result = pipeline.run(Path("WHO-EURO-2025.zip"))
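Normalisation and enrichment, as a hedged sketch: the import path and the constructor signatures of TextNormalizer and NLPEnricher below are assumptions, not confirmed API; only the normalizer and enricher parameters and EnricherConfig's language option are documented above.

>>> from scikitplot.corpus import (  # import path assumed
...     TextNormalizer, NLPEnricher, EnricherConfig,
... )
>>> pipeline = CorpusPipeline(
...     chunker=SentenceChunker("en_core_web_sm"),
...     normalizer=TextNormalizer(),  # constructor defaults assumed
...     enricher=NLPEnricher(EnricherConfig(language="en")),  # signature assumed
...     output_dir=Path("output/"),
... )
>>> result = pipeline.run(Path("corpus.txt"))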
- run(input_file, *, output_path=None, export_format=None, filename_override=None)[source]#
Process a single source and return a PipelineResult.
Accepts a local file path or an http(s):// URL string. URL detection is performed before any pathlib.Path conversion, so passing a URL string routes correctly to the web/YouTube/audio reader rather than crashing with a “file not found” error.
- Parameters:
- input_file : pathlib.Path or str
  Path to a local file or an http(s):// URL string. A str that starts with http:// or https:// (case-insensitive) is treated as a URL and routed through from_url; all other values are treated as local file paths and dispatched by extension via the reader registry.
- output_path : pathlib.Path or None, optional
  Explicit output file path. When None, the path is derived from output_dir and the input stem. If both are None, export is skipped.
- export_format : ExportFormat or None, optional
  Override the pipeline-level export_format for this call.
- filename_override : str or None, optional
  Override the source_file label in generated documents. Ignored for URL sources.
- Returns:
- PipelineResult
Result summary including the document list.
- Raises:
- TypeError
If input_file is not a str or pathlib.Path.
- ValueError
If a local file path does not exist, or no reader is registered for the file extension.
- ValueError
If input_file is a URL string and the URL is invalid or cannot be resolved.
Examples
Local file:
>>> result = pipeline.run(Path("chapter01.txt"))
>>> len(result.documents)
312
URL string (no separate run_url call needed):
>>> result = pipeline.run("https://en.wikipedia.org/wiki/Python")
>>> result.source
'https://en.wikipedia.org/wiki/Python'
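Explicit output path with a per-call format override (a sketch: the Parquet filename is illustrative; output_path and export_format behave as documented above):
>>> result = pipeline.run(
...     Path("chapter01.txt"),
...     output_path=Path("output/chapter01.parquet"),
...     export_format=ExportFormat.PARQUET,
... )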
- run_batch(input_files, *, stop_on_error=False, export_format=None)[source]#
Process multiple sources sequentially.
Each item may be a local file path or an http(s):// URL string. Mixed lists (some paths, some URLs) are fully supported. Each item is dispatched through _run_source, which tests for URL strings before any pathlib.Path conversion so that URL strings are never silently mangled.
- Parameters:
- input_files : list of pathlib.Path or str
  Sources to process in order. Each element may be:
  - a pathlib.Path or str pointing to a local file, or
  - a str starting with http:// or https:// (a URL).
  Mixed lists are allowed: [Path("paper.pdf"), "https://en.wikipedia.org/wiki/Python"].
- stop_on_error : bool, optional
  When False (default), errors on individual sources are logged as warnings and processing continues. When True, the first error is re-raised immediately (see the fail-fast example below).
- export_format : ExportFormat or None, optional
  Override the pipeline-level export_format for all sources in this batch.
- Returns:
- list of PipelineResult
One result per successfully processed source, in input order. Failed sources (when stop_on_error=False) are omitted from the list and logged at WARNING level.
- Raises:
- TypeError
If any element of input_files is not a str or pathlib.Path.
- ValueError
  Re-raised from _run_source when stop_on_error=True and a source fails.
Examples
Local files only (original behaviour, unchanged):
>>> paths = list(Path("corpus/").glob("*.txt"))
>>> results = pipeline.run_batch(paths)
>>> total_docs = sum(r.n_documents for r in results)
Mixed files and URLs:
>>> results = pipeline.run_batch(
...     [
...         Path("local_report.pdf"),
...         "https://en.wikipedia.org/wiki/Python",
...         "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
...     ]
... )
>>> [r.source for r in results]
['local_report.pdf', 'https://...', 'https://...']
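Failing fast with stop_on_error=True (a sketch: the file names are illustrative; a nonexistent local path raises ValueError, per the Raises section above):
>>> try:
...     pipeline.run_batch(
...         [Path("good.txt"), Path("does_not_exist.txt")],
...         stop_on_error=True,
...     )
... except ValueError as exc:
...     print(f"batch aborted: {exc}")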
- run_url(url, *, output_path=None, export_format=None, stop_on_error=False)[source]#
Process one URL or a list of URLs.
Accepts a single URL string or a list of URL strings. When a list is passed, each URL is processed independently and a parallel list of PipelineResult objects is returned. The single-URL form returns a single PipelineResult (backwards compatible).
- Parameters:
- url : str or list of str
  One URL string or a list of URL strings. Every string must start with http:// or https://.
- output_path : pathlib.Path or None, optional
  Explicit output file path. Ignored when url is a list (each result derives its own path from the URL).
- export_format : ExportFormat or None, optional
  Override the pipeline-level export_format for this call.
- stop_on_error : bool, optional
  When True and url is a list, re-raise the first exception encountered instead of continuing. Has no effect for single-URL calls (exceptions always propagate).
- Returns:
- PipelineResult
When url is a str.
- list of PipelineResult
  When url is a list. Results are in the same order as url. Failed URLs (when stop_on_error=False) are omitted from the list and logged at ERROR level.
- Raises:
- TypeError
If url is not a str or list.
- ValueError
  If any URL string does not start with http:// or https://.
- ImportError
  If scikitplot.corpus._readers has not been imported yet.
Examples
Single video:
>>> result = pipeline.run_url("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
>>> isinstance(result, PipelineResult)
True
List of URLs (returns list):
>>> results = pipeline.run_url(
...     [
...         "https://www.youtube.com/@WHO/shorts",
...         "https://www.youtube.com/@WHO/videos",
...     ]
... )
>>> isinstance(results, list)
True
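List form with stop_on_error=True, re-raising the first failure instead of logging it (a sketch: the failing URL is illustrative, and the exception type depends on how the URL fails, as documented under Raises):
>>> try:
...     pipeline.run_url(
...         [
...             "https://en.wikipedia.org/wiki/Python",
...             "https://example.invalid/unreachable",
...         ],
...         stop_on_error=True,
...     )
... except Exception as exc:
...     print(f"aborted on first failure: {exc}")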
Gallery examples#
- corpus Knowledge and Information local .png
- corpus WHO European Region YouTube shorts
- corpus WHO European Region local .zip