feat(staging): add candles cache + v2 slippage

This commit is contained in:
codex
2026-02-01 18:12:15 +01:00
parent e6a2731d7e
commit b239f564b2
7 changed files with 526 additions and 294 deletions

View File

@@ -1,6 +1,16 @@
import fs from 'node:fs';
import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';
function readJsonFile(filePath) {
try {
const raw = fs.readFileSync(filePath, 'utf8');
return JSON.parse(raw);
} catch {
return undefined;
}
}
function getIsoNow() {
return new Date().toISOString();
}
@@ -19,14 +29,87 @@ function envList(name, fallbackCsv) {
.filter(Boolean);
}
function envIntList(name, fallbackCsv) {
const out = [];
for (const item of envList(name, fallbackCsv)) {
const n = Number.parseInt(item, 10);
if (!Number.isFinite(n)) continue;
out.push(n);
function parsePositiveNumber(value) {
const n = Number.parseFloat(String(value ?? '').trim());
if (!Number.isFinite(n) || !(n > 0)) return null;
return n;
}
function resolveConfig() {
const tokensPath =
process.env.HASURA_TOKENS_FILE ||
process.env.TOKENS_FILE ||
process.env.HASURA_CONFIG_FILE ||
'/app/tokens/hasura.json';
const tokens = readJsonFile(tokensPath) || {};
const hasuraUrl =
process.env.HASURA_GRAPHQL_URL ||
tokens.graphqlUrl ||
tokens.apiUrl ||
'http://hasura:8080/v1/graphql';
const hasuraAdminSecret =
process.env.HASURA_ADMIN_SECRET ||
process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
tokens.adminSecret ||
tokens.hasuraAdminSecret;
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
const sizesUsd = envList('DLOB_SLIPPAGE_SIZES_USD', '10,25,50,100,250,500,1000')
.map(parsePositiveNumber)
.filter((n) => n != null)
.map((n) => n)
.filter((n, idx, arr) => arr.findIndex((x) => x === n) === idx)
.sort((a, b) => a - b);
const sizesUsdInt = sizesUsd.filter((n) => Number.isInteger(n));
const depthLevels = clampInt(process.env.DLOB_DEPTH, 1, 50, 25);
const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
if (!Number.isFinite(basePrecision) || basePrecision <= 0) throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
return {
hasuraUrl,
hasuraAdminSecret,
hasuraAuthToken,
markets,
pollMs,
sizesUsd,
sizesUsdInt,
depthLevels,
pricePrecision,
basePrecision,
};
}
async function graphqlRequest(cfg, query, variables) {
const headers = { 'content-type': 'application/json' };
if (cfg.hasuraAuthToken) {
headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
} else if (cfg.hasuraAdminSecret) {
headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
} else {
throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)');
}
return out.length ? out : envList(name, fallbackCsv).map((v) => Number.parseInt(v, 10)).filter(Number.isFinite);
const res = await fetch(cfg.hasuraUrl, {
method: 'POST',
headers,
body: JSON.stringify({ query, variables }),
signal: AbortSignal.timeout(15_000),
});
const text = await res.text();
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
const json = JSON.parse(text);
if (json.errors?.length) {
throw new Error(json.errors.map((e) => e.message).join(' | '));
}
return json.data;
}
function toNumberOrNull(value) {
@@ -41,225 +124,83 @@ function toNumberOrNull(value) {
return null;
}
function numStr(value) {
if (value == null) return null;
if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null;
if (typeof value === 'string') return value.trim() || null;
return null;
}
function jsonNormalize(value) {
if (typeof value !== 'string') return value;
const s = value.trim();
if (!s) return null;
try {
return JSON.parse(s);
} catch {
return value;
function normalizeLevels(raw) {
if (raw == null) return [];
if (Array.isArray(raw)) return raw;
if (typeof raw === 'string') {
const s = raw.trim();
if (!s) return [];
try {
const v = JSON.parse(s);
return Array.isArray(v) ? v : [];
} catch {
return [];
}
}
return [];
}
function resolveConfig() {
const hasuraUrl = process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql';
const hasuraAdminSecret = process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || undefined;
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
const sizesUsd = envIntList('DLOB_SLIPPAGE_SIZES_USD', '100,500,1000,5000,10000,50000');
const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
if (!Number.isFinite(pricePrecision) || pricePrecision <= 0)
throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
if (!Number.isFinite(basePrecision) || basePrecision <= 0)
throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
return {
hasuraUrl,
hasuraAdminSecret,
hasuraAuthToken,
markets,
pollMs,
sizesUsd,
pricePrecision,
basePrecision,
};
}
async function graphqlRequest(cfg, query, variables) {
const headers = { 'content-type': 'application/json' };
if (cfg.hasuraAuthToken) {
headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
} else if (cfg.hasuraAdminSecret) {
headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
} else {
throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET)');
}
const res = await fetch(cfg.hasuraUrl, {
method: 'POST',
headers,
body: JSON.stringify({ query, variables }),
signal: AbortSignal.timeout(10_000),
});
const text = await res.text();
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
const json = JSON.parse(text);
if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | '));
return json.data;
}
function parseLevels(raw, pricePrecision, basePrecision, side) {
const v = jsonNormalize(raw);
if (!Array.isArray(v)) return [];
function parseScaledLevels(raw, pricePrecision, basePrecision) {
const levels = normalizeLevels(raw);
const out = [];
for (const item of v) {
const priceInt = toNumberOrNull(item?.price);
const sizeInt = toNumberOrNull(item?.size);
for (const it of levels) {
const priceInt = toNumberOrNull(it?.price);
const sizeInt = toNumberOrNull(it?.size);
if (priceInt == null || sizeInt == null) continue;
const price = priceInt / pricePrecision;
const size = sizeInt / basePrecision;
if (!Number.isFinite(price) || !Number.isFinite(size)) continue;
out.push({ price, size });
const base = sizeInt / basePrecision;
if (!Number.isFinite(price) || !Number.isFinite(base)) continue;
out.push({ price, base });
}
if (side === 'bid') out.sort((a, b) => b.price - a.price);
if (side === 'ask') out.sort((a, b) => a.price - b.price);
return out;
}
function computeMid(bestBid, bestAsk, markPrice, oraclePrice) {
if (bestBid != null && bestAsk != null) return (bestBid + bestAsk) / 2;
if (markPrice != null) return markPrice;
if (oraclePrice != null) return oraclePrice;
return null;
}
function simulateBuy(asks, mid, sizeUsd) {
function simulateFill(levels, sizeUsd) {
let remainingUsd = sizeUsd;
let filledUsd = 0;
let filledBase = 0;
let totalQuoteUsd = 0;
let worstPrice = null;
let levelsConsumed = 0;
for (const lvl of asks) {
if (!(remainingUsd > 0)) break;
if (!(lvl.price > 0) || !(lvl.size > 0)) continue;
const maxBase = remainingUsd / lvl.price;
const takeBase = Math.min(lvl.size, maxBase);
if (!(takeBase > 0)) continue;
const cost = takeBase * lvl.price;
filledUsd += cost;
filledBase += takeBase;
remainingUsd -= cost;
worstPrice = lvl.price;
for (const l of levels) {
if (remainingUsd <= 0) break;
const levelUsd = l.base * l.price;
if (levelUsd <= 0) continue;
levelsConsumed += 1;
}
worstPrice = l.price;
const vwap = filledBase > 0 ? filledUsd / filledBase : null;
const impactBps = vwap != null && mid != null && mid > 0 ? (vwap / mid - 1) * 10_000 : null;
const fillPct = sizeUsd > 0 ? filledUsd / sizeUsd : null;
const takeUsd = Math.min(remainingUsd, levelUsd);
const takeBase = takeUsd / l.price;
return { vwap, worstPrice, filledUsd, filledBase, impactBps, levelsConsumed, fillPct };
}
function simulateSell(bids, mid, sizeUsd) {
if (mid == null || !(mid > 0)) {
return { vwap: null, worstPrice: null, filledUsd: 0, filledBase: 0, impactBps: null, levelsConsumed: 0, fillPct: null };
}
const baseTarget = sizeUsd / mid;
let remainingBase = baseTarget;
let proceedsUsd = 0;
let filledBase = 0;
let worstPrice = null;
let levelsConsumed = 0;
for (const lvl of bids) {
if (!(remainingBase > 0)) break;
if (!(lvl.price > 0) || !(lvl.size > 0)) continue;
const takeBase = Math.min(lvl.size, remainingBase);
if (!(takeBase > 0)) continue;
const proceeds = takeBase * lvl.price;
proceedsUsd += proceeds;
remainingUsd -= takeUsd;
filledUsd += takeUsd;
filledBase += takeBase;
remainingBase -= takeBase;
worstPrice = lvl.price;
levelsConsumed += 1;
totalQuoteUsd += takeUsd;
}
const vwap = filledBase > 0 ? proceedsUsd / filledBase : null;
const impactBps = vwap != null && mid > 0 ? (1 - vwap / mid) * 10_000 : null;
const fillPct = baseTarget > 0 ? filledBase / baseTarget : null;
const vwapPrice = filledBase > 0 ? totalQuoteUsd / filledBase : null;
const fillPct = sizeUsd > 0 ? (filledUsd / sizeUsd) * 100 : null;
return { vwap, worstPrice, filledUsd: proceedsUsd, filledBase, impactBps, levelsConsumed, fillPct };
return {
filledUsd,
filledBase,
vwapPrice,
worstPrice,
levelsConsumed,
fillPct,
};
}
async function fetchL2Latest(cfg) {
const query = `
query DlobL2Latest($markets: [String!]!) {
dlob_l2_latest(where: {market_name: {_in: $markets}}) {
market_name
market_type
market_index
ts
slot
mark_price
oracle_price
best_bid_price
best_ask_price
bids
asks
updated_at
}
}
`;
const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
}
async function upsertSlippage(cfg, rows) {
if (!rows.length) return;
const mutation = `
mutation UpsertDlobSlippage($rows: [dlob_slippage_latest_insert_input!]!) {
insert_dlob_slippage_latest(
objects: $rows
on_conflict: {
constraint: dlob_slippage_latest_pkey
update_columns: [
market_type
market_index
ts
slot
mid_price
best_bid_price
best_ask_price
vwap_price
worst_price
filled_usd
filled_base
impact_bps
levels_consumed
fill_pct
raw
updated_at
]
}
) { affected_rows }
}
`;
await graphqlRequest(cfg, mutation, { rows });
function impactBps({ side, mid, vwap }) {
if (mid == null || vwap == null || mid <= 0) return null;
if (side === 'buy') return ((vwap / mid) - 1) * 10_000;
if (side === 'sell') return (1 - (vwap / mid)) * 10_000;
return null;
}
async function main() {
const cfg = resolveConfig();
const lastUpdatedAtByMarket = new Map();
console.log(
JSON.stringify(
@@ -271,102 +212,180 @@ async function main() {
markets: cfg.markets,
pollMs: cfg.pollMs,
sizesUsd: cfg.sizesUsd,
pricePrecision: cfg.pricePrecision,
basePrecision: cfg.basePrecision,
depthLevels: cfg.depthLevels,
},
null,
2
)
);
const lastSeenUpdatedAt = new Map(); // market -> updated_at
while (true) {
const rows = [];
const updatedAt = getIsoNow();
try {
const l2Rows = await fetchL2Latest(cfg);
for (const l2 of l2Rows) {
const market = String(l2.market_name || '').trim();
const query = `
query DlobL2Latest($markets: [String!]!) {
dlob_l2_latest(where: { market_name: { _in: $markets } }) {
market_name
market_type
market_index
ts
slot
best_bid_price
best_ask_price
bids
asks
updated_at
}
}
`;
const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
const rows = Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
const objectsV1 = [];
const objectsV2 = [];
for (const row of rows) {
const market = String(row?.market_name || '').trim();
if (!market) continue;
const updatedAt = l2.updated_at || null;
if (updatedAt && lastUpdatedAtByMarket.get(market) === updatedAt) continue;
if (updatedAt) lastUpdatedAtByMarket.set(market, updatedAt);
const rowUpdatedAt = row?.updated_at ?? null;
if (rowUpdatedAt && lastSeenUpdatedAt.get(market) === rowUpdatedAt) continue;
if (rowUpdatedAt) lastSeenUpdatedAt.set(market, rowUpdatedAt);
const bestBid = toNumberOrNull(l2.best_bid_price);
const bestAsk = toNumberOrNull(l2.best_ask_price);
const markPrice = toNumberOrNull(l2.mark_price);
const oraclePrice = toNumberOrNull(l2.oracle_price);
const mid = computeMid(bestBid, bestAsk, markPrice, oraclePrice);
const bestBid = toNumberOrNull(row?.best_bid_price);
const bestAsk = toNumberOrNull(row?.best_ask_price);
if (bestBid == null || bestAsk == null) continue;
const bids = parseLevels(l2.bids, cfg.pricePrecision, cfg.basePrecision, 'bid');
const asks = parseLevels(l2.asks, cfg.pricePrecision, cfg.basePrecision, 'ask');
const mid = (bestBid + bestAsk) / 2;
if (!Number.isFinite(mid) || mid <= 0) continue;
const bids = parseScaledLevels(row?.bids, cfg.pricePrecision, cfg.basePrecision)
.slice()
.sort((a, b) => b.price - a.price)
.slice(0, cfg.depthLevels);
const asks = parseScaledLevels(row?.asks, cfg.pricePrecision, cfg.basePrecision)
.slice()
.sort((a, b) => a.price - b.price)
.slice(0, cfg.depthLevels);
for (const sizeUsd of cfg.sizesUsd) {
const buy = simulateBuy(asks, mid, sizeUsd);
rows.push({
market_name: market,
side: 'buy',
size_usd: sizeUsd,
market_type: l2.market_type ? String(l2.market_type) : 'perp',
market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
ts: l2.ts == null ? null : String(l2.ts),
slot: l2.slot == null ? null : String(l2.slot),
mid_price: numStr(mid),
best_bid_price: numStr(bestBid),
best_ask_price: numStr(bestAsk),
vwap_price: numStr(buy.vwap),
worst_price: numStr(buy.worstPrice),
filled_usd: numStr(buy.filledUsd),
filled_base: numStr(buy.filledBase),
impact_bps: numStr(buy.impactBps),
levels_consumed: buy.levelsConsumed,
fill_pct: numStr(buy.fillPct),
raw: {
ref: 'mid',
units: 'usd',
pricePrecision: cfg.pricePrecision,
basePrecision: cfg.basePrecision,
},
updated_at: updatedAt,
});
// buy consumes asks (worse prices as you go up)
{
const sim = simulateFill(asks, sizeUsd);
const baseObj = {
market_name: market,
side: 'buy',
market_type: row?.market_type ?? 'perp',
market_index: row?.market_index ?? null,
ts: row?.ts == null ? null : String(row.ts),
slot: row?.slot == null ? null : String(row.slot),
mid_price: String(mid),
vwap_price: sim.vwapPrice == null ? null : String(sim.vwapPrice),
worst_price: sim.worstPrice == null ? null : String(sim.worstPrice),
filled_usd: String(sim.filledUsd),
filled_base: String(sim.filledBase),
impact_bps: impactBps({ side: 'buy', mid, vwap: sim.vwapPrice }),
levels_consumed: sim.levelsConsumed,
fill_pct: sim.fillPct == null ? null : String(sim.fillPct),
raw: { depthLevels: cfg.depthLevels },
updated_at: updatedAt,
};
objectsV2.push({ ...baseObj, size_usd: String(sizeUsd) });
if (Number.isInteger(sizeUsd)) objectsV1.push({ ...baseObj, size_usd: Math.trunc(sizeUsd) });
}
const sell = simulateSell(bids, mid, sizeUsd);
rows.push({
market_name: market,
side: 'sell',
size_usd: sizeUsd,
market_type: l2.market_type ? String(l2.market_type) : 'perp',
market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
ts: l2.ts == null ? null : String(l2.ts),
slot: l2.slot == null ? null : String(l2.slot),
mid_price: numStr(mid),
best_bid_price: numStr(bestBid),
best_ask_price: numStr(bestAsk),
vwap_price: numStr(sell.vwap),
worst_price: numStr(sell.worstPrice),
filled_usd: numStr(sell.filledUsd),
filled_base: numStr(sell.filledBase),
impact_bps: numStr(sell.impactBps),
levels_consumed: sell.levelsConsumed,
fill_pct: numStr(sell.fillPct),
raw: {
ref: 'mid',
units: 'usd',
pricePrecision: cfg.pricePrecision,
basePrecision: cfg.basePrecision,
},
updated_at: updatedAt,
});
// sell consumes bids (worse prices as you go down)
{
const sim = simulateFill(bids, sizeUsd);
const baseObj = {
market_name: market,
side: 'sell',
market_type: row?.market_type ?? 'perp',
market_index: row?.market_index ?? null,
ts: row?.ts == null ? null : String(row.ts),
slot: row?.slot == null ? null : String(row.slot),
mid_price: String(mid),
vwap_price: sim.vwapPrice == null ? null : String(sim.vwapPrice),
worst_price: sim.worstPrice == null ? null : String(sim.worstPrice),
filled_usd: String(sim.filledUsd),
filled_base: String(sim.filledBase),
impact_bps: impactBps({ side: 'sell', mid, vwap: sim.vwapPrice }),
levels_consumed: sim.levelsConsumed,
fill_pct: sim.fillPct == null ? null : String(sim.fillPct),
raw: { depthLevels: cfg.depthLevels },
updated_at: updatedAt,
};
objectsV2.push({ ...baseObj, size_usd: String(sizeUsd) });
if (Number.isInteger(sizeUsd)) objectsV1.push({ ...baseObj, size_usd: Math.trunc(sizeUsd) });
}
}
}
} catch (err) {
console.error(`[dlob-slippage-worker] fetch/compute: ${String(err?.message || err)}`);
}
try {
await upsertSlippage(cfg, rows);
if (objectsV1.length) {
const mutation = `
mutation UpsertSlippageV1($rows: [dlob_slippage_latest_insert_input!]!) {
insert_dlob_slippage_latest(
objects: $rows
on_conflict: {
constraint: dlob_slippage_latest_pkey
update_columns: [
market_type
market_index
ts
slot
mid_price
vwap_price
worst_price
filled_usd
filled_base
impact_bps
levels_consumed
fill_pct
raw
updated_at
]
}
) { affected_rows }
}
`;
await graphqlRequest(cfg, mutation, { rows: objectsV1 });
}
if (objectsV2.length) {
const mutation = `
mutation UpsertSlippageV2($rows: [dlob_slippage_latest_v2_insert_input!]!) {
insert_dlob_slippage_latest_v2(
objects: $rows
on_conflict: {
constraint: dlob_slippage_latest_v2_pkey
update_columns: [
market_type
market_index
ts
slot
mid_price
vwap_price
worst_price
filled_usd
filled_base
impact_bps
levels_consumed
fill_pct
raw
updated_at
]
}
) { affected_rows }
}
`;
await graphqlRequest(cfg, mutation, { rows: objectsV2 });
}
} catch (err) {
console.error(`[dlob-slippage-worker] upsert: ${String(err?.message || err)}`);
console.error(`[dlob-slippage-worker] ${String(err?.message || err)}`);
}
await sleep(cfg.pollMs);