import fs from 'node:fs';
import * as http from 'node:http';
import * as https from 'node:https';
import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';

/**
 * Read and parse a JSON file synchronously.
 *
 * This is a deliberate best-effort read: any failure (missing file,
 * unreadable file, invalid JSON) yields `undefined` instead of throwing, so
 * callers can fall back to other configuration sources.
 *
 * @param {string} filePath - Path of the JSON file.
 * @returns {any} The parsed value, or `undefined` on any error.
 */
function readJsonFile(filePath) {
  try {
    const raw = fs.readFileSync(filePath, 'utf8');
    return JSON.parse(raw);
  } catch {
    return undefined;
  }
}

/**
 * @returns {string} The current time as an ISO-8601 string (`Date#toISOString`).
 */
function getIsoNow() {
  return new Date().toISOString();
}

/**
 * Parse a base-10 integer and clamp it into `[min, max]`.
 *
 * @param {unknown} value - Raw value (typically an environment variable string).
 * @param {number} min - Inclusive lower bound.
 * @param {number} max - Inclusive upper bound.
 * @param {number} fallback - Returned as-is (not clamped) when `value` does not parse.
 * @returns {number} The clamped integer, or `fallback`.
 */
function clampInt(value, min, max, fallback) {
  const n = Number.parseInt(String(value ?? ''), 10);
  if (!Number.isInteger(n)) return fallback;
  return Math.min(max, Math.max(min, n));
}

/**
 * Read a comma-separated list from an environment variable.
 *
 * The fallback is used only when the variable is unset (`??`); a variable set
 * to an empty string produces an empty list.
 *
 * @param {string} name - Environment variable name.
 * @param {string} fallbackCsv - Comma-separated default.
 * @returns {string[]} Trimmed, non-empty entries.
 */
function envList(name, fallbackCsv) {
  const raw = process.env[name] ?? fallbackCsv;
  return String(raw)
    .split(',')
    .map((s) => s.trim())
    .filter(Boolean);
}

/**
 * Read a boolean flag from an environment variable.
 *
 * Accepts `1/true/yes/y/on` and `0/false/no/n/off` (case-insensitive, trimmed).
 *
 * @param {string} name - Environment variable name.
 * @param {boolean} [fallback=false] - Returned when unset or unrecognised.
 * @returns {boolean}
 */
function envBool(name, fallback = false) {
  const raw = process.env[name];
  if (raw == null) return fallback;
  const v = String(raw).trim().toLowerCase();
  if (['1', 'true', 'yes', 'y', 'on'].includes(v)) return true;
  if (['0', 'false', 'no', 'n', 'off'].includes(v)) return false;
  return fallback;
}

/**
 * Build the worker configuration from environment variables, with an optional
 * JSON tokens file supplying the Hasura URL and admin secret.
 *
 * For each setting, environment variables are consulted first (in the order
 * written below), then the tokens file, then a hard-coded default.
 *
 * @returns {{
 *   hasuraUrl: string,
 *   hasuraAdminSecret: (string|undefined),
 *   hasuraAuthToken: (string|undefined),
 *   dlobHttpBase: string,
 *   dlobForceIpv6: boolean,
 *   markets: string[],
 *   depth: number,
 *   pollMs: number,
 *   pricePrecision: number,
 *   basePrecision: number,
 * }}
 * @throws {Error} When PRICE_PRECISION or BASE_PRECISION is not a finite positive number.
 */
function resolveConfig() {
  const tokensPath =
    process.env.HASURA_TOKENS_FILE ||
    process.env.TOKENS_FILE ||
    process.env.HASURA_CONFIG_FILE ||
    '/app/tokens/hasura.json';
  const tokens = readJsonFile(tokensPath) || {};

  const hasuraUrl =
    process.env.HASURA_GRAPHQL_URL ||
    tokens.graphqlUrl ||
    tokens.apiUrl ||
    'http://hasura:8080/v1/graphql';
  const hasuraAdminSecret =
    process.env.HASURA_ADMIN_SECRET ||
    process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
    tokens.adminSecret ||
    tokens.hasuraAdminSecret;
  const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;

  // Strip every trailing slash so `${dlobHttpBase}/l2` never contains `//`.
  const dlobHttpBase = String(
    process.env.DLOB_HTTP_URL || process.env.DLOB_HTTP_BASE || 'https://dlob.drift.trade'
  )
    .trim()
    .replace(/\/+$/, '');
  const dlobForceIpv6 = envBool('DLOB_FORCE_IPV6', false);

  const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
  const depth = clampInt(process.env.DLOB_DEPTH, 1, 50, 10);
  const pollMs = clampInt(process.env.DLOB_POLL_MS, 100, 10_000, 500);

  const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
  const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
  if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) {
    throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
  }
  if (!Number.isFinite(basePrecision) || basePrecision <= 0) {
    throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
  }

  return {
    hasuraUrl,
    hasuraAdminSecret,
    hasuraAuthToken,
    dlobHttpBase,
    dlobForceIpv6,
    markets,
    depth,
    pollMs,
    pricePrecision,
    basePrecision,
  };
}

/**
 * Perform a GET request with `node:http` / `node:https` and collect the body
 * as UTF-8 text. Used instead of `fetch` when the caller needs to pass an
 * address `family` to the request options.
 *
 * @param {string} url - Absolute http(s) URL.
 * @param {{ timeoutMs?: number, family?: number }} [options]
 *   `timeoutMs` is handed to `req.setTimeout` (default 5000);
 *   `family` is forwarded to the request options unchanged.
 * @returns {Promise<{ status: number, text: string }>} Status code (0 if absent) and body.
 * @throws {Error} On request errors, timeout, an invalid port, or a response
 *   that closes before it has been fully received.
 */
async function requestText(url, { timeoutMs, family } = {}) {
  const u = new URL(url);
  const isHttps = u.protocol === 'https:';
  const client = isHttps ? https : http;
  const port = u.port ? Number.parseInt(u.port, 10) : isHttps ? 443 : 80;
  if (!Number.isFinite(port)) throw new Error(`Invalid port for url: ${url}`);
  const effectiveTimeoutMs = timeoutMs ?? 5_000;

  return await new Promise((resolve, reject) => {
    const req = client.request(
      {
        protocol: u.protocol,
        hostname: u.hostname,
        port,
        path: `${u.pathname}${u.search}`,
        method: 'GET',
        family,
        servername: u.hostname,
        headers: {
          accept: 'application/json',
        },
      },
      (res) => {
        let data = '';
        res.setEncoding('utf8');
        res.on('data', (chunk) => {
          data += chunk;
        });
        res.on('end', () => {
          resolve({ status: res.statusCode ?? 0, text: data });
        });
        // Once headers have arrived, a dropped connection is reported on the
        // response rather than the request. Without these handlers the promise
        // would never settle and the caller would wait forever.
        res.on('error', reject);
        res.on('close', () => {
          // No-op after a normal 'end' (the promise is already resolved).
          if (!res.complete) reject(new Error('Response closed before it was fully received'));
        });
      }
    );
    req.on('error', reject);
    req.setTimeout(effectiveTimeoutMs, () => {
      const err = new Error(`Timeout after ${effectiveTimeoutMs}ms`);
      // Reject first so the timeout message wins even if the response had
      // already started and the destroy surfaces as a response-side error.
      reject(err);
      req.destroy(err);
    });
    req.end();
  });
}

/**
 * POST a GraphQL operation to Hasura and return its `data` payload.
 *
 * A bearer token takes precedence over the admin secret when both are set.
 *
 * @param {{ hasuraUrl: string, hasuraAuthToken?: string, hasuraAdminSecret?: string }} cfg
 * @param {string} query - GraphQL document.
 * @param {object} variables - Operation variables.
 * @returns {Promise<any>} The `data` field of the GraphQL response.
 * @throws {Error} When no credentials are configured, the HTTP status is not
 *   2xx, the body is not valid JSON, or the response carries GraphQL `errors`.
 */
async function graphqlRequest(cfg, query, variables) {
  const headers = { 'content-type': 'application/json' };
  if (cfg.hasuraAuthToken) {
    headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
  } else if (cfg.hasuraAdminSecret) {
    headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
  } else {
    throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)');
  }

  const res = await fetch(cfg.hasuraUrl, {
    method: 'POST',
    headers,
    body: JSON.stringify({ query, variables }),
    signal: AbortSignal.timeout(10_000),
  });
  const text = await res.text();
  if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);

  const json = JSON.parse(text);
  if (json.errors?.length) {
    throw new Error(json.errors.map((e) => e.message).join(' | '));
  }
  return json.data;
}

/**
 * Convert a number or numeric string to a finite number.
 *
 * @param {unknown} value
 * @returns {number|null} `null` for nullish, blank, non-finite, or other-typed input.
 */
function toNumberOrNull(value) {
  if (value == null) return null;
  if (typeof value === 'number') return Number.isFinite(value) ? value : null;
  if (typeof value === 'string') {
    const s = value.trim();
    if (!s) return null;
    const n = Number(s);
    return Number.isFinite(n) ? n : null;
  }
  return null;
}

/**
 * Render a value as a string suitable for a numeric column.
 *
 * Finite numbers are stringified; strings are trimmed and passed through
 * without numeric validation.
 *
 * @param {unknown} value
 * @returns {string|null} `null` for nullish, non-finite, blank, or other-typed input.
 */
function numStr(value) {
  if (value == null) return null;
  if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null;
  if (typeof value === 'string') return value.trim() || null;
  return null;
}

/**
 * Parse a raw value and divide it by `scale`.
 *
 * @param {unknown} valueRaw - Number or numeric string.
 * @param {number} scale - Divisor.
 * @returns {number|null} `null` when `valueRaw` cannot be parsed.
 */
function parseScaled(valueRaw, scale) {
  const n = toNumberOrNull(valueRaw);
  if (n == null) return null;
  return n / scale;
}

/**
 * Derive top-of-book and depth statistics from an L2 snapshot.
 *
 * Best bid/ask come from `bestBidPrice` / `bestAskPrice` when present,
 * otherwise from the first entry of `bids` / `asks`. Depth totals sum the
 * first `max(1, depth)` levels of each side; levels whose price or size does
 * not parse are skipped. `imbalance` is `(bidUsd - askUsd) / (bidUsd + askUsd)`,
 * or `null` when that denominator is not positive.
 *
 * @param {{ l2: any, depth: number, pricePrecision: number, basePrecision: number }} args
 *   Prices are divided by `pricePrecision`, sizes by `basePrecision`.
 * @returns {{
 *   bestBid: (number|null), bestAsk: (number|null), mid: (number|null),
 *   spreadAbs: (number|null), spreadBps: (number|null),
 *   markPrice: (number|null), oraclePrice: (number|null),
 *   depthLevels: number, bidBase: number, askBase: number,
 *   bidUsd: number, askUsd: number, imbalance: (number|null),
 * }}
 */
function computeStats({ l2, depth, pricePrecision, basePrecision }) {
  const bids = Array.isArray(l2?.bids) ? l2.bids : [];
  const asks = Array.isArray(l2?.asks) ? l2.asks : [];

  const bestBid = parseScaled(l2?.bestBidPrice ?? bids[0]?.price, pricePrecision);
  const bestAsk = parseScaled(l2?.bestAskPrice ?? asks[0]?.price, pricePrecision);
  const markPrice = parseScaled(l2?.markPrice, pricePrecision);
  const oraclePrice = parseScaled(l2?.oracleData?.price ?? l2?.oracle, pricePrecision);

  const hasBothSides = bestBid != null && bestAsk != null;
  const mid = hasBothSides ? (bestBid + bestAsk) / 2 : null;
  const spreadAbs = hasBothSides ? bestAsk - bestBid : null;
  const spreadBps = spreadAbs != null && mid != null && mid > 0 ? (spreadAbs / mid) * 10_000 : null;

  const levels = Math.max(1, depth);

  // Sum size (in base units) and size × price over the first `levels` entries.
  const sumSide = (side) => {
    let base = 0;
    let usd = 0;
    for (const level of side.slice(0, levels)) {
      const p = parseScaled(level?.price, pricePrecision);
      const s = toNumberOrNull(level?.size);
      if (p == null || s == null) continue;
      const levelBase = s / basePrecision;
      base += levelBase;
      usd += levelBase * p;
    }
    return { base, usd };
  };

  const { base: bidBase, usd: bidUsd } = sumSide(bids);
  const { base: askBase, usd: askUsd } = sumSide(asks);

  const denom = bidUsd + askUsd;
  const imbalance = denom > 0 ? (bidUsd - askUsd) / denom : null;

  return {
    bestBid,
    bestAsk,
    mid,
    spreadAbs,
    spreadBps,
    markPrice,
    oraclePrice,
    depthLevels: levels,
    bidBase,
    askBase,
    bidUsd,
    askUsd,
    imbalance,
  };
}

/**
 * Map an L2 snapshot to a row object for the `dlob_l2_latest` insert.
 *
 * Unlike `computeStats`, best bid/ask here are taken only from
 * `bestBidPrice` / `bestAskPrice`, with no fallback to the first book level.
 *
 * @param {{ l2: any, updatedAt: string, pricePrecision: number }} args
 * @returns {object} Insert object with snake_case column keys.
 */
function l2ToInsertObject({ l2, updatedAt, pricePrecision }) {
  const scaled = (raw) => numStr(parseScaled(raw, pricePrecision));
  return {
    market_name: String(l2.marketName),
    market_type: String(l2.marketType || 'perp'),
    market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null,
    ts: l2.ts == null ? null : String(l2.ts),
    slot: l2.slot == null ? null : String(l2.slot),
    mark_price: scaled(l2.markPrice),
    oracle_price: scaled(l2.oracleData?.price ?? l2.oracle),
    best_bid_price: scaled(l2.bestBidPrice),
    best_ask_price: scaled(l2.bestAskPrice),
    bids: l2.bids ?? null,
    asks: l2.asks ?? null,
    raw: l2,
    updated_at: updatedAt,
  };
}

/**
 * Map computed statistics to a row object for the `dlob_stats_latest` insert.
 * Numeric values are sent as strings; missing values as `null`.
 *
 * @param {{ l2: any, stats: ReturnType<typeof computeStats>, updatedAt: string }} args
 * @returns {object} Insert object with snake_case column keys.
 */
function statsToInsertObject({ l2, stats, updatedAt }) {
  const strOrNull = (v) => (v == null ? null : String(v));
  const finiteStrOrNull = (v) => (Number.isFinite(v) ? String(v) : null);
  return {
    market_name: String(l2.marketName),
    market_type: String(l2.marketType || 'perp'),
    market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null,
    ts: strOrNull(l2.ts),
    slot: strOrNull(l2.slot),
    mark_price: strOrNull(stats.markPrice),
    oracle_price: strOrNull(stats.oraclePrice),
    best_bid_price: strOrNull(stats.bestBid),
    best_ask_price: strOrNull(stats.bestAsk),
    mid_price: strOrNull(stats.mid),
    spread_abs: strOrNull(stats.spreadAbs),
    spread_bps: strOrNull(stats.spreadBps),
    depth_levels: stats.depthLevels,
    depth_bid_base: finiteStrOrNull(stats.bidBase),
    depth_ask_base: finiteStrOrNull(stats.askBase),
    depth_bid_usd: finiteStrOrNull(stats.bidUsd),
    depth_ask_usd: finiteStrOrNull(stats.askUsd),
    imbalance: strOrNull(stats.imbalance),
    raw: {
      spreadPct: l2.spreadPct ?? null,
      spreadQuote: l2.spreadQuote ?? null,
    },
    updated_at: updatedAt,
  };
}

/**
 * Fetch the L2 snapshot for one market from `${cfg.dlobHttpBase}/l2`.
 *
 * Uses `requestText` with `family: 6` when `cfg.dlobForceIpv6` is set,
 * otherwise the global `fetch`. Both paths use a 5 second timeout.
 *
 * @param {{ dlobHttpBase: string, depth: number, dlobForceIpv6: boolean }} cfg
 * @param {string} marketName - Sent as the `marketName` query parameter.
 * @returns {Promise<any>} The parsed JSON body.
 * @throws {Error} On transport failure, non-2xx status, or invalid JSON.
 */
async function fetchL2(cfg, marketName) {
  const u = new URL(`${cfg.dlobHttpBase}/l2`);
  u.searchParams.set('marketName', marketName);
  u.searchParams.set('depth', String(cfg.depth));
  const url = u.toString();

  if (cfg.dlobForceIpv6) {
    const { status, text } = await requestText(url, { timeoutMs: 5_000, family: 6 });
    if (status < 200 || status >= 300) throw new Error(`DLOB HTTP ${status}: ${text}`);
    return JSON.parse(text);
  }

  const res = await fetch(url, { signal: AbortSignal.timeout(5_000) });
  const text = await res.text();
  if (!res.ok) throw new Error(`DLOB HTTP ${res.status}: ${text}`);
  return JSON.parse(text);
}

/**
 * Upsert L2 rows and stats rows in a single GraphQL mutation.
 * Does nothing when both lists are empty.
 *
 * @param {object} cfg - Configuration passed through to `graphqlRequest`.
 * @param {object[]} l2Objects - Rows for `insert_dlob_l2_latest`.
 * @param {object[]} statsObjects - Rows for `insert_dlob_stats_latest`.
 * @returns {Promise<void>}
 * @throws {Error} Whatever `graphqlRequest` throws.
 */
async function upsertBatch(cfg, l2Objects, statsObjects) {
  if (!l2Objects.length && !statsObjects.length) return;

  // Whitespace is insignificant in a GraphQL document.
  const mutation = `
    mutation UpsertDlob($l2: [dlob_l2_latest_insert_input!]!, $stats: [dlob_stats_latest_insert_input!]!) {
      insert_dlob_l2_latest(
        objects: $l2
        on_conflict: {
          constraint: dlob_l2_latest_pkey
          update_columns: [
            market_type
            market_index
            ts
            slot
            mark_price
            oracle_price
            best_bid_price
            best_ask_price
            bids
            asks
            raw
            updated_at
          ]
        }
      ) {
        affected_rows
      }
      insert_dlob_stats_latest(
        objects: $stats
        on_conflict: {
          constraint: dlob_stats_latest_pkey
          update_columns: [
            market_type
            market_index
            ts
            slot
            mark_price
            oracle_price
            best_bid_price
            best_ask_price
            mid_price
            spread_abs
            spread_bps
            depth_levels
            depth_bid_base
            depth_ask_base
            depth_bid_usd
            depth_ask_usd
            imbalance
            raw
            updated_at
          ]
        }
      ) {
        affected_rows
      }
    }
  `;

  await graphqlRequest(cfg, mutation, { l2: l2Objects, stats: statsObjects });
}

/**
 * Worker entry point: resolve the configuration, log it (without secrets),
 * then poll forever. Each iteration fetches every configured market in
 * parallel, skips snapshots whose `ts` was already written, upserts the rest
 * in one batch, and sleeps for `cfg.pollMs`.
 *
 * Fetch and upsert failures are logged and do not stop the loop.
 *
 * @returns {Promise<never>} Only settles (by rejecting) if configuration fails.
 */
async function main() {
  const cfg = resolveConfig();
  // marketName -> ts of the last snapshot that was successfully upserted.
  const lastTsByMarket = new Map();

  console.log(
    JSON.stringify(
      {
        service: 'dlob-worker',
        startedAt: getIsoNow(),
        hasuraUrl: cfg.hasuraUrl,
        hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
        dlobHttpBase: cfg.dlobHttpBase,
        dlobForceIpv6: cfg.dlobForceIpv6,
        markets: cfg.markets,
        depth: cfg.depth,
        pollMs: cfg.pollMs,
      },
      null,
      2
    )
  );

  while (true) {
    const updatedAt = getIsoNow();
    const results = await Promise.allSettled(cfg.markets.map((m) => fetchL2(cfg, m)));

    const l2Objects = [];
    const statsObjects = [];
    // Timestamps staged in this iteration. They are committed to
    // lastTsByMarket only after the upsert succeeds, so a failed write is
    // retried on the next poll instead of being treated as already stored.
    const pendingTsByMarket = new Map();

    for (let i = 0; i < results.length; i += 1) {
      const market = cfg.markets[i];
      const r = results[i];
      if (r.status !== 'fulfilled') {
        console.error(`[dlob-worker] fetch ${market}: ${String(r.reason?.message || r.reason)}`);
        continue;
      }

      const l2 = r.value;
      if (!l2?.marketName) continue;

      const ts = l2.ts == null ? null : String(l2.ts);
      if (ts != null) {
        const alreadyStored = lastTsByMarket.get(l2.marketName) === ts;
        const alreadyStaged = pendingTsByMarket.get(l2.marketName) === ts;
        if (alreadyStored || alreadyStaged) continue;
        pendingTsByMarket.set(l2.marketName, ts);
      }

      const stats = computeStats({
        l2,
        depth: cfg.depth,
        pricePrecision: cfg.pricePrecision,
        basePrecision: cfg.basePrecision,
      });

      l2Objects.push(l2ToInsertObject({ l2, updatedAt, pricePrecision: cfg.pricePrecision }));
      statsObjects.push(statsToInsertObject({ l2, stats, updatedAt }));
    }

    try {
      await upsertBatch(cfg, l2Objects, statsObjects);
      for (const [marketName, ts] of pendingTsByMarket) {
        lastTsByMarket.set(marketName, ts);
      }
    } catch (err) {
      console.error(`[dlob-worker] upsert: ${String(err?.message || err)}`);
    }

    await sleep(cfg.pollMs);
  }
}

main().catch((err) => {
  console.error(String(err?.stack || err));
  process.exitCode = 1;
});