diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java index 586bfa7ad2c..3ceb3734bcc 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java @@ -18,9 +18,7 @@ */ package org.apache.pinot.tools.streams; -import com.fasterxml.jackson.databind.JsonNode; -import javax.websocket.MessageHandler; -import org.apache.pinot.spi.utils.JsonUtils; +import java.util.function.Consumer; import static java.nio.charset.StandardCharsets.UTF_8; @@ -38,20 +36,17 @@ public MeetupRsvpJsonStream(boolean partitionByKey) } @Override - protected MessageHandler.Whole getMessageHandler() { + protected Consumer createConsumer() { return message -> { - if (_keepPublishing) { - if (_partitionByKey) { - try { - JsonNode messageJson = JsonUtils.stringToJsonNode(message); - String rsvpId = messageJson.get("rsvp_id").asText(); - _producer.produce(_topicName, rsvpId.getBytes(UTF_8), message.getBytes(UTF_8)); - } catch (Exception e) { - LOGGER.error("Caught exception while processing the message: {}", message, e); - } - } else { - _producer.produce(_topicName, message.getBytes(UTF_8)); + if (_partitionByKey) { + try { + _producer.produce(_topicName, message.getRsvpId().getBytes(UTF_8), message.getPayload().toString() + .getBytes(UTF_8)); + } catch (Exception e) { + LOGGER.error("Caught exception while processing the message: {}", message, e); } + } else { + _producer.produce(_topicName, message.getPayload().toString().getBytes(UTF_8)); } }; } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java index 5aa8abea301..d10955a22eb 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java @@ -18,20 +18,20 @@ */ package org.apache.pinot.tools.streams; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import java.net.URI; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; import java.util.Properties; -import javax.websocket.ClientEndpointConfig; -import javax.websocket.Endpoint; -import javax.websocket.EndpointConfig; -import javax.websocket.MessageHandler; -import javax.websocket.Session; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; import org.apache.pinot.spi.stream.StreamDataProducer; import org.apache.pinot.spi.stream.StreamDataProvider; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.tools.utils.KafkaStarterUtils; -import org.glassfish.tyrus.client.ClientManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,14 +40,18 @@ public class MeetupRsvpStream { protected static final Logger LOGGER = LoggerFactory.getLogger(MeetupRsvpStream.class); + private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .append(DateTimeFormatter.ISO_LOCAL_DATE) + .appendLiteral(' ') + .append(DateTimeFormatter.ISO_LOCAL_TIME) + .toFormatter(); private static final String DEFAULT_TOPIC_NAME = "meetupRSVPEvents"; protected String _topicName = DEFAULT_TOPIC_NAME; protected final boolean _partitionByKey; protected final StreamDataProducer _producer; - - protected ClientManager _client; - protected volatile boolean _keepPublishing; + private final Source _source; public MeetupRsvpStream() throws Exception { @@ -63,77 +67,87 @@ public MeetupRsvpStream(boolean partitionByKey) properties.put("serializer.class", "kafka.serializer.DefaultEncoder"); properties.put("request.required.acks", "1"); _producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties); - } - - public MeetupRsvpStream(boolean partitionByKey, StreamDataProducer producer, String topicName) { - _partitionByKey = partitionByKey; - _producer = producer; - _topicName = topicName; + _source = new Source(createConsumer()); } public void run() throws Exception { - _client = ClientManager.createClient(); - _keepPublishing = true; - - _client.connectToServer(new Endpoint() { - @Override - public void onOpen(Session session, EndpointConfig config) { - session.addMessageHandler(String.class, getMessageHandler()); - } - }, ClientEndpointConfig.Builder.create().build(), new URI("wss://stream.meetup.com/2/rsvps")); + _source.start(); } public void stopPublishing() { - _keepPublishing = false; - _client.shutdown(); _producer.close(); + _source.close(); } - protected MessageHandler.Whole getMessageHandler() { + protected Consumer createConsumer() { return message -> { try { - JsonNode messageJson = JsonUtils.stringToJsonNode(message); - ObjectNode extractedJson = JsonUtils.newObjectNode(); - - JsonNode venue = messageJson.get("venue"); - if (venue != null) { - extractedJson.set("venue_name", venue.get("venue_name")); + if (_partitionByKey) { + _producer.produce(_topicName, message.getEventId().getBytes(UTF_8), + message.getPayload().toString().getBytes(UTF_8)); + } else { + _producer.produce(_topicName, message.getPayload().toString().getBytes(UTF_8)); } + } catch (Exception e) { + LOGGER.error("Caught exception while processing the message: {}", message, e); + } + }; + } - JsonNode event = messageJson.get("event"); - String eventId = ""; - if (event != null) { - extractedJson.set("event_name", event.get("event_name")); - eventId = event.get("event_id").asText(); - extractedJson.put("event_id", eventId); - extractedJson.set("event_time", event.get("time")); - } + private static class Source implements AutoCloseable, Runnable { - JsonNode group = messageJson.get("group"); - if (group != null) { - extractedJson.set("group_city", group.get("group_city")); - extractedJson.set("group_country", group.get("group_country")); - extractedJson.set("group_id", group.get("group_id")); - extractedJson.set("group_name", group.get("group_name")); - extractedJson.set("group_lat", group.get("group_lat")); - extractedJson.set("group_lon", group.get("group_lon")); - } + private final Consumer _consumer; + + private final ExecutorService _executorService = Executors.newSingleThreadExecutor(); + private volatile Future _future; - extractedJson.set("mtime", messageJson.get("mtime")); - extractedJson.put("rsvp_count", 1); + private Source(Consumer consumer) { + _consumer = consumer; + } - if (_keepPublishing) { - if (_partitionByKey) { - _producer.produce(_topicName, eventId.getBytes(UTF_8), - extractedJson.toString().getBytes(UTF_8)); - } else { - _producer.produce(_topicName, extractedJson.toString().getBytes(UTF_8)); - } + @Override + public void close() { + if (_future != null) { + _future.cancel(true); + } + _executorService.shutdownNow(); + } + + public void start() { + _future = _executorService.submit(this); + } + + @Override + public void run() { + while (!Thread.interrupted()) { + try { + RSVP rsvp = createMessage(); + _consumer.accept(rsvp); + int delay = (int) (Math.log(ThreadLocalRandom.current().nextDouble()) / Math.log(0.999)) + 1; + Thread.sleep(delay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } - } catch (Exception e) { - LOGGER.error("Caught exception while processing the message: {}", message, e); } - }; + } + + private RSVP createMessage() { + String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + ""; + ObjectNode json = JsonUtils.newObjectNode(); + json.put("venue_name", "venue_name" + ThreadLocalRandom.current().nextInt()); + json.put("event_name", "event_name" + ThreadLocalRandom.current().nextInt()); + json.put("event_id", eventId); + json.put("event_time", DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10))); + json.put("group_city", "group_city" + ThreadLocalRandom.current().nextInt()); + json.put("group_country", "group_country" + ThreadLocalRandom.current().nextInt()); + json.put("group_id", Math.abs(ThreadLocalRandom.current().nextLong())); + json.put("group_name", "group_name" + ThreadLocalRandom.current().nextInt()); + json.put("group_lat", ThreadLocalRandom.current().nextFloat()); + json.put("group_lon", ThreadLocalRandom.current().nextFloat()); + json.put("mtime", DATE_TIME_FORMATTER.format(LocalDateTime.now())); + json.put("rsvp_count", 1); + return new RSVP(eventId, eventId, json); + } } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RSVP.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RSVP.java new file mode 100644 index 00000000000..a192c788a5d --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RSVP.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tools.streams; + +import com.fasterxml.jackson.databind.JsonNode; + + +class RSVP { + private final String _eventId; + private final String _rsvpId; + private final JsonNode _payload; + + RSVP(String eventId, String rsvpId, JsonNode payload) { + _eventId = eventId; + _rsvpId = rsvpId; + _payload = payload; + } + + public String getEventId() { + return _eventId; + } + + public String getRsvpId() { + return _rsvpId; + } + + public JsonNode getPayload() { + return _payload; + } +}