-
Notifications
You must be signed in to change notification settings - Fork 2
Feat/refactor search #42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,11 +2,13 @@ package main | |||||||||||||||||||||
|
|
||||||||||||||||||||||
| import ( | ||||||||||||||||||||||
| "context" | ||||||||||||||||||||||
| "flag" | ||||||||||||||||||||||
| "fmt" | ||||||||||||||||||||||
| "log" | ||||||||||||||||||||||
| "os" | ||||||||||||||||||||||
| "os/signal" | ||||||||||||||||||||||
| "strconv" | ||||||||||||||||||||||
| "strings" | ||||||||||||||||||||||
| "syscall" | ||||||||||||||||||||||
| "time" | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
@@ -22,6 +24,15 @@ import ( | |||||||||||||||||||||
| ) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| func main() { | ||||||||||||||||||||||
| if len(os.Args) > 1 && os.Args[1] == "scan-backfill" { | ||||||||||||||||||||||
| runScanBackfill(os.Args[2:]) | ||||||||||||||||||||||
| return | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| runWorker() | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| func runWorker() { | ||||||||||||||||||||||
| // Initialise structured logging with daily rotation and 7-day retention. | ||||||||||||||||||||||
| // Worker uses a "worker" file prefix to separate logs from the API server: | ||||||||||||||||||||||
| // logs/worker-app.log – all worker messages (DEBUG+), including full SQL | ||||||||||||||||||||||
|
|
@@ -79,11 +90,13 @@ func main() { | |||||||||||||||||||||
| tmpDir = os.TempDir() + "/costrict-sync" | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| scanJobSvc := &services.ScanJobService{DB: db} | ||||||||||||||||||||||
| syncSvc := &services.SyncService{ | ||||||||||||||||||||||
| DB: db, | ||||||||||||||||||||||
| Git: &services.GitService{TempBaseDir: tmpDir}, | ||||||||||||||||||||||
| Parser: &services.ParserService{}, | ||||||||||||||||||||||
| CategorySvc: &services.CategoryService{DB: db}, | ||||||||||||||||||||||
| DB: db, | ||||||||||||||||||||||
| Git: &services.GitService{TempBaseDir: tmpDir}, | ||||||||||||||||||||||
| Parser: &services.ParserService{}, | ||||||||||||||||||||||
| ScanJobService: scanJobSvc, | ||||||||||||||||||||||
| CategorySvc: &services.CategoryService{DB: db}, | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| concurrency, _ := strconv.Atoi(os.Getenv("WORKER_CONCURRENCY")) | ||||||||||||||||||||||
|
|
@@ -120,9 +133,10 @@ func main() { | |||||||||||||||||||||
|
|
||||||||||||||||||||||
| scanLLMClient := llm.NewClient(&llmCfg) | ||||||||||||||||||||||
| scanSvc := &services.ScanService{ | ||||||||||||||||||||||
| DB: db, | ||||||||||||||||||||||
| LLMClient: scanLLMClient, | ||||||||||||||||||||||
| ModelName: llmCfg.Model, | ||||||||||||||||||||||
| DB: db, | ||||||||||||||||||||||
| LLMClient: scanLLMClient, | ||||||||||||||||||||||
| ModelName: llmCfg.Model, | ||||||||||||||||||||||
| CategorySvc: &services.CategoryService{DB: db}, | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| scanConcurrency, _ := strconv.Atoi(os.Getenv("SCAN_WORKER_CONCURRENCY")) | ||||||||||||||||||||||
|
|
@@ -157,6 +171,143 @@ func main() { | |||||||||||||||||||||
| log.Println("Worker pools stopped") | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| type latestRevisionRow struct { | ||||||||||||||||||||||
| ItemID string | ||||||||||||||||||||||
| LatestRevision int | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| func runScanBackfill(args []string) { | ||||||||||||||||||||||
| logger.Init(logger.Config{ | ||||||||||||||||||||||
| Dir: "./logs", | ||||||||||||||||||||||
| FilePrefix: "worker", | ||||||||||||||||||||||
| MaxAgeDays: 7, | ||||||||||||||||||||||
| Console: true, | ||||||||||||||||||||||
| ConsoleLevel: "warn", | ||||||||||||||||||||||
| }) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| fs := flag.NewFlagSet("scan-backfill", flag.ExitOnError) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| var ( | ||||||||||||||||||||||
| allItems bool | ||||||||||||||||||||||
| securityStatus string | ||||||||||||||||||||||
| registryID string | ||||||||||||||||||||||
| itemType string | ||||||||||||||||||||||
| limit int | ||||||||||||||||||||||
| triggerType string | ||||||||||||||||||||||
| priority int | ||||||||||||||||||||||
| maxAttempts int | ||||||||||||||||||||||
| dryRun bool | ||||||||||||||||||||||
| ) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| fs.BoolVar(&allItems, "all", false, "enqueue all active items instead of filtering by security status") | ||||||||||||||||||||||
| fs.StringVar(&securityStatus, "security-status", "unscanned", "filter by capability_items.security_status when --all=false") | ||||||||||||||||||||||
| fs.StringVar(®istryID, "registry-id", "", "only enqueue items in the given registry") | ||||||||||||||||||||||
| fs.StringVar(&itemType, "item-type", "", "only enqueue items of the given type (skill|subagent|command|mcp)") | ||||||||||||||||||||||
| fs.IntVar(&limit, "limit", 0, "max number of items to enqueue (0 = no limit)") | ||||||||||||||||||||||
| fs.StringVar(&triggerType, "trigger-type", "manual", "scan job trigger type to record") | ||||||||||||||||||||||
| fs.IntVar(&priority, "priority", 1, "scan job priority") | ||||||||||||||||||||||
| fs.IntVar(&maxAttempts, "max-attempts", 2, "scan job max attempts") | ||||||||||||||||||||||
| fs.BoolVar(&dryRun, "dry-run", false, "preview items without inserting scan jobs") | ||||||||||||||||||||||
| _ = fs.Parse(args) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| cfg := config.Load() | ||||||||||||||||||||||
| db, err := database.Initialize(cfg.DatabaseURL) | ||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||
| log.Fatalf("failed to initialize database: %v", err) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| query := db.Model(&models.CapabilityItem{}).Where("status = ?", "active") | ||||||||||||||||||||||
| if !allItems { | ||||||||||||||||||||||
| query = query.Where("security_status = ?", securityStatus) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| if registryID != "" { | ||||||||||||||||||||||
| query = query.Where("registry_id = ?", registryID) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| if itemType != "" { | ||||||||||||||||||||||
| query = query.Where("item_type = ?", itemType) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| if limit > 0 { | ||||||||||||||||||||||
| query = query.Limit(limit) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| var items []models.CapabilityItem | ||||||||||||||||||||||
| if err := query.Order("created_at ASC").Find(&items).Error; err != nil { | ||||||||||||||||||||||
| log.Fatalf("failed to query items: %v", err) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if len(items) == 0 { | ||||||||||||||||||||||
| log.Println("no matching items found") | ||||||||||||||||||||||
| return | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| itemIDs := make([]string, 0, len(items)) | ||||||||||||||||||||||
| for _, item := range items { | ||||||||||||||||||||||
| itemIDs = append(itemIDs, item.ID) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| var revisions []latestRevisionRow | ||||||||||||||||||||||
| if err := db.Model(&models.CapabilityVersion{}). | ||||||||||||||||||||||
| Select("item_id, COALESCE(MAX(revision), 0) AS latest_revision"). | ||||||||||||||||||||||
| Where("item_id IN ?", itemIDs). | ||||||||||||||||||||||
| Group("item_id"). | ||||||||||||||||||||||
| Scan(&revisions).Error; err != nil { | ||||||||||||||||||||||
| log.Fatalf("failed to query latest revisions: %v", err) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| revisionByItemID := make(map[string]int, len(revisions)) | ||||||||||||||||||||||
| for _, row := range revisions { | ||||||||||||||||||||||
| revisionByItemID[row.ItemID] = row.LatestRevision | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| scanJobSvc := &services.ScanJobService{DB: db} | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| var ( | ||||||||||||||||||||||
| enqueued int | ||||||||||||||||||||||
| skipped int | ||||||||||||||||||||||
| failed int | ||||||||||||||||||||||
| ) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| for _, item := range items { | ||||||||||||||||||||||
| revision := revisionByItemID[item.ID] | ||||||||||||||||||||||
| if revision <= 0 { | ||||||||||||||||||||||
| revision = 1 | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
Comment on lines
+271
to
+274
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Skip items that have no version row.
Suggested fix- revision := revisionByItemID[item.ID]
- if revision <= 0 {
- revision = 1
- }
+ revision, ok := revisionByItemID[item.ID]
+ if !ok || revision <= 0 {
+ skipped++
+ log.Printf("skipped item=%s slug=%s reason=no-capability-version", item.ID, item.Slug)
+ continue
+ }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if dryRun { | ||||||||||||||||||||||
| log.Printf("[dry-run] item=%s slug=%s type=%s security_status=%s revision=%d", | ||||||||||||||||||||||
| item.ID, item.Slug, item.ItemType, item.SecurityStatus, revision) | ||||||||||||||||||||||
| continue | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| job, err := scanJobSvc.Enqueue(item.ID, revision, triggerType, "", services.ScanEnqueueOptions{ | ||||||||||||||||||||||
| Priority: priority, | ||||||||||||||||||||||
| MaxAttempts: maxAttempts, | ||||||||||||||||||||||
| }) | ||||||||||||||||||||||
| switch { | ||||||||||||||||||||||
| case err == nil && job != nil: | ||||||||||||||||||||||
| enqueued++ | ||||||||||||||||||||||
| log.Printf("enqueued scan job item=%s slug=%s job=%s revision=%d", item.ID, item.Slug, job.ID, revision) | ||||||||||||||||||||||
| case err == nil && job == nil: | ||||||||||||||||||||||
| skipped++ | ||||||||||||||||||||||
| log.Printf("skipped item=%s slug=%s reason=already-has-active-job", item.ID, item.Slug) | ||||||||||||||||||||||
| case err != nil && strings.Contains(err.Error(), services.ErrScanJobAlreadyQueued.Error()): | ||||||||||||||||||||||
| skipped++ | ||||||||||||||||||||||
| log.Printf("skipped item=%s slug=%s reason=already-has-active-job", item.ID, item.Slug) | ||||||||||||||||||||||
| default: | ||||||||||||||||||||||
| failed++ | ||||||||||||||||||||||
| log.Printf("failed item=%s slug=%s err=%v", item.ID, item.Slug, err) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if dryRun { | ||||||||||||||||||||||
| log.Printf("[dry-run] matched %d items", len(items)) | ||||||||||||||||||||||
| return | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| log.Printf("backfill complete: matched=%d enqueued=%d skipped=%d failed=%d", | ||||||||||||||||||||||
| len(items), enqueued, skipped, failed) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| func runPreMigrations(db *gorm.DB) error { | ||||||||||||||||||||||
| stmts := []struct { | ||||||||||||||||||||||
| check string | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reject negative queue controls.
Line 282 forwards
priorityandmaxAttemptsverbatim, andserver/internal/services/scan_job_service.go:24-63only normalizes zero.scan-backfill --priority=-1or--max-attempts=-1therefore persists invalid job settings instead of failing fast.Suggested fix
Also applies to: 282-285
🤖 Prompt for AI Agents