Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: code style issues to make compile pass with check-style plugin enabled #160

Merged
merged 1 commit into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,19 @@ protected void printStats() throws IOException {
return;
}

// tps: 每秒文件打印的行数
// tps: rows to print for each second
final long tps = currentRowCount - previousRowCount.get();
previousRowCount.set(currentRowCount);
writeCount.add(currentRowCount);
costTime.add(1000);
// delayTime(条/ms)=接收的数量/花费的时间
// delayTime(record/ms)= receiving-amount / time
final double delayTime = writeCount.longValue() / costTime.longValue();
// String delayTimeStr = twoDecimal(delayTime);

String info = String.format("Current Time: %s | TPS: %d ",
UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), tps);

System.out.println(info);
System.out.printf("%s%n", info);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,31 @@
*/
package org.apache.rocketmq.eventbridge.adapter.benchmark;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.TimerTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* 整条链路
* End-to-End use case
*/
public class EventTPSCommon extends AbstractEventCommon {
public static void main(String[] args) {
String filePath = System.getProperty("user.home") + "/demo.eventbridge";
if (args.length > 0) {
filePath = args[0];
}
EventTPSCommon tpsCommon = null;
EventTPSCommon tpsCommon;
try {
tpsCommon = new EventTPSCommon(filePath);
tpsCommon.start();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
Expand All @@ -56,7 +56,7 @@ private void init(String filePath) throws FileNotFoundException {
previousRowCount = new AtomicReference<>();
previousRowCount.set(0);
executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-all-%d").build());
new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-all-%d").build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class DatasourceConfig {
private Long validationTimeoutMs;

@Bean("dataSource")
public DataSource getMasterDataSource(){
public DataSource getMasterDataSource() {
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setJdbcUrl(baseUrl);
hikariConfig.setDriverClassName(baseDriverClassName);
Expand All @@ -102,7 +102,8 @@ public SqlSessionFactory masterSqlSessionFactory(@Qualifier("dataSource") DataSo
}

@Bean("sqlSessionTemplate")
public SqlSessionTemplate masterSqlSessionTemplate(@Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory){
public SqlSessionTemplate masterSqlSessionTemplate(
@Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.converter.EventTargetRunnerConverter;
import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.dataobject.EventTargetRunnerDO;
import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.mapper.EventTargetRunnerMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,21 @@ private TargetRunnerConfig buildTargetRunnerConfig(String accountId, String name
targetRunnerConfig.setName(name);
List<Map<String, String>> components = Lists.newArrayList();
targetRunnerConfig.setComponents(components);
Map<String, String> sourceComponent = new Gson().fromJson(new Gson().toJson(source
.getConfig()), new TypeToken<Map<String, String>>() {
}.getType());
Map<String, String> filterComponent = new Gson().fromJson(new Gson().toJson(RocketMQConverter.buildEventBridgeFilterTransform(filterPattern)
.getConfig()), new TypeToken<Map<String, String>>() {
}.getType());

Map<String, String> transformComponent = new Gson().fromJson(new Gson().toJson(RocketMQConverter.buildEventBridgeTransform(targetTransform)
.getConfig()), new TypeToken<Map<String, String>>() {
}.getType());
Map<String, String> targetComponent = new Gson().fromJson(new Gson().toJson(target
.getConfig()), new TypeToken<Map<String, String>>() {
}.getType());

Map<String, String> sourceComponent = new Gson().fromJson(
new Gson().toJson(source.getConfig()),
new TypeToken<Map<String, String>>() {}.getType());

Map<String, String> filterComponent = new Gson().fromJson(
new Gson().toJson(RocketMQConverter.buildEventBridgeFilterTransform(filterPattern).getConfig()),
new TypeToken<Map<String, String>>() {}.getType());

Map<String, String> transformComponent = new Gson().fromJson(new Gson().toJson(RocketMQConverter.buildEventBridgeTransform(targetTransform).getConfig()),
new TypeToken<Map<String, String>>() {}.getType());

Map<String, String> targetComponent = new Gson().fromJson(new Gson().toJson(target.getConfig()),
new TypeToken<Map<String, String>>() {}.getType());

components.add(sourceComponent);
components.add(filterComponent);
components.add(transformComponent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,16 @@
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.StartAndShutdown;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.AbstractStartAndShutdown;


import javax.annotation.PostConstruct;
import java.util.concurrent.atomic.AtomicReference;

/**
* event bridge runtime
*
* @author artisan
*/
@Component
public class Runtime {

private static final Logger logger = LoggerFactory.getLogger(Runtime.class);
private static final Logger LOGGER = LoggerFactory.getLogger(Runtime.class);

private AtomicReference<RuntimeState> runtimerState;

Expand All @@ -65,7 +62,7 @@ public class Runtime {

@PostConstruct
public void initAndStart() throws Exception {
logger.info("Start init runtime.");
LOGGER.info("Start init runtime.");
circulatorContext.initCirculatorContext(runnerConfigObserver.getTargetRunnerConfig());
runnerConfigObserver.registerListener(circulatorContext);
runnerConfigObserver.registerListener(eventSubscriber);
Expand All @@ -80,11 +77,11 @@ public void initAndStart() throws Exception {
RUNTIME_START_AND_SHUTDOWN.start();

java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("try to shutdown server");
LOGGER.info("try to shutdown server");
try {
RUNTIME_START_AND_SHUTDOWN.shutdown();
} catch (Exception e) {
logger.error("err when shutdown runtime ", e);
LOGGER.error("err when shutdown runtime ", e);
}
}));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@

/**
* listen the event and offer to queue
*
* @author artisan
*/
public class EventBusListener extends ServiceThread {

private static final Logger logger = LoggerFactory.getLogger(EventBusListener.class);
private static final Logger LOGGER = LoggerFactory.getLogger(EventBusListener.class);

private final CirculatorContext circulatorContext;
private final EventSubscriber eventSubscriber;
Expand All @@ -60,7 +58,7 @@ public void run() {
}
circulatorContext.offerEventRecords(pullRecordList);
} catch (Exception exception) {
logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception);
LOGGER.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception);
pullRecordList.forEach(pullRecord -> errorHandler.handle(pullRecord, exception));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
*/
public class EventRuleTransfer extends ServiceThread {

private static final Logger logger = LoggerFactory.getLogger(EventRuleTransfer.class);
private static final Logger LOGGER = LoggerFactory.getLogger(EventRuleTransfer.class);

private volatile Integer batchSize = 100;

Expand All @@ -68,18 +68,18 @@ public void init() {

@Override
public void run() {
List<ConnectRecord> afterTransformConnect = new CopyOnWriteArrayList<>();;
List<ConnectRecord> afterTransformConnect = new CopyOnWriteArrayList<>();
while (!stopped) {
try {
Map<String, List<ConnectRecord>> eventRecordMap = circulatorContext.takeEventRecords(batchSize);
if (MapUtils.isEmpty(eventRecordMap)) {
logger.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis());
LOGGER.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis());
this.waitForRunning(1000);
continue;
}
Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap();
if (MapUtils.isEmpty(latestTransformMap)) {
logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis());
LOGGER.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis());
this.waitForRunning(3000);
continue;
}
Expand All @@ -92,7 +92,7 @@ public void run() {
curEventRecords.forEach(pullRecord -> {
CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord))
.exceptionally((exception) -> {
logger.error("transfer do transform event record failedstackTrace-", exception);
LOGGER.error("transfer do transform event record failed, stackTrace-", exception);
errorHandler.handle(pullRecord, exception);
return null;
})
Expand All @@ -108,9 +108,9 @@ public void run() {
}
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get();
circulatorContext.offerTargetTaskQueue(afterTransformConnect);
logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect));
LOGGER.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect));
} catch (Exception exception) {
logger.error("transfer event record failed, stackTrace-", exception);
LOGGER.error("transfer event record failed, stackTrace-", exception);
afterTransformConnect.forEach(transferRecord -> errorHandler.handle(transferRecord, exception));
}

Expand All @@ -127,7 +127,7 @@ public void shutdown() {
try {
circulatorContext.releaseTaskTransform();
} catch (Exception e) {
logger.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e)));
LOGGER.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,18 @@

/**
* event target push to sink task
*
* @author artisan
*/
public class EventTargetTrigger extends ServiceThread {

private static final Logger logger = LoggerFactory.getLogger(EventTargetTrigger.class);
private static final Logger LOGGER = LoggerFactory.getLogger(EventTargetTrigger.class);

private final CirculatorContext circulatorContext;
private final OffsetManager offsetManager;
private final ErrorHandler errorHandler;
private volatile Integer batchSize = 100;

public EventTargetTrigger(CirculatorContext circulatorContext, OffsetManager offsetManager,
ErrorHandler errorHandler) {
ErrorHandler errorHandler) {
this.circulatorContext = circulatorContext;
this.offsetManager = offsetManager;
this.errorHandler = errorHandler;
Expand All @@ -60,15 +58,15 @@ public void run() {
while (!stopped) {
Map<String, List<ConnectRecord>> targetRecordMap = circulatorContext.takeTargetRecords(batchSize);
if (MapUtils.isEmpty(targetRecordMap)) {
logger.trace("current target pusher is empty");
LOGGER.trace("current target pusher is empty");
this.waitForRunning(1000);
continue;
}
if (logger.isDebugEnabled()) {
logger.debug("start push content by pusher - {}", JSON.toJSONString(targetRecordMap));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("start push content by pusher - {}", JSON.toJSONString(targetRecordMap));
}

for(String runnerName: targetRecordMap.keySet()){
for (String runnerName : targetRecordMap.keySet()) {
ExecutorService executorService = circulatorContext.getExecutorService(runnerName);
executorService.execute(() -> {
SinkTask sinkTask = circulatorContext.getPusherTaskMap().get(runnerName);
Expand All @@ -77,7 +75,7 @@ public void run() {
sinkTask.put(triggerRecords);
offsetManager.commit(triggerRecords);
} catch (Exception exception) {
logger.error(getServiceName() + " push target exception, stackTrace-", exception);
LOGGER.error(getServiceName() + " push target exception, stackTrace-", exception);
triggerRecords.forEach(triggerRecord -> errorHandler.handle(triggerRecord, exception));
}
});
Expand All @@ -101,7 +99,7 @@ public void shutdown() {
circulatorContext.releaseExecutorService();
circulatorContext.releaseTriggerTask();
} catch (Exception e) {
logger.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e)));
LOGGER.error(String.format("current thread: %s, error Track: %s ", getServiceName(), ExceptionUtil.getErrorMessage(e)));
}
}
}
Loading
Loading