diff --git a/kustomize/base/dlob-worker/worker.mjs b/kustomize/base/dlob-worker/worker.mjs new file mode 100644 index 0000000..8fea21a --- /dev/null +++ b/kustomize/base/dlob-worker/worker.mjs @@ -0,0 +1,367 @@ +import fs from 'node:fs'; +import process from 'node:process'; +import { setTimeout as sleep } from 'node:timers/promises'; + +function readJsonFile(filePath) { + try { + const raw = fs.readFileSync(filePath, 'utf8'); + return JSON.parse(raw); + } catch { + return undefined; + } +} + +function getIsoNow() { + return new Date().toISOString(); +} + +function clampInt(value, min, max, fallback) { + const n = Number.parseInt(String(value ?? ''), 10); + if (!Number.isInteger(n)) return fallback; + return Math.min(max, Math.max(min, n)); +} + +function envList(name, fallbackCsv) { + const raw = process.env[name] ?? fallbackCsv; + return String(raw) + .split(',') + .map((s) => s.trim()) + .filter(Boolean); +} + +function resolveConfig() { + const tokensPath = + process.env.HASURA_TOKENS_FILE || + process.env.TOKENS_FILE || + process.env.HASURA_CONFIG_FILE || + '/app/tokens/hasura.json'; + const tokens = readJsonFile(tokensPath) || {}; + + const hasuraUrl = + process.env.HASURA_GRAPHQL_URL || + tokens.graphqlUrl || + tokens.apiUrl || + 'http://hasura:8080/v1/graphql'; + const hasuraAdminSecret = + process.env.HASURA_ADMIN_SECRET || + process.env.HASURA_GRAPHQL_ADMIN_SECRET || + tokens.adminSecret || + tokens.hasuraAdminSecret; + const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined; + + const dlobHttpBase = String(process.env.DLOB_HTTP_URL || process.env.DLOB_HTTP_BASE || 'https://dlob.drift.trade') + .trim() + .replace(/\/$/, ''); + + const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,BONK-PERP,BTC-PERP,ETH-PERP'); + const depth = clampInt(process.env.DLOB_DEPTH, 1, 50, 10); + const pollMs = clampInt(process.env.DLOB_POLL_MS, 100, 10_000, 500); + + const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000); + const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000); + if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) + throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`); + if (!Number.isFinite(basePrecision) || basePrecision <= 0) + throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`); + + return { + hasuraUrl, + hasuraAdminSecret, + hasuraAuthToken, + dlobHttpBase, + markets, + depth, + pollMs, + pricePrecision, + basePrecision, + }; +} + +async function graphqlRequest(cfg, query, variables) { + const headers = { 'content-type': 'application/json' }; + if (cfg.hasuraAuthToken) { + headers.authorization = `Bearer ${cfg.hasuraAuthToken}`; + } else if (cfg.hasuraAdminSecret) { + headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret; + } else { + throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)'); + } + + const res = await fetch(cfg.hasuraUrl, { + method: 'POST', + headers, + body: JSON.stringify({ query, variables }), + signal: AbortSignal.timeout(10_000), + }); + const text = await res.text(); + if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`); + const json = JSON.parse(text); + if (json.errors?.length) { + throw new Error(json.errors.map((e) => e.message).join(' | ')); + } + return json.data; +} + +function toNumberOrNull(value) { + if (value == null) return null; + if (typeof value === 'number') return Number.isFinite(value) ? value : null; + if (typeof value === 'string') { + const s = value.trim(); + if (!s) return null; + const n = Number(s); + return Number.isFinite(n) ? n : null; + } + return null; +} + +function numStr(value) { + if (value == null) return null; + if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null; + if (typeof value === 'string') return value.trim() || null; + return null; +} + +function parseScaled(valueRaw, scale) { + const n = toNumberOrNull(valueRaw); + if (n == null) return null; + return n / scale; +} + +function computeStats({ l2, depth, pricePrecision, basePrecision }) { + const bids = Array.isArray(l2?.bids) ? l2.bids : []; + const asks = Array.isArray(l2?.asks) ? l2.asks : []; + + const bestBid = parseScaled(l2?.bestBidPrice ?? bids?.[0]?.price, pricePrecision); + const bestAsk = parseScaled(l2?.bestAskPrice ?? asks?.[0]?.price, pricePrecision); + const markPrice = parseScaled(l2?.markPrice, pricePrecision); + const oraclePrice = parseScaled(l2?.oracleData?.price ?? l2?.oracle, pricePrecision); + + const mid = bestBid != null && bestAsk != null ? (bestBid + bestAsk) / 2 : null; + const spreadAbs = bestBid != null && bestAsk != null ? bestAsk - bestBid : null; + const spreadBps = spreadAbs != null && mid != null && mid > 0 ? (spreadAbs / mid) * 10_000 : null; + + const levels = Math.max(1, depth); + let bidBase = 0; + let askBase = 0; + let bidUsd = 0; + let askUsd = 0; + + for (let i = 0; i < Math.min(levels, bids.length); i += 1) { + const p = parseScaled(bids[i]?.price, pricePrecision); + const s = toNumberOrNull(bids[i]?.size); + if (p == null || s == null) continue; + const base = s / basePrecision; + bidBase += base; + bidUsd += base * p; + } + + for (let i = 0; i < Math.min(levels, asks.length); i += 1) { + const p = parseScaled(asks[i]?.price, pricePrecision); + const s = toNumberOrNull(asks[i]?.size); + if (p == null || s == null) continue; + const base = s / basePrecision; + askBase += base; + askUsd += base * p; + } + + const denom = bidUsd + askUsd; + const imbalance = denom > 0 ? (bidUsd - askUsd) / denom : null; + + return { + bestBid, + bestAsk, + mid, + spreadAbs, + spreadBps, + markPrice, + oraclePrice, + depthLevels: levels, + bidBase, + askBase, + bidUsd, + askUsd, + imbalance, + }; +} + +function l2ToInsertObject({ l2, updatedAt, pricePrecision }) { + return { + market_name: String(l2.marketName), + market_type: String(l2.marketType || 'perp'), + market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null, + ts: l2.ts == null ? null : String(l2.ts), + slot: l2.slot == null ? null : String(l2.slot), + mark_price: numStr(parseScaled(l2.markPrice, pricePrecision)), + oracle_price: numStr(parseScaled(l2.oracleData?.price ?? l2.oracle, pricePrecision)), + best_bid_price: numStr(parseScaled(l2.bestBidPrice, pricePrecision)), + best_ask_price: numStr(parseScaled(l2.bestAskPrice, pricePrecision)), + bids: l2.bids ?? null, + asks: l2.asks ?? null, + raw: l2 ?? null, + updated_at: updatedAt, + }; +} + +function statsToInsertObject({ l2, stats, updatedAt }) { + return { + market_name: String(l2.marketName), + market_type: String(l2.marketType || 'perp'), + market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null, + ts: l2.ts == null ? null : String(l2.ts), + slot: l2.slot == null ? null : String(l2.slot), + mark_price: stats.markPrice == null ? null : String(stats.markPrice), + oracle_price: stats.oraclePrice == null ? null : String(stats.oraclePrice), + best_bid_price: stats.bestBid == null ? null : String(stats.bestBid), + best_ask_price: stats.bestAsk == null ? null : String(stats.bestAsk), + mid_price: stats.mid == null ? null : String(stats.mid), + spread_abs: stats.spreadAbs == null ? null : String(stats.spreadAbs), + spread_bps: stats.spreadBps == null ? null : String(stats.spreadBps), + depth_levels: stats.depthLevels, + depth_bid_base: Number.isFinite(stats.bidBase) ? String(stats.bidBase) : null, + depth_ask_base: Number.isFinite(stats.askBase) ? String(stats.askBase) : null, + depth_bid_usd: Number.isFinite(stats.bidUsd) ? String(stats.bidUsd) : null, + depth_ask_usd: Number.isFinite(stats.askUsd) ? String(stats.askUsd) : null, + imbalance: stats.imbalance == null ? null : String(stats.imbalance), + raw: { + spreadPct: l2.spreadPct ?? null, + spreadQuote: l2.spreadQuote ?? null, + }, + updated_at: updatedAt, + }; +} + +async function fetchL2(cfg, marketName) { + const u = new URL(`${cfg.dlobHttpBase}/l2`); + u.searchParams.set('marketName', marketName); + u.searchParams.set('depth', String(cfg.depth)); + const res = await fetch(u.toString(), { signal: AbortSignal.timeout(5_000) }); + const text = await res.text(); + if (!res.ok) throw new Error(`DLOB HTTP ${res.status}: ${text}`); + return JSON.parse(text); +} + +async function upsertBatch(cfg, l2Objects, statsObjects) { + if (!l2Objects.length && !statsObjects.length) return; + + const mutation = ` + mutation UpsertDlob($l2: [dlob_l2_latest_insert_input!]!, $stats: [dlob_stats_latest_insert_input!]!) { + insert_dlob_l2_latest( + objects: $l2 + on_conflict: { + constraint: dlob_l2_latest_pkey + update_columns: [ + market_type + market_index + ts + slot + mark_price + oracle_price + best_bid_price + best_ask_price + bids + asks + raw + updated_at + ] + } + ) { affected_rows } + insert_dlob_stats_latest( + objects: $stats + on_conflict: { + constraint: dlob_stats_latest_pkey + update_columns: [ + market_type + market_index + ts + slot + mark_price + oracle_price + best_bid_price + best_ask_price + mid_price + spread_abs + spread_bps + depth_levels + depth_bid_base + depth_ask_base + depth_bid_usd + depth_ask_usd + imbalance + raw + updated_at + ] + } + ) { affected_rows } + } + `; + + await graphqlRequest(cfg, mutation, { l2: l2Objects, stats: statsObjects }); +} + +async function main() { + const cfg = resolveConfig(); + const lastTsByMarket = new Map(); + + console.log( + JSON.stringify( + { + service: 'dlob-worker', + startedAt: getIsoNow(), + hasuraUrl: cfg.hasuraUrl, + hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none', + dlobHttpBase: cfg.dlobHttpBase, + markets: cfg.markets, + depth: cfg.depth, + pollMs: cfg.pollMs, + }, + null, + 2 + ) + ); + + while (true) { + const updatedAt = getIsoNow(); + + const results = await Promise.allSettled(cfg.markets.map((m) => fetchL2(cfg, m))); + const l2Objects = []; + const statsObjects = []; + + for (let i = 0; i < results.length; i += 1) { + const market = cfg.markets[i]; + const r = results[i]; + if (r.status !== 'fulfilled') { + console.error(`[dlob-worker] fetch ${market}: ${String(r.reason?.message || r.reason)}`); + continue; + } + const l2 = r.value; + if (!l2?.marketName) continue; + + const ts = l2.ts == null ? null : String(l2.ts); + if (ts != null && lastTsByMarket.get(l2.marketName) === ts) continue; + if (ts != null) lastTsByMarket.set(l2.marketName, ts); + + const stats = computeStats({ + l2, + depth: cfg.depth, + pricePrecision: cfg.pricePrecision, + basePrecision: cfg.basePrecision, + }); + + l2Objects.push(l2ToInsertObject({ l2, updatedAt, pricePrecision: cfg.pricePrecision })); + statsObjects.push(statsToInsertObject({ l2, stats, updatedAt })); + } + + try { + await upsertBatch(cfg, l2Objects, statsObjects); + } catch (err) { + console.error(`[dlob-worker] upsert: ${String(err?.message || err)}`); + } + + await sleep(cfg.pollMs); + } +} + +main().catch((err) => { + console.error(String(err?.stack || err)); + process.exitCode = 1; +});