feat(staging): add dlob ts history archiver
This commit is contained in:
197
kustomize/base/dlob-ts-archiver/worker.mjs
Normal file
197
kustomize/base/dlob-ts-archiver/worker.mjs
Normal file
@@ -0,0 +1,197 @@
|
||||
import process from 'node:process';
|
||||
import { setTimeout as sleep } from 'node:timers/promises';
|
||||
|
||||
function getIsoNow() {
|
||||
return new Date().toISOString();
|
||||
}
|
||||
|
||||
function clampInt(value, min, max, fallback) {
|
||||
const n = Number.parseInt(String(value ?? ''), 10);
|
||||
if (!Number.isInteger(n)) return fallback;
|
||||
return Math.min(max, Math.max(min, n));
|
||||
}
|
||||
|
||||
function envList(name, fallbackCsv) {
|
||||
const raw = process.env[name] ?? fallbackCsv;
|
||||
return String(raw)
|
||||
.split(',')
|
||||
.map((s) => s.trim())
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
function resolveConfig() {
|
||||
const hasuraUrl = String(process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql').trim();
|
||||
const hasuraAdminSecret = String(process.env.HASURA_ADMIN_SECRET || '').trim();
|
||||
if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET');
|
||||
|
||||
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
|
||||
const pollMs = clampInt(process.env.DLOB_TS_POLL_MS, 250, 60_000, 1000);
|
||||
|
||||
return { hasuraUrl, hasuraAdminSecret, markets, pollMs };
|
||||
}
|
||||
|
||||
async function graphqlRequest(cfg, query, variables) {
|
||||
const res = await fetch(cfg.hasuraUrl, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'content-type': 'application/json',
|
||||
'x-hasura-admin-secret': cfg.hasuraAdminSecret,
|
||||
},
|
||||
body: JSON.stringify({ query, variables }),
|
||||
signal: AbortSignal.timeout(15_000),
|
||||
});
|
||||
const text = await res.text();
|
||||
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
|
||||
const json = JSON.parse(text);
|
||||
if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | '));
|
||||
return json.data;
|
||||
}
|
||||
|
||||
function mapBigint(v) {
|
||||
if (v == null) return null;
|
||||
if (typeof v === 'number') return Number.isFinite(v) ? String(Math.trunc(v)) : null;
|
||||
if (typeof v === 'string') return v.trim() || null;
|
||||
return null;
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const cfg = resolveConfig();
|
||||
|
||||
console.log(
|
||||
JSON.stringify(
|
||||
{
|
||||
service: 'dlob-ts-archiver',
|
||||
startedAt: getIsoNow(),
|
||||
hasuraUrl: cfg.hasuraUrl,
|
||||
markets: cfg.markets,
|
||||
pollMs: cfg.pollMs,
|
||||
},
|
||||
null,
|
||||
2
|
||||
)
|
||||
);
|
||||
|
||||
while (true) {
|
||||
const now = getIsoNow();
|
||||
|
||||
try {
|
||||
const query = `
|
||||
query Latest($markets: [String!]!) {
|
||||
dlob_stats_latest(where: { market_name: { _in: $markets } }) {
|
||||
market_name market_type market_index ts slot
|
||||
mark_price oracle_price best_bid_price best_ask_price mid_price
|
||||
spread_abs spread_bps depth_levels depth_bid_base depth_ask_base depth_bid_usd depth_ask_usd imbalance
|
||||
raw
|
||||
}
|
||||
dlob_depth_bps_latest(where: { market_name: { _in: $markets } }) {
|
||||
market_name band_bps market_type market_index ts slot
|
||||
mid_price best_bid_price best_ask_price bid_base ask_base bid_usd ask_usd imbalance
|
||||
raw
|
||||
}
|
||||
dlob_slippage_latest(where: { market_name: { _in: $markets } }) {
|
||||
market_name side size_usd market_type market_index ts slot
|
||||
mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct
|
||||
raw
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
||||
const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
|
||||
|
||||
const statsRows = (data?.dlob_stats_latest || []).map((r) => ({
|
||||
ts: now,
|
||||
market_name: r.market_name,
|
||||
market_type: r.market_type,
|
||||
market_index: r.market_index ?? null,
|
||||
source_ts: mapBigint(r.ts),
|
||||
slot: mapBigint(r.slot),
|
||||
mark_price: r.mark_price ?? null,
|
||||
oracle_price: r.oracle_price ?? null,
|
||||
best_bid_price: r.best_bid_price ?? null,
|
||||
best_ask_price: r.best_ask_price ?? null,
|
||||
mid_price: r.mid_price ?? null,
|
||||
spread_abs: r.spread_abs ?? null,
|
||||
spread_bps: r.spread_bps ?? null,
|
||||
depth_levels: r.depth_levels ?? null,
|
||||
depth_bid_base: r.depth_bid_base ?? null,
|
||||
depth_ask_base: r.depth_ask_base ?? null,
|
||||
depth_bid_usd: r.depth_bid_usd ?? null,
|
||||
depth_ask_usd: r.depth_ask_usd ?? null,
|
||||
imbalance: r.imbalance ?? null,
|
||||
raw: r.raw ?? null,
|
||||
}));
|
||||
|
||||
const depthRows = (data?.dlob_depth_bps_latest || []).map((r) => ({
|
||||
ts: now,
|
||||
market_name: r.market_name,
|
||||
band_bps: r.band_bps,
|
||||
market_type: r.market_type,
|
||||
market_index: r.market_index ?? null,
|
||||
source_ts: mapBigint(r.ts),
|
||||
slot: mapBigint(r.slot),
|
||||
mid_price: r.mid_price ?? null,
|
||||
best_bid_price: r.best_bid_price ?? null,
|
||||
best_ask_price: r.best_ask_price ?? null,
|
||||
bid_base: r.bid_base ?? null,
|
||||
ask_base: r.ask_base ?? null,
|
||||
bid_usd: r.bid_usd ?? null,
|
||||
ask_usd: r.ask_usd ?? null,
|
||||
imbalance: r.imbalance ?? null,
|
||||
raw: r.raw ?? null,
|
||||
}));
|
||||
|
||||
const slippageRows = (data?.dlob_slippage_latest || []).map((r) => ({
|
||||
ts: now,
|
||||
market_name: r.market_name,
|
||||
side: r.side,
|
||||
size_usd: r.size_usd,
|
||||
market_type: r.market_type,
|
||||
market_index: r.market_index ?? null,
|
||||
source_ts: mapBigint(r.ts),
|
||||
slot: mapBigint(r.slot),
|
||||
mid_price: r.mid_price ?? null,
|
||||
vwap_price: r.vwap_price ?? null,
|
||||
worst_price: r.worst_price ?? null,
|
||||
filled_usd: r.filled_usd ?? null,
|
||||
filled_base: r.filled_base ?? null,
|
||||
impact_bps: r.impact_bps ?? null,
|
||||
levels_consumed: r.levels_consumed ?? null,
|
||||
fill_pct: r.fill_pct ?? null,
|
||||
raw: r.raw ?? null,
|
||||
}));
|
||||
|
||||
if (!statsRows.length && !depthRows.length && !slippageRows.length) {
|
||||
await sleep(cfg.pollMs);
|
||||
continue;
|
||||
}
|
||||
|
||||
const mutation = `
|
||||
mutation InsertTs(
|
||||
$stats: [dlob_stats_ts_insert_input!]!
|
||||
$depth: [dlob_depth_bps_ts_insert_input!]!
|
||||
$slip: [dlob_slippage_ts_insert_input!]!
|
||||
) {
|
||||
insert_dlob_stats_ts(objects: $stats) { affected_rows }
|
||||
insert_dlob_depth_bps_ts(objects: $depth) { affected_rows }
|
||||
insert_dlob_slippage_ts(objects: $slip) { affected_rows }
|
||||
}
|
||||
`;
|
||||
|
||||
await graphqlRequest(cfg, mutation, {
|
||||
stats: statsRows,
|
||||
depth: depthRows,
|
||||
slip: slippageRows,
|
||||
});
|
||||
} catch (err) {
|
||||
console.error(`[dlob-ts-archiver] ${String(err?.message || err)}`);
|
||||
}
|
||||
|
||||
await sleep(cfg.pollMs);
|
||||
}
|
||||
}
|
||||
|
||||
main().catch((err) => {
|
||||
console.error(String(err?.stack || err));
|
||||
process.exitCode = 1;
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user