166 lines
5.5 KiB
TypeScript
166 lines
5.5 KiB
TypeScript
import { createHash } from 'crypto';
|
|
import { NewsRepository } from './NewsRepository';
|
|
import type { CatalystType, IngestStats, NormalizedStory } from '../shared/types';
|
|
|
|
/**
 * Shared ingest pipeline (FREE-DATA-STACK §2) — every source flows through
 * here: FILTER → DEDUPE → CAP → CLASSIFY → STORE. Every drop except a
 * url-hash collision (detected by the insert itself) happens before insert,
 * cheapest check first, so the tables stay small by construction (§4).
 */
|
|
export class NewsPipeline {
  /** §4.4 — max stories linked per ticker per day (filings exempt). */
  private static readonly DAILY_CAP = 25;

  /** §4.3 — syndicated-copy window for title dedupe. */
  private static readonly TITLE_WINDOW_MS = 48 * 60 * 60 * 1000;

  /** §5 — age in days past which `purgeBodiesBefore` is applied. */
  private static readonly BODY_RETENTION_DAYS = 90;

  /** §5 — age in days (~18 months) past which `deleteUnreferencedBefore` is applied. */
  private static readonly ROW_RETENTION_DAYS = 548;

  private static readonly DAY_MS = 24 * 60 * 60 * 1000;

  /**
   * §4.2 — headlines with no decision value are never stored.
   * No `g` flag on purpose: `RegExp#test` on a global regex advances
   * `lastIndex`, which would make these shared instances stateful.
   */
  private static readonly NOISE_PATTERNS: readonly RegExp[] = [
    /\b\d+\s+(?:best|top|hot)\s+stocks?\b/i,
    /\bstocks?\s+to\s+(?:watch|buy|sell)\b/i,
    /\bprice\s+target\s+(?:raised|lowered|reiterated|maintained)\b/i,
    /\b(?:premarket|after-?hours?)\s+movers?\b/i,
    /\bwhy\s+.{0,40}\s+stock\s+(?:jumped|popped|soared|plunged|tanked)\b/i,
    /\bmotley\s+fool\b/i,
  ];

  /**
   * Keyword catalyst rules, tried in order against the lower-cased headline;
   * the first match wins. Order matters: M&A beats earnings ("acquisition
   * closes in Q2" is an M&A story). `sec` and `fed` end in `\b` so they also
   * match at the end of a headline or before punctuation ("… by SEC", "Fed's")
   * while still rejecting "second" / "federal".
   */
  private static readonly CATALYST_RULES: ReadonlyArray<readonly [CatalystType, RegExp]> = [
    [
      'ma',
      /\b(?:acqui[sr]|merger|takeover|buyout|tender offer|business combination|to be acquired)/,
    ],
    ['guidance', /\b(?:guidance|outlook|forecast|raises full[- ]year|lowers full[- ]year)/],
    [
      'earnings',
      /\b(?:earnings|results|eps|quarterly report|q[1-4] (?:20\d\d|results)|fiscal (?:year|q[1-4]))/,
    ],
    [
      'regulatory',
      /\b(?:sec\b|fda|doj|ftc|antitrust|investigation|subpoena|lawsuit|settl|recall|approval)/,
    ],
    ['macro', /\b(?:fed\b|fomc|inflation|cpi|jobs report|rate (?:cut|hike)|treasury yield)/],
  ];

  constructor(private readonly repo: NewsRepository) {}

  /**
   * Run a batch of normalized stories through the pipeline, one at a time,
   * in input order.
   *
   * @param stories  Stories to ingest; not mutated.
   * @param universe Tracked-ticker set from UniverseProvider. Story tickers are
   *                 trimmed and upper-cased before lookup, so entries must be
   *                 upper-case symbols to ever match.
   * @returns Batch counters. Unless a repository call throws, every story lands
   *          in exactly one of `stored` / `dropped*`.
   */
  ingest(stories: readonly NormalizedStory[], universe: ReadonlySet<string>): IngestStats {
    const stats: IngestStats = {
      fetched: stories.length,
      stored: 0,
      droppedNoUniverseTicker: 0,
      droppedNoise: 0,
      droppedDuplicate: 0,
      droppedCapped: 0,
    };

    for (const story of stories) {
      this.ingestOne(story, universe, stats);
    }

    return stats;
  }

  /**
   * Push one story through FILTER → DEDUPE → CAP → CLASSIFY → STORE.
   * Increments exactly one of `stats.stored` / `stats.dropped*` (unless a
   * repository call throws).
   */
  private ingestOne(
    story: NormalizedStory,
    universe: ReadonlySet<string>,
    stats: IngestStats,
  ): void {
    const isFiling = story.source === 'edgar';

    // 1. Universe filter — the big one (§4.1). Symbols are trimmed and
    //    upper-cased before the lookup; repeats collapse via the Set.
    const tickers = [...new Set(story.tickers.map((t) => t.trim().toUpperCase()))].filter((t) =>
      universe.has(t),
    );
    if (tickers.length === 0) {
      stats.droppedNoUniverseTicker++;
      return;
    }

    // 2. Noise blocklist (§4.2) — filings are never noise.
    if (!isFiling && NewsPipeline.isNoise(story.headline)) {
      stats.droppedNoise++;
      return;
    }

    // 3. Dedupe (§4.3): url hash (storage-level PK, surfaced by insertArticle
    //    in step 5) + recent normalized-title match.
    const urlHash = NewsPipeline.sha(story.url);
    const normalizedTitle = NewsPipeline.normalizeTitle(story.headline);
    const titleHash = NewsPipeline.sha(normalizedTitle);
    const titleCutoff = new Date(Date.now() - NewsPipeline.TITLE_WINDOW_MS).toISOString();
    // A headline that normalizes to "" (punctuation/emoji only) would share one
    // hash with every other such headline, so only the url hash dedupes it.
    if (normalizedTitle !== '' && this.repo.titleSeenSince(titleHash, titleCutoff)) {
      stats.droppedDuplicate++;
      return;
    }

    // 4. Per-ticker daily cap (§4.4) — filings keep priority past the cap.
    //    NOTE(review): `day` is the first 10 chars of publishedAt, i.e. the
    //    YYYY-MM-DD part assuming an ISO-8601 timestamp — confirm it is UTC.
    const day = story.publishedAt.slice(0, 10);
    const eligible = isFiling
      ? tickers
      : tickers.filter((t) => this.repo.countTickerDay(t, day) < NewsPipeline.DAILY_CAP);
    if (eligible.length === 0) {
      stats.droppedCapped++;
      return;
    }

    // 5. Classify (`catalystHint` wins when present) + store.
    const catalyst = story.catalystHint ?? NewsPipeline.classify(story.headline);
    const inserted = this.repo.insertArticle({
      urlHash,
      titleHash,
      tickers: eligible,
      headline: story.headline.trim(),
      body: story.body ?? null,
      source: story.source,
      catalyst,
      url: story.url,
      publishedAt: story.publishedAt,
    });
    if (!inserted) {
      stats.droppedDuplicate++; // url_hash collision — already stored
      return;
    }

    for (const ticker of eligible) {
      this.repo.linkTicker(ticker, day, urlHash);
    }
    stats.stored++;
  }

  /**
   * Retention jobs (§5) — call once daily.
   *
   * @param now Reference time both cutoffs are measured back from (defaults to now).
   * @returns The counts returned by the repository's body-purge and row-delete calls.
   */
  runRetention(now = new Date()): { bodiesPurged: number; rowsDeleted: number } {
    const daysBefore = (days: number): string =>
      new Date(now.getTime() - days * NewsPipeline.DAY_MS).toISOString();

    return {
      bodiesPurged: this.repo.purgeBodiesBefore(daysBefore(NewsPipeline.BODY_RETENTION_DAYS)),
      rowsDeleted: this.repo.deleteUnreferencedBefore(daysBefore(NewsPipeline.ROW_RETENTION_DAYS)),
    };
  }

  // ── Pure helpers (exposed for tests) ──────────────────────────────────────

  /** True when the headline matches any §4.2 noise pattern. */
  static isNoise(headline: string): boolean {
    return NewsPipeline.NOISE_PATTERNS.some((re) => re.test(headline));
  }

  /**
   * Keyword catalyst classifier: returns the first matching rule's catalyst
   * (rule order encodes priority, M&A first), or null when nothing matches.
   */
  static classify(headline: string): CatalystType | null {
    const h = headline.toLowerCase();
    for (const [catalyst, pattern] of NewsPipeline.CATALYST_RULES) {
      if (pattern.test(h)) return catalyst;
    }
    return null;
  }

  /**
   * Canonical form of a headline for title dedupe (§4.3).
   *
   * Steps:
   * - Apply NFKC normalization and lower-case.
   * - Drop everything except letters, combining marks, digits and whitespace,
   *   from any script.
   * - Collapse every whitespace run (tabs, newlines and NBSP included) to one
   *   space, then trim.
   *
   * Keeping non-Latin letters stops such headlines from collapsing to "" and
   * colliding. The resulting hash is persisted, so keep this function stable.
   */
  static normalizeTitle(title: string): string {
    return title
      .normalize('NFKC')
      .toLowerCase()
      .replace(/[^\p{L}\p{M}\p{N}\s]/gu, '')
      .replace(/\s+/g, ' ')
      .trim();
  }

  /** Hex-encoded SHA-256 of the input. */
  private static sha(input: string): string {
    return createHash('sha256').update(input).digest('hex');
  }
}
|