Skip to content

Commit

Permalink
Add timeout for DataExchange (TransfertFiles), set default timeout fo…
Browse files Browse the repository at this point in the history
…r AbstractFSURL (30 sec) #141
  • Loading branch information
hdsdi3g committed Jun 22, 2023
1 parent fcf85b7 commit 76c81e0
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,17 @@ static void checkIsSameFileSystem(final AbstractFile from,
}
}

default Duration getDataExchangeTimeout() {
return Duration.ofSeconds(30);
}

default DataExchangeInOutStream copyAbstractToAbstract(final AbstractFile destination,
final DataExchangeObserver dataExchangeObserver,
final DataExchangeFilter... filters) {
final var bufferSize = Math.max(8192,
Math.max(destination.getFileSystem().getIOBufferSize(),
getFileSystem().getIOBufferSize()));
final var exchange = new DataExchangeInOutStream();
final var exchange = new DataExchangeInOutStream(getDataExchangeTimeout());
Stream.of(filters).forEach(exchange::addFilter);
copyAbstractToAbstract(destination, bufferSize, dataExchangeObserver, exchange);
return exchange;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ default int getIOBufferSize() {
default void setTimeout(final long duration, final TimeUnit unit) {
}

default long getTimeout() {
return 30000;
}

/**
* If disconnected, can we re-connect after ?
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ public AbstractFileSystemURL(final String ressourceURL) {
.orElse(0);
if (timeout > 0) {
fileSystem.setTimeout(timeout, SECONDS);
} else {
fileSystem.setTimeout(30, SECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.util.Objects;
import java.util.stream.Stream;

Expand All @@ -43,6 +44,11 @@ protected CommonAbstractFile(final T fileSystem, final String... path) {
.collect(joining("/")));
}

@Override
public Duration getDataExchangeTimeout() {
return Duration.ofMillis(fileSystem.getTimeout());
}

@Override
public AbstractFileSystem<?> getFileSystem() {
return fileSystem;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public void setTimeout(final long duration, final TimeUnit unit) {
}
}

@Override
public long getTimeout() {
return timeoutDuration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -36,7 +37,7 @@
* Not reusable
*/
@Slf4j
public class DataExchangeInOutStream { // TODO add timeout wait
public class DataExchangeInOutStream implements TimeOutTrait {

private final InternalInputStream internalInputStream;
private final InternalOutputStream internalOutputStream;
Expand All @@ -47,11 +48,14 @@ public class DataExchangeInOutStream { // TODO add timeout wait
private final HashMap<DataExchangeFilter, Long> filterPerformance;
private final HashMap<DataExchangeFilter, Long> filterDeltaThroughput;
private final AtomicLong ioWaitTime;
private final Duration timeOut;

private volatile State state;

public enum State {
WORKING(false, false),
READ_TIMEOUT(false, true),
WRITE_TIMEOUT(true, false),
STOPPED_BY_USER(true, false),
STOPPED_BY_FILTER(true, false),
WRITER_MANUALLY_CLOSED(false, true),
Expand All @@ -67,6 +71,11 @@ public enum State {
}

public DataExchangeInOutStream() {
this(Duration.ofSeconds(30));
}

public DataExchangeInOutStream(final Duration timeOut) {// TODO use with other calls
this.timeOut = timeOut;
internalInputStream = new InternalInputStream();
internalOutputStream = new InternalOutputStream();
filters = Collections.synchronizedList(new ArrayList<>());
Expand All @@ -78,6 +87,11 @@ public DataExchangeInOutStream() {
ioWaitTime = new AtomicLong(0);
}

@Override
public Duration getTimeout() {
return timeOut;
}

private class InternalInputStream extends InputStream {
private volatile boolean readerClosed = false;

Expand All @@ -92,11 +106,14 @@ public int read(final byte[] b, final int off, final int len) throws IOException
log.trace("Read event (wait) of {} byte(s), {} in queue...", len, readQueue.size());
}

while (readQueue.isEmpty()
&& state == State.WORKING
&& readerClosed == false) {
Thread.onSpinWait();
}
whileToTimeout(() -> readQueue.isEmpty()
&& state == State.WORKING
&& readerClosed == false,
() -> {
log.error("Read timeout");
readerClosed = true;
state = State.READ_TIMEOUT;
});

if (readerClosed) {
throw new IOException("Closed InputStream (reader)");
Expand Down Expand Up @@ -178,9 +195,11 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti
}

if (state == State.WORKING) {
while (readQueue.isEmpty() == false) {// FIXME dangerous
Thread.onSpinWait();
}
whileToTimeout(() -> readQueue.isEmpty() == false,
() -> {
log.error("Write timeout");
state = State.WRITE_TIMEOUT;
});

final var now = System.currentTimeMillis();
buffers.write(b, off, len);
Expand Down Expand Up @@ -406,4 +425,5 @@ public DataExchangeInOutStream addFilter(final DataExchangeFilter filter) {
}
return this;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* This file is part of transfertfiles.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* Copyright (C) hdsdi3g for hd3g.tv 2023
*
*/
package tv.hd3g.transfertfiles;

import java.io.InterruptedIOException;
import java.time.Duration;
import java.util.function.BooleanSupplier;

public interface TimeOutTrait {

Duration getTimeout();

default void whileToTimeout(final BooleanSupplier condition,
final Runnable onTimeout) throws InterruptedIOException {
if (condition.getAsBoolean() == false) {
return;
}
final var timeoutDate = System.currentTimeMillis() + getTimeout().toMillis();

while (condition.getAsBoolean()) {
Thread.onSpinWait();
if (System.currentTimeMillis() > timeoutDate) {
onTimeout.run();
throw new InterruptedIOException();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void testNotSetTimeout() {
assertNotNull(fs);
assertTrue(fs instanceof CommonAbstractFileSystem);
final var cfs = (CommonAbstractFileSystem<?>) fs;
assertEquals(0, cfs.timeoutDuration);
assertEquals(30000, cfs.timeoutDuration);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,27 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableList;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static tv.hd3g.transfertfiles.DataExchangeInOutStream.State.FILTER_ERROR;
import static tv.hd3g.transfertfiles.DataExchangeInOutStream.State.READ_TIMEOUT;
import static tv.hd3g.transfertfiles.DataExchangeInOutStream.State.STOPPED_BY_FILTER;
import static tv.hd3g.transfertfiles.DataExchangeInOutStream.State.STOPPED_BY_USER;
import static tv.hd3g.transfertfiles.DataExchangeInOutStream.State.WORKING;
import static tv.hd3g.transfertfiles.DataExchangeInOutStream.State.WRITER_MANUALLY_CLOSED;
import static tv.hd3g.transfertfiles.DataExchangeInOutStream.State.WRITE_TIMEOUT;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -92,10 +97,6 @@ void testBaseCopy() throws IOException, InterruptedException, ExecutionException
assertEquals(WRITER_MANUALLY_CLOSED, exchange.getState());
}

// TODO check behavior if reader is blocked
// TODO check behavior if writer is blocked
// TODO check behavior if readed is 0 size

@Test
void testBaseCopy_filtered() throws IOException, InterruptedException, ExecutionException, TimeoutException {
exchange = new DataExchangeInOutStream();
Expand Down Expand Up @@ -1155,4 +1156,50 @@ void testGetTransfertStats() throws InterruptedException, ExecutionException, IO
assertTrue(tStats.getTotalDuration() >= 0);
}

interface RunWithIOException {
void run() throws IOException;
}

@Nested
class Blocking {

byte[] dataInput;
byte[] dataOutput;

@BeforeEach
void init() {
exchange = new DataExchangeInOutStream(Duration.ofMillis(100));
dataInput = "0123456789".getBytes();
dataOutput = new byte[dataInput.length];
}

CompletableFuture<Void> async(final RunWithIOException run) {
return CompletableFuture.runAsync(() -> {
try {
run.run();
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
});
}

@Test
void testWriteEmpty() throws IOException, InterruptedException, ExecutionException, TimeoutException {
assertThrows(InterruptedIOException.class,
() -> exchange.getSourceOriginStream().read(dataOutput));

assertArrayEquals(new byte[dataOutput.length], dataOutput);
assertEquals(READ_TIMEOUT, exchange.getState());
}

@Test
void testReadEmpty() throws IOException, InterruptedException, ExecutionException, TimeoutException {
exchange.getDestTargetStream().write(dataInput);
assertThrows(InterruptedIOException.class,
() -> exchange.getDestTargetStream().write(dataInput));

assertEquals(WRITE_TIMEOUT, exchange.getState());
}
}

}
Loading

0 comments on commit 76c81e0

Please sign in to comment.