Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions coordinator/clients/prover-client/riscv-client/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
plugins {
id 'net.consensys.zkevm.kotlin-library-conventions'
}

dependencies {
implementation project(':coordinator:core-interfaces')
api project(':coordinator:clients:prover-client:serialization')
// Reused for ProverFileNameProvider, FileNameSuffixes and FileBasedProverConfig used by the file-based transport.
implementation project(':coordinator:clients:prover-client:file-based-client')
implementation project(':coordinator:utilities')
implementation project(':jvm-libs:generic:extensions:futures')
implementation project(':jvm-libs:generic:extensions:kotlin')
implementation project(':jvm-libs:linea:core:domain-models')
implementation project(':coordinator:ethereum:models-helper')

implementation "io.vertx:vertx-core"

// Block RLP encoding (linea.encoding.BlockRLPEncoder)
implementation project(':jvm-libs:linea:besu-libs')

implementation "com.fasterxml.jackson.core:jackson-annotations:${libs.versions.jackson.get()}"
implementation "com.fasterxml.jackson.core:jackson-databind:${libs.versions.jackson.get()}"
implementation "com.fasterxml.jackson.module:jackson-module-kotlin:${libs.versions.jackson.get()}"
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${libs.versions.jackson.get()}")

testImplementation testFixtures(project(':coordinator:core-interfaces'))
testImplementation testFixtures(project(':jvm-libs:linea:clients:interfaces'))
testImplementation testFixtures(project(':jvm-libs:linea:core:domain-models'))
testImplementation "io.vertx:vertx-junit5"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package linea.coordinator.clients.prover.riscv

import com.github.michaelbull.result.Err
import com.github.michaelbull.result.getOrElse
import com.github.michaelbull.result.map
import io.vertx.core.Vertx
import linea.coordinator.clients.prover.FileBasedProverConfig
import linea.coordinator.clients.prover.GenericFileBasedProverClient
import linea.coordinator.clients.prover.ProverFileNameProvider
import linea.domain.ProofIndex
import linea.error.ErrorResponse
import linea.fileio.FileMonitor
import linea.fileio.FileReader
import linea.fileio.FileWriter
import linea.fileio.inProgressFilePattern
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import tech.pegasys.teku.infrastructure.async.SafeFuture
import java.nio.file.Path

/**
* File-based [ProverProofTransport]: the request DTO is written as a JSON file into [FileBasedProverConfig.requestsDirectory]
* and the response is read from a JSON file the prover writes into [FileBasedProverConfig.responsesDirectory]. This
* reproduces the behaviour of `GenericFileBasedProverClient` behind the transport abstraction.
*/
class FileBasedProverProofTransport<RequestDto : Any, ResponseDto, TProofIndex : ProofIndex>(
private val config: FileBasedProverConfig,
vertx: Vertx,
private val fileWriter: FileWriter,
private val fileReader: FileReader<ResponseDto>,
private val requestFileNameProvider: ProverFileNameProvider<TProofIndex>,
private val responseFileNameProvider: ProverFileNameProvider<TProofIndex>,
private val fileMonitor: FileMonitor = FileMonitor(
vertx,
FileMonitor.Config(config.pollingInterval, config.pollingTimeout),
),
private val log: Logger = LogManager.getLogger(FileBasedProverProofTransport::class.java),
) : ProverProofTransport<RequestDto, ResponseDto, TProofIndex> {

init {
GenericFileBasedProverClient.createDirectoryIfNotExists(config.requestsDirectory, log)
GenericFileBasedProverClient.createDirectoryIfNotExists(config.responsesDirectory, log)
}

private fun responseFilePath(proofIndex: TProofIndex): Path =
config.responsesDirectory.resolve(responseFileNameProvider.getFileName(proofIndex))

override fun isRequestAlreadySubmitted(proofIndex: TProofIndex): SafeFuture<Boolean> {
val requestFileName = requestFileNameProvider.getFileName(proofIndex)
return fileMonitor.findFile(
directory = config.requestsDirectory,
pattern = inProgressFilePattern(requestFileName, config.inprogressProvingSuffixPattern),
).thenApply { it != null }
}

override fun submitRequest(proofIndex: TProofIndex, requestDto: RequestDto): SafeFuture<Unit> {
val requestFilePath = config.requestsDirectory.resolve(requestFileNameProvider.getFileName(proofIndex))
log.trace("Creating proof request file. file={}", requestFilePath)
return fileWriter.write(requestDto, requestFilePath, config.inprogressRequestWritingSuffix)
.thenApply {
log.trace("Created proof request file. file={}", requestFilePath)
Unit
}
}

override fun findResponse(proofIndex: TProofIndex): SafeFuture<ResponseDto?> {
val responseFilePath = responseFilePath(proofIndex)
return fileMonitor.fileExists(responseFilePath)
.thenCompose { exists ->
if (exists) {
parseResponse(responseFilePath).thenApply { it }
} else {
SafeFuture.completedFuture(null)
}
}
}

override fun awaitResponse(proofIndex: TProofIndex): SafeFuture<ResponseDto> {
val responseFilePath = responseFilePath(proofIndex)
return fileMonitor.monitor(responseFilePath)
.thenCompose { result ->
if (result is Err) {
when (result.error) {
FileMonitor.ErrorType.TIMED_OUT ->
SafeFuture.failedFuture(RuntimeException("Timeout waiting for response file=$responseFilePath"))
}
} else {
parseResponse(responseFilePath)
}
}
}

private fun parseResponse(responseFilePath: Path): SafeFuture<ResponseDto> {
return fileReader.read(responseFilePath)
.thenCompose { result ->
result
.map { SafeFuture.completedFuture(it) }
.getOrElse { errorResponse: ErrorResponse<FileReader.ErrorType> ->
when (errorResponse.type) {
FileReader.ErrorType.PARSING_ERROR ->
log.error("Failed to read response file={} errorMessage={}", responseFilePath, errorResponse.message)
}
SafeFuture.failedFuture(errorResponse.asException())
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package linea.coordinator.clients.prover.riscv

import linea.clients.ProverProofRequestCreator
import linea.clients.ProverProofResponseChecker
import linea.domain.ProofIndex
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import tech.pegasys.teku.infrastructure.async.SafeFuture
import java.util.concurrent.atomic.AtomicLong
import java.util.function.Supplier

/**
* Generic prover client for the RISC-V provers.
*
* It mirrors the responsibilities of `GenericFileBasedProverClient` (it implements both [ProverProofResponseChecker]
* and [ProverProofRequestCreator] and exposes [requestProof]) but is NOT tied to a file-based approach: the
* submit/find/await mechanics are delegated to an injected [ProverProofTransport]. The transport decides whether the
* request and response travel through JSON files on disk or through RESTful calls to a remote prover service.
*
* The client itself only knows how to:
* - derive the [TProofIndex] from a domain request ([proofIndexProvider]);
* - map a domain request to a serializable request DTO ([requestMapper]);
* - map a response DTO back to the domain response ([parseResponse], by default delegating to [responseMapper]).
*
* @param Request domain proof request type.
* @param Response domain proof response type.
* @param RequestDto serializable request payload produced by [requestMapper].
* @param ResponseDto response payload returned by the transport and consumed by [responseMapper].
* @param TProofIndex proof index uniquely identifying a request/response pair.
*/
open class GenericRiscVProverClient<Request, Response, RequestDto, ResponseDto, TProofIndex>(
private val transport: ProverProofTransport<RequestDto, ResponseDto, TProofIndex>,
private val proofIndexProvider: (Request) -> TProofIndex,
private val requestMapper: (Request) -> SafeFuture<RequestDto>,
private val responseMapper: (TProofIndex, ResponseDto) -> Response,
private val proofTypeLabel: String,
private val log: Logger = LogManager.getLogger(GenericRiscVProverClient::class.java),
) : ProverProofResponseChecker<Response, TProofIndex>,
ProverProofRequestCreator<Request, TProofIndex>,
Supplier<Number>
where TProofIndex : ProofIndex, Request : Any, RequestDto : Any {

private val responsesWaiting = AtomicLong(0)
override fun get(): Long = responsesWaiting.get()

override fun findProofResponse(proofIndex: TProofIndex): SafeFuture<Response?> {
log.trace("Checking if response is available. {}={}", proofTypeLabel, proofIndex)
return transport.findResponse(proofIndex)
.thenApply { responseDto -> responseDto?.let { parseResponse(it, proofIndex) } }
}

override fun createProofRequest(proofRequest: Request): SafeFuture<TProofIndex> {
val proofIndex = proofIndexProvider(proofRequest)
log.debug(
"Creating proof: {}={}, proofIndexProvider={}",
proofTypeLabel,
proofIndex,
proofIndexProvider.toString(),
)
return transport.isRequestAlreadySubmitted(proofIndex)
.thenCompose { alreadySubmitted ->
if (alreadySubmitted) {
log.debug("request already submitted: {}={}", proofTypeLabel, proofIndex)
SafeFuture.completedFuture(proofIndex)
} else {
requestMapper(proofRequest)
.thenCompose { requestDto ->
log.trace("Submitting proof request. {}={}", proofTypeLabel, proofIndex)
transport.submitRequest(proofIndex, requestDto)
.thenApply {
log.trace("Submitted proof request. {}={}", proofTypeLabel, proofIndex)
proofIndex
}
}
}
}
}

fun requestProof(proofRequest: Request): SafeFuture<Response> {
val proofIndex = proofIndexProvider(proofRequest)
log.debug(
"Requesting proof: {}={}, proofIndexProvider={}",
proofTypeLabel,
proofIndex,
proofIndexProvider.toString(),
)

return findProofResponse(proofIndex)
.thenCompose { response ->
if (response != null) {
SafeFuture.completedFuture(response)
} else {
responsesWaiting.incrementAndGet()
createProofRequest(proofRequest)
.thenCompose { transport.awaitResponse(proofIndex) }
.thenApply { responseDto ->
responsesWaiting.decrementAndGet()
parseResponse(responseDto, proofIndex)
}
}
}
.whenException {
log.error(
"Failed to get proof: {}={} errorMessage={}",
proofTypeLabel,
proofIndex,
it.message,
it,
)
}
}

/**
* Parses a response DTO obtained from the transport into the domain response. Overridable for proof types whose
* response is derived from the [proofIndex] rather than from the transport payload (e.g. execution).
*/
protected open fun parseResponse(responseDto: ResponseDto, proofIndex: TProofIndex): Response {
return responseMapper(proofIndex, responseDto)
}
}
Loading
Loading