diff --git a/kustomize/base/dlob-depth-worker/worker.mjs b/kustomize/base/dlob-depth-worker/worker.mjs new file mode 100644 index 0000000..ec89eea --- /dev/null +++ b/kustomize/base/dlob-depth-worker/worker.mjs @@ -0,0 +1,311 @@ +import process from 'node:process'; +import { setTimeout as sleep } from 'node:timers/promises'; + +function getIsoNow() { + return new Date().toISOString(); +} + +function clampInt(value, min, max, fallback) { + const n = Number.parseInt(String(value ?? ''), 10); + if (!Number.isInteger(n)) return fallback; + return Math.min(max, Math.max(min, n)); +} + +function envList(name, fallbackCsv) { + const raw = process.env[name] ?? fallbackCsv; + return String(raw) + .split(',') + .map((s) => s.trim()) + .filter(Boolean); +} + +function envIntList(name, fallbackCsv) { + const out = []; + for (const item of envList(name, fallbackCsv)) { + const n = Number.parseInt(item, 10); + if (!Number.isFinite(n)) continue; + out.push(n); + } + return out.length ? out : envList(name, fallbackCsv).map((v) => Number.parseInt(v, 10)).filter(Number.isFinite); +} + +function toNumberOrNull(value) { + if (value == null) return null; + if (typeof value === 'number') return Number.isFinite(value) ? value : null; + if (typeof value === 'string') { + const s = value.trim(); + if (!s) return null; + const n = Number(s); + return Number.isFinite(n) ? n : null; + } + return null; +} + +function numStr(value) { + if (value == null) return null; + if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null; + if (typeof value === 'string') return value.trim() || null; + return null; +} + +function jsonNormalize(value) { + if (typeof value !== 'string') return value; + const s = value.trim(); + if (!s) return null; + try { + return JSON.parse(s); + } catch { + return value; + } +} + +function resolveConfig() { + const hasuraUrl = process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql'; + const hasuraAdminSecret = process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || undefined; + const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined; + + const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'); + const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000); + const bandsBps = envIntList('DLOB_DEPTH_BPS_BANDS', '5,10,20,50,100,200'); + + const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000); + const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000); + if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) + throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`); + if (!Number.isFinite(basePrecision) || basePrecision <= 0) + throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`); + + return { + hasuraUrl, + hasuraAdminSecret, + hasuraAuthToken, + markets, + pollMs, + bandsBps, + pricePrecision, + basePrecision, + }; +} + +async function graphqlRequest(cfg, query, variables) { + const headers = { 'content-type': 'application/json' }; + if (cfg.hasuraAuthToken) { + headers.authorization = `Bearer ${cfg.hasuraAuthToken}`; + } else if (cfg.hasuraAdminSecret) { + headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret; + } else { + throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET)'); + } + + const res = await fetch(cfg.hasuraUrl, { + method: 'POST', + headers, + body: JSON.stringify({ query, variables }), + signal: AbortSignal.timeout(10_000), + }); + const text = await res.text(); + if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`); + const json = JSON.parse(text); + if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | ')); + return json.data; +} + +function parseLevels(raw, pricePrecision, basePrecision, side) { + const v = jsonNormalize(raw); + if (!Array.isArray(v)) return []; + + const out = []; + for (const item of v) { + const priceInt = toNumberOrNull(item?.price); + const sizeInt = toNumberOrNull(item?.size); + if (priceInt == null || sizeInt == null) continue; + const price = priceInt / pricePrecision; + const size = sizeInt / basePrecision; + if (!Number.isFinite(price) || !Number.isFinite(size)) continue; + out.push({ price, size }); + } + + if (side === 'bid') out.sort((a, b) => b.price - a.price); + if (side === 'ask') out.sort((a, b) => a.price - b.price); + return out; +} + +function computeMid(bestBid, bestAsk, markPrice, oraclePrice) { + if (bestBid != null && bestAsk != null) return (bestBid + bestAsk) / 2; + if (markPrice != null) return markPrice; + if (oraclePrice != null) return oraclePrice; + return null; +} + +function computeBandDepth({ bids, asks, mid, bandBps }) { + if (mid == null || !(mid > 0)) { + return { bidBase: 0, askBase: 0, bidUsd: 0, askUsd: 0, imbalance: null }; + } + + const minBidPrice = mid * (1 - bandBps / 10_000); + const maxAskPrice = mid * (1 + bandBps / 10_000); + + let bidBase = 0; + let askBase = 0; + let bidUsd = 0; + let askUsd = 0; + + for (const lvl of bids) { + if (lvl.price < minBidPrice) break; + bidBase += lvl.size; + bidUsd += lvl.size * lvl.price; + } + + for (const lvl of asks) { + if (lvl.price > maxAskPrice) break; + askBase += lvl.size; + askUsd += lvl.size * lvl.price; + } + + const denom = bidUsd + askUsd; + const imbalance = denom > 0 ? (bidUsd - askUsd) / denom : null; + return { bidBase, askBase, bidUsd, askUsd, imbalance }; +} + +async function fetchL2Latest(cfg) { + const query = ` + query DlobL2Latest($markets: [String!]!) { + dlob_l2_latest(where: {market_name: {_in: $markets}}) { + market_name + market_type + market_index + ts + slot + mark_price + oracle_price + best_bid_price + best_ask_price + bids + asks + updated_at + } + } + `; + const data = await graphqlRequest(cfg, query, { markets: cfg.markets }); + return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : []; +} + +async function upsertDepth(cfg, rows) { + if (!rows.length) return; + const mutation = ` + mutation UpsertDlobDepth($rows: [dlob_depth_bps_latest_insert_input!]!) { + insert_dlob_depth_bps_latest( + objects: $rows + on_conflict: { + constraint: dlob_depth_bps_latest_pkey + update_columns: [ + market_type + market_index + ts + slot + mid_price + best_bid_price + best_ask_price + bid_base + ask_base + bid_usd + ask_usd + imbalance + raw + updated_at + ] + } + ) { affected_rows } + } + `; + await graphqlRequest(cfg, mutation, { rows }); +} + +async function main() { + const cfg = resolveConfig(); + const lastUpdatedAtByMarket = new Map(); + + console.log( + JSON.stringify( + { + service: 'dlob-depth-worker', + startedAt: getIsoNow(), + hasuraUrl: cfg.hasuraUrl, + hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none', + markets: cfg.markets, + pollMs: cfg.pollMs, + bandsBps: cfg.bandsBps, + pricePrecision: cfg.pricePrecision, + basePrecision: cfg.basePrecision, + }, + null, + 2 + ) + ); + + while (true) { + const rows = []; + + try { + const l2Rows = await fetchL2Latest(cfg); + for (const l2 of l2Rows) { + const market = String(l2.market_name || '').trim(); + if (!market) continue; + + const updatedAt = l2.updated_at || null; + if (updatedAt && lastUpdatedAtByMarket.get(market) === updatedAt) continue; + if (updatedAt) lastUpdatedAtByMarket.set(market, updatedAt); + + const bestBid = toNumberOrNull(l2.best_bid_price); + const bestAsk = toNumberOrNull(l2.best_ask_price); + const markPrice = toNumberOrNull(l2.mark_price); + const oraclePrice = toNumberOrNull(l2.oracle_price); + const mid = computeMid(bestBid, bestAsk, markPrice, oraclePrice); + + const bids = parseLevels(l2.bids, cfg.pricePrecision, cfg.basePrecision, 'bid'); + const asks = parseLevels(l2.asks, cfg.pricePrecision, cfg.basePrecision, 'ask'); + + for (const bandBps of cfg.bandsBps) { + const d = computeBandDepth({ bids, asks, mid, bandBps }); + rows.push({ + market_name: market, + band_bps: bandBps, + market_type: l2.market_type ? String(l2.market_type) : 'perp', + market_index: typeof l2.market_index === 'number' ? l2.market_index : null, + ts: l2.ts == null ? null : String(l2.ts), + slot: l2.slot == null ? null : String(l2.slot), + mid_price: numStr(mid), + best_bid_price: numStr(bestBid), + best_ask_price: numStr(bestAsk), + bid_base: numStr(d.bidBase), + ask_base: numStr(d.askBase), + bid_usd: numStr(d.bidUsd), + ask_usd: numStr(d.askUsd), + imbalance: numStr(d.imbalance), + raw: { + ref: 'mid', + pricePrecision: cfg.pricePrecision, + basePrecision: cfg.basePrecision, + }, + updated_at: updatedAt, + }); + } + } + } catch (err) { + console.error(`[dlob-depth-worker] fetch/compute: ${String(err?.message || err)}`); + } + + try { + await upsertDepth(cfg, rows); + } catch (err) { + console.error(`[dlob-depth-worker] upsert: ${String(err?.message || err)}`); + } + + await sleep(cfg.pollMs); + } +} + +main().catch((err) => { + console.error(String(err?.stack || err)); + process.exitCode = 1; +});