cerbos golang 内部policy check 处理简单说明

发布时间: 2023-12-06 13:00:42  作者: 荣锋亮

主要是想尝试用 Rust 实现一个 check 方法,所以先研究一下 Golang 版本的内部实现。

  • CheckResources
func (cs *CerbosService) CheckResources(ctx context.Context, req *requestv1.CheckResourcesRequest) (*responsev1.CheckResourcesResponse, error) {
    log := logging.ReqScopeLog(ctx)
    if err := cs.checkNumResourcesLimit(len(req.Resources)); err != nil {
        log.Error("Request too large", zap.Error(err))
        return nil, err
    }
 
   // 数据提取处理,主要是一些其他数据,比较jwt 认证的
    auxData, err := cs.auxData.Extract(ctx, req.AuxData)
    if err != nil {
        log.Error("Failed to extract auxData", zap.Error(err))
        return nil, status.Error(codes.InvalidArgument, "invalid auxData")
    }
    inputs := make([]*enginev1.CheckInput, len(req.Resources))
    for i, res := range req.Resources {
        if err := cs.checkNumActionsLimit(len(res.Actions)); err != nil {
            log.Error("Request too large", zap.Error(err))
            return nil, err
        }
 
        inputs[i] = &enginev1.CheckInput{
            RequestId: req.RequestId,
            Actions:   res.Actions,
            Principal: req.Principal,
            Resource:  res.Resource,
            AuxData:   auxData,
        }
    }
   // 基于engine 提供的check 方法,此方法也是我们需要注意的
    outputs, err := cs.eng.Check(logging.ToContext(ctx, log), inputs)
    if err != nil {
        log.Error("Policy check failed", zap.Error(err))
        if errors.Is(err, compile.PolicyCompilationErr{}) {
            return nil, status.Errorf(codes.FailedPrecondition, "Check failed due to invalid policy")
        }
        return nil, status.Errorf(codes.Internal, "Policy check failed")
    }
 
    result := &responsev1.CheckResourcesResponse{
        RequestId: req.RequestId,
        Results:   make([]*responsev1.CheckResourcesResponse_ResultEntry, len(outputs)),
    }
   // 对于多个返回的check 结果,需要转换为标准格式的输出
    for i, out := range outputs {
        resource := inputs[i].Resource
        entry := &responsev1.CheckResourcesResponse_ResultEntry{
            Resource: &responsev1.CheckResourcesResponse_ResultEntry_Resource{
                Id:            resource.Id,
                Kind:          resource.Kind,
                PolicyVersion: resource.PolicyVersion,
                Scope:         resource.Scope,
            },
            ValidationErrors: out.ValidationErrors,
            Actions:          make(map[string]effectv1.Effect, len(out.Actions)),
        }
 
        if req.IncludeMeta {
            entry.Meta = &responsev1.CheckResourcesResponse_ResultEntry_Meta{
                EffectiveDerivedRoles: out.EffectiveDerivedRoles,
                Actions:               make(map[string]*responsev1.CheckResourcesResponse_ResultEntry_Meta_EffectMeta, len(out.Actions)),
            }
        }
 
        if len(out.Outputs) > 0 {
            entry.Outputs = out.Outputs
        }
 
        for action, actionEffect := range out.Actions {
            entry.Actions[action] = actionEffect.Effect
            if req.IncludeMeta {
                entry.Meta.Actions[action] = &responsev1.CheckResourcesResponse_ResultEntry_Meta_EffectMeta{
                    MatchedPolicy: actionEffect.Policy,
                    MatchedScope:  actionEffect.Scope,
                }
            }
        }
 
        result.Results[i] = entry
    }
 
    return result, nil
}
  • engine check 处理
func (engine *Engine) Check(ctx context.Context, inputs []*enginev1.CheckInput, opts ...CheckOpt) ([]*enginev1.CheckOutput, error) {
    outputs, err := metrics.RecordDuration2(metrics.EngineCheckLatency(), func() (outputs []*enginev1.CheckOutput, err error) {
        ctx, span := tracing.StartSpan(ctx, "engine.Check")
        defer span.End()
 
        checkOpts := newCheckOptions(ctx, engine.conf, opts...)
        // 包含了串行以及并行的check
        // if the number of inputs is less than the threshold, do a serial execution as it is usually faster.
        // ditto if the worker pool is not initialized
        if len(inputs) < parallelismThreshold || len(engine.workerPool) == 0 {
            outputs, err = engine.checkSerial(ctx, inputs, checkOpts)
        } else {
            outputs, err = engine.checkParallel(ctx, inputs, checkOpts)
        }
 
        if err != nil {
            tracing.MarkFailed(span, http.StatusBadRequest, err)
        }
 
        return outputs, err
    })
    metrics.EngineCheckBatchSize().Record(context.Background(), int64(len(inputs)))
 
    return engine.logCheckDecision(ctx, inputs, outputs, err)
}
  • checkSerial 以及 checkParallel 的核心计算(evaluate)
func (engine *Engine) evaluate(ctx context.Context, input *enginev1.CheckInput, checkOpts *CheckOptions) (*enginev1.CheckOutput, error) {
    ctx, span := tracing.StartSpan(ctx, "engine.Evaluate")
    defer span.End()
 
    span.SetAttributes(tracing.RequestID(input.RequestId), tracing.ReqResourceID(input.Resource.Id))
 
    // exit early if the context is cancelled
    if err := ctx.Err(); err != nil {
        tracing.MarkFailed(span, http.StatusRequestTimeout, err)
        return nil, err
    }
 
    output := &enginev1.CheckOutput{
        RequestId:  input.RequestId,
        ResourceId: input.Resource.Id,
        Actions:    make(map[string]*enginev1.CheckOutput_ActionEffect, len(input.Actions)),
    }
    // 先构建上下文
    ec, err := engine.buildEvaluationCtx(ctx, checkOpts.evalParams, input)
    if err != nil {
        return nil, err
    }
 
    tctx := tracer.Start(checkOpts.tracerSink)
 
    // evaluate the policies,执行策略
    result, err := ec.evaluate(ctx, tctx, input)
    if err != nil {
        logging.FromContext(ctx).Error("Failed to evaluate policies", zap.Error(err))
        return nil, fmt.Errorf("failed to evaluate policies: %w", err)
    }
 
    // update the output
    for _, action := range input.Actions {
        output.Actions[action] = &enginev1.CheckOutput_ActionEffect{
            Effect: defaultEffect,
            Policy: noPolicyMatch,
        }
 
        if einfo, ok := result.effects[action]; ok {
            ae := output.Actions[action]
            ae.Effect = einfo.Effect
            ae.Policy = einfo.Policy
            ae.Scope = einfo.Scope
        }
    }
 
    output.EffectiveDerivedRoles = result.effectiveDerivedRoles
    output.ValidationErrors = result.validationErrors
    output.Outputs = result.outputs
 
    return output, nil
}
  • buildEvaluationCtx 处理
func (engine *Engine) buildEvaluationCtx(ctx context.Context, eparams evalParams, input *enginev1.CheckInput) (*evaluationCtx, error) {
    ec := &evaluationCtx{}
 
    principal 以及resource policy 的处理,后边实际执行需要使用具体的实现进行evaluate
 
    // get the principal policy check
    ppName, ppVersion, ppScope := engine.policyAttr(input.Principal.Id, input.Principal.PolicyVersion, input.Principal.Scope)
    ppCheck, err := engine.getPrincipalPolicyEvaluator(ctx, eparams, ppName, ppVersion, ppScope)
    if err != nil {
        return nil, fmt.Errorf("failed to get check for [%s.%s]: %w", ppName, ppVersion, err)
    }
    ec.addCheck(ppCheck)
 
    // get the resource policy check
    rpName, rpVersion, rpScope := engine.policyAttr(input.Resource.Kind, input.Resource.PolicyVersion, input.Resource.Scope)
    rpCheck, err := engine.getResourcePolicyEvaluator(ctx, eparams, rpName, rpVersion, rpScope)
    if err != nil {
        return nil, fmt.Errorf("failed to get check for [%s.%s]: %w", rpName, rpVersion, err)
    }
    ec.addCheck(rpCheck)
 
    return ec, nil
}
  • evaluate实际执行
func (ec *evaluationCtx) evaluate(ctx context.Context, tctx tracer.Context, input *enginev1.CheckInput) (*evaluationResult, error) {
    ctx, span := tracing.StartSpan(ctx, "engine.EvalCtxEvaluate")
    defer span.End()
 
    resp := &evaluationResult{}
    if ec.numChecks == 0 {
        tracing.MarkFailed(span, http.StatusNotFound, errNoPoliciesMatched)
 
        resp.setDefaultsForUnmatchedActions(tctx, input)
        return resp, nil
    }
 
    for i := 0; i < ec.numChecks; i++ {
        c := ec.checks[i]
 
        result, err := c.Evaluate(ctx, tctx, input)
        if err != nil {
            logging.FromContext(ctx).Error("Failed to evaluate policy", zap.Error(err))
            tracing.MarkFailed(span, http.StatusInternalServerError, err)
 
            return nil, fmt.Errorf("failed to execute policy: %w", err)
        }
 
        incomplete := resp.merge(result)
        if !incomplete {
            return resp, nil
        }
    }
 
    tracing.MarkFailed(span, http.StatusNotFound, errNoPoliciesMatched)
    resp.setDefaultsForUnmatchedActions(tctx, input)
 
    return resp, nil
}

Evaluate 目前有三种实现,下面只列出其中 principal 与 resource 两种策略的实现。

 

  • principalPolicyEvaluator 的处理
    其中变量、条件和输出表达式的求值基于 Google 的 cel-go 包
 
// Evaluate applies this principal policy to a single check input. It returns
// the per-action effects it decided, plus any rule outputs.
//
// Flow, as implemented below:
//   - ppe.policy.Policies are walked in order. The loop returns early as soon
//     as result.unresolvedActions() is empty.
//   - Each policy's variables are evaluated first. A variable evaluation error
//     aborts the whole call; it is the only error this method returns.
//   - Resource rules whose resource glob does not match input.Resource.Kind are
//     skipped. For the rest, each action rule is tried against the actions
//     that were still unresolved when this policy started. An action gets the
//     rule's effect when the rule's condition is satisfied. A condition error
//     is recorded on the trace and only skips that action.
//   - If a rule matched at least one action and has an output expression, the
//     output is evaluated once and appended to result.Outputs.
//   - Finally setDefaultEffect is called with EFFECT_NO_MATCH and no policy
//     key. This is presumably applied only to actions that are still
//     unresolved; confirm in setDefaultEffect.
func (ppe *principalPolicyEvaluator) Evaluate(ctx context.Context, tctx tracer.Context, input *enginev1.CheckInput) (*PolicyEvalResult, error) {
    // Observability span. This is separate from the evaluation trace recorded through tctx.
    _, span := tracing.StartSpan(ctx, "principal_policy.Evaluate")
    span.SetAttributes(tracing.PolicyFQN(ppe.policy.Meta.Fqn))
    defer span.End()
 
    policyKey := namer.PolicyKeyFromFQN(ppe.policy.Meta.Fqn)
    // A single evaluation context is shared by every policy in this set.
    evalCtx := newEvalContext(ppe.evalParams, checkInputToRequest(input))
    result := newEvalResult(input.Actions)
 
    pctx := tctx.StartPolicy(ppe.policy.Meta.Fqn)
    for _, p := range ppe.policy.Policies {
        // Only actions that earlier policies left undecided are considered.
        actionsToResolve := result.unresolvedActions()
        if len(actionsToResolve) == 0 {
            return result, nil
        }
 
        sctx := pctx.StartScope(p.Scope)
        // evaluate the variables of this policy
        variables, err := evalCtx.evaluateVariables(sctx.StartVariables(), p.OrderedVariables)
        if err != nil {
            sctx.Failed(err, "Failed to evaluate variables")
            return nil, fmt.Errorf("failed to evaluate variables: %w", err)
        }
 
        for resource, resourceRules := range p.ResourceRules {
            rctx := sctx.StartResource(resource)
            // Resource keys are glob patterns matched against the input's resource kind.
            if !util.MatchesGlob(resource, input.Resource.Kind) {
                rctx.Skipped(nil, "Did not match input resource kind")
                continue
            }
 
            for _, rule := range resourceRules.ActionRules {
                // rule.Action is also a glob, filtered against the still-unresolved actions.
                matchedActions := util.FilterGlob(rule.Action, actionsToResolve)
                ruleActivated := false
                for _, action := range matchedActions {
                    actx := rctx.StartAction(action)
                    ok, err := evalCtx.satisfiesCondition(actx.StartCondition(), rule.Condition, variables)
                    if err != nil {
                        // A condition error skips this action; it does not fail the check.
                        actx.Skipped(err, "Error evaluating condition")
                        continue
                    }
 
                    if !ok {
                        actx.Skipped(nil, "Condition not satisfied")
                        continue
                    }
 
                    result.setEffect(action, EffectInfo{Effect: rule.Effect, Policy: policyKey, Scope: p.Scope})
                    actx.AppliedEffect(rule.Effect, "")
                    ruleActivated = true
                }
 
                // evaluate output expression if the rule was activated
                if ruleActivated && rule.Output != nil {
                    octx := rctx.StartOutput(rule.Name)
 
                    output := &enginev1.OutputEntry{
                        Src: namer.RuleFQN(ppe.policy.Meta, p.Scope, rule.Name),
                        Val: evalCtx.evaluateProtobufValueCELExpr(rule.Output.Checked, variables),
                    }
                    result.Outputs = append(result.Outputs, output)
 
                    octx.ComputedOutput(output)
                }
            }
        }
    }
 
    // A principal policy that decides nothing reports NO_MATCH rather than
    // DENY, so later checks (the resource policy) can still decide.
    result.setDefaultEffect(pctx, EffectInfo{Effect: effectv1.Effect_EFFECT_NO_MATCH})
    return result, nil
}
  • resourcePolicyEvaluator 的处理
    其中变量、派生角色条件、规则条件和输出表达式的求值基于 Google 的 cel-go 包
 
// Evaluate applies this resource policy set to a single check input. It
// returns the per-action effects it decided, the derived roles it activated,
// any schema validation errors, and any rule outputs.
//
// Flow, as implemented below:
//   - The input is validated against rpe.policy.Schemas. A failure of the
//     validator itself is returned as an error. Validation errors are copied
//     into result.ValidationErrors. If the validation result asks to reject
//     (vr.Reject), every requested action is denied with this policy's key
//     and the method returns without evaluating any rules.
//   - rpe.policy.Policies are then walked in order, stopping early once no
//     action is left unresolved.
//   - For each policy, a derived role is activated when its parent roles
//     intersect the principal's roles and its condition is satisfied. Errors
//     while evaluating a derived role only skip that role. Activated roles are
//     also accumulated into result.EffectiveDerivedRoles across all policies.
//   - The policy's variables are evaluated next. An error here aborts the call.
//   - A rule is considered only if rule.Roles intersects the principal's roles
//     or rule.DerivedRoles intersects the derived roles activated for this
//     policy. Matching actions whose condition holds receive the rule's
//     effect. A condition error only skips that action. If the rule matched at
//     least one action and has an output expression, the output is evaluated
//     and appended to result.Outputs.
//   - Finally setDefaultEffect is called with EFFECT_DENY and this policy's
//     key. This is presumably applied only to actions still unresolved;
//     confirm in setDefaultEffect.
func (rpe *resourcePolicyEvaluator) Evaluate(ctx context.Context, tctx tracer.Context, input *enginev1.CheckInput) (*PolicyEvalResult, error) {
    // Observability span. This is separate from the evaluation trace recorded through tctx.
    _, span := tracing.StartSpan(ctx, "resource_policy.Evaluate")
    span.SetAttributes(tracing.PolicyFQN(rpe.policy.Meta.Fqn))
    defer span.End()
 
    policyKey := namer.PolicyKeyFromFQN(rpe.policy.Meta.Fqn)
    request := checkInputToRequest(input)
    result := newEvalResult(input.Actions)
    // The principal's own roles as a set, used for rule and derived-role matching.
    effectiveRoles := internal.ToSet(input.Principal.Roles)
 
    pctx := tctx.StartPolicy(rpe.policy.Meta.Fqn)
 
    // validate the input
    vr, err := rpe.schemaMgr.ValidateCheckInput(ctx, rpe.policy.Schemas, input)
    if err != nil {
        pctx.Failed(err, "Error during validation")
 
        return nil, fmt.Errorf("failed to validate input: %w", err)
    }
 
    if len(vr.Errors) > 0 {
        result.ValidationErrors = vr.Errors.SchemaErrors()
 
        pctx.Failed(vr.Errors, "Validation errors")
 
        // In reject mode, validation failures deny every requested action outright.
        if vr.Reject {
            for _, action := range input.Actions {
                actx := pctx.StartAction(action)
 
                result.setEffect(action, EffectInfo{Effect: effectv1.Effect_EFFECT_DENY, Policy: policyKey})
 
                actx.AppliedEffect(effectv1.Effect_EFFECT_DENY, "Rejected due to validation failures")
            }
            return result, nil
        }
    }
 
    // evaluate policies in the set
    for _, p := range rpe.policy.Policies {
        // Get the actions that are yet to be resolved. This is to implement first-match-wins semantics.
        // Within the context of a single policy, later rules can potentially override the result for an action (unless it was DENY).
        actionsToResolve := result.unresolvedActions()
        if len(actionsToResolve) == 0 {
            return result, nil
        }
 
        sctx := pctx.StartScope(p.Scope)
 
        // Each policy gets a fresh evaluation context over the same request.
        evalCtx := newEvalContext(rpe.evalParams, request)
 
        // calculate the set of effective derived roles
        effectiveDerivedRoles := make(internal.StringSet, len(p.DerivedRoles))
        for drName, dr := range p.DerivedRoles {
            dctx := sctx.StartDerivedRole(drName)
            if !internal.SetIntersects(dr.ParentRoles, effectiveRoles) {
                dctx.Skipped(nil, "No matching roles")
                continue
            }
 
            // evaluate variables of this derived roles set
            drVariables, err := evalCtx.evaluateVariables(dctx.StartVariables(), dr.OrderedVariables)
            if err != nil {
                // A derived-role error only skips this role; it does not fail the check.
                dctx.Skipped(err, "Error evaluating variables")
                continue
            }
 
            ok, err := evalCtx.satisfiesCondition(dctx.StartCondition(), dr.Condition, drVariables)
            if err != nil {
                dctx.Skipped(err, "Error evaluating condition")
                continue
            }
 
            if !ok {
                dctx.Skipped(nil, "Condition not satisfied")
                continue
            }
 
            // Record the role for this policy's rule matching and in the
            // overall result, which accumulates across policies.
            effectiveDerivedRoles[drName] = struct{}{}
            result.EffectiveDerivedRoles[drName] = struct{}{}
 
            dctx.Activated()
        }
 
        // Attach this policy's activated derived roles to the evaluation context
        // (read back below as evalCtx.effectiveDerivedRoles).
        evalCtx = evalCtx.withEffectiveDerivedRoles(effectiveDerivedRoles)
 
        // evaluate the variables of this policy
        variables, err := evalCtx.evaluateVariables(sctx.StartVariables(), p.OrderedVariables)
        if err != nil {
            sctx.Failed(err, "Failed to evaluate variables")
            return nil, fmt.Errorf("failed to evaluate variables: %w", err)
        }
 
        // evaluate each rule until all actions have a result
        for _, rule := range p.Rules {
            rctx := sctx.StartRule(rule.Name)
 
            // A rule applies only if the principal holds one of its roles or derived roles.
            if !internal.SetIntersects(rule.Roles, effectiveRoles) && !internal.SetIntersects(rule.DerivedRoles, evalCtx.effectiveDerivedRoles) {
                rctx.Skipped(nil, "No matching roles or derived roles")
                continue
            }
 
            ruleActivated := false
            // rule.Actions holds glob patterns, each filtered against the still-unresolved actions.
            for actionGlob := range rule.Actions {
                matchedActions := util.FilterGlob(actionGlob, actionsToResolve)
                for _, action := range matchedActions {
                    actx := rctx.StartAction(action)
                    ok, err := evalCtx.satisfiesCondition(actx.StartCondition(), rule.Condition, variables)
                    if err != nil {
                        actx.Skipped(err, "Error evaluating condition")
                        continue
                    }
 
                    if !ok {
                        actx.Skipped(nil, "Condition not satisfied")
                        continue
                    }
 
                    result.setEffect(action, EffectInfo{Effect: rule.Effect, Policy: policyKey, Scope: p.Scope})
                    actx.AppliedEffect(rule.Effect, "")
                    ruleActivated = true
                }
            }
 
            // evaluate output expression if the rule was activated
            if ruleActivated && rule.Output != nil {
                octx := rctx.StartOutput(rule.Name)
 
                output := &enginev1.OutputEntry{
                    Src: namer.RuleFQN(rpe.policy.Meta, p.Scope, rule.Name),
                    Val: evalCtx.evaluateProtobufValueCELExpr(rule.Output.Checked, variables),
                }
                result.Outputs = append(result.Outputs, output)
 
                octx.ComputedOutput(output)
            }
        }
    }
 
    // set the default effect for actions that were not matched
    // (resource policies default to DENY, attributed to this policy).
    result.setDefaultEffect(pctx, EffectInfo{Effect: effectv1.Effect_EFFECT_DENY, Policy: policyKey})
 
    return result, nil
}

说明

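Cerbos 内部 check 的处理还是比较复杂的,而且属于 Cerbos 的核心部分。整体调用链为:CheckResources → Engine.Check(按批量大小选择串行或并行)→ Engine.evaluate → buildEvaluationCtx(获取 principal 与 resource 策略的 evaluator)→ evaluationCtx.evaluate(依次执行各 evaluator 的 Evaluate 并合并结果,直到所有 action 都有结论)。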

参考资料

https://cerbos.dev/
https://cerbos.dev/product-cerbos-hub
https://github.com/cerbos/cerbos
https://github.com/google/cel-spec