diff --git a/CHANGELOG.md b/CHANGELOG.md index 01ca7d697..8951e97f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ See [Conventional Commits](https://conventionalcommits.org) for commit guideline * Replace raw usage of `EthLog.LogResult` with parameterized type to improve type safety (#2252) (https://github.com/LFDT-web3j/web3j/pull/2254) +* Fix handling of expired or missing filters in event subscriptions to prevent "filter not found" errors (#1998) ### Features diff --git a/core/src/main/java/org/web3j/protocol/core/filters/Filter.java b/core/src/main/java/org/web3j/protocol/core/filters/Filter.java index 1ceb2790e..a771a305f 100644 --- a/core/src/main/java/org/web3j/protocol/core/filters/Filter.java +++ b/core/src/main/java/org/web3j/protocol/core/filters/Filter.java @@ -50,7 +50,11 @@ public abstract class Filter { private long blockTime; - private static final String FILTER_NOT_FOUND_PATTERN = "(?i)\\bfilter\\s+not\\s+found\\b"; + private static final Pattern FILTER_NOT_FOUND_REGEX = + Pattern.compile("(?i)\\bfilter\\s+not\\s+found\\b"); + + private int reinstallRetries = 0; + private static final int MAX_REINSTALL_RETRIES = 3; public Filter(Web3j web3j, Callback callback) { this.web3j = web3j; @@ -120,11 +124,12 @@ private void getInitialFilterLogs() { } if (ethLog.hasError()) { - throwException(ethLog.getError()); + handleError(ethLog.getError()); + } else { + reinstallRetries = 0; + process(ethLog.getLogs()); } - process(ethLog.getLogs()); - } catch (IOException e) { throwException(e); } @@ -138,32 +143,44 @@ private void pollFilter(EthFilter ethFilter) { throwException(e); } if (ethLog.hasError()) { - Error error = ethLog.getError(); - String message = error.getMessage(); - switch (error.getCode()) { - case RpcErrors.FILTER_NOT_FOUND: - reinstallFilter(); - break; - default: - if (Pattern.compile(FILTER_NOT_FOUND_PATTERN).matcher(message).find()) - reinstallFilter(); - else throwException(error); - break; - } + handleError(ethLog.getError()); } else { + reinstallRetries = 0; process(ethLog.getLogs()); } } + private void handleError(Error error) { + if (RpcErrors.FILTER_NOT_FOUND == error.getCode()) { + reinstallFilter(); + } else if (error.getMessage() != null + && FILTER_NOT_FOUND_REGEX.matcher(error.getMessage()).find()) { + reinstallFilter(); + } else { + throwException(error); + } + } + protected abstract EthFilter sendRequest() throws IOException; protected abstract void process(List> logResults); private void reinstallFilter() { + if (reinstallRetries >= MAX_REINSTALL_RETRIES) { + log.error( + "Exceeded maximum number of filter re-installations ({})", + MAX_REINSTALL_RETRIES); + throw new FilterException("Exceeded maximum number of filter re-installations"); + } + reinstallRetries++; + log.warn( - "Previously installed filter has not been found, trying to re-install. Filter id: {}", - filterId); - schedule.cancel(false); + "Previously installed filter has not been found, trying to re-install. Filter id: {}, retry: {}", + filterId, + reinstallRetries); + if (schedule != null) { + schedule.cancel(false); + } this.run(scheduledExecutorService, blockTime); } diff --git a/core/src/test/java/org/web3j/protocol/core/filters/FilterRecoveryTest.java b/core/src/test/java/org/web3j/protocol/core/filters/FilterRecoveryTest.java new file mode 100644 index 000000000..75b9e2865 --- /dev/null +++ b/core/src/test/java/org/web3j/protocol/core/filters/FilterRecoveryTest.java @@ -0,0 +1,123 @@ +package org.web3j.protocol.core.filters; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.web3j.protocol.ObjectMapperFactory; +import org.web3j.protocol.Web3j; +import org.web3j.protocol.Web3jService; +import org.web3j.protocol.core.Request; +import org.web3j.protocol.core.methods.request.EthFilter; +import org.web3j.protocol.core.methods.response.EthLog; +import org.web3j.protocol.core.methods.response.EthUninstallFilter; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class FilterRecoveryTest { + + private Web3jService web3jService; + private Web3j web3j; + private final ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper(); + private final ScheduledExecutorService scheduledExecutorService = + Executors.newSingleThreadScheduledExecutor(); + + @BeforeEach + public void setUp() { + web3jService = mock(Web3jService.class); + web3j = Web3j.build(web3jService, 1000, scheduledExecutorService); + } + + @Test + public void testFilterNotFoundRecoveryInInitialLogs() throws Exception { + org.web3j.protocol.core.methods.response.EthFilter ethFilterResponse = + objectMapper.readValue( + "{\"id\":1,\"jsonrpc\":\"2.0\",\"result\":\"0x1\"}", + org.web3j.protocol.core.methods.response.EthFilter.class); + + EthLog notFoundError = + objectMapper.readValue( + "{\"jsonrpc\":\"2.0\",\"id\":1,\"error\":{\"code\":-32000,\"message\":\"filter not found\"}}", + EthLog.class); + + EthLog successLog = + objectMapper.readValue("{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":[]}", EthLog.class); + + EthUninstallFilter ethUninstallFilter = + objectMapper.readValue( + "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":true}", EthUninstallFilter.class); + + // Mock ethNewFilter + when(web3jService.send( + any(Request.class), + eq(org.web3j.protocol.core.methods.response.EthFilter.class))) + .thenReturn(ethFilterResponse); + + // First call to ethGetFilterLogs returns error, subsequent returns success + when(web3jService.send(any(Request.class), eq(EthLog.class))) + .thenReturn(notFoundError) + .thenReturn(successLog); + + when(web3jService.send(any(Request.class), eq(EthUninstallFilter.class))) + .thenReturn(ethUninstallFilter); + + LogFilter filter = new LogFilter(web3j, log -> {}, new EthFilter()); + + // This should not throw FilterException anymore + filter.run(scheduledExecutorService, 100); + + // Verification: Service should have received ethNewFilter request at least twice + verify(web3jService, atLeast(2)) + .send( + any(Request.class), + eq(org.web3j.protocol.core.methods.response.EthFilter.class)); + filter.cancel(); + } + + @Test + public void testFilterNotFoundExceedsRetryLimit() throws Exception { + org.web3j.protocol.core.methods.response.EthFilter ethFilterResponse = + objectMapper.readValue( + "{\"id\":1,\"jsonrpc\":\"2.0\",\"result\":\"0x1\"}", + org.web3j.protocol.core.methods.response.EthFilter.class); + + EthLog notFoundError = + objectMapper.readValue( + "{\"jsonrpc\":\"2.0\",\"id\":1,\"error\":{\"code\":-32000,\"message\":\"filter not found\"}}", + EthLog.class); + + when(web3jService.send(any(Request.class), eq(EthLog.class))).thenReturn(notFoundError); + + when(web3jService.send( + any(Request.class), + eq(org.web3j.protocol.core.methods.response.EthFilter.class))) + .thenReturn(ethFilterResponse); + + LogFilter filter = new LogFilter(web3j, log -> {}, new EthFilter()); + + try { + filter.run(scheduledExecutorService, 100); + fail("Should have thrown FilterException due to retry limit"); + } catch (FilterException e) { + assertTrue( + e.getMessage().contains("Exceeded maximum number of filter re-installations")); + } + + // Verify it tried 4 times (1 initial + 3 retries) + verify(web3jService, times(4)) + .send( + any(Request.class), + eq(org.web3j.protocol.core.methods.response.EthFilter.class)); + } +}