Files
trade-deploy/kustomize/base/dlob-ts-archiver/worker.mjs
u1 57433c7e75 feat(dlob): support two sources + per-user switch
- Add "source" column + composite PKs for DLOB tables\n- Filter public Hasura selects by X-Hasura-Dlob-Source\n- Run parallel workers for mevnode + dlob.drift.trade\n- Frontend proxy sets x-hasura-dlob-source from cookie and injects UI switch
2026-02-13 11:25:32 +01:00

255 lines
8.6 KiB
JavaScript

import fs from 'node:fs';
import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';
/**
 * Load a file and decode it as JSON on a best-effort basis.
 *
 * @param {string} filePath - Path of the file to read as UTF-8 text.
 * @returns {*} The parsed JSON value, or `undefined` when reading or parsing throws.
 */
function readJsonFile(filePath) {
  let parsed;
  try {
    parsed = JSON.parse(fs.readFileSync(filePath, { encoding: 'utf8' }));
  } catch {
    // Missing, unreadable or malformed file: callers treat this as "no data".
    parsed = undefined;
  }
  return parsed;
}
/**
 * Current wall-clock time formatted by `Date.prototype.toISOString`.
 *
 * @returns {string} Timestamp such as `2026-02-13T10:25:32.000Z`.
 */
function getIsoNow() {
  const current = new Date();
  return current.toISOString();
}
/**
 * Parse a value as a base-10 integer and clamp it into `[min, max]`.
 *
 * The value is stringified first (nullish becomes `''`) and then read with
 * `Number.parseInt`, so leading digits are accepted ("12abc" -> 12) and
 * fractional parts are dropped.
 *
 * @param {*} value - Raw input, typically an environment variable string.
 * @param {number} min - Lower bound applied to the parsed integer.
 * @param {number} max - Upper bound applied to the parsed integer.
 * @param {*} fallback - Returned untouched when no integer can be parsed.
 * @returns {*} The clamped integer, or `fallback`.
 */
function clampInt(value, min, max, fallback) {
  const parsed = Number.parseInt(String(value ?? ''), 10);
  if (Number.isInteger(parsed)) {
    const lowerBounded = Math.max(min, parsed);
    return Math.min(max, lowerBounded);
  }
  // NaN (nothing parseable) or a non-finite result: use the caller's default.
  return fallback;
}
/**
 * Read a comma-separated environment variable as a list of trimmed,
 * non-empty entries.
 *
 * The fallback is used when the variable is unset and also when it is set
 * but contains no usable entry (empty string, whitespace, or only commas).
 * An empty value therefore no longer produces an empty list, which matches
 * how the other settings in this file treat empty environment values.
 *
 * @param {string} name - Environment variable name.
 * @param {string} [fallbackCsv] - Comma-separated default; when omitted the
 *   default list is empty.
 * @returns {string[]} Parsed entries, in their original order.
 */
function envList(name, fallbackCsv) {
  // `csv ?? ''` keeps a missing value from being stringified to "undefined".
  const parseCsv = (csv) =>
    String(csv ?? '')
      .split(',')
      .map((entry) => entry.trim())
      .filter(Boolean);

  const fromEnv = parseCsv(process.env[name]);
  return fromEnv.length > 0 ? fromEnv : parseCsv(fallbackCsv);
}
/**
 * Assemble the worker configuration from environment variables, an optional
 * JSON tokens file, and built-in defaults (in that order of precedence).
 *
 * @returns {{
 *   hasuraUrl: string,
 *   hasuraAdminSecret: (string|undefined),
 *   hasuraAuthToken: (string|undefined),
 *   dlobSource: string,
 *   markets: string[],
 *   pollMs: number
 * }} Resolved settings.
 */
function resolveConfig() {
  const { env } = process;

  // The first non-empty path variable wins; otherwise use the default mount path.
  const tokensPath =
    env.HASURA_TOKENS_FILE || env.TOKENS_FILE || env.HASURA_CONFIG_FILE || '/app/tokens/hasura.json';
  // A missing or unparsable tokens file simply contributes nothing.
  const tokens = readJsonFile(tokensPath) || {};

  return {
    hasuraUrl:
      env.HASURA_GRAPHQL_URL || tokens.graphqlUrl || tokens.apiUrl || 'http://hasura:8080/v1/graphql',
    hasuraAdminSecret:
      env.HASURA_ADMIN_SECRET ||
      env.HASURA_GRAPHQL_ADMIN_SECRET ||
      tokens.adminSecret ||
      tokens.hasuraAdminSecret,
    hasuraAuthToken: env.HASURA_AUTH_TOKEN || env.HASURA_JWT || undefined,
    // A whitespace-only DLOB_SOURCE collapses to the default as well.
    dlobSource: String(env.DLOB_SOURCE || 'mevnode').trim() || 'mevnode',
    markets: envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'),
    pollMs: clampInt(env.DLOB_TS_POLL_MS, 500, 60_000, 1000),
  };
}
/**
 * POST a GraphQL operation to Hasura and return the `data` member of the reply.
 *
 * Authentication uses `cfg.hasuraAuthToken` as a bearer token when present,
 * otherwise `cfg.hasuraAdminSecret` as the `x-hasura-admin-secret` header.
 * The request is aborted after 15 seconds.
 *
 * @param {{hasuraUrl: string, hasuraAuthToken?: string, hasuraAdminSecret?: string}} cfg
 *   Connection settings.
 * @param {string} query - GraphQL document.
 * @param {object} variables - Variables for the document.
 * @returns {Promise<*>} The `data` field of the response (may be `undefined`).
 * @throws {Error} When no credentials are configured, the HTTP status is not
 *   2xx, the body is not a JSON object, or the response carries GraphQL errors.
 */
async function graphqlRequest(cfg, query, variables) {
  const headers = { 'content-type': 'application/json' };
  if (cfg.hasuraAuthToken) {
    headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
  } else if (cfg.hasuraAdminSecret) {
    headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
  } else {
    throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)');
  }

  const res = await fetch(cfg.hasuraUrl, {
    method: 'POST',
    headers,
    body: JSON.stringify({ query, variables }),
    signal: AbortSignal.timeout(15_000),
  });
  const text = await res.text();
  if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);

  // A 2xx reply is not guaranteed to be JSON (e.g. a proxy error page); report
  // that with context instead of leaking a bare SyntaxError.
  let json;
  try {
    json = JSON.parse(text);
  } catch (err) {
    throw new Error(`Hasura returned invalid JSON (HTTP ${res.status}): ${text.slice(0, 200)}`, { cause: err });
  }
  // `null` or a scalar would otherwise fail on the property reads below.
  if (json === null || typeof json !== 'object') {
    throw new Error(`Hasura returned an unexpected JSON payload (HTTP ${res.status}): ${text.slice(0, 200)}`);
  }

  if (Array.isArray(json.errors) && json.errors.length > 0) {
    // Fall back to the serialized entry when an error has no `message`.
    throw new Error(json.errors.map((e) => e?.message ?? JSON.stringify(e)).join(' | '));
  }
  return json.data;
}
/**
 * Normalize a value to a plain decimal integer string, or `null`.
 *
 * - `null` / `undefined` -> `null`
 * - `bigint` -> its decimal string
 * - finite `number` -> truncated toward zero, rendered without exponent
 *   notation (`String(1e21)` would give `"1e+21"`, which is not an integer
 *   literal); non-finite numbers -> `null`
 * - `string` -> trimmed text, or `null` when empty (content is not validated)
 * - anything else -> `null`
 *
 * NOTE(review): presumably feeds bigint columns (`source_ts`, `slot`) — the
 * column types are not visible here; confirm against the schema.
 *
 * @param {*} v - Raw value.
 * @returns {(string|null)} Normalized representation.
 */
function mapBigint(v) {
  if (v == null) return null;
  if (typeof v === 'bigint') return v.toString();
  if (typeof v === 'number') {
    // BigInt() accepts any integral Number and always prints full digits.
    return Number.isFinite(v) ? BigInt(Math.trunc(v)).toString() : null;
  }
  if (typeof v === 'string') return v.trim() || null;
  return null;
}
/**
 * Worker entry point: logs the resolved configuration once, then loops
 * forever. Each iteration reads the four `*_latest` tables for the configured
 * source and markets, stamps every row with the iteration's timestamp, and
 * inserts the result into the matching `*_ts` tables in a single mutation.
 * Any error in an iteration is logged and the loop continues after sleeping
 * `pollMs` milliseconds.
 *
 * @returns {Promise<never>} Never resolves under normal operation.
 */
async function main() {
  const cfg = resolveConfig();

  let authMode = 'none';
  if (cfg.hasuraAuthToken) {
    authMode = 'bearer';
  } else if (cfg.hasuraAdminSecret) {
    authMode = 'admin-secret';
  }
  const banner = {
    service: 'dlob-ts-archiver',
    startedAt: getIsoNow(),
    hasuraUrl: cfg.hasuraUrl,
    hasuraAuth: authMode,
    dlobSource: cfg.dlobSource,
    markets: cfg.markets,
    pollMs: cfg.pollMs,
  };
  console.log(JSON.stringify(banner, null, 2));

  // GraphQL documents are loop-invariant; their text is kept verbatim
  // (flush-left) so the bytes sent on the wire do not change.
  const latestQuery = `
query Latest($source: String!, $markets: [String!]!) {
dlob_stats_latest(where: { source: { _eq: $source }, market_name: { _in: $markets } }) {
market_name market_type market_index ts slot
mark_price oracle_price best_bid_price best_ask_price mid_price
spread_abs spread_bps depth_levels depth_bid_base depth_ask_base depth_bid_usd depth_ask_usd imbalance
raw
}
dlob_depth_bps_latest(where: { source: { _eq: $source }, market_name: { _in: $markets } }) {
market_name band_bps market_type market_index ts slot
mid_price best_bid_price best_ask_price bid_base ask_base bid_usd ask_usd imbalance
raw
}
dlob_slippage_latest(where: { source: { _eq: $source }, market_name: { _in: $markets } }) {
market_name side size_usd market_type market_index ts slot
mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct
raw
}
dlob_slippage_latest_v2(where: { source: { _eq: $source }, market_name: { _in: $markets } }) {
market_name side size_usd market_type market_index ts slot
mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct
raw
}
}
`;
  const insertMutation = `
mutation InsertTs(
$stats: [dlob_stats_ts_insert_input!]!
$depth: [dlob_depth_bps_ts_insert_input!]!
$slip: [dlob_slippage_ts_insert_input!]!
$slipV2: [dlob_slippage_ts_v2_insert_input!]!
) {
insert_dlob_stats_ts(objects: $stats) { affected_rows }
insert_dlob_depth_bps_ts(objects: $depth) { affected_rows }
insert_dlob_slippage_ts(objects: $slip) { affected_rows }
insert_dlob_slippage_ts_v2(objects: $slipV2) { affected_rows }
}
`;

  // Columns copied through unchanged, with missing values normalized to null.
  const statsValueColumns = [
    'mark_price',
    'oracle_price',
    'best_bid_price',
    'best_ask_price',
    'mid_price',
    'spread_abs',
    'spread_bps',
    'depth_levels',
    'depth_bid_base',
    'depth_ask_base',
    'depth_bid_usd',
    'depth_ask_usd',
    'imbalance',
    'raw',
  ];
  const depthValueColumns = [
    'mid_price',
    'best_bid_price',
    'best_ask_price',
    'bid_base',
    'ask_base',
    'bid_usd',
    'ask_usd',
    'imbalance',
    'raw',
  ];
  const slippageValueColumns = [
    'mid_price',
    'vwap_price',
    'worst_price',
    'filled_usd',
    'filled_base',
    'impact_bps',
    'levels_consumed',
    'fill_pct',
    'raw',
  ];
  const pickOrNull = (row, columns) =>
    Object.fromEntries(columns.map((column) => [column, row[column] ?? null]));

  // Row builders: the archive `ts` is the poll time, while the upstream row's
  // own `ts` is preserved as `source_ts`.
  const toStatsRow = (stampedAt, r) => ({
    ts: stampedAt,
    source: cfg.dlobSource,
    market_name: r.market_name,
    market_type: r.market_type,
    market_index: r.market_index ?? null,
    source_ts: mapBigint(r.ts),
    slot: mapBigint(r.slot),
    ...pickOrNull(r, statsValueColumns),
  });
  const toDepthRow = (stampedAt, r) => ({
    ts: stampedAt,
    source: cfg.dlobSource,
    market_name: r.market_name,
    band_bps: r.band_bps,
    market_type: r.market_type,
    market_index: r.market_index ?? null,
    source_ts: mapBigint(r.ts),
    slot: mapBigint(r.slot),
    ...pickOrNull(r, depthValueColumns),
  });
  // Shared by both slippage tables, whose row shapes are identical.
  const toSlippageRow = (stampedAt, r) => ({
    ts: stampedAt,
    source: cfg.dlobSource,
    market_name: r.market_name,
    side: r.side,
    size_usd: r.size_usd,
    market_type: r.market_type,
    market_index: r.market_index ?? null,
    source_ts: mapBigint(r.ts),
    slot: mapBigint(r.slot),
    ...pickOrNull(r, slippageValueColumns),
  });

  for (;;) {
    const now = getIsoNow();
    try {
      const data = await graphqlRequest(cfg, latestQuery, {
        source: cfg.dlobSource,
        markets: cfg.markets,
      });

      const stats = (data?.dlob_stats_latest || []).map((r) => toStatsRow(now, r));
      const depth = (data?.dlob_depth_bps_latest || []).map((r) => toDepthRow(now, r));
      const slip = (data?.dlob_slippage_latest || []).map((r) => toSlippageRow(now, r));
      const slipV2 = (data?.dlob_slippage_latest_v2 || []).map((r) => toSlippageRow(now, r));

      await graphqlRequest(cfg, insertMutation, { stats, depth, slip, slipV2 });
    } catch (err) {
      // One failed poll must not stop the archiver; log and try again next tick.
      console.error(`[dlob-ts-archiver] ${String(err?.message || err)}`);
    }
    await sleep(cfg.pollMs);
  }
}
// Start the worker; if main() ever rejects, print the error (stack when
// available) and mark the process as failed without forcing an immediate exit.
main().catch((error) => {
  const details = error?.stack || error;
  console.error(String(details));
  process.exitCode = 1;
});