import fs from 'node:fs'; import process from 'node:process'; import { setTimeout as sleep } from 'node:timers/promises'; function readJsonFile(filePath) { try { const raw = fs.readFileSync(filePath, 'utf8'); return JSON.parse(raw); } catch { return undefined; } } function getIsoNow() { return new Date().toISOString(); } function clampInt(value, min, max, fallback) { const n = Number.parseInt(String(value ?? ''), 10); if (!Number.isInteger(n)) return fallback; return Math.min(max, Math.max(min, n)); } function envList(name, fallbackCsv) { const raw = process.env[name] ?? fallbackCsv; return String(raw) .split(',') .map((s) => s.trim()) .filter(Boolean); } function parseTimeframeToSeconds(tf) { const v = String(tf || '').trim().toLowerCase(); if (!v) return 60; const m = v.match(/^(\d+)(s|m|h|d)$/); if (!m) throw new Error('invalid_tf'); const num = Number.parseInt(m[1], 10); if (!Number.isInteger(num) || num <= 0) throw new Error('invalid_tf'); const unit = m[2]; const mult = unit === 's' ? 1 : unit === 'm' ? 60 : unit === 'h' ? 3600 : 86400; return num * mult; } function sqlLit(value) { const s = String(value ?? ''); return `'${s.replace(/'/g, "''")}'`; } function resolveHasuraBaseUrl(graphqlUrlOrBase) { const u = String(graphqlUrlOrBase || '').trim(); if (!u) return 'http://hasura:8080'; // common case: http://hasura:8080/v1/graphql if (u.endsWith('/v1/graphql')) return u.slice(0, -'/v1/graphql'.length); return u.replace(/\/$/, ''); } function resolveConfig() { const tokensPath = process.env.HASURA_TOKENS_FILE || process.env.TOKENS_FILE || process.env.HASURA_CONFIG_FILE || '/app/tokens/hasura.json'; const tokens = readJsonFile(tokensPath) || {}; const graphqlUrl = process.env.HASURA_GRAPHQL_URL || tokens.graphqlUrl || tokens.apiUrl || 'http://hasura:8080/v1/graphql'; const hasuraBaseUrl = resolveHasuraBaseUrl(graphqlUrl); const hasuraAdminSecret = process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || tokens.adminSecret || tokens.hasuraAdminSecret; if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET (required for /v2/query run_sql)'); const symbols = envList('CANDLES_SYMBOLS', 'SOL-PERP,PUMP-PERP'); // sources: include "any" (sourceKey='') by default. const rawSources = envList('CANDLES_SOURCES', 'any'); const sources = []; for (const s of rawSources) { const v = String(s).trim(); if (!v) continue; if (v.toLowerCase() === 'any') sources.push(''); else sources.push(v); } if (!sources.includes('')) sources.unshift(''); const tfList = envList('CANDLES_TFS', '1s,3s,5s,15s,30s,1m,3m,5m,15m,30m,1h,4h,12h,1d'); const bucketSecondsList = tfList .map((t) => { try { return parseTimeframeToSeconds(t); } catch { return null; } }) .filter((n) => n != null) .map((n) => n) .filter((n, idx, arr) => arr.findIndex((x) => x === n) === idx) .sort((a, b) => a - b); const targetPoints = clampInt(process.env.CANDLES_TARGET_POINTS, 10, 100_000, 1024); // legacy: kept for compatibility; if set, used as a minimum warmup window (days). const backfillDays = clampInt(process.env.CANDLES_BACKFILL_DAYS, 0, 3650, 0); const pollMs = clampInt(process.env.CANDLES_POLL_MS, 250, 60_000, 5000); return { hasuraBaseUrl, hasuraAdminSecret, symbols, sources, bucketSecondsList, targetPoints, backfillDays, pollMs }; } async function hasuraRunSql(cfg, sql, { readOnly } = { readOnly: false }) { const url = `${cfg.hasuraBaseUrl}/v2/query`; const body = { type: 'run_sql', args: { source: 'default', sql, read_only: Boolean(readOnly), }, }; const res = await fetch(url, { method: 'POST', headers: { 'content-type': 'application/json', 'x-hasura-admin-secret': cfg.hasuraAdminSecret }, body: JSON.stringify(body), signal: AbortSignal.timeout(60_000), }); const text = await res.text(); if (!res.ok) throw new Error(`Hasura run_sql HTTP ${res.status}: ${text}`); return JSON.parse(text); } function chunkSecondsForBucket(bucketSeconds) { if (bucketSeconds <= 5) return 15 * 60; if (bucketSeconds <= 60) return 60 * 60; if (bucketSeconds <= 300) return 6 * 60 * 60; if (bucketSeconds <= 3600) return 24 * 60 * 60; return 7 * 24 * 60 * 60; } function sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso, toIso }) { return ` WITH base AS ( SELECT time_bucket(make_interval(secs => ${bucketSeconds}), ts) AS bucket, ts, COALESCE(mark_price, oracle_price) AS px, oracle_price AS oracle_px FROM public.drift_ticks WHERE symbol = ${sqlLit(symbol)} AND ts >= ${sqlLit(fromIso)}::timestamptz AND ts < ${sqlLit(toIso)}::timestamptz AND (${sqlLit(sourceKey)} = '' OR source = ${sqlLit(sourceKey)}) ), agg AS ( SELECT bucket, (array_agg(px ORDER BY ts ASC))[1] AS open, max(px) AS high, min(px) AS low, (array_agg(px ORDER BY ts DESC))[1] AS close, (array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close, count(*)::bigint AS ticks FROM base GROUP BY bucket ) INSERT INTO public.drift_candles_cache (bucket, bucket_seconds, symbol, source, open, high, low, close, oracle_close, ticks, updated_at) SELECT bucket, ${bucketSeconds}, ${sqlLit(symbol)}, ${sqlLit(sourceKey)}, open, high, low, close, oracle_close, ticks, now() FROM agg ON CONFLICT (bucket, bucket_seconds, symbol, source) DO UPDATE SET open = EXCLUDED.open, high = EXCLUDED.high, low = EXCLUDED.low, close = EXCLUDED.close, oracle_close = EXCLUDED.oracle_close, ticks = EXCLUDED.ticks, updated_at = now(); `; } function sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds, cutoffIso }) { return ` DELETE FROM public.drift_candles_cache WHERE symbol = ${sqlLit(symbol)} AND source = ${sqlLit(sourceKey)} AND bucket_seconds = ${bucketSeconds} AND bucket < ${sqlLit(cutoffIso)}::timestamptz; `; } async function getTickRange(cfg, { symbol, sourceKey }) { const sql = ` SELECT min(ts) AS min_ts, max(ts) AS max_ts FROM public.drift_ticks WHERE symbol=${sqlLit(symbol)} AND (${sqlLit(sourceKey)} = '' OR source = ${sqlLit(sourceKey)}); `; const out = await hasuraRunSql(cfg, sql, { readOnly: true }); const row = Array.isArray(out?.result) && out.result.length >= 2 ? out.result[1] : null; if (!row) return { minTs: null, maxTs: null }; const minTs = row[0] ? String(row[0]).trim() : null; const maxTs = row[1] ? String(row[1]).trim() : null; return { minTs: minTs && minTs.length ? minTs : null, maxTs: maxTs && maxTs.length ? maxTs : null }; } function desiredFromIso({ minTsIso, maxTsIso, bucketSeconds, targetPoints, backfillDays }) { const endMs = Date.parse(maxTsIso); const minMs = minTsIso ? Date.parse(minTsIso) : null; const wantSpanMs = targetPoints * bucketSeconds * 1000; const wantFromMs = endMs - wantSpanMs; const minFromMs = backfillDays > 0 ? endMs - backfillDays * 24 * 60 * 60 * 1000 : null; let fromMs = wantFromMs; if (minFromMs != null) fromMs = Math.min(fromMs, minFromMs); if (minMs != null) fromMs = Math.max(fromMs, minMs); return new Date(Math.max(0, fromMs)).toISOString(); } function safetyWindowSeconds(bucketSeconds) { if (bucketSeconds <= 60) return 10 * 60; if (bucketSeconds <= 300) return 60 * 60; if (bucketSeconds <= 3600) return 6 * 60 * 60; if (bucketSeconds <= 14_400) return 24 * 60 * 60; return 2 * 24 * 60 * 60; } async function backfillBucket(cfg, { symbol, sourceKey, bucketSeconds, fromIso, toIso }) { const chunk = chunkSecondsForBucket(bucketSeconds); for (let t = Date.parse(fromIso); t < Date.parse(toIso); t += chunk * 1000) { const a = new Date(t).toISOString(); const b = new Date(Math.min(Date.parse(toIso), t + chunk * 1000)).toISOString(); await hasuraRunSql(cfg, sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso: a, toIso: b })); } } async function getMaxBucket(cfg, { symbol, sourceKey, bucketSeconds }) { const sql = ` SELECT max(bucket) AS max_bucket FROM public.drift_candles_cache WHERE symbol=${sqlLit(symbol)} AND source=${sqlLit(sourceKey)} AND bucket_seconds=${bucketSeconds}; `; const out = await hasuraRunSql(cfg, sql, { readOnly: true }); // Hasura returns {result_type, result:[[col...],[row...]]} const row = Array.isArray(out?.result) && out.result.length >= 2 ? out.result[1] : null; const v = row && row[0] ? String(row[0]) : null; return v && v.trim() ? v.trim() : null; } async function main() { const cfg = resolveConfig(); console.log( JSON.stringify( { service: 'candles-cache-worker', startedAt: getIsoNow(), hasuraBaseUrl: cfg.hasuraBaseUrl, symbols: cfg.symbols, sources: cfg.sources.map((s) => (s ? s : '(any)')), bucketSecondsList: cfg.bucketSecondsList, targetPoints: cfg.targetPoints, backfillDays: cfg.backfillDays, pollMs: cfg.pollMs, }, null, 2 ) ); // Backfill to warm cache: for each timeframe keep ~targetPoints candles (or "as much as we have"). for (const symbol of cfg.symbols) { for (const sourceKey of cfg.sources) { const range = await getTickRange(cfg, { symbol, sourceKey }); if (!range.maxTs) continue; const toIso = new Date(Date.parse(range.maxTs)).toISOString(); for (const bs of cfg.bucketSecondsList) { const fromIso = desiredFromIso({ minTsIso: range.minTs, maxTsIso: toIso, bucketSeconds: bs, targetPoints: cfg.targetPoints, backfillDays: cfg.backfillDays, }); console.log( `[candles-cache-worker] warmup symbol=${symbol} source=${sourceKey || '(any)'} bs=${bs}s from=${fromIso} to=${toIso} points=${cfg.targetPoints}` ); try { await backfillBucket(cfg, { symbol, sourceKey, bucketSeconds: bs, fromIso, toIso }); // Enforce max window for this bucket (derived data; safe to prune). await hasuraRunSql(cfg, sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds: bs, cutoffIso: fromIso })); } catch (err) { console.error( `[candles-cache-worker] warmup failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}` ); } } } } // Prime last buckets. const last = new Map(); // key -> iso bucket for (const symbol of cfg.symbols) { for (const sourceKey of cfg.sources) { for (const bs of cfg.bucketSecondsList) { const k = `${symbol}::${sourceKey}::${bs}`; try { const maxBucket = await getMaxBucket(cfg, { symbol, sourceKey, bucketSeconds: bs }); if (maxBucket) last.set(k, maxBucket); } catch { // ignore } } } } while (true) { const loopNow = Date.now(); const loopIso = new Date(loopNow).toISOString(); const pruneEveryMs = 60_000; const lastPruneAt = last.__pruneAt || new Map(); // stash on the Map to keep closure-local without introducing a new outer var last.__pruneAt = lastPruneAt; for (const symbol of cfg.symbols) { for (const sourceKey of cfg.sources) { for (const bs of cfg.bucketSecondsList) { const k = `${symbol}::${sourceKey}::${bs}`; const prev = last.get(k); const safety = safetyWindowSeconds(bs); const fromMs = prev ? Date.parse(prev) - safety * 1000 : loopNow - safety * 1000; const fromIso2 = new Date(Math.max(0, fromMs)).toISOString(); try { await hasuraRunSql( cfg, sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds: bs, fromIso: fromIso2, toIso: loopIso }) ); // best-effort: move last pointer close to now (actual max will lag by at most one bucket) last.set(k, loopIso); const prevPrune = lastPruneAt.get(k) || 0; if (loopNow - prevPrune >= pruneEveryMs) { const keepSeconds = Math.max(cfg.targetPoints * bs, (cfg.backfillDays > 0 ? cfg.backfillDays * 86400 : 0)); const cutoffIso = new Date(Math.max(0, loopNow - keepSeconds * 1000)).toISOString(); try { await hasuraRunSql(cfg, sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds: bs, cutoffIso })); } finally { lastPruneAt.set(k, loopNow); } } } catch (err) { console.error( `[candles-cache-worker] update failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}` ); } } } } await sleep(cfg.pollMs); } } main().catch((err) => { console.error(String(err?.stack || err)); process.exitCode = 1; });