diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index cdcf934fc..50fe6d1b1 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -78,7 +78,8 @@ jobs: strategy: fail-fast: false matrix: - WEAVIATE_VERSION: ["1.32.24", "1.33.11", "1.34.7", "1.35.2"] + WEAVIATE_VERSION: + ["1.32.24", "1.33.11", "1.34.7", "1.35.2", "1.36.0-rc.0"] steps: - uses: actions/checkout@v4 diff --git a/src/it/java/io/weaviate/containers/Weaviate.java b/src/it/java/io/weaviate/containers/Weaviate.java index 0f463e96f..c12bf9f6e 100644 --- a/src/it/java/io/weaviate/containers/Weaviate.java +++ b/src/it/java/io/weaviate/containers/Weaviate.java @@ -26,7 +26,7 @@ public class Weaviate extends WeaviateContainer { public static final String DOCKER_IMAGE = "semitechnologies/weaviate"; - public static final String LATEST_VERSION = Version.V135.semver.toString(); + public static final String LATEST_VERSION = Version.latest().semver.toString(); public static final String VERSION; static { @@ -41,7 +41,8 @@ public enum Version { V132(1, 32, 24), V133(1, 33, 11), V134(1, 34, 7), - V135(1, 35, 2); + V135(1, 35, 2), + V136(1, 36, "0-rc.0"); public final SemanticVersion semver; @@ -49,9 +50,21 @@ private Version(int major, int minor, int patch) { this.semver = new SemanticVersion(major, minor, patch); } + private Version(int major, int minor, String patch) { + this.semver = new SemanticVersion(major, minor, patch); + } + public void orSkip() { ConcurrentTest.requireAtLeast(this); } + + public static Version latest() { + Version[] versions = Version.class.getEnumConstants(); + if (versions == null) { + throw new IllegalStateException("No versions are defined"); + } + return versions[versions.length - 1]; + } } /** diff --git a/src/it/java/io/weaviate/integration/CollectionsITest.java b/src/it/java/io/weaviate/integration/CollectionsITest.java index 8d68bb1c6..a56f6e605 100644 --- a/src/it/java/io/weaviate/integration/CollectionsITest.java +++ b/src/it/java/io/weaviate/integration/CollectionsITest.java @@ -18,6 +18,7 @@ import io.weaviate.client6.v1.api.collections.Quantization; import io.weaviate.client6.v1.api.collections.ReferenceProperty; import io.weaviate.client6.v1.api.collections.Replication; +import io.weaviate.client6.v1.api.collections.Replication.AsyncReplicationConfig; import io.weaviate.client6.v1.api.collections.VectorConfig; import io.weaviate.client6.v1.api.collections.config.Shard; import io.weaviate.client6.v1.api.collections.config.ShardStatus; @@ -328,4 +329,53 @@ public void test_objectTtl() throws IOException { .extracting(CollectionConfig::objectTtl).isNotNull() .returns(false, ObjectTtl::enabled); } + + @Test + public void test_asyncReplicationConfig() throws IOException { + Weaviate.Version.latest().orSkip(); + + // Arrange + var nsThings = ns("Things"); + + // Act + var things = client.collections.create(nsThings, + c -> c.replication(Replication.of( + repl -> repl + .asyncEnabled(true) + .asyncReplication(AsyncReplicationConfig.of( + async -> async + .hashTreeHeight(1) + .replicationConcurrency(2) + .replicationFrequencyMillis(3) + .replicationFrequencyMillisWhilePropagating(4) + .nodePingFrequencyMillis(5) + .loggingFrequencyMillis(6) + .diffBatchSize(7) + .diffPerNodeTimeoutSeconds(8) + .prePropagationTimeoutSeconds(9) + .propagationTimeoutSeconds(10) + .propagationDelayMillis(11) + .propagationLimit(12) + .propagationConcurrency(13) + .propagationBatchSize(14)))))); + + // Assert + Assertions.assertThat(things.config.get()).get() + .extracting(CollectionConfig::replication) + .extracting(Replication::asyncReplicationConfig) + .returns(1, AsyncReplicationConfig::hashTreeHeight) + .returns(2, AsyncReplicationConfig::replicationConcurrency) + .returns(3, AsyncReplicationConfig::replicationFrequencyMillis) + .returns(4, AsyncReplicationConfig::replicationFrequencyMillisWhilePropagating) + .returns(5, AsyncReplicationConfig::nodePingFrequencyMillis) + .returns(6, AsyncReplicationConfig::loggingFrequencyMillis) + .returns(7, AsyncReplicationConfig::diffBatchSize) + .returns(8, AsyncReplicationConfig::diffPerNodeTimeoutSeconds) + .returns(9, AsyncReplicationConfig::prePropagationTimeoutSeconds) + .returns(10, AsyncReplicationConfig::propagationTimeoutSeconds) + .returns(11, AsyncReplicationConfig::propagationDelayMillis) + .returns(12, AsyncReplicationConfig::propagationLimit) + .returns(13, AsyncReplicationConfig::propagationConcurrency) + .returns(14, AsyncReplicationConfig::propagationBatchSize); + } } diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/InvertedIndex.java b/src/main/java/io/weaviate/client6/v1/api/collections/InvertedIndex.java index ad2fc17f6..a350f3de0 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/InvertedIndex.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/InvertedIndex.java @@ -74,7 +74,7 @@ public Bm25 build() { } } - public record Stopwords( + public static record Stopwords( /** Selected preset. */ @SerializedName("preset") String preset, /** Custom words added to the selected preset. */ diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/Replication.java b/src/main/java/io/weaviate/client6/v1/api/collections/Replication.java index a01af272c..5ed8b9689 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/Replication.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/Replication.java @@ -9,7 +9,8 @@ public record Replication( @SerializedName("factor") Integer replicationFactor, @SerializedName("asyncEnabled") Boolean asyncEnabled, - @SerializedName("deletionStrategy") DeletionStrategy deletionStrategy) { + @SerializedName("deletionStrategy") DeletionStrategy deletionStrategy, + @SerializedName("asyncConfig") AsyncReplicationConfig asyncReplicationConfig) { public static enum DeletionStrategy { @SerializedName("NoAutomatedResolution") @@ -28,13 +29,168 @@ public Replication(Builder builder) { this( builder.replicationFactor, builder.asyncEnabled, - builder.deletionStrategy); + builder.deletionStrategy, + builder.asyncReplicationConfig); + } + + public static record AsyncReplicationConfig( + @SerializedName("hashtreeHeight") Integer hashTreeHeight, + @SerializedName("maxWorkers") Integer replicationConcurrency, + @SerializedName("frequency") Integer replicationFrequencyMillis, + @SerializedName("frequencyWhilePropagating") Integer replicationFrequencyMillisWhilePropagating, + @SerializedName("aliveNodesCheckingFrequency") Integer nodePingFrequencyMillis, + @SerializedName("loggingFrequency") Integer loggingFrequencyMillis, + @SerializedName("diffBatchSize") Integer diffBatchSize, + @SerializedName("diffPerNodeTimeout") Integer diffPerNodeTimeoutSeconds, + @SerializedName("prePropagationTimeout") Integer prePropagationTimeoutSeconds, + @SerializedName("propagationTimeout") Integer propagationTimeoutSeconds, + @SerializedName("propagationDelay") Integer propagationDelayMillis, + @SerializedName("propagationLimit") Integer propagationLimit, + @SerializedName("propagationConcurrency") Integer propagationConcurrency, + @SerializedName("propagationBatchSize") Integer propagationBatchSize) { + + public AsyncReplicationConfig(Builder builder) { + this( + builder.hashTreeHeight, + builder.replicationConcurrency, + builder.replicationFrequencyMillis, + builder.replicationFrequencyMillisWhilePropagating, + builder.nodePingFrequencyMillis, + builder.loggingFrequencyMillis, + builder.diffBatchSize, + builder.diffPerNodeTimeoutSeconds, + builder.prePropagationTimeoutSeconds, + builder.propagationTimeoutSeconds, + builder.propagationDelayMillis, + builder.propagationLimit, + builder.propagationConcurrency, + builder.propagationBatchSize); + } + + public static AsyncReplicationConfig of(Function> fn) { + return fn.apply(new Builder()).build(); + } + + public static class Builder implements ObjectBuilder { + private Integer hashTreeHeight; + private Integer replicationConcurrency; + private Integer replicationFrequencyMillis; + private Integer replicationFrequencyMillisWhilePropagating; + private Integer nodePingFrequencyMillis; + private Integer loggingFrequencyMillis; + private Integer diffBatchSize; + private Integer diffPerNodeTimeoutSeconds; + private Integer prePropagationTimeoutSeconds; + private Integer propagationTimeoutSeconds; + private Integer propagationDelayMillis; + private Integer propagationLimit; + private Integer propagationConcurrency; + private Integer propagationBatchSize; + + /** Height of the hashtree used for diffing. */ + public Builder hashTreeHeight(int hashTreeHeight) { + this.hashTreeHeight = hashTreeHeight; + return this; + } + + /** Maximum number of async replication workers. */ + public Builder replicationConcurrency(int replicationConcurrency) { + this.replicationConcurrency = replicationConcurrency; + return this; + } + + /** + * Base frequency in milliseconds at which async replication + * runs diff calculations. + */ + public Builder replicationFrequencyMillis(int replicationFrequencyMillis) { + this.replicationFrequencyMillis = replicationFrequencyMillis; + return this; + } + + /** + * Frequency in milliseconds at which async replication runs + * while propagation is active. + */ + public Builder replicationFrequencyMillisWhilePropagating(int replicationFrequencyMillisWhilePropagating) { + this.replicationFrequencyMillisWhilePropagating = replicationFrequencyMillisWhilePropagating; + return this; + } + + /** Interval in milliseconds at which liveness of target nodes is checked." */ + public Builder nodePingFrequencyMillis(int nodePingFrequencyMillis) { + this.nodePingFrequencyMillis = nodePingFrequencyMillis; + return this; + } + + /** Interval in seconds at which async replication logs its status. */ + public Builder loggingFrequencyMillis(int loggingFrequencyMillis) { + this.loggingFrequencyMillis = loggingFrequencyMillis; + return this; + } + + /** Maximum number of object keys included in a single diff batch. */ + public Builder diffBatchSize(int diffBatchSize) { + this.diffBatchSize = diffBatchSize; + return this; + } + + /** Timeout in seconds for computing a diff against a single node. */ + public Builder diffPerNodeTimeoutSeconds(int diffPerNodeTimeoutSeconds) { + this.diffPerNodeTimeoutSeconds = diffPerNodeTimeoutSeconds; + return this; + } + + /** Overall timeout in seconds for the pre-propagation phase. */ + public Builder prePropagationTimeoutSeconds(int prePropagationTimeoutSeconds) { + this.prePropagationTimeoutSeconds = prePropagationTimeoutSeconds; + return this; + } + + /** Timeout in seconds for propagating batch of changes to a node. */ + public Builder propagationTimeoutSeconds(int propagationTimeoutSeconds) { + this.propagationTimeoutSeconds = propagationTimeoutSeconds; + return this; + } + + /** + * Delay in milliseconds before newly added or updated objects are propagated. + */ + public Builder propagationDelayMillis(int propagationDelayMillis) { + this.propagationDelayMillis = propagationDelayMillis; + return this; + } + + /** Maximum number of objects to propagate in a single async replication run. */ + public Builder propagationLimit(int propagationLimit) { + this.propagationLimit = propagationLimit; + return this; + } + + /** Maximum number of concurrent propagation workers. */ + public Builder propagationConcurrency(int propagationConcurrency) { + this.propagationConcurrency = propagationConcurrency; + return this; + } + + /** Maximum number of objects to propagate in a single async replication run. */ + public Builder propagationBatchSize(int propagationBatchSize) { + this.propagationBatchSize = propagationBatchSize; + return this; + } + + @Override + public AsyncReplicationConfig build() { + return new AsyncReplicationConfig(this); + } + } } public static class Builder implements ObjectBuilder { private Integer replicationFactor; private Boolean asyncEnabled; private DeletionStrategy deletionStrategy; + private AsyncReplicationConfig asyncReplicationConfig; /** Set desired replication factor for this collection. */ public Builder replicationFactor(int replicationFactor) { @@ -57,6 +213,12 @@ public Builder deletionStrategy(DeletionStrategy deletionStrategy) { return this; } + /** Configuration parameters for asynchronous replication. */ + public Builder asyncReplication(AsyncReplicationConfig asyncReplicationConfig) { + this.asyncReplicationConfig = asyncReplicationConfig; + return this; + } + @Override public Replication build() { return new Replication(this);