Skip to content

Commit

Permalink
[feat] Add skip_updated_at_modification for indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
ppodolsky committed Apr 20, 2024
1 parent 8d14c9c commit e2e1e8c
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 17 deletions.
23 changes: 21 additions & 2 deletions aiosumma/aiosumma/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ def prepare_query(query):
return query


def documents_portion_iter(index_name: str, documents: Iterable, bulk_size: int, conflict_strategy: Optional[str] = None):
def documents_portion_iter(
index_name: str,
documents: Iterable,
bulk_size: int,
conflict_strategy: Optional[str] = None,
skip_updated_at_modification: bool = False,
):
documents_portion = []
for document in documents:
documents_portion.append(document)
Expand All @@ -59,13 +65,15 @@ def documents_portion_iter(index_name: str, documents: Iterable, bulk_size: int,
index_name=index_name,
documents=documents_portion,
conflict_strategy=conflict_strategy,
skip_updated_at_modification=skip_updated_at_modification,
)
documents_portion = []
if documents_portion:
yield index_service_pb.IndexDocumentStreamRequest(
index_name=index_name,
documents=documents_portion,
conflict_strategy=conflict_strategy,
skip_updated_at_modification=skip_updated_at_modification,
)

class SummaClient(BaseGrpcClient):
Expand Down Expand Up @@ -505,6 +513,7 @@ async def index_document_stream(
documents: Union[Iterable[str], str] = None,
conflict_strategy: Optional[str] = None,
bulk_size: int = 100,
skip_updated_at_modification: bool = False,
request_id: Optional[str] = None,
session_id: Optional[str] = None,
) -> index_service_pb.IndexDocumentStreamResponse:
Expand All @@ -516,6 +525,7 @@ async def index_document_stream(
documents: list of bytes
conflict_strategy: recommended to set to DoNothing for large updates and maintain uniqueness in your application
bulk_size: document portion size to send
skip_updated_at_modification: if True, ask the server to keep each document's `updated_at` attribute as supplied instead of overwriting it with the current time
request_id: request id
session_id: session id
"""
Expand All @@ -530,7 +540,13 @@ def documents_iter():
documents = documents_iter()

return await self.stubs['index_api'].index_document_stream(
documents_portion_iter(index_name, documents, bulk_size, conflict_strategy=conflict_strategy),
documents_portion_iter(
index_name,
documents,
bulk_size,
conflict_strategy=conflict_strategy,
skip_updated_at_modification=skip_updated_at_modification,
),
metadata=setup_metadata(session_id, request_id),
)

Expand All @@ -539,6 +555,7 @@ async def index_document(
self,
index_name: str,
document: Union[dict, bytes, str],
skip_updated_at_modification: bool = False,
request_id: Optional[str] = None,
session_id: Optional[str] = None,
) -> index_service_pb.IndexDocumentResponse:
Expand All @@ -548,13 +565,15 @@ async def index_document(
Args:
index_name: index name
document: document to index; a dict is serialized to JSON, bytes or str are sent as is
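skip_updated_at_modification: if True, ask the server to keep the document's `updated_at` attribute as supplied instead of overwriting it with the current time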
request_id: request id
session_id: session id
"""
return await self.stubs['index_api'].index_document(
index_service_pb.IndexDocumentRequest(
index_name=index_name,
document=json.dumps(document) if isinstance(document, dict) else document,
skip_updated_at_modification=skip_updated_at_modification,
),
metadata=setup_metadata(session_id, request_id),
)
Expand Down
2 changes: 1 addition & 1 deletion summa-core/src/components/index_holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ impl IndexHolder {
/// Index generic `SummaDocument`
///
/// `IndexUpdater` bounds unbounded `SummaDocument` inside
pub async fn index_document(&self, document: SummaDocument<'_>) -> SummaResult<()> {
pub async fn index_document(&self, document: SummaDocument<'_>, skip_updated_at_modification: bool) -> SummaResult<()> {
let document = document.bound_with(&self.index.schema()).try_into()?;
self.index_writer_holder()?.read().await.index_document(document, self.conflict_strategy())
}
Expand Down
20 changes: 11 additions & 9 deletions summa-core/src/components/summa_document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ pub enum DocumentParsingError {
ValueError(String, ValueParsingError),
}

pub fn process_dynamic_fields(schema: &Schema, json_object: &mut serde_json::Map<String, JsonValue>) {
pub fn process_dynamic_fields(schema: &Schema, json_object: &mut serde_json::Map<String, JsonValue>, skip_updated_at_modification: bool) {
if schema.get_field("page_rank").is_ok() && schema.get_field("quantized_page_rank").is_ok() {
if let Some(page_rank_value) = json_object.get_mut("page_rank") {
if let Some(v) = page_rank_value.as_f64() {
json_object.insert("quantized_page_rank".to_string(), json!(quantize_page_rank(v)));
}
}
}
if schema.get_field("updated_at").is_ok() {
if schema.get_field("updated_at").is_ok() && !skip_updated_at_modification {
json_object.insert("updated_at".to_string(), json!(current_time()));
}
}
Expand Down Expand Up @@ -189,10 +189,10 @@ impl<'a> SummaDocument<'a> {
}

/// Build a document object from a json-object.
pub fn parse_and_setup_document(&self, schema: &Schema, doc_json: &str) -> SummaResult<TantivyDocument> {
pub fn parse_and_setup_document(&self, schema: &Schema, doc_json: &str, skip_updated_at_modification: bool) -> SummaResult<TantivyDocument> {
let mut json_obj: serde_json::Map<String, JsonValue> =
serde_json::from_str(doc_json).map_err(|_| DocumentParsingError::InvalidJson(doc_json.to_owned()))?;
process_dynamic_fields(schema, &mut json_obj);
process_dynamic_fields(schema, &mut json_obj, skip_updated_at_modification);
self.json_object_to_doc(schema, json_obj)
}

Expand Down Expand Up @@ -223,18 +223,20 @@ impl<'a> SummaDocument<'a> {
}
Ok(doc)
}

/// Decode `json_bytes` as UTF-8 text and build a document from the JSON it contains.
///
/// `skip_updated_at_modification` is forwarded unchanged to `parse_and_setup_document`.
///
/// # Errors
///
/// Fails with `ValidationError::Utf8` when the bytes are not valid UTF-8; any error from
/// `parse_and_setup_document` is returned as is.
pub fn parse_json_bytes(&self, schema: &Schema, json_bytes: &[u8], skip_updated_at_modification: bool) -> SummaResult<TantivyDocument> {
    let json_text = from_utf8(json_bytes).map_err(ValidationError::Utf8)?;
    self.parse_and_setup_document(schema, json_text, skip_updated_at_modification)
}
}

impl<'a> TryInto<TantivyDocument> for SummaDocument<'a> {
type Error = Error;

fn try_into(self) -> SummaResult<TantivyDocument> {
match self {
SummaDocument::BoundJsonBytes((schema, json_bytes)) => {
let text_document = from_utf8(json_bytes).map_err(ValidationError::Utf8)?;
let parsed_document = self.parse_and_setup_document(schema, text_document)?;
Ok(parsed_document)
}
SummaDocument::BoundJsonBytes((schema, json_bytes)) => self.parse_json_bytes(schema, json_bytes, false),
SummaDocument::UnboundJsonBytes(_) => Err(Error::UnboundDocument),
SummaDocument::TantivyDocument(document) => Ok(document),
}
Expand Down
2 changes: 2 additions & 0 deletions summa-proto/proto/index_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ message IndexDocumentStreamRequest {
string index_name = 1;
repeated bytes documents = 2;
optional ConflictStrategy conflict_strategy = 3;
bool skip_updated_at_modification = 4;
}

message IndexDocumentStreamResponse {
Expand All @@ -237,6 +238,7 @@ message IndexDocumentStreamResponse {
// Request to index a single document.
message IndexDocumentRequest {
// Name of the index that receives the document.
string index_name = 1;
// Raw document bytes; the server handler passes them on as unbound JSON bytes.
bytes document = 2;
// When true, asks the server to keep the document's `updated_at` field as supplied
// instead of overwriting it with the current time.
bool skip_updated_at_modification = 3;
}

// Empty reply to IndexDocumentRequest; it carries no fields.
message IndexDocumentResponse {}
Expand Down
2 changes: 1 addition & 1 deletion summa-server/src/apis/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl proto::index_api_server::IndexApi for IndexApiImpl {
self.index_service
.get_index_holder(&proto_request.index_name)
.await?
.index_document(SummaDocument::UnboundJsonBytes(&proto_request.document))
.index_document(SummaDocument::UnboundJsonBytes(&proto_request.document), proto_request.skip_updated_at_modification)
.await
.map_err(crate::errors::Error::from)?;
let response = proto::IndexDocumentResponse {};
Expand Down
6 changes: 3 additions & 3 deletions summa-server/src/services/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -878,10 +878,10 @@ pub(crate) mod tests {
.await?;

for d in generate_documents(index_holder.schema(), 1000) {
index_holder.index_document(d).await?;
index_holder.index_document(d, false).await?;
}
index_holder
.index_document(generate_unique_document(index_holder.schema(), "testtitle"))
.index_document(generate_unique_document(index_holder.schema(), "testtitle"), false)
.await?;
index_service.commit(&index_holder, false).await?;

Expand Down Expand Up @@ -922,7 +922,7 @@ pub(crate) mod tests {
let mut rng = SmallRng::seed_from_u64(42);
for _ in 0..4 {
for d in generate_documents_with_doc_id_gen_and_rng(AtomicI64::new(1), &mut rng, &schema, 300) {
index_holder.index_document(d).await?;
index_holder.index_document(d, false).await?;
}
index_service.commit(&index_holder, false).await?;
}
Expand Down
2 changes: 1 addition & 1 deletion summa-wasm/crate/web_index_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl WrappedIndexRegistry {
pub async fn index_document(&self, index_name: &str, document: &str) -> Result<(), JsValue> {
let index_holder = self.index_registry.get_index_holder_by_name(index_name).await.map_err(Error::from)?;
index_holder
.index_document(SummaDocument::UnboundJsonBytes(document.as_bytes()))
.index_document(SummaDocument::UnboundJsonBytes(document.as_bytes()), false)
.await
.map_err(Error::from)?;
Ok(())
Expand Down

0 comments on commit e2e1e8c

Please sign in to comment.