DocSetWriter#
- class sycamore.writer.DocSetWriter(context: Context, plan: Node)[source]#
Contains interfaces for writing to external storage systems, most notably OpenSearch.
Users should not instantiate this class directly, but instead access an instance using
sycamore.docset.DocSet.write()
- duckdb(dimensions: int, db_url: str | None = None, table_name: str | None = None, batch_size: int | None = None, schema: dict[str, str] | None = None, execute: bool = True, **kwargs)[source]#
Writes the content of the DocSet into a DuckDB database.
- Parameters:
dimensions -- The dimensions of the embeddings of each vector (required parameter)
db_url -- The URL of the DuckDB database.
table_name -- The name of the table to write the data to
batch_size -- The batch size used when loading entries into the DuckDB database table
schema -- Defines the schema of the table into which entries are written
execute -- Flag that determines whether to execute immediately
Example
The following shows how to read a pdf dataset into a DocSet, write it out to a DuckDB database, and read from it.
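A minimal sketch, assuming a local DuckDB file at /tmp/sycamore_demo.db, a table named demo_table, and a 384-dimensional sentence-transformer embedding (these values, the input path, and the import locations are illustrative assumptions, not part of this reference):

.. code-block:: python

    import duckdb
    import sycamore
    from sycamore.transforms.partition import UnstructuredPdfPartitioner
    from sycamore.transforms.embed import SentenceTransformerEmbedder

    paths = "path/to/pdfs/"            # assumed input location
    model_name = "sentence-transformers/all-MiniLM-L6-v2"
    db_url = "/tmp/sycamore_demo.db"   # assumed database file
    table_name = "demo_table"          # assumed table name

    ctx = sycamore.init()
    ds = (
        ctx.read.binary(paths, binary_format="pdf")
        .partition(partitioner=UnstructuredPdfPartitioner())
        .explode()
        .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
    )

    # dimensions must match the output size of the embedding model (384 here).
    ds.write.duckdb(dimensions=384, db_url=db_url, table_name=table_name)

    # Read the rows back with the DuckDB client.
    conn = duckdb.connect(database=db_url)
    print(conn.sql(f"SELECT * FROM {table_name}").to_df())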
- elasticsearch(*, url: str, index_name: str, es_client_args: dict = {}, wait_for_completion: str = 'false', settings: dict | None = None, mappings: dict | None = None, execute: bool = True, **kwargs) DocSet | None [source]#
Writes the content of the DocSet into the specified Elasticsearch index.
- Parameters:
url -- Connection endpoint for the Elasticsearch instance. Note that this must be paired with the necessary client arguments below
index_name -- Index name to write to in the Elasticsearch instance
es_client_args -- Authentication arguments to be specified (if needed). See more information at https://elasticsearch-py.readthedocs.io/en/v8.14.0/api/elasticsearch.html
wait_for_completion -- Whether to wait for completion of the write before proceeding with next steps. See more information at https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-refresh.html
mappings -- Mapping of the Elasticsearch index, can be optionally specified
settings -- Settings of the Elasticsearch index, can be optionally specified
execute -- Execute the pipeline and write to Elasticsearch on adding this operator. If False, will return a DocSet with this write in the plan. Default is True
Example
The following code shows how to read a pdf dataset into a DocSet and write it out to a local Elasticsearch index called test-index.

.. code-block:: python

    url = "http://localhost:9200"
    index_name = "test-index"
    model_name = "sentence-transformers/all-MiniLM-L6-v2"
    paths = str(TEST_DIR / "resources/data/pdfs/")

    OpenAI(OpenAIModels.GPT_3_5_TURBO_INSTRUCT.value)
    tokenizer = HuggingFaceTokenizer(model_name)

    ctx = sycamore.init()

    ds = (
        ctx.read.binary(paths, binary_format="pdf")
        .partition(partitioner=UnstructuredPdfPartitioner())
        .regex_replace(COALESCE_WHITESPACE)
        .mark_bbox_preset(tokenizer=tokenizer)
        .merge(merger=MarkedMerger())
        .spread_properties(["path"])
        .split_elements(tokenizer=tokenizer, max_tokens=512)
        .explode()
        .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
        .sketch(window=17)
    )

    ds.write.elasticsearch(url=url, index_name=index_name)
- files(path: str, filesystem: FileSystem | None = None, filename_fn: Callable[[Document], str] = <function default_filename>, doc_to_bytes_fn: Callable[[Document], bytes] = <function default_doc_to_bytes>, **resource_args) None [source]#
Writes the content of each Document to a separate file.
- Parameters:
path -- The path prefix to write to. Should include the scheme if not local.
filesystem -- The pyarrow.fs FileSystem to use.
filename_fn -- A function for generating a file name. Takes a Document and returns a unique name that will be appended to path.
doc_to_bytes_fn -- A function from a Document to bytes for generating the data to write. Defaults to using text_representation if available, or binary_representation if not.
resource_args -- Arguments to pass to the underlying execution environment.
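Example
A minimal sketch, assuming a local output directory and the pdf-reading pipeline used elsewhere on this page (the input path, output directory, and naming function are illustrative assumptions):

.. code-block:: python

    import sycamore
    from sycamore.data import Document
    from sycamore.transforms.partition import UnstructuredPdfPartitioner

    def filename_from_doc(doc: Document) -> str:
        # Build a unique file name from the document id.
        return f"{doc.doc_id}.txt"

    paths = "path/to/pdfs/"  # assumed input location
    ctx = sycamore.init()
    ds = (
        ctx.read.binary(paths, binary_format="pdf")
        .partition(partitioner=UnstructuredPdfPartitioner())
        .explode()
    )

    # Each Document becomes its own file under the given path prefix.
    ds.write.files(path="/tmp/sycamore_out/", filename_fn=filename_from_doc)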
- json(path: str, filesystem: FileSystem | None = None, **resource_args) None [source]#
Writes Documents in JSONL format to files, one file per block. Typically, a block corresponds to a single pre-explode source document.
- Parameters:
path -- The path prefix to write to. Should include the scheme if not local.
filesystem -- The pyarrow.fs FileSystem to use.
resource_args -- Arguments to pass to the underlying execution environment.
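Example
A minimal sketch, assuming local input and output paths (both are illustrative assumptions):

.. code-block:: python

    import sycamore
    from sycamore.transforms.partition import UnstructuredPdfPartitioner

    paths = "path/to/pdfs/"  # assumed input location
    ctx = sycamore.init()
    ds = (
        ctx.read.binary(paths, binary_format="pdf")
        .partition(partitioner=UnstructuredPdfPartitioner())
    )

    # Writes one JSONL file per block under the given path prefix.
    ds.write.json("/tmp/sycamore_json/")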
- neo4j(uri: str, auth: tuple[Any, Any] | Auth | AuthManager | None, import_dir: str, database: str = 'neo4j', use_auradb: bool = False, s3_session: Session | None = None, **kwargs) DocSet | None [source]#
*EXPERIMENTAL*
Writes the content of the DocSet into the specified Neo4j database.
- Parameters:
uri -- Connection endpoint for the neo4j instance. Note that this must be paired with the necessary client arguments below
auth -- Authentication arguments to be specified. See more information at https://neo4j.com/docs/api/python-driver/current/api.html#auth-ref
import_dir -- The import directory that neo4j uses. You can specify where to mount this volume when you launch your neo4j docker container.
database -- Database to write to in Neo4j. By default in the Neo4j Community Edition, new databases cannot be created, so you must use "neo4j". If using the Enterprise Edition, ensure the database exists.
use_auradb -- Set to True if you are using Neo4j's serverless offering, AuraDB. Defaults to False.
s3_session -- An AWS S3 Session. This must be passed in if use_auradb is set to True. It is used as a public CSV proxy to securely upload your files into AuraDB. Defaults to None.
Example
The following code shows how to write to a neo4j database.
.. code-block:: python

    ds = (
        ctx.read.manifest(...)
        .partition(...)
        .extract_document_structure(...)
        .extract_graph_entities(...)
        .extract_graph_relationships(...)
        .resolve_graph_entities(...)
        .explode()
    )

    URI = "neo4j+s://<AURADB_INSTANCE_ID>.databases.neo4j.io"
    AUTH = ("neo4j", "sample_password")
    DATABASE = "neo4j"
    IMPORT_DIR = "/tmp/neo4j"
    S3_SESSION = boto3.session.Session()

    ds.write.neo4j(
        uri=URI,
        auth=AUTH,
        database=DATABASE,
        import_dir=IMPORT_DIR,
        use_auradb=True,
        s3_session=S3_SESSION,
    )
- opensearch(*, os_client_args: dict, index_name: str, index_settings: dict, execute: bool = True, **kwargs) DocSet | None [source]#
Writes the content of the DocSet into the specified OpenSearch index.
- Parameters:
os_client_args -- Keyword parameters that are passed to the opensearch-py OpenSearch client constructor. See more information at https://opensearch.org/docs/latest/clients/python-low-level/
index_name -- The name of the OpenSearch index into which to load this DocSet.
index_settings -- Settings and mappings to pass when creating a new index. Specified as a Python dict corresponding to the JSON parameters taken by the OpenSearch CreateIndex API: https://opensearch.org/docs/latest/api-reference/index-apis/create-index/
execute -- Execute the pipeline and write to OpenSearch on adding this operator. If False, will return a DocSet with this write in the plan. Default is True
kwargs -- Keyword arguments to pass to the underlying execution engine
Example
The following code shows how to read a pdf dataset into a DocSet and write it out to a local OpenSearch index called my_index.

.. code-block:: python

    os_client_args = {
        "hosts": [{"host": "localhost", "port": 9200}],
        "http_auth": ("user", "password"),
    }

    index_settings = {
        "body": {
            "settings": {
                "index.knn": True,
            },
            "mappings": {
                "properties": {
                    "embedding": {
                        "type": "knn_vector",
                        "dimension": 384,
                        "method": {"name": "hnsw", "engine": "faiss"},
                    },
                },
            },
        },
    }

    context = sycamore.init()
    pdf_docset = (
        context.read.binary(paths, binary_format="pdf")
        .partition(partitioner=UnstructuredPdfPartitioner())
    )

    pdf_docset.write.opensearch(
        os_client_args=os_client_args,
        index_name="my_index",
        index_settings=index_settings,
    )
- pinecone(*, index_name: str, index_spec: Any | None = None, namespace: str = '', dimensions: int | None = None, distance_metric: str = 'cosine', api_key: str | None = None, execute: bool = True, log: bool = False, **kwargs) DocSet | None [source]#
Writes the content of the DocSet into a Pinecone vector index.
- Parameters:
index_name -- Name of the pinecone index to ingest into
index_spec -- Cloud parameters needed by pinecone to create your index. See https://docs.pinecone.io/guides/indexes/create-an-index. Defaults to None, which assumes the index already exists; an existing index will not be modified if this is provided.
namespace -- Namespace within the pinecone index to ingest into. See https://docs.pinecone.io/guides/indexes/use-namespaces. Defaults to "", which is the default namespace.
dimensions -- Dimensionality of dense vectors in your index. Defaults to None, which assumes the index already exists; an existing index will not be modified if this is provided.
distance_metric -- Distance metric used for nearest-neighbor search in your index. Defaults to "cosine", but will not modify an already-existing index
api_key -- Pinecone service API Key. Defaults to None (will use the environment variable PINECONE_API_KEY).
kwargs -- Arguments to pass to the underlying execution engine
Example
The following shows how to read a pdf dataset into a DocSet and write it out to a pinecone index called "mytestingindex".

.. code-block:: python

    model_name = "sentence-transformers/all-MiniLM-L6-v2"
    tokenizer = HuggingFaceTokenizer(model_name)
    ctx = sycamore.init()

    ds = (
        ctx.read.binary(paths, binary_format="pdf")
        .partition(partitioner=ArynPartitioner(extract_table_structure=True, extract_images=True))
        .explode()
        .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
        .term_frequency(tokenizer=tokenizer, with_token_ids=True)
        .sketch(window=17)
    )

    ds.write.pinecone(
        index_name="mytestingindex",
        index_spec=pinecone.ServerlessSpec(cloud="aws", region="us-east-1"),
        namespace="",
        dimensions=384,
        distance_metric="dotproduct",
    )
- qdrant(client_params: dict, collection_params: dict, vector_name: str | None = None, execute: bool = True, **kwargs) DocSet | None [source]#
Writes the content of the DocSet into a Qdrant collection
- Parameters:
client_params -- Parameters that are passed to the Qdrant client constructor. See more information at https://python-client.qdrant.tech/qdrant_client.qdrant_client
collection_params -- Parameters that are passed into the qdrant_client.QdrantClient.create_collection method. See more information at https://python-client.qdrant.tech/_modules/qdrant_client/qdrant_client#QdrantClient.create_collection
vector_name -- The name of the vector in the Qdrant collection. Defaults to None.
execute -- Execute the pipeline and write to Qdrant on adding this operator. If False, will return a DocSet with this write in the plan. Defaults to True.
kwargs -- Arguments to pass to the underlying execution engine
Example
The following code shows how to read a pdf dataset into a DocSet and write it out to a Qdrant collection called "sycamore_collection".
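A minimal sketch, assuming a Qdrant instance at http://localhost:6333 and a 384-dimensional sentence-transformer embedding (the endpoint, collection parameters, input path, and model are illustrative assumptions, not values from this reference):

.. code-block:: python

    import sycamore
    from qdrant_client import models
    from sycamore.transforms.partition import UnstructuredPdfPartitioner
    from sycamore.transforms.embed import SentenceTransformerEmbedder

    paths = "path/to/pdfs/"  # assumed input location
    model_name = "sentence-transformers/all-MiniLM-L6-v2"

    ctx = sycamore.init()
    ds = (
        ctx.read.binary(paths, binary_format="pdf")
        .partition(partitioner=UnstructuredPdfPartitioner())
        .explode()
        .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
    )

    # client_params go to the QdrantClient constructor; collection_params go
    # to create_collection if the collection does not already exist.
    ds.write.qdrant(
        client_params={"url": "http://localhost:6333"},
        collection_params={
            "collection_name": "sycamore_collection",
            "vectors_config": models.VectorParams(size=384, distance=models.Distance.COSINE),
        },
    )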
- weaviate(*, wv_client_args: dict, collection_name: str, collection_config: dict[str, Any] | None = None, flatten_properties: bool = False, execute: bool = True, **kwargs) DocSet | None [source]#
Writes the content of the DocSet into the specified Weaviate collection.
- Parameters:
wv_client_args -- Keyword parameters that are passed to the weaviate client constructor. See more information at https://weaviate.io/developers/weaviate/client-libraries/python#python-client-v4-explicit-connection
collection_name -- The name of the Weaviate collection into which to load this DocSet.
collection_config -- Keyword parameters that are passed to the weaviate client's collections.create() method. If not provided, Weaviate will auto-schematize the incoming records, which may lead to inconsistencies or failures. See more information at https://weaviate.io/developers/weaviate/manage-data/collections#create-a-collection-and-define-properties
flatten_properties -- Whether to flatten documents into pure key-value pairs or to allow nested structures. Default is False (allow nested structures)
execute -- Execute the pipeline and write to weaviate on adding this operator. If False, will return a DocSet with this write in the plan. Default is True
kwargs -- Arguments to pass to the underlying execution engine
Example
The following code shows how to read a pdf dataset into a DocSet and write it out to a local Weaviate collection called DemoCollection.

.. code-block:: python

    collection = "DemoCollection"
    wv_client_args = {
        "connection_params": ConnectionParams.from_params(
            http_host="localhost",
            http_port=8080,
            http_secure=False,
            grpc_host="localhost",
            grpc_port=50051,
            grpc_secure=False,
        )
    }

    # Weaviate will assume empty arrays are empty arrays of text, so it
    # will throw errors when you try to make an array of non-text in a
    # field that some records have empty. => We specify them here.
    collection_config_params = {
        "name": collection,
        "description": "A collection to demo data-prep with sycamore",
        "properties": [
            Property(
                name="properties",
                data_type=DataType.OBJECT,
                nested_properties=[
                    Property(
                        name="links",
                        data_type=DataType.OBJECT_ARRAY,
                        nested_properties=[
                            Property(name="text", data_type=DataType.TEXT),
                            Property(name="url", data_type=DataType.TEXT),
                            Property(name="start_index", data_type=DataType.NUMBER),
                        ],
                    ),
                ],
            ),
            Property(name="bbox", data_type=DataType.NUMBER_ARRAY),
            Property(name="shingles", data_type=DataType.INT_ARRAY),
        ],
        "vectorizer_config": [Configure.NamedVectors.text2vec_transformers(name="embedding")],
        "references": [ReferenceProperty(name="parent", target_collection=collection)],
    }

    model_name = "sentence-transformers/all-MiniLM-L6-v2"
    davinci_llm = OpenAI(OpenAIModels.GPT_3_5_TURBO_INSTRUCT.value)
    tokenizer = HuggingFaceTokenizer(model_name)

    ctx = sycamore.init()

    ds = (
        ctx.read.binary(paths, binary_format="pdf")
        .partition(partitioner=UnstructuredPdfPartitioner())
        .regex_replace(COALESCE_WHITESPACE)
        .extract_entity(entity_extractor=OpenAIEntityExtractor(
            "title", llm=davinci_llm, prompt_template=title_template))
        .mark_bbox_preset(tokenizer=tokenizer)
        .merge(merger=MarkedMerger())
        .spread_properties(["path", "title"])
        .split_elements(tokenizer=tokenizer, max_tokens=512)
        .explode()
        .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
        .sketch(window=17)
    )

    ds.write.weaviate(
        wv_client_args=wv_client_args,
        collection_name=collection,
        collection_config=collection_config_params,
    )