feat(staging): ingest ticks from dlob stats
This commit is contained in:
219
kustomize/overlays/staging/dlob-ingestor.mjs
Normal file
219
kustomize/overlays/staging/dlob-ingestor.mjs
Normal file
@@ -0,0 +1,219 @@
|
||||
import process from 'node:process';
|
||||
import { setTimeout as sleep } from 'node:timers/promises';
|
||||
|
||||
function getIsoNow() {
|
||||
return new Date().toISOString();
|
||||
}
|
||||
|
||||
function envString(name, fallback) {
|
||||
const v = process.env[name];
|
||||
if (v == null) return fallback;
|
||||
const s = String(v).trim();
|
||||
return s ? s : fallback;
|
||||
}
|
||||
|
||||
function envInt(name, fallback, { min, max } = {}) {
|
||||
const v = process.env[name];
|
||||
if (v == null) return fallback;
|
||||
const n = Number.parseInt(String(v), 10);
|
||||
if (!Number.isFinite(n)) return fallback;
|
||||
const low = typeof min === 'number' ? min : n;
|
||||
const high = typeof max === 'number' ? max : n;
|
||||
return Math.max(low, Math.min(high, n));
|
||||
}
|
||||
|
||||
function envList(name, fallbackCsv) {
|
||||
const raw = process.env[name] ?? fallbackCsv;
|
||||
return String(raw)
|
||||
.split(',')
|
||||
.map((s) => s.trim())
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
function toIntOrNull(v) {
|
||||
if (v == null) return null;
|
||||
if (typeof v === 'number') return Number.isFinite(v) ? Math.trunc(v) : null;
|
||||
if (typeof v === 'string') {
|
||||
const s = v.trim();
|
||||
if (!s) return null;
|
||||
const n = Number.parseInt(s, 10);
|
||||
return Number.isFinite(n) ? n : null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function numStr(v) {
|
||||
if (v == null) return null;
|
||||
if (typeof v === 'number') return Number.isFinite(v) ? String(v) : null;
|
||||
if (typeof v === 'string') {
|
||||
const s = v.trim();
|
||||
return s ? s : null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function isoFromEpochMs(v) {
|
||||
const n = typeof v === 'number' ? v : typeof v === 'string' ? Number(v.trim()) : NaN;
|
||||
if (!Number.isFinite(n) || n <= 0) return null;
|
||||
const d = new Date(n);
|
||||
const ms = d.getTime();
|
||||
if (!Number.isFinite(ms)) return null;
|
||||
return d.toISOString();
|
||||
}
|
||||
|
||||
function resolveConfig() {
|
||||
const hasuraUrl = envString('HASURA_GRAPHQL_URL', 'http://hasura:8080/v1/graphql');
|
||||
const hasuraAdminSecret = envString('HASURA_ADMIN_SECRET', '');
|
||||
if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET');
|
||||
|
||||
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
|
||||
const pollMs = envInt('TICKS_POLL_MS', 1000, { min: 250, max: 60_000 });
|
||||
const source = envString('TICKS_SOURCE', 'dlob_stats');
|
||||
|
||||
return { hasuraUrl, hasuraAdminSecret, markets, pollMs, source };
|
||||
}
|
||||
|
||||
async function graphqlRequest(cfg, query, variables) {
|
||||
const res = await fetch(cfg.hasuraUrl, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'content-type': 'application/json',
|
||||
'x-hasura-admin-secret': cfg.hasuraAdminSecret,
|
||||
},
|
||||
body: JSON.stringify({ query, variables }),
|
||||
signal: AbortSignal.timeout(10_000),
|
||||
});
|
||||
|
||||
const text = await res.text();
|
||||
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
|
||||
|
||||
let json;
|
||||
try {
|
||||
json = JSON.parse(text);
|
||||
} catch {
|
||||
throw new Error(`Hasura: invalid json: ${text}`);
|
||||
}
|
||||
|
||||
if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | '));
|
||||
return json.data;
|
||||
}
|
||||
|
||||
async function fetchStats(cfg) {
|
||||
const query = `
|
||||
query DlobStatsLatest($markets: [String!]!) {
|
||||
dlob_stats_latest(where: { market_name: { _in: $markets } }) {
|
||||
market_name
|
||||
market_index
|
||||
ts
|
||||
slot
|
||||
oracle_price
|
||||
mark_price
|
||||
mid_price
|
||||
best_bid_price
|
||||
best_ask_price
|
||||
updated_at
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
||||
const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
|
||||
const rows = Array.isArray(data?.dlob_stats_latest) ? data.dlob_stats_latest : [];
|
||||
return rows;
|
||||
}
|
||||
|
||||
async function insertTicks(cfg, objects) {
|
||||
if (!objects.length) return 0;
|
||||
|
||||
const mutation = `
|
||||
mutation InsertTicks($objects: [drift_ticks_insert_input!]!) {
|
||||
insert_drift_ticks(objects: $objects) { affected_rows }
|
||||
}
|
||||
`;
|
||||
|
||||
const data = await graphqlRequest(cfg, mutation, { objects });
|
||||
return Number(data?.insert_drift_ticks?.affected_rows || 0);
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const cfg = resolveConfig();
|
||||
const lastUpdatedAtByMarket = new Map();
|
||||
|
||||
console.log(
|
||||
JSON.stringify(
|
||||
{
|
||||
service: 'trade-ingestor',
|
||||
mode: 'dlob_stats_ticks',
|
||||
startedAt: getIsoNow(),
|
||||
hasuraUrl: cfg.hasuraUrl,
|
||||
markets: cfg.markets,
|
||||
pollMs: cfg.pollMs,
|
||||
source: cfg.source,
|
||||
},
|
||||
null,
|
||||
2
|
||||
)
|
||||
);
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
const rows = await fetchStats(cfg);
|
||||
const nowIso = getIsoNow();
|
||||
|
||||
const objects = [];
|
||||
for (const r of rows) {
|
||||
const marketName = String(r?.market_name || '').trim();
|
||||
if (!marketName) continue;
|
||||
|
||||
const updatedAt = r?.updated_at ? String(r.updated_at) : '';
|
||||
if (updatedAt && lastUpdatedAtByMarket.get(marketName) === updatedAt) continue;
|
||||
if (updatedAt) lastUpdatedAtByMarket.set(marketName, updatedAt);
|
||||
|
||||
const marketIndex = toIntOrNull(r?.market_index) ?? 0;
|
||||
const dlobIso = isoFromEpochMs(r?.ts);
|
||||
const tsIso = dlobIso || nowIso;
|
||||
|
||||
const oraclePrice = numStr(r?.oracle_price) || numStr(r?.mark_price) || numStr(r?.mid_price);
|
||||
const markPrice = numStr(r?.mark_price) || numStr(r?.mid_price) || oraclePrice;
|
||||
if (!oraclePrice) continue;
|
||||
|
||||
objects.push({
|
||||
ts: tsIso,
|
||||
market_index: marketIndex,
|
||||
symbol: marketName,
|
||||
oracle_price: oraclePrice,
|
||||
mark_price: markPrice,
|
||||
oracle_slot: r?.slot == null ? null : String(r.slot),
|
||||
source: cfg.source,
|
||||
raw: {
|
||||
from: 'dlob_stats_latest',
|
||||
market_name: marketName,
|
||||
market_index: marketIndex,
|
||||
dlob: {
|
||||
ts: r?.ts ?? null,
|
||||
slot: r?.slot ?? null,
|
||||
best_bid_price: r?.best_bid_price ?? null,
|
||||
best_ask_price: r?.best_ask_price ?? null,
|
||||
mid_price: r?.mid_price ?? null,
|
||||
updated_at: updatedAt || null,
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
const inserted = await insertTicks(cfg, objects);
|
||||
if (inserted) {
|
||||
console.log(`[dlob-ticks] inserted=${inserted} ts=${nowIso}`);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`[dlob-ticks] error: ${String(err?.message || err)}`);
|
||||
await sleep(2_000);
|
||||
}
|
||||
|
||||
await sleep(cfg.pollMs);
|
||||
}
|
||||
}
|
||||
|
||||
main().catch((err) => {
|
||||
console.error(String(err?.stack || err));
|
||||
process.exitCode = 1;
|
||||
});
|
||||
Reference in New Issue
Block a user