-
Notifications
You must be signed in to change notification settings - Fork 12
/
LambdaMessageProcessor.java
151 lines (135 loc) · 6.34 KB
/
LambdaMessageProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package com.jashmore.sqs.processor;
import com.jashmore.sqs.QueueProperties;
import com.jashmore.sqs.argument.visibility.DefaultVisibilityExtender;
import com.jashmore.sqs.processor.argument.Acknowledge;
import com.jashmore.sqs.processor.argument.VisibilityExtender;
import com.jashmore.sqs.util.concurrent.CompletableFutureUtils;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.Message;
/**
* {@link MessageProcessor} that takes a lambda/function for synchronous processing of a message.
*/
@Slf4j
public class LambdaMessageProcessor implements MessageProcessor {
private final SqsAsyncClient sqsAsyncClient;
private final QueueProperties queueProperties;
private final boolean usesAcknowledgeParameter;
private final MessageProcessingFunction messageProcessingFunction;
/**
* Constructor.
*
* @param sqsAsyncClient the client to communicate with SQS
* @param queueProperties the properties of the queue
* @param messageProcessor the function to consume a message and return the future
*/
public LambdaMessageProcessor(
final SqsAsyncClient sqsAsyncClient,
final QueueProperties queueProperties,
final Consumer<Message> messageProcessor
) {
this.sqsAsyncClient = sqsAsyncClient;
this.queueProperties = queueProperties;
this.usesAcknowledgeParameter = false;
this.messageProcessingFunction = (message, acknowledge, visibilityExtender) -> messageProcessor.accept(message);
}
/**
* Constructor.
*
* @param sqsAsyncClient the client to communicate with SQS
* @param queueProperties the properties of the queue
* @param messageProcessor the function to consume a message and acknowledge
*/
public LambdaMessageProcessor(
final SqsAsyncClient sqsAsyncClient,
final QueueProperties queueProperties,
final BiConsumer<Message, Acknowledge> messageProcessor
) {
this.sqsAsyncClient = sqsAsyncClient;
this.queueProperties = queueProperties;
this.usesAcknowledgeParameter = true;
this.messageProcessingFunction = (message, acknowledge, visibilityExtender) -> messageProcessor.accept(message, acknowledge);
}
/**
* Constructor.
*
* <p>As Java generics has type erasure and will convert <code>BiFunction<A, B, C></code> to <code>BiFunction</code> we need to change
* the type signature to distinguish the function that consumes a message and an acknowledge compared to the function that consumes a message
* and a visibility extender. As the visibility extender use case seems less common, this one has the unused parameter.
*
* @param sqsAsyncClient the client to communicate with SQS
* @param queueProperties the properties of the queue
* @param ignoredForTypeErasure field needed due to type erasure
* @param messageProcessor the function to consume a message and visibility extender and return the future
*/
public LambdaMessageProcessor(
final SqsAsyncClient sqsAsyncClient,
final QueueProperties queueProperties,
@SuppressWarnings("unused") final boolean ignoredForTypeErasure,
final BiConsumer<Message, VisibilityExtender> messageProcessor
) {
this.sqsAsyncClient = sqsAsyncClient;
this.queueProperties = queueProperties;
this.usesAcknowledgeParameter = false;
this.messageProcessingFunction = (message, acknowledge, visibilityExtender) -> messageProcessor.accept(message, visibilityExtender);
}
/**
* Constructor.
*
* @param sqsAsyncClient the client to communicate with SQS
* @param queueProperties the properties of the queue
* @param messageProcessor the function to consume a message, acknowledge and visibility extender and return the future
*/
public LambdaMessageProcessor(
final SqsAsyncClient sqsAsyncClient,
final QueueProperties queueProperties,
final MessageProcessingFunction messageProcessor
) {
this.sqsAsyncClient = sqsAsyncClient;
this.queueProperties = queueProperties;
this.usesAcknowledgeParameter = true;
this.messageProcessingFunction = messageProcessor;
}
@Override
public CompletableFuture<?> processMessage(Message message, Supplier<CompletableFuture<?>> resolveMessageCallback) {
final Acknowledge acknowledge = resolveMessageCallback::get;
final VisibilityExtender visibilityExtender = new DefaultVisibilityExtender(sqsAsyncClient, queueProperties, message);
try {
messageProcessingFunction.processMessage(message, acknowledge, visibilityExtender);
} catch (final MessageProcessingException messageProcessingException) {
return CompletableFutureUtils.completedExceptionally(messageProcessingException);
} catch (final RuntimeException runtimeException) {
return CompletableFutureUtils.completedExceptionally(new MessageProcessingException(runtimeException));
}
if (usesAcknowledgeParameter) {
return CompletableFuture.completedFuture(null);
}
return CompletableFuture
.completedFuture(null)
.thenAccept(ignored -> {
try {
resolveMessageCallback
.get()
.handle((i, throwable) -> {
if (throwable != null) {
log.error("Error resolving successfully processed message", throwable);
}
return null;
});
} catch (final RuntimeException runtimeException) {
log.error("Failed to trigger message resolving", runtimeException);
}
});
}
/**
* Represents a message processing function that consumes the {@link Message}, {@link Acknowledge} and {@link VisibilityExtender}.
*/
@FunctionalInterface
public interface MessageProcessingFunction {
void processMessage(Message message, Acknowledge acknowledge, VisibilityExtender visibilityExtender);
}
}