-
Notifications
You must be signed in to change notification settings - Fork 12
/
AsyncLambdaMessageProcessor.java
155 lines (138 loc) · 6.52 KB
/
AsyncLambdaMessageProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package com.jashmore.sqs.processor;
import com.jashmore.sqs.QueueProperties;
import com.jashmore.sqs.argument.visibility.DefaultVisibilityExtender;
import com.jashmore.sqs.processor.argument.Acknowledge;
import com.jashmore.sqs.processor.argument.VisibilityExtender;
import com.jashmore.sqs.util.concurrent.CompletableFutureUtils;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.Message;
/**
* {@link MessageProcessor} that takes a lambda/function for asynchronous processing of a message.
*/
@Slf4j
public class AsyncLambdaMessageProcessor implements MessageProcessor {
private final SqsAsyncClient sqsAsyncClient;
private final QueueProperties queueProperties;
private final MessageProcessingFunction messageProcessingFunction;
private final boolean usesAcknowledgeParameter;
/**
* Constructor.
*
* @param sqsAsyncClient the client to communicate with SQS
* @param queueProperties the properties of the queue
* @param messageProcessor the function to consume a message and return the future
*/
public AsyncLambdaMessageProcessor(
final SqsAsyncClient sqsAsyncClient,
final QueueProperties queueProperties,
final Function<Message, CompletableFuture<?>> messageProcessor
) {
this.sqsAsyncClient = sqsAsyncClient;
this.queueProperties = queueProperties;
this.usesAcknowledgeParameter = false;
this.messageProcessingFunction = (message, acknowledge, visibilityExtender) -> messageProcessor.apply(message);
}
/**
* Constructor.
*
* @param sqsAsyncClient the client to communicate with SQS
* @param queueProperties the properties of the queue
* @param messageProcessor the function to consume a message and acknowledge and return the future
*/
public AsyncLambdaMessageProcessor(
final SqsAsyncClient sqsAsyncClient,
final QueueProperties queueProperties,
final BiFunction<Message, Acknowledge, CompletableFuture<?>> messageProcessor
) {
this.sqsAsyncClient = sqsAsyncClient;
this.queueProperties = queueProperties;
this.usesAcknowledgeParameter = true;
this.messageProcessingFunction = (message, acknowledge, visibilityExtender) -> messageProcessor.apply(message, acknowledge);
}
/**
* Constructor.
*
* @param sqsAsyncClient the client to communicate with SQS
* @param queueProperties the properties of the queue
* @param messageProcessor the function to consume a message, acknowledge and visibility extender and return the future
*/
public AsyncLambdaMessageProcessor(
final SqsAsyncClient sqsAsyncClient,
final QueueProperties queueProperties,
final MessageProcessingFunction messageProcessor
) {
this.sqsAsyncClient = sqsAsyncClient;
this.queueProperties = queueProperties;
this.usesAcknowledgeParameter = true;
this.messageProcessingFunction = messageProcessor;
}
/**
* Constructor.
*
* <p>As Java generics has type erasure and will convert <code>BiFunction<A, B, C></code> to <code>BiFunction</code> we need to change
* the type signature to distinguish the function that consumes a message and an acknowledge compared to the function that consumes a message
* and a visibility extender. As the visibility extender use case seems less common, this one has the unused parameter.
*
* @param sqsAsyncClient the client to communicate with SQS
* @param queueProperties the properties of the queue
* @param ignoredForTypeErasure field needed due to type erasure
* @param messageProcessor the function to consume a message and visibility extender and return the future
*/
public AsyncLambdaMessageProcessor(
final SqsAsyncClient sqsAsyncClient,
final QueueProperties queueProperties,
@SuppressWarnings("unused") final boolean ignoredForTypeErasure,
final BiFunction<Message, VisibilityExtender, CompletableFuture<?>> messageProcessor
) {
this.sqsAsyncClient = sqsAsyncClient;
this.queueProperties = queueProperties;
this.usesAcknowledgeParameter = false;
this.messageProcessingFunction = (message, acknowledge, visibilityExtender) -> messageProcessor.apply(message, visibilityExtender);
}
@Override
public CompletableFuture<?> processMessage(Message message, Supplier<CompletableFuture<?>> resolveMessageCallback) {
final Acknowledge acknowledge = resolveMessageCallback::get;
final VisibilityExtender visibilityExtender = new DefaultVisibilityExtender(sqsAsyncClient, queueProperties, message);
final CompletableFuture<?> result;
try {
result = messageProcessingFunction.processMessage(message, acknowledge, visibilityExtender);
} catch (RuntimeException runtimeException) {
return CompletableFutureUtils.completedExceptionally(new MessageProcessingException(runtimeException));
}
if (result == null) {
return CompletableFutureUtils.completedExceptionally(
new MessageProcessingException("Method returns CompletableFuture but null was returned")
);
}
if (usesAcknowledgeParameter) {
return result;
}
final Runnable resolveCallbackLoggingErrorsOnly = () -> {
try {
resolveMessageCallback
.get()
.handle((i, throwable) -> {
if (throwable != null) {
log.error("Error resolving successfully processed message", throwable);
}
return null;
});
} catch (RuntimeException runtimeException) {
log.error("Failed to trigger message resolving", runtimeException);
}
};
return result.thenAccept(ignored -> resolveCallbackLoggingErrorsOnly.run());
}
/**
* Represents a message processing function that consumes the {@link Message}, {@link Acknowledge} and {@link VisibilityExtender}.
*/
@FunctionalInterface
public interface MessageProcessingFunction {
CompletableFuture<?> processMessage(Message message, Acknowledge acknowledge, VisibilityExtender visibilityExtender);
}
}