Skip to content

Commit

Permalink
ucp server/client WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
lucyge2022 committed Oct 9, 2023
1 parent ceeeefe commit 1472081
Show file tree
Hide file tree
Showing 10 changed files with 748 additions and 0 deletions.
4 changes: 4 additions & 0 deletions dora/core/client/fs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</dependency>
<dependency>
<groupId>com.jucx</groupId>
<artifactId>jucx</artifactId>
</dependency>

<dependency>
<groupId>org.alluxio</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import alluxio.annotation.SuppressFBWarnings;
import alluxio.client.ReadType;
import alluxio.client.file.dora.DoraCacheClient;
import alluxio.client.file.dora.ucx.UcxDataReader;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.client.file.ufs.UfsBaseFileSystem;
import alluxio.collections.Pair;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;

import alluxio.client.file.CacheContext;
import alluxio.client.file.cache.store.LocalPageStore;
import alluxio.client.file.cache.store.PageStoreDir;
import alluxio.client.quota.CacheQuota;
import alluxio.client.quota.CacheScope;
Expand All @@ -38,6 +39,7 @@
import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.openucx.jucx.ucp.UcpMemory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -582,6 +584,25 @@ public int get(PageId pageId, int pageOffset, ReadTargetBuffer buffer,
return get(pageId, pageOffset, (int) pageSize, buffer, cacheContext);
}

public Optional<UcpMemory> get(PageId pageId, int pageOffset, int bytesToRead)
throws PageNotFoundException, IOException {
if (mState.get() == NOT_IN_USE) {
Metrics.GET_NOT_READY_ERRORS.inc();
Metrics.GET_ERRORS.inc();
return Optional.empty();
}
PageInfo pageInfo;
try (LockResource r2 = new LockResource(mPageMetaStore.getLock().readLock())) {
pageInfo = mPageMetaStore.getPageInfo(pageId); //check if page exists and refresh LRU items
} catch (PageNotFoundException e) {
LOG.debug("get({},pageOffset={}) fails due to page not found", pageId, pageOffset);
throw e;
}
UcpMemory ucpMemory = ((LocalPageStore)(pageInfo.getLocalCacheDir().getPageStore()))
.get(pageId, false, pageOffset, bytesToRead);
return Optional.of(ucpMemory);
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer buffer,
CacheContext cacheContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package alluxio.client.file.cache.store;

import static alluxio.client.file.cache.store.PageStoreDir.getFileBucket;
import static java.nio.file.StandardOpenOption.READ;

import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageStore;
Expand All @@ -23,13 +24,20 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.openucx.jucx.UcxUtils;
import org.openucx.jucx.ucp.UcpContext;
import org.openucx.jucx.ucp.UcpMemMapParams;
import org.openucx.jucx.ucp.UcpMemory;
import org.openucx.jucx.ucp.UcpParams;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
Expand Down Expand Up @@ -199,6 +207,23 @@ public Path getPagePath(PageId pageId, boolean isTemporary) {
return filePath.resolve(Long.toString(pageId.getPageIndex()));
}

public static final UcpContext sGlobalContext = new UcpContext(new UcpParams()
.requestStreamFeature()
.requestTagFeature()
.requestWakeupFeature());
public UcpMemory get(PageId pageId, boolean isTemporary,
int pageOffset, int bytesToRead) throws IOException {
Path filePath = getPagePath(pageId, isTemporary);
FileChannel fileChannel = FileChannel.open(filePath, READ);
MappedByteBuffer buf = fileChannel.map(FileChannel.MapMode.READ_ONLY,
pageOffset, bytesToRead);
UcpMemory mmapedMemory = sGlobalContext.memoryMap(new UcpMemMapParams()
.setAddress(UcxUtils.getAddress(buf))
.setLength(bytesToRead).nonBlocking());
return mmapedMemory;
}


@Override
public DataFileChannel getDataFileChannel(
PageId pageId, int pageOffset, int bytesToRead, boolean isTemporary)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import alluxio.client.file.URIStatus;
import alluxio.client.file.dora.netty.NettyDataReader;
import alluxio.client.file.dora.netty.NettyDataWriter;
import alluxio.client.file.dora.ucx.UcxDataReader;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.collections.Pair;
import alluxio.conf.PropertyKey;
Expand Down Expand Up @@ -60,8 +61,14 @@
import alluxio.proto.dataserver.Protocol;
import alluxio.resource.CloseableResource;
import alluxio.wire.WorkerNetAddress;
import alluxio.worker.ucx.UcpProxy;

import org.openucx.jucx.ucp.UcpContext;
import org.openucx.jucx.ucp.UcpParams;
import org.openucx.jucx.ucp.UcpWorker;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -162,6 +169,19 @@ public DoraCachePositionReader createNettyPositionReader(URIStatus status,
return new DoraCachePositionReader(reader, status.getLength(), externalPositionReader);
}

public UcxDataReader createUcxPositionReader(URIStatus status,
Protocol.OpenUfsBlockOptions ufsOptions) {
WorkerNetAddress workerNetAddress = getWorkerNetAddress(status.toString());
InetSocketAddress inetSocketAddress =
new InetSocketAddress(workerNetAddress.getHost(), 1234);
Protocol.ReadRequest.Builder builder = Protocol.ReadRequest.newBuilder()
.setBlockId(DUMMY_BLOCK_ID)
.setOpenUfsBlockOptions(ufsOptions)
.setChunkSize(mChunkSize);
return new UcxDataReader(inetSocketAddress, UcpProxy.getInstance().mWorker,
builder);
}

protected GrpcDataReader.Factory createGrpcDataReader(
WorkerNetAddress workerNetAddress,
Protocol.OpenUfsBlockOptions ufsOptions) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package alluxio.client.file.dora.ucx;

import alluxio.PositionReader;
import alluxio.file.ByteBufferTargetBuffer;
import alluxio.file.ReadTargetBuffer;
import alluxio.proto.dataserver.Protocol;
import alluxio.wire.WorkerNetAddress;
import alluxio.worker.ucx.UcpProxy;

import com.google.common.base.Preconditions;
import org.openucx.jucx.UcxCallback;
import org.openucx.jucx.UcxException;
import org.openucx.jucx.UcxUtils;
import org.openucx.jucx.ucp.UcpEndpoint;
import org.openucx.jucx.ucp.UcpEndpointParams;
import org.openucx.jucx.ucp.UcpMemory;
import org.openucx.jucx.ucp.UcpRequest;
import org.openucx.jucx.ucp.UcpWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.function.Supplier;

public class UcxDataReader implements PositionReader {
private static final Logger LOG = LoggerFactory.getLogger(UcxDataReader.class);

InetSocketAddress mAddr;
UcpWorker mWorker;
UcpEndpoint mWorkerEndpoint;
Supplier<Protocol.ReadRequest.Builder> mRequestBuilder;
public UcxDataReader(InetSocketAddress addr, UcpWorker worker,
Protocol.ReadRequest.Builder requestBuilder) {
mAddr = addr;
mWorker = worker;
mRequestBuilder = requestBuilder::clone;
}

public void acquireServerConn() {
if (mWorkerEndpoint != null) {
return;
}
mWorkerEndpoint = mWorker.newEndpoint(
new UcpEndpointParams()
.setPeerErrorHandlingMode()
.setErrorHandler((ep, status, errorMsg) ->
System.out.println("[ERROR] creating ep to remote:"
+ mAddr + " errored out: " + errorMsg
+ " status:" + status + ",ep:" + ep.toString()))
.setSocketAddress(mAddr));
}

public void waitForRequest(UcpRequest ucpRequest) {
while(!ucpRequest.isCompleted()) {
try {
mWorker.progress();
} catch (Exception e) {
LOG.error("Error progressing req:", e);
}
}
}

@Override
public int readInternal(long position, ReadTargetBuffer buffer, int length) throws IOException {
Protocol.ReadRequest.Builder builder = mRequestBuilder.get()
.setLength(length)
.setOffset(position)
.clearCancel();
Protocol.ReadRequest readRequest = builder.build();
byte[] serializedBytes = readRequest.toByteArray();
ByteBuffer buf = ByteBuffer.allocateDirect(serializedBytes.length);
buf.put(serializedBytes);
buf.rewind();
UcpRequest sendRequest = mWorkerEndpoint.sendTaggedNonBlocking(buf, serializedBytes.length, new UcxCallback() {
public void onSuccess(UcpRequest request) {
LOG.info("ReadReq:{} sent.", readRequest);
}

public void onError(int ucsStatus, String errorMsg) {
throw new UcxException(errorMsg);
}
});
waitForRequest(sendRequest);
// now wait to recv data
Preconditions.checkArgument((buffer instanceof ByteBufferTargetBuffer
&& buffer.byteBuffer().isDirect()),
"Must be ByteBufferTargetBuffer with direct ByteBuffer");
UcpRequest recvRequest = mWorker.recvTaggedNonBlocking(
UcxUtils.getAddress(buffer.byteBuffer()), length, 0,0, null);
waitForRequest(recvRequest);
return 0;
}
}
40 changes: 40 additions & 0 deletions dora/core/client/fs/src/main/java/alluxio/worker/ucx/UcpProxy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package alluxio.worker.ucx;

import org.openucx.jucx.ucp.UcpContext;
import org.openucx.jucx.ucp.UcpParams;
import org.openucx.jucx.ucp.UcpWorker;
import org.openucx.jucx.ucp.UcpWorkerParams;

import java.util.concurrent.locks.ReentrantLock;

public class UcpProxy {

private static final UcpContext sGlobalContext = new UcpContext(new UcpParams()
.requestStreamFeature()
.requestTagFeature()
.requestWakeupFeature());
public UcpWorker mWorker;
public static UcpProxy sUcpProxy;
private static ReentrantLock sInstanceLock = new ReentrantLock();

public static UcpProxy getInstance() {
if (sUcpProxy != null) {
return sUcpProxy;
}
sInstanceLock.lock();
try {
if (sUcpProxy != null) {
return sUcpProxy;
}
sUcpProxy = new UcpProxy();
return sUcpProxy;
} finally {
sInstanceLock.unlock();
}
}

public UcpProxy() {
mWorker = sGlobalContext.newWorker(new UcpWorkerParams());
}

}
Loading

0 comments on commit 1472081

Please sign in to comment.