Add optional vectorizer bytes conversion and exponential backoff (#35)
This PR adds support for:
- Exponential backoff (with random jitter) for the OpenAI vectorizer
- An optional `as_buffer` flag on the vectorizer embedding creation methods. It defaults to `False`, so embeddings still come back as lists of floats; passing `as_buffer=True` returns each embedding as a bytes buffer instead.
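For illustration, a minimal sketch of the new flag in use; the model name, environment variable, and sample strings are assumptions for the example, not part of this diff:

    import os

    from redisvl.vectorize.text import OpenAITextVectorizer

    api_key = os.environ.get("OPENAI_API_KEY", "")
    oaip = OpenAITextVectorizer("text-embedding-ada-002", api_config={"api_key": api_key})

    # Default behavior is unchanged: one list of floats per input text
    vectors = oaip.embed_many(["a happy dog", "a sunny day"])

    # Opt in to get each embedding back as a bytes buffer instead
    buffers = oaip.embed_many(["a happy dog", "a sunny day"], as_buffer=True)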
tylerhutcherson authored Aug 5, 2023
1 parent 9de14f6 commit 4877f8b
Showing 6 changed files with 169 additions and 41 deletions.
4 changes: 1 addition & 3 deletions docs/examples/openai_qna.ipynb
@@ -710,13 +710,11 @@
 "source": [
 "import os\n",
 "from redisvl.vectorize.text import OpenAITextVectorizer\n",
-"from redisvl.utils.utils import array_to_buffer\n",
 "\n",
 "api_key = os.environ.get(\"OPENAI_API_KEY\", \"\")\n",
 "oaip = OpenAITextVectorizer(EMBEDDINGS_MODEL, api_config={\"api_key\": api_key})\n",
 "\n",
-"chunked_data[\"embedding\"] = oaip.embed_many(chunked_data[\"content\"].tolist())\n",
-"chunked_data[\"embedding\"] = chunked_data[\"embedding\"].apply(lambda x: array_to_buffer(x))\n",
+"chunked_data[\"embedding\"] = oaip.embed_many(chunked_data[\"content\"].tolist(), as_buffer=True)\n",
 "chunked_data"
 ]
 },
4 changes: 2 additions & 2 deletions docs/user_guide/vectorizers_03.ipynb
@@ -105,7 +105,7 @@
 " \"Today is a sunny day\"\n",
 "]\n",
 "\n",
-"embeddings = hf.embed_many(sentences)\n"
+"embeddings = hf.embed_many(sentences, as_buffer=True)\n"
 ]
 },
 {
@@ -183,7 +183,7 @@
 "# the vector is stored as a bytes buffer\n",
 "\n",
 "data = [{\"text\": t,\n",
-" \"embedding\": array_to_buffer(v)}\n",
+" \"embedding\": v}\n",
 " for t, v in zip(sentences, embeddings)]\n",
 "\n",
 "index.load(data)"
27 changes: 21 additions & 6 deletions redisvl/vectorize/base.py
@@ -1,5 +1,7 @@
 from typing import Callable, Dict, List, Optional
 
+from redisvl.utils.utils import array_to_buffer
+
 
 class BaseVectorizer:
     def __init__(self, model: str, dims: int, api_config: Optional[Dict] = None):
@@ -21,27 +23,35 @@ def set_model(self, model: str, dims: Optional[int] = None) -> None:

     def embed_many(
         self,
-        inputs: List[str],
+        texts: List[str],
         preprocess: Optional[Callable] = None,
-        chunk_size: int = 1000,
+        batch_size: Optional[int] = 1000,
+        as_buffer: Optional[bool] = False,
     ) -> List[List[float]]:
         raise NotImplementedError
 
     def embed(
-        self, emb_input: str, preprocess: Optional[Callable] = None
+        self,
+        text: str,
+        preprocess: Optional[Callable] = None,
+        as_buffer: Optional[bool] = False,
     ) -> List[float]:
         raise NotImplementedError
 
     async def aembed_many(
         self,
-        inputs: List[str],
+        texts: List[str],
         preprocess: Optional[Callable] = None,
-        chunk_size: int = 1000,
+        batch_size: Optional[int] = 1000,
+        as_buffer: Optional[bool] = False,
     ) -> List[List[float]]:
         raise NotImplementedError
 
     async def aembed(
-        self, emb_input: str, preprocess: Optional[Callable] = None
+        self,
+        text: str,
+        preprocess: Optional[Callable] = None,
+        as_buffer: Optional[bool] = False,
     ) -> List[float]:
         raise NotImplementedError

@@ -51,3 +61,8 @@ def batchify(self, seq: list, size: int, preprocess: Optional[Callable] = None):
                 yield [preprocess(chunk) for chunk in seq[pos : pos + size]]
             else:
                 yield seq[pos : pos + size]
+
+    def _process_embedding(self, embedding: List[float], as_buffer: bool):
+        if as_buffer:
+            return array_to_buffer(embedding)
+        return embedding
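
The new `_process_embedding` helper defers to `array_to_buffer` from `redisvl.utils.utils`. A rough sketch of the conversion it presumably performs when `as_buffer=True` (the float32 dtype here is an assumption for illustration, not taken from this diff):

    import numpy as np

    # Assumed shape of the conversion: pack the float list into a
    # compact byte string suitable for storage in a Redis hash field.
    def array_to_buffer(array, dtype=np.float32) -> bytes:
        return np.array(array).astype(dtype).tobytes()

    array_to_buffer([0.1, 0.2, 0.3])  # -> a 12-byte buffer (3 x float32)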
55 changes: 46 additions & 9 deletions redisvl/vectorize/text/huggingface.py
@@ -4,6 +4,7 @@
 
 
 class HFTextVectorizer(BaseVectorizer):
+    # TODO - add docstring
     def __init__(self, model: str, api_config: Optional[Dict] = None):
         # TODO set dims based on model
         dims = 768
@@ -18,21 +19,57 @@ def __init__(self, model: str, api_config: Optional[Dict] = None):
         self._model_client = SentenceTransformer(model)
 
     def embed(
-        self, emb_input: str, preprocess: Optional[Callable] = None
+        self,
+        text: str,
+        preprocess: Optional[Callable] = None,
+        as_buffer: Optional[bool] = False,
     ) -> List[float]:
+        """Embed a chunk of text using the Hugging Face sentence transformer.
+
+        Args:
+            text (str): Chunk of text to embed.
+            preprocess (Optional[Callable], optional): Optional preprocessing callable to
+                perform before vectorization. Defaults to None.
+            as_buffer (Optional[bool], optional): Whether to convert the raw embedding
+                to a byte string. Defaults to False.
+
+        Returns:
+            List[float]: Embedding.
+        """
         if preprocess:
-            emb_input = preprocess(emb_input)
-        embedding = self._model_client.encode([emb_input])[0]
-        return embedding.tolist()
+            text = preprocess(text)
+        embedding = self._model_client.encode([text])[0]
+        return self._process_embedding(embedding.tolist(), as_buffer)

     def embed_many(
         self,
-        inputs: List[str],
+        texts: List[str],
         preprocess: Optional[Callable] = None,
-        chunk_size: int = 1000,
+        batch_size: int = 1000,
+        as_buffer: Optional[bool] = False,
     ) -> List[List[float]]:
-        embeddings = []
-        for batch in self.batchify(inputs, chunk_size, preprocess):
+        """Embed many chunks of text using the Hugging Face sentence
+        transformer.
+
+        Args:
+            texts (List[str]): List of text chunks to embed.
+            preprocess (Optional[Callable], optional): Optional preprocessing callable to
+                perform before vectorization. Defaults to None.
+            batch_size (int, optional): Batch size of texts to use when creating
+                embeddings. Defaults to 1000.
+            as_buffer (Optional[bool], optional): Whether to convert the raw embedding
+                to a byte string. Defaults to False.
+
+        Returns:
+            List[List[float]]: List of embeddings.
+        """
+        embeddings: List = []
+        for batch in self.batchify(texts, batch_size, preprocess):
             batch_embeddings = self._model_client.encode(batch)
-            embeddings.extend([embedding.tolist() for embedding in batch_embeddings])
+            embeddings.extend(
+                [
+                    self._process_embedding(embedding.tolist(), as_buffer)
+                    for embedding in batch_embeddings
+                ]
+            )
         return embeddings
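
Mirroring the vectorizers_03.ipynb change above, a short sketch of the Hugging Face path; the model name and sentences are illustrative assumptions (the class currently hardcodes 768 dims, which this model happens to match):

    from redisvl.vectorize.text import HFTextVectorizer

    hf = HFTextVectorizer("sentence-transformers/all-mpnet-base-v2")
    sentences = ["That is a happy dog", "Today is a sunny day"]

    # Embeddings arrive as bytes buffers, ready to load into an index
    # without a manual array_to_buffer step
    embeddings = hf.embed_many(sentences, as_buffer=True)
    data = [{"text": t, "embedding": v} for t, v in zip(sentences, embeddings)]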
117 changes: 97 additions & 20 deletions redisvl/vectorize/text/openai.py
@@ -1,9 +1,16 @@
 from typing import Callable, Dict, List, Optional
 
+from tenacity import (  # for exponential backoff
+    retry,
+    stop_after_attempt,
+    wait_random_exponential,
+)
+
 from redisvl.vectorize.base import BaseVectorizer
 
 
 class OpenAITextVectorizer(BaseVectorizer):
+    # TODO - add docstring
     def __init__(self, model: str, api_config: Optional[Dict] = None):
         dims = 1536
         super().__init__(model, dims, api_config)
@@ -18,42 +25,112 @@ def __init__(self, model: str, api_config: Optional[Dict] = None):
             openai.api_key = api_config.get("api_key", None)
         self._model_client = openai.Embedding
 
+    @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
     def embed_many(
         self,
-        inputs: List[str],
+        texts: List[str],
         preprocess: Optional[Callable] = None,
-        chunk_size: int = 1000,
+        batch_size: Optional[int] = 10,
+        as_buffer: Optional[bool] = False,
     ) -> List[List[float]]:
-        results = []
-        for batch in self.batchify(inputs, chunk_size, preprocess):
+        """Embed many chunks of text using the OpenAI API.
+
+        Args:
+            texts (List[str]): List of text chunks to embed.
+            preprocess (Optional[Callable], optional): Optional preprocessing callable to
+                perform before vectorization. Defaults to None.
+            batch_size (int, optional): Batch size of texts to use when creating
+                embeddings. Defaults to 10.
+            as_buffer (Optional[bool], optional): Whether to convert the raw embedding
+                to a byte string. Defaults to False.
+
+        Returns:
+            List[List[float]]: List of embeddings.
+        """
+        embeddings: List = []
+        for batch in self.batchify(texts, batch_size, preprocess):
             response = self._model_client.create(input=batch, engine=self._model)
-            results += [r["embedding"] for r in response["data"]]
-        return results
+            embeddings += [
+                self._process_embedding(r["embedding"], as_buffer)
+                for r in response["data"]
+            ]
+        return embeddings

+    @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
     def embed(
-        self, emb_input: str, preprocess: Optional[Callable] = None
+        self,
+        text: str,
+        preprocess: Optional[Callable] = None,
+        as_buffer: Optional[bool] = False,
     ) -> List[float]:
+        """Embed a chunk of text using the OpenAI API.
+
+        Args:
+            text (str): Chunk of text to embed.
+            preprocess (Optional[Callable], optional): Optional preprocessing callable to
+                perform before vectorization. Defaults to None.
+            as_buffer (Optional[bool], optional): Whether to convert the raw embedding
+                to a byte string. Defaults to False.
+
+        Returns:
+            List[float]: Embedding.
+        """
         if preprocess:
-            emb_input = preprocess(emb_input)
-        result = self._model_client.create(input=[emb_input], engine=self._model)
-        return result["data"][0]["embedding"]
+            text = preprocess(text)
+        result = self._model_client.create(input=[text], engine=self._model)
+        return self._process_embedding(result["data"][0]["embedding"], as_buffer)

+    @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
     async def aembed_many(
         self,
-        inputs: List[str],
+        texts: List[str],
         preprocess: Optional[Callable] = None,
-        chunk_size: int = 1000,
+        batch_size: int = 1000,
+        as_buffer: Optional[bool] = False,
     ) -> List[List[float]]:
-        results = []
-        for batch in self.batchify(inputs, chunk_size, preprocess):
+        """Asynchronously embed many chunks of text using the OpenAI API.
+
+        Args:
+            texts (List[str]): List of text chunks to embed.
+            preprocess (Optional[Callable], optional): Optional preprocessing callable to
+                perform before vectorization. Defaults to None.
+            batch_size (int, optional): Batch size of texts to use when creating
+                embeddings. Defaults to 1000.
+            as_buffer (Optional[bool], optional): Whether to convert the raw embedding
+                to a byte string. Defaults to False.
+
+        Returns:
+            List[List[float]]: List of embeddings.
+        """
+        embeddings: List = []
+        for batch in self.batchify(texts, batch_size, preprocess):
             response = await self._model_client.acreate(input=batch, engine=self._model)
-            results += [r["embedding"] for r in response["data"]]
-        return results
+            embeddings += [
+                self._process_embedding(r["embedding"], as_buffer)
+                for r in response["data"]
+            ]
+        return embeddings

+    @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
     async def aembed(
-        self, emb_input: str, preprocess: Optional[Callable] = None
+        self,
+        text: str,
+        preprocess: Optional[Callable] = None,
+        as_buffer: Optional[bool] = False,
     ) -> List[float]:
+        """Asynchronously embed a chunk of text using the OpenAI API.
+
+        Args:
+            text (str): Chunk of text to embed.
+            preprocess (Optional[Callable], optional): Optional preprocessing callable to
+                perform before vectorization. Defaults to None.
+            as_buffer (Optional[bool], optional): Whether to convert the raw embedding
+                to a byte string. Defaults to False.
+
+        Returns:
+            List[float]: Embedding.
+        """
         if preprocess:
-            emb_input = preprocess(emb_input)
-        result = await self._model_client.acreate(input=[emb_input], engine=self._model)
-        return result["data"][0]["embedding"]
+            text = preprocess(text)
+        result = await self._model_client.acreate(input=[text], engine=self._model)
+        return self._process_embedding(result["data"][0]["embedding"], as_buffer)
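
The `@retry` decorator applied to all four methods retries a failed OpenAI call up to six times, sleeping a random, exponentially growing interval between 1 and 60 seconds in between. A standalone sketch of the same pattern (`flaky_call` is hypothetical):

    import random

    from tenacity import retry, stop_after_attempt, wait_random_exponential

    @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
    def flaky_call() -> str:
        # Hypothetical transient failure, e.g. an API rate limit error;
        # tenacity re-invokes the function with jittered exponential waits
        # and raises tenacity.RetryError after the sixth failed attempt.
        if random.random() < 0.5:
            raise TimeoutError("transient failure")
        return "ok"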
3 changes: 2 additions & 1 deletion requirements.txt
@@ -2,4 +2,5 @@ numpy
 redis>=4.3.4
 pyyaml
 coloredlogs
-pydantic>=2.0.0
+pydantic>=2.0.0
+tenacity==8.2.2
