import { NewsPipeline } from './NewsPipeline';
import { UniverseProvider } from './UniverseProvider';
import { EdgarPoller } from './pollers/EdgarPoller';
import { PrWirePoller } from './pollers/PrWirePoller';
import type { IngestStats, Logger } from '../shared/types';

/**
 * In-process polling scheduler (FREE-DATA-STACK §2). No Redis/BullMQ at the
 * free tier — plain intervals, unref'd so they never hold the process open.
 *
 * Cadences: EDGAR 10 min, PR-wire 15 min, retention daily. One EDGAR + PR-wire
 * cycle is also primed 15 s after `start()`.
 *
 * Disable entirely with NEWS_POLL=off (e.g. when running bin/poll-news.ts
 * from cron instead of inside the server). NOTE: this class does not read
 * NEWS_POLL itself — the caller is expected to skip `start()` when it is off.
 */
export class NewsScheduler {
  private static readonly EDGAR_INTERVAL_MS = 10 * 60 * 1000;
  private static readonly PRWIRE_INTERVAL_MS = 15 * 60 * 1000;
  private static readonly RETENTION_INTERVAL_MS = 24 * 60 * 60 * 1000;
  /** Delay before the one-off priming cycle; keeps server startup fast. */
  private static readonly BOOT_DELAY_MS = 15_000;

  /** Every live timer (the intervals plus the boot timeout); empty means stopped. */
  private timers: NodeJS.Timeout[] = [];

  constructor(
    private readonly pipeline: NewsPipeline,
    private readonly universe: UniverseProvider,
    private readonly edgar: EdgarPoller,
    private readonly prwire: PrWirePoller,
    private readonly logger: Logger,
  ) {}

  /**
   * Schedule the recurring EDGAR, PR-wire and retention jobs, plus a single
   * priming cycle shortly after boot. Calling this while already running is a
   * no-op.
   */
  start(): void {
    if (this.timers.length > 0) return; // already running

    const every = (ms: number, fn: () => void): void => {
      const t = setInterval(fn, ms);
      t.unref(); // never keep the process alive just for polling
      this.timers.push(t);
    };

    // The run* methods catch poll/ingest failures themselves, so the
    // promises discarded with `void` here do not reject on those failures.
    every(NewsScheduler.EDGAR_INTERVAL_MS, () => void this.runEdgar());
    every(NewsScheduler.PRWIRE_INTERVAL_MS, () => void this.runPrWire());
    every(NewsScheduler.RETENTION_INTERVAL_MS, () => this.runRetention());

    // Prime once shortly after boot (delay keeps server startup fast)
    const boot = setTimeout(() => void this.runOnce(), NewsScheduler.BOOT_DELAY_MS);
    boot.unref();
    this.timers.push(boot);

    this.logger.log('News scheduler started (EDGAR 10m, PR-wire 15m, retention 24h)');
  }

  /** Cancel every scheduled job. Safe to call when not running; `start()` may be called again afterwards. */
  stop(): void {
    // Node's clearInterval and clearTimeout are interchangeable, so this also
    // cancels the boot timeout if it has not fired yet.
    for (const t of this.timers) clearInterval(t);
    this.timers = [];
  }

  /**
   * Run one EDGAR cycle followed by one PR-wire cycle (sequentially, EDGAR
   * first). Used by the boot-time priming timer and by bin/poll-news.ts.
   * Retention is not run here; within this class it only runs on its daily
   * interval.
   *
   * A failure in one source is logged and reported as all-zero stats for that
   * source; it does not reject and does not stop the other source from running.
   *
   * @returns Per-source ingest stats for this cycle.
   */
  async runOnce(): Promise<{ edgar: IngestStats; prwire: IngestStats }> {
    const edgar = await this.runEdgar();
    const prwire = await this.runPrWire();
    return { edgar, prwire };
  }

  /**
   * Poll EDGAR for the current universe and ingest the results.
   * Failures are logged and reported as all-zero stats instead of rejecting.
   */
  private async runEdgar(): Promise<IngestStats> {
    try {
      const stories = await this.edgar.poll(this.universe.getUniverse());
      // NOTE(review): the universe is read again after the network await; if it
      // can change mid-poll, ingest filters against the newer snapshot.
      const stats = this.pipeline.ingest(stories, this.universe.getUniverse());
      if (stats.stored > 0) this.logger.log(`EDGAR: stored ${stats.stored}/${stats.fetched}`);
      return stats;
    } catch (err: unknown) {
      this.logger.warn('EDGAR poll cycle failed:', NewsScheduler.describeError(err));
      return NewsScheduler.emptyStats();
    }
  }

  /**
   * Poll the PR wires and ingest the results against the current universe.
   * Failures are logged and reported as all-zero stats instead of rejecting.
   */
  private async runPrWire(): Promise<IngestStats> {
    try {
      const stories = await this.prwire.poll();
      const stats = this.pipeline.ingest(stories, this.universe.getUniverse());
      if (stats.stored > 0) this.logger.log(`PR-wire: stored ${stats.stored}/${stats.fetched}`);
      return stats;
    } catch (err: unknown) {
      this.logger.warn('PR-wire poll cycle failed:', NewsScheduler.describeError(err));
      return NewsScheduler.emptyStats();
    }
  }

  /** Apply the pipeline's retention policy and log the counts; errors are logged, not thrown. */
  private runRetention(): void {
    try {
      const { bodiesPurged, rowsDeleted } = this.pipeline.runRetention();
      this.logger.log(`News retention: ${bodiesPurged} bodies purged, ${rowsDeleted} rows deleted`);
    } catch (err: unknown) {
      this.logger.warn('News retention failed:', NewsScheduler.describeError(err));
    }
  }

  /**
   * Turn a caught value into a loggable message. Anything can be thrown in JS,
   * so a non-Error (e.g. a string) is stringified rather than read as `.message`
   * (which would log `undefined`).
   */
  private static describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }

  /** All-zero stats reported for a cycle that failed before ingest completed. */
  private static emptyStats(): IngestStats {
    return {
      fetched: 0,
      stored: 0,
      droppedNoUniverseTicker: 0,
      droppedNoise: 0,
      droppedDuplicate: 0,
      droppedCapped: 0,
    };
  }
}