
Error during snapshot #654

Open · sarthaksingh-tomar opened this issue Jun 21, 2024 · 2 comments

@sarthaksingh-tomar

Hello,

I am trying to use clickhouse-sink-connector-lightweight to replicate data from MariaDB to ClickHouse, but it fails with the exception below during the snapshot.

I am using the default config with the following MariaDB-specific settings:

# Mariadb specific properties
connector.adapter: "mariadb"
database.protocol: "jdbc:mariadb"
database.jdbc.driver: "org.mariadb.jdbc.Driver"

https://github.com/Altinity/clickhouse-sink-connector/blob/develop/sink-connector-lightweight/docker/config.yml
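
For context, the rest of the connection section in my config.yml is the stock Debezium-style config (a sketch; hosts and credentials are redacted, and the database name is a placeholder):

database.hostname: "#########"
database.port: "3306"
database.user: "#####"
database.password: "#######"
database.include.list: "db"

The snapshot then fails with: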

**[Sink Connector thread-pool-5] ERROR io.debezium.embedded.EmbeddedEngine - Timed out waiting to flush EmbeddedEngine{id=connector-shard18} offsets to storage**

[debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] ERROR io.debezium.relational.RelationalSnapshotChangeEventSource - Error during snapshot
**java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Snapshotting of table db.table1 failed**
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:467)
	at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:165)
	at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:92)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:253)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:237)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:189)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:137)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.ConnectException: Snapshotting of table db.table1 failed
	at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:597)
	at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createDataEventsForTableCallable$6(RelationalSnapshotChangeEventSource.java:519)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	... 5 more
**Caused by: java.sql.SQLNonTransientConnectionException: (conn=272359) Error while streaming resultSet data**
	at org.mariadb.jdbc.export.ExceptionFactory.createException(ExceptionFactory.java:293)
	at org.mariadb.jdbc.export.ExceptionFactory.create(ExceptionFactory.java:359)
	at org.mariadb.jdbc.client.result.StreamingResult.addStreamingValue(StreamingResult.java:130)
	at org.mariadb.jdbc.client.result.StreamingResult.nextStreamingValue(StreamingResult.java:113)
	at org.mariadb.jdbc.client.result.StreamingResult.next(StreamingResult.java:163)
	at io.debezium.jdbc.CancellableResultSet.next(CancellableResultSet.java:52)
	at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:576)
	... 7 more
	Suppressed: java.sql.SQLNonTransientConnectionException: (conn=272359) Error while streaming resultSet data
		at org.mariadb.jdbc.export.ExceptionFactory.createException(ExceptionFactory.java:293)
		at org.mariadb.jdbc.export.ExceptionFactory.create(ExceptionFactory.java:359)
		at org.mariadb.jdbc.client.result.Result.close(Result.java:307)
		at io.debezium.jdbc.CancellableResultSet.close(CancellableResultSet.java:65)
		at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:545)
		... 7 more
	Caused by: java.net.SocketException: Connection reset
		at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:325)
		at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
		at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
		at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
		at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
		at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
		at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:343)
		at org.mariadb.jdbc.client.socket.impl.PacketReader.readPacket(PacketReader.java:76)
		at org.mariadb.jdbc.client.result.Result.skipRemaining(Result.java:210)
		at org.mariadb.jdbc.client.result.Result.close(Result.java:305)
		... 9 more
	Suppressed: java.sql.SQLNonTransientConnectionException: (conn=272359) Error while streaming resultSet data
		at org.mariadb.jdbc.export.ExceptionFactory.createException(ExceptionFactory.java:293)
		at org.mariadb.jdbc.export.ExceptionFactory.create(ExceptionFactory.java:359)
		at org.mariadb.jdbc.client.result.StreamingResult.addStreamingValue(StreamingResult.java:130)
		at org.mariadb.jdbc.client.result.StreamingResult.fetchRemaining(StreamingResult.java:145)
		at org.mariadb.jdbc.client.result.Result.closeFromStmtClose(Result.java:325)
		at org.mariadb.jdbc.Statement.close(Statement.java:175)
		at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:545)
		... 7 more
	Caused by: java.net.SocketException: Connection reset
		at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:325)
		at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
		at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
		at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
		at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
		at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
		at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:343)
		at org.mariadb.jdbc.client.socket.impl.PacketReader.readPacket(PacketReader.java:76)
		at org.mariadb.jdbc.client.result.Result.readNext(Result.java:155)
		at org.mariadb.jdbc.client.result.StreamingResult.addStreamingValue(StreamingResult.java:124)
		... 11 more
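
The root cause in the trace is java.net.SocketException: Connection reset while the MariaDB driver is streaming the snapshot result set, i.e. the server side drops the connection partway through the long-running snapshot SELECT. If a server-side timeout is the culprit, these are the MariaDB variables I would check (a sketch; the SET value is illustrative, not a recommendation):

-- Timeouts that can kill a long-running streaming query
SHOW GLOBAL VARIABLES LIKE 'net_write_timeout';
SHOW GLOBAL VARIABLES LIKE 'net_read_timeout';
SHOW GLOBAL VARIABLES LIKE 'wait_timeout';
SHOW GLOBAL VARIABLES LIKE 'max_statement_time';

-- Hypothetical workaround: give the snapshot more headroom
-- (3600 s is an illustrative value; tune to your largest table)
SET GLOBAL net_write_timeout = 3600;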

@subkanthi (Collaborator)

This could be related to acquiring table locks. You can try this Debezium configuration:

snapshot.locking.mode= none 
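
In the lightweight connector this goes into config.yml in YAML form, for example:

# Skip table locks during the initial snapshot
snapshot.locking.mode: "none"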

@sarthaksingh-tomar (Author)

> This could be related to acquiring table locks. You can try this Debezium configuration:
>
> snapshot.locking.mode= none

@subkanthi
I am already using that to avoid locking, but the connector is still failing. Here are my connector configuration parameters:

database.allowPublicKeyRetrieval: "true"

# snapshot.mode: Debezium can use different modes when it runs a snapshot. The snapshot mode is determined by the snapshot.mode configuration property.
# The default value of the property is initial. You can customize the way that the connector creates snapshots by changing the value of the snapshot.mode property
snapshot.mode: "when_needed"
snapshot.locking.mode: "none"
#snapshot.fetch.size: "15000"

# offset.flush.interval.ms: The number of milliseconds to wait before flushing recent offsets to Kafka. This ensures that offsets are committed within the specified time interval.
offset.flush.interval.ms: 5000

# connector.class: The Java class for the connector. This must be set to io.debezium.connector.mysql.MySqlConnector.
connector.class: "io.debezium.connector.mysql.MySqlConnector"

# offset.storage: The Java class that implements the offset storage strategy. This must be set to io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore.
offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"

# offset.storage.jdbc.offset.table.name: The name of the database table where connector offsets are to be stored.
offset.storage.jdbc.offset.table.name: "altinity_sink_connector.replica_source_info"

# offset.storage.jdbc.url: The JDBC URL for the database where connector offsets are to be stored.
offset.storage.jdbc.url: "jdbc:clickhouse://#########:8123/altinity_sink_connector"

# offset.storage.jdbc.user: The name of the database user to be used when connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.user: "#####"

# offset.storage.jdbc.password: The password of the database user to be used when connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.password: "#######"

# offset.storage.jdbc.offset.table.ddl: The DDL statement used to create the database table where connector offsets are to be stored.(Advanced)
offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists altinity_sink_connector.replica_source_info 
(
    `id` String,
    `offset_key` String,
    `offset_val` String,
    `record_insert_ts` DateTime,
    `record_insert_seq` UInt64,
    `_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
)
ENGINE = ReplacingMergeTree(_version) ORDER BY offset_key SETTINGS index_granularity = 8192"

# offset.storage.jdbc.offset.table.delete: The DML statement used to delete the database table where connector offsets are to be stored.(Advanced)
offset.storage.jdbc.offset.table.delete: "select * from altinity_sink_connector.replica_source_info"

offset.storage.jdbc.offset.table.select: "SELECT id, offset_key, offset_val FROM altinity_sink_connector.replica_source_info FINAL ORDER BY record_insert_ts, record_insert_seq"

# schema.history.internal: The Java class that implements the schema history strategy. This must be set to io.debezium.storage.jdbc.history.JdbcSchemaHistory.
schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"

# schema.history.internal.jdbc.schema.history.table.name: The name of the database table where connector schema history is to be stored.
schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector.replicate_schema_history"
schema.history.internal.schema.history.table.name: "altinity_sink_connector.replicate_schema_history"

# schema.history.internal.jdbc.url: The JDBC URL for the database where connector schema history is to be stored.
schema.history.internal.jdbc.url: "jdbc:clickhouse://########:8123/altinity_sink_connector"

# schema.history.internal.jdbc.user: The name of the database user to be used when connecting to the database where connector schema history is to be stored.
schema.history.internal.jdbc.user: "#####"
# schema.history.internal.jdbc.password: The password of the database user to be used when connecting to the database where connector schema history is to be stored.
schema.history.internal.jdbc.password: "########"

# schema.history.internal.jdbc.schema.history.table.ddl: The DDL statement used to create the database table where connector schema history is to be stored.(Advanced)
schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exists altinity_sink_connector.replicate_schema_history
(`id` VARCHAR(36) NOT NULL, `history_data` VARCHAR(65000), `history_data_seq` INTEGER, `record_insert_ts` TIMESTAMP NOT NULL, `record_insert_seq` INTEGER NOT NULL) ENGINE=ReplacingMergeTree(record_insert_seq) order by id"

# enable.snapshot.ddl: If set to true, the connector will parse the DDL statements from the initial load
enable.snapshot.ddl: "true"

# persist.raw.bytes: If set to true, the connector will persist raw bytes as received in a String column.
persist.raw.bytes: "false"

# auto.create.tables: If set to true, the connector will create tables in the target based on the schema received in the incoming message.
auto.create.tables: "true"

# auto.create.tables.replicated: If set to true, the connector will create table with Engine set to ReplicatedReplacingMergeTree
#"auto.create.tables.replicated: "true"

# database.connectionTimeZone: The timezone of the MySQL database server used to correctly shift the commit transaction timestamp.
database.connectionTimeZone: "UTC"

# clickhouse.datetime.timezone: This timezone will override the default timezone of ClickHouse server. Timezone columns will be set to this timezone.
#clickhouse.datetime.timezone: "UTC"

# skip_replica_start: If set to true, the connector will skip replication on startup. sink-connector-client start_replica will start replication.
#skip_replica_start: "false"

# binary.handling.mode: The mode for handling binary values. Possible values are bytes, base64, and decode. The default is bytes.
#binary.handling.mode: "base64"

# ignore_delete: If set to true, the connector will ignore delete events. The default is false.
#ignore_delete: "true"

#disable.ddl: If set to true, the connector will ignore DDL events. The default is false.
#disable.ddl: "false"

#disable.drop.truncate: If set to true, the connector will ignore drop and truncate events. The default is false.
#disable.drop.truncate: "false"

#restart.event.loop: This will restart the CDC event loop if there are no messages received after timeout specified in restart.event.loop.timeout.period.secs
restart.event.loop: "false"

#restart.event.loop.timeout.period.secs: Defines the restart timeout period.
restart.event.loop.timeout.period.secs: "3000"

# Max number of records for the flush buffer.
buffer.max.records: "10000"
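
Since the connection drops partway through streaming a large table, one more thing I can try is un-commenting snapshot.fetch.size so that Debezium reads the table in bounded batches instead of one long streaming result set (a sketch; 15000 is the illustrative value from the default config, and I have not verified that it avoids the reset):

# Maximum number of rows read per batch during the initial snapshot
snapshot.fetch.size: "15000"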
