diff --git a/internal/v2/appinfo/appinfomodule.go b/internal/v2/appinfo/appinfomodule.go index 7557a9f..4eb3405 100644 --- a/internal/v2/appinfo/appinfomodule.go +++ b/internal/v2/appinfo/appinfomodule.go @@ -518,7 +518,7 @@ func (m *AppInfoModule) initSyncer() error { } // Start syncer in passive mode (Pipeline handles scheduling) - if err := m.syncer.StartWithOptions(m.ctx, false); err != nil { + if err := m.syncer.StartWithOptions(m.ctx); err != nil { return fmt.Errorf("failed to start syncer: %w", err) } @@ -550,7 +550,7 @@ func (m *AppInfoModule) initHydrator() error { m.hydrator = NewHydrator(cacheData, m.settingsManager, m.cacheManager, hydratorConfig) // Start hydrator with context - if err := m.hydrator.Start(m.ctx); err != nil { + if err := m.hydrator.StartWithOptions(m.ctx); err != nil { return fmt.Errorf("failed to start hydrator: %w", err) } @@ -583,7 +583,7 @@ func (m *AppInfoModule) initDataWatcher() error { // Create DataWatcher instance m.dataWatcher = NewDataWatcher(m.cacheManager, m.hydrator, m.dataSender) - if err := m.dataWatcher.StartWithOptions(m.ctx, false); err != nil { + if err := m.dataWatcher.StartWithOptions(m.ctx); err != nil { return fmt.Errorf("failed to start DataWatcher: %w", err) } @@ -646,7 +646,7 @@ func (m *AppInfoModule) initDataWatcherRepo() error { // Create DataWatcherRepo instance m.dataWatcherRepo = NewDataWatcherRepo(m.redisClient, m.cacheManager, m.dataWatcher, m.dataSender) - if err := m.dataWatcherRepo.StartWithOptions(false); err != nil { + if err := m.dataWatcherRepo.StartWithOptions(); err != nil { return fmt.Errorf("failed to start DataWatcherRepo: %w", err) } @@ -664,7 +664,7 @@ func (m *AppInfoModule) initStatusCorrectionChecker() error { m.statusCorrectionChecker = NewStatusCorrectionChecker(m.cacheManager) - if err := m.statusCorrectionChecker.StartWithOptions(false); err != nil { + if err := m.statusCorrectionChecker.StartWithOptions(); err != nil { return fmt.Errorf("failed to start StatusCorrectionChecker: %w", err) } @@ -1082,16 +1082,6 @@ func DefaultModuleConfig() *ModuleConfig { } } -// GetDockerImageInfo is a convenience function to get Docker image information -func (m *AppInfoModule) GetDockerImageInfo(imageName, appName string) (*utils.DockerImageInfo, error) { - return utils.GetDockerImageInfo(imageName, appName) -} - -// GetLayerDownloadProgress is a convenience function to get layer download progress -func (m *AppInfoModule) GetLayerDownloadProgress(layerDigest string) (*utils.LayerInfo, error) { - return utils.GetLayerDownloadProgress(layerDigest) -} - // SetAppData is a convenience function to set app data func (m *AppInfoModule) SetAppData(userID, sourceID string, dataType AppDataType, data map[string]interface{}) error { if !m.isStarted || m.cacheManager == nil { @@ -1255,197 +1245,6 @@ func (m *AppInfoModule) SyncUserListToCache() error { return m.cacheManager.SyncUserListToCache() } -// RefreshUserDataStructures ensures all configured users have proper data structures -// not used -func (m *AppInfoModule) RefreshUserDataStructures() error { - // Check isStarted without lock since it's only read - if !m.isStarted { - return fmt.Errorf("module is not started") - } - - glog.V(3).Info("Refreshing user data structures") - - // First sync user list to cache - if err := m.SyncUserListToCache(); err != nil { - return fmt.Errorf("failed to sync user list to cache: %w", err) - } - - // Force sync to Redis to ensure persistence - // Cache manager pointer access is atomic - if m.cacheManager != nil { - if err := m.cacheManager.ForceSync(); err != nil { - glog.Errorf("Failed to force sync after refreshing user data structures: %v", err) - } - } - - glog.V(3).Info("User data structures refreshed successfully") - return nil -} - -// GetConfiguredUsers returns the list of configured users -func (m *AppInfoModule) GetConfiguredUsers() []string { - // Config is read-only after initialization, no need for read lock - if m.config.User == nil { - return []string{} - } - - // Return a copy to prevent external modification - users := make([]string, len(m.config.User.UserList)) - copy(users, m.config.User.UserList) - return users -} - -// GetCachedUsers returns the list of users currently in cache -func (m *AppInfoModule) GetCachedUsers() []string { - // Check isStarted and cache manager without lock since they're only read - if !m.isStarted || m.cacheManager == nil { - return []string{} - } - - allUsersData := m.cacheManager.GetAllUsersData() // not used - users := make([]string, 0, len(allUsersData)) - for userID := range allUsersData { - users = append(users, userID) - } - - return users -} - -// CleanupInvalidData cleans up invalid pending data entries from cache -func (m *AppInfoModule) CleanupInvalidData() (int, error) { - if m.cacheManager == nil { - return 0, fmt.Errorf("cache manager not available") - } - - cleanedCount := m.cacheManager.CleanupInvalidPendingData() - glog.V(3).Infof("Cleaned up %d invalid pending data entries", cleanedCount) - - return cleanedCount, nil -} - -// GetInvalidDataReport returns a detailed report of invalid pending data entries -func (m *AppInfoModule) GetInvalidDataReport() map[string]interface{} { - if m.cacheManager == nil { - return map[string]interface{}{ - "error": "cache manager not available", - } - } - - report := map[string]interface{}{ - "users": make(map[string]interface{}), - "totals": map[string]int{ - "total_users": 0, - "total_sources": 0, - "total_pending_data": 0, - "total_invalid_data": 0, - }, - } - - allUsersForReport := m.cacheManager.GetAllUsersData() // not used - - totalUsers := 0 - totalSources := 0 - totalPendingData := 0 - totalInvalidData := 0 - - for userID, userData := range allUsersForReport { - totalUsers++ - userReport := map[string]interface{}{ - "sources": make(map[string]interface{}), - "totals": map[string]int{ - "total_sources": 0, - "total_pending_data": 0, - "total_invalid_data": 0, - }, - } - - for sourceID, sourceData := range userData.Sources { - totalSources++ - - sourceReport := map[string]interface{}{ - "total_pending_data": len(sourceData.AppInfoLatestPending), - "invalid_entries": make([]map[string]interface{}, 0), - } - - invalidCount := 0 - for i, pendingData := range sourceData.AppInfoLatestPending { - isValid := false - invalidReasons := make([]string, 0) - - if pendingData.RawData != nil { - if (pendingData.RawData.ID != "" && pendingData.RawData.ID != "0") || - (pendingData.RawData.AppID != "" && pendingData.RawData.AppID != "0") || - (pendingData.RawData.Name != "" && pendingData.RawData.Name != "unknown") { - isValid = true - } else { - if pendingData.RawData.ID == "" || pendingData.RawData.ID == "0" { - invalidReasons = append(invalidReasons, "empty or zero ID") - } - if pendingData.RawData.AppID == "" || pendingData.RawData.AppID == "0" { - invalidReasons = append(invalidReasons, "empty or zero AppID") - } - if pendingData.RawData.Name == "" || pendingData.RawData.Name == "unknown" { - invalidReasons = append(invalidReasons, "empty or unknown Name") - } - } - } else { - invalidReasons = append(invalidReasons, "null RawData") - } - - if pendingData.AppInfo != nil && pendingData.AppInfo.AppEntry != nil && !isValid { - if (pendingData.AppInfo.AppEntry.ID != "" && pendingData.AppInfo.AppEntry.ID != "0") || - (pendingData.AppInfo.AppEntry.AppID != "" && pendingData.AppInfo.AppEntry.AppID != "0") || - (pendingData.AppInfo.AppEntry.Name != "" && pendingData.AppInfo.AppEntry.Name != "unknown") { - isValid = true - invalidReasons = make([]string, 0) // Clear reasons if AppInfo is valid - } - } - - if !isValid { - invalidCount++ - invalidEntry := map[string]interface{}{ - "index": i, - "reasons": invalidReasons, - "data": map[string]interface{}{ - "timestamp": pendingData.Timestamp, - "version": pendingData.Version, - }, - } - - if pendingData.RawData != nil { - invalidEntry["raw_data"] = map[string]interface{}{ - "id": pendingData.RawData.ID, - "app_id": pendingData.RawData.AppID, - "name": pendingData.RawData.Name, - "title": pendingData.RawData.Title, - } - } - - sourceReport["invalid_entries"] = append(sourceReport["invalid_entries"].([]map[string]interface{}), invalidEntry) - } - } - - sourceReport["invalid_count"] = invalidCount - totalPendingData += len(sourceData.AppInfoLatestPending) - totalInvalidData += invalidCount - - userReport["sources"].(map[string]interface{})[sourceID] = sourceReport - userReport["totals"].(map[string]int)["total_sources"]++ - userReport["totals"].(map[string]int)["total_pending_data"] += len(sourceData.AppInfoLatestPending) - userReport["totals"].(map[string]int)["total_invalid_data"] += invalidCount - } - - report["users"].(map[string]interface{})[userID] = userReport - } - - report["totals"].(map[string]int)["total_users"] = totalUsers - report["totals"].(map[string]int)["total_sources"] = totalSources - report["totals"].(map[string]int)["total_pending_data"] = totalPendingData - report["totals"].(map[string]int)["total_invalid_data"] = totalInvalidData - - return report -} - // SetTaskModule sets the task module for recording task events func (m *AppInfoModule) SetTaskModule(taskModule *task.TaskModule) { m.taskModule = taskModule @@ -1492,9 +1291,3 @@ func (m *AppInfoModule) SetSettingsManager(settingsManager *settings.SettingsMan m.settingsManager = settingsManager glog.V(2).Info("Settings manager set in AppInfo module") } - -// GetSettingsManager returns the settings manager instance -func (m *AppInfoModule) GetSettingsManager() *settings.SettingsManager { - // Pointer assignment is atomic, no need for read lock - return m.settingsManager -} diff --git a/internal/v2/appinfo/cache.go b/internal/v2/appinfo/cache.go index 0ffc785..3b013e4 100644 --- a/internal/v2/appinfo/cache.go +++ b/internal/v2/appinfo/cache.go @@ -800,10 +800,6 @@ func (cm *CacheManager) Start() error { // Start sync worker goroutine go cm.syncWorker() - // Start periodic cleanup of AppRenderFailed data (every 5 minutes) - cm.cleanupTicker = time.NewTicker(5 * time.Minute) - go cm.cleanupWorker() // + - glog.V(3).Infof("Cache manager started successfully") return nil } @@ -853,6 +849,7 @@ func (cm *CacheManager) processSyncRequest(req SyncRequest) { return } + glog.Infof("[CACHE] SyncRequest, user: %s, source: %s, type: %d", req.UserID, req.SourceID, req.Type) switch req.Type { case SyncUser: if userData := cm.getUserData(req.UserID); userData != nil { @@ -926,6 +923,8 @@ func (cm *CacheManager) GetAppVersionFromState(userID, sourceID, appName string) // getSourceData internal method to get source data without external locking func (cm *CacheManager) getSourceData(userID, sourceID string) *SourceData { + cm.mutex.RLock() + defer cm.mutex.RUnlock() if userData, exists := cm.cache.Users[userID]; exists { return userData.Sources[sourceID] } @@ -1523,7 +1522,7 @@ func (cm *CacheManager) setAppDataInternal(userID, sourceID string, dataType App glog.V(3).Infof("Added app %s for user=%s, source=%s", appID, userID, sourceID) } } - } else { + } else { // syncer if dataSection, hasData := data["data"].(map[string]interface{}); hasData { if appsData, hasApps := dataSection["apps"].(map[string]interface{}); hasApps { for appID, appDataInterface := range appsData { @@ -1792,16 +1791,6 @@ func (cm *CacheManager) addUserInternal(userID string) error { return nil } -// AddUser adds a new user to the cache -func (cm *CacheManager) AddUser(userID string) error { - go func() { - if err := cm.addUserInternal(userID); err != nil { - glog.Errorf("Failed to add user in goroutine: %v", err) - } - }() - return nil -} - // GetCacheStats returns cache statistics using single global lock func (cm *CacheManager) GetCacheStats() map[string]interface{} { cm.mutex.RLock() @@ -2331,6 +2320,7 @@ func (cm *CacheManager) updateLockStats(lockType string) { // RemoveAppStateData removes a specific app from AppStateLatest for a user and source func (cm *CacheManager) removeAppStateDataInternal(userID, sourceID, appName string) error { + glog.Infof("[CACHE], remove appStateData, user: %s, source: %s, app: %s", userID, sourceID, appName) cm.mutex.Lock() _wd := cm.startLockWatchdog("@RemoveAppStateData") defer func() { cm.mutex.Unlock(); _wd() }() @@ -2384,6 +2374,7 @@ func (cm *CacheManager) RemoveAppStateData(userID, sourceID, appName string) err // RemoveAppInfoLatestData removes a specific app from AppInfoLatest for a user and source func (cm *CacheManager) removeAppInfoLatestDataInternal(userID, sourceID, appName string) error { + glog.Infof("[CACHE], remove appInfoLatestData, user: %s, source: %s, app: %s", userID, sourceID, appName) cm.mutex.Lock() _wd := cm.startLockWatchdog("@RemoveAppInfoLatestData") defer func() { cm.mutex.Unlock(); _wd() }() @@ -2584,22 +2575,6 @@ func (cm *CacheManager) ResynceUser() error { return nil } -// cleanupWorker processes periodic cleanup of AppRenderFailed data -func (cm *CacheManager) cleanupWorker() { - glog.V(3).Info("INFO: Starting AppRenderFailed cleanup worker") - - for range cm.cleanupTicker.C { - if !cm.isRunning { - glog.V(3).Info("INFO: CacheManager stopped, cleanup worker exiting") - break - } - - cm.ClearAppRenderFailedData() - } - - glog.V(3).Info("INFO: AppRenderFailed cleanup worker stopped") -} - // ClearAppRenderFailedData clears all AppRenderFailed data for all users and sources func (cm *CacheManager) ClearAppRenderFailedData() { glog.Info("INFO: [Cleanup] Starting periodic cleanup of AppRenderFailed data") diff --git a/internal/v2/appinfo/datasender_app.go b/internal/v2/appinfo/datasender_app.go index 5c8a648..3485a0a 100644 --- a/internal/v2/appinfo/datasender_app.go +++ b/internal/v2/appinfo/datasender_app.go @@ -242,11 +242,3 @@ func (ds *DataSender) Close() { glog.V(3).Info("NATS connection closed") } } - -// IsConnected checks if NATS connection is active -func (ds *DataSender) IsConnected() bool { - if !ds.enabled || ds.conn == nil { - return false - } - return ds.conn.IsConnected() -} diff --git a/internal/v2/appinfo/datawatcher_app.go b/internal/v2/appinfo/datawatcher_app.go index 00889be..13a06e7 100644 --- a/internal/v2/appinfo/datawatcher_app.go +++ b/internal/v2/appinfo/datawatcher_app.go @@ -3,7 +3,6 @@ package appinfo import ( "context" "fmt" - "strings" "sync" "sync/atomic" "time" @@ -46,14 +45,9 @@ func NewDataWatcher(cacheManager *CacheManager, hydrator *Hydrator, dataSender * } } -// Start begins the data watching process -func (dw *DataWatcher) Start(ctx context.Context) error { - return dw.StartWithOptions(ctx, true) -} - // StartWithOptions begins the data watching process with options // If enableWatchLoop is false, the periodic watchLoop is not started (used when serial pipeline handles processing) -func (dw *DataWatcher) StartWithOptions(ctx context.Context, enableWatchLoop bool) error { +func (dw *DataWatcher) StartWithOptions(ctx context.Context) error { if atomic.LoadInt32(&dw.isRunning) == 1 { return fmt.Errorf("DataWatcher is already running") } @@ -68,12 +62,7 @@ func (dw *DataWatcher) StartWithOptions(ctx context.Context, enableWatchLoop boo atomic.StoreInt32(&dw.isRunning, 1) - if enableWatchLoop { - glog.Infof("Starting DataWatcher with interval: %v", time.Duration(atomic.LoadInt64((*int64)(&dw.interval)))) - go dw.watchLoop(ctx) - } else { - glog.Infof("Starting DataWatcher in passive mode (serial pipeline handles processing)") - } + glog.Infof("Starting DataWatcher in passive mode (serial pipeline handles processing)") return nil } @@ -94,187 +83,6 @@ func (dw *DataWatcher) IsRunning() bool { return atomic.LoadInt32(&dw.isRunning) == 1 } -// watchLoop is the main monitoring loop -// ~ not used -func (dw *DataWatcher) watchLoop(ctx context.Context) { - glog.Infof("DataWatcher monitoring loop started") - defer glog.Infof("DataWatcher monitoring loop stopped") - - ticker := time.NewTicker(time.Duration(atomic.LoadInt64((*int64)(&dw.interval)))) - defer ticker.Stop() - - // Run once immediately - dw.processCompletedApps() // not used - - for { - select { - case <-ctx.Done(): - glog.Infof("DataWatcher stopped due to context cancellation") - return - case <-dw.stopChan: - glog.Infof("DataWatcher stopped due to explicit stop") - return - case <-ticker.C: - dw.processCompletedApps() // not used - } - } -} - -// processCompletedApps checks for completed hydration apps and moves them -func (dw *DataWatcher) processCompletedApps() { - processingStart := time.Now() - atomic.StoreInt64(&dw.lastRunTime, processingStart.Unix()) - - glog.Infof("DataWatcher: Starting to process completed apps") - - // Create timeout context for entire processing cycle - ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second) - defer cancel() - - // Get all users data from cache manager with timeout - var allUsersData map[string]*types.UserData - - allUsersData = dw.cacheManager.GetAllUsersData() // not used - - if len(allUsersData) == 0 { - glog.Infof("DataWatcher: No users data found, processing cycle completed") - return - } - - glog.Infof("DataWatcher: Found %d users to process", len(allUsersData)) - totalProcessed := int64(0) - totalMoved := int64(0) - - // Process users in batches to avoid holding locks too long - const batchSize = 5 - userCount := 0 - userBatch := make([]string, 0, batchSize) - userDataBatch := make(map[string]*types.UserData) - - for userID, userData := range allUsersData { - userBatch = append(userBatch, userID) - userDataBatch[userID] = userData - userCount++ - - // Process batch when it's full or we've reached the end - if len(userBatch) >= batchSize || userCount == len(allUsersData) { - batchProcessed, batchMoved := dw.processUserBatch(ctx, userBatch, userDataBatch) // not used - totalProcessed += batchProcessed - totalMoved += batchMoved - - // Clear batch for next iteration - userBatch = userBatch[:0] - userDataBatch = make(map[string]*types.UserData) - - // Check timeout between batches - select { - case <-ctx.Done(): - glog.Errorf("DataWatcher: Timeout during batch processing after processing %d users", userCount) - return - default: - } - } - } - - // Update metrics - atomic.AddInt64(&dw.totalAppsProcessed, totalProcessed) - atomic.AddInt64(&dw.totalAppsMoved, totalMoved) - - processingDuration := time.Since(processingStart) - if totalMoved > 0 { - glog.V(2).Infof("DataWatcher: Processing cycle completed in %v - %d apps processed, %d moved to AppInfoLatest", - processingDuration, totalProcessed, totalMoved) - } else { - glog.V(3).Infof("DataWatcher: Processing cycle completed in %v - %d apps processed, no moves needed", - processingDuration, totalProcessed) - } -} - -// processUserBatch processes a batch of users -func (dw *DataWatcher) processUserBatch(ctx context.Context, userIDs []string, userDataMap map[string]*types.UserData) (int64, int64) { - totalProcessed := int64(0) - totalMoved := int64(0) - - for i, userID := range userIDs { - // Check timeout during batch processing - select { - case <-ctx.Done(): - glog.Errorf("DataWatcher: Timeout during user batch processing (user %d/%d)", i+1, len(userIDs)) - return totalProcessed, totalMoved - default: - } - - userData := userDataMap[userID] - if userData == nil { - continue - } - - glog.V(3).Infof("DataWatcher: Processing user %d/%d in batch: %s", i+1, len(userIDs), userID) - processed, moved := dw.processUserData(userID, userData) // not used - totalProcessed += processed - totalMoved += moved - glog.V(2).Infof("DataWatcher: User %s completed: %d processed, %d moved", userID, processed, moved) - } - - return totalProcessed, totalMoved -} - -// processUserData processes a single user's data -// ~ not used -func (dw *DataWatcher) processUserData(userID string, userData *types.UserData) (int64, int64) { - if userData == nil { - return 0, 0 - } - - // Step 1: Collect source data references under minimal lock - sourceRefs := make(map[string]*SourceData) - for sourceID, sourceData := range userData.Sources { - sourceRefs[sourceID] = sourceData - } - - // Step 2: Process each source without holding user lock - totalProcessed := int64(0) - totalMoved := int64(0) - - for sourceID, sourceData := range sourceRefs { - processed, moved := dw.processSourceData(userID, sourceID, sourceData) // not used - totalProcessed += processed - totalMoved += moved - } - - // Hash calculation is deferred to Pipeline Phase 5. - // The caller (Pipeline.phaseHydrateApps) tracks affected users and - // Phase 5 will calculate hashes for all affected users in one pass. - - return totalProcessed, totalMoved -} - -// calculateAndSetUserHashWithRetry calculates hash with retry mechanism for data consistency -func (dw *DataWatcher) calculateAndSetUserHashWithRetry(userID string, userData *types.UserData) { - maxRetries := 3 - retryDelay := 20000 * time.Millisecond - - for attempt := 1; attempt <= maxRetries; attempt++ { - glog.Infof("DataWatcher: Hash calculation attempt %d/%d for user %s", attempt, maxRetries, userID) - - // Perform hash calculation directly - success := dw.calculateAndSetUserHashDirect(userID, userData) - - if success { - glog.Infof("DataWatcher: Hash calculation completed successfully for user %s", userID) - return - } - - if attempt < maxRetries { - glog.Warningf("DataWatcher: Hash calculation failed for user %s, retrying in %v", userID, retryDelay) - time.Sleep(retryDelay) - retryDelay *= 2 // Exponential backoff - } - } - - glog.Errorf("DataWatcher: Hash calculation failed after %d attempts for user %s", maxRetries, userID) -} - // calculateAndSetUserHashDirect calculates and updates hash for a single user. // Does NOT call ForceSync — the caller (Pipeline Phase 5) is responsible for syncing. func (dw *DataWatcher) calculateAndSetUserHashDirect(userID string, userData *types.UserData) bool { @@ -310,126 +118,6 @@ func (dw *DataWatcher) calculateAndSetUserHashDirect(userID string, userData *ty return true } -// processSourceData processes a single source's data for completed hydration -// ~ not used -func (dw *DataWatcher) processSourceData(userID, sourceID string, sourceData *types.SourceData) (int64, int64) { - if sourceData == nil { - return 0, 0 - } - - // Step 1: Quick check and data copy with minimal lock time - pendingApps, _ := dw.cacheManager.SnapshotSourcePending(userID, sourceID) - - // Early exit if no pending apps - if len(pendingApps) == 0 { - return 0, 0 - } - - glog.V(3).Infof("DataWatcher: Processing %d pending apps for user=%s, source=%s", len(pendingApps), userID, sourceID) - - // Step 2: Lock-free processing - Check hydration completion status - var completedApps []*types.AppInfoLatestPendingData - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - for i, pendingApp := range pendingApps { - if pendingApp == nil { - continue - } - - if isDevEnvironment() { - glog.V(3).Infof("DataWatcher: Checking app %d/%d: %s", i+1, len(pendingApps), dw.getAppID(pendingApp)) - } - - if dw.isAppHydrationCompletedWithTimeout(ctx, pendingApp) { - completedApps = append(completedApps, pendingApp) - } - } - - if len(completedApps) == 0 { - glog.V(3).Infof("DataWatcher: No completed apps found for user=%s, source=%s", userID, sourceID) - return int64(len(pendingApps)), 0 - } - - // Build single-line summary with all completed app IDs - completedIDs := make([]string, 0, len(completedApps)) - for _, ca := range completedApps { - completedIDs = append(completedIDs, dw.getAppID(ca)) - } - glog.Infof("DataWatcher: user=%s source=%s completed=%d/%d apps=[%s]", userID, sourceID, len(completedApps), len(pendingApps), strings.Join(completedIDs, ",")) - - // Step 3: Move completed apps from pending to latest via CacheManager - movedCount := int64(0) - for _, completedApp := range completedApps { - latestData := dw.convertPendingToLatest(completedApp) - if latestData == nil { - continue - } - appID := dw.getAppID(completedApp) - appName := dw.getAppName(completedApp) - - oldVersion, replaced, ok := dw.cacheManager.UpsertLatestAndRemovePending(userID, sourceID, latestData, appID, appName) - if !ok { - continue - } - - if replaced { - newVersion := "" - if latestData.AppInfo != nil && latestData.AppInfo.AppEntry != nil { - newVersion = latestData.AppInfo.AppEntry.Version - } - if oldVersion != newVersion { - dw.sendNewAppReadyNotification(userID, completedApp, sourceID) // ~ not used - } - glog.V(3).Infof("DataWatcher: Replaced existing app: %s", appName) - } else { - glog.V(2).Infof("DataWatcher: Added new app to latest: %s", appName) - dw.sendNewAppReadyNotification(userID, completedApp, sourceID) // ~ not used - } - movedCount++ - } - - return int64(len(pendingApps)), movedCount -} - -// isAppHydrationCompletedWithTimeout checks if app hydration is completed with timeout protection -func (dw *DataWatcher) isAppHydrationCompletedWithTimeout(ctx context.Context, pendingApp *types.AppInfoLatestPendingData) bool { - if pendingApp == nil { - glog.V(3).Info("DataWatcher: isAppHydrationCompletedWithTimeout called with nil pendingApp") - return false - } - if dw.hydrator == nil { - glog.V(3).Info("DataWatcher: Hydrator is nil, cannot check hydration completion") - return false - } - - // Create a channel to receive the result - resultChan := make(chan bool, 1) - - // Run hydration check in a goroutine with timeout - go func() { - defer func() { - if r := recover(); r != nil { - glog.Errorf("DataWatcher: Panic in hydration check: %v", r) - resultChan <- false - } - }() - - result := dw.hydrator.isAppHydrationComplete(pendingApp) - resultChan <- result - }() - - // Wait for result or timeout - select { - case result := <-resultChan: - return result - case <-ctx.Done(): - appID := dw.getAppID(pendingApp) - glog.V(3).Infof("DataWatcher: Timeout checking hydration completion for app=%s", appID) - return false - } -} - // getAppID extracts app ID from pending app data func (dw *DataWatcher) getAppID(pendingApp *types.AppInfoLatestPendingData) string { if pendingApp == nil { @@ -486,36 +174,6 @@ func (dw *DataWatcher) getAppName(pendingApp *types.AppInfoLatestPendingData) st return "unknown" } -// getAppNameFromLatest extracts app name from latest app data for deduplication -func (dw *DataWatcher) getAppNameFromLatest(latestApp *types.AppInfoLatestData) string { - if latestApp == nil { - return "unknown" - } - - // Try to get name from RawData first - if latestApp.RawData != nil { - if latestApp.RawData.Name != "" { - return latestApp.RawData.Name - } - } - - // Try to get name from AppInfo - if latestApp.AppInfo != nil && latestApp.AppInfo.AppEntry != nil { - if latestApp.AppInfo.AppEntry.Name != "" { - return latestApp.AppInfo.AppEntry.Name - } - } - - // Try to get name from AppSimpleInfo - if latestApp.AppSimpleInfo != nil { - if latestApp.AppSimpleInfo.AppName != "" { - return latestApp.AppSimpleInfo.AppName - } - } - - return "unknown" -} - // convertPendingToLatest converts AppInfoLatestPendingData to AppInfoLatestData func (dw *DataWatcher) convertPendingToLatest(pendingApp *types.AppInfoLatestPendingData) *types.AppInfoLatestData { if pendingApp == nil { @@ -611,17 +269,6 @@ type DataWatcherMetrics struct { Interval time.Duration `json:"interval"` } -// SetInterval sets the monitoring interval -func (dw *DataWatcher) SetInterval(interval time.Duration) { - if interval < time.Second { - interval = time.Second // Minimum 1 second - } - - // Use atomic operation for thread safety since interval can be modified at runtime - atomic.StoreInt64((*int64)(&dw.interval), int64(interval)) - glog.V(3).Info("DataWatcher interval set to: %v", interval) -} - // createAppSimpleInfo creates an AppSimpleInfo from pending app data func (dw *DataWatcher) createAppSimpleInfo(pendingApp *types.AppInfoLatestPendingData) *types.AppSimpleInfo { if pendingApp == nil { @@ -816,23 +463,6 @@ func (dw *DataWatcher) ensureAppSimpleInfoFields(appSimpleInfo *types.AppSimpleI } } -// ForceCalculateUserHash forces hash calculation for a user regardless of app movement -// not used -func (dw *DataWatcher) ForceCalculateUserHash(userID string) error { - return nil - // glog.Infof("DataWatcher: Force calculating hash for user %s", userID) - - // // Get user data from cache manager - // userData := dw.cacheManager.GetUserData(userID) - // if userData == nil { - // return fmt.Errorf("user data not found for user %s", userID) - // } - - // // Call hash calculation directly - // dw.calculateAndSetUserHashWithRetry(userID, userData) - // return nil -} - // MarkUserDirty marks a user as needing hash recalculation. // Called by event-driven paths (e.g. DataWatcherState) that modify user data // outside the Pipeline cycle. The dirty users will be picked up by Pipeline Phase 5. diff --git a/internal/v2/appinfo/datawatcher_repo.go b/internal/v2/appinfo/datawatcher_repo.go index ad95a43..776d659 100644 --- a/internal/v2/appinfo/datawatcher_repo.go +++ b/internal/v2/appinfo/datawatcher_repo.go @@ -121,30 +121,31 @@ func (dwr *DataWatcherRepo) initializeLastProcessedID() error { } // Start begins the periodic state checking process -func (dwr *DataWatcherRepo) Start() error { - if dwr.isRunning { - return fmt.Errorf("data watcher is already running") - } +// func (dwr *DataWatcherRepo) Start() error { +// if dwr.isRunning { +// return fmt.Errorf("data watcher is already running") +// } - // Create ticker for 2-minute intervals - dwr.ticker = time.NewTicker(2 * time.Minute) - dwr.isRunning = true +// // Create ticker for 2-minute intervals +// dwr.ticker = time.NewTicker(2 * time.Minute) +// dwr.isRunning = true - glog.V(3).Info("Starting data watcher with 2-minute intervals") +// glog.V(3).Info("Starting data watcher with 2-minute intervals") - // Start the monitoring goroutine - go dwr.monitorStateChanges() // not used +// // Start the monitoring goroutine +// go dwr.monitorStateChanges() - return nil -} +// return nil +// } // StartWithOptions starts with options, if enablePolling is false, the periodic polling is not started -func (dwr *DataWatcherRepo) StartWithOptions(enablePolling bool) error { +func (dwr *DataWatcherRepo) StartWithOptions() error { if dwr.isRunning { return fmt.Errorf("DataWatcherRepo is already running") } dwr.isRunning = true + glog.V(3).Info("Starting DataWatcherRepo in passive mode (serial pipeline handles processing)") return nil @@ -184,28 +185,6 @@ func (dwr *DataWatcherRepo) IsRunning() bool { return dwr.isRunning } -// monitorStateChanges runs the main monitoring loop -func (dwr *DataWatcherRepo) monitorStateChanges() { - glog.V(3).Info("State change monitoring started") - - // Process immediately on start - if err := dwr.processStateChanges(); err != nil { // not used - glog.Errorf("Error processing state changes on startup: %v", err) - } - - for { - select { - case <-dwr.ticker.C: - if err := dwr.processStateChanges(); err != nil { // not used - glog.Errorf("Error processing state changes: %v", err) - } - case <-dwr.stopChannel: - glog.V(3).Info("State change monitoring stopped") - return - } - } -} - // processStateChanges fetches and processes new state changes func (dwr *DataWatcherRepo) processStateChanges() map[string]bool { glog.V(2).Infof("Processing state changes after ID: %d", dwr.lastProcessedID) diff --git a/internal/v2/appinfo/datawatcher_state.go b/internal/v2/appinfo/datawatcher_state.go index a034e9c..1b30f5a 100644 --- a/internal/v2/appinfo/datawatcher_state.go +++ b/internal/v2/appinfo/datawatcher_state.go @@ -545,6 +545,19 @@ func (dw *DataWatcherState) handleMessage(msg *nats.Msg) { // + shouldUpdate := true checker := func(appState *AppStateLatestData) { + // /** + // * [Mandatory Sync Whitelist] + // * The cases below define critical state transition scenarios that must be processed. + // * + // * Background: + // * When a user performs an action in the UI (e.g., canceling installation/download) or when an app lifecycle event completes (e.g., installation finished, uninstallation finished), + // * the final state pushed by NATS (appStateMsg.State) may differ from the cached state in memory (appState.Status.State). + // * + // * Purpose: + // * Even if the progress (Progress) has not changed and there is no entrance information (EntranceStatuses), + // * as long as the following conditions are met, we must bypass the "deduplication check" in the default branch. + // * This forces the local state to update and pushes the change to the frontend, ensuring the UI promptly reflects the final result. + // */ switch { // NATS State APP State case appStateMsg.State == "running" && appState.Status.State == "installing": @@ -599,74 +612,6 @@ func (dw *DataWatcherState) handleMessage(msg *nats.Msg) { // + return } - // userData := dw.cacheManager.getUserData(appStateMsg.User) - // if userData == nil { - // glog.V(2).Infof("User data not found for user %s", appStateMsg.User) - // return - // } - - // for sourceId, sourceData := range userData.Sources { - // if appStateMsg.MarketSource != "" && sourceId != appStateMsg.MarketSource { - // continue - // } - // for _, appState := range sourceData.AppStateLatest { - // if appState.Status.Name == appStateMsg.Name { // && appState.Status.State == appStateMsg.State - - // /** - // * [Mandatory Sync Whitelist] - // * The cases below define critical state transition scenarios that must be processed. - // * - // * Background: - // * When a user performs an action in the UI (e.g., canceling installation/download) or when an app lifecycle event completes (e.g., installation finished, uninstallation finished), - // * the final state pushed by NATS (appStateMsg.State) may differ from the cached state in memory (appState.Status.State). - // * - // * Purpose: - // * Even if the progress (Progress) has not changed and there is no entrance information (EntranceStatuses), - // * as long as the following conditions are met, we must bypass the "deduplication check" in the default branch. - // * This forces the local state to update and pushes the change to the frontend, ensuring the UI promptly reflects the final result. - // */ - // switch { - // // NATS State APP State - // case appStateMsg.State == "running" && appState.Status.State == "installing": - // case appStateMsg.State == "running" && appState.Status.State == "initializing": - // case appStateMsg.State == "uninstalled" && appState.Status.State == "running": - // case appStateMsg.State == "uninstalled" && appState.Status.State == "stopped": - // case appStateMsg.State == "uninstalled" && appState.Status.State == "uninstalling": - // case appStateMsg.State == "uninstalled" && appState.Status.State == "installingCanceling": - // case appStateMsg.State == "uninstalled" && appState.Status.State == "installingCancelFailed": - // case appStateMsg.State == "pendingCanceled" && appState.Status.State == "pending": - // case appStateMsg.State == "downloadingCanceled" && appState.Status.State == "downloadingCanceling": - // case appStateMsg.State == "downloadingCanceled" && appState.Status.State == "pending": - // case appStateMsg.State == "installingCanceled" && appState.Status.State == "installing": - // case appStateMsg.State == "installingCanceled" && appState.Status.State == "installingCanceling": - // default: - // if len(appStateMsg.EntranceStatuses) == 0 && appState.Status.Progress == appStateMsg.Progress { - // glog.V(2).Infof("App state message is the same as the cached app state message for app %s, user %s, source %s, appState: %s, msgState: %s", - // appStateMsg.Name, appStateMsg.User, appStateMsg.OpID, appState.Status.State, appStateMsg.State) - // return - // } - // } - - // // Compare timestamps properly by parsing them - // if appState.Status.StatusTime != "" && appStateMsg.CreateTime != "" { - // statusTime, err1 := time.Parse("2006-01-02T15:04:05.000000000Z", appState.Status.StatusTime) - // createTime, err2 := time.Parse("2006-01-02T15:04:05.000000000Z", appStateMsg.CreateTime) - - // if err1 == nil && err2 == nil { - // if statusTime.After(createTime) { - // glog.V(2).Infof("Cached app state is newer than incoming message for app %s, user %s, source %s, appTime: %s, msgTime: %s. Skipping update.", - // appStateMsg.Name, appStateMsg.User, appStateMsg.OpID, statusTime.String(), createTime.String()) - // return - // } - // } else { - // glog.Errorf("Failed to parse timestamps for comparison: StatusTime=%s, CreateTime=%s, err1=%v, err2=%v", - // appState.Status.StatusTime, appStateMsg.CreateTime, err1, err2) - // } - // } - // } - // } - // } - // Process the message glog.V(2).Infof("State - Processs update message from NATS subject %s, for internal for opID: %s, app: %s, user: %s, msgState: %s", msg.Subject, appStateMsg.OpID, appStateMsg.Name, appStateMsg.User, appStateMsg.State) diff --git a/internal/v2/appinfo/db.go b/internal/v2/appinfo/db.go index 453e4e9..83cf79a 100644 --- a/internal/v2/appinfo/db.go +++ b/internal/v2/appinfo/db.go @@ -407,6 +407,9 @@ func (r *RedisClient) prepareBatchSourceDataSave(userID, sourceID string, source appInfoLatestPending = sourceData.AppInfoLatestPending } + glog.Infof("[DB] BatchSourceDataSave, history: %d, state: %d, latest: %d, pending: %d", + len(appInfoHistory), len(appStateLatest), len(appInfoLatest), len(appInfoLatestPending)) + baseKey := fmt.Sprintf("appinfo:user:%s:source:%s", userID, sourceID) // Prepare JSON serialization outside of locks @@ -458,7 +461,7 @@ func (r *RedisClient) prepareBatchSourceDataSave(userID, sourceID string, source } } - glog.V(3).Infof("Prepared batch Redis operations for user=%s, source=%s", userID, sourceID) + // glog.V(2).Infof("[DB] Prepared batch Redis operations for user=%s, source=%s", userID, sourceID) return nil } @@ -514,7 +517,7 @@ func (r *RedisClient) SaveSourceDataToRedis(userID, sourceID string, sourceData if failedOps > 0 { glog.Warningf("Redis pipeline completed with %d failed operations out of %d total for user=%s, source=%s", failedOps, len(results), userID, sourceID) } else { - glog.V(4).Infof("Successfully saved source data to Redis for user=%s, source=%s in %v (%d operations)", userID, sourceID, duration, len(results)) + glog.V(2).Infof("Successfully saved source data to Redis for user=%s, source=%s in %v (%d operations)", userID, sourceID, duration, len(results)) } return nil diff --git a/internal/v2/appinfo/diagnostic.go b/internal/v2/appinfo/diagnostic.go index 66f0000..b66bd72 100644 --- a/internal/v2/appinfo/diagnostic.go +++ b/internal/v2/appinfo/diagnostic.go @@ -1,8 +1,6 @@ package appinfo import ( - "encoding/json" - "fmt" "strings" "github.com/golang/glog" @@ -91,45 +89,6 @@ func (cm *CacheManager) DiagnoseCacheAndRedis() error { return nil } -// PrintDiagnosticInfo prints diagnostic information in a readable format -func (cm *CacheManager) PrintDiagnosticInfo() error { - err := cm.DiagnoseCacheAndRedis() - if err != nil { - return err - } - - glog.Infof("=== CACHE AND REDIS DIAGNOSTIC REPORT ===") - glog.Infof("Diagnostic completed successfully") - return nil -} - -// GetDiagnosticJSON returns diagnostic information as JSON -func (cm *CacheManager) GetDiagnosticJSON() (string, error) { - err := cm.DiagnoseCacheAndRedis() - if err != nil { - return "", err - } - - // Get cache stats and users data for JSON response - cacheStats := cm.GetCacheStats() // not used - allUsersData := cm.GetAllUsersData() // not used - - diagnosticInfo := map[string]interface{}{ - "cache_stats": cacheStats, - "users_data": allUsersData, - "total_users": len(allUsersData), - "total_sources": cacheStats["total_sources"], - "is_running": cacheStats["is_running"], - } - - jsonData, err := json.MarshalIndent(diagnosticInfo, "", " ") - if err != nil { - return "", err - } - - return string(jsonData), nil -} - // ForceReloadFromRedis forces a complete reload of cache data from Redis func (cm *CacheManager) ForceReloadFromRedis() error { glog.Infof("Force reloading cache data from Redis") @@ -147,52 +106,3 @@ func (cm *CacheManager) ForceReloadFromRedis() error { glog.Infof("Successfully reloaded cache data from Redis") return nil } - -// ValidateSourceData validates source data integrity -func (cm *CacheManager) ValidateSourceData(userID, sourceID string) (*SourceAnalysis, error) { - cm.mutex.RLock() - defer cm.mutex.RUnlock() - - userData, exists := cm.cache.Users[userID] - if !exists { - return nil, fmt.Errorf("user %s not found in cache", userID) - } - - sourceData, exists := userData.Sources[sourceID] - if !exists { - return nil, fmt.Errorf("source %s not found for user %s", sourceID, userID) - } - - analysis := &SourceAnalysis{ - SourceID: sourceID, - HasAppInfoLatest: len(sourceData.AppInfoLatest) > 0, - HasAppInfoLatestPending: len(sourceData.AppInfoLatestPending) > 0, - HasAppStateLatest: len(sourceData.AppStateLatest) > 0, - HasAppInfoHistory: len(sourceData.AppInfoHistory) > 0, - AppInfoLatestCount: len(sourceData.AppInfoLatest), - AppInfoPendingCount: len(sourceData.AppInfoLatestPending), - AppStateLatestCount: len(sourceData.AppStateLatest), - AppInfoHistoryCount: len(sourceData.AppInfoHistory), - Issues: make([]string, 0), - } - - // Validate pending data - for i, pendingData := range sourceData.AppInfoLatestPending { - if pendingData == nil { - analysis.Issues = append(analysis.Issues, fmt.Sprintf("Pending data at index %d is nil", i)) - continue - } - - if pendingData.RawData == nil { - analysis.Issues = append(analysis.Issues, fmt.Sprintf("Pending data at index %d has nil RawData", i)) - continue - } - - // Check for valid identifiers - if pendingData.RawData.ID == "" && pendingData.RawData.AppID == "" && pendingData.RawData.Name == "" { - analysis.Issues = append(analysis.Issues, fmt.Sprintf("Pending data at index %d has no valid identifiers", i)) - } - } - - return analysis, nil -} diff --git a/internal/v2/appinfo/hydration.go b/internal/v2/appinfo/hydration.go index 93c5a3d..f240279 100644 --- a/internal/v2/appinfo/hydration.go +++ b/internal/v2/appinfo/hydration.go @@ -162,7 +162,7 @@ func (h *Hydrator) AddStep(step hydrationfn.HydrationStep) { } // Start begins the hydration process in passive mode (Pipeline handles scheduling) -func (h *Hydrator) Start(ctx context.Context) error { +func (h *Hydrator) StartWithOptions(ctx context.Context) error { if h.isRunning.Load() { return fmt.Errorf("hydrator is already running") } @@ -170,12 +170,6 @@ func (h *Hydrator) Start(ctx context.Context) error { glog.V(3).Infof("Starting hydrator with %d steps (passive mode, Pipeline handles scheduling)", len(h.steps)) - go h.batchCompletionProcessor(ctx) - - if h.cacheManager != nil { - go h.databaseSyncMonitor(ctx) - } - return nil } diff --git a/internal/v2/appinfo/hydrationfn/step_interface.go b/internal/v2/appinfo/hydrationfn/step_interface.go index 49ef5a6..f247ce7 100644 --- a/internal/v2/appinfo/hydrationfn/step_interface.go +++ b/internal/v2/appinfo/hydrationfn/step_interface.go @@ -291,26 +291,3 @@ func (ht *HydrationTask) SetError(err error) { ht.RetryCount++ ht.UpdatedAt = time.Now() } - -// CanRetry returns true if the task can be retried -func (ht *HydrationTask) CanRetry() bool { - ht.mutex.RLock() - defer ht.mutex.RUnlock() - return ht.RetryCount < ht.MaxRetries -} - -// ResetForRetry resets the task for retry -func (ht *HydrationTask) ResetForRetry() { - ht.mutex.Lock() - defer ht.mutex.Unlock() - ht.Status = TaskStatusPending - ht.CurrentStep = 0 - ht.UpdatedAt = time.Now() -} - -// IsCompleted returns true if the task is completed -func (ht *HydrationTask) IsCompleted() bool { - ht.mutex.RLock() - defer ht.mutex.RUnlock() - return ht.Status == TaskStatusCompleted || ht.Status == TaskStatusFailed || ht.Status == TaskStatusCancelled -} diff --git a/internal/v2/appinfo/localrepo.go b/internal/v2/appinfo/localrepo.go index 8e5bfb1..c19e19e 100644 --- a/internal/v2/appinfo/localrepo.go +++ b/internal/v2/appinfo/localrepo.go @@ -369,7 +369,7 @@ func (lr *LocalRepo) DeleteApp(userID, appName, appVersion, sourceID string, tok } glog.V(3).Infof("App %s installation check passed: found=%v, completed=%v, taskType=%s, source=%s", appName, found, completed, taskType, source) } else { - glog.V(3).Infof("Task module not available, skipping installation status check for app: %s", appName) + glog.V(2).Infof("Task module not available, skipping installation status check for app: %s", appName) } // Get chart repo service host from environment variable diff --git a/internal/v2/appinfo/status_correction_check.go b/internal/v2/appinfo/status_correction_check.go index 5efb2f6..e6722e5 100644 --- a/internal/v2/appinfo/status_correction_check.go +++ b/internal/v2/appinfo/status_correction_check.go @@ -76,46 +76,15 @@ func NewStatusCorrectionChecker(cacheManager *CacheManager) *StatusCorrectionChe } } -// Start begins the periodic status checking -func (scc *StatusCorrectionChecker) Start() error { - if scc.isRunning { - return fmt.Errorf("status correction checker is already running") - } - - if scc.cacheManager == nil { - return fmt.Errorf("cache manager is required") - } - - scc.isRunning = true - scc.lastCheckTime = time.Time{} // Zero time indicates no checks yet - scc.checkCount = 0 - scc.correctionCount = 0 - scc.stopChan = make(chan struct{}) // Recreate stopChan for each start - - glog.Infof("Starting status correction checker with interval: %v", scc.checkInterval) - glog.Infof("App service endpoint: http://%s:%s/app-service/v1/all/apps", scc.appServiceHost, scc.appServicePort) - glog.Infof("Middleware service endpoint: http://%s:%s/app-service/v1/middlewares/status", scc.appServiceHost, scc.appServicePort) - - // Start the periodic checking goroutine - go scc.runPeriodicCheck() // not used - - return nil -} - // StartWithOptions starts with options -func (scc *StatusCorrectionChecker) StartWithOptions(enablePeriodicCheck bool) error { +func (scc *StatusCorrectionChecker) StartWithOptions() error { if scc.isRunning { return fmt.Errorf("status correction checker is already running") } scc.isRunning = true - if enablePeriodicCheck { - glog.Infof("Starting status correction checker with interval: %v", scc.checkInterval) - go scc.runPeriodicCheck() // not use - } else { - glog.Infof("Starting status correction checker in passive mode (serial pipeline handles processing)") - } + glog.Infof("Starting status correction checker in passive mode (serial pipeline handles processing)") return nil } @@ -163,27 +132,6 @@ func (scc *StatusCorrectionChecker) GetStats() map[string]interface{} { } } -// runPeriodicCheck runs the periodic status checking loop -func (scc *StatusCorrectionChecker) runPeriodicCheck() { - ticker := time.NewTicker(scc.checkInterval) - defer ticker.Stop() - - glog.Infof("Status correction checker periodic loop started") - - // Perform initial check immediately - scc.performStatusCheck() // not use - - for { - select { - case <-ticker.C: - scc.performStatusCheck() // not use - case <-scc.stopChan: - glog.Infof("Status correction checker periodic loop stopped") - return - } - } -} - // performStatusCheck performs a single status check cycle func (scc *StatusCorrectionChecker) performStatusCheck() map[string]bool { startTime := time.Now() @@ -1269,47 +1217,6 @@ func (scc *StatusCorrectionChecker) createStateDataFromAppStateData(appStateData return stateData } -// ForceCheck performs an immediate status check -func (scc *StatusCorrectionChecker) ForceCheck() error { - if !scc.IsRunning() { - return fmt.Errorf("status correction checker is not running") - } - - glog.Infof("Forcing immediate status check") - scc.performStatusCheck() // not used - return nil -} - -// SetCheckInterval sets the check interval -func (scc *StatusCorrectionChecker) SetCheckInterval(interval time.Duration) { - scc.checkInterval = interval - glog.Infof("Status correction check interval updated to: %v", interval) -} - -// isStateInconsistent checks if the app state is inconsistent with the entrance statuses -// Returns true if all entrances are running but the app state is not running -func (scc *StatusCorrectionChecker) isStateInconsistent(app utils.AppServiceResponse) bool { - // If app state is already running, no inconsistency - if app.Status.State == "running" { - return false - } - - // If no entrances, no inconsistency - if len(app.Status.EntranceStatuses) == 0 { - return false - } - - // Check if all entrances are running - for _, entrance := range app.Status.EntranceStatuses { - if entrance.State != "running" { - return false - } - } - - // All entrances are running but app state is not running - this is inconsistent - return true -} - // SetHistoryModule sets the history module for status correction checker func (scc *StatusCorrectionChecker) SetHistoryModule(hm *history.HistoryModule) { scc.historyModule = hm diff --git a/internal/v2/appinfo/syncer.go b/internal/v2/appinfo/syncer.go index 2cd62e5..3f10613 100644 --- a/internal/v2/appinfo/syncer.go +++ b/internal/v2/appinfo/syncer.go @@ -99,25 +99,17 @@ func (s *Syncer) GetSteps() []syncerfn.SyncStep { return steps } -// Start begins the synchronization process with its own sync loop -func (s *Syncer) Start(ctx context.Context) error { - return s.StartWithOptions(ctx, true) -} - // StartWithOptions starts the syncer with options. // If enableSyncLoop is false, the periodic sync loop is not started (Pipeline handles scheduling). -func (s *Syncer) StartWithOptions(ctx context.Context, enableSyncLoop bool) error { +func (s *Syncer) StartWithOptions(ctx context.Context) error { if s.isRunning.Load() { return fmt.Errorf("syncer is already running") } + s.isRunning.Store(true) - if enableSyncLoop { - glog.V(2).Infof("Starting syncer with %d steps, sync interval: %v", len(s.steps), s.syncInterval) - go s.syncLoop(ctx) // not use - } else { - glog.V(2).Infof("Starting syncer with %d steps (passive mode, Pipeline handles scheduling)", len(s.steps)) - } + glog.V(2).Infof("Starting syncer with %d steps (passive mode, Pipeline handles scheduling)", len(s.steps)) + return nil } @@ -273,39 +265,6 @@ func (s *Syncer) IsRunning() bool { return s.isRunning.Load() } -// syncLoop runs the main synchronization loop -func (s *Syncer) syncLoop(ctx context.Context) { - defer func() { - s.isRunning.Store(false) - glog.V(4).Info("Syncer stopped") - }() - - for { - select { - case <-ctx.Done(): - glog.V(4).Info("Context cancelled, stopping syncer") - return - case <-s.stopChan: - glog.V(4).Info("Stop signal received, stopping syncer") - return - default: - // Execute sync cycle - if err := s.executeSyncCycle(ctx); err != nil { // not use - glog.Errorf("Sync cycle failed: %v", err) - } - - // Wait for next cycle or stop signal - select { - case <-ctx.Done(): - return - case <-s.stopChan: - return - case <-time.After(s.syncInterval): - // Continue to next cycle - } - } - } -} // getVersionForSync returns the version to use for sync operations with fallback func getVersionForSync() string { @@ -360,6 +319,9 @@ func (s *Syncer) executeSyncCycle(ctx context.Context) error { return nil } + // clears all AppRenderFailed data for all users and sources + s.cacheManager.Load().ClearAppRenderFailedData() + glog.V(3).Infof("Found %d configured market sources (%d remote, %d local)", len(config.Sources), len(remoteSources), len(config.Sources)-len(remoteSources)) // Process all remote sources in priority order - each source is synced independently @@ -955,13 +917,6 @@ type SyncerConfig struct { SyncInterval time.Duration `json:"sync_interval"` } -// DefaultSyncerConfig returns a default configuration -func DefaultSyncerConfig() SyncerConfig { - return SyncerConfig{ - SyncInterval: 5 * time.Minute, - } -} - // SetCacheManager sets the cache manager for hydration notifications func (s *Syncer) SetCacheManager(cacheManager *CacheManager) { s.cacheManager.Store(cacheManager) @@ -1089,13 +1044,6 @@ func (s *Syncer) GetMetrics() SyncerMetrics { } } -// RecordSyncDetails records detailed information about a sync operation -func (s *Syncer) RecordSyncDetails(details *SyncDetails) { - if details != nil { - s.lastSyncDetails.Store(details) - } -} - // IsHealthy returns whether the syncer is healthy func (s *Syncer) IsHealthy() bool { if !s.isRunning.Load() { diff --git a/internal/v2/appinfo/syncerfn/detail_fetch_step.go b/internal/v2/appinfo/syncerfn/detail_fetch_step.go index d3b8425..bb5e3b8 100644 --- a/internal/v2/appinfo/syncerfn/detail_fetch_step.go +++ b/internal/v2/appinfo/syncerfn/detail_fetch_step.go @@ -799,44 +799,6 @@ func (d *DetailFetchStep) mergeAppData(originalAppData interface{}, appInfoMap m return detailedAppData } -// preserveFieldsForDelistedApp preserves fields from original data if detail API returns empty/null values -// This ensures installed delisted apps retain all their information -func (d *DetailFetchStep) preserveFieldsForDelistedApp(originalMap, detailMap map[string]interface{}, appID string) { - // List of fields to preserve if detail API returns empty/null - fieldsToPreserve := []string{ - "icon", "description", "title", "developer", "promoteImage", "promoteVideo", - "subCategory", "locale", "requiredMemory", "requiredDisk", "supportClient", - "requiredGPU", "requiredCPU", "rating", "target", "permission", "entrances", - "middleware", "options", "submitter", "doc", "website", "featuredImage", - "sourceCode", "license", "legal", "i18n", "modelSize", "namespace", - "onlyAdmin", "lastCommitHash", "createTime", "updateTime", "count", - "variants", "screenshots", "tags", "metadata", "updated_at", "versionHistory", - "fullDescription", "upgradeDescription", "cfgType", "chartName", "versionName", - } - - for _, field := range fieldsToPreserve { - detailValue := detailMap[field] - originalValue, hasOriginal := originalMap[field] - - // Check if detail API returned empty/null value - shouldPreserve := false - if detailValue == nil { - shouldPreserve = true - } else if strValue, ok := detailValue.(string); ok && strValue == "" { - shouldPreserve = true - } else if sliceValue, ok := detailValue.([]interface{}); ok && len(sliceValue) == 0 { - shouldPreserve = true - } else if mapValue, ok := detailValue.(map[string]interface{}); ok && len(mapValue) == 0 { - shouldPreserve = true - } - - // Preserve original value if detail API returned empty/null and original has value - if shouldPreserve && hasOriginal && originalValue != nil { - originalMap[field] = originalValue - } - } -} - // isAppInstalled determines whether the given app is currently installed for the active source. func (d *DetailFetchStep) isAppInstalled(appName, sourceID string, data *SyncContext) bool { if appName == "" || sourceID == "" || data == nil || data.CacheManager == nil { diff --git a/internal/v2/appinfo/syncerfn/hash_comparison_step.go b/internal/v2/appinfo/syncerfn/hash_comparison_step.go index 5b98268..8288ff5 100644 --- a/internal/v2/appinfo/syncerfn/hash_comparison_step.go +++ b/internal/v2/appinfo/syncerfn/hash_comparison_step.go @@ -7,7 +7,6 @@ import ( "strings" "market/internal/v2/settings" - "market/internal/v2/types" "github.com/golang/glog" ) @@ -118,57 +117,3 @@ func (h *HashComparisonStep) Execute(ctx context.Context, data *SyncContext) err func (h *HashComparisonStep) CanSkip(ctx context.Context, data *SyncContext) bool { return false // Always execute hash comparison } - -// calculateLocalHash computes hash from local SourceData Others.Hash for specific market source -func (h *HashComparisonStep) calculateLocalHash(cache *types.CacheData, marketSource *settings.MarketSource) string { - if cache == nil { - glog.V(3).Infof("Cache is nil, returning empty_cache hash") - return "empty_cache" - } - - if marketSource == nil { - glog.V(3).Infof("MarketSource is nil, returning no_market_source hash") - return "no_market_source" - } - - // Use market source name as source ID to match syncer.go behavior - sourceID := marketSource.ID - - // Note: This function is called from SyncContext with proper locking - - // If no users exist, return empty hash - if len(cache.Users) == 0 { - glog.V(3).Infof("No users in cache, returning empty hash") - return "empty_cache_no_users" - } - - // Look for Others.Hash only in the current market source - var sourceHash string - var foundValidHash bool - - for userID, userData := range cache.Users { - // Check if this user has data for the specific source - if sourceData, exists := userData.Sources[sourceID]; exists { - // Check if Others exists and has a Hash - if sourceData.Others != nil && sourceData.Others.Hash != "" { - sourceHash = sourceData.Others.Hash - foundValidHash = true - glog.V(3).Infof("Found Others.Hash for user:%s source:%s hash:%s", userID, sourceID, sourceHash) - break // Use the first valid hash found - } else { - glog.V(3).Infof("No valid Others.Hash for user:%s source:%s (Others: %v)", userID, sourceID, sourceData.Others) - } - } else { - glog.V(3).Infof("No data found for user:%s source:%s", userID, sourceID) - } - } - - // If no valid Others.Hash found for the specific source, return appropriate hash - if !foundValidHash { - glog.V(3).Infof("No valid Others.Hash found for source:%s, returning no_source_hash", sourceID) - return "no_source_hash" - } - - glog.V(3).Infof("Using Others.Hash from source:%s as local hash: %s", sourceID, sourceHash) - return sourceHash -} diff --git a/internal/v2/paymentnew/api.go b/internal/v2/paymentnew/api.go index abf126c..922bbec 100644 --- a/internal/v2/paymentnew/api.go +++ b/internal/v2/paymentnew/api.go @@ -1094,7 +1094,7 @@ func PreprocessAppPaymentData(ctx context.Context, appInfo *types.AppInfo, userI return nil, fmt.Errorf("state machine not initialized") } var state *PaymentState - if s, err := globalStateMachine.LoadState(userID, appID, productID); err == nil { + if s, err := globalStateMachine.LoadState(userID, appID, productID); err == nil { // + PreprocessAppPaymentData state = s } diff --git a/internal/v2/utils/image.go b/internal/v2/utils/image.go deleted file mode 100644 index 037de95..0000000 --- a/internal/v2/utils/image.go +++ /dev/null @@ -1,801 +0,0 @@ -package utils - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "net/http" - "os" - "os/exec" - "path/filepath" - "regexp" - "strconv" - "strings" - "time" - - "github.com/golang/glog" -) - -// DockerNodeInfo contains information about a specific node and its layers -type DockerNodeInfo struct { - NodeName string `json:"node_name"` - Architecture string `json:"architecture,omitempty"` - Variant string `json:"variant,omitempty"` - OS string `json:"os,omitempty"` - Layers []LayerInfo `json:"layers"` - TotalSize int64 `json:"total_size"` - LayerCount int `json:"layer_count"` -} - -// DockerImageInfo contains detailed information about a Docker image -type DockerImageInfo struct { - Name string `json:"name"` - Tag string `json:"tag"` - Architecture string `json:"architecture"` - Nodes []DockerNodeInfo `json:"nodes"` - Manifest *ImageManifest `json:"manifest,omitempty"` - Config *ImageConfig `json:"config,omitempty"` - TotalSize int64 `json:"total_size"` - CreatedAt time.Time `json:"created_at"` -} - -// LayerInfo contains information about a specific layer -type LayerInfo struct { - Digest string `json:"digest"` - Size int64 `json:"size"` - MediaType string `json:"media_type"` - Offset int64 `json:"offset,omitempty"` - Downloaded bool `json:"downloaded"` - Progress int `json:"progress"` // 0-100 - LocalPath string `json:"local_path,omitempty"` -} - -// ImageManifest represents Docker image manifest -type ImageManifest struct { - SchemaVersion int `json:"schemaVersion"` - MediaType string `json:"mediaType"` - Config ManifestConfig `json:"config"` - Layers []ManifestLayer `json:"layers"` -} - -// ManifestConfig represents the config section in manifest -type ManifestConfig struct { - MediaType string `json:"mediaType"` - Size int64 `json:"size"` - Digest string `json:"digest"` -} - -// ManifestLayer represents a layer in the manifest -type ManifestLayer struct { - MediaType string `json:"mediaType"` - Size int64 `json:"size"` - Digest string `json:"digest"` -} - -// ImageConfig represents Docker image configuration -type ImageConfig struct { - Architecture string `json:"architecture"` - OS string `json:"os"` - Config map[string]interface{} `json:"config"` - RootFS RootFS `json:"rootfs"` - History []HistoryEntry `json:"history"` - Created time.Time `json:"created"` -} - -// RootFS represents the root filesystem info -type RootFS struct { - Type string `json:"type"` - DiffIDs []string `json:"diff_ids"` -} - -// HistoryEntry represents a history entry in the image config -type HistoryEntry struct { - Created time.Time `json:"created"` - CreatedBy string `json:"created_by"` - EmptyLayer bool `json:"empty_layer,omitempty"` - Comment string `json:"comment,omitempty"` -} - -// ContainerRuntime represents the detected container runtime -type ContainerRuntime string - -const ( - RuntimeK8s ContainerRuntime = "k8s" - RuntimeK3s ContainerRuntime = "k3s" - RuntimeDocker ContainerRuntime = "docker" - RuntimeUnknown ContainerRuntime = "unknown" -) - -// ImageInfoResponse represents the response from the image info API -type ImageInfoResponse struct { - Images []ImageInfo `json:"images"` - Name string `json:"name"` -} - -// ImageInfo represents a single image info from the API -type ImageInfo struct { - Node string `json:"node"` - Name string `json:"name"` - Architecture string `json:"architecture"` - Variant string `json:"variant"` - OS string `json:"os"` - LayersData []LayerData `json:"layersData"` -} - -// LayerData represents layer information from the API -type LayerData struct { - MediaType string `json:"mediaType"` - Digest string `json:"digest"` - Offset int64 `json:"offset"` - Size int64 `json:"size"` -} - -// GetDockerImageInfo retrieves detailed information about a Docker image from registry -func GetDockerImageInfo(imageName, appName string) (*DockerImageInfo, error) { - glog.Infof("Getting Docker image info for: %s", imageName) - - // Check if it's development environment - if IsDevelopmentEnvironment() { - glog.Infof("Development environment detected, using direct registry access") - return getDockerImageInfoFromRegistry(imageName) - } - - // Production environment - use API - glog.Infof("Production environment detected, using API access") - return getDockerImageInfoFromAPI(imageName, appName) -} - -// getDockerImageInfoFromRegistry gets image info directly from registry -func getDockerImageInfoFromRegistry(imageName string) (*DockerImageInfo, error) { - // Parse image name and tag - name, tag := parseImageNameAndTag(imageName) - - // Get image manifest - manifest, err := getImageManifest(name, tag) - if err != nil { - glog.Errorf("Failed to get image manifest: %v", err) - return nil, fmt.Errorf("failed to get image manifest: %w", err) - } - - // Get image config - config, err := getImageConfig(name, manifest.Config.Digest) - if err != nil { - glog.Warningf("Failed to get image config: %v", err) - // Continue without config as it's not critical - } - - // Build layer information for a single node (registry access) - layers := make([]LayerInfo, len(manifest.Layers)) - var totalSize int64 - - for i, layer := range manifest.Layers { - layers[i] = LayerInfo{ - Digest: layer.Digest, - Size: layer.Size, - MediaType: layer.MediaType, - } - totalSize += layer.Size - } - - // Create a single node for registry access - nodeInfo := DockerNodeInfo{ - NodeName: "registry", - Layers: layers, - TotalSize: totalSize, - LayerCount: len(layers), - } - - if config != nil { - nodeInfo.Architecture = config.Architecture - } - - imageInfo := &DockerImageInfo{ - Name: name, - Tag: tag, - Nodes: []DockerNodeInfo{nodeInfo}, - Manifest: manifest, - Config: config, - TotalSize: totalSize, - CreatedAt: time.Now(), - } - - if config != nil { - imageInfo.Architecture = config.Architecture - imageInfo.CreatedAt = config.Created - } - - return imageInfo, nil -} - -// getDockerImageInfoFromAPI gets image info from the app service API -func getDockerImageInfoFromAPI(imageName, appName string) (*DockerImageInfo, error) { - // Get the complete API response with node information - apiResponse, err := GetImageInfoAPIResponse(imageName, appName) - if err != nil { - return nil, err - } - - // Convert API response to DockerImageInfo - // Use the first image info as base (they should all be for the same image) - imageInfo := apiResponse.Images[0] - - // Create nodes from API response - nodes := make([]DockerNodeInfo, 0, len(apiResponse.Images)) - var totalSize int64 - - for _, nodeImage := range apiResponse.Images { - // Build layers for this node - layers := make([]LayerInfo, 0, len(nodeImage.LayersData)) - var nodeTotalSize int64 - - for _, layer := range nodeImage.LayersData { - layerInfo := LayerInfo{ - Digest: layer.Digest, - Size: layer.Size, - MediaType: layer.MediaType, - Offset: layer.Offset, - } - layers = append(layers, layerInfo) - nodeTotalSize += layer.Size - } - - // Create node info - nodeInfo := DockerNodeInfo{ - NodeName: nodeImage.Node, - Architecture: nodeImage.Architecture, - Variant: nodeImage.Variant, - OS: nodeImage.OS, - Layers: layers, - TotalSize: nodeTotalSize, - LayerCount: len(layers), - } - - nodes = append(nodes, nodeInfo) - totalSize += nodeTotalSize - } - - return &DockerImageInfo{ - Name: imageInfo.Name, - Architecture: imageInfo.Architecture, - Nodes: nodes, - TotalSize: totalSize, - CreatedAt: time.Now(), - }, nil -} - -// GetImageInfoAPIResponse gets the complete API response with node information -func GetImageInfoAPIResponse(imageName, appName string) (*ImageInfoResponse, error) { - // Get app service host and port from environment - appServiceHost := os.Getenv("APP_SERVICE_SERVICE_HOST") - appServicePort := os.Getenv("APP_SERVICE_SERVICE_PORT") - - if appServiceHost == "" || appServicePort == "" { - return nil, fmt.Errorf("app service host or port not configured") - } - - // Build request URL - url := fmt.Sprintf("http://%s:%s/app-service/v1/apps/image-info", appServiceHost, appServicePort) - - // Prepare request body - requestBody := map[string]interface{}{ - "name": appName, - "images": []string{imageName}, - } - - jsonBody, err := json.Marshal(requestBody) - if err != nil { - return nil, fmt.Errorf("failed to marshal request body: %w", err) - } - - // Create HTTP request - req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(jsonBody)) - if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) - } - - // Set headers - req.Header.Set("Content-Type", "application/json") - - // Send request - client := &http.Client{Timeout: 60 * time.Second} - resp, err := client.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to send request: %w", err) - } - defer resp.Body.Close() - - // Check response status - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("API returned status %d", resp.StatusCode) - } - - // Parse response - var apiResponse ImageInfoResponse - if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil { - return nil, fmt.Errorf("failed to parse response: %w", err) - } - - // Check if we got any image info - if len(apiResponse.Images) == 0 { - return nil, fmt.Errorf("no image info found for %s", imageName) - } - - return &apiResponse, nil -} - -// GetLayerDownloadProgress checks the download progress of a specific layer locally -func GetLayerDownloadProgress(layerDigest string) (*LayerInfo, error) { - glog.Infof("Checking layer download progress for: %s", layerDigest) - - // Detect container runtime - runtime := detectContainerRuntime() - glog.Infof("Detected container runtime: %s", runtime) - - layerInfo := &LayerInfo{ - Digest: layerDigest, - } - - switch runtime { - case RuntimeK8s: - return getK8sLayerProgress(layerInfo) - case RuntimeK3s: - return getK3sLayerProgress(layerInfo) - case RuntimeDocker: - return getDockerLayerProgress(layerInfo) - default: - return getGenericLayerProgress(layerInfo) - } -} - -// parseImageNameAndTag splits image name into name and tag components -func parseImageNameAndTag(imageName string) (string, string) { - if strings.Contains(imageName, ":") { - parts := strings.SplitN(imageName, ":", 2) - return parts[0], parts[1] - } - return imageName, "latest" -} - -// getImageManifest retrieves the image manifest from Docker registry -func getImageManifest(imageName, tag string) (*ImageManifest, error) { - // Handle different registry formats - registryURL := buildRegistryURL(imageName, tag) - - client := &http.Client{Timeout: 30 * time.Second} - req, err := http.NewRequest("GET", registryURL+"/manifests/"+tag, nil) - if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) - } - - // Set appropriate headers for Docker Registry API - req.Header.Set("Accept", "application/vnd.docker.distribution.manifest.v2+json") - req.Header.Set("Accept", "application/vnd.oci.image.manifest.v1+json") - - resp, err := client.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to fetch manifest: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("registry returned status %d", resp.StatusCode) - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("failed to read response body: %w", err) - } - - var manifest ImageManifest - if err := json.Unmarshal(body, &manifest); err != nil { - return nil, fmt.Errorf("failed to parse manifest: %w", err) - } - - return &manifest, nil -} - -// getImageConfig retrieves the image configuration -func getImageConfig(imageName, configDigest string) (*ImageConfig, error) { - registryURL := buildRegistryURL(imageName, "") - - client := &http.Client{Timeout: 30 * time.Second} - req, err := http.NewRequest("GET", registryURL+"/blobs/"+configDigest, nil) - if err != nil { - return nil, fmt.Errorf("failed to create config request: %w", err) - } - - resp, err := client.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to fetch config: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("config request returned status %d", resp.StatusCode) - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("failed to read config response: %w", err) - } - - var config ImageConfig - if err := json.Unmarshal(body, &config); err != nil { - return nil, fmt.Errorf("failed to parse config: %w", err) - } - - return &config, nil -} - -// buildRegistryURL constructs the appropriate registry URL for the image -func buildRegistryURL(imageName, tag string) string { - // Handle Docker Hub images - if !strings.Contains(imageName, "/") || (!strings.Contains(imageName, ".") && strings.Count(imageName, "/") == 1) { - if !strings.Contains(imageName, "/") { - imageName = "library/" + imageName - } - return "https://registry-1.docker.io/v2/" + imageName - } - - // Handle custom registry - parts := strings.SplitN(imageName, "/", 2) - if len(parts) == 2 { - registry := parts[0] - repo := parts[1] - return "https://" + registry + "/v2/" + repo - } - - return "https://registry-1.docker.io/v2/" + imageName -} - -// detectContainerRuntime attempts to detect the local container runtime -func detectContainerRuntime() ContainerRuntime { - // Check for k3s - if isK3sRunning() { - return RuntimeK3s - } - - // Check for k8s (containerd/cri-o) - if isK8sRunning() { - return RuntimeK8s - } - - // Check for Docker - if isDockerRunning() { - return RuntimeDocker - } - - return RuntimeUnknown -} - -// isK3sRunning checks if k3s is running on the system -func isK3sRunning() bool { - // Check if k3s binary exists and is running - if _, err := exec.LookPath("k3s"); err == nil { - cmd := exec.Command("pgrep", "-f", "k3s") - return cmd.Run() == nil - } - - // Check for k3s-specific directories - k3sPaths := []string{ - "/var/lib/rancher/k3s", - "/etc/rancher/k3s", - } - - for _, path := range k3sPaths { - if _, err := os.Stat(path); err == nil { - return true - } - } - - return false -} - -// isK8sRunning checks if standard k8s (kubelet/containerd) is running -func isK8sRunning() bool { - // Check if kubelet is running - if cmd := exec.Command("pgrep", "-f", "kubelet"); cmd.Run() == nil { - return true - } - - // Check if containerd is running - if cmd := exec.Command("pgrep", "-f", "containerd"); cmd.Run() == nil { - return true - } - - // Check for k8s-specific directories - k8sPaths := []string{ - "/var/lib/kubelet", - "/etc/kubernetes", - "/var/lib/containerd", - } - - for _, path := range k8sPaths { - if _, err := os.Stat(path); err == nil { - return true - } - } - - return false -} - -// isDockerRunning checks if Docker daemon is running -func isDockerRunning() bool { - if _, err := exec.LookPath("docker"); err == nil { - cmd := exec.Command("docker", "version") - return cmd.Run() == nil - } - return false -} - -// getK8sLayerProgress checks layer progress in k8s environment -func getK8sLayerProgress(layerInfo *LayerInfo) (*LayerInfo, error) { - // Try containerd first - if progress, err := getContainerdLayerProgress(layerInfo.Digest); err == nil { - layerInfo.Downloaded = progress.Downloaded - layerInfo.Progress = progress.Progress - layerInfo.LocalPath = progress.LocalPath - return layerInfo, nil - } - - // Fallback to checking filesystem directly - return getContainerdFilesystemProgress(layerInfo) -} - -// getK3sLayerProgress checks layer progress in k3s environment -func getK3sLayerProgress(layerInfo *LayerInfo) (*LayerInfo, error) { - // k3s uses containerd, but in a different location - k3sContainerdRoot := "/var/lib/rancher/k3s/agent/containerd" - - // Check if layer exists in k3s containerd - if progress, err := getK3sContainerdLayerProgress(layerInfo.Digest, k3sContainerdRoot); err == nil { - layerInfo.Downloaded = progress.Downloaded - layerInfo.Progress = progress.Progress - layerInfo.LocalPath = progress.LocalPath - return layerInfo, nil - } - - // Fallback to direct filesystem check - return getK3sFilesystemProgress(layerInfo, k3sContainerdRoot) -} - -// getDockerLayerProgress checks layer progress in Docker environment -func getDockerLayerProgress(layerInfo *LayerInfo) (*LayerInfo, error) { - // Use docker inspect to check layer status - cmd := exec.Command("docker", "system", "df", "-v") - output, err := cmd.Output() - if err != nil { - glog.Warningf("Failed to run docker system df: %v", err) - return getDockerFilesystemProgress(layerInfo) - } - - // Parse docker system df output to find layer info - if progress := parseDockerSystemDF(string(output), layerInfo.Digest); progress != nil { - layerInfo.Downloaded = progress.Downloaded - layerInfo.Progress = progress.Progress - layerInfo.LocalPath = progress.LocalPath - return layerInfo, nil - } - - return getDockerFilesystemProgress(layerInfo) -} - -// getGenericLayerProgress provides a generic fallback for unknown runtimes -func getGenericLayerProgress(layerInfo *LayerInfo) (*LayerInfo, error) { - // Try common container storage locations - commonPaths := []string{ - "/var/lib/docker", - "/var/lib/containerd", - "/var/lib/rancher/k3s", - } - - for _, basePath := range commonPaths { - if progress, err := checkLayerInPath(layerInfo.Digest, basePath); err == nil { - layerInfo.Downloaded = progress.Downloaded - layerInfo.Progress = progress.Progress - layerInfo.LocalPath = progress.LocalPath - return layerInfo, nil - } - } - - // No layer found - layerInfo.Downloaded = false - layerInfo.Progress = 0 - return layerInfo, nil -} - -// Helper function to get containerd layer progress -func getContainerdLayerProgress(digest string) (*LayerInfo, error) { - cmd := exec.Command("ctr", "content", "ls") - output, err := cmd.Output() - if err != nil { - return nil, fmt.Errorf("failed to run ctr content ls: %w", err) - } - - return parseContainerdContentList(string(output), digest) -} - -// Helper function for k3s containerd -func getK3sContainerdLayerProgress(digest, containerdRoot string) (*LayerInfo, error) { - cmd := exec.Command("k3s", "ctr", "content", "ls") - output, err := cmd.Output() - if err != nil { - return nil, fmt.Errorf("failed to run k3s ctr content ls: %w", err) - } - - return parseContainerdContentList(string(output), digest) -} - -// Parse containerd content list output -func parseContainerdContentList(output, digest string) (*LayerInfo, error) { - lines := strings.Split(output, "\n") - for _, line := range lines { - if strings.Contains(line, digest) { - fields := strings.Fields(line) - if len(fields) >= 3 { - layer := &LayerInfo{ - Digest: digest, - Downloaded: true, - Progress: 100, - } - - if size, err := strconv.ParseInt(fields[1], 10, 64); err == nil { - layer.Size = size - } - - return layer, nil - } - } - } - - return &LayerInfo{ - Digest: digest, - Downloaded: false, - Progress: 0, - }, nil -} - -// Filesystem-based progress checks -func getContainerdFilesystemProgress(layerInfo *LayerInfo) (*LayerInfo, error) { - return checkLayerInPath(layerInfo.Digest, "/var/lib/containerd") -} - -func getK3sFilesystemProgress(layerInfo *LayerInfo, containerdRoot string) (*LayerInfo, error) { - return checkLayerInPath(layerInfo.Digest, containerdRoot) -} - -func getDockerFilesystemProgress(layerInfo *LayerInfo) (*LayerInfo, error) { - return checkLayerInPath(layerInfo.Digest, "/var/lib/docker") -} - -// checkLayerInPath searches for a layer in the specified base path -func checkLayerInPath(digest, basePath string) (*LayerInfo, error) { - // Remove sha256: prefix if present - cleanDigest := strings.TrimPrefix(digest, "sha256:") - - // Common patterns for layer storage - patterns := []string{ - filepath.Join(basePath, "**", cleanDigest), - filepath.Join(basePath, "**", cleanDigest[:12]), // Short digest - filepath.Join(basePath, "overlay2", cleanDigest, "**"), - filepath.Join(basePath, "image", "overlay2", "layerdb", "sha256", cleanDigest), - } - - for _, pattern := range patterns { - if matches, err := filepath.Glob(pattern); err == nil && len(matches) > 0 { - // Found layer, check if it's complete - for _, match := range matches { - if info, err := os.Stat(match); err == nil { - return &LayerInfo{ - Digest: digest, - Downloaded: true, - Progress: 100, - Size: info.Size(), - LocalPath: match, - }, nil - } - } - } - } - - return &LayerInfo{ - Digest: digest, - Downloaded: false, - Progress: 0, - }, nil -} - -// parseDockerSystemDF parses docker system df output to find layer information -func parseDockerSystemDF(output, digest string) *LayerInfo { - lines := strings.Split(output, "\n") - cleanDigest := strings.TrimPrefix(digest, "sha256:") - - for _, line := range lines { - if strings.Contains(line, cleanDigest) || strings.Contains(line, cleanDigest[:12]) { - // Parse the line to extract size and status information - fields := strings.Fields(line) - if len(fields) >= 2 { - layer := &LayerInfo{ - Digest: digest, - Downloaded: true, - Progress: 100, - } - - // Try to parse size from the output - for _, field := range fields { - if matched, _ := regexp.MatchString(`^\d+(\.\d+)?[KMGT]?B$`, field); matched { - if size := parseHumanReadableSize(field); size > 0 { - layer.Size = size - break - } - } - } - - return layer - } - } - } - - return nil -} - -// parseHumanReadableSize converts human-readable size strings to bytes -func parseHumanReadableSize(sizeStr string) int64 { - re := regexp.MustCompile(`^(\d+(?:\.\d+)?)\s*([KMGT]?)B?$`) - matches := re.FindStringSubmatch(strings.ToUpper(sizeStr)) - - if len(matches) != 3 { - return 0 - } - - size, err := strconv.ParseFloat(matches[1], 64) - if err != nil { - return 0 - } - - unit := matches[2] - switch unit { - case "K": - size *= 1024 - case "M": - size *= 1024 * 1024 - case "G": - size *= 1024 * 1024 * 1024 - case "T": - size *= 1024 * 1024 * 1024 * 1024 - } - - return int64(size) -} - -// GetLayerDownloadProgressByOffset calculates download progress based on offset and size -// This is used in production environment where we have offset information from API -func GetLayerDownloadProgressByOffset(digest string, offset, size int64) (*LayerInfo, error) { - glog.Infof("Calculating layer download progress by offset for: %s (offset: %d, size: %d)", digest, offset, size) - - layerInfo := &LayerInfo{ - Digest: digest, - Size: size, - Offset: offset, - } - - // Calculate progress based on offset - if size > 0 { - // Progress is calculated as (offset / size) * 100 - progress := int((float64(offset) / float64(size)) * 100) - if progress > 100 { - progress = 100 // Cap at 100% - } - layerInfo.Progress = progress - layerInfo.Downloaded = progress >= 100 - } else { - layerInfo.Progress = 0 - layerInfo.Downloaded = false - } - - glog.Infof("Layer %s: offset=%d, size=%d, progress=%d%%, downloaded=%v", - digest, offset, size, layerInfo.Progress, layerInfo.Downloaded) - - return layerInfo, nil -} diff --git a/internal/v2/utils/installhistory.go b/internal/v2/utils/installhistory.go index a14d2de..4905078 100644 --- a/internal/v2/utils/installhistory.go +++ b/internal/v2/utils/installhistory.go @@ -26,14 +26,6 @@ func SetCacheVersionGetter(getter CacheVersionGetter) { cacheVersionGetter = getter } -// getVersionFromCacheState gets app version from cache state if available -func getVersionFromCacheState(userID, sourceID, appName string) (version string, found bool) { - if cacheVersionGetter != nil { - return cacheVersionGetter.GetAppVersionFromState(userID, sourceID, appName) - } - return "", false -} - // GetAppInfoFromDownloadRecord fetches app version and source from chart-repo service func GetAppInfoFromDownloadRecord(userID, appName string) (string, string, error) { diff --git a/internal/v2/utils/state_monitor.go b/internal/v2/utils/state_monitor.go index 2bd2c82..6d93f3f 100644 --- a/internal/v2/utils/state_monitor.go +++ b/internal/v2/utils/state_monitor.go @@ -11,7 +11,6 @@ import ( // DataSenderInterface defines the interface for sending app info updates type DataSenderInterface interface { SendAppInfoUpdate(update types.AppInfoUpdate, trace string) error - IsConnected() bool Close() } @@ -199,11 +198,3 @@ func (sm *StateMonitor) Close() { sm.dataSender.Close() } } - -// IsConnected checks if the data sender is connected -func (sm *StateMonitor) IsConnected() bool { - if sm.dataSender == nil { - return false - } - return sm.dataSender.IsConnected() -} diff --git a/internal/v2/utils/systeminfo.go b/internal/v2/utils/systeminfo.go index d3ac5fd..6aba0f6 100644 --- a/internal/v2/utils/systeminfo.go +++ b/internal/v2/utils/systeminfo.go @@ -17,18 +17,6 @@ type VersionInfo struct { Version string `json:"version"` } -// GetTerminusVersion retrieves the Terminus version with environment-aware logic -func GetTerminusVersion() (string, error) { - // Check if running in development environment - if IsDevelopmentEnvironment() { - glog.Infof("Running in development environment, returning fixed version: 1.12.3") - return "1.12.3", nil - } - - // For production environment, try to get version from service - return getTerminusVersionFromService() -} - // GetTerminusVersionValue returns the parsed version string func GetTerminusVersionValue() (string, error) { // Check public environment first