Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NODE-2329 Reestabilish input stream on unexpected read bytes size #3704

Open
wants to merge 4 commits into
base: version-1.4.x
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 70 additions & 57 deletions node/src/main/scala/com/wavesplatform/Importer.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
package com.wavesplatform

import java.io._
import java.net.{MalformedURLException, URL}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

import akka.actor.ActorSystem
import com.google.common.io.ByteStreams
import com.google.common.primitives.Ints
Expand All @@ -16,7 +9,7 @@ import com.wavesplatform.api.common.{CommonAccountsApi, CommonAssetsApi, CommonB
import com.wavesplatform.block.{Block, BlockHeader}
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.consensus.PoSSelector
import com.wavesplatform.database.{openDB, DBExt, KeyTags}
import com.wavesplatform.database.{DBExt, KeyTags, openDB}
import com.wavesplatform.events.{BlockchainUpdateTriggers, UtxEvent}
import com.wavesplatform.extensions.{Context, Extension}
import com.wavesplatform.features.BlockchainFeatures
Expand All @@ -25,12 +18,12 @@ import com.wavesplatform.lang.ValidationError
import com.wavesplatform.mining.Miner
import com.wavesplatform.protobuf.block.PBBlocks
import com.wavesplatform.settings.WavesSettings
import com.wavesplatform.state.{Blockchain, BlockchainUpdaterImpl, Diff, Height}
import com.wavesplatform.state.appender.BlockAppender
import com.wavesplatform.transaction.{Asset, DiscardedBlocks, Transaction}
import com.wavesplatform.state.{Blockchain, BlockchainUpdaterImpl, Diff, Height}
import com.wavesplatform.transaction.TxValidationError.GenericError
import com.wavesplatform.transaction.smart.script.trace.TracedResult
import com.wavesplatform.utils._
import com.wavesplatform.transaction.{Asset, DiscardedBlocks, Transaction}
import com.wavesplatform.utils.*
import com.wavesplatform.utx.{UtxPool, UtxPoolImpl}
import com.wavesplatform.wallet.Wallet
import kamon.Kamon
Expand All @@ -40,6 +33,13 @@ import monix.reactive.{Observable, Observer}
import org.iq80.leveldb.DB
import scopt.OParser

import java.io.*
import java.net.{MalformedURLException, URL}
import scala.concurrent.duration.*
import scala.concurrent.{Await, Future}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

object Importer extends ScorexLogging {
import monix.execution.Scheduler.Implicits.global

Expand All @@ -59,7 +59,7 @@ object Importer extends ScorexLogging {
import scopt.OParser

val builder = OParser.builder[ImportOptions]
import builder._
import builder.*

OParser.sequence(
programName("waves import"),
Expand Down Expand Up @@ -166,12 +166,13 @@ object Importer extends ScorexLogging {
}
}

@volatile private var quit = false
private val lock = new Object
@volatile private var quit = false
@volatile private var inputStream: InputStream = null
private val lock = new Object

// noinspection UnstableApiUsage
def startImport(
inputStream: BufferedInputStream,
getInputStream: () => InputStream,
blockchain: Blockchain,
appendBlock: AppendBlock,
importOptions: ImportOptions,
Expand All @@ -188,58 +189,68 @@ object Importer extends ScorexLogging {
if (blocksToSkip > 0) log.info(s"Skipping $blocksToSkip block(s)")

sys.addShutdownHook {
import scala.concurrent.duration._
import scala.concurrent.duration.*
val millis = (System.nanoTime() - start).nanos.toMillis
log.info(
s"Imported $counter block(s) from $startHeight to ${startHeight + counter} in ${humanReadableDuration(millis)}"
)
}

inputStream = getInputStream()

while (!quit && counter < blocksToApply) lock.synchronized {
val s1 = ByteStreams.read(inputStream, lenBytes, 0, Ints.BYTES)
if (s1 == Ints.BYTES) {
val blockSize = Ints.fromByteArray(lenBytes)

lazy val blockBytes = new Array[Byte](blockSize)
val factReadSize =
if (blocksToSkip > 0) {
// File IO optimization
ByteStreams.skipFully(inputStream, blockSize)
blockSize
} else {
ByteStreams.read(inputStream, blockBytes, 0, blockSize)
}
try {
val s1 = ByteStreams.read(inputStream, lenBytes, 0, Ints.BYTES)
if (s1 == Ints.BYTES) {
val blockSize = Ints.fromByteArray(lenBytes)

lazy val blockBytes = new Array[Byte](blockSize)
val factReadSize =
if (blocksToSkip > 0) {
// File IO optimization
ByteStreams.skipFully(inputStream, blockSize)
blockSize
} else {
ByteStreams.read(inputStream, blockBytes, 0, blockSize)
}

if (factReadSize == blockSize) {
if (blocksToSkip > 0) {
blocksToSkip -= 1
} else {
val blockV5 = blockchain.isFeatureActivated(
BlockchainFeatures.BlockV5,
blockchain.height + 1
)
val block =
(if (importOptions.format == Formats.Binary && !blockV5) Block.parseBytes(blockBytes)
else PBBlocks.vanilla(PBBlocks.addChainId(protobuf.block.PBBlock.parseFrom(blockBytes)), unsafe = true)).get
if (blockchain.lastBlockId.contains(block.header.reference)) {
Await.result(appendBlock(block).runAsyncLogErr, Duration.Inf) match {
case Left(ve) =>
log.error(s"Error appending block: $ve")
quit = true
case _ =>
counter = counter + 1
}
if (factReadSize == blockSize) {
if (blocksToSkip > 0) {
blocksToSkip -= 1
} else {
log.warn(s"Block $block is not a child of the last block ${blockchain.lastBlockId.get}")
val blockV5 = blockchain.isFeatureActivated(
BlockchainFeatures.BlockV5,
blockchain.height + 1
)
val block =
(if (importOptions.format == Formats.Binary && !blockV5) Block.parseBytes(blockBytes)
else PBBlocks.vanilla(PBBlocks.addChainId(protobuf.block.PBBlock.parseFrom(blockBytes)), unsafe = true)).get
if (blockchain.lastBlockId.contains(block.header.reference)) {
Await.result(appendBlock(block).runAsyncLogErr, Duration.Inf) match {
case Left(ve) =>
log.error(s"Error appending block: $ve")
quit = true
case _ =>
counter = counter + 1
}
} else {
log.warn(s"Block $block is not a child of the last block ${blockchain.lastBlockId.get}")
}
}
} else {
log.info(s"$factReadSize != expected $blockSize")
log.info(s"reestablishing input stream")
inputStream.close()
inputStream = getInputStream()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a notice: actually we have blockSize for each block, so we don't need to read whole BlockInfoAtHeight sequence to find the offset.

}
} else {
log.info(s"$factReadSize != expected $blockSize")
if (inputStream.available() > 0) log.info(s"Expecting to read ${Ints.BYTES} but got $s1 (${inputStream.available()})")
quit = true
}
} else {
if (inputStream.available() > 0) log.info(s"Expecting to read ${Ints.BYTES} but got $s1 (${inputStream.available()})")
quit = true
} catch {
case NonFatal(e) =>
log.error(s"Error reading bytes: $e")
quit = true
}
}
}
Expand Down Expand Up @@ -285,7 +296,7 @@ object Importer extends ScorexLogging {
val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, db, actorSystem)
checkGenesis(settings, blockchainUpdater, Miner.Disabled)

val importFileOffset =
def importFileOffset() =
if (importOptions.dryRun) 0
else
importOptions.format match {
Expand All @@ -303,7 +314,9 @@ object Importer extends ScorexLogging {

case _ => 0L
}
val inputStream = new BufferedInputStream(initFileStream(importOptions.blockchainFile, importFileOffset), 2 * 1024 * 1024)

def establishInputStream() =
new BufferedInputStream(initFileStream(importOptions.blockchainFile, importFileOffset()), 2 * 1024 * 1024)

if (importOptions.dryRun) {
def readNextBlock(): Future[Option[Block]] = Future.successful(None)
Expand Down Expand Up @@ -352,10 +365,10 @@ object Importer extends ScorexLogging {
levelDb.close()
db.close()
}
inputStream.close()
if (inputStream != null) inputStream.close()
}

startImport(inputStream, blockchainUpdater, extAppender, importOptions, importFileOffset == 0)
startImport(() => establishInputStream(), blockchainUpdater, extAppender, importOptions, importFileOffset() == 0)
Await.result(Kamon.stopModules(), 10.seconds)
}
}