Skip to content

Commit

Permalink
spring-projectsGH-1352: RabbitMQ Stream: Map Remaining Properties
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Jun 23, 2021
1 parent 086822c commit bc099bd
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import org.springframework.amqp.core.MessageProperties;
import org.springframework.lang.Nullable;

import com.rabbitmq.stream.MessageBuilder.PropertiesBuilder;
import com.rabbitmq.stream.MessageHandler.Context;
import com.rabbitmq.stream.Properties;

/**
* {@link MessageProperties} extension for stream messages.
Expand All @@ -34,6 +36,18 @@ public class StreamMessageProperties extends MessageProperties {

private transient Context context;

private String to;

private String subject;

private long creationTime;

private String groupId;

private long groupSequence;

private String replyToGroupId;

/**
* Create a new instance with the provided context.
* @param context the context.
Expand All @@ -51,4 +65,100 @@ public Context getContext() {
return this.context;
}

/**
* See {@link Properties#getTo()}.
* @return the to address.
*/
public String getTo() {
return this.to;
}

/**
* See {@link PropertiesBuilder#to(String)}.
* @param address the address.
*/
public void setTo(String address) {
this.to = address;
}

/**
* See {@link Properties#getSubject()}.
* @return the subject.
*/
public String getSubject() {
return this.subject;
}

/**
* See {@link PropertiesBuilder#subject(String)}.
* @param subject the subject.
*/
public void setSubject(String subject) {
this.subject = subject;
}

/**
* See {@link Properties#getCreationTime()}.
* @return the creation time.
*/
public long getCreationTime() {
return this.creationTime;
}

/**
* See {@link PropertiesBuilder#creationTime(long)}.
* @param creationTime the creation time.
*/
public void setCreationTime(long creationTime) {
this.creationTime = creationTime;
}

/**
* See {@link Properties#getGroupId()}.
* @return the group id.
*/
public String getGroupId() {
return this.groupId;
}

/**
* See {@link PropertiesBuilder#groupId(String)}.
* @param groupId the group id.
*/
public void setGroupId(String groupId) {
this.groupId = groupId;
}

/**
* See {@link Properties#getGroupSequence()}.
* @return the group sequence.
*/
public long getGroupSequence() {
return this.groupSequence;
}

/**
* See {@link PropertiesBuilder#groupSequence(long)}.
* @param groupSequence the group sequence.
*/
public void setGroupSequence(long groupSequence) {
this.groupSequence = groupSequence;
}

/**
* See {@link Properties#getReplyToGroupId()}.
* @return the reply to group id.
*/
public String getReplyToGroupId() {
return this.replyToGroupId;
}

/**
* See {@link PropertiesBuilder#replyToGroupId(String)}.
* @param replyToGroupId the reply to group id.
*/
public void setReplyToGroupId(String replyToGroupId) {
this.replyToGroupId = replyToGroupId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,23 @@

package org.springframework.rabbit.stream.support.converter;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.utils.JavaUtils;
import org.springframework.lang.Nullable;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.util.Assert;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.MessageBuilder;
import com.rabbitmq.stream.MessageBuilder.ApplicationPropertiesBuilder;
import com.rabbitmq.stream.MessageBuilder.PropertiesBuilder;
import com.rabbitmq.stream.Properties;
import com.rabbitmq.stream.codec.WrapperMessageBuilder;
Expand All @@ -43,6 +48,8 @@ public class DefaultStreamMessageConverter implements StreamMessageConverter {

private final Supplier<MessageBuilder> builderSupplier;

private final Charset charset = StandardCharsets.UTF_8;

/**
* Construct an instance using a {@link WrapperMessageBuilder}.
*/
Expand All @@ -59,7 +66,7 @@ public DefaultStreamMessageConverter(@Nullable Codec codec) {
}

@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
public Message toMessage(Object object, StreamMessageProperties messageProperties) throws MessageConversionException {
Assert.isInstanceOf(com.rabbitmq.stream.Message.class, object);
com.rabbitmq.stream.Message streamMessage = (com.rabbitmq.stream.Message) object;
toMessageProperties(streamMessage, messageProperties);
Expand All @@ -72,22 +79,87 @@ public Message toMessage(Object object, MessageProperties messageProperties) thr
public com.rabbitmq.stream.Message fromMessage(Message message) throws MessageConversionException {
MessageBuilder builder = this.builderSupplier.get();
PropertiesBuilder propsBuilder = builder.properties();
MessageProperties mProps = message.getMessageProperties();
MessageProperties props = message.getMessageProperties();
Assert.isInstanceOf(StreamMessageProperties.class, props);
StreamMessageProperties mProps = (StreamMessageProperties) props;
JavaUtils.INSTANCE
.acceptIfNotNull(mProps.getMessageId(), propsBuilder::messageId);
// TODO ...
.acceptIfNotNull(mProps.getMessageId(), propsBuilder::messageId) // TODO different types
.acceptIfNotNull(mProps.getUserId(), usr -> propsBuilder.userId(usr.getBytes(this.charset)))
.acceptIfNotNull(mProps.getTo(), propsBuilder::to)
.acceptIfNotNull(mProps.getSubject(), propsBuilder::subject)
.acceptIfNotNull(mProps.getReplyTo(), propsBuilder::replyTo)
.acceptIfNotNull(mProps.getCorrelationId(), propsBuilder::correlationId) // TODO different types
.acceptIfNotNull(mProps.getContentType(), propsBuilder::contentType)
.acceptIfNotNull(mProps.getContentEncoding(), propsBuilder::contentEncoding)
.acceptIfNotNull(mProps.getExpiration(), exp -> propsBuilder.absoluteExpiryTime(Long.parseLong(exp)))
.acceptIfNotNull(mProps.getCreationTime(), propsBuilder::creationTime)
.acceptIfNotNull(mProps.getGroupId(), propsBuilder::groupId)
.acceptIfNotNull(mProps.getGroupSequence(), propsBuilder::groupSequence)
.acceptIfNotNull(mProps.getReplyToGroupId(), propsBuilder::replyToGroupId);
if (mProps.getHeaders().size() > 0) {
ApplicationPropertiesBuilder appPropsBuilder = builder.applicationProperties();
mProps.getHeaders().forEach((key, val) -> {
mapProp(key, val, appPropsBuilder);
});
}
builder.addData(message.getBody());
return builder.build();
}

private void toMessageProperties(com.rabbitmq.stream.Message streamMessage, MessageProperties messageProperties) {
private void mapProp(String key, Object val, ApplicationPropertiesBuilder builder) {
if (val instanceof String) {
builder.entry(key, (String) val);
}
else if (val instanceof Long) {
builder.entry(key, (Long) val);
}
else if (val instanceof Integer) {
builder.entry(key, (Integer) val);
}
else if (val instanceof Short) {
builder.entry(key, (Short) val);
}
else if (val instanceof Byte) {
builder.entry(key, (Byte) val);
}
else if (val instanceof Double) {
builder.entry(key, (Double) val);
}
else if (val instanceof Float) {
builder.entry(key, (Float) val);
}
else if (val instanceof Character) {
builder.entry(key, (Character) val);
}
else if (val instanceof UUID) {
builder.entry(key, (UUID) val);
}
else if (val instanceof byte[]) {
builder.entry(key, (byte[]) val);
}
}

private void toMessageProperties(com.rabbitmq.stream.Message streamMessage,
StreamMessageProperties mProps) {

Properties properties = streamMessage.getProperties();
JavaUtils.INSTANCE
.acceptIfNotNull(properties.getMessageIdAsString(), messageProperties::setMessageId);
// TODO ...
.acceptIfNotNull(properties.getMessageIdAsString(), mProps::setMessageId)
.acceptIfNotNull(properties.getUserId(), usr -> mProps.setUserId(new String(usr, this.charset)))
.acceptIfNotNull(properties.getTo(), mProps::setTo)
.acceptIfNotNull(properties.getSubject(), mProps::setSubject)
.acceptIfNotNull(properties.getReplyTo(), mProps::setReplyTo)
.acceptIfNotNull(properties.getCorrelationIdAsString(), mProps::setCorrelationId)
.acceptIfNotNull(properties.getContentType(), mProps::setContentType)
.acceptIfNotNull(properties.getContentEncoding(), mProps::setContentEncoding)
.acceptIfNotNull(properties.getAbsoluteExpiryTime(), exp -> mProps.setExpiration(Long.toString(exp)))
.acceptIfNotNull(properties.getCreationTime(), mProps::setCreationTime)
.acceptIfNotNull(properties.getGroupId(), mProps::setGroupId)
.acceptIfNotNull(properties.getGroupSequence(), mProps::setGroupSequence)
.acceptIfNotNull(properties.getReplyToGroupId(), mProps::setReplyToGroupId);
Map<String, Object> applicationProperties = streamMessage.getApplicationProperties();
if (applicationProperties != null) {
messageProperties.getHeaders().putAll(applicationProperties);
mProps.getHeaders().putAll(applicationProperties);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.rabbit.stream.support.StreamMessageProperties;

/**
* Converts between {@link com.rabbitmq.stream.Message} and
Expand All @@ -31,8 +32,12 @@
*/
public interface StreamMessageConverter extends MessageConverter {

Message toMessage(Object object, StreamMessageProperties messageProperties) throws MessageConversionException;

@Override
Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException;
default Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new UnsupportedOperationException();
}

@Override
com.rabbitmq.stream.Message fromMessage(Message message) throws MessageConversionException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.rabbit.stream.support.converter;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.Message;
import org.springframework.rabbit.stream.support.StreamMessageProperties;

import com.rabbitmq.stream.MessageHandler.Context;
import com.rabbitmq.stream.Properties;

/**
* @author Gary Russell
* @since 2.4
*
*/
public class DefaultStreamMessageConverterTests {

@Test
void toAndFrom() {
StreamMessageProperties smp = new StreamMessageProperties(mock(Context.class));
smp.setMessageId("test");
smp.setUserId("user");
smp.setTo("to");
smp.setSubject("subject");
smp.setReplyTo("replyTo");
smp.setCorrelationId("correlation");
smp.setContentType("application/json");
smp.setContentEncoding("UTF-8");
smp.setExpiration("42");
smp.setCreationTime(43L);
smp.setGroupId("groupId");
smp.setGroupSequence(44L);
smp.setReplyToGroupId("replyGroupId");
smp.setHeader("foo", "bar");
DefaultStreamMessageConverter converter = new DefaultStreamMessageConverter();
Message msg = new Message("foo".getBytes(), smp);
com.rabbitmq.stream.Message streamMessage = converter.fromMessage(msg);
Properties props = streamMessage.getProperties();
assertThat(props.getMessageIdAsString()).isEqualTo("test");
assertThat(props.getUserId()).isEqualTo("user".getBytes());
assertThat(props.getTo()).isEqualTo("to");
assertThat(props.getSubject()).isEqualTo("subject");
assertThat(props.getReplyTo()).isEqualTo("replyTo");
assertThat(props.getCorrelationIdAsString()).isEqualTo("correlation");
assertThat(props.getContentType()).isEqualTo("application/json");
assertThat(props.getContentEncoding()).isEqualTo("UTF-8");
assertThat(props.getAbsoluteExpiryTime()).isEqualTo(42L);
assertThat(props.getCreationTime()).isEqualTo(43L);
assertThat(props.getGroupId()).isEqualTo("groupId");
assertThat(props.getGroupSequence()).isEqualTo(44L);
assertThat(props.getReplyToGroupId()).isEqualTo("replyGroupId");
assertThat(streamMessage.getApplicationProperties().get("foo")).isEqualTo("bar");

StreamMessageProperties smp2 = new StreamMessageProperties(mock(Context.class));
msg = converter.toMessage(streamMessage, smp2);
assertThat(msg.getMessageProperties()).isSameAs(smp2);
assertThat(smp2).isEqualTo(smp);
}

}

0 comments on commit bc099bd

Please sign in to comment.