diff --git a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java index ed6a91c2..33b59fac 100644 --- a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java +++ b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java @@ -48,19 +48,19 @@ protected void printStats() throws IOException { return; } - // tps: 每秒文件打印的行数 + // tps: rows to print for each second final long tps = currentRowCount - previousRowCount.get(); previousRowCount.set(currentRowCount); writeCount.add(currentRowCount); costTime.add(1000); - // delayTime(条/ms)=接收的数量/花费的时间 + // delayTime(record/ms)= receiving-amount / time final double delayTime = writeCount.longValue() / costTime.longValue(); // String delayTimeStr = twoDecimal(delayTime); String info = String.format("Current Time: %s | TPS: %d ", UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), tps); - System.out.println(info); + System.out.printf("%s%n", info); } diff --git a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java index e9acb87e..8f91eb82 100644 --- a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java +++ b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java @@ -16,18 +16,20 @@ */ package org.apache.rocketmq.eventbridge.adapter.benchmark; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.io.LineNumberReader; import org.apache.commons.lang3.concurrent.BasicThreadFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.*; import java.util.TimerTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** - * 整条链路 + * End-to-End use case */ public class EventTPSCommon extends AbstractEventCommon { public static void main(String[] args) { @@ -35,12 +37,10 @@ public static void main(String[] args) { if (args.length > 0) { filePath = args[0]; } - EventTPSCommon tpsCommon = null; + EventTPSCommon tpsCommon; try { tpsCommon = new EventTPSCommon(filePath); tpsCommon.start(); - } catch (FileNotFoundException e) { - e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } @@ -56,7 +56,7 @@ private void init(String filePath) throws FileNotFoundException { previousRowCount = new AtomicReference<>(); previousRowCount.set(0); executorService = new ScheduledThreadPoolExecutor(1, - new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-all-%d").build()); + new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-all-%d").build()); } @Override diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java index 6718b8d6..84188307 100644 --- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java +++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java @@ -75,7 +75,7 @@ public class DatasourceConfig { private Long validationTimeoutMs; @Bean("dataSource") - public DataSource getMasterDataSource(){ + public DataSource getMasterDataSource() { HikariConfig hikariConfig = new HikariConfig(); hikariConfig.setJdbcUrl(baseUrl); hikariConfig.setDriverClassName(baseDriverClassName); @@ -102,7 +102,8 @@ public SqlSessionFactory masterSqlSessionFactory(@Qualifier("dataSource") DataSo } @Bean("sqlSessionTemplate") - public SqlSessionTemplate masterSqlSessionTemplate(@Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory){ + public SqlSessionTemplate masterSqlSessionTemplate( + @Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) { return new SqlSessionTemplate(sqlSessionFactory); } diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java index 97bfb888..adab7ec4 100644 --- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java +++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.converter.EventTargetRunnerConverter; import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.dataobject.EventTargetRunnerDO; import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.mapper.EventTargetRunnerMapper; diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java index 77c6bc92..a0ebc6ea 100644 --- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java +++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java @@ -52,19 +52,21 @@ private TargetRunnerConfig buildTargetRunnerConfig(String accountId, String name targetRunnerConfig.setName(name); List> components = Lists.newArrayList(); targetRunnerConfig.setComponents(components); - Map sourceComponent = new Gson().fromJson(new Gson().toJson(source - .getConfig()), new TypeToken>() { - }.getType()); - Map filterComponent = new Gson().fromJson(new Gson().toJson(RocketMQConverter.buildEventBridgeFilterTransform(filterPattern) - .getConfig()), new TypeToken>() { - }.getType()); - - Map transformComponent = new Gson().fromJson(new Gson().toJson(RocketMQConverter.buildEventBridgeTransform(targetTransform) - .getConfig()), new TypeToken>() { - }.getType()); - Map targetComponent = new Gson().fromJson(new Gson().toJson(target - .getConfig()), new TypeToken>() { - }.getType()); + + Map sourceComponent = new Gson().fromJson( + new Gson().toJson(source.getConfig()), + new TypeToken>() {}.getType()); + + Map filterComponent = new Gson().fromJson( + new Gson().toJson(RocketMQConverter.buildEventBridgeFilterTransform(filterPattern).getConfig()), + new TypeToken>() {}.getType()); + + Map transformComponent = new Gson().fromJson(new Gson().toJson(RocketMQConverter.buildEventBridgeTransform(targetTransform).getConfig()), + new TypeToken>() {}.getType()); + + Map targetComponent = new Gson().fromJson(new Gson().toJson(target.getConfig()), + new TypeToken>() {}.getType()); + components.add(sourceComponent); components.add(filterComponent); components.add(transformComponent); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java index af8bbbd3..ba3ed1fc 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java @@ -34,19 +34,16 @@ import org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.StartAndShutdown; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.AbstractStartAndShutdown; - import javax.annotation.PostConstruct; import java.util.concurrent.atomic.AtomicReference; /** * event bridge runtime - * - * @author artisan */ @Component public class Runtime { - private static final Logger logger = LoggerFactory.getLogger(Runtime.class); + private static final Logger LOGGER = LoggerFactory.getLogger(Runtime.class); private AtomicReference runtimerState; @@ -65,7 +62,7 @@ public class Runtime { @PostConstruct public void initAndStart() throws Exception { - logger.info("Start init runtime."); + LOGGER.info("Start init runtime."); circulatorContext.initCirculatorContext(runnerConfigObserver.getTargetRunnerConfig()); runnerConfigObserver.registerListener(circulatorContext); runnerConfigObserver.registerListener(eventSubscriber); @@ -80,11 +77,11 @@ public void initAndStart() throws Exception { RUNTIME_START_AND_SHUTDOWN.start(); java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> { - logger.info("try to shutdown server"); + LOGGER.info("try to shutdown server"); try { RUNTIME_START_AND_SHUTDOWN.shutdown(); } catch (Exception e) { - logger.error("err when shutdown runtime ", e); + LOGGER.error("err when shutdown runtime ", e); } })); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java index 48af27f3..b14b551e 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java @@ -30,12 +30,10 @@ /** * listen the event and offer to queue - * - * @author artisan */ public class EventBusListener extends ServiceThread { - private static final Logger logger = LoggerFactory.getLogger(EventBusListener.class); + private static final Logger LOGGER = LoggerFactory.getLogger(EventBusListener.class); private final CirculatorContext circulatorContext; private final EventSubscriber eventSubscriber; @@ -60,7 +58,7 @@ public void run() { } circulatorContext.offerEventRecords(pullRecordList); } catch (Exception exception) { - logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception); + LOGGER.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception); pullRecordList.forEach(pullRecord -> errorHandler.handle(pullRecord, exception)); } } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java index f24ed1b7..bc671dba 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java @@ -41,7 +41,7 @@ */ public class EventRuleTransfer extends ServiceThread { - private static final Logger logger = LoggerFactory.getLogger(EventRuleTransfer.class); + private static final Logger LOGGER = LoggerFactory.getLogger(EventRuleTransfer.class); private volatile Integer batchSize = 100; @@ -68,18 +68,18 @@ public void init() { @Override public void run() { - List afterTransformConnect = new CopyOnWriteArrayList<>();; + List afterTransformConnect = new CopyOnWriteArrayList<>(); while (!stopped) { try { Map> eventRecordMap = circulatorContext.takeEventRecords(batchSize); if (MapUtils.isEmpty(eventRecordMap)) { - logger.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis()); + LOGGER.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis()); this.waitForRunning(1000); continue; } Map> latestTransformMap = circulatorContext.getTaskTransformMap(); if (MapUtils.isEmpty(latestTransformMap)) { - logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis()); + LOGGER.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis()); this.waitForRunning(3000); continue; } @@ -92,7 +92,7 @@ public void run() { curEventRecords.forEach(pullRecord -> { CompletableFuture transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord)) .exceptionally((exception) -> { - logger.error("transfer do transform event record failed,stackTrace-", exception); + LOGGER.error("transfer do transform event record failed, stackTrace-", exception); errorHandler.handle(pullRecord, exception); return null; }) @@ -108,9 +108,9 @@ public void run() { } CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get(); circulatorContext.offerTargetTaskQueue(afterTransformConnect); - logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect)); + LOGGER.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect)); } catch (Exception exception) { - logger.error("transfer event record failed, stackTrace-", exception); + LOGGER.error("transfer event record failed, stackTrace-", exception); afterTransformConnect.forEach(transferRecord -> errorHandler.handle(transferRecord, exception)); } @@ -127,7 +127,7 @@ public void shutdown() { try { circulatorContext.releaseTaskTransform(); } catch (Exception e) { - logger.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e))); + LOGGER.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e))); } } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java index 85dae3b8..b9d175b9 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java @@ -36,12 +36,10 @@ /** * event target push to sink task - * - * @author artisan */ public class EventTargetTrigger extends ServiceThread { - private static final Logger logger = LoggerFactory.getLogger(EventTargetTrigger.class); + private static final Logger LOGGER = LoggerFactory.getLogger(EventTargetTrigger.class); private final CirculatorContext circulatorContext; private final OffsetManager offsetManager; @@ -49,7 +47,7 @@ public class EventTargetTrigger extends ServiceThread { private volatile Integer batchSize = 100; public EventTargetTrigger(CirculatorContext circulatorContext, OffsetManager offsetManager, - ErrorHandler errorHandler) { + ErrorHandler errorHandler) { this.circulatorContext = circulatorContext; this.offsetManager = offsetManager; this.errorHandler = errorHandler; @@ -60,15 +58,15 @@ public void run() { while (!stopped) { Map> targetRecordMap = circulatorContext.takeTargetRecords(batchSize); if (MapUtils.isEmpty(targetRecordMap)) { - logger.trace("current target pusher is empty"); + LOGGER.trace("current target pusher is empty"); this.waitForRunning(1000); continue; } - if (logger.isDebugEnabled()) { - logger.debug("start push content by pusher - {}", JSON.toJSONString(targetRecordMap)); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("start push content by pusher - {}", JSON.toJSONString(targetRecordMap)); } - for(String runnerName: targetRecordMap.keySet()){ + for (String runnerName : targetRecordMap.keySet()) { ExecutorService executorService = circulatorContext.getExecutorService(runnerName); executorService.execute(() -> { SinkTask sinkTask = circulatorContext.getPusherTaskMap().get(runnerName); @@ -77,7 +75,7 @@ public void run() { sinkTask.put(triggerRecords); offsetManager.commit(triggerRecords); } catch (Exception exception) { - logger.error(getServiceName() + " push target exception, stackTrace-", exception); + LOGGER.error(getServiceName() + " push target exception, stackTrace-", exception); triggerRecords.forEach(triggerRecord -> errorHandler.handle(triggerRecord, exception)); } }); @@ -101,7 +99,7 @@ public void shutdown() { circulatorContext.releaseExecutorService(); circulatorContext.releaseTriggerTask(); } catch (Exception e) { - logger.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e))); + LOGGER.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e))); } } } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java index 2763be30..3ea21fad 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java @@ -21,6 +21,12 @@ import com.google.common.collect.Maps; import io.openmessaging.connector.api.component.task.sink.SinkTask; import io.openmessaging.connector.api.data.ConnectRecord; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.collections.CollectionUtils; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.trigger.TriggerTaskContext; @@ -41,40 +47,40 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.*; /** - * event circulator context for listener, transfer and trigger + * Event circulatory context for listener, transfer and trigger */ @Component public class CirculatorContext implements TargetRunnerListener { - private final static Logger logger = LoggerFactory.getLogger(LoggerName.EventBus_Listener); + private final static Logger LOGGER = LoggerFactory.getLogger(LoggerName.EVENT_BUS_LISTENER); @Autowired private Plugin plugin; - private static Integer QUEUE_CAPACITY = 50000; + private static final Integer QUEUE_CAPACITY = 50000; - private BlockingQueue eventQueue = new LinkedBlockingQueue<>(50000); + private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(50000); - private BlockingQueue targetQueue = new LinkedBlockingQueue<>(50000); + private final BlockingQueue targetQueue = new LinkedBlockingQueue<>(50000); - private Map runnerConfigMap = new ConcurrentHashMap<>(30); + private final Map runnerConfigMap = new ConcurrentHashMap<>(30); - private Map> eventQueueMap = new ConcurrentHashMap<>(30); + private final Map> eventQueueMap = new ConcurrentHashMap<>(30); - private Map> targetQueueMap = new ConcurrentHashMap<>(30); + private final Map> targetQueueMap = new ConcurrentHashMap<>(30); - private Map> taskTransformMap = new ConcurrentHashMap<>(20); + private final Map> taskTransformMap = new ConcurrentHashMap<>(20); - private Map pusherTaskMap = new ConcurrentHashMap<>(20); + private final Map pusherTaskMap = new ConcurrentHashMap<>(20); - private Map pusherExecutorMap = new ConcurrentHashMap<>(10); + private final Map pusherExecutorMap = new ConcurrentHashMap<>(10); /** * initial targetRunnerMap, taskTransformMap, pusherTaskMap - * @param targetRunnerConfigs + * + * @param targetRunnerConfigs Configurations for the target runner */ public void initCirculatorContext(Set targetRunnerConfigs) { if (CollectionUtils.isEmpty(targetRunnerConfigs)) { @@ -102,10 +108,11 @@ public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) { /** * get target runner config by runner name + * * @param runnerName * @return */ - public TargetRunnerConfig getRunnerConfig(String runnerName){ + public TargetRunnerConfig getRunnerConfig(String runnerName) { return runnerConfigMap.get(runnerName); } @@ -122,21 +129,23 @@ public boolean offerEventRecords(List connectRecords) { /** * update record queue map + * * @param recordMap * @param eventQueueMap */ - private boolean updateRecordQueueMap(Map> recordMap, Map> eventQueueMap) { - try{ - for(String runnerName : recordMap.keySet()){ + private boolean updateRecordQueueMap(Map> recordMap, + Map> eventQueueMap) { + try { + for (String runnerName : recordMap.keySet()) { BlockingQueue recordQueue = eventQueueMap.get(runnerName); - if(CollectionUtils.isEmpty(recordQueue)){ + if (CollectionUtils.isEmpty(recordQueue)) { recordQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); } recordQueue.addAll(recordMap.get(runnerName)); eventQueueMap.put(runnerName, recordQueue); } return true; - }catch (Exception exception){ + } catch (Exception exception) { return false; } } @@ -147,7 +156,7 @@ private boolean updateRecordQueueMap(Map> recordMap, * @return */ public Map> takeEventRecords(int batchSize) { - if(eventQueue.isEmpty()){ + if (eventQueue.isEmpty()) { return null; } List eventRecords = Lists.newArrayList(); @@ -171,11 +180,12 @@ public boolean offerTargetTaskQueue(List connectRecords) { /** * take batch target records + * * @param batchSize * @return */ public Map> takeTargetRecords(Integer batchSize) { - if(targetQueue.isEmpty()){ + if (targetQueue.isEmpty()) { return null; } List targetRecords = Lists.newArrayList(); @@ -185,6 +195,7 @@ public Map> takeTargetRecords(Integer batchSize) { /** * user runner-name as key + * * @param eventRecords * @return */ @@ -193,7 +204,7 @@ private Map> buildWithRunnerNameKeyMap(List curEventRecords = eventRecordMap.get(runnerName); - if(CollectionUtils.isEmpty(curEventRecords)){ + if (CollectionUtils.isEmpty(curEventRecords)) { curEventRecords = Lists.newArrayList(); } curEventRecords.add(connectRecord); @@ -204,15 +215,17 @@ private Map> buildWithRunnerNameKeyMap(List transformChain = new TransformEngine<>(targetRunnerConfig.getComponents(), plugin); taskTransformMap.put(runnerName, transformChain); - int endIndex = targetRunnerConfig.getComponents().size() -1; + int endIndex = targetRunnerConfig.getComponents().size() - 1; TargetKeyValue targetKeyValue = new TargetKeyValue(targetRunnerConfig.getComponents().get(endIndex)); SinkTask sinkTask = initTargetSinkTask(targetKeyValue); pusherTaskMap.put(runnerName, sinkTask); @@ -234,16 +247,16 @@ private void refreshRunnerContext(TargetRunnerConfig targetRunnerConfig, Refresh pusherExecutorMap.put(runnerName, initDefaultThreadPoolExecutor(runnerName)); } - if(logger.isInfoEnabled()){ - logger.info("runnerName -{}- refresh context by refresh type -{}- succeed", runnerName, refreshTypeEnum.name()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("runnerName -{}- refresh context by refresh type -{}- succeed", runnerName, refreshTypeEnum.name()); } break; case DELETE: runnerConfigMap.remove(runnerName); taskTransformMap.remove(runnerName); pusherTaskMap.remove(runnerName); - if(logger.isInfoEnabled()){ - logger.info("runnerName -{}- remove context succeed", runnerName); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("runnerName -{}- remove context succeed", runnerName); } break; default: @@ -253,16 +266,18 @@ private void refreshRunnerContext(TargetRunnerConfig targetRunnerConfig, Refresh /** * init default thread poll param, support auto config + * * @param threadPollName * @return */ private ExecutorService initDefaultThreadPoolExecutor(String threadPollName) { return new ThreadPoolExecutor(200, 300, 1, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(300), ThreadUtils.newThreadFactory(threadPollName, false)); + new LinkedBlockingQueue<>(300), ThreadUtils.newThreadFactory(threadPollName, false)); } /** * init target sink task + * * @param targetKeyValue * @return */ @@ -286,13 +301,12 @@ private SinkTask initTargetSinkTask(TargetKeyValue targetKeyValue) { Plugin.compareAndSwapLoaders(loader); } return sinkTask; - }catch (Exception exception) { - logger.error("task class -" + taskClass + "- init its sinkTask failed, ex- ", exception); + } catch (Exception exception) { + LOGGER.error("task class -" + taskClass + "- init its sinkTask failed, ex- ", exception); } return null; } - public void releaseTaskTransform() throws Exception { for (Map.Entry> taskTransform : taskTransformMap.entrySet()) { String runnerName = taskTransform.getKey(); @@ -303,7 +317,7 @@ public void releaseTaskTransform() throws Exception { } public void releaseTriggerTask() { - for (Map.Entry triggerTask: pusherTaskMap.entrySet()) { + for (Map.Entry triggerTask : pusherTaskMap.entrySet()) { SinkTask sinkTask = triggerTask.getValue(); String runnerName = triggerTask.getKey(); sinkTask.stop(); @@ -312,7 +326,7 @@ public void releaseTriggerTask() { } public void releaseExecutorService() throws Exception { - for (Map.Entry pusherExecutor: pusherExecutorMap.entrySet()) { + for (Map.Entry pusherExecutor : pusherExecutorMap.entrySet()) { ExecutorService pusher = pusherExecutor.getValue(); ShutdownUtils.shutdownThreadPool(pusher); } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java index be4db9bb..dc18ab7f 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java @@ -29,6 +29,7 @@ public abstract class EventSubscriber implements TargetRunnerListener { /** * Refresh subscriber inner data when runner keys changed + * * @param subscribeRunnerKeys * @param refreshTypeEnum */ @@ -42,7 +43,7 @@ public abstract class EventSubscriber implements TargetRunnerListener { public abstract List pull(); /** - * Commit the connect records. + * Commit connect records. * * @param connectRecordList */ @@ -54,12 +55,13 @@ public abstract class EventSubscriber implements TargetRunnerListener { public abstract void close(); /** - * Put the connect record to the eventbus. + * Put connect record to the eventbus. + * * @param eventBusName * @param connectRecord * @param delaySec */ - public boolean put(String eventBusName, ConnectRecord connectRecord, int delaySec){ + public boolean put(String eventBusName, ConnectRecord connectRecord, int delaySec) { // convert the eventBusName to Topic ? return true; } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java index f9e879a8..5737892e 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java @@ -37,7 +37,7 @@ public class TransformEngine implements AutoCloseable { - private static final Logger logger = LoggerFactory.getLogger(LoggerName.EventRule_Transfer); + private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.EVENT_RULE_TRANSFER); private final List transformList; @@ -76,18 +76,19 @@ private void init() { transform.init(transformConfig); this.transformList.add(transform); } catch (Exception e) { - logger.error("transform new instance error", e); + LOGGER.error("transform new instance error", e); } } } /** * format listener and pusher key + * * @param components * @return */ private TargetKeyValue formatTargetKey(List> components) { - if(CollectionUtils.isEmpty(components)){ + if (CollectionUtils.isEmpty(components)) { return null; } int startIndex = 0; @@ -101,6 +102,7 @@ private TargetKeyValue formatTargetKey(List> components) { /** * transform event record for target record + * * @param connectRecord * @return */ @@ -120,10 +122,11 @@ public R doTransforms(R connectRecord) { /** * get task config value by key + * * @param configKey * @return */ - public String getConnectConfig(String configKey){ + public String getConnectConfig(String configKey) { return config.getString(configKey); } @@ -149,8 +152,10 @@ private Transform getTransform(String transformClass) throws Exception { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; TransformEngine that = (TransformEngine) o; return transformList.equals(that.transformList) && config.equals(that.config); } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java index 6af6382c..6edb06d1 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java @@ -43,7 +43,7 @@ public class TriggerTaskContext implements SinkTaskContext { */ private final TargetKeyValue taskConfig; - private static final Logger logger = LoggerFactory.getLogger(LoggerName.EventTarget_Trigger); + private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.EVENT_TARGET_TRIGGER); private final Map messageQueuesOffsetMap = new ConcurrentHashMap<>(64); @@ -61,20 +61,20 @@ public TriggerTaskContext(TargetKeyValue taskConfig) { @Override public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) { if (null == recordPartition || null == recordPartition.getPartition() || null == recordOffset || null == recordOffset.getOffset()) { - logger.warn("recordPartition {} info is null or recordOffset {} info is null", recordPartition, recordOffset); + LOGGER.warn("recordPartition {} info is null or recordOffset {} info is null", recordPartition, recordOffset); return; } String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME); String topic = (String) recordPartition.getPartition().get(TOPIC); Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID)); if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) { - logger.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic); + LOGGER.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic); return; } MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId); Long offset = Long.valueOf((String) recordOffset.getOffset().get(QUEUE_OFFSET)); if (null == offset) { - logger.warn("resetOffset, offset is null"); + LOGGER.warn("resetOffset, offset is null"); return; } messageQueuesOffsetMap.put(messageQueue, offset); @@ -83,12 +83,12 @@ public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffs @Override public void resetOffset(Map offsets) { if (MapUtils.isEmpty(offsets)) { - logger.warn("resetOffset, offsets {} is null", offsets); + LOGGER.warn("resetOffset, offsets {} is null", offsets); return; } for (Map.Entry entry : offsets.entrySet()) { if (null == entry || null == entry.getKey() || null == entry.getKey().getPartition() || null == entry.getValue() || null == entry.getValue().getOffset()) { - logger.warn("recordPartition {} info is null or recordOffset {} info is null, entry {}", entry); + LOGGER.warn("recordPartition {} info is null or recordOffset {} info is null, entry {}", entry); continue; } RecordPartition recordPartition = entry.getKey(); @@ -96,14 +96,14 @@ public void resetOffset(Map offsets) { String topic = (String) recordPartition.getPartition().get(TOPIC); Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID)); if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) { - logger.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic); + LOGGER.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic); continue; } MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId); RecordOffset recordOffset = entry.getValue(); Long offset = Long.valueOf((String) recordOffset.getOffset().get(QUEUE_OFFSET)); if (null == offset) { - logger.warn("resetOffset, offset is null"); + LOGGER.warn("resetOffset, offset is null"); continue; } messageQueuesOffsetMap.put(messageQueue, offset); @@ -113,24 +113,24 @@ public void resetOffset(Map offsets) { @Override public void pause(List recordPartitions) { if (recordPartitions == null || recordPartitions.size() == 0) { - logger.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions)); + LOGGER.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions)); return; } for (RecordPartition recordPartition : recordPartitions) { if (null == recordPartition || null == recordPartition.getPartition()) { - logger.warn("recordPartition {} info is null", recordPartition); + LOGGER.warn("recordPartition {} info is null", recordPartition); continue; } String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME); String topic = (String) recordPartition.getPartition().get(TOPIC); Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID)); if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) { - logger.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic); + LOGGER.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic); continue; } MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId); if (!messageQueuesOffsetMap.containsKey(messageQueue)) { - logger.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue); + LOGGER.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue); continue; } messageQueuesStateMap.put(messageQueue, QueueState.PAUSE); @@ -140,24 +140,24 @@ public void pause(List recordPartitions) { @Override public void resume(List recordPartitions) { if (recordPartitions == null || recordPartitions.size() == 0) { - logger.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions)); + LOGGER.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions)); return; } for (RecordPartition recordPartition : recordPartitions) { if (null == recordPartition || null == recordPartition.getPartition()) { - logger.warn("recordPartition {} info is null", recordPartition); + LOGGER.warn("recordPartition {} info is null", recordPartition); continue; } String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME); String topic = (String) recordPartition.getPartition().get(TOPIC); Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID)); if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) { - logger.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic); + LOGGER.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic); continue; } MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId); if (!messageQueuesOffsetMap.containsKey(messageQueue)) { - logger.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue); + LOGGER.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue); continue; } messageQueuesStateMap.remove(messageQueue); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java index 39734c4d..21740608 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java @@ -21,8 +21,8 @@ * Define all the logger name of the runtime. */ public class LoggerName { - public static final String EventBridge_RUNTIMER = "EventBridgeRuntimer"; - public static final String EventBus_Listener = "EventBusListener"; - public static final String EventRule_Transfer = "EventRuleTransfer"; - public static final String EventTarget_Trigger = "EventTargetTrigger"; + public static final String EVENT_BRIDGE_RUNTIMER = "EventBridgeRuntimer"; + public static final String EVENT_BUS_LISTENER = "EventBusListener"; + public static final String EVENT_RULE_TRANSFER = "EventRuleTransfer"; + public static final String EVENT_TARGET_TRIGGER = "EventTargetTrigger"; } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java index 400cf471..027925e7 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java @@ -27,7 +27,7 @@ public abstract class ServiceThread extends AbstractStartAndShutdown implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER); + private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER); private static final long JOIN_TIME = 90 * 1000; @@ -45,14 +45,14 @@ public ServiceThread() { public abstract String getServiceName(); public void start() { - logger.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread); + LOGGER.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread); if (!hasNotified.compareAndSet(false, true)) { return; } stopped = false; this.thread.setDaemon(isDaemon); this.thread.start(); - logger.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread); + LOGGER.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread); } public void shutdown() { @@ -61,7 +61,7 @@ public void shutdown() { public void shutdown(final boolean interrupt) { this.stopped = true; - logger.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt); + LOGGER.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt); if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); // notify @@ -77,10 +77,10 @@ public void shutdown(final boolean interrupt) { this.thread.join(this.getJointime()); } long eclipseTime = System.currentTimeMillis() - beginTime; - logger.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " " + LOGGER.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " " + this.getJointime()); } catch (InterruptedException e) { - logger.error("Interrupted", e); + LOGGER.error("Interrupted", e); } } @@ -94,7 +94,7 @@ public void stop() { public void stop(final boolean interrupt) { this.stopped = true; - logger.info("stop thread " + this.getServiceName() + " interrupt " + interrupt); + LOGGER.info("stop thread " + this.getServiceName() + " interrupt " + interrupt); if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); // notify @@ -107,7 +107,7 @@ public void stop(final boolean interrupt) { public void makeStop() { this.stopped = true; - logger.info("makestop thread " + this.getServiceName()); + LOGGER.info("makestop thread " + this.getServiceName()); } public void wakeup() { diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java index aa696e86..58d8e2b0 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java @@ -139,15 +139,17 @@ public void setProperties(Map properties) { this.properties = properties; } - public KeyValue putAll(Map configProps){ + public KeyValue putAll(Map configProps) { this.properties.putAll(configProps); return this; } @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; TargetKeyValue that = (TargetKeyValue) o; return Objects.equals(targetKeyId, that.targetKeyId) && Objects.equals(properties, that.properties); } @@ -160,9 +162,9 @@ public int hashCode() { @Override public String toString() { return "TargetKeyValue{" + - "targetKeyId='" + targetKeyId + '\'' + - ", properties=" + properties + - '}'; + "targetKeyId='" + targetKeyId + '\'' + + ", properties=" + properties + + '}'; } public String getTargetKeyId() { diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java index 6cbc2f70..d3ab201c 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java @@ -71,19 +71,11 @@ public String toString() { private boolean isEqualsComponents(List> source, List> target) { if (source == null || target == null) { - if (source != target) { - return false; - } else { - return true; - } + return source == target; } if (source.isEmpty() || target.isEmpty()) { - if (source.isEmpty() && target.isEmpty()) { - return true; - } else { - return false; - } + return source.isEmpty() && target.isEmpty(); } if (source.size() != target.size()) { @@ -99,10 +91,8 @@ private boolean isEqualsComponents(List> source, List classLoaderMap = new HashMap<>(); + private final Map classLoaderMap = new HashMap<>(); public Plugin() { super(new URL[0], Plugin.class.getClassLoader()); @@ -62,7 +68,7 @@ public void initPlugin() { } } - private List initPluginPath(String plugin){ + private List initPluginPath(String plugin) { List pluginPaths = new ArrayList<>(); if (StringUtils.isNotEmpty(plugin)) { String[] strArr = plugin.split(","); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java index 2c11e1ae..0cae63d4 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java @@ -16,6 +16,16 @@ */ package org.apache.rocketmq.eventbridge.adapter.runtime.common.plugin; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.TreeSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,7 +33,6 @@ import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; -import java.util.*; import java.util.regex.Pattern; public class PluginUtils { diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java index e16d4ec9..04aa07b0 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java @@ -37,7 +37,7 @@ */ public class FileBaseKeyValueStore extends MemoryBasedKeyValueStore { - private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER); + private static final Logger log = LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER); private String configFilePath; private Converter keyConverter; diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java index ea5fd118..cf0cc794 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java @@ -29,25 +29,24 @@ */ public class RuntimeConfigProps { - private final static Logger logger = LoggerFactory.getLogger(RuntimeConfigProps.class); + private final static Logger LOGGER = LoggerFactory.getLogger(RuntimeConfigProps.class); private Properties properties; - private RuntimeConfigProps(){ + private RuntimeConfigProps() { try { - properties = PropertiesLoaderUtils.loadAllProperties("runtime.properties"); + properties = PropertiesLoaderUtils.loadAllProperties("runtime.properties"); } catch (IOException exception) { - logger.error("runtime load properties failed, stackTrace-", exception); + LOGGER.error("runtime load properties failed, stackTrace-", exception); } } - private static class RuntimerConfigPropsHolder{ - private static final RuntimeConfigProps instance = new RuntimeConfigProps(); + private static class RuntimerConfigPropsHolder { + private static final RuntimeConfigProps INSTANCE = new RuntimeConfigProps(); } - public static RuntimeConfigProps build(){ - return RuntimerConfigPropsHolder.instance; + public static RuntimeConfigProps build() { + return RuntimerConfigPropsHolder.INSTANCE; } - } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java index 56fa1239..00b3c4a8 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java @@ -26,7 +26,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.DependsOn; @Configuration public class RuntimeConfiguration { diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java index dfad654e..34800ba1 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java @@ -30,7 +30,7 @@ */ public class JsonConverter implements Converter { - private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER); + private static final Logger log = LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER); private Class clazz; diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java index b704c7bb..2c310466 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java @@ -32,7 +32,7 @@ */ public class ListConverter implements Converter { - private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER); + private static final Logger log = LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER); private Class clazz; diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java index 4a6d9ef2..5b3af125 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java @@ -31,7 +31,7 @@ */ public class RecordOffsetConverter implements Converter { - private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER); + private static final Logger log = LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER); @Override public byte[] objectToByte(RecordOffset recordOffset) { diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java index b7cee558..a988981b 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java @@ -91,7 +91,7 @@ private int calcDelaySec(int retryTimes, PushRetryStrategyEnum pushRetryStrategy return -1; } int pow = (int) Math.pow(2, 3 + retryTimes); - return (pow > 512 ? 512 : pow); + return pow > 512 ? 512 : pow; default: return -1; } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java index 0615e358..589238fc 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java @@ -46,7 +46,7 @@ public Set getTargetRunnerConfig() { @Override public Set getSubscribeRunnerKeys() { - if(CollectionUtils.isEmpty(targetRunnerConfigs)){ + if (CollectionUtils.isEmpty(targetRunnerConfigs)) { return null; } return targetRunnerConfigs.stream().map(item -> { diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java index cbd01e07..85547f2b 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java @@ -38,7 +38,6 @@ import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig; import org.apache.rocketmq.eventbridge.exception.EventBridgeException; -import org.springframework.stereotype.Component; @Slf4j //@Component diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java index e6a9d1da..30bb31e2 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java @@ -25,7 +25,7 @@ public class ExceptionUtil { - private static final Logger logger = LoggerFactory.getLogger(ExceptionUtil.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ExceptionUtil.class); public static String getErrorMessage(Throwable e) { if (null == e) { @@ -40,7 +40,7 @@ public static String getErrorMessage(Throwable e) { StringBuffer buffer = stringWriter.getBuffer(); return buffer.toString(); } catch (Throwable ex) { - logger.error("", ex); + LOGGER.error("", ex); } return null; } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java index cf8679d8..2405a8fd 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java @@ -20,14 +20,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; public class ShutdownUtils { - private static final Logger logger = LoggerFactory.getLogger(ShutdownUtils.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownUtils.class); public static void shutdownThreadPool(ExecutorService executor) { if (executor != null) { @@ -35,7 +33,7 @@ public static void shutdownThreadPool(ExecutorService executor) { try { executor.awaitTermination(60, TimeUnit.SECONDS); } catch (Exception e) { - logger.error("Shutdown threadPool failed", e); + LOGGER.error("Shutdown threadPool failed", e); } if (!executor.isTerminated()) { executor.shutdownNow(); diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java index 4d32dad9..58dda61b 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java @@ -27,6 +27,13 @@ import io.openmessaging.connector.api.data.RecordPartition; import io.openmessaging.connector.api.data.Schema; import io.openmessaging.internal.DefaultKeyValue; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; @@ -57,7 +64,6 @@ import javax.annotation.PostConstruct; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -72,7 +78,7 @@ @DependsOn("flyway") public class RocketMQEventSubscriber extends EventSubscriber { - private static final Logger logger = LoggerFactory.getLogger(RocketMQEventSubscriber.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQEventSubscriber.class); @Autowired private EventDataRepository eventDataRepository; @@ -98,7 +104,7 @@ public class RocketMQEventSubscriber extends EventSubscriber { public static final String MSG_ID = "msgId"; @PostConstruct - public void initRocketMQEventSubscriber(){ + public void initRocketMQEventSubscriber() { this.initMqProperties(); this.initConsumeWorkers(); } @@ -123,18 +129,18 @@ public List pull() { ArrayList messages = new ArrayList<>(); messageBuffer.drainTo(messages, pullBatchSize); if (CollectionUtils.isEmpty(messages)) { - logger.trace("consumer poll message empty."); + LOGGER.trace("consumer poll message empty."); return null; } List connectRecords = new CopyOnWriteArrayList<>(); List> completableFutures = Lists.newArrayList(); - messages.forEach(item->{ - CompletableFuture recordCompletableFuture = CompletableFuture.supplyAsync(()-> convertToSinkRecord(item)) - .exceptionally((exception) -> { - logger.error("execute completable job failed,stackTrace-", exception); - return null; - }) - .thenAccept(connectRecords::add); + messages.forEach(item -> { + CompletableFuture recordCompletableFuture = CompletableFuture.supplyAsync(() -> convertToSinkRecord(item)) + .exceptionally((exception) -> { + LOGGER.error("execute completable job failed", exception); + return null; + }) + .thenAccept(connectRecords::add); completableFutures.add(recordCompletableFuture); }); @@ -145,24 +151,25 @@ public List pull() { /** * group by runner name batch commit + * * @param connectRecordList */ @Override public void commit(List connectRecordList) { - if(CollectionUtils.isEmpty(connectRecordList)){ - logger.warn("commit event record data empty!"); + if (CollectionUtils.isEmpty(connectRecordList)) { + LOGGER.warn("commit event record data empty!"); return; } String runnerName = connectRecordList.iterator().next().getExtension(RuntimeConfigDefine.RUNNER_NAME); List msgIds = connectRecordList.stream().map(item -> item.getPosition() - .getPartition().getPartition().get(MSG_ID).toString()).collect(Collectors.toList()); + .getPartition().getPartition().get(MSG_ID).toString()).collect(Collectors.toList()); consumeWorkerMap.get(runnerName).commit(msgIds); } @Override public void close() { for (Map.Entry item : consumeWorkerMap.entrySet()) { - ConsumeWorker consumeWorker = item.getValue(); + ConsumeWorker consumeWorker = item.getValue(); consumeWorker.shutdown(); } } @@ -187,7 +194,7 @@ private void initMqProperties() { clientConfig.setNameSrvAddr(namesrvAddr); clientConfig.setAccessChannel(AccessChannel.CLOUD.name().equals(accessChannel) ? - AccessChannel.CLOUD : AccessChannel.LOCAL); + AccessChannel.CLOUD : AccessChannel.LOCAL); clientConfig.setNamespace(namespace); this.clientConfig = clientConfig; @@ -196,7 +203,7 @@ private void initMqProperties() { } if (StringUtils.isNotBlank(socks5UserName) && StringUtils.isNotBlank(socks5Password) - && StringUtils.isNotBlank(socks5Endpoint)) { + && StringUtils.isNotBlank(socks5Endpoint)) { SocksProxyConfig proxyConfig = new SocksProxyConfig(); proxyConfig.setUsername(socks5UserName); proxyConfig.setPassword(socks5Password); @@ -206,8 +213,8 @@ private void initMqProperties() { this.socksProxy = new Gson().toJson(proxyConfigMap); } - }catch (Exception exception){ - logger.error("init rocket mq property exception, stack trace-", exception); + } catch (Exception exception) { + LOGGER.error("init rocket mq property exception, stack trace-", exception); } } @@ -215,8 +222,8 @@ private void initMqProperties() { * init rocket mq pull consumer */ private void initConsumeWorkers() { - Set subscribeRunnerKeysSet = runnerConfigObserver.getSubscribeRunnerKeys(); - if(subscribeRunnerKeysSet == null || subscribeRunnerKeysSet.isEmpty()){ + Set subscribeRunnerKeysSet = runnerConfigObserver.getSubscribeRunnerKeys(); + if (subscribeRunnerKeysSet == null || subscribeRunnerKeysSet.isEmpty()) { return; } for (SubscribeRunnerKeys subscribeRunnerKeys : subscribeRunnerKeysSet) { @@ -229,6 +236,7 @@ private void initConsumeWorkers() { /** * first init default rocketmq pull consumer + * * @return */ public LitePullConsumer initLitePullConsumer(SubscribeRunnerKeys subscribeRunnerKeys) { @@ -245,7 +253,7 @@ public LitePullConsumer initLitePullConsumer(SubscribeRunnerKeys subscribeRunner pullConsumer.attachTopic(topic, "*"); pullConsumer.startup(); } catch (Exception exception) { - logger.error("init default pull consumer exception, topic -" + topic + "-stackTrace-", exception); + LOGGER.error("init default pull consumer exception, topic -" + topic + "-stackTrace-", exception); throw new EventBridgeException(" init rocketmq consumer failed"); } return pullConsumer; @@ -265,6 +273,7 @@ private String createGroupName(SubscribeRunnerKeys subscribeRunnerKeys) { /** * MessageExt convert to connect record + * * @param messageExt * @return */ @@ -311,7 +320,7 @@ private RecordOffset convertToRecordOffset(Long offset) { private void putConsumeWorker(SubscribeRunnerKeys subscribeRunnerKeys) { ConsumeWorker consumeWorker = consumeWorkerMap.get(subscribeRunnerKeys.getRunnerName()); - if (!Objects.isNull(consumeWorker)){ + if (!Objects.isNull(consumeWorker)) { consumeWorker.shutdown(); } LitePullConsumer litePullConsumer = initLitePullConsumer(subscribeRunnerKeys); @@ -322,7 +331,7 @@ private void putConsumeWorker(SubscribeRunnerKeys subscribeRunnerKeys) { private void removeConsumeWorker(SubscribeRunnerKeys subscribeRunnerKeys) { ConsumeWorker consumeWorker = consumeWorkerMap.remove(subscribeRunnerKeys.getRunnerName()); - if (!Objects.isNull(consumeWorker)){ + if (!Objects.isNull(consumeWorker)) { consumeWorker.shutdown(); } } @@ -352,12 +361,12 @@ public void run() { messageBuffer.put(message); } } catch (Exception exception) { - logger.error(getServiceName() + " - RocketMQEventSubscriber pull record exception, stackTrace - ", exception); + LOGGER.error(getServiceName() + " - RocketMQEventSubscriber pull record exception, stackTrace - ", exception); } } } - public void commit(List messageIds){ + public void commit(List messageIds) { this.pullConsumer.commit(messageIds); } diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java index b37b851c..f42cf9b9 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java @@ -20,10 +20,6 @@ import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -/** - * @Author changfeng - * @Date 2023/4/9 10:08 上午 - */ public class ClientConfig { private int rmqPullMessageCacheCapacity = 1000; private int rmqPullMessageBatchNums = 20; @@ -59,7 +55,7 @@ public ConsumeFromWhere getConsumeFromWhere() { } public void setConsumeFromWhere( - final ConsumeFromWhere consumeFromWhere) { + final ConsumeFromWhere consumeFromWhere) { this.consumeFromWhere = consumeFromWhere; } @@ -119,7 +115,6 @@ public void setAccessChannel(AccessChannel accessChannel) { this.accessChannel = accessChannel; } - public static ClientConfig cloneConfig(ClientConfig clientConfig) { ClientConfig newConfig = new ClientConfig(); newConfig.setRmqPullMessageBatchNums(clientConfig.getRmqPullMessageBatchNums()); diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java index 9923eab1..17554902 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java @@ -21,10 +21,6 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -/** - * @Author changfeng - * @Date 2023/4/9 10:07 上午 - */ public class ConsumeRequest { private final MessageExt messageExt; private final MessageQueue messageQueue; diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java index 667f17a3..834b7c17 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java @@ -23,10 +23,6 @@ import java.time.Duration; import java.util.List; -/** - * @Author changfeng - * @Date 2023/4/9 10:09 上午 - */ public interface LitePullConsumer { void startup() throws MQClientException; diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java index a76012e7..888d36d0 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java @@ -48,12 +48,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -/** - * @Author changfeng - * @Date 2023/4/9 10:10 上午 - */ public class LitePullConsumerImpl implements LitePullConsumer { - private static final Logger log = LoggerFactory.getLogger(LitePullConsumerImpl.class); + private static final Logger LOGGER = LoggerFactory.getLogger(LitePullConsumerImpl.class); private final DefaultMQPullConsumer rocketmqPullConsumer; private final LocalMessageCache localMessageCache; private final ClientConfig clientConfig; @@ -83,7 +79,7 @@ public LitePullConsumerImpl(final ClientConfig clientConfig, final RPCHook rpcHo @Override public void startup() throws MQClientException { rocketmqPullConsumer.start(); - log.info("RocketmqPullConsumer start."); + LOGGER.info("RocketmqPullConsumer start."); } @Override @@ -98,7 +94,7 @@ private void shutdownThreadPool(ExecutorService executor) { try { executor.awaitTermination(60, TimeUnit.SECONDS); } catch (Exception e) { - log.error("Shutdown threadPool failed", e); + LOGGER.error("Shutdown threadPool failed", e); } if (!executor.isTerminated()) { executor.shutdownNow(); @@ -114,7 +110,7 @@ public void attachTopic(final String topic, final String tag) { public void messageQueueChanged(String topic, Set mqAll, Set mqDivided) { submitPullTask(topic, tag, mqDivided); localMessageCache.shrinkPullOffsetTable(mqDivided); - log.info("Load balance result of topic {} changed, mqAll {}, mqDivided {}.", topic, mqAll, mqDivided); + LOGGER.info("Load balance result of topic {} changed, mqAll {}, mqDivided {}.", topic, mqAll, mqDivided); } }); } @@ -156,7 +152,7 @@ private void submitPullTask(String topic, String tag, Set assigned } } if (CollectionUtils.isEmpty(assignedQueues)) { - log.warn("Not found any messageQueue, topic:{}", topic); + LOGGER.warn("Not found any messageQueue, topic:{}", topic); return; } @@ -167,10 +163,10 @@ private void submitPullTask(String topic, String tag, Set assigned try { PullTask pullTask = new PullTask(messageQueue, tag); pullImmediately(pullTask); - log.info("Submit pullTask:{}", messageQueue); + LOGGER.info("Submit pullTask:{}", messageQueue); } catch (Exception e) { - log.error("Failed submit pullTask:{}, {}, wait next balancing", topic, messageQueue, e); - // 添加pull失败,等待下次 rebalance + LOGGER.error("Failed submit pullTask:{}, {}, wait next balancing", topic, messageQueue, e); + // Failed to add pull task, waiting for the next round of re-balance processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl() .getProcessQueueTable().remove(messageQueue); if (processQueue != null) { @@ -215,13 +211,13 @@ public PullTask(MessageQueue messageQueue, String tag) { public void run() { try { if (!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState())) { - log.warn("RocketmqPullConsumer not running, pullTask exit."); + LOGGER.warn("RocketmqPullConsumer not running, pullTask exit."); return; } ProcessQueue processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl() .getProcessQueueTable().get(messageQueue); if (processQueue == null || processQueue.isDropped()) { - log.info("ProcessQueue {} dropped, pullTask exit", messageQueue); + LOGGER.info("ProcessQueue {} dropped, pullTask exit", messageQueue); return; } long offset = localMessageCache.nextPullOffset(messageQueue); @@ -231,7 +227,7 @@ public void run() { public void onSuccess(PullResult pullResult) { try { if (!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState())) { - log.warn("rocketmqPullConsumer not running, pullTask exit."); + LOGGER.warn("rocketmqPullConsumer not running, pullTask exit."); return; } @@ -248,11 +244,11 @@ public void onSuccess(PullResult pullResult) { pullImmediately(PullTask.this); } else { localMessageCache.removePullOffset(messageQueue); - log.info("ProcessQueue {} dropped, discard the pulled message.", messageQueue); + LOGGER.info("ProcessQueue {} dropped, discard the pulled message.", messageQueue); } break; case OFFSET_ILLEGAL: - log.warn("The pull request offset is illegal, offset is {}, message queue is {}, " + + LOGGER.warn("The pull request offset is illegal, offset is {}, message queue is {}, " + "pull result is {}, delay {} ms for next pull", offset, messageQueue, pullResult, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); localMessageCache.updatePullOffset(messageQueue, pullResult.getNextBeginOffset()); @@ -260,16 +256,16 @@ public void onSuccess(PullResult pullResult) { break; case NO_NEW_MSG: case NO_MATCHED_MSG: - log.info("No NEW_MSG or MATCHED_MSG for mq:{}, pull again.", messageQueue); + LOGGER.info("No NEW_MSG or MATCHED_MSG for mq:{}, pull again.", messageQueue); localMessageCache.updatePullOffset(messageQueue, pullResult.getNextBeginOffset()); pullImmediately(PullTask.this); break; default: - log.warn("Failed to process pullResult, mq:{} {}", messageQueue, pullResult); + LOGGER.warn("Failed to process pullResult, mq:{} {}", messageQueue, pullResult); break; } } catch (Throwable t) { - log.error("Exception occurs when process pullResult", t); + LOGGER.error("Exception occurs when process pullResult", t); pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS); } } @@ -282,13 +278,13 @@ public void onException(Throwable e) { } else { delayTimeMillis = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION; } - log.error("Exception happens when pull message process, delay {} ms for message queue {}", + LOGGER.error("Exception happens when pull message process, delay {} ms for message queue {}", delayTimeMillis, messageQueue, e); pullLater(PullTask.this, delayTimeMillis, TimeUnit.MILLISECONDS); } }); } catch (Throwable t) { - log.error("Error occurs when pull message process, delay {} ms for message queue {}", + LOGGER.error("Error occurs when pull message process, delay {} ms for message queue {}", PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, messageQueue, t); pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS); } diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java index f3e3617d..131e6fc2 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java @@ -37,10 +37,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -/** - * @Author changfeng - * @Date 2023/4/9 10:06 上午 - */ public class LocalMessageCache { private static final Logger log = LoggerFactory.getLogger(LocalMessageCache.class); private final BlockingQueue consumeRequestCache; diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java index 61000fd8..aae17c2a 100644 --- a/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java @@ -24,8 +24,9 @@ public enum PushRetryStrategyEnum { * 3 times: every 10s~20s */ BACKOFF_RETRY(1, 3), + /** - * 176 times: 1,2,4,8,16,32,64,128,256,512,512...512秒 ... 512s(176) + * 176 times: 1, 2, 4, 8, 16, 36, 64, 128, 256, 512, 512...512s(176) */ EXPONENTIAL_DECAY_RETRY(2, 176); diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java index 739ecbe6..e9e578d8 100644 --- a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java @@ -19,9 +19,8 @@ /** - * EventMeshTraceService - * SPI可扩展 - * 基于OpenTelemetry实现封装不同追踪器 + * Offers extension capability via SPI, allowing different tracing/metrics observation implementations: OpenTelemetry, + * Jaeger, Zipkin, etc. */ public interface TraceStrategy { diff --git a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java index e6f81c25..0ed31bae 100644 --- a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java +++ b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java @@ -43,27 +43,27 @@ public class ValidateFilter implements WebFilter { private List validations = new CopyOnWriteArrayList<>(); - @Value(value="${auth.validation:default}") + @Value(value = "${auth.validation:default}") private String validationName; @PostConstruct public void init() { List validationNames = Arrays.stream(validationName.split(",")).collect(Collectors.toList()); - boolean match = Arrays.stream(validationName.split(",")).allMatch(validationName-> validationName.equals("default")); + boolean match = Arrays.stream(validationName.split(",")).allMatch(validationName -> validationName.equals("default")); if (!match) { validationNames.add(0, "default"); } - validationNames.forEach(action->validations.add(ValidationServiceFactory.getInstance(action))); + validationNames.forEach(action -> validations.add(ValidationServiceFactory.getInstance(action))); } @Override public Mono filter(ServerWebExchange exchange, WebFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); return chain.filter(exchange) - .subscriberContext(ctx -> { - AtomicReference result = new AtomicReference(); - validations.forEach(validation-> result.set(validation.validate(request, ctx))); - return result.get(); - }); + .subscriberContext(ctx -> { + AtomicReference result = new AtomicReference(); + validations.forEach(validation -> result.set(validation.validate(request, ctx))); + return result.get(); + }); } }