Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 5 additions & 212 deletions internal/v2/appinfo/appinfomodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (m *AppInfoModule) initSyncer() error {
}

// Start syncer in passive mode (Pipeline handles scheduling)
if err := m.syncer.StartWithOptions(m.ctx, false); err != nil {
if err := m.syncer.StartWithOptions(m.ctx); err != nil {
return fmt.Errorf("failed to start syncer: %w", err)
}

Expand Down Expand Up @@ -550,7 +550,7 @@ func (m *AppInfoModule) initHydrator() error {
m.hydrator = NewHydrator(cacheData, m.settingsManager, m.cacheManager, hydratorConfig)

// Start hydrator with context
if err := m.hydrator.Start(m.ctx); err != nil {
if err := m.hydrator.StartWithOptions(m.ctx); err != nil {
return fmt.Errorf("failed to start hydrator: %w", err)
}

Expand Down Expand Up @@ -583,7 +583,7 @@ func (m *AppInfoModule) initDataWatcher() error {
// Create DataWatcher instance
m.dataWatcher = NewDataWatcher(m.cacheManager, m.hydrator, m.dataSender)

if err := m.dataWatcher.StartWithOptions(m.ctx, false); err != nil {
if err := m.dataWatcher.StartWithOptions(m.ctx); err != nil {
return fmt.Errorf("failed to start DataWatcher: %w", err)
}

Expand Down Expand Up @@ -646,7 +646,7 @@ func (m *AppInfoModule) initDataWatcherRepo() error {
// Create DataWatcherRepo instance
m.dataWatcherRepo = NewDataWatcherRepo(m.redisClient, m.cacheManager, m.dataWatcher, m.dataSender)

if err := m.dataWatcherRepo.StartWithOptions(false); err != nil {
if err := m.dataWatcherRepo.StartWithOptions(); err != nil {
return fmt.Errorf("failed to start DataWatcherRepo: %w", err)
}

Expand All @@ -664,7 +664,7 @@ func (m *AppInfoModule) initStatusCorrectionChecker() error {

m.statusCorrectionChecker = NewStatusCorrectionChecker(m.cacheManager)

if err := m.statusCorrectionChecker.StartWithOptions(false); err != nil {
if err := m.statusCorrectionChecker.StartWithOptions(); err != nil {
return fmt.Errorf("failed to start StatusCorrectionChecker: %w", err)
}

Expand Down Expand Up @@ -1082,16 +1082,6 @@ func DefaultModuleConfig() *ModuleConfig {
}
}

// GetDockerImageInfo is a convenience function to get Docker image information
func (m *AppInfoModule) GetDockerImageInfo(imageName, appName string) (*utils.DockerImageInfo, error) {
return utils.GetDockerImageInfo(imageName, appName)
}

// GetLayerDownloadProgress is a convenience function to get layer download progress
func (m *AppInfoModule) GetLayerDownloadProgress(layerDigest string) (*utils.LayerInfo, error) {
return utils.GetLayerDownloadProgress(layerDigest)
}

// SetAppData is a convenience function to set app data
func (m *AppInfoModule) SetAppData(userID, sourceID string, dataType AppDataType, data map[string]interface{}) error {
if !m.isStarted || m.cacheManager == nil {
Expand Down Expand Up @@ -1255,197 +1245,6 @@ func (m *AppInfoModule) SyncUserListToCache() error {
return m.cacheManager.SyncUserListToCache()
}

// RefreshUserDataStructures ensures all configured users have proper data structures
// not used
func (m *AppInfoModule) RefreshUserDataStructures() error {
// Check isStarted without lock since it's only read
if !m.isStarted {
return fmt.Errorf("module is not started")
}

glog.V(3).Info("Refreshing user data structures")

// First sync user list to cache
if err := m.SyncUserListToCache(); err != nil {
return fmt.Errorf("failed to sync user list to cache: %w", err)
}

// Force sync to Redis to ensure persistence
// Cache manager pointer access is atomic
if m.cacheManager != nil {
if err := m.cacheManager.ForceSync(); err != nil {
glog.Errorf("Failed to force sync after refreshing user data structures: %v", err)
}
}

glog.V(3).Info("User data structures refreshed successfully")
return nil
}

// GetConfiguredUsers returns the list of configured users
func (m *AppInfoModule) GetConfiguredUsers() []string {
// Config is read-only after initialization, no need for read lock
if m.config.User == nil {
return []string{}
}

// Return a copy to prevent external modification
users := make([]string, len(m.config.User.UserList))
copy(users, m.config.User.UserList)
return users
}

// GetCachedUsers returns the list of users currently in cache
func (m *AppInfoModule) GetCachedUsers() []string {
// Check isStarted and cache manager without lock since they're only read
if !m.isStarted || m.cacheManager == nil {
return []string{}
}

allUsersData := m.cacheManager.GetAllUsersData() // not used
users := make([]string, 0, len(allUsersData))
for userID := range allUsersData {
users = append(users, userID)
}

return users
}

// CleanupInvalidData cleans up invalid pending data entries from cache
func (m *AppInfoModule) CleanupInvalidData() (int, error) {
if m.cacheManager == nil {
return 0, fmt.Errorf("cache manager not available")
}

cleanedCount := m.cacheManager.CleanupInvalidPendingData()
glog.V(3).Infof("Cleaned up %d invalid pending data entries", cleanedCount)

return cleanedCount, nil
}

// GetInvalidDataReport returns a detailed report of invalid pending data entries
func (m *AppInfoModule) GetInvalidDataReport() map[string]interface{} {
if m.cacheManager == nil {
return map[string]interface{}{
"error": "cache manager not available",
}
}

report := map[string]interface{}{
"users": make(map[string]interface{}),
"totals": map[string]int{
"total_users": 0,
"total_sources": 0,
"total_pending_data": 0,
"total_invalid_data": 0,
},
}

allUsersForReport := m.cacheManager.GetAllUsersData() // not used

totalUsers := 0
totalSources := 0
totalPendingData := 0
totalInvalidData := 0

for userID, userData := range allUsersForReport {
totalUsers++
userReport := map[string]interface{}{
"sources": make(map[string]interface{}),
"totals": map[string]int{
"total_sources": 0,
"total_pending_data": 0,
"total_invalid_data": 0,
},
}

for sourceID, sourceData := range userData.Sources {
totalSources++

sourceReport := map[string]interface{}{
"total_pending_data": len(sourceData.AppInfoLatestPending),
"invalid_entries": make([]map[string]interface{}, 0),
}

invalidCount := 0
for i, pendingData := range sourceData.AppInfoLatestPending {
isValid := false
invalidReasons := make([]string, 0)

if pendingData.RawData != nil {
if (pendingData.RawData.ID != "" && pendingData.RawData.ID != "0") ||
(pendingData.RawData.AppID != "" && pendingData.RawData.AppID != "0") ||
(pendingData.RawData.Name != "" && pendingData.RawData.Name != "unknown") {
isValid = true
} else {
if pendingData.RawData.ID == "" || pendingData.RawData.ID == "0" {
invalidReasons = append(invalidReasons, "empty or zero ID")
}
if pendingData.RawData.AppID == "" || pendingData.RawData.AppID == "0" {
invalidReasons = append(invalidReasons, "empty or zero AppID")
}
if pendingData.RawData.Name == "" || pendingData.RawData.Name == "unknown" {
invalidReasons = append(invalidReasons, "empty or unknown Name")
}
}
} else {
invalidReasons = append(invalidReasons, "null RawData")
}

if pendingData.AppInfo != nil && pendingData.AppInfo.AppEntry != nil && !isValid {
if (pendingData.AppInfo.AppEntry.ID != "" && pendingData.AppInfo.AppEntry.ID != "0") ||
(pendingData.AppInfo.AppEntry.AppID != "" && pendingData.AppInfo.AppEntry.AppID != "0") ||
(pendingData.AppInfo.AppEntry.Name != "" && pendingData.AppInfo.AppEntry.Name != "unknown") {
isValid = true
invalidReasons = make([]string, 0) // Clear reasons if AppInfo is valid
}
}

if !isValid {
invalidCount++
invalidEntry := map[string]interface{}{
"index": i,
"reasons": invalidReasons,
"data": map[string]interface{}{
"timestamp": pendingData.Timestamp,
"version": pendingData.Version,
},
}

if pendingData.RawData != nil {
invalidEntry["raw_data"] = map[string]interface{}{
"id": pendingData.RawData.ID,
"app_id": pendingData.RawData.AppID,
"name": pendingData.RawData.Name,
"title": pendingData.RawData.Title,
}
}

sourceReport["invalid_entries"] = append(sourceReport["invalid_entries"].([]map[string]interface{}), invalidEntry)
}
}

sourceReport["invalid_count"] = invalidCount
totalPendingData += len(sourceData.AppInfoLatestPending)
totalInvalidData += invalidCount

userReport["sources"].(map[string]interface{})[sourceID] = sourceReport
userReport["totals"].(map[string]int)["total_sources"]++
userReport["totals"].(map[string]int)["total_pending_data"] += len(sourceData.AppInfoLatestPending)
userReport["totals"].(map[string]int)["total_invalid_data"] += invalidCount
}

report["users"].(map[string]interface{})[userID] = userReport
}

report["totals"].(map[string]int)["total_users"] = totalUsers
report["totals"].(map[string]int)["total_sources"] = totalSources
report["totals"].(map[string]int)["total_pending_data"] = totalPendingData
report["totals"].(map[string]int)["total_invalid_data"] = totalInvalidData

return report
}

// SetTaskModule sets the task module for recording task events
func (m *AppInfoModule) SetTaskModule(taskModule *task.TaskModule) {
m.taskModule = taskModule
Expand Down Expand Up @@ -1492,9 +1291,3 @@ func (m *AppInfoModule) SetSettingsManager(settingsManager *settings.SettingsMan
m.settingsManager = settingsManager
glog.V(2).Info("Settings manager set in AppInfo module")
}

// GetSettingsManager returns the settings manager instance
func (m *AppInfoModule) GetSettingsManager() *settings.SettingsManager {
// Pointer assignment is atomic, no need for read lock
return m.settingsManager
}
37 changes: 6 additions & 31 deletions internal/v2/appinfo/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,10 +800,6 @@ func (cm *CacheManager) Start() error {
// Start sync worker goroutine
go cm.syncWorker()

// Start periodic cleanup of AppRenderFailed data (every 5 minutes)
cm.cleanupTicker = time.NewTicker(5 * time.Minute)
go cm.cleanupWorker() // +

glog.V(3).Infof("Cache manager started successfully")
return nil
}
Expand Down Expand Up @@ -853,6 +849,7 @@ func (cm *CacheManager) processSyncRequest(req SyncRequest) {
return
}

glog.Infof("[CACHE] SyncRequest, user: %s, source: %s, type: %d", req.UserID, req.SourceID, req.Type)
switch req.Type {
case SyncUser:
if userData := cm.getUserData(req.UserID); userData != nil {
Expand Down Expand Up @@ -926,6 +923,8 @@ func (cm *CacheManager) GetAppVersionFromState(userID, sourceID, appName string)

// getSourceData internal method to get source data without external locking
func (cm *CacheManager) getSourceData(userID, sourceID string) *SourceData {
cm.mutex.RLock()
defer cm.mutex.RUnlock()
if userData, exists := cm.cache.Users[userID]; exists {
return userData.Sources[sourceID]
}
Expand Down Expand Up @@ -1523,7 +1522,7 @@ func (cm *CacheManager) setAppDataInternal(userID, sourceID string, dataType App
glog.V(3).Infof("Added app %s for user=%s, source=%s", appID, userID, sourceID)
}
}
} else {
} else { // syncer
if dataSection, hasData := data["data"].(map[string]interface{}); hasData {
if appsData, hasApps := dataSection["apps"].(map[string]interface{}); hasApps {
for appID, appDataInterface := range appsData {
Expand Down Expand Up @@ -1792,16 +1791,6 @@ func (cm *CacheManager) addUserInternal(userID string) error {
return nil
}

// AddUser adds a new user to the cache
func (cm *CacheManager) AddUser(userID string) error {
go func() {
if err := cm.addUserInternal(userID); err != nil {
glog.Errorf("Failed to add user in goroutine: %v", err)
}
}()
return nil
}

// GetCacheStats returns cache statistics using single global lock
func (cm *CacheManager) GetCacheStats() map[string]interface{} {
cm.mutex.RLock()
Expand Down Expand Up @@ -2331,6 +2320,7 @@ func (cm *CacheManager) updateLockStats(lockType string) {

// RemoveAppStateData removes a specific app from AppStateLatest for a user and source
func (cm *CacheManager) removeAppStateDataInternal(userID, sourceID, appName string) error {
glog.Infof("[CACHE], remove appStateData, user: %s, source: %s, app: %s", userID, sourceID, appName)
cm.mutex.Lock()
_wd := cm.startLockWatchdog("@RemoveAppStateData")
defer func() { cm.mutex.Unlock(); _wd() }()
Expand Down Expand Up @@ -2384,6 +2374,7 @@ func (cm *CacheManager) RemoveAppStateData(userID, sourceID, appName string) err

// RemoveAppInfoLatestData removes a specific app from AppInfoLatest for a user and source
func (cm *CacheManager) removeAppInfoLatestDataInternal(userID, sourceID, appName string) error {
glog.Infof("[CACHE], remove appInfoLatestData, user: %s, source: %s, app: %s", userID, sourceID, appName)
cm.mutex.Lock()
_wd := cm.startLockWatchdog("@RemoveAppInfoLatestData")
defer func() { cm.mutex.Unlock(); _wd() }()
Expand Down Expand Up @@ -2584,22 +2575,6 @@ func (cm *CacheManager) ResynceUser() error {
return nil
}

// cleanupWorker processes periodic cleanup of AppRenderFailed data
func (cm *CacheManager) cleanupWorker() {
glog.V(3).Info("INFO: Starting AppRenderFailed cleanup worker")

for range cm.cleanupTicker.C {
if !cm.isRunning {
glog.V(3).Info("INFO: CacheManager stopped, cleanup worker exiting")
break
}

cm.ClearAppRenderFailedData()
}

glog.V(3).Info("INFO: AppRenderFailed cleanup worker stopped")
}

// ClearAppRenderFailedData clears all AppRenderFailed data for all users and sources
func (cm *CacheManager) ClearAppRenderFailedData() {
glog.Info("INFO: [Cleanup] Starting periodic cleanup of AppRenderFailed data")
Expand Down
8 changes: 0 additions & 8 deletions internal/v2/appinfo/datasender_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,3 @@ func (ds *DataSender) Close() {
glog.V(3).Info("NATS connection closed")
}
}

// IsConnected checks if NATS connection is active
func (ds *DataSender) IsConnected() bool {
if !ds.enabled || ds.conn == nil {
return false
}
return ds.conn.IsConnected()
}
Loading
Loading