398 lines
14 KiB
JavaScript
398 lines
14 KiB
JavaScript
import fs from 'node:fs';
|
|
import process from 'node:process';
|
|
import { setTimeout as sleep } from 'node:timers/promises';
|
|
|
|
/**
 * Read and parse a JSON file, returning `undefined` on any failure
 * (missing file, unreadable path, or malformed JSON).
 * @param {string} filePath - Path to the JSON file.
 * @returns {*} Parsed JSON value, or `undefined` when the file cannot be used.
 */
function readJsonFile(filePath) {
  let contents;
  try {
    contents = fs.readFileSync(filePath, 'utf8');
  } catch {
    return undefined;
  }
  try {
    return JSON.parse(contents);
  } catch {
    return undefined;
  }
}
|
|
|
|
/** @returns {string} The current wall-clock time as an ISO-8601 UTC string. */
function getIsoNow() {
  const now = new Date();
  return now.toISOString();
}
|
|
|
|
/**
 * Parse `value` as a base-10 integer and clamp it into [min, max].
 * @param {*} value - Raw value (typically an env-var string); nullish becomes ''.
 * @param {number} min - Inclusive lower bound.
 * @param {number} max - Inclusive upper bound.
 * @param {number} fallback - Returned when `value` does not parse to an integer.
 * @returns {number} The clamped integer, or `fallback`.
 */
function clampInt(value, min, max, fallback) {
  const parsed = Number.parseInt(String(value ?? ''), 10);
  if (Number.isNaN(parsed)) return fallback;
  if (parsed < min) return min;
  if (parsed > max) return max;
  return parsed;
}
|
|
|
|
/**
 * Read a comma-separated list from the environment, falling back to a
 * default CSV when the variable is unset. Entries are trimmed and empties
 * are dropped.
 * @param {string} name - Environment variable name.
 * @param {string} fallbackCsv - CSV used when `process.env[name]` is nullish.
 * @returns {string[]} Non-empty, trimmed entries in original order.
 */
function envList(name, fallbackCsv) {
  const csv = process.env[name] ?? fallbackCsv;
  const entries = [];
  for (const piece of String(csv).split(',')) {
    const trimmed = piece.trim();
    if (trimmed) entries.push(trimmed);
  }
  return entries;
}
|
|
|
|
/**
 * Convert a timeframe token like '15s', '5m', '4h', '1d' into seconds.
 * An empty/nullish input defaults to 60 (one minute).
 * @param {string} tf - Timeframe token; case-insensitive, surrounding
 *   whitespace ignored.
 * @returns {number} Timeframe length in seconds.
 * @throws {Error} 'invalid_tf' when the token is malformed or non-positive.
 */
function parseTimeframeToSeconds(tf) {
  const token = String(tf || '').trim().toLowerCase();
  if (!token) return 60;

  const match = /^(\d+)(s|m|h|d)$/.exec(token);
  if (!match) throw new Error('invalid_tf');

  const count = Number.parseInt(match[1], 10);
  if (!Number.isInteger(count) || count <= 0) throw new Error('invalid_tf');

  const secondsPerUnit = { s: 1, m: 60, h: 3600, d: 86400 };
  return count * secondsPerUnit[match[2]];
}
|
|
|
|
/**
 * Render a value as a single-quoted SQL string literal, doubling embedded
 * single quotes. Nullish values become the empty literal `''`.
 * @param {*} value - Value to embed; stringified first.
 * @returns {string} A quoted, quote-escaped SQL literal.
 */
function sqlLit(value) {
  const text = value == null ? '' : String(value);
  const escaped = text.replaceAll("'", "''");
  return `'${escaped}'`;
}
|
|
|
|
/**
 * Derive the Hasura base URL from either a base URL or a full GraphQL
 * endpoint URL. Empty input falls back to the in-cluster default.
 * @param {string} graphqlUrlOrBase - e.g. 'http://hasura:8080/v1/graphql'
 *   or 'http://hasura:8080/'.
 * @returns {string} Base URL without a trailing slash or '/v1/graphql' suffix.
 */
function resolveHasuraBaseUrl(graphqlUrlOrBase) {
  const trimmed = String(graphqlUrlOrBase || '').trim();
  if (trimmed === '') return 'http://hasura:8080';

  // common case: http://hasura:8080/v1/graphql
  const graphqlSuffix = '/v1/graphql';
  if (trimmed.endsWith(graphqlSuffix)) {
    return trimmed.slice(0, trimmed.length - graphqlSuffix.length);
  }

  return trimmed.endsWith('/') ? trimmed.slice(0, -1) : trimmed;
}
|
|
|
|
/**
 * Assemble worker configuration from environment variables with fallbacks
 * to a tokens JSON file and hard-coded defaults.
 *
 * Precedence: env var > tokens file > default. Throws early when no Hasura
 * admin secret can be found, since /v2/query run_sql requires it.
 *
 * @returns {{
 *   hasuraBaseUrl: string,
 *   hasuraAdminSecret: string,
 *   symbols: string[],
 *   sources: string[],         // '' entry means "any source"
 *   bucketSecondsList: number[], // unique, ascending
 *   targetPoints: number,
 *   backfillDays: number,
 *   pollMs: number,
 * }}
 * @throws {Error} When no admin secret is configured.
 */
function resolveConfig() {
  const tokensPath =
    process.env.HASURA_TOKENS_FILE ||
    process.env.TOKENS_FILE ||
    process.env.HASURA_CONFIG_FILE ||
    '/app/tokens/hasura.json';
  const tokens = readJsonFile(tokensPath) || {};

  const graphqlUrl =
    process.env.HASURA_GRAPHQL_URL ||
    tokens.graphqlUrl ||
    tokens.apiUrl ||
    'http://hasura:8080/v1/graphql';
  const hasuraBaseUrl = resolveHasuraBaseUrl(graphqlUrl);

  const hasuraAdminSecret =
    process.env.HASURA_ADMIN_SECRET ||
    process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
    tokens.adminSecret ||
    tokens.hasuraAdminSecret;

  if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET (required for /v2/query run_sql)');

  const symbols = envList('CANDLES_SYMBOLS', 'SOL-PERP,PUMP-PERP');

  // sources: include "any" (sourceKey='') by default. envList already trims
  // and drops empty entries, so each `s` is a non-empty trimmed token.
  const sources = [];
  for (const s of envList('CANDLES_SOURCES', 'any')) {
    if (s.toLowerCase() === 'any') sources.push('');
    else sources.push(s);
  }
  // Guarantee the "any" pseudo-source is always present (and first when added).
  if (!sources.includes('')) sources.unshift('');

  // Parse timeframes, silently skipping malformed tokens, then dedupe and
  // sort ascending so smaller buckets are processed first.
  const tfList = envList('CANDLES_TFS', '1s,3s,5s,15s,30s,1m,3m,5m,15m,30m,1h,4h,12h,1d');
  const parsedSeconds = [];
  for (const tf of tfList) {
    try {
      parsedSeconds.push(parseTimeframeToSeconds(tf));
    } catch {
      // ignore invalid timeframe tokens
    }
  }
  const bucketSecondsList = [...new Set(parsedSeconds)].sort((a, b) => a - b);

  const targetPoints = clampInt(process.env.CANDLES_TARGET_POINTS, 10, 100_000, 1024);
  // legacy: kept for compatibility; if set, used as a minimum warmup window (days).
  const backfillDays = clampInt(process.env.CANDLES_BACKFILL_DAYS, 0, 3650, 0);
  const pollMs = clampInt(process.env.CANDLES_POLL_MS, 250, 60_000, 5000);

  return { hasuraBaseUrl, hasuraAdminSecret, symbols, sources, bucketSecondsList, targetPoints, backfillDays, pollMs };
}
|
|
|
|
/**
 * Execute raw SQL through Hasura's /v2/query run_sql endpoint.
 *
 * @param {{hasuraBaseUrl: string, hasuraAdminSecret: string}} cfg - Worker config.
 * @param {string} sql - SQL text to run against the 'default' source.
 * @param {{readOnly?: boolean}} [options] - Set readOnly for SELECT-only queries.
 * @returns {Promise<object>} Parsed Hasura response (shape: {result_type, result}).
 * @throws {Error} On non-2xx HTTP status (message includes status and body),
 *   or on timeout after 60s (AbortSignal).
 */
async function hasuraRunSql(cfg, sql, { readOnly } = { readOnly: false }) {
  const endpoint = `${cfg.hasuraBaseUrl}/v2/query`;
  const payload = JSON.stringify({
    type: 'run_sql',
    args: {
      source: 'default',
      sql,
      read_only: Boolean(readOnly),
    },
  });

  const response = await fetch(endpoint, {
    method: 'POST',
    headers: { 'content-type': 'application/json', 'x-hasura-admin-secret': cfg.hasuraAdminSecret },
    body: payload,
    signal: AbortSignal.timeout(60_000),
  });

  // Read the body as text first so HTTP errors can include the server's message.
  const text = await response.text();
  if (!response.ok) throw new Error(`Hasura run_sql HTTP ${response.status}: ${text}`);
  return JSON.parse(text);
}
|
|
|
|
/**
 * Choose the backfill chunk size (in seconds) for a given candle width:
 * narrower candles get smaller chunks so each run_sql stays bounded.
 * @param {number} bucketSeconds - Candle width in seconds.
 * @returns {number} Chunk length in seconds.
 */
function chunkSecondsForBucket(bucketSeconds) {
  // [max bucket width, chunk seconds] tiers, checked in ascending order.
  const tiers = [
    [5, 15 * 60],
    [60, 60 * 60],
    [300, 6 * 60 * 60],
    [3600, 24 * 60 * 60],
  ];
  for (const [maxBucket, chunkSeconds] of tiers) {
    if (bucketSeconds <= maxBucket) return chunkSeconds;
  }
  return 7 * 24 * 60 * 60;
}
|
|
|
|
/**
 * Build the run_sql statement that aggregates raw ticks into OHLC candles and
 * upserts them into public.drift_candles_cache for one (symbol, source,
 * bucket width) over the half-open window [fromIso, toIso).
 *
 * @param {object} p
 * @param {string} p.symbol - Market symbol (e.g. 'SOL-PERP').
 * @param {string} p.sourceKey - Tick source filter; '' means "any": the source
 *   of the symbol's most recent tick (the `chosen` CTE) is used instead.
 * @param {number} p.bucketSeconds - Candle width in seconds; interpolated
 *   un-quoted, so callers must pass a validated integer (they do — values come
 *   from parseTimeframeToSeconds).
 * @param {string} p.fromIso - Inclusive lower bound, ISO timestamp.
 * @param {string} p.toIso - Exclusive upper bound, ISO timestamp.
 * @returns {string} A single INSERT ... ON CONFLICT DO UPDATE statement;
 *   string values are embedded via sqlLit() escaping.
 */
function sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso, toIso }) {
  // The source predicate relies on SQL operator precedence (AND binds tighter
  // than OR): (key <> '' AND source = key) OR (key = '' AND source = chosen).
  // When `chosen` is empty (no ticks), COALESCE falls back to `source`, which
  // makes the second arm match every source.
  return `
  WITH chosen AS (
    SELECT source AS chosen_source
    FROM public.drift_ticks
    WHERE symbol = ${sqlLit(symbol)}
    ORDER BY ts DESC
    LIMIT 1
  ),
  base AS (
    SELECT
      time_bucket(make_interval(secs => ${bucketSeconds}), ts) AS bucket,
      ts,
      COALESCE(mark_price, oracle_price) AS px,
      oracle_price AS oracle_px
    FROM public.drift_ticks
    WHERE symbol = ${sqlLit(symbol)}
      AND ts >= ${sqlLit(fromIso)}::timestamptz
      AND ts < ${sqlLit(toIso)}::timestamptz
      AND (
        ${sqlLit(sourceKey)} <> '' AND source = ${sqlLit(sourceKey)}
        OR ${sqlLit(sourceKey)} = '' AND source = COALESCE((SELECT chosen_source FROM chosen), source)
      )
  ),
  agg AS (
    SELECT
      bucket,
      (array_agg(px ORDER BY ts ASC))[1] AS open,
      max(px) AS high,
      min(px) AS low,
      (array_agg(px ORDER BY ts DESC))[1] AS close,
      (array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close,
      count(*)::bigint AS ticks
    FROM base
    GROUP BY bucket
  )
  INSERT INTO public.drift_candles_cache
    (bucket, bucket_seconds, symbol, source, open, high, low, close, oracle_close, ticks, updated_at)
  SELECT
    bucket, ${bucketSeconds}, ${sqlLit(symbol)}, ${sqlLit(sourceKey)}, open, high, low, close, oracle_close, ticks, now()
  FROM agg
  ON CONFLICT (bucket, bucket_seconds, symbol, source) DO UPDATE SET
    open = EXCLUDED.open,
    high = EXCLUDED.high,
    low = EXCLUDED.low,
    close = EXCLUDED.close,
    oracle_close = EXCLUDED.oracle_close,
    ticks = EXCLUDED.ticks,
    updated_at = now();
  `;
}
|
|
|
|
/**
 * Build the SQL that prunes cached candles older than a cutoff for one
 * (symbol, source, bucket width). Candles are derived data, so deletion is safe.
 * @param {{symbol: string, sourceKey: string, bucketSeconds: number, cutoffIso: string}} params
 * @returns {string} A DELETE statement with sqlLit()-escaped values.
 */
function sqlDeleteOlderCandles(params) {
  const { symbol, sourceKey, bucketSeconds, cutoffIso } = params;
  const symbolLit = sqlLit(symbol);
  const sourceLit = sqlLit(sourceKey);
  const cutoffLit = sqlLit(cutoffIso);
  return `
  DELETE FROM public.drift_candles_cache
  WHERE symbol = ${symbolLit}
    AND source = ${sourceLit}
    AND bucket_seconds = ${bucketSeconds}
    AND bucket < ${cutoffLit}::timestamptz;
  `;
}
|
|
|
|
/**
 * Fetch the [min(ts), max(ts)] tick range for a symbol, restricted to one
 * source — or, when sourceKey is '', to the source of the symbol's most
 * recent tick ("any" mode).
 *
 * @param {object} cfg - Worker config (passed through to hasuraRunSql).
 * @param {{symbol: string, sourceKey: string}} p - '' sourceKey means "any".
 * @returns {Promise<{minTs: string|null, maxTs: string|null}>} ISO-ish
 *   timestamp strings as returned by Postgres, or nulls when no ticks exist.
 */
async function getTickRange(cfg, { symbol, sourceKey }) {
  const sql =
    String(sourceKey) === ''
      ? `
  WITH chosen AS (
    SELECT source AS chosen_source
    FROM public.drift_ticks
    WHERE symbol=${sqlLit(symbol)}
    ORDER BY ts DESC
    LIMIT 1
  )
  SELECT min(ts) AS min_ts, max(ts) AS max_ts
  FROM public.drift_ticks
  WHERE symbol=${sqlLit(symbol)}
    AND source = COALESCE((SELECT chosen_source FROM chosen), source);
  `
      : `
  SELECT min(ts) AS min_ts, max(ts) AS max_ts
  FROM public.drift_ticks
  WHERE symbol=${sqlLit(symbol)}
    AND source = ${sqlLit(sourceKey)};
  `;
  const out = await hasuraRunSql(cfg, sql, { readOnly: true });
  // run_sql result shape: { result_type, result: [[header...], [row...]] };
  // result[0] is the header row, result[1] the single data row.
  const row = Array.isArray(out?.result) && out.result.length >= 2 ? out.result[1] : null;
  if (!row) return { minTs: null, maxTs: null };
  const minTs = row[0] ? String(row[0]).trim() : null;
  const maxTs = row[1] ? String(row[1]).trim() : null;
  // Normalize empty strings to null so callers can simply truthy-check.
  return { minTs: minTs && minTs.length ? minTs : null, maxTs: maxTs && maxTs.length ? maxTs : null };
}
|
|
|
|
/**
 * Compute the warmup window start for one bucket width: wide enough for
 * ~targetPoints candles, widened further by the legacy backfillDays floor,
 * but never earlier than the first available tick.
 *
 * @param {object} p
 * @param {string|null} p.minTsIso - Earliest tick timestamp, or null/''.
 * @param {string} p.maxTsIso - Latest tick timestamp (window end).
 * @param {number} p.bucketSeconds - Candle width in seconds.
 * @param {number} p.targetPoints - Desired number of candles.
 * @param {number} p.backfillDays - Minimum window in days when > 0.
 * @returns {string} Window start as an ISO timestamp (clamped at epoch 0).
 */
function desiredFromIso({ minTsIso, maxTsIso, bucketSeconds, targetPoints, backfillDays }) {
  const endMs = Date.parse(maxTsIso);

  // Start with a span covering ~targetPoints candles.
  let startMs = endMs - targetPoints * bucketSeconds * 1000;

  // Legacy warmup floor: widen to at least backfillDays when configured.
  if (backfillDays > 0) {
    startMs = Math.min(startMs, endMs - backfillDays * 24 * 60 * 60 * 1000);
  }

  // Never ask for data older than the earliest tick we actually have.
  if (minTsIso) {
    startMs = Math.max(startMs, Date.parse(minTsIso));
  }

  return new Date(Math.max(0, startMs)).toISOString();
}
|
|
|
|
/**
 * Size of the look-back overlap (seconds) re-aggregated on every poll, so
 * late-arriving ticks still update recently-written candles. Wider candles
 * get longer safety windows.
 * @param {number} bucketSeconds - Candle width in seconds.
 * @returns {number} Safety window length in seconds.
 */
function safetyWindowSeconds(bucketSeconds) {
  if (bucketSeconds > 14_400) return 2 * 24 * 60 * 60;
  if (bucketSeconds > 3600) return 24 * 60 * 60;
  if (bucketSeconds > 300) return 6 * 60 * 60;
  if (bucketSeconds > 60) return 60 * 60;
  return 10 * 60;
}
|
|
|
|
/**
 * Materialize candles for one (symbol, source, bucket width) over
 * [fromIso, toIso), issuing one upsert per fixed-size time chunk so each
 * run_sql call stays bounded.
 *
 * Fix: Date.parse(toIso) and the chunk size were re-evaluated on every loop
 * iteration (toIso was parsed twice per pass); both are loop-invariant and
 * are now hoisted.
 *
 * @param {object} cfg - Worker config (passed through to hasuraRunSql).
 * @param {{symbol: string, sourceKey: string, bucketSeconds: number, fromIso: string, toIso: string}} p
 * @returns {Promise<void>}
 */
async function backfillBucket(cfg, { symbol, sourceKey, bucketSeconds, fromIso, toIso }) {
  const chunkMs = chunkSecondsForBucket(bucketSeconds) * 1000;
  const endMs = Date.parse(toIso);
  for (let t = Date.parse(fromIso); t < endMs; t += chunkMs) {
    const a = new Date(t).toISOString();
    const b = new Date(Math.min(endMs, t + chunkMs)).toISOString();
    // Sequential on purpose: avoids hammering Hasura with parallel run_sql.
    await hasuraRunSql(cfg, sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso: a, toIso: b }));
  }
}
|
|
|
|
/**
 * Read the newest cached candle bucket for one (symbol, source, bucket width).
 * @param {object} cfg - Worker config (passed through to hasuraRunSql).
 * @param {{symbol: string, sourceKey: string, bucketSeconds: number}} p
 * @returns {Promise<string|null>} The max bucket timestamp string, or null
 *   when the cache has no rows for this key.
 */
async function getMaxBucket(cfg, { symbol, sourceKey, bucketSeconds }) {
  const sql = `
  SELECT max(bucket) AS max_bucket
  FROM public.drift_candles_cache
  WHERE symbol=${sqlLit(symbol)} AND source=${sqlLit(sourceKey)} AND bucket_seconds=${bucketSeconds};
  `;
  const out = await hasuraRunSql(cfg, sql, { readOnly: true });
  // Hasura returns {result_type, result:[[col...],[row...]]}
  const rows = out?.result;
  if (!Array.isArray(rows) || rows.length < 2) return null;
  const cell = rows[1][0];
  if (!cell) return null;
  const trimmed = String(cell).trim();
  return trimmed ? trimmed : null;
}
|
|
|
|
/**
 * Worker entry point. Three phases:
 *  1. Warmup backfill: for every (symbol, source, bucket width), aggregate
 *     enough history for ~targetPoints candles, then prune older rows.
 *  2. Prime the `last` map with each key's newest cached bucket.
 *  3. Poll forever: re-aggregate a trailing safety window every pollMs and
 *     prune each key at most once per minute.
 * All Hasura calls are sequential by design (one in-flight run_sql at a time).
 * Never returns under normal operation (infinite poll loop).
 */
async function main() {
  const cfg = resolveConfig();

  // Startup banner: log the effective config for operability.
  console.log(
    JSON.stringify(
      {
        service: 'candles-cache-worker',
        startedAt: getIsoNow(),
        hasuraBaseUrl: cfg.hasuraBaseUrl,
        symbols: cfg.symbols,
        sources: cfg.sources.map((s) => (s ? s : '(any)')),
        bucketSecondsList: cfg.bucketSecondsList,
        targetPoints: cfg.targetPoints,
        backfillDays: cfg.backfillDays,
        pollMs: cfg.pollMs,
      },
      null,
      2
    )
  );

  // Backfill to warm cache: for each timeframe keep ~targetPoints candles (or "as much as we have").
  for (const symbol of cfg.symbols) {
    for (const sourceKey of cfg.sources) {
      const range = await getTickRange(cfg, { symbol, sourceKey });
      // No ticks at all for this (symbol, source): nothing to warm up.
      if (!range.maxTs) continue;
      // Re-serialize through Date.parse to normalize Postgres timestamp text to ISO.
      const toIso = new Date(Date.parse(range.maxTs)).toISOString();
      for (const bs of cfg.bucketSecondsList) {
        const fromIso = desiredFromIso({
          minTsIso: range.minTs,
          maxTsIso: toIso,
          bucketSeconds: bs,
          targetPoints: cfg.targetPoints,
          backfillDays: cfg.backfillDays,
        });

        console.log(
          `[candles-cache-worker] warmup symbol=${symbol} source=${sourceKey || '(any)'} bs=${bs}s from=${fromIso} to=${toIso} points=${cfg.targetPoints}`
        );
        try {
          await backfillBucket(cfg, { symbol, sourceKey, bucketSeconds: bs, fromIso, toIso });
          // Enforce max window for this bucket (derived data; safe to prune).
          await hasuraRunSql(cfg, sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds: bs, cutoffIso: fromIso }));
        } catch (err) {
          // Warmup is best-effort: log and continue with the next timeframe.
          console.error(
            `[candles-cache-worker] warmup failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}`
          );
        }
      }
    }
  }

  // Prime last buckets.
  const last = new Map(); // key -> iso bucket
  for (const symbol of cfg.symbols) {
    for (const sourceKey of cfg.sources) {
      for (const bs of cfg.bucketSecondsList) {
        const k = `${symbol}::${sourceKey}::${bs}`;
        try {
          const maxBucket = await getMaxBucket(cfg, { symbol, sourceKey, bucketSeconds: bs });
          if (maxBucket) last.set(k, maxBucket);
        } catch {
          // ignore — the poll loop falls back to a safety-window look-back.
        }
      }
    }
  }

  // Steady-state poll loop: never exits.
  while (true) {
    const loopNow = Date.now();
    const loopIso = new Date(loopNow).toISOString();
    const pruneEveryMs = 60_000;
    // Per-key "last pruned at" timestamps, stored as an expando property on
    // the `last` Map itself.
    const lastPruneAt = last.__pruneAt || new Map();
    // stash on the Map to keep closure-local without introducing a new outer var
    last.__pruneAt = lastPruneAt;

    for (const symbol of cfg.symbols) {
      for (const sourceKey of cfg.sources) {
        for (const bs of cfg.bucketSecondsList) {
          const k = `${symbol}::${sourceKey}::${bs}`;
          const prev = last.get(k);
          const safety = safetyWindowSeconds(bs);
          // Re-aggregate from (last pointer - safety window); if we have no
          // pointer yet, just look back one safety window from now.
          const fromMs = prev ? Date.parse(prev) - safety * 1000 : loopNow - safety * 1000;
          const fromIso2 = new Date(Math.max(0, fromMs)).toISOString();
          try {
            await hasuraRunSql(
              cfg,
              sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds: bs, fromIso: fromIso2, toIso: loopIso })
            );
            // best-effort: move last pointer close to now (actual max will lag by at most one bucket)
            last.set(k, loopIso);

            // Throttled prune: at most once per pruneEveryMs per key.
            const prevPrune = lastPruneAt.get(k) || 0;
            if (loopNow - prevPrune >= pruneEveryMs) {
              // Retain max(targetPoints candles, backfillDays) worth of data.
              const keepSeconds = Math.max(cfg.targetPoints * bs, (cfg.backfillDays > 0 ? cfg.backfillDays * 86400 : 0));
              const cutoffIso = new Date(Math.max(0, loopNow - keepSeconds * 1000)).toISOString();
              try {
                await hasuraRunSql(cfg, sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds: bs, cutoffIso }));
              } finally {
                // Mark prune attempted even on failure, so errors don't cause
                // a prune storm every loop.
                lastPruneAt.set(k, loopNow);
              }
            }
          } catch (err) {
            // Per-key failures are logged and skipped; the loop keeps going.
            console.error(
              `[candles-cache-worker] update failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}`
            );
          }
        }
      }
    }

    await sleep(cfg.pollMs);
  }
}
|
|
|
|
// Entry point: run the worker; on a fatal error, log the stack and mark the
// process as failed (exitCode rather than process.exit so pending output flushes).
main().catch((err) => {
  console.error(String(err?.stack || err));
  process.exitCode = 1;
});
|