Source code for sycamore.transforms.partition

from abc import abstractmethod, ABC
import io
from typing import Any, Literal, Optional, Union

from bs4 import BeautifulSoup

from sycamore.functions import TextOverlapChunker, Chunker
from sycamore.functions import CharacterTokenizer, Tokenizer
from sycamore.data import BoundingBox, Document, Element, TableElement, Table
from sycamore.plan_nodes import Node
from sycamore.transforms.base import CompositeTransform
from sycamore.transforms.extract_table import TableExtractor
from sycamore.transforms.table_structure.extract import TableStructureExtractor
from sycamore.transforms.map import Map
from sycamore.utils.cache import Cache
from sycamore.utils.time_trace import timetrace
from sycamore.utils import choose_device
from sycamore.utils.aryn_config import ArynConfig
from sycamore.utils.bbox_sort import bbox_sort_document

from sycamore.transforms.detr_partitioner import (
    ARYN_DETR_MODEL,
    DEFAULT_ARYN_PARTITIONER_ADDRESS,
    DEFAULT_LOCAL_THRESHOLD,
)


class Partitioner(ABC):
    """
    Abstract base class for partitioners, which split a Document into elements.

    Concrete subclasses must implement :meth:`partition`.

    Args:
        device: Device identifier stored on the instance as ``device``. Default is None.
        batch_size: Batch size stored on the instance as ``batch_size``. Default is 1.
    """

    def __init__(self, device=None, batch_size=1):
        # Both values are kept as public attributes so that code holding a
        # partitioner can read them back.
        self.device, self.batch_size = device, batch_size

    @abstractmethod
    def partition(self, document: Document) -> Document:
        """Partition ``document`` and return it; must be overridden by subclasses."""


class UnstructuredPPTXPartitioner(Partitioner):
    """
    UnstructuredPPTXPartitioner utilizes open-source Unstructured library to extract structured elements from
    unstructured PPTX files.

    Args:
        include_page_breaks: Whether to include page breaks as separate elements.
        include_metadata: Whether to include metadata in the partitioned elements.
        include_slide_notes: Forwarded to Unstructured's ``partition_pptx`` as ``include_slide_notes``
            (presumably controls whether slide notes are extracted; see the Unstructured documentation).
        chunking_strategy: Forwarded to ``partition_pptx`` as ``chunking_strategy``. Default is None.
        **kwargs: Any additional keyword arguments, forwarded unchanged to ``partition_pptx``.

    Example:
         .. code-block:: python

            pptx_partitioner = UnstructuredPPTXPartitioner(
                include_page_breaks=False,
                include_metadata=True,
                include_slide_notes=False,
                chunking_strategy=None,
                **kwargs
            )

            context = sycamore.init()
            pptx_docset = context.read.binary(paths, binary_format="pptx")
                .partition(partitioner=pptx_partitioner)

    """

    @staticmethod
    def to_element(dict: dict[str, Any], element_index: Optional[int] = None) -> Element:
        """
        Convert the dictionary form of an Unstructured element into a sycamore Element.

        Args:
            dict: Element dictionary. Must contain "text" (str or UTF-8 encoded bytes); "type" and
                "metadata" are optional. Note that the dictionary is consumed: these keys are popped from it.
            element_index: Position of the element within its document, stored as ``element_index``.

        Returns:
            An Element whose properties hold the metadata entries plus every remaining key of ``dict``.
        """
        text = dict.pop("text")
        if isinstance(text, str):
            binary = text.encode("utf-8")
        else:
            binary = text
            text = str(binary, "utf-8")

        element = Element()
        element.type = dict.pop("type", "unknown")
        element.binary_representation = binary
        element.text_representation = text
        # Tolerate a missing (or None) metadata entry instead of raising KeyError/TypeError.
        element.properties.update(dict.pop("metadata", None) or {})
        # Whatever is left over is kept as top-level properties.
        element.properties.update(dict)
        element.element_index = element_index
        return element

    def __init__(
        self,
        include_page_breaks: bool = False,
        include_metadata: bool = True,
        include_slide_notes: bool = False,
        chunking_strategy: Optional[str] = None,
        **kwargs,
    ):
        super().__init__(device="cpu")
        self._include_page_breaks = include_page_breaks
        self._include_metadata = include_metadata
        self._include_slide_notes = include_slide_notes
        self._chunking_strategy = chunking_strategy
        self._kwargs = kwargs

    def partition(self, document: Document) -> Document:
        """
        Partition the PPTX bytes stored in ``document`` and replace ``document.elements`` with the result.

        Args:
            document: Document whose ``data["binary_representation"]`` holds the raw PPTX bytes.

        Returns:
            The same document, with ``elements`` set to the converted Unstructured elements.
        """
        # Imported lazily so that unstructured is only required when this partitioner is actually used.
        from unstructured.partition.pptx import partition_pptx

        binary_file = io.BytesIO(document.data["binary_representation"])

        elements = partition_pptx(
            file=binary_file,
            include_page_breaks=self._include_page_breaks,
            include_metadata=self._include_metadata,
            include_slide_notes=self._include_slide_notes,
            chunking_strategy=self._chunking_strategy,
            **self._kwargs,
        )

        # Here we convert unstructured.io elements into our elements and
        # set them as the child elements of the document.
        document.elements = [self.to_element(element.to_dict(), i) for i, element in enumerate(elements)]
        del elements

        return document
class UnstructuredPdfPartitioner(Partitioner):
    """
    UnstructuredPdfPartitioner utilizes open-source Unstructured library to extract structured elements from
    unstructured PDFs.

    Args:
        include_page_breaks: Whether to include page breaks as separate elements.
        strategy: The partitioning strategy to use ("auto" for automatic detection).
        infer_table_structure: Whether to infer table structures in the document.
        languages: The languages to use for OCR. Default is ["eng"] (English).
        max_partition_length: The maximum length of each partition (in characters).
        min_partition_length: Forwarded to Unstructured's ``partition_pdf`` as ``min_partition``. Default is 500.
        include_metadata: Whether to include metadata in the partitioned elements.
        retain_coordinates: Whether to keep the coordinates property from unstructured. Default is False.
          In either case, bbox will be populated.

    Example:
         .. code-block:: python

            pdf_partitioner = UnstructuredPdfPartitioner(
                include_page_breaks=True,
                strategy="auto",
                infer_table_structure=True,
                languages=["eng"],
                max_partition_length=2000,
                include_metadata=True,
            )

            context = sycamore.init()
            pdf_docset = context.read.binary(paths, binary_format="pdf")
                .partition(partitioner=pdf_partitioner)

    """

    def __init__(
        self,
        include_page_breaks: bool = False,
        strategy: str = "auto",
        infer_table_structure: bool = False,
        languages: Optional[list[str]] = None,
        max_partition_length: Optional[int] = None,
        min_partition_length: Optional[int] = 500,
        include_metadata: bool = True,
        retain_coordinates: bool = False,
    ):
        super().__init__(device="cpu")
        self._include_page_breaks = include_page_breaks
        self._strategy = strategy
        self._infer_table_structure = infer_table_structure
        # A fresh list per instance: a literal list default would be shared by every partitioner.
        self._languages = ["eng"] if languages is None else languages
        self._max_partition_length = max_partition_length
        self._min_partition_length = min_partition_length
        self._include_metadata = include_metadata
        self._retain_coordinates = retain_coordinates

    @staticmethod
    def to_element(dict: dict[str, Any], element_index: Optional[int] = None, retain_coordinates=False) -> Element:
        """
        Convert the dictionary form of an Unstructured element into a sycamore Element.

        Args:
            dict: Element dictionary. Must contain "text" (str or UTF-8 encoded bytes) and "metadata";
                "type" is optional. Note that the dictionary is consumed: these keys are popped from it.
            element_index: Position of the element within its document, stored as ``element_index``.
            retain_coordinates: If False, the "coordinates" property is removed from the element's
                properties after it has been used to compute the bounding box.

        Returns:
            An Element. When coordinates are present, ``bbox`` is set from the first and third coordinate
            points divided by the layout width/height.
        """
        text = dict.pop("text")
        if isinstance(text, str):
            binary = text.encode("utf-8")
        else:
            binary = text
            text = str(binary, "utf-8")

        element = Element()
        element.type = dict.pop("type", "unknown")
        element.element_index = element_index
        element.binary_representation = binary
        element.text_representation = text
        element.properties.update(dict.pop("metadata"))
        element.properties.update(dict)

        coordinates = element.properties.get("coordinates")
        if not retain_coordinates:
            # Elements without coordinates are legal (see the None check below), so the removal
            # must not raise when the key is absent.
            element.properties.pop("coordinates", None)

        if coordinates is not None:
            points = coordinates.get("points")
            layout_width = coordinates.get("layout_width")
            layout_height = coordinates.get("layout_height")
            # points[0] and points[2] are used as the two defining corners of the box
            # (assumed to be opposite corners -- TODO confirm against Unstructured's point ordering).
            x1 = points[0][0] / layout_width
            y1 = points[0][1] / layout_height
            x2 = points[2][0] / layout_width
            y2 = points[2][1] / layout_height
            element.bbox = BoundingBox(x1, y1, x2, y2)

        return element

    @timetrace("unstructuredPdf")
    def partition(self, document: Document) -> Document:
        """
        Partition the PDF bytes stored in ``document`` and replace ``document.elements`` with the result.

        Args:
            document: Document whose ``data["binary_representation"]`` holds the raw PDF bytes.

        Returns:
            The same document, with its elements replaced and then passed through ``bbox_sort_document``.

        Raises:
            RuntimeError: If Unstructured fails to partition the PDF; the original exception is chained.
        """
        # Imported lazily so that unstructured is only required when this partitioner is actually used.
        from unstructured.partition.pdf import partition_pdf

        binary = io.BytesIO(document.data["binary_representation"])
        try:
            elements = partition_pdf(
                file=binary,
                include_page_breaks=self._include_page_breaks,
                strategy=self._strategy,
                infer_table_structure=self._infer_table_structure,
                languages=self._languages,
                max_partition=self._max_partition_length,
                min_partition=self._min_partition_length,
                include_metadata=self._include_metadata,
            )
        except Exception as e:
            # Use .get so that a document without a "path" property does not raise KeyError here
            # and hide the real partitioning failure.
            path = document.properties.get("path", "<unknown path>")
            raise RuntimeError(f"UnstructuredPartitioner Error processing {path}") from e

        # Here we convert unstructured.io elements into our elements and
        # set them as the child elements of the document.
        document.elements = [
            self.to_element(ee.to_dict(), retain_coordinates=self._retain_coordinates, element_index=i)
            for i, ee in enumerate(elements)
        ]
        del elements

        bbox_sort_document(document)
        return document
class HtmlPartitioner(Partitioner):
    """
    HtmlPartitioner processes HTML documents extracting structured content.

    Args:
        skip_headers_and_footers: Whether to skip headers and footers in the document. Default is True.
            NOTE: the value is stored on the instance, but ``partition`` as implemented here does not
            consult it.
        extract_tables: Whether to extract tables from the HTML document. Default is False.
        text_chunker: The text chunking strategy to use for processing text content.
        tokenizer: The tokenizer to use for tokenizing text content.

    Example:
         .. code-block:: python

            html_partitioner = HtmlPartitioner(
                skip_headers_and_footers=True,
                extract_tables=True,
                text_chunker=TextOverlapChunker(chunk_token_count=1000, chunk_overlap_token_count=100),
                tokenizer=CharacterTokenizer(),
            )

            context = sycamore.init()
            html_docset = context.read.binary(paths, binary_format="html")
                .partition(partitioner=html_partitioner)

    """

    def __init__(
        self,
        skip_headers_and_footers: bool = True,
        extract_tables: bool = False,
        text_chunker: Chunker = TextOverlapChunker(),
        tokenizer: Tokenizer = CharacterTokenizer(),
    ):
        super().__init__(device="cpu")
        self._skip_headers_and_footers = skip_headers_and_footers
        self._extract_tables = extract_tables
        self._text_chunker = text_chunker
        self._tokenizer = tokenizer

    @timetrace("beautSoup")
    def partition(self, document: Document) -> Document:
        """
        Parse the HTML held in ``document`` and append text (and optionally table) elements to it.

        The document's "title" property is set from the first ``<title>`` tag, falling back to the
        document id. The visible text is tokenized, chunked, and turned into one "text" element per
        chunk. When table extraction is enabled, each non-nested ``<table>`` becomes a TableElement.

        Args:
            document: Document whose ``binary_representation`` holds the raw HTML.

        Returns:
            The same document with the new elements appended to its existing ``elements``.

        Raises:
            RuntimeError: If the document has no binary representation.
        """
        raw_html = document.binary_representation

        if raw_html is None:
            raise RuntimeError("Attempting to partition invalid document where content=None")

        # note: if content is bytes, BeautifulSoup default to utf-8 encoding
        soup = BeautifulSoup(raw_html, "html.parser")

        # extract title
        titles = soup.find_all("title")
        title = document.doc_id
        if len(titles) > 0:
            title = titles[0].text.replace("\n", "").strip()
        document.properties["title"] = title

        # chunk text and create text elements
        elements = []
        text = soup.get_text(separator=" ", strip=True)
        tokens = self._tokenizer.tokenize(text)
        chunks = self._text_chunker.chunk(tokens)

        for i, chunk in enumerate(chunks):
            content = "".join(chunk)
            element = Element()
            element.element_index = i
            element.type = "text"
            element.text_representation = content
            element.properties.update(document.properties)
            elements.append(element)

        # extract tables
        # Table indices continue after the text elements. Counting the elements actually built
        # (rather than len(chunks)) also works when the chunker returns a non-sized iterable.
        last_element_index = len(elements)
        if self._extract_tables:
            for table in soup.find_all("table"):
                # ignore nested tables
                if len(table.find_all("table")) > 0:
                    continue

                table_object = Table.from_html(html_tag=table)
                table_element = TableElement(table=table_object)
                table_element.properties.update(document.properties)
                table_element.element_index = last_element_index
                elements.append(table_element)
                last_element_index += 1

        document.elements = document.elements + elements

        return document

    def transform_transcript_elements(self, document: Document) -> Document:
        """
        Split a line-oriented transcript into one element per utterance.

        Each non-empty line must start with "[" and contain at least two spaces: the text before the
        first space is taken as the start time, the text between the first and second space as the
        speaker, and the remainder as the utterance text (the separating spaces are kept as leading
        characters of the speaker and text values). An utterance's end time is the start time of the
        following line; the last utterance gets the end time "N/A".

        Args:
            document: Document whose ``binary_representation`` holds the encoded transcript.

        Returns:
            The same document. If it has no binary representation it is returned untouched;
            otherwise ``elements`` is replaced with the parsed utterances.

        Raises:
            AssertionError: If a non-empty line does not match the expected layout.
        """
        if not document.binary_representation:
            return document

        elements: list[Element] = []
        start_time = ""
        speaker = ""
        text = ""

        for line in document.binary_representation.decode().split("\n"):
            if line == "":
                continue
            assert line.startswith("["), "transcript line must start with '['"
            time_ix = line.find(" ")
            assert time_ix > 0, "transcript line is missing the space after the timestamp"
            spk_ix = line.find(" ", time_ix + 1)
            assert spk_ix > 0, "transcript line is missing the space after the speaker"

            if start_time != "":
                # The previous utterance ends where this one starts.
                end_time = line[0:time_ix]
                element = Element({"start_time": start_time, "end_time": end_time, "speaker": speaker, "text": text})
                element.element_index = len(elements)
                elements.append(element)

            start_time = line[0:time_ix]
            speaker = line[time_ix:spk_ix]
            text = line[spk_ix:]

        if start_time != "":
            # Flush the final utterance, which has no following line to provide an end time.
            element = Element({"start_time": start_time, "end_time": "N/A", "speaker": speaker, "text": text})
            element.element_index = len(elements)
            elements.append(element)

        document.elements = elements
        return document
class ArynPartitioner(Partitioner):
    """
    The ArynPartitioner uses an object recognition model to partition the document into structured elements.

    Args:
        model_name_or_path: The HuggingFace coordinates or model local path. Should be set to the default
             ARYN_DETR_MODEL unless you are testing a custom model. Ignored when running remotely.
        threshold: The threshold to use for accepting the model's predicted bounding boxes. When using
             Aryn DocParse, this defaults to "auto", where the service will automatically find the best
             predictions. You can override this or set it locally by specifying a numerical threshold
             between 0 and 1. A lower value will include more objects, but may have overlaps, while a
             higher value will reduce the number of overlaps, but may miss legitimate objects.
        use_ocr: Whether to use OCR to extract text from the PDF. If false, we will attempt to extract
             the text from the underlying PDF.
             default: False
        ocr_images: If set with use_ocr, will attempt to OCR regions of the document identified as images.
             default: False
        ocr_model: model to use for OCR. Choices are "easyocr", "paddle", "tesseract" and "legacy", which
             correspond to EasyOCR, PaddleOCR, and Tesseract respectively, with "legacy" being a combination
             of Tesseract for text and EasyOCR for tables. If you choose paddle make sure to install
             paddlepaddle or paddlepaddle-gpu depending on whether you have a CPU or GPU. Further details
             are found at: https://www.paddlepaddle.org.cn/documentation/docs/en/install/index_en.html.
             Note: this will be ignored for Aryn DocParse, which uses its own OCR implementation.
             default: "easyocr"
        per_element_ocr: If true, will run OCR on each element individually instead of the entire page.
             Note: this will be ignored for Aryn DocParse, which uses its own OCR implementation.
             default: True
        extract_table_structure: If true, runs a separate table extraction model to extract cells from
             regions of the document identified as tables.
        table_structure_extractor: The table extraction implementation to use when extract_table_structure
             is True. The default is the TableTransformerStructureExtractor. Ignored when running remotely.
        table_extractor_options: Dictionary of options that are sent to the TableExtractor implementation.
             Currently supports union_tokens, which is a boolean that controls whether to union
             OCR / PDFMiner tokens in the table cells.
             default: None (an empty dictionary is sent)
        extract_images: If true, crops each region identified as an image and attaches it to the associated
             ImageElement. This can later be fed into the SummarizeImages transform.
             default: False
        device: Device on which to run the partitioning model locally. One of 'cpu', 'cuda', and 'mps'.
             If not set, Sycamore will choose based on what's available. If running remotely, this
             doesn't matter.
        batch_size: How many pages to partition at once, when running locally. Default is 1. Ignored when
             running remotely.
        use_partitioning_service: If true, runs the partitioner remotely using Aryn DocParse; if false,
             runs the partitioning model locally.
             default: True
        aryn_api_key: The account token used to authenticate with Aryn's servers. If empty, the key is
             read from the ArynConfig.
        aryn_partitioner_address: The address of the server to use to partition the document
        use_cache: Cache results from the partitioner for faster inferences on the same documents in
             future runs.
             default: False
        pages_per_call: Number of pages to send in a single call to the remote service. Default is -1,
             which means send all pages in one call.
        cache: Optional Cache instance handed to the underlying ArynPDFPartitioner.
             default: None
        output_format: controls output representation: json (default) or markdown.
        text_extraction_options: Dict of options that are sent to the TextExtractor implementation,
             either pdfminer or OCR. Currently supports the 'object_type' property for pdfminer,
             which can be set to 'boxes' or 'lines' to control the granularity of output.
             default: None (an empty dictionary is sent)
        source: The application that is using the partitioner. This is used for logging purposes.
        output_label_options: A dictionary for configuring output label behavior. It supports two options:
             promote_title, a boolean specifying whether to pick the largest element by font size on the
             first page from among the elements on that page that have one of the types specified in
             title_candidate_elements and promote it to type "Title" if there is no element on the first
             page of type "Title" already.
             title_candidate_elements, a list of strings representing the label types allowed to be
             promoted to a title.
             Here is an example set of output label options:
                {"promote_title": True, "title_candidate_elements": ["Section-header", "Caption"]}
             default: None (no element is promoted to "Title")

    Example:
         The following shows an example of using the ArynPartitioner to partition a PDF locally and
         extract both table structure and images.

         .. code-block:: python

            context = sycamore.init()
            partitioner = ArynPartitioner(
                use_partitioning_service=False, extract_table_structure=True, extract_images=True
            )
            context.read.binary(paths, binary_format="pdf")
                .partition(partitioner=partitioner)
    """

    def __init__(
        self,
        model_name_or_path=ARYN_DETR_MODEL,
        threshold: Optional[Union[float, Literal["auto"]]] = None,
        use_ocr: bool = False,
        ocr_images: bool = False,
        ocr_model: str = "easyocr",
        per_element_ocr: bool = True,
        extract_table_structure: bool = False,
        table_structure_extractor: Optional[TableStructureExtractor] = None,
        table_extractor_options: Optional[dict[str, Any]] = None,
        extract_images: bool = False,
        device=None,
        batch_size: int = 1,
        use_partitioning_service: bool = True,
        aryn_api_key: str = "",
        aryn_partitioner_address: str = DEFAULT_ARYN_PARTITIONER_ADDRESS,
        use_cache=False,
        pages_per_call: int = -1,
        cache: Optional[Cache] = None,
        output_format: Optional[str] = None,
        text_extraction_options: Optional[dict[str, Any]] = None,
        source: str = "",
        output_label_options: Optional[dict[str, Any]] = None,
    ):
        # The model only runs on this machine in local mode; a remote call needs no accelerator.
        if use_partitioning_service:
            device = "cpu"
        else:
            device = choose_device(device)
        super().__init__(device=device, batch_size=batch_size)

        if not aryn_api_key:
            self._aryn_api_key = ArynConfig.get_aryn_api_key()
        else:
            self._aryn_api_key = aryn_api_key
        self._model_name_or_path = model_name_or_path
        self._device = device

        if threshold is None:
            if use_partitioning_service:
                self._threshold: Union[float, Literal["auto"]] = "auto"
            else:
                self._threshold = DEFAULT_LOCAL_THRESHOLD
        else:
            # Integers (e.g. 0 or 1) are valid numeric thresholds too; only non-numeric values
            # such as "auto" need the remote service.
            is_numeric = isinstance(threshold, (int, float)) and not isinstance(threshold, bool)
            if not is_numeric and not use_partitioning_service:
                raise ValueError("Auto threshold is only supported with Aryn DocParse.")
            self._threshold = threshold

        self._use_ocr = use_ocr
        self._ocr_images = ocr_images
        self._ocr_model = ocr_model
        self._per_element_ocr = per_element_ocr
        self._extract_table_structure = extract_table_structure
        self._table_structure_extractor = table_structure_extractor
        # Option dictionaries default to a fresh dict per instance, so that a mutation made through
        # one partitioner can never leak into another one.
        self._table_extractor_options = {} if table_extractor_options is None else table_extractor_options
        self._extract_images = extract_images
        self._output_format = output_format
        self._batch_size = batch_size
        self._use_partitioning_service = use_partitioning_service
        self._aryn_partitioner_address = aryn_partitioner_address
        self._use_cache = use_cache
        self._cache = cache
        self._pages_per_call = pages_per_call
        self._text_extraction_options = {} if text_extraction_options is None else text_extraction_options
        self._source = source
        self.output_label_options = {} if output_label_options is None else output_label_options

    @timetrace("SycamorePdf")
    def partition(self, document: Document) -> Document:
        """
        Partition the PDF bytes stored in ``document`` and replace ``document.elements`` with the result.

        Args:
            document: Document whose ``data["binary_representation"]`` holds the raw PDF bytes.

        Returns:
            The same document, with its elements replaced and then passed through ``bbox_sort_document``.

        Raises:
            RuntimeError: If partitioning fails; the original exception is chained.
        """
        binary = io.BytesIO(document.data["binary_representation"])
        # Imported lazily, inside the method, as in the rest of this module's heavy dependencies.
        from sycamore.transforms.detr_partitioner import ArynPDFPartitioner

        partitioner = ArynPDFPartitioner(self._model_name_or_path, device=self._device, cache=self._cache)

        try:
            elements = partitioner.partition_pdf(
                binary,
                self._threshold,
                use_ocr=self._use_ocr,
                ocr_images=self._ocr_images,
                per_element_ocr=self._per_element_ocr,
                ocr_model=self._ocr_model,
                extract_table_structure=self._extract_table_structure,
                table_structure_extractor=self._table_structure_extractor,
                table_extractor_options=self._table_extractor_options,
                extract_images=self._extract_images,
                batch_size=self._batch_size,
                use_partitioning_service=self._use_partitioning_service,
                aryn_api_key=self._aryn_api_key,
                aryn_partitioner_address=self._aryn_partitioner_address,
                use_cache=self._use_cache,
                pages_per_call=self._pages_per_call,
                output_format=self._output_format,
                text_extraction_options=self._text_extraction_options,
                source=self._source,
                output_label_options=self.output_label_options,
            )
        except Exception as e:
            # Use .get so that a document without a "path" property does not raise KeyError here
            # and hide the real partitioning failure.
            path = document.properties.get("path", "<unknown path>")
            raise RuntimeError(f"ArynPartitioner Error processing {path}") from e

        document.elements = elements
        bbox_sort_document(document)
        return document
class SycamorePartitioner(ArynPartitioner):
    """
    The SycamorePartitioner is equivalent to the ArynPartitioner, except that it only runs locally.
    This class mostly exists for backwards compatibility with scripts written before the remote
    partitioning service existed. Please use `ArynPartitioner` instead.

    Args:
        model_name_or_path: Forwarded to ArynPartitioner.
        threshold: Forwarded to ArynPartitioner. Default is 0.4.
        use_ocr: Forwarded to ArynPartitioner.
        ocr_images: Forwarded to ArynPartitioner.
        ocr_tables: Accepted for backwards compatibility only; the value is not used.
        extract_table_structure: Forwarded to ArynPartitioner.
        table_structure_extractor: Forwarded to ArynPartitioner.
        extract_images: Forwarded to ArynPartitioner.
        device: Device to run on; resolved with ``choose_device`` before being forwarded.
        batch_size: Forwarded to ArynPartitioner.
    """

    def __init__(
        self,
        model_name_or_path=ARYN_DETR_MODEL,
        threshold: float = 0.4,
        use_ocr=False,
        ocr_images=False,
        ocr_tables=False,
        extract_table_structure=False,
        table_structure_extractor=None,
        extract_images=False,
        device=None,
        batch_size: int = 1,
    ):
        device = choose_device(device)
        super().__init__(
            model_name_or_path=model_name_or_path,
            threshold=threshold,
            use_ocr=use_ocr,
            ocr_images=ocr_images,
            extract_table_structure=extract_table_structure,
            # Previously this argument was accepted but never passed on, so a caller-supplied
            # extractor was silently ignored.
            table_structure_extractor=table_structure_extractor,
            extract_images=extract_images,
            device=device,
            batch_size=batch_size,
            # This class is the local-only variant of ArynPartitioner.
            use_partitioning_service=False,
        )
class Partition(CompositeTransform):
    """
    The Partition transform segments documents into elements. For example, a typical partitioner might
    chunk a document into elements corresponding to paragraphs, images, and tables. Partitioners are
    format specific, so for instance for HTML you can use the HtmlPartitioner and for PDFs, we provide
    the UnstructuredPdfPartitioner, which utilizes the unstructured open-source library.

    Args:
        child: The source node or component that provides the dataset to be partitioned.
        partitioner: An instance of a Partitioner class to be applied
        table_extractor: Optional TableExtractor; when given, its ``extract_tables`` is run as a second
            step after partitioning.
        resource_args: Additional resource-related arguments that can be passed to the Partition operation.

    Example:
         .. code-block:: python

            source_node = ...  # Define a source node or component that provides a dataset.
            custom_partitioner = MyPartitioner(partitioner_params)
            partition_transform = Partition(child=source_node, partitioner=custom_partitioner)
            partitioned_dataset = partition_transform.execute()
    """

    def __init__(
        self, child: Node, partitioner: Partitioner, table_extractor: Optional[TableExtractor] = None, **resource_args
    ):
        # Remote partitioning is always pinned to a parallelism of 1, overriding any caller value.
        if isinstance(partitioner, ArynPartitioner) and partitioner._use_partitioning_service:
            resource_args["parallelism"] = 1

        if partitioner.device == "cuda":
            # Fill in GPU defaults without overriding anything the caller supplied.
            resource_args.setdefault("num_gpus", 1.0)
            assert resource_args["num_gpus"] >= 0
            resource_args.setdefault("parallelism", 1)
            if "batch_size" not in resource_args:
                resource_args["batch_size"] = partitioner.batch_size
        elif partitioner.device == "cpu":
            # A CPU-only partitioner must not reserve GPUs.
            resource_args.pop("num_gpus", None)

        # Only the partitioning step receives the resource arguments; the optional table-extraction
        # step runs without them. This mirrors the historical behavior, and it is unclear whether it
        # is the intended one.
        steps = [{**resource_args, "f": Map.wrap(partitioner.partition)}]
        if table_extractor is not None:
            steps.append({"f": Map.wrap(table_extractor.extract_tables)})

        super().__init__(child, steps)