@@ -2,11 +2,13 @@ package main
22
33import (
	"context"
	"errors"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"syscall"
	"time"
1214
@@ -22,6 +24,15 @@ import (
2224)
2325
2426func main () {
27+ if len (os .Args ) > 1 && os .Args [1 ] == "scan-backfill" {
28+ runScanBackfill (os .Args [2 :])
29+ return
30+ }
31+
32+ runWorker ()
33+ }
34+
35+ func runWorker () {
2536 // Initialise structured logging with daily rotation and 7-day retention.
2637 // Worker uses a "worker" file prefix to separate logs from the API server:
2738 // logs/worker-app.log – all worker messages (DEBUG+), including full SQL
@@ -79,11 +90,13 @@ func main() {
7990 tmpDir = os .TempDir () + "/costrict-sync"
8091 }
8192
93+ scanJobSvc := & services.ScanJobService {DB : db }
8294 syncSvc := & services.SyncService {
83- DB : db ,
84- Git : & services.GitService {TempBaseDir : tmpDir },
85- Parser : & services.ParserService {},
86- CategorySvc : & services.CategoryService {DB : db },
95+ DB : db ,
96+ Git : & services.GitService {TempBaseDir : tmpDir },
97+ Parser : & services.ParserService {},
98+ ScanJobService : scanJobSvc ,
99+ CategorySvc : & services.CategoryService {DB : db },
87100 }
88101
89102 concurrency , _ := strconv .Atoi (os .Getenv ("WORKER_CONCURRENCY" ))
@@ -120,9 +133,10 @@ func main() {
120133
121134 scanLLMClient := llm .NewClient (& llmCfg )
122135 scanSvc := & services.ScanService {
123- DB : db ,
124- LLMClient : scanLLMClient ,
125- ModelName : llmCfg .Model ,
136+ DB : db ,
137+ LLMClient : scanLLMClient ,
138+ ModelName : llmCfg .Model ,
139+ CategorySvc : & services.CategoryService {DB : db },
126140 }
127141
128142 scanConcurrency , _ := strconv .Atoi (os .Getenv ("SCAN_WORKER_CONCURRENCY" ))
@@ -157,6 +171,143 @@ func main() {
157171 log .Println ("Worker pools stopped" )
158172}
159173
// latestRevisionRow receives one row of the grouped revision lookup issued by
// runScanBackfill, which selects "item_id" and "latest_revision" per item.
type latestRevisionRow struct {
	// ItemID is the item the row belongs to (the query's item_id column).
	ItemID string
	// LatestRevision is COALESCE(MAX(revision), 0) for that item.
	LatestRevision int
}
178+
179+ func runScanBackfill (args []string ) {
180+ logger .Init (logger.Config {
181+ Dir : "./logs" ,
182+ FilePrefix : "worker" ,
183+ MaxAgeDays : 7 ,
184+ Console : true ,
185+ ConsoleLevel : "warn" ,
186+ })
187+
188+ fs := flag .NewFlagSet ("scan-backfill" , flag .ExitOnError )
189+
190+ var (
191+ allItems bool
192+ securityStatus string
193+ registryID string
194+ itemType string
195+ limit int
196+ triggerType string
197+ priority int
198+ maxAttempts int
199+ dryRun bool
200+ )
201+
202+ fs .BoolVar (& allItems , "all" , false , "enqueue all active items instead of filtering by security status" )
203+ fs .StringVar (& securityStatus , "security-status" , "unscanned" , "filter by capability_items.security_status when --all=false" )
204+ fs .StringVar (& registryID , "registry-id" , "" , "only enqueue items in the given registry" )
205+ fs .StringVar (& itemType , "item-type" , "" , "only enqueue items of the given type (skill|subagent|command|mcp)" )
206+ fs .IntVar (& limit , "limit" , 0 , "max number of items to enqueue (0 = no limit)" )
207+ fs .StringVar (& triggerType , "trigger-type" , "manual" , "scan job trigger type to record" )
208+ fs .IntVar (& priority , "priority" , 1 , "scan job priority" )
209+ fs .IntVar (& maxAttempts , "max-attempts" , 2 , "scan job max attempts" )
210+ fs .BoolVar (& dryRun , "dry-run" , false , "preview items without inserting scan jobs" )
211+ _ = fs .Parse (args )
212+
213+ cfg := config .Load ()
214+ db , err := database .Initialize (cfg .DatabaseURL )
215+ if err != nil {
216+ log .Fatalf ("failed to initialize database: %v" , err )
217+ }
218+
219+ query := db .Model (& models.CapabilityItem {}).Where ("status = ?" , "active" )
220+ if ! allItems {
221+ query = query .Where ("security_status = ?" , securityStatus )
222+ }
223+ if registryID != "" {
224+ query = query .Where ("registry_id = ?" , registryID )
225+ }
226+ if itemType != "" {
227+ query = query .Where ("item_type = ?" , itemType )
228+ }
229+ if limit > 0 {
230+ query = query .Limit (limit )
231+ }
232+
233+ var items []models.CapabilityItem
234+ if err := query .Order ("created_at ASC" ).Find (& items ).Error ; err != nil {
235+ log .Fatalf ("failed to query items: %v" , err )
236+ }
237+
238+ if len (items ) == 0 {
239+ log .Println ("no matching items found" )
240+ return
241+ }
242+
243+ itemIDs := make ([]string , 0 , len (items ))
244+ for _ , item := range items {
245+ itemIDs = append (itemIDs , item .ID )
246+ }
247+
248+ var revisions []latestRevisionRow
249+ if err := db .Model (& models.CapabilityVersion {}).
250+ Select ("item_id, COALESCE(MAX(revision), 0) AS latest_revision" ).
251+ Where ("item_id IN ?" , itemIDs ).
252+ Group ("item_id" ).
253+ Scan (& revisions ).Error ; err != nil {
254+ log .Fatalf ("failed to query latest revisions: %v" , err )
255+ }
256+
257+ revisionByItemID := make (map [string ]int , len (revisions ))
258+ for _ , row := range revisions {
259+ revisionByItemID [row .ItemID ] = row .LatestRevision
260+ }
261+
262+ scanJobSvc := & services.ScanJobService {DB : db }
263+
264+ var (
265+ enqueued int
266+ skipped int
267+ failed int
268+ )
269+
270+ for _ , item := range items {
271+ revision := revisionByItemID [item .ID ]
272+ if revision <= 0 {
273+ revision = 1
274+ }
275+
276+ if dryRun {
277+ log .Printf ("[dry-run] item=%s slug=%s type=%s security_status=%s revision=%d" ,
278+ item .ID , item .Slug , item .ItemType , item .SecurityStatus , revision )
279+ continue
280+ }
281+
282+ job , err := scanJobSvc .Enqueue (item .ID , revision , triggerType , "" , services.ScanEnqueueOptions {
283+ Priority : priority ,
284+ MaxAttempts : maxAttempts ,
285+ })
286+ switch {
287+ case err == nil && job != nil :
288+ enqueued ++
289+ log .Printf ("enqueued scan job item=%s slug=%s job=%s revision=%d" , item .ID , item .Slug , job .ID , revision )
290+ case err == nil && job == nil :
291+ skipped ++
292+ log .Printf ("skipped item=%s slug=%s reason=already-has-active-job" , item .ID , item .Slug )
293+ case err != nil && strings .Contains (err .Error (), services .ErrScanJobAlreadyQueued .Error ()):
294+ skipped ++
295+ log .Printf ("skipped item=%s slug=%s reason=already-has-active-job" , item .ID , item .Slug )
296+ default :
297+ failed ++
298+ log .Printf ("failed item=%s slug=%s err=%v" , item .ID , item .Slug , err )
299+ }
300+ }
301+
302+ if dryRun {
303+ log .Printf ("[dry-run] matched %d items" , len (items ))
304+ return
305+ }
306+
307+ log .Printf ("backfill complete: matched=%d enqueued=%d skipped=%d failed=%d" ,
308+ len (items ), enqueued , skipped , failed )
309+ }
310+
160311func runPreMigrations (db * gorm.DB ) error {
161312 stmts := []struct {
162313 check string
0 commit comments