feat(canary): deploy r001 app surface on sol
All checks were successful
deploy-trade-r001-canary / apply (push) Successful in 1m1s

This commit is contained in:
mpabi
2026-04-12 17:18:42 +02:00
parent 060dcc38a6
commit 6672e1043d
19 changed files with 4511 additions and 3 deletions

View File

@@ -30,13 +30,47 @@ jobs:
install -m 0755 /tmp/kubectl /usr/local/bin/kubectl install -m 0755 /tmp/kubectl /usr/local/bin/kubectl
kubectl version --client kubectl version --client
- name: Apply canary namespace - name: Verify prerequisite secrets
env:
KUBECONFIG: /tmp/kubeconfig
run: |
kubectl -n trade-r001-canary get secret trade-postgres trade-hasura trade-api trade-frontend-tokens trade-basic-auth gitea-registry
- name: Recreate bootstrap jobs
env:
KUBECONFIG: /tmp/kubeconfig
run: |
kubectl -n trade-r001-canary delete job postgres-migrate hasura-bootstrap --ignore-not-found=true
- name: Apply canary environment
env: env:
KUBECONFIG: /tmp/kubeconfig KUBECONFIG: /tmp/kubeconfig
run: | run: |
kubectl apply -k environments/sol/trade-r001-canary kubectl apply -k environments/sol/trade-r001-canary
kubectl get ns trade-r001-canary --show-labels kubectl get ns trade-r001-canary --show-labels
kubectl -n trade-r001-canary get resourcequota,limitrange kubectl -n trade-r001-canary get svc,resourcequota,limitrange
- name: Restart application surface
env:
KUBECONFIG: /tmp/kubeconfig
run: |
kubectl -n trade-r001-canary rollout restart deploy/hasura deploy/trade-api deploy/trade-frontend
- name: Wait for database and metadata bootstrap
env:
KUBECONFIG: /tmp/kubeconfig
run: |
kubectl -n trade-r001-canary wait --for=condition=complete job/postgres-migrate --timeout=300s
kubectl -n trade-r001-canary wait --for=condition=complete job/hasura-bootstrap --timeout=300s
- name: Wait for application rollouts
env:
KUBECONFIG: /tmp/kubeconfig
run: |
kubectl -n trade-r001-canary rollout status deploy/hasura --timeout=300s
kubectl -n trade-r001-canary rollout status deploy/trade-api --timeout=300s
kubectl -n trade-r001-canary rollout status deploy/trade-frontend --timeout=300s
kubectl -n trade-r001-canary get deploy,pods -o wide
- name: Verify canary namespace connectivity - name: Verify canary namespace connectivity
env: env:
@@ -60,4 +94,19 @@ jobs:
with socket.create_connection((host, port), timeout=5): with socket.create_connection((host, port), timeout=5):
print(f"OK {host}:{port}") print(f"OK {host}:{port}")
PY PY
kubectl -n trade-r001-canary exec canary-netcheck -- python - <<'PY'
import urllib.request
targets = [
"http://hasura:8080/healthz",
"http://trade-api:8787/healthz",
"http://trade-frontend:8081/healthz",
]
for url in targets:
with urllib.request.urlopen(url, timeout=10) as response:
if response.status != 200:
raise SystemExit(f"Unexpected status for {url}: {response.status}")
print(f"OK {url}")
PY
kubectl -n trade-r001-canary delete pod canary-netcheck --wait=true kubectl -n trade-r001-canary delete pod canary-netcheck --wait=true

View File

@@ -7,6 +7,7 @@ Minimal canary namespace for migration baseline `R001` on `sol`.
- Reserve a dedicated namespace for the first reconstructed trade deployment. - Reserve a dedicated namespace for the first reconstructed trade deployment.
- Put hard upper bounds on namespace-level CPU, memory, object count, and PVC growth before application manifests land. - Put hard upper bounds on namespace-level CPU, memory, object count, and PVC growth before application manifests land.
- Verify that workloads in the namespace can resolve and reach the shared `trade-infra` services for `Postgres` and `Redis`. - Verify that workloads in the namespace can resolve and reach the shared `trade-infra` services for `Postgres` and `Redis`.
- Recreate the `R001` application surface in a controlled way: `Hasura`, `trade-api`, and `trade-frontend`.
## Current Guardrails ## Current Guardrails
@@ -30,7 +31,28 @@ Minimal canary namespace for migration baseline `R001` on `sol`.
## Notes ## Notes
- This namespace is intentionally conservative until item `14` and the validator protection envelope are fully defined. - This namespace is intentionally conservative until item `14` and the validator protection envelope are fully defined.
- The namespace does not yet recreate `gitea-registry`; that remains a separate migration prerequisite.
- Current shared infrastructure endpoints expected by canary workloads: - Current shared infrastructure endpoints expected by canary workloads:
- `postgres-host.trade-infra.svc.cluster.local:5432` - `postgres-host.trade-infra.svc.cluster.local:5432`
- `redis-host.trade-infra.svc.cluster.local:6379` - `redis-host.trade-infra.svc.cluster.local:6379`
## Application Surface
- `postgres` in this namespace is an `ExternalName` alias that points to `postgres-host.trade-infra.svc.cluster.local`.
- `Hasura` uses the live `R001` secrets copied from `trade-staging`, but connects to the host `Postgres` on `sol`.
- `trade-api` and `trade-frontend` use the current live images from Gitea registry and the same bootstrap wrapper/config pattern as the source environment.
- The canary workflow re-runs:
- `postgres-migrate`
- `hasura-bootstrap`
before it waits for `Hasura`, `trade-api`, and `trade-frontend` to become healthy.
## Operator Flow
From the repository root:
```bash
./environments/sol/trade-r001-canary/scripts/prepare-sol-postgres.sh
./environments/sol/trade-r001-canary/scripts/create-gitea-registry-secret.sh
./environments/sol/trade-r001-canary/scripts/sync-live-secrets.sh
```
After the prerequisites are seeded, push to `main` and let `deploy-trade-r001-canary` apply the environment.

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,783 @@
import crypto from 'node:crypto';
import http from 'node:http';
import { spawn } from 'node:child_process';
const WRAPPER_PORT = Number.parseInt(String(process.env.PORT || '8787'), 10);
const UPSTREAM_PORT = Number.parseInt(String(process.env.UPSTREAM_PORT || '8788'), 10);
const UPSTREAM_HOST = String(process.env.UPSTREAM_HOST || '127.0.0.1');
const UPSTREAM_ENTRY = String(process.env.UPSTREAM_ENTRY || '/app/services/api/server.mjs');
const HASURA_URL = String(process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql');
const HASURA_ADMIN_SECRET = String(process.env.HASURA_ADMIN_SECRET || '');
const CORS_ORIGIN = String(process.env.CORS_ORIGIN || '*');
if (!Number.isInteger(WRAPPER_PORT) || WRAPPER_PORT <= 0) throw new Error('Invalid PORT');
if (!Number.isInteger(UPSTREAM_PORT) || UPSTREAM_PORT <= 0) throw new Error('Invalid UPSTREAM_PORT');
function getIsoNow() {
return new Date().toISOString();
}
function sha256Hex(text) {
return crypto.createHash('sha256').update(text, 'utf8').digest('hex');
}
function getHeader(req, name) {
const v = req.headers[String(name).toLowerCase()];
return Array.isArray(v) ? v[0] : v;
}
function readBearerToken(req) {
const auth = getHeader(req, 'authorization');
if (auth && typeof auth === 'string') {
const m = auth.match(/^Bearer\s+(.+)$/i);
if (m && m[1]) return m[1].trim();
}
const apiKey = getHeader(req, 'x-api-key');
if (apiKey && typeof apiKey === 'string' && apiKey.trim()) return apiKey.trim();
return undefined;
}
function withCors(res) {
res.setHeader('access-control-allow-origin', CORS_ORIGIN);
res.setHeader('access-control-allow-methods', 'GET,POST,OPTIONS');
res.setHeader(
'access-control-allow-headers',
'content-type, authorization, x-api-key, x-admin-secret'
);
}
function sendJson(res, status, body) {
withCors(res);
res.statusCode = status;
res.setHeader('content-type', 'application/json; charset=utf-8');
res.end(JSON.stringify(body));
}
async function readBodyJson(req, { maxBytes }) {
const chunks = [];
let total = 0;
for await (const chunk of req) {
total += chunk.length;
if (total > maxBytes) throw new Error('payload_too_large');
chunks.push(chunk);
}
const text = Buffer.concat(chunks).toString('utf8');
if (!text.trim()) return {};
try {
return JSON.parse(text);
} catch {
throw new Error('invalid_json');
}
}
function parseNumeric(value) {
if (value == null) return null;
if (typeof value === 'number') return Number.isFinite(value) ? value : null;
if (typeof value === 'string') {
const s = value.trim();
if (!s) return null;
const n = Number(s);
return Number.isFinite(n) ? n : null;
}
return null;
}
function clampInt(value, min, max) {
const n = Number.parseInt(String(value), 10);
if (!Number.isFinite(n) || !Number.isInteger(n)) return min;
return Math.min(max, Math.max(min, n));
}
function clampNumber(value, min, max, fallback) {
const n = typeof value === 'number' ? value : Number(value);
if (!Number.isFinite(n)) return fallback;
return Math.min(max, Math.max(min, n));
}
function isUuid(value) {
const s = String(value || '').trim();
return /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i.test(s);
}
async function hasuraRequest(query, variables) {
if (!HASURA_ADMIN_SECRET) throw new Error('Missing HASURA_ADMIN_SECRET');
const res = await fetch(HASURA_URL, {
method: 'POST',
headers: {
'content-type': 'application/json',
'x-hasura-admin-secret': HASURA_ADMIN_SECRET,
},
body: JSON.stringify({ query, variables }),
});
const text = await res.text();
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
const json = text ? JSON.parse(text) : {};
if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | '));
return json.data;
}
function normalizeScopes(value) {
if (!value) return [];
if (Array.isArray(value)) return value.map((v) => String(v)).filter(Boolean);
if (typeof value === 'string') {
return value
.split(',')
.map((s) => s.trim())
.filter(Boolean);
}
return [];
}
async function requireValidToken(req, requiredScope) {
const token = readBearerToken(req);
if (!token) return { ok: false, status: 401, error: 'missing_token' };
const hash = sha256Hex(token);
const query = `
query ValidToken($hash: String!) {
api_tokens(where: {token_hash: {_eq: $hash}, revoked_at: {_is_null: true}}, limit: 1) {
id
name
scopes
}
}
`;
let data;
try {
data = await hasuraRequest(query, { hash });
} catch (err) {
return { ok: false, status: 500, error: String(err?.message || err) };
}
const row = data?.api_tokens?.[0];
if (!row?.id) return { ok: false, status: 401, error: 'invalid_or_revoked_token' };
const scopes = normalizeScopes(row.scopes);
if (requiredScope && !scopes.includes(requiredScope)) {
return { ok: false, status: 403, error: 'missing_scope' };
}
// best-effort touch
const touch = `
mutation TouchToken($id: uuid!, $ts: timestamptz!) {
update_api_tokens_by_pk(pk_columns: {id: $id}, _set: {last_used_at: $ts}) { id }
}
`;
hasuraRequest(touch, { id: row.id, ts: getIsoNow() }).catch(() => {});
return { ok: true, token: { id: row.id, name: row.name } };
}
async function readSolPriceUsd() {
try {
const q = `
query SolPrice {
dlob_stats_latest(where: {market_name: {_eq: "SOL-PERP"}}, limit: 1) {
mid_price
mark_price
oracle_price
}
}
`;
const data = await hasuraRequest(q, {});
const row = data?.dlob_stats_latest?.[0];
const p = parseNumeric(row?.mid_price) ?? parseNumeric(row?.mark_price) ?? parseNumeric(row?.oracle_price);
if (p != null && p > 0) return p;
} catch {
// ignore
}
return null;
}
function getByPath(obj, pathStr) {
if (!obj || typeof obj !== 'object') return undefined;
const parts = String(pathStr || '').split('.').filter(Boolean);
let cur = obj;
for (const p of parts) {
if (!cur || typeof cur !== 'object') return undefined;
cur = cur[p];
}
return cur;
}
function readNumberFromPayload(payload, paths) {
for (const p of paths) {
const v = getByPath(payload, p);
const n = parseNumeric(v);
if (n != null) return n;
}
return null;
}
function readTextFromPayload(payload, paths) {
for (const p of paths) {
const v = getByPath(payload, p);
if (typeof v === 'string' && v.trim()) return v.trim();
}
return null;
}
function inferContractSizeUsd(contract) {
return (
readNumberFromPayload(contract, [
'desired.size_usd',
'desired.sizeUsd',
'desired.notional_usd',
'desired.notionalUsd',
'entry.size_usd',
'entry.sizeUsd',
'entry.notional_usd',
'entry.notionalUsd',
'entry.order_intent.size_usd',
'entry.order_intent.sizeUsd',
'desired.order_intent.size_usd',
'desired.order_intent.sizeUsd',
]) || null
);
}
function inferContractSide(contract) {
const raw =
readTextFromPayload(contract, [
'desired.side',
'entry.side',
'entry.order_intent.side',
'desired.order_intent.side',
'desired.direction',
'entry.direction',
]) || '';
const v = raw.toLowerCase();
if (v === 'long' || v === 'buy') return 'long';
if (v === 'short' || v === 'sell') return 'short';
return null;
}
function sumCostsFromEvents(events) {
const totals = {
tradeFeeUsd: 0,
txFeeUsd: 0,
slippageUsd: 0,
fundingUsd: 0,
realizedPnlUsd: 0,
txCount: 0,
fillCount: 0,
cancelCount: 0,
modifyCount: 0,
errorCount: 0,
};
for (const ev of events || []) {
const t = String(ev?.event_type || '').toLowerCase();
const payload = ev?.payload && typeof ev.payload === 'object' ? ev.payload : {};
const tradeFeeUsd =
readNumberFromPayload(payload, [
'realized_fee_usd',
'trade_fee_usd',
'fee_usd',
'fees.trade_fee_usd',
'fees.usd',
]) || 0;
const txFeeUsd =
readNumberFromPayload(payload, [
'realized_tx_usd',
'tx_fee_usd',
'network_fee_usd',
'fees.tx_fee_usd',
'fees.network_usd',
]) || 0;
const slippageUsd =
readNumberFromPayload(payload, [
'slippage_usd',
'realized_slippage_usd',
'execution_usd',
'realized_execution_usd',
]) || 0;
const fundingUsd = readNumberFromPayload(payload, ['funding_usd', 'realized_funding_usd']) || 0;
const pnlUsd = readNumberFromPayload(payload, ['realized_pnl_usd', 'pnl_usd']) || 0;
const txCount = readNumberFromPayload(payload, ['tx_count', 'txCount']) || 0;
totals.tradeFeeUsd += tradeFeeUsd;
totals.txFeeUsd += txFeeUsd;
totals.slippageUsd += slippageUsd;
totals.fundingUsd += fundingUsd;
totals.realizedPnlUsd += pnlUsd;
totals.txCount += txCount;
if (t.includes('fill')) totals.fillCount += 1;
if (t.includes('cancel')) totals.cancelCount += 1;
if (t.includes('modify') || t.includes('reprice')) totals.modifyCount += 1;
if (t.includes('error') || String(ev?.severity || '').toLowerCase() === 'error') totals.errorCount += 1;
}
const totalCostsUsd = totals.tradeFeeUsd + totals.txFeeUsd + totals.slippageUsd + totals.fundingUsd;
return {
...totals,
totalCostsUsd,
netPnlUsd: totals.realizedPnlUsd - totalCostsUsd,
};
}
function buildCostSeriesFromEvents(events, { maxPoints }) {
const points = [];
const totals = {
tradeFeeUsd: 0,
txFeeUsd: 0,
slippageUsd: 0,
fundingUsd: 0,
realizedPnlUsd: 0,
};
for (const ev of events || []) {
const ts = ev?.ts;
if (!ts) continue;
const payload = ev?.payload && typeof ev.payload === 'object' ? ev.payload : {};
const tradeFeeUsd =
readNumberFromPayload(payload, [
'realized_fee_usd',
'trade_fee_usd',
'fee_usd',
'fees.trade_fee_usd',
'fees.usd',
]) || 0;
const txFeeUsd =
readNumberFromPayload(payload, [
'realized_tx_usd',
'tx_fee_usd',
'network_fee_usd',
'fees.tx_fee_usd',
'fees.network_usd',
]) || 0;
const slippageUsd =
readNumberFromPayload(payload, [
'slippage_usd',
'realized_slippage_usd',
'execution_usd',
'realized_execution_usd',
]) || 0;
const fundingUsd = readNumberFromPayload(payload, ['funding_usd', 'realized_funding_usd']) || 0;
const pnlUsd = readNumberFromPayload(payload, ['realized_pnl_usd', 'pnl_usd']) || 0;
totals.tradeFeeUsd += tradeFeeUsd;
totals.txFeeUsd += txFeeUsd;
totals.slippageUsd += slippageUsd;
totals.fundingUsd += fundingUsd;
totals.realizedPnlUsd += pnlUsd;
const totalCostsUsd = totals.tradeFeeUsd + totals.txFeeUsd + totals.slippageUsd + totals.fundingUsd;
points.push({
ts,
tradeFeeUsd: totals.tradeFeeUsd,
txFeeUsd: totals.txFeeUsd,
slippageUsd: totals.slippageUsd,
fundingUsd: totals.fundingUsd,
totalCostsUsd,
realizedPnlUsd: totals.realizedPnlUsd,
netPnlUsd: totals.realizedPnlUsd - totalCostsUsd,
});
}
const cap = Math.max(50, Math.min(10_000, Number(maxPoints) || 600));
if (points.length <= cap) return points;
const step = Math.ceil(points.length / cap);
const sampled = [];
for (let i = 0; i < points.length; i += step) sampled.push(points[i]);
const last = points[points.length - 1];
if (sampled[sampled.length - 1] !== last) sampled.push(last);
return sampled;
}
function proxyToUpstream(req, res) {
const url = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
const headers = { ...req.headers };
delete headers.host;
delete headers.connection;
const opts = {
host: UPSTREAM_HOST,
port: UPSTREAM_PORT,
method: req.method,
path: url.pathname + url.search,
headers,
};
const upstreamReq = http.request(opts, (upstreamRes) => {
withCors(res);
res.statusCode = upstreamRes.statusCode || 502;
for (const [k, v] of Object.entries(upstreamRes.headers || {})) {
if (!k) continue;
if (k.toLowerCase() === 'content-length') continue;
if (k.toLowerCase().startsWith('access-control-')) continue;
if (v != null) res.setHeader(k, v);
}
upstreamRes.pipe(res);
});
upstreamReq.on('error', (err) => {
sendJson(res, 502, { ok: false, error: String(err?.message || err) });
});
req.pipe(upstreamReq);
}
async function handleMonitor(req, res, url) {
const auth = await requireValidToken(req, 'read');
if (!auth.ok) {
sendJson(res, auth.status, { ok: false, error: auth.error });
return;
}
const pathname = url.pathname;
const parts = pathname.split('/').filter(Boolean);
const contractId = parts[2];
if (!isUuid(contractId)) {
sendJson(res, 400, { ok: false, error: 'invalid_contract_id' });
return;
}
const limit = clampInt(url.searchParams.get('eventsLimit') || '2000', 10, 50_000);
const wantSeries = (url.searchParams.get('series') || '').trim() === '1';
const seriesMax = clampInt(url.searchParams.get('seriesMax') || '600', 50, 10_000);
const qContract = `
query ContractByPk($id: uuid!) {
bot_contracts_by_pk(id: $id) {
id
decision_id
bot_id
model_version
market_name
subaccount_id
status
desired
entry
manage
exit
gates
created_at
updated_at
last_heartbeat_at
ended_at
reason
}
}
`;
const qEvents = `
query ContractEvents($id: uuid!, $limit: Int!) {
bot_events(where: {contract_id: {_eq: $id}}, order_by: {ts: asc}, limit: $limit) {
ts
contract_id
decision_id
bot_id
market_name
event_type
severity
payload
}
}
`;
try {
const data = await hasuraRequest(qContract, { id: contractId });
const contract = data?.bot_contracts_by_pk;
if (!contract?.id) {
sendJson(res, 404, { ok: false, error: 'contract_not_found' });
return;
}
const evData = await hasuraRequest(qEvents, { id: contractId, limit });
const events = evData?.bot_events || [];
const costs = sumCostsFromEvents(events);
const series = wantSeries ? buildCostSeriesFromEvents(events, { maxPoints: seriesMax }) : null;
const sizeUsd = inferContractSizeUsd(contract);
const side = inferContractSide(contract);
let closeEst = null;
if (contract.market_name && sizeUsd != null) {
const qSlip = `
query Slippage($market: String!) {
dlob_slippage_latest(where: {market_name: {_eq: $market}}) {
market_name
side
size_usd
mid_price
vwap_price
worst_price
impact_bps
fill_pct
updated_at
}
}
`;
const slipData = await hasuraRequest(qSlip, { market: contract.market_name });
const rows = slipData?.dlob_slippage_latest || [];
const pickNearest = (wantedSide) => {
const candidates = rows.filter((r) => String(r.side || '').toLowerCase() === wantedSide);
if (!candidates.length) return null;
let best = null;
let bestD = Infinity;
for (const r of candidates) {
const s = parseNumeric(r.size_usd);
if (s == null) continue;
const d = Math.abs(s - sizeUsd);
if (d < bestD) {
bestD = d;
best = r;
}
}
return best;
};
closeEst = {
requestedSizeUsd: sizeUsd,
entrySide: side,
suggestedCloseSide: side === 'long' ? 'sell' : side === 'short' ? 'buy' : null,
buy: pickNearest('buy'),
sell: pickNearest('sell'),
};
}
sendJson(res, 200, {
ok: true,
contract,
eventsCount: events.length,
costs,
series,
closeEstimate: closeEst,
});
} catch (err) {
sendJson(res, 500, { ok: false, error: String(err?.message || err) });
}
}
async function handleEstimate(req, res) {
const auth = await requireValidToken(req, 'read');
if (!auth.ok) {
sendJson(res, auth.status, { ok: false, error: auth.error });
return;
}
let body;
try {
body = await readBodyJson(req, { maxBytes: 256 * 1024 });
} catch (err) {
const msg = String(err?.message || err);
if (msg === 'payload_too_large') {
sendJson(res, 413, { ok: false, error: 'payload_too_large' });
return;
}
sendJson(res, 400, { ok: false, error: 'invalid_json' });
return;
}
const market = String(body?.market_name || body?.market || '').trim();
if (!market) {
sendJson(res, 400, { ok: false, error: 'missing_market' });
return;
}
const notionalUsd = parseNumeric(body?.notional_usd ?? body?.notionalUsd ?? body?.size_usd ?? body?.sizeUsd);
if (!(notionalUsd != null && notionalUsd > 0)) {
sendJson(res, 400, { ok: false, error: 'invalid_notional_usd' });
return;
}
const entrySideRaw = String(body?.side || 'long').trim().toLowerCase();
const entrySide = entrySideRaw === 'short' ? 'short' : 'long';
const orderType = String(body?.order_type || body?.orderType || 'market').trim().toLowerCase();
const isMarket = orderType === 'market' || orderType === 'taker';
const takerBps = clampNumber(parseNumeric(body?.fee_taker_bps) ?? 5, 0, 1000, 5);
const makerBps = clampNumber(parseNumeric(body?.fee_maker_bps) ?? 0, -1000, 1000, 0);
const feeBps = isMarket ? takerBps : makerBps;
let txFeeUsdEst = parseNumeric(body?.tx_fee_usd_est);
if (txFeeUsdEst == null) {
const baseLamports = 5000;
const sigs = 1;
const priorityLamports = 0;
const lamports = baseLamports * sigs + priorityLamports;
const sol = lamports / 1_000_000_000;
const solUsd = await readSolPriceUsd();
txFeeUsdEst = solUsd != null ? sol * solUsd : 0;
}
txFeeUsdEst = clampNumber(txFeeUsdEst, 0, 100, 0);
const expectedReprices = clampInt(body?.expected_reprices_per_entry ?? body?.expectedReprices ?? '0', 0, 500);
const modifyTxCount = clampInt(body?.modify_tx_count ?? body?.modifyTxCount ?? '2', 0, 10);
try {
const qSlip = `
query Slippage($market: String!) {
dlob_slippage_latest(where: {market_name: {_eq: $market}}) {
market_name
side
size_usd
mid_price
vwap_price
worst_price
impact_bps
fill_pct
updated_at
}
}
`;
const slipData = await hasuraRequest(qSlip, { market });
const rows = slipData?.dlob_slippage_latest || [];
const wantedSide = entrySide === 'long' ? 'buy' : 'sell';
const candidates = rows.filter((r) => String(r.side || '').toLowerCase() === wantedSide);
let best = null;
let bestD = Infinity;
for (const r of candidates) {
const s = parseNumeric(r.size_usd);
if (s == null) continue;
const d = Math.abs(s - notionalUsd);
if (d < bestD) {
bestD = d;
best = r;
}
}
const impactBps = parseNumeric(best?.impact_bps) ?? 0;
const slippageUsd = (notionalUsd * impactBps) / 10_000;
const tradeFeeUsd = (notionalUsd * feeBps) / 10_000;
const modifyCostUsd = expectedReprices * modifyTxCount * txFeeUsdEst;
const totalUsd = tradeFeeUsd + slippageUsd + txFeeUsdEst + modifyCostUsd;
const totalBps = (totalUsd / notionalUsd) * 10_000;
sendJson(res, 200, {
ok: true,
input: {
market_name: market,
notional_usd: notionalUsd,
side: entrySide,
order_type: orderType,
fee_bps: feeBps,
tx_fee_usd_est: txFeeUsdEst,
expected_reprices_per_entry: expectedReprices,
},
dlob: best
? {
size_usd: best.size_usd,
side: best.side,
mid_price: best.mid_price,
vwap_price: best.vwap_price,
impact_bps: best.impact_bps,
fill_pct: best.fill_pct,
updated_at: best.updated_at,
}
: null,
breakdown: {
trade_fee_usd: tradeFeeUsd,
slippage_usd: slippageUsd,
tx_fee_usd: txFeeUsdEst,
expected_modify_usd: modifyCostUsd,
total_usd: totalUsd,
total_bps: totalBps,
breakeven_bps: totalBps,
},
});
} catch (err) {
sendJson(res, 500, { ok: false, error: String(err?.message || err) });
}
}
let upstreamChild = null;
function startUpstream() {
const env = { ...process.env, PORT: String(UPSTREAM_PORT) };
upstreamChild = spawn('node', [UPSTREAM_ENTRY], { env, stdio: 'inherit' });
upstreamChild.on('exit', (code, signal) => {
console.error(`upstream exited: code=${code} signal=${signal}`);
});
}
function shutdown() {
if (upstreamChild && !upstreamChild.killed) upstreamChild.kill('SIGTERM');
}
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
startUpstream();
const server = http.createServer(async (req, res) => {
if (req.method === 'OPTIONS') {
withCors(res);
res.statusCode = 204;
res.end();
return;
}
const url = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`);
const pathname = url.pathname;
if (req.method === 'GET' && pathname === '/healthz') {
// Check upstream quickly; if it's down, we fail readiness.
const opts = { host: UPSTREAM_HOST, port: UPSTREAM_PORT, path: '/healthz', method: 'GET', timeout: 800 };
const upstreamOk = await new Promise((resolve) => {
const r = http.request(opts, (rr) => {
rr.resume();
resolve(rr.statusCode === 200);
});
r.on('timeout', () => {
r.destroy();
resolve(false);
});
r.on('error', () => resolve(false));
r.end();
});
if (!upstreamOk) {
sendJson(res, 503, { ok: false, error: 'upstream_not_ready' });
return;
}
sendJson(res, 200, {
ok: true,
service: 'trade-api-wrapper',
startedAt: getIsoNow(),
upstream: { host: UPSTREAM_HOST, port: UPSTREAM_PORT },
});
return;
}
if (req.method === 'POST' && pathname === '/v1/contracts/costs/estimate') {
await handleEstimate(req, res);
return;
}
if (req.method === 'GET' && pathname.startsWith('/v1/contracts/') && pathname.endsWith('/monitor')) {
await handleMonitor(req, res, url);
return;
}
proxyToUpstream(req, res);
});
server.listen(WRAPPER_PORT, () => {
console.log(
JSON.stringify(
{
service: 'trade-api-wrapper',
port: WRAPPER_PORT,
upstream: { entry: UPSTREAM_ENTRY, host: UPSTREAM_HOST, port: UPSTREAM_PORT },
hasuraUrl: HASURA_URL,
hasuraAdminSecret: HASURA_ADMIN_SECRET ? '***' : undefined,
},
null,
2
)
);
});

View File

@@ -0,0 +1,498 @@
const HASURA_URL = process.env.HASURA_URL || 'http://hasura:8080';
const ADMIN_SECRET = process.env.HASURA_ADMIN_SECRET || 'devsecret';
const TARGET_TICKS_TABLE = process.env.TICKS_TABLE;
const TARGET_CANDLES_FUNCTION = process.env.CANDLES_FUNCTION;
const sleep = (ms) => new Promise((r) => setTimeout(r, ms));
async function httpJson(url, body) {
const res = await fetch(url, {
method: 'POST',
headers: {
'content-type': 'application/json',
'x-hasura-admin-secret': ADMIN_SECRET,
},
body: JSON.stringify(body),
});
const text = await res.text();
let json;
try {
json = JSON.parse(text);
} catch {
json = { raw: text };
}
return { ok: res.ok, status: res.status, json, text };
}
async function waitForHasura() {
const healthUrl = `${HASURA_URL.replace(/\/$/, '')}/healthz`;
for (let i = 0; i < 60; i++) {
try {
const res = await fetch(healthUrl);
if (res.ok) return;
} catch {
// ignore
}
await sleep(1000);
}
throw new Error(`Hasura not healthy at ${healthUrl}`);
}
function isAlreadyExistsError(errText) {
const t = String(errText || '').toLowerCase();
return (
t.includes('already tracked') ||
t.includes('already exists') ||
t.includes('already present') ||
t.includes('already been tracked') ||
(t.includes('permission') && t.includes('already exists'))
);
}
function isMissingRelationError(errText) {
const t = String(errText || '').toLowerCase();
return t.includes('does not exist') || t.includes('not found') || t.includes('not tracked') || t.includes('already untracked');
}
function normalizeName(name) {
const v = String(name || '')
.trim()
.toLowerCase();
if (!v) return undefined;
if (!/^[a-z][a-z0-9_]*$/.test(v)) throw new Error(`Invalid name: ${name}`);
return v;
}
async function metadata(op) {
const url = `${HASURA_URL.replace(/\/$/, '')}/v1/metadata`;
const res = await httpJson(url, op);
if (res.ok) return { status: 'ok', res };
const errText = res.json?.error || res.text;
if (isAlreadyExistsError(errText)) return { status: 'skip', res };
throw new Error(`Hasura metadata error (${res.status}): ${JSON.stringify(res.json)}`);
}
async function metadataIgnore(op) {
const url = `${HASURA_URL.replace(/\/$/, '')}/v1/metadata`;
const res = await httpJson(url, op);
if (res.ok) return { status: 'ok', res };
const errText = res.json?.error || res.text;
if (isAlreadyExistsError(errText) || isMissingRelationError(errText)) return { status: 'skip', res };
throw new Error(`Hasura metadata error (${res.status}): ${JSON.stringify(res.json)}`);
}
async function main() {
console.log(`[hasura-bootstrap] HASURA_URL=${HASURA_URL}`);
await waitForHasura();
const PUBLIC_DLOB_SOURCE_HEADER = 'X-Hasura-Dlob-Source';
const apiTokensTable = { schema: 'public', name: 'api_tokens' };
const botConfigTable = { schema: 'public', name: 'bot_config' };
const botStateTable = { schema: 'public', name: 'bot_state' };
const botEventsTable = { schema: 'public', name: 'bot_events' };
const source = 'default';
const baseTicks = { schema: 'public', name: 'drift_ticks' };
const dlobL2LatestTable = { schema: 'public', name: 'dlob_l2_latest' };
const dlobStatsLatestTable = { schema: 'public', name: 'dlob_stats_latest' };
const dlobDepthBpsLatestTable = { schema: 'public', name: 'dlob_depth_bps_latest' };
const dlobSlippageLatestTable = { schema: 'public', name: 'dlob_slippage_latest' };
const dlobSlippageLatestV2Table = { schema: 'public', name: 'dlob_slippage_latest_v2' };
const candlesCacheTable = { schema: 'public', name: 'drift_candles_cache' };
const dlobStatsTsTable = { schema: 'public', name: 'dlob_stats_ts' };
const dlobDepthBpsTsTable = { schema: 'public', name: 'dlob_depth_bps_ts' };
const dlobSlippageTsTable = { schema: 'public', name: 'dlob_slippage_ts' };
const dlobSlippageTsV2Table = { schema: 'public', name: 'dlob_slippage_ts_v2' };
const baseCandlesFn = { schema: 'public', name: 'get_drift_candles' };
const candlesReturnTable = { schema: 'public', name: 'drift_candles' };
const extraTicksName = normalizeName(TARGET_TICKS_TABLE);
const extraCandlesName = normalizeName(TARGET_CANDLES_FUNCTION);
const tickTables = [baseTicks];
if (extraTicksName && extraTicksName !== baseTicks.name) {
tickTables.push({ schema: 'public', name: extraTicksName });
}
const candleFns = [baseCandlesFn];
if (extraCandlesName && extraCandlesName !== baseCandlesFn.name) {
candleFns.push({ schema: 'public', name: extraCandlesName });
}
const ensureTickTable = async (table) => {
await metadata({ type: 'pg_track_table', args: { source, table } });
// Ensure latest permission definition (drop+create avoids stale column sets).
await metadataIgnore({ type: 'pg_drop_select_permission', args: { source, table, role: 'public' } });
await metadata({
type: 'pg_create_select_permission',
args: {
source,
table,
role: 'public',
permission: {
columns: ['ts', 'market_index', 'symbol', 'oracle_price', 'mark_price', 'oracle_slot', 'source'],
filter: {},
},
},
});
await metadataIgnore({ type: 'pg_drop_insert_permission', args: { source, table, role: 'ingestor' } });
await metadata({
type: 'pg_create_insert_permission',
args: {
source,
table,
role: 'ingestor',
permission: {
check: {},
set: {},
columns: ['ts', 'market_index', 'symbol', 'oracle_price', 'mark_price', 'oracle_slot', 'source', 'raw'],
},
},
});
await metadataIgnore({ type: 'pg_drop_update_permission', args: { source, table, role: 'ingestor' } });
await metadata({
type: 'pg_create_update_permission',
args: {
source,
table,
role: 'ingestor',
permission: {
filter: {},
check: {},
columns: ['oracle_price', 'mark_price', 'oracle_slot', 'source', 'raw'],
},
},
});
};
for (const t of tickTables) {
await ensureTickTable(t);
}
// Cached candles table (precomputed by worker; public read).
await ensurePublicSelectTable(candlesCacheTable, [
'bucket',
'bucket_seconds',
'symbol',
'source',
'open',
'high',
'low',
'close',
'oracle_close',
'ticks',
'updated_at',
]);
const ensureDlobTable = async (table, columns, { publicFilter } = {}) => {
await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } });
await metadata({ type: 'pg_track_table', args: { source, table } });
await metadataIgnore({ type: 'pg_drop_select_permission', args: { source, table, role: 'public' } });
await metadata({
type: 'pg_create_select_permission',
args: {
source,
table,
role: 'public',
permission: {
columns,
filter: publicFilter || {},
},
},
});
await metadataIgnore({ type: 'pg_drop_insert_permission', args: { source, table, role: 'ingestor' } });
await metadata({
type: 'pg_create_insert_permission',
args: {
source,
table,
role: 'ingestor',
permission: {
check: {},
set: {},
columns,
},
},
});
await metadataIgnore({ type: 'pg_drop_update_permission', args: { source, table, role: 'ingestor' } });
await metadata({
type: 'pg_create_update_permission',
args: {
source,
table,
role: 'ingestor',
permission: {
filter: {},
check: {},
columns,
},
},
});
};
async function ensurePublicSelectTable(table, columns, { publicFilter } = {}) {
await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } });
await metadata({ type: 'pg_track_table', args: { source, table } });
await metadataIgnore({ type: 'pg_drop_select_permission', args: { source, table, role: 'public' } });
await metadata({
type: 'pg_create_select_permission',
args: {
source,
table,
role: 'public',
permission: {
columns,
filter: publicFilter || {},
},
},
});
// Computed/archived tables are written by workers using admin secret; keep ingestor off by default.
await metadataIgnore({ type: 'pg_drop_insert_permission', args: { source, table, role: 'ingestor' } });
await metadataIgnore({ type: 'pg_drop_update_permission', args: { source, table, role: 'ingestor' } });
}
const dlobPublicFilter = { source: { _eq: PUBLIC_DLOB_SOURCE_HEADER } };
await ensureDlobTable(dlobL2LatestTable, [
'source',
'market_name',
'market_type',
'market_index',
'ts',
'slot',
'mark_price',
'oracle_price',
'best_bid_price',
'best_ask_price',
'bids',
'asks',
'raw',
'updated_at',
], { publicFilter: dlobPublicFilter });
await ensureDlobTable(dlobStatsLatestTable, [
'source',
'market_name',
'market_type',
'market_index',
'ts',
'slot',
'mark_price',
'oracle_price',
'best_bid_price',
'best_ask_price',
'mid_price',
'spread_abs',
'spread_bps',
'depth_levels',
'depth_bid_base',
'depth_ask_base',
'depth_bid_usd',
'depth_ask_usd',
'imbalance',
'raw',
'updated_at',
], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobDepthBpsLatestTable, [
'source',
'market_name',
'band_bps',
'market_type',
'market_index',
'ts',
'slot',
'mid_price',
'best_bid_price',
'best_ask_price',
'bid_base',
'ask_base',
'bid_usd',
'ask_usd',
'imbalance',
'raw',
'updated_at',
], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobSlippageLatestTable, [
'source',
'market_name',
'side',
'size_usd',
'market_type',
'market_index',
'ts',
'slot',
'mid_price',
'best_bid_price',
'best_ask_price',
'vwap_price',
'worst_price',
'filled_usd',
'filled_base',
'impact_bps',
'levels_consumed',
'fill_pct',
'raw',
'updated_at',
], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobSlippageLatestV2Table, [
'source',
'market_name',
'side',
'size_usd',
'market_type',
'market_index',
'ts',
'slot',
'mid_price',
'best_bid_price',
'best_ask_price',
'vwap_price',
'worst_price',
'filled_usd',
'filled_base',
'impact_bps',
'levels_consumed',
'fill_pct',
'raw',
'updated_at',
], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobStatsTsTable, [
'ts',
'id',
'source',
'market_name',
'market_type',
'market_index',
'source_ts',
'slot',
'mark_price',
'oracle_price',
'best_bid_price',
'best_ask_price',
'mid_price',
'spread_abs',
'spread_bps',
'depth_levels',
'depth_bid_base',
'depth_ask_base',
'depth_bid_usd',
'depth_ask_usd',
'imbalance',
'raw',
], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobDepthBpsTsTable, [
'ts',
'id',
'source',
'market_name',
'band_bps',
'market_type',
'market_index',
'source_ts',
'slot',
'mid_price',
'best_bid_price',
'best_ask_price',
'bid_base',
'ask_base',
'bid_usd',
'ask_usd',
'imbalance',
'raw',
], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobSlippageTsTable, [
'ts',
'id',
'source',
'market_name',
'side',
'size_usd',
'market_type',
'market_index',
'source_ts',
'slot',
'mid_price',
'vwap_price',
'worst_price',
'filled_usd',
'filled_base',
'impact_bps',
'levels_consumed',
'fill_pct',
'raw',
], { publicFilter: dlobPublicFilter });
await ensurePublicSelectTable(dlobSlippageTsV2Table, [
'ts',
'id',
'source',
'market_name',
'side',
'size_usd',
'market_type',
'market_index',
'source_ts',
'slot',
'mid_price',
'vwap_price',
'worst_price',
'filled_usd',
'filled_base',
'impact_bps',
'levels_consumed',
'fill_pct',
'raw',
], { publicFilter: dlobPublicFilter });
// Bot control-plane tables (tracked; permissions are intentionally not created here by default).
for (const t of [botConfigTable, botStateTable, botEventsTable]) {
await metadataIgnore({ type: 'pg_untrack_table', args: { source, table: t } });
await metadata({ type: 'pg_track_table', args: { source, table: t } });
}
// Return table type for candle functions (needed for Hasura to track the function).
await metadataIgnore({ type: 'pg_track_table', args: { source, table: candlesReturnTable } });
try {
await metadata({ type: 'pg_track_table', args: { source, table: apiTokensTable } });
} catch (err) {
const msg = String(err?.message || err);
if (msg.toLowerCase().includes('api_tokens') && isMissingRelationError(msg)) {
console.log('[hasura-bootstrap] api_tokens missing (run initdb SQL to create it); skipping track');
} else {
throw err;
}
}
for (const fn of candleFns) {
// Function for aggregated candle queries (used by trade-api).
await metadataIgnore({ type: 'pg_untrack_function', args: { source, function: fn } });
await metadata({ type: 'pg_track_function', args: { source, function: fn } });
}
console.log('[hasura-bootstrap] ok');
}
main().catch((err) => {
console.error(String(err?.stack || err));
process.exitCode = 1;
});

View File

@@ -0,0 +1,860 @@
CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- `drift_ticks` is an append-only tick log.
--
-- TimescaleDB hypertables require every UNIQUE index / PRIMARY KEY to include the partitioning column (`ts`).
-- Therefore we use a composite primary key (ts, id) instead of PRIMARY KEY(id).
CREATE TABLE IF NOT EXISTS drift_ticks (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
market_index INTEGER NOT NULL,
symbol TEXT NOT NULL,
oracle_price NUMERIC NOT NULL,
mark_price NUMERIC,
oracle_slot BIGINT,
source TEXT NOT NULL DEFAULT 'drift_oracle',
raw JSONB,
inserted_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (ts, id)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE drift_ticks ADD COLUMN IF NOT EXISTS mark_price NUMERIC;
ALTER TABLE drift_ticks ADD COLUMN IF NOT EXISTS id BIGSERIAL;
-- Migrate price columns to NUMERIC (Hasura `numeric` scalar returns strings; see app code).
DO $$
DECLARE
oracle_type text;
mark_type text;
BEGIN
SELECT data_type INTO oracle_type
FROM information_schema.columns
WHERE table_schema='public' AND table_name='drift_ticks' AND column_name='oracle_price'
LIMIT 1;
IF oracle_type IS NOT NULL AND oracle_type <> 'numeric' THEN
EXECUTE 'ALTER TABLE public.drift_ticks ALTER COLUMN oracle_price TYPE numeric USING oracle_price::numeric';
END IF;
SELECT data_type INTO mark_type
FROM information_schema.columns
WHERE table_schema='public' AND table_name='drift_ticks' AND column_name='mark_price'
LIMIT 1;
IF mark_type IS NOT NULL AND mark_type <> 'numeric' THEN
EXECUTE 'ALTER TABLE public.drift_ticks ALTER COLUMN mark_price TYPE numeric USING mark_price::numeric';
END IF;
END $$;
-- Ensure PRIMARY KEY is (ts, id) (Timescale hypertables require partition column in any UNIQUE/PK).
-- IMPORTANT: keep this idempotent so we can run migrations while the ingestor keeps writing ticks.
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'drift_ticks'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.drift_ticks ADD CONSTRAINT drift_ticks_pkey PRIMARY KEY (ts, id)';
ELSIF pk_cols <> ARRAY['ts','id'] THEN
EXECUTE format('ALTER TABLE public.drift_ticks DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.drift_ticks ADD CONSTRAINT drift_ticks_pkey PRIMARY KEY (ts, id)';
END IF;
END $$;
-- Convert to hypertable (migrate existing rows if any).
SELECT create_hypertable('drift_ticks', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
-- Historical note: earlier versions used a UNIQUE(market_index, ts) upsert model with ts rounded to seconds.
-- For "full ticks" (ms precision + multiple sources), we keep drift_ticks as an append-only event log.
ALTER TABLE drift_ticks DROP CONSTRAINT IF EXISTS drift_ticks_market_ts_unique;
CREATE INDEX IF NOT EXISTS drift_ticks_market_ts_desc_idx
ON drift_ticks (market_index, ts DESC);
CREATE INDEX IF NOT EXISTS drift_ticks_symbol_ts_desc_idx
ON drift_ticks (symbol, ts DESC);
CREATE INDEX IF NOT EXISTS drift_ticks_market_source_ts_desc_idx
ON drift_ticks (market_index, source, ts DESC);
CREATE INDEX IF NOT EXISTS drift_ticks_symbol_source_ts_desc_idx
ON drift_ticks (symbol, source, ts DESC);
-- Revocable API tokens for external algs (store only hashes, never raw tokens).
CREATE TABLE IF NOT EXISTS api_tokens (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
token_hash TEXT NOT NULL UNIQUE,
scopes TEXT[] NOT NULL DEFAULT ARRAY[]::text[],
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
revoked_at TIMESTAMPTZ,
last_used_at TIMESTAMPTZ,
meta JSONB
);
ALTER TABLE api_tokens ADD COLUMN IF NOT EXISTS scopes TEXT[] NOT NULL DEFAULT ARRAY[]::text[];
CREATE INDEX IF NOT EXISTS api_tokens_revoked_at_idx
ON api_tokens (revoked_at);
-- Bot control-plane (desired state) + executor telemetry (state/events).
--
-- MVP intent:
-- - `bot_config`: desired state (mode/market/limits/kill switch)
-- - `bot_state`: heartbeat + last error/action
-- - `bot_events`: append-only audit log
CREATE TABLE IF NOT EXISTS public.bot_config (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL UNIQUE,
market_name TEXT NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
mode TEXT NOT NULL DEFAULT 'off',
kill_switch BOOLEAN NOT NULL DEFAULT FALSE,
params JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS bot_config_updated_at_desc_idx
ON public.bot_config (updated_at DESC);
CREATE TABLE IF NOT EXISTS public.bot_state (
bot_id UUID PRIMARY KEY REFERENCES public.bot_config(id) ON DELETE CASCADE,
last_heartbeat_at TIMESTAMPTZ,
last_action_at TIMESTAMPTZ,
last_error TEXT,
state JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS bot_state_updated_at_desc_idx
ON public.bot_state (updated_at DESC);
CREATE TABLE IF NOT EXISTS public.bot_events (
id BIGSERIAL PRIMARY KEY,
bot_id UUID NOT NULL REFERENCES public.bot_config(id) ON DELETE CASCADE,
ts TIMESTAMPTZ NOT NULL DEFAULT now(),
type TEXT NOT NULL,
payload JSONB
);
CREATE INDEX IF NOT EXISTS bot_events_bot_ts_desc_idx
ON public.bot_events (bot_id, ts DESC);
-- Compute OHLC candles from `drift_ticks` for a symbol and bucket size.
-- Exposed via Hasura (track function) and used by trade-api to compute indicators server-side.
-- Hasura tracks functions only if they return SETOF a table/view type.
-- This table is used purely as the return type for candle functions.
CREATE TABLE IF NOT EXISTS public.drift_candles (
bucket timestamptz,
open numeric,
high numeric,
low numeric,
close numeric,
oracle_open numeric,
oracle_high numeric,
oracle_low numeric,
oracle_close numeric,
ticks bigint
);
ALTER TABLE public.drift_candles ADD COLUMN IF NOT EXISTS oracle_open numeric;
ALTER TABLE public.drift_candles ADD COLUMN IF NOT EXISTS oracle_high numeric;
ALTER TABLE public.drift_candles ADD COLUMN IF NOT EXISTS oracle_low numeric;
-- Precomputed candle cache (materialized by a worker).
-- Purpose: make tf switching instant by reading ready-made candles instead of aggregating `drift_ticks` on demand.
-- NOTE: `source=''` means "any source" (no source filter).
CREATE TABLE IF NOT EXISTS public.drift_candles_cache (
bucket timestamptz NOT NULL,
bucket_seconds integer NOT NULL,
symbol text NOT NULL,
source text NOT NULL DEFAULT '',
open numeric NOT NULL,
high numeric NOT NULL,
low numeric NOT NULL,
close numeric NOT NULL,
oracle_open numeric,
oracle_high numeric,
oracle_low numeric,
oracle_close numeric,
ticks bigint NOT NULL DEFAULT 0,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (bucket, bucket_seconds, symbol, source)
);
ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_open numeric;
ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_high numeric;
ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_low numeric;
SELECT create_hypertable('drift_candles_cache', 'bucket', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS drift_candles_cache_symbol_source_bucket_idx
ON public.drift_candles_cache (symbol, source, bucket_seconds, bucket DESC);
-- If an older version of the function exists with an incompatible return type,
-- CREATE OR REPLACE will fail. Drop the old signature first (safe/idempotent).
DROP FUNCTION IF EXISTS public.get_drift_candles(text, integer, integer, text);
CREATE OR REPLACE FUNCTION public.get_drift_candles(
p_symbol text,
p_bucket_seconds integer,
p_limit integer DEFAULT 500,
p_source text DEFAULT NULL
)
RETURNS SETOF public.drift_candles
LANGUAGE sql
STABLE
AS $$
-- Zwraca zawsze "ciągłe" buckety (fill forward), nawet jeśli nie było ticków w danej sekundzie/minucie.
-- Dzięki temu frontend może rysować regularną oś czasu (np. 1px = 1s) bez dziwnych przeskoków.
WITH src AS (
SELECT COALESCE(p_source, '') AS source_key
),
raw_cached AS (
SELECT
c.bucket,
c.open,
c.high,
c.low,
c.close,
c.oracle_open,
c.oracle_high,
c.oracle_low,
c.oracle_close,
c.ticks
FROM public.drift_candles_cache c, src
WHERE c.symbol = p_symbol
AND c.bucket_seconds = p_bucket_seconds
AND c.source = src.source_key
ORDER BY c.bucket DESC
LIMIT p_limit
),
raw_fallback AS (
SELECT
time_bucket(make_interval(secs => p_bucket_seconds), ts) AS bucket,
ts,
COALESCE(mark_price, oracle_price) AS px,
oracle_price AS oracle_px
FROM public.drift_ticks, src
WHERE symbol = p_symbol
AND (src.source_key = '' OR source = src.source_key)
AND ts >= now() - make_interval(secs => (p_bucket_seconds * p_limit * 2))
),
computed AS (
SELECT
bucket,
(array_agg(px ORDER BY ts ASC))[1] AS open,
max(px) AS high,
min(px) AS low,
(array_agg(px ORDER BY ts DESC))[1] AS close,
(array_agg(oracle_px ORDER BY ts ASC))[1] AS oracle_open,
max(oracle_px) AS oracle_high,
min(oracle_px) AS oracle_low,
(array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close,
count(*) AS ticks
FROM raw_fallback
GROUP BY bucket
),
data AS (
SELECT * FROM raw_cached
UNION ALL
SELECT * FROM computed
WHERE NOT EXISTS (SELECT 1 FROM raw_cached)
),
bounds AS (
SELECT max(bucket) AS end_bucket FROM data
),
params AS (
SELECT
make_interval(secs => p_bucket_seconds) AS step,
make_interval(secs => (p_bucket_seconds * (p_limit - 1))) AS span
),
series AS (
SELECT generate_series(
bounds.end_bucket - params.span,
bounds.end_bucket,
params.step
) AS bucket
FROM bounds, params
WHERE bounds.end_bucket IS NOT NULL
),
joined AS (
SELECT
s.bucket,
d.open,
d.high,
d.low,
d.close,
d.oracle_open,
d.oracle_high,
d.oracle_low,
d.oracle_close,
d.ticks
FROM series s
LEFT JOIN data d USING (bucket)
ORDER BY s.bucket ASC
),
grouped AS (
SELECT
*,
sum(CASE WHEN close IS NOT NULL THEN 1 ELSE 0 END) OVER (ORDER BY bucket ASC) AS grp_close,
sum(CASE WHEN oracle_close IS NOT NULL THEN 1 ELSE 0 END) OVER (ORDER BY bucket ASC) AS grp_oracle
FROM joined
),
first_vals AS (
SELECT
(SELECT close FROM grouped WHERE close IS NOT NULL ORDER BY bucket ASC LIMIT 1) AS first_close,
(SELECT oracle_close FROM grouped WHERE oracle_close IS NOT NULL ORDER BY bucket ASC LIMIT 1) AS first_oracle
),
ff AS (
SELECT
g.bucket,
g.open,
g.high,
g.low,
g.close,
g.oracle_open,
g.oracle_high,
g.oracle_low,
g.oracle_close,
g.ticks,
COALESCE(
g.close,
max(g.close) OVER (PARTITION BY g.grp_close),
f.first_close
) AS ff_close,
COALESCE(
g.oracle_close,
max(g.oracle_close) OVER (PARTITION BY g.grp_oracle),
f.first_oracle
) AS ff_oracle
FROM grouped g
CROSS JOIN first_vals f
)
SELECT
bucket,
COALESCE(open, ff_close) AS open,
COALESCE(high, ff_close) AS high,
COALESCE(low, ff_close) AS low,
COALESCE(close, ff_close) AS close,
COALESCE(oracle_open, ff_oracle) AS oracle_open,
COALESCE(oracle_high, ff_oracle) AS oracle_high,
COALESCE(oracle_low, ff_oracle) AS oracle_low,
COALESCE(oracle_close, ff_oracle) AS oracle_close,
COALESCE(ticks, 0) AS ticks
FROM ff
ORDER BY bucket DESC
LIMIT p_limit;
$$;
-- Latest DLOB orderbook snapshots (top-N levels), per market.
-- Filled by a VPS worker (collector) and consumed by the UI via Hasura subscriptions.
CREATE TABLE IF NOT EXISTS public.dlob_l2_latest (
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
slot BIGINT,
mark_price NUMERIC,
oracle_price NUMERIC,
best_bid_price NUMERIC,
best_ask_price NUMERIC,
bids JSONB,
asks JSONB,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (source, market_name)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_l2_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_l2_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_l2_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_l2_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_l2_latest'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_l2_latest ADD CONSTRAINT dlob_l2_latest_pkey PRIMARY KEY (source, market_name)';
ELSIF pk_cols <> ARRAY['source','market_name'] THEN
EXECUTE format('ALTER TABLE public.dlob_l2_latest DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_l2_latest ADD CONSTRAINT dlob_l2_latest_pkey PRIMARY KEY (source, market_name)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_l2_latest_updated_at_idx
ON public.dlob_l2_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_l2_latest_source_updated_at_idx
ON public.dlob_l2_latest (source, updated_at DESC);
-- Derived stats for fast UI display.
CREATE TABLE IF NOT EXISTS public.dlob_stats_latest (
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
slot BIGINT,
mark_price NUMERIC,
oracle_price NUMERIC,
best_bid_price NUMERIC,
best_ask_price NUMERIC,
mid_price NUMERIC,
spread_abs NUMERIC,
spread_bps NUMERIC,
depth_levels INTEGER,
depth_bid_base NUMERIC,
depth_ask_base NUMERIC,
depth_bid_usd NUMERIC,
depth_ask_usd NUMERIC,
imbalance NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (source, market_name)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_stats_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_stats_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_stats_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_stats_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_stats_latest'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_stats_latest ADD CONSTRAINT dlob_stats_latest_pkey PRIMARY KEY (source, market_name)';
ELSIF pk_cols <> ARRAY['source','market_name'] THEN
EXECUTE format('ALTER TABLE public.dlob_stats_latest DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_stats_latest ADD CONSTRAINT dlob_stats_latest_pkey PRIMARY KEY (source, market_name)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_stats_latest_updated_at_idx
ON public.dlob_stats_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_stats_latest_source_updated_at_idx
ON public.dlob_stats_latest (source, updated_at DESC);
-- Depth snapshots within bps bands around mid-price (per market, per band).
-- Filled by a derived worker that reads `dlob_l2_latest`.
CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_latest (
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
band_bps INTEGER NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
slot BIGINT,
mid_price NUMERIC,
best_bid_price NUMERIC,
best_ask_price NUMERIC,
bid_base NUMERIC,
ask_base NUMERIC,
bid_usd NUMERIC,
ask_usd NUMERIC,
imbalance NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (source, market_name, band_bps)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_depth_bps_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_depth_bps_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_depth_bps_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_depth_bps_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name, band_bps) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_depth_bps_latest'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_depth_bps_latest ADD CONSTRAINT dlob_depth_bps_latest_pkey PRIMARY KEY (source, market_name, band_bps)';
ELSIF pk_cols <> ARRAY['source','market_name','band_bps'] THEN
EXECUTE format('ALTER TABLE public.dlob_depth_bps_latest DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_depth_bps_latest ADD CONSTRAINT dlob_depth_bps_latest_pkey PRIMARY KEY (source, market_name, band_bps)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_updated_at_idx
ON public.dlob_depth_bps_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_market_name_idx
ON public.dlob_depth_bps_latest (market_name);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_source_market_name_idx
ON public.dlob_depth_bps_latest (source, market_name);
-- Slippage/impact estimates for "market" orders at common USD sizes.
-- Filled by a derived worker that reads `dlob_l2_latest`.
CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest (
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
side TEXT NOT NULL,
size_usd INTEGER NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
slot BIGINT,
mid_price NUMERIC,
best_bid_price NUMERIC,
best_ask_price NUMERIC,
vwap_price NUMERIC,
worst_price NUMERIC,
filled_usd NUMERIC,
filled_base NUMERIC,
impact_bps NUMERIC,
levels_consumed INTEGER,
fill_pct NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (source, market_name, side, size_usd),
CONSTRAINT dlob_slippage_latest_side_chk CHECK (side IN ('buy', 'sell'))
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_slippage_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name, side, size_usd) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_slippage_latest'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_slippage_latest ADD CONSTRAINT dlob_slippage_latest_pkey PRIMARY KEY (source, market_name, side, size_usd)';
ELSIF pk_cols <> ARRAY['source','market_name','side','size_usd'] THEN
EXECUTE format('ALTER TABLE public.dlob_slippage_latest DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_slippage_latest ADD CONSTRAINT dlob_slippage_latest_pkey PRIMARY KEY (source, market_name, side, size_usd)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_updated_at_idx
ON public.dlob_slippage_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_market_name_idx
ON public.dlob_slippage_latest (market_name);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_source_market_name_idx
ON public.dlob_slippage_latest (source, market_name);
-- Slippage v2: supports fractional order sizes (e.g. 0.1/0.2/0.5 USD), per market and side.
-- Keep v1 intact for backward compatibility and to avoid data loss.
CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest_v2 (
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
side TEXT NOT NULL, -- buy|sell
size_usd NUMERIC NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
slot BIGINT,
mid_price NUMERIC,
best_bid_price NUMERIC,
best_ask_price NUMERIC,
vwap_price NUMERIC,
worst_price NUMERIC,
filled_usd NUMERIC,
filled_base NUMERIC,
impact_bps NUMERIC,
levels_consumed INTEGER,
fill_pct NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (source, market_name, side, size_usd),
CONSTRAINT dlob_slippage_latest_v2_side_chk CHECK (side IN ('buy', 'sell'))
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_slippage_latest_v2 ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_latest_v2 ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_latest_v2 SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_latest_v2 ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name, side, size_usd) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_slippage_latest_v2'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_slippage_latest_v2 ADD CONSTRAINT dlob_slippage_latest_v2_pkey PRIMARY KEY (source, market_name, side, size_usd)';
ELSIF pk_cols <> ARRAY['source','market_name','side','size_usd'] THEN
EXECUTE format('ALTER TABLE public.dlob_slippage_latest_v2 DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_slippage_latest_v2 ADD CONSTRAINT dlob_slippage_latest_v2_pkey PRIMARY KEY (source, market_name, side, size_usd)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_updated_at_idx
ON public.dlob_slippage_latest_v2 (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_market_name_idx
ON public.dlob_slippage_latest_v2 (market_name);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_source_market_name_idx
ON public.dlob_slippage_latest_v2 (source, market_name);
-- Time-series tables for UI history (start: 7 days).
-- Keep these append-only; use Timescale hypertables.
CREATE TABLE IF NOT EXISTS public.dlob_stats_ts (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
source_ts BIGINT,
slot BIGINT,
mark_price NUMERIC,
oracle_price NUMERIC,
best_bid_price NUMERIC,
best_ask_price NUMERIC,
mid_price NUMERIC,
spread_abs NUMERIC,
spread_bps NUMERIC,
depth_levels INTEGER,
depth_bid_base NUMERIC,
depth_ask_base NUMERIC,
depth_bid_usd NUMERIC,
depth_ask_usd NUMERIC,
imbalance NUMERIC,
raw JSONB,
PRIMARY KEY (ts, id)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_stats_ts ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_stats_ts ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_stats_ts SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_stats_ts ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_stats_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_stats_ts_market_ts_desc_idx
ON public.dlob_stats_ts (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_stats_ts_source_market_ts_desc_idx
ON public.dlob_stats_ts (source, market_name, ts DESC);
CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_ts (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
band_bps INTEGER NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
source_ts BIGINT,
slot BIGINT,
mid_price NUMERIC,
best_bid_price NUMERIC,
best_ask_price NUMERIC,
bid_base NUMERIC,
ask_base NUMERIC,
bid_usd NUMERIC,
ask_usd NUMERIC,
imbalance NUMERIC,
raw JSONB,
PRIMARY KEY (ts, id)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_depth_bps_ts ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_depth_bps_ts ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_depth_bps_ts SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_depth_bps_ts ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_depth_bps_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_ts_market_ts_desc_idx
ON public.dlob_depth_bps_ts (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_ts_source_market_ts_desc_idx
ON public.dlob_depth_bps_ts (source, market_name, ts DESC);
CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
side TEXT NOT NULL,
size_usd INTEGER NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
source_ts BIGINT,
slot BIGINT,
mid_price NUMERIC,
vwap_price NUMERIC,
worst_price NUMERIC,
filled_usd NUMERIC,
filled_base NUMERIC,
impact_bps NUMERIC,
levels_consumed INTEGER,
fill_pct NUMERIC,
raw JSONB,
PRIMARY KEY (ts, id)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_slippage_ts ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_ts ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_ts SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_ts ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_slippage_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_market_ts_desc_idx
ON public.dlob_slippage_ts (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_source_market_ts_desc_idx
ON public.dlob_slippage_ts (source, market_name, ts DESC);
CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts_v2 (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
side TEXT NOT NULL,
size_usd NUMERIC NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
source_ts BIGINT,
slot BIGINT,
mid_price NUMERIC,
vwap_price NUMERIC,
worst_price NUMERIC,
filled_usd NUMERIC,
filled_base NUMERIC,
impact_bps NUMERIC,
levels_consumed INTEGER,
fill_pct NUMERIC,
raw JSONB,
PRIMARY KEY (ts, id)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_slippage_ts_v2 ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_ts_v2 ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_ts_v2 SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_ts_v2 ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_slippage_ts_v2', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_market_ts_desc_idx
ON public.dlob_slippage_ts_v2 (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_source_market_ts_desc_idx
ON public.dlob_slippage_ts_v2 (source, market_name, ts DESC);
-- Retention policies (best-effort; safe if Timescale is present).
DO $$
BEGIN
PERFORM add_retention_policy('dlob_stats_ts', INTERVAL '7 days');
EXCEPTION WHEN OTHERS THEN
-- ignore if policy exists or function unavailable
END $$;
DO $$
BEGIN
PERFORM add_retention_policy('dlob_depth_bps_ts', INTERVAL '7 days');
EXCEPTION WHEN OTHERS THEN
END $$;
DO $$
BEGIN
PERFORM add_retention_policy('dlob_slippage_ts', INTERVAL '7 days');
EXCEPTION WHEN OTHERS THEN
END $$;
DO $$
BEGIN
PERFORM add_retention_policy('dlob_slippage_ts_v2', INTERVAL '7 days');
EXCEPTION WHEN OTHERS THEN
END $$;

View File

@@ -0,0 +1,52 @@
apiVersion: batch/v1
kind: Job
metadata:
name: hasura-bootstrap
namespace: trade-r001-canary
spec:
backoffLimit: 5
template:
spec:
restartPolicy: OnFailure
initContainers:
- name: wait-db
image: postgres:16
imagePullPolicy: IfNotPresent
envFrom:
- secretRef:
name: trade-postgres
command:
- sh
- -ec
- |
export PGPASSWORD="$POSTGRES_PASSWORD"
until pg_isready -h postgres -U "$POSTGRES_USER" -d "$POSTGRES_DB"; do sleep 1; done
until psql -h postgres -U "$POSTGRES_USER" -d "$POSTGRES_DB" -v ON_ERROR_STOP=1 -c "select 1 from public.drift_ticks limit 1" >/dev/null 2>&1; do sleep 1; done
containers:
- name: hasura-bootstrap
image: node:20-slim
imagePullPolicy: IfNotPresent
env:
- name: HASURA_URL
value: http://hasura:8080
- name: HASURA_ADMIN_SECRET
valueFrom:
secretKeyRef:
name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET
- name: TICKS_TABLE
value: drift_ticks
- name: CANDLES_FUNCTION
value: get_drift_candles
command:
- node
- /app/hasura-bootstrap.mjs
volumeMounts:
- name: script
mountPath: /app/hasura-bootstrap.mjs
subPath: hasura-bootstrap.mjs
readOnly: true
volumes:
- name: script
configMap:
name: hasura-bootstrap-script

View File

@@ -0,0 +1,61 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: hasura
namespace: trade-r001-canary
spec:
replicas: 1
selector:
matchLabels:
app: hasura
template:
metadata:
labels:
app: hasura
spec:
containers:
- name: hasura
image: hasura/graphql-engine:v2.40.1
imagePullPolicy: IfNotPresent
ports:
- name: http
containerPort: 8080
envFrom:
- secretRef:
name: trade-postgres
- secretRef:
name: trade-hasura
env:
- name: HASURA_GRAPHQL_DATABASE_URL
value: postgres://$(POSTGRES_USER):$(POSTGRES_PASSWORD)@postgres:5432/$(POSTGRES_DB)
- name: HASURA_GRAPHQL_JWT_SECRET
value: '{"type":"HS256","key":"$(HASURA_JWT_KEY)"}'
- name: HASURA_GRAPHQL_ENABLE_CONSOLE
value: "false"
- name: HASURA_GRAPHQL_DEV_MODE
value: "false"
- name: HASURA_GRAPHQL_CORS_DOMAIN
value: http://localhost:5173,http://127.0.0.1:5173
- name: HASURA_GRAPHQL_UNAUTHORIZED_ROLE
value: public
resources:
requests:
cpu: 250m
memory: 512Mi
limits:
cpu: "1"
memory: 1Gi
readinessProbe:
httpGet:
path: /healthz
port: http
initialDelaySeconds: 5
periodSeconds: 10
timeoutSeconds: 3
livenessProbe:
httpGet:
path: /healthz
port: http
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 3

View File

@@ -0,0 +1,13 @@
apiVersion: v1
kind: Service
metadata:
name: hasura
namespace: trade-r001-canary
spec:
type: ClusterIP
selector:
app: hasura
ports:
- name: http
port: 8080
targetPort: http

View File

@@ -1,7 +1,35 @@
apiVersion: kustomize.config.k8s.io/v1beta1 apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization kind: Kustomization
namespace: trade-r001-canary
generatorOptions:
disableNameSuffixHash: true
resources: resources:
- namespace.yaml - namespace.yaml
- resourcequota.yaml - resourcequota.yaml
- limitrange.yaml - limitrange.yaml
- postgres-alias-service.yaml
- hasura-service.yaml
- hasura-deployment.yaml
- postgres-migrate-job.yaml
- hasura-bootstrap-job.yaml
- trade-api-service.yaml
- trade-api-deployment.yaml
- trade-frontend-service.yaml
- trade-frontend-deployment.yaml
configMapGenerator:
- name: postgres-initdb
files:
- assets/postgres/001_init.sql
- name: hasura-bootstrap-script
files:
- assets/hasura/hasura-bootstrap.mjs
- name: trade-api-wrapper
files:
- assets/api/wrapper.mjs
- name: trade-api-upstream
files:
- assets/api/server.mjs

View File

@@ -0,0 +1,12 @@
apiVersion: v1
kind: Service
metadata:
name: postgres
namespace: trade-r001-canary
spec:
type: ExternalName
externalName: postgres-host.trade-infra.svc.cluster.local
ports:
- name: postgres
port: 5432
targetPort: 5432

View File

@@ -0,0 +1,33 @@
apiVersion: batch/v1
kind: Job
metadata:
name: postgres-migrate
namespace: trade-r001-canary
spec:
backoffLimit: 3
template:
spec:
restartPolicy: OnFailure
containers:
- name: postgres-migrate
image: postgres:16
imagePullPolicy: IfNotPresent
envFrom:
- secretRef:
name: trade-postgres
command:
- sh
- -ec
- |
export PGPASSWORD="$POSTGRES_PASSWORD"
until pg_isready -h postgres -U "$POSTGRES_USER" -d "$POSTGRES_DB"; do sleep 1; done
psql -h postgres -U "$POSTGRES_USER" -d "$POSTGRES_DB" -v ON_ERROR_STOP=1 -f /migrations/001_init.sql
volumeMounts:
- name: migrations
mountPath: /migrations/001_init.sql
subPath: 001_init.sql
readOnly: true
volumes:
- name: migrations
configMap:
name: postgres-initdb

View File

@@ -0,0 +1,24 @@
#!/usr/bin/env bash
set -euo pipefail
TARGET_HOST="${TARGET_HOST:-mevnode}"
TARGET_NAMESPACE="${TARGET_NAMESPACE:-trade-r001-canary}"
REGISTRY_HOST="${REGISTRY_HOST:-gitea.mpabi.pl}"
REGISTRY_USER="${REGISTRY_USER:-u1}"
REGISTRY_TOKEN_FILE="${REGISTRY_TOKEN_FILE:-/home/user/dev/trade/tokens/gitea-registry.token}"
ssh_target() {
ssh -o StrictHostKeyChecking=no "$TARGET_HOST" "$@"
}
REGISTRY_TOKEN="$(tr -d '\r\n' < "$REGISTRY_TOKEN_FILE")"
if [ -z "$REGISTRY_TOKEN" ]; then
echo "Registry token is empty" >&2
exit 1
fi
ssh_target "sudo k3s kubectl get ns ${TARGET_NAMESPACE} >/dev/null 2>&1 || sudo k3s kubectl create ns ${TARGET_NAMESPACE} >/dev/null"
ssh_target "sudo k3s kubectl -n ${TARGET_NAMESPACE} create secret docker-registry gitea-registry --docker-server=${REGISTRY_HOST} --docker-username=${REGISTRY_USER} --docker-password='${REGISTRY_TOKEN}' --dry-run=client -o yaml | sudo k3s kubectl apply -f - >/dev/null"
echo "Updated imagePullSecret gitea-registry in ${TARGET_NAMESPACE} on ${TARGET_HOST}"

View File

@@ -0,0 +1,59 @@
#!/usr/bin/env bash
set -euo pipefail
SOURCE_HOST="${SOURCE_HOST:-mevnode_bot}"
SOURCE_NAMESPACE="${SOURCE_NAMESPACE:-trade-staging}"
TARGET_HOST="${TARGET_HOST:-mevnode}"
PG_VERSION="${PG_VERSION:-16}"
ssh_source() {
ssh -o StrictHostKeyChecking=no "$SOURCE_HOST" "$@"
}
ssh_target() {
ssh -o StrictHostKeyChecking=no "$TARGET_HOST" "$@"
}
SRC_SECRET_JSON="$(ssh_source "sudo k3s kubectl -n ${SOURCE_NAMESPACE} get secret trade-postgres -o json")"
POSTGRES_USER="$(printf '%s' "$SRC_SECRET_JSON" | jq -r '.data.POSTGRES_USER' | base64 -d)"
POSTGRES_PASSWORD="$(printf '%s' "$SRC_SECRET_JSON" | jq -r '.data.POSTGRES_PASSWORD' | base64 -d)"
POSTGRES_DB="$(printf '%s' "$SRC_SECRET_JSON" | jq -r '.data.POSTGRES_DB' | base64 -d)"
ssh_target "if ! dpkg -l | grep -q '^ii timescaledb-2-postgresql-${PG_VERSION} '; then curl -fsSL https://packagecloud.io/install/repositories/timescale/timescaledb/script.deb.sh | sudo bash && sudo apt-get update && sudo DEBIAN_FRONTEND=noninteractive apt-get install -y timescaledb-2-postgresql-${PG_VERSION}; fi"
CURRENT_PRELOAD="$(ssh_target "sudo -u postgres psql -Atqc \"show shared_preload_libraries\"")"
case ",${CURRENT_PRELOAD}," in
*,timescaledb,*)
NEW_PRELOAD="${CURRENT_PRELOAD}"
;;
,,)
NEW_PRELOAD="timescaledb"
;;
*)
NEW_PRELOAD="${CURRENT_PRELOAD},timescaledb"
;;
esac
ssh_target "sudo -u postgres psql -Atqc \"ALTER SYSTEM SET shared_preload_libraries = '${NEW_PRELOAD}';\" && sudo systemctl restart postgresql"
APP_USER_SQL=$(printf "%s" "$POSTGRES_USER" | sed "s/'/''/g")
APP_PASSWORD_SQL=$(printf "%s" "$POSTGRES_PASSWORD" | sed "s/'/''/g")
APP_DB_SQL=$(printf "%s" "$POSTGRES_DB" | sed "s/'/''/g")
ssh_target "sudo -u postgres psql -v ON_ERROR_STOP=1 <<'SQL'
DO \$\$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = '${APP_USER_SQL}') THEN
EXECUTE format('CREATE ROLE %I LOGIN PASSWORD %L', '${APP_USER_SQL}', '${APP_PASSWORD_SQL}');
ELSE
EXECUTE format('ALTER ROLE %I WITH LOGIN PASSWORD %L', '${APP_USER_SQL}', '${APP_PASSWORD_SQL}');
END IF;
END
\$\$;
SELECT format('CREATE DATABASE %I OWNER %I', '${APP_DB_SQL}', '${APP_USER_SQL}')
WHERE NOT EXISTS (SELECT 1 FROM pg_database WHERE datname = '${APP_DB_SQL}')
\\gexec
ALTER DATABASE \"${POSTGRES_DB}\" OWNER TO \"${POSTGRES_USER}\";
SQL"
echo "Prepared host Postgres on ${TARGET_HOST} for ${POSTGRES_USER}/${POSTGRES_DB} with TimescaleDB enabled"

View File

@@ -0,0 +1,33 @@
#!/usr/bin/env bash
set -euo pipefail
SOURCE_HOST="${SOURCE_HOST:-mevnode_bot}"
SOURCE_NAMESPACE="${SOURCE_NAMESPACE:-trade-staging}"
TARGET_HOST="${TARGET_HOST:-mevnode}"
TARGET_NAMESPACE="${TARGET_NAMESPACE:-trade-r001-canary}"
SECRETS=(
trade-postgres
trade-hasura
trade-api
trade-frontend-tokens
trade-basic-auth
)
ssh_source() {
ssh -o StrictHostKeyChecking=no "$SOURCE_HOST" "$@"
}
ssh_target() {
ssh -o StrictHostKeyChecking=no "$TARGET_HOST" "$@"
}
ssh_target "sudo k3s kubectl get ns ${TARGET_NAMESPACE} >/dev/null 2>&1 || sudo k3s kubectl create ns ${TARGET_NAMESPACE} >/dev/null"
for secret_name in "${SECRETS[@]}"; do
SECRET_JSON="$(ssh_source "sudo k3s kubectl -n ${SOURCE_NAMESPACE} get secret ${secret_name} -o json")"
printf '%s' "$SECRET_JSON" \
| jq --arg ns "$TARGET_NAMESPACE" 'del(.metadata.uid,.metadata.resourceVersion,.metadata.creationTimestamp,.metadata.managedFields,.metadata.ownerReferences,.metadata.selfLink,.metadata.annotations["kubectl.kubernetes.io/last-applied-configuration"]) | .metadata.namespace = $ns' \
| ssh_target "sudo k3s kubectl apply -f - >/dev/null"
echo "Synced ${secret_name} to ${TARGET_NAMESPACE}"
done

View File

@@ -0,0 +1,87 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: trade-api
namespace: trade-r001-canary
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: trade-api
template:
metadata:
labels:
app.kubernetes.io/name: trade-api
spec:
imagePullSecrets:
- name: gitea-registry
volumes:
- name: trade-api-wrapper
configMap:
name: trade-api-wrapper
- name: trade-api-upstream
configMap:
name: trade-api-upstream
containers:
- name: api
image: gitea.mpabi.pl/trade/trade-api:k3s-20260111095435
imagePullPolicy: IfNotPresent
command:
- node
- /override/wrapper.mjs
ports:
- name: http
containerPort: 8787
volumeMounts:
- name: trade-api-wrapper
mountPath: /override/wrapper.mjs
subPath: wrapper.mjs
readOnly: true
- name: trade-api-upstream
mountPath: /app/services/api/server.mjs
subPath: server.mjs
readOnly: true
env:
- name: PORT
value: "8787"
- name: UPSTREAM_PORT
value: "8788"
- name: UPSTREAM_ENTRY
value: /app/services/api/server.mjs
- name: UPSTREAM_HOST
value: 127.0.0.1
- name: APP_VERSION
value: canary-r001
- name: HASURA_GRAPHQL_URL
value: http://hasura:8080/v1/graphql
- name: HASURA_ADMIN_SECRET
valueFrom:
secretKeyRef:
name: trade-hasura
key: HASURA_GRAPHQL_ADMIN_SECRET
- name: API_ADMIN_SECRET
valueFrom:
secretKeyRef:
name: trade-api
key: API_ADMIN_SECRET
- name: CORS_ORIGIN
value: "*"
resources:
requests:
cpu: 100m
memory: 256Mi
limits:
cpu: "1"
memory: 1Gi
readinessProbe:
httpGet:
path: /healthz
port: http
initialDelaySeconds: 2
periodSeconds: 5
livenessProbe:
httpGet:
path: /healthz
port: http
initialDelaySeconds: 10
periodSeconds: 10

View File

@@ -0,0 +1,13 @@
apiVersion: v1
kind: Service
metadata:
name: trade-api
namespace: trade-r001-canary
spec:
type: ClusterIP
selector:
app.kubernetes.io/name: trade-api
ports:
- name: http
port: 8787
targetPort: http

View File

@@ -0,0 +1,75 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: trade-frontend
namespace: trade-r001-canary
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: trade-frontend
template:
metadata:
labels:
app.kubernetes.io/name: trade-frontend
spec:
imagePullSecrets:
- name: gitea-registry
containers:
- name: frontend
image: gitea.mpabi.pl/trade/trade-frontend:pg-derived-phase1-20260319-030252
imagePullPolicy: IfNotPresent
ports:
- name: http
containerPort: 8081
env:
- name: PORT
value: "8081"
- name: APP_VERSION
value: canary-r001
- name: API_UPSTREAM
value: http://trade-api:8787
- name: BASIC_AUTH_FILE
value: /tokens/frontend.json
- name: API_READ_TOKEN_FILE
value: /tokens/read.json
- name: BASIC_AUTH_MODE
value: "off"
- name: AUTH_MODE
value: session
- name: HTPASSWD_FILE
value: /auth/users
volumeMounts:
- name: auth
mountPath: /auth/users
readOnly: true
subPath: users
- name: tokens
mountPath: /tokens
readOnly: true
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 500m
memory: 512Mi
readinessProbe:
httpGet:
path: /healthz
port: http
initialDelaySeconds: 2
periodSeconds: 5
livenessProbe:
httpGet:
path: /healthz
port: http
initialDelaySeconds: 10
periodSeconds: 10
volumes:
- name: auth
secret:
secretName: trade-basic-auth
- name: tokens
secret:
secretName: trade-frontend-tokens

View File

@@ -0,0 +1,13 @@
apiVersion: v1
kind: Service
metadata:
name: trade-frontend
namespace: trade-r001-canary
spec:
type: ClusterIP
selector:
app.kubernetes.io/name: trade-frontend
ports:
- name: http
port: 8081
targetPort: http