Skip to content

Commit

Permalink
Merge pull request #23 from codingapi/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
xlorne authored Dec 22, 2017
2 parents 2c10b99 + aebde1f commit 96ca5f5
Show file tree
Hide file tree
Showing 28 changed files with 539 additions and 367 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
<java.version>1.8</java.version>
<maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>

<lcn.last.version>4.0.3.SNAPSHOT</lcn.last.version>
<lcn.last.version>4.0.3.M1</lcn.last.version>
</properties>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ public class TxCompensateLocal {

private String groupId;

private String type;

private int startState;


public int getStartState() {
return startState;
}

public void setStartState(int startState) {
this.startState = startState;
}

public String getGroupId() {
return groupId;
Expand All @@ -19,6 +31,14 @@ public void setGroupId(String groupId) {
this.groupId = groupId;
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}

public static TxCompensateLocal current() {
return currentLocal.get();
}
Expand All @@ -27,5 +47,4 @@ public static void setCurrent(TxCompensateLocal current) {
currentLocal.set(current);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.codingapi.tx.aop.bean.TxTransactionLocal;
import com.codingapi.tx.aop.service.TransactionServer;
import com.codingapi.tx.framework.task.TaskGroupManager;
import com.codingapi.tx.framework.task.TaskState;
import com.codingapi.tx.framework.task.TxTask;
import com.codingapi.tx.framework.thread.HookRunnable;
import com.codingapi.tx.model.TxGroup;
Expand Down Expand Up @@ -88,20 +89,37 @@ public void run0() {

int lastState = rs==-1?0:resState;

int executeConnectionError = 0;

//控制本地事务的数据提交
final TxTask waitTask = TaskGroupManager.getInstance().getTask(groupId, type);
if(waitTask!=null){
waitTask.setState(lastState);
waitTask.signalTask();

while (!waitTask.isRemove()){
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

if(waitTask.getState()== TaskState.connectionError.getCode()){
//本地执行失败.
executeConnectionError = 1;

lastState = 0;
}
}


if (compensateLocal == null) {
long end = System.currentTimeMillis();
long time = end - start;
if (lastState == 1 && rs == 0) {
if (executeConnectionError == 1||(lastState == 1 && rs == 0)) {
//记录补偿日志
txManagerService.sendCompensateMsg(groupId, time, info);
txManagerService.sendCompensateMsg(groupId, time, info,executeConnectionError);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class CompensateInfo {
private String address;
private long time;
private String resJson;
private int startError;

private int state;

Expand All @@ -24,7 +25,7 @@ public String toParamsString() {
String postParam = "model=" + modelName + "&uniqueKey=" + uniqueKey + "" +
"&address=" + address + "&currentTime=" + currentTime +
"&data=" + data + "&time=" + time + "&groupId=" + groupId + "" +
"&className=" + className + "&methodStr=" + methodStr;
"&className=" + className + "&methodStr=" + methodStr+"&startError="+startError;
return postParam;
}

Expand All @@ -33,7 +34,7 @@ public CompensateInfo() {

public CompensateInfo(long currentTime, String modelName, String uniqueKey, String data,
String methodStr, String className, String groupId, String address,
long time) {
long time,int startError) {
this.currentTime = currentTime;
this.modelName = modelName;
this.uniqueKey = uniqueKey;
Expand All @@ -44,9 +45,18 @@ public CompensateInfo(long currentTime, String modelName, String uniqueKey, Stri
this.address = address;
this.time = time;
this.state = 0;
this.startError =startError;
}


public int getStartError() {
return startError;
}

public void setStartError(int startError) {
this.startError = startError;
}

public int getState() {
return state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ public interface CompensateService {

void saveLocal(CompensateInfo compensateInfo);

boolean invoke(TransactionInvocation invocation, String groupId);
boolean invoke(TransactionInvocation invocation, String groupId, int startState);

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ public void saveLocal(CompensateInfo compensateInfo) {
}

@Override
public boolean invoke(TransactionInvocation invocation, String groupId) {
public boolean invoke(TransactionInvocation invocation, String groupId, int startState) {

TxCompensateLocal compensateLocal = new TxCompensateLocal();
compensateLocal.setGroupId(groupId);
compensateLocal.setStartState(startState);

TxCompensateLocal.setCurrent(compensateLocal);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,16 @@ public String execute(JSONObject resObj, String json) {

String groupId = resObj.getString("g");

int startState = resObj.getInteger("ss");

byte[] bytes = Base64Utils.decode(data);

TransactionInvocation invocation = SerializerUtils.parserTransactionInvocation(bytes);

if (invocation != null) {
logger.info("compensate method ->" + invocation.getMethodStr());

boolean res = compensateService.invoke(invocation, groupId);
boolean res = compensateService.invoke(invocation, groupId,startState);

logger.info("compensate res ->" + res);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.codingapi.tx.control.service.IActionService;
import com.codingapi.tx.framework.task.TaskGroup;
import com.codingapi.tx.framework.task.TaskGroupManager;
import com.codingapi.tx.framework.task.TaskState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -64,7 +65,9 @@ private String notifyWaitTask(TaskGroup task, int state) {
while (true) {
if (task.isRemove()) {

if (task.getState() == 0 || task.getState() == 1) {
if (task.getState() == TaskState.rollback.getCode()
|| task.getState() == TaskState.commit.getCode()) {

res = "1";
} else {
res = "0";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.codingapi.tx.datasource;


import com.codingapi.tx.aop.bean.TxCompensateLocal;
//import com.codingapi.tx.aop.bean.TxCompensateLocal;
import com.codingapi.tx.aop.bean.TxTransactionLocal;
import com.codingapi.tx.datasource.service.DataSourceService;
import com.lorne.core.framework.utils.task.Task;
Expand Down Expand Up @@ -64,8 +64,8 @@ public void close(ILCNResource connection) {

protected abstract void initDbType();

protected abstract C getRollback(C connection);

// protected abstract C getCompensateConnection(C connection,TxCompensateLocal txCompensateLocal);
//


protected ILCNResource loadConnection(){
Expand Down Expand Up @@ -128,10 +128,10 @@ protected C initLCNConnection(C connection) {
logger.info("lcn datasource transaction control ");

//补偿的情况的
if (TxCompensateLocal.current() != null) {
logger.info("rollback transaction ");
return getRollback(connection);
}
// if (TxCompensateLocal.current() != null) {
// logger.info("rollback transaction ");
// return getCompensateConnection(connection,TxCompensateLocal.current());
// }

if(StringUtils.isNotEmpty(txTransactionLocal.getGroupId())){

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.codingapi.tx.framework.task;

/**
* create by lorne on 2017/12/21
*/
public enum TaskState {

rollback(0),commit(1),networkError(-1),networkTimeOut(-2),connectionError(-3);


/**
* 数据状态:
* 1:commit
* 0:rollback
* -1:network error
* -2:network time out
* -3:execute Connection error
* @return state
*/

private int code;

TaskState(int code) {
this.code = code;
}


public int getCode() {
return code;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ public interface MQTxManagerService {
* @param groupId 事务组Id
* @param time 执行时间
* @param info 事务信息
* @param startError 启动模块db执行异常
*/
void sendCompensateMsg(String groupId, long time, TxTransactionInfo info);
void sendCompensateMsg(String groupId, long time, TxTransactionInfo info,int startError);


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public String httpGetServer() {
}

@Override
public void sendCompensateMsg(String groupId, long time, TxTransactionInfo info) {
public void sendCompensateMsg(String groupId, long time, TxTransactionInfo info,int startError) {

String modelName = modelNameService.getModelName();
String uniqueKey = modelNameService.getUniqueKey();
Expand All @@ -135,7 +135,7 @@ public void sendCompensateMsg(String groupId, long time, TxTransactionInfo info)
long currentTime = System.currentTimeMillis();


CompensateInfo compensateInfo = new CompensateInfo(currentTime, modelName, uniqueKey, data, methodStr, className, groupId, address, time);
CompensateInfo compensateInfo = new CompensateInfo(currentTime, modelName, uniqueKey, data, methodStr, className, groupId, address, time,startError);

String json = managerHelper.httpPost(configReader.getTxUrl() + "sendCompensateMsg", compensateInfo.toParamsString());

Expand Down
2 changes: 1 addition & 1 deletion tx-manager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.codingapi</groupId>
<artifactId>tx-manager</artifactId>
<version>4.0.3.SNAPSHOT</version>
<version>4.0.3.M1</version>
<packaging>jar</packaging>

<name>tx-manager</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ public boolean sendCompensateMsg(@RequestParam("model") String model, @RequestPa
@RequestParam("currentTime") long currentTime,
@RequestParam("groupId") String groupId, @RequestParam("className") String className,
@RequestParam("time") int time, @RequestParam("data") String data,
@RequestParam("methodStr") String methodStr, @RequestParam("address") String address) {
return apiTxManagerService.sendCompensateMsg(currentTime, groupId, model, address, uniqueKey, className, methodStr, data, time);
@RequestParam("methodStr") String methodStr, @RequestParam("address") String address,
@RequestParam("startError") int startError) {
return apiTxManagerService.sendCompensateMsg(currentTime, groupId, model, address, uniqueKey, className, methodStr, data, time,startError);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ public interface ApiTxManagerService {
* @param methodStr 事务启动方法
* @param data 切面数据
* @param time 执行时间
* @param startError 启动模块异常
* @return 是否保存成功
*/
boolean sendCompensateMsg(long currentTime, String groupId, String model, String address, String uniqueKey, String className, String methodStr, String data, int time);
boolean sendCompensateMsg(long currentTime, String groupId, String model, String address, String uniqueKey, String className, String methodStr, String data, int time,int startError);

/**
* 获取服务器状态
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public int cleanNotifyTransaction(String groupId, String taskId) {


@Override
public boolean sendCompensateMsg(long currentTime, String groupId, String model, String address, String uniqueKey, String className, String methodStr, String data, int time) {
TransactionCompensateMsg transactionCompensateMsg = new TransactionCompensateMsg(currentTime, groupId, model, address, uniqueKey, className, methodStr, data, time, 0);
public boolean sendCompensateMsg(long currentTime, String groupId, String model, String address, String uniqueKey, String className, String methodStr, String data, int time,int startError) {
TransactionCompensateMsg transactionCompensateMsg = new TransactionCompensateMsg(currentTime, groupId, model, address, uniqueKey, className, methodStr, data, time, 0,startError);
return compensateService.saveCompensateMsg(transactionCompensateMsg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class TransactionCompensateMsg {
private String methodStr;
private String data;
private int time;
private int startError;

private TxGroup txGroup;

Expand All @@ -24,7 +25,7 @@ public class TransactionCompensateMsg {

public TransactionCompensateMsg(long currentTime, String groupId, String model, String address,
String uniqueKey, String className,
String methodStr, String data, int time, int state) {
String methodStr, String data, int time, int state,int startError) {
this.currentTime = currentTime;
this.groupId = groupId;
this.model = model;
Expand All @@ -35,8 +36,16 @@ public TransactionCompensateMsg(long currentTime, String groupId, String model,
this.time = time;
this.address = address;
this.state = state;
this.startError = startError;
}

public int getStartError() {
return startError;
}

public void setStartError(int startError) {
this.startError = startError;
}

public int getState() {
return state;
Expand Down
Loading

0 comments on commit 96ca5f5

Please sign in to comment.