Compare commits

..

3 Commits

Author SHA1 Message Date
codex
b46d7d85c6 chore(staging): add candles-cache-worker deployment 2026-02-01 18:13:24 +01:00
codex
e5543f408a feat(staging): add candles-cache-worker and api override 2026-02-01 18:12:26 +01:00
codex
b239f564b2 feat(staging): add candles cache + v2 slippage 2026-02-01 18:12:15 +01:00
10 changed files with 2579 additions and 294 deletions

View File

@@ -18,6 +18,9 @@ spec:
- name: trade-api-wrapper - name: trade-api-wrapper
configMap: configMap:
name: trade-api-wrapper name: trade-api-wrapper
- name: trade-api-upstream
configMap:
name: trade-api-upstream
containers: containers:
- name: api - name: api
image: gitea.mpabi.pl/trade/trade-api:k3s-20260111095435 image: gitea.mpabi.pl/trade/trade-api:k3s-20260111095435
@@ -31,6 +34,10 @@ spec:
mountPath: /override/wrapper.mjs mountPath: /override/wrapper.mjs
subPath: wrapper.mjs subPath: wrapper.mjs
readOnly: true readOnly: true
- name: trade-api-upstream
mountPath: /app/services/api/server.mjs
subPath: server.mjs
readOnly: true
env: env:
- name: PORT - name: PORT
value: "8787" value: "8787"

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,48 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: candles-cache-worker
annotations:
argocd.argoproj.io/sync-wave: "6"
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: candles-cache-worker
template:
metadata:
labels:
app.kubernetes.io/name: candles-cache-worker
spec:
containers:
- name: worker
image: node:20-slim
imagePullPolicy: IfNotPresent
env:
- name: HASURA_GRAPHQL_URL
value: http://hasura:8080/v1/graphql
- name: HASURA_ADMIN_SECRET
valueFrom:
secretKeyRef:
name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET
- name: CANDLES_SYMBOLS
value: SOL-PERP,PUMP-PERP
- name: CANDLES_SOURCES
value: any
- name: CANDLES_TFS
value: 1s,3s,5s,15s,30s,1m,3m,5m,15m,30m,1h,4h,12h,1d
- name: CANDLES_TARGET_POINTS
value: "1024"
- name: CANDLES_POLL_MS
value: "5000"
command: ["node", "/app/worker.mjs"]
volumeMounts:
- name: script
mountPath: /app/worker.mjs
subPath: worker.mjs
readOnly: true
volumes:
- name: script
configMap:
name: candles-cache-worker-script

View File

@@ -0,0 +1,344 @@
import fs from 'node:fs';
import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';
function readJsonFile(filePath) {
try {
const raw = fs.readFileSync(filePath, 'utf8');
return JSON.parse(raw);
} catch {
return undefined;
}
}
function getIsoNow() {
return new Date().toISOString();
}
function clampInt(value, min, max, fallback) {
const n = Number.parseInt(String(value ?? ''), 10);
if (!Number.isInteger(n)) return fallback;
return Math.min(max, Math.max(min, n));
}
function envList(name, fallbackCsv) {
const raw = process.env[name] ?? fallbackCsv;
return String(raw)
.split(',')
.map((s) => s.trim())
.filter(Boolean);
}
function parseTimeframeToSeconds(tf) {
const v = String(tf || '').trim().toLowerCase();
if (!v) return 60;
const m = v.match(/^(\d+)(s|m|h|d)$/);
if (!m) throw new Error('invalid_tf');
const num = Number.parseInt(m[1], 10);
if (!Number.isInteger(num) || num <= 0) throw new Error('invalid_tf');
const unit = m[2];
const mult = unit === 's' ? 1 : unit === 'm' ? 60 : unit === 'h' ? 3600 : 86400;
return num * mult;
}
function sqlLit(value) {
const s = String(value ?? '');
return `'${s.replace(/'/g, "''")}'`;
}
function resolveHasuraBaseUrl(graphqlUrlOrBase) {
const u = String(graphqlUrlOrBase || '').trim();
if (!u) return 'http://hasura:8080';
// common case: http://hasura:8080/v1/graphql
if (u.endsWith('/v1/graphql')) return u.slice(0, -'/v1/graphql'.length);
return u.replace(/\/$/, '');
}
function resolveConfig() {
const tokensPath =
process.env.HASURA_TOKENS_FILE ||
process.env.TOKENS_FILE ||
process.env.HASURA_CONFIG_FILE ||
'/app/tokens/hasura.json';
const tokens = readJsonFile(tokensPath) || {};
const graphqlUrl =
process.env.HASURA_GRAPHQL_URL ||
tokens.graphqlUrl ||
tokens.apiUrl ||
'http://hasura:8080/v1/graphql';
const hasuraBaseUrl = resolveHasuraBaseUrl(graphqlUrl);
const hasuraAdminSecret =
process.env.HASURA_ADMIN_SECRET ||
process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
tokens.adminSecret ||
tokens.hasuraAdminSecret;
if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET (required for /v2/query run_sql)');
const symbols = envList('CANDLES_SYMBOLS', 'SOL-PERP,PUMP-PERP');
// sources: include "any" (sourceKey='') by default.
const rawSources = envList('CANDLES_SOURCES', 'any');
const sources = [];
for (const s of rawSources) {
const v = String(s).trim();
if (!v) continue;
if (v.toLowerCase() === 'any') sources.push('');
else sources.push(v);
}
if (!sources.includes('')) sources.unshift('');
const tfList = envList('CANDLES_TFS', '1s,3s,5s,15s,30s,1m,3m,5m,15m,30m,1h,4h,12h,1d');
const bucketSecondsList = tfList
.map((t) => {
try {
return parseTimeframeToSeconds(t);
} catch {
return null;
}
})
.filter((n) => n != null)
.map((n) => n)
.filter((n, idx, arr) => arr.findIndex((x) => x === n) === idx)
.sort((a, b) => a - b);
const targetPoints = clampInt(process.env.CANDLES_TARGET_POINTS, 10, 100_000, 1024);
// legacy: kept for compatibility; if set, used as a minimum warmup window (days).
const backfillDays = clampInt(process.env.CANDLES_BACKFILL_DAYS, 0, 3650, 0);
const pollMs = clampInt(process.env.CANDLES_POLL_MS, 250, 60_000, 5000);
return { hasuraBaseUrl, hasuraAdminSecret, symbols, sources, bucketSecondsList, targetPoints, backfillDays, pollMs };
}
async function hasuraRunSql(cfg, sql, { readOnly } = { readOnly: false }) {
const url = `${cfg.hasuraBaseUrl}/v2/query`;
const body = {
type: 'run_sql',
args: {
source: 'default',
sql,
read_only: Boolean(readOnly),
},
};
const res = await fetch(url, {
method: 'POST',
headers: { 'content-type': 'application/json', 'x-hasura-admin-secret': cfg.hasuraAdminSecret },
body: JSON.stringify(body),
signal: AbortSignal.timeout(60_000),
});
const text = await res.text();
if (!res.ok) throw new Error(`Hasura run_sql HTTP ${res.status}: ${text}`);
return JSON.parse(text);
}
function chunkSecondsForBucket(bucketSeconds) {
if (bucketSeconds <= 5) return 15 * 60;
if (bucketSeconds <= 60) return 60 * 60;
if (bucketSeconds <= 300) return 6 * 60 * 60;
if (bucketSeconds <= 3600) return 24 * 60 * 60;
return 7 * 24 * 60 * 60;
}
function sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso, toIso }) {
return `
WITH base AS (
SELECT
time_bucket(make_interval(secs => ${bucketSeconds}), ts) AS bucket,
ts,
COALESCE(mark_price, oracle_price) AS px,
oracle_price AS oracle_px
FROM public.drift_ticks
WHERE symbol = ${sqlLit(symbol)}
AND ts >= ${sqlLit(fromIso)}::timestamptz
AND ts < ${sqlLit(toIso)}::timestamptz
AND (${sqlLit(sourceKey)} = '' OR source = ${sqlLit(sourceKey)})
),
agg AS (
SELECT
bucket,
(array_agg(px ORDER BY ts ASC))[1] AS open,
max(px) AS high,
min(px) AS low,
(array_agg(px ORDER BY ts DESC))[1] AS close,
(array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close,
count(*)::bigint AS ticks
FROM base
GROUP BY bucket
)
INSERT INTO public.drift_candles_cache
(bucket, bucket_seconds, symbol, source, open, high, low, close, oracle_close, ticks, updated_at)
SELECT
bucket, ${bucketSeconds}, ${sqlLit(symbol)}, ${sqlLit(sourceKey)}, open, high, low, close, oracle_close, ticks, now()
FROM agg
ON CONFLICT (bucket, bucket_seconds, symbol, source) DO UPDATE SET
open = EXCLUDED.open,
high = EXCLUDED.high,
low = EXCLUDED.low,
close = EXCLUDED.close,
oracle_close = EXCLUDED.oracle_close,
ticks = EXCLUDED.ticks,
updated_at = now();
`;
}
async function getTickRange(cfg, { symbol, sourceKey }) {
const sql = `
SELECT min(ts) AS min_ts, max(ts) AS max_ts
FROM public.drift_ticks
WHERE symbol=${sqlLit(symbol)}
AND (${sqlLit(sourceKey)} = '' OR source = ${sqlLit(sourceKey)});
`;
const out = await hasuraRunSql(cfg, sql, { readOnly: true });
const row = Array.isArray(out?.result) && out.result.length >= 2 ? out.result[1] : null;
if (!row) return { minTs: null, maxTs: null };
const minTs = row[0] ? String(row[0]).trim() : null;
const maxTs = row[1] ? String(row[1]).trim() : null;
return { minTs: minTs && minTs.length ? minTs : null, maxTs: maxTs && maxTs.length ? maxTs : null };
}
function desiredFromIso({ minTsIso, maxTsIso, bucketSeconds, targetPoints, backfillDays }) {
const endMs = Date.parse(maxTsIso);
const minMs = minTsIso ? Date.parse(minTsIso) : null;
const wantSpanMs = targetPoints * bucketSeconds * 1000;
const wantFromMs = endMs - wantSpanMs;
const minFromMs = backfillDays > 0 ? endMs - backfillDays * 24 * 60 * 60 * 1000 : null;
let fromMs = wantFromMs;
if (minFromMs != null) fromMs = Math.min(fromMs, minFromMs);
if (minMs != null) fromMs = Math.max(fromMs, minMs);
return new Date(Math.max(0, fromMs)).toISOString();
}
function safetyWindowSeconds(bucketSeconds) {
if (bucketSeconds <= 60) return 10 * 60;
if (bucketSeconds <= 300) return 60 * 60;
if (bucketSeconds <= 3600) return 6 * 60 * 60;
if (bucketSeconds <= 14_400) return 24 * 60 * 60;
return 2 * 24 * 60 * 60;
}
async function backfill(cfg, { symbol, sourceKey, fromIso, toIso }) {
for (const bs of cfg.bucketSecondsList) {
const chunk = chunkSecondsForBucket(bs);
for (let t = Date.parse(fromIso); t < Date.parse(toIso); t += chunk * 1000) {
const a = new Date(t).toISOString();
const b = new Date(Math.min(Date.parse(toIso), t + chunk * 1000)).toISOString();
await hasuraRunSql(cfg, sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds: bs, fromIso: a, toIso: b }));
}
}
}
async function getMaxBucket(cfg, { symbol, sourceKey, bucketSeconds }) {
const sql = `
SELECT max(bucket) AS max_bucket
FROM public.drift_candles_cache
WHERE symbol=${sqlLit(symbol)} AND source=${sqlLit(sourceKey)} AND bucket_seconds=${bucketSeconds};
`;
const out = await hasuraRunSql(cfg, sql, { readOnly: true });
// Hasura returns {result_type, result:[[col...],[row...]]}
const row = Array.isArray(out?.result) && out.result.length >= 2 ? out.result[1] : null;
const v = row && row[0] ? String(row[0]) : null;
return v && v.trim() ? v.trim() : null;
}
async function main() {
const cfg = resolveConfig();
console.log(
JSON.stringify(
{
service: 'candles-cache-worker',
startedAt: getIsoNow(),
hasuraBaseUrl: cfg.hasuraBaseUrl,
symbols: cfg.symbols,
sources: cfg.sources.map((s) => (s ? s : '(any)')),
bucketSecondsList: cfg.bucketSecondsList,
targetPoints: cfg.targetPoints,
backfillDays: cfg.backfillDays,
pollMs: cfg.pollMs,
},
null,
2
)
);
// Backfill to warm cache: for each timeframe keep ~targetPoints candles (or "as much as we have").
for (const symbol of cfg.symbols) {
for (const sourceKey of cfg.sources) {
const range = await getTickRange(cfg, { symbol, sourceKey });
if (!range.maxTs) continue;
const toIso = new Date(Date.parse(range.maxTs)).toISOString();
const maxBs = cfg.bucketSecondsList[cfg.bucketSecondsList.length - 1] || 60;
const fromIso = desiredFromIso({
minTsIso: range.minTs,
maxTsIso: toIso,
bucketSeconds: maxBs,
targetPoints: cfg.targetPoints,
backfillDays: cfg.backfillDays,
});
console.log(
`[candles-cache-worker] warmup symbol=${symbol} source=${sourceKey || '(any)'} from=${fromIso} to=${toIso} points=${cfg.targetPoints}`
);
try {
await backfill(cfg, { symbol, sourceKey, fromIso, toIso });
} catch (err) {
console.error(`[candles-cache-worker] warmup failed (${symbol}/${sourceKey || 'any'}): ${String(err?.message || err)}`);
}
}
}
// Prime last buckets.
const last = new Map(); // key -> iso bucket
for (const symbol of cfg.symbols) {
for (const sourceKey of cfg.sources) {
for (const bs of cfg.bucketSecondsList) {
const k = `${symbol}::${sourceKey}::${bs}`;
try {
const maxBucket = await getMaxBucket(cfg, { symbol, sourceKey, bucketSeconds: bs });
if (maxBucket) last.set(k, maxBucket);
} catch {
// ignore
}
}
}
}
while (true) {
const loopNow = Date.now();
const loopIso = new Date(loopNow).toISOString();
for (const symbol of cfg.symbols) {
for (const sourceKey of cfg.sources) {
for (const bs of cfg.bucketSecondsList) {
const k = `${symbol}::${sourceKey}::${bs}`;
const prev = last.get(k);
const safety = safetyWindowSeconds(bs);
const fromMs = prev ? Date.parse(prev) - safety * 1000 : loopNow - safety * 1000;
const fromIso2 = new Date(Math.max(0, fromMs)).toISOString();
try {
await hasuraRunSql(
cfg,
sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds: bs, fromIso: fromIso2, toIso: loopIso })
);
// best-effort: move last pointer close to now (actual max will lag by at most one bucket)
last.set(k, loopIso);
} catch (err) {
console.error(
`[candles-cache-worker] update failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}`
);
}
}
}
}
await sleep(cfg.pollMs);
}
}
main().catch((err) => {
console.error(String(err?.stack || err));
process.exitCode = 1;
});

View File

@@ -31,7 +31,7 @@ spec:
- name: DLOB_POLL_MS - name: DLOB_POLL_MS
value: "1000" value: "1000"
- name: DLOB_SLIPPAGE_SIZES_USD - name: DLOB_SLIPPAGE_SIZES_USD
value: "10,25,50,100,250,500,1000,5000,10000,50000" value: "0.1,0.2,0.5,1,2,5,10,25,50,100,250,500,1000,5000,10000,50000"
- name: PRICE_PRECISION - name: PRICE_PRECISION
value: "1000000" value: "1000000"
- name: BASE_PRECISION - name: BASE_PRECISION

View File

@@ -1,6 +1,16 @@
import fs from 'node:fs';
import process from 'node:process'; import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises'; import { setTimeout as sleep } from 'node:timers/promises';
function readJsonFile(filePath) {
try {
const raw = fs.readFileSync(filePath, 'utf8');
return JSON.parse(raw);
} catch {
return undefined;
}
}
function getIsoNow() { function getIsoNow() {
return new Date().toISOString(); return new Date().toISOString();
} }
@@ -19,14 +29,87 @@ function envList(name, fallbackCsv) {
.filter(Boolean); .filter(Boolean);
} }
function envIntList(name, fallbackCsv) { function parsePositiveNumber(value) {
const out = []; const n = Number.parseFloat(String(value ?? '').trim());
for (const item of envList(name, fallbackCsv)) { if (!Number.isFinite(n) || !(n > 0)) return null;
const n = Number.parseInt(item, 10); return n;
if (!Number.isFinite(n)) continue;
out.push(n);
} }
return out.length ? out : envList(name, fallbackCsv).map((v) => Number.parseInt(v, 10)).filter(Number.isFinite);
function resolveConfig() {
const tokensPath =
process.env.HASURA_TOKENS_FILE ||
process.env.TOKENS_FILE ||
process.env.HASURA_CONFIG_FILE ||
'/app/tokens/hasura.json';
const tokens = readJsonFile(tokensPath) || {};
const hasuraUrl =
process.env.HASURA_GRAPHQL_URL ||
tokens.graphqlUrl ||
tokens.apiUrl ||
'http://hasura:8080/v1/graphql';
const hasuraAdminSecret =
process.env.HASURA_ADMIN_SECRET ||
process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
tokens.adminSecret ||
tokens.hasuraAdminSecret;
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
const sizesUsd = envList('DLOB_SLIPPAGE_SIZES_USD', '10,25,50,100,250,500,1000')
.map(parsePositiveNumber)
.filter((n) => n != null)
.map((n) => n)
.filter((n, idx, arr) => arr.findIndex((x) => x === n) === idx)
.sort((a, b) => a - b);
const sizesUsdInt = sizesUsd.filter((n) => Number.isInteger(n));
const depthLevels = clampInt(process.env.DLOB_DEPTH, 1, 50, 25);
const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
if (!Number.isFinite(basePrecision) || basePrecision <= 0) throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
return {
hasuraUrl,
hasuraAdminSecret,
hasuraAuthToken,
markets,
pollMs,
sizesUsd,
sizesUsdInt,
depthLevels,
pricePrecision,
basePrecision,
};
}
async function graphqlRequest(cfg, query, variables) {
const headers = { 'content-type': 'application/json' };
if (cfg.hasuraAuthToken) {
headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
} else if (cfg.hasuraAdminSecret) {
headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
} else {
throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)');
}
const res = await fetch(cfg.hasuraUrl, {
method: 'POST',
headers,
body: JSON.stringify({ query, variables }),
signal: AbortSignal.timeout(15_000),
});
const text = await res.text();
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
const json = JSON.parse(text);
if (json.errors?.length) {
throw new Error(json.errors.map((e) => e.message).join(' | '));
}
return json.data;
} }
function toNumberOrNull(value) { function toNumberOrNull(value) {
@@ -41,167 +124,107 @@ function toNumberOrNull(value) {
return null; return null;
} }
function numStr(value) { function normalizeLevels(raw) {
if (value == null) return null; if (raw == null) return [];
if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null; if (Array.isArray(raw)) return raw;
if (typeof value === 'string') return value.trim() || null; if (typeof raw === 'string') {
return null; const s = raw.trim();
} if (!s) return [];
function jsonNormalize(value) {
if (typeof value !== 'string') return value;
const s = value.trim();
if (!s) return null;
try { try {
return JSON.parse(s); const v = JSON.parse(s);
return Array.isArray(v) ? v : [];
} catch { } catch {
return value; return [];
} }
} }
return [];
function resolveConfig() {
const hasuraUrl = process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql';
const hasuraAdminSecret = process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || undefined;
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
const sizesUsd = envIntList('DLOB_SLIPPAGE_SIZES_USD', '100,500,1000,5000,10000,50000');
const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
if (!Number.isFinite(pricePrecision) || pricePrecision <= 0)
throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
if (!Number.isFinite(basePrecision) || basePrecision <= 0)
throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
return {
hasuraUrl,
hasuraAdminSecret,
hasuraAuthToken,
markets,
pollMs,
sizesUsd,
pricePrecision,
basePrecision,
};
} }
async function graphqlRequest(cfg, query, variables) { function parseScaledLevels(raw, pricePrecision, basePrecision) {
const headers = { 'content-type': 'application/json' }; const levels = normalizeLevels(raw);
if (cfg.hasuraAuthToken) {
headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
} else if (cfg.hasuraAdminSecret) {
headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
} else {
throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET)');
}
const res = await fetch(cfg.hasuraUrl, {
method: 'POST',
headers,
body: JSON.stringify({ query, variables }),
signal: AbortSignal.timeout(10_000),
});
const text = await res.text();
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
const json = JSON.parse(text);
if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | '));
return json.data;
}
function parseLevels(raw, pricePrecision, basePrecision, side) {
const v = jsonNormalize(raw);
if (!Array.isArray(v)) return [];
const out = []; const out = [];
for (const item of v) { for (const it of levels) {
const priceInt = toNumberOrNull(item?.price); const priceInt = toNumberOrNull(it?.price);
const sizeInt = toNumberOrNull(item?.size); const sizeInt = toNumberOrNull(it?.size);
if (priceInt == null || sizeInt == null) continue; if (priceInt == null || sizeInt == null) continue;
const price = priceInt / pricePrecision; const price = priceInt / pricePrecision;
const size = sizeInt / basePrecision; const base = sizeInt / basePrecision;
if (!Number.isFinite(price) || !Number.isFinite(size)) continue; if (!Number.isFinite(price) || !Number.isFinite(base)) continue;
out.push({ price, size }); out.push({ price, base });
} }
if (side === 'bid') out.sort((a, b) => b.price - a.price);
if (side === 'ask') out.sort((a, b) => a.price - b.price);
return out; return out;
} }
function computeMid(bestBid, bestAsk, markPrice, oraclePrice) { function simulateFill(levels, sizeUsd) {
if (bestBid != null && bestAsk != null) return (bestBid + bestAsk) / 2;
if (markPrice != null) return markPrice;
if (oraclePrice != null) return oraclePrice;
return null;
}
function simulateBuy(asks, mid, sizeUsd) {
let remainingUsd = sizeUsd; let remainingUsd = sizeUsd;
let filledUsd = 0; let filledUsd = 0;
let filledBase = 0; let filledBase = 0;
let totalQuoteUsd = 0;
let worstPrice = null; let worstPrice = null;
let levelsConsumed = 0; let levelsConsumed = 0;
for (const lvl of asks) { for (const l of levels) {
if (!(remainingUsd > 0)) break; if (remainingUsd <= 0) break;
if (!(lvl.price > 0) || !(lvl.size > 0)) continue; const levelUsd = l.base * l.price;
if (levelUsd <= 0) continue;
const maxBase = remainingUsd / lvl.price;
const takeBase = Math.min(lvl.size, maxBase);
if (!(takeBase > 0)) continue;
const cost = takeBase * lvl.price;
filledUsd += cost;
filledBase += takeBase;
remainingUsd -= cost;
worstPrice = lvl.price;
levelsConsumed += 1; levelsConsumed += 1;
} worstPrice = l.price;
const vwap = filledBase > 0 ? filledUsd / filledBase : null; const takeUsd = Math.min(remainingUsd, levelUsd);
const impactBps = vwap != null && mid != null && mid > 0 ? (vwap / mid - 1) * 10_000 : null; const takeBase = takeUsd / l.price;
const fillPct = sizeUsd > 0 ? filledUsd / sizeUsd : null;
return { vwap, worstPrice, filledUsd, filledBase, impactBps, levelsConsumed, fillPct }; remainingUsd -= takeUsd;
} filledUsd += takeUsd;
function simulateSell(bids, mid, sizeUsd) {
if (mid == null || !(mid > 0)) {
return { vwap: null, worstPrice: null, filledUsd: 0, filledBase: 0, impactBps: null, levelsConsumed: 0, fillPct: null };
}
const baseTarget = sizeUsd / mid;
let remainingBase = baseTarget;
let proceedsUsd = 0;
let filledBase = 0;
let worstPrice = null;
let levelsConsumed = 0;
for (const lvl of bids) {
if (!(remainingBase > 0)) break;
if (!(lvl.price > 0) || !(lvl.size > 0)) continue;
const takeBase = Math.min(lvl.size, remainingBase);
if (!(takeBase > 0)) continue;
const proceeds = takeBase * lvl.price;
proceedsUsd += proceeds;
filledBase += takeBase; filledBase += takeBase;
remainingBase -= takeBase; totalQuoteUsd += takeUsd;
worstPrice = lvl.price;
levelsConsumed += 1;
} }
const vwap = filledBase > 0 ? proceedsUsd / filledBase : null; const vwapPrice = filledBase > 0 ? totalQuoteUsd / filledBase : null;
const impactBps = vwap != null && mid > 0 ? (1 - vwap / mid) * 10_000 : null; const fillPct = sizeUsd > 0 ? (filledUsd / sizeUsd) * 100 : null;
const fillPct = baseTarget > 0 ? filledBase / baseTarget : null;
return { vwap, worstPrice, filledUsd: proceedsUsd, filledBase, impactBps, levelsConsumed, fillPct }; return {
filledUsd,
filledBase,
vwapPrice,
worstPrice,
levelsConsumed,
fillPct,
};
} }
async function fetchL2Latest(cfg) { function impactBps({ side, mid, vwap }) {
if (mid == null || vwap == null || mid <= 0) return null;
if (side === 'buy') return ((vwap / mid) - 1) * 10_000;
if (side === 'sell') return (1 - (vwap / mid)) * 10_000;
return null;
}
async function main() {
const cfg = resolveConfig();
console.log(
JSON.stringify(
{
service: 'dlob-slippage-worker',
startedAt: getIsoNow(),
hasuraUrl: cfg.hasuraUrl,
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
markets: cfg.markets,
pollMs: cfg.pollMs,
sizesUsd: cfg.sizesUsd,
depthLevels: cfg.depthLevels,
},
null,
2
)
);
const lastSeenUpdatedAt = new Map(); // market -> updated_at
while (true) {
const updatedAt = getIsoNow();
try {
const query = ` const query = `
query DlobL2Latest($markets: [String!]!) { query DlobL2Latest($markets: [String!]!) {
dlob_l2_latest(where: { market_name: { _in: $markets } }) { dlob_l2_latest(where: { market_name: { _in: $markets } }) {
@@ -210,8 +233,6 @@ async function fetchL2Latest(cfg) {
market_index market_index
ts ts
slot slot
mark_price
oracle_price
best_bid_price best_bid_price
best_ask_price best_ask_price
bids bids
@@ -220,14 +241,93 @@ async function fetchL2Latest(cfg) {
} }
} }
`; `;
const data = await graphqlRequest(cfg, query, { markets: cfg.markets }); const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : []; const rows = Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
const objectsV1 = [];
const objectsV2 = [];
for (const row of rows) {
const market = String(row?.market_name || '').trim();
if (!market) continue;
const rowUpdatedAt = row?.updated_at ?? null;
if (rowUpdatedAt && lastSeenUpdatedAt.get(market) === rowUpdatedAt) continue;
if (rowUpdatedAt) lastSeenUpdatedAt.set(market, rowUpdatedAt);
const bestBid = toNumberOrNull(row?.best_bid_price);
const bestAsk = toNumberOrNull(row?.best_ask_price);
if (bestBid == null || bestAsk == null) continue;
const mid = (bestBid + bestAsk) / 2;
if (!Number.isFinite(mid) || mid <= 0) continue;
const bids = parseScaledLevels(row?.bids, cfg.pricePrecision, cfg.basePrecision)
.slice()
.sort((a, b) => b.price - a.price)
.slice(0, cfg.depthLevels);
const asks = parseScaledLevels(row?.asks, cfg.pricePrecision, cfg.basePrecision)
.slice()
.sort((a, b) => a.price - b.price)
.slice(0, cfg.depthLevels);
for (const sizeUsd of cfg.sizesUsd) {
// buy consumes asks (worse prices as you go up)
{
const sim = simulateFill(asks, sizeUsd);
const baseObj = {
market_name: market,
side: 'buy',
market_type: row?.market_type ?? 'perp',
market_index: row?.market_index ?? null,
ts: row?.ts == null ? null : String(row.ts),
slot: row?.slot == null ? null : String(row.slot),
mid_price: String(mid),
vwap_price: sim.vwapPrice == null ? null : String(sim.vwapPrice),
worst_price: sim.worstPrice == null ? null : String(sim.worstPrice),
filled_usd: String(sim.filledUsd),
filled_base: String(sim.filledBase),
impact_bps: impactBps({ side: 'buy', mid, vwap: sim.vwapPrice }),
levels_consumed: sim.levelsConsumed,
fill_pct: sim.fillPct == null ? null : String(sim.fillPct),
raw: { depthLevels: cfg.depthLevels },
updated_at: updatedAt,
};
objectsV2.push({ ...baseObj, size_usd: String(sizeUsd) });
if (Number.isInteger(sizeUsd)) objectsV1.push({ ...baseObj, size_usd: Math.trunc(sizeUsd) });
} }
async function upsertSlippage(cfg, rows) { // sell consumes bids (worse prices as you go down)
if (!rows.length) return; {
const sim = simulateFill(bids, sizeUsd);
const baseObj = {
market_name: market,
side: 'sell',
market_type: row?.market_type ?? 'perp',
market_index: row?.market_index ?? null,
ts: row?.ts == null ? null : String(row.ts),
slot: row?.slot == null ? null : String(row.slot),
mid_price: String(mid),
vwap_price: sim.vwapPrice == null ? null : String(sim.vwapPrice),
worst_price: sim.worstPrice == null ? null : String(sim.worstPrice),
filled_usd: String(sim.filledUsd),
filled_base: String(sim.filledBase),
impact_bps: impactBps({ side: 'sell', mid, vwap: sim.vwapPrice }),
levels_consumed: sim.levelsConsumed,
fill_pct: sim.fillPct == null ? null : String(sim.fillPct),
raw: { depthLevels: cfg.depthLevels },
updated_at: updatedAt,
};
objectsV2.push({ ...baseObj, size_usd: String(sizeUsd) });
if (Number.isInteger(sizeUsd)) objectsV1.push({ ...baseObj, size_usd: Math.trunc(sizeUsd) });
}
}
}
if (objectsV1.length) {
const mutation = ` const mutation = `
mutation UpsertDlobSlippage($rows: [dlob_slippage_latest_insert_input!]!) { mutation UpsertSlippageV1($rows: [dlob_slippage_latest_insert_input!]!) {
insert_dlob_slippage_latest( insert_dlob_slippage_latest(
objects: $rows objects: $rows
on_conflict: { on_conflict: {
@@ -238,8 +338,6 @@ async function upsertSlippage(cfg, rows) {
ts ts
slot slot
mid_price mid_price
best_bid_price
best_ask_price
vwap_price vwap_price
worst_price worst_price
filled_usd filled_usd
@@ -254,119 +352,40 @@ async function upsertSlippage(cfg, rows) {
) { affected_rows } ) { affected_rows }
} }
`; `;
await graphqlRequest(cfg, mutation, { rows }); await graphqlRequest(cfg, mutation, { rows: objectsV1 });
} }
async function main() { if (objectsV2.length) {
const cfg = resolveConfig(); const mutation = `
const lastUpdatedAtByMarket = new Map(); mutation UpsertSlippageV2($rows: [dlob_slippage_latest_v2_insert_input!]!) {
insert_dlob_slippage_latest_v2(
console.log( objects: $rows
JSON.stringify( on_conflict: {
{ constraint: dlob_slippage_latest_v2_pkey
service: 'dlob-slippage-worker', update_columns: [
startedAt: getIsoNow(), market_type
hasuraUrl: cfg.hasuraUrl, market_index
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none', ts
markets: cfg.markets, slot
pollMs: cfg.pollMs, mid_price
sizesUsd: cfg.sizesUsd, vwap_price
pricePrecision: cfg.pricePrecision, worst_price
basePrecision: cfg.basePrecision, filled_usd
}, filled_base
null, impact_bps
2 levels_consumed
) fill_pct
); raw
updated_at
while (true) { ]
const rows = [];
try {
const l2Rows = await fetchL2Latest(cfg);
for (const l2 of l2Rows) {
const market = String(l2.market_name || '').trim();
if (!market) continue;
const updatedAt = l2.updated_at || null;
if (updatedAt && lastUpdatedAtByMarket.get(market) === updatedAt) continue;
if (updatedAt) lastUpdatedAtByMarket.set(market, updatedAt);
const bestBid = toNumberOrNull(l2.best_bid_price);
const bestAsk = toNumberOrNull(l2.best_ask_price);
const markPrice = toNumberOrNull(l2.mark_price);
const oraclePrice = toNumberOrNull(l2.oracle_price);
const mid = computeMid(bestBid, bestAsk, markPrice, oraclePrice);
const bids = parseLevels(l2.bids, cfg.pricePrecision, cfg.basePrecision, 'bid');
const asks = parseLevels(l2.asks, cfg.pricePrecision, cfg.basePrecision, 'ask');
for (const sizeUsd of cfg.sizesUsd) {
const buy = simulateBuy(asks, mid, sizeUsd);
rows.push({
market_name: market,
side: 'buy',
size_usd: sizeUsd,
market_type: l2.market_type ? String(l2.market_type) : 'perp',
market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
ts: l2.ts == null ? null : String(l2.ts),
slot: l2.slot == null ? null : String(l2.slot),
mid_price: numStr(mid),
best_bid_price: numStr(bestBid),
best_ask_price: numStr(bestAsk),
vwap_price: numStr(buy.vwap),
worst_price: numStr(buy.worstPrice),
filled_usd: numStr(buy.filledUsd),
filled_base: numStr(buy.filledBase),
impact_bps: numStr(buy.impactBps),
levels_consumed: buy.levelsConsumed,
fill_pct: numStr(buy.fillPct),
raw: {
ref: 'mid',
units: 'usd',
pricePrecision: cfg.pricePrecision,
basePrecision: cfg.basePrecision,
},
updated_at: updatedAt,
});
const sell = simulateSell(bids, mid, sizeUsd);
rows.push({
market_name: market,
side: 'sell',
size_usd: sizeUsd,
market_type: l2.market_type ? String(l2.market_type) : 'perp',
market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
ts: l2.ts == null ? null : String(l2.ts),
slot: l2.slot == null ? null : String(l2.slot),
mid_price: numStr(mid),
best_bid_price: numStr(bestBid),
best_ask_price: numStr(bestAsk),
vwap_price: numStr(sell.vwap),
worst_price: numStr(sell.worstPrice),
filled_usd: numStr(sell.filledUsd),
filled_base: numStr(sell.filledBase),
impact_bps: numStr(sell.impactBps),
levels_consumed: sell.levelsConsumed,
fill_pct: numStr(sell.fillPct),
raw: {
ref: 'mid',
units: 'usd',
pricePrecision: cfg.pricePrecision,
basePrecision: cfg.basePrecision,
},
updated_at: updatedAt,
});
} }
) { affected_rows }
}
`;
await graphqlRequest(cfg, mutation, { rows: objectsV2 });
} }
} catch (err) { } catch (err) {
console.error(`[dlob-slippage-worker] fetch/compute: ${String(err?.message || err)}`); console.error(`[dlob-slippage-worker] ${String(err?.message || err)}`);
}
try {
await upsertSlippage(cfg, rows);
} catch (err) {
console.error(`[dlob-slippage-worker] upsert: ${String(err?.message || err)}`);
} }
await sleep(cfg.pollMs); await sleep(cfg.pollMs);

View File

@@ -1,6 +1,16 @@
import fs from 'node:fs';
import process from 'node:process'; import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises'; import { setTimeout as sleep } from 'node:timers/promises';
function readJsonFile(filePath) {
try {
const raw = fs.readFileSync(filePath, 'utf8');
return JSON.parse(raw);
} catch {
return undefined;
}
}
function getIsoNow() { function getIsoNow() {
return new Date().toISOString(); return new Date().toISOString();
} }
@@ -20,30 +30,53 @@ function envList(name, fallbackCsv) {
} }
function resolveConfig() { function resolveConfig() {
const hasuraUrl = String(process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql').trim(); const tokensPath =
const hasuraAdminSecret = String(process.env.HASURA_ADMIN_SECRET || '').trim(); process.env.HASURA_TOKENS_FILE ||
if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET'); process.env.TOKENS_FILE ||
process.env.HASURA_CONFIG_FILE ||
'/app/tokens/hasura.json';
const tokens = readJsonFile(tokensPath) || {};
const hasuraUrl =
process.env.HASURA_GRAPHQL_URL ||
tokens.graphqlUrl ||
tokens.apiUrl ||
'http://hasura:8080/v1/graphql';
const hasuraAdminSecret =
process.env.HASURA_ADMIN_SECRET ||
process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
tokens.adminSecret ||
tokens.hasuraAdminSecret;
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'); const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
const pollMs = clampInt(process.env.DLOB_TS_POLL_MS, 250, 60_000, 1000); const pollMs = clampInt(process.env.DLOB_TS_POLL_MS, 500, 60_000, 1000);
return { hasuraUrl, hasuraAdminSecret, markets, pollMs }; return { hasuraUrl, hasuraAdminSecret, hasuraAuthToken, markets, pollMs };
} }
async function graphqlRequest(cfg, query, variables) { async function graphqlRequest(cfg, query, variables) {
const headers = { 'content-type': 'application/json' };
if (cfg.hasuraAuthToken) {
headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
} else if (cfg.hasuraAdminSecret) {
headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
} else {
throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)');
}
const res = await fetch(cfg.hasuraUrl, { const res = await fetch(cfg.hasuraUrl, {
method: 'POST', method: 'POST',
headers: { headers,
'content-type': 'application/json',
'x-hasura-admin-secret': cfg.hasuraAdminSecret,
},
body: JSON.stringify({ query, variables }), body: JSON.stringify({ query, variables }),
signal: AbortSignal.timeout(15_000), signal: AbortSignal.timeout(15_000),
}); });
const text = await res.text(); const text = await res.text();
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`); if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
const json = JSON.parse(text); const json = JSON.parse(text);
if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | ')); if (json.errors?.length) {
throw new Error(json.errors.map((e) => e.message).join(' | '));
}
return json.data; return json.data;
} }
@@ -63,6 +96,7 @@ async function main() {
service: 'dlob-ts-archiver', service: 'dlob-ts-archiver',
startedAt: getIsoNow(), startedAt: getIsoNow(),
hasuraUrl: cfg.hasuraUrl, hasuraUrl: cfg.hasuraUrl,
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
markets: cfg.markets, markets: cfg.markets,
pollMs: cfg.pollMs, pollMs: cfg.pollMs,
}, },
@@ -93,6 +127,11 @@ async function main() {
mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct
raw raw
} }
dlob_slippage_latest_v2(where: { market_name: { _in: $markets } }) {
market_name side size_usd market_type market_index ts slot
mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct
raw
}
} }
`; `;
@@ -160,28 +199,41 @@ async function main() {
raw: r.raw ?? null, raw: r.raw ?? null,
})); }));
if (!statsRows.length && !depthRows.length && !slippageRows.length) { const slippageRowsV2 = (data?.dlob_slippage_latest_v2 || []).map((r) => ({
await sleep(cfg.pollMs); ts: now,
continue; market_name: r.market_name,
} side: r.side,
size_usd: r.size_usd,
market_type: r.market_type,
market_index: r.market_index ?? null,
source_ts: mapBigint(r.ts),
slot: mapBigint(r.slot),
mid_price: r.mid_price ?? null,
vwap_price: r.vwap_price ?? null,
worst_price: r.worst_price ?? null,
filled_usd: r.filled_usd ?? null,
filled_base: r.filled_base ?? null,
impact_bps: r.impact_bps ?? null,
levels_consumed: r.levels_consumed ?? null,
fill_pct: r.fill_pct ?? null,
raw: r.raw ?? null,
}));
const mutation = ` const mutation = `
mutation InsertTs( mutation InsertTs(
$stats: [dlob_stats_ts_insert_input!]! $stats: [dlob_stats_ts_insert_input!]!
$depth: [dlob_depth_bps_ts_insert_input!]! $depth: [dlob_depth_bps_ts_insert_input!]!
$slip: [dlob_slippage_ts_insert_input!]! $slip: [dlob_slippage_ts_insert_input!]!
$slipV2: [dlob_slippage_ts_v2_insert_input!]!
) { ) {
insert_dlob_stats_ts(objects: $stats) { affected_rows } insert_dlob_stats_ts(objects: $stats) { affected_rows }
insert_dlob_depth_bps_ts(objects: $depth) { affected_rows } insert_dlob_depth_bps_ts(objects: $depth) { affected_rows }
insert_dlob_slippage_ts(objects: $slip) { affected_rows } insert_dlob_slippage_ts(objects: $slip) { affected_rows }
insert_dlob_slippage_ts_v2(objects: $slipV2) { affected_rows }
} }
`; `;
await graphqlRequest(cfg, mutation, { await graphqlRequest(cfg, mutation, { stats: statsRows, depth: depthRows, slip: slippageRows, slipV2: slippageRowsV2 });
stats: statsRows,
depth: depthRows,
slip: slippageRows,
});
} catch (err) { } catch (err) {
console.error(`[dlob-ts-archiver] ${String(err?.message || err)}`); console.error(`[dlob-ts-archiver] ${String(err?.message || err)}`);
} }
@@ -194,4 +246,3 @@ main().catch((err) => {
console.error(String(err?.stack || err)); console.error(String(err?.stack || err));
process.exitCode = 1; process.exitCode = 1;
}); });

View File

@@ -97,9 +97,12 @@ async function main() {
const dlobStatsLatestTable = { schema: 'public', name: 'dlob_stats_latest' }; const dlobStatsLatestTable = { schema: 'public', name: 'dlob_stats_latest' };
const dlobDepthBpsLatestTable = { schema: 'public', name: 'dlob_depth_bps_latest' }; const dlobDepthBpsLatestTable = { schema: 'public', name: 'dlob_depth_bps_latest' };
const dlobSlippageLatestTable = { schema: 'public', name: 'dlob_slippage_latest' }; const dlobSlippageLatestTable = { schema: 'public', name: 'dlob_slippage_latest' };
const dlobSlippageLatestV2Table = { schema: 'public', name: 'dlob_slippage_latest_v2' };
const candlesCacheTable = { schema: 'public', name: 'drift_candles_cache' };
const dlobStatsTsTable = { schema: 'public', name: 'dlob_stats_ts' }; const dlobStatsTsTable = { schema: 'public', name: 'dlob_stats_ts' };
const dlobDepthBpsTsTable = { schema: 'public', name: 'dlob_depth_bps_ts' }; const dlobDepthBpsTsTable = { schema: 'public', name: 'dlob_depth_bps_ts' };
const dlobSlippageTsTable = { schema: 'public', name: 'dlob_slippage_ts' }; const dlobSlippageTsTable = { schema: 'public', name: 'dlob_slippage_ts' };
const dlobSlippageTsV2Table = { schema: 'public', name: 'dlob_slippage_ts_v2' };
const baseCandlesFn = { schema: 'public', name: 'get_drift_candles' }; const baseCandlesFn = { schema: 'public', name: 'get_drift_candles' };
const candlesReturnTable = { schema: 'public', name: 'drift_candles' }; const candlesReturnTable = { schema: 'public', name: 'drift_candles' };
@@ -169,6 +172,21 @@ async function main() {
await ensureTickTable(t); await ensureTickTable(t);
} }
// Cached candles table (precomputed by worker; public read).
await ensurePublicSelectTable(candlesCacheTable, [
'bucket',
'bucket_seconds',
'symbol',
'source',
'open',
'high',
'low',
'close',
'oracle_close',
'ticks',
'updated_at',
]);
const ensureDlobTable = async (table, columns) => { const ensureDlobTable = async (table, columns) => {
await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } }); await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } });
await metadata({ type: 'pg_track_table', args: { source, table } }); await metadata({ type: 'pg_track_table', args: { source, table } });
@@ -321,6 +339,28 @@ async function main() {
'updated_at', 'updated_at',
]); ]);
await ensurePublicSelectTable(dlobSlippageLatestV2Table, [
'market_name',
'side',
'size_usd',
'market_type',
'market_index',
'ts',
'slot',
'mid_price',
'best_bid_price',
'best_ask_price',
'vwap_price',
'worst_price',
'filled_usd',
'filled_base',
'impact_bps',
'levels_consumed',
'fill_pct',
'raw',
'updated_at',
]);
await ensurePublicSelectTable(dlobStatsTsTable, [ await ensurePublicSelectTable(dlobStatsTsTable, [
'ts', 'ts',
'id', 'id',
@@ -386,6 +426,27 @@ async function main() {
'raw', 'raw',
]); ]);
await ensurePublicSelectTable(dlobSlippageTsV2Table, [
'ts',
'id',
'market_name',
'side',
'size_usd',
'market_type',
'market_index',
'source_ts',
'slot',
'mid_price',
'vwap_price',
'worst_price',
'filled_usd',
'filled_base',
'impact_bps',
'levels_consumed',
'fill_pct',
'raw',
]);
// Return table type for candle functions (needed for Hasura to track the function). // Return table type for candle functions (needed for Hasura to track the function).
await metadataIgnore({ type: 'pg_track_table', args: { source, table: candlesReturnTable } }); await metadataIgnore({ type: 'pg_track_table', args: { source, table: candlesReturnTable } });

View File

@@ -125,6 +125,29 @@ CREATE TABLE IF NOT EXISTS public.drift_candles (
ticks bigint ticks bigint
); );
-- Precomputed candle cache (materialized by a worker).
-- Purpose: make tf switching instant by reading ready-made candles instead of aggregating `drift_ticks` on demand.
-- NOTE: `source=''` means "any source" (no source filter).
CREATE TABLE IF NOT EXISTS public.drift_candles_cache (
bucket timestamptz NOT NULL,
bucket_seconds integer NOT NULL,
symbol text NOT NULL,
source text NOT NULL DEFAULT '',
open numeric NOT NULL,
high numeric NOT NULL,
low numeric NOT NULL,
close numeric NOT NULL,
oracle_close numeric,
ticks bigint NOT NULL DEFAULT 0,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (bucket, bucket_seconds, symbol, source)
);
SELECT create_hypertable('drift_candles_cache', 'bucket', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS drift_candles_cache_symbol_source_bucket_idx
ON public.drift_candles_cache (symbol, source, bucket_seconds, bucket DESC);
-- If an older version of the function exists with an incompatible return type, -- If an older version of the function exists with an incompatible return type,
-- CREATE OR REPLACE will fail. Drop the old signature first (safe/idempotent). -- CREATE OR REPLACE will fail. Drop the old signature first (safe/idempotent).
DROP FUNCTION IF EXISTS public.get_drift_candles(text, integer, integer, text); DROP FUNCTION IF EXISTS public.get_drift_candles(text, integer, integer, text);
@@ -272,6 +295,38 @@ CREATE INDEX IF NOT EXISTS dlob_slippage_latest_updated_at_idx
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_market_name_idx CREATE INDEX IF NOT EXISTS dlob_slippage_latest_market_name_idx
ON public.dlob_slippage_latest (market_name); ON public.dlob_slippage_latest (market_name);
-- Slippage v2: supports fractional order sizes (e.g. 0.1/0.2/0.5 USD), per market and side.
-- Keep v1 intact for backward compatibility and to avoid data loss.
CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest_v2 (
market_name TEXT NOT NULL,
side TEXT NOT NULL, -- buy|sell
size_usd NUMERIC NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
slot BIGINT,
mid_price NUMERIC,
best_bid_price NUMERIC,
best_ask_price NUMERIC,
vwap_price NUMERIC,
worst_price NUMERIC,
filled_usd NUMERIC,
filled_base NUMERIC,
impact_bps NUMERIC,
levels_consumed INTEGER,
fill_pct NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (market_name, side, size_usd),
CONSTRAINT dlob_slippage_latest_v2_side_chk CHECK (side IN ('buy', 'sell'))
);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_updated_at_idx
ON public.dlob_slippage_latest_v2 (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_market_name_idx
ON public.dlob_slippage_latest_v2 (market_name);
-- Time-series tables for UI history (start: 7 days). -- Time-series tables for UI history (start: 7 days).
-- Keep these append-only; use Timescale hypertables. -- Keep these append-only; use Timescale hypertables.
@@ -358,6 +413,33 @@ SELECT create_hypertable('dlob_slippage_ts', 'ts', if_not_exists => TRUE, migrat
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_market_ts_desc_idx CREATE INDEX IF NOT EXISTS dlob_slippage_ts_market_ts_desc_idx
ON public.dlob_slippage_ts (market_name, ts DESC); ON public.dlob_slippage_ts (market_name, ts DESC);
CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts_v2 (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
market_name TEXT NOT NULL,
side TEXT NOT NULL,
size_usd NUMERIC NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
source_ts BIGINT,
slot BIGINT,
mid_price NUMERIC,
vwap_price NUMERIC,
worst_price NUMERIC,
filled_usd NUMERIC,
filled_base NUMERIC,
impact_bps NUMERIC,
levels_consumed INTEGER,
fill_pct NUMERIC,
raw JSONB,
PRIMARY KEY (ts, id)
);
SELECT create_hypertable('dlob_slippage_ts_v2', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_market_ts_desc_idx
ON public.dlob_slippage_ts_v2 (market_name, ts DESC);
-- Retention policies (best-effort; safe if Timescale is present). -- Retention policies (best-effort; safe if Timescale is present).
DO $$ DO $$
BEGIN BEGIN
@@ -375,3 +457,8 @@ BEGIN
PERFORM add_retention_policy('dlob_slippage_ts', INTERVAL '7 days'); PERFORM add_retention_policy('dlob_slippage_ts', INTERVAL '7 days');
EXCEPTION WHEN OTHERS THEN EXCEPTION WHEN OTHERS THEN
END $$; END $$;
DO $$
BEGIN
PERFORM add_retention_policy('dlob_slippage_ts_v2', INTERVAL '7 days');
EXCEPTION WHEN OTHERS THEN
END $$;

View File

@@ -22,6 +22,7 @@ resources:
- dlob-depth-worker/deployment.yaml - dlob-depth-worker/deployment.yaml
- dlob-slippage-worker/deployment.yaml - dlob-slippage-worker/deployment.yaml
- dlob-ts-archiver/deployment.yaml - dlob-ts-archiver/deployment.yaml
- candles-cache-worker/deployment.yaml
configMapGenerator: configMapGenerator:
- name: postgres-initdb - name: postgres-initdb
@@ -42,9 +43,15 @@ configMapGenerator:
- name: dlob-ts-archiver-script - name: dlob-ts-archiver-script
files: files:
- dlob-ts-archiver/worker.mjs - dlob-ts-archiver/worker.mjs
- name: candles-cache-worker-script
files:
- candles-cache-worker/worker.mjs
- name: trade-api-wrapper - name: trade-api-wrapper
files: files:
- api/wrapper.mjs - api/wrapper.mjs
- name: trade-api-upstream
files:
- api/server.mjs
generatorOptions: generatorOptions:
disableNameSuffixHash: true disableNameSuffixHash: true