Compare commits

..

3 Commits

Author SHA1 Message Date
codex
b46d7d85c6 chore(staging): add candles-cache-worker deployment 2026-02-01 18:13:24 +01:00
codex
e5543f408a feat(staging): add candles-cache-worker and api override 2026-02-01 18:12:26 +01:00
codex
b239f564b2 feat(staging): add candles cache + v2 slippage 2026-02-01 18:12:15 +01:00
10 changed files with 2579 additions and 294 deletions

View File

@@ -18,6 +18,9 @@ spec:
- name: trade-api-wrapper
configMap:
name: trade-api-wrapper
- name: trade-api-upstream
configMap:
name: trade-api-upstream
containers:
- name: api
image: gitea.mpabi.pl/trade/trade-api:k3s-20260111095435
@@ -31,6 +34,10 @@ spec:
mountPath: /override/wrapper.mjs
subPath: wrapper.mjs
readOnly: true
- name: trade-api-upstream
mountPath: /app/services/api/server.mjs
subPath: server.mjs
readOnly: true
env:
- name: PORT
value: "8787"

File diff suppressed because it is too large. Load Diff

View File

@@ -0,0 +1,48 @@
# Deployment for candles-cache-worker: a single-replica Node.js process that
# periodically aggregates raw ticks into cached OHLC candles via Hasura.
# (Indentation restored — the diff rendering had flattened this manifest.)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: candles-cache-worker
  annotations:
    # Sync after core services (Hasura, DB) in the Argo CD wave order.
    argocd.argoproj.io/sync-wave: "6"
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: candles-cache-worker
  template:
    metadata:
      labels:
        app.kubernetes.io/name: candles-cache-worker
    spec:
      containers:
        - name: worker
          # Plain Node image; the worker script itself is mounted from a ConfigMap.
          image: node:20-slim
          imagePullPolicy: IfNotPresent
          env:
            - name: HASURA_GRAPHQL_URL
              value: http://hasura:8080/v1/graphql
            - name: HASURA_ADMIN_SECRET
              valueFrom:
                secretKeyRef:
                  name: trade-hasura
                  key: HASURA_GRAPHQL_ADMIN_SECRET
            - name: CANDLES_SYMBOLS
              value: SOL-PERP,PUMP-PERP
            - name: CANDLES_SOURCES
              value: any
            - name: CANDLES_TFS
              value: 1s,3s,5s,15s,30s,1m,3m,5m,15m,30m,1h,4h,12h,1d
            - name: CANDLES_TARGET_POINTS
              value: "1024"
            - name: CANDLES_POLL_MS
              value: "5000"
          command: ["node", "/app/worker.mjs"]
          volumeMounts:
            - name: script
              mountPath: /app/worker.mjs
              subPath: worker.mjs
              readOnly: true
      volumes:
        - name: script
          configMap:
            name: candles-cache-worker-script

View File

@@ -0,0 +1,344 @@
import fs from 'node:fs';
import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';
/**
 * Best-effort JSON loader.
 * @param {string} filePath - path to a JSON document on disk
 * @returns {*} the parsed value, or `undefined` when the file is missing,
 *   unreadable, or not valid JSON
 */
function readJsonFile(filePath) {
  let contents;
  try {
    contents = fs.readFileSync(filePath, 'utf8');
  } catch {
    return undefined; // missing or unreadable file
  }
  try {
    return JSON.parse(contents);
  } catch {
    return undefined; // malformed JSON
  }
}
/**
 * Current wall-clock time as an ISO-8601 UTC timestamp.
 * @returns {string} e.g. "2026-02-01T17:13:24.000Z"
 */
function getIsoNow() {
  const now = new Date();
  return now.toISOString();
}
/**
 * Parse `value` as a base-10 integer and clamp it into [min, max].
 * @returns {number} the clamped integer, or `fallback` when parsing fails
 */
function clampInt(value, min, max, fallback) {
  const parsed = Number.parseInt(String(value ?? ''), 10);
  if (!Number.isInteger(parsed)) return fallback;
  if (parsed < min) return min;
  if (parsed > max) return max;
  return parsed;
}
/**
 * Read a comma-separated list from environment variable `name`, falling back
 * to `fallbackCsv` when unset. Entries are trimmed; empty entries dropped.
 * @returns {string[]}
 */
function envList(name, fallbackCsv) {
  const csv = String(process.env[name] ?? fallbackCsv);
  const items = [];
  for (const part of csv.split(',')) {
    const trimmed = part.trim();
    if (trimmed) items.push(trimmed);
  }
  return items;
}
/**
 * Convert a timeframe token like "5s", "15m", "4h" or "1d" into seconds.
 * An empty/missing value defaults to 60 (one minute).
 * @throws {Error} 'invalid_tf' for any other shape or a non-positive count
 */
function parseTimeframeToSeconds(tf) {
  const token = String(tf || '').trim().toLowerCase();
  if (token === '') return 60;
  const match = /^(\d+)(s|m|h|d)$/.exec(token);
  if (match === null) throw new Error('invalid_tf');
  const count = Number.parseInt(match[1], 10);
  if (!Number.isInteger(count) || count <= 0) throw new Error('invalid_tf');
  const secondsPerUnit = { s: 1, m: 60, h: 3600, d: 86400 }[match[2]];
  return count * secondsPerUnit;
}
/**
 * Quote a value as a single-quoted SQL string literal, doubling any embedded
 * single quotes. Null/undefined become the empty literal ''.
 * @returns {string}
 */
function sqlLit(value) {
  const text = value == null ? '' : String(value);
  const escaped = text.replaceAll("'", "''");
  return `'${escaped}'`;
}
/**
 * Derive the Hasura base URL from either a base URL or a full GraphQL
 * endpoint (".../v1/graphql"). A trailing slash is stripped; empty input
 * falls back to the in-cluster default.
 * @returns {string}
 */
function resolveHasuraBaseUrl(graphqlUrlOrBase) {
  const url = String(graphqlUrlOrBase || '').trim();
  if (url === '') return 'http://hasura:8080';
  const suffix = '/v1/graphql';
  if (url.endsWith(suffix)) {
    return url.slice(0, url.length - suffix.length);
  }
  return url.replace(/\/$/, '');
}
/**
 * Build the worker configuration from environment variables, with optional
 * fallbacks from a mounted tokens file (JSON).
 *
 * @returns {{hasuraBaseUrl: string, hasuraAdminSecret: string,
 *   symbols: string[], sources: string[], bucketSecondsList: number[],
 *   targetPoints: number, backfillDays: number, pollMs: number}}
 * @throws {Error} when no Hasura admin secret can be resolved (it is required
 *   for the /v2/query run_sql endpoint)
 */
function resolveConfig() {
  const tokensPath =
    process.env.HASURA_TOKENS_FILE ||
    process.env.TOKENS_FILE ||
    process.env.HASURA_CONFIG_FILE ||
    '/app/tokens/hasura.json';
  const tokens = readJsonFile(tokensPath) || {};
  const graphqlUrl =
    process.env.HASURA_GRAPHQL_URL ||
    tokens.graphqlUrl ||
    tokens.apiUrl ||
    'http://hasura:8080/v1/graphql';
  const hasuraBaseUrl = resolveHasuraBaseUrl(graphqlUrl);
  const hasuraAdminSecret =
    process.env.HASURA_ADMIN_SECRET ||
    process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
    tokens.adminSecret ||
    tokens.hasuraAdminSecret;
  if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET (required for /v2/query run_sql)');
  const symbols = envList('CANDLES_SYMBOLS', 'SOL-PERP,PUMP-PERP');
  // sources: include "any" (sourceKey='') by default.
  const rawSources = envList('CANDLES_SOURCES', 'any');
  const sources = [];
  for (const s of rawSources) {
    const v = String(s).trim();
    if (!v) continue;
    if (v.toLowerCase() === 'any') sources.push('');
    else sources.push(v);
  }
  // Always aggregate the "any source" view first.
  if (!sources.includes('')) sources.unshift('');
  const tfList = envList('CANDLES_TFS', '1s,3s,5s,15s,30s,1m,3m,5m,15m,30m,1h,4h,12h,1d');
  // Parse timeframes, silently skipping malformed tokens; dedupe via Set
  // (was a quadratic findIndex scan with a no-op .map) and sort ascending.
  const parsedSeconds = [];
  for (const tf of tfList) {
    try {
      parsedSeconds.push(parseTimeframeToSeconds(tf));
    } catch {
      // skip invalid timeframe token
    }
  }
  const bucketSecondsList = [...new Set(parsedSeconds)].sort((a, b) => a - b);
  const targetPoints = clampInt(process.env.CANDLES_TARGET_POINTS, 10, 100_000, 1024);
  // legacy: kept for compatibility; if set, used as a minimum warmup window (days).
  const backfillDays = clampInt(process.env.CANDLES_BACKFILL_DAYS, 0, 3650, 0);
  const pollMs = clampInt(process.env.CANDLES_POLL_MS, 250, 60_000, 5000);
  return { hasuraBaseUrl, hasuraAdminSecret, symbols, sources, bucketSecondsList, targetPoints, backfillDays, pollMs };
}
/**
 * Execute raw SQL against Hasura's /v2/query `run_sql` endpoint, authenticated
 * with the admin secret.
 * @param {{hasuraBaseUrl: string, hasuraAdminSecret: string}} cfg
 * @param {string} sql - SQL text to execute on the 'default' source
 * @param {{readOnly?: boolean}} [opts] - set readOnly for SELECT-only queries
 * @returns {Promise<object>} the parsed run_sql response
 * @throws {Error} on a non-2xx HTTP response (body included in the message)
 */
async function hasuraRunSql(cfg, sql, { readOnly } = { readOnly: false }) {
  const endpoint = `${cfg.hasuraBaseUrl}/v2/query`;
  const payload = {
    type: 'run_sql',
    args: { source: 'default', sql, read_only: Boolean(readOnly) },
  };
  const response = await fetch(endpoint, {
    method: 'POST',
    headers: { 'content-type': 'application/json', 'x-hasura-admin-secret': cfg.hasuraAdminSecret },
    body: JSON.stringify(payload),
    signal: AbortSignal.timeout(60_000),
  });
  const bodyText = await response.text();
  if (!response.ok) throw new Error(`Hasura run_sql HTTP ${response.status}: ${bodyText}`);
  return JSON.parse(bodyText);
}
/**
 * Pick the backfill window size (seconds) processed per SQL statement for a
 * given candle bucket size: finer buckets scan fewer ticks per call.
 */
function chunkSecondsForBucket(bucketSeconds) {
  const HOUR = 60 * 60;
  const DAY = 24 * HOUR;
  if (bucketSeconds <= 5) return 15 * 60;
  if (bucketSeconds <= 60) return HOUR;
  if (bucketSeconds <= 300) return 6 * HOUR;
  if (bucketSeconds <= 3600) return DAY;
  return 7 * DAY;
}
/**
 * Build a run_sql statement that aggregates raw ticks (public.drift_ticks)
 * into OHLC candles and upserts them into public.drift_candles_cache.
 *
 * - Ticks are bucketed with time_bucket() at `bucketSeconds` resolution over
 *   the half-open window [fromIso, toIso).
 * - Per-tick price is COALESCE(mark_price, oracle_price); open/close are the
 *   first/last tick of each bucket (array_agg ordered by ts ASC / DESC).
 * - An empty `sourceKey` matches ticks from ANY source — see the
 *   `'' = '' OR source = ...` predicate.
 * - Existing rows with the same (bucket, bucket_seconds, symbol, source) are
 *   overwritten via ON CONFLICT ... DO UPDATE.
 *
 * All string parameters are escaped with sqlLit, so the generated SQL is safe
 * against quote injection. Returns the SQL text; it is not executed here.
 */
function sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso, toIso }) {
  return `
WITH base AS (
SELECT
time_bucket(make_interval(secs => ${bucketSeconds}), ts) AS bucket,
ts,
COALESCE(mark_price, oracle_price) AS px,
oracle_price AS oracle_px
FROM public.drift_ticks
WHERE symbol = ${sqlLit(symbol)}
AND ts >= ${sqlLit(fromIso)}::timestamptz
AND ts < ${sqlLit(toIso)}::timestamptz
AND (${sqlLit(sourceKey)} = '' OR source = ${sqlLit(sourceKey)})
),
agg AS (
SELECT
bucket,
(array_agg(px ORDER BY ts ASC))[1] AS open,
max(px) AS high,
min(px) AS low,
(array_agg(px ORDER BY ts DESC))[1] AS close,
(array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close,
count(*)::bigint AS ticks
FROM base
GROUP BY bucket
)
INSERT INTO public.drift_candles_cache
(bucket, bucket_seconds, symbol, source, open, high, low, close, oracle_close, ticks, updated_at)
SELECT
bucket, ${bucketSeconds}, ${sqlLit(symbol)}, ${sqlLit(sourceKey)}, open, high, low, close, oracle_close, ticks, now()
FROM agg
ON CONFLICT (bucket, bucket_seconds, symbol, source) DO UPDATE SET
open = EXCLUDED.open,
high = EXCLUDED.high,
low = EXCLUDED.low,
close = EXCLUDED.close,
oracle_close = EXCLUDED.oracle_close,
ticks = EXCLUDED.ticks,
updated_at = now();
`;
}
/**
 * Look up the earliest and latest tick timestamps for a symbol/source pair.
 * An empty `sourceKey` matches ticks from any source.
 * @returns {Promise<{minTs: string|null, maxTs: string|null}>} timestamp
 *   strings as returned by Postgres, or nulls when no ticks exist
 */
async function getTickRange(cfg, { symbol, sourceKey }) {
  const sql = `
SELECT min(ts) AS min_ts, max(ts) AS max_ts
FROM public.drift_ticks
WHERE symbol=${sqlLit(symbol)}
AND (${sqlLit(sourceKey)} = '' OR source = ${sqlLit(sourceKey)});
`;
  const response = await hasuraRunSql(cfg, sql, { readOnly: true });
  // run_sql result shape: { result: [headerRow, dataRow, ...] }
  const dataRow =
    Array.isArray(response?.result) && response.result.length >= 2 ? response.result[1] : null;
  if (!dataRow) return { minTs: null, maxTs: null };
  const asTs = (cell) => {
    if (!cell) return null;
    const trimmed = String(cell).trim();
    return trimmed.length ? trimmed : null;
  };
  return { minTs: asTs(dataRow[0]), maxTs: asTs(dataRow[1]) };
}
/**
 * Compute the warmup start timestamp: far enough back to hold ~targetPoints
 * candles of `bucketSeconds`, extended further when `backfillDays` is set,
 * but never earlier than the oldest available tick (`minTsIso`).
 * @returns {string} ISO timestamp marking the backfill window start
 */
function desiredFromIso({ minTsIso, maxTsIso, bucketSeconds, targetPoints, backfillDays }) {
  const endMs = Date.parse(maxTsIso);
  const spanMs = targetPoints * bucketSeconds * 1000;
  let startMs = endMs - spanMs;
  if (backfillDays > 0) {
    // Legacy minimum warmup window: take whichever start is further back.
    const backfillStartMs = endMs - backfillDays * 24 * 60 * 60 * 1000;
    startMs = Math.min(startMs, backfillStartMs);
  }
  if (minTsIso) {
    // Never request data from before the first recorded tick.
    startMs = Math.max(startMs, Date.parse(minTsIso));
  }
  return new Date(Math.max(0, startMs)).toISOString();
}
/**
 * Overlap (seconds) to re-aggregate behind the last processed bucket, so that
 * late-arriving ticks still update recently written candles.
 */
function safetyWindowSeconds(bucketSeconds) {
  if (bucketSeconds <= 60) return 600;
  if (bucketSeconds <= 300) return 3_600;
  if (bucketSeconds <= 3_600) return 21_600;
  if (bucketSeconds <= 14_400) return 86_400;
  return 172_800;
}
/**
 * Warm the candles cache for one symbol/source pair over [fromIso, toIso):
 * for every configured bucket size, run the tick→candle upsert in
 * bucket-appropriate time chunks to bound per-statement work.
 * Statements run sequentially on purpose to keep Hasura/Postgres load low.
 * @throws propagates the first hasuraRunSql failure to the caller
 */
async function backfill(cfg, { symbol, sourceKey, fromIso, toIso }) {
  const startMs = Date.parse(fromIso);
  // Hoisted: the original re-parsed toIso on every loop iteration.
  const endMs = Date.parse(toIso);
  for (const bucketSeconds of cfg.bucketSecondsList) {
    const chunkMs = chunkSecondsForBucket(bucketSeconds) * 1000;
    for (let t = startMs; t < endMs; t += chunkMs) {
      const chunkFrom = new Date(t).toISOString();
      const chunkTo = new Date(Math.min(endMs, t + chunkMs)).toISOString();
      await hasuraRunSql(
        cfg,
        sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso: chunkFrom, toIso: chunkTo })
      );
    }
  }
}
/**
 * Fetch the newest candle bucket already cached for (symbol, source,
 * bucket size), or null when the cache has no rows yet.
 * @returns {Promise<string|null>} bucket timestamp string from Postgres
 */
async function getMaxBucket(cfg, { symbol, sourceKey, bucketSeconds }) {
  const sql = `
SELECT max(bucket) AS max_bucket
FROM public.drift_candles_cache
WHERE symbol=${sqlLit(symbol)} AND source=${sqlLit(sourceKey)} AND bucket_seconds=${bucketSeconds};
`;
  const response = await hasuraRunSql(cfg, sql, { readOnly: true });
  // Hasura returns {result_type, result:[[col...],[row...]]}
  const dataRow =
    Array.isArray(response?.result) && response.result.length >= 2 ? response.result[1] : null;
  if (!dataRow || !dataRow[0]) return null;
  const bucket = String(dataRow[0]).trim();
  return bucket.length ? bucket : null;
}
/**
 * Entry point: warm the candles cache with a one-shot backfill, then poll
 * forever, re-aggregating recent buckets every cfg.pollMs milliseconds.
 * Never returns; fatal config errors (resolveConfig) propagate to the caller.
 */
async function main() {
  const cfg = resolveConfig();
  // Startup banner with the effective configuration (admin secret omitted).
  console.log(
    JSON.stringify(
      {
        service: 'candles-cache-worker',
        startedAt: getIsoNow(),
        hasuraBaseUrl: cfg.hasuraBaseUrl,
        symbols: cfg.symbols,
        sources: cfg.sources.map((s) => (s ? s : '(any)')),
        bucketSecondsList: cfg.bucketSecondsList,
        targetPoints: cfg.targetPoints,
        backfillDays: cfg.backfillDays,
        pollMs: cfg.pollMs,
      },
      null,
      2
    )
  );
  // Backfill to warm cache: for each timeframe keep ~targetPoints candles (or "as much as we have").
  for (const symbol of cfg.symbols) {
    for (const sourceKey of cfg.sources) {
      const range = await getTickRange(cfg, { symbol, sourceKey });
      if (!range.maxTs) continue; // no ticks at all for this pair
      const toIso = new Date(Date.parse(range.maxTs)).toISOString();
      // Size the warmup window by the LARGEST bucket so every timeframe ends
      // up with roughly targetPoints candles.
      const maxBs = cfg.bucketSecondsList[cfg.bucketSecondsList.length - 1] || 60;
      const fromIso = desiredFromIso({
        minTsIso: range.minTs,
        maxTsIso: toIso,
        bucketSeconds: maxBs,
        targetPoints: cfg.targetPoints,
        backfillDays: cfg.backfillDays,
      });
      console.log(
        `[candles-cache-worker] warmup symbol=${symbol} source=${sourceKey || '(any)'} from=${fromIso} to=${toIso} points=${cfg.targetPoints}`
      );
      try {
        await backfill(cfg, { symbol, sourceKey, fromIso, toIso });
      } catch (err) {
        // Warmup is best-effort: the polling loop below still repairs recent data.
        console.error(`[candles-cache-worker] warmup failed (${symbol}/${sourceKey || 'any'}): ${String(err?.message || err)}`);
      }
    }
  }
  // Prime last buckets.
  const last = new Map(); // key -> iso bucket
  for (const symbol of cfg.symbols) {
    for (const sourceKey of cfg.sources) {
      for (const bs of cfg.bucketSecondsList) {
        const k = `${symbol}::${sourceKey}::${bs}`;
        try {
          const maxBucket = await getMaxBucket(cfg, { symbol, sourceKey, bucketSeconds: bs });
          if (maxBucket) last.set(k, maxBucket);
        } catch {
          // ignore
        }
      }
    }
  }
  // Incremental loop: re-aggregate from (last bucket - safety window) up to
  // "now" so that late ticks can still amend recently written candles.
  while (true) {
    const loopNow = Date.now();
    const loopIso = new Date(loopNow).toISOString();
    for (const symbol of cfg.symbols) {
      for (const sourceKey of cfg.sources) {
        for (const bs of cfg.bucketSecondsList) {
          const k = `${symbol}::${sourceKey}::${bs}`;
          const prev = last.get(k);
          const safety = safetyWindowSeconds(bs);
          const fromMs = prev ? Date.parse(prev) - safety * 1000 : loopNow - safety * 1000;
          const fromIso2 = new Date(Math.max(0, fromMs)).toISOString();
          try {
            await hasuraRunSql(
              cfg,
              sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds: bs, fromIso: fromIso2, toIso: loopIso })
            );
            // best-effort: move last pointer close to now (actual max will lag by at most one bucket)
            last.set(k, loopIso);
          } catch (err) {
            console.error(
              `[candles-cache-worker] update failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}`
            );
          }
        }
      }
    }
    await sleep(cfg.pollMs);
  }
}
// Run the worker; on a fatal error log the stack and mark the process failed
// via exitCode (not process.exit) so pending stdout/stderr can flush.
main().catch((err) => {
  console.error(String(err?.stack || err));
  process.exitCode = 1;
});

View File

@@ -31,7 +31,7 @@ spec:
- name: DLOB_POLL_MS
value: "1000"
- name: DLOB_SLIPPAGE_SIZES_USD
value: "10,25,50,100,250,500,1000,5000,10000,50000"
value: "0.1,0.2,0.5,1,2,5,10,25,50,100,250,500,1000,5000,10000,50000"
- name: PRICE_PRECISION
value: "1000000"
- name: BASE_PRECISION

View File

@@ -1,6 +1,16 @@
import fs from 'node:fs';
import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';
// Best-effort JSON loader: returns the parsed value, or undefined when the
// file is missing/unreadable or does not contain valid JSON.
function readJsonFile(filePath) {
  try {
    const raw = fs.readFileSync(filePath, 'utf8');
    return JSON.parse(raw);
  } catch {
    return undefined;
  }
}
// Current wall-clock time as an ISO-8601 UTC timestamp string.
function getIsoNow() {
  return new Date().toISOString();
}
@@ -19,14 +29,87 @@ function envList(name, fallbackCsv) {
.filter(Boolean);
}
function envIntList(name, fallbackCsv) {
const out = [];
for (const item of envList(name, fallbackCsv)) {
const n = Number.parseInt(item, 10);
if (!Number.isFinite(n)) continue;
out.push(n);
/**
 * Parse a strictly positive finite number from arbitrary input.
 * @returns {number|null} the parsed value, or null when the input is not a
 *   positive finite number
 */
function parsePositiveNumber(value) {
  const parsed = Number.parseFloat(String(value ?? '').trim());
  return Number.isFinite(parsed) && parsed > 0 ? parsed : null;
}
/**
 * Build the slippage-worker configuration from environment variables, with
 * fallbacks from an optional mounted tokens file (JSON).
 * Sizes are USD notionals parsed as positive numbers; `sizesUsdInt` keeps
 * only the integer sizes for the legacy v1 table.
 * @throws {Error} on invalid PRICE_PRECISION / BASE_PRECISION values
 */
function resolveConfig() {
  const tokensPath =
    process.env.HASURA_TOKENS_FILE ||
    process.env.TOKENS_FILE ||
    process.env.HASURA_CONFIG_FILE ||
    '/app/tokens/hasura.json';
  const tokens = readJsonFile(tokensPath) || {};
  const hasuraUrl =
    process.env.HASURA_GRAPHQL_URL ||
    tokens.graphqlUrl ||
    tokens.apiUrl ||
    'http://hasura:8080/v1/graphql';
  const hasuraAdminSecret =
    process.env.HASURA_ADMIN_SECRET ||
    process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
    tokens.adminSecret ||
    tokens.hasuraAdminSecret;
  const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
  const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
  const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
  // Parse sizes, drop invalid entries, dedupe via Set (was a quadratic
  // findIndex scan with a no-op .map) and sort ascending.
  const parsedSizes = envList('DLOB_SLIPPAGE_SIZES_USD', '10,25,50,100,250,500,1000')
    .map(parsePositiveNumber)
    .filter((n) => n != null);
  const sizesUsd = [...new Set(parsedSizes)].sort((a, b) => a - b);
  // The v1 table has an integer size_usd column; fractional sizes go to v2 only.
  const sizesUsdInt = sizesUsd.filter((n) => Number.isInteger(n));
  const depthLevels = clampInt(process.env.DLOB_DEPTH, 1, 50, 25);
  const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
  const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
  if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
  if (!Number.isFinite(basePrecision) || basePrecision <= 0) throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
  return {
    hasuraUrl,
    hasuraAdminSecret,
    hasuraAuthToken,
    markets,
    pollMs,
    sizesUsd,
    sizesUsdInt,
    depthLevels,
    pricePrecision,
    basePrecision,
  };
}
/**
 * POST a GraphQL operation to Hasura. Prefers a bearer token; falls back to
 * the admin secret header.
 * (Fix: removed a stray leftover diff line — `return out.length ? ...` — that
 * had been spliced into the middle of this function.)
 * @returns {Promise<object>} the GraphQL `data` payload
 * @throws {Error} when no auth is configured, on a non-2xx response, or when
 *   the response carries GraphQL errors
 */
async function graphqlRequest(cfg, query, variables) {
  const headers = { 'content-type': 'application/json' };
  if (cfg.hasuraAuthToken) {
    headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
  } else if (cfg.hasuraAdminSecret) {
    headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
  } else {
    throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)');
  }
  const res = await fetch(cfg.hasuraUrl, {
    method: 'POST',
    headers,
    body: JSON.stringify({ query, variables }),
    signal: AbortSignal.timeout(15_000),
  });
  const text = await res.text();
  if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
  const json = JSON.parse(text);
  if (json.errors?.length) {
    throw new Error(json.errors.map((e) => e.message).join(' | '));
  }
  return json.data;
}
function toNumberOrNull(value) {
@@ -41,177 +124,115 @@ function toNumberOrNull(value) {
return null;
}
function numStr(value) {
if (value == null) return null;
if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null;
if (typeof value === 'string') return value.trim() || null;
return null;
}
function jsonNormalize(value) {
if (typeof value !== 'string') return value;
const s = value.trim();
if (!s) return null;
function normalizeLevels(raw) {
if (raw == null) return [];
if (Array.isArray(raw)) return raw;
if (typeof raw === 'string') {
const s = raw.trim();
if (!s) return [];
try {
return JSON.parse(s);
const v = JSON.parse(s);
return Array.isArray(v) ? v : [];
} catch {
return value;
return [];
}
}
function resolveConfig() {
const hasuraUrl = process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql';
const hasuraAdminSecret = process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || undefined;
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
const sizesUsd = envIntList('DLOB_SLIPPAGE_SIZES_USD', '100,500,1000,5000,10000,50000');
const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
if (!Number.isFinite(pricePrecision) || pricePrecision <= 0)
throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
if (!Number.isFinite(basePrecision) || basePrecision <= 0)
throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
return {
hasuraUrl,
hasuraAdminSecret,
hasuraAuthToken,
markets,
pollMs,
sizesUsd,
pricePrecision,
basePrecision,
};
}
async function graphqlRequest(cfg, query, variables) {
const headers = { 'content-type': 'application/json' };
if (cfg.hasuraAuthToken) {
headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
} else if (cfg.hasuraAdminSecret) {
headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
} else {
throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET)');
}
const res = await fetch(cfg.hasuraUrl, {
method: 'POST',
headers,
body: JSON.stringify({ query, variables }),
signal: AbortSignal.timeout(10_000),
});
const text = await res.text();
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
const json = JSON.parse(text);
if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | '));
return json.data;
return [];
}
function parseLevels(raw, pricePrecision, basePrecision, side) {
const v = jsonNormalize(raw);
if (!Array.isArray(v)) return [];
function parseScaledLevels(raw, pricePrecision, basePrecision) {
const levels = normalizeLevels(raw);
const out = [];
for (const item of v) {
const priceInt = toNumberOrNull(item?.price);
const sizeInt = toNumberOrNull(item?.size);
for (const it of levels) {
const priceInt = toNumberOrNull(it?.price);
const sizeInt = toNumberOrNull(it?.size);
if (priceInt == null || sizeInt == null) continue;
const price = priceInt / pricePrecision;
const size = sizeInt / basePrecision;
if (!Number.isFinite(price) || !Number.isFinite(size)) continue;
out.push({ price, size });
const base = sizeInt / basePrecision;
if (!Number.isFinite(price) || !Number.isFinite(base)) continue;
out.push({ price, base });
}
if (side === 'bid') out.sort((a, b) => b.price - a.price);
if (side === 'ask') out.sort((a, b) => a.price - b.price);
return out;
}
/**
 * Reference mid price: the bid/ask midpoint when both sides of the book
 * exist, otherwise mark price, otherwise oracle price, otherwise null.
 * @returns {number|null}
 */
function computeMid(bestBid, bestAsk, markPrice, oraclePrice) {
  const haveBothSides = bestBid != null && bestAsk != null;
  if (haveBothSides) return (bestBid + bestAsk) / 2;
  return markPrice ?? oraclePrice ?? null;
}
function simulateBuy(asks, mid, sizeUsd) {
function simulateFill(levels, sizeUsd) {
let remainingUsd = sizeUsd;
let filledUsd = 0;
let filledBase = 0;
let totalQuoteUsd = 0;
let worstPrice = null;
let levelsConsumed = 0;
for (const lvl of asks) {
if (!(remainingUsd > 0)) break;
if (!(lvl.price > 0) || !(lvl.size > 0)) continue;
const maxBase = remainingUsd / lvl.price;
const takeBase = Math.min(lvl.size, maxBase);
if (!(takeBase > 0)) continue;
const cost = takeBase * lvl.price;
filledUsd += cost;
filledBase += takeBase;
remainingUsd -= cost;
worstPrice = lvl.price;
for (const l of levels) {
if (remainingUsd <= 0) break;
const levelUsd = l.base * l.price;
if (levelUsd <= 0) continue;
levelsConsumed += 1;
worstPrice = l.price;
const takeUsd = Math.min(remainingUsd, levelUsd);
const takeBase = takeUsd / l.price;
remainingUsd -= takeUsd;
filledUsd += takeUsd;
filledBase += takeBase;
totalQuoteUsd += takeUsd;
}
const vwap = filledBase > 0 ? filledUsd / filledBase : null;
const impactBps = vwap != null && mid != null && mid > 0 ? (vwap / mid - 1) * 10_000 : null;
const fillPct = sizeUsd > 0 ? filledUsd / sizeUsd : null;
const vwapPrice = filledBase > 0 ? totalQuoteUsd / filledBase : null;
const fillPct = sizeUsd > 0 ? (filledUsd / sizeUsd) * 100 : null;
return { vwap, worstPrice, filledUsd, filledBase, impactBps, levelsConsumed, fillPct };
return {
filledUsd,
filledBase,
vwapPrice,
worstPrice,
levelsConsumed,
fillPct,
};
}
function simulateSell(bids, mid, sizeUsd) {
if (mid == null || !(mid > 0)) {
return { vwap: null, worstPrice: null, filledUsd: 0, filledBase: 0, impactBps: null, levelsConsumed: 0, fillPct: null };
}
const baseTarget = sizeUsd / mid;
let remainingBase = baseTarget;
let proceedsUsd = 0;
let filledBase = 0;
let worstPrice = null;
let levelsConsumed = 0;
for (const lvl of bids) {
if (!(remainingBase > 0)) break;
if (!(lvl.price > 0) || !(lvl.size > 0)) continue;
const takeBase = Math.min(lvl.size, remainingBase);
if (!(takeBase > 0)) continue;
const proceeds = takeBase * lvl.price;
proceedsUsd += proceeds;
filledBase += takeBase;
remainingBase -= takeBase;
worstPrice = lvl.price;
levelsConsumed += 1;
}
const vwap = filledBase > 0 ? proceedsUsd / filledBase : null;
const impactBps = vwap != null && mid > 0 ? (1 - vwap / mid) * 10_000 : null;
const fillPct = baseTarget > 0 ? filledBase / baseTarget : null;
return { vwap, worstPrice, filledUsd: proceedsUsd, filledBase, impactBps, levelsConsumed, fillPct };
/**
 * Price impact, in basis points, of a simulated fill's VWAP relative to mid.
 * Buys pay above mid; sells receive below mid (positive = worse execution).
 * @returns {number|null} null for a missing mid/vwap, non-positive mid, or
 *   an unknown side
 */
function impactBps({ side, mid, vwap }) {
  if (mid == null || vwap == null || mid <= 0) return null;
  const ratio = vwap / mid;
  switch (side) {
    case 'buy':
      return (ratio - 1) * 10_000;
    case 'sell':
      return (1 - ratio) * 10_000;
    default:
      return null;
  }
}
async function fetchL2Latest(cfg) {
async function main() {
const cfg = resolveConfig();
console.log(
JSON.stringify(
{
service: 'dlob-slippage-worker',
startedAt: getIsoNow(),
hasuraUrl: cfg.hasuraUrl,
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
markets: cfg.markets,
pollMs: cfg.pollMs,
sizesUsd: cfg.sizesUsd,
depthLevels: cfg.depthLevels,
},
null,
2
)
);
const lastSeenUpdatedAt = new Map(); // market -> updated_at
while (true) {
const updatedAt = getIsoNow();
try {
const query = `
query DlobL2Latest($markets: [String!]!) {
dlob_l2_latest(where: {market_name: {_in: $markets}}) {
dlob_l2_latest(where: { market_name: { _in: $markets } }) {
market_name
market_type
market_index
ts
slot
mark_price
oracle_price
best_bid_price
best_ask_price
bids
@@ -220,14 +241,93 @@ async function fetchL2Latest(cfg) {
}
}
`;
const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
}
async function upsertSlippage(cfg, rows) {
if (!rows.length) return;
const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
const rows = Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
const objectsV1 = [];
const objectsV2 = [];
for (const row of rows) {
const market = String(row?.market_name || '').trim();
if (!market) continue;
const rowUpdatedAt = row?.updated_at ?? null;
if (rowUpdatedAt && lastSeenUpdatedAt.get(market) === rowUpdatedAt) continue;
if (rowUpdatedAt) lastSeenUpdatedAt.set(market, rowUpdatedAt);
const bestBid = toNumberOrNull(row?.best_bid_price);
const bestAsk = toNumberOrNull(row?.best_ask_price);
if (bestBid == null || bestAsk == null) continue;
const mid = (bestBid + bestAsk) / 2;
if (!Number.isFinite(mid) || mid <= 0) continue;
const bids = parseScaledLevels(row?.bids, cfg.pricePrecision, cfg.basePrecision)
.slice()
.sort((a, b) => b.price - a.price)
.slice(0, cfg.depthLevels);
const asks = parseScaledLevels(row?.asks, cfg.pricePrecision, cfg.basePrecision)
.slice()
.sort((a, b) => a.price - b.price)
.slice(0, cfg.depthLevels);
for (const sizeUsd of cfg.sizesUsd) {
// buy consumes asks (worse prices as you go up)
{
const sim = simulateFill(asks, sizeUsd);
const baseObj = {
market_name: market,
side: 'buy',
market_type: row?.market_type ?? 'perp',
market_index: row?.market_index ?? null,
ts: row?.ts == null ? null : String(row.ts),
slot: row?.slot == null ? null : String(row.slot),
mid_price: String(mid),
vwap_price: sim.vwapPrice == null ? null : String(sim.vwapPrice),
worst_price: sim.worstPrice == null ? null : String(sim.worstPrice),
filled_usd: String(sim.filledUsd),
filled_base: String(sim.filledBase),
impact_bps: impactBps({ side: 'buy', mid, vwap: sim.vwapPrice }),
levels_consumed: sim.levelsConsumed,
fill_pct: sim.fillPct == null ? null : String(sim.fillPct),
raw: { depthLevels: cfg.depthLevels },
updated_at: updatedAt,
};
objectsV2.push({ ...baseObj, size_usd: String(sizeUsd) });
if (Number.isInteger(sizeUsd)) objectsV1.push({ ...baseObj, size_usd: Math.trunc(sizeUsd) });
}
// sell consumes bids (worse prices as you go down)
{
const sim = simulateFill(bids, sizeUsd);
const baseObj = {
market_name: market,
side: 'sell',
market_type: row?.market_type ?? 'perp',
market_index: row?.market_index ?? null,
ts: row?.ts == null ? null : String(row.ts),
slot: row?.slot == null ? null : String(row.slot),
mid_price: String(mid),
vwap_price: sim.vwapPrice == null ? null : String(sim.vwapPrice),
worst_price: sim.worstPrice == null ? null : String(sim.worstPrice),
filled_usd: String(sim.filledUsd),
filled_base: String(sim.filledBase),
impact_bps: impactBps({ side: 'sell', mid, vwap: sim.vwapPrice }),
levels_consumed: sim.levelsConsumed,
fill_pct: sim.fillPct == null ? null : String(sim.fillPct),
raw: { depthLevels: cfg.depthLevels },
updated_at: updatedAt,
};
objectsV2.push({ ...baseObj, size_usd: String(sizeUsd) });
if (Number.isInteger(sizeUsd)) objectsV1.push({ ...baseObj, size_usd: Math.trunc(sizeUsd) });
}
}
}
if (objectsV1.length) {
const mutation = `
mutation UpsertDlobSlippage($rows: [dlob_slippage_latest_insert_input!]!) {
mutation UpsertSlippageV1($rows: [dlob_slippage_latest_insert_input!]!) {
insert_dlob_slippage_latest(
objects: $rows
on_conflict: {
@@ -238,8 +338,6 @@ async function upsertSlippage(cfg, rows) {
ts
slot
mid_price
best_bid_price
best_ask_price
vwap_price
worst_price
filled_usd
@@ -254,119 +352,40 @@ async function upsertSlippage(cfg, rows) {
) { affected_rows }
}
`;
await graphqlRequest(cfg, mutation, { rows });
}
async function main() {
const cfg = resolveConfig();
const lastUpdatedAtByMarket = new Map();
console.log(
JSON.stringify(
{
service: 'dlob-slippage-worker',
startedAt: getIsoNow(),
hasuraUrl: cfg.hasuraUrl,
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
markets: cfg.markets,
pollMs: cfg.pollMs,
sizesUsd: cfg.sizesUsd,
pricePrecision: cfg.pricePrecision,
basePrecision: cfg.basePrecision,
},
null,
2
)
);
while (true) {
const rows = [];
try {
const l2Rows = await fetchL2Latest(cfg);
for (const l2 of l2Rows) {
const market = String(l2.market_name || '').trim();
if (!market) continue;
const updatedAt = l2.updated_at || null;
if (updatedAt && lastUpdatedAtByMarket.get(market) === updatedAt) continue;
if (updatedAt) lastUpdatedAtByMarket.set(market, updatedAt);
const bestBid = toNumberOrNull(l2.best_bid_price);
const bestAsk = toNumberOrNull(l2.best_ask_price);
const markPrice = toNumberOrNull(l2.mark_price);
const oraclePrice = toNumberOrNull(l2.oracle_price);
const mid = computeMid(bestBid, bestAsk, markPrice, oraclePrice);
const bids = parseLevels(l2.bids, cfg.pricePrecision, cfg.basePrecision, 'bid');
const asks = parseLevels(l2.asks, cfg.pricePrecision, cfg.basePrecision, 'ask');
for (const sizeUsd of cfg.sizesUsd) {
const buy = simulateBuy(asks, mid, sizeUsd);
rows.push({
market_name: market,
side: 'buy',
size_usd: sizeUsd,
market_type: l2.market_type ? String(l2.market_type) : 'perp',
market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
ts: l2.ts == null ? null : String(l2.ts),
slot: l2.slot == null ? null : String(l2.slot),
mid_price: numStr(mid),
best_bid_price: numStr(bestBid),
best_ask_price: numStr(bestAsk),
vwap_price: numStr(buy.vwap),
worst_price: numStr(buy.worstPrice),
filled_usd: numStr(buy.filledUsd),
filled_base: numStr(buy.filledBase),
impact_bps: numStr(buy.impactBps),
levels_consumed: buy.levelsConsumed,
fill_pct: numStr(buy.fillPct),
raw: {
ref: 'mid',
units: 'usd',
pricePrecision: cfg.pricePrecision,
basePrecision: cfg.basePrecision,
},
updated_at: updatedAt,
});
const sell = simulateSell(bids, mid, sizeUsd);
rows.push({
market_name: market,
side: 'sell',
size_usd: sizeUsd,
market_type: l2.market_type ? String(l2.market_type) : 'perp',
market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
ts: l2.ts == null ? null : String(l2.ts),
slot: l2.slot == null ? null : String(l2.slot),
mid_price: numStr(mid),
best_bid_price: numStr(bestBid),
best_ask_price: numStr(bestAsk),
vwap_price: numStr(sell.vwap),
worst_price: numStr(sell.worstPrice),
filled_usd: numStr(sell.filledUsd),
filled_base: numStr(sell.filledBase),
impact_bps: numStr(sell.impactBps),
levels_consumed: sell.levelsConsumed,
fill_pct: numStr(sell.fillPct),
raw: {
ref: 'mid',
units: 'usd',
pricePrecision: cfg.pricePrecision,
basePrecision: cfg.basePrecision,
},
updated_at: updatedAt,
});
await graphqlRequest(cfg, mutation, { rows: objectsV1 });
}
if (objectsV2.length) {
const mutation = `
mutation UpsertSlippageV2($rows: [dlob_slippage_latest_v2_insert_input!]!) {
insert_dlob_slippage_latest_v2(
objects: $rows
on_conflict: {
constraint: dlob_slippage_latest_v2_pkey
update_columns: [
market_type
market_index
ts
slot
mid_price
vwap_price
worst_price
filled_usd
filled_base
impact_bps
levels_consumed
fill_pct
raw
updated_at
]
}
) { affected_rows }
}
`;
await graphqlRequest(cfg, mutation, { rows: objectsV2 });
}
} catch (err) {
console.error(`[dlob-slippage-worker] fetch/compute: ${String(err?.message || err)}`);
}
try {
await upsertSlippage(cfg, rows);
} catch (err) {
console.error(`[dlob-slippage-worker] upsert: ${String(err?.message || err)}`);
console.error(`[dlob-slippage-worker] ${String(err?.message || err)}`);
}
await sleep(cfg.pollMs);

View File

@@ -1,6 +1,16 @@
import fs from 'node:fs';
import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';
// Best-effort JSON loader: returns the parsed value, or undefined when the
// file is missing/unreadable or does not contain valid JSON.
function readJsonFile(filePath) {
  try {
    const raw = fs.readFileSync(filePath, 'utf8');
    return JSON.parse(raw);
  } catch {
    return undefined;
  }
}
// Current wall-clock time as an ISO-8601 UTC timestamp string.
function getIsoNow() {
  return new Date().toISOString();
}
@@ -20,30 +30,53 @@ function envList(name, fallbackCsv) {
}
/**
 * Build the worker configuration from the environment, with fallbacks read
 * from a mounted tokens file (JSON). Precedence for Hasura settings:
 * env var > tokens file > built-in default.
 *
 * NOTE: an unresolved diff previously left duplicate `const hasuraUrl` /
 * `const hasuraAdminSecret` declarations and two `return` statements here
 * (a SyntaxError); this is the intended post-image. Auth presence is
 * validated lazily in graphqlRequest(), not here, so the worker can start
 * with only a mounted tokens file.
 *
 * @returns {{hasuraUrl: string, hasuraAdminSecret: (string|undefined),
 *   hasuraAuthToken: (string|undefined), markets: string[], pollMs: number}}
 */
function resolveConfig() {
  const tokensPath =
    process.env.HASURA_TOKENS_FILE ||
    process.env.TOKENS_FILE ||
    process.env.HASURA_CONFIG_FILE ||
    '/app/tokens/hasura.json';
  const tokens = readJsonFile(tokensPath) || {};
  const hasuraUrl =
    process.env.HASURA_GRAPHQL_URL ||
    tokens.graphqlUrl ||
    tokens.apiUrl ||
    'http://hasura:8080/v1/graphql';
  const hasuraAdminSecret =
    process.env.HASURA_ADMIN_SECRET ||
    process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
    tokens.adminSecret ||
    tokens.hasuraAdminSecret;
  const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
  const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
  // Poll no faster than 500 ms (diff shows the floor was raised from 250 ms).
  const pollMs = clampInt(process.env.DLOB_TS_POLL_MS, 500, 60_000, 1000);
  return { hasuraUrl, hasuraAdminSecret, hasuraAuthToken, markets, pollMs };
}
/**
 * POST a GraphQL document to Hasura and return its `data` payload.
 * Auth: bearer token when configured, otherwise the admin secret header.
 * (Removes stale pre-image diff residue: a duplicate `headers:` object
 * property that the `headers,` shorthand silently shadowed, and a
 * duplicated, unreachable `json.errors` check.)
 *
 * @param {{hasuraUrl: string, hasuraAuthToken?: string, hasuraAdminSecret?: string}} cfg
 * @param {string} query - GraphQL query/mutation text.
 * @param {object} variables - GraphQL variables object.
 * @returns {Promise<object>} The `data` field of the GraphQL response.
 * @throws {Error} When no auth is configured, on non-2xx HTTP status, on a
 *   15s transport timeout, or when the response carries GraphQL errors.
 */
async function graphqlRequest(cfg, query, variables) {
  const headers = { 'content-type': 'application/json' };
  if (cfg.hasuraAuthToken) {
    headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
  } else if (cfg.hasuraAdminSecret) {
    headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
  } else {
    throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)');
  }
  const res = await fetch(cfg.hasuraUrl, {
    method: 'POST',
    headers,
    body: JSON.stringify({ query, variables }),
    signal: AbortSignal.timeout(15_000),
  });
  const text = await res.text();
  if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
  const json = JSON.parse(text);
  if (json.errors?.length) {
    throw new Error(json.errors.map((e) => e.message).join(' | '));
  }
  return json.data;
}
@@ -63,6 +96,7 @@ async function main() {
service: 'dlob-ts-archiver',
startedAt: getIsoNow(),
hasuraUrl: cfg.hasuraUrl,
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
markets: cfg.markets,
pollMs: cfg.pollMs,
},
@@ -93,6 +127,11 @@ async function main() {
mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct
raw
}
dlob_slippage_latest_v2(where: { market_name: { _in: $markets } }) {
market_name side size_usd market_type market_index ts slot
mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct
raw
}
}
`;
@@ -160,28 +199,41 @@ async function main() {
raw: r.raw ?? null,
}));
if (!statsRows.length && !depthRows.length && !slippageRows.length) {
await sleep(cfg.pollMs);
continue;
}
const slippageRowsV2 = (data?.dlob_slippage_latest_v2 || []).map((r) => ({
ts: now,
market_name: r.market_name,
side: r.side,
size_usd: r.size_usd,
market_type: r.market_type,
market_index: r.market_index ?? null,
source_ts: mapBigint(r.ts),
slot: mapBigint(r.slot),
mid_price: r.mid_price ?? null,
vwap_price: r.vwap_price ?? null,
worst_price: r.worst_price ?? null,
filled_usd: r.filled_usd ?? null,
filled_base: r.filled_base ?? null,
impact_bps: r.impact_bps ?? null,
levels_consumed: r.levels_consumed ?? null,
fill_pct: r.fill_pct ?? null,
raw: r.raw ?? null,
}));
const mutation = `
mutation InsertTs(
$stats: [dlob_stats_ts_insert_input!]!
$depth: [dlob_depth_bps_ts_insert_input!]!
$slip: [dlob_slippage_ts_insert_input!]!
$slipV2: [dlob_slippage_ts_v2_insert_input!]!
) {
insert_dlob_stats_ts(objects: $stats) { affected_rows }
insert_dlob_depth_bps_ts(objects: $depth) { affected_rows }
insert_dlob_slippage_ts(objects: $slip) { affected_rows }
insert_dlob_slippage_ts_v2(objects: $slipV2) { affected_rows }
}
`;
await graphqlRequest(cfg, mutation, {
stats: statsRows,
depth: depthRows,
slip: slippageRows,
});
await graphqlRequest(cfg, mutation, { stats: statsRows, depth: depthRows, slip: slippageRows, slipV2: slippageRowsV2 });
} catch (err) {
console.error(`[dlob-ts-archiver] ${String(err?.message || err)}`);
}
@@ -194,4 +246,3 @@ main().catch((err) => {
console.error(String(err?.stack || err));
process.exitCode = 1;
});

View File

@@ -97,9 +97,12 @@ async function main() {
const dlobStatsLatestTable = { schema: 'public', name: 'dlob_stats_latest' };
const dlobDepthBpsLatestTable = { schema: 'public', name: 'dlob_depth_bps_latest' };
const dlobSlippageLatestTable = { schema: 'public', name: 'dlob_slippage_latest' };
const dlobSlippageLatestV2Table = { schema: 'public', name: 'dlob_slippage_latest_v2' };
const candlesCacheTable = { schema: 'public', name: 'drift_candles_cache' };
const dlobStatsTsTable = { schema: 'public', name: 'dlob_stats_ts' };
const dlobDepthBpsTsTable = { schema: 'public', name: 'dlob_depth_bps_ts' };
const dlobSlippageTsTable = { schema: 'public', name: 'dlob_slippage_ts' };
const dlobSlippageTsV2Table = { schema: 'public', name: 'dlob_slippage_ts_v2' };
const baseCandlesFn = { schema: 'public', name: 'get_drift_candles' };
const candlesReturnTable = { schema: 'public', name: 'drift_candles' };
@@ -169,6 +172,21 @@ async function main() {
await ensureTickTable(t);
}
// Cached candles table (precomputed by worker; public read).
await ensurePublicSelectTable(candlesCacheTable, [
'bucket',
'bucket_seconds',
'symbol',
'source',
'open',
'high',
'low',
'close',
'oracle_close',
'ticks',
'updated_at',
]);
const ensureDlobTable = async (table, columns) => {
await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } });
await metadata({ type: 'pg_track_table', args: { source, table } });
@@ -321,6 +339,28 @@ async function main() {
'updated_at',
]);
await ensurePublicSelectTable(dlobSlippageLatestV2Table, [
'market_name',
'side',
'size_usd',
'market_type',
'market_index',
'ts',
'slot',
'mid_price',
'best_bid_price',
'best_ask_price',
'vwap_price',
'worst_price',
'filled_usd',
'filled_base',
'impact_bps',
'levels_consumed',
'fill_pct',
'raw',
'updated_at',
]);
await ensurePublicSelectTable(dlobStatsTsTable, [
'ts',
'id',
@@ -386,6 +426,27 @@ async function main() {
'raw',
]);
await ensurePublicSelectTable(dlobSlippageTsV2Table, [
'ts',
'id',
'market_name',
'side',
'size_usd',
'market_type',
'market_index',
'source_ts',
'slot',
'mid_price',
'vwap_price',
'worst_price',
'filled_usd',
'filled_base',
'impact_bps',
'levels_consumed',
'fill_pct',
'raw',
]);
// Return table type for candle functions (needed for Hasura to track the function).
await metadataIgnore({ type: 'pg_track_table', args: { source, table: candlesReturnTable } });

View File

@@ -125,6 +125,29 @@ CREATE TABLE IF NOT EXISTS public.drift_candles (
ticks bigint
);
-- Precomputed candle cache (materialized by a worker).
-- Purpose: make tf switching instant by reading ready-made candles instead of aggregating `drift_ticks` on demand.
-- NOTE: `source=''` means "any source" (no source filter).
CREATE TABLE IF NOT EXISTS public.drift_candles_cache (
bucket timestamptz NOT NULL,
bucket_seconds integer NOT NULL,
symbol text NOT NULL,
source text NOT NULL DEFAULT '',
open numeric NOT NULL,
high numeric NOT NULL,
low numeric NOT NULL,
close numeric NOT NULL,
oracle_close numeric,
ticks bigint NOT NULL DEFAULT 0,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (bucket, bucket_seconds, symbol, source)
);
SELECT create_hypertable('drift_candles_cache', 'bucket', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS drift_candles_cache_symbol_source_bucket_idx
ON public.drift_candles_cache (symbol, source, bucket_seconds, bucket DESC);
-- If an older version of the function exists with an incompatible return type,
-- CREATE OR REPLACE will fail. Drop the old signature first (safe/idempotent).
DROP FUNCTION IF EXISTS public.get_drift_candles(text, integer, integer, text);
@@ -272,6 +295,38 @@ CREATE INDEX IF NOT EXISTS dlob_slippage_latest_updated_at_idx
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_market_name_idx
ON public.dlob_slippage_latest (market_name);
-- Slippage v2: supports fractional order sizes (e.g. 0.1/0.2/0.5 USD), per market and side.
-- Keep v1 intact for backward compatibility and to avoid data loss.
CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest_v2 (
market_name TEXT NOT NULL,
side TEXT NOT NULL, -- buy|sell
size_usd NUMERIC NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
slot BIGINT,
mid_price NUMERIC,
best_bid_price NUMERIC,
best_ask_price NUMERIC,
vwap_price NUMERIC,
worst_price NUMERIC,
filled_usd NUMERIC,
filled_base NUMERIC,
impact_bps NUMERIC,
levels_consumed INTEGER,
fill_pct NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (market_name, side, size_usd),
CONSTRAINT dlob_slippage_latest_v2_side_chk CHECK (side IN ('buy', 'sell'))
);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_updated_at_idx
ON public.dlob_slippage_latest_v2 (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_market_name_idx
ON public.dlob_slippage_latest_v2 (market_name);
-- Time-series tables for UI history (start: 7 days).
-- Keep these append-only; use Timescale hypertables.
@@ -358,6 +413,33 @@ SELECT create_hypertable('dlob_slippage_ts', 'ts', if_not_exists => TRUE, migrat
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_market_ts_desc_idx
ON public.dlob_slippage_ts (market_name, ts DESC);
CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts_v2 (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
market_name TEXT NOT NULL,
side TEXT NOT NULL,
size_usd NUMERIC NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
source_ts BIGINT,
slot BIGINT,
mid_price NUMERIC,
vwap_price NUMERIC,
worst_price NUMERIC,
filled_usd NUMERIC,
filled_base NUMERIC,
impact_bps NUMERIC,
levels_consumed INTEGER,
fill_pct NUMERIC,
raw JSONB,
PRIMARY KEY (ts, id)
);
SELECT create_hypertable('dlob_slippage_ts_v2', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_market_ts_desc_idx
ON public.dlob_slippage_ts_v2 (market_name, ts DESC);
-- Retention policies (best-effort; safe if Timescale is present).
DO $$
BEGIN
@@ -375,3 +457,8 @@ BEGIN
PERFORM add_retention_policy('dlob_slippage_ts', INTERVAL '7 days');
EXCEPTION WHEN OTHERS THEN
END $$;
-- Best-effort 7-day retention for the v2 slippage time series; swallow errors
-- so initdb still succeeds when Timescale's policy API is unavailable.
DO $$
BEGIN
  PERFORM add_retention_policy('dlob_slippage_ts_v2', INTERVAL '7 days');
EXCEPTION WHEN OTHERS THEN
  -- PL/pgSQL requires at least one statement in an exception handler;
  -- an empty THEN body is a syntax error.
  NULL;
END $$;

View File

@@ -22,6 +22,7 @@ resources:
- dlob-depth-worker/deployment.yaml
- dlob-slippage-worker/deployment.yaml
- dlob-ts-archiver/deployment.yaml
- candles-cache-worker/deployment.yaml
configMapGenerator:
- name: postgres-initdb
@@ -42,9 +43,15 @@ configMapGenerator:
- name: dlob-ts-archiver-script
files:
- dlob-ts-archiver/worker.mjs
- name: candles-cache-worker-script
files:
- candles-cache-worker/worker.mjs
- name: trade-api-wrapper
files:
- api/wrapper.mjs
- name: trade-api-upstream
files:
- api/server.mjs
generatorOptions:
disableNameSuffixHash: true