import process from 'node:process'; import { setTimeout as sleep } from 'node:timers/promises'; function getIsoNow() { return new Date().toISOString(); } function envString(name, fallback) { const v = process.env[name]; if (v == null) return fallback; const s = String(v).trim(); return s ? s : fallback; } function envInt(name, fallback, { min, max } = {}) { const v = process.env[name]; if (v == null) return fallback; const n = Number.parseInt(String(v), 10); if (!Number.isFinite(n)) return fallback; const low = typeof min === 'number' ? min : n; const high = typeof max === 'number' ? max : n; return Math.max(low, Math.min(high, n)); } function envList(name, fallbackCsv) { const raw = process.env[name] ?? fallbackCsv; return String(raw) .split(',') .map((s) => s.trim()) .filter(Boolean); } function toIntOrNull(v) { if (v == null) return null; if (typeof v === 'number') return Number.isFinite(v) ? Math.trunc(v) : null; if (typeof v === 'string') { const s = v.trim(); if (!s) return null; const n = Number.parseInt(s, 10); return Number.isFinite(n) ? n : null; } return null; } function numStr(v) { if (v == null) return null; if (typeof v === 'number') return Number.isFinite(v) ? String(v) : null; if (typeof v === 'string') { const s = v.trim(); return s ? s : null; } return null; } function isoFromEpochMs(v) { const n = typeof v === 'number' ? v : typeof v === 'string' ? Number(v.trim()) : NaN; if (!Number.isFinite(n) || n <= 0) return null; const d = new Date(n); const ms = d.getTime(); if (!Number.isFinite(ms)) return null; return d.toISOString(); } function resolveConfig() { const hasuraUrl = envString('HASURA_GRAPHQL_URL', 'http://hasura:8080/v1/graphql'); const hasuraAdminSecret = envString('HASURA_ADMIN_SECRET', ''); if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET'); const markets = envList('DLOB_MARKETS', 'SOL-PERP,PUMP-PERP'); const pollMs = envInt('TICKS_POLL_MS', 1000, { min: 250, max: 60_000 }); const source = envString('TICKS_SOURCE', 'dlob_stats'); return { hasuraUrl, hasuraAdminSecret, markets, pollMs, source }; } async function graphqlRequest(cfg, query, variables) { const res = await fetch(cfg.hasuraUrl, { method: 'POST', headers: { 'content-type': 'application/json', 'x-hasura-admin-secret': cfg.hasuraAdminSecret, }, body: JSON.stringify({ query, variables }), signal: AbortSignal.timeout(10_000), }); const text = await res.text(); if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`); let json; try { json = JSON.parse(text); } catch { throw new Error(`Hasura: invalid json: ${text}`); } if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | ')); return json.data; } async function fetchStats(cfg) { const query = ` query DlobStatsLatest($markets: [String!]!) { dlob_hot_derived_latest(where: { market_name: { _in: $markets }, is_indicative: { _eq: false } }) { market_name market_index ts_ms slot oracle_price mark_price mid_price best_bid_price best_ask_price updated_at } dlob_all_derived_latest(where: { market_name: { _in: $markets }, is_indicative: { _eq: false } }) { market_name market_index ts_ms slot oracle_price mark_price mid_price best_bid_price best_ask_price updated_at } } `; const data = await graphqlRequest(cfg, query, { markets: cfg.markets }); const merged = new Map(); for (const row of Array.isArray(data?.dlob_all_derived_latest) ? data.dlob_all_derived_latest : []) { const marketName = String(row?.market_name || '').trim(); if (!marketName) continue; merged.set(marketName, { ...row, __from: 'dlob_all_derived_latest' }); } for (const row of Array.isArray(data?.dlob_hot_derived_latest) ? data.dlob_hot_derived_latest : []) { const marketName = String(row?.market_name || '').trim(); if (!marketName) continue; merged.set(marketName, { ...row, __from: 'dlob_hot_derived_latest' }); } return Array.from(merged.values()); } async function insertTicks(cfg, objects) { if (!objects.length) return 0; const mutation = ` mutation InsertTicks($objects: [drift_ticks_insert_input!]!) { insert_drift_ticks(objects: $objects) { affected_rows } } `; const data = await graphqlRequest(cfg, mutation, { objects }); return Number(data?.insert_drift_ticks?.affected_rows || 0); } async function main() { const cfg = resolveConfig(); const lastUpdatedAtByMarket = new Map(); console.log( JSON.stringify( { service: 'trade-ingestor', mode: 'dlob_stats_ticks', startedAt: getIsoNow(), hasuraUrl: cfg.hasuraUrl, markets: cfg.markets, pollMs: cfg.pollMs, source: cfg.source, }, null, 2 ) ); while (true) { try { const rows = await fetchStats(cfg); const nowIso = getIsoNow(); const objects = []; for (const r of rows) { const marketName = String(r?.market_name || '').trim(); if (!marketName) continue; const updatedAt = r?.updated_at ? String(r.updated_at) : ''; if (updatedAt && lastUpdatedAtByMarket.get(marketName) === updatedAt) continue; if (updatedAt) lastUpdatedAtByMarket.set(marketName, updatedAt); const marketIndex = toIntOrNull(r?.market_index) ?? 0; const dlobIso = isoFromEpochMs(r?.ts_ms); const tsIso = dlobIso || nowIso; const oraclePrice = numStr(r?.oracle_price) || numStr(r?.mark_price) || numStr(r?.mid_price); const markPrice = numStr(r?.mark_price) || numStr(r?.mid_price) || oraclePrice; if (!oraclePrice) continue; objects.push({ ts: tsIso, market_index: marketIndex, symbol: marketName, oracle_price: oraclePrice, mark_price: markPrice, oracle_slot: r?.slot == null ? null : String(r.slot), source: cfg.source, raw: { from: r?.__from || 'dlob_all_derived_latest', market_name: marketName, market_index: marketIndex, dlob: { ts_ms: r?.ts_ms ?? null, slot: r?.slot ?? null, best_bid_price: r?.best_bid_price ?? null, best_ask_price: r?.best_ask_price ?? null, mid_price: r?.mid_price ?? null, updated_at: updatedAt || null, }, }, }); } const inserted = await insertTicks(cfg, objects); if (inserted) { console.log(`[dlob-ticks] inserted=${inserted} ts=${nowIso}`); } } catch (err) { console.error(`[dlob-ticks] error: ${String(err?.message || err)}`); await sleep(2_000); } await sleep(cfg.pollMs); } } main().catch((err) => { console.error(String(err?.stack || err)); process.exitCode = 1; });