# Source code for flexrag.text_process.pipeline
from flexrag.utils import TIME_METER
from .processor import PROCESSORS, Processor, TextUnit
# Configuration type for TextProcessPipeline, generated by the PROCESSORS
# registry. NOTE(review): allow_multiple=True presumably lets one config
# select several processors at once (the pipeline loads a list of them) --
# confirm against the registry's make_config implementation.
TextProcessPipelineConfig = PROCESSORS.make_config(
    allow_multiple=True, config_name="TextProcessPipelineConfig"
)
# [docs]
class TextProcessPipeline:
    """Apply a sequence of text processors to a piece of text.

    The processors are loaded from the ``PROCESSORS`` registry according to
    the supplied configuration and are applied in the order in which they
    are stored in ``self.processors``.
    """

    def __init__(self, cfg: TextProcessPipelineConfig) -> None:  # type: ignore
        """Load the processors described by *cfg*.

        Args:
            cfg: Pipeline configuration passed to ``PROCESSORS.load``.
        """
        # load processors
        self.processors: list[Processor] = PROCESSORS.load(cfg)

    @TIME_METER("text_process_pipeline")
    def __call__(
        self, text: str, return_detail: bool = False
    ) -> str | TextUnit | None:
        """Run *text* through every processor in order.

        Processing stops at the first processor whose result is no longer
        marked as ``reserved``.

        Args:
            text: The raw text to process.
            return_detail: When true, return the final ``TextUnit`` instead
                of only its content.

        Returns:
            The final ``TextUnit`` if *return_detail* is true; otherwise the
            processed content, or ``None`` when the unit is not reserved.
        """
        unit = TextUnit(content=text)
        for processor in self.processors:
            unit = processor(unit)
            # A unit that is no longer reserved is not processed further.
            if not unit.reserved:
                break
        if return_detail:
            return unit
        return unit.content if unit.reserved else None

    def __contains__(self, processor: Processor | str) -> bool:
        """Tell whether the pipeline contains the given processor.

        Args:
            processor: Either a processor instance, or a name registered in
                ``PROCESSORS``. A name matches when any processor in the
                pipeline is an instance of the class registered under it.

        Returns:
            ``True`` if a matching processor is present, else ``False``.
        """
        if isinstance(processor, str):
            return any(
                isinstance(p, PROCESSORS[processor]["item"])
                for p in self.processors
            )
        return processor in self.processors

    def __getitem__(self, processor: str | int) -> Processor:
        """Fetch a processor by position or by registered name.

        Args:
            processor: An integer index into the pipeline, or a name
                registered in ``PROCESSORS``. For a name, the first
                processor that is an instance of the registered class is
                returned.

        Returns:
            The matching processor.

        Raises:
            TypeError: If *processor* is neither ``int`` nor ``str``.
            KeyError: If no processor in the pipeline matches the name.
            IndexError: If an integer index is out of range (propagated
                from indexing ``self.processors``).
        """
        if isinstance(processor, int):
            return self.processors[processor]
        # Validate explicitly instead of with ``assert``: assertions are
        # removed under ``python -O``, which would let an invalid key reach
        # the registry lookup below and fail with a misleading error.
        if not isinstance(processor, str):
            raise TypeError("str or int is required")
        for p in self.processors:
            if isinstance(p, PROCESSORS[processor]["item"]):
                return p
        raise KeyError(f"Processor {processor} not found in the pipeline")

    def __repr__(self) -> str:
        """Return a short representation listing the processor names."""
        return f"Pipeline({[p.name for p in self.processors]})"