From dc3a429af4fb779a53d41988649a81fdb1c62c3e Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Tue, 30 Dec 2025 20:05:36 +0800 Subject: [PATCH 1/8] JindoFileIO support cache using JindoCache --- .../paimon/rest/RESTCatalogOptions.java | 7 + .../apache/paimon/rest/RESTTokenFileIO.java | 31 +++++ .../apache/paimon/rest/RESTCatalogTest.java | 62 +++++++++ .../paimon/jindo/HadoopCompliantFileIO.java | 126 +++++++++++++++--- .../org/apache/paimon/jindo/JindoFileIO.java | 20 ++- .../jindo/JindoMultiPartUploadCommitter.java | 2 +- 6 files changed, 222 insertions(+), 26 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java index d1a88ef4f839..4e1211c1b2f0 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java @@ -103,4 +103,11 @@ public class RESTCatalogOptions { .stringType() .noDefaultValue() .withDescription("REST Catalog DLF OSS endpoint."); + + public static final ConfigOption DLF_FILE_IO_CACHE_ENABLED = + ConfigOptions.key("dlf.file-io.cache.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Enable cache for visiting files using file io (currently only JindoFileIO supports cache)."); } diff --git a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java index d59ca6dd47c5..c8480ebcc577 100644 --- a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -45,11 +45,14 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE; import static org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS; +import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT; /** A {@link FileIO} to support getting token from REST Server. */ @@ -63,6 +66,13 @@ public class RESTTokenFileIO implements FileIO { .defaultValue(false) .withDescription("Whether to support data token provided by the REST server."); + public static final ConfigOption FILE_IO_CACHE_POLICY = + ConfigOptions.key("dlf.file-io.cache.policy") + .stringType() + .noDefaultValue() + .withDescription( + "The cache policy of a table provided by the REST server, combined with: meta,read,write"); + private static final Cache FILE_IO_CACHE = Caffeine.newBuilder() .maximumSize(1000) @@ -239,6 +249,27 @@ private Map mergeTokenWithCatalogOptions(Map tok if (dlfOssEndpoint != null && !dlfOssEndpoint.isEmpty()) { newToken.put("fs.oss.endpoint", dlfOssEndpoint); } + + // Process file io cache configuration + if (!catalogContext.options().get(DLF_FILE_IO_CACHE_ENABLED)) { + // Disable file io cache, remove the cache policy configs + newToken.remove(FILE_IO_CACHE_POLICY.key()); + } else { + // Enable file io cache, reorder cache policy in fixed order, + // and allow user to override policy provided by REST server. + String cachePolicy = catalogContext.options().get(FILE_IO_CACHE_POLICY); + if (cachePolicy == null) { + cachePolicy = token.get(FILE_IO_CACHE_POLICY.key()); + } + if (cachePolicy != null) { + Set cachePolicySet = new TreeSet<>(); + for (String policy : cachePolicy.split(",")) { + cachePolicySet.add(policy.trim().toLowerCase()); + } + newToken.put(FILE_IO_CACHE_POLICY.key(), String.join(",", cachePolicySet)); + } + } + return ImmutableMap.copyOf(newToken); } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 288c925ceb23..bf0143dfc2d8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -108,6 +108,7 @@ import static org.apache.paimon.TableType.OBJECT_TABLE; import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; import static org.apache.paimon.rest.RESTApi.PAGE_TOKEN; +import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT; import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER; import static org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis; @@ -2694,6 +2695,67 @@ void testReadPartitionsTable() throws Exception { } } + @Test + void testEnableFileIOCache() throws Exception { + // Enable cache at client-side + Map options = new HashMap<>(); + options.put( + DLF_FILE_IO_CACHE_ENABLED.key(), + "true"); // DLF_FILE_IO_CACHE_ENABLED MUST be configured to enable cache + this.catalog = newRestCatalogWithDataToken(options); + Identifier identifier = + Identifier.create("test_file_io_cache", "table_for_testing_file_io_cache"); + String cachePolicy = "meta,read"; + RESTToken token = + new RESTToken( + ImmutableMap.of( + "akId", + "akId", + "akSecret", + UUID.randomUUID().toString(), + "dlf.file-io.cache.policy", + cachePolicy), + System.currentTimeMillis() + 3600_000L); + setDataTokenToRestServerForMock(identifier, token); + createTable( + identifier, + ImmutableMap.of("dlf.file-io.cache.policy", cachePolicy), + Lists.newArrayList("col1")); + FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier); + RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO(); + RESTToken fileDataToken = fileIO.validToken(); + assertEquals(cachePolicy, fileDataToken.token().get("dlf.file-io.cache.policy")); + } + + @Test + void testDisableFileIOCache() throws Exception { + // Disable cache at client-side + Map options = new HashMap<>(); + this.catalog = newRestCatalogWithDataToken(options); + Identifier identifier = + Identifier.create("test_file_io_cache", "table_for_testing_file_io_cache"); + String cachePolicy = "meta,read"; + RESTToken token = + new RESTToken( + ImmutableMap.of( + "akId", + "akId", + "akSecret", + UUID.randomUUID().toString(), + "dlf.file-io.cache.policy", + cachePolicy), + System.currentTimeMillis() + 3600_000L); + setDataTokenToRestServerForMock(identifier, token); + createTable( + identifier, + ImmutableMap.of("dlf.file-io.cache.policy", cachePolicy), + Lists.newArrayList("col1")); + FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier); + RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO(); + RESTToken fileDataToken = fileIO.validToken(); + assertNull(fileDataToken.token().get("dlf.file-io.cache.policy")); + } + private TestPagedResponse generateTestPagedResponse( Map queryParams, List testData, diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java index 3ced092f1d68..219de5608dc1 100644 --- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java @@ -18,6 +18,7 @@ package org.apache.paimon.jindo; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; @@ -27,31 +28,95 @@ import org.apache.paimon.fs.VectoredReadable; import org.apache.paimon.utils.Pair; +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + import com.aliyun.jindodata.common.JindoHadoopSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED; +import static org.apache.paimon.rest.RESTTokenFileIO.FILE_IO_CACHE_POLICY; + /** * Hadoop {@link FileIO}. * *

Important: copy this class from HadoopFileIO here to avoid class loader conflicts. */ public abstract class HadoopCompliantFileIO implements FileIO { + private static final Logger LOG = LoggerFactory.getLogger(HadoopCompliantFileIO.class); private static final long serialVersionUID = 1L; + /// Detailed cache strategies are retrieved from REST server. + private static final String META_CACHE_ENABLED_TAG = "meta"; + private static final String READ_CACHE_ENABLED_TAG = "read"; + private static final String WRITE_CACHE_ENABLED_TAG = "write"; + + private boolean metaCacheEnabled = false; + private boolean readCacheEnabled = false; + private boolean writeCacheEnabled = false; + protected transient volatile Map> fsMap; + protected transient volatile Map> jindoCacheFsMap; + + // Only enable cache for path which is generated with uuid + private static final List CACHE_WHITELIST_PATH_PATTERN = + Lists.newArrayList("bucket-", "manifest"); + + private boolean shouldCache(Path path) { + String pathStr = path.toUri().getPath(); + for (String pattern : CACHE_WHITELIST_PATH_PATTERN) { + if (pathStr.contains(pattern)) { + return true; + } + } + return false; + } + + @Override + public void configure(CatalogContext context) { + if (context.options().get(DLF_FILE_IO_CACHE_ENABLED) + && context.options().get(FILE_IO_CACHE_POLICY) != null) { + if (context.options().get("fs.jindocache.namespace.rpc.address") == null) { + LOG.info( + "FileIO cache is enabled but JindoCache RPC address is not set, fallback to no-cache"); + } else { + metaCacheEnabled = + context.options() + .get(FILE_IO_CACHE_POLICY) + .contains(META_CACHE_ENABLED_TAG); + readCacheEnabled = + context.options() + .get(FILE_IO_CACHE_POLICY) + .contains(READ_CACHE_ENABLED_TAG); + writeCacheEnabled = + context.options() + .get(FILE_IO_CACHE_POLICY) + .contains(WRITE_CACHE_ENABLED_TAG); + LOG.info( + "Cache enabled with cache policy: meta cache enabled {}, read cache enabled {}, write cache enabled {}", + metaCacheEnabled, + readCacheEnabled, + writeCacheEnabled); + } + } + } @Override public SeekableInputStream newInputStream(Path path) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); - Pair pair = getFileSystemPair(hadoopPath); + boolean shouldCache = readCacheEnabled && shouldCache(path); + LOG.debug("InputStream should cache {} for path {}", shouldCache, path); + Pair pair = getFileSystemPair(hadoopPath, shouldCache); JindoHadoopSystem fs = pair.getKey(); String sysType = pair.getValue(); FSDataInputStream fsInput = fs.open(hadoopPath); @@ -63,14 +128,19 @@ public SeekableInputStream newInputStream(Path path) throws IOException { @Override public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); + boolean shouldCache = writeCacheEnabled && shouldCache(path); + LOG.debug("OutputStream should cache {} for path {}", shouldCache, path); return new HadoopPositionOutputStream( - getFileSystem(hadoopPath).create(hadoopPath, overwrite)); + getFileSystem(hadoopPath, shouldCache).create(hadoopPath, overwrite)); } @Override public FileStatus getFileStatus(Path path) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); - return new HadoopFileStatus(getFileSystem(hadoopPath).getFileStatus(hadoopPath)); + boolean shouldCache = metaCacheEnabled && shouldCache(path); + LOG.debug("GetFileStatus should cache {} for path {}", shouldCache, path); + return new HadoopFileStatus( + getFileSystem(hadoopPath, shouldCache).getFileStatus(hadoopPath)); } @Override @@ -78,7 +148,7 @@ public FileStatus[] listStatus(Path path) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); FileStatus[] statuses = new FileStatus[0]; org.apache.hadoop.fs.FileStatus[] hadoopStatuses = - getFileSystem(hadoopPath).listStatus(hadoopPath); + getFileSystem(hadoopPath, false).listStatus(hadoopPath); if (hadoopStatuses != null) { statuses = new FileStatus[hadoopStatuses.length]; for (int i = 0; i < hadoopStatuses.length; i++) { @@ -93,7 +163,7 @@ public RemoteIterator listFilesIterative(Path path, boolean recursiv throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); org.apache.hadoop.fs.RemoteIterator hadoopIter = - getFileSystem(hadoopPath).listFiles(hadoopPath, recursive); + getFileSystem(hadoopPath, false).listFiles(hadoopPath, recursive); return new RemoteIterator() { @Override public boolean hasNext() throws IOException { @@ -111,26 +181,28 @@ public FileStatus next() throws IOException { @Override public boolean exists(Path path) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); - return getFileSystem(hadoopPath).exists(hadoopPath); + boolean shouldCache = metaCacheEnabled && shouldCache(path); + LOG.debug("Exists should cache {} for path {}", shouldCache, path); + return getFileSystem(hadoopPath, shouldCache).exists(hadoopPath); } @Override public boolean delete(Path path, boolean recursive) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); - return getFileSystem(hadoopPath).delete(hadoopPath, recursive); + return getFileSystem(hadoopPath, false).delete(hadoopPath, recursive); } @Override public boolean mkdirs(Path path) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); - return getFileSystem(hadoopPath).mkdirs(hadoopPath); + return getFileSystem(hadoopPath, false).mkdirs(hadoopPath); } @Override public boolean rename(Path src, Path dst) throws IOException { org.apache.hadoop.fs.Path hadoopSrc = path(src); org.apache.hadoop.fs.Path hadoopDst = path(dst); - return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst); + return getFileSystem(hadoopSrc, false).rename(hadoopSrc, hadoopDst); } protected org.apache.hadoop.fs.Path path(Path path) { @@ -141,22 +213,34 @@ protected org.apache.hadoop.fs.Path path(Path path) { return new org.apache.hadoop.fs.Path(path.toUri()); } - protected JindoHadoopSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException { - return getFileSystemPair(path).getKey(); + protected JindoHadoopSystem getFileSystem(org.apache.hadoop.fs.Path path, boolean enableCache) + throws IOException { + return getFileSystemPair(path, enableCache).getKey(); } - protected Pair getFileSystemPair(org.apache.hadoop.fs.Path path) - throws IOException { - if (fsMap == null) { - synchronized (this) { - if (fsMap == null) { - fsMap = new ConcurrentHashMap<>(); + protected Pair getFileSystemPair( + org.apache.hadoop.fs.Path path, boolean enableCache) throws IOException { + Map> map; + if (enableCache) { + if (jindoCacheFsMap == null) { + synchronized (this) { + if (jindoCacheFsMap == null) { + jindoCacheFsMap = new ConcurrentHashMap<>(); + } + } + } + map = jindoCacheFsMap; + } else { + if (fsMap == null) { + synchronized (this) { + if (fsMap == null) { + fsMap = new ConcurrentHashMap<>(); + } } } + map = fsMap; } - Map> map = fsMap; - String authority = path.toUri().getAuthority(); if (authority == null) { authority = "DEFAULT"; @@ -166,7 +250,7 @@ protected Pair getFileSystemPair(org.apache.hadoop.fs authority, k -> { try { - return createFileSystem(path); + return createFileSystem(path, enableCache); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -177,7 +261,7 @@ protected Pair getFileSystemPair(org.apache.hadoop.fs } protected abstract Pair createFileSystem( - org.apache.hadoop.fs.Path path) throws IOException; + org.apache.hadoop.fs.Path path, boolean enableCache) throws IOException; private static class HadoopSeekableInputStream extends SeekableInputStream { diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java index 0aa0939d50f7..40d67d4ed9b3 100644 --- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java +++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java @@ -82,6 +82,7 @@ public class JindoFileIO extends HadoopCompliantFileIO { new ConcurrentHashMap<>(); private Options hadoopOptions; + private Options hadoopOptionsWithCache; private boolean allowCache = true; @Override @@ -91,6 +92,7 @@ public boolean isObjectStore() { @Override public void configure(CatalogContext context) { + super.configure(context); allowCache = context.options().get(FILE_IO_ALLOW_CACHE); hadoopOptions = new Options(); // read all configuration with prefix 'CONFIG_PREFIXES' @@ -127,6 +129,14 @@ public void configure(CatalogContext context) { .iterator() .forEachRemaining(entry -> hadoopOptions.set(entry.getKey(), entry.getValue())); } + + // another config when enable cache + hadoopOptionsWithCache = new Options(hadoopOptions.toMap()); + hadoopOptionsWithCache.set("fs.xengine", "jindocache"); + // Workaround: following configurations to avoid bug in some JindoSDK versions + hadoopOptionsWithCache.set("fs.oss.read.profile.columnar.use-pread", "false"); + hadoopOptionsWithCache.set( + "fs.jindocache.read.profile.columnar.readahead.pread.enable", "false"); } public Options hadoopOptions() { @@ -140,20 +150,22 @@ public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite throw new IOException("File " + path + " already exists."); } org.apache.hadoop.fs.Path hadoopPath = path(path); - Pair pair = getFileSystemPair(hadoopPath); + Pair pair = getFileSystemPair(hadoopPath, false); JindoHadoopSystem fs = pair.getKey(); return new JindoTwoPhaseOutputStream( new JindoMultiPartUpload(fs, hadoopPath), hadoopPath, path); } @Override - protected Pair createFileSystem(org.apache.hadoop.fs.Path path) { + protected Pair createFileSystem( + org.apache.hadoop.fs.Path path, boolean enableCache) { final String scheme = path.toUri().getScheme(); final String authority = path.toUri().getAuthority(); + Options options = enableCache ? hadoopOptionsWithCache : hadoopOptions; Supplier> supplier = () -> { Configuration hadoopConf = new Configuration(false); - hadoopOptions.toMap().forEach(hadoopConf::set); + options.toMap().forEach(hadoopConf::set); URI fsUri = path.toUri(); if (scheme == null && authority == null) { fsUri = FileSystem.getDefaultUri(hadoopConf); @@ -185,7 +197,7 @@ protected Pair createFileSystem(org.apache.hadoop.fs. if (allowCache) { return CACHE.computeIfAbsent( - new CacheKey(hadoopOptions, scheme, authority), key -> supplier.get()); + new CacheKey(options, scheme, authority), key -> supplier.get()); } else { return supplier.get(); } diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java index 435a0bc49a8f..0f8ba549abba 100644 --- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java +++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java @@ -47,7 +47,7 @@ protected MultiPartUploadStore multiPartUploa FileIO fileIO, Path targetPath) throws IOException { JindoFileIO jindoFileIO = (JindoFileIO) fileIO; org.apache.hadoop.fs.Path hadoopPath = jindoFileIO.path(targetPath); - Pair pair = jindoFileIO.getFileSystemPair(hadoopPath); + Pair pair = jindoFileIO.getFileSystemPair(hadoopPath, false); JindoHadoopSystem fs = pair.getKey(); return new JindoMultiPartUpload(fs, hadoopPath); } From d4e35e86ac0149472b35c15871ed77bb2af65e4a Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Mon, 5 Jan 2026 20:36:12 +0800 Subject: [PATCH 2/8] Refactor config --- .../org/apache/paimon/rest/RESTCatalogOptions.java | 2 +- .../org/apache/paimon/rest/RESTTokenFileIO.java | 2 +- .../org/apache/paimon/rest/RESTCatalogTest.java | 13 +++++++------ 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java index 4e1211c1b2f0..8799ca3b4716 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java @@ -105,7 +105,7 @@ public class RESTCatalogOptions { .withDescription("REST Catalog DLF OSS endpoint."); public static final ConfigOption DLF_FILE_IO_CACHE_ENABLED = - ConfigOptions.key("dlf.file-io.cache.enabled") + ConfigOptions.key("dlf.io-cache-enabled") .booleanType() .defaultValue(false) .withDescription( diff --git a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java index c8480ebcc577..6d1464b12082 100644 --- a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -67,7 +67,7 @@ public class RESTTokenFileIO implements FileIO { .withDescription("Whether to support data token provided by the REST server."); public static final ConfigOption FILE_IO_CACHE_POLICY = - ConfigOptions.key("dlf.file-io.cache.policy") + ConfigOptions.key("dlf.io-cache.policy") .stringType() .noDefaultValue() .withDescription( diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index bf0143dfc2d8..55c9392da9a2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -2713,18 +2713,19 @@ void testEnableFileIOCache() throws Exception { "akId", "akSecret", UUID.randomUUID().toString(), - "dlf.file-io.cache.policy", + RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), cachePolicy), System.currentTimeMillis() + 3600_000L); setDataTokenToRestServerForMock(identifier, token); createTable( identifier, - ImmutableMap.of("dlf.file-io.cache.policy", cachePolicy), + ImmutableMap.of(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), cachePolicy), Lists.newArrayList("col1")); FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier); RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO(); RESTToken fileDataToken = fileIO.validToken(); - assertEquals(cachePolicy, fileDataToken.token().get("dlf.file-io.cache.policy")); + assertEquals( + cachePolicy, fileDataToken.token().get(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key())); } @Test @@ -2742,18 +2743,18 @@ void testDisableFileIOCache() throws Exception { "akId", "akSecret", UUID.randomUUID().toString(), - "dlf.file-io.cache.policy", + RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), cachePolicy), System.currentTimeMillis() + 3600_000L); setDataTokenToRestServerForMock(identifier, token); createTable( identifier, - ImmutableMap.of("dlf.file-io.cache.policy", cachePolicy), + ImmutableMap.of(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), cachePolicy), Lists.newArrayList("col1")); FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier); RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO(); RESTToken fileDataToken = fileIO.validToken(); - assertNull(fileDataToken.token().get("dlf.file-io.cache.policy")); + assertNull(fileDataToken.token().get(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key())); } private TestPagedResponse generateTestPagedResponse( From 8c12687298c205a3d30437f2c69c42819467541c Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Mon, 12 Jan 2026 16:46:13 +0800 Subject: [PATCH 3/8] Support cache for lance format --- .../paimon/rest/RESTCatalogOptions.java | 6 +++++ .../apache/paimon/rest/RESTTokenFileIO.java | 5 +++- .../paimon/jindo/HadoopCompliantFileIO.java | 27 ++++++++++++------- .../org/apache/paimon/jindo/JindoFileIO.java | 23 ++++++++++++++-- .../format/lance/LanceReaderFactory.java | 4 +-- .../paimon/format/lance/LanceUtils.java | 15 +++++++++-- .../format/lance/LanceWriterFactory.java | 4 +-- 7 files changed, 66 insertions(+), 18 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java index 8799ca3b4716..238cf4c72c3f 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java @@ -110,4 +110,10 @@ public class RESTCatalogOptions { .defaultValue(false) .withDescription( "Enable cache for visiting files using file io (currently only JindoFileIO supports cache)."); + public static final ConfigOption DLF_FILE_IO_CACHE_WHITELIST_PATH = + ConfigOptions.key("dlf.io-cache.whitelist-path") + .stringType() + .defaultValue("bucket-,manifest") + .withDescription( + "Cache is only applied to paths which contain the specified pattern, and * means all paths."); } diff --git a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java index 6d1464b12082..acc2489c86f1 100644 --- a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -71,7 +71,10 @@ public class RESTTokenFileIO implements FileIO { .stringType() .noDefaultValue() .withDescription( - "The cache policy of a table provided by the REST server, combined with: meta,read,write"); + "The cache policy of a table provided by the REST server, combined with: meta,read,write." + + "`meta`: meta cache is enabled for visiting files; " + + "`read`: cache is enabled when reading files; " + + "`write`: data is also cached when writing files."); private static final Cache FILE_IO_CACHE = Caffeine.newBuilder() diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java index 219de5608dc1..2be789d02d93 100644 --- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java @@ -39,11 +39,13 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED; +import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_WHITELIST_PATH; import static org.apache.paimon.rest.RESTTokenFileIO.FILE_IO_CACHE_POLICY; /** @@ -61,20 +63,22 @@ public abstract class HadoopCompliantFileIO implements FileIO { private static final String READ_CACHE_ENABLED_TAG = "read"; private static final String WRITE_CACHE_ENABLED_TAG = "write"; - private boolean metaCacheEnabled = false; - private boolean readCacheEnabled = false; - private boolean writeCacheEnabled = false; + protected boolean metaCacheEnabled = false; + protected boolean readCacheEnabled = false; + protected boolean writeCacheEnabled = false; protected transient volatile Map> fsMap; protected transient volatile Map> jindoCacheFsMap; // Only enable cache for path which is generated with uuid - private static final List CACHE_WHITELIST_PATH_PATTERN = - Lists.newArrayList("bucket-", "manifest"); + private List cacheWhitelistPaths = new ArrayList<>(); - private boolean shouldCache(Path path) { + protected boolean shouldCache(Path path) { + if (cacheWhitelistPaths.isEmpty()) { + return true; + } String pathStr = path.toUri().getPath(); - for (String pattern : CACHE_WHITELIST_PATH_PATTERN) { + for (String pattern : cacheWhitelistPaths) { if (pathStr.contains(pattern)) { return true; } @@ -102,11 +106,16 @@ public void configure(CatalogContext context) { context.options() .get(FILE_IO_CACHE_POLICY) .contains(WRITE_CACHE_ENABLED_TAG); + String whitelist = context.options().get(DLF_FILE_IO_CACHE_WHITELIST_PATH); + if (!whitelist.equals("*")) { + cacheWhitelistPaths = Lists.newArrayList(whitelist.split(",")); + } LOG.info( - "Cache enabled with cache policy: meta cache enabled {}, read cache enabled {}, write cache enabled {}", + "Cache enabled with cache policy: meta cache enabled {}, read cache enabled {}, write cache enabled {}, whitelist path: {}", metaCacheEnabled, readCacheEnabled, - writeCacheEnabled); + writeCacheEnabled, + whitelist); } } } diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java index 40d67d4ed9b3..2960dae23bbf 100644 --- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java +++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java @@ -139,8 +139,27 @@ public void configure(CatalogContext context) { "fs.jindocache.read.profile.columnar.readahead.pread.enable", "false"); } - public Options hadoopOptions() { - return hadoopOptions; + /** + * This method is used to initialize some thirdparty connector, such as Lance reader/writer. + * + * @param path file path + * @param opType read/write/meta + * @return + */ + public Options hadoopOptions(Path path, String opType) { + boolean shouldCache = false; + if (opType.equalsIgnoreCase("read")) { + shouldCache = readCacheEnabled && shouldCache(path); + } else if (opType.equalsIgnoreCase("write")) { + shouldCache = writeCacheEnabled && shouldCache(path); + } else if (opType.equalsIgnoreCase("meta")) { + shouldCache = metaCacheEnabled && shouldCache(path); + } + if (shouldCache) { + return hadoopOptionsWithCache; + } else { + return hadoopOptions; + } } @Override diff --git a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceReaderFactory.java b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceReaderFactory.java index 36de644b5b37..d1a27e10afd4 100644 --- a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceReaderFactory.java +++ b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceReaderFactory.java @@ -33,7 +33,7 @@ import java.util.List; import java.util.Map; -import static org.apache.paimon.format.lance.LanceUtils.toLanceSpecified; +import static org.apache.paimon.format.lance.LanceUtils.toLanceSpecifiedForReader; /** A factory to create Lance Reader. */ public class LanceReaderFactory implements FormatReaderFactory { @@ -55,7 +55,7 @@ public FileRecordReader createReader(Context context) throws IOExce } Pair> lanceSpecified = - toLanceSpecified(context.fileIO(), context.filePath()); + toLanceSpecifiedForReader(context.fileIO(), context.filePath()); return new LanceRecordsReader( lanceSpecified.getLeft(), selectionRangesArray, diff --git a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java index 407dc4238583..3748d0403cb4 100644 --- a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java +++ b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java @@ -90,7 +90,18 @@ public class LanceUtils { hadoopFileIOKlass = klass; } - public static Pair> toLanceSpecified(FileIO fileIO, Path path) { + public static Pair> toLanceSpecifiedForReader( + FileIO fileIO, Path path) { + return toLanceSpecified(fileIO, path, true); + } + + public static Pair> toLanceSpecifiedForWriter( + FileIO fileIO, Path path) { + return toLanceSpecified(fileIO, path, false); + } + + private static Pair> toLanceSpecified( + FileIO fileIO, Path path, boolean isRead) { URI uri = path.toUri(); String schema = uri.getScheme(); @@ -107,7 +118,7 @@ public static Pair> toLanceSpecified(FileIO fileIO, Pa if (ossFileIOKlass != null && ossFileIOKlass.isInstance(fileIO)) { originOptions = ((OSSFileIO) fileIO).hadoopOptions(); } else if (jindoFileIOKlass != null && jindoFileIOKlass.isInstance(fileIO)) { - originOptions = ((JindoFileIO) fileIO).hadoopOptions(); + originOptions = ((JindoFileIO) fileIO).hadoopOptions(path, isRead ? "read" : "write"); } else if (pluginFileIOKlass != null && pluginFileIOKlass.isInstance(fileIO)) { originOptions = ((PluginFileIO) fileIO).options(); } else if (hadoopFileIOKlass != null && hadoopFileIOKlass.isInstance(fileIO)) { diff --git a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceWriterFactory.java b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceWriterFactory.java index 70a531c4d6bd..afa21edd86a0 100644 --- a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceWriterFactory.java +++ b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceWriterFactory.java @@ -32,7 +32,7 @@ import java.util.Map; import java.util.function.Supplier; -import static org.apache.paimon.format.lance.LanceUtils.toLanceSpecified; +import static org.apache.paimon.format.lance.LanceUtils.toLanceSpecifiedForWriter; /** A factory to create Lance {@link FormatWriter}. */ public class LanceWriterFactory implements FormatWriterFactory, SupportsDirectWrite { @@ -52,7 +52,7 @@ public FormatWriter create(PositionOutputStream positionOutputStream, String com @Override public FormatWriter create(FileIO fileIO, Path path, String compression) throws IOException { - Pair> lanceSpecified = toLanceSpecified(fileIO, path); + Pair> lanceSpecified = toLanceSpecifiedForWriter(fileIO, path); LanceWriter lanceWriter = new LanceWriter(lanceSpecified.getLeft().toString(), lanceSpecified.getRight()); return new LanceRecordsWriter( From d0673cfd2d660527b89672371b589a499a3ae9c2 Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Mon, 12 Jan 2026 17:36:01 +0800 Subject: [PATCH 4/8] Rebase and fix build --- .../java/org/apache/paimon/format/lance/LanceUtilsTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceUtilsTest.java b/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceUtilsTest.java index d00f9a3dd635..0a0d28c3bfc5 100644 --- a/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceUtilsTest.java +++ b/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceUtilsTest.java @@ -67,7 +67,7 @@ void testOssUrlConversion() { TestFileIO fileIO = new TestFileIO(); fileIO.setOptions(options); - Pair> result = LanceUtils.toLanceSpecified(fileIO, path); + Pair> result = LanceUtils.toLanceSpecifiedForReader(fileIO, path); assertTrue(result.getKey().toString().startsWith("oss://test-bucket/")); @@ -97,7 +97,7 @@ void testOssUrlWithSecurityToken() { TestFileIO fileIO = new TestFileIO(); fileIO.setOptions(options); - Pair> result = LanceUtils.toLanceSpecified(fileIO, path); + Pair> result = LanceUtils.toLanceSpecifiedForReader(fileIO, path); Map storageOptions = result.getValue(); assertEquals("test-token", storageOptions.get(LanceUtils.STORAGE_OPTION_SESSION_TOKEN)); From 9f78609528bfb723ac5083a282d3ef7b849860a0 Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Tue, 13 Jan 2026 18:31:46 +0800 Subject: [PATCH 5/8] Support negative cache policy --- .../src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java | 3 ++- .../java/org/apache/paimon/jindo/HadoopCompliantFileIO.java | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java index acc2489c86f1..e41a3d64c6ab 100644 --- a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -74,7 +74,8 @@ public class RESTTokenFileIO implements FileIO { "The cache policy of a table provided by the REST server, combined with: meta,read,write." + "`meta`: meta cache is enabled for visiting files; " + "`read`: cache is enabled when reading files; " - + "`write`: data is also cached when writing files."); + + "`write`: data is also cached when writing files; " + + "`none`: cache is all disabled."); private static final Cache FILE_IO_CACHE = Caffeine.newBuilder() diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java index 2be789d02d93..1aae64a180d0 100644 --- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java @@ -62,6 +62,7 @@ public abstract class HadoopCompliantFileIO implements FileIO { private static final String META_CACHE_ENABLED_TAG = "meta"; private static final String READ_CACHE_ENABLED_TAG = "read"; private static final String WRITE_CACHE_ENABLED_TAG = "write"; + private static final String DISABLE_CACHE_TAG = "none"; protected boolean metaCacheEnabled = false; protected boolean readCacheEnabled = false; @@ -89,7 +90,8 @@ protected boolean shouldCache(Path path) { @Override public void configure(CatalogContext context) { if (context.options().get(DLF_FILE_IO_CACHE_ENABLED) - && context.options().get(FILE_IO_CACHE_POLICY) != null) { + && context.options().get(FILE_IO_CACHE_POLICY) != null + && !context.options().get(FILE_IO_CACHE_POLICY).contains(DISABLE_CACHE_TAG)) { if (context.options().get("fs.jindocache.namespace.rpc.address") == null) { LOG.info( "FileIO cache is enabled but JindoCache RPC address is not set, fallback to no-cache"); From 46e465af8c45e2fe0a818361fe8edc2936917226 Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Wed, 14 Jan 2026 15:37:57 +0800 Subject: [PATCH 6/8] Refactor RESTTokenFileIO --- .../paimon/rest/RESTCatalogOptions.java | 10 +++ .../apache/paimon/rest/RESTTokenFileIO.java | 34 ---------- .../apache/paimon/rest/RESTCatalogTest.java | 63 ------------------- .../paimon/jindo/HadoopCompliantFileIO.java | 63 ++++++++++--------- .../org/apache/paimon/jindo/JindoFileIO.java | 4 ++ 5 files changed, 47 insertions(+), 127 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java index 238cf4c72c3f..876dba4969a9 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java @@ -116,4 +116,14 @@ public class RESTCatalogOptions { .defaultValue("bucket-,manifest") .withDescription( "Cache is only applied to paths which contain the specified pattern, and * means all paths."); + public static final ConfigOption DLF_FILE_IO_CACHE_POLICY = + ConfigOptions.key("dlf.io-cache.policy") + .stringType() + .noDefaultValue() + .withDescription( + "The table-level cache policy provided by the REST server, combined with: meta,read,write." + + "`meta`: meta cache is enabled for visiting files; " + + "`read`: cache is enabled when reading files; " + + "`write`: data is also cached when writing files; " + + "`none`: cache is all disabled."); } diff --git a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java index e41a3d64c6ab..1af62c2f1243 100644 --- a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -45,14 +45,11 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Map; -import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE; import static org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS; -import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT; /** A {@link FileIO} to support getting token from REST Server. */ @@ -66,17 +63,6 @@ public class RESTTokenFileIO implements FileIO { .defaultValue(false) .withDescription("Whether to support data token provided by the REST server."); - public static final ConfigOption FILE_IO_CACHE_POLICY = - ConfigOptions.key("dlf.io-cache.policy") - .stringType() - .noDefaultValue() - .withDescription( - "The cache policy of a table provided by the REST server, combined with: meta,read,write." - + "`meta`: meta cache is enabled for visiting files; " - + "`read`: cache is enabled when reading files; " - + "`write`: data is also cached when writing files; " - + "`none`: cache is all disabled."); - private static final Cache FILE_IO_CACHE = Caffeine.newBuilder() .maximumSize(1000) @@ -254,26 +240,6 @@ private Map mergeTokenWithCatalogOptions(Map tok newToken.put("fs.oss.endpoint", dlfOssEndpoint); } - // Process file io cache configuration - if (!catalogContext.options().get(DLF_FILE_IO_CACHE_ENABLED)) { - // Disable file io cache, remove the cache policy configs - newToken.remove(FILE_IO_CACHE_POLICY.key()); - } else { - // Enable file io cache, reorder cache policy in fixed order, - // and allow user to override policy provided by REST server. - String cachePolicy = catalogContext.options().get(FILE_IO_CACHE_POLICY); - if (cachePolicy == null) { - cachePolicy = token.get(FILE_IO_CACHE_POLICY.key()); - } - if (cachePolicy != null) { - Set cachePolicySet = new TreeSet<>(); - for (String policy : cachePolicy.split(",")) { - cachePolicySet.add(policy.trim().toLowerCase()); - } - newToken.put(FILE_IO_CACHE_POLICY.key(), String.join(",", cachePolicySet)); - } - } - return ImmutableMap.copyOf(newToken); } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 55c9392da9a2..288c925ceb23 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -108,7 +108,6 @@ import static org.apache.paimon.TableType.OBJECT_TABLE; import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; import static org.apache.paimon.rest.RESTApi.PAGE_TOKEN; -import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT; import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER; import static org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis; @@ -2695,68 +2694,6 @@ void testReadPartitionsTable() throws Exception { } } - @Test - void testEnableFileIOCache() throws Exception { - // Enable cache at client-side - Map options = new HashMap<>(); - options.put( - DLF_FILE_IO_CACHE_ENABLED.key(), - "true"); // DLF_FILE_IO_CACHE_ENABLED MUST be configured to enable cache - this.catalog = newRestCatalogWithDataToken(options); - Identifier identifier = - Identifier.create("test_file_io_cache", "table_for_testing_file_io_cache"); - String cachePolicy = "meta,read"; - RESTToken token = - new RESTToken( - ImmutableMap.of( - "akId", - "akId", - "akSecret", - UUID.randomUUID().toString(), - RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), - cachePolicy), - System.currentTimeMillis() + 3600_000L); - setDataTokenToRestServerForMock(identifier, token); - createTable( - identifier, - ImmutableMap.of(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), cachePolicy), - Lists.newArrayList("col1")); - FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier); - RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO(); - RESTToken fileDataToken = fileIO.validToken(); - assertEquals( - cachePolicy, fileDataToken.token().get(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key())); - } - - @Test - void testDisableFileIOCache() throws Exception { - // Disable cache at client-side - Map options = new HashMap<>(); - this.catalog = newRestCatalogWithDataToken(options); - Identifier identifier = - Identifier.create("test_file_io_cache", "table_for_testing_file_io_cache"); - String cachePolicy = "meta,read"; - RESTToken token = - new RESTToken( - ImmutableMap.of( - "akId", - "akId", - "akSecret", - UUID.randomUUID().toString(), - RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), - cachePolicy), - System.currentTimeMillis() + 3600_000L); - setDataTokenToRestServerForMock(identifier, token); - createTable( - identifier, - ImmutableMap.of(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), cachePolicy), - Lists.newArrayList("col1")); - FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier); - RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO(); - RESTToken fileDataToken = fileIO.validToken(); - assertNull(fileDataToken.token().get(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key())); - } - private TestPagedResponse generateTestPagedResponse( Map queryParams, List testData, diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java index 1aae64a180d0..a7104457e312 100644 --- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java @@ -45,8 +45,8 @@ import java.util.concurrent.ConcurrentHashMap; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED; +import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_POLICY; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_WHITELIST_PATH; -import static org.apache.paimon.rest.RESTTokenFileIO.FILE_IO_CACHE_POLICY; /** * Hadoop {@link FileIO}. @@ -89,36 +89,39 @@ protected boolean shouldCache(Path path) { @Override public void configure(CatalogContext context) { - if (context.options().get(DLF_FILE_IO_CACHE_ENABLED) - && context.options().get(FILE_IO_CACHE_POLICY) != null - && !context.options().get(FILE_IO_CACHE_POLICY).contains(DISABLE_CACHE_TAG)) { - if (context.options().get("fs.jindocache.namespace.rpc.address") == null) { - LOG.info( - "FileIO cache is enabled but JindoCache RPC address is not set, fallback to no-cache"); - } else { - metaCacheEnabled = - context.options() - .get(FILE_IO_CACHE_POLICY) - .contains(META_CACHE_ENABLED_TAG); - readCacheEnabled = - context.options() - .get(FILE_IO_CACHE_POLICY) - .contains(READ_CACHE_ENABLED_TAG); - writeCacheEnabled = - context.options() - .get(FILE_IO_CACHE_POLICY) - .contains(WRITE_CACHE_ENABLED_TAG); - String whitelist = context.options().get(DLF_FILE_IO_CACHE_WHITELIST_PATH); - if (!whitelist.equals("*")) { - cacheWhitelistPaths = Lists.newArrayList(whitelist.split(",")); - } - LOG.info( - "Cache enabled with cache policy: meta cache enabled {}, read cache enabled {}, write cache enabled {}, whitelist path: {}", - metaCacheEnabled, - readCacheEnabled, - writeCacheEnabled, - whitelist); + // Process file io cache configuration + if (!context.options().get(DLF_FILE_IO_CACHE_ENABLED) + || context.options().get(DLF_FILE_IO_CACHE_POLICY) == null + || context.options().get(DLF_FILE_IO_CACHE_POLICY).contains(DISABLE_CACHE_TAG)) { + return; + } + // Enable file io cache + if (context.options().get("fs.jindocache.namespace.rpc.address") == null) { + LOG.info( + "FileIO cache is enabled but JindoCache RPC address is not set, fallback to no-cache"); + } else { + metaCacheEnabled = + context.options() + .get(DLF_FILE_IO_CACHE_POLICY) + .contains(META_CACHE_ENABLED_TAG); + readCacheEnabled = + context.options() + .get(DLF_FILE_IO_CACHE_POLICY) + .contains(READ_CACHE_ENABLED_TAG); + writeCacheEnabled = + context.options() + .get(DLF_FILE_IO_CACHE_POLICY) + .contains(WRITE_CACHE_ENABLED_TAG); + String whitelist = context.options().get(DLF_FILE_IO_CACHE_WHITELIST_PATH); + if (!whitelist.equals("*")) { + cacheWhitelistPaths = Lists.newArrayList(whitelist.split(",")); } + LOG.info( + "Cache enabled with cache policy: meta cache enabled {}, read cache enabled {}, write cache enabled {}, whitelist path: {}", + metaCacheEnabled, + readCacheEnabled, + writeCacheEnabled, + whitelist); } } diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java index 2960dae23bbf..909cf247e7d2 100644 --- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java +++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java @@ -133,6 +133,10 @@ public void configure(CatalogContext context) { // another config when enable cache hadoopOptionsWithCache = new Options(hadoopOptions.toMap()); hadoopOptionsWithCache.set("fs.xengine", "jindocache"); + if (!hadoopOptionsWithCache.containsKey("fs.jindocache.client.metrics.enable")) { + // enable metrics report by default + hadoopOptionsWithCache.set("fs.jindocache.client.metrics.enable", "true"); + } // Workaround: following configurations to avoid bug in some JindoSDK versions hadoopOptionsWithCache.set("fs.oss.read.profile.columnar.use-pread", "false"); hadoopOptionsWithCache.set( From 59cd04c4e53854302a7257884762e3c8398f441c Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Wed, 14 Jan 2026 17:51:17 +0800 Subject: [PATCH 7/8] Add test for JindoFileIO --- .../apache/paimon/rest/RESTTokenFileIO.java | 1 - paimon-filesystems/paimon-jindo/pom.xml | 35 ++++ .../paimon/jindo/HadoopCompliantFileIO.java | 2 +- .../paimon/jindo/TestJindoCacheEnable.java | 180 ++++++++++++++++++ 4 files changed, 216 insertions(+), 2 deletions(-) create mode 100644 paimon-filesystems/paimon-jindo/src/test/java/org/apache/paimon/jindo/TestJindoCacheEnable.java diff --git a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java index 1af62c2f1243..d59ca6dd47c5 100644 --- a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -239,7 +239,6 @@ private Map mergeTokenWithCatalogOptions(Map tok if (dlfOssEndpoint != null && !dlfOssEndpoint.isEmpty()) { newToken.put("fs.oss.endpoint", dlfOssEndpoint); } - return ImmutableMap.copyOf(newToken); } diff --git a/paimon-filesystems/paimon-jindo/pom.xml b/paimon-filesystems/paimon-jindo/pom.xml index 8bd96b0b065f..eafba044cc2a 100644 --- a/paimon-filesystems/paimon-jindo/pom.xml +++ b/paimon-filesystems/paimon-jindo/pom.xml @@ -88,5 +88,40 @@ + + + org.apache.hadoop + hadoop-hdfs-client + ${hadoop.version} + test + + + org.apache.avro + avro + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + commons-collections + commons-collections + 3.2.2 + test + + + + commons-logging + commons-logging + 1.1.3 + test + \ No newline at end of file diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java index a7104457e312..3f0e7226ad28 100644 --- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java @@ -74,7 +74,7 @@ public abstract class HadoopCompliantFileIO implements FileIO { // Only enable cache for path which is generated with uuid private List cacheWhitelistPaths = new ArrayList<>(); - protected boolean shouldCache(Path path) { + boolean shouldCache(Path path) { if (cacheWhitelistPaths.isEmpty()) { return true; } diff --git a/paimon-filesystems/paimon-jindo/src/test/java/org/apache/paimon/jindo/TestJindoCacheEnable.java b/paimon-filesystems/paimon-jindo/src/test/java/org/apache/paimon/jindo/TestJindoCacheEnable.java new file mode 100644 index 000000000000..7e1db5b6a172 --- /dev/null +++ b/paimon-filesystems/paimon-jindo/src/test/java/org/apache/paimon/jindo/TestJindoCacheEnable.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jindo; + +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; + +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; + +import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED; +import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_POLICY; +import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_WHITELIST_PATH; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for Jindo cache enable configuration. */ +public class TestJindoCacheEnable { + + private static final String JINDO_CACHE_RPC_ADDRESS = "fs.jindocache.namespace.rpc.address"; + + private CatalogContext createCatalogContext( + boolean cacheEnabled, + String cachePolicy, + boolean setRpcAddress, + String cacheWhitelist) { + Options options = new Options(); + options.set(DLF_FILE_IO_CACHE_ENABLED, cacheEnabled); + if (cachePolicy != null) { + options.set(DLF_FILE_IO_CACHE_POLICY, cachePolicy); + } + if (setRpcAddress) { + options.set(JINDO_CACHE_RPC_ADDRESS, "test-rpc-address:8080"); + } + if (cacheWhitelist != null) { + options.set(DLF_FILE_IO_CACHE_WHITELIST_PATH, cacheWhitelist); + } + return CatalogContext.create(options); + } + + private boolean getCacheFlag(HadoopCompliantFileIO fileIO, String fieldName) { + try { + Field field = HadoopCompliantFileIO.class.getDeclaredField(fieldName); + field.setAccessible(true); + return field.getBoolean(fileIO); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException("Failed to access field: " + fieldName, e); + } + } + + private void verifyCacheFlags( + HadoopCompliantFileIO fileIO, + boolean expectedMeta, + boolean expectedRead, + boolean expectedWrite) { + assertThat(getCacheFlag(fileIO, "metaCacheEnabled")).isEqualTo(expectedMeta); + assertThat(getCacheFlag(fileIO, "readCacheEnabled")).isEqualTo(expectedRead); + assertThat(getCacheFlag(fileIO, "writeCacheEnabled")).isEqualTo(expectedWrite); + } + + @Test + public void testCacheEnabledWithMetaPolicy() { + CatalogContext context = createCatalogContext(true, "meta", true, null); + JindoFileIO fileIO = new JindoFileIO(); + fileIO.configure(context); + + verifyCacheFlags(fileIO, true, false, false); + } + + @Test + public void testCacheEnabledWithReadPolicy() { + CatalogContext context = createCatalogContext(true, "read", true, null); + JindoFileIO fileIO = new JindoFileIO(); + fileIO.configure(context); + + verifyCacheFlags(fileIO, false, true, false); + } + + @Test + public void testCacheEnabledWithWritePolicy() { + CatalogContext context = createCatalogContext(true, "write", true, null); + JindoFileIO fileIO = new JindoFileIO(); + fileIO.configure(context); + + verifyCacheFlags(fileIO, false, false, true); + } + + @Test + public void testCacheEnabledWithCombinedPolicy() { + CatalogContext context = createCatalogContext(true, "meta,read,write", true, null); + JindoFileIO fileIO = new JindoFileIO(); + fileIO.configure(context); + + verifyCacheFlags(fileIO, true, true, true); + } + + @Test + public void testCacheDisabledWhenEnabledFalse() { + CatalogContext context = createCatalogContext(false, "meta,read,write", true, null); + JindoFileIO fileIO = new JindoFileIO(); + fileIO.configure(context); + + verifyCacheFlags(fileIO, false, false, false); + } + + @Test + public void testCacheDisabledWhenPolicyNone() { + CatalogContext context = createCatalogContext(true, "none", true, null); + JindoFileIO fileIO = new JindoFileIO(); + fileIO.configure(context); + + verifyCacheFlags(fileIO, false, false, false); + } + + @Test + public void testCacheDisabledWhenRpcAddressMissing() { + CatalogContext context = createCatalogContext(true, "meta,read,write", false, null); + JindoFileIO fileIO = new JindoFileIO(); + fileIO.configure(context); + + verifyCacheFlags(fileIO, false, false, false); + } + + @Test + public void testCacheDisabledWhenPolicyNull() { + CatalogContext context = createCatalogContext(true, null, true, null); + JindoFileIO fileIO = new JindoFileIO(); + fileIO.configure(context); + + verifyCacheFlags(fileIO, false, false, false); + } + + @Test + public void testCacheWhitelist() { + // default config + CatalogContext context = createCatalogContext(true, "meta,read,write", true, null); + JindoFileIO fileIO = new JindoFileIO(); + fileIO.configure(context); + assertThat( + fileIO.shouldCache( + new Path("oss://test-bucket/database/table/bucket-0/file.orc"))) + .isTrue(); + assertThat( + fileIO.shouldCache( + new Path("oss://test-bucket/database/table/manifest/manifest-111"))) + .isTrue(); + assertThat( + fileIO.shouldCache( + new Path("oss://test-bucket/database/table/snapshot/snapshot-1"))) + .isFalse(); + + // set whitelist config as * + context = createCatalogContext(true, "meta,read,write", true, "*"); + fileIO = new JindoFileIO(); + fileIO.configure(context); + assertThat( + fileIO.shouldCache( + new Path("oss://test-bucket/database/table/snapshot/snapshot-1"))) + .isTrue(); + assertThat(fileIO.shouldCache(new Path("oss://test-bucket/database/table/dir/file"))) + .isTrue(); + } +} From de4693ac381bd47a485ddceec669aabb95b566cb Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Wed, 14 Jan 2026 20:42:13 +0800 Subject: [PATCH 8/8] Refactor some config naming --- .../paimon/rest/RESTCatalogOptions.java | 12 ++++---- .../paimon/jindo/HadoopCompliantFileIO.java | 30 +++++++++---------- .../paimon/jindo/TestJindoCacheEnable.java | 12 ++++---- 3 files changed, 26 insertions(+), 28 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java index 876dba4969a9..f12eb12501cc 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java @@ -104,20 +104,20 @@ public class RESTCatalogOptions { .noDefaultValue() .withDescription("REST Catalog DLF OSS endpoint."); - public static final ConfigOption DLF_FILE_IO_CACHE_ENABLED = - ConfigOptions.key("dlf.io-cache-enabled") + public static final ConfigOption IO_CACHE_ENABLED = + ConfigOptions.key("io-cache.enabled") .booleanType() .defaultValue(false) .withDescription( "Enable cache for visiting files using file io (currently only JindoFileIO supports cache)."); - public static final ConfigOption DLF_FILE_IO_CACHE_WHITELIST_PATH = - ConfigOptions.key("dlf.io-cache.whitelist-path") + public static final ConfigOption IO_CACHE_WHITELIST_PATH = + ConfigOptions.key("io-cache.whitelist-path") .stringType() .defaultValue("bucket-,manifest") .withDescription( "Cache is only applied to paths which contain the specified pattern, and * means all paths."); - public static final ConfigOption DLF_FILE_IO_CACHE_POLICY = - ConfigOptions.key("dlf.io-cache.policy") + public static final ConfigOption IO_CACHE_POLICY = + ConfigOptions.key("io-cache.policy") .stringType() .noDefaultValue() .withDescription( diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java index 3f0e7226ad28..7d4e34894fa3 100644 --- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java @@ -44,9 +44,9 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED; -import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_POLICY; -import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_WHITELIST_PATH; +import static org.apache.paimon.rest.RESTCatalogOptions.IO_CACHE_ENABLED; +import static org.apache.paimon.rest.RESTCatalogOptions.IO_CACHE_POLICY; +import static org.apache.paimon.rest.RESTCatalogOptions.IO_CACHE_WHITELIST_PATH; /** * Hadoop {@link FileIO}. @@ -90,9 +90,13 @@ boolean shouldCache(Path path) { @Override public void configure(CatalogContext context) { // Process file io cache configuration - if (!context.options().get(DLF_FILE_IO_CACHE_ENABLED) - || context.options().get(DLF_FILE_IO_CACHE_POLICY) == null - || context.options().get(DLF_FILE_IO_CACHE_POLICY).contains(DISABLE_CACHE_TAG)) { + if (!context.options().get(IO_CACHE_ENABLED) + || context.options().get(IO_CACHE_POLICY) == null + || context.options().get(IO_CACHE_POLICY).contains(DISABLE_CACHE_TAG)) { + LOG.debug( + "Cache is disabled with io-cache.enabled={}, io-cache.policy={}", + context.options().get(IO_CACHE_ENABLED), + context.options().get(IO_CACHE_POLICY)); return; } // Enable file io cache @@ -101,18 +105,12 @@ public void configure(CatalogContext context) { "FileIO cache is enabled but JindoCache RPC address is not set, fallback to no-cache"); } else { metaCacheEnabled = - context.options() - .get(DLF_FILE_IO_CACHE_POLICY) - .contains(META_CACHE_ENABLED_TAG); + context.options().get(IO_CACHE_POLICY).contains(META_CACHE_ENABLED_TAG); readCacheEnabled = - context.options() - .get(DLF_FILE_IO_CACHE_POLICY) - .contains(READ_CACHE_ENABLED_TAG); + context.options().get(IO_CACHE_POLICY).contains(READ_CACHE_ENABLED_TAG); writeCacheEnabled = - context.options() - .get(DLF_FILE_IO_CACHE_POLICY) - .contains(WRITE_CACHE_ENABLED_TAG); - String whitelist = context.options().get(DLF_FILE_IO_CACHE_WHITELIST_PATH); + context.options().get(IO_CACHE_POLICY).contains(WRITE_CACHE_ENABLED_TAG); + String whitelist = context.options().get(IO_CACHE_WHITELIST_PATH); if (!whitelist.equals("*")) { cacheWhitelistPaths = Lists.newArrayList(whitelist.split(",")); } diff --git a/paimon-filesystems/paimon-jindo/src/test/java/org/apache/paimon/jindo/TestJindoCacheEnable.java b/paimon-filesystems/paimon-jindo/src/test/java/org/apache/paimon/jindo/TestJindoCacheEnable.java index 7e1db5b6a172..19a3da7bd533 100644 --- a/paimon-filesystems/paimon-jindo/src/test/java/org/apache/paimon/jindo/TestJindoCacheEnable.java +++ b/paimon-filesystems/paimon-jindo/src/test/java/org/apache/paimon/jindo/TestJindoCacheEnable.java @@ -26,9 +26,9 @@ import java.lang.reflect.Field; -import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED; -import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_POLICY; -import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_WHITELIST_PATH; +import static org.apache.paimon.rest.RESTCatalogOptions.IO_CACHE_ENABLED; +import static org.apache.paimon.rest.RESTCatalogOptions.IO_CACHE_POLICY; +import static org.apache.paimon.rest.RESTCatalogOptions.IO_CACHE_WHITELIST_PATH; import static org.assertj.core.api.Assertions.assertThat; /** Test for Jindo cache enable configuration. */ @@ -42,15 +42,15 @@ private CatalogContext createCatalogContext( boolean setRpcAddress, String cacheWhitelist) { Options options = new Options(); - options.set(DLF_FILE_IO_CACHE_ENABLED, cacheEnabled); + options.set(IO_CACHE_ENABLED, cacheEnabled); if (cachePolicy != null) { - options.set(DLF_FILE_IO_CACHE_POLICY, cachePolicy); + options.set(IO_CACHE_POLICY, cachePolicy); } if (setRpcAddress) { options.set(JINDO_CACHE_RPC_ADDRESS, "test-rpc-address:8080"); } if (cacheWhitelist != null) { - options.set(DLF_FILE_IO_CACHE_WHITELIST_PATH, cacheWhitelist); + options.set(IO_CACHE_WHITELIST_PATH, cacheWhitelist); } return CatalogContext.create(options); }