CorpusPipeline#

class scikitplot.corpus.CorpusPipeline(chunker=None, filter_=None, embedding_engine=None, output_dir=None, export_format=ExportFormat.CSV, normalizer=None, enricher=None, default_language=None, progress_callback=None, reader_kwargs=None)[source]#

Orchestrates the full corpus ingestion pipeline.

Instantiate once, then call run (single file), run_batch (multiple files), or run_url (URL source) any number of times. The pipeline is stateless between calls; all configuration is set at construction time.

Parameters:
chunker : ChunkerBase or None, optional

Chunker to inject into every reader. None yields one CorpusDocument per raw chunk. Default: None.

filter_ : FilterBase or None, optional

Filter applied after chunking. None uses DefaultFilter. Default: None.

embedding_engine : EmbeddingEngine or None, optional

When provided, documents are embedded in batches after chunking/filtering. Embeddings are stored in embedding. Default: None (no embedding).

output_dir : pathlib.Path or None, optional

Directory where exported files are written. When None, export is skipped unless output_path is supplied explicitly in a run call. Default: None.

export_format : ExportFormat or None, optional

Default export format. Individual run calls can override. Default: CSV.

progress_callback : callable or None, optional

Called after each batch of documents is processed. Signature: (source: str, n_done: int, n_total_estimate: int) -> None. n_total_estimate is -1 when the total is unknown. Default: None.
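As an illustration, a hypothetical callback matching this signature (not part of the library) might report per-batch progress like so:

```python
def report_progress(source: str, n_done: int, n_total_estimate: int) -> None:
    """Print a one-line progress update after each processed batch."""
    if n_total_estimate == -1:
        # Total is unknown, e.g. for streaming sources.
        print(f"{source}: {n_done} documents so far")
    else:
        print(f"{source}: {n_done}/{n_total_estimate} documents")

# pipeline = CorpusPipeline(progress_callback=report_progress)
```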

normalizer : TextNormalizer or None, optional

When provided, normalized_text is populated on every document after chunking/filtering and before embedding. Insert between the filter and embedding stages to clean OCR noise, collapse whitespace, fix ligatures, and remove other artefacts. Default: None (skip).

enricher : NLPEnricher or None, optional

When provided, NLP enrichment fields (tokens, lemmas, stems, keywords, and optional metadata such as pos_tags, ner_entities, sentence_count, char_count, type_token_ratio, token_scores) are populated on every document after normalisation and before embedding. Supports 200+ world languages via the language parameter of EnricherConfig. Default: None (skip).

default_language : str or list[str] or None, optional

Language hint applied to all documents when the reader cannot detect language. Accepts ISO 639-1 two-letter codes ("en", "ar"), NLTK language names ("english", "arabic"), lists of codes (["en", "ar"]), or None (auto-detect per document via detect_script). Forwarded to the reader; the enricher uses its own language config when set. Default: None.

reader_kwargs : dict or None, optional

Extra keyword arguments forwarded to every reader constructed by this pipeline — both create (used by run and run_batch) and from_url (used by run_url). Default: None.

Audio / video URL transcription — forward Whisper kwargs directly so run_url on an .mp3 URL transcribes it:

pipeline = CorpusPipeline(
    reader_kwargs={
        "transcribe": True,
        "whisper_model": "small",  # "tiny" / "base" / "medium" / "large"
    },
)
result = pipeline.run_url("https://archive.org/details/.../episode.mp3")

ZIP archive with per-extension overrides — when the source is a .zip file, reader_kwargs is forwarded to ZipReader. Pass a nested "reader_kwargs" key to control individual member types:

pipeline = CorpusPipeline(
    reader_kwargs={
        "reader_kwargs": {
            ".mp3": {"transcribe": True, "whisper_model": "small"},
            ".jpg": {"backend": "easyocr"},
        },
    },
)
result = pipeline.run(Path("WHO-EURO-2025.zip"))

Single-type files — for a pipeline that only processes audio files (no ZIP), pass the kwargs flat:

pipeline = CorpusPipeline(
    reader_kwargs={"transcribe": True, "whisper_model": "base"},
)
result = pipeline.run(Path("podcast.mp3"))
Attributes:
chunker : ChunkerBase or None
filter_ : FilterBase or None
embedding_engine : EmbeddingEngine or None
output_dir : pathlib.Path or None
export_format : ExportFormat or None
default_language : str or list[str] or None

See also

scikitplot.corpus._export.export_documents

Low-level export function.

scikitplot.corpus._embeddings.EmbeddingEngine

Embedding backend.

Notes

Thread safety: CorpusPipeline is not thread-safe. Run one instance per thread, or use run_batch (which processes files sequentially, not in parallel).

Embedding and caching: When embedding_engine is provided, embeddings are cached to disk using the source file path and mtime as the cache key. URL sources disable caching (no stable mtime).
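A minimal sketch of a path-plus-mtime cache key (illustrative only; the library's actual key format is not specified here):

```python
import hashlib
from pathlib import Path

def cache_key(path: Path) -> str:
    """Derive an embedding-cache key from a file path and its mtime.

    Any edit to the file bumps the mtime, so stale embeddings are never
    reused. URL sources have no stable mtime, which is why caching is
    disabled for them.
    """
    stat = path.stat()
    raw = f"{path.resolve()}:{stat.st_mtime_ns}"
    return hashlib.sha256(raw.encode()).hexdigest()
```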

Examples

Basic single-file run:

>>> from pathlib import Path
>>> from scikitplot.corpus._pipeline import CorpusPipeline
>>> from scikitplot.corpus._chunkers import SentenceChunker
>>> pipeline = CorpusPipeline(
...     chunker=SentenceChunker("en_core_web_sm"),
...     output_dir=Path("output/"),
... )
>>> result = pipeline.run(Path("corpus.txt"))
>>> print(result)

Batch processing with embeddings:

>>> from scikitplot.corpus._embeddings import EmbeddingEngine
>>> engine = EmbeddingEngine(backend="sentence_transformers")
>>> pipeline = CorpusPipeline(
...     chunker=SentenceChunker("en_core_web_sm"),
...     embedding_engine=engine,
...     output_dir=Path("output/"),
...     export_format=ExportFormat.PARQUET,
... )
>>> results = pipeline.run_batch(list(Path("corpus/").glob("*.txt")))

URL ingestion:

>>> result = pipeline.run_url("https://en.wikipedia.org/wiki/Python")

Audio URL transcription via reader_kwargs:

>>> pipeline = CorpusPipeline(
...     reader_kwargs={"transcribe": True, "whisper_model": "small"},
...     output_dir=Path("output/"),
... )
>>> result = pipeline.run_url(
...     "https://archive.org/details/tale_two_cities_librivox/"
...     "tale_of_two_cities_01_dickens.mp3"
... )

ZIP archive with per-extension kwargs:

>>> pipeline = CorpusPipeline(
...     reader_kwargs={
...         "reader_kwargs": {
...             ".mp3": {"transcribe": True, "whisper_model": "small"},
...             ".jpg": {"backend": "easyocr"},
...         },
...     },
...     output_dir=Path("output/"),
... )
>>> result = pipeline.run(Path("WHO-EURO-2025.zip"))
run(input_file, *, output_path=None, export_format=None, filename_override=None)[source]#

Process a single source and return a PipelineResult.

Accepts a local file path or an http(s):// URL string. URL detection is performed before any pathlib.Path conversion, so passing a URL string routes correctly to the web/YouTube/audio reader rather than crashing with a “file not found” error.

Parameters:
input_file : pathlib.Path or str

Path to a local file or an http(s):// URL string. A str that starts with http:// or https:// (case-insensitive) is treated as a URL and routed through from_url; all other values are treated as local file paths and dispatched by extension via the reader registry.

output_path : pathlib.Path or None, optional

Explicit output file path. When None, the path is derived from output_dir and the input stem. If both are None, export is skipped.

export_format : ExportFormat or None, optional

Override the pipeline-level export_format for this call.

filename_override : str or None, optional

Override the source_file label in generated documents. Ignored for URL sources.

Returns:
PipelineResult

Result summary including the document list.

Raises:
TypeError

If input_file is not a str or pathlib.Path.

ValueError

If a local file path does not exist, or no reader is registered for the file extension.

ValueError

If input_file is a URL string and the URL is invalid or cannot be resolved.

Return type:

PipelineResult

See also

run_batch

Process multiple sources (files and/or URLs).

run_url

Process one or more URLs directly (legacy entry point).

Examples

Local file:

>>> result = pipeline.run(Path("chapter01.txt"))
>>> len(result.documents)
312

URL string — no separate run_url call needed:

>>> result = pipeline.run("https://en.wikipedia.org/wiki/Python")
>>> result.source
'https://en.wikipedia.org/wiki/Python'
run_batch(input_files, *, stop_on_error=False, export_format=None)[source]#

Process multiple sources sequentially.

Each item may be a local file path or an http(s):// URL string. Mixed lists (some paths, some URLs) are fully supported. Each item is dispatched through _run_source, which tests for URL strings before any pathlib.Path conversion so that URL strings are never silently mangled.

Parameters:
input_files : list of pathlib.Path or str

Sources to process in order. Each element may be:

  • a pathlib.Path or str pointing to a local file, or

  • a str starting with http:// or https:// (a URL).

Mixed lists are allowed: [Path("paper.pdf"), "https://en.wikipedia.org/wiki/Python"].

stop_on_error : bool, optional

When False (default), errors on individual sources are logged as warnings and processing continues. When True, the first error is re-raised immediately.

export_format : ExportFormat or None, optional

Override the pipeline-level export_format for all sources in this batch.

Returns:
list of PipelineResult

One result per successfully processed source, in input order. Failed sources (when stop_on_error=False) are omitted from the list and logged at WARNING level.

Raises:
TypeError

If any element of input_files is not a str or pathlib.Path.

ValueError

Re-raised from _run_source when stop_on_error=True and a source fails.

Return type:

list[PipelineResult]

See also

run

Process a single source (file or URL).

run_url

Process one or more URLs directly (legacy entry point).

Examples

Local files only (original behaviour, unchanged):

>>> paths = list(Path("corpus/").glob("*.txt"))
>>> results = pipeline.run_batch(paths)
>>> total_docs = sum(r.n_documents for r in results)

Mixed files and URLs:

>>> results = pipeline.run_batch(
...     [
...         Path("local_report.pdf"),
...         "https://en.wikipedia.org/wiki/Python",
...         "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
...     ]
... )
>>> [r.source for r in results]
['local_report.pdf', 'https://...', 'https://...']
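Because failed sources are omitted from the returned list, a caller can recover which inputs failed by comparing against the result.source labels. A usage sketch (assumes file results are labelled by file name and URL results by the URL string, as the example output above suggests):

```python
from pathlib import Path

def failed_sources(requested, results):
    """Return the requested sources that produced no PipelineResult."""
    done = {r.source for r in results}
    return [s for s in requested if _label(s) not in done]

def _label(source):
    # Assumed labelling: URLs keep the full string, files use the name.
    return source if isinstance(source, str) else Path(source).name
```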
run_url(url, *, output_path=None, export_format=None, stop_on_error=False)[source]#

Process one URL or a list of URLs.

Accepts a single URL string or a list of URL strings. When a list is passed each URL is processed independently and a parallel list of PipelineResult objects is returned. The single-URL form returns a single PipelineResult (backwards compatible).

Parameters:
url : str or list of str

One URL string or a list of URL strings. Every string must start with http:// or https://.

output_path : pathlib.Path or None, optional

Explicit output file path. Ignored when url is a list (each result derives its own path from the URL).

export_format : ExportFormat or None, optional

Override the pipeline-level export_format for this call.

stop_on_error : bool, optional

When True and url is a list, re-raise the first exception encountered instead of continuing. Has no effect for single-URL calls (exceptions always propagate).

Returns:
PipelineResult

When url is a str.

list of PipelineResult

When url is a list. Results are in the same order as url. Failed URLs (when stop_on_error=False) are omitted from the list and logged at ERROR level.

Raises:
TypeError

If url is not a str or list.

ValueError

If any URL string does not start with http:// or https://.

ImportError

If scikitplot.corpus._readers has not been imported yet.

Return type:

PipelineResult | list[PipelineResult]

Examples

Single video:

>>> result = pipeline.run_url("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
>>> isinstance(result, PipelineResult)
True

List of URLs (returns list):

>>> results = pipeline.run_url(
...     [
...         "https://www.youtube.com/@WHO/shorts",
...         "https://www.youtube.com/@WHO/videos",
...     ]
... )
>>> isinstance(results, list)
True