API Reference
Complete Python SDK API reference with types, classes, and function signatures.
Complete reference for all types, classes, and functions in the Beltic Python SDK.
Validation
validate_developer_credential
Validate a DeveloperCredential against the v1 JSON schema.
def validate_developer_credential(
input: dict
) -> ValidationResult[DeveloperCredential]validate_agent_credential
Validate an AgentCredential against the v1 JSON schema.
def validate_agent_credential(
input: dict
) -> ValidationResult[AgentCredential]is_developer_credential
Type guard to check if a credential is a DeveloperCredential.
def is_developer_credential(credential: dict) -> boolis_agent_credential
Type guard to check if a credential is an AgentCredential.
def is_agent_credential(credential: dict) -> boolSigning
sign_credential
Sign a credential as a JWS token.
def sign_credential(
credential: dict,
private_key: CryptoKey,
options: SignOptions
) -> strgenerate_key_pair
Generate a new Ed25519 or ES256 key pair.
def generate_key_pair(
algorithm: Literal["EdDSA", "ES256"]
) -> tuple[CryptoKey, CryptoKey]import_key_from_pem
Import a key from PEM format.
def import_key_from_pem(
pem: bytes,
algorithm: Literal["EdDSA", "ES256"],
private: bool = False
) -> CryptoKeyexport_key_to_pem
Export a key to PEM format.
def export_key_to_pem(
key: CryptoKey,
private: bool = False
) -> bytesexport_public_key
Export a public key as JWK.
def export_public_key(key: CryptoKey) -> dictimport_public_key
Import a public key from JWK.
def import_public_key(
jwk: dict,
algorithm: Literal["EdDSA", "ES256"]
) -> CryptoKeyVerification
verify_credential
Verify a JWS token's signature and claims.
async def verify_credential(
token: str,
options: VerifyOptions
) -> VerifiedCredentialdecode_token
Decode a JWT without verification (for debugging).
def decode_token(token: str) -> DecodedTokenget_credential_type
Get the type of credential from a token.
def get_credential_type(
token: str
) -> Literal["developer", "agent"] | NoneTrust Chain
verify_agent_trust_chain
Verify complete agent trust chain with policies.
async def verify_agent_trust_chain(
agent_token: str,
options: TrustChainOptions
) -> TrustChainResultStatus List
decode_status_list
Decode a compressed Status List 2021 bitstring.
def decode_status_list(encoded_list: str) -> bytesis_bit_set
Check if a bit is set in a bitstring.
def is_bit_set(bitstring: bytes, index: int) -> boolcheck_status_list_2021
Check credential status using Status List 2021.
async def check_status_list_2021(
status_list_url: str,
status_list_index: int
) -> CredentialStatusparse_status_entry
Parse a credentialStatus object from a credential.
def parse_status_entry(
credential_status: dict | None
) -> StatusEntry | NoneStatusListCache
Cache for Status List 2021 fetches.
class StatusListCache:
def __init__(self, ttl_ms: int = 300000): ...
async def fetch(self, url: str) -> bytes: ...
def clear(self) -> None: ...HTTP Signing (Web Bot Auth)
sign_http_request
Sign an HTTP request per RFC 9421.
async def sign_http_request(
request: HttpRequestInfo,
options: HttpSignOptions
) -> HttpSignatureHeaderscreate_signed_request
Create a fully signed HTTP request.
async def create_signed_request(
request: HttpRequestInfo,
options: HttpSignOptions
) -> dictcompute_jwk_thumbprint
Compute JWK thumbprint (RFC 7638).
def compute_jwk_thumbprint(jwk: dict) -> strcompute_content_digest
Compute Content-Digest header value.
def compute_content_digest(body: bytes) -> strgenerate_nonce
Generate a cryptographic nonce.
def generate_nonce() -> strHTTP Verification
verify_http_signature
Verify HTTP message signature.
async def verify_http_signature(
request: IncomingHttpRequest,
options: HttpVerifyOptions
) -> HttpVerificationResultfetch_key_directory
Fetch a key directory from URL.
async def fetch_key_directory(url: str) -> KeyDirectoryfind_key_by_thumbprint
Find a key in a directory by thumbprint.
def find_key_by_thumbprint(
directory: KeyDirectory,
thumbprint: str
) -> dict | Nonecreate_default_key_resolver
Create a default key resolver for HTTP verification.
def create_default_key_resolver() -> Callableverify_content_digest
Verify Content-Digest header.
def verify_content_digest(body: bytes, header: str) -> boolKey Directory
generate_key_directory
Generate a key directory JSON structure.
def generate_key_directory(
options: GenerateDirectoryOptions
) -> dictsign_directory_response
Sign a key directory response.
async def sign_directory_response(
directory: dict,
options: SignDirectoryOptions
) -> SignedDirectoryResponsegenerate_web_bot_auth_setup
Generate complete Web Bot Auth setup.
async def generate_web_bot_auth_setup(
base_url: str | None = None
) -> dictReplay Protection
InMemoryJtiStore
In-memory JTI store for replay protection.
class InMemoryJtiStore:
def __init__(self, ttl_seconds: int = 300): ...
async def has(self, jti: str) -> bool: ...
async def add(self, jti: str, exp: int) -> None: ...
async def cleanup(self) -> None: ...ReplayDetectedError
Error raised when replay is detected.
class ReplayDetectedError(BelticError):
jti: str
first_seen: datetimeContent Integrity
compute_content_hash
Compute hash of content.
def compute_content_hash(
content: bytes,
algorithm: ContentHashAlgorithm = "sha-256"
) -> bytesverify_content_hash
Verify content against hash.
def verify_content_hash(
content: bytes,
hash: bytes,
algorithm: ContentHashAlgorithm = "sha-256"
) -> boolcreate_hash_claim
Create a hash claim for embedding in credentials.
def create_hash_claim(
content: bytes,
algorithm: ContentHashAlgorithm = "sha-256"
) -> dictverify_hash_claim
Verify a hash claim.
def verify_hash_claim(content: bytes, claim: dict) -> boolHash Chains
CredentialChain
Credential hash chain for auditing.
class CredentialChain:
def __init__(self): ...
def add(self, credential: dict, token: str) -> ChainEntry: ...
def get_proof(self) -> ChainProof: ...
def verify(self) -> bool: ...
def __len__(self) -> int: ...compute_chain_hash
Compute hash for chain entry.
def compute_chain_hash(
credential_hash: bytes,
previous_hash: bytes | None
) -> bytesverify_chain_integrity
Verify chain integrity.
def verify_chain_integrity(entries: list[ChainEntry]) -> boolMulti-Signature
MultiSigCredential
Credential with multiple signatures.
class MultiSigCredential:
def __init__(self, credential: dict): ...
async def add_signature(
self,
private_key: CryptoKey,
signer_id: str,
options: SignOptions
) -> None: ...
def get_signatures(self) -> list[SignatureEntry]: ...
async def verify_all(
self,
key_resolver: MultiSigKeyResolver
) -> MultiSigVerificationResult: ...add_signature
Add signature to multi-sig credential.
async def add_signature(
credential: MultiSigCredential,
private_key: CryptoKey,
signer_id: str,
options: SignOptions
) -> Noneverify_multi_signatures
Verify all signatures on a multi-sig credential.
async def verify_multi_signatures(
credential: MultiSigCredential,
key_resolver: MultiSigKeyResolver
) -> MultiSigVerificationResultAudit Logging
AuditLogger
Audit logger with pluggable handlers.
class AuditLogger:
def __init__(self): ...
def add_handler(self, handler: AuditHandler) -> None: ...
def log(self, event: AuditEvent) -> None: ...AuditEvent
Audit event structure.
@dataclass
class AuditEvent:
type: AuditEventType
timestamp: datetime
credential_id: str | None
action: str
actor: str | None
details: dictAuditEventType
Audit event types.
class AuditEventType(Enum):
CREDENTIAL_ISSUED = "credential_issued"
CREDENTIAL_VERIFIED = "credential_verified"
CREDENTIAL_REVOKED = "credential_revoked"
SIGNATURE_CREATED = "signature_created"
SIGNATURE_VERIFIED = "signature_verified"
TRUST_CHAIN_VERIFIED = "trust_chain_verified"
POLICY_VIOLATION = "policy_violation"ConsoleAuditHandler
Log audit events to console.
class ConsoleAuditHandler(AuditHandler):
def handle(self, event: AuditEvent) -> None: ...FileAuditHandler
Log audit events to file.
class FileAuditHandler(AuditHandler):
def __init__(self, path: str): ...
def handle(self, event: AuditEvent) -> None: ...InMemoryAuditHandler
Store audit events in memory.
class InMemoryAuditHandler(AuditHandler):
events: list[AuditEvent]
def handle(self, event: AuditEvent) -> None: ...
def clear(self) -> None: ...Cloud Signers
LocalSigner
Local key signer.
class LocalSigner(CloudSigner):
def __init__(
self,
private_key: CryptoKey,
algorithm: Literal["EdDSA", "ES256"]
): ...
async def sign(self, data: bytes) -> bytes: ...
def get_info(self) -> SignerInfo: ...CloudSigner
Abstract base class for cloud signers.
class CloudSigner(ABC):
@abstractmethod
async def sign(self, data: bytes) -> bytes: ...
@abstractmethod
def get_info(self) -> SignerInfo: ...Selective Disclosure (SD-JWT)
create_sd_jwt
Create a selective disclosure JWT.
async def create_sd_jwt(
credential: dict,
private_key: CryptoKey,
options: CreateSdJwtOptions
) -> SDJwtcreate_presentation
Create a presentation from SD-JWT with selected disclosures.
def create_presentation(
sd_jwt: SDJwt,
disclosed_paths: list[str]
) -> strverify_sd_jwt
Verify an SD-JWT presentation.
async def verify_sd_jwt(
presentation: str,
options: VerifySdJwtOptions
) -> SdJwtVerificationResultcreate_disclosure
Create a disclosure for a claim.
def create_disclosure(
salt: str,
claim_name: str,
claim_value: Any
) -> Disclosuredecode_disclosure
Decode a disclosure from base64url.
def decode_disclosure(encoded: str) -> DisclosureError Classes
BelticError
Base error class.
class BelticError(Exception):
code: str
message: strValidationError
Validation error with detailed issues.
class ValidationError(BelticError):
issues: list[ValidationErrorDetail]
def format(self) -> str: ...
def by_path(self) -> dict[str, list[ValidationErrorDetail]]: ...SignatureError
Signature verification error.
class SignatureError(BelticError):
step: Literal["PARSE", "KEY_RESOLUTION", "SIGNATURE", "CLAIMS", "SCHEMA"]
code: strTrustChainError
Trust chain verification error.
class TrustChainError(BelticError):
step: str
agent_credential: dict | None
developer_credential: dict | NonePolicyViolationError
Policy violation error.
class PolicyViolationError(BelticError):
violations: list[PolicyViolation]
def has_type(self, violation_type: str) -> bool: ...Types
SignOptions
@dataclass
class SignOptions:
alg: Literal["EdDSA", "ES256"]
issuer_did: str
subject_did: str
key_id: str | None = None
audience: str | None = NoneVerifyOptions
@dataclass
class VerifyOptions:
key_resolver: Callable[[KeyResolverInput], Awaitable[CryptoKey]]
expected_issuer: str | None = None
expected_audience: str | None = None
allowed_algorithms: list[str] | None = NoneTrustChainOptions
@dataclass
class TrustChainOptions:
key_resolver: Callable[[KeyResolverInput], Awaitable[CryptoKey]]
fetch_developer_credential: Callable[[str], Awaitable[str]]
check_status: Callable[[StatusCheckInput], Awaitable[CredentialStatus]] | None = None
policy: TrustPolicy | None = NoneTrustPolicy
@dataclass
class TrustPolicy:
min_kyb_tier: str | None = None
min_prompt_injection_score: int | None = None
min_pii_leakage_score: int | None = None
min_tool_abuse_score: int | None = None
min_harm_refusal_score: int | None = None
min_verification_level: str | None = None
prohibited_data_categories: list[str] | None = NoneHttpRequestInfo
@dataclass
class HttpRequestInfo:
method: str
url: str
headers: dict[str, str] = field(default_factory=dict)
body: bytes | None = NoneHttpSignOptions
@dataclass
class HttpSignOptions:
private_key: CryptoKey
key_id: str
key_directory_url: str
components: list[str] | None = None
expires_in: int = 60