From 5c59b278080ed733194fe77230f0ca229890e420 Mon Sep 17 00:00:00 2001 From: u1 Date: Sat, 10 Jan 2026 11:30:41 +0000 Subject: [PATCH] feat(dlob): add dlob-slippage-worker script --- .../base/dlob-slippage-worker/worker.mjs | 379 ++++++++++++++++++ 1 file changed, 379 insertions(+) create mode 100644 kustomize/base/dlob-slippage-worker/worker.mjs diff --git a/kustomize/base/dlob-slippage-worker/worker.mjs b/kustomize/base/dlob-slippage-worker/worker.mjs new file mode 100644 index 0000000..19a8f51 --- /dev/null +++ b/kustomize/base/dlob-slippage-worker/worker.mjs @@ -0,0 +1,379 @@ +import process from 'node:process'; +import { setTimeout as sleep } from 'node:timers/promises'; + +function getIsoNow() { + return new Date().toISOString(); +} + +function clampInt(value, min, max, fallback) { + const n = Number.parseInt(String(value ?? ''), 10); + if (!Number.isInteger(n)) return fallback; + return Math.min(max, Math.max(min, n)); +} + +function envList(name, fallbackCsv) { + const raw = process.env[name] ?? fallbackCsv; + return String(raw) + .split(',') + .map((s) => s.trim()) + .filter(Boolean); +} + +function envIntList(name, fallbackCsv) { + const out = []; + for (const item of envList(name, fallbackCsv)) { + const n = Number.parseInt(item, 10); + if (!Number.isFinite(n)) continue; + out.push(n); + } + return out.length ? out : envList(name, fallbackCsv).map((v) => Number.parseInt(v, 10)).filter(Number.isFinite); +} + +function toNumberOrNull(value) { + if (value == null) return null; + if (typeof value === 'number') return Number.isFinite(value) ? value : null; + if (typeof value === 'string') { + const s = value.trim(); + if (!s) return null; + const n = Number(s); + return Number.isFinite(n) ? n : null; + } + return null; +} + +function numStr(value) { + if (value == null) return null; + if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null; + if (typeof value === 'string') return value.trim() || null; + return null; +} + +function jsonNormalize(value) { + if (typeof value !== 'string') return value; + const s = value.trim(); + if (!s) return null; + try { + return JSON.parse(s); + } catch { + return value; + } +} + +function resolveConfig() { + const hasuraUrl = process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql'; + const hasuraAdminSecret = process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || undefined; + const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined; + + const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'); + const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000); + const sizesUsd = envIntList('DLOB_SLIPPAGE_SIZES_USD', '100,500,1000,5000,10000,50000'); + + const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000); + const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000); + if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) + throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`); + if (!Number.isFinite(basePrecision) || basePrecision <= 0) + throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`); + + return { + hasuraUrl, + hasuraAdminSecret, + hasuraAuthToken, + markets, + pollMs, + sizesUsd, + pricePrecision, + basePrecision, + }; +} + +async function graphqlRequest(cfg, query, variables) { + const headers = { 'content-type': 'application/json' }; + if (cfg.hasuraAuthToken) { + headers.authorization = `Bearer ${cfg.hasuraAuthToken}`; + } else if (cfg.hasuraAdminSecret) { + headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret; + } else { + throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET)'); + } + + const res = await fetch(cfg.hasuraUrl, { + method: 'POST', + headers, + body: JSON.stringify({ query, variables }), + signal: AbortSignal.timeout(10_000), + }); + const text = await res.text(); + if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`); + const json = JSON.parse(text); + if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | ')); + return json.data; +} + +function parseLevels(raw, pricePrecision, basePrecision, side) { + const v = jsonNormalize(raw); + if (!Array.isArray(v)) return []; + + const out = []; + for (const item of v) { + const priceInt = toNumberOrNull(item?.price); + const sizeInt = toNumberOrNull(item?.size); + if (priceInt == null || sizeInt == null) continue; + const price = priceInt / pricePrecision; + const size = sizeInt / basePrecision; + if (!Number.isFinite(price) || !Number.isFinite(size)) continue; + out.push({ price, size }); + } + + if (side === 'bid') out.sort((a, b) => b.price - a.price); + if (side === 'ask') out.sort((a, b) => a.price - b.price); + return out; +} + +function computeMid(bestBid, bestAsk, markPrice, oraclePrice) { + if (bestBid != null && bestAsk != null) return (bestBid + bestAsk) / 2; + if (markPrice != null) return markPrice; + if (oraclePrice != null) return oraclePrice; + return null; +} + +function simulateBuy(asks, mid, sizeUsd) { + let remainingUsd = sizeUsd; + let filledUsd = 0; + let filledBase = 0; + let worstPrice = null; + let levelsConsumed = 0; + + for (const lvl of asks) { + if (!(remainingUsd > 0)) break; + if (!(lvl.price > 0) || !(lvl.size > 0)) continue; + + const maxBase = remainingUsd / lvl.price; + const takeBase = Math.min(lvl.size, maxBase); + if (!(takeBase > 0)) continue; + + const cost = takeBase * lvl.price; + filledUsd += cost; + filledBase += takeBase; + remainingUsd -= cost; + worstPrice = lvl.price; + levelsConsumed += 1; + } + + const vwap = filledBase > 0 ? filledUsd / filledBase : null; + const impactBps = vwap != null && mid != null && mid > 0 ? (vwap / mid - 1) * 10_000 : null; + const fillPct = sizeUsd > 0 ? filledUsd / sizeUsd : null; + + return { vwap, worstPrice, filledUsd, filledBase, impactBps, levelsConsumed, fillPct }; +} + +function simulateSell(bids, mid, sizeUsd) { + if (mid == null || !(mid > 0)) { + return { vwap: null, worstPrice: null, filledUsd: 0, filledBase: 0, impactBps: null, levelsConsumed: 0, fillPct: null }; + } + + const baseTarget = sizeUsd / mid; + let remainingBase = baseTarget; + let proceedsUsd = 0; + let filledBase = 0; + let worstPrice = null; + let levelsConsumed = 0; + + for (const lvl of bids) { + if (!(remainingBase > 0)) break; + if (!(lvl.price > 0) || !(lvl.size > 0)) continue; + + const takeBase = Math.min(lvl.size, remainingBase); + if (!(takeBase > 0)) continue; + + const proceeds = takeBase * lvl.price; + proceedsUsd += proceeds; + filledBase += takeBase; + remainingBase -= takeBase; + worstPrice = lvl.price; + levelsConsumed += 1; + } + + const vwap = filledBase > 0 ? proceedsUsd / filledBase : null; + const impactBps = vwap != null && mid > 0 ? (1 - vwap / mid) * 10_000 : null; + const fillPct = baseTarget > 0 ? filledBase / baseTarget : null; + + return { vwap, worstPrice, filledUsd: proceedsUsd, filledBase, impactBps, levelsConsumed, fillPct }; +} + +async function fetchL2Latest(cfg) { + const query = ` + query DlobL2Latest($markets: [String!]!) { + dlob_l2_latest(where: {market_name: {_in: $markets}}) { + market_name + market_type + market_index + ts + slot + mark_price + oracle_price + best_bid_price + best_ask_price + bids + asks + updated_at + } + } + `; + const data = await graphqlRequest(cfg, query, { markets: cfg.markets }); + return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : []; +} + +async function upsertSlippage(cfg, rows) { + if (!rows.length) return; + const mutation = ` + mutation UpsertDlobSlippage($rows: [dlob_slippage_latest_insert_input!]!) { + insert_dlob_slippage_latest( + objects: $rows + on_conflict: { + constraint: dlob_slippage_latest_pkey + update_columns: [ + market_type + market_index + ts + slot + mid_price + best_bid_price + best_ask_price + vwap_price + worst_price + filled_usd + filled_base + impact_bps + levels_consumed + fill_pct + raw + updated_at + ] + } + ) { affected_rows } + } + `; + await graphqlRequest(cfg, mutation, { rows }); +} + +async function main() { + const cfg = resolveConfig(); + const lastUpdatedAtByMarket = new Map(); + + console.log( + JSON.stringify( + { + service: 'dlob-slippage-worker', + startedAt: getIsoNow(), + hasuraUrl: cfg.hasuraUrl, + hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none', + markets: cfg.markets, + pollMs: cfg.pollMs, + sizesUsd: cfg.sizesUsd, + pricePrecision: cfg.pricePrecision, + basePrecision: cfg.basePrecision, + }, + null, + 2 + ) + ); + + while (true) { + const rows = []; + + try { + const l2Rows = await fetchL2Latest(cfg); + for (const l2 of l2Rows) { + const market = String(l2.market_name || '').trim(); + if (!market) continue; + + const updatedAt = l2.updated_at || null; + if (updatedAt && lastUpdatedAtByMarket.get(market) === updatedAt) continue; + if (updatedAt) lastUpdatedAtByMarket.set(market, updatedAt); + + const bestBid = toNumberOrNull(l2.best_bid_price); + const bestAsk = toNumberOrNull(l2.best_ask_price); + const markPrice = toNumberOrNull(l2.mark_price); + const oraclePrice = toNumberOrNull(l2.oracle_price); + const mid = computeMid(bestBid, bestAsk, markPrice, oraclePrice); + + const bids = parseLevels(l2.bids, cfg.pricePrecision, cfg.basePrecision, 'bid'); + const asks = parseLevels(l2.asks, cfg.pricePrecision, cfg.basePrecision, 'ask'); + + for (const sizeUsd of cfg.sizesUsd) { + const buy = simulateBuy(asks, mid, sizeUsd); + rows.push({ + market_name: market, + side: 'buy', + size_usd: sizeUsd, + market_type: l2.market_type ? String(l2.market_type) : 'perp', + market_index: typeof l2.market_index === 'number' ? l2.market_index : null, + ts: l2.ts == null ? null : String(l2.ts), + slot: l2.slot == null ? null : String(l2.slot), + mid_price: numStr(mid), + best_bid_price: numStr(bestBid), + best_ask_price: numStr(bestAsk), + vwap_price: numStr(buy.vwap), + worst_price: numStr(buy.worstPrice), + filled_usd: numStr(buy.filledUsd), + filled_base: numStr(buy.filledBase), + impact_bps: numStr(buy.impactBps), + levels_consumed: buy.levelsConsumed, + fill_pct: numStr(buy.fillPct), + raw: { + ref: 'mid', + units: 'usd', + pricePrecision: cfg.pricePrecision, + basePrecision: cfg.basePrecision, + }, + updated_at: updatedAt, + }); + + const sell = simulateSell(bids, mid, sizeUsd); + rows.push({ + market_name: market, + side: 'sell', + size_usd: sizeUsd, + market_type: l2.market_type ? String(l2.market_type) : 'perp', + market_index: typeof l2.market_index === 'number' ? l2.market_index : null, + ts: l2.ts == null ? null : String(l2.ts), + slot: l2.slot == null ? null : String(l2.slot), + mid_price: numStr(mid), + best_bid_price: numStr(bestBid), + best_ask_price: numStr(bestAsk), + vwap_price: numStr(sell.vwap), + worst_price: numStr(sell.worstPrice), + filled_usd: numStr(sell.filledUsd), + filled_base: numStr(sell.filledBase), + impact_bps: numStr(sell.impactBps), + levels_consumed: sell.levelsConsumed, + fill_pct: numStr(sell.fillPct), + raw: { + ref: 'mid', + units: 'usd', + pricePrecision: cfg.pricePrecision, + basePrecision: cfg.basePrecision, + }, + updated_at: updatedAt, + }); + } + } + } catch (err) { + console.error(`[dlob-slippage-worker] fetch/compute: ${String(err?.message || err)}`); + } + + try { + await upsertSlippage(cfg, rows); + } catch (err) { + console.error(`[dlob-slippage-worker] upsert: ${String(err?.message || err)}`); + } + + await sleep(cfg.pollMs); + } +} + +main().catch((err) => { + console.error(String(err?.stack || err)); + process.exitCode = 1; +});