- Add "source" column + composite PKs for DLOB tables
- Filter public Hasura selects by X-Hasura-Dlob-Source
- Run parallel workers for mevnode + dlob.drift.trade
- Frontend proxy sets x-hasura-dlob-source from cookie and injects UI switch
405 lines
13 KiB
JavaScript
405 lines
13 KiB
JavaScript
import fs from 'node:fs';
|
|
import process from 'node:process';
|
|
import { setTimeout as sleep } from 'node:timers/promises';
|
|
|
|
/**
 * Reads a file and parses its contents as JSON on a best-effort basis.
 *
 * A missing file (`ENOENT`) is treated as an expected situation and reported
 * silently. Any other failure (unreadable file, malformed JSON) still yields
 * `undefined`, but a warning is logged so that a broken file can be told apart
 * from an absent one.
 *
 * @param {string} filePath - Path of the JSON file to read.
 * @returns {any} The parsed JSON value, or `undefined` when the file cannot be
 *   read or parsed.
 */
function readJsonFile(filePath) {
  let raw;
  try {
    raw = fs.readFileSync(filePath, 'utf8');
  } catch (err) {
    // Only stay quiet for "file does not exist"; everything else is worth a hint.
    if (err?.code !== 'ENOENT') {
      console.warn(`[readJsonFile] cannot read ${filePath}: ${String(err?.message || err)}`);
    }
    return undefined;
  }

  try {
    return JSON.parse(raw);
  } catch (err) {
    console.warn(`[readJsonFile] invalid JSON in ${filePath}: ${String(err?.message || err)}`);
    return undefined;
  }
}
|
|
|
|
/**
 * Returns the current instant formatted by `Date.prototype.toISOString`.
 *
 * @returns {string} Timestamp such as `2024-01-01T00:00:00.000Z`.
 */
function getIsoNow() {
  const now = new Date();
  return now.toISOString();
}
|
|
|
|
/**
 * Parses `value` as a base-10 integer and clamps it into `[min, max]`.
 *
 * @param {unknown} value - Raw input; `null`/`undefined` are treated as an empty string.
 * @param {number} min - Lower bound applied to the parsed integer.
 * @param {number} max - Upper bound applied to the parsed integer.
 * @param {number} fallback - Returned as-is when `value` does not parse to an integer.
 * @returns {number} The clamped integer, or `fallback`.
 */
function clampInt(value, min, max, fallback) {
  const text = value == null ? '' : String(value);
  const parsed = Number.parseInt(text, 10);

  // parseInt yields NaN (or Infinity for absurdly long digit strings) on bad input.
  if (!Number.isInteger(parsed)) {
    return fallback;
  }

  const atLeastMin = Math.max(min, parsed);
  return Math.min(max, atLeastMin);
}
|
|
|
|
/**
 * Reads a comma-separated list from an environment variable.
 *
 * The variable's value is used whenever it is set (even to an empty string);
 * otherwise `fallbackCsv` is used. Entries are trimmed and empty entries are
 * dropped.
 *
 * @param {string} name - Environment variable name.
 * @param {string} [fallbackCsv] - Comma-separated default used when the variable
 *   is unset. When omitted, an unset variable yields an empty list.
 * @returns {string[]} The trimmed, non-empty entries in their original order.
 */
function envList(name, fallbackCsv) {
  // The trailing `?? ''` keeps a missing fallback from being stringified into
  // the literal entry "undefined".
  const raw = process.env[name] ?? fallbackCsv ?? '';
  return String(raw)
    .split(',')
    .map((entry) => entry.trim())
    .filter(Boolean);
}
|
|
|
|
/**
 * Parses `value` with `Number.parseFloat` and accepts it only when it is a
 * finite number strictly greater than zero.
 *
 * @param {unknown} value - Raw input; `null`/`undefined` are treated as an empty string.
 * @returns {number|null} The parsed positive number, or `null` otherwise.
 */
function parsePositiveNumber(value) {
  const text = String(value ?? '').trim();
  const parsed = Number.parseFloat(text);
  const isUsable = Number.isFinite(parsed) && parsed > 0;
  return isUsable ? parsed : null;
}
|
|
|
|
/**
 * Builds the worker configuration from environment variables, with an optional
 * JSON tokens file as a secondary source for the Hasura URL and admin secret.
 *
 * Environment variables read: HASURA_TOKENS_FILE / TOKENS_FILE /
 * HASURA_CONFIG_FILE, HASURA_GRAPHQL_URL, HASURA_ADMIN_SECRET /
 * HASURA_GRAPHQL_ADMIN_SECRET, HASURA_AUTH_TOKEN / HASURA_JWT, DLOB_SOURCE,
 * DLOB_MARKETS, DLOB_POLL_MS, DLOB_SLIPPAGE_SIZES_USD, DLOB_DEPTH,
 * PRICE_PRECISION and BASE_PRECISION.
 *
 * @returns {{
 *   hasuraUrl: string,
 *   hasuraAdminSecret: (string|undefined),
 *   hasuraAuthToken: (string|undefined),
 *   dlobSource: string,
 *   markets: string[],
 *   pollMs: number,
 *   sizesUsd: number[],
 *   sizesUsdInt: number[],
 *   depthLevels: number,
 *   pricePrecision: number,
 *   basePrecision: number
 * }} The resolved configuration.
 * @throws {Error} When PRICE_PRECISION or BASE_PRECISION is not a finite positive number.
 */
function resolveConfig() {
  const tokensPath =
    process.env.HASURA_TOKENS_FILE ||
    process.env.TOKENS_FILE ||
    process.env.HASURA_CONFIG_FILE ||
    '/app/tokens/hasura.json';
  const tokens = readJsonFile(tokensPath) || {};

  // Environment variables take precedence over values from the tokens file.
  const hasuraUrl =
    process.env.HASURA_GRAPHQL_URL ||
    tokens.graphqlUrl ||
    tokens.apiUrl ||
    'http://hasura:8080/v1/graphql';
  const hasuraAdminSecret =
    process.env.HASURA_ADMIN_SECRET ||
    process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
    tokens.adminSecret ||
    tokens.hasuraAdminSecret;
  const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;

  const dlobSource = String(process.env.DLOB_SOURCE || 'mevnode').trim() || 'mevnode';
  const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
  const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);

  // Keep only valid positive sizes, drop duplicates, and sort ascending.
  const parsedSizes = envList('DLOB_SLIPPAGE_SIZES_USD', '10,25,50,100,250,500,1000')
    .map((entry) => parsePositiveNumber(entry))
    .filter((n) => n != null);
  const sizesUsd = [...new Set(parsedSizes)].sort((a, b) => a - b);

  const sizesUsdInt = sizesUsd.filter((n) => Number.isInteger(n));

  const depthLevels = clampInt(process.env.DLOB_DEPTH, 1, 50, 25);
  const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
  const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
  if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) {
    throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
  }
  if (!Number.isFinite(basePrecision) || basePrecision <= 0) {
    throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
  }

  return {
    hasuraUrl,
    hasuraAdminSecret,
    hasuraAuthToken,
    dlobSource,
    markets,
    pollMs,
    sizesUsd,
    sizesUsdInt,
    depthLevels,
    pricePrecision,
    basePrecision,
  };
}
|
|
|
|
/**
 * Sends a GraphQL request to Hasura and returns the `data` member of the response.
 *
 * Authentication uses `cfg.hasuraAuthToken` as a bearer token when present,
 * otherwise `cfg.hasuraAdminSecret` via the `x-hasura-admin-secret` header.
 * The request is aborted after 15 seconds.
 *
 * @param {{hasuraUrl: string, hasuraAuthToken?: string, hasuraAdminSecret?: string}} cfg
 *   Connection settings.
 * @param {string} query - GraphQL document (query or mutation).
 * @param {object} [variables] - Variables for the document.
 * @returns {Promise<any>} The `data` member of the JSON response.
 * @throws {Error} When no credentials are configured, the HTTP status is not
 *   OK, the body is not valid JSON, or the response carries GraphQL errors.
 */
async function graphqlRequest(cfg, query, variables) {
  const headers = { 'content-type': 'application/json' };
  if (cfg.hasuraAuthToken) {
    headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
  } else if (cfg.hasuraAdminSecret) {
    headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
  } else {
    throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)');
  }

  const res = await fetch(cfg.hasuraUrl, {
    method: 'POST',
    headers,
    body: JSON.stringify({ query, variables }),
    signal: AbortSignal.timeout(15_000),
  });

  // Read the body as text first so it can be quoted in error messages.
  const text = await res.text();
  if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);

  let json;
  try {
    json = JSON.parse(text);
  } catch (err) {
    // A 2xx response that is not JSON would otherwise surface as a bare
    // SyntaxError with no hint about where it came from.
    throw new Error(
      `Hasura returned a non-JSON response (HTTP ${res.status}): ${text.slice(0, 200)}`,
      { cause: err }
    );
  }

  if (json?.errors?.length) {
    throw new Error(json.errors.map((e) => e.message).join(' | '));
  }
  return json?.data;
}
|
|
|
|
/**
 * Converts a number or numeric string into a finite number.
 *
 * @param {unknown} value - Candidate value.
 * @returns {number|null} The finite number, or `null` for non-finite numbers,
 *   blank or non-numeric strings, and any other type.
 */
function toNumberOrNull(value) {
  switch (typeof value) {
    case 'number':
      return Number.isFinite(value) ? value : null;
    case 'string': {
      const trimmed = value.trim();
      if (trimmed === '') return null;
      const parsed = Number(trimmed);
      return Number.isFinite(parsed) ? parsed : null;
    }
    default:
      // Covers null, undefined, booleans, objects, bigint, etc.
      return null;
  }
}
|
|
|
|
/**
 * Coerces a raw order-book side into an array.
 *
 * Arrays are returned unchanged (same reference). Strings are trimmed and
 * parsed as JSON, and are accepted only when they decode to an array.
 * Everything else becomes an empty array.
 *
 * @param {unknown} raw - Array, JSON-encoded array string, or anything else.
 * @returns {Array} The level entries, or `[]` when `raw` is unusable.
 */
function normalizeLevels(raw) {
  if (Array.isArray(raw)) return raw;
  if (typeof raw !== 'string') return [];

  const text = raw.trim();
  if (text === '') return [];

  let decoded;
  try {
    decoded = JSON.parse(text);
  } catch {
    return [];
  }
  return Array.isArray(decoded) ? decoded : [];
}
|
|
|
|
/**
 * Converts raw order-book levels into `{ price, base }` pairs by dividing each
 * entry's `price` by `pricePrecision` and its `size` by `basePrecision`.
 *
 * Entries whose `price` or `size` cannot be read as a finite number, or whose
 * scaled values are not finite, are skipped. Input order is preserved.
 *
 * @param {unknown} raw - Levels as an array or a JSON-encoded array string.
 * @param {number} pricePrecision - Divisor applied to each entry's `price`.
 * @param {number} basePrecision - Divisor applied to each entry's `size`.
 * @returns {Array<{price: number, base: number}>} The scaled levels (a new array).
 */
function parseScaledLevels(raw, pricePrecision, basePrecision) {
  return normalizeLevels(raw).flatMap((entry) => {
    const rawPrice = toNumberOrNull(entry?.price);
    const rawSize = toNumberOrNull(entry?.size);
    if (rawPrice == null || rawSize == null) return [];

    const price = rawPrice / pricePrecision;
    const base = rawSize / basePrecision;
    const isUsable = Number.isFinite(price) && Number.isFinite(base);
    return isUsable ? [{ price, base }] : [];
  });
}
|
|
|
|
/**
 * Simulates filling a market order of `sizeUsd` notional by walking the given
 * levels in order and consuming each level's notional (`base * price`) until
 * the order is filled or the levels run out.
 *
 * Levels with a non-positive or NaN price, or a non-positive or NaN notional,
 * are skipped so that a single malformed level cannot turn every accumulator
 * into NaN or contribute negative liquidity.
 *
 * @param {Array<{price: number, base: number}>} levels - Levels, best price first.
 * @param {number} sizeUsd - Order notional to fill.
 * @returns {{
 *   filledUsd: number,
 *   filledBase: number,
 *   vwapPrice: (number|null),
 *   worstPrice: (number|null),
 *   levelsConsumed: number,
 *   fillPct: (number|null)
 * }} `vwapPrice` is `filledUsd / filledBase` (null when nothing filled),
 *   `worstPrice` is the price of the last level touched, and `fillPct` is the
 *   filled share of `sizeUsd` in percent (null unless `sizeUsd > 0`).
 */
function simulateFill(levels, sizeUsd) {
  let remainingUsd = sizeUsd;
  let filledUsd = 0;
  let filledBase = 0;
  let worstPrice = null;
  let levelsConsumed = 0;

  for (const { price, base } of levels) {
    if (remainingUsd <= 0) break;

    const levelUsd = base * price;
    // Negated `>` comparisons also reject NaN, which `<= 0` would let through.
    if (!(price > 0) || !(levelUsd > 0)) continue;

    levelsConsumed += 1;
    worstPrice = price;

    const takeUsd = Math.min(remainingUsd, levelUsd);
    remainingUsd -= takeUsd;
    filledUsd += takeUsd;
    filledBase += takeUsd / price;
  }

  const vwapPrice = filledBase > 0 ? filledUsd / filledBase : null;
  const fillPct = sizeUsd > 0 ? (filledUsd / sizeUsd) * 100 : null;

  return {
    filledUsd,
    filledBase,
    vwapPrice,
    worstPrice,
    levelsConsumed,
    fillPct,
  };
}
|
|
|
|
/**
 * Computes the price impact of a fill relative to the mid price, in basis points.
 *
 * For `buy` the result is `(vwap / mid - 1) * 10_000`; for `sell` it is
 * `(1 - vwap / mid) * 10_000`.
 *
 * @param {{side: string, mid: (number|null|undefined), vwap: (number|null|undefined)}} params
 *   Order side with the mid and volume-weighted average prices.
 * @returns {number|null} Impact in basis points, or `null` when `mid` or `vwap`
 *   is missing, `mid <= 0`, or `side` is neither `buy` nor `sell`.
 */
function impactBps({ side, mid, vwap }) {
  if (mid == null || vwap == null) return null;
  if (mid <= 0) return null;

  switch (side) {
    case 'buy':
      return ((vwap / mid) - 1) * 10_000;
    case 'sell':
      return (1 - (vwap / mid)) * 10_000;
    default:
      return null;
  }
}
|
|
|
|
/** GraphQL query that fetches the latest L2 snapshot rows for one source and a set of markets. */
const DLOB_L2_LATEST_QUERY = `
  query DlobL2Latest($source: String!, $markets: [String!]!) {
    dlob_l2_latest(where: { source: { _eq: $source }, market_name: { _in: $markets } }) {
      source
      market_name
      market_type
      market_index
      ts
      slot
      best_bid_price
      best_ask_price
      bids
      asks
      updated_at
    }
  }
`;

/** Columns overwritten when an upserted slippage row hits the primary-key constraint. */
const SLIPPAGE_UPDATE_COLUMNS = [
  'market_type',
  'market_index',
  'ts',
  'slot',
  'mid_price',
  'vwap_price',
  'worst_price',
  'filled_usd',
  'filled_base',
  'impact_bps',
  'levels_consumed',
  'fill_pct',
  'raw',
  'updated_at',
];

/**
 * Builds the upsert mutation document for one slippage table.
 *
 * @param {string} operationName - GraphQL operation name.
 * @param {string} table - Table name; the insert field, input type and
 *   `_pkey` constraint names are derived from it.
 * @returns {string} The mutation document.
 */
function buildSlippageUpsertMutation(operationName, table) {
  return `
    mutation ${operationName}($rows: [${table}_insert_input!]!) {
      insert_${table}(
        objects: $rows
        on_conflict: {
          constraint: ${table}_pkey
          update_columns: [
            ${SLIPPAGE_UPDATE_COLUMNS.join('\n            ')}
          ]
        }
      ) { affected_rows }
    }
  `;
}

const UPSERT_SLIPPAGE_V1_MUTATION = buildSlippageUpsertMutation('UpsertSlippageV1', 'dlob_slippage_latest');
const UPSERT_SLIPPAGE_V2_MUTATION = buildSlippageUpsertMutation('UpsertSlippageV2', 'dlob_slippage_latest_v2');

/**
 * Simulates one fill and builds the slippage row shared by both tables
 * (everything except `size_usd`, which differs in type between them).
 *
 * @param {object} params
 * @param {object} params.cfg - Worker configuration.
 * @param {object} params.row - Source `dlob_l2_latest` row.
 * @param {string} params.market - Trimmed market name.
 * @param {'buy'|'sell'} params.side - Order side being simulated.
 * @param {Array<{price: number, base: number}>} params.levels - Book side consumed by the order.
 * @param {number} params.mid - Mid price.
 * @param {number} params.sizeUsd - Order notional.
 * @param {string} params.updatedAt - Timestamp written to `updated_at`.
 * @returns {object} The row without `size_usd`.
 */
function buildSlippageRow({ cfg, row, market, side, levels, mid, sizeUsd, updatedAt }) {
  const sim = simulateFill(levels, sizeUsd);
  return {
    source: cfg.dlobSource,
    market_name: market,
    side,
    market_type: row?.market_type ?? 'perp',
    market_index: row?.market_index ?? null,
    ts: row?.ts == null ? null : String(row.ts),
    slot: row?.slot == null ? null : String(row.slot),
    mid_price: String(mid),
    vwap_price: sim.vwapPrice == null ? null : String(sim.vwapPrice),
    worst_price: sim.worstPrice == null ? null : String(sim.worstPrice),
    filled_usd: String(sim.filledUsd),
    filled_base: String(sim.filledBase),
    impact_bps: impactBps({ side, mid, vwap: sim.vwapPrice }),
    levels_consumed: sim.levelsConsumed,
    fill_pct: sim.fillPct == null ? null : String(sim.fillPct),
    raw: { depthLevels: cfg.depthLevels },
    updated_at: updatedAt,
  };
}

/**
 * Worker entry point: logs the effective configuration, then loops forever.
 *
 * Each iteration fetches the latest L2 rows for the configured source and
 * markets, simulates buy and sell fills for every configured size, and upserts
 * the results into `dlob_slippage_latest` (integer sizes only) and
 * `dlob_slippage_latest_v2` (all sizes). Errors inside an iteration are logged
 * and the loop continues after sleeping `cfg.pollMs` milliseconds.
 *
 * @returns {Promise<void>} Never resolves under normal operation.
 * @throws {Error} When the configuration cannot be resolved.
 */
async function main() {
  const cfg = resolveConfig();

  console.log(
    JSON.stringify(
      {
        service: 'dlob-slippage-worker',
        startedAt: getIsoNow(),
        hasuraUrl: cfg.hasuraUrl,
        hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
        dlobSource: cfg.dlobSource,
        markets: cfg.markets,
        pollMs: cfg.pollMs,
        sizesUsd: cfg.sizesUsd,
        depthLevels: cfg.depthLevels,
      },
      null,
      2
    )
  );

  const lastSeenUpdatedAt = new Map(); // market -> updated_at of the last snapshot written

  while (true) {
    const updatedAt = getIsoNow();

    try {
      const data = await graphqlRequest(cfg, DLOB_L2_LATEST_QUERY, {
        source: cfg.dlobSource,
        markets: cfg.markets,
      });
      const rows = Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];

      const objectsV1 = [];
      const objectsV2 = [];
      // Markers are staged here and committed only after the upserts succeed,
      // so a failed write is retried on the next poll instead of being skipped
      // until the snapshot changes again.
      const pendingSeen = new Map();

      for (const row of rows) {
        const market = String(row?.market_name || '').trim();
        if (!market) continue;

        const rowUpdatedAt = row?.updated_at ?? null;
        const seenAt = pendingSeen.has(market) ? pendingSeen.get(market) : lastSeenUpdatedAt.get(market);
        if (rowUpdatedAt && seenAt === rowUpdatedAt) continue;
        if (rowUpdatedAt) pendingSeen.set(market, rowUpdatedAt);

        const bestBid = toNumberOrNull(row?.best_bid_price);
        const bestAsk = toNumberOrNull(row?.best_ask_price);
        if (bestBid == null || bestAsk == null) continue;

        const mid = (bestBid + bestAsk) / 2;
        if (!Number.isFinite(mid) || mid <= 0) continue;

        // parseScaledLevels returns a fresh array, so sorting in place is safe.
        const bids = parseScaledLevels(row?.bids, cfg.pricePrecision, cfg.basePrecision)
          .sort((a, b) => b.price - a.price)
          .slice(0, cfg.depthLevels);
        const asks = parseScaledLevels(row?.asks, cfg.pricePrecision, cfg.basePrecision)
          .sort((a, b) => a.price - b.price)
          .slice(0, cfg.depthLevels);

        // A buy consumes asks (prices worsen going up); a sell consumes bids
        // (prices worsen going down).
        const sides = [
          ['buy', asks],
          ['sell', bids],
        ];

        for (const sizeUsd of cfg.sizesUsd) {
          for (const [side, levels] of sides) {
            const baseObj = buildSlippageRow({ cfg, row, market, side, levels, mid, sizeUsd, updatedAt });
            objectsV2.push({ ...baseObj, size_usd: String(sizeUsd) });
            if (Number.isInteger(sizeUsd)) objectsV1.push({ ...baseObj, size_usd: sizeUsd });
          }
        }
      }

      if (objectsV1.length) {
        await graphqlRequest(cfg, UPSERT_SLIPPAGE_V1_MUTATION, { rows: objectsV1 });
      }

      if (objectsV2.length) {
        await graphqlRequest(cfg, UPSERT_SLIPPAGE_V2_MUTATION, { rows: objectsV2 });
      }

      for (const [market, seenAt] of pendingSeen) {
        lastSeenUpdatedAt.set(market, seenAt);
      }
    } catch (err) {
      console.error(`[dlob-slippage-worker] ${String(err?.message || err)}`);
    }

    await sleep(cfg.pollMs);
  }
}
|
|
|
|
/**
 * Last-resort handler for a rejection escaping `main`: prints the stack (or the
 * value itself when there is no stack) and marks the process as failed.
 *
 * @param {unknown} err - The rejection reason.
 */
function reportFatalError(err) {
  const details = err?.stack || err;
  console.error(String(details));
  process.exitCode = 1;
}

main().catch(reportFatalError);
|