+ public HttpResponse<String> getLastResponse() {
+ return this.lastResponse;
+ }
}
diff --git a/src/main/java/com/coveo/pushapiclient/StreamService.java b/src/main/java/com/coveo/pushapiclient/StreamService.java
index eda7bcb3..cf15268c 100644
--- a/src/main/java/com/coveo/pushapiclient/StreamService.java
+++ b/src/main/java/com/coveo/pushapiclient/StreamService.java
@@ -27,7 +27,11 @@ public class StreamService {
* @param userAgents The user agent to use for the requests.
*/
public StreamService(StreamEnabledSource source, String[] userAgents) {
- this(source, new BackoffOptionsBuilder().build(), userAgents);
+ this(
+ source,
+ new BackoffOptionsBuilder().build(),
+ userAgents,
+ DocumentUploadQueue.getConfiguredBatchSize());
}
/**
@@ -42,7 +46,11 @@ public StreamService(StreamEnabledSource source, String[] userAgents) {
* @param source The source to which you want to send your documents.
*/
public StreamService(StreamEnabledSource source) {
- this(source, new BackoffOptionsBuilder().build());
+ this(
+ source,
+ new BackoffOptionsBuilder().build(),
+ null,
+ DocumentUploadQueue.getConfiguredBatchSize());
}
/**
@@ -58,7 +66,7 @@ public StreamService(StreamEnabledSource source) {
* @param options The configuration options for exponential backoff.
*/
public StreamService(StreamEnabledSource source, BackoffOptions options) {
- this(source, options, null);
+ this(source, options, null, DocumentUploadQueue.getConfiguredBatchSize());
}
/**
@@ -70,11 +78,23 @@ public StreamService(StreamEnabledSource source, BackoffOptions options) {
* {@link StreamService} is equivalent to triggering a full source rebuild. The {@link StreamService} can
* also be used for an initial catalog upload.
*
+ * <p>Example batch sizes in bytes:
+ *
+ * <ul>
+ * <li>5 MB (default): {@code 5 * 1024 * 1024} = {@code 5242880}
+ * <li>50 MB: {@code 50 * 1024 * 1024} = {@code 52428800}
+ * <li>256 MB (max): {@code 256 * 1024 * 1024} = {@code 268435456}
+ * </ul>
+ *
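+ * <p>For illustration, a sketch of constructing the service with a custom 10 MB batch size
+ * (assumes an existing {@link StreamEnabledSource} named {@code source}):
+ *
+ * <pre>{@code
+ * StreamService service =
+ *     new StreamService(source, new BackoffOptionsBuilder().build(), null, 10 * 1024 * 1024);
+ * }</pre>
+ *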
* @param source The source to which you want to send your documents.
* @param options The configuration options for exponential backoff.
* @param userAgents The user agent to use for the requests.
+ * @param maxQueueSize The maximum batch size in bytes before auto-flushing (default: 5MB, max:
+ * 256MB).
+ * @throws IllegalArgumentException if maxQueueSize exceeds 256MB or is not positive.
*/
- public StreamService(StreamEnabledSource source, BackoffOptions options, String[] userAgents) {
+ public StreamService(
+ StreamEnabledSource source, BackoffOptions options, String[] userAgents, int maxQueueSize) {
String apiKey = source.getApiKey();
String organizationId = source.getOrganizationId();
PlatformUrl platformUrl = source.getPlatformUrl();
@@ -82,15 +102,17 @@ public StreamService(StreamEnabledSource source, BackoffOptions options, String[
Logger logger = LogManager.getLogger(StreamService.class);
this.source = source;
- this.queue = new DocumentUploadQueue(uploader);
+ this.queue = new DocumentUploadQueue(uploader, maxQueueSize);
this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl, options);
- platformClient.setUserAgents(userAgents);
+ if (userAgents != null) {
+ platformClient.setUserAgents(userAgents);
+ }
this.service = new StreamServiceInternal(this.source, this.queue, this.platformClient, logger);
}
/**
- * Adds documents to the previously specified source. This function will open a stream before
- * uploading documents into it.
+ * Adds a {@link DocumentBuilder} to the upload queue and flushes the queue if it exceeds the
+ * maximum content length. See {@link DocumentUploadQueue#flush}.
*
* If called several times, the service will automatically batch documents and create new
* stream chunks whenever the data payload exceeds the batch size limit set for the Stream API.
diff --git a/src/main/java/com/coveo/pushapiclient/StreamUploadHandler.java b/src/main/java/com/coveo/pushapiclient/StreamUploadHandler.java
new file mode 100644
--- /dev/null
+++ b/src/main/java/com/coveo/pushapiclient/StreamUploadHandler.java
+package com.coveo.pushapiclient;
+
+import java.io.IOException;
+import java.net.http.HttpResponse;
+
+/**
+ * Implementations of this interface handle the complete stream upload workflow:
+ *
+ * <ol>
+ * <li>Create a file container via {@code platformClient.createFileContainer()}
+ * <li>Upload content to the container via {@code platformClient.uploadContentToFileContainer()}
+ * <li>Push the container content to the stream source via {@code
+ * platformClient.pushFileContainerContentToStreamSource()}
+ * </ol>
+ *
+ * <p>This is an internal implementation detail and should only be used within the package for
+ * handling stream-specific upload operations.
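+ *
+ * <p>For illustration, a lambda satisfying this interface could mirror the upload logic removed
+ * from {@code UpdateStreamService} in this change; {@code platformClient} and {@code sourceId}
+ * are assumed to be in scope:
+ *
+ * <pre>{@code
+ * StreamUploadHandler handler =
+ *     stream -> {
+ *       HttpResponse<String> created = platformClient.createFileContainer();
+ *       FileContainer container = new Gson().fromJson(created.body(), FileContainer.class);
+ *       String json = new Gson().toJson(stream.marshal());
+ *       platformClient.uploadContentToFileContainer(container, json);
+ *       return platformClient.pushFileContainerContentToStreamSource(sourceId, container);
+ *     };
+ * }</pre>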
+ */
+@FunctionalInterface
+interface StreamUploadHandler {
+ /**
+ * Handles a stream update by executing the upload and push workflow.
+ *
+ * @param stream the {@link StreamUpdate} containing documents and operations to push
+ * @return the HTTP response from the push operation
+ * @throws IOException if an I/O error occurs during upload or push operations
+ * @throws InterruptedException if the operation is interrupted
+ */
+ HttpResponse<String> uploadAndPush(StreamUpdate stream) throws IOException, InterruptedException;
+}
diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java
index 948c3461..69ade903 100644
--- a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java
+++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java
@@ -1,7 +1,6 @@
package com.coveo.pushapiclient;
import com.coveo.pushapiclient.exceptions.NoOpenFileContainerException;
-import com.google.gson.Gson;
import java.io.IOException;
import java.net.http.HttpResponse;
import org.apache.logging.log4j.LogManager;
@@ -12,8 +11,6 @@ public class UpdateStreamService {
private final PlatformClient platformClient;
private final UpdateStreamServiceInternal updateStreamServiceInternal;
- private FileContainer fileContainer;
-
/**
* Creates a service to stream your documents to the provided source by interacting with the
* Stream API. This provides the ability to incrementally add, update, or delete documents via a
@@ -26,7 +23,11 @@ public class UpdateStreamService {
* @param userAgents The user agent to use for the requests.
*/
public UpdateStreamService(StreamEnabledSource source, String[] userAgents) {
- this(source, new BackoffOptionsBuilder().build(), userAgents);
+ this(
+ source,
+ new BackoffOptionsBuilder().build(),
+ userAgents,
+ DocumentUploadQueue.getConfiguredBatchSize());
}
/**
@@ -40,7 +41,11 @@ public UpdateStreamService(StreamEnabledSource source, String[] userAgents) {
* @param source The source to which you want to send your documents.
*/
public UpdateStreamService(StreamEnabledSource source) {
- this(source, new BackoffOptionsBuilder().build());
+ this(
+ source,
+ new BackoffOptionsBuilder().build(),
+ null,
+ DocumentUploadQueue.getConfiguredBatchSize());
}
/**
@@ -55,7 +60,25 @@ public UpdateStreamService(StreamEnabledSource source) {
* @param options The configuration options for exponential backoff.
*/
public UpdateStreamService(StreamEnabledSource source, BackoffOptions options) {
- this(source, options, null);
+ this(source, options, null, DocumentUploadQueue.getConfiguredBatchSize());
+ }
+
+ /**
+ * Creates a service to stream your documents to the provided source by interacting with the
+ * Stream API. This provides the ability to incrementally add, update, or delete documents via a
+ * stream.
+ *
+ * <p>To perform a full source rebuild, use the {@link StreamService}.
+ *
+ * @param source The source to push to
+ * @param options The backoff parameters
+ * @param userAgents The user-agents to append to the "User-Agent" HTTP header when performing
+ * requests against the Coveo Platform.
+ */
+ public UpdateStreamService(
+ StreamEnabledSource source, BackoffOptions options, String[] userAgents) {
+ this(source, options, userAgents, DocumentUploadQueue.getConfiguredBatchSize());
}
/**
@@ -66,23 +89,37 @@ public UpdateStreamService(StreamEnabledSource source, BackoffOptions options) {
*
* <p>To perform a full source rebuild, use the
* {@link StreamService}
*
+ *
+ * <p>Example batch sizes in bytes:
+ *
+ * <ul>
+ * <li>5 MB (default): {@code 5 * 1024 * 1024} = {@code 5242880}
+ * <li>50 MB: {@code 50 * 1024 * 1024} = {@code 52428800}
+ * <li>256 MB (max): {@code 256 * 1024 * 1024} = {@code 268435456}
+ * </ul>
+ *
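+ * <p>For example, a sketch that raises the batch size to 50 MB (assumes an existing {@link
+ * StreamEnabledSource} named {@code source}):
+ *
+ * <pre>{@code
+ * UpdateStreamService service =
+ *     new UpdateStreamService(
+ *         source, new BackoffOptionsBuilder().build(), null, 50 * 1024 * 1024);
+ * }</pre>
+ *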
* @param source The source to which you want to send your documents.
* @param options The configuration options for exponential backoff.
* @param userAgents The user agent to use for the requests.
+ * @param maxQueueSize The maximum batch size in bytes before auto-flushing (default: 5MB, max:
+ * 256MB).
+ * @throws IllegalArgumentException if maxQueueSize exceeds 256MB or is not positive.
*/
public UpdateStreamService(
- StreamEnabledSource source, BackoffOptions options, String[] userAgents) {
+ StreamEnabledSource source, BackoffOptions options, String[] userAgents, int maxQueueSize) {
Logger logger = LogManager.getLogger(UpdateStreamService.class);
this.platformClient =
new PlatformClient(
source.getApiKey(), source.getOrganizationId(), source.getPlatformUrl(), options);
- this.platformClient.setUserAgents(userAgents);
+ if (userAgents != null) {
+ this.platformClient.setUserAgents(userAgents);
+ }
+
+ CatalogStreamUploadHandler handler =
+ new CatalogStreamUploadHandler(source, this.platformClient);
+ StreamDocumentUploadQueue queue = new StreamDocumentUploadQueue(handler, maxQueueSize);
+
this.updateStreamServiceInternal =
- new UpdateStreamServiceInternal(
- source,
- new StreamDocumentUploadQueue(this.getUploadStrategy()),
- this.platformClient,
- logger);
+ new UpdateStreamServiceInternal(source, queue, this.platformClient, logger);
}
/**
@@ -90,10 +127,12 @@ public UpdateStreamService(
* open to receive the documents, this function will open a file container before uploading
* documents into it.
*
- * If called several times, the service will automatically batch documents and create new
- * stream chunks whenever the data payload exceeds the batch size limit set for the
- * Stream API.
+ * <p>If called several times, the service will automatically batch documents and create new file
+ * containers whenever the data payload exceeds the batch size limit (default: 5MB, configurable
+ * via constructor). Each batch is sent to its own file container and immediately pushed to the
+ * stream source, following the catalog stream API best practices.
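+ *
+ * <p>A minimal ingestion loop might look like this (a sketch; {@code service} and {@code
+ * documents} are assumed to exist):
+ *
+ * <pre>{@code
+ * for (DocumentBuilder document : documents) {
+ *   service.addOrUpdate(document);
+ * }
+ * service.close();
+ * }</pre>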
*
* <p>Once there are no more documents to add, it is important to call the {@link
* UpdateStreamService#close} function in order to send any buffered documents and push the file
@@ -118,7 +157,7 @@ public UpdateStreamService(
* @throws IOException If the creation of the file container or adding the document fails.
*/
public void addOrUpdate(DocumentBuilder document) throws IOException, InterruptedException {
- fileContainer = updateStreamServiceInternal.addOrUpdate(document);
+ updateStreamServiceInternal.addOrUpdate(document);
}
/**
@@ -129,10 +168,12 @@ public void addOrUpdate(DocumentBuilder document) throws IOException, Interrupte
* href="https://docs.coveo.com/en/l62e0540/coveo-for-commerce/how-to-update-your-catalog#partial-item-updates">
* Partial item updates</a> section.
*
- * <p>If called several times, the service will automatically batch documents and create new
- * stream chunks whenever the data payload exceeds the batch size limit set for the
- * Stream API.
+ * <p>If called several times, the service will automatically batch documents and create new file
+ * containers whenever the data payload exceeds the batch size limit (default: 5MB, configurable
+ * via constructor). Each batch is sent to its own file container and immediately pushed to the
+ * stream source, following the catalog stream API best practices.
*
* <p>Once there are no more documents to add, it is important to call the {@link
* UpdateStreamService#close} function in order to send any buffered documents and push the file
@@ -158,7 +199,7 @@ public void addOrUpdate(DocumentBuilder document) throws IOException, Interrupte
*/
public void addPartialUpdate(PartialUpdateDocument document)
throws IOException, InterruptedException {
- fileContainer = updateStreamServiceInternal.addPartialUpdate(document);
+ updateStreamServiceInternal.addPartialUpdate(document);
}
/**
@@ -194,7 +235,7 @@ public void addPartialUpdate(PartialUpdateDocument document)
* @throws IOException If the creation of the file container or adding the document fails.
*/
public void delete(DeleteDocument document) throws IOException, InterruptedException {
- fileContainer = updateStreamServiceInternal.delete(document);
+ updateStreamServiceInternal.delete(document);
}
/**
@@ -214,11 +255,4 @@ public HttpResponse<String> close()
throws IOException, InterruptedException, NoOpenFileContainerException {
return updateStreamServiceInternal.close();
}
-
- private UploadStrategy getUploadStrategy() {
- return (streamUpdate) -> {
- String batchUpdateJson = new Gson().toJson(streamUpdate.marshal());
- return this.platformClient.uploadContentToFileContainer(fileContainer, batchUpdateJson);
- };
- }
}
diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java
index 32f7ddc9..275c60c1 100644
--- a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java
+++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java
@@ -1,79 +1,38 @@
package com.coveo.pushapiclient;
import com.coveo.pushapiclient.exceptions.NoOpenFileContainerException;
-import com.google.gson.Gson;
import java.io.IOException;
import java.net.http.HttpResponse;
import org.apache.logging.log4j.Logger;
/** For internal use only. Made to easily test the service without having to use PowerMock */
class UpdateStreamServiceInternal {
- private final Logger logger;
- private final StreamEnabledSource source;
- private final PlatformClient platformClient;
private final StreamDocumentUploadQueue queue;
- private FileContainer fileContainer;
public UpdateStreamServiceInternal(
final StreamEnabledSource source,
final StreamDocumentUploadQueue queue,
final PlatformClient platformClient,
final Logger logger) {
- this.source = source;
this.queue = queue;
- this.platformClient = platformClient;
- this.logger = logger;
}
- public FileContainer addOrUpdate(DocumentBuilder document)
- throws IOException, InterruptedException {
- if (this.fileContainer == null) {
- this.fileContainer = this.createFileContainer();
- }
+ public void addOrUpdate(DocumentBuilder document) throws IOException, InterruptedException {
queue.add(document);
- return this.fileContainer;
}
- public FileContainer addPartialUpdate(PartialUpdateDocument document)
+ public void addPartialUpdate(PartialUpdateDocument document)
throws IOException, InterruptedException {
- if (this.fileContainer == null) {
- this.fileContainer = this.createFileContainer();
- }
queue.add(document);
- return this.fileContainer;
}
- public FileContainer delete(DeleteDocument document) throws IOException, InterruptedException {
- if (this.fileContainer == null) {
- this.fileContainer = this.createFileContainer();
- }
+ public void delete(DeleteDocument document) throws IOException, InterruptedException {
queue.add(document);
- return this.fileContainer;
}
public HttpResponse<String> close()
throws IOException, InterruptedException, NoOpenFileContainerException {
- return this.pushFileContainer(this.getSourceId());
- }
-
- private FileContainer createFileContainer() throws IOException, InterruptedException {
- this.logger.info("Creating new file container");
- HttpResponse<String> response = this.platformClient.createFileContainer();
- return new Gson().fromJson(response.body(), FileContainer.class);
- }
-
- private HttpResponse<String> pushFileContainer(String sourceId)
- throws NoOpenFileContainerException, IOException, InterruptedException {
- if (this.fileContainer == null) {
- throw new NoOpenFileContainerException(
- "No open file container detected. A new container will automatically be created once you start adding, updating or deleting documents.");
- }
queue.flush();
- this.logger.info("Pushing to file container " + this.fileContainer.fileId);
- return this.platformClient.pushFileContainerContentToStreamSource(sourceId, this.fileContainer);
- }
-
- private String getSourceId() {
- return this.source.getId();
+ return queue.getLastResponse();
}
}
diff --git a/src/test/java/com/coveo/pushapiclient/CatalogStreamUploadHandlerTest.java b/src/test/java/com/coveo/pushapiclient/CatalogStreamUploadHandlerTest.java
new file mode 100644
index 00000000..e148b087
--- /dev/null
+++ b/src/test/java/com/coveo/pushapiclient/CatalogStreamUploadHandlerTest.java
@@ -0,0 +1,121 @@
+package com.coveo.pushapiclient;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.when;
+
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import java.net.http.HttpResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+public class CatalogStreamUploadHandlerTest {
+ @Mock private StreamEnabledSource mockSource;
+ @Mock private PlatformClient mockPlatformClient;
+ @Mock private HttpResponse<String> mockContainerResponse;
+ @Mock private HttpResponse<String> mockPushResponse;
+ @Mock private StreamUpdate mockStreamUpdate;
+
+ private CatalogStreamUploadHandler handler;
+ private AutoCloseable closeable;
+
+ @Before
+ public void setUp() {
+ closeable = MockitoAnnotations.openMocks(this);
+ handler = new CatalogStreamUploadHandler(mockSource, mockPlatformClient);
+ when(mockSource.getId()).thenReturn("test-source-id");
+ }
+
+ @After
+ public void closeService() throws Exception {
+ closeable.close();
+ }
+
+ @Test
+ public void uploadAndPushShouldExecute3StepWorkflowInOrder()
+ throws IOException, InterruptedException {
+ when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-container-id\"}");
+ when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse);
+ StreamUpdateRecord mockRecord =
+ new StreamUpdateRecord(new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {});
+ when(mockStreamUpdate.marshal()).thenReturn(mockRecord);
+ when(mockPlatformClient.pushFileContainerContentToStreamSource(
+ anyString(), any(FileContainer.class)))
+ .thenReturn(mockPushResponse);
+
+ HttpResponse<String> result = handler.uploadAndPush(mockStreamUpdate);
+
+ InOrder inOrder = inOrder(mockPlatformClient);
+ inOrder.verify(mockPlatformClient).createFileContainer();
+ inOrder
+ .verify(mockPlatformClient)
+ .uploadContentToFileContainer(any(FileContainer.class), anyString());
+ inOrder
+ .verify(mockPlatformClient)
+ .pushFileContainerContentToStreamSource(eq("test-source-id"), any(FileContainer.class));
+ assertEquals(mockPushResponse, result);
+ }
+
+ @Test
+ public void uploadAndPushShouldReturnPushResponse() throws IOException, InterruptedException {
+ when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-id\"}");
+ when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse);
+ StreamUpdateRecord mockRecord =
+ new StreamUpdateRecord(new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {});
+ when(mockStreamUpdate.marshal()).thenReturn(mockRecord);
+ when(mockPlatformClient.pushFileContainerContentToStreamSource(
+ anyString(), any(FileContainer.class)))
+ .thenReturn(mockPushResponse);
+
+ HttpResponse<String> result = handler.uploadAndPush(mockStreamUpdate);
+
+ assertSame(mockPushResponse, result);
+ }
+
+ @Test(expected = IOException.class)
+ public void uploadAndPushShouldPropagateIOExceptionFromCreateFileContainer()
+ throws IOException, InterruptedException {
+ when(mockPlatformClient.createFileContainer())
+ .thenThrow(new IOException("Container creation failed"));
+
+ handler.uploadAndPush(mockStreamUpdate);
+ }
+
+ @Test(expected = IOException.class)
+ public void uploadAndPushShouldPropagateIOExceptionFromUploadContent()
+ throws IOException, InterruptedException {
+ when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-id\"}");
+ when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse);
+ StreamUpdateRecord mockRecord =
+ new StreamUpdateRecord(new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {});
+ when(mockStreamUpdate.marshal()).thenReturn(mockRecord);
+ when(mockPlatformClient.uploadContentToFileContainer(any(FileContainer.class), anyString()))
+ .thenThrow(new IOException("Upload failed"));
+
+ handler.uploadAndPush(mockStreamUpdate);
+ }
+
+ @Test(expected = IOException.class)
+ public void uploadAndPushShouldPropagateIOExceptionFromPush()
+ throws IOException, InterruptedException {
+ when(mockContainerResponse.body()).thenReturn("{\"fileId\":\"test-id\"}");
+ when(mockPlatformClient.createFileContainer()).thenReturn(mockContainerResponse);
+ StreamUpdateRecord mockRecord =
+ new StreamUpdateRecord(new JsonObject[] {}, new JsonObject[] {}, new JsonObject[] {});
+ when(mockStreamUpdate.marshal()).thenReturn(mockRecord);
+ when(mockPlatformClient.pushFileContainerContentToStreamSource(
+ anyString(), any(FileContainer.class)))
+ .thenThrow(new IOException("Push failed"));
+
+ handler.uploadAndPush(mockStreamUpdate);
+ }
+}
diff --git a/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java b/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java
new file mode 100644
index 00000000..d9425ff1
--- /dev/null
+++ b/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java
@@ -0,0 +1,248 @@
+package com.coveo.pushapiclient;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.net.http.HttpResponse;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+/**
+ * Integration tests for file container rotation when pushing large amounts of data. These tests
+ * verify the end-to-end flow from UpdateStreamService through CatalogStreamUploadHandler to
+ * PlatformClient, using a small batch size to trigger rotation without needing large test data.
+ *
+ * Key architectural pattern: Each batch creates its own file container via
+ * CatalogStreamUploadHandler. The handler executes create→upload→push for each uploadAndPush()
+ * call, ensuring container rotation per batch.
+ */
+public class FileContainerRotationIntegrationTest {
+
+ private static final int SMALL_BATCH_SIZE = 1000;
+ private static final String SOURCE_ID = "test-source-id";
+ private static final String ORG_ID = "test-org";
+ private static final String API_KEY = "test-api-key";
+
+ private PlatformClient platformClient;
+ private StreamEnabledSource source;
+ private AtomicInteger containerCounter;
+
+ @Before
+ public void setUp() throws IOException, InterruptedException {
+ platformClient = mock(PlatformClient.class);
+ source = mock(StreamEnabledSource.class);
+ containerCounter = new AtomicInteger(0);
+
+ doReturn(SOURCE_ID).when(source).getId();
+ doReturn(ORG_ID).when(source).getOrganizationId();
+ doReturn(API_KEY).when(source).getApiKey();
+ doReturn(new PlatformUrl(Environment.PRODUCTION, Region.US)).when(source).getPlatformUrl();
+
+ doAnswer(invocation -> createContainerResponse()).when(platformClient).createFileContainer();
+ doReturn(createGenericResponse())
+ .when(platformClient)
+ .uploadContentToFileContainer(any(), anyString());
+ doReturn(createGenericResponse())
+ .when(platformClient)
+ .pushFileContainerContentToStreamSource(anyString(), any());
+ }
+
+ @Test
+ public void shouldCreateMultipleContainersWhenDataExceedsBatchSize() throws Exception {
+ UpdateStreamServiceInternal service = createServiceWithSmallBatchSize();
+
+ service.addOrUpdate(createDocument("doc1", 600));
+ service.addOrUpdate(createDocument("doc2", 600));
+ service.addOrUpdate(createDocument("doc3", 600));
+ service.addOrUpdate(createDocument("doc4", 600));
+ service.close();
+
+ verify(platformClient, times(4)).createFileContainer();
+ verify(platformClient, times(4)).pushFileContainerContentToStreamSource(anyString(), any());
+ }
+
+ @Test
+ public void shouldCreateSingleContainerWhenDataFitsInOneBatch() throws Exception {
+ UpdateStreamServiceInternal service = createServiceWithSmallBatchSize();
+
+ service.addOrUpdate(createDocument("doc1", 100));
+ service.addOrUpdate(createDocument("doc2", 100));
+ service.close();
+
+ verify(platformClient, times(1)).createFileContainer();
+ verify(platformClient, times(1)).pushFileContainerContentToStreamSource(anyString(), any());
+ }
+
+ @Test
+ public void shouldHandleMixedOperationsWithRotation() throws Exception {
+ UpdateStreamServiceInternal service = createServiceWithSmallBatchSize();
+
+ service.addOrUpdate(createDocument("doc1", 400));
+ service.delete(new DeleteDocument("doc2"));
+ service.addPartialUpdate(createPartialUpdate("doc3", 400));
+ service.addOrUpdate(createDocument("doc4", 400));
+ service.close();
+
+ verify(platformClient, times(3)).createFileContainer();
+ verify(platformClient, times(3)).pushFileContainerContentToStreamSource(anyString(), any());
+ }
+
+ @Test
+ public void shouldUseUniqueContainerIdForEachBatch() throws Exception {
+ UpdateStreamServiceInternal service = createServiceWithSmallBatchSize();
+
+ service.addOrUpdate(createDocument("doc1", 600));
+ service.addOrUpdate(createDocument("doc2", 600));
+ service.addOrUpdate(createDocument("doc3", 600));
+ service.close();
+
+ ArgumentCaptor<FileContainer> containerCaptor = ArgumentCaptor.forClass(FileContainer.class);
+ verify(platformClient, times(3))
+ .pushFileContainerContentToStreamSource(anyString(), containerCaptor.capture());
+
+ assertEquals("container-1", containerCaptor.getAllValues().get(0).fileId);
+ assertEquals("container-2", containerCaptor.getAllValues().get(1).fileId);
+ assertEquals("container-3", containerCaptor.getAllValues().get(2).fileId);
+ }
+
+ @Test
+ public void shouldPushImmediatelyWhenBatchSizeExceeded() throws Exception {
+ UpdateStreamServiceInternal service = createServiceWithSmallBatchSize();
+
+ service.addOrUpdate(createDocument("doc1", 600));
+ verify(platformClient, times(0)).pushFileContainerContentToStreamSource(anyString(), any());
+
+ service.addOrUpdate(createDocument("doc2", 600));
+ verify(platformClient, times(1)).pushFileContainerContentToStreamSource(anyString(), any());
+
+ service.addOrUpdate(createDocument("doc3", 600));
+ verify(platformClient, times(2)).pushFileContainerContentToStreamSource(anyString(), any());
+
+ service.close();
+ verify(platformClient, times(3)).pushFileContainerContentToStreamSource(anyString(), any());
+ }
+
+ @Test
+ public void shouldHandleLargeNumberOfDocumentsWithRotation() throws Exception {
+ UpdateStreamServiceInternal service = createServiceWithSmallBatchSize();
+
+ for (int i = 0; i < 20; i++) {
+ service.addOrUpdate(createDocument("doc" + i, 200));
+ }
+ service.close();
+
+ int expectedContainers = 10;
+ verify(platformClient, times(expectedContainers)).createFileContainer();
+ verify(platformClient, times(expectedContainers))
+ .pushFileContainerContentToStreamSource(anyString(), any());
+ }
+
+ @Test
+ public void shouldNeverPushMultipleBatchesToSameContainer() throws Exception {
+ Map<String, Integer> pushCountPerContainer = new HashMap<>();
+ List<String> containerCreationOrder = new ArrayList<>();
+
+ doAnswer(
+ invocation -> {
+ HttpResponse<String> response = createContainerResponse();
+ String fileId = "container-" + containerCounter.get();
+ containerCreationOrder.add(fileId);
+ pushCountPerContainer.put(fileId, 0);
+ return response;
+ })
+ .when(platformClient)
+ .createFileContainer();
+
+ doAnswer(
+ invocation -> {
+ FileContainer container = invocation.getArgument(1);
+ int currentCount = pushCountPerContainer.getOrDefault(container.fileId, 0);
+ pushCountPerContainer.put(container.fileId, currentCount + 1);
+ return createGenericResponse();
+ })
+ .when(platformClient)
+ .pushFileContainerContentToStreamSource(anyString(), any());
+
+ UpdateStreamServiceInternal service = createServiceWithSmallBatchSize();
+
+ for (int i = 0; i < 10; i++) {
+ service.addOrUpdate(createDocument("doc" + i, 400));
+ }
+ service.close();
+
+ for (Map.Entry<String, Integer> entry : pushCountPerContainer.entrySet()) {
+ assertEquals(
+ "Container "
+ + entry.getKey()
+ + " should receive exactly 1 push, but received "
+ + entry.getValue(),
+ Integer.valueOf(1),
+ entry.getValue());
+ }
+
+ assertTrue("Should have created multiple containers", containerCreationOrder.size() > 1);
+ }
+
+ private UpdateStreamServiceInternal createServiceWithSmallBatchSize() {
+ CatalogStreamUploadHandler handler = new CatalogStreamUploadHandler(source, platformClient);
+ StreamDocumentUploadQueue queue = new StreamDocumentUploadQueue(handler, SMALL_BATCH_SIZE);
+ org.apache.logging.log4j.Logger logger =
+ org.apache.logging.log4j.LogManager.getLogger(getClass());
+ return new UpdateStreamServiceInternal(source, queue, platformClient, logger);
+ }
+
+ private DocumentBuilder createDocument(String id, int dataSize) {
+ return new DocumentBuilder("https://example.com/" + id, "Title " + id)
+ .withData(generateData(dataSize));
+ }
+
+ private PartialUpdateDocument createPartialUpdate(String id, int dataSize) {
+ return new PartialUpdateDocument(
+ "https://example.com/" + id,
+ PartialUpdateOperator.FIELDVALUEREPLACE,
+ "field",
+ generateData(dataSize));
+ }
+
+ private String generateData(int size) {
+ byte[] bytes = new byte[size];
+ for (int i = 0; i < size; i++) {
+ bytes[i] = 65;
+ }
+ return new String(bytes);
+ }
+
+ @SuppressWarnings("unchecked")
+ private HttpResponse<String> createContainerResponse() {
+ HttpResponse<String> response = mock(HttpResponse.class);
+ int id = containerCounter.incrementAndGet();
+ String responseBody =
+ String.format(
+ "{\"uploadUri\": \"https://upload.uri/container-%d\", "
+ + "\"fileId\": \"container-%d\"}",
+ id, id);
+ doReturn(responseBody).when(response).body();
+ return response;
+ }
+
+ @SuppressWarnings("unchecked")
+ private HttpResponse<String> createGenericResponse() {
+ HttpResponse<String> response = mock(HttpResponse.class);
+ doReturn("{\"status\": \"ok\"}").when(response).body();
+ return response;
+ }
+}
diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java
new file mode 100644
index 00000000..021ca8f9
--- /dev/null
+++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java
@@ -0,0 +1,314 @@
+package com.coveo.pushapiclient;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.http.HttpResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for batch size configuration and auto-flush behavior in StreamDocumentUploadQueue. Each
+ * batch that exceeds the configured limit should trigger automatic flush and push via the handler.
+ */
+public class StreamDocumentUploadQueueBatchingTest {
+
+ private static final int SMALL_BATCH_SIZE = 5000;
+
+ @Mock private StreamUploadHandler mockHandler;
+ @Mock private HttpResponse<String> httpResponse;
+
+ private StreamDocumentUploadQueue queue;
+ private AutoCloseable closeable;
+
+ @Before
+ public void setUp() throws Exception {
+ closeable = MockitoAnnotations.openMocks(this);
+ queue = new StreamDocumentUploadQueue(mockHandler, SMALL_BATCH_SIZE);
+ when(mockHandler.uploadAndPush(any(StreamUpdate.class))).thenReturn(httpResponse);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ closeable.close();
+ }
+
+ @Test
+ public void addingDocumentsThatExceedBatchSizeShouldTriggerFlushAndPush()
+ throws IOException, InterruptedException {
+ DocumentBuilder doc1 =
+ new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(3000));
+ DocumentBuilder doc2 =
+ new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(3000));
+
+ queue.add(doc1);
+ verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class));
+
+ queue.add(doc2);
+ verify(mockHandler, times(1)).uploadAndPush(any(StreamUpdate.class));
+ }
+
+ @Test
+ public void addMultipleSmallDocumentsShouldNotTriggerFlushUntilLimitReached()
+ throws IOException, InterruptedException {
+ DocumentBuilder smallDoc1 = new DocumentBuilder("https://doc.uri/1", "Small Doc 1");
+ DocumentBuilder smallDoc2 = new DocumentBuilder("https://doc.uri/2", "Small Doc 2");
+
+ queue.add(smallDoc1);
+ queue.add(smallDoc2);
+
+ verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class));
+ assertFalse(queue.isEmpty());
+ }
+
+ @Test
+ public void accumulatedDocumentsExceedingLimitShouldFlushPreviousBatch()
+ throws IOException, InterruptedException {
+ DocumentBuilder doc1 =
+ new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(2000));
+ DocumentBuilder doc2 =
+ new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(2000));
+ DocumentBuilder doc3 =
+ new DocumentBuilder("https://doc.uri/3", "Doc 3").withData(generateData(2000));
+
+ queue.add(doc1);
+ queue.add(doc2);
+ verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class));
+
+ queue.add(doc3);
+ verify(mockHandler, times(1)).uploadAndPush(any(StreamUpdate.class));
+
+ ArgumentCaptor<StreamUpdate> captor = ArgumentCaptor.forClass(StreamUpdate.class);
+ verify(mockHandler).uploadAndPush(captor.capture());
+ assertEquals(2, captor.getValue().getAddOrUpdate().size());
+ }
+
+ @Test
+ public void multipleBatchesShouldCreateMultipleHandlerCalls()
+ throws IOException, InterruptedException {
+ DocumentBuilder doc1 =
+ new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(3000));
+ DocumentBuilder doc2 =
+ new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(3000));
+ DocumentBuilder doc3 =
+ new DocumentBuilder("https://doc.uri/3", "Doc 3").withData(generateData(3000));
+ DocumentBuilder doc4 =
+ new DocumentBuilder("https://doc.uri/4", "Doc 4").withData(generateData(3000));
+
+ queue.add(doc1);
+ queue.add(doc2);
+ queue.add(doc3);
+ queue.add(doc4);
+
+ verify(mockHandler, times(3)).uploadAndPush(any(StreamUpdate.class));
+ }
+
+ @Test
+ public void flushAndPushShouldClearQueueAfterBatch() throws IOException, InterruptedException {
+ DocumentBuilder doc =
+ new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(10));
+ queue.add(doc);
+ assertFalse(queue.isEmpty());
+
+ queue.flush();
+
+ assertTrue(queue.isEmpty());
+ }
+
+ @Test
+ public void flushAndPushShouldReturnResponseFromHandler()
+ throws IOException, InterruptedException {
+ DocumentBuilder doc =
+ new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(10));
+ queue.add(doc);
+
+ queue.flush();
+ HttpResponse<String> response = queue.getLastResponse();
+
+ assertEquals(httpResponse, response);
+ }
+
+ @Test
+ public void flushAndPushOnEmptyQueueShouldReturnNull() throws IOException, InterruptedException {
+ queue.flush();
+ HttpResponse<String> response = queue.getLastResponse();
+
+ assertNull(response);
+ verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class));
+ }
+
+ @Test
+ public void flushAndPushShouldPassCorrectStreamUpdateToHandler()
+ throws IOException, InterruptedException {
+ DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc");
+ DeleteDocument deleteDoc = new DeleteDocument("https://doc.uri/2");
+ PartialUpdateDocument partialDoc =
+ new PartialUpdateDocument(
+ "https://doc.uri/3", PartialUpdateOperator.FIELDVALUEREPLACE, "field", "value");
+
+ queue.add(doc);
+ queue.add(deleteDoc);
+ queue.add(partialDoc);
+
+ queue.flush();
+
+ ArgumentCaptor<StreamUpdate> captor = ArgumentCaptor.forClass(StreamUpdate.class);
+ verify(mockHandler).uploadAndPush(captor.capture());
+
+ StreamUpdate captured = captor.getValue();
+ assertEquals(1, captured.getAddOrUpdate().size());
+ assertEquals(1, captured.getDelete().size());
+ assertEquals(1, captured.getPartialUpdate().size());
+ }
+
+ @Test
+ public void deleteDocumentsTriggerFlushWhenExceedingLimit()
+ throws IOException, InterruptedException {
+ queue = new StreamDocumentUploadQueue(mockHandler, 50);
+ when(mockHandler.uploadAndPush(any(StreamUpdate.class))).thenReturn(httpResponse);
+
+ DeleteDocument deleteDoc1 = new DeleteDocument("https://doc.uri/1");
+ DeleteDocument deleteDoc2 =
+ new DeleteDocument("https://doc.uri/with/very/long/path/that/exceeds");
+
+ queue.add(deleteDoc1);
+ verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class));
+
+ queue.add(deleteDoc2);
+ verify(mockHandler, times(1)).uploadAndPush(any(StreamUpdate.class));
+ }
+
+ @Test
+ public void partialUpdateDocumentsTriggerFlushWhenExceedingLimit()
+ throws IOException, InterruptedException {
+ PartialUpdateDocument partialDoc1 =
+ new PartialUpdateDocument(
+ "https://doc.uri/1", PartialUpdateOperator.FIELDVALUEREPLACE, "f", "v");
+ PartialUpdateDocument partialDoc2 =
+ new PartialUpdateDocument(
+ "https://doc.uri/2",
+ PartialUpdateOperator.FIELDVALUEREPLACE,
+ "field",
+ generateData(SMALL_BATCH_SIZE));
+
+ queue.add(partialDoc1);
+ verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class));
+
+ queue.add(partialDoc2);
+ verify(mockHandler, times(1)).uploadAndPush(any(StreamUpdate.class));
+ }
+
+ @Test
+ public void mixedDocumentTypesShouldAccumulateAndFlushCorrectly()
+ throws IOException, InterruptedException {
+ DocumentBuilder doc =
+ new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(1500));
+ DeleteDocument deleteDoc = new DeleteDocument("https://doc.uri/2");
+ PartialUpdateDocument partialDoc =
+ new PartialUpdateDocument(
+ "https://doc.uri/3",
+ PartialUpdateOperator.FIELDVALUEREPLACE,
+ "field",
+ generateData(4000));
+
+ queue.add(doc);
+ queue.add(deleteDoc);
+ verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class));
+
+ queue.add(partialDoc);
+ verify(mockHandler, times(1)).uploadAndPush(any(StreamUpdate.class));
+ }
+
+ @Test
+ public void constructorShouldRejectBatchSizeExceeding256MB() {
+ int exceeding256MB = 256 * 1024 * 1024 + 1;
+ try {
+ new StreamDocumentUploadQueue(mockHandler, exceeding256MB);
+ throw new AssertionError("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void constructorShouldRejectZeroBatchSize() {
+ try {
+ new StreamDocumentUploadQueue(mockHandler, 0);
+ throw new AssertionError("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void constructorShouldRejectNegativeBatchSize() {
+ try {
+ new StreamDocumentUploadQueue(mockHandler, -1);
+ throw new AssertionError("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void constructorShouldAcceptMaxAllowedBatchSize() {
+ int max256MB = 256 * 1024 * 1024;
+ StreamDocumentUploadQueue q = new StreamDocumentUploadQueue(mockHandler, max256MB);
+ assertNotNull(q);
+ }
+
+ @Test
+ public void queueShouldUseSystemPropertyForDefaultBatchSize() {
+ String originalValue = System.getProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY);
+ try {
+ System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, "1048576");
+ int configuredSize = DocumentUploadQueue.getConfiguredBatchSize();
+ assertEquals(1048576, configuredSize);
+ } finally {
+ if (originalValue != null) {
+ System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, originalValue);
+ } else {
+ System.clearProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY);
+ }
+ }
+ }
+
+ @Test
+ public void systemPropertyExceeding256MBShouldThrow() {
+ String originalValue = System.getProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY);
+ try {
+ System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, "268435457");
+ DocumentUploadQueue.getConfiguredBatchSize();
+ throw new AssertionError("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ // Expected
+ } finally {
+ if (originalValue != null) {
+ System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, originalValue);
+ } else {
+ System.clearProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY);
+ }
+ }
+ }
+
+ private String generateData(int numBytes) {
+ if (numBytes <= 0) return "";
+ byte[] bytes = new byte[numBytes];
+ for (int i = 0; i < numBytes; i++) {
+ bytes[i] = 65;
+ }
+ return new String(bytes);
+ }
+}
diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java
index 12cd2f54..79463fcd 100644
--- a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java
+++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java
@@ -14,15 +14,14 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
public class StreamDocumentUploadQueueTest {
- @Mock private UploadStrategy uploadStrategy;
+ @Mock private StreamUploadHandler mockHandler;
- @InjectMocks private StreamDocumentUploadQueue queue;
+ private StreamDocumentUploadQueue queue;
private AutoCloseable closeable;
private DocumentBuilder documentToAdd;
@@ -80,6 +79,7 @@ public void setup() {
"value");
closeable = MockitoAnnotations.openMocks(this);
+ queue = new StreamDocumentUploadQueue(mockHandler, DocumentUploadQueue.DEFAULT_QUEUE_SIZE);
}
@After
@@ -137,7 +137,7 @@ public void testFlushShouldNotUploadDocumentsWhenRequiredSizeIsNotMet()
// The maximum queue size has not been reached yet (1MB left of free space).
// Therefore, the accumulated documents will not be automatically flushed.
// Unless the user runs `.flush()` the queue will keep the 4MB of documents
- verify(uploadStrategy, times(0)).apply(any(BatchUpdate.class));
+ verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class));
}
@Test
@@ -168,15 +168,15 @@ public void testShouldAutomaticallyFlushAccumulatedDocuments()
// uploaded to the source.
queue.add(firstBulkyDocument);
queue.add(secondBulkyDocument);
- verify(uploadStrategy, times(0)).apply(any(BatchUpdate.class));
+ verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class));
// The 3rd document added to the queue will be included in a separate batch,
// which will not be uploaded unless the `flush()` method is called or until the
// queue size limit has been reached
queue.add(thirdBulkyDocument);
- verify(uploadStrategy, times(1)).apply(any(BatchUpdate.class));
- verify(uploadStrategy, times(1)).apply(firstBatch);
+ verify(mockHandler, times(1)).uploadAndPush(any(StreamUpdate.class));
+ verify(mockHandler, times(1)).uploadAndPush(firstBatch);
}
@Test
@@ -225,9 +225,9 @@ public void testShouldManuallyFlushAccumulatedDocuments()
// Additional flush will have no effect if documents were already flushed
queue.flush();
- verify(uploadStrategy, times(2)).apply(any(StreamUpdate.class));
- verify(uploadStrategy, times(1)).apply(firstBatch);
- verify(uploadStrategy, times(1)).apply(secondBatch);
+ verify(mockHandler, times(2)).uploadAndPush(any(StreamUpdate.class));
+ verify(mockHandler, times(1)).uploadAndPush(firstBatch);
+ verify(mockHandler, times(1)).uploadAndPush(secondBatch);
}
@Test
@@ -237,7 +237,7 @@ public void testAddingEmptyDocument() throws IOException, InterruptedException {
queue.add(nullDocument);
queue.flush();
- verify(uploadStrategy, times(0)).apply(any(StreamUpdate.class));
+ verify(mockHandler, times(0)).uploadAndPush(any(StreamUpdate.class));
}
@Rule public ExpectedException expectedException = ExpectedException.none();
diff --git a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java
index 726769b8..1052fb21 100644
--- a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java
+++ b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java
@@ -1,7 +1,6 @@
package com.coveo.pushapiclient;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -70,11 +69,12 @@ public void closeService() throws Exception {
}
@Test
- public void addOrUpdateShouldCreateFileContainer() throws IOException, InterruptedException {
+ public void addOrUpdateShouldAddDocumentsToQueue() throws IOException, InterruptedException {
service.addOrUpdate(documentA);
service.addOrUpdate(documentB);
- verify(this.platformClient, times(1)).createFileContainer();
+ verify(queue, times(1)).add(documentA);
+ verify(queue, times(1)).add(documentB);
}
@Test
@@ -94,62 +94,50 @@ public void addOrUpdateAndPartialAndDeleteShouldAddDocumentsToQueue()
}
@Test
- public void deleteShouldCreateFileContainer() throws IOException, InterruptedException {
+ public void deleteShouldAddDocumentsToQueue() throws IOException, InterruptedException {
service.delete(deleteDocumentA);
service.delete(deleteDocumentB);
- verify(this.platformClient, times(1)).createFileContainer();
+ verify(queue, times(1)).add(deleteDocumentA);
+ verify(queue, times(1)).add(deleteDocumentB);
}
@Test
- public void partialUpdateShouldCreateFileContainer() throws IOException, InterruptedException {
+ public void partialUpdateShouldAddDocumentsToQueue() throws IOException, InterruptedException {
service.addPartialUpdate(partialUpdateDocumentA);
service.addPartialUpdate(partialUpdateDocumentB);
- verify(this.platformClient, times(1)).createFileContainer();
+ verify(queue, times(1)).add(partialUpdateDocumentA);
+ verify(queue, times(1)).add(partialUpdateDocumentB);
}
@Test
- public void closeShouldPushFileContainerOnAddOrUpdate()
+ public void closeShouldCallFlushAndPush()
throws IOException, InterruptedException, NoOpenFileContainerException {
service.addOrUpdate(documentA);
service.close();
- verify(platformClient, times(1))
- .pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class));
- }
-
- @Test
- public void closeShouldPushFileContainerOnDelete()
- throws IOException, InterruptedException, NoOpenFileContainerException {
- service.delete(deleteDocumentA);
- service.close();
-
- verify(platformClient, times(1))
- .pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class));
+ verify(queue, times(1)).flush();
}
@Test
- public void closeShouldFlushBufferedDocuments()
+ public void closeShouldReturnFlushAndPushResponse()
throws IOException, InterruptedException, NoOpenFileContainerException {
+ when(queue.getLastResponse()).thenReturn(httpResponse);
service.addOrUpdate(documentA);
- service.close();
- verify(queue, times(1)).flush();
+ HttpResponse<String> result = service.close();
+
+ assertEquals(httpResponse, result);
}
@Test
- public void shouldLogInfoOnCreateFileContainer()
+ public void closeShouldReturnNullWhenQueueIsEmpty()
throws IOException, InterruptedException, NoOpenFileContainerException {
- service.addOrUpdate(documentA);
- verify(logger, times(1)).info("Creating new file container");
- service.close();
- verify(logger, times(1)).info("Pushing to file container file-id");
- }
+ when(queue.getLastResponse()).thenReturn(null);
- @Test(expected = NoOpenFileContainerException.class)
- public void shouldThrowExceptionOnCloseIfNoOpenFileContainer()
- throws IOException, InterruptedException, NoOpenFileContainerException {
- service.close();
+ HttpResponse<String> result = service.close();
+
+ assertNull(result);
}
}