Compare commits
3 Commits
e6a2731d7e
...
b46d7d85c6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b46d7d85c6 | ||
|
|
e5543f408a | ||
|
|
b239f564b2 |
@@ -18,6 +18,9 @@ spec:
|
||||
- name: trade-api-wrapper
|
||||
configMap:
|
||||
name: trade-api-wrapper
|
||||
- name: trade-api-upstream
|
||||
configMap:
|
||||
name: trade-api-upstream
|
||||
containers:
|
||||
- name: api
|
||||
image: gitea.mpabi.pl/trade/trade-api:k3s-20260111095435
|
||||
@@ -31,6 +34,10 @@ spec:
|
||||
mountPath: /override/wrapper.mjs
|
||||
subPath: wrapper.mjs
|
||||
readOnly: true
|
||||
- name: trade-api-upstream
|
||||
mountPath: /app/services/api/server.mjs
|
||||
subPath: server.mjs
|
||||
readOnly: true
|
||||
env:
|
||||
- name: PORT
|
||||
value: "8787"
|
||||
|
||||
1661
kustomize/base/api/server.mjs
Normal file
1661
kustomize/base/api/server.mjs
Normal file
File diff suppressed because it is too large
Load Diff
48
kustomize/base/candles-cache-worker/deployment.yaml
Normal file
48
kustomize/base/candles-cache-worker/deployment.yaml
Normal file
@@ -0,0 +1,48 @@
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: candles-cache-worker
|
||||
annotations:
|
||||
argocd.argoproj.io/sync-wave: "6"
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: candles-cache-worker
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app.kubernetes.io/name: candles-cache-worker
|
||||
spec:
|
||||
containers:
|
||||
- name: worker
|
||||
image: node:20-slim
|
||||
imagePullPolicy: IfNotPresent
|
||||
env:
|
||||
- name: HASURA_GRAPHQL_URL
|
||||
value: http://hasura:8080/v1/graphql
|
||||
- name: HASURA_ADMIN_SECRET
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: trade-hasura
|
||||
key: HASURA_GRAPHQL_ADMIN_SECRET
|
||||
- name: CANDLES_SYMBOLS
|
||||
value: SOL-PERP,PUMP-PERP
|
||||
- name: CANDLES_SOURCES
|
||||
value: any
|
||||
- name: CANDLES_TFS
|
||||
value: 1s,3s,5s,15s,30s,1m,3m,5m,15m,30m,1h,4h,12h,1d
|
||||
- name: CANDLES_TARGET_POINTS
|
||||
value: "1024"
|
||||
- name: CANDLES_POLL_MS
|
||||
value: "5000"
|
||||
command: ["node", "/app/worker.mjs"]
|
||||
volumeMounts:
|
||||
- name: script
|
||||
mountPath: /app/worker.mjs
|
||||
subPath: worker.mjs
|
||||
readOnly: true
|
||||
volumes:
|
||||
- name: script
|
||||
configMap:
|
||||
name: candles-cache-worker-script
|
||||
344
kustomize/base/candles-cache-worker/worker.mjs
Normal file
344
kustomize/base/candles-cache-worker/worker.mjs
Normal file
@@ -0,0 +1,344 @@
|
||||
import fs from 'node:fs';
|
||||
import process from 'node:process';
|
||||
import { setTimeout as sleep } from 'node:timers/promises';
|
||||
|
||||
function readJsonFile(filePath) {
|
||||
try {
|
||||
const raw = fs.readFileSync(filePath, 'utf8');
|
||||
return JSON.parse(raw);
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
function getIsoNow() {
|
||||
return new Date().toISOString();
|
||||
}
|
||||
|
||||
function clampInt(value, min, max, fallback) {
|
||||
const n = Number.parseInt(String(value ?? ''), 10);
|
||||
if (!Number.isInteger(n)) return fallback;
|
||||
return Math.min(max, Math.max(min, n));
|
||||
}
|
||||
|
||||
function envList(name, fallbackCsv) {
|
||||
const raw = process.env[name] ?? fallbackCsv;
|
||||
return String(raw)
|
||||
.split(',')
|
||||
.map((s) => s.trim())
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
function parseTimeframeToSeconds(tf) {
|
||||
const v = String(tf || '').trim().toLowerCase();
|
||||
if (!v) return 60;
|
||||
const m = v.match(/^(\d+)(s|m|h|d)$/);
|
||||
if (!m) throw new Error('invalid_tf');
|
||||
const num = Number.parseInt(m[1], 10);
|
||||
if (!Number.isInteger(num) || num <= 0) throw new Error('invalid_tf');
|
||||
const unit = m[2];
|
||||
const mult = unit === 's' ? 1 : unit === 'm' ? 60 : unit === 'h' ? 3600 : 86400;
|
||||
return num * mult;
|
||||
}
|
||||
|
||||
function sqlLit(value) {
|
||||
const s = String(value ?? '');
|
||||
return `'${s.replace(/'/g, "''")}'`;
|
||||
}
|
||||
|
||||
function resolveHasuraBaseUrl(graphqlUrlOrBase) {
|
||||
const u = String(graphqlUrlOrBase || '').trim();
|
||||
if (!u) return 'http://hasura:8080';
|
||||
// common case: http://hasura:8080/v1/graphql
|
||||
if (u.endsWith('/v1/graphql')) return u.slice(0, -'/v1/graphql'.length);
|
||||
return u.replace(/\/$/, '');
|
||||
}
|
||||
|
||||
function resolveConfig() {
|
||||
const tokensPath =
|
||||
process.env.HASURA_TOKENS_FILE ||
|
||||
process.env.TOKENS_FILE ||
|
||||
process.env.HASURA_CONFIG_FILE ||
|
||||
'/app/tokens/hasura.json';
|
||||
const tokens = readJsonFile(tokensPath) || {};
|
||||
|
||||
const graphqlUrl =
|
||||
process.env.HASURA_GRAPHQL_URL ||
|
||||
tokens.graphqlUrl ||
|
||||
tokens.apiUrl ||
|
||||
'http://hasura:8080/v1/graphql';
|
||||
const hasuraBaseUrl = resolveHasuraBaseUrl(graphqlUrl);
|
||||
|
||||
const hasuraAdminSecret =
|
||||
process.env.HASURA_ADMIN_SECRET ||
|
||||
process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
|
||||
tokens.adminSecret ||
|
||||
tokens.hasuraAdminSecret;
|
||||
|
||||
if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET (required for /v2/query run_sql)');
|
||||
|
||||
const symbols = envList('CANDLES_SYMBOLS', 'SOL-PERP,PUMP-PERP');
|
||||
|
||||
// sources: include "any" (sourceKey='') by default.
|
||||
const rawSources = envList('CANDLES_SOURCES', 'any');
|
||||
const sources = [];
|
||||
for (const s of rawSources) {
|
||||
const v = String(s).trim();
|
||||
if (!v) continue;
|
||||
if (v.toLowerCase() === 'any') sources.push('');
|
||||
else sources.push(v);
|
||||
}
|
||||
if (!sources.includes('')) sources.unshift('');
|
||||
|
||||
const tfList = envList('CANDLES_TFS', '1s,3s,5s,15s,30s,1m,3m,5m,15m,30m,1h,4h,12h,1d');
|
||||
const bucketSecondsList = tfList
|
||||
.map((t) => {
|
||||
try {
|
||||
return parseTimeframeToSeconds(t);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
})
|
||||
.filter((n) => n != null)
|
||||
.map((n) => n)
|
||||
.filter((n, idx, arr) => arr.findIndex((x) => x === n) === idx)
|
||||
.sort((a, b) => a - b);
|
||||
|
||||
const targetPoints = clampInt(process.env.CANDLES_TARGET_POINTS, 10, 100_000, 1024);
|
||||
// legacy: kept for compatibility; if set, used as a minimum warmup window (days).
|
||||
const backfillDays = clampInt(process.env.CANDLES_BACKFILL_DAYS, 0, 3650, 0);
|
||||
const pollMs = clampInt(process.env.CANDLES_POLL_MS, 250, 60_000, 5000);
|
||||
|
||||
return { hasuraBaseUrl, hasuraAdminSecret, symbols, sources, bucketSecondsList, targetPoints, backfillDays, pollMs };
|
||||
}
|
||||
|
||||
async function hasuraRunSql(cfg, sql, { readOnly } = { readOnly: false }) {
|
||||
const url = `${cfg.hasuraBaseUrl}/v2/query`;
|
||||
const body = {
|
||||
type: 'run_sql',
|
||||
args: {
|
||||
source: 'default',
|
||||
sql,
|
||||
read_only: Boolean(readOnly),
|
||||
},
|
||||
};
|
||||
|
||||
const res = await fetch(url, {
|
||||
method: 'POST',
|
||||
headers: { 'content-type': 'application/json', 'x-hasura-admin-secret': cfg.hasuraAdminSecret },
|
||||
body: JSON.stringify(body),
|
||||
signal: AbortSignal.timeout(60_000),
|
||||
});
|
||||
const text = await res.text();
|
||||
if (!res.ok) throw new Error(`Hasura run_sql HTTP ${res.status}: ${text}`);
|
||||
return JSON.parse(text);
|
||||
}
|
||||
|
||||
function chunkSecondsForBucket(bucketSeconds) {
|
||||
if (bucketSeconds <= 5) return 15 * 60;
|
||||
if (bucketSeconds <= 60) return 60 * 60;
|
||||
if (bucketSeconds <= 300) return 6 * 60 * 60;
|
||||
if (bucketSeconds <= 3600) return 24 * 60 * 60;
|
||||
return 7 * 24 * 60 * 60;
|
||||
}
|
||||
|
||||
function sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds, fromIso, toIso }) {
|
||||
return `
|
||||
WITH base AS (
|
||||
SELECT
|
||||
time_bucket(make_interval(secs => ${bucketSeconds}), ts) AS bucket,
|
||||
ts,
|
||||
COALESCE(mark_price, oracle_price) AS px,
|
||||
oracle_price AS oracle_px
|
||||
FROM public.drift_ticks
|
||||
WHERE symbol = ${sqlLit(symbol)}
|
||||
AND ts >= ${sqlLit(fromIso)}::timestamptz
|
||||
AND ts < ${sqlLit(toIso)}::timestamptz
|
||||
AND (${sqlLit(sourceKey)} = '' OR source = ${sqlLit(sourceKey)})
|
||||
),
|
||||
agg AS (
|
||||
SELECT
|
||||
bucket,
|
||||
(array_agg(px ORDER BY ts ASC))[1] AS open,
|
||||
max(px) AS high,
|
||||
min(px) AS low,
|
||||
(array_agg(px ORDER BY ts DESC))[1] AS close,
|
||||
(array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close,
|
||||
count(*)::bigint AS ticks
|
||||
FROM base
|
||||
GROUP BY bucket
|
||||
)
|
||||
INSERT INTO public.drift_candles_cache
|
||||
(bucket, bucket_seconds, symbol, source, open, high, low, close, oracle_close, ticks, updated_at)
|
||||
SELECT
|
||||
bucket, ${bucketSeconds}, ${sqlLit(symbol)}, ${sqlLit(sourceKey)}, open, high, low, close, oracle_close, ticks, now()
|
||||
FROM agg
|
||||
ON CONFLICT (bucket, bucket_seconds, symbol, source) DO UPDATE SET
|
||||
open = EXCLUDED.open,
|
||||
high = EXCLUDED.high,
|
||||
low = EXCLUDED.low,
|
||||
close = EXCLUDED.close,
|
||||
oracle_close = EXCLUDED.oracle_close,
|
||||
ticks = EXCLUDED.ticks,
|
||||
updated_at = now();
|
||||
`;
|
||||
}
|
||||
|
||||
async function getTickRange(cfg, { symbol, sourceKey }) {
|
||||
const sql = `
|
||||
SELECT min(ts) AS min_ts, max(ts) AS max_ts
|
||||
FROM public.drift_ticks
|
||||
WHERE symbol=${sqlLit(symbol)}
|
||||
AND (${sqlLit(sourceKey)} = '' OR source = ${sqlLit(sourceKey)});
|
||||
`;
|
||||
const out = await hasuraRunSql(cfg, sql, { readOnly: true });
|
||||
const row = Array.isArray(out?.result) && out.result.length >= 2 ? out.result[1] : null;
|
||||
if (!row) return { minTs: null, maxTs: null };
|
||||
const minTs = row[0] ? String(row[0]).trim() : null;
|
||||
const maxTs = row[1] ? String(row[1]).trim() : null;
|
||||
return { minTs: minTs && minTs.length ? minTs : null, maxTs: maxTs && maxTs.length ? maxTs : null };
|
||||
}
|
||||
|
||||
function desiredFromIso({ minTsIso, maxTsIso, bucketSeconds, targetPoints, backfillDays }) {
|
||||
const endMs = Date.parse(maxTsIso);
|
||||
const minMs = minTsIso ? Date.parse(minTsIso) : null;
|
||||
const wantSpanMs = targetPoints * bucketSeconds * 1000;
|
||||
const wantFromMs = endMs - wantSpanMs;
|
||||
const minFromMs = backfillDays > 0 ? endMs - backfillDays * 24 * 60 * 60 * 1000 : null;
|
||||
let fromMs = wantFromMs;
|
||||
if (minFromMs != null) fromMs = Math.min(fromMs, minFromMs);
|
||||
if (minMs != null) fromMs = Math.max(fromMs, minMs);
|
||||
return new Date(Math.max(0, fromMs)).toISOString();
|
||||
}
|
||||
|
||||
function safetyWindowSeconds(bucketSeconds) {
|
||||
if (bucketSeconds <= 60) return 10 * 60;
|
||||
if (bucketSeconds <= 300) return 60 * 60;
|
||||
if (bucketSeconds <= 3600) return 6 * 60 * 60;
|
||||
if (bucketSeconds <= 14_400) return 24 * 60 * 60;
|
||||
return 2 * 24 * 60 * 60;
|
||||
}
|
||||
|
||||
async function backfill(cfg, { symbol, sourceKey, fromIso, toIso }) {
|
||||
for (const bs of cfg.bucketSecondsList) {
|
||||
const chunk = chunkSecondsForBucket(bs);
|
||||
for (let t = Date.parse(fromIso); t < Date.parse(toIso); t += chunk * 1000) {
|
||||
const a = new Date(t).toISOString();
|
||||
const b = new Date(Math.min(Date.parse(toIso), t + chunk * 1000)).toISOString();
|
||||
await hasuraRunSql(cfg, sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds: bs, fromIso: a, toIso: b }));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function getMaxBucket(cfg, { symbol, sourceKey, bucketSeconds }) {
|
||||
const sql = `
|
||||
SELECT max(bucket) AS max_bucket
|
||||
FROM public.drift_candles_cache
|
||||
WHERE symbol=${sqlLit(symbol)} AND source=${sqlLit(sourceKey)} AND bucket_seconds=${bucketSeconds};
|
||||
`;
|
||||
const out = await hasuraRunSql(cfg, sql, { readOnly: true });
|
||||
// Hasura returns {result_type, result:[[col...],[row...]]}
|
||||
const row = Array.isArray(out?.result) && out.result.length >= 2 ? out.result[1] : null;
|
||||
const v = row && row[0] ? String(row[0]) : null;
|
||||
return v && v.trim() ? v.trim() : null;
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const cfg = resolveConfig();
|
||||
|
||||
console.log(
|
||||
JSON.stringify(
|
||||
{
|
||||
service: 'candles-cache-worker',
|
||||
startedAt: getIsoNow(),
|
||||
hasuraBaseUrl: cfg.hasuraBaseUrl,
|
||||
symbols: cfg.symbols,
|
||||
sources: cfg.sources.map((s) => (s ? s : '(any)')),
|
||||
bucketSecondsList: cfg.bucketSecondsList,
|
||||
targetPoints: cfg.targetPoints,
|
||||
backfillDays: cfg.backfillDays,
|
||||
pollMs: cfg.pollMs,
|
||||
},
|
||||
null,
|
||||
2
|
||||
)
|
||||
);
|
||||
|
||||
// Backfill to warm cache: for each timeframe keep ~targetPoints candles (or "as much as we have").
|
||||
for (const symbol of cfg.symbols) {
|
||||
for (const sourceKey of cfg.sources) {
|
||||
const range = await getTickRange(cfg, { symbol, sourceKey });
|
||||
if (!range.maxTs) continue;
|
||||
const toIso = new Date(Date.parse(range.maxTs)).toISOString();
|
||||
const maxBs = cfg.bucketSecondsList[cfg.bucketSecondsList.length - 1] || 60;
|
||||
const fromIso = desiredFromIso({
|
||||
minTsIso: range.minTs,
|
||||
maxTsIso: toIso,
|
||||
bucketSeconds: maxBs,
|
||||
targetPoints: cfg.targetPoints,
|
||||
backfillDays: cfg.backfillDays,
|
||||
});
|
||||
|
||||
console.log(
|
||||
`[candles-cache-worker] warmup symbol=${symbol} source=${sourceKey || '(any)'} from=${fromIso} to=${toIso} points=${cfg.targetPoints}`
|
||||
);
|
||||
try {
|
||||
await backfill(cfg, { symbol, sourceKey, fromIso, toIso });
|
||||
} catch (err) {
|
||||
console.error(`[candles-cache-worker] warmup failed (${symbol}/${sourceKey || 'any'}): ${String(err?.message || err)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Prime last buckets.
|
||||
const last = new Map(); // key -> iso bucket
|
||||
for (const symbol of cfg.symbols) {
|
||||
for (const sourceKey of cfg.sources) {
|
||||
for (const bs of cfg.bucketSecondsList) {
|
||||
const k = `${symbol}::${sourceKey}::${bs}`;
|
||||
try {
|
||||
const maxBucket = await getMaxBucket(cfg, { symbol, sourceKey, bucketSeconds: bs });
|
||||
if (maxBucket) last.set(k, maxBucket);
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while (true) {
|
||||
const loopNow = Date.now();
|
||||
const loopIso = new Date(loopNow).toISOString();
|
||||
|
||||
for (const symbol of cfg.symbols) {
|
||||
for (const sourceKey of cfg.sources) {
|
||||
for (const bs of cfg.bucketSecondsList) {
|
||||
const k = `${symbol}::${sourceKey}::${bs}`;
|
||||
const prev = last.get(k);
|
||||
const safety = safetyWindowSeconds(bs);
|
||||
const fromMs = prev ? Date.parse(prev) - safety * 1000 : loopNow - safety * 1000;
|
||||
const fromIso2 = new Date(Math.max(0, fromMs)).toISOString();
|
||||
try {
|
||||
await hasuraRunSql(
|
||||
cfg,
|
||||
sqlUpsertCandlesFromTicks({ symbol, sourceKey, bucketSeconds: bs, fromIso: fromIso2, toIso: loopIso })
|
||||
);
|
||||
// best-effort: move last pointer close to now (actual max will lag by at most one bucket)
|
||||
last.set(k, loopIso);
|
||||
} catch (err) {
|
||||
console.error(
|
||||
`[candles-cache-worker] update failed (${symbol}/${sourceKey || 'any'}/${bs}s): ${String(err?.message || err)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await sleep(cfg.pollMs);
|
||||
}
|
||||
}
|
||||
|
||||
main().catch((err) => {
|
||||
console.error(String(err?.stack || err));
|
||||
process.exitCode = 1;
|
||||
});
|
||||
@@ -31,7 +31,7 @@ spec:
|
||||
- name: DLOB_POLL_MS
|
||||
value: "1000"
|
||||
- name: DLOB_SLIPPAGE_SIZES_USD
|
||||
value: "10,25,50,100,250,500,1000,5000,10000,50000"
|
||||
value: "0.1,0.2,0.5,1,2,5,10,25,50,100,250,500,1000,5000,10000,50000"
|
||||
- name: PRICE_PRECISION
|
||||
value: "1000000"
|
||||
- name: BASE_PRECISION
|
||||
|
||||
@@ -1,6 +1,16 @@
|
||||
import fs from 'node:fs';
|
||||
import process from 'node:process';
|
||||
import { setTimeout as sleep } from 'node:timers/promises';
|
||||
|
||||
function readJsonFile(filePath) {
|
||||
try {
|
||||
const raw = fs.readFileSync(filePath, 'utf8');
|
||||
return JSON.parse(raw);
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
function getIsoNow() {
|
||||
return new Date().toISOString();
|
||||
}
|
||||
@@ -19,14 +29,87 @@ function envList(name, fallbackCsv) {
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
function envIntList(name, fallbackCsv) {
|
||||
const out = [];
|
||||
for (const item of envList(name, fallbackCsv)) {
|
||||
const n = Number.parseInt(item, 10);
|
||||
if (!Number.isFinite(n)) continue;
|
||||
out.push(n);
|
||||
function parsePositiveNumber(value) {
|
||||
const n = Number.parseFloat(String(value ?? '').trim());
|
||||
if (!Number.isFinite(n) || !(n > 0)) return null;
|
||||
return n;
|
||||
}
|
||||
return out.length ? out : envList(name, fallbackCsv).map((v) => Number.parseInt(v, 10)).filter(Number.isFinite);
|
||||
|
||||
function resolveConfig() {
|
||||
const tokensPath =
|
||||
process.env.HASURA_TOKENS_FILE ||
|
||||
process.env.TOKENS_FILE ||
|
||||
process.env.HASURA_CONFIG_FILE ||
|
||||
'/app/tokens/hasura.json';
|
||||
const tokens = readJsonFile(tokensPath) || {};
|
||||
|
||||
const hasuraUrl =
|
||||
process.env.HASURA_GRAPHQL_URL ||
|
||||
tokens.graphqlUrl ||
|
||||
tokens.apiUrl ||
|
||||
'http://hasura:8080/v1/graphql';
|
||||
const hasuraAdminSecret =
|
||||
process.env.HASURA_ADMIN_SECRET ||
|
||||
process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
|
||||
tokens.adminSecret ||
|
||||
tokens.hasuraAdminSecret;
|
||||
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
|
||||
|
||||
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
|
||||
const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
|
||||
|
||||
const sizesUsd = envList('DLOB_SLIPPAGE_SIZES_USD', '10,25,50,100,250,500,1000')
|
||||
.map(parsePositiveNumber)
|
||||
.filter((n) => n != null)
|
||||
.map((n) => n)
|
||||
.filter((n, idx, arr) => arr.findIndex((x) => x === n) === idx)
|
||||
.sort((a, b) => a - b);
|
||||
|
||||
const sizesUsdInt = sizesUsd.filter((n) => Number.isInteger(n));
|
||||
|
||||
const depthLevels = clampInt(process.env.DLOB_DEPTH, 1, 50, 25);
|
||||
const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
|
||||
const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
|
||||
if (!Number.isFinite(pricePrecision) || pricePrecision <= 0) throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
|
||||
if (!Number.isFinite(basePrecision) || basePrecision <= 0) throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
|
||||
|
||||
return {
|
||||
hasuraUrl,
|
||||
hasuraAdminSecret,
|
||||
hasuraAuthToken,
|
||||
markets,
|
||||
pollMs,
|
||||
sizesUsd,
|
||||
sizesUsdInt,
|
||||
depthLevels,
|
||||
pricePrecision,
|
||||
basePrecision,
|
||||
};
|
||||
}
|
||||
|
||||
async function graphqlRequest(cfg, query, variables) {
|
||||
const headers = { 'content-type': 'application/json' };
|
||||
if (cfg.hasuraAuthToken) {
|
||||
headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
|
||||
} else if (cfg.hasuraAdminSecret) {
|
||||
headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
|
||||
} else {
|
||||
throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)');
|
||||
}
|
||||
|
||||
const res = await fetch(cfg.hasuraUrl, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body: JSON.stringify({ query, variables }),
|
||||
signal: AbortSignal.timeout(15_000),
|
||||
});
|
||||
const text = await res.text();
|
||||
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
|
||||
const json = JSON.parse(text);
|
||||
if (json.errors?.length) {
|
||||
throw new Error(json.errors.map((e) => e.message).join(' | '));
|
||||
}
|
||||
return json.data;
|
||||
}
|
||||
|
||||
function toNumberOrNull(value) {
|
||||
@@ -41,167 +124,107 @@ function toNumberOrNull(value) {
|
||||
return null;
|
||||
}
|
||||
|
||||
function numStr(value) {
|
||||
if (value == null) return null;
|
||||
if (typeof value === 'number') return Number.isFinite(value) ? String(value) : null;
|
||||
if (typeof value === 'string') return value.trim() || null;
|
||||
return null;
|
||||
}
|
||||
|
||||
function jsonNormalize(value) {
|
||||
if (typeof value !== 'string') return value;
|
||||
const s = value.trim();
|
||||
if (!s) return null;
|
||||
function normalizeLevels(raw) {
|
||||
if (raw == null) return [];
|
||||
if (Array.isArray(raw)) return raw;
|
||||
if (typeof raw === 'string') {
|
||||
const s = raw.trim();
|
||||
if (!s) return [];
|
||||
try {
|
||||
return JSON.parse(s);
|
||||
const v = JSON.parse(s);
|
||||
return Array.isArray(v) ? v : [];
|
||||
} catch {
|
||||
return value;
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
function resolveConfig() {
|
||||
const hasuraUrl = process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql';
|
||||
const hasuraAdminSecret = process.env.HASURA_ADMIN_SECRET || process.env.HASURA_GRAPHQL_ADMIN_SECRET || undefined;
|
||||
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
|
||||
|
||||
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
|
||||
const pollMs = clampInt(process.env.DLOB_POLL_MS, 250, 60_000, 1000);
|
||||
const sizesUsd = envIntList('DLOB_SLIPPAGE_SIZES_USD', '100,500,1000,5000,10000,50000');
|
||||
|
||||
const pricePrecision = Number(process.env.PRICE_PRECISION || 1_000_000);
|
||||
const basePrecision = Number(process.env.BASE_PRECISION || 1_000_000_000);
|
||||
if (!Number.isFinite(pricePrecision) || pricePrecision <= 0)
|
||||
throw new Error(`Invalid PRICE_PRECISION: ${process.env.PRICE_PRECISION}`);
|
||||
if (!Number.isFinite(basePrecision) || basePrecision <= 0)
|
||||
throw new Error(`Invalid BASE_PRECISION: ${process.env.BASE_PRECISION}`);
|
||||
|
||||
return {
|
||||
hasuraUrl,
|
||||
hasuraAdminSecret,
|
||||
hasuraAuthToken,
|
||||
markets,
|
||||
pollMs,
|
||||
sizesUsd,
|
||||
pricePrecision,
|
||||
basePrecision,
|
||||
};
|
||||
return [];
|
||||
}
|
||||
|
||||
async function graphqlRequest(cfg, query, variables) {
|
||||
const headers = { 'content-type': 'application/json' };
|
||||
if (cfg.hasuraAuthToken) {
|
||||
headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
|
||||
} else if (cfg.hasuraAdminSecret) {
|
||||
headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
|
||||
} else {
|
||||
throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET)');
|
||||
}
|
||||
|
||||
const res = await fetch(cfg.hasuraUrl, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body: JSON.stringify({ query, variables }),
|
||||
signal: AbortSignal.timeout(10_000),
|
||||
});
|
||||
const text = await res.text();
|
||||
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
|
||||
const json = JSON.parse(text);
|
||||
if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | '));
|
||||
return json.data;
|
||||
}
|
||||
|
||||
function parseLevels(raw, pricePrecision, basePrecision, side) {
|
||||
const v = jsonNormalize(raw);
|
||||
if (!Array.isArray(v)) return [];
|
||||
|
||||
function parseScaledLevels(raw, pricePrecision, basePrecision) {
|
||||
const levels = normalizeLevels(raw);
|
||||
const out = [];
|
||||
for (const item of v) {
|
||||
const priceInt = toNumberOrNull(item?.price);
|
||||
const sizeInt = toNumberOrNull(item?.size);
|
||||
for (const it of levels) {
|
||||
const priceInt = toNumberOrNull(it?.price);
|
||||
const sizeInt = toNumberOrNull(it?.size);
|
||||
if (priceInt == null || sizeInt == null) continue;
|
||||
const price = priceInt / pricePrecision;
|
||||
const size = sizeInt / basePrecision;
|
||||
if (!Number.isFinite(price) || !Number.isFinite(size)) continue;
|
||||
out.push({ price, size });
|
||||
const base = sizeInt / basePrecision;
|
||||
if (!Number.isFinite(price) || !Number.isFinite(base)) continue;
|
||||
out.push({ price, base });
|
||||
}
|
||||
|
||||
if (side === 'bid') out.sort((a, b) => b.price - a.price);
|
||||
if (side === 'ask') out.sort((a, b) => a.price - b.price);
|
||||
return out;
|
||||
}
|
||||
|
||||
function computeMid(bestBid, bestAsk, markPrice, oraclePrice) {
|
||||
if (bestBid != null && bestAsk != null) return (bestBid + bestAsk) / 2;
|
||||
if (markPrice != null) return markPrice;
|
||||
if (oraclePrice != null) return oraclePrice;
|
||||
return null;
|
||||
}
|
||||
|
||||
function simulateBuy(asks, mid, sizeUsd) {
|
||||
function simulateFill(levels, sizeUsd) {
|
||||
let remainingUsd = sizeUsd;
|
||||
let filledUsd = 0;
|
||||
let filledBase = 0;
|
||||
let totalQuoteUsd = 0;
|
||||
let worstPrice = null;
|
||||
let levelsConsumed = 0;
|
||||
|
||||
for (const lvl of asks) {
|
||||
if (!(remainingUsd > 0)) break;
|
||||
if (!(lvl.price > 0) || !(lvl.size > 0)) continue;
|
||||
|
||||
const maxBase = remainingUsd / lvl.price;
|
||||
const takeBase = Math.min(lvl.size, maxBase);
|
||||
if (!(takeBase > 0)) continue;
|
||||
|
||||
const cost = takeBase * lvl.price;
|
||||
filledUsd += cost;
|
||||
filledBase += takeBase;
|
||||
remainingUsd -= cost;
|
||||
worstPrice = lvl.price;
|
||||
for (const l of levels) {
|
||||
if (remainingUsd <= 0) break;
|
||||
const levelUsd = l.base * l.price;
|
||||
if (levelUsd <= 0) continue;
|
||||
levelsConsumed += 1;
|
||||
}
|
||||
worstPrice = l.price;
|
||||
|
||||
const vwap = filledBase > 0 ? filledUsd / filledBase : null;
|
||||
const impactBps = vwap != null && mid != null && mid > 0 ? (vwap / mid - 1) * 10_000 : null;
|
||||
const fillPct = sizeUsd > 0 ? filledUsd / sizeUsd : null;
|
||||
const takeUsd = Math.min(remainingUsd, levelUsd);
|
||||
const takeBase = takeUsd / l.price;
|
||||
|
||||
return { vwap, worstPrice, filledUsd, filledBase, impactBps, levelsConsumed, fillPct };
|
||||
}
|
||||
|
||||
function simulateSell(bids, mid, sizeUsd) {
|
||||
if (mid == null || !(mid > 0)) {
|
||||
return { vwap: null, worstPrice: null, filledUsd: 0, filledBase: 0, impactBps: null, levelsConsumed: 0, fillPct: null };
|
||||
}
|
||||
|
||||
const baseTarget = sizeUsd / mid;
|
||||
let remainingBase = baseTarget;
|
||||
let proceedsUsd = 0;
|
||||
let filledBase = 0;
|
||||
let worstPrice = null;
|
||||
let levelsConsumed = 0;
|
||||
|
||||
for (const lvl of bids) {
|
||||
if (!(remainingBase > 0)) break;
|
||||
if (!(lvl.price > 0) || !(lvl.size > 0)) continue;
|
||||
|
||||
const takeBase = Math.min(lvl.size, remainingBase);
|
||||
if (!(takeBase > 0)) continue;
|
||||
|
||||
const proceeds = takeBase * lvl.price;
|
||||
proceedsUsd += proceeds;
|
||||
remainingUsd -= takeUsd;
|
||||
filledUsd += takeUsd;
|
||||
filledBase += takeBase;
|
||||
remainingBase -= takeBase;
|
||||
worstPrice = lvl.price;
|
||||
levelsConsumed += 1;
|
||||
totalQuoteUsd += takeUsd;
|
||||
}
|
||||
|
||||
const vwap = filledBase > 0 ? proceedsUsd / filledBase : null;
|
||||
const impactBps = vwap != null && mid > 0 ? (1 - vwap / mid) * 10_000 : null;
|
||||
const fillPct = baseTarget > 0 ? filledBase / baseTarget : null;
|
||||
const vwapPrice = filledBase > 0 ? totalQuoteUsd / filledBase : null;
|
||||
const fillPct = sizeUsd > 0 ? (filledUsd / sizeUsd) * 100 : null;
|
||||
|
||||
return { vwap, worstPrice, filledUsd: proceedsUsd, filledBase, impactBps, levelsConsumed, fillPct };
|
||||
return {
|
||||
filledUsd,
|
||||
filledBase,
|
||||
vwapPrice,
|
||||
worstPrice,
|
||||
levelsConsumed,
|
||||
fillPct,
|
||||
};
|
||||
}
|
||||
|
||||
async function fetchL2Latest(cfg) {
|
||||
function impactBps({ side, mid, vwap }) {
|
||||
if (mid == null || vwap == null || mid <= 0) return null;
|
||||
if (side === 'buy') return ((vwap / mid) - 1) * 10_000;
|
||||
if (side === 'sell') return (1 - (vwap / mid)) * 10_000;
|
||||
return null;
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const cfg = resolveConfig();
|
||||
|
||||
console.log(
|
||||
JSON.stringify(
|
||||
{
|
||||
service: 'dlob-slippage-worker',
|
||||
startedAt: getIsoNow(),
|
||||
hasuraUrl: cfg.hasuraUrl,
|
||||
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
|
||||
markets: cfg.markets,
|
||||
pollMs: cfg.pollMs,
|
||||
sizesUsd: cfg.sizesUsd,
|
||||
depthLevels: cfg.depthLevels,
|
||||
},
|
||||
null,
|
||||
2
|
||||
)
|
||||
);
|
||||
|
||||
const lastSeenUpdatedAt = new Map(); // market -> updated_at
|
||||
|
||||
while (true) {
|
||||
const updatedAt = getIsoNow();
|
||||
|
||||
try {
|
||||
const query = `
|
||||
query DlobL2Latest($markets: [String!]!) {
|
||||
dlob_l2_latest(where: { market_name: { _in: $markets } }) {
|
||||
@@ -210,8 +233,6 @@ async function fetchL2Latest(cfg) {
|
||||
market_index
|
||||
ts
|
||||
slot
|
||||
mark_price
|
||||
oracle_price
|
||||
best_bid_price
|
||||
best_ask_price
|
||||
bids
|
||||
@@ -220,14 +241,93 @@ async function fetchL2Latest(cfg) {
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
||||
const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
|
||||
return Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
|
||||
const rows = Array.isArray(data?.dlob_l2_latest) ? data.dlob_l2_latest : [];
|
||||
|
||||
const objectsV1 = [];
|
||||
const objectsV2 = [];
|
||||
|
||||
for (const row of rows) {
|
||||
const market = String(row?.market_name || '').trim();
|
||||
if (!market) continue;
|
||||
|
||||
const rowUpdatedAt = row?.updated_at ?? null;
|
||||
if (rowUpdatedAt && lastSeenUpdatedAt.get(market) === rowUpdatedAt) continue;
|
||||
if (rowUpdatedAt) lastSeenUpdatedAt.set(market, rowUpdatedAt);
|
||||
|
||||
const bestBid = toNumberOrNull(row?.best_bid_price);
|
||||
const bestAsk = toNumberOrNull(row?.best_ask_price);
|
||||
if (bestBid == null || bestAsk == null) continue;
|
||||
|
||||
const mid = (bestBid + bestAsk) / 2;
|
||||
if (!Number.isFinite(mid) || mid <= 0) continue;
|
||||
|
||||
const bids = parseScaledLevels(row?.bids, cfg.pricePrecision, cfg.basePrecision)
|
||||
.slice()
|
||||
.sort((a, b) => b.price - a.price)
|
||||
.slice(0, cfg.depthLevels);
|
||||
const asks = parseScaledLevels(row?.asks, cfg.pricePrecision, cfg.basePrecision)
|
||||
.slice()
|
||||
.sort((a, b) => a.price - b.price)
|
||||
.slice(0, cfg.depthLevels);
|
||||
|
||||
for (const sizeUsd of cfg.sizesUsd) {
|
||||
// buy consumes asks (worse prices as you go up)
|
||||
{
|
||||
const sim = simulateFill(asks, sizeUsd);
|
||||
const baseObj = {
|
||||
market_name: market,
|
||||
side: 'buy',
|
||||
market_type: row?.market_type ?? 'perp',
|
||||
market_index: row?.market_index ?? null,
|
||||
ts: row?.ts == null ? null : String(row.ts),
|
||||
slot: row?.slot == null ? null : String(row.slot),
|
||||
mid_price: String(mid),
|
||||
vwap_price: sim.vwapPrice == null ? null : String(sim.vwapPrice),
|
||||
worst_price: sim.worstPrice == null ? null : String(sim.worstPrice),
|
||||
filled_usd: String(sim.filledUsd),
|
||||
filled_base: String(sim.filledBase),
|
||||
impact_bps: impactBps({ side: 'buy', mid, vwap: sim.vwapPrice }),
|
||||
levels_consumed: sim.levelsConsumed,
|
||||
fill_pct: sim.fillPct == null ? null : String(sim.fillPct),
|
||||
raw: { depthLevels: cfg.depthLevels },
|
||||
updated_at: updatedAt,
|
||||
};
|
||||
objectsV2.push({ ...baseObj, size_usd: String(sizeUsd) });
|
||||
if (Number.isInteger(sizeUsd)) objectsV1.push({ ...baseObj, size_usd: Math.trunc(sizeUsd) });
|
||||
}
|
||||
|
||||
async function upsertSlippage(cfg, rows) {
|
||||
if (!rows.length) return;
|
||||
// sell consumes bids (worse prices as you go down)
|
||||
{
|
||||
const sim = simulateFill(bids, sizeUsd);
|
||||
const baseObj = {
|
||||
market_name: market,
|
||||
side: 'sell',
|
||||
market_type: row?.market_type ?? 'perp',
|
||||
market_index: row?.market_index ?? null,
|
||||
ts: row?.ts == null ? null : String(row.ts),
|
||||
slot: row?.slot == null ? null : String(row.slot),
|
||||
mid_price: String(mid),
|
||||
vwap_price: sim.vwapPrice == null ? null : String(sim.vwapPrice),
|
||||
worst_price: sim.worstPrice == null ? null : String(sim.worstPrice),
|
||||
filled_usd: String(sim.filledUsd),
|
||||
filled_base: String(sim.filledBase),
|
||||
impact_bps: impactBps({ side: 'sell', mid, vwap: sim.vwapPrice }),
|
||||
levels_consumed: sim.levelsConsumed,
|
||||
fill_pct: sim.fillPct == null ? null : String(sim.fillPct),
|
||||
raw: { depthLevels: cfg.depthLevels },
|
||||
updated_at: updatedAt,
|
||||
};
|
||||
objectsV2.push({ ...baseObj, size_usd: String(sizeUsd) });
|
||||
if (Number.isInteger(sizeUsd)) objectsV1.push({ ...baseObj, size_usd: Math.trunc(sizeUsd) });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (objectsV1.length) {
|
||||
const mutation = `
|
||||
mutation UpsertDlobSlippage($rows: [dlob_slippage_latest_insert_input!]!) {
|
||||
mutation UpsertSlippageV1($rows: [dlob_slippage_latest_insert_input!]!) {
|
||||
insert_dlob_slippage_latest(
|
||||
objects: $rows
|
||||
on_conflict: {
|
||||
@@ -238,8 +338,6 @@ async function upsertSlippage(cfg, rows) {
|
||||
ts
|
||||
slot
|
||||
mid_price
|
||||
best_bid_price
|
||||
best_ask_price
|
||||
vwap_price
|
||||
worst_price
|
||||
filled_usd
|
||||
@@ -254,119 +352,40 @@ async function upsertSlippage(cfg, rows) {
|
||||
) { affected_rows }
|
||||
}
|
||||
`;
|
||||
await graphqlRequest(cfg, mutation, { rows });
|
||||
await graphqlRequest(cfg, mutation, { rows: objectsV1 });
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const cfg = resolveConfig();
|
||||
const lastUpdatedAtByMarket = new Map();
|
||||
|
||||
console.log(
|
||||
JSON.stringify(
|
||||
{
|
||||
service: 'dlob-slippage-worker',
|
||||
startedAt: getIsoNow(),
|
||||
hasuraUrl: cfg.hasuraUrl,
|
||||
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
|
||||
markets: cfg.markets,
|
||||
pollMs: cfg.pollMs,
|
||||
sizesUsd: cfg.sizesUsd,
|
||||
pricePrecision: cfg.pricePrecision,
|
||||
basePrecision: cfg.basePrecision,
|
||||
},
|
||||
null,
|
||||
2
|
||||
)
|
||||
);
|
||||
|
||||
while (true) {
|
||||
const rows = [];
|
||||
|
||||
try {
|
||||
const l2Rows = await fetchL2Latest(cfg);
|
||||
for (const l2 of l2Rows) {
|
||||
const market = String(l2.market_name || '').trim();
|
||||
if (!market) continue;
|
||||
|
||||
const updatedAt = l2.updated_at || null;
|
||||
if (updatedAt && lastUpdatedAtByMarket.get(market) === updatedAt) continue;
|
||||
if (updatedAt) lastUpdatedAtByMarket.set(market, updatedAt);
|
||||
|
||||
const bestBid = toNumberOrNull(l2.best_bid_price);
|
||||
const bestAsk = toNumberOrNull(l2.best_ask_price);
|
||||
const markPrice = toNumberOrNull(l2.mark_price);
|
||||
const oraclePrice = toNumberOrNull(l2.oracle_price);
|
||||
const mid = computeMid(bestBid, bestAsk, markPrice, oraclePrice);
|
||||
|
||||
const bids = parseLevels(l2.bids, cfg.pricePrecision, cfg.basePrecision, 'bid');
|
||||
const asks = parseLevels(l2.asks, cfg.pricePrecision, cfg.basePrecision, 'ask');
|
||||
|
||||
for (const sizeUsd of cfg.sizesUsd) {
|
||||
const buy = simulateBuy(asks, mid, sizeUsd);
|
||||
rows.push({
|
||||
market_name: market,
|
||||
side: 'buy',
|
||||
size_usd: sizeUsd,
|
||||
market_type: l2.market_type ? String(l2.market_type) : 'perp',
|
||||
market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
|
||||
ts: l2.ts == null ? null : String(l2.ts),
|
||||
slot: l2.slot == null ? null : String(l2.slot),
|
||||
mid_price: numStr(mid),
|
||||
best_bid_price: numStr(bestBid),
|
||||
best_ask_price: numStr(bestAsk),
|
||||
vwap_price: numStr(buy.vwap),
|
||||
worst_price: numStr(buy.worstPrice),
|
||||
filled_usd: numStr(buy.filledUsd),
|
||||
filled_base: numStr(buy.filledBase),
|
||||
impact_bps: numStr(buy.impactBps),
|
||||
levels_consumed: buy.levelsConsumed,
|
||||
fill_pct: numStr(buy.fillPct),
|
||||
raw: {
|
||||
ref: 'mid',
|
||||
units: 'usd',
|
||||
pricePrecision: cfg.pricePrecision,
|
||||
basePrecision: cfg.basePrecision,
|
||||
},
|
||||
updated_at: updatedAt,
|
||||
});
|
||||
|
||||
const sell = simulateSell(bids, mid, sizeUsd);
|
||||
rows.push({
|
||||
market_name: market,
|
||||
side: 'sell',
|
||||
size_usd: sizeUsd,
|
||||
market_type: l2.market_type ? String(l2.market_type) : 'perp',
|
||||
market_index: typeof l2.market_index === 'number' ? l2.market_index : null,
|
||||
ts: l2.ts == null ? null : String(l2.ts),
|
||||
slot: l2.slot == null ? null : String(l2.slot),
|
||||
mid_price: numStr(mid),
|
||||
best_bid_price: numStr(bestBid),
|
||||
best_ask_price: numStr(bestAsk),
|
||||
vwap_price: numStr(sell.vwap),
|
||||
worst_price: numStr(sell.worstPrice),
|
||||
filled_usd: numStr(sell.filledUsd),
|
||||
filled_base: numStr(sell.filledBase),
|
||||
impact_bps: numStr(sell.impactBps),
|
||||
levels_consumed: sell.levelsConsumed,
|
||||
fill_pct: numStr(sell.fillPct),
|
||||
raw: {
|
||||
ref: 'mid',
|
||||
units: 'usd',
|
||||
pricePrecision: cfg.pricePrecision,
|
||||
basePrecision: cfg.basePrecision,
|
||||
},
|
||||
updated_at: updatedAt,
|
||||
});
|
||||
if (objectsV2.length) {
|
||||
const mutation = `
|
||||
mutation UpsertSlippageV2($rows: [dlob_slippage_latest_v2_insert_input!]!) {
|
||||
insert_dlob_slippage_latest_v2(
|
||||
objects: $rows
|
||||
on_conflict: {
|
||||
constraint: dlob_slippage_latest_v2_pkey
|
||||
update_columns: [
|
||||
market_type
|
||||
market_index
|
||||
ts
|
||||
slot
|
||||
mid_price
|
||||
vwap_price
|
||||
worst_price
|
||||
filled_usd
|
||||
filled_base
|
||||
impact_bps
|
||||
levels_consumed
|
||||
fill_pct
|
||||
raw
|
||||
updated_at
|
||||
]
|
||||
}
|
||||
) { affected_rows }
|
||||
}
|
||||
`;
|
||||
await graphqlRequest(cfg, mutation, { rows: objectsV2 });
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`[dlob-slippage-worker] fetch/compute: ${String(err?.message || err)}`);
|
||||
}
|
||||
|
||||
try {
|
||||
await upsertSlippage(cfg, rows);
|
||||
} catch (err) {
|
||||
console.error(`[dlob-slippage-worker] upsert: ${String(err?.message || err)}`);
|
||||
console.error(`[dlob-slippage-worker] ${String(err?.message || err)}`);
|
||||
}
|
||||
|
||||
await sleep(cfg.pollMs);
|
||||
|
||||
@@ -1,6 +1,16 @@
|
||||
import fs from 'node:fs';
|
||||
import process from 'node:process';
|
||||
import { setTimeout as sleep } from 'node:timers/promises';
|
||||
|
||||
function readJsonFile(filePath) {
|
||||
try {
|
||||
const raw = fs.readFileSync(filePath, 'utf8');
|
||||
return JSON.parse(raw);
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
function getIsoNow() {
|
||||
return new Date().toISOString();
|
||||
}
|
||||
@@ -20,30 +30,53 @@ function envList(name, fallbackCsv) {
|
||||
}
|
||||
|
||||
function resolveConfig() {
|
||||
const hasuraUrl = String(process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql').trim();
|
||||
const hasuraAdminSecret = String(process.env.HASURA_ADMIN_SECRET || '').trim();
|
||||
if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET');
|
||||
const tokensPath =
|
||||
process.env.HASURA_TOKENS_FILE ||
|
||||
process.env.TOKENS_FILE ||
|
||||
process.env.HASURA_CONFIG_FILE ||
|
||||
'/app/tokens/hasura.json';
|
||||
const tokens = readJsonFile(tokensPath) || {};
|
||||
|
||||
const hasuraUrl =
|
||||
process.env.HASURA_GRAPHQL_URL ||
|
||||
tokens.graphqlUrl ||
|
||||
tokens.apiUrl ||
|
||||
'http://hasura:8080/v1/graphql';
|
||||
const hasuraAdminSecret =
|
||||
process.env.HASURA_ADMIN_SECRET ||
|
||||
process.env.HASURA_GRAPHQL_ADMIN_SECRET ||
|
||||
tokens.adminSecret ||
|
||||
tokens.hasuraAdminSecret;
|
||||
const hasuraAuthToken = process.env.HASURA_AUTH_TOKEN || process.env.HASURA_JWT || undefined;
|
||||
|
||||
const markets = envList('DLOB_MARKETS', 'PUMP-PERP,SOL-PERP,1MBONK-PERP,BTC-PERP,ETH-PERP');
|
||||
const pollMs = clampInt(process.env.DLOB_TS_POLL_MS, 250, 60_000, 1000);
|
||||
const pollMs = clampInt(process.env.DLOB_TS_POLL_MS, 500, 60_000, 1000);
|
||||
|
||||
return { hasuraUrl, hasuraAdminSecret, markets, pollMs };
|
||||
return { hasuraUrl, hasuraAdminSecret, hasuraAuthToken, markets, pollMs };
|
||||
}
|
||||
|
||||
async function graphqlRequest(cfg, query, variables) {
|
||||
const headers = { 'content-type': 'application/json' };
|
||||
if (cfg.hasuraAuthToken) {
|
||||
headers.authorization = `Bearer ${cfg.hasuraAuthToken}`;
|
||||
} else if (cfg.hasuraAdminSecret) {
|
||||
headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret;
|
||||
} else {
|
||||
throw new Error('Missing Hasura auth (set HASURA_AUTH_TOKEN or HASURA_ADMIN_SECRET or mount tokens/hasura.json)');
|
||||
}
|
||||
|
||||
const res = await fetch(cfg.hasuraUrl, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'content-type': 'application/json',
|
||||
'x-hasura-admin-secret': cfg.hasuraAdminSecret,
|
||||
},
|
||||
headers,
|
||||
body: JSON.stringify({ query, variables }),
|
||||
signal: AbortSignal.timeout(15_000),
|
||||
});
|
||||
const text = await res.text();
|
||||
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
|
||||
const json = JSON.parse(text);
|
||||
if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | '));
|
||||
if (json.errors?.length) {
|
||||
throw new Error(json.errors.map((e) => e.message).join(' | '));
|
||||
}
|
||||
return json.data;
|
||||
}
|
||||
|
||||
@@ -63,6 +96,7 @@ async function main() {
|
||||
service: 'dlob-ts-archiver',
|
||||
startedAt: getIsoNow(),
|
||||
hasuraUrl: cfg.hasuraUrl,
|
||||
hasuraAuth: cfg.hasuraAuthToken ? 'bearer' : cfg.hasuraAdminSecret ? 'admin-secret' : 'none',
|
||||
markets: cfg.markets,
|
||||
pollMs: cfg.pollMs,
|
||||
},
|
||||
@@ -93,6 +127,11 @@ async function main() {
|
||||
mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct
|
||||
raw
|
||||
}
|
||||
dlob_slippage_latest_v2(where: { market_name: { _in: $markets } }) {
|
||||
market_name side size_usd market_type market_index ts slot
|
||||
mid_price vwap_price worst_price filled_usd filled_base impact_bps levels_consumed fill_pct
|
||||
raw
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
||||
@@ -160,28 +199,41 @@ async function main() {
|
||||
raw: r.raw ?? null,
|
||||
}));
|
||||
|
||||
if (!statsRows.length && !depthRows.length && !slippageRows.length) {
|
||||
await sleep(cfg.pollMs);
|
||||
continue;
|
||||
}
|
||||
const slippageRowsV2 = (data?.dlob_slippage_latest_v2 || []).map((r) => ({
|
||||
ts: now,
|
||||
market_name: r.market_name,
|
||||
side: r.side,
|
||||
size_usd: r.size_usd,
|
||||
market_type: r.market_type,
|
||||
market_index: r.market_index ?? null,
|
||||
source_ts: mapBigint(r.ts),
|
||||
slot: mapBigint(r.slot),
|
||||
mid_price: r.mid_price ?? null,
|
||||
vwap_price: r.vwap_price ?? null,
|
||||
worst_price: r.worst_price ?? null,
|
||||
filled_usd: r.filled_usd ?? null,
|
||||
filled_base: r.filled_base ?? null,
|
||||
impact_bps: r.impact_bps ?? null,
|
||||
levels_consumed: r.levels_consumed ?? null,
|
||||
fill_pct: r.fill_pct ?? null,
|
||||
raw: r.raw ?? null,
|
||||
}));
|
||||
|
||||
const mutation = `
|
||||
mutation InsertTs(
|
||||
$stats: [dlob_stats_ts_insert_input!]!
|
||||
$depth: [dlob_depth_bps_ts_insert_input!]!
|
||||
$slip: [dlob_slippage_ts_insert_input!]!
|
||||
$slipV2: [dlob_slippage_ts_v2_insert_input!]!
|
||||
) {
|
||||
insert_dlob_stats_ts(objects: $stats) { affected_rows }
|
||||
insert_dlob_depth_bps_ts(objects: $depth) { affected_rows }
|
||||
insert_dlob_slippage_ts(objects: $slip) { affected_rows }
|
||||
insert_dlob_slippage_ts_v2(objects: $slipV2) { affected_rows }
|
||||
}
|
||||
`;
|
||||
|
||||
await graphqlRequest(cfg, mutation, {
|
||||
stats: statsRows,
|
||||
depth: depthRows,
|
||||
slip: slippageRows,
|
||||
});
|
||||
await graphqlRequest(cfg, mutation, { stats: statsRows, depth: depthRows, slip: slippageRows, slipV2: slippageRowsV2 });
|
||||
} catch (err) {
|
||||
console.error(`[dlob-ts-archiver] ${String(err?.message || err)}`);
|
||||
}
|
||||
@@ -194,4 +246,3 @@ main().catch((err) => {
|
||||
console.error(String(err?.stack || err));
|
||||
process.exitCode = 1;
|
||||
});
|
||||
|
||||
|
||||
@@ -97,9 +97,12 @@ async function main() {
|
||||
const dlobStatsLatestTable = { schema: 'public', name: 'dlob_stats_latest' };
|
||||
const dlobDepthBpsLatestTable = { schema: 'public', name: 'dlob_depth_bps_latest' };
|
||||
const dlobSlippageLatestTable = { schema: 'public', name: 'dlob_slippage_latest' };
|
||||
const dlobSlippageLatestV2Table = { schema: 'public', name: 'dlob_slippage_latest_v2' };
|
||||
const candlesCacheTable = { schema: 'public', name: 'drift_candles_cache' };
|
||||
const dlobStatsTsTable = { schema: 'public', name: 'dlob_stats_ts' };
|
||||
const dlobDepthBpsTsTable = { schema: 'public', name: 'dlob_depth_bps_ts' };
|
||||
const dlobSlippageTsTable = { schema: 'public', name: 'dlob_slippage_ts' };
|
||||
const dlobSlippageTsV2Table = { schema: 'public', name: 'dlob_slippage_ts_v2' };
|
||||
const baseCandlesFn = { schema: 'public', name: 'get_drift_candles' };
|
||||
const candlesReturnTable = { schema: 'public', name: 'drift_candles' };
|
||||
|
||||
@@ -169,6 +172,21 @@ async function main() {
|
||||
await ensureTickTable(t);
|
||||
}
|
||||
|
||||
// Cached candles table (precomputed by worker; public read).
|
||||
await ensurePublicSelectTable(candlesCacheTable, [
|
||||
'bucket',
|
||||
'bucket_seconds',
|
||||
'symbol',
|
||||
'source',
|
||||
'open',
|
||||
'high',
|
||||
'low',
|
||||
'close',
|
||||
'oracle_close',
|
||||
'ticks',
|
||||
'updated_at',
|
||||
]);
|
||||
|
||||
const ensureDlobTable = async (table, columns) => {
|
||||
await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } });
|
||||
await metadata({ type: 'pg_track_table', args: { source, table } });
|
||||
@@ -321,6 +339,28 @@ async function main() {
|
||||
'updated_at',
|
||||
]);
|
||||
|
||||
await ensurePublicSelectTable(dlobSlippageLatestV2Table, [
|
||||
'market_name',
|
||||
'side',
|
||||
'size_usd',
|
||||
'market_type',
|
||||
'market_index',
|
||||
'ts',
|
||||
'slot',
|
||||
'mid_price',
|
||||
'best_bid_price',
|
||||
'best_ask_price',
|
||||
'vwap_price',
|
||||
'worst_price',
|
||||
'filled_usd',
|
||||
'filled_base',
|
||||
'impact_bps',
|
||||
'levels_consumed',
|
||||
'fill_pct',
|
||||
'raw',
|
||||
'updated_at',
|
||||
]);
|
||||
|
||||
await ensurePublicSelectTable(dlobStatsTsTable, [
|
||||
'ts',
|
||||
'id',
|
||||
@@ -386,6 +426,27 @@ async function main() {
|
||||
'raw',
|
||||
]);
|
||||
|
||||
await ensurePublicSelectTable(dlobSlippageTsV2Table, [
|
||||
'ts',
|
||||
'id',
|
||||
'market_name',
|
||||
'side',
|
||||
'size_usd',
|
||||
'market_type',
|
||||
'market_index',
|
||||
'source_ts',
|
||||
'slot',
|
||||
'mid_price',
|
||||
'vwap_price',
|
||||
'worst_price',
|
||||
'filled_usd',
|
||||
'filled_base',
|
||||
'impact_bps',
|
||||
'levels_consumed',
|
||||
'fill_pct',
|
||||
'raw',
|
||||
]);
|
||||
|
||||
// Return table type for candle functions (needed for Hasura to track the function).
|
||||
await metadataIgnore({ type: 'pg_track_table', args: { source, table: candlesReturnTable } });
|
||||
|
||||
|
||||
@@ -125,6 +125,29 @@ CREATE TABLE IF NOT EXISTS public.drift_candles (
|
||||
ticks bigint
|
||||
);
|
||||
|
||||
-- Precomputed candle cache (materialized by a worker).
|
||||
-- Purpose: make tf switching instant by reading ready-made candles instead of aggregating `drift_ticks` on demand.
|
||||
-- NOTE: `source=''` means "any source" (no source filter).
|
||||
CREATE TABLE IF NOT EXISTS public.drift_candles_cache (
|
||||
bucket timestamptz NOT NULL,
|
||||
bucket_seconds integer NOT NULL,
|
||||
symbol text NOT NULL,
|
||||
source text NOT NULL DEFAULT '',
|
||||
open numeric NOT NULL,
|
||||
high numeric NOT NULL,
|
||||
low numeric NOT NULL,
|
||||
close numeric NOT NULL,
|
||||
oracle_close numeric,
|
||||
ticks bigint NOT NULL DEFAULT 0,
|
||||
updated_at timestamptz NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (bucket, bucket_seconds, symbol, source)
|
||||
);
|
||||
|
||||
SELECT create_hypertable('drift_candles_cache', 'bucket', if_not_exists => TRUE, migrate_data => TRUE);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS drift_candles_cache_symbol_source_bucket_idx
|
||||
ON public.drift_candles_cache (symbol, source, bucket_seconds, bucket DESC);
|
||||
|
||||
-- If an older version of the function exists with an incompatible return type,
|
||||
-- CREATE OR REPLACE will fail. Drop the old signature first (safe/idempotent).
|
||||
DROP FUNCTION IF EXISTS public.get_drift_candles(text, integer, integer, text);
|
||||
@@ -272,6 +295,38 @@ CREATE INDEX IF NOT EXISTS dlob_slippage_latest_updated_at_idx
|
||||
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_market_name_idx
|
||||
ON public.dlob_slippage_latest (market_name);
|
||||
|
||||
-- Slippage v2: supports fractional order sizes (e.g. 0.1/0.2/0.5 USD), per market and side.
|
||||
-- Keep v1 intact for backward compatibility and to avoid data loss.
|
||||
CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest_v2 (
|
||||
market_name TEXT NOT NULL,
|
||||
side TEXT NOT NULL, -- buy|sell
|
||||
size_usd NUMERIC NOT NULL,
|
||||
market_type TEXT NOT NULL DEFAULT 'perp',
|
||||
market_index INTEGER,
|
||||
ts BIGINT,
|
||||
slot BIGINT,
|
||||
mid_price NUMERIC,
|
||||
best_bid_price NUMERIC,
|
||||
best_ask_price NUMERIC,
|
||||
vwap_price NUMERIC,
|
||||
worst_price NUMERIC,
|
||||
filled_usd NUMERIC,
|
||||
filled_base NUMERIC,
|
||||
impact_bps NUMERIC,
|
||||
levels_consumed INTEGER,
|
||||
fill_pct NUMERIC,
|
||||
raw JSONB,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (market_name, side, size_usd),
|
||||
CONSTRAINT dlob_slippage_latest_v2_side_chk CHECK (side IN ('buy', 'sell'))
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_updated_at_idx
|
||||
ON public.dlob_slippage_latest_v2 (updated_at DESC);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_market_name_idx
|
||||
ON public.dlob_slippage_latest_v2 (market_name);
|
||||
|
||||
-- Time-series tables for UI history (start: 7 days).
|
||||
-- Keep these append-only; use Timescale hypertables.
|
||||
|
||||
@@ -358,6 +413,33 @@ SELECT create_hypertable('dlob_slippage_ts', 'ts', if_not_exists => TRUE, migrat
|
||||
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_market_ts_desc_idx
|
||||
ON public.dlob_slippage_ts (market_name, ts DESC);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts_v2 (
|
||||
ts TIMESTAMPTZ NOT NULL,
|
||||
id BIGSERIAL NOT NULL,
|
||||
market_name TEXT NOT NULL,
|
||||
side TEXT NOT NULL,
|
||||
size_usd NUMERIC NOT NULL,
|
||||
market_type TEXT NOT NULL DEFAULT 'perp',
|
||||
market_index INTEGER,
|
||||
source_ts BIGINT,
|
||||
slot BIGINT,
|
||||
mid_price NUMERIC,
|
||||
vwap_price NUMERIC,
|
||||
worst_price NUMERIC,
|
||||
filled_usd NUMERIC,
|
||||
filled_base NUMERIC,
|
||||
impact_bps NUMERIC,
|
||||
levels_consumed INTEGER,
|
||||
fill_pct NUMERIC,
|
||||
raw JSONB,
|
||||
PRIMARY KEY (ts, id)
|
||||
);
|
||||
|
||||
SELECT create_hypertable('dlob_slippage_ts_v2', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_market_ts_desc_idx
|
||||
ON public.dlob_slippage_ts_v2 (market_name, ts DESC);
|
||||
|
||||
-- Retention policies (best-effort; safe if Timescale is present).
|
||||
DO $$
|
||||
BEGIN
|
||||
@@ -375,3 +457,8 @@ BEGIN
|
||||
PERFORM add_retention_policy('dlob_slippage_ts', INTERVAL '7 days');
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
END $$;
|
||||
DO $$
|
||||
BEGIN
|
||||
PERFORM add_retention_policy('dlob_slippage_ts_v2', INTERVAL '7 days');
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
END $$;
|
||||
|
||||
@@ -22,6 +22,7 @@ resources:
|
||||
- dlob-depth-worker/deployment.yaml
|
||||
- dlob-slippage-worker/deployment.yaml
|
||||
- dlob-ts-archiver/deployment.yaml
|
||||
- candles-cache-worker/deployment.yaml
|
||||
|
||||
configMapGenerator:
|
||||
- name: postgres-initdb
|
||||
@@ -42,9 +43,15 @@ configMapGenerator:
|
||||
- name: dlob-ts-archiver-script
|
||||
files:
|
||||
- dlob-ts-archiver/worker.mjs
|
||||
- name: candles-cache-worker-script
|
||||
files:
|
||||
- candles-cache-worker/worker.mjs
|
||||
- name: trade-api-wrapper
|
||||
files:
|
||||
- api/wrapper.mjs
|
||||
- name: trade-api-upstream
|
||||
files:
|
||||
- api/server.mjs
|
||||
|
||||
generatorOptions:
|
||||
disableNameSuffixHash: true
|
||||
|
||||
Reference in New Issue
Block a user