diff --git a/internal/app/learning/handlers/confidence.go b/internal/app/learning/handlers/confidence.go
index 197fde2..2385e14 100644
--- a/internal/app/learning/handlers/confidence.go
+++ b/internal/app/learning/handlers/confidence.go
@@ -21,9 +21,9 @@ import (
 	"strings"
 	"time"

-	"openappsec.io/smartsync-service/models"
 	"openappsec.io/errors"
 	"openappsec.io/log"
+	"openappsec.io/smartsync-service/models"
 )

 const (
@@ -38,7 +38,7 @@ type Repository interface {
 	PostFile(ctx context.Context, tenantID string, path string, compress bool, data interface{}) error
 }

-//NewConfidenceCalculator creates a new struct of confidence calculator
+// NewConfidenceCalculator creates a new confidence calculator
 func NewConfidenceCalculator(id models.SyncID, params models.ConfidenceParams, tuningDecisions models.TuningEvents, repo Repository) *ConfidenceCalculator {
 	cc := &ConfidenceCalculator{
 		params: params,
@@ -60,7 +60,7 @@ func NewConfidenceCalculator(id models.SyncID, params models.ConfidenceParams, t
 	return cc
 }

-//ConfidenceCalculator represents a confidence calculator handler
+// ConfidenceCalculator represents a confidence calculator handler
 type ConfidenceCalculator struct {
 	dependencies  map[models.SyncID]models.SyncHandler
 	filterSources *ScannersDetectorHandler
@@ -133,7 +133,7 @@ func (cd *confidenceData) GetOriginalPath(ids models.SyncID) string {
 	return fmt.Sprintf("/%v/%v/%v/processed/confidence.data", ids.TenantID, ids.AssetID, ids.Type)
 }

-//GetFilePath returns the path to the state file saved by the service
+// GetFilePath returns the path to the state file saved by the service
 func (cd *confidenceData) GetFilePath(id models.SyncID) string {
 	return fmt.Sprintf("/%v/%v/%v/remote/confidence.data", id.TenantID, id.AssetID, id.Type)
 }
@@ -171,7 +171,7 @@ func (c *ConfidenceCalculator) MergeData(data interface{}) {
 	}
 }

-//NewState returns a struct representing the state
+// NewState returns a struct representing the state
 func (c *ConfidenceCalculator) NewState() models.State {
 	return &confidenceData{}
 }
@@ -230,7 +230,7 @@ func convertToDataStruct(key string, valuesAndSources map[string]map[string]bool
 	return ret
 }

-//ProcessData gets the last state and update the state according to the collected data
+// ProcessData gets the last state and updates the state according to the collected data
 func (c *ConfidenceCalculator) ProcessData(ctx context.Context, state models.State) models.State {
 	stateStruct := state.(*confidenceData)
 	stateModel := convertStateToModel(stateStruct)
@@ -436,11 +436,63 @@ func (c *ConfidenceCalculator) calculateConfidenceDelta(
 	return delta
 }

-//GetDependencies return the handlers that this handler is dependent on
+// GetDependencies returns the handlers that this handler depends on
 func (c *ConfidenceCalculator) GetDependencies() map[models.SyncID]models.SyncHandler {
 	return c.dependencies
 }

+// ClearMergedData releases references to large in-memory merged data maps to allow the GC to reclaim memory
+// after a processing cycle finishes. Should be called once the state has been derived.
+func (c *ConfidenceCalculator) ClearMergedData() { + // Replace with fresh empty map instead of nil to avoid nil map assignment panics in future merges + c.md.log = mms{} +} + +// ProcessDataFromCentralData processes data from the central data source +func (c *ConfidenceCalculator) ProcessDataFromCentralData(ctx context.Context, state models.State, mergedData *models.CentralData) models.State { + log.WithContext(ctx).Debugf("ConfidenceCalculator.ProcessDataFromCentralData id: %+v", c.id) + c.md.log = mms{} // reset + // Use pointer-based deduplication (already done at unmarshal time) + for key, entry := range mergedData.Logger { + if _, ok := c.md.log[key]; !ok { + c.md.log[key] = map[string]map[string]bool{} + } + if c.id.Type == models.TypesConfidence { + for typeKey, sourcePtrs := range entry.Types { + if typeKey == "long_random_text" { + continue + } + if _, ok := c.md.log[key][typeKey]; !ok { + c.md.log[key][typeKey] = map[string]bool{} + } + // Dereference pointers - strings already deduplicated at unmarshal + for _, srcPtr := range sourcePtrs { + c.md.log[key][typeKey][*srcPtr] = true + } + } + } else { + for indKey, sourcePtrs := range entry.Indicators { + if _, ok := c.md.log[key][indKey]; !ok { + c.md.log[key][indKey] = map[string]bool{} + } + // Dereference pointers - strings already deduplicated at unmarshal + for _, srcPtr := range sourcePtrs { + c.md.log[key][indKey][*srcPtr] = true + } + } + } + if len(entry.TotalSources) > 0 { + allSources := map[string]bool{} + // Dereference pointers - strings already deduplicated at unmarshal + for _, srcPtr := range entry.TotalSources { + allSources[*srcPtr] = true + } + c.md.log[key][c.params.NullObject] = allSources + } + } + return c.ProcessData(ctx, state) +} + func (c *ConfidenceCalculator) isParamBenign(key string) bool { keySplit := strings.SplitN(key, "#", 2) param := key diff --git a/internal/app/learning/handlers/scanners_detector.go b/internal/app/learning/handlers/scanners_detector.go index 3829518..44ac138 100644 --- a/internal/app/learning/handlers/scanners_detector.go +++ b/internal/app/learning/handlers/scanners_detector.go @@ -78,7 +78,20 @@ func (s *ScannersDetectorHandler) SetCompressionEnabled() { // MergeData merges the data from agent into struct member func (s *ScannersDetectorHandler) MergeData(data interface{}) { monitor := data.(*scannersMonitor) - mergeAndConvertCerealToGo(monitor.Monitor, s.mergedMonitors.Monitor) + mergeAndConvertCerealToGo(monitor.Monitor, s.mergedMonitors.Monitor, false) +} + +// ClearMergedData releases references to merged data to free memory between runs. 
+func (s *ScannersDetectorHandler) ClearMergedData() { + s.mergedMonitors = scannersDetectMergedData{Monitor: map[string]map[string]map[string]bool{}} +} + +// ProcessDataFromCentralData processes data from central data +// ScannersDetector is handled as a dependency in indicators confidence, not directly from central data +func (s *ScannersDetectorHandler) ProcessDataFromCentralData(ctx context.Context, state models.State, mergedData *models.CentralData) models.State { + // ScannersDetector uses its own monitor data structure, not CentralData + // This is a pass-through to ProcessData for interface compliance + return s.ProcessData(ctx, state) } //NewState returns a struct representing the state diff --git a/internal/app/learning/handlers/trusted_sources.go b/internal/app/learning/handlers/trusted_sources.go index da4cabf..bb024f4 100644 --- a/internal/app/learning/handlers/trusted_sources.go +++ b/internal/app/learning/handlers/trusted_sources.go @@ -17,11 +17,13 @@ import ( "context" "fmt" + "openappsec.io/log" "openappsec.io/smartsync-service/models" ) -//TrustedSourcesHandler represents a trusted sources handler +// TrustedSourcesHandler represents a trusted sources handler type TrustedSourcesHandler struct { + id models.SyncID trustedLog mms } @@ -29,12 +31,12 @@ type trustedSourcesState struct { Logger []mapToMapToSet `json:"logger"` } -//GetFilePath returns the path to the state file saved by the service +// GetFilePath returns the path to the state file saved by the service func (ts *trustedSourcesState) GetFilePath(id models.SyncID) string { return fmt.Sprintf("/%v/%v/%v/remote/data.data", id.TenantID, id.AssetID, id.Type) } -//GetOriginalPath returns the path to the state file saved by the agent +// GetOriginalPath returns the path to the state file saved by the agent func (ts *trustedSourcesState) GetOriginalPath(id models.SyncID) string { return fmt.Sprintf("/%v/%v/%v/processed/data.data", id.TenantID, id.AssetID, id.Type) } @@ -43,9 +45,9 @@ func (ts *trustedSourcesState) ShouldRebase() bool { return false } -//NewTrustedSources creates new trusted sources handler -func NewTrustedSources() *TrustedSourcesHandler { - return &TrustedSourcesHandler{trustedLog: mms{}} +// NewTrustedSources creates new trusted sources handler +func NewTrustedSources(id models.SyncID) *TrustedSourcesHandler { + return &TrustedSourcesHandler{id: id, trustedLog: mms{}} } // NewDataStruct returns a struct representation of the data put by agent @@ -56,10 +58,10 @@ func (ts *TrustedSourcesHandler) NewDataStruct() interface{} { // MergeData merges the data from agent into struct member func (ts *TrustedSourcesHandler) MergeData(data interface{}) { state := data.(*trustedSourcesState) - mergeAndConvertCerealToGo(state.Logger, ts.trustedLog) + mergeAndConvertCerealToGo(state.Logger, ts.trustedLog, false) } -//NewState returns a struct representing the state +// NewState returns a struct representing the state func (ts *TrustedSourcesHandler) NewState() models.State { return &trustedSourcesState{[]mapToMapToSet{}} } @@ -69,9 +71,97 @@ func (ts *TrustedSourcesHandler) SetCompressionEnabled() { // do nothing } -//ProcessData gets the last state and update the state according to the collected data -func (ts *TrustedSourcesHandler) ProcessData(ctx context.Context, _ models.State) models.State { - state := &trustedSourcesState{[]mapToMapToSet{}} +// ClearMergedData releases references to merged trusted sources data to free memory between runs. 
+func (ts *TrustedSourcesHandler) ClearMergedData() {
+	ts.trustedLog = mms{}
+}
+
+func containsTrustedSourcePtrs(sources []*string, trustedMap map[string]bool) bool {
+	// Check if any source is trusted
+	for _, srcPtr := range sources {
+		if _, ok := trustedMap[*srcPtr]; ok {
+			return true
+		}
+	}
+	return false
+}
+
+// ProcessDataFromCentralData processes trusted sources data from the merged central data
+func (ts *TrustedSourcesHandler) ProcessDataFromCentralData(ctx context.Context, state models.State, mergedData *models.CentralData) models.State {
+	log.WithContext(ctx).Debugf("TrustedSourcesHandler.ProcessDataFromCentralData ids: %+v", ts.id)
+	ts.trustedLog = mms{} // reset
+	// Use pointer-based deduplication (already done at unmarshal time)
+	if len(mergedData.TrustedSources) == 0 {
+		// No trusted sources reported, return empty state
+		log.WithContext(ctx).Debugf("TrustedSourcesHandler.ProcessDataFromCentralData no trusted sources reported, returning empty state")
+		return ts.ProcessData(ctx, state)
+	}
+	// Create trusted sources lookup map for O(1) checks - dereference pointers
+	trustedMap := make(map[string]bool, len(mergedData.TrustedSources))
+	for _, srcPtr := range mergedData.TrustedSources {
+		trustedMap[*srcPtr] = true
+	}
+
+	for key, entry := range mergedData.Logger {
+		if !containsTrustedSourcePtrs(entry.TotalSources, trustedMap) {
+			// skip if no trusted source reported this key
+			continue
+		}
+		if ts.id.Type == models.TypesTrusted {
+			for typeKey, sourcePtrs := range entry.Types {
+				// check each source; only trusted sources are recorded
+				for _, srcPtr := range sourcePtrs {
+					src := *srcPtr
+					// Only add src if it was reported in trusted sources
+					if trustedMap[src] {
+						if _, ok := ts.trustedLog[key]; !ok {
+							ts.trustedLog[key] = map[string]map[string]bool{}
+						}
+						if _, ok := ts.trustedLog[key][typeKey]; !ok {
+							ts.trustedLog[key][typeKey] = map[string]bool{}
+						}
+						log.WithContext(ctx).Debugf("TrustedSourcesHandler.ProcessDataFromCentralData adding src: %v to key: %v typeKey: %v", src, key, typeKey)
+						ts.trustedLog[key][typeKey][src] = true
+					}
+				}
+			}
+		} else {
+			for indKey, sourcePtrs := range entry.Indicators {
+				for _, srcPtr := range sourcePtrs {
+					src := *srcPtr
+					// Only add src if it was reported in trusted sources
+					if trustedMap[src] {
+						if _, ok := ts.trustedLog[key]; !ok {
+							ts.trustedLog[key] = map[string]map[string]bool{}
+						}
+						if _, ok := ts.trustedLog[key][indKey]; !ok {
+							ts.trustedLog[key][indKey] = map[string]bool{}
+						}
+						log.WithContext(ctx).Debugf("TrustedSourcesHandler.ProcessDataFromCentralData adding src: %v to key: %v indKey: %v", src, key, indKey)
+						ts.trustedLog[key][indKey][src] = true
+					}
+				}
+			}
+		}
+	}
+	// Return processed state
+	return ts.ProcessData(ctx, state)
+}
+
+// ProcessData gets the last state and updates the state according to the collected data
+func (ts *TrustedSourcesHandler) ProcessData(ctx context.Context, stateIfs models.State) models.State {
+	log.WithContext(ctx).Debugf("TrustedSourcesHandler.ProcessData ids: %+v", ts.id)
+	// Handle nil state - treat it as empty state
+	var existingState *trustedSourcesState
+	if stateIfs == nil {
+		log.WithContext(ctx).Debugf("TrustedSourcesHandler.ProcessData received nil state, using empty state")
+		existingState = &trustedSourcesState{[]mapToMapToSet{}}
+	} else {
+		existingState = stateIfs.(*trustedSourcesState)
+	}
+	// merge the existing state into the trusted log
+	mergeAndConvertCerealToGo(existingState.Logger, ts.trustedLog, false)
+	newState := &trustedSourcesState{[]mapToMapToSet{}}
 	for key, keyData := range ts.trustedLog {
 		keyObj := mapToMapToSet{Key: key}
 		for value, sources := range keyData {
@@ -81,12 +171,12 @@
 			}
 			keyObj.Value = append(keyObj.Value, valueObj)
 		}
-		state.Logger = append(state.Logger, keyObj)
+		newState.Logger = append(newState.Logger, keyObj)
 	}
-	return state
+	return newState
 }

-//GetDependencies return the handlers that this handler is dependent on
+// GetDependencies returns the handlers that this handler depends on
 func (ts *TrustedSourcesHandler) GetDependencies() map[models.SyncID]models.SyncHandler {
 	return nil
 }
diff --git a/internal/app/learning/handlers/utils.go b/internal/app/learning/handlers/utils.go
index 63f9e58..824b86c 100644
--- a/internal/app/learning/handlers/utils.go
+++ b/internal/app/learning/handlers/utils.go
@@ -13,21 +13,46 @@
 package handlers

+import (
+	"strings"
+)
+
+// Configuration defines the configuration getter interface
+type Configuration interface {
+	GetString(key string) (string, error)
+}
+
 // mms type defines a map to map to set.
 type mms map[string]map[string]map[string]bool

-func mergeAndConvertCerealToGo(src []mapToMapToSet, dst mms) {
+func mergeAndConvertCerealToGo(src []mapToMapToSet, dst mms, shouldStrip bool) {
 	for _, srcData := range src {
 		if _, ok := dst[srcData.Key]; !ok {
 			dst[srcData.Key] = map[string]map[string]bool{}
 		}
 		for _, keyData := range srcData.Value {
-			if _, ok := dst[srcData.Key][keyData.Key]; !ok {
-				dst[srcData.Key][keyData.Key] = map[string]bool{}
+			key := keyData.Key
+			if shouldStrip {
+				key = stripSuffix(key)
+			}
+			if _, ok := dst[srcData.Key][key]; !ok {
+				dst[srcData.Key][key] = map[string]bool{}
 			}
 			for _, indicator := range keyData.Value {
-				dst[srcData.Key][keyData.Key][indicator] = true
+				dst[srcData.Key][key][indicator] = true
 			}
 		}
 	}
 }
+
+// stripSuffix recursively trims known encoding suffixes from a key until none remain
+func stripSuffix(key string) string {
+	suffixes := []string{".pipe", ".sem", ".amp", ".comma", ".asterisk"}
+	for _, suffix := range suffixes {
+		stripped := strings.TrimSuffix(key, suffix)
+		if stripped != key {
+			return stripSuffix(stripped)
+		}
+	}
+	return key
+}
diff --git a/internal/app/learning/operations.go b/internal/app/learning/operations.go
index c14d297..8080e9a 100644
--- a/internal/app/learning/operations.go
+++ b/internal/app/learning/operations.go
@@ -42,7 +42,7 @@ func (lc *LearnCore) unlockOnPanic(ctx context.Context, lockKey string) {
 	}
 }

-//ProcessSyncRequest process a request to sync
+// ProcessSyncRequest processes a request to sync
 func (lc *LearnCore) ProcessSyncRequest(ctx context.Context, ids models.SyncID) error {
 	lockKey := genKey(ids)
 	hasLock := lc.lock.Lock(ctx, lockKey)
@@ -52,16 +52,25 @@ func (lc *LearnCore) ProcessSyncRequest(ctx context.Context, ids models.SyncID)
 	}
 	log.WithContext(ctx).Infof("handling event: %v", ids)
 	defer lc.unlockOnPanic(ctx, lockKey)
-	handlers, err := lc.handlersFactory(ctx, ids)
+	if ids.Type == models.CentralizedData {
+		log.WithContext(ctx).Infof("handling centralized data for all handlers for event: %+v", ids)
+		err := lc.handleCentralDataType(ctx, ids)
+		if err != nil {
+			err = errors.Wrap(err, "failed handling centralized data")
+			lc.lock.Unlock(ctx, lockKey)
+		}
+		return err
+	}
+	syncHandlers, err := lc.handlersFactory(ctx, ids)
 	if err != nil {
 		log.WithContext(ctx).Warnf("failed to get handlers for: %v, err: %v", ids, err)
 		return nil
 	}
-	if len(handlers) == 0 {
+	if len(syncHandlers) == 0 {
 		log.WithContext(ctx).Debugf("no handlers for event: %v", ids)
 		return nil
 	}
-	err = lc.syncWorker(ctx, handlers)
+	err = lc.syncWorker(ctx, syncHandlers)
 	if err != nil {
 		err = errors.Wrap(err, "failed running sync worker")
 		lc.lock.Unlock(ctx, lockKey)
@@ -97,7 +106,7 @@ func (lc *LearnCore) handlersFactory(ctx context.Context, id models.SyncID) (map
 		}
 		ret[id] = handlers.NewConfidenceCalculator(id, params, lc.getTuningDecisions(ctx, id), lc.repo)
 	case models.IndicatorsTrusted:
-		ret[id] = handlers.NewTrustedSources()
+		ret[id] = handlers.NewTrustedSources(id)
 	case models.ScannersDetector:
 		// do nothing - is handled as a dependency in indicators confidence
 	case models.TypesConfidence:
@@ -110,18 +119,141 @@ func (lc *LearnCore) handlersFactory(ctx context.Context, id models.SyncID) (map
 		}
 		ret[id] = handlers.NewConfidenceCalculator(id, params, lc.getTuningDecisions(ctx, id), lc.repo)
 	case models.TypesTrusted:
-		ret[id] = handlers.NewTrustedSources()
+		ret[id] = handlers.NewTrustedSources(id)
+	case models.CentralizedData:
+		// Centralized data is handled separately in ProcessSyncRequest
+		// do nothing here
 	default:
 		return nil, errors.Errorf("type %v is unrecognized", id.Type)
 	}
 	return ret, nil
 }

-func (lc *LearnCore) syncWorker(ctx context.Context, handlers map[models.SyncID]models.SyncHandler) error {
-	if len(handlers) == 0 {
+func (lc *LearnCore) handleCentralDataType(ctx context.Context, ids models.SyncID) error {
+	log.WithContext(ctx).Infof("handleCentralDataType id: %+v", ids)
+	// create central data collector
+	centralDataCollector := handlers.NewCentralDataCollector(ids, lc.repo)
+	isCompressEnable, err := lc.mergeAgentFiles(ctx, ids, centralDataCollector)
+	if err != nil {
+		return errors.Wrapf(err, "failed merging central agent data for %s", ids.TenantID)
+	}
+
+	allHandlers := centralDataCollector.GetAllHandlers(ctx, ids, lc.getTuningDecisions(ctx, ids))
+	if len(allHandlers) == 0 {
+		log.WithContext(ctx).Debugf("no handlers for centralized data for event: %v", ids)
+		return nil
+	}
+	for idsH, handler := range allHandlers {
+		dependencyHandlers := handler.GetDependencies()
+		if len(dependencyHandlers) > 0 {
+			log.WithContext(ctx).Infof("handling dependencies of: %v", idsH)
+			for depIds, depHandler := range dependencyHandlers {
+				err := lc.invokeHandler(ctx, depHandler, isCompressEnable, depIds, centralDataCollector, ids)
+				if err != nil {
+					return err
+				}
+			}
+		}
+		err := lc.invokeHandler(ctx, handler, isCompressEnable, idsH, centralDataCollector, ids)
+		if err != nil {
+			return err
+		}
+	}
+	// Clear references to allow GC to reclaim memory
+	centralDataCollector.ClearMergedData()
+	return nil
+}
+
+func (lc *LearnCore) invokeHandler(
+	ctx context.Context,
+	handler models.SyncHandler,
+	isCompressEnable bool,
+	idsH models.SyncID,
+	centralDataCollector *handlers.CentralDataCollector,
+	ids models.SyncID) error {
+
+	if isCompressEnable {
+		handler.SetCompressionEnabled()
+	}
+	state, statePath, err := lc.getState(ctx, idsH, handler)
+	if err != nil {
+		return errors.Wrapf(err, "failed to get state for handler: %v", idsH)
+	}
+	// process data from central data collector
+	log.WithContext(ctx).Infof("processing centralized data for %+v", idsH)
+
+	// derive the new state from the merged central data
+	state = handler.ProcessDataFromCentralData(ctx, state, centralDataCollector.GetData())
+	log.WithContext(ctx).Infof("posting new state for %+v to path %s", idsH, statePath)
+	// post new state
+	err = lc.repo.PostFile(ctx, ids.TenantID, statePath, isCompressEnable, state)
+	if err != nil {
+		return errors.Wrapf(err, "failed to post new state to: %v", statePath)
+	}
+	return nil
+}
+
+func (lc *LearnCore) getState(ctx context.Context, ids models.SyncID, handler models.SyncHandler) (models.State, string, error) { + log.WithContext(ctx).Infof("getting state for: %+v", ids) + state := handler.NewState() + statePath := state.GetFilePath(ids) + _, err := lc.repo.GetFile(ctx, ids.TenantID, statePath, state) + if err != nil { + if !errors.IsClass(err, errors.ClassNotFound) { + return nil, "", errors.Wrapf(err, "failed to get state from: %v", statePath) + } + return state, statePath, nil + } + log.WithContext(ctx).Debugf("got state: %.512v", fmt.Sprintf("%+v", state)) + if state.ShouldRebase() { + log.WithContext(ctx).Infof("Rebasing state for %+v", ids) + // if should rebase is true then original path must not be empty + origStatePath := state.GetOriginalPath(ids) + rebasedState := handler.NewState() + _, err = lc.repo.GetFile(ctx, ids.TenantID, origStatePath, rebasedState) + if err != nil { + log.WithContext(ctx).Warnf("Failed to rebase state") + } else { + state = rebasedState + } + } + return state, statePath, nil +} + +func (lc *LearnCore) mergeAgentFiles( + ctx context.Context, + ids models.SyncID, + dataCollector models.DataCollector, +) (bool, error) { + isCompressEnable := false + files, err := lc.repo.GetFilesList(ctx, ids) + if err != nil { + return isCompressEnable, errors.Wrap(err, "failed to get files list") + } + log.WithContext(ctx).Infof("merging files: %v", files) + if len(files) == 0 { + log.WithContext(ctx).Infof("no files to merge for: %v", ids) + return isCompressEnable, errors.Errorf("no files to merge for: %v", ids).SetClass(errors.ClassNotFound) + } + for _, file := range files { + data := dataCollector.NewDataStruct() + fileIsCompressed, err := lc.repo.GetFile(ctx, ids.TenantID, file, data) + if err != nil { + return isCompressEnable, errors.Wrapf(err, "failed to get file: %v", file) + } + if fileIsCompressed { + isCompressEnable = true + } + dataCollector.MergeData(data) + } + return isCompressEnable, nil +} + +func (lc *LearnCore) syncWorker(ctx context.Context, syncHandlers map[models.SyncID]models.SyncHandler) error { + if len(syncHandlers) == 0 { return errors.New("got empty handlers list") } - for ids, handler := range handlers { + for ids, handler := range syncHandlers { dependenciesHandlers := handler.GetDependencies() if len(dependenciesHandlers) > 0 { log.WithContext(ctx).Infof("handle dependency of: %v", ids) diff --git a/models/domain.go b/models/domain.go index ae10644..c12137d 100644 --- a/models/domain.go +++ b/models/domain.go @@ -27,12 +27,20 @@ type State interface { GetOriginalPath(ids SyncID) string } -//SyncHandler defines a handler for sync per type -type SyncHandler interface { +// DataCollector defines the interface for data collection per type +type DataCollector interface { NewDataStruct() interface{} MergeData(data interface{}) + ClearMergedData() +} + +// SyncHandler defines a handler for sync per type +type SyncHandler interface { + DataCollector + NewState() State ProcessData(ctx context.Context, state State) State + ProcessDataFromCentralData(ctx context.Context, state State, mergedData *CentralData) State GetDependencies() map[SyncID]SyncHandler SetCompressionEnabled() } diff --git a/models/repository.go b/models/repository.go index a69cb3b..d380906 100644 --- a/models/repository.go +++ b/models/repository.go @@ -13,6 +13,10 @@ package models +import ( + "encoding/json" +) + // Values list of values type Values []string @@ -56,8 +60,18 @@ const ( IndicatorsTrusted SyncType = "Indicators/Trust" TypesConfidence SyncType = 
"Type/Confidence" TypesTrusted SyncType = "Type/Trust" + CentralizedData SyncType = "CentralizedData" ) +// SyncTypes is an iterable list of known sync types for centralized data processing +var SyncTypes = []SyncType{ + ScannersDetector, + IndicatorsConfidence, + IndicatorsTrusted, + TypesConfidence, + TypesTrusted, +} + // SyncID contain all the data that defines the sync operation type SyncID struct { TenantID string @@ -72,3 +86,112 @@ type SyncLearnNotificationConsumers struct { Type SyncType `json:"type"` WindowID string `json:"windowId"` } + +// CentralData is the struct for centralized data collection, matching the expected JSON +// Uses []*string for pointer-based deduplication +type CentralData struct { + TrustedSources []*string `json:"trustedSources"` + Logger map[string]LoggerEntry `json:"logger"` +} + +// CentralDataWrapper is a wrapper for CentralData to match the JSON structure +type CentralDataWrapper struct { + Data *CentralData `json:"unifiedIndicators"` +} + +// UnmarshalJSON customizes the JSON unmarshalling for CentralData +// Deduplicates trusted sources using pointers at unmarshal time +func (c *CentralData) UnmarshalJSON(data []byte) error { + type rawCentralData struct { + Logger map[string]LoggerEntry `json:"logger"` + TrustedSources []string `json:"trustedSources"` + } + var raw rawCentralData + if err := json.Unmarshal(data, &raw); err != nil { + return err + } + + // String pool for deduplication of trusted sources + stringPool := make(map[string]*string) + + getOrCreate := func(s string) *string { + if ptr, ok := stringPool[s]; ok { + return ptr + } + str := s + stringPool[str] = &str + return stringPool[str] + } + + // Deduplicate TrustedSources + c.TrustedSources = make([]*string, 0, len(raw.TrustedSources)) + for _, src := range raw.TrustedSources { + c.TrustedSources = append(c.TrustedSources, getOrCreate(src)) + } + + // Logger entries already unmarshaled (each entry deduplicates internally) + c.Logger = raw.Logger + return nil +} + +// LoggerEntry represents a single entry in the logger with total sources, indicators, and types +// Uses []*string for pointer-based string deduplication at unmarshal time +type LoggerEntry struct { + TotalSources []*string `json:"totalSources"` + Indicators map[string][]*string `json:"indicators"` + Types map[string][]*string `json:"types"` +} + +// UnmarshalJSON customizes the JSON unmarshalling for LoggerEntry +// It deduplicates strings at unmarshal time using pointers to reduce memory usage +func (l *LoggerEntry) UnmarshalJSON(data []byte) error { + type rawLoggerEntry struct { + TotalSources []string `json:"totalSources"` + Indicators map[string][]string `json:"indicators"` + Types map[string][]string `json:"types"` + } + var raw rawLoggerEntry + if err := json.Unmarshal(data, &raw); err != nil { + return err + } + + // String pool for deduplication within this logger entry + stringPool := make(map[string]*string) + + getOrCreate := func(s string) *string { + if ptr, ok := stringPool[s]; ok { + return ptr + } + // Create a new string and store pointer in pool + str := s + stringPool[str] = &str + return stringPool[str] + } + + // Deduplicate TotalSources + l.TotalSources = make([]*string, 0, len(raw.TotalSources)) + for _, src := range raw.TotalSources { + l.TotalSources = append(l.TotalSources, getOrCreate(src)) + } + + // Deduplicate Indicators + l.Indicators = make(map[string][]*string, len(raw.Indicators)) + for key, arr := range raw.Indicators { + ptrSlice := make([]*string, 0, len(arr)) + for _, v := range arr { 
+ ptrSlice = append(ptrSlice, getOrCreate(v)) + } + l.Indicators[key] = ptrSlice + } + + // Deduplicate Types + l.Types = make(map[string][]*string, len(raw.Types)) + for key, arr := range raw.Types { + ptrSlice := make([]*string, 0, len(arr)) + for _, v := range arr { + ptrSlice = append(ptrSlice, getOrCreate(v)) + } + l.Types[key] = ptrSlice + } + return nil +}
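
Both custom UnmarshalJSON implementations above lean on the same string-pool (interning) idiom: repeated source strings are allocated once and shared by pointer. A minimal standalone sketch of just that idiom, not part of the patch (the sample address is made up):

package main

import "fmt"

func main() {
	// String pool, mirroring the getOrCreate closures in
	// CentralData.UnmarshalJSON and LoggerEntry.UnmarshalJSON above.
	stringPool := make(map[string]*string)
	getOrCreate := func(s string) *string {
		if ptr, ok := stringPool[s]; ok {
			return ptr
		}
		str := s
		stringPool[str] = &str
		return stringPool[str]
	}

	a := getOrCreate("10.0.0.1") // illustrative value, not from the source
	b := getOrCreate("10.0.0.1")
	fmt.Println(a == b) // true: repeated sources share one *string, so one copy is kept
}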
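A sketch of how a centralized data payload would decode through CentralDataWrapper, based on the json tags above. The payload shape is inferred from those tags; the key "some-key" and the addresses are illustrative, and the snippet assumes it is built inside this repo so the models import resolves:

package main

import (
	"encoding/json"
	"fmt"

	"openappsec.io/smartsync-service/models"
)

func main() {
	// Illustrative payload; field names follow the struct tags in models/repository.go.
	raw := []byte(`{"unifiedIndicators":{
		"trustedSources":["10.0.0.1"],
		"logger":{
			"some-key":{
				"totalSources":["10.0.0.1","10.0.0.2"],
				"indicators":{"probing":["10.0.0.1"]},
				"types":{}
			}
		}
	}}`)
	var wrapper models.CentralDataWrapper
	if err := json.Unmarshal(raw, &wrapper); err != nil {
		panic(err)
	}
	entry := wrapper.Data.Logger["some-key"]
	fmt.Println(*wrapper.Data.TrustedSources[0]) // 10.0.0.1
	fmt.Println(len(entry.TotalSources))         // 2
}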
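The recursive suffix stripping added in utils.go can be sanity-checked in isolation. A sketch with illustrative keys (stripSuffix is copied from the patch; stacked known suffixes are removed one at a time, unknown suffixes are kept):

package main

import (
	"fmt"
	"strings"
)

// stripSuffix is copied from internal/app/learning/handlers/utils.go above.
func stripSuffix(key string) string {
	suffixes := []string{".pipe", ".sem", ".amp", ".comma", ".asterisk"}
	for _, suffix := range suffixes {
		stripped := strings.TrimSuffix(key, suffix)
		if stripped != key {
			return stripSuffix(stripped)
		}
	}
	return key
}

func main() {
	// Keys below are illustrative, not taken from the service.
	fmt.Println(stripSuffix("user-agent.pipe"))     // user-agent
	fmt.Println(stripSuffix("user-agent.sem.pipe")) // user-agent (".pipe" stripped first, then ".sem")
	fmt.Println(stripSuffix("user-agent.txt"))      // user-agent.txt (unknown suffix kept)
}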