From f41792a0fad46abd39123f8c659f6505f208f9d6 Mon Sep 17 00:00:00 2001 From: codex Date: Sun, 1 Feb 2026 19:07:49 +0100 Subject: [PATCH] fix(candles-cache): cap per-tf window to 1024 --- .../base/candles-cache-worker/worker.mjs | 74 +++++++++++++------ 1 file changed, 51 insertions(+), 23 deletions(-) diff --git a/kustomize/base/candles-cache-worker/worker.mjs b/kustomize/base/candles-cache-worker/worker.mjs index 63fec4a..7c152af 100644 --- a/kustomize/base/candles-cache-worker/worker.mjs +++ b/kustomize/base/candles-cache-worker/worker.mjs @@ -184,6 +184,16 @@ function sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso, `; } +function sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds, cutoffIso }) { + return ` + DELETE FROM public.drift_candles_cache + WHERE symbol = ${sqlLit(symbol)} + AND source = ${sqlLit(sourceKey)} + AND bucket_seconds = ${bucketSeconds} + AND bucket < ${sqlLit(cutoffIso)}::timestamptz; + `; +} + async function getTickRange(cfg, { symbol, sourceKey }) { const sql = ` SELECT min(ts) AS min_ts, max(ts) AS max_ts @@ -219,14 +229,12 @@ function safetyWindowSeconds(bucketSeconds) { return 2 * 24 * 60 * 60; } -async function backfill(cfg, { symbol, sourceKey, fromIso, toIso }) { - for (const bs of cfg.bucketSecondsList) { - const chunk = chunkSecondsForBucket(bs); - for (let t = Date.parse(fromIso); t < Date.parse(toIso); t += chunk * 1000) { - const a = new Date(t).toISOString(); - const b = new Date(Math.min(Date.parse(toIso), t + chunk * 1000)).toISOString(); - await hasuraRunSql(cfg, sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds: bs, fromIso: a, toIso: b })); - } +async function backfillBucket(cfg, { symbol, sourceKey, bucketSeconds, fromIso, toIso }) { + const chunk = chunkSecondsForBucket(bucketSeconds); + for (let t = Date.parse(fromIso); t < Date.parse(toIso); t += chunk * 1000) { + const a = new Date(t).toISOString(); + const b = new Date(Math.min(Date.parse(toIso), t + chunk * 1000)).toISOString(); + await hasuraRunSql(cfg, sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso: a, toIso: b })); } } @@ -270,22 +278,27 @@ async function main() { const range = await getTickRange(cfg, { symbol, sourceKey }); if (!range.maxTs) continue; const toIso = new Date(Date.parse(range.maxTs)).toISOString(); - const maxBs = cfg.bucketSecondsList[cfg.bucketSecondsList.length - 1] || 60; - const fromIso = desiredFromIso({ - minTsIso: range.minTs, - maxTsIso: toIso, - bucketSeconds: maxBs, - targetPoints: cfg.targetPoints, - backfillDays: cfg.backfillDays, - }); + for (const bs of cfg.bucketSecondsList) { + const fromIso = desiredFromIso({ + minTsIso: range.minTs, + maxTsIso: toIso, + bucketSeconds: bs, + targetPoints: cfg.targetPoints, + backfillDays: cfg.backfillDays, + }); - console.log( - `[candles-cache-worker] warmup symbol=${symbol} source=${sourceKey || '(any)'} from=${fromIso} to=${toIso} points=${cfg.targetPoints}` - ); - try { - await backfill(cfg, { symbol, sourceKey, fromIso, toIso }); - } catch (err) { - console.error(`[candles-cache-worker] warmup failed (${symbol}/${sourceKey || 'any'}): ${String(err?.message || err)}`); + console.log( + `[candles-cache-worker] warmup symbol=${symbol} source=${sourceKey || '(any)'} bs=${bs}s from=${fromIso} to=${toIso} points=${cfg.targetPoints}` + ); + try { + await backfillBucket(cfg, { symbol, sourceKey, bucketSeconds: bs, fromIso, toIso }); + // Enforce max window for this bucket (derived data; safe to prune). + await hasuraRunSql(cfg, sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds: bs, cutoffIso: fromIso })); + } catch (err) { + console.error( + `[candles-cache-worker] warmup failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}` + ); + } } } } @@ -309,6 +322,10 @@ async function main() { while (true) { const loopNow = Date.now(); const loopIso = new Date(loopNow).toISOString(); + const pruneEveryMs = 60_000; + const lastPruneAt = last.__pruneAt || new Map(); + // stash on the Map to keep closure-local without introducing a new outer var + last.__pruneAt = lastPruneAt; for (const symbol of cfg.symbols) { for (const sourceKey of cfg.sources) { @@ -325,6 +342,17 @@ async function main() { ); // best-effort: move last pointer close to now (actual max will lag by at most one bucket) last.set(k, loopIso); + + const prevPrune = lastPruneAt.get(k) || 0; + if (loopNow - prevPrune >= pruneEveryMs) { + const keepSeconds = Math.max(cfg.targetPoints * bs, (cfg.backfillDays > 0 ? cfg.backfillDays * 86400 : 0)); + const cutoffIso = new Date(Math.max(0, loopNow - keepSeconds * 1000)).toISOString(); + try { + await hasuraRunSql(cfg, sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds: bs, cutoffIso })); + } finally { + lastPruneAt.set(k, loopNow); + } + } } catch (err) { console.error( `[candles-cache-worker] update failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}`