Compare commits

...

34 Commits

Author SHA1 Message Date
u1
d3f00cfbb4 chore(staging): bump trade-frontend image to sha-ob-20260111203413 2026-01-11 20:34:54 +00:00
u1
773784e1a3 chore(staging): bump trade-api image to k3s-20260111095435 2026-01-11 09:55:33 +00:00
u1
56045f8f55 chore(staging): bump trade-frontend image to sha-ca9e44a 2026-01-11 09:55:18 +00:00
u1
1ba4c72e11 feat(staging): switch ticks ingest to dlob stats 2026-01-10 12:16:17 +00:00
u1
1efa41c112 feat(staging): switch ingestor to dlob stats 2026-01-10 12:16:09 +00:00
u1
b925ad78e8 feat(staging): ingest ticks from dlob stats 2026-01-10 12:16:01 +00:00
u1
0f638f1d70 feat(deploy): add dlob derived stats workers 2026-01-10 11:45:01 +00:00
u1
5c59b27808 feat(dlob): add dlob-slippage-worker script 2026-01-10 11:30:41 +00:00
u1
d66ce89c7a feat(dlob): add dlob-slippage-worker deployment 2026-01-10 11:28:12 +00:00
u1
78711b9baf feat(dlob): add dlob-depth-worker script 2026-01-10 11:27:57 +00:00
u1
51424eeefc feat(dlob): add dlob-depth-worker deployment 2026-01-10 11:26:41 +00:00
u1
45a637f545 feat(hasura): track derived dlob stats tables 2026-01-10 11:26:13 +00:00
u1
1b1603c8f0 feat(db): add dlob depth/slippage tables 2026-01-10 11:24:51 +00:00
u1
86fcc286e5 fix(dlob): run redis as single-node cluster 2026-01-10 10:08:48 +00:00
u1
5992a54ac3 fix(dlob-worker): allow forcing IPv6 egress 2026-01-10 10:08:32 +00:00
u1
44853ab6f6 chore(staging): enable dlob-worker ipv6 patch 2026-01-10 10:07:20 +00:00
u1
c06a459b67 fix(staging): force dlob-worker via IPv6 2026-01-10 10:07:12 +00:00
u1
370cb3f74c fix(staging): disable redis TLS for dlob 2026-01-10 09:49:38 +01:00
u1
ead68a25cf feat(staging): self-host DLOB service 2026-01-10 09:37:52 +01:00
u1
392458ad99 fix(hasura): ignore already-untracked errors in bootstrap 2026-01-10 01:16:24 +00:00
u1
bd05eab467 fix(deploy): bump frontend image to sha-226406a 2026-01-10 01:06:22 +00:00
u1
f39f201b70 feat(dlob): wire worker + migrate job into kustomize 2026-01-10 00:54:04 +00:00
u1
fee9120bc2 feat(dlob): add worker script 2026-01-10 00:53:48 +00:00
u1
f32be5ea1c feat(dlob): add dlob worker deployment 2026-01-10 00:52:45 +00:00
u1
1a7a1c4de8 chore(hasura): run bootstrap as Argo hook 2026-01-10 00:52:33 +00:00
u1
a628f9044f feat(db): add postgres migrate job 2026-01-10 00:52:10 +00:00
u1
0c853354eb feat(hasura): track DLOB tables and public permissions 2026-01-10 00:42:46 +00:00
u1
476eb331c2 feat(db): add DLOB latest tables 2026-01-10 00:41:53 +00:00
u1
93587645cd feat(hasura): enable unauthorized public role 2026-01-10 00:41:09 +00:00
u1
42f26089e1 deploy(frontend): bump image to sha-f85e6da 2026-01-10 00:40:46 +00:00
u1
75e87a7cc8 feat(staging): tune fake-ingestor realism
- Seed uses paged /v1/ticks fetch (beyond 5k) to capture longer history.

- Longer return blocks + adjusted volatility params for a more lifelike curve.
2026-01-09 01:48:55 +01:00
u1
0eef6bca12 feat(staging): switch ingestor to fake
- Replaces Drift/RPC ingest with a seeded fake generator in the staging overlay.

- Seeds from existing ticks via trade-api /v1/ticks and writes new ticks via /v1/ingest/tick.
2026-01-09 01:13:35 +01:00
u1
ac50ca2117 chore(staging): bump images + add mpabi.pl host
- Switch custom images to gitea.mpabi.pl registry (rv32i.pl TLS no longer matches).
- Bump frontend to sha-a12c86f.
- Add staging ingress for mpabi.pl pointing to trade-frontend.
2026-01-06 23:46:06 +01:00
u1
5442d52ab1 chore(argocd): use gitea.mpabi.pl repoURL 2026-01-06 18:10:58 +01:00
29 changed files with 2641 additions and 8 deletions

View File

@@ -6,7 +6,7 @@ metadata:
spec:
project: default
source:
repoURL: https://rv32i.pl/trade/trade-deploy.git
repoURL: https://gitea.mpabi.pl/trade/trade-deploy.git
targetRevision: main
path: kustomize/infra/portainer
destination:

View File

@@ -6,7 +6,7 @@ metadata:
spec:
project: default
source:
repoURL: https://rv32i.pl/trade/trade-deploy.git
repoURL: https://gitea.mpabi.pl/trade/trade-deploy.git
targetRevision: main
path: kustomize/overlays/prod
destination:

View File

@@ -6,7 +6,7 @@ metadata:
spec:
project: default
source:
repoURL: https://rv32i.pl/trade/trade-deploy.git
repoURL: https://gitea.mpabi.pl/trade/trade-deploy.git
targetRevision: main
path: kustomize/overlays/staging
destination:

View File

@@ -16,7 +16,7 @@ spec:
- name: gitea-registry
containers:
- name: api
image: rv32i.pl/trade/trade-api:k3s-20260106013603
image: gitea.mpabi.pl/trade/trade-api:k3s-20260111095435
imagePullPolicy: IfNotPresent
ports:
- name: http

View File

@@ -0,0 +1,48 @@
# Computes per-band order-book depth from dlob_l2_latest and upserts
# into dlob_depth_bps_latest. Script is mounted from a ConfigMap.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dlob-depth-worker
  annotations:
    # Sync after the base dlob-worker (wave 5) has populated L2 data.
    argocd.argoproj.io/sync-wave: "6"
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: dlob-depth-worker
  template:
    metadata:
      labels:
        app.kubernetes.io/name: dlob-depth-worker
    spec:
      containers:
        - name: worker
          image: node:20-slim
          imagePullPolicy: IfNotPresent
          env:
            - name: HASURA_GRAPHQL_URL
              value: http://hasura:8080/v1/graphql
            - name: HASURA_ADMIN_SECRET
              valueFrom:
                secretKeyRef:
                  name: trade-hasura
                  key: HASURA_GRAPHQL_ADMIN_SECRET
            - name: DLOB_MARKETS
              value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP
            - name: DLOB_POLL_MS
              value: "1000"
            - name: DLOB_DEPTH_BPS_BANDS
              value: "5,10,20,50,100,200"
            - name: PRICE_PRECISION
              value: "1000000"
            - name: BASE_PRECISION
              value: "1000000000"
          command: ["node", "/app/worker.mjs"]
          volumeMounts:
            - name: script
              mountPath: /app/worker.mjs
              subPath: worker.mjs
              readOnly: true
      volumes:
        - name: script
          configMap:
            name: dlob-depth-worker-script

View File

@@ -0,0 +1,311 @@
import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';
// --- small, dependency-free helpers shared by the worker ---

// Current wall-clock time as an ISO-8601 (UTC) string.
function getIsoNow() {
  return new Date().toISOString();
}

// Parse `value` as a base-10 integer clamped to [min, max];
// returns `fallback` when the value is missing or not an integer.
function clampInt(value, min, max, fallback) {
  const n = Number.parseInt(String(value ?? ''), 10);
  if (!Number.isInteger(n)) return fallback;
  return Math.min(max, Math.max(min, n));
}

// Read a comma-separated env var (falling back to `fallbackCsv`) and
// return its trimmed, non-empty items.
function envList(name, fallbackCsv) {
  const raw = process.env[name] ?? fallbackCsv;
  return String(raw)
    .split(',')
    .map((s) => s.trim())
    .filter(Boolean);
}

// Like envList, but parses each item as a base-10 integer and drops
// anything non-finite. (Bug fix: the old "fallback" branch re-ran the
// exact same parse/filter pipeline when `out` was empty, so it could
// never yield a different result — dead code, removed.)
function envIntList(name, fallbackCsv) {
  const out = [];
  for (const item of envList(name, fallbackCsv)) {
    const n = Number.parseInt(item, 10);
    if (!Number.isFinite(n)) continue;
    out.push(n);
  }
  return out;
}

// Coerce a number or numeric string to a finite number, else null.
function toNumberOrNull(value) {
  if (value == null) return null;
  if (typeof value === 'number') return Number.isFinite(value) ? value : null;
  if (typeof value === 'string') {
    const s = value.trim();
    if (!s) return null;
    const n = Number(s);
    return Number.isFinite(n) ? n : null;
  }
  return null;
}

// Numeric-ish value -> string (for Hasura numeric columns); null when
// absent, non-finite, or blank.
function numStr(value) {
  if (value == null) return null;
  if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null;
  if (typeof value === 'string') return value.trim() || null;
  return null;
}

// If `value` is a string, try to parse it as JSON: blank -> null,
// parse failure -> the original string. Non-strings pass through.
function jsonNormalize(value) {
  if (typeof value !== 'string') return value;
  const s = value.trim();
  if (!s) return null;
  try {
    return JSON.parse(s);
  } catch {
    return value;
  }
}
// Build the worker configuration from environment variables.
// Throws when PRICE_PRECISION / BASE_PRECISION are not positive numbers.
function resolveConfig() {
  const env = process.env;
  const pricePrecision = Number(env.PRICE_PRECISION || 1_000_000);
  if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) {
    throw new Error(`Invalid PRICE_PRECISION: ${env.PRICE_PRECISION}`);
  }
  const basePrecision = Number(env.BASE_PRECISION || 1_000_000_000);
  if (!Number.isFinite(basePrecision) || basePrecision <= 0) {
    throw new Error(`Invalid BASE_PRECISION: ${env.BASE_PRECISION}`);
  }
  return {
    hasuraUrl: env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql',
    // Bearer token takes precedence over the admin secret (see graphqlRequest).
    hasuraAdminSecret: env.HASURA_ADMIN_SECRET || env.HASURA_GRAPHQL_ADMIN_SECRET || undefined,
    hasuraAuthToken: env.HASURA_AUTH_TOKEN || env.HASURA_JWT || undefined,
    markets: envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'),
    pollMs: clampInt(env.DLOB_POLL_MS, 250, 60_000, 1000),
    bandsBps: envIntList('DLOB_DEPTH_BPS_BANDS', '5,10,20,50,100,200'),
    pricePrecision,
    basePrecision,
  };
}
// POST a GraphQL operation to Hasura and return its `data` payload.
// Auth precedence: a bearer token wins over the admin secret; with
// neither configured the call is refused up front. Throws on non-2xx
// responses and on GraphQL-level errors; aborts after 10 seconds.
async function graphqlRequest(cfg, query, variables) {
  const headers = { 'content-type': 'application/json' };
  if (cfg.hasuraAuthToken) {
    headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
  } else if (cfg.hasuraAdminSecret) {
    headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
  } else {
    throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET)');
  }
  const res = await fetch(cfg.hasuraUrl, {
    method: 'POST',
    headers,
    body: JSON.stringify({ query, variables }),
    signal: AbortSignal.timeout(10_000),
  });
  // Read the body as text first so HTTP-level failures can include it.
  const text = await res.text();
  if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
  const json = JSON.parse(text);
  if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | '));
  return json.data;
}
// Normalize a raw bids/asks payload into [{price, size}] in display
// units. `raw` may be a JSON string or an already-parsed array of
// integer-scaled {price, size} levels; unparsable entries are dropped.
// Bids come back sorted best-first (descending), asks best-first
// (ascending), so callers can walk away from the touch and stop early.
function parseLevels(raw, pricePrecision, basePrecision, side) {
  const parsed = jsonNormalize(raw);
  if (!Array.isArray(parsed)) return [];
  const levels = [];
  for (const entry of parsed) {
    const rawPrice = toNumberOrNull(entry?.price);
    const rawSize = toNumberOrNull(entry?.size);
    if (rawPrice == null || rawSize == null) continue;
    const price = rawPrice / pricePrecision;
    const size = rawSize / basePrecision;
    if (Number.isFinite(price) && Number.isFinite(size)) {
      levels.push({ price, size });
    }
  }
  if (side === 'bid') levels.sort((a, b) => b.price - a.price);
  else if (side === 'ask') levels.sort((a, b) => a.price - b.price);
  return levels;
}
// Reference mid-price, in preference order: book midpoint, then mark
// price, then oracle price; null when none is available.
function computeMid(bestBid, bestAsk, markPrice, oraclePrice) {
  const haveBook = bestBid != null && bestAsk != null;
  if (haveBook) return (bestBid + bestAsk) / 2;
  return markPrice ?? oraclePrice ?? null;
}
// Sum order-book depth within ±bandBps of `mid`. Expects bids sorted
// descending and asks ascending by price, so each scan can stop at the
// first level outside the band. Returns base/USD totals per side plus
// a USD imbalance in [-1, 1] (null when the band is empty or mid is
// unusable).
function computeBandDepth({ bids, asks, mid, bandBps }) {
  // `!(mid > 0)` also rejects null/undefined/NaN.
  if (!(mid > 0)) {
    return { bidBase: 0, askBase: 0, bidUsd: 0, askUsd: 0, imbalance: null };
  }
  const bidFloor = mid * (1 - bandBps / 10_000);
  const askCeil = mid * (1 + bandBps / 10_000);
  const acc = { bidBase: 0, askBase: 0, bidUsd: 0, askUsd: 0 };
  for (const { price, size } of bids) {
    if (price < bidFloor) break;
    acc.bidBase += size;
    acc.bidUsd += size * price;
  }
  for (const { price, size } of asks) {
    if (price > askCeil) break;
    acc.askBase += size;
    acc.askUsd += size * price;
  }
  const total = acc.bidUsd + acc.askUsd;
  return {
    ...acc,
    imbalance: total > 0 ? (acc.bidUsd - acc.askUsd) / total : null,
  };
}
// Fetch the latest L2 snapshot rows for the configured markets from
// Hasura's dlob_l2_latest table. Returns [] when the response payload
// is missing or malformed.
async function fetchL2Latest(cfg) {
  const query = `
query DlobL2Latest($markets: [String!]!) {
dlob_l2_latest(where: {market_name: {_in: $markets}}) {
market_name
market_type
market_index
ts
slot
mark_price
oracle_price
best_bid_price
best_ask_price
bids
asks
updated_at
}
}
`;
  const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
  return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
}
// Upsert computed depth rows into dlob_depth_bps_latest, overwriting
// the previous snapshot on primary-key conflict so the table holds
// only the latest row per key. No-op on an empty batch.
// NOTE(review): pkey presumably (market_name, band_bps) — confirm schema.
async function upsertDepth(cfg, rows) {
  if (!rows.length) return;
  const mutation = `
mutation UpsertDlobDepth($rows: [dlob_depth_bps_latest_insert_input!]!) {
insert_dlob_depth_bps_latest(
objects: $rows
on_conflict: {
constraint: dlob_depth_bps_latest_pkey
update_columns: [
market_type
market_index
ts
slot
mid_price
best_bid_price
best_ask_price
bid_base
ask_base
bid_usd
ask_usd
imbalance
raw
updated_at
]
}
) { affected_rows }
}
`;
  await graphqlRequest(cfg, mutation, { rows });
}
// Main loop: poll Hasura for fresh L2 snapshots, derive per-band depth
// rows, and upsert them into dlob_depth_bps_latest. Runs forever;
// fetch and upsert failures are logged and retried on the next tick.
async function main() {
  const cfg = resolveConfig();
  // market_name -> updated_at of the last snapshot successfully written.
  const lastUpdatedAtByMarket = new Map();
  console.log(
    JSON.stringify(
      {
        service: 'dlob-depth-worker',
        startedAt: getIsoNow(),
        hasuraUrl: cfg.hasuraUrl,
        hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
        markets: cfg.markets,
        pollMs: cfg.pollMs,
        bandsBps: cfg.bandsBps,
        pricePrecision: cfg.pricePrecision,
        basePrecision: cfg.basePrecision,
      },
      null,
      2
    )
  );
  while (true) {
    const rows = [];
    // Dedupe markers are committed only after a successful upsert.
    // (Bug fix: previously the map was updated before the upsert, so a
    // failed write skipped that snapshot forever instead of retrying.)
    const pendingMarks = [];
    try {
      const l2Rows = await fetchL2Latest(cfg);
      for (const l2 of l2Rows) {
        const market = String(l2.market_name || '').trim();
        if (!market) continue;
        const updatedAt = l2.updated_at || null;
        // Skip markets whose snapshot we already processed.
        if (updatedAt && lastUpdatedAtByMarket.get(market) === updatedAt) continue;
        if (updatedAt) pendingMarks.push([market, updatedAt]);
        const bestBid = toNumberOrNull(l2.best_bid_price);
        const bestAsk = toNumberOrNull(l2.best_ask_price);
        const markPrice = toNumberOrNull(l2.mark_price);
        const oraclePrice = toNumberOrNull(l2.oracle_price);
        const mid = computeMid(bestBid, bestAsk, markPrice, oraclePrice);
        const bids = parseLevels(l2.bids, cfg.pricePrecision, cfg.basePrecision, 'bid');
        const asks = parseLevels(l2.asks, cfg.pricePrecision, cfg.basePrecision, 'ask');
        // One output row per configured band.
        for (const bandBps of cfg.bandsBps) {
          const d = computeBandDepth({ bids, asks, mid, bandBps });
          rows.push({
            market_name: market,
            band_bps: bandBps,
            market_type: l2.market_type ? String(l2.market_type) : 'perp',
            market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
            ts: l2.ts == null ? null : String(l2.ts),
            slot: l2.slot == null ? null : String(l2.slot),
            mid_price: numStr(mid),
            best_bid_price: numStr(bestBid),
            best_ask_price: numStr(bestAsk),
            bid_base: numStr(d.bidBase),
            ask_base: numStr(d.askBase),
            bid_usd: numStr(d.bidUsd),
            ask_usd: numStr(d.askUsd),
            imbalance: numStr(d.imbalance),
            raw: {
              ref: 'mid',
              pricePrecision: cfg.pricePrecision,
              basePrecision: cfg.basePrecision,
            },
            updated_at: updatedAt,
          });
        }
      }
    } catch (err) {
      console.error(`[dlob-depth-worker] fetch/compute: ${String(err?.message || err)}`);
    }
    try {
      await upsertDepth(cfg, rows);
      // Only now is it safe to consider these snapshots handled.
      for (const [market, updatedAt] of pendingMarks) {
        lastUpdatedAtByMarket.set(market, updatedAt);
      }
    } catch (err) {
      console.error(`[dlob-depth-worker] upsert: ${String(err?.message || err)}`);
    }
    await sleep(cfg.pollMs);
  }
}
// Crash-exit with a non-zero code on unexpected top-level failure.
main().catch((err) => {
  console.error(String(err?.stack || err));
  process.exitCode = 1;
});

View File

@@ -0,0 +1,48 @@
# Simulates buy/sell market orders against dlob_l2_latest and upserts
# VWAP/impact stats into dlob_slippage_latest. Script mounted from a
# ConfigMap.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dlob-slippage-worker
  annotations:
    # Sync after the base dlob-worker (wave 5) has populated L2 data.
    argocd.argoproj.io/sync-wave: "6"
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: dlob-slippage-worker
  template:
    metadata:
      labels:
        app.kubernetes.io/name: dlob-slippage-worker
    spec:
      containers:
        - name: worker
          image: node:20-slim
          imagePullPolicy: IfNotPresent
          env:
            - name: HASURA_GRAPHQL_URL
              value: http://hasura:8080/v1/graphql
            - name: HASURA_ADMIN_SECRET
              valueFrom:
                secretKeyRef:
                  name: trade-hasura
                  key: HASURA_GRAPHQL_ADMIN_SECRET
            - name: DLOB_MARKETS
              value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP
            - name: DLOB_POLL_MS
              value: "1000"
            - name: DLOB_SLIPPAGE_SIZES_USD
              value: "100,500,1000,5000,10000,50000"
            - name: PRICE_PRECISION
              value: "1000000"
            - name: BASE_PRECISION
              value: "1000000000"
          command: ["node", "/app/worker.mjs"]
          volumeMounts:
            - name: script
              mountPath: /app/worker.mjs
              subPath: worker.mjs
              readOnly: true
      volumes:
        - name: script
          configMap:
            name: dlob-slippage-worker-script

View File

@@ -0,0 +1,379 @@
import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';
// --- small, dependency-free helpers shared by the worker ---

// Current wall-clock time as an ISO-8601 (UTC) string.
function getIsoNow() {
  return new Date().toISOString();
}

// Parse `value` as a base-10 integer clamped to [min, max];
// returns `fallback` when the value is missing or not an integer.
function clampInt(value, min, max, fallback) {
  const n = Number.parseInt(String(value ?? ''), 10);
  if (!Number.isInteger(n)) return fallback;
  return Math.min(max, Math.max(min, n));
}

// Read a comma-separated env var (falling back to `fallbackCsv`) and
// return its trimmed, non-empty items.
function envList(name, fallbackCsv) {
  const raw = process.env[name] ?? fallbackCsv;
  return String(raw)
    .split(',')
    .map((s) => s.trim())
    .filter(Boolean);
}

// Like envList, but parses each item as a base-10 integer and drops
// anything non-finite. (Bug fix: the old "fallback" branch re-ran the
// exact same parse/filter pipeline when `out` was empty, so it could
// never yield a different result — dead code, removed.)
function envIntList(name, fallbackCsv) {
  const out = [];
  for (const item of envList(name, fallbackCsv)) {
    const n = Number.parseInt(item, 10);
    if (!Number.isFinite(n)) continue;
    out.push(n);
  }
  return out;
}

// Coerce a number or numeric string to a finite number, else null.
function toNumberOrNull(value) {
  if (value == null) return null;
  if (typeof value === 'number') return Number.isFinite(value) ? value : null;
  if (typeof value === 'string') {
    const s = value.trim();
    if (!s) return null;
    const n = Number(s);
    return Number.isFinite(n) ? n : null;
  }
  return null;
}

// Numeric-ish value -> string (for Hasura numeric columns); null when
// absent, non-finite, or blank.
function numStr(value) {
  if (value == null) return null;
  if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null;
  if (typeof value === 'string') return value.trim() || null;
  return null;
}

// If `value` is a string, try to parse it as JSON: blank -> null,
// parse failure -> the original string. Non-strings pass through.
function jsonNormalize(value) {
  if (typeof value !== 'string') return value;
  const s = value.trim();
  if (!s) return null;
  try {
    return JSON.parse(s);
  } catch {
    return value;
  }
}
// Build the worker configuration from environment variables.
// Throws when PRICE_PRECISION / BASE_PRECISION are not positive numbers.
function resolveConfig() {
  const env = process.env;
  const pricePrecision = Number(env.PRICE_PRECISION || 1_000_000);
  if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) {
    throw new Error(`Invalid PRICE_PRECISION: ${env.PRICE_PRECISION}`);
  }
  const basePrecision = Number(env.BASE_PRECISION || 1_000_000_000);
  if (!Number.isFinite(basePrecision) || basePrecision <= 0) {
    throw new Error(`Invalid BASE_PRECISION: ${env.BASE_PRECISION}`);
  }
  return {
    hasuraUrl: env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql',
    // Bearer token takes precedence over the admin secret (see graphqlRequest).
    hasuraAdminSecret: env.HASURA_ADMIN_SECRET || env.HASURA_GRAPHQL_ADMIN_SECRET || undefined,
    hasuraAuthToken: env.HASURA_AUTH_TOKEN || env.HASURA_JWT || undefined,
    markets: envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'),
    pollMs: clampInt(env.DLOB_POLL_MS, 250, 60_000, 1000),
    sizesUsd: envIntList('DLOB_SLIPPAGE_SIZES_USD', '100,500,1000,5000,10000,50000'),
    pricePrecision,
    basePrecision,
  };
}
// POST a GraphQL operation to Hasura and return its `data` payload.
// Auth precedence: a bearer token wins over the admin secret; with
// neither configured the call is refused up front. Throws on non-2xx
// responses and on GraphQL-level errors; aborts after 10 seconds.
async function graphqlRequest(cfg, query, variables) {
  const headers = { 'content-type': 'application/json' };
  if (cfg.hasuraAuthToken) {
    headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
  } else if (cfg.hasuraAdminSecret) {
    headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
  } else {
    throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET)');
  }
  const res = await fetch(cfg.hasuraUrl, {
    method: 'POST',
    headers,
    body: JSON.stringify({ query, variables }),
    signal: AbortSignal.timeout(10_000),
  });
  // Read the body as text first so HTTP-level failures can include it.
  const text = await res.text();
  if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
  const json = JSON.parse(text);
  if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | '));
  return json.data;
}
// Normalize a raw bids/asks payload into [{price, size}] in display
// units. `raw` may be a JSON string or an already-parsed array of
// integer-scaled {price, size} levels; unparsable entries are dropped.
// Bids return best-first (descending), asks best-first (ascending).
function parseLevels(raw, pricePrecision, basePrecision, side) {
  const value = jsonNormalize(raw);
  if (!Array.isArray(value)) return [];
  const out = value.flatMap((entry) => {
    const p = toNumberOrNull(entry?.price);
    const s = toNumberOrNull(entry?.size);
    if (p == null || s == null) return [];
    const price = p / pricePrecision;
    const size = s / basePrecision;
    return Number.isFinite(price) && Number.isFinite(size) ? [{ price, size }] : [];
  });
  if (side === 'bid') return out.sort((a, b) => b.price - a.price);
  if (side === 'ask') return out.sort((a, b) => a.price - b.price);
  return out;
}
// Reference mid-price, in preference order: book midpoint, then mark
// price, then oracle price; null when none is available.
function computeMid(bestBid, bestAsk, markPrice, oraclePrice) {
  const haveBook = bestBid != null && bestAsk != null;
  if (haveBook) return (bestBid + bestAsk) / 2;
  return markPrice ?? oraclePrice ?? null;
}
// Walk the ask book (assumed best-first, ascending price) spending up
// to `sizeUsd` of quote currency. Returns fill statistics: vwap/impact
// are null when nothing filled; impactBps is measured against `mid`
// when mid is usable; fillPct is the fraction of the USD budget spent.
function simulateBuy(asks, mid, sizeUsd) {
  let budget = sizeUsd;
  let spentUsd = 0;
  let boughtBase = 0;
  let worstPrice = null;
  let levelsConsumed = 0;
  for (const { price, size } of asks) {
    if (!(budget > 0)) break;
    // Ignore degenerate levels (non-positive price/size, NaN).
    if (!(price > 0) || !(size > 0)) continue;
    const affordableBase = budget / price;
    const takeBase = Math.min(size, affordableBase);
    if (!(takeBase > 0)) continue;
    const cost = takeBase * price;
    spentUsd += cost;
    boughtBase += takeBase;
    budget -= cost;
    worstPrice = price;
    levelsConsumed += 1;
  }
  const vwap = boughtBase > 0 ? spentUsd / boughtBase : null;
  const impactBps = vwap != null && mid != null && mid > 0 ? (vwap / mid - 1) * 10_000 : null;
  const fillPct = sizeUsd > 0 ? spentUsd / sizeUsd : null;
  return { vwap, worstPrice, filledUsd: spentUsd, filledBase: boughtBase, impactBps, levelsConsumed, fillPct };
}
// Walk the bid book (assumed best-first, descending price) selling a
// base amount equivalent to `sizeUsd` at `mid`. Without a usable mid
// the USD notional cannot be converted to base units, so an empty fill
// is returned. impactBps is positive when proceeds are worse than mid.
function simulateSell(bids, mid, sizeUsd) {
  if (!(mid > 0)) {
    return { vwap: null, worstPrice: null, filledUsd: 0, filledBase: 0, impactBps: null, levelsConsumed: 0, fillPct: null };
  }
  const targetBase = sizeUsd / mid;
  let remaining = targetBase;
  let receivedUsd = 0;
  let soldBase = 0;
  let worstPrice = null;
  let levelsConsumed = 0;
  for (const { price, size } of bids) {
    if (!(remaining > 0)) break;
    // Ignore degenerate levels (non-positive price/size, NaN).
    if (!(price > 0) || !(size > 0)) continue;
    const takeBase = Math.min(size, remaining);
    if (!(takeBase > 0)) continue;
    receivedUsd += takeBase * price;
    soldBase += takeBase;
    remaining -= takeBase;
    worstPrice = price;
    levelsConsumed += 1;
  }
  const vwap = soldBase > 0 ? receivedUsd / soldBase : null;
  const impactBps = vwap != null && mid > 0 ? (1 - vwap / mid) * 10_000 : null;
  const fillPct = targetBase > 0 ? soldBase / targetBase : null;
  return { vwap, worstPrice, filledUsd: receivedUsd, filledBase: soldBase, impactBps, levelsConsumed, fillPct };
}
// Fetch the latest L2 snapshot rows for the configured markets from
// Hasura's dlob_l2_latest table. Returns [] when the response payload
// is missing or malformed.
async function fetchL2Latest(cfg) {
  const query = `
query DlobL2Latest($markets: [String!]!) {
dlob_l2_latest(where: {market_name: {_in: $markets}}) {
market_name
market_type
market_index
ts
slot
mark_price
oracle_price
best_bid_price
best_ask_price
bids
asks
updated_at
}
}
`;
  const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
  return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
}
// Upsert simulated slippage rows into dlob_slippage_latest, overwriting
// the previous snapshot on primary-key conflict so the table holds only
// the latest row per key. No-op on an empty batch.
// NOTE(review): pkey presumably (market_name, side, size_usd) — confirm schema.
async function upsertSlippage(cfg, rows) {
  if (!rows.length) return;
  const mutation = `
mutation UpsertDlobSlippage($rows: [dlob_slippage_latest_insert_input!]!) {
insert_dlob_slippage_latest(
objects: $rows
on_conflict: {
constraint: dlob_slippage_latest_pkey
update_columns: [
market_type
market_index
ts
slot
mid_price
best_bid_price
best_ask_price
vwap_price
worst_price
filled_usd
filled_base
impact_bps
levels_consumed
fill_pct
raw
updated_at
]
}
) { affected_rows }
}
`;
  await graphqlRequest(cfg, mutation, { rows });
}
// Main loop: poll Hasura for fresh L2 snapshots, simulate buy and sell
// market orders at each configured USD size, and upsert the results
// into dlob_slippage_latest. Runs forever; fetch and upsert failures
// are logged and retried on the next tick.
async function main() {
  const cfg = resolveConfig();
  // market_name -> updated_at of the last snapshot successfully written.
  const lastUpdatedAtByMarket = new Map();
  console.log(
    JSON.stringify(
      {
        service: 'dlob-slippage-worker',
        startedAt: getIsoNow(),
        hasuraUrl: cfg.hasuraUrl,
        hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
        markets: cfg.markets,
        pollMs: cfg.pollMs,
        sizesUsd: cfg.sizesUsd,
        pricePrecision: cfg.pricePrecision,
        basePrecision: cfg.basePrecision,
      },
      null,
      2
    )
  );
  while (true) {
    const rows = [];
    // Dedupe markers are committed only after a successful upsert.
    // (Bug fix: previously the map was updated before the upsert, so a
    // failed write skipped that snapshot forever instead of retrying.)
    const pendingMarks = [];
    try {
      const l2Rows = await fetchL2Latest(cfg);
      for (const l2 of l2Rows) {
        const market = String(l2.market_name || '').trim();
        if (!market) continue;
        const updatedAt = l2.updated_at || null;
        // Skip markets whose snapshot we already processed.
        if (updatedAt && lastUpdatedAtByMarket.get(market) === updatedAt) continue;
        if (updatedAt) pendingMarks.push([market, updatedAt]);
        const bestBid = toNumberOrNull(l2.best_bid_price);
        const bestAsk = toNumberOrNull(l2.best_ask_price);
        const markPrice = toNumberOrNull(l2.mark_price);
        const oraclePrice = toNumberOrNull(l2.oracle_price);
        const mid = computeMid(bestBid, bestAsk, markPrice, oraclePrice);
        const bids = parseLevels(l2.bids, cfg.pricePrecision, cfg.basePrecision, 'bid');
        const asks = parseLevels(l2.asks, cfg.pricePrecision, cfg.basePrecision, 'ask');
        // Two output rows (buy + sell) per configured size.
        for (const sizeUsd of cfg.sizesUsd) {
          const buy = simulateBuy(asks, mid, sizeUsd);
          rows.push({
            market_name: market,
            side: 'buy',
            size_usd: sizeUsd,
            market_type: l2.market_type ? String(l2.market_type) : 'perp',
            market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
            ts: l2.ts == null ? null : String(l2.ts),
            slot: l2.slot == null ? null : String(l2.slot),
            mid_price: numStr(mid),
            best_bid_price: numStr(bestBid),
            best_ask_price: numStr(bestAsk),
            vwap_price: numStr(buy.vwap),
            worst_price: numStr(buy.worstPrice),
            filled_usd: numStr(buy.filledUsd),
            filled_base: numStr(buy.filledBase),
            impact_bps: numStr(buy.impactBps),
            levels_consumed: buy.levelsConsumed,
            fill_pct: numStr(buy.fillPct),
            raw: {
              ref: 'mid',
              units: 'usd',
              pricePrecision: cfg.pricePrecision,
              basePrecision: cfg.basePrecision,
            },
            updated_at: updatedAt,
          });
          const sell = simulateSell(bids, mid, sizeUsd);
          rows.push({
            market_name: market,
            side: 'sell',
            size_usd: sizeUsd,
            market_type: l2.market_type ? String(l2.market_type) : 'perp',
            market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
            ts: l2.ts == null ? null : String(l2.ts),
            slot: l2.slot == null ? null : String(l2.slot),
            mid_price: numStr(mid),
            best_bid_price: numStr(bestBid),
            best_ask_price: numStr(bestAsk),
            vwap_price: numStr(sell.vwap),
            worst_price: numStr(sell.worstPrice),
            filled_usd: numStr(sell.filledUsd),
            filled_base: numStr(sell.filledBase),
            impact_bps: numStr(sell.impactBps),
            levels_consumed: sell.levelsConsumed,
            fill_pct: numStr(sell.fillPct),
            raw: {
              ref: 'mid',
              units: 'usd',
              pricePrecision: cfg.pricePrecision,
              basePrecision: cfg.basePrecision,
            },
            updated_at: updatedAt,
          });
        }
      }
    } catch (err) {
      console.error(`[dlob-slippage-worker] fetch/compute: ${String(err?.message || err)}`);
    }
    try {
      await upsertSlippage(cfg, rows);
      // Only now is it safe to consider these snapshots handled.
      for (const [market, updatedAt] of pendingMarks) {
        lastUpdatedAtByMarket.set(market, updatedAt);
      }
    } catch (err) {
      console.error(`[dlob-slippage-worker] upsert: ${String(err?.message || err)}`);
    }
    await sleep(cfg.pollMs);
  }
}
// Crash-exit with a non-zero code on unexpected top-level failure.
main().catch((err) => {
  console.error(String(err?.stack || err));
  process.exitCode = 1;
});

View File

@@ -0,0 +1,46 @@
# Polls the dlob server for L2 books and mirrors them into Hasura
# (dlob_l2_latest). Script is mounted from a ConfigMap.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dlob-worker
  annotations:
    # Runs before the derived-stats workers (wave 6).
    argocd.argoproj.io/sync-wave: "5"
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: dlob-worker
  template:
    metadata:
      labels:
        app.kubernetes.io/name: dlob-worker
    spec:
      containers:
        - name: worker
          image: node:20-slim
          imagePullPolicy: IfNotPresent
          env:
            - name: HASURA_GRAPHQL_URL
              value: http://hasura:8080/v1/graphql
            - name: HASURA_ADMIN_SECRET
              valueFrom:
                secretKeyRef:
                  name: trade-hasura
                  key: HASURA_GRAPHQL_ADMIN_SECRET
            - name: DLOB_HTTP_URL
              value: http://dlob-server:6969
            - name: DLOB_MARKETS
              value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP
            - name: DLOB_POLL_MS
              value: "500"
            - name: DLOB_DEPTH
              value: "10"
          command: ["node", "/app/worker.mjs"]
          volumeMounts:
            - name: script
              mountPath: /app/worker.mjs
              subPath: worker.mjs
              readOnly: true
      volumes:
        - name: script
          configMap:
            name: dlob-worker-script

View File

@@ -0,0 +1,430 @@
import fs from 'node:fs';
import * as http from 'node:http';
import * as https from 'node:https';
import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';
// Best-effort JSON file read: the parsed value, or undefined when the
// file is missing, unreadable, or not valid JSON.
function readJsonFile(filePath) {
  try {
    return JSON.parse(fs.readFileSync(filePath, 'utf8'));
  } catch {
    return undefined;
  }
}
// Current wall-clock time as an ISO-8601 (UTC) string.
function getIsoNow() {
  return new Date().toISOString();
}

// Parse `value` as a base-10 integer clamped to [min, max];
// returns `fallback` when the value is missing or not an integer.
function clampInt(value, min, max, fallback) {
  const parsed = Number.parseInt(String(value ?? ''), 10);
  return Number.isInteger(parsed) ? Math.min(max, Math.max(min, parsed)) : fallback;
}

// Read a comma-separated env var (falling back to `fallbackCsv`) and
// return its trimmed, non-empty items.
function envList(name, fallbackCsv) {
  const csv = String(process.env[name] ?? fallbackCsv);
  const items = [];
  for (const piece of csv.split(',')) {
    const trimmed = piece.trim();
    if (trimmed) items.push(trimmed);
  }
  return items;
}

// Boolean env var: accepts 1/true/yes/y/on and 0/false/no/n/off
// (case-insensitive, trimmed); anything else yields `fallback`.
function envBool(name, fallback = false) {
  const raw = process.env[name];
  if (raw == null) return fallback;
  switch (String(raw).trim().toLowerCase()) {
    case '1':
    case 'true':
    case 'yes':
    case 'y':
    case 'on':
      return true;
    case '0':
    case 'false':
    case 'no':
    case 'n':
    case 'off':
      return false;
    default:
      return fallback;
  }
}
// Assemble worker configuration from env vars, with an optional mounted
// tokens file as fallback for the Hasura endpoint and credentials.
// Throws when PRICE_PRECISION / BASE_PRECISION are not positive numbers.
function resolveConfig() {
  // First defined env var wins; default path matches the k8s mount
  // convention. NOTE(review): the tokens-file schema (graphqlUrl,
  // adminSecret, ...) is inferred from the reads below — confirm.
  const tokensPath =
    process.env.HASURA_TOKENS_FILE ||
    process.env.TOKENS_FILE ||
    process.env.HASURA_CONFIG_FILE ||
    '/app/tokens/hasura.json';
  const tokens = readJsonFile(tokensPath) || {};
  const hasuraUrl =
    process.env.HASURA_GRAPHQL_URL ||
    tokens.graphqlUrl ||
    tokens.apiUrl ||
    'http://hasura:8080/v1/graphql';
  const hasuraAdminSecret =
    process.env.HASURA_ADMIN_SECRET ||
    process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
    tokens.adminSecret ||
    tokens.hasuraAdminSecret;
  const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
  // Trailing slash is stripped so path concatenation stays predictable.
  const dlobHttpBase = String(process.env.DLOB_HTTP_URL || process.env.DLOB_HTTP_BASE || 'https://dlob.drift.trade')
    .trim()
    .replace(/\/$/, '');
  const dlobForceIpv6 = envBool('DLOB_FORCE_IPV6', false);
  const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
  const depth = clampInt(process.env.DLOB_DEPTH, 1, 50, 10);
  const pollMs = clampInt(process.env.DLOB_POLL_MS, 100, 10_000, 500);
  const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
  const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
  if (!Number.isFinite(pricePrecision) || pricePrecision <= 0)
    throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
  if (!Number.isFinite(basePrecision) || basePrecision <= 0)
    throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
  return {
    hasuraUrl,
    hasuraAdminSecret,
    hasuraAuthToken,
    dlobHttpBase,
    dlobForceIpv6,
    markets,
    depth,
    pollMs,
    pricePrecision,
    basePrecision,
  };
}
// Minimal GET helper built on node:http/https so the socket `family`
// (4 or 6) can be forced — needed for the DLOB_FORCE_IPV6 egress patch,
// which fetch/undici does not expose. Resolves { status, text };
// rejects on socket errors or after `timeoutMs` (default 5s).
async function requestText(url, { timeoutMs, family } = {}) {
  const u = new URL(url);
  const client = u.protocol === 'https:' ? https : http;
  const port = u.port ? Number.parseInt(u.port, 10) : u.protocol === 'https:' ? 443 : 80;
  if (!Number.isFinite(port)) throw new Error(`Invalid port for url: ${url}`);
  return await new Promise((resolve, reject) => {
    const req = client.request(
      {
        protocol: u.protocol,
        hostname: u.hostname,
        port,
        path: `${u.pathname}${u.search}`,
        method: 'GET',
        family,
        // Keeps TLS SNI set to the hostname when `family` forces the
        // address resolution (harmless for plain HTTP).
        servername: u.hostname,
        headers: {
          accept: 'application/json',
        },
      },
      (res) => {
        // Accumulate the body as UTF-8 text; status is surfaced as-is
        // (0 when missing) so callers decide how to handle non-2xx.
        let data = '';
        res.setEncoding('utf8');
        res.on('data', (chunk) => {
          data += chunk;
        });
        res.on('end', () => {
          resolve({ status: res.statusCode ?? 0, text: data });
        });
      }
    );
    req.on('error', reject);
    // Destroying with an Error routes the timeout through the 'error'
    // handler above, rejecting the promise.
    req.setTimeout(timeoutMs ?? 5_000, () => {
      req.destroy(new Error(`Timeout after ${timeoutMs ?? 5_000}ms`));
    });
    req.end();
  });
}
// POST a GraphQL operation to Hasura and return its `data` payload.
// Auth precedence: a bearer token wins over the admin secret; with
// neither configured the call is refused up front. Throws on non-2xx
// responses and on GraphQL-level errors; aborts after 10 seconds.
async function graphqlRequest(cfg, query, variables) {
  const headers = { 'content-type': 'application/json' };
  if (cfg.hasuraAuthToken) {
    headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
  } else if (cfg.hasuraAdminSecret) {
    headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
  } else {
    throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)');
  }
  const res = await fetch(cfg.hasuraUrl, {
    method: 'POST',
    headers,
    body: JSON.stringify({ query, variables }),
    signal: AbortSignal.timeout(10_000),
  });
  // Read the body as text first so HTTP-level failures can include it.
  const text = await res.text();
  if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
  const json = JSON.parse(text);
  if (json.errors?.length) {
    throw new Error(json.errors.map((e) => e.message).join(' | '));
  }
  return json.data;
}
// Coerce a number or numeric string to a finite number, else null.
function toNumberOrNull(value) {
  if (typeof value === 'number') return Number.isFinite(value) ? value : null;
  if (typeof value === 'string') {
    const trimmed = value.trim();
    if (!trimmed) return null;
    const parsed = Number(trimmed);
    return Number.isFinite(parsed) ? parsed : null;
  }
  // null/undefined and all other types are unparsable.
  return null;
}

// Numeric-ish value -> string (for Hasura numeric columns); null when
// absent, non-finite, or blank.
function numStr(value) {
  if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null;
  if (typeof value === 'string') return value.trim() || null;
  return null;
}

// Divide an integer-scaled raw value by `scale`; null when unparsable.
function parseScaled(valueRaw, scale) {
  const parsed = toNumberOrNull(valueRaw);
  return parsed == null ? null : parsed / scale;
}
// Derive book statistics from one dlob L2 payload (camelCase fields):
// best bid/ask, mid, spread, and base/USD depth summed over the top
// `depth` levels per side.
// NOTE(review): level prices/sizes are assumed to be integer-scaled by
// pricePrecision/basePrecision — confirm against the dlob server API.
function computeStats({ l2, depth, pricePrecision, basePrecision }) {
  const bids = Array.isArray(l2?.bids) ? l2.bids : [];
  const asks = Array.isArray(l2?.asks) ? l2.asks : [];
  // Prefer the explicit best prices; fall back to top-of-book levels.
  const bestBid = parseScaled(l2?.bestBidPrice ?? bids?.[0]?.price, pricePrecision);
  const bestAsk = parseScaled(l2?.bestAskPrice ?? asks?.[0]?.price, pricePrecision);
  const markPrice = parseScaled(l2?.markPrice, pricePrecision);
  const oraclePrice = parseScaled(l2?.oracleData?.price ?? l2?.oracle, pricePrecision);
  const mid = bestBid != null && bestAsk != null ? (bestBid + bestAsk) / 2 : null;
  const spreadAbs = bestBid != null && bestAsk != null ? bestAsk - bestBid : null;
  const spreadBps = spreadAbs != null && mid != null && mid > 0 ? (spreadAbs / mid) * 10_000 : null;
  // Always sum at least one level.
  const levels = Math.max(1, depth);
  let bidBase = 0;
  let askBase = 0;
  let bidUsd = 0;
  let askUsd = 0;
  // Sum base/USD size over the top `levels` bid levels, skipping
  // entries that fail numeric parsing.
  for (let i = 0; i < Math.min(levels, bids.length); i += 1) {
    const p = parseScaled(bids[i]?.price, pricePrecision);
    const s = toNumberOrNull(bids[i]?.size);
    if (p == null || s == null) continue;
    const base = s / basePrecision;
    bidBase += base;
    bidUsd += base * p;
  }
  for (let i = 0; i < Math.min(levels, asks.length); i += 1) {
    const p = parseScaled(asks[i]?.price, pricePrecision);
    const s = toNumberOrNull(asks[i]?.size);
    if (p == null || s == null) continue;
    const base = s / basePrecision;
    askBase += base;
    askUsd += base * p;
  }
  // USD-weighted imbalance in [-1, 1]; null when both sides are empty.
  const denom = bidUsd + askUsd;
  const imbalance = denom > 0 ? (bidUsd - askUsd) / denom : null;
  return {
    bestBid,
    bestAsk,
    mid,
    spreadAbs,
    spreadBps,
    markPrice,
    oraclePrice,
    depthLevels: levels,
    bidBase,
    askBase,
    bidUsd,
    askUsd,
    imbalance,
  };
}
// Shape a raw DLOB L2 snapshot into a `dlob_l2_latest` insert object.
// Price fields are de-scaled via `pricePrecision` and serialized as numeric
// strings; bids/asks/raw keep the original (integer-scaled) JSON payloads.
function l2ToInsertObject({ l2, updatedAt, pricePrecision }) {
  const scaledStr = (v) => numStr(parseScaled(v, pricePrecision));
  const bigintStr = (v) => (v == null ? null : String(v));
  return {
    market_name: String(l2.marketName),
    market_type: String(l2.marketType || 'perp'),
    market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null,
    ts: bigintStr(l2.ts),
    slot: bigintStr(l2.slot),
    mark_price: scaledStr(l2.markPrice),
    oracle_price: scaledStr(l2.oracleData?.price ?? l2.oracle),
    best_bid_price: scaledStr(l2.bestBidPrice),
    best_ask_price: scaledStr(l2.bestAskPrice),
    bids: l2.bids ?? null,
    asks: l2.asks ?? null,
    raw: l2 ?? null,
    updated_at: updatedAt,
  };
}
// Shape computed stats (see computeStats) into a `dlob_stats_latest` insert
// object. Numeric fields become strings (Hasura numeric inputs); `raw` keeps
// the server-reported spread fields for cross-checking.
function statsToInsertObject({ l2, stats, updatedAt }) {
  const asStr = (v) => (v == null ? null : String(v));
  const finiteStr = (v) => (Number.isFinite(v) ? String(v) : null);
  return {
    market_name: String(l2.marketName),
    market_type: String(l2.marketType || 'perp'),
    market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null,
    ts: asStr(l2.ts),
    slot: asStr(l2.slot),
    mark_price: asStr(stats.markPrice),
    oracle_price: asStr(stats.oraclePrice),
    best_bid_price: asStr(stats.bestBid),
    best_ask_price: asStr(stats.bestAsk),
    mid_price: asStr(stats.mid),
    spread_abs: asStr(stats.spreadAbs),
    spread_bps: asStr(stats.spreadBps),
    depth_levels: stats.depthLevels,
    depth_bid_base: finiteStr(stats.bidBase),
    depth_ask_base: finiteStr(stats.askBase),
    depth_bid_usd: finiteStr(stats.bidUsd),
    depth_ask_usd: finiteStr(stats.askUsd),
    imbalance: asStr(stats.imbalance),
    raw: {
      spreadPct: l2.spreadPct ?? null,
      spreadQuote: l2.spreadQuote ?? null,
    },
    updated_at: updatedAt,
  };
}
// Fetch one market's L2 snapshot from the DLOB HTTP API (`GET /l2`).
// When cfg.dlobForceIpv6 is set, the custom requestText() helper is used to
// pin the connection to the IPv6 address family; otherwise plain fetch()
// with a 5s timeout. Throws on non-2xx responses.
async function fetchL2(cfg, marketName) {
  const endpoint = new URL(`${cfg.dlobHttpBase}/l2`);
  endpoint.searchParams.set('marketName', marketName);
  endpoint.searchParams.set('depth', String(cfg.depth));
  const url = endpoint.toString();
  if (cfg.dlobForceIpv6) {
    const { status, text } = await requestText(url, { timeoutMs: 5_000, family: 6 });
    if (status < 200 || status >= 300) throw new Error(`DLOB HTTP ${status}: ${text}`);
    return JSON.parse(text);
  }
  const res = await fetch(url, { signal: AbortSignal.timeout(5_000) });
  const body = await res.text();
  if (!res.ok) throw new Error(`DLOB HTTP ${res.status}: ${body}`);
  return JSON.parse(body);
}
// Upsert one poll cycle's rows into Hasura in a single GraphQL request:
// raw L2 snapshots into `dlob_l2_latest` and derived stats into
// `dlob_stats_latest`. Both tables are keyed by market_name (their pkey
// constraints below), so on_conflict updates every non-key column.
// No-op when both batches are empty; throws on HTTP/GraphQL errors and
// leaves recovery to the caller.
async function upsertBatch(cfg, l2Objects, statsObjects) {
  if (!l2Objects.length && !statsObjects.length) return;
  // NOTE: the mutation text is a runtime payload sent verbatim to Hasura.
  const mutation = `
    mutation UpsertDlob($l2: [dlob_l2_latest_insert_input!]!, $stats: [dlob_stats_latest_insert_input!]!) {
      insert_dlob_l2_latest(
        objects: $l2
        on_conflict: {
          constraint: dlob_l2_latest_pkey
          update_columns: [
            market_type
            market_index
            ts
            slot
            mark_price
            oracle_price
            best_bid_price
            best_ask_price
            bids
            asks
            raw
            updated_at
          ]
        }
      ) { affected_rows }
      insert_dlob_stats_latest(
        objects: $stats
        on_conflict: {
          constraint: dlob_stats_latest_pkey
          update_columns: [
            market_type
            market_index
            ts
            slot
            mark_price
            oracle_price
            best_bid_price
            best_ask_price
            mid_price
            spread_abs
            spread_bps
            depth_levels
            depth_bid_base
            depth_ask_base
            depth_bid_usd
            depth_ask_usd
            imbalance
            raw
            updated_at
          ]
        }
      ) { affected_rows }
    }
  `;
  await graphqlRequest(cfg, mutation, { l2: l2Objects, stats: statsObjects });
}
// Poll loop: fetch L2 snapshots for every configured market, derive stats,
// and upsert both into Hasura. Runs forever; per-market fetch failures and
// batch upsert failures are logged and retried on the next cycle.
async function main() {
  const cfg = resolveConfig();
  // DLOB ts of the last snapshot *successfully persisted* per market; used
  // to skip unchanged snapshots between polls.
  const lastTsByMarket = new Map();
  console.log(
    JSON.stringify(
      {
        service: 'dlob-worker',
        startedAt: getIsoNow(),
        hasuraUrl: cfg.hasuraUrl,
        hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
        dlobHttpBase: cfg.dlobHttpBase,
        dlobForceIpv6: cfg.dlobForceIpv6,
        markets: cfg.markets,
        depth: cfg.depth,
        pollMs: cfg.pollMs,
      },
      null,
      2
    )
  );
  while (true) {
    const updatedAt = getIsoNow();
    const results = await Promise.allSettled(cfg.markets.map((m) => fetchL2(cfg, m)));
    const l2Objects = [];
    const statsObjects = [];
    // FIX: timestamps observed this cycle are committed to lastTsByMarket
    // only after the upsert succeeds. Previously they were committed up
    // front, so a transient Hasura failure permanently dropped snapshots
    // whose ts had not changed by the next poll.
    const pendingTs = [];
    for (let i = 0; i < results.length; i += 1) {
      const market = cfg.markets[i];
      const r = results[i];
      if (r.status !== 'fulfilled') {
        console.error(`[dlob-worker] fetch ${market}: ${String(r.reason?.message || r.reason)}`);
        continue;
      }
      const l2 = r.value;
      if (!l2?.marketName) continue;
      const ts = l2.ts == null ? null : String(l2.ts);
      // Skip snapshots already persisted (same DLOB ts as the last success).
      if (ts != null && lastTsByMarket.get(l2.marketName) === ts) continue;
      if (ts != null) pendingTs.push([l2.marketName, ts]);
      const stats = computeStats({
        l2,
        depth: cfg.depth,
        pricePrecision: cfg.pricePrecision,
        basePrecision: cfg.basePrecision,
      });
      l2Objects.push(l2ToInsertObject({ l2, updatedAt, pricePrecision: cfg.pricePrecision }));
      statsObjects.push(statsToInsertObject({ l2, stats, updatedAt }));
    }
    try {
      await upsertBatch(cfg, l2Objects, statsObjects);
      for (const [marketName, ts] of pendingTs) lastTsByMarket.set(marketName, ts);
    } catch (err) {
      console.error(`[dlob-worker] upsert: ${String(err?.message || err)}`);
    }
    await sleep(cfg.pollMs);
  }
}
// Entry point: log any fatal error (stack preferred) and mark the process
// as failed without force-killing in-flight output.
main().catch((err) => {
  const detail = err?.stack || err;
  console.error(String(detail));
  process.exitCode = 1;
});

View File

@@ -0,0 +1,69 @@
# DLOB publisher: runs the dlob-server image's publisher entrypoint, which
# feeds orderbook state into the dlob-redis cluster for dlob-server to serve.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dlob-publisher
  annotations:
    # Deploy after dlob-redis (wave 3) so redis is reachable on boot.
    argocd.argoproj.io/sync-wave: "4"
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: dlob-publisher
  template:
    metadata:
      labels:
        app.kubernetes.io/name: dlob-publisher
    spec:
      imagePullSecrets:
        - name: gitea-registry
      containers:
        - name: publisher
          image: gitea.mpabi.pl/trade/trade-dlob-server:sha-8a378b7-lite-l2
          imagePullPolicy: IfNotPresent
          ports:
            - name: http
              containerPort: 8080
          env:
            # RUNNING_LOCAL + LOCAL_CACHE select the self-hosted (non-AWS)
            # code path of the dlob-server image.
            - name: RUNNING_LOCAL
              value: "true"
            - name: LOCAL_CACHE
              value: "true"
            - name: ENV
              value: mainnet-beta
            - name: USE_WEBSOCKET
              value: "true"
            - name: USE_ORDER_SUBSCRIBER
              value: "true"
            - name: DISABLE_GPA_REFRESH
              value: "true"
            # Redis the publisher writes to (the dlob-redis Service).
            - name: ELASTICACHE_HOST
              value: dlob-redis
            - name: ELASTICACHE_PORT
              value: "6379"
            - name: REDIS_CLIENT
              value: DLOB
            # Comma-separated perp market indexes to publish.
            - name: PERP_MARKETS_TO_LOAD
              value: "0,1,2,4,75"
            # Solana RPC endpoints come from the trade-dlob-rpc secret.
            - name: ENDPOINT
              valueFrom:
                secretKeyRef:
                  name: trade-dlob-rpc
                  key: ENDPOINT
            - name: WS_ENDPOINT
              valueFrom:
                secretKeyRef:
                  name: trade-dlob-rpc
                  key: WS_ENDPOINT
          command: ["node", "/lib/publishers/dlobPublisher.js"]
          readinessProbe:
            httpGet:
              path: /startup
              port: http
            initialDelaySeconds: 10
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /health
              port: http
            initialDelaySeconds: 30
            periodSeconds: 20

View File

@@ -0,0 +1,79 @@
# Single-node Redis *cluster* backing the self-hosted DLOB stack.
# The DLOB code path used here speaks the cluster protocol (see the inline
# note below), so plain standalone Redis is not enough.
apiVersion: v1
kind: Service
metadata:
  name: dlob-redis
  annotations:
    argocd.argoproj.io/sync-wave: "3"
spec:
  selector:
    app.kubernetes.io/name: dlob-redis
  ports:
    - name: redis
      port: 6379
      targetPort: redis
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dlob-redis
  annotations:
    argocd.argoproj.io/sync-wave: "3"
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: dlob-redis
  template:
    metadata:
      labels:
        app.kubernetes.io/name: dlob-redis
    spec:
      containers:
        - name: redis
          image: redis:7-alpine
          imagePullPolicy: IfNotPresent
          env:
            # Announced to cluster clients via --cluster-announce-ip; must be
            # the pod IP (directly reachable), not the Service IP.
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          ports:
            - name: redis
              containerPort: 6379
            # FIX: declare the cluster bus port (data port + 10000). It is
            # announced via --cluster-announce-bus-port below but was not
            # listed here; declaring it documents and exposes the full
            # cluster protocol surface.
            - name: cluster-bus
              containerPort: 16379
          # DLOB redis client uses ioredis Cluster when RUNNING_LOCAL=true and LOCAL_CACHE=true.
          # We run a single-node Redis Cluster (no TLS) and assign all slots on startup.
          command: ["/bin/sh", "-lc"]
          args:
            - |
              exec redis-server \
                --save "" \
                --appendonly no \
                --protected-mode no \
                --bind 0.0.0.0 \
                --cluster-enabled yes \
                --cluster-config-file /data/nodes.conf \
                --cluster-node-timeout 5000 \
                --cluster-require-full-coverage no \
                --cluster-announce-ip "${POD_IP}" \
                --cluster-announce-port 6379 \
                --cluster-announce-bus-port 16379
          lifecycle:
            # postStart waits for the server, then assigns all 16384 hash
            # slots to this single node so the cluster reports itself usable.
            postStart:
              exec:
                command:
                  - /bin/sh
                  - -lc
                  - |
                    set -e
                    for i in $(seq 1 60); do
                      redis-cli -h 127.0.0.1 -p 6379 ping >/dev/null 2>&1 && break
                      sleep 1
                    done
                    # If cluster is already initialized, do nothing.
                    if redis-cli -h 127.0.0.1 -p 6379 cluster info 2>/dev/null | grep -q 'cluster_slots_assigned:16384'; then
                      exit 0
                    fi
                    # Redis 7+ supports CLUSTER ADDSLOTSRANGE.
                    redis-cli -h 127.0.0.1 -p 6379 cluster addslotsrange 0 16383 || true

View File

@@ -0,0 +1,63 @@
# DLOB HTTP server: serves L2 orderbook data (read from dlob-redis, which
# the dlob-publisher fills) on port 6969 via the serverLite entrypoint.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dlob-server
  annotations:
    # Deploy after dlob-redis (wave 3).
    argocd.argoproj.io/sync-wave: "4"
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: dlob-server
  template:
    metadata:
      labels:
        app.kubernetes.io/name: dlob-server
    spec:
      imagePullSecrets:
        - name: gitea-registry
      containers:
        - name: server
          image: gitea.mpabi.pl/trade/trade-dlob-server:sha-8a378b7-lite-l2
          imagePullPolicy: IfNotPresent
          ports:
            - name: http
              containerPort: 6969
          env:
            # RUNNING_LOCAL + LOCAL_CACHE select the self-hosted (non-AWS)
            # code path; mirrors dlob-publisher's configuration.
            - name: RUNNING_LOCAL
              value: "true"
            - name: LOCAL_CACHE
              value: "true"
            - name: ENV
              value: mainnet-beta
            - name: PORT
              value: "6969"
            - name: ELASTICACHE_HOST
              value: dlob-redis
            - name: ELASTICACHE_PORT
              value: "6379"
            - name: REDIS_CLIENT
              value: DLOB
            # Solana RPC endpoints come from the trade-dlob-rpc secret.
            - name: ENDPOINT
              valueFrom:
                secretKeyRef:
                  name: trade-dlob-rpc
                  key: ENDPOINT
            - name: WS_ENDPOINT
              valueFrom:
                secretKeyRef:
                  name: trade-dlob-rpc
                  key: WS_ENDPOINT
          command: ["node", "/lib/serverLite.js"]
          readinessProbe:
            httpGet:
              path: /startup
              port: http
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /health
              port: http
            initialDelaySeconds: 20
            periodSeconds: 20

View File

@@ -0,0 +1,13 @@
# ClusterIP Service exposing the DLOB HTTP API (dlob-server) in-cluster.
apiVersion: v1
kind: Service
metadata:
  name: dlob-server
  annotations:
    # Same wave as the dlob-server Deployment.
    argocd.argoproj.io/sync-wave: "4"
spec:
  selector:
    app.kubernetes.io/name: dlob-server
  ports:
    - name: http
      port: 6969
      targetPort: http

View File

@@ -16,7 +16,7 @@ spec:
- name: gitea-registry
containers:
- name: frontend
image: rv32i.pl/trade/trade-frontend:sha-6107c4e
image: gitea.mpabi.pl/trade/trade-frontend:sha-ob-20260111203413
imagePullPolicy: IfNotPresent
ports:
- name: http

View File

@@ -37,6 +37,8 @@ spec:
value: "false"
- name: HASURA_GRAPHQL_CORS_DOMAIN
value: "http://localhost:5173,http://127.0.0.1:5173"
- name: HASURA_GRAPHQL_UNAUTHORIZED_ROLE
value: "public"
readinessProbe:
httpGet:
path: /healthz

View File

@@ -51,7 +51,7 @@ function isAlreadyExistsError(errText) {
function isMissingRelationError(errText) {
const t = String(errText || '').toLowerCase();
return t.includes('does not exist') || t.includes('not found') || t.includes('not tracked');
return t.includes('does not exist') || t.includes('not found') || t.includes('not tracked') || t.includes('already untracked');
}
function normalizeName(name) {
@@ -93,6 +93,10 @@ async function main() {
const source = 'default';
const baseTicks = { schema: 'public', name: 'drift_ticks' };
const dlobL2LatestTable = { schema: 'public', name: 'dlob_l2_latest' };
const dlobStatsLatestTable = { schema: 'public', name: 'dlob_stats_latest' };
const dlobDepthBpsLatestTable = { schema: 'public', name: 'dlob_depth_bps_latest' };
const dlobSlippageLatestTable = { schema: 'public', name: 'dlob_slippage_latest' };
const baseCandlesFn = { schema: 'public', name: 'get_drift_candles' };
const candlesReturnTable = { schema: 'public', name: 'drift_candles' };
@@ -161,8 +165,139 @@ async function main() {
for (const t of tickTables) {
await ensureTickTable(t);
}
const ensureDlobTable = async (table, columns) => {
await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } });
await metadata({ type: 'pg_track_table', args: { source, table } });
await metadataIgnore({ type: 'pg_drop_select_permission', args: { source, table, role: 'public' } });
await metadata({
type: 'pg_create_select_permission',
args: {
source,
table,
role: 'public',
permission: {
columns,
filter: {},
},
},
});
await metadataIgnore({ type: 'pg_drop_insert_permission', args: { source, table, role: 'ingestor' } });
await metadata({
type: 'pg_create_insert_permission',
args: {
source,
table,
role: 'ingestor',
permission: {
check: {},
set: {},
columns,
},
},
});
await metadataIgnore({ type: 'pg_drop_update_permission', args: { source, table, role: 'ingestor' } });
await metadata({
type: 'pg_create_update_permission',
args: {
source,
table,
role: 'ingestor',
permission: {
filter: {},
check: {},
columns,
},
},
});
};
await ensureDlobTable(dlobL2LatestTable, [
'market_name',
'market_type',
'market_index',
'ts',
'slot',
'mark_price',
'oracle_price',
'best_bid_price',
'best_ask_price',
'bids',
'asks',
'raw',
'updated_at',
]);
await ensureDlobTable(dlobStatsLatestTable, [
'market_name',
'market_type',
'market_index',
'ts',
'slot',
'mark_price',
'oracle_price',
'best_bid_price',
'best_ask_price',
'mid_price',
'spread_abs',
'spread_bps',
'depth_levels',
'depth_bid_base',
'depth_ask_base',
'depth_bid_usd',
'depth_ask_usd',
'imbalance',
'raw',
'updated_at',
]);
await ensureDlobTable(dlobDepthBpsLatestTable, [
'market_name',
'band_bps',
'market_type',
'market_index',
'ts',
'slot',
'mid_price',
'best_bid_price',
'best_ask_price',
'bid_base',
'ask_base',
'bid_usd',
'ask_usd',
'imbalance',
'raw',
'updated_at',
]);
await ensureDlobTable(dlobSlippageLatestTable, [
'market_name',
'side',
'size_usd',
'market_type',
'market_index',
'ts',
'slot',
'mid_price',
'best_bid_price',
'best_ask_price',
'vwap_price',
'worst_price',
'filled_usd',
'filled_base',
'impact_bps',
'levels_consumed',
'fill_pct',
'raw',
'updated_at',
]);
// Return table type for candle functions (needed for Hasura to track the function).
await metadataIgnore({ type: 'pg_track_table', args: { source, table: candlesReturnTable } });
try {
await metadata({ type: 'pg_track_table', args: { source, table: apiTokensTable } });
} catch (err) {

View File

@@ -3,7 +3,9 @@ kind: Job
metadata:
name: hasura-bootstrap
annotations:
argocd.argoproj.io/sync-wave: "2"
argocd.argoproj.io/hook: Sync
argocd.argoproj.io/hook-delete-policy: BeforeHookCreation,HookSucceeded
argocd.argoproj.io/sync-wave: "3"
spec:
backoffLimit: 5
template:

View File

@@ -16,7 +16,7 @@ spec:
- name: gitea-registry
containers:
- name: ingestor
image: rv32i.pl/trade/trade-ingestor:k3s-20260106013603
image: gitea.mpabi.pl/trade/trade-ingestor:k3s-20260106013603
imagePullPolicy: IfNotPresent
env:
- name: MARKET_NAME

View File

@@ -163,3 +163,111 @@ AS $$
ORDER BY bucket DESC
LIMIT p_limit;
$$;
-- Latest DLOB orderbook snapshots (top-N levels), per market.
-- Filled by a VPS worker (collector) and consumed by the UI via Hasura subscriptions.
CREATE TABLE IF NOT EXISTS public.dlob_l2_latest (
market_name TEXT PRIMARY KEY,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
slot BIGINT,
mark_price NUMERIC,
oracle_price NUMERIC,
best_bid_price NUMERIC,
best_ask_price NUMERIC,
bids JSONB,
asks JSONB,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS dlob_l2_latest_updated_at_idx
ON public.dlob_l2_latest (updated_at DESC);
-- Derived stats for fast UI display.
CREATE TABLE IF NOT EXISTS public.dlob_stats_latest (
market_name TEXT PRIMARY KEY,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
slot BIGINT,
mark_price NUMERIC,
oracle_price NUMERIC,
best_bid_price NUMERIC,
best_ask_price NUMERIC,
mid_price NUMERIC,
spread_abs NUMERIC,
spread_bps NUMERIC,
depth_levels INTEGER,
depth_bid_base NUMERIC,
depth_ask_base NUMERIC,
depth_bid_usd NUMERIC,
depth_ask_usd NUMERIC,
imbalance NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS dlob_stats_latest_updated_at_idx
ON public.dlob_stats_latest (updated_at DESC);
-- Depth snapshots within bps bands around mid-price (per market, per band).
-- Filled by a derived worker that reads `dlob_l2_latest`.
CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_latest (
market_name TEXT NOT NULL,
band_bps INTEGER NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
slot BIGINT,
mid_price NUMERIC,
best_bid_price NUMERIC,
best_ask_price NUMERIC,
bid_base NUMERIC,
ask_base NUMERIC,
bid_usd NUMERIC,
ask_usd NUMERIC,
imbalance NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (market_name, band_bps)
);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_updated_at_idx
ON public.dlob_depth_bps_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_market_name_idx
ON public.dlob_depth_bps_latest (market_name);
-- Slippage/impact estimates for "market" orders at common USD sizes.
-- Filled by a derived worker that reads `dlob_l2_latest`.
CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest (
market_name TEXT NOT NULL,
side TEXT NOT NULL,
size_usd INTEGER NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
slot BIGINT,
mid_price NUMERIC,
best_bid_price NUMERIC,
best_ask_price NUMERIC,
vwap_price NUMERIC,
worst_price NUMERIC,
filled_usd NUMERIC,
filled_base NUMERIC,
impact_bps NUMERIC,
levels_consumed INTEGER,
fill_pct NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (market_name, side, size_usd),
CONSTRAINT dlob_slippage_latest_side_chk CHECK (side IN ('buy', 'sell'))
);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_updated_at_idx
ON public.dlob_slippage_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_market_name_idx
ON public.dlob_slippage_latest (market_name);

View File

@@ -5,6 +5,7 @@ resources:
- cm-trade-deploy.yaml
- postgres/service.yaml
- postgres/statefulset.yaml
- postgres/job-migrate.yaml
- hasura/service.yaml
- hasura/deployment.yaml
- hasura/job-bootstrap.yaml
@@ -13,6 +14,13 @@ resources:
- ingestor/deployment.yaml
- frontend/service.yaml
- frontend/deployment.yaml
- dlob/redis.yaml
- dlob/publisher-deployment.yaml
- dlob/server-service.yaml
- dlob/server-deployment.yaml
- dlob-worker/deployment.yaml
- dlob-depth-worker/deployment.yaml
- dlob-slippage-worker/deployment.yaml
configMapGenerator:
- name: postgres-initdb
@@ -21,6 +29,15 @@ configMapGenerator:
- name: hasura-bootstrap-script
files:
- hasura/hasura-bootstrap.mjs
- name: dlob-worker-script
files:
- dlob-worker/worker.mjs
- name: dlob-depth-worker-script
files:
- dlob-depth-worker/worker.mjs
- name: dlob-slippage-worker-script
files:
- dlob-slippage-worker/worker.mjs
generatorOptions:
disableNameSuffixHash: true

View File

@@ -0,0 +1,36 @@
# Argo CD Sync hook Job: applies the SQL schema (001_init.sql from the
# postgres-initdb ConfigMap) against the postgres Service before the rest
# of the app syncs.
apiVersion: batch/v1
kind: Job
metadata:
  name: postgres-migrate
  annotations:
    # Jobs are immutable once created, so recreate on every sync and clean
    # up after success.
    argocd.argoproj.io/hook: Sync
    argocd.argoproj.io/hook-delete-policy: BeforeHookCreation,HookSucceeded
    argocd.argoproj.io/sync-wave: "2"
spec:
  backoffLimit: 3
  template:
    spec:
      restartPolicy: OnFailure
      containers:
        - name: postgres-migrate
          image: postgres:16
          imagePullPolicy: IfNotPresent
          # Connection credentials (POSTGRES_USER/PASSWORD/DB) from the secret.
          envFrom:
            - secretRef:
                name: trade-postgres
          # Waits for postgres to accept connections, then applies the
          # migration with ON_ERROR_STOP so failures fail the Job.
          command:
            - sh
            - -ec
            - |
              export PGPASSWORD="$POSTGRES_PASSWORD"
              until pg_isready -h postgres -U "$POSTGRES_USER" -d "$POSTGRES_DB"; do sleep 1; done
              psql -h postgres -U "$POSTGRES_USER" -d "$POSTGRES_DB" -v ON_ERROR_STOP=1 -f /migrations/001_init.sql
          volumeMounts:
            - name: migrations
              mountPath: /migrations/001_init.sql
              subPath: 001_init.sql
              readOnly: true
      volumes:
        - name: migrations
          configMap:
            name: postgres-initdb

View File

@@ -0,0 +1,219 @@
import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';
// Current wall-clock time as an ISO-8601 UTC string.
function getIsoNow() {
  const now = new Date();
  return now.toISOString();
}
// Read a trimmed, non-empty string from the environment; unset, empty, or
// whitespace-only values yield `fallback`.
function envString(name, fallback) {
  const raw = process.env[name];
  if (raw == null) return fallback;
  const trimmed = String(raw).trim();
  return trimmed === '' ? fallback : trimmed;
}
// Read an integer env var, clamped to the optional [min, max] range.
// Unset or unparsable values yield `fallback` (which is NOT clamped).
function envInt(name, fallback, { min, max } = {}) {
  const raw = process.env[name];
  if (raw == null) return fallback;
  const parsed = Number.parseInt(String(raw), 10);
  if (!Number.isFinite(parsed)) return fallback;
  const floor = typeof min === 'number' ? min : parsed;
  const ceiling = typeof max === 'number' ? max : parsed;
  return Math.max(floor, Math.min(ceiling, parsed));
}
// Read a comma-separated list from the environment (or the fallback CSV),
// trimming each entry and dropping empties.
function envList(name, fallbackCsv) {
  const csv = process.env[name] ?? fallbackCsv;
  const parts = String(csv).split(',');
  return parts.map((part) => part.trim()).filter((part) => part.length > 0);
}
// Coerce a value to a truncated integer, or null when impossible.
// Numbers are truncated toward zero; strings are trimmed and parsed base-10.
function toIntOrNull(v) {
  if (typeof v === 'number') return Number.isFinite(v) ? Math.trunc(v) : null;
  if (typeof v === 'string') {
    const trimmed = v.trim();
    if (trimmed === '') return null;
    const parsed = Number.parseInt(trimmed, 10);
    return Number.isFinite(parsed) ? parsed : null;
  }
  return null;
}
// Stringify a numeric-ish value for Hasura numeric columns; null when the
// value is absent, non-finite, or an empty/whitespace string.
function numStr(v) {
  if (typeof v === 'number') return Number.isFinite(v) ? `${v}` : null;
  if (typeof v === 'string') {
    const trimmed = v.trim();
    return trimmed === '' ? null : trimmed;
  }
  return null;
}
// Convert an epoch value (treated as milliseconds — NOTE(review): confirm
// the unit of DLOB `ts` against the server) to an ISO-8601 string.
// Returns null for non-positive, unparsable, or out-of-Date-range inputs.
function isoFromEpochMs(v) {
  let ms = NaN;
  if (typeof v === 'number') ms = v;
  else if (typeof v === 'string') ms = Number(v.trim());
  if (!Number.isFinite(ms) || ms <= 0) return null;
  const date = new Date(ms);
  if (!Number.isFinite(date.getTime())) return null;
  return date.toISOString();
}
// Build the ingestor configuration from environment variables.
// Throws when HASURA_ADMIN_SECRET is missing (required for GraphQL access).
function resolveConfig() {
  const hasuraAdminSecret = envString('HASURA_ADMIN_SECRET', '');
  if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET');
  return {
    hasuraUrl: envString('HASURA_GRAPHQL_URL', 'http://hasura:8080/v1/graphql'),
    hasuraAdminSecret,
    markets: envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'),
    pollMs: envInt('TICKS_POLL_MS', 1000, { min: 250, max: 60_000 }),
    source: envString('TICKS_SOURCE', 'dlob_stats'),
  };
}
// POST a GraphQL operation to Hasura using the admin secret, with a 10s
// timeout. Throws on HTTP errors, non-JSON bodies, or GraphQL-level errors;
// otherwise resolves with the `data` payload.
async function graphqlRequest(cfg, query, variables) {
  const payload = JSON.stringify({ query, variables });
  const response = await fetch(cfg.hasuraUrl, {
    method: 'POST',
    headers: {
      'content-type': 'application/json',
      'x-hasura-admin-secret': cfg.hasuraAdminSecret,
    },
    body: payload,
    signal: AbortSignal.timeout(10_000),
  });
  const bodyText = await response.text();
  if (!response.ok) {
    throw new Error(`Hasura HTTP ${response.status}: ${bodyText}`);
  }
  let parsed;
  try {
    parsed = JSON.parse(bodyText);
  } catch {
    throw new Error(`Hasura: invalid json: ${bodyText}`);
  }
  if (parsed.errors?.length) {
    const message = parsed.errors.map((e) => e.message).join(' | ');
    throw new Error(message);
  }
  return parsed.data;
}
// Query the latest per-market DLOB stats rows (`dlob_stats_latest`) for the
// configured markets. Resolves with an array (possibly empty); throws on
// transport or GraphQL errors (propagated from graphqlRequest).
async function fetchStats(cfg) {
  // NOTE: the query text is a runtime payload sent verbatim to Hasura.
  const query = `
    query DlobStatsLatest($markets: [String!]!) {
      dlob_stats_latest(where: { market_name: { _in: $markets } }) {
        market_name
        market_index
        ts
        slot
        oracle_price
        mark_price
        mid_price
        best_bid_price
        best_ask_price
        updated_at
      }
    }
  `;
  const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
  // Defensive: guard against a missing/non-array field in the response.
  const rows = Array.isArray(data?.dlob_stats_latest) ? data.dlob_stats_latest : [];
  return rows;
}
// Insert tick rows into `drift_ticks` via Hasura. Returns the affected row
// count; an empty batch short-circuits to 0 without issuing a request.
async function insertTicks(cfg, objects) {
  if (!objects.length) return 0;
  const mutation = `
    mutation InsertTicks($objects: [drift_ticks_insert_input!]!) {
      insert_drift_ticks(objects: $objects) { affected_rows }
    }
  `;
  const data = await graphqlRequest(cfg, mutation, { objects });
  const affected = data?.insert_drift_ticks?.affected_rows;
  return Number(affected || 0);
}
// Poll loop: read the freshest rows from `dlob_stats_latest` and convert
// each market whose `updated_at` advanced into a `drift_ticks` row.
// Runs forever; errors are logged and the loop backs off 2s before retrying.
async function main() {
  const cfg = resolveConfig();
  // updated_at of the last row *successfully inserted* per market.
  const lastUpdatedAtByMarket = new Map();
  console.log(
    JSON.stringify(
      {
        service: 'trade-ingestor',
        mode: 'dlob_stats_ticks',
        startedAt: getIsoNow(),
        hasuraUrl: cfg.hasuraUrl,
        markets: cfg.markets,
        pollMs: cfg.pollMs,
        source: cfg.source,
      },
      null,
      2
    )
  );
  while (true) {
    try {
      const rows = await fetchStats(cfg);
      const nowIso = getIsoNow();
      const objects = [];
      // FIX: updated_at markers are committed only after insertTicks
      // succeeds. Previously they were committed up front, so a transient
      // insert failure permanently dropped ticks whose updated_at had not
      // advanced by the next poll.
      const pendingMarks = [];
      for (const r of rows) {
        const marketName = String(r?.market_name || '').trim();
        if (!marketName) continue;
        const updatedAt = r?.updated_at ? String(r.updated_at) : '';
        // Skip rows already ingested (same updated_at as the last success).
        if (updatedAt && lastUpdatedAtByMarket.get(marketName) === updatedAt) continue;
        if (updatedAt) pendingMarks.push([marketName, updatedAt]);
        const marketIndex = toIntOrNull(r?.market_index) ?? 0;
        // Prefer the DLOB-reported timestamp; fall back to "now".
        const dlobIso = isoFromEpochMs(r?.ts);
        const tsIso = dlobIso || nowIso;
        // Price fallbacks: oracle -> mark -> mid; rows with no usable price
        // are skipped (and not retried, matching prior behavior).
        const oraclePrice = numStr(r?.oracle_price) || numStr(r?.mark_price) || numStr(r?.mid_price);
        const markPrice = numStr(r?.mark_price) || numStr(r?.mid_price) || oraclePrice;
        if (!oraclePrice) continue;
        objects.push({
          ts: tsIso,
          market_index: marketIndex,
          symbol: marketName,
          oracle_price: oraclePrice,
          mark_price: markPrice,
          oracle_slot: r?.slot == null ? null : String(r.slot),
          source: cfg.source,
          raw: {
            from: 'dlob_stats_latest',
            market_name: marketName,
            market_index: marketIndex,
            dlob: {
              ts: r?.ts ?? null,
              slot: r?.slot ?? null,
              best_bid_price: r?.best_bid_price ?? null,
              best_ask_price: r?.best_ask_price ?? null,
              mid_price: r?.mid_price ?? null,
              updated_at: updatedAt || null,
            },
          },
        });
      }
      const inserted = await insertTicks(cfg, objects);
      for (const [marketName, updatedAt] of pendingMarks) {
        lastUpdatedAtByMarket.set(marketName, updatedAt);
      }
      if (inserted) {
        console.log(`[dlob-ticks] inserted=${inserted} ts=${nowIso}`);
      }
    } catch (err) {
      console.error(`[dlob-ticks] error: ${String(err?.message || err)}`);
      await sleep(2_000);
    }
    await sleep(cfg.pollMs);
  }
}
// Entry point: log any fatal error (stack preferred) and mark the process
// as failed without force-killing in-flight output.
main().catch((err) => {
  const detail = err?.stack || err;
  console.error(String(detail));
  process.exitCode = 1;
});

View File

@@ -0,0 +1,16 @@
# Kustomize strategic-merge patch for the dlob-worker Deployment:
# points the worker at the public Drift DLOB API and forces IPv6 egress.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dlob-worker
spec:
  template:
    spec:
      # Host networking so the pod can use the node's network stack directly
      # (NOTE(review): presumably needed for IPv6 reachability — confirm
      # against the cluster's pod-network IPv6 support).
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      containers:
        - name: worker
          env:
            - name: DLOB_HTTP_URL
              value: https://dlob.drift.trade
            # Makes the worker resolve/connect via IPv6 (family: 6 path in
            # the worker's fetch helper).
            - name: DLOB_FORCE_IPV6
              value: "true"

View File

@@ -0,0 +1,487 @@
import fs from 'node:fs';
// Promise-based delay helper (setTimeout wrapper).
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
// Read a trimmed, non-empty string from the environment; unset, empty, or
// whitespace-only values yield `fallback`.
function envString(name, fallback) {
  const raw = process.env[name];
  if (raw == null) return fallback;
  const trimmed = String(raw).trim();
  return trimmed === '' ? fallback : trimmed;
}
// Read a finite number from the environment, else `fallback`.
// Note: an empty-string env value coerces to 0 (Number('') === 0).
function envNumber(name, fallback) {
  const raw = process.env[name];
  if (raw == null) return fallback;
  const parsed = Number(raw);
  return Number.isFinite(parsed) ? parsed : fallback;
}
// Read an integer env var, clamped to the optional [min, max] bounds.
// Unset or unparsable values yield `fallback` (which is NOT clamped).
function envInt(name, fallback, { min, max } = {}) {
  const raw = process.env[name];
  if (raw == null) return fallback;
  const parsed = Number.parseInt(String(raw), 10);
  if (!Number.isInteger(parsed)) return fallback;
  const lower = min ?? parsed;
  const upper = max ?? parsed;
  return Math.max(lower, Math.min(upper, parsed));
}
// Parse a JSON file, returning undefined on any failure (missing file,
// permissions, malformed JSON) — callers treat absence as "no config".
function readJson(filePath) {
  try {
    return JSON.parse(fs.readFileSync(filePath, 'utf8'));
  } catch {
    return undefined;
  }
}
// Read a bearer token from a JSON file, accepting `token`, `jwt`, or
// `authToken` keys. Returns undefined when the file is unreadable or the
// value is absent/blank/non-string.
function readTokenFromFile(filePath) {
  const json = readJson(filePath);
  const candidate = json?.token || json?.jwt || json?.authToken;
  if (typeof candidate !== 'string') return undefined;
  const trimmed = candidate.trim();
  return trimmed === '' ? undefined : trimmed;
}
// Join `path` onto a base URL, collapsing a trailing slash on the base's
// path so 'http://h/api/' + '/v1/x' -> 'http://h/api/v1/x'. A bare-host
// base ('/' path) contributes no prefix.
function urlWithPath(baseUrl, path) {
  const u = new URL(baseUrl);
  let prefix = '';
  if (u.pathname && u.pathname !== '/') {
    prefix = u.pathname.replace(/\/$/, '');
  }
  u.pathname = `${prefix}${path}`;
  return u;
}
// fetch() wrapper that always reads the body as text and best-effort
// parses JSON (`json` is undefined when the body is not valid JSON).
async function httpJson(url, options) {
  const response = await fetch(url, options);
  const text = await response.text();
  const json = (() => {
    try {
      return JSON.parse(text);
    } catch {
      return undefined;
    }
  })();
  return { ok: response.ok, status: response.status, json, text };
}
// Arithmetic mean; 0 for an empty list.
function mean(values) {
  if (values.length === 0) return 0;
  let total = 0;
  for (const v of values) total += v;
  return total / values.length;
}
// Population standard deviation (divides by N, not N-1); 0 for empty input.
// The mean computation is inlined to keep this self-contained.
function stddev(values) {
  const n = values.length;
  if (n === 0) return 0;
  let sum = 0;
  for (const x of values) sum += x;
  const m = sum / n;
  let acc = 0;
  for (const x of values) acc += (x - m) * (x - m);
  return Math.sqrt(acc / n);
}
// Quantile with linear interpolation between closest ranks (q in [0, 1]).
// Returns 0 for an empty list; does not mutate the input array.
function quantile(values, q) {
  if (values.length === 0) return 0;
  const sorted = [...values].sort((a, b) => a - b);
  const pos = (sorted.length - 1) * q;
  const lo = Math.floor(pos);
  const frac = pos - lo;
  const next = sorted[lo + 1];
  if (next == null) return sorted[lo];
  return sorted[lo] + frac * (next - sorted[lo]);
}
// Standard-normal sample via the Box-Muller transform. u and v are redrawn
// until nonzero so Math.log(u) stays finite.
function randn() {
  let u = Math.random();
  while (u === 0) u = Math.random();
  let v = Math.random();
  while (v === 0) v = Math.random();
  return Math.sqrt(-2 * Math.log(u)) * Math.cos(2 * Math.PI * v);
}
// Constrain v to the closed interval [min, max].
function clamp(v, min, max) {
  return v < min ? min : v > max ? max : v;
}
// Loose boolean parse for env-style flags: empty values and the common
// "off" words ('0', 'false', 'off', 'no', 'none', 'disabled', any case)
// are false; anything else is true.
function isTruthy(value) {
  const normalized = String(value ?? '').trim().toLowerCase();
  if (normalized === '') return false;
  const falsyWords = ['0', 'false', 'off', 'no', 'none', 'disabled'];
  return !falsyWords.includes(normalized);
}
// Extract a positive finite price from a tick row, preferring mark price
// over oracle price; accepts snake_case or camelCase keys (plus bare
// `price` as an oracle fallback). Returns null otherwise.
function parsePriceFromTick(t) {
  const mark = t?.mark_price ?? t?.markPrice;
  const oracle = t?.oracle_price ?? t?.oraclePrice ?? t?.price;
  const candidate = mark != null ? Number(mark) : Number(oracle);
  if (!Number.isFinite(candidate)) return null;
  return candidate > 0 ? candidate : null;
}
// Log returns ln(p[i] / p[i-1]) over consecutive positive prices.
// Pairs with a non-positive member or a non-finite ratio are skipped.
function computeLogReturns(prices) {
  const returns = [];
  for (let i = 1; i < prices.length; i += 1) {
    const prev = prices[i - 1];
    const curr = prices[i];
    if (!(prev > 0) || !(curr > 0)) continue;
    const r = Math.log(curr / prev);
    if (Number.isFinite(r)) returns.push(r);
  }
  return returns;
}
// Block-resampling sampler: replays contiguous random blocks of a source
// series, one value per next() call. Block length is uniform in
// [minBlock, maxBlock]; a block starting near the end is truncated (no
// wrap-around), and blocks are drawn independently with replacement.
class BlockSampler {
  #values;        // private copy of the source series (caller's array not aliased)
  #minBlock;      // shortest block length, forced >= 1
  #maxBlock;      // longest block length, forced >= #minBlock
  #idx = 0;       // cursor into #values within the current block
  #end = 0;       // exclusive end index of the current block
  #remaining = 0; // values left to emit from the current block
  constructor(values, { minBlock, maxBlock }) {
    this.#values = values.slice();
    this.#minBlock = Math.max(1, minBlock);
    this.#maxBlock = Math.max(this.#minBlock, maxBlock);
    this.#startNewBlock();
  }
  // Pick a random start index and block length. With 0 or 1 source values
  // the "block" is simply the whole series.
  #startNewBlock() {
    const n = this.#values.length;
    if (n <= 1) {
      this.#idx = 0;
      this.#end = n;
      this.#remaining = n;
      return;
    }
    const start = Math.floor(Math.random() * n);
    const span = this.#maxBlock - this.#minBlock + 1;
    const len = this.#minBlock + Math.floor(Math.random() * span);
    this.#idx = start;
    this.#end = Math.min(n, start + len);
    this.#remaining = this.#end - this.#idx;
  }
  // Emit the next value, starting a fresh block when the current one is
  // exhausted. An empty series yields 0; non-finite entries emit as 0.
  next() {
    if (this.#values.length === 0) return 0;
    if (this.#remaining <= 0 || this.#idx >= this.#end) this.#startNewBlock();
    const v = this.#values[this.#idx];
    this.#idx += 1;
    this.#remaining -= 1;
    return Number.isFinite(v) ? v : 0;
  }
}
// Fetch one page of historical seed ticks from the trade API's
// GET /v1/ticks endpoint, authenticated with the read token. `to` is an
// optional upper time bound used by the caller for backwards pagination.
// Throws on HTTP or API-level (`ok: false`) errors.
async function fetchSeedTicksPage({ apiBase, readToken, symbol, source, limit, to }) {
  const u = urlWithPath(apiBase, '/v1/ticks');
  u.searchParams.set('symbol', symbol);
  u.searchParams.set('limit', String(limit));
  if (source) u.searchParams.set('source', source);
  if (to) u.searchParams.set('to', to);
  const res = await httpJson(u.toString(), {
    method: 'GET',
    headers: {
      authorization: `Bearer ${readToken}`,
    },
  });
  if (!res.ok) throw new Error(`seed_ticks_http_${res.status}: ${res.text}`);
  if (!res.json?.ok) throw new Error(`seed_ticks_error: ${res.json?.error || res.text}`);
  return Array.isArray(res.json?.ticks) ? res.json.ticks : [];
}
// Page backwards through /v1/ticks until `desiredLimit` ticks are collected
// (or `maxPages` requests are spent), returning a chronologically ascending,
// de-duplicated list. Each page covers an older time window than the last
// via the shrinking `to` cursor.
async function fetchSeedTicksPaged({ apiBase, readToken, symbol, source, desiredLimit, pageLimit, maxPages }) {
  const pages = [];
  // Exclusive-ish upper ts bound for the next (older) page; null = newest.
  let cursorTo = null;
  for (let page = 0; page < maxPages; page++) {
    const remaining = desiredLimit - pages.reduce((acc, p) => acc + p.length, 0);
    if (remaining <= 0) break;
    const limit = Math.min(pageLimit, remaining);
    const ticks = await fetchSeedTicksPage({ apiBase, readToken, symbol, source, limit, to: cursorTo });
    if (!ticks.length) break;
    // Server returns ascending ticks; we unshift to keep overall chronological order.
    pages.unshift(ticks);
    const oldestTs = String(ticks[0]?.ts || '').trim();
    if (!oldestTs) break;
    const oldestMs = Date.parse(oldestTs);
    if (!Number.isFinite(oldestMs)) break;
    // Step the cursor 1ms before the oldest tick so pages don't repeat it.
    cursorTo = new Date(oldestMs - 1).toISOString();
  }
  const flat = pages.flat();
  if (!flat.length) return flat;
  // Best-effort de-duplication (in case of overlapping `to` bounds).
  // Keyed by ts; ticks without a ts fall back to their full JSON form.
  const seen = new Set();
  const out = [];
  for (const t of flat) {
    const ts = String(t?.ts || '');
    const key = ts ? ts : JSON.stringify(t);
    if (seen.has(key)) continue;
    seen.add(key);
    out.push(t);
  }
  return out;
}
// POST a single tick to the trade API's /v1/ingest/tick endpoint using the
// write token. Resolves with the created record id (or null when the API
// returns none); throws on HTTP or API-level (`ok: false`) errors.
async function ingestTick({ apiBase, writeToken, tick }) {
  const target = urlWithPath(apiBase, '/v1/ingest/tick');
  const res = await httpJson(target.toString(), {
    method: 'POST',
    headers: {
      'content-type': 'application/json',
      authorization: `Bearer ${writeToken}`,
    },
    body: JSON.stringify(tick),
  });
  if (!res.ok) throw new Error(`ingest_http_${res.status}: ${res.text}`);
  if (!res.json?.ok) throw new Error(`ingest_error: ${res.json?.error || res.text}`);
  return res.json?.id || null;
}
// Render a finite number as a plain decimal string (no scientific
// notation), choosing precision by magnitude and stripping trailing zeros
// (and a dangling decimal point). Non-finite input renders as '0'.
function formatNumeric(value) {
  if (!Number.isFinite(value)) return '0';
  const abs = Math.abs(value);
  if (abs === 0) return '0';
  let digits;
  if (abs >= 1) digits = 8;
  else if (abs >= 0.01) digits = 10;
  else if (abs >= 0.0001) digits = 12;
  else digits = 16;
  return value.toFixed(digits).replace(/\.?0+$/, '');
}
// Main loop: configure from the environment, seed a block-bootstrap return
// model from historical ticks (or FAKE_START_PRICE), then post one synthetic
// tick per interval to the ingest API until SIGINT/SIGTERM.
async function main() {
  // --- configuration (environment-driven, with defaults) ---
  const apiBase = envString('INGEST_API_URL', 'http://trade-api:8787');
  const symbol = envString('MARKET_NAME', envString('SYMBOL', 'PUMP-PERP'));
  const source = envString('SOURCE', 'drift_oracle');
  const intervalMs = envInt('INTERVAL_MS', 1000, { min: 50, max: 60_000 });
  const readTokenFile = envString('FAKE_READ_TOKEN_FILE', envString('READ_TOKEN_FILE', '/tokens/read.json'));
  const writeTokenFile = envString('FAKE_WRITE_TOKEN_FILE', envString('WRITE_TOKEN_FILE', '/app/tokens/alg.json'));
  const seedLimitDesired = envInt('FAKE_SEED_LIMIT', 50_000, { min: 50, max: 200_000 });
  const seedPageLimit = envInt('FAKE_SEED_PAGE_LIMIT', 5000, { min: 50, max: 5000 });
  // Default page budget: enough pages to reach the desired limit, plus slack
  // for short or overlapping pages.
  const seedMaxPages = envInt(
    'FAKE_SEED_MAX_PAGES',
    Math.ceil(seedLimitDesired / seedPageLimit) + 2,
    { min: 1, max: 200 }
  );
  // FAKE_SEED_SOURCE unset -> seed from the same source we write;
  // set-but-empty -> no source filter when seeding.
  const seedSourceRaw = process.env.FAKE_SEED_SOURCE;
  const seedSource = seedSourceRaw == null ? source : String(seedSourceRaw).trim();
  const minBlock = envInt('FAKE_BLOCK_MIN', 120, { min: 1, max: 50_000 });
  const maxBlock = envInt('FAKE_BLOCK_MAX', 1200, { min: 1, max: 50_000 });
  const volScale = envNumber('FAKE_VOL_SCALE', 1);
  const noiseScale = envNumber('FAKE_NOISE_SCALE', 0.05);
  const meanReversion = envNumber('FAKE_MEAN_REVERSION', 0.0002);
  const markNoiseBps = envNumber('FAKE_MARK_NOISE_BPS', 5);
  const logEvery = envInt('FAKE_LOG_EVERY', 30, { min: 1, max: 10_000 });
  const marketIndexEnv = process.env.MARKET_INDEX;
  const marketIndexFallback = Number.isInteger(Number(marketIndexEnv)) ? Number(marketIndexEnv) : 0;
  const startPriceEnv = envNumber('FAKE_START_PRICE', 0);
  const clampEnabled = isTruthy(process.env.FAKE_CLAMP ?? '1');
  // The clamp band is derived from seed-price quantiles, scaled by the
  // low/high multipliers below.
  const clampQLow = clamp(envNumber('FAKE_CLAMP_Q_LOW', 0.05), 0.0, 0.49);
  const clampQHigh = clamp(envNumber('FAKE_CLAMP_Q_HIGH', 0.95), 0.51, 1.0);
  const clampLowMult = envNumber('FAKE_CLAMP_LOW_MULT', 0.8);
  const clampHighMult = envNumber('FAKE_CLAMP_HIGH_MULT', 1.2);
  // --- tokens: write token is mandatory, read token only enables seeding ---
  const readToken = readTokenFromFile(readTokenFile);
  const writeToken = readTokenFromFile(writeTokenFile);
  if (!writeToken) throw new Error(`Missing write token (expected JSON token at ${writeTokenFile})`);
  let seedTicks = [];
  let seedPrices = [];
  let marketIndex = marketIndexFallback;
  let seedFromTs = null;
  let seedToTs = null;
  if (!readToken) {
    console.warn(`[fake-ingestor] No read token at ${readTokenFile}; running without seed data.`);
  } else {
    // Pull recent historical ticks (ascending) to seed the return distribution.
    seedTicks = await fetchSeedTicksPaged({
      apiBase,
      readToken,
      symbol,
      source: seedSource ? seedSource : undefined,
      desiredLimit: seedLimitDesired,
      pageLimit: seedPageLimit,
      maxPages: seedMaxPages,
    });
    seedFromTs = seedTicks.length ? String(seedTicks[0]?.ts || '') : null;
    seedToTs = seedTicks.length ? String(seedTicks[seedTicks.length - 1]?.ts || '') : null;
    for (const t of seedTicks) {
      const p = parsePriceFromTick(t);
      if (p != null) seedPrices.push(p);
    }
    // Inherit the market index from the most recent seed tick when present
    // (accepts both snake_case and camelCase field names).
    const lastTick = seedTicks.length ? seedTicks[seedTicks.length - 1] : null;
    const mi = lastTick?.market_index ?? lastTick?.marketIndex;
    if (Number.isInteger(Number(mi))) marketIndex = Number(mi);
  }
  // With no seed data, fall back to a single configured start price.
  if (!seedPrices.length && startPriceEnv > 0) {
    seedPrices = [startPriceEnv];
  }
  if (!seedPrices.length) {
    throw new Error(
      'No seed prices available. Provide FAKE_START_PRICE (and optionally MARKET_INDEX) or mount a read token to seed from /v1/ticks.'
    );
  }
  // --- model setup: log-returns of the seed series, optionally de-meaned,
  // fed to a block bootstrap sampler ---
  const returnsRaw = computeLogReturns(seedPrices);
  const mu = mean(returnsRaw);
  const sigma = stddev(returnsRaw);
  const center = envString('FAKE_CENTER_RETURNS', '1') !== '0';
  const returns = returnsRaw.length
    ? center
      ? returnsRaw.map((r) => r - mu)
      : returnsRaw
    : [];
  const sampler = new BlockSampler(returns.length ? returns : [0], { minBlock, maxBlock });
  const pLow = quantile(seedPrices, clampQLow);
  const p50 = quantile(seedPrices, 0.5);
  const pHigh = quantile(seedPrices, clampQHigh);
  // Fall back to the raw min/max when a quantile comes back non-positive.
  const clampMin = pLow > 0 ? pLow * clampLowMult : Math.min(...seedPrices) * clampLowMult;
  const clampMax = pHigh > 0 ? pHigh * clampHighMult : Math.max(...seedPrices) * clampHighMult;
  // Random-walk state lives in log space; mean reversion pulls toward the
  // seed median (falling back to the last seed price if the median is <= 0).
  let logPrice = Math.log(seedPrices[seedPrices.length - 1]);
  const targetLog = Math.log(p50 > 0 ? p50 : seedPrices[seedPrices.length - 1]);
  // One structured startup log line describing the effective configuration.
  console.log(
    JSON.stringify(
      {
        service: 'trade-fake-ingestor',
        apiBase,
        symbol,
        source,
        intervalMs,
        marketIndex,
        seed: {
          ok: Boolean(seedTicks.length),
          desiredLimit: seedLimitDesired,
          pageLimit: seedPageLimit,
          maxPages: seedMaxPages,
          source: seedSource || null,
          ticks: seedTicks.length,
          prices: seedPrices.length,
          fromTs: seedFromTs,
          toTs: seedToTs,
        },
        model: {
          type: 'block-bootstrap-returns',
          centerReturns: center,
          mu,
          sigma,
          volScale,
          noiseScale,
          meanReversion,
          block: { minBlock, maxBlock },
          clamp: clampEnabled ? { min: clampMin, max: clampMax } : { disabled: true },
        },
      },
      null,
      2
    )
  );
  // Graceful shutdown: finish the in-flight iteration, then leave the loop.
  let stopping = false;
  process.on('SIGINT', () => {
    stopping = true;
  });
  process.on('SIGTERM', () => {
    stopping = true;
  });
  let tickCount = 0;
  // Fixed-rate schedule: nextAt advances by intervalMs each iteration so a
  // slow request delays only the current tick, not the whole cadence.
  let nextAt = Date.now();
  while (!stopping) {
    const now = Date.now();
    if (now < nextAt) await sleep(nextAt - now);
    nextAt += intervalMs;
    // Next log-return = bootstrap sample * volScale + gaussian noise + mean reversion.
    const r = sampler.next();
    const noise = (Number.isFinite(sigma) ? sigma : 0) * noiseScale * randn();
    const revert = Number.isFinite(meanReversion) ? meanReversion * (targetLog - logPrice) : 0;
    const step = (Number.isFinite(r) ? r : 0) * volScale + noise + revert;
    logPrice += step;
    let oraclePrice = Math.exp(logPrice);
    // Clamp the price into the quantile-derived band and re-sync log state.
    if (clampEnabled && Number.isFinite(clampMin) && Number.isFinite(clampMax) && clampMax > clampMin) {
      oraclePrice = clamp(oraclePrice, clampMin, clampMax);
      logPrice = Math.log(oraclePrice);
    }
    // Mark price wobbles around the oracle by a few basis points.
    const markNoise = (markNoiseBps / 10_000) * randn();
    const markPrice = oraclePrice * (1 + markNoise);
    const ts = new Date().toISOString();
    const tick = {
      ts,
      market_index: marketIndex,
      symbol,
      oracle_price: formatNumeric(oraclePrice),
      mark_price: formatNumeric(markPrice),
      source,
      // `raw` records provenance so downstream consumers can tell these ticks
      // are synthetic and how the model was seeded.
      raw: {
        fake: true,
        model: 'block-bootstrap-returns',
        seeded: seedTicks.length
          ? {
              symbol,
              source: seedSource || null,
              desiredLimit: seedLimitDesired,
              pageLimit: seedPageLimit,
              maxPages: seedMaxPages,
              fromTs: seedFromTs,
              toTs: seedToTs,
            }
          : {
              symbol,
              source: null,
              desiredLimit: 0,
              pageLimit: seedPageLimit,
              maxPages: seedMaxPages,
              fromTs: null,
              toTs: null,
            },
      },
    };
    try {
      await ingestTick({ apiBase, writeToken, tick });
      tickCount += 1;
      if (tickCount % logEvery === 0) {
        console.log(
          `[fake-ingestor] ok ticks=${tickCount} ts=${ts} px=${tick.oracle_price} mark=${tick.mark_price} clamp=[${formatNumeric(
            clampMin
          )},${formatNumeric(clampMax)}]`
        );
      }
    } catch (err) {
      // Best-effort: log the failure, back off briefly, keep generating.
      console.warn(`[fake-ingestor] ingest failed: ${String(err?.message || err)}`);
      await sleep(Math.min(5000, Math.max(250, intervalMs)));
    }
  }
  console.log('[fake-ingestor] stopped');
}
// Entrypoint: run the ingestor; on unhandled failure log the stack and set a
// non-zero exit code (process.exitCode lets pending output flush before exit).
main().catch((err) => {
  console.error(String(err?.stack || err));
  process.exitCode = 1;
});

View File

@@ -0,0 +1,24 @@
# TLS ingress exposing the trade frontend on the apex domain via Traefik.
# Certificates are issued by cert-manager (letsencrypt-prod cluster issuer).
# NOTE(review): indentation was flattened in the captured text; nesting below
# restored to the canonical networking.k8s.io/v1 Ingress structure.
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: trade-frontend-root
  annotations:
    cert-manager.io/cluster-issuer: letsencrypt-prod
    # Restrict routing to the HTTPS entrypoint.
    traefik.ingress.kubernetes.io/router.entrypoints: websecure
spec:
  ingressClassName: traefik
  tls:
    - hosts:
        - mpabi.pl
      secretName: mpabi-pl-tls
  rules:
    - host: mpabi.pl
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: trade-frontend
                port:
                  number: 8081

View File

@@ -0,0 +1,36 @@
# Strategic-merge patch for the trade-ingestor Deployment: run the ingestor
# container from the dlob ingestor script (mounted from a ConfigMap), polling
# dlob-derived stats for the listed markets.
# NOTE(review): indentation was flattened in the captured text; nesting below
# restored (`volumes` belongs at pod-spec level, sibling of `containers`).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trade-ingestor
spec:
  template:
    spec:
      containers:
        - name: ingestor
          image: node:20-slim
          imagePullPolicy: IfNotPresent
          env:
            - name: HASURA_GRAPHQL_URL
              value: http://hasura:8080/v1/graphql
            - name: HASURA_ADMIN_SECRET
              valueFrom:
                secretKeyRef:
                  name: trade-hasura
                  key: HASURA_GRAPHQL_ADMIN_SECRET
            # Comma-separated perp markets to ingest ticks for.
            - name: DLOB_MARKETS
              value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP
            - name: TICKS_POLL_MS
              value: "1000"
            - name: TICKS_SOURCE
              value: "dlob_stats"
          command: ["node"]
          args: ["/opt/dlob/dlob-ingestor.mjs"]
          volumeMounts:
            # subPath mounts the single script file rather than the whole ConfigMap dir.
            - name: dlob-script
              mountPath: /opt/dlob/dlob-ingestor.mjs
              subPath: dlob-ingestor.mjs
              readOnly: true
      volumes:
        - name: dlob-script
          configMap:
            name: trade-dlob-ingestor-script

View File

@@ -0,0 +1,54 @@
# Strategic-merge patch for the trade-ingestor Deployment: run the synthetic
# (fake) tick generator, seeded from historical ticks via the mounted read
# token, with model knobs supplied through FAKE_* environment variables.
# NOTE(review): indentation was flattened in the captured text; nesting below
# restored (`volumes` belongs at pod-spec level, sibling of `containers`).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trade-ingestor
spec:
  template:
    spec:
      containers:
        - name: ingestor
          env:
            # Token files: read token seeds from /v1/ticks, write token ingests.
            - name: FAKE_READ_TOKEN_FILE
              value: "/tokens/read.json"
            - name: FAKE_WRITE_TOKEN_FILE
              value: "/app/tokens/alg.json"
            # Seeding: how much history to page in.
            - name: FAKE_SEED_LIMIT
              value: "50000"
            - name: FAKE_SEED_PAGE_LIMIT
              value: "5000"
            - name: FAKE_SEED_MAX_PAGES
              value: "20"
            # Block-bootstrap model parameters.
            - name: FAKE_BLOCK_MIN
              value: "300"
            - name: FAKE_BLOCK_MAX
              value: "1800"
            - name: FAKE_VOL_SCALE
              value: "1.8"
            - name: FAKE_NOISE_SCALE
              value: "0.03"
            - name: FAKE_MEAN_REVERSION
              value: "0.0001"
            # Price clamp band: quantiles of the seed series, scaled by multipliers.
            - name: FAKE_CLAMP_Q_LOW
              value: "0.02"
            - name: FAKE_CLAMP_Q_HIGH
              value: "0.98"
            - name: FAKE_CLAMP_LOW_MULT
              value: "0.7"
            - name: FAKE_CLAMP_HIGH_MULT
              value: "1.3"
          command: ["node"]
          args: ["/opt/fake/fake-ingestor.mjs"]
          volumeMounts:
            - name: fake-script
              mountPath: /opt/fake
              readOnly: true
            - name: read-tokens
              mountPath: /tokens
              readOnly: true
      volumes:
        - name: fake-script
          configMap:
            name: trade-fake-ingestor-script
        - name: read-tokens
          secret:
            secretName: trade-frontend-tokens

View File

@@ -7,10 +7,21 @@ resources:
- ../../base
- pgadmin.yaml
- frontend-ingress.yaml
- frontend-ingress-root.yaml
patchesStrategicMerge:
- hasura-patch.yaml
- frontend-auth-patch.yaml
- ingestor-dlob-patch.yaml
- dlob-worker-patch.yaml
configMapGenerator:
- name: trade-dlob-ingestor-script
files:
- dlob-ingestor.mjs
generatorOptions:
disableNameSuffixHash: true
commonLabels:
env: staging