Skip to content

Commit

Permalink
WIP - basic skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
lucyge2022 committed Oct 26, 2023
1 parent c4206be commit 9e2648e
Show file tree
Hide file tree
Showing 12 changed files with 637 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package alluxio.util.io;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import jdk.internal.util.xml.impl.Input;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

public class ByteBufferInputStream extends DataInputStream {
private final InternalInputStream mInternalInputStream;

public static class InternalInputStream extends InputStream {
private final ByteBuffer mBuffer;

public InternalInputStream(ByteBuffer buffer) {
Preconditions.checkArgument(buffer.isDirect(), "Input bytebuffer must be direct.");
mBuffer = buffer.duplicate();
}

@Override
public int read() throws IOException {
try {
return mBuffer.get();
} catch (BufferUnderflowException ex) {
throw new IOException(ex);
}
}

@Override
public void close() throws IOException {
// DO NOTHING
}
}

public static ByteBufferInputStream getInputStream(ByteBuffer buffer) {
InternalInputStream internalIs = new InternalInputStream(buffer);
return new ByteBufferInputStream(internalIs);
}

public ByteBufferInputStream(InternalInputStream internalIs) {
super(internalIs);
mInternalInputStream = internalIs;
}

public void read(ByteBuffer targetBuffer, int len) throws IOException {
if (targetBuffer.remaining() < len) {
throw new IOException("Not enough space left in targetBuffer to fill " + len + " bytes");
}
for (int i = 0; i < len; i++) {
targetBuffer.put((byte)super.read());
}
}

@Override
public void close() throws IOException {
super.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package alluxio.util.io;

import com.google.common.base.Preconditions;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

public class ByteBufferOutputStream extends DataOutputStream {

public static class InternalOutputStream extends OutputStream {
private final ByteBuffer mBuffer;

public InternalOutputStream(ByteBuffer buffer) {
Preconditions.checkArgument(buffer.isDirect(), "Input bytebuffer must be direct.");
mBuffer = buffer.duplicate();
}

@Override
public void write(int b) throws IOException {
try {
mBuffer.put((byte) b);
} catch (BufferOverflowException | ReadOnlyBufferException ex) {
throw new IOException(ex);
}
}

@Override
public void close() throws IOException {
// DO NOTHING
}
}

/**
* Assumption here is this buffer got its writable window adjusted already
* when calling this func.
* @param buffer
* @return
*/
public static ByteBufferOutputStream getOutputStream(ByteBuffer buffer) {
ByteBufferOutputStream.InternalOutputStream internalOs =
new ByteBufferOutputStream.InternalOutputStream(buffer);
return new ByteBufferOutputStream(internalOs);
}

public ByteBufferOutputStream(ByteBufferOutputStream.InternalOutputStream internalOs) {
super(internalOs);
}

@Override
public void close() throws IOException {
super.close();
}
}
16 changes: 16 additions & 0 deletions dora/core/server/worker/output.dot
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
digraph G {
CREATED -> READING_DATA [label = "START" ];
TERMINATED_NORMALLY -> COMPLETED [label = "END" ];
TERMINATED_NORMALLY -> TERMINATED_EXCEPTIONALLY [label = "COMPLETE_REQUEST_ERROR" ];
READING_DATA -> SENDING_DATA [label = "SEND_DATA_AFTER_READING" ];
READING_DATA -> TERMINATED_EXCEPTIONALLY [label = "SEND_DATA_ERROR" ];
READING_DATA -> TERMINATED_EXCEPTIONALLY [label = "CHANNEL_EXCEPTION" ];
READING_DATA -> TERMINATED_NORMALLY [label = "CANCELLED" ];
READING_DATA -> TERMINATED_EXCEPTIONALLY [label = "READ_DATA_ERROR" ];
TERMINATED_EXCEPTIONALLY -> COMPLETED [label = "END" ];
SENDING_DATA -> TERMINATED_EXCEPTIONALLY [label = "SEND_DATA_ERROR" ];
SENDING_DATA -> TERMINATED_EXCEPTIONALLY [label = "CHANNEL_EXCEPTION" ];
SENDING_DATA -> TERMINATED_NORMALLY [label = "OUTPUT_LENGTH_FULFILLED" ];
SENDING_DATA -> TERMINATED_NORMALLY [label = "CANCELLED" ];
SENDING_DATA -> READING_DATA [label = "OUTPUT_LENGTH_NOT_FULFILLED" ];
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package alluxio.worker.ucx;

import io.netty.buffer.ByteBuf;
import org.openucx.jucx.ucp.UcpWorker;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

public class AlluxioUcxUtils {
public static final int METADATA_SIZE_COMMON = 4096;

public static void writeConnectionMetadata(
ByteBuffer targetBuffer,
long tagForRemote, UcpWorker localWorker) {
// long (tag assigned to remote) | int (worker addr size) | bytes (worker addr)
// we allocate the common metadata size to match the send/recv tag exchange size
targetBuffer.putLong(tagForRemote);
ByteBuffer localWorkerAddr = localWorker.getAddress();
targetBuffer.putInt(localWorkerAddr.capacity()); // UcpWorer.getAddress always return a buffer with full capacity filled
targetBuffer.put(localWorkerAddr);
targetBuffer.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package alluxio.worker.ucx;

import org.openucx.jucx.ucp.UcpEndpoint;

public class ReadRequestRMAHandler implements UcxRequestHandler {

@Override
public void handle(UcxMessage message, UcpEndpoint endpoint) {

}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package alluxio.worker.ucx;

import alluxio.AlluxioURI;
import alluxio.client.file.cache.LocalCacheManager;
import alluxio.client.file.cache.PageId;
import alluxio.exception.PageNotFoundException;
import alluxio.proto.dataserver.Protocol;
Expand All @@ -22,16 +21,36 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

public class RPCMessageHandler implements Runnable {
public class ReadRequestStreamHandler implements UcxRequestHandler {
private static final Logger LOG = LoggerFactory.getLogger(UcpServer.class);
private static final long WORKER_PAGE_SIZE = 1*1024*1024L;
Protocol.ReadRequest mReadRequest = null;
UcpEndpoint mRemoteEp;
AtomicLong mSequencer;
//conf.getBytes(PropertyKey.WORKER_PAGE_STORE_PAGE_SIZE);

public ReadRequestStreamHandler() {

}

public ReadRequestStreamHandler(UcpEndpoint remoteEndpoint, AtomicLong sequencer,
Protocol.ReadRequest request) {
mReadRequest = null;
mRemoteEp = remoteEndpoint;
mSequencer = sequencer;
// sequencer = mPeerToSequencers.computeIfAbsent(peerInfo, pi -> new AtomicLong(0L));
// mReadRequest = parseReadRequest(recvBuffer);
// mReadRequest = request;
}

@Override
public void run() {
public void handle(UcxMessage message, UcpEndpoint remoteEndpoint) {
mRemoteEp = remoteEndpoint;
try {
mReadRequest = Protocol.ReadRequest.parseFrom(message.getRPCMessage());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
final String fileId =
new AlluxioURI(mReadRequest.getOpenUfsBlockOptions().getUfsPath()).hash();
long offset = mReadRequest.getOffset();
Expand Down Expand Up @@ -106,14 +125,4 @@ public void onError(int ucsStatus, String errorMsg) {
}
}
}

public RPCMessageHandler(UcpEndpoint remoteEndpoint, AtomicLong sequencer,
Protocol.ReadRequest request) {
mReadRequest = null;
mRemoteEp = remoteEndpoint;
mSequencer = sequencer;
// sequencer = mPeerToSequencers.computeIfAbsent(peerInfo, pi -> new AtomicLong(0L));
// mReadRequest = parseReadRequest(recvBuffer);
mReadRequest = request;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package alluxio.worker.ucx;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Stage {
private final ThreadPoolExecutor mInternalThreadPool = null;

public Stage(String stageName,
int corePoolSize,
int maximumPoolSize,
ThreadFactory threadFactory,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {


}

}
Loading

0 comments on commit 9e2648e

Please sign in to comment.