DocSetWriter

class sycamore.writer.DocSetWriter(context: Context, plan: Node)[source]

Contains interfaces for writing to external storage systems, most notably OpenSearch.

Users should not instantiate this class directly, but instead access an instance using sycamore.docset.DocSet.write()

duckdb(dimensions: int, db_url: str = 'tmp.db', table_name: str | None = None, batch_size: int | None = None, schema: dict[str, str] | None = None, execute: bool = True, **kwargs)[source]

Writes the content of the DocSet into a DuckDB database.

Parameters:
  • dimensions -- The dimension of the embedding vectors (required parameter)

  • db_url -- The URL of the DuckDB database.

  • table_name -- The name of the table to write the data to

  • batch_size -- The batch size to use when loading entries into the DuckDB table

  • schema -- The schema of the table into which entries are written

  • execute -- Whether to execute the write immediately. If False, the write is added to the plan but not executed. Default is True

Example

The following shows how to read a pdf dataset into a DocSet, write it out to a DuckDB database, and read the rows back.
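
This is a minimal sketch: the paths, model name, table name, and database file below are illustrative, and it assumes paths points at a directory of PDFs.

ctx = sycamore.init()
model_name = "sentence-transformers/all-MiniLM-L6-v2"

ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
    .explode()
    .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
)

# Write the DocSet into a local DuckDB database file.
# all-MiniLM-L6-v2 produces 384-dimensional embeddings.
table_name = "duckdb_table"
db_url = "tmp_read_data.db"
ds.write.duckdb(table_name=table_name, db_url=db_url, dimensions=384)

# Read the rows back with the duckdb Python client to verify the write.
import duckdb
conn = duckdb.connect(database=db_url)
print(conn.execute(f"SELECT * FROM {table_name} LIMIT 5").fetchdf())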

elasticsearch(*, url: str, index_name: str, es_client_args: dict = {}, wait_for_completion: str = 'false', settings: dict | None = None, mappings: dict | None = None, execute: bool = True, **kwargs) DocSet | None[source]

Writes the content of the DocSet into the specified Elasticsearch index.

Parameters:
  • url -- Connection endpoint for the Elasticsearch instance. Note that this must be paired with the necessary client arguments below

  • index_name -- Index name to write to in the Elasticsearch instance

  • es_client_args -- Authentication arguments to be specified (if needed). See more information at https://elasticsearch-py.readthedocs.io/en/v8.14.0/api/elasticsearch.html

  • wait_for_completion -- Whether to wait for completion of the write before proceeding with next steps. See more information at https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-refresh.html

  • mappings -- Mapping of the Elasticsearch index, can be optionally specified

  • settings -- Settings of the Elasticsearch index, can be optionally specified

  • execute -- Execute the pipeline and write to Elasticsearch on adding this operator. If False, will return a DocSet with this write in the plan. Default is True

Example

The following code shows how to read a pdf dataset into a DocSet and write it out to a local Elasticsearch index called test-index.

url = "http://localhost:9200"
index_name = "test-index"
model_name = "sentence-transformers/all-MiniLM-L6-v2"
paths = str(TEST_DIR / "resources/data/pdfs/")

tokenizer = HuggingFaceTokenizer(model_name)

ctx = sycamore.init()

ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
    .regex_replace(COALESCE_WHITESPACE)
    .mark_bbox_preset(tokenizer=tokenizer)
    .merge(merger=MarkedMerger())
    .spread_properties(["path"])
    .split_elements(tokenizer=tokenizer, max_tokens=512)
    .explode()
    .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
    .sketch(window=17)
)
ds.write.elasticsearch(url=url, index_name=index_name)
files(path: str, filesystem: ~pyarrow._fs.FileSystem | None = None, filename_fn: ~typing.Callable[[~sycamore.data.document.Document], str] = <function default_filename>, doc_to_bytes_fn: ~typing.Callable[[~sycamore.data.document.Document], bytes] = <function default_doc_to_bytes>, **resource_args) None[source]

Writes the content of each Document to a separate file.

Parameters:
  • path -- The path prefix to write to. Should include the scheme if not local.

  • filesystem -- The pyarrow.fs FileSystem to use.

  • filename_fn -- A function for generating a file name. Takes a Document and returns a unique name that will be appended to path.

  • doc_to_bytes_fn -- A function from a Document to bytes for generating the data to write. Defaults to using text_representation if available, or binary_representation if not.

  • resource_args -- Arguments to pass to the underlying execution environment.
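
Example

The following minimal sketch (the output directory and naming scheme are illustrative) writes each Document of an already-processed DocSet ds to its own file.

# Hypothetical naming scheme: one file per Document, keyed by its doc_id.
def name_by_doc_id(doc):
    return f"{doc.doc_id}.txt"

ds.write.files(path="/tmp/sycamore_out/", filename_fn=name_by_doc_id)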

json(path: str, filesystem: FileSystem | None = None, **resource_args) None[source]

Writes Documents in JSONL format to files, one file per block. Typically, a block corresponds to a single pre-explode source document.

Parameters:
  • path -- The path prefix to write to. Should include the scheme if not local.

  • filesystem -- The pyarrow.fs FileSystem to use.

  • resource_args -- Arguments to pass to the underlying execution environment.
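
Example

The following minimal sketch writes an already-processed DocSet ds to JSONL files (the output prefix below is illustrative).

# Writes one JSONL file per block under the given path prefix.
ds.write.json(path="/tmp/sycamore_json/")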

opensearch(*, os_client_args: dict, index_name: str, index_settings: dict, insert_settings: dict | None = None, execute: bool = True, reliability_rewriter: bool = False, **kwargs) DocSet | None[source]

Writes the content of the DocSet into the specified OpenSearch index.

Parameters:
  • os_client_args -- Keyword parameters that are passed to the opensearch-py OpenSearch client constructor. See more information at https://opensearch.org/docs/latest/clients/python-low-level/

  • index_name -- The name of the OpenSearch index into which to load this DocSet.

  • index_settings -- Settings and mappings to pass when creating a new index. Specified as a Python dict corresponding to the JSON parameters taken by the OpenSearch CreateIndex API: https://opensearch.org/docs/latest/api-reference/index-apis/create-index/

  • insert_settings -- Settings to pass when inserting data into the index. Specified as a Python dict. Defaults to {"raise_on_error": False, "raise_on_exception": False, "chunk_size": 100, "thread_count": 3}

  • execute -- Execute the pipeline and write to OpenSearch on adding this operator. If False, will return a DocSet with this write in the plan. Default is True

  • kwargs -- Keyword arguments to pass to the underlying execution engine

Example

The following code shows how to read a pdf dataset into a DocSet and write it out to a local OpenSearch index called my_index.

os_client_args = {
    "hosts": [{"host": "localhost", "port": 9200}],
    "http_auth": ("user", "password"),
}

index_settings = {
    "body": {
        "settings": {
            "index.knn": True,
        },
        "mappings": {
            "properties": {
                "embedding": {
                    "type": "knn_vector",
                    "dimension": 384,
                    "method": {"name": "hnsw", "engine": "faiss"},
                },
            },
        },
    },
}

context = sycamore.init()
pdf_docset = (
    context.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
)

pdf_docset.write.opensearch(
    os_client_args=os_client_args,
    index_name="my_index",
    index_settings=index_settings,
)
pinecone(*, index_name: str, index_spec: Any | None = None, namespace: str = '', dimensions: int | None = None, distance_metric: str = 'cosine', api_key: str | None = None, execute: bool = True, log: bool = False, **kwargs) DocSet | None[source]

Writes the content of the DocSet into a Pinecone vector index.

Parameters:
  • index_name -- Name of the pinecone index to ingest into

  • index_spec -- Cloud parameters needed by Pinecone to create your index. See https://docs.pinecone.io/guides/indexes/create-an-index. Defaults to None, which assumes the index already exists. If provided, an existing index will not be modified

  • namespace -- Namespace within the Pinecone index to ingest into. See https://docs.pinecone.io/guides/indexes/use-namespaces. Defaults to "", which is the default namespace

  • dimensions -- Dimensionality of dense vectors in your index. Defaults to None, which assumes the index already exists. If provided, an existing index will not be modified.

  • distance_metric -- Distance metric used for nearest-neighbor search in your index. Defaults to "cosine", but will not modify an already-existing index

  • api_key -- Pinecone service API Key. Defaults to None (will use the environment variable PINECONE_API_KEY).

  • execute -- Execute the pipeline and write to Pinecone on adding this operator. If False, will return a DocSet with this write in the plan. Default is True

  • kwargs -- Arguments to pass to the underlying execution engine

Example

The following shows how to read a pdf dataset into a DocSet and write it out to a Pinecone index called "mytestingindex".

model_name = "sentence-transformers/all-MiniLM-L6-v2"
tokenizer = HuggingFaceTokenizer(model_name)
ctx = sycamore.init()
ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=ArynPartitioner(extract_table_structure=True, extract_images=True))
    .explode()
    .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
    .term_frequency(tokenizer=tokenizer, with_token_ids=True)
    .sketch(window=17)
)

ds.write.pinecone(
    index_name="mytestingindex",
    index_spec=pinecone.ServerlessSpec(cloud="aws", region="us-east-1"),
    namespace="",
    dimensions=384,
    distance_metric="dotproduct",
)
qdrant(client_params: dict, collection_params: dict, vector_name: str | None = None, execute: bool = True, **kwargs) DocSet | None[source]

Writes the content of the DocSet into a Qdrant collection

Parameters:
  • client_params -- Keyword parameters that are passed to the qdrant_client.QdrantClient constructor (for example, the service URL)

  • collection_params -- Keyword parameters passed to the Qdrant client's create_collection() method, used to create the collection if it does not already exist

  • vector_name -- The name of the vector in the Qdrant collection. Defaults to None (the default, unnamed vector)

  • execute -- Execute the pipeline and write to Qdrant on adding this operator. If False, will return a DocSet with this write in the plan. Default is True

  • kwargs -- Arguments to pass to the underlying execution engine

Example

The following code shows how to read a pdf dataset into a DocSet and write it out to a Qdrant collection called "sycamore_collection".
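
The example below is a minimal sketch: the paths, model name, connection settings, and collection parameters are illustrative, and it assumes a Qdrant instance running locally on port 6333.

model_name = "sentence-transformers/all-MiniLM-L6-v2"

ctx = sycamore.init()
ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
    .explode()
    .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
)

# client_params are passed to the Qdrant client; collection_params are used
# to create the collection if it does not already exist.
ds.write.qdrant(
    {
        "url": "http://localhost:6333",
        "timeout": 50,
    },
    {
        "collection_name": "sycamore_collection",
        "vectors_config": {
            "size": 384,
            "distance": "Cosine",
        },
    },
)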

weaviate(*, wv_client_args: dict, collection_name: str, collection_config: dict[str, Any] | None = None, flatten_properties: bool = False, execute: bool = True, **kwargs) DocSet | None[source]

Writes the content of the DocSet into the specified Weaviate collection.

Parameters:
  • wv_client_args -- Keyword parameters that are passed to the weaviate client constructor. See more information at https://weaviate.io/developers/weaviate/client-libraries/python#python-client-v4-explicit-connection

  • collection_name -- The name of the Weaviate collection into which to load this DocSet.

  • collection_config -- Keyword parameters that are passed to the weaviate client's collections.create() method. If not provided, Weaviate will auto-schematize the incoming records, which may lead to inconsistencies or failures. See more information at https://weaviate.io/developers/weaviate/manage-data/collections#create-a-collection-and-define-properties

  • flatten_properties -- Whether to flatten documents into pure key-value pairs or to allow nested structures. Default is False (allow nested structures)

  • execute -- Execute the pipeline and write to weaviate on adding this operator. If False, will return a DocSet with this write in the plan. Default is True

  • kwargs -- Arguments to pass to the underlying execution engine

Example

The following code shows how to read a pdf dataset into a DocSet and write it out to a local Weaviate collection called DemoCollection.

collection = "DemoCollection"
wv_client_args = {
    "connection_params": ConnectionParams.from_params(
        http_host="localhost",
        http_port=8080,
        http_secure=False,
        grpc_host="localhost",
        grpc_port=50051,
        grpc_secure=False,
    )
}

# Weaviate assumes empty arrays are arrays of text, so it throws errors
# when a field that is empty in some records holds non-text values in
# others. We therefore specify the property types explicitly here.
collection_config_params = {
    "name": collection,
    "description": "A collection to demo data-prep with sycamore",
    "properties": [
        Property(
            name="properties",
            data_type=DataType.OBJECT,
            nested_properties=[
                Property(
                    name="links",
                    data_type=DataType.OBJECT_ARRAY,
                    nested_properties=[
                        Property(name="text", data_type=DataType.TEXT),
                        Property(name="url", data_type=DataType.TEXT),
                        Property(name="start_index", data_type=DataType.NUMBER),
                    ],
                ),
            ],
        ),
        Property(name="bbox", data_type=DataType.NUMBER_ARRAY),
        Property(name="shingles", data_type=DataType.INT_ARRAY),
    ],
    "vectorizer_config": [Configure.NamedVectors.text2vec_transformers(name="embedding")],
    "references": [ReferenceProperty(name="parent", target_collection=collection)],
}

model_name = "sentence-transformers/all-MiniLM-L6-v2"
davinci_llm = OpenAI(OpenAIModels.GPT_3_5_TURBO_INSTRUCT.value)
tokenizer = HuggingFaceTokenizer(model_name)

ctx = sycamore.init()

ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
    .regex_replace(COALESCE_WHITESPACE)
    .extract_entity(entity_extractor=OpenAIEntityExtractor(
        "title", llm=davinci_llm, prompt_template=title_template))
    .mark_bbox_preset(tokenizer=tokenizer)
    .merge(merger=MarkedMerger())
    .spread_properties(["path", "title"])
    .split_elements(tokenizer=tokenizer, max_tokens=512)
    .explode()
    .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
    .sketch(window=17)
)

ds.write.weaviate(
    wv_client_args=wv_client_args,
    collection_name=collection,
    collection_config=collection_config_params
)