Files
trade-deploy/kustomize/base/candles-cache-worker/worker.mjs

398 lines
14 KiB
JavaScript

import fs from 'node:fs';
import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';
function readJsonFile(filePath) {
try {
const raw = fs.readFileSync(filePath, 'utf8');
return JSON.parse(raw);
} catch {
return undefined;
}
}
function getIsoNow() {
return new Date().toISOString();
}
function clampInt(value, min, max, fallback) {
const n = Number.parseInt(String(value ?? ''), 10);
if (!Number.isInteger(n)) return fallback;
return Math.min(max, Math.max(min, n));
}
function envList(name, fallbackCsv) {
const raw = process.env[name] ?? fallbackCsv;
return String(raw)
.split(',')
.map((s) => s.trim())
.filter(Boolean);
}
function parseTimeframeToSeconds(tf) {
const v = String(tf || '').trim().toLowerCase();
if (!v) return 60;
const m = v.match(/^(\d+)(s|m|h|d)$/);
if (!m) throw new Error('invalid_tf');
const num = Number.parseInt(m[1], 10);
if (!Number.isInteger(num) || num <= 0) throw new Error('invalid_tf');
const unit = m[2];
const mult = unit === 's' ? 1 : unit === 'm' ? 60 : unit === 'h' ? 3600 : 86400;
return num * mult;
}
function sqlLit(value) {
const s = String(value ?? '');
return `'${s.replace(/'/g, "''")}'`;
}
function resolveHasuraBaseUrl(graphqlUrlOrBase) {
const u = String(graphqlUrlOrBase || '').trim();
if (!u) return 'http://hasura:8080';
// common case: http://hasura:8080/v1/graphql
if (u.endsWith('/v1/graphql')) return u.slice(0, -'/v1/graphql'.length);
return u.replace(/\/$/, '');
}
function resolveConfig() {
const tokensPath =
process.env.HASURA_TOKENS_FILE ||
process.env.TOKENS_FILE ||
process.env.HASURA_CONFIG_FILE ||
'/app/tokens/hasura.json';
const tokens = readJsonFile(tokensPath) || {};
const graphqlUrl =
process.env.HASURA_GRAPHQL_URL ||
tokens.graphqlUrl ||
tokens.apiUrl ||
'http://hasura:8080/v1/graphql';
const hasuraBaseUrl = resolveHasuraBaseUrl(graphqlUrl);
const hasuraAdminSecret =
process.env.HASURA_ADMIN_SECRET ||
process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
tokens.adminSecret ||
tokens.hasuraAdminSecret;
if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET (required for /v2/query run_sql)');
const symbols = envList('CANDLES_SYMBOLS', 'SOL-PERP,PUMP-PERP');
// sources: include "any" (sourceKey='') by default.
const rawSources = envList('CANDLES_SOURCES', 'any');
const sources = [];
for (const s of rawSources) {
const v = String(s).trim();
if (!v) continue;
if (v.toLowerCase() === 'any') sources.push('');
else sources.push(v);
}
if (!sources.includes('')) sources.unshift('');
const tfList = envList('CANDLES_TFS', '1s,3s,5s,15s,30s,1m,3m,5m,15m,30m,1h,4h,12h,1d');
const bucketSecondsList = tfList
.map((t) => {
try {
return parseTimeframeToSeconds(t);
} catch {
return null;
}
})
.filter((n) => n != null)
.map((n) => n)
.filter((n, idx, arr) => arr.findIndex((x) => x === n) === idx)
.sort((a, b) => a - b);
const targetPoints = clampInt(process.env.CANDLES_TARGET_POINTS, 10, 100_000, 1024);
// legacy: kept for compatibility; if set, used as a minimum warmup window (days).
const backfillDays = clampInt(process.env.CANDLES_BACKFILL_DAYS, 0, 3650, 0);
const pollMs = clampInt(process.env.CANDLES_POLL_MS, 250, 60_000, 5000);
return { hasuraBaseUrl, hasuraAdminSecret, symbols, sources, bucketSecondsList, targetPoints, backfillDays, pollMs };
}
async function hasuraRunSql(cfg, sql, { readOnly } = { readOnly: false }) {
const url = `${cfg.hasuraBaseUrl}/v2/query`;
const body = {
type: 'run_sql',
args: {
source: 'default',
sql,
read_only: Boolean(readOnly),
},
};
const res = await fetch(url, {
method: 'POST',
headers: { 'content-type': 'application/json', 'x-hasura-admin-secret': cfg.hasuraAdminSecret },
body: JSON.stringify(body),
signal: AbortSignal.timeout(60_000),
});
const text = await res.text();
if (!res.ok) throw new Error(`Hasura run_sql HTTP ${res.status}: ${text}`);
return JSON.parse(text);
}
function chunkSecondsForBucket(bucketSeconds) {
if (bucketSeconds <= 5) return 15 * 60;
if (bucketSeconds <= 60) return 60 * 60;
if (bucketSeconds <= 300) return 6 * 60 * 60;
if (bucketSeconds <= 3600) return 24 * 60 * 60;
return 7 * 24 * 60 * 60;
}
function sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso, toIso }) {
return `
WITH chosen AS (
SELECT source AS chosen_source
FROM public.drift_ticks
WHERE symbol = ${sqlLit(symbol)}
ORDER BY ts DESC
LIMIT 1
),
base AS (
SELECT
time_bucket(make_interval(secs => ${bucketSeconds}), ts) AS bucket,
ts,
COALESCE(mark_price, oracle_price) AS px,
oracle_price AS oracle_px
FROM public.drift_ticks
WHERE symbol = ${sqlLit(symbol)}
AND ts >= ${sqlLit(fromIso)}::timestamptz
AND ts < ${sqlLit(toIso)}::timestamptz
AND (
${sqlLit(sourceKey)} <> '' AND source = ${sqlLit(sourceKey)}
OR ${sqlLit(sourceKey)} = '' AND source = COALESCE((SELECT chosen_source FROM chosen), source)
)
),
agg AS (
SELECT
bucket,
(array_agg(px ORDER BY ts ASC))[1] AS open,
max(px) AS high,
min(px) AS low,
(array_agg(px ORDER BY ts DESC))[1] AS close,
(array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close,
count(*)::bigint AS ticks
FROM base
GROUP BY bucket
)
INSERT INTO public.drift_candles_cache
(bucket, bucket_seconds, symbol, source, open, high, low, close, oracle_close, ticks, updated_at)
SELECT
bucket, ${bucketSeconds}, ${sqlLit(symbol)}, ${sqlLit(sourceKey)}, open, high, low, close, oracle_close, ticks, now()
FROM agg
ON CONFLICT (bucket, bucket_seconds, symbol, source) DO UPDATE SET
open = EXCLUDED.open,
high = EXCLUDED.high,
low = EXCLUDED.low,
close = EXCLUDED.close,
oracle_close = EXCLUDED.oracle_close,
ticks = EXCLUDED.ticks,
updated_at = now();
`;
}
function sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds, cutoffIso }) {
return `
DELETE FROM public.drift_candles_cache
WHERE symbol = ${sqlLit(symbol)}
AND source = ${sqlLit(sourceKey)}
AND bucket_seconds = ${bucketSeconds}
AND bucket < ${sqlLit(cutoffIso)}::timestamptz;
`;
}
async function getTickRange(cfg, { symbol, sourceKey }) {
const sql =
String(sourceKey) === ''
? `
WITH chosen AS (
SELECT source AS chosen_source
FROM public.drift_ticks
WHERE symbol=${sqlLit(symbol)}
ORDER BY ts DESC
LIMIT 1
)
SELECT min(ts) AS min_ts, max(ts) AS max_ts
FROM public.drift_ticks
WHERE symbol=${sqlLit(symbol)}
AND source = COALESCE((SELECT chosen_source FROM chosen), source);
`
: `
SELECT min(ts) AS min_ts, max(ts) AS max_ts
FROM public.drift_ticks
WHERE symbol=${sqlLit(symbol)}
AND source = ${sqlLit(sourceKey)};
`;
const out = await hasuraRunSql(cfg, sql, { readOnly: true });
const row = Array.isArray(out?.result) && out.result.length >= 2 ? out.result[1] : null;
if (!row) return { minTs: null, maxTs: null };
const minTs = row[0] ? String(row[0]).trim() : null;
const maxTs = row[1] ? String(row[1]).trim() : null;
return { minTs: minTs && minTs.length ? minTs : null, maxTs: maxTs && maxTs.length ? maxTs : null };
}
function desiredFromIso({ minTsIso, maxTsIso, bucketSeconds, targetPoints, backfillDays }) {
const endMs = Date.parse(maxTsIso);
const minMs = minTsIso ? Date.parse(minTsIso) : null;
const wantSpanMs = targetPoints * bucketSeconds * 1000;
const wantFromMs = endMs - wantSpanMs;
const minFromMs = backfillDays > 0 ? endMs - backfillDays * 24 * 60 * 60 * 1000 : null;
let fromMs = wantFromMs;
if (minFromMs != null) fromMs = Math.min(fromMs, minFromMs);
if (minMs != null) fromMs = Math.max(fromMs, minMs);
return new Date(Math.max(0, fromMs)).toISOString();
}
function safetyWindowSeconds(bucketSeconds) {
if (bucketSeconds <= 60) return 10 * 60;
if (bucketSeconds <= 300) return 60 * 60;
if (bucketSeconds <= 3600) return 6 * 60 * 60;
if (bucketSeconds <= 14_400) return 24 * 60 * 60;
return 2 * 24 * 60 * 60;
}
async function backfillBucket(cfg, { symbol, sourceKey, bucketSeconds, fromIso, toIso }) {
const chunk = chunkSecondsForBucket(bucketSeconds);
for (let t = Date.parse(fromIso); t < Date.parse(toIso); t += chunk * 1000) {
const a = new Date(t).toISOString();
const b = new Date(Math.min(Date.parse(toIso), t + chunk * 1000)).toISOString();
await hasuraRunSql(cfg, sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso: a, toIso: b }));
}
}
async function getMaxBucket(cfg, { symbol, sourceKey, bucketSeconds }) {
const sql = `
SELECT max(bucket) AS max_bucket
FROM public.drift_candles_cache
WHERE symbol=${sqlLit(symbol)} AND source=${sqlLit(sourceKey)} AND bucket_seconds=${bucketSeconds};
`;
const out = await hasuraRunSql(cfg, sql, { readOnly: true });
// Hasura returns {result_type, result:[[col...],[row...]]}
const row = Array.isArray(out?.result) && out.result.length >= 2 ? out.result[1] : null;
const v = row && row[0] ? String(row[0]) : null;
return v && v.trim() ? v.trim() : null;
}
async function main() {
const cfg = resolveConfig();
console.log(
JSON.stringify(
{
service: 'candles-cache-worker',
startedAt: getIsoNow(),
hasuraBaseUrl: cfg.hasuraBaseUrl,
symbols: cfg.symbols,
sources: cfg.sources.map((s) => (s ? s : '(any)')),
bucketSecondsList: cfg.bucketSecondsList,
targetPoints: cfg.targetPoints,
backfillDays: cfg.backfillDays,
pollMs: cfg.pollMs,
},
null,
2
)
);
// Backfill to warm cache: for each timeframe keep ~targetPoints candles (or "as much as we have").
for (const symbol of cfg.symbols) {
for (const sourceKey of cfg.sources) {
const range = await getTickRange(cfg, { symbol, sourceKey });
if (!range.maxTs) continue;
const toIso = new Date(Date.parse(range.maxTs)).toISOString();
for (const bs of cfg.bucketSecondsList) {
const fromIso = desiredFromIso({
minTsIso: range.minTs,
maxTsIso: toIso,
bucketSeconds: bs,
targetPoints: cfg.targetPoints,
backfillDays: cfg.backfillDays,
});
console.log(
`[candles-cache-worker] warmup symbol=${symbol} source=${sourceKey || '(any)'} bs=${bs}s from=${fromIso} to=${toIso} points=${cfg.targetPoints}`
);
try {
await backfillBucket(cfg, { symbol, sourceKey, bucketSeconds: bs, fromIso, toIso });
// Enforce max window for this bucket (derived data; safe to prune).
await hasuraRunSql(cfg, sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds: bs, cutoffIso: fromIso }));
} catch (err) {
console.error(
`[candles-cache-worker] warmup failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}`
);
}
}
}
}
// Prime last buckets.
const last = new Map(); // key -> iso bucket
for (const symbol of cfg.symbols) {
for (const sourceKey of cfg.sources) {
for (const bs of cfg.bucketSecondsList) {
const k = `${symbol}::${sourceKey}::${bs}`;
try {
const maxBucket = await getMaxBucket(cfg, { symbol, sourceKey, bucketSeconds: bs });
if (maxBucket) last.set(k, maxBucket);
} catch {
// ignore
}
}
}
}
while (true) {
const loopNow = Date.now();
const loopIso = new Date(loopNow).toISOString();
const pruneEveryMs = 60_000;
const lastPruneAt = last.__pruneAt || new Map();
// stash on the Map to keep closure-local without introducing a new outer var
last.__pruneAt = lastPruneAt;
for (const symbol of cfg.symbols) {
for (const sourceKey of cfg.sources) {
for (const bs of cfg.bucketSecondsList) {
const k = `${symbol}::${sourceKey}::${bs}`;
const prev = last.get(k);
const safety = safetyWindowSeconds(bs);
const fromMs = prev ? Date.parse(prev) - safety * 1000 : loopNow - safety * 1000;
const fromIso2 = new Date(Math.max(0, fromMs)).toISOString();
try {
await hasuraRunSql(
cfg,
sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds: bs, fromIso: fromIso2, toIso: loopIso })
);
// best-effort: move last pointer close to now (actual max will lag by at most one bucket)
last.set(k, loopIso);
const prevPrune = lastPruneAt.get(k) || 0;
if (loopNow - prevPrune >= pruneEveryMs) {
const keepSeconds = Math.max(cfg.targetPoints * bs, (cfg.backfillDays > 0 ? cfg.backfillDays * 86400 : 0));
const cutoffIso = new Date(Math.max(0, loopNow - keepSeconds * 1000)).toISOString();
try {
await hasuraRunSql(cfg, sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds: bs, cutoffIso }));
} finally {
lastPruneAt.set(k, loopNow);
}
}
} catch (err) {
console.error(
`[candles-cache-worker] update failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}`
);
}
}
}
}
await sleep(cfg.pollMs);
}
}
main().catch((err) => {
console.error(String(err?.stack || err));
process.exitCode = 1;
});