diff --git a/kustomize/base/candles-cache-worker/worker.mjs b/kustomize/base/candles-cache-worker/worker.mjs index 7c152af..39ba680 100644 --- a/kustomize/base/candles-cache-worker/worker.mjs +++ b/kustomize/base/candles-cache-worker/worker.mjs @@ -144,7 +144,14 @@ function chunkSecondsForBucket(bucketSeconds) { function sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso, toIso }) { return ` - WITH base AS ( + WITH chosen AS ( + SELECT source AS chosen_source + FROM public.drift_ticks + WHERE symbol = ${sqlLit(symbol)} + ORDER BY ts DESC + LIMIT 1 + ), + base AS ( SELECT time_bucket(make_interval(secs => ${bucketSeconds}), ts) AS bucket, ts, @@ -154,7 +161,10 @@ function sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso, WHERE symbol = ${sqlLit(symbol)} AND ts >= ${sqlLit(fromIso)}::timestamptz AND ts < ${sqlLit(toIso)}::timestamptz - AND (${sqlLit(sourceKey)} = '' OR source = ${sqlLit(sourceKey)}) + AND ( + ${sqlLit(sourceKey)} <> '' AND source = ${sqlLit(sourceKey)} + OR ${sqlLit(sourceKey)} = '' AND source = COALESCE((SELECT chosen_source FROM chosen), source) + ) ), agg AS ( SELECT @@ -195,11 +205,26 @@ function sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds, cutoffIso }) } async function getTickRange(cfg, { symbol, sourceKey }) { - const sql = ` + const sql = + String(sourceKey) === '' + ? ` + WITH chosen AS ( + SELECT source AS chosen_source + FROM public.drift_ticks + WHERE symbol=${sqlLit(symbol)} + ORDER BY ts DESC + LIMIT 1 + ) SELECT min(ts) AS min_ts, max(ts) AS max_ts FROM public.drift_ticks WHERE symbol=${sqlLit(symbol)} - AND (${sqlLit(sourceKey)} = '' OR source = ${sqlLit(sourceKey)}); + AND source = COALESCE((SELECT chosen_source FROM chosen), source); + ` + : ` + SELECT min(ts) AS min_ts, max(ts) AS max_ts + FROM public.drift_ticks + WHERE symbol=${sqlLit(symbol)} + AND source = ${sqlLit(sourceKey)}; `; const out = await hasuraRunSql(cfg, sql, { readOnly: true }); const row = Array.isArray(out?.result) && out.result.length >= 2 ? out.result[1] : null; diff --git a/kustomize/base/initdb/001_init.sql b/kustomize/base/initdb/001_init.sql index 7f601b1..349e8bc 100644 --- a/kustomize/base/initdb/001_init.sql +++ b/kustomize/base/initdb/001_init.sql @@ -162,27 +162,52 @@ RETURNS SETOF public.drift_candles LANGUAGE sql STABLE AS $$ - WITH base AS ( + WITH src AS ( + SELECT COALESCE(p_source, '') AS source_key + ), + cached AS ( + SELECT + c.bucket, + c.open, + c.high, + c.low, + c.close, + c.oracle_close, + c.ticks + FROM public.drift_candles_cache c, src + WHERE c.symbol = p_symbol + AND c.bucket_seconds = p_bucket_seconds + AND c.source = src.source_key + ORDER BY c.bucket DESC + LIMIT p_limit + ), + fallback AS ( SELECT time_bucket(make_interval(secs => p_bucket_seconds), ts) AS bucket, ts, COALESCE(mark_price, oracle_price) AS px, oracle_price AS oracle_px - FROM public.drift_ticks + FROM public.drift_ticks, src WHERE symbol = p_symbol - AND (p_source IS NULL OR source = p_source) + AND (src.source_key = '' OR source = src.source_key) AND ts >= now() - make_interval(secs => (p_bucket_seconds * p_limit * 2)) + ), + computed AS ( + SELECT + bucket, + (array_agg(px ORDER BY ts ASC))[1] AS open, + max(px) AS high, + min(px) AS low, + (array_agg(px ORDER BY ts DESC))[1] AS close, + (array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close, + count(*) AS ticks + FROM fallback + GROUP BY bucket ) - SELECT - bucket, - (array_agg(px ORDER BY ts ASC))[1] AS open, - max(px) AS high, - min(px) AS low, - (array_agg(px ORDER BY ts DESC))[1] AS close, - (array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close, - count(*) AS ticks - FROM base - GROUP BY bucket + SELECT * FROM cached + UNION ALL + SELECT * FROM computed + WHERE NOT EXISTS (SELECT 1 FROM cached) ORDER BY bucket DESC LIMIT p_limit; $$;