Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FS upload speed improvement #85

Merged
merged 5 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions mcumgr-core/src/main/java/io/runtime/mcumgr/dfu/task/Upload.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
import io.runtime.mcumgr.image.McuMgrImage;
import io.runtime.mcumgr.managers.ImageManager;
import io.runtime.mcumgr.task.TaskManager;
import io.runtime.mcumgr.transfer.ImageUploader;
import io.runtime.mcumgr.transfer.TransferController;
import io.runtime.mcumgr.transfer.UploadCallback;

import static io.runtime.mcumgr.transfer.ImageUploaderKt.windowUpload;

class Upload extends FirmwareUpgradeTask {
private final McuMgrImage mcuMgrImage;
private final int image;
Expand Down Expand Up @@ -71,13 +70,12 @@ public void onUploadCompleted() {
final Settings settings = performer.getSettings();
final ImageManager manager = new ImageManager(settings.transport);
if (settings.windowCapacity > 1) {
mUploadController = windowUpload(
mUploadController = new ImageUploader(
manager,
mcuMgrImage.getData(), image,
settings.windowCapacity,
settings.memoryAlignment,
callback
);
settings.memoryAlignment
).uploadAsync(callback);
} else {
mUploadController = manager.imageUpload(mcuMgrImage.getData(), image, callback);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.runtime.mcumgr.transfer

import io.runtime.mcumgr.McuMgrCallback
import io.runtime.mcumgr.exception.McuMgrException
import io.runtime.mcumgr.managers.FsManager
import io.runtime.mcumgr.response.UploadResponse
import io.runtime.mcumgr.util.CBOR

private const val OP_WRITE = 2
private const val ID_FILE = 0

open class FileUploader(
private val fsManager: FsManager,
private val name: String,
data: ByteArray,
windowCapacity: Int = 1,
memoryAlignment: Int = 1,
) : Uploader(
data,
windowCapacity,
memoryAlignment,
fsManager.mtu,
fsManager.scheme
) {
override fun write(requestMap: Map<String, Any>, timeout: Long, callback: (UploadResult) -> Unit) {
fsManager.uploadAsync(requestMap, timeout, callback)
}

override fun getAdditionalData(
data: ByteArray,
offset: Int,
map: MutableMap<String, Any>
) {
map["name"] = name
}

override fun getAdditionalSize(offset: Int): Int =
CBOR.stringLength("name") + CBOR.stringLength(name)
}

private fun FsManager.uploadAsync(
requestMap: Map<String, Any>,
timeout: Long,
callback: (UploadResult) -> Unit
) = send(OP_WRITE, ID_FILE, requestMap, timeout, UploadResponse::class.java,
object : McuMgrCallback<UploadResponse> {
override fun onResponse(response: UploadResponse) {
callback(UploadResult.Response(response, response.returnCode))
}

override fun onError(error: McuMgrException) {
callback(UploadResult.Failure(error))
}
}
)
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package io.runtime.mcumgr.transfer

import io.runtime.mcumgr.McuMgrCallback
import io.runtime.mcumgr.exception.InsufficientMtuException
import io.runtime.mcumgr.exception.McuMgrException
import io.runtime.mcumgr.managers.ImageManager
import io.runtime.mcumgr.response.UploadResponse
import io.runtime.mcumgr.util.CBOR
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.slf4j.LoggerFactory
import java.security.MessageDigest
import java.security.NoSuchAlgorithmException
import java.util.*
Expand All @@ -18,76 +13,26 @@ private const val OP_WRITE = 2
private const val ID_UPLOAD = 1
private const val TRUNCATED_HASH_LEN = 3

@Deprecated(
message = "Use TransferManager.windowUpload instead",
replaceWith = ReplaceWith(
"ImageUploader(this, data, image, windowCapacity, memoryAlignment).uploadAsync(callback)",
"io.runtime.mcumgr.transfer.TransferManager.windowUpload"
)
)
fun ImageManager.windowUpload(
data: ByteArray,
image: Int,
windowCapacity: Int,
memoryAlignment: Int,
callback: UploadCallback
): TransferController {
val log = LoggerFactory.getLogger("ImageUploader")
val uploader = ImageUploader(data, image, this, windowCapacity, memoryAlignment)

val exceptionHandler = CoroutineExceptionHandler { _, t ->
log.error("Upload failed", t)
}
val job = GlobalScope.launch(exceptionHandler) {
val progress = uploader.progress.onEach { progress ->
callback.onUploadProgressChanged(
progress.offset,
progress.size,
progress.timestamp,
)
}.launchIn(this)

val start = System.currentTimeMillis()
uploader.uploadCatchMtu()
val duration = System.currentTimeMillis() - start
log.info("Upload completed. ${data.size} bytes sent in $duration ms with avg speed: ${data.size.toFloat() / (duration.toFloat() + 1f)} kBytes/s") // + 1 to prevent division by zero
progress.cancel()
}

job.invokeOnCompletion { throwable ->
throwable?.printStackTrace()
when (throwable) {
null -> callback.onUploadCompleted()
is CancellationException -> callback.onUploadCanceled()
is McuMgrException -> callback.onUploadFailed(throwable)
else -> callback.onUploadFailed(McuMgrException(throwable))
}
}

return object : TransferController {
var paused: Job? = null

override fun pause() {
paused = GlobalScope.launch { uploader.pause() }
}
override fun resume() {
uploader.resume()
paused = null
}
override fun cancel() {
paused?.cancel()
job.cancel()
}
}
}
): TransferController =
ImageUploader(this, data, image, windowCapacity, memoryAlignment).uploadAsync(callback)

// Catches an mtu exception, sets the new mtu and restarts the upload.
private suspend fun Uploader.uploadCatchMtu() {
try {
upload()
} catch (e: InsufficientMtuException) {
mtu = e.mtu
upload()
}
}

internal class ImageUploader(
open class ImageUploader(
private val imageManager: ImageManager,
imageData: ByteArray,
private val image: Int,
private val imageManager: ImageManager,
windowCapacity: Int = 1,
memoryAlignment: Int = 1,
) : Uploader(
Expand Down
93 changes: 90 additions & 3 deletions mcumgr-core/src/main/java/io/runtime/mcumgr/transfer/Uploader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package io.runtime.mcumgr.transfer

import io.runtime.mcumgr.McuMgrScheme
import io.runtime.mcumgr.exception.InsufficientMtuException
import io.runtime.mcumgr.exception.McuMgrException
import io.runtime.mcumgr.exception.McuMgrTimeoutException
import io.runtime.mcumgr.util.CBOR
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
Expand All @@ -31,6 +32,24 @@ private data class Chunk(val data: ByteArray, val offset: Int) {
override fun toString(): String {
return "Chunk(offset=$offset, size=${data.size})"
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as Chunk

if (!data.contentEquals(other.data)) return false
if (offset != other.offset) return false

return true
}

override fun hashCode(): Int {
var result = data.contentHashCode()
result = 31 * result + offset
return result
}
}

abstract class Uploader(
Expand Down Expand Up @@ -165,6 +184,74 @@ abstract class Uploader(
}
}

/**
* Uploads the data asynchronously.
*/
@JvmOverloads fun uploadAsync(
callback: UploadCallback,
scope: CoroutineScope = GlobalScope,
): TransferController {
val exceptionHandler = CoroutineExceptionHandler { _, t ->
log.error("Upload failed", t)
}
val job = scope.launch(exceptionHandler) {
val progress = progress.onEach { progress ->
callback.onUploadProgressChanged(
progress.offset,
progress.size,
progress.timestamp,
)
}.launchIn(this)

val size = data.size
val start = System.currentTimeMillis()
uploadCatchMtu()
val duration = System.currentTimeMillis() - start
log.info("Upload completed. $size bytes sent in $duration ms with avg speed: ${size.toFloat() / (duration.toFloat() + 1f)} kBytes/s") // + 1 to prevent division by zero
progress.cancel()
}

job.invokeOnCompletion { throwable ->
throwable?.printStackTrace()
when (throwable) {
null -> callback.onUploadCompleted()
is CancellationException -> callback.onUploadCanceled()
is McuMgrException -> callback.onUploadFailed(throwable)
else -> callback.onUploadFailed(McuMgrException(throwable))
}
}

val uploader = this
return object : TransferController {
var paused: Job? = null

override fun pause() {
paused = scope.launch {
uploader.pause()
paused = null
}
}
override fun resume() {
uploader.resume()
paused = null
}
override fun cancel() {
paused?.cancel()
job.cancel()
}
}
}

// Catches an mtu exception, sets the new mtu and restarts the upload.
private suspend fun uploadCatchMtu() {
try {
upload()
} catch (e: InsufficientMtuException) {
mtu = e.mtu
upload()
}
}

/**
* Pauses upload.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.runtime.mcumgr.exception.McuMgrException;
import io.runtime.mcumgr.managers.FsManager;
import io.runtime.mcumgr.sample.viewmodel.SingleLiveEvent;
import io.runtime.mcumgr.transfer.FileUploader;
import io.runtime.mcumgr.transfer.TransferController;
import io.runtime.mcumgr.transfer.UploadCallback;
import no.nordicsemi.android.ble.ConnectionPriorityRequest;
Expand Down Expand Up @@ -98,7 +99,14 @@ public void upload(final String path, final byte[] data) {
requestHighConnectionPriority();
setLoggingEnabled(false);
initialBytes = 0;
controller = manager.fileUpload(path, data, this);

// The previous way of uploading the file:
// controller = manager.fileUpload(path, data, this);

// Improved uploader which makes use of window upload mechanism
// (sending multiple packets without waiting for the response):
controller = new FileUploader(manager, path, data, 3, 4)
.uploadAsync(this);
}

public void pause() {
Expand Down