Files
2026-06-11 19:18:19 -04:00

107 lines
3.6 KiB
TypeScript

import { NewsPipeline } from './NewsPipeline';
import { UniverseProvider } from './UniverseProvider';
import { EdgarPoller } from './pollers/EdgarPoller';
import { PrWirePoller } from './pollers/PrWirePoller';
import type { IngestStats, Logger } from '../shared/types';
/**
 * In-process polling scheduler (FREE-DATA-STACK §2). No Redis/BullMQ at the
 * free tier — plain intervals, unref'd so they never hold the process open.
 *
 * Cadences: EDGAR 10 min, PR-wire 15 min, retention daily.
 * Disable entirely with NEWS_POLL=off (e.g. when running bin/poll-news.ts
 * from cron instead of inside the server).
 * NOTE(review): this class does not read NEWS_POLL itself; presumably the
 * caller decides whether to invoke start() — confirm.
 *
 * Every cycle is best-effort: errors are caught, logged as warnings, and the
 * ingest cycles report zeroed stats instead of throwing.
 */
export class NewsScheduler {
  private static readonly EDGAR_INTERVAL_MS = 10 * 60 * 1000;
  private static readonly PRWIRE_INTERVAL_MS = 15 * 60 * 1000;
  private static readonly RETENTION_INTERVAL_MS = 24 * 60 * 60 * 1000;
  /** Delay before the one-off priming cycle (keeps server startup fast). */
  private static readonly BOOT_DELAY_MS = 15_000;

  /** Handles of the recurring timers; non-empty exactly while running. */
  private timers: NodeJS.Timeout[] = [];
  /** Handle of the pending one-shot priming timer, if it has not fired yet. */
  private bootTimer: NodeJS.Timeout | undefined;

  constructor(
    private readonly pipeline: NewsPipeline,
    private readonly universe: UniverseProvider,
    private readonly edgar: EdgarPoller,
    private readonly prwire: PrWirePoller,
    private readonly logger: Logger,
  ) {}

  /**
   * Registers the three recurring timers and schedules one priming ingest
   * cycle shortly after boot. Calling it while already running is a no-op.
   */
  start(): void {
    if (this.timers.length > 0) return; // already running

    const every = (ms: number, fn: () => void): void => {
      const timer = setInterval(fn, ms);
      timer.unref(); // never keep the process alive just for polling
      this.timers.push(timer);
    };

    // `void` marks the promises as deliberately fire-and-forget; the run*
    // methods handle their own errors.
    every(NewsScheduler.EDGAR_INTERVAL_MS, () => void this.runEdgar());
    every(NewsScheduler.PRWIRE_INTERVAL_MS, () => void this.runPrWire());
    every(NewsScheduler.RETENTION_INTERVAL_MS, () => this.runRetention());

    // Prime once shortly after boot (delay keeps server startup fast).
    // Tracked separately from the intervals so a fired one-shot handle does
    // not linger in `timers`.
    this.bootTimer = setTimeout(() => {
      this.bootTimer = undefined;
      void this.runOnce();
    }, NewsScheduler.BOOT_DELAY_MS);
    this.bootTimer.unref();

    this.logger.log('News scheduler started (EDGAR 10m, PR-wire 15m, retention 24h)');
  }

  /**
   * Cancels every recurring timer and the pending priming timer, if any.
   * Cycles that are already in flight are not interrupted. Safe to call when
   * not running; start() may be called again afterwards.
   */
  stop(): void {
    for (const timer of this.timers) clearInterval(timer);
    this.timers = [];

    if (this.bootTimer !== undefined) {
      clearTimeout(this.bootTimer);
      this.bootTimer = undefined;
    }
  }

  /**
   * One full ingest cycle — EDGAR first, then PR-wire, sequentially — used at
   * boot and by bin/poll-news.ts. Retention is not part of this cycle.
   *
   * @returns the ingest stats of each source; a source whose cycle failed
   *   reports all-zero stats.
   */
  async runOnce(): Promise<{ edgar: IngestStats; prwire: IngestStats }> {
    const edgar = await this.runEdgar();
    const prwire = await this.runPrWire();
    return { edgar, prwire };
  }

  /**
   * Polls EDGAR for the current universe and ingests the result.
   * On any failure, logs a warning and returns zeroed stats.
   */
  private async runEdgar(): Promise<IngestStats> {
    try {
      const stories = await this.edgar.poll(this.universe.getUniverse());
      // The universe is re-read after the await, so ingest sees its current state.
      const stats = this.pipeline.ingest(stories, this.universe.getUniverse());
      if (stats.stored > 0) this.logger.log(`EDGAR: stored ${stats.stored}/${stats.fetched}`);
      return stats;
    } catch (err: unknown) {
      this.logger.warn('EDGAR poll cycle failed:', NewsScheduler.errorMessage(err));
      return NewsScheduler.emptyStats();
    }
  }

  /**
   * Polls the PR wire and ingests the result against the current universe.
   * On any failure, logs a warning and returns zeroed stats.
   */
  private async runPrWire(): Promise<IngestStats> {
    try {
      const stories = await this.prwire.poll();
      const stats = this.pipeline.ingest(stories, this.universe.getUniverse());
      if (stats.stored > 0) this.logger.log(`PR-wire: stored ${stats.stored}/${stats.fetched}`);
      return stats;
    } catch (err: unknown) {
      this.logger.warn('PR-wire poll cycle failed:', NewsScheduler.errorMessage(err));
      return NewsScheduler.emptyStats();
    }
  }

  /**
   * Runs the pipeline's retention pass and logs what it removed.
   * NOTE(review): this is only triggered by the 24 h interval, so its first
   * run happens a full day after start(); a process restarted more often than
   * that would never reach it from here — confirm retention is also driven
   * elsewhere.
   */
  private runRetention(): void {
    try {
      const { bodiesPurged, rowsDeleted } = this.pipeline.runRetention();
      this.logger.log(`News retention: ${bodiesPurged} bodies purged, ${rowsDeleted} rows deleted`);
    } catch (err: unknown) {
      this.logger.warn('News retention failed:', NewsScheduler.errorMessage(err));
    }
  }

  /**
   * Turns a caught value into a loggable message. Thrown values are not
   * guaranteed to be Error instances (or even non-null), so reading
   * `.message` unguarded could log `undefined` or throw inside the catch block.
   */
  private static errorMessage(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }

  /** Stats reported for a cycle that failed before ingesting anything. */
  private static emptyStats(): IngestStats {
    return {
      fetched: 0,
      stored: 0,
      droppedNoUniverseTicker: 0,
      droppedNoise: 0,
      droppedDuplicate: 0,
      droppedCapped: 0,
    };
  }
}