import logging
from typing import Any, Callable, Optional, Union, TYPE_CHECKING
from pyarrow.fs import FileSystem
from sycamore.connectors.aryn.client import ArynClient
from sycamore.context import Context, ExecMode, context_params
from sycamore.connectors.common import HostAndPort
from sycamore.connectors.file.file_writer import default_doc_to_bytes, default_filename, FileWriter, JsonWriter
from sycamore.data import Document
from sycamore.decorators import experimental
from sycamore.executor import Execution
from sycamore.plan_nodes import Node
from sycamore.docset import DocSet
from sycamore.utils.aryn_config import ArynConfig
from sycamore.utils.import_utils import requires_modules
from mypy_boto3_s3.client import S3Client
from mypy_boto3_s3.service_resource import S3ServiceResource
from boto3.session import Session
if TYPE_CHECKING:
from neo4j import Auth
from neo4j.auth_management import AuthManager
from sycamore.schema import SchemaV2
from sycamore.connectors.base_writer import BaseDBWriter
logger = logging.getLogger(__name__)
[docs]
class DocSetWriter:
    """
    Entry point for writing a DocSet to external storage systems, most notably OpenSearch.

    Do not construct this class yourself; obtain an instance through
    :meth:`sycamore.docset.DocSet.write`.
    """

    def __init__(self, context: Context, plan: Node):
        # The plan node whose output gets written, and the context used to
        # build (and optionally execute) the resulting DocSet.
        self.plan = plan
        self.context = context
[docs]
@context_params
def opensearch(
    self,
    *,
    os_client_args: dict,
    index_name: str,
    index_settings: dict,
    insert_settings: Optional[dict] = None,
    execute: bool = True,
    reliability_rewriter: bool = False,
    **kwargs,
) -> Optional["DocSet"]:
    """Writes the content of the DocSet into the specified OpenSearch index.

    Args:
        os_client_args: Keyword parameters that are passed to the opensearch-py OpenSearch client constructor.
            See more information at https://opensearch.org/docs/latest/clients/python-low-level/
            The caller's dict is deep-copied and never modified. If it contains "hosts", that value
            must be a list of dicts of the form ``{"host": <str>, "port": <int>}``.
        index_name: The name of the OpenSearch index into which to load this DocSet.
        index_settings: Settings and mappings to pass when creating a new index. Specified as a Python dict
            corresponding to the JSON parameters taken by the OpenSearch CreateIndex API:
            https://opensearch.org/docs/latest/api-reference/index-apis/create-index/
        insert_settings: Settings to pass when inserting data into the index. Specified as a Python dict. Defaults
            to {"raise_on_error": False, "raise_on_exception": False, "chunk_size": 100, "thread_count": 3}
        execute: Execute the pipeline and write to opensearch on adding this operator. If false,
            will return a new docset with the write in the plan
        reliability_rewriter: If True, an existing index named ``index_name`` is DELETED before the
            write, and after execution the client's ``reliability_assertor`` is run against the
            writer's target params. Note that the deletion happens when this method is called,
            regardless of ``execute``.
        kwargs: Keyword arguments to pass to the underlying execution engine

    Returns:
        None if ``execute`` is True, otherwise a DocSet with the write in its plan.

    Raises:
        ValueError: If ``os_client_args["hosts"]`` is present but is not a list of
            ``{"host": <str>, "port": <int>}`` dicts.

    Example:
        The following code shows how to read a pdf dataset into a ``DocSet`` and write it out to a
        local OpenSearch index called `my_index`.

        .. code-block:: python

            os_client_args = {
                "hosts": [{"host": "localhost", "port": 9200}],
                "http_auth": ("user", "password"),
            }

            index_settings = {
                "body": {
                    "settings": {
                        "index.knn": True,
                    },
                    "mappings": {
                        "properties": {
                            "embedding": {
                                "type": "knn_vector",
                                "dimension": 384,
                                "method": {"name": "hnsw", "engine": "faiss"},
                            },
                        },
                    },
                },
            }

            context = sycamore.init()
            pdf_docset = (
                context.read.binary(paths, binary_format="pdf")
                .partition(partitioner=ArynPartitioner())
            )

            pdf_docset.write.opensearch(
                os_client_args=os_client_args,
                index_name="my_index",
                index_settings=index_settings)
    """
    from sycamore.connectors.opensearch import (
        OpenSearchWriterClientParams,
        OpenSearchWriterTargetParams,
        OpenSearchWriter,
    )
    import copy

    # The "hosts" entry is rewritten below, so operate on a private copy
    # rather than mutating the caller's dict.
    os_client_args = copy.deepcopy(os_client_args)

    # Validates the loosely-typed "hosts" value and narrows it to HostAndPort objects.
    def _convert_to_host_port_list(hostlist: Any) -> list[HostAndPort]:
        if not isinstance(hostlist, list):
            raise ValueError('OpenSearch client args "hosts" param must be a list of hosts')
        for h in hostlist:
            if (
                not isinstance(h, dict)
                or "host" not in h
                or not isinstance(h["host"], str)
                or "port" not in h
                or not isinstance(h["port"], int)
            ):
                raise ValueError(
                    'OpenSearch client args "hosts" objects must consist of dicts of '
                    "the form {'host': '<address>', 'port': <port num>}\n"
                    f"Found: {h}"
                )
        return [HostAndPort(host=h["host"], port=h["port"]) for h in hostlist]

    hosts = os_client_args.get("hosts", None)
    if hosts is not None:
        os_client_args["hosts"] = _convert_to_host_port_list(hosts)
    client_params = OpenSearchWriterClientParams(**os_client_args)

    target_params = OpenSearchWriterTargetParams.from_write_args(
        index_name=index_name,
        plan=self.plan,
        context=self.context,
        reliability_rewriter=reliability_rewriter,
        execute=execute,
        insert_settings=insert_settings,
        index_settings=index_settings,
    )
    # Named `writer` rather than `os` to avoid shadowing the stdlib module name.
    writer = OpenSearchWriter(
        self.plan, client_params=client_params, target_params=target_params, name="OsrchWrite", **kwargs
    )

    client = None
    if reliability_rewriter:
        client = writer.Client.from_client_params(client_params)
        if client._client.indices.exists(index=index_name):
            # Destructive operation: log at WARNING so it is not hidden by the
            # default logging threshold.
            logger.warning(f"\n\nWARNING WARNING WARNING: Deleting existing index {index_name}\n\n")
            client._client.indices.delete(index=index_name)

    # We will probably want to break this at some point so that write
    # doesn't execute automatically, and instead you need to say something
    # like docset.write.opensearch().execute(), allowing sensible writes
    # to multiple locations and post-write operations.
    return self._maybe_execute(writer, execute, client)
[docs]
@requires_modules(["weaviate", "weaviate.collections.classes.config"], extra="weaviate")
def weaviate(
    self,
    *,
    wv_client_args: dict,
    collection_name: str,
    collection_config: Optional[dict[str, Any]] = None,
    flatten_properties: bool = False,
    execute: bool = True,
    **kwargs,
) -> Optional["DocSet"]:
    """Writes the content of the DocSet into the specified Weaviate collection.

    Two writer nodes are chained: one writes the documents, and a second one,
    fed by the first, writes the cross references.

    Args:
        wv_client_args: Keyword parameters that are passed to the weaviate client constructor.
            See more information at https://weaviate.io/developers/weaviate/client-libraries/python#python-client-v4-explicit-connection
        collection_name: The name of the Weaviate collection into which to load this DocSet.
        collection_config: Keyword parameters that are passed to the weaviate client's `collections.create()`
            method. If not provided, Weaviate will Auto-Schematize the incoming records, which may lead to
            inconsistencies or failures. If it contains a "name" key, that value must equal
            ``collection_name``. See more information at
            https://weaviate.io/developers/weaviate/manage-data/collections#create-a-collection-and-define-properties
        flatten_properties: Whether to flatten documents into pure key-value pairs or to allow nested
            structures. Default is False (allow nested structures)
        execute: Execute the pipeline and write to weaviate on adding this operator. If False,
            will return a DocSet with this write in the plan. Default is True
        kwargs: Arguments to pass to the underlying execution engine

    Returns:
        None if ``execute`` is True, otherwise a DocSet with the write in its plan.

    Example:
        The following code shows how to read a pdf dataset into a ``DocSet`` and write it out to a
        local Weaviate collection called `DemoCollection`.

        .. code-block:: python

            collection = "DemoCollection"
            wv_client_args = {
                "connection_params": ConnectionParams.from_params(
                    http_host="localhost",
                    http_port=8080,
                    http_secure=False,
                    grpc_host="localhost",
                    grpc_port=50051,
                    grpc_secure=False,
                )
            }

            # Weaviate will assume empty arrays are empty arrays of text, so it
            # will throw errors when you try to make an array of non-text in a
            # field that some records have empty. => We specify them here.
            collection_config_params = {
                "name": collection,
                "description": "A collection to demo data-prep with sycamore",
                "properties": [
                    Property(
                        name="properties",
                        data_type=DataType.OBJECT,
                        nested_properties=[
                            Property(
                                name="links",
                                data_type=DataType.OBJECT_ARRAY,
                                nested_properties=[
                                    Property(name="text", data_type=DataType.TEXT),
                                    Property(name="url", data_type=DataType.TEXT),
                                    Property(name="start_index", data_type=DataType.NUMBER),
                                ],
                            ),
                        ],
                    ),
                    Property(name="bbox", data_type=DataType.NUMBER_ARRAY),
                    Property(name="shingles", data_type=DataType.INT_ARRAY),
                ],
                "vectorizer_config": [Configure.NamedVectors.text2vec_transformers(name="embedding")],
                "references": [ReferenceProperty(name="parent", target_collection=collection)],
            }

            model_name = "sentence-transformers/all-MiniLM-L6-v2"
            davinci_llm = OpenAI(OpenAIModels.GPT_3_5_TURBO_INSTRUCT.value)
            tokenizer = HuggingFaceTokenizer(model_name)

            ctx = sycamore.init()

            ds = (
                ctx.read.binary(paths, binary_format="pdf")
                .partition(partitioner=ArynPartitioner())
                .regex_replace(COALESCE_WHITESPACE)
                .extract_entity(entity_extractor=OpenAIEntityExtractor(
                    "title", llm=davinci_llm, prompt_template=title_template))
                .mark_bbox_preset(tokenizer=tokenizer)
                .merge(merger=MarkedMerger())
                .spread_properties(["path", "title"])
                .split_elements(tokenizer=tokenizer, max_tokens=512)
                .explode()
                .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
                .sketch(window=17)
            )

            ds.write.weaviate(
                wv_client_args=wv_client_args,
                collection_name=collection,
                collection_config=collection_config_params
            )
    """
    from sycamore.connectors.weaviate import (
        WeaviateDocumentWriter,
        WeaviateCrossReferenceWriter,
        WeaviateClientParams,
        WeaviateWriterTargetParams,
    )

    # Importing _ prefixed stuff is fairly common for users of the weaviate codebase
    from weaviate.collections.classes.config import _CollectionConfigCreate

    if collection_config is None:
        collection_config = {}

    client_params = WeaviateClientParams(**wv_client_args)

    config_object: _CollectionConfigCreate
    if "name" not in collection_config:
        # No explicit name in the config: inject the collection name.
        config_object = _CollectionConfigCreate(name=collection_name, **collection_config)
    else:
        # An explicit name must agree with the collection being written to.
        assert collection_config["name"] == collection_name
        config_object = _CollectionConfigCreate(**collection_config)

    target_params = WeaviateWriterTargetParams(
        name=collection_name, collection_config=config_object, flatten_properties=flatten_properties
    )

    # The reference writer consumes the output of the document writer.
    document_writer = WeaviateDocumentWriter(
        self.plan, client_params, target_params, name="weaviate_write_documents", **kwargs
    )
    reference_writer = WeaviateCrossReferenceWriter(
        document_writer, client_params, target_params, name="weaviate_write_references", **kwargs
    )
    return self._maybe_execute(reference_writer, execute)
[docs]
@requires_modules("pinecone", extra="pinecone")
def pinecone(
    self,
    *,
    index_name: str,
    index_spec: Optional[Any] = None,
    namespace: str = "",
    dimensions: Optional[int] = None,
    distance_metric: str = "cosine",
    api_key: Optional[str] = None,
    execute: bool = True,
    log: bool = False,
    **kwargs,
) -> Optional["DocSet"]:
    """Writes the content of the DocSet into a Pinecone vector index.

    Args:
        index_name: Name of the pinecone index to ingest into
        index_spec: Cloud parameters needed by pinecone to create your index. See
            https://docs.pinecone.io/guides/indexes/create-an-index
            Defaults to None, which assumes the index already exists, and
            will not modify an existing index if provided
        namespace: Namespace within the pinecone index to ingest into. See
            https://docs.pinecone.io/guides/indexes/use-namespaces
            Defaults to "", which is the default namespace
        dimensions: Dimensionality of dense vectors in your index.
            Defaults to None, which assumes the index already exists, and
            will not modify an existing index if provided.
        distance_metric: Distance metric used for nearest-neighbor search in your index.
            Defaults to "cosine", but will not modify an already-existing index
        api_key: Pinecone service API Key. Defaults to None (will use the environment
            variable PINECONE_API_KEY).
        execute: Execute the pipeline and write to pinecone on adding this operator. If False,
            will return a DocSet with this write in the plan. Default is True
        log: If True, set this module's logger to INFO level. Default is False
        kwargs: Arguments to pass to the underlying execution engine

    Returns:
        None if ``execute`` is True, otherwise a DocSet with the write in its plan.

    Example:
        The following shows how to read a pdf dataset into a ``DocSet`` and write it out
        to a pinecone index called "mytestingindex"

        .. code-block:: python

            model_name = "sentence-transformers/all-MiniLM-L6-v2"
            tokenizer = HuggingFaceTokenizer(model_name)
            ctx = sycamore.init()
            ds = (
                ctx.read.binary(paths, binary_format="pdf")
                .partition(partitioner=ArynPartitioner(extract_table_structure=True, extract_images=True))
                .explode()
                .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
                .term_frequency(tokenizer=tokenizer, with_token_ids=True)
                .sketch(window=17)
            )

            ds.write.pinecone(
                index_name="mytestingindex",
                index_spec=pinecone.ServerlessSpec(cloud="aws", region="us-east-1"),
                namespace="",
                dimensions=384,
                distance_metric="dotproduct",
            )
    """
    from sycamore.connectors.pinecone import (
        PineconeWriter,
        PineconeWriterClientParams,
        PineconeWriterTargetParams,
    )
    import os

    if log:
        logger.setLevel(logging.INFO)

    # Fall back to the environment when no key was passed explicitly.
    if api_key is None:
        api_key = os.environ.get("PINECONE_API_KEY", "")
    assert (
        api_key is not None and len(api_key) != 0
    ), "Missing api key: either provide it as an argument or set the PINECONE_API_KEY env variable."

    client_params = PineconeWriterClientParams(api_key=api_key)
    target_params = PineconeWriterTargetParams(
        index_name=index_name,
        namespace=namespace,
        index_spec=index_spec,
        dimensions=dimensions,
        distance_metric=distance_metric,
    )
    writer = PineconeWriter(
        self.plan, client_params=client_params, target_params=target_params, name="pinecone_write", **kwargs
    )
    return self._maybe_execute(writer, execute)
[docs]
@requires_modules("duckdb", extra="duckdb")
def duckdb(
    self,
    dimensions: int,
    db_url: str = "tmp.db",
    table_name: Optional[str] = None,
    batch_size: Optional[int] = None,
    schema: Optional[dict[str, str]] = None,
    execute: bool = True,
    **kwargs,
) -> Optional["DocSet"]:
    """
    Writes the content of the DocSet into a DuckDB database.

    Args:
        dimensions: The dimensions of the embeddings of each vector (required parameter)
        db_url: The URL of the DuckDB database.
        table_name: The table name to write the data to when possible
        batch_size: The file batch size when loading entries into the DuckDB database table
        schema: Defines the schema of the table to enter entries
        execute: Flag that determines whether to execute immediately
        kwargs: Arguments to pass to the underlying execution engine. Note that
            ``parallelism`` is always overridden to 1 for this writer.

    Optional arguments left as None are not forwarded, so the defaults of
    ``DuckDBWriterTargetParams`` apply to them.

    Returns:
        None if ``execute`` is True, otherwise a DocSet with the write in its plan.

    Example:
        The following shows how to read a pdf dataset into a ``DocSet`` and write it out
        to a DuckDB database.

        .. code-block:: python

            table_name = "duckdb_table"
            db_url = "tmp.db"
            model_name = "sentence-transformers/all-MiniLM-L6-v2"
            paths = str(TEST_DIR / "resources/data/pdfs/")

            tokenizer = HuggingFaceTokenizer(model_name)

            ctx = sycamore.init()

            ds = (
                ctx.read.binary(paths, binary_format="pdf")
                .partition(partitioner=ArynPartitioner())
                .regex_replace(COALESCE_WHITESPACE)
                .mark_bbox_preset(tokenizer=tokenizer)
                .merge(merger=MarkedMerger())
                .spread_properties(["path"])
                .split_elements(tokenizer=tokenizer, max_tokens=512)
                .explode()
                .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
            )
            # all-MiniLM-L6-v2 produces 384-dimensional embeddings
            ds.write.duckdb(dimensions=384, table_name=table_name, db_url=db_url)
    """
    from sycamore.connectors.duckdb.duckdb_writer import (
        DuckDBWriter,
        DuckDBWriterClientParams,
        DuckDBWriterTargetParams,
    )

    client_params = DuckDBWriterClientParams(db_url=db_url)

    # Only forward arguments the caller actually supplied, so that the
    # target-params defaults are used for anything left as None.
    supplied_args = {
        name: value
        for name, value in {
            "table_name": table_name,
            "batch_size": batch_size,
            "schema": schema,
            "dimensions": dimensions,
        }.items()
        if value is not None
    }
    target_params = DuckDBWriterTargetParams(**supplied_args)  # type: ignore

    # This writer always runs with a single worker, overriding any caller-supplied value.
    kwargs["parallelism"] = 1
    ddb = DuckDBWriter(
        self.plan,
        client_params=client_params,
        target_params=target_params,
        name="duckdb_write_documents",
        **kwargs,
    )
    return self._maybe_execute(ddb, execute)
[docs]
@requires_modules("elasticsearch", extra="elasticsearch")
def elasticsearch(
    self,
    *,
    url: str,
    index_name: str,
    es_client_args: Optional[dict] = None,
    wait_for_completion: str = "false",
    settings: Optional[dict] = None,
    mappings: Optional[dict] = None,
    execute: bool = True,
    **kwargs,
) -> Optional["DocSet"]:
    """Writes the content of the DocSet into the specified Elasticsearch index.

    Args:
        url: Connection endpoint for the Elasticsearch instance. Note that this must be paired with the
            necessary client arguments below
        index_name: Index name to write to in the Elasticsearch instance
        es_client_args: Authentication arguments to be specified (if needed). Defaults to None, which
            is treated as an empty dict. See more information at
            https://elasticsearch-py.readthedocs.io/en/v8.14.0/api/elasticsearch.html
        wait_for_completion: Whether to wait for completion of the write before proceeding with next steps.
            See more information at https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-refresh.html
        mappings: Mapping of the Elasticsearch index, can be optionally specified
        settings: Settings of the Elasticsearch index, can be optionally specified
        execute: Execute the pipeline and write to Elasticsearch on adding this operator. If False,
            will return a DocSet with this write in the plan. Default is True
        kwargs: Arguments to pass to the underlying execution engine

    Returns:
        None if ``execute`` is True, otherwise a DocSet with the write in its plan.

    Example:
        The following code shows how to read a pdf dataset into a ``DocSet`` and write it out to a
        local Elasticsearch index called `test-index`.

        .. code-block:: python

            url = "http://localhost:9200"
            index_name = "test-index"
            model_name = "sentence-transformers/all-MiniLM-L6-v2"
            paths = str(TEST_DIR / "resources/data/pdfs/")

            tokenizer = HuggingFaceTokenizer(model_name)

            ctx = sycamore.init()

            ds = (
                ctx.read.binary(paths, binary_format="pdf")
                .partition(partitioner=ArynPartitioner())
                .regex_replace(COALESCE_WHITESPACE)
                .mark_bbox_preset(tokenizer=tokenizer)
                .merge(merger=MarkedMerger())
                .spread_properties(["path"])
                .split_elements(tokenizer=tokenizer, max_tokens=512)
                .explode()
                .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
                .sketch(window=17)
            )
            ds.write.elasticsearch(url=url, index_name=index_name)
    """
    from sycamore.connectors.elasticsearch import (
        ElasticsearchDocumentWriter,
        ElasticsearchWriterClientParams,
        ElasticsearchWriterTargetParams,
    )

    # A fresh dict per call: a `{}` default in the signature would be a single
    # object shared (and potentially mutated) across every invocation.
    if es_client_args is None:
        es_client_args = {}

    client_params = ElasticsearchWriterClientParams(url=url, es_client_args=es_client_args)

    # Only forward mappings/settings when supplied so the target-params defaults apply otherwise.
    target_params = ElasticsearchWriterTargetParams(
        index_name=index_name,
        wait_for_completion=wait_for_completion,
        **{
            k: v
            for k, v in {
                "mappings": mappings,
                "settings": settings,
            }.items()
            if v is not None
        },  # type: ignore
    )
    es_docs = ElasticsearchDocumentWriter(
        self.plan, client_params, target_params, name="elastic_document_writer", **kwargs
    )
    return self._maybe_execute(es_docs, execute)
@experimental
@requires_modules("neo4j", extra="neo4j")
def neo4j(
    self,
    uri: str,
    auth: Union[tuple[Any, Any], "Auth", "AuthManager", None],
    import_dir: str,
    database: str = "neo4j",
    use_auradb: bool = False,
    s3_session: Optional[Session] = None,
    **kwargs,
) -> Optional["DocSet"]:
    """
    ***EXPERIMENTAL***
    Writes the content of the DocSet into the specified Neo4j database.

    The write always executes immediately: the plan is run up to this point and
    materialized, CSV files are written to ``import_dir``, and the CSVs are then
    loaded into Neo4j. When ``use_auradb`` is set, the CSVs are first uploaded to
    a temporary S3 bucket, which is deleted afterwards even if the load fails.

    Args:
        uri: Connection endpoint for the neo4j instance. Note that this must be paired with the
            necessary client arguments below
        auth: Authentication arguments to be specified. See more information at
            https://neo4j.com/docs/api/python-driver/current/api.html#auth-ref
        import_dir: The import directory that neo4j uses. You can specify where to mount this volume when you launch
            your neo4j docker container. A leading ``~`` is expanded.
        database: Database to write to in Neo4j. By default in the neo4j community edition, new databases
            cannot be instantiated so you must use "neo4j". If using enterprise edition, ensure the database exists.
        use_auradb: Set to true if you are using neo4j's serverless implementation called AuraDB. Defaults to false.
        s3_session: An AWS S3 Session. This must be passed in if use_auradb is set to true. This is used as a public
            csv proxy to securely upload your files into AuraDB. Defaults to None.
        kwargs: Only ``execute`` is inspected; passing ``execute=False`` is not supported.

    Returns:
        Always None.

    Raises:
        NotImplementedError: If ``execute=False`` is passed, or the context is not in Ray execution mode.
        ValueError: If ``use_auradb`` is True but no ``s3_session`` was provided.

    Example:
        The following code shows how to write to a neo4j database.

        .. code-block:: python

            ds = (
                ctx.read.manifest(...)
                .partition(...)
                .extract_document_structure(...)
                .extract_graph_entities(...)
                .extract_graph_relationships(...)
                .resolve_graph_entities(...)
                .explode()
            )

            URI = "neo4j+s://<AURADB_INSTANCE_ID>.databases.neo4j.io"
            AUTH = ("neo4j", "sample_password")
            DATABASE = "neo4j"
            IMPORT_DIR = "/tmp/neo4j"
            S3_SESSION = boto3.session.Session()

            ds.write.neo4j(
                uri=URI,
                auth=AUTH,
                database=DATABASE,
                import_dir=IMPORT_DIR,
                use_auradb=True,
                s3_session=S3_SESSION
            )
    """
    import os
    from sycamore.connectors.neo4j import (
        Neo4jWriterClientParams,
        Neo4jWriterTargetParams,
        Neo4jValidateParams,
    )
    from sycamore.connectors.neo4j import Neo4jPrepareCSV, Neo4jWriteCSV, Neo4jLoadCSV
    from sycamore.connectors.neo4j.neo4j_writer import (
        create_temp_bucket,
        delete_temp_bucket,
        load_to_s3_bucket,
        get_neo4j_import_info,
    )
    import time

    if kwargs.get("execute") is False:
        raise NotImplementedError("The neo4j writer does not support execute=False")
    if self.context.exec_mode != ExecMode.RAY:
        raise NotImplementedError("The neo4j writer is only supported in ExecMode.RAY")
    if use_auradb and not s3_session:
        raise ValueError("If using AuraDB, you must also pass in a s3_session object to temporarily host files.")

    # Minimal plan node that hands back an already-materialized dataset.
    class Wrapper(Node):
        def __init__(self, dataset):
            self._ds = dataset

        def execute(self, **kwargs):
            return self._ds

    import_dir = os.path.expanduser(import_dir)
    client_params = Neo4jWriterClientParams(uri=uri, auth=auth, import_dir=import_dir)
    target_params = Neo4jWriterTargetParams(database=database)
    Neo4jValidateParams(client_params=client_params, target_params=target_params)

    # Execute the docset up until this point and store it in a node
    pre_n4j_plan = Execution(self.context)._apply_rules(self.plan)
    pnjds = Execution(self.context)._execute_ray(pre_n4j_plan)
    pnjds = pnjds.materialize()
    self.plan = Wrapper(pnjds)
    pre_n4j_plan.traverse(visit=lambda n: n.finalize())

    start = time.time()
    # NOTE(review): the result is discarded, so this object is presumably constructed
    # for its side effects — confirm against Neo4jPrepareCSV.
    Neo4jPrepareCSV(plan=self.plan, client_params=client_params)
    Neo4jWriteCSV(plan=self.plan, client_params=client_params).execute().materialize()
    end = time.time()
    logger.info(f"TIME TAKEN TO WRITE CSV: {end-start} SECONDS")

    nodes, relationships, labels = get_neo4j_import_info(import_dir=import_dir)

    s3_client: S3Client
    s3_resource: S3ServiceResource
    # Stays None unless a temporary bucket was actually created.
    s3_bucket: Optional[str] = None
    try:
        # If using auradb, load the files to s3
        if use_auradb:
            assert s3_session is not None
            s3_client = s3_session.client("s3")
            s3_resource = s3_session.resource("s3")
            s3_bucket = create_temp_bucket(s3_client=s3_client)
            nodes, relationships = load_to_s3_bucket(
                s3_client=s3_client, bucket_name=s3_bucket, import_dir=import_dir
            )

        import_paths = {"nodes": nodes, "relationships": relationships, "labels": labels}
        Neo4jLoadCSV(client_params=client_params, target_params=target_params, import_paths=import_paths)
    finally:
        # Clean up the temporary bucket even if the upload or the load raised,
        # so a failed write does not leave the staged files behind.
        if s3_bucket is not None:
            delete_temp_bucket(s3_client, s3_resource, s3_bucket)
    return None
[docs]
@requires_modules("qdrant_client", extra="qdrant")
def qdrant(
    self,
    client_params: dict,
    collection_params: dict,
    vector_name: Optional[str] = None,
    execute: bool = True,
    **kwargs,
) -> Optional["DocSet"]:
    """Writes the content of the DocSet into a Qdrant collection

    Args:
        client_params: Parameters that are passed to the Qdrant client constructor.
            See more information at
            https://python-client.qdrant.tech/qdrant_client.qdrant_client
        collection_params: Parameters that are passed into the qdrant_client.QdrantClient.create_collection method.
            See more information at
            https://python-client.qdrant.tech/_modules/qdrant_client/qdrant_client#QdrantClient.create_collection
        vector_name: The name of the vector in the Qdrant collection. Defaults to None.
        execute: Execute the pipeline and write to Qdrant on adding this operator. If False,
            will return a DocSet with this write in the plan. Defaults to True.
        kwargs: Arguments to pass to the underlying execution engine

    Returns:
        None if ``execute`` is True, otherwise a DocSet with the write in its plan.

    Example:
        The following code shows how to read a pdf dataset into a ``DocSet`` and write it out to a
        Qdrant collection called `"sycamore_collection"`.

        .. code-block:: python

            model_name = "sentence-transformers/all-MiniLM-L6-v2"
            davinci_llm = OpenAI(OpenAIModels.GPT_3_5_TURBO_INSTRUCT.value, api_key=os.environ["OPENAI_API_KEY"])
            tokenizer = HuggingFaceTokenizer(model_name)

            ctx = sycamore.init()

            ds = (
                ctx.read.binary(paths, binary_format="pdf")
                .partition(partitioner=SycamorePartitioner(extract_table_structure=True, extract_images=True))
                .regex_replace(COALESCE_WHITESPACE)
                .extract_entity(
                    entity_extractor=OpenAIEntityExtractor(
                        "title", llm=davinci_llm, prompt_template=title_template
                    )
                )
                .mark_bbox_preset(tokenizer=tokenizer)
                .merge(merger=MarkedMerger())
                .spread_properties(["path", "title"])
                .split_elements(tokenizer=tokenizer, max_tokens=512)
                .explode()
                .embed(embedder=SentenceTransformerEmbedder(model_name=model_name, batch_size=100))
                .term_frequency(tokenizer=tokenizer, with_token_ids=True)
                .sketch(window=17)
            )

            ds.write.qdrant(
                {
                    "location": "http://localhost:6333",
                },
                {
                    "collection_name": "sycamore_collection",
                    "vectors_config": {
                        "size": 384,
                        "distance": "Cosine",
                    },
                },
            )
    """
    from sycamore.connectors.qdrant import (
        QdrantWriter,
        QdrantWriterClientParams,
        QdrantWriterTargetParams,
    )

    qw = QdrantWriter(
        self.plan,
        client_params=QdrantWriterClientParams(**client_params),
        target_params=QdrantWriterTargetParams(collection_params=collection_params, vector_name=vector_name),
        name="qdrant_write",
        **kwargs,
    )
    # Use the shared helper like the other writers instead of re-implementing
    # the build-DocSet / execute-or-return logic inline.
    return self._maybe_execute(qw, execute)
[docs]
def files(
    self,
    path: str,
    filesystem: Optional[FileSystem] = None,
    filename_fn: Callable[[Document], str] = default_filename,
    doc_to_bytes_fn: Callable[[Document], bytes] = default_doc_to_bytes,
    **resource_args,
) -> None:
    """Writes each Document out as its own file and executes the pipeline immediately.

    Args:
        path: The path prefix to write to. Should include the scheme if not local.
        filesystem: The pyarrow.fs FileSystem to use.
        filename_fn: A function for generating a file name. Takes a Document
            and returns a unique name that will be appended to path.
        doc_to_bytes_fn: A function from a Document to bytes for generating the data to write.
            Defaults to using text_representation if available, or binary_representation
            if not.
        resource_args: Arguments to pass to the underlying execution environment.
    """
    writer_node: Node = FileWriter(
        self.plan,
        path,
        filesystem=filesystem,
        filename_fn=filename_fn,
        doc_to_bytes_fn=doc_to_bytes_fn,
        **resource_args,
    )
    # This writer always runs eagerly; the helper's return value (None) is discarded.
    self._maybe_execute(writer_node, execute=True)
[docs]
def json(
    self,
    path: str,
    filesystem: Optional[FileSystem] = None,
    include_metadata: bool = False,
    **resource_args,
) -> None:
    """
    Writes Documents to files in JSONL format and executes the pipeline immediately.

    One file is produced per block. Typically, a block corresponds to a
    single pre-explode source document.

    Args:
        path: The path prefix to write to. Should include the scheme if not local.
        filesystem: The pyarrow.fs FileSystem to use.
        include_metadata: Forwarded to JsonWriter. Presumably controls whether
            metadata records are written alongside documents; confirm against
            JsonWriter. Defaults to False.
        resource_args: Arguments to pass to the underlying execution environment.
    """
    writer_node: Node = JsonWriter(
        self.plan,
        path,
        filesystem=filesystem,
        include_metadata=include_metadata,
        **resource_args,
    )
    # Always executed eagerly; nothing is returned to the caller.
    self._maybe_execute(writer_node, execute=True)
@requires_modules("pyiceberg", extra="iceberg")
def iceberg(
    self,
    catalog_kwargs: dict[str, Any],
    schema: "SchemaV2",
    table_identifier: str,
    property_root: str = "entity",
    location: Optional[str] = None,
) -> None:
    """
    Writes the DocSet through an IcebergWriter node and executes the pipeline immediately.

    All arguments are forwarded unchanged to ``IcebergWriter``; see that class
    for their exact semantics.

    Args:
        catalog_kwargs: Keyword arguments describing the Iceberg catalog
            (presumably used to load the pyiceberg catalog; confirm against IcebergWriter).
        schema: The SchemaV2 passed to the writer.
        table_identifier: Identifier of the target table.
        property_root: Passed to the writer as ``property_root``. Defaults to "entity".
        location: Optional location passed to the writer. Defaults to None.
    """
    from sycamore.connectors.iceberg.iceberg_writer import IcebergWriter

    writer_kwargs = {
        "child": self.plan,
        "catalog_kwargs": catalog_kwargs,
        "schema": schema,
        "table_identifier": table_identifier,
        "property_root": property_root,
        "location": location,
    }
    writer_node: Node = IcebergWriter(**writer_kwargs)
    # Always executed eagerly; nothing is returned to the caller.
    self._maybe_execute(writer_node, execute=True)
@experimental
def aryn(
    self,
    *,
    docset_id: Optional[str] = None,
    name: Optional[str] = None,
    aryn_api_key: Optional[str] = None,
    aryn_url: Optional[str] = None,
    update_schema: bool = True,
    only_properties: bool = False,
    **kwargs,
) -> Optional["DocSet"]:
    """
    Writes all documents of a DocSet to Aryn. The write is always executed immediately.

    Args:
        docset_id: The id of the docset to write to. If not provided, a new docset is created
            using ``name``.
        name: The name of the new docset to create. Only used, and then required, when
            ``docset_id`` is not provided.
        aryn_api_key: The api key to use for authentication. If not provided, the api key from the config
            file will be used.
        aryn_url: The url of the Aryn instance to write to. If not provided, the url from the config file
            will be used.
        update_schema: Forwarded to ArynWriterTargetParams. Defaults to True.
        only_properties: Forwarded to ArynWriterTargetParams. Defaults to False.
        kwargs: Additional keyword arguments forwarded to ArynWriter.

    Returns:
        Always None, because the write is executed eagerly.

    Raises:
        ValueError: If neither ``docset_id`` nor ``name`` is provided.
    """
    from sycamore.connectors.aryn.ArynWriter import (
        ArynWriter,
        ArynWriterClientParams,
        ArynWriterTargetParams,
    )

    # Fall back to the configured credentials/endpoint when not given explicitly.
    if aryn_api_key is None:
        aryn_api_key = ArynConfig.get_aryn_api_key()
    if aryn_url is None:
        aryn_url = ArynConfig.get_aryn_url()

    if docset_id is None and name is None:
        raise ValueError("Either docset_id or name must be provided")

    if docset_id is None:
        # `name` is guaranteed to be set here by the check above.
        try:
            docset_id = ArynClient(aryn_url, aryn_api_key).create_docset(name)
            logger.info(f"Created new docset with id {docset_id} and name {name}")
        except Exception as e:
            logger.error(f"Error creating new docset: {e}")
            raise

    writer: Node = ArynWriter(
        self.plan,
        client_params=ArynWriterClientParams(aryn_url, aryn_api_key),
        target_params=ArynWriterTargetParams(
            docset_id, update_schema=update_schema, only_properties=only_properties
        ),
        **kwargs,
    )
    return self._maybe_execute(writer, True)
def _maybe_execute(
    self, node: Node, execute: bool, client: Optional[BaseDBWriter.Client] = None
) -> Optional[DocSet]:
    """
    Wraps ``node`` in a DocSet and either returns it or executes it.

    Args:
        node: The plan node (typically a writer) to wrap.
        execute: If False, the DocSet is returned without being executed.
        client: Optional writer client. When given and ``node`` is a BaseDBWriter,
            ``client.reliability_assertor`` is called with the node's target params
            once execution has finished.

    Returns:
        The unexecuted DocSet when ``execute`` is False, otherwise None.
    """
    docset = DocSet(self.context, node)
    if execute:
        docset.execute()
        # Post-write check, only applicable to database writers with a client.
        if client is not None and isinstance(node, BaseDBWriter):
            client.reliability_assertor(node._target_params)
        return None
    return docset