Python SDK Guide
Trust Chains
Verify agent-developer trust chains with policy enforcement using the Python SDK.
Trust chain verification ensures that an AgentCredential is linked to a valid DeveloperCredential and that both meet your policy requirements.
Trust Chain Concept
Agent Credential ──links to──▶ Developer Credential ──issued by──▶ Beltic
       │                               │
       ▼                               ▼
 Safety scores                  KYB verification
 Tool capabilities              Sanctions screening
 Data handling                  Risk rating

When verifying an agent, you should:
- Verify the AgentCredential signature
- Fetch and verify the linked DeveloperCredential
- Check both credentials against your policy
- Optionally check revocation status
Basic Trust Chain Verification
from beltic import verify_agent_trust_chain, TrustChainOptions
result = await verify_agent_trust_chain(
agent_token,
TrustChainOptions(
key_resolver=key_resolver,
fetch_developer_credential=fetch_developer_cred,
),
)
print(f"Agent: {result.agent.credential['agentName']}")
print(f"Developer: {result.developer.credential['legalName']}")With Policy Enforcement
from beltic import verify_agent_trust_chain, TrustChainOptions, TrustPolicy
result = await verify_agent_trust_chain(
agent_token,
TrustChainOptions(
key_resolver=key_resolver,
fetch_developer_credential=fetch_developer_cred,
# Optional: Check revocation status
check_status=check_status_callback,
# Required policies
policy=TrustPolicy(
min_kyb_tier="tier_2",
min_prompt_injection_score=80,
min_pii_leakage_score=80,
min_verification_level="beltic_verified",
prohibited_data_categories=["children_data"],
),
),
)
# Check policy results
print(f"KYB OK: {result.policy['kybOk']}")
print(f"Safety OK: {result.policy['safetyOk']}")
print(f"Data Categories OK: {result.policy['dataCategoriesOk']}")TrustChainOptions
class TrustChainOptions:
    """Options passed as the second argument to ``verify_agent_trust_chain``.

    The two resolver callbacks have no default; ``check_status`` and
    ``policy`` default to ``None``.
    """

    # Required: Resolve public keys from DIDs
    key_resolver: Callable[[KeyResolverInput], Awaitable[CryptoKey]]
    # Required: Fetch developer credential by ID
    # (async callback taking the ID string and returning a string)
    fetch_developer_credential: Callable[[str], Awaitable[str]]
    # Optional: Check revocation status
    check_status: Callable[[StatusCheckInput], Awaitable[CredentialStatus]] | None = None
    # Optional: Policy requirements
    policy: TrustPolicy | None = None

TrustPolicy
class TrustPolicy:
    """Policy requirements supplied through ``TrustChainOptions.policy``.

    Every field defaults to ``None``.
    NOTE(review): presumably a field left as ``None`` is not enforced —
    confirm against the SDK.
    """

    # Developer KYB tier requirement
    min_kyb_tier: Literal["tier_1", "tier_2", "tier_3"] | None = None
    # Safety score requirements (0-100)
    min_prompt_injection_score: int | None = None
    min_pii_leakage_score: int | None = None
    min_tool_abuse_score: int | None = None
    min_harm_refusal_score: int | None = None
    # Verification level requirement
    min_verification_level: Literal[
        "self_attested",
        "beltic_verified",
        "third_party_verified"
    ] | None = None
    # Data categories that must NOT be processed
    prohibited_data_categories: list[str] | None = None

Callback Functions
Key Resolver
async def key_resolver(input: KeyResolverInput) -> CryptoKey:
"""Resolve public key from DID or key registry."""
# input.kid - Key ID (e.g., "did:web:beltic.com#key-1")
# input.alg - Algorithm (e.g., "EdDSA")
# input.iss - Issuer DID
# Example: Fetch from DID document
did_doc = await fetch_did_document(input.iss)
key_entry = find_key_by_id(did_doc, input.kid)
return import_public_key(key_entry["publicKeyJwk"], input.alg)Fetch Developer Credential
async def fetch_developer_cred(developer_id: str) -> str:
"""Fetch developer credential JWT by ID."""
# Fetch from your credential store or Beltic API
response = await httpx.get(
f"https://api.beltic.com/credentials/{developer_id}"
)
return response.json()["token"]Check Status
from beltic import StatusCheckInput, CredentialStatus
async def check_status(input: StatusCheckInput) -> CredentialStatus:
    """Check a credential's revocation status using Status List 2021.

    Fields read from ``input``:
        credential_id: Credential ID
        revocation_list_url: Status list URL

    Returns:
        "active", "revoked", "suspended", or "unknown"
    """
    from beltic import check_status_list_2021

    list_url = input.revocation_list_url
    credential_id = input.credential_id
    return await check_status_list_2021(list_url, credential_id)

TrustChainResult
class TrustChainResult:
    """Value produced by awaiting ``verify_agent_trust_chain``."""

    # Verified credentials
    agent: VerifiedCredential  # Agent credential and metadata
    developer: VerifiedCredential  # Developer credential and metadata
    # Status check results (if check_status was provided)
    # NOTE(review): the annotation is not optional; what this holds when no
    # check_status callback is given is not shown here — confirm.
    status: dict[str, CredentialStatus]  # {"agent": "active", "developer": "active"}
    # Policy check results (if policy was provided)
    policy: dict[str, bool | list]
    # {
    #   "kybOk": True,
    #   "safetyOk": True,
    #   "verificationLevelOk": True,
    #   "dataCategoriesOk": True,
    #   "violations": []
    # }

Error Handling
TrustChainError
from beltic import verify_agent_trust_chain, TrustChainError
try:
result = await verify_agent_trust_chain(token, options)
except TrustChainError as e:
print(f"Step: {e.step}")
# AGENT_VERIFY, DEVELOPER_FETCH, DEVELOPER_VERIFY, STATUS_CHECK, POLICY
print(f"Message: {e.message}")
# Partially verified credentials may be available
if e.agent_credential:
print(f"Agent: {e.agent_credential.get('agentName')}")
if e.developer_credential:
print(f"Developer: {e.developer_credential.get('legalName')}")PolicyViolationError
from beltic import PolicyViolationError
try:
result = await verify_agent_trust_chain(token, options)
except PolicyViolationError as e:
print(f"Violations: {len(e.violations)}")
for violation in e.violations:
print(f" Type: {violation.type}") # KYB_TIER, SAFETY_SCORE, etc.
print(f" Required: {violation.required}")
print(f" Actual: {violation.actual}")
# Check specific violation types
if e.has_type("KYB_TIER"):
print("KYB tier requirement not met")Complete Example
import asyncio
import httpx
from beltic import (
verify_agent_trust_chain,
TrustChainOptions,
TrustPolicy,
TrustChainError,
PolicyViolationError,
import_public_key,
)
# Key cache for demo
KEY_CACHE = {}
async def key_resolver(input):
    """Resolve public key from Beltic's well-known keys.

    Keys are stored in the module-level ``KEY_CACHE`` by key ID, so the
    issuer's JWKS is downloaded only when ``input.kid`` is not cached yet.

    Args:
        input: Resolver input; ``iss``, ``kid`` and ``alg`` are read from it.

    Returns:
        Whatever ``import_public_key`` returns for the matching JWK.

    Raises:
        httpx.HTTPStatusError: If the JWKS request returns a 4xx/5xx status.
        LookupError: If the issuer's JWKS contains no key with ID ``input.kid``.
    """
    # The cache is keyed by ``kid``, so the miss test must use ``kid`` as well;
    # testing ``iss`` never matches and re-downloads the JWKS on every call.
    if input.kid not in KEY_CACHE:
        # NOTE(review): the URL is built by appending to ``input.iss``, which
        # assumes the issuer is an http(s) origin; a ``did:web:...`` issuer
        # would need converting to a URL first — confirm against the SDK.
        async with httpx.AsyncClient() as client:
            resp = await client.get(f"{input.iss}/.well-known/jwks.json")
        resp.raise_for_status()
        KEY_CACHE.update({key["kid"]: key for key in resp.json()["keys"]})

    jwk = KEY_CACHE.get(input.kid)
    if jwk is None:
        # Fail explicitly rather than handing ``None`` to import_public_key.
        raise LookupError(
            f"No key {input.kid!r} found in JWKS of issuer {input.iss!r}"
        )
    return import_public_key(jwk, input.alg)
async def fetch_developer_cred(developer_id: str) -> str:
    """Fetch developer credential from Beltic API.

    Args:
        developer_id: ID of the developer credential to fetch.

    Returns:
        The ``"token"`` field of the JSON response body.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"https://api.beltic.com/v1/credentials/developer/{developer_id}"
        )
    # Report HTTP failures as such instead of as a KeyError on "token".
    resp.raise_for_status()
    return resp.json()["token"]
async def main():
    """Verify a sample agent token under a trust policy and print the outcome."""
    agent_token = "eyJhbGciOiJFZERTQSIs..."

    try:
        policy = TrustPolicy(
            min_kyb_tier="tier_2",
            min_prompt_injection_score=85,
            min_pii_leakage_score=85,
            prohibited_data_categories=["children_data", "biometric"],
        )
        options = TrustChainOptions(
            key_resolver=key_resolver,
            fetch_developer_credential=fetch_developer_cred,
            policy=policy,
        )
        result = await verify_agent_trust_chain(agent_token, options)

        agent_cred = result.agent.credential
        developer_cred = result.developer.credential
        print("✓ Trust chain verified")
        print(f"  Agent: {agent_cred['agentName']}")
        print(f"  Developer: {developer_cred['legalName']}")
        print(f"  KYB Tier: {developer_cred['kybTier']}")
        print(f"  Prompt Injection Score: {agent_cred['promptInjectionRobustnessScore']}")
    except PolicyViolationError as policy_err:
        # Handled before TrustChainError, as in the original ordering.
        print("✗ Policy violations:")
        for item in policy_err.violations:
            print(f"  - {item.type}: required {item.required}, got {item.actual}")
    except TrustChainError as chain_err:
        print(f"✗ Trust chain error at step {chain_err.step}: {chain_err.message}")
asyncio.run(main())

Status List 2021
For checking credential revocation:
from beltic import (
decode_status_list,
is_bit_set,
parse_status_entry,
StatusListCache,
)
# Parse status entry from credential
# (`credential` is expected to be a mapping with an optional
# "credentialStatus" key; it is not defined in this snippet)
entry = parse_status_entry(credential.get("credentialStatus"))
# The remaining steps only run when a status entry was parsed.
if entry:
    # Fetch and decode status list
    # NOTE(review): httpx is used here but not imported in this snippet.
    async with httpx.AsyncClient() as client:
        response = await client.get(entry.status_list_credential)
    status_list_cred = response.json()
    # Decode and check
    bitstring = decode_status_list(
        status_list_cred["credentialSubject"]["encodedList"]
    )
    # The index is converted with int() before the bit lookup.
    index = int(entry.status_list_index)
    is_revoked = is_bit_set(bitstring, index)
    print(f"Revoked: {is_revoked}")
# Use cache for efficiency
# NOTE(review): the cache is only constructed here; how it is passed to the
# status check is not shown in this snippet.
cache = StatusListCache(ttl_ms=5 * 60 * 1000)  # 5 minutes