DocSetWriter¶
- class sycamore.writer.DocSetWriter(context: Context, plan: Node)[source]¶
Contains interfaces for writing to external storage systems, most notably OpenSearch.
Users should not instantiate this class directly, but instead access an instance using sycamore.docset.DocSet.write().
- duckdb(dimensions: int, db_url: str = 'tmp.db', table_name: str | None = None, batch_size: int | None = None, schema: dict[str, str] | None = None, execute: bool = True, **kwargs)[source]¶
Writes the content of the DocSet into a DuckDB database.
- Parameters:
dimensions -- The dimensions of the embeddings of each vector (required parameter)
db_url -- The URL of the DuckDB database.
table_name -- The name of the table to write the data to.
batch_size -- The batch size used when loading entries into the DuckDB database table.
schema -- The schema of the table into which entries are written.
execute -- Execute the pipeline and write to DuckDB on adding this operator. If False, will return a DocSet with this write in the plan. Default is True.
Example
The following shows how to read a pdf dataset into a DocSet, write it out to a DuckDB database, and read it back.
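A minimal sketch, assuming the UnstructuredPdfPartitioner and SentenceTransformerEmbedder used elsewhere on this page, a placeholder paths variable, and 384 dimensions to match the all-MiniLM-L6-v2 embedder; the read-back uses the duckdb client directly, and imports for the Sycamore transforms are omitted as in the other examples here.

import sycamore
import duckdb

model_name = "sentence-transformers/all-MiniLM-L6-v2"
paths = "path/to/pdfs/"  # placeholder: directory of PDF files

ctx = sycamore.init()
ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
    .explode()
    .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
)

# dimensions must match the embedder's output size (384 for all-MiniLM-L6-v2)
ds.write.duckdb(dimensions=384, db_url="tmp.db", table_name="docs")

# Read the rows back with the duckdb client to verify the write.
print(duckdb.connect("tmp.db").sql("SELECT COUNT(*) FROM docs").fetchall())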
- elasticsearch(*, url: str, index_name: str, es_client_args: dict = {}, wait_for_completion: str = 'false', settings: dict | None = None, mappings: dict | None = None, execute: bool = True, **kwargs) DocSet | None [source]¶
Writes the content of the DocSet into the specified Elasticsearch index.
- Parameters:
url -- Connection endpoint for the Elasticsearch instance. Note that this must be paired with the necessary client arguments below
index_name -- Index name to write to in the Elasticsearch instance
es_client_args -- Authentication arguments to be specified (if needed). See more information at https://elasticsearch-py.readthedocs.io/en/v8.14.0/api/elasticsearch.html
wait_for_completion -- Whether to wait for completion of the write before proceeding with next steps. See more information at https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-refresh.html
settings -- Settings of the Elasticsearch index, can be optionally specified
mappings -- Mapping of the Elasticsearch index, can be optionally specified
execute -- Execute the pipeline and write to Elasticsearch on adding this operator. If False, will return a DocSet with this write in the plan. Default is True.
Example
The following code shows how to read a pdf dataset into a DocSet and write it out to a local Elasticsearch index called test-index.

url = "http://localhost:9200"
index_name = "test-index"
model_name = "sentence-transformers/all-MiniLM-L6-v2"
paths = str(TEST_DIR / "resources/data/pdfs/")

OpenAI(OpenAIModels.GPT_3_5_TURBO_INSTRUCT.value)
tokenizer = HuggingFaceTokenizer(model_name)

ctx = sycamore.init()

ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
    .regex_replace(COALESCE_WHITESPACE)
    .mark_bbox_preset(tokenizer=tokenizer)
    .merge(merger=MarkedMerger())
    .spread_properties(["path"])
    .split_elements(tokenizer=tokenizer, max_tokens=512)
    .explode()
    .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
    .sketch(window=17)
)

ds.write.elasticsearch(url=url, index_name=index_name)
- files(path: str, filesystem: FileSystem | None = None, filename_fn: Callable[[Document], str] = <function default_filename>, doc_to_bytes_fn: Callable[[Document], bytes] = <function default_doc_to_bytes>, **resource_args) None [source]¶
Writes the content of each Document to a separate file.
- Parameters:
path -- The path prefix to write to. Should include the scheme if not local.
filesystem -- The pyarrow.fs FileSystem to use.
filename_fn -- A function for generating a file name. Takes a Document and returns a unique name that will be appended to path.
doc_to_bytes_fn -- A function from a Document to bytes for generating the data to write. Defaults to using text_representation if available, or binary_representation if not.
resource_args -- Arguments to pass to the underlying execution environment.
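Example
For illustration, a minimal sketch that writes each Document to a local directory, assuming a placeholder paths variable and a hypothetical name_by_doc_id helper passed as filename_fn; imports for the Sycamore transforms are omitted as in the other examples on this page.

import sycamore

def name_by_doc_id(doc):
    # Hypothetical helper: name each output file after the document's doc_id.
    return f"{doc.doc_id}.txt"

paths = "path/to/pdfs/"  # placeholder: directory of PDF files

ctx = sycamore.init()
ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
    .explode()
)
ds.write.files(path="/tmp/sycamore-out/", filename_fn=name_by_doc_id)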
- json(path: str, filesystem: FileSystem | None = None, **resource_args) None [source]¶
Writes Documents in JSONL format to files, one file per block. Typically, a block corresponds to a single pre-explode source document.
- Parameters:
path -- The path prefix to write to. Should include the scheme if not local.
filesystem -- The pyarrow.fs FileSystem to use.
resource_args -- Arguments to pass to the underlying execution environment.
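Example
As a sketch, assuming the same binary read and partition steps as the examples above and a placeholder paths variable, the following writes the exploded DocSet as JSONL files under a local path prefix.

import sycamore

paths = "path/to/pdfs/"  # placeholder: directory of PDF files

ctx = sycamore.init()
ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
    .explode()
)
ds.write.json(path="/tmp/sycamore-json/")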
- opensearch(*, os_client_args: dict, index_name: str, index_settings: dict, insert_settings: dict | None = None, execute: bool = True, reliability_rewriter: bool = False, **kwargs) DocSet | None [source]¶
Writes the content of the DocSet into the specified OpenSearch index.
- Parameters:
os_client_args -- Keyword parameters that are passed to the opensearch-py OpenSearch client constructor. See more information at https://opensearch.org/docs/latest/clients/python-low-level/
index_name -- The name of the OpenSearch index into which to load this DocSet.
index_settings -- Settings and mappings to pass when creating a new index. Specified as a Python dict corresponding to the JSON parameters taken by the OpenSearch CreateIndex API: https://opensearch.org/docs/latest/api-reference/index-apis/create-index/
insert_settings -- Settings to pass when inserting data into the index. Specified as a Python dict. Defaults to {"raise_on_error": False, "raise_on_exception": False, "chunk_size": 100, "thread_count": 3}
execute -- Execute the pipeline and write to OpenSearch on adding this operator. If False, will return a DocSet with this write in the plan. Default is True.
kwargs -- Keyword arguments to pass to the underlying execution engine
Example
The following code shows how to read a pdf dataset into a DocSet and write it out to a local OpenSearch index called my_index.

os_client_args = {
    "hosts": [{"host": "localhost", "port": 9200}],
    "http_auth": ("user", "password"),
}

index_settings = {
    "body": {
        "settings": {
            "index.knn": True,
        },
        "mappings": {
            "properties": {
                "embedding": {
                    "type": "knn_vector",
                    "dimension": 384,
                    "method": {"name": "hnsw", "engine": "faiss"},
                },
            },
        },
    },
}

context = sycamore.init()

pdf_docset = (
    context.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
)

pdf_docset.write.opensearch(
    os_client_args=os_client_args,
    index_name="my_index",
    index_settings=index_settings,
)
- pinecone(*, index_name: str, index_spec: Any | None = None, namespace: str = '', dimensions: int | None = None, distance_metric: str = 'cosine', api_key: str | None = None, execute: bool = True, log: bool = False, **kwargs) DocSet | None [source]¶
Writes the content of the DocSet into a Pinecone vector index.
- Parameters:
index_name -- Name of the pinecone index to ingest into
index_spec -- Cloud parameters needed by pinecone to create your index. See https://docs.pinecone.io/guides/indexes/create-an-index Defaults to None, which assumes the index already exists; if supplied, it will not modify an existing index.
namespace -- Namespace within the pinecone index to ingest into. See https://docs.pinecone.io/guides/indexes/use-namespaces Defaults to "", which is the default namespace.
dimensions -- Dimensionality of dense vectors in your index. Defaults to None, which assumes the index already exists; if supplied, it will not modify an existing index.
distance_metric -- Distance metric used for nearest-neighbor search in your index. Defaults to "cosine", but will not modify an already-existing index
api_key -- Pinecone service API Key. Defaults to None (will use the environment variable PINECONE_API_KEY).
kwargs -- Arguments to pass to the underlying execution engine
Example
The following shows how to read a pdf dataset into a DocSet and write it out to a Pinecone index called "mytestingindex".

model_name = "sentence-transformers/all-MiniLM-L6-v2"
tokenizer = HuggingFaceTokenizer(model_name)

ctx = sycamore.init()

ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=ArynPartitioner(extract_table_structure=True, extract_images=True))
    .explode()
    .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
    .term_frequency(tokenizer=tokenizer, with_token_ids=True)
    .sketch(window=17)
)

ds.write.pinecone(
    index_name="mytestingindex",
    index_spec=pinecone.ServerlessSpec(cloud="aws", region="us-east-1"),
    namespace="",
    dimensions=384,
    distance_metric="dotproduct",
)
- qdrant(client_params: dict, collection_params: dict, vector_name: str | None = None, execute: bool = True, **kwargs) DocSet | None [source]¶
Writes the content of the DocSet into a Qdrant collection.
- Parameters:
client_params -- Parameters that are passed to the Qdrant client constructor. See more information at https://python-client.qdrant.tech/qdrant_client.qdrant_client
collection_params -- Parameters that are passed into the qdrant_client.QdrantClient.create_collection method. See more information at https://python-client.qdrant.tech/_modules/qdrant_client/qdrant_client#QdrantClient.create_collection
vector_name -- The name of the vector in the Qdrant collection. Defaults to None.
execute -- Execute the pipeline and write to Qdrant on adding this operator. If False, will return a DocSet with this write in the plan. Defaults to True.
kwargs -- Arguments to pass to the underlying execution engine
Example
The following code shows how to read a pdf dataset into a DocSet and write it out to a Qdrant collection called "sycamore_collection".
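A minimal sketch, assuming the partitioner and embedder used in the other examples on this page, a Qdrant instance running locally at the default port, and client_params/collection_params that mirror the qdrant_client QdrantClient constructor and create_collection arguments; the 384-dimension vector size matches the all-MiniLM-L6-v2 embedder, and imports for the Sycamore transforms are omitted as elsewhere.

import sycamore
from qdrant_client import models

model_name = "sentence-transformers/all-MiniLM-L6-v2"
paths = "path/to/pdfs/"  # placeholder: directory of PDF files

ctx = sycamore.init()
ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
    .explode()
    .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
)

ds.write.qdrant(
    client_params={"url": "http://localhost:6333"},  # passed to QdrantClient(...)
    collection_params={
        "collection_name": "sycamore_collection",
        # 384 matches the all-MiniLM-L6-v2 embedding dimension
        "vectors_config": models.VectorParams(size=384, distance=models.Distance.COSINE),
    },
)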
- weaviate(*, wv_client_args: dict, collection_name: str, collection_config: dict[str, Any] | None = None, flatten_properties: bool = False, execute: bool = True, **kwargs) DocSet | None [source]¶
Writes the content of the DocSet into the specified Weaviate collection.
- Parameters:
wv_client_args -- Keyword parameters that are passed to the weaviate client constructor. See more information at https://weaviate.io/developers/weaviate/client-libraries/python#python-client-v4-explicit-connection
collection_name -- The name of the Weaviate collection into which to load this DocSet.
collection_config -- Keyword parameters that are passed to the weaviate client's collections.create() method. If not provided, Weaviate will auto-schematize the incoming records, which may lead to inconsistencies or failures. See more information at https://weaviate.io/developers/weaviate/manage-data/collections#create-a-collection-and-define-properties
flatten_properties -- Whether to flatten documents into pure key-value pairs or to allow nested structures. Default is False (allow nested structures)
execute -- Execute the pipeline and write to weaviate on adding this operator. If False, will return a DocSet with this write in the plan. Default is True
kwargs -- Arguments to pass to the underlying execution engine
Example
The following code shows how to read a pdf dataset into a DocSet and write it out to a local Weaviate collection called DemoCollection.

collection = "DemoCollection"
wv_client_args = {
    "connection_params": ConnectionParams.from_params(
        http_host="localhost",
        http_port=8080,
        http_secure=False,
        grpc_host="localhost",
        grpc_port=50051,
        grpc_secure=False,
    )
}

# Weaviate will assume empty arrays are empty arrays of text, so it
# will throw errors when you try to make an array of non-text in a
# field that some records have empty. => We specify them here.
collection_config_params = {
    "name": collection,
    "description": "A collection to demo data-prep with sycamore",
    "properties": [
        Property(
            name="properties",
            data_type=DataType.OBJECT,
            nested_properties=[
                Property(
                    name="links",
                    data_type=DataType.OBJECT_ARRAY,
                    nested_properties=[
                        Property(name="text", data_type=DataType.TEXT),
                        Property(name="url", data_type=DataType.TEXT),
                        Property(name="start_index", data_type=DataType.NUMBER),
                    ],
                ),
            ],
        ),
        Property(name="bbox", data_type=DataType.NUMBER_ARRAY),
        Property(name="shingles", data_type=DataType.INT_ARRAY),
    ],
    "vectorizer_config": [Configure.NamedVectors.text2vec_transformers(name="embedding")],
    "references": [ReferenceProperty(name="parent", target_collection=collection)],
}

model_name = "sentence-transformers/all-MiniLM-L6-v2"
davinci_llm = OpenAI(OpenAIModels.GPT_3_5_TURBO_INSTRUCT.value)
tokenizer = HuggingFaceTokenizer(model_name)

ctx = sycamore.init()

ds = (
    ctx.read.binary(paths, binary_format="pdf")
    .partition(partitioner=UnstructuredPdfPartitioner())
    .regex_replace(COALESCE_WHITESPACE)
    .extract_entity(entity_extractor=OpenAIEntityExtractor(
        "title", llm=davinci_llm, prompt_template=title_template))
    .mark_bbox_preset(tokenizer=tokenizer)
    .merge(merger=MarkedMerger())
    .spread_properties(["path", "title"])
    .split_elements(tokenizer=tokenizer, max_tokens=512)
    .explode()
    .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
    .sketch(window=17)
)

ds.write.weaviate(
    wv_client_args=wv_client_args,
    collection_name=collection,
    collection_config=collection_config_params,
)