Skip to content

Commit

Permalink
Update local cache to catch 3.x
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?
cherry pick all local cache related stuff back to 2.x. The principle here is to keep Trino using the same code as 3.x

### Why are the changes needed?

Please clarify why the changes are needed. For instance,
  1. If you propose a new API, clarify the use case for a new API.
  2. If you fix a bug, describe the bug.

### Does this PR introduce any user facing changes?

Please list the user-facing changes introduced by your change, including
  1. change in user-facing APIs
  2. addition or removal of property keys
  3. webui

			pr-link: #18574
			change-id: cid-3e548ccfbfd290dcefd978a55221a92753f99118
  • Loading branch information
jja725 committed Apr 26, 2024
1 parent 254aa3d commit 3e520f1
Show file tree
Hide file tree
Showing 53 changed files with 3,291 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
package alluxio.client.file.cache;

import alluxio.client.file.CacheContext;
import alluxio.client.file.cache.store.ByteArrayTargetBuffer;
import alluxio.client.file.cache.store.PageReadTargetBuffer;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.PageNotFoundException;
import alluxio.file.ByteArrayTargetBuffer;
import alluxio.file.ReadTargetBuffer;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.metrics.MultiDimensionalMetricsSystem;
import alluxio.network.protocol.databuffer.DataFileChannel;
import alluxio.resource.LockResource;

import com.codahale.metrics.Counter;
Expand All @@ -27,16 +30,19 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;

/**
* Interface for managing cached pages.
*/
public interface CacheManager extends AutoCloseable {
public interface CacheManager extends AutoCloseable, CacheStatus {
Logger LOG = LoggerFactory.getLogger(CacheManager.class);

/**
* State of a cache.
Expand Down Expand Up @@ -119,6 +125,12 @@ public static CacheManager create(AlluxioConfiguration conf,
try {
boolean isShadowCacheEnabled =
conf.getBoolean(PropertyKey.USER_CLIENT_CACHE_SHADOW_ENABLED);
boolean isNettyDataTransmissionEnable = false;
// Note that Netty data transmission doesn't support async write
if (isNettyDataTransmissionEnable) {
options.setIsAsyncWriteEnabled(false);
}
MultiDimensionalMetricsSystem.setCacheStorageSupplier(pageMetaStore::bytes);
if (isShadowCacheEnabled) {
return new NoExceptionCacheManager(
new CacheManagerWithShadowCache(LocalCacheManager.create(options, pageMetaStore),
Expand All @@ -140,6 +152,8 @@ static void clear() {
CacheManager manager = CACHE_MANAGER.getAndSet(null);
if (manager != null) {
manager.close();
MultiDimensionalMetricsSystem.setCacheStorageSupplier(
MultiDimensionalMetricsSystem.NULL_SUPPLIER);
}
} catch (Exception e) {
LOG.warn("Failed to close CacheManager: {}", e.toString());
Expand Down Expand Up @@ -236,6 +250,20 @@ default int get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer,
return get(pageId, pageOffset, bytesToRead, buffer, offsetInBuffer, CacheContext.defaults());
}

/**
* Reads a part of a page if the queried page is found in the cache, stores the result in buffer.
*
* @param pageId page identifier
* @param pageOffset offset into the page
* @param buffer destination buffer to write
* @param cacheContext cache related context
* @return number of bytes read, 0 if page is not found, -1 on errors
*/
default int get(PageId pageId, int pageOffset, ReadTargetBuffer buffer,
CacheContext cacheContext) {
throw new UnsupportedOperationException("This method is unsupported. ");
}

/**
* Reads a part of a page if the queried page is found in the cache, stores the result in buffer.
*
Expand Down Expand Up @@ -263,9 +291,24 @@ default int get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer, i
* @param cacheContext cache related context
* @return number of bytes read, 0 if page is not found, -1 on errors
*/
int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer,
int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer buffer,
CacheContext cacheContext);

/**
* Reads a part of a page if the queried page is found in the cache, stores the result in buffer.
* Loads the page otherwise.
*
* @param pageId page identifier
* @param pageOffset offset into the page
* @param bytesToRead number of bytes to read in this page
* @param buffer destination buffer to write
* @param cacheContext cache related context
* @param externalDataSupplier the external data supplier to read a page
* @return number of bytes read, 0 if page is not found, -1 on errors
*/
int getAndLoad(PageId pageId, int pageOffset, int bytesToRead,
ReadTargetBuffer buffer, CacheContext cacheContext, Supplier<byte[]> externalDataSupplier);

/**
* Get page ids by the given file id.
* @param fileId file identifier
Expand All @@ -276,6 +319,20 @@ default List<PageId> getCachedPageIdsByFileId(String fileId, long fileLength) {
throw new UnsupportedOperationException();
}

/**
* Deletes all pages of the given file.
*
* @param fileId the file id of the target file
*/
void deleteFile(String fileId);

/**
* Deletes all temporary pages of the given file.
*
* @param fileId the file id of the target file
*/
void deleteTempFile(String fileId);

/**
* Deletes a page from the cache.
*
Expand All @@ -289,6 +346,14 @@ default List<PageId> getCachedPageIdsByFileId(String fileId, long fileLength) {
*/
State state();

/**
* @param pageId the page id
* @return true if the page is cached. This method is not thread-safe as no lock is acquired
*/
default boolean hasPageUnsafe(PageId pageId) {
throw new UnsupportedOperationException();
}

/**
*
* @param pageId
Expand All @@ -306,4 +371,25 @@ default List<PageId> getCachedPageIdsByFileId(String fileId, long fileLength) {
default void invalidate(Predicate<PageInfo> predicate) {
throw new UnsupportedOperationException();
}

@Override
Optional<CacheUsage> getUsage();

/**
* Commit the File.
* @param fileId the file ID
*/
void commitFile(String fileId);

/**
* Get a {@link DataFileChannel} which wraps a {@link io.netty.channel.FileRegion}.
* @param pageId the page id
* @param pageOffset the offset inside the page
* @param bytesToRead the bytes to read
* @param cacheContext the cache context
* @return an object of {@link DataFileChannel}
*/
Optional<DataFileChannel> getDataFileChannel(
PageId pageId, int pageOffset, int bytesToRead, CacheContext cacheContext)
throws PageNotFoundException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@
import static alluxio.client.file.CacheContext.StatsUnit.BYTE;

import alluxio.client.file.CacheContext;
import alluxio.client.file.cache.store.PageReadTargetBuffer;
import alluxio.client.quota.CacheScope;
import alluxio.conf.AlluxioConfiguration;
import alluxio.exception.PageNotFoundException;
import alluxio.file.ReadTargetBuffer;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.network.protocol.databuffer.DataFileChannel;

import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Funnel;
import com.google.common.hash.PrimitiveSink;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.Nonnull;

/**
Expand All @@ -46,15 +50,33 @@ public CacheManagerWithShadowCache(CacheManager cacheManager, AlluxioConfigurati
mShadowCacheManager = ShadowCacheManager.create(conf);
}

@Override
public void commitFile(String fileId) {
mCacheManager.commitFile(fileId);
}

@Override
public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) {
updateShadowCache(pageId, page.remaining(), cacheContext);
return mCacheManager.put(pageId, page, cacheContext);
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target,
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target,
CacheContext cacheContext) {
getOrUpdateShadowCache(pageId, bytesToRead, cacheContext);
return mCacheManager.get(pageId, pageOffset, bytesToRead, target, cacheContext);
}

@Override
public int getAndLoad(PageId pageId, int pageOffset, int bytesToRead,
ReadTargetBuffer buffer, CacheContext cacheContext, Supplier<byte[]> externalDataSupplier) {
getOrUpdateShadowCache(pageId, bytesToRead, cacheContext);
return mCacheManager.getAndLoad(pageId, pageOffset, bytesToRead,
buffer, cacheContext, externalDataSupplier);
}

private void getOrUpdateShadowCache(PageId pageId, int bytesToRead, CacheContext cacheContext) {
int nread = mShadowCacheManager.get(pageId, bytesToRead, getCacheScope(cacheContext));
if (nread > 0) {
Metrics.SHADOW_CACHE_PAGES_HIT.inc();
Expand All @@ -64,7 +86,6 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuf
}
Metrics.SHADOW_CACHE_PAGES_READ.inc();
Metrics.SHADOW_CACHE_BYTES_READ.inc(bytesToRead);
return mCacheManager.get(pageId, pageOffset, bytesToRead, target, cacheContext);
}

/**
Expand Down Expand Up @@ -142,6 +163,27 @@ public void close() throws Exception {
mCacheManager.close();
}

@Override
public void deleteFile(String fileId) {
mCacheManager.deleteFile(fileId);
}

@Override
public void deleteTempFile(String fileId) {
mCacheManager.deleteTempFile(fileId);
}

@Override
public Optional<CacheUsage> getUsage() {
return mCacheManager.getUsage();
}

@Override
public Optional<DataFileChannel> getDataFileChannel(PageId pageId, int pageOffset,
int bytesToRead, CacheContext cacheContext) throws PageNotFoundException {
return mCacheManager.getDataFileChannel(pageId, pageOffset, bytesToRead, cacheContext);
}

/**
* Decrease each item's clock and clean stale items.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.file.cache;

import java.util.Optional;

/**
* Mixin interface for various cache status info.
*/
public interface CacheStatus {
/**
* Gets cache usage.
*
* @return cache usage, or none if reporting usage info is not supported
*/
Optional<CacheUsage> getUsage();
}
Loading

0 comments on commit 3e520f1

Please sign in to comment.