DocSetWriter#

class sycamore.writer.DocSetWriter(context: Context, plan: Node)[source]#

Contains interfaces for writing to external storage systems, most notably OpenSearch.

Users should not instantiate this class directly, but instead access an instance using sycamore.docset.DocSet.write()

duckdb(dimensions: int, db_url: str | None = None, table_name: str | None = None, batch_size: int | None = None, schema: dict[str, str] | None = None, execute: bool = True, **kwargs)[source]#

Writes the content of the DocSet into a DuckDB database.

Parameters:
  • dimensions -- The dimension of the embedding vectors (required parameter)

  • db_url -- The URL of the DuckDB database.

  • table_name -- The name of the table to write the data to

  • batch_size -- The batch size to use when loading entries into the DuckDB table

  • schema -- Defines the schema of the table into which entries are written

  • execute -- Flag that determines whether to execute immediately

Example

The following shows how to read a pdf dataset into a DocSet, write it out to a DuckDB database, and read the data back.
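The code below is a minimal sketch: the paths, table name, database file, and pipeline stages mirror the other examples on this page and are assumptions rather than part of this docstring; the read-back step uses the duckdb client library directly.

table_name = "duckdb_table"
db_url = "tmp.db"
model_name = "sentence-transformers/all-MiniLM-L6-v2"
paths = "/path/to/pdfs/"

ctx = sycamore.init()

ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
    .explode()
    .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
)

# dimensions must match the embedding model (all-MiniLM-L6-v2 produces 384-dim vectors).
ds.write.duckdb(dimensions=384, db_url=db_url, table_name=table_name)

# Read the rows back with the duckdb client library.
import duckdb

conn = duckdb.connect(database=db_url)
print(conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchall())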

elasticsearch(*, url: str, index_name: str, es_client_args: dict = {}, wait_for_completion: str = 'false', settings: dict | None = None, mappings: dict | None = None, execute: bool = True, **kwargs) DocSet | None[source]#

Writes the content of the DocSet into the specified Elasticsearch index.

Parameters:
  • url -- Connection endpoint for the Elasticsearch instance. Note that this must be paired with the necessary client arguments below

  • index_name -- Index name to write to in the Elasticsearch instance

  • es_client_args -- Authentication arguments to be specified (if needed). See more information at https://elasticsearch-py.readthedocs.io/en/v8.14.0/api/elasticsearch.html

  • wait_for_completion -- Whether to wait for completion of the write before proceeding with next steps. See more information at https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-refresh.html

  • mappings -- Mapping of the Elasticsearch index, can be optionally specified

  • settings -- Settings of the Elasticsearch index, can be optionally specified

  • execute -- Execute the pipeline and write to Elasticsearch on adding this operator. If False, will return a DocSet with this write in the plan. Default is True

Example

The following code shows how to read a pdf dataset into a DocSet and write it out to a local Elasticsearch index called test-index.

url = "http://localhost:9200"
index_name = "test-index"
model_name = "sentence-transformers/all-MiniLM-L6-v2"
paths = str(TEST_DIR / "resources/data/pdfs/")

tokenizer = HuggingFaceTokenizer(model_name)

ctx = sycamore.init()

ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
    .regex_replace(COALESCE_WHITESPACE)
    .mark_bbox_preset(tokenizer=tokenizer)
    .merge(merger=MarkedMerger())
    .spread_properties(["path"])
    .split_elements(tokenizer=tokenizer, max_tokens=512)
    .explode()
    .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
    .sketch(window=17)
)
ds.write.elasticsearch(url=url, index_name=index_name)
files(path: str, filesystem: ~pyarrow._fs.FileSystem | None = None, filename_fn: ~typing.Callable[[~sycamore.data.document.Document], str] = <function default_filename>, doc_to_bytes_fn: ~typing.Callable[[~sycamore.data.document.Document], bytes] = <function default_doc_to_bytes>, **resource_args) None[source]#

Writes the content of each Document to a separate file.

Parameters:
  • path -- The path prefix to write to. Should include the scheme if not local.

  • filesystem -- The pyarrow.fs FileSystem to use.

  • filename_fn -- A function for generating a file name. Takes a Document and returns a unique name that will be appended to path.

  • doc_to_bytes_fn -- A function from a Document to bytes for generating the data to write. Defaults to using text_representation if available, or binary_representation if not.

  • resource_args -- Arguments to pass to the underlying execution environment.
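
For instance, assuming a DocSet ds built as in the examples above, the following sketch writes one text file per document under a local prefix; the output path and the filename function based on doc_id are illustrative assumptions.

def filename_for(doc):
    # Illustrative: name each output file after the document's id.
    return f"{doc.doc_id}.txt"

ds.write.files(path="/tmp/docset-files/", filename_fn=filename_for)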

json(path: str, filesystem: FileSystem | None = None, **resource_args) None[source]#

Writes Documents in JSONL format to files, one file per block. Typically, a block corresponds to a single pre-explode source document.

Parameters:
  • path -- The path prefix to write to. Should include the scheme if not local.

  • filesystem -- The pyarrow.fs FileSystem to use.

  • resource_args -- Arguments to pass to the underlying execution environment.
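
For example, assuming a DocSet ds as in the examples above, the following writes JSONL files under a local prefix (the path is an assumption):

ds.write.json("/tmp/docset-json/")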

neo4j(uri: str, auth: tuple[Any, Any] | Auth | AuthManager | None, import_dir: str, database: str = 'neo4j', use_auradb: bool = False, s3_session: Session | None = None, **kwargs) DocSet | None[source]#

*EXPERIMENTAL*

Writes the content of the DocSet into the specified Neo4j database.

Parameters:
  • uri -- Connection endpoint for the neo4j instance. Note that this must be paired with the necessary client arguments below

  • auth -- Authentication arguments to be specified. See more information at https://neo4j.com/docs/api/python-driver/current/api.html#auth-ref

  • import_dir -- The import directory that neo4j uses. You can specify where to mount this volume when you launch your neo4j docker container.

  • database -- Database to write to in Neo4j. In the Neo4j community edition, new databases cannot be created, so you must use "neo4j". If using the enterprise edition, ensure the database exists.

  • use_auradb -- Set to true if you are using neo4j's serverless implementation called AuraDB. Defaults to false.

  • s3_session -- An AWS S3 Session. This must be passed in if use_auradb is set to true. It is used as a public CSV proxy to securely upload your files into AuraDB. Defaults to None.

Example

The following code shows how to write to a neo4j database.

ds = (
    ctx.read.manifest(...)
    .partition(...)
    .extract_document_structure(...)
    .extract_graph_entities(...)
    .extract_graph_relationships(...)
    .resolve_graph_entities(...)
    .explode()
)

URI = "neo4j+s://<AURADB_INSTANCE_ID>.databases.neo4j.io"
AUTH = ("neo4j", "sample_password")
DATABASE = "neo4j"
IMPORT_DIR = "/tmp/neo4j"
S3_SESSION = boto3.session.Session()

ds.write.neo4j(
    uri=URI,
    auth=AUTH,
    database=DATABASE,
    import_dir=IMPORT_DIR,
    use_auradb=True,
    s3_session=S3_SESSION,
)

opensearch(*, os_client_args: dict, index_name: str, index_settings: dict, execute: bool = True, **kwargs) DocSet | None[source]#

Writes the content of the DocSet into the specified OpenSearch index.

Parameters:
  • os_client_args -- Keyword parameters that are passed to the opensearch-py OpenSearch client constructor. See more information at https://opensearch.org/docs/latest/clients/python-low-level/

  • index_name -- The name of the OpenSearch index into which to load this DocSet.

  • index_settings -- Settings and mappings to pass when creating a new index. Specified as a Python dict corresponding to the JSON parameters taken by the OpenSearch CreateIndex API: https://opensearch.org/docs/latest/api-reference/index-apis/create-index/

  • execute -- Execute the pipeline and write to OpenSearch on adding this operator. If False, will return a DocSet with this write in the plan. Default is True

  • kwargs -- Keyword arguments to pass to the underlying execution engine

Example

The following code shows how to read a pdf dataset into a DocSet and write it out to a local OpenSearch index called my_index.

os_client_args = {
    "hosts": [{"host": "localhost", "port": 9200}],
    "http_auth": ("user", "password"),
}

index_settings = {
    "body": {
        "settings": {
            "index.knn": True,
        },
        "mappings": {
            "properties": {
                "embedding": {
                    "type": "knn_vector",
                    "dimension": 384,
                    "method": {"name": "hnsw", "engine": "faiss"},
                },
            },
        },
    },
}

context = sycamore.init()
pdf_docset = (
    context.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
)

pdf_docset.write.opensearch(
    os_client_args=os_client_args,
    index_name="my_index",
    index_settings=index_settings,
)
pinecone(*, index_name: str, index_spec: Any | None = None, namespace: str = '', dimensions: int | None = None, distance_metric: str = 'cosine', api_key: str | None = None, execute: bool = True, log: bool = False, **kwargs) DocSet | None[source]#

Writes the content of the DocSet into a Pinecone vector index.

Parameters:
  • index_name -- Name of the pinecone index to ingest into

  • index_spec -- Cloud parameters needed by Pinecone to create your index. See https://docs.pinecone.io/guides/indexes/create-an-index. Defaults to None, which assumes the index already exists; an existing index will not be modified if this is provided.

  • namespace -- Namespace within the Pinecone index to ingest into. See https://docs.pinecone.io/guides/indexes/use-namespaces. Defaults to "", which is the default namespace.

  • dimensions -- Dimensionality of dense vectors in your index. Defaults to None, which assumes the index already exists; an existing index will not be modified if this is provided.

  • distance_metric -- Distance metric used for nearest-neighbor search in your index. Defaults to "cosine", but will not modify an already-existing index

  • api_key -- Pinecone service API Key. Defaults to None (will use the environment variable PINECONE_API_KEY).

  • kwargs -- Arguments to pass to the underlying execution engine

Example

The following shows how to read a pdf dataset into a DocSet and write it out to a pinecone index called "mytestingindex".

model_name = "sentence-transformers/all-MiniLM-L6-v2"
tokenizer = HuggingFaceTokenizer(model_name)
ctx = sycamore.init()
ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=ArynPartitioner(extract_table_structure=True, extract_images=True))
    .explode()
    .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
    .term_frequency(tokenizer=tokenizer, with_token_ids=True)
    .sketch(window=17)
)

ds.write.pinecone(
    index_name="mytestingindex",
    index_spec=pinecone.ServerlessSpec(cloud="aws", region="us-east-1"),
    namespace="",
    dimensions=384,
    distance_metric="dotproduct",
)
qdrant(client_params: dict, collection_params: dict, vector_name: str | None = None, execute: bool = True, **kwargs) DocSet | None[source]#

Writes the content of the DocSet into a Qdrant collection

Parameters:
  • client_params -- Keyword parameters that are passed to the Qdrant client constructor

  • collection_params -- Keyword parameters used to create the Qdrant collection if it does not already exist

  • vector_name -- Name of the vector in the Qdrant collection. Defaults to None

  • execute -- Execute the pipeline and write to Qdrant on adding this operator. If False, will return a DocSet with this write in the plan. Default is True

  • kwargs -- Keyword arguments to pass to the underlying execution engine

Example

The following code shows how to read a pdf dataset into a DocSet and write it out to a Qdrant collection called "sycamore_collection".
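The sketch below assumes a DocSet ds with 384-dimensional embeddings built as in the other examples on this page; the local Qdrant URL, timeout, and collection settings are assumptions.

ds.write.qdrant(
    {
        "url": "http://localhost:6333",
        "timeout": 50,
    },
    {
        "collection_name": "sycamore_collection",
        "vectors_config": {
            "size": 384,
            "distance": "Cosine",
        },
    },
)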

weaviate(*, wv_client_args: dict, collection_name: str, collection_config: dict[str, Any] | None = None, flatten_properties: bool = False, execute: bool = True, **kwargs) DocSet | None[source]#

Writes the content of the DocSet into the specified Weaviate collection.

Parameters:
  • wv_client_args -- Keyword parameters that are passed to the weaviate client constructor. See more information at https://weaviate.io/developers/weaviate/client-libraries/python#python-client-v4-explicit-connection

  • collection_name -- The name of the Weaviate collection into which to load this DocSet.

  • collection_config -- Keyword parameters that are passed to the weaviate client's collections.create() method. If not provided, Weaviate will auto-schematize the incoming records, which may lead to inconsistencies or failures. See more information at https://weaviate.io/developers/weaviate/manage-data/collections#create-a-collection-and-define-properties

  • flatten_properties -- Whether to flatten documents into pure key-value pairs or to allow nested structures. Default is False (allow nested structures)

  • execute -- Execute the pipeline and write to weaviate on adding this operator. If False, will return a DocSet with this write in the plan. Default is True

  • kwargs -- Arguments to pass to the underlying execution engine

Example

The following code shows how to read a pdf dataset into a DocSet and write it out to a local Weaviate collection called DemoCollection.

collection = "DemoCollection"
wv_client_args = {
    "connection_params": ConnectionParams.from_params(
        http_host="localhost",
        http_port=8080,
        http_secure=False,
        grpc_host="localhost",
        grpc_port=50051,
        grpc_secure=False,
    )
}

# Weaviate will assume empty arrays are empty arrays of text, so it
# will throw errors when you try to make an array of non-text in a
# field that some records have empty. => We specify them here.
collection_config_params = {
    "name": collection,
    "description": "A collection to demo data-prep with sycamore",
    "properties": [
        Property(
            name="properties",
            data_type=DataType.OBJECT,
            nested_properties=[
                Property(
                    name="links",
                    data_type=DataType.OBJECT_ARRAY,
                    nested_properties=[
                        Property(name="text", data_type=DataType.TEXT),
                        Property(name="url", data_type=DataType.TEXT),
                        Property(name="start_index", data_type=DataType.NUMBER),
                    ],
                ),
            ],
        ),
        Property(name="bbox", data_type=DataType.NUMBER_ARRAY),
        Property(name="shingles", data_type=DataType.INT_ARRAY),
    ],
    "vectorizer_config": [Configure.NamedVectors.text2vec_transformers(name="embedding")],
    "references": [ReferenceProperty(name="parent", target_collection=collection)],
}

model_name = "sentence-transformers/all-MiniLM-L6-v2"
davinci_llm = OpenAI(OpenAIModels.GPT_3_5_TURBO_INSTRUCT.value)
tokenizer = HuggingFaceTokenizer(model_name)

ctx = sycamore.init()

ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
    .regex_replace(COALESCE_WHITESPACE)
    .extract_entity(entity_extractor=OpenAIEntityExtractor(
            "title", llm=davinci_llm, prompt_template=title_template))
    .mark_bbox_preset(tokenizer=tokenizer)
    .merge(merger=MarkedMerger())
    .spread_properties(["path", "title"])
    .split_elements(tokenizer=tokenizer, max_tokens=512)
    .explode()
    .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
    .sketch(window=17)
)

ds.write.weaviate(
    wv_client_args=wv_client_args,
    collection_name=collection,
    collection_config=collection_config_params
)