Advanced SDK Features
Advanced features in the Beltic SDK including selective disclosure, replay protection, hash chains, multi-signatures, and audit logging.
Both the TypeScript and Python SDKs include advanced features for enterprise use cases. This page covers features beyond basic validation, signing, and verification.
Selective Disclosure (SD-JWT)
SD-JWT allows credential holders to selectively disclose specific claims without revealing the entire credential. This is useful for privacy-preserving verification scenarios.
import {
createSdJwt,
createPresentation,
verifySdJwt,
} from '@beltic/sdk';
// Create SD-JWT with selectively disclosable claims
const sdJwt = await createSdJwt(credential, privateKey, {
alg: 'EdDSA',
issuerDid: 'did:web:beltic.com',
subjectDid: 'did:web:developer.example.com',
disclosablePaths: [
'/legalName',
'/incorporationJurisdiction',
'/kybTier',
],
});
// Create presentation with selected disclosures
const presentation = createPresentation(sdJwt, [
'/kybTier', // Only disclose KYB tier
]);
// Verify SD-JWT presentation
const result = await verifySdJwt(presentation, {
keyResolver: async (header) => publicKey,
});
console.log(result.disclosedClaims); // Only contains kybTier

from beltic import (
create_sd_jwt,
create_presentation,
verify_sd_jwt,
)
# Create SD-JWT with selectively disclosable claims
sd_jwt = await create_sd_jwt(credential, private_key, {
"alg": "EdDSA",
"issuer_did": "did:web:beltic.com",
"subject_did": "did:web:developer.example.com",
"disclosable_paths": [
"/legalName",
"/incorporationJurisdiction",
"/kybTier",
],
})
# Create presentation with selected disclosures
presentation = create_presentation(sd_jwt, [
"/kybTier", # Only disclose KYB tier
])
# Verify SD-JWT presentation
async def resolver(input):
return public_key
result = await verify_sd_jwt(presentation, {
"key_resolver": resolver,
})
print(result.disclosed_claims) # Only contains kybTier

Use Cases
- Minimal disclosure: Share only KYB tier without revealing legal name
- Regulatory compliance: Disclose different fields to different verifiers
- Privacy protection: Prevent data correlation across platforms
Replay Protection
Prevent credential tokens from being reused in replay attacks by tracking seen JTI (JWT ID) values.
import {
InMemoryJtiStore,
checkReplay,
ReplayDetectedError,
} from '@beltic/sdk';
// Create JTI store (use Redis/database in production)
const jtiStore = new InMemoryJtiStore(300); // 5 min TTL
// Check for replay before processing
async function processToken(token: string) {
const decoded = decodeToken(token);
const jti = decoded.payload.jti;
const exp = decoded.payload.exp;
try {
await checkReplay(jtiStore, jti, exp);
// Process token...
} catch (error) {
if (error instanceof ReplayDetectedError) {
console.error(`Replay attack detected: ${error.jti}`);
throw error;
}
throw error;
}
}
// With middleware
import { createReplayMiddleware } from '@beltic/sdk';
const replayMiddleware = createReplayMiddleware(jtiStore);
app.use('/api/verify', replayMiddleware);

from beltic import (
InMemoryJtiStore,
ReplayDetectedError,
decode_token,
)
# Create JTI store (use Redis/database in production)
jti_store = InMemoryJtiStore(ttl_seconds=300) # 5 min TTL
async def process_token(token: str):
decoded = decode_token(token)
jti = decoded.payload["jti"]
exp = decoded.payload["exp"]
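# Check if already seen.
# NOTE: has() followed by add() is two separate steps, not an atomic check-and-set;
# two concurrent requests carrying the same JTI could both pass this check.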
if await jti_store.has(jti):
raise ReplayDetectedError(jti)
# Mark as seen
await jti_store.add(jti, exp)
# Process token...

Production Considerations
Use a persistent store in production. InMemoryJtiStore is suitable for development, but its data is lost on restart, so a token seen before the restart would no longer be recognized as a replay. Use Redis, PostgreSQL, or a similar persistent store instead.
Content Integrity
Compute and verify cryptographic hashes of content to ensure integrity.
import {
computeContentHash,
verifyContentHash,
createHashClaim,
verifyHashClaim,
} from '@beltic/sdk';
// Compute hash of content
const content = Buffer.from(JSON.stringify(credential));
const hash = computeContentHash(content, 'sha-256');
// Verify hash
const isValid = verifyContentHash(content, hash, 'sha-256');
// Create hash claim for embedding in credentials
const hashClaim = createHashClaim(content, 'sha-256');
// { algorithm: "sha-256", hash: "base64url-encoded-hash" }
// Verify hash claim
const claimValid = verifyHashClaim(content, hashClaim);

from beltic import (
compute_content_hash,
verify_content_hash,
create_hash_claim,
verify_hash_claim,
)
# Compute hash of content
content = json.dumps(credential).encode()
hash = compute_content_hash(content, "sha-256")
# Verify hash
is_valid = verify_content_hash(content, hash, "sha-256")
# Create hash claim for embedding in credentials
hash_claim = create_hash_claim(content, "sha-256")
# {"algorithm": "sha-256", "hash": "base64url-encoded-hash"}
# Verify hash claim
claim_valid = verify_hash_claim(content, hash_claim)

Credential Hash Chains
Create an auditable chain of credentials where each entry is linked to the previous one via cryptographic hashes.
import {
CredentialChain,
verifyChainIntegrity,
} from '@beltic/sdk';
// Create a new chain
const chain = new CredentialChain();
// Add credentials to chain
const entry1 = chain.add(credential1, token1);
const entry2 = chain.add(credential2, token2);
const entry3 = chain.add(credential3, token3);
// Each entry has:
// - credentialHash: hash of the credential
// - previousHash: hash of previous entry (null for first)
// - chainHash: combined hash linking to previous
// Get proof for audit
const proof = chain.getProof();
// Verify chain integrity
const isIntact = chain.verify();
// Verify from serialized entries
const entries = [...]; // Load from database
const valid = verifyChainIntegrity(entries);

from beltic import (
CredentialChain,
verify_chain_integrity,
)
# Create a new chain
chain = CredentialChain()
# Add credentials to chain
entry1 = chain.add(credential1, token1)
entry2 = chain.add(credential2, token2)
entry3 = chain.add(credential3, token3)
# Get proof for audit
proof = chain.get_proof()
# Verify chain integrity
is_intact = chain.verify()
# Verify from serialized entries
entries = [...] # Load from database
valid = verify_chain_integrity(entries)

Use Cases
- Audit trail: Track all credential issuances in tamper-evident log
- Version history: Link credential updates to previous versions
- Compliance: Demonstrate credential lifecycle for regulators
Multi-Signature Support
Credentials requiring approval from multiple parties can use multi-signature support.
import {
MultiSigCredential,
addSignature,
verifyMultiSignatures,
} from '@beltic/sdk';
// Create multi-sig credential
const multiSig = new MultiSigCredential(credential);
// Add signatures from multiple parties
await multiSig.addSignature(issuerPrivateKey, 'issuer', {
alg: 'EdDSA',
issuerDid: 'did:web:beltic.com',
subjectDid: 'did:web:developer.example.com',
});
await multiSig.addSignature(auditorPrivateKey, 'auditor', {
alg: 'EdDSA',
issuerDid: 'did:web:auditor.example.com',
subjectDid: 'did:web:developer.example.com',
});
// Get all signatures
const signatures = multiSig.getSignatures();
// [{ signerId: 'issuer', ... }, { signerId: 'auditor', ... }]
// Verify all signatures
const result = await multiSig.verifyAll(async (signerId) => {
// Return public key for each signer
if (signerId === 'issuer') return issuerPublicKey;
if (signerId === 'auditor') return auditorPublicKey;
throw new Error(`Unknown signer: ${signerId}`);
});
console.log(result.allValid); // true if all signatures valid
console.log(result.results); // Per-signature results

from beltic import (
MultiSigCredential,
add_signature,
verify_multi_signatures,
)
# Create multi-sig credential
multi_sig = MultiSigCredential(credential)
# Add signatures from multiple parties
await add_signature(multi_sig, issuer_private_key, "issuer", {
"alg": "EdDSA",
"issuer_did": "did:web:beltic.com",
"subject_did": "did:web:developer.example.com",
})
await add_signature(multi_sig, auditor_private_key, "auditor", {
"alg": "EdDSA",
"issuer_did": "did:web:auditor.example.com",
"subject_did": "did:web:developer.example.com",
})
# Get all signatures
signatures = multi_sig.get_signatures()
# Verify all signatures
async def key_resolver(signer_id):
if signer_id == "issuer":
return issuer_public_key
if signer_id == "auditor":
return auditor_public_key
raise ValueError(f"Unknown signer: {signer_id}")
result = await verify_multi_signatures(multi_sig, key_resolver)
print(result.all_valid) # True if all signatures valid
print(result.results) # Per-signature results

Use Cases
- Multi-party verification: Require both issuer and auditor signatures
- Threshold signatures: Require N-of-M signatures
- Regulatory approval: Compliance officer co-signs credentials
Audit Logging
Track credential operations with pluggable audit handlers.
import {
AuditLogger,
AuditEventType,
ConsoleAuditHandler,
InMemoryAuditHandler,
getAuditLogger,
setAuditLogger,
} from '@beltic/sdk';
// Create logger with handlers
const logger = new AuditLogger();
logger.addHandler(new ConsoleAuditHandler());
const memoryHandler = new InMemoryAuditHandler();
logger.addHandler(memoryHandler);
// Set as global logger
setAuditLogger(logger);
// Log events manually
logger.log({
type: AuditEventType.CREDENTIAL_VERIFIED,
timestamp: new Date(),
credentialId: 'abc-123',
action: 'verify',
actor: 'did:web:verifier.example.com',
details: { algorithm: 'EdDSA' },
});
// Access in-memory events
console.log(memoryHandler.events);
// SDK operations auto-log when logger is set
const verified = await verifyCredential(token, options);
// Automatically logs CREDENTIAL_VERIFIED event

from beltic import (
AuditLogger,
AuditEventType,
AuditEvent,
ConsoleAuditHandler,
FileAuditHandler,
InMemoryAuditHandler,
)
from datetime import datetime
# Create logger with handlers
logger = AuditLogger()
logger.add_handler(ConsoleAuditHandler())
# Log to file
logger.add_handler(FileAuditHandler("audit.log"))
# Also keep in memory
memory_handler = InMemoryAuditHandler()
logger.add_handler(memory_handler)
# Log events manually
logger.log(AuditEvent(
type=AuditEventType.CREDENTIAL_VERIFIED,
timestamp=datetime.utcnow(),
credential_id="abc-123",
action="verify",
actor="did:web:verifier.example.com",
details={"algorithm": "EdDSA"},
))
# Access in-memory events
print(memory_handler.events)

Event Types
| Event | Description |
|---|---|
| `CREDENTIAL_ISSUED` | New credential was signed |
| `CREDENTIAL_VERIFIED` | Credential signature was verified |
| `CREDENTIAL_REVOKED` | Credential was revoked |
| `SIGNATURE_CREATED` | Signature was created |
| `SIGNATURE_VERIFIED` | Signature was verified |
| `TRUST_CHAIN_VERIFIED` | Trust chain was verified |
| `POLICY_VIOLATION` | Policy check failed |
Cloud Signers
Use cloud KMS services for production key management.
import {
LocalSigner,
AwsKmsSigner,
signWithSigner,
} from '@beltic/sdk';
// Local signer (for development)
const localSigner = new LocalSigner(privateKey, 'EdDSA');
// AWS KMS signer (for production)
const kmsSigner = new AwsKmsSigner({
keyId: 'arn:aws:kms:us-east-1:123456789:key/abc-123',
region: 'us-east-1',
algorithm: 'ECDSA_SHA_256', // ES256
});
// Sign with any signer
const signature = await signWithSigner(kmsSigner, data);
// Get signer info
const info = kmsSigner.getInfo();
console.log(info.algorithm); // 'ES256'
console.log(info.keyId); // KMS key ARN

from beltic import (
LocalSigner,
CloudSigner,
)
# Local signer (for development)
local_signer = LocalSigner(private_key, "EdDSA")
# Sign with signer
signature = await local_signer.sign(data)
# Get signer info
info = local_signer.get_info()
print(info.algorithm) # "EdDSA"

Cloud signers keep private keys in the cloud. The private key never leaves the KMS service, providing better security for production deployments.
Canonicalization (RFC 8785)
Deterministic JSON serialization for consistent hashing.
import {
canonicalize,
canonicalizeToString,
computeCanonicalHash,
} from '@beltic/sdk';
// Canonicalize JSON object
const obj = { b: 2, a: 1 };
const canonical = canonicalizeToString(obj);
// '{"a":1,"b":2}' // Keys sorted, no extra whitespace
// Compute hash of canonical form
const hash = computeCanonicalHash(credential, 'sha-256');

# Python SDK uses content integrity module
from beltic import compute_content_hash
import json
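# Approximate canonicalization: sorted keys and compact separators.
# NOTE: json.dumps is not a full RFC 8785 (JCS) serializer. Its number formatting and
# default escaping of non-ASCII characters differ from JCS, so hashes of such values
# may not match those computed from a strict RFC 8785 canonical form.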
canonical = json.dumps(obj, sort_keys=True, separators=(',', ':'))
# Compute hash
hash = compute_content_hash(canonical.encode(), "sha-256")

Error Sanitization
Sanitize error messages to prevent information leakage in production.
import {
sanitizeMessage,
createSanitizedError,
SanitizedError,
} from '@beltic/sdk';
try {
await verifyCredential(token, options);
} catch (error) {
// Get safe message for external users
const publicMessage = getPublicMessage(error);
// Create sanitized error for API response
const sanitized = createSanitizedError(error);
res.status(401).json({
error: sanitized.message, // Safe for public
code: sanitized.code,
});
// Log full error internally
logger.error('Verification failed', { error });
}