diff --git a/packages/runtime/src/full-service.ts b/packages/runtime/src/full-service.ts index f96e74d5..e5dcf5a2 100644 --- a/packages/runtime/src/full-service.ts +++ b/packages/runtime/src/full-service.ts @@ -1,51 +1,16 @@ import { create } from '@bufbuild/protobuf' import { type HandlerContext, type ServiceImpl } from '@connectrpc/connect' import { - type Empty, ExecutionConfigSchema, - type PreprocessStreamRequest, - type ProcessBindingsRequest, type ProcessConfigRequest, - Processor, ProcessorV3, type ProcessStreamRequest, type StartRequest, type UpdateTemplatesRequest } from '@sentio/protos' -import { ProcessorServiceImpl } from './service.js' import { ProcessorServiceImplV3 } from './service-v3.js' import { GLOBAL_CONFIG } from './global-config.js' -export class FullProcessorServiceImpl implements ServiceImpl { - constructor(readonly instance: ProcessorServiceImpl) {} - - async getConfig(request: ProcessConfigRequest, context: HandlerContext) { - const config = await this.instance.getConfig(request, context) - config.executionConfig = create(ExecutionConfigSchema, GLOBAL_CONFIG.execution) - return config - } - - async start(request: StartRequest, context: HandlerContext) { - return await this.instance.start(request, context) - } - - async stop(request: Empty, context: HandlerContext) { - return await this.instance.stop(request, context) - } - - async processBindings(request: ProcessBindingsRequest, context: HandlerContext) { - return await this.instance.processBindings(request, context) - } - - async *processBindingsStream(requests: AsyncIterable, context: HandlerContext) { - yield* this.instance.processBindingsStream(requests, context) - } - - async *preprocessBindingsStream(requests: AsyncIterable, context: HandlerContext) { - yield* this.instance.preprocessBindingsStream(requests, context) - } -} - export class FullProcessorServiceV3Impl implements ServiceImpl { constructor(readonly instance: ProcessorServiceImplV3) {} diff --git a/packages/runtime/src/index.ts b/packages/runtime/src/index.ts index e26fcbe8..5fc27b6c 100644 --- a/packages/runtime/src/index.ts +++ b/packages/runtime/src/index.ts @@ -3,7 +3,7 @@ export * from './state.js' export * from './utils.js' export * from './endpoints.js' export * from './chain-config.js' -export * from './service.js' +export * from './service-v3.js' export { GLOBAL_CONFIG, type GlobalConfig } from './global-config.js' export * from './db-context.js' export * from './provider.js' diff --git a/packages/runtime/src/processor-runner.ts b/packages/runtime/src/processor-runner.ts index 8309669a..04c6d919 100644 --- a/packages/runtime/src/processor-runner.ts +++ b/packages/runtime/src/processor-runner.ts @@ -10,14 +10,13 @@ import { Session } from 'node:inspector/promises' import { fork, ChildProcess } from 'child_process' import { fileURLToPath } from 'url' -import { ProcessorServiceImpl } from './service.js' import { configureEndpoints } from './endpoints.js' -import { FullProcessorServiceImpl, FullProcessorServiceV3Impl } from './full-service.js' +import { FullProcessorServiceV3Impl } from './full-service.js' import { setupLogger } from './logger.js' import { setupOTLP } from './otlp.js' import { ActionServer } from './action-server.js' -import { Processor, ProcessorV3 } from '@sentio/protos' +import { ProcessorV3 } from '@sentio/protos' import { ProcessorServiceImplV3 } from './service-v3.js' import { dirname, join } from 'path' import { program, ProcessorRuntimeOptions } from './processor-runner-program.js' @@ -67,7 +66,6 @@ if (!isChildProcess) { let server: any let processorHttp2Server: http2.Http2Server | undefined -let baseService: ProcessorServiceImpl let httpServer: http.Server | undefined const loader = async () => { @@ -82,15 +80,9 @@ if (options.startActionServer) { } else { const shutdown = () => processorHttp2Server?.close() - // for V2 - baseService = new ProcessorServiceImpl(loader, options, shutdown) - const serviceV2 = new FullProcessorServiceImpl(baseService) - - // for V3 const serviceV3 = new FullProcessorServiceV3Impl(new ProcessorServiceImplV3(loader, options, shutdown)) const routes = (router: ConnectRouter) => { - router.service(Processor, serviceV2) router.service(ProcessorV3, serviceV3) } @@ -208,9 +200,6 @@ process }) .on('uncaughtException', (err) => { console.error('Uncaught Exception, please checking if await is properly used', err) - if (baseService) { - baseService.unhandled = err - } // shutdownServers(1) }) .on('unhandledRejection', (reason, _p) => { @@ -219,9 +208,6 @@ process return } console.error('Unhandled Rejection, please checking if await is properly', reason) - if (baseService) { - baseService.unhandled = reason as Error - } // shutdownServers(1) }) diff --git a/packages/runtime/src/seq-mode.test.ts b/packages/runtime/src/seq-mode.test.ts deleted file mode 100644 index 9ab33a6c..00000000 --- a/packages/runtime/src/seq-mode.test.ts +++ /dev/null @@ -1,88 +0,0 @@ -import { before, describe, test } from 'node:test' -import { ProcessorServiceImpl } from './service.js' -import { FullProcessorServiceImpl } from './full-service.js' -import { type HandlerContext } from '@connectrpc/connect' -import { - type DataBinding, - DataBindingSchema, - HandlerType, - type ProcessResult, - ProcessResultSchema, - ProcessBindingsRequestSchema, - StartRequestSchema -} from '@sentio/protos' -import { create } from '@bufbuild/protobuf' -import { Plugin, PluginManager } from './plugin.js' -import { assert } from 'chai' -import { GLOBAL_CONFIG } from './global-config.js' -import { getTestConfig } from './processor-runner-program.js' - -export const TEST_CONTEXT = {} as HandlerContext - -let testRequest: DataBinding - -class TestPlugin extends Plugin { - async processBinding(request: DataBinding): Promise { - testRequest = request - return create(ProcessResultSchema, {}) - } - supportedHandlers = [HandlerType.ETH_BLOCK] -} - -describe('Test seq mode', () => { - const baseService = new ProcessorServiceImpl(async () => { - PluginManager.INSTANCE.plugins = [] - PluginManager.INSTANCE.register(new TestPlugin()) - }, getTestConfig()) - const service = new FullProcessorServiceImpl(baseService) - - before(async () => { - GLOBAL_CONFIG.execution.sequential = true - - await service.start(create(StartRequestSchema, { templateInstances: [] }), TEST_CONTEXT) - }) - - test('Check block dispatch in seq', async () => { - const binding1 = create(DataBindingSchema, { - data: { - value: { - case: 'ethBlock', - value: { rawBlock: JSON.stringify({ number: '0x1', timestamp: '0x65ed3a46' }) } - } - }, - handlerType: HandlerType.ETH_BLOCK, - handlerIds: [0], - chainId: '1' - }) - - const binding2 = create(DataBindingSchema, { - data: { - value: { - case: 'ethBlock', - value: { rawBlock: JSON.stringify({ number: '0x2', timestamp: '0x65ed3b46' }) } - } - }, - handlerType: HandlerType.ETH_BLOCK, - handlerIds: [0], - chainId: '1' - }) - - const binding3 = create(DataBindingSchema, { - data: { - value: { - case: 'ethBlock', - value: { rawBlock: JSON.stringify({ number: '0x1', timestamp: '0x65ed3c46' }) } - } - }, - handlerType: HandlerType.ETH_BLOCK, - handlerIds: [0], - chainId: '1' - }) - - await service.processBindings( - create(ProcessBindingsRequestSchema, { bindings: [binding2, binding1, binding3] }), - TEST_CONTEXT - ) - assert(testRequest.handlerType === HandlerType.ETH_BLOCK) - }) -}) diff --git a/packages/runtime/src/service-v3.ts b/packages/runtime/src/service-v3.ts index 96e3d6da..35319319 100644 --- a/packages/runtime/src/service-v3.ts +++ b/packages/runtime/src/service-v3.ts @@ -17,10 +17,9 @@ import { PluginManager } from './plugin.js' import { Subject } from 'rxjs' import { from } from 'ix/asynciterable' import { withAbort } from 'ix/asynciterable/operators' -import { errorString } from './utils.js' +import { errorString, recordRuntimeInfo } from './utils.js' import { processMetrics } from './metrics.js' -import { recordRuntimeInfo } from './service.js' import { DataBindingContext } from './db-context.js' import { freezeGlobalConfig } from './global-config.js' import { ProcessorRuntimeOptions } from './processor-runner-program.js' @@ -168,6 +167,8 @@ export class ProcessorServiceImplV3 implements ServiceImpl { PluginManager.INSTANCE.processBinding(binding, undefined, context) .then(async (result) => { await context.awaitPendings() + recordRuntimeInfo(result, binding.handlerType) + const timeseriesResult = result.timeseriesResult for (let i = 0; i < timeseriesResult.length; i += TIME_SERIES_RESULT_BATCH_SIZE) { const batch = timeseriesResult.slice(i, i + TIME_SERIES_RESULT_BATCH_SIZE) @@ -188,7 +189,6 @@ export class ProcessorServiceImplV3 implements ServiceImpl { value: WRITE_V2_EVENT_LOGS ? otherResults : create(ProcessResultSchema, { states: result.states }) } }) - recordRuntimeInfo(result, binding.handlerType) }) .catch((e) => { console.error(e, e.stack) diff --git a/packages/runtime/src/service.test.ts b/packages/runtime/src/service.test.ts deleted file mode 100644 index 0312ce34..00000000 --- a/packages/runtime/src/service.test.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { before, describe, test } from 'node:test' -import { ProcessorServiceImpl } from './service.js' -import { FullProcessorServiceImpl } from './full-service.js' -import { type HandlerContext } from '@connectrpc/connect' -import { - type DataBinding, - HandlerType, - type ProcessResult, - ProcessResultSchema, - StartRequestSchema -} from '@sentio/protos' -import { create } from '@bufbuild/protobuf' -import { Plugin, PluginManager } from './plugin.js' -import { getTestConfig } from './processor-runner-program.js' - -export const TEST_CONTEXT = {} as HandlerContext - -// TODO use mock -let testRequest: DataBinding - -class TestPlugin extends Plugin { - async processBinding(request: DataBinding): Promise { - testRequest = request - return create(ProcessResultSchema, {}) - } - supportedHandlers = [HandlerType.UNKNOWN, HandlerType.APT_EVENT] -} - -describe('Test Service Compatibility', () => { - const baseService = new ProcessorServiceImpl(async () => { - PluginManager.INSTANCE.plugins = [] - PluginManager.INSTANCE.register(new TestPlugin()) - }, getTestConfig()) - const service = new FullProcessorServiceImpl(baseService) - - before(async () => { - await service.start(create(StartRequestSchema, { templateInstances: [] }), TEST_CONTEXT) - }) - - test('Check transaction dispatch', async () => { - // const binding1 = create(DataBindingSchema, { - // data: { value: { case: 'ethBlock', value: { rawBlock: JSON.stringify({ number: '0x1' }) } } }, - // handlerType: HandlerType.UNKNOWN, - // handlerIds: [0] - // }) - // - // await service.processBindings(create(ProcessBindingsRequestSchema, { bindings: [binding1] }), TEST_CONTEXT) - // assert(testRequest.handlerType === HandlerType.UNKNOWN) - }) -}) diff --git a/packages/runtime/src/service.ts b/packages/runtime/src/service.ts deleted file mode 100644 index bc60a1f1..00000000 --- a/packages/runtime/src/service.ts +++ /dev/null @@ -1,556 +0,0 @@ -import { ConnectError, Code, type HandlerContext, type ServiceImpl } from '@connectrpc/connect' -import { from } from 'ix/Ix.asynciterable' -import { withAbort } from 'ix/Ix.asynciterable.operators' - -import { - type DataBinding, - type Empty, - EmptySchema, - type EthCallParam, - HandlerType, - type PreparedData, - PreparedDataSchema, - type PreprocessResult, - type PreprocessStreamRequest, - ProcessBindingResponseSchema, - type ProcessBindingsRequest, - type ProcessConfigRequest, - type ProcessConfigResponse, - ProcessConfigResponseSchema, - Processor, - type ProcessResult, - ProcessResultSchema, - RuntimeInfoSchema, - type ProcessStreamRequest, - ProcessStreamResponseSchema, - PreprocessStreamResponseSchema, - type StartRequest -} from '@sentio/protos' -import { create, type MessageInitShape } from '@bufbuild/protobuf' - -import { PluginManager } from './plugin.js' -import { errorString, makeEthCallKey, mergeProcessResults } from './utils.js' -import { freezeGlobalConfig, GLOBAL_CONFIG } from './global-config.js' - -import { StoreContext } from './db-context.js' -import { Subject } from 'rxjs' -import { getProvider } from './provider.js' -import { EthChainId } from '@sentio/chain' -import { Provider } from 'ethers' -import { decodeMulticallResult, encodeMulticallData, getMulticallAddress, Multicall3Call } from './multicall.js' - -import { processMetrics } from './metrics.js' -import { ProcessorRuntimeOptions } from './processor-runner-program.js' - -const { process_binding_count, process_binding_time, process_binding_error } = processMetrics - -;(BigInt.prototype as any).toJSON = function () { - return this.toString() -} - -// Init-shapes carried over the rxjs Subject before being yielded by connect. -// connect accepts MessageInitShape for streaming outputs, so the oneof discriminated -// union must be filled in (e.g. { value: { case: 'result', value: ... } }). -export type ProcessStreamResponseInit = MessageInitShape -export type PreprocessStreamResponseInit = MessageInitShape - -export class ProcessorServiceImpl implements ServiceImpl { - private started = false - // When there is unhandled error, stop process and return unavailable error - unhandled: Error - // private processorConfig: ProcessConfigResponse - - private readonly loader: () => Promise - - private readonly shutdownHandler?: () => void - - private readonly enablePreprocess: boolean - - private preparedData: PreparedData | undefined - readonly enablePartition: boolean - - constructor(loader: () => Promise, options?: ProcessorRuntimeOptions, shutdownHandler?: () => void) { - this.loader = loader - this.shutdownHandler = shutdownHandler - - this.enablePreprocess = process.env['ENABLE_PREPROCESS'] - ? process.env['ENABLE_PREPROCESS'].toLowerCase() == 'true' - : false - - this.enablePartition = options?.enablePartition == true - } - - async getConfig(request: ProcessConfigRequest, context: HandlerContext): Promise { - if (!this.started) { - throw new ConnectError('Service Not started.', Code.Unavailable) - } - // if (!this.processorConfig) { - // throw new ConnectError('Process config empty.', Code.Internal) - // } - - // Don't use .create to keep compatiblity - const newConfig = create(ProcessConfigResponseSchema, {}) - await PluginManager.INSTANCE.configure(newConfig) - return newConfig - } - - // - // async configure() { - // this.processorConfig = ProcessConfigResponse.fromPartial({}) - // await PluginManager.INSTANCE.configure(this.processorConfig) - // } - - async start(request: StartRequest, context: HandlerContext): Promise> { - if (this.started) { - return {} - } - - freezeGlobalConfig() - - try { - // for (const plugin of ['@sentio/sdk', '@sentio/sdk/eth']) { - // try { - // await import(plugin) - // } catch (e) { - // console.error('Failed to load plugin: ', plugin) - // } - // } - // - // for (const plugin of ['@sentio/sdk/aptos', '@sentio/sdk/solana']) { - // try { - // await import(plugin) - // } catch (e) {} - // } - - await this.loader() - } catch (e) { - throw new ConnectError('Failed to load processor: ' + errorString(e), Code.InvalidArgument) - } - - await PluginManager.INSTANCE.start(request) - - // try { - // await this.configure() - // } catch (e) { - // throw new ConnectError('Failed to start processor : ' + errorString(e), Code.Internal) - // } - this.started = true - return {} - } - - async stop(request: Empty, context: HandlerContext): Promise> { - console.log('Server Shutting down in 5 seconds') - if (this.shutdownHandler) { - setTimeout(this.shutdownHandler, 5000) - } - return {} - } - - async processBindings( - request: ProcessBindingsRequest, - context?: HandlerContext - ): Promise> { - const preparedData = this.enablePreprocess - ? await this.preprocessBindings(request.bindings, {}, undefined, context) - : create(PreparedDataSchema, { ethCallResults: {} }) - - const promises = [] - for (const binding of request.bindings) { - const promise = this.processBinding(binding, preparedData) - if (GLOBAL_CONFIG.execution.sequential) { - await promise - } - promises.push(promise) - } - let promise - try { - promise = await Promise.all(promises) - processMetrics.process_binding_count.add(request.bindings.length) - } catch (e) { - processMetrics.process_binding_error.add(request.bindings.length) - throw e - } - const result = mergeProcessResults(promise) - - // let updated = false - // if (PluginManager.INSTANCE.stateDiff(this.processorConfig)) { - // await this.configure() - // updated = true - // } - - return { - result - } - } - - async preprocessBindings( - bindings: DataBinding[], - preprocessStore: { [k: string]: any }, - dbContext?: StoreContext, - options?: HandlerContext - ): Promise { - // console.debug(`preprocessBindings start, bindings: ${bindings.length}`) - const promises = [] - for (const binding of bindings) { - promises.push(this.preprocessBinding(binding, preprocessStore, dbContext, options)) - } - let preprocessResults: PreprocessResult[] - try { - preprocessResults = await Promise.all(promises) - } catch (e) { - throw e - } - const groupedRequests = new Map() - const providers = new Map() - for (const result of preprocessResults) { - for (const param of result.ethCallParams) { - const { chainId, blockTag } = param.context! - if (!providers.has(chainId)) { - providers.set(chainId, getProvider(chainId as EthChainId)) - } - const key = [chainId, blockTag].join('|') - if (!groupedRequests.has(key)) { - groupedRequests.set(key, []) - } - groupedRequests.get(key)!.push(param) - } - } - - const start = Date.now() - const MULTICALL_THRESHOLD = 1 - const callPromises: Promise<[string, string]>[] = [] - const multicallPromises: Promise<[string, string][]>[] = [] - - for (const params of groupedRequests.values()) { - const { chainId, blockTag } = params[0].context! - const multicallAddress = getMulticallAddress(chainId as EthChainId) - if (params.length <= MULTICALL_THRESHOLD || !multicallAddress) { - for (const param of params) { - callPromises.push( - providers - .get(chainId)! - .call({ - to: param.context!.address, - data: param.calldata, - blockTag - }) - .then((result) => [makeEthCallKey(param), result]) - ) - } - continue - } - - // construct multicalls - const CHUNK_SIZE = 128 - for (let i = 0; i < params.length; i += CHUNK_SIZE) { - const chunk = params.slice(i, i + CHUNK_SIZE) - const calls: Multicall3Call[] = chunk.map((param) => ({ - target: param.context!.address, - callData: param.calldata - })) - const data = encodeMulticallData(calls) - multicallPromises.push( - providers - .get(chainId)! - .call({ - to: multicallAddress, - data: data, - blockTag - }) - .then((raw) => { - const result = decodeMulticallResult(raw).returnData - if (result.length != chunk.length) { - throw new Error(`multicall result length mismatch, params: ${chunk.length}, result: ${result.length}`) - } - const ret: [string, string][] = [] - for (let i = 0; i < chunk.length; i++) { - ret.push([makeEthCallKey(chunk[i]), result[i]]) - } - return ret - }) - ) - } - } - - let results: { [p: string]: string } = {} - try { - results = Object.fromEntries(await Promise.all(callPromises)) - for (const multicallResult of await Promise.all(multicallPromises)) { - results = { - ...results, - ...Object.fromEntries(multicallResult) - } - } - } catch (e) { - console.error(`eth call error: ${e}`) - } - // console.debug( - // `${Object.keys(results).length} calls finished, actual calls: ${callPromises.length + multicallPromises.length}, elapsed: ${Date.now() - start}ms` - // ) - return create(PreparedDataSchema, { - ethCallResults: results - }) - } - - async preprocessBinding( - request: DataBinding, - preprocessStore: { [k: string]: any }, - dbContext?: StoreContext, - options?: HandlerContext - ): Promise { - if (!this.started) { - throw new ConnectError('Service Not started.', Code.Unavailable) - } - if (this.unhandled) { - throw new ConnectError( - 'Unhandled exception/rejection in previous request: ' + errorString(this.unhandled), - Code.Unavailable - ) - } - return await PluginManager.INSTANCE.preprocessBinding(request, preprocessStore, dbContext) - } - - async processBinding( - request: DataBinding, - preparedData: PreparedData | undefined, - options?: HandlerContext - ): Promise { - if (!this.started) { - throw new ConnectError('Service Not started.', Code.Unavailable) - } - if (this.unhandled) { - throw new ConnectError( - 'Unhandled exception/rejection in previous request: ' + errorString(this.unhandled), - Code.Unavailable - ) - } - - const result = await PluginManager.INSTANCE.processBinding( - request, - preparedData, - PluginManager.INSTANCE.dbContextLocalStorage.getStore() - ) - recordRuntimeInfo(result, request.handlerType) - return result - } - - async *processBindingsStream(requests: AsyncIterable, context: HandlerContext) { - if (!this.started) { - throw new ConnectError('Service Not started.', Code.Unavailable) - } - - const subject = new Subject() - this.handleRequests(requests, subject) - .then(() => { - if (this.preparedData) { - this.preparedData = create(PreparedDataSchema, { ethCallResults: {} }) - } - subject.complete() - }) - .catch((e) => { - console.error(e) - subject.error(e) - }) - yield* from(subject).pipe(withAbort(context.signal)) - } - - async handlePreprocessRequests( - requests: AsyncIterable, - subject: Subject - ) { - const contexts = new Contexts() - const preprocessStore: { [k: string]: any } = {} - - for await (const request of requests) { - try { - if (request.value.case === 'bindings') { - const bindings = request.value.value.bindings - // NOTE: StoreContext/Contexts are typed for the V2 ProcessStreamResponse stream, but the - // preprocess flow reuses them only to drive DB request/response plumbing. The preprocess - // stream message (flat `dbRequest`) differs from the V2 oneof shape, so we hand the - // preprocess subject in via a cast. db-context.ts owns the actual emit shape; integrator - // should confirm StoreContext.doSend stays compatible with both stream message types. - const dbContext = contexts.new(request.processId, subject as unknown as Subject) - const start = Date.now() - this.preprocessBindings(bindings, preprocessStore, dbContext, undefined) - .then((preparedData) => { - // TODO maybe not proper to pass data in this way - this.preparedData = create(PreparedDataSchema, { - ethCallResults: { - ...this.preparedData?.ethCallResults, - ...preparedData.ethCallResults - } - }) - subject.next({ - processId: request.processId - }) - }) - .catch((e) => { - console.debug(e) - dbContext.error(request.processId, e) - }) - .finally(() => { - const cost = Date.now() - start - console.debug('preprocessBinding', request.processId, ' took', cost, 'ms') - contexts.delete(request.processId) - }) - } - if (request.value.case === 'dbResult') { - const dbContext = contexts.get(request.processId) - dbContext?.result(request.value.value) - } - } catch (e) { - // should not happen - console.error('unexpect error during handle loop', e) - } - } - } - - async *preprocessBindingsStream(requests: AsyncIterable, context: HandlerContext) { - if (!this.started) { - throw new ConnectError('Service Not started.', Code.Unavailable) - } - - const subject = new Subject() - this.handlePreprocessRequests(requests, subject) - .then(() => { - subject.complete() - }) - .catch((e) => { - console.error(e) - subject.error(e) - }) - yield* from(subject).pipe(withAbort(context.signal)) - } - - private dbContexts = new Contexts() - - protected async handleRequests( - requests: AsyncIterable, - subject: Subject - ) { - let lastBinding: DataBinding | undefined = undefined - for await (const request of requests) { - try { - // console.log('received request:', request, 'lastBinding:', lastBinding) - if (request.value.case === 'binding') { - lastBinding = request.value.value - } - this.handleRequest(request, lastBinding, subject) - } catch (e) { - // should not happen - console.error('unexpect error during handle loop', e) - } - } - } - - async handleRequest( - request: ProcessStreamRequest, - lastBinding: DataBinding | undefined, - subject: Subject - ) { - if (request.value.case === 'binding') { - const binding = request.value.value - process_binding_count.add(1) - - // Adjust binding will make some request become invalid by setting UNKNOWN HandlerType - // for older SDK version, so we just return empty result for them here - if (binding.handlerType === HandlerType.UNKNOWN) { - subject.next({ - processId: request.processId, - value: { case: 'result', value: create(ProcessResultSchema) } - }) - return - } - - if (this.enablePartition) { - try { - const partitions = await PluginManager.INSTANCE.partition(binding) - subject.next({ - processId: request.processId, - value: { case: 'partitions', value: partitions } - }) - } catch (e) { - console.error('Partition error:', e) - subject.error(new Error('Partition error: ' + errorString(e))) - return - } - } else { - this.startProcess(request.processId, binding, subject) - } - } - - if (request.value.case === 'start') { - if (!lastBinding) { - console.error('start request received without binding') - subject.error(new Error('start request received without binding')) - return - } - this.startProcess(request.processId, lastBinding, subject) - } - - if (request.value.case === 'dbResult') { - const dbContext = this.dbContexts.get(request.processId) - try { - dbContext?.result(request.value.value) - } catch (e) { - subject.error(new Error('db result error, process should stop')) - } - } - } - - private startProcess(processId: number, binding: DataBinding, subject: Subject) { - const dbContext = this.dbContexts.new(processId, subject) - const start = Date.now() - PluginManager.INSTANCE.processBinding(binding, this.preparedData, dbContext) - .then(async (result) => { - // await all pending db requests - await dbContext.awaitPendings() - subject.next({ - value: { case: 'result', value: result }, - processId: processId - }) - recordRuntimeInfo(result, binding.handlerType) - }) - .catch((e) => { - console.error(e, e.stack) - dbContext.error(processId, e) - process_binding_error.add(1) - }) - .finally(() => { - const cost = Date.now() - start - process_binding_time.add(cost) - this.dbContexts.delete(processId) - }) - } -} - -export function recordRuntimeInfo(results: ProcessResult, handlerType: HandlerType) { - for (const list of [results.gauges, results.counters, results.events, results.exports]) { - list.forEach((e) => { - e.runtimeInfo = create(RuntimeInfoSchema, { - from: handlerType - }) - }) - } -} - -class Contexts { - private contexts: Map = new Map() - - get(processId: number) { - return this.contexts.get(processId) - } - - new(processId: number, subject: Subject) { - const context = new StoreContext(subject, processId) - this.contexts.set(processId, context) - return context - } - - delete(processId: number) { - const context = this.get(processId) - context?.close() - this.contexts.delete(processId) - } -} diff --git a/packages/runtime/src/utils.ts b/packages/runtime/src/utils.ts index 67c3dfb0..4bdf810a 100644 --- a/packages/runtime/src/utils.ts +++ b/packages/runtime/src/utils.ts @@ -1,4 +1,11 @@ -import { type EthCallParam, type ProcessResult, ProcessResultSchema, StateResultSchema } from '@sentio/protos' +import { + type EthCallParam, + HandlerType, + type ProcessResult, + ProcessResultSchema, + RuntimeInfoSchema, + StateResultSchema +} from '@sentio/protos' import { create } from '@bufbuild/protobuf' import { createRequire } from 'module' @@ -6,6 +13,10 @@ import { Required } from 'utility-types' import path from 'path' import fs from 'fs' +;(BigInt.prototype as any).toJSON = function () { + return this.toString() +} + export function mergeProcessResults(results: ProcessResult[]): Required { const res = create(ProcessResultSchema, { states: create(StateResultSchema) @@ -34,6 +45,16 @@ export function mergeProcessResultsInPlace( return res as Required } +export function recordRuntimeInfo(results: ProcessResult, handlerType: HandlerType) { + for (const list of [results.gauges, results.counters, results.events, results.exports]) { + list.forEach((e) => { + e.runtimeInfo = create(RuntimeInfoSchema, { + from: handlerType + }) + }) + } +} + function mergeArrayInPlace(dst: T[], src: T[]): T[] { const res = dst || [] if (Array.isArray(src)) { diff --git a/packages/sdk/src/testing/memory-database.ts b/packages/sdk/src/testing/memory-database.ts index e37cc268..2d5d39a9 100644 --- a/packages/sdk/src/testing/memory-database.ts +++ b/packages/sdk/src/testing/memory-database.ts @@ -3,8 +3,9 @@ import { type DBRequest, type DBRequest_DBFilter, DBRequest_DBOperator, + type DBResponse, DBResponseSchema, - type ProcessStreamResponse, + type ProcessStreamResponseV3, type RichStruct, type RichValue, RichValueSchema, @@ -17,20 +18,26 @@ import { GraphQLField, GraphQLSchema, parse, StringValueNode } from 'graphql/ind import { DatabaseSchemaState } from '../core/database-schema.js' import { buildSchema } from '../store/schema.js' import { GraphQLList, GraphQLNonNull, GraphQLObjectType, GraphQLOutputType } from 'graphql' -import { PluginManager } from '@sentio/runtime' +import { type IStoreContext, PluginManager } from '@sentio/runtime' import { BigDecimalConverter, BigIntConverter } from '../store/convert.js' import { BigDecimal } from '@sentio/bigdecimal' import { Store } from '../store/store.js' +import { Subject } from 'rxjs' // Internal entity name used by MemoryCache - bypasses schema validation const MEMORY_CACHE_ITEM_ENTITY = 'MemoryCacheItem' +type MemoryDatabaseContext = IStoreContext & { + subject: Subject + result(dbResult: DBResponse, processId?: number): void +} + export class MemoryDatabase { db = new Map>() public lastDbRequest: DBRequest | undefined _schema: GraphQLSchema - constructor(readonly dbContext: StoreContext) {} + constructor(readonly dbContext: MemoryDatabaseContext) {} get schema() { if (!this._schema) { @@ -48,9 +55,9 @@ export class MemoryDatabase { } start() { - // The subject is typed as the `ProcessStreamResponse` init-shape, but at runtime it always carries a + // The subject is typed as the `ProcessStreamResponseV3` init-shape, but at runtime it always carries a // full response (the store context emits the complete oneof). Treat it as a full message and narrow inside. - this.dbContext.subject.subscribe((request) => this.processRequest(request as unknown as ProcessStreamResponse)) + this.dbContext.subject.subscribe((request) => this.processRequest(request as unknown as ProcessStreamResponseV3)) } stop() { @@ -149,7 +156,7 @@ export class MemoryDatabase { return result } - private processRequest(request: ProcessStreamResponse) { + private processRequest(request: ProcessStreamResponseV3) { const req = request.value.case === 'dbRequest' ? request.value.value : undefined // Check if schema is required for this request @@ -170,7 +177,7 @@ export class MemoryDatabase { this.upsert(entityName, id, d) }) - this.dbContext.result(create(DBResponseSchema, { opId: req.opId })) + this.sendResult(request, create(DBResponseSchema, { opId: req.opId })) } if (req.op.case === 'delete') { const { id, entity } = req.op.value @@ -178,13 +185,14 @@ export class MemoryDatabase { const entityName = entity[idx] this.delete(entityName, i) }) - this.dbContext.result(create(DBResponseSchema, { opId: req.opId })) + this.sendResult(request, create(DBResponseSchema, { opId: req.opId })) } if (req.op.case === 'get') { const { entity, id } = req.op.value const data = this.getById(entity, id) - this.dbContext.result( + this.sendResult( + request, create(DBResponseSchema, { opId: req.opId, value: { @@ -203,7 +211,8 @@ export class MemoryDatabase { if (cursor) { const idx = parseInt(cursor) - this.dbContext.result( + this.sendResult( + request, create(DBResponseSchema, { opId: req.opId, value: { case: 'entityList', value: { entities: list.slice(idx, idx + 1).map((d) => toEntity(d)) } }, @@ -211,7 +220,8 @@ export class MemoryDatabase { }) ) } else { - this.dbContext.result( + this.sendResult( + request, create(DBResponseSchema, { opId: req.opId, value: { case: 'entityList', value: { entities: list.length ? [toEntity(list[0])] : [] } }, @@ -223,6 +233,10 @@ export class MemoryDatabase { } } + private sendResult(request: ProcessStreamResponseV3, response: DBResponse) { + this.dbContext.result(response, request.processId) + } + reset() { this.db.clear() } diff --git a/packages/sdk/src/testing/test-processor-server.ts b/packages/sdk/src/testing/test-processor-server.ts index ee6d67c2..66ef374a 100644 --- a/packages/sdk/src/testing/test-processor-server.ts +++ b/packages/sdk/src/testing/test-processor-server.ts @@ -2,30 +2,35 @@ import { type AccountConfig, type ContractConfig, type DataBinding, + type DBResponse, type Empty, EmptySchema, - type PreprocessStreamRequest, type ProcessBindingResponse, ProcessBindingResponseSchema, ProcessBindingsRequestSchema, ProcessConfigRequestSchema, type ProcessConfigResponse, + type ProcessResult, + ProcessResultSchema, type ProcessStreamRequest, - ProcessStreamResponseSchema, + ProcessStreamRequestSchema, + ProcessStreamResponseV3Schema, StartRequestSchema, type TemplateInstance, + TemplateInstanceSchema, type TimeseriesResult, UpdateTemplatesRequestSchema } from '@sentio/protos' import { create, type MessageInitShape } from '@bufbuild/protobuf' import { type HandlerContext } from '@connectrpc/connect' import { + DataBindingContext, Endpoints, IDataBindingContext, + mergeProcessResults, PluginManager, - ProcessorServiceImpl, State, - StoreContext + ProcessorServiceImplV3 } from '@sentio/runtime' import { AptosFacet } from './aptos-facet.js' @@ -40,7 +45,7 @@ import { DatabaseSchemaState } from '../core/database-schema.js' import { IotaFacet } from './iota-facet.js' import { ChainInfo } from '@sentio/chain' -type ProcessStreamResponseInit = MessageInitShape +type ProcessStreamResponseV3Init = MessageInitShape export const TEST_CONTEXT = {} as HandlerContext @@ -52,10 +57,11 @@ export function cleanTest() { } export class TestProcessorServer { - service: ProcessorServiceImpl + service: ProcessorServiceImplV3 contractConfigs: ContractConfig[] accountConfigs: AccountConfig[] storeContext: TestStoreContext + private nextProcessId = 1 aptos: AptosFacet eth: EthFacet @@ -69,7 +75,7 @@ export class TestProcessorServer { constructor(loader: () => Promise, httpEndpoints: Record = {}) { cleanTest() - this.service = new ProcessorServiceImpl(loader) + this.service = new ProcessorServiceImplV3(loader) this.aptos = new AptosFacet(this) this.solana = new SolanaFacet(this) this.eth = new EthFacet(this) @@ -84,8 +90,8 @@ export class TestProcessorServer { } // start a memory database for testing - const subject = new Subject() - this.storeContext = new TestStoreContext(subject, 1) + const subject = new Subject() + this.storeContext = new TestStoreContext(subject, 1, this.service) this._db = new MemoryDatabase(this.storeContext) } @@ -104,7 +110,8 @@ export class TestProcessorServer { } stop(request: Empty = create(EmptySchema), context = TEST_CONTEXT) { - return this.service.stop(request, context) + this._db.stop() + return request } async getConfig( @@ -121,43 +128,84 @@ export class TestProcessorServer { context: HandlerContext = TEST_CONTEXT ): Promise { const req = create(ProcessBindingsRequestSchema, request) - return PluginManager.INSTANCE.dbContextLocalStorage.run(this.storeContext, async () => { - const ret = await this.service.processBindings(req, context) - if (ret.result?.states?.configUpdated) { - // template may changed + return this.processBindingList(req.bindings, context) + } + + async processBinding(request: DataBinding, context: HandlerContext = TEST_CONTEXT): Promise { + return this.processBindingList([request], context) + } + + private async processBindingList( + bindings: DataBinding[], + context: HandlerContext = TEST_CONTEXT + ): Promise { + const results: ProcessResult[] = [] + for (const binding of bindings) { + const result = await this.processBindingV3(binding, context) + results.push(result) + + if (result.states?.configUpdated) { await PluginManager.INSTANCE.updateTemplates( create(UpdateTemplatesRequestSchema, { - chainId: req.bindings[0].chainId, + chainId: binding.chainId, templateInstances: this.storeContext.templateInstances }) ) } - return create(ProcessBindingResponseSchema, ret) + } + return create(ProcessBindingResponseSchema, { + result: mergeProcessResults(results) }) } - async processBinding(request: DataBinding, context: HandlerContext = TEST_CONTEXT): Promise { - const ret = await PluginManager.INSTANCE.dbContextLocalStorage.run(this.storeContext, () => { - return this.service.processBindings(create(ProcessBindingsRequestSchema, { bindings: [request] }), context) - }) - if (ret.result?.states?.configUpdated) { - // template may changed - await PluginManager.INSTANCE.updateTemplates( - create(UpdateTemplatesRequestSchema, { - chainId: request.chainId, - templateInstances: this.storeContext.templateInstances - }) - ) - } - return create(ProcessBindingResponseSchema, ret) - } + private processBindingV3(request: DataBinding, context: HandlerContext): Promise { + const processId = this.nextProcessId++ + const subject = this.storeContext.subject - processBindingsStream(requests: AsyncIterable, context: HandlerContext): never { - throw new Error('Method not implemented.') + return new Promise((resolve, reject) => { + const subscription = subject.subscribe({ + next: (response) => { + if (response.processId !== processId) { + return + } + if (response.value?.case === 'tplRequest') { + this.storeContext.applyTemplateRequest( + (response.value.value.templates ?? []).map((template) => create(TemplateInstanceSchema, template)), + response.value.value.remove ?? false + ) + } + if (response.value?.case === 'result') { + subscription.unsubscribe() + resolve(create(ProcessResultSchema, response.value.value)) + } + }, + error: (e) => { + subscription.unsubscribe() + reject(e) + } + }) + + this.service + .handleRequest( + create(ProcessStreamRequestSchema, { + processId, + value: { + case: 'binding', + value: request + } + }), + undefined, + subject + ) + .catch((e) => { + subscription.unsubscribe() + reject(e) + }) + }) } - preprocessBindingsStream(requests: AsyncIterable, context: HandlerContext): never { - throw new Error('Method not implemented.') + processBindingsStream(requests: AsyncIterable, context: HandlerContext) { + return this.service.processBindingsStream(requests, context) } // processBindingsStream(request: AsyncIterable, context: HandlerContext) { @@ -172,17 +220,32 @@ export class TestProcessorServer { } } -class TestStoreContext extends StoreContext implements IDataBindingContext { +class TestStoreContext extends DataBindingContext implements IDataBindingContext { constructor( - readonly subject: Subject, - processId: number + subject: Subject, + processId: number, + private readonly service: ProcessorServiceImplV3 ) { - super(subject, processId) + super(processId, subject) } templateInstances: TemplateInstance[] = [] - sendTemplateRequest(templates: Array, remove: boolean): void { + result(dbResult: DBResponse, processId = this.processId): void { + void this.service.handleRequest( + create(ProcessStreamRequestSchema, { + processId, + value: { + case: 'dbResult', + value: dbResult + } + }), + undefined, + this.subject + ) + } + + applyTemplateRequest(templates: Array, remove: boolean): void { if (remove) { this.templateInstances = this.templateInstances.filter( (i) => !templates.find((t) => t.templateId === i.templateId && t.contract?.address === i.contract?.address) @@ -191,7 +254,12 @@ class TestStoreContext extends StoreContext implements IDataBindingContext { this.templateInstances.push(...templates) } } + + sendTemplateRequest(templates: Array, remove: boolean): void { + this.applyTemplateRequest(templates, remove) + } + sendTimeseriesRequest(timeseries: Array): void { - throw new Error('Method not implemented.') + // Test helpers currently expose metric/event/export results through ProcessResult. } }