diff --git a/kustomize/base/dlob-ts-archiver/deployment.yaml b/kustomize/base/dlob-ts-archiver/deployment.yaml new file mode 100644 index 0000000..7a74a67 --- /dev/null +++ b/kustomize/base/dlob-ts-archiver/deployment.yaml @@ -0,0 +1,42 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dlob-ts-archiver + annotations: + argocd.argoproj.io/sync-wave: "6" +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: dlob-ts-archiver + template: + metadata: + labels: + app.kubernetes.io/name: dlob-ts-archiver + spec: + containers: + - name: worker + image: node:20-slim + imagePullPolicy: IfNotPresent + env: + - name: HASURA_GRAPHQL_URL + value: http://hasura:8080/v1/graphql + - name: HASURA_ADMIN_SECRET + valueFrom: + secretKeyRef: + name: trade-hasura + key: HASURA_GRAPHQL_ADMIN_SECRET + - name: DLOB_MARKETS + value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP + - name: DLOB_TS_POLL_MS + value: "1000" + command: ["node", "/app/worker.mjs"] + volumeMounts: + - name: script + mountPath: /app/worker.mjs + subPath: worker.mjs + readOnly: true + volumes: + - name: script + configMap: + name: dlob-ts-archiver-script diff --git a/kustomize/base/dlob-ts-archiver/worker.mjs b/kustomize/base/dlob-ts-archiver/worker.mjs new file mode 100644 index 0000000..498a16b --- /dev/null +++ b/kustomize/base/dlob-ts-archiver/worker.mjs @@ -0,0 +1,197 @@ +import process from 'node:process'; +import { setTimeout as sleep } from 'node:timers/promises'; + +function getIsoNow() { + return new Date().toISOString(); +} + +function clampInt(value, min, max, fallback) { + const n = Number.parseInt(String(value ?? ''), 10); + if (!Number.isInteger(n)) return fallback; + return Math.min(max, Math.max(min, n)); +} + +function envList(name, fallbackCsv) { + const raw = process.env[name] ?? fallbackCsv; + return String(raw) + .split(',') + .map((s) => s.trim()) + .filter(Boolean); +} + +function resolveConfig() { + const hasuraUrl = String(process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql').trim(); + const hasuraAdminSecret = String(process.env.HASURA_ADMIN_SECRET || '').trim(); + if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET'); + + const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'); + const pollMs = clampInt(process.env.DLOB_TS_POLL_MS, 250, 60_000, 1000); + + return { hasuraUrl, hasuraAdminSecret, markets, pollMs }; +} + +async function graphqlRequest(cfg, query, variables) { + const res = await fetch(cfg.hasuraUrl, { + method: 'POST', + headers: { + 'content-type': 'application/json', + 'x-hasura-admin-secret': cfg.hasuraAdminSecret, + }, + body: JSON.stringify({ query, variables }), + signal: AbortSignal.timeout(15_000), + }); + const text = await res.text(); + if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`); + const json = JSON.parse(text); + if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | ')); + return json.data; +} + +function mapBigint(v) { + if (v == null) return null; + if (typeof v === 'number') return Number.isFinite(v) ? String(Math.trunc(v)) : null; + if (typeof v === 'string') return v.trim() || null; + return null; +} + +async function main() { + const cfg = resolveConfig(); + + console.log( + JSON.stringify( + { + service: 'dlob-ts-archiver', + startedAt: getIsoNow(), + hasuraUrl: cfg.hasuraUrl, + markets: cfg.markets, + pollMs: cfg.pollMs, + }, + null, + 2 + ) + ); + + while (true) { + const now = getIsoNow(); + + try { + const query = ` + query Latest($markets: [String!]!) { + dlob_stats_latest(where: { market_name: { _in: $markets } }) { + market_name market_type market_index ts slot + mark_price oracle_price best_bid_price best_ask_price mid_price + spread_abs spread_bps depth_levels depth_bid_base depth_ask_base depth_bid_usd depth_ask_usd imbalance + raw + } + dlob_depth_bps_latest(where: { market_name: { _in: $markets } }) { + market_name band_bps market_type market_index ts slot + mid_price best_bid_price best_ask_price bid_base ask_base bid_usd ask_usd imbalance + raw + } + dlob_slippage_latest(where: { market_name: { _in: $markets } }) { + market_name side size_usd market_type market_index ts slot + mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct + raw + } + } + `; + + const data = await graphqlRequest(cfg, query, { markets: cfg.markets }); + + const statsRows = (data?.dlob_stats_latest || []).map((r) => ({ + ts: now, + market_name: r.market_name, + market_type: r.market_type, + market_index: r.market_index ?? null, + source_ts: mapBigint(r.ts), + slot: mapBigint(r.slot), + mark_price: r.mark_price ?? null, + oracle_price: r.oracle_price ?? null, + best_bid_price: r.best_bid_price ?? null, + best_ask_price: r.best_ask_price ?? null, + mid_price: r.mid_price ?? null, + spread_abs: r.spread_abs ?? null, + spread_bps: r.spread_bps ?? null, + depth_levels: r.depth_levels ?? null, + depth_bid_base: r.depth_bid_base ?? null, + depth_ask_base: r.depth_ask_base ?? null, + depth_bid_usd: r.depth_bid_usd ?? null, + depth_ask_usd: r.depth_ask_usd ?? null, + imbalance: r.imbalance ?? null, + raw: r.raw ?? null, + })); + + const depthRows = (data?.dlob_depth_bps_latest || []).map((r) => ({ + ts: now, + market_name: r.market_name, + band_bps: r.band_bps, + market_type: r.market_type, + market_index: r.market_index ?? null, + source_ts: mapBigint(r.ts), + slot: mapBigint(r.slot), + mid_price: r.mid_price ?? null, + best_bid_price: r.best_bid_price ?? null, + best_ask_price: r.best_ask_price ?? null, + bid_base: r.bid_base ?? null, + ask_base: r.ask_base ?? null, + bid_usd: r.bid_usd ?? null, + ask_usd: r.ask_usd ?? null, + imbalance: r.imbalance ?? null, + raw: r.raw ?? null, + })); + + const slippageRows = (data?.dlob_slippage_latest || []).map((r) => ({ + ts: now, + market_name: r.market_name, + side: r.side, + size_usd: r.size_usd, + market_type: r.market_type, + market_index: r.market_index ?? null, + source_ts: mapBigint(r.ts), + slot: mapBigint(r.slot), + mid_price: r.mid_price ?? null, + vwap_price: r.vwap_price ?? null, + worst_price: r.worst_price ?? null, + filled_usd: r.filled_usd ?? null, + filled_base: r.filled_base ?? null, + impact_bps: r.impact_bps ?? null, + levels_consumed: r.levels_consumed ?? null, + fill_pct: r.fill_pct ?? null, + raw: r.raw ?? null, + })); + + if (!statsRows.length && !depthRows.length && !slippageRows.length) { + await sleep(cfg.pollMs); + continue; + } + + const mutation = ` + mutation InsertTs( + $stats: [dlob_stats_ts_insert_input!]! + $depth: [dlob_depth_bps_ts_insert_input!]! + $slip: [dlob_slippage_ts_insert_input!]! + ) { + insert_dlob_stats_ts(objects: $stats) { affected_rows } + insert_dlob_depth_bps_ts(objects: $depth) { affected_rows } + insert_dlob_slippage_ts(objects: $slip) { affected_rows } + } + `; + + await graphqlRequest(cfg, mutation, { + stats: statsRows, + depth: depthRows, + slip: slippageRows, + }); + } catch (err) { + console.error(`[dlob-ts-archiver] ${String(err?.message || err)}`); + } + + await sleep(cfg.pollMs); + } +} + +main().catch((err) => { + console.error(String(err?.stack || err)); + process.exitCode = 1; +}); + diff --git a/kustomize/base/hasura/hasura-bootstrap.mjs b/kustomize/base/hasura/hasura-bootstrap.mjs index 3a881b6..c8fd883 100644 --- a/kustomize/base/hasura/hasura-bootstrap.mjs +++ b/kustomize/base/hasura/hasura-bootstrap.mjs @@ -97,6 +97,9 @@ async function main() { const dlobStatsLatestTable = { schema: 'public', name: 'dlob_stats_latest' }; const dlobDepthBpsLatestTable = { schema: 'public', name: 'dlob_depth_bps_latest' }; const dlobSlippageLatestTable = { schema: 'public', name: 'dlob_slippage_latest' }; + const dlobStatsTsTable = { schema: 'public', name: 'dlob_stats_ts' }; + const dlobDepthBpsTsTable = { schema: 'public', name: 'dlob_depth_bps_ts' }; + const dlobSlippageTsTable = { schema: 'public', name: 'dlob_slippage_ts' }; const baseCandlesFn = { schema: 'public', name: 'get_drift_candles' }; const candlesReturnTable = { schema: 'public', name: 'drift_candles' }; @@ -215,6 +218,29 @@ async function main() { }); }; + const ensurePublicSelectTable = async (table, columns) => { + await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } }); + await metadata({ type: 'pg_track_table', args: { source, table } }); + + await metadataIgnore({ type: 'pg_drop_select_permission', args: { source, table, role: 'public' } }); + await metadata({ + type: 'pg_create_select_permission', + args: { + source, + table, + role: 'public', + permission: { + columns, + filter: {}, + }, + }, + }); + + // Computed/archived tables are written by workers using admin secret; keep ingestor off by default. + await metadataIgnore({ type: 'pg_drop_insert_permission', args: { source, table, role: 'ingestor' } }); + await metadataIgnore({ type: 'pg_drop_update_permission', args: { source, table, role: 'ingestor' } }); + }; + await ensureDlobTable(dlobL2LatestTable, [ 'market_name', 'market_type', @@ -254,7 +280,7 @@ async function main() { 'updated_at', ]); - await ensureDlobTable(dlobDepthBpsLatestTable, [ + await ensurePublicSelectTable(dlobDepthBpsLatestTable, [ 'market_name', 'band_bps', 'market_type', @@ -273,7 +299,7 @@ async function main() { 'updated_at', ]); - await ensureDlobTable(dlobSlippageLatestTable, [ + await ensurePublicSelectTable(dlobSlippageLatestTable, [ 'market_name', 'side', 'size_usd', @@ -295,6 +321,71 @@ async function main() { 'updated_at', ]); + await ensurePublicSelectTable(dlobStatsTsTable, [ + 'ts', + 'id', + 'market_name', + 'market_type', + 'market_index', + 'source_ts', + 'slot', + 'mark_price', + 'oracle_price', + 'best_bid_price', + 'best_ask_price', + 'mid_price', + 'spread_abs', + 'spread_bps', + 'depth_levels', + 'depth_bid_base', + 'depth_ask_base', + 'depth_bid_usd', + 'depth_ask_usd', + 'imbalance', + 'raw', + ]); + + await ensurePublicSelectTable(dlobDepthBpsTsTable, [ + 'ts', + 'id', + 'market_name', + 'band_bps', + 'market_type', + 'market_index', + 'source_ts', + 'slot', + 'mid_price', + 'best_bid_price', + 'best_ask_price', + 'bid_base', + 'ask_base', + 'bid_usd', + 'ask_usd', + 'imbalance', + 'raw', + ]); + + await ensurePublicSelectTable(dlobSlippageTsTable, [ + 'ts', + 'id', + 'market_name', + 'side', + 'size_usd', + 'market_type', + 'market_index', + 'source_ts', + 'slot', + 'mid_price', + 'vwap_price', + 'worst_price', + 'filled_usd', + 'filled_base', + 'impact_bps', + 'levels_consumed', + 'fill_pct', + 'raw', + ]); + // Return table type for candle functions (needed for Hasura to track the function). await metadataIgnore({ type: 'pg_track_table', args: { source, table: candlesReturnTable } }); diff --git a/kustomize/base/initdb/001_init.sql b/kustomize/base/initdb/001_init.sql index 3ae848b..be42a45 100644 --- a/kustomize/base/initdb/001_init.sql +++ b/kustomize/base/initdb/001_init.sql @@ -271,3 +271,107 @@ CREATE INDEX IF NOT EXISTS dlob_slippage_latest_updated_at_idx CREATE INDEX IF NOT EXISTS dlob_slippage_latest_market_name_idx ON public.dlob_slippage_latest (market_name); + +-- Time-series tables for UI history (start: 7 days). +-- Keep these append-only; use Timescale hypertables. + +CREATE TABLE IF NOT EXISTS public.dlob_stats_ts ( + ts TIMESTAMPTZ NOT NULL, + id BIGSERIAL NOT NULL, + market_name TEXT NOT NULL, + market_type TEXT NOT NULL DEFAULT 'perp', + market_index INTEGER, + source_ts BIGINT, + slot BIGINT, + mark_price NUMERIC, + oracle_price NUMERIC, + best_bid_price NUMERIC, + best_ask_price NUMERIC, + mid_price NUMERIC, + spread_abs NUMERIC, + spread_bps NUMERIC, + depth_levels INTEGER, + depth_bid_base NUMERIC, + depth_ask_base NUMERIC, + depth_bid_usd NUMERIC, + depth_ask_usd NUMERIC, + imbalance NUMERIC, + raw JSONB, + PRIMARY KEY (ts, id) +); + +SELECT create_hypertable('dlob_stats_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE); + +CREATE INDEX IF NOT EXISTS dlob_stats_ts_market_ts_desc_idx + ON public.dlob_stats_ts (market_name, ts DESC); + +CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_ts ( + ts TIMESTAMPTZ NOT NULL, + id BIGSERIAL NOT NULL, + market_name TEXT NOT NULL, + band_bps INTEGER NOT NULL, + market_type TEXT NOT NULL DEFAULT 'perp', + market_index INTEGER, + source_ts BIGINT, + slot BIGINT, + mid_price NUMERIC, + best_bid_price NUMERIC, + best_ask_price NUMERIC, + bid_base NUMERIC, + ask_base NUMERIC, + bid_usd NUMERIC, + ask_usd NUMERIC, + imbalance NUMERIC, + raw JSONB, + PRIMARY KEY (ts, id) +); + +SELECT create_hypertable('dlob_depth_bps_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE); + +CREATE INDEX IF NOT EXISTS dlob_depth_bps_ts_market_ts_desc_idx + ON public.dlob_depth_bps_ts (market_name, ts DESC); + +CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts ( + ts TIMESTAMPTZ NOT NULL, + id BIGSERIAL NOT NULL, + market_name TEXT NOT NULL, + side TEXT NOT NULL, + size_usd INTEGER NOT NULL, + market_type TEXT NOT NULL DEFAULT 'perp', + market_index INTEGER, + source_ts BIGINT, + slot BIGINT, + mid_price NUMERIC, + vwap_price NUMERIC, + worst_price NUMERIC, + filled_usd NUMERIC, + filled_base NUMERIC, + impact_bps NUMERIC, + levels_consumed INTEGER, + fill_pct NUMERIC, + raw JSONB, + PRIMARY KEY (ts, id) +); + +SELECT create_hypertable('dlob_slippage_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE); + +CREATE INDEX IF NOT EXISTS dlob_slippage_ts_market_ts_desc_idx + ON public.dlob_slippage_ts (market_name, ts DESC); + +-- Retention policies (best-effort; safe if Timescale is present). +DO $$ +BEGIN + PERFORM add_retention_policy('dlob_stats_ts', INTERVAL '7 days'); +EXCEPTION WHEN OTHERS THEN + -- ignore if policy exists or function unavailable +END $$; +DO $$ +BEGIN + PERFORM add_retention_policy('dlob_depth_bps_ts', INTERVAL '7 days'); +EXCEPTION WHEN OTHERS THEN +END $$; +DO $$ +BEGIN + PERFORM add_retention_policy('dlob_slippage_ts', INTERVAL '7 days'); +EXCEPTION WHEN OTHERS THEN +END $$; diff --git a/kustomize/base/kustomization.yaml b/kustomize/base/kustomization.yaml index 20b6d10..1d6127d 100644 --- a/kustomize/base/kustomization.yaml +++ b/kustomize/base/kustomization.yaml @@ -21,6 +21,7 @@ resources: - dlob-worker/deployment.yaml - dlob-depth-worker/deployment.yaml - dlob-slippage-worker/deployment.yaml + - dlob-ts-archiver/deployment.yaml configMapGenerator: - name: postgres-initdb @@ -38,6 +39,9 @@ configMapGenerator: - name: dlob-slippage-worker-script files: - dlob-slippage-worker/worker.mjs + - name: dlob-ts-archiver-script + files: + - dlob-ts-archiver/worker.mjs generatorOptions: disableNameSuffixHash: true