import fs from 'node:fs'; import process from 'node:process'; import { setTimeout as sleep } from 'node:timers/promises'; function readJsonFile(filePath) { try { const raw = fs.readFileSync(filePath, 'utf8'); return JSON.parse(raw); } catch { return undefined; } } function getIsoNow() { return new Date().toISOString(); } function clampInt(value, min, max, fallback) { const n = Number.parseInt(String(value ?? ''), 10); if (!Number.isInteger(n)) return fallback; return Math.min(max, Math.max(min, n)); } function envList(name, fallbackCsv) { const raw = process.env[name] ?? fallbackCsv; return String(raw) .split(',') .map((s) => s.trim()) .filter(Boolean); } function parsePositiveNumber(value) { const n = Number.parseFloat(String(value ?? '').trim()); if (!Number.isFinite(n) || !(n > 0)) return null; return n; } function resolveConfig() { const tokensPath = process.env.HASURA_TOKENS_FILE || process.env.TOKENS_FILE || process.env.HASURA_CONFIG_FILE || '/app/tokens/hasura.json'; const tokens = readJsonFile(tokensPath) || {}; const hasuraUrl = process.env.HASURA_GRAPHQL_URL || tokens.graphqlUrl || tokens.apiUrl || 'http://hasura:8080/v1/graphql'; const hasuraAdminSecret = process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || tokens.adminSecret || tokens.hasuraAdminSecret; const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined; const dlobSource = String(process.env.DLOB_SOURCE || 'mevnode').trim() || 'mevnode'; const markets = envList('DLOB_MARKETS', 'SOL-PERP,DOGE-PERP,JUP-PERP'); const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000); const sizesUsd = envList('DLOB_SLIPPAGE_SIZES_USD', '10,25,50,100,250,500,1000') .map(parsePositiveNumber) .filter((n) => n != null) .map((n) => n) .filter((n, idx, arr) => arr.findIndex((x) => x === n) === idx) .sort((a, b) => a - b); const sizesUsdInt = sizesUsd.filter((n) => Number.isInteger(n)); const depthLevels = clampInt(process.env.DLOB_DEPTH, 1, 50, 25); const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000); const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000); if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`); if (!Number.isFinite(basePrecision) || basePrecision <= 0) throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`); return { hasuraUrl, hasuraAdminSecret, hasuraAuthToken, dlobSource, markets, pollMs, sizesUsd, sizesUsdInt, depthLevels, pricePrecision, basePrecision, }; } async function graphqlRequest(cfg, query, variables) { const headers = { 'content-type': 'application/json' }; if (cfg.hasuraAuthToken) { headers.authorization = `Bearer ${cfg.hasuraAuthToken}`; } else if (cfg.hasuraAdminSecret) { headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret; } else { throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)'); } const res = await fetch(cfg.hasuraUrl, { method: 'POST', headers, body: JSON.stringify({ query, variables }), signal: AbortSignal.timeout(15_000), }); const text = await res.text(); if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`); const json = JSON.parse(text); if (json.errors?.length) { throw new Error(json.errors.map((e) => e.message).join(' | ')); } return json.data; } function toNumberOrNull(value) { if (value == null) return null; if (typeof value === 'number') return Number.isFinite(value) ? value : null; if (typeof value === 'string') { const s = value.trim(); if (!s) return null; const n = Number(s); return Number.isFinite(n) ? n : null; } return null; } function normalizeLevels(raw) { if (raw == null) return []; if (Array.isArray(raw)) return raw; if (typeof raw === 'string') { const s = raw.trim(); if (!s) return []; try { const v = JSON.parse(s); return Array.isArray(v) ? v : []; } catch { return []; } } return []; } function parseScaledLevels(raw, pricePrecision, basePrecision) { const levels = normalizeLevels(raw); const out = []; for (const it of levels) { const priceInt = toNumberOrNull(it?.price); const sizeInt = toNumberOrNull(it?.size); if (priceInt == null || sizeInt == null) continue; const price = priceInt / pricePrecision; const base = sizeInt / basePrecision; if (!Number.isFinite(price) || !Number.isFinite(base)) continue; out.push({ price, base }); } return out; } function simulateFill(levels, sizeUsd) { let remainingUsd = sizeUsd; let filledUsd = 0; let filledBase = 0; let totalQuoteUsd = 0; let worstPrice = null; let levelsConsumed = 0; for (const l of levels) { if (remainingUsd <= 0) break; const levelUsd = l.base * l.price; if (levelUsd <= 0) continue; levelsConsumed += 1; worstPrice = l.price; const takeUsd = Math.min(remainingUsd, levelUsd); const takeBase = takeUsd / l.price; remainingUsd -= takeUsd; filledUsd += takeUsd; filledBase += takeBase; totalQuoteUsd += takeUsd; } const vwapPrice = filledBase > 0 ? totalQuoteUsd / filledBase : null; const fillPct = sizeUsd > 0 ? (filledUsd / sizeUsd) * 100 : null; return { filledUsd, filledBase, vwapPrice, worstPrice, levelsConsumed, fillPct, }; } function impactBps({ side, mid, vwap }) { if (mid == null || vwap == null || mid <= 0) return null; if (side === 'buy') return ((vwap / mid) - 1) * 10_000; if (side === 'sell') return (1 - (vwap / mid)) * 10_000; return null; } async function main() { const cfg = resolveConfig(); console.log( JSON.stringify( { service: 'dlob-slippage-worker', startedAt: getIsoNow(), hasuraUrl: cfg.hasuraUrl, hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none', dlobSource: cfg.dlobSource, markets: cfg.markets, pollMs: cfg.pollMs, sizesUsd: cfg.sizesUsd, depthLevels: cfg.depthLevels, }, null, 2 ) ); const lastSeenUpdatedAt = new Map(); // market -> updated_at while (true) { const updatedAt = getIsoNow(); try { const query = ` query DlobL2Latest($source: String!, $markets: [String!]!) { dlob_l2_latest(where: { source: { _eq: $source }, market_name: { _in: $markets } }) { source market_name market_type market_index ts slot best_bid_price best_ask_price bids asks updated_at } } `; const data = await graphqlRequest(cfg, query, { source: cfg.dlobSource, markets: cfg.markets }); const rows = Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : []; const objectsV1 = []; const objectsV2 = []; for (const row of rows) { const market = String(row?.market_name || '').trim(); if (!market) continue; const rowUpdatedAt = row?.updated_at ?? null; if (rowUpdatedAt && lastSeenUpdatedAt.get(market) === rowUpdatedAt) continue; if (rowUpdatedAt) lastSeenUpdatedAt.set(market, rowUpdatedAt); const bestBid = toNumberOrNull(row?.best_bid_price); const bestAsk = toNumberOrNull(row?.best_ask_price); if (bestBid == null || bestAsk == null) continue; const mid = (bestBid + bestAsk) / 2; if (!Number.isFinite(mid) || mid <= 0) continue; const bids = parseScaledLevels(row?.bids, cfg.pricePrecision, cfg.basePrecision) .slice() .sort((a, b) => b.price - a.price) .slice(0, cfg.depthLevels); const asks = parseScaledLevels(row?.asks, cfg.pricePrecision, cfg.basePrecision) .slice() .sort((a, b) => a.price - b.price) .slice(0, cfg.depthLevels); for (const sizeUsd of cfg.sizesUsd) { // buy consumes asks (worse prices as you go up) { const sim = simulateFill(asks, sizeUsd); const baseObj = { source: cfg.dlobSource, market_name: market, side: 'buy', market_type: row?.market_type ?? 'perp', market_index: row?.market_index ?? null, ts: row?.ts == null ? null : String(row.ts), slot: row?.slot == null ? null : String(row.slot), mid_price: String(mid), vwap_price: sim.vwapPrice == null ? null : String(sim.vwapPrice), worst_price: sim.worstPrice == null ? null : String(sim.worstPrice), filled_usd: String(sim.filledUsd), filled_base: String(sim.filledBase), impact_bps: impactBps({ side: 'buy', mid, vwap: sim.vwapPrice }), levels_consumed: sim.levelsConsumed, fill_pct: sim.fillPct == null ? null : String(sim.fillPct), raw: { depthLevels: cfg.depthLevels }, updated_at: updatedAt, }; objectsV2.push({ ...baseObj, size_usd: String(sizeUsd) }); if (Number.isInteger(sizeUsd)) objectsV1.push({ ...baseObj, size_usd: Math.trunc(sizeUsd) }); } // sell consumes bids (worse prices as you go down) { const sim = simulateFill(bids, sizeUsd); const baseObj = { source: cfg.dlobSource, market_name: market, side: 'sell', market_type: row?.market_type ?? 'perp', market_index: row?.market_index ?? null, ts: row?.ts == null ? null : String(row.ts), slot: row?.slot == null ? null : String(row.slot), mid_price: String(mid), vwap_price: sim.vwapPrice == null ? null : String(sim.vwapPrice), worst_price: sim.worstPrice == null ? null : String(sim.worstPrice), filled_usd: String(sim.filledUsd), filled_base: String(sim.filledBase), impact_bps: impactBps({ side: 'sell', mid, vwap: sim.vwapPrice }), levels_consumed: sim.levelsConsumed, fill_pct: sim.fillPct == null ? null : String(sim.fillPct), raw: { depthLevels: cfg.depthLevels }, updated_at: updatedAt, }; objectsV2.push({ ...baseObj, size_usd: String(sizeUsd) }); if (Number.isInteger(sizeUsd)) objectsV1.push({ ...baseObj, size_usd: Math.trunc(sizeUsd) }); } } } if (objectsV1.length) { const mutation = ` mutation UpsertSlippageV1($rows: [dlob_slippage_latest_insert_input!]!) { insert_dlob_slippage_latest( objects: $rows on_conflict: { constraint: dlob_slippage_latest_pkey update_columns: [ market_type market_index ts slot mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct raw updated_at ] } ) { affected_rows } } `; await graphqlRequest(cfg, mutation, { rows: objectsV1 }); } if (objectsV2.length) { const mutation = ` mutation UpsertSlippageV2($rows: [dlob_slippage_latest_v2_insert_input!]!) { insert_dlob_slippage_latest_v2( objects: $rows on_conflict: { constraint: dlob_slippage_latest_v2_pkey update_columns: [ market_type market_index ts slot mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct raw updated_at ] } ) { affected_rows } } `; await graphqlRequest(cfg, mutation, { rows: objectsV2 }); } } catch (err) { console.error(`[dlob-slippage-worker] ${String(err?.message || err)}`); } await sleep(cfg.pollMs); } } main().catch((err) => { console.error(String(err?.stack || err)); process.exitCode = 1; });