# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple, Union, cast
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers.vectorstore._async._utils import model_must_be_deployed
from elasticsearch.helpers.vectorstore._utils import DistanceMetric
class AsyncRetrievalStrategy(ABC):
@abstractmethod
def es_query(
self,
*,
query: Optional[str],
query_vector: Optional[List[float]],
text_field: str,
vector_field: str,
k: int,
num_candidates: int,
filter: List[Dict[str, Any]] = [],
) -> Dict[str, Any]:
"""
Returns the Elasticsearch query body for the given parameters.
The store will execute the query.
:param query: The text query. Can be None if query_vector is given.
:param k: The total number of results to retrieve.
:param num_candidates: The number of results to fetch initially in knn search.
:param filter: List of filter clauses to apply to the query.
:param query_vector: The query vector. Can be None if a query string is given.
:return: The Elasticsearch query body.
"""
@abstractmethod
def es_mappings_settings(
self,
*,
text_field: str,
vector_field: str,
num_dimensions: Optional[int],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""
Create the required index and do necessary preliminary work, like
creating inference pipelines or checking if a required model was deployed.
:param client: Elasticsearch client connection.
:param text_field: The field containing the text data in the index.
:param vector_field: The field containing the vector representations in the index.
:param num_dimensions: If vectors are indexed, how many dimensions do they have.
:return: Dictionary with field and field type pairs that describe the schema.
"""
async def before_index_creation(
self, *, client: AsyncElasticsearch, text_field: str, vector_field: str
) -> None:
"""
Executes before the index is created. Used for setting up
any required Elasticsearch resources like a pipeline.
Defaults to a no-op.
:param client: The Elasticsearch client.
:param text_field: The field containing the text data in the index.
:param vector_field: The field containing the vector representations in the index.
"""
pass
def needs_inference(self) -> bool:
"""
Some retrieval strategies index embedding vectors and allow search by embedding
vector, for example the `DenseVectorStrategy` strategy. Mapping a user input query
string to an embedding vector is called inference. Inference can be applied
in Elasticsearch (using a `model_id`) or outside of Elasticsearch (using an
`EmbeddingService` defined on the `VectorStore`). In the latter case,
this method has to return True.
"""
return False
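# Hedged usage sketch, not part of the library: how a store-like caller might
# drive a strategy end to end. The index name "my-index", the field names
# "text"/"vector" and the 384-dimension count are illustrative assumptions, and
# the final search assumes a client/server that accepts the generated body
# (a "query", "knn" or "retriever" key, depending on the strategy).
async def _example_strategy_lifecycle(
    client: AsyncElasticsearch, strategy: AsyncRetrievalStrategy
) -> Any:
    mappings, settings = strategy.es_mappings_settings(
        text_field="text", vector_field="vector", num_dimensions=384
    )
    # Some strategies need resources (e.g. an ingest pipeline) before the index exists.
    await strategy.before_index_creation(
        client=client, text_field="text", vector_field="vector"
    )
    await client.indices.create(index="my-index", mappings=mappings, settings=settings)
    # The strategy only builds the request body; the caller executes it.
    body = strategy.es_query(
        query="what is a vector database?",
        query_vector=None,
        text_field="text",
        vector_field="vector",
        k=5,
        num_candidates=50,
    )
    return await client.search(index="my-index", **body)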
class AsyncSparseVectorStrategy(AsyncRetrievalStrategy):
"""Sparse retrieval strategy using the `text_expansion` processor."""
def __init__(self, model_id: str = ".elser_model_2"):
self.model_id = model_id
self._tokens_field = "tokens"
self._pipeline_name = f"{self.model_id}_sparse_embedding"
def es_query(
self,
*,
query: Optional[str],
query_vector: Optional[List[float]],
text_field: str,
vector_field: str,
k: int,
num_candidates: int,
filter: List[Dict[str, Any]] = [],
) -> Dict[str, Any]:
if query_vector:
raise ValueError(
"Cannot do sparse retrieval with a query_vector. "
"Inference is currently always applied in Elasticsearch."
)
if query is None:
raise ValueError("please specify a query string")
return {
"query": {
"bool": {
"must": [
{
"text_expansion": {
f"{vector_field}.{self._tokens_field}": {
"model_id": self.model_id,
"model_text": query,
}
}
}
],
"filter": filter,
}
}
}
def es_mappings_settings(
self,
*,
text_field: str,
vector_field: str,
num_dimensions: Optional[int],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
mappings: Dict[str, Any] = {
"properties": {
vector_field: {
"properties": {self._tokens_field: {"type": "rank_features"}}
}
}
}
settings = {"default_pipeline": self._pipeline_name}
return mappings, settings
async def before_index_creation(
self, *, client: AsyncElasticsearch, text_field: str, vector_field: str
) -> None:
if self.model_id:
await model_must_be_deployed(client, self.model_id)
# Create a pipeline for the model
await client.ingest.put_pipeline(
id=self._pipeline_name,
description="Embedding pipeline for Python VectorStore",
processors=[
{
"inference": {
"model_id": self.model_id,
"target_field": vector_field,
"field_map": {text_field: "text_field"},
"inference_config": {
"text_expansion": {"results_field": self._tokens_field}
},
}
}
],
)
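# Hedged sketch, not part of the library: the query body this strategy builds
# for a sample search. The field names are illustrative; the model id falls back
# to the default ".elser_model_2". No request is sent here.
def _example_sparse_query_body() -> Dict[str, Any]:
    strategy = AsyncSparseVectorStrategy()
    # Produces a bool query whose "must" clause is a text_expansion query on
    # "vector.tokens", referencing the strategy's model_id.
    return strategy.es_query(
        query="how do sparse embeddings work?",
        query_vector=None,  # inference always runs inside Elasticsearch here
        text_field="text",
        vector_field="vector",
        k=10,
        num_candidates=100,
    )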
class AsyncDenseVectorStrategy(AsyncRetrievalStrategy):
"""K-nearest-neighbors retrieval."""
def __init__(
self,
*,
distance: DistanceMetric = DistanceMetric.COSINE,
model_id: Optional[str] = None,
hybrid: bool = False,
rrf: Union[bool, Dict[str, Any]] = True,
text_field: Optional[str] = "text_field",
):
if hybrid and not text_field:
raise ValueError(
"to enable hybrid you have to specify a text_field (for BM25Strategy matching)"
)
self.distance = distance
self.model_id = model_id
self.hybrid = hybrid
self.rrf = rrf
self.text_field = text_field
def es_query(
self,
*,
query: Optional[str],
query_vector: Optional[List[float]],
text_field: str,
vector_field: str,
k: int,
num_candidates: int,
filter: List[Dict[str, Any]] = [],
) -> Dict[str, Any]:
knn = {
"filter": filter,
"field": vector_field,
"k": k,
"num_candidates": num_candidates,
}
if query_vector is not None:
knn["query_vector"] = query_vector
else:
            # Inference in Elasticsearch. At initialization we make sure a model_id is
            # always set if no embedding_service is provided.
knn["query_vector_builder"] = {
"text_embedding": {
"model_id": self.model_id,
"model_text": query,
}
}
if self.hybrid:
return self._hybrid(query=cast(str, query), knn=knn, filter=filter)
return {"knn": knn}
def es_mappings_settings(
self,
*,
text_field: str,
vector_field: str,
num_dimensions: Optional[int],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
if self.distance is DistanceMetric.COSINE:
similarity = "cosine"
elif self.distance is DistanceMetric.EUCLIDEAN_DISTANCE:
similarity = "l2_norm"
elif self.distance is DistanceMetric.DOT_PRODUCT:
similarity = "dot_product"
elif self.distance is DistanceMetric.MAX_INNER_PRODUCT:
similarity = "max_inner_product"
else:
raise ValueError(f"Similarity {self.distance} not supported.")
mappings: Dict[str, Any] = {
"properties": {
vector_field: {
"type": "dense_vector",
"dims": num_dimensions,
"index": True,
"similarity": similarity,
},
}
}
return mappings, {}
async def before_index_creation(
self, *, client: AsyncElasticsearch, text_field: str, vector_field: str
) -> None:
if self.model_id:
await model_must_be_deployed(client, self.model_id)
def _hybrid(
self, query: str, knn: Dict[str, Any], filter: List[Dict[str, Any]]
) -> Dict[str, Any]:
        # Add a standard full-text query alongside the knn query.
        # RRF is used to balance the scores from the knn query and the text query.
        # RRF has two optional parameters: {'rank_constant': int, 'rank_window_size': int}
        # https://www.elastic.co/guide/en/elasticsearch/reference/current/rrf.html
standard_query = {
"query": {
"bool": {
"must": [
{
"match": {
self.text_field: {
"query": query,
}
}
}
],
"filter": filter,
}
}
}
if self.rrf is False:
query_body = {
"knn": knn,
**standard_query,
}
else:
rrf_options = {}
if isinstance(self.rrf, Dict):
if "rank_constant" in self.rrf:
rrf_options["rank_constant"] = self.rrf["rank_constant"]
if "window_size" in self.rrf:
# 'window_size' was renamed to 'rank_window_size', but we support
# the older name for backwards compatibility
rrf_options["rank_window_size"] = self.rrf["window_size"]
if "rank_window_size" in self.rrf:
rrf_options["rank_window_size"] = self.rrf["rank_window_size"]
query_body = {
"retriever": {
"rrf": {
"retrievers": [
{"standard": standard_query},
{"knn": knn},
],
**rrf_options,
},
},
}
return query_body
def needs_inference(self) -> bool:
return not self.model_id
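# Hedged sketch, not part of the library: a hybrid dense + BM25 query using
# in-Elasticsearch inference. The model id ".multilingual-e5-small" is an
# illustrative assumption; any deployed text embedding model id would do.
def _example_hybrid_query_body() -> Dict[str, Any]:
    strategy = AsyncDenseVectorStrategy(
        model_id=".multilingual-e5-small",
        hybrid=True,
        rrf={"rank_constant": 60, "rank_window_size": 100},
    )
    # With hybrid=True and rrf enabled, the result is a "retriever" body that
    # combines a standard match retriever and a knn retriever under "rrf".
    return strategy.es_query(
        query="hybrid search with reciprocal rank fusion",
        query_vector=None,  # built server-side via query_vector_builder
        text_field="text_field",
        vector_field="vector",
        k=5,
        num_candidates=50,
    )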
class AsyncDenseVectorScriptScoreStrategy(AsyncRetrievalStrategy):
"""Exact nearest neighbors retrieval using the `script_score` query."""
def __init__(self, distance: DistanceMetric = DistanceMetric.COSINE) -> None:
self.distance = distance
def es_query(
self,
*,
query: Optional[str],
query_vector: Optional[List[float]],
text_field: str,
vector_field: str,
k: int,
num_candidates: int,
filter: List[Dict[str, Any]] = [],
) -> Dict[str, Any]:
if not query_vector:
raise ValueError("specify a query_vector")
if self.distance is DistanceMetric.COSINE:
similarity_algo = (
f"cosineSimilarity(params.query_vector, '{vector_field}') + 1.0"
)
elif self.distance is DistanceMetric.EUCLIDEAN_DISTANCE:
similarity_algo = f"1 / (1 + l2norm(params.query_vector, '{vector_field}'))"
elif self.distance is DistanceMetric.DOT_PRODUCT:
similarity_algo = f"""
double value = dotProduct(params.query_vector, '{vector_field}');
return sigmoid(1, Math.E, -value);
"""
elif self.distance is DistanceMetric.MAX_INNER_PRODUCT:
similarity_algo = f"""
double value = dotProduct(params.query_vector, '{vector_field}');
            if (value < 0) {{
                return 1 / (1 + -1 * value);
            }}
            return value + 1;
"""
else:
raise ValueError(f"Similarity {self.distance} not supported.")
query_bool: Dict[str, Any] = {"match_all": {}}
if filter:
query_bool = {"bool": {"filter": filter}}
return {
"query": {
"script_score": {
"query": query_bool,
"script": {
"source": similarity_algo,
"params": {"query_vector": query_vector},
},
},
}
}
def es_mappings_settings(
self,
*,
text_field: str,
vector_field: str,
num_dimensions: Optional[int],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
mappings = {
"properties": {
vector_field: {
"type": "dense_vector",
"dims": num_dimensions,
"index": False,
}
}
}
return mappings, {}
def needs_inference(self) -> bool:
return True
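# Hedged sketch, not part of the library: exact (brute-force) retrieval with a
# caller-supplied embedding. needs_inference() is True for this strategy, so the
# query vector must be computed outside Elasticsearch; the short vector and the
# "category" filter below are stand-ins for real values.
def _example_script_score_query_body() -> Dict[str, Any]:
    strategy = AsyncDenseVectorScriptScoreStrategy(
        distance=DistanceMetric.EUCLIDEAN_DISTANCE
    )
    return strategy.es_query(
        query=None,
        query_vector=[0.1, 0.2, 0.3],
        text_field="text",
        vector_field="vector",
        k=10,
        num_candidates=10,  # not used by script_score, but part of the interface
        filter=[{"term": {"category": "docs"}}],
    )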
class AsyncBM25Strategy(AsyncRetrievalStrategy):
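    """Keyword retrieval using Elasticsearch's BM25 ranking (classic full-text search)."""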
def __init__(
self,
k1: Optional[float] = None,
b: Optional[float] = None,
):
self.k1 = k1
self.b = b
def es_query(
self,
*,
query: Optional[str],
query_vector: Optional[List[float]],
text_field: str,
vector_field: str,
k: int,
num_candidates: int,
filter: List[Dict[str, Any]] = [],
) -> Dict[str, Any]:
return {
"query": {
"bool": {
"must": [
{
"match": {
text_field: {
"query": query,
}
},
},
],
"filter": filter,
},
},
}
def es_mappings_settings(
self,
*,
text_field: str,
vector_field: str,
num_dimensions: Optional[int],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
similarity_name = "custom_bm25"
mappings: Dict[str, Any] = {
"properties": {
text_field: {
"type": "text",
"similarity": similarity_name,
},
},
}
bm25: Dict[str, Any] = {
"type": "BM25",
}
if self.k1 is not None:
bm25["k1"] = self.k1
if self.b is not None:
bm25["b"] = self.b
settings = {
"similarity": {
similarity_name: bm25,
}
}
return mappings, settings
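# Hedged sketch, not part of the library: the mappings and settings produced when
# tuning BM25. The k1/b values below are illustrative, not recommendations.
def _example_bm25_mappings_settings() -> Tuple[Dict[str, Any], Dict[str, Any]]:
    strategy = AsyncBM25Strategy(k1=1.2, b=0.75)
    # Returns a "text" mapping bound to the "custom_bm25" similarity, plus the
    # index settings that define that similarity with the given parameters.
    return strategy.es_mappings_settings(
        text_field="text",
        vector_field="vector",  # not used by BM25, but part of the interface
        num_dimensions=None,
    )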