Compare commits
33 Commits
227d035f01
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 773784e1a3 | |||
| 56045f8f55 | |||
| 1ba4c72e11 | |||
| 1efa41c112 | |||
| b925ad78e8 | |||
| 0f638f1d70 | |||
| 5c59b27808 | |||
| d66ce89c7a | |||
| 78711b9baf | |||
| 51424eeefc | |||
| 45a637f545 | |||
| 1b1603c8f0 | |||
| 86fcc286e5 | |||
| 5992a54ac3 | |||
| 44853ab6f6 | |||
| c06a459b67 | |||
| 370cb3f74c | |||
| ead68a25cf | |||
| 392458ad99 | |||
| bd05eab467 | |||
| f39f201b70 | |||
| fee9120bc2 | |||
| f32be5ea1c | |||
| 1a7a1c4de8 | |||
| a628f9044f | |||
| 0c853354eb | |||
| 476eb331c2 | |||
| 93587645cd | |||
| 42f26089e1 | |||
| 75e87a7cc8 | |||
| 0eef6bca12 | |||
| ac50ca2117 | |||
| 5442d52ab1 |
@@ -6,7 +6,7 @@ metadata:
|
||||
spec:
|
||||
project: default
|
||||
source:
|
||||
repoURL: https://rv32i.pl/trade/trade-deploy.git
|
||||
repoURL: https://gitea.mpabi.pl/trade/trade-deploy.git
|
||||
targetRevision: main
|
||||
path: kustomize/infra/portainer
|
||||
destination:
|
||||
|
||||
@@ -6,7 +6,7 @@ metadata:
|
||||
spec:
|
||||
project: default
|
||||
source:
|
||||
repoURL: https://rv32i.pl/trade/trade-deploy.git
|
||||
repoURL: https://gitea.mpabi.pl/trade/trade-deploy.git
|
||||
targetRevision: main
|
||||
path: kustomize/overlays/prod
|
||||
destination:
|
||||
|
||||
@@ -6,7 +6,7 @@ metadata:
|
||||
spec:
|
||||
project: default
|
||||
source:
|
||||
repoURL: https://rv32i.pl/trade/trade-deploy.git
|
||||
repoURL: https://gitea.mpabi.pl/trade/trade-deploy.git
|
||||
targetRevision: main
|
||||
path: kustomize/overlays/staging
|
||||
destination:
|
||||
|
||||
@@ -16,7 +16,7 @@ spec:
|
||||
- name: gitea-registry
|
||||
containers:
|
||||
- name: api
|
||||
image: rv32i.pl/trade/trade-api:k3s-20260106013603
|
||||
image: gitea.mpabi.pl/trade/trade-api:k3s-20260111095435
|
||||
imagePullPolicy: IfNotPresent
|
||||
ports:
|
||||
- name: http
|
||||
|
||||
48
kustomize/base/dlob-depth-worker/deployment.yaml
Normal file
48
kustomize/base/dlob-depth-worker/deployment.yaml
Normal file
@@ -0,0 +1,48 @@
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: dlob-depth-worker
|
||||
annotations:
|
||||
argocd.argoproj.io/sync-wave: "6"
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: dlob-depth-worker
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app.kubernetes.io/name: dlob-depth-worker
|
||||
spec:
|
||||
containers:
|
||||
- name: worker
|
||||
image: node:20-slim
|
||||
imagePullPolicy: IfNotPresent
|
||||
env:
|
||||
- name: HASURA_GRAPHQL_URL
|
||||
value: http://hasura:8080/v1/graphql
|
||||
- name: HASURA_ADMIN_SECRET
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: trade-hasura
|
||||
key: HASURA_GRAPHQL_ADMIN_SECRET
|
||||
- name: DLOB_MARKETS
|
||||
value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP
|
||||
- name: DLOB_POLL_MS
|
||||
value: "1000"
|
||||
- name: DLOB_DEPTH_BPS_BANDS
|
||||
value: "5,10,20,50,100,200"
|
||||
- name: PRICE_PRECISION
|
||||
value: "1000000"
|
||||
- name: BASE_PRECISION
|
||||
value: "1000000000"
|
||||
command: ["node", "/app/worker.mjs"]
|
||||
volumeMounts:
|
||||
- name: script
|
||||
mountPath: /app/worker.mjs
|
||||
subPath: worker.mjs
|
||||
readOnly: true
|
||||
volumes:
|
||||
- name: script
|
||||
configMap:
|
||||
name: dlob-depth-worker-script
|
||||
311
kustomize/base/dlob-depth-worker/worker.mjs
Normal file
311
kustomize/base/dlob-depth-worker/worker.mjs
Normal file
@@ -0,0 +1,311 @@
|
||||
import process from 'node:process';
|
||||
import { setTimeout as sleep } from 'node:timers/promises';
|
||||
|
||||
function getIsoNow() {
|
||||
return new Date().toISOString();
|
||||
}
|
||||
|
||||
function clampInt(value, min, max, fallback) {
|
||||
const n = Number.parseInt(String(value ?? ''), 10);
|
||||
if (!Number.isInteger(n)) return fallback;
|
||||
return Math.min(max, Math.max(min, n));
|
||||
}
|
||||
|
||||
function envList(name, fallbackCsv) {
|
||||
const raw = process.env[name] ?? fallbackCsv;
|
||||
return String(raw)
|
||||
.split(',')
|
||||
.map((s) => s.trim())
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
function envIntList(name, fallbackCsv) {
|
||||
const out = [];
|
||||
for (const item of envList(name, fallbackCsv)) {
|
||||
const n = Number.parseInt(item, 10);
|
||||
if (!Number.isFinite(n)) continue;
|
||||
out.push(n);
|
||||
}
|
||||
return out.length ? out : envList(name, fallbackCsv).map((v) => Number.parseInt(v, 10)).filter(Number.isFinite);
|
||||
}
|
||||
|
||||
function toNumberOrNull(value) {
|
||||
if (value == null) return null;
|
||||
if (typeof value === 'number') return Number.isFinite(value) ? value : null;
|
||||
if (typeof value === 'string') {
|
||||
const s = value.trim();
|
||||
if (!s) return null;
|
||||
const n = Number(s);
|
||||
return Number.isFinite(n) ? n : null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function numStr(value) {
|
||||
if (value == null) return null;
|
||||
if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null;
|
||||
if (typeof value === 'string') return value.trim() || null;
|
||||
return null;
|
||||
}
|
||||
|
||||
function jsonNormalize(value) {
|
||||
if (typeof value !== 'string') return value;
|
||||
const s = value.trim();
|
||||
if (!s) return null;
|
||||
try {
|
||||
return JSON.parse(s);
|
||||
} catch {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
function resolveConfig() {
|
||||
const hasuraUrl = process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql';
|
||||
const hasuraAdminSecret = process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || undefined;
|
||||
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
|
||||
|
||||
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
|
||||
const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
|
||||
const bandsBps = envIntList('DLOB_DEPTH_BPS_BANDS', '5,10,20,50,100,200');
|
||||
|
||||
const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
|
||||
const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
|
||||
if (!Number.isFinite(pricePrecision) || pricePrecision <= 0)
|
||||
throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
|
||||
if (!Number.isFinite(basePrecision) || basePrecision <= 0)
|
||||
throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
|
||||
|
||||
return {
|
||||
hasuraUrl,
|
||||
hasuraAdminSecret,
|
||||
hasuraAuthToken,
|
||||
markets,
|
||||
pollMs,
|
||||
bandsBps,
|
||||
pricePrecision,
|
||||
basePrecision,
|
||||
};
|
||||
}
|
||||
|
||||
async function graphqlRequest(cfg, query, variables) {
|
||||
const headers = { 'content-type': 'application/json' };
|
||||
if (cfg.hasuraAuthToken) {
|
||||
headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
|
||||
} else if (cfg.hasuraAdminSecret) {
|
||||
headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
|
||||
} else {
|
||||
throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET)');
|
||||
}
|
||||
|
||||
const res = await fetch(cfg.hasuraUrl, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body: JSON.stringify({ query, variables }),
|
||||
signal: AbortSignal.timeout(10_000),
|
||||
});
|
||||
const text = await res.text();
|
||||
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
|
||||
const json = JSON.parse(text);
|
||||
if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | '));
|
||||
return json.data;
|
||||
}
|
||||
|
||||
function parseLevels(raw, pricePrecision, basePrecision, side) {
|
||||
const v = jsonNormalize(raw);
|
||||
if (!Array.isArray(v)) return [];
|
||||
|
||||
const out = [];
|
||||
for (const item of v) {
|
||||
const priceInt = toNumberOrNull(item?.price);
|
||||
const sizeInt = toNumberOrNull(item?.size);
|
||||
if (priceInt == null || sizeInt == null) continue;
|
||||
const price = priceInt / pricePrecision;
|
||||
const size = sizeInt / basePrecision;
|
||||
if (!Number.isFinite(price) || !Number.isFinite(size)) continue;
|
||||
out.push({ price, size });
|
||||
}
|
||||
|
||||
if (side === 'bid') out.sort((a, b) => b.price - a.price);
|
||||
if (side === 'ask') out.sort((a, b) => a.price - b.price);
|
||||
return out;
|
||||
}
|
||||
|
||||
function computeMid(bestBid, bestAsk, markPrice, oraclePrice) {
|
||||
if (bestBid != null && bestAsk != null) return (bestBid + bestAsk) / 2;
|
||||
if (markPrice != null) return markPrice;
|
||||
if (oraclePrice != null) return oraclePrice;
|
||||
return null;
|
||||
}
|
||||
|
||||
function computeBandDepth({ bids, asks, mid, bandBps }) {
|
||||
if (mid == null || !(mid > 0)) {
|
||||
return { bidBase: 0, askBase: 0, bidUsd: 0, askUsd: 0, imbalance: null };
|
||||
}
|
||||
|
||||
const minBidPrice = mid * (1 - bandBps / 10_000);
|
||||
const maxAskPrice = mid * (1 + bandBps / 10_000);
|
||||
|
||||
let bidBase = 0;
|
||||
let askBase = 0;
|
||||
let bidUsd = 0;
|
||||
let askUsd = 0;
|
||||
|
||||
for (const lvl of bids) {
|
||||
if (lvl.price < minBidPrice) break;
|
||||
bidBase += lvl.size;
|
||||
bidUsd += lvl.size * lvl.price;
|
||||
}
|
||||
|
||||
for (const lvl of asks) {
|
||||
if (lvl.price > maxAskPrice) break;
|
||||
askBase += lvl.size;
|
||||
askUsd += lvl.size * lvl.price;
|
||||
}
|
||||
|
||||
const denom = bidUsd + askUsd;
|
||||
const imbalance = denom > 0 ? (bidUsd - askUsd) / denom : null;
|
||||
return { bidBase, askBase, bidUsd, askUsd, imbalance };
|
||||
}
|
||||
|
||||
async function fetchL2Latest(cfg) {
|
||||
const query = `
|
||||
query DlobL2Latest($markets: [String!]!) {
|
||||
dlob_l2_latest(where: {market_name: {_in: $markets}}) {
|
||||
market_name
|
||||
market_type
|
||||
market_index
|
||||
ts
|
||||
slot
|
||||
mark_price
|
||||
oracle_price
|
||||
best_bid_price
|
||||
best_ask_price
|
||||
bids
|
||||
asks
|
||||
updated_at
|
||||
}
|
||||
}
|
||||
`;
|
||||
const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
|
||||
return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
|
||||
}
|
||||
|
||||
async function upsertDepth(cfg, rows) {
|
||||
if (!rows.length) return;
|
||||
const mutation = `
|
||||
mutation UpsertDlobDepth($rows: [dlob_depth_bps_latest_insert_input!]!) {
|
||||
insert_dlob_depth_bps_latest(
|
||||
objects: $rows
|
||||
on_conflict: {
|
||||
constraint: dlob_depth_bps_latest_pkey
|
||||
update_columns: [
|
||||
market_type
|
||||
market_index
|
||||
ts
|
||||
slot
|
||||
mid_price
|
||||
best_bid_price
|
||||
best_ask_price
|
||||
bid_base
|
||||
ask_base
|
||||
bid_usd
|
||||
ask_usd
|
||||
imbalance
|
||||
raw
|
||||
updated_at
|
||||
]
|
||||
}
|
||||
) { affected_rows }
|
||||
}
|
||||
`;
|
||||
await graphqlRequest(cfg, mutation, { rows });
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const cfg = resolveConfig();
|
||||
const lastUpdatedAtByMarket = new Map();
|
||||
|
||||
console.log(
|
||||
JSON.stringify(
|
||||
{
|
||||
service: 'dlob-depth-worker',
|
||||
startedAt: getIsoNow(),
|
||||
hasuraUrl: cfg.hasuraUrl,
|
||||
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
|
||||
markets: cfg.markets,
|
||||
pollMs: cfg.pollMs,
|
||||
bandsBps: cfg.bandsBps,
|
||||
pricePrecision: cfg.pricePrecision,
|
||||
basePrecision: cfg.basePrecision,
|
||||
},
|
||||
null,
|
||||
2
|
||||
)
|
||||
);
|
||||
|
||||
while (true) {
|
||||
const rows = [];
|
||||
|
||||
try {
|
||||
const l2Rows = await fetchL2Latest(cfg);
|
||||
for (const l2 of l2Rows) {
|
||||
const market = String(l2.market_name || '').trim();
|
||||
if (!market) continue;
|
||||
|
||||
const updatedAt = l2.updated_at || null;
|
||||
if (updatedAt && lastUpdatedAtByMarket.get(market) === updatedAt) continue;
|
||||
if (updatedAt) lastUpdatedAtByMarket.set(market, updatedAt);
|
||||
|
||||
const bestBid = toNumberOrNull(l2.best_bid_price);
|
||||
const bestAsk = toNumberOrNull(l2.best_ask_price);
|
||||
const markPrice = toNumberOrNull(l2.mark_price);
|
||||
const oraclePrice = toNumberOrNull(l2.oracle_price);
|
||||
const mid = computeMid(bestBid, bestAsk, markPrice, oraclePrice);
|
||||
|
||||
const bids = parseLevels(l2.bids, cfg.pricePrecision, cfg.basePrecision, 'bid');
|
||||
const asks = parseLevels(l2.asks, cfg.pricePrecision, cfg.basePrecision, 'ask');
|
||||
|
||||
for (const bandBps of cfg.bandsBps) {
|
||||
const d = computeBandDepth({ bids, asks, mid, bandBps });
|
||||
rows.push({
|
||||
market_name: market,
|
||||
band_bps: bandBps,
|
||||
market_type: l2.market_type ? String(l2.market_type) : 'perp',
|
||||
market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
|
||||
ts: l2.ts == null ? null : String(l2.ts),
|
||||
slot: l2.slot == null ? null : String(l2.slot),
|
||||
mid_price: numStr(mid),
|
||||
best_bid_price: numStr(bestBid),
|
||||
best_ask_price: numStr(bestAsk),
|
||||
bid_base: numStr(d.bidBase),
|
||||
ask_base: numStr(d.askBase),
|
||||
bid_usd: numStr(d.bidUsd),
|
||||
ask_usd: numStr(d.askUsd),
|
||||
imbalance: numStr(d.imbalance),
|
||||
raw: {
|
||||
ref: 'mid',
|
||||
pricePrecision: cfg.pricePrecision,
|
||||
basePrecision: cfg.basePrecision,
|
||||
},
|
||||
updated_at: updatedAt,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`[dlob-depth-worker] fetch/compute: ${String(err?.message || err)}`);
|
||||
}
|
||||
|
||||
try {
|
||||
await upsertDepth(cfg, rows);
|
||||
} catch (err) {
|
||||
console.error(`[dlob-depth-worker] upsert: ${String(err?.message || err)}`);
|
||||
}
|
||||
|
||||
await sleep(cfg.pollMs);
|
||||
}
|
||||
}
|
||||
|
||||
main().catch((err) => {
|
||||
console.error(String(err?.stack || err));
|
||||
process.exitCode = 1;
|
||||
});
|
||||
48
kustomize/base/dlob-slippage-worker/deployment.yaml
Normal file
48
kustomize/base/dlob-slippage-worker/deployment.yaml
Normal file
@@ -0,0 +1,48 @@
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: dlob-slippage-worker
|
||||
annotations:
|
||||
argocd.argoproj.io/sync-wave: "6"
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: dlob-slippage-worker
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app.kubernetes.io/name: dlob-slippage-worker
|
||||
spec:
|
||||
containers:
|
||||
- name: worker
|
||||
image: node:20-slim
|
||||
imagePullPolicy: IfNotPresent
|
||||
env:
|
||||
- name: HASURA_GRAPHQL_URL
|
||||
value: http://hasura:8080/v1/graphql
|
||||
- name: HASURA_ADMIN_SECRET
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: trade-hasura
|
||||
key: HASURA_GRAPHQL_ADMIN_SECRET
|
||||
- name: DLOB_MARKETS
|
||||
value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP
|
||||
- name: DLOB_POLL_MS
|
||||
value: "1000"
|
||||
- name: DLOB_SLIPPAGE_SIZES_USD
|
||||
value: "100,500,1000,5000,10000,50000"
|
||||
- name: PRICE_PRECISION
|
||||
value: "1000000"
|
||||
- name: BASE_PRECISION
|
||||
value: "1000000000"
|
||||
command: ["node", "/app/worker.mjs"]
|
||||
volumeMounts:
|
||||
- name: script
|
||||
mountPath: /app/worker.mjs
|
||||
subPath: worker.mjs
|
||||
readOnly: true
|
||||
volumes:
|
||||
- name: script
|
||||
configMap:
|
||||
name: dlob-slippage-worker-script
|
||||
379
kustomize/base/dlob-slippage-worker/worker.mjs
Normal file
379
kustomize/base/dlob-slippage-worker/worker.mjs
Normal file
@@ -0,0 +1,379 @@
|
||||
import process from 'node:process';
|
||||
import { setTimeout as sleep } from 'node:timers/promises';
|
||||
|
||||
function getIsoNow() {
|
||||
return new Date().toISOString();
|
||||
}
|
||||
|
||||
function clampInt(value, min, max, fallback) {
|
||||
const n = Number.parseInt(String(value ?? ''), 10);
|
||||
if (!Number.isInteger(n)) return fallback;
|
||||
return Math.min(max, Math.max(min, n));
|
||||
}
|
||||
|
||||
function envList(name, fallbackCsv) {
|
||||
const raw = process.env[name] ?? fallbackCsv;
|
||||
return String(raw)
|
||||
.split(',')
|
||||
.map((s) => s.trim())
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
function envIntList(name, fallbackCsv) {
|
||||
const out = [];
|
||||
for (const item of envList(name, fallbackCsv)) {
|
||||
const n = Number.parseInt(item, 10);
|
||||
if (!Number.isFinite(n)) continue;
|
||||
out.push(n);
|
||||
}
|
||||
return out.length ? out : envList(name, fallbackCsv).map((v) => Number.parseInt(v, 10)).filter(Number.isFinite);
|
||||
}
|
||||
|
||||
function toNumberOrNull(value) {
|
||||
if (value == null) return null;
|
||||
if (typeof value === 'number') return Number.isFinite(value) ? value : null;
|
||||
if (typeof value === 'string') {
|
||||
const s = value.trim();
|
||||
if (!s) return null;
|
||||
const n = Number(s);
|
||||
return Number.isFinite(n) ? n : null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function numStr(value) {
|
||||
if (value == null) return null;
|
||||
if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null;
|
||||
if (typeof value === 'string') return value.trim() || null;
|
||||
return null;
|
||||
}
|
||||
|
||||
function jsonNormalize(value) {
|
||||
if (typeof value !== 'string') return value;
|
||||
const s = value.trim();
|
||||
if (!s) return null;
|
||||
try {
|
||||
return JSON.parse(s);
|
||||
} catch {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
function resolveConfig() {
|
||||
const hasuraUrl = process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql';
|
||||
const hasuraAdminSecret = process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || undefined;
|
||||
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
|
||||
|
||||
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
|
||||
const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
|
||||
const sizesUsd = envIntList('DLOB_SLIPPAGE_SIZES_USD', '100,500,1000,5000,10000,50000');
|
||||
|
||||
const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
|
||||
const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
|
||||
if (!Number.isFinite(pricePrecision) || pricePrecision <= 0)
|
||||
throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
|
||||
if (!Number.isFinite(basePrecision) || basePrecision <= 0)
|
||||
throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
|
||||
|
||||
return {
|
||||
hasuraUrl,
|
||||
hasuraAdminSecret,
|
||||
hasuraAuthToken,
|
||||
markets,
|
||||
pollMs,
|
||||
sizesUsd,
|
||||
pricePrecision,
|
||||
basePrecision,
|
||||
};
|
||||
}
|
||||
|
||||
async function graphqlRequest(cfg, query, variables) {
|
||||
const headers = { 'content-type': 'application/json' };
|
||||
if (cfg.hasuraAuthToken) {
|
||||
headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
|
||||
} else if (cfg.hasuraAdminSecret) {
|
||||
headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
|
||||
} else {
|
||||
throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET)');
|
||||
}
|
||||
|
||||
const res = await fetch(cfg.hasuraUrl, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body: JSON.stringify({ query, variables }),
|
||||
signal: AbortSignal.timeout(10_000),
|
||||
});
|
||||
const text = await res.text();
|
||||
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
|
||||
const json = JSON.parse(text);
|
||||
if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | '));
|
||||
return json.data;
|
||||
}
|
||||
|
||||
function parseLevels(raw, pricePrecision, basePrecision, side) {
|
||||
const v = jsonNormalize(raw);
|
||||
if (!Array.isArray(v)) return [];
|
||||
|
||||
const out = [];
|
||||
for (const item of v) {
|
||||
const priceInt = toNumberOrNull(item?.price);
|
||||
const sizeInt = toNumberOrNull(item?.size);
|
||||
if (priceInt == null || sizeInt == null) continue;
|
||||
const price = priceInt / pricePrecision;
|
||||
const size = sizeInt / basePrecision;
|
||||
if (!Number.isFinite(price) || !Number.isFinite(size)) continue;
|
||||
out.push({ price, size });
|
||||
}
|
||||
|
||||
if (side === 'bid') out.sort((a, b) => b.price - a.price);
|
||||
if (side === 'ask') out.sort((a, b) => a.price - b.price);
|
||||
return out;
|
||||
}
|
||||
|
||||
function computeMid(bestBid, bestAsk, markPrice, oraclePrice) {
|
||||
if (bestBid != null && bestAsk != null) return (bestBid + bestAsk) / 2;
|
||||
if (markPrice != null) return markPrice;
|
||||
if (oraclePrice != null) return oraclePrice;
|
||||
return null;
|
||||
}
|
||||
|
||||
function simulateBuy(asks, mid, sizeUsd) {
|
||||
let remainingUsd = sizeUsd;
|
||||
let filledUsd = 0;
|
||||
let filledBase = 0;
|
||||
let worstPrice = null;
|
||||
let levelsConsumed = 0;
|
||||
|
||||
for (const lvl of asks) {
|
||||
if (!(remainingUsd > 0)) break;
|
||||
if (!(lvl.price > 0) || !(lvl.size > 0)) continue;
|
||||
|
||||
const maxBase = remainingUsd / lvl.price;
|
||||
const takeBase = Math.min(lvl.size, maxBase);
|
||||
if (!(takeBase > 0)) continue;
|
||||
|
||||
const cost = takeBase * lvl.price;
|
||||
filledUsd += cost;
|
||||
filledBase += takeBase;
|
||||
remainingUsd -= cost;
|
||||
worstPrice = lvl.price;
|
||||
levelsConsumed += 1;
|
||||
}
|
||||
|
||||
const vwap = filledBase > 0 ? filledUsd / filledBase : null;
|
||||
const impactBps = vwap != null && mid != null && mid > 0 ? (vwap / mid - 1) * 10_000 : null;
|
||||
const fillPct = sizeUsd > 0 ? filledUsd / sizeUsd : null;
|
||||
|
||||
return { vwap, worstPrice, filledUsd, filledBase, impactBps, levelsConsumed, fillPct };
|
||||
}
|
||||
|
||||
function simulateSell(bids, mid, sizeUsd) {
|
||||
if (mid == null || !(mid > 0)) {
|
||||
return { vwap: null, worstPrice: null, filledUsd: 0, filledBase: 0, impactBps: null, levelsConsumed: 0, fillPct: null };
|
||||
}
|
||||
|
||||
const baseTarget = sizeUsd / mid;
|
||||
let remainingBase = baseTarget;
|
||||
let proceedsUsd = 0;
|
||||
let filledBase = 0;
|
||||
let worstPrice = null;
|
||||
let levelsConsumed = 0;
|
||||
|
||||
for (const lvl of bids) {
|
||||
if (!(remainingBase > 0)) break;
|
||||
if (!(lvl.price > 0) || !(lvl.size > 0)) continue;
|
||||
|
||||
const takeBase = Math.min(lvl.size, remainingBase);
|
||||
if (!(takeBase > 0)) continue;
|
||||
|
||||
const proceeds = takeBase * lvl.price;
|
||||
proceedsUsd += proceeds;
|
||||
filledBase += takeBase;
|
||||
remainingBase -= takeBase;
|
||||
worstPrice = lvl.price;
|
||||
levelsConsumed += 1;
|
||||
}
|
||||
|
||||
const vwap = filledBase > 0 ? proceedsUsd / filledBase : null;
|
||||
const impactBps = vwap != null && mid > 0 ? (1 - vwap / mid) * 10_000 : null;
|
||||
const fillPct = baseTarget > 0 ? filledBase / baseTarget : null;
|
||||
|
||||
return { vwap, worstPrice, filledUsd: proceedsUsd, filledBase, impactBps, levelsConsumed, fillPct };
|
||||
}
|
||||
|
||||
async function fetchL2Latest(cfg) {
|
||||
const query = `
|
||||
query DlobL2Latest($markets: [String!]!) {
|
||||
dlob_l2_latest(where: {market_name: {_in: $markets}}) {
|
||||
market_name
|
||||
market_type
|
||||
market_index
|
||||
ts
|
||||
slot
|
||||
mark_price
|
||||
oracle_price
|
||||
best_bid_price
|
||||
best_ask_price
|
||||
bids
|
||||
asks
|
||||
updated_at
|
||||
}
|
||||
}
|
||||
`;
|
||||
const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
|
||||
return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
|
||||
}
|
||||
|
||||
async function upsertSlippage(cfg, rows) {
|
||||
if (!rows.length) return;
|
||||
const mutation = `
|
||||
mutation UpsertDlobSlippage($rows: [dlob_slippage_latest_insert_input!]!) {
|
||||
insert_dlob_slippage_latest(
|
||||
objects: $rows
|
||||
on_conflict: {
|
||||
constraint: dlob_slippage_latest_pkey
|
||||
update_columns: [
|
||||
market_type
|
||||
market_index
|
||||
ts
|
||||
slot
|
||||
mid_price
|
||||
best_bid_price
|
||||
best_ask_price
|
||||
vwap_price
|
||||
worst_price
|
||||
filled_usd
|
||||
filled_base
|
||||
impact_bps
|
||||
levels_consumed
|
||||
fill_pct
|
||||
raw
|
||||
updated_at
|
||||
]
|
||||
}
|
||||
) { affected_rows }
|
||||
}
|
||||
`;
|
||||
await graphqlRequest(cfg, mutation, { rows });
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const cfg = resolveConfig();
|
||||
const lastUpdatedAtByMarket = new Map();
|
||||
|
||||
console.log(
|
||||
JSON.stringify(
|
||||
{
|
||||
service: 'dlob-slippage-worker',
|
||||
startedAt: getIsoNow(),
|
||||
hasuraUrl: cfg.hasuraUrl,
|
||||
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
|
||||
markets: cfg.markets,
|
||||
pollMs: cfg.pollMs,
|
||||
sizesUsd: cfg.sizesUsd,
|
||||
pricePrecision: cfg.pricePrecision,
|
||||
basePrecision: cfg.basePrecision,
|
||||
},
|
||||
null,
|
||||
2
|
||||
)
|
||||
);
|
||||
|
||||
while (true) {
|
||||
const rows = [];
|
||||
|
||||
try {
|
||||
const l2Rows = await fetchL2Latest(cfg);
|
||||
for (const l2 of l2Rows) {
|
||||
const market = String(l2.market_name || '').trim();
|
||||
if (!market) continue;
|
||||
|
||||
const updatedAt = l2.updated_at || null;
|
||||
if (updatedAt && lastUpdatedAtByMarket.get(market) === updatedAt) continue;
|
||||
if (updatedAt) lastUpdatedAtByMarket.set(market, updatedAt);
|
||||
|
||||
const bestBid = toNumberOrNull(l2.best_bid_price);
|
||||
const bestAsk = toNumberOrNull(l2.best_ask_price);
|
||||
const markPrice = toNumberOrNull(l2.mark_price);
|
||||
const oraclePrice = toNumberOrNull(l2.oracle_price);
|
||||
const mid = computeMid(bestBid, bestAsk, markPrice, oraclePrice);
|
||||
|
||||
const bids = parseLevels(l2.bids, cfg.pricePrecision, cfg.basePrecision, 'bid');
|
||||
const asks = parseLevels(l2.asks, cfg.pricePrecision, cfg.basePrecision, 'ask');
|
||||
|
||||
for (const sizeUsd of cfg.sizesUsd) {
|
||||
const buy = simulateBuy(asks, mid, sizeUsd);
|
||||
rows.push({
|
||||
market_name: market,
|
||||
side: 'buy',
|
||||
size_usd: sizeUsd,
|
||||
market_type: l2.market_type ? String(l2.market_type) : 'perp',
|
||||
market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
|
||||
ts: l2.ts == null ? null : String(l2.ts),
|
||||
slot: l2.slot == null ? null : String(l2.slot),
|
||||
mid_price: numStr(mid),
|
||||
best_bid_price: numStr(bestBid),
|
||||
best_ask_price: numStr(bestAsk),
|
||||
vwap_price: numStr(buy.vwap),
|
||||
worst_price: numStr(buy.worstPrice),
|
||||
filled_usd: numStr(buy.filledUsd),
|
||||
filled_base: numStr(buy.filledBase),
|
||||
impact_bps: numStr(buy.impactBps),
|
||||
levels_consumed: buy.levelsConsumed,
|
||||
fill_pct: numStr(buy.fillPct),
|
||||
raw: {
|
||||
ref: 'mid',
|
||||
units: 'usd',
|
||||
pricePrecision: cfg.pricePrecision,
|
||||
basePrecision: cfg.basePrecision,
|
||||
},
|
||||
updated_at: updatedAt,
|
||||
});
|
||||
|
||||
const sell = simulateSell(bids, mid, sizeUsd);
|
||||
rows.push({
|
||||
market_name: market,
|
||||
side: 'sell',
|
||||
size_usd: sizeUsd,
|
||||
market_type: l2.market_type ? String(l2.market_type) : 'perp',
|
||||
market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
|
||||
ts: l2.ts == null ? null : String(l2.ts),
|
||||
slot: l2.slot == null ? null : String(l2.slot),
|
||||
mid_price: numStr(mid),
|
||||
best_bid_price: numStr(bestBid),
|
||||
best_ask_price: numStr(bestAsk),
|
||||
vwap_price: numStr(sell.vwap),
|
||||
worst_price: numStr(sell.worstPrice),
|
||||
filled_usd: numStr(sell.filledUsd),
|
||||
filled_base: numStr(sell.filledBase),
|
||||
impact_bps: numStr(sell.impactBps),
|
||||
levels_consumed: sell.levelsConsumed,
|
||||
fill_pct: numStr(sell.fillPct),
|
||||
raw: {
|
||||
ref: 'mid',
|
||||
units: 'usd',
|
||||
pricePrecision: cfg.pricePrecision,
|
||||
basePrecision: cfg.basePrecision,
|
||||
},
|
||||
updated_at: updatedAt,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`[dlob-slippage-worker] fetch/compute: ${String(err?.message || err)}`);
|
||||
}
|
||||
|
||||
try {
|
||||
await upsertSlippage(cfg, rows);
|
||||
} catch (err) {
|
||||
console.error(`[dlob-slippage-worker] upsert: ${String(err?.message || err)}`);
|
||||
}
|
||||
|
||||
await sleep(cfg.pollMs);
|
||||
}
|
||||
}
|
||||
|
||||
main().catch((err) => {
|
||||
console.error(String(err?.stack || err));
|
||||
process.exitCode = 1;
|
||||
});
|
||||
46
kustomize/base/dlob-worker/deployment.yaml
Normal file
46
kustomize/base/dlob-worker/deployment.yaml
Normal file
@@ -0,0 +1,46 @@
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: dlob-worker
|
||||
annotations:
|
||||
argocd.argoproj.io/sync-wave: "5"
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: dlob-worker
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app.kubernetes.io/name: dlob-worker
|
||||
spec:
|
||||
containers:
|
||||
- name: worker
|
||||
image: node:20-slim
|
||||
imagePullPolicy: IfNotPresent
|
||||
env:
|
||||
- name: HASURA_GRAPHQL_URL
|
||||
value: http://hasura:8080/v1/graphql
|
||||
- name: HASURA_ADMIN_SECRET
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: trade-hasura
|
||||
key: HASURA_GRAPHQL_ADMIN_SECRET
|
||||
- name: DLOB_HTTP_URL
|
||||
value: http://dlob-server:6969
|
||||
- name: DLOB_MARKETS
|
||||
value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP
|
||||
- name: DLOB_POLL_MS
|
||||
value: "500"
|
||||
- name: DLOB_DEPTH
|
||||
value: "10"
|
||||
command: ["node", "/app/worker.mjs"]
|
||||
volumeMounts:
|
||||
- name: script
|
||||
mountPath: /app/worker.mjs
|
||||
subPath: worker.mjs
|
||||
readOnly: true
|
||||
volumes:
|
||||
- name: script
|
||||
configMap:
|
||||
name: dlob-worker-script
|
||||
430
kustomize/base/dlob-worker/worker.mjs
Normal file
430
kustomize/base/dlob-worker/worker.mjs
Normal file
@@ -0,0 +1,430 @@
|
||||
import fs from 'node:fs';
|
||||
import * as http from 'node:http';
|
||||
import * as https from 'node:https';
|
||||
import process from 'node:process';
|
||||
import { setTimeout as sleep } from 'node:timers/promises';
|
||||
|
||||
function readJsonFile(filePath) {
|
||||
try {
|
||||
const raw = fs.readFileSync(filePath, 'utf8');
|
||||
return JSON.parse(raw);
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
function getIsoNow() {
|
||||
return new Date().toISOString();
|
||||
}
|
||||
|
||||
function clampInt(value, min, max, fallback) {
|
||||
const n = Number.parseInt(String(value ?? ''), 10);
|
||||
if (!Number.isInteger(n)) return fallback;
|
||||
return Math.min(max, Math.max(min, n));
|
||||
}
|
||||
|
||||
function envList(name, fallbackCsv) {
|
||||
const raw = process.env[name] ?? fallbackCsv;
|
||||
return String(raw)
|
||||
.split(',')
|
||||
.map((s) => s.trim())
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
function envBool(name, fallback = false) {
|
||||
const raw = process.env[name];
|
||||
if (raw == null) return fallback;
|
||||
const v = String(raw).trim().toLowerCase();
|
||||
if (['1', 'true', 'yes', 'y', 'on'].includes(v)) return true;
|
||||
if (['0', 'false', 'no', 'n', 'off'].includes(v)) return false;
|
||||
return fallback;
|
||||
}
|
||||
|
||||
function resolveConfig() {
|
||||
const tokensPath =
|
||||
process.env.HASURA_TOKENS_FILE ||
|
||||
process.env.TOKENS_FILE ||
|
||||
process.env.HASURA_CONFIG_FILE ||
|
||||
'/app/tokens/hasura.json';
|
||||
const tokens = readJsonFile(tokensPath) || {};
|
||||
|
||||
const hasuraUrl =
|
||||
process.env.HASURA_GRAPHQL_URL ||
|
||||
tokens.graphqlUrl ||
|
||||
tokens.apiUrl ||
|
||||
'http://hasura:8080/v1/graphql';
|
||||
const hasuraAdminSecret =
|
||||
process.env.HASURA_ADMIN_SECRET ||
|
||||
process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
|
||||
tokens.adminSecret ||
|
||||
tokens.hasuraAdminSecret;
|
||||
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
|
||||
|
||||
const dlobHttpBase = String(process.env.DLOB_HTTP_URL || process.env.DLOB_HTTP_BASE || 'https://dlob.drift.trade')
|
||||
.trim()
|
||||
.replace(/\/$/, '');
|
||||
const dlobForceIpv6 = envBool('DLOB_FORCE_IPV6', false);
|
||||
|
||||
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
|
||||
const depth = clampInt(process.env.DLOB_DEPTH, 1, 50, 10);
|
||||
const pollMs = clampInt(process.env.DLOB_POLL_MS, 100, 10_000, 500);
|
||||
|
||||
const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
|
||||
const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
|
||||
if (!Number.isFinite(pricePrecision) || pricePrecision <= 0)
|
||||
throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
|
||||
if (!Number.isFinite(basePrecision) || basePrecision <= 0)
|
||||
throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
|
||||
|
||||
return {
|
||||
hasuraUrl,
|
||||
hasuraAdminSecret,
|
||||
hasuraAuthToken,
|
||||
dlobHttpBase,
|
||||
dlobForceIpv6,
|
||||
markets,
|
||||
depth,
|
||||
pollMs,
|
||||
pricePrecision,
|
||||
basePrecision,
|
||||
};
|
||||
}
|
||||
|
||||
async function requestText(url, { timeoutMs, family } = {}) {
|
||||
const u = new URL(url);
|
||||
const client = u.protocol === 'https:' ? https : http;
|
||||
|
||||
const port = u.port ? Number.parseInt(u.port, 10) : u.protocol === 'https:' ? 443 : 80;
|
||||
if (!Number.isFinite(port)) throw new Error(`Invalid port for url: ${url}`);
|
||||
|
||||
return await new Promise((resolve, reject) => {
|
||||
const req = client.request(
|
||||
{
|
||||
protocol: u.protocol,
|
||||
hostname: u.hostname,
|
||||
port,
|
||||
path: `${u.pathname}${u.search}`,
|
||||
method: 'GET',
|
||||
family,
|
||||
servername: u.hostname,
|
||||
headers: {
|
||||
accept: 'application/json',
|
||||
},
|
||||
},
|
||||
(res) => {
|
||||
let data = '';
|
||||
res.setEncoding('utf8');
|
||||
res.on('data', (chunk) => {
|
||||
data += chunk;
|
||||
});
|
||||
res.on('end', () => {
|
||||
resolve({ status: res.statusCode ?? 0, text: data });
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
req.on('error', reject);
|
||||
req.setTimeout(timeoutMs ?? 5_000, () => {
|
||||
req.destroy(new Error(`Timeout after ${timeoutMs ?? 5_000}ms`));
|
||||
});
|
||||
req.end();
|
||||
});
|
||||
}
|
||||
|
||||
async function graphqlRequest(cfg, query, variables) {
|
||||
const headers = { 'content-type': 'application/json' };
|
||||
if (cfg.hasuraAuthToken) {
|
||||
headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
|
||||
} else if (cfg.hasuraAdminSecret) {
|
||||
headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
|
||||
} else {
|
||||
throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)');
|
||||
}
|
||||
|
||||
const res = await fetch(cfg.hasuraUrl, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body: JSON.stringify({ query, variables }),
|
||||
signal: AbortSignal.timeout(10_000),
|
||||
});
|
||||
const text = await res.text();
|
||||
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
|
||||
const json = JSON.parse(text);
|
||||
if (json.errors?.length) {
|
||||
throw new Error(json.errors.map((e) => e.message).join(' | '));
|
||||
}
|
||||
return json.data;
|
||||
}
|
||||
|
||||
function toNumberOrNull(value) {
|
||||
if (value == null) return null;
|
||||
if (typeof value === 'number') return Number.isFinite(value) ? value : null;
|
||||
if (typeof value === 'string') {
|
||||
const s = value.trim();
|
||||
if (!s) return null;
|
||||
const n = Number(s);
|
||||
return Number.isFinite(n) ? n : null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function numStr(value) {
|
||||
if (value == null) return null;
|
||||
if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null;
|
||||
if (typeof value === 'string') return value.trim() || null;
|
||||
return null;
|
||||
}
|
||||
|
||||
function parseScaled(valueRaw, scale) {
|
||||
const n = toNumberOrNull(valueRaw);
|
||||
if (n == null) return null;
|
||||
return n / scale;
|
||||
}
|
||||
|
||||
function computeStats({ l2, depth, pricePrecision, basePrecision }) {
|
||||
const bids = Array.isArray(l2?.bids) ? l2.bids : [];
|
||||
const asks = Array.isArray(l2?.asks) ? l2.asks : [];
|
||||
|
||||
const bestBid = parseScaled(l2?.bestBidPrice ?? bids?.[0]?.price, pricePrecision);
|
||||
const bestAsk = parseScaled(l2?.bestAskPrice ?? asks?.[0]?.price, pricePrecision);
|
||||
const markPrice = parseScaled(l2?.markPrice, pricePrecision);
|
||||
const oraclePrice = parseScaled(l2?.oracleData?.price ?? l2?.oracle, pricePrecision);
|
||||
|
||||
const mid = bestBid != null && bestAsk != null ? (bestBid + bestAsk) / 2 : null;
|
||||
const spreadAbs = bestBid != null && bestAsk != null ? bestAsk - bestBid : null;
|
||||
const spreadBps = spreadAbs != null && mid != null && mid > 0 ? (spreadAbs / mid) * 10_000 : null;
|
||||
|
||||
const levels = Math.max(1, depth);
|
||||
let bidBase = 0;
|
||||
let askBase = 0;
|
||||
let bidUsd = 0;
|
||||
let askUsd = 0;
|
||||
|
||||
for (let i = 0; i < Math.min(levels, bids.length); i += 1) {
|
||||
const p = parseScaled(bids[i]?.price, pricePrecision);
|
||||
const s = toNumberOrNull(bids[i]?.size);
|
||||
if (p == null || s == null) continue;
|
||||
const base = s / basePrecision;
|
||||
bidBase += base;
|
||||
bidUsd += base * p;
|
||||
}
|
||||
|
||||
for (let i = 0; i < Math.min(levels, asks.length); i += 1) {
|
||||
const p = parseScaled(asks[i]?.price, pricePrecision);
|
||||
const s = toNumberOrNull(asks[i]?.size);
|
||||
if (p == null || s == null) continue;
|
||||
const base = s / basePrecision;
|
||||
askBase += base;
|
||||
askUsd += base * p;
|
||||
}
|
||||
|
||||
const denom = bidUsd + askUsd;
|
||||
const imbalance = denom > 0 ? (bidUsd - askUsd) / denom : null;
|
||||
|
||||
return {
|
||||
bestBid,
|
||||
bestAsk,
|
||||
mid,
|
||||
spreadAbs,
|
||||
spreadBps,
|
||||
markPrice,
|
||||
oraclePrice,
|
||||
depthLevels: levels,
|
||||
bidBase,
|
||||
askBase,
|
||||
bidUsd,
|
||||
askUsd,
|
||||
imbalance,
|
||||
};
|
||||
}
|
||||
|
||||
function l2ToInsertObject({ l2, updatedAt, pricePrecision }) {
|
||||
return {
|
||||
market_name: String(l2.marketName),
|
||||
market_type: String(l2.marketType || 'perp'),
|
||||
market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null,
|
||||
ts: l2.ts == null ? null : String(l2.ts),
|
||||
slot: l2.slot == null ? null : String(l2.slot),
|
||||
mark_price: numStr(parseScaled(l2.markPrice, pricePrecision)),
|
||||
oracle_price: numStr(parseScaled(l2.oracleData?.price ?? l2.oracle, pricePrecision)),
|
||||
best_bid_price: numStr(parseScaled(l2.bestBidPrice, pricePrecision)),
|
||||
best_ask_price: numStr(parseScaled(l2.bestAskPrice, pricePrecision)),
|
||||
bids: l2.bids ?? null,
|
||||
asks: l2.asks ?? null,
|
||||
raw: l2 ?? null,
|
||||
updated_at: updatedAt,
|
||||
};
|
||||
}
|
||||
|
||||
function statsToInsertObject({ l2, stats, updatedAt }) {
|
||||
return {
|
||||
market_name: String(l2.marketName),
|
||||
market_type: String(l2.marketType || 'perp'),
|
||||
market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null,
|
||||
ts: l2.ts == null ? null : String(l2.ts),
|
||||
slot: l2.slot == null ? null : String(l2.slot),
|
||||
mark_price: stats.markPrice == null ? null : String(stats.markPrice),
|
||||
oracle_price: stats.oraclePrice == null ? null : String(stats.oraclePrice),
|
||||
best_bid_price: stats.bestBid == null ? null : String(stats.bestBid),
|
||||
best_ask_price: stats.bestAsk == null ? null : String(stats.bestAsk),
|
||||
mid_price: stats.mid == null ? null : String(stats.mid),
|
||||
spread_abs: stats.spreadAbs == null ? null : String(stats.spreadAbs),
|
||||
spread_bps: stats.spreadBps == null ? null : String(stats.spreadBps),
|
||||
depth_levels: stats.depthLevels,
|
||||
depth_bid_base: Number.isFinite(stats.bidBase) ? String(stats.bidBase) : null,
|
||||
depth_ask_base: Number.isFinite(stats.askBase) ? String(stats.askBase) : null,
|
||||
depth_bid_usd: Number.isFinite(stats.bidUsd) ? String(stats.bidUsd) : null,
|
||||
depth_ask_usd: Number.isFinite(stats.askUsd) ? String(stats.askUsd) : null,
|
||||
imbalance: stats.imbalance == null ? null : String(stats.imbalance),
|
||||
raw: {
|
||||
spreadPct: l2.spreadPct ?? null,
|
||||
spreadQuote: l2.spreadQuote ?? null,
|
||||
},
|
||||
updated_at: updatedAt,
|
||||
};
|
||||
}
|
||||
|
||||
async function fetchL2(cfg, marketName) {
|
||||
const u = new URL(`${cfg.dlobHttpBase}/l2`);
|
||||
u.searchParams.set('marketName', marketName);
|
||||
u.searchParams.set('depth', String(cfg.depth));
|
||||
|
||||
const url = u.toString();
|
||||
if (cfg.dlobForceIpv6) {
|
||||
const { status, text } = await requestText(url, { timeoutMs: 5_000, family: 6 });
|
||||
if (status < 200 || status >= 300) throw new Error(`DLOB HTTP ${status}: ${text}`);
|
||||
return JSON.parse(text);
|
||||
}
|
||||
|
||||
const res = await fetch(url, { signal: AbortSignal.timeout(5_000) });
|
||||
const text = await res.text();
|
||||
if (!res.ok) throw new Error(`DLOB HTTP ${res.status}: ${text}`);
|
||||
return JSON.parse(text);
|
||||
}
|
||||
|
||||
async function upsertBatch(cfg, l2Objects, statsObjects) {
|
||||
if (!l2Objects.length && !statsObjects.length) return;
|
||||
|
||||
const mutation = `
|
||||
mutation UpsertDlob($l2: [dlob_l2_latest_insert_input!]!, $stats: [dlob_stats_latest_insert_input!]!) {
|
||||
insert_dlob_l2_latest(
|
||||
objects: $l2
|
||||
on_conflict: {
|
||||
constraint: dlob_l2_latest_pkey
|
||||
update_columns: [
|
||||
market_type
|
||||
market_index
|
||||
ts
|
||||
slot
|
||||
mark_price
|
||||
oracle_price
|
||||
best_bid_price
|
||||
best_ask_price
|
||||
bids
|
||||
asks
|
||||
raw
|
||||
updated_at
|
||||
]
|
||||
}
|
||||
) { affected_rows }
|
||||
insert_dlob_stats_latest(
|
||||
objects: $stats
|
||||
on_conflict: {
|
||||
constraint: dlob_stats_latest_pkey
|
||||
update_columns: [
|
||||
market_type
|
||||
market_index
|
||||
ts
|
||||
slot
|
||||
mark_price
|
||||
oracle_price
|
||||
best_bid_price
|
||||
best_ask_price
|
||||
mid_price
|
||||
spread_abs
|
||||
spread_bps
|
||||
depth_levels
|
||||
depth_bid_base
|
||||
depth_ask_base
|
||||
depth_bid_usd
|
||||
depth_ask_usd
|
||||
imbalance
|
||||
raw
|
||||
updated_at
|
||||
]
|
||||
}
|
||||
) { affected_rows }
|
||||
}
|
||||
`;
|
||||
|
||||
await graphqlRequest(cfg, mutation, { l2: l2Objects, stats: statsObjects });
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const cfg = resolveConfig();
|
||||
const lastTsByMarket = new Map();
|
||||
|
||||
console.log(
|
||||
JSON.stringify(
|
||||
{
|
||||
service: 'dlob-worker',
|
||||
startedAt: getIsoNow(),
|
||||
hasuraUrl: cfg.hasuraUrl,
|
||||
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
|
||||
dlobHttpBase: cfg.dlobHttpBase,
|
||||
dlobForceIpv6: cfg.dlobForceIpv6,
|
||||
markets: cfg.markets,
|
||||
depth: cfg.depth,
|
||||
pollMs: cfg.pollMs,
|
||||
},
|
||||
null,
|
||||
2
|
||||
)
|
||||
);
|
||||
|
||||
while (true) {
|
||||
const updatedAt = getIsoNow();
|
||||
|
||||
const results = await Promise.allSettled(cfg.markets.map((m) => fetchL2(cfg, m)));
|
||||
const l2Objects = [];
|
||||
const statsObjects = [];
|
||||
|
||||
for (let i = 0; i < results.length; i += 1) {
|
||||
const market = cfg.markets[i];
|
||||
const r = results[i];
|
||||
if (r.status !== 'fulfilled') {
|
||||
console.error(`[dlob-worker] fetch ${market}: ${String(r.reason?.message || r.reason)}`);
|
||||
continue;
|
||||
}
|
||||
const l2 = r.value;
|
||||
if (!l2?.marketName) continue;
|
||||
|
||||
const ts = l2.ts == null ? null : String(l2.ts);
|
||||
if (ts != null && lastTsByMarket.get(l2.marketName) === ts) continue;
|
||||
if (ts != null) lastTsByMarket.set(l2.marketName, ts);
|
||||
|
||||
const stats = computeStats({
|
||||
l2,
|
||||
depth: cfg.depth,
|
||||
pricePrecision: cfg.pricePrecision,
|
||||
basePrecision: cfg.basePrecision,
|
||||
});
|
||||
|
||||
l2Objects.push(l2ToInsertObject({ l2, updatedAt, pricePrecision: cfg.pricePrecision }));
|
||||
statsObjects.push(statsToInsertObject({ l2, stats, updatedAt }));
|
||||
}
|
||||
|
||||
try {
|
||||
await upsertBatch(cfg, l2Objects, statsObjects);
|
||||
} catch (err) {
|
||||
console.error(`[dlob-worker] upsert: ${String(err?.message || err)}`);
|
||||
}
|
||||
|
||||
await sleep(cfg.pollMs);
|
||||
}
|
||||
}
|
||||
|
||||
main().catch((err) => {
|
||||
console.error(String(err?.stack || err));
|
||||
process.exitCode = 1;
|
||||
});
|
||||
69
kustomize/base/dlob/publisher-deployment.yaml
Normal file
69
kustomize/base/dlob/publisher-deployment.yaml
Normal file
@@ -0,0 +1,69 @@
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: dlob-publisher
|
||||
annotations:
|
||||
argocd.argoproj.io/sync-wave: "4"
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: dlob-publisher
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app.kubernetes.io/name: dlob-publisher
|
||||
spec:
|
||||
imagePullSecrets:
|
||||
- name: gitea-registry
|
||||
containers:
|
||||
- name: publisher
|
||||
image: gitea.mpabi.pl/trade/trade-dlob-server:sha-8a378b7-lite-l2
|
||||
imagePullPolicy: IfNotPresent
|
||||
ports:
|
||||
- name: http
|
||||
containerPort: 8080
|
||||
env:
|
||||
- name: RUNNING_LOCAL
|
||||
value: "true"
|
||||
- name: LOCAL_CACHE
|
||||
value: "true"
|
||||
- name: ENV
|
||||
value: mainnet-beta
|
||||
- name: USE_WEBSOCKET
|
||||
value: "true"
|
||||
- name: USE_ORDER_SUBSCRIBER
|
||||
value: "true"
|
||||
- name: DISABLE_GPA_REFRESH
|
||||
value: "true"
|
||||
- name: ELASTICACHE_HOST
|
||||
value: dlob-redis
|
||||
- name: ELASTICACHE_PORT
|
||||
value: "6379"
|
||||
- name: REDIS_CLIENT
|
||||
value: DLOB
|
||||
- name: PERP_MARKETS_TO_LOAD
|
||||
value: "0,1,2,4,75"
|
||||
- name: ENDPOINT
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: trade-dlob-rpc
|
||||
key: ENDPOINT
|
||||
- name: WS_ENDPOINT
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: trade-dlob-rpc
|
||||
key: WS_ENDPOINT
|
||||
command: ["node", "/lib/publishers/dlobPublisher.js"]
|
||||
readinessProbe:
|
||||
httpGet:
|
||||
path: /startup
|
||||
port: http
|
||||
initialDelaySeconds: 10
|
||||
periodSeconds: 10
|
||||
livenessProbe:
|
||||
httpGet:
|
||||
path: /health
|
||||
port: http
|
||||
initialDelaySeconds: 30
|
||||
periodSeconds: 20
|
||||
79
kustomize/base/dlob/redis.yaml
Normal file
79
kustomize/base/dlob/redis.yaml
Normal file
@@ -0,0 +1,79 @@
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: dlob-redis
|
||||
annotations:
|
||||
argocd.argoproj.io/sync-wave: "3"
|
||||
spec:
|
||||
selector:
|
||||
app.kubernetes.io/name: dlob-redis
|
||||
ports:
|
||||
- name: redis
|
||||
port: 6379
|
||||
targetPort: redis
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: dlob-redis
|
||||
annotations:
|
||||
argocd.argoproj.io/sync-wave: "3"
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: dlob-redis
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app.kubernetes.io/name: dlob-redis
|
||||
spec:
|
||||
containers:
|
||||
- name: redis
|
||||
image: redis:7-alpine
|
||||
imagePullPolicy: IfNotPresent
|
||||
env:
|
||||
- name: POD_IP
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: status.podIP
|
||||
ports:
|
||||
- name: redis
|
||||
containerPort: 6379
|
||||
# DLOB redis client uses ioredis Cluster when RUNNING_LOCAL=true and LOCAL_CACHE=true.
|
||||
# We run a single-node Redis Cluster (no TLS) and assign all slots on startup.
|
||||
command: ["/bin/sh", "-lc"]
|
||||
args:
|
||||
- |
|
||||
exec redis-server \
|
||||
--save "" \
|
||||
--appendonly no \
|
||||
--protected-mode no \
|
||||
--bind 0.0.0.0 \
|
||||
--cluster-enabled yes \
|
||||
--cluster-config-file /data/nodes.conf \
|
||||
--cluster-node-timeout 5000 \
|
||||
--cluster-require-full-coverage no \
|
||||
--cluster-announce-ip "${POD_IP}" \
|
||||
--cluster-announce-port 6379 \
|
||||
--cluster-announce-bus-port 16379
|
||||
lifecycle:
|
||||
postStart:
|
||||
exec:
|
||||
command:
|
||||
- /bin/sh
|
||||
- -lc
|
||||
- |
|
||||
set -e
|
||||
for i in $(seq 1 60); do
|
||||
redis-cli -h 127.0.0.1 -p 6379 ping >/dev/null 2>&1 && break
|
||||
sleep 1
|
||||
done
|
||||
|
||||
# If cluster is already initialized, do nothing.
|
||||
if redis-cli -h 127.0.0.1 -p 6379 cluster info 2>/dev/null | grep -q 'cluster_slots_assigned:16384'; then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# Redis 7+ supports CLUSTER ADDSLOTSRANGE.
|
||||
redis-cli -h 127.0.0.1 -p 6379 cluster addslotsrange 0 16383 || true
|
||||
63
kustomize/base/dlob/server-deployment.yaml
Normal file
63
kustomize/base/dlob/server-deployment.yaml
Normal file
@@ -0,0 +1,63 @@
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: dlob-server
|
||||
annotations:
|
||||
argocd.argoproj.io/sync-wave: "4"
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: dlob-server
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app.kubernetes.io/name: dlob-server
|
||||
spec:
|
||||
imagePullSecrets:
|
||||
- name: gitea-registry
|
||||
containers:
|
||||
- name: server
|
||||
image: gitea.mpabi.pl/trade/trade-dlob-server:sha-8a378b7-lite-l2
|
||||
imagePullPolicy: IfNotPresent
|
||||
ports:
|
||||
- name: http
|
||||
containerPort: 6969
|
||||
env:
|
||||
- name: RUNNING_LOCAL
|
||||
value: "true"
|
||||
- name: LOCAL_CACHE
|
||||
value: "true"
|
||||
- name: ENV
|
||||
value: mainnet-beta
|
||||
- name: PORT
|
||||
value: "6969"
|
||||
- name: ELASTICACHE_HOST
|
||||
value: dlob-redis
|
||||
- name: ELASTICACHE_PORT
|
||||
value: "6379"
|
||||
- name: REDIS_CLIENT
|
||||
value: DLOB
|
||||
- name: ENDPOINT
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: trade-dlob-rpc
|
||||
key: ENDPOINT
|
||||
- name: WS_ENDPOINT
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: trade-dlob-rpc
|
||||
key: WS_ENDPOINT
|
||||
command: ["node", "/lib/serverLite.js"]
|
||||
readinessProbe:
|
||||
httpGet:
|
||||
path: /startup
|
||||
port: http
|
||||
initialDelaySeconds: 5
|
||||
periodSeconds: 10
|
||||
livenessProbe:
|
||||
httpGet:
|
||||
path: /health
|
||||
port: http
|
||||
initialDelaySeconds: 20
|
||||
periodSeconds: 20
|
||||
13
kustomize/base/dlob/server-service.yaml
Normal file
13
kustomize/base/dlob/server-service.yaml
Normal file
@@ -0,0 +1,13 @@
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: dlob-server
|
||||
annotations:
|
||||
argocd.argoproj.io/sync-wave: "4"
|
||||
spec:
|
||||
selector:
|
||||
app.kubernetes.io/name: dlob-server
|
||||
ports:
|
||||
- name: http
|
||||
port: 6969
|
||||
targetPort: http
|
||||
@@ -16,7 +16,7 @@ spec:
|
||||
- name: gitea-registry
|
||||
containers:
|
||||
- name: frontend
|
||||
image: rv32i.pl/trade/trade-frontend:sha-6107c4e
|
||||
image: gitea.mpabi.pl/trade/trade-frontend:sha-ca9e44a
|
||||
imagePullPolicy: IfNotPresent
|
||||
ports:
|
||||
- name: http
|
||||
|
||||
@@ -37,6 +37,8 @@ spec:
|
||||
value: "false"
|
||||
- name: HASURA_GRAPHQL_CORS_DOMAIN
|
||||
value: "http://localhost:5173,http://127.0.0.1:5173"
|
||||
- name: HASURA_GRAPHQL_UNAUTHORIZED_ROLE
|
||||
value: "public"
|
||||
readinessProbe:
|
||||
httpGet:
|
||||
path: /healthz
|
||||
|
||||
@@ -51,7 +51,7 @@ function isAlreadyExistsError(errText) {
|
||||
|
||||
function isMissingRelationError(errText) {
|
||||
const t = String(errText || '').toLowerCase();
|
||||
return t.includes('does not exist') || t.includes('not found') || t.includes('not tracked');
|
||||
return t.includes('does not exist') || t.includes('not found') || t.includes('not tracked') || t.includes('already untracked');
|
||||
}
|
||||
|
||||
function normalizeName(name) {
|
||||
@@ -93,6 +93,10 @@ async function main() {
|
||||
const source = 'default';
|
||||
|
||||
const baseTicks = { schema: 'public', name: 'drift_ticks' };
|
||||
const dlobL2LatestTable = { schema: 'public', name: 'dlob_l2_latest' };
|
||||
const dlobStatsLatestTable = { schema: 'public', name: 'dlob_stats_latest' };
|
||||
const dlobDepthBpsLatestTable = { schema: 'public', name: 'dlob_depth_bps_latest' };
|
||||
const dlobSlippageLatestTable = { schema: 'public', name: 'dlob_slippage_latest' };
|
||||
const baseCandlesFn = { schema: 'public', name: 'get_drift_candles' };
|
||||
const candlesReturnTable = { schema: 'public', name: 'drift_candles' };
|
||||
|
||||
@@ -161,8 +165,139 @@ async function main() {
|
||||
for (const t of tickTables) {
|
||||
await ensureTickTable(t);
|
||||
}
|
||||
|
||||
const ensureDlobTable = async (table, columns) => {
|
||||
await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } });
|
||||
await metadata({ type: 'pg_track_table', args: { source, table } });
|
||||
|
||||
await metadataIgnore({ type: 'pg_drop_select_permission', args: { source, table, role: 'public' } });
|
||||
await metadata({
|
||||
type: 'pg_create_select_permission',
|
||||
args: {
|
||||
source,
|
||||
table,
|
||||
role: 'public',
|
||||
permission: {
|
||||
columns,
|
||||
filter: {},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await metadataIgnore({ type: 'pg_drop_insert_permission', args: { source, table, role: 'ingestor' } });
|
||||
await metadata({
|
||||
type: 'pg_create_insert_permission',
|
||||
args: {
|
||||
source,
|
||||
table,
|
||||
role: 'ingestor',
|
||||
permission: {
|
||||
check: {},
|
||||
set: {},
|
||||
columns,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await metadataIgnore({ type: 'pg_drop_update_permission', args: { source, table, role: 'ingestor' } });
|
||||
await metadata({
|
||||
type: 'pg_create_update_permission',
|
||||
args: {
|
||||
source,
|
||||
table,
|
||||
role: 'ingestor',
|
||||
permission: {
|
||||
filter: {},
|
||||
check: {},
|
||||
columns,
|
||||
},
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
await ensureDlobTable(dlobL2LatestTable, [
|
||||
'market_name',
|
||||
'market_type',
|
||||
'market_index',
|
||||
'ts',
|
||||
'slot',
|
||||
'mark_price',
|
||||
'oracle_price',
|
||||
'best_bid_price',
|
||||
'best_ask_price',
|
||||
'bids',
|
||||
'asks',
|
||||
'raw',
|
||||
'updated_at',
|
||||
]);
|
||||
|
||||
await ensureDlobTable(dlobStatsLatestTable, [
|
||||
'market_name',
|
||||
'market_type',
|
||||
'market_index',
|
||||
'ts',
|
||||
'slot',
|
||||
'mark_price',
|
||||
'oracle_price',
|
||||
'best_bid_price',
|
||||
'best_ask_price',
|
||||
'mid_price',
|
||||
'spread_abs',
|
||||
'spread_bps',
|
||||
'depth_levels',
|
||||
'depth_bid_base',
|
||||
'depth_ask_base',
|
||||
'depth_bid_usd',
|
||||
'depth_ask_usd',
|
||||
'imbalance',
|
||||
'raw',
|
||||
'updated_at',
|
||||
]);
|
||||
|
||||
await ensureDlobTable(dlobDepthBpsLatestTable, [
|
||||
'market_name',
|
||||
'band_bps',
|
||||
'market_type',
|
||||
'market_index',
|
||||
'ts',
|
||||
'slot',
|
||||
'mid_price',
|
||||
'best_bid_price',
|
||||
'best_ask_price',
|
||||
'bid_base',
|
||||
'ask_base',
|
||||
'bid_usd',
|
||||
'ask_usd',
|
||||
'imbalance',
|
||||
'raw',
|
||||
'updated_at',
|
||||
]);
|
||||
|
||||
await ensureDlobTable(dlobSlippageLatestTable, [
|
||||
'market_name',
|
||||
'side',
|
||||
'size_usd',
|
||||
'market_type',
|
||||
'market_index',
|
||||
'ts',
|
||||
'slot',
|
||||
'mid_price',
|
||||
'best_bid_price',
|
||||
'best_ask_price',
|
||||
'vwap_price',
|
||||
'worst_price',
|
||||
'filled_usd',
|
||||
'filled_base',
|
||||
'impact_bps',
|
||||
'levels_consumed',
|
||||
'fill_pct',
|
||||
'raw',
|
||||
'updated_at',
|
||||
]);
|
||||
|
||||
// Return table type for candle functions (needed for Hasura to track the function).
|
||||
await metadataIgnore({ type: 'pg_track_table', args: { source, table: candlesReturnTable } });
|
||||
|
||||
try {
|
||||
await metadata({ type: 'pg_track_table', args: { source, table: apiTokensTable } });
|
||||
} catch (err) {
|
||||
|
||||
@@ -3,7 +3,9 @@ kind: Job
|
||||
metadata:
|
||||
name: hasura-bootstrap
|
||||
annotations:
|
||||
argocd.argoproj.io/sync-wave: "2"
|
||||
argocd.argoproj.io/hook: Sync
|
||||
argocd.argoproj.io/hook-delete-policy: BeforeHookCreation,HookSucceeded
|
||||
argocd.argoproj.io/sync-wave: "3"
|
||||
spec:
|
||||
backoffLimit: 5
|
||||
template:
|
||||
|
||||
@@ -16,7 +16,7 @@ spec:
|
||||
- name: gitea-registry
|
||||
containers:
|
||||
- name: ingestor
|
||||
image: rv32i.pl/trade/trade-ingestor:k3s-20260106013603
|
||||
image: gitea.mpabi.pl/trade/trade-ingestor:k3s-20260106013603
|
||||
imagePullPolicy: IfNotPresent
|
||||
env:
|
||||
- name: MARKET_NAME
|
||||
|
||||
@@ -163,3 +163,111 @@ AS $$
|
||||
ORDER BY bucket DESC
|
||||
LIMIT p_limit;
|
||||
$$;
|
||||
|
||||
-- Latest DLOB orderbook snapshots (top-N levels), per market.
|
||||
-- Filled by a VPS worker (collector) and consumed by the UI via Hasura subscriptions.
|
||||
CREATE TABLE IF NOT EXISTS public.dlob_l2_latest (
|
||||
market_name TEXT PRIMARY KEY,
|
||||
market_type TEXT NOT NULL DEFAULT 'perp',
|
||||
market_index INTEGER,
|
||||
ts BIGINT,
|
||||
slot BIGINT,
|
||||
mark_price NUMERIC,
|
||||
oracle_price NUMERIC,
|
||||
best_bid_price NUMERIC,
|
||||
best_ask_price NUMERIC,
|
||||
bids JSONB,
|
||||
asks JSONB,
|
||||
raw JSONB,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS dlob_l2_latest_updated_at_idx
|
||||
ON public.dlob_l2_latest (updated_at DESC);
|
||||
|
||||
-- Derived stats for fast UI display.
|
||||
CREATE TABLE IF NOT EXISTS public.dlob_stats_latest (
|
||||
market_name TEXT PRIMARY KEY,
|
||||
market_type TEXT NOT NULL DEFAULT 'perp',
|
||||
market_index INTEGER,
|
||||
ts BIGINT,
|
||||
slot BIGINT,
|
||||
mark_price NUMERIC,
|
||||
oracle_price NUMERIC,
|
||||
best_bid_price NUMERIC,
|
||||
best_ask_price NUMERIC,
|
||||
mid_price NUMERIC,
|
||||
spread_abs NUMERIC,
|
||||
spread_bps NUMERIC,
|
||||
depth_levels INTEGER,
|
||||
depth_bid_base NUMERIC,
|
||||
depth_ask_base NUMERIC,
|
||||
depth_bid_usd NUMERIC,
|
||||
depth_ask_usd NUMERIC,
|
||||
imbalance NUMERIC,
|
||||
raw JSONB,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS dlob_stats_latest_updated_at_idx
|
||||
ON public.dlob_stats_latest (updated_at DESC);
|
||||
|
||||
-- Depth snapshots within bps bands around mid-price (per market, per band).
|
||||
-- Filled by a derived worker that reads `dlob_l2_latest`.
|
||||
CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_latest (
|
||||
market_name TEXT NOT NULL,
|
||||
band_bps INTEGER NOT NULL,
|
||||
market_type TEXT NOT NULL DEFAULT 'perp',
|
||||
market_index INTEGER,
|
||||
ts BIGINT,
|
||||
slot BIGINT,
|
||||
mid_price NUMERIC,
|
||||
best_bid_price NUMERIC,
|
||||
best_ask_price NUMERIC,
|
||||
bid_base NUMERIC,
|
||||
ask_base NUMERIC,
|
||||
bid_usd NUMERIC,
|
||||
ask_usd NUMERIC,
|
||||
imbalance NUMERIC,
|
||||
raw JSONB,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (market_name, band_bps)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_updated_at_idx
|
||||
ON public.dlob_depth_bps_latest (updated_at DESC);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_market_name_idx
|
||||
ON public.dlob_depth_bps_latest (market_name);
|
||||
|
||||
-- Slippage/impact estimates for "market" orders at common USD sizes.
|
||||
-- Filled by a derived worker that reads `dlob_l2_latest`.
|
||||
CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest (
|
||||
market_name TEXT NOT NULL,
|
||||
side TEXT NOT NULL,
|
||||
size_usd INTEGER NOT NULL,
|
||||
market_type TEXT NOT NULL DEFAULT 'perp',
|
||||
market_index INTEGER,
|
||||
ts BIGINT,
|
||||
slot BIGINT,
|
||||
mid_price NUMERIC,
|
||||
best_bid_price NUMERIC,
|
||||
best_ask_price NUMERIC,
|
||||
vwap_price NUMERIC,
|
||||
worst_price NUMERIC,
|
||||
filled_usd NUMERIC,
|
||||
filled_base NUMERIC,
|
||||
impact_bps NUMERIC,
|
||||
levels_consumed INTEGER,
|
||||
fill_pct NUMERIC,
|
||||
raw JSONB,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (market_name, side, size_usd),
|
||||
CONSTRAINT dlob_slippage_latest_side_chk CHECK (side IN ('buy', 'sell'))
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_updated_at_idx
|
||||
ON public.dlob_slippage_latest (updated_at DESC);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_market_name_idx
|
||||
ON public.dlob_slippage_latest (market_name);
|
||||
|
||||
@@ -5,6 +5,7 @@ resources:
|
||||
- cm-trade-deploy.yaml
|
||||
- postgres/service.yaml
|
||||
- postgres/statefulset.yaml
|
||||
- postgres/job-migrate.yaml
|
||||
- hasura/service.yaml
|
||||
- hasura/deployment.yaml
|
||||
- hasura/job-bootstrap.yaml
|
||||
@@ -13,6 +14,13 @@ resources:
|
||||
- ingestor/deployment.yaml
|
||||
- frontend/service.yaml
|
||||
- frontend/deployment.yaml
|
||||
- dlob/redis.yaml
|
||||
- dlob/publisher-deployment.yaml
|
||||
- dlob/server-service.yaml
|
||||
- dlob/server-deployment.yaml
|
||||
- dlob-worker/deployment.yaml
|
||||
- dlob-depth-worker/deployment.yaml
|
||||
- dlob-slippage-worker/deployment.yaml
|
||||
|
||||
configMapGenerator:
|
||||
- name: postgres-initdb
|
||||
@@ -21,6 +29,15 @@ configMapGenerator:
|
||||
- name: hasura-bootstrap-script
|
||||
files:
|
||||
- hasura/hasura-bootstrap.mjs
|
||||
- name: dlob-worker-script
|
||||
files:
|
||||
- dlob-worker/worker.mjs
|
||||
- name: dlob-depth-worker-script
|
||||
files:
|
||||
- dlob-depth-worker/worker.mjs
|
||||
- name: dlob-slippage-worker-script
|
||||
files:
|
||||
- dlob-slippage-worker/worker.mjs
|
||||
|
||||
generatorOptions:
|
||||
disableNameSuffixHash: true
|
||||
|
||||
36
kustomize/base/postgres/job-migrate.yaml
Normal file
36
kustomize/base/postgres/job-migrate.yaml
Normal file
@@ -0,0 +1,36 @@
|
||||
apiVersion: batch/v1
|
||||
kind: Job
|
||||
metadata:
|
||||
name: postgres-migrate
|
||||
annotations:
|
||||
argocd.argoproj.io/hook: Sync
|
||||
argocd.argoproj.io/hook-delete-policy: BeforeHookCreation,HookSucceeded
|
||||
argocd.argoproj.io/sync-wave: "2"
|
||||
spec:
|
||||
backoffLimit: 3
|
||||
template:
|
||||
spec:
|
||||
restartPolicy: OnFailure
|
||||
containers:
|
||||
- name: postgres-migrate
|
||||
image: postgres:16
|
||||
imagePullPolicy: IfNotPresent
|
||||
envFrom:
|
||||
- secretRef:
|
||||
name: trade-postgres
|
||||
command:
|
||||
- sh
|
||||
- -ec
|
||||
- |
|
||||
export PGPASSWORD="$POSTGRES_PASSWORD"
|
||||
until pg_isready -h postgres -U "$POSTGRES_USER" -d "$POSTGRES_DB"; do sleep 1; done
|
||||
psql -h postgres -U "$POSTGRES_USER" -d "$POSTGRES_DB" -v ON_ERROR_STOP=1 -f /migrations/001_init.sql
|
||||
volumeMounts:
|
||||
- name: migrations
|
||||
mountPath: /migrations/001_init.sql
|
||||
subPath: 001_init.sql
|
||||
readOnly: true
|
||||
volumes:
|
||||
- name: migrations
|
||||
configMap:
|
||||
name: postgres-initdb
|
||||
219
kustomize/overlays/staging/dlob-ingestor.mjs
Normal file
219
kustomize/overlays/staging/dlob-ingestor.mjs
Normal file
@@ -0,0 +1,219 @@
|
||||
import process from 'node:process';
|
||||
import { setTimeout as sleep } from 'node:timers/promises';
|
||||
|
||||
function getIsoNow() {
|
||||
return new Date().toISOString();
|
||||
}
|
||||
|
||||
function envString(name, fallback) {
|
||||
const v = process.env[name];
|
||||
if (v == null) return fallback;
|
||||
const s = String(v).trim();
|
||||
return s ? s : fallback;
|
||||
}
|
||||
|
||||
function envInt(name, fallback, { min, max } = {}) {
|
||||
const v = process.env[name];
|
||||
if (v == null) return fallback;
|
||||
const n = Number.parseInt(String(v), 10);
|
||||
if (!Number.isFinite(n)) return fallback;
|
||||
const low = typeof min === 'number' ? min : n;
|
||||
const high = typeof max === 'number' ? max : n;
|
||||
return Math.max(low, Math.min(high, n));
|
||||
}
|
||||
|
||||
function envList(name, fallbackCsv) {
|
||||
const raw = process.env[name] ?? fallbackCsv;
|
||||
return String(raw)
|
||||
.split(',')
|
||||
.map((s) => s.trim())
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
function toIntOrNull(v) {
|
||||
if (v == null) return null;
|
||||
if (typeof v === 'number') return Number.isFinite(v) ? Math.trunc(v) : null;
|
||||
if (typeof v === 'string') {
|
||||
const s = v.trim();
|
||||
if (!s) return null;
|
||||
const n = Number.parseInt(s, 10);
|
||||
return Number.isFinite(n) ? n : null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function numStr(v) {
|
||||
if (v == null) return null;
|
||||
if (typeof v === 'number') return Number.isFinite(v) ? String(v) : null;
|
||||
if (typeof v === 'string') {
|
||||
const s = v.trim();
|
||||
return s ? s : null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function isoFromEpochMs(v) {
|
||||
const n = typeof v === 'number' ? v : typeof v === 'string' ? Number(v.trim()) : NaN;
|
||||
if (!Number.isFinite(n) || n <= 0) return null;
|
||||
const d = new Date(n);
|
||||
const ms = d.getTime();
|
||||
if (!Number.isFinite(ms)) return null;
|
||||
return d.toISOString();
|
||||
}
|
||||
|
||||
function resolveConfig() {
|
||||
const hasuraUrl = envString('HASURA_GRAPHQL_URL', 'http://hasura:8080/v1/graphql');
|
||||
const hasuraAdminSecret = envString('HASURA_ADMIN_SECRET', '');
|
||||
if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET');
|
||||
|
||||
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
|
||||
const pollMs = envInt('TICKS_POLL_MS', 1000, { min: 250, max: 60_000 });
|
||||
const source = envString('TICKS_SOURCE', 'dlob_stats');
|
||||
|
||||
return { hasuraUrl, hasuraAdminSecret, markets, pollMs, source };
|
||||
}
|
||||
|
||||
async function graphqlRequest(cfg, query, variables) {
|
||||
const res = await fetch(cfg.hasuraUrl, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'content-type': 'application/json',
|
||||
'x-hasura-admin-secret': cfg.hasuraAdminSecret,
|
||||
},
|
||||
body: JSON.stringify({ query, variables }),
|
||||
signal: AbortSignal.timeout(10_000),
|
||||
});
|
||||
|
||||
const text = await res.text();
|
||||
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
|
||||
|
||||
let json;
|
||||
try {
|
||||
json = JSON.parse(text);
|
||||
} catch {
|
||||
throw new Error(`Hasura: invalid json: ${text}`);
|
||||
}
|
||||
|
||||
if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | '));
|
||||
return json.data;
|
||||
}
|
||||
|
||||
async function fetchStats(cfg) {
|
||||
const query = `
|
||||
query DlobStatsLatest($markets: [String!]!) {
|
||||
dlob_stats_latest(where: { market_name: { _in: $markets } }) {
|
||||
market_name
|
||||
market_index
|
||||
ts
|
||||
slot
|
||||
oracle_price
|
||||
mark_price
|
||||
mid_price
|
||||
best_bid_price
|
||||
best_ask_price
|
||||
updated_at
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
||||
const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
|
||||
const rows = Array.isArray(data?.dlob_stats_latest) ? data.dlob_stats_latest : [];
|
||||
return rows;
|
||||
}
|
||||
|
||||
async function insertTicks(cfg, objects) {
|
||||
if (!objects.length) return 0;
|
||||
|
||||
const mutation = `
|
||||
mutation InsertTicks($objects: [drift_ticks_insert_input!]!) {
|
||||
insert_drift_ticks(objects: $objects) { affected_rows }
|
||||
}
|
||||
`;
|
||||
|
||||
const data = await graphqlRequest(cfg, mutation, { objects });
|
||||
return Number(data?.insert_drift_ticks?.affected_rows || 0);
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const cfg = resolveConfig();
|
||||
const lastUpdatedAtByMarket = new Map();
|
||||
|
||||
console.log(
|
||||
JSON.stringify(
|
||||
{
|
||||
service: 'trade-ingestor',
|
||||
mode: 'dlob_stats_ticks',
|
||||
startedAt: getIsoNow(),
|
||||
hasuraUrl: cfg.hasuraUrl,
|
||||
markets: cfg.markets,
|
||||
pollMs: cfg.pollMs,
|
||||
source: cfg.source,
|
||||
},
|
||||
null,
|
||||
2
|
||||
)
|
||||
);
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
const rows = await fetchStats(cfg);
|
||||
const nowIso = getIsoNow();
|
||||
|
||||
const objects = [];
|
||||
for (const r of rows) {
|
||||
const marketName = String(r?.market_name || '').trim();
|
||||
if (!marketName) continue;
|
||||
|
||||
const updatedAt = r?.updated_at ? String(r.updated_at) : '';
|
||||
if (updatedAt && lastUpdatedAtByMarket.get(marketName) === updatedAt) continue;
|
||||
if (updatedAt) lastUpdatedAtByMarket.set(marketName, updatedAt);
|
||||
|
||||
const marketIndex = toIntOrNull(r?.market_index) ?? 0;
|
||||
const dlobIso = isoFromEpochMs(r?.ts);
|
||||
const tsIso = dlobIso || nowIso;
|
||||
|
||||
const oraclePrice = numStr(r?.oracle_price) || numStr(r?.mark_price) || numStr(r?.mid_price);
|
||||
const markPrice = numStr(r?.mark_price) || numStr(r?.mid_price) || oraclePrice;
|
||||
if (!oraclePrice) continue;
|
||||
|
||||
objects.push({
|
||||
ts: tsIso,
|
||||
market_index: marketIndex,
|
||||
symbol: marketName,
|
||||
oracle_price: oraclePrice,
|
||||
mark_price: markPrice,
|
||||
oracle_slot: r?.slot == null ? null : String(r.slot),
|
||||
source: cfg.source,
|
||||
raw: {
|
||||
from: 'dlob_stats_latest',
|
||||
market_name: marketName,
|
||||
market_index: marketIndex,
|
||||
dlob: {
|
||||
ts: r?.ts ?? null,
|
||||
slot: r?.slot ?? null,
|
||||
best_bid_price: r?.best_bid_price ?? null,
|
||||
best_ask_price: r?.best_ask_price ?? null,
|
||||
mid_price: r?.mid_price ?? null,
|
||||
updated_at: updatedAt || null,
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
const inserted = await insertTicks(cfg, objects);
|
||||
if (inserted) {
|
||||
console.log(`[dlob-ticks] inserted=${inserted} ts=${nowIso}`);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`[dlob-ticks] error: ${String(err?.message || err)}`);
|
||||
await sleep(2_000);
|
||||
}
|
||||
|
||||
await sleep(cfg.pollMs);
|
||||
}
|
||||
}
|
||||
|
||||
main().catch((err) => {
|
||||
console.error(String(err?.stack || err));
|
||||
process.exitCode = 1;
|
||||
});
|
||||
16
kustomize/overlays/staging/dlob-worker-patch.yaml
Normal file
16
kustomize/overlays/staging/dlob-worker-patch.yaml
Normal file
@@ -0,0 +1,16 @@
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: dlob-worker
|
||||
spec:
|
||||
template:
|
||||
spec:
|
||||
hostNetwork: true
|
||||
dnsPolicy: ClusterFirstWithHostNet
|
||||
containers:
|
||||
- name: worker
|
||||
env:
|
||||
- name: DLOB_HTTP_URL
|
||||
value: https://dlob.drift.trade
|
||||
- name: DLOB_FORCE_IPV6
|
||||
value: "true"
|
||||
487
kustomize/overlays/staging/fake-ingestor.mjs
Normal file
487
kustomize/overlays/staging/fake-ingestor.mjs
Normal file
@@ -0,0 +1,487 @@
|
||||
import fs from 'node:fs';
|
||||
|
||||
function sleep(ms) {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
function envString(name, fallback) {
|
||||
const v = process.env[name];
|
||||
if (v == null) return fallback;
|
||||
const s = String(v).trim();
|
||||
return s ? s : fallback;
|
||||
}
|
||||
|
||||
function envNumber(name, fallback) {
|
||||
const v = process.env[name];
|
||||
if (v == null) return fallback;
|
||||
const n = Number(v);
|
||||
return Number.isFinite(n) ? n : fallback;
|
||||
}
|
||||
|
||||
function envInt(name, fallback, { min, max } = {}) {
|
||||
const v = process.env[name];
|
||||
if (v == null) return fallback;
|
||||
const n = Number.parseInt(String(v), 10);
|
||||
if (!Number.isInteger(n)) return fallback;
|
||||
const nn = Math.max(min ?? n, Math.min(max ?? n, n));
|
||||
return nn;
|
||||
}
|
||||
|
||||
function readJson(filePath) {
|
||||
try {
|
||||
const raw = fs.readFileSync(filePath, 'utf8');
|
||||
return JSON.parse(raw);
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
function readTokenFromFile(filePath) {
|
||||
const json = readJson(filePath);
|
||||
const raw = json?.token || json?.jwt || json?.authToken;
|
||||
const tok = typeof raw === 'string' ? raw.trim() : '';
|
||||
return tok ? tok : undefined;
|
||||
}
|
||||
|
||||
function urlWithPath(baseUrl, path) {
|
||||
const u = new URL(baseUrl);
|
||||
const basePath = u.pathname && u.pathname !== '/' ? u.pathname.replace(/\/$/, '') : '';
|
||||
u.pathname = `${basePath}${path}`;
|
||||
return u;
|
||||
}
|
||||
|
||||
async function httpJson(url, options) {
|
||||
const res = await fetch(url, options);
|
||||
const text = await res.text();
|
||||
let json;
|
||||
try {
|
||||
json = JSON.parse(text);
|
||||
} catch {
|
||||
json = undefined;
|
||||
}
|
||||
return { ok: res.ok, status: res.status, json, text };
|
||||
}
|
||||
|
||||
function mean(values) {
|
||||
if (!values.length) return 0;
|
||||
return values.reduce((a, b) => a + b, 0) / values.length;
|
||||
}
|
||||
|
||||
function stddev(values) {
|
||||
if (!values.length) return 0;
|
||||
const m = mean(values);
|
||||
const v = values.reduce((acc, x) => acc + (x - m) * (x - m), 0) / values.length;
|
||||
return Math.sqrt(v);
|
||||
}
|
||||
|
||||
function quantile(values, q) {
|
||||
if (!values.length) return 0;
|
||||
const sorted = values.slice().sort((a, b) => a - b);
|
||||
const pos = (sorted.length - 1) * q;
|
||||
const base = Math.floor(pos);
|
||||
const rest = pos - base;
|
||||
if (sorted[base + 1] == null) return sorted[base];
|
||||
return sorted[base] + rest * (sorted[base + 1] - sorted[base]);
|
||||
}
|
||||
|
||||
function randn() {
|
||||
// Box–Muller
|
||||
let u = 0;
|
||||
let v = 0;
|
||||
while (u === 0) u = Math.random();
|
||||
while (v === 0) v = Math.random();
|
||||
return Math.sqrt(-2 * Math.log(u)) * Math.cos(2 * Math.PI * v);
|
||||
}
|
||||
|
||||
function clamp(v, min, max) {
|
||||
if (v < min) return min;
|
||||
if (v > max) return max;
|
||||
return v;
|
||||
}
|
||||
|
||||
function isTruthy(value) {
|
||||
const v = String(value ?? '')
|
||||
.trim()
|
||||
.toLowerCase();
|
||||
if (!v) return false;
|
||||
return !['0', 'false', 'off', 'no', 'none', 'disabled'].includes(v);
|
||||
}
|
||||
|
||||
function parsePriceFromTick(t) {
|
||||
const mp = t?.mark_price ?? t?.markPrice;
|
||||
const op = t?.oracle_price ?? t?.oraclePrice ?? t?.price;
|
||||
const v = mp != null ? Number(mp) : Number(op);
|
||||
return Number.isFinite(v) && v > 0 ? v : null;
|
||||
}
|
||||
|
||||
function computeLogReturns(prices) {
|
||||
const out = [];
|
||||
for (let i = 1; i < prices.length; i++) {
|
||||
const a = prices[i - 1];
|
||||
const b = prices[i];
|
||||
if (!(a > 0) || !(b > 0)) continue;
|
||||
const r = Math.log(b / a);
|
||||
if (Number.isFinite(r)) out.push(r);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
class BlockSampler {
|
||||
#values;
|
||||
#minBlock;
|
||||
#maxBlock;
|
||||
#idx = 0;
|
||||
#end = 0;
|
||||
#remaining = 0;
|
||||
|
||||
constructor(values, { minBlock, maxBlock }) {
|
||||
this.#values = values.slice();
|
||||
this.#minBlock = Math.max(1, minBlock);
|
||||
this.#maxBlock = Math.max(this.#minBlock, maxBlock);
|
||||
this.#startNewBlock();
|
||||
}
|
||||
|
||||
#startNewBlock() {
|
||||
const n = this.#values.length;
|
||||
if (n <= 1) {
|
||||
this.#idx = 0;
|
||||
this.#end = n;
|
||||
this.#remaining = n;
|
||||
return;
|
||||
}
|
||||
|
||||
const start = Math.floor(Math.random() * n);
|
||||
const span = this.#maxBlock - this.#minBlock + 1;
|
||||
const len = this.#minBlock + Math.floor(Math.random() * span);
|
||||
|
||||
this.#idx = start;
|
||||
this.#end = Math.min(n, start + len);
|
||||
this.#remaining = this.#end - this.#idx;
|
||||
}
|
||||
|
||||
next() {
|
||||
if (this.#values.length === 0) return 0;
|
||||
if (this.#remaining <= 0 || this.#idx >= this.#end) this.#startNewBlock();
|
||||
|
||||
const v = this.#values[this.#idx];
|
||||
this.#idx += 1;
|
||||
this.#remaining -= 1;
|
||||
|
||||
return Number.isFinite(v) ? v : 0;
|
||||
}
|
||||
}
|
||||
|
||||
async function fetchSeedTicksPage({ apiBase, readToken, symbol, source, limit, to }) {
|
||||
const u = urlWithPath(apiBase, '/v1/ticks');
|
||||
u.searchParams.set('symbol', symbol);
|
||||
u.searchParams.set('limit', String(limit));
|
||||
if (source) u.searchParams.set('source', source);
|
||||
if (to) u.searchParams.set('to', to);
|
||||
|
||||
const res = await httpJson(u.toString(), {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
authorization: `Bearer ${readToken}`,
|
||||
},
|
||||
});
|
||||
|
||||
if (!res.ok) throw new Error(`seed_ticks_http_${res.status}: ${res.text}`);
|
||||
if (!res.json?.ok) throw new Error(`seed_ticks_error: ${res.json?.error || res.text}`);
|
||||
|
||||
const ticks = Array.isArray(res.json?.ticks) ? res.json.ticks : [];
|
||||
return ticks;
|
||||
}
|
||||
|
||||
async function fetchSeedTicksPaged({ apiBase, readToken, symbol, source, desiredLimit, pageLimit, maxPages }) {
|
||||
const pages = [];
|
||||
let cursorTo = null;
|
||||
|
||||
for (let page = 0; page < maxPages; page++) {
|
||||
const remaining = desiredLimit - pages.reduce((acc, p) => acc + p.length, 0);
|
||||
if (remaining <= 0) break;
|
||||
|
||||
const limit = Math.min(pageLimit, remaining);
|
||||
const ticks = await fetchSeedTicksPage({ apiBase, readToken, symbol, source, limit, to: cursorTo });
|
||||
if (!ticks.length) break;
|
||||
|
||||
// Server returns ascending ticks; we unshift to keep overall chronological order.
|
||||
pages.unshift(ticks);
|
||||
|
||||
const oldestTs = String(ticks[0]?.ts || '').trim();
|
||||
if (!oldestTs) break;
|
||||
const oldestMs = Date.parse(oldestTs);
|
||||
if (!Number.isFinite(oldestMs)) break;
|
||||
cursorTo = new Date(oldestMs - 1).toISOString();
|
||||
}
|
||||
|
||||
const flat = pages.flat();
|
||||
if (!flat.length) return flat;
|
||||
|
||||
// Best-effort de-duplication (in case of overlapping `to` bounds).
|
||||
const seen = new Set();
|
||||
const out = [];
|
||||
for (const t of flat) {
|
||||
const ts = String(t?.ts || '');
|
||||
const key = ts ? ts : JSON.stringify(t);
|
||||
if (seen.has(key)) continue;
|
||||
seen.add(key);
|
||||
out.push(t);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
async function ingestTick({ apiBase, writeToken, tick }) {
|
||||
const u = urlWithPath(apiBase, '/v1/ingest/tick');
|
||||
const res = await httpJson(u.toString(), {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'content-type': 'application/json',
|
||||
authorization: `Bearer ${writeToken}`,
|
||||
},
|
||||
body: JSON.stringify(tick),
|
||||
});
|
||||
|
||||
if (!res.ok) throw new Error(`ingest_http_${res.status}: ${res.text}`);
|
||||
if (!res.json?.ok) throw new Error(`ingest_error: ${res.json?.error || res.text}`);
|
||||
return res.json?.id || null;
|
||||
}
|
||||
|
||||
function formatNumeric(value) {
|
||||
if (!Number.isFinite(value)) return '0';
|
||||
// Avoid scientific notation for small values while keeping reasonable precision.
|
||||
const abs = Math.abs(value);
|
||||
if (abs === 0) return '0';
|
||||
if (abs >= 1) return value.toFixed(8).replace(/\.?0+$/, '');
|
||||
if (abs >= 0.01) return value.toFixed(10).replace(/\.?0+$/, '');
|
||||
if (abs >= 0.0001) return value.toFixed(12).replace(/\.?0+$/, '');
|
||||
return value.toFixed(16).replace(/\.?0+$/, '');
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const apiBase = envString('INGEST_API_URL', 'http://trade-api:8787');
|
||||
const symbol = envString('MARKET_NAME', envString('SYMBOL', 'PUMP-PERP'));
|
||||
const source = envString('SOURCE', 'drift_oracle');
|
||||
const intervalMs = envInt('INTERVAL_MS', 1000, { min: 50, max: 60_000 });
|
||||
|
||||
const readTokenFile = envString('FAKE_READ_TOKEN_FILE', envString('READ_TOKEN_FILE', '/tokens/read.json'));
|
||||
const writeTokenFile = envString('FAKE_WRITE_TOKEN_FILE', envString('WRITE_TOKEN_FILE', '/app/tokens/alg.json'));
|
||||
|
||||
const seedLimitDesired = envInt('FAKE_SEED_LIMIT', 50_000, { min: 50, max: 200_000 });
|
||||
const seedPageLimit = envInt('FAKE_SEED_PAGE_LIMIT', 5000, { min: 50, max: 5000 });
|
||||
const seedMaxPages = envInt(
|
||||
'FAKE_SEED_MAX_PAGES',
|
||||
Math.ceil(seedLimitDesired / seedPageLimit) + 2,
|
||||
{ min: 1, max: 200 }
|
||||
);
|
||||
const seedSourceRaw = process.env.FAKE_SEED_SOURCE;
|
||||
const seedSource = seedSourceRaw == null ? source : String(seedSourceRaw).trim();
|
||||
|
||||
const minBlock = envInt('FAKE_BLOCK_MIN', 120, { min: 1, max: 50_000 });
|
||||
const maxBlock = envInt('FAKE_BLOCK_MAX', 1200, { min: 1, max: 50_000 });
|
||||
|
||||
const volScale = envNumber('FAKE_VOL_SCALE', 1);
|
||||
const noiseScale = envNumber('FAKE_NOISE_SCALE', 0.05);
|
||||
const meanReversion = envNumber('FAKE_MEAN_REVERSION', 0.0002);
|
||||
const markNoiseBps = envNumber('FAKE_MARK_NOISE_BPS', 5);
|
||||
const logEvery = envInt('FAKE_LOG_EVERY', 30, { min: 1, max: 10_000 });
|
||||
|
||||
const marketIndexEnv = process.env.MARKET_INDEX;
|
||||
const marketIndexFallback = Number.isInteger(Number(marketIndexEnv)) ? Number(marketIndexEnv) : 0;
|
||||
const startPriceEnv = envNumber('FAKE_START_PRICE', 0);
|
||||
|
||||
const clampEnabled = isTruthy(process.env.FAKE_CLAMP ?? '1');
|
||||
const clampQLow = clamp(envNumber('FAKE_CLAMP_Q_LOW', 0.05), 0.0, 0.49);
|
||||
const clampQHigh = clamp(envNumber('FAKE_CLAMP_Q_HIGH', 0.95), 0.51, 1.0);
|
||||
const clampLowMult = envNumber('FAKE_CLAMP_LOW_MULT', 0.8);
|
||||
const clampHighMult = envNumber('FAKE_CLAMP_HIGH_MULT', 1.2);
|
||||
|
||||
const readToken = readTokenFromFile(readTokenFile);
|
||||
const writeToken = readTokenFromFile(writeTokenFile);
|
||||
if (!writeToken) throw new Error(`Missing write token (expected JSON token at ${writeTokenFile})`);
|
||||
|
||||
let seedTicks = [];
|
||||
let seedPrices = [];
|
||||
let marketIndex = marketIndexFallback;
|
||||
let seedFromTs = null;
|
||||
let seedToTs = null;
|
||||
|
||||
if (!readToken) {
|
||||
console.warn(`[fake-ingestor] No read token at ${readTokenFile}; running without seed data.`);
|
||||
} else {
|
||||
seedTicks = await fetchSeedTicksPaged({
|
||||
apiBase,
|
||||
readToken,
|
||||
symbol,
|
||||
source: seedSource ? seedSource : undefined,
|
||||
desiredLimit: seedLimitDesired,
|
||||
pageLimit: seedPageLimit,
|
||||
maxPages: seedMaxPages,
|
||||
});
|
||||
|
||||
seedFromTs = seedTicks.length ? String(seedTicks[0]?.ts || '') : null;
|
||||
seedToTs = seedTicks.length ? String(seedTicks[seedTicks.length - 1]?.ts || '') : null;
|
||||
|
||||
for (const t of seedTicks) {
|
||||
const p = parsePriceFromTick(t);
|
||||
if (p != null) seedPrices.push(p);
|
||||
}
|
||||
|
||||
const lastTick = seedTicks.length ? seedTicks[seedTicks.length - 1] : null;
|
||||
const mi = lastTick?.market_index ?? lastTick?.marketIndex;
|
||||
if (Number.isInteger(Number(mi))) marketIndex = Number(mi);
|
||||
}
|
||||
|
||||
if (!seedPrices.length && startPriceEnv > 0) {
|
||||
seedPrices = [startPriceEnv];
|
||||
}
|
||||
if (!seedPrices.length) {
|
||||
throw new Error(
|
||||
'No seed prices available. Provide FAKE_START_PRICE (and optionally MARKET_INDEX) or mount a read token to seed from /v1/ticks.'
|
||||
);
|
||||
}
|
||||
|
||||
const returnsRaw = computeLogReturns(seedPrices);
|
||||
const mu = mean(returnsRaw);
|
||||
const sigma = stddev(returnsRaw);
|
||||
const center = envString('FAKE_CENTER_RETURNS', '1') !== '0';
|
||||
const returns = returnsRaw.length
|
||||
? center
|
||||
? returnsRaw.map((r) => r - mu)
|
||||
: returnsRaw
|
||||
: [];
|
||||
|
||||
const sampler = new BlockSampler(returns.length ? returns : [0], { minBlock, maxBlock });
|
||||
|
||||
const pLow = quantile(seedPrices, clampQLow);
|
||||
const p50 = quantile(seedPrices, 0.5);
|
||||
const pHigh = quantile(seedPrices, clampQHigh);
|
||||
const clampMin = pLow > 0 ? pLow * clampLowMult : Math.min(...seedPrices) * clampLowMult;
|
||||
const clampMax = pHigh > 0 ? pHigh * clampHighMult : Math.max(...seedPrices) * clampHighMult;
|
||||
|
||||
let logPrice = Math.log(seedPrices[seedPrices.length - 1]);
|
||||
const targetLog = Math.log(p50 > 0 ? p50 : seedPrices[seedPrices.length - 1]);
|
||||
|
||||
console.log(
|
||||
JSON.stringify(
|
||||
{
|
||||
service: 'trade-fake-ingestor',
|
||||
apiBase,
|
||||
symbol,
|
||||
source,
|
||||
intervalMs,
|
||||
marketIndex,
|
||||
seed: {
|
||||
ok: Boolean(seedTicks.length),
|
||||
desiredLimit: seedLimitDesired,
|
||||
pageLimit: seedPageLimit,
|
||||
maxPages: seedMaxPages,
|
||||
source: seedSource || null,
|
||||
ticks: seedTicks.length,
|
||||
prices: seedPrices.length,
|
||||
fromTs: seedFromTs,
|
||||
toTs: seedToTs,
|
||||
},
|
||||
model: {
|
||||
type: 'block-bootstrap-returns',
|
||||
centerReturns: center,
|
||||
mu,
|
||||
sigma,
|
||||
volScale,
|
||||
noiseScale,
|
||||
meanReversion,
|
||||
block: { minBlock, maxBlock },
|
||||
clamp: clampEnabled ? { min: clampMin, max: clampMax } : { disabled: true },
|
||||
},
|
||||
},
|
||||
null,
|
||||
2
|
||||
)
|
||||
);
|
||||
|
||||
let stopping = false;
|
||||
process.on('SIGINT', () => {
|
||||
stopping = true;
|
||||
});
|
||||
process.on('SIGTERM', () => {
|
||||
stopping = true;
|
||||
});
|
||||
|
||||
let tickCount = 0;
|
||||
let nextAt = Date.now();
|
||||
|
||||
while (!stopping) {
|
||||
const now = Date.now();
|
||||
if (now < nextAt) await sleep(nextAt - now);
|
||||
nextAt += intervalMs;
|
||||
|
||||
const r = sampler.next();
|
||||
const noise = (Number.isFinite(sigma) ? sigma : 0) * noiseScale * randn();
|
||||
const revert = Number.isFinite(meanReversion) ? meanReversion * (targetLog - logPrice) : 0;
|
||||
const step = (Number.isFinite(r) ? r : 0) * volScale + noise + revert;
|
||||
|
||||
logPrice += step;
|
||||
let oraclePrice = Math.exp(logPrice);
|
||||
if (clampEnabled && Number.isFinite(clampMin) && Number.isFinite(clampMax) && clampMax > clampMin) {
|
||||
oraclePrice = clamp(oraclePrice, clampMin, clampMax);
|
||||
logPrice = Math.log(oraclePrice);
|
||||
}
|
||||
|
||||
const markNoise = (markNoiseBps / 10_000) * randn();
|
||||
const markPrice = oraclePrice * (1 + markNoise);
|
||||
|
||||
const ts = new Date().toISOString();
|
||||
const tick = {
|
||||
ts,
|
||||
market_index: marketIndex,
|
||||
symbol,
|
||||
oracle_price: formatNumeric(oraclePrice),
|
||||
mark_price: formatNumeric(markPrice),
|
||||
source,
|
||||
raw: {
|
||||
fake: true,
|
||||
model: 'block-bootstrap-returns',
|
||||
seeded: seedTicks.length
|
||||
? {
|
||||
symbol,
|
||||
source: seedSource || null,
|
||||
desiredLimit: seedLimitDesired,
|
||||
pageLimit: seedPageLimit,
|
||||
maxPages: seedMaxPages,
|
||||
fromTs: seedFromTs,
|
||||
toTs: seedToTs,
|
||||
}
|
||||
: {
|
||||
symbol,
|
||||
source: null,
|
||||
desiredLimit: 0,
|
||||
pageLimit: seedPageLimit,
|
||||
maxPages: seedMaxPages,
|
||||
fromTs: null,
|
||||
toTs: null,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
try {
|
||||
await ingestTick({ apiBase, writeToken, tick });
|
||||
tickCount += 1;
|
||||
if (tickCount % logEvery === 0) {
|
||||
console.log(
|
||||
`[fake-ingestor] ok ticks=${tickCount} ts=${ts} px=${tick.oracle_price} mark=${tick.mark_price} clamp=[${formatNumeric(
|
||||
clampMin
|
||||
)},${formatNumeric(clampMax)}]`
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
console.warn(`[fake-ingestor] ingest failed: ${String(err?.message || err)}`);
|
||||
await sleep(Math.min(5000, Math.max(250, intervalMs)));
|
||||
}
|
||||
}
|
||||
|
||||
console.log('[fake-ingestor] stopped');
|
||||
}
|
||||
|
||||
main().catch((err) => {
|
||||
console.error(String(err?.stack || err));
|
||||
process.exitCode = 1;
|
||||
});
|
||||
24
kustomize/overlays/staging/frontend-ingress-root.yaml
Normal file
24
kustomize/overlays/staging/frontend-ingress-root.yaml
Normal file
@@ -0,0 +1,24 @@
|
||||
apiVersion: networking.k8s.io/v1
|
||||
kind: Ingress
|
||||
metadata:
|
||||
name: trade-frontend-root
|
||||
annotations:
|
||||
cert-manager.io/cluster-issuer: letsencrypt-prod
|
||||
traefik.ingress.kubernetes.io/router.entrypoints: websecure
|
||||
spec:
|
||||
ingressClassName: traefik
|
||||
tls:
|
||||
- hosts:
|
||||
- mpabi.pl
|
||||
secretName: mpabi-pl-tls
|
||||
rules:
|
||||
- host: mpabi.pl
|
||||
http:
|
||||
paths:
|
||||
- path: /
|
||||
pathType: Prefix
|
||||
backend:
|
||||
service:
|
||||
name: trade-frontend
|
||||
port:
|
||||
number: 8081
|
||||
36
kustomize/overlays/staging/ingestor-dlob-patch.yaml
Normal file
36
kustomize/overlays/staging/ingestor-dlob-patch.yaml
Normal file
@@ -0,0 +1,36 @@
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: trade-ingestor
|
||||
spec:
|
||||
template:
|
||||
spec:
|
||||
containers:
|
||||
- name: ingestor
|
||||
image: node:20-slim
|
||||
imagePullPolicy: IfNotPresent
|
||||
env:
|
||||
- name: HASURA_GRAPHQL_URL
|
||||
value: http://hasura:8080/v1/graphql
|
||||
- name: HASURA_ADMIN_SECRET
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: trade-hasura
|
||||
key: HASURA_GRAPHQL_ADMIN_SECRET
|
||||
- name: DLOB_MARKETS
|
||||
value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP
|
||||
- name: TICKS_POLL_MS
|
||||
value: "1000"
|
||||
- name: TICKS_SOURCE
|
||||
value: "dlob_stats"
|
||||
command: ["node"]
|
||||
args: ["/opt/dlob/dlob-ingestor.mjs"]
|
||||
volumeMounts:
|
||||
- name: dlob-script
|
||||
mountPath: /opt/dlob/dlob-ingestor.mjs
|
||||
subPath: dlob-ingestor.mjs
|
||||
readOnly: true
|
||||
volumes:
|
||||
- name: dlob-script
|
||||
configMap:
|
||||
name: trade-dlob-ingestor-script
|
||||
54
kustomize/overlays/staging/ingestor-fake-patch.yaml
Normal file
54
kustomize/overlays/staging/ingestor-fake-patch.yaml
Normal file
@@ -0,0 +1,54 @@
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: trade-ingestor
|
||||
spec:
|
||||
template:
|
||||
spec:
|
||||
containers:
|
||||
- name: ingestor
|
||||
env:
|
||||
- name: FAKE_READ_TOKEN_FILE
|
||||
value: "/tokens/read.json"
|
||||
- name: FAKE_WRITE_TOKEN_FILE
|
||||
value: "/app/tokens/alg.json"
|
||||
- name: FAKE_SEED_LIMIT
|
||||
value: "50000"
|
||||
- name: FAKE_SEED_PAGE_LIMIT
|
||||
value: "5000"
|
||||
- name: FAKE_SEED_MAX_PAGES
|
||||
value: "20"
|
||||
- name: FAKE_BLOCK_MIN
|
||||
value: "300"
|
||||
- name: FAKE_BLOCK_MAX
|
||||
value: "1800"
|
||||
- name: FAKE_VOL_SCALE
|
||||
value: "1.8"
|
||||
- name: FAKE_NOISE_SCALE
|
||||
value: "0.03"
|
||||
- name: FAKE_MEAN_REVERSION
|
||||
value: "0.0001"
|
||||
- name: FAKE_CLAMP_Q_LOW
|
||||
value: "0.02"
|
||||
- name: FAKE_CLAMP_Q_HIGH
|
||||
value: "0.98"
|
||||
- name: FAKE_CLAMP_LOW_MULT
|
||||
value: "0.7"
|
||||
- name: FAKE_CLAMP_HIGH_MULT
|
||||
value: "1.3"
|
||||
command: ["node"]
|
||||
args: ["/opt/fake/fake-ingestor.mjs"]
|
||||
volumeMounts:
|
||||
- name: fake-script
|
||||
mountPath: /opt/fake
|
||||
readOnly: true
|
||||
- name: read-tokens
|
||||
mountPath: /tokens
|
||||
readOnly: true
|
||||
volumes:
|
||||
- name: fake-script
|
||||
configMap:
|
||||
name: trade-fake-ingestor-script
|
||||
- name: read-tokens
|
||||
secret:
|
||||
secretName: trade-frontend-tokens
|
||||
@@ -7,10 +7,21 @@ resources:
|
||||
- ../../base
|
||||
- pgadmin.yaml
|
||||
- frontend-ingress.yaml
|
||||
- frontend-ingress-root.yaml
|
||||
|
||||
patchesStrategicMerge:
|
||||
- hasura-patch.yaml
|
||||
- frontend-auth-patch.yaml
|
||||
- ingestor-dlob-patch.yaml
|
||||
- dlob-worker-patch.yaml
|
||||
|
||||
configMapGenerator:
|
||||
- name: trade-dlob-ingestor-script
|
||||
files:
|
||||
- dlob-ingestor.mjs
|
||||
|
||||
generatorOptions:
|
||||
disableNameSuffixHash: true
|
||||
|
||||
commonLabels:
|
||||
env: staging
|
||||
|
||||
Reference in New Issue
Block a user