Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- CreateTable
CREATE TABLE "Property" (
"key" TEXT NOT NULL,
"value" TEXT NOT NULL,

CONSTRAINT "Property_pkey" PRIMARY KEY ("key")
);
15 changes: 10 additions & 5 deletions packages/persistance/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ model Block {
toMessagesHash String
fromStateRoot String

beforeBlockStateTransitions Json @db.Json
createdAt DateTime @default(now())
beforeBlockStateTransitions Json @db.Json
createdAt DateTime @default(now())

parentHash String? @unique
parent Block? @relation("Parent", fields: [parentHash], references: [hash])
Expand All @@ -124,7 +124,7 @@ model Block {
model Batch {
height Int @id

proof Json @db.Json
proof Json @db.Json
createdAt DateTime @default(now())

blocks Block[]
Expand All @@ -148,9 +148,9 @@ model BlockResult {

model Settlement {
// transaction String
transactionHash String @id
transactionHash String @id
promisedMessagesHash String
createdAt DateTime @default(now())
createdAt DateTime @default(now())

batches Batch[]
}
Expand All @@ -173,3 +173,8 @@ model IncomingMessageBatch {

messages IncomingMessageBatchTransaction[]
}

model Property {
key String @id
value String
}
4 changes: 4 additions & 0 deletions packages/persistance/src/PrismaDatabaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { PrismaSettlementStorage } from "./services/prisma/PrismaSettlementStora
import { PrismaMessageStorage } from "./services/prisma/PrismaMessageStorage";
import { PrismaTransactionStorage } from "./services/prisma/PrismaTransactionStorage";
import { PrismaLinkedLeafStore } from "./services/prisma/PrismaLinkedLeafStore";
import { PrismaPropertyStorage } from "./services/prisma/PrismaPropertyStorage";

export interface PrismaDatabaseConfig {
// Either object-based config or connection string
Expand Down Expand Up @@ -95,6 +96,9 @@ export class PrismaDatabaseConnection
transactionStorage: {
useClass: PrismaTransactionStorage,
},
propertyStorage: {
useClass: PrismaPropertyStorage,
},

asyncLinkedLeafStore: {
useGenerated: (module) => {
Expand Down
1 change: 1 addition & 0 deletions packages/persistance/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export * from "./services/prisma/PrismaBatchStore";
export * from "./services/prisma/PrismaSettlementStorage";
export * from "./services/prisma/PrismaMessageStorage";
export * from "./services/prisma/PrismaTransactionStorage";
export * from "./services/prisma/PrismaPropertyStorage";
export * from "./services/prisma/mappers/BatchMapper";
export * from "./services/prisma/mappers/BlockMapper";
export * from "./services/prisma/mappers/FieldMapper";
Expand Down
27 changes: 21 additions & 6 deletions packages/persistance/src/services/prisma/PrismaBatchStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,7 @@ export class PrismaBatchStore implements BatchStorage {
height: Prisma.SortOrder.desc,
},
include: {
blocks: {
select: {
hash: true,
},
},
blocks: {},
},
take: 1,
});
Expand All @@ -84,7 +80,26 @@ export class PrismaBatchStore implements BatchStorage {
}
return this.batchMapper.mapIn([
batch,
batch.blocks.map((block) => block.hash),
batch.blocks.map(({ hash }) => hash),
]);
}

public async getUnsettledBatches(): Promise<Batch[]> {
const batches = await this.connection.prismaClient.batch.findMany({
include: {
blocks: {
select: {
hash: true,
},
},
},
where: {
settlement: null,
},
});

return batches.map((batch) =>
this.batchMapper.mapIn([batch, batch.blocks.map(({ hash }) => hash)])
);
}
}
29 changes: 29 additions & 0 deletions packages/persistance/src/services/prisma/PrismaPropertyStorage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { PropertyStorage } from "@proto-kit/sequencer";
import { inject } from "tsyringe";

import type { PrismaConnection } from "../../PrismaDatabaseConnection";

export class PrismaPropertyStorage implements PropertyStorage {
public constructor(
@inject("Database") private readonly connection: PrismaConnection
) {}

public async get(key: string): Promise<string | undefined> {
const record = await this.connection.prismaClient.property.findFirst({
where: {
key,
},
});

return record?.value;
}

public async set(key: string, value: string): Promise<void> {
await this.connection.prismaClient.property.create({
data: {
key,
value,
},
});
}
}
2 changes: 2 additions & 0 deletions packages/sequencer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,15 @@ export * from "./storage/repositories/BlockStorage";
export * from "./storage/repositories/SettlementStorage";
export * from "./storage/repositories/MessageStorage";
export * from "./storage/repositories/TransactionStorage";
export * from "./storage/repositories/PropertyStorage";
export * from "./storage/inmemory/InMemoryDatabase";
export * from "./storage/inmemory/InMemoryAsyncMerkleTreeStore";
export * from "./storage/inmemory/InMemoryBlockStorage";
export * from "./storage/inmemory/InMemoryBatchStorage";
export * from "./storage/inmemory/InMemorySettlementStorage";
export * from "./storage/inmemory/InMemoryMessageStorage";
export * from "./storage/inmemory/InMemoryTransactionStorage";
export * from "./storage/inmemory/InMemoryPropertyStorage";
export * from "./storage/StorageDependencyFactory";
export * from "./storage/Database";
export * from "./storage/Prunable";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import {
SequencerModule,
} from "../../sequencer/builder/SequencerModule";
import { BatchStorage } from "../../storage/repositories/BatchStorage";
import { SettleableBatch } from "../../storage/model/Batch";
import { Batch, SettleableBatch } from "../../storage/model/Batch";
import { BlockWithResult } from "../../storage/model/Block";
import type { Database } from "../../storage/Database";
import { AsyncLinkedLeafStore } from "../../state/async/AsyncLinkedLeafStore";
import { CachedLinkedLeafStore } from "../../state/lmt/CachedLinkedLeafStore";
import { ensureNotBusy } from "../../helpers/BusyGuard";
import { AsyncMerkleTreeStore } from "../../state/async/AsyncMerkleTreeStore";
import { BlockStorage } from "../../storage/repositories/BlockStorage";

import { BlockProofSerializer } from "./tasks/serializers/BlockProofSerializer";
import { BatchTracingService } from "./tracing/BatchTracingService";
Expand Down Expand Up @@ -52,6 +53,7 @@ export class BatchProducerModule extends SequencerModule {
@inject("AsyncTreeStore")
private readonly merkleStore: AsyncMerkleTreeStore,
@inject("BatchStorage") private readonly batchStorage: BatchStorage,
@inject("BlockStorage") private readonly blockStorage: BlockStorage,
@inject("Database")
private readonly database: Database,
private readonly batchFlow: BatchFlow,
Expand Down Expand Up @@ -97,6 +99,30 @@ export class BatchProducerModule extends SequencerModule {
return batchWithStateDiff?.batch;
}

public async getSettleableBatches() {
return await this.batchStorage.getUnsettledBatches();
}

public async recoverSettleableBatch(batch: Batch) {
const firstBlock = await this.blockStorage.getBlock(batch.blockHashes[0]);
const lastBlock = await this.blockStorage.getBlock(
batch.blockHashes.at(-1)!
);

if (firstBlock === undefined || lastBlock === undefined) {
throw new Error("First or last block not found");
}
const lastBlockResult = await this.blockStorage.getBlockWithResultAt(
parseInt(lastBlock.height.toString(), 10)
);

return {
...batch,
fromNetworkState: firstBlock.networkState.before,
toNetworkState: lastBlockResult!.result.afterNetworkState,
};
}

public async start(): Promise<void> {
noop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ export class ReductionTaskFlow<Input, Output> {
private readonly options: {
name: string;
inputLength: number;
mappingTask: Task<Input, Output>;
mappingTask?: Task<Input, Output>;
reductionTask: Task<PairTuple<Output>, Output>;
mergableFunction: (a: Output, b: Output) => boolean;
mappingFunction?: (input: Input) => Promise<Output>;
},
flowCreator: FlowCreator
) {
Expand Down Expand Up @@ -202,18 +203,27 @@ export class ReductionTaskFlow<Input, Output> {
});
}

private async handleMappingResult(result: Output) {
if (this.options.inputLength === 1) {
this.flow.resolve(result);
} else {
this.flow.state.queue.push(result);
await this.resolveReduction();
}
}

public async pushInput(input: Input) {
await this.flow.pushTask(
this.options.mappingTask,
input,
async (result) => {
if (this.options.inputLength === 1) {
this.flow.resolve(result);
} else {
this.flow.state.queue.push(result);
await this.resolveReduction();
if (this.options.mappingTask !== undefined) {
await this.flow.pushTask(
this.options.mappingTask,
input,
async (result) => {
await this.handleMappingResult(result);
}
}
);
);
} else if (this.options.mappingFunction !== undefined) {
const result = await this.options.mappingFunction(input);
await this.handleMappingResult(result);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export class BlockTriggerBase<
}

protected async settle(
batch: SettleableBatch,
batches: SettleableBatch[],
config: SettlementTokenConfig
// nonce?: number
) {
Expand All @@ -103,10 +103,10 @@ export class BlockTriggerBase<
);
return undefined;
}
const settlement = await this.settlementModule.settleBatch(batch);
const settlement = await this.settlementModule.settleBatch(batches);

const txs = await this.bridgingModule?.sendRollupTransactions(
[batch],
batches,
config
// TODO nonce override
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,17 @@ export class ManualBlockTrigger
return await super.produceBatch();
}

public async settle(batch: SettleableBatch, config: SettlementTokenConfig) {
return await super.settle(batch, config);
public async settle(
batches: SettleableBatch | SettleableBatch[],
config: SettlementTokenConfig
) {
let batchArray: SettleableBatch[];
if (Array.isArray(batches)) {
batchArray = batches;
} else {
batchArray = [batches];
}
return await super.settle(batchArray, config);
}

public async produceBlock(): Promise<Block | undefined> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { inject, injectable } from "tsyringe";
import { dependencyFactory, log } from "@proto-kit/common";
import { dependencyFactory, log, mapSequential } from "@proto-kit/common";

import { closeable, Closeable } from "../../../sequencer/builder/Closeable";
import { BatchProducerModule } from "../BatchProducerModule";
Expand All @@ -15,6 +15,7 @@ import { ensureNotBusy } from "../../../helpers/BusyGuard";
import { SequencerStartupModule } from "../../../sequencer/SequencerStartupModule";
import { BlockProductionInstrumentation } from "../../../metrics/BlockProductionInstrumentation";
import { SequencerCoreModule } from "../../../sequencer/SequencerCoreModule";
import { SettleableBatch } from "../../../storage/model/Batch";

import { BlockTriggerBase } from "./BlockTrigger";

Expand All @@ -35,6 +36,8 @@ export class TimedBlockTrigger
{
private intervals: NodeJS.Timeout[] = [];

private isFirstSettlement = true;

public constructor(
@inject("BatchProducerModule", { isOptional: true })
batchProducerModule: BatchProducerModule | undefined,
Expand Down Expand Up @@ -87,23 +90,15 @@ export class TimedBlockTrigger
}

const blockIntervalId = setInterval(async () => {
try {
// Trigger unproven blocks
await this.produceUnprovenBlock();
} catch (error) {
log.error(error);
}
// Trigger unproven blocks
await this.produceUnprovenBlock();
}, blockInterval);
this.intervals.push(blockIntervalId);

if (settlementInterval !== undefined) {
const settlementIntervalId = setInterval(async () => {
try {
// Trigger settlement
await this.tryProduceSettlement();
} catch (error) {
log.error(error);
}
// Trigger settlement
await this.tryProduceSettlement();
}, settlementInterval);
this.intervals.push(settlementIntervalId);
}
Expand All @@ -126,8 +121,24 @@ export class TimedBlockTrigger
@ensureNotBusy()
private async tryProduceSettlement(): Promise<void> {
const batch = await this.produceBatch();
if (batch !== undefined) {
await this.settle(batch, this.config.settlementTokenConfig);

let batches: SettleableBatch[] | undefined = undefined;
if (this.isFirstSettlement) {
const rawBatches = await this.batchProducerModule?.getSettleableBatches();
if (rawBatches !== undefined) {
batches = await mapSequential(
rawBatches,
async (rawBatch) =>
await this.batchProducerModule!.recoverSettleableBatch(rawBatch)
);
}
this.isFirstSettlement = false;
} else if (batch !== undefined) {
batches = [batch];
}

if (batches !== undefined) {
await this.settle(batches, this.config.settlementTokenConfig);
}
}

Expand Down
Loading
Loading