diff --git a/deploy/docker/venice/docker-compose-single-dc-setup.yaml b/deploy/docker/venice/docker-compose-single-dc-setup.yaml
index ce9ecc72..9c979b68 100644
--- a/deploy/docker/venice/docker-compose-single-dc-setup.yaml
+++ b/deploy/docker/venice/docker-compose-single-dc-setup.yaml
@@ -29,7 +29,7 @@ services:
retries: 5
venice-controller:
- image: venicedb/venice-controller:0.4.340
+ image: venicedb/venice-controller:0.4.858
container_name: venice-controller
hostname: venice-controller
depends_on:
@@ -45,7 +45,7 @@ services:
retries: 5
venice-server:
- image: venicedb/venice-server:0.4.340
+ image: venicedb/venice-server:0.4.858
container_name: venice-server
hostname: venice-server
depends_on:
@@ -59,7 +59,7 @@ services:
retries: 5
venice-router:
- image: venicedb/venice-router:0.4.340
+ image: venicedb/venice-router:0.4.858
container_name: venice-router
hostname: venice-router
depends_on:
@@ -75,7 +75,7 @@ services:
retries: 5
venice-client:
- image: venicedb/venice-client:0.4.340
+ image: venicedb/venice-client:0.4.858
container_name: venice-client
hostname: venice-client
tty: true
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 6641eeba..c7a5a921 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -42,6 +42,6 @@ slf4j-simple = "org.slf4j:slf4j-simple:2.0.11"
slf4j-api = "org.slf4j:slf4j-api:2.0.11"
sqlline = "sqlline:sqlline:1.12.0"
quidem = "net.hydromatic:quidem:0.11"
-venice = "com.linkedin.venice:venice-common:0.4.697"
-venice-client = "com.linkedin.venice:venice-thin-client:0.4.697"
+venice = "com.linkedin.venice:venice-common:0.4.858"
+venice-client = "com.linkedin.venice:venice-thin-client:0.4.858"
yaml = "org.yaml:snakeyaml:1.33"
diff --git a/hoptimator-logical/src/test/resources/logical-graph.id b/hoptimator-logical/src/test/resources/logical-graph.id
index 3da39c81..d4c898aa 100644
--- a/hoptimator-logical/src/test/resources/logical-graph.id
+++ b/hoptimator-logical/src/test/resources/logical-graph.id
@@ -7,41 +7,41 @@
# externals nest in their respective tier subgraphs.
# ─────────────────────────────────────────────────────────────────────────────
-create or replace table "LOGICAL"."testevent" ("KEY" varchar, "memberId" bigint, "pageKey" varchar);
+create or replace table "LOGICAL"."testevent-graph" ("KEY" varchar, "memberId" bigint, "pageKey" varchar);
(0 rows modified)
!update
flowchart TD
- subgraph n0["LogicalTable logical-testevent"]
+ subgraph n0["LogicalTable logical-testevent-graph"]
direction LR
subgraph s_nearline["nearline"]
- n2[("KAFKA.testevent")]
+ n2[("KAFKA.testevent-graph")]
end
subgraph s_online["online"]
- n3[("VENICE.testevent")]
+ n3[("VENICE.testevent-graph")]
end
- n1[/"logical-testevent-nearline-to-online
kind: FlinkSessionJob
engine: Flink"/]
+ n1[/"logical-testevent-graph-nearline-to-online
kind: FlinkSessionJob
engine: Flink"/]
end
n2 --> n1
n1 --> n3
-!graph LOGICAL.testevent
+!graph LOGICAL.testevent-graph
flowchart LR
- n0[("KAFKA.testevent")]
- n1[/"logical-testevent-nearline-to-online
kind: FlinkSessionJob
engine: Flink"/]
- n2[("VENICE.testevent")]
+ n0[("KAFKA.testevent-graph")]
+ n1[/"logical-testevent-graph-nearline-to-online
kind: FlinkSessionJob
engine: Flink"/]
+ n2[("VENICE.testevent-graph")]
n0 --> n1
n1 --> n2
-!graph KAFKA.testevent
+!graph KAFKA.testevent-graph
flowchart LR
- n0[("VENICE.testevent")]
- n1[/"logical-testevent-nearline-to-online
kind: FlinkSessionJob
engine: Flink"/]
- n2[("KAFKA.testevent")]
+ n0[("VENICE.testevent-graph")]
+ n1[/"logical-testevent-graph-nearline-to-online
kind: FlinkSessionJob
engine: Flink"/]
+ n2[("KAFKA.testevent-graph")]
n2 --> n1
n1 --> n0
-!graph VENICE.testevent
+!graph VENICE.testevent-graph
# ─────────────────────────────────────────────────────────────────────────────
# LogicalTable with an offline tier + auto-spawned backfill trigger
@@ -92,7 +92,7 @@ flowchart LR
n2 -.-> n3
!graph ADS.MEMBERS
-drop table "LOGICAL"."testevent";
+drop table "LOGICAL"."testevent-graph";
(0 rows modified)
!update
diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java
index fbd1eb32..cfc374b1 100644
--- a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java
+++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java
@@ -4,10 +4,10 @@
import com.linkedin.venice.client.schema.StoreSchemaFetcher;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
-import com.linkedin.venice.controllerapi.ControllerClient;
-import com.linkedin.venice.controllerapi.ControllerClientFactory;
-import com.linkedin.venice.controllerapi.ControllerResponse;
-import com.linkedin.venice.exceptions.ErrorType;
+import com.linkedin.venice.client.store.RouterBasedStoreMetadataFetcher;
+import com.linkedin.venice.client.store.StoreMetadataFetcher;
+import com.linkedin.venice.client.store.transport.TransportClient;
+import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.utils.SslUtils;
import org.apache.calcite.schema.Table;
@@ -19,11 +19,14 @@
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static com.linkedin.hoptimator.util.DeploymentService.parseHints;
@@ -33,27 +36,29 @@ public class ClusterSchema extends AbstractSchema {
protected static final String SSL_FACTORY_CLASS_NAME = "ssl.factory.class.name";
protected static final String DEFAULT_SSL_FACTORY_CLASS_NAME = "com.linkedin.venice.security.DefaultSSLFactory";
- protected final Properties properties;
+ protected static final String ROUTER_URL = "router.url";
+ protected static final String SSL_CONFIG_PATH = "ssl-config-path";
private static final String STORE_HINT_KEY_PREFIX = "venice.%s.";
+ private static final String DISCOVER_CLUSTER_PATH = "discover_cluster";
+ private static final long DISCOVERY_TIMEOUT_SECONDS = 5;
+
+ protected final Properties properties;
private final LazyReference> tables = new LazyReference<>();
public ClusterSchema(Properties properties) {
this.properties = properties;
}
- protected ControllerClient createControllerClient(String cluster, Optional sslFactory) {
- String routerUrl = properties.getProperty("router.url");
- if (routerUrl.contains("localhost")) {
- return new LocalControllerClient(cluster, routerUrl, sslFactory);
- } else {
- return ControllerClientFactory.getControllerClient(cluster, routerUrl, sslFactory);
- }
+ protected StoreMetadataFetcher createStoreMetadataFetcher() {
+ return new RouterBasedStoreMetadataFetcher(
+ new ClientConfig<>()
+ .setVeniceURL(properties.getProperty(ROUTER_URL)));
}
protected StoreSchemaFetcher createStoreSchemaFetcher(String storeName) {
return ClientFactory.createStoreSchemaFetcher(
ClientConfig.defaultGenericClientConfig(storeName)
- .setVeniceURL(properties.getProperty("router.url")));
+ .setVeniceURL(properties.getProperty(ROUTER_URL)));
}
protected VeniceStore createVeniceStore(String store, StoreSchemaFetcher storeSchemaFetcher) {
@@ -78,15 +83,14 @@ public Lookup tables() {
@Override
protected Map loadAll() throws Exception {
Map tableMap = new HashMap<>();
- String clusterStr = properties.getProperty("clusters");
- String[] clusters = clusterStr.split(",");
-
- for (String cluster : clusters) {
- try (ControllerClient controllerClient = createControllerClient(cluster, getSslFactory())) {
- String[] stores = controllerClient.queryStoreList(false).getStores();
- for (String store : stores) {
- StoreSchemaFetcher storeSchemaFetcher = createStoreSchemaFetcher(store);
- tableMap.put(store, createVeniceStore(store, storeSchemaFetcher));
+ try (StoreMetadataFetcher fetcher = createStoreMetadataFetcher()) {
+ Set storeNames = fetcher.getAllStoreNames();
+ log.info("Discovered {} Venice stores via router /stores endpoint", storeNames.size());
+ for (String storeName : storeNames) {
+ try {
+ tableMap.put(storeName, createVeniceStore(storeName, createStoreSchemaFetcher(storeName)));
+ } catch (Exception e) {
+ log.warn("Skipping Venice store {} due to setup failure", storeName, e);
}
}
}
@@ -95,47 +99,49 @@ protected Map loadAll() throws Exception {
@Override
protected @Nullable Table load(String name) throws Exception {
- String clusterStr = properties.getProperty("clusters");
- String[] clusters = clusterStr.split(",");
-
- try (ControllerClient controllerClient = createControllerClient(clusters[0], getSslFactory())) {
- ControllerResponse controllerResponse = controllerClient.discoverCluster(name);
- if (controllerResponse.isError() && controllerResponse.getErrorType().equals(ErrorType.STORE_NOT_FOUND)) {
- return null;
- } else if (controllerResponse.isError()) {
- throw new RuntimeException(String.format("Error fetching store errorType=%s, error=%s",
- controllerResponse.getErrorType(), controllerResponse.getError()));
- }
-
- // Venice does not currently have the ability to list all clusters so the "clusters" property
- // is required as part of the JDBC driver. To keep loadTable and loadAllTables consistent, we
- // check that the fetched store actually belongs to one of these clusters, otherwise we could end up
- // with different results.
- if (!Arrays.asList(clusters).contains(controllerResponse.getCluster())) {
- return null;
- }
+ if (!storeExists(name)) {
+ return null;
}
-
- StoreSchemaFetcher storeSchemaFetcher = createStoreSchemaFetcher(name);
- return createVeniceStore(name, storeSchemaFetcher);
+ return createVeniceStore(name, createStoreSchemaFetcher(name));
}
@Override
protected String getDescription() {
- return "Venice clusters " + properties.getProperty("clusters");
- }
-
- private Optional getSslFactory() throws IOException {
- String sslConfigPath = properties.getProperty("ssl-config-path");
- if (sslConfigPath != null) {
- log.debug("Using ssl configs at {}", sslConfigPath);
- Properties sslProperties = SslUtils.loadSSLConfig(sslConfigPath);
- String sslFactoryClassName =
- sslProperties.getProperty(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME);
- return Optional.of(SslUtils.getSSLFactory(sslProperties, sslFactoryClassName));
- }
- return Optional.empty();
+ return "Venice router " + properties.getProperty(ROUTER_URL);
}
});
}
+
+ /**
+ * Targeted per-store existence check against the router's {@code discover_cluster} endpoint
+ * over HTTP/HTTPS. A {@code null} response indicates HTTP 404 per the transport callback
+ * contract, which means the store does not exist.
+ */
+ protected boolean storeExists(String storeName) throws IOException {
+ try (TransportClient transport = ClientFactory.getTransportClient(
+ new ClientConfig<>()
+ .setVeniceURL(properties.getProperty(ROUTER_URL))
+ .setSslFactory(getSslFactory().orElse(null)))) {
+ TransportClientResponse response = transport.get(DISCOVER_CLUSTER_PATH + "/" + storeName)
+ .get(DISCOVERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ return response != null;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while discovering Venice store " + storeName, e);
+ } catch (ExecutionException | TimeoutException e) {
+ throw new IOException("Failed to discover Venice store " + storeName, e);
+ }
+ }
+
+ protected Optional getSslFactory() throws IOException {
+ String sslConfigPath = properties.getProperty(SSL_CONFIG_PATH);
+ if (sslConfigPath != null) {
+ log.debug("Using ssl configs at {}", sslConfigPath);
+ Properties sslProperties = SslUtils.loadSSLConfig(sslConfigPath);
+ String sslFactoryClassName =
+ sslProperties.getProperty(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME);
+ return Optional.of(SslUtils.getSSLFactory(sslProperties, sslFactoryClassName));
+ }
+ return Optional.empty();
+ }
}
diff --git a/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/ClusterSchemaTest.java b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/ClusterSchemaTest.java
index c5b62272..b5040884 100644
--- a/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/ClusterSchemaTest.java
+++ b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/ClusterSchemaTest.java
@@ -1,23 +1,27 @@
package com.linkedin.hoptimator.venice;
import com.linkedin.venice.client.schema.StoreSchemaFetcher;
-import com.linkedin.venice.controllerapi.ControllerClient;
-import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse;
-import com.linkedin.venice.controllerapi.MultiStoreResponse;
-import com.linkedin.venice.exceptions.ErrorType;
-import com.linkedin.venice.security.SSLFactory;
+import com.linkedin.venice.client.store.ClientConfig;
+import com.linkedin.venice.client.store.ClientFactory;
+import com.linkedin.venice.client.store.StoreMetadataFetcher;
+import com.linkedin.venice.client.store.transport.TransportClient;
+import com.linkedin.venice.client.store.transport.TransportClientResponse;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.lookup.LikePattern;
import org.apache.calcite.schema.lookup.Lookup;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
+import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
+import java.io.IOException;
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -25,36 +29,45 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class ClusterSchemaTest {
- @Mock
- private ControllerClient mockControllerClient;
-
@Mock
private StoreSchemaFetcher mockSchemaFetcher;
@Mock
private VeniceStore mockVeniceStore;
+ @Mock
+ private MockedStatic clientFactory;
+
private Properties properties;
@BeforeEach
void setUp() {
properties = new Properties();
properties.setProperty("router.url", "http://localhost:1234");
- properties.setProperty("clusters", "test-cluster");
}
- private ClusterSchema createTestableSchema() {
+ private ClusterSchema createTestableSchema(StoreMetadataFetcher metadataFetcher) {
+ return createTestableSchema(metadataFetcher, Set.of());
+ }
+
+ private ClusterSchema createTestableSchema(StoreMetadataFetcher metadataFetcher, Set existingStores) {
return new ClusterSchema(properties) {
@Override
- protected ControllerClient createControllerClient(String cluster, Optional sslFactory) {
- return mockControllerClient;
+ protected StoreMetadataFetcher createStoreMetadataFetcher() {
+ return metadataFetcher;
}
@Override
@@ -66,9 +79,16 @@ protected StoreSchemaFetcher createStoreSchemaFetcher(String storeName) {
protected VeniceStore createVeniceStore(String store, StoreSchemaFetcher storeSchemaFetcher) {
return mockVeniceStore;
}
+
+ @Override
+ protected boolean storeExists(String storeName) {
+ return existingStores.contains(storeName);
+ }
};
}
+ // --- filterStoreHints ---
+
@Test
void testFilterStoreHintsReturnsMatchingHints() {
ClusterSchema schema = new ClusterSchema(properties);
@@ -99,149 +119,86 @@ void testFilterStoreHintsReturnsEmptyWhenNoMatch() {
assertTrue(result.isEmpty());
}
- // --- tables().get() / loadTable() tests ---
+ // --- tables().get() (load path uses storeExists, NOT the bulk /stores endpoint) ---
@Test
- void testLoadTableReturnsStoreWhenFound() {
- D2ServiceDiscoveryResponse discoverResponse = mock(D2ServiceDiscoveryResponse.class);
- when(discoverResponse.isError()).thenReturn(false);
- when(discoverResponse.getCluster()).thenReturn("test-cluster");
- when(mockControllerClient.discoverCluster("myStore")).thenReturn(discoverResponse);
+ void testLoadReturnsTableWhenStoreFoundInDiscovery() {
+ StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class);
- ClusterSchema schema = createTestableSchema();
- Lookup tables = schema.tables();
- Table result = tables.get("myStore");
+ ClusterSchema schema = createTestableSchema(fetcher, Set.of("known-store"));
+ Table result = schema.tables().get("known-store");
assertNotNull(result);
assertEquals(mockVeniceStore, result);
}
@Test
- void testLoadTableReturnsNullWhenStoreNotFound() {
- D2ServiceDiscoveryResponse discoverResponse = mock(D2ServiceDiscoveryResponse.class);
- when(discoverResponse.isError()).thenReturn(true);
- when(discoverResponse.getErrorType()).thenReturn(ErrorType.STORE_NOT_FOUND);
- when(mockControllerClient.discoverCluster("unknownStore")).thenReturn(discoverResponse);
+ void testLoadReturnsNullWhenStoreNotFoundInDiscovery() {
+ StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class);
- ClusterSchema schema = createTestableSchema();
- Lookup tables = schema.tables();
- Table result = tables.get("unknownStore");
+ ClusterSchema schema = createTestableSchema(fetcher, Set.of());
+ Table result = schema.tables().get("nonexistent-store");
assertNull(result);
}
@Test
- void testLoadTableThrowsOnOtherError() {
- D2ServiceDiscoveryResponse discoverResponse = mock(D2ServiceDiscoveryResponse.class);
- when(discoverResponse.isError()).thenReturn(true);
- when(discoverResponse.getErrorType()).thenReturn(ErrorType.GENERAL_ERROR);
- when(discoverResponse.getError()).thenReturn("something went wrong");
- when(mockControllerClient.discoverCluster("badStore")).thenReturn(discoverResponse);
-
- ClusterSchema schema = createTestableSchema();
- Lookup tables = schema.tables();
+ void testLoadDoesNotHitMetadataFetcher() {
+ StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class);
+ ClusterSchema schema = createTestableSchema(fetcher, Set.of("a", "b", "c"));
- RuntimeException ex = assertThrows(RuntimeException.class, () -> tables.get("badStore"));
- assertTrue(ex.getMessage().contains("badStore"));
- }
+ schema.tables().get("a");
+ schema.tables().get("b");
+ schema.tables().get("c");
- @Test
- void testLoadTableReturnsNullWhenStoreInDifferentCluster() {
- D2ServiceDiscoveryResponse discoverResponse = mock(D2ServiceDiscoveryResponse.class);
- when(discoverResponse.isError()).thenReturn(false);
- when(discoverResponse.getCluster()).thenReturn("other-cluster");
- when(mockControllerClient.discoverCluster("remoteStore")).thenReturn(discoverResponse);
-
- ClusterSchema schema = createTestableSchema();
- Lookup tables = schema.tables();
- Table result = tables.get("remoteStore");
-
- assertNull(result);
+ // load() uses targeted per-store discovery (storeExists), never the bulk /stores endpoint.
+ verify(fetcher, never()).getAllStoreNames();
}
- // --- loadAllTables tests ---
+ // --- tables().getNames(...) (loadAll path) ---
@Test
- void testLoadAllTablesPopulatesFromStoreList() {
- MultiStoreResponse storeListResponse = mock(MultiStoreResponse.class);
- when(storeListResponse.getStores()).thenReturn(new String[]{"store1", "store2"});
- when(mockControllerClient.queryStoreList(false)).thenReturn(storeListResponse);
+ void testLoadAllTablesPopulatesFromRouterStores() {
+ StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class);
+ when(fetcher.getAllStoreNames()).thenReturn(Set.of("store1", "store2"));
- ClusterSchema schema = createTestableSchema();
+ ClusterSchema schema = createTestableSchema(fetcher);
Lookup tables = schema.tables();
- // getNames triggers loadAllTables
- assertNotNull(tables.getNames(LikePattern.any()));
- }
-
- // --- createControllerClient tests ---
-
- @Test
- void testCreateControllerClientWithLocalhostUrl() {
- properties.setProperty("router.url", "http://localhost:5555");
- ClusterSchema schema = new ClusterSchema(properties);
- ControllerClient client = schema.createControllerClient("test-cluster", Optional.empty());
- assertNotNull(client);
- client.close();
+ Set names = tables.getNames(LikePattern.any());
+ assertEquals(Set.of("store1", "store2"), names);
}
@Test
- void testCreateControllerClientWithNonLocalhostUrl() {
- // non-localhost URL takes the else branch → ControllerClientFactory path
- // We can't easily call ControllerClientFactory in unit tests, so override it in a subclass
- properties.setProperty("router.url", "http://venice.example.com:5555");
- ClusterSchema schema = new ClusterSchema(properties) {
- @Override
- protected ControllerClient createControllerClient(String cluster, Optional sslFactory) {
- // verify the non-localhost branch would be reached (url does not contain localhost)
- assertFalse(properties.getProperty("router.url").contains("localhost"));
- return mockControllerClient;
- }
- };
- ControllerClient client = schema.createControllerClient("test-cluster", Optional.empty());
- assertNotNull(client);
- }
+ void testLoadAllTablesAccessibleViaGet() {
+ StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class);
+ when(fetcher.getAllStoreNames()).thenReturn(Set.of("storeA", "storeB"));
- // --- getSchemaDescription() must return non-empty string ---
- @Test
- void testLoadAllTablesSchemaDescriptionIsNonEmpty() {
- MultiStoreResponse storeListResponse = mock(MultiStoreResponse.class);
- when(storeListResponse.getStores()).thenReturn(new String[]{"store1"});
- when(mockControllerClient.queryStoreList(false)).thenReturn(storeListResponse);
-
- ClusterSchema schema = createTestableSchema();
+ ClusterSchema schema = createTestableSchema(fetcher);
Lookup tables = schema.tables();
- // getNames(LikePattern.any()) triggers loadAllTables and calls getSchemaDescription internally
- Iterable names = tables.getNames(LikePattern.any());
- assertNotNull(names);
+ tables.getNames(LikePattern.any());
- // Verify description via the toString contains cluster info
- // The description returned is "Venice clusters test-cluster"
- assertNotNull(tables.toString()); // non-null confirms the table lookup was constructed
+ assertNotNull(tables.get("storeA"));
+ assertNotNull(tables.get("storeB"));
}
- // --- getSslFactory() no ssl-config-path → returns empty Optional ---
-
@Test
- void testGetSslFactoryReturnsEmptyWhenNoSslConfigPath() {
- // Properties without ssl-config-path: getSslFactory() should return Optional.empty()
- // We verify this indirectly: loadAllTables() completes without throwing
- MultiStoreResponse storeListResponse = mock(MultiStoreResponse.class);
- when(storeListResponse.getStores()).thenReturn(new String[]{"store1"});
- when(mockControllerClient.queryStoreList(false)).thenReturn(storeListResponse);
-
- // properties does NOT have ssl-config-path
+ void testLoadAllSkipsStoreWhenSchemaFetcherConstructionFails() {
+ StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class);
+ when(fetcher.getAllStoreNames()).thenReturn(Set.of("good-store", "bad-store"));
+
ClusterSchema schema = new ClusterSchema(properties) {
@Override
- protected ControllerClient createControllerClient(String cluster, Optional sslFactory) {
- // ssl-config-path absent → sslFactory must be empty
- assertFalse(sslFactory.isPresent());
- return mockControllerClient;
+ protected StoreMetadataFetcher createStoreMetadataFetcher() {
+ return fetcher;
}
@Override
protected StoreSchemaFetcher createStoreSchemaFetcher(String storeName) {
+ if ("bad-store".equals(storeName)) {
+ throw new RuntimeException("schema fetcher setup failed");
+ }
return mockSchemaFetcher;
}
@@ -249,59 +206,118 @@ protected StoreSchemaFetcher createStoreSchemaFetcher(String storeName) {
protected VeniceStore createVeniceStore(String store, StoreSchemaFetcher storeSchemaFetcher) {
return mockVeniceStore;
}
+
+ @Override
+ protected boolean storeExists(String storeName) {
+ return true;
+ }
};
Lookup tables = schema.tables();
- Iterable names = tables.getNames(LikePattern.any());
- assertNotNull(names);
+ tables.getNames(LikePattern.any());
+
+ assertNotNull(tables.get("good-store"));
+ assertNull(tables.get("bad-store"));
+ }
+
+ @Test
+ void testGetNamesTriggersMetadataFetcherOnce() {
+ StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class);
+ when(fetcher.getAllStoreNames()).thenReturn(Set.of("a", "b"));
+
+ ClusterSchema schema = createTestableSchema(fetcher);
+ schema.tables().getNames(LikePattern.any());
+ schema.tables().getNames(LikePattern.any());
+
+ // loadAll() runs at most once per schema lifetime (LazyLookup caches the result).
+ verify(fetcher).getAllStoreNames();
+ }
+
+ // --- storeExists (real impl, HTTP/HTTPS transport) ---
+
+ @Test
+ void testStoreExistsReturnsTrueOnNonNullResponse() throws Exception {
+ TransportClient transport = mock(TransportClient.class);
+ TransportClientResponse response = mock(TransportClientResponse.class);
+ doReturn(CompletableFuture.completedFuture(response)).when(transport).get(eq("discover_cluster/known"));
+ clientFactory.when(() -> ClientFactory.getTransportClient(any(ClientConfig.class))).thenReturn(transport);
+
+ ClusterSchema schema = new ClusterSchema(properties);
+ assertTrue(schema.storeExists("known"));
+ }
+
+ @Test
+ void testStoreExistsReturnsFalseOnNullResponse() throws Exception {
+ TransportClient transport = mock(TransportClient.class);
+ doReturn(CompletableFuture.completedFuture(null)).when(transport).get(eq("discover_cluster/missing"));
+ clientFactory.when(() -> ClientFactory.getTransportClient(any(ClientConfig.class))).thenReturn(transport);
+
+ ClusterSchema schema = new ClusterSchema(properties);
+ assertFalse(schema.storeExists("missing"));
+ }
+
+ @Test
+ void testStoreExistsWrapsExecutionFailureAsIOException() {
+ TransportClient transport = mock(TransportClient.class);
+ CompletableFuture failed = new CompletableFuture<>();
+ failed.completeExceptionally(new RuntimeException("router down"));
+ doReturn(failed).when(transport).get(eq("discover_cluster/boom"));
+ clientFactory.when(() -> ClientFactory.getTransportClient(any(ClientConfig.class))).thenReturn(transport);
+
+ ClusterSchema schema = new ClusterSchema(properties);
+ IOException ex = assertThrows(IOException.class, () -> schema.storeExists("boom"));
+ assertTrue(ex.getMessage().contains("boom"));
}
- // --- getSslFactory() with invalid ssl-config-path → throws IOException during loadAllTables ---
+ // --- getSslFactory ---
+
+ @Test
+ void testGetSslFactoryReturnsEmptyWhenNoSslConfigPath() throws IOException {
+ ClusterSchema schema = new ClusterSchema(properties);
+ assertTrue(schema.getSslFactory().isEmpty());
+ }
@Test
void testGetSslFactoryThrowsWhenSslConfigPathInvalid() {
properties.setProperty("ssl-config-path", "/nonexistent/ssl.properties");
-
- // No need to override createControllerClient — exception happens before reaching it
ClusterSchema schema = new ClusterSchema(properties);
- Lookup tables = schema.tables();
- // getNames triggers loadAllTables which calls getSslFactory → IOException
- assertThrows(RuntimeException.class, () -> tables.getNames(LikePattern.any()));
+ assertThrows(Exception.class, schema::getSslFactory);
}
- // --- loadAllTables() result must be non-empty when stores are registered ---
+ // --- description ---
@Test
- void testLoadAllTablesReturnsNonEmptyMapWhenStoresExist() {
- MultiStoreResponse storeListResponse = mock(MultiStoreResponse.class);
- when(storeListResponse.getStores()).thenReturn(new String[]{"storeA", "storeB"});
- when(mockControllerClient.queryStoreList(false)).thenReturn(storeListResponse);
+ void testTablesDescriptionMentionsRouterUrl() {
+ StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class);
+ lenient().when(fetcher.getAllStoreNames()).thenReturn(Set.of("storeX"));
- ClusterSchema schema = createTestableSchema();
+ ClusterSchema schema = createTestableSchema(fetcher);
Lookup tables = schema.tables();
- // Trigger loadAllTables
- Iterable names = tables.getNames(LikePattern.any());
- assertNotNull(names);
+ assertNotNull(tables.toString());
+ }
- // Verify that both stores are accessible via get()
- assertNotNull(tables.get("storeA"));
- assertNotNull(tables.get("storeB"));
+ // --- real ClientFactory wiring (createStoreSchemaFetcher) ---
+
+ @Test
+ void testCreateStoreSchemaFetcherBuildsClientConfigFromRouterUrl() {
+ ArgumentCaptor captor = ArgumentCaptor.forClass(ClientConfig.class);
+ clientFactory.when(() -> ClientFactory.createStoreSchemaFetcher(captor.capture())).thenReturn(mockSchemaFetcher);
+
+ ClusterSchema schema = new ClusterSchema(properties);
+ assertNotNull(schema.createStoreSchemaFetcher("myStore"));
+ assertEquals("myStore", captor.getValue().getStoreName());
+ assertEquals("http://localhost:1234", captor.getValue().getVeniceURL());
}
- // --- createVeniceStore with real hints ---
+ // --- createVeniceStore wires hints ---
@Test
void testCreateVeniceStoreWithFilteredHints() {
properties.setProperty("hints", "venice.myStore.valueSchemaId=5");
ClusterSchema schema = new ClusterSchema(properties) {
- @Override
- protected ControllerClient createControllerClient(String cluster, Optional sslFactory) {
- return mockControllerClient;
- }
-
@Override
protected StoreSchemaFetcher createStoreSchemaFetcher(String storeName) {
return mockSchemaFetcher;