Skip to content

Commit

Permalink
Merge pull request #1 from alibaba/master
Browse files Browse the repository at this point in the history
pr
  • Loading branch information
nbqyqx committed Jun 7, 2018
2 parents debfdb8 + b734a32 commit b6967dc
Show file tree
Hide file tree
Showing 43 changed files with 10,129 additions and 8,397 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,9 @@ public interface CanalConnector {
*/
void rollback() throws CanalClientException;

/**
* 中断的阻塞,用于优雅停止client
* @throws CanalClientException
*/
void stopRunning() throws CanalClientException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,4 +328,8 @@ public SimpleCanalConnector getCurrentConnector() {
return currentConnector;
}

public void stopRunning() {
currentConnector.stopRunning();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

/**
* 基于{@linkplain CanalServerWithNetty}定义的网络协议接口,对于canal数据进行get/rollback/ack等操作
*
*
* @author jianghang 2012-10-24 下午05:37:20
* @version 1.0.0
*/
Expand Down Expand Up @@ -75,6 +75,8 @@ public class SimpleCanalConnector implements CanalConnector {
private Object readDataLock = new Object();
private Object writeDataLock = new Object();

private boolean running = false;

public SimpleCanalConnector(SocketAddress address, String username, String password, String destination){
this(address, username, password, destination, 60000);
}
Expand All @@ -99,6 +101,9 @@ public void connect() throws CanalClientException {
}
} else {
waitClientRunning();
if (!running) {
return;
}
doConnect();
if (filter != null) { // 如果存在条件,说明是自动切换,基于上一次的条件订阅一次
subscribe(filter);
Expand Down Expand Up @@ -208,6 +213,9 @@ public void subscribe() throws CanalClientException {

public void subscribe(String filter) throws CanalClientException {
waitClientRunning();
if (!running) {
return;
}
try {
writeWithHeader(Packet.newBuilder()
.setType(PacketType.SUBSCRIPTION)
Expand All @@ -234,6 +242,9 @@ public void subscribe(String filter) throws CanalClientException {

public void unsubscribe() throws CanalClientException {
waitClientRunning();
if (!running) {
return;
}
try {
writeWithHeader(Packet.newBuilder()
.setType(PacketType.UNSUBSCRIPTION)
Expand Down Expand Up @@ -445,7 +456,11 @@ private void waitClientRunning() {
throw new CanalClientException("should connect first");
}

running = true;
mutex.get();// 阻塞等待
} else {
// 单机模式直接设置为running
running = true;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -502,4 +517,13 @@ public void setFilter(String filter) {
this.filter = filter;
}

public void stopRunning() {
if (running) {
running = false; //设置为非running状态
if (!mutex.state()) {
mutex.set(true); //中断阻塞
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ static void putEntry(final int charsetId, String mysqlCharset, String mysqlColla
putEntry(42, "latin7", "latin7_general_cs", "ISO8859_7");
putEntry(43, "macce", "macce_bin", "MacCentralEurope");
putEntry(44, "cp1250", "cp1250_croatian_ci", "Cp1250");
putEntry(45, "utf8mb4", "utf8mb4_general_ci", "MacCentralEurope");
putEntry(46, "utf8mb4", "utf8mb4_bin", "MacCentralEurope");
putEntry(45, "utf8mb4", "utf8mb4_general_ci", "UTF-8");
putEntry(46, "utf8mb4", "utf8mb4_bin", "UTF-8");
putEntry(47, "latin1", "latin1_bin", "ISO8859_1");
putEntry(48, "latin1", "latin1_general_ci", "ISO8859_1");
putEntry(49, "latin1", "latin1_general_cs", "ISO8859_1");
Expand Down
35 changes: 32 additions & 3 deletions dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,19 @@
import com.taobao.tddl.dbsync.binlog.event.StartLogEventV3;
import com.taobao.tddl.dbsync.binlog.event.StopLogEvent;
import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
import com.taobao.tddl.dbsync.binlog.event.TransactionContextLogEvent;
import com.taobao.tddl.dbsync.binlog.event.UnknownLogEvent;
import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
import com.taobao.tddl.dbsync.binlog.event.UserVarLogEvent;
import com.taobao.tddl.dbsync.binlog.event.ViewChangeEvent;
import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
import com.taobao.tddl.dbsync.binlog.event.XaPrepareLogEvent;
import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.BinlogCheckPointLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidListLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.StartEncryptionLogEvent;

/**
* Implements a binary-log decoder.
Expand Down Expand Up @@ -366,6 +370,24 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
logPosition.position = header.getLogPos();
return event;
}
case LogEvent.TRANSACTION_CONTEXT_EVENT: {
TransactionContextLogEvent event = new TransactionContextLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
return event;
}
case LogEvent.VIEW_CHANGE_EVENT: {
ViewChangeEvent event = new ViewChangeEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
return event;
}
case LogEvent.XA_PREPARE_LOG_EVENT: {
XaPrepareLogEvent event = new XaPrepareLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
return event;
}
case LogEvent.ANNOTATE_ROWS_EVENT: {
AnnotateRowsEvent event = new AnnotateRowsEvent(header, buffer, descriptionEvent);
/* updating position in context */
Expand All @@ -390,6 +412,12 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
logPosition.position = header.getLogPos();
return event;
}
case LogEvent.START_ENCRYPTION_EVENT: {
StartEncryptionLogEvent event = new StartEncryptionLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
return event;
}
default:
/*
* Create an object of Ignorable_log_event for unrecognized
Expand All @@ -402,9 +430,10 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
logPosition.position = header.getLogPos();
return event;
} else {
if (logger.isWarnEnabled()) logger.warn("Skipping unrecognized binlog event "
+ LogEvent.getTypeName(header.getType()) + " from: "
+ context.getLogPosition());
if (logger.isWarnEnabled()) {
logger.warn("Skipping unrecognized binlog event " + LogEvent.getTypeName(header.getType())
+ " from: " + context.getLogPosition());
}
}
}

Expand Down
30 changes: 19 additions & 11 deletions dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,17 @@ public abstract class LogEvent {

public static final int PREVIOUS_GTIDS_LOG_EVENT = 35;

/* MySQL 5.7 events */
public static final int TRANSACTION_CONTEXT_EVENT = 36;

public static final int VIEW_CHANGE_EVENT = 37;

/* Prepared XA transaction terminal event similar to Xid */
public static final int XA_PREPARE_LOG_EVENT = 38;

// mariaDb 5.5.34
/* New MySQL/Sun events are to be added right above this comment */
public static final int MYSQL_EVENTS_END = 36;
public static final int MYSQL_EVENTS_END = 39;

public static final int MARIA_EVENTS_BEGIN = 160;
/* New Maria event numbers start from here */
Expand All @@ -189,8 +197,10 @@ public abstract class LogEvent {
*/
public static final int GTID_LIST_EVENT = 163;

public static final int START_ENCRYPTION_EVENT = 164;

/** end marker */
public static final int ENUM_END_EVENT = 164;
public static final int ENUM_END_EVENT = 165;

/**
* 1 byte length, 1 byte format Length is total length in bytes, including 2
Expand Down Expand Up @@ -359,7 +369,7 @@ public static String getTypeName(final int type) {
protected static final Log logger = LogFactory.getLog(LogEvent.class);

protected final LogHeader header;

/**
* mysql半同步semi标识
*
Expand All @@ -369,18 +379,16 @@ public static String getTypeName(final int type) {
* </pre>
*/
protected int semival;



public int getSemival() {
return semival;
}
return semival;
}

public void setSemival(int semival) {
this.semival = semival;
}
public void setSemival(int semival) {
this.semival = semival;
}

protected LogEvent(LogHeader header){
protected LogEvent(LogHeader header){
this.header = header;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,15 @@ public final class FormatDescriptionLogEvent extends StartLogEventV3 {
public static final int HEARTBEAT_HEADER_LEN = 0;
public static final int IGNORABLE_HEADER_LEN = 0;
public static final int ROWS_HEADER_LEN_V2 = 10;
public static final int TRANSACTION_CONTEXT_HEADER_LEN = 18;
public static final int VIEW_CHANGE_HEADER_LEN = 52;
public static final int XA_PREPARE_HEADER_LEN = 0;

public static final int ANNOTATE_ROWS_HEADER_LEN = 0;
public static final int BINLOG_CHECKPOINT_HEADER_LEN = 4;
public static final int GTID_HEADER_LEN = 19;
public static final int GTID_LIST_HEADER_LEN = 4;
public static final int START_ENCRYPTION_HEADER_LEN = 0;

public static final int POST_HEADER_LENGTH = 11;

Expand Down Expand Up @@ -202,11 +207,17 @@ public FormatDescriptionLogEvent(final int binlogVersion){
postHeaderLen[GTID_LOG_EVENT - 1] = POST_HEADER_LENGTH;
postHeaderLen[ANONYMOUS_GTID_LOG_EVENT - 1] = POST_HEADER_LENGTH;
postHeaderLen[PREVIOUS_GTIDS_LOG_EVENT - 1] = IGNORABLE_HEADER_LEN;

postHeaderLen[TRANSACTION_CONTEXT_EVENT - 1] = TRANSACTION_CONTEXT_HEADER_LEN;
postHeaderLen[VIEW_CHANGE_EVENT - 1] = VIEW_CHANGE_HEADER_LEN;
postHeaderLen[XA_PREPARE_LOG_EVENT - 1] = XA_PREPARE_HEADER_LEN;

// mariadb 10
postHeaderLen[ANNOTATE_ROWS_EVENT - 1] = ANNOTATE_ROWS_HEADER_LEN;
postHeaderLen[BINLOG_CHECKPOINT_EVENT - 1] = BINLOG_CHECKPOINT_HEADER_LEN;
postHeaderLen[GTID_EVENT - 1] = GTID_HEADER_LEN;
postHeaderLen[GTID_LIST_EVENT - 1] = GTID_LIST_HEADER_LEN;
postHeaderLen[START_ENCRYPTION_EVENT - 1] = START_ENCRYPTION_HEADER_LEN;
break;

case 3: /* 4.0.x x>=2 */
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.taobao.tddl.dbsync.binlog.event;

import java.nio.ByteBuffer;
import java.util.UUID;

import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.LogEvent;

Expand All @@ -16,6 +19,8 @@ public class GtidLogEvent extends LogEvent {
public static final int ENCODED_SID_LENGTH = 16;

private boolean commitFlag;
private UUID sid;
private long gno;

public GtidLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
super(header);
Expand All @@ -27,6 +32,14 @@ public GtidLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEven
buffer.position(commonHeaderLen);
commitFlag = (buffer.getUint8() != 0); // ENCODED_FLAG_LENGTH

byte[] bs = buffer.getData(ENCODED_SID_LENGTH);
ByteBuffer bb = ByteBuffer.wrap(bs);
long high = bb.getLong();
long low = bb.getLong();
sid = new UUID(high, low);

gno = buffer.getLong64();

// ignore gtid info read
// sid.copy_from((uchar *)ptr_buffer);
// ptr_buffer+= ENCODED_SID_LENGTH;
Expand All @@ -42,4 +55,11 @@ public boolean isCommitFlag() {
return commitFlag;
}

public UUID getSid() {
return sid;
}

public long getGno() {
return gno;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.taobao.tddl.dbsync.binlog.event;

import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.LogEvent;

/**
* @author agapple 2018年5月7日 下午7:05:39
* @version 1.0.26
* @since mysql 5.7
*/
public class TransactionContextLogEvent extends LogEvent {

public TransactionContextLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
super(header);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.taobao.tddl.dbsync.binlog.event;

import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.LogEvent;

/**
* @author agapple 2018年5月7日 下午7:05:39
* @version 1.0.26
* @since mysql 5.7
*/
public class ViewChangeEvent extends LogEvent {

public ViewChangeEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
super(header);
}
}
Loading

0 comments on commit b6967dc

Please sign in to comment.