monSQLize v1.1.0 引入了 Saga 分布式事务模式,用于协调跨服务的事务操作。
Saga 是一种分布式事务模式,通过将长事务分解为多个本地事务,每个本地事务都有对应的补偿操作。当某个步骤失败时,通过逆序执行补偿操作来撤销已完成的步骤。
- ✅ 跨服务事务协调:协调多个服务的操作
- ✅ 自动补偿机制:失败时自动逆序执行补偿
- ✅ 无时间限制:突破 MongoDB 60秒事务限制
- ✅ Redis 分布式支持:多进程环境下共享 Saga 定义
- ✅ 详细日志:完整的执行和补偿日志
-
跨服务事务
A 服务(订单) → B 服务(库存) → C 服务(支付) -
第三方 API 集成
创建订单 → Stripe 扣款 → 发送邮件 -
长时间流程
超过 60 秒的复杂业务流程
- 单服务单库操作 → 使用
withTransaction - 外部 API 只读取 → 使用
withTransaction
Saga 功能已内置在 monSQLize v1.1.0+,无需额外安装。
npm install monsqlize@^1.1.0const MonSQLize = require('monsqlize');
// 初始化
const msq = new MonSQLize({
type: 'mongodb',
config: { uri: 'mongodb://localhost:27017/mydb' }
});
await msq.connect();
// 定义 Saga
msq.defineSaga({
name: 'create-order-with-payment',
steps: [
{
name: 'create-order',
execute: async (ctx) => {
// 创建订单
const order = await createOrder(ctx.data);
ctx.set('orderId', order.id);
return order;
},
compensate: async (ctx) => {
// 取消订单
const orderId = ctx.get('orderId');
await cancelOrder(orderId);
}
},
{
name: 'charge-payment',
execute: async (ctx) => {
// 扣款
const charge = await stripe.charges.create({
amount: ctx.data.amount,
source: ctx.data.paymentToken
});
ctx.set('chargeId', charge.id);
return charge;
},
compensate: async (ctx) => {
// 退款
const chargeId = ctx.get('chargeId');
await stripe.refunds.create({ charge: chargeId });
}
}
]
});
// 执行 Saga
try {
const result = await msq.executeSaga('create-order-with-payment', {
userId: 'user123',
amount: 9900,
paymentToken: 'tok_visa'
});
console.log('订单创建成功:', result.sagaId);
} catch (error) {
console.error('订单创建失败:', error.message);
}定义一个 Saga。
参数:
config.name(string): Saga 名称,全局唯一config.steps(Array): 步骤列表
步骤配置:
{
name: 'step-name', // 步骤名称
execute: async (ctx) => { }, // 执行函数
compensate: async (ctx, result) => { } // 补偿函数(可选)
}返回值:SagaDefinition 实例
示例:
msq.defineSaga({
name: 'my-saga',
steps: [
{
name: 'step1',
execute: async (ctx) => {
// 正向操作
return { success: true };
},
compensate: async (ctx, result) => {
// 补偿操作
}
}
]
});执行 Saga。
参数:
name(string): Saga 名称data(Object): 执行数据,可通过ctx.data访问
返回值:Promise
成功返回:
{
success: true,
sagaId: 'saga_xxx',
sagaName: 'my-saga',
completedSteps: 3,
duration: 123 // 毫秒
}失败返回:
{
success: false,
sagaId: 'saga_xxx',
sagaName: 'my-saga',
completedSteps: 2,
failedStep: 2,
error: 'Error message',
duration: 123,
compensation: {
success: true,
results: [...]
}
}列出所有已定义的 Saga。
返回值:Promise<string[]>
const sagas = await msq.listSagas();
console.log('已定义的 Saga:', sagas);
// ['create-order', 'update-inventory', ...]获取 Saga 统计信息。
返回值:Object
const stats = msq.getSagaStats();
console.log(stats);
// {
// totalExecutions: 100,
// successfulExecutions: 95,
// failedExecutions: 5,
// compensatedExecutions: 5,
// successRate: '95.00%',
// storageMode: 'Redis' // 或 '内存'
// }在 execute 和 compensate 函数中可以访问 SagaContext。
获取执行数据(只读)。
execute: async (ctx) => {
const userId = ctx.data.userId;
const amount = ctx.data.amount;
}保存自定义数据,用于步骤间传递。支持任何类型的值(字符串、对象、数组、数字等)。
execute: async (ctx) => {
// ✅ 字符串
ctx.set('orderId', 'ORDER123');
// ✅ 对象
ctx.set('orderData', {
orderId: 'ORDER123',
amount: 9900,
items: [{ sku: 'SKU001', quantity: 2 }]
});
// ✅ 数组
ctx.set('itemIds', ['id1', 'id2', 'id3']);
// ✅ 数字
ctx.set('totalAmount', 9900);
}获取自定义数据。返回值类型与存入时一致。
compensate: async (ctx) => {
// 获取字符串
const orderId = ctx.get('orderId');
await cancelOrder(orderId);
// 获取对象
const orderData = ctx.get('orderData');
console.log('订单金额:', orderData.amount);
// 获取数组
const itemIds = ctx.get('itemIds');
for (const id of itemIds) {
await releaseItem(id);
}
}
await cancelOrder(orderId);
}获取 Saga 唯一标识。
execute: async (ctx) => {
console.log('Saga ID:', ctx.sagaId);
}每个有副作用的步骤都应该有补偿操作。
// ✅ 推荐
{
name: 'create-order',
execute: async (ctx) => {
const order = await createOrder(ctx.data);
ctx.set('orderId', order.id);
return order;
},
compensate: async (ctx) => {
const orderId = ctx.get('orderId');
await cancelOrder(orderId);
}
}
// ❌ 不推荐:有副作用但没有补偿
{
name: 'create-order',
execute: async (ctx) => {
return await createOrder(ctx.data);
}
// 没有 compensate
}补偿操作需要的信息应该在 execute 中保存。支持保存完整对象,简化代码。
// ✅ 推荐:保存完整对象
execute: async (ctx) => {
const result = await createOrder(ctx.data);
// 保存完整的订单对象,补偿时可以访问所有信息
ctx.set('order', result);
return result;
},
compensate: async (ctx) => {
// 使用保存的完整对象
const order = ctx.get('order');
await cancelOrder(order.id, {
reason: 'saga-compensation',
amount: order.amount,
items: order.items
});
}
// ✅ 也可以:保存单个字段
execute: async (ctx) => {
const result = await doSomething();
ctx.set('resourceId', result.id);
ctx.set('amount', result.amount);
return result;
},
compensate: async (ctx) => {
const resourceId = ctx.get('resourceId');
const amount = ctx.get('amount');
await revertOperation(resourceId, amount);
}补偿操作应该是幂等的(可重复执行)。
compensate: async (ctx) => {
const orderId = ctx.get('orderId');
// ✅ 推荐:检查状态
const order = await getOrder(orderId);
if (order.status !== 'cancelled') {
await cancelOrder(orderId);
}
// ❌ 不推荐:直接执行
// await cancelOrder(orderId); // 可能重复取消
}补偿失败应该记录详细日志。
compensate: async (ctx) => {
try {
await doCompensation();
} catch (error) {
// Saga 会自动记录错误
// 但你可以添加额外的日志
console.error('[Compensation Error]', {
sagaId: ctx.sagaId,
error: error.message
});
throw error; // 重新抛出
}
}单进程模式(默认):
const msq = new MonSQLize({
type: 'mongodb',
config: { uri: '...' },
cache: false // 或不配置 cache
});Redis 模式(多进程):
const { createRedisCacheAdapter } = require('monsqlize/lib/redis-cache-adapter');
const msq = new MonSQLize({
type: 'mongodb',
config: { uri: '...' },
cache: createRedisCacheAdapter('redis://localhost:6379')
});
// ⚠️ 重要:每个进程启动时都需要调用 defineSaga()
msq.defineSaga({
name: 'my-saga',
steps: [...]
});| 维度 | withTransaction | Saga |
|---|---|---|
| 适用场景 | 单库操作 | 跨服务操作 |
| 回滚方式 | MongoDB 自动回滚 | 手动补偿 |
| 时间限制 | 60秒 | 无限制 |
| 外部 API | 不支持回滚 | 支持补偿 |
补偿失败会被记录在返回结果中:
const result = await msq.executeSaga('my-saga', data);
if (!result.success && !result.compensation.success) {
console.error('补偿失败,需要人工介入');
console.error('失败的步骤:', result.compensation.results);
}启用日志:
const msq = new MonSQLize({
type: 'mongodb',
config: { uri: '...' },
logger: { level: 'debug' } // 启用详细日志
});可以,但不推荐。建议将复杂流程拆分为多个独立的 Saga。
使用模拟服务:
// 模拟外部服务
const mockOrderService = {
create: async (data) => ({ orderId: 'TEST_ORDER' }),
cancel: async (orderId) => ({ cancelled: true })
};
// 定义测试 Saga
msq.defineSaga({
name: 'test-saga',
steps: [
{
name: 'create-order',
execute: async (ctx) => {
return await mockOrderService.create(ctx.data);
},
compensate: async (ctx) => {
const orderId = ctx.get('orderId');
await mockOrderService.cancel(orderId);
}
}
]
});
// 执行测试
const result = await msq.executeSaga('test-saga', {});
assert(result.success === true);完整示例请参考:
- v1.1.0 (2026-01-17): 首次发布 Saga 功能
文档更新时间: 2026-01-17
版本: v1.1.0