Skip to content

Commit

Permalink
1. make LocalCacheManager.cache() available in interface
Browse files Browse the repository at this point in the history
2. allocate direct ByteBuffer and then register with UcpMemory to provide UcpMemory wrapped buffer to avoid ucx failure to allocate user mem ( mm_sysv.c:114  UCX  ERROR   failed to allocate 4096 bytes with mm for user memory )
3. make UcxReadTest#testClientServerr do random unaligned read, and add testStandaloneServer as a sanity test for standalone UcpServer
4. downgrade debugging logs' level from info to debug
5. remove standalone testing process class UcpClientTest
  • Loading branch information
lucyge2022 committed Dec 13, 2023
1 parent 7154d4f commit f0ba8ac
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 298 deletions.
3 changes: 0 additions & 3 deletions conf/start-UcpClientTest.sh

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) {
return mCacheManager.put(pageId, page, cacheContext);
}

@Override
public int cache(PageId pageId, CacheContext cacheContext, Supplier<byte[]> externalDataSupplier) {
return mCacheManager.cache(pageId, cacheContext, externalDataSupplier);
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target,
CacheContext cacheContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) {
}
}

@Override
public int cache(PageId pageId, CacheContext cacheContext, Supplier<byte[]> externalDataSupplier) {
return mCacheManager.cache(pageId, cacheContext, externalDataSupplier);
}

@Override
public int get(PageId pageId, int pageOffset, ReadTargetBuffer buffer,
CacheContext cacheContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void fail(Throwable ex) {
mFuture.completeExceptionally(ex);
}

public boolean get() throws ExecutionException, InterruptedException {
public boolean get() throws Throwable {
if (mCompletedCount.incrementAndGet() >= mTotalExpected.get()) {
mFuture.complete(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import alluxio.util.io.ByteBufferOutputStream;

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.openucx.jucx.UcxCallback;
import org.openucx.jucx.UcxException;
import org.openucx.jucx.UcxUtils;
Expand Down Expand Up @@ -56,17 +57,13 @@ public void handle(UcxMessage message, UcxConnection remoteConnection) {
// RPC message contains:
// client remote mem addr (long) | client remote mem addr Rkey buffer
// ReadRequest protobuf
LOG.info("[DEBUG], before read remote mem info, infobuffer capacity:{}:pos:{}:limit:{}.",
infoBuffer.capacity(), infoBuffer.position(), infoBuffer.limit());
readRequest = Protocol.ReadRequestRMA.parseFrom(infoBuffer.duplicate());
remoteMemAddress = readRequest.getRemoteMemAddr();
byte[] rkeyBufBytes = readRequest.getRkeyBuf().toByteArray();
ByteBuffer rkeyBuf = ByteBuffer.allocateDirect(rkeyBufBytes.length);
rkeyBuf.put(rkeyBufBytes);
rkeyBuf.clear();
remoteRKey = remoteEp.unpackRemoteKey(rkeyBuf);
LOG.info("[DEBUG], after read remote mem info, infobuffer capacity:{}:pos:{}:limit:{}.",
infoBuffer.capacity(), infoBuffer.position(), infoBuffer.limit());
} catch (IOException e) {
LOG.error("Exception in parsing RMA Read Request:", e);
throw new RuntimeException(e);
Expand All @@ -79,7 +76,7 @@ public void handle(UcxMessage message, UcxConnection remoteConnection) {
long pageSize = Configuration.global().getBytes(PropertyKey.WORKER_PAGE_STORE_PAGE_SIZE);
long remoteAddrPosition = remoteMemAddress;

List<UcpRequest> requests = new ArrayList<>();
String errMsg = "";
AsyncFuture<String> asyncFuture = new AsyncFuture<>();
int totalRequests = 0;
int bytesRead = 0;
Expand All @@ -94,22 +91,24 @@ public void handle(UcxMessage message, UcxConnection remoteConnection) {
if (!readContentUcpMem.isPresent()) {
break;
}
LOG.info("PUT-ing to remoteAddr:{}", remoteAddrPosition);
LOG.debug("PUT-ing to remoteAddr:{}", remoteAddrPosition);
UcpRequest putRequest = remoteEp.putNonBlocking(readContentUcpMem.get().getAddress(),
readContentUcpMem.get().getLength(), remoteAddrPosition,
remoteRKey, new UcxCallback() {
public void onSuccess(UcpRequest request) {
LOG.info("onSuccess put pageid:{}:pageOffset:{}:len:{}",
LOG.debug("onSuccess put pageid:{}:pageOffset:{}:len:{}",
pageId, pageOffset, readLen);
asyncFuture.complete(String.format("pageid:%s:pageOffset:%d:len:%d",
pageId.toString(), pageOffset, readLen));
readContentUcpMem.get().deregister();
}

public void onError(int ucsStatus, String errorMsg) {
LOG.error("onError put pageid:{}:pageOffset:{}:len:{}"
+ " ucsStatus:{}:errMsg:{}",
pageId, pageOffset, readLen, ucsStatus, errorMsg);
asyncFuture.fail(new UcxException(errorMsg));
readContentUcpMem.get().deregister();
}
});
totalRequests += 1;
Expand All @@ -126,22 +125,28 @@ public void onError(int ucsStatus, String errorMsg) {
break;
}
} // end for
LOG.info("Handle RMA read req:{} complete, blockingly wait for all RMA PUT to compelte",
LOG.debug("Handle RMA read req:{} complete, blockingly wait for all RMA PUT to compelte",
readRequest);
asyncFuture.setTotalExpected(totalRequests);
try {
asyncFuture.get();
} catch (ExecutionException | InterruptedException e) {
throw new UnknownRuntimeException("Exception during waiting for RMA PUT to complete.");
} catch (Throwable e) {
errMsg = String.format("Error during read ufsPath:%s:fileId:%s:offset:%s:length:%s, exception:%s",
readRequest.getOpenUfsBlockOptions().getUfsPath(),
fileId, offset, totalLength, e.getMessage());
LOG.error(errMsg);
}

LOG.info("All PUT request completed, notifying client...");
Protocol.ReadResponseRMA readResponseRMA = Protocol.ReadResponseRMA.newBuilder()
.setReadLength(bytesRead).build();
byte[] responseBytes = readResponseRMA.toByteArray();
LOG.debug("All PUT request completed, notifying client...");
Protocol.ReadResponseRMA.Builder readResponseRMABuilder = Protocol.ReadResponseRMA.newBuilder()
.setReadLength(bytesRead);
if (StringUtils.isNotEmpty(errMsg)) {
readResponseRMABuilder.setErrorMsg(errMsg);
}
byte[] responseBytes = readResponseRMABuilder.build().toByteArray();
UcxMessage replyMessage = new UcxMessage(message.getMessageId(),
UcxMessage.Type.Reply,
ByteBuffer.wrap(responseBytes)); // reply -> Protocol.ReadResponseRMA
ByteBuffer.wrap(responseBytes));
UcpMemory relyMemoryBlock = UcxMemoryPool.allocateMemory(AlluxioUcxUtils.METADATA_SIZE_COMMON,
UcsConstants.MEMORY_TYPE.UCS_MEMORY_TYPE_HOST);
ByteBuffer replyBuffer = UcxUtils.getByteBufferView(relyMemoryBlock.getAddress(),
Expand All @@ -156,7 +161,6 @@ public void onError(int ucsStatus, String errorMsg) {
UcpRequest req = remoteConnection.getEndpoint().sendStreamNonBlocking(
replyBuffer, new UcxCallback() {
public void onSuccess(UcpRequest request) {
LOG.error("onSuccess");
relyMemoryBlock.deregister();
completed.complete(true);
}
Expand All @@ -167,7 +171,7 @@ public void onError(int ucsStatus, String errorMsg) {
throw new UcxException(errorMsg);
}
});
LOG.info("Blockingly wait for completion reply msg sending...");
LOG.debug("Blockingly wait for completion reply msg sending...");
boolean completeSending = false;
try {
completeSending = completed.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import alluxio.client.file.cache.CacheManager;
import alluxio.client.file.cache.PageId;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.PageNotFoundException;
import alluxio.exception.runtime.UnknownRuntimeException;
import alluxio.proto.dataserver.Protocol;
Expand All @@ -22,25 +23,19 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

/**
* (Deprecated, unused now)
* Handling ReadRequest with UCX Stream API.
*/
public class ReadRequestStreamHandler implements UcxRequestHandler {
private static final Logger LOG = LoggerFactory.getLogger(UcpServer.class);
private static final long WORKER_PAGE_SIZE = 1*1024*1024L;
Protocol.ReadRequest mReadRequest = null;
UcpEndpoint mRemoteEp;
AtomicLong mSequencer;

public ReadRequestStreamHandler() {
}

public ReadRequestStreamHandler(UcpEndpoint remoteEndpoint, AtomicLong sequencer,
Protocol.ReadRequest request) {
mReadRequest = null;
mRemoteEp = remoteEndpoint;
mSequencer = sequencer;
}

@Override
public void handle(UcxMessage message, UcxConnection remoteConnection) {
mRemoteEp = remoteConnection.getEndpoint();
Expand All @@ -57,11 +52,12 @@ public void handle(UcxMessage message, UcxConnection remoteConnection) {
new AlluxioURI(mReadRequest.getOpenUfsBlockOptions().getUfsPath()).hash();
long offset = mReadRequest.getOffset();
long totalLength = mReadRequest.getLength();
long pageSize = Configuration.global().getBytes(PropertyKey.WORKER_PAGE_STORE_PAGE_SIZE);
List<UcpRequest> requests = new ArrayList<>();
for (int bytesRead = 0; bytesRead < totalLength; ) {
int pageIndex = (int)(offset / WORKER_PAGE_SIZE);
int pageOffset = (int)(offset % WORKER_PAGE_SIZE);
int readLen = (int)Math.min(totalLength - bytesRead, WORKER_PAGE_SIZE - pageOffset);
int pageIndex = (int)(offset / pageSize);
int pageOffset = (int)(offset % pageSize);
int readLen = (int)Math.min(totalLength - bytesRead, pageSize - pageOffset);
PageId pageId = new PageId(fileId, pageIndex);
try {
Optional<UcpMemory> readContentUcpMem =
Expand All @@ -81,7 +77,7 @@ public void handle(UcxMessage message, UcxConnection remoteConnection) {
UcpRequest preambleReq = mRemoteEp.sendStreamNonBlocking(UcxUtils.getAddress(preamble),
16, new UcxCallback() {
public void onSuccess(UcpRequest request) {
LOG.info("preamble sent, sequence:{}, len:{}"
LOG.debug("preamble sent, sequence:{}, len:{}"
,seq, readContentUcpMem.get().getLength());
}

Expand All @@ -101,7 +97,7 @@ public void onError(int ucsStatus, String errorMsg) {
UcpRequest sendReq = mRemoteEp.sendStreamNonBlocking(
addrs, sizes, new UcxCallback() {
public void onSuccess(UcpRequest request) {
LOG.info("send complete for pageoffset:{}:readLen:{}",
LOG.debug("send complete for pageoffset:{}:readLen:{}",
pageOffset, readLen);
readContentUcpMem.get().deregister();
}
Expand All @@ -117,9 +113,9 @@ public void onError(int ucsStatus, String errorMsg) {
throw new RuntimeException(e);
}
}// end for
LOG.info("Handle read req:{} complete", mReadRequest);
LOG.debug("Handle read req:{} complete", mReadRequest);
while (requests.stream().anyMatch(r -> !r.isCompleted())) {
LOG.info("Wait for all {} ucpreq to complete, sleep for 5 sec...", requests.size());
LOG.debug("Wait for all {} ucpreq to complete, sleep for 5 sec...", requests.size());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Expand Down

This file was deleted.

Loading

0 comments on commit f0ba8ac

Please sign in to comment.