Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
304 changes: 168 additions & 136 deletions besu-plugin/core/src/main/java/samba/BesuSambaPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.auto.service.AutoService;
import com.sun.tools.sjavac.Log;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.plugin.BesuPlugin;
import org.hyperledger.besu.plugin.ServiceManager;
import org.hyperledger.besu.plugin.services.BesuConfiguration;
Expand All @@ -14,150 +18,178 @@
import org.hyperledger.besu.plugin.services.PicoCLIOptions;
import org.hyperledger.besu.plugin.services.RpcEndpointService;
import org.hyperledger.besu.plugin.services.metrics.MetricCategoryRegistry;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import samba.api.HistoryService;
import samba.rpc.GetBlockBodyByBlockHash;
import samba.rpc.GetBlockHeaderByBlockHash;
import samba.rpc.GetTransactionReceiptByBlockHash;

@AutoService(BesuPlugin.class)
public class BesuSambaPlugin implements BesuPlugin {
private static final Logger LOG = LoggerFactory.getLogger(BesuSambaPlugin.class);
public static final String PLUGIN_NAME = "samba";
private static final String CLI_OPTIONS_PREFIX = "--plugin-" + PLUGIN_NAME + "-";

private ServiceManager serviceManager;
private MetricCategoryRegistry metricCategoryRegistryService;
private BesuConfiguration besuConfigurationService;
private PicoCLIOptions picoCLIOptionsService;
protected MetricsSystem metricsSystemService;
private RpcEndpointService rpcEndpointService;

private static final AtomicBoolean registrationTaskDone = new AtomicBoolean(false);
private static final AtomicBoolean startingTasksDone = new AtomicBoolean(false);

@CommandLine.Option(names = CLI_OPTIONS_PREFIX + "host")
public String host = "0.0.0.0";

@CommandLine.Option(names = {"--plugin-samba-logging"})
public String loggingLevel;

@CommandLine.Option(names = {"--plugin-samba-data-path"})
public String dataPath;

private final CompletableFuture<SambaSDK> sambaSDKFuture = new CompletableFuture<>();

@Override
public void register(ServiceManager serviceManager) {
LOG.debug("Registering Samba plugin");
this.serviceManager = serviceManager;
if (registrationTaskDone.compareAndSet(false, true)) {
this.metricCategoryRegistryService =
this.getBesuService(this.serviceManager, MetricCategoryRegistry.class);
this.besuConfigurationService =
this.getBesuService(this.serviceManager, BesuConfiguration.class);
this.picoCLIOptionsService = this.getBesuService(this.serviceManager, PicoCLIOptions.class);
// TODO create a function
this.picoCLIOptionsService.addPicoCLIOptions(PLUGIN_NAME, this);
this.rpcEndpointService = this.getBesuService(this.serviceManager, RpcEndpointService.class);
this.starRpcEndpoints(); // TODO use a completable future till samba is fully initialized.
public class BesuSambaPlugin implements BesuPlugin, HistoryService {
private static final Logger LOG = LoggerFactory.getLogger(BesuSambaPlugin.class);
public static final String PLUGIN_NAME = "samba";
private static final String CLI_OPTIONS_PREFIX = "--plugin-" + PLUGIN_NAME + "-";

private ServiceManager serviceManager;
private MetricCategoryRegistry metricCategoryRegistryService;
private BesuConfiguration besuConfigurationService;
private PicoCLIOptions picoCLIOptionsService;
protected MetricsSystem metricsSystemService;
private RpcEndpointService rpcEndpointService;

private static final AtomicBoolean registrationTaskDone = new AtomicBoolean(false);
private static final AtomicBoolean startingTasksDone = new AtomicBoolean(false);

@CommandLine.Option(names = CLI_OPTIONS_PREFIX + "host")
public String host = "0.0.0.0";

@CommandLine.Option(names = {"--plugin-samba-logging"})
public String loggingLevel;

@CommandLine.Option(names = {"--plugin-samba-data-path"})
public String dataPath;

private final CompletableFuture<SambaSDK> sambaSDKFuture = new CompletableFuture<>();

@Override
public void register(ServiceManager serviceManager) {
LOG.debug("Registering Samba plugin");
this.serviceManager = serviceManager;
if (registrationTaskDone.compareAndSet(false, true)) {
this.metricCategoryRegistryService =
this.getBesuService(this.serviceManager, MetricCategoryRegistry.class);
this.besuConfigurationService =
this.getBesuService(this.serviceManager, BesuConfiguration.class);
this.picoCLIOptionsService = this.getBesuService(this.serviceManager, PicoCLIOptions.class);
// TODO create a function
this.picoCLIOptionsService.addPicoCLIOptions(PLUGIN_NAME, this);
this.rpcEndpointService = this.getBesuService(this.serviceManager, RpcEndpointService.class);
this.starRpcEndpoints(); // TODO use a completable future till samba is fully initialized.
}
}
}

@Override
public Optional<String> getName() {
return Optional.of(PLUGIN_NAME);
}

@Override
public void start() {
LOG.info("Starting Samba plugin");
if (startingTasksDone.compareAndSet(false, true)) {
SambaSDK sdk = this.initSamba();
sambaSDKFuture.complete(sdk);
this.metricsSystemService = this.getBesuService(this.serviceManager, MetricsSystem.class);

@Override
public Optional<String> getName() {
return Optional.of(PLUGIN_NAME);
}
}

@Override
public void stop() {
LOG.info("Stopping Samba plugin");
registrationTaskDone.set(false);
startingTasksDone.set(false);
this.besuConfigurationService = null;
this.metricCategoryRegistryService = null;
this.rpcEndpointService = null;
this.picoCLIOptionsService = null;
// TODO should we do something with Samba | its db ?
// TODO call samba to stop
}

private SambaSDK initSamba() {
try {
String[] options = {
"--portal-subnetworks=history-network",
"--p2p-advertised-ip=" + host,
"--disable-json-rpc-server",
"--disable-rest--server",
"--logging=" + loggingLevel,
"--data-path=" + dataPath
};
return Samba.init(options);

} catch (Exception e) {
LOG.error("Halting Besu startup: exception in plugin startup: ", e);
e.printStackTrace();
System.exit(1);
return null; // unreachable, but required to compile

@Override
public void start() {
LOG.info("Starting Samba plugin");
if (startingTasksDone.compareAndSet(false, true)) {
SambaSDK sdk = this.initSamba();
sambaSDKFuture.complete(sdk);
this.metricsSystemService = this.getBesuService(this.serviceManager, MetricsSystem.class);
}
}

@Override
public void stop() {
LOG.info("Stopping Samba plugin");
registrationTaskDone.set(false);
startingTasksDone.set(false);
this.besuConfigurationService = null;
this.metricCategoryRegistryService = null;
this.rpcEndpointService = null;
this.picoCLIOptionsService = null;
// TODO should we do something with Samba | its db ?
// TODO call samba to stop
}

private SambaSDK initSamba() {
try {
String[] options = {
"--portal-subnetworks=history-network",
"--p2p-advertised-ip=" + host,
"--disable-json-rpc-server",
"--disable-rest--server",
"--logging=" + loggingLevel,
"--data-path=" + dataPath
};
return Samba.init(options);

} catch (Exception e) {
LOG.error("Halting Besu startup: exception in plugin startup: ", e);
e.printStackTrace();
System.exit(1);
return null; // unreachable, but required to compile
}
}

private void starRpcEndpoints() {
var methods = List.of(new GetBlockBodyByBlockHash(this.sambaSDKFuture),
new GetBlockHeaderByBlockHash(this.sambaSDKFuture),
new GetTransactionReceiptByBlockHash(this.sambaSDKFuture),
new GetTransactionReceiptByBlockHash(this.sambaSDKFuture));
methods.forEach(
method -> {
LOG.info(
"Registering RPC plugin endpoint {}_{}", method.getNamespace(), method.getName());
rpcEndpointService.registerRPCEndpoint(
method.getNamespace(), method.getName(), method::execute);
});
}

private <T extends BesuService> T getBesuService(ServiceManager context, Class<T> clazz) {
return context
.getService(clazz)
.orElseThrow(
() ->
new RuntimeException(
"Unable to find given Besu service. Please ensure %s is registered."
.formatted(clazz.getName())));
}


@Override
public Optional<BlockHeader> getBlockHeaderByBlockHash(Hash blockHash) {
try {
return this.sambaSDKFuture
.get().historyAPI().flatMap(history -> history.getBlockHeaderByBlockHash(blockHash));
} catch (InterruptedException | ExecutionException e) {
LOG.debug("Error when executing GetBlockHeaderByBlockHash operation");
}
return Optional.empty();
}

@Override
public Optional<BlockBody> getBlockBodyByBlockHash(Hash blockHash) {
try {
return this.sambaSDKFuture.get()
.historyAPI()
.flatMap(history -> history.getBlockBodyByBlockHash(blockHash));
} catch (InterruptedException | ExecutionException e) {
LOG.debug("Error when executing GetBlockBodyByBlockHash operation");
}
return Optional.empty();
}

@Override
public Optional<List<TransactionReceipt>> getTransactionReceiptByBlockHash(Hash blockHash) {
try {
return this.sambaSDKFuture.get().historyAPI().flatMap(history -> history.getTransactionReceiptByBlockHash(blockHash));
} catch (InterruptedException | ExecutionException e) {
LOG.debug("Error when executing GetTransactionReceiptByBlockHash operation");
}
return Optional.empty();
}
}

private void starRpcEndpoints() {
var methods = List.of(new GetBlockBodyByBlockHash(this.sambaSDKFuture));
methods.forEach(
method -> {
LOG.info(
"Registering RPC plugin endpoint {}_{}", method.getNamespace(), method.getName());
rpcEndpointService.registerRPCEndpoint(
method.getNamespace(), method.getName(), method::execute);
});
}

private <T extends BesuService> T getBesuService(ServiceManager context, Class<T> clazz) {
return context
.getService(clazz)
.orElseThrow(
() ->
new RuntimeException(
"Unable to find given Besu service. Please ensure %s is registered."
.formatted(clazz.getName())));
}
}

/*
@Override
public Optional<BlockHeader> getBlockHeaderByBlockHash(Hash blockHash) {
return this.sambaSDK
.historyAPI()
.flatMap(history -> history.getBlockHeaderByBlockHash(blockHash));
}

@Override
public Optional<BlockBody> getBlockBodyByBlockHash(Hash blockHash) {
return this.sambaSDK
.historyAPI()
.flatMap(history -> history.getBlockBodyByBlockHash(blockHash));
}

@Override
public Optional<List<TransactionReceipt>> getTransactionReceiptByBlockHash(Hash blockHash) {
return this.sambaSDK.historyAPI().flatMap(history -> history.getReceiptByBlockHash(blockHash));
}

@Override
public Optional<BlockHeader> getBlockHeaderByBlockNumber(long blockNumber) {
return this.sambaSDK
.historyAPI()
.flatMap(history -> history.getBlockHeaderByBlockNumber(blockNumber));
}
@Override
public Optional<BlockHeader> getBlockHeaderByBlockNumber(String blockNumber) {
try {
return this.sambaSDKFuture.get()
.historyAPI()
.flatMap(history -> history.getBlockHeaderByBlockNumber(blockNumber));
} catch (InterruptedException | ExecutionException e) {
LOG.debug("Error when executing GetBlockHeaderByBlockNumber operation");
}
return Optional.empty();
}
}
*/

3 changes: 2 additions & 1 deletion besu-plugin/core/src/main/java/samba/api/HistoryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ public interface HistoryService {

Optional<List<TransactionReceipt>> getTransactionReceiptByBlockHash(Hash blockHash);

Optional<BlockHeader> getBlockHeaderByBlockNumber(long blockNumber);
//The characters in the string must all be decimal digits
Optional<BlockHeader> getBlockHeaderByBlockNumber(String blockNumber);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.parameters.JsonRpcParameter;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.plugin.services.rpc.PluginRpcRequest;
import samba.BesuSambaPlugin;
import samba.SambaSDK;
Expand All @@ -25,7 +25,7 @@ public String getNamespace() {

@Override
public String getName() {
return RpcMethod.GET_BLOCK_BODY_BY_HASH.getMethodName();
return RpcMethod.GET_BLOCK_BODY_BY_BLOCK_HASH.getMethodName();
}

@Override
Expand All @@ -36,8 +36,8 @@ public Object execute(PluginRpcRequest rpcRequest) {
return this.sambaSDKFuture
.get()
.historyAPI()
.flatMap(history -> history.getBlockHeaderByBlockHash(blockHash))
.map(BlockHeader::toString)
.flatMap(history -> history.getBlockBodyByBlockHash(blockHash))
.map(BlockBody::toString)
.orElse("");
} catch (JsonRpcParameter.JsonRpcParameterException
| ExecutionException
Expand Down
Loading
Loading