Skip to content

Commit

Permalink
Merge pull request #1 from alibaba/master
Browse files Browse the repository at this point in the history
merge latest version
  • Loading branch information
jasonhx140 committed Jan 2, 2018
2 parents 8600e98 + 3104b4c commit 325d16b
Show file tree
Hide file tree
Showing 129 changed files with 5,023 additions and 1,013 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<li>slave重做中继日志中的事件,将改变反映它自己的数据。</li>
</ol>
<h3>canal的工作原理:</h3>
<p><img width="590" src="https://camo.githubusercontent.com/46c626b4cde399db43b2634a7911a04aecf273a0/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038302f333130372f63383762363762612d333934632d333038362d393537372d3964623035626530346339352e6a7067" alt="" height="273">
<p><img width="590" src="http://dl.iteye.com/upload/attachment/0080/3107/c87b67ba-394c-3086-9577-9db05be04c95.jpg" alt="" height="273">
<p>原理相对比较简单:</p>
<ol>
<li>canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议</li>
Expand Down
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal</artifactId>
<version>1.0.25-SNAPSHOT</version>
<version>1.0.26-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>com.alibaba.otter</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -107,9 +106,10 @@ public void subscribe(String filter) throws CanalClientException {
this.filter = filter;
return;
} catch (Throwable t) {
logger.warn("something goes wrong when subscribing from server:{}\n{}",
currentConnector != null ? currentConnector.getAddress() : "null",
ExceptionUtils.getFullStackTrace(t));
logger.warn(String.format(
"something goes wrong when subscribing from server: %s",
currentConnector != null ? currentConnector.getAddress() : "null"),
t);
times++;
restart();
logger.info("restart the connector for next round retry.");
Expand All @@ -126,9 +126,8 @@ public void unsubscribe() throws CanalClientException {
currentConnector.unsubscribe();
return;
} catch (Throwable t) {
logger.warn("something goes wrong when unsubscribing from server:{}\n{}",
currentConnector != null ? currentConnector.getAddress() : "null",
ExceptionUtils.getFullStackTrace(t));
logger.warn(String.format("something goes wrong when unsubscribing from server:%s",
currentConnector != null ? currentConnector.getAddress() : "null"), t);
times++;
restart();
logger.info("restart the connector for next round retry.");
Expand All @@ -144,9 +143,8 @@ public Message get(int batchSize) throws CanalClientException {
Message msg = currentConnector.get(batchSize);
return msg;
} catch (Throwable t) {
logger.warn("something goes wrong when getting data from server:{}\n{}",
currentConnector != null ? currentConnector.getAddress() : "null",
ExceptionUtils.getFullStackTrace(t));
logger.warn(String.format("something goes wrong when getting data from server:%s",
currentConnector != null ? currentConnector.getAddress() : "null"), t);
times++;
restart();
logger.info("restart the connector for next round retry.");
Expand All @@ -162,9 +160,8 @@ public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClien
Message msg = currentConnector.get(batchSize, timeout, unit);
return msg;
} catch (Throwable t) {
logger.warn("something goes wrong when getting data from server:{}\n{}",
currentConnector != null ? currentConnector.getAddress() : "null",
ExceptionUtils.getFullStackTrace(t));
logger.warn(String.format("something goes wrong when getting data from server:%s",
currentConnector != null ? currentConnector.getAddress() : "null"), t);
times++;
restart();
logger.info("restart the connector for next round retry.");
Expand All @@ -180,9 +177,8 @@ public Message getWithoutAck(int batchSize) throws CanalClientException {
Message msg = currentConnector.getWithoutAck(batchSize);
return msg;
} catch (Throwable t) {
logger.warn("something goes wrong when getWithoutAck data from server:{}\n{}",
currentConnector.getAddress(),
ExceptionUtils.getFullStackTrace(t));
logger.warn(String.format("something goes wrong when getWithoutAck data from server:%s",
currentConnector.getAddress()), t);
times++;
restart();
logger.info("restart the connector for next round retry.");
Expand All @@ -198,9 +194,8 @@ public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws
Message msg = currentConnector.getWithoutAck(batchSize, timeout, unit);
return msg;
} catch (Throwable t) {
logger.warn("something goes wrong when getWithoutAck data from server:{}\n{}",
currentConnector.getAddress(),
ExceptionUtils.getFullStackTrace(t));
logger.warn(String.format("something goes wrong when getWithoutAck data from server:%s",
currentConnector.getAddress()), t);
times++;
restart();
logger.info("restart the connector for next round retry.");
Expand All @@ -216,9 +211,8 @@ public void rollback(long batchId) throws CanalClientException {
currentConnector.rollback(batchId);
return;
} catch (Throwable t) {
logger.warn("something goes wrong when rollbacking data from server:{}\n{}",
currentConnector.getAddress(),
ExceptionUtils.getFullStackTrace(t));
logger.warn(String.format("something goes wrong when rollbacking data from server:%s",
currentConnector.getAddress()), t);
times++;
restart();
logger.info("restart the connector for next round retry.");
Expand All @@ -234,9 +228,8 @@ public void rollback() throws CanalClientException {
currentConnector.rollback();
return;
} catch (Throwable t) {
logger.warn("something goes wrong when rollbacking data from server:{}\n{}",
currentConnector.getAddress(),
ExceptionUtils.getFullStackTrace(t));
logger.warn(String.format("something goes wrong when rollbacking data from server:%s",
currentConnector.getAddress()), t);
times++;
restart();
logger.info("restart the connector for next round retry.");
Expand All @@ -253,9 +246,8 @@ public void ack(long batchId) throws CanalClientException {
currentConnector.ack(batchId);
return;
} catch (Throwable t) {
logger.warn("something goes wrong when acking data from server:{}\n{}",
currentConnector.getAddress(),
ExceptionUtils.getFullStackTrace(t));
logger.warn(String.format("something goes wrong when acking data from server:%s",
currentConnector.getAddress()), t);
times++;
restart();
logger.info("restart the connector for next round retry.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channel;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -37,7 +41,6 @@
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

/**
* 基于{@linkplain CanalServerWithNetty}定义的网络协议接口,对于canal数据进行get/rollback/ack等操作
Expand All @@ -57,6 +60,8 @@ public class SimpleCanalConnector implements CanalConnector {
private final ByteBuffer readHeader = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
private final ByteBuffer writeHeader = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
private SocketChannel channel;
private ReadableByteChannel readableChannel;
private WritableByteChannel writableChannel;
private List<Compression> supportedCompressions = new ArrayList<Compression>();
private ClientIdentity clientIdentity;
private ClientRunningMonitor runningMonitor; // 运行控制
Expand Down Expand Up @@ -130,7 +135,9 @@ private InetSocketAddress doConnect() throws CanalClientException {
address = getNextAddress();
}
channel.connect(address);
Packet p = Packet.parseFrom(readNextPacket(channel));
readableChannel = Channels.newChannel(channel.socket().getInputStream());
writableChannel = Channels.newChannel(channel.socket().getOutputStream());
Packet p = Packet.parseFrom(readNextPacket());
if (p.getVersion() != 1) {
throw new CanalClientException("unsupported version at this client.");
}
Expand All @@ -144,17 +151,17 @@ private InetSocketAddress doConnect() throws CanalClientException {
//
ClientAuth ca = ClientAuth.newBuilder()
.setUsername(username != null ? username : "")
.setPassword(ByteString.copyFromUtf8(password != null ? password : ""))
.setNetReadTimeout(soTimeout)
.setNetWriteTimeout(soTimeout)
.build();
writeWithHeader(channel,
Packet.newBuilder()
.setType(PacketType.CLIENTAUTHENTICATION)
.setBody(ca.toByteString())
.build()
.toByteArray());
writeWithHeader(Packet.newBuilder()
.setType(PacketType.CLIENTAUTHENTICATION)
.setBody(ca.toByteString())
.build()
.toByteArray());
//
Packet ack = Packet.parseFrom(readNextPacket(channel));
Packet ack = Packet.parseFrom(readNextPacket());
if (ack.getType() != PacketType.ACK) {
throw new CanalClientException("unexpected packet type when ack is expected");
}
Expand All @@ -173,36 +180,47 @@ private InetSocketAddress doConnect() throws CanalClientException {
}

private void doDisconnnect() throws CanalClientException {
if (readableChannel != null) {
quietlyClose(readableChannel);
readableChannel = null;
}
if (writableChannel != null) {
quietlyClose(writableChannel);
writableChannel = null;
}
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
logger.warn("exception on closing channel:{} \n {}", channel, e);
}
quietlyClose(channel);
channel = null;
}
}

private void quietlyClose(Channel channel) {
try {
channel.close();
} catch (IOException e) {
logger.warn("exception on closing channel:{} \n {}", channel, e);
}
}

public void subscribe() throws CanalClientException {
subscribe(""); // 传递空字符即可
}

public void subscribe(String filter) throws CanalClientException {
waitClientRunning();
try {
writeWithHeader(channel,
Packet.newBuilder()
.setType(PacketType.SUBSCRIPTION)
.setBody(Sub.newBuilder()
.setDestination(clientIdentity.getDestination())
.setClientId(String.valueOf(clientIdentity.getClientId()))
.setFilter(filter != null ? filter : "")
.build()
.toByteString())
writeWithHeader(Packet.newBuilder()
.setType(PacketType.SUBSCRIPTION)
.setBody(Sub.newBuilder()
.setDestination(clientIdentity.getDestination())
.setClientId(String.valueOf(clientIdentity.getClientId()))
.setFilter(filter != null ? filter : "")
.build()
.toByteArray());
.toByteString())
.build()
.toByteArray());
//
Packet p = Packet.parseFrom(readNextPacket(channel));
Packet p = Packet.parseFrom(readNextPacket());
Ack ack = Ack.parseFrom(p.getBody());
if (ack.getErrorCode() > 0) {
throw new CanalClientException("failed to subscribe with reason: " + ack.getErrorMessage());
Expand All @@ -217,18 +235,17 @@ public void subscribe(String filter) throws CanalClientException {
public void unsubscribe() throws CanalClientException {
waitClientRunning();
try {
writeWithHeader(channel,
Packet.newBuilder()
.setType(PacketType.UNSUBSCRIPTION)
.setBody(Unsub.newBuilder()
.setDestination(clientIdentity.getDestination())
.setClientId(String.valueOf(clientIdentity.getClientId()))
.build()
.toByteString())
writeWithHeader(Packet.newBuilder()
.setType(PacketType.UNSUBSCRIPTION)
.setBody(Unsub.newBuilder()
.setDestination(clientIdentity.getDestination())
.setClientId(String.valueOf(clientIdentity.getClientId()))
.build()
.toByteArray());
.toByteString())
.build()
.toByteArray());
//
Packet p = Packet.parseFrom(readNextPacket(channel));
Packet p = Packet.parseFrom(readNextPacket());
Ack ack = Ack.parseFrom(p.getBody());
if (ack.getErrorCode() > 0) {
throw new CanalClientException("failed to unSubscribe with reason: " + ack.getErrorMessage());
Expand Down Expand Up @@ -261,29 +278,27 @@ public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws
unit = TimeUnit.MILLISECONDS;
}

writeWithHeader(channel,
Packet.newBuilder()
.setType(PacketType.GET)
.setBody(Get.newBuilder()
.setAutoAck(false)
.setDestination(clientIdentity.getDestination())
.setClientId(String.valueOf(clientIdentity.getClientId()))
.setFetchSize(size)
.setTimeout(time)
.setUnit(unit.ordinal())
.build()
.toByteString())
writeWithHeader(Packet.newBuilder()
.setType(PacketType.GET)
.setBody(Get.newBuilder()
.setAutoAck(false)
.setDestination(clientIdentity.getDestination())
.setClientId(String.valueOf(clientIdentity.getClientId()))
.setFetchSize(size)
.setTimeout(time)
.setUnit(unit.ordinal())
.build()
.toByteArray());

.toByteString())
.build()
.toByteArray());
return receiveMessages();
} catch (IOException e) {
throw new CanalClientException(e);
}
}

private Message receiveMessages() throws InvalidProtocolBufferException, IOException {
Packet p = Packet.parseFrom(readNextPacket(channel));
private Message receiveMessages() throws IOException {
Packet p = Packet.parseFrom(readNextPacket());
switch (p.getType()) {
case MESSAGES: {
if (!p.getCompression().equals(Compression.NONE)) {
Expand Down Expand Up @@ -315,7 +330,7 @@ public void ack(long batchId) throws CanalClientException {
.setBatchId(batchId)
.build();
try {
writeWithHeader(channel, Packet.newBuilder()
writeWithHeader(Packet.newBuilder()
.setType(PacketType.CLIENTACK)
.setBody(ca.toByteString())
.build()
Expand All @@ -333,7 +348,7 @@ public void rollback(long batchId) throws CanalClientException {
.setBatchId(batchId)
.build();
try {
writeWithHeader(channel, Packet.newBuilder()
writeWithHeader(Packet.newBuilder()
.setType(PacketType.CLIENTROLLBACK)
.setBody(ca.toByteString())
.build()
Expand All @@ -350,7 +365,15 @@ public void rollback() throws CanalClientException {

// ==================== helper method ====================

private void writeWithHeader(SocketChannel channel, byte[] body) throws IOException {
private void writeWithHeader(byte[] body) throws IOException {
writeWithHeader(writableChannel, body);
}

private byte[] readNextPacket() throws IOException {
return readNextPacket(readableChannel);
}

private void writeWithHeader(WritableByteChannel channel, byte[] body) throws IOException {
synchronized (writeDataLock) {
writeHeader.clear();
writeHeader.putInt(body.length);
Expand All @@ -360,7 +383,7 @@ private void writeWithHeader(SocketChannel channel, byte[] body) throws IOExcept
}
}

private byte[] readNextPacket(SocketChannel channel) throws IOException {
private byte[] readNextPacket(ReadableByteChannel channel) throws IOException {
synchronized (readDataLock) {
readHeader.clear();
read(channel, readHeader);
Expand All @@ -371,7 +394,7 @@ private byte[] readNextPacket(SocketChannel channel) throws IOException {
}
}

private void read(SocketChannel channel, ByteBuffer buffer) throws IOException {
private void read(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
while (buffer.hasRemaining()) {
int r = channel.read(buffer);
if (r == -1) {
Expand Down
2 changes: 1 addition & 1 deletion client/src/test/java/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</encoder>
</appender>

<root level="WARN">
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Loading

0 comments on commit 325d16b

Please sign in to comment.