431 lines
13 KiB
JavaScript
431 lines
13 KiB
JavaScript
import fs from 'node:fs';
|
|
import * as http from 'node:http';
|
|
import * as https from 'node:https';
|
|
import process from 'node:process';
|
|
import { setTimeout as sleep } from 'node:timers/promises';
|
|
|
|
function readJsonFile(filePath) {
|
|
try {
|
|
const raw = fs.readFileSync(filePath, 'utf8');
|
|
return JSON.parse(raw);
|
|
} catch {
|
|
return undefined;
|
|
}
|
|
}
|
|
|
|
function getIsoNow() {
|
|
return new Date().toISOString();
|
|
}
|
|
|
|
function clampInt(value, min, max, fallback) {
|
|
const n = Number.parseInt(String(value ?? ''), 10);
|
|
if (!Number.isInteger(n)) return fallback;
|
|
return Math.min(max, Math.max(min, n));
|
|
}
|
|
|
|
function envList(name, fallbackCsv) {
|
|
const raw = process.env[name] ?? fallbackCsv;
|
|
return String(raw)
|
|
.split(',')
|
|
.map((s) => s.trim())
|
|
.filter(Boolean);
|
|
}
|
|
|
|
function envBool(name, fallback = false) {
|
|
const raw = process.env[name];
|
|
if (raw == null) return fallback;
|
|
const v = String(raw).trim().toLowerCase();
|
|
if (['1', 'true', 'yes', 'y', 'on'].includes(v)) return true;
|
|
if (['0', 'false', 'no', 'n', 'off'].includes(v)) return false;
|
|
return fallback;
|
|
}
|
|
|
|
function resolveConfig() {
|
|
const tokensPath =
|
|
process.env.HASURA_TOKENS_FILE ||
|
|
process.env.TOKENS_FILE ||
|
|
process.env.HASURA_CONFIG_FILE ||
|
|
'/app/tokens/hasura.json';
|
|
const tokens = readJsonFile(tokensPath) || {};
|
|
|
|
const hasuraUrl =
|
|
process.env.HASURA_GRAPHQL_URL ||
|
|
tokens.graphqlUrl ||
|
|
tokens.apiUrl ||
|
|
'http://hasura:8080/v1/graphql';
|
|
const hasuraAdminSecret =
|
|
process.env.HASURA_ADMIN_SECRET ||
|
|
process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
|
|
tokens.adminSecret ||
|
|
tokens.hasuraAdminSecret;
|
|
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
|
|
|
|
const dlobHttpBase = String(process.env.DLOB_HTTP_URL || process.env.DLOB_HTTP_BASE || 'https://dlob.drift.trade')
|
|
.trim()
|
|
.replace(/\/$/, '');
|
|
const dlobForceIpv6 = envBool('DLOB_FORCE_IPV6', false);
|
|
|
|
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
|
|
const depth = clampInt(process.env.DLOB_DEPTH, 1, 50, 10);
|
|
const pollMs = clampInt(process.env.DLOB_POLL_MS, 100, 10_000, 500);
|
|
|
|
const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
|
|
const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
|
|
if (!Number.isFinite(pricePrecision) || pricePrecision <= 0)
|
|
throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
|
|
if (!Number.isFinite(basePrecision) || basePrecision <= 0)
|
|
throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
|
|
|
|
return {
|
|
hasuraUrl,
|
|
hasuraAdminSecret,
|
|
hasuraAuthToken,
|
|
dlobHttpBase,
|
|
dlobForceIpv6,
|
|
markets,
|
|
depth,
|
|
pollMs,
|
|
pricePrecision,
|
|
basePrecision,
|
|
};
|
|
}
|
|
|
|
async function requestText(url, { timeoutMs, family } = {}) {
|
|
const u = new URL(url);
|
|
const client = u.protocol === 'https:' ? https : http;
|
|
|
|
const port = u.port ? Number.parseInt(u.port, 10) : u.protocol === 'https:' ? 443 : 80;
|
|
if (!Number.isFinite(port)) throw new Error(`Invalid port for url: ${url}`);
|
|
|
|
return await new Promise((resolve, reject) => {
|
|
const req = client.request(
|
|
{
|
|
protocol: u.protocol,
|
|
hostname: u.hostname,
|
|
port,
|
|
path: `${u.pathname}${u.search}`,
|
|
method: 'GET',
|
|
family,
|
|
servername: u.hostname,
|
|
headers: {
|
|
accept: 'application/json',
|
|
},
|
|
},
|
|
(res) => {
|
|
let data = '';
|
|
res.setEncoding('utf8');
|
|
res.on('data', (chunk) => {
|
|
data += chunk;
|
|
});
|
|
res.on('end', () => {
|
|
resolve({ status: res.statusCode ?? 0, text: data });
|
|
});
|
|
}
|
|
);
|
|
|
|
req.on('error', reject);
|
|
req.setTimeout(timeoutMs ?? 5_000, () => {
|
|
req.destroy(new Error(`Timeout after ${timeoutMs ?? 5_000}ms`));
|
|
});
|
|
req.end();
|
|
});
|
|
}
|
|
|
|
async function graphqlRequest(cfg, query, variables) {
|
|
const headers = { 'content-type': 'application/json' };
|
|
if (cfg.hasuraAuthToken) {
|
|
headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
|
|
} else if (cfg.hasuraAdminSecret) {
|
|
headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
|
|
} else {
|
|
throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)');
|
|
}
|
|
|
|
const res = await fetch(cfg.hasuraUrl, {
|
|
method: 'POST',
|
|
headers,
|
|
body: JSON.stringify({ query, variables }),
|
|
signal: AbortSignal.timeout(10_000),
|
|
});
|
|
const text = await res.text();
|
|
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
|
|
const json = JSON.parse(text);
|
|
if (json.errors?.length) {
|
|
throw new Error(json.errors.map((e) => e.message).join(' | '));
|
|
}
|
|
return json.data;
|
|
}
|
|
|
|
function toNumberOrNull(value) {
|
|
if (value == null) return null;
|
|
if (typeof value === 'number') return Number.isFinite(value) ? value : null;
|
|
if (typeof value === 'string') {
|
|
const s = value.trim();
|
|
if (!s) return null;
|
|
const n = Number(s);
|
|
return Number.isFinite(n) ? n : null;
|
|
}
|
|
return null;
|
|
}
|
|
|
|
function numStr(value) {
|
|
if (value == null) return null;
|
|
if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null;
|
|
if (typeof value === 'string') return value.trim() || null;
|
|
return null;
|
|
}
|
|
|
|
function parseScaled(valueRaw, scale) {
|
|
const n = toNumberOrNull(valueRaw);
|
|
if (n == null) return null;
|
|
return n / scale;
|
|
}
|
|
|
|
function computeStats({ l2, depth, pricePrecision, basePrecision }) {
|
|
const bids = Array.isArray(l2?.bids) ? l2.bids : [];
|
|
const asks = Array.isArray(l2?.asks) ? l2.asks : [];
|
|
|
|
const bestBid = parseScaled(l2?.bestBidPrice ?? bids?.[0]?.price, pricePrecision);
|
|
const bestAsk = parseScaled(l2?.bestAskPrice ?? asks?.[0]?.price, pricePrecision);
|
|
const markPrice = parseScaled(l2?.markPrice, pricePrecision);
|
|
const oraclePrice = parseScaled(l2?.oracleData?.price ?? l2?.oracle, pricePrecision);
|
|
|
|
const mid = bestBid != null && bestAsk != null ? (bestBid + bestAsk) / 2 : null;
|
|
const spreadAbs = bestBid != null && bestAsk != null ? bestAsk - bestBid : null;
|
|
const spreadBps = spreadAbs != null && mid != null && mid > 0 ? (spreadAbs / mid) * 10_000 : null;
|
|
|
|
const levels = Math.max(1, depth);
|
|
let bidBase = 0;
|
|
let askBase = 0;
|
|
let bidUsd = 0;
|
|
let askUsd = 0;
|
|
|
|
for (let i = 0; i < Math.min(levels, bids.length); i += 1) {
|
|
const p = parseScaled(bids[i]?.price, pricePrecision);
|
|
const s = toNumberOrNull(bids[i]?.size);
|
|
if (p == null || s == null) continue;
|
|
const base = s / basePrecision;
|
|
bidBase += base;
|
|
bidUsd += base * p;
|
|
}
|
|
|
|
for (let i = 0; i < Math.min(levels, asks.length); i += 1) {
|
|
const p = parseScaled(asks[i]?.price, pricePrecision);
|
|
const s = toNumberOrNull(asks[i]?.size);
|
|
if (p == null || s == null) continue;
|
|
const base = s / basePrecision;
|
|
askBase += base;
|
|
askUsd += base * p;
|
|
}
|
|
|
|
const denom = bidUsd + askUsd;
|
|
const imbalance = denom > 0 ? (bidUsd - askUsd) / denom : null;
|
|
|
|
return {
|
|
bestBid,
|
|
bestAsk,
|
|
mid,
|
|
spreadAbs,
|
|
spreadBps,
|
|
markPrice,
|
|
oraclePrice,
|
|
depthLevels: levels,
|
|
bidBase,
|
|
askBase,
|
|
bidUsd,
|
|
askUsd,
|
|
imbalance,
|
|
};
|
|
}
|
|
|
|
function l2ToInsertObject({ l2, updatedAt, pricePrecision }) {
|
|
return {
|
|
market_name: String(l2.marketName),
|
|
market_type: String(l2.marketType || 'perp'),
|
|
market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null,
|
|
ts: l2.ts == null ? null : String(l2.ts),
|
|
slot: l2.slot == null ? null : String(l2.slot),
|
|
mark_price: numStr(parseScaled(l2.markPrice, pricePrecision)),
|
|
oracle_price: numStr(parseScaled(l2.oracleData?.price ?? l2.oracle, pricePrecision)),
|
|
best_bid_price: numStr(parseScaled(l2.bestBidPrice, pricePrecision)),
|
|
best_ask_price: numStr(parseScaled(l2.bestAskPrice, pricePrecision)),
|
|
bids: l2.bids ?? null,
|
|
asks: l2.asks ?? null,
|
|
raw: l2 ?? null,
|
|
updated_at: updatedAt,
|
|
};
|
|
}
|
|
|
|
function statsToInsertObject({ l2, stats, updatedAt }) {
|
|
return {
|
|
market_name: String(l2.marketName),
|
|
market_type: String(l2.marketType || 'perp'),
|
|
market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null,
|
|
ts: l2.ts == null ? null : String(l2.ts),
|
|
slot: l2.slot == null ? null : String(l2.slot),
|
|
mark_price: stats.markPrice == null ? null : String(stats.markPrice),
|
|
oracle_price: stats.oraclePrice == null ? null : String(stats.oraclePrice),
|
|
best_bid_price: stats.bestBid == null ? null : String(stats.bestBid),
|
|
best_ask_price: stats.bestAsk == null ? null : String(stats.bestAsk),
|
|
mid_price: stats.mid == null ? null : String(stats.mid),
|
|
spread_abs: stats.spreadAbs == null ? null : String(stats.spreadAbs),
|
|
spread_bps: stats.spreadBps == null ? null : String(stats.spreadBps),
|
|
depth_levels: stats.depthLevels,
|
|
depth_bid_base: Number.isFinite(stats.bidBase) ? String(stats.bidBase) : null,
|
|
depth_ask_base: Number.isFinite(stats.askBase) ? String(stats.askBase) : null,
|
|
depth_bid_usd: Number.isFinite(stats.bidUsd) ? String(stats.bidUsd) : null,
|
|
depth_ask_usd: Number.isFinite(stats.askUsd) ? String(stats.askUsd) : null,
|
|
imbalance: stats.imbalance == null ? null : String(stats.imbalance),
|
|
raw: {
|
|
spreadPct: l2.spreadPct ?? null,
|
|
spreadQuote: l2.spreadQuote ?? null,
|
|
},
|
|
updated_at: updatedAt,
|
|
};
|
|
}
|
|
|
|
async function fetchL2(cfg, marketName) {
|
|
const u = new URL(`${cfg.dlobHttpBase}/l2`);
|
|
u.searchParams.set('marketName', marketName);
|
|
u.searchParams.set('depth', String(cfg.depth));
|
|
|
|
const url = u.toString();
|
|
if (cfg.dlobForceIpv6) {
|
|
const { status, text } = await requestText(url, { timeoutMs: 5_000, family: 6 });
|
|
if (status < 200 || status >= 300) throw new Error(`DLOB HTTP ${status}: ${text}`);
|
|
return JSON.parse(text);
|
|
}
|
|
|
|
const res = await fetch(url, { signal: AbortSignal.timeout(5_000) });
|
|
const text = await res.text();
|
|
if (!res.ok) throw new Error(`DLOB HTTP ${res.status}: ${text}`);
|
|
return JSON.parse(text);
|
|
}
|
|
|
|
async function upsertBatch(cfg, l2Objects, statsObjects) {
|
|
if (!l2Objects.length && !statsObjects.length) return;
|
|
|
|
const mutation = `
|
|
mutation UpsertDlob($l2: [dlob_l2_latest_insert_input!]!, $stats: [dlob_stats_latest_insert_input!]!) {
|
|
insert_dlob_l2_latest(
|
|
objects: $l2
|
|
on_conflict: {
|
|
constraint: dlob_l2_latest_pkey
|
|
update_columns: [
|
|
market_type
|
|
market_index
|
|
ts
|
|
slot
|
|
mark_price
|
|
oracle_price
|
|
best_bid_price
|
|
best_ask_price
|
|
bids
|
|
asks
|
|
raw
|
|
updated_at
|
|
]
|
|
}
|
|
) { affected_rows }
|
|
insert_dlob_stats_latest(
|
|
objects: $stats
|
|
on_conflict: {
|
|
constraint: dlob_stats_latest_pkey
|
|
update_columns: [
|
|
market_type
|
|
market_index
|
|
ts
|
|
slot
|
|
mark_price
|
|
oracle_price
|
|
best_bid_price
|
|
best_ask_price
|
|
mid_price
|
|
spread_abs
|
|
spread_bps
|
|
depth_levels
|
|
depth_bid_base
|
|
depth_ask_base
|
|
depth_bid_usd
|
|
depth_ask_usd
|
|
imbalance
|
|
raw
|
|
updated_at
|
|
]
|
|
}
|
|
) { affected_rows }
|
|
}
|
|
`;
|
|
|
|
await graphqlRequest(cfg, mutation, { l2: l2Objects, stats: statsObjects });
|
|
}
|
|
|
|
async function main() {
|
|
const cfg = resolveConfig();
|
|
const lastTsByMarket = new Map();
|
|
|
|
console.log(
|
|
JSON.stringify(
|
|
{
|
|
service: 'dlob-worker',
|
|
startedAt: getIsoNow(),
|
|
hasuraUrl: cfg.hasuraUrl,
|
|
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
|
|
dlobHttpBase: cfg.dlobHttpBase,
|
|
dlobForceIpv6: cfg.dlobForceIpv6,
|
|
markets: cfg.markets,
|
|
depth: cfg.depth,
|
|
pollMs: cfg.pollMs,
|
|
},
|
|
null,
|
|
2
|
|
)
|
|
);
|
|
|
|
while (true) {
|
|
const updatedAt = getIsoNow();
|
|
|
|
const results = await Promise.allSettled(cfg.markets.map((m) => fetchL2(cfg, m)));
|
|
const l2Objects = [];
|
|
const statsObjects = [];
|
|
|
|
for (let i = 0; i < results.length; i += 1) {
|
|
const market = cfg.markets[i];
|
|
const r = results[i];
|
|
if (r.status !== 'fulfilled') {
|
|
console.error(`[dlob-worker] fetch ${market}: ${String(r.reason?.message || r.reason)}`);
|
|
continue;
|
|
}
|
|
const l2 = r.value;
|
|
if (!l2?.marketName) continue;
|
|
|
|
const ts = l2.ts == null ? null : String(l2.ts);
|
|
if (ts != null && lastTsByMarket.get(l2.marketName) === ts) continue;
|
|
if (ts != null) lastTsByMarket.set(l2.marketName, ts);
|
|
|
|
const stats = computeStats({
|
|
l2,
|
|
depth: cfg.depth,
|
|
pricePrecision: cfg.pricePrecision,
|
|
basePrecision: cfg.basePrecision,
|
|
});
|
|
|
|
l2Objects.push(l2ToInsertObject({ l2, updatedAt, pricePrecision: cfg.pricePrecision }));
|
|
statsObjects.push(statsToInsertObject({ l2, stats, updatedAt }));
|
|
}
|
|
|
|
try {
|
|
await upsertBatch(cfg, l2Objects, statsObjects);
|
|
} catch (err) {
|
|
console.error(`[dlob-worker] upsert: ${String(err?.message || err)}`);
|
|
}
|
|
|
|
await sleep(cfg.pollMs);
|
|
}
|
|
}
|
|
|
|
main().catch((err) => {
|
|
console.error(String(err?.stack || err));
|
|
process.exitCode = 1;
|
|
});
|