phase-10.5: screener enhancements

This commit is contained in:
Kazuma
2026-06-11 19:18:19 -04:00
parent f0c794f0c0
commit bf2a85b5c4
51 changed files with 3745 additions and 36 deletions
+43 -1
View File
@@ -11,6 +11,16 @@ import { CallsController, CalendarService } from './domains/calls';
import { AuthController, AuthService, UserStore, verifyJwt } from './domains/auth';
import type { TokenPayload } from './domains/auth';
import { WatchlistController, WatchlistRepository } from './domains/watchlist';
import {
NewsController,
NewsRepository,
NewsPipeline,
UniverseProvider,
NewsScheduler,
EdgarPoller,
PrWirePoller,
} from './domains/news';
import { DigestController, DigestService } from './domains/digest';
// Shared infrastructure
import {
@@ -141,7 +151,14 @@ export async function buildApp({ logger = true, db: injectedDb }: BuildAppOption
// Register controllers
// Public routes (GET) remain open; write routes require JWT + trader role
new ScreenerController(engine, catalystCache, new SignalSnapshotRepository(db)).register(app);
const newsRepo = new NewsRepository(db);
new ScreenerController(
engine,
catalystCache,
new SignalSnapshotRepository(db),
yahoo,
newsRepo,
).register(app);
new FinanceController(engine, new PortfolioRepository(db), advisor, {
authGuard,
traderGuard,
@@ -154,6 +171,31 @@ export async function buildApp({ logger = true, db: injectedDb }: BuildAppOption
new WatchlistController(new WatchlistRepository(db), { authGuard }).register(app);
// ── News domain (FREE-DATA-STACK) — pipeline + read API + polling ────────
new NewsController(newsRepo, yahoo).register(app);
// ── Digest domain (P1.1) — snapshot diff + catalyst join, on demand ──────
new DigestController(new DigestService(new SignalSnapshotRepository(db), newsRepo)).register(app);
// Polling runs inside the server unless NEWS_POLL=off (use bin/poll-news.ts
// from cron instead). Timers are unref'd and cleared on app.close().
if (process.env.NEWS_POLL !== 'off') {
const newsLogger = {
log: (...args: unknown[]) => app.log.info(args.map(String).join(' ')),
warn: (...args: unknown[]) => app.log.warn(args.map(String).join(' ')),
write: () => {},
};
const newsScheduler = new NewsScheduler(
new NewsPipeline(newsRepo),
new UniverseProvider(db),
new EdgarPoller(newsLogger),
new PrWirePoller(newsLogger),
newsLogger,
);
app.addHook('onReady', async () => newsScheduler.start());
app.addHook('onClose', async () => newsScheduler.stop());
}
app.get('/health', async () => ({ status: 'ok' }));
return app;
+110
View File
@@ -0,0 +1,110 @@
import { SignalSnapshotRepository } from '../shared/persistence/SignalSnapshotRepository';
import { NewsRepository } from '../news/NewsRepository';
import { SIGNAL_ORDER } from '../shared/config/constants';
import type {
DigestCatalyst,
DigestChange,
DigestReport,
NewsArticleRow,
SignalSnapshotRow,
} from '../shared/types';
/**
* Daily change digest (PRODUCT.md P1.1) — the step that makes the snapshot
* ledger and the news pipeline actionable together.
*
* For each ticker snapshotted today, diff against its most recent previous
* snapshot. A signal flip alone is just information; a signal flip WITH a
* known catalyst attached is the highest-value alert the free stack can
* produce. M&A stories are always surfaced, change or no change.
*
* Run order matters: screen first (writes today's snapshots), digest second.
*/
export class DigestService {
/** How many days back to look for catalyst stories per ticker. */
private static readonly NEWS_LOOKBACK_DAYS = 2;
constructor(
private readonly snapshots: SignalSnapshotRepository,
private readonly news: NewsRepository,
) {}
build(date = new Date().toISOString().slice(0, 10)): DigestReport {
const today = this.snapshots.byDate(date);
const previous = new Map(this.snapshots.latestBefore(date).map((r) => [r.ticker, r]));
const newsSince = DigestService.daysBefore(date, DigestService.NEWS_LOOKBACK_DAYS);
const changes: DigestChange[] = [];
const newTickers: string[] = [];
const maStories = new Map<string, DigestCatalyst>(); // url → story, deduped
for (const snap of today) {
const prev = previous.get(snap.ticker);
const catalysts = this.news
.newsForTicker(snap.ticker, newsSince)
.map(DigestService.toCatalyst);
// Always collect M&A stories, even without a signal change
for (const c of catalysts) {
if (c.catalyst === 'ma') maStories.set(c.url, c);
}
if (!prev) {
newTickers.push(snap.ticker);
continue;
}
if (prev.signal === snap.signal) continue;
changes.push({
ticker: snap.ticker,
previousSignal: prev.signal,
newSignal: snap.signal,
previousDate: prev.snapshot_date,
scoreDelta: DigestService.scoreDelta(prev, snap),
price: snap.price,
catalysts,
});
}
// Strongest impact first: biggest move across the signal ordering
changes.sort((a, b) => DigestService.impact(b) - DigestService.impact(a));
return {
date,
changes,
newTickers,
maStories: [...maStories.values()],
snapshotCount: today.length,
};
}
// ── Helpers ───────────────────────────────────────────────────────────────
private static toCatalyst(row: NewsArticleRow): DigestCatalyst {
return {
headline: row.headline,
catalyst: row.catalyst,
source: row.source,
url: row.url,
publishedAt: row.published_at,
};
}
private static scoreDelta(prev: SignalSnapshotRow, curr: SignalSnapshotRow): number | null {
if (prev.fundamental_score == null || curr.fundamental_score == null) return null;
return +(curr.fundamental_score - prev.fundamental_score).toFixed(1);
}
/** Distance moved across the signal ordering (Strong Buy=0 … Avoid=4). */
private static impact(change: DigestChange): number {
const ord = (s: string) => SIGNAL_ORDER[s] ?? 5;
return Math.abs(ord(change.newSignal) - ord(change.previousSignal));
}
/** YYYY-MM-DD `n` days before the given day. */
private static daysBefore(date: string, n: number): string {
const d = new Date(`${date}T00:00:00.000Z`);
d.setUTCDate(d.getUTCDate() - n);
return d.toISOString().slice(0, 10);
}
}
+128
View File
@@ -0,0 +1,128 @@
import type { DigestReport, Logger } from '../shared/types';
/**
* Posts the daily digest to a Discord webhook (DISCORD_WEBHOOK_URL in .env).
* When the env var is unset, send() is a no-op and the caller falls back to
* console output — the digest is still useful without Discord.
*
* Embed building is a pure static so it can be unit-tested without network.
*/
export class DiscordNotifier {
private static readonly MAX_FIELDS = 10; // Discord caps embeds at 25 fields; keep digests scannable
constructor(
private readonly logger: Logger,
private readonly webhookUrl = process.env.DISCORD_WEBHOOK_URL,
) {}
get enabled(): boolean {
return Boolean(this.webhookUrl);
}
async send(report: DigestReport): Promise<boolean> {
if (!this.webhookUrl) return false;
const payload = DiscordNotifier.buildPayload(report);
if (!payload) {
this.logger.log('Digest: nothing to report — Discord post skipped');
return false;
}
let res = await this.post(payload);
// Forum channels require a thread name (Discord error code 220001) —
// retry once, creating a post titled with the digest date.
if (res.status === 400 && (await DiscordNotifier.isForumError(res))) {
this.logger.log('Webhook targets a forum channel — retrying with thread_name');
res = await this.post({ ...payload, thread_name: `Signal Digest ${report.date}` });
}
if (!res.ok) {
const body = await res.text().catch(() => '');
this.logger.warn(
`Discord webhook failed: HTTP ${res.status}${body.slice(0, 200) || 'no response body'}`,
);
if (res.status === 401 || res.status === 404) {
this.logger.warn(
'Hint: the URL in .env must be the RAW webhook URL (no <>, no quotes, no HTML escaping), ' +
'ending in a ~68-char token. Re-copy it: Channel Settings → Integrations → Webhooks.',
);
}
return false;
}
return true;
}
private post(payload: object): Promise<Response> {
return fetch(this.webhookUrl as string, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
});
}
private static async isForumError(res: Response): Promise<boolean> {
try {
const body = (await res.clone().json()) as { code?: number };
return body.code === 220001;
} catch {
return false;
}
}
/** Returns null when there is nothing worth posting. */
static buildPayload(report: DigestReport): { embeds: unknown[] } | null {
if (report.changes.length === 0 && report.maStories.length === 0) return null;
const fields: Array<{ name: string; value: string; inline: boolean }> = [];
for (const c of report.changes.slice(0, DiscordNotifier.MAX_FIELDS)) {
const delta =
c.scoreDelta != null ? ` (score ${c.scoreDelta > 0 ? '+' : ''}${c.scoreDelta})` : '';
const catalystLine = c.catalysts.length
? c.catalysts
.slice(0, 2)
.map((s) => `• [${s.catalyst ?? 'news'}] ${DiscordNotifier.trim(s.headline, 80)}`)
.join('\n')
: '• no catalyst found — verdict moved on fundamentals/market data';
fields.push({
name: `${c.ticker}: ${c.previousSignal}${c.newSignal}${delta}`,
value: catalystLine,
inline: false,
});
}
if (report.changes.length > DiscordNotifier.MAX_FIELDS) {
fields.push({
name: `…and ${report.changes.length - DiscordNotifier.MAX_FIELDS} more changes`,
value: 'See GET /api/digest for the full report',
inline: false,
});
}
if (report.maStories.length > 0) {
fields.push({
name: `🔱 M&A activity (${report.maStories.length})`,
value: report.maStories
.slice(0, 5)
.map((s) => `${DiscordNotifier.trim(s.headline, 90)}`)
.join('\n'),
inline: false,
});
}
return {
embeds: [
{
title: `📊 Daily Signal Digest — ${report.date}`,
description: `${report.snapshotCount} tickers screened · ${report.changes.length} signal change(s)`,
color: report.changes.length > 0 ? 0xf0b429 : 0x4ade80, // amber if changes, green if calm
fields,
},
],
};
}
private static trim(s: string, max: number): string {
return s.length <= max ? s : `${s.slice(0, max - 1)}`;
}
}
@@ -0,0 +1,22 @@
import type { FastifyInstance, FastifyRequest } from 'fastify';
import { DigestService } from './DigestService';
/**
* On-demand digest read (P1.1). The scheduled path is bin/daily-digest.ts;
* this endpoint lets the UI (or curl) build the same report any time.
*/
export class DigestController {
constructor(private readonly digest: DigestService) {}
register(app: FastifyInstance): void {
app.get('/api/digest', this.today.bind(this));
}
/** GET /api/digest?date=YYYY-MM-DD (defaults to today) */
private async today(req: FastifyRequest) {
const { date } = req.query as { date?: string };
const day =
date && /^\d{4}-\d{2}-\d{2}$/.test(date) ? date : new Date().toISOString().slice(0, 10);
return this.digest.build(day);
}
}
+5
View File
@@ -0,0 +1,5 @@
// Digest domain — daily change detection (PRODUCT.md P1.1)
export { DigestService } from './DigestService';
export { DiscordNotifier } from './DiscordNotifier';
export { DigestController } from './digest.controller';
+165
View File
@@ -0,0 +1,165 @@
import { createHash } from 'crypto';
import { NewsRepository } from './NewsRepository';
import type { CatalystType, IngestStats, NormalizedStory } from '../shared/types';
/**
* Shared ingest pipeline (FREE-DATA-STACK §2) — every source flows through
* here: FILTER → DEDUPE → CLASSIFY → STORE. All drops happen BEFORE insert,
* cheapest check first, so the tables stay small by construction (§4).
*/
export class NewsPipeline {
/** §4.4 — max stories linked per ticker per day (filings exempt). */
private static readonly DAILY_CAP = 25;
/** §4.3 — syndicated-copy window for title dedupe. */
private static readonly TITLE_WINDOW_MS = 48 * 60 * 60 * 1000;
/** §4.2 — headlines with no decision value are never stored. */
private static readonly NOISE_PATTERNS: RegExp[] = [
/\b\d+\s+(?:best|top|hot)\s+stocks?\b/i,
/\bstocks?\s+to\s+(?:watch|buy|sell)\b/i,
/\bprice\s+target\s+(?:raised|lowered|reiterated|maintained)\b/i,
/\b(?:premarket|after-?hours?)\s+movers?\b/i,
/\bwhy\s+.{0,40}\s+stock\s+(?:jumped|popped|soared|plunged|tanked)\b/i,
/\bmotley\s+fool\b/i,
];
constructor(private readonly repo: NewsRepository) {}
/**
* Run a batch of normalized stories through the pipeline.
* `universe` is the tracked-ticker set from UniverseProvider.
*/
ingest(stories: NormalizedStory[], universe: Set<string>): IngestStats {
const stats: IngestStats = {
fetched: stories.length,
stored: 0,
droppedNoUniverseTicker: 0,
droppedNoise: 0,
droppedDuplicate: 0,
droppedCapped: 0,
};
for (const story of stories) {
this.ingestOne(story, universe, stats);
}
return stats;
}
private ingestOne(story: NormalizedStory, universe: Set<string>, stats: IngestStats): void {
const isFiling = story.source === 'edgar';
// 1. Universe filter — the big one (§4.1)
const tickers = [...new Set(story.tickers.map((t) => t.toUpperCase()))].filter((t) =>
universe.has(t),
);
if (tickers.length === 0) {
stats.droppedNoUniverseTicker++;
return;
}
// 2. Noise blocklist (§4.2) — filings are never noise
if (!isFiling && NewsPipeline.isNoise(story.headline)) {
stats.droppedNoise++;
return;
}
// 3. Dedupe (§4.3): url hash (storage-level PK) + recent title match
const urlHash = NewsPipeline.sha(story.url);
const titleHash = NewsPipeline.sha(NewsPipeline.normalizeTitle(story.headline));
const titleCutoff = new Date(Date.now() - NewsPipeline.TITLE_WINDOW_MS).toISOString();
if (this.repo.titleSeenSince(titleHash, titleCutoff)) {
stats.droppedDuplicate++;
return;
}
// 4. Per-ticker daily cap (§4.4) — filings keep priority past the cap
const day = story.publishedAt.slice(0, 10);
const eligible = isFiling
? tickers
: tickers.filter((t) => this.repo.countTickerDay(t, day) < NewsPipeline.DAILY_CAP);
if (eligible.length === 0) {
stats.droppedCapped++;
return;
}
// 5. Classify + store
const catalyst = story.catalystHint ?? NewsPipeline.classify(story.headline);
const inserted = this.repo.insertArticle({
urlHash,
titleHash,
tickers: eligible,
headline: story.headline.trim(),
body: story.body ?? null,
source: story.source,
catalyst,
url: story.url,
publishedAt: story.publishedAt,
});
if (!inserted) {
stats.droppedDuplicate++; // url_hash collision — already stored
return;
}
for (const ticker of eligible) {
this.repo.linkTicker(ticker, day, urlHash);
}
stats.stored++;
}
/** Retention jobs (§5) — call once daily. */
runRetention(now = new Date()): { bodiesPurged: number; rowsDeleted: number } {
const bodyCutoff = new Date(now.getTime() - 90 * 24 * 60 * 60 * 1000).toISOString();
const rowCutoff = new Date(now.getTime() - 548 * 24 * 60 * 60 * 1000).toISOString(); // ~18mo
return {
bodiesPurged: this.repo.purgeBodiesBefore(bodyCutoff),
rowsDeleted: this.repo.deleteUnreferencedBefore(rowCutoff),
};
}
// ── Pure helpers (exposed for tests) ──────────────────────────────────────
static isNoise(headline: string): boolean {
return NewsPipeline.NOISE_PATTERNS.some((re) => re.test(headline));
}
/**
* Keyword catalyst classifier. Order matters: M&A beats earnings
* ("acquisition closes in Q2" is an M&A story).
*/
static classify(headline: string): CatalystType | null {
const h = headline.toLowerCase();
if (
/\b(acqui[sr]|merger|takeover|buyout|tender offer|business combination|to be acquired)/.test(
h,
)
)
return 'ma';
if (/\b(guidance|outlook|forecast|raises full[- ]year|lowers full[- ]year)/.test(h))
return 'guidance';
if (
/\b(earnings|results|eps|quarterly report|q[1-4] (?:20\d\d|results)|fiscal (?:year|q[1-4]))/.test(
h,
)
)
return 'earnings';
if (
/\b(sec |fda|doj|ftc|antitrust|investigation|subpoena|lawsuit|settl|recall|approval)/.test(h)
)
return 'regulatory';
if (/\b(fed |fomc|inflation|cpi|jobs report|rate (?:cut|hike)|treasury yield)/.test(h))
return 'macro';
return null;
}
static normalizeTitle(title: string): string {
return title
.toLowerCase()
.replace(/[^a-z0-9 ]/g, '')
.replace(/\s+/g, ' ')
.trim();
}
private static sha(input: string): string {
return createHash('sha256').update(input).digest('hex');
}
}
+76
View File
@@ -0,0 +1,76 @@
import { DatabaseConnection } from '../shared/db/index';
import { QueryBuilder } from '../shared/utils/QueryBuilder';
import type { NewsArticleRow } from '../shared/types';
/**
* Persistence for the free-tier news pipeline (FREE-DATA-STACK §3).
* Pure data access — all filtering/dedupe decisions live in NewsPipeline.
*/
export class NewsRepository {
constructor(private readonly db: DatabaseConnection) {}
/** Returns true if the row was inserted (false = duplicate url_hash). */
insertArticle(a: {
urlHash: string;
titleHash: string;
tickers: string[];
headline: string;
body: string | null;
source: string;
catalyst: string | null;
url: string;
publishedAt: string;
}): boolean {
const qb = new QueryBuilder('NEWS_QUERIES.INSERT_ARTICLE', [
a.urlHash,
a.titleHash,
JSON.stringify(a.tickers),
a.headline,
a.body,
a.source,
a.catalyst,
a.url,
a.publishedAt,
new Date().toISOString(),
]);
return this.db.run(qb) > 0;
}
titleSeenSince(titleHash: string, sinceIso: string): boolean {
const qb = new QueryBuilder('NEWS_QUERIES.TITLE_SEEN_SINCE', [titleHash, sinceIso]);
return this.db.get(qb) != null;
}
linkTicker(ticker: string, day: string, urlHash: string): void {
const qb = new QueryBuilder('NEWS_QUERIES.INSERT_CATALYST_LINK', [ticker, day, urlHash]);
this.db.run(qb);
}
countTickerDay(ticker: string, day: string): number {
const qb = new QueryBuilder('NEWS_QUERIES.COUNT_TICKER_DAY', [ticker, day]);
return this.db.get<{ n: number }>(qb)?.n ?? 0;
}
newsForTicker(ticker: string, sinceDay: string): NewsArticleRow[] {
const qb = new QueryBuilder('NEWS_QUERIES.SELECT_TICKER_NEWS', [
ticker.toUpperCase(),
sinceDay,
]);
return this.db.all<NewsArticleRow>(qb);
}
recent(limit: number): NewsArticleRow[] {
const qb = new QueryBuilder('NEWS_QUERIES.SELECT_RECENT', [limit]);
return this.db.all<NewsArticleRow>(qb);
}
/** Retention: null out bodies older than cutoff. Returns rows changed. */
purgeBodiesBefore(cutoffIso: string): number {
return this.db.run(new QueryBuilder('NEWS_QUERIES.PURGE_BODIES_BEFORE', [cutoffIso]));
}
/** Retention: delete old rows no ticker references. Returns rows deleted. */
deleteUnreferencedBefore(cutoffIso: string): number {
return this.db.run(new QueryBuilder('NEWS_QUERIES.DELETE_UNREFERENCED_BEFORE', [cutoffIso]));
}
}
+106
View File
@@ -0,0 +1,106 @@
import { NewsPipeline } from './NewsPipeline';
import { UniverseProvider } from './UniverseProvider';
import { EdgarPoller } from './pollers/EdgarPoller';
import { PrWirePoller } from './pollers/PrWirePoller';
import type { IngestStats, Logger } from '../shared/types';
/**
* In-process polling scheduler (FREE-DATA-STACK §2). No Redis/BullMQ at the
* free tier — plain intervals, unref'd so they never hold the process open.
*
* Cadences: EDGAR 10 min, PR-wire 15 min, retention daily.
* Disable entirely with NEWS_POLL=off (e.g. when running bin/poll-news.ts
* from cron instead of inside the server).
*/
export class NewsScheduler {
private static readonly EDGAR_INTERVAL_MS = 10 * 60 * 1000;
private static readonly PRWIRE_INTERVAL_MS = 15 * 60 * 1000;
private static readonly RETENTION_INTERVAL_MS = 24 * 60 * 60 * 1000;
private timers: NodeJS.Timeout[] = [];
constructor(
private readonly pipeline: NewsPipeline,
private readonly universe: UniverseProvider,
private readonly edgar: EdgarPoller,
private readonly prwire: PrWirePoller,
private readonly logger: Logger,
) {}
start(): void {
if (this.timers.length > 0) return; // already running
const every = (ms: number, fn: () => void) => {
const t = setInterval(fn, ms);
t.unref(); // never keep the process alive just for polling
this.timers.push(t);
};
every(NewsScheduler.EDGAR_INTERVAL_MS, () => void this.runEdgar());
every(NewsScheduler.PRWIRE_INTERVAL_MS, () => void this.runPrWire());
every(NewsScheduler.RETENTION_INTERVAL_MS, () => this.runRetention());
// Prime once shortly after boot (delay keeps server startup fast)
const boot = setTimeout(() => void this.runOnce(), 15_000);
boot.unref();
this.timers.push(boot);
this.logger.log('News scheduler started (EDGAR 10m, PR-wire 15m, retention 24h)');
}
stop(): void {
for (const t of this.timers) clearInterval(t);
this.timers = [];
}
/** One full cycle of everything — used at boot and by bin/poll-news.ts. */
async runOnce(): Promise<{ edgar: IngestStats; prwire: IngestStats }> {
const edgar = await this.runEdgar();
const prwire = await this.runPrWire();
return { edgar, prwire };
}
private async runEdgar(): Promise<IngestStats> {
try {
const stories = await this.edgar.poll(this.universe.getUniverse());
const stats = this.pipeline.ingest(stories, this.universe.getUniverse());
if (stats.stored > 0) this.logger.log(`EDGAR: stored ${stats.stored}/${stats.fetched}`);
return stats;
} catch (err) {
this.logger.warn('EDGAR poll cycle failed:', (err as Error).message);
return NewsScheduler.emptyStats();
}
}
private async runPrWire(): Promise<IngestStats> {
try {
const stories = await this.prwire.poll();
const stats = this.pipeline.ingest(stories, this.universe.getUniverse());
if (stats.stored > 0) this.logger.log(`PR-wire: stored ${stats.stored}/${stats.fetched}`);
return stats;
} catch (err) {
this.logger.warn('PR-wire poll cycle failed:', (err as Error).message);
return NewsScheduler.emptyStats();
}
}
private runRetention(): void {
try {
const { bodiesPurged, rowsDeleted } = this.pipeline.runRetention();
this.logger.log(`News retention: ${bodiesPurged} bodies purged, ${rowsDeleted} rows deleted`);
} catch (err) {
this.logger.warn('News retention failed:', (err as Error).message);
}
}
private static emptyStats(): IngestStats {
return {
fetched: 0,
stored: 0,
droppedNoUniverseTicker: 0,
droppedNoise: 0,
droppedDuplicate: 0,
droppedCapped: 0,
};
}
}
+50
View File
@@ -0,0 +1,50 @@
import { DatabaseConnection } from '../shared/db/index';
import { QueryBuilder } from '../shared/utils/QueryBuilder';
/**
* The tracked-ticker universe (FREE-DATA-STACK §4.1):
* watchlist holdings tickers screened in the last 30 days.
*
* This is the news pipeline's first and biggest filter — stories about
* tickers outside the universe are never stored. Cached for 10 minutes;
* the universe changes slowly.
*/
export class UniverseProvider {
private static readonly CACHE_TTL_MS = 10 * 60 * 1000;
private static readonly SNAPSHOT_LOOKBACK_DAYS = 30;
private cache: { universe: Set<string>; expiresAt: number } = {
universe: new Set(),
expiresAt: 0,
};
constructor(private readonly db: DatabaseConnection) {}
getUniverse(): Set<string> {
if (Date.now() < this.cache.expiresAt) return this.cache.universe;
const sinceDay = new Date(
Date.now() - UniverseProvider.SNAPSHOT_LOOKBACK_DAYS * 24 * 60 * 60 * 1000,
)
.toISOString()
.slice(0, 10);
const tickers = new Set<string>();
const add = (rows: { ticker: string }[]) =>
rows.forEach((r) => tickers.add(r.ticker.toUpperCase()));
add(this.db.all(new QueryBuilder('UNIVERSE_QUERIES.DISTINCT_WATCHLIST_TICKERS')));
add(this.db.all(new QueryBuilder('UNIVERSE_QUERIES.DISTINCT_HOLDING_TICKERS')));
add(
this.db.all(new QueryBuilder('UNIVERSE_QUERIES.DISTINCT_SNAPSHOT_TICKERS_SINCE', [sinceDay])),
);
this.cache = { universe: tickers, expiresAt: Date.now() + UniverseProvider.CACHE_TTL_MS };
return tickers;
}
/** Force next getUniverse() to re-read (e.g. after a watchlist change). */
invalidate(): void {
this.cache.expiresAt = 0;
}
}
+10
View File
@@ -0,0 +1,10 @@
// News domain — free-tier news ingestion pipeline (FREE-DATA-STACK.md)
export { NewsController } from './news.controller';
export { NewsRepository } from './NewsRepository';
export { NewsPipeline } from './NewsPipeline';
export { UniverseProvider } from './UniverseProvider';
export { NewsScheduler } from './NewsScheduler';
export { EdgarPoller } from './pollers/EdgarPoller';
export { PrWirePoller } from './pollers/PrWirePoller';
export { RssParser } from './rss';
+90
View File
@@ -0,0 +1,90 @@
import type { FastifyInstance, FastifyRequest } from 'fastify';
import { NewsRepository } from './NewsRepository';
import { YahooFinanceClient } from '../shared';
import type { NewsArticleRow } from '../shared/types';
interface StoryView {
headline: string;
tickers: string[];
source: string;
catalyst: string | null;
url: string;
publishedAt: string;
}
/**
* Read side of the news pipeline. Stored pipeline stories (curated, catalyst-
* tagged, historical) are merged with a live per-ticker Yahoo search on
* request — stored gives depth, live gives freshness. The RSS firehoses
* can't be queried per-ticker on demand, which is why they go through the
* polling pipeline instead.
*/
export class NewsController {
constructor(
private readonly repo: NewsRepository,
private readonly yahoo?: YahooFinanceClient,
) {}
register(app: FastifyInstance): void {
app.get('/api/news/recent', this.recent.bind(this));
app.get('/api/news/:ticker', this.byTicker.bind(this));
}
/** GET /api/news/:ticker?days=7&live=1 (live Yahoo merge on by default) */
private async byTicker(req: FastifyRequest) {
const ticker = (req.params as { ticker: string }).ticker.toUpperCase();
const query = req.query as { days?: string; live?: string };
const days = Math.min(Number(query.days ?? 7) || 7, 90);
const live = query.live !== '0';
const sinceDay = new Date(Date.now() - days * 24 * 60 * 60 * 1000).toISOString().slice(0, 10);
const stored = this.repo.newsForTicker(ticker, sinceDay).map(NewsController.serialize);
const fresh = live ? await this.fetchLive(ticker) : [];
// Merge, dedupe by URL, newest first
const byUrl = new Map<string, StoryView>();
for (const s of [...stored, ...fresh]) byUrl.set(s.url, byUrl.get(s.url) ?? s);
const stories = [...byUrl.values()].sort((a, b) => b.publishedAt.localeCompare(a.publishedAt));
return { ticker, days, stories };
}
/** Live per-ticker Yahoo news search — freshness layer, best-effort. */
private async fetchLive(ticker: string): Promise<StoryView[]> {
if (!this.yahoo) return [];
try {
const items = await this.yahoo.search(ticker, { newsCount: 8 });
return items
.filter((n) => n.title && n.link)
.map((n) => ({
headline: n.title as string,
tickers: [ticker],
source: 'yahoo',
catalyst: null,
url: n.link as string,
publishedAt: n.providerPublishTime
? new Date(n.providerPublishTime).toISOString()
: new Date().toISOString(),
}));
} catch {
return [];
}
}
/** GET /api/news/recent?limit=50 */
private async recent(req: FastifyRequest) {
const limit = Math.min(Number((req.query as { limit?: string }).limit ?? 50) || 50, 200);
return { stories: this.repo.recent(limit).map(NewsController.serialize) };
}
private static serialize(row: NewsArticleRow) {
return {
headline: row.headline,
tickers: JSON.parse(row.ticker_list) as string[],
source: row.source,
catalyst: row.catalyst,
url: row.url,
publishedAt: row.published_at,
};
}
}
+122
View File
@@ -0,0 +1,122 @@
import { RssParser } from '../rss';
import type { CatalystType, Logger, NormalizedStory } from '../../shared/types';
/**
* SEC EDGAR poller (FREE-DATA-STACK §1.3 / P1.2 Tier 2). Free forever, and
* the highest-value source: filings frequently precede the headline.
*
* Strategy: poll the site-wide "current filings" atom feed once per form
* type (4 requests/cycle total, well inside SEC fair use), map filer CIK →
* ticker via the daily-cached company_tickers.json, and emit stories only
* for universe tickers. The pipeline applies its own universe filter again —
* defense in depth.
*
* SEC requires a descriptive User-Agent with contact info: set
* EDGAR_USER_AGENT in .env (e.g. "market-screener/1.0 you@example.com").
*/
export class EdgarPoller {
private static readonly TICKER_MAP_URL = 'https://www.sec.gov/files/company_tickers.json';
private static readonly TICKER_MAP_TTL_MS = 24 * 60 * 60 * 1000;
/** form type → catalyst classification (overrides keyword classify). */
private static readonly FORMS: Array<{ form: string; catalyst: CatalystType }> = [
{ form: '8-K', catalyst: 'regulatory' }, // material events
{ form: 'SC 13D', catalyst: 'ma' }, // activist stake >5% — classic pre-M&A tell
{ form: 'S-4', catalyst: 'ma' }, // merger registration
{ form: 'DEFM14A', catalyst: 'ma' }, // merger proxy
];
private cikToTicker: Map<string, string> = new Map();
private mapExpiresAt = 0;
constructor(
private readonly logger: Logger,
private readonly userAgent = process.env.EDGAR_USER_AGENT ??
'market-screener/1.0 (set EDGAR_USER_AGENT in .env)',
) {}
/** Fetch all form feeds and return normalized stories for universe tickers. */
async poll(universe: Set<string>): Promise<NormalizedStory[]> {
if (universe.size === 0) return [];
await this.refreshTickerMap();
const stories: NormalizedStory[] = [];
for (const { form, catalyst } of EdgarPoller.FORMS) {
try {
const xml = await this.fetchText(EdgarPoller.feedUrl(form));
stories.push(...this.parseFeed(xml, form, catalyst, universe));
} catch (err) {
this.logger.warn(`EDGAR ${form} feed failed:`, (err as Error).message);
}
}
return stories;
}
/** Parse one atom feed. Public for fixture tests. */
parseFeed(
xml: string,
form: string,
catalyst: CatalystType,
universe: Set<string>,
): NormalizedStory[] {
const stories: NormalizedStory[] = [];
for (const entry of RssParser.blocks(xml, 'entry')) {
const title = RssParser.tag(entry, 'title') ?? '';
const updated = RssParser.tag(entry, 'updated');
const url = RssParser.link(entry);
if (!title || !url || !updated) continue;
// Title format: "8-K - APPLE INC (0000320193) (Filer)"
const cikMatch = title.match(/\((\d{10})\)/);
if (!cikMatch) continue;
const ticker = this.cikToTicker.get(cikMatch[1]);
if (!ticker || !universe.has(ticker)) continue;
const company = title
.replace(/^[^-]+-\s*/, '')
.replace(/\(\d{10}\)/g, '')
.replace(/\((Filer|Subject|Reporting)\)/gi, '')
.trim();
stories.push({
tickers: [ticker],
headline: `${form} filing: ${company}`,
body: null,
source: 'edgar',
url,
publishedAt: new Date(updated).toISOString(),
catalystHint: catalyst,
});
}
return stories;
}
/** Inject a CIK→ticker map directly (tests). CIKs are 10-digit zero-padded. */
setTickerMap(map: Map<string, string>): void {
this.cikToTicker = map;
this.mapExpiresAt = Date.now() + EdgarPoller.TICKER_MAP_TTL_MS;
}
private async refreshTickerMap(): Promise<void> {
if (Date.now() < this.mapExpiresAt && this.cikToTicker.size > 0) return;
const raw = await this.fetchText(EdgarPoller.TICKER_MAP_URL);
const data = JSON.parse(raw) as Record<string, { cik_str: number; ticker: string }>;
const map = new Map<string, string>();
for (const entry of Object.values(data)) {
map.set(String(entry.cik_str).padStart(10, '0'), entry.ticker.toUpperCase());
}
this.setTickerMap(map);
this.logger.log(`EDGAR ticker map refreshed: ${map.size} companies`);
}
private static feedUrl(form: string): string {
const type = encodeURIComponent(form);
return `https://www.sec.gov/cgi-bin/browse-edgar?action=getcurrent&type=${type}&company=&dateb=&owner=include&count=100&output=atom`;
}
private async fetchText(url: string): Promise<string> {
const res = await fetch(url, { headers: { 'User-Agent': this.userAgent } });
if (!res.ok) throw new Error(`HTTP ${res.status} for ${url}`);
return res.text();
}
}
@@ -0,0 +1,91 @@
import { RssParser } from '../rss';
import type { Logger, NormalizedStory } from '../../shared/types';
/**
* PR-wire RSS poller (FREE-DATA-STACK §1.4 / P1.2 Tier 3) — press releases
* that the other free feeds miss, mostly small-caps.
*
* Ticker extraction relies on the wire convention of exchange tags in the
* text: "(NYSE: ABC)", "(Nasdaq: XYZ)". Stories without an exchange tag
* produce no tickers and are dropped by the pipeline's universe filter —
* that's intentional; untagged wire stories are rarely decision-grade.
*
* Feed list is overridable: NEWS_PRWIRE_FEEDS="url1,url2" in .env
* (wire RSS URLs change occasionally — if a feed 404s, update the env var).
*/
export class PrWirePoller {
private static readonly DEFAULT_FEEDS = [
// GlobeNewswire — public-company news
'https://www.globenewswire.com/RssFeed/orgclass/1/feedTitle/GlobeNewswire%20-%20News%20about%20Public%20Companies',
// PR Newswire — all news releases
'https://www.prnewswire.com/rss/news-releases-list.rss',
];
private static readonly EXCHANGE_TAG =
/\((?:NYSE(?:\s+American)?|NASDAQ|Nasdaq|AMEX|CBOE|OTC(?:QB|QX|MKTS)?)\s*:\s*([A-Za-z][A-Za-z.]{0,5})\)/g;
private readonly feeds: string[];
constructor(
private readonly logger: Logger,
feeds?: string[],
) {
const env = process.env.NEWS_PRWIRE_FEEDS;
this.feeds = feeds ?? (env ? env.split(',').map((s) => s.trim()) : PrWirePoller.DEFAULT_FEEDS);
}
async poll(): Promise<NormalizedStory[]> {
const stories: NormalizedStory[] = [];
for (const feed of this.feeds) {
try {
const xml = await this.fetchText(feed);
stories.push(...PrWirePoller.parseFeed(xml));
} catch (err) {
this.logger.warn(`PR-wire feed failed (${feed}):`, (err as Error).message);
}
}
return stories;
}
/** Parse one RSS feed. Public static for fixture tests. */
static parseFeed(xml: string): NormalizedStory[] {
const stories: NormalizedStory[] = [];
for (const item of RssParser.blocks(xml, 'item')) {
const title = RssParser.tag(item, 'title');
const url = RssParser.link(item);
const pubDate = RssParser.tag(item, 'pubDate');
if (!title || !url) continue;
const description = RssParser.tag(item, 'description') ?? '';
const tickers = PrWirePoller.extractTickers(`${title} ${description}`);
if (tickers.length === 0) continue; // no exchange tag → skip early
stories.push({
tickers,
headline: title,
body: description || null,
source: 'prwire',
url,
publishedAt: pubDate ? new Date(pubDate).toISOString() : new Date().toISOString(),
});
}
return stories;
}
/** "(NYSE: ABC)" / "(Nasdaq: XYZ)" → ['ABC', 'XYZ']. Public for tests. */
static extractTickers(text: string): string[] {
const out = new Set<string>();
for (const m of text.matchAll(PrWirePoller.EXCHANGE_TAG)) {
out.add(m[1].toUpperCase());
}
return [...out];
}
private async fetchText(url: string): Promise<string> {
const res = await fetch(url, {
headers: { 'User-Agent': 'market-screener/1.0 (+rss reader)' },
});
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return res.text();
}
}
+43
View File
@@ -0,0 +1,43 @@
/**
* Minimal RSS/Atom extraction — enough for EDGAR atom feeds and PR-wire RSS.
* Deliberately dependency-free; if a feed outgrows this, swap in
* fast-xml-parser without touching the pollers' output shape.
*/
export class RssParser {
/** Extract raw <item>…</item> or <entry>…</entry> blocks. */
static blocks(xml: string, tag: 'item' | 'entry'): string[] {
const re = new RegExp(`<${tag}[\\s>][\\s\\S]*?<\\/${tag}>`, 'g');
return xml.match(re) ?? [];
}
/** First occurrence of a simple tag's text content, entity-decoded. */
static tag(block: string, name: string): string | null {
const re = new RegExp(`<${name}[^>]*>([\\s\\S]*?)<\\/${name}>`, 'i');
const m = block.match(re);
return m ? RssParser.clean(m[1]) : null;
}
/** Atom-style <link href="…"/> (self-closing) or RSS <link>…</link>. */
static link(block: string): string | null {
const href = block.match(/<link[^>]*href="([^"]+)"/i);
if (href) return RssParser.decode(href[1].trim());
return RssParser.tag(block, 'link');
}
private static clean(raw: string): string {
const noCdata = raw.replace(/<!\[CDATA\[([\s\S]*?)\]\]>/g, '$1');
const noTags = noCdata.replace(/<[^>]+>/g, ' ');
return RssParser.decode(noTags).replace(/\s+/g, ' ').trim();
}
private static decode(s: string): string {
return s
.replace(/&amp;/g, '&')
.replace(/&lt;/g, '<')
.replace(/&gt;/g, '>')
.replace(/&quot;/g, '"')
.replace(/&#0?39;/g, "'")
.replace(/&apos;/g, "'")
.replace(/&#(\d+);/g, (_, n) => String.fromCharCode(Number(n)));
}
}
+213 -1
View File
@@ -1,15 +1,42 @@
import type { FastifyInstance, FastifyRequest } from 'fastify';
import { ScreenerEngine } from './ScreenerEngine';
import { CatalystCache, SignalSnapshotRepository } from '../../domains/shared';
import { CatalystCache, SignalSnapshotRepository, YahooFinanceClient } from '../../domains/shared';
import type { DataHealth, LiveAssetResult, ScreenerResult } from '../../domains/shared';
import type { NewsRepository } from '../news/NewsRepository';
import { screenSchema } from '../../domains/shared/types/schemas';
export class ScreenerController {
/** Company profiles change rarely — cache for an hour. */
private static readonly PROFILE_TTL_MS = 60 * 60 * 1000;
private profileCache = new Map<string, { data: unknown; expiresAt: number }>();
/** Sector pulse — SPDR sector ETFs as the standard proxy, cached 15 min. */
private static readonly SECTOR_TTL_MS = 15 * 60 * 1000;
private static readonly SECTOR_ETFS: Array<{ etf: string; sector: string; name: string }> = [
{ etf: 'XLK', sector: 'TECHNOLOGY', name: 'Technology' },
{ etf: 'XLF', sector: 'FINANCIAL', name: 'Financials' },
{ etf: 'XLE', sector: 'ENERGY', name: 'Energy' },
{ etf: 'XLV', sector: 'HEALTHCARE', name: 'Healthcare' },
{ etf: 'XLC', sector: 'COMMUNICATION', name: 'Communication' },
{ etf: 'XLP', sector: 'CONSUMER_STAPLES', name: 'Staples' },
{ etf: 'XLY', sector: 'CONSUMER_DISCRETIONARY', name: 'Discretionary' },
{ etf: 'XLRE', sector: 'REIT', name: 'Real Estate' },
{ etf: 'XLI', sector: 'GENERAL', name: 'Industrials' },
{ etf: 'XLU', sector: 'GENERAL', name: 'Utilities' },
];
private sectorCache: { data: unknown; expiresAt: number } | null = null;
/** Sector drill-down (holdings + screen + news) — cached 30 min per sector. */
private static readonly SECTOR_DETAIL_TTL_MS = 30 * 60 * 1000;
private sectorDetailCache = new Map<string, { data: unknown; expiresAt: number }>();
constructor(
private readonly engine: ScreenerEngine,
private readonly catalystCache: CatalystCache,
// Optional so tests and minimal setups work without a database.
private readonly snapshots?: SignalSnapshotRepository,
private readonly yahoo?: YahooFinanceClient,
private readonly news?: NewsRepository,
) {}
register(app: FastifyInstance): void {
@@ -24,6 +51,161 @@ export class ScreenerController {
this.catalysts.bind(this),
);
app.get('/api/screen/history/:ticker', this.history.bind(this));
app.get('/api/screen/profile/:ticker', this.profile.bind(this));
app.get('/api/screen/chart/:ticker', this.chart.bind(this));
app.get('/api/screen/sectors', this.sectors.bind(this));
app.get('/api/screen/sector/:sector', this.sectorDetail.bind(this));
}
/**
* Sector drill-down: the sector ETF's top 10 holdings, freshly screened
* (signal + advice-ready rows), plus recent news for those tickers and
* macro stories — "what's in this sector and why is it moving".
*/
private async sectorDetail(req: FastifyRequest) {
const sector = (req.params as { sector: string }).sector.toUpperCase();
const entry = ScreenerController.SECTOR_ETFS.find((s) => s.sector === sector);
if (!entry || !this.yahoo) return { sector, etf: null, stocks: [], news: [] };
const cached = this.sectorDetailCache.get(sector);
if (cached && Date.now() < cached.expiresAt) return cached.data;
const holdings = await this.yahoo.fetchTopHoldings(entry.etf, 10);
const results = holdings.length > 0 ? await this.engine.screenTickers(holdings) : null;
const stocks = results
? ScreenerController.serializeAssets(results.STOCK as LiveAssetResult[])
: [];
// News: stored stories for these tickers (last 3 days), deduped by URL
const newsSince = new Date(Date.now() - 3 * 24 * 60 * 60 * 1000).toISOString().slice(0, 10);
const byUrl = new Map<string, unknown>();
if (this.news) {
for (const ticker of holdings) {
for (const row of this.news.newsForTicker(ticker, newsSince)) {
byUrl.set(row.url, {
headline: row.headline,
tickers: JSON.parse(row.ticker_list),
source: row.source,
catalyst: row.catalyst,
url: row.url,
publishedAt: row.published_at,
});
}
}
}
const data = {
sector,
etf: entry.etf,
name: entry.name,
stocks,
news: [...byUrl.values()],
};
this.sectorDetailCache.set(sector, {
data,
expiresAt: Date.now() + ScreenerController.SECTOR_DETAIL_TTL_MS,
});
return data;
}
/**
* Sector pulse — today's % change per sector via SPDR sector ETFs (the
* standard proxy). Returns sectors sorted best→worst plus the leader.
*/
private async sectors() {
if (this.sectorCache && Date.now() < this.sectorCache.expiresAt) {
return this.sectorCache.data;
}
if (!this.yahoo) return { asOf: null, leader: null, sectors: [] };
const results = await Promise.all(
ScreenerController.SECTOR_ETFS.map(async ({ etf, sector, name }) => {
try {
const summary = await this.yahoo!.fetchSummary(etf);
const pr = summary?.price ?? {};
const price = pr.regularMarketPrice ?? null;
const prev = pr.regularMarketPreviousClose ?? null;
const changePct =
price != null && prev != null && prev > 0
? +(((price - prev) / prev) * 100).toFixed(2)
: null;
return { etf, sector, name, changePct };
} catch {
return { etf, sector, name, changePct: null };
}
}),
);
const sectors = results
.filter((s) => s.changePct != null)
.sort((a, b) => (b.changePct as number) - (a.changePct as number));
const data = {
asOf: new Date().toISOString(),
leader: sectors[0] ?? null,
sectors,
};
this.sectorCache = { data, expiresAt: Date.now() + ScreenerController.SECTOR_TTL_MS };
return data;
}
/** Company profile for the ticker modal — name, description, sector. */
private async profile(req: FastifyRequest) {
const ticker = (req.params as { ticker: string }).ticker.toUpperCase();
if (!this.yahoo) return { ticker, profile: null };
const cached = this.profileCache.get(ticker);
if (cached && Date.now() < cached.expiresAt) return cached.data;
try {
const summary = await this.yahoo.fetchSummary(ticker);
const ap = summary?.assetProfile ?? {};
const pr = summary?.price ?? {};
const fd = summary?.financialData ?? {};
const price = pr.regularMarketPrice ?? null;
const targetMean = fd.targetMeanPrice ?? null;
const data = {
ticker,
profile: {
name: pr.longName ?? pr.shortName ?? ticker,
summary: ap.longBusinessSummary ?? null,
sector: ap.sector ?? null,
industry: ap.industry ?? null,
website: ap.website ?? null,
employees: ap.fullTimeEmployees ?? null,
marketCap: pr.marketCap ?? null,
currentPrice: price,
// Analyst price targets (Yahoo sell-side consensus)
targets: {
mean: targetMean,
high: fd.targetHighPrice ?? null,
low: fd.targetLowPrice ?? null,
analysts: fd.numberOfAnalystOpinions ?? null,
recommendationMean: fd.recommendationMean ?? null, // 1=Strong Buy … 5=Strong Sell
upsidePct:
targetMean != null && price != null && price > 0
? +(((targetMean - price) / price) * 100).toFixed(1)
: null,
},
},
};
this.profileCache.set(ticker, {
data,
expiresAt: Date.now() + ScreenerController.PROFILE_TTL_MS,
});
return data;
} catch {
return { ticker, profile: null };
}
}
/** Closes for the ticker modal chart. ?range=1d|5d|1mo|3mo|6mo|1y. */
private async chart(req: FastifyRequest) {
const ticker = (req.params as { ticker: string }).ticker.toUpperCase();
const raw = (req.query as { range?: string }).range ?? '6mo';
const range = raw in YahooFinanceClient.CHART_RANGES ? raw : '6mo';
if (!this.yahoo) return { ticker, range, points: [] };
return { ticker, range, points: await this.yahoo.fetchCloses(ticker, range) };
}
/** Signal snapshot history for one ticker (P0.1 ledger read side). */
@@ -65,6 +247,7 @@ export class ScreenerController {
const tickers = (req.body as { tickers: string[] }).tickers.map((t) => t.toUpperCase());
const results = await this.engine.screenTickers(tickers);
this.recordSnapshots(results, req);
this.flagTurnarounds(results);
const dataHealth = ScreenerController.assessDataHealth(results);
if (dataHealth.degraded) {
req.log?.warn?.({ dataHealth }, 'screen batch returned degraded fundamentals data');
@@ -78,6 +261,35 @@ export class ScreenerController {
};
}
/**
* Turnaround-watch (candidate flag, NOT a prediction): the stock's style is
* already Turnaround (earnings down, revenue holding) AND its fundamental
* score improved vs the previous snapshot in the ledger. Both legs must
* hold — style alone is static, improvement alone is noise.
*/
private flagTurnarounds(results: ScreenerResult): void {
if (!this.snapshots) return;
for (const row of results.STOCK as LiveAssetResult[]) {
const metrics = row.asset.metrics as { growthCategory?: string };
if (metrics?.growthCategory !== 'Turnaround') continue;
if (row.fundamental.tier === 'REJECT' || row.fundamental.score == null) continue;
try {
// History includes today's snapshot (recorded just above) — compare
// today's score against the most recent prior day with a score.
const history = this.snapshots.history(row.asset.ticker);
const prior = [...history]
.reverse()
.find((h) => h.snapshot_date < history[history.length - 1]?.snapshot_date);
if (prior?.fundamental_score != null && row.fundamental.score > prior.fundamental_score) {
row.turnaroundWatch = true;
}
} catch {
// best-effort — never fail the screen for a highlight
}
}
}
/**
* P0.4 data-sanity sentinel — if a large share of screened stocks come back
* with null core fundamentals (P/E, ROE), the upstream source has likely
@@ -40,6 +40,13 @@ export class DataMapper {
const currentPrice = pr.regularMarketPrice ?? 0;
const sharesOutstanding = ks.sharesOutstanding ?? 0;
// Today's % change — powers the sector drill-down "Today" sort
const prevClose = pr.regularMarketPreviousClose ?? null;
const dayChangePct =
prevClose != null && prevClose > 0 && (currentPrice as number) > 0
? +((((currentPrice as number) - prevClose) / prevClose) * 100).toFixed(2)
: null;
const operatingCashflow = fd.operatingCashflow ?? 0;
const freeCashflow = fd.freeCashflow ?? 0;
@@ -131,6 +138,7 @@ export class DataMapper {
? (sd.trailingAnnualDividendYield as number) * 100
: null,
beta: sd.beta ?? null,
dayChangePct,
week52High,
week52Low,
week52Change,
@@ -1,5 +1,5 @@
import YahooFinance from 'yahoo-finance2';
import type { YahooNewsItem, YahooSearchOptions, YahooFinanceLib } from '../types';
import type { YahooNewsItem, YahooSearchOptions, YahooFinanceLib, PricePoint } from '../types';
import { YAHOO_MODULES } from '../config/constants';
export class YahooFinanceClient {
@@ -49,4 +49,71 @@ export class YahooFinanceClient {
const { news = [] } = await this.lib.search(query, opts);
return news;
}
/**
* Top holdings of an ETF (ticker symbols, largest weight first).
* Used for sector drill-down. Returns [] on any failure.
*/
async fetchTopHoldings(etf: string, limit = 10): Promise<string[]> {
try {
const result = await this.lib.quoteSummary(
YahooFinanceClient.normalise(etf),
{ modules: ['topHoldings'] },
{ validateResult: false },
);
const holdings = (result?.topHoldings?.holdings ?? []) as Array<{ symbol?: string }>;
return holdings
.map((h) => h.symbol)
.filter((s): s is string => Boolean(s))
.slice(0, limit)
.map((s) => s.toUpperCase());
} catch {
return [];
}
}
/** Chart range presets — Robinhood/Yahoo-style. Intraday for short ranges. */
static readonly CHART_RANGES: Record<string, { days: number; interval: string }> = {
'1d': { days: 1, interval: '5m' },
'5d': { days: 5, interval: '30m' },
'1mo': { days: 30, interval: '1d' },
'3mo': { days: 91, interval: '1d' },
'6mo': { days: 182, interval: '1d' },
ytd: { days: 0, interval: '1d' }, // days computed dynamically (Jan 1 → now)
'1y': { days: 365, interval: '1d' },
'5y': { days: 1826, interval: '1wk' }, // weekly bars keep ~260 points
};
/**
* Closing prices for a named range (ticker modal chart). Intraday ranges
* keep the full timestamp; daily ranges keep the date only.
* Returns [] on any failure — the chart is a nice-to-have, never a blocker.
*/
async fetchCloses(ticker: string, range = '6mo'): Promise<PricePoint[]> {
const preset = YahooFinanceClient.CHART_RANGES[range] ?? YahooFinanceClient.CHART_RANGES['6mo'];
try {
const period1 =
range === 'ytd'
? new Date(Date.UTC(new Date().getUTCFullYear(), 0, 1))
: new Date(Date.now() - preset.days * 24 * 60 * 60 * 1000);
const result = await this.lib.chart(
YahooFinanceClient.normalise(ticker),
{ period1, interval: preset.interval },
{ validateResult: false },
);
const quotes = (result?.quotes ?? []) as Array<{ date?: string | Date; close?: number }>;
const intraday = preset.interval !== '1d';
return quotes
.filter((q) => q.close != null && q.date != null)
.map((q) => {
const iso = new Date(q.date as string | Date).toISOString();
return {
date: intraday ? iso : iso.slice(0, 10),
close: +(q.close as number).toFixed(2),
};
});
} catch {
return [];
}
}
}
@@ -163,6 +163,68 @@ export const UNIVERSE_QUERIES = {
WHERE type != 'crypto'
ORDER BY ticker
`,
// Every ticker screened recently (snapshot ledger) — part of the news universe
DISTINCT_SNAPSHOT_TICKERS_SINCE: `
SELECT DISTINCT ticker FROM signal_snapshots
WHERE snapshot_date >= ?
ORDER BY ticker
`,
};
// ── News Queries (FREE-DATA-STACK §25 — free-tier news pipeline) ───────────
export const NEWS_QUERIES = {
// INSERT OR IGNORE — url_hash PK is the first dedupe line (returns 0 changes on dup)
INSERT_ARTICLE: `
INSERT OR IGNORE INTO news_articles
(url_hash, title_hash, ticker_list, headline, body, source, catalyst, url, published_at, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`,
// Second dedupe line: same (normalized) title seen recently → syndicated copy
TITLE_SEEN_SINCE: `
SELECT 1 FROM news_articles
WHERE title_hash = ? AND published_at >= ?
LIMIT 1
`,
INSERT_CATALYST_LINK: `
INSERT OR IGNORE INTO ticker_catalysts (ticker, day, url_hash)
VALUES (?, ?, ?)
`,
// Per-ticker daily cap check (FREE-DATA-STACK §4.4)
COUNT_TICKER_DAY: `
SELECT COUNT(*) AS n FROM ticker_catalysts
WHERE ticker = ? AND day = ?
`,
// Stories for one ticker since a given day — what the UI reads (never Yahoo live)
SELECT_TICKER_NEWS: `
SELECT a.* FROM ticker_catalysts c
JOIN news_articles a ON a.url_hash = c.url_hash
WHERE c.ticker = ? AND c.day >= ?
ORDER BY a.published_at DESC
`,
SELECT_RECENT: `
SELECT * FROM news_articles
ORDER BY published_at DESC
LIMIT ?
`,
// Retention (FREE-DATA-STACK §5): purge bodies after 90d, drop unreferenced after 18mo
PURGE_BODIES_BEFORE: `
UPDATE news_articles SET body = NULL
WHERE body IS NOT NULL AND published_at < ?
`,
DELETE_UNREFERENCED_BEFORE: `
DELETE FROM news_articles
WHERE published_at < ?
AND url_hash NOT IN (SELECT url_hash FROM ticker_catalysts)
`,
};
// ── Signal Snapshot Queries (P0.1 — signal track record) ────────────────────
@@ -287,6 +349,31 @@ export const DDL = `
CREATE INDEX IF NOT EXISTS idx_snapshots_date ON signal_snapshots(snapshot_date);
CREATE INDEX IF NOT EXISTS idx_snapshots_signal ON signal_snapshots(signal, snapshot_date);
CREATE TABLE IF NOT EXISTS news_articles (
url_hash TEXT PRIMARY KEY, -- sha256(url)
title_hash TEXT NOT NULL, -- sha256(normalized headline) — syndication dedupe
ticker_list TEXT NOT NULL, -- JSON array of matched universe tickers
headline TEXT NOT NULL,
body TEXT, -- nullable; purged after 90 days (retention job)
source TEXT NOT NULL, -- 'edgar' | 'prwire' | 'yahoo'
catalyst TEXT, -- 'earnings'|'ma'|'guidance'|'regulatory'|'macro'|NULL
url TEXT NOT NULL,
published_at TEXT NOT NULL, -- ISO timestamp
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_news_published ON news_articles(published_at DESC);
CREATE INDEX IF NOT EXISTS idx_news_title ON news_articles(title_hash, published_at);
CREATE TABLE IF NOT EXISTS ticker_catalysts (
ticker TEXT NOT NULL,
day TEXT NOT NULL, -- YYYY-MM-DD (published date)
url_hash TEXT NOT NULL REFERENCES news_articles(url_hash),
PRIMARY KEY (ticker, day, url_hash)
);
CREATE INDEX IF NOT EXISTS idx_catalysts_ticker ON ticker_catalysts(ticker, day DESC);
`;
// ── Runtime migrations (ALTER TABLE for existing DBs) ────────────────────────
+3 -1
View File
@@ -34,6 +34,7 @@ export class Stock extends Asset {
pFFO: data.pFFO ?? null,
dividendYield: data.dividendYield ?? null,
beta: data.beta ?? null,
dayChangePct: data.dayChangePct ?? null,
week52High: data.week52High ?? null,
week52Low: data.week52Low ?? null,
week52Change: data.week52Change ?? null,
@@ -192,7 +193,8 @@ export class Stock extends Asset {
if (m.quickRatio != null) display['Quick'] = fmt(m.quickRatio, 2);
if (m.beta != null) display['Beta'] = fmt(m.beta, 2);
// 52-week movement
// Movement
if (m.dayChangePct != null) display['Day %'] = fmtSign(m.dayChangePct, '%');
if (w52pos != null) display['52W Pos'] = w52pos;
if (m.week52Change != null) display['52W Chg'] = fmtSign(m.week52Change, '%');
if (m.week52FromHigh != null) display['From High'] = fmtSign(m.week52FromHigh, '%');
@@ -85,6 +85,12 @@ export interface AssetResult {
signal: Signal;
inflated: ScoreResult;
fundamental: ScoreResult;
/**
* Turnaround-watch highlight: style is Turnaround AND the fundamental
* score improved vs the previous snapshot. A candidate flag, not a
* prediction — set by the screener controller, absent for ETFs/bonds.
*/
turnaroundWatch?: boolean;
}
/**
@@ -0,0 +1,30 @@
/**
* Daily change digest types (PRODUCT.md P1.1).
*/
export interface DigestCatalyst {
headline: string;
catalyst: string | null; // 'earnings' | 'ma' | 'guidance' | 'regulatory' | 'macro' | null
source: string; // 'edgar' | 'prwire' | 'yahoo'
url: string;
publishedAt: string;
}
/** A ticker whose signal changed since the previous snapshot. */
export interface DigestChange {
ticker: string;
previousSignal: string;
newSignal: string;
previousDate: string; // day of the previous snapshot
scoreDelta: number | null; // fundamental score change, when both sides have one
price: number | null;
catalysts: DigestCatalyst[]; // recent stories for this ticker (the "why", maybe)
}
export interface DigestReport {
date: string; // YYYY-MM-DD the digest covers
changes: DigestChange[]; // signal flips, strongest-impact first
newTickers: string[]; // first-ever snapshot today (no baseline to diff)
maStories: DigestCatalyst[]; // all M&A-classified stories in the window, always surfaced
snapshotCount: number; // tickers snapshotted today
}
@@ -50,6 +50,7 @@ export interface YahooNewsItem {
publisher: string;
link: string;
relatedTickers?: string[];
providerPublishTime?: string | number | Date;
}
export interface YahooSearchOptions {
@@ -66,6 +67,17 @@ export interface YahooFinanceLib {
queryOpts?: { validateResult?: boolean },
): Promise<any>;
search(query: string, opts?: YahooSearchOptions): Promise<{ news?: YahooNewsItem[] }>;
chart(
ticker: string,
opts: { period1: Date | string; interval?: string },
queryOpts?: { validateResult?: boolean },
): Promise<any>;
}
/** One point of daily price history (ticker modal chart). */
export interface PricePoint {
date: string; // YYYY-MM-DD
close: number;
}
// ── SimpleFIN client types ─────────────────────────────────────────────────
+9
View File
@@ -32,6 +32,7 @@ export type {
YahooNewsItem,
YahooSearchOptions,
YahooFinanceLib,
PricePoint,
SimpleFINOptions,
SimpleFINTransaction,
SimpleFINAccount,
@@ -55,6 +56,14 @@ export type {
HoldingRow,
SignalSnapshotRow,
} from './repositories.model';
export type {
NewsSource,
CatalystType,
NormalizedStory,
NewsArticleRow,
IngestStats,
} from './news.model';
export type { DigestCatalyst, DigestChange, DigestReport } from './digest.model';
export type { NumVal, SanitizedMetrics, SanitizedBondMetrics } from './scorers.model';
export type {
BenchmarkProviderOptions,
@@ -32,6 +32,7 @@ export interface StockData {
pFFO?: number | null;
dividendYield?: number | null;
beta?: number | null;
dayChangePct?: number | null;
week52High?: number | null;
week52Low?: number | null;
week52Change?: number | null;
@@ -66,6 +67,7 @@ export interface StockMetrics {
pFFO: number | null;
dividendYield: number | null;
beta: number | null;
dayChangePct: number | null;
week52High: number | null;
week52Low: number | null;
week52Change: number | null;
+43
View File
@@ -0,0 +1,43 @@
/**
* News pipeline types (FREE-DATA-STACK.md).
*/
export type NewsSource = 'edgar' | 'prwire' | 'yahoo';
export type CatalystType = 'earnings' | 'ma' | 'guidance' | 'regulatory' | 'macro';
/** One story after a poller has normalized it — the only shape the pipeline accepts. */
export interface NormalizedStory {
tickers: string[];
headline: string;
body?: string | null;
source: NewsSource;
url: string;
publishedAt: string; // ISO timestamp
/** Poller-supplied classification (e.g. EDGAR form type); overrides keyword classify. */
catalystHint?: CatalystType | null;
}
/** Raw row from news_articles (snake_case, as stored). */
export interface NewsArticleRow {
url_hash: string;
title_hash: string;
ticker_list: string; // JSON array stringified
headline: string;
body: string | null;
source: string;
catalyst: string | null;
url: string;
published_at: string;
created_at: string;
}
/** What one ingest run did — logged by pollers and bin/poll-news. */
export interface IngestStats {
fetched: number;
stored: number;
droppedNoUniverseTicker: number;
droppedNoise: number;
droppedDuplicate: number;
droppedCapped: number;
}