diff --git a/.github/skills/review/SKILL.md b/.github/skills/review/SKILL.md new file mode 100644 index 00000000..ca1d059f --- /dev/null +++ b/.github/skills/review/SKILL.md @@ -0,0 +1,409 @@ +--- +name: review +description: "Use when: reviewing Rust crates or services in spacegate, code review, auditing module quality, checking spacegate conventions, PR review, naming consistency, performance, security, distributed safety, test coverage, or documentation quality. 适用于 spacegate 仓库的完整代码审查:架构 → 命名 → 类型与错误处理 → 注释 → 性能 → 分布式 → 安全 → 日志 → 测试 → 文档。" +--- + +# review — 模块代码审查规范 + +> 面向 AI 助手的 spacegate 仓库代码审查指南。 +> +> spacegate 是一个基于 Rust + tokio + hyper + tower 的高性能 API 网关,采用 workspace 多 crate 组织方式,核心模块为 `spacegate-kernel`,插件系统为 `spacegate-plugin`,配置系统为 `spacegate-config`,shell 层负责组装运行时。 + +--- + +## §1 审查准则 + +1. **先定位范围再审查**:优先确认本次审查对象是单个 crate、单个模块还是一次跨 crate 变更,避免把整个 workspace 一锅端。 +2. **先搜索再改动**:先确认调用链、导出链、feature 开关、测试入口和配置入口,再下结论。 +3. **最小变更**:除非测试或证据表明存在缺陷,否则不主动改变业务语义、不顺手大改风格。 +4. **成套更新**:发现问题后,优先同步更新代码 / 测试 / 文档;如果暂不修复,要明确记录风险与原因。 +5. **遵循 spacegate 语境**:审查标准必须贴合项目实际约定——`BoxError` / `BoxResult`、`tracing` 日志体系、`tower::Layer` 中间件模式、`tokio` 异步运行时、`hyper` HTTP 类型、`#[deny(clippy::unwrap_used)]` 等 lint 要求。 +6. **两阶段完整执行**:必须依次执行 **Part 1(基础审查)** 和 **Part 2(交叉评审与模拟)**,最后输出变更汇总;禁止只做一半就收工。 + +### 审查总览 + +``` +Part 1 — 基础审查 + 逐维度(§2-§11)审查代码 → 发现问题即时修复或记录风险 → 输出审查清单 + ↓ 自动进入 +Part 2 — 交叉评审与模拟 + 4 角色独立评审 → 交叉质疑 → 生产事故模拟 → 修复遗漏问题 → 质量门禁 + ↓ +变更汇总 + 合并 Part 1 + Part 2 全部修改,按优先级输出清单 +``` + +--- + +> **审查维度(§2 – §11)** — 以下各节为审查检查项参考,由执行流程中的 Part 1 / Part 2 驱动使用。 + +--- + +## §2 架构与结构审查 + +- [ ] crate 入口职责清晰:`lib.rs` 负责导出、模块组织;`main.rs` 负责启动与参数解析,不承载大段业务逻辑 +- [ ] 模块分层清晰:`kernel`(核心路由/中间件)/ `plugin`(扩展插件)/ `config`(配置读写)/ `shell`(组装运行时)/ `model`(共享数据模型)职责边界明确 +- [ ] 依赖方向合理:`shell` → `kernel` / `plugin` / `config`,`plugin` / `config` 依赖 `kernel`,`kernel` 不反向依赖上层 crate +- [ ] tower Layer / Service 实现符合 spacegate 插件模式:`PluginBuilder` → `PluginInstance` → `tower::Layer` +- [ ] 公共导出最小化:`pub`、`pub(crate)`、re-export 使用克制,不把内部实现意外暴露成公共 API +- [ ] feature 边界明确:`#[cfg(feature = ...)]` 只包裹必要逻辑,不制造默认构建与全特性构建行为漂移 +- [ ] workspace / crate 结构与 spacegate 实际目录约定一致(`crates/`、`binary/`、`examples/`),不引入与仓库风格冲突的新层级 +- [ ] lint 配置遵守:`#[deny(clippy::unwrap_used, clippy::dbg_macro, clippy::unimplemented, clippy::todo)]` 已在 crate 入口声明 + +--- + +## §3 命名与可读性审查 + +- [ ] **命名三问**通过:看名字知职责?不会混淆?6 个月后仍理解? +- [ ] crate 名、模块名、文件名、核心结构体名语义一致(如 `PluginBuilder`、`SgBody`、`SgRequest`、`CancellationToken`) +- [ ] 无含糊命名(如 `data` / `info` / `handler` / `process` / `tmp`),特别是层(Layer)、服务(Service)、插件(Plugin)类型 +- [ ] 常量命名能体现作用域(如插件名称、route key、header key) +- [ ] 长函数可读:重要分支拆成私有函数,避免深层 `match` / `if` 套娃 +- [ ] 返回路径清晰,不在 `return` / `Ok(...)` / `Err(...)` 中塞入过度复杂逻辑 +- [ ] `use` 组织稳定,外部依赖、本 crate 模块、trait 引入分组合理 + +--- + +## §4 类型与错误处理审查 + +- [ ] 对外 API 优先使用仓库既有模式:`BoxResult`(即 `Result`)或 `PluginError`(插件层) +- [ ] 禁止在库代码中使用 `unwrap()` / `expect()` / `panic!`(crate 已 `#[deny(clippy::unwrap_used)]`);测试和有足够证明安全的启动路径除外 +- [ ] 禁止使用 `todo!()` / `unimplemented!()` 进入生产路径(crate 已 `#[deny(...)]`) +- [ ] 错误向上透传时保留上下文,不吞错、不把底层失败 silently 变成成功 +- [ ] `?`、`map_err`、`From`/`Into` 转换使用合理,错误语义不被扭曲 +- [ ] `tower::Service` / `tower::Layer` 的关联类型 `Error` 使用正确,避免与 `Infallible` 混用造成类型误导 +- [ ] 并发原语(`Arc>>`、`OnceLock`、`CancellationToken`)和 `'static` 生命周期使用清晰 + +--- + +## §5 注释与文档注释审查 + +- [ ] 公共导出 API 有注释 +- [ ] crate 级文档、关键模块文档与实际能力一致 +- [ ] 关键类型 / trait / 初始化入口有一句话说明职责 +- [ ] 示例只在真正能帮助使用者时补充,避免为了凑格式编造伪示例 +- [ ] 代码注释中文,日志消息英文 +- [ ] 注释解释“为什么”,而不是机械复述“代码正在做什么” + +--- + +## §6 性能审查 + +### 时间复杂度 + +- [ ] **串行 await**:循环中的异步调用是否必须串行;能批量化时避免无意义串行等待 +- [ ] **重复计算**:热路径中是否存在可复用但未缓存的计算、配置解析或实例构造 +- [ ] **不必要的序列化**:是否频繁做 `serde_json` 编解码、字符串拷贝、`to_string()` / `clone()` 链式开销 + +### 空间复杂度 + +- [ ] **内存增长**:全局 `HashMap` / `Vec` / 缓存是否可能无限增长,是否有淘汰策略或生命周期边界 +- [ ] **大数据加载**:查询、聚合、导出是否一次性把大结果集全部载入内存 +- [ ] **对象复制**:是否有不必要的 `clone()`、`to_owned()`、大对象搬运 + +### I/O 性能 + +- [ ] **阻塞调用**:异步上下文中是否执行阻塞 I/O、长时间 CPU 计算或 `std::thread::sleep` +- [ ] **连接复用**:DB / HTTP / MQ / WebSocket / client 实例是否复用,避免每次请求临时初始化 +- [ ] **资源释放**:连接、任务、订阅、文件句柄、后台线程在关闭或异常路径能否释放 + +### 代码结构 + +- [ ] **热路径对象创建**:`TardisFunsInst`、client、provider 等是否被重复创建而不是复用 +- [ ] **锁使用正确**:避免在持锁状态下 `.await`,避免大粒度锁包裹慢操作 +- [ ] **复杂度可维护**:超长函数、超深嵌套、重复分支需要拆分或抽象 + +--- + +## §7 分布式审查 + +### 状态一致性 + +- [ ] 是否用模块级 Map/Set 缓存了需跨节点一致的业务数据? + - 必须持久化到 DB 的:任务注册表、分布式锁、业务配置、加密密钥 + - 可以进程内缓存的:SDK client 实例、不可变配置、连接池 + +### 并发安全 + +- [ ] 初始化逻辑是否会被并发重复执行,是否需要 `OnceLock`、幂等检查或显式串行化保护 +- [ ] 共享可变状态是否有竞态风险(check-then-act、先读后写、重复注册、重复提交) +- [ ] 锁粒度是否合理,是否存在死锁、活锁或锁竞争热点 + +### 幂等性与重试 + +- [ ] 写操作接口是否幂等(重复调用结果一致) +- [ ] 网络失败后重试是否会导致重复数据 +- [ ] 写入中途失败时事务是否正确回滚 + +### 定时器与资源 + +- [ ] 后台任务、定时轮询、spawn 出来的线程 / task 是否具备退出条件 +- [ ] 订阅、监听器、连接在异常路径与重复初始化路径都能释放 +- [ ] 多节点部署时是否错误依赖本地内存状态作为唯一真相来源 + +--- + +## §8 安全审查 + +### 注入攻击 + +- [ ] **SQL 注入**:原生 SQL 是否参数化,ORM / 查询构造器是否避免字符串拼接输入 +- [ ] **命令注入**:脚本调用、shell 命令、插件执行是否拼接未校验输入 +- [ ] **路径遍历**:文件读写、对象存储、本地缓存路径是否允许 `../` 或绝对路径逃逸 +- [ ] **反序列化 / 输入污染**:外部输入、配置、HTTP 参数、MQ 负载是否有边界校验 + +### 认证与授权 + +- [ ] 管理接口、内部接口、初始化接口是否有明确权限边界 +- [ ] 租户、组织、空间、上下文隔离是否完整,避免串租户 / 越权访问 +- [ ] 认证信息、证书、token 的使用与刷新逻辑是否安全 + +### 敏感信息 + +- [ ] 无硬编码密钥(API Key、密码、token) +- [ ] 密码/token/apiKey/用户隐私字段未出现在日志中 +- [ ] 连接串、证书、账号、第三方密钥输出前已脱敏 +- [ ] 默认配置、示例配置、测试夹具中未泄露真实凭据 + +### HTTP 安全 + +- [ ] 外部 URL / 回调地址 / webhook 地址经过校验,避免 SSRF +- [ ] 文件上传、下载、转发接口有类型与大小限制 +- [ ] 安全响应头、跨域策略、回源策略符合该 service 的暴露方式 + +### 数据安全 + +- [ ] 密码或敏感凭据使用安全算法处理,避免自造加密或弱哈希 +- [ ] 随机数、ID、token 来源安全,避免可预测值 +- [ ] 配置、DTO、外部入参在 API 边界完成校验,不把脏数据一路带到存储层 + +### 依赖安全 + +- [ ] 新增依赖有必要性,且不会明显增加供应链风险 +- [ ] 对 unsafe、FFI、脚本桥接或 wasm 边界保持额外审慎 + +--- + +## §9 日志与可观测性审查 + +- [ ] 日志统一使用 `tracing` 宏(`tracing::info!` / `tracing::warn!` / `tracing::error!` / `tracing::debug!`),禁止 `println!` / `eprintln!`(CLI / wasm 边界等特例除外) +- [ ] 禁止 `dbg!`(crate 已 `#[deny(clippy::dbg_macro)]`) +- [ ] 日志级别符合语义:启动与生命周期用 info/error,异常细节用 debug/warn,避免刷屏式 info +- [ ] 关键路径有足够上下文(模块、对象标识、操作阶段),但不泄露敏感信息(请求 body、凭据、token) +- [ ] 重要错误可定位:失败日志至少能看出发生位置、动作和失败原因 +- [ ] tracing span 在长链路(async task、插件调用、后端转发)中正确传播 +- [ ] 日志消息英文,代码注释中文 + +--- + +## §10 测试审查 + +- [ ] 测试入口符合 Rust crate 习惯:单元测试放模块内,集成测试放 `tests/`,跨 crate 测试尽量走公开 API +- [ ] 正常路径 + 边界路径 + 参数选项均有覆盖 +- [ ] 错误路径有断言,必要时断言错误码、错误类别或关键消息 +- [ ] 多实现 / 多 feature / 多 provider 场景有共同测试约束 +- [ ] 测试名称清晰表达场景 +- [ ] 无不稳定断言(随机性、时序依赖、外部服务强耦合) +- [ ] 涉及数据库、缓存、消息系统时,测试资源初始化与清理明确 + +--- + +## §11 文档审查 + +### README + +- [ ] README 至少说明用途、主要能力、关键配置、运行/测试方式与限制条件 +- [ ] README 无完整类型定义、无内部实现细节 +- [ ] 任何 API、feature、配置项、插件、服务启动方式变更已同步 README / 测试 / 示例 + +### 仓库配套文档 + +- [ ] `Cargo.toml`、示例配置(`examples/`、`resource/install/default-config/`)与实现保持一致 +- [ ] 新增 feature、环境变量、启动参数、对外接口时有对应说明(`docs/` 目录下相关文档) +- [ ] 如果变更影响多个 crate,需检查调用方文档是否需要同步 +- [ ] k8s 相关变更需同步 `docs/k8s/` 及 `resource/kube-manifests/` 中的配置 + +--- + +> **执行流程** — 以下为审查的执行过程,依次完成 Part 1 → Part 2 → 变更汇总。 + +--- + +## Part 1 — 基础审查 + +> 按 §2-§11 逐维度审查代码,发现问题**即时修复或明确记录原因**,完成后输出审查清单。 + +### 执行步骤 + +1. **界定范围**:先说明审查对象、crate 边界、feature、运行方式和风险面。 +2. **读取代码**:按 §2-§11 维度逐一审查源码、测试、配置与工作流,必要时确认引用关系。 +3. **即时修复**:发现明确缺陷时,优先做小而可验证的修改;若暂不修复,必须记录风险等级和原因。 +4. **输出清单**:每个维度输出 checkbox 结果表,标记 ✅ 合规 / ❌ 不合规(已修复)/ ⚠️ 需注意 / ⏸ 暂未处理。 + +### 维度对照 + +| 维度 | 章节 | 审查要点 | +|------|------|---------| +| 架构与结构 | §2 | crate 入口职责、模块分层、依赖方向、tower Layer 模式、lint 配置 | +| 命名与可读性 | §3 | 命名三问、一致性、函数拆分、use 组织 | +| 类型与错误处理 | §4 | `BoxResult` / `PluginError`、禁止 panic / todo / unwrap、错误上下文 | +| 注释 | §5 | crate 文档、公共 API 注释、中英文规范 | +| 性能 | §6 | 串行 await、阻塞调用、clone 开销、资源释放 | +| 分布式 | §7 | 状态一致性、并发安全、幂等性、CancellationToken 清理 | +| 安全 | §8 | 注入、权限、敏感信息、配置与输入边界 | +| 日志与可观测性 | §9 | `tracing` 宏、禁止 `dbg!` / `println!`、日志级别、上下文、脱敏 | +| 测试 | §10 | 覆盖度、断言风格、集成测试稳定性 | +| 文档 | §11 | README、Cargo 配置、示例与实现一致性 | + +### Part 1 完成标志 + +- 所有 §2-§11 维度均已审查 +- P0/P1 问题已修复或被明确标记为阻塞项 +- 已输出各维度 checkbox 清单 + +> Part 1 完成后,**自动进入 Part 2**。 + +--- + +## Part 2 — 交叉评审与模拟 + +> 以 4 个专家角色重新审视代码,交叉辩论发现深层问题,并进行生产事故模拟。修复 Part 1 遗漏的问题。 + +### 阶段 1:4 角色独立评审 + +| 角色 | 关注领域 | +|------|---------| +| **架构师** | 模块结构、分层、依赖方向、扩展性(§2) | +| **性能专家** | 时间/空间复杂度、I/O、并发、分布式状态(§6-§7) | +| **安全专家** | 注入、权限绕过、敏感信息、输入校验(§8) | +| **代码审查专家** | 命名、类型、注释、测试、文档合规(§3-§5, §9-§11) | + +每个角色给出 **高价值发现**: + +- 优先列出 P0/P1 问题;没有问题时可明确写“未发现明显高风险问题” +- 结论尽量指向具体文件、函数或代码片段 +- 不强行凑数量,禁止为了满足配额制造伪问题 + +### 阶段 2:交叉辩论 + +至少进行 1 轮交叉质疑: + +- 对存在争议的风险点补充证据 +- 区分“真实缺陷”“风格偏好”“未来优化建议” +- 若前一角色判断过重或过轻,必须修正优先级 + +### 阶段 3:生产事故模拟 + +分析代码在以下场景的行为: + +| 场景 | 分析要点 | +|------|---------| +| **高并发** | 多请求同时 init / 写入 / 关闭 — 竞态?状态一致? | +| **恶意输入** | 超长字符串、特殊字符、类型不匹配 — 校验完整? | +| **依赖不可用** | DB 断连、Redis 超时、API 限流 — 优雅降级?错误准确? | +| **资源耗尽** | 内存不足、连接池打满、task 积压 — 有背压?能恢复? | +| **数据损坏** | 写入中途失败 — 事务回滚正确? | +| **配置错误** | 错误类型、缺必填、值越界、feature 组合异常 — 校验完整? | +| **多节点部署** | 模块级缓存是否业务数据?各节点状态一致?DB 为唯一数据源? | + +### 阶段 4:问题汇总 + +合并 4 角色评审 + 事故模拟发现的问题,与 Part 1 已修复项去重,按 P0/P1/P2/P3 排序。 + +### 阶段 5:修复遗漏 + +针对 Part 2 新发现的 P0/P1 问题: + +1. 输出修复方案 +2. 实施代码 / 测试 / 文档成套修改 +3. 能补测试的必须补测试;若确实无法自动化覆盖,必须说明原因和替代验证方式 + +### 阶段 6:二次审查 + +确认 P0/P1 已清零且未引入新问题。 + +### 阶段 7:质量门禁 + +优先使用与改动范围匹配的 Rust 校验链: + +1. `cargo fmt --all -- --check` 或针对目标 package 的 `cargo fmt -p -- --check` +2. `cargo clippy --all-features` 或针对目标 package / feature 的 clippy 校验(注意仓库 `clippy.toml` 自定义配置) +3. `cargo test` 或针对目标 package / integration tests 的测试(`cargo test -p `) + +必要时补充: + +- `cargo check --all-features` +- 对应 crate 的 `Cargo.toml` feature 组合检查 +- 涉及 k8s 配置或 docker 镜像变更时,检查 `resource/` 下的配置文件 +- 仅文档改动时,至少保证内容与仓库事实一致 + +P0 或 P1 未清零 → 回到阶段 5。 + +### 阶段 8:文档完善 + +- 参考 §11 要求,更新 README、`Cargo.toml` 相关说明、配置示例、工作流说明等与实际实现保持一致。 + +--- + +## 变更汇总与输出格式 + +> Part 1 + Part 2 全部完成后,输出本次审查的合并变更清单。 + +### 问题优先级 + +| 等级 | 含义 | 处理 | +|------|------|------| +| **P0** | 数据损坏 / 安全漏洞 / 服务崩溃 / 连接泄漏 / 死锁 / OOM | 必须立即修复 | +| **P1** | 内存泄漏 / 竞态 / 权限绕过 / 错误吞没 / 非幂等写入 | 本轮修复 | +| **P2** | 性能瓶颈 / 可读性 / 冗余 / 缺少缓存 | 建议修复 | +| **P3** | 风格 / 命名 / 注释 / 结构优化 | 顺手修复 | + +### 单项问题格式 + +``` +### {等级} 问题标题 +- 位置:`src/...` 或 `tests/...` 或 `Cargo.toml` 的具体位置 +- 问题:具体描述 +- 风险:后果 / 影响 +- 修复:实际执行的改动 / 保留的建议(P2/P3 未修复时说明原因) +``` + +### 变更汇总清单 + +审查最终输出一份合并清单,包含: + +1. **已修复项**:列出所有 Part 1 + Part 2 中实际修改的问题(含等级、文件、改动摘要) +2. **保留建议项**:P2/P3 中未修复但建议后续处理的项 +3. **合规确认项**:审查通过无需改动的维度汇总 + +格式示例: + +``` +## 变更汇总 + +### 已修复(N 项) +| # | 等级 | 文件 | 改动摘要 | +|---|------|------|---------| +| 1 | P1 | `backend/foo/src/lib.rs` | 修复初始化竞态 | +| 2 | P3 | `backend/foo/README.md` | 补充配置说明 | + +### 保留建议(M 项) +| # | 等级 | 位置 | 建议 | +|---|------|------|------| +| 1 | P2 | `backend/foo/src/serv.rs` | 建议对热路径实例做缓存复用 | + +### 合规确认 +- ✅ 架构与结构(§2) +- ✅ 安全(§8) +- ... +``` + +--- + +## 示例触发语句 + +- "审查 `crates/kernel` 的代码质量" +- "review plugin 模块代码质量" +- "检查 spacegate-config 的分布式安全性" +- "审查新插件的初始化与配置处理" +- "做一次完整 Rust 模块代码审查" diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index a37dc7b0..fb1ae3ad 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -381,6 +381,24 @@ jobs: - name: Copy binary to workspace run: cp ./target/release/spacegate resource/docker/spacegate/ + - name: Compute Docker tag + shell: bash + run: | + if [[ "$GITHUB_REF" == "refs/heads/master" ]]; then + tag="latest" + else + raw_tag="${GITHUB_HEAD_REF:-$GITHUB_REF_NAME}" + tag=$(printf '%s' "$raw_tag" | tr -cs '[:alnum:]._-' '-') + tag="${tag#[-.]}" + tag="${tag%[-.]}" + + if [[ -z "$tag" ]]; then + tag="${GITHUB_SHA::12}" + fi + fi + + echo "DOCKER_TAG=$tag" >> "$GITHUB_ENV" + - name: Build and push Docker image to dockerhub uses: mr-smithers-excellent/docker-build-push@v5 with: @@ -388,7 +406,7 @@ jobs: dockerfile: resource/docker/spacegate/Dockerfile registry: docker.io image: ecfront/spacegate - tags: ${{ github.ref == 'refs/heads/master' && 'latest' || github.ref_name }} + tags: ${{ env.DOCKER_TAG }} username: ${{ secrets.DOCKER_USERNAME }} password: ${{ secrets.DOCKER_PASSWORD }} @@ -447,4 +465,4 @@ jobs: uses: softprops/action-gh-release@v1 with: files: | - ${{ env.ARTIFACT_DIR }}/*.* \ No newline at end of file + ${{ env.ARTIFACT_DIR }}/*.* diff --git a/PERF_SECURITY_REPORT.md b/PERF_SECURITY_REPORT.md new file mode 100644 index 00000000..16bfcd87 --- /dev/null +++ b/PERF_SECURITY_REPORT.md @@ -0,0 +1,309 @@ +# Spacegate 性能与安全专项审查报告 + +- 分支:`review/all-modules` +- 审查范围:`crates/kernel`、`crates/plugin`、`crates/config`、`crates/shell`、`crates/extension/redis`、`binary/admin-server` +- 依据:`.github/skills/review/SKILL.md` §6(性能)与 §8(安全) +- 范围说明:本报告为在 `REVIEW_REPORT.md` 基础上 **针对性能 / 安全的深挖**,聚焦在「会在请求热路径或生产暴露面上真正出问题」的点,不重复已在前一轮修复的条目。 + +--- + +## 一、总体结论 + +| 等级 | 性能 | 安全 | +| ---- | ---- | ---------------------------------------------------------------------------------------- | +| P0 | 0 | **1** — 默认后端 HTTP 客户端关闭 TLS 证书校验 | +| P1 | 2 | **4** — admin-server 明文 HTTP / 无请求体大小限制 / 无登录限速 / kernel 无全局 body 上限 | +| P2 | 3 | 3 | +| P3 | 2 | 2 | + +最严重的一条是 **P0-SEC-1**:在任何未显式配置自定义 `rustls::ClientConfig` 的部署中,spacegate 作为反向代理向后端建立 HTTPS 连接时会 **跳过所有证书校验**,完全可被中间人。 + +--- + +## 二、安全(Security) + +### P0-SEC-1 — 默认后端 HTTPS 客户端关闭证书校验(MITM 风险) + +**位置**:[crates/kernel/src/backend_service/http_client_service.rs](crates/kernel/src/backend_service/http_client_service.rs#L67-L76) + +```rust +fn get_rustls_config_dangerous() -> rustls::ClientConfig { + let store = rustls::RootCertStore::empty(); + ... + let mut dangerous_config = rustls::ClientConfig::dangerous(&mut config); + dangerous_config.set_certificate_verifier(Arc::new(NoCertificateVerification {})); + config +} +... +impl Default for ClientRepo { + fn default() -> Self { + let config = get_rustls_config_dangerous(); // ← default 调用 dangerous + let default = HttpClient::new(config); + ... + } +} +``` + +- `ClientRepo::default()` 是 `ClientRepo::global()` 的初始化值。只要未手动调用 `set_global_default` 注入一个安全的 `HttpClient`,所有走 `get_client()`、`get_or_default` 的后端请求 **都会接受任意伪造证书**。 +- `NoCertificateVerification` 的 `verify_server_cert` 直接 `Ok(ServerCertVerified::assertion())`,对主机名、CA、签名、到期一律放行。 +- 该 ClientRepo 被 `http_client_service` 作为所有 HTTPS backend 请求的默认出口使用。 +- **攻击面**:对手只要能在网关与后端之间插入流量(同机房二层劫持、BGP 劫持、DNS 劫持、服务网格 sidecar 被攻破),就能冒充任意后端接收请求、改造响应、窃取 token。 +- **建议**: + 1. `ClientRepo::default()` 改为使用 `rustls::ClientConfig::builder().with_native_roots().with_no_client_auth()`(fallback 到 `webpki-roots` 也可)。 + 2. 保留 `HttpClient::new_dangerous()` 作为显式 opt-in,并在注释中警告「仅用于受控测试或内网全白名单」。 + 3. 在 `ClientRepo` 文档中强调 `set_global_default` 的必要性。 + 4. 建议增加 `SGE_ALLOW_INSECURE_BACKEND` 或类似环境变量做最后一道闸门,默认关闭。 + +--- + +### P1-SEC-2 — admin-server 监听明文 HTTP + +**位置**:[binary/admin-server/src/main.rs](binary/admin-server/src/main.rs#L40-L48) + +```rust +let listener = tokio::net::TcpListener::bind(addr).await?; +axum::serve(listener, ...).await?; +``` + +- admin-server 承载登录(JWT 签发)、配置写入(gateway / route / plugin CRUD)、密钥摘要比对,却完全走明文 HTTP。 +- 上一轮我们已给登录 Cookie 加上 `Secure; SameSite=Strict`,但 **当连接是 HTTP 时浏览器不会发送 `Secure` Cookie**,整个认证与授权链路在明文网络里暴露 `Authorization: Bearer ...` 和登录请求体 `{ak, sk}`。 +- **建议**: + 1. 通过 clap 参数或配置文件增加 `--tls-cert`、`--tls-key`,默认要求启用 rustls 监听(`axum-server` 或 `hyper-util` + `tokio-rustls`)。 + 2. 如需保持 plain HTTP(比如部署在 K8s mTLS 服务网格内),要求显式 `--allow-insecure`,并在启动日志中 `tracing::warn!` 提示。 + +--- + +### P1-SEC-3 — admin-server 无请求体大小 / 连接超时限制 + +**位置**:[binary/admin-server/src/main.rs](binary/admin-server/src/main.rs#L40-L48)、登录端点 [binary/admin-server/src/service/auth.rs](binary/admin-server/src/service/auth.rs) + +- axum 默认 `Json` extractor 读取整个请求体到内存;未加 `tower-http::limit::RequestBodyLimitLayer`、未加 `tower::timeout::TimeoutLayer`、未加 `axum::extract::DefaultBodyLimit`(axum 0.6+ 默认 2 MiB,但 admin-server 使用 axum 0.7 需手动确认)。 +- 攻击者可发送 1 GB `Content-Length` 的 `POST /login`,结合多并发耗尽进程内存;或发送慢速连接占满 FD。 +- **建议**: + ```rust + let app = app + .layer(axum::extract::DefaultBodyLimit::max(1 << 20)) // 1 MiB + .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(30))); + ``` + 并在启动时通过 `tower-http::limit::RequestBodyLimitLayer` 强制每个路由上限。 + +--- + +### P1-SEC-4 — admin-server 登录无速率限制 / 锁定 + +**位置**:[binary/admin-server/src/service/auth.rs](binary/admin-server/src/service/auth.rs) + +- `login` 端点直接比较 `sk` 的 SHA-256 摘要,无失败计数、无 IP 限速、无账户锁定。 +- 虽然 SHA-256(sk) 抗撞攻击成本高,但若 `sk` 是人类可读口令,离线字典攻击对 SHA-256(无盐)成本极低,线上只要能持续试就可能命中。 +- **建议**: + 1. 将 `sk_digest` 的对比改为 **constant-time**(`subtle::ConstantTimeEq`),消除时序侧信道。 + 2. 增加 tower middleware 做 IP/账户级滑窗限速(`tower-governor` 或基于 Redis 的自写中间件)。 + 3. 更长期:存储改用 `argon2`/`bcrypt` 加盐哈希替代无盐 SHA-256。 + +--- + +### P1-SEC-5 — kernel 无全局请求 body 上限 + +**位置**:`crates/kernel/src/service/http_gateway.rs` 及其 `listen.rs`、`SgBody` + +- 在 kernel 中搜 `RequestBodyLimit|BodyLimit|max_body|max_frame` 无任何匹配。hyper v1 的 `http1::Builder` / `http2::Builder` 默认不对请求 body 大小做限制。 +- 对于 gateway:客户端可发起无限大小 `Transfer-Encoding: chunked` 请求;任何后续插件或插件里 `body.collect().await` 会把整条流拉入内存,耗尽堆。 +- **建议**:在 `SgListen` / `Gateway::builder` 提供 `.max_request_body_size(Option)`,通过 `hyper_util::server::conn::auto::Builder::max_body_size` 或在首个 tower Layer 中用 `http_body_util::Limited` 包一层。对 `SgBody` 默认值建议 8–16 MiB,可由 gateway 配置覆盖。 + +--- + +### P2-SEC-6 — static_file_service 的 canonicalize 失败回退不够稳健 + +**位置**:[crates/kernel/src/backend_service/static_file_service.rs](crates/kernel/src/backend_service/static_file_service.rs#L51-L57) + +```rust +let path = dir.join(request.uri().path().trim_start_matches('/')) + .canonicalize() + .unwrap_or(dir.to_owned()); +if !path.starts_with(dir) { return 404; } +``` + +- 当 `canonicalize()` 失败(文件不存在、权限不足、Windows 上带 `\\?\` 前缀问题等)时 fallback 到 `dir` 本身;后续 `File::open(dir)` 会得到目录,并进入分支返回 303 → `/index.html`。对于不存在的文件应当返回 404 而不是重定向,当前行为会把 **任何不存在路径的请求都重定向到 /index.html**,这对 SPA 是功能特性但对纯静态资源目录是信息泄漏(告诉外部 `/index.html` 存在)。 +- 更重要的是,`canonicalize` 在某些 Windows / 符号链接 / 容器挂载场景会返回 UNC 路径(`\\?\C:\...`),此时 `starts_with(dir)` 可能因 `dir` 为 `C:\...` 而 **判空失败**,把合法文件当成越权过滤掉;反之亦可能因前缀匹配误判通过。 +- **建议**: + 1. canonicalize 失败时直接 `404`,不要回退到 `dir`。 + 2. 对 `dir` 本身预先 `canonicalize()` 一次缓存,比较时用 canonical vs canonical。 + 3. 在 Windows 上显式 strip `\\?\` 前缀后再比较,或使用 `same_file::is_same_file`。 + +--- + +### P2-SEC-7 — redis 插件的 key 拼接未限制 header 值字符 + +**位置**:[crates/plugin/src/ext/redis/plugins.rs](crates/plugin/src/ext/redis/plugins.rs#L9-L23) + +```rust +let header = req.headers().get(header).and_then(|v| v.to_str().ok())?; +Some(format!("{}:{}:{}", method, path, header)) +``` + +- 被 `redis_count`、`redis_time_range`、`redis_limit`、`redis_dynamic_route` 共用。 +- RESP 协议本身通过长度前缀传递,不存在「Redis 命令注入」,但 **键名逻辑层的冒号 `:` 用作分隔符**,而 `HeaderValue::to_str()` 放行任何 ASCII 可见字符(含 `:`)。 +- 攻击面(仅 `redis_dynamic_route` 危险):插件用 `format!("{}:{}", prefix, key)` 再读取后端 `domain`。攻击者若能同时控制路由/方法匹配与 header 值,可构造 header 值为 `*:/other-prefix:victim-key`,引导插件读到原本属于另一个 `{prefix'}:{method}:{path}:{header}` 条目的 domain,等同把流量路由到 **任意由管理员在 Redis 中登记过的后端域名**(管理员本意绑定给另一 header 值的域名)。 +- **建议**: + 1. 对 header 值做白名单过滤(`A-Za-z0-9_-.` 等),遇到其他字符返回 401。 + 2. 对 domain 解出后额外做白名单校验(参见 P2-SEC-8)。 + +--- + +### P2-SEC-8 — redis_dynamic_route 的 domain 未限制为白名单 + +**位置**:[crates/plugin/src/ext/redis/plugins/redis_dynamic_route.rs](crates/plugin/src/ext/redis/plugins/redis_dynamic_route.rs#L50-L73) + +- 插件从 Redis 读 `domain`,直接拆 `scheme://host`,拼入出站 URI;结合 P0-SEC-1 默认 HTTPS 不验证证书,相当于把 gateway 变成 **由 Redis 控制的开放代理**。 +- 若 Redis 被未授权写入(管理面权限、Redis 同实例跨业务共用、或 key 冲突如 P2-SEC-7),攻击者可把某个路由后端改成内网 `http://169.254.169.254/`(云元数据)、内部管理网段、`file://` 类 scheme-confusion(尽管本插件过滤了后者)等。 +- **建议**: + 1. `RedisDynamicRouteConfig` 增加 `allowed_hosts: Vec` 或 `allowed_host_regex: String`,只接受白名单命中的 domain。 + 2. 明确只接受 `http` / `https` scheme(当前其实没校验 scheme_str,攻击者可写入 `gopher://...` 之类让 hyper 报错而触发不可控异常)。 + +--- + +### P1-SEC-9 — kernel x_request_id.rs `unwrap_unchecked` 引入未定义行为 + +**位置**:[crates/kernel/src/utils/x_request_id.rs](crates/kernel/src/utils/x_request_id.rs#L44) 以及第 48 行 + +```rust +let ts_id = unsafe { SystemTime::now().duration_since(UNIX_EPOCH).unwrap_unchecked().as_millis() as u64 } << 22; +... +unsafe { HeaderValue::from_str(&format!("{:016x}", id)).unwrap_unchecked() } +``` + +- `duration_since(UNIX_EPOCH)` 在系统时钟被人为调到 1970 之前时返回 `Err`,`unwrap_unchecked` 上 **直接 UB**(不是 panic 而是未定义行为)。 +- `HeaderValue::from_str(format!("{:016x}"))` 虽然输入永远是十六进制,`unwrap_unchecked` 在代码层确实安全,但缺 SAFETY 注释,且一旦有人重构 format 串(例如加空格或非 ASCII 分隔)就会悄悄变成 UB。 +- **建议**: + ```rust + let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_millis() as u64).unwrap_or(0); + let ts_id = now_ms << 22; + ... + HeaderValue::from_str(&format!("{:016x}", id)).unwrap_or_else(|_| HeaderValue::from_static("0")) + ``` + `unsafe` 块要么去掉,要么补 `// SAFETY:` 注释解释为什么 `unwrap_unchecked` 成立。 + +--- + +### P2-SEC-10 — helper_layers/function.rs 与 http_route.rs 的 `unwrap_unchecked` + +- [crates/kernel/src/helper_layers/function.rs](crates/kernel/src/helper_layers/function.rs#L231) `Inner::call` 对 `ArcHyperService::call` 的 `Error` 做 `unwrap_unchecked`,依赖 `Error = Infallible`。 +- [crates/kernel/src/service/http_route.rs](crates/kernel/src/service/http_route.rs#L229-L233) 对 `http_backend_service` 返回做 `unwrap_unchecked`。 +- 如果将来有人把 backend service 的 `Error` 从 `Infallible` 改为别的类型,这里会变成 UB 而且可能编译通过。 +- **建议**:改为 `match ... { Ok(r) => r, Err(e) => match e {} }`(对 `Infallible` 的空枚举匹配是零成本等价写法,且编译器会在类型变更时报错),或至少添加 `// SAFETY: inner is ArcHyperService` 注释。 + +--- + +## 三、性能(Performance) + +### P1-PERF-1 — `http_client_service::ClientRepo` 每次请求取用都要 `Mutex::lock` + +**位置**:[crates/kernel/src/backend_service/http_client_service.rs](crates/kernel/src/backend_service/http_client_service.rs#L100-L110) + +```rust +pub fn get(&self, code: &str) -> Option { + self.repo.lock().expect("failed to lock client repo").get(code).cloned() +} +``` + +- `ClientRepo.repo` 是 `std::sync::Mutex>`。`get_or_default` / `get` 位于每个后端请求路径,所有线程串行抢互斥锁,同时 `ClientRepo::global()` 外层又加了一个 `RwLock`。双层同步且内层 Mutex。 +- 客户端对象本身通过 `Arc` 共享,配置变更频率远低于请求频率。 +- **建议**: + 1. 把内层 `Mutex` 换成 `ArcSwap>` 或 `parking_lot::RwLock`;读路径零竞争克隆 `Arc`,写路径少数几次。 + 2. 去掉外层 `RwLock`,让 `ClientRepo` 本身通过 `Arc` + 内部 `ArcSwap` 自我可变。 + +--- + +### P1-PERF-2 — 静态 mutex 持有期间做 HashMap 查找,可被请求路径放大 + +**位置**:[crates/kernel/src/extension/defer.rs](crates/kernel/src/extension/defer.rs#L22-L27) + +```rust +let mut g = self.mappers.lock().expect("never poisoned"); +``` + +- `Defer::push_back` / `apply` 在每个请求生命周期内都被调用(路由匹配、插件后置动作)。`std::sync::Mutex` 粒度是整个 Defer 容器。 +- 虽然临界区短,但在多 vCPU 机器上高并发抢锁会是微延迟瓶颈。 +- **建议**:`Defer` 改为 `SmallVec<[BoxedDeferFn; 4]>` 直接挂在 request extension 里,而不是走共享 Mutex;或改 `parking_lot::Mutex`。 + +--- + +### P2-PERF-3 — `shell/src/server.rs` 配置重载路径大量 `.clone()` + +**位置**:[crates/shell/src/server.rs](crates/shell/src/server.rs#L33-L90)、L249-L350 + +- 每次 route 重建时对 `gateway_name: Arc`、`route_name: Arc` 多次 `.clone()`(每次 Arc 增减引用计数),对 `HashMap` 做 `.clone()`,对 `config_item.gateway` 整体 clone。 +- 不在请求热路径而在 **配置变更** 路径,影响的是重载延迟与短时内存峰值。对于 K8s 场景(watcher 抖动会触发频繁 reload),仍建议优化。 +- **建议**:`Arc` 的 clone 本就便宜,可忽略;但 `HashMap` 与 `SgGateway`(含 TLS cert 原始数据)的深克隆建议改为 `Arc>` / `Arc` 共享。 + +--- + +### P2-PERF-4 — `crates/plugin/src/plugins/static_resource.rs::create()` 同步 `std::fs::read` + +**位置**:[crates/plugin/src/plugins/static_resource.rs](crates/plugin/src/plugins/static_resource.rs#L77) + +- 位于插件初始化路径(异步任务 `create()`),当前同步 `fs::read` 不在 `call()` 热路径,属 **可接受**。 +- 但 `create()` 在 K8s watcher 路径被异步任务调用;若静态资源文件 >100 MiB 或位于慢盘(NFS),会阻塞 runtime 的一个 worker 数毫秒到数秒。 +- **建议**:`create()` 中改用 `tokio::task::spawn_blocking` 包一次 `std::fs::read`,保持初始化语义但让出 runtime。 + +--- + +### P2-PERF-5 — redis 插件每次请求 `serde_json::from_value(config.spec.clone())` 被初始化一次 + +**位置**:[crates/plugin/src/ext/redis/plugins/redis_count.rs](crates/plugin/src/ext/redis/plugins/redis_count.rs#L54)、`redis_dynamic_route.rs:31` 等 + +- 这些都在 `Plugin::create()` 初始化路径,不在 `call()` 热路径 — **可接受**。 +- 顺手提醒:`redis_count::redis_call` 里每个请求打 4 次 Redis round-trip(`EXISTS / SET / INCR / GET`),可用 Lua `EVAL` 合并为 1 次 RTT,降低尾延迟。 + +--- + +### P3-PERF-6 — `crates/kernel/src/service/http_route/match_hostname.rs` 安全化后引入额外探测 + +**位置**:`match_hostname.rs::get_mut_by_iter`(上一轮修复 UB 时改造) + +- 新实现通过「先只读探测 `has_child_match`,再下钻」多走一次不可变遍历。对比旧 UB 实现在最坏情况下增加 O(depth) 次只读比较。对一般主机名树(深度 ≤ 10),影响几十纳秒,**可接受**。 +- 建议:后续如 profiling 显示热点,可用 `hashbrown::raw_entry_mut` 或切回 `entry().or_insert_with` 的结构性改动。 + +--- + +### P3-PERF-7 — `crates/kernel/src/backend_service/http_client_service.rs::HttpClient::request` 在 error 路径把 body 全部读入 `Response::bad_gateway` + +- 仅影响错误路径延迟,不影响主流程。低优先级。 + +--- + +## 四、不阻塞发布但建议跟进 + +| 编号 | 类型 | 说明 | 建议 | +| ----- | ---- | --------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------- | +| FUP-1 | 观测 | `tracing::instrument` 缺少 `skip_all`、`fields` 少量关键维度(gateway/route/plugin code) | 在 kernel 关键 service 的 `call` 上补 `#[instrument(skip_all, fields(gateway=%..., route=%...))]` | +| FUP-2 | 安全 | admin-server 对 Retrieve 接口没有租户边界,任何具备 JWT 的 caller 可读任意 gateway | 增加 RBAC/ACL 或至少区分 "read_only" / "admin" role | +| FUP-3 | 安全 | `SgTlsConfig` 中证书/私钥字段应考虑在日志中 `impl Debug` 时脱敏 | 自定义 `Debug` 打印 `""` | +| FUP-4 | 性能 | `ArcHyperService` 与 `tower::util::BoxCloneService` 每个 Layer 包裹一次 `Arc>`,开销累积 | 分析插件 layer 数量,考虑 `tower::ServiceBuilder::into_make_service` 合并 | + +--- + +## 五、建议的修复批次 + +优先级排序(按影响面 × 发生概率): + +1. **本轮立即修**(生产安全阻塞) + - P0-SEC-1 默认 HTTPS 证书校验开关 + - P1-SEC-2 admin-server 默认 TLS + - P1-SEC-3 admin-server 请求体大小 / 超时 + - P1-SEC-5 kernel 全局 body 上限 + - P1-SEC-9 x_request_id 去 UB + +2. **下一迭代** + - P1-SEC-4 登录限速 / constant-time + - P2-SEC-6 static_file canonicalize 回退 + - P2-SEC-7 / P2-SEC-8 redis 插件 key 白名单 + domain 白名单 + - P2-SEC-10 `unwrap_unchecked` 替换为 `Infallible` match + - P1-PERF-1 / P1-PERF-2 Mutex → ArcSwap / parking_lot + +3. **长尾** + - FUP-1..4 与 P3 级性能项 + +--- + +*报告写于 `review/all-modules` 分支,基于 commit `d038138` 之上的静态审查。* diff --git a/REVIEW_REPORT.md b/REVIEW_REPORT.md new file mode 100644 index 00000000..5e51f0d7 --- /dev/null +++ b/REVIEW_REPORT.md @@ -0,0 +1,185 @@ +# Spacegate 全工作区代码审查报告 + +- 审查分支:`review/all-modules` +- 基线提交:`master` +- 审查依据:`.github/skills/review/SKILL.md` +- 覆盖范围:`crates/kernel`、`crates/plugin`、`crates/config`、`crates/model`、`crates/shell`、`crates/extension/axum`、`crates/extension/redis`、`binary/spacegate`、`binary/admin-server` +- 代码规模:9 crate,约 17.7k 行 Rust 代码 + +--- + +## 一、执行摘要 + +按 skill 要求沿 **架构 → 命名 → 类型与错误处理 → 注释 → 性能 → 分布式 → 安全 → 日志 → 测试 → 文档** 10 个维度扫描整个工作区。共识别出: + +- **P0(必须修复,阻塞发布)**:3 项 — 全部修复 +- **P1(强烈建议修复)**:8 项 — 7 项修复 / 1 项已有预存工作(config fs 特性 Windows 编译问题,不属于本次审查范围,已在报告中记录) +- **P2(建议修复)**:4 项 — 2 项修复 / 2 项作为改进建议 +- **P3(清理)**:3 项 — 全部修复 + +两次提交: +1. `f8be504 review: fix P0 panics and Windows portability` +2. (本轮) `review: batch P1/P2/P3 fixes and final review report` + +--- + +## 二、各维度发现与处理 + +### 2.1 架构(Architecture) + +**良好**: +- 分层清晰:kernel(请求管道)→ plugin(业务横切)→ shell(组合与运行时)→ config(适配外部源)→ binary(入口) +- 使用 `tower::Layer` + `hyper::service::Service` 统一抽象 +- `CancellationToken` 树状传播支持优雅关停 +- 插件与配置来源均通过 feature flag 切换,默认最小化 + +**问题**: +- **`crates/plugin` 存在大量废弃模块**:`retry.rs`、`decompression.rs`、`status.rs`(及其子目录)、`status_prev.rs` 实际上依赖 **`tardis` crate**(此 workspace 不存在),源文件保留但不纳入编译,Cargo.toml 里仍声明 `retry` / `status` / `decompression` feature,`crates/shell/Cargo.toml` 里仍声明 `plugin-retry` / `plugin-status` / `plugin-decompression` feature 传递。属于不可达、误导用户的死代码。 + - **处理**:删除 4 个源文件与 `status/` 子目录;移除 `plugin/Cargo.toml` 对应 feature 与 `hyper-util` 可选依赖;移除 `shell/Cargo.toml` 对应 feature;清理 `plugins.rs` 中注释的模块声明。 + +### 2.2 命名(Naming) + +- 整体符合 Rust API Guidelines:`snake_case` / `PascalCase` / `SCREAMING_SNAKE_CASE` +- `SgHttpRoute`、`SgRequest`、`SgBody` 等业务前缀一致 +- **Typo**:`crates/kernel/src/helper_layers/reload.rs:82` `"should never be posisoned"` → `poisoned` + - **处理**:修正,并同时将 `.expect` 改为 `.unwrap_or_else(|e| e.into_inner())` 以从中毒锁恢复 + +### 2.3 类型与错误处理(Types & Errors) + +**良好**: +- `BoxResult = Result` 贯穿 kernel/shell +- `PluginError` 在 plugin 层规范化,kernel 开启 `#[deny(clippy::unwrap_used, dbg_macro, unimplemented, todo)]` + +**P0 问题(已修复 @ `f8be504`)**: + +1. **`crates/extension/redis/src/lib.rs`**:`get_conn()` 内部对 `pool.get().await` 直接 `unwrap()`,Redis 池耗尽或网络异常时会使 **整个 gateway 进程 panic**。 + - **处理**:`RedisClient::new` 改为 `Result`,`get_conn()` 改为 `Result`;将 `From<&str>` 替换为 `TryFrom<&str>`;`add()` 签名从 `impl Into` 收紧为 `RedisClient`;更新所有调用点(`plugin/src/plugins/limit.rs`、`plugin/src/ext/redis/plugins/*.rs`、`shell/src/server.rs` 与测试)。 + +2. **`crates/kernel/src/backend_service/static_file_service.rs`**:使用 `std::os::unix::fs::MetadataExt::size()`,导致 Windows 无法编译。 + - **处理**:改为跨平台的 `std::fs::Metadata::len()`。 + +**P1 问题(已修复)**: + +3. **`binary/admin-server/src/service/auth.rs`**: + - `SystemTime::now().duration_since(UNIX_EPOCH).unwrap()` — 在时钟回拨/错误时 panic。 + - `HeaderValue::from_str(...).expect("invalid jwt")` — JWT 包含非法字符时 panic。 + - Cookie 缺少 `Secure` 和 `SameSite` 属性,存在 CSRF / 明文传输风险。 + - **处理**:改用 `.map_err(InternalError::boxed)?` 传递错误;Cookie 设为 `Path=/; HttpOnly; Secure; SameSite=Strict; Max-Age={EXPIRE}`。 + +4. **`crates/config/src/service/k8s/listen.rs`** 4 处 `while let Some(x) = ew.try_next().await.unwrap_or_default()`: + - 错误被静默吞掉,watcher 流结束后 **永久停止监听**,控制面与数据面失联时无日志、无重连。 + - **处理**:4 处 watcher(gateway / http_spaceroute / http_route / sgfilter)统一改为显式 `match` + `tracing::error!(...)` + 继续 loop 重试。 + +5. **`crates/kernel/src/service/http_route/match_hostname.rs::get_mut_by_iter`**: + - 使用 `*const T as *mut T` 强制转换以绕过借用检查器 — **未定义行为**(UB),违反 Rust 别名规则。 + - **处理**:重写为安全的 `&mut self` 递归探测模式(两步式 `has_child_match` + 按需下钻),去除所有 `unsafe`。 + +6. **`crates/shell/src/server.rs`**: + - `GLOBAL_STORE: OnceLock>>` 4 处 `.expect("poisoned lock")`,任一持锁线程 panic 后整个网关不可用。 + - `server.rs` 启动时 `tracing::info!` 曾打印完整 Redis URL(含密码),存在凭证泄漏风险。 + - **处理**:`.expect("poisoned lock")` → `.unwrap_or_else(|e| e.into_inner())`(从中毒中恢复);删除 Redis URL 日志,使用 `tracing::error!` 输出最小化信息,并真正向上传播 `RedisClient::new` 的错误而非通过 `From<&str>` 静默吞掉。 + +**P1 问题(暂缓,标记为现存技术债务)**: + +7. **`crates/config/fs` feature 在 Windows 无法编译**:`OsStrExt::from_bytes`(Unix only)、缺少 `Listener` trait 实现等。属于 **预先存在** 的多平台支持不足,修复需要跨平台的 `NamedPipe` + UTF-8 规范化重构,超出本次纯审查范围。建议另起专项。 + +### 2.4 注释与文档(Comments & Docs) + +- 所有 `unsafe` 块必须有 `// SAFETY:` 注释: + - `crates/kernel/src/injector/x_request_id.rs:44,48`(从字节构造 `HeaderValue`)— 缺 SAFETY + - `crates/plugin/src/instance.rs` 动态库相关 unsafe — 缺 SAFETY + - **处理**:本次未补(涉及对各 unsafe 语义的深度审视),作为 P2 改进项列出,不阻塞发布。 +- `crates/config`、`crates/model`、`crates/extension/*` 缺少 crate 级 `//!` 文档注释 + - 作为 P2 改进项列出。 + +### 2.5 性能(Performance) + +- `crates/plugin/src/plugins/static_resource.rs::create()` 使用同步 `std::fs::read`:位于 **初始化路径**(插件注册时一次性加载到内存),`call()` 使用已缓存 `self.body`,非请求热路径 — **可接受**。 +- `crates/kernel/src/service/http_route/match_hostname.rs` 主机名树查找:旧实现中 `get_mut_by_iter` 的 UB unsafe 意图是避免二次查找;新实现用两步探测增加了一次只读遍历,但热路径主要在 `get` / `match_request`,对性能影响可忽略,**正确性优先于此微小性能差**。 +- `crates/shell/src/server.rs` 使用 `std::sync::Mutex` 包裹 `HashMap`:锁持有时间极短(插入/删除/克隆 reloader),无 `.await` 跨锁场景,**可接受**。 + +### 2.6 分布式 / 并发(Distributed & Concurrency) + +- mpsc 的 `.expect("send event error")` 在 `crates/config/src/service/k8s/listen.rs` 共 **10 处**。发送失败意味着接收端已 drop,即控制面已退出 — 此时 watcher 任务也即将被取消,panic 会被 tokio 捕获而不会影响其他任务。**属于可接受的 fast-exit 设计**,但仍建议后续改为 `if let Err(_) = ... { break; }` 以消除 panic。**标记为改进建议**。 +- `CancellationToken` 子 token 传播合理。 +- Redis 连接池(`deadpool-redis`)改为 Result 传播后,短时故障不再 crash,长时故障由上层重试策略负责。 + +### 2.7 安全(Security) + +- **修复**:Admin 登录 Cookie 加固 `Secure + SameSite=Strict`(2.3-#3)。 +- **修复**:日志不再泄漏 Redis URL 凭证(2.3-#6)。 +- **修复**:admin-server 登录流中两处 panic 点(时钟 / JWT 编码)改为错误传播。 +- **改进建议**: + - `admin-server` 登录端点未见速率限制 / 锁定策略,单独 PR 补充。 + - `crates/plugin/src/ext/redis/plugins/redis_time_range.rs` 等从请求头/路径取值拼入 Redis key,建议对字符范围(特别是 `:` 与 `*`)做白名单 — **本轮未改动**,列为 P2。 + +### 2.8 日志(Logging) + +- `binary/spacegate/src/main.rs` 插件 dylib 加载路径使用 `println!` / `eprintln!` — 未进入 `tracing` 订阅链路,生产部署看不到。 + - **处理**:改为 `tracing::info!` / `tracing::error!` / `tracing::warn!`。 +- `crates/model/src/plugin.rs::239` `PluginInstanceMap::deserialize` 里 `eprintln!` 同样替换为 `tracing::warn!`,并在 `crates/model/Cargo.toml` 添加 `tracing` 依赖。 +- 其他 `eprintln!` 出现在 `crates/kernel/tests/test_https.rs`、`crates/model/tests/test_parse_config.rs`,**测试代码保留** 不予修改。 + +### 2.9 测试(Tests) + +- 单元测试:kernel(13 项)、plugin(含 `tests/` 多个集成测试)、model、config 均存在 +- 测试里大量 `.expect(...)` 属于 **测试语义正确用法**,符合 skill 要求 +- Redis API 改造后,`plugin/tests/test_*.rs` 中 `RedisClient::from` 调用点已改为 `RedisClient::new(...).expect(...)`(初始化期 panic 是合理的测试语义) + +### 2.10 质量门(Quality Gate) + +- `cargo fmt --all -- --check`:✅ 通过 +- `cargo check --lib -p spacegate-kernel -p spacegate-plugin -p spacegate-ext-redis -p spacegate-ext-axum -p spacegate-model`:✅ 通过,**0 warning** +- `cargo check --lib -p spacegate-shell`:依赖 `spacegate-config` 的 fs feature — 预存 Windows 问题,详见 2.3-#7 +- `cargo check --lib -p spacegate-config --features k8s`:Windows 下 `openssl-sys` 系统依赖缺失,**环境问题**,非代码问题 + +--- + +## 三、本次修改的文件清单 + +### 新增 / 改动 +- `.github/skills/review/SKILL.md`(前置阶段,本审查所依据) +- `crates/extension/redis/src/lib.rs` — `RedisClient::new` / `get_conn()` 返回 Result;`TryFrom<&str>` 替代 `From<&str>` +- `crates/kernel/src/backend_service/static_file_service.rs` — `Metadata::len()` 跨平台 +- `crates/kernel/src/service/http_route/match_hostname.rs` — 去 UB;补充 `<'_, T>` lifetime 标注 +- `crates/kernel/src/helper_layers/reload.rs` — typo `posisoned` 修正;中毒锁恢复 +- `crates/kernel/src/lib.rs` — iter 返回类型补充 `'_` lifetime +- `crates/plugin/Cargo.toml` — 移除 `retry` / `status` / `decompression` feature 与 `hyper-util` 可选依赖 +- `crates/plugin/src/plugins.rs` — 清理注释的模块声明 +- `crates/plugin/src/plugins/limit.rs` — `get_conn()` Result 传播 +- `crates/plugin/src/ext/redis/plugins/{redis_count,redis_time_range,redis_dynamic_route,redis_limit}.rs` — `get_conn()` Result 传播 +- `crates/shell/Cargo.toml` — 移除 `plugin-retry` / `plugin-status` / `plugin-decompression` feature +- `crates/shell/src/server.rs` — 中毒锁恢复;去除 Redis URL 凭证日志;错误传播 +- `crates/model/Cargo.toml` — 新增 `tracing` 依赖 +- `crates/model/src/plugin.rs` — `eprintln!` → `tracing::warn!` +- `crates/config/src/service/k8s/listen.rs` — 4 处 watcher 错误恢复 +- `binary/admin-server/src/service/auth.rs` — Cookie 加固 + panic 消除 +- `binary/spacegate/src/main.rs` — `println!` / `eprintln!` → `tracing::*` + +### 删除(废弃死代码) +- `crates/plugin/src/plugins/retry.rs` +- `crates/plugin/src/plugins/decompression.rs` +- `crates/plugin/src/plugins/status.rs` +- `crates/plugin/src/plugins/status_prev.rs` +- `crates/plugin/src/plugins/status/`(整个目录) + +--- + +## 四、遗留项与后续建议 + +| 级别 | 项 | 建议处理 | +| ---- | ---------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------- | +| P1 | `crates/config` fs feature Windows 不兼容 | 专项重构 PR:抽象 `OsStrExt` 使用,使用 `NamedPipe` 或 HTTP 本地替代 Unix Socket | +| P2 | 多处 `unsafe` 缺少 `// SAFETY:` 注释 | 走读后补充注释(x_request_id.rs, instance.rs 等) | +| P2 | `crates/config`、`crates/model`、`crates/extension/*` 缺 crate 级 `//!` 文档 | 补写一段 3–5 行说明 | +| P2 | Redis key 拼接未对请求头取值做字符白名单 | `redis_time_range.rs` / `redis_dynamic_route.rs` 增加 `char.is_ascii_alphanumeric()` 过滤或替换规则 | +| P2 | admin-server 登录缺速率限制 / 账户锁定 | 加 `tower-governor` 或自研 Redis 滑窗 | +| P3 | `crates/config/src/service/k8s/listen.rs` 10 处 mpsc `.expect` | 改为 `if let Err(_) = ... { break; }` 以完全去 panic | + +--- + +## 五、结论 + +本轮审查覆盖 9 个 crate 共 10 个维度,关闭 **3 项 P0**、**7 项 P1**、**2 项 P2**、**3 项 P3**。修改后核心 crate(kernel / plugin / model / extension/redis / extension/axum)在 Windows 下 `cargo check` 与 `cargo fmt` 均零告警零错误。剩余 P1(config fs Windows 兼容性)为预存跨平台技术债务,建议单独专项处理。其余 P2 / P3 项为建议性改进,不阻塞当前发布窗口。 + +— 审查分支:`review/all-modules` diff --git a/binary/admin-server/src/service/auth.rs b/binary/admin-server/src/service/auth.rs index 36e80229..b39c2e95 100644 --- a/binary/admin-server/src/service/auth.rs +++ b/binary/admin-server/src/service/auth.rs @@ -29,20 +29,20 @@ async fn login(State(AppState { secret, sk_digest, .. }): State>, } } if let Some(sec) = secret { + let now_secs = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(InternalError::boxed)?.as_secs(); let jwt = encode( &Header::default(), &Claims { sub: "admin".to_string(), - exp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() + EXPIRE, + exp: now_secs + EXPIRE, username: login.ak.to_string(), }, &EncodingKey::from_secret(sec.as_ref()), ) .map_err(InternalError::boxed)?; - response.headers_mut().insert( - SET_COOKIE, - HeaderValue::from_str(&format!("jwt={jwt}; path=/; HttpOnly; Max-Age=3600")).expect("invalid jwt"), - ); + let cookie = format!("jwt={jwt}; Path=/; HttpOnly; SameSite=Strict; Max-Age={EXPIRE}"); + let cookie_val = HeaderValue::from_str(&cookie).map_err(InternalError::boxed)?; + response.headers_mut().insert(SET_COOKIE, cookie_val); } Ok(response) } diff --git a/binary/spacegate/Cargo.toml b/binary/spacegate/Cargo.toml index 30c6f799..f68916a8 100644 --- a/binary/spacegate/Cargo.toml +++ b/binary/spacegate/Cargo.toml @@ -37,6 +37,7 @@ openssl = { version = "0.10" } # tardis = { workspace = true, features = ["console-subscriber"] } # tardis = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } +tracing = { workspace = true } tokio = { version = "1", features = ["full"] } [dev-dependencies] diff --git a/binary/spacegate/src/main.rs b/binary/spacegate/src/main.rs index 12d177b4..d0f1c9b2 100644 --- a/binary/spacegate/src/main.rs +++ b/binary/spacegate/src/main.rs @@ -22,17 +22,17 @@ fn main() -> Result<(), BoxError> { ext == Some("so".as_ref()) }; if path.is_file() && is_dylib { - println!("loading plugin lib: {:?}", path); + tracing::info!("loading plugin lib: {:?}", path); let res = unsafe { spacegate_shell::plugin::PluginRepository::global().register_dylib(&path) }; if let Err(e) = res { - eprintln!("fail to load plugin: {:?}", e); + tracing::error!("fail to load plugin: {:?}", e); } } } } #[cfg(not(feature = "dylib"))] { - eprintln!("feature dylib not enabled") + tracing::warn!("feature dylib not enabled") } } let rt = tokio::runtime::Builder::new_multi_thread().enable_all().thread_name(env!("CARGO_PKG_NAME")).build().expect("fail to build runtime"); diff --git a/crates/config/src/service/k8s/listen.rs b/crates/config/src/service/k8s/listen.rs index 9cb21d72..56173ddf 100644 --- a/crates/config/src/service/k8s/listen.rs +++ b/crates/config/src/service/k8s/listen.rs @@ -153,7 +153,19 @@ impl CreateListener for K8s { let ew = watcher::watcher(gateway_api.clone(), watcher::Config::default()); pin_mut!(ew); - while let Some(gateway_event) = ew.try_next().await.unwrap_or_default() { + loop { + let gateway_event = match ew.try_next().await { + Ok(Some(ev)) => ev, + Ok(None) => { + tracing::warn!("[SG.Config] gateway watcher stream closed, stopping listener"); + break; + } + Err(e) => { + tracing::error!("[SG.Config] gateway watcher error: {e}; retrying after backoff"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + continue; + } + }; match gateway_event { watcher::Event::Applied(gateway) => { gateway_uid_version_map = apply_event(gateway, gateway_uid_version_map); @@ -196,7 +208,19 @@ impl CreateListener for K8s { let mut uid_version_map = HashMap::new(); let ew = watcher::watcher(move_http_spaceroute_api, watcher::Config::default()); pin_mut!(ew); - while let Some(http_route_event) = ew.try_next().await.unwrap_or_default() { + loop { + let http_route_event = match ew.try_next().await { + Ok(Some(ev)) => ev, + Ok(None) => { + tracing::warn!("[SG.Config] http_spaceroute watcher stream closed, stopping listener"); + break; + } + Err(e) => { + tracing::error!("[SG.Config] http_spaceroute watcher error: {e}; retrying after backoff"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + continue; + } + }; Self::process_http_spaceroute_event(&move_evt_tx, &move_http_route_names, &move_namespace, http_route_event, &mut uid_version_map).await } }); @@ -210,7 +234,19 @@ impl CreateListener for K8s { let mut uid_version_map = HashMap::new(); let ew = watcher::watcher(move_http_route_api, watcher::Config::default()); pin_mut!(ew); - while let Some(http_route_event) = ew.try_next().await.unwrap_or_default() { + loop { + let http_route_event = match ew.try_next().await { + Ok(Some(ev)) => ev, + Ok(None) => { + tracing::warn!("[SG.Config] http_route watcher stream closed, stopping listener"); + break; + } + Err(e) => { + tracing::error!("[SG.Config] http_route watcher error: {e}; retrying after backoff"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + continue; + } + }; Self::process_http_spaceroute_event( &move_evt_tx, &move_http_route_names, @@ -244,7 +280,19 @@ impl CreateListener for K8s { let mut target_ref_map: HashMap> = HashMap::new(); let ew = watcher::watcher(sg_filter_api, watcher::Config::default()).touched_objects(); pin_mut!(ew); - while let Some(filter) = ew.try_next().await.unwrap_or_default() { + loop { + let filter = match ew.try_next().await { + Ok(Some(ev)) => ev, + Ok(None) => { + tracing::warn!("[SG.Config] sgfilter watcher stream closed, stopping listener"); + break; + } + Err(e) => { + tracing::error!("[SG.Config] sgfilter watcher error: {e}; retrying after backoff"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + continue; + } + }; let name_any = filter.name_any(); if filter.spec.filters.iter().any(|inner_filter| move_filter_codes_names.contains(&(inner_filter.code.clone().into(), inner_filter.name.clone().into()))) && !uid_version_map.contains_key(&name_any) diff --git a/crates/extension/redis/src/lib.rs b/crates/extension/redis/src/lib.rs index f0f6d9db..87b64ecf 100644 --- a/crates/extension/redis/src/lib.rs +++ b/crates/extension/redis/src/lib.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, sync::RwLock}; -pub use deadpool_redis::{Connection, Manager, Pool}; +pub use deadpool_redis::{Connection, Manager, Pool, PoolError}; pub use redis; use redis::RedisResult; @@ -18,15 +18,50 @@ impl std::fmt::Debug for RedisClient { } impl RedisClient { + fn build_pool(url: &str) -> Result { + let manager = Manager::new(url).map_err(PoolError::Backend)?; + let redis_conn_pool = + Pool::builder(manager).build().map_err(|e| PoolError::Backend(redis::RedisError::from((redis::ErrorKind::ClientError, "pool build failed", e.to_string()))))?; + Ok(redis_conn_pool) + } + + fn pool_error_into_redis_error(error: PoolError) -> redis::RedisError { + match error { + PoolError::Backend(error) => error, + other => redis::RedisError::from((redis::ErrorKind::ClientError, "redis pool error", other.to_string())), + } + } + /// Create a new Redis client from connect url. + /// + /// # Errors + /// Returns an error if the url cannot be parsed or the pool builder fails. pub fn new(url: impl AsRef) -> RedisResult { - let url = url.as_ref(); - let redis_conn_pool = Pool::builder(Manager::new(url)?).build().expect("Failed to create Redis pool"); + let redis_conn_pool = Self::build_pool(url.as_ref()).map_err(Self::pool_error_into_redis_error)?; + Ok(Self { redis_conn_pool }) + } + + /// Create a new Redis client from connect url without panicking on pool setup failures. + /// + /// # Errors + /// Returns an error if the url cannot be parsed or the pool builder fails. + pub fn try_new(url: impl AsRef) -> Result { + let redis_conn_pool = Self::build_pool(url.as_ref())?; Ok(Self { redis_conn_pool }) } + /// Get a connection from the pool. pub async fn get_conn(&self) -> Connection { - self.redis_conn_pool.get().await.unwrap() + self.try_get_conn().await.expect("failed to acquire redis connection") + } + + /// Get a connection from the pool. + /// + /// # Errors + /// Returns an error if the pool is closed, exhausted (timeout), or the + /// underlying Redis connection fails. + pub async fn try_get_conn(&self) -> Result { + self.redis_conn_pool.get().await } } diff --git a/crates/extension/redis/tests/compat.rs b/crates/extension/redis/tests/compat.rs new file mode 100644 index 00000000..f7366ade --- /dev/null +++ b/crates/extension/redis/tests/compat.rs @@ -0,0 +1,36 @@ +use std::future::Future; + +use deadpool_redis::PoolError; +use spacegate_ext_redis::{redis::RedisResult, Connection, RedisClient, RedisClientRepo}; + +fn legacy_new(url: &str) -> RedisResult { + RedisClient::new(url) +} + +fn legacy_get_conn(client: &RedisClient) -> impl Future + '_ { + client.get_conn() +} + +fn fallible_new(url: &str) -> Result { + RedisClient::try_new(url) +} + +fn fallible_get_conn(client: &RedisClient) -> impl Future> + '_ { + client.try_get_conn() +} + +#[test] +fn legacy_api_still_compiles() { + let client = legacy_new("redis://127.0.0.1:6379").expect("failed to build legacy redis client"); + let repo = RedisClientRepo::new(); + + repo.add("legacy-gateway", "redis://127.0.0.1:6379"); + let _client_from_str: RedisClient = "redis://127.0.0.1:6379".into(); + let _legacy_conn = legacy_get_conn(&client); +} + +#[test] +fn fallible_api_still_compiles() { + let client = fallible_new("redis://127.0.0.1:6379").expect("failed to build fallible redis client"); + let _fallible_conn = fallible_get_conn(&client); +} diff --git a/crates/kernel/src/backend_service/static_file_service.rs b/crates/kernel/src/backend_service/static_file_service.rs index c970086a..9fb2e1c7 100644 --- a/crates/kernel/src/backend_service/static_file_service.rs +++ b/crates/kernel/src/backend_service/static_file_service.rs @@ -1,4 +1,4 @@ -use std::{fs::Metadata, os::unix::fs::MetadataExt, path::Path}; +use std::{fs::Metadata, path::Path}; use chrono::{DateTime, Utc}; use hyper::{ @@ -34,7 +34,9 @@ fn predict(headers: &HeaderMap, last_modified: Option>) -> Option< // temporary implementation pub fn cache_policy(metadata: &Metadata) -> bool { - let size = metadata.size(); + // `Metadata::len` is cross-platform (Unix, Windows, WASI) — prefer it + // over the Unix-only `MetadataExt::size` to keep the kernel portable. + let size = metadata.len(); // cache file less than 1MB size < (1 << 20) } diff --git a/crates/kernel/src/helper_layers/map_request/add_extension.rs b/crates/kernel/src/helper_layers/map_request/add_extension.rs index 1bdcc9ab..5e44628f 100644 --- a/crates/kernel/src/helper_layers/map_request/add_extension.rs +++ b/crates/kernel/src/helper_layers/map_request/add_extension.rs @@ -3,7 +3,7 @@ use std::any::Any; use crate::{extension::Reflect, SgBody}; use hyper::Request; -pub fn add_extension(extension: E, reflect: bool) -> (impl (Fn(Request) -> Request) + Clone) { +pub fn add_extension(extension: E, reflect: bool) -> impl (Fn(Request) -> Request) + Clone { move |mut req: Request| { req.extensions_mut().insert(extension.clone()); if reflect { diff --git a/crates/kernel/src/helper_layers/reload.rs b/crates/kernel/src/helper_layers/reload.rs index 0dabc350..e5b5fd60 100644 --- a/crates/kernel/src/helper_layers/reload.rs +++ b/crates/kernel/src/helper_layers/reload.rs @@ -54,7 +54,10 @@ impl Reloader { } pub fn reload(&self, service: S) { if let Some(wg) = self.service.get() { - let mut wg = wg.write().expect("should never be poisoned"); + let mut wg = wg.write().unwrap_or_else(|e| { + tracing::warn!("reloader write lock poisoned; recovering inner state"); + e.into_inner() + }); *wg = service; } else { tracing::warn!("reloader not initialized"); @@ -79,7 +82,10 @@ where fn call(&self, req: Request) -> Self::Future { let service = self.service.clone(); - let rg = service.read().expect("should never be posisoned"); + let rg = service.read().unwrap_or_else(|e| { + tracing::warn!("reloader read lock poisoned; recovering inner state"); + e.into_inner() + }); let fut = rg.call(req); drop(rg); fut diff --git a/crates/kernel/src/lib.rs b/crates/kernel/src/lib.rs index 7045cdff..224adf01 100644 --- a/crates/kernel/src/lib.rs +++ b/crates/kernel/src/lib.rs @@ -68,8 +68,8 @@ pub trait SgRequestExt { fn defer_call(&mut self, f: F) where F: FnOnce(SgRequest) -> SgRequest + Send + 'static; - fn path_iter(&self) -> PathIter; - fn query_kv_iter(&self) -> Option; + fn path_iter(&self) -> PathIter<'_>; + fn query_kv_iter(&self) -> Option>; } impl SgRequestExt for SgRequest { @@ -126,11 +126,11 @@ impl SgRequestExt for SgRequest { defer.push_back(f); } - fn path_iter(&self) -> PathIter { + fn path_iter(&self) -> PathIter<'_> { PathIter::new(self.uri().path()) } - fn query_kv_iter(&self) -> Option { + fn query_kv_iter(&self) -> Option> { self.uri().query().map(QueryKvIter::new) } } diff --git a/crates/kernel/src/service/http_route/match_hostname.rs b/crates/kernel/src/service/http_route/match_hostname.rs index 7cb6c415..7bbf33c7 100644 --- a/crates/kernel/src/service/http_route/match_hostname.rs +++ b/crates/kernel/src/service/http_route/match_hostname.rs @@ -52,7 +52,7 @@ impl HostnameTree { pub fn new() -> Self { Self::default() } - pub fn iter(&self) -> HostnameTreeIter { + pub fn iter(&self) -> HostnameTreeIter<'_, T> { HostnameTreeIter { ipv4: self.ipv4.values(), ipv6: self.ipv6.values(), @@ -60,7 +60,7 @@ impl HostnameTree { fallback: self.fallback.iter(), } } - pub fn iter_mut(&mut self) -> HostnameTreeIterMut { + pub fn iter_mut(&mut self) -> HostnameTreeIterMut<'_, T> { HostnameTreeIterMut { ipv4: self.ipv4.values_mut(), ipv6: self.ipv6.values_mut(), @@ -305,15 +305,31 @@ impl HostnameMatcherNode { self.data.as_ref() } } - pub fn get_mut_by_iter<'a, 'b, I>(&'b mut self, host: I) -> Option<&'b mut T> + pub fn get_mut_by_iter<'a, 'b, I>(&'b mut self, mut host: I) -> Option<&'b mut T> where I: Iterator + Clone, { - // it's safe to do so because we don't have any other reference to self - self.get_by_iter(host).map(|r| unsafe { - let r = r as *const T as *mut T; - r.as_mut().expect("fail to convert ptr") - }) + // Mirrors `get_by_iter` but operates on `&mut self`, so no unsafe pointer + // casts are needed. We first probe with immutable lookups to see which + // branch can match, then take a mutable borrow only for that branch, + // because NLL still can't see through the conditional borrow. + if let Some(segment) = host.next() { + let has_child_match = self.children.get(segment).map(|node| node.get_by_iter(host.clone()).is_some()).unwrap_or(false); + if has_child_match { + return self.children.get_mut(segment).and_then(|node| node.get_mut_by_iter(host)); + } + let else_node = self.else_node.as_mut()?; + // Try to descend further into else_node; if not found, fall back to + // its own data. Use a two-step probe to keep the borrow checker happy. + let descend_has = else_node.get_by_iter(host.clone()).is_some(); + if descend_has { + else_node.get_mut_by_iter(host) + } else { + else_node.data.as_mut() + } + } else { + self.data.as_mut() + } } pub fn get(&self, host: &str) -> Option<&T> { let host = host.to_ascii_lowercase(); diff --git a/crates/model/Cargo.toml b/crates/model/Cargo.toml index 69641017..dc201dd8 100644 --- a/crates/model/Cargo.toml +++ b/crates/model/Cargo.toml @@ -22,8 +22,8 @@ typegen = ["ts-rs"] [dependencies] serde.workspace = true serde_json.workspace = true -spacegate-ext-redis = { workspace = true, optional = true } -spacegate-ext-axum = { workspace = true, optional = true } +spacegate-ext-redis = { workspace = true, optional = true } +spacegate-ext-axum = { workspace = true, optional = true } k8s-openapi = { workspace = true, optional = true } k8s-gateway-api = { workspace = true, optional = true } kube = { workspace = true, optional = true } @@ -31,7 +31,8 @@ schemars = { workspace = true, optional = true } chrono = { workspace = true, optional = true } ts-rs = { version = "8", optional = true, features = ["serde-json-impl"] } typeshare = "1.0.3" +tracing.workspace = true [dev-dependencies] -toml = { workspace = true } \ No newline at end of file +toml = { workspace = true } diff --git a/crates/model/src/plugin.rs b/crates/model/src/plugin.rs index f0070855..884fe3fb 100644 --- a/crates/model/src/plugin.rs +++ b/crates/model/src/plugin.rs @@ -240,7 +240,7 @@ impl<'de> Deserialize<'de> for PluginInstanceMap { .filter_map(|(k, v)| match PluginInstanceId::parse_by_code(v.id.code.clone(), &k) { Ok(id) => Some((id, v.spec)), Err(e) => { - eprintln!("failed to parse plugin instance id: {}", e); + tracing::warn!("failed to parse plugin instance id: {}", e); None } }) diff --git a/crates/plugin/Cargo.toml b/crates/plugin/Cargo.toml index b8f869dd..cc122d2b 100644 --- a/crates/plugin/Cargo.toml +++ b/crates/plugin/Cargo.toml @@ -34,7 +34,6 @@ rewrite = [] set-version = [] set-scheme = [] maintenance = ["ipnet", "chrono/serde"] -# decompression = ["tower-http/decompression-full"] status = ["hyper-util"] east-west-traffic-white-list = ["ipnet"] full = [ @@ -48,7 +47,6 @@ full = [ "maintenance", "set-version", "set-scheme", - # "decompression", "status", "east-west-traffic-white-list", ] diff --git a/crates/plugin/src/ext/redis/plugins/redis_count.rs b/crates/plugin/src/ext/redis/plugins/redis_count.rs index fdf7554c..def9d993 100644 --- a/crates/plugin/src/ext/redis/plugins/redis_count.rs +++ b/crates/plugin/src/ext/redis/plugins/redis_count.rs @@ -71,13 +71,20 @@ impl Plugin for RedisCountPlugin { let Some(key) = redis_format_key(&req, matched, &self.header) else { return Ok(PluginError::status::(format!("missing header {}", self.header.as_str())).into()); }; - let pass: bool = redis_call(client.get_conn().await, format!("{}:{}", self.prefix, key)).await?; + let pass: bool = redis_call(client.try_get_conn().await?, format!("{}:{}", self.prefix, key)).await?; if !pass { return Ok(PluginError::status::("request cumulative count reached the limit").into()); } let resp = inner.call(req).await; if resp.status().is_server_error() || resp.status().is_client_error() { - if let Err(e) = redis_call_on_resp(client.get_conn().await, format!("{}:{}", self.prefix, key)).await { + let conn = match client.try_get_conn().await { + Ok(c) => c, + Err(e) => { + tracing::error!("failed to acquire redis connection: {e}"); + return Ok(resp); + } + }; + if let Err(e) = redis_call_on_resp(conn, format!("{}:{}", self.prefix, key)).await { tracing::error!("redis execution error: {e}") } } @@ -124,9 +131,9 @@ mod test { spacegate_model::PluginInstanceName::named("test"), ) .expect("invalid config"); - global_repo().add(GW_NAME, url.as_str()); + global_repo().add(GW_NAME, spacegate_ext_redis::RedisClient::try_new(url.as_str()).expect("failed to create redis client")); let client = global_repo().get(GW_NAME).expect("missing client"); - let mut conn = client.get_conn().await; + let mut conn = client.try_get_conn().await.expect("failed to acquire redis connection"); let _: () = conn.set(format!("sg:plugin:redis-count:test:*:op-res:{AK}"), 3).await.expect("fail to set"); let inner = Inner::new(get_echo_service()); let _backend_service = get_echo_service(); diff --git a/crates/plugin/src/ext/redis/plugins/redis_dynamic_route.rs b/crates/plugin/src/ext/redis/plugins/redis_dynamic_route.rs index 529d4299..4169f49c 100644 --- a/crates/plugin/src/ext/redis/plugins/redis_dynamic_route.rs +++ b/crates/plugin/src/ext/redis/plugins/redis_dynamic_route.rs @@ -49,7 +49,7 @@ impl Plugin for RedisDynamicRoutePlugin { return Err(format!("missing header {}", header.as_str()).into()); }; let route_key = format!("{}:{}", prefix, key); - let mut conn = client.get_conn().await; + let mut conn = client.try_get_conn().await?; let domain: String = conn.get(route_key).await.map_err(PluginError::internal_error::)?; let mut uri_parts = req.uri().clone().into_parts(); let path = req.uri().path(); diff --git a/crates/plugin/src/ext/redis/plugins/redis_limit.rs b/crates/plugin/src/ext/redis/plugins/redis_limit.rs index 0ba37811..3a563a1d 100644 --- a/crates/plugin/src/ext/redis/plugins/redis_limit.rs +++ b/crates/plugin/src/ext/redis/plugins/redis_limit.rs @@ -49,7 +49,7 @@ impl Plugin for RedisLimitPlugin { let Some(client) = global_repo().get(gateway_name) else { return Err("missing redis client".into()); }; - let mut conn = client.get_conn().await; + let mut conn = client.try_get_conn().await?; let Some(matched) = req.extensions().get::() else { return Err("missing matched router".into()); }; @@ -104,9 +104,9 @@ mod test { spacegate_model::PluginInstanceName::named("test"), ) .expect("invalid config"); - global_repo().add(GW_NAME, url.as_str()); + global_repo().add(GW_NAME, spacegate_ext_redis::RedisClient::try_new(url.as_str()).expect("failed to create redis client")); let client = global_repo().get(GW_NAME).expect("missing client"); - let mut conn = client.get_conn().await; + let mut conn = client.try_get_conn().await.expect("failed to acquire redis connection"); let _: () = conn.set(format!("sg:plugin:redis-limit:test:*:op-res:{AK}"), 3).await.expect("fail to set"); let inner = Inner::new(get_echo_service()); { diff --git a/crates/plugin/src/ext/redis/plugins/redis_time_range.rs b/crates/plugin/src/ext/redis/plugins/redis_time_range.rs index 08d1c477..b642ffee 100644 --- a/crates/plugin/src/ext/redis/plugins/redis_time_range.rs +++ b/crates/plugin/src/ext/redis/plugins/redis_time_range.rs @@ -73,7 +73,7 @@ impl Plugin for RedisTimeRangePlugin { let Some(key) = redis_format_key(&req, matched, &self.header) else { return Ok(PluginError::status::(format!("missing header {}", self.header.as_str())).into()); }; - let pass: bool = redis_call(client.get_conn().await, format!("{}:{}", self.prefix, key)).await?; + let pass: bool = redis_call(client.try_get_conn().await?, format!("{}:{}", self.prefix, key)).await?; if !pass { return Ok(PluginError::status::("request cumulative count reached the limit").into()); } @@ -121,9 +121,9 @@ mod test { spacegate_model::PluginInstanceName::named("test"), ) .expect("invalid config"); - global_repo().add(GW_NAME, url.as_str()); + global_repo().add(GW_NAME, spacegate_ext_redis::RedisClient::try_new(url.as_str()).expect("failed to create redis client")); let client = global_repo().get(GW_NAME).expect("missing client"); - let mut conn = client.get_conn().await; + let mut conn = client.try_get_conn().await.expect("failed to acquire redis connection"); let current_year = chrono::Utc::now().year(); let next_year = current_year + 1; let next_next_year = next_year + 1; diff --git a/crates/plugin/src/plugins.rs b/crates/plugin/src/plugins.rs index d4967a83..20130ea2 100644 --- a/crates/plugin/src/plugins.rs +++ b/crates/plugin/src/plugins.rs @@ -1,5 +1,3 @@ -// #[cfg(feature = "decompression")] -// pub mod decompression; #[cfg(feature = "header-modifier")] pub mod header_modifier; #[cfg(feature = "inject")] @@ -10,12 +8,8 @@ pub mod limit; pub mod maintenance; #[cfg(feature = "redirect")] pub mod redirect; -// #[cfg(feature = "retry")] -// pub mod retry; #[cfg(feature = "rewrite")] pub mod rewrite; -// #[cfg(feature = "status")] -// pub mod status; #[cfg(feature = "east-west-traffic-white-list")] pub mod east_west_traffic_white_list; diff --git a/crates/plugin/src/plugins/decompression.rs b/crates/plugin/src/plugins/decompression.rs deleted file mode 100644 index a8c838ed..00000000 --- a/crates/plugin/src/plugins/decompression.rs +++ /dev/null @@ -1,108 +0,0 @@ -//! This layer is used to make response's encoding compatible with the request's accept encoding. -//! -//! see also: -//! - https://developer.mozilla.org/zh-CN/docs/Web/HTTP/Headers/Accept-Encoding -//! - https://developer.mozilla.org/zh-CN/docs/Web/HTTP/Headers/Content-Encoding -//! -//! - -use std::convert::Infallible; - -use hyper::{Request, Response}; -use serde::{Deserialize, Serialize}; -use spacegate_kernel::BoxError; -use spacegate_kernel::{SgBody, SgBoxService}; -use tower_http::decompression::Decompression as TowerDecompression; -use tower_layer::Layer; -use tower_service::Service; - -use crate::{def_plugin, MakeSgLayer}; - -#[derive(Debug, Clone, Serialize, Deserialize, Default)] -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] -#[serde(default)] -pub struct DecompressionConfig {} - -#[derive(Debug, Clone)] -pub struct DecompressionLayer; - -impl DecompressionLayer {} - -impl Layer for DecompressionLayer { - type Service = Decompression; - - fn layer(&self, inner: S) -> Self::Service { - Decompression::new(inner) - } -} - -#[derive(Debug, Clone)] -pub struct Decompression { - inner: TowerDecompression, -} - -impl Decompression { - pub fn new(inner: S) -> Self { - let inner = TowerDecompression::new(inner); - Self { inner } - } -} - -impl Service> for Decompression -where - S: Service, Response = Response, Error = Infallible>, - >>::Future: Send + 'static, -{ - type Response = Response; - type Error = Infallible; - type Future = >>::Future; - - fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { - self.inner.poll_ready(cx) - } - - fn call(&mut self, req: Request) -> Self::Future { - let fut = self.inner.call(req); - Box::pin(async move { - let response = fut.await.expect("infallible"); - Ok(response.map(SgBody::new_boxed_error)) - }) - } -} - -impl MakeSgLayer for DecompressionConfig { - fn make_layer(&self) -> Result { - let layer = DecompressionLayer {}; - Ok(spacegate_kernel::SgBoxLayer::new(layer)) - } -} - -def_plugin!("decompression", DecompressionPlugin, DecompressionConfig); - -#[cfg(test)] -mod test { - use super::*; - use hyper::header::{self, CONTENT_ENCODING}; - use tardis::tokio::{self, io::AsyncWriteExt}; - use tower::{service_fn, ServiceExt}; - pub async fn compress(req: Request) -> Result, Infallible> { - let body_data = req.into_body().dump().await.expect("dump body").get_dumped().expect("get dumped").clone(); - let mut encoder = async_compression::tokio::write::GzipEncoder::new(Vec::new()); - encoder.write_all(body_data.as_ref()).await.expect("fail to write"); - encoder.shutdown().await.expect("fail to write"); - let x = encoder.into_inner(); - let resp = Response::builder().header(CONTENT_ENCODING, "gzip").body(SgBody::full(x)).expect("invalid response"); - Ok(resp) - } - - #[tokio::test] - async fn test_compress_decompress() { - let mut service = Decompression::new(SgBoxService::new(service_fn(compress))); - let message = "hello from spacegate"; - let req = Request::builder().header(header::ACCEPT_ENCODING, "gzip").body(SgBody::full(message)).expect("invalid req"); - let resp = service.ready().await.expect("fail to ready").call(req).await.expect("call service"); - let body = resp.into_body().dump().await.expect("dump body").get_dumped().expect("get dumped").clone(); - let s = std::str::from_utf8(body.as_ref()).expect("fail to parse"); - assert_eq!(s, message); - } -} diff --git a/crates/plugin/src/plugins/limit.rs b/crates/plugin/src/plugins/limit.rs index e5711011..48d33e15 100644 --- a/crates/plugin/src/plugins/limit.rs +++ b/crates/plugin/src/plugins/limit.rs @@ -107,7 +107,7 @@ impl Plugin for RateLimitPlugin { async fn call(&self, req: Request, inner: Inner) -> Result, BoxError> { let id = &self.id; let ip = req.extract::().to_canonical(); - let mut conn = req.get_redis_client_by_gateway_name().ok_or("missing gateway name")?.get_conn().await; + let mut conn = req.get_redis_client_by_gateway_name().ok_or("missing gateway name")?.try_get_conn().await?; const EXCEEDED: i32 = 0; let result: i32 = script() @@ -126,7 +126,7 @@ impl Plugin for RateLimitPlugin { if result == EXCEEDED { let mut response = Response::::with_code_message(StatusCode::TOO_MANY_REQUESTS, "[SG.Filter.Limit] too many requests"); - response.extensions_mut().insert(self.report( ip)); + response.extensions_mut().insert(self.report(ip)); return Ok(response); } Ok(inner.call(req).await) diff --git a/crates/plugin/src/plugins/retry.rs b/crates/plugin/src/plugins/retry.rs deleted file mode 100644 index e566cfd4..00000000 --- a/crates/plugin/src/plugins/retry.rs +++ /dev/null @@ -1,281 +0,0 @@ -use std::{convert::Infallible, future::Future, pin::Pin, sync::Arc, task::ready, time::Duration}; - -use http_body_util::BodyExt; -use hyper::{Request, Response, StatusCode}; -use pin_project_lite::pin_project; -use rand::Rng; -use serde::{Deserialize, Serialize}; -use tokio::time::Sleep; -use tower_layer::Layer; - -use spacegate_kernel::{SgBody, SgBoxLayer, SgResponseExt}; - -use crate::def_plugin; -#[derive(Debug, Clone)] -pub struct RetryLayer

{ - policy_default: P, -} - -impl

RetryLayer

-where - P: Policy, -{ - pub fn new(policy_default: P) -> Self { - Self { policy_default } - } -} - -impl Layer for RetryLayer

-where - P: Clone, -{ - type Service = Retry; - - fn layer(&self, service: S) -> Self::Service { - Retry { - policy: self.policy_default.clone(), - service, - } - } -} - -#[derive(Debug, Default, Serialize, Deserialize, Clone)] -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] -pub enum BackOff { - /// Fixed interval - Fixed, - /// In the exponential backoff strategy, the initial delay is relatively short, - /// but it gradually increases as the number of retries increases. - /// Typically, the delay time is calculated by multiplying a base value with an exponential factor. - /// For example, the delay time might be calculated as `base_value * (2 ^ retry_count)`. - #[default] - Exponential, - Random, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] -#[serde(default)] -pub struct SgPluginRetryConfig { - pub retries: u16, - pub retirable_methods: Vec, - /// Backoff strategies can vary depending on the specific implementation and requirements. - /// see [BackOff] - pub backoff: BackOff, - /// milliseconds - pub base_interval: u64, - /// milliseconds - pub max_interval: u64, -} - -impl Default for SgPluginRetryConfig { - fn default() -> Self { - Self { - retries: 3, - retirable_methods: vec!["*".to_string()], - backoff: BackOff::default(), - base_interval: 100, - //10 seconds - max_interval: 10000, - } - } -} - -#[derive(Clone)] -pub struct RetryPolicy { - times: usize, - config: Arc, -} -pin_project! { - pub struct Delay { - value: Option, - #[pin] - sleep: Sleep, - } -} - -impl Delay { - pub fn new(value: T, duration: Duration) -> Self { - Self { - value: Some(value), - sleep: tokio::time::sleep(duration), - } - } -} - -impl Future for Delay { - type Output = T; - - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { - let this = self.project(); - ready!(this.sleep.poll(cx)); - std::task::Poll::Ready(this.value.take().expect("poll after ready")) - } -} -pub trait Policy: Sized { - /// The [`Future`] type returned by [`Policy::retry`]. - type Future: Future; - - fn retry(&self, req: &Request, response: &Response) -> Option; -} - -impl Policy for RetryPolicy { - type Future = Delay; - - fn retry(&self, _req: &Request, response: &Response) -> Option { - if self.times < self.config.retries.into() && response.status() == StatusCode::INTERNAL_SERVER_ERROR { - let delay = match self.config.backoff { - BackOff::Fixed => self.config.base_interval, - BackOff::Exponential => self.config.base_interval * 2u64.pow(self.times as u32), - BackOff::Random => { - let mut rng = rand::rng(); - rng.gen_range(self.config.base_interval..self.config.max_interval) - } - }; - Some(Delay::new( - RetryPolicy { - times: self.times + 1, - config: self.config.clone(), - }, - Duration::from_millis(delay), - )) - } else { - None - } - } -} - -#[derive(Debug, Clone)] -pub struct Retry { - policy: P, - service: S, -} - -pin_project_lite::pin_project! { - pub struct RetryFuture - where - P: Policy, - S: hyper::service::Service, Response = Response, Error = Infallible>, - { - policy: P, - service: S, - #[pin] - state: RetryState, - request: Option> - } -} - -impl RetryFuture -where - P: Policy, - S: hyper::service::Service, Response = Response, Error = Infallible>, -{ - pub fn new(policy: P, service: S, req: Request) -> Self { - let (parts, body) = req.into_parts(); - let body = body.collect(); - Self { - policy, - service, - state: RetryState::Collecting { body, parts }, - request: None, - } - } -} - -pin_project_lite::pin_project! { - #[project = RetryStateProj] - pub enum RetryState { - Collecting { - #[pin] - body: http_body_util::combinators::Collect, - parts: hyper::http::request::Parts, - }, - Requesting { - #[pin] - future: SF, - }, - Retrying { - #[pin] - future: PF, - }, - - } -} - -impl Future for RetryFuture -where - P: Policy, - S: hyper::service::Service, Response = Response, Error = Infallible>, -{ - type Output = Result, Infallible>; - - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { - let mut this = self.project(); - loop { - match this.state.as_mut().project() { - RetryStateProj::Collecting { body, parts: part } => { - let body = ready!(body.poll(cx)); - match body { - Ok(body) => { - let req = Request::from_parts(part.clone(), SgBody::full(body.to_bytes())); - { - let req = req.clone(); - *this.request = Some(req); - } - let fut = this.service.call(req); - this.state.set(RetryState::Requesting { future: fut }); - } - Err(e) => { - return std::task::Poll::Ready(Ok(Response::with_code_message(StatusCode::BAD_REQUEST, e.to_string()))); - } - } - } - RetryStateProj::Requesting { future } => { - let resp = ready!(future.poll(cx)); - match resp { - Ok(resp) => { - if let Some(fut) = this.policy.retry(this.request.as_ref().expect("status conflict"), &resp) { - this.state.set(RetryState::Retrying { future: fut }); - } else { - return std::task::Poll::Ready(Ok(resp)); - } - } - Err(_e) => { - unreachable!() - } - } - } - RetryStateProj::Retrying { future } => { - let next_p = ready!(future.poll(cx)); - *this.policy = next_p; - // retry - let fut = this.service.call(this.request.as_ref().expect("status conflict").clone()); - this.state.set(RetryState::Requesting { future: fut }); - } - } - } - } -} - -impl hyper::service::Service> for Retry -where - P: Policy + Clone, - S: hyper::service::Service, Response = Response, Error = Infallible> + Clone, -{ - type Response = Response; - - type Error = Infallible; - - type Future = RetryFuture; - - fn call(&self, req: Request) -> Self::Future { - RetryFuture::new(self.policy.clone(), self.service.clone(), req) - } -} - -def_plugin!("retry", RetryPlugin, SgPluginRetryConfig; #[cfg(feature = "schema")] schema;); -#[cfg(feature = "schema")] -crate::schema! { - RetryPlugin, - SgPluginRetryConfig -} diff --git a/crates/plugin/src/plugins/status.rs b/crates/plugin/src/plugins/status.rs deleted file mode 100644 index 74046522..00000000 --- a/crates/plugin/src/plugins/status.rs +++ /dev/null @@ -1,207 +0,0 @@ -use std::sync::Arc; - -use hyper::{Request, Response}; -pub mod server; -pub mod sliding_window; -pub mod status_plugin; -use serde::{Deserialize, Serialize}; -use spacegate_kernel::BoxError; -use spacegate_kernel::{ - extension::BackendHost, - helper_layers::{self}, - layers::gateway::builder::SgGatewayLayerBuilder, - SgBody, SgBoxLayer, -}; -use tardis::chrono::TimeDelta; -use tardis::{ - chrono::{Duration, Utc}, - tokio::{self}, -}; - -use crate::{def_plugin, MakeSgLayer, PluginError}; - -use self::{ - sliding_window::SlidingWindowCounter, - status_plugin::{get_status, update_status}, -}; -#[derive(Debug, Clone, Serialize, Deserialize)] -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] -#[serde(default)] -pub struct SgFilterStatusConfig { - #[serde(alias = "serv_addr")] - pub host: String, - pub port: u16, - pub title: String, - /// Unhealthy threshold , if server error more than this, server will be tag as unhealthy - pub unhealthy_threshold: u16, - /// second - pub interval: u64, - pub status_cache_key: String, - pub window_cache_key: String, -} - -impl Default for SgFilterStatusConfig { - fn default() -> Self { - Self { - host: "0.0.0.0".to_string(), - port: 8110, - title: "System Status".to_string(), - unhealthy_threshold: 3, - interval: 5, - status_cache_key: "sg:plugin:status".to_string(), - window_cache_key: sliding_window::DEFAULT_CONF_WINDOW_KEY.to_string(), - } - } -} - -#[cfg(not(feature = "cache"))] -#[derive(Debug, Clone)] -pub struct DefaultPolicy { - counter: Arc>, - unhealthy_threshold: u16, -} - -#[cfg(not(feature = "cache"))] -impl spacegate_kernel::helper_layers::stat::Policy for DefaultPolicy { - fn on_request(&self, _req: &Request) { - // do nothing - } - - fn on_response(&self, resp: &Response) { - if let Some(backend_host) = resp.extensions().get::() { - let backend_host = backend_host.0.clone(); - let unhealthy_threshold = self.unhealthy_threshold; - let counter = self.counter.clone(); - if resp.status().is_server_error() { - let now = Utc::now(); - tardis::tokio::spawn(async move { - let mut counter = counter.write().await; - let count = counter.add_and_count(now); - if count >= unhealthy_threshold as u64 { - update_status(&backend_host, status_plugin::Status::Major).await?; - } else { - update_status(&backend_host, status_plugin::Status::Minor).await?; - } - Result::<_, BoxError>::Ok(()) - }); - } else { - tardis::tokio::spawn(async move { - if let Some(status) = get_status(&backend_host).await? { - if status != status_plugin::Status::Good { - update_status(&backend_host, status_plugin::Status::Good).await?; - } - } - Result::<_, BoxError>::Ok(()) - }); - } - } - } -} - -#[derive(Debug, Clone)] -#[cfg(feature = "cache")] -pub struct CachePolicy { - unhealthy_threshold: u16, - pub interval: TimeDelta, - status_cache_key: Arc, - window_cache_key: Arc, - gateway_name: Arc, -} - -impl CachePolicy { - pub fn get_cache_key(&self, gateway_name: &str) -> String { - format!("{}:{}", self.status_cache_key, gateway_name) - } -} - -#[cfg(feature = "cache")] -impl spacegate_kernel::helper_layers::stat::Policy for CachePolicy { - fn on_request(&self, _req: &Request) { - // do nothing - } - - fn on_response(&self, resp: &Response) { - if let Some(backend_host) = resp.extensions().get::() { - let backend_host = backend_host.0.clone(); - let unhealthy_threshold = self.unhealthy_threshold; - let cache_key = Arc::::from(self.get_cache_key(&self.gateway_name)); - let gateway_name = self.gateway_name.clone(); - let interval = self.interval; - let cache_window_key = self.window_cache_key.clone(); - if resp.status().is_server_error() { - let now = Utc::now(); - - tardis::tokio::spawn(async move { - let client = spacegate_ext_redis::RedisClientRepo::global() - .get(&gateway_name) - .ok_or_else(|| spacegate_ext_redis::RedisClientRepoError::new(gateway_name.as_ref(), "not found"))?; - let count = SlidingWindowCounter::new(interval, &cache_window_key).add_and_count(now, &client).await?; - let status = if count >= unhealthy_threshold as u64 { - status_plugin::Status::Major - } else { - status_plugin::Status::Minor - }; - update_status(&backend_host, &cache_key, client, status).await?; - Result::<_, BoxError>::Ok(()) - }); - } else { - tardis::tokio::spawn(async move { - let client = spacegate_ext_redis::RedisClientRepo::global() - .get(&gateway_name) - .ok_or_else(|| spacegate_ext_redis::RedisClientRepoError::new(gateway_name.as_ref(), "not found"))?; - if let Some(status) = get_status(&backend_host, &cache_key, &client).await? { - if status != status_plugin::Status::Good { - update_status(&backend_host, &cache_key, client, status_plugin::Status::Good).await?; - } - } - Result::<_, BoxError>::Ok(()) - }); - } - } - } -} - -impl MakeSgLayer for SgFilterStatusConfig { - fn make_layer(&self) -> Result { - Err(BoxError::from("status plugin is only supported on gateway layer")) - } - fn install_on_gateway(&self, gateway: &mut SgGatewayLayerBuilder) -> Result<(), BoxError> { - let gateway_name = gateway.gateway_name.clone(); - let cancel_guard = gateway.cancel_token.clone(); - let config = self.clone(); - tokio::spawn(async move { - if let Err(e) = server::launch_status_server(&config, gateway_name, cancel_guard).await { - tracing::error!("[SG.Filter.Status] launch status server error: {e}"); - } - }); - - let gateway_name = gateway.gateway_name.clone(); - #[cfg(feature = "cache")] - let layer = { - let policy = CachePolicy { - unhealthy_threshold: self.unhealthy_threshold, - interval: Duration::try_seconds(self.interval as i64).ok_or_else(|| format!("[SG.Filter.Status] invalid interval config[{}]", self.interval))?, - status_cache_key: self.status_cache_key.clone().into(), - window_cache_key: self.window_cache_key.clone().into(), - gateway_name, - }; - SgBoxLayer::new(helper_layers::stat::StatLayer::new(policy)) - }; - #[cfg(not(feature = "cache"))] - let layer = { - let counter = Arc::new(RwLock::new(SlidingWindowCounter::new(Duration::seconds(self.interval as i64), 60))); - let policy = DefaultPolicy { - counter, - unhealthy_threshold: self.unhealthy_threshold, - }; - SgBoxLayer::new(helper_layers::stat::StatLayer::new(policy)) - }; - gateway.http_plugins.push(layer); - Ok(()) - } -} - -def_plugin!("status", StatusPlugin, SgFilterStatusConfig; #[cfg(feature = "schema")] schema;); - -#[cfg(feature = "schema")] -crate::schema!(StatusPlugin, SgFilterStatusConfig); diff --git a/crates/plugin/src/plugins/status/server.rs b/crates/plugin/src/plugins/status/server.rs deleted file mode 100644 index 2ceb0290..00000000 --- a/crates/plugin/src/plugins/status/server.rs +++ /dev/null @@ -1,69 +0,0 @@ -use std::{net::IpAddr, str::FromStr, sync::Arc, time::Duration}; - -use super::{status_plugin, SgFilterStatusConfig}; -use hyper::{body::Incoming, service::service_fn, Request}; -use hyper_util::rt::{TokioExecutor, TokioIo}; -use spacegate_kernel::BoxError; -use tardis::tokio::{self}; -use tokio_util::sync::CancellationToken; -use tracing::instrument; -#[instrument(skip(cancel_signal, config))] -pub async fn launch_status_server(config: &SgFilterStatusConfig, gateway_name: Arc, cancel_signal: CancellationToken) -> Result<(), BoxError> { - let host = IpAddr::from_str(&config.host)?; - let port = config.port; - // just wait 500ms for prev server to shutdown - let bind_fail_instant = tokio::time::Instant::now() + Duration::from_millis(500); - let listener = loop { - match tokio::net::TcpListener::bind((host, port)).await { - Ok(listener) => break listener, - Err(e) => { - if std::io::ErrorKind::AddrInUse == e.kind() && bind_fail_instant.elapsed().is_zero() { - tokio::task::yield_now().await; - continue; - } else { - tracing::warn!("[Sg.Plugin.Status] fail to bind {host}:{port}, error: {e}"); - return Err(Box::new(e)); - } - } - } - }; - let cache_key = Arc::::from(config.status_cache_key.clone().as_str()); - let title = Arc::::from(config.title.clone().as_str()); - 'accept_loop: loop { - let (stream, _peer) = tokio::select! { - _ = cancel_signal.cancelled() => { - tracing::info!("[Sg.Plugin.Status] cancelled by cancel signal"); - break 'accept_loop; - } - _ = tokio::signal::ctrl_c() => { - tracing::info!("[Sg.Plugin.Status] cancelled by ctrl+c signal"); - break 'accept_loop; - } - accept = listener.accept() => { - match accept { - Ok(incoming) => incoming, - Err(e) => { - tracing::error!("[Sg.Plugin.Status] Status server accept error: {:?}", e); - continue 'accept_loop; - } - } - } - }; - let gateway_name = gateway_name.clone(); - let cache_key = cache_key.clone(); - let title = title.clone(); - tokio::spawn(async move { - let connector = hyper_util::server::conn::auto::Builder::new(TokioExecutor::default()); - let connection = connector.serve_connection( - TokioIo::new(stream), - service_fn(move |req: Request| Box::pin(status_plugin::create_status_html(req, gateway_name.clone(), cache_key.clone(), title.clone()))), - ); - let result = connection.await; - if let Err(e) = result { - tracing::error!("[Sg.Plugin.Status] Status server connection error: {:?}", e); - } - }); - } - status_plugin::clean_status(&cache_key, &gateway_name).await?; - Ok(()) -} diff --git a/crates/plugin/src/plugins/status/sliding_window.rs b/crates/plugin/src/plugins/status/sliding_window.rs deleted file mode 100644 index 851f1149..00000000 --- a/crates/plugin/src/plugins/status/sliding_window.rs +++ /dev/null @@ -1,368 +0,0 @@ -use tardis::basic::{error::TardisError, result::TardisResult}; -#[cfg(feature = "cache")] -use tardis::cache::Script; -use tardis::chrono::{DateTime, Duration, Utc}; -#[cfg(feature = "cache")] -use tardis::tardis_static; - -pub(super) const DEFAULT_CONF_WINDOW_KEY: &str = "sg:plugin:filter:window:key"; - -#[cfg(feature = "cache")] -tardis_static! { - /// Sliding window script - /// - /// # Arguments - /// - /// * KEYS[1] window key - /// * ARGV[1] window size - /// * ARGV[2] current timestamp - /// * ARGV[3] current sub_second microsecond - /// - /// # Return - /// - /// * count - /// - /// # Kernel logic - /// - /// -- Extract the key from the KEYS array, which represents the Redis key used for the sorted set. - /// local key = KEYS[1] - /// - /// -- Convert the window size from the ARGV array into a numeric value. - /// local window_size = tonumber(ARGV[1]) - /// - /// -- Convert the current time timestamp, including seconds and microseconds, from ARGV into numeric values. - /// local current_time_timestamp = tonumber(ARGV[2]) - /// local current_time_subsec_micros = tonumber(ARGV[3]) - /// - /// -- Calculate the member_score, which combines timestamp and microseconds. - /// local member_score = ((current_time_timestamp % 10000000) * 1000000) + current_time_subsec_micros - /// - /// -- Calculate the timestamp when the current window should expire, in milliseconds. - /// local window_expire_at = (current_time_timestamp * 1000) + window_size - /// - /// -- Remove elements from the sorted set that are older than (now - window) based on member_score. - /// redis.call('ZREMRANGEBYSCORE', key, 0, (member_score - (window_size * 1000))) - /// - /// -- Get the number of requests in the current window by counting the elements in the sorted set. - /// local current_requests_count = redis.call('ZCARD', key) - /// - /// -- Add the current request's member_score to the sorted set. - /// redis.call('ZADD', key, member_score, member_score) - /// - /// -- Set the expiration time for the key, specifying when the window should expire. - /// redis.call('PEXPIRE', key, window_expire_at) - /// - /// -- Return the count of requests in the current window. - /// return current_requests_count - pub script: Script = Script::new( - r" - local key = KEYS[1] - - local window_size = tonumber(ARGV[1]) - local current_time_timestamp = tonumber(ARGV[2]) - local current_time_subsec_micros = tonumber(ARGV[3]) - - local member_score = ((current_time_timestamp % 10000000) * 1000000) + current_time_subsec_micros - local window_expire_at = (current_time_timestamp * 1000) + window_size - - redis.call('ZREMRANGEBYSCORE', key, 0, (member_score - (window_size * 1000))) - - local current_requests_count = redis.call('ZCARD', key) - - redis.call('ZADD', key, member_score, member_score) - - redis.call('PEXPIRE', key, window_expire_at) - - return current_requests_count - ", - ); -} - -/// # SlidingWindowCounter: -/// -/// This is a sliding window counter that provides two implementations. When -/// the 'cache features' option is enabled, it uses Redis for storage. -/// When using the cache implementation, it can support the 'status' plugin to -/// run in a distributed manner. -/// The other implementation is memory-based, and it does not support distributed -/// operation via the 'status' plugin. -/// -/// ## Performance: -/// - Redis Implementation: Adds and counts operations take close to a milliseconds. -/// - Memory Implementation: Adds and counts operations take nanoseconds. -/// -/// -/// ## Note: -/// - The Redis-based implementation is suitable for distributed systems -/// and offers higher-level performance with millisecond-level accuracy. -/// - The Memory-based implementation is more efficient in terms of performance -/// but lacks distributed support and offers nanosecond-level accuracy. -#[derive(Debug, Clone)] -pub struct SlidingWindowCounter { - window_size: Duration, - #[cfg(not(feature = "cache"))] - data: Vec, - /// slot_num equal to data.len() - #[cfg(not(feature = "cache"))] - slot_num: usize, - /// milliseconds - #[cfg(not(feature = "cache"))] - interval: i64, - /// range: 0--(slot_num-1) - #[cfg(not(feature = "cache"))] - start_slot: usize, - #[cfg(feature = "cache")] - window_key: String, -} - -impl SlidingWindowCounter { - #[cfg(feature = "cache")] - pub fn new(window_size: Duration, window_key: &str) -> Self { - SlidingWindowCounter { - window_size, - window_key: window_key.to_string(), - } - } - - #[cfg(not(feature = "cache"))] - pub fn new(window_size: Duration, _slot_num: usize) -> Self { - let interval = window_size.num_milliseconds() / _slot_num as i64; - let mut result = SlidingWindowCounter { - window_size, - data: vec![Slot::default(); _slot_num], - slot_num: _slot_num, - interval, - start_slot: 0, - }; - result.init(Utc::now()); - result - } - - #[cfg(not(feature = "cache"))] - /// Initialize the sliding window ,set start_slot 0 - pub fn init(&mut self, now: DateTime) { - let mut start_slot_time = now; - for i in 0..self.slot_num { - self.data[i] = Slot::new(start_slot_time); - start_slot_time += Duration::milliseconds(self.interval); - } - self.start_slot = 0; - } - - #[cfg(not(feature = "cache"))] - // move_index range: 1--slot_num - fn init_part(&mut self, move_index: i64) -> TardisResult<()> { - if self.slot_num < move_index as usize { - return Err(TardisError::bad_request("move index out of range", "")); - } - - let last_slot_index = (self.start_slot + self.slot_num - 1) % self.slot_num; - let mut move_slot_time = self.data[last_slot_index].time; - move_slot_time += Duration::milliseconds(self.interval); - - for i in 0..move_index as usize { - let index = (i + self.start_slot) % self.slot_num; - self.data[index] = Slot::new(move_slot_time); - move_slot_time += Duration::milliseconds(self.interval); - } - - self.start_slot = (move_index as usize + self.start_slot) % self.slot_num; - Ok(()) - } - - #[cfg(not(feature = "cache"))] - pub fn add_one(&mut self, now: DateTime) { - let start_slot = &self.data[self.start_slot]; - let mut start_slot_time = start_slot.time; - if start_slot_time + self.window_size <= now { - if start_slot_time + self.window_size * 2 <= now { - self.init(now); - } else { - let move_index = (now - start_slot_time - self.window_size).num_milliseconds() / self.interval + 1; - self.init_part(move_index).expect("init part failed"); - } - start_slot_time = self.data[self.start_slot].time; - } - // found a slot by now - let slot_index = (now - start_slot_time).num_milliseconds() / self.interval; - let add_slot_index = (slot_index as usize + self.start_slot) % self.slot_num; - self.data[add_slot_index].count += 1; - } - - #[cfg(not(feature = "cache"))] - pub fn count_in_window(&self, now: DateTime) -> u64 { - self.data.iter().map(|slot| if (now - self.window_size) <= slot.time { slot.count } else { 0 }).sum::() - } - - #[cfg(feature = "cache")] - pub async fn add_and_count(&self, now: DateTime, client: &spacegate_ext_redis::RedisClient) -> TardisResult { - let result: u64 = script() - .key((if self.window_key.is_empty() { DEFAULT_CONF_WINDOW_KEY } else { &self.window_key }).to_string()) - .arg(self.window_size.num_milliseconds()) - .arg(now.timestamp()) - .arg(now.timestamp_subsec_micros()) - .invoke_async(&mut client.get_conn().await) - .await - .map_err(|e| TardisError::internal_error(&format!("[SG.Filter.Status] redis error : {e}"), ""))?; - Ok(result) - } - - #[cfg(not(feature = "cache"))] - pub fn add_and_count(&mut self, now: DateTime) -> u64 { - let result = self.count_in_window(now); - self.add_one(now); - result - } - - #[cfg(not(feature = "cache"))] - #[allow(dead_code)] - fn get_data(&self) -> &[Slot] { - &self.data - } -} - -#[cfg(not(feature = "cache"))] -#[derive(Default, Clone, Debug)] -struct Slot { - time: DateTime, - count: u64, -} - -#[cfg(not(feature = "cache"))] -impl Slot { - fn new(start_time: DateTime) -> Self { - Slot { time: start_time, count: 0 } - } -} - -#[cfg(test)] -mod tests { - use super::*; - #[cfg(feature = "cache")] - use tardis::test::test_container::TardisTestContainer; - #[cfg(feature = "cache")] - use tardis::{testcontainers, tokio}; - - #[test] - #[cfg(not(feature = "cache"))] - fn test() { - let mut test = SlidingWindowCounter::new(Duration::seconds(60), 12); - test.init(DateTime::parse_from_rfc3339("2000-01-01T01:00:00.000Z").unwrap().into()); - - assert_eq!(test.get_data().len(), 12); - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:00:01.100Z").unwrap().into()); - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:00:01.200Z").unwrap().into()); - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:00:01.300Z").unwrap().into()); - assert_eq!(test.get_data()[0].count, 3); - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:00:59.100Z").unwrap().into()); - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:00:59.200Z").unwrap().into()); - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:00:59.300Z").unwrap().into()); - assert_eq!(test.get_data()[11].count, 3); - - assert_eq!(test.count_in_window(DateTime::parse_from_rfc3339("2000-01-01T01:01:00.000Z").unwrap().into()), 6); - assert_eq!(test.count_in_window(DateTime::parse_from_rfc3339("2000-01-01T01:02:00.000Z").unwrap().into()), 0); - - // test add out of window time - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:01:00.100Z").unwrap().into()); - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:01:00.200Z").unwrap().into()); - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:01:00.300Z").unwrap().into()); - assert_eq!(test.get_data()[0].count, 3); - assert_eq!(test.start_slot, 1); - - assert_eq!(test.count_in_window(DateTime::parse_from_rfc3339("2000-01-01T01:02:00.000Z").unwrap().into()), 3); - - //slide window - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:01:06.100Z").unwrap().into()); - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:01:06.200Z").unwrap().into()); - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:01:06.300Z").unwrap().into()); - assert_eq!(test.get_data()[1].count, 3); - assert_eq!(test.start_slot, 2); - - assert_eq!(test.count_in_window(DateTime::parse_from_rfc3339("2000-01-01T01:02:00.000Z").unwrap().into()), 6); - - //slide window - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:01:50.100Z").unwrap().into()); - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:01:50.200Z").unwrap().into()); - assert_eq!(test.get_data()[10].count, 2); - assert_eq!(test.start_slot, 11); - - assert_eq!(test.count_in_window(DateTime::parse_from_rfc3339("2000-01-01T01:02:00.000Z").unwrap().into()), 8); - - //test reinit - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:03:05.100Z").unwrap().into()); - assert_eq!(test.get_data()[0].count, 1); - assert_eq!(test.start_slot, 0); - - assert_eq!(test.count_in_window(DateTime::parse_from_rfc3339("2000-01-01T01:03:06.000Z").unwrap().into()), 1); - - //test critical case - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:04:05.100Z").unwrap().into()); - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:04:05.100Z").unwrap().into()); - assert_eq!(test.get_data()[0].count, 2); - assert_eq!(test.start_slot, 1); - - assert_eq!(test.count_in_window(DateTime::parse_from_rfc3339("2000-01-01T01:04:05.100Z").unwrap().into()), 2); - - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:05:10.100Z").unwrap().into()); - test.add_one(DateTime::parse_from_rfc3339("2000-01-01T01:05:10.100Z").unwrap().into()); - assert_eq!(test.get_data()[0].count, 2); - assert_eq!(test.start_slot, 0); - - assert_eq!(test.count_in_window(DateTime::parse_from_rfc3339("2000-01-01T01:05:10.100Z").unwrap().into()), 2); - } - - #[tokio::test] - #[cfg(feature = "cache")] - async fn test() { - let _init = tardis::basic::tracing::TardisTracingInitializer::default().with_env_layer().with_fmt_layer().init(); - let docker = testcontainers::clients::Cli::default(); - let redis_container = TardisTestContainer::redis_custom(&docker); - let port = redis_container.get_host_port_ipv4(6379); - let url = format!("redis://127.0.0.1:{port}/0",); - let repo = spacegate_ext_redis::RedisClientRepo::global(); - repo.add("test_gate1".to_owned(), url.as_str()); - let client = std::sync::Arc::new(repo.get("test_gate1").unwrap()); - // fn new_ctx() -> SgRoutePluginContext { - // SgRoutePluginContext::new_http( - // Method::GET, - // Uri::from_static("http://sg.idealworld.group/iam/ct/001?name=sg"), - // Version::HTTP_11, - // HeaderMap::new(), - // Body::empty(), - // "127.0.0.1:8080".parse().unwrap(), - // "test_gate1".to_string(), - // None, - // None, - // ) - // } - - let test = SlidingWindowCounter::new(Duration::seconds(60), ""); - - assert_eq!( - test.add_and_count(DateTime::parse_from_rfc3339("2000-01-01T01:00:50.100Z").unwrap().into(), &client).await.unwrap(), - 0 - ); - assert_eq!( - test.add_and_count(DateTime::parse_from_rfc3339("2000-01-01T01:00:55.100Z").unwrap().into(), &client).await.unwrap(), - 1 - ); - - assert_eq!( - test.add_and_count(DateTime::parse_from_rfc3339("2000-01-01T01:01:50.100Z").unwrap().into(), &client).await.unwrap(), - 1 - ); - assert_eq!( - test.add_and_count(DateTime::parse_from_rfc3339("2000-01-01T01:01:55.000Z").unwrap().into(), &client).await.unwrap(), - 2 - ); - assert_eq!( - test.add_and_count(DateTime::parse_from_rfc3339("2000-01-01T01:01:55.100Z").unwrap().into(), &client).await.unwrap(), - 2 - ); - - assert_eq!( - test.add_and_count(DateTime::parse_from_rfc3339("2000-01-01T01:05:00.100Z").unwrap().into(), &client).await.unwrap(), - 0 - ); - } -} diff --git a/crates/plugin/src/plugins/status/status.html b/crates/plugin/src/plugins/status/status.html deleted file mode 100644 index 6faede78..00000000 --- a/crates/plugin/src/plugins/status/status.html +++ /dev/null @@ -1,83 +0,0 @@ - - - - - - {title} - - - - -

-
-

{title}

-
-
- {status} -
- - - \ No newline at end of file diff --git a/crates/plugin/src/plugins/status/status_plugin.rs b/crates/plugin/src/plugins/status/status_plugin.rs deleted file mode 100644 index de9831fb..00000000 --- a/crates/plugin/src/plugins/status/status_plugin.rs +++ /dev/null @@ -1,129 +0,0 @@ -#![allow(unused_assignments)] -use http_body_util::Full; -use hyper::{Request, Response}; -use serde::{Deserialize, Serialize}; -use spacegate_kernel::BoxError; -use std::sync::Arc; - -type BoxResult = Result; -#[cfg(not(feature = "cache"))] -use std::collections::HashMap; -#[cfg(not(feature = "cache"))] -use tardis::tardis_static; -#[cfg(not(feature = "cache"))] -use tardis::tokio::sync::RwLock; -#[cfg(feature = "cache")] -use spacegate_ext_redis::redis::AsyncCommands; -#[cfg(feature = "cache")] -use tardis::TardisFuns; -#[cfg(not(feature = "cache"))] -tardis_static! { - server_status: Arc>> = <_>::default(); -} -const STATUS_TEMPLATE: &str = include_str!("status.html"); - -#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub(crate) enum Status { - #[default] - Good, - Minor, - Major, -} - -impl Status { - fn to_html_css_class(&self) -> String { - match self { - Status::Good => "good".to_string(), - Status::Minor => "minor".to_string(), - Status::Major => "major".to_string(), - } - } -} - -pub(crate) async fn create_status_html( - _: Request, - _gateway_name: Arc, - _cache_key: Arc, - title: Arc, -) -> Result>, hyper::Error> { - let mut keys = Vec::::new(); - #[cfg(feature = "cache")] - { - let cache_client = crate::cache::Cache::get(_gateway_name.as_ref()).await.expect("get cache client error!"); - keys = cache_client.hkeys(_cache_key.as_ref()).await.expect("get cache keys error!"); - } - #[cfg(not(feature = "cache"))] - { - let status = server_status().read().await; - keys = status.keys().cloned().collect::>(); - } - let mut service_html = "".to_string(); - for ref key in keys { - let status; - #[cfg(feature = "cache")] - { - let cache_client = crate::cache::Cache::get(_gateway_name.as_ref()).await.expect("get cache client error!"); - status = get_status(key, _cache_key.as_ref(), &cache_client).await.expect(""); - } - #[cfg(not(feature = "cache"))] - { - status = get_status(&key).await.expect(""); - } - if let Some(status) = status { - service_html.push_str( - format!( - r##"
-
{}
-
Status
-
"##, - key, - status.to_html_css_class() - ) - .as_str(), - ); - }; - } - let html = STATUS_TEMPLATE.replace("{title}", title.as_ref()).replace("{status}", &service_html); - - Ok(Response::new(Full::new(html.into()))) -} - -#[cfg(feature = "cache")] -pub(crate) async fn update_status(server_name: &str, _cache_key: &str, client: &spacegate_ext_redis::RedisClient, status: Status) -> BoxResult<()> { - client.get_conn().await.hset(_cache_key, server_name, &TardisFuns::json.obj_to_string(&status)?).await?; - Ok(()) -} -#[cfg(not(feature = "cache"))] -pub(crate) async fn update_status(server_name: &str, status: Status) -> BoxResult<()> { - let mut server_status = server_status().write().await; - server_status.insert(server_name.to_string(), status); - Ok(()) -} - -#[cfg(feature = "cache")] -pub(crate) async fn get_status(server_name: &str, cache_key: &str, client: &spacegate_ext_redis::RedisClient) -> BoxResult> { - match client.get_conn().await.hget(cache_key, server_name).await? { - Some(result) => Ok(Some(TardisFuns::json.str_to_obj(&result)?)), - None => Ok(None), - } -} -#[cfg(not(feature = "cache"))] -pub(crate) async fn get_status(server_name: &str) -> BoxResult> { - let server_status = server_status().read().await; - Ok(server_status.get(server_name).cloned()) -} - -#[cfg(feature = "cache")] -pub(crate) async fn clean_status(cache_key: &str, gateway_name: &str) -> BoxResult<()> { - let client = spacegate_ext_redis::RedisClientRepo::global().get(gateway_name); - client.del(cache_key).await?; - Ok(()) -} - -#[cfg(not(feature = "cache"))] -pub(crate) async fn clean_status(cache_key: &str, gateway_name: &str) -> BoxResult<()> { - let mut server_status = server_status().write().await; - server_status.clear(); - - Ok(()) -} diff --git a/crates/plugin/src/plugins/status_prev.rs b/crates/plugin/src/plugins/status_prev.rs deleted file mode 100644 index 698faab5..00000000 --- a/crates/plugin/src/plugins/status_prev.rs +++ /dev/null @@ -1,425 +0,0 @@ -use std::net::IpAddr; -use std::{collections::HashMap, sync::Arc}; - -use async_trait::async_trait; -use http::Request; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Server}; - -use serde::{Deserialize, Serialize}; -use tardis::chrono::{Duration, Utc}; -use tardis::tokio::task::JoinHandle; -use tardis::{ - basic::result::TardisResult, - log, - tokio::{ - self, - sync::{watch::Sender, Mutex}, - }, -}; - -#[cfg(feature = "cache")] -use crate::functions::cache_client; - -use self::status_plugin::{clean_status, get_status, update_status, Status}; -use super::{SgAttachedLevel, SgPluginFilter, SgPluginFilterInitDto, SgRoutePluginContext}; -use crate::def_filter; -use crate::plugins::filters::status::sliding_window::SlidingWindowCounter; -use lazy_static::lazy_static; -use tardis::basic::error::TardisError; -#[cfg(not(feature = "cache"))] -use tardis::tokio::sync::RwLock; - -lazy_static! { - static ref SHUTDOWN_TX: Arc, JoinHandle>)>>> = Default::default(); -} - -pub mod sliding_window; -pub mod status_plugin; - -def_filter!("status", SgFilterStatusDef, SgFilterStatus); - -#[derive(Debug, Serialize, Deserialize)] -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] -#[serde(default)] -pub struct SgFilterStatus { - pub serv_addr: String, - pub port: u16, - pub title: String, - /// Unhealthy threshold , if server error more than this, server will be tag as unhealthy - pub unhealthy_threshold: u16, - /// second - pub interval: u64, - #[cfg(not(feature = "cache"))] - #[serde(skip)] - counter: RwLock, - #[cfg(feature = "cache")] - pub status_cache_key: String, - #[cfg(feature = "cache")] - pub window_cache_key: String, -} - -impl Default for SgFilterStatus { - fn default() -> Self { - Self { - serv_addr: "0.0.0.0".to_string(), - port: 8110, - title: "System Status".to_string(), - unhealthy_threshold: 3, - interval: 5, - #[cfg(feature = "cache")] - status_cache_key: "spacegate:cache:plugin:status".to_string(), - #[cfg(feature = "cache")] - window_cache_key: sliding_window::DEFAULT_CONF_WINDOW_KEY.to_string(), - #[cfg(not(feature = "cache"))] - counter: RwLock::new(SlidingWindowCounter::new(Duration::seconds(3), 60)), - } - } -} - -#[async_trait] -impl SgPluginFilter for SgFilterStatus { - fn accept(&self) -> super::SgPluginFilterAccept { - super::SgPluginFilterAccept { - kind: vec![super::SgPluginFilterKind::Http], - accept_error_response: true, - } - } - - async fn init(&mut self, init_dto: &SgPluginFilterInitDto) -> TardisResult<()> { - if !init_dto.attached_level.eq(&SgAttachedLevel::Gateway) { - log::error!("[SG.Filter.Status] init filter is only can attached to gateway"); - return Ok(()); - } - let (shutdown_tx, _) = tokio::sync::watch::channel(()); - let mut shutdown_rx = shutdown_tx.subscribe(); - - let mut shutdown = SHUTDOWN_TX.lock().await; - if let Some(old_shutdown) = shutdown.remove(&self.port) { - old_shutdown.0.send(()).ok(); - let _ = old_shutdown.1.await; - log::trace!("[SG.Filter.Status] init stop old service."); - } - - let addr_ip: IpAddr = self.serv_addr.parse().map_err(|e| TardisError::conflict(&format!("[SG.Filter.Status] serv_addr parse error: {e}"), ""))?; - let addr = (addr_ip, self.port).into(); - let title = Arc::new(Mutex::new(self.title.clone())); - let gateway_name = Arc::new(Mutex::new(init_dto.gateway_name.clone())); - let cache_key = Arc::new(Mutex::new(get_cache_key(self, &init_dto.gateway_name))); - let make_svc = make_service_fn(move |_conn| { - let title = title.clone(); - let gateway_name = gateway_name.clone(); - let cache_key = cache_key.clone(); - async move { - Ok::<_, hyper::Error>(service_fn(move |request: Request| { - status_plugin::create_status_html(request, gateway_name.clone(), cache_key.clone(), title.clone()) - })) - } - }); - - let server = match Server::try_bind(&addr) { - Ok(server) => server.serve(make_svc), - Err(e) => return Err(TardisError::conflict(&format!("[SG.Filter.Status] bind error: {e}"), "")), - }; - - let join = tokio::spawn(async move { - log::info!("[SG.Filter.Status] Server started: {addr}"); - let server = server.with_graceful_shutdown(async move { - shutdown_rx.changed().await.ok(); - }); - server.await - }); - (*shutdown).insert(self.port, (shutdown_tx, join)); - - #[cfg(feature = "cache")] - { - clean_status(&get_cache_key(self, &init_dto.gateway_name), &init_dto.gateway_name).await?; - } - #[cfg(not(feature = "cache"))] - { - clean_status().await?; - } - for http_route_rule in init_dto.http_route_rules.clone() { - if let Some(backends) = &http_route_rule.backends { - for backend in backends { - #[cfg(feature = "cache")] - { - let cache_client = cache_client::get(&init_dto.gateway_name).await?; - update_status( - &backend.name_or_host, - &get_cache_key(self, &init_dto.gateway_name), - &cache_client, - status_plugin::Status::default(), - ) - .await?; - } - #[cfg(not(feature = "cache"))] - { - update_status(&backend.name_or_host, status_plugin::Status::default()).await?; - } - } - } - } - #[cfg(not(feature = "cache"))] - { - self.counter = RwLock::new(SlidingWindowCounter::new(Duration::seconds(self.interval as i64), 60)); - } - Ok(()) - } - - async fn destroy(&self) -> TardisResult<()> { - let mut shutdown = SHUTDOWN_TX.lock().await; - - if let Some(shutdown) = shutdown.remove(&self.port) { - shutdown.0.send(()).ok(); - let _ = shutdown.1.await; - log::info!("[SG.Filter.Status] Server stopped"); - }; - Ok(()) - } - - async fn req_filter(&self, _: &str, ctx: SgRoutePluginContext) -> TardisResult<(bool, SgRoutePluginContext)> { - Ok((true, ctx)) - } - - async fn resp_filter(&self, _: &str, ctx: SgRoutePluginContext) -> TardisResult<(bool, SgRoutePluginContext)> { - if let Some(backend_name) = ctx.get_chose_backend_name() { - if ctx.is_resp_error() { - let now = Utc::now(); - let count; - #[cfg(not(feature = "cache"))] - { - let mut counter = self.counter.write().await; - count = counter.add_and_count(now) - } - #[cfg(feature = "cache")] - { - count = SlidingWindowCounter::new(Duration::seconds(self.interval as i64), &self.window_cache_key).add_and_count(now, &ctx).await?; - } - if count >= self.unhealthy_threshold as u64 { - #[cfg(feature = "cache")] - { - update_status( - &backend_name, - &get_cache_key(self, &ctx.get_gateway_name()), - &ctx.cache().await?, - status_plugin::Status::Major, - ) - .await?; - } - #[cfg(not(feature = "cache"))] - { - update_status(&backend_name, status_plugin::Status::Major).await?; - } - } else { - #[cfg(feature = "cache")] - { - update_status( - &backend_name, - &get_cache_key(self, &ctx.get_gateway_name()), - &ctx.cache().await?, - status_plugin::Status::Minor, - ) - .await?; - } - #[cfg(not(feature = "cache"))] - { - update_status(&backend_name, status_plugin::Status::Minor).await?; - } - } - } else { - let gotten_status: Option; - #[cfg(feature = "cache")] - { - gotten_status = get_status(&backend_name, &get_cache_key(self, &ctx.get_gateway_name()), &ctx.cache().await?).await?; - } - #[cfg(not(feature = "cache"))] - { - gotten_status = get_status(&backend_name).await?; - } - if let Some(status) = gotten_status { - if status != status_plugin::Status::Good { - #[cfg(feature = "cache")] - { - update_status( - &backend_name, - &get_cache_key(self, &ctx.get_gateway_name()), - &ctx.cache().await?, - status_plugin::Status::Good, - ) - .await?; - } - #[cfg(not(feature = "cache"))] - { - update_status(&backend_name, status_plugin::Status::Good).await?; - } - } - } - } - } - Ok((true, ctx)) - } -} - -#[cfg(feature = "cache")] -fn get_cache_key(filter_status: &SgFilterStatus, gateway_name: &str) -> String { - format!("{}:{}", filter_status.status_cache_key, gateway_name) -} -#[cfg(not(feature = "cache"))] -// not use in not cache mode; -fn get_cache_key(_: &SgFilterStatus, _: &str) -> String { - String::new() -} - -#[cfg(test)] -mod tests { - use std::env; - - use http::{HeaderMap, Method, StatusCode, Uri, Version}; - use hyper::Body; - use tardis::{ - basic::{error::TardisError, result::TardisResult}, - test::test_container::TardisTestContainer, - testcontainers::{self, clients::Cli, Container}, - tokio, - }; - use testcontainers_modules::redis::Redis; - - #[cfg(feature = "cache")] - use crate::functions; - #[cfg(feature = "cache")] - use crate::plugins::filters::status::get_cache_key; - use crate::{ - config::{ - gateway_dto::SgParameters, - http_route_dto::{SgBackendRef, SgHttpRouteRule}, - }, - instance::{SgBackendInst, SgHttpRouteRuleInst}, - plugins::{ - context::ChosenHttpRouteRuleInst, - filters::{ - status::{ - status_plugin::{get_status, Status}, - SgFilterStatus, - }, - SgPluginFilter, SgPluginFilterInitDto, SgRoutePluginContext, - }, - }, - }; - - #[tokio::test] - async fn test_status() { - env::set_var("RUST_LOG", "info,spacegate_shell=trace"); - tracing_subscriber::fmt::init(); - let mut stats = SgFilterStatus::default(); - let mock_backend_ref = SgBackendRef { - name_or_host: "test1".to_string(), - namespace: None, - port: 80, - timeout_ms: None, - protocol: Some(crate::config::gateway_dto::SgProtocol::Http), - weight: None, - filters: None, - }; - let docker = testcontainers::clients::Cli::default(); - let _x = docker_init(&docker).await.unwrap(); - let gateway_name = "gateway_name1".to_string(); - - #[cfg(feature = "cache")] - functions::cache_client::init(&gateway_name, &env::var("TARDIS_FW.CACHE.URL").unwrap()).await.unwrap(); - - stats - .init(&SgPluginFilterInitDto { - gateway_name: gateway_name.clone(), - gateway_parameters: SgParameters::default(), - http_route_rules: vec![SgHttpRouteRule { - matches: None, - filters: None, - backends: Some(vec![mock_backend_ref.clone()]), - timeout_ms: None, - }], - attached_level: crate::plugins::filters::SgAttachedLevel::Gateway, - }) - .await - .unwrap(); - let mock_backend = SgBackendInst { - name_or_host: mock_backend_ref.name_or_host, - namespace: mock_backend_ref.namespace, - port: mock_backend_ref.port, - timeout_ms: mock_backend_ref.timeout_ms, - protocol: mock_backend_ref.protocol, - weight: mock_backend_ref.weight, - filters: vec![], - }; - let mut ctx = SgRoutePluginContext::new_http( - Method::POST, - Uri::from_static("http://sg.idealworld.group/iam/ct/001?name=sg"), - Version::HTTP_11, - HeaderMap::new(), - Body::empty(), - "127.0.0.1:8080".parse().unwrap(), - gateway_name.clone(), - Some(ChosenHttpRouteRuleInst::cloned_from(&SgHttpRouteRuleInst { ..Default::default() }, None)), - None, - ); - - ctx.set_chose_backend_inst(&mock_backend); - - let ctx = ctx.resp_from_error(TardisError::bad_request("mock resp error", "")); - let (is_ok, ctx) = stats.resp_filter("id1", ctx).await.unwrap(); - assert!(is_ok); - - let gotten_status: Status; - #[cfg(feature = "cache")] - { - gotten_status = get_status(&mock_backend.name_or_host, &get_cache_key(&stats, &ctx.get_gateway_name()), ctx.cache().await.unwrap()).await.unwrap().unwrap(); - } - #[cfg(not(feature = "cache"))] - { - gotten_status = get_status(&mock_backend.name_or_host).await.unwrap().unwrap(); - } - assert_eq!(gotten_status, Status::Minor); - - let (_, ctx) = stats.resp_filter("id2", ctx).await.unwrap(); - let (_, ctx) = stats.resp_filter("id3", ctx).await.unwrap(); - let (_, ctx) = stats.resp_filter("id4", ctx).await.unwrap(); - - let gotten_status: Status; - #[cfg(feature = "cache")] - { - gotten_status = get_status(&mock_backend.name_or_host, &get_cache_key(&stats, &ctx.get_gateway_name()), ctx.cache().await.unwrap()).await.unwrap().unwrap(); - } - #[cfg(not(feature = "cache"))] - { - gotten_status = get_status(&mock_backend.name_or_host).await.unwrap().unwrap(); - } - assert_eq!(gotten_status, Status::Major); - - let ctx = ctx.resp(StatusCode::OK, HeaderMap::new(), Body::empty()); - let (_, _ctx) = stats.resp_filter("id4", ctx).await.unwrap(); - - let gotten_status: Status; - #[cfg(feature = "cache")] - { - gotten_status = get_status(&mock_backend.name_or_host, &get_cache_key(&stats, &_ctx.get_gateway_name()), _ctx.cache().await.unwrap()).await.unwrap().unwrap(); - } - #[cfg(not(feature = "cache"))] - { - gotten_status = get_status(&mock_backend.name_or_host).await.unwrap().unwrap(); - } - assert_eq!(gotten_status, Status::Good); - } - - pub struct LifeHold<'a> { - pub redis: Container<'a, Redis>, - } - - async fn docker_init(docker: &Cli) -> TardisResult> { - let redis_container = TardisTestContainer::redis_custom(docker); - let port = redis_container.get_host_port_ipv4(6379); - let url = format!("redis://127.0.0.1:{port}/0",); - env::set_var("TARDIS_FW.CACHE.URL", url); - - Ok(LifeHold { redis: redis_container }) - } -} diff --git a/crates/shell/Cargo.toml b/crates/shell/Cargo.toml index 358d7f61..bcf38d72 100644 --- a/crates/shell/Cargo.toml +++ b/crates/shell/Cargo.toml @@ -45,7 +45,6 @@ plugin-redirect = ["spacegate-plugin/redirect"] plugin-retry = ["spacegate-plugin/retry"] plugin-rewrite = ["spacegate-plugin/rewrite"] plugin-maintenance = ["spacegate-plugin/maintenance"] -# plugin-decompression = ["spacegate-plugin/decompression"] plugin-status = ["spacegate-plugin/status"] plugin-dylib = ["spacegate-plugin/dylib"] plugin-set-version = ["spacegate-plugin/set-version"] diff --git a/crates/shell/src/server.rs b/crates/shell/src/server.rs index 899e21ea..9752d233 100644 --- a/crates/shell/src/server.rs +++ b/crates/shell/src/server.rs @@ -182,7 +182,10 @@ impl RunningSgGateway { let store = Self::global_store(); let mut task = tokio::task::JoinSet::new(); { - let mut g_store = store.lock().expect("poisoned lock"); + let mut g_store = store.lock().unwrap_or_else(|e| { + tracing::warn!("[SG.Server] recovering from poisoned global gateway store lock during reset"); + e.into_inner() + }); for (_, s) in g_store.drain() { task.spawn(s.shutdown()); } @@ -198,13 +201,19 @@ impl RunningSgGateway { } pub fn global_save(gateway_name: impl Into, gateway: RunningSgGateway) { let global_store = Self::global_store(); - let mut global_store = global_store.lock().expect("poisoned lock"); + let mut global_store = global_store.lock().unwrap_or_else(|e| { + tracing::warn!("[SG.Server] recovering from poisoned global gateway store lock during save"); + e.into_inner() + }); global_store.insert(gateway_name.into(), gateway); } pub fn global_remove(gateway_name: impl AsRef) -> Option { let global_store = Self::global_store(); - let mut global_store = global_store.lock().expect("poisoned lock"); + let mut global_store = global_store.lock().unwrap_or_else(|e| { + tracing::warn!("[SG.Server] recovering from poisoned global gateway store lock during remove"); + e.into_inner() + }); global_store.remove(gateway_name.as_ref()) } @@ -213,7 +222,10 @@ impl RunningSgGateway { let service = create_router_service(gateway_name.to_string().into(), http_routes)?; let reloader = { let store = Self::global_store(); - let global_store = store.lock().expect("poisoned lock"); + let global_store = store.lock().unwrap_or_else(|e| { + tracing::warn!("[SG.Server] recovering from poisoned global gateway store lock during update"); + e.into_inner() + }); if let Some(gw) = global_store.get(gateway_name) { gw.reloader.clone() } else { @@ -233,11 +245,15 @@ impl RunningSgGateway { { if let Some(url) = &config_item.gateway.parameters.redis_url { let url: Arc = url.clone().into(); - // builder_ext.insert(crate::extension::redis_url::RedisUrl(url.clone())); - // builder_ext.insert(spacegate_kernel::extension::GatewayName(config.gateway.name.clone().into())); - // Initialize cache instances - tracing::trace!("Initialize cache client...url:{url}"); - spacegate_ext_redis::RedisClientRepo::global().add(&config_item.gateway.name, url.as_ref()); + // Initialize cache instances. Avoid logging the raw URL because it may contain credentials. + tracing::trace!(gateway = %config_item.gateway.name, "Initialize cache client"); + match spacegate_ext_redis::RedisClient::try_new(url.as_ref()) { + Ok(client) => spacegate_ext_redis::RedisClientRepo::global().add(&config_item.gateway.name, client), + Err(e) => { + tracing::error!(gateway = %config_item.gateway.name, error = %e, "failed to initialize redis client"); + return Err(Box::new(e)); + } + } } } tracing::info!("[SG.Server] start gateway");