Query

This package allows you to build sophisticated LLM-powered query pipelines using Sycamore.

class sycamore.query.client.SycamoreQueryClient(context: Context | None = None, llm_cache_dir: str | None = None, os_config: dict = {'search_pipeline': 'hybrid_pipeline'}, os_client_args: dict | None = None, cache_dir: str | None = None, sycamore_exec_mode: ExecMode = ExecMode.RAY, llm: LLM | str | None = None, query_plan_strategy: QueryPlanStrategy | None = None)[source]

A client for the Sycamore Query engine.

Parameters:
  • context (optional) -- a configured Sycamore Context. A fresh one is created if not provided.

  • llm_cache_dir (optional) -- Directory to use for LLM result caching.

  • os_config (optional) -- OpenSearch configuration. Defaults to DEFAULT_OS_CONFIG.

  • os_client_args (optional) -- OpenSearch client arguments. Defaults to DEFAULT_OS_CLIENT_ARGS.

  • cache_dir (optional) -- Directory to use for caching intermediate query results.

  • llm (optional) -- LLM implementation to use for planning and execution.

  • query_plan_strategy (optional) -- Strategy to use for planning, can be used to balance cost vs speed.

Notes

If you override the context, you cannot also override llm_cache_dir, os_client_args, or llm; pass those in via the context parameters instead, i.e., sycamore.init(params={...}).

To override os_client_args, set params["opensearch"]["os_client_args"]. You will likely also need to set params["opensearch"]["text_embedder"] = SycamoreQueryClient.default_text_embedder() or another embedder of your choice.

To override the LLM or its cache path, override the llm context parameter, for example:

from sycamore.utils.cache import cache_from_path
params["default"]["llm"] = OpenAI(OpenAIModels.GPT_4O.value, cache=cache_from_path("/example/path"))

generate_plan(query: str, index: str, schema: OpenSearchSchema, examples: List[PlannerExample] | None = None, natural_language_response: bool = False) LogicalPlan[source]

Generate a logical query plan for the given query, index, and schema.

Parameters:
  • query -- The query to generate a plan for.

  • index -- The index to query against.

  • schema -- The schema for the index.

  • examples -- Optional examples to use for planning.

  • natural_language_response -- Whether to generate a natural language response. If False, raw data will be returned.

get_opensearch_indices() List[str][source]

Get the list of OpenSearch indices available to the client.

get_opensearch_schema(index: str) OpenSearchSchema[source]

Get the schema for the provided OpenSearch index.

To debug: logging.getLogger("sycamore.query.schema").setLevel(logging.DEBUG)

query(query: str, index: str, dry_run: bool = False, codegen_mode: bool = False) SycamoreQueryResult[source]

Run a query against the given index.

run_plan(plan: LogicalPlan, dry_run=False, codegen_mode=False) SycamoreQueryResult[source]

Run the given logical query plan and return a SycamoreQueryResult containing the query ID and result.

sycamore.query.client.configure_logging(logfile: str | None = None, log_level=30)[source]

Configure logging for Sycamore query execution.

class sycamore.query.planner.LlmPlanner(index: str, data_schema: ~sycamore.query.schema.OpenSearchSchema, os_config: dict[str, str], os_client: OpenSearch, strategy: ~sycamore.query.strategy.QueryPlanStrategy = <sycamore.query.strategy.QueryPlanStrategy object>, llm_client: ~sycamore.llms.llms.LLM | None = None, examples: ~typing.List[~sycamore.query.planner.PlannerExample] | None = None, natural_language_response: bool = False)[source]

The top-level query planner for SycamoreQuery. This class is responsible for generating a logical query plan from a user query using an LLM.

Parameters:
  • index -- The name of the index to query.

  • data_schema -- A dictionary mapping field names to their types.

  • os_config -- The OpenSearch configuration.

  • os_client -- The OpenSearch client.

  • strategy -- Strategy to use for planning, can be used to balance cost vs speed.

  • llm_client -- The LLM client.

  • examples -- Query examples to assist the LLM planner in few-shot learning. You may override this to customize the few-shot examples provided to the planner.

  • natural_language_response -- Whether to generate a natural language response. If False, the response will be raw data.

generate_from_llm(question: str) Tuple[Any, str][source]

Use the LLM to generate a query plan for the given question.

Returns the prompt sent to the LLM, and the plan.

generate_system_prompt(_query: str) str[source]

Generate the LLM system prompt for the given query.

generate_user_prompt(query: str) str[source]

Generate the LLM user prompt for the given query.

make_examples_prompt() str[source]

Generate the prompt fragment for the query examples.

make_operator_prompt(operator: Type[Node]) str[source]

Generate the prompt fragment for the given Node.

make_schema_prompt(schema: OpenSearchSchema) str[source]

Generate the prompt fragment for the provided schema.

plan(question: str) LogicalPlan[source]

Given a question from the user, generate a logical query plan.

class sycamore.query.planner.PlannerExample(schema: OpenSearchSchema, plan: LogicalPlan)[source]

Represents an example query and query plan for the planner.

sycamore.query.planner.process_json_plan(json_plan: str) LogicalPlan[source]

Deserialize the query plan returned by the LLM.

class sycamore.query.logical_plan.LogicalNodeDiffType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
class sycamore.query.logical_plan.LogicalPlan(*, query: str, nodes: MutableMapping[int, Annotated[Node, SerializeAsAny()]], result_node: int, llm_prompt: Any | None = None, llm_plan: str | None = None)[source]

Represents a logical query plan.

Parameters:
  • query -- The query that the plan is for.

  • nodes -- A mapping of node IDs to nodes.

  • result_node -- The node that is the result of the query.

  • llm_prompt -- The LLM prompt that was used to generate this query plan.

  • llm_plan -- The LLM plan that was used to generate this query plan.

compare(other: LogicalPlan) list[LogicalPlanDiffEntry][source]

Compare two logical plans. The comparator traverses each plan 'forward', i.e., it attempts to start from node_id == 0, which is typically a data source query. This helps detect differences in the natural flow of data through the plan. If the plans diverge structurally, i.e., two corresponding nodes have a different number of downstream nodes, traversal stops.

Parameters:
  • other -- The plan to compare against.

Returns: A list of comparison metrics.

downstream_nodes(node_id: int) List[int][source]

Return the IDs of all nodes that are downstream of the given node.

insert_node(node_id: int, new_node: Node) None[source]

Insert a node into the plan at the specified node_id. Any nodes that depend on the current node_id are shifted to the right, and their node_ids are incremented. Also, the input arrays of the affected nodes are updated.

Precondition: node_id must be greater than 0, and the current node at node_id must have exactly one input.

If there is no current node at node_id (i.e., the new node is being "appended"), we use the current result node as the input to it.
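The renumbering rule can be illustrated independently of Sycamore with plain dicts. This is a simplified sketch of the behavior described above (each fake node is modeled only as a list of input IDs, and the "append" case is omitted), not the library's implementation:

```python
# Illustrative model of insert_node's renumbering: each fake node is just
# a mapping of node_id -> list of input node IDs.
def insert_node(nodes: dict, node_id: int) -> dict:
    """Insert a new node at node_id, shifting node_id and later nodes up by one."""
    assert node_id > 0 and node_id in nodes and len(nodes[node_id]) == 1
    old_inputs = nodes[node_id]
    shifted = {}
    for nid, inputs in nodes.items():
        new_id = nid + 1 if nid >= node_id else nid
        # references to shifted nodes are incremented as well
        shifted[new_id] = [i + 1 if i >= node_id else i for i in inputs]
    shifted[node_id] = old_inputs        # the new node takes over the old input edge
    shifted[node_id + 1] = [node_id]     # the displaced node now reads from the new node
    return dict(sorted(shifted.items()))

# A three-node chain 0 -> 1 -> 2 becomes 0 -> 1(new) -> 2 -> 3:
plan = {0: [], 1: [0], 2: [1]}
print(insert_node(plan, 1))  # {0: [], 1: [0], 2: [1], 3: [2]}
```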

llm_plan: str | None

The result generated by the LLM.

llm_prompt: Any | None

The LLM prompt that was used to generate this query plan.

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'llm_plan': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'llm_prompt': FieldInfo(annotation=Union[Any, NoneType], required=False, default=None), 'nodes': FieldInfo(annotation=MutableMapping[int, Annotated[Node, SerializeAsAny]], required=True), 'query': FieldInfo(annotation=str, required=True), 'result_node': FieldInfo(annotation=int, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

nodes: MutableMapping[int, Annotated[Node, SerializeAsAny()]]

A mapping of node IDs to nodes in the query plan.

patch_node_inputs() LogicalPlan[source]

Model validator for LogicalPlan that sets the _input_nodes values for each node.

query: str

The query that the plan is for.

replace_node(node_id: int, new_node: Node) None[source]

Replace the existing node at node_id with "new_node".

result_node: int

The ID of the node that is the result of the query.

class sycamore.query.logical_plan.LogicalPlanDiffEntry(*, node_a: Annotated[Node, SerializeAsAny()], node_b: Annotated[Node, SerializeAsAny()], diff_type: LogicalNodeDiffType, message: str | None = None)[source]
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'diff_type': FieldInfo(annotation=LogicalNodeDiffType, required=True), 'message': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'node_a': FieldInfo(annotation=Node, required=True, metadata=[SerializeAsAny()]), 'node_b': FieldInfo(annotation=Node, required=True, metadata=[SerializeAsAny()])}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

class sycamore.query.logical_plan.Node(*, node_type: str = None, node_id: int, description: str | None = None, inputs: List[int] = [])[source]

Represents a node in a logical query plan.

Parameters:
  • node_id -- The ID of the node.

  • inputs -- The IDs of the nodes that this node depends on.

cache_dict() dict[source]

Returns a dict representation of this node that can be used for comparison.

cache_key() str[source]

Returns the cache key of this node, used for caching intermediate query results during execution.

description: str | None

A detailed description of why this operator was chosen for this query plan.

classmethod deserialize(data: Dict[str, Any]) Node[source]

Used to deserialize a Node from a dictionary, by returning the appropriate Node subclass.

input_nodes() List[Node][source]

Returns the nodes that this node depends on.

classmethod input_schema() Dict[str, NodeSchemaField][source]

Return a dict mapping field name to type hint for each input field.

inputs: List[int]

A list of node IDs that this operation depends on.

logical_compare(other: Node) bool[source]

Logically compare two instances of a Node.

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'use_attribute_docstrings': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'description': FieldInfo(annotation=Union[str, NoneType], required=False, default=None, description='A detailed description of why this operator was chosen for this query plan.', json_schema_extra={'exclude_from_comparison': True}), 'inputs': FieldInfo(annotation=List[int], required=False, default=[], description='A list of node IDs that this operation depends on.'), 'node_id': FieldInfo(annotation=int, required=True, description='A unique integer ID representing this node.'), 'node_type': FieldInfo(annotation=str, required=False, default=None, description='The type of this node.')}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_post_init(context: Any, /) None

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Parameters:
  • self -- The BaseModel instance.

  • context -- The context.

node_id: int

A unique integer ID representing this node.

node_type: str

The type of this node.

serialize_node_type(value: str) str[source]

Field serializer for node_type that returns the class name as a default.

classmethod usage() str[source]

Return a detailed description of this query operator. Used by the planner.

class sycamore.query.logical_plan.NodeSchemaField(field_name: str, description: str | None, type_hint: str)[source]
sycamore.query.logical_plan.compare_graphs(plan_a: LogicalPlan, plan_b: LogicalPlan, node_id_a: int, node_id_b: int, visited_a: set[int], visited_b: set[int]) list[LogicalPlanDiffEntry][source]

Traverse and compare two graphs given a node pointer in each, computing comparison metrics per node. Traversal continues as long as the graph structure is identical, i.e., each pair of corresponding nodes has the same number of outgoing edges. It also assumes that the downstream nodes/edges are ordered; this matches the current logical plan implementation, which relies on ordering to support operations like math.

Parameters:
  • plan_a -- The first plan.

  • plan_b -- The second plan.

  • node_id_a -- The current node in plan_a.

  • node_id_b -- The current node in plan_b.

  • visited_a -- Tracks visited nodes in plan_a.

  • visited_b -- Tracks visited nodes in plan_b.

Returns: A list of LogicalPlanDiffEntry objects.