Skip to content

Commit

Permalink
feat: Streaming with queries
Browse files Browse the repository at this point in the history
  • Loading branch information
ppodolsky committed Sep 9, 2023
1 parent 040305b commit 98a736f
Show file tree
Hide file tree
Showing 20 changed files with 149 additions and 119 deletions.
44 changes: 22 additions & 22 deletions aiosumma/aiosumma/client.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,10 @@
import asyncio
import sys
from typing import (
AsyncIterator,
Dict,
Iterable,
List,
Optional,
Tuple,
Union,
)
from typing import AsyncIterator, Dict, Iterable, List, Optional, Tuple, Union

import grpc
import orjson as json
from aiogrpcclient import (
BaseGrpcClient,
expose,
)
from aiogrpcclient import BaseGrpcClient, expose
from grpc import StatusCode
from grpc.experimental.aio import AioRpcError
from izihawa_utils.pb_to_json import ParseDict
Expand All @@ -29,10 +18,7 @@
from .proto.index_service_pb2_grpc import IndexApiStub
from .proto.reflection_service_pb2_grpc import ReflectionApiStub
from .proto.search_service_pb2_grpc import SearchApiStub
from .proto.utils_pb2 import ( # noqa
Asc,
Desc,
)
from .proto.utils_pb2 import Asc, Desc # noqa


def setup_metadata(session_id, request_id):
Expand All @@ -44,6 +30,22 @@ def setup_metadata(session_id, request_id):
return metadata


def prepare_search_request(search_request):
if isinstance(search_request, Dict):
dict_search_request = search_request
search_request = search_service_pb.SearchRequest()
ParseDict(dict_search_request, search_request)
return search_request


def prepare_query(query):
if isinstance(query, Dict):
dict_query = query
query = query_pb.Query()
ParseDict(dict_query, query)
return query


def documents_portion_iter(index_name: str, documents: Iterable, bulk_size: int, conflict_strategy: Optional[str] = None):
documents_portion = []
for document in documents:
Expand Down Expand Up @@ -459,6 +461,7 @@ async def get_indices_aliases(
async def documents(
self,
index_name: str,
query_filter: Optional[dict] = None,
fields: Optional[List[str]] = None,
request_id: Optional[str] = None,
session_id: Optional[str] = None,
Expand All @@ -475,6 +478,7 @@ async def documents(
streaming_call = self.stubs['index_api'].documents(
index_service_pb.DocumentsRequest(
index_name=index_name,
query_filter=prepare_query(query_filter),
fields=fields,
),
metadata=setup_metadata(session_id, request_id),
Expand Down Expand Up @@ -563,12 +567,8 @@ async def search(
session_id: session id
"""
try:
if isinstance(search_request, Dict):
dict_search_request = search_request
search_request = search_service_pb.SearchRequest()
ParseDict(dict_search_request, search_request)
return await self.stubs['search_api'].search(
search_request,
prepare_search_request(search_request),
metadata=setup_metadata(session_id, request_id),
)
except AioRpcError as e:
Expand Down
1 change: 1 addition & 0 deletions aiosumma/aiosumma/proto/consumer_service_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aiosumma/aiosumma/proto/dag_pb_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 41 additions & 41 deletions aiosumma/aiosumma/proto/index_service_pb2.py

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions aiosumma/aiosumma/proto/index_service_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -339,12 +339,14 @@ class SetIndexAliasResponse(_message.Message):
def __init__(self, old_index_name: _Optional[str] = ...) -> None: ...

class DocumentsRequest(_message.Message):
__slots__ = ["index_name", "fields"]
__slots__ = ["index_name", "fields", "query_filter"]
INDEX_NAME_FIELD_NUMBER: _ClassVar[int]
FIELDS_FIELD_NUMBER: _ClassVar[int]
QUERY_FILTER_FIELD_NUMBER: _ClassVar[int]
index_name: str
fields: _containers.RepeatedScalarFieldContainer[str]
def __init__(self, index_name: _Optional[str] = ..., fields: _Optional[_Iterable[str]] = ...) -> None: ...
query_filter: _query_pb2.Query
def __init__(self, index_name: _Optional[str] = ..., fields: _Optional[_Iterable[str]] = ..., query_filter: _Optional[_Union[_query_pb2.Query, _Mapping]] = ...) -> None: ...

class DocumentsResponse(_message.Message):
__slots__ = ["document"]
Expand Down
2 changes: 1 addition & 1 deletion aiosumma/aiosumma/proto/query_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aiosumma/aiosumma/proto/reflection_service_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion aiosumma/aiosumma/proto/search_service_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aiosumma/aiosumma/proto/unixfs_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aiosumma/aiosumma/proto/utils_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion aiosumma/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "aiosumma"
version = "2.46.2"
version = "2.46.3"
authors = [{ name = "Pasha Podolsky", email = "ppodolsky@me.com" }]
description = "Async client for Summa Search"
readme = "README.md"
Expand Down
2 changes: 1 addition & 1 deletion summa-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "summa-core"
version = "0.19.2"
version = "0.19.3"
authors = ["Pasha Podolsky <ppodolsky@me.com>"]
edition = "2021"
license-file = "LICENSE"
Expand Down
Loading

0 comments on commit 98a736f

Please sign in to comment.