Compare commits

...

43 Commits

Author SHA1 Message Date
mpabi
59c3f3ee06 feat(dlob): pin markets and wire mevnode endpoints
- Limit DLOB workers/ingestors to SOL-PERP, DOGE-PERP, JUP-PERP across base and staging config.
- Set publisher market ids to [0,7,24] for drift protocol.
- Add overlay patches for dlob-publisher and dlob-server to use wg0 RPC endpoints 10.66.66.1:8899/8900 in staging and prod.
- Extend Agave dashboard and add PrometheusRules for RPC up/lag/I/O alerts.
- Ensure overlays reference new patches for automated ArgoCD rollouts.
2026-02-15 00:40:50 +01:00
mpabi
9c4c3096d7 chore(grafana): set default time range to now 2026-02-14 14:08:11 +01:00
mpabi
b02bd6b66c feat(grafana): expand agave dashboard with node + geyser stats 2026-02-14 14:01:34 +01:00
mpabi
890ac4c86e feat(monitoring): add mpabi node-exporter scrape and agave dashboard 2026-02-14 13:43:26 +01:00
u1
9c6e974d3a fix(monitoring): add fallback_scrape_protocol for geyser metrics (missing Content-Type) 2026-02-14 11:52:50 +00:00
u1
d1dc32d3bb chore(monitoring): delete traefik basic auth secret 2026-02-14 11:28:40 +00:00
u1
a6a0accd6a chore(monitoring): remove basic auth middleware from prometheus ingressroute 2026-02-14 11:28:39 +00:00
u1
3b8b1f5492 chore(monitoring): delete traefik basic auth middleware 2026-02-14 11:28:38 +00:00
u1
0c80a08732 chore(monitoring): remove basic auth middleware from grafana ingressroute 2026-02-14 11:28:38 +00:00
u1
8b72e62621 chore(monitoring): remove traefik basic auth from monitoring extras 2026-02-14 11:28:37 +00:00
u1
ff7a4b69cd fix(monitoring): disable prometheus-operator admission webhooks for GitOps stability 2026-02-14 11:04:31 +00:00
u1
e7f9594381 fix(monitoring): use Replace sync option to avoid CRD patch annotation size issues 2026-02-14 11:01:03 +00:00
u1
0c0f219d02 fix(monitoring): skip CRDs in ArgoCD helm rendering 2026-02-14 11:00:04 +00:00
u1
9cf0ed84d9 fix(monitoring): disable CRD management in ArgoCD (avoid annotation size limit) 2026-02-14 10:59:32 +00:00
u1
2702edce22 fix(monitoring): keep monitoring-stack as helm release name (avoid duplicate install) 2026-02-14 10:58:11 +00:00
u1
c33533fcd6 fix(monitoring): route prometheus-http to monitoring-stack service 2026-02-14 10:58:10 +00:00
u1
32eb047551 fix(monitoring): route prometheus to monitoring-stack service 2026-02-14 10:58:09 +00:00
u1
d27d64e407 fix(monitoring): route grafana-http to monitoring-stack service 2026-02-14 10:58:08 +00:00
u1
fa6893aa98 fix(monitoring): route grafana to monitoring-stack service 2026-02-14 10:58:08 +00:00
u1
77a8265b40 fix(monitoring): set helm releaseName to reuse existing kube-prometheus-stack resources 2026-02-14 10:55:36 +00:00
u1
c692f8d653 fix(monitoring): use server-side apply to avoid CRD annotation size limit 2026-02-14 10:54:48 +00:00
u1
47096c9877 feat(monitoring): add ArgoCD app for monitoring ingress/certs/auth 2026-02-14 10:53:55 +00:00
u1
0104532e73 feat(monitoring): add ArgoCD app for kube-prometheus-stack 2026-02-14 10:53:55 +00:00
u1
19e7e48190 feat(monitoring): add prometheus ingressroute http->https redirect 2026-02-14 10:53:39 +00:00
u1
7ef3ffe62c feat(monitoring): add prometheus ingressroute (https + basic auth) 2026-02-14 10:53:38 +00:00
u1
1853ef6452 feat(monitoring): add grafana ingressroute http->https redirect 2026-02-14 10:53:38 +00:00
u1
34ef9490a4 feat(monitoring): add grafana ingressroute (https + basic auth) 2026-02-14 10:53:37 +00:00
u1
f3bc3da9bb feat(monitoring): add TLS certificate for grafana/prometheus 2026-02-14 10:53:35 +00:00
u1
f797234abd feat(monitoring): add traefik redirect-to-https middleware 2026-02-14 10:53:35 +00:00
u1
c95a4286fb feat(monitoring): add basic auth secret for traefik 2026-02-14 10:53:34 +00:00
u1
b72f281651 feat(monitoring): add traefik basic auth middleware 2026-02-14 10:53:34 +00:00
u1
98912c5b03 feat(monitoring): add kustomize scaffold for monitoring extras 2026-02-14 10:53:05 +00:00
u1
28876fa1d2 fix(dlob): relax dlob-publisher probes 2026-02-13 12:04:13 +01:00
u1
5f46d26037 feat(frontend): per-user DLOB source header
- Add cookie-based source selector (mevnode|drift)
- Proxy sets x-hasura-dlob-source for HTTP + WS
- Include same proxy script mount in prod overlay
2026-02-13 11:33:13 +01:00
u1
57433c7e75 feat(dlob): support two sources + per-user switch
- Add "source" column + composite PKs for DLOB tables
- Filter public Hasura selects by X-Hasura-Dlob-Source
- Run parallel workers for mevnode + dlob.drift.trade
- Frontend proxy sets x-hasura-dlob-source from cookie and injects UI switch
2026-02-13 11:25:32 +01:00
u1
9e7d7b88ac feat(candles): support oracle OHLC basis 2026-02-03 12:51:29 +01:00
u1
bd88eaa3c8 fix(api): keep candle open continuity 2026-02-03 10:27:54 +01:00
u1
cd4cbec7e0 fix(api): fill-forward missing candle buckets 2026-02-02 23:14:18 +01:00
u1
144f6e7c86 fix(db): fill-forward candles buckets 2026-02-02 23:05:04 +01:00
u1
ef8f7cbeaa fix(candles): serve chart from cache and stabilize any-source 2026-02-02 22:28:30 +01:00
codex
507da3165f chore(frontend): bump image sha-b06fe7f 2026-02-01 22:00:55 +01:00
codex
f41792a0fa fix(candles-cache): cap per-tf window to 1024 2026-02-01 19:07:49 +01:00
codex
e16a02453e fix(staging): hoist ensurePublicSelectTable in hasura bootstrap 2026-02-01 18:49:00 +01:00
42 changed files with 2347 additions and 118 deletions

View File

@@ -0,0 +1,20 @@
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
name: monitoring-extras
namespace: argocd
spec:
project: default
source:
repoURL: https://gitea.mpabi.pl/trade/trade-deploy.git
targetRevision: main
path: kustomize/infra/monitoring-extras
destination:
server: https://kubernetes.default.svc
namespace: monitoring
syncPolicy:
automated:
prune: true
selfHeal: true
syncOptions:
- CreateNamespace=true

View File

@@ -0,0 +1,50 @@
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
name: monitoring-stack
namespace: argocd
spec:
project: default
source:
repoURL: https://prometheus-community.github.io/helm-charts
chart: kube-prometheus-stack
targetRevision: 81.6.9
helm:
skipCrds: true
values: |
grafana:
enabled: true
prometheus:
prometheusSpec:
scrapeInterval: 15s
evaluationInterval: 15s
additionalScrapeConfigs:
- job_name: mpabi-yellowstone-geyser
metrics_path: /metrics
scrape_interval: 10s
fallback_scrape_protocol: PrometheusText0.0.4
static_configs:
- targets:
- 10.66.66.1:8999
- job_name: mpabi-node-exporter
metrics_path: /metrics
scrape_interval: 15s
static_configs:
- targets:
- 10.66.66.1:9100
prometheusOperator:
admissionWebhooks:
enabled: false
patch:
enabled: false
destination:
server: https://kubernetes.default.svc
namespace: monitoring
syncPolicy:
automated:
prune: true
selfHeal: true
syncOptions:
- CreateNamespace=true
- Replace=true
- ServerSideApply=true

View File

@@ -755,6 +755,70 @@ function parseTimeframeToSeconds(tf) {
return num * mult; return num * mult;
} }
function fillForwardCandles(candles, { bucketSeconds, limit, nowSec }) {
if (!Array.isArray(candles) || candles.length === 0) return [];
if (!Number.isFinite(bucketSeconds) || bucketSeconds <= 0) return candles;
if (!Number.isFinite(limit) || limit <= 0) return candles;
// `candles` should be ascending by time.
const cleaned = candles
.filter((c) => c && Number.isFinite(c.time) && Number.isFinite(c.close))
.slice()
.sort((a, b) => a.time - b.time);
if (cleaned.length === 0) return [];
const map = new Map(cleaned.map((c) => [c.time, c]));
const lastDataTime = cleaned[cleaned.length - 1].time;
const now = Number.isFinite(nowSec) ? nowSec : Math.floor(Date.now() / 1000);
const alignedNow = Math.floor(now / bucketSeconds) * bucketSeconds;
const endTime = Math.max(alignedNow, lastDataTime);
const startTime = endTime - bucketSeconds * (limit - 1);
const baseline = cleaned[0];
const out = [];
out.length = limit;
let prev = null;
for (let i = 0; i < limit; i += 1) {
const t = startTime + i * bucketSeconds;
const hit = map.get(t);
if (hit) {
const c = { ...hit };
c.volume = Number.isFinite(c.volume) ? c.volume : 0;
// Keep continuity: next candle opens where previous candle closed.
// This avoids visual "gaps" when ticks are sparse.
if (prev && Number.isFinite(prev.close)) {
const prevClose = Number(prev.close);
c.open = prevClose;
c.high = Math.max(Number(c.high), prevClose, Number(c.close));
c.low = Math.min(Number(c.low), prevClose, Number(c.close));
}
out[i] = c;
prev = c;
continue;
}
const base = prev || baseline;
const close = Number(base.close);
const oracle = base.oracle == null ? null : Number(base.oracle);
const filled = {
time: t,
open: close,
high: close,
low: close,
close,
volume: 0,
oracle: Number.isFinite(oracle) ? oracle : null,
};
out[i] = filled;
prev = filled;
}
return out.filter((c) => c && Number.isFinite(c.time) && Number.isFinite(c.open) && Number.isFinite(c.close));
}
function pickFlowPointBucketSeconds(bucketSeconds, rowsPerCandle) { function pickFlowPointBucketSeconds(bucketSeconds, rowsPerCandle) {
// We want a point step that is: // We want a point step that is:
// - small enough to capture intra-candle direction, // - small enough to capture intra-candle direction,
@@ -942,6 +1006,7 @@ async function handler(cfg, req, res) {
const symbol = (url.searchParams.get('symbol') || '').trim(); const symbol = (url.searchParams.get('symbol') || '').trim();
const source = (url.searchParams.get('source') || '').trim(); const source = (url.searchParams.get('source') || '').trim();
const basisRaw = (url.searchParams.get('basis') || '').trim().toLowerCase();
const tf = (url.searchParams.get('tf') || url.searchParams.get('timeframe') || '1m').trim(); const tf = (url.searchParams.get('tf') || url.searchParams.get('timeframe') || '1m').trim();
const limit = clampInt(url.searchParams.get('limit') || '300', 10, 2000); const limit = clampInt(url.searchParams.get('limit') || '300', 10, 2000);
@@ -959,6 +1024,11 @@ async function handler(cfg, req, res) {
} }
const sourceKey = source || ''; const sourceKey = source || '';
const basis = basisRaw === 'mark' ? 'mark' : basisRaw === 'oracle' || !basisRaw ? 'oracle' : null;
if (!basis) {
sendJson(res, 400, { ok: false, error: 'invalid_basis' }, cfg.corsOrigin);
return;
}
try { try {
// Cache-first: read precomputed candles from `drift_candles_cache`. // Cache-first: read precomputed candles from `drift_candles_cache`.
@@ -975,6 +1045,9 @@ async function handler(cfg, req, res) {
high high
low low
close close
oracle_open
oracle_high
oracle_low
oracle_close oracle_close
ticks ticks
} }
@@ -1000,6 +1073,9 @@ async function handler(cfg, req, res) {
high high
low low
close close
oracle_open
oracle_high
oracle_low
oracle_close oracle_close
ticks ticks
} }
@@ -1014,24 +1090,41 @@ async function handler(cfg, req, res) {
rows = data?.[fn] || []; rows = data?.[fn] || [];
} }
const candles = rows const nowSec = Math.floor(Date.now() / 1000);
let candles = rows
.slice() .slice()
.reverse() .reverse()
.map((r) => { .map((r) => {
const time = Math.floor(Date.parse(r.bucket) / 1000); const time = Math.floor(Date.parse(r.bucket) / 1000);
const open = Number(r.open); const oracleClose = r.oracle_close == null ? null : Number(r.oracle_close);
const high = Number(r.high); const oracleOpen = r.oracle_open == null ? oracleClose : Number(r.oracle_open);
const low = Number(r.low); const oracleHigh = r.oracle_high == null ? oracleClose : Number(r.oracle_high);
const close = Number(r.close); const oracleLow = r.oracle_low == null ? oracleClose : Number(r.oracle_low);
const oracle = r.oracle_close == null ? null : Number(r.oracle_close);
const markOpen = Number(r.open);
const markHigh = Number(r.high);
const markLow = Number(r.low);
const markClose = Number(r.close);
const open = basis === 'oracle' ? oracleOpen : markOpen;
const high = basis === 'oracle' ? oracleHigh : markHigh;
const low = basis === 'oracle' ? oracleLow : markLow;
const close = basis === 'oracle' ? oracleClose : markClose;
// Always expose oracle close (even if basis=mark).
const oracle = oracleClose;
const volume = Number(r.ticks || 0); const volume = Number(r.ticks || 0);
return { time, open, high, low, close, volume, oracle }; return { time, open, high, low, close, volume, oracle };
}) })
.filter((c) => Number.isFinite(c.time) && Number.isFinite(c.open) && Number.isFinite(c.close)); .filter((c) => Number.isFinite(c.time) && Number.isFinite(c.open) && Number.isFinite(c.close));
// Make candles continuous in time: if no tick happened in a bucket, emit a flat candle using last close.
// This keeps the chart stable for 1s/3s/... views and makes timeframe switching instant (cache + no gaps).
candles = fillForwardCandles(candles, { bucketSeconds, limit, nowSec });
// Flow = share of time spent moving up/down/flat inside each bucket. // Flow = share of time spent moving up/down/flat inside each bucket.
// Used by the UI to render stacked volume bars describing microstructure. // Used by the UI to render stacked volume bars describing microstructure.
const nowSec = Math.floor(Date.now() / 1000);
const windowSeconds = bucketSeconds * candles.length; const windowSeconds = bucketSeconds * candles.length;
const canComputeFlow = candles.length > 0 && windowSeconds > 0 && windowSeconds <= 86_400; // cap at 24h const canComputeFlow = candles.length > 0 && windowSeconds > 0 && windowSeconds <= 86_400; // cap at 24h
const rowsPerCandle = Math.min(60, Math.max(12, Math.floor(bucketSeconds))); const rowsPerCandle = Math.min(60, Math.max(12, Math.floor(bucketSeconds)));
@@ -1062,6 +1155,7 @@ async function handler(cfg, req, res) {
) { ) {
bucket bucket
close close
oracle_close
} }
} }
`; `;
@@ -1082,7 +1176,10 @@ async function handler(cfg, req, res) {
for (const r of ptsRows) { for (const r of ptsRows) {
const t = tsToUnixSeconds(r.bucket); const t = tsToUnixSeconds(r.bucket);
if (t == null) continue; if (t == null) continue;
const p = parseNumeric(r.close); const p =
basis === 'oracle'
? parseNumeric(r.oracle_close) ?? parseNumeric(r.close)
: parseNumeric(r.close) ?? parseNumeric(r.oracle_close);
if (p == null) continue; if (p == null) continue;
const idx = Math.floor((t - firstStart) / bucketSeconds); const idx = Math.floor((t - firstStart) / bucketSeconds);
const start = firstStart + idx * bucketSeconds; const start = firstStart + idx * bucketSeconds;
@@ -1153,6 +1250,7 @@ async function handler(cfg, req, res) {
candlesFunction: cfg.candlesFunction, candlesFunction: cfg.candlesFunction,
symbol, symbol,
source: source || null, source: source || null,
basis,
tf, tf,
bucketSeconds, bucketSeconds,
candles, candles,

View File

@@ -144,7 +144,14 @@ function chunkSecondsForBucket(bucketSeconds) {
function sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso, toIso }) { function sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso, toIso }) {
return ` return `
WITH base AS ( WITH chosen AS (
SELECT source AS chosen_source
FROM public.drift_ticks
WHERE symbol = ${sqlLit(symbol)}
ORDER BY ts DESC
LIMIT 1
),
base AS (
SELECT SELECT
time_bucket(make_interval(secs => ${bucketSeconds}), ts) AS bucket, time_bucket(make_interval(secs => ${bucketSeconds}), ts) AS bucket,
ts, ts,
@@ -154,7 +161,10 @@ function sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso,
WHERE symbol = ${sqlLit(symbol)} WHERE symbol = ${sqlLit(symbol)}
AND ts >= ${sqlLit(fromIso)}::timestamptz AND ts >= ${sqlLit(fromIso)}::timestamptz
AND ts < ${sqlLit(toIso)}::timestamptz AND ts < ${sqlLit(toIso)}::timestamptz
AND (${sqlLit(sourceKey)} = '' OR source = ${sqlLit(sourceKey)}) AND (
${sqlLit(sourceKey)} <> '' AND source = ${sqlLit(sourceKey)}
OR ${sqlLit(sourceKey)} = '' AND source = COALESCE((SELECT chosen_source FROM chosen), source)
)
), ),
agg AS ( agg AS (
SELECT SELECT
@@ -163,33 +173,92 @@ function sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso,
max(px) AS high, max(px) AS high,
min(px) AS low, min(px) AS low,
(array_agg(px ORDER BY ts DESC))[1] AS close, (array_agg(px ORDER BY ts DESC))[1] AS close,
(array_agg(oracle_px ORDER BY ts ASC))[1] AS oracle_open,
max(oracle_px) AS oracle_high,
min(oracle_px) AS oracle_low,
(array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close, (array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close,
count(*)::bigint AS ticks count(*)::bigint AS ticks
FROM base FROM base
GROUP BY bucket GROUP BY bucket
) )
INSERT INTO public.drift_candles_cache INSERT INTO public.drift_candles_cache
(bucket, bucket_seconds, symbol, source, open, high, low, close, oracle_close, ticks, updated_at) (
bucket,
bucket_seconds,
symbol,
source,
open,
high,
low,
close,
oracle_open,
oracle_high,
oracle_low,
oracle_close,
ticks,
updated_at
)
SELECT SELECT
bucket, ${bucketSeconds}, ${sqlLit(symbol)}, ${sqlLit(sourceKey)}, open, high, low, close, oracle_close, ticks, now() bucket,
${bucketSeconds},
${sqlLit(symbol)},
${sqlLit(sourceKey)},
open,
high,
low,
close,
oracle_open,
oracle_high,
oracle_low,
oracle_close,
ticks,
now()
FROM agg FROM agg
ON CONFLICT (bucket, bucket_seconds, symbol, source) DO UPDATE SET ON CONFLICT (bucket, bucket_seconds, symbol, source) DO UPDATE SET
open = EXCLUDED.open, open = EXCLUDED.open,
high = EXCLUDED.high, high = EXCLUDED.high,
low = EXCLUDED.low, low = EXCLUDED.low,
close = EXCLUDED.close, close = EXCLUDED.close,
oracle_open = EXCLUDED.oracle_open,
oracle_high = EXCLUDED.oracle_high,
oracle_low = EXCLUDED.oracle_low,
oracle_close = EXCLUDED.oracle_close, oracle_close = EXCLUDED.oracle_close,
ticks = EXCLUDED.ticks, ticks = EXCLUDED.ticks,
updated_at = now(); updated_at = now();
`; `;
} }
function sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds, cutoffIso }) {
return `
DELETE FROM public.drift_candles_cache
WHERE symbol = ${sqlLit(symbol)}
AND source = ${sqlLit(sourceKey)}
AND bucket_seconds = ${bucketSeconds}
AND bucket < ${sqlLit(cutoffIso)}::timestamptz;
`;
}
async function getTickRange(cfg, { symbol, sourceKey }) { async function getTickRange(cfg, { symbol, sourceKey }) {
const sql = ` const sql =
String(sourceKey) === ''
? `
WITH chosen AS (
SELECT source AS chosen_source
FROM public.drift_ticks
WHERE symbol=${sqlLit(symbol)}
ORDER BY ts DESC
LIMIT 1
)
SELECT min(ts) AS min_ts, max(ts) AS max_ts SELECT min(ts) AS min_ts, max(ts) AS max_ts
FROM public.drift_ticks FROM public.drift_ticks
WHERE symbol=${sqlLit(symbol)} WHERE symbol=${sqlLit(symbol)}
AND (${sqlLit(sourceKey)} = '' OR source = ${sqlLit(sourceKey)}); AND source = COALESCE((SELECT chosen_source FROM chosen), source);
`
: `
SELECT min(ts) AS min_ts, max(ts) AS max_ts
FROM public.drift_ticks
WHERE symbol=${sqlLit(symbol)}
AND source = ${sqlLit(sourceKey)};
`; `;
const out = await hasuraRunSql(cfg, sql, { readOnly: true }); const out = await hasuraRunSql(cfg, sql, { readOnly: true });
const row = Array.isArray(out?.result) && out.result.length >= 2 ? out.result[1] : null; const row = Array.isArray(out?.result) && out.result.length >= 2 ? out.result[1] : null;
@@ -219,14 +288,12 @@ function safetyWindowSeconds(bucketSeconds) {
return 2 * 24 * 60 * 60; return 2 * 24 * 60 * 60;
} }
async function backfill(cfg, { symbol, sourceKey, fromIso, toIso }) { async function backfillBucket(cfg, { symbol, sourceKey, bucketSeconds, fromIso, toIso }) {
for (const bs of cfg.bucketSecondsList) { const chunk = chunkSecondsForBucket(bucketSeconds);
const chunk = chunkSecondsForBucket(bs);
for (let t = Date.parse(fromIso); t < Date.parse(toIso); t += chunk * 1000) { for (let t = Date.parse(fromIso); t < Date.parse(toIso); t += chunk * 1000) {
const a = new Date(t).toISOString(); const a = new Date(t).toISOString();
const b = new Date(Math.min(Date.parse(toIso), t + chunk * 1000)).toISOString(); const b = new Date(Math.min(Date.parse(toIso), t + chunk * 1000)).toISOString();
await hasuraRunSql(cfg, sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds: bs, fromIso: a, toIso: b })); await hasuraRunSql(cfg, sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso: a, toIso: b }));
}
} }
} }
@@ -270,22 +337,27 @@ async function main() {
const range = await getTickRange(cfg, { symbol, sourceKey }); const range = await getTickRange(cfg, { symbol, sourceKey });
if (!range.maxTs) continue; if (!range.maxTs) continue;
const toIso = new Date(Date.parse(range.maxTs)).toISOString(); const toIso = new Date(Date.parse(range.maxTs)).toISOString();
const maxBs = cfg.bucketSecondsList[cfg.bucketSecondsList.length - 1] || 60; for (const bs of cfg.bucketSecondsList) {
const fromIso = desiredFromIso({ const fromIso = desiredFromIso({
minTsIso: range.minTs, minTsIso: range.minTs,
maxTsIso: toIso, maxTsIso: toIso,
bucketSeconds: maxBs, bucketSeconds: bs,
targetPoints: cfg.targetPoints, targetPoints: cfg.targetPoints,
backfillDays: cfg.backfillDays, backfillDays: cfg.backfillDays,
}); });
console.log( console.log(
`[candles-cache-worker] warmup symbol=${symbol} source=${sourceKey || '(any)'} from=${fromIso} to=${toIso} points=${cfg.targetPoints}` `[candles-cache-worker] warmup symbol=${symbol} source=${sourceKey || '(any)'} bs=${bs}s from=${fromIso} to=${toIso} points=${cfg.targetPoints}`
); );
try { try {
await backfill(cfg, { symbol, sourceKey, fromIso, toIso }); await backfillBucket(cfg, { symbol, sourceKey, bucketSeconds: bs, fromIso, toIso });
// Enforce max window for this bucket (derived data; safe to prune).
await hasuraRunSql(cfg, sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds: bs, cutoffIso: fromIso }));
} catch (err) { } catch (err) {
console.error(`[candles-cache-worker] warmup failed (${symbol}/${sourceKey || 'any'}): ${String(err?.message || err)}`); console.error(
`[candles-cache-worker] warmup failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}`
);
}
} }
} }
} }
@@ -309,6 +381,10 @@ async function main() {
while (true) { while (true) {
const loopNow = Date.now(); const loopNow = Date.now();
const loopIso = new Date(loopNow).toISOString(); const loopIso = new Date(loopNow).toISOString();
const pruneEveryMs = 60_000;
const lastPruneAt = last.__pruneAt || new Map();
// stash on the Map to keep closure-local without introducing a new outer var
last.__pruneAt = lastPruneAt;
for (const symbol of cfg.symbols) { for (const symbol of cfg.symbols) {
for (const sourceKey of cfg.sources) { for (const sourceKey of cfg.sources) {
@@ -325,6 +401,17 @@ async function main() {
); );
// best-effort: move last pointer close to now (actual max will lag by at most one bucket) // best-effort: move last pointer close to now (actual max will lag by at most one bucket)
last.set(k, loopIso); last.set(k, loopIso);
const prevPrune = lastPruneAt.get(k) || 0;
if (loopNow - prevPrune >= pruneEveryMs) {
const keepSeconds = Math.max(cfg.targetPoints * bs, (cfg.backfillDays > 0 ? cfg.backfillDays * 86400 : 0));
const cutoffIso = new Date(Math.max(0, loopNow - keepSeconds * 1000)).toISOString();
try {
await hasuraRunSql(cfg, sqlDeleteOlderCandles({ symbol, sourceKey, bucketSeconds: bs, cutoffIso }));
} finally {
lastPruneAt.set(k, loopNow);
}
}
} catch (err) { } catch (err) {
console.error( console.error(
`[candles-cache-worker] update failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}` `[candles-cache-worker] update failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}`

View File

@@ -0,0 +1,50 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: dlob-depth-worker-drift
annotations:
argocd.argoproj.io/sync-wave: "6"
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: dlob-depth-worker-drift
template:
metadata:
labels:
app.kubernetes.io/name: dlob-depth-worker-drift
spec:
containers:
- name: worker
image: node:20-slim
imagePullPolicy: IfNotPresent
env:
- name: HASURA_GRAPHQL_URL
value: http://hasura:8080/v1/graphql
- name: HASURA_ADMIN_SECRET
valueFrom:
secretKeyRef:
name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET
- name: DLOB_SOURCE
value: drift
- name: DLOB_MARKETS
value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: DLOB_POLL_MS
value: "1000"
- name: DLOB_DEPTH_BPS_BANDS
value: "5,10,20,50,100,200"
- name: PRICE_PRECISION
value: "1000000"
- name: BASE_PRECISION
value: "1000000000"
command: ["node", "/app/worker.mjs"]
volumeMounts:
- name: script
mountPath: /app/worker.mjs
subPath: worker.mjs
readOnly: true
volumes:
- name: script
configMap:
name: dlob-depth-worker-script

View File

@@ -26,8 +26,10 @@ spec:
secretKeyRef: secretKeyRef:
name: trade-hasura name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET key: HASURA_GRAPHQL_ADMIN_SECRET
- name: DLOB_SOURCE
value: mevnode
- name: DLOB_MARKETS - name: DLOB_MARKETS
value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: DLOB_POLL_MS - name: DLOB_POLL_MS
value: "1000" value: "1000"
- name: DLOB_DEPTH_BPS_BANDS - name: DLOB_DEPTH_BPS_BANDS

View File

@@ -64,7 +64,8 @@ function resolveConfig() {
const hasuraAdminSecret = process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || undefined; const hasuraAdminSecret = process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || undefined;
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined; const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'); const dlobSource = String(process.env.DLOB_SOURCE || 'mevnode').trim() || 'mevnode';
const markets = envList('DLOB_MARKETS', 'SOL-PERP,DOGE-PERP,JUP-PERP');
const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000); const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
const bandsBps = envIntList('DLOB_DEPTH_BPS_BANDS', '5,10,20,50,100,200'); const bandsBps = envIntList('DLOB_DEPTH_BPS_BANDS', '5,10,20,50,100,200');
@@ -79,6 +80,7 @@ function resolveConfig() {
hasuraUrl, hasuraUrl,
hasuraAdminSecret, hasuraAdminSecret,
hasuraAuthToken, hasuraAuthToken,
dlobSource,
markets, markets,
pollMs, pollMs,
bandsBps, bandsBps,
@@ -169,8 +171,9 @@ function computeBandDepth({ bids, asks, mid, bandBps }) {
async function fetchL2Latest(cfg) { async function fetchL2Latest(cfg) {
const query = ` const query = `
query DlobL2Latest($markets: [String!]!) { query DlobL2Latest($source: String!, $markets: [String!]!) {
dlob_l2_latest(where: {market_name: {_in: $markets}}) { dlob_l2_latest(where: {source: {_eq: $source}, market_name: {_in: $markets}}) {
source
market_name market_name
market_type market_type
market_index market_index
@@ -186,7 +189,7 @@ async function fetchL2Latest(cfg) {
} }
} }
`; `;
const data = await graphqlRequest(cfg, query, { markets: cfg.markets }); const data = await graphqlRequest(cfg, query, { source: cfg.dlobSource, markets: cfg.markets });
return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : []; return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
} }
@@ -232,6 +235,7 @@ async function main() {
startedAt: getIsoNow(), startedAt: getIsoNow(),
hasuraUrl: cfg.hasuraUrl, hasuraUrl: cfg.hasuraUrl,
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none', hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
dlobSource: cfg.dlobSource,
markets: cfg.markets, markets: cfg.markets,
pollMs: cfg.pollMs, pollMs: cfg.pollMs,
bandsBps: cfg.bandsBps, bandsBps: cfg.bandsBps,
@@ -268,6 +272,7 @@ async function main() {
for (const bandBps of cfg.bandsBps) { for (const bandBps of cfg.bandsBps) {
const d = computeBandDepth({ bids, asks, mid, bandBps }); const d = computeBandDepth({ bids, asks, mid, bandBps });
rows.push({ rows.push({
source: cfg.dlobSource,
market_name: market, market_name: market,
band_bps: bandBps, band_bps: bandBps,
market_type: l2.market_type ? String(l2.market_type) : 'perp', market_type: l2.market_type ? String(l2.market_type) : 'perp',

View File

@@ -0,0 +1,50 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: dlob-slippage-worker-drift
annotations:
argocd.argoproj.io/sync-wave: "6"
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: dlob-slippage-worker-drift
template:
metadata:
labels:
app.kubernetes.io/name: dlob-slippage-worker-drift
spec:
containers:
- name: worker
image: node:20-slim
imagePullPolicy: IfNotPresent
env:
- name: HASURA_GRAPHQL_URL
value: http://hasura:8080/v1/graphql
- name: HASURA_ADMIN_SECRET
valueFrom:
secretKeyRef:
name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET
- name: DLOB_SOURCE
value: drift
- name: DLOB_MARKETS
value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: DLOB_POLL_MS
value: "1000"
- name: DLOB_SLIPPAGE_SIZES_USD
value: "0.1,0.2,0.5,1,2,5,10,25,50,100,250,500,1000,5000,10000,50000"
- name: PRICE_PRECISION
value: "1000000"
- name: BASE_PRECISION
value: "1000000000"
command: ["node", "/app/worker.mjs"]
volumeMounts:
- name: script
mountPath: /app/worker.mjs
subPath: worker.mjs
readOnly: true
volumes:
- name: script
configMap:
name: dlob-slippage-worker-script

View File

@@ -26,8 +26,10 @@ spec:
secretKeyRef: secretKeyRef:
name: trade-hasura name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET key: HASURA_GRAPHQL_ADMIN_SECRET
- name: DLOB_SOURCE
value: mevnode
- name: DLOB_MARKETS - name: DLOB_MARKETS
value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: DLOB_POLL_MS - name: DLOB_POLL_MS
value: "1000" value: "1000"
- name: DLOB_SLIPPAGE_SIZES_USD - name: DLOB_SLIPPAGE_SIZES_USD

View File

@@ -55,7 +55,8 @@ function resolveConfig() {
tokens.hasuraAdminSecret; tokens.hasuraAdminSecret;
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined; const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'); const dlobSource = String(process.env.DLOB_SOURCE || 'mevnode').trim() || 'mevnode';
const markets = envList('DLOB_MARKETS', 'SOL-PERP,DOGE-PERP,JUP-PERP');
const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000); const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
const sizesUsd = envList('DLOB_SLIPPAGE_SIZES_USD', '10,25,50,100,250,500,1000') const sizesUsd = envList('DLOB_SLIPPAGE_SIZES_USD', '10,25,50,100,250,500,1000')
@@ -77,6 +78,7 @@ function resolveConfig() {
hasuraUrl, hasuraUrl,
hasuraAdminSecret, hasuraAdminSecret,
hasuraAuthToken, hasuraAuthToken,
dlobSource,
markets, markets,
pollMs, pollMs,
sizesUsd, sizesUsd,
@@ -209,6 +211,7 @@ async function main() {
startedAt: getIsoNow(), startedAt: getIsoNow(),
hasuraUrl: cfg.hasuraUrl, hasuraUrl: cfg.hasuraUrl,
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none', hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
dlobSource: cfg.dlobSource,
markets: cfg.markets, markets: cfg.markets,
pollMs: cfg.pollMs, pollMs: cfg.pollMs,
sizesUsd: cfg.sizesUsd, sizesUsd: cfg.sizesUsd,
@@ -226,8 +229,9 @@ async function main() {
try { try {
const query = ` const query = `
query DlobL2Latest($markets: [String!]!) { query DlobL2Latest($source: String!, $markets: [String!]!) {
dlob_l2_latest(where: { market_name: { _in: $markets } }) { dlob_l2_latest(where: { source: { _eq: $source }, market_name: { _in: $markets } }) {
source
market_name market_name
market_type market_type
market_index market_index
@@ -242,7 +246,7 @@ async function main() {
} }
`; `;
const data = await graphqlRequest(cfg, query, { markets: cfg.markets }); const data = await graphqlRequest(cfg, query, { source: cfg.dlobSource, markets: cfg.markets });
const rows = Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : []; const rows = Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
const objectsV1 = []; const objectsV1 = [];
@@ -277,6 +281,7 @@ async function main() {
{ {
const sim = simulateFill(asks, sizeUsd); const sim = simulateFill(asks, sizeUsd);
const baseObj = { const baseObj = {
source: cfg.dlobSource,
market_name: market, market_name: market,
side: 'buy', side: 'buy',
market_type: row?.market_type ?? 'perp', market_type: row?.market_type ?? 'perp',
@@ -302,6 +307,7 @@ async function main() {
{ {
const sim = simulateFill(bids, sizeUsd); const sim = simulateFill(bids, sizeUsd);
const baseObj = { const baseObj = {
source: cfg.dlobSource,
market_name: market, market_name: market,
side: 'sell', side: 'sell',
market_type: row?.market_type ?? 'perp', market_type: row?.market_type ?? 'perp',

View File

@@ -0,0 +1,44 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: dlob-ts-archiver-drift
annotations:
argocd.argoproj.io/sync-wave: "6"
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: dlob-ts-archiver-drift
template:
metadata:
labels:
app.kubernetes.io/name: dlob-ts-archiver-drift
spec:
containers:
- name: worker
image: node:20-slim
imagePullPolicy: IfNotPresent
env:
- name: HASURA_GRAPHQL_URL
value: http://hasura:8080/v1/graphql
- name: HASURA_ADMIN_SECRET
valueFrom:
secretKeyRef:
name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET
- name: DLOB_SOURCE
value: drift
- name: DLOB_MARKETS
value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: DLOB_TS_POLL_MS
value: "1000"
command: ["node", "/app/worker.mjs"]
volumeMounts:
- name: script
mountPath: /app/worker.mjs
subPath: worker.mjs
readOnly: true
volumes:
- name: script
configMap:
name: dlob-ts-archiver-script

View File

@@ -26,8 +26,10 @@ spec:
secretKeyRef: secretKeyRef:
name: trade-hasura name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET key: HASURA_GRAPHQL_ADMIN_SECRET
- name: DLOB_SOURCE
value: mevnode
- name: DLOB_MARKETS - name: DLOB_MARKETS
value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: DLOB_TS_POLL_MS - name: DLOB_TS_POLL_MS
value: "1000" value: "1000"
command: ["node", "/app/worker.mjs"] command: ["node", "/app/worker.mjs"]

View File

@@ -49,10 +49,11 @@ function resolveConfig() {
tokens.hasuraAdminSecret; tokens.hasuraAdminSecret;
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined; const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'); const dlobSource = String(process.env.DLOB_SOURCE || 'mevnode').trim() || 'mevnode';
const markets = envList('DLOB_MARKETS', 'SOL-PERP,DOGE-PERP,JUP-PERP');
const pollMs = clampInt(process.env.DLOB_TS_POLL_MS, 500, 60_000, 1000); const pollMs = clampInt(process.env.DLOB_TS_POLL_MS, 500, 60_000, 1000);
return { hasuraUrl, hasuraAdminSecret, hasuraAuthToken, markets, pollMs }; return { hasuraUrl, hasuraAdminSecret, hasuraAuthToken, dlobSource, markets, pollMs };
} }
async function graphqlRequest(cfg, query, variables) { async function graphqlRequest(cfg, query, variables) {
@@ -97,6 +98,7 @@ async function main() {
startedAt: getIsoNow(), startedAt: getIsoNow(),
hasuraUrl: cfg.hasuraUrl, hasuraUrl: cfg.hasuraUrl,
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none', hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
dlobSource: cfg.dlobSource,
markets: cfg.markets, markets: cfg.markets,
pollMs: cfg.pollMs, pollMs: cfg.pollMs,
}, },
@@ -110,24 +112,24 @@ async function main() {
try { try {
const query = ` const query = `
query Latest($markets: [String!]!) { query Latest($source: String!, $markets: [String!]!) {
dlob_stats_latest(where: { market_name: { _in: $markets } }) { dlob_stats_latest(where: { source: { _eq: $source }, market_name: { _in: $markets } }) {
market_name market_type market_index ts slot market_name market_type market_index ts slot
mark_price oracle_price best_bid_price best_ask_price mid_price mark_price oracle_price best_bid_price best_ask_price mid_price
spread_abs spread_bps depth_levels depth_bid_base depth_ask_base depth_bid_usd depth_ask_usd imbalance spread_abs spread_bps depth_levels depth_bid_base depth_ask_base depth_bid_usd depth_ask_usd imbalance
raw raw
} }
dlob_depth_bps_latest(where: { market_name: { _in: $markets } }) { dlob_depth_bps_latest(where: { source: { _eq: $source }, market_name: { _in: $markets } }) {
market_name band_bps market_type market_index ts slot market_name band_bps market_type market_index ts slot
mid_price best_bid_price best_ask_price bid_base ask_base bid_usd ask_usd imbalance mid_price best_bid_price best_ask_price bid_base ask_base bid_usd ask_usd imbalance
raw raw
} }
dlob_slippage_latest(where: { market_name: { _in: $markets } }) { dlob_slippage_latest(where: { source: { _eq: $source }, market_name: { _in: $markets } }) {
market_name side size_usd market_type market_index ts slot market_name side size_usd market_type market_index ts slot
mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct
raw raw
} }
dlob_slippage_latest_v2(where: { market_name: { _in: $markets } }) { dlob_slippage_latest_v2(where: { source: { _eq: $source }, market_name: { _in: $markets } }) {
market_name side size_usd market_type market_index ts slot market_name side size_usd market_type market_index ts slot
mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct
raw raw
@@ -135,10 +137,11 @@ async function main() {
} }
`; `;
const data = await graphqlRequest(cfg, query, { markets: cfg.markets }); const data = await graphqlRequest(cfg, query, { source: cfg.dlobSource, markets: cfg.markets });
const statsRows = (data?.dlob_stats_latest || []).map((r) => ({ const statsRows = (data?.dlob_stats_latest || []).map((r) => ({
ts: now, ts: now,
source: cfg.dlobSource,
market_name: r.market_name, market_name: r.market_name,
market_type: r.market_type, market_type: r.market_type,
market_index: r.market_index ?? null, market_index: r.market_index ?? null,
@@ -162,6 +165,7 @@ async function main() {
const depthRows = (data?.dlob_depth_bps_latest || []).map((r) => ({ const depthRows = (data?.dlob_depth_bps_latest || []).map((r) => ({
ts: now, ts: now,
source: cfg.dlobSource,
market_name: r.market_name, market_name: r.market_name,
band_bps: r.band_bps, band_bps: r.band_bps,
market_type: r.market_type, market_type: r.market_type,
@@ -181,6 +185,7 @@ async function main() {
const slippageRows = (data?.dlob_slippage_latest || []).map((r) => ({ const slippageRows = (data?.dlob_slippage_latest || []).map((r) => ({
ts: now, ts: now,
source: cfg.dlobSource,
market_name: r.market_name, market_name: r.market_name,
side: r.side, side: r.side,
size_usd: r.size_usd, size_usd: r.size_usd,
@@ -201,6 +206,7 @@ async function main() {
const slippageRowsV2 = (data?.dlob_slippage_latest_v2 || []).map((r) => ({ const slippageRowsV2 = (data?.dlob_slippage_latest_v2 || []).map((r) => ({
ts: now, ts: now,
source: cfg.dlobSource,
market_name: r.market_name, market_name: r.market_name,
side: r.side, side: r.side,
size_usd: r.size_usd, size_usd: r.size_usd,

View File

@@ -0,0 +1,52 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: dlob-worker-drift
annotations:
argocd.argoproj.io/sync-wave: "5"
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: dlob-worker-drift
template:
metadata:
labels:
app.kubernetes.io/name: dlob-worker-drift
spec:
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
containers:
- name: worker
image: node:20-slim
imagePullPolicy: IfNotPresent
env:
- name: HASURA_GRAPHQL_URL
value: http://hasura:8080/v1/graphql
- name: HASURA_ADMIN_SECRET
valueFrom:
secretKeyRef:
name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET
- name: DLOB_SOURCE
value: drift
- name: DLOB_HTTP_URL
value: https://dlob.drift.trade
- name: DLOB_FORCE_IPV6
value: "true"
- name: DLOB_MARKETS
value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: DLOB_POLL_MS
value: "500"
- name: DLOB_DEPTH
value: "10"
command: ["node", "/app/worker.mjs"]
volumeMounts:
- name: script
mountPath: /app/worker.mjs
subPath: worker.mjs
readOnly: true
volumes:
- name: script
configMap:
name: dlob-worker-script

View File

@@ -26,10 +26,12 @@ spec:
secretKeyRef: secretKeyRef:
name: trade-hasura name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET key: HASURA_GRAPHQL_ADMIN_SECRET
- name: DLOB_SOURCE
value: mevnode
- name: DLOB_HTTP_URL - name: DLOB_HTTP_URL
value: http://dlob-server:6969 value: http://dlob-server:6969
- name: DLOB_MARKETS - name: DLOB_MARKETS
value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: DLOB_POLL_MS - name: DLOB_POLL_MS
value: "500" value: "500"
- name: DLOB_DEPTH - name: DLOB_DEPTH

View File

@@ -64,8 +64,9 @@ function resolveConfig() {
.trim() .trim()
.replace(/\/$/, ''); .replace(/\/$/, '');
const dlobForceIpv6 = envBool('DLOB_FORCE_IPV6', false); const dlobForceIpv6 = envBool('DLOB_FORCE_IPV6', false);
const dlobSource = String(process.env.DLOB_SOURCE || 'mevnode').trim() || 'mevnode';
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'); const markets = envList('DLOB_MARKETS', 'SOL-PERP,DOGE-PERP,JUP-PERP');
const depth = clampInt(process.env.DLOB_DEPTH, 1, 50, 10); const depth = clampInt(process.env.DLOB_DEPTH, 1, 50, 10);
const pollMs = clampInt(process.env.DLOB_POLL_MS, 100, 10_000, 500); const pollMs = clampInt(process.env.DLOB_POLL_MS, 100, 10_000, 500);
@@ -80,6 +81,7 @@ function resolveConfig() {
hasuraUrl, hasuraUrl,
hasuraAdminSecret, hasuraAdminSecret,
hasuraAuthToken, hasuraAuthToken,
dlobSource,
dlobHttpBase, dlobHttpBase,
dlobForceIpv6, dlobForceIpv6,
markets, markets,
@@ -238,8 +240,9 @@ function computeStats({ l2, depth, pricePrecision, basePrecision }) {
}; };
} }
function l2ToInsertObject({ l2, updatedAt, pricePrecision }) { function l2ToInsertObject({ dlobSource, l2, updatedAt, pricePrecision }) {
return { return {
source: dlobSource,
market_name: String(l2.marketName), market_name: String(l2.marketName),
market_type: String(l2.marketType || 'perp'), market_type: String(l2.marketType || 'perp'),
market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null, market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null,
@@ -256,8 +259,9 @@ function l2ToInsertObject({ l2, updatedAt, pricePrecision }) {
}; };
} }
function statsToInsertObject({ l2, stats, updatedAt }) { function statsToInsertObject({ dlobSource, l2, stats, updatedAt }) {
return { return {
source: dlobSource,
market_name: String(l2.marketName), market_name: String(l2.marketName),
market_type: String(l2.marketType || 'perp'), market_type: String(l2.marketType || 'perp'),
market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null, market_index: typeof l2.marketIndex === 'number' ? l2.marketIndex : null,
@@ -371,6 +375,7 @@ async function main() {
startedAt: getIsoNow(), startedAt: getIsoNow(),
hasuraUrl: cfg.hasuraUrl, hasuraUrl: cfg.hasuraUrl,
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none', hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
dlobSource: cfg.dlobSource,
dlobHttpBase: cfg.dlobHttpBase, dlobHttpBase: cfg.dlobHttpBase,
dlobForceIpv6: cfg.dlobForceIpv6, dlobForceIpv6: cfg.dlobForceIpv6,
markets: cfg.markets, markets: cfg.markets,
@@ -410,8 +415,8 @@ async function main() {
basePrecision: cfg.basePrecision, basePrecision: cfg.basePrecision,
}); });
l2Objects.push(l2ToInsertObject({ l2, updatedAt, pricePrecision: cfg.pricePrecision })); l2Objects.push(l2ToInsertObject({ dlobSource: cfg.dlobSource, l2, updatedAt, pricePrecision: cfg.pricePrecision }));
statsObjects.push(statsToInsertObject({ l2, stats, updatedAt })); statsObjects.push(statsToInsertObject({ dlobSource: cfg.dlobSource, l2, stats, updatedAt }));
} }
try { try {

View File

@@ -43,7 +43,7 @@ spec:
- name: REDIS_CLIENT - name: REDIS_CLIENT
value: DLOB value: DLOB
- name: PERP_MARKETS_TO_LOAD - name: PERP_MARKETS_TO_LOAD
value: "0,1,2,4,75" value: "0,7,24"
- name: ENDPOINT - name: ENDPOINT
valueFrom: valueFrom:
secretKeyRef: secretKeyRef:
@@ -59,11 +59,15 @@ spec:
httpGet: httpGet:
path: /startup path: /startup
port: http port: http
initialDelaySeconds: 10 initialDelaySeconds: 120
periodSeconds: 10 periodSeconds: 10
timeoutSeconds: 3
failureThreshold: 30
livenessProbe: livenessProbe:
httpGet: httpGet:
path: /health path: /health
port: http port: http
initialDelaySeconds: 30 initialDelaySeconds: 240
periodSeconds: 20 periodSeconds: 20
timeoutSeconds: 3
failureThreshold: 10

View File

@@ -16,7 +16,7 @@ spec:
- name: gitea-registry - name: gitea-registry
containers: containers:
- name: frontend - name: frontend
image: gitea.mpabi.pl/trade/trade-frontend:sha-ob-20260111203413 image: gitea.mpabi.pl/trade/trade-frontend:sha-b06fe7f
imagePullPolicy: IfNotPresent imagePullPolicy: IfNotPresent
ports: ports:
- name: http - name: http

View File

@@ -89,6 +89,8 @@ async function main() {
console.log(`[hasura-bootstrap] HASURA_URL=${HASURA_URL}`); console.log(`[hasura-bootstrap] HASURA_URL=${HASURA_URL}`);
await waitForHasura(); await waitForHasura();
const PUBLIC_DLOB_SOURCE_HEADER = 'X-Hasura-Dlob-Source';
const apiTokensTable = { schema: 'public', name: 'api_tokens' }; const apiTokensTable = { schema: 'public', name: 'api_tokens' };
const source = 'default'; const source = 'default';
@@ -187,7 +189,7 @@ async function main() {
'updated_at', 'updated_at',
]); ]);
const ensureDlobTable = async (table, columns) => { const ensureDlobTable = async (table, columns, { publicFilter } = {}) => {
await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } }); await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } });
await metadata({ type: 'pg_track_table', args: { source, table } }); await metadata({ type: 'pg_track_table', args: { source, table } });
@@ -200,7 +202,7 @@ async function main() {
role: 'public', role: 'public',
permission: { permission: {
columns, columns,
filter: {}, filter: publicFilter || {},
}, },
}, },
}); });
@@ -236,7 +238,7 @@ async function main() {
}); });
}; };
const ensurePublicSelectTable = async (table, columns) => { async function ensurePublicSelectTable(table, columns, { publicFilter } = {}) {
await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } }); await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } });
await metadata({ type: 'pg_track_table', args: { source, table } }); await metadata({ type: 'pg_track_table', args: { source, table } });
@@ -249,7 +251,7 @@ async function main() {
role: 'public', role: 'public',
permission: { permission: {
columns, columns,
filter: {}, filter: publicFilter || {},
}, },
}, },
}); });
@@ -257,9 +259,12 @@ async function main() {
// Computed/archived tables are written by workers using admin secret; keep ingestor off by default. // Computed/archived tables are written by workers using admin secret; keep ingestor off by default.
await metadataIgnore({ type: 'pg_drop_insert_permission', args: { source, table, role: 'ingestor' } }); await metadataIgnore({ type: 'pg_drop_insert_permission', args: { source, table, role: 'ingestor' } });
await metadataIgnore({ type: 'pg_drop_update_permission', args: { source, table, role: 'ingestor' } }); await metadataIgnore({ type: 'pg_drop_update_permission', args: { source, table, role: 'ingestor' } });
}; }
const dlobPublicFilter = { source: { _eq: PUBLIC_DLOB_SOURCE_HEADER } };
await ensureDlobTable(dlobL2LatestTable, [ await ensureDlobTable(dlobL2LatestTable, [
'source',
'market_name', 'market_name',
'market_type', 'market_type',
'market_index', 'market_index',
@@ -273,9 +278,10 @@ async function main() {
'asks', 'asks',
'raw', 'raw',
'updated_at', 'updated_at',
]); ], { publicFilter: dlobPublicFilter });
await ensureDlobTable(dlobStatsLatestTable, [ await ensureDlobTable(dlobStatsLatestTable, [
'source',
'market_name', 'market_name',
'market_type', 'market_type',
'market_index', 'market_index',
@@ -296,9 +302,10 @@ async function main() {
'imbalance', 'imbalance',
'raw', 'raw',
'updated_at', 'updated_at',
]); ], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobDepthBpsLatestTable, [ await ensurePublicSelectTable(dlobDepthBpsLatestTable, [
'source',
'market_name', 'market_name',
'band_bps', 'band_bps',
'market_type', 'market_type',
@@ -315,9 +322,10 @@ async function main() {
'imbalance', 'imbalance',
'raw', 'raw',
'updated_at', 'updated_at',
]); ], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobSlippageLatestTable, [ await ensurePublicSelectTable(dlobSlippageLatestTable, [
'source',
'market_name', 'market_name',
'side', 'side',
'size_usd', 'size_usd',
@@ -337,9 +345,10 @@ async function main() {
'fill_pct', 'fill_pct',
'raw', 'raw',
'updated_at', 'updated_at',
]); ], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobSlippageLatestV2Table, [ await ensurePublicSelectTable(dlobSlippageLatestV2Table, [
'source',
'market_name', 'market_name',
'side', 'side',
'size_usd', 'size_usd',
@@ -359,11 +368,12 @@ async function main() {
'fill_pct', 'fill_pct',
'raw', 'raw',
'updated_at', 'updated_at',
]); ], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobStatsTsTable, [ await ensurePublicSelectTable(dlobStatsTsTable, [
'ts', 'ts',
'id', 'id',
'source',
'market_name', 'market_name',
'market_type', 'market_type',
'market_index', 'market_index',
@@ -383,11 +393,12 @@ async function main() {
'depth_ask_usd', 'depth_ask_usd',
'imbalance', 'imbalance',
'raw', 'raw',
]); ], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobDepthBpsTsTable, [ await ensurePublicSelectTable(dlobDepthBpsTsTable, [
'ts', 'ts',
'id', 'id',
'source',
'market_name', 'market_name',
'band_bps', 'band_bps',
'market_type', 'market_type',
@@ -403,11 +414,12 @@ async function main() {
'ask_usd', 'ask_usd',
'imbalance', 'imbalance',
'raw', 'raw',
]); ], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobSlippageTsTable, [ await ensurePublicSelectTable(dlobSlippageTsTable, [
'ts', 'ts',
'id', 'id',
'source',
'market_name', 'market_name',
'side', 'side',
'size_usd', 'size_usd',
@@ -424,11 +436,12 @@ async function main() {
'levels_consumed', 'levels_consumed',
'fill_pct', 'fill_pct',
'raw', 'raw',
]); ], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobSlippageTsV2Table, [ await ensurePublicSelectTable(dlobSlippageTsV2Table, [
'ts', 'ts',
'id', 'id',
'source',
'market_name', 'market_name',
'side', 'side',
'size_usd', 'size_usd',
@@ -445,7 +458,7 @@ async function main() {
'levels_consumed', 'levels_consumed',
'fill_pct', 'fill_pct',
'raw', 'raw',
]); ], { publicFilter: dlobPublicFilter });
// Return table type for candle functions (needed for Hasura to track the function). // Return table type for candle functions (needed for Hasura to track the function).
await metadataIgnore({ type: 'pg_track_table', args: { source, table: candlesReturnTable } }); await metadataIgnore({ type: 'pg_track_table', args: { source, table: candlesReturnTable } });

View File

@@ -121,10 +121,17 @@ CREATE TABLE IF NOT EXISTS public.drift_candles (
high numeric, high numeric,
low numeric, low numeric,
close numeric, close numeric,
oracle_open numeric,
oracle_high numeric,
oracle_low numeric,
oracle_close numeric, oracle_close numeric,
ticks bigint ticks bigint
); );
ALTER TABLE public.drift_candles ADD COLUMN IF NOT EXISTS oracle_open numeric;
ALTER TABLE public.drift_candles ADD COLUMN IF NOT EXISTS oracle_high numeric;
ALTER TABLE public.drift_candles ADD COLUMN IF NOT EXISTS oracle_low numeric;
-- Precomputed candle cache (materialized by a worker). -- Precomputed candle cache (materialized by a worker).
-- Purpose: make tf switching instant by reading ready-made candles instead of aggregating `drift_ticks` on demand. -- Purpose: make tf switching instant by reading ready-made candles instead of aggregating `drift_ticks` on demand.
-- NOTE: `source=''` means "any source" (no source filter). -- NOTE: `source=''` means "any source" (no source filter).
@@ -137,12 +144,19 @@ CREATE TABLE IF NOT EXISTS public.drift_candles_cache (
high numeric NOT NULL, high numeric NOT NULL,
low numeric NOT NULL, low numeric NOT NULL,
close numeric NOT NULL, close numeric NOT NULL,
oracle_open numeric,
oracle_high numeric,
oracle_low numeric,
oracle_close numeric, oracle_close numeric,
ticks bigint NOT NULL DEFAULT 0, ticks bigint NOT NULL DEFAULT 0,
updated_at timestamptz NOT NULL DEFAULT now(), updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (bucket, bucket_seconds, symbol, source) PRIMARY KEY (bucket, bucket_seconds, symbol, source)
); );
ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_open numeric;
ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_high numeric;
ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_low numeric;
SELECT create_hypertable('drift_candles_cache', 'bucket', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('drift_candles_cache', 'bucket', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS drift_candles_cache_symbol_source_bucket_idx CREATE INDEX IF NOT EXISTS drift_candles_cache_symbol_source_bucket_idx
@@ -162,27 +176,144 @@ RETURNS SETOF public.drift_candles
LANGUAGE sql LANGUAGE sql
STABLE STABLE
AS $$ AS $$
WITH base AS ( -- Zwraca zawsze "ciągłe" buckety (fill forward), nawet jeśli nie było ticków w danej sekundzie/minucie.
-- Dzięki temu frontend może rysować regularną oś czasu (np. 1px = 1s) bez dziwnych przeskoków.
WITH src AS (
SELECT COALESCE(p_source, '') AS source_key
),
raw_cached AS (
SELECT
c.bucket,
c.open,
c.high,
c.low,
c.close,
c.oracle_open,
c.oracle_high,
c.oracle_low,
c.oracle_close,
c.ticks
FROM public.drift_candles_cache c, src
WHERE c.symbol = p_symbol
AND c.bucket_seconds = p_bucket_seconds
AND c.source = src.source_key
ORDER BY c.bucket DESC
LIMIT p_limit
),
raw_fallback AS (
SELECT SELECT
time_bucket(make_interval(secs => p_bucket_seconds), ts) AS bucket, time_bucket(make_interval(secs => p_bucket_seconds), ts) AS bucket,
ts, ts,
COALESCE(mark_price, oracle_price) AS px, COALESCE(mark_price, oracle_price) AS px,
oracle_price AS oracle_px oracle_price AS oracle_px
FROM public.drift_ticks FROM public.drift_ticks, src
WHERE symbol = p_symbol WHERE symbol = p_symbol
AND (p_source IS NULL OR source = p_source) AND (src.source_key = '' OR source = src.source_key)
AND ts >= now() - make_interval(secs => (p_bucket_seconds * p_limit * 2)) AND ts >= now() - make_interval(secs => (p_bucket_seconds * p_limit * 2))
) ),
computed AS (
SELECT SELECT
bucket, bucket,
(array_agg(px ORDER BY ts ASC))[1] AS open, (array_agg(px ORDER BY ts ASC))[1] AS open,
max(px) AS high, max(px) AS high,
min(px) AS low, min(px) AS low,
(array_agg(px ORDER BY ts DESC))[1] AS close, (array_agg(px ORDER BY ts DESC))[1] AS close,
(array_agg(oracle_px ORDER BY ts ASC))[1] AS oracle_open,
max(oracle_px) AS oracle_high,
min(oracle_px) AS oracle_low,
(array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close, (array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close,
count(*) AS ticks count(*) AS ticks
FROM base FROM raw_fallback
GROUP BY bucket GROUP BY bucket
),
data AS (
SELECT * FROM raw_cached
UNION ALL
SELECT * FROM computed
WHERE NOT EXISTS (SELECT 1 FROM raw_cached)
),
bounds AS (
SELECT max(bucket) AS end_bucket FROM data
),
params AS (
SELECT
make_interval(secs => p_bucket_seconds) AS step,
make_interval(secs => (p_bucket_seconds * (p_limit - 1))) AS span
),
series AS (
SELECT generate_series(
bounds.end_bucket - params.span,
bounds.end_bucket,
params.step
) AS bucket
FROM bounds, params
WHERE bounds.end_bucket IS NOT NULL
),
joined AS (
SELECT
s.bucket,
d.open,
d.high,
d.low,
d.close,
d.oracle_open,
d.oracle_high,
d.oracle_low,
d.oracle_close,
d.ticks
FROM series s
LEFT JOIN data d USING (bucket)
ORDER BY s.bucket ASC
),
grouped AS (
SELECT
*,
sum(CASE WHEN close IS NOT NULL THEN 1 ELSE 0 END) OVER (ORDER BY bucket ASC) AS grp_close,
sum(CASE WHEN oracle_close IS NOT NULL THEN 1 ELSE 0 END) OVER (ORDER BY bucket ASC) AS grp_oracle
FROM joined
),
first_vals AS (
SELECT
(SELECT close FROM grouped WHERE close IS NOT NULL ORDER BY bucket ASC LIMIT 1) AS first_close,
(SELECT oracle_close FROM grouped WHERE oracle_close IS NOT NULL ORDER BY bucket ASC LIMIT 1) AS first_oracle
),
ff AS (
SELECT
g.bucket,
g.open,
g.high,
g.low,
g.close,
g.oracle_open,
g.oracle_high,
g.oracle_low,
g.oracle_close,
g.ticks,
COALESCE(
g.close,
max(g.close) OVER (PARTITION BY g.grp_close),
f.first_close
) AS ff_close,
COALESCE(
g.oracle_close,
max(g.oracle_close) OVER (PARTITION BY g.grp_oracle),
f.first_oracle
) AS ff_oracle
FROM grouped g
CROSS JOIN first_vals f
)
SELECT
bucket,
COALESCE(open, ff_close) AS open,
COALESCE(high, ff_close) AS high,
COALESCE(low, ff_close) AS low,
COALESCE(close, ff_close) AS close,
COALESCE(oracle_open, ff_oracle) AS oracle_open,
COALESCE(oracle_high, ff_oracle) AS oracle_high,
COALESCE(oracle_low, ff_oracle) AS oracle_low,
COALESCE(oracle_close, ff_oracle) AS oracle_close,
COALESCE(ticks, 0) AS ticks
FROM ff
ORDER BY bucket DESC ORDER BY bucket DESC
LIMIT p_limit; LIMIT p_limit;
$$; $$;
@@ -190,7 +321,8 @@ $$;
-- Latest DLOB orderbook snapshots (top-N levels), per market. -- Latest DLOB orderbook snapshots (top-N levels), per market.
-- Filled by a VPS worker (collector) and consumed by the UI via Hasura subscriptions. -- Filled by a VPS worker (collector) and consumed by the UI via Hasura subscriptions.
CREATE TABLE IF NOT EXISTS public.dlob_l2_latest ( CREATE TABLE IF NOT EXISTS public.dlob_l2_latest (
market_name TEXT PRIMARY KEY, source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp', market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER, market_index INTEGER,
ts BIGINT, ts BIGINT,
@@ -202,15 +334,52 @@ CREATE TABLE IF NOT EXISTS public.dlob_l2_latest (
bids JSONB, bids JSONB,
asks JSONB, asks JSONB,
raw JSONB, raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now() updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (source, market_name)
); );
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_l2_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_l2_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_l2_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_l2_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_l2_latest'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_l2_latest ADD CONSTRAINT dlob_l2_latest_pkey PRIMARY KEY (source, market_name)';
ELSIF pk_cols <> ARRAY['source','market_name'] THEN
EXECUTE format('ALTER TABLE public.dlob_l2_latest DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_l2_latest ADD CONSTRAINT dlob_l2_latest_pkey PRIMARY KEY (source, market_name)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_l2_latest_updated_at_idx CREATE INDEX IF NOT EXISTS dlob_l2_latest_updated_at_idx
ON public.dlob_l2_latest (updated_at DESC); ON public.dlob_l2_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_l2_latest_source_updated_at_idx
ON public.dlob_l2_latest (source, updated_at DESC);
-- Derived stats for fast UI display. -- Derived stats for fast UI display.
CREATE TABLE IF NOT EXISTS public.dlob_stats_latest ( CREATE TABLE IF NOT EXISTS public.dlob_stats_latest (
market_name TEXT PRIMARY KEY, source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp', market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER, market_index INTEGER,
ts BIGINT, ts BIGINT,
@@ -229,15 +398,52 @@ CREATE TABLE IF NOT EXISTS public.dlob_stats_latest (
depth_ask_usd NUMERIC, depth_ask_usd NUMERIC,
imbalance NUMERIC, imbalance NUMERIC,
raw JSONB, raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now() updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (source, market_name)
); );
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_stats_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_stats_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_stats_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_stats_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_stats_latest'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_stats_latest ADD CONSTRAINT dlob_stats_latest_pkey PRIMARY KEY (source, market_name)';
ELSIF pk_cols <> ARRAY['source','market_name'] THEN
EXECUTE format('ALTER TABLE public.dlob_stats_latest DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_stats_latest ADD CONSTRAINT dlob_stats_latest_pkey PRIMARY KEY (source, market_name)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_stats_latest_updated_at_idx CREATE INDEX IF NOT EXISTS dlob_stats_latest_updated_at_idx
ON public.dlob_stats_latest (updated_at DESC); ON public.dlob_stats_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_stats_latest_source_updated_at_idx
ON public.dlob_stats_latest (source, updated_at DESC);
-- Depth snapshots within bps bands around mid-price (per market, per band). -- Depth snapshots within bps bands around mid-price (per market, per band).
-- Filled by a derived worker that reads `dlob_l2_latest`. -- Filled by a derived worker that reads `dlob_l2_latest`.
CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_latest ( CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_latest (
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL, market_name TEXT NOT NULL,
band_bps INTEGER NOT NULL, band_bps INTEGER NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp', market_type TEXT NOT NULL DEFAULT 'perp',
@@ -254,18 +460,54 @@ CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_latest (
imbalance NUMERIC, imbalance NUMERIC,
raw JSONB, raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (market_name, band_bps) PRIMARY KEY (source, market_name, band_bps)
); );
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_depth_bps_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_depth_bps_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_depth_bps_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_depth_bps_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name, band_bps) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_depth_bps_latest'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_depth_bps_latest ADD CONSTRAINT dlob_depth_bps_latest_pkey PRIMARY KEY (source, market_name, band_bps)';
ELSIF pk_cols <> ARRAY['source','market_name','band_bps'] THEN
EXECUTE format('ALTER TABLE public.dlob_depth_bps_latest DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_depth_bps_latest ADD CONSTRAINT dlob_depth_bps_latest_pkey PRIMARY KEY (source, market_name, band_bps)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_updated_at_idx CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_updated_at_idx
ON public.dlob_depth_bps_latest (updated_at DESC); ON public.dlob_depth_bps_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_market_name_idx CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_market_name_idx
ON public.dlob_depth_bps_latest (market_name); ON public.dlob_depth_bps_latest (market_name);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_source_market_name_idx
ON public.dlob_depth_bps_latest (source, market_name);
-- Slippage/impact estimates for "market" orders at common USD sizes. -- Slippage/impact estimates for "market" orders at common USD sizes.
-- Filled by a derived worker that reads `dlob_l2_latest`. -- Filled by a derived worker that reads `dlob_l2_latest`.
CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest ( CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest (
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL, market_name TEXT NOT NULL,
side TEXT NOT NULL, side TEXT NOT NULL,
size_usd INTEGER NOT NULL, size_usd INTEGER NOT NULL,
@@ -285,19 +527,55 @@ CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest (
fill_pct NUMERIC, fill_pct NUMERIC,
raw JSONB, raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (market_name, side, size_usd), PRIMARY KEY (source, market_name, side, size_usd),
CONSTRAINT dlob_slippage_latest_side_chk CHECK (side IN ('buy', 'sell')) CONSTRAINT dlob_slippage_latest_side_chk CHECK (side IN ('buy', 'sell'))
); );
-- Schema upgrades (idempotent for existing volumes)
-- Add the `source` discriminator column so two publishers (mevnode/drift) can
-- write in parallel; backfill legacy rows as 'mevnode' before enforcing NOT NULL.
ALTER TABLE public.dlob_slippage_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name, side, size_usd) (required to keep 2 sources in parallel).
DO $$
DECLARE
  pk_name text;
  pk_cols text[];
BEGIN
  -- Look up the table's current primary key (name + ordered column list) from the catalogs.
  SELECT
    con.conname,
    array_agg(att.attname ORDER BY ord.ordinality)
  INTO pk_name, pk_cols
  FROM pg_constraint con
  JOIN pg_class rel ON rel.oid = con.conrelid
  JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
  -- unnest WITH ORDINALITY preserves the PK column order encoded in conkey.
  JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
  JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
  WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_slippage_latest'
  GROUP BY con.conname;
  IF pk_name IS NULL THEN
    -- No primary key at all: create the composite one.
    EXECUTE 'ALTER TABLE public.dlob_slippage_latest ADD CONSTRAINT dlob_slippage_latest_pkey PRIMARY KEY (source, market_name, side, size_usd)';
  ELSIF pk_cols <> ARRAY['source','market_name','side','size_usd'] THEN
    -- Legacy PK without `source`: drop it (whatever it is named) and recreate.
    EXECUTE format('ALTER TABLE public.dlob_slippage_latest DROP CONSTRAINT %I', pk_name);
    EXECUTE 'ALTER TABLE public.dlob_slippage_latest ADD CONSTRAINT dlob_slippage_latest_pkey PRIMARY KEY (source, market_name, side, size_usd)';
  END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_updated_at_idx CREATE INDEX IF NOT EXISTS dlob_slippage_latest_updated_at_idx
ON public.dlob_slippage_latest (updated_at DESC); ON public.dlob_slippage_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_market_name_idx CREATE INDEX IF NOT EXISTS dlob_slippage_latest_market_name_idx
ON public.dlob_slippage_latest (market_name); ON public.dlob_slippage_latest (market_name);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_source_market_name_idx
ON public.dlob_slippage_latest (source, market_name);
-- Slippage v2: supports fractional order sizes (e.g. 0.1/0.2/0.5 USD), per market and side. -- Slippage v2: supports fractional order sizes (e.g. 0.1/0.2/0.5 USD), per market and side.
-- Keep v1 intact for backward compatibility and to avoid data loss. -- Keep v1 intact for backward compatibility and to avoid data loss.
CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest_v2 ( CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest_v2 (
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL, market_name TEXT NOT NULL,
side TEXT NOT NULL, -- buy|sell side TEXT NOT NULL, -- buy|sell
size_usd NUMERIC NOT NULL, size_usd NUMERIC NOT NULL,
@@ -317,22 +595,58 @@ CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest_v2 (
fill_pct NUMERIC, fill_pct NUMERIC,
raw JSONB, raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (market_name, side, size_usd), PRIMARY KEY (source, market_name, side, size_usd),
CONSTRAINT dlob_slippage_latest_v2_side_chk CHECK (side IN ('buy', 'sell')) CONSTRAINT dlob_slippage_latest_v2_side_chk CHECK (side IN ('buy', 'sell'))
); );
-- Schema upgrades (idempotent for existing volumes)
-- Same migration as for v1: add `source`, backfill as 'mevnode', enforce NOT NULL.
ALTER TABLE public.dlob_slippage_latest_v2 ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_latest_v2 ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_latest_v2 SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_latest_v2 ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name, side, size_usd) (required to keep 2 sources in parallel).
DO $$
DECLARE
  pk_name text;
  pk_cols text[];
BEGIN
  -- Look up the table's current primary key (name + ordered column list) from the catalogs.
  SELECT
    con.conname,
    array_agg(att.attname ORDER BY ord.ordinality)
  INTO pk_name, pk_cols
  FROM pg_constraint con
  JOIN pg_class rel ON rel.oid = con.conrelid
  JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
  -- unnest WITH ORDINALITY preserves the PK column order encoded in conkey.
  JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
  JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
  WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_slippage_latest_v2'
  GROUP BY con.conname;
  IF pk_name IS NULL THEN
    -- No primary key at all: create the composite one.
    EXECUTE 'ALTER TABLE public.dlob_slippage_latest_v2 ADD CONSTRAINT dlob_slippage_latest_v2_pkey PRIMARY KEY (source, market_name, side, size_usd)';
  ELSIF pk_cols <> ARRAY['source','market_name','side','size_usd'] THEN
    -- Legacy PK without `source`: drop it (whatever it is named) and recreate.
    EXECUTE format('ALTER TABLE public.dlob_slippage_latest_v2 DROP CONSTRAINT %I', pk_name);
    EXECUTE 'ALTER TABLE public.dlob_slippage_latest_v2 ADD CONSTRAINT dlob_slippage_latest_v2_pkey PRIMARY KEY (source, market_name, side, size_usd)';
  END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_updated_at_idx CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_updated_at_idx
ON public.dlob_slippage_latest_v2 (updated_at DESC); ON public.dlob_slippage_latest_v2 (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_market_name_idx CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_market_name_idx
ON public.dlob_slippage_latest_v2 (market_name); ON public.dlob_slippage_latest_v2 (market_name);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_source_market_name_idx
ON public.dlob_slippage_latest_v2 (source, market_name);
-- Time-series tables for UI history (start: 7 days). -- Time-series tables for UI history (start: 7 days).
-- Keep these append-only; use Timescale hypertables. -- Keep these append-only; use Timescale hypertables.
CREATE TABLE IF NOT EXISTS public.dlob_stats_ts ( CREATE TABLE IF NOT EXISTS public.dlob_stats_ts (
ts TIMESTAMPTZ NOT NULL, ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL, id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL, market_name TEXT NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp', market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER, market_index INTEGER,
@@ -355,14 +669,24 @@ CREATE TABLE IF NOT EXISTS public.dlob_stats_ts (
PRIMARY KEY (ts, id) PRIMARY KEY (ts, id)
); );
-- Schema upgrades (idempotent for existing volumes)
-- Add `source` discriminator column; backfill legacy rows as 'mevnode'.
-- PK (ts, id) is unaffected, so no PK rewrite is needed for this hypertable.
ALTER TABLE public.dlob_stats_ts ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_stats_ts ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_stats_ts SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_stats_ts ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_stats_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('dlob_stats_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_stats_ts_market_ts_desc_idx CREATE INDEX IF NOT EXISTS dlob_stats_ts_market_ts_desc_idx
ON public.dlob_stats_ts (market_name, ts DESC); ON public.dlob_stats_ts (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_stats_ts_source_market_ts_desc_idx
ON public.dlob_stats_ts (source, market_name, ts DESC);
CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_ts ( CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_ts (
ts TIMESTAMPTZ NOT NULL, ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL, id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL, market_name TEXT NOT NULL,
band_bps INTEGER NOT NULL, band_bps INTEGER NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp', market_type TEXT NOT NULL DEFAULT 'perp',
@@ -381,14 +705,24 @@ CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_ts (
PRIMARY KEY (ts, id) PRIMARY KEY (ts, id)
); );
-- Schema upgrades (idempotent for existing volumes)
-- Add `source` discriminator column; backfill legacy rows as 'mevnode'.
ALTER TABLE public.dlob_depth_bps_ts ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_depth_bps_ts ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_depth_bps_ts SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_depth_bps_ts ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_depth_bps_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('dlob_depth_bps_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_ts_market_ts_desc_idx CREATE INDEX IF NOT EXISTS dlob_depth_bps_ts_market_ts_desc_idx
ON public.dlob_depth_bps_ts (market_name, ts DESC); ON public.dlob_depth_bps_ts (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_ts_source_market_ts_desc_idx
ON public.dlob_depth_bps_ts (source, market_name, ts DESC);
CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts ( CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts (
ts TIMESTAMPTZ NOT NULL, ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL, id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL, market_name TEXT NOT NULL,
side TEXT NOT NULL, side TEXT NOT NULL,
size_usd INTEGER NOT NULL, size_usd INTEGER NOT NULL,
@@ -408,14 +742,24 @@ CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts (
PRIMARY KEY (ts, id) PRIMARY KEY (ts, id)
); );
-- Schema upgrades (idempotent for existing volumes)
-- Add `source` discriminator column; backfill legacy rows as 'mevnode'.
ALTER TABLE public.dlob_slippage_ts ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_ts ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_ts SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_ts ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_slippage_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('dlob_slippage_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_market_ts_desc_idx CREATE INDEX IF NOT EXISTS dlob_slippage_ts_market_ts_desc_idx
ON public.dlob_slippage_ts (market_name, ts DESC); ON public.dlob_slippage_ts (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_source_market_ts_desc_idx
ON public.dlob_slippage_ts (source, market_name, ts DESC);
CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts_v2 ( CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts_v2 (
ts TIMESTAMPTZ NOT NULL, ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL, id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL, market_name TEXT NOT NULL,
side TEXT NOT NULL, side TEXT NOT NULL,
size_usd NUMERIC NOT NULL, size_usd NUMERIC NOT NULL,
@@ -435,11 +779,19 @@ CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts_v2 (
PRIMARY KEY (ts, id) PRIMARY KEY (ts, id)
); );
-- Schema upgrades (idempotent for existing volumes)
-- Add `source` discriminator column; backfill legacy rows as 'mevnode'.
ALTER TABLE public.dlob_slippage_ts_v2 ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_ts_v2 ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_ts_v2 SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_ts_v2 ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_slippage_ts_v2', 'ts', if_not_exists => TRUE, migrate_data => TRUE); SELECT create_hypertable('dlob_slippage_ts_v2', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_market_ts_desc_idx CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_market_ts_desc_idx
ON public.dlob_slippage_ts_v2 (market_name, ts DESC); ON public.dlob_slippage_ts_v2 (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_source_market_ts_desc_idx
ON public.dlob_slippage_ts_v2 (source, market_name, ts DESC);
-- Retention policies (best-effort; safe if Timescale is present). -- Retention policies (best-effort; safe if Timescale is present).
DO $$ DO $$
BEGIN BEGIN

View File

@@ -19,9 +19,13 @@ resources:
- dlob/server-service.yaml - dlob/server-service.yaml
- dlob/server-deployment.yaml - dlob/server-deployment.yaml
- dlob-worker/deployment.yaml - dlob-worker/deployment.yaml
- dlob-worker/deployment-drift.yaml
- dlob-depth-worker/deployment.yaml - dlob-depth-worker/deployment.yaml
- dlob-depth-worker/deployment-drift.yaml
- dlob-slippage-worker/deployment.yaml - dlob-slippage-worker/deployment.yaml
- dlob-slippage-worker/deployment-drift.yaml
- dlob-ts-archiver/deployment.yaml - dlob-ts-archiver/deployment.yaml
- dlob-ts-archiver/deployment-drift.yaml
- candles-cache-worker/deployment.yaml - candles-cache-worker/deployment.yaml
configMapGenerator: configMapGenerator:

View File

@@ -0,0 +1,13 @@
# TLS certificate for the monitoring hosts, issued via cert-manager /
# Let's Encrypt. The resulting secret is referenced by the websecure
# IngressRoutes for Grafana and Prometheus.
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: monitoring-mpabi-pl
  namespace: monitoring
spec:
  secretName: monitoring-mpabi-pl-tls
  issuerRef:
    kind: ClusterIssuer
    name: letsencrypt-prod
  dnsNames:
    - grafana.mpabi.pl
    - prometheus.mpabi.pl

View File

@@ -0,0 +1,238 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: grafana-dashboard-agave-status
namespace: monitoring
labels:
grafana_dashboard: "1"
data:
agave-status.json: |-
{
"uid": "agave-status-mpabi",
"title": "Agave @ mpabi",
"timezone": "browser",
"schemaVersion": 39,
"version": 2,
"time": { "from": "now-6h", "to": "now" },
"timepicker": {
"refresh_intervals": ["5s", "10s", "30s", "1m", "5m"],
"time_options": ["5m", "15m", "1h", "6h", "12h", "24h", "2d", "7d"]
},
"refresh": "10s",
"tags": ["agave", "solana", "mpabi"],
"templating": {
"list": [
{
"name": "instance",
"type": "query",
"label": "Node Exporter Instance",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"query": { "query": "label_values(up{job=\"mpabi-node-exporter\"}, instance)", "refId": "PromVarInstance" },
"current": { "selected": false, "text": "10.66.66.1:9100", "value": "10.66.66.1:9100" }
}
]
},
"panels": [
{
"id": 1,
"type": "stat",
"title": "Geyser Metrics Target (Prometheus up)",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [{ "refId": "A", "expr": "up{job=\"mpabi-yellowstone-geyser\"}" }],
"options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "orientation": "horizontal", "textMode": "value" },
"fieldConfig": {
"defaults": { "thresholds": { "mode": "absolute", "steps": [{ "color": "red", "value": null }, { "color": "green", "value": 1 }] } },
"overrides": []
},
"gridPos": { "h": 6, "w": 12, "x": 0, "y": 0 }
},
{
"id": 2,
"type": "stat",
"title": "agave-validator.service (systemd active)",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [{ "refId": "B", "expr": "node_systemd_unit_state{job=\"mpabi-node-exporter\",instance=\"$instance\",name=\"agave-validator.service\",state=\"active\"}" }],
"options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "orientation": "horizontal", "textMode": "value" },
"fieldConfig": {
"defaults": { "thresholds": { "mode": "absolute", "steps": [{ "color": "red", "value": null }, { "color": "green", "value": 1 }] } },
"overrides": []
},
"gridPos": { "h": 6, "w": 12, "x": 12, "y": 0 }
},
{
"id": 3,
"type": "timeseries",
"title": "CPU Used (%)",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [{ "refId": "C", "expr": "100 - (avg by (instance) (rate(node_cpu_seconds_total{job=\"mpabi-node-exporter\",instance=\"$instance\",mode=\"idle\"}[5m])) * 100)" }],
"fieldConfig": { "defaults": { "unit": "percent", "min": 0, "max": 100 }, "overrides": [] },
"gridPos": { "h": 8, "w": 8, "x": 0, "y": 6 }
},
{
"id": 4,
"type": "timeseries",
"title": "Load (1m)",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [{ "refId": "D", "expr": "node_load1{job=\"mpabi-node-exporter\",instance=\"$instance\"}" }],
"gridPos": { "h": 8, "w": 8, "x": 8, "y": 6 }
},
{
"id": 5,
"type": "timeseries",
"title": "Memory Used (%)",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [{ "refId": "E", "expr": "100 - (node_memory_MemAvailable_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\"} / node_memory_MemTotal_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\"} * 100)" }],
"fieldConfig": { "defaults": { "unit": "percent", "min": 0, "max": 100 }, "overrides": [] },
"gridPos": { "h": 8, "w": 8, "x": 16, "y": 6 }
},
{
"id": 6,
"type": "timeseries",
"title": "Swap Used (GiB)",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [{ "refId": "F", "expr": "(node_memory_SwapTotal_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\"} - node_memory_SwapFree_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\"}) / 1024 / 1024 / 1024" }],
"fieldConfig": { "defaults": { "unit": "gbytes" }, "overrides": [] },
"gridPos": { "h": 8, "w": 8, "x": 0, "y": 14 }
},
{
"id": 7,
"type": "timeseries",
"title": "Disk Free Accounts (%)",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [{ "refId": "G", "expr": "100 * node_filesystem_avail_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\",mountpoint=\"/var/lib/solana/accounts\"} / node_filesystem_size_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\",mountpoint=\"/var/lib/solana/accounts\"}" }],
"fieldConfig": { "defaults": { "unit": "percent", "min": 0, "max": 100 }, "overrides": [] },
"gridPos": { "h": 8, "w": 8, "x": 8, "y": 14 }
},
{
"id": 8,
"type": "timeseries",
"title": "Disk Free Ledger (%)",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [{ "refId": "H", "expr": "100 * node_filesystem_avail_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\",mountpoint=\"/var/lib/solana/ledger\"} / node_filesystem_size_bytes{job=\"mpabi-node-exporter\",instance=\"$instance\",mountpoint=\"/var/lib/solana/ledger\"}" }],
"fieldConfig": { "defaults": { "unit": "percent", "min": 0, "max": 100 }, "overrides": [] },
"gridPos": { "h": 8, "w": 8, "x": 16, "y": 14 }
},
{
"id": 9,
"type": "timeseries",
"title": "Disk IO (NVMe) Read/Write (MiB/s)",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [
{ "refId": "I", "expr": "sum by (device) (rate(node_disk_read_bytes_total{job=\"mpabi-node-exporter\",instance=\"$instance\",device=~\"nvme.*\"}[5m])) / 1024 / 1024", "legendFormat": "read {{device}}" },
{ "refId": "J", "expr": "sum by (device) (rate(node_disk_written_bytes_total{job=\"mpabi-node-exporter\",instance=\"$instance\",device=~\"nvme.*\"}[5m])) / 1024 / 1024", "legendFormat": "write {{device}}" }
],
"fieldConfig": { "defaults": { "unit": "mbytes" }, "overrides": [] },
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 22 }
},
{
"id": 10,
"type": "timeseries",
"title": "Network wg0 RX/TX (MiB/s)",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [
{ "refId": "K", "expr": "rate(node_network_receive_bytes_total{job=\"mpabi-node-exporter\",instance=\"$instance\",device=\"wg0\"}[5m]) / 1024 / 1024", "legendFormat": "rx" },
{ "refId": "L", "expr": "rate(node_network_transmit_bytes_total{job=\"mpabi-node-exporter\",instance=\"$instance\",device=\"wg0\"}[5m]) / 1024 / 1024", "legendFormat": "tx" }
],
"fieldConfig": { "defaults": { "unit": "mbytes" }, "overrides": [] },
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 22 }
},
{
"id": 11,
"type": "timeseries",
"title": "Geyser: Subscriber Queue Size",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [{ "refId": "M", "expr": "grpc_subscriber_queue_size{job=\"mpabi-yellowstone-geyser\"}", "legendFormat": "{{subscriber_id}}" }],
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 30 }
},
{
"id": 12,
"type": "timeseries",
"title": "Geyser: Connections Total",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [{ "refId": "N", "expr": "connections_total{job=\"mpabi-yellowstone-geyser\"}" }],
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 30 }
},
{
"id": 13,
"type": "timeseries",
"title": "Geyser: Bytes Sent (MiB/s)",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [{ "refId": "O", "expr": "rate(grpc_bytes_sent{job=\"mpabi-yellowstone-geyser\"}[5m]) / 1024 / 1024", "legendFormat": "{{subscriber_id}}" }],
"fieldConfig": { "defaults": { "unit": "mbytes" }, "overrides": [] },
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 38 }
},
{
"id": 14,
"type": "timeseries",
"title": "Geyser: Messages Sent (/s)",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [{ "refId": "P", "expr": "rate(grpc_message_sent_count{job=\"mpabi-yellowstone-geyser\"}[5m])", "legendFormat": "{{subscriber_id}}" }],
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 38 }
},
{
"id": 15,
"type": "timeseries",
"title": "Geyser: Disconnects (increase 15m)",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [{ "refId": "Q", "expr": "sum by (reason) (increase(grpc_client_disconnects_total{job=\"mpabi-yellowstone-geyser\"}[15m]))", "legendFormat": "{{reason}}" }],
"gridPos": { "h": 8, "w": 24, "x": 0, "y": 46 }
},
{
"id": 16,
"type": "stat",
"title": "RPC Slot Lag (slots)",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [{ "refId": "R", "expr": "solana_rpc_slot_lag{job=\"mpabi-node-exporter\",instance=\"$instance\"}" }],
"fieldConfig": {
"defaults": {
"unit": "short",
"thresholds": {
"mode": "absolute",
"steps": [
{ "color": "green", "value": null },
{ "color": "yellow", "value": 20 },
{ "color": "red", "value": 50 }
]
}
},
"overrides": []
},
"gridPos": { "h": 6, "w": 12, "x": 0, "y": 54 }
},
{
"id": 17,
"type": "stat",
"title": "RPC Slot Lag (szac. minuty)",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [{ "refId": "S", "expr": "solana_rpc_slot_lag{job=\"mpabi-node-exporter\",instance=\"$instance\"} * 0.4 / 60" }],
"fieldConfig": {
"defaults": {
"unit": "min",
"decimals": 2,
"thresholds": {
"mode": "absolute",
"steps": [
{ "color": "green", "value": null },
{ "color": "yellow", "value": 1 },
{ "color": "red", "value": 2 }
]
}
},
"overrides": []
},
"gridPos": { "h": 6, "w": 12, "x": 12, "y": 54 }
},
{
"id": 18,
"type": "timeseries",
"title": "RPC Slot & Reference Slot",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [
{ "refId": "T", "expr": "solana_rpc_slot{job=\"mpabi-node-exporter\",instance=\"$instance\"}" },
{ "refId": "U", "expr": "solana_rpc_slot_reference{job=\"mpabi-node-exporter\",instance=\"$instance\"}" },
{ "refId": "V", "expr": "solana_rpc_block_height{job=\"mpabi-node-exporter\",instance=\"$instance\"}" }
],
"gridPos": { "h": 8, "w": 24, "x": 0, "y": 60 }
}
]
}

View File

@@ -0,0 +1,16 @@
# Plain-HTTP route for Grafana: every request on the `web` entrypoint is
# redirected to HTTPS by the redirect-to-https middleware.
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
  name: grafana-http
  namespace: monitoring
spec:
  entryPoints:
    - web
  routes:
    - match: Host(`grafana.mpabi.pl`)
      kind: Rule
      middlewares:
        - name: redirect-to-https
      services:
        - name: monitoring-stack-grafana
          port: 80

View File

@@ -0,0 +1,16 @@
# HTTPS route for Grafana on the `websecure` entrypoint, terminating TLS
# with the cert-manager-issued monitoring-mpabi-pl-tls secret.
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
  name: grafana
  namespace: monitoring
spec:
  entryPoints:
    - websecure
  routes:
    - match: Host(`grafana.mpabi.pl`)
      kind: Rule
      services:
        - name: monitoring-stack-grafana
          port: 80
  tls:
    secretName: monitoring-mpabi-pl-tls

View File

@@ -0,0 +1,16 @@
# Plain-HTTP route for Prometheus: every request on the `web` entrypoint is
# redirected to HTTPS by the redirect-to-https middleware.
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
  name: prometheus-http
  namespace: monitoring
spec:
  entryPoints:
    - web
  routes:
    - match: Host(`prometheus.mpabi.pl`)
      kind: Rule
      middlewares:
        - name: redirect-to-https
      services:
        - name: monitoring-stack-kube-prom-prometheus
          port: 9090

View File

@@ -0,0 +1,16 @@
# HTTPS route for Prometheus on the `websecure` entrypoint, terminating TLS
# with the cert-manager-issued monitoring-mpabi-pl-tls secret.
# NOTE(review): no auth middleware is attached — Prometheus is exposed
# unauthenticated over TLS; confirm this is intended (basic auth was removed).
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
  name: prometheus
  namespace: monitoring
spec:
  entryPoints:
    - websecure
  routes:
    - match: Host(`prometheus.mpabi.pl`)
      kind: Rule
      services:
        - name: monitoring-stack-kube-prom-prometheus
          port: 9090
  tls:
    secretName: monitoring-mpabi-pl-tls

View File

@@ -0,0 +1,11 @@
# Extra monitoring resources applied alongside the kube-prometheus-stack
# release: Traefik routing, TLS certificate, Grafana dashboard ConfigMap,
# and the Agave alerting rules.
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - middleware-redirect-to-https.yaml
  - certificate-monitoring-mpabi-pl.yaml
  - ingressroute-grafana.yaml
  - ingressroute-grafana-http.yaml
  - ingressroute-prometheus.yaml
  - ingressroute-prometheus-http.yaml
  - dashboard-agave-status.yaml
  - prometheus-rules-agave.yaml

View File

@@ -0,0 +1,9 @@
# Traefik middleware: permanent (HTTP 301) redirect of plain-HTTP traffic
# to HTTPS; referenced by the *-http IngressRoutes in this namespace.
apiVersion: traefik.io/v1alpha1
kind: Middleware
metadata:
  name: redirect-to-https
  namespace: monitoring
spec:
  redirectScheme:
    scheme: https
    permanent: true

View File

@@ -0,0 +1,61 @@
# Alerting rules for the Agave RPC node, based on metrics scraped from the
# mpabi node exporter. The `release` label must match the kube-prometheus-stack
# release name so the Prometheus operator picks these rules up.
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: agave-rpc-alerts
  namespace: monitoring
  labels:
    app: kube-prometheus-stack
    release: monitoring-stack
spec:
  groups:
    - name: agave-rpc-health
      rules:
        # RPC probe reports the endpoint as down.
        - alert: AgaveRPCDown
          expr: "max by (instance) (solana_rpc_up{job=\"mpabi-node-exporter\"}) == 0"
          for: 30s
          labels:
            severity: critical
            team: mpabi
          annotations:
            summary: "Agave RPC is unreachable"
            description: "RPC probe from node exporter reports solana_rpc_up == 0 for instance {{ $labels.instance }}."
        # Node trails the reference endpoint by more than 50 slots.
        - alert: AgaveRPCSlotLagHigh
          expr: "sum by (instance) (solana_rpc_slot_lag{job=\"mpabi-node-exporter\"}) > 50"
          for: 2m
          labels:
            severity: warning
            team: mpabi
          annotations:
            summary: "Agave RPC is lagging behind cluster"
            description: "Current slot lag is {{ $value }} for instance {{ $labels.instance }}. Reference endpoint in probe config may be misconfigured or validator is behind."
        # Severe lag (>500 slots) — escalate to critical.
        - alert: AgaveRPCSlotLagCritical
          expr: "sum by (instance) (solana_rpc_slot_lag{job=\"mpabi-node-exporter\"}) > 500"
          for: 2m
          labels:
            severity: critical
            team: mpabi
          annotations:
            summary: "Agave RPC severe lag"
            description: "Slot lag is critically high ({{ $value }} slots) on instance {{ $labels.instance }}."
        # Sustained combined NVMe read+write throughput above 300 MiB/s.
        - alert: AgaveIOHigh
          expr: |
            sum by (instance) (
              (rate(node_disk_read_bytes_total{job="mpabi-node-exporter",device=~"nvme.*"}[5m]) +
              rate(node_disk_written_bytes_total{job="mpabi-node-exporter",device=~"nvme.*"}[5m])) / 1024 / 1024
            ) > 300
          for: 5m
          labels:
            severity: warning
            team: mpabi
          annotations:
            summary: "High storage I/O on Agave node"
            description: "Combined NVMe read+write throughput >300 MiB/s for 5m on {{ $labels.instance }}. Check disk pressure and Geyser/ledger workload."
        # CPUs spending more than 20% of their time in iowait on average.
        - alert: AgaveIOWaitHigh
          expr: "avg by (instance) (rate(node_cpu_seconds_total{job=\"mpabi-node-exporter\",mode=\"iowait\"}[5m])) > 0.2"
          for: 5m
          labels:
            severity: warning
            team: mpabi
          annotations:
            summary: "High iowait on Agave node"
            description: "Iowait over 20% on average for 5m on {{ $labels.instance }}. Storage latency is likely impacting slot progress."

View File

@@ -0,0 +1,14 @@
# Strategic-merge patch: point the dlob-publisher at the Agave RPC node
# reachable over the wg0 WireGuard tunnel (10.66.66.1).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dlob-publisher
spec:
  template:
    spec:
      containers:
        - name: publisher
          env:
            - name: ENDPOINT
              value: "http://10.66.66.1:8899" # JSON-RPC over HTTP
            - name: WS_ENDPOINT
              value: "ws://10.66.66.1:8900" # PubSub over WebSocket

View File

@@ -0,0 +1,14 @@
# Strategic-merge patch: point the dlob-server at the Agave RPC node
# reachable over the wg0 WireGuard tunnel (10.66.66.1).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dlob-server
spec:
  template:
    spec:
      containers:
        - name: server
          env:
            - name: ENDPOINT
              value: "http://10.66.66.1:8899" # JSON-RPC over HTTP
            - name: WS_ENDPOINT
              value: "ws://10.66.66.1:8900" # PubSub over WebSocket

View File

@@ -0,0 +1,18 @@
# Strategic-merge patch: overlay the frontend's server.mjs with the script
# shipped in the trade-frontend-server-script ConfigMap, so the server code
# can be updated without rebuilding the image.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trade-frontend
spec:
  template:
    spec:
      containers:
        - name: frontend
          volumeMounts:
            - name: frontend-server-script
              # subPath mount replaces only this one file inside the image.
              mountPath: /app/services/frontend/server.mjs
              subPath: frontend-server.mjs
              readOnly: true
      volumes:
        - name: frontend-server-script
          configMap:
            name: trade-frontend-server-script

View File

@@ -0,0 +1,813 @@
import crypto from 'node:crypto';
import { spawnSync } from 'node:child_process';
import fs from 'node:fs';
import http from 'node:http';
import https from 'node:https';
import net from 'node:net';
import path from 'node:path';
import tls from 'node:tls';
// --- Server configuration (resolved once from the environment at startup) ---

// HTTP listen port; fail fast on a malformed value.
const PORT = Number.parseInt(process.env.PORT || '8081', 10);
if (!Number.isInteger(PORT) || PORT <= 0) throw new Error(`Invalid PORT: ${process.env.PORT}`);

// Build/version metadata exposed by the server.
const APP_VERSION = String(process.env.APP_VERSION || 'v1').trim() || 'v1';
const BUILD_TIMESTAMP = String(process.env.BUILD_TIMESTAMP || '').trim() || undefined;
const STARTED_AT = new Date().toISOString();

// Static asset root and mounted credential/token files.
const STATIC_DIR = process.env.STATIC_DIR || '/srv';
const BASIC_AUTH_FILE = process.env.BASIC_AUTH_FILE || '/tokens/frontend.json';
const API_READ_TOKEN_FILE = process.env.API_READ_TOKEN_FILE || '/tokens/read.json';

// Upstream services this frontend proxies to.
const API_UPSTREAM = process.env.API_UPSTREAM || process.env.API_URL || 'http://api:8787';
const HASURA_UPSTREAM = process.env.HASURA_UPSTREAM || 'http://hasura:8080';
const HASURA_GRAPHQL_PATH = process.env.HASURA_GRAPHQL_PATH || '/v1/graphql';
const GRAPHQL_CORS_ORIGIN = process.env.GRAPHQL_CORS_ORIGIN || process.env.CORS_ORIGIN || '*';

// Basic auth is enabled unless BASIC_AUTH_MODE is an explicit "off" value.
const BASIC_AUTH_MODE = String(process.env.BASIC_AUTH_MODE || 'on')
  .trim()
  .toLowerCase();
const BASIC_AUTH_ENABLED = !['off', 'false', '0', 'disabled', 'none'].includes(BASIC_AUTH_MODE);

// Header used to propagate the authenticated username to upstreams.
const AUTH_USER_HEADER = String(process.env.AUTH_USER_HEADER || 'x-trade-user')
  .trim()
  .toLowerCase();
// Authentication strategy; defaults to cookie-based sessions.
const AUTH_MODE = String(process.env.AUTH_MODE || 'session')
  .trim()
  .toLowerCase();
const HTPASSWD_FILE = String(process.env.HTPASSWD_FILE || '/auth/users').trim();
const AUTH_SESSION_SECRET_FILE = String(process.env.AUTH_SESSION_SECRET_FILE || '').trim() || null;
// NOTE(review): cookie name is lowercased here — presumably intentional for
// case-insensitive matching during parsing; confirm against the cookie reader.
const AUTH_SESSION_COOKIE = String(process.env.AUTH_SESSION_COOKIE || 'trade_session')
  .trim()
  .toLowerCase();
// NOTE(review): unlike PORT, this is not validated — a non-numeric env value
// yields NaN; confirm downstream handling.
const AUTH_SESSION_TTL_SECONDS = Number.parseInt(process.env.AUTH_SESSION_TTL_SECONDS || '43200', 10); // 12h

// Cookie selecting the per-browser DLOB data source, and its default.
const DLOB_SOURCE_COOKIE = String(process.env.DLOB_SOURCE_COOKIE || 'trade_dlob_source').trim() || 'trade_dlob_source';
const DLOB_SOURCE_DEFAULT = String(process.env.DLOB_SOURCE_DEFAULT || 'mevnode').trim() || 'mevnode';
/**
 * Read a UTF-8 file and parse its contents as JSON.
 * @param {string} filePath - Path to the JSON file.
 * @returns {*} The parsed value.
 * @throws If the file cannot be read or does not contain valid JSON.
 */
function readJson(filePath) {
  return JSON.parse(fs.readFileSync(filePath, 'utf8'));
}
/**
 * Read a file's contents as a UTF-8 string.
 * @param {string} filePath - Path to the file.
 * @returns {string} The file contents.
 */
function readText(filePath) {
  const contents = fs.readFileSync(filePath, 'utf8');
  return contents;
}
/**
 * Compare two values as UTF-8 strings in constant time.
 * A length mismatch returns early (leaking only the length), which is the
 * standard trade-off for credential comparison.
 * @returns {boolean} True when both encode to identical bytes.
 */
function timingSafeEqualStr(a, b) {
  const left = Buffer.from(String(a), 'utf8');
  const right = Buffer.from(String(b), 'utf8');
  return left.length === right.length && crypto.timingSafeEqual(left, right);
}
/**
 * Compare two byte arrays in constant time.
 * Non-Uint8Array inputs and length mismatches return false immediately.
 * @returns {boolean} True when both arrays contain identical bytes.
 */
function timingSafeEqualBuf(a, b) {
  const comparable = a instanceof Uint8Array && b instanceof Uint8Array && a.length === b.length;
  if (!comparable) return false;
  return crypto.timingSafeEqual(Buffer.from(a), Buffer.from(b));
}
/**
 * Load the basic-auth credentials from BASIC_AUTH_FILE.
 * @returns {{username: string, password: string}}
 * @throws If the file is unreadable, invalid JSON, or missing either field.
 */
function loadBasicAuth() {
  const parsed = readJson(BASIC_AUTH_FILE);
  const username = (parsed?.username || '').toString();
  const password = (parsed?.password || '').toString();
  const valid = Boolean(username) && Boolean(password);
  if (!valid) throw new Error(`Invalid BASIC_AUTH_FILE: ${BASIC_AUTH_FILE}`);
  return { username, password };
}
/**
 * Load the read-only API token from API_READ_TOKEN_FILE.
 * @returns {string} The non-empty token string.
 * @throws If the file is unreadable, invalid JSON, or the token is empty.
 */
function loadApiReadToken() {
  const parsed = readJson(API_READ_TOKEN_FILE);
  const token = (parsed?.token || '').toString();
  if (token === '') throw new Error(`Invalid API_READ_TOKEN_FILE: ${API_READ_TOKEN_FILE}`);
  return token;
}
/**
 * Write an HTTP response: status code, optional headers, optional body.
 * A nullish body ends the response without a payload.
 * @param {import('node:http').ServerResponse} res
 * @param {number} status - HTTP status code.
 * @param {Object<string, string>|null} headers - Headers to set, if any.
 * @param {string|Buffer|null|undefined} body - Response payload.
 */
function send(res, status, headers, body) {
  res.statusCode = status;
  for (const [name, value] of Object.entries(headers || {})) {
    res.setHeader(name, value);
  }
  if (body == null) {
    res.end();
    return;
  }
  res.end(body);
}
/**
 * Serialize `body` to JSON and send it with no-store cache semantics.
 * @param {import('node:http').ServerResponse} res
 * @param {number} status - HTTP status code.
 * @param {*} body - JSON-serializable payload.
 */
function sendJson(res, status, body) {
  const payload = JSON.stringify(body);
  const headers = { 'content-type': 'application/json; charset=utf-8', 'cache-control': 'no-store' };
  send(res, status, headers, payload);
}
/**
 * Challenge the client for Basic credentials: 401 with a WWW-Authenticate
 * header for the "trade" realm and a plain-text body.
 * @param {import('node:http').ServerResponse} res
 */
function basicAuthRequired(res) {
  const challenge = 'Basic realm="trade"';
  res.setHeader('www-authenticate', challenge);
  send(res, 401, { 'content-type': 'text/plain; charset=utf-8' }, 'unauthorized');
}
/**
 * JSON-style 401 used by API endpoints (no Basic challenge).
 * @param {import('node:http').ServerResponse} res
 */
function unauthorized(res) {
  const body = { ok: false, error: 'unauthorized' };
  sendJson(res, 401, body);
}
/**
 * Validate a Basic Authorization header against the expected credentials
 * using constant-time comparisons.
 * @param {import('node:http').IncomingMessage} req
 * @param {{username: string, password: string}} creds
 * @returns {boolean} True when both username and password match.
 */
function isAuthorized(req, creds) {
  const header = String(req.headers.authorization || '');
  const match = header.match(/^Basic\s+(.+)$/i);
  if (!match?.[1]) return false;
  let decoded;
  try {
    decoded = Buffer.from(match[1], 'base64').toString('utf8');
  } catch {
    return false;
  }
  // Split on the first ':' only — the password may itself contain colons.
  const sep = decoded.indexOf(':');
  if (sep < 0) return false;
  const user = decoded.slice(0, sep);
  const pass = decoded.slice(sep + 1);
  return timingSafeEqualStr(user, creds.username) && timingSafeEqualStr(pass, creds.password);
}
// Extension → Content-Type map used when serving static assets.
const MIME = {
  '.html': 'text/html; charset=utf-8',
  '.css': 'text/css; charset=utf-8',
  '.js': 'application/javascript; charset=utf-8',
  '.mjs': 'application/javascript; charset=utf-8',
  '.json': 'application/json; charset=utf-8',
  '.svg': 'image/svg+xml',
  '.png': 'image/png',
  '.jpg': 'image/jpeg',
  '.jpeg': 'image/jpeg',
  '.gif': 'image/gif',
  '.ico': 'image/x-icon',
  '.txt': 'text/plain; charset=utf-8',
  '.map': 'application/json; charset=utf-8',
};

/**
 * Resolve the Content-Type for a file path from its extension.
 * @param {string} filePath
 * @returns {string} A MIME type, defaulting to application/octet-stream.
 */
function contentTypeFor(filePath) {
  const ext = path.extname(filePath).toLowerCase();
  const known = MIME[ext];
  return known ?? 'application/octet-stream';
}
// Convert a URL pathname into a safe path relative to STATIC_DIR.
// Returns null for anything that must not be served (traversal, absolute
// paths, malformed percent-encoding).
function safePathFromUrlPath(urlPath) {
  // BUG FIX: decodeURIComponent throws URIError on malformed sequences
  // (e.g. "/%zz"); previously that escaped to the top-level handler as a
  // 500. Treat it as a bad path so the caller answers 400 instead.
  let decoded;
  try {
    decoded = decodeURIComponent(urlPath);
  } catch {
    return null;
  }
  const cleaned = decoded.replace(/\0/g, '');
  // strip leading slash so join() doesn't ignore STATIC_DIR
  const rel = cleaned.replace(/^\/+/, '');
  const normalized = path.normalize(rel);
  // prevent traversal
  if (normalized.startsWith('..') || path.isAbsolute(normalized)) return null;
  return normalized;
}
// Inject a small fixed-position widget into index.html that lets the user
// switch the DLOB data source (mevnode | drift) via /prefs/dlob-source.
// The widget is appended just before </body>, or at the end of the document
// when no closing body tag exists.
function injectIndexHtml(html, { dlobSource, redirectPath }) {
  const source = normalizeDlobSource(dlobSource) || 'mevnode';
  const redirectTarget = safeRedirectPath(redirectPath);
  const switchHref = `/prefs/dlob-source?redirect=${encodeURIComponent(redirectTarget)}&set=`;
  const styleActive = 'font-weight:700;text-decoration:underline;';
  const styleInactive = 'font-weight:400;text-decoration:none;';
  const snippet = `
<!-- trade: dlob source switch -->
<div style="position:fixed;right:12px;bottom:12px;z-index:2147483647;background:rgba(0,0,0,0.72);color:#fff;padding:8px 10px;border-radius:10px;font:12px/1.2 system-ui,-apple-system,Segoe UI,Roboto,sans-serif;backdrop-filter:blur(6px);">
<span style="opacity:0.85;margin-right:6px;">DLOB</span>
<a href="${switchHref}mevnode" style="color:#fff;${source === 'mevnode' ? styleActive : styleInactive}">mevnode</a>
<span style="opacity:0.6;margin:0 6px;">|</span>
<a href="${switchHref}drift" style="color:#fff;${source === 'drift' ? styleActive : styleInactive}">drift</a>
</div>
`;
  const closeTag = /<\/body>/i;
  const found = closeTag.exec(html);
  if (found) {
    const before = html.slice(0, found.index);
    const after = html.slice(found.index + found[0].length);
    return `${before}${snippet}</body>${after}`;
  }
  return `${html}\n${snippet}\n`;
}
// Serve files from STATIC_DIR with an SPA fallback to index.html.
// GET/HEAD only; index.html gets the DLOB-source widget injected per request.
function serveStatic(req, res) {
  if (req.method !== 'GET' && req.method !== 'HEAD') {
    send(res, 405, { 'content-type': 'text/plain; charset=utf-8' }, 'method_not_allowed');
    return;
  }
  const url = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
  const rel = safePathFromUrlPath(url.pathname);
  if (rel == null) {
    send(res, 400, { 'content-type': 'text/plain; charset=utf-8' }, 'bad_request');
    return;
  }
  const root = path.resolve(STATIC_DIR);
  const fileCandidate = path.resolve(root, rel);
  // Defense in depth: safePathFromUrlPath already rejects traversal, but
  // verify the resolved path cannot escape the static root.
  if (!fileCandidate.startsWith(root)) {
    send(res, 400, { 'content-type': 'text/plain; charset=utf-8' }, 'bad_request');
    return;
  }
  // Attempt to serve filePath; returns true iff a response was written.
  const trySend = (filePath) => {
    try {
      const st = fs.statSync(filePath);
      if (st.isDirectory()) return trySend(path.join(filePath, 'index.html'));
      res.statusCode = 200;
      res.setHeader('content-type', contentTypeFor(filePath));
      res.setHeader('cache-control', filePath.endsWith('index.html') ? 'no-cache' : 'public, max-age=31536000');
      if (req.method === 'HEAD') {
        // BUG FIX: this branch previously did `return void res.end()`,
        // returning undefined. The caller then treated the request as
        // unserved and fell through to the SPA fallback, attempting a
        // second response on an already-finished stream. Report success.
        res.end();
        return true;
      }
      if (filePath.endsWith('index.html')) {
        // index.html is rewritten per-request with the DLOB source switch.
        const html = fs.readFileSync(filePath, 'utf8');
        const injected = injectIndexHtml(html, {
          dlobSource: resolveDlobSource(req),
          redirectPath: url.pathname + url.search,
        });
        res.end(injected);
        return true;
      }
      fs.createReadStream(filePath).pipe(res);
      return true;
    } catch {
      // stat/read failed — signal "not served" so the caller can fall back.
      return false;
    }
  };
  // exact file, otherwise SPA fallback
  if (trySend(fileCandidate)) return;
  const indexPath = path.join(root, 'index.html');
  if (trySend(indexPath)) return;
  send(res, 404, { 'content-type': 'text/plain; charset=utf-8' }, 'not_found');
}
// Drop RFC 7230 hop-by-hop headers, which a proxy must not forward.
function stripHopByHopHeaders(headers) {
  const HOP_BY_HOP = new Set([
    'connection',
    'keep-alive',
    'proxy-authenticate',
    'proxy-authorization',
    'te',
    'trailer',
    'transfer-encoding',
    'upgrade',
  ]);
  const kept = Object.entries(headers || {}).filter(
    ([name]) => !HOP_BY_HOP.has(name.toLowerCase())
  );
  return Object.fromEntries(kept);
}
// Read a request header case-insensitively; for repeated headers Node stores
// an array, in which case the first value wins.
function readHeader(req, name) {
  const key = String(name).toLowerCase();
  const value = req.headers[key];
  if (Array.isArray(value)) return value[0];
  return value;
}
// Extract a cookie value from the Cookie header; first match wins.
// Returns null when absent or when the cookie value is empty.
function readCookie(req, name) {
  const header = typeof req.headers.cookie === 'string' ? req.headers.cookie : '';
  if (header === '') return null;
  const prefix = `${name}=`;
  const hit = header
    .split(';')
    .map((piece) => piece.trim())
    .find((piece) => piece.startsWith(prefix));
  if (hit === undefined) return null;
  return hit.slice(prefix.length) || null;
}
// Canonicalize a DLOB source name; only 'mevnode' and 'drift' are valid.
function normalizeDlobSource(value) {
  const candidate = String(value ?? '').trim().toLowerCase();
  return candidate === 'mevnode' || candidate === 'drift' ? candidate : null;
}
// Pick the effective DLOB source: the user's cookie if valid, otherwise the
// configured default (hard fallback: 'mevnode').
function resolveDlobSource(req) {
  const cookieValue = readCookie(req, DLOB_SOURCE_COOKIE);
  const fromCookie = normalizeDlobSource(cookieValue);
  if (fromCookie !== null) return fromCookie;
  return normalizeDlobSource(DLOB_SOURCE_DEFAULT) ?? 'mevnode';
}
// Sanitize a redirect target to a same-origin absolute path.
// Rejects protocol-relative forms and strips CR/LF to prevent header
// injection in the Location header.
function safeRedirectPath(value) {
  const s = String(value ?? '').trim();
  if (!s.startsWith('/')) return '/';
  // BUG FIX: besides "//host", browsers also treat "/\host" as
  // protocol-relative, making it an open redirect — reject both.
  if (s.startsWith('//') || s.startsWith('/\\')) return '/';
  return s.replace(/\r|\n/g, '');
}
// Persist the DLOB source preference for one year; returns false when the
// requested value is not a valid source.
function setDlobSourceCookie(res, { secure, dlobSource }) {
  const normalized = normalizeDlobSource(dlobSource);
  if (normalized === null) return false;
  const attributes = [
    `${DLOB_SOURCE_COOKIE}=${normalized}`,
    'Path=/',
    'SameSite=Lax',
    'HttpOnly',
    'Max-Age=31536000',
  ];
  if (secure) attributes.push('Secure');
  res.setHeader('set-cookie', attributes.join('; '));
  return true;
}
// Identity set by a trusted auth proxy: the configured header, with
// x-webauth-user as a fallback. Null when absent or blank.
function resolveAuthUser(req) {
  const raw = readHeader(req, AUTH_USER_HEADER) || readHeader(req, 'x-webauth-user');
  if (typeof raw !== 'string') return null;
  const trimmed = raw.trim();
  return trimmed === '' ? null : trimmed;
}
// True when the request arrived over TLS, either directly or (per the
// x-forwarded-proto header) at an upstream terminator.
function isHttpsRequest(req) {
  const forwardedProto = readHeader(req, 'x-forwarded-proto');
  if (typeof forwardedProto === 'string' && forwardedProto.toLowerCase() === 'https') return true;
  return Boolean(req.socket?.encrypted);
}
// URL-safe base64 without padding (RFC 4648 §5 alphabet).
function base64urlEncode(buf) {
  const b64 = Buffer.from(buf).toString('base64');
  return b64.split('+').join('-').split('/').join('_').replace(/=+$/g, '');
}
// Inverse of base64urlEncode: restore the standard alphabet, re-pad to a
// multiple of four, then decode.
function base64urlDecode(str) {
  const standard = String(str).split('-').join('+').split('_').join('/');
  const remainder = standard.length % 4;
  const padded = remainder === 0 ? standard : standard + '='.repeat(4 - remainder);
  return Buffer.from(padded, 'base64');
}
// Resolve the HMAC key for session cookies, in priority order:
// AUTH_SESSION_SECRET env var, then AUTH_SESSION_SECRET_FILE, then a random
// per-process secret (sessions then do not survive restarts).
function loadSessionSecret() {
  const envSecret = String(process.env.AUTH_SESSION_SECRET ?? '').trim();
  if (envSecret !== '') return Buffer.from(envSecret, 'utf8');
  if (AUTH_SESSION_SECRET_FILE) {
    try {
      const fileSecret = readText(AUTH_SESSION_SECRET_FILE).trim();
      if (fileSecret !== '') return Buffer.from(fileSecret, 'utf8');
    } catch {
      // unreadable/missing file — fall through to the random fallback
    }
  }
  return crypto.randomBytes(32);
}
// Process-wide session-signing key, resolved once at startup.
const SESSION_SECRET = loadSessionSecret();
// HMAC-SHA256 over the base64url payload; returns the raw digest Buffer.
function signSessionPayload(payloadB64) {
  const hmac = crypto.createHmac('sha256', SESSION_SECRET);
  hmac.update(payloadB64);
  return hmac.digest();
}
// Build the signed session cookie: "<payloadB64>.<sigB64>" where the payload
// is {u: username, exp: unix-seconds}. TTL falls back to 12h when the
// configured value is not a positive finite number.
function makeSessionCookieValue(username) {
  const nowSeconds = Math.floor(Date.now() / 1000);
  const ttlValid = Number.isFinite(AUTH_SESSION_TTL_SECONDS) && AUTH_SESSION_TTL_SECONDS > 0;
  const ttl = ttlValid ? AUTH_SESSION_TTL_SECONDS : 43200;
  const payloadJson = JSON.stringify({ u: String(username), exp: nowSeconds + ttl });
  const payloadB64 = base64urlEncode(Buffer.from(payloadJson, 'utf8'));
  const signatureB64 = base64urlEncode(signSessionPayload(payloadB64));
  return `${payloadB64}.${signatureB64}`;
}
// Parse and verify the signed session cookie. Returns the username, or null
// for any malformed, expired, or badly-signed cookie.
function getSessionUser(req) {
  const raw = readCookie(req, AUTH_SESSION_COOKIE);
  if (!raw) return null;
  const pieces = raw.split('.');
  if (pieces.length !== 2) return null;
  const [payloadB64, sigB64] = pieces;
  if (!payloadB64 || !sigB64) return null;
  let claims;
  try {
    claims = JSON.parse(base64urlDecode(payloadB64).toString('utf8'));
  } catch {
    return null;
  }
  const username = typeof claims?.u === 'string' ? claims.u.trim() : '';
  const expiry = Number(claims?.exp);
  if (!username || !Number.isFinite(expiry)) return null;
  // Expiry check before the signature check is fine: both must pass.
  if (Math.floor(Date.now() / 1000) >= expiry) return null;
  let providedSig;
  try {
    providedSig = base64urlDecode(sigB64);
  } catch {
    return null;
  }
  const expectedSig = signSessionPayload(payloadB64);
  if (!timingSafeEqualBuf(expectedSig, providedSig)) return null;
  return username;
}
// Identity resolution order: session cookie, then trusted proxy header.
// When auth is disabled, everyone is 'anonymous'; otherwise null means
// unauthenticated.
function resolveAuthenticatedUser(req) {
  const fromSession = getSessionUser(req);
  if (fromSession) return fromSession;
  const fromHeader = resolveAuthUser(req);
  if (fromHeader) return fromHeader;
  const authOff = AUTH_MODE === 'off' || AUTH_MODE === 'none' || AUTH_MODE === 'disabled';
  return authOff ? 'anonymous' : null;
}
// Expire the session cookie immediately (Max-Age=0).
function clearSessionCookie(res, secure) {
  const attributes = [
    `${AUTH_SESSION_COOKIE}=`,
    'Path=/',
    'Max-Age=0',
    'HttpOnly',
    'SameSite=Lax',
  ];
  if (secure) attributes.push('Secure');
  res.setHeader('set-cookie', attributes.join('; '));
}
// Issue the signed session cookie for `username`; cookie lifetime mirrors
// the token TTL (12h fallback when the configured TTL is not finite).
function setSessionCookie(res, secure, username) {
  const ttl = Number.isFinite(AUTH_SESSION_TTL_SECONDS) ? AUTH_SESSION_TTL_SECONDS : 43200;
  const attributes = [
    `${AUTH_SESSION_COOKIE}=${makeSessionCookieValue(username)}`,
    'Path=/',
    `Max-Age=${ttl}`,
    'HttpOnly',
    'SameSite=Lax',
  ];
  if (secure) attributes.push('Secure');
  res.setHeader('set-cookie', attributes.join('; '));
}
// Verify credentials by shelling out to the htpasswd binary
// (-v = verify mode, -b = password on the command line). Any spawn failure
// or non-zero exit counts as invalid credentials.
function verifyWithHtpasswd(username, password) {
  const args = ['-vb', HTPASSWD_FILE, String(username), String(password)];
  try {
    const result = spawnSync('htpasswd', args, { stdio: 'ignore', timeout: 3000 });
    return result.status === 0;
  } catch {
    return false;
  }
}
// Buffer the request body into a UTF-8 string, rejecting with
// 'payload_too_large' (and destroying the stream) past limitBytes.
function readBody(req, limitBytes = 1024 * 16) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    let received = 0;
    req.on('data', (chunk) => {
      received += chunk.length;
      if (received > limitBytes) {
        reject(new Error('payload_too_large'));
        req.destroy();
        return;
      }
      chunks.push(chunk);
    });
    req.on('end', () => {
      resolve(Buffer.concat(chunks).toString('utf8'));
    });
    req.on('error', reject);
  });
}
function proxyApi(req, res, apiReadToken) {
const upstreamBase = new URL(API_UPSTREAM);
const inUrl = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
const prefix = '/api';
const strippedPath = inUrl.pathname === prefix ? '/' : inUrl.pathname.startsWith(prefix + '/') ? inUrl.pathname.slice(prefix.length) : null;
if (strippedPath == null) {
send(res, 404, { 'content-type': 'text/plain; charset=utf-8' }, 'not_found');
return;
}
const target = new URL(upstreamBase.toString());
target.pathname = strippedPath || '/';
target.search = inUrl.search;
const isHttps = target.protocol === 'https:';
const lib = isHttps ? https : http;
const headers = stripHopByHopHeaders(req.headers);
delete headers.authorization; // basic auth from client must not leak upstream
headers.host = target.host;
headers.authorization = `Bearer ${apiReadToken}`;
const upstreamReq = lib.request(
{
protocol: target.protocol,
hostname: target.hostname,
port: target.port || (isHttps ? 443 : 80),
method: req.method,
path: target.pathname + target.search,
headers,
},
(upstreamRes) => {
const outHeaders = stripHopByHopHeaders(upstreamRes.headers);
res.writeHead(upstreamRes.statusCode || 502, outHeaders);
upstreamRes.pipe(res);
}
);
upstreamReq.on('error', (err) => {
if (!res.headersSent) {
send(res, 502, { 'content-type': 'text/plain; charset=utf-8' }, `bad_gateway: ${err?.message || err}`);
} else {
res.destroy();
}
});
req.pipe(upstreamReq);
}
// Apply the CORS headers used by the GraphQL proxy endpoints.
function withCors(res) {
  const allowedHeaders =
    'content-type, authorization, x-hasura-admin-secret, x-hasura-role, x-hasura-user-id, x-hasura-dlob-source';
  res.setHeader('access-control-allow-origin', GRAPHQL_CORS_ORIGIN);
  res.setHeader('access-control-allow-methods', 'GET,POST,OPTIONS');
  res.setHeader('access-control-allow-headers', allowedHeaders);
}
// Reverse-proxy GraphQL HTTP traffic to Hasura at HASURA_GRAPHQL_PATH.
// The x-hasura-dlob-source header is always overwritten with the
// server-resolved preference so clients cannot spoof it.
function proxyGraphqlHttp(req, res) {
  const upstreamBase = new URL(HASURA_UPSTREAM);
  const inUrl = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
  const target = new URL(upstreamBase.toString());
  target.pathname = HASURA_GRAPHQL_PATH;
  target.search = inUrl.search;
  const isHttps = target.protocol === 'https:';
  const lib = isHttps ? https : http;
  const headers = stripHopByHopHeaders(req.headers);
  headers.host = target.host;
  // Never trust the client's dlob-source header — replace it.
  delete headers['x-hasura-dlob-source'];
  headers['x-hasura-dlob-source'] = resolveDlobSource(req);
  const upstreamReq = lib.request(
    {
      protocol: target.protocol,
      hostname: target.hostname,
      port: target.port || (isHttps ? 443 : 80),
      method: req.method,
      path: target.pathname + target.search,
      headers,
    },
    (upstreamRes) => {
      // Mirror status/headers (minus hop-by-hop), re-apply CORS, stream body.
      const outHeaders = stripHopByHopHeaders(upstreamRes.headers);
      withCors(res);
      res.writeHead(upstreamRes.statusCode || 502, outHeaders);
      upstreamRes.pipe(res);
    }
  );
  upstreamReq.on('error', (err) => {
    // 502 if the response has not started; otherwise abort the stream.
    if (!res.headersSent) {
      withCors(res);
      send(res, 502, { 'content-type': 'text/plain; charset=utf-8' }, `bad_gateway: ${err?.message || err}`);
    } else {
      res.destroy();
    }
  });
  // Stream the client's request body straight through.
  req.pipe(upstreamReq);
}
// True for the two pathnames handled by the GraphQL proxy.
function isGraphqlPath(pathname) {
  const graphqlPaths = new Set(['/graphql', '/graphql-ws']);
  return graphqlPaths.has(pathname);
}
// Proxy a WebSocket upgrade (GraphQL subscriptions) to Hasura by hand-writing
// an HTTP/1.1 upgrade request over a raw TCP or TLS socket, then splicing the
// client and upstream sockets together so WebSocket frames flow untouched.
function proxyGraphqlWs(req, socket, head) {
  const upstreamBase = new URL(HASURA_UPSTREAM);
  const inUrl = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
  const target = new URL(upstreamBase.toString());
  target.pathname = HASURA_GRAPHQL_PATH;
  target.search = inUrl.search;
  const port = Number(target.port || (target.protocol === 'https:' ? 443 : 80));
  const host = target.hostname;
  // Choose plain TCP or TLS (with SNI) based on the upstream scheme.
  const connect =
    target.protocol === 'https:'
      ? () => tls.connect({ host, port, servername: host })
      : () => net.connect({ host, port });
  const upstream = connect();
  // Low-latency relay: disable Nagle on both sides.
  upstream.setNoDelay(true);
  socket.setNoDelay(true);
  // For WebSocket upgrades we must forward `connection`/`upgrade` and related headers.
  const headers = { ...req.headers };
  delete headers['content-length'];
  delete headers['content-type'];
  headers.host = target.host;
  // Never trust the client's dlob-source header — replace it.
  delete headers['x-hasura-dlob-source'];
  headers['x-hasura-dlob-source'] = resolveDlobSource(req);
  // Serialize the upgrade request head by hand.
  const lines = [];
  lines.push(`GET ${target.pathname + target.search} HTTP/1.1`);
  for (const [k, v] of Object.entries(headers)) {
    if (v == null) continue;
    if (Array.isArray(v)) {
      // Repeated headers are emitted one per line.
      for (const vv of v) lines.push(`${k}: ${vv}`);
    } else {
      lines.push(`${k}: ${v}`);
    }
  }
  // Two empty entries produce the CRLF CRLF that terminates the head.
  lines.push('', '');
  upstream.write(lines.join('\r\n'));
  // Forward any bytes Node already consumed past the client's request head.
  if (head?.length) upstream.write(head);
  // Either side failing tears down the other; errors are otherwise ignored.
  upstream.on('error', () => {
    try {
      socket.destroy();
    } catch {
      // ignore
    }
  });
  socket.on('error', () => {
    try {
      upstream.destroy();
    } catch {
      // ignore
    }
  });
  // Bidirectional byte relay from here on.
  upstream.pipe(socket);
  socket.pipe(upstream);
}
// Top-level HTTP router. Route order is load-bearing:
//   1. /healthz (no auth)        2. GraphQL proxy        3. /whoami
//   4. /prefs/dlob-source        5. /auth/login, logout  6. basic-auth gate
//   7. /api proxy                8. static files / SPA fallback
async function handler(req, res) {
  // Liveness probe: unauthenticated, reports version/build metadata.
  if (req.method === 'GET' && (req.url === '/healthz' || req.url?.startsWith('/healthz?'))) {
    send(
      res,
      200,
      { 'content-type': 'application/json; charset=utf-8' },
      JSON.stringify({ ok: true, version: APP_VERSION, buildTimestamp: BUILD_TIMESTAMP, startedAt: STARTED_AT })
    );
    return;
  }
  const url = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
  if (isGraphqlPath(url.pathname)) {
    // CORS preflight is always allowed through without auth.
    if (req.method === 'OPTIONS') {
      withCors(res);
      res.statusCode = 204;
      res.end();
      return;
    }
    // Require an authenticated user unless auth is explicitly disabled.
    if (AUTH_MODE !== 'off' && AUTH_MODE !== 'none' && AUTH_MODE !== 'disabled') {
      const user = resolveAuthenticatedUser(req);
      if (!user) {
        withCors(res);
        unauthorized(res);
        return;
      }
    }
    withCors(res);
    proxyGraphqlHttp(req, res);
    return;
  }
  // Report the caller's resolved identity and the active auth mode.
  if (req.method === 'GET' && url.pathname === '/whoami') {
    sendJson(res, 200, { ok: true, user: resolveAuthenticatedUser(req), mode: AUTH_MODE });
    return;
  }
  // Read (no ?set=) or set (?set=mevnode|drift) the DLOB source preference.
  if (req.method === 'GET' && url.pathname === '/prefs/dlob-source') {
    const set = url.searchParams.get('set');
    if (!set) {
      sendJson(res, 200, { ok: true, dlobSource: resolveDlobSource(req) });
      return;
    }
    const ok = setDlobSourceCookie(res, { secure: isHttpsRequest(req), dlobSource: set });
    if (!ok) {
      sendJson(res, 400, { ok: false, error: 'invalid_dlob_source' });
      return;
    }
    // Bounce back to where the user was (sanitized, same-origin only).
    res.statusCode = 302;
    res.setHeader('location', safeRedirectPath(url.searchParams.get('redirect') || '/'));
    res.end();
    return;
  }
  // Login: accepts JSON or form-encoded credentials, verifies via htpasswd,
  // then issues the signed session cookie.
  if (req.method === 'POST' && url.pathname === '/auth/login') {
    if (AUTH_MODE === 'off' || AUTH_MODE === 'none' || AUTH_MODE === 'disabled') {
      sendJson(res, 400, { ok: false, error: 'auth_disabled' });
      return;
    }
    const raw = await readBody(req);
    const ct = String(req.headers['content-type'] || '').toLowerCase();
    let username = '';
    let password = '';
    if (ct.includes('application/json')) {
      let json;
      try {
        json = JSON.parse(raw);
      } catch {
        sendJson(res, 400, { ok: false, error: 'bad_json' });
        return;
      }
      username = typeof json?.username === 'string' ? json.username.trim() : '';
      password = typeof json?.password === 'string' ? json.password : '';
    } else {
      const params = new URLSearchParams(raw);
      username = String(params.get('username') || '').trim();
      password = String(params.get('password') || '');
    }
    // Length caps bound the work handed to the htpasswd subprocess.
    if (!username || !password || username.length > 64 || password.length > 200) {
      sendJson(res, 400, { ok: false, error: 'invalid_input' });
      return;
    }
    const ok = verifyWithHtpasswd(username, password);
    if (!ok) {
      unauthorized(res);
      return;
    }
    const secure = isHttpsRequest(req);
    setSessionCookie(res, secure, username);
    sendJson(res, 200, { ok: true, user: username });
    return;
  }
  // Logout: clear the session cookie; GET redirects home, POST returns JSON.
  if ((req.method === 'POST' || req.method === 'GET') && (url.pathname === '/auth/logout' || url.pathname === '/logout')) {
    clearSessionCookie(res, isHttpsRequest(req));
    if (req.method === 'GET') {
      res.statusCode = 302;
      res.setHeader('location', '/');
      res.end();
      return;
    }
    sendJson(res, 200, { ok: true });
    return;
  }
  // Optional HTTP basic-auth gate in front of everything below this point.
  if (BASIC_AUTH_ENABLED) {
    let creds;
    try {
      creds = loadBasicAuth();
    } catch (e) {
      send(res, 500, { 'content-type': 'text/plain; charset=utf-8' }, String(e?.message || e));
      return;
    }
    if (!isAuthorized(req, creds)) {
      basicAuthRequired(res);
      return;
    }
  }
  // /api proxy: requires auth (unless disabled) plus the upstream read token.
  if (req.url?.startsWith('/api') && (req.url === '/api' || req.url.startsWith('/api/'))) {
    if (AUTH_MODE !== 'off' && AUTH_MODE !== 'none' && AUTH_MODE !== 'disabled') {
      const user = resolveAuthenticatedUser(req);
      if (!user) {
        unauthorized(res);
        return;
      }
    }
    let token;
    try {
      token = loadApiReadToken();
    } catch (e) {
      send(res, 500, { 'content-type': 'text/plain; charset=utf-8' }, String(e?.message || e));
      return;
    }
    proxyApi(req, res, token);
    return;
  }
  // Everything else is served from the static bundle (with SPA fallback).
  serveStatic(req, res);
}
// HTTP entry point: delegate to the async router and convert any uncaught
// rejection into a 500 (or a hard socket teardown if headers already went out).
const server = http.createServer((req, res) => {
  const fail = (err) => {
    if (res.headersSent) {
      // Response already in flight — stream state is unknown, so abort.
      res.destroy();
      return;
    }
    const message = String(err?.message || err);
    send(res, 500, { 'content-type': 'text/plain; charset=utf-8' }, message);
  };
  handler(req, res).catch(fail);
});
// WebSocket upgrades are accepted only on the GraphQL paths, and only for
// authenticated users when auth is enforced; everything else is dropped.
server.on('upgrade', (req, socket, head) => {
  try {
    const requestUrl = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
    if (!isGraphqlPath(requestUrl.pathname)) {
      socket.destroy();
      return;
    }
    const authEnforced = AUTH_MODE !== 'off' && AUTH_MODE !== 'none' && AUTH_MODE !== 'disabled';
    if (authEnforced && !resolveAuthenticatedUser(req)) {
      try {
        // Best-effort 401 before dropping the connection.
        socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
      } catch {
        // ignore — the socket is destroyed below either way
      }
      socket.destroy();
      return;
    }
    proxyGraphqlWs(req, socket, head);
  } catch {
    socket.destroy();
  }
});
// Start listening and log the effective configuration once, as pretty JSON.
server.listen(PORT, () => {
  const startupInfo = {
    service: 'trade-frontend',
    port: PORT,
    staticDir: STATIC_DIR,
    apiUpstream: API_UPSTREAM,
    hasuraUpstream: HASURA_UPSTREAM,
    basicAuthFile: BASIC_AUTH_FILE,
    basicAuthMode: BASIC_AUTH_MODE,
    apiReadTokenFile: API_READ_TOKEN_FILE,
    authUserHeader: AUTH_USER_HEADER,
    authMode: AUTH_MODE,
    htpasswdFile: HTPASSWD_FILE,
  };
  console.log(JSON.stringify(startupInfo, null, 2));
});

View File

@@ -6,5 +6,18 @@ namespace: trade-prod
resources: resources:
- ../../base - ../../base
patchesStrategicMerge:
- dlob-rpc-endpoint-patch.yaml
- dlob-rpc-server-endpoint-patch.yaml
- frontend-graphql-proxy-patch.yaml
configMapGenerator:
- name: trade-frontend-server-script
files:
- frontend-server.mjs
generatorOptions:
disableNameSuffixHash: true
commonLabels: commonLabels:
env: prod env: prod

View File

@@ -66,7 +66,7 @@ function resolveConfig() {
const hasuraAdminSecret = envString('HASURA_ADMIN_SECRET', ''); const hasuraAdminSecret = envString('HASURA_ADMIN_SECRET', '');
if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET'); if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET');
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP'); const markets = envList('DLOB_MARKETS', 'SOL-PERP,DOGE-PERP,JUP-PERP');
const pollMs = envInt('TICKS_POLL_MS', 1000, { min: 250, max: 60_000 }); const pollMs = envInt('TICKS_POLL_MS', 1000, { min: 250, max: 60_000 });
const source = envString('TICKS_SOURCE', 'dlob_stats'); const source = envString('TICKS_SOURCE', 'dlob_stats');

View File

@@ -0,0 +1,14 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: dlob-publisher
spec:
template:
spec:
containers:
- name: publisher
env:
- name: ENDPOINT
value: "http://10.66.66.1:8899"
- name: WS_ENDPOINT
value: "ws://10.66.66.1:8900"

View File

@@ -0,0 +1,14 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: dlob-server
spec:
template:
spec:
containers:
- name: server
env:
- name: ENDPOINT
value: "http://10.66.66.1:8899"
- name: WS_ENDPOINT
value: "ws://10.66.66.1:8900"

View File

@@ -1,16 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: dlob-worker
spec:
template:
spec:
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
containers:
- name: worker
env:
- name: DLOB_HTTP_URL
value: https://dlob.drift.trade
- name: DLOB_FORCE_IPV6
value: "true"

View File

@@ -37,6 +37,8 @@ const AUTH_SESSION_COOKIE = String(process.env.AUTH_SESSION_COOKIE || 'trade_ses
.trim() .trim()
.toLowerCase(); .toLowerCase();
const AUTH_SESSION_TTL_SECONDS = Number.parseInt(process.env.AUTH_SESSION_TTL_SECONDS || '43200', 10); // 12h const AUTH_SESSION_TTL_SECONDS = Number.parseInt(process.env.AUTH_SESSION_TTL_SECONDS || '43200', 10); // 12h
const DLOB_SOURCE_COOKIE = String(process.env.DLOB_SOURCE_COOKIE || 'trade_dlob_source').trim() || 'trade_dlob_source';
const DLOB_SOURCE_DEFAULT = String(process.env.DLOB_SOURCE_DEFAULT || 'mevnode').trim() || 'mevnode';
function readJson(filePath) { function readJson(filePath) {
const raw = fs.readFileSync(filePath, 'utf8'); const raw = fs.readFileSync(filePath, 'utf8');
@@ -143,6 +145,29 @@ function safePathFromUrlPath(urlPath) {
return normalized; return normalized;
} }
function injectIndexHtml(html, { dlobSource, redirectPath }) {
const src = normalizeDlobSource(dlobSource) || 'mevnode';
const redirect = safeRedirectPath(redirectPath);
const hrefBase = `/prefs/dlob-source?redirect=${encodeURIComponent(redirect)}&set=`;
const styleActive = 'font-weight:700;text-decoration:underline;';
const styleInactive = 'font-weight:400;text-decoration:none;';
const snippet = `
<!-- trade: dlob source switch -->
<div style="position:fixed;right:12px;bottom:12px;z-index:2147483647;background:rgba(0,0,0,0.72);color:#fff;padding:8px 10px;border-radius:10px;font:12px/1.2 system-ui,-apple-system,Segoe UI,Roboto,sans-serif;backdrop-filter:blur(6px);">
<span style="opacity:0.85;margin-right:6px;">DLOB</span>
<a href="${hrefBase}mevnode" style="color:#fff;${src === 'mevnode' ? styleActive : styleInactive}">mevnode</a>
<span style="opacity:0.6;margin:0 6px;">|</span>
<a href="${hrefBase}drift" style="color:#fff;${src === 'drift' ? styleActive : styleInactive}">drift</a>
</div>
`;
const bodyClose = /<\/body>/i;
if (bodyClose.test(html)) return html.replace(bodyClose, `${snippet}</body>`);
return `${html}\n${snippet}\n`;
}
function serveStatic(req, res) { function serveStatic(req, res) {
if (req.method !== 'GET' && req.method !== 'HEAD') { if (req.method !== 'GET' && req.method !== 'HEAD') {
send(res, 405, { 'content-type': 'text/plain; charset=utf-8' }, 'method_not_allowed'); send(res, 405, { 'content-type': 'text/plain; charset=utf-8' }, 'method_not_allowed');
@@ -171,6 +196,15 @@ function serveStatic(req, res) {
res.setHeader('content-type', contentTypeFor(filePath)); res.setHeader('content-type', contentTypeFor(filePath));
res.setHeader('cache-control', filePath.endsWith('index.html') ? 'no-cache' : 'public, max-age=31536000'); res.setHeader('cache-control', filePath.endsWith('index.html') ? 'no-cache' : 'public, max-age=31536000');
if (req.method === 'HEAD') return void res.end(); if (req.method === 'HEAD') return void res.end();
if (filePath.endsWith('index.html')) {
const html = fs.readFileSync(filePath, 'utf8');
const injected = injectIndexHtml(html, {
dlobSource: resolveDlobSource(req),
redirectPath: url.pathname + url.search,
});
res.end(injected);
return true;
}
fs.createReadStream(filePath).pipe(res); fs.createReadStream(filePath).pipe(res);
return true; return true;
} catch { } catch {
@@ -222,6 +256,43 @@ function readCookie(req, name) {
return null; return null;
} }
function normalizeDlobSource(value) {
const v = String(value ?? '')
.trim()
.toLowerCase();
if (v === 'mevnode') return 'mevnode';
if (v === 'drift') return 'drift';
return null;
}
function resolveDlobSource(req) {
const fromCookie = normalizeDlobSource(readCookie(req, DLOB_SOURCE_COOKIE));
if (fromCookie) return fromCookie;
return normalizeDlobSource(DLOB_SOURCE_DEFAULT) || 'mevnode';
}
function safeRedirectPath(value) {
const s = String(value ?? '').trim();
if (!s.startsWith('/')) return '/';
if (s.startsWith('//')) return '/';
return s.replace(/\r|\n/g, '');
}
function setDlobSourceCookie(res, { secure, dlobSource }) {
const src = normalizeDlobSource(dlobSource);
if (!src) return false;
const parts = [
`${DLOB_SOURCE_COOKIE}=${src}`,
'Path=/',
'SameSite=Lax',
'HttpOnly',
'Max-Age=31536000',
];
if (secure) parts.push('Secure');
res.setHeader('set-cookie', parts.join('; '));
return true;
}
function resolveAuthUser(req) { function resolveAuthUser(req) {
const user = readHeader(req, AUTH_USER_HEADER) || readHeader(req, 'x-webauth-user'); const user = readHeader(req, AUTH_USER_HEADER) || readHeader(req, 'x-webauth-user');
const value = typeof user === 'string' ? user.trim() : ''; const value = typeof user === 'string' ? user.trim() : '';
@@ -423,7 +494,7 @@ function withCors(res) {
res.setHeader('access-control-allow-methods', 'GET,POST,OPTIONS'); res.setHeader('access-control-allow-methods', 'GET,POST,OPTIONS');
res.setHeader( res.setHeader(
'access-control-allow-headers', 'access-control-allow-headers',
'content-type, authorization, x-hasura-admin-secret, x-hasura-role, x-hasura-user-id' 'content-type, authorization, x-hasura-admin-secret, x-hasura-role, x-hasura-user-id, x-hasura-dlob-source'
); );
} }
@@ -440,6 +511,8 @@ function proxyGraphqlHttp(req, res) {
const headers = stripHopByHopHeaders(req.headers); const headers = stripHopByHopHeaders(req.headers);
headers.host = target.host; headers.host = target.host;
delete headers['x-hasura-dlob-source'];
headers['x-hasura-dlob-source'] = resolveDlobSource(req);
const upstreamReq = lib.request( const upstreamReq = lib.request(
{ {
@@ -499,6 +572,8 @@ function proxyGraphqlWs(req, socket, head) {
delete headers['content-length']; delete headers['content-length'];
delete headers['content-type']; delete headers['content-type'];
headers.host = target.host; headers.host = target.host;
delete headers['x-hasura-dlob-source'];
headers['x-hasura-dlob-source'] = resolveDlobSource(req);
const lines = []; const lines = [];
lines.push(`GET ${target.pathname + target.search} HTTP/1.1`); lines.push(`GET ${target.pathname + target.search} HTTP/1.1`);
@@ -570,6 +645,25 @@ async function handler(req, res) {
return; return;
} }
if (req.method === 'GET' && url.pathname === '/prefs/dlob-source') {
const set = url.searchParams.get('set');
if (!set) {
sendJson(res, 200, { ok: true, dlobSource: resolveDlobSource(req) });
return;
}
const ok = setDlobSourceCookie(res, { secure: isHttpsRequest(req), dlobSource: set });
if (!ok) {
sendJson(res, 400, { ok: false, error: 'invalid_dlob_source' });
return;
}
res.statusCode = 302;
res.setHeader('location', safeRedirectPath(url.searchParams.get('redirect') || '/'));
res.end();
return;
}
if (req.method === 'POST' && url.pathname === '/auth/login') { if (req.method === 'POST' && url.pathname === '/auth/login') {
if (AUTH_MODE === 'off' || AUTH_MODE === 'none' || AUTH_MODE === 'disabled') { if (AUTH_MODE === 'off' || AUTH_MODE === 'none' || AUTH_MODE === 'disabled') {
sendJson(res, 400, { ok: false, error: 'auth_disabled' }); sendJson(res, 400, { ok: false, error: 'auth_disabled' });

View File

@@ -18,7 +18,7 @@ spec:
name: trade-hasura name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET key: HASURA_GRAPHQL_ADMIN_SECRET
- name: DLOB_MARKETS - name: DLOB_MARKETS
value: PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP value: SOL-PERP,DOGE-PERP,JUP-PERP
- name: TICKS_POLL_MS - name: TICKS_POLL_MS
value: "1000" value: "1000"
- name: TICKS_SOURCE - name: TICKS_SOURCE

View File

@@ -10,11 +10,12 @@ resources:
- frontend-ingress-root.yaml - frontend-ingress-root.yaml
patchesStrategicMerge: patchesStrategicMerge:
- dlob-rpc-endpoint-patch.yaml
- dlob-rpc-server-endpoint-patch.yaml
- hasura-patch.yaml - hasura-patch.yaml
- frontend-auth-patch.yaml - frontend-auth-patch.yaml
- frontend-graphql-proxy-patch.yaml - frontend-graphql-proxy-patch.yaml
- ingestor-dlob-patch.yaml - ingestor-dlob-patch.yaml
- dlob-worker-patch.yaml
configMapGenerator: configMapGenerator:
- name: trade-dlob-ingestor-script - name: trade-dlob-ingestor-script