Skip to content

Commit

Permalink
Add batch endpoint rate limit
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagohora committed Sep 18, 2024
1 parent e8570d9 commit e7718ac
Show file tree
Hide file tree
Showing 30 changed files with 398 additions and 136 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.comet.opik.api;

import com.comet.opik.api.validate.DatasetItemBatchValidation;
import com.comet.opik.infrastructure.ratelimit.RateEventContainer;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonView;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
Expand All @@ -26,6 +27,12 @@ public record DatasetItemBatch(
DatasetItem.View.Write.class}) @Pattern(regexp = NULL_OR_NOT_BLANK, message = "must not be blank") @Schema(description = "If null, dataset_id must be provided") String datasetName,
@JsonView({
DatasetItem.View.Write.class}) @Schema(description = "If null, dataset_name must be provided") UUID datasetId,
@JsonView({DatasetItem.View.Write.class}) @NotNull @Size(min = 1, max = 1000) @Valid List<DatasetItem> items){
@JsonView({DatasetItem.View.Write.class}) @NotNull @Size(min = 1, max = 1000) @Valid List<DatasetItem> items)
implements
RateEventContainer{

@Override
public long eventCount() {
return items.size();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.comet.opik.api;

import com.comet.opik.infrastructure.ratelimit.RateEventContainer;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonView;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
Expand All @@ -16,5 +17,12 @@
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record ExperimentItemsBatch(
@JsonView( {
ExperimentItem.View.Write.class}) @NotNull @Size(min = 1, max = 1000) @Valid Set<ExperimentItem> experimentItems){
ExperimentItem.View.Write.class}) @NotNull @Size(min = 1, max = 1000) @Valid Set<ExperimentItem> experimentItems)
implements
RateEventContainer{

@Override
public long eventCount() {
return experimentItems.size();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.comet.opik.api;

import com.comet.opik.infrastructure.ratelimit.RateEventContainer;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
Expand All @@ -13,6 +14,11 @@
@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record FeedbackScoreBatch(@NotNull @Size(min = 1, max = 1000) @Valid List<FeedbackScoreBatchItem> scores) {
public record FeedbackScoreBatch(
@NotNull @Size(min = 1, max = 1000) @Valid List<FeedbackScoreBatchItem> scores) implements RateEventContainer {

@Override
public long eventCount() {
return scores.size();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.comet.opik.api;

import com.comet.opik.infrastructure.ratelimit.RateEventContainer;
import com.fasterxml.jackson.annotation.JsonView;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
Expand All @@ -10,5 +11,10 @@

@Builder(toBuilder = true)
public record SpanBatch(@NotNull @Size(min = 1, max = 1000) @JsonView( {
Span.View.Write.class}) @Valid List<Span> spans){
Span.View.Write.class}) @Valid List<Span> spans) implements RateEventContainer{

@Override
public long eventCount() {
return spans.size();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.comet.opik.api;

import com.comet.opik.infrastructure.ratelimit.RateEventContainer;
import com.fasterxml.jackson.annotation.JsonView;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
Expand All @@ -10,5 +11,10 @@

@Builder(toBuilder = true)
public record TraceBatch(@NotNull @Size(min = 1, max = 1000) @JsonView( {
Trace.View.Write.class}) @Valid List<Trace> traces){
Trace.View.Write.class}) @Valid List<Trace> traces) implements RateEventContainer{

@Override
public long eventCount() {
return traces.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ public Response updateDataset(@PathParam("id") UUID id,
@Operation(operationId = "deleteDataset", summary = "Delete dataset by id", description = "Delete dataset by id", responses = {
@ApiResponse(responseCode = "204", description = "No content"),
})
@RateLimited
public Response deleteDataset(@PathParam("id") UUID id) {

String workspaceId = requestContext.get().getWorkspaceId();
Expand All @@ -190,7 +189,6 @@ public Response deleteDataset(@PathParam("id") UUID id) {
@Operation(operationId = "deleteDatasetByName", summary = "Delete dataset by name", description = "Delete dataset by name", responses = {
@ApiResponse(responseCode = "204", description = "No content"),
})
@RateLimited
public Response deleteDatasetByName(
@RequestBody(content = @Content(schema = @Schema(implementation = DatasetIdentifier.class))) @NotNull @Valid DatasetIdentifier identifier) {

Expand Down Expand Up @@ -383,7 +381,6 @@ public Response createDatasetItems(
@Operation(operationId = "deleteDatasetItems", summary = "Delete dataset items", description = "Delete dataset items", responses = {
@ApiResponse(responseCode = "204", description = "No content"),
})
@RateLimited
public Response deleteDatasetItems(
@RequestBody(content = @Content(schema = @Schema(implementation = DatasetItemsDelete.class))) @NotNull @Valid DatasetItemsDelete request) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ public Response createExperimentItems(
@Operation(operationId = "deleteExperimentItems", summary = "Delete experiment items", description = "Delete experiment items", responses = {
@ApiResponse(responseCode = "204", description = "No content"),
})
@RateLimited
public Response deleteExperimentItems(
@RequestBody(content = @Content(schema = @Schema(implementation = ExperimentItemsDelete.class))) @NotNull @Valid ExperimentItemsDelete request) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ public Response update(final @PathParam("id") UUID id,
@Operation(operationId = "deleteFeedbackDefinitionById", summary = "Delete feedback definition by id", description = "Delete feedback definition by id", responses = {
@ApiResponse(responseCode = "204", description = "No Content")
})
@RateLimited
public Response deleteById(@PathParam("id") UUID id) {

String workspaceId = requestContext.get().getWorkspaceId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ public Response update(@PathParam("id") UUID id,
@ApiResponse(responseCode = "204", description = "No Content"),
@ApiResponse(responseCode = "409", description = "Conflict", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))
})
@RateLimited
public Response deleteById(@PathParam("id") UUID id) {

String workspaceId = requestContext.get().getWorkspaceId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ public Response update(@PathParam("id") UUID id,
@Operation(operationId = "deleteSpanById", summary = "Delete span by id", description = "Delete span by id", responses = {
@ApiResponse(responseCode = "501", description = "Not implemented"),
@ApiResponse(responseCode = "204", description = "No Content")})
@RateLimited
public Response deleteById(@PathParam("id") @NotNull String id) {

log.info("Deleting span with id '{}' on workspaceId '{}'", id, requestContext.get().getWorkspaceId());
Expand Down Expand Up @@ -225,7 +224,6 @@ public Response addSpanFeedbackScore(@PathParam("id") UUID id,
@Path("/{id}/feedback-scores/delete")
@Operation(operationId = "deleteSpanFeedbackScore", summary = "Delete span feedback score", description = "Delete span feedback score", responses = {
@ApiResponse(responseCode = "204", description = "No Content")})
@RateLimited
public Response deleteSpanFeedbackScore(@PathParam("id") UUID id,
@RequestBody(content = @Content(schema = @Schema(implementation = DeleteFeedbackScore.class))) @NotNull @Valid DeleteFeedbackScore score) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ public Response update(@PathParam("id") UUID id,
@Path("{id}")
@Operation(operationId = "deleteTraceById", summary = "Delete trace by id", description = "Delete trace by id", responses = {
@ApiResponse(responseCode = "204", description = "No Content")})
@RateLimited
public Response deleteById(@PathParam("id") UUID id) {

log.info("Deleting trace with id '{}'", id);
Expand All @@ -216,7 +215,6 @@ public Response deleteById(@PathParam("id") UUID id) {
@Path("/delete")
@Operation(operationId = "deleteTraces", summary = "Delete traces", description = "Delete traces", responses = {
@ApiResponse(responseCode = "204", description = "No Content")})
@RateLimited
public Response deleteTraces(
@RequestBody(content = @Content(schema = @Schema(implementation = TracesDelete.class))) @NotNull @Valid TracesDelete request) {
log.info("Deleting traces, count '{}'", request.ids().size());
Expand Down Expand Up @@ -252,7 +250,6 @@ public Response addTraceFeedbackScore(@PathParam("id") UUID id,
@Path("/{id}/feedback-scores/delete")
@Operation(operationId = "deleteTraceFeedbackScore", summary = "Delete trace feedback score", description = "Delete trace feedback score", responses = {
@ApiResponse(responseCode = "204", description = "No Content")})
@RateLimited
public Response deleteTraceFeedbackScore(@PathParam("id") UUID id,
@RequestBody(content = @Content(schema = @Schema(implementation = DeleteFeedbackScore.class))) @NotNull @Valid DeleteFeedbackScore score) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.comet.opik.api.error.IdentifierMismatchException;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.infrastructure.db.TransactionTemplate;
import com.comet.opik.infrastructure.redis.LockService;
import com.comet.opik.infrastructure.lock.LockService;
import com.comet.opik.utils.WorkspaceUtils;
import com.google.inject.ImplementedBy;
import com.google.inject.Singleton;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import com.comet.opik.api.error.ErrorMessage;
import com.comet.opik.api.error.IdentifierMismatchException;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.infrastructure.redis.LockService;
import com.comet.opik.infrastructure.lock.LockService;
import com.comet.opik.utils.WorkspaceUtils;
import com.google.common.base.Preconditions;
import com.newrelic.api.agent.Trace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import com.comet.opik.api.error.IdentifierMismatchException;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.infrastructure.db.TransactionTemplate;
import com.comet.opik.infrastructure.redis.LockService;
import com.comet.opik.infrastructure.lock.LockService;
import com.comet.opik.utils.AsyncUtils;
import com.comet.opik.utils.WorkspaceUtils;
import com.google.common.base.Preconditions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.comet.opik.infrastructure.AuthenticationConfig;
import com.comet.opik.infrastructure.OpikConfiguration;
import com.comet.opik.infrastructure.redis.LockService;
import com.comet.opik.infrastructure.lock.LockService;
import com.google.common.base.Preconditions;
import com.google.inject.Provides;
import jakarta.inject.Provider;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.comet.opik.infrastructure.auth;

import com.comet.opik.domain.ProjectService;
import com.comet.opik.infrastructure.redis.LockService;
import com.comet.opik.infrastructure.lock.LockService;
import jakarta.inject.Provider;
import jakarta.ws.rs.ClientErrorException;
import jakarta.ws.rs.client.Client;
Expand All @@ -22,7 +22,7 @@

import static com.comet.opik.infrastructure.AuthenticationConfig.UrlConfig;
import static com.comet.opik.infrastructure.auth.AuthCredentialsCacheService.AuthCredentials;
import static com.comet.opik.infrastructure.redis.LockService.Lock;
import static com.comet.opik.infrastructure.lock.LockService.Lock;

@RequiredArgsConstructor
@Slf4j
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.comet.opik.infrastructure.redis;
package com.comet.opik.infrastructure.lock;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.comet.opik.infrastructure.ratelimit;

public interface RateEventContainer {

long eventCount();

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.comet.opik.infrastructure.RateLimitConfig;
import com.comet.opik.infrastructure.auth.RequestContext;
import io.swagger.v3.oas.annotations.parameters.RequestBody;
import jakarta.inject.Provider;
import jakarta.ws.rs.ClientErrorException;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -31,23 +32,26 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
return invocation.proceed();
}

RateLimited rateLimit = method.getAnnotation(RateLimited.class);
String bucket = rateLimit.value();

if (!rateLimitConfig.isEnabled()) {
return invocation.proceed();
}

RateLimited rateLimit = method.getAnnotation(RateLimited.class);
String bucket = rateLimit.value();

// Check if the bucket is the general events bucket
if (bucket.equals(RateLimited.GENERAL_EVENTS)) {

Object body = getParameters(invocation);
long events = body instanceof RateEventContainer container ? container.eventCount() : 1;

long limit = rateLimitConfig.getGeneralEvents().limit();
long limitDurationInSeconds = rateLimitConfig.getGeneralEvents().durationInSeconds();
String apiKey = requestContext.get().getApiKey();

// Check if the rate limit is exceeded
Boolean limitExceeded = rateLimitService.get()
.isLimitExceeded(apiKey, bucket, limit, limitDurationInSeconds)
.isLimitExceeded(apiKey, events, bucket, limit, limitDurationInSeconds)
.block();

if (Boolean.TRUE.equals(limitExceeded)) {
Expand All @@ -57,20 +61,31 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
try {
return invocation.proceed();
} catch (Exception ex) {
decreaseLimitInCaseOfError(bucket);
decreaseLimitInCaseOfError(bucket, events);
throw ex;
}
}

return invocation.proceed();
}

private void decreaseLimitInCaseOfError(String bucket) {
private Object getParameters(MethodInvocation method) {

for (int i = 0; i < method.getArguments().length; i++) {
if (method.getMethod().getParameters()[i].isAnnotationPresent(RequestBody.class)) {
return method.getArguments()[i];
}
}

return null;
}

private void decreaseLimitInCaseOfError(String bucket, Long events) {
try {
Mono.deferContextual(context -> {
String apiKey = context.get(RequestContext.API_KEY);

return rateLimitService.get().decrement(apiKey, bucket);
return rateLimitService.get().decrement(apiKey, bucket, events);
}).subscribe();
} catch (Exception ex) {
log.warn("Failed to decrement rate limit", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

public interface RateLimitService {

Mono<Boolean> isLimitExceeded(String apiKey, String bucketName, long limit, long limitDurationInSeconds);
Mono<Boolean> isLimitExceeded(String apiKey, long events, String bucketName, long limit,
long limitDurationInSeconds);

Mono<Void> decrement(String apiKey, String bucketName);
Mono<Void> decrement(String apiKey, String bucketName, long events);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
String GENERAL_EVENTS = "general_events";

String value() default GENERAL_EVENTS; // bucket capacity
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.comet.opik.infrastructure.DistributedLockConfig;
import com.comet.opik.infrastructure.OpikConfiguration;
import com.comet.opik.infrastructure.RedisConfig;
import com.comet.opik.infrastructure.lock.LockService;
import com.comet.opik.infrastructure.ratelimit.RateLimitService;
import com.google.inject.Provides;
import jakarta.inject.Singleton;
Expand Down
Loading

0 comments on commit e7718ac

Please sign in to comment.