From 9e7d7b88ac4060ba4ef7c5e60ccde90cff6ae377 Mon Sep 17 00:00:00 2001 From: u1 Date: Tue, 3 Feb 2026 12:51:29 +0100 Subject: [PATCH] feat(candles): support oracle OHLC basis --- kustomize/base/api/server.mjs | 41 ++++++++++++++++--- .../base/candles-cache-worker/worker.mjs | 38 ++++++++++++++++- kustomize/base/initdb/001_init.sql | 29 +++++++++++++ 3 files changed, 100 insertions(+), 8 deletions(-) diff --git a/kustomize/base/api/server.mjs b/kustomize/base/api/server.mjs index 99f6047..a2e2547 100644 --- a/kustomize/base/api/server.mjs +++ b/kustomize/base/api/server.mjs @@ -1006,6 +1006,7 @@ async function handler(cfg, req, res) { const symbol = (url.searchParams.get('symbol') || '').trim(); const source = (url.searchParams.get('source') || '').trim(); + const basisRaw = (url.searchParams.get('basis') || '').trim().toLowerCase(); const tf = (url.searchParams.get('tf') || url.searchParams.get('timeframe') || '1m').trim(); const limit = clampInt(url.searchParams.get('limit') || '300', 10, 2000); @@ -1023,6 +1024,11 @@ async function handler(cfg, req, res) { } const sourceKey = source || ''; + const basis = basisRaw === 'mark' ? 'mark' : basisRaw === 'oracle' || !basisRaw ? 'oracle' : null; + if (!basis) { + sendJson(res, 400, { ok: false, error: 'invalid_basis' }, cfg.corsOrigin); + return; + } try { // Cache-first: read precomputed candles from `drift_candles_cache`. @@ -1039,6 +1045,9 @@ async function handler(cfg, req, res) { high low close + oracle_open + oracle_high + oracle_low oracle_close ticks } @@ -1064,6 +1073,9 @@ async function handler(cfg, req, res) { high low close + oracle_open + oracle_high + oracle_low oracle_close ticks } @@ -1085,11 +1097,23 @@ async function handler(cfg, req, res) { .reverse() .map((r) => { const time = Math.floor(Date.parse(r.bucket) / 1000); - const open = Number(r.open); - const high = Number(r.high); - const low = Number(r.low); - const close = Number(r.close); - const oracle = r.oracle_close == null ? null : Number(r.oracle_close); + const oracleClose = r.oracle_close == null ? null : Number(r.oracle_close); + const oracleOpen = r.oracle_open == null ? oracleClose : Number(r.oracle_open); + const oracleHigh = r.oracle_high == null ? oracleClose : Number(r.oracle_high); + const oracleLow = r.oracle_low == null ? oracleClose : Number(r.oracle_low); + + const markOpen = Number(r.open); + const markHigh = Number(r.high); + const markLow = Number(r.low); + const markClose = Number(r.close); + + const open = basis === 'oracle' ? oracleOpen : markOpen; + const high = basis === 'oracle' ? oracleHigh : markHigh; + const low = basis === 'oracle' ? oracleLow : markLow; + const close = basis === 'oracle' ? oracleClose : markClose; + + // Always expose oracle close (even if basis=mark). + const oracle = oracleClose; const volume = Number(r.ticks || 0); return { time, open, high, low, close, volume, oracle }; }) @@ -1131,6 +1155,7 @@ async function handler(cfg, req, res) { ) { bucket close + oracle_close } } `; @@ -1151,7 +1176,10 @@ async function handler(cfg, req, res) { for (const r of ptsRows) { const t = tsToUnixSeconds(r.bucket); if (t == null) continue; - const p = parseNumeric(r.close); + const p = + basis === 'oracle' + ? parseNumeric(r.oracle_close) ?? parseNumeric(r.close) + : parseNumeric(r.close) ?? parseNumeric(r.oracle_close); if (p == null) continue; const idx = Math.floor((t - firstStart) / bucketSeconds); const start = firstStart + idx * bucketSeconds; @@ -1222,6 +1250,7 @@ async function handler(cfg, req, res) { candlesFunction: cfg.candlesFunction, symbol, source: source || null, + basis, tf, bucketSeconds, candles, diff --git a/kustomize/base/candles-cache-worker/worker.mjs b/kustomize/base/candles-cache-worker/worker.mjs index 39ba680..c1f91e8 100644 --- a/kustomize/base/candles-cache-worker/worker.mjs +++ b/kustomize/base/candles-cache-worker/worker.mjs @@ -173,21 +173,55 @@ function sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso, max(px) AS high, min(px) AS low, (array_agg(px ORDER BY ts DESC))[1] AS close, + (array_agg(oracle_px ORDER BY ts ASC))[1] AS oracle_open, + max(oracle_px) AS oracle_high, + min(oracle_px) AS oracle_low, (array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close, count(*)::bigint AS ticks FROM base GROUP BY bucket ) INSERT INTO public.drift_candles_cache - (bucket, bucket_seconds, symbol, source, open, high, low, close, oracle_close, ticks, updated_at) + ( + bucket, + bucket_seconds, + symbol, + source, + open, + high, + low, + close, + oracle_open, + oracle_high, + oracle_low, + oracle_close, + ticks, + updated_at + ) SELECT - bucket, ${bucketSeconds}, ${sqlLit(symbol)}, ${sqlLit(sourceKey)}, open, high, low, close, oracle_close, ticks, now() + bucket, + ${bucketSeconds}, + ${sqlLit(symbol)}, + ${sqlLit(sourceKey)}, + open, + high, + low, + close, + oracle_open, + oracle_high, + oracle_low, + oracle_close, + ticks, + now() FROM agg ON CONFLICT (bucket, bucket_seconds, symbol, source) DO UPDATE SET open = EXCLUDED.open, high = EXCLUDED.high, low = EXCLUDED.low, close = EXCLUDED.close, + oracle_open = EXCLUDED.oracle_open, + oracle_high = EXCLUDED.oracle_high, + oracle_low = EXCLUDED.oracle_low, oracle_close = EXCLUDED.oracle_close, ticks = EXCLUDED.ticks, updated_at = now(); diff --git a/kustomize/base/initdb/001_init.sql b/kustomize/base/initdb/001_init.sql index fc6c900..6abb55d 100644 --- a/kustomize/base/initdb/001_init.sql +++ b/kustomize/base/initdb/001_init.sql @@ -121,10 +121,17 @@ CREATE TABLE IF NOT EXISTS public.drift_candles ( high numeric, low numeric, close numeric, + oracle_open numeric, + oracle_high numeric, + oracle_low numeric, oracle_close numeric, ticks bigint ); +ALTER TABLE public.drift_candles ADD COLUMN IF NOT EXISTS oracle_open numeric; +ALTER TABLE public.drift_candles ADD COLUMN IF NOT EXISTS oracle_high numeric; +ALTER TABLE public.drift_candles ADD COLUMN IF NOT EXISTS oracle_low numeric; + -- Precomputed candle cache (materialized by a worker). -- Purpose: make tf switching instant by reading ready-made candles instead of aggregating `drift_ticks` on demand. -- NOTE: `source=''` means "any source" (no source filter). @@ -137,12 +144,19 @@ CREATE TABLE IF NOT EXISTS public.drift_candles_cache ( high numeric NOT NULL, low numeric NOT NULL, close numeric NOT NULL, + oracle_open numeric, + oracle_high numeric, + oracle_low numeric, oracle_close numeric, ticks bigint NOT NULL DEFAULT 0, updated_at timestamptz NOT NULL DEFAULT now(), PRIMARY KEY (bucket, bucket_seconds, symbol, source) ); +ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_open numeric; +ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_high numeric; +ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_low numeric; + SELECT create_hypertable('drift_candles_cache', 'bucket', if_not_exists => TRUE, migrate_data => TRUE); CREATE INDEX IF NOT EXISTS drift_candles_cache_symbol_source_bucket_idx @@ -174,6 +188,9 @@ AS $$ c.high, c.low, c.close, + c.oracle_open, + c.oracle_high, + c.oracle_low, c.oracle_close, c.ticks FROM public.drift_candles_cache c, src @@ -201,6 +218,9 @@ AS $$ max(px) AS high, min(px) AS low, (array_agg(px ORDER BY ts DESC))[1] AS close, + (array_agg(oracle_px ORDER BY ts ASC))[1] AS oracle_open, + max(oracle_px) AS oracle_high, + min(oracle_px) AS oracle_low, (array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close, count(*) AS ticks FROM raw_fallback @@ -236,6 +256,9 @@ AS $$ d.high, d.low, d.close, + d.oracle_open, + d.oracle_high, + d.oracle_low, d.oracle_close, d.ticks FROM series s @@ -261,6 +284,9 @@ AS $$ g.high, g.low, g.close, + g.oracle_open, + g.oracle_high, + g.oracle_low, g.oracle_close, g.ticks, COALESCE( @@ -282,6 +308,9 @@ AS $$ COALESCE(high, ff_close) AS high, COALESCE(low, ff_close) AS low, COALESCE(close, ff_close) AS close, + COALESCE(oracle_open, ff_oracle) AS oracle_open, + COALESCE(oracle_high, ff_oracle) AS oracle_high, + COALESCE(oracle_low, ff_oracle) AS oracle_low, COALESCE(oracle_close, ff_oracle) AS oracle_close, COALESCE(ticks, 0) AS ticks FROM ff