import { createHash } from 'crypto';
import { NewsRepository } from './NewsRepository';
import type { CatalystType, IngestStats, NormalizedStory } from '../shared/types';

/**
 * Shared ingest pipeline (FREE-DATA-STACK §2) — every source flows through
 * here: FILTER → DEDUPE → CLASSIFY → STORE. All drops happen BEFORE insert,
 * cheapest check first, so the tables stay small by construction (§4).
 */
export class NewsPipeline {
  /** §4.4 — max stories linked per ticker per day (filings exempt). */
  private static readonly DAILY_CAP = 25;

  /** §4.3 — syndicated-copy window for title dedupe. */
  private static readonly TITLE_WINDOW_MS = 48 * 60 * 60 * 1000;

  /** Milliseconds in one day; used to turn retention day counts into cutoffs. */
  private static readonly MS_PER_DAY = 24 * 60 * 60 * 1000;

  /** §5 — article bodies older than this many days are purged. */
  private static readonly BODY_RETENTION_DAYS = 90;

  /** §5 — unreferenced rows older than this many days (~18mo) are deleted. */
  private static readonly ROW_RETENTION_DAYS = 548;

  /** §4.2 — headlines with no decision value are never stored. */
  private static readonly NOISE_PATTERNS: readonly RegExp[] = [
    /\b\d+\s+(?:best|top|hot)\s+stocks?\b/i,
    /\bstocks?\s+to\s+(?:watch|buy|sell)\b/i,
    /\bprice\s+target\s+(?:raised|lowered|reiterated|maintained)\b/i,
    /\b(?:premarket|after-?hours?)\s+movers?\b/i,
    /\bwhy\s+.{0,40}\s+stock\s+(?:jumped|popped|soared|plunged|tanked)\b/i,
    /\bmotley\s+fool\b/i,
  ];

  /**
   * Ordered keyword rules for {@link NewsPipeline.classify}; the first match
   * wins. Patterns are applied to the lower-cased headline. Word stems
   * (`acqui[sr]`, `settl`, …) are deliberate prefix matches, while short
   * acronyms end in `\b` so that e.g. "Epstein" is not read as "EPS" and
   * "SEC" / "Fed" still match at the end of a headline or before punctuation.
   */
  private static readonly CATALYST_RULES: ReadonlyArray<readonly [CatalystType, RegExp]> = [
    [
      'ma',
      /\b(?:acqui[sr]|merger|takeover|buyout|tender offer|business combination|to be acquired)/,
    ],
    ['guidance', /\b(?:guidance|outlook|forecast|raises full[- ]year|lowers full[- ]year)/],
    [
      'earnings',
      /\b(?:earnings|results|eps\b|quarterly report|q[1-4] (?:20\d\d|results)|fiscal (?:year|q[1-4]))/,
    ],
    [
      'regulatory',
      /\b(?:sec\b|fda\b|doj\b|ftc\b|antitrust|investigation|subpoena|lawsuit|settl|recall|approval)/,
    ],
    ['macro', /\b(?:fed\b|fomc\b|inflation|cpi\b|jobs report|rate (?:cut|hike)|treasury yield)/],
  ];

  constructor(private readonly repo: NewsRepository) {}

  /**
   * Run a batch of normalized stories through the pipeline.
   *
   * @param stories  Stories to process, in order; each is handled independently.
   * @param universe The tracked-ticker set from UniverseProvider. Membership is
   *                 tested against upper-cased story tickers, so the set is
   *                 expected to hold upper-case symbols. It is never mutated.
   * @returns Counters for the batch: `fetched` is the batch size; every story
   *          increments exactly one of `stored` or a `dropped*` counter.
   */
  ingest(stories: NormalizedStory[], universe: ReadonlySet<string>): IngestStats {
    const stats: IngestStats = {
      fetched: stories.length,
      stored: 0,
      droppedNoUniverseTicker: 0,
      droppedNoise: 0,
      droppedDuplicate: 0,
      droppedCapped: 0,
    };
    for (const story of stories) {
      this.ingestOne(story, universe, stats);
    }
    return stats;
  }

  /**
   * Push a single story through filter → dedupe → cap → classify → store,
   * bumping the matching counter on `stats`. Returns as soon as the story is
   * dropped, so later (more expensive) steps are skipped.
   */
  private ingestOne(
    story: NormalizedStory,
    universe: ReadonlySet<string>,
    stats: IngestStats,
  ): void {
    // Stories whose source is 'edgar' are treated as filings: they bypass the
    // noise blocklist and the per-ticker daily cap below.
    const isFiling = story.source === 'edgar';

    // 1. Universe filter — the big one (§4.1). Tickers are upper-cased and
    //    de-duplicated before the membership test.
    const tickers = [...new Set(story.tickers.map((t) => t.toUpperCase()))].filter((t) =>
      universe.has(t),
    );
    if (tickers.length === 0) {
      stats.droppedNoUniverseTicker++;
      return;
    }

    // 2. Noise blocklist (§4.2) — filings are never noise
    if (!isFiling && NewsPipeline.isNoise(story.headline)) {
      stats.droppedNoise++;
      return;
    }

    // 3. Dedupe (§4.3): url hash (storage-level PK) + recent title match
    const urlHash = NewsPipeline.sha(story.url);
    const normalizedTitle = NewsPipeline.normalizeTitle(story.headline);
    const titleHash = NewsPipeline.sha(normalizedTitle);
    const titleCutoff = new Date(Date.now() - NewsPipeline.TITLE_WINDOW_MS).toISOString();
    // A headline that normalizes to '' (empty or punctuation-only) carries no
    // title signal; comparing its hash would make every such story look like a
    // copy of the others, so only url-hash dedupe (at insert) applies to it.
    if (normalizedTitle !== '' && this.repo.titleSeenSince(titleHash, titleCutoff)) {
      stats.droppedDuplicate++;
      return;
    }

    // 4. Per-ticker daily cap (§4.4) — filings keep priority past the cap.
    //    The day key is the first 10 characters of publishedAt (the
    //    YYYY-MM-DD part, assuming an ISO-8601 timestamp).
    const day = story.publishedAt.slice(0, 10);
    const eligible = isFiling
      ? tickers
      : tickers.filter((t) => this.repo.countTickerDay(t, day) < NewsPipeline.DAILY_CAP);
    if (eligible.length === 0) {
      stats.droppedCapped++;
      return;
    }

    // 5. Classify + store. A source-provided catalystHint wins over the
    //    keyword classifier.
    const catalyst = story.catalystHint ?? NewsPipeline.classify(story.headline);
    const inserted = this.repo.insertArticle({
      urlHash,
      titleHash,
      tickers: eligible,
      headline: story.headline.trim(),
      body: story.body ?? null,
      source: story.source,
      catalyst,
      url: story.url,
      publishedAt: story.publishedAt,
    });
    if (!inserted) {
      stats.droppedDuplicate++; // url_hash collision — already stored
      return;
    }

    // Only tickers that passed the cap are linked to the stored article.
    for (const ticker of eligible) {
      this.repo.linkTicker(ticker, day, urlHash);
    }
    stats.stored++;
  }

  /**
   * Retention jobs (§5) — call once daily.
   *
   * @param now Reference time for the cutoffs; defaults to the current time.
   * @returns The counts returned by the repository's body-purge and
   *          unreferenced-row-delete calls.
   */
  runRetention(now = new Date()): { bodiesPurged: number; rowsDeleted: number } {
    const cutoffIso = (days: number): string =>
      new Date(now.getTime() - days * NewsPipeline.MS_PER_DAY).toISOString();
    const bodyCutoff = cutoffIso(NewsPipeline.BODY_RETENTION_DAYS);
    const rowCutoff = cutoffIso(NewsPipeline.ROW_RETENTION_DAYS); // ~18mo
    return {
      bodiesPurged: this.repo.purgeBodiesBefore(bodyCutoff),
      rowsDeleted: this.repo.deleteUnreferencedBefore(rowCutoff),
    };
  }

  // ── Pure helpers (exposed for tests) ──────────────────────────────────────

  /** True when the headline matches any §4.2 noise pattern (case-insensitive). */
  static isNoise(headline: string): boolean {
    return NewsPipeline.NOISE_PATTERNS.some((re) => re.test(headline));
  }

  /**
   * Keyword catalyst classifier. Order matters: M&A beats earnings
   * ("acquisition closes in Q2" is an M&A story).
   *
   * @returns The catalyst of the first matching rule, or `null` when no
   *          keyword matches.
   */
  static classify(headline: string): CatalystType | null {
    const h = headline.toLowerCase();
    for (const [catalyst, pattern] of NewsPipeline.CATALYST_RULES) {
      if (pattern.test(h)) return catalyst;
    }
    return null;
  }

  /**
   * Canonical form of a headline for title dedupe: NFKC-normalized,
   * lower-cased, letters/digits only, words separated by single spaces.
   *
   * Whitespace is collapsed BEFORE punctuation is stripped so that tabs,
   * newlines and non-breaking spaces still separate words instead of being
   * deleted. Letters and digits of any script are kept, so non-English
   * headlines do not all collapse to the empty string.
   */
  static normalizeTitle(title: string): string {
    return title
      .normalize('NFKC')
      .toLowerCase()
      .replace(/\s+/g, ' ')
      .replace(/[^\p{L}\p{N} ]/gu, '')
      .replace(/ {2,}/g, ' ') // stripping " - " style punctuation can leave double spaces
      .trim();
  }

  /** Hex-encoded SHA-256 digest of `input`. */
  private static sha(input: string): string {
    return createHash('sha256').update(input).digest('hex');
  }
}