Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions embedded/graph/crud/hl_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ func DeleteType(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContextProc

om.AggregateOpMsg(m)

cachePurgeTypeEdgesForType(selfID)

PolyTypeGoalFinalize(ctx, polyTypeData)

om.Reply()
Expand Down Expand Up @@ -287,6 +289,10 @@ func CreateObject(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContextPr
executeTriggersFromLLOpStack(ctx, opStack, "", "")
}

if om.GetStatus() == sfMediators.SYNC_OP_STATUS_OK {
cacheSetObjectType(selfID, originType)
}

replyWithoutOpStack(om, ctx, targetReply)
}

Expand Down Expand Up @@ -369,6 +375,7 @@ func DeleteObject(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContextPr
executeTriggersFromLLOpStack(ctx, j, selfID, objectType)
}

cacheDeleteObjectType(selfID)
replyWithoutOpStack(om, ctx)
}

Expand Down Expand Up @@ -497,6 +504,10 @@ func CreateTypesLink(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContex
om.AggregateOpMsg(sfMediators.OpMsgFromSfReply(ctx.Request(sfPlugins.AutoRequestSelect, "functions.graph.api.link.create", makeSequenceFreeParentBasedID(ctx, selfID), injectParentHoldsLocks(ctx, &link), ctx.Options)))
operationKeysMutexUnlock(ctx)

if om.GetStatus() == sfMediators.SYNC_OP_STATUS_OK {
cacheSetTypeEdge(selfID, toType, objectLinkType)
}

om.Reply()
}

Expand Down Expand Up @@ -545,6 +556,14 @@ func UpdateTypesLink(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContex
om.AggregateOpMsg(sfMediators.OpMsgFromSfReply(ctx.Request(sfPlugins.AutoRequestSelect, "functions.graph.api.link.update", makeSequenceFreeParentBasedID(ctx, selfID), injectParentHoldsLocks(ctx, &link), ctx.Options)))
operationKeysMutexUnlock(ctx)

if om.GetStatus() == sfMediators.SYNC_OP_STATUS_OK {
if ctx.Payload.PathExists("body.type") {
if newLT, ok := ctx.Payload.GetByPath("body.type").AsString(); ok && newLT != "" {
cacheSetTypeEdge(selfID, toType, newLT)
}
}
}

om.Reply()
}

Expand Down Expand Up @@ -613,6 +632,8 @@ func DeleteTypesLink(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContex
om.AggregateOpMsg(sfMediators.OpMsgFromSfReply(ctx.Request(sfPlugins.AutoRequestSelect, "functions.graph.api.link.delete", makeSequenceFreeParentBasedID(ctx, selfID), injectParentHoldsLocks(ctx, &objectLink), ctx.Options)))
operationKeysMutexUnlock(ctx)

cacheDeleteTypeEdge(selfID, toType)

PolyTypeGoalFinalize(ctx, polyTypeData)

for _, lateTrigger := range lateTriggersArr {
Expand Down Expand Up @@ -738,17 +759,8 @@ func UpdateObjectsLink(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunCont
}
objectToID = ctx.Domain.CreateObjectIDWithThisDomain(objectToID, false)

operationKeysMutexLock(ctx, []string{selfID, objectToID}, true)
_, _, linkType, err := getReferenceLinkTypeBetweenTwoObjects(ctx, selfID, objectToID)
if err != nil {
operationKeysMutexUnlock(ctx)
om.AggregateOpMsg(sfMediators.OpMsgFailed(err.Error())).Reply()
return
}

objectLink := easyjson.NewJSONObject()
objectLink.SetByPath("to", easyjson.NewJSON(objectToID))
objectLink.SetByPath("type", easyjson.NewJSON(linkType))
objectLink.SetByPath("body", ctx.Payload.GetByPath("body"))
if ctx.Payload.PathExists("tags") {
objectLink.SetByPath("tags", ctx.Payload.GetByPath("tags"))
Expand All @@ -766,6 +778,17 @@ func UpdateObjectsLink(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunCont

options := ctx.Options.Clone()
options.SetByPath("op_stack", easyjson.NewJSON(true))

operationKeysMutexLock(ctx, []string{selfID, objectToID}, true)
_, _, linkType, err := getReferenceLinkTypeBetweenTwoObjects(ctx, selfID, objectToID)
if err != nil {
operationKeysMutexUnlock(ctx)
om.AggregateOpMsg(sfMediators.OpMsgFailed(err.Error())).Reply()
return
}

objectLink.SetByPath("type", easyjson.NewJSON(linkType))

om.AggregateOpMsg(sfMediators.OpMsgFromSfReply(ctx.Request(sfPlugins.AutoRequestSelect, "functions.graph.api.link.update", makeSequenceFreeParentBasedID(ctx, selfID), injectParentHoldsLocks(ctx, &objectLink), &options)))
operationKeysMutexUnlock(ctx)

Expand Down
83 changes: 82 additions & 1 deletion embedded/graph/crud/hl_crud_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"sync"

"github.com/foliagecp/easyjson"
"github.com/foliagecp/sdk/statefun"
Expand All @@ -27,6 +28,69 @@ const (
BUILT_IN_OBJECT_NAV = "nav"
)

// ----------------------------
var (
// key: objectID -> typeID
objectTypeCache sync.Map
// key: fromType -> *sync.Map(toType -> objectLinkType)
type2TypeObjectLinkTypeCache sync.Map
)

func cacheGetObjectType(objectID string) (string, bool) {
if v, ok := objectTypeCache.Load(objectID); ok {
if s, ok := v.(string); ok && s != "" {
return s, true
}
}
return "", false
}
func cacheSetObjectType(objectID, typeID string) { objectTypeCache.Store(objectID, typeID) }
func cacheDeleteObjectType(objectID string) { objectTypeCache.Delete(objectID) }

func cacheGetTypeEdge(fromType, toType string) (string, bool) {
if v, ok := type2TypeObjectLinkTypeCache.Load(fromType); ok {
if m, ok := v.(*sync.Map); ok {
if lt, ok := m.Load(toType); ok {
if s, ok := lt.(string); ok && s != "" {
return s, true
}
}
}
}
return "", false
}
func cacheSetTypeEdge(fromType, toType, linkType string) {
var m *sync.Map
if v, ok := type2TypeObjectLinkTypeCache.Load(fromType); ok {
m, _ = v.(*sync.Map)
}
if m == nil {
m = &sync.Map{}
type2TypeObjectLinkTypeCache.Store(fromType, m)
}
m.Store(toType, linkType)
}
func cacheDeleteTypeEdge(fromType, toType string) {
if v, ok := type2TypeObjectLinkTypeCache.Load(fromType); ok {
if m, ok := v.(*sync.Map); ok {
m.Delete(toType)
}
}
}
func cachePurgeTypeEdgesForType(typeID string) {
// outcome
type2TypeObjectLinkTypeCache.Delete(typeID)
// income
type2TypeObjectLinkTypeCache.Range(func(k, v any) bool {
if m, ok := v.(*sync.Map); ok {
m.Delete(typeID)
}
return true
})
}

// -----------------------------------------------------------------------------

func typeOperationRedirectedToHub(ctx *sfPlugins.StatefunContextProcessor) bool {
if ctx.Domain.Name() != ctx.Domain.HubDomainName() {
om := sfMediators.NewOpMediator(ctx)
Expand Down Expand Up @@ -110,7 +174,15 @@ func getTypeTriggers(ctx *sfPlugins.StatefunContextProcessor, typeName string) *
return easyjson.NewJSONObject().GetPtr()
}

func FindObjectType(ctx *sfPlugins.StatefunContextProcessor, objectID string) (string, error) {
return findObjectType(ctx, objectID)
}

func findObjectType(ctx *sfPlugins.StatefunContextProcessor, objectID string) (string, error) {
if t, ok := cacheGetObjectType(objectID); ok {
return t, nil
}

options := easyjson.NewJSONObject()
if ctx.Options != nil {
options = ctx.Options.Clone()
Expand All @@ -120,8 +192,11 @@ func findObjectType(ctx *sfPlugins.StatefunContextProcessor, objectID string) (s

som := sfMediators.OpMsgFromSfReply(ctx.Request(sfPlugins.AutoRequestSelect, "functions.cmdb.api.object.read", makeSequenceFreeParentBasedID(ctx, id), injectParentHoldsLocks(ctx, nil), &options))
if som.Status == sfMediators.SYNC_OP_STATUS_OK {
return som.Data.GetByPath("type").AsStringDefault(""), nil
tp := som.Data.GetByPath("type").AsStringDefault("")
cacheSetObjectType(objectID, tp)
return tp, nil
}

return "", fmt.Errorf(som.Details)
}

Expand Down Expand Up @@ -174,6 +249,10 @@ func getReferenceLinkTypeBetweenTwoObjects(ctx *sfPlugins.StatefunContextProcess
}

func getObjectsLinkTypeFromTypesLink(ctx *sfPlugins.StatefunContextProcessor, fromType, toType string) (string, error) {
if lt, ok := cacheGetTypeEdge(fromType, toType); ok {
return lt, nil
}

linkBody, err := getLinkBody(ctx, fromType, toType)
if err != nil {
return "", err
Expand All @@ -183,6 +262,8 @@ func getObjectsLinkTypeFromTypesLink(ctx *sfPlugins.StatefunContextProcessor, fr
if !ok {
return "", fmt.Errorf("type of a link was not defined in link type")
}

cacheSetTypeEdge(fromType, toType, linkType)
return linkType, nil
}

Expand Down
77 changes: 44 additions & 33 deletions embedded/graph/crud/ll_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,24 +379,29 @@ Reply:
op_stack: json array - optional
*/
func LLAPIVertexRead(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContextProcessor) {
details := ctx.Payload.GetByPath("details").AsBoolDefault(false)

selfID := getOriginalID(ctx.Self.ID)

om := sfMediators.NewOpMediator(ctx)

operationKeysMutexLock(ctx, []string{selfID}, false)
_, err := ctx.Domain.Cache().GetValueJSON(selfID)
if details {
operationKeysMutexLock(ctx, []string{selfID}, false)
}
j, err := ctx.Domain.Cache().GetValueJSON(selfID)
if err != nil { // If vertex does not exist
operationKeysMutexUnlock(ctx)
if details {
operationKeysMutexUnlock(ctx)
}
om.AggregateOpMsg(sfMediators.OpMsgIdle(fmt.Sprintf("vertex with id=%s does not exist", selfID))).Reply()
return
}

opStack := getOpStackFromOptions(ctx.Options)

j := getVertexBody(ctx, selfID)
result := easyjson.NewJSONObjectWithKeyValue("body", *j)

if ctx.Payload.GetByPath("details").AsBoolDefault(false) {
if details {
outLinkNames := []string{}
outLinkTypes := []string{}
outLinkIds := []string{}
Expand Down Expand Up @@ -448,8 +453,6 @@ func LLAPIVertexRead(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContex
result.SetByPath("links.out.ids", easyjson.NewJSON(outLinkIds))

result.SetByPath("links.in", inLinks)
} else {
operationKeysMutexUnlock(ctx)
}

addVertexOpToOpStack(opStack, ctx.Self.Typename, selfID, nil, nil)
Expand Down Expand Up @@ -579,16 +582,20 @@ func LLAPILinkCreate(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContex
}

// Create in link on descendant vertex --------------------
nextCallPayload := easyjson.NewJSONObject()
nextCallPayload.SetByPath("in_name", easyjson.NewJSON(linkName))
nextCallPayload.SetByPath("in_type", easyjson.NewJSON(linkType))
nextCallPayload.SetByPath("op_time", easyjson.NewJSON(opTime))
if ctx.Domain.GetDomainFromObjectID(toId) == ctx.Domain.Name() {
ctx.Domain.Cache().SetValue(fmt.Sprintf(InLinkKeyPrefPattern+KeySuff2Pattern, toId, selfID, linkName), []byte(linkType), true, opTime, "")
} else {
nextCallPayload := easyjson.NewJSONObject()
nextCallPayload.SetByPath("in_name", easyjson.NewJSON(linkName))
nextCallPayload.SetByPath("in_type", easyjson.NewJSON(linkType))
nextCallPayload.SetByPath("op_time", easyjson.NewJSON(opTime))

om.AggregateOpMsg(sfMediators.OpMsgFromSfReply(ctx.Request(sfPlugins.AutoRequestSelect, ctx.Self.Typename, makeSequenceFreeParentBasedID(ctx, toId, "inlink"), injectParentHoldsLocks(ctx, &nextCallPayload), ctx.Options)))
if om.GetLastSyncOp().Status == sfMediators.SYNC_OP_STATUS_FAILED {
operationKeysMutexUnlock(ctx)
system.MsgOnErrorReturn(om.ReplyWithData(resultWithOpStack(nil, opStack).GetPtr()))
return
om.AggregateOpMsg(sfMediators.OpMsgFromSfReply(ctx.Request(sfPlugins.AutoRequestSelect, ctx.Self.Typename, makeSequenceFreeParentBasedID(ctx, toId, "inlink"), injectParentHoldsLocks(ctx, &nextCallPayload), ctx.Options)))
if om.GetLastSyncOp().Status == sfMediators.SYNC_OP_STATUS_FAILED {
operationKeysMutexUnlock(ctx)
system.MsgOnErrorReturn(om.ReplyWithData(resultWithOpStack(nil, opStack).GetPtr()))
return
}
}
// --------------------------------------------------------

Expand Down Expand Up @@ -844,15 +851,19 @@ func LLAPILinkDelete(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContex
addLinkOpToOpStack(opStack, ctx.Self.Typename, selfID, toId, linkName, linkType, oldLinkBody, nil)

// Delete in link on descendant vertex --------------------
nextCallPayload := easyjson.NewJSONObject()
nextCallPayload.SetByPath("in_name", easyjson.NewJSON(linkName))
nextCallPayload.SetByPath("op_time", easyjson.NewJSON(opTime))
if ctx.Domain.GetDomainFromObjectID(toId) == ctx.Domain.Name() {
ctx.Domain.Cache().DeleteValue(fmt.Sprintf(InLinkKeyPrefPattern+KeySuff2Pattern, toId, selfID, linkName), true, opTime, "")
} else {
nextCallPayload := easyjson.NewJSONObject()
nextCallPayload.SetByPath("in_name", easyjson.NewJSON(linkName))
nextCallPayload.SetByPath("op_time", easyjson.NewJSON(opTime))

om.AggregateOpMsg(sfMediators.OpMsgFromSfReply(ctx.Request(sfPlugins.AutoRequestSelect, ctx.Self.Typename, makeSequenceFreeParentBasedID(ctx, toId, "inlink"), injectParentHoldsLocks(ctx, &nextCallPayload), ctx.Options)))
if om.GetLastSyncOp().Status == sfMediators.SYNC_OP_STATUS_FAILED {
operationKeysMutexUnlock(ctx)
system.MsgOnErrorReturn(om.ReplyWithData(resultWithOpStack(nil, opStack).GetPtr()))
return
om.AggregateOpMsg(sfMediators.OpMsgFromSfReply(ctx.Request(sfPlugins.AutoRequestSelect, ctx.Self.Typename, makeSequenceFreeParentBasedID(ctx, toId, "inlink"), injectParentHoldsLocks(ctx, &nextCallPayload), ctx.Options)))
if om.GetLastSyncOp().Status == sfMediators.SYNC_OP_STATUS_FAILED {
operationKeysMutexUnlock(ctx)
system.MsgOnErrorReturn(om.ReplyWithData(resultWithOpStack(nil, opStack).GetPtr()))
return
}
}
// --------------------------------------------------------

Expand Down Expand Up @@ -898,32 +909,34 @@ func LLAPILinkRead(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContextP

opStack := getOpStackFromOptions(ctx.Options)

//operationKeysMutexLock(ctx, []string{selfID}, true)
linkType, linkName, toId, linkExists := getFullLinkInfoFromSpecifiedIdentifier(ctx)
if !linkExists {
//operationKeysMutexUnlock(ctx)
om.AggregateOpMsg(sfMediators.OpMsgIdle(fmt.Sprintf("link from=%s with name=%s does not exist", ctx.Self.ID, linkName))).Reply()
return
}
if !validLinkName.MatchString(linkName) {
//operationKeysMutexUnlock(ctx)
om.AggregateOpMsg(sfMediators.OpMsgFailed("invalid link name")).Reply()
return
}
//operationKeysMutexUnlock(ctx)

operationKeysMutexLock(ctx, []string{selfID, toId}, false)
details := ctx.Payload.GetByPath("details").AsBoolDefault(false)

if details {
operationKeysMutexLock(ctx, []string{selfID, toId}, false)
}

linkBody, err := ctx.Domain.Cache().GetValueJSON(fmt.Sprintf(OutLinkBodyKeyPrefPattern+KeySuff1Pattern, selfID, linkName))
if err != nil {
operationKeysMutexUnlock(ctx)
if details {
operationKeysMutexUnlock(ctx)
}
om.AggregateOpMsg(sfMediators.OpMsgFailed(fmt.Sprintf("link body from=%s with name=%s does not exist", selfID, linkName))).Reply()
return
}

result := easyjson.NewJSONObjectWithKeyValue("body", *linkBody)

if ctx.Payload.GetByPath("details").AsBoolDefault(false) {
if details {
tags := []string{}
tagKeys := ctx.Domain.Cache().GetKeysByPattern(fmt.Sprintf(OutLinkIndexPrefPattern+KeySuff3Pattern, selfID, linkName, "tag", ">"))
operationKeysMutexUnlock(ctx)
Expand All @@ -939,8 +952,6 @@ func LLAPILinkRead(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContextP
result.SetByPath("to", easyjson.NewJSON(toId))

result.SetByPath("tags", easyjson.NewJSON(tags))
} else {
operationKeysMutexUnlock(ctx)
}

addLinkOpToOpStack(opStack, ctx.Self.Typename, selfID, toId, linkName, linkType, nil, nil)
Expand Down
Loading
Loading