Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 0 additions & 22 deletions internal/v2/appinfo/appinfomodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"

"market/internal/v2/history"
Expand All @@ -36,7 +35,6 @@ type AppInfoModule struct {
settingsManager *settings.SettingsManager
ctx context.Context
cancel context.CancelFunc
mutex sync.RWMutex
isStarted bool
taskModule *task.TaskModule
historyModule *history.HistoryModule
Expand Down Expand Up @@ -142,9 +140,6 @@ func NewAppInfoModule(config *ModuleConfig) (*AppInfoModule, error) {

// Start initializes and starts all module components
func (m *AppInfoModule) Start() error {
m.mutex.Lock()
defer m.mutex.Unlock()

if m.isStarted {
return fmt.Errorf("module is already started")
}
Expand Down Expand Up @@ -256,9 +251,6 @@ func (m *AppInfoModule) Start() error {

// Stop gracefully shuts down the module
func (m *AppInfoModule) Stop() error {
m.mutex.Lock()
defer m.mutex.Unlock()

if !m.isStarted {
return nil
}
Expand Down Expand Up @@ -402,16 +394,11 @@ func (m *AppInfoModule) GetRedisConfig() *RedisConfig {

// IsStarted reports whether the module is currently marked as started.
// The isStarted flag is read while holding the module's read lock.
func (m *AppInfoModule) IsStarted() bool {
	m.mutex.RLock()
	started := m.isStarted
	m.mutex.RUnlock()

	return started
}

// GetModuleStatus returns the current status of the module and all components
func (m *AppInfoModule) GetModuleStatus() map[string]interface{} {
m.mutex.RLock()
defer m.mutex.RUnlock()

status := map[string]interface{}{
"is_started": m.isStarted,
"enable_sync": m.config.EnableSync,
Expand Down Expand Up @@ -1229,9 +1216,6 @@ func (m *AppInfoModule) GetUserPermissions(userID string) []string {

// UpdateUserConfig updates the user configuration for the module and cache manager
func (m *AppInfoModule) UpdateUserConfig(newUserConfig *UserConfig) error {
m.mutex.Lock()
defer m.mutex.Unlock()

if !m.isStarted {
return fmt.Errorf("module is not started")
}
Expand Down Expand Up @@ -1464,8 +1448,6 @@ func (m *AppInfoModule) GetInvalidDataReport() map[string]interface{} {

// SetTaskModule sets the task module for recording task events
func (m *AppInfoModule) SetTaskModule(taskModule *task.TaskModule) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.taskModule = taskModule
glog.V(3).Info("Task module reference set in AppInfo module")

Expand All @@ -1481,8 +1463,6 @@ func (m *AppInfoModule) SetTaskModule(taskModule *task.TaskModule) {

// SetHistoryModule sets the history module reference
func (m *AppInfoModule) SetHistoryModule(historyModule *history.HistoryModule) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.historyModule = historyModule
glog.V(3).Info("History module reference set in AppInfo module")

Expand All @@ -1509,8 +1489,6 @@ func (m *AppInfoModule) GetTaskModule() *task.TaskModule {

// SetSettingsManager stores the given settings manager on the module.
// The assignment and the log call both run while the module's write lock is held.
func (m *AppInfoModule) SetSettingsManager(settingsManager *settings.SettingsManager) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.settingsManager = settingsManager

	glog.V(2).Info("Settings manager set in AppInfo module")
}
Expand Down
113 changes: 7 additions & 106 deletions internal/v2/appinfo/datawatcher_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,6 @@ type DataWatcher struct {
isRunning int32 // 0 = false, 1 = true, using atomic operations
stopChan chan struct{}

// Processing mutex to ensure only one cycle runs at a time
processingMutex sync.Mutex

// Active hash calculations tracking
activeHashCalculations map[string]bool
hashMutex sync.Mutex

// Dirty users tracking for deferred hash calculation
dirtyUsers map[string]bool
dirtyUsersMutex sync.Mutex
Expand All @@ -43,14 +36,13 @@ type DataWatcher struct {
// NewDataWatcher creates a new DataWatcher wired to the given cache manager,
// hydrator and data sender.
//
// The returned watcher is not running (isRunning is 0), uses a 30 second
// interval, and has its stop channel and both tracking maps allocated.
// Each field is set exactly once: a composite literal that names a field
// twice does not compile.
func NewDataWatcher(cacheManager *CacheManager, hydrator *Hydrator, dataSender *DataSender) *DataWatcher {
	return &DataWatcher{
		cacheManager: cacheManager,
		hydrator:     hydrator,
		dataSender:   dataSender,
		interval:     30 * time.Second, // Run every 30 seconds
		stopChan:     make(chan struct{}),
		isRunning:    0, // Initialize as false
		// Must be non-nil: calculateAndSetUserHash writes to this map.
		activeHashCalculations: make(map[string]bool),
		dirtyUsers:             make(map[string]bool),
	}
}

Expand Down Expand Up @@ -130,13 +122,6 @@ func (dw *DataWatcher) watchLoop(ctx context.Context) {

// processCompletedApps checks for completed hydration apps and moves them
func (dw *DataWatcher) processCompletedApps() {
// Ensure only one processing cycle runs at a time
if !dw.processingMutex.TryLock() {
glog.Warningf("DataWatcher: Previous processing cycle still running, skipping this cycle")
return
}
defer dw.processingMutex.Unlock()

processingStart := time.Now()
atomic.StoreInt64(&dw.lastRunTime, processingStart.Unix())

Expand Down Expand Up @@ -264,43 +249,6 @@ func (dw *DataWatcher) processUserData(userID string, userData *types.UserData)
return totalProcessed, totalMoved
}

// calculateAndSetUserHash calculates and sets the hash for user data, while
// tracking in-flight calculations so that a second call for the same user is
// skipped instead of running concurrently.
//
// Two markers are kept per user in activeHashCalculations: the bare userID and
// "isCalculating_"+userID. If either marker is already set the call logs and
// returns without doing any work. Both markers are checked before either one
// is written, so a skipped call never leaves a marker behind that would block
// every later calculation for that user.
func (dw *DataWatcher) calculateAndSetUserHash(userID string, userData *types.UserData) {
	isCalculatingKey := "isCalculating_" + userID

	dw.hashMutex.Lock()
	if dw.activeHashCalculations[isCalculatingKey] {
		dw.hashMutex.Unlock()
		glog.V(4).Infof("DataWatcher: Hash calculation already in progress for user %s (isCalculating), skipping", userID)
		return
	}
	// The original userID marker is kept for compatibility.
	if dw.activeHashCalculations[userID] {
		dw.hashMutex.Unlock()
		glog.V(4).Infof("DataWatcher: Hash calculation already in progress for user %s, skipping", userID)
		return
	}
	// Neither marker is set: claim both under the same critical section.
	dw.activeHashCalculations[isCalculatingKey] = true
	dw.activeHashCalculations[userID] = true
	dw.hashMutex.Unlock()

	defer func() {
		// Release both markers once the calculation returns.
		dw.hashMutex.Lock()
		delete(dw.activeHashCalculations, userID)
		delete(dw.activeHashCalculations, isCalculatingKey)
		dw.hashMutex.Unlock()
		glog.V(3).Infof("DataWatcher: Hash calculation tracking cleaned up for user %s", userID)
	}()

	// The boolean result is intentionally discarded: this method has no
	// return value through which to report it.
	_ = dw.calculateAndSetUserHashDirect(userID, userData)
}

// calculateAndSetUserHashWithRetry calculates hash with retry mechanism for data consistency
func (dw *DataWatcher) calculateAndSetUserHashWithRetry(userID string, userData *types.UserData) {
maxRetries := 3
Expand Down Expand Up @@ -362,33 +310,6 @@ func (dw *DataWatcher) calculateAndSetUserHashDirect(userID string, userData *ty
return true
}

// calculateAndSetUserHashAsync runs the hash calculation for a user in a
// separate goroutine and waits up to 10 seconds for it to finish.
//
// Despite the name, the caller blocks until the goroutine signals completion
// or the timeout elapses. On timeout the goroutine is not cancelled; it keeps
// running and its completion signal is absorbed by the buffered channel.
// A panic inside the goroutine is recovered and logged, and the caller is
// released immediately rather than waiting for the timeout.
func (dw *DataWatcher) calculateAndSetUserHashAsync(userID string, userData *types.UserData) {
	glog.Infof("DataWatcher: Starting async hash calculation for user %s", userID)

	// Buffered so the goroutine can always signal, even after a timeout.
	done := make(chan bool, 1)
	go func() {
		completed := false
		defer func() {
			if r := recover(); r != nil {
				glog.Errorf("DataWatcher: Panic during hash calculation for user %s: %v", userID, r)
			}
			// Always signal, so a panic does not leave the caller waiting
			// for the full timeout.
			done <- completed
		}()
		glog.Infof("DataWatcher: Hash calculation goroutine started for user %s", userID)
		dw.calculateAndSetUserHash(userID, userData)
		glog.Infof("DataWatcher: Hash calculation goroutine completed for user %s", userID)
		completed = true
	}()

	// A stoppable timer releases its resources as soon as the work finishes.
	timeout := time.NewTimer(10 * time.Second)
	defer timeout.Stop()

	select {
	case ok := <-done:
		if ok {
			glog.Infof("DataWatcher: Hash calculation finished successfully for user %s", userID)
		}
		// On !ok the panic has already been logged by the goroutine.
	case <-timeout.C:
		glog.Errorf("DataWatcher: Hash calculation timeout for user %s after 10 seconds", userID)
	}
}

// processSourceData processes a single source's data for completed hydration
// ~ not used
func (dw *DataWatcher) processSourceData(userID, sourceID string, sourceData *types.SourceData) (int64, int64) {
Expand Down Expand Up @@ -912,26 +833,6 @@ func (dw *DataWatcher) ForceCalculateUserHash(userID string) error {
// return nil
}

// ForceCalculateAllUsersHash triggers a hash calculation for every user
// returned by the cache manager.
//
// It returns an error when the cache manager reports no users. Entries whose
// user data is nil are skipped. Otherwise it returns nil.
func (dw *DataWatcher) ForceCalculateAllUsersHash() error {
	glog.V(3).Infof("DataWatcher: Force calculating hash for all users")

	users := dw.cacheManager.GetAllUsersData()
	if len(users) == 0 {
		return fmt.Errorf("no users found in cache")
	}

	for id, data := range users {
		if data == nil {
			continue
		}
		glog.V(3).Infof("DataWatcher: Force calculating hash for user: %s", id)
		dw.calculateAndSetUserHash(id, data)
	}

	return nil
}

// MarkUserDirty marks a user as needing hash recalculation.
// Called by event-driven paths (e.g. DataWatcherState) that modify user data
// outside the Pipeline cycle. The dirty users will be picked up by Pipeline Phase 5.
Expand Down
21 changes: 0 additions & 21 deletions internal/v2/appinfo/datawatcher_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"os"
"sort"
"strconv"
"sync"
"time"

"github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -63,7 +62,6 @@ type DataWatcherRepo struct {
cacheManager *CacheManager // Add cache manager reference
dataWatcher *DataWatcher // Add DataWatcher reference for hash calculation
dataSender *DataSender // Add DataSender reference for NATS communication
mu sync.RWMutex
ticker *time.Ticker
stopChannel chan bool
isRunning bool
Expand Down Expand Up @@ -124,9 +122,6 @@ func (dwr *DataWatcherRepo) initializeLastProcessedID() error {

// Start begins the periodic state checking process
func (dwr *DataWatcherRepo) Start() error {
dwr.mu.Lock()
defer dwr.mu.Unlock()

if dwr.isRunning {
return fmt.Errorf("data watcher is already running")
}
Expand All @@ -145,9 +140,6 @@ func (dwr *DataWatcherRepo) Start() error {

// StartWithOptions starts with options, if enablePolling is false, the periodic polling is not started
func (dwr *DataWatcherRepo) StartWithOptions(enablePolling bool) error {
dwr.mu.Lock()
defer dwr.mu.Unlock()

if dwr.isRunning {
return fmt.Errorf("DataWatcherRepo is already running")
}
Expand All @@ -170,9 +162,6 @@ func (dwr *DataWatcherRepo) ProcessOnce() map[string]bool {

// Stop stops the periodic state checking process
func (dwr *DataWatcherRepo) Stop() error {
dwr.mu.Lock()
defer dwr.mu.Unlock()

if !dwr.isRunning {
return fmt.Errorf("data watcher is not running")
}
Expand All @@ -192,8 +181,6 @@ func (dwr *DataWatcherRepo) Stop() error {

// IsRunning reports whether the data watcher repository is marked as running.
// The isRunning flag is read while holding the read lock.
func (dwr *DataWatcherRepo) IsRunning() bool {
	dwr.mu.RLock()
	running := dwr.isRunning
	dwr.mu.RUnlock()

	return running
}

Expand Down Expand Up @@ -561,16 +548,12 @@ func (dwr *DataWatcherRepo) updateCacheWithAppInfo(userID, sourceID string, appI

// SetCacheManager stores the given cache manager on the repository.
// The assignment and the log call both run while the write lock is held.
func (dwr *DataWatcherRepo) SetCacheManager(cacheManager *CacheManager) {
	dwr.mu.Lock()
	defer dwr.mu.Unlock()

	dwr.cacheManager = cacheManager

	glog.V(3).Info("Cache manager set for data watcher repository")
}

// GetCacheManager returns the cache manager currently stored on the
// repository, read while holding the read lock. The result may be nil.
func (dwr *DataWatcherRepo) GetCacheManager() *CacheManager {
	dwr.mu.RLock()
	manager := dwr.cacheManager
	dwr.mu.RUnlock()

	return manager
}

Expand All @@ -596,8 +579,6 @@ func (dwr *DataWatcherRepo) GetDataSender() *DataSender {

// GetLastProcessedID returns the repository's lastProcessedID value,
// read while holding the read lock.
func (dwr *DataWatcherRepo) GetLastProcessedID() int64 {
	dwr.mu.RLock()
	lastID := dwr.lastProcessedID
	dwr.mu.RUnlock()

	return lastID
}

Expand All @@ -608,8 +589,6 @@ func (dwr *DataWatcherRepo) GetApiBaseURL() string {

// SetApiBaseURL replaces the repository's API base URL with url and logs the
// new value. Both steps run while the write lock is held.
func (dwr *DataWatcherRepo) SetApiBaseURL(url string) {
	dwr.mu.Lock()
	defer dwr.mu.Unlock()

	dwr.apiBaseURL = url

	glog.V(3).Infof("API base URL updated to: %s", url)
}
Expand Down
1 change: 1 addition & 0 deletions internal/v2/appinfo/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ func (h *Hydrator) HydrateSingleApp(ctx context.Context, userID, sourceID string
continue
}
if err := step.Execute(ctx, task); err != nil {
task.SetError(err)
failureReason := err.Error()
failureStep := step.GetStepName()
glog.Errorf("HydrateSingleApp: step %s failed for app %s(%s): %v", failureStep, appID, appName, err)
Expand Down
Loading
Loading