Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ See [Conventional Commits](https://conventionalcommits.org) for commit guideline

* Replace raw usage of `EthLog.LogResult` with parameterized type to improve type safety (#2252)
(https://github.com/LFDT-web3j/web3j/pull/2254)
* Fix handling of expired or missing filters in event subscriptions to prevent "filter not found" errors (#1998)

### Features

Expand Down
55 changes: 36 additions & 19 deletions core/src/main/java/org/web3j/protocol/core/filters/Filter.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ public abstract class Filter<T> {

private long blockTime;

private static final String FILTER_NOT_FOUND_PATTERN = "(?i)\\bfilter\\s+not\\s+found\\b";
private static final Pattern FILTER_NOT_FOUND_REGEX =
Pattern.compile("(?i)\\bfilter\\s+not\\s+found\\b");

private int reinstallRetries = 0;
private static final int MAX_REINSTALL_RETRIES = 3;

public Filter(Web3j web3j, Callback<T> callback) {
this.web3j = web3j;
Expand Down Expand Up @@ -120,11 +124,12 @@ private void getInitialFilterLogs() {
}

if (ethLog.hasError()) {
throwException(ethLog.getError());
handleError(ethLog.getError());
} else {
reinstallRetries = 0;
process(ethLog.getLogs());
}

process(ethLog.getLogs());

} catch (IOException e) {
throwException(e);
}
Expand All @@ -138,32 +143,44 @@ private void pollFilter(EthFilter ethFilter) {
throwException(e);
}
if (ethLog.hasError()) {
Error error = ethLog.getError();
String message = error.getMessage();
switch (error.getCode()) {
case RpcErrors.FILTER_NOT_FOUND:
reinstallFilter();
break;
default:
if (Pattern.compile(FILTER_NOT_FOUND_PATTERN).matcher(message).find())
reinstallFilter();
else throwException(error);
break;
}
handleError(ethLog.getError());
} else {
reinstallRetries = 0;
process(ethLog.getLogs());
}
}

private void handleError(Error error) {
if (RpcErrors.FILTER_NOT_FOUND == error.getCode()) {
reinstallFilter();
} else if (error.getMessage() != null
&& FILTER_NOT_FOUND_REGEX.matcher(error.getMessage()).find()) {
reinstallFilter();
} else {
throwException(error);
}
}

protected abstract EthFilter sendRequest() throws IOException;

protected abstract void process(List<EthLog.LogResult<?>> logResults);

private void reinstallFilter() {
if (reinstallRetries >= MAX_REINSTALL_RETRIES) {
log.error(
"Exceeded maximum number of filter re-installations ({})",
MAX_REINSTALL_RETRIES);
throw new FilterException("Exceeded maximum number of filter re-installations");
}
reinstallRetries++;

log.warn(
"Previously installed filter has not been found, trying to re-install. Filter id: {}",
filterId);
schedule.cancel(false);
"Previously installed filter has not been found, trying to re-install. Filter id: {}, retry: {}",
filterId,
reinstallRetries);
if (schedule != null) {
schedule.cancel(false);
}
this.run(scheduledExecutorService, blockTime);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package org.web3j.protocol.core.filters;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.web3j.protocol.ObjectMapperFactory;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.Web3jService;
import org.web3j.protocol.core.Request;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.EthLog;
import org.web3j.protocol.core.methods.response.EthUninstallFilter;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class FilterRecoveryTest {

private Web3jService web3jService;
private Web3j web3j;
private final ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper();
private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();

@BeforeEach
public void setUp() {
web3jService = mock(Web3jService.class);
web3j = Web3j.build(web3jService, 1000, scheduledExecutorService);
}

@Test
public void testFilterNotFoundRecoveryInInitialLogs() throws Exception {
org.web3j.protocol.core.methods.response.EthFilter ethFilterResponse =
objectMapper.readValue(
"{\"id\":1,\"jsonrpc\":\"2.0\",\"result\":\"0x1\"}",
org.web3j.protocol.core.methods.response.EthFilter.class);

EthLog notFoundError =
objectMapper.readValue(
"{\"jsonrpc\":\"2.0\",\"id\":1,\"error\":{\"code\":-32000,\"message\":\"filter not found\"}}",
EthLog.class);

EthLog successLog =
objectMapper.readValue("{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":[]}", EthLog.class);

EthUninstallFilter ethUninstallFilter =
objectMapper.readValue(
"{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":true}", EthUninstallFilter.class);

// Mock ethNewFilter
when(web3jService.send(
any(Request.class),
eq(org.web3j.protocol.core.methods.response.EthFilter.class)))
.thenReturn(ethFilterResponse);

// First call to ethGetFilterLogs returns error, subsequent returns success
when(web3jService.send(any(Request.class), eq(EthLog.class)))
.thenReturn(notFoundError)
.thenReturn(successLog);

when(web3jService.send(any(Request.class), eq(EthUninstallFilter.class)))
.thenReturn(ethUninstallFilter);

LogFilter filter = new LogFilter(web3j, log -> {}, new EthFilter());

// This should not throw FilterException anymore
filter.run(scheduledExecutorService, 100);

// Verification: Service should have received ethNewFilter request at least twice
verify(web3jService, atLeast(2))
.send(
any(Request.class),
eq(org.web3j.protocol.core.methods.response.EthFilter.class));
filter.cancel();
}

@Test
public void testFilterNotFoundExceedsRetryLimit() throws Exception {
org.web3j.protocol.core.methods.response.EthFilter ethFilterResponse =
objectMapper.readValue(
"{\"id\":1,\"jsonrpc\":\"2.0\",\"result\":\"0x1\"}",
org.web3j.protocol.core.methods.response.EthFilter.class);

EthLog notFoundError =
objectMapper.readValue(
"{\"jsonrpc\":\"2.0\",\"id\":1,\"error\":{\"code\":-32000,\"message\":\"filter not found\"}}",
EthLog.class);

when(web3jService.send(any(Request.class), eq(EthLog.class))).thenReturn(notFoundError);

when(web3jService.send(
any(Request.class),
eq(org.web3j.protocol.core.methods.response.EthFilter.class)))
.thenReturn(ethFilterResponse);

LogFilter filter = new LogFilter(web3j, log -> {}, new EthFilter());

try {
filter.run(scheduledExecutorService, 100);
fail("Should have thrown FilterException due to retry limit");
} catch (FilterException e) {
assertTrue(
e.getMessage().contains("Exceeded maximum number of filter re-installations"));
}

// Verify it tried 4 times (1 initial + 3 retries)
verify(web3jService, times(4))
.send(
any(Request.class),
eq(org.web3j.protocol.core.methods.response.EthFilter.class));
}
}
Loading