Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions service/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/dgraph-io/ristretto/v2 v2.4.0
github.com/eko/gocache/lib/v4 v4.2.0
github.com/eko/gocache/store/ristretto/v4 v4.3.2
github.com/exaring/otelpgx v0.10.0
github.com/fsnotify/fsnotify v1.9.0
github.com/go-chi/cors v1.2.1
github.com/go-playground/validator/v10 v10.26.0
Expand Down
2 changes: 2 additions & 0 deletions service/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ github.com/eko/gocache/lib/v4 v4.2.0 h1:MNykyi5Xw+5Wu3+PUrvtOCaKSZM1nUSVftbzmeC7
github.com/eko/gocache/lib/v4 v4.2.0/go.mod h1:7ViVmbU+CzDHzRpmB4SXKyyzyuJ8A3UW3/cszpcqB4M=
github.com/eko/gocache/store/ristretto/v4 v4.3.2 h1:DfvjqmB6hPHJ9oduReMohe8rZCVtxmY8OqTkmIu+dk0=
github.com/eko/gocache/store/ristretto/v4 v4.3.2/go.mod h1:1F6nJFAY6fTx/UVd66iYr26V2GzZbVJqQJSl+CkRGh4=
github.com/exaring/otelpgx v0.10.0 h1:NGGegdoBQM3jNZDKG8ENhigUcgBN7d7943L0YlcIpZc=
github.com/exaring/otelpgx v0.10.0/go.mod h1:R5/M5LWsPPBZc1SrRE5e0DiU48bI78C1/GPTWs6I66U=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
Expand Down
126 changes: 115 additions & 11 deletions service/internal/access/v2/just_in_time_pdp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
"github.com/opentdf/platform/protocol/go/policy/subjectmapping"
otdfSDK "github.com/opentdf/platform/sdk"
ctxAuth "github.com/opentdf/platform/service/pkg/auth"
"github.com/opentdf/platform/service/tracing"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/opentdf/platform/service/internal/access/v2/obligations"
Expand All @@ -36,6 +41,7 @@ var (
type JustInTimePDP struct {
logger *logger.Logger
sdk *otdfSDK.SDK
tracer trace.Tracer
// embedded entitlement PDP
pdp *PolicyDecisionPoint
// embedded obligations PDP
Expand Down Expand Up @@ -66,33 +72,49 @@ func NewJustInTimePDP(
p := &JustInTimePDP{
sdk: sdk,
logger: log,
tracer: otel.Tracer(tracing.ServiceName),
}

ctx, span := p.tracer.Start(ctx, "NewJustInTimePDP")
defer span.End()

// If no store is provided, have EntitlementPolicyRetriever fetch from policy services
if !store.IsEnabled() || !store.IsReady(ctx) {
cacheUsed := store.IsEnabled() && store.IsReady(ctx)
span.SetAttributes(attribute.Bool("cache.hit", cacheUsed))
if !cacheUsed {
Comment thread
pflynn-virtru marked this conversation as resolved.
log.DebugContext(ctx, "no EntitlementPolicyStore provided or not yet ready, will retrieve directly from policy services")
store = NewEntitlementPolicyRetriever(sdk)
}

allAttributes, err := store.ListAllAttributes(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to list attributes")
return nil, fmt.Errorf("failed to list cached attributes: %w", err)
}
allSubjectMappings, err := store.ListAllSubjectMappings(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to list subject mappings")
return nil, fmt.Errorf("failed to list cached subject mappings: %w", err)
}
allRegisteredResources, err := store.ListAllRegisteredResources(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to fetch registered resources")
return nil, fmt.Errorf("failed to fetch all registered resources: %w", err)
}
allObligations, err := store.ListAllObligations(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to fetch obligations")
return nil, fmt.Errorf("failed to fetch all obligations: %w", err)
}

pdp, err := NewPolicyDecisionPoint(ctx, log, allAttributes, allSubjectMappings, allRegisteredResources, allowDirectEntitlements)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to create PDP")
return nil, fmt.Errorf("failed to create new policy decision point: %w", err)
}
p.pdp = pdp
Expand All @@ -105,6 +127,8 @@ func NewJustInTimePDP(
allObligations,
)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to create obligations PDP")
return nil, fmt.Errorf("failed to create new obligations policy decision point: %w", err)
}
p.obligationsPDP = obligationsPDP
Expand Down Expand Up @@ -140,37 +164,56 @@ func (p *JustInTimePDP) GetDecision(
requestContext *policy.RequestContext,
fulfillableObligationValueFQNs []string,
) (*Decision, error) {
ctx, span := p.tracer.Start(ctx, "JustInTimePDP.GetDecision")
Comment thread
pflynn-virtru marked this conversation as resolved.
defer span.End()
span.SetAttributes(attribute.Int("resource.count", len(resources)))

var (
entityRepresentations []*entityresolutionV2.EntityRepresentation
err error
skipEnvironmentEntities = true
)

// Because there are three possible types of entities, check obligations first to more easily handle decisioning logic
oblCtx, oblSpan := p.tracer.Start(ctx, "EvaluateObligations")
obligationDecision, err := p.obligationsPDP.GetAllTriggeredObligationsAreFulfilled(
ctx,
oblCtx,
resources,
action,
requestContext,
fulfillableObligationValueFQNs,
)
if err != nil {
oblSpan.RecordError(err)
oblSpan.SetStatus(codes.Error, err.Error())
}
oblSpan.End()
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to check obligations")
return nil, fmt.Errorf("failed to check obligations: %w", err)
}
hasRequiredObligations := len(obligationDecision.RequiredObligationValueFQNs) > 0
allObligationsSatisfied := (!hasRequiredObligations || obligationDecision.AllObligationsSatisfied)

resolveCtx, resolveSpan := p.tracer.Start(ctx, "ResolveEntities")
switch entityIdentifier.GetIdentifier().(type) {
case *authzV2.EntityIdentifier_EntityChain:
entityRepresentations, err = p.resolveEntitiesFromEntityChain(ctx, entityIdentifier.GetEntityChain(), skipEnvironmentEntities)
resolveSpan.SetAttributes(attribute.String("entity_identifier.type", "entity_chain"))
entityRepresentations, err = p.resolveEntitiesFromEntityChain(resolveCtx, entityIdentifier.GetEntityChain(), skipEnvironmentEntities)

case *authzV2.EntityIdentifier_Token:
entityRepresentations, err = p.resolveEntitiesFromToken(ctx, entityIdentifier.GetToken(), skipEnvironmentEntities, resources)
resolveSpan.SetAttributes(attribute.String("entity_identifier.type", "token"))
entityRepresentations, err = p.resolveEntitiesFromToken(resolveCtx, entityIdentifier.GetToken(), skipEnvironmentEntities, resources)

case *authzV2.EntityIdentifier_WithRequestToken:
entityRepresentations, err = p.resolveEntitiesFromRequestToken(ctx, entityIdentifier.GetWithRequestToken(), skipEnvironmentEntities, resources)
resolveSpan.SetAttributes(attribute.String("entity_identifier.type", "with_request_token"))
entityRepresentations, err = p.resolveEntitiesFromRequestToken(resolveCtx, entityIdentifier.GetWithRequestToken(), skipEnvironmentEntities, resources)

case *authzV2.EntityIdentifier_RegisteredResourceValueFqn:
resolveSpan.SetAttributes(attribute.String("entity_identifier.type", "registered_resource"))
resolveSpan.End()

regResValueFQN := strings.ToLower(entityIdentifier.GetRegisteredResourceValueFqn())
// Registered resources do not have entity representations, so only one decision is made
decision, entitlements, err := p.pdp.GetDecisionRegisteredResource(ctx, regResValueFQN, action, resources)
Expand Down Expand Up @@ -204,23 +247,51 @@ func (p *JustInTimePDP) GetDecision(
return decision, nil

default:
resolveSpan.RecordError(ErrInvalidEntityType)
resolveSpan.SetStatus(codes.Error, ErrInvalidEntityType.Error())
resolveSpan.End()
Comment thread
pflynn-virtru marked this conversation as resolved.
span.RecordError(ErrInvalidEntityType)
span.SetStatus(codes.Error, ErrInvalidEntityType.Error())
return nil, ErrInvalidEntityType
}
if err != nil {
resolveSpan.RecordError(err)
resolveSpan.SetStatus(codes.Error, err.Error())
}
resolveSpan.End()
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to resolve entity identifier")
return nil, fmt.Errorf("failed to resolve entity identifier: %w", err)
}

// Get a decision on each entity representation and consolidate into an overall decision
evalCtx, evalSpan := p.tracer.Start(ctx, "EvaluateDecision")
evalSpan.SetAttributes(
attribute.Int("entity_representation.count", len(entityRepresentations)),
)

var resourceDecisionsAcrossAllEntityReps []ResourceDecision
allPermitted := true

for _, entityRep := range entityRepresentations {
entityRepresentationDecision, entitlements, err := p.pdp.GetDecision(ctx, entityRep, action, resources)
entityRepresentationDecision, entitlements, err := p.pdp.GetDecision(evalCtx, entityRep, action, resources)
if err != nil {
evalSpan.RecordError(err)
evalSpan.SetStatus(codes.Error, err.Error())
evalSpan.End()
span.RecordError(err)
span.SetStatus(codes.Error, "failed to get decision for entity representation")
return nil, fmt.Errorf("failed to get decision for entityRepresentation with original id [%s]: %w", entityRep.GetOriginalId(), err)
}
if entityRepresentationDecision == nil {
return nil, fmt.Errorf("decision is nil: %w", err)
nilErr := fmt.Errorf("decision is nil for entity representation [%s]", entityRep.GetOriginalId())
evalSpan.RecordError(nilErr)
evalSpan.SetStatus(codes.Error, nilErr.Error())
evalSpan.End()
Comment thread
pflynn-virtru marked this conversation as resolved.
span.RecordError(nilErr)
span.SetStatus(codes.Error, nilErr.Error())
return nil, nilErr
}

// If any entity lacks access to any resource, update overall decision denial
Expand All @@ -237,6 +308,11 @@ func (p *JustInTimePDP) GetDecision(
obligationDecision,
)
if err != nil {
evalSpan.RecordError(err)
evalSpan.SetStatus(codes.Error, err.Error())
evalSpan.End()
Comment thread
pflynn-virtru marked this conversation as resolved.
span.RecordError(err)
span.SetStatus(codes.Error, "failed to apply obligations and consolidate")
return nil, fmt.Errorf("failed to apply obligations and consolidate for entity representation [%s]: %w", entityRep.GetOriginalId(), err)
}

Expand All @@ -253,6 +329,7 @@ func (p *JustInTimePDP) GetDecision(
auditResourceDecisions,
)
}
evalSpan.End()

allEntitledWithAllObligationsSatisfied := allPermitted && allObligationsSatisfied
return &Decision{
Expand All @@ -268,6 +345,9 @@ func (p *JustInTimePDP) GetEntitlements(
entityIdentifier *authzV2.EntityIdentifier,
withComprehensiveHierarchy bool,
) ([]*authzV2.EntityEntitlements, error) {
ctx, span := p.tracer.Start(ctx, "JustInTimePDP.GetEntitlements")
Comment thread
pflynn-virtru marked this conversation as resolved.
defer span.End()

p.logger.DebugContext(ctx, "getting entitlements - resolving entity chain")

var (
Expand All @@ -276,31 +356,53 @@ func (p *JustInTimePDP) GetEntitlements(
skipEnvironmentEntities = false
)

resolveCtx, resolveSpan := p.tracer.Start(ctx, "ResolveEntities")
switch entityIdentifier.GetIdentifier().(type) {
case *authzV2.EntityIdentifier_EntityChain:
entityRepresentations, err = p.resolveEntitiesFromEntityChain(ctx, entityIdentifier.GetEntityChain(), skipEnvironmentEntities)
resolveSpan.SetAttributes(attribute.String("entity_identifier.type", "entity_chain"))
entityRepresentations, err = p.resolveEntitiesFromEntityChain(resolveCtx, entityIdentifier.GetEntityChain(), skipEnvironmentEntities)

case *authzV2.EntityIdentifier_Token:
entityRepresentations, err = p.resolveEntitiesFromToken(ctx, entityIdentifier.GetToken(), skipEnvironmentEntities, []*authzV2.Resource{})
resolveSpan.SetAttributes(attribute.String("entity_identifier.type", "token"))
entityRepresentations, err = p.resolveEntitiesFromToken(resolveCtx, entityIdentifier.GetToken(), skipEnvironmentEntities, []*authzV2.Resource{})

case *authzV2.EntityIdentifier_RegisteredResourceValueFqn:
resolveSpan.SetAttributes(attribute.String("entity_identifier.type", "registered_resource"))
resolveSpan.End()

p.logger.DebugContext(ctx, "getting entitlements - resolving registered resource value FQN")
regResValueFQN := strings.ToLower(entityIdentifier.GetRegisteredResourceValueFqn())
// registered resources do not have entity representations, so we can skip the remaining logic
return p.pdp.GetEntitlementsRegisteredResource(ctx, regResValueFQN, withComprehensiveHierarchy)

case *authzV2.EntityIdentifier_WithRequestToken:
entityRepresentations, err = p.resolveEntitiesFromRequestToken(ctx, entityIdentifier.GetWithRequestToken(), skipEnvironmentEntities, []*authzV2.Resource{})
resolveSpan.SetAttributes(attribute.String("entity_identifier.type", "with_request_token"))
entityRepresentations, err = p.resolveEntitiesFromRequestToken(resolveCtx, entityIdentifier.GetWithRequestToken(), skipEnvironmentEntities, []*authzV2.Resource{})

default:
return nil, fmt.Errorf("entity type %T: %w", entityIdentifier.GetIdentifier(), ErrInvalidEntityType)
defaultErr := fmt.Errorf("entity type %T: %w", entityIdentifier.GetIdentifier(), ErrInvalidEntityType)
resolveSpan.RecordError(defaultErr)
resolveSpan.SetStatus(codes.Error, defaultErr.Error())
resolveSpan.End()
Comment thread
pflynn-virtru marked this conversation as resolved.
span.RecordError(defaultErr)
span.SetStatus(codes.Error, defaultErr.Error())
return nil, defaultErr
}
if err != nil {
resolveSpan.RecordError(err)
resolveSpan.SetStatus(codes.Error, err.Error())
}
resolveSpan.End()
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to resolve entities")
return nil, fmt.Errorf("failed to resolve entities from entity identifier: %w", err)
}

matchedSubjectMappings, err := p.getMatchedSubjectMappings(ctx, entityRepresentations)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to get matched subject mappings")
return nil, fmt.Errorf("failed to get matched subject mappings: %w", err)
}
// If no subject mappings matched, return empty entitlements
Expand All @@ -311,6 +413,8 @@ func (p *JustInTimePDP) GetEntitlements(

entitlements, err := p.pdp.GetEntitlements(ctx, entityRepresentations, matchedSubjectMappings, withComprehensiveHierarchy)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to get entitlements")
return nil, fmt.Errorf("failed to get entitlements: %w", err)
}
return entitlements, nil
Expand Down
5 changes: 5 additions & 0 deletions service/pkg/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

sq "github.com/Masterminds/squirrel"
"github.com/exaring/otelpgx"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -250,6 +251,10 @@ func (c Config) buildConfig() (*pgxpool.Config, error) {
parsed.MaxConnIdleTime = time.Duration(c.Pool.MaxConnIdleTime) * time.Second
parsed.HealthCheckPeriod = time.Duration(c.Pool.HealthCheckPeriod) * time.Second

// Instrument all database queries with OpenTelemetry tracing via pgx native tracer interface.
// When tracing is disabled, the global provider is noop and spans are zero-cost.
parsed.ConnConfig.Tracer = otelpgx.NewTracer()

// Configure the search_path schema immediately on connection opening
parsed.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
_, err := conn.Exec(ctx, "SET search_path TO "+pgx.Identifier{c.Schema}.Sanitize())
Expand Down
Loading