Compare commits

...

46 Commits

Author SHA1 Message Date
mpabi
59c3f3ee06 feat(dlob): pin markets and wire mevnode endpoints
- Limit DLOB workers/ingestors to SOL-PERP, DOGE-PERP, JUP-PERP across base and staging config.
- Set publisher market ids to [0,7,24] for drift protocol.
- Add overlay patches for dlob-publisher and dlob-server to use wg0 RPC endpoints 10.66.66.1:8899/8900 in staging and prod.
- Extend Agave dashboard and add PrometheusRules for RPC up/lag/I/O alerts.
- Ensure overlays reference new patches for automated ArgoCD rollouts.
2026-02-15 00:40:50 +01:00
mpabi
9c4c3096d7 chore(grafana): set default time range to now 2026-02-14 14:08:11 +01:00
mpabi
b02bd6b66c feat(grafana): expand agave dashboard with node + geyser stats 2026-02-14 14:01:34 +01:00
mpabi
890ac4c86e feat(monitoring): add mpabi node-exporter scrape and agave dashboard 2026-02-14 13:43:26 +01:00
u1
9c6e974d3a fix(monitoring): add fallback_scrape_protocol for geyser metrics (missing Content-Type) 2026-02-14 11:52:50 +00:00
u1
d1dc32d3bb chore(monitoring): delete traefik basic auth secret 2026-02-14 11:28:40 +00:00
u1
a6a0accd6a chore(monitoring): remove basic auth middleware from prometheus ingressroute 2026-02-14 11:28:39 +00:00
u1
3b8b1f5492 chore(monitoring): delete traefik basic auth middleware 2026-02-14 11:28:38 +00:00
u1
0c80a08732 chore(monitoring): remove basic auth middleware from grafana ingressroute 2026-02-14 11:28:38 +00:00
u1
8b72e62621 chore(monitoring): remove traefik basic auth from monitoring extras 2026-02-14 11:28:37 +00:00
u1
ff7a4b69cd fix(monitoring): disable prometheus-operator admission webhooks for GitOps stability 2026-02-14 11:04:31 +00:00
u1
e7f9594381 fix(monitoring): use Replace sync option to avoid CRD patch annotation size issues 2026-02-14 11:01:03 +00:00
u1
0c0f219d02 fix(monitoring): skip CRDs in ArgoCD helm rendering 2026-02-14 11:00:04 +00:00
u1
9cf0ed84d9 fix(monitoring): disable CRD management in ArgoCD (avoid annotation size limit) 2026-02-14 10:59:32 +00:00
u1
2702edce22 fix(monitoring): keep monitoring-stack as helm release name (avoid duplicate install) 2026-02-14 10:58:11 +00:00
u1
c33533fcd6 fix(monitoring): route prometheus-http to monitoring-stack service 2026-02-14 10:58:10 +00:00
u1
32eb047551 fix(monitoring): route prometheus to monitoring-stack service 2026-02-14 10:58:09 +00:00
u1
d27d64e407 fix(monitoring): route grafana-http to monitoring-stack service 2026-02-14 10:58:08 +00:00
u1
fa6893aa98 fix(monitoring): route grafana to monitoring-stack service 2026-02-14 10:58:08 +00:00
u1
77a8265b40 fix(monitoring): set helm releaseName to reuse existing kube-prometheus-stack resources 2026-02-14 10:55:36 +00:00
u1
c692f8d653 fix(monitoring): use server-side apply to avoid CRD annotation size limit 2026-02-14 10:54:48 +00:00
u1
47096c9877 feat(monitoring): add ArgoCD app for monitoring ingress/certs/auth 2026-02-14 10:53:55 +00:00
u1
0104532e73 feat(monitoring): add ArgoCD app for kube-prometheus-stack 2026-02-14 10:53:55 +00:00
u1
19e7e48190 feat(monitoring): add prometheus ingressroute http->https redirect 2026-02-14 10:53:39 +00:00
u1
7ef3ffe62c feat(monitoring): add prometheus ingressroute (https + basic auth) 2026-02-14 10:53:38 +00:00
u1
1853ef6452 feat(monitoring): add grafana ingressroute http->https redirect 2026-02-14 10:53:38 +00:00
u1
34ef9490a4 feat(monitoring): add grafana ingressroute (https + basic auth) 2026-02-14 10:53:37 +00:00
u1
f3bc3da9bb feat(monitoring): add TLS certificate for grafana/prometheus 2026-02-14 10:53:35 +00:00
u1
f797234abd feat(monitoring): add traefik redirect-to-https middleware 2026-02-14 10:53:35 +00:00
u1
c95a4286fb feat(monitoring): add basic auth secret for traefik 2026-02-14 10:53:34 +00:00
u1
b72f281651 feat(monitoring): add traefik basic auth middleware 2026-02-14 10:53:34 +00:00
u1
98912c5b03 feat(monitoring): add kustomize scaffold for monitoring extras 2026-02-14 10:53:05 +00:00
u1
28876fa1d2 fix(dlob): relax dlob-publisher probes 2026-02-13 12:04:13 +01:00
u1
5f46d26037 feat(frontend): per-user DLOB source header
- Add cookie-based source selector (mevnode|drift)
- Proxy sets x-hasura-dlob-source for HTTP + WS
- Include same proxy script mount in prod overlay
2026-02-13 11:33:13 +01:00
u1
57433c7e75 feat(dlob): support two sources + per-user switch
- Add "source" column + composite PKs for DLOB tables
- Filter public Hasura selects by X-Hasura-Dlob-Source
- Run parallel workers for mevnode + dlob.drift.trade
- Frontend proxy sets x-hasura-dlob-source from cookie and injects UI switch
2026-02-13 11:25:32 +01:00
u1
9e7d7b88ac feat(candles): support oracle OHLC basis 2026-02-03 12:51:29 +01:00
u1
bd88eaa3c8 fix(api): keep candle open continuity 2026-02-03 10:27:54 +01:00
u1
cd4cbec7e0 fix(api): fill-forward missing candle buckets 2026-02-02 23:14:18 +01:00
u1
144f6e7c86 fix(db): fill-forward candles buckets 2026-02-02 23:05:04 +01:00
u1
ef8f7cbeaa fix(candles): serve chart from cache and stabilize any-source 2026-02-02 22:28:30 +01:00
codex
507da3165f chore(frontend): bump image sha-b06fe7f 2026-02-01 22:00:55 +01:00
codex
f41792a0fa fix(candles-cache): cap per-tf window to 1024 2026-02-01 19:07:49 +01:00
codex
e16a02453e fix(staging): hoist ensurePublicSelectTable in hasura bootstrap 2026-02-01 18:49:00 +01:00
codex
b46d7d85c6 chore(staging): add candles-cache-worker deployment 2026-02-01 18:13:24 +01:00
codex
e5543f408a feat(staging): add candles-cache-worker and api override 2026-02-01 18:12:26 +01:00
codex
b239f564b2 feat(staging): add candles cache + v2 slippage 2026-02-01 18:12:15 +01:00
44 changed files with 4880 additions and 366 deletions

View File

@@ -0,0 +1,20 @@
# ArgoCD Application: syncs the monitoring-extras kustomization
# (ingress routes, certs, middlewares) into the monitoring namespace.
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: monitoring-extras
  namespace: argocd
spec:
  project: default
  source:
    repoURL: https://gitea.mpabi.pl/trade/trade-deploy.git
    targetRevision: main
    path: kustomize/infra/monitoring-extras
  destination:
    server: https://kubernetes.default.svc
    namespace: monitoring
  syncPolicy:
    automated:
      prune: true
      selfHeal: true
    syncOptions:
      - CreateNamespace=true

View File

@@ -0,0 +1,50 @@
# ArgoCD Application: installs kube-prometheus-stack from the upstream
# Helm repo. CRDs are skipped (managed out-of-band) and ServerSideApply /
# Replace are used to dodge the CRD annotation-size limit.
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: monitoring-stack
  namespace: argocd
spec:
  project: default
  source:
    repoURL: https://prometheus-community.github.io/helm-charts
    chart: kube-prometheus-stack
    targetRevision: 81.6.9
    helm:
      skipCrds: true
      values: |
        grafana:
          enabled: true
        prometheus:
          prometheusSpec:
            scrapeInterval: 15s
            evaluationInterval: 15s
            additionalScrapeConfigs:
              # Geyser exporter omits Content-Type, so force text parsing.
              - job_name: mpabi-yellowstone-geyser
                metrics_path: /metrics
                scrape_interval: 10s
                fallback_scrape_protocol: PrometheusText0.0.4
                static_configs:
                  - targets:
                      - 10.66.66.1:8999
              - job_name: mpabi-node-exporter
                metrics_path: /metrics
                scrape_interval: 15s
                static_configs:
                  - targets:
                      - 10.66.66.1:9100
        prometheusOperator:
          admissionWebhooks:
            enabled: false
            patch:
              enabled: false
  destination:
    server: https://kubernetes.default.svc
    namespace: monitoring
  syncPolicy:
    automated:
      prune: true
      selfHeal: true
    syncOptions:
      - CreateNamespace=true
      - Replace=true
      - ServerSideApply=true

View File

@@ -18,6 +18,9 @@ spec:
- name: trade-api-wrapper
configMap:
name: trade-api-wrapper
- name: trade-api-upstream
configMap:
name: trade-api-upstream
containers:
- name: api
image: gitea.mpabi.pl/trade/trade-api:k3s-20260111095435
@@ -31,6 +34,10 @@ spec:
mountPath: /override/wrapper.mjs
subPath: wrapper.mjs
readOnly: true
- name: trade-api-upstream
mountPath: /app/services/api/server.mjs
subPath: server.mjs
readOnly: true
env:
- name: PORT
value: "8787"

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,48 @@
# Deployment: runs the candles-cache worker script (mounted from a
# ConfigMap) against Hasura to maintain the drift_candles_cache table.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: candles-cache-worker
  annotations:
    argocd.argoproj.io/sync-wave: "6"
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: candles-cache-worker
  template:
    metadata:
      labels:
        app.kubernetes.io/name: candles-cache-worker
    spec:
      containers:
        - name: worker
          image: node:20-slim
          imagePullPolicy: IfNotPresent
          env:
            - name: HASURA_GRAPHQL_URL
              value: http://hasura:8080/v1/graphql
            - name: HASURA_ADMIN_SECRET
              valueFrom:
                secretKeyRef:
                  name: trade-hasura
                  key: HASURA_GRAPHQL_ADMIN_SECRET
            - name: CANDLES_SYMBOLS
              value: SOL-PERP,PUMP-PERP
            - name: CANDLES_SOURCES
              value: any
            - name: CANDLES_TFS
              value: 1s,3s,5s,15s,30s,1m,3m,5m,15m,30m,1h,4h,12h,1d
            - name: CANDLES_TARGET_POINTS
              value: "1024"
            - name: CANDLES_POLL_MS
              value: "5000"
          command: ["node", "/app/worker.mjs"]
          volumeMounts:
            - name: script
              mountPath: /app/worker.mjs
              subPath: worker.mjs
              readOnly: true
      volumes:
        - name: script
          configMap:
            name: candles-cache-worker-script

View File

@@ -0,0 +1,431 @@
import fs from 'node:fs';
import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';
// Best-effort JSON file reader: returns the parsed value, or undefined
// when the file is missing, unreadable, or contains invalid JSON.
function readJsonFile(filePath) {
  let raw;
  try {
    raw = fs.readFileSync(filePath, 'utf8');
  } catch {
    return undefined;
  }
  try {
    return JSON.parse(raw);
  } catch {
    return undefined;
  }
}
/** Current wall-clock time as an ISO-8601 UTC string. */
function getIsoNow() {
  const now = new Date();
  return now.toISOString();
}
/**
 * Parse value as a base-10 integer and clamp it into [min, max].
 * Returns fallback when the value does not parse to an integer.
 */
function clampInt(value, min, max, fallback) {
  const parsed = Number.parseInt(String(value ?? ''), 10);
  if (!Number.isInteger(parsed)) return fallback;
  if (parsed < min) return min;
  if (parsed > max) return max;
  return parsed;
}
/**
 * Read a CSV-valued env var (falling back to fallbackCsv) and return its
 * trimmed, non-empty entries as an array.
 */
function envList(name, fallbackCsv) {
  const csv = process.env[name] ?? fallbackCsv;
  const entries = [];
  for (const piece of String(csv).split(',')) {
    const trimmed = piece.trim();
    if (trimmed) entries.push(trimmed);
  }
  return entries;
}
/**
 * Convert a timeframe token like "5m"/"4h"/"1d" to seconds.
 * Empty/blank input defaults to 60 (one minute).
 * @throws Error('invalid_tf') on malformed tokens or non-positive counts.
 */
function parseTimeframeToSeconds(tf) {
  const normalized = String(tf || '').trim().toLowerCase();
  if (!normalized) return 60;
  const match = /^(\d+)(s|m|h|d)$/.exec(normalized);
  if (!match) throw new Error('invalid_tf');
  const count = Number.parseInt(match[1], 10);
  if (!Number.isInteger(count) || count <= 0) throw new Error('invalid_tf');
  const secondsPerUnit = { s: 1, m: 60, h: 3600, d: 86400 };
  return count * secondsPerUnit[match[2]];
}
/**
 * Quote a value as a single-quoted SQL string literal, doubling any
 * embedded single quotes. null/undefined become the empty literal ''.
 */
function sqlLit(value) {
  const text = String(value ?? '');
  const escaped = text.split("'").join("''");
  return `'${escaped}'`;
}
/**
 * Derive the Hasura base URL from either a full GraphQL endpoint URL or
 * a bare base URL. Strips a trailing "/v1/graphql" or a trailing slash.
 */
function resolveHasuraBaseUrl(graphqlUrlOrBase) {
  const url = String(graphqlUrlOrBase || '').trim();
  if (url === '') return 'http://hasura:8080';
  const graphqlSuffix = '/v1/graphql';
  if (url.endsWith(graphqlSuffix)) {
    return url.slice(0, url.length - graphqlSuffix.length);
  }
  return url.replace(/\/$/, '');
}
/**
 * Build the worker configuration from environment variables, with
 * fallbacks read from a mounted tokens JSON file.
 *
 * Fixes vs. previous version: removed the no-op `.map((n) => n)` and
 * replaced the O(n^2) `findIndex` dedupe of bucket sizes with a Set
 * (insertion-order preserving, then sorted ascending) — same output.
 *
 * @returns {{hasuraBaseUrl: string, hasuraAdminSecret: string,
 *   symbols: string[], sources: string[], bucketSecondsList: number[],
 *   targetPoints: number, backfillDays: number, pollMs: number}}
 * @throws when no Hasura admin secret is available (run_sql requires it).
 */
function resolveConfig() {
  const tokensPath =
    process.env.HASURA_TOKENS_FILE ||
    process.env.TOKENS_FILE ||
    process.env.HASURA_CONFIG_FILE ||
    '/app/tokens/hasura.json';
  const tokens = readJsonFile(tokensPath) || {};
  const graphqlUrl =
    process.env.HASURA_GRAPHQL_URL ||
    tokens.graphqlUrl ||
    tokens.apiUrl ||
    'http://hasura:8080/v1/graphql';
  const hasuraBaseUrl = resolveHasuraBaseUrl(graphqlUrl);
  const hasuraAdminSecret =
    process.env.HASURA_ADMIN_SECRET ||
    process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
    tokens.adminSecret ||
    tokens.hasuraAdminSecret;
  if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET (required for /v2/query run_sql)');
  const symbols = envList('CANDLES_SYMBOLS', 'SOL-PERP,PUMP-PERP');
  // sources: '' is the "any" pseudo-source (worker picks the symbol's
  // most recent source at aggregation time).
  const sources = [];
  for (const s of envList('CANDLES_SOURCES', 'any')) {
    const v = String(s).trim();
    if (!v) continue;
    sources.push(v.toLowerCase() === 'any' ? '' : v);
  }
  // "any" is always computed, even when not explicitly requested.
  if (!sources.includes('')) sources.unshift('');
  const tfList = envList('CANDLES_TFS', '1s,3s,5s,15s,30s,1m,3m,5m,15m,30m,1h,4h,12h,1d');
  const parsedSeconds = [];
  for (const tf of tfList) {
    try {
      parsedSeconds.push(parseTimeframeToSeconds(tf));
    } catch {
      // skip malformed timeframe tokens; remaining ones still apply
    }
  }
  const bucketSecondsList = [...new Set(parsedSeconds)].sort((a, b) => a - b);
  const targetPoints = clampInt(process.env.CANDLES_TARGET_POINTS, 10, 100_000, 1024);
  // legacy: kept for compatibility; if set, used as a minimum warmup window (days).
  const backfillDays = clampInt(process.env.CANDLES_BACKFILL_DAYS, 0, 3650, 0);
  const pollMs = clampInt(process.env.CANDLES_POLL_MS, 250, 60_000, 5000);
  return { hasuraBaseUrl, hasuraAdminSecret, symbols, sources, bucketSecondsList, targetPoints, backfillDays, pollMs };
}
/**
 * Execute raw SQL through Hasura's /v2/query run_sql endpoint using the
 * admin secret. 60s request timeout.
 * @throws Error on non-2xx responses (message includes status and body).
 * @returns parsed JSON response ({result_type, result: [...]}).
 */
async function hasuraRunSql(cfg, sql, { readOnly } = { readOnly: false }) {
  const payload = JSON.stringify({
    type: 'run_sql',
    args: { source: 'default', sql, read_only: Boolean(readOnly) },
  });
  const response = await fetch(`${cfg.hasuraBaseUrl}/v2/query`, {
    method: 'POST',
    headers: { 'content-type': 'application/json', 'x-hasura-admin-secret': cfg.hasuraAdminSecret },
    body: payload,
    signal: AbortSignal.timeout(60_000),
  });
  const text = await response.text();
  if (!response.ok) throw new Error(`Hasura run_sql HTTP ${response.status}: ${text}`);
  return JSON.parse(text);
}
/**
 * Backfill chunk width (in seconds) for a given candle bucket size:
 * small buckets scan short tick ranges per statement, large buckets may
 * scan up to a week at a time.
 */
function chunkSecondsForBucket(bucketSeconds) {
  const MINUTE = 60;
  const HOUR = 3600;
  const DAY = 86_400;
  if (bucketSeconds <= 5) return 15 * MINUTE;
  if (bucketSeconds <= MINUTE) return HOUR;
  if (bucketSeconds <= 5 * MINUTE) return 6 * HOUR;
  if (bucketSeconds <= HOUR) return DAY;
  return 7 * DAY;
}
/**
 * Build the SQL that aggregates raw ticks into OHLC candles for one
 * (symbol, sourceKey, bucketSeconds) combination over [fromIso, toIso)
 * and upserts them into public.drift_candles_cache.
 *
 * sourceKey semantics:
 *   - non-empty: only ticks whose source matches exactly are aggregated;
 *   - '' ("any"): ticks from the symbol's most recent source are used
 *     (the `chosen` CTE picks the source of the latest tick).
 *
 * Price basis: COALESCE(mark_price, oracle_price) for OHLC; oracle_price
 * feeds the parallel oracle_* columns. Open/close come from array_agg
 * ordered by ts ASC/DESC respectively.
 *
 * Values are inlined via sqlLit (Hasura run_sql has no parameter
 * binding); bucketSeconds is interpolated as a bare number.
 */
function sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso, toIso }) {
  return `
    WITH chosen AS (
      SELECT source AS chosen_source
      FROM public.drift_ticks
      WHERE symbol = ${sqlLit(symbol)}
      ORDER BY ts DESC
      LIMIT 1
    ),
    base AS (
      SELECT
        time_bucket(make_interval(secs => ${bucketSeconds}), ts) AS bucket,
        ts,
        COALESCE(mark_price, oracle_price) AS px,
        oracle_price AS oracle_px
      FROM public.drift_ticks
      WHERE symbol = ${sqlLit(symbol)}
        AND ts >= ${sqlLit(fromIso)}::timestamptz
        AND ts < ${sqlLit(toIso)}::timestamptz
        AND (
          ${sqlLit(sourceKey)} <> '' AND source = ${sqlLit(sourceKey)}
          OR ${sqlLit(sourceKey)} = '' AND source = COALESCE((SELECT chosen_source FROM chosen), source)
        )
    ),
    agg AS (
      SELECT
        bucket,
        (array_agg(px ORDER BY ts ASC))[1] AS open,
        max(px) AS high,
        min(px) AS low,
        (array_agg(px ORDER BY ts DESC))[1] AS close,
        (array_agg(oracle_px ORDER BY ts ASC))[1] AS oracle_open,
        max(oracle_px) AS oracle_high,
        min(oracle_px) AS oracle_low,
        (array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close,
        count(*)::bigint AS ticks
      FROM base
      GROUP BY bucket
    )
    INSERT INTO public.drift_candles_cache
    (
      bucket,
      bucket_seconds,
      symbol,
      source,
      open,
      high,
      low,
      close,
      oracle_open,
      oracle_high,
      oracle_low,
      oracle_close,
      ticks,
      updated_at
    )
    SELECT
      bucket,
      ${bucketSeconds},
      ${sqlLit(symbol)},
      ${sqlLit(sourceKey)},
      open,
      high,
      low,
      close,
      oracle_open,
      oracle_high,
      oracle_low,
      oracle_close,
      ticks,
      now()
    FROM agg
    ON CONFLICT (bucket, bucket_seconds, symbol, source) DO UPDATE SET
      open = EXCLUDED.open,
      high = EXCLUDED.high,
      low = EXCLUDED.low,
      close = EXCLUDED.close,
      oracle_open = EXCLUDED.oracle_open,
      oracle_high = EXCLUDED.oracle_high,
      oracle_low = EXCLUDED.oracle_low,
      oracle_close = EXCLUDED.oracle_close,
      ticks = EXCLUDED.ticks,
      updated_at = now();
  `;
}
/**
 * Build SQL that prunes cached candles older than cutoffIso for one
 * (symbol, sourceKey, bucketSeconds) series. Candles are derived data,
 * so deletion is safe — backfill can always rebuild them from ticks.
 */
function sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds, cutoffIso }) {
  return `
    DELETE FROM public.drift_candles_cache
    WHERE symbol = ${sqlLit(symbol)}
      AND source = ${sqlLit(sourceKey)}
      AND bucket_seconds = ${bucketSeconds}
      AND bucket < ${sqlLit(cutoffIso)}::timestamptz;
  `;
}
/**
 * Fetch the [min, max] tick timestamp range available for a symbol.
 * For sourceKey '' ("any") the range is restricted to the symbol's most
 * recent source (via the `chosen` CTE); otherwise the source must match
 * exactly.
 * @returns {{minTs: string|null, maxTs: string|null}} timestamp strings
 *   as returned by Postgres, or nulls when no ticks exist.
 */
async function getTickRange(cfg, { symbol, sourceKey }) {
  const sql =
    String(sourceKey) === ''
      ? `
        WITH chosen AS (
          SELECT source AS chosen_source
          FROM public.drift_ticks
          WHERE symbol=${sqlLit(symbol)}
          ORDER BY ts DESC
          LIMIT 1
        )
        SELECT min(ts) AS min_ts, max(ts) AS max_ts
        FROM public.drift_ticks
        WHERE symbol=${sqlLit(symbol)}
          AND source = COALESCE((SELECT chosen_source FROM chosen), source);
      `
      : `
        SELECT min(ts) AS min_ts, max(ts) AS max_ts
        FROM public.drift_ticks
        WHERE symbol=${sqlLit(symbol)}
          AND source = ${sqlLit(sourceKey)};
      `;
  const out = await hasuraRunSql(cfg, sql, { readOnly: true });
  // run_sql result shape: result[0] is the header row, result[1] the data row.
  const row = Array.isArray(out?.result) && out.result.length >= 2 ? out.result[1] : null;
  if (!row) return { minTs: null, maxTs: null };
  const minTs = row[0] ? String(row[0]).trim() : null;
  const maxTs = row[1] ? String(row[1]).trim() : null;
  return { minTs: minTs && minTs.length ? minTs : null, maxTs: maxTs && maxTs.length ? maxTs : null };
}
/**
 * Compute the warmup window start (ISO string) for one bucket size:
 * enough span for targetPoints candles ending at maxTsIso, widened to at
 * least backfillDays when configured, and never earlier than the first
 * available tick (minTsIso) or the epoch.
 */
function desiredFromIso({ minTsIso, maxTsIso, bucketSeconds, targetPoints, backfillDays }) {
  const endMs = Date.parse(maxTsIso);
  let fromMs = endMs - targetPoints * bucketSeconds * 1000;
  if (backfillDays > 0) {
    const backfillFromMs = endMs - backfillDays * 24 * 60 * 60 * 1000;
    fromMs = Math.min(fromMs, backfillFromMs);
  }
  if (minTsIso) {
    fromMs = Math.max(fromMs, Date.parse(minTsIso));
  }
  return new Date(Math.max(0, fromMs)).toISOString();
}
/**
 * Trailing re-aggregation window (seconds) per bucket size: wide enough
 * to absorb late-arriving ticks, wider for coarser buckets.
 */
function safetyWindowSeconds(bucketSeconds) {
  if (bucketSeconds > 14_400) return 2 * 24 * 60 * 60;
  if (bucketSeconds > 3600) return 24 * 60 * 60;
  if (bucketSeconds > 300) return 6 * 60 * 60;
  if (bucketSeconds > 60) return 60 * 60;
  return 10 * 60;
}
/**
 * Upsert candles over [fromIso, toIso) in fixed-size time chunks, so a
 * single run_sql statement never scans an unbounded tick range.
 * Chunks run sequentially (deliberate: avoids hammering Hasura).
 */
async function backfillBucket(cfg, { symbol, sourceKey, bucketSeconds, fromIso, toIso }) {
  const chunkMs = chunkSecondsForBucket(bucketSeconds) * 1000;
  const endMs = Date.parse(toIso);
  let cursorMs = Date.parse(fromIso);
  while (cursorMs < endMs) {
    const chunkFromIso = new Date(cursorMs).toISOString();
    const chunkToIso = new Date(Math.min(endMs, cursorMs + chunkMs)).toISOString();
    await hasuraRunSql(
      cfg,
      sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso: chunkFromIso, toIso: chunkToIso })
    );
    cursorMs += chunkMs;
  }
}
/**
 * Return the newest cached candle bucket (timestamp string) for one
 * (symbol, sourceKey, bucketSeconds) series, or null when the cache has
 * no rows for it yet.
 */
async function getMaxBucket(cfg, { symbol, sourceKey, bucketSeconds }) {
  const sql = `
    SELECT max(bucket) AS max_bucket
    FROM public.drift_candles_cache
    WHERE symbol=${sqlLit(symbol)} AND source=${sqlLit(sourceKey)} AND bucket_seconds=${bucketSeconds};
  `;
  const out = await hasuraRunSql(cfg, sql, { readOnly: true });
  // Hasura returns {result_type, result:[[col...],[row...]]}
  const row = Array.isArray(out?.result) && out.result.length >= 2 ? out.result[1] : null;
  const v = row && row[0] ? String(row[0]) : null;
  return v && v.trim() ? v.trim() : null;
}
/**
 * Worker entry point. Three phases:
 *   1) warmup — backfill ~targetPoints candles per (symbol, source, tf)
 *      and prune anything older than the warmup window;
 *   2) prime — load the newest cached bucket per key so the incremental
 *      loop starts where the cache left off;
 *   3) loop — forever: re-aggregate a trailing safety window per key,
 *      and at most once per minute per key prune candles beyond the
 *      retention window, then sleep pollMs.
 *
 * Fix vs. previous version: per-key prune timestamps previously lived in
 * an ad-hoc `__pruneAt` expando property stashed on the `last` Map (the
 * old comment itself called it a hack); they now live in a proper
 * `lastPruneAt` Map hoisted before the loop. Behavior is unchanged.
 */
async function main() {
  const cfg = resolveConfig();
  console.log(
    JSON.stringify(
      {
        service: 'candles-cache-worker',
        startedAt: getIsoNow(),
        hasuraBaseUrl: cfg.hasuraBaseUrl,
        symbols: cfg.symbols,
        sources: cfg.sources.map((s) => (s ? s : '(any)')),
        bucketSecondsList: cfg.bucketSecondsList,
        targetPoints: cfg.targetPoints,
        backfillDays: cfg.backfillDays,
        pollMs: cfg.pollMs,
      },
      null,
      2
    )
  );
  // Phase 1 — warmup: for each timeframe keep ~targetPoints candles (or "as much as we have").
  for (const symbol of cfg.symbols) {
    for (const sourceKey of cfg.sources) {
      const range = await getTickRange(cfg, { symbol, sourceKey });
      if (!range.maxTs) continue; // no ticks yet for this symbol/source
      const toIso = new Date(Date.parse(range.maxTs)).toISOString();
      for (const bs of cfg.bucketSecondsList) {
        const fromIso = desiredFromIso({
          minTsIso: range.minTs,
          maxTsIso: toIso,
          bucketSeconds: bs,
          targetPoints: cfg.targetPoints,
          backfillDays: cfg.backfillDays,
        });
        console.log(
          `[candles-cache-worker] warmup symbol=${symbol} source=${sourceKey || '(any)'} bs=${bs}s from=${fromIso} to=${toIso} points=${cfg.targetPoints}`
        );
        try {
          await backfillBucket(cfg, { symbol, sourceKey, bucketSeconds: bs, fromIso, toIso });
          // Enforce max window for this bucket (derived data; safe to prune).
          await hasuraRunSql(cfg, sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds: bs, cutoffIso: fromIso }));
        } catch (err) {
          console.error(
            `[candles-cache-worker] warmup failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}`
          );
        }
      }
    }
  }
  // Phase 2 — prime last buckets from the cache.
  const last = new Map(); // "symbol::source::bucketSeconds" -> ISO of last processed bucket
  for (const symbol of cfg.symbols) {
    for (const sourceKey of cfg.sources) {
      for (const bs of cfg.bucketSecondsList) {
        const k = `${symbol}::${sourceKey}::${bs}`;
        try {
          const maxBucket = await getMaxBucket(cfg, { symbol, sourceKey, bucketSeconds: bs });
          if (maxBucket) last.set(k, maxBucket);
        } catch {
          // best-effort: a missing prime only widens the first incremental window
        }
      }
    }
  }
  // Phase 3 — incremental update loop.
  const pruneEveryMs = 60_000;
  const lastPruneAt = new Map(); // key -> epoch ms of the last prune for that key
  while (true) {
    const loopNow = Date.now();
    const loopIso = new Date(loopNow).toISOString();
    for (const symbol of cfg.symbols) {
      for (const sourceKey of cfg.sources) {
        for (const bs of cfg.bucketSecondsList) {
          const k = `${symbol}::${sourceKey}::${bs}`;
          const prev = last.get(k);
          const safety = safetyWindowSeconds(bs);
          // Re-aggregate from a safety margin before the last known bucket
          // (or before now, on the first pass) to absorb late ticks.
          const fromMs = prev ? Date.parse(prev) - safety * 1000 : loopNow - safety * 1000;
          const fromIso2 = new Date(Math.max(0, fromMs)).toISOString();
          try {
            await hasuraRunSql(
              cfg,
              sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds: bs, fromIso: fromIso2, toIso: loopIso })
            );
            // best-effort: move last pointer close to now (actual max will lag by at most one bucket)
            last.set(k, loopIso);
            const prevPrune = lastPruneAt.get(k) || 0;
            if (loopNow - prevPrune >= pruneEveryMs) {
              const keepSeconds = Math.max(cfg.targetPoints * bs, cfg.backfillDays > 0 ? cfg.backfillDays * 86400 : 0);
              const cutoffIso = new Date(Math.max(0, loopNow - keepSeconds * 1000)).toISOString();
              try {
                await hasuraRunSql(cfg, sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds: bs, cutoffIso }));
              } finally {
                // Record the attempt even on failure so a broken prune
                // cannot run every iteration.
                lastPruneAt.set(k, loopNow);
              }
            }
          } catch (err) {
            console.error(
              `[candles-cache-worker] update failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}`
            );
          }
        }
      }
    }
    await sleep(cfg.pollMs);
  }
}
// Top-level bootstrap: log the full stack on any unhandled failure and
// set a non-zero exit code so the container is restarted by its policy.
main().catch((err) => {
  console.error(String(err?.stack || err));
  process.exitCode = 1;
});

View File

@@ -0,0 +1,50 @@
# Deployment: DLOB depth worker reading the "drift" source; runs the
# shared worker script mounted from the dlob-depth-worker-script ConfigMap.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dlob-depth-worker-drift
  annotations:
    argocd.argoproj.io/sync-wave: "6"
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: dlob-depth-worker-drift
  template:
    metadata:
      labels:
        app.kubernetes.io/name: dlob-depth-worker-drift
    spec:
      containers:
        - name: worker
          image: node:20-slim
          imagePullPolicy: IfNotPresent
          env:
            - name: HASURA_GRAPHQL_URL
              value: http://hasura:8080/v1/graphql
            - name: HASURA_ADMIN_SECRET
              valueFrom:
                secretKeyRef:
                  name: trade-hasura
                  key: HASURA_GRAPHQL_ADMIN_SECRET
            - name: DLOB_SOURCE
              value: drift
            - name: DLOB_MARKETS
              value: SOL-PERP,DOGE-PERP,JUP-PERP
            - name: DLOB_POLL_MS
              value: "1000"
            - name: DLOB_DEPTH_BPS_BANDS
              value: "5,10,20,50,100,200"
            - name: PRICE_PRECISION
              value: "1000000"
            - name: BASE_PRECISION
              value: "1000000000"
          command: ["node", "/app/worker.mjs"]
          volumeMounts:
            - name: script
              mountPath: /app/worker.mjs
              subPath: worker.mjs
              readOnly: true
      volumes:
        - name: script
          configMap:
            name: dlob-depth-worker-script

View File

@@ -26,8 +26,10 @@ spec:
secretKeyRef:
name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET
- name: DLOB_SOURCE
value: mevnode
- name: DLOB_MARKETS
value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP
value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: DLOB_POLL_MS
value: "1000"
- name: DLOB_DEPTH_BPS_BANDS

View File

@@ -64,7 +64,8 @@ function resolveConfig() {
const hasuraAdminSecret = process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || undefined;
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
const dlobSource = String(process.env.DLOB_SOURCE || 'mevnode').trim() || 'mevnode';
const markets = envList('DLOB_MARKETS', 'SOL-PERP,DOGE-PERP,JUP-PERP');
const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
const bandsBps = envIntList('DLOB_DEPTH_BPS_BANDS', '5,10,20,50,100,200');
@@ -79,6 +80,7 @@ function resolveConfig() {
hasuraUrl,
hasuraAdminSecret,
hasuraAuthToken,
dlobSource,
markets,
pollMs,
bandsBps,
@@ -169,8 +171,9 @@ function computeBandDepth({ bids, asks, mid, bandBps }) {
async function fetchL2Latest(cfg) {
const query = `
query DlobL2Latest($markets: [String!]!) {
dlob_l2_latest(where: {market_name: {_in: $markets}}) {
query DlobL2Latest($source: String!, $markets: [String!]!) {
dlob_l2_latest(where: {source: {_eq: $source}, market_name: {_in: $markets}}) {
source
market_name
market_type
market_index
@@ -186,7 +189,7 @@ async function fetchL2Latest(cfg) {
}
}
`;
const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
const data = await graphqlRequest(cfg, query, { source: cfg.dlobSource, markets: cfg.markets });
return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
}
@@ -232,6 +235,7 @@ async function main() {
startedAt: getIsoNow(),
hasuraUrl: cfg.hasuraUrl,
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
dlobSource: cfg.dlobSource,
markets: cfg.markets,
pollMs: cfg.pollMs,
bandsBps: cfg.bandsBps,
@@ -268,6 +272,7 @@ async function main() {
for (const bandBps of cfg.bandsBps) {
const d = computeBandDepth({ bids, asks, mid, bandBps });
rows.push({
source: cfg.dlobSource,
market_name: market,
band_bps: bandBps,
market_type: l2.market_type ? String(l2.market_type) : 'perp',

View File

@@ -0,0 +1,50 @@
# Deployment: DLOB slippage worker reading the "drift" source; runs the
# shared worker script mounted from the dlob-slippage-worker-script ConfigMap.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dlob-slippage-worker-drift
  annotations:
    argocd.argoproj.io/sync-wave: "6"
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: dlob-slippage-worker-drift
  template:
    metadata:
      labels:
        app.kubernetes.io/name: dlob-slippage-worker-drift
    spec:
      containers:
        - name: worker
          image: node:20-slim
          imagePullPolicy: IfNotPresent
          env:
            - name: HASURA_GRAPHQL_URL
              value: http://hasura:8080/v1/graphql
            - name: HASURA_ADMIN_SECRET
              valueFrom:
                secretKeyRef:
                  name: trade-hasura
                  key: HASURA_GRAPHQL_ADMIN_SECRET
            - name: DLOB_SOURCE
              value: drift
            - name: DLOB_MARKETS
              value: SOL-PERP,DOGE-PERP,JUP-PERP
            - name: DLOB_POLL_MS
              value: "1000"
            - name: DLOB_SLIPPAGE_SIZES_USD
              value: "0.1,0.2,0.5,1,2,5,10,25,50,100,250,500,1000,5000,10000,50000"
            - name: PRICE_PRECISION
              value: "1000000"
            - name: BASE_PRECISION
              value: "1000000000"
          command: ["node", "/app/worker.mjs"]
          volumeMounts:
            - name: script
              mountPath: /app/worker.mjs
              subPath: worker.mjs
              readOnly: true
      volumes:
        - name: script
          configMap:
            name: dlob-slippage-worker-script

View File

@@ -26,12 +26,14 @@ spec:
secretKeyRef:
name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET
- name: DLOB_SOURCE
value: mevnode
- name: DLOB_MARKETS
value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP
value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: DLOB_POLL_MS
value: "1000"
- name: DLOB_SLIPPAGE_SIZES_USD
value: "10,25,50,100,250,500,1000,5000,10000,50000"
value: "0.1,0.2,0.5,1,2,5,10,25,50,100,250,500,1000,5000,10000,50000"
- name: PRICE_PRECISION
value: "1000000"
- name: BASE_PRECISION

View File

@@ -1,6 +1,16 @@
import fs from 'node:fs';
import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';
function readJsonFile(filePath) {
try {
const raw = fs.readFileSync(filePath, 'utf8');
return JSON.parse(raw);
} catch {
return undefined;
}
}
function getIsoNow() {
return new Date().toISOString();
}
@@ -19,14 +29,89 @@ function envList(name, fallbackCsv) {
.filter(Boolean);
}
function envIntList(name, fallbackCsv) {
const out = [];
for (const item of envList(name, fallbackCsv)) {
const n = Number.parseInt(item, 10);
if (!Number.isFinite(n)) continue;
out.push(n);
function parsePositiveNumber(value) {
const n = Number.parseFloat(String(value ?? '').trim());
if (!Number.isFinite(n) || !(n > 0)) return null;
return n;
}
return out.length ? out : envList(name, fallbackCsv).map((v) => Number.parseInt(v, 10)).filter(Number.isFinite);
function resolveConfig() {
const tokensPath =
process.env.HASURA_TOKENS_FILE ||
process.env.TOKENS_FILE ||
process.env.HASURA_CONFIG_FILE ||
'/app/tokens/hasura.json';
const tokens = readJsonFile(tokensPath) || {};
const hasuraUrl =
process.env.HASURA_GRAPHQL_URL ||
tokens.graphqlUrl ||
tokens.apiUrl ||
'http://hasura:8080/v1/graphql';
const hasuraAdminSecret =
process.env.HASURA_ADMIN_SECRET ||
process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
tokens.adminSecret ||
tokens.hasuraAdminSecret;
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
const dlobSource = String(process.env.DLOB_SOURCE || 'mevnode').trim() || 'mevnode';
const markets = envList('DLOB_MARKETS', 'SOL-PERP,DOGE-PERP,JUP-PERP');
const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
const sizesUsd = envList('DLOB_SLIPPAGE_SIZES_USD', '10,25,50,100,250,500,1000')
.map(parsePositiveNumber)
.filter((n) => n != null)
.map((n) => n)
.filter((n, idx, arr) => arr.findIndex((x) => x === n) === idx)
.sort((a, b) => a - b);
const sizesUsdInt = sizesUsd.filter((n) => Number.isInteger(n));
const depthLevels = clampInt(process.env.DLOB_DEPTH, 1, 50, 25);
const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
if (!Number.isFinite(basePrecision) || basePrecision <= 0) throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
return {
hasuraUrl,
hasuraAdminSecret,
hasuraAuthToken,
dlobSource,
markets,
pollMs,
sizesUsd,
sizesUsdInt,
depthLevels,
pricePrecision,
basePrecision,
};
}
async function graphqlRequest(cfg, query, variables) {
const headers = { 'content-type': 'application/json' };
if (cfg.hasuraAuthToken) {
headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
} else if (cfg.hasuraAdminSecret) {
headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
} else {
throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)');
}
const res = await fetch(cfg.hasuraUrl, {
method: 'POST',
headers,
body: JSON.stringify({ query, variables }),
signal: AbortSignal.timeout(15_000),
});
const text = await res.text();
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
const json = JSON.parse(text);
if (json.errors?.length) {
throw new Error(json.errors.map((e) => e.message).join(' | '));
}
return json.data;
}
function toNumberOrNull(value) {
@@ -41,177 +126,117 @@ function toNumberOrNull(value) {
return null;
}
function numStr(value) {
if (value == null) return null;
if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null;
if (typeof value === 'string') return value.trim() || null;
return null;
}
function jsonNormalize(value) {
if (typeof value !== 'string') return value;
const s = value.trim();
if (!s) return null;
function normalizeLevels(raw) {
if (raw == null) return [];
if (Array.isArray(raw)) return raw;
if (typeof raw === 'string') {
const s = raw.trim();
if (!s) return [];
try {
return JSON.parse(s);
const v = JSON.parse(s);
return Array.isArray(v) ? v : [];
} catch {
return value;
return [];
}
}
function resolveConfig() {
const hasuraUrl = process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql';
const hasuraAdminSecret = process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || undefined;
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
const sizesUsd = envIntList('DLOB_SLIPPAGE_SIZES_USD', '100,500,1000,5000,10000,50000');
const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
if (!Number.isFinite(pricePrecision) || pricePrecision <= 0)
throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
if (!Number.isFinite(basePrecision) || basePrecision <= 0)
throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
return {
hasuraUrl,
hasuraAdminSecret,
hasuraAuthToken,
markets,
pollMs,
sizesUsd,
pricePrecision,
basePrecision,
};
return [];
}
/**
 * POST one GraphQL operation to Hasura and return its `data` payload.
 *
 * Auth precedence: bearer token first, then admin secret; throws when neither
 * is configured. Throws on non-2xx responses and on GraphQL-level errors
 * (messages joined with ' | '). Request is aborted after 10s.
 */
async function graphqlRequest(cfg, query, variables) {
  const headers = { 'content-type': 'application/json' };
  if (cfg.hasuraAuthToken) headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
  else if (cfg.hasuraAdminSecret) headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
  else throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET)');

  const response = await fetch(cfg.hasuraUrl, {
    method: 'POST',
    headers,
    body: JSON.stringify({ query, variables }),
    signal: AbortSignal.timeout(10_000),
  });
  const bodyText = await response.text();
  if (!response.ok) throw new Error(`Hasura HTTP ${response.status}: ${bodyText}`);
  const payload = JSON.parse(bodyText);
  if (payload.errors?.length) {
    const combined = payload.errors.map((e) => e.message).join(' | ');
    throw new Error(combined);
  }
  return payload.data;
}
/**
 * Decode raw integer-scaled L2 levels into float { price, size } entries.
 *
 * @param raw - levels payload (array or JSON string; normalized by jsonNormalize)
 * @param pricePrecision - divisor converting integer prices to floats
 * @param basePrecision - divisor converting integer sizes to base units
 * @param side - 'bid' sorts best-first (descending price), 'ask' ascending;
 *               any other value keeps input order
 * @returns array of { price, size }; entries with missing or non-finite values are dropped
 *
 * Fix: removed interleaved merged-diff residue (a nested `parseScaledLevels`
 * declaration and a second loop referencing out-of-scope variables) that made
 * the original span syntactically invalid.
 */
function parseLevels(raw, pricePrecision, basePrecision, side) {
  const v = jsonNormalize(raw);
  if (!Array.isArray(v)) return [];
  const out = [];
  for (const item of v) {
    const priceInt = toNumberOrNull(item?.price);
    const sizeInt = toNumberOrNull(item?.size);
    if (priceInt == null || sizeInt == null) continue;
    const price = priceInt / pricePrecision;
    const size = sizeInt / basePrecision;
    if (!Number.isFinite(price) || !Number.isFinite(size)) continue;
    out.push({ price, size });
  }
  // Best level first: highest bid, lowest ask.
  if (side === 'bid') out.sort((a, b) => b.price - a.price);
  if (side === 'ask') out.sort((a, b) => a.price - b.price);
  return out;
}
/**
 * Reference mid price for impact calculations.
 * Prefers the bid/ask midpoint; falls back to mark price, then oracle price;
 * returns null when none are available.
 */
function computeMid(bestBid, bestAsk, markPrice, oraclePrice) {
  const haveBothSides = bestBid != null && bestAsk != null;
  if (haveBothSides) return (bestBid + bestAsk) / 2;
  // `??` mirrors the original `!= null` checks (0 is a valid price).
  return markPrice ?? oraclePrice ?? null;
}
/**
 * Simulate a market buy of `sizeUsd` USD against sorted asks (best/lowest first).
 *
 * @param asks - array of { price, size } levels, ascending by price
 * @param mid - reference mid price used for impact; may be null
 * @param sizeUsd - target notional in USD
 * @returns { vwap, worstPrice, filledUsd, filledBase, impactBps, levelsConsumed, fillPct }
 *          vwap/impactBps/fillPct are null when nothing filled (or mid unusable);
 *          fillPct is a 0..1 fraction of the requested notional.
 *
 * Fix: the original span was merged-diff residue (an interleaved `simulateFill`
 * with `l` referenced outside its loop and a return inside the loop); this is
 * the reconstructed coherent implementation.
 */
function simulateBuy(asks, mid, sizeUsd) {
  let remainingUsd = sizeUsd;
  let filledUsd = 0;
  let filledBase = 0;
  let worstPrice = null;
  let levelsConsumed = 0;
  for (const lvl of asks) {
    if (!(remainingUsd > 0)) break;
    // Skip degenerate levels; negated comparisons also reject NaN.
    if (!(lvl.price > 0) || !(lvl.size > 0)) continue;
    const maxBase = remainingUsd / lvl.price;
    const takeBase = Math.min(lvl.size, maxBase);
    if (!(takeBase > 0)) continue;
    const cost = takeBase * lvl.price;
    filledUsd += cost;
    filledBase += takeBase;
    remainingUsd -= cost;
    worstPrice = lvl.price;
    levelsConsumed += 1;
  }
  const vwap = filledBase > 0 ? filledUsd / filledBase : null;
  // Buys pay above mid, so positive bps means worse execution.
  const impactBps = vwap != null && mid != null && mid > 0 ? (vwap / mid - 1) * 10_000 : null;
  const fillPct = sizeUsd > 0 ? filledUsd / sizeUsd : null;
  return { vwap, worstPrice, filledUsd, filledBase, impactBps, levelsConsumed, fillPct };
}
/**
 * Simulate a market sell of `sizeUsd` USD-equivalent against sorted bids
 * (best/highest first). The base quantity target is sizeUsd / mid, so a usable
 * mid is required.
 *
 * @param bids - array of { price, size } levels, descending by price
 * @param mid - reference mid price; when null/non-positive, returns an empty fill
 * @param sizeUsd - target notional in USD (converted to base via mid)
 * @returns { vwap, worstPrice, filledUsd, filledBase, impactBps, levelsConsumed, fillPct }
 *          filledUsd is sale proceeds; fillPct is the 0..1 fraction of the base target filled.
 *
 * Fix: the original span was merged-diff residue (references to undefined
 * `remainingUsd`/`takeUsd`/`filledUsd`, a duplicate `const fillPct`, and two
 * return statements); this is the reconstructed coherent implementation.
 */
function simulateSell(bids, mid, sizeUsd) {
  if (mid == null || !(mid > 0)) {
    return { vwap: null, worstPrice: null, filledUsd: 0, filledBase: 0, impactBps: null, levelsConsumed: 0, fillPct: null };
  }
  const baseTarget = sizeUsd / mid;
  let remainingBase = baseTarget;
  let proceedsUsd = 0;
  let filledBase = 0;
  let worstPrice = null;
  let levelsConsumed = 0;
  for (const lvl of bids) {
    if (!(remainingBase > 0)) break;
    // Skip degenerate levels; negated comparisons also reject NaN.
    if (!(lvl.price > 0) || !(lvl.size > 0)) continue;
    const takeBase = Math.min(lvl.size, remainingBase);
    if (!(takeBase > 0)) continue;
    const proceeds = takeBase * lvl.price;
    proceedsUsd += proceeds;
    filledBase += takeBase;
    remainingBase -= takeBase;
    worstPrice = lvl.price;
    levelsConsumed += 1;
  }
  const vwap = filledBase > 0 ? proceedsUsd / filledBase : null;
  // Sells execute below mid, so positive bps means worse execution.
  const impactBps = vwap != null && mid > 0 ? (1 - vwap / mid) * 10_000 : null;
  const fillPct = baseTarget > 0 ? filledBase / baseTarget : null;
  return { vwap, worstPrice, filledUsd: proceedsUsd, filledBase, impactBps, levelsConsumed, fillPct };
}
async function fetchL2Latest(cfg) {
/**
 * Price impact in basis points relative to mid; positive = worse execution.
 * Returns null when mid/vwap are missing, mid is non-positive, or side is unknown.
 */
function impactBps({ side, mid, vwap }) {
  const unusable = mid == null || vwap == null || mid <= 0;
  if (unusable) return null;
  switch (side) {
    case 'buy':
      return (vwap / mid - 1) * 10_000;
    case 'sell':
      return (1 - vwap / mid) * 10_000;
    default:
      return null;
  }
}
async function main() {
const cfg = resolveConfig();
console.log(
JSON.stringify(
{
service: 'dlob-slippage-worker',
startedAt: getIsoNow(),
hasuraUrl: cfg.hasuraUrl,
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
dlobSource: cfg.dlobSource,
markets: cfg.markets,
pollMs: cfg.pollMs,
sizesUsd: cfg.sizesUsd,
depthLevels: cfg.depthLevels,
},
null,
2
)
);
const lastSeenUpdatedAt = new Map(); // market -> updated_at
while (true) {
const updatedAt = getIsoNow();
try {
const query = `
query DlobL2Latest($markets: [String!]!) {
dlob_l2_latest(where: {market_name: {_in: $markets}}) {
query DlobL2Latest($source: String!, $markets: [String!]!) {
dlob_l2_latest(where: { source: { _eq: $source }, market_name: { _in: $markets } }) {
source
market_name
market_type
market_index
ts
slot
mark_price
oracle_price
best_bid_price
best_ask_price
bids
@@ -220,14 +245,95 @@ async function fetchL2Latest(cfg) {
}
}
`;
const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
const data = await graphqlRequest(cfg, query, { source: cfg.dlobSource, markets: cfg.markets });
const rows = Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
const objectsV1 = [];
const objectsV2 = [];
for (const row of rows) {
const market = String(row?.market_name || '').trim();
if (!market) continue;
const rowUpdatedAt = row?.updated_at ?? null;
if (rowUpdatedAt && lastSeenUpdatedAt.get(market) === rowUpdatedAt) continue;
if (rowUpdatedAt) lastSeenUpdatedAt.set(market, rowUpdatedAt);
const bestBid = toNumberOrNull(row?.best_bid_price);
const bestAsk = toNumberOrNull(row?.best_ask_price);
if (bestBid == null || bestAsk == null) continue;
const mid = (bestBid + bestAsk) / 2;
if (!Number.isFinite(mid) || mid <= 0) continue;
const bids = parseScaledLevels(row?.bids, cfg.pricePrecision, cfg.basePrecision)
.slice()
.sort((a, b) => b.price - a.price)
.slice(0, cfg.depthLevels);
const asks = parseScaledLevels(row?.asks, cfg.pricePrecision, cfg.basePrecision)
.slice()
.sort((a, b) => a.price - b.price)
.slice(0, cfg.depthLevels);
for (const sizeUsd of cfg.sizesUsd) {
// buy consumes asks (worse prices as you go up)
{
const sim = simulateFill(asks, sizeUsd);
const baseObj = {
source: cfg.dlobSource,
market_name: market,
side: 'buy',
market_type: row?.market_type ?? 'perp',
market_index: row?.market_index ?? null,
ts: row?.ts == null ? null : String(row.ts),
slot: row?.slot == null ? null : String(row.slot),
mid_price: String(mid),
vwap_price: sim.vwapPrice == null ? null : String(sim.vwapPrice),
worst_price: sim.worstPrice == null ? null : String(sim.worstPrice),
filled_usd: String(sim.filledUsd),
filled_base: String(sim.filledBase),
impact_bps: impactBps({ side: 'buy', mid, vwap: sim.vwapPrice }),
levels_consumed: sim.levelsConsumed,
fill_pct: sim.fillPct == null ? null : String(sim.fillPct),
raw: { depthLevels: cfg.depthLevels },
updated_at: updatedAt,
};
objectsV2.push({ ...baseObj, size_usd: String(sizeUsd) });
if (Number.isInteger(sizeUsd)) objectsV1.push({ ...baseObj, size_usd: Math.trunc(sizeUsd) });
}
async function upsertSlippage(cfg, rows) {
if (!rows.length) return;
// sell consumes bids (worse prices as you go down)
{
const sim = simulateFill(bids, sizeUsd);
const baseObj = {
source: cfg.dlobSource,
market_name: market,
side: 'sell',
market_type: row?.market_type ?? 'perp',
market_index: row?.market_index ?? null,
ts: row?.ts == null ? null : String(row.ts),
slot: row?.slot == null ? null : String(row.slot),
mid_price: String(mid),
vwap_price: sim.vwapPrice == null ? null : String(sim.vwapPrice),
worst_price: sim.worstPrice == null ? null : String(sim.worstPrice),
filled_usd: String(sim.filledUsd),
filled_base: String(sim.filledBase),
impact_bps: impactBps({ side: 'sell', mid, vwap: sim.vwapPrice }),
levels_consumed: sim.levelsConsumed,
fill_pct: sim.fillPct == null ? null : String(sim.fillPct),
raw: { depthLevels: cfg.depthLevels },
updated_at: updatedAt,
};
objectsV2.push({ ...baseObj, size_usd: String(sizeUsd) });
if (Number.isInteger(sizeUsd)) objectsV1.push({ ...baseObj, size_usd: Math.trunc(sizeUsd) });
}
}
}
if (objectsV1.length) {
const mutation = `
mutation UpsertDlobSlippage($rows: [dlob_slippage_latest_insert_input!]!) {
mutation UpsertSlippageV1($rows: [dlob_slippage_latest_insert_input!]!) {
insert_dlob_slippage_latest(
objects: $rows
on_conflict: {
@@ -238,8 +344,6 @@ async function upsertSlippage(cfg, rows) {
ts
slot
mid_price
best_bid_price
best_ask_price
vwap_price
worst_price
filled_usd
@@ -254,119 +358,40 @@ async function upsertSlippage(cfg, rows) {
) { affected_rows }
}
`;
await graphqlRequest(cfg, mutation, { rows });
await graphqlRequest(cfg, mutation, { rows: objectsV1 });
}
async function main() {
const cfg = resolveConfig();
const lastUpdatedAtByMarket = new Map();
console.log(
JSON.stringify(
{
service: 'dlob-slippage-worker',
startedAt: getIsoNow(),
hasuraUrl: cfg.hasuraUrl,
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
markets: cfg.markets,
pollMs: cfg.pollMs,
sizesUsd: cfg.sizesUsd,
pricePrecision: cfg.pricePrecision,
basePrecision: cfg.basePrecision,
},
null,
2
)
);
while (true) {
const rows = [];
try {
const l2Rows = await fetchL2Latest(cfg);
for (const l2 of l2Rows) {
const market = String(l2.market_name || '').trim();
if (!market) continue;
const updatedAt = l2.updated_at || null;
if (updatedAt && lastUpdatedAtByMarket.get(market) === updatedAt) continue;
if (updatedAt) lastUpdatedAtByMarket.set(market, updatedAt);
const bestBid = toNumberOrNull(l2.best_bid_price);
const bestAsk = toNumberOrNull(l2.best_ask_price);
const markPrice = toNumberOrNull(l2.mark_price);
const oraclePrice = toNumberOrNull(l2.oracle_price);
const mid = computeMid(bestBid, bestAsk, markPrice, oraclePrice);
const bids = parseLevels(l2.bids, cfg.pricePrecision, cfg.basePrecision, 'bid');
const asks = parseLevels(l2.asks, cfg.pricePrecision, cfg.basePrecision, 'ask');
for (const sizeUsd of cfg.sizesUsd) {
const buy = simulateBuy(asks, mid, sizeUsd);
rows.push({
market_name: market,
side: 'buy',
size_usd: sizeUsd,
market_type: l2.market_type ? String(l2.market_type) : 'perp',
market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
ts: l2.ts == null ? null : String(l2.ts),
slot: l2.slot == null ? null : String(l2.slot),
mid_price: numStr(mid),
best_bid_price: numStr(bestBid),
best_ask_price: numStr(bestAsk),
vwap_price: numStr(buy.vwap),
worst_price: numStr(buy.worstPrice),
filled_usd: numStr(buy.filledUsd),
filled_base: numStr(buy.filledBase),
impact_bps: numStr(buy.impactBps),
levels_consumed: buy.levelsConsumed,
fill_pct: numStr(buy.fillPct),
raw: {
ref: 'mid',
units: 'usd',
pricePrecision: cfg.pricePrecision,
basePrecision: cfg.basePrecision,
},
updated_at: updatedAt,
});
const sell = simulateSell(bids, mid, sizeUsd);
rows.push({
market_name: market,
side: 'sell',
size_usd: sizeUsd,
market_type: l2.market_type ? String(l2.market_type) : 'perp',
market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
ts: l2.ts == null ? null : String(l2.ts),
slot: l2.slot == null ? null : String(l2.slot),
mid_price: numStr(mid),
best_bid_price: numStr(bestBid),
best_ask_price: numStr(bestAsk),
vwap_price: numStr(sell.vwap),
worst_price: numStr(sell.worstPrice),
filled_usd: numStr(sell.filledUsd),
filled_base: numStr(sell.filledBase),
impact_bps: numStr(sell.impactBps),
levels_consumed: sell.levelsConsumed,
fill_pct: numStr(sell.fillPct),
raw: {
ref: 'mid',
units: 'usd',
pricePrecision: cfg.pricePrecision,
basePrecision: cfg.basePrecision,
},
updated_at: updatedAt,
});
if (objectsV2.length) {
const mutation = `
mutation UpsertSlippageV2($rows: [dlob_slippage_latest_v2_insert_input!]!) {
insert_dlob_slippage_latest_v2(
objects: $rows
on_conflict: {
constraint: dlob_slippage_latest_v2_pkey
update_columns: [
market_type
market_index
ts
slot
mid_price
vwap_price
worst_price
filled_usd
filled_base
impact_bps
levels_consumed
fill_pct
raw
updated_at
]
}
) { affected_rows }
}
`;
await graphqlRequest(cfg, mutation, { rows: objectsV2 });
}
} catch (err) {
console.error(`[dlob-slippage-worker] fetch/compute: ${String(err?.message || err)}`);
}
try {
await upsertSlippage(cfg, rows);
} catch (err) {
console.error(`[dlob-slippage-worker] upsert: ${String(err?.message || err)}`);
console.error(`[dlob-slippage-worker] ${String(err?.message || err)}`);
}
await sleep(cfg.pollMs);

View File

@@ -0,0 +1,44 @@
# Deployment: dlob-ts-archiver-drift
# Runs the time-series archiver worker (worker.mjs, mounted from the
# dlob-ts-archiver-script ConfigMap) against the "drift" DLOB source.
# Sync wave 6 so it starts after the DLOB worker/publisher resources.
apiVersion: apps/v1
kind: Deployment
metadata:
name: dlob-ts-archiver-drift
annotations:
argocd.argoproj.io/sync-wave: "6"
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: dlob-ts-archiver-drift
template:
metadata:
labels:
app.kubernetes.io/name: dlob-ts-archiver-drift
spec:
containers:
- name: worker
image: node:20-slim
imagePullPolicy: IfNotPresent
env:
# Hasura endpoint + admin secret used by the worker's GraphQL mutations.
- name: HASURA_GRAPHQL_URL
value: http://hasura:8080/v1/graphql
- name: HASURA_ADMIN_SECRET
valueFrom:
secretKeyRef:
name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET
# Which DLOB source rows to archive and how often to poll (ms).
- name: DLOB_SOURCE
value: drift
- name: DLOB_MARKETS
value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: DLOB_TS_POLL_MS
value: "1000"
command: ["node", "/app/worker.mjs"]
volumeMounts:
# Worker script comes from a ConfigMap; mounted read-only as a single file.
- name: script
mountPath: /app/worker.mjs
subPath: worker.mjs
readOnly: true
volumes:
- name: script
configMap:
name: dlob-ts-archiver-script

View File

@@ -26,8 +26,10 @@ spec:
secretKeyRef:
name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET
- name: DLOB_SOURCE
value: mevnode
- name: DLOB_MARKETS
value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP
value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: DLOB_TS_POLL_MS
value: "1000"
command: ["node", "/app/worker.mjs"]

View File

@@ -1,6 +1,16 @@
import fs from 'node:fs';
import process from 'node:process';
import { setTimeout as sleep } from 'node:timers/promises';
/**
 * Read and parse a JSON file synchronously.
 * Returns `undefined` on any failure (missing/unreadable file or invalid JSON)
 * instead of throwing, so callers can treat the file as optional.
 */
function readJsonFile(filePath) {
  try {
    return JSON.parse(fs.readFileSync(filePath, 'utf8'));
  } catch {
    return undefined;
  }
}
/** Current time as an ISO-8601 UTC string (e.g. "2026-02-15T00:40:50.000Z"). */
function getIsoNow() {
  const now = new Date();
  return now.toISOString();
}
@@ -20,30 +30,54 @@ function envList(name, fallbackCsv) {
}
/**
 * Build the ts-archiver configuration from environment variables, with an
 * optional mounted tokens file (/app/tokens/hasura.json) as fallback for the
 * Hasura URL and admin secret. Env vars always take precedence.
 *
 * Returns { hasuraUrl, hasuraAdminSecret, hasuraAuthToken, dlobSource, markets, pollMs }.
 * Fix: the original span was merged-diff residue with duplicate `const hasuraUrl`
 * and `const hasuraAdminSecret` declarations (a SyntaxError) and two returns;
 * this is the reconstructed post-change implementation.
 */
function resolveConfig() {
  // Optional credentials file mounted into the pod; ignored when unreadable.
  const tokensPath =
    process.env.HASURA_TOKENS_FILE ||
    process.env.TOKENS_FILE ||
    process.env.HASURA_CONFIG_FILE ||
    '/app/tokens/hasura.json';
  const tokens = readJsonFile(tokensPath) || {};
  const hasuraUrl =
    process.env.HASURA_GRAPHQL_URL ||
    tokens.graphqlUrl ||
    tokens.apiUrl ||
    'http://hasura:8080/v1/graphql';
  const hasuraAdminSecret =
    process.env.HASURA_ADMIN_SECRET ||
    process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
    tokens.adminSecret ||
    tokens.hasuraAdminSecret;
  const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
  // Which DLOB source's rows this archiver reads; defaults to the in-cluster mevnode feed.
  const dlobSource = String(process.env.DLOB_SOURCE || 'mevnode').trim() || 'mevnode';
  const markets = envList('DLOB_MARKETS', 'SOL-PERP,DOGE-PERP,JUP-PERP');
  const pollMs = clampInt(process.env.DLOB_TS_POLL_MS, 500, 60_000, 1000);
  return { hasuraUrl, hasuraAdminSecret, hasuraAuthToken, dlobSource, markets, pollMs };
}
/**
 * POST one GraphQL operation to Hasura and return its `data` payload.
 *
 * Auth precedence: bearer token, then admin secret; throws when neither is
 * configured. Throws on non-2xx responses and on GraphQL-level errors
 * (messages joined with ' | '). Request is aborted after 15s.
 * Fix: removed merged-diff residue — the fetch options carried both an inline
 * `headers: {...}` literal and the `headers` shorthand (duplicate key; the
 * literal silently lost), and the `json.errors` check appeared twice.
 */
async function graphqlRequest(cfg, query, variables) {
  const headers = { 'content-type': 'application/json' };
  if (cfg.hasuraAuthToken) {
    headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
  } else if (cfg.hasuraAdminSecret) {
    headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
  } else {
    throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)');
  }
  const res = await fetch(cfg.hasuraUrl, {
    method: 'POST',
    headers,
    body: JSON.stringify({ query, variables }),
    signal: AbortSignal.timeout(15_000),
  });
  const text = await res.text();
  if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
  const json = JSON.parse(text);
  if (json.errors?.length) {
    throw new Error(json.errors.map((e) => e.message).join(' | '));
  }
  return json.data;
}
@@ -63,6 +97,8 @@ async function main() {
service: 'dlob-ts-archiver',
startedAt: getIsoNow(),
hasuraUrl: cfg.hasuraUrl,
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
dlobSource: cfg.dlobSource,
markets: cfg.markets,
pollMs: cfg.pollMs,
},
@@ -76,19 +112,24 @@ async function main() {
try {
const query = `
query Latest($markets: [String!]!) {
dlob_stats_latest(where: { market_name: { _in: $markets } }) {
query Latest($source: String!, $markets: [String!]!) {
dlob_stats_latest(where: { source: { _eq: $source }, market_name: { _in: $markets } }) {
market_name market_type market_index ts slot
mark_price oracle_price best_bid_price best_ask_price mid_price
spread_abs spread_bps depth_levels depth_bid_base depth_ask_base depth_bid_usd depth_ask_usd imbalance
raw
}
dlob_depth_bps_latest(where: { market_name: { _in: $markets } }) {
dlob_depth_bps_latest(where: { source: { _eq: $source }, market_name: { _in: $markets } }) {
market_name band_bps market_type market_index ts slot
mid_price best_bid_price best_ask_price bid_base ask_base bid_usd ask_usd imbalance
raw
}
dlob_slippage_latest(where: { market_name: { _in: $markets } }) {
dlob_slippage_latest(where: { source: { _eq: $source }, market_name: { _in: $markets } }) {
market_name side size_usd market_type market_index ts slot
mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct
raw
}
dlob_slippage_latest_v2(where: { source: { _eq: $source }, market_name: { _in: $markets } }) {
market_name side size_usd market_type market_index ts slot
mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct
raw
@@ -96,10 +137,11 @@ async function main() {
}
`;
const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
const data = await graphqlRequest(cfg, query, { source: cfg.dlobSource, markets: cfg.markets });
const statsRows = (data?.dlob_stats_latest || []).map((r) => ({
ts: now,
source: cfg.dlobSource,
market_name: r.market_name,
market_type: r.market_type,
market_index: r.market_index ?? null,
@@ -123,6 +165,7 @@ async function main() {
const depthRows = (data?.dlob_depth_bps_latest || []).map((r) => ({
ts: now,
source: cfg.dlobSource,
market_name: r.market_name,
band_bps: r.band_bps,
market_type: r.market_type,
@@ -142,6 +185,7 @@ async function main() {
const slippageRows = (data?.dlob_slippage_latest || []).map((r) => ({
ts: now,
source: cfg.dlobSource,
market_name: r.market_name,
side: r.side,
size_usd: r.size_usd,
@@ -160,28 +204,42 @@ async function main() {
raw: r.raw ?? null,
}));
if (!statsRows.length && !depthRows.length && !slippageRows.length) {
await sleep(cfg.pollMs);
continue;
}
const slippageRowsV2 = (data?.dlob_slippage_latest_v2 || []).map((r) => ({
ts: now,
source: cfg.dlobSource,
market_name: r.market_name,
side: r.side,
size_usd: r.size_usd,
market_type: r.market_type,
market_index: r.market_index ?? null,
source_ts: mapBigint(r.ts),
slot: mapBigint(r.slot),
mid_price: r.mid_price ?? null,
vwap_price: r.vwap_price ?? null,
worst_price: r.worst_price ?? null,
filled_usd: r.filled_usd ?? null,
filled_base: r.filled_base ?? null,
impact_bps: r.impact_bps ?? null,
levels_consumed: r.levels_consumed ?? null,
fill_pct: r.fill_pct ?? null,
raw: r.raw ?? null,
}));
const mutation = `
mutation InsertTs(
$stats: [dlob_stats_ts_insert_input!]!
$depth: [dlob_depth_bps_ts_insert_input!]!
$slip: [dlob_slippage_ts_insert_input!]!
$slipV2: [dlob_slippage_ts_v2_insert_input!]!
) {
insert_dlob_stats_ts(objects: $stats) { affected_rows }
insert_dlob_depth_bps_ts(objects: $depth) { affected_rows }
insert_dlob_slippage_ts(objects: $slip) { affected_rows }
insert_dlob_slippage_ts_v2(objects: $slipV2) { affected_rows }
}
`;
await graphqlRequest(cfg, mutation, {
stats: statsRows,
depth: depthRows,
slip: slippageRows,
});
await graphqlRequest(cfg, mutation, { stats: statsRows, depth: depthRows, slip: slippageRows, slipV2: slippageRowsV2 });
} catch (err) {
console.error(`[dlob-ts-archiver] ${String(err?.message || err)}`);
}
@@ -194,4 +252,3 @@ main().catch((err) => {
console.error(String(err?.stack || err));
process.exitCode = 1;
});

View File

@@ -0,0 +1,52 @@
# Deployment: dlob-worker-drift
# Polls the public Drift DLOB HTTP API and writes L2/stats rows into Hasura
# under source=drift. Sync wave 5 so it starts before the ts archiver (wave 6).
apiVersion: apps/v1
kind: Deployment
metadata:
name: dlob-worker-drift
annotations:
argocd.argoproj.io/sync-wave: "5"
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: dlob-worker-drift
template:
metadata:
labels:
app.kubernetes.io/name: dlob-worker-drift
spec:
# Host networking + host DNS fallback so the pod can reach the external
# dlob.drift.trade endpoint (including over IPv6, see DLOB_FORCE_IPV6).
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
containers:
- name: worker
image: node:20-slim
imagePullPolicy: IfNotPresent
env:
# Hasura endpoint + admin secret used by the worker's GraphQL mutations.
- name: HASURA_GRAPHQL_URL
value: http://hasura:8080/v1/graphql
- name: HASURA_ADMIN_SECRET
valueFrom:
secretKeyRef:
name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET
# Source label written into Hasura rows; distinguishes this feed from mevnode.
- name: DLOB_SOURCE
value: drift
- name: DLOB_HTTP_URL
value: https://dlob.drift.trade
- name: DLOB_FORCE_IPV6
value: "true"
- name: DLOB_MARKETS
value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: DLOB_POLL_MS
value: "500"
- name: DLOB_DEPTH
value: "10"
command: ["node", "/app/worker.mjs"]
volumeMounts:
# Worker script comes from a ConfigMap; mounted read-only as a single file.
- name: script
mountPath: /app/worker.mjs
subPath: worker.mjs
readOnly: true
volumes:
- name: script
configMap:
name: dlob-worker-script

View File

@@ -26,10 +26,12 @@ spec:
secretKeyRef:
name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET
- name: DLOB_SOURCE
value: mevnode
- name: DLOB_HTTP_URL
value: http://dlob-server:6969
- name: DLOB_MARKETS
value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP
value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: DLOB_POLL_MS
value: "500"
- name: DLOB_DEPTH

View File

@@ -64,8 +64,9 @@ function resolveConfig() {
.trim()
.replace(/\/$/, '');
const dlobForceIpv6 = envBool('DLOB_FORCE_IPV6', false);
const dlobSource = String(process.env.DLOB_SOURCE || 'mevnode').trim() || 'mevnode';
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
const markets = envList('DLOB_MARKETS', 'SOL-PERP,DOGE-PERP,JUP-PERP');
const depth = clampInt(process.env.DLOB_DEPTH, 1, 50, 10);
const pollMs = clampInt(process.env.DLOB_POLL_MS, 100, 10_000, 500);
@@ -80,6 +81,7 @@ function resolveConfig() {
hasuraUrl,
hasuraAdminSecret,
hasuraAuthToken,
dlobSource,
dlobHttpBase,
dlobForceIpv6,
markets,
@@ -238,8 +240,9 @@ function computeStats({ l2, depth, pricePrecision, basePrecision }) {
};
}
function l2ToInsertObject({ l2, updatedAt, pricePrecision }) {
function l2ToInsertObject({ dlobSource, l2, updatedAt, pricePrecision }) {
return {
source: dlobSource,
market_name: String(l2.marketName),
market_type: String(l2.marketType || 'perp'),
market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null,
@@ -256,8 +259,9 @@ function l2ToInsertObject({ l2, updatedAt, pricePrecision }) {
};
}
function statsToInsertObject({ l2, stats, updatedAt }) {
function statsToInsertObject({ dlobSource, l2, stats, updatedAt }) {
return {
source: dlobSource,
market_name: String(l2.marketName),
market_type: String(l2.marketType || 'perp'),
market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null,
@@ -371,6 +375,7 @@ async function main() {
startedAt: getIsoNow(),
hasuraUrl: cfg.hasuraUrl,
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
dlobSource: cfg.dlobSource,
dlobHttpBase: cfg.dlobHttpBase,
dlobForceIpv6: cfg.dlobForceIpv6,
markets: cfg.markets,
@@ -410,8 +415,8 @@ async function main() {
basePrecision: cfg.basePrecision,
});
l2Objects.push(l2ToInsertObject({ l2, updatedAt, pricePrecision: cfg.pricePrecision }));
statsObjects.push(statsToInsertObject({ l2, stats, updatedAt }));
l2Objects.push(l2ToInsertObject({ dlobSource: cfg.dlobSource, l2, updatedAt, pricePrecision: cfg.pricePrecision }));
statsObjects.push(statsToInsertObject({ dlobSource: cfg.dlobSource, l2, stats, updatedAt }));
}
try {

View File

@@ -43,7 +43,7 @@ spec:
- name: REDIS_CLIENT
value: DLOB
- name: PERP_MARKETS_TO_LOAD
value: "0,1,2,4,75"
value: "0,7,24"
- name: ENDPOINT
valueFrom:
secretKeyRef:
@@ -59,11 +59,15 @@ spec:
httpGet:
path: /startup
port: http
initialDelaySeconds: 10
initialDelaySeconds: 120
periodSeconds: 10
timeoutSeconds: 3
failureThreshold: 30
livenessProbe:
httpGet:
path: /health
port: http
initialDelaySeconds: 30
initialDelaySeconds: 240
periodSeconds: 20
timeoutSeconds: 3
failureThreshold: 10

View File

@@ -16,7 +16,7 @@ spec:
- name: gitea-registry
containers:
- name: frontend
image: gitea.mpabi.pl/trade/trade-frontend:sha-ob-20260111203413
image: gitea.mpabi.pl/trade/trade-frontend:sha-b06fe7f
imagePullPolicy: IfNotPresent
ports:
- name: http

View File

@@ -89,6 +89,8 @@ async function main() {
console.log(`[hasura-bootstrap] HASURA_URL=${HASURA_URL}`);
await waitForHasura();
const PUBLIC_DLOB_SOURCE_HEADER = 'X-Hasura-Dlob-Source';
const apiTokensTable = { schema: 'public', name: 'api_tokens' };
const source = 'default';
@@ -97,9 +99,12 @@ async function main() {
const dlobStatsLatestTable = { schema: 'public', name: 'dlob_stats_latest' };
const dlobDepthBpsLatestTable = { schema: 'public', name: 'dlob_depth_bps_latest' };
const dlobSlippageLatestTable = { schema: 'public', name: 'dlob_slippage_latest' };
const dlobSlippageLatestV2Table = { schema: 'public', name: 'dlob_slippage_latest_v2' };
const candlesCacheTable = { schema: 'public', name: 'drift_candles_cache' };
const dlobStatsTsTable = { schema: 'public', name: 'dlob_stats_ts' };
const dlobDepthBpsTsTable = { schema: 'public', name: 'dlob_depth_bps_ts' };
const dlobSlippageTsTable = { schema: 'public', name: 'dlob_slippage_ts' };
const dlobSlippageTsV2Table = { schema: 'public', name: 'dlob_slippage_ts_v2' };
const baseCandlesFn = { schema: 'public', name: 'get_drift_candles' };
const candlesReturnTable = { schema: 'public', name: 'drift_candles' };
@@ -169,7 +174,22 @@ async function main() {
await ensureTickTable(t);
}
const ensureDlobTable = async (table, columns) => {
// Cached candles table (precomputed by worker; public read).
await ensurePublicSelectTable(candlesCacheTable, [
'bucket',
'bucket_seconds',
'symbol',
'source',
'open',
'high',
'low',
'close',
'oracle_close',
'ticks',
'updated_at',
]);
const ensureDlobTable = async (table, columns, { publicFilter } = {}) => {
await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } });
await metadata({ type: 'pg_track_table', args: { source, table } });
@@ -182,7 +202,7 @@ async function main() {
role: 'public',
permission: {
columns,
filter: {},
filter: publicFilter || {},
},
},
});
@@ -218,7 +238,7 @@ async function main() {
});
};
const ensurePublicSelectTable = async (table, columns) => {
async function ensurePublicSelectTable(table, columns, { publicFilter } = {}) {
await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } });
await metadata({ type: 'pg_track_table', args: { source, table } });
@@ -231,7 +251,7 @@ async function main() {
role: 'public',
permission: {
columns,
filter: {},
filter: publicFilter || {},
},
},
});
@@ -239,9 +259,12 @@ async function main() {
// Computed/archived tables are written by workers using admin secret; keep ingestor off by default.
await metadataIgnore({ type: 'pg_drop_insert_permission', args: { source, table, role: 'ingestor' } });
await metadataIgnore({ type: 'pg_drop_update_permission', args: { source, table, role: 'ingestor' } });
};
}
const dlobPublicFilter = { source: { _eq: PUBLIC_DLOB_SOURCE_HEADER } };
await ensureDlobTable(dlobL2LatestTable, [
'source',
'market_name',
'market_type',
'market_index',
@@ -255,9 +278,10 @@ async function main() {
'asks',
'raw',
'updated_at',
]);
], { publicFilter: dlobPublicFilter });
await ensureDlobTable(dlobStatsLatestTable, [
'source',
'market_name',
'market_type',
'market_index',
@@ -278,9 +302,10 @@ async function main() {
'imbalance',
'raw',
'updated_at',
]);
], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobDepthBpsLatestTable, [
'source',
'market_name',
'band_bps',
'market_type',
@@ -297,9 +322,10 @@ async function main() {
'imbalance',
'raw',
'updated_at',
]);
], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobSlippageLatestTable, [
'source',
'market_name',
'side',
'size_usd',
@@ -319,11 +345,35 @@ async function main() {
'fill_pct',
'raw',
'updated_at',
]);
], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobSlippageLatestV2Table, [
'source',
'market_name',
'side',
'size_usd',
'market_type',
'market_index',
'ts',
'slot',
'mid_price',
'best_bid_price',
'best_ask_price',
'vwap_price',
'worst_price',
'filled_usd',
'filled_base',
'impact_bps',
'levels_consumed',
'fill_pct',
'raw',
'updated_at',
], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobStatsTsTable, [
'ts',
'id',
'source',
'market_name',
'market_type',
'market_index',
@@ -343,11 +393,12 @@ async function main() {
'depth_ask_usd',
'imbalance',
'raw',
]);
], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobDepthBpsTsTable, [
'ts',
'id',
'source',
'market_name',
'band_bps',
'market_type',
@@ -363,11 +414,12 @@ async function main() {
'ask_usd',
'imbalance',
'raw',
]);
], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobSlippageTsTable, [
'ts',
'id',
'source',
'market_name',
'side',
'size_usd',
@@ -384,7 +436,29 @@ async function main() {
'levels_consumed',
'fill_pct',
'raw',
]);
], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobSlippageTsV2Table, [
'ts',
'id',
'source',
'market_name',
'side',
'size_usd',
'market_type',
'market_index',
'source_ts',
'slot',
'mid_price',
'vwap_price',
'worst_price',
'filled_usd',
'filled_base',
'impact_bps',
'levels_consumed',
'fill_pct',
'raw',
], { publicFilter: dlobPublicFilter });
// Return table type for candle functions (needed for Hasura to track the function).
await metadataIgnore({ type: 'pg_track_table', args: { source, table: candlesReturnTable } });

View File

@@ -121,10 +121,47 @@ CREATE TABLE IF NOT EXISTS public.drift_candles (
high numeric,
low numeric,
close numeric,
oracle_open numeric,
oracle_high numeric,
oracle_low numeric,
oracle_close numeric,
ticks bigint
);
ALTER TABLE public.drift_candles ADD COLUMN IF NOT EXISTS oracle_open numeric;
ALTER TABLE public.drift_candles ADD COLUMN IF NOT EXISTS oracle_high numeric;
ALTER TABLE public.drift_candles ADD COLUMN IF NOT EXISTS oracle_low numeric;
-- Precomputed candle cache (materialized by a worker).
-- Purpose: make tf switching instant by reading ready-made candles instead of aggregating `drift_ticks` on demand.
-- NOTE: `source=''` means "any source" (no source filter).
CREATE TABLE IF NOT EXISTS public.drift_candles_cache (
bucket timestamptz NOT NULL,
bucket_seconds integer NOT NULL,
symbol text NOT NULL,
source text NOT NULL DEFAULT '',
open numeric NOT NULL,
high numeric NOT NULL,
low numeric NOT NULL,
close numeric NOT NULL,
oracle_open numeric,
oracle_high numeric,
oracle_low numeric,
oracle_close numeric,
ticks bigint NOT NULL DEFAULT 0,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (bucket, bucket_seconds, symbol, source)
);
ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_open numeric;
ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_high numeric;
ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_low numeric;
SELECT create_hypertable('drift_candles_cache', 'bucket', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS drift_candles_cache_symbol_source_bucket_idx
ON public.drift_candles_cache (symbol, source, bucket_seconds, bucket DESC);
-- If an older version of the function exists with an incompatible return type,
-- CREATE OR REPLACE will fail. Drop the old signature first (safe/idempotent).
DROP FUNCTION IF EXISTS public.get_drift_candles(text, integer, integer, text);
@@ -139,27 +176,144 @@ RETURNS SETOF public.drift_candles
LANGUAGE sql
STABLE
AS $$
WITH base AS (
-- Zwraca zawsze "ciągłe" buckety (fill forward), nawet jeśli nie było ticków w danej sekundzie/minucie.
-- Dzięki temu frontend może rysować regularną oś czasu (np. 1px = 1s) bez dziwnych przeskoków.
WITH src AS (
SELECT COALESCE(p_source, '') AS source_key
),
raw_cached AS (
SELECT
c.bucket,
c.open,
c.high,
c.low,
c.close,
c.oracle_open,
c.oracle_high,
c.oracle_low,
c.oracle_close,
c.ticks
FROM public.drift_candles_cache c, src
WHERE c.symbol = p_symbol
AND c.bucket_seconds = p_bucket_seconds
AND c.source = src.source_key
ORDER BY c.bucket DESC
LIMIT p_limit
),
raw_fallback AS (
SELECT
time_bucket(make_interval(secs => p_bucket_seconds), ts) AS bucket,
ts,
COALESCE(mark_price, oracle_price) AS px,
oracle_price AS oracle_px
FROM public.drift_ticks
FROM public.drift_ticks, src
WHERE symbol = p_symbol
AND (p_source IS NULL OR source = p_source)
AND (src.source_key = '' OR source = src.source_key)
AND ts >= now() - make_interval(secs => (p_bucket_seconds * p_limit * 2))
)
),
computed AS (
SELECT
bucket,
(array_agg(px ORDER BY ts ASC))[1] AS open,
max(px) AS high,
min(px) AS low,
(array_agg(px ORDER BY ts DESC))[1] AS close,
(array_agg(oracle_px ORDER BY ts ASC))[1] AS oracle_open,
max(oracle_px) AS oracle_high,
min(oracle_px) AS oracle_low,
(array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close,
count(*) AS ticks
FROM base
FROM raw_fallback
GROUP BY bucket
),
data AS (
SELECT * FROM raw_cached
UNION ALL
SELECT * FROM computed
WHERE NOT EXISTS (SELECT 1 FROM raw_cached)
),
bounds AS (
SELECT max(bucket) AS end_bucket FROM data
),
params AS (
SELECT
make_interval(secs => p_bucket_seconds) AS step,
make_interval(secs => (p_bucket_seconds * (p_limit - 1))) AS span
),
series AS (
SELECT generate_series(
bounds.end_bucket - params.span,
bounds.end_bucket,
params.step
) AS bucket
FROM bounds, params
WHERE bounds.end_bucket IS NOT NULL
),
joined AS (
SELECT
s.bucket,
d.open,
d.high,
d.low,
d.close,
d.oracle_open,
d.oracle_high,
d.oracle_low,
d.oracle_close,
d.ticks
FROM series s
LEFT JOIN data d USING (bucket)
ORDER BY s.bucket ASC
),
grouped AS (
SELECT
*,
sum(CASE WHEN close IS NOT NULL THEN 1 ELSE 0 END) OVER (ORDER BY bucket ASC) AS grp_close,
sum(CASE WHEN oracle_close IS NOT NULL THEN 1 ELSE 0 END) OVER (ORDER BY bucket ASC) AS grp_oracle
FROM joined
),
first_vals AS (
SELECT
(SELECT close FROM grouped WHERE close IS NOT NULL ORDER BY bucket ASC LIMIT 1) AS first_close,
(SELECT oracle_close FROM grouped WHERE oracle_close IS NOT NULL ORDER BY bucket ASC LIMIT 1) AS first_oracle
),
ff AS (
SELECT
g.bucket,
g.open,
g.high,
g.low,
g.close,
g.oracle_open,
g.oracle_high,
g.oracle_low,
g.oracle_close,
g.ticks,
COALESCE(
g.close,
max(g.close) OVER (PARTITION BY g.grp_close),
f.first_close
) AS ff_close,
COALESCE(
g.oracle_close,
max(g.oracle_close) OVER (PARTITION BY g.grp_oracle),
f.first_oracle
) AS ff_oracle
FROM grouped g
CROSS JOIN first_vals f
)
SELECT
bucket,
COALESCE(open, ff_close) AS open,
COALESCE(high, ff_close) AS high,
COALESCE(low, ff_close) AS low,
COALESCE(close, ff_close) AS close,
COALESCE(oracle_open, ff_oracle) AS oracle_open,
COALESCE(oracle_high, ff_oracle) AS oracle_high,
COALESCE(oracle_low, ff_oracle) AS oracle_low,
COALESCE(oracle_close, ff_oracle) AS oracle_close,
COALESCE(ticks, 0) AS ticks
FROM ff
ORDER BY bucket DESC
LIMIT p_limit;
$$;
@@ -167,7 +321,8 @@ $$;
-- Latest DLOB orderbook snapshots (top-N levels), per market.
-- Filled by a VPS worker (collector) and consumed by the UI via Hasura subscriptions.
CREATE TABLE IF NOT EXISTS public.dlob_l2_latest (
market_name TEXT PRIMARY KEY,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
@@ -179,15 +334,52 @@ CREATE TABLE IF NOT EXISTS public.dlob_l2_latest (
bids JSONB,
asks JSONB,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (source, market_name)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_l2_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_l2_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_l2_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_l2_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_l2_latest'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_l2_latest ADD CONSTRAINT dlob_l2_latest_pkey PRIMARY KEY (source, market_name)';
ELSIF pk_cols <> ARRAY['source','market_name'] THEN
EXECUTE format('ALTER TABLE public.dlob_l2_latest DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_l2_latest ADD CONSTRAINT dlob_l2_latest_pkey PRIMARY KEY (source, market_name)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_l2_latest_updated_at_idx
ON public.dlob_l2_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_l2_latest_source_updated_at_idx
ON public.dlob_l2_latest (source, updated_at DESC);
-- Derived stats for fast UI display.
CREATE TABLE IF NOT EXISTS public.dlob_stats_latest (
market_name TEXT PRIMARY KEY,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
@@ -206,15 +398,52 @@ CREATE TABLE IF NOT EXISTS public.dlob_stats_latest (
depth_ask_usd NUMERIC,
imbalance NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (source, market_name)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_stats_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_stats_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_stats_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_stats_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_stats_latest'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_stats_latest ADD CONSTRAINT dlob_stats_latest_pkey PRIMARY KEY (source, market_name)';
ELSIF pk_cols <> ARRAY['source','market_name'] THEN
EXECUTE format('ALTER TABLE public.dlob_stats_latest DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_stats_latest ADD CONSTRAINT dlob_stats_latest_pkey PRIMARY KEY (source, market_name)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_stats_latest_updated_at_idx
ON public.dlob_stats_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_stats_latest_source_updated_at_idx
ON public.dlob_stats_latest (source, updated_at DESC);
-- Depth snapshots within bps bands around mid-price (per market, per band).
-- Filled by a derived worker that reads `dlob_l2_latest`.
CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_latest (
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
band_bps INTEGER NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
@@ -231,18 +460,54 @@ CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_latest (
imbalance NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (market_name, band_bps)
PRIMARY KEY (source, market_name, band_bps)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_depth_bps_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_depth_bps_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_depth_bps_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_depth_bps_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name, band_bps) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_depth_bps_latest'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_depth_bps_latest ADD CONSTRAINT dlob_depth_bps_latest_pkey PRIMARY KEY (source, market_name, band_bps)';
ELSIF pk_cols <> ARRAY['source','market_name','band_bps'] THEN
EXECUTE format('ALTER TABLE public.dlob_depth_bps_latest DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_depth_bps_latest ADD CONSTRAINT dlob_depth_bps_latest_pkey PRIMARY KEY (source, market_name, band_bps)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_updated_at_idx
ON public.dlob_depth_bps_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_market_name_idx
ON public.dlob_depth_bps_latest (market_name);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_source_market_name_idx
ON public.dlob_depth_bps_latest (source, market_name);
-- Slippage/impact estimates for "market" orders at common USD sizes.
-- Filled by a derived worker that reads `dlob_l2_latest`.
CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest (
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
side TEXT NOT NULL,
size_usd INTEGER NOT NULL,
@@ -262,22 +527,126 @@ CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest (
fill_pct NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (market_name, side, size_usd),
PRIMARY KEY (source, market_name, side, size_usd),
CONSTRAINT dlob_slippage_latest_side_chk CHECK (side IN ('buy', 'sell'))
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_slippage_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name, side, size_usd) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_slippage_latest'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_slippage_latest ADD CONSTRAINT dlob_slippage_latest_pkey PRIMARY KEY (source, market_name, side, size_usd)';
ELSIF pk_cols <> ARRAY['source','market_name','side','size_usd'] THEN
EXECUTE format('ALTER TABLE public.dlob_slippage_latest DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_slippage_latest ADD CONSTRAINT dlob_slippage_latest_pkey PRIMARY KEY (source, market_name, side, size_usd)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_updated_at_idx
ON public.dlob_slippage_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_market_name_idx
ON public.dlob_slippage_latest (market_name);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_source_market_name_idx
ON public.dlob_slippage_latest (source, market_name);
-- Slippage v2: supports fractional order sizes (e.g. 0.1/0.2/0.5 USD), per market and side.
-- Keep v1 intact for backward compatibility and to avoid data loss.
CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest_v2 (
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
side TEXT NOT NULL, -- buy|sell
size_usd NUMERIC NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
slot BIGINT,
mid_price NUMERIC,
best_bid_price NUMERIC,
best_ask_price NUMERIC,
vwap_price NUMERIC,
worst_price NUMERIC,
filled_usd NUMERIC,
filled_base NUMERIC,
impact_bps NUMERIC,
levels_consumed INTEGER,
fill_pct NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (source, market_name, side, size_usd),
CONSTRAINT dlob_slippage_latest_v2_side_chk CHECK (side IN ('buy', 'sell'))
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_slippage_latest_v2 ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_latest_v2 ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_latest_v2 SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_latest_v2 ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name, side, size_usd) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_slippage_latest_v2'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_slippage_latest_v2 ADD CONSTRAINT dlob_slippage_latest_v2_pkey PRIMARY KEY (source, market_name, side, size_usd)';
ELSIF pk_cols <> ARRAY['source','market_name','side','size_usd'] THEN
EXECUTE format('ALTER TABLE public.dlob_slippage_latest_v2 DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_slippage_latest_v2 ADD CONSTRAINT dlob_slippage_latest_v2_pkey PRIMARY KEY (source, market_name, side, size_usd)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_updated_at_idx
ON public.dlob_slippage_latest_v2 (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_market_name_idx
ON public.dlob_slippage_latest_v2 (market_name);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_source_market_name_idx
ON public.dlob_slippage_latest_v2 (source, market_name);
-- Time-series tables for UI history (start: 7 days).
-- Keep these append-only; use Timescale hypertables.
CREATE TABLE IF NOT EXISTS public.dlob_stats_ts (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
@@ -300,14 +669,24 @@ CREATE TABLE IF NOT EXISTS public.dlob_stats_ts (
PRIMARY KEY (ts, id)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_stats_ts ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_stats_ts ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_stats_ts SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_stats_ts ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_stats_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_stats_ts_market_ts_desc_idx
ON public.dlob_stats_ts (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_stats_ts_source_market_ts_desc_idx
ON public.dlob_stats_ts (source, market_name, ts DESC);
CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_ts (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
band_bps INTEGER NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
@@ -326,14 +705,24 @@ CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_ts (
PRIMARY KEY (ts, id)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_depth_bps_ts ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_depth_bps_ts ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_depth_bps_ts SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_depth_bps_ts ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_depth_bps_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_ts_market_ts_desc_idx
ON public.dlob_depth_bps_ts (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_ts_source_market_ts_desc_idx
ON public.dlob_depth_bps_ts (source, market_name, ts DESC);
CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
side TEXT NOT NULL,
size_usd INTEGER NOT NULL,
@@ -353,11 +742,56 @@ CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts (
PRIMARY KEY (ts, id)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_slippage_ts ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_ts ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_ts SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_ts ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_slippage_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_market_ts_desc_idx
ON public.dlob_slippage_ts (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_source_market_ts_desc_idx
ON public.dlob_slippage_ts (source, market_name, ts DESC);
CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts_v2 (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
side TEXT NOT NULL,
size_usd NUMERIC NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
source_ts BIGINT,
slot BIGINT,
mid_price NUMERIC,
vwap_price NUMERIC,
worst_price NUMERIC,
filled_usd NUMERIC,
filled_base NUMERIC,
impact_bps NUMERIC,
levels_consumed INTEGER,
fill_pct NUMERIC,
raw JSONB,
PRIMARY KEY (ts, id)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_slippage_ts_v2 ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_ts_v2 ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_ts_v2 SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_ts_v2 ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_slippage_ts_v2', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_market_ts_desc_idx
ON public.dlob_slippage_ts_v2 (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_source_market_ts_desc_idx
ON public.dlob_slippage_ts_v2 (source, market_name, ts DESC);
-- Retention policies (best-effort; safe if Timescale is present).
DO $$
BEGIN
@@ -375,3 +809,8 @@ BEGIN
PERFORM add_retention_policy('dlob_slippage_ts', INTERVAL '7 days');
EXCEPTION WHEN OTHERS THEN
END $$;
DO $$
BEGIN
PERFORM add_retention_policy('dlob_slippage_ts_v2', INTERVAL '7 days');
EXCEPTION WHEN OTHERS THEN
END $$;

View File

@@ -19,9 +19,14 @@ resources:
- dlob/server-service.yaml
- dlob/server-deployment.yaml
- dlob-worker/deployment.yaml
- dlob-worker/deployment-drift.yaml
- dlob-depth-worker/deployment.yaml
- dlob-depth-worker/deployment-drift.yaml
- dlob-slippage-worker/deployment.yaml
- dlob-slippage-worker/deployment-drift.yaml
- dlob-ts-archiver/deployment.yaml
- dlob-ts-archiver/deployment-drift.yaml
- candles-cache-worker/deployment.yaml
configMapGenerator:
- name: postgres-initdb
@@ -42,9 +47,15 @@ configMapGenerator:
- name: dlob-ts-archiver-script
files:
- dlob-ts-archiver/worker.mjs
- name: candles-cache-worker-script
files:
- candles-cache-worker/worker.mjs
- name: trade-api-wrapper
files:
- api/wrapper.mjs
- name: trade-api-upstream
files:
- api/server.mjs
generatorOptions:
disableNameSuffixHash: true

View File

@@ -0,0 +1,13 @@
# cert-manager Certificate for the monitoring stack: a single SAN certificate
# covering both the Grafana and Prometheus hostnames, issued by the
# cluster-wide Let's Encrypt production issuer.
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: monitoring-mpabi-pl
  namespace: monitoring
spec:
  # TLS secret consumed by the Traefik IngressRoutes (tls.secretName).
  secretName: monitoring-mpabi-pl-tls
  issuerRef:
    kind: ClusterIssuer
    name: letsencrypt-prod
  # Both monitoring hosts share this one certificate.
  dnsNames:
    - grafana.mpabi.pl
    - prometheus.mpabi.pl

View File

@@ -0,0 +1,238 @@
# Grafana dashboard "Agave @ mpabi": node-level stats (CPU/mem/swap/disk/net
# scraped from the mpabi-node-exporter job), Yellowstone geyser gRPC metrics
# (mpabi-yellowstone-geyser job) and Agave RPC slot-lag probe panels.
apiVersion: v1
kind: ConfigMap
metadata:
  name: grafana-dashboard-agave-status
  namespace: monitoring
  labels:
    # Discovery label used by Grafana's dashboard-provisioning sidecar.
    grafana_dashboard: "1"
data:
  # NOTE(review): the rate panels titled "MiB/s" use Grafana unit "mbytes"
  # (a size unit, displays "MiB" without "/s"); unit "MiBs" would label them
  # as rates — confirm before changing the dashboard JSON.
  agave-status.json: |-
    {
      "uid": "agave-status-mpabi",
      "title": "Agave @ mpabi",
      "timezone": "browser",
      "schemaVersion": 39,
      "version": 2,
      "time": { "from": "now-6h", "to": "now" },
      "timepicker": {
        "refresh_intervals": ["5s", "10s", "30s", "1m", "5m"],
        "time_options": ["5m", "15m", "1h", "6h", "12h", "24h", "2d", "7d"]
      },
      "refresh": "10s",
      "tags": ["agave", "solana", "mpabi"],
      "templating": {
        "list": [
          {
            "name": "instance",
            "type": "query",
            "label": "Node Exporter Instance",
            "datasource": { "type": "prometheus", "uid": "prometheus" },
            "query": { "query": "label_values(up{job=\"mpabi-node-exporter\"}, instance)", "refId": "PromVarInstance" },
            "current": { "selected": false, "text": "10.66.66.1:9100", "value": "10.66.66.1:9100" }
          }
        ]
      },
      "panels": [
        {
          "id": 1,
          "type": "stat",
          "title": "Geyser Metrics Target (Prometheus up)",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [{ "refId": "A", "expr": "up{job=\"mpabi-yellowstone-geyser\"}" }],
          "options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "orientation": "horizontal", "textMode": "value" },
          "fieldConfig": {
            "defaults": { "thresholds": { "mode": "absolute", "steps": [{ "color": "red", "value": null }, { "color": "green", "value": 1 }] } },
            "overrides": []
          },
          "gridPos": { "h": 6, "w": 12, "x": 0, "y": 0 }
        },
        {
          "id": 2,
          "type": "stat",
          "title": "agave-validator.service (systemd active)",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [{ "refId": "B", "expr": "node_systemd_unit_state{job=\"mpabi-node-exporter\",instance=\"$instance\",name=\"agave-validator.service\",state=\"active\"}" }],
          "options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "orientation": "horizontal", "textMode": "value" },
          "fieldConfig": {
            "defaults": { "thresholds": { "mode": "absolute", "steps": [{ "color": "red", "value": null }, { "color": "green", "value": 1 }] } },
            "overrides": []
          },
          "gridPos": { "h": 6, "w": 12, "x": 12, "y": 0 }
        },
        {
          "id": 3,
          "type": "timeseries",
          "title": "CPU Used (%)",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [{ "refId": "C", "expr": "100 - (avg by (instance) (rate(node_cpu_seconds_total{job=\"mpabi-node-exporter\",instance=\"$instance\",mode=\"idle\"}[5m])) * 100)" }],
          "fieldConfig": { "defaults": { "unit": "percent", "min": 0, "max": 100 }, "overrides": [] },
          "gridPos": { "h": 8, "w": 8, "x": 0, "y": 6 }
        },
        {
          "id": 4,
          "type": "timeseries",
          "title": "Load (1m)",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [{ "refId": "D", "expr": "node_load1{job=\"mpabi-node-exporter\",instance=\"$instance\"}" }],
          "gridPos": { "h": 8, "w": 8, "x": 8, "y": 6 }
        },
        {
          "id": 5,
          "type": "timeseries",
          "title": "Memory Used (%)",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [{ "refId": "E", "expr": "100 - (node_memory_MemAvailable_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\"} / node_memory_MemTotal_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\"} * 100)" }],
          "fieldConfig": { "defaults": { "unit": "percent", "min": 0, "max": 100 }, "overrides": [] },
          "gridPos": { "h": 8, "w": 8, "x": 16, "y": 6 }
        },
        {
          "id": 6,
          "type": "timeseries",
          "title": "Swap Used (GiB)",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [{ "refId": "F", "expr": "(node_memory_SwapTotal_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\"} - node_memory_SwapFree_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\"}) / 1024 / 1024 / 1024" }],
          "fieldConfig": { "defaults": { "unit": "gbytes" }, "overrides": [] },
          "gridPos": { "h": 8, "w": 8, "x": 0, "y": 14 }
        },
        {
          "id": 7,
          "type": "timeseries",
          "title": "Disk Free Accounts (%)",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [{ "refId": "G", "expr": "100 * node_filesystem_avail_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\",mountpoint=\"/var/lib/solana/accounts\"} / node_filesystem_size_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\",mountpoint=\"/var/lib/solana/accounts\"}" }],
          "fieldConfig": { "defaults": { "unit": "percent", "min": 0, "max": 100 }, "overrides": [] },
          "gridPos": { "h": 8, "w": 8, "x": 8, "y": 14 }
        },
        {
          "id": 8,
          "type": "timeseries",
          "title": "Disk Free Ledger (%)",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [{ "refId": "H", "expr": "100 * node_filesystem_avail_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\",mountpoint=\"/var/lib/solana/ledger\"} / node_filesystem_size_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\",mountpoint=\"/var/lib/solana/ledger\"}" }],
          "fieldConfig": { "defaults": { "unit": "percent", "min": 0, "max": 100 }, "overrides": [] },
          "gridPos": { "h": 8, "w": 8, "x": 16, "y": 14 }
        },
        {
          "id": 9,
          "type": "timeseries",
          "title": "Disk IO (NVMe) Read/Write (MiB/s)",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [
            { "refId": "I", "expr": "sum by (device) (rate(node_disk_read_bytes_total{job=\"mpabi-node-exporter\",instance=\"$instance\",device=~\"nvme.*\"}[5m])) / 1024 / 1024", "legendFormat": "read {{device}}" },
            { "refId": "J", "expr": "sum by (device) (rate(node_disk_written_bytes_total{job=\"mpabi-node-exporter\",instance=\"$instance\",device=~\"nvme.*\"}[5m])) / 1024 / 1024", "legendFormat": "write {{device}}" }
          ],
          "fieldConfig": { "defaults": { "unit": "mbytes" }, "overrides": [] },
          "gridPos": { "h": 8, "w": 12, "x": 0, "y": 22 }
        },
        {
          "id": 10,
          "type": "timeseries",
          "title": "Network wg0 RX/TX (MiB/s)",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [
            { "refId": "K", "expr": "rate(node_network_receive_bytes_total{job=\"mpabi-node-exporter\",instance=\"$instance\",device=\"wg0\"}[5m]) / 1024 / 1024", "legendFormat": "rx" },
            { "refId": "L", "expr": "rate(node_network_transmit_bytes_total{job=\"mpabi-node-exporter\",instance=\"$instance\",device=\"wg0\"}[5m]) / 1024 / 1024", "legendFormat": "tx" }
          ],
          "fieldConfig": { "defaults": { "unit": "mbytes" }, "overrides": [] },
          "gridPos": { "h": 8, "w": 12, "x": 12, "y": 22 }
        },
        {
          "id": 11,
          "type": "timeseries",
          "title": "Geyser: Subscriber Queue Size",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [{ "refId": "M", "expr": "grpc_subscriber_queue_size{job=\"mpabi-yellowstone-geyser\"}", "legendFormat": "{{subscriber_id}}" }],
          "gridPos": { "h": 8, "w": 12, "x": 0, "y": 30 }
        },
        {
          "id": 12,
          "type": "timeseries",
          "title": "Geyser: Connections Total",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [{ "refId": "N", "expr": "connections_total{job=\"mpabi-yellowstone-geyser\"}" }],
          "gridPos": { "h": 8, "w": 12, "x": 12, "y": 30 }
        },
        {
          "id": 13,
          "type": "timeseries",
          "title": "Geyser: Bytes Sent (MiB/s)",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [{ "refId": "O", "expr": "rate(grpc_bytes_sent{job=\"mpabi-yellowstone-geyser\"}[5m]) / 1024 / 1024", "legendFormat": "{{subscriber_id}}" }],
          "fieldConfig": { "defaults": { "unit": "mbytes" }, "overrides": [] },
          "gridPos": { "h": 8, "w": 12, "x": 0, "y": 38 }
        },
        {
          "id": 14,
          "type": "timeseries",
          "title": "Geyser: Messages Sent (/s)",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [{ "refId": "P", "expr": "rate(grpc_message_sent_count{job=\"mpabi-yellowstone-geyser\"}[5m])", "legendFormat": "{{subscriber_id}}" }],
          "gridPos": { "h": 8, "w": 12, "x": 12, "y": 38 }
        },
        {
          "id": 15,
          "type": "timeseries",
          "title": "Geyser: Disconnects (increase 15m)",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [{ "refId": "Q", "expr": "sum by (reason) (increase(grpc_client_disconnects_total{job=\"mpabi-yellowstone-geyser\"}[15m]))", "legendFormat": "{{reason}}" }],
          "gridPos": { "h": 8, "w": 24, "x": 0, "y": 46 }
        },
        {
          "id": 16,
          "type": "stat",
          "title": "RPC Slot Lag (slots)",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [{ "refId": "R", "expr": "solana_rpc_slot_lag{job=\"mpabi-node-exporter\",instance=\"$instance\"}" }],
          "fieldConfig": {
            "defaults": {
              "unit": "short",
              "thresholds": {
                "mode": "absolute",
                "steps": [
                  { "color": "green", "value": null },
                  { "color": "yellow", "value": 20 },
                  { "color": "red", "value": 50 }
                ]
              }
            },
            "overrides": []
          },
          "gridPos": { "h": 6, "w": 12, "x": 0, "y": 54 }
        },
        {
          "id": 17,
          "type": "stat",
          "title": "RPC Slot Lag (szac. minuty)",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [{ "refId": "S", "expr": "solana_rpc_slot_lag{job=\"mpabi-node-exporter\",instance=\"$instance\"} * 0.4 / 60" }],
          "fieldConfig": {
            "defaults": {
              "unit": "min",
              "decimals": 2,
              "thresholds": {
                "mode": "absolute",
                "steps": [
                  { "color": "green", "value": null },
                  { "color": "yellow", "value": 1 },
                  { "color": "red", "value": 2 }
                ]
              }
            },
            "overrides": []
          },
          "gridPos": { "h": 6, "w": 12, "x": 12, "y": 54 }
        },
        {
          "id": 18,
          "type": "timeseries",
          "title": "RPC Slot & Reference Slot",
          "datasource": { "type": "prometheus", "uid": "prometheus" },
          "targets": [
            { "refId": "T", "expr": "solana_rpc_slot{job=\"mpabi-node-exporter\",instance=\"$instance\"}" },
            { "refId": "U", "expr": "solana_rpc_slot_reference{job=\"mpabi-node-exporter\",instance=\"$instance\"}" },
            { "refId": "V", "expr": "solana_rpc_block_height{job=\"mpabi-node-exporter\",instance=\"$instance\"}" }
          ],
          "gridPos": { "h": 8, "w": 24, "x": 0, "y": 60 }
        }
      ]
    }

View File

@@ -0,0 +1,16 @@
# HTTP (port 80) entry for Grafana: applies the shared redirect-to-https
# middleware, so plain-HTTP requests are redirected before reaching the
# backing service.
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
  name: grafana-http
  namespace: monitoring
spec:
  entryPoints:
    - web
  routes:
    - match: Host(`grafana.mpabi.pl`)
      kind: Rule
      middlewares:
        - name: redirect-to-https
      services:
        - name: monitoring-stack-grafana
          port: 80

View File

@@ -0,0 +1,16 @@
# HTTPS (websecure) route for Grafana, terminating TLS with the shared
# monitoring certificate secret.
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
  name: grafana
  namespace: monitoring
spec:
  entryPoints:
    - websecure
  routes:
    - match: Host(`grafana.mpabi.pl`)
      kind: Rule
      services:
        - name: monitoring-stack-grafana
          port: 80
  tls:
    # Populated by the monitoring-mpabi-pl cert-manager Certificate.
    secretName: monitoring-mpabi-pl-tls

View File

@@ -0,0 +1,16 @@
# HTTP (port 80) entry for Prometheus: applies the shared redirect-to-https
# middleware, so plain-HTTP requests are redirected before reaching the
# backing service.
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
  name: prometheus-http
  namespace: monitoring
spec:
  entryPoints:
    - web
  routes:
    - match: Host(`prometheus.mpabi.pl`)
      kind: Rule
      middlewares:
        - name: redirect-to-https
      services:
        - name: monitoring-stack-kube-prom-prometheus
          port: 9090

View File

@@ -0,0 +1,16 @@
# HTTPS (websecure) route for Prometheus, terminating TLS with the shared
# monitoring certificate secret.
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
  name: prometheus
  namespace: monitoring
spec:
  entryPoints:
    - websecure
  routes:
    - match: Host(`prometheus.mpabi.pl`)
      kind: Rule
      services:
        - name: monitoring-stack-kube-prom-prometheus
          port: 9090
  tls:
    # Populated by the monitoring-mpabi-pl cert-manager Certificate.
    secretName: monitoring-mpabi-pl-tls

View File

@@ -0,0 +1,11 @@
# Kustomization for monitoring extras: Traefik routing, TLS certificate,
# the Agave Grafana dashboard ConfigMap, and Prometheus alert rules.
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - middleware-redirect-to-https.yaml
  - certificate-monitoring-mpabi-pl.yaml
  - ingressroute-grafana.yaml
  - ingressroute-grafana-http.yaml
  - ingressroute-prometheus.yaml
  - ingressroute-prometheus-http.yaml
  - dashboard-agave-status.yaml
  - prometheus-rules-agave.yaml

View File

@@ -0,0 +1,9 @@
# Traefik middleware: permanent (301) scheme redirect to HTTPS.
# Referenced by the *-http IngressRoutes in this directory.
apiVersion: traefik.io/v1alpha1
kind: Middleware
metadata:
  name: redirect-to-https
  namespace: monitoring
spec:
  redirectScheme:
    scheme: https
    permanent: true

View File

@@ -0,0 +1,61 @@
# PrometheusRule: health alerts for the Agave (Solana) RPC node.
# Metrics come from the "mpabi-node-exporter" scrape job (node-exporter plus
# custom solana_rpc_* probe metrics).
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: agave-rpc-alerts
  namespace: monitoring
  labels:
    # Must match the kube-prometheus-stack ruleSelector so the operator picks it up.
    app: kube-prometheus-stack
    release: monitoring-stack
spec:
  groups:
    - name: agave-rpc-health
      rules:
        # RPC probe failed entirely — page immediately.
        - alert: AgaveRPCDown
          expr: "max by (instance) (solana_rpc_up{job=\"mpabi-node-exporter\"}) == 0"
          for: 30s
          labels:
            severity: critical
            team: mpabi
          annotations:
            summary: "Agave RPC is unreachable"
            description: "RPC probe from node exporter reports solana_rpc_up == 0 for instance {{ $labels.instance }}."
        # Moderate slot lag vs. the reference endpoint — warning only.
        - alert: AgaveRPCSlotLagHigh
          expr: "sum by (instance) (solana_rpc_slot_lag{job=\"mpabi-node-exporter\"}) > 50"
          for: 2m
          labels:
            severity: warning
            team: mpabi
          annotations:
            summary: "Agave RPC is lagging behind cluster"
            description: "Current slot lag is {{ $value }} for instance {{ $labels.instance }}. Reference endpoint in probe config may be misconfigured or validator is behind."
        # Severe lag (>500 slots) — critical.
        - alert: AgaveRPCSlotLagCritical
          expr: "sum by (instance) (solana_rpc_slot_lag{job=\"mpabi-node-exporter\"}) > 500"
          for: 2m
          labels:
            severity: critical
            team: mpabi
          annotations:
            summary: "Agave RPC severe lag"
            description: "Slot lag is critically high ({{ $value }} slots) on instance {{ $labels.instance }}."
        # Combined NVMe read+write throughput in MiB/s, sustained over 5 minutes.
        - alert: AgaveIOHigh
          expr: |
            sum by (instance) (
              (rate(node_disk_read_bytes_total{job="mpabi-node-exporter",device=~"nvme.*"}[5m]) +
               rate(node_disk_written_bytes_total{job="mpabi-node-exporter",device=~"nvme.*"}[5m])) / 1024 / 1024
            ) > 300
          for: 5m
          labels:
            severity: warning
            team: mpabi
          annotations:
            summary: "High storage I/O on Agave node"
            description: "Combined NVMe read+write throughput >300 MiB/s for 5m on {{ $labels.instance }}. Check disk pressure and Geyser/ledger workload."
        # CPU time share spent waiting on I/O — 0.2 == 20% averaged across cores.
        - alert: AgaveIOWaitHigh
          expr: "avg by (instance) (rate(node_cpu_seconds_total{job=\"mpabi-node-exporter\",mode=\"iowait\"}[5m])) > 0.2"
          for: 5m
          labels:
            severity: warning
            team: mpabi
          annotations:
            summary: "High iowait on Agave node"
            description: "Iowait over 20% on average for 5m on {{ $labels.instance }}. Storage latency is likely impacting slot progress."

View File

@@ -0,0 +1,14 @@
# Strategic-merge patch: point dlob-publisher at the local RPC node over the
# wg0 WireGuard link (10.66.66.1) instead of a public endpoint.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dlob-publisher
spec:
  template:
    spec:
      containers:
        - name: publisher
          env:
            - name: ENDPOINT
              value: "http://10.66.66.1:8899"
            - name: WS_ENDPOINT
              value: "ws://10.66.66.1:8900"

View File

@@ -0,0 +1,14 @@
# Strategic-merge patch: point dlob-server at the local RPC node over the
# wg0 WireGuard link (10.66.66.1) instead of a public endpoint.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dlob-server
spec:
  template:
    spec:
      containers:
        - name: server
          env:
            - name: ENDPOINT
              value: "http://10.66.66.1:8899"
            - name: WS_ENDPOINT
              value: "ws://10.66.66.1:8900"

View File

@@ -0,0 +1,18 @@
# Strategic-merge patch: overlay the frontend container's server.mjs with the
# version shipped in the trade-frontend-server-script ConfigMap (generated by
# the overlay's configMapGenerator), so the proxy logic can be updated via
# GitOps without rebuilding the image.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trade-frontend
spec:
  template:
    spec:
      containers:
        - name: frontend
          volumeMounts:
            - name: frontend-server-script
              mountPath: /app/services/frontend/server.mjs
              subPath: frontend-server.mjs
              readOnly: true
      volumes:
        - name: frontend-server-script
          configMap:
            name: trade-frontend-server-script

View File

@@ -0,0 +1,813 @@
import crypto from 'node:crypto';
import { spawnSync } from 'node:child_process';
import fs from 'node:fs';
import http from 'node:http';
import https from 'node:https';
import net from 'node:net';
import path from 'node:path';
import tls from 'node:tls';
// ---------------------------------------------------------------------------
// Runtime configuration — everything is driven by environment variables.
// ---------------------------------------------------------------------------

// Listen port; must parse to a positive integer or startup aborts.
const PORT = Number.parseInt(process.env.PORT || '8081', 10);
if (!Number.isInteger(PORT) || PORT <= 0) throw new Error(`Invalid PORT: ${process.env.PORT}`);
// Reported via /healthz for deploy verification.
const APP_VERSION = String(process.env.APP_VERSION || 'v1').trim() || 'v1';
const BUILD_TIMESTAMP = String(process.env.BUILD_TIMESTAMP || '').trim() || undefined;
const STARTED_AT = new Date().toISOString();
// Root directory for static assets served by serveStatic.
const STATIC_DIR = process.env.STATIC_DIR || '/srv';
// Mounted secret files: {username,password} for edge basic auth and {token}
// injected as a Bearer token on proxied /api requests.
const BASIC_AUTH_FILE = process.env.BASIC_AUTH_FILE || '/tokens/frontend.json';
const API_READ_TOKEN_FILE = process.env.API_READ_TOKEN_FILE || '/tokens/read.json';
// Upstreams for the /api reverse proxy and the /graphql(-ws) Hasura proxy.
const API_UPSTREAM = process.env.API_UPSTREAM || process.env.API_URL || 'http://api:8787';
const HASURA_UPSTREAM = process.env.HASURA_UPSTREAM || 'http://hasura:8080';
const HASURA_GRAPHQL_PATH = process.env.HASURA_GRAPHQL_PATH || '/v1/graphql';
const GRAPHQL_CORS_ORIGIN = process.env.GRAPHQL_CORS_ORIGIN || process.env.CORS_ORIGIN || '*';
// Edge-wide HTTP Basic auth; any of "off"/"false"/"0"/"disabled"/"none" turns it off.
const BASIC_AUTH_MODE = String(process.env.BASIC_AUTH_MODE || 'on')
  .trim()
  .toLowerCase();
const BASIC_AUTH_ENABLED = !['off', 'false', '0', 'disabled', 'none'].includes(BASIC_AUTH_MODE);
// Header through which a trusted reverse proxy may assert the authenticated user.
const AUTH_USER_HEADER = String(process.env.AUTH_USER_HEADER || 'x-trade-user')
  .trim()
  .toLowerCase();
// Session-cookie auth for /auth/login; "off"/"none"/"disabled" disable it.
const AUTH_MODE = String(process.env.AUTH_MODE || 'session')
  .trim()
  .toLowerCase();
// htpasswd file consulted by verifyWithHtpasswd at login time.
const HTPASSWD_FILE = String(process.env.HTPASSWD_FILE || '/auth/users').trim();
const AUTH_SESSION_SECRET_FILE = String(process.env.AUTH_SESSION_SECRET_FILE || '').trim() || null;
const AUTH_SESSION_COOKIE = String(process.env.AUTH_SESSION_COOKIE || 'trade_session')
  .trim()
  .toLowerCase();
const AUTH_SESSION_TTL_SECONDS = Number.parseInt(process.env.AUTH_SESSION_TTL_SECONDS || '43200', 10); // 12h
// DLOB data-source preference: cookie name and default source ("mevnode" or "drift").
const DLOB_SOURCE_COOKIE = String(process.env.DLOB_SOURCE_COOKIE || 'trade_dlob_source').trim() || 'trade_dlob_source';
const DLOB_SOURCE_DEFAULT = String(process.env.DLOB_SOURCE_DEFAULT || 'mevnode').trim() || 'mevnode';
function readJson(filePath) {
  // Synchronously read and parse a UTF-8 JSON file (config/secret reads only).
  return JSON.parse(fs.readFileSync(filePath, 'utf8'));
}
function readText(filePath) {
  // Read an entire file as a UTF-8 string.
  const contents = fs.readFileSync(filePath, 'utf8');
  return contents;
}
function timingSafeEqualStr(a, b) {
  // Constant-time equality for secret strings (basic-auth username/password).
  // Fix: the previous version returned early on a length mismatch, which leaks
  // the secret's length through response timing. Hashing both inputs to
  // fixed-length digests first makes the comparison independent of both the
  // inputs' lengths and the position of the first differing byte; SHA-256
  // collision resistance makes digest equality equivalent to string equality.
  const da = crypto.createHash('sha256').update(String(a), 'utf8').digest();
  const db = crypto.createHash('sha256').update(String(b), 'utf8').digest();
  return crypto.timingSafeEqual(da, db);
}
function timingSafeEqualBuf(a, b) {
  // Constant-time equality for byte arrays; non-buffers or differing lengths
  // can never be equal (timingSafeEqual itself requires equal lengths).
  const comparable = a instanceof Uint8Array && b instanceof Uint8Array;
  if (!comparable || a.length !== b.length) return false;
  return crypto.timingSafeEqual(Buffer.from(a), Buffer.from(b));
}
function loadBasicAuth() {
  // Load {username, password} from the mounted secret file; both fields must
  // be non-empty or the file is considered invalid.
  const parsed = readJson(BASIC_AUTH_FILE);
  const username = (parsed?.username || '').toString();
  const password = (parsed?.password || '').toString();
  if (!username || !password) throw new Error(`Invalid BASIC_AUTH_FILE: ${BASIC_AUTH_FILE}`);
  return { username, password };
}
function loadApiReadToken() {
  // Load the read-only API bearer token injected on proxied /api requests.
  const parsed = readJson(API_READ_TOKEN_FILE);
  const token = (parsed?.token || '').toString();
  if (!token) throw new Error(`Invalid API_READ_TOKEN_FILE: ${API_READ_TOKEN_FILE}`);
  return token;
}
function send(res, status, headers, body) {
  // Write status + headers and finish the response; a nullish body ends empty.
  res.statusCode = status;
  if (headers) {
    for (const name of Object.keys(headers)) res.setHeader(name, headers[name]);
  }
  if (body == null) {
    res.end();
  } else {
    res.end(body);
  }
}
function sendJson(res, status, body) {
  // JSON responses carry auth/preference state and are never cacheable.
  const payload = JSON.stringify(body);
  send(res, status, { 'content-type': 'application/json; charset=utf-8', 'cache-control': 'no-store' }, payload);
}
function basicAuthRequired(res) {
  // Issue an HTTP Basic challenge for the "trade" realm.
  res.setHeader('www-authenticate', 'Basic realm="trade"');
  send(res, 401, { 'content-type': 'text/plain; charset=utf-8' }, 'unauthorized');
}
function unauthorized(res) {
  // JSON-style 401 for API-shaped endpoints.
  sendJson(res, 401, { ok: false, error: 'unauthorized' });
}
function isAuthorized(req, creds) {
  // Validate an "Authorization: Basic <base64(user:pass)>" header against the
  // configured credentials using constant-time comparisons.
  const header = String(req.headers.authorization || '');
  const match = header.match(/^Basic\s+(.+)$/i);
  if (!match?.[1]) return false;
  let decoded;
  try {
    decoded = Buffer.from(match[1], 'base64').toString('utf8');
  } catch {
    return false;
  }
  const sep = decoded.indexOf(':');
  if (sep < 0) return false;
  const user = decoded.slice(0, sep);
  const pass = decoded.slice(sep + 1);
  return timingSafeEqualStr(user, creds.username) && timingSafeEqualStr(pass, creds.password);
}
// Extension → Content-Type table for the static file server.
const MIME = {
  '.html': 'text/html; charset=utf-8',
  '.css': 'text/css; charset=utf-8',
  '.js': 'application/javascript; charset=utf-8',
  '.mjs': 'application/javascript; charset=utf-8',
  '.json': 'application/json; charset=utf-8',
  '.svg': 'image/svg+xml',
  '.png': 'image/png',
  '.jpg': 'image/jpeg',
  '.jpeg': 'image/jpeg',
  '.gif': 'image/gif',
  '.ico': 'image/x-icon',
  '.txt': 'text/plain; charset=utf-8',
  '.map': 'application/json; charset=utf-8',
};
function contentTypeFor(filePath) {
  // Unknown extensions fall back to a generic binary type.
  const ext = path.extname(filePath).toLowerCase();
  return MIME[ext] ?? 'application/octet-stream';
}
function safePathFromUrlPath(urlPath) {
  // Convert a URL pathname to a safe path relative to the static root.
  // Returns null for anything that must not be served (traversal, absolute
  // paths, malformed encoding); callers respond 400 on null.
  let decoded;
  try {
    decoded = decodeURIComponent(urlPath);
  } catch {
    // Fix: decodeURIComponent throws URIError on malformed %-sequences
    // (e.g. "/%zz"), which previously escaped to the top-level handler as a
    // 500. Treat it as a bad request instead.
    return null;
  }
  const cleaned = decoded.replace(/\0/g, '');
  // strip leading slash so join() doesn't ignore STATIC_DIR
  const rel = cleaned.replace(/^\/+/, '');
  const normalized = path.normalize(rel);
  // prevent traversal
  if (normalized.startsWith('..') || path.isAbsolute(normalized)) return null;
  return normalized;
}
function injectIndexHtml(html, { dlobSource, redirectPath }) {
  // Inject a fixed-position DLOB source switcher (mevnode | drift) into
  // index.html, just before </body> when present, otherwise appended at EOF.
  // Links point at /prefs/dlob-source, which sets the cookie and redirects back.
  const src = normalizeDlobSource(dlobSource) || 'mevnode';
  const redirect = safeRedirectPath(redirectPath);
  const hrefBase = `/prefs/dlob-source?redirect=${encodeURIComponent(redirect)}&set=`;
  const styleActive = 'font-weight:700;text-decoration:underline;';
  const styleInactive = 'font-weight:400;text-decoration:none;';
  const snippet = `
<!-- trade: dlob source switch -->
<div style="position:fixed;right:12px;bottom:12px;z-index:2147483647;background:rgba(0,0,0,0.72);color:#fff;padding:8px 10px;border-radius:10px;font:12px/1.2 system-ui,-apple-system,Segoe UI,Roboto,sans-serif;backdrop-filter:blur(6px);">
<span style="opacity:0.85;margin-right:6px;">DLOB</span>
<a href="${hrefBase}mevnode" style="color:#fff;${src === 'mevnode' ? styleActive : styleInactive}">mevnode</a>
<span style="opacity:0.6;margin:0 6px;">|</span>
<a href="${hrefBase}drift" style="color:#fff;${src === 'drift' ? styleActive : styleInactive}">drift</a>
</div>
`;
  // Case-insensitive match; only the first </body> is replaced.
  const bodyClose = /<\/body>/i;
  if (bodyClose.test(html)) return html.replace(bodyClose, `${snippet}</body>`);
  return `${html}\n${snippet}\n`;
}
function serveStatic(req, res) {
  // Serve files from STATIC_DIR with an SPA fallback to index.html.
  // index.html responses get the DLOB switcher injected and are not cached;
  // other assets are served as long-lived immutable content (hashed
  // filenames assumed — verify against the frontend build).
  if (req.method !== 'GET' && req.method !== 'HEAD') {
    send(res, 405, { 'content-type': 'text/plain; charset=utf-8' }, 'method_not_allowed');
    return;
  }
  const url = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
  const rel = safePathFromUrlPath(url.pathname);
  if (rel == null) {
    send(res, 400, { 'content-type': 'text/plain; charset=utf-8' }, 'bad_request');
    return;
  }
  const root = path.resolve(STATIC_DIR);
  const fileCandidate = path.resolve(root, rel);
  // Defense in depth — safePathFromUrlPath already rejects traversal.
  if (!fileCandidate.startsWith(root)) {
    send(res, 400, { 'content-type': 'text/plain; charset=utf-8' }, 'bad_request');
    return;
  }
  // Returns true once a response has been produced, false when the path does
  // not resolve to a servable file (caller then tries the SPA fallback).
  const trySend = (filePath) => {
    try {
      const st = fs.statSync(filePath);
      if (st.isDirectory()) return trySend(path.join(filePath, 'index.html'));
      res.statusCode = 200;
      res.setHeader('content-type', contentTypeFor(filePath));
      res.setHeader('cache-control', filePath.endsWith('index.html') ? 'no-cache' : 'public, max-age=31536000');
      if (req.method === 'HEAD') {
        // Fix: previously `return void res.end()` returned undefined, so the
        // caller treated the send as a failure and fell through to the SPA
        // fallback after the response had already ended (headers-sent error,
        // socket destroyed). Signal success explicitly.
        res.end();
        return true;
      }
      if (filePath.endsWith('index.html')) {
        const html = fs.readFileSync(filePath, 'utf8');
        const injected = injectIndexHtml(html, {
          dlobSource: resolveDlobSource(req),
          redirectPath: url.pathname + url.search,
        });
        res.end(injected);
        return true;
      }
      fs.createReadStream(filePath).pipe(res);
      return true;
    } catch {
      return false;
    }
  };
  // exact file, otherwise SPA fallback
  if (trySend(fileCandidate)) return;
  const indexPath = path.join(root, 'index.html');
  if (trySend(indexPath)) return;
  send(res, 404, { 'content-type': 'text/plain; charset=utf-8' }, 'not_found');
}
function stripHopByHopHeaders(headers) {
  // Drop RFC 7230 §6.1 hop-by-hop headers, which a proxy must not forward.
  const hopByHop = new Set([
    'connection',
    'keep-alive',
    'proxy-authenticate',
    'proxy-authorization',
    'te',
    'trailer',
    'transfer-encoding',
    'upgrade',
  ]);
  const forwarded = {};
  for (const name of Object.keys(headers || {})) {
    if (hopByHop.has(name.toLowerCase())) continue;
    forwarded[name] = headers[name];
  }
  return forwarded;
}
function readHeader(req, name) {
  // Node lower-cases incoming header names; repeated headers arrive as arrays,
  // in which case the first value wins.
  const value = req.headers[String(name).toLowerCase()];
  if (Array.isArray(value)) return value[0];
  return value;
}
function readCookie(req, name) {
const raw = typeof req.headers.cookie === 'string' ? req.headers.cookie : '';
if (!raw) return null;
const needle = `${name}=`;
for (const part of raw.split(';')) {
const t = part.trim();
if (!t.startsWith(needle)) continue;
return t.slice(needle.length) || null;
}
return null;
}
function normalizeDlobSource(value) {
  // Whitelist validation: only the two known DLOB sources are accepted;
  // anything else normalizes to null.
  const candidate = String(value ?? '')
    .trim()
    .toLowerCase();
  return candidate === 'mevnode' || candidate === 'drift' ? candidate : null;
}
function resolveDlobSource(req) {
  // Per-request DLOB source: cookie preference wins, then the configured
  // default (itself validated), finally the hard-coded "mevnode".
  const preferred = normalizeDlobSource(readCookie(req, DLOB_SOURCE_COOKIE));
  if (preferred) return preferred;
  return normalizeDlobSource(DLOB_SOURCE_DEFAULT) || 'mevnode';
}
function safeRedirectPath(value) {
  // Sanitize a redirect target so it can only be a same-origin path.
  // Anything suspicious collapses to "/".
  const s = String(value ?? '').trim();
  if (!s.startsWith('/')) return '/';
  // Fix: "//host" AND "/\host" must both be rejected — browsers normalize
  // backslashes to slashes in URLs, so "/\evil.com" is treated as the
  // protocol-relative URL "//evil.com" (open redirect).
  if (s.startsWith('//') || s.startsWith('/\\')) return '/';
  // Strip CR/LF to prevent header injection via the Location header.
  return s.replace(/[\r\n]/g, '');
}
function setDlobSourceCookie(res, { secure, dlobSource }) {
  // Persist a validated DLOB source choice for one year.
  // Returns false (and sets nothing) for unknown source values.
  const src = normalizeDlobSource(dlobSource);
  if (!src) return false;
  const attrs = [
    `${DLOB_SOURCE_COOKIE}=${src}`,
    'Path=/',
    'SameSite=Lax',
    'HttpOnly',
    'Max-Age=31536000',
  ];
  if (secure) attrs.push('Secure');
  res.setHeader('set-cookie', attrs.join('; '));
  return true;
}
function resolveAuthUser(req) {
  // Username asserted by a trusted reverse proxy: the configurable header
  // first, then the legacy x-webauth-user fallback. Null when absent/blank.
  const raw = readHeader(req, AUTH_USER_HEADER) || readHeader(req, 'x-webauth-user');
  const trimmed = typeof raw === 'string' ? raw.trim() : '';
  return trimmed || null;
}
function isHttpsRequest(req) {
  // HTTPS detection: trust x-forwarded-proto from the ingress, otherwise fall
  // back to whether the direct connection is TLS.
  const forwarded = readHeader(req, 'x-forwarded-proto');
  if (typeof forwarded === 'string' && forwarded.toLowerCase() === 'https') return true;
  return Boolean(req.socket?.encrypted);
}
function base64urlEncode(buf) {
  // Standard base64 with the URL-safe alphabet and padding removed.
  const b64 = Buffer.from(buf).toString('base64');
  return b64.replace(/=+$/g, '').replace(/\+/g, '-').replace(/\//g, '_');
}
function base64urlDecode(str) {
  // Inverse of base64urlEncode: restore the standard alphabet, re-pad to a
  // multiple of 4, and decode.
  const std = String(str).replace(/-/g, '+').replace(/_/g, '/');
  const remainder = std.length % 4;
  const padded = remainder === 0 ? std : std + '='.repeat(4 - remainder);
  return Buffer.from(padded, 'base64');
}
function loadSessionSecret() {
  // HMAC key for session cookies. Precedence: AUTH_SESSION_SECRET env var,
  // then AUTH_SESSION_SECRET_FILE contents, then a random per-process key.
  if (process.env.AUTH_SESSION_SECRET && String(process.env.AUTH_SESSION_SECRET).trim()) {
    return Buffer.from(String(process.env.AUTH_SESSION_SECRET).trim(), 'utf8');
  }
  if (AUTH_SESSION_SECRET_FILE) {
    try {
      const txt = readText(AUTH_SESSION_SECRET_FILE).trim();
      if (txt) return Buffer.from(txt, 'utf8');
    } catch {
      // ignore — fall through to the random fallback below
    }
  }
  // NOTE(review): the random fallback presumably means sessions do not survive
  // a restart and will not validate across replicas — confirm this is the
  // intended degradation when no secret is configured.
  return crypto.randomBytes(32);
}
// Resolved once at startup; used by signSessionPayload for all cookies.
const SESSION_SECRET = loadSessionSecret();
function signSessionPayload(payloadB64) {
  // HMAC-SHA256 over the base64url payload, keyed by the process session secret.
  const mac = crypto.createHmac('sha256', SESSION_SECRET);
  mac.update(payloadB64);
  return mac.digest();
}
function makeSessionCookieValue(username) {
  // Build "<payload>.<sig>" where payload = base64url({u, exp}) and
  // sig = base64url(HMAC(payload)). TTL falls back to 12h when invalid.
  const ttlValid = Number.isFinite(AUTH_SESSION_TTL_SECONDS) && AUTH_SESSION_TTL_SECONDS > 0;
  const ttl = ttlValid ? AUTH_SESSION_TTL_SECONDS : 43200;
  const exp = Math.floor(Date.now() / 1000) + ttl;
  const json = JSON.stringify({ u: String(username), exp });
  const payloadB64 = base64urlEncode(Buffer.from(json, 'utf8'));
  return `${payloadB64}.${base64urlEncode(signSessionPayload(payloadB64))}`;
}
function getSessionUser(req) {
  // Validate the session cookie and return its username, or null when the
  // cookie is absent, malformed, expired, or fails signature verification.
  const raw = readCookie(req, AUTH_SESSION_COOKIE);
  if (!raw) return null;
  // Expected shape: "<payload-b64url>.<sig-b64url>".
  const parts = raw.split('.');
  if (parts.length !== 2) return null;
  const [payloadB64, sigB64] = parts;
  if (!payloadB64 || !sigB64) return null;
  let payload;
  try {
    // NOTE: the payload is parsed before the signature is checked; JSON.parse
    // on attacker-controlled bytes is safe, and the HMAC check below rejects
    // any tampered payload before it is trusted.
    payload = JSON.parse(base64urlDecode(payloadB64).toString('utf8'));
  } catch {
    return null;
  }
  const u = typeof payload?.u === 'string' ? payload.u.trim() : '';
  const exp = Number(payload?.exp);
  if (!u || !Number.isFinite(exp)) return null;
  // Expiry is compared in whole seconds (same resolution as issuance).
  const now = Math.floor(Date.now() / 1000);
  if (now >= exp) return null;
  const expected = signSessionPayload(payloadB64);
  let got;
  try {
    got = base64urlDecode(sigB64);
  } catch {
    return null;
  }
  // Constant-time signature comparison.
  if (!timingSafeEqualBuf(expected, got)) return null;
  return u;
}
function resolveAuthenticatedUser(req) {
  // Identity precedence: signed session cookie → trusted proxy header →
  // "anonymous" when auth is disabled → null (unauthenticated).
  const fromSession = getSessionUser(req);
  if (fromSession) return fromSession;
  const fromHeader = resolveAuthUser(req);
  if (fromHeader) return fromHeader;
  const authDisabled = ['off', 'none', 'disabled'].includes(AUTH_MODE);
  return authDisabled ? 'anonymous' : null;
}
function clearSessionCookie(res, secure) {
  // Expire the session cookie immediately (Max-Age=0, empty value).
  const attrs = [`${AUTH_SESSION_COOKIE}=`, 'Path=/', 'Max-Age=0', 'HttpOnly', 'SameSite=Lax'];
  if (secure) attrs.push('Secure');
  res.setHeader('set-cookie', attrs.join('; '));
}
function setSessionCookie(res, secure, username) {
  // Issue a freshly-signed session cookie; Max-Age falls back to 12h when
  // AUTH_SESSION_TTL_SECONDS did not parse.
  const maxAge = Number.isFinite(AUTH_SESSION_TTL_SECONDS) ? AUTH_SESSION_TTL_SECONDS : 43200;
  const attrs = [
    `${AUTH_SESSION_COOKIE}=${makeSessionCookieValue(username)}`,
    'Path=/',
    `Max-Age=${maxAge}`,
    'HttpOnly',
    'SameSite=Lax',
  ];
  if (secure) attrs.push('Secure');
  res.setHeader('set-cookie', attrs.join('; '));
}
function verifyWithHtpasswd(username, password) {
  // Verify credentials against HTPASSWD_FILE by shelling out to the Apache
  // `htpasswd -vb` tool (exit status 0 == match). Any failure — missing
  // binary, missing file, timeout, wrong password — yields false.
  // NOTE(review): `-b` passes the password on the command line, which can be
  // visible in the process table while the check runs — confirm acceptable.
  try {
    const r = spawnSync('htpasswd', ['-vb', HTPASSWD_FILE, String(username), String(password)], {
      stdio: 'ignore',
      timeout: 3000,
    });
    return r.status === 0;
  } catch {
    return false;
  }
}
function readBody(req, limitBytes = 1024 * 16) {
  // Buffer the request body into a UTF-8 string, rejecting with
  // "payload_too_large" (and destroying the request) past `limitBytes`.
  return new Promise((resolve, reject) => {
    const chunks = [];
    let received = 0;
    req.on('data', (chunk) => {
      received += chunk.length;
      if (received > limitBytes) {
        reject(new Error('payload_too_large'));
        req.destroy();
        return;
      }
      chunks.push(chunk);
    });
    req.on('end', () => {
      resolve(Buffer.concat(chunks).toString('utf8'));
    });
    req.on('error', reject);
  });
}
function proxyApi(req, res, apiReadToken) {
const upstreamBase = new URL(API_UPSTREAM);
const inUrl = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
const prefix = '/api';
const strippedPath = inUrl.pathname === prefix ? '/' : inUrl.pathname.startsWith(prefix + '/') ? inUrl.pathname.slice(prefix.length) : null;
if (strippedPath == null) {
send(res, 404, { 'content-type': 'text/plain; charset=utf-8' }, 'not_found');
return;
}
const target = new URL(upstreamBase.toString());
target.pathname = strippedPath || '/';
target.search = inUrl.search;
const isHttps = target.protocol === 'https:';
const lib = isHttps ? https : http;
const headers = stripHopByHopHeaders(req.headers);
delete headers.authorization; // basic auth from client must not leak upstream
headers.host = target.host;
headers.authorization = `Bearer ${apiReadToken}`;
const upstreamReq = lib.request(
{
protocol: target.protocol,
hostname: target.hostname,
port: target.port || (isHttps ? 443 : 80),
method: req.method,
path: target.pathname + target.search,
headers,
},
(upstreamRes) => {
const outHeaders = stripHopByHopHeaders(upstreamRes.headers);
res.writeHead(upstreamRes.statusCode || 502, outHeaders);
upstreamRes.pipe(res);
}
);
upstreamReq.on('error', (err) => {
if (!res.headersSent) {
send(res, 502, { 'content-type': 'text/plain; charset=utf-8' }, `bad_gateway: ${err?.message || err}`);
} else {
res.destroy();
}
});
req.pipe(upstreamReq);
}
function withCors(res) {
  // CORS headers for the GraphQL proxy endpoints; origin defaults to "*".
  const allowedHeaders =
    'content-type, authorization, x-hasura-admin-secret, x-hasura-role, x-hasura-user-id, x-hasura-dlob-source';
  res.setHeader('access-control-allow-origin', GRAPHQL_CORS_ORIGIN);
  res.setHeader('access-control-allow-methods', 'GET,POST,OPTIONS');
  res.setHeader('access-control-allow-headers', allowedHeaders);
}
function proxyGraphqlHttp(req, res) {
  // Proxy /graphql(-ws) HTTP requests to Hasura at HASURA_GRAPHQL_PATH.
  // The x-hasura-dlob-source header is always set server-side from the
  // request's cookie/default — any client-supplied value is discarded.
  const upstreamBase = new URL(HASURA_UPSTREAM);
  const inUrl = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
  const target = new URL(upstreamBase.toString());
  target.pathname = HASURA_GRAPHQL_PATH;
  target.search = inUrl.search;
  const isHttps = target.protocol === 'https:';
  const lib = isHttps ? https : http;
  const headers = stripHopByHopHeaders(req.headers);
  headers.host = target.host;
  // Never trust the client's DLOB source header; derive it ourselves.
  delete headers['x-hasura-dlob-source'];
  headers['x-hasura-dlob-source'] = resolveDlobSource(req);
  const upstreamReq = lib.request(
    {
      protocol: target.protocol,
      hostname: target.hostname,
      port: target.port || (isHttps ? 443 : 80),
      method: req.method,
      path: target.pathname + target.search,
      headers,
    },
    (upstreamRes) => {
      // CORS is added before the upstream status/headers are relayed.
      const outHeaders = stripHopByHopHeaders(upstreamRes.headers);
      withCors(res);
      res.writeHead(upstreamRes.statusCode || 502, outHeaders);
      upstreamRes.pipe(res);
    }
  );
  upstreamReq.on('error', (err) => {
    if (!res.headersSent) {
      withCors(res);
      send(res, 502, { 'content-type': 'text/plain; charset=utf-8' }, `bad_gateway: ${err?.message || err}`);
    } else {
      res.destroy();
    }
  });
  req.pipe(upstreamReq);
}
function isGraphqlPath(pathname) {
  // Both the HTTP and WebSocket entry paths route to the Hasura proxy.
  return ['/graphql', '/graphql-ws'].includes(pathname);
}
function proxyGraphqlWs(req, socket, head) {
  // Proxy a WebSocket upgrade to Hasura by hand: open a raw TCP/TLS socket,
  // replay the upgrade request line + headers, then splice the two sockets.
  // (http.request cannot be used here because hop-by-hop headers like
  // `connection: upgrade` must be forwarded verbatim.)
  const upstreamBase = new URL(HASURA_UPSTREAM);
  const inUrl = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
  const target = new URL(upstreamBase.toString());
  target.pathname = HASURA_GRAPHQL_PATH;
  target.search = inUrl.search;
  const port = Number(target.port || (target.protocol === 'https:' ? 443 : 80));
  const host = target.hostname;
  const connect =
    target.protocol === 'https:'
      ? () => tls.connect({ host, port, servername: host })
      : () => net.connect({ host, port });
  const upstream = connect();
  // Disable Nagle on both legs — WS frames are latency-sensitive.
  upstream.setNoDelay(true);
  socket.setNoDelay(true);
  // For WebSocket upgrades we must forward `connection`/`upgrade` and related headers.
  const headers = { ...req.headers };
  delete headers['content-length'];
  delete headers['content-type'];
  headers.host = target.host;
  // Server-derived DLOB source; client-supplied values are discarded.
  delete headers['x-hasura-dlob-source'];
  headers['x-hasura-dlob-source'] = resolveDlobSource(req);
  // Serialize the HTTP/1.1 upgrade request manually.
  const lines = [];
  lines.push(`GET ${target.pathname + target.search} HTTP/1.1`);
  for (const [k, v] of Object.entries(headers)) {
    if (v == null) continue;
    if (Array.isArray(v)) {
      for (const vv of v) lines.push(`${k}: ${vv}`);
    } else {
      lines.push(`${k}: ${v}`);
    }
  }
  lines.push('', ''); // terminating CRLFCRLF after join
  upstream.write(lines.join('\r\n'));
  // Replay any bytes that arrived with the upgrade before we attached.
  if (head?.length) upstream.write(head);
  // Either side erroring tears down the other; no reconnection attempts.
  upstream.on('error', () => {
    try {
      socket.destroy();
    } catch {
      // ignore
    }
  });
  socket.on('error', () => {
    try {
      upstream.destroy();
    } catch {
      // ignore
    }
  });
  // Bidirectional splice; from here on the sockets are a dumb pipe.
  upstream.pipe(socket);
  socket.pipe(upstream);
}
async function handler(req, res) {
  // Main request router. Route order matters:
  //   /healthz and /graphql(-ws) are served BEFORE the basic-auth gate;
  //   everything after the gate requires basic auth when enabled.
  if (req.method === 'GET' && (req.url === '/healthz' || req.url?.startsWith('/healthz?'))) {
    // Unauthenticated liveness/deploy-info probe.
    send(
      res,
      200,
      { 'content-type': 'application/json; charset=utf-8' },
      JSON.stringify({ ok: true, version: APP_VERSION, buildTimestamp: BUILD_TIMESTAMP, startedAt: STARTED_AT })
    );
    return;
  }
  const url = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
  if (isGraphqlPath(url.pathname)) {
    // CORS preflight needs no auth.
    if (req.method === 'OPTIONS') {
      withCors(res);
      res.statusCode = 204;
      res.end();
      return;
    }
    // Session/header auth (not basic auth) guards GraphQL unless AUTH_MODE is off.
    if (AUTH_MODE !== 'off' && AUTH_MODE !== 'none' && AUTH_MODE !== 'disabled') {
      const user = resolveAuthenticatedUser(req);
      if (!user) {
        withCors(res);
        unauthorized(res);
        return;
      }
    }
    withCors(res);
    proxyGraphqlHttp(req, res);
    return;
  }
  if (req.method === 'GET' && url.pathname === '/whoami') {
    // Debug endpoint: reports the resolved identity and auth mode.
    sendJson(res, 200, { ok: true, user: resolveAuthenticatedUser(req), mode: AUTH_MODE });
    return;
  }
  if (req.method === 'GET' && url.pathname === '/prefs/dlob-source') {
    // Without ?set=…: report the current source. With ?set=…: validate, set
    // the cookie, and 302 back to the (sanitized) redirect target.
    const set = url.searchParams.get('set');
    if (!set) {
      sendJson(res, 200, { ok: true, dlobSource: resolveDlobSource(req) });
      return;
    }
    const ok = setDlobSourceCookie(res, { secure: isHttpsRequest(req), dlobSource: set });
    if (!ok) {
      sendJson(res, 400, { ok: false, error: 'invalid_dlob_source' });
      return;
    }
    res.statusCode = 302;
    res.setHeader('location', safeRedirectPath(url.searchParams.get('redirect') || '/'));
    res.end();
    return;
  }
  if (req.method === 'POST' && url.pathname === '/auth/login') {
    // Login accepts JSON or urlencoded bodies; on success a signed session
    // cookie is issued.
    if (AUTH_MODE === 'off' || AUTH_MODE === 'none' || AUTH_MODE === 'disabled') {
      sendJson(res, 400, { ok: false, error: 'auth_disabled' });
      return;
    }
    const raw = await readBody(req);
    const ct = String(req.headers['content-type'] || '').toLowerCase();
    let username = '';
    let password = '';
    if (ct.includes('application/json')) {
      let json;
      try {
        json = JSON.parse(raw);
      } catch {
        sendJson(res, 400, { ok: false, error: 'bad_json' });
        return;
      }
      username = typeof json?.username === 'string' ? json.username.trim() : '';
      password = typeof json?.password === 'string' ? json.password : '';
    } else {
      const params = new URLSearchParams(raw);
      username = String(params.get('username') || '').trim();
      password = String(params.get('password') || '');
    }
    // Length caps bound the work handed to the htpasswd subprocess.
    if (!username || !password || username.length > 64 || password.length > 200) {
      sendJson(res, 400, { ok: false, error: 'invalid_input' });
      return;
    }
    const ok = verifyWithHtpasswd(username, password);
    if (!ok) {
      unauthorized(res);
      return;
    }
    const secure = isHttpsRequest(req);
    setSessionCookie(res, secure, username);
    sendJson(res, 200, { ok: true, user: username });
    return;
  }
  if ((req.method === 'POST' || req.method === 'GET') && (url.pathname === '/auth/logout' || url.pathname === '/logout')) {
    // GET logout redirects home (for plain links); POST returns JSON.
    clearSessionCookie(res, isHttpsRequest(req));
    if (req.method === 'GET') {
      res.statusCode = 302;
      res.setHeader('location', '/');
      res.end();
      return;
    }
    sendJson(res, 200, { ok: true });
    return;
  }
  // ---- Basic-auth gate: everything below requires credentials when enabled. ----
  if (BASIC_AUTH_ENABLED) {
    let creds;
    try {
      creds = loadBasicAuth();
    } catch (e) {
      send(res, 500, { 'content-type': 'text/plain; charset=utf-8' }, String(e?.message || e));
      return;
    }
    if (!isAuthorized(req, creds)) {
      basicAuthRequired(res);
      return;
    }
  }
  if (req.url?.startsWith('/api') && (req.url === '/api' || req.url.startsWith('/api/'))) {
    // Session/header auth again for the API proxy, then inject the read token.
    if (AUTH_MODE !== 'off' && AUTH_MODE !== 'none' && AUTH_MODE !== 'disabled') {
      const user = resolveAuthenticatedUser(req);
      if (!user) {
        unauthorized(res);
        return;
      }
    }
    let token;
    try {
      token = loadApiReadToken();
    } catch (e) {
      send(res, 500, { 'content-type': 'text/plain; charset=utf-8' }, String(e?.message || e));
      return;
    }
    proxyApi(req, res, token);
    return;
  }
  // Fallback: static assets / SPA.
  serveStatic(req, res);
}
// HTTP server: all routing lives in handler(); rejected promises become a 500
// (or a socket teardown if headers were already sent).
const server = http.createServer((req, res) => {
  handler(req, res).catch((e) => {
    if (res.headersSent) {
      res.destroy();
      return;
    }
    send(res, 500, { 'content-type': 'text/plain; charset=utf-8' }, String(e?.message || e));
  });
});
// WebSocket upgrades: only the GraphQL paths are allowed, gated by the same
// session/header auth as the HTTP GraphQL route, then handed to the raw proxy.
server.on('upgrade', (req, socket, head) => {
  try {
    const url = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
    if (!isGraphqlPath(url.pathname)) {
      socket.destroy();
      return;
    }
    if (AUTH_MODE !== 'off' && AUTH_MODE !== 'none' && AUTH_MODE !== 'disabled') {
      const user = resolveAuthenticatedUser(req);
      if (!user) {
        // Best-effort 401 on the raw socket before closing.
        try {
          socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
        } catch {
          // ignore
        }
        socket.destroy();
        return;
      }
    }
    proxyGraphqlWs(req, socket, head);
  } catch {
    socket.destroy();
  }
});
// Start listening and log the effective (non-secret) configuration as one
// structured JSON line for ops visibility.
server.listen(PORT, () => {
  console.log(
    JSON.stringify(
      {
        service: 'trade-frontend',
        port: PORT,
        staticDir: STATIC_DIR,
        apiUpstream: API_UPSTREAM,
        hasuraUpstream: HASURA_UPSTREAM,
        basicAuthFile: BASIC_AUTH_FILE,
        basicAuthMode: BASIC_AUTH_MODE,
        apiReadTokenFile: API_READ_TOKEN_FILE,
        authUserHeader: AUTH_USER_HEADER,
        authMode: AUTH_MODE,
        htpasswdFile: HTPASSWD_FILE,
      },
      null,
      2
    )
  );
});

View File

@@ -6,5 +6,18 @@ namespace: trade-prod
resources:
- ../../base
patchesStrategicMerge:
- dlob-rpc-endpoint-patch.yaml
- dlob-rpc-server-endpoint-patch.yaml
- frontend-graphql-proxy-patch.yaml
configMapGenerator:
- name: trade-frontend-server-script
files:
- frontend-server.mjs
generatorOptions:
disableNameSuffixHash: true
commonLabels:
env: prod

View File

@@ -66,7 +66,7 @@ function resolveConfig() {
const hasuraAdminSecret = envString('HASURA_ADMIN_SECRET', '');
if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET');
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
const markets = envList('DLOB_MARKETS', 'SOL-PERP,DOGE-PERP,JUP-PERP');
const pollMs = envInt('TICKS_POLL_MS', 1000, { min: 250, max: 60_000 });
const source = envString('TICKS_SOURCE', 'dlob_stats');

View File

@@ -0,0 +1,14 @@
# Strategic-merge patch (diff view): dlob-publisher RPC endpoints over the
# wg0 WireGuard link (10.66.66.1) instead of a public endpoint.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dlob-publisher
spec:
  template:
    spec:
      containers:
        - name: publisher
          env:
            - name: ENDPOINT
              value: "http://10.66.66.1:8899"
            - name: WS_ENDPOINT
              value: "ws://10.66.66.1:8900"

View File

@@ -0,0 +1,14 @@
# Strategic-merge patch (diff view): dlob-server RPC endpoints over the
# wg0 WireGuard link (10.66.66.1) instead of a public endpoint.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dlob-server
spec:
  template:
    spec:
      containers:
        - name: server
          env:
            - name: ENDPOINT
              value: "http://10.66.66.1:8899"
            - name: WS_ENDPOINT
              value: "ws://10.66.66.1:8900"

View File

@@ -1,16 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: dlob-worker
spec:
template:
spec:
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
containers:
- name: worker
env:
- name: DLOB_HTTP_URL
value: https://dlob.drift.trade
- name: DLOB_FORCE_IPV6
value: "true"

View File

@@ -37,6 +37,8 @@ const AUTH_SESSION_COOKIE = String(process.env.AUTH_SESSION_COOKIE || 'trade_ses
.trim()
.toLowerCase();
const AUTH_SESSION_TTL_SECONDS = Number.parseInt(process.env.AUTH_SESSION_TTL_SECONDS || '43200', 10); // 12h
const DLOB_SOURCE_COOKIE = String(process.env.DLOB_SOURCE_COOKIE || 'trade_dlob_source').trim() || 'trade_dlob_source';
const DLOB_SOURCE_DEFAULT = String(process.env.DLOB_SOURCE_DEFAULT || 'mevnode').trim() || 'mevnode';
function readJson(filePath) {
const raw = fs.readFileSync(filePath, 'utf8');
@@ -143,6 +145,29 @@ function safePathFromUrlPath(urlPath) {
return normalized;
}
function injectIndexHtml(html, { dlobSource, redirectPath }) {
const src = normalizeDlobSource(dlobSource) || 'mevnode';
const redirect = safeRedirectPath(redirectPath);
const hrefBase = `/prefs/dlob-source?redirect=${encodeURIComponent(redirect)}&set=`;
const styleActive = 'font-weight:700;text-decoration:underline;';
const styleInactive = 'font-weight:400;text-decoration:none;';
const snippet = `
<!-- trade: dlob source switch -->
<div style="position:fixed;right:12px;bottom:12px;z-index:2147483647;background:rgba(0,0,0,0.72);color:#fff;padding:8px 10px;border-radius:10px;font:12px/1.2 system-ui,-apple-system,Segoe UI,Roboto,sans-serif;backdrop-filter:blur(6px);">
<span style="opacity:0.85;margin-right:6px;">DLOB</span>
<a href="${hrefBase}mevnode" style="color:#fff;${src === 'mevnode' ? styleActive : styleInactive}">mevnode</a>
<span style="opacity:0.6;margin:0 6px;">|</span>
<a href="${hrefBase}drift" style="color:#fff;${src === 'drift' ? styleActive : styleInactive}">drift</a>
</div>
`;
const bodyClose = /<\/body>/i;
if (bodyClose.test(html)) return html.replace(bodyClose, `${snippet}</body>`);
return `${html}\n${snippet}\n`;
}
function serveStatic(req, res) {
if (req.method !== 'GET' && req.method !== 'HEAD') {
send(res, 405, { 'content-type': 'text/plain; charset=utf-8' }, 'method_not_allowed');
@@ -171,6 +196,15 @@ function serveStatic(req, res) {
res.setHeader('content-type', contentTypeFor(filePath));
res.setHeader('cache-control', filePath.endsWith('index.html') ? 'no-cache' : 'public, max-age=31536000');
if (req.method === 'HEAD') return void res.end();
if (filePath.endsWith('index.html')) {
const html = fs.readFileSync(filePath, 'utf8');
const injected = injectIndexHtml(html, {
dlobSource: resolveDlobSource(req),
redirectPath: url.pathname + url.search,
});
res.end(injected);
return true;
}
fs.createReadStream(filePath).pipe(res);
return true;
} catch {
@@ -222,6 +256,43 @@ function readCookie(req, name) {
return null;
}
function normalizeDlobSource(value) {
const v = String(value ?? '')
.trim()
.toLowerCase();
if (v === 'mevnode') return 'mevnode';
if (v === 'drift') return 'drift';
return null;
}
function resolveDlobSource(req) {
const fromCookie = normalizeDlobSource(readCookie(req, DLOB_SOURCE_COOKIE));
if (fromCookie) return fromCookie;
return normalizeDlobSource(DLOB_SOURCE_DEFAULT) || 'mevnode';
}
function safeRedirectPath(value) {
const s = String(value ?? '').trim();
if (!s.startsWith('/')) return '/';
if (s.startsWith('//')) return '/';
return s.replace(/\r|\n/g, '');
}
function setDlobSourceCookie(res, { secure, dlobSource }) {
const src = normalizeDlobSource(dlobSource);
if (!src) return false;
const parts = [
`${DLOB_SOURCE_COOKIE}=${src}`,
'Path=/',
'SameSite=Lax',
'HttpOnly',
'Max-Age=31536000',
];
if (secure) parts.push('Secure');
res.setHeader('set-cookie', parts.join('; '));
return true;
}
function resolveAuthUser(req) {
const user = readHeader(req, AUTH_USER_HEADER) || readHeader(req, 'x-webauth-user');
const value = typeof user === 'string' ? user.trim() : '';
@@ -423,7 +494,7 @@ function withCors(res) {
res.setHeader('access-control-allow-methods', 'GET,POST,OPTIONS');
res.setHeader(
'access-control-allow-headers',
'content-type, authorization, x-hasura-admin-secret, x-hasura-role, x-hasura-user-id'
'content-type, authorization, x-hasura-admin-secret, x-hasura-role, x-hasura-user-id, x-hasura-dlob-source'
);
}
@@ -440,6 +511,8 @@ function proxyGraphqlHttp(req, res) {
const headers = stripHopByHopHeaders(req.headers);
headers.host = target.host;
delete headers['x-hasura-dlob-source'];
headers['x-hasura-dlob-source'] = resolveDlobSource(req);
const upstreamReq = lib.request(
{
@@ -499,6 +572,8 @@ function proxyGraphqlWs(req, socket, head) {
delete headers['content-length'];
delete headers['content-type'];
headers.host = target.host;
delete headers['x-hasura-dlob-source'];
headers['x-hasura-dlob-source'] = resolveDlobSource(req);
const lines = [];
lines.push(`GET ${target.pathname + target.search} HTTP/1.1`);
@@ -570,6 +645,25 @@ async function handler(req, res) {
return;
}
if (req.method === 'GET' && url.pathname === '/prefs/dlob-source') {
const set = url.searchParams.get('set');
if (!set) {
sendJson(res, 200, { ok: true, dlobSource: resolveDlobSource(req) });
return;
}
const ok = setDlobSourceCookie(res, { secure: isHttpsRequest(req), dlobSource: set });
if (!ok) {
sendJson(res, 400, { ok: false, error: 'invalid_dlob_source' });
return;
}
res.statusCode = 302;
res.setHeader('location', safeRedirectPath(url.searchParams.get('redirect') || '/'));
res.end();
return;
}
if (req.method === 'POST' && url.pathname === '/auth/login') {
if (AUTH_MODE === 'off' || AUTH_MODE === 'none' || AUTH_MODE === 'disabled') {
sendJson(res, 400, { ok: false, error: 'auth_disabled' });

View File

@@ -18,7 +18,7 @@ spec:
name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET
- name: DLOB_MARKETS
value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP
value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: TICKS_POLL_MS
value: "1000"
- name: TICKS_SOURCE

View File

@@ -10,11 +10,12 @@ resources:
- frontend-ingress-root.yaml
patchesStrategicMerge:
- dlob-rpc-endpoint-patch.yaml
- dlob-rpc-server-endpoint-patch.yaml
- hasura-patch.yaml
- frontend-auth-patch.yaml
- frontend-graphql-proxy-patch.yaml
- ingestor-dlob-patch.yaml
- dlob-worker-patch.yaml
configMapGenerator:
- name: trade-dlob-ingestor-script