From b239f564b273b46d432241dd603f38f90f0875ea Mon Sep 17 00:00:00 2001 From: codex Date: Sun, 1 Feb 2026 18:12:15 +0100 Subject: [PATCH] feat(staging): add candles cache + v2 slippage --- kustomize/base/api/deployment.yaml | 7 + .../base/dlob-slippage-worker/deployment.yaml | 2 +- .../base/dlob-slippage-worker/worker.mjs | 565 +++++++++--------- kustomize/base/dlob-ts-archiver/worker.mjs | 91 ++- kustomize/base/hasura/hasura-bootstrap.mjs | 61 ++ kustomize/base/initdb/001_init.sql | 87 +++ kustomize/base/kustomization.yaml | 7 + 7 files changed, 526 insertions(+), 294 deletions(-) diff --git a/kustomize/base/api/deployment.yaml b/kustomize/base/api/deployment.yaml index dd77df2..a1e299c 100644 --- a/kustomize/base/api/deployment.yaml +++ b/kustomize/base/api/deployment.yaml @@ -18,6 +18,9 @@ spec: - name: trade-api-wrapper configMap: name: trade-api-wrapper + - name: trade-api-upstream + configMap: + name: trade-api-upstream containers: - name: api image: gitea.mpabi.pl/trade/trade-api:k3s-20260111095435 @@ -31,6 +34,10 @@ spec: mountPath: /override/wrapper.mjs subPath: wrapper.mjs readOnly: true + - name: trade-api-upstream + mountPath: /app/services/api/server.mjs + subPath: server.mjs + readOnly: true env: - name: PORT value: "8787" diff --git a/kustomize/base/dlob-slippage-worker/deployment.yaml b/kustomize/base/dlob-slippage-worker/deployment.yaml index 5d40f04..4c7f6b2 100644 --- a/kustomize/base/dlob-slippage-worker/deployment.yaml +++ b/kustomize/base/dlob-slippage-worker/deployment.yaml @@ -31,7 +31,7 @@ spec: - name: DLOB_POLL_MS value: "1000" - name: DLOB_SLIPPAGE_SIZES_USD - value: "10,25,50,100,250,500,1000,5000,10000,50000" + value: "0.1,0.2,0.5,1,2,5,10,25,50,100,250,500,1000,5000,10000,50000" - name: PRICE_PRECISION value: "1000000" - name: BASE_PRECISION diff --git a/kustomize/base/dlob-slippage-worker/worker.mjs b/kustomize/base/dlob-slippage-worker/worker.mjs index 19a8f51..2994456 100644 --- a/kustomize/base/dlob-slippage-worker/worker.mjs +++ b/kustomize/base/dlob-slippage-worker/worker.mjs @@ -1,6 +1,16 @@ +import fs from 'node:fs'; import process from 'node:process'; import { setTimeout as sleep } from 'node:timers/promises'; +function readJsonFile(filePath) { + try { + const raw = fs.readFileSync(filePath, 'utf8'); + return JSON.parse(raw); + } catch { + return undefined; + } +} + function getIsoNow() { return new Date().toISOString(); } @@ -19,14 +29,87 @@ function envList(name, fallbackCsv) { .filter(Boolean); } -function envIntList(name, fallbackCsv) { - const out = []; - for (const item of envList(name, fallbackCsv)) { - const n = Number.parseInt(item, 10); - if (!Number.isFinite(n)) continue; - out.push(n); +function parsePositiveNumber(value) { + const n = Number.parseFloat(String(value ?? '').trim()); + if (!Number.isFinite(n) || !(n > 0)) return null; + return n; +} + +function resolveConfig() { + const tokensPath = + process.env.HASURA_TOKENS_FILE || + process.env.TOKENS_FILE || + process.env.HASURA_CONFIG_FILE || + '/app/tokens/hasura.json'; + const tokens = readJsonFile(tokensPath) || {}; + + const hasuraUrl = + process.env.HASURA_GRAPHQL_URL || + tokens.graphqlUrl || + tokens.apiUrl || + 'http://hasura:8080/v1/graphql'; + const hasuraAdminSecret = + process.env.HASURA_ADMIN_SECRET || + process.env.HASURA_GRAPHQL_ADMIN_SECRET || + tokens.adminSecret || + tokens.hasuraAdminSecret; + const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined; + + const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'); + const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000); + + const sizesUsd = envList('DLOB_SLIPPAGE_SIZES_USD', '10,25,50,100,250,500,1000') + .map(parsePositiveNumber) + .filter((n) => n != null) + .map((n) => n) + .filter((n, idx, arr) => arr.findIndex((x) => x === n) === idx) + .sort((a, b) => a - b); + + const sizesUsdInt = sizesUsd.filter((n) => Number.isInteger(n)); + + const depthLevels = clampInt(process.env.DLOB_DEPTH, 1, 50, 25); + const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000); + const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000); + if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`); + if (!Number.isFinite(basePrecision) || basePrecision <= 0) throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`); + + return { + hasuraUrl, + hasuraAdminSecret, + hasuraAuthToken, + markets, + pollMs, + sizesUsd, + sizesUsdInt, + depthLevels, + pricePrecision, + basePrecision, + }; +} + +async function graphqlRequest(cfg, query, variables) { + const headers = { 'content-type': 'application/json' }; + if (cfg.hasuraAuthToken) { + headers.authorization = `Bearer ${cfg.hasuraAuthToken}`; + } else if (cfg.hasuraAdminSecret) { + headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret; + } else { + throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)'); } - return out.length ? out : envList(name, fallbackCsv).map((v) => Number.parseInt(v, 10)).filter(Number.isFinite); + + const res = await fetch(cfg.hasuraUrl, { + method: 'POST', + headers, + body: JSON.stringify({ query, variables }), + signal: AbortSignal.timeout(15_000), + }); + const text = await res.text(); + if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`); + const json = JSON.parse(text); + if (json.errors?.length) { + throw new Error(json.errors.map((e) => e.message).join(' | ')); + } + return json.data; } function toNumberOrNull(value) { @@ -41,225 +124,83 @@ function toNumberOrNull(value) { return null; } -function numStr(value) { - if (value == null) return null; - if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null; - if (typeof value === 'string') return value.trim() || null; - return null; -} - -function jsonNormalize(value) { - if (typeof value !== 'string') return value; - const s = value.trim(); - if (!s) return null; - try { - return JSON.parse(s); - } catch { - return value; +function normalizeLevels(raw) { + if (raw == null) return []; + if (Array.isArray(raw)) return raw; + if (typeof raw === 'string') { + const s = raw.trim(); + if (!s) return []; + try { + const v = JSON.parse(s); + return Array.isArray(v) ? v : []; + } catch { + return []; + } } + return []; } -function resolveConfig() { - const hasuraUrl = process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql'; - const hasuraAdminSecret = process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || undefined; - const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined; - - const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'); - const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000); - const sizesUsd = envIntList('DLOB_SLIPPAGE_SIZES_USD', '100,500,1000,5000,10000,50000'); - - const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000); - const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000); - if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) - throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`); - if (!Number.isFinite(basePrecision) || basePrecision <= 0) - throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`); - - return { - hasuraUrl, - hasuraAdminSecret, - hasuraAuthToken, - markets, - pollMs, - sizesUsd, - pricePrecision, - basePrecision, - }; -} - -async function graphqlRequest(cfg, query, variables) { - const headers = { 'content-type': 'application/json' }; - if (cfg.hasuraAuthToken) { - headers.authorization = `Bearer ${cfg.hasuraAuthToken}`; - } else if (cfg.hasuraAdminSecret) { - headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret; - } else { - throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET)'); - } - - const res = await fetch(cfg.hasuraUrl, { - method: 'POST', - headers, - body: JSON.stringify({ query, variables }), - signal: AbortSignal.timeout(10_000), - }); - const text = await res.text(); - if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`); - const json = JSON.parse(text); - if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | ')); - return json.data; -} - -function parseLevels(raw, pricePrecision, basePrecision, side) { - const v = jsonNormalize(raw); - if (!Array.isArray(v)) return []; - +function parseScaledLevels(raw, pricePrecision, basePrecision) { + const levels = normalizeLevels(raw); const out = []; - for (const item of v) { - const priceInt = toNumberOrNull(item?.price); - const sizeInt = toNumberOrNull(item?.size); + for (const it of levels) { + const priceInt = toNumberOrNull(it?.price); + const sizeInt = toNumberOrNull(it?.size); if (priceInt == null || sizeInt == null) continue; const price = priceInt / pricePrecision; - const size = sizeInt / basePrecision; - if (!Number.isFinite(price) || !Number.isFinite(size)) continue; - out.push({ price, size }); + const base = sizeInt / basePrecision; + if (!Number.isFinite(price) || !Number.isFinite(base)) continue; + out.push({ price, base }); } - - if (side === 'bid') out.sort((a, b) => b.price - a.price); - if (side === 'ask') out.sort((a, b) => a.price - b.price); return out; } -function computeMid(bestBid, bestAsk, markPrice, oraclePrice) { - if (bestBid != null && bestAsk != null) return (bestBid + bestAsk) / 2; - if (markPrice != null) return markPrice; - if (oraclePrice != null) return oraclePrice; - return null; -} - -function simulateBuy(asks, mid, sizeUsd) { +function simulateFill(levels, sizeUsd) { let remainingUsd = sizeUsd; let filledUsd = 0; let filledBase = 0; + let totalQuoteUsd = 0; let worstPrice = null; let levelsConsumed = 0; - for (const lvl of asks) { - if (!(remainingUsd > 0)) break; - if (!(lvl.price > 0) || !(lvl.size > 0)) continue; - - const maxBase = remainingUsd / lvl.price; - const takeBase = Math.min(lvl.size, maxBase); - if (!(takeBase > 0)) continue; - - const cost = takeBase * lvl.price; - filledUsd += cost; - filledBase += takeBase; - remainingUsd -= cost; - worstPrice = lvl.price; + for (const l of levels) { + if (remainingUsd <= 0) break; + const levelUsd = l.base * l.price; + if (levelUsd <= 0) continue; levelsConsumed += 1; - } + worstPrice = l.price; - const vwap = filledBase > 0 ? filledUsd / filledBase : null; - const impactBps = vwap != null && mid != null && mid > 0 ? (vwap / mid - 1) * 10_000 : null; - const fillPct = sizeUsd > 0 ? filledUsd / sizeUsd : null; + const takeUsd = Math.min(remainingUsd, levelUsd); + const takeBase = takeUsd / l.price; - return { vwap, worstPrice, filledUsd, filledBase, impactBps, levelsConsumed, fillPct }; -} - -function simulateSell(bids, mid, sizeUsd) { - if (mid == null || !(mid > 0)) { - return { vwap: null, worstPrice: null, filledUsd: 0, filledBase: 0, impactBps: null, levelsConsumed: 0, fillPct: null }; - } - - const baseTarget = sizeUsd / mid; - let remainingBase = baseTarget; - let proceedsUsd = 0; - let filledBase = 0; - let worstPrice = null; - let levelsConsumed = 0; - - for (const lvl of bids) { - if (!(remainingBase > 0)) break; - if (!(lvl.price > 0) || !(lvl.size > 0)) continue; - - const takeBase = Math.min(lvl.size, remainingBase); - if (!(takeBase > 0)) continue; - - const proceeds = takeBase * lvl.price; - proceedsUsd += proceeds; + remainingUsd -= takeUsd; + filledUsd += takeUsd; filledBase += takeBase; - remainingBase -= takeBase; - worstPrice = lvl.price; - levelsConsumed += 1; + totalQuoteUsd += takeUsd; } - const vwap = filledBase > 0 ? proceedsUsd / filledBase : null; - const impactBps = vwap != null && mid > 0 ? (1 - vwap / mid) * 10_000 : null; - const fillPct = baseTarget > 0 ? filledBase / baseTarget : null; + const vwapPrice = filledBase > 0 ? totalQuoteUsd / filledBase : null; + const fillPct = sizeUsd > 0 ? (filledUsd / sizeUsd) * 100 : null; - return { vwap, worstPrice, filledUsd: proceedsUsd, filledBase, impactBps, levelsConsumed, fillPct }; + return { + filledUsd, + filledBase, + vwapPrice, + worstPrice, + levelsConsumed, + fillPct, + }; } -async function fetchL2Latest(cfg) { - const query = ` - query DlobL2Latest($markets: [String!]!) { - dlob_l2_latest(where: {market_name: {_in: $markets}}) { - market_name - market_type - market_index - ts - slot - mark_price - oracle_price - best_bid_price - best_ask_price - bids - asks - updated_at - } - } - `; - const data = await graphqlRequest(cfg, query, { markets: cfg.markets }); - return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : []; -} - -async function upsertSlippage(cfg, rows) { - if (!rows.length) return; - const mutation = ` - mutation UpsertDlobSlippage($rows: [dlob_slippage_latest_insert_input!]!) { - insert_dlob_slippage_latest( - objects: $rows - on_conflict: { - constraint: dlob_slippage_latest_pkey - update_columns: [ - market_type - market_index - ts - slot - mid_price - best_bid_price - best_ask_price - vwap_price - worst_price - filled_usd - filled_base - impact_bps - levels_consumed - fill_pct - raw - updated_at - ] - } - ) { affected_rows } - } - `; - await graphqlRequest(cfg, mutation, { rows }); +function impactBps({ side, mid, vwap }) { + if (mid == null || vwap == null || mid <= 0) return null; + if (side === 'buy') return ((vwap / mid) - 1) * 10_000; + if (side === 'sell') return (1 - (vwap / mid)) * 10_000; + return null; } async function main() { const cfg = resolveConfig(); - const lastUpdatedAtByMarket = new Map(); console.log( JSON.stringify( @@ -271,102 +212,180 @@ async function main() { markets: cfg.markets, pollMs: cfg.pollMs, sizesUsd: cfg.sizesUsd, - pricePrecision: cfg.pricePrecision, - basePrecision: cfg.basePrecision, + depthLevels: cfg.depthLevels, }, null, 2 ) ); + const lastSeenUpdatedAt = new Map(); // market -> updated_at + while (true) { - const rows = []; + const updatedAt = getIsoNow(); try { - const l2Rows = await fetchL2Latest(cfg); - for (const l2 of l2Rows) { - const market = String(l2.market_name || '').trim(); + const query = ` + query DlobL2Latest($markets: [String!]!) { + dlob_l2_latest(where: { market_name: { _in: $markets } }) { + market_name + market_type + market_index + ts + slot + best_bid_price + best_ask_price + bids + asks + updated_at + } + } + `; + + const data = await graphqlRequest(cfg, query, { markets: cfg.markets }); + const rows = Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : []; + + const objectsV1 = []; + const objectsV2 = []; + + for (const row of rows) { + const market = String(row?.market_name || '').trim(); if (!market) continue; - const updatedAt = l2.updated_at || null; - if (updatedAt && lastUpdatedAtByMarket.get(market) === updatedAt) continue; - if (updatedAt) lastUpdatedAtByMarket.set(market, updatedAt); + const rowUpdatedAt = row?.updated_at ?? null; + if (rowUpdatedAt && lastSeenUpdatedAt.get(market) === rowUpdatedAt) continue; + if (rowUpdatedAt) lastSeenUpdatedAt.set(market, rowUpdatedAt); - const bestBid = toNumberOrNull(l2.best_bid_price); - const bestAsk = toNumberOrNull(l2.best_ask_price); - const markPrice = toNumberOrNull(l2.mark_price); - const oraclePrice = toNumberOrNull(l2.oracle_price); - const mid = computeMid(bestBid, bestAsk, markPrice, oraclePrice); + const bestBid = toNumberOrNull(row?.best_bid_price); + const bestAsk = toNumberOrNull(row?.best_ask_price); + if (bestBid == null || bestAsk == null) continue; - const bids = parseLevels(l2.bids, cfg.pricePrecision, cfg.basePrecision, 'bid'); - const asks = parseLevels(l2.asks, cfg.pricePrecision, cfg.basePrecision, 'ask'); + const mid = (bestBid + bestAsk) / 2; + if (!Number.isFinite(mid) || mid <= 0) continue; + + const bids = parseScaledLevels(row?.bids, cfg.pricePrecision, cfg.basePrecision) + .slice() + .sort((a, b) => b.price - a.price) + .slice(0, cfg.depthLevels); + const asks = parseScaledLevels(row?.asks, cfg.pricePrecision, cfg.basePrecision) + .slice() + .sort((a, b) => a.price - b.price) + .slice(0, cfg.depthLevels); for (const sizeUsd of cfg.sizesUsd) { - const buy = simulateBuy(asks, mid, sizeUsd); - rows.push({ - market_name: market, - side: 'buy', - size_usd: sizeUsd, - market_type: l2.market_type ? String(l2.market_type) : 'perp', - market_index: typeof l2.market_index === 'number' ? l2.market_index : null, - ts: l2.ts == null ? null : String(l2.ts), - slot: l2.slot == null ? null : String(l2.slot), - mid_price: numStr(mid), - best_bid_price: numStr(bestBid), - best_ask_price: numStr(bestAsk), - vwap_price: numStr(buy.vwap), - worst_price: numStr(buy.worstPrice), - filled_usd: numStr(buy.filledUsd), - filled_base: numStr(buy.filledBase), - impact_bps: numStr(buy.impactBps), - levels_consumed: buy.levelsConsumed, - fill_pct: numStr(buy.fillPct), - raw: { - ref: 'mid', - units: 'usd', - pricePrecision: cfg.pricePrecision, - basePrecision: cfg.basePrecision, - }, - updated_at: updatedAt, - }); + // buy consumes asks (worse prices as you go up) + { + const sim = simulateFill(asks, sizeUsd); + const baseObj = { + market_name: market, + side: 'buy', + market_type: row?.market_type ?? 'perp', + market_index: row?.market_index ?? null, + ts: row?.ts == null ? null : String(row.ts), + slot: row?.slot == null ? null : String(row.slot), + mid_price: String(mid), + vwap_price: sim.vwapPrice == null ? null : String(sim.vwapPrice), + worst_price: sim.worstPrice == null ? null : String(sim.worstPrice), + filled_usd: String(sim.filledUsd), + filled_base: String(sim.filledBase), + impact_bps: impactBps({ side: 'buy', mid, vwap: sim.vwapPrice }), + levels_consumed: sim.levelsConsumed, + fill_pct: sim.fillPct == null ? null : String(sim.fillPct), + raw: { depthLevels: cfg.depthLevels }, + updated_at: updatedAt, + }; + objectsV2.push({ ...baseObj, size_usd: String(sizeUsd) }); + if (Number.isInteger(sizeUsd)) objectsV1.push({ ...baseObj, size_usd: Math.trunc(sizeUsd) }); + } - const sell = simulateSell(bids, mid, sizeUsd); - rows.push({ - market_name: market, - side: 'sell', - size_usd: sizeUsd, - market_type: l2.market_type ? String(l2.market_type) : 'perp', - market_index: typeof l2.market_index === 'number' ? l2.market_index : null, - ts: l2.ts == null ? null : String(l2.ts), - slot: l2.slot == null ? null : String(l2.slot), - mid_price: numStr(mid), - best_bid_price: numStr(bestBid), - best_ask_price: numStr(bestAsk), - vwap_price: numStr(sell.vwap), - worst_price: numStr(sell.worstPrice), - filled_usd: numStr(sell.filledUsd), - filled_base: numStr(sell.filledBase), - impact_bps: numStr(sell.impactBps), - levels_consumed: sell.levelsConsumed, - fill_pct: numStr(sell.fillPct), - raw: { - ref: 'mid', - units: 'usd', - pricePrecision: cfg.pricePrecision, - basePrecision: cfg.basePrecision, - }, - updated_at: updatedAt, - }); + // sell consumes bids (worse prices as you go down) + { + const sim = simulateFill(bids, sizeUsd); + const baseObj = { + market_name: market, + side: 'sell', + market_type: row?.market_type ?? 'perp', + market_index: row?.market_index ?? null, + ts: row?.ts == null ? null : String(row.ts), + slot: row?.slot == null ? null : String(row.slot), + mid_price: String(mid), + vwap_price: sim.vwapPrice == null ? null : String(sim.vwapPrice), + worst_price: sim.worstPrice == null ? null : String(sim.worstPrice), + filled_usd: String(sim.filledUsd), + filled_base: String(sim.filledBase), + impact_bps: impactBps({ side: 'sell', mid, vwap: sim.vwapPrice }), + levels_consumed: sim.levelsConsumed, + fill_pct: sim.fillPct == null ? null : String(sim.fillPct), + raw: { depthLevels: cfg.depthLevels }, + updated_at: updatedAt, + }; + objectsV2.push({ ...baseObj, size_usd: String(sizeUsd) }); + if (Number.isInteger(sizeUsd)) objectsV1.push({ ...baseObj, size_usd: Math.trunc(sizeUsd) }); + } } } - } catch (err) { - console.error(`[dlob-slippage-worker] fetch/compute: ${String(err?.message || err)}`); - } - try { - await upsertSlippage(cfg, rows); + if (objectsV1.length) { + const mutation = ` + mutation UpsertSlippageV1($rows: [dlob_slippage_latest_insert_input!]!) { + insert_dlob_slippage_latest( + objects: $rows + on_conflict: { + constraint: dlob_slippage_latest_pkey + update_columns: [ + market_type + market_index + ts + slot + mid_price + vwap_price + worst_price + filled_usd + filled_base + impact_bps + levels_consumed + fill_pct + raw + updated_at + ] + } + ) { affected_rows } + } + `; + await graphqlRequest(cfg, mutation, { rows: objectsV1 }); + } + + if (objectsV2.length) { + const mutation = ` + mutation UpsertSlippageV2($rows: [dlob_slippage_latest_v2_insert_input!]!) { + insert_dlob_slippage_latest_v2( + objects: $rows + on_conflict: { + constraint: dlob_slippage_latest_v2_pkey + update_columns: [ + market_type + market_index + ts + slot + mid_price + vwap_price + worst_price + filled_usd + filled_base + impact_bps + levels_consumed + fill_pct + raw + updated_at + ] + } + ) { affected_rows } + } + `; + await graphqlRequest(cfg, mutation, { rows: objectsV2 }); + } } catch (err) { - console.error(`[dlob-slippage-worker] upsert: ${String(err?.message || err)}`); + console.error(`[dlob-slippage-worker] ${String(err?.message || err)}`); } await sleep(cfg.pollMs); diff --git a/kustomize/base/dlob-ts-archiver/worker.mjs b/kustomize/base/dlob-ts-archiver/worker.mjs index 498a16b..ae85b32 100644 --- a/kustomize/base/dlob-ts-archiver/worker.mjs +++ b/kustomize/base/dlob-ts-archiver/worker.mjs @@ -1,6 +1,16 @@ +import fs from 'node:fs'; import process from 'node:process'; import { setTimeout as sleep } from 'node:timers/promises'; +function readJsonFile(filePath) { + try { + const raw = fs.readFileSync(filePath, 'utf8'); + return JSON.parse(raw); + } catch { + return undefined; + } +} + function getIsoNow() { return new Date().toISOString(); } @@ -20,30 +30,53 @@ function envList(name, fallbackCsv) { } function resolveConfig() { - const hasuraUrl = String(process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql').trim(); - const hasuraAdminSecret = String(process.env.HASURA_ADMIN_SECRET || '').trim(); - if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET'); + const tokensPath = + process.env.HASURA_TOKENS_FILE || + process.env.TOKENS_FILE || + process.env.HASURA_CONFIG_FILE || + '/app/tokens/hasura.json'; + const tokens = readJsonFile(tokensPath) || {}; + + const hasuraUrl = + process.env.HASURA_GRAPHQL_URL || + tokens.graphqlUrl || + tokens.apiUrl || + 'http://hasura:8080/v1/graphql'; + const hasuraAdminSecret = + process.env.HASURA_ADMIN_SECRET || + process.env.HASURA_GRAPHQL_ADMIN_SECRET || + tokens.adminSecret || + tokens.hasuraAdminSecret; + const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined; const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'); - const pollMs = clampInt(process.env.DLOB_TS_POLL_MS, 250, 60_000, 1000); + const pollMs = clampInt(process.env.DLOB_TS_POLL_MS, 500, 60_000, 1000); - return { hasuraUrl, hasuraAdminSecret, markets, pollMs }; + return { hasuraUrl, hasuraAdminSecret, hasuraAuthToken, markets, pollMs }; } async function graphqlRequest(cfg, query, variables) { + const headers = { 'content-type': 'application/json' }; + if (cfg.hasuraAuthToken) { + headers.authorization = `Bearer ${cfg.hasuraAuthToken}`; + } else if (cfg.hasuraAdminSecret) { + headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret; + } else { + throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)'); + } + const res = await fetch(cfg.hasuraUrl, { method: 'POST', - headers: { - 'content-type': 'application/json', - 'x-hasura-admin-secret': cfg.hasuraAdminSecret, - }, + headers, body: JSON.stringify({ query, variables }), signal: AbortSignal.timeout(15_000), }); const text = await res.text(); if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`); const json = JSON.parse(text); - if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | ')); + if (json.errors?.length) { + throw new Error(json.errors.map((e) => e.message).join(' | ')); + } return json.data; } @@ -63,6 +96,7 @@ async function main() { service: 'dlob-ts-archiver', startedAt: getIsoNow(), hasuraUrl: cfg.hasuraUrl, + hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none', markets: cfg.markets, pollMs: cfg.pollMs, }, @@ -93,6 +127,11 @@ async function main() { mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct raw } + dlob_slippage_latest_v2(where: { market_name: { _in: $markets } }) { + market_name side size_usd market_type market_index ts slot + mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct + raw + } } `; @@ -160,28 +199,41 @@ async function main() { raw: r.raw ?? null, })); - if (!statsRows.length && !depthRows.length && !slippageRows.length) { - await sleep(cfg.pollMs); - continue; - } + const slippageRowsV2 = (data?.dlob_slippage_latest_v2 || []).map((r) => ({ + ts: now, + market_name: r.market_name, + side: r.side, + size_usd: r.size_usd, + market_type: r.market_type, + market_index: r.market_index ?? null, + source_ts: mapBigint(r.ts), + slot: mapBigint(r.slot), + mid_price: r.mid_price ?? null, + vwap_price: r.vwap_price ?? null, + worst_price: r.worst_price ?? null, + filled_usd: r.filled_usd ?? null, + filled_base: r.filled_base ?? null, + impact_bps: r.impact_bps ?? null, + levels_consumed: r.levels_consumed ?? null, + fill_pct: r.fill_pct ?? null, + raw: r.raw ?? null, + })); const mutation = ` mutation InsertTs( $stats: [dlob_stats_ts_insert_input!]! $depth: [dlob_depth_bps_ts_insert_input!]! $slip: [dlob_slippage_ts_insert_input!]! + $slipV2: [dlob_slippage_ts_v2_insert_input!]! ) { insert_dlob_stats_ts(objects: $stats) { affected_rows } insert_dlob_depth_bps_ts(objects: $depth) { affected_rows } insert_dlob_slippage_ts(objects: $slip) { affected_rows } + insert_dlob_slippage_ts_v2(objects: $slipV2) { affected_rows } } `; - await graphqlRequest(cfg, mutation, { - stats: statsRows, - depth: depthRows, - slip: slippageRows, - }); + await graphqlRequest(cfg, mutation, { stats: statsRows, depth: depthRows, slip: slippageRows, slipV2: slippageRowsV2 }); } catch (err) { console.error(`[dlob-ts-archiver] ${String(err?.message || err)}`); } @@ -194,4 +246,3 @@ main().catch((err) => { console.error(String(err?.stack || err)); process.exitCode = 1; }); - diff --git a/kustomize/base/hasura/hasura-bootstrap.mjs b/kustomize/base/hasura/hasura-bootstrap.mjs index c8fd883..25488f4 100644 --- a/kustomize/base/hasura/hasura-bootstrap.mjs +++ b/kustomize/base/hasura/hasura-bootstrap.mjs @@ -97,9 +97,12 @@ async function main() { const dlobStatsLatestTable = { schema: 'public', name: 'dlob_stats_latest' }; const dlobDepthBpsLatestTable = { schema: 'public', name: 'dlob_depth_bps_latest' }; const dlobSlippageLatestTable = { schema: 'public', name: 'dlob_slippage_latest' }; + const dlobSlippageLatestV2Table = { schema: 'public', name: 'dlob_slippage_latest_v2' }; + const candlesCacheTable = { schema: 'public', name: 'drift_candles_cache' }; const dlobStatsTsTable = { schema: 'public', name: 'dlob_stats_ts' }; const dlobDepthBpsTsTable = { schema: 'public', name: 'dlob_depth_bps_ts' }; const dlobSlippageTsTable = { schema: 'public', name: 'dlob_slippage_ts' }; + const dlobSlippageTsV2Table = { schema: 'public', name: 'dlob_slippage_ts_v2' }; const baseCandlesFn = { schema: 'public', name: 'get_drift_candles' }; const candlesReturnTable = { schema: 'public', name: 'drift_candles' }; @@ -169,6 +172,21 @@ async function main() { await ensureTickTable(t); } + // Cached candles table (precomputed by worker; public read). + await ensurePublicSelectTable(candlesCacheTable, [ + 'bucket', + 'bucket_seconds', + 'symbol', + 'source', + 'open', + 'high', + 'low', + 'close', + 'oracle_close', + 'ticks', + 'updated_at', + ]); + const ensureDlobTable = async (table, columns) => { await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } }); await metadata({ type: 'pg_track_table', args: { source, table } }); @@ -321,6 +339,28 @@ async function main() { 'updated_at', ]); + await ensurePublicSelectTable(dlobSlippageLatestV2Table, [ + 'market_name', + 'side', + 'size_usd', + 'market_type', + 'market_index', + 'ts', + 'slot', + 'mid_price', + 'best_bid_price', + 'best_ask_price', + 'vwap_price', + 'worst_price', + 'filled_usd', + 'filled_base', + 'impact_bps', + 'levels_consumed', + 'fill_pct', + 'raw', + 'updated_at', + ]); + await ensurePublicSelectTable(dlobStatsTsTable, [ 'ts', 'id', @@ -386,6 +426,27 @@ async function main() { 'raw', ]); + await ensurePublicSelectTable(dlobSlippageTsV2Table, [ + 'ts', + 'id', + 'market_name', + 'side', + 'size_usd', + 'market_type', + 'market_index', + 'source_ts', + 'slot', + 'mid_price', + 'vwap_price', + 'worst_price', + 'filled_usd', + 'filled_base', + 'impact_bps', + 'levels_consumed', + 'fill_pct', + 'raw', + ]); + // Return table type for candle functions (needed for Hasura to track the function). await metadataIgnore({ type: 'pg_track_table', args: { source, table: candlesReturnTable } }); diff --git a/kustomize/base/initdb/001_init.sql b/kustomize/base/initdb/001_init.sql index be42a45..7f601b1 100644 --- a/kustomize/base/initdb/001_init.sql +++ b/kustomize/base/initdb/001_init.sql @@ -125,6 +125,29 @@ CREATE TABLE IF NOT EXISTS public.drift_candles ( ticks bigint ); +-- Precomputed candle cache (materialized by a worker). +-- Purpose: make tf switching instant by reading ready-made candles instead of aggregating `drift_ticks` on demand. +-- NOTE: `source=''` means "any source" (no source filter). +CREATE TABLE IF NOT EXISTS public.drift_candles_cache ( + bucket timestamptz NOT NULL, + bucket_seconds integer NOT NULL, + symbol text NOT NULL, + source text NOT NULL DEFAULT '', + open numeric NOT NULL, + high numeric NOT NULL, + low numeric NOT NULL, + close numeric NOT NULL, + oracle_close numeric, + ticks bigint NOT NULL DEFAULT 0, + updated_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (bucket, bucket_seconds, symbol, source) +); + +SELECT create_hypertable('drift_candles_cache', 'bucket', if_not_exists => TRUE, migrate_data => TRUE); + +CREATE INDEX IF NOT EXISTS drift_candles_cache_symbol_source_bucket_idx + ON public.drift_candles_cache (symbol, source, bucket_seconds, bucket DESC); + -- If an older version of the function exists with an incompatible return type, -- CREATE OR REPLACE will fail. Drop the old signature first (safe/idempotent). DROP FUNCTION IF EXISTS public.get_drift_candles(text, integer, integer, text); @@ -272,6 +295,38 @@ CREATE INDEX IF NOT EXISTS dlob_slippage_latest_updated_at_idx CREATE INDEX IF NOT EXISTS dlob_slippage_latest_market_name_idx ON public.dlob_slippage_latest (market_name); +-- Slippage v2: supports fractional order sizes (e.g. 0.1/0.2/0.5 USD), per market and side. +-- Keep v1 intact for backward compatibility and to avoid data loss. +CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest_v2 ( + market_name TEXT NOT NULL, + side TEXT NOT NULL, -- buy|sell + size_usd NUMERIC NOT NULL, + market_type TEXT NOT NULL DEFAULT 'perp', + market_index INTEGER, + ts BIGINT, + slot BIGINT, + mid_price NUMERIC, + best_bid_price NUMERIC, + best_ask_price NUMERIC, + vwap_price NUMERIC, + worst_price NUMERIC, + filled_usd NUMERIC, + filled_base NUMERIC, + impact_bps NUMERIC, + levels_consumed INTEGER, + fill_pct NUMERIC, + raw JSONB, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (market_name, side, size_usd), + CONSTRAINT dlob_slippage_latest_v2_side_chk CHECK (side IN ('buy', 'sell')) +); + +CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_updated_at_idx + ON public.dlob_slippage_latest_v2 (updated_at DESC); + +CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_market_name_idx + ON public.dlob_slippage_latest_v2 (market_name); + -- Time-series tables for UI history (start: 7 days). -- Keep these append-only; use Timescale hypertables. @@ -358,6 +413,33 @@ SELECT create_hypertable('dlob_slippage_ts', 'ts', if_not_exists => TRUE, migrat CREATE INDEX IF NOT EXISTS dlob_slippage_ts_market_ts_desc_idx ON public.dlob_slippage_ts (market_name, ts DESC); +CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts_v2 ( + ts TIMESTAMPTZ NOT NULL, + id BIGSERIAL NOT NULL, + market_name TEXT NOT NULL, + side TEXT NOT NULL, + size_usd NUMERIC NOT NULL, + market_type TEXT NOT NULL DEFAULT 'perp', + market_index INTEGER, + source_ts BIGINT, + slot BIGINT, + mid_price NUMERIC, + vwap_price NUMERIC, + worst_price NUMERIC, + filled_usd NUMERIC, + filled_base NUMERIC, + impact_bps NUMERIC, + levels_consumed INTEGER, + fill_pct NUMERIC, + raw JSONB, + PRIMARY KEY (ts, id) +); + +SELECT create_hypertable('dlob_slippage_ts_v2', 'ts', if_not_exists => TRUE, migrate_data => TRUE); + +CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_market_ts_desc_idx + ON public.dlob_slippage_ts_v2 (market_name, ts DESC); + -- Retention policies (best-effort; safe if Timescale is present). DO $$ BEGIN @@ -375,3 +457,8 @@ BEGIN PERFORM add_retention_policy('dlob_slippage_ts', INTERVAL '7 days'); EXCEPTION WHEN OTHERS THEN END $$; +DO $$ +BEGIN + PERFORM add_retention_policy('dlob_slippage_ts_v2', INTERVAL '7 days'); +EXCEPTION WHEN OTHERS THEN +END $$; diff --git a/kustomize/base/kustomization.yaml b/kustomize/base/kustomization.yaml index 154d489..35586f0 100644 --- a/kustomize/base/kustomization.yaml +++ b/kustomize/base/kustomization.yaml @@ -22,6 +22,7 @@ resources: - dlob-depth-worker/deployment.yaml - dlob-slippage-worker/deployment.yaml - dlob-ts-archiver/deployment.yaml + - candles-cache-worker/deployment.yaml configMapGenerator: - name: postgres-initdb @@ -42,9 +43,15 @@ configMapGenerator: - name: dlob-ts-archiver-script files: - dlob-ts-archiver/worker.mjs + - name: candles-cache-worker-script + files: + - candles-cache-worker/worker.mjs - name: trade-api-wrapper files: - api/wrapper.mjs + - name: trade-api-upstream + files: + - api/server.mjs generatorOptions: disableNameSuffixHash: true