import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';

/**
 * Current wall-clock time as an ISO-8601 string.
 * @returns {string}
 */
function getIsoNow() {
  return new Date().toISOString();
}

/**
 * Parses `value` as a base-10 integer and clamps it into `[min, max]`.
 * @param {unknown} value - Raw value (typically an env string); nullish is treated as ''.
 * @param {number} min - Inclusive lower bound.
 * @param {number} max - Inclusive upper bound.
 * @param {number} fallback - Returned as-is when `value` does not parse to an integer.
 * @returns {number}
 */
function clampInt(value, min, max, fallback) {
  const n = Number.parseInt(String(value ?? ''), 10);
  if (!Number.isInteger(n)) return fallback;
  return Math.min(max, Math.max(min, n));
}

/**
 * Splits a comma-separated string into trimmed, non-empty items.
 * @param {unknown} csv
 * @returns {string[]}
 */
function splitCsv(csv) {
  return String(csv)
    .split(',')
    .map((s) => s.trim())
    .filter(Boolean);
}

/**
 * Reads a comma-separated env var as a list of trimmed, non-empty strings.
 * `fallbackCsv` is used only when the variable is unset; a variable set to
 * the empty string yields an empty list.
 * @param {string} name - Environment variable name.
 * @param {string} fallbackCsv - CSV used when the variable is not set.
 * @returns {string[]}
 */
function envList(name, fallbackCsv) {
  const raw = process.env[name] ?? fallbackCsv;
  return splitCsv(raw);
}

/**
 * Reads a comma-separated env var as a list of base-10 integers.
 *
 * Items that do not parse are dropped. If the variable is set but contains no
 * valid integer at all, the integers from `fallbackCsv` are returned instead
 * of an empty list. Duplicates are removed (first occurrence wins) because the
 * values are used as part of an upsert batch, where repeated keys in a single
 * statement would presumably conflict — TODO confirm against the table's
 * primary key.
 *
 * @param {string} name - Environment variable name.
 * @param {string} fallbackCsv - CSV of defaults.
 * @returns {number[]}
 */
function envIntList(name, fallbackCsv) {
  const toInts = (items) => items.map((item) => Number.parseInt(item, 10)).filter(Number.isFinite);

  const fromEnv = toInts(envList(name, fallbackCsv));
  const values = fromEnv.length > 0 ? fromEnv : toInts(splitCsv(fallbackCsv ?? ''));
  return [...new Set(values)];
}

/**
 * Converts a number or numeric string to a finite number.
 * @param {unknown} value
 * @returns {number | null} `null` for nullish, blank, non-finite or non-numeric input.
 */
function toNumberOrNull(value) {
  if (value == null) return null;
  if (typeof value === 'number') return Number.isFinite(value) ? value : null;
  if (typeof value === 'string') {
    const s = value.trim();
    if (!s) return null;
    const n = Number(s);
    return Number.isFinite(n) ? n : null;
  }
  return null;
}

/**
 * Renders a value as a string for the upsert payload.
 * Finite numbers are stringified; strings are trimmed (blank becomes `null`,
 * no numeric validation is done); everything else becomes `null`.
 * @param {unknown} value
 * @returns {string | null}
 */
function numStr(value) {
  if (value == null) return null;
  if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null;
  if (typeof value === 'string') return value.trim() || null;
  return null;
}

/**
 * If `value` is a string, tries to JSON-decode it; otherwise returns it untouched.
 * A blank string yields `null`; a string that is not valid JSON is returned as-is.
 * @param {unknown} value
 * @returns {unknown}
 */
function jsonNormalize(value) {
  if (typeof value !== 'string') return value;
  const s = value.trim();
  if (!s) return null;
  try {
    return JSON.parse(s);
  } catch {
    return value;
  }
}

/**
 * Builds the worker configuration from environment variables.
 * @returns {{
 *   hasuraUrl: string,
 *   hasuraAdminSecret: string | undefined,
 *   hasuraAuthToken: string | undefined,
 *   markets: string[],
 *   pollMs: number,
 *   bandsBps: number[],
 *   pricePrecision: number,
 *   basePrecision: number,
 * }}
 * @throws {Error} When PRICE_PRECISION or BASE_PRECISION is not a positive finite number.
 */
function resolveConfig() {
  const hasuraUrl = process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql';
  const hasuraAdminSecret =
    process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || undefined;
  const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;

  const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
  const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
  const bandsBps = envIntList('DLOB_DEPTH_BPS_BANDS', '5,10,20,50,100,200');

  const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
  const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
  if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) {
    throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
  }
  if (!Number.isFinite(basePrecision) || basePrecision <= 0) {
    throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
  }

  return {
    hasuraUrl,
    hasuraAdminSecret,
    hasuraAuthToken,
    markets,
    pollMs,
    bandsBps,
    pricePrecision,
    basePrecision,
  };
}

/**
 * POSTs a GraphQL operation to Hasura and returns its `data` payload.
 * A bearer token takes precedence over the admin secret. Each request is
 * aborted after 10 seconds.
 *
 * @param {{hasuraUrl: string, hasuraAuthToken?: string, hasuraAdminSecret?: string}} cfg
 * @param {string} query - GraphQL document.
 * @param {object} variables - Operation variables.
 * @returns {Promise<any>} The `data` field of the response.
 * @throws {Error} On missing credentials, non-2xx status, a non-JSON body,
 *   or a response carrying GraphQL `errors`.
 */
async function graphqlRequest(cfg, query, variables) {
  const headers = { 'content-type': 'application/json' };
  if (cfg.hasuraAuthToken) {
    headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
  } else if (cfg.hasuraAdminSecret) {
    headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
  } else {
    throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET)');
  }

  const res = await fetch(cfg.hasuraUrl, {
    method: 'POST',
    headers,
    body: JSON.stringify({ query, variables }),
    signal: AbortSignal.timeout(10_000),
  });

  // Read as text first so the raw body can be included in error messages.
  const text = await res.text();
  if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);

  let json;
  try {
    json = JSON.parse(text);
  } catch (err) {
    // A 2xx with a non-JSON body (e.g. a proxy page) would otherwise surface
    // as a bare SyntaxError with no hint of where it came from.
    throw new Error(`Hasura returned a non-JSON response: ${text.slice(0, 200)}`, { cause: err });
  }

  if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | '));
  return json.data;
}

/**
 * Parses one side of an order book into `{ price, size }` levels.
 *
 * `raw` may be an array or a JSON-encoded string of one. Each item's `price`
 * and `size` are divided by `pricePrecision` and `basePrecision` respectively;
 * items with missing or non-numeric fields are skipped.
 *
 * @param {unknown} raw - Array of `{ price, size }` or a JSON string of it.
 * @param {number} pricePrecision - Divisor applied to `price`.
 * @param {number} basePrecision - Divisor applied to `size`.
 * @param {'bid' | 'ask' | string} side - 'bid' sorts by price descending,
 *   'ask' ascending; any other value leaves input order.
 * @returns {{price: number, size: number}[]} Empty when `raw` is not an array.
 */
function parseLevels(raw, pricePrecision, basePrecision, side) {
  const v = jsonNormalize(raw);
  if (!Array.isArray(v)) return [];

  const out = [];
  for (const item of v) {
    const priceInt = toNumberOrNull(item?.price);
    const sizeInt = toNumberOrNull(item?.size);
    if (priceInt == null || sizeInt == null) continue;

    const price = priceInt / pricePrecision;
    const size = sizeInt / basePrecision;
    if (!Number.isFinite(price) || !Number.isFinite(size)) continue;
    out.push({ price, size });
  }

  // Best price first on each side; computeBandDepth relies on this ordering.
  if (side === 'bid') out.sort((a, b) => b.price - a.price);
  if (side === 'ask') out.sort((a, b) => a.price - b.price);
  return out;
}

/**
 * Picks the reference mid price: the bid/ask midpoint when both are present,
 * otherwise the mark price, otherwise the oracle price.
 * @param {number | null} bestBid
 * @param {number | null} bestAsk
 * @param {number | null} markPrice
 * @param {number | null} oraclePrice
 * @returns {number | null} `null` when none of the inputs is available.
 */
function computeMid(bestBid, bestAsk, markPrice, oraclePrice) {
  if (bestBid != null && bestAsk != null) return (bestBid + bestAsk) / 2;
  if (markPrice != null) return markPrice;
  if (oraclePrice != null) return oraclePrice;
  return null;
}

/**
 * Sums the liquidity within `bandBps` basis points of `mid` on each side.
 *
 * `bids` must be sorted by price descending and `asks` ascending (as produced
 * by `parseLevels`): iteration stops at the first level outside the band.
 *
 * @param {object} args
 * @param {{price: number, size: number}[]} args.bids
 * @param {{price: number, size: number}[]} args.asks
 * @param {number | null} args.mid - Reference price; must be > 0 to compute anything.
 * @param {number} args.bandBps - Band half-width in basis points.
 * @returns {{bidBase: number, askBase: number, bidUsd: number, askUsd: number,
 *   imbalance: number | null}} `imbalance` is `(bidUsd - askUsd) / (bidUsd + askUsd)`,
 *   or `null` when the band is empty or `mid` is unusable.
 */
function computeBandDepth({ bids, asks, mid, bandBps }) {
  if (mid == null || !(mid > 0)) {
    return { bidBase: 0, askBase: 0, bidUsd: 0, askUsd: 0, imbalance: null };
  }

  const minBidPrice = mid * (1 - bandBps / 10_000);
  const maxAskPrice = mid * (1 + bandBps / 10_000);

  let bidBase = 0;
  let askBase = 0;
  let bidUsd = 0;
  let askUsd = 0;

  for (const lvl of bids) {
    if (lvl.price < minBidPrice) break;
    bidBase += lvl.size;
    bidUsd += lvl.size * lvl.price;
  }
  for (const lvl of asks) {
    if (lvl.price > maxAskPrice) break;
    askBase += lvl.size;
    askUsd += lvl.size * lvl.price;
  }

  const denom = bidUsd + askUsd;
  const imbalance = denom > 0 ? (bidUsd - askUsd) / denom : null;
  return { bidBase, askBase, bidUsd, askUsd, imbalance };
}

/**
 * Fetches the latest L2 snapshot rows for the configured markets.
 * @param {object} cfg - Worker config (uses `markets` plus the Hasura settings).
 * @returns {Promise<object[]>} Rows of `dlob_l2_latest`; empty when the field is absent.
 */
async function fetchL2Latest(cfg) {
  const query = `
    query DlobL2Latest($markets: [String!]!) {
      dlob_l2_latest(where: {market_name: {_in: $markets}}) {
        market_name
        market_type
        market_index
        ts
        slot
        mark_price
        oracle_price
        best_bid_price
        best_ask_price
        bids
        asks
        updated_at
      }
    }
  `;
  const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
  return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
}

/**
 * Upserts depth rows into `dlob_depth_bps_latest`. No request is made for an
 * empty batch.
 * @param {object} cfg - Worker config (Hasura settings).
 * @param {object[]} rows - Insert objects as produced by `buildDepthRows`.
 * @returns {Promise<void>}
 */
async function upsertDepth(cfg, rows) {
  if (!rows.length) return;
  const mutation = `
    mutation UpsertDlobDepth($rows: [dlob_depth_bps_latest_insert_input!]!) {
      insert_dlob_depth_bps_latest(
        objects: $rows
        on_conflict: {
          constraint: dlob_depth_bps_latest_pkey
          update_columns: [
            market_type
            market_index
            ts
            slot
            mid_price
            best_bid_price
            best_ask_price
            bid_base
            ask_base
            bid_usd
            ask_usd
            imbalance
            raw
            updated_at
          ]
        }
      ) {
        affected_rows
      }
    }
  `;
  await graphqlRequest(cfg, mutation, { rows });
}

/**
 * Turns L2 snapshot rows into depth rows, one per (market, band).
 *
 * Markets whose `updated_at` equals the value recorded in
 * `lastUpdatedAtByMarket` are skipped. This function does NOT mutate that map:
 * the new watermarks are returned in `pending` so the caller can commit them
 * only once the rows have actually been written.
 *
 * NOTE(review): `best_bid_price`, `best_ask_price`, `mark_price` and
 * `oracle_price` are used unscaled for the mid price, while level prices are
 * divided by `cfg.pricePrecision` — confirm the upstream table stores the
 * former already scaled.
 *
 * @param {{bandsBps: number[], pricePrecision: number, basePrecision: number}} cfg
 * @param {object[]} l2Rows - Rows as returned by `fetchL2Latest`.
 * @param {Map<string, string>} lastUpdatedAtByMarket - Last written `updated_at` per market.
 * @returns {{rows: object[], pending: Map<string, string>}}
 */
function buildDepthRows(cfg, l2Rows, lastUpdatedAtByMarket) {
  const rows = [];
  const pending = new Map();

  for (const l2 of l2Rows) {
    const market = String(l2.market_name || '').trim();
    if (!market) continue;

    const updatedAt = l2.updated_at || null;
    if (updatedAt && lastUpdatedAtByMarket.get(market) === updatedAt) continue;
    if (updatedAt) pending.set(market, updatedAt);

    const bestBid = toNumberOrNull(l2.best_bid_price);
    const bestAsk = toNumberOrNull(l2.best_ask_price);
    const markPrice = toNumberOrNull(l2.mark_price);
    const oraclePrice = toNumberOrNull(l2.oracle_price);
    const mid = computeMid(bestBid, bestAsk, markPrice, oraclePrice);

    const bids = parseLevels(l2.bids, cfg.pricePrecision, cfg.basePrecision, 'bid');
    const asks = parseLevels(l2.asks, cfg.pricePrecision, cfg.basePrecision, 'ask');

    for (const bandBps of cfg.bandsBps) {
      const d = computeBandDepth({ bids, asks, mid, bandBps });
      rows.push({
        market_name: market,
        band_bps: bandBps,
        market_type: l2.market_type ? String(l2.market_type) : 'perp',
        market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
        ts: l2.ts == null ? null : String(l2.ts),
        slot: l2.slot == null ? null : String(l2.slot),
        mid_price: numStr(mid),
        best_bid_price: numStr(bestBid),
        best_ask_price: numStr(bestAsk),
        bid_base: numStr(d.bidBase),
        ask_base: numStr(d.askBase),
        bid_usd: numStr(d.bidUsd),
        ask_usd: numStr(d.askUsd),
        imbalance: numStr(d.imbalance),
        raw: {
          ref: 'mid',
          pricePrecision: cfg.pricePrecision,
          basePrecision: cfg.basePrecision,
        },
        updated_at: updatedAt,
      });
    }
  }

  return { rows, pending };
}

/**
 * Worker entry point: logs the effective configuration, then polls forever.
 * Each tick fetches the latest L2 snapshots, computes per-band depth and
 * upserts it. Errors in either phase are logged and the loop continues.
 * @returns {Promise<never>}
 */
async function main() {
  const cfg = resolveConfig();
  const lastUpdatedAtByMarket = new Map();

  console.log(
    JSON.stringify(
      {
        service: 'dlob-depth-worker',
        startedAt: getIsoNow(),
        hasuraUrl: cfg.hasuraUrl,
        hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
        markets: cfg.markets,
        pollMs: cfg.pollMs,
        bandsBps: cfg.bandsBps,
        pricePrecision: cfg.pricePrecision,
        basePrecision: cfg.basePrecision,
      },
      null,
      2
    )
  );

  while (true) {
    let rows = [];
    let pending = new Map();

    try {
      const l2Rows = await fetchL2Latest(cfg);
      ({ rows, pending } = buildDepthRows(cfg, l2Rows, lastUpdatedAtByMarket));
    } catch (err) {
      console.error(`[dlob-depth-worker] fetch/compute: ${String(err?.message || err)}`);
    }

    try {
      await upsertDepth(cfg, rows);
      // Advance the watermarks only after a successful write; otherwise a
      // failed upsert would be skipped on the next tick and the rows lost
      // until the market's snapshot changes again.
      for (const [market, updatedAt] of pending) {
        lastUpdatedAtByMarket.set(market, updatedAt);
      }
    } catch (err) {
      console.error(`[dlob-depth-worker] upsert: ${String(err?.message || err)}`);
    }

    await sleep(cfg.pollMs);
  }
}

main().catch((err) => {
  console.error(String(err?.stack || err));
  process.exitCode = 1;
});