Beltic logo
Python SDK Guide

Trust Chains

Verify agent-developer trust chains with policy enforcement using the Python SDK.

Trust chain verification ensures that an AgentCredential is linked to a valid DeveloperCredential and that both meet your policy requirements.

Trust Chain Concept

Agent Credential ──links to──▶ Developer Credential ──issued by──▶ Beltic
       │                              │
       ▼                              ▼
  Safety scores               KYB verification
  Tool capabilities           Sanctions screening
  Data handling               Risk rating

When verifying an agent, you should:

  1. Verify the AgentCredential signature
  2. Fetch and verify the linked DeveloperCredential
  3. Check both credentials against your policy
  4. Optionally check revocation status

Basic Trust Chain Verification

from beltic import verify_agent_trust_chain, TrustChainOptions

result = await verify_agent_trust_chain(
    agent_token,
    TrustChainOptions(
        key_resolver=key_resolver,
        fetch_developer_credential=fetch_developer_cred,
    ),
)

print(f"Agent: {result.agent.credential['agentName']}")
print(f"Developer: {result.developer.credential['legalName']}")

With Policy Enforcement

from beltic import verify_agent_trust_chain, TrustChainOptions, TrustPolicy

result = await verify_agent_trust_chain(
    agent_token,
    TrustChainOptions(
        key_resolver=key_resolver,
        fetch_developer_credential=fetch_developer_cred,
        
        # Optional: Check revocation status
        check_status=check_status_callback,
        
        # Required policies
        policy=TrustPolicy(
            min_kyb_tier="tier_2",
            min_prompt_injection_score=80,
            min_pii_leakage_score=80,
            min_verification_level="beltic_verified",
            prohibited_data_categories=["children_data"],
        ),
    ),
)

# Check policy results
print(f"KYB OK: {result.policy['kybOk']}")
print(f"Safety OK: {result.policy['safetyOk']}")
print(f"Data Categories OK: {result.policy['dataCategoriesOk']}")

TrustChainOptions

class TrustChainOptions:
    # Required: Resolve public keys from DIDs
    key_resolver: Callable[[KeyResolverInput], Awaitable[CryptoKey]]
    
    # Required: Fetch developer credential by ID
    fetch_developer_credential: Callable[[str], Awaitable[str]]
    
    # Optional: Check revocation status
    check_status: Callable[[StatusCheckInput], Awaitable[CredentialStatus]] | None = None
    
    # Optional: Policy requirements
    policy: TrustPolicy | None = None

TrustPolicy

class TrustPolicy:
    # Developer KYB tier requirement
    min_kyb_tier: Literal["tier_1", "tier_2", "tier_3"] | None = None
    
    # Safety score requirements (0-100)
    min_prompt_injection_score: int | None = None
    min_pii_leakage_score: int | None = None
    min_tool_abuse_score: int | None = None
    min_harm_refusal_score: int | None = None
    
    # Verification level requirement
    min_verification_level: Literal[
        "self_attested",
        "beltic_verified",
        "third_party_verified"
    ] | None = None
    
    # Data categories that must NOT be processed
    prohibited_data_categories: list[str] | None = None

Callback Functions

Key Resolver

async def key_resolver(input: KeyResolverInput) -> CryptoKey:
    """Resolve public key from DID or key registry."""
    # input.kid - Key ID (e.g., "did:web:beltic.com#key-1")
    # input.alg - Algorithm (e.g., "EdDSA")
    # input.iss - Issuer DID
    
    # Example: Fetch from DID document
    did_doc = await fetch_did_document(input.iss)
    key_entry = find_key_by_id(did_doc, input.kid)
    return import_public_key(key_entry["publicKeyJwk"], input.alg)

Fetch Developer Credential

async def fetch_developer_cred(developer_id: str) -> str:
    """Fetch developer credential JWT by ID."""
    # Fetch from your credential store or Beltic API
    response = await httpx.get(
        f"https://api.beltic.com/credentials/{developer_id}"
    )
    return response.json()["token"]

Check Status

from beltic import StatusCheckInput, CredentialStatus

async def check_status(input: StatusCheckInput) -> CredentialStatus:
    """Check credential revocation status."""
    # input.credential_id - Credential ID
    # input.revocation_list_url - Status list URL
    
    # Using Status List 2021
    from beltic import check_status_list_2021
    return await check_status_list_2021(
        input.revocation_list_url,
        input.credential_id
    )
    
    # Returns: "active", "revoked", "suspended", or "unknown"

TrustChainResult

class TrustChainResult:
    # Verified credentials
    agent: VerifiedCredential       # Agent credential and metadata
    developer: VerifiedCredential   # Developer credential and metadata
    
    # Status check results (if check_status was provided)
    status: dict[str, CredentialStatus]  # {"agent": "active", "developer": "active"}
    
    # Policy check results (if policy was provided)
    policy: dict[str, bool | list]
    # {
    #   "kybOk": True,
    #   "safetyOk": True,
    #   "verificationLevelOk": True,
    #   "dataCategoriesOk": True,
    #   "violations": []
    # }

Error Handling

TrustChainError

from beltic import verify_agent_trust_chain, TrustChainError

try:
    result = await verify_agent_trust_chain(token, options)
except TrustChainError as e:
    print(f"Step: {e.step}")
    # AGENT_VERIFY, DEVELOPER_FETCH, DEVELOPER_VERIFY, STATUS_CHECK, POLICY
    
    print(f"Message: {e.message}")
    
    # Partially verified credentials may be available
    if e.agent_credential:
        print(f"Agent: {e.agent_credential.get('agentName')}")
    if e.developer_credential:
        print(f"Developer: {e.developer_credential.get('legalName')}")

PolicyViolationError

from beltic import PolicyViolationError

try:
    result = await verify_agent_trust_chain(token, options)
except PolicyViolationError as e:
    print(f"Violations: {len(e.violations)}")
    
    for violation in e.violations:
        print(f"  Type: {violation.type}")  # KYB_TIER, SAFETY_SCORE, etc.
        print(f"  Required: {violation.required}")
        print(f"  Actual: {violation.actual}")
    
    # Check specific violation types
    if e.has_type("KYB_TIER"):
        print("KYB tier requirement not met")

Complete Example

import asyncio
import httpx
from beltic import (
    verify_agent_trust_chain,
    TrustChainOptions,
    TrustPolicy,
    TrustChainError,
    PolicyViolationError,
    import_public_key,
)

# Key cache for demo
KEY_CACHE = {}

async def key_resolver(input):
    """Resolve public key from Beltic's well-known keys."""
    if input.iss not in KEY_CACHE:
        async with httpx.AsyncClient() as client:
            resp = await client.get(f"{input.iss}/.well-known/jwks.json")
            jwks = resp.json()
            for key in jwks["keys"]:
                KEY_CACHE[key["kid"]] = key
    
    jwk = KEY_CACHE.get(input.kid)
    return import_public_key(jwk, input.alg)

async def fetch_developer_cred(developer_id: str) -> str:
    """Fetch developer credential from Beltic API."""
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"https://api.beltic.com/v1/credentials/developer/{developer_id}"
        )
        return resp.json()["token"]

async def main():
    agent_token = "eyJhbGciOiJFZERTQSIs..."
    
    try:
        result = await verify_agent_trust_chain(
            agent_token,
            TrustChainOptions(
                key_resolver=key_resolver,
                fetch_developer_credential=fetch_developer_cred,
                policy=TrustPolicy(
                    min_kyb_tier="tier_2",
                    min_prompt_injection_score=85,
                    min_pii_leakage_score=85,
                    prohibited_data_categories=["children_data", "biometric"],
                ),
            ),
        )
        
        print("✓ Trust chain verified")
        print(f"  Agent: {result.agent.credential['agentName']}")
        print(f"  Developer: {result.developer.credential['legalName']}")
        print(f"  KYB Tier: {result.developer.credential['kybTier']}")
        print(f"  Prompt Injection Score: {result.agent.credential['promptInjectionRobustnessScore']}")
        
    except PolicyViolationError as e:
        print("✗ Policy violations:")
        for v in e.violations:
            print(f"  - {v.type}: required {v.required}, got {v.actual}")
            
    except TrustChainError as e:
        print(f"✗ Trust chain error at step {e.step}: {e.message}")

asyncio.run(main())

Status List 2021

For checking credential revocation:

from beltic import (
    decode_status_list,
    is_bit_set,
    parse_status_entry,
    StatusListCache,
)

# Parse status entry from credential
entry = parse_status_entry(credential.get("credentialStatus"))

if entry:
    # Fetch and decode status list
    async with httpx.AsyncClient() as client:
        response = await client.get(entry.status_list_credential)
        status_list_cred = response.json()
    
    # Decode and check
    bitstring = decode_status_list(
        status_list_cred["credentialSubject"]["encodedList"]
    )
    index = int(entry.status_list_index)
    is_revoked = is_bit_set(bitstring, index)
    
    print(f"Revoked: {is_revoked}")

# Use cache for efficiency
cache = StatusListCache(ttl_ms=5 * 60 * 1000)  # 5 minutes

Next Steps