From 846d430493fcecf5476585ea0ec6d917844091a6 Mon Sep 17 00:00:00 2001 From: r-vanooyen Date: Thu, 8 Aug 2024 16:24:08 +0200 Subject: [PATCH 1/9] fix: #7 fixed unit tests --- .../uprotocol/mqtt/HiveMqMQTT5Client.java | 31 ++--- .../uprotocol/mqtt/HiveMqIntegratedTest.java | 116 ++++++++---------- 2 files changed, 65 insertions(+), 82 deletions(-) diff --git a/src/main/java/org/eclipse/uprotocol/mqtt/HiveMqMQTT5Client.java b/src/main/java/org/eclipse/uprotocol/mqtt/HiveMqMQTT5Client.java index a0c74d9..9333dd1 100644 --- a/src/main/java/org/eclipse/uprotocol/mqtt/HiveMqMQTT5Client.java +++ b/src/main/java/org/eclipse/uprotocol/mqtt/HiveMqMQTT5Client.java @@ -45,19 +45,19 @@ class HiveMqMQTT5Client implements UTransport { - private static final Logger LOG = LoggerFactory.getLogger(HiveMqMQTT5Client.class); - private static final String USER_PROPERTIES_KEY_FOR_ID = "1"; - private static final String USER_PROPERTIES_KEY_FOR_MESSAGE_TYPE = "2"; - private static final String USER_PROPERTIES_KEY_FOR_SOURCE_NAME = "3"; - private static final String USER_PROPERTIES_KEY_FOR_SINK_NAME = "4"; - private static final String USER_PROPERTIES_KEY_FOR_PRIORITY = "5"; - private static final String USER_PROPERTIES_KEY_FOR_TTL = "6"; - private static final String USER_PROPERTIES_KEY_FOR_PERMISSION_LEVEL = "7"; - private static final String USER_PROPERTIES_KEY_FOR_COMMSTATUS = "8"; - private static final String USER_PROPERTIES_KEY_FOR_REQID = "9"; - private static final String USER_PROPERTIES_KEY_FOR_TOKEN = "10"; - private static final String USER_PROPERTIES_KEY_FOR_TRACEPARENT = "11"; - private static final String USER_PROPERTIES_KEY_FOR_PAYLOAD_FORMAT = "12"; + public static final Logger LOG = LoggerFactory.getLogger(HiveMqMQTT5Client.class); + public static final String USER_PROPERTIES_KEY_FOR_ID = "1"; + public static final String USER_PROPERTIES_KEY_FOR_MESSAGE_TYPE = "2"; + public static final String USER_PROPERTIES_KEY_FOR_SOURCE_NAME = "3"; + public static final String USER_PROPERTIES_KEY_FOR_SINK_NAME = "4"; + public static final String USER_PROPERTIES_KEY_FOR_PRIORITY = "5"; + public static final String USER_PROPERTIES_KEY_FOR_TTL = "6"; + public static final String USER_PROPERTIES_KEY_FOR_PERMISSION_LEVEL = "7"; + public static final String USER_PROPERTIES_KEY_FOR_COMMSTATUS = "8"; + public static final String USER_PROPERTIES_KEY_FOR_REQID = "9"; + public static final String USER_PROPERTIES_KEY_FOR_TOKEN = "10"; + public static final String USER_PROPERTIES_KEY_FOR_TRACEPARENT = "11"; + public static final String USER_PROPERTIES_KEY_FOR_PAYLOAD_FORMAT = "12"; private final Mqtt5AsyncClient client; private final UUri source; @@ -69,18 +69,21 @@ public HiveMqMQTT5Client(UUri source, Mqtt5Client client) { @Override public CompletionStage send(UMessage uMessage) { LOG.trace("should send a message:\n{}", uMessage); - CompletableFuture result = new CompletableFuture<>(); UAttributesValidator validator = UAttributesValidator.getValidator(uMessage.getAttributes()); ValidationResult validationResult = validator.validate(uMessage.getAttributes()); if (validationResult.isFailure()) { throw new IllegalArgumentException("Invalid message attributes: " + validationResult); } + if(uMessage.getAttributes().hasTtl() && uMessage.getAttributes().getTtl() < 500){ + throw new IllegalArgumentException("TimeToLive needs to be at least 500ms. All smaller ttls will be dropped immediately by hiveMq"); + } Mqtt5UserProperties userProperties = buildUserProperties(uMessage.getAttributes()); Mqtt5PublishBuilder.Send.Complete> sendHandle = buildMqttSendHandle(uMessage, userProperties); + CompletableFuture result = new CompletableFuture<>(); sendHandle .send() .whenCompleteAsync((mqtt5PublishResult, throwable) -> { diff --git a/src/test/java/org/eclipse/uprotocol/mqtt/HiveMqIntegratedTest.java b/src/test/java/org/eclipse/uprotocol/mqtt/HiveMqIntegratedTest.java index 3c44542..51cd2d4 100644 --- a/src/test/java/org/eclipse/uprotocol/mqtt/HiveMqIntegratedTest.java +++ b/src/test/java/org/eclipse/uprotocol/mqtt/HiveMqIntegratedTest.java @@ -16,9 +16,14 @@ import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import org.eclipse.uprotocol.communication.UPayload; import org.eclipse.uprotocol.transport.UListener; import org.eclipse.uprotocol.transport.UTransport; +import org.eclipse.uprotocol.transport.builder.UMessageBuilder; +import org.eclipse.uprotocol.uri.serializer.UriSerializer; import org.eclipse.uprotocol.v1.*; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -37,6 +42,8 @@ import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.uprotocol.mqtt.HiveMqMQTT5Client.USER_PROPERTIES_KEY_FOR_SINK_NAME; +import static org.eclipse.uprotocol.mqtt.HiveMqMQTT5Client.USER_PROPERTIES_KEY_FOR_SOURCE_NAME; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -78,43 +85,27 @@ void setUp() { @Test void givenValidClientAndMessage_whenInvokeSend_shouldSendCorrectMessageToMqtt() throws InterruptedException { - UMessage message = UMessage.newBuilder() - .setPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset())) - .setAttributes(UAttributes.newBuilder() - .setId(UUID.newBuilder().build()) - .setTtl(1000) - .setReqid(UUID.newBuilder().build()) - .setToken("SomeToken") - .setTraceparent("someTraceParent") - .setSource(UUri.newBuilder() - .setAuthorityName("testSource.someUri.network") - .build()) - .setSink(UUri.newBuilder() - .setAuthorityName("testDestination.someUri.network") - .build()) - .build()) - .build(); + UMessage message = UMessageBuilder.request( + UUri.newBuilder().setAuthorityName("testSource.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(0).build(), + UUri.newBuilder().setAuthorityName("testDestination.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(1).build(), 500) + .withToken("SomeToken") + .withTraceparent("someTraceParent") + .build(new UPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset()), UPayloadFormat.UPAYLOAD_FORMAT_TEXT)); UStatus response = serviceUnderTest.send(message).toCompletableFuture().join(); assertThat(response.getCode()).isEqualTo(UCode.OK); - Mqtt5Publish receive = handleToReceiveMqttMessages.receive(1, TimeUnit.SECONDS).get(); + + Mqtt5Publish receive = handleToReceiveMqttMessages.receive(1, TimeUnit.SECONDS).orElseThrow(); assertThat(new String(receive.getPayloadAsBytes())).isEqualTo("Hello World"); } @Test void givenValidClientAndSmallestMessage_whenInvokeSend_shouldSendCorrectMessageToMqtt() throws InterruptedException { - UMessage message = UMessage.newBuilder() - .setPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset())) - .setAttributes(UAttributes.newBuilder() - .setId(UUID.newBuilder().build()) - .setSource(UUri.newBuilder() - .setAuthorityName("testSource.someUri.network") - .build()) - .setSink(UUri.newBuilder() - .setAuthorityName("testDestination.someUri.network") - .build()) - .build()) - .build(); + UMessage message = UMessageBuilder.request( + UUri.newBuilder().setAuthorityName("testSource.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(0).build(), + UUri.newBuilder().setAuthorityName("testDestination.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(1).build(), 500) + .build(new UPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset()), UPayloadFormat.UPAYLOAD_FORMAT_TEXT)); + UStatus response = serviceUnderTest.send(message).toCompletableFuture().join(); assertThat(response.getCode()).isEqualTo(UCode.OK); Mqtt5Publish receive = handleToReceiveMqttMessages.receive(1, TimeUnit.SECONDS).get(); @@ -124,15 +115,11 @@ void givenValidClientAndSmallestMessage_whenInvokeSend_shouldSendCorrectMessageT @Test @Disabled("Broadcast topic is not defined") void givenValidClientAndBroadcastMessage_whenInvokeSend_shouldSendCorrectMessageToMqtt() throws InterruptedException { - UMessage message = UMessage.newBuilder() - .setPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset())) - .setAttributes(UAttributes.newBuilder() - .setId(UUID.newBuilder().build()) - .setSource(UUri.newBuilder() - .setAuthorityName("testSource.someUri.network") - .build()) - .build()) - .build(); + UMessage message = UMessageBuilder.publish( + UUri.newBuilder().setAuthorityName("testSource.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(0).build()) + .build(new UPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset()), UPayloadFormat.UPAYLOAD_FORMAT_TEXT)); + + UStatus response = serviceUnderTest.send(message).toCompletableFuture().join(); assertThat(response.getCode()).isEqualTo(UCode.OK); Mqtt5Publish receive = handleToReceiveMqttMessages.receive(1, TimeUnit.SECONDS).get(); @@ -145,15 +132,21 @@ void givenBlancoListener_whenAddingListenerAndReceivingMessages_shouldCallListen UStatus status = serviceUnderTest.registerListener(null, listener).toCompletableFuture().join(); - mqttClientForTests.publishWith().topic("a/some-source/c/d/e/some-sink/a/b/c").payload("Hello World".getBytes(Charset.defaultCharset())).send(); + mqttClientForTests.publishWith().topic("a/some-source/c/d/e/some-sink/a/b/c") + .userProperties(Mqtt5UserProperties.of( + Mqtt5UserProperty.of(USER_PROPERTIES_KEY_FOR_SOURCE_NAME, UriSerializer.serialize(UUri.newBuilder().setAuthorityName("testSource.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(0).build())), + Mqtt5UserProperty.of(USER_PROPERTIES_KEY_FOR_SINK_NAME, UriSerializer.serialize(UUri.newBuilder().setAuthorityName("testDestination.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(1).build())) + )) + .payload("Hello World".getBytes(Charset.defaultCharset())) + .send(); assertThat(status.getCode()).isEqualTo(UCode.OK); ArgumentCaptor captor = ArgumentCaptor.captor(); verify(listener, Mockito.timeout(1000).times(1)).onReceive(captor.capture()); assertThat(captor.getValue()).isNotNull(); - assertThat(captor.getValue().getAttributes().getSink().getAuthorityName()).isEqualTo("some-sink"); - assertThat(captor.getValue().getAttributes().getSource().getAuthorityName()).isEqualTo("some-source"); + assertThat(captor.getValue().getAttributes().getSink().getAuthorityName()).isEqualTo("testDestination.someUri.network"); + assertThat(captor.getValue().getAttributes().getSource().getAuthorityName()).isEqualTo("testSource.someUri.network"); assertThat(captor.getValue().getPayload()).isNotNull(); assertThat(captor.getValue().getPayload().toString(Charset.defaultCharset())).isEqualTo("Hello World"); } @@ -332,14 +325,14 @@ void given2Listeners_whenUnregisterOneListener_shouldInvokeOtherListenersOnMessa } @Test - void givenListener_whenReceivingUMessageWithAllFields_shouldRouteAllFieldsToListener() { + void givenListener_whenCloudSendsMessageToRadioAndRadioListens_shouldRouteMessageToRadio() { //given a radio and a cloudService UListener radioListener = mock(UListener.class); UUri radioUuid = UUri.newBuilder() .setAuthorityName("radio") .setUeId(0xffff) .setUeVersionMajor(0xff) - .setResourceId(0xffff) + .setResourceId(0) .build(); UTransport mqttClientOfRadio = TransportFactory.createInstance(radioUuid, mqttClientForTests); @@ -351,30 +344,19 @@ void givenListener_whenReceivingUMessageWithAllFields_shouldRouteAllFieldsToList .build(); UTransport mqttClientOfCloud = TransportFactory.createInstance(cloudService, mqttClientForTests); - mqttClientOfRadio.registerListener( - cloudService, - radioUuid, - radioListener); + mqttClientOfRadio.registerListener(cloudService, radioUuid, radioListener); //when cloud service sends a message - mqttClientOfCloud.send( - UMessage.newBuilder() - .setPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset())) - .setAttributes(UAttributes.newBuilder() - .setId(UUID.newBuilder().setMsb(123L).build()) - .setType(UMessageType.UMESSAGE_TYPE_NOTIFICATION) - .setSource(cloudService) - .setSink(radioUuid) - .setPriority(UPriority.UPRIORITY_CS0) - .setTtl(1000) - .setPermissionLevel(4211) - .setCommstatus(UCode.OK) - .setReqid(UUID.newBuilder().setMsb(456L).build()) - .setToken("SomeToken") - .setTraceparent("someTraceParent") - .setPayloadFormat(UPayloadFormat.UPAYLOAD_FORMAT_TEXT) - .build()) - .build()); + + UMessage message = UMessageBuilder.notification(cloudService, radioUuid) + .withPriority(UPriority.UPRIORITY_CS2) + .withTtl(1000) + .withPermissionLevel(4211) + .withToken("SomeToken") + .withTraceparent("someTraceParent") + .build(new UPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset()), UPayloadFormat.UPAYLOAD_FORMAT_TEXT)); + + mqttClientOfCloud.send(message); //should be received by radio ArgumentCaptor captor = ArgumentCaptor.captor(); @@ -383,15 +365,13 @@ void givenListener_whenReceivingUMessageWithAllFields_shouldRouteAllFieldsToList UMessage receivedMessage = captor.getValue(); assertThat(receivedMessage.getPayload().toString(Charset.defaultCharset())).isEqualTo("Hello World"); - assertThat(receivedMessage.getAttributes().getId().getMsb()).isEqualTo(123L); assertThat(receivedMessage.getAttributes().getType()).isEqualTo(UMessageType.UMESSAGE_TYPE_NOTIFICATION); assertThat(receivedMessage.getAttributes().getSource().getAuthorityName()).isEqualTo("cloud"); assertThat(receivedMessage.getAttributes().getSink().getAuthorityName()).isEqualTo("radio"); - assertThat(receivedMessage.getAttributes().getPriority()).isEqualTo(UPriority.UPRIORITY_CS0); + assertThat(receivedMessage.getAttributes().getPriority()).isEqualTo(UPriority.UPRIORITY_CS2); assertThat(receivedMessage.getAttributes().getTtl()).isEqualTo(1000); assertThat(receivedMessage.getAttributes().getPermissionLevel()).isEqualTo(4211); assertThat(receivedMessage.getAttributes().getCommstatus()).isEqualTo(UCode.OK); - assertThat(receivedMessage.getAttributes().getReqid()).isEqualTo(UUID.newBuilder().setMsb(456L).build()); assertThat(receivedMessage.getAttributes().getToken()).isEqualTo("SomeToken"); assertThat(receivedMessage.getAttributes().getTraceparent()).isEqualTo("someTraceParent"); assertThat(receivedMessage.getAttributes().getPayloadFormat()).isEqualTo(UPayloadFormat.UPAYLOAD_FORMAT_TEXT); From a4eda406ce1ee578d332bde74bd01befe9326474 Mon Sep 17 00:00:00 2001 From: r-vanooyen Date: Thu, 8 Aug 2024 16:28:57 +0200 Subject: [PATCH 2/9] fix: added jacoco plugin for test coverage calculation on pr --- pom.xml | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/pom.xml b/pom.xml index a58ae39..533a164 100644 --- a/pom.xml +++ b/pom.xml @@ -199,4 +199,35 @@ + + + + + org.jacoco + jacoco-maven-plugin + 0.8.8 + + + + prepare-agent + + + + + report + test + + report + + + + + org/eclipse/uprotocol/mqtt/** + + + + + + + From 098d448926fe0a7f07a28e715a5d8ce5bdecb143 Mon Sep 17 00:00:00 2001 From: r-vanooyen Date: Thu, 8 Aug 2024 16:42:33 +0200 Subject: [PATCH 3/9] chore: update setup-java to v4 --- .github/workflows/coverage.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 5f9e1d2..88533f6 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -16,10 +16,10 @@ jobs: git config --global user.email 'uprotocol-bot@eclipse.org' - name: Checkout code - uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 + uses: actions/checkout@v4 - name: Set up Apache Maven Central - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: # configure settings.xml distribution: 'temurin' java-version: '17' @@ -46,7 +46,7 @@ jobs: - name: Upload JaCoCo Coverage report - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: coverage-report path: target/site/jacoco @@ -72,7 +72,7 @@ jobs: fs.writeFileSync('./pr-comment/pr-number.txt', pr_number); fs.writeFileSync('./pr-comment/body.txt', body); - - uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 + - uses: actions/upload-artifact@v4 with: name: pr-comment path: pr-comment/ From 5b4fa229d2f6809ae94a69ed9e0ec81b791105a3 Mon Sep 17 00:00:00 2001 From: r-vanooyen Date: Fri, 9 Aug 2024 08:23:01 +0200 Subject: [PATCH 4/9] chore: use maven wrapper to fix the maven version --- .github/workflows/coverage.yml | 2 +- .mvn/wrapper/maven-wrapper.properties | 19 ++ mvnw | 259 ++++++++++++++++++++++++++ mvnw.cmd | 149 +++++++++++++++ 4 files changed, 428 insertions(+), 1 deletion(-) create mode 100644 .mvn/wrapper/maven-wrapper.properties create mode 100755 mvnw create mode 100644 mvnw.cmd diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 88533f6..008165c 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -32,7 +32,7 @@ jobs: - name: Run tests with coverage run: | - mvn clean test jacoco:report + ./mvnw clean test jacoco:report - name: Extract JaCoCo report diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 0000000..f95f1ee --- /dev/null +++ b/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +wrapperVersion=3.3.2 +distributionType=only-script +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.8/apache-maven-3.9.8-bin.zip diff --git a/mvnw b/mvnw new file mode 100755 index 0000000..19529dd --- /dev/null +++ b/mvnw @@ -0,0 +1,259 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Apache Maven Wrapper startup batch script, version 3.3.2 +# +# Optional ENV vars +# ----------------- +# JAVA_HOME - location of a JDK home dir, required when download maven via java source +# MVNW_REPOURL - repo url base for downloading maven distribution +# MVNW_USERNAME/MVNW_PASSWORD - user and password for downloading maven +# MVNW_VERBOSE - true: enable verbose log; debug: trace the mvnw script; others: silence the output +# ---------------------------------------------------------------------------- + +set -euf +[ "${MVNW_VERBOSE-}" != debug ] || set -x + +# OS specific support. +native_path() { printf %s\\n "$1"; } +case "$(uname)" in +CYGWIN* | MINGW*) + [ -z "${JAVA_HOME-}" ] || JAVA_HOME="$(cygpath --unix "$JAVA_HOME")" + native_path() { cygpath --path --windows "$1"; } + ;; +esac + +# set JAVACMD and JAVACCMD +set_java_home() { + # For Cygwin and MinGW, ensure paths are in Unix format before anything is touched + if [ -n "${JAVA_HOME-}" ]; then + if [ -x "$JAVA_HOME/jre/sh/java" ]; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + JAVACCMD="$JAVA_HOME/jre/sh/javac" + else + JAVACMD="$JAVA_HOME/bin/java" + JAVACCMD="$JAVA_HOME/bin/javac" + + if [ ! -x "$JAVACMD" ] || [ ! -x "$JAVACCMD" ]; then + echo "The JAVA_HOME environment variable is not defined correctly, so mvnw cannot run." >&2 + echo "JAVA_HOME is set to \"$JAVA_HOME\", but \"\$JAVA_HOME/bin/java\" or \"\$JAVA_HOME/bin/javac\" does not exist." >&2 + return 1 + fi + fi + else + JAVACMD="$( + 'set' +e + 'unset' -f command 2>/dev/null + 'command' -v java + )" || : + JAVACCMD="$( + 'set' +e + 'unset' -f command 2>/dev/null + 'command' -v javac + )" || : + + if [ ! -x "${JAVACMD-}" ] || [ ! -x "${JAVACCMD-}" ]; then + echo "The java/javac command does not exist in PATH nor is JAVA_HOME set, so mvnw cannot run." >&2 + return 1 + fi + fi +} + +# hash string like Java String::hashCode +hash_string() { + str="${1:-}" h=0 + while [ -n "$str" ]; do + char="${str%"${str#?}"}" + h=$(((h * 31 + $(LC_CTYPE=C printf %d "'$char")) % 4294967296)) + str="${str#?}" + done + printf %x\\n $h +} + +verbose() { :; } +[ "${MVNW_VERBOSE-}" != true ] || verbose() { printf %s\\n "${1-}"; } + +die() { + printf %s\\n "$1" >&2 + exit 1 +} + +trim() { + # MWRAPPER-139: + # Trims trailing and leading whitespace, carriage returns, tabs, and linefeeds. + # Needed for removing poorly interpreted newline sequences when running in more + # exotic environments such as mingw bash on Windows. + printf "%s" "${1}" | tr -d '[:space:]' +} + +# parse distributionUrl and optional distributionSha256Sum, requires .mvn/wrapper/maven-wrapper.properties +while IFS="=" read -r key value; do + case "${key-}" in + distributionUrl) distributionUrl=$(trim "${value-}") ;; + distributionSha256Sum) distributionSha256Sum=$(trim "${value-}") ;; + esac +done <"${0%/*}/.mvn/wrapper/maven-wrapper.properties" +[ -n "${distributionUrl-}" ] || die "cannot read distributionUrl property in ${0%/*}/.mvn/wrapper/maven-wrapper.properties" + +case "${distributionUrl##*/}" in +maven-mvnd-*bin.*) + MVN_CMD=mvnd.sh _MVNW_REPO_PATTERN=/maven/mvnd/ + case "${PROCESSOR_ARCHITECTURE-}${PROCESSOR_ARCHITEW6432-}:$(uname -a)" in + *AMD64:CYGWIN* | *AMD64:MINGW*) distributionPlatform=windows-amd64 ;; + :Darwin*x86_64) distributionPlatform=darwin-amd64 ;; + :Darwin*arm64) distributionPlatform=darwin-aarch64 ;; + :Linux*x86_64*) distributionPlatform=linux-amd64 ;; + *) + echo "Cannot detect native platform for mvnd on $(uname)-$(uname -m), use pure java version" >&2 + distributionPlatform=linux-amd64 + ;; + esac + distributionUrl="${distributionUrl%-bin.*}-$distributionPlatform.zip" + ;; +maven-mvnd-*) MVN_CMD=mvnd.sh _MVNW_REPO_PATTERN=/maven/mvnd/ ;; +*) MVN_CMD="mvn${0##*/mvnw}" _MVNW_REPO_PATTERN=/org/apache/maven/ ;; +esac + +# apply MVNW_REPOURL and calculate MAVEN_HOME +# maven home pattern: ~/.m2/wrapper/dists/{apache-maven-,maven-mvnd--}/ +[ -z "${MVNW_REPOURL-}" ] || distributionUrl="$MVNW_REPOURL$_MVNW_REPO_PATTERN${distributionUrl#*"$_MVNW_REPO_PATTERN"}" +distributionUrlName="${distributionUrl##*/}" +distributionUrlNameMain="${distributionUrlName%.*}" +distributionUrlNameMain="${distributionUrlNameMain%-bin}" +MAVEN_USER_HOME="${MAVEN_USER_HOME:-${HOME}/.m2}" +MAVEN_HOME="${MAVEN_USER_HOME}/wrapper/dists/${distributionUrlNameMain-}/$(hash_string "$distributionUrl")" + +exec_maven() { + unset MVNW_VERBOSE MVNW_USERNAME MVNW_PASSWORD MVNW_REPOURL || : + exec "$MAVEN_HOME/bin/$MVN_CMD" "$@" || die "cannot exec $MAVEN_HOME/bin/$MVN_CMD" +} + +if [ -d "$MAVEN_HOME" ]; then + verbose "found existing MAVEN_HOME at $MAVEN_HOME" + exec_maven "$@" +fi + +case "${distributionUrl-}" in +*?-bin.zip | *?maven-mvnd-?*-?*.zip) ;; +*) die "distributionUrl is not valid, must match *-bin.zip or maven-mvnd-*.zip, but found '${distributionUrl-}'" ;; +esac + +# prepare tmp dir +if TMP_DOWNLOAD_DIR="$(mktemp -d)" && [ -d "$TMP_DOWNLOAD_DIR" ]; then + clean() { rm -rf -- "$TMP_DOWNLOAD_DIR"; } + trap clean HUP INT TERM EXIT +else + die "cannot create temp dir" +fi + +mkdir -p -- "${MAVEN_HOME%/*}" + +# Download and Install Apache Maven +verbose "Couldn't find MAVEN_HOME, downloading and installing it ..." +verbose "Downloading from: $distributionUrl" +verbose "Downloading to: $TMP_DOWNLOAD_DIR/$distributionUrlName" + +# select .zip or .tar.gz +if ! command -v unzip >/dev/null; then + distributionUrl="${distributionUrl%.zip}.tar.gz" + distributionUrlName="${distributionUrl##*/}" +fi + +# verbose opt +__MVNW_QUIET_WGET=--quiet __MVNW_QUIET_CURL=--silent __MVNW_QUIET_UNZIP=-q __MVNW_QUIET_TAR='' +[ "${MVNW_VERBOSE-}" != true ] || __MVNW_QUIET_WGET='' __MVNW_QUIET_CURL='' __MVNW_QUIET_UNZIP='' __MVNW_QUIET_TAR=v + +# normalize http auth +case "${MVNW_PASSWORD:+has-password}" in +'') MVNW_USERNAME='' MVNW_PASSWORD='' ;; +has-password) [ -n "${MVNW_USERNAME-}" ] || MVNW_USERNAME='' MVNW_PASSWORD='' ;; +esac + +if [ -z "${MVNW_USERNAME-}" ] && command -v wget >/dev/null; then + verbose "Found wget ... using wget" + wget ${__MVNW_QUIET_WGET:+"$__MVNW_QUIET_WGET"} "$distributionUrl" -O "$TMP_DOWNLOAD_DIR/$distributionUrlName" || die "wget: Failed to fetch $distributionUrl" +elif [ -z "${MVNW_USERNAME-}" ] && command -v curl >/dev/null; then + verbose "Found curl ... using curl" + curl ${__MVNW_QUIET_CURL:+"$__MVNW_QUIET_CURL"} -f -L -o "$TMP_DOWNLOAD_DIR/$distributionUrlName" "$distributionUrl" || die "curl: Failed to fetch $distributionUrl" +elif set_java_home; then + verbose "Falling back to use Java to download" + javaSource="$TMP_DOWNLOAD_DIR/Downloader.java" + targetZip="$TMP_DOWNLOAD_DIR/$distributionUrlName" + cat >"$javaSource" <<-END + public class Downloader extends java.net.Authenticator + { + protected java.net.PasswordAuthentication getPasswordAuthentication() + { + return new java.net.PasswordAuthentication( System.getenv( "MVNW_USERNAME" ), System.getenv( "MVNW_PASSWORD" ).toCharArray() ); + } + public static void main( String[] args ) throws Exception + { + setDefault( new Downloader() ); + java.nio.file.Files.copy( java.net.URI.create( args[0] ).toURL().openStream(), java.nio.file.Paths.get( args[1] ).toAbsolutePath().normalize() ); + } + } + END + # For Cygwin/MinGW, switch paths to Windows format before running javac and java + verbose " - Compiling Downloader.java ..." + "$(native_path "$JAVACCMD")" "$(native_path "$javaSource")" || die "Failed to compile Downloader.java" + verbose " - Running Downloader.java ..." + "$(native_path "$JAVACMD")" -cp "$(native_path "$TMP_DOWNLOAD_DIR")" Downloader "$distributionUrl" "$(native_path "$targetZip")" +fi + +# If specified, validate the SHA-256 sum of the Maven distribution zip file +if [ -n "${distributionSha256Sum-}" ]; then + distributionSha256Result=false + if [ "$MVN_CMD" = mvnd.sh ]; then + echo "Checksum validation is not supported for maven-mvnd." >&2 + echo "Please disable validation by removing 'distributionSha256Sum' from your maven-wrapper.properties." >&2 + exit 1 + elif command -v sha256sum >/dev/null; then + if echo "$distributionSha256Sum $TMP_DOWNLOAD_DIR/$distributionUrlName" | sha256sum -c >/dev/null 2>&1; then + distributionSha256Result=true + fi + elif command -v shasum >/dev/null; then + if echo "$distributionSha256Sum $TMP_DOWNLOAD_DIR/$distributionUrlName" | shasum -a 256 -c >/dev/null 2>&1; then + distributionSha256Result=true + fi + else + echo "Checksum validation was requested but neither 'sha256sum' or 'shasum' are available." >&2 + echo "Please install either command, or disable validation by removing 'distributionSha256Sum' from your maven-wrapper.properties." >&2 + exit 1 + fi + if [ $distributionSha256Result = false ]; then + echo "Error: Failed to validate Maven distribution SHA-256, your Maven distribution might be compromised." >&2 + echo "If you updated your Maven version, you need to update the specified distributionSha256Sum property." >&2 + exit 1 + fi +fi + +# unzip and move +if command -v unzip >/dev/null; then + unzip ${__MVNW_QUIET_UNZIP:+"$__MVNW_QUIET_UNZIP"} "$TMP_DOWNLOAD_DIR/$distributionUrlName" -d "$TMP_DOWNLOAD_DIR" || die "failed to unzip" +else + tar xzf${__MVNW_QUIET_TAR:+"$__MVNW_QUIET_TAR"} "$TMP_DOWNLOAD_DIR/$distributionUrlName" -C "$TMP_DOWNLOAD_DIR" || die "failed to untar" +fi +printf %s\\n "$distributionUrl" >"$TMP_DOWNLOAD_DIR/$distributionUrlNameMain/mvnw.url" +mv -- "$TMP_DOWNLOAD_DIR/$distributionUrlNameMain" "$MAVEN_HOME" || [ -d "$MAVEN_HOME" ] || die "fail to move MAVEN_HOME" + +clean || : +exec_maven "$@" diff --git a/mvnw.cmd b/mvnw.cmd new file mode 100644 index 0000000..249bdf3 --- /dev/null +++ b/mvnw.cmd @@ -0,0 +1,149 @@ +<# : batch portion +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Apache Maven Wrapper startup batch script, version 3.3.2 +@REM +@REM Optional ENV vars +@REM MVNW_REPOURL - repo url base for downloading maven distribution +@REM MVNW_USERNAME/MVNW_PASSWORD - user and password for downloading maven +@REM MVNW_VERBOSE - true: enable verbose log; others: silence the output +@REM ---------------------------------------------------------------------------- + +@IF "%__MVNW_ARG0_NAME__%"=="" (SET __MVNW_ARG0_NAME__=%~nx0) +@SET __MVNW_CMD__= +@SET __MVNW_ERROR__= +@SET __MVNW_PSMODULEP_SAVE=%PSModulePath% +@SET PSModulePath= +@FOR /F "usebackq tokens=1* delims==" %%A IN (`powershell -noprofile "& {$scriptDir='%~dp0'; $script='%__MVNW_ARG0_NAME__%'; icm -ScriptBlock ([Scriptblock]::Create((Get-Content -Raw '%~f0'))) -NoNewScope}"`) DO @( + IF "%%A"=="MVN_CMD" (set __MVNW_CMD__=%%B) ELSE IF "%%B"=="" (echo %%A) ELSE (echo %%A=%%B) +) +@SET PSModulePath=%__MVNW_PSMODULEP_SAVE% +@SET __MVNW_PSMODULEP_SAVE= +@SET __MVNW_ARG0_NAME__= +@SET MVNW_USERNAME= +@SET MVNW_PASSWORD= +@IF NOT "%__MVNW_CMD__%"=="" (%__MVNW_CMD__% %*) +@echo Cannot start maven from wrapper >&2 && exit /b 1 +@GOTO :EOF +: end batch / begin powershell #> + +$ErrorActionPreference = "Stop" +if ($env:MVNW_VERBOSE -eq "true") { + $VerbosePreference = "Continue" +} + +# calculate distributionUrl, requires .mvn/wrapper/maven-wrapper.properties +$distributionUrl = (Get-Content -Raw "$scriptDir/.mvn/wrapper/maven-wrapper.properties" | ConvertFrom-StringData).distributionUrl +if (!$distributionUrl) { + Write-Error "cannot read distributionUrl property in $scriptDir/.mvn/wrapper/maven-wrapper.properties" +} + +switch -wildcard -casesensitive ( $($distributionUrl -replace '^.*/','') ) { + "maven-mvnd-*" { + $USE_MVND = $true + $distributionUrl = $distributionUrl -replace '-bin\.[^.]*$',"-windows-amd64.zip" + $MVN_CMD = "mvnd.cmd" + break + } + default { + $USE_MVND = $false + $MVN_CMD = $script -replace '^mvnw','mvn' + break + } +} + +# apply MVNW_REPOURL and calculate MAVEN_HOME +# maven home pattern: ~/.m2/wrapper/dists/{apache-maven-,maven-mvnd--}/ +if ($env:MVNW_REPOURL) { + $MVNW_REPO_PATTERN = if ($USE_MVND) { "/org/apache/maven/" } else { "/maven/mvnd/" } + $distributionUrl = "$env:MVNW_REPOURL$MVNW_REPO_PATTERN$($distributionUrl -replace '^.*'+$MVNW_REPO_PATTERN,'')" +} +$distributionUrlName = $distributionUrl -replace '^.*/','' +$distributionUrlNameMain = $distributionUrlName -replace '\.[^.]*$','' -replace '-bin$','' +$MAVEN_HOME_PARENT = "$HOME/.m2/wrapper/dists/$distributionUrlNameMain" +if ($env:MAVEN_USER_HOME) { + $MAVEN_HOME_PARENT = "$env:MAVEN_USER_HOME/wrapper/dists/$distributionUrlNameMain" +} +$MAVEN_HOME_NAME = ([System.Security.Cryptography.MD5]::Create().ComputeHash([byte[]][char[]]$distributionUrl) | ForEach-Object {$_.ToString("x2")}) -join '' +$MAVEN_HOME = "$MAVEN_HOME_PARENT/$MAVEN_HOME_NAME" + +if (Test-Path -Path "$MAVEN_HOME" -PathType Container) { + Write-Verbose "found existing MAVEN_HOME at $MAVEN_HOME" + Write-Output "MVN_CMD=$MAVEN_HOME/bin/$MVN_CMD" + exit $? +} + +if (! $distributionUrlNameMain -or ($distributionUrlName -eq $distributionUrlNameMain)) { + Write-Error "distributionUrl is not valid, must end with *-bin.zip, but found $distributionUrl" +} + +# prepare tmp dir +$TMP_DOWNLOAD_DIR_HOLDER = New-TemporaryFile +$TMP_DOWNLOAD_DIR = New-Item -Itemtype Directory -Path "$TMP_DOWNLOAD_DIR_HOLDER.dir" +$TMP_DOWNLOAD_DIR_HOLDER.Delete() | Out-Null +trap { + if ($TMP_DOWNLOAD_DIR.Exists) { + try { Remove-Item $TMP_DOWNLOAD_DIR -Recurse -Force | Out-Null } + catch { Write-Warning "Cannot remove $TMP_DOWNLOAD_DIR" } + } +} + +New-Item -Itemtype Directory -Path "$MAVEN_HOME_PARENT" -Force | Out-Null + +# Download and Install Apache Maven +Write-Verbose "Couldn't find MAVEN_HOME, downloading and installing it ..." +Write-Verbose "Downloading from: $distributionUrl" +Write-Verbose "Downloading to: $TMP_DOWNLOAD_DIR/$distributionUrlName" + +$webclient = New-Object System.Net.WebClient +if ($env:MVNW_USERNAME -and $env:MVNW_PASSWORD) { + $webclient.Credentials = New-Object System.Net.NetworkCredential($env:MVNW_USERNAME, $env:MVNW_PASSWORD) +} +[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12 +$webclient.DownloadFile($distributionUrl, "$TMP_DOWNLOAD_DIR/$distributionUrlName") | Out-Null + +# If specified, validate the SHA-256 sum of the Maven distribution zip file +$distributionSha256Sum = (Get-Content -Raw "$scriptDir/.mvn/wrapper/maven-wrapper.properties" | ConvertFrom-StringData).distributionSha256Sum +if ($distributionSha256Sum) { + if ($USE_MVND) { + Write-Error "Checksum validation is not supported for maven-mvnd. `nPlease disable validation by removing 'distributionSha256Sum' from your maven-wrapper.properties." + } + Import-Module $PSHOME\Modules\Microsoft.PowerShell.Utility -Function Get-FileHash + if ((Get-FileHash "$TMP_DOWNLOAD_DIR/$distributionUrlName" -Algorithm SHA256).Hash.ToLower() -ne $distributionSha256Sum) { + Write-Error "Error: Failed to validate Maven distribution SHA-256, your Maven distribution might be compromised. If you updated your Maven version, you need to update the specified distributionSha256Sum property." + } +} + +# unzip and move +Expand-Archive "$TMP_DOWNLOAD_DIR/$distributionUrlName" -DestinationPath "$TMP_DOWNLOAD_DIR" | Out-Null +Rename-Item -Path "$TMP_DOWNLOAD_DIR/$distributionUrlNameMain" -NewName $MAVEN_HOME_NAME | Out-Null +try { + Move-Item -Path "$TMP_DOWNLOAD_DIR/$MAVEN_HOME_NAME" -Destination $MAVEN_HOME_PARENT | Out-Null +} catch { + if (! (Test-Path -Path "$MAVEN_HOME" -PathType Container)) { + Write-Error "fail to move MAVEN_HOME" + } +} finally { + try { Remove-Item $TMP_DOWNLOAD_DIR -Recurse -Force | Out-Null } + catch { Write-Warning "Cannot remove $TMP_DOWNLOAD_DIR" } +} + +Write-Output "MVN_CMD=$MAVEN_HOME/bin/$MVN_CMD" From 3983b8f364bf05eaecbd2593bf20636b71fc131f Mon Sep 17 00:00:00 2001 From: r-vanooyen Date: Fri, 9 Aug 2024 08:26:32 +0200 Subject: [PATCH 5/9] chore: use maven wrapper also in all other actions --- .github/workflows/build_and_test.yml | 2 +- .github/workflows/build_deploy.yml | 2 +- .github/workflows/release.yml | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index fa2df46..34e4dff 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -18,4 +18,4 @@ jobs: distribution: 'temurin' cache: maven - name: Build and Test with Maven - run: mvn -B package --file pom.xml + run: ./mvnw -B package --file pom.xml diff --git a/.github/workflows/build_deploy.yml b/.github/workflows/build_deploy.yml index d9928de..9a10e91 100644 --- a/.github/workflows/build_deploy.yml +++ b/.github/workflows/build_deploy.yml @@ -24,7 +24,7 @@ jobs: server-password: OSSRH_PASSWORD - name: Build and Publish to OSSRH snapshot repo - run: mvn clean deploy + run: ./mvnw clean deploy env: OSSRH_USER: ${{ secrets.ORG_OSSRH_USERNAME }} OSSRH_PASSWORD: ${{ secrets.ORG_OSSRH_PASSWORD }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 3fc6174..e780df8 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -34,7 +34,7 @@ jobs: - name: Stage to Nexus and Release to Maven central run: | - mvn -B release:clean release:prepare -P release release:perform + ./mvnw -B release:clean release:prepare -P release release:perform env: OSSRH_USER: ${{ secrets.ORG_OSSRH_USERNAME }} OSSRH_PASSWORD: ${{ secrets.ORG_OSSRH_PASSWORD }} @@ -44,7 +44,7 @@ jobs: - if: cancelled() || failure() run: | - mvn -B release:rollback + ./mvnw -B release:rollback env: GH_TOKEN: ${{ secrets.BOT_GITHUB_TOKEN }} GITHUB_TOKEN: ${{ secrets.BOT_GITHUB_TOKEN }} From 7c04df89006a310878d5c505bbad6febd0e58800 Mon Sep 17 00:00:00 2001 From: r-vanooyen Date: Fri, 9 Aug 2024 10:05:22 +0200 Subject: [PATCH 6/9] chore: update dependencies --- pom.xml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index 533a164..5a6b65e 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,7 @@ org.eclipse.uprotocol up-java - 0.1.11 + 2.0.0 com.hivemq @@ -90,35 +90,35 @@ org.junit junit-bom - 5.10.2 + 5.10.3 pom import org.mockito mockito-bom - 5.11.0 + 5.12.0 import pom org.assertj assertj-bom - 3.25.3 + 3.26.3 pom import io.cucumber cucumber-bom - 7.18.0 + 7.18.1 pom import org.testcontainers testcontainers-bom - 1.19.8 + 1.20.1 pom import From 4755e287ae4826409822a74959bddeed6f2d0003 Mon Sep 17 00:00:00 2001 From: r-vanooyen Date: Fri, 9 Aug 2024 13:59:13 +0200 Subject: [PATCH 7/9] feat: adjustments to mqtt-topics specified in up-spec 1.6.0-alpha.2 --- docs/use/How-to-use-in-my-service.adoc | 4 +- pom.xml | 2 +- .../uprotocol/mqtt/HiveMqMQTT5Client.java | 50 +++++++++++-------- .../uprotocol/mqtt/HiveMqIntegratedTest.java | 10 ++-- 4 files changed, 38 insertions(+), 28 deletions(-) diff --git a/docs/use/How-to-use-in-my-service.adoc b/docs/use/How-to-use-in-my-service.adoc index b487505..72a1184 100644 --- a/docs/use/How-to-use-in-my-service.adoc +++ b/docs/use/How-to-use-in-my-service.adoc @@ -9,8 +9,8 @@ | library version | spec version | supported mqtt libraries -| `0.1.0` | `1.5.8` | -- `com.hivemq:hivemq-mqtt-client:1.3.3` +| `0.1.x` | `1.5.8` | `com.hivemq:hivemq-mqtt-client:1.3.3` +| `0.2.x-alpha` | `2.0.0-alpha.2` | `com.hivemq:hivemq-mqtt-client:1.3.3` |=== diff --git a/pom.xml b/pom.xml index 5a6b65e..1957606 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ up-client-mqtt5-java MQTT5 Transport Library for uProtocol with java MQTT5 Transport Library for uProtocol with java - 0.1.0-SNAPSHOT + 0.2.0-alpha-SNAPSHOT jar https://github.com/eclipse-uprotocol/up-client-mqtt5-java/ diff --git a/src/main/java/org/eclipse/uprotocol/mqtt/HiveMqMQTT5Client.java b/src/main/java/org/eclipse/uprotocol/mqtt/HiveMqMQTT5Client.java index 9333dd1..6bc4d7a 100644 --- a/src/main/java/org/eclipse/uprotocol/mqtt/HiveMqMQTT5Client.java +++ b/src/main/java/org/eclipse/uprotocol/mqtt/HiveMqMQTT5Client.java @@ -14,7 +14,9 @@ import com.google.protobuf.ByteString; import com.hivemq.client.mqtt.datatypes.MqttTopic; +import com.hivemq.client.mqtt.datatypes.MqttTopicBuilder; import com.hivemq.client.mqtt.datatypes.MqttTopicFilter; +import com.hivemq.client.mqtt.datatypes.MqttTopicFilterBuilder; import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; @@ -25,6 +27,7 @@ import org.eclipse.uprotocol.transport.UListener; import org.eclipse.uprotocol.transport.UTransport; import org.eclipse.uprotocol.transport.validate.UAttributesValidator; +import org.eclipse.uprotocol.uri.factory.UriFactory; import org.eclipse.uprotocol.uri.serializer.UriSerializer; import org.eclipse.uprotocol.uuid.serializer.UuidSerializer; import org.eclipse.uprotocol.v1.*; @@ -104,7 +107,7 @@ public CompletionStage send(UMessage uMessage) { @SuppressWarnings("ResultOfMethodCallIgnored") private Mqtt5PublishBuilder.Send.@NotNull Complete> buildMqttSendHandle(UMessage uMessage, Mqtt5UserProperties userProperties) { Mqtt5PublishBuilder.Send.Complete> sendHandle = client.publishWith() - .topic(getTopicForSending(uMessage.getAttributes().getSource(), uMessage.getAttributes().getSink())) + .topic(getTopicForSending(uMessage.getAttributes().getSource(), uMessage.getAttributes().hasSink() ? uMessage.getAttributes().getSink() : null)) .payload(uMessage.getPayload().toByteArray()) .userProperties(userProperties); @@ -224,40 +227,47 @@ public void close() { } private @NotNull MqttTopic getTopicForSending(@NotNull UUri source, @Nullable UUri sink) { - return MqttTopic.builder() + MqttTopicBuilder.Complete build = MqttTopic.builder() .addLevel(String.valueOf(determinateClientIdentifierFromSource(source))) .addLevel(source.getAuthorityName()) .addLevel(String.format("%04x", source.getUeId())) .addLevel(String.format("%02x", source.getUeVersionMajor())) - .addLevel(String.format("%04x", source.getResourceId())) - - .addLevel(Optional.ofNullable(sink).map(UUri::getAuthorityName).orElse("")) - .addLevel(Optional.ofNullable(sink).map(UUri::getUeId).map(ueId -> String.format("%04x", ueId)).orElse("")) - .addLevel(Optional.ofNullable(sink).map(UUri::getUeVersionMajor).map(majorVersion -> String.format("%02x", majorVersion)).orElse("")) - .addLevel(Optional.ofNullable(sink).map(UUri::getResourceId).map(resourceId -> String.format("%04x", resourceId)).orElse("")) - + .addLevel(String.format("%04x", source.getResourceId())); + if (sink != null) { + build = build + .addLevel(Optional.of(sink).map(UUri::getAuthorityName).orElse("")) + .addLevel(Optional.of(sink).map(UUri::getUeId).map(ueId -> String.format("%04x", ueId)).orElse("")) + .addLevel(Optional.of(sink).map(UUri::getUeVersionMajor).map(majorVersion -> String.format("%02x", majorVersion)).orElse("")) + .addLevel(Optional.of(sink).map(UUri::getResourceId).map(resourceId -> String.format("%04x", resourceId)).orElse("")); + } else { + LOG.trace("no sink defined, therefor sink topics ar e removed (req. since up-spec 1.6.0)"); + } + return build .build(); } private @NotNull MqttTopicFilter getTopicFilterForReceiving(@Nullable UUri source, @Nullable UUri sink) { String singleLevelWildcardAsString = String.valueOf(MqttTopicFilter.SINGLE_LEVEL_WILDCARD); - return MqttTopicFilter.builder() + MqttTopicFilterBuilder.Complete builder = MqttTopicFilter.builder() .addLevel(String.valueOf(determinateClientIdentifierFromSource(source))) //if source is null or predefined wildcard -> choose singleLevelWildcardAsString, otherwise choose value (the code is the other way around) .addLevel(Optional.ofNullable(source).filter(_source -> !"*".equals(_source.getAuthorityName())).map(UUri::getAuthorityName).orElse(singleLevelWildcardAsString)) .addLevel(Optional.ofNullable(source).filter(_source -> _source.getUeId() != 0xffff).map(existingSource -> String.format("%04x", existingSource.getUeId())).orElse(singleLevelWildcardAsString)) .addLevel(Optional.ofNullable(source).filter(_source -> _source.getUeVersionMajor() != 0xff).map(UUri::getUeVersionMajor).map(Object::toString).orElse(singleLevelWildcardAsString)) - .addLevel(Optional.ofNullable(source).filter(_source -> _source.getResourceId() != 0xffff).map(UUri::getResourceId).map(i -> String.format("%04x", i)).orElse(singleLevelWildcardAsString)) - - //if sink is null or predefined wildcard -> choose singleLevelWildcardAsString, otherwise choose value (the code is the other way around) - .addLevel(Optional.ofNullable(sink).filter(_sink -> !"*".equals(_sink.getAuthorityName())).map(UUri::getAuthorityName).orElse(singleLevelWildcardAsString)) - .addLevel(Optional.ofNullable(sink).filter(_sink -> _sink.getUeId() != 0xffff).map(existingSource -> String.format("%04x", existingSource.getUeId())).orElse(singleLevelWildcardAsString)) - .addLevel(Optional.ofNullable(sink).filter(_sink -> _sink.getUeVersionMajor() != 0xff).map(UUri::getUeVersionMajor).map(Object::toString).orElse(singleLevelWildcardAsString)) - .addLevel(Optional.ofNullable(sink).filter(_sink -> _sink.getResourceId() != 0xffff).map(UUri::getResourceId).map(i -> String.format("%04x", i)).orElse(singleLevelWildcardAsString)) - - .build(); + .addLevel(Optional.ofNullable(source).filter(_source -> _source.getResourceId() != 0xffff).map(UUri::getResourceId).map(i -> String.format("%04x", i)).orElse(singleLevelWildcardAsString)); + + if (sink == null || UriFactory.ANY.equals(sink)) { + builder.addLevel(String.valueOf(MqttTopicFilter.MULTI_LEVEL_WILDCARD)); + } else { + builder = builder + .addLevel(Optional.of(sink).filter(_sink -> !"*".equals(_sink.getAuthorityName())).map(UUri::getAuthorityName).orElse(singleLevelWildcardAsString)) + .addLevel(Optional.of(sink).filter(_sink -> _sink.getUeId() != 0xffff).map(existingSource -> String.format("%04x", existingSource.getUeId())).orElse(singleLevelWildcardAsString)) + .addLevel(Optional.of(sink).filter(_sink -> _sink.getUeVersionMajor() != 0xff).map(UUri::getUeVersionMajor).map(Object::toString).orElse(singleLevelWildcardAsString)) + .addLevel(Optional.of(sink).filter(_sink -> _sink.getResourceId() != 0xffff).map(UUri::getResourceId).map(i -> String.format("%04x", i)).orElse(singleLevelWildcardAsString)); + } + return builder.build(); } private char determinateClientIdentifierFromSource(UUri source) { @@ -271,7 +281,7 @@ private char determinateClientIdentifierFromSource(UUri source) { } private UAttributes extractUAttributesFromReceivedMQTTMessage(@NotNull Mqtt5Publish mqtt5Publish) { - if (mqtt5Publish.getTopic().getLevels().size() != 9) + if (mqtt5Publish.getTopic().getLevels().size() != 9 && mqtt5Publish.getTopic().getLevels().size() != 5) throw new IllegalArgumentException("Topic did not match uProtocol pattern for mqtt messages of this spec"); Map userProperties = convertUserPropertiesToMap(mqtt5Publish.getUserProperties()); diff --git a/src/test/java/org/eclipse/uprotocol/mqtt/HiveMqIntegratedTest.java b/src/test/java/org/eclipse/uprotocol/mqtt/HiveMqIntegratedTest.java index 51cd2d4..51df646 100644 --- a/src/test/java/org/eclipse/uprotocol/mqtt/HiveMqIntegratedTest.java +++ b/src/test/java/org/eclipse/uprotocol/mqtt/HiveMqIntegratedTest.java @@ -23,6 +23,7 @@ import org.eclipse.uprotocol.transport.UListener; import org.eclipse.uprotocol.transport.UTransport; import org.eclipse.uprotocol.transport.builder.UMessageBuilder; +import org.eclipse.uprotocol.uri.factory.UriFactory; import org.eclipse.uprotocol.uri.serializer.UriSerializer; import org.eclipse.uprotocol.v1.*; import org.junit.jupiter.api.BeforeEach; @@ -39,6 +40,7 @@ import org.testcontainers.utility.DockerImageName; import java.nio.charset.Charset; +import java.util.Calendar; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; @@ -113,10 +115,8 @@ void givenValidClientAndSmallestMessage_whenInvokeSend_shouldSendCorrectMessageT } @Test - @Disabled("Broadcast topic is not defined") void givenValidClientAndBroadcastMessage_whenInvokeSend_shouldSendCorrectMessageToMqtt() throws InterruptedException { - UMessage message = UMessageBuilder.publish( - UUri.newBuilder().setAuthorityName("testSource.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(0).build()) + UMessage message = UMessageBuilder.publish(UUri.newBuilder().setAuthorityName("testSource.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(0x8001).build()) .build(new UPayload(ByteString.copyFrom("Hello World", Charset.defaultCharset()), UPayloadFormat.UPAYLOAD_FORMAT_TEXT)); @@ -124,6 +124,7 @@ void givenValidClientAndBroadcastMessage_whenInvokeSend_shouldSendCorrectMessage assertThat(response.getCode()).isEqualTo(UCode.OK); Mqtt5Publish receive = handleToReceiveMqttMessages.receive(1, TimeUnit.SECONDS).get(); assertThat(new String(receive.getPayloadAsBytes())).isEqualTo("Hello World"); + assertThat(receive.getTopic().toString()).isEqualTo("d/testSource.someUri.network/0002/01/8001"); } @Test @@ -169,13 +170,12 @@ void givenBlancoListener_whenAddingListenerAndReceivingAnyMessage_shouldCallList } @Test - @Disabled("Broadcast topic is not defined") void givenBlancoListener_whenAddingListenerAndReceivingBroadcastMessages_shouldCallListener() { UListener listener = mock(UListener.class); UStatus status = serviceUnderTest.registerListener(null, listener).toCompletableFuture().join(); - mqttClientForTests.publishWith().topic("a/some-source/c/d/e////").payload("Hello World".getBytes(Charset.defaultCharset())).send(); + mqttClientForTests.publishWith().topic("a/some-source/c/d/e").payload("Hello World".getBytes(Charset.defaultCharset())).send(); assertThat(status.getCode()).isEqualTo(UCode.OK); From f25b58aae5b5af89a661e68ef36f755fccd8fb08 Mon Sep 17 00:00:00 2001 From: r-vanooyen Date: Thu, 15 Aug 2024 12:09:28 +0200 Subject: [PATCH 8/9] chore: pr feedback --- .../uprotocol/mqtt/HiveMqMQTT5Client.java | 79 ++++++++----------- .../uprotocol/mqtt/TransportFactory.java | 59 ++++++++++++++ .../uprotocol/mqtt/HiveMqIntegratedTest.java | 8 +- .../uprotocol/mqtt/TransportFactoryTest.java | 25 +++++- 4 files changed, 119 insertions(+), 52 deletions(-) diff --git a/src/main/java/org/eclipse/uprotocol/mqtt/HiveMqMQTT5Client.java b/src/main/java/org/eclipse/uprotocol/mqtt/HiveMqMQTT5Client.java index 6bc4d7a..4a4dd40 100644 --- a/src/main/java/org/eclipse/uprotocol/mqtt/HiveMqMQTT5Client.java +++ b/src/main/java/org/eclipse/uprotocol/mqtt/HiveMqMQTT5Client.java @@ -49,18 +49,6 @@ class HiveMqMQTT5Client implements UTransport { public static final Logger LOG = LoggerFactory.getLogger(HiveMqMQTT5Client.class); - public static final String USER_PROPERTIES_KEY_FOR_ID = "1"; - public static final String USER_PROPERTIES_KEY_FOR_MESSAGE_TYPE = "2"; - public static final String USER_PROPERTIES_KEY_FOR_SOURCE_NAME = "3"; - public static final String USER_PROPERTIES_KEY_FOR_SINK_NAME = "4"; - public static final String USER_PROPERTIES_KEY_FOR_PRIORITY = "5"; - public static final String USER_PROPERTIES_KEY_FOR_TTL = "6"; - public static final String USER_PROPERTIES_KEY_FOR_PERMISSION_LEVEL = "7"; - public static final String USER_PROPERTIES_KEY_FOR_COMMSTATUS = "8"; - public static final String USER_PROPERTIES_KEY_FOR_REQID = "9"; - public static final String USER_PROPERTIES_KEY_FOR_TOKEN = "10"; - public static final String USER_PROPERTIES_KEY_FOR_TRACEPARENT = "11"; - public static final String USER_PROPERTIES_KEY_FOR_PAYLOAD_FORMAT = "12"; private final Mqtt5AsyncClient client; private final UUri source; @@ -122,40 +110,40 @@ public CompletionStage send(UMessage uMessage) { Mqtt5UserPropertiesBuilder builder = Mqtt5UserProperties.builder(); builder.add("0", "1"); if (attributes.hasId()) - builder.add(USER_PROPERTIES_KEY_FOR_ID, UuidSerializer.serialize(attributes.getId())); + builder.add(String.valueOf(UAttributes.ID_FIELD_NUMBER), UuidSerializer.serialize(attributes.getId())); if(attributes.getType() != UMessageType.UMESSAGE_TYPE_UNSPECIFIED) - builder.add(USER_PROPERTIES_KEY_FOR_MESSAGE_TYPE, Integer.toString(attributes.getTypeValue())); + builder.add(String.valueOf(UAttributes.TYPE_FIELD_NUMBER), Integer.toString(attributes.getTypeValue())); if(attributes.hasSource()) - builder.add(USER_PROPERTIES_KEY_FOR_SOURCE_NAME, UriSerializer.serialize(attributes.getSource())); + builder.add(String.valueOf(UAttributes.SOURCE_FIELD_NUMBER), UriSerializer.serialize(attributes.getSource())); if(attributes.hasSink()) - builder.add(USER_PROPERTIES_KEY_FOR_SINK_NAME, UriSerializer.serialize(attributes.getSink())); + builder.add(String.valueOf(UAttributes.SINK_FIELD_NUMBER), UriSerializer.serialize(attributes.getSink())); if(attributes.getPriority() != UPriority.UPRIORITY_UNSPECIFIED) - builder.add(USER_PROPERTIES_KEY_FOR_PRIORITY, Integer.toString(attributes.getPriorityValue())); + builder.add(String.valueOf(UAttributes.PRIORITY_FIELD_NUMBER), Integer.toString(attributes.getPriorityValue())); if (attributes.hasTtl()) - builder.add(USER_PROPERTIES_KEY_FOR_TTL, Integer.toString(attributes.getTtl())); + builder.add(String.valueOf(UAttributes.TTL_FIELD_NUMBER), Integer.toString(attributes.getTtl())); if(attributes.hasPermissionLevel()) - builder.add(USER_PROPERTIES_KEY_FOR_PERMISSION_LEVEL, Integer.toString(attributes.getPermissionLevel())); + builder.add(String.valueOf(UAttributes.PERMISSION_LEVEL_FIELD_NUMBER), Integer.toString(attributes.getPermissionLevel())); if(attributes.hasCommstatus()) - builder.add(USER_PROPERTIES_KEY_FOR_COMMSTATUS, Integer.toString(attributes.getCommstatusValue())); + builder.add(String.valueOf(UAttributes.COMMSTATUS_FIELD_NUMBER), Integer.toString(attributes.getCommstatusValue())); if(attributes.hasReqid()) - builder.add(USER_PROPERTIES_KEY_FOR_REQID, UuidSerializer.serialize(attributes.getReqid())); + builder.add(String.valueOf(UAttributes.REQID_FIELD_NUMBER), UuidSerializer.serialize(attributes.getReqid())); if(attributes.hasToken()) - builder.add(USER_PROPERTIES_KEY_FOR_TOKEN, attributes.getToken()); + builder.add(String.valueOf(UAttributes.TOKEN_FIELD_NUMBER), attributes.getToken()); if(attributes.hasTraceparent()) - builder.add(USER_PROPERTIES_KEY_FOR_TRACEPARENT, attributes.getTraceparent()); + builder.add(String.valueOf(UAttributes.TRACEPARENT_FIELD_NUMBER), attributes.getTraceparent()); if(attributes.getPayloadFormat() != UPayloadFormat.UPAYLOAD_FORMAT_UNSPECIFIED) - builder.add(USER_PROPERTIES_KEY_FOR_PAYLOAD_FORMAT, Integer.toString(attributes.getPayloadFormatValue())); + builder.add(String.valueOf(UAttributes.PAYLOAD_FORMAT_FIELD_NUMBER), Integer.toString(attributes.getPayloadFormatValue())); return builder.build(); } @@ -294,29 +282,30 @@ private UAttributes extractUAttributesFromReceivedMQTTMessage(@NotNull Mqtt5Publ } catch (NumberFormatException e) { LOG.trace("value is not a number {}", value); } - switch (key) { - case USER_PROPERTIES_KEY_FOR_ID -> builder.setId(UuidSerializer.deserialize(value)); - case USER_PROPERTIES_KEY_FOR_MESSAGE_TYPE -> - valueAsInt.map(UMessageType::forNumber).ifPresent(builder::setType); - case USER_PROPERTIES_KEY_FOR_SOURCE_NAME -> builder.setSource(UriSerializer.deserialize(value)); - case USER_PROPERTIES_KEY_FOR_SINK_NAME -> builder.setSink(UriSerializer.deserialize(value)); - case USER_PROPERTIES_KEY_FOR_PRIORITY -> - valueAsInt.map(UPriority::forNumber).ifPresent(builder::setPriority); - case USER_PROPERTIES_KEY_FOR_TTL -> - valueAsInt.ifPresent(builder::setTtl); - case USER_PROPERTIES_KEY_FOR_PERMISSION_LEVEL -> - valueAsInt.ifPresent(builder::setPermissionLevel); - case USER_PROPERTIES_KEY_FOR_COMMSTATUS -> - valueAsInt.map(UCode::forNumber).ifPresent(builder::setCommstatus); - case USER_PROPERTIES_KEY_FOR_REQID -> builder.setReqid(UuidSerializer.deserialize(value)); - case USER_PROPERTIES_KEY_FOR_TOKEN -> - builder.setToken(value); - case USER_PROPERTIES_KEY_FOR_TRACEPARENT -> - builder.setTraceparent(value); - case USER_PROPERTIES_KEY_FOR_PAYLOAD_FORMAT -> - valueAsInt.map(UPayloadFormat::forNumber).ifPresent(builder::setPayloadFormat); + int keyAsNumber; + try { + keyAsNumber = Integer.parseInt(key); + } catch (NumberFormatException e) { + LOG.error("key is not a number {}", key); + return; + } + //@formatter:off + switch (keyAsNumber) { + case UAttributes.ID_FIELD_NUMBER -> builder.setId(UuidSerializer.deserialize(value)); + case UAttributes.TYPE_FIELD_NUMBER -> valueAsInt.map(UMessageType::forNumber).ifPresent(builder::setType); + case UAttributes.SOURCE_FIELD_NUMBER -> builder.setSource(UriSerializer.deserialize(value)); + case UAttributes.SINK_FIELD_NUMBER -> builder.setSink(UriSerializer.deserialize(value)); + case UAttributes.PRIORITY_FIELD_NUMBER -> valueAsInt.map(UPriority::forNumber).ifPresent(builder::setPriority); + case UAttributes.TTL_FIELD_NUMBER -> valueAsInt.ifPresent(builder::setTtl); + case UAttributes.PERMISSION_LEVEL_FIELD_NUMBER -> valueAsInt.ifPresent(builder::setPermissionLevel); + case UAttributes.COMMSTATUS_FIELD_NUMBER -> valueAsInt.map(UCode::forNumber).ifPresent(builder::setCommstatus); + case UAttributes.REQID_FIELD_NUMBER -> builder.setReqid(UuidSerializer.deserialize(value)); + case UAttributes.TOKEN_FIELD_NUMBER -> builder.setToken(value); + case UAttributes.TRACEPARENT_FIELD_NUMBER -> builder.setTraceparent(value); + case UAttributes.PAYLOAD_FORMAT_FIELD_NUMBER -> valueAsInt.map(UPayloadFormat::forNumber).ifPresent(builder::setPayloadFormat); default -> LOG.warn("unknown user properties for key {}", key); } + //@formatter:on }); return builder.build(); diff --git a/src/main/java/org/eclipse/uprotocol/mqtt/TransportFactory.java b/src/main/java/org/eclipse/uprotocol/mqtt/TransportFactory.java index f951608..371949b 100644 --- a/src/main/java/org/eclipse/uprotocol/mqtt/TransportFactory.java +++ b/src/main/java/org/eclipse/uprotocol/mqtt/TransportFactory.java @@ -12,16 +12,75 @@ */ package org.eclipse.uprotocol.mqtt; +import com.hivemq.client.mqtt.MqttClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; import org.eclipse.uprotocol.transport.UTransport; +import org.eclipse.uprotocol.uri.serializer.UriSerializer; import org.eclipse.uprotocol.v1.UUri; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; public class TransportFactory { + private static final Logger log = LoggerFactory.getLogger(TransportFactory.class); + /** + * factory which receives a pre-build mqtt client + *

+ * suggested method if you use something like spring-boot which creates mqttClient by configuration + * @param source UUri of your uEntity + * @param client mqttClient to connect to the mqtt broker + * @return uTransport instance for mqtt + */ public static UTransport createInstance(UUri source, Mqtt5Client client) { if (source == null || client == null) throw new IllegalArgumentException("source and client must not be null"); + if (client.getConfig().getConnectionConfig().isEmpty()) { + log.warn("client seems NOT to be connected. try to connect now"); + client.toBlocking().connect(); + } + return new HiveMqMQTT5Client(source, client); } + + /** + * create uTransport client with given parameters. + * system default trust store, ciphers and protocols are used. + * @param source UUri of your uEntity + * @param serverHost hostname / domain of mqtt broker + * @param port of mqtt broker + * @return uTransport instance for mqtt + */ + public static UTransport createInstanceWithSystemSSLConfig(UUri source, String serverHost, int port) { + Mqtt5BlockingClient client = MqttClient.builder().useMqttVersion5() + + .identifier("uprotocol-mqtt-client-" + UriSerializer.serialize(source)) + .serverHost(serverHost) + .serverPort(port) + + .sslWithDefaultConfig() + + .automaticReconnect() + .initialDelay(1, TimeUnit.SECONDS) + .applyAutomaticReconnect() + + .buildBlocking(); + + client.connect(); + + return new HiveMqMQTT5Client(source, client.toAsync()); + } + + /** + * create uTransport client with given parameters + * @param source UUri of your uEntity + * @param serverHost hostname / domain of mqtt broker + * @return uTransport instance for mqtt + */ + public static UTransport createInstanceWithSystemSSLConfig(UUri source, String serverHost) { + return createInstanceWithSystemSSLConfig(source, serverHost, 1883); + } } diff --git a/src/test/java/org/eclipse/uprotocol/mqtt/HiveMqIntegratedTest.java b/src/test/java/org/eclipse/uprotocol/mqtt/HiveMqIntegratedTest.java index 51df646..78a9600 100644 --- a/src/test/java/org/eclipse/uprotocol/mqtt/HiveMqIntegratedTest.java +++ b/src/test/java/org/eclipse/uprotocol/mqtt/HiveMqIntegratedTest.java @@ -23,7 +23,6 @@ import org.eclipse.uprotocol.transport.UListener; import org.eclipse.uprotocol.transport.UTransport; import org.eclipse.uprotocol.transport.builder.UMessageBuilder; -import org.eclipse.uprotocol.uri.factory.UriFactory; import org.eclipse.uprotocol.uri.serializer.UriSerializer; import org.eclipse.uprotocol.v1.*; import org.junit.jupiter.api.BeforeEach; @@ -40,12 +39,9 @@ import org.testcontainers.utility.DockerImageName; import java.nio.charset.Charset; -import java.util.Calendar; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; -import static org.eclipse.uprotocol.mqtt.HiveMqMQTT5Client.USER_PROPERTIES_KEY_FOR_SINK_NAME; -import static org.eclipse.uprotocol.mqtt.HiveMqMQTT5Client.USER_PROPERTIES_KEY_FOR_SOURCE_NAME; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -135,8 +131,8 @@ void givenBlancoListener_whenAddingListenerAndReceivingMessages_shouldCallListen mqttClientForTests.publishWith().topic("a/some-source/c/d/e/some-sink/a/b/c") .userProperties(Mqtt5UserProperties.of( - Mqtt5UserProperty.of(USER_PROPERTIES_KEY_FOR_SOURCE_NAME, UriSerializer.serialize(UUri.newBuilder().setAuthorityName("testSource.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(0).build())), - Mqtt5UserProperty.of(USER_PROPERTIES_KEY_FOR_SINK_NAME, UriSerializer.serialize(UUri.newBuilder().setAuthorityName("testDestination.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(1).build())) + Mqtt5UserProperty.of(String.valueOf(UAttributes.SOURCE_FIELD_NUMBER), UriSerializer.serialize(UUri.newBuilder().setAuthorityName("testSource.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(0).build())), + Mqtt5UserProperty.of(String.valueOf(UAttributes.SINK_FIELD_NUMBER), UriSerializer.serialize(UUri.newBuilder().setAuthorityName("testDestination.someUri.network").setUeId(2).setUeVersionMajor(1).setResourceId(1).build())) )) .payload("Hello World".getBytes(Charset.defaultCharset())) .send(); diff --git a/src/test/java/org/eclipse/uprotocol/mqtt/TransportFactoryTest.java b/src/test/java/org/eclipse/uprotocol/mqtt/TransportFactoryTest.java index 1a5845b..323963a 100644 --- a/src/test/java/org/eclipse/uprotocol/mqtt/TransportFactoryTest.java +++ b/src/test/java/org/eclipse/uprotocol/mqtt/TransportFactoryTest.java @@ -12,14 +12,19 @@ */ package org.eclipse.uprotocol.mqtt; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; +import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientConfig; +import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientConnectionConfig; import org.eclipse.uprotocol.transport.UTransport; import org.eclipse.uprotocol.v1.UUri; import org.junit.jupiter.api.Test; +import java.util.Optional; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.*; class TransportFactoryTest { @@ -34,11 +39,29 @@ void givenNoClient_whenInvokeCreateInstance_shouldThrowAnException() { @Test void givenValidClient_whenInvokeCreateInstance_shouldReturnInstanceOfUTransport() { Mqtt5Client mockedClient = mock(Mqtt5Client.class); + Mqtt5ClientConfig mockedConfig = mock(Mqtt5ClientConfig.class); + doReturn(mockedConfig).when(mockedClient).getConfig(); + doReturn(Optional.of(mock(Mqtt5ClientConnectionConfig.class))).when(mockedConfig).getConnectionConfig(); + + var result = TransportFactory.createInstance(mock(UUri.class), mockedClient); + + assertThat(result) + .isInstanceOf(HiveMqMQTT5Client.class) + .isInstanceOf(UTransport.class); + } + + @Test + void givenNotConnectedClient_whenInvokeCreateInstance_shouldInvokeConnect() { + Mqtt5Client mockedClient = mock(Mqtt5Client.class); + doReturn(mock(Mqtt5ClientConfig.class)).when(mockedClient).getConfig(); + Mqtt5BlockingClient mockedInnerClient = mock(Mqtt5BlockingClient.class); + doReturn(mockedInnerClient).when(mockedClient).toBlocking(); var result = TransportFactory.createInstance(mock(UUri.class), mockedClient); assertThat(result) .isInstanceOf(HiveMqMQTT5Client.class) .isInstanceOf(UTransport.class); + verify(mockedInnerClient).connect(); } } \ No newline at end of file From 685fc40ba6330d6ea823378efb790b3937f126a1 Mon Sep 17 00:00:00 2001 From: r-vanooyen Date: Thu, 15 Aug 2024 12:10:08 +0200 Subject: [PATCH 9/9] chore: adjusted version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 1957606..3589617 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ up-client-mqtt5-java MQTT5 Transport Library for uProtocol with java MQTT5 Transport Library for uProtocol with java - 0.2.0-alpha-SNAPSHOT + 0.2.0-SNAPSHOT jar https://github.com/eclipse-uprotocol/up-client-mqtt5-java/