diff --git a/modules/backup/1module.go b/modules/backup/1module.go index bcd1c1e7..42a5e77e 100644 --- a/modules/backup/1module.go +++ b/modules/backup/1module.go @@ -1,9 +1,30 @@ +// Package backup is a tombstone for the removed built-in WuKongIM backup +// module (issue #139). The Go code (cron scheduler, /v1/manager/backup/* +// API, tar+COS storage, disk precheck — ~1.6k LoC) has been deleted; only +// the two historical SQL migrations remain so sql-migrate continues to +// recognise their IDs. +// +// Why a tombstone instead of deleting the SQL files outright: +// - Deleting them would leave orphaned IDs in gorp_migrations on existing +// deployments — sql-migrate's PlanMigration stage panics with "unknown +// migration in database" the moment it sees an ID with no embedded file. +// - Purging those rows at startup would let a rollback to a pre-removal +// release silently re-run the original CREATE TABLE statements (no +// IF NOT EXISTS) and fail at startup with MySQL 1050 because +// backup_config / backup_history are intentionally kept on disk. +// +// SetupAPI is intentionally nil — no HTTP surface is registered. Only the +// SQLDir is exposed so the embedded migrations stay discoverable. +// +// The backup_config / backup_history tables themselves are deliberately +// left in place (per issue #139's removal plan) so existing production +// rows are not lost. A future change may add a drop migration once we are +// confident no deployment depends on the data. package backup import ( "embed" - "github.com/Mininglamp-OSS/octo-lib/config" "github.com/Mininglamp-OSS/octo-lib/pkg/register" ) @@ -13,10 +34,7 @@ var sqlFS embed.FS func init() { register.AddModule(func(ctx interface{}) register.Module { return register.Module{ - Name: "backup", - SetupAPI: func() register.APIRouter { - return NewManager(ctx.(*config.Context)) - }, + Name: "backup", SQLDir: register.NewSQLFS(sqlFS), } }) diff --git a/modules/backup/api_manager.go b/modules/backup/api_manager.go deleted file mode 100644 index dfe08dd7..00000000 --- a/modules/backup/api_manager.go +++ /dev/null @@ -1,309 +0,0 @@ -package backup - -import ( - "strconv" - - "github.com/Mininglamp-OSS/octo-lib/config" - "github.com/Mininglamp-OSS/octo-lib/pkg/log" - "github.com/Mininglamp-OSS/octo-lib/pkg/wkhttp" - "github.com/pkg/errors" - "go.uber.org/zap" -) - -// Manager 备份管理 -type Manager struct { - ctx *config.Context - db *backupDB - service *Service - scheduler *Scheduler - log.Log -} - -// NewManager 创建备份管理 -func NewManager(ctx *config.Context) *Manager { - db := newBackupDB(ctx) - service := NewService(ctx, db) - scheduler := NewScheduler(service) - - m := &Manager{ - ctx: ctx, - db: db, - service: service, - scheduler: scheduler, - Log: log.NewTLog("BackupManager"), - } - - // 启动定时调度器 - if err := scheduler.Start(); err != nil { - m.Error("failed to start backup scheduler", zap.Error(err)) - } - - return m -} - -// Route 配置路由规则 -func (m *Manager) Route(r *wkhttp.WKHttp) { - auth := r.Group("/v1/manager", m.ctx.AuthMiddleware(r)) - { - // 备份配置 - auth.GET("/backup/config", m.getConfig) - auth.PUT("/backup/config", m.updateConfig) - auth.POST("/backup/config/test", m.testConnection) - - // 备份操作 - auth.POST("/backup/trigger", m.triggerBackup) - auth.GET("/backup/history", m.getHistory) - auth.DELETE("/backup/history/:id", m.deleteHistory) - auth.GET("/backup/history/:id/download", m.getDownloadURL) - - // 状态 - auth.GET("/backup/status", m.getStatus) - } -} - -// getConfig 获取备份配置 -func (m *Manager) getConfig(c *wkhttp.Context) { - if err := c.CheckLoginRoleIsSuperAdmin(); err != nil { - c.ResponseError(err) - return - } - - cfg, err := m.service.GetConfig() - if err != nil { - m.Error("failed to get backup config", zap.Error(err)) - c.ResponseError(errors.New("获取备份配置失败")) - return - } - - // 从系统配置获取 COS 信息(只读展示) - cos := m.ctx.GetConfig().COS - - if cfg == nil { - // 返回默认配置 - c.Response(&BackupConfigResp{ - Enabled: false, - Prefix: "backup/", - CronExpr: "0 2 * * *", - RetentionCount: 7, - DataDir: "/data/wukongim", - // 只读的系统 COS 配置 - StorageType: "cos", - Bucket: cos.Bucket, - Region: cos.Region, - }) - return - } - - c.Response(&BackupConfigResp{ - Enabled: cfg.Enabled, - Prefix: cfg.Prefix, - CronExpr: cfg.CronExpr, - RetentionCount: cfg.RetentionCount, - DataDir: cfg.DataDir, - // 只读的系统 COS 配置 - StorageType: "cos", - Bucket: cos.Bucket, - Region: cos.Region, - }) -} - -// updateConfig 更新备份配置 -func (m *Manager) updateConfig(c *wkhttp.Context) { - if err := c.CheckLoginRoleIsSuperAdmin(); err != nil { - c.ResponseError(err) - return - } - - var req BackupConfigReq - if err := c.BindJSON(&req); err != nil { - c.ResponseError(errors.New("参数错误")) - return - } - - // 获取现有配置 - existingCfg, err := m.service.GetConfig() - if err != nil { - m.Error("failed to get existing config", zap.Error(err)) - c.ResponseError(errors.New("获取配置失败")) - return - } - - // 合并配置(存储配置复用系统 COS,这里只处理备份相关配置) - cfg := &BackupConfig{ - Prefix: req.Prefix, - CronExpr: req.CronExpr, - DataDir: req.DataDir, - } - - if req.Enabled != nil { - cfg.Enabled = *req.Enabled - } else if existingCfg != nil { - cfg.Enabled = existingCfg.Enabled - } - - if req.RetentionCount != nil { - cfg.RetentionCount = *req.RetentionCount - } else if existingCfg != nil { - cfg.RetentionCount = existingCfg.RetentionCount - } else { - cfg.RetentionCount = 7 - } - - // 设置默认值 - if cfg.Prefix == "" { - cfg.Prefix = "backup/" - } - if cfg.CronExpr == "" { - cfg.CronExpr = "0 2 * * *" - } - if cfg.DataDir == "" { - cfg.DataDir = "/data/wukongim" - } - - // 验证 cron 表达式 - if err := ValidateCronExpr(cfg.CronExpr); err != nil { - c.ResponseError(errors.New("无效的 cron 表达式: " + err.Error())) - return - } - - // 保存配置 - if err := m.service.SaveConfig(cfg); err != nil { - m.Error("failed to save backup config", zap.Error(err)) - c.ResponseError(errors.New("保存配置失败")) - return - } - - // 更新定时任务 - if err := m.scheduler.UpdateSchedule(cfg.CronExpr, cfg.Enabled); err != nil { - m.Warn("failed to update scheduler", zap.Error(err)) - } - - c.ResponseOK() -} - -// testConnection 测试存储连接(测试系统 COS 配置) -func (m *Manager) testConnection(c *wkhttp.Context) { - if err := c.CheckLoginRoleIsSuperAdmin(); err != nil { - c.ResponseError(err) - return - } - - err := m.service.TestConnection() - if err != nil { - m.Error("connection test failed", zap.Error(err)) - c.ResponseError(errors.New("连接测试失败: " + err.Error())) - return - } - - c.ResponseOK() -} - -// triggerBackup 手动触发备份 -func (m *Manager) triggerBackup(c *wkhttp.Context) { - if err := c.CheckLoginRoleIsSuperAdmin(); err != nil { - c.ResponseError(err) - return - } - - backupID, err := m.service.TriggerBackup() - if err != nil { - m.Error("failed to trigger backup", zap.Error(err)) - c.ResponseError(errors.New("触发备份失败: " + err.Error())) - return - } - - c.Response(map[string]string{ - "backup_id": backupID, - "message": "备份已开始,请稍后查看备份历史", - }) -} - -// getHistory 获取备份历史 -func (m *Manager) getHistory(c *wkhttp.Context) { - if err := c.CheckLoginRoleIsSuperAdmin(); err != nil { - c.ResponseError(err) - return - } - - pageIndex, _ := strconv.Atoi(c.DefaultQuery("page_index", "1")) - pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "20")) - - if pageIndex < 1 { - pageIndex = 1 - } - if pageSize < 1 || pageSize > 100 { - pageSize = 20 - } - - list, count, err := m.service.GetHistoryList(pageIndex, pageSize) - if err != nil { - m.Error("failed to get backup history", zap.Error(err)) - c.ResponseError(errors.New("获取备份历史失败")) - return - } - - c.Response(map[string]interface{}{ - "list": list, - "count": count, - }) -} - -// deleteHistory 删除备份历史 -func (m *Manager) deleteHistory(c *wkhttp.Context) { - if err := c.CheckLoginRoleIsSuperAdmin(); err != nil { - c.ResponseError(err) - return - } - - id, err := strconv.ParseInt(c.Param("id"), 10, 64) - if err != nil { - c.ResponseError(errors.New("无效的ID")) - return - } - - if err := m.service.DeleteHistory(id); err != nil { - m.Error("failed to delete backup history", zap.Error(err)) - c.ResponseError(errors.New("删除备份失败")) - return - } - - c.ResponseOK() -} - -// getDownloadURL 获取下载链接 -func (m *Manager) getDownloadURL(c *wkhttp.Context) { - if err := c.CheckLoginRoleIsSuperAdmin(); err != nil { - c.ResponseError(err) - return - } - - id, err := strconv.ParseInt(c.Param("id"), 10, 64) - if err != nil { - c.ResponseError(errors.New("无效的ID")) - return - } - - url, err := m.service.GetDownloadURL(id) - if err != nil { - m.Error("failed to get download URL", zap.Error(err)) - c.ResponseError(errors.New("获取下载链接失败: " + err.Error())) - return - } - - c.Response(map[string]string{ - "url": url, - }) -} - -// getStatus 获取备份状态 -func (m *Manager) getStatus(c *wkhttp.Context) { - if err := c.CheckLoginRoleIsSuperAdmin(); err != nil { - c.ResponseError(err) - return - } - - c.Response(map[string]interface{}{ - "is_running": m.service.IsRunning(), - "next_run": m.scheduler.GetNextRun(), - }) -} diff --git a/modules/backup/db.go b/modules/backup/db.go deleted file mode 100644 index 85bc62d7..00000000 --- a/modules/backup/db.go +++ /dev/null @@ -1,239 +0,0 @@ -package backup - -import ( - "time" - - "github.com/Mininglamp-OSS/octo-lib/config" - "github.com/gocraft/dbr/v2" -) - -type backupDB struct { - session *dbr.Session - ctx *config.Context -} - -func newBackupDB(ctx *config.Context) *backupDB { - return &backupDB{ - session: ctx.DB(), - ctx: ctx, - } -} - -// ==================== BackupConfig ==================== - -// GetConfig 获取备份配置 -func (d *backupDB) GetConfig() (*BackupConfig, error) { - var m *backupConfigModel - _, err := d.session.Select("*").From("backup_config").OrderDesc("id").Limit(1).Load(&m) - if err != nil { - return nil, err - } - if m == nil { - return nil, nil - } - return m.toBackupConfig(), nil -} - -// SaveConfig 保存备份配置 -func (d *backupDB) SaveConfig(cfg *BackupConfig) error { - existing, err := d.GetConfig() - if err != nil { - return err - } - - m := &backupConfigModel{ - Enabled: boolToInt(cfg.Enabled), - Prefix: cfg.Prefix, - CronExpr: cfg.CronExpr, - RetentionCount: cfg.RetentionCount, - DataDir: cfg.DataDir, - } - - if existing == nil { - _, err = d.session.InsertInto("backup_config").Columns( - "enabled", "prefix", "cron_expr", "retention_count", "data_dir", - ).Values( - m.Enabled, m.Prefix, m.CronExpr, m.RetentionCount, m.DataDir, - ).Exec() - } else { - _, err = d.session.Update("backup_config").SetMap(map[string]interface{}{ - "enabled": m.Enabled, - "prefix": m.Prefix, - "cron_expr": m.CronExpr, - "retention_count": m.RetentionCount, - "data_dir": m.DataDir, - }).Where("id=?", existing.ID).Exec() - } - return err -} - -// ==================== BackupHistory ==================== - -// CreateHistory 创建备份历史记录 -func (d *backupDB) CreateHistory(backupID, status string) error { - now := time.Now() - _, err := d.session.InsertInto("backup_history").Columns( - "backup_id", "status", "started_at", - ).Values( - backupID, status, now, - ).Exec() - return err -} - -// UpdateHistoryStatus 更新备份状态 -func (d *backupDB) UpdateHistoryStatus(backupID, status, errorMsg string) error { - updateMap := map[string]interface{}{ - "status": status, - } - if errorMsg != "" { - updateMap["error_message"] = errorMsg - } - if status == BackupStatusFailed || status == BackupStatusSuccess { - updateMap["finished_at"] = time.Now() - } - _, err := d.session.Update("backup_history").SetMap(updateMap).Where("backup_id=?", backupID).Exec() - return err -} - -// UpdateHistorySuccess 更新备份成功状态 -func (d *backupDB) UpdateHistorySuccess(backupID, fileName, storagePath string, fileSize int64) error { - now := time.Now() - _, err := d.session.Update("backup_history").SetMap(map[string]interface{}{ - "status": BackupStatusSuccess, - "file_name": fileName, - "storage_path": storagePath, - "file_size": fileSize, - "finished_at": now, - }).Where("backup_id=?", backupID).Exec() - return err -} - -// GetHistoryList 获取备份历史列表 -func (d *backupDB) GetHistoryList(pageIndex, pageSize int) ([]*BackupHistory, error) { - var models []*backupHistoryModel - _, err := d.session.Select("*").From("backup_history"). - OrderDesc("created_at"). - Offset(uint64((pageIndex - 1) * pageSize)). - Limit(uint64(pageSize)). - Load(&models) - if err != nil { - return nil, err - } - - result := make([]*BackupHistory, len(models)) - for i, m := range models { - result[i] = m.toBackupHistory() - } - return result, nil -} - -// GetHistoryCount 获取备份历史总数 -func (d *backupDB) GetHistoryCount() (int64, error) { - var count int64 - err := d.session.Select("count(*)").From("backup_history").LoadOne(&count) - return count, err -} - -// GetHistoryByID 根据ID获取备份历史 -func (d *backupDB) GetHistoryByID(id int64) (*BackupHistory, error) { - var m *backupHistoryModel - _, err := d.session.Select("*").From("backup_history").Where("id=?", id).Load(&m) - if err != nil { - return nil, err - } - if m == nil { - return nil, nil - } - return m.toBackupHistory(), nil -} - -// DeleteHistory 删除备份历史 -func (d *backupDB) DeleteHistory(id int64) error { - _, err := d.session.DeleteFrom("backup_history").Where("id=?", id).Exec() - return err -} - -// GetOldestHistories 获取最旧的备份记录(用于清理) -func (d *backupDB) GetOldestHistories(keepCount int) ([]*BackupHistory, error) { - var models []*backupHistoryModel - _, err := d.session.Select("*").From("backup_history"). - Where("status=?", BackupStatusSuccess). - OrderDesc("created_at"). - Offset(uint64(keepCount)). - Load(&models) - if err != nil { - return nil, err - } - - result := make([]*BackupHistory, len(models)) - for i, m := range models { - result[i] = m.toBackupHistory() - } - return result, nil -} - -// ==================== Internal Models ==================== - -type backupConfigModel struct { - ID int64 `db:"id"` - Enabled int `db:"enabled"` - Prefix string `db:"prefix"` - CronExpr string `db:"cron_expr"` - RetentionCount int `db:"retention_count"` - DataDir string `db:"data_dir"` - CreatedAt time.Time `db:"created_at"` - UpdatedAt time.Time `db:"updated_at"` -} - -func (m *backupConfigModel) toBackupConfig() *BackupConfig { - return &BackupConfig{ - ID: m.ID, - Enabled: m.Enabled == 1, - Prefix: m.Prefix, - CronExpr: m.CronExpr, - RetentionCount: m.RetentionCount, - DataDir: m.DataDir, - CreatedAt: m.CreatedAt, - UpdatedAt: m.UpdatedAt, - } -} - -type backupHistoryModel struct { - ID int64 `db:"id"` - BackupID string `db:"backup_id"` - Status string `db:"status"` - FileName string `db:"file_name"` - FileSize int64 `db:"file_size"` - StoragePath string `db:"storage_path"` - StartedAt *time.Time `db:"started_at"` - FinishedAt *time.Time `db:"finished_at"` - ErrorMessage *string `db:"error_message"` - CreatedAt time.Time `db:"created_at"` - UpdatedAt time.Time `db:"updated_at"` -} - -func (m *backupHistoryModel) toBackupHistory() *BackupHistory { - h := &BackupHistory{ - ID: m.ID, - BackupID: m.BackupID, - Status: m.Status, - FileName: m.FileName, - FileSize: m.FileSize, - StoragePath: m.StoragePath, - StartedAt: m.StartedAt, - FinishedAt: m.FinishedAt, - CreatedAt: m.CreatedAt, - UpdatedAt: m.UpdatedAt, - } - if m.ErrorMessage != nil { - h.ErrorMessage = *m.ErrorMessage - } - return h -} - -func boolToInt(b bool) int { - if b { - return 1 - } - return 0 -} diff --git a/modules/backup/diskcheck.go b/modules/backup/diskcheck.go deleted file mode 100644 index 048f8351..00000000 --- a/modules/backup/diskcheck.go +++ /dev/null @@ -1,87 +0,0 @@ -package backup - -import ( - "fmt" - "os" - "path/filepath" - "syscall" -) - -// DiskChecker 磁盘空间检查器 -type DiskChecker struct{} - -// NewDiskChecker 创建磁盘检查器 -func NewDiskChecker() *DiskChecker { - return &DiskChecker{} -} - -// CheckAvailableSpace 检查目录可用空间是否满足需求 -// path: 要检查的目录路径 -// requiredBytes: 需要的字节数 -// 返回: 可用空间(bytes), 是否满足需求, error -func (d *DiskChecker) CheckAvailableSpace(path string, requiredBytes int64) (available int64, sufficient bool, err error) { - var stat syscall.Statfs_t - if err := syscall.Statfs(path, &stat); err != nil { - return 0, false, fmt.Errorf("failed to get disk stats: %w", err) - } - - // 可用空间 = 可用块数 * 块大小 - available = int64(stat.Bavail) * int64(stat.Bsize) - sufficient = available >= requiredBytes - - return available, sufficient, nil -} - -// EstimateArchiveSize 估算压缩包大小 -// sourcePath: 源目录路径 -// compressionRatio: 压缩比 (0.0-1.0, 例如 0.5 表示压缩后为原大小的 50%) -func (d *DiskChecker) EstimateArchiveSize(sourcePath string, compressionRatio float64) (int64, error) { - var totalSize int64 - - err := filepath.Walk(sourcePath, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if !info.IsDir() { - totalSize += info.Size() - } - return nil - }) - - if err != nil { - return 0, fmt.Errorf("failed to calculate directory size: %w", err) - } - - // 应用压缩比 - estimatedSize := int64(float64(totalSize) * compressionRatio) - - // 至少需要 1MB 的安全余量 - const safetyMargin = 1024 * 1024 - return estimatedSize + safetyMargin, nil -} - -// CheckBeforeBackup 备份前检查磁盘空间 -// sourcePath: 源数据目录 -// tempPath: 临时文件目录 (通常是 os.TempDir()) -// 返回: error 如果空间不足 -func (d *DiskChecker) CheckBeforeBackup(sourcePath, tempPath string) error { - // 估算压缩包大小 (假设 gzip 压缩比为 30%) - estimatedSize, err := d.EstimateArchiveSize(sourcePath, 0.3) - if err != nil { - return err - } - - // 检查临时目录空间 - available, sufficient, err := d.CheckAvailableSpace(tempPath, estimatedSize) - if err != nil { - return err - } - - if !sufficient { - return fmt.Errorf("insufficient disk space: need %s, available %s", - FormatFileSize(estimatedSize), - FormatFileSize(available)) - } - - return nil -} diff --git a/modules/backup/diskcheck_test.go b/modules/backup/diskcheck_test.go deleted file mode 100644 index f99db75d..00000000 --- a/modules/backup/diskcheck_test.go +++ /dev/null @@ -1,109 +0,0 @@ -package backup - -import ( - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestDiskChecker_CheckAvailableSpace(t *testing.T) { - checker := NewDiskChecker() - - // 检查临时目录(应该存在且有空间) - tmpDir := os.TempDir() - available, sufficient, err := checker.CheckAvailableSpace(tmpDir, 1024) // 需要 1KB - - assert.NoError(t, err) - assert.True(t, available > 0, "should have available space") - assert.True(t, sufficient, "1KB should be sufficient") -} - -func TestDiskChecker_CheckAvailableSpace_InsufficientSpace(t *testing.T) { - checker := NewDiskChecker() - - tmpDir := os.TempDir() - // 请求一个超大空间 (1 PB) - _, sufficient, err := checker.CheckAvailableSpace(tmpDir, 1024*1024*1024*1024*1024) - - assert.NoError(t, err) - assert.False(t, sufficient, "1PB should not be sufficient") -} - -func TestDiskChecker_CheckAvailableSpace_InvalidPath(t *testing.T) { - checker := NewDiskChecker() - - _, _, err := checker.CheckAvailableSpace("/nonexistent/path/12345", 1024) - - assert.Error(t, err) -} - -func TestDiskChecker_EstimateArchiveSize(t *testing.T) { - checker := NewDiskChecker() - - // 创建临时目录和文件 - tmpDir, err := os.MkdirTemp("", "diskcheck_test") - assert.NoError(t, err) - defer os.RemoveAll(tmpDir) - - // 创建一个 1KB 的文件 - testFile := filepath.Join(tmpDir, "test.txt") - data := make([]byte, 1024) - err = os.WriteFile(testFile, data, 0644) - assert.NoError(t, err) - - // 估算大小 (30% 压缩比) - estimatedSize, err := checker.EstimateArchiveSize(tmpDir, 0.3) - assert.NoError(t, err) - - // 估算大小应该是 1024 * 0.3 + 1MB 安全余量 ≈ 1MB+ - assert.True(t, estimatedSize > 1024*1024, "should include safety margin") - assert.True(t, estimatedSize < 2*1024*1024, "should not be too large") -} - -func TestDiskChecker_EstimateArchiveSize_EmptyDir(t *testing.T) { - checker := NewDiskChecker() - - // 创建空临时目录 - tmpDir, err := os.MkdirTemp("", "diskcheck_empty") - assert.NoError(t, err) - defer os.RemoveAll(tmpDir) - - estimatedSize, err := checker.EstimateArchiveSize(tmpDir, 0.3) - assert.NoError(t, err) - - // 空目录只有安全余量 - assert.Equal(t, int64(1024*1024), estimatedSize) -} - -func TestDiskChecker_EstimateArchiveSize_InvalidPath(t *testing.T) { - checker := NewDiskChecker() - - _, err := checker.EstimateArchiveSize("/nonexistent/path/12345", 0.3) - assert.Error(t, err) -} - -func TestDiskChecker_CheckBeforeBackup(t *testing.T) { - checker := NewDiskChecker() - - // 创建临时目录和小文件 - tmpDir, err := os.MkdirTemp("", "diskcheck_backup") - assert.NoError(t, err) - defer os.RemoveAll(tmpDir) - - testFile := filepath.Join(tmpDir, "test.txt") - err = os.WriteFile(testFile, []byte("hello"), 0644) - assert.NoError(t, err) - - // 检查应该通过 - err = checker.CheckBeforeBackup(tmpDir, os.TempDir()) - assert.NoError(t, err) -} - -func TestDiskChecker_CheckBeforeBackup_InvalidSourcePath(t *testing.T) { - checker := NewDiskChecker() - - err := checker.CheckBeforeBackup("/nonexistent/path", os.TempDir()) - assert.Error(t, err) -} diff --git a/modules/backup/model.go b/modules/backup/model.go deleted file mode 100644 index 2f97401b..00000000 --- a/modules/backup/model.go +++ /dev/null @@ -1,75 +0,0 @@ -package backup - -import "time" - -// BackupConfig 备份配置模型(存储配置复用 ctx.GetConfig().COS) -type BackupConfig struct { - ID int64 `json:"id"` - Enabled bool `json:"enabled"` - Prefix string `json:"prefix"` // 备份路径前缀 - CronExpr string `json:"cron_expr"` // cron表达式 - RetentionCount int `json:"retention_count"` // 保留数量 - DataDir string `json:"data_dir"` // WuKongIM 数据目录 - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` -} - -// BackupHistory 备份历史模型 -type BackupHistory struct { - ID int64 `json:"id"` - BackupID string `json:"backup_id"` - Status string `json:"status"` // pending/running/success/failed - FileName string `json:"file_name"` - FileSize int64 `json:"file_size"` - StoragePath string `json:"storage_path"` - StartedAt *time.Time `json:"started_at"` - FinishedAt *time.Time `json:"finished_at"` - ErrorMessage string `json:"error_message"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` -} - -// BackupConfigReq 备份配置请求 -type BackupConfigReq struct { - Enabled *bool `json:"enabled,omitempty"` - Prefix string `json:"prefix,omitempty"` - CronExpr string `json:"cron_expr,omitempty"` - RetentionCount *int `json:"retention_count,omitempty"` - DataDir string `json:"data_dir,omitempty"` -} - -// BackupConfigResp 备份配置响应 -type BackupConfigResp struct { - Enabled bool `json:"enabled"` - Prefix string `json:"prefix"` - CronExpr string `json:"cron_expr"` - RetentionCount int `json:"retention_count"` - DataDir string `json:"data_dir"` - // 以下字段从 ctx.GetConfig() 读取,只读展示 - StorageType string `json:"storage_type"` - Bucket string `json:"bucket"` - Region string `json:"region"` -} - -// BackupHistoryResp 备份历史响应 -type BackupHistoryResp struct { - ID int64 `json:"id"` - BackupID string `json:"backup_id"` - Status string `json:"status"` - FileName string `json:"file_name"` - FileSize int64 `json:"file_size"` - FileSizeStr string `json:"file_size_str"` - StoragePath string `json:"storage_path"` - StartedAt string `json:"started_at,omitempty"` - FinishedAt string `json:"finished_at,omitempty"` - ErrorMessage string `json:"error_message,omitempty"` - CreatedAt string `json:"created_at"` -} - -// BackupStatus 备份状态常量 -const ( - BackupStatusPending = "pending" - BackupStatusRunning = "running" - BackupStatusSuccess = "success" - BackupStatusFailed = "failed" -) diff --git a/modules/backup/scheduler.go b/modules/backup/scheduler.go deleted file mode 100644 index 243b8ee1..00000000 --- a/modules/backup/scheduler.go +++ /dev/null @@ -1,144 +0,0 @@ -package backup - -import ( - "sync" - - "github.com/Mininglamp-OSS/octo-lib/pkg/log" - "github.com/robfig/cron/v3" - "go.uber.org/zap" -) - -// Scheduler 定时调度器 -type Scheduler struct { - log.Log - service *Service - cron *cron.Cron - entryID cron.EntryID - mu sync.Mutex - started bool -} - -// NewScheduler 创建调度器 -func NewScheduler(service *Service) *Scheduler { - return &Scheduler{ - Log: log.NewTLog("BackupScheduler"), - service: service, - cron: cron.New(), - } -} - -// Start 启动调度器 -func (s *Scheduler) Start() error { - s.mu.Lock() - defer s.mu.Unlock() - - if s.started { - return nil - } - - cfg, err := s.service.GetConfig() - if err != nil { - s.Warn("failed to get backup config", zap.Error(err)) - return nil - } - - if cfg == nil || !cfg.Enabled { - s.Info("backup is disabled, scheduler not started") - return nil - } - - // 添加定时任务(走 TriggerBackup 确保并发检查) - entryID, err := s.cron.AddFunc(cfg.CronExpr, func() { - s.Info("scheduled backup triggered", zap.String("cron", cfg.CronExpr)) - if _, err := s.service.TriggerBackup(); err != nil { - s.Error("scheduled backup skipped or failed", zap.Error(err)) - } - }) - if err != nil { - return err - } - - s.entryID = entryID - s.cron.Start() - s.started = true - - s.Info("backup scheduler started", zap.String("cron", cfg.CronExpr)) - return nil -} - -// Stop 停止调度器 -func (s *Scheduler) Stop() { - s.mu.Lock() - defer s.mu.Unlock() - - if !s.started { - return - } - - s.cron.Stop() - s.started = false - s.Info("backup scheduler stopped") -} - -// UpdateSchedule 更新调度计划 -func (s *Scheduler) UpdateSchedule(cronExpr string, enabled bool) error { - s.mu.Lock() - defer s.mu.Unlock() - - // 移除旧任务 - if s.entryID != 0 { - s.cron.Remove(s.entryID) - s.entryID = 0 - } - - if !enabled { - s.Info("backup disabled, scheduler stopped") - return nil - } - - // 添加新任务(走 TriggerBackup 确保并发检查) - entryID, err := s.cron.AddFunc(cronExpr, func() { - s.Info("scheduled backup triggered", zap.String("cron", cronExpr)) - if _, err := s.service.TriggerBackup(); err != nil { - s.Error("scheduled backup skipped or failed", zap.Error(err)) - } - }) - if err != nil { - return err - } - - s.entryID = entryID - - // 如果 cron 未启动,启动它 - if !s.started { - s.cron.Start() - s.started = true - } - - s.Info("backup schedule updated", zap.String("cron", cronExpr)) - return nil -} - -// ValidateCronExpr 验证 cron 表达式 -func ValidateCronExpr(cronExpr string) error { - parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) - _, err := parser.Parse(cronExpr) - return err -} - -// GetNextRun 获取下次执行时间 -func (s *Scheduler) GetNextRun() string { - s.mu.Lock() - defer s.mu.Unlock() - - if s.entryID == 0 { - return "" - } - - entry := s.cron.Entry(s.entryID) - if entry.ID == 0 { - return "" - } - - return entry.Next.Format("2006-01-02 15:04:05") -} diff --git a/modules/backup/service.go b/modules/backup/service.go deleted file mode 100644 index 8801fafe..00000000 --- a/modules/backup/service.go +++ /dev/null @@ -1,407 +0,0 @@ -package backup - -import ( - "archive/tar" - "compress/gzip" - "context" - "fmt" - "io" - "os" - "path" - "path/filepath" - "sync" - "time" - - "github.com/Mininglamp-OSS/octo-lib/config" - "github.com/Mininglamp-OSS/octo-lib/pkg/log" - "github.com/google/uuid" - "go.uber.org/zap" -) - -// Service 备份服务 -type Service struct { - log.Log - ctx *config.Context - db *backupDB - storage IStorage - cfg *BackupConfig - isRunning bool - mu sync.Mutex -} - -// NewService 创建备份服务 -func NewService(ctx *config.Context, db *backupDB) *Service { - return &Service{ - Log: log.NewTLog("BackupService"), - ctx: ctx, - db: db, - } -} - -// GetConfig 获取备份配置 -func (s *Service) GetConfig() (*BackupConfig, error) { - return s.db.GetConfig() -} - -// SaveConfig 保存备份配置 -func (s *Service) SaveConfig(cfg *BackupConfig) error { - return s.db.SaveConfig(cfg) -} - -// TestConnection 测试存储连接(使用系统 COS 配置) -func (s *Service) TestConnection() error { - cos := s.ctx.GetConfig().COS - if cos.Bucket == "" { - return fmt.Errorf("COS Bucket 未配置") - } - if cos.SecretID == "" || cos.SecretKey == "" { - return fmt.Errorf("COS AccessKey/SecretKey 未配置") - } - if cos.Region == "" { - return fmt.Errorf("COS Region 未配置") - } - - storage, err := NewCOSStorage(&StorageConfig{ - Bucket: cos.Bucket, - AccessKey: cos.SecretID, - SecretKey: cos.SecretKey, - Region: cos.Region, - }) - if err != nil { - return err - } - - // 1 分钟超时 - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - return storage.TestConnection(ctx) -} - -// TriggerBackup 手动触发备份 -func (s *Service) TriggerBackup() (string, error) { - s.mu.Lock() - if s.isRunning { - s.mu.Unlock() - return "", fmt.Errorf("backup is already running") - } - s.isRunning = true - s.mu.Unlock() - - backupID := uuid.New().String() - - // 异步执行备份 - go func() { - defer func() { - s.mu.Lock() - s.isRunning = false - s.mu.Unlock() - }() - - if err := s.ExecuteBackup(backupID); err != nil { - s.Error("backup failed", zap.String("backupID", backupID), zap.Error(err)) - } - }() - - return backupID, nil -} - -// ExecuteBackup 执行备份 -func (s *Service) ExecuteBackup(backupID string) error { - s.Info("starting backup", zap.String("backupID", backupID)) - - // 创建带超时的 context(30 分钟) - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) - defer cancel() - - // 1. 获取配置 - cfg, err := s.db.GetConfig() - if err != nil { - return fmt.Errorf("failed to get config: %w", err) - } - if cfg == nil { - return fmt.Errorf("backup config not found") - } - - // 2. 创建备份记录 - if err := s.db.CreateHistory(backupID, BackupStatusRunning); err != nil { - return fmt.Errorf("failed to create history: %w", err) - } - - // 3. 检查数据目录 - dataDir := cfg.DataDir - if _, err := os.Stat(dataDir); os.IsNotExist(err) { - s.db.UpdateHistoryStatus(backupID, BackupStatusFailed, fmt.Sprintf("data directory not found: %s", dataDir)) - return fmt.Errorf("data directory not found: %s", dataDir) - } - - // 4. 检查磁盘空间 - tmpDir := os.TempDir() - diskChecker := NewDiskChecker() - if err := diskChecker.CheckBeforeBackup(dataDir, tmpDir); err != nil { - s.db.UpdateHistoryStatus(backupID, BackupStatusFailed, fmt.Sprintf("disk space check failed: %v", err)) - return fmt.Errorf("disk space check failed: %w", err) - } - - // 5. 创建临时文件 - fileName := fmt.Sprintf("wukongim-%s.tar.gz", time.Now().Format("20060102-150405")) - localPath := filepath.Join(tmpDir, fileName) - - // 6. 打包数据目录 - s.Info("creating tar.gz archive", zap.String("source", dataDir), zap.String("dest", localPath)) - if err := s.createTarGz(dataDir, localPath); err != nil { - s.db.UpdateHistoryStatus(backupID, BackupStatusFailed, fmt.Sprintf("failed to create archive: %v", err)) - return fmt.Errorf("failed to create archive: %w", err) - } - defer os.Remove(localPath) - - // 7. 获取文件大小 - fileInfo, err := os.Stat(localPath) - if err != nil { - s.db.UpdateHistoryStatus(backupID, BackupStatusFailed, fmt.Sprintf("failed to stat archive: %v", err)) - return fmt.Errorf("failed to stat archive: %w", err) - } - fileSize := fileInfo.Size() - s.Info("archive created", zap.String("file", fileName), zap.Int64("size", fileSize)) - - // 8. 创建存储实例(复用 ctx.GetConfig().COS 配置) - storage, err := s.createStorage(cfg.Prefix) - if err != nil { - s.db.UpdateHistoryStatus(backupID, BackupStatusFailed, fmt.Sprintf("failed to create storage: %v", err)) - return fmt.Errorf("failed to create storage: %w", err) - } - - // 9. 上传到存储 - s.Info("uploading archive", zap.String("file", fileName)) - if err := storage.Upload(ctx, localPath, fileName); err != nil { - s.db.UpdateHistoryStatus(backupID, BackupStatusFailed, fmt.Sprintf("failed to upload: %v", err)) - return fmt.Errorf("failed to upload: %w", err) - } - - // 10. 更新备份记录 - remotePath := path.Join(cfg.Prefix, fileName) - if err := s.db.UpdateHistorySuccess(backupID, fileName, remotePath, fileSize); err != nil { - return fmt.Errorf("failed to update history: %w", err) - } - - // 11. 清理旧备份 - if err := s.cleanupOldBackups(ctx, storage, cfg.RetentionCount); err != nil { - s.Warn("failed to cleanup old backups", zap.Error(err)) - } - - s.Info("backup completed successfully", zap.String("backupID", backupID), zap.String("file", fileName)) - return nil -} - -// createTarGz 创建 tar.gz 压缩包 -func (s *Service) createTarGz(source, target string) error { - tarFile, err := os.Create(target) - if err != nil { - return err - } - defer tarFile.Close() - - gzWriter := gzip.NewWriter(tarFile) - defer gzWriter.Close() - - tarWriter := tar.NewWriter(gzWriter) - defer tarWriter.Close() - - baseDir := filepath.Base(source) - - return filepath.Walk(source, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - - // 创建 tar header - header, err := tar.FileInfoHeader(info, "") - if err != nil { - return err - } - - // 设置相对路径 - relPath, err := filepath.Rel(source, path) - if err != nil { - return err - } - if relPath == "." { - header.Name = baseDir - } else { - header.Name = filepath.Join(baseDir, relPath) - } - - // 写入 header - if err := tarWriter.WriteHeader(header); err != nil { - return err - } - - // 如果是文件,写入内容 - if !info.IsDir() { - if err := s.writeFileToTar(tarWriter, path); err != nil { - return err - } - } - - return nil - }) -} - -// writeFileToTar 写入单个文件到 tar(确保文件句柄及时关闭) -func (s *Service) writeFileToTar(tarWriter *tar.Writer, path string) error { - file, err := os.Open(path) - if err != nil { - return err - } - defer file.Close() - - _, err = io.Copy(tarWriter, file) - return err -} - -// cleanupOldBackups 清理旧备份 -func (s *Service) cleanupOldBackups(ctx context.Context, storage IStorage, keepCount int) error { - // 获取需要删除的旧备份 - oldBackups, err := s.db.GetOldestHistories(keepCount) - if err != nil { - return err - } - - for _, backup := range oldBackups { - s.Info("deleting old backup", zap.String("backupID", backup.BackupID), zap.String("file", backup.FileName)) - - // 从存储删除 - if err := storage.Delete(ctx, backup.FileName); err != nil { - s.Warn("failed to delete backup from storage", zap.String("file", backup.FileName), zap.Error(err)) - } - - // 从数据库删除记录 - if err := s.db.DeleteHistory(backup.ID); err != nil { - s.Warn("failed to delete backup history", zap.Int64("id", backup.ID), zap.Error(err)) - } - } - - return nil -} - -// GetHistoryList 获取备份历史列表 -func (s *Service) GetHistoryList(pageIndex, pageSize int) ([]*BackupHistoryResp, int64, error) { - histories, err := s.db.GetHistoryList(pageIndex, pageSize) - if err != nil { - return nil, 0, err - } - - count, err := s.db.GetHistoryCount() - if err != nil { - return nil, 0, err - } - - result := make([]*BackupHistoryResp, len(histories)) - for i, h := range histories { - result[i] = &BackupHistoryResp{ - ID: h.ID, - BackupID: h.BackupID, - Status: h.Status, - FileName: h.FileName, - FileSize: h.FileSize, - FileSizeStr: FormatFileSize(h.FileSize), - StoragePath: h.StoragePath, - CreatedAt: h.CreatedAt.Format("2006-01-02 15:04:05"), - } - if h.StartedAt != nil { - result[i].StartedAt = h.StartedAt.Format("2006-01-02 15:04:05") - } - if h.FinishedAt != nil { - result[i].FinishedAt = h.FinishedAt.Format("2006-01-02 15:04:05") - } - if h.ErrorMessage != "" { - result[i].ErrorMessage = h.ErrorMessage - } - } - - return result, count, nil -} - -// DeleteHistory 删除备份历史 -func (s *Service) DeleteHistory(id int64) error { - history, err := s.db.GetHistoryByID(id) - if err != nil { - return err - } - if history == nil { - return fmt.Errorf("backup history not found") - } - - // 获取配置以创建存储实例 - cfg, err := s.db.GetConfig() - if err == nil && cfg != nil { - storage, err := s.createStorage(cfg.Prefix) - if err == nil { - // 尝试从存储删除文件(5 分钟超时) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - defer cancel() - if err := storage.Delete(ctx, history.FileName); err != nil { - s.Warn("failed to delete backup from storage", zap.String("file", history.FileName), zap.Error(err)) - } - } - } - - return s.db.DeleteHistory(id) -} - -// GetDownloadURL 获取下载链接 -func (s *Service) GetDownloadURL(id int64) (string, error) { - history, err := s.db.GetHistoryByID(id) - if err != nil { - return "", err - } - if history == nil { - return "", fmt.Errorf("backup history not found") - } - - cfg, err := s.db.GetConfig() - if err != nil { - return "", err - } - if cfg == nil { - return "", fmt.Errorf("backup config not found") - } - - storage, err := s.createStorage(cfg.Prefix) - if err != nil { - return "", err - } - - // 生成 1 小时有效的下载链接(1 分钟超时) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - return storage.GetPresignedURL(ctx, history.FileName, time.Hour) -} - -// IsRunning 检查是否正在运行备份 -func (s *Service) IsRunning() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.isRunning -} - -// createStorage 创建存储实例(复用 ctx.GetConfig().COS 配置) -func (s *Service) createStorage(backupPrefix string) (IStorage, error) { - cos := s.ctx.GetConfig().COS - if cos.Bucket == "" { - return nil, fmt.Errorf("COS Bucket 未配置") - } - if cos.SecretID == "" || cos.SecretKey == "" { - return nil, fmt.Errorf("COS AccessKey/SecretKey 未配置") - } - if cos.Region == "" { - return nil, fmt.Errorf("COS Region 未配置") - } - - return NewCOSStorage(&StorageConfig{ - Bucket: cos.Bucket, - AccessKey: cos.SecretID, - SecretKey: cos.SecretKey, - Region: cos.Region, - Prefix: backupPrefix, - }) -} diff --git a/modules/backup/storage.go b/modules/backup/storage.go deleted file mode 100644 index f611a51a..00000000 --- a/modules/backup/storage.go +++ /dev/null @@ -1,75 +0,0 @@ -package backup - -import ( - "context" - "fmt" - "io" - "time" -) - -// IStorage 存储接口 -type IStorage interface { - // Upload 上传文件 - Upload(ctx context.Context, localPath, remotePath string) error - // Delete 删除文件 - Delete(ctx context.Context, remotePath string) error - // GetPresignedURL 获取预签名下载URL - GetPresignedURL(ctx context.Context, remotePath string, expires time.Duration) (string, error) - // TestConnection 测试连接 - TestConnection(ctx context.Context) error - // List 列出文件 - List(ctx context.Context, prefix string) ([]string, error) -} - -// StorageConfig 存储配置(复用系统 COS 配置) -type StorageConfig struct { - Bucket string - AccessKey string - SecretKey string - Region string - Prefix string // 备份路径前缀 -} - -// FormatFileSize 格式化文件大小 -func FormatFileSize(size int64) string { - const ( - KB = 1024 - MB = 1024 * KB - GB = 1024 * MB - ) - switch { - case size >= GB: - return fmt.Sprintf("%.2f GB", float64(size)/float64(GB)) - case size >= MB: - return fmt.Sprintf("%.2f MB", float64(size)/float64(MB)) - case size >= KB: - return fmt.Sprintf("%.2f KB", float64(size)/float64(KB)) - default: - return fmt.Sprintf("%d B", size) - } -} - -// ProgressReader 进度读取器 -type ProgressReader struct { - reader io.Reader - total int64 - read int64 - onProgress func(read, total int64) -} - -func NewProgressReader(reader io.Reader, total int64, onProgress func(read, total int64)) *ProgressReader { - return &ProgressReader{ - reader: reader, - total: total, - onProgress: onProgress, - } -} - -func (pr *ProgressReader) Read(p []byte) (int, error) { - n, err := pr.reader.Read(p) - pr.read += int64(n) - if pr.onProgress != nil { - pr.onProgress(pr.read, pr.total) - } - return n, err -} diff --git a/modules/backup/storage_cos.go b/modules/backup/storage_cos.go deleted file mode 100644 index c68b06fd..00000000 --- a/modules/backup/storage_cos.go +++ /dev/null @@ -1,114 +0,0 @@ -package backup - -import ( - "context" - "fmt" - "net/url" - "path" - "time" - - "github.com/Mininglamp-OSS/octo-lib/pkg/log" - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" - "go.uber.org/zap" -) - -// COSStorage 腾讯云 COS 存储(通过 S3 兼容协议) -type COSStorage struct { - log.Log - client *minio.Client - config *StorageConfig -} - -// NewCOSStorage 创建 COS 存储实例 -func NewCOSStorage(cfg *StorageConfig) (*COSStorage, error) { - if cfg.Region == "" { - return nil, fmt.Errorf("COS region is required") - } - - // 根据 region 构建 endpoint - endpoint := fmt.Sprintf("cos.%s.myqcloud.com", cfg.Region) - - client, err := minio.New(endpoint, &minio.Options{ - Creds: credentials.NewStaticV4(cfg.AccessKey, cfg.SecretKey, ""), - Secure: true, - BucketLookup: minio.BucketLookupDNS, // COS 要求 virtual-hosted-style - }) - if err != nil { - return nil, fmt.Errorf("failed to create COS client: %w", err) - } - - return &COSStorage{ - Log: log.NewTLog("COSStorage"), - client: client, - config: cfg, - }, nil -} - -// Upload 上传文件到 COS -func (s *COSStorage) Upload(ctx context.Context, localPath, remotePath string) error { - fullRemotePath := path.Join(s.config.Prefix, remotePath) - s.Info("uploading file to COS", zap.String("local", localPath), zap.String("remote", fullRemotePath)) - - _, err := s.client.FPutObject(ctx, s.config.Bucket, fullRemotePath, localPath, minio.PutObjectOptions{}) - if err != nil { - return fmt.Errorf("failed to upload file: %w", err) - } - - s.Info("file uploaded successfully", zap.String("path", fullRemotePath)) - return nil -} - -// Delete 删除 COS 上的文件 -func (s *COSStorage) Delete(ctx context.Context, remotePath string) error { - fullRemotePath := path.Join(s.config.Prefix, remotePath) - err := s.client.RemoveObject(ctx, s.config.Bucket, fullRemotePath, minio.RemoveObjectOptions{}) - if err != nil { - return fmt.Errorf("failed to delete object: %w", err) - } - s.Info("file deleted", zap.String("path", fullRemotePath)) - return nil -} - -// GetPresignedURL 获取预签名下载 URL -func (s *COSStorage) GetPresignedURL(ctx context.Context, remotePath string, expires time.Duration) (string, error) { - fullRemotePath := path.Join(s.config.Prefix, remotePath) - reqParams := make(url.Values) - presignedURL, err := s.client.PresignedGetObject(ctx, s.config.Bucket, fullRemotePath, expires, reqParams) - if err != nil { - return "", fmt.Errorf("failed to generate presigned URL: %w", err) - } - return presignedURL.String(), nil -} - -// TestConnection 测试 COS 连接 -func (s *COSStorage) TestConnection(ctx context.Context) error { - exists, err := s.client.BucketExists(ctx, s.config.Bucket) - if err != nil { - return fmt.Errorf("failed to check bucket existence: %w", err) - } - if !exists { - return fmt.Errorf("bucket %s does not exist", s.config.Bucket) - } - return nil -} - -// List 列出前缀下的文件 -func (s *COSStorage) List(ctx context.Context, prefix string) ([]string, error) { - fullPrefix := path.Join(s.config.Prefix, prefix) - var objects []string - - objectCh := s.client.ListObjects(ctx, s.config.Bucket, minio.ListObjectsOptions{ - Prefix: fullPrefix, - Recursive: false, - }) - - for object := range objectCh { - if object.Err != nil { - return nil, fmt.Errorf("failed to list objects: %w", object.Err) - } - objects = append(objects, object.Key) - } - - return objects, nil -}