diff --git a/adapter/controller_decorator.go b/adapter/controller_decorator.go index 1640c3c..8541e39 100644 --- a/adapter/controller_decorator.go +++ b/adapter/controller_decorator.go @@ -1,9 +1,9 @@ package adapter import ( + "context" "encoding/json" "fmt" - "log/slog" "sort" "strconv" "strings" @@ -59,16 +59,16 @@ func (c *controllerFunction) Decorate(db *db.DBSyncClient, _ *easyjson.JSON) eas } children := getChildrenUUIDSByLinkTypeRemote(db, c.id, lt) - return easyjson.JSONFromArray(children) + return easyjson.NewJSON(children) case "getFromJPGQL": query := "" if len(c.args) > 0 { query = c.args[0] } if uuids, err := db.Query.JPGQLCtraQuery(c.id, query); err == nil { - return easyjson.JSONFromArray(uuids) + return easyjson.NewJSON(uuids) } - return easyjson.JSONFromArray([]string{}) + return easyjson.NewJSON([]string{}) case "getFromFPLInBase64": query := "" if len(c.args) > 0 { @@ -96,15 +96,15 @@ func (c *controllerFunction) Decorate(db *db.DBSyncClient, _ *easyjson.JSON) eas } } // -------------------------------------------------------------------------- - return easyjson.JSONFromArray(uuids) + return easyjson.NewJSON(uuids) } - return easyjson.JSONFromArray([]string{}) + return easyjson.NewJSON([]string{}) case "getInOutLinkTypes": out := getInOutLinkTypes(db, c.id) - return easyjson.JSONFromArray(out) + return easyjson.NewJSON(out) case "getOutLinkTypes": out := getOutLinkTypes(db, c.id) - return easyjson.JSONFromArray(out) + return easyjson.NewJSON(out) case "getLinksByType": if len(c.args) != 1 { return easyjson.NewJSON("invalid arguments") @@ -153,7 +153,7 @@ func parseDecorators(objectID string, payload *easyjson.JSON) map[string]control case _FUNCTION: f, args, err := extractFunctionAndArgs(value) if err != nil { - slog.Warn(err.Error()) + logger.GetLogger().Warn(context.TODO(), err.Error()) continue } @@ -163,7 +163,7 @@ func parseDecorators(objectID string, payload *easyjson.JSON) map[string]control args: args, } default: - slog.Warn("parse decorator: unknown decorator", "decorator", decorator) + logger.GetLogger().Warnf(context.TODO(), "parse decorator: unknown decorator=%s", decorator) } } @@ -239,7 +239,7 @@ func parseArguments(s string) ([]string, error) { return nil, fmt.Errorf("unbalanced parentheses in arguments") } - // Добавляем последний аргумент + // Add last argument if currentArg.Len() > 0 { args = append(args, strings.TrimSpace(currentArg.String())) } @@ -255,7 +255,7 @@ func getChildrenLinkDataRemote(db *db.DBSyncClient, id, filterLinkType string, f data, err := db.Graph.VertexRead(id, true) if err != nil { - logger.Logln(logger.ErrorLevel, err.Error()) + logger.GetLogger().Error(context.TODO(), err.Error()) return easyjson.NewJSONObject() } @@ -292,7 +292,7 @@ func getChildrenUUIDSByLinkTypeRemote(db *db.DBSyncClient, id, filterLinkType st data, err := db.Graph.VertexRead(id, true) if err != nil { - logger.Logln(logger.ErrorLevel, err.Error()) + logger.GetLogger().Error(context.TODO(), err.Error()) return result } diff --git a/adapter/decorators/in_out_link_types.go b/adapter/decorators/in_out_link_types.go index 1f24442..f31561d 100644 --- a/adapter/decorators/in_out_link_types.go +++ b/adapter/decorators/in_out_link_types.go @@ -1,6 +1,7 @@ package decorators import ( + "context" "strings" "github.com/foliagecp/easyjson" @@ -27,7 +28,7 @@ func inOutLinkTypes(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { data, err := db.Graph.VertexRead(ctx.Self.ID, true) if err != nil { - logger.Logln(logger.ErrorLevel, err.Error()) + logger.GetLogger().Error(context.TODO(), err.Error()) return } @@ -76,8 +77,8 @@ func inOutLinkTypes(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { } resp := easyjson.NewJSONObject() - resp.SetByPath("in", easyjson.JSONFromArray(in)) - resp.SetByPath("out", easyjson.JSONFromArray(out)) + resp.SetByPath("in", easyjson.NewJSON(in)) + resp.SetByPath("out", easyjson.NewJSON(out)) okResponse(ctx, resp) } diff --git a/adapter/decorators/links_by_type.go b/adapter/decorators/links_by_type.go index b30e1d5..bec6dd3 100644 --- a/adapter/decorators/links_by_type.go +++ b/adapter/decorators/links_by_type.go @@ -1,6 +1,8 @@ package decorators import ( + "context" + "github.com/foliagecp/sdk/statefun/logger" sf "github.com/foliagecp/sdk/statefun/plugins" "github.com/foliagecp/ui-app-lib/internal/common" @@ -39,7 +41,7 @@ func linksByType(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { db := common.MustDBClient(ctx.Request) data, err := db.Graph.VertexRead(ctx.Self.ID, true) if err != nil { - logger.Logln(logger.ErrorLevel, err.Error()) + logger.GetLogger().Error(context.TODO(), err.Error()) return } diff --git a/adapter/decorators/types_navigation.go b/adapter/decorators/types_navigation.go index 2eb1e73..78def52 100644 --- a/adapter/decorators/types_navigation.go +++ b/adapter/decorators/types_navigation.go @@ -1,11 +1,12 @@ package decorators import ( + "context" "encoding/json" - "log/slog" "sort" "github.com/foliagecp/sdk/embedded/graph/crud" + "github.com/foliagecp/sdk/statefun/logger" sf "github.com/foliagecp/sdk/statefun/plugins" "github.com/foliagecp/ui-app-lib/internal/common" ) @@ -133,7 +134,7 @@ func typesNavigation(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { if ok := typeBody.PathExists("object_ids"); ok { var typeObjects []string if err := json.Unmarshal(typeBody.GetByPath("object_ids").ToBytes(), &typeObjects); err != nil { - slog.Warn(err.Error()) + logger.GetLogger().Warn(context.TODO(), err.Error()) } for _, v := range typeObjects { @@ -239,7 +240,7 @@ func inOutTypes(ctx *sf.StatefunContextProcessor, id string) []string { link, err := db.CMDB.TypesLinkRead(objectID, id) if err != nil { - slog.Error(err.Error()) + logger.GetLogger().Error(context.TODO(), err.Error()) continue } diff --git a/adapter/statefun.go b/adapter/statefun.go index b180ad3..7c0f2f5 100644 --- a/adapter/statefun.go +++ b/adapter/statefun.go @@ -3,7 +3,6 @@ package adapter import ( "context" "fmt" - "log/slog" "strings" "sync" "time" @@ -12,6 +11,7 @@ import ( "github.com/foliagecp/sdk/clients/go/db" "github.com/foliagecp/sdk/embedded/graph/crud" "github.com/foliagecp/sdk/statefun" + "github.com/foliagecp/sdk/statefun/logger" sfplugins "github.com/foliagecp/sdk/statefun/plugins" "github.com/foliagecp/sdk/statefun/system" "github.com/foliagecp/ui-app-lib/adapter/decorators" @@ -44,7 +44,7 @@ func controllerObjectOnTriggerWindowUpdater(runtime *statefun.Runtime) { for objectUUI, updatePayload := range controllerObjectOnTriggerWindowUpdaterTasks { err := runtime.Signal(sfplugins.AutoSignalSelect, inStatefun.CONTROLLER_OBJECT_UPDATE, objectUUI, updatePayload, nil) if err != nil { - slog.Warn(err.Error()) + logger.GetLogger().Warn(context.TODO(), err.Error()) } } clear(controllerObjectOnTriggerWindowUpdaterTasks) @@ -56,11 +56,11 @@ func controllerObjectOnTriggerWindowUpdater(runtime *statefun.Runtime) { } func RegisterFunctions(runtime *statefun.Runtime) { - statefun.NewFunctionType(runtime, inStatefun.CONTROLLER_START, StartController, *statefun.NewFunctionTypeConfig()) - statefun.NewFunctionType(runtime, inStatefun.CONTROLLER_CLEAR, ClearController, *statefun.NewFunctionTypeConfig()) - statefun.NewFunctionType(runtime, inStatefun.CONTROLLER_OBJECT_UPDATE, UpdateControllerObject, *statefun.NewFunctionTypeConfig()) - statefun.NewFunctionType(runtime, inStatefun.CONTROLLER_OBJECT_TRIGGER, ControllerObjectTrigger, *statefun.NewFunctionTypeConfig()) - statefun.NewFunctionType(runtime, inStatefun.CONTROLLER_CONSTRUCT, ControllerConstruct, *statefun.NewFunctionTypeConfig().SetAllowedRequestProviders(sfplugins.AutoRequestSelect)) + statefun.NewFunctionType(runtime, inStatefun.CONTROLLER_START, StartController, *statefun.NewFunctionTypeConfig().SetIdChannelSize(100)) + statefun.NewFunctionType(runtime, inStatefun.CONTROLLER_CLEAR, ClearController, *statefun.NewFunctionTypeConfig().SetIdChannelSize(100)) + statefun.NewFunctionType(runtime, inStatefun.CONTROLLER_OBJECT_UPDATE, UpdateControllerObject, *statefun.NewFunctionTypeConfig().SetIdChannelSize(100)) + statefun.NewFunctionType(runtime, inStatefun.CONTROLLER_OBJECT_TRIGGER, ControllerObjectTrigger, *statefun.NewFunctionTypeConfig().SetIdChannelSize(100)) + statefun.NewFunctionType(runtime, inStatefun.CONTROLLER_CONSTRUCT, ControllerConstruct, *statefun.NewFunctionTypeConfig().SetAllowedRequestProviders(sfplugins.AutoRequestSelect).SetIdChannelSize(100)) decorators.Register(runtime) @@ -152,14 +152,14 @@ func StartController(_ sfplugins.StatefunExecutor, ctx *sfplugins.StatefunContex if err := cmdb.ObjectsLinkCreate(self.ID, caller.ID, caller.ID, []string{}); err != nil { if !common.ErrorAlreadyExists(err) { - slog.Warn("failed to create objects link between controller and session", "err", err.Error()) + logger.GetLogger().Warnf(context.TODO(), "failed to create objects link between controller and session, err=%v", err.Error()) return } } if err := cmdb.ObjectsLinkCreate(caller.ID, self.ID, self.ID, []string{}); err != nil { if !common.ErrorAlreadyExists(err) { - slog.Warn("failed to create objects link between session and controller", "err", err.Error()) + logger.GetLogger().Warnf(context.TODO(), "failed to create objects link between session and controller, err=%v", err.Error()) return } } @@ -174,14 +174,14 @@ func StartController(_ sfplugins.StatefunExecutor, ctx *sfplugins.StatefunContex objectType, err := common.ObjectType(cmdb, objectUUID) if err != nil { if !common.ErrorAlreadyExists(err) { - slog.Warn("failed to find uuid type", "err", err.Error()) + logger.GetLogger().Warnf(context.TODO(), "failed to find uuid type, err=%v", err.Error()) return } } if err := cmdb.TypesLinkCreate(inStatefun.CONTROLLER_OBJECT_TYPE, objectType, inStatefun.CONTROLLER_SUBJECT_TYPE, []string{}); err != nil { if !common.ErrorAlreadyExists(err) { - slog.Warn("failed to create types link between controller object and uuid", "err", err.Error()) + logger.GetLogger().Warn(context.TODO(), "failed to create types link between controller object and uuid", err.Error()) return } } @@ -347,7 +347,7 @@ func StartController(_ sfplugins.StatefunExecutor, ctx *sfplugins.StatefunContex // if it's different send update to controller func UpdateControllerObject(_ sfplugins.StatefunExecutor, ctx *sfplugins.StatefunContextProcessor) { controllerObjectID := ctx.Self.ID - slog.Info("Update controller object", "id", controllerObjectID) + logger.GetLogger().Infof(context.TODO(), "Update controller object, id=%s", controllerObjectID) var body *easyjson.JSON var parentControllerID string @@ -371,21 +371,21 @@ func UpdateControllerObject(_ sfplugins.StatefunExecutor, ctx *sfplugins.Statefu if err := cmdb.ObjectCreate(controllerObjectID, inStatefun.CONTROLLER_OBJECT_TYPE, controllerObjectBody); err != nil { if !common.ErrorAlreadyExists(err) { - slog.Warn("failed to create controller object", "err", err.Error()) + logger.GetLogger().Warnf(context.TODO(), "failed to create controller object, err=%s", err.Error()) return } } if err := cmdb.ObjectsLinkCreate(controllerObjectID, realObjectID, "uiapplib_"+realObjectID, []string{}); err != nil { if !common.ErrorAlreadyExists(err) { - slog.Warn("failed to create objects link between controller object and uuid", "err", err.Error()) + logger.GetLogger().Warnf(context.TODO(), "failed to create objects link between controller object and uuid, err=%s", err.Error()) return } } if err := cmdb.ObjectsLinkCreate(parentControllerID, controllerObjectID, controllerObjectID, []string{}); err != nil { if !common.ErrorAlreadyExists(err) { - slog.Warn("failed to create objects link between controller and controller object", "err", err.Error()) + logger.GetLogger().Warnf(context.TODO(), "failed to create objects link between controller and controller object, err=%s", err.Error()) return } } @@ -395,7 +395,7 @@ func UpdateControllerObject(_ sfplugins.StatefunExecutor, ctx *sfplugins.Statefu } else { parentUUID, ok := body.GetByPath("parent").AsString() if !ok { - slog.Warn("empty controller id") + logger.GetLogger().Warn(context.TODO(), "empty controller id") return } parentControllerID = parentUUID @@ -405,7 +405,7 @@ func UpdateControllerObject(_ sfplugins.StatefunExecutor, ctx *sfplugins.Statefu controllerBody, err := ctx.Domain.Cache().GetValueJSON(parentControllerID) if err != nil { - slog.Error(err.Error()) + logger.GetLogger().Error(context.TODO(), err.Error()) return } @@ -462,16 +462,16 @@ func UpdateControllerObject(_ sfplugins.StatefunExecutor, ctx *sfplugins.Statefu subscribers := getChildrenUUIDSByLinkTypeLocal(ctx, parentControllerID, inStatefun.SUBSCRIBER_TYPE) if len(forceUpdateSessionId) == 0 { - slog.Info("Send update to subscribers", "subscribers", subscribers) + logger.GetLogger().Infof(context.TODO(), "Send update to subscribers=%v", subscribers) for _, subID := range subscribers { if err := egress.SendToSessionEgress(ctx, subID, &updateReply); err != nil { - slog.Warn(err.Error()) + logger.GetLogger().Warn(context.TODO(), err.Error()) } } } else { - slog.Info("Send update to force update requested session only", "subscribers", subscribers) + logger.GetLogger().Infof(context.TODO(), "Send update to force update requested session only, subscribers=%v", subscribers) if err := egress.SendToSessionEgress(ctx, forceUpdateSessionId, &updateReply); err != nil { - slog.Warn(err.Error()) + logger.GetLogger().Warn(context.TODO(), err.Error()) } } // ------------------------------------------------------------------------ diff --git a/adapter/statefun_test.go b/adapter/statefun_test.go index b6b16b9..f3e4a3a 100644 --- a/adapter/statefun_test.go +++ b/adapter/statefun_test.go @@ -82,7 +82,7 @@ func (s *adapterTestSuite) Test_StartController_Correct() { payload := easyjson.NewJSONObject() payload.SetByPath("name", easyjson.NewJSON(controllerName)) payload.SetByPath("declaration", controllerDeclaration) - payload.SetByPath("uuids", easyjson.JSONFromArray(uuids)) + payload.SetByPath("uuids", easyjson.NewJSON(uuids)) err = s.Signal(sfplugins.AutoSignalSelect, typename, controllerID, &payload, nil) s.Require().NoError(err) diff --git a/go.mod b/go.mod index 7a45e34..b8b7664 100644 --- a/go.mod +++ b/go.mod @@ -4,15 +4,17 @@ go 1.21.1 require ( github.com/foliagecp/easyjson v0.1.7 - github.com/foliagecp/sdk v0.1.7-cachejson-hotfix-2 + github.com/foliagecp/sdk v0.1.7-dev08072025.0.20251024102811-464daa79e017 github.com/google/uuid v1.6.0 github.com/stretchr/testify v1.11.1 ) require ( + github.com/PaesslerAG/gval v1.2.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/emicklei/dot v1.6.1 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect github.com/klauspost/compress v1.17.7 // indirect @@ -28,6 +30,7 @@ require ( github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.1 // indirect + github.com/shopspring/decimal v1.3.1 // indirect golang.org/x/crypto v0.22.0 // indirect golang.org/x/sys v0.21.0 // indirect golang.org/x/time v0.5.0 // indirect diff --git a/internal/cache/cache.go b/internal/cache/cache.go new file mode 100644 index 0000000..b62de3b --- /dev/null +++ b/internal/cache/cache.go @@ -0,0 +1,255 @@ +package cache + +import ( + "context" + "fmt" + "hash/fnv" + "strconv" + "sync" + "time" + + "github.com/foliagecp/easyjson" + lg "github.com/foliagecp/sdk/statefun/logger" + sf "github.com/foliagecp/sdk/statefun/plugins" + "github.com/foliagecp/sdk/statefun/system" + "github.com/nats-io/nats.go" +) + +var ( + uiCache *Cache +) + +type Cache struct { + cache sync.Map // hash -> CacheEntry + pending sync.Map // hash -> PendingEntry (temporary) + correlator map[string]CorrelatorEntry // traceID -> hash, controllerOID (temporary) + correlatorMu sync.RWMutex // correlator mutex + egressPayloads map[string]*easyjson.JSON // controllerOID -> egress Payloads + egressPayloadsMu sync.RWMutex // egress Payloads mutex + config *Config // cache config + subscription *nats.Subscription // egress subscription + nc *nats.Conn // nats connection + stopCh chan struct{} // for gracefully shutdown +} + +type CacheEntry struct { + ExpiresAt time.Time + ControllerOIDs []string +} + +type PendingEntry struct { + TraceID string + Hash string + ControllerOIDs []string + FirstEgressAt time.Time + Timer *time.Timer + Mutex sync.Mutex +} + +type CorrelatorEntry struct { + Hash string +} + +func PrepareCollection(ctx *sf.StatefunContextProcessor, hash string) bool { + if !Enabled() { + return false + } + + traceID := ctx.TraceID() + + uiCache.correlatorMu.Lock() + uiCache.correlator[traceID] = CorrelatorEntry{ + Hash: hash, + } + uiCache.correlatorMu.Unlock() + + entry := &PendingEntry{ + TraceID: traceID, + Hash: hash, + ControllerOIDs: make([]string, 0), + } + + entry.Timer = time.AfterFunc( + time.Duration(uiCache.config.CollectTimeoutMS)*time.Millisecond, + func() { saveToCache(hash) }, + ) + + _, loaded := uiCache.pending.LoadOrStore(hash, entry) + + if loaded { + if !entry.Timer.Stop() { + <-entry.Timer.C + } + uiCache.correlatorMu.Lock() + delete(uiCache.correlator, traceID) + uiCache.correlatorMu.Unlock() + } + + return true +} + +func saveToCache(hash string) { + if !Enabled() { + return + } + + entryInterface, ok := uiCache.pending.Load(hash) + if !ok { + return + } + + entry := entryInterface.(*PendingEntry) + entry.Mutex.Lock() + defer entry.Mutex.Unlock() + + if entry.Timer == nil { + return + } + + traceID := entry.TraceID + entry.Timer = nil + + if len(entry.ControllerOIDs) == 0 { + uiCache.correlatorMu.Lock() + delete(uiCache.correlator, traceID) + uiCache.correlatorMu.Unlock() + uiCache.pending.Delete(hash) + return + } + + uiCache.cache.Store(hash, &CacheEntry{ + ControllerOIDs: entry.ControllerOIDs, + ExpiresAt: time.Now().Add(time.Duration(uiCache.config.TTLSeconds) * time.Second), + }) + + uiCache.correlatorMu.Lock() + delete(uiCache.correlator, traceID) + uiCache.correlatorMu.Unlock() + + uiCache.pending.Delete(hash) +} + +func collectEgress(payload *easyjson.JSON) { + if !Enabled() { + return + } + + if payload.GetByPath("cached").AsBoolDefault(false) { + return + } + + traceId, ok := payload.GetByPath("__trace_context.trace_id").AsString() + if !ok { + return + } + + controllerOID, ok := payload.GetByPath("__caller_id").AsString() + if !ok { + return + } + + clone := easyjson.NewJSONObject() + clone.SetByPath("payload", payload.GetByPath("payload")) + clone.SetByPath("cached", easyjson.NewJSON(true)) + + uiCache.correlatorMu.RLock() + correlatorEntry, inCorrelator := uiCache.correlator[traceId] + uiCache.correlatorMu.RUnlock() + + uiCache.egressPayloadsMu.Lock() + uiCache.egressPayloads[controllerOID] = &clone + uiCache.egressPayloadsMu.Unlock() + + if !inCorrelator { + return + } + + pendingEntryInterface, ok := uiCache.pending.Load(correlatorEntry.Hash) + if !ok { + return + } + + pendingEntry := pendingEntryInterface.(*PendingEntry) + pendingEntry.Mutex.Lock() + defer pendingEntry.Mutex.Unlock() + + if pendingEntry.Timer != nil { + pendingEntry.Timer.Reset(time.Duration(uiCache.config.CollectTimeoutMS) * time.Millisecond) + } + + pendingEntry.ControllerOIDs = append(pendingEntry.ControllerOIDs, controllerOID) +} + +func PublishCachedEgress(clientID string, controllerOIDs []string) { + if !Enabled() { + return + } + for _, cOID := range controllerOIDs { + uiCache.egressPayloadsMu.RLock() + payload, exist := uiCache.egressPayloads[cOID] + uiCache.egressPayloadsMu.RUnlock() + if !exist || payload == nil { + lg.GetLogger().Warnf(context.TODO(), "payload not found for controllerOID: %s", cOID) + continue + } + if err := uiCache.nc.Publish(fmt.Sprintf("egress.ui.%s", clientID), payload.ToBytes()); err != nil { + lg.GetLogger().Errorf(context.TODO(), "publishCachedEgress error: %v", err) + } + } +} + +func Get(hash string) *CacheEntry { + if !Enabled() { + return nil + } + entryInterface, ok := uiCache.cache.Load(hash) + if !ok { + return nil + } + + entry := entryInterface.(*CacheEntry) + + if time.Now().Before(entry.ExpiresAt) { + return entry + } + + uiCache.cache.CompareAndDelete(hash, entryInterface) + return nil +} + +func IsCacheable(payload *easyjson.JSON) bool { + if !Enabled() { + return false + } + + if payload.PathExists("command") { + return false + } + + if payload.GetByPath("no_cache").AsBoolDefault(false) { + return false + } + + return payload.PathExists("viewer") || payload.PathExists("controllers") +} + +func Hash(payload *easyjson.JSON) string { + bytes := payload.ToBytes() + h := fnv.New64a() + system.MsgOnErrorReturn(h.Write(bytes)) + return strconv.FormatUint(h.Sum64(), 16) +} + +func (c *Cache) handleNatsMessage(msg *nats.Msg) { + payload, ok := easyjson.JSONFromBytes(msg.Data) + if !ok { + lg.GetLogger().Errorf(context.TODO(), "invalid nats message") + return + } + + if payload.PathExists("payload.command") { + return + } + + collectEgress(&payload) +} diff --git a/internal/cache/config.go b/internal/cache/config.go new file mode 100644 index 0000000..4cd0b27 --- /dev/null +++ b/internal/cache/config.go @@ -0,0 +1,180 @@ +package cache + +import ( + "context" + "sync" + "time" + + "github.com/foliagecp/easyjson" + "github.com/foliagecp/sdk/statefun" + lg "github.com/foliagecp/sdk/statefun/logger" + "github.com/foliagecp/sdk/statefun/system" + "github.com/nats-io/nats.go" +) + +const ( + EGRESS_UI_SUBSRIBE_WILDCARD = "egress.ui.>" +) + +var config *Config + +type Config struct { + Enabled bool + TTLSeconds int + CollectTimeoutMS int + MaxEntries int + PeriodicCleanerTimeoutMin int +} + +func InitConfig() { + config = &Config{ + Enabled: system.GetEnvMustProceed("UI_APP_LIB_CACHE_ENABLED", false), + TTLSeconds: system.GetEnvMustProceed("UI_APP_LIB_CACHE_TTL_SECONDS", 600), + CollectTimeoutMS: system.GetEnvMustProceed("UI_APP_LIB_CACHE_COLLECT_TIMEOUT_MS", 10000), + MaxEntries: system.GetEnvMustProceed("UI_APP_LIB_CACHE_MAX_ENTRIES", 10000), + PeriodicCleanerTimeoutMin: system.GetEnvMustProceed("UI_APP_LIB_CACHE_PERIODIC_CLEANER_TIMEOUT_MIN", 5), + } +} + +func Init(runtime *statefun.Runtime) { + le := lg.GetLogger() + InitConfig() + + if !config.Enabled { + le.Infof(context.TODO(), ":::::: ui cache disabled, use UI_APP_LIB_CACHE_ENABLED in .env") + return + } + + nc := runtime.GetNatsConnection() + + cache := &Cache{ + correlator: make(map[string]CorrelatorEntry), + correlatorMu: sync.RWMutex{}, + egressPayloads: make(map[string]*easyjson.JSON), + config: config, + nc: nc, + stopCh: make(chan struct{}), + } + + sub, err := nc.Subscribe(EGRESS_UI_SUBSRIBE_WILDCARD, func(msg *nats.Msg) { + cache.handleNatsMessage(msg) + }) + if err != nil { + le.Errorf(context.TODO(), "Failed to subscribe to egress.ui.>: %s", err.Error()) + return + } + + cache.subscription = sub + + uiCache = cache //global ui cache + + go func() { + lg.GetLogger().Trace(context.TODO(), "cache cleaner started") + ticker := time.NewTicker(time.Duration(config.PeriodicCleanerTimeoutMin) * time.Minute) + defer ticker.Stop() + for { + select { + case <-ticker.C: + usedOIDs := cache.cleanupCacheAndCollect() + cache.cleanupEgressPayloads(usedOIDs) + cache.cleanupCorrelator() + case <-cache.stopCh: + return + } + } + }() + + le.Infof(context.TODO(), ":::::: ui cache enabled") +} + +func Enabled() bool { + return uiCache != nil && config.Enabled +} + +func (c *Cache) Shutdown() { + if c == nil { + return + } + + select { + case <-c.stopCh: + return + default: + close(c.stopCh) + } + + if c.subscription != nil { + system.MsgOnErrorReturn(c.subscription.Unsubscribe()) + } +} + +func (c *Cache) cleanupCacheAndCollect() (usedOIDs map[string]struct{}) { + all, deleted := 0, 0 + log := lg.GetLogger() + log.Debugf(context.TODO(), ">>> start delete old entries from ui-cache >>>") + + usedOIDs = make(map[string]struct{}) + + now := time.Now() + + c.cache.Range(func(key, value any) bool { + all++ + entry := value.(*CacheEntry) + + if now.After(entry.ExpiresAt) { + c.cache.CompareAndDelete(key, value) + deleted++ + } else { + for _, oid := range entry.ControllerOIDs { + usedOIDs[oid] = struct{}{} + } + } + + return true + }) + + log.Debugf(context.TODO(), + "<<< finish delete old entries from ui-cache, all=%d, deleted=%d <<<", all, deleted) + + return +} + +func (c *Cache) cleanupEgressPayloads(usedOIDs map[string]struct{}) { + //TODO optimize + //all, deleted := 0, 0 + //log := lg.GetLogger() + //log.Debugf(context.TODO(), ">>> start delete unactual entries from ui-cache-egress-payloads >>>") + // + //c.egressPayloadsMu.Lock() + //for controllerOID := range c.egressPayloads { + // all++ + // if _, ok := usedOIDs[controllerOID]; !ok { + // delete(c.egressPayloads, controllerOID) + // deleted++ + // } + //} + //c.egressPayloadsMu.Unlock() + // + //log.Debugf(context.TODO(), + // "<<< finish delete unactual entries from ui-cache-egress-payloads, all=%d, deleted=%d <<<", all, deleted) +} + +func (c *Cache) cleanupCorrelator() { + //TODO probably repeat function + all, deleted := 0, 0 + log := lg.GetLogger() + log.Debugf(context.TODO(), ">>> start delete unactual entries from ui-cache-correlator >>>") + + c.correlatorMu.Lock() + for traceID, corrEntry := range c.correlator { + all++ + if _, exists := c.pending.Load(corrEntry.Hash); !exists { + delete(c.correlator, traceID) + deleted++ + } + } + c.correlatorMu.Unlock() + + log.Debugf(context.TODO(), + "<<< finish delete unactual entries from ui-cache-correlator, all=%d, deleted=%d <<<", all, deleted) +} diff --git a/internal/egress/egress.go b/internal/egress/egress.go index 2fd44ed..c3bc657 100644 --- a/internal/egress/egress.go +++ b/internal/egress/egress.go @@ -54,7 +54,7 @@ func SendToSessionEgress(ctx *sf.StatefunContextProcessor, sessionID string, pay sessionID2ClientIDCache.Store(sessionID, clientID) } - return ctx.Signal(sf.AutoSignalSelect, inStatefun.EGRESS, generateEgressID(clientID), payload, nil) + return ctx.Signal(sf.AutoSignalSelect, inStatefun.EGRESS, GenerateEgressID(clientID), payload, nil) } func ClientIDFromEgressID(id string) string { @@ -67,7 +67,7 @@ func ClientIDFromEgressID(id string) string { return split[0] } -func generateEgressID(clientID string) string { +func GenerateEgressID(clientID string) string { s := make([]byte, 5) _, err := rand.Read(s) system.MsgOnErrorReturn(err) diff --git a/session/statefun.go b/session/statefun.go index 906f771..3207ab0 100644 --- a/session/statefun.go +++ b/session/statefun.go @@ -5,15 +5,16 @@ import ( "encoding/json" "errors" "fmt" - "log/slog" "time" "github.com/foliagecp/easyjson" "github.com/foliagecp/sdk/clients/go/db" "github.com/foliagecp/sdk/embedded/graph/crud" "github.com/foliagecp/sdk/statefun" + "github.com/foliagecp/sdk/statefun/logger" sf "github.com/foliagecp/sdk/statefun/plugins" "github.com/foliagecp/sdk/statefun/system" + "github.com/foliagecp/ui-app-lib/internal/cache" "github.com/foliagecp/ui-app-lib/internal/common" "github.com/foliagecp/ui-app-lib/internal/egress" "github.com/foliagecp/ui-app-lib/internal/generate" @@ -25,14 +26,16 @@ var ( ) func RegisterFunctions(runtime *statefun.Runtime) { - statefun.NewFunctionType(runtime, inStatefun.INGRESS, Ingress, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1)) - statefun.NewFunctionType(runtime, inStatefun.SESSION_ROUTER, SessionRouter, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1)) - statefun.NewFunctionType(runtime, inStatefun.SESSION_START, StartSession, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1)) - statefun.NewFunctionType(runtime, inStatefun.SESSION_CLOSE, CloseSession, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1)) - statefun.NewFunctionType(runtime, inStatefun.SESSION_UPDATE_ACTIVITY, UpdateSessionActivity, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1)) - statefun.NewFunctionType(runtime, inStatefun.SESSION_START_CONTROLLER, StartController, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1)) - statefun.NewFunctionType(runtime, inStatefun.SESSION_CLEAR_CONTROLLER, ClearController, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1)) - statefun.NewFunctionType(runtime, inStatefun.EGRESS, Egress, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1)) + statefun.NewFunctionType(runtime, inStatefun.INGRESS, Ingress, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1).SetIdChannelSize(500)) + statefun.NewFunctionType(runtime, inStatefun.SESSION_ROUTER, SessionRouter, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1).SetIdChannelSize(100)) + statefun.NewFunctionType(runtime, inStatefun.SESSION_START, StartSession, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1).SetIdChannelSize(100)) + statefun.NewFunctionType(runtime, inStatefun.SESSION_CLOSE, CloseSession, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1).SetIdChannelSize(100)) + statefun.NewFunctionType(runtime, inStatefun.SESSION_UPDATE_ACTIVITY, UpdateSessionActivity, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1).SetIdChannelSize(100)) + statefun.NewFunctionType(runtime, inStatefun.SESSION_START_CONTROLLER, StartController, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1).SetIdChannelSize(100)) + statefun.NewFunctionType(runtime, inStatefun.SESSION_CLEAR_CONTROLLER, ClearController, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1).SetIdChannelSize(100)) + statefun.NewFunctionType(runtime, inStatefun.EGRESS, Egress, *statefun.NewFunctionTypeConfig().SetMaxIdHandlers(-1).SetIdChannelSize(100)) + + cache.Init(runtime) runtime.RegisterOnAfterStartFunction(InitSchema, false) } @@ -89,14 +92,25 @@ func Ingress(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { payload := ctx.Payload sessionID := ctx.Domain.CreateObjectIDWithHubDomain(generate.SessionID(id).String(), false) - slog.Info("Receive msg", "from", id, "session_id", sessionID) + logger.GetLogger().Infof(context.TODO(), "Receive msg from=%s, session_id=%s", id, sessionID) payload.SetByPath("client_id", easyjson.NewJSON(id)) if err := ctx.Signal(sf.AutoSignalSelect, inStatefun.SESSION_ROUTER, sessionID, payload, nil); err != nil { - slog.Warn(err.Error()) + logger.GetLogger().Warn(context.TODO(), err.Error()) } } else { // Routing into ingresses of all weak cluster domains + if ctx.TraceContext() != nil && cache.IsCacheable(ctx.Payload) { + hash := cache.Hash(ctx.Payload) + if cached := cache.Get(hash); cached != nil { + logger.GetLogger().Tracef(context.TODO(), ":::::::::::cache hit, request hash=%s", hash) + cache.PublishCachedEgress(ctx.Self.ID, cached.ControllerOIDs) + return + } + logger.GetLogger().Tracef(context.TODO(), ":::::::::::cache miss, request hash=%s", hash) + cache.PrepareCollection(ctx, hash) + } + domains := ctx.Domain.GetWeakClusterDomains() if len(domains) > 1 { weakClustering = true @@ -131,7 +145,7 @@ var routes = map[Command]string{ func SessionRouter(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { sessionID := ctx.Self.ID payload := ctx.Payload - logger := slog.With("session_id", sessionID) + lg := logger.GetLogger().With(map[string]interface{}{"session_id": sessionID}) var command Command @@ -146,11 +160,11 @@ func SessionRouter(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { next, ok := routes[command] if !ok { - logger.Warn("Command not found", "command", command) + lg.Warnf(context.TODO(), "Command not found, command=%s", command) return } - logger.Info("Forward to next route", "next", next) + lg.Infof(context.TODO(), "Forward to next route, next=%s", next) system.MsgOnErrorReturn(ctx.Signal(sf.AutoSignalSelect, next, sessionID, payload, nil)) system.MsgOnErrorReturn(ctx.Signal(sf.AutoSignalSelect, inStatefun.SESSION_UPDATE_ACTIVITY, sessionID, nil, nil)) @@ -218,12 +232,12 @@ func CloseSession(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { /*dbc, err := db.NewDBSyncClientFromRequestFunction(ctx.Request) if err != nil { - slog.Error(err.Error()) + logger.GetLogger().Error(context.TODO(), err.Error()) return }*/ cmdb, err := db.NewCMDBSyncClientFromRequestFunction(ctx.Request) if err != nil { - slog.Error(err.Error()) + logger.GetLogger().Error(context.TODO(), err.Error()) return } @@ -265,7 +279,7 @@ func StartController(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { for _, plugin := range ctx.Payload.ObjectKeys() { var controllers map[string]Controller if err := json.Unmarshal(ctx.Payload.GetByPath(plugin).ToBytes(), &controllers); err != nil { - slog.Error(err.Error()) + logger.GetLogger().Errorf(context.TODO(), "unmarshall error, err=%s", err.Error()) return } @@ -278,7 +292,7 @@ func StartController(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { isShadowObjectInDomain = ctx.Domain.GetDomainFromObjectID(controller.UUIDs[0]) } - slog.Info(fmt.Sprintf( + logger.GetLogger().Infof(context.TODO(), fmt.Sprintf( "::::: StartController: SelfID=%s DomainName=%s WeakClustering=%t UUID[0]=%s GetValidObjectId(UUIDs[0])=%s", ctx.Self.ID, ctx.Domain.Name(), @@ -297,7 +311,7 @@ func StartController(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { payload := easyjson.NewJSONObject() payload.SetByPath("plugin", easyjson.NewJSON(plugin)) payload.SetByPath("declaration", body) - payload.SetByPath("uuids", easyjson.JSONFromArray(controller.UUIDs)) + payload.SetByPath("uuids", easyjson.NewJSON(controller.UUIDs)) payload.SetByPath("session_id", easyjson.NewJSON(sessionID)) payload.SetByPath("is_shadow_object_in_domain", easyjson.NewJSON(isShadowObjectInDomain)) payload.SetByPath("name", easyjson.NewJSON(name)) @@ -310,7 +324,7 @@ func StartController(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { ) if ctx.Domain.GetDomainFromObjectID(ctx.Self.ID) != ctx.Domain.GetDomainFromObjectID(controllerIDWithDomain) { - slog.Warn( + logger.GetLogger().Warnf(context.TODO(), fmt.Sprintf("::::: StartController: domains are not the same for SelfID=%s and UUID[0]=%s", ctx.Self.ID, controller.UUIDs[0], @@ -320,7 +334,7 @@ func StartController(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { err := ctx.Signal(sf.AutoSignalSelect, inStatefun.CONTROLLER_START, controllerIDWithDomain, &payload, nil) if err != nil { - slog.Error(err.Error()) + logger.GetLogger().Error(context.TODO(), err.Error()) return } } @@ -339,7 +353,7 @@ func StartController(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { func ClearController(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { sessionID := ctx.Self.ID - slog.Error(errors.ErrUnsupported.Error()) + logger.GetLogger().Error(context.TODO(), errors.ErrUnsupported.Error()) response := easyjson.NewJSONObject() response.SetByPath("command", easyjson.NewJSON(CLEAR_CONTROLLER)) @@ -349,7 +363,14 @@ func ClearController(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { } func Egress(_ sf.StatefunExecutor, ctx *sf.StatefunContextProcessor) { + if !ctx.Payload.PathExists("payload.command") && cache.Enabled() { // ignore command messages + tc := ctx.GetTraceContext() + if tc != nil { + ctx.Payload.SetByPath("__trace_context", *tc) + } + ctx.Payload.SetByPath("__caller_id", easyjson.NewJSON(ctx.Caller.ID)) + } if err := ctx.Egress(sf.NatsCoreEgress, ctx.Payload, egress.ClientIDFromEgressID(ctx.Self.ID)); err != nil { - slog.Warn(err.Error()) + logger.GetLogger().Error(context.TODO(), err.Error()) } } diff --git a/statefun.go b/statefun.go index f5574e1..e384b7b 100644 --- a/statefun.go +++ b/statefun.go @@ -1,8 +1,8 @@ package uilib import ( + "context" "fmt" - "log/slog" "sort" "time" @@ -26,7 +26,7 @@ const ( func sessionsKeeper(runtime *statefun.Runtime) { dbc, err := db.NewDBSyncClientFromRequestFunction(runtime.Request) if err != nil { - logger.Logf(logger.ErrorLevel, "ui-app-lib: cannot start sessionsKeeper, dbc creation error %s", err.Error()) + logger.GetLogger().Errorf(context.TODO(), "ui-app-lib: cannot start sessionsKeeper, dbc creation error %s", err.Error()) return } @@ -36,7 +36,7 @@ func sessionsKeeper(runtime *statefun.Runtime) { ids, err := dbc.Query.JPGQLCtraQuery(inStatefun.SESSION_TYPE, fmt.Sprintf(".*[l:type('%s')]", crud.OBJECT_TYPELINK)) if err != nil { - slog.Error(err.Error()) + logger.GetLogger().Error(context.TODO(), err.Error()) return }