diff --git a/kustomize/overlays/staging/dlob-ingestor.mjs b/kustomize/overlays/staging/dlob-ingestor.mjs new file mode 100644 index 0000000..ac9e82f --- /dev/null +++ b/kustomize/overlays/staging/dlob-ingestor.mjs @@ -0,0 +1,219 @@ +import process from 'node:process'; +import { setTimeout as sleep } from 'node:timers/promises'; + +function getIsoNow() { + return new Date().toISOString(); +} + +function envString(name, fallback) { + const v = process.env[name]; + if (v == null) return fallback; + const s = String(v).trim(); + return s ? s : fallback; +} + +function envInt(name, fallback, { min, max } = {}) { + const v = process.env[name]; + if (v == null) return fallback; + const n = Number.parseInt(String(v), 10); + if (!Number.isFinite(n)) return fallback; + const low = typeof min === 'number' ? min : n; + const high = typeof max === 'number' ? max : n; + return Math.max(low, Math.min(high, n)); +} + +function envList(name, fallbackCsv) { + const raw = process.env[name] ?? fallbackCsv; + return String(raw) + .split(',') + .map((s) => s.trim()) + .filter(Boolean); +} + +function toIntOrNull(v) { + if (v == null) return null; + if (typeof v === 'number') return Number.isFinite(v) ? Math.trunc(v) : null; + if (typeof v === 'string') { + const s = v.trim(); + if (!s) return null; + const n = Number.parseInt(s, 10); + return Number.isFinite(n) ? n : null; + } + return null; +} + +function numStr(v) { + if (v == null) return null; + if (typeof v === 'number') return Number.isFinite(v) ? String(v) : null; + if (typeof v === 'string') { + const s = v.trim(); + return s ? s : null; + } + return null; +} + +function isoFromEpochMs(v) { + const n = typeof v === 'number' ? v : typeof v === 'string' ? Number(v.trim()) : NaN; + if (!Number.isFinite(n) || n <= 0) return null; + const d = new Date(n); + const ms = d.getTime(); + if (!Number.isFinite(ms)) return null; + return d.toISOString(); +} + +function resolveConfig() { + const hasuraUrl = envString('HASURA_GRAPHQL_URL', 'http://hasura:8080/v1/graphql'); + const hasuraAdminSecret = envString('HASURA_ADMIN_SECRET', ''); + if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET'); + + const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'); + const pollMs = envInt('TICKS_POLL_MS', 1000, { min: 250, max: 60_000 }); + const source = envString('TICKS_SOURCE', 'dlob_stats'); + + return { hasuraUrl, hasuraAdminSecret, markets, pollMs, source }; +} + +async function graphqlRequest(cfg, query, variables) { + const res = await fetch(cfg.hasuraUrl, { + method: 'POST', + headers: { + 'content-type': 'application/json', + 'x-hasura-admin-secret': cfg.hasuraAdminSecret, + }, + body: JSON.stringify({ query, variables }), + signal: AbortSignal.timeout(10_000), + }); + + const text = await res.text(); + if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`); + + let json; + try { + json = JSON.parse(text); + } catch { + throw new Error(`Hasura: invalid json: ${text}`); + } + + if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | ')); + return json.data; +} + +async function fetchStats(cfg) { + const query = ` + query DlobStatsLatest($markets: [String!]!) { + dlob_stats_latest(where: { market_name: { _in: $markets } }) { + market_name + market_index + ts + slot + oracle_price + mark_price + mid_price + best_bid_price + best_ask_price + updated_at + } + } + `; + + const data = await graphqlRequest(cfg, query, { markets: cfg.markets }); + const rows = Array.isArray(data?.dlob_stats_latest) ? data.dlob_stats_latest : []; + return rows; +} + +async function insertTicks(cfg, objects) { + if (!objects.length) return 0; + + const mutation = ` + mutation InsertTicks($objects: [drift_ticks_insert_input!]!) { + insert_drift_ticks(objects: $objects) { affected_rows } + } + `; + + const data = await graphqlRequest(cfg, mutation, { objects }); + return Number(data?.insert_drift_ticks?.affected_rows || 0); +} + +async function main() { + const cfg = resolveConfig(); + const lastUpdatedAtByMarket = new Map(); + + console.log( + JSON.stringify( + { + service: 'trade-ingestor', + mode: 'dlob_stats_ticks', + startedAt: getIsoNow(), + hasuraUrl: cfg.hasuraUrl, + markets: cfg.markets, + pollMs: cfg.pollMs, + source: cfg.source, + }, + null, + 2 + ) + ); + + while (true) { + try { + const rows = await fetchStats(cfg); + const nowIso = getIsoNow(); + + const objects = []; + for (const r of rows) { + const marketName = String(r?.market_name || '').trim(); + if (!marketName) continue; + + const updatedAt = r?.updated_at ? String(r.updated_at) : ''; + if (updatedAt && lastUpdatedAtByMarket.get(marketName) === updatedAt) continue; + if (updatedAt) lastUpdatedAtByMarket.set(marketName, updatedAt); + + const marketIndex = toIntOrNull(r?.market_index) ?? 0; + const dlobIso = isoFromEpochMs(r?.ts); + const tsIso = dlobIso || nowIso; + + const oraclePrice = numStr(r?.oracle_price) || numStr(r?.mark_price) || numStr(r?.mid_price); + const markPrice = numStr(r?.mark_price) || numStr(r?.mid_price) || oraclePrice; + if (!oraclePrice) continue; + + objects.push({ + ts: tsIso, + market_index: marketIndex, + symbol: marketName, + oracle_price: oraclePrice, + mark_price: markPrice, + oracle_slot: r?.slot == null ? null : String(r.slot), + source: cfg.source, + raw: { + from: 'dlob_stats_latest', + market_name: marketName, + market_index: marketIndex, + dlob: { + ts: r?.ts ?? null, + slot: r?.slot ?? null, + best_bid_price: r?.best_bid_price ?? null, + best_ask_price: r?.best_ask_price ?? null, + mid_price: r?.mid_price ?? null, + updated_at: updatedAt || null, + }, + }, + }); + } + + const inserted = await insertTicks(cfg, objects); + if (inserted) { + console.log(`[dlob-ticks] inserted=${inserted} ts=${nowIso}`); + } + } catch (err) { + console.error(`[dlob-ticks] error: ${String(err?.message || err)}`); + await sleep(2_000); + } + + await sleep(cfg.pollMs); + } +} + +main().catch((err) => { + console.error(String(err?.stack || err)); + process.exitCode = 1; +});