-
Notifications
You must be signed in to change notification settings - Fork 12
/
DecoratingMessageProcessor.java
105 lines (96 loc) · 4.45 KB
/
DecoratingMessageProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package com.jashmore.sqs.processor;
import com.jashmore.sqs.QueueProperties;
import com.jashmore.sqs.decorator.MessageProcessingContext;
import com.jashmore.sqs.decorator.MessageProcessingDecorator;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.sqs.model.Message;
/**
* {@link MessageProcessor} that will decorate the processing of the message using the supplied {@link MessageProcessingDecorator}s.
*/
@Slf4j
public class DecoratingMessageProcessor implements MessageProcessor {
private final String listenerIdentifier;
private final QueueProperties queueProperties;
private final List<MessageProcessingDecorator> decorators;
private final MessageProcessor delegate;
public DecoratingMessageProcessor(
final String listenerIdentifier,
final QueueProperties queueProperties,
final List<MessageProcessingDecorator> decorators,
final MessageProcessor delegate
) {
this.listenerIdentifier = listenerIdentifier;
this.queueProperties = queueProperties;
this.decorators = decorators;
this.delegate = delegate;
}
@Override
public CompletableFuture<?> processMessage(final Message message, final Supplier<CompletableFuture<?>> resolveMessageCallback)
throws MessageProcessingException {
final MessageProcessingContext context = MessageProcessingContext
.builder()
.listenerIdentifier(listenerIdentifier)
.queueProperties(queueProperties)
.attributes(new HashMap<>())
.build();
decorators.forEach(decorator -> {
try {
decorator.onPreMessageProcessing(context, message);
} catch (RuntimeException runtimeException) {
throw new MessageProcessingException(runtimeException);
}
});
try {
final Supplier<CompletableFuture<?>> wrappedResolveMessageCallback = () -> {
safelyRun(decorators, decorator -> decorator.onMessageResolve(context, message));
return resolveMessageCallback
.get()
.whenComplete((returnValue, throwable) -> {
if (throwable != null) {
safelyRun(decorators, decorator -> decorator.onMessageResolvedFailure(context, message, throwable));
} else {
safelyRun(decorators, decorator -> decorator.onMessageResolvedSuccess(context, message));
}
});
};
return delegate
.processMessage(message, wrappedResolveMessageCallback)
.whenComplete((returnValue, throwable) -> {
if (throwable != null) {
safelyRun(decorators, decorator -> decorator.onMessageProcessingFailure(context, message, throwable));
} else {
safelyRun(decorators, decorator -> decorator.onMessageProcessingSuccess(context, message, returnValue));
}
});
} catch (RuntimeException runtimeException) {
safelyRun(decorators, decorator -> decorator.onMessageProcessingFailure(context, message, runtimeException));
throw runtimeException;
} finally {
safelyRun(decorators, decorator -> decorator.onMessageProcessingThreadComplete(context, message));
}
}
/**
* Used to run the {@link MessageProcessingDecorator} methods for each of the decorators, completing all regardless of whether a previous decorator
* failed.
*
* @param messageProcessingDecorators the decorators to consume
* @param decoratorConsumer the consumer method that would be used to run one of the decorator methods
*/
private void safelyRun(
final List<MessageProcessingDecorator> messageProcessingDecorators,
final Consumer<MessageProcessingDecorator> decoratorConsumer
) {
messageProcessingDecorators.forEach(decorator -> {
try {
decoratorConsumer.accept(decorator);
} catch (RuntimeException runtimeException) {
log.error("Error processing decorator: " + decorator.getClass().getSimpleName(), runtimeException);
}
});
}
}