Beltic logo
Python SDK Guide

API Reference

Complete Python SDK API reference with types, classes, and function signatures.

Complete reference for all types, classes, and functions in the Beltic Python SDK.

Validation

validate_developer_credential

Validate a DeveloperCredential against the v1 JSON schema.

def validate_developer_credential(
    input: dict
) -> ValidationResult[DeveloperCredential]

validate_agent_credential

Validate an AgentCredential against the v1 JSON schema.

def validate_agent_credential(
    input: dict
) -> ValidationResult[AgentCredential]

is_developer_credential

Type guard to check if a credential is a DeveloperCredential.

def is_developer_credential(credential: dict) -> bool

is_agent_credential

Type guard to check if a credential is an AgentCredential.

def is_agent_credential(credential: dict) -> bool

Signing

sign_credential

Sign a credential as a JWS token.

def sign_credential(
    credential: dict,
    private_key: CryptoKey,
    options: SignOptions
) -> str

generate_key_pair

Generate a new Ed25519 or ES256 key pair.

def generate_key_pair(
    algorithm: Literal["EdDSA", "ES256"]
) -> tuple[CryptoKey, CryptoKey]

import_key_from_pem

Import a key from PEM format.

def import_key_from_pem(
    pem: bytes,
    algorithm: Literal["EdDSA", "ES256"],
    private: bool = False
) -> CryptoKey

export_key_to_pem

Export a key to PEM format.

def export_key_to_pem(
    key: CryptoKey,
    private: bool = False
) -> bytes

export_public_key

Export a public key as JWK.

def export_public_key(key: CryptoKey) -> dict

import_public_key

Import a public key from JWK.

def import_public_key(
    jwk: dict,
    algorithm: Literal["EdDSA", "ES256"]
) -> CryptoKey

Verification

verify_credential

Verify a JWS token's signature and claims.

async def verify_credential(
    token: str,
    options: VerifyOptions
) -> VerifiedCredential

decode_token

Decode a JWT without verification (for debugging).

def decode_token(token: str) -> DecodedToken

get_credential_type

Get the type of credential from a token.

def get_credential_type(
    token: str
) -> Literal["developer", "agent"] | None

Trust Chain

verify_agent_trust_chain

Verify complete agent trust chain with policies.

async def verify_agent_trust_chain(
    agent_token: str,
    options: TrustChainOptions
) -> TrustChainResult

Status List

decode_status_list

Decode a compressed Status List 2021 bitstring.

def decode_status_list(encoded_list: str) -> bytes

is_bit_set

Check if a bit is set in a bitstring.

def is_bit_set(bitstring: bytes, index: int) -> bool

check_status_list_2021

Check credential status using Status List 2021.

async def check_status_list_2021(
    status_list_url: str,
    status_list_index: int
) -> CredentialStatus

parse_status_entry

Parse a credentialStatus object from a credential.

def parse_status_entry(
    credential_status: dict | None
) -> StatusEntry | None

StatusListCache

Cache for Status List 2021 fetches.

class StatusListCache:
    def __init__(self, ttl_ms: int = 300000): ...
    async def fetch(self, url: str) -> bytes: ...
    def clear(self) -> None: ...

HTTP Signing (Web Bot Auth)

sign_http_request

Sign an HTTP request per RFC 9421.

async def sign_http_request(
    request: HttpRequestInfo,
    options: HttpSignOptions
) -> HttpSignatureHeaders

create_signed_request

Create a fully signed HTTP request.

async def create_signed_request(
    request: HttpRequestInfo,
    options: HttpSignOptions
) -> dict

compute_jwk_thumbprint

Compute JWK thumbprint (RFC 7638).

def compute_jwk_thumbprint(jwk: dict) -> str

compute_content_digest

Compute Content-Digest header value.

def compute_content_digest(body: bytes) -> str

generate_nonce

Generate a cryptographic nonce.

def generate_nonce() -> str

HTTP Verification

verify_http_signature

Verify HTTP message signature.

async def verify_http_signature(
    request: IncomingHttpRequest,
    options: HttpVerifyOptions
) -> HttpVerificationResult

fetch_key_directory

Fetch a key directory from URL.

async def fetch_key_directory(url: str) -> KeyDirectory

find_key_by_thumbprint

Find a key in a directory by thumbprint.

def find_key_by_thumbprint(
    directory: KeyDirectory,
    thumbprint: str
) -> dict | None

create_default_key_resolver

Create a default key resolver for HTTP verification.

def create_default_key_resolver() -> Callable

verify_content_digest

Verify Content-Digest header.

def verify_content_digest(body: bytes, header: str) -> bool

Key Directory

generate_key_directory

Generate a key directory JSON structure.

def generate_key_directory(
    options: GenerateDirectoryOptions
) -> dict

sign_directory_response

Sign a key directory response.

async def sign_directory_response(
    directory: dict,
    options: SignDirectoryOptions
) -> SignedDirectoryResponse

generate_web_bot_auth_setup

Generate complete Web Bot Auth setup.

async def generate_web_bot_auth_setup(
    base_url: str | None = None
) -> dict

Replay Protection

InMemoryJtiStore

In-memory JTI store for replay protection.

class InMemoryJtiStore:
    def __init__(self, ttl_seconds: int = 300): ...
    async def has(self, jti: str) -> bool: ...
    async def add(self, jti: str, exp: int) -> None: ...
    async def cleanup(self) -> None: ...

ReplayDetectedError

Error raised when replay is detected.

class ReplayDetectedError(BelticError):
    jti: str
    first_seen: datetime

Content Integrity

compute_content_hash

Compute hash of content.

def compute_content_hash(
    content: bytes,
    algorithm: ContentHashAlgorithm = "sha-256"
) -> bytes

verify_content_hash

Verify content against hash.

def verify_content_hash(
    content: bytes,
    hash: bytes,
    algorithm: ContentHashAlgorithm = "sha-256"
) -> bool

create_hash_claim

Create a hash claim for embedding in credentials.

def create_hash_claim(
    content: bytes,
    algorithm: ContentHashAlgorithm = "sha-256"
) -> dict

verify_hash_claim

Verify a hash claim.

def verify_hash_claim(content: bytes, claim: dict) -> bool

Hash Chains

CredentialChain

Credential hash chain for auditing.

class CredentialChain:
    def __init__(self): ...
    def add(self, credential: dict, token: str) -> ChainEntry: ...
    def get_proof(self) -> ChainProof: ...
    def verify(self) -> bool: ...
    def __len__(self) -> int: ...

compute_chain_hash

Compute hash for chain entry.

def compute_chain_hash(
    credential_hash: bytes,
    previous_hash: bytes | None
) -> bytes

verify_chain_integrity

Verify chain integrity.

def verify_chain_integrity(entries: list[ChainEntry]) -> bool

Multi-Signature

MultiSigCredential

Credential with multiple signatures.

class MultiSigCredential:
    def __init__(self, credential: dict): ...
    async def add_signature(
        self,
        private_key: CryptoKey,
        signer_id: str,
        options: SignOptions
    ) -> None: ...
    def get_signatures(self) -> list[SignatureEntry]: ...
    async def verify_all(
        self,
        key_resolver: MultiSigKeyResolver
    ) -> MultiSigVerificationResult: ...

add_signature

Add signature to multi-sig credential.

async def add_signature(
    credential: MultiSigCredential,
    private_key: CryptoKey,
    signer_id: str,
    options: SignOptions
) -> None

verify_multi_signatures

Verify all signatures on a multi-sig credential.

async def verify_multi_signatures(
    credential: MultiSigCredential,
    key_resolver: MultiSigKeyResolver
) -> MultiSigVerificationResult

Audit Logging

AuditLogger

Audit logger with pluggable handlers.

class AuditLogger:
    def __init__(self): ...
    def add_handler(self, handler: AuditHandler) -> None: ...
    def log(self, event: AuditEvent) -> None: ...

AuditEvent

Audit event structure.

@dataclass
class AuditEvent:
    type: AuditEventType
    timestamp: datetime
    credential_id: str | None
    action: str
    actor: str | None
    details: dict

AuditEventType

Audit event types.

class AuditEventType(Enum):
    CREDENTIAL_ISSUED = "credential_issued"
    CREDENTIAL_VERIFIED = "credential_verified"
    CREDENTIAL_REVOKED = "credential_revoked"
    SIGNATURE_CREATED = "signature_created"
    SIGNATURE_VERIFIED = "signature_verified"
    TRUST_CHAIN_VERIFIED = "trust_chain_verified"
    POLICY_VIOLATION = "policy_violation"

ConsoleAuditHandler

Log audit events to console.

class ConsoleAuditHandler(AuditHandler):
    def handle(self, event: AuditEvent) -> None: ...

FileAuditHandler

Log audit events to file.

class FileAuditHandler(AuditHandler):
    def __init__(self, path: str): ...
    def handle(self, event: AuditEvent) -> None: ...

InMemoryAuditHandler

Store audit events in memory.

class InMemoryAuditHandler(AuditHandler):
    events: list[AuditEvent]
    def handle(self, event: AuditEvent) -> None: ...
    def clear(self) -> None: ...

Cloud Signers

LocalSigner

Local key signer.

class LocalSigner(CloudSigner):
    def __init__(
        self,
        private_key: CryptoKey,
        algorithm: Literal["EdDSA", "ES256"]
    ): ...
    async def sign(self, data: bytes) -> bytes: ...
    def get_info(self) -> SignerInfo: ...

CloudSigner

Abstract base class for cloud signers.

class CloudSigner(ABC):
    @abstractmethod
    async def sign(self, data: bytes) -> bytes: ...
    @abstractmethod
    def get_info(self) -> SignerInfo: ...

Selective Disclosure (SD-JWT)

create_sd_jwt

Create a selective disclosure JWT.

async def create_sd_jwt(
    credential: dict,
    private_key: CryptoKey,
    options: CreateSdJwtOptions
) -> SDJwt

create_presentation

Create a presentation from SD-JWT with selected disclosures.

def create_presentation(
    sd_jwt: SDJwt,
    disclosed_paths: list[str]
) -> str

verify_sd_jwt

Verify an SD-JWT presentation.

async def verify_sd_jwt(
    presentation: str,
    options: VerifySdJwtOptions
) -> SdJwtVerificationResult

create_disclosure

Create a disclosure for a claim.

def create_disclosure(
    salt: str,
    claim_name: str,
    claim_value: Any
) -> Disclosure

decode_disclosure

Decode a disclosure from base64url.

def decode_disclosure(encoded: str) -> Disclosure

Error Classes

BelticError

Base error class.

class BelticError(Exception):
    code: str
    message: str

ValidationError

Validation error with detailed issues.

class ValidationError(BelticError):
    issues: list[ValidationErrorDetail]
    
    def format(self) -> str: ...
    def by_path(self) -> dict[str, list[ValidationErrorDetail]]: ...

SignatureError

Signature verification error.

class SignatureError(BelticError):
    step: Literal["PARSE", "KEY_RESOLUTION", "SIGNATURE", "CLAIMS", "SCHEMA"]
    code: str

TrustChainError

Trust chain verification error.

class TrustChainError(BelticError):
    step: str
    agent_credential: dict | None
    developer_credential: dict | None

PolicyViolationError

Policy violation error.

class PolicyViolationError(BelticError):
    violations: list[PolicyViolation]
    
    def has_type(self, violation_type: str) -> bool: ...

Types

SignOptions

@dataclass
class SignOptions:
    alg: Literal["EdDSA", "ES256"]
    issuer_did: str
    subject_did: str
    key_id: str | None = None
    audience: str | None = None

VerifyOptions

@dataclass
class VerifyOptions:
    key_resolver: Callable[[KeyResolverInput], Awaitable[CryptoKey]]
    expected_issuer: str | None = None
    expected_audience: str | None = None
    allowed_algorithms: list[str] | None = None

TrustChainOptions

@dataclass
class TrustChainOptions:
    key_resolver: Callable[[KeyResolverInput], Awaitable[CryptoKey]]
    fetch_developer_credential: Callable[[str], Awaitable[str]]
    check_status: Callable[[StatusCheckInput], Awaitable[CredentialStatus]] | None = None
    policy: TrustPolicy | None = None

TrustPolicy

@dataclass
class TrustPolicy:
    min_kyb_tier: str | None = None
    min_prompt_injection_score: int | None = None
    min_pii_leakage_score: int | None = None
    min_tool_abuse_score: int | None = None
    min_harm_refusal_score: int | None = None
    min_verification_level: str | None = None
    prohibited_data_categories: list[str] | None = None

HttpRequestInfo

@dataclass
class HttpRequestInfo:
    method: str
    url: str
    headers: dict[str, str] = field(default_factory=dict)
    body: bytes | None = None

HttpSignOptions

@dataclass
class HttpSignOptions:
    private_key: CryptoKey
    key_id: str
    key_directory_url: str
    components: list[str] | None = None
    expires_in: int = 60

See Also