Beltic logo
Advanced Topics

Advanced SDK Features

Advanced features in the Beltic SDK including selective disclosure, replay protection, hash chains, multi-signatures, and audit logging.

Both the TypeScript and Python SDKs include advanced features for enterprise use cases. This page covers features beyond basic validation, signing, and verification.

Selective Disclosure (SD-JWT)

SD-JWT allows credential holders to selectively disclose specific claims without revealing the entire credential. This is useful for privacy-preserving verification scenarios.

import {
  createSdJwt,
  createPresentation,
  verifySdJwt,
} from '@beltic/sdk';

// Step 1 - issue: sign the credential and list the claim paths that the
// holder may later choose to reveal.
const sdJwt = await createSdJwt(credential, privateKey, {
  alg: 'EdDSA',
  issuerDid: 'did:web:beltic.com',
  subjectDid: 'did:web:developer.example.com',
  disclosablePaths: ['/legalName', '/incorporationJurisdiction', '/kybTier'],
});

// Step 2 - present: reveal only the KYB tier; the other disclosable
// claims are left out of the presentation.
const pathsToReveal = ['/kybTier'];
const presentation = createPresentation(sdJwt, pathsToReveal);

// Step 3 - verify: the resolver hands back the public key used to check
// the presentation's signature.
const result = await verifySdJwt(presentation, {
  keyResolver: async (_header) => publicKey,
});

const { disclosedClaims } = result;
console.log(disclosedClaims);  // Only contains kybTier
from beltic import (
    create_sd_jwt,
    create_presentation,
    verify_sd_jwt,
)

# Create SD-JWT with selectively disclosable claims
sd_jwt = await create_sd_jwt(credential, private_key, {
    "alg": "EdDSA",
    "issuer_did": "did:web:beltic.com",
    "subject_did": "did:web:developer.example.com",
    "disclosable_paths": [
        "/legalName",
        "/incorporationJurisdiction",
        "/kybTier",
    ],
})

# Create presentation with selected disclosures
presentation = create_presentation(sd_jwt, [
    "/kybTier",  # Only disclose KYB tier
])

# Verify SD-JWT presentation
async def resolver(input):
    return public_key

result = await verify_sd_jwt(presentation, {
    "key_resolver": resolver,
})

print(result.disclosed_claims)  # Only contains kybTier

Use Cases

  • Minimal disclosure: Share only KYB tier without revealing legal name
  • Regulatory compliance: Disclose different fields to different verifiers
  • Privacy protection: Prevent data correlation across platforms

Replay Protection

Prevent credential tokens from being reused in replay attacks by tracking seen JTI (JWT ID) values.

import {
  InMemoryJtiStore,
  checkReplay,
  ReplayDetectedError,
  decodeToken,
} from '@beltic/sdk';

// Create JTI store (use Redis/database in production)
const jtiStore = new InMemoryJtiStore(300); // 5 min TTL

/**
 * Check a token for replay before processing it.
 *
 * Decodes the token, then passes its `jti` and `exp` claims to `checkReplay`
 * together with the shared JTI store.
 *
 * @param token - Encoded credential token to check.
 * @throws Error if the token carries no `jti` claim.
 * @throws ReplayDetectedError rethrown from `checkReplay` after being logged.
 */
async function processToken(token: string): Promise<void> {
  const { jti, exp } = decodeToken(token).payload;

  // Without a JTI there is nothing to track, so fail closed rather than
  // letting an untrackable token through.
  if (typeof jti !== 'string' || jti.length === 0) {
    throw new Error('Token has no jti claim; cannot enforce replay protection');
  }

  try {
    await checkReplay(jtiStore, jti, exp);
    // Process token...
  } catch (error: unknown) {
    // Replays get an extra log line; every error is rethrown to the caller.
    if (error instanceof ReplayDetectedError) {
      console.error(`Replay attack detected: ${error.jti}`);
    }
    throw error;
  }
}

// With middleware
import { createReplayMiddleware } from '@beltic/sdk';

const replayMiddleware = createReplayMiddleware(jtiStore);

app.use('/api/verify', replayMiddleware);
from beltic import (
    InMemoryJtiStore,
    ReplayDetectedError,
    decode_token,
)

# Create JTI store (use Redis/database in production)
jti_store = InMemoryJtiStore(ttl_seconds=300)  # 5 min TTL


async def process_token(token: str):
    """Reject a token whose JTI was already recorded, otherwise record it.

    Raises:
        ReplayDetectedError: if the store already holds the token's JTI.
    """
    payload = decode_token(token).payload
    jti, exp = payload["jti"], payload["exp"]

    # NOTE(review): the lookup and the insert below are two separate awaits,
    # so two concurrent calls with the same token may both pass the lookup.
    # Confirm whether the store offers a single check-and-add operation.
    already_seen = await jti_store.has(jti)
    if already_seen:
        raise ReplayDetectedError(jti)

    # First sighting: record the JTI together with the token's expiry.
    await jti_store.add(jti, exp)

    # Process token...

Production Considerations

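Use a persistent store in production. InMemoryJtiStore is suitable for development, but its data is lost on restart and is not shared between processes, so a token seen before a restart (or by another instance) could be replayed. Use Redis, PostgreSQL, or a similar persistent store instead.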

Content Integrity

Compute and verify cryptographic hashes of content to ensure integrity.

import {
  computeContentHash,
  verifyContentHash,
  createHashClaim,
  verifyHashClaim,
} from '@beltic/sdk';

const algorithm = 'sha-256';

// Serialize the credential to bytes, then hash those bytes
const content = Buffer.from(JSON.stringify(credential));
const digest = computeContentHash(content, algorithm);

// Check the same bytes against the digest just computed
const isValid = verifyContentHash(content, digest, algorithm);

// A hash claim bundles the algorithm and digest for embedding in credentials
const hashClaim = createHashClaim(content, algorithm);
// { algorithm: "sha-256", hash: "base64url-encoded-hash" }

// The claim names its own algorithm, so only the content is passed alongside it
const claimValid = verifyHashClaim(content, hashClaim);
import json

from beltic import (
    compute_content_hash,
    verify_content_hash,
    create_hash_claim,
    verify_hash_claim,
)

# Compute hash of content
# json.dumps needs the stdlib json module, imported above.
content = json.dumps(credential).encode()
# Named content_hash so the builtin hash() is not shadowed.
content_hash = compute_content_hash(content, "sha-256")

# Verify hash
is_valid = verify_content_hash(content, content_hash, "sha-256")

# Create hash claim for embedding in credentials
hash_claim = create_hash_claim(content, "sha-256")
# {"algorithm": "sha-256", "hash": "base64url-encoded-hash"}

# Verify hash claim
claim_valid = verify_hash_claim(content, hash_claim)

Credential Hash Chains

Create an auditable chain of credentials where each entry is linked to the previous one via cryptographic hashes.

import {
  CredentialChain,
  verifyChainIntegrity,
} from '@beltic/sdk';

// Create a new chain
const chain = new CredentialChain();

// Add credentials to chain; each add() returns the entry it appended
const entry1 = chain.add(credential1, token1);
const entry2 = chain.add(credential2, token2);
const entry3 = chain.add(credential3, token3);

// Each entry has:
// - credentialHash: hash of the credential
// - previousHash: hash of previous entry (null for first)
// - chainHash: combined hash linking to previous

// Get proof for audit
const proof = chain.getProof();

// Verify chain integrity
const isIntact = chain.verify();

// Verify from serialized entries
// In a real deployment these are loaded from your database. The entries
// created above stand in here so the snippet is valid TypeScript.
// NOTE(review): assumes add() returns entries in the shape
// verifyChainIntegrity expects - confirm against the SDK reference.
const entries = [entry1, entry2, entry3];
const valid = verifyChainIntegrity(entries);
from beltic import (
    CredentialChain,
    verify_chain_integrity,
)

# Create a new chain
chain = CredentialChain()

# Add credentials to chain; each add() returns the entry it appended
entry1 = chain.add(credential1, token1)
entry2 = chain.add(credential2, token2)
entry3 = chain.add(credential3, token3)

# Get proof for audit
proof = chain.get_proof()

# Verify chain integrity
is_intact = chain.verify()

# Verify from serialized entries
# In a real deployment these are loaded from your database. The entries
# created above stand in here so the list holds real chain entries instead
# of a literal `...` placeholder.
# NOTE(review): assumes add() returns entries in the shape
# verify_chain_integrity expects - confirm against the SDK reference.
entries = [entry1, entry2, entry3]
valid = verify_chain_integrity(entries)

Use Cases

  • Audit trail: Track all credential issuances in tamper-evident log
  • Version history: Link credential updates to previous versions
  • Compliance: Demonstrate credential lifecycle for regulators

Multi-Signature Support

Credentials requiring approval from multiple parties can use multi-signature support.

import {
  MultiSigCredential,
  addSignature,
  verifyMultiSignatures,
} from '@beltic/sdk';

// Wrap the credential so that several parties can sign it
const multiSig = new MultiSigCredential(credential);

// First signature: the issuer
await multiSig.addSignature(issuerPrivateKey, 'issuer', {
  alg: 'EdDSA',
  issuerDid: 'did:web:beltic.com',
  subjectDid: 'did:web:developer.example.com',
});

// Second signature: the auditor, signing for the same subject
await multiSig.addSignature(auditorPrivateKey, 'auditor', {
  alg: 'EdDSA',
  issuerDid: 'did:web:auditor.example.com',
  subjectDid: 'did:web:developer.example.com',
});

// Collect everything signed so far
const signatures = multiSig.getSignatures();
// [{ signerId: 'issuer', ... }, { signerId: 'auditor', ... }]

// Verify every signature; the callback maps a signer ID to its public key
const result = await multiSig.verifyAll(async (signerId) => {
  switch (signerId) {
    case 'issuer':
      return issuerPublicKey;
    case 'auditor':
      return auditorPublicKey;
    default:
      throw new Error(`Unknown signer: ${signerId}`);
  }
});

const { allValid, results } = result;
console.log(allValid);  // true if all signatures valid
console.log(results);   // Per-signature results
from beltic import (
    MultiSigCredential,
    add_signature,
    verify_multi_signatures,
)

# Create multi-sig credential
multi_sig = MultiSigCredential(credential)

# Add signatures from multiple parties
await add_signature(multi_sig, issuer_private_key, "issuer", {
    "alg": "EdDSA",
    "issuer_did": "did:web:beltic.com",
    "subject_did": "did:web:developer.example.com",
})

await add_signature(multi_sig, auditor_private_key, "auditor", {
    "alg": "EdDSA",
    "issuer_did": "did:web:auditor.example.com",
    "subject_did": "did:web:developer.example.com",
})

# Get all signatures
signatures = multi_sig.get_signatures()

# Verify all signatures
async def key_resolver(signer_id):
    if signer_id == "issuer":
        return issuer_public_key
    if signer_id == "auditor":
        return auditor_public_key
    raise ValueError(f"Unknown signer: {signer_id}")

result = await verify_multi_signatures(multi_sig, key_resolver)

print(result.all_valid)  # True if all signatures valid
print(result.results)    # Per-signature results

Use Cases

  • Multi-party verification: Require both issuer and auditor signatures
  • Threshold signatures: Require N-of-M signatures
  • Regulatory approval: Compliance officer co-signs credentials

Audit Logging

Track credential operations with pluggable audit handlers.

import {
  AuditLogger,
  AuditEventType,
  ConsoleAuditHandler,
  InMemoryAuditHandler,
  getAuditLogger,
  setAuditLogger,
} from '@beltic/sdk';

// Build a logger that writes to the console and keeps a copy in memory
const logger = new AuditLogger();
const memoryHandler = new InMemoryAuditHandler();
logger.addHandler(new ConsoleAuditHandler());
logger.addHandler(memoryHandler);

// Register it as the global logger
setAuditLogger(logger);

// Record an event by hand
logger.log({
  type: AuditEventType.CREDENTIAL_VERIFIED,
  timestamp: new Date(),
  credentialId: 'abc-123',
  action: 'verify',
  actor: 'did:web:verifier.example.com',
  details: { algorithm: 'EdDSA' },
});

// The in-memory handler exposes what it has captured
const { events } = memoryHandler;
console.log(events);

// SDK operations auto-log when logger is set
const verified = await verifyCredential(token, options);
// Automatically logs CREDENTIAL_VERIFIED event
from beltic import (
    AuditLogger,
    AuditEventType,
    AuditEvent,
    ConsoleAuditHandler,
    FileAuditHandler,
    InMemoryAuditHandler,
)
from datetime import datetime, timezone

# Create logger with handlers
logger = AuditLogger()
logger.add_handler(ConsoleAuditHandler())

# Log to file
logger.add_handler(FileAuditHandler("audit.log"))

# Also keep in memory
memory_handler = InMemoryAuditHandler()
logger.add_handler(memory_handler)

# Log events manually
logger.log(AuditEvent(
    type=AuditEventType.CREDENTIAL_VERIFIED,
    # Timezone-aware UTC timestamp. datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive datetime that can be misread as local time.
    timestamp=datetime.now(timezone.utc),
    credential_id="abc-123",
    action="verify",
    actor="did:web:verifier.example.com",
    details={"algorithm": "EdDSA"},
))

# Access in-memory events
print(memory_handler.events)

Event Types

| Event | Description |
| --- | --- |
| CREDENTIAL_ISSUED | New credential was signed |
| CREDENTIAL_VERIFIED | Credential signature was verified |
| CREDENTIAL_REVOKED | Credential was revoked |
| SIGNATURE_CREATED | Signature was created |
| SIGNATURE_VERIFIED | Signature was verified |
| TRUST_CHAIN_VERIFIED | Trust chain was verified |
| POLICY_VIOLATION | Policy check failed |

Cloud Signers

Use cloud KMS services for production key management.

import {
  LocalSigner,
  AwsKmsSigner,
  signWithSigner,
} from '@beltic/sdk';

// Development: sign with a local private key
const localSigner = new LocalSigner(privateKey, 'EdDSA');

// Production: sign through an AWS KMS key
const kmsSigner = new AwsKmsSigner({
  keyId: 'arn:aws:kms:us-east-1:123456789:key/abc-123',
  region: 'us-east-1',
  algorithm: 'ECDSA_SHA_256',  // ES256
});

// signWithSigner accepts whichever signer it is given
const signature = await signWithSigner(kmsSigner, data);

// Inspect the signer's metadata
const { algorithm, keyId } = kmsSigner.getInfo();
console.log(algorithm);  // 'ES256'
console.log(keyId);      // KMS key ARN
from beltic import (
    LocalSigner,
    CloudSigner,
)

# Local signer (for development)
local_signer = LocalSigner(private_key, "EdDSA")

# Sign with signer
signature = await local_signer.sign(data)

# Get signer info
info = local_signer.get_info()
print(info.algorithm)  # "EdDSA"

Cloud signers keep private keys in the cloud. The private key never leaves the KMS service, providing better security for production deployments.

Canonicalization (RFC 8785)

Deterministic JSON serialization for consistent hashing.

import {
  canonicalize,
  canonicalizeToString,
  computeCanonicalHash,
} from '@beltic/sdk';

// Key order in the input does not matter; the canonical string is sorted
const canonical = canonicalizeToString({ b: 2, a: 1 });
// '{"a":1,"b":2}'  // Keys sorted, no extra whitespace

// Hash the credential's canonical form
const credentialHash = computeCanonicalHash(credential, 'sha-256');
# Python SDK uses content integrity module
from beltic import compute_content_hash
import json

# Sample object, defined here so the snippet runs on its own
obj = {"b": 2, "a": 1}

# Canonicalize by sorting keys and dropping whitespace.
# ensure_ascii=False keeps non-ASCII text as literal UTF-8, as RFC 8785
# requires; the default would emit \uXXXX escapes and change the hash.
# NOTE: this only approximates RFC 8785. Number formatting follows Python's
# rules (e.g. 1.0 stays "1.0"), so restrict hashed values to strings,
# integers, booleans, and null if hashes must match a full JCS implementation.
canonical = json.dumps(
    obj,
    sort_keys=True,
    separators=(',', ':'),
    ensure_ascii=False,
)
# '{"a":1,"b":2}'

# Compute hash (named canonical_hash so the builtin hash() is not shadowed)
canonical_hash = compute_content_hash(canonical.encode("utf-8"), "sha-256")

Error Sanitization

Sanitize error messages to prevent information leakage in production.

import {
  sanitizeMessage,
  createSanitizedError,
  getPublicMessage,
  SanitizedError,
} from '@beltic/sdk';

try {
  await verifyCredential(token, options);
} catch (error: unknown) {
  // Get safe message for external users
  // (getPublicMessage is imported above; use it when only the string is needed)
  const publicMessage = getPublicMessage(error);

  // Create sanitized error for API response
  const sanitized = createSanitizedError(error);

  // Only the sanitized fields go to the client
  res.status(401).json({
    error: sanitized.message,  // Safe for public
    code: sanitized.code,
  });

  // Log full error internally
  logger.error('Verification failed', { error });
}

See Also