diff --git a/.gitea/workflows/deploy-trade-r001-canary.yaml b/.gitea/workflows/deploy-trade-r001-canary.yaml index 855e36e..01f3f54 100644 --- a/.gitea/workflows/deploy-trade-r001-canary.yaml +++ b/.gitea/workflows/deploy-trade-r001-canary.yaml @@ -30,13 +30,47 @@ jobs: install -m 0755 /tmp/kubectl /usr/local/bin/kubectl kubectl version --client - - name: Apply canary namespace + - name: Verify prerequisite secrets + env: + KUBECONFIG: /tmp/kubeconfig + run: | + kubectl -n trade-r001-canary get secret trade-postgres trade-hasura trade-api trade-frontend-tokens trade-basic-auth gitea-registry + + - name: Recreate bootstrap jobs + env: + KUBECONFIG: /tmp/kubeconfig + run: | + kubectl -n trade-r001-canary delete job postgres-migrate hasura-bootstrap --ignore-not-found=true + + - name: Apply canary environment env: KUBECONFIG: /tmp/kubeconfig run: | kubectl apply -k environments/sol/trade-r001-canary kubectl get ns trade-r001-canary --show-labels - kubectl -n trade-r001-canary get resourcequota,limitrange + kubectl -n trade-r001-canary get svc,resourcequota,limitrange + + - name: Restart application surface + env: + KUBECONFIG: /tmp/kubeconfig + run: | + kubectl -n trade-r001-canary rollout restart deploy/hasura deploy/trade-api deploy/trade-frontend + + - name: Wait for database and metadata bootstrap + env: + KUBECONFIG: /tmp/kubeconfig + run: | + kubectl -n trade-r001-canary wait --for=condition=complete job/postgres-migrate --timeout=300s + kubectl -n trade-r001-canary wait --for=condition=complete job/hasura-bootstrap --timeout=300s + + - name: Wait for application rollouts + env: + KUBECONFIG: /tmp/kubeconfig + run: | + kubectl -n trade-r001-canary rollout status deploy/hasura --timeout=300s + kubectl -n trade-r001-canary rollout status deploy/trade-api --timeout=300s + kubectl -n trade-r001-canary rollout status deploy/trade-frontend --timeout=300s + kubectl -n trade-r001-canary get deploy,pods -o wide - name: Verify canary namespace connectivity env: @@ -60,4 +94,19 @@ jobs: with socket.create_connection((host, port), timeout=5): print(f"OK {host}:{port}") PY + kubectl -n trade-r001-canary exec canary-netcheck -- python - <<'PY' + import urllib.request + + targets = [ + "http://hasura:8080/healthz", + "http://trade-api:8787/healthz", + "http://trade-frontend:8081/healthz", + ] + + for url in targets: + with urllib.request.urlopen(url, timeout=10) as response: + if response.status != 200: + raise SystemExit(f"Unexpected status for {url}: {response.status}") + print(f"OK {url}") + PY kubectl -n trade-r001-canary delete pod canary-netcheck --wait=true diff --git a/environments/sol/trade-r001-canary/README.md b/environments/sol/trade-r001-canary/README.md index dc461d1..60f72de 100644 --- a/environments/sol/trade-r001-canary/README.md +++ b/environments/sol/trade-r001-canary/README.md @@ -7,6 +7,7 @@ Minimal canary namespace for migration baseline `R001` on `sol`. - Reserve a dedicated namespace for the first reconstructed trade deployment. - Put hard upper bounds on namespace-level CPU, memory, object count, and PVC growth before application manifests land. - Verify that workloads in the namespace can resolve and reach the shared `trade-infra` services for `Postgres` and `Redis`. +- Recreate the `R001` application surface in a controlled way: `Hasura`, `trade-api`, and `trade-frontend`. ## Current Guardrails @@ -30,7 +31,28 @@ Minimal canary namespace for migration baseline `R001` on `sol`. ## Notes - This namespace is intentionally conservative until item `14` and the validator protection envelope are fully defined. -- The namespace does not yet recreate `gitea-registry`; that remains a separate migration prerequisite. - Current shared infrastructure endpoints expected by canary workloads: - `postgres-host.trade-infra.svc.cluster.local:5432` - `redis-host.trade-infra.svc.cluster.local:6379` + +## Application Surface + +- `postgres` in this namespace is an `ExternalName` alias that points to `postgres-host.trade-infra.svc.cluster.local`. +- `Hasura` uses the live `R001` secrets copied from `trade-staging`, but connects to the host `Postgres` on `sol`. +- `trade-api` and `trade-frontend` use the current live images from Gitea registry and the same bootstrap wrapper/config pattern as the source environment. +- The canary workflow re-runs: + - `postgres-migrate` + - `hasura-bootstrap` + before it waits for `Hasura`, `trade-api`, and `trade-frontend` to become healthy. + +## Operator Flow + +From the repository root: + +```bash +./environments/sol/trade-r001-canary/scripts/prepare-sol-postgres.sh +./environments/sol/trade-r001-canary/scripts/create-gitea-registry-secret.sh +./environments/sol/trade-r001-canary/scripts/sync-live-secrets.sh +``` + +After the prerequisites are seeded, push to `main` and let `deploy-trade-r001-canary` apply the environment. diff --git a/environments/sol/trade-r001-canary/assets/api/server.mjs b/environments/sol/trade-r001-canary/assets/api/server.mjs new file mode 100644 index 0000000..00233fb --- /dev/null +++ b/environments/sol/trade-r001-canary/assets/api/server.mjs @@ -0,0 +1,1793 @@ +import crypto from 'node:crypto'; +import fs from 'node:fs'; +import http from 'node:http'; +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; + +const SCRIPT_DIR = path.dirname(fileURLToPath(import.meta.url)); +const PROJECT_ROOT = path.resolve(SCRIPT_DIR, '..', '..'); +const TOKENS_DIR = path.join(PROJECT_ROOT, 'tokens'); + +function readJsonFile(filePath) { + try { + const raw = fs.readFileSync(filePath, 'utf8'); + return JSON.parse(raw); + } catch { + return undefined; + } +} + +function getIsoNow() { + return new Date().toISOString(); +} + +function base64Url(buf) { + return Buffer.from(buf) + .toString('base64') + .replace(/=/g, '') + .replace(/\+/g, '-') + .replace(/\//g, '_'); +} + +function sha256Hex(text) { + return crypto.createHash('sha256').update(text, 'utf8').digest('hex'); +} + +function generateToken() { + return `alg_${base64Url(crypto.randomBytes(32))}`; +} + +function getHeader(req, name) { + const v = req.headers[String(name).toLowerCase()]; + return Array.isArray(v) ? v[0] : v; +} + +function readBearerToken(req) { + const auth = getHeader(req, 'authorization'); + if (auth && typeof auth === 'string') { + const m = auth.match(/^Bearer\s+(.+)$/i); + if (m && m[1]) return m[1].trim(); + } + const apiKey = getHeader(req, 'x-api-key'); + if (apiKey && typeof apiKey === 'string' && apiKey.trim()) return apiKey.trim(); + return undefined; +} + +function withCors(res, corsOrigin) { + res.setHeader('access-control-allow-origin', corsOrigin); + res.setHeader('access-control-allow-methods', 'GET,POST,OPTIONS'); + res.setHeader( + 'access-control-allow-headers', + 'content-type, authorization, x-api-key, x-admin-secret' + ); +} + +function sendJson(res, status, body, corsOrigin) { + withCors(res, corsOrigin); + res.statusCode = status; + res.setHeader('content-type', 'application/json; charset=utf-8'); + res.end(JSON.stringify(body)); +} + +async function readBodyJson(req, { maxBytes }) { + const chunks = []; + let total = 0; + for await (const chunk of req) { + total += chunk.length; + if (total > maxBytes) throw new Error('payload_too_large'); + chunks.push(chunk); + } + const text = Buffer.concat(chunks).toString('utf8'); + if (!text.trim()) return {}; + try { + return JSON.parse(text); + } catch { + throw new Error('invalid_json'); + } +} + +function normalizeGraphqlName(value, fallback, label) { + const v = String(value || fallback || '') + .trim() + .toLowerCase(); + if (!v) return String(fallback || ''); + if (!/^[a-z][a-z0-9_]*$/.test(v)) throw new Error(`Invalid ${label}: ${value}`); + return v; +} + +function resolveConfig() { + const hasuraTokens = readJsonFile(path.join(TOKENS_DIR, 'hasura.json')) || {}; + const apiTokens = readJsonFile(path.join(TOKENS_DIR, 'api.json')) || {}; + + const portRaw = process.env.PORT || process.env.API_PORT || apiTokens.port || '8787'; + const port = Number.parseInt(String(portRaw), 10); + if (!Number.isInteger(port) || port <= 0) throw new Error(`Invalid PORT: ${portRaw}`); + + const hasuraUrl = + process.env.HASURA_GRAPHQL_URL || + hasuraTokens.graphqlUrl || + hasuraTokens.apiUrl || + 'http://localhost:8080/v1/graphql'; + + const hasuraAdminSecret = + process.env.HASURA_ADMIN_SECRET || hasuraTokens.adminSecret || hasuraTokens.hasuraAdminSecret; + + const apiAdminSecret = process.env.API_ADMIN_SECRET || apiTokens.adminSecret; + + const corsOrigin = process.env.CORS_ORIGIN || apiTokens.corsOrigin || '*'; + + const appVersion = String(process.env.APP_VERSION || 'v1').trim() || 'v1'; + const buildTimestamp = String(process.env.BUILD_TIMESTAMP || '').trim() || undefined; + const startedAt = getIsoNow(); + + const ticksTable = normalizeGraphqlName(process.env.TICKS_TABLE, 'drift_ticks', 'TICKS_TABLE'); + const candlesFunction = normalizeGraphqlName( + process.env.CANDLES_FUNCTION, + 'get_drift_candles', + 'CANDLES_FUNCTION' + ); + + return { + port, + hasuraUrl, + hasuraAdminSecret, + apiAdminSecret, + corsOrigin, + appVersion, + buildTimestamp, + startedAt, + ticksTable, + candlesFunction, + }; +} + +async function hasuraRequest(cfg, { admin }, query, variables) { + const headers = { 'content-type': 'application/json' }; + if (admin) { + if (!cfg.hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET (or tokens/hasura.json adminSecret)'); + headers['x-hasura-admin-secret'] = cfg.hasuraAdminSecret; + } + + const res = await fetch(cfg.hasuraUrl, { + method: 'POST', + headers, + body: JSON.stringify({ query, variables }), + }); + + const text = await res.text(); + if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`); + + let json; + try { + json = JSON.parse(text); + } catch { + throw new Error(`Hasura invalid JSON: ${text}`); + } + if (json.errors?.length) { + throw new Error(json.errors.map((e) => e.message).join(' | ')); + } + return json.data; +} + +async function readSolPriceUsd(cfg) { + // Best-effort: use DLOB stats for SOL-PERP if available, else latest tick mark/oracle. + try { + const q = ` + query SolPrice { + dlob_stats_latest(where: {market_name: {_eq: "SOL-PERP"}}, limit: 1) { + mid_price + mark_price + oracle_price + } + } + `; + const data = await hasuraRequest(cfg, { admin: true }, q, {}); + const row = data?.dlob_stats_latest?.[0]; + const p = parseNumeric(row?.mid_price) ?? parseNumeric(row?.mark_price) ?? parseNumeric(row?.oracle_price); + if (p != null && p > 0) return p; + } catch { + // ignore + } + + try { + const table = cfg.ticksTable; + const q = ` + query SolTick($limit: Int!) { + ${table}(where: {symbol: {_eq: "SOL-PERP"}}, order_by: {ts: desc}, limit: $limit) { + mark_price + oracle_price + } + } + `; + const data = await hasuraRequest(cfg, { admin: true }, q, { limit: 1 }); + const row = data?.[table]?.[0]; + const p = parseNumeric(row?.mark_price) ?? parseNumeric(row?.oracle_price); + if (p != null && p > 0) return p; + } catch { + // ignore + } + + return null; +} + +function normalizeScopes(value) { + if (!value) return []; + if (Array.isArray(value)) return value.map((v) => String(v)).filter(Boolean); + if (typeof value === 'string') { + return value + .split(',') + .map((s) => s.trim()) + .filter(Boolean); + } + return []; +} + +async function requireValidToken(cfg, req, requiredScope) { + const token = readBearerToken(req); + if (!token) return { ok: false, status: 401, error: 'missing_token' }; + + const hash = sha256Hex(token); + const query = ` + query ValidToken($hash: String!) { + api_tokens(where: {token_hash: {_eq: $hash}, revoked_at: {_is_null: true}}, limit: 1) { + id + name + scopes + } + } + `; + + let data; + try { + data = await hasuraRequest(cfg, { admin: true }, query, { hash }); + } catch (err) { + return { ok: false, status: 500, error: String(err?.message || err) }; + } + + const row = data?.api_tokens?.[0]; + if (!row?.id) return { ok: false, status: 401, error: 'invalid_or_revoked_token' }; + const scopes = normalizeScopes(row.scopes); + if (requiredScope && !scopes.includes(requiredScope)) { + return { ok: false, status: 403, error: 'missing_scope' }; + } + + // best-effort touch + const touch = ` + mutation TouchToken($id: uuid!, $ts: timestamptz!) { + update_api_tokens_by_pk(pk_columns: {id: $id}, _set: {last_used_at: $ts}) { id } + } + `; + hasuraRequest(cfg, { admin: true }, touch, { id: row.id, ts: new Date().toISOString() }).catch(() => {}); + + return { ok: true, token: { id: row.id, name: row.name } }; +} + +function toNumericString(value, fieldName) { + if (value == null) throw new Error(`invalid_${fieldName}`); + if (typeof value === 'number') { + if (!Number.isFinite(value)) throw new Error(`invalid_${fieldName}`); + return String(value); + } + if (typeof value === 'string') { + const s = value.trim(); + if (!s) throw new Error(`invalid_${fieldName}`); + const n = Number(s); + if (!Number.isFinite(n)) throw new Error(`invalid_${fieldName}`); + return s; + } + // best-effort: allow bigint-like or BN-like objects + if (typeof value?.toString === 'function') { + const s = String(value.toString()).trim(); + if (!s) throw new Error(`invalid_${fieldName}`); + const n = Number(s); + if (!Number.isFinite(n)) throw new Error(`invalid_${fieldName}`); + return s; + } + throw new Error(`invalid_${fieldName}`); +} + +function normalizeTick(input, tokenInfo) { + const ts = (input?.ts || input?.timestamp || getIsoNow())?.toString?.(); + const market_index = input?.market_index ?? input?.marketIndex; + const symbol = input?.symbol; + const oracle_price = input?.oracle_price ?? input?.oraclePrice ?? input?.price; + const mark_price = input?.mark_price ?? input?.markPrice ?? input?.mark; + const oracle_slot = input?.oracle_slot ?? input?.oracleSlot ?? input?.slot; + const source = (input?.source || 'api')?.toString?.(); + const raw = input?.raw && typeof input.raw === 'object' ? input.raw : undefined; + + if (!ts || Number.isNaN(Date.parse(ts))) throw new Error('invalid_ts'); + if (!Number.isInteger(market_index)) throw new Error('invalid_market_index'); + if (typeof symbol !== 'string' || !symbol.trim()) throw new Error('invalid_symbol'); + const oracleStr = toNumericString(oracle_price, 'oracle_price'); + const markStr = mark_price == null ? undefined : toNumericString(mark_price, 'mark_price'); + + const slotNum = + oracle_slot == null ? undefined : Number.isFinite(Number(oracle_slot)) ? Number(oracle_slot) : undefined; + + const mergedRaw = + raw || tokenInfo + ? { + ...(raw || {}), + ingestedBy: tokenInfo ? { tokenId: tokenInfo.id, name: tokenInfo.name } : undefined, + } + : undefined; + + return { + ts, + market_index, + symbol: symbol.trim(), + // Postgres columns are NUMERIC; Hasura `numeric` scalar returns strings and expects string inputs. + oracle_price: oracleStr, + mark_price: markStr, + oracle_slot: slotNum, + source, + raw: mergedRaw, + }; +} + +async function insertTick(cfg, tick) { + const table = cfg.ticksTable; + const insertField = `insert_${table}_one`; + const mutation = ` + mutation InsertTick($object: ${table}_insert_input!) { + ${insertField}(object: $object) { + id + } + } + `; + + const data = await hasuraRequest(cfg, { admin: true }, mutation, { object: tick }); + return data?.[insertField]?.id; +} + +async function createApiToken(cfg, name, scopes, meta) { + const mutation = ` + mutation CreateToken($name: String!, $hash: String!, $scopes: [String!]!, $meta: jsonb) { + insert_api_tokens_one(object: {name: $name, token_hash: $hash, scopes: $scopes, meta: $meta}) { + id + name + created_at + } + } + `; + + for (let attempt = 0; attempt < 5; attempt++) { + const token = generateToken(); + const hash = sha256Hex(token); + try { + const data = await hasuraRequest(cfg, { admin: true }, mutation, { name, hash, scopes, meta }); + const row = data?.insert_api_tokens_one; + if (!row?.id) throw new Error('token_insert_failed'); + return { token, row }; + } catch (err) { + const msg = String(err?.message || err); + const isUniqueConflict = msg.toLowerCase().includes('unique') || msg.toLowerCase().includes('constraint'); + if (!isUniqueConflict) throw err; + } + } + throw new Error('token_generation_failed'); +} + +async function revokeApiToken(cfg, id) { + const mutation = ` + mutation RevokeToken($id: uuid!, $ts: timestamptz!) { + update_api_tokens_by_pk(pk_columns: {id: $id}, _set: {revoked_at: $ts}) { + id + revoked_at + } + } + `; + const data = await hasuraRequest(cfg, { admin: true }, mutation, { id, ts: new Date().toISOString() }); + return data?.update_api_tokens_by_pk?.id; +} + +function clampInt(value, min, max) { + const n = Number.parseInt(String(value), 10); + if (!Number.isFinite(n) || !Number.isInteger(n)) return min; + return Math.min(max, Math.max(min, n)); +} + +function clampNumber(value, min, max, fallback) { + const n = typeof value === 'number' ? value : Number(value); + if (!Number.isFinite(n)) return fallback; + return Math.min(max, Math.max(min, n)); +} + +function parseNumeric(value) { + if (value == null) return null; + if (typeof value === 'number') return Number.isFinite(value) ? value : null; + if (typeof value === 'string') { + const s = value.trim(); + if (!s) return null; + const n = Number(s); + return Number.isFinite(n) ? n : null; + } + return null; +} + +function tsToUnixSeconds(value) { + if (typeof value === 'number') return Number.isFinite(value) ? value : null; + if (typeof value !== 'string') return null; + const ms = Date.parse(value); + if (!Number.isFinite(ms)) return null; + return Math.floor(ms / 1000); +} + +function isUuid(value) { + const s = String(value || '').trim(); + return /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i.test(s); +} + +function getByPath(obj, pathStr) { + if (!obj || typeof obj !== 'object') return undefined; + const parts = String(pathStr || '').split('.').filter(Boolean); + let cur = obj; + for (const p of parts) { + if (!cur || typeof cur !== 'object') return undefined; + cur = cur[p]; + } + return cur; +} + +function readNumberFromPayload(payload, paths) { + for (const p of paths) { + const v = getByPath(payload, p); + const n = parseNumeric(v); + if (n != null) return n; + } + return null; +} + +function readTextFromPayload(payload, paths) { + for (const p of paths) { + const v = getByPath(payload, p); + if (typeof v === 'string' && v.trim()) return v.trim(); + } + return null; +} + +function inferContractSizeUsd(contract) { + return ( + readNumberFromPayload(contract, [ + 'desired.size_usd', + 'desired.sizeUsd', + 'desired.notional_usd', + 'desired.notionalUsd', + 'entry.size_usd', + 'entry.sizeUsd', + 'entry.notional_usd', + 'entry.notionalUsd', + 'entry.order_intent.size_usd', + 'entry.order_intent.sizeUsd', + 'desired.order_intent.size_usd', + 'desired.order_intent.sizeUsd', + ]) || null + ); +} + +function inferContractSide(contract) { + const raw = + readTextFromPayload(contract, [ + 'desired.side', + 'entry.side', + 'entry.order_intent.side', + 'desired.order_intent.side', + 'desired.direction', + 'entry.direction', + ]) || ''; + const v = raw.toLowerCase(); + if (v === 'long' || v === 'buy') return 'long'; + if (v === 'short' || v === 'sell') return 'short'; + return null; +} + +function sumCostsFromEvents(events) { + const totals = { + tradeFeeUsd: 0, + txFeeUsd: 0, + slippageUsd: 0, + fundingUsd: 0, + realizedPnlUsd: 0, + txCount: 0, + fillCount: 0, + cancelCount: 0, + modifyCount: 0, + errorCount: 0, + }; + + for (const ev of events || []) { + const t = String(ev?.event_type || '').toLowerCase(); + const payload = ev?.payload && typeof ev.payload === 'object' ? ev.payload : {}; + + const tradeFeeUsd = + readNumberFromPayload(payload, ['realized_fee_usd', 'trade_fee_usd', 'fee_usd', 'fees.trade_fee_usd', 'fees.usd']) || 0; + const txFeeUsd = + readNumberFromPayload(payload, ['realized_tx_usd', 'tx_fee_usd', 'network_fee_usd', 'fees.tx_fee_usd', 'fees.network_usd']) || 0; + const slippageUsd = + readNumberFromPayload(payload, ['slippage_usd', 'realized_slippage_usd', 'execution_usd', 'realized_execution_usd']) || 0; + const fundingUsd = readNumberFromPayload(payload, ['funding_usd', 'realized_funding_usd']) || 0; + const pnlUsd = readNumberFromPayload(payload, ['realized_pnl_usd', 'pnl_usd']) || 0; + const txCount = readNumberFromPayload(payload, ['tx_count', 'txCount']) || 0; + + totals.tradeFeeUsd += tradeFeeUsd; + totals.txFeeUsd += txFeeUsd; + totals.slippageUsd += slippageUsd; + totals.fundingUsd += fundingUsd; + totals.realizedPnlUsd += pnlUsd; + totals.txCount += txCount; + + if (t.includes('fill')) totals.fillCount += 1; + if (t.includes('cancel')) totals.cancelCount += 1; + if (t.includes('modify') || t.includes('reprice')) totals.modifyCount += 1; + if (t.includes('error') || String(ev?.severity || '').toLowerCase() === 'error') totals.errorCount += 1; + } + + const totalCostsUsd = totals.tradeFeeUsd + totals.txFeeUsd + totals.slippageUsd + totals.fundingUsd; + + return { + ...totals, + totalCostsUsd, + netPnlUsd: totals.realizedPnlUsd - totalCostsUsd, + }; +} + +function buildCostSeriesFromEvents(events, { maxPoints }) { + const points = []; + const totals = { + tradeFeeUsd: 0, + txFeeUsd: 0, + slippageUsd: 0, + fundingUsd: 0, + realizedPnlUsd: 0, + }; + + for (const ev of events || []) { + const ts = ev?.ts; + if (!ts) continue; + const payload = ev?.payload && typeof ev.payload === 'object' ? ev.payload : {}; + + const tradeFeeUsd = + readNumberFromPayload(payload, ['realized_fee_usd', 'trade_fee_usd', 'fee_usd', 'fees.trade_fee_usd', 'fees.usd']) || 0; + const txFeeUsd = + readNumberFromPayload(payload, ['realized_tx_usd', 'tx_fee_usd', 'network_fee_usd', 'fees.tx_fee_usd', 'fees.network_usd']) || 0; + const slippageUsd = + readNumberFromPayload(payload, ['slippage_usd', 'realized_slippage_usd', 'execution_usd', 'realized_execution_usd']) || 0; + const fundingUsd = readNumberFromPayload(payload, ['funding_usd', 'realized_funding_usd']) || 0; + const pnlUsd = readNumberFromPayload(payload, ['realized_pnl_usd', 'pnl_usd']) || 0; + + totals.tradeFeeUsd += tradeFeeUsd; + totals.txFeeUsd += txFeeUsd; + totals.slippageUsd += slippageUsd; + totals.fundingUsd += fundingUsd; + totals.realizedPnlUsd += pnlUsd; + + const totalCostsUsd = totals.tradeFeeUsd + totals.txFeeUsd + totals.slippageUsd + totals.fundingUsd; + points.push({ + ts, + tradeFeeUsd: totals.tradeFeeUsd, + txFeeUsd: totals.txFeeUsd, + slippageUsd: totals.slippageUsd, + fundingUsd: totals.fundingUsd, + totalCostsUsd, + realizedPnlUsd: totals.realizedPnlUsd, + netPnlUsd: totals.realizedPnlUsd - totalCostsUsd, + }); + } + + const cap = Math.max(50, Math.min(10_000, Number(maxPoints) || 600)); + if (points.length <= cap) return points; + + const step = Math.ceil(points.length / cap); + const sampled = []; + for (let i = 0; i < points.length; i += step) sampled.push(points[i]); + const last = points[points.length - 1]; + if (sampled[sampled.length - 1] !== last) sampled.push(last); + return sampled; +} + +function flowFromDelta(delta) { + if (delta > 0) return { up: 1, down: 0, flat: 0 }; + if (delta < 0) return { up: 0, down: 1, flat: 0 }; + return { up: 0, down: 0, flat: 1 }; +} + +function computeCandleFlowFromTicks({ candle, bucketSeconds, points, nowSec, isCurrent }) { + const start = candle.time; + const end = start + bucketSeconds; + const progressEnd = isCurrent ? Math.min(end, Math.max(start, nowSec)) : end; + + const totalWindow = progressEnd - start; + if (!(totalWindow > 0)) return flowFromDelta(candle.close - candle.open); + + let up = 0; + let down = 0; + let flat = 0; + + let prevT = start; + let prevP = candle.open; + + for (const pt of points || []) { + const t = clampNumber(pt.t, prevT, progressEnd, prevT); + const dt = t - prevT; + if (dt > 0) { + const delta = pt.p - prevP; + if (delta > 0) up += dt; + else if (delta < 0) down += dt; + else flat += dt; + prevT = t; + } + prevP = pt.p; + if (prevT >= progressEnd) break; + } + + const tail = progressEnd - prevT; + if (tail > 0) flat += tail; + + const sum = up + down + flat; + if (!(sum > 0)) return flowFromDelta(candle.close - candle.open); + return { up: up / sum, down: down / sum, flat: flat / sum }; +} + +function computeCandleFlowRowsFromTicks({ candle, bucketSeconds, points, rows, nowSec, isCurrent }) { + const start = candle.time; + const end = start + bucketSeconds; + const progressEnd = isCurrent ? Math.min(end, Math.max(start, nowSec)) : end; + const totalWindow = progressEnd - start; + if (!(totalWindow > 0)) { + const overall = candle.close - candle.open; + const dir = overall > 0 ? 1 : overall < 0 ? -1 : 0; + return new Array(rows).fill(dir); + } + + const rowDirs = new Array(rows).fill(0); + + const pts = [{ t: start, p: candle.open }, ...(points || []), { t: progressEnd, p: candle.close }]; + for (let i = 1; i < pts.length; i += 1) { + const a = pts[i - 1]; + const b = pts[i]; + const t1 = clampNumber(a.t, start, progressEnd, start); + const t2 = clampNumber(b.t, start, progressEnd, start); + if (!(t2 > t1)) continue; + + const delta = b.p - a.p; + const dir = delta > 0 ? 1 : delta < 0 ? -1 : 0; + + const from = Math.max(0, Math.min(rows - 1, Math.floor(((t1 - start) / bucketSeconds) * rows))); + const to = Math.max(0, Math.min(rows - 1, Math.floor(((t2 - start) / bucketSeconds) * rows))); + for (let r = from; r <= to; r += 1) rowDirs[r] = dir; + } + + return rowDirs; +} + +function computeCandleFlowSlicesFromTicks({ candle, bucketSeconds, points, rows, nowSec, isCurrent }) { + const start = candle.time; + const end = start + bucketSeconds; + const progressEnd = isCurrent ? Math.min(end, Math.max(start, nowSec)) : end; + const dirs = new Array(rows).fill(0); + const moves = new Array(rows).fill(0); + + const totalWindow = progressEnd - start; + if (!(totalWindow > 0)) { + const overall = candle.close - candle.open; + const dir = overall > 0 ? 1 : overall < 0 ? -1 : 0; + dirs.fill(dir); + return { dirs, moves }; + } + + const pts = [...(points || []), { t: progressEnd, p: candle.close }]; + let idx = 0; + let lastP = candle.open; + + for (let r = 0; r < rows; r += 1) { + const sliceStart = start + (r / rows) * bucketSeconds; + if (!(sliceStart < progressEnd)) break; + const sliceEnd = Math.min(progressEnd, start + ((r + 1) / rows) * bucketSeconds); + + while (idx < pts.length && pts[idx].t <= sliceStart) { + lastP = pts[idx].p; + idx += 1; + } + const pStart = lastP; + + while (idx < pts.length && pts[idx].t <= sliceEnd) { + lastP = pts[idx].p; + idx += 1; + } + const pEnd = lastP; + + const delta = pEnd - pStart; + const dir = delta > 0 ? 1 : delta < 0 ? -1 : 0; + const move = Math.abs(delta); + dirs[r] = dir; + moves[r] = Number.isFinite(move) ? move : 0; + } + + return { dirs, moves }; +} + +function computeCandleFlowMovesFromTicks({ candle, bucketSeconds, points, rows, nowSec, isCurrent }) { + const start = candle.time; + const end = start + bucketSeconds; + const progressEnd = isCurrent ? Math.min(end, Math.max(start, nowSec)) : end; + const totalWindow = progressEnd - start; + const out = new Array(rows).fill(0); + if (!(totalWindow > 0)) return out; + + const pts = [...(points || []), { t: progressEnd, p: candle.close }]; + let idx = 0; + let lastP = candle.open; + + for (let r = 0; r < rows; r += 1) { + const sliceStart = start + (r / rows) * bucketSeconds; + if (!(sliceStart < progressEnd)) break; + const sliceEnd = Math.min(progressEnd, start + ((r + 1) / rows) * bucketSeconds); + + while (idx < pts.length && pts[idx].t <= sliceStart) { + lastP = pts[idx].p; + idx += 1; + } + const pStart = lastP; + + while (idx < pts.length && pts[idx].t <= sliceEnd) { + lastP = pts[idx].p; + idx += 1; + } + const pEnd = lastP; + + const move = Math.abs(pEnd - pStart); + out[r] = Number.isFinite(move) ? move : 0; + } + + return out; +} + +function parseTimeframeToSeconds(tf) { + const v = normalizeChartTimeframe(tf); + if (!v) return 60; + const m = v.match(/^(\d+)(s|m|h|d)$/); + if (!m) throw new Error(`invalid_tf`); + const num = Number.parseInt(m[1], 10); + if (!Number.isInteger(num) || num <= 0) throw new Error(`invalid_tf`); + const unit = m[2]; + const mult = unit === 's' ? 1 : unit === 'm' ? 60 : unit === 'h' ? 3600 : 86400; + return num * mult; +} + +const SUPPORTED_CHART_TIMEFRAMES = new Set([ + '1s', + '5s', + '15s', + '30s', + '1m', + '5m', + '15m', + '30m', + '1h', + '4h', + '8h', + '12h', + '1d', +]); + +function normalizeChartTimeframe(tf) { + const raw = String(tf || '').trim(); + if (!raw) return '1m'; + + const aliased = + raw === '1D' || raw === '24h' || raw === '24H' + ? '1d' + : raw === '1H' + ? '1h' + : raw.toLowerCase(); + + if (!SUPPORTED_CHART_TIMEFRAMES.has(aliased)) { + throw new Error('invalid_tf'); + } + + return aliased; +} + +function fillForwardCandles(candles, { bucketSeconds, limit, nowSec }) { + if (!Array.isArray(candles) || candles.length === 0) return []; + if (!Number.isFinite(bucketSeconds) || bucketSeconds <= 0) return candles; + if (!Number.isFinite(limit) || limit <= 0) return candles; + + // `candles` should be ascending by time. + const cleaned = candles + .filter((c) => c && Number.isFinite(c.time) && Number.isFinite(c.close)) + .slice() + .sort((a, b) => a.time - b.time); + + if (cleaned.length === 0) return []; + + const map = new Map(cleaned.map((c) => [c.time, c])); + const lastDataTime = cleaned[cleaned.length - 1].time; + const now = Number.isFinite(nowSec) ? nowSec : Math.floor(Date.now() / 1000); + const alignedNow = Math.floor(now / bucketSeconds) * bucketSeconds; + const endTime = Math.max(alignedNow, lastDataTime); + const startTime = endTime - bucketSeconds * (limit - 1); + + const baseline = cleaned[0]; + const out = []; + out.length = limit; + + let prev = null; + for (let i = 0; i < limit; i += 1) { + const t = startTime + i * bucketSeconds; + const hit = map.get(t); + if (hit) { + const c = { ...hit }; + c.volume = Number.isFinite(c.volume) ? c.volume : 0; + // Keep continuity: next candle opens where previous candle closed. + // This avoids visual "gaps" when ticks are sparse. + if (prev && Number.isFinite(prev.close)) { + const prevClose = Number(prev.close); + c.open = prevClose; + c.high = Math.max(Number(c.high), prevClose, Number(c.close)); + c.low = Math.min(Number(c.low), prevClose, Number(c.close)); + } + out[i] = c; + prev = c; + continue; + } + + const base = prev || baseline; + const close = Number(base.close); + const oracle = base.oracle == null ? null : Number(base.oracle); + + const filled = { + time: t, + open: close, + high: close, + low: close, + close, + volume: 0, + oracle: Number.isFinite(oracle) ? oracle : null, + }; + out[i] = filled; + prev = filled; + } + + return out.filter((c) => c && Number.isFinite(c.time) && Number.isFinite(c.open) && Number.isFinite(c.close)); +} + +function pickFlowPointBucketSeconds(bucketSeconds, rowsPerCandle) { + // We want a point step that is: + // - small enough to capture intra-candle direction, + // - but derived from already-cached candle buckets (1s/3s/5s/...). + const targetStep = Math.max(1, Math.floor(bucketSeconds / Math.max(1, rowsPerCandle))); + const candidates = [1, 3, 5, 15, 30, 60, 180, 300, 900, 1800, 3600, 14_400, 43_200, 86_400]; + let best = candidates[0]; + for (const c of candidates) { + if (c <= targetStep) best = c; + } + return best; +} + +function mean(values) { + if (!values.length) return 0; + return values.reduce((a, b) => a + b, 0) / values.length; +} + +function stddev(values) { + if (!values.length) return 0; + const m = mean(values); + const v = values.reduce((acc, x) => acc + (x - m) * (x - m), 0) / values.length; + return Math.sqrt(v); +} + +function sma(values, period) { + if (period <= 0) throw new Error('period must be > 0'); + const out = new Array(values.length).fill(null); + let sum = 0; + for (let i = 0; i < values.length; i++) { + sum += values[i]; + if (i >= period) sum -= values[i - period]; + if (i >= period - 1) out[i] = sum / period; + } + return out; +} + +function ema(values, period) { + if (period <= 0) throw new Error('period must be > 0'); + const out = new Array(values.length).fill(null); + const k = 2 / (period + 1); + if (values.length < period) return out; + const first = mean(values.slice(0, period)); + out[period - 1] = first; + let prev = first; + for (let i = period; i < values.length; i++) { + const next = values[i] * k + prev * (1 - k); + out[i] = next; + prev = next; + } + return out; +} + +function rsi(values, period) { + if (period <= 0) throw new Error('period must be > 0'); + const out = new Array(values.length).fill(null); + if (values.length <= period) return out; + + let gains = 0; + let losses = 0; + for (let i = 1; i <= period; i++) { + const change = values[i] - values[i - 1]; + if (change >= 0) gains += change; + else losses -= change; + } + let avgGain = gains / period; + let avgLoss = losses / period; + + const rs = avgLoss === 0 ? Number.POSITIVE_INFINITY : avgGain / avgLoss; + out[period] = 100 - 100 / (1 + rs); + + for (let i = period + 1; i < values.length; i++) { + const change = values[i] - values[i - 1]; + const gain = Math.max(change, 0); + const loss = Math.max(-change, 0); + avgGain = (avgGain * (period - 1) + gain) / period; + avgLoss = (avgLoss * (period - 1) + loss) / period; + const rs2 = avgLoss === 0 ? Number.POSITIVE_INFINITY : avgGain / avgLoss; + out[i] = 100 - 100 / (1 + rs2); + } + + return out; +} + +function bollingerBands(values, period, stdDevMult) { + if (period <= 0) throw new Error('period must be > 0'); + const upper = new Array(values.length).fill(null); + const lower = new Array(values.length).fill(null); + const mid = sma(values, period); + + for (let i = period - 1; i < values.length; i++) { + const window = values.slice(i - period + 1, i + 1); + const sd = stddev(window); + const m = mid[i]; + if (m == null) continue; + upper[i] = m + stdDevMult * sd; + lower[i] = m - stdDevMult * sd; + } + + return { upper, lower, mid }; +} + +function macd(values, fastPeriod = 12, slowPeriod = 26, signalPeriod = 9) { + const fast = ema(values, fastPeriod); + const slow = ema(values, slowPeriod); + const macdLine = values.map((_, i) => { + const f = fast[i]; + const s = slow[i]; + return f == null || s == null ? null : f - s; + }); + + const signal = new Array(values.length).fill(null); + const k = 2 / (signalPeriod + 1); + let seeded = false; + let prev = 0; + const buf = []; + + for (let i = 0; i < macdLine.length; i++) { + const v = macdLine[i]; + if (v == null) continue; + + if (!seeded) { + buf.push(v); + if (buf.length === signalPeriod) { + const first = mean(buf); + signal[i] = first; + prev = first; + seeded = true; + } + continue; + } + + const next = v * k + prev * (1 - k); + signal[i] = next; + prev = next; + } + + return { macd: macdLine, signal }; +} + +function toSeries(times, values) { + return times.map((t, i) => ({ time: t, value: values[i] ?? null })); +} + +function isAdmin(cfg, req) { + if (!cfg.apiAdminSecret) return false; + const provided = getHeader(req, 'x-admin-secret'); + return typeof provided === 'string' && provided === cfg.apiAdminSecret; +} + +async function handler(cfg, req, res) { + if (req.method === 'OPTIONS') { + withCors(res, cfg.corsOrigin); + res.statusCode = 204; + res.end(); + return; + } + + const url = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`); + const pathname = url.pathname; + + if (req.method === 'GET' && pathname === '/healthz') { + sendJson( + res, + 200, + { + ok: true, + version: cfg.appVersion, + buildTimestamp: cfg.buildTimestamp, + startedAt: cfg.startedAt, + ticksTable: cfg.ticksTable, + candlesFunction: cfg.candlesFunction, + }, + cfg.corsOrigin + ); + return; + } + + if (req.method === 'GET' && pathname === '/v1/chart') { + const auth = await requireValidToken(cfg, req, 'read'); + if (!auth.ok) { + sendJson(res, auth.status, { ok: false, error: auth.error }, cfg.corsOrigin); + return; + } + + const symbol = (url.searchParams.get('symbol') || '').trim(); + const source = (url.searchParams.get('source') || '').trim(); + const basisRaw = (url.searchParams.get('basis') || '').trim().toLowerCase(); + const tf = (url.searchParams.get('tf') || url.searchParams.get('timeframe') || '1m').trim(); + const limit = clampInt(url.searchParams.get('limit') || '300', 10, 2000); + + if (!symbol) { + sendJson(res, 400, { ok: false, error: 'missing_symbol' }, cfg.corsOrigin); + return; + } + + let bucketSeconds; + try { + bucketSeconds = parseTimeframeToSeconds(tf); + } catch { + sendJson(res, 400, { ok: false, error: 'invalid_tf' }, cfg.corsOrigin); + return; + } + + const sourceKey = source || ''; + const basis = basisRaw === 'mark' ? 'mark' : basisRaw === 'oracle' || !basisRaw ? 'oracle' : null; + if (!basis) { + sendJson(res, 400, { ok: false, error: 'invalid_basis' }, cfg.corsOrigin); + return; + } + + try { + // Cache-first: read precomputed candles from `drift_candles_cache`. + // Fallback: compute on-demand via `get_drift_candles()` if cache not warmed yet. + const qCache = ` + query CachedCandles($symbol: String!, $bucket: Int!, $limit: Int!, $source: String!) { + drift_candles_cache( + where: {symbol: {_eq: $symbol}, bucket_seconds: {_eq: $bucket}, source: {_eq: $source}} + order_by: {bucket: desc} + limit: $limit + ) { + bucket + open + high + low + close + oracle_open + oracle_high + oracle_low + oracle_close + ticks + } + } + `; + + const cacheData = await hasuraRequest(cfg, { admin: true }, qCache, { + symbol, + bucket: bucketSeconds, + limit, + source: sourceKey, + }); + + let rows = cacheData?.drift_candles_cache || []; + + if (!rows.length) { + const fn = cfg.candlesFunction; + const qFn = ` + query Candles($symbol: String!, $bucket: Int!, $limit: Int!, $source: String) { + ${fn}(args: { p_symbol: $symbol, p_bucket_seconds: $bucket, p_limit: $limit, p_source: $source }) { + bucket + open + high + low + close + oracle_open + oracle_high + oracle_low + oracle_close + ticks + } + } + `; + const data = await hasuraRequest(cfg, { admin: true }, qFn, { + symbol, + bucket: bucketSeconds, + limit, + source: source || null, + }); + rows = data?.[fn] || []; + } + + const nowSec = Math.floor(Date.now() / 1000); + + let candles = rows + .slice() + .reverse() + .map((r) => { + const time = Math.floor(Date.parse(r.bucket) / 1000); + const oracleClose = r.oracle_close == null ? null : Number(r.oracle_close); + const oracleOpen = r.oracle_open == null ? oracleClose : Number(r.oracle_open); + const oracleHigh = r.oracle_high == null ? oracleClose : Number(r.oracle_high); + const oracleLow = r.oracle_low == null ? oracleClose : Number(r.oracle_low); + + const markOpen = Number(r.open); + const markHigh = Number(r.high); + const markLow = Number(r.low); + const markClose = Number(r.close); + + const open = basis === 'oracle' ? oracleOpen : markOpen; + const high = basis === 'oracle' ? oracleHigh : markHigh; + const low = basis === 'oracle' ? oracleLow : markLow; + const close = basis === 'oracle' ? oracleClose : markClose; + + // Always expose oracle close (even if basis=mark). + const oracle = oracleClose; + const volume = Number(r.ticks || 0); + return { time, open, high, low, close, volume, oracle }; + }) + .filter((c) => Number.isFinite(c.time) && Number.isFinite(c.open) && Number.isFinite(c.close)); + + // Make candles continuous in time: if no tick happened in a bucket, emit a flat candle using last close. + // This keeps the chart stable for 1s/3s/... views and makes timeframe switching instant (cache + no gaps). + candles = fillForwardCandles(candles, { bucketSeconds, limit, nowSec }); + + // Flow = share of time spent moving up/down/flat inside each bucket. + // Used by the UI to render stacked volume bars describing microstructure. + const windowSeconds = bucketSeconds * candles.length; + const canComputeFlow = candles.length > 0 && windowSeconds > 0 && windowSeconds <= 86_400; // cap at 24h + const rowsPerCandle = Math.min(60, Math.max(12, Math.floor(bucketSeconds))); + const flowPointBucketSeconds = pickFlowPointBucketSeconds(bucketSeconds, rowsPerCandle); + + if (canComputeFlow) { + const firstStart = candles[0].time; + const lastStart = candles[candles.length - 1].time; + const fromIso = new Date(firstStart * 1000).toISOString(); + const toIso = new Date((lastStart + bucketSeconds) * 1000).toISOString(); + + try { + // Prefer flow computed from cached candles (fast, no raw tick scan). + // If cache is missing, fall back to a simple delta-based approximation. + const maxPoints = Math.min(86_400, Math.max(1_000, candles.length * rowsPerCandle * 2)); + + const q1s = ` + query FlowPts($symbol: String!, $source: String!, $bucketSeconds: Int!, $from: timestamptz!, $to: timestamptz!, $limit: Int!) { + drift_candles_cache( + where: { + symbol: {_eq: $symbol} + source: {_eq: $source} + bucket_seconds: {_eq: $bucketSeconds} + bucket: {_gte: $from, _lt: $to} + } + order_by: {bucket: asc} + limit: $limit + ) { + bucket + close + oracle_close + } + } + `; + + const pData = await hasuraRequest(cfg, { admin: true }, q1s, { + symbol, + source: sourceKey, + bucketSeconds: flowPointBucketSeconds, + from: fromIso, + to: toIso, + limit: maxPoints, + }); + + const ptsRows = pData?.drift_candles_cache || []; + const visibleStarts = new Set(candles.map((c) => c.time)); + const pointsByCandle = new Map(); + + for (const r of ptsRows) { + const t = tsToUnixSeconds(r.bucket); + if (t == null) continue; + const p = + basis === 'oracle' + ? parseNumeric(r.oracle_close) ?? parseNumeric(r.close) + : parseNumeric(r.close) ?? parseNumeric(r.oracle_close); + if (p == null) continue; + const idx = Math.floor((t - firstStart) / bucketSeconds); + const start = firstStart + idx * bucketSeconds; + if (!visibleStarts.has(start)) continue; + const list = pointsByCandle.get(start) || []; + list.push({ t, p }); + pointsByCandle.set(start, list); + } + + const lastCandleTime = candles[candles.length - 1]?.time ?? null; + for (const c of candles) { + const pts = pointsByCandle.get(c.time) || []; + const isCurrent = lastCandleTime != null && c.time === lastCandleTime; + c.flow = computeCandleFlowFromTicks({ candle: c, bucketSeconds, points: pts, nowSec, isCurrent }); + const slices = computeCandleFlowSlicesFromTicks({ + candle: c, + bucketSeconds, + points: pts, + rows: rowsPerCandle, + nowSec, + isCurrent, + }); + c.flowRows = slices.dirs; + c.flowMoves = slices.moves; + } + } catch { + for (const c of candles) { + const fallback = flowFromDelta(c.close - c.open); + c.flow = fallback; + const dir = fallback.up ? 1 : fallback.down ? -1 : 0; + c.flowRows = new Array(rowsPerCandle).fill(dir); + c.flowMoves = new Array(rowsPerCandle).fill(0); + } + } + } else { + for (const c of candles) { + const fallback = flowFromDelta(c.close - c.open); + c.flow = fallback; + const dir = fallback.up ? 1 : fallback.down ? -1 : 0; + c.flowRows = new Array(rowsPerCandle).fill(dir); + c.flowMoves = new Array(rowsPerCandle).fill(0); + } + } + + const times = candles.map((c) => c.time); + const closes = candles.map((c) => c.close); + const oracleSeries = toSeries(times, candles.map((c) => (c.oracle == null ? null : c.oracle))); + + const sma20 = toSeries(times, sma(closes, 20)); + const ema20 = toSeries(times, ema(closes, 20)); + const bb = bollingerBands(closes, 20, 2); + const bbUpper = toSeries(times, bb.upper); + const bbLower = toSeries(times, bb.lower); + const bbMid = toSeries(times, bb.mid); + const rsi14 = toSeries(times, rsi(closes, 14)); + const macdOut = macd(closes, 12, 26, 9); + const macdLine = toSeries(times, macdOut.macd); + const macdSignal = toSeries(times, macdOut.signal); + + sendJson( + res, + 200, + { + ok: true, + version: cfg.appVersion, + buildTimestamp: cfg.buildTimestamp, + ticksTable: cfg.ticksTable, + candlesFunction: cfg.candlesFunction, + symbol, + source: source || null, + basis, + tf, + bucketSeconds, + candles, + indicators: { + oracle: oracleSeries, + sma20, + ema20, + bb20: { upper: bbUpper, lower: bbLower, mid: bbMid }, + rsi14, + macd: { macd: macdLine, signal: macdSignal }, + }, + }, + cfg.corsOrigin + ); + } catch (err) { + sendJson(res, 500, { ok: false, error: String(err?.message || err) }, cfg.corsOrigin); + } + return; + } + + if (req.method === 'POST' && pathname === '/v1/ingest/tick') { + const auth = await requireValidToken(cfg, req, 'write'); + if (!auth.ok) { + sendJson(res, auth.status, { ok: false, error: auth.error }, cfg.corsOrigin); + return; + } + + let body; + try { + body = await readBodyJson(req, { maxBytes: 1024 * 1024 }); + } catch (err) { + const msg = String(err?.message || err); + if (msg === 'payload_too_large') { + sendJson(res, 413, { ok: false, error: 'payload_too_large' }, cfg.corsOrigin); + return; + } + sendJson(res, 400, { ok: false, error: 'invalid_json' }, cfg.corsOrigin); + return; + } + + let tick; + try { + tick = normalizeTick(body, auth.token); + } catch (err) { + sendJson(res, 400, { ok: false, error: String(err?.message || err) }, cfg.corsOrigin); + return; + } + + try { + const id = await insertTick(cfg, tick); + sendJson(res, 200, { ok: true, id }, cfg.corsOrigin); + } catch (err) { + sendJson(res, 500, { ok: false, error: String(err?.message || err) }, cfg.corsOrigin); + } + return; + } + + if (req.method === 'GET' && pathname === '/v1/ticks') { + const auth = await requireValidToken(cfg, req, 'read'); + if (!auth.ok) { + sendJson(res, auth.status, { ok: false, error: auth.error }, cfg.corsOrigin); + return; + } + + const symbol = url.searchParams.get('symbol') || ''; + const source = url.searchParams.get('source'); + const limitRaw = url.searchParams.get('limit') || '1000'; + const limit = Math.min(5000, Math.max(1, Number.parseInt(limitRaw, 10) || 1000)); + const from = url.searchParams.get('from'); + const to = url.searchParams.get('to'); + + if (!symbol.trim()) { + sendJson(res, 400, { ok: false, error: 'missing_symbol' }, cfg.corsOrigin); + return; + } + + const where = { symbol: { _eq: symbol.trim() } }; + if (source && source.trim()) where.source = { _eq: source.trim() }; + if (from || to) { + where.ts = {}; + if (from) where.ts._gte = from; + if (to) where.ts._lte = to; + } + + const table = cfg.ticksTable; + const query = ` + query Ticks($where: ${table}_bool_exp!, $limit: Int!) { + ${table}(where: $where, order_by: {ts: desc}, limit: $limit) { + ts + market_index + symbol + oracle_price + mark_price + oracle_slot + source + } + } + `; + + try { + const data = await hasuraRequest(cfg, { admin: true }, query, { where, limit }); + const ticks = (data?.[table] || []).slice().reverse(); + sendJson(res, 200, { ok: true, version: cfg.appVersion, ticksTable: cfg.ticksTable, ticks }, cfg.corsOrigin); + } catch (err) { + sendJson(res, 500, { ok: false, error: String(err?.message || err) }, cfg.corsOrigin); + } + return; + } + + // Contract monitoring + cost compute (read scope). + // This endpoint is meant for "live UI" polling/subscription aggregation on the backend. + // It is intentionally resilient to varying bot_events payload schemas. + if (req.method === 'GET' && pathname.startsWith('/v1/contracts/') && pathname.endsWith('/monitor')) { + const auth = await requireValidToken(cfg, req, 'read'); + if (!auth.ok) { + sendJson(res, auth.status, { ok: false, error: auth.error }, cfg.corsOrigin); + return; + } + + const parts = pathname.split('/').filter(Boolean); + const contractId = parts[2]; + if (!isUuid(contractId)) { + sendJson(res, 400, { ok: false, error: 'invalid_contract_id' }, cfg.corsOrigin); + return; + } + + const limit = clampInt(url.searchParams.get('eventsLimit') || '2000', 10, 50_000); + const wantSeries = (url.searchParams.get('series') || '').trim() === '1'; + const seriesMax = clampInt(url.searchParams.get('seriesMax') || '600', 50, 10_000); + + const qContract = ` + query ContractByPk($id: uuid!) { + bot_contracts_by_pk(id: $id) { + id + decision_id + bot_id + model_version + market_name + subaccount_id + status + desired + entry + manage + exit + gates + created_at + updated_at + last_heartbeat_at + ended_at + reason + } + } + `; + const qEvents = ` + query ContractEvents($id: uuid!, $limit: Int!) { + bot_events(where: {contract_id: {_eq: $id}}, order_by: {ts: asc}, limit: $limit) { + ts + contract_id + decision_id + bot_id + market_name + event_type + severity + payload + } + } + `; + + try { + const data = await hasuraRequest(cfg, { admin: true }, qContract, { id: contractId }); + const contract = data?.bot_contracts_by_pk; + if (!contract?.id) { + sendJson(res, 404, { ok: false, error: 'contract_not_found' }, cfg.corsOrigin); + return; + } + + const evData = await hasuraRequest(cfg, { admin: true }, qEvents, { id: contractId, limit }); + const events = evData?.bot_events || []; + const costs = sumCostsFromEvents(events); + const series = wantSeries ? buildCostSeriesFromEvents(events, { maxPoints: seriesMax }) : null; + + const sizeUsd = inferContractSizeUsd(contract); + const side = inferContractSide(contract); + + let closeEst = null; + if (contract.market_name && sizeUsd != null) { + const qSlip = ` + query Slippage($market: String!) { + dlob_slippage_latest_v2(where: {market_name: {_eq: $market}}) { + market_name + side + size_usd + mid_price + vwap_price + worst_price + impact_bps + fill_pct + updated_at + } + dlob_slippage_latest(where: {market_name: {_eq: $market}}) { + market_name + side + size_usd + mid_price + vwap_price + worst_price + impact_bps + fill_pct + updated_at + } + } + `; + const slipData = await hasuraRequest(cfg, { admin: true }, qSlip, { market: contract.market_name }); + const rowsV2 = slipData?.dlob_slippage_latest_v2 || []; + const rowsV1 = slipData?.dlob_slippage_latest || []; + const rows = rowsV2.length ? rowsV2 : rowsV1; + const pickNearest = (wantedSide) => { + const candidates = rows.filter((r) => String(r.side || '').toLowerCase() === wantedSide); + if (!candidates.length) return null; + let best = null; + let bestD = Infinity; + for (const r of candidates) { + const s = parseNumeric(r.size_usd); + if (s == null) continue; + const d = Math.abs(s - sizeUsd); + if (d < bestD) { + bestD = d; + best = r; + } + } + return best; + }; + + const buy = pickNearest('buy'); + const sell = pickNearest('sell'); + closeEst = { + requestedSizeUsd: sizeUsd, + entrySide: side, + suggestedCloseSide: side === 'long' ? 'sell' : side === 'short' ? 'buy' : null, + buy, + sell, + }; + } + + sendJson( + res, + 200, + { + ok: true, + contract, + eventsCount: events.length, + costs, + series, + closeEstimate: closeEst, + }, + cfg.corsOrigin + ); + } catch (err) { + sendJson(res, 500, { ok: false, error: String(err?.message || err) }, cfg.corsOrigin); + } + return; + } + + // Estimate costs for a new contract (read scope). + // Uses DLOB slippage table and simple fee/tx estimates (inputs). + if (req.method === 'POST' && pathname === '/v1/contracts/costs/estimate') { + const auth = await requireValidToken(cfg, req, 'read'); + if (!auth.ok) { + sendJson(res, auth.status, { ok: false, error: auth.error }, cfg.corsOrigin); + return; + } + + let body; + try { + body = await readBodyJson(req, { maxBytes: 256 * 1024 }); + } catch (err) { + const msg = String(err?.message || err); + if (msg === 'payload_too_large') { + sendJson(res, 413, { ok: false, error: 'payload_too_large' }, cfg.corsOrigin); + return; + } + sendJson(res, 400, { ok: false, error: 'invalid_json' }, cfg.corsOrigin); + return; + } + + const market = String(body?.market_name || body?.market || '').trim(); + if (!market) { + sendJson(res, 400, { ok: false, error: 'missing_market' }, cfg.corsOrigin); + return; + } + + const notionalUsd = parseNumeric(body?.notional_usd ?? body?.notionalUsd ?? body?.size_usd ?? body?.sizeUsd); + if (notionalUsd == null || !(notionalUsd > 0)) { + sendJson(res, 400, { ok: false, error: 'invalid_notional_usd' }, cfg.corsOrigin); + return; + } + + const sideRaw = String(body?.side || 'long').trim().toLowerCase(); + const entrySide = sideRaw === 'short' || sideRaw === 'sell' ? 'short' : 'long'; + const defaultTakerBps = parseNumeric(process.env.FEE_TAKER_BPS_DEFAULT) ?? 5; + const defaultMakerBps = parseNumeric(process.env.FEE_MAKER_BPS_DEFAULT) ?? 0; + const takerBps = clampNumber(parseNumeric(body?.fee_taker_bps) ?? defaultTakerBps, 0, 1000, defaultTakerBps); + const makerBps = clampNumber(parseNumeric(body?.fee_maker_bps) ?? defaultMakerBps, -1000, 1000, defaultMakerBps); + const orderType = String(body?.order_type || body?.orderType || 'market').trim().toLowerCase(); + const isMarket = orderType === 'market' || orderType === 'taker'; + const feeBps = isMarket ? takerBps : makerBps; + + let txFeeUsdEst = parseNumeric(body?.tx_fee_usd_est); + if (txFeeUsdEst == null) { + const baseLamports = parseNumeric(process.env.TX_BASE_FEE_LAMPORTS_EST) ?? 5000; + const sigs = parseNumeric(process.env.TX_SIGNATURES_EST) ?? 1; + const priorityLamports = parseNumeric(process.env.TX_PRIORITY_FEE_LAMPORTS_EST) ?? 0; + const lamports = Math.max(0, baseLamports) * Math.max(1, sigs) + Math.max(0, priorityLamports); + const sol = lamports / 1_000_000_000; + const solUsd = await readSolPriceUsd(cfg); + txFeeUsdEst = solUsd != null ? sol * solUsd : 0; + } + txFeeUsdEst = clampNumber(txFeeUsdEst, 0, 100, 0); + const defaultReprices = parseNumeric(process.env.EXPECTED_REPRICES_PER_ENTRY_DEFAULT) ?? 0; + const expectedReprices = clampInt(body?.expected_reprices_per_entry ?? body?.expectedReprices ?? String(defaultReprices), 0, 500); + const modifyTxCount = clampInt(body?.modify_tx_count ?? body?.modifyTxCount ?? '2', 0, 10); + + try { + const qSlip = ` + query Slippage($market: String!) { + dlob_slippage_latest_v2(where: {market_name: {_eq: $market}}) { + market_name + side + size_usd + mid_price + vwap_price + worst_price + impact_bps + fill_pct + updated_at + } + dlob_slippage_latest(where: {market_name: {_eq: $market}}) { + market_name + side + size_usd + mid_price + vwap_price + worst_price + impact_bps + fill_pct + updated_at + } + } + `; + const slipData = await hasuraRequest(cfg, { admin: true }, qSlip, { market }); + const rowsV2 = slipData?.dlob_slippage_latest_v2 || []; + const rowsV1 = slipData?.dlob_slippage_latest || []; + const rows = rowsV2.length ? rowsV2 : rowsV1; + const wantedSide = entrySide === 'long' ? 'buy' : 'sell'; + const candidates = rows.filter((r) => String(r.side || '').toLowerCase() === wantedSide); + let best = null; + let bestD = Infinity; + for (const r of candidates) { + const s = parseNumeric(r.size_usd); + if (s == null) continue; + const d = Math.abs(s - notionalUsd); + if (d < bestD) { + bestD = d; + best = r; + } + } + + const impactBps = parseNumeric(best?.impact_bps) ?? 0; + const slippageUsd = (notionalUsd * impactBps) / 10_000; + const tradeFeeUsd = (notionalUsd * feeBps) / 10_000; + const modifyCostUsd = expectedReprices * modifyTxCount * txFeeUsdEst; + const totalUsd = tradeFeeUsd + slippageUsd + txFeeUsdEst + modifyCostUsd; + const totalBps = (totalUsd / notionalUsd) * 10_000; + + sendJson( + res, + 200, + { + ok: true, + input: { + market_name: market, + notional_usd: notionalUsd, + side: entrySide, + order_type: orderType, + fee_bps: feeBps, + tx_fee_usd_est: txFeeUsdEst, + expected_reprices_per_entry: expectedReprices, + }, + dlob: best + ? { + size_usd: best.size_usd, + side: best.side, + mid_price: best.mid_price, + vwap_price: best.vwap_price, + impact_bps: best.impact_bps, + fill_pct: best.fill_pct, + updated_at: best.updated_at, + } + : null, + breakdown: { + trade_fee_usd: tradeFeeUsd, + slippage_usd: slippageUsd, + tx_fee_usd: txFeeUsdEst, + expected_modify_usd: modifyCostUsd, + total_usd: totalUsd, + total_bps: totalBps, + breakeven_bps: totalBps, + }, + }, + cfg.corsOrigin + ); + } catch (err) { + sendJson(res, 500, { ok: false, error: String(err?.message || err) }, cfg.corsOrigin); + } + return; + } + + if (req.method === 'POST' && pathname === '/v1/admin/tokens') { + if (!isAdmin(cfg, req)) { + sendJson(res, 401, { ok: false, error: 'admin_unauthorized' }, cfg.corsOrigin); + return; + } + + let body; + try { + body = await readBodyJson(req, { maxBytes: 1024 * 1024 }); + } catch { + sendJson(res, 400, { ok: false, error: 'invalid_json' }, cfg.corsOrigin); + return; + } + + const name = (body?.name || 'algo')?.toString?.().trim(); + if (!name) { + sendJson(res, 400, { ok: false, error: 'missing_name' }, cfg.corsOrigin); + return; + } + const scopes = normalizeScopes(body?.scopes); + const resolvedScopes = scopes.length ? scopes : ['write']; + + try { + const { token, row } = await createApiToken(cfg, name, resolvedScopes, body?.meta); + sendJson(res, 200, { ok: true, token, id: row.id, name: row.name, created_at: row.created_at }, cfg.corsOrigin); + } catch (err) { + sendJson(res, 500, { ok: false, error: String(err?.message || err) }, cfg.corsOrigin); + } + return; + } + + if (req.method === 'POST' && pathname === '/v1/admin/tokens/revoke') { + if (!isAdmin(cfg, req)) { + sendJson(res, 401, { ok: false, error: 'admin_unauthorized' }, cfg.corsOrigin); + return; + } + + let body; + try { + body = await readBodyJson(req, { maxBytes: 1024 * 1024 }); + } catch { + sendJson(res, 400, { ok: false, error: 'invalid_json' }, cfg.corsOrigin); + return; + } + + const id = (body?.id || '')?.toString?.().trim(); + if (!id) { + sendJson(res, 400, { ok: false, error: 'missing_id' }, cfg.corsOrigin); + return; + } + + try { + const revokedId = await revokeApiToken(cfg, id); + sendJson(res, 200, { ok: true, id: revokedId }, cfg.corsOrigin); + } catch (err) { + sendJson(res, 500, { ok: false, error: String(err?.message || err) }, cfg.corsOrigin); + } + return; + } + + sendJson(res, 404, { ok: false, error: 'not_found' }, cfg.corsOrigin); +} + +function main() { + const cfg = resolveConfig(); + const server = http.createServer((req, res) => void handler(cfg, req, res)); + + server.listen(cfg.port, () => { + console.log( + JSON.stringify( + { + service: 'trade-api', + version: cfg.appVersion, + buildTimestamp: cfg.buildTimestamp, + port: cfg.port, + hasuraUrl: cfg.hasuraUrl, + ticksTable: cfg.ticksTable, + candlesFunction: cfg.candlesFunction, + hasuraAdminSecret: cfg.hasuraAdminSecret ? '***' : undefined, + apiAdminSecret: cfg.apiAdminSecret ? '***' : undefined, + }, + null, + 2 + ) + ); + }); +} + +main(); diff --git a/environments/sol/trade-r001-canary/assets/api/wrapper.mjs b/environments/sol/trade-r001-canary/assets/api/wrapper.mjs new file mode 100644 index 0000000..197ead9 --- /dev/null +++ b/environments/sol/trade-r001-canary/assets/api/wrapper.mjs @@ -0,0 +1,783 @@ +import crypto from 'node:crypto'; +import http from 'node:http'; +import { spawn } from 'node:child_process'; + +const WRAPPER_PORT = Number.parseInt(String(process.env.PORT || '8787'), 10); +const UPSTREAM_PORT = Number.parseInt(String(process.env.UPSTREAM_PORT || '8788'), 10); +const UPSTREAM_HOST = String(process.env.UPSTREAM_HOST || '127.0.0.1'); +const UPSTREAM_ENTRY = String(process.env.UPSTREAM_ENTRY || '/app/services/api/server.mjs'); + +const HASURA_URL = String(process.env.HASURA_GRAPHQL_URL || 'http://hasura:8080/v1/graphql'); +const HASURA_ADMIN_SECRET = String(process.env.HASURA_ADMIN_SECRET || ''); +const CORS_ORIGIN = String(process.env.CORS_ORIGIN || '*'); + +if (!Number.isInteger(WRAPPER_PORT) || WRAPPER_PORT <= 0) throw new Error('Invalid PORT'); +if (!Number.isInteger(UPSTREAM_PORT) || UPSTREAM_PORT <= 0) throw new Error('Invalid UPSTREAM_PORT'); + +function getIsoNow() { + return new Date().toISOString(); +} + +function sha256Hex(text) { + return crypto.createHash('sha256').update(text, 'utf8').digest('hex'); +} + +function getHeader(req, name) { + const v = req.headers[String(name).toLowerCase()]; + return Array.isArray(v) ? v[0] : v; +} + +function readBearerToken(req) { + const auth = getHeader(req, 'authorization'); + if (auth && typeof auth === 'string') { + const m = auth.match(/^Bearer\s+(.+)$/i); + if (m && m[1]) return m[1].trim(); + } + const apiKey = getHeader(req, 'x-api-key'); + if (apiKey && typeof apiKey === 'string' && apiKey.trim()) return apiKey.trim(); + return undefined; +} + +function withCors(res) { + res.setHeader('access-control-allow-origin', CORS_ORIGIN); + res.setHeader('access-control-allow-methods', 'GET,POST,OPTIONS'); + res.setHeader( + 'access-control-allow-headers', + 'content-type, authorization, x-api-key, x-admin-secret' + ); +} + +function sendJson(res, status, body) { + withCors(res); + res.statusCode = status; + res.setHeader('content-type', 'application/json; charset=utf-8'); + res.end(JSON.stringify(body)); +} + +async function readBodyJson(req, { maxBytes }) { + const chunks = []; + let total = 0; + for await (const chunk of req) { + total += chunk.length; + if (total > maxBytes) throw new Error('payload_too_large'); + chunks.push(chunk); + } + const text = Buffer.concat(chunks).toString('utf8'); + if (!text.trim()) return {}; + try { + return JSON.parse(text); + } catch { + throw new Error('invalid_json'); + } +} + +function parseNumeric(value) { + if (value == null) return null; + if (typeof value === 'number') return Number.isFinite(value) ? value : null; + if (typeof value === 'string') { + const s = value.trim(); + if (!s) return null; + const n = Number(s); + return Number.isFinite(n) ? n : null; + } + return null; +} + +function clampInt(value, min, max) { + const n = Number.parseInt(String(value), 10); + if (!Number.isFinite(n) || !Number.isInteger(n)) return min; + return Math.min(max, Math.max(min, n)); +} + +function clampNumber(value, min, max, fallback) { + const n = typeof value === 'number' ? value : Number(value); + if (!Number.isFinite(n)) return fallback; + return Math.min(max, Math.max(min, n)); +} + +function isUuid(value) { + const s = String(value || '').trim(); + return /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i.test(s); +} + +async function hasuraRequest(query, variables) { + if (!HASURA_ADMIN_SECRET) throw new Error('Missing HASURA_ADMIN_SECRET'); + const res = await fetch(HASURA_URL, { + method: 'POST', + headers: { + 'content-type': 'application/json', + 'x-hasura-admin-secret': HASURA_ADMIN_SECRET, + }, + body: JSON.stringify({ query, variables }), + }); + const text = await res.text(); + if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`); + const json = text ? JSON.parse(text) : {}; + if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | ')); + return json.data; +} + +function normalizeScopes(value) { + if (!value) return []; + if (Array.isArray(value)) return value.map((v) => String(v)).filter(Boolean); + if (typeof value === 'string') { + return value + .split(',') + .map((s) => s.trim()) + .filter(Boolean); + } + return []; +} + +async function requireValidToken(req, requiredScope) { + const token = readBearerToken(req); + if (!token) return { ok: false, status: 401, error: 'missing_token' }; + + const hash = sha256Hex(token); + const query = ` + query ValidToken($hash: String!) { + api_tokens(where: {token_hash: {_eq: $hash}, revoked_at: {_is_null: true}}, limit: 1) { + id + name + scopes + } + } + `; + + let data; + try { + data = await hasuraRequest(query, { hash }); + } catch (err) { + return { ok: false, status: 500, error: String(err?.message || err) }; + } + + const row = data?.api_tokens?.[0]; + if (!row?.id) return { ok: false, status: 401, error: 'invalid_or_revoked_token' }; + const scopes = normalizeScopes(row.scopes); + if (requiredScope && !scopes.includes(requiredScope)) { + return { ok: false, status: 403, error: 'missing_scope' }; + } + + // best-effort touch + const touch = ` + mutation TouchToken($id: uuid!, $ts: timestamptz!) { + update_api_tokens_by_pk(pk_columns: {id: $id}, _set: {last_used_at: $ts}) { id } + } + `; + hasuraRequest(touch, { id: row.id, ts: getIsoNow() }).catch(() => {}); + + return { ok: true, token: { id: row.id, name: row.name } }; +} + +async function readSolPriceUsd() { + try { + const q = ` + query SolPrice { + dlob_stats_latest(where: {market_name: {_eq: "SOL-PERP"}}, limit: 1) { + mid_price + mark_price + oracle_price + } + } + `; + const data = await hasuraRequest(q, {}); + const row = data?.dlob_stats_latest?.[0]; + const p = parseNumeric(row?.mid_price) ?? parseNumeric(row?.mark_price) ?? parseNumeric(row?.oracle_price); + if (p != null && p > 0) return p; + } catch { + // ignore + } + return null; +} + +function getByPath(obj, pathStr) { + if (!obj || typeof obj !== 'object') return undefined; + const parts = String(pathStr || '').split('.').filter(Boolean); + let cur = obj; + for (const p of parts) { + if (!cur || typeof cur !== 'object') return undefined; + cur = cur[p]; + } + return cur; +} + +function readNumberFromPayload(payload, paths) { + for (const p of paths) { + const v = getByPath(payload, p); + const n = parseNumeric(v); + if (n != null) return n; + } + return null; +} + +function readTextFromPayload(payload, paths) { + for (const p of paths) { + const v = getByPath(payload, p); + if (typeof v === 'string' && v.trim()) return v.trim(); + } + return null; +} + +function inferContractSizeUsd(contract) { + return ( + readNumberFromPayload(contract, [ + 'desired.size_usd', + 'desired.sizeUsd', + 'desired.notional_usd', + 'desired.notionalUsd', + 'entry.size_usd', + 'entry.sizeUsd', + 'entry.notional_usd', + 'entry.notionalUsd', + 'entry.order_intent.size_usd', + 'entry.order_intent.sizeUsd', + 'desired.order_intent.size_usd', + 'desired.order_intent.sizeUsd', + ]) || null + ); +} + +function inferContractSide(contract) { + const raw = + readTextFromPayload(contract, [ + 'desired.side', + 'entry.side', + 'entry.order_intent.side', + 'desired.order_intent.side', + 'desired.direction', + 'entry.direction', + ]) || ''; + const v = raw.toLowerCase(); + if (v === 'long' || v === 'buy') return 'long'; + if (v === 'short' || v === 'sell') return 'short'; + return null; +} + +function sumCostsFromEvents(events) { + const totals = { + tradeFeeUsd: 0, + txFeeUsd: 0, + slippageUsd: 0, + fundingUsd: 0, + realizedPnlUsd: 0, + txCount: 0, + fillCount: 0, + cancelCount: 0, + modifyCount: 0, + errorCount: 0, + }; + + for (const ev of events || []) { + const t = String(ev?.event_type || '').toLowerCase(); + const payload = ev?.payload && typeof ev.payload === 'object' ? ev.payload : {}; + + const tradeFeeUsd = + readNumberFromPayload(payload, [ + 'realized_fee_usd', + 'trade_fee_usd', + 'fee_usd', + 'fees.trade_fee_usd', + 'fees.usd', + ]) || 0; + const txFeeUsd = + readNumberFromPayload(payload, [ + 'realized_tx_usd', + 'tx_fee_usd', + 'network_fee_usd', + 'fees.tx_fee_usd', + 'fees.network_usd', + ]) || 0; + const slippageUsd = + readNumberFromPayload(payload, [ + 'slippage_usd', + 'realized_slippage_usd', + 'execution_usd', + 'realized_execution_usd', + ]) || 0; + const fundingUsd = readNumberFromPayload(payload, ['funding_usd', 'realized_funding_usd']) || 0; + const pnlUsd = readNumberFromPayload(payload, ['realized_pnl_usd', 'pnl_usd']) || 0; + const txCount = readNumberFromPayload(payload, ['tx_count', 'txCount']) || 0; + + totals.tradeFeeUsd += tradeFeeUsd; + totals.txFeeUsd += txFeeUsd; + totals.slippageUsd += slippageUsd; + totals.fundingUsd += fundingUsd; + totals.realizedPnlUsd += pnlUsd; + totals.txCount += txCount; + + if (t.includes('fill')) totals.fillCount += 1; + if (t.includes('cancel')) totals.cancelCount += 1; + if (t.includes('modify') || t.includes('reprice')) totals.modifyCount += 1; + if (t.includes('error') || String(ev?.severity || '').toLowerCase() === 'error') totals.errorCount += 1; + } + + const totalCostsUsd = totals.tradeFeeUsd + totals.txFeeUsd + totals.slippageUsd + totals.fundingUsd; + + return { + ...totals, + totalCostsUsd, + netPnlUsd: totals.realizedPnlUsd - totalCostsUsd, + }; +} + +function buildCostSeriesFromEvents(events, { maxPoints }) { + const points = []; + const totals = { + tradeFeeUsd: 0, + txFeeUsd: 0, + slippageUsd: 0, + fundingUsd: 0, + realizedPnlUsd: 0, + }; + + for (const ev of events || []) { + const ts = ev?.ts; + if (!ts) continue; + const payload = ev?.payload && typeof ev.payload === 'object' ? ev.payload : {}; + + const tradeFeeUsd = + readNumberFromPayload(payload, [ + 'realized_fee_usd', + 'trade_fee_usd', + 'fee_usd', + 'fees.trade_fee_usd', + 'fees.usd', + ]) || 0; + const txFeeUsd = + readNumberFromPayload(payload, [ + 'realized_tx_usd', + 'tx_fee_usd', + 'network_fee_usd', + 'fees.tx_fee_usd', + 'fees.network_usd', + ]) || 0; + const slippageUsd = + readNumberFromPayload(payload, [ + 'slippage_usd', + 'realized_slippage_usd', + 'execution_usd', + 'realized_execution_usd', + ]) || 0; + const fundingUsd = readNumberFromPayload(payload, ['funding_usd', 'realized_funding_usd']) || 0; + const pnlUsd = readNumberFromPayload(payload, ['realized_pnl_usd', 'pnl_usd']) || 0; + + totals.tradeFeeUsd += tradeFeeUsd; + totals.txFeeUsd += txFeeUsd; + totals.slippageUsd += slippageUsd; + totals.fundingUsd += fundingUsd; + totals.realizedPnlUsd += pnlUsd; + + const totalCostsUsd = totals.tradeFeeUsd + totals.txFeeUsd + totals.slippageUsd + totals.fundingUsd; + points.push({ + ts, + tradeFeeUsd: totals.tradeFeeUsd, + txFeeUsd: totals.txFeeUsd, + slippageUsd: totals.slippageUsd, + fundingUsd: totals.fundingUsd, + totalCostsUsd, + realizedPnlUsd: totals.realizedPnlUsd, + netPnlUsd: totals.realizedPnlUsd - totalCostsUsd, + }); + } + + const cap = Math.max(50, Math.min(10_000, Number(maxPoints) || 600)); + if (points.length <= cap) return points; + + const step = Math.ceil(points.length / cap); + const sampled = []; + for (let i = 0; i < points.length; i += step) sampled.push(points[i]); + const last = points[points.length - 1]; + if (sampled[sampled.length - 1] !== last) sampled.push(last); + return sampled; +} + +function proxyToUpstream(req, res) { + const url = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`); + + const headers = { ...req.headers }; + delete headers.host; + delete headers.connection; + + const opts = { + host: UPSTREAM_HOST, + port: UPSTREAM_PORT, + method: req.method, + path: url.pathname + url.search, + headers, + }; + + const upstreamReq = http.request(opts, (upstreamRes) => { + withCors(res); + res.statusCode = upstreamRes.statusCode || 502; + + for (const [k, v] of Object.entries(upstreamRes.headers || {})) { + if (!k) continue; + if (k.toLowerCase() === 'content-length') continue; + if (k.toLowerCase().startsWith('access-control-')) continue; + if (v != null) res.setHeader(k, v); + } + + upstreamRes.pipe(res); + }); + + upstreamReq.on('error', (err) => { + sendJson(res, 502, { ok: false, error: String(err?.message || err) }); + }); + + req.pipe(upstreamReq); +} + +async function handleMonitor(req, res, url) { + const auth = await requireValidToken(req, 'read'); + if (!auth.ok) { + sendJson(res, auth.status, { ok: false, error: auth.error }); + return; + } + + const pathname = url.pathname; + const parts = pathname.split('/').filter(Boolean); + const contractId = parts[2]; + if (!isUuid(contractId)) { + sendJson(res, 400, { ok: false, error: 'invalid_contract_id' }); + return; + } + + const limit = clampInt(url.searchParams.get('eventsLimit') || '2000', 10, 50_000); + const wantSeries = (url.searchParams.get('series') || '').trim() === '1'; + const seriesMax = clampInt(url.searchParams.get('seriesMax') || '600', 50, 10_000); + + const qContract = ` + query ContractByPk($id: uuid!) { + bot_contracts_by_pk(id: $id) { + id + decision_id + bot_id + model_version + market_name + subaccount_id + status + desired + entry + manage + exit + gates + created_at + updated_at + last_heartbeat_at + ended_at + reason + } + } + `; + const qEvents = ` + query ContractEvents($id: uuid!, $limit: Int!) { + bot_events(where: {contract_id: {_eq: $id}}, order_by: {ts: asc}, limit: $limit) { + ts + contract_id + decision_id + bot_id + market_name + event_type + severity + payload + } + } + `; + + try { + const data = await hasuraRequest(qContract, { id: contractId }); + const contract = data?.bot_contracts_by_pk; + if (!contract?.id) { + sendJson(res, 404, { ok: false, error: 'contract_not_found' }); + return; + } + + const evData = await hasuraRequest(qEvents, { id: contractId, limit }); + const events = evData?.bot_events || []; + const costs = sumCostsFromEvents(events); + const series = wantSeries ? buildCostSeriesFromEvents(events, { maxPoints: seriesMax }) : null; + + const sizeUsd = inferContractSizeUsd(contract); + const side = inferContractSide(contract); + + let closeEst = null; + if (contract.market_name && sizeUsd != null) { + const qSlip = ` + query Slippage($market: String!) { + dlob_slippage_latest(where: {market_name: {_eq: $market}}) { + market_name + side + size_usd + mid_price + vwap_price + worst_price + impact_bps + fill_pct + updated_at + } + } + `; + const slipData = await hasuraRequest(qSlip, { market: contract.market_name }); + const rows = slipData?.dlob_slippage_latest || []; + + const pickNearest = (wantedSide) => { + const candidates = rows.filter((r) => String(r.side || '').toLowerCase() === wantedSide); + if (!candidates.length) return null; + let best = null; + let bestD = Infinity; + for (const r of candidates) { + const s = parseNumeric(r.size_usd); + if (s == null) continue; + const d = Math.abs(s - sizeUsd); + if (d < bestD) { + bestD = d; + best = r; + } + } + return best; + }; + + closeEst = { + requestedSizeUsd: sizeUsd, + entrySide: side, + suggestedCloseSide: side === 'long' ? 'sell' : side === 'short' ? 'buy' : null, + buy: pickNearest('buy'), + sell: pickNearest('sell'), + }; + } + + sendJson(res, 200, { + ok: true, + contract, + eventsCount: events.length, + costs, + series, + closeEstimate: closeEst, + }); + } catch (err) { + sendJson(res, 500, { ok: false, error: String(err?.message || err) }); + } +} + +async function handleEstimate(req, res) { + const auth = await requireValidToken(req, 'read'); + if (!auth.ok) { + sendJson(res, auth.status, { ok: false, error: auth.error }); + return; + } + + let body; + try { + body = await readBodyJson(req, { maxBytes: 256 * 1024 }); + } catch (err) { + const msg = String(err?.message || err); + if (msg === 'payload_too_large') { + sendJson(res, 413, { ok: false, error: 'payload_too_large' }); + return; + } + sendJson(res, 400, { ok: false, error: 'invalid_json' }); + return; + } + + const market = String(body?.market_name || body?.market || '').trim(); + if (!market) { + sendJson(res, 400, { ok: false, error: 'missing_market' }); + return; + } + + const notionalUsd = parseNumeric(body?.notional_usd ?? body?.notionalUsd ?? body?.size_usd ?? body?.sizeUsd); + if (!(notionalUsd != null && notionalUsd > 0)) { + sendJson(res, 400, { ok: false, error: 'invalid_notional_usd' }); + return; + } + + const entrySideRaw = String(body?.side || 'long').trim().toLowerCase(); + const entrySide = entrySideRaw === 'short' ? 'short' : 'long'; + + const orderType = String(body?.order_type || body?.orderType || 'market').trim().toLowerCase(); + const isMarket = orderType === 'market' || orderType === 'taker'; + + const takerBps = clampNumber(parseNumeric(body?.fee_taker_bps) ?? 5, 0, 1000, 5); + const makerBps = clampNumber(parseNumeric(body?.fee_maker_bps) ?? 0, -1000, 1000, 0); + const feeBps = isMarket ? takerBps : makerBps; + + let txFeeUsdEst = parseNumeric(body?.tx_fee_usd_est); + if (txFeeUsdEst == null) { + const baseLamports = 5000; + const sigs = 1; + const priorityLamports = 0; + const lamports = baseLamports * sigs + priorityLamports; + const sol = lamports / 1_000_000_000; + const solUsd = await readSolPriceUsd(); + txFeeUsdEst = solUsd != null ? sol * solUsd : 0; + } + txFeeUsdEst = clampNumber(txFeeUsdEst, 0, 100, 0); + + const expectedReprices = clampInt(body?.expected_reprices_per_entry ?? body?.expectedReprices ?? '0', 0, 500); + const modifyTxCount = clampInt(body?.modify_tx_count ?? body?.modifyTxCount ?? '2', 0, 10); + + try { + const qSlip = ` + query Slippage($market: String!) { + dlob_slippage_latest(where: {market_name: {_eq: $market}}) { + market_name + side + size_usd + mid_price + vwap_price + worst_price + impact_bps + fill_pct + updated_at + } + } + `; + const slipData = await hasuraRequest(qSlip, { market }); + const rows = slipData?.dlob_slippage_latest || []; + const wantedSide = entrySide === 'long' ? 'buy' : 'sell'; + const candidates = rows.filter((r) => String(r.side || '').toLowerCase() === wantedSide); + + let best = null; + let bestD = Infinity; + for (const r of candidates) { + const s = parseNumeric(r.size_usd); + if (s == null) continue; + const d = Math.abs(s - notionalUsd); + if (d < bestD) { + bestD = d; + best = r; + } + } + + const impactBps = parseNumeric(best?.impact_bps) ?? 0; + const slippageUsd = (notionalUsd * impactBps) / 10_000; + const tradeFeeUsd = (notionalUsd * feeBps) / 10_000; + const modifyCostUsd = expectedReprices * modifyTxCount * txFeeUsdEst; + const totalUsd = tradeFeeUsd + slippageUsd + txFeeUsdEst + modifyCostUsd; + const totalBps = (totalUsd / notionalUsd) * 10_000; + + sendJson(res, 200, { + ok: true, + input: { + market_name: market, + notional_usd: notionalUsd, + side: entrySide, + order_type: orderType, + fee_bps: feeBps, + tx_fee_usd_est: txFeeUsdEst, + expected_reprices_per_entry: expectedReprices, + }, + dlob: best + ? { + size_usd: best.size_usd, + side: best.side, + mid_price: best.mid_price, + vwap_price: best.vwap_price, + impact_bps: best.impact_bps, + fill_pct: best.fill_pct, + updated_at: best.updated_at, + } + : null, + breakdown: { + trade_fee_usd: tradeFeeUsd, + slippage_usd: slippageUsd, + tx_fee_usd: txFeeUsdEst, + expected_modify_usd: modifyCostUsd, + total_usd: totalUsd, + total_bps: totalBps, + breakeven_bps: totalBps, + }, + }); + } catch (err) { + sendJson(res, 500, { ok: false, error: String(err?.message || err) }); + } +} + +let upstreamChild = null; + +function startUpstream() { + const env = { ...process.env, PORT: String(UPSTREAM_PORT) }; + upstreamChild = spawn('node', [UPSTREAM_ENTRY], { env, stdio: 'inherit' }); + upstreamChild.on('exit', (code, signal) => { + console.error(`upstream exited: code=${code} signal=${signal}`); + }); +} + +function shutdown() { + if (upstreamChild && !upstreamChild.killed) upstreamChild.kill('SIGTERM'); +} + +process.on('SIGTERM', shutdown); +process.on('SIGINT', shutdown); + +startUpstream(); + +const server = http.createServer(async (req, res) => { + if (req.method === 'OPTIONS') { + withCors(res); + res.statusCode = 204; + res.end(); + return; + } + + const url = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`); + const pathname = url.pathname; + + if (req.method === 'GET' && pathname === '/healthz') { + // Check upstream quickly; if it's down, we fail readiness. + const opts = { host: UPSTREAM_HOST, port: UPSTREAM_PORT, path: '/healthz', method: 'GET', timeout: 800 }; + const upstreamOk = await new Promise((resolve) => { + const r = http.request(opts, (rr) => { + rr.resume(); + resolve(rr.statusCode === 200); + }); + r.on('timeout', () => { + r.destroy(); + resolve(false); + }); + r.on('error', () => resolve(false)); + r.end(); + }); + + if (!upstreamOk) { + sendJson(res, 503, { ok: false, error: 'upstream_not_ready' }); + return; + } + + sendJson(res, 200, { + ok: true, + service: 'trade-api-wrapper', + startedAt: getIsoNow(), + upstream: { host: UPSTREAM_HOST, port: UPSTREAM_PORT }, + }); + return; + } + + if (req.method === 'POST' && pathname === '/v1/contracts/costs/estimate') { + await handleEstimate(req, res); + return; + } + + if (req.method === 'GET' && pathname.startsWith('/v1/contracts/') && pathname.endsWith('/monitor')) { + await handleMonitor(req, res, url); + return; + } + + proxyToUpstream(req, res); +}); + +server.listen(WRAPPER_PORT, () => { + console.log( + JSON.stringify( + { + service: 'trade-api-wrapper', + port: WRAPPER_PORT, + upstream: { entry: UPSTREAM_ENTRY, host: UPSTREAM_HOST, port: UPSTREAM_PORT }, + hasuraUrl: HASURA_URL, + hasuraAdminSecret: HASURA_ADMIN_SECRET ? '***' : undefined, + }, + null, + 2 + ) + ); +}); diff --git a/environments/sol/trade-r001-canary/assets/hasura/hasura-bootstrap.mjs b/environments/sol/trade-r001-canary/assets/hasura/hasura-bootstrap.mjs new file mode 100644 index 0000000..2c09cf4 --- /dev/null +++ b/environments/sol/trade-r001-canary/assets/hasura/hasura-bootstrap.mjs @@ -0,0 +1,498 @@ +const HASURA_URL = process.env.HASURA_URL || 'http://hasura:8080'; +const ADMIN_SECRET = process.env.HASURA_ADMIN_SECRET || 'devsecret'; +const TARGET_TICKS_TABLE = process.env.TICKS_TABLE; +const TARGET_CANDLES_FUNCTION = process.env.CANDLES_FUNCTION; + +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); + +async function httpJson(url, body) { + const res = await fetch(url, { + method: 'POST', + headers: { + 'content-type': 'application/json', + 'x-hasura-admin-secret': ADMIN_SECRET, + }, + body: JSON.stringify(body), + }); + const text = await res.text(); + let json; + try { + json = JSON.parse(text); + } catch { + json = { raw: text }; + } + return { ok: res.ok, status: res.status, json, text }; +} + +async function waitForHasura() { + const healthUrl = `${HASURA_URL.replace(/\/$/, '')}/healthz`; + for (let i = 0; i < 60; i++) { + try { + const res = await fetch(healthUrl); + if (res.ok) return; + } catch { + // ignore + } + await sleep(1000); + } + throw new Error(`Hasura not healthy at ${healthUrl}`); +} + +function isAlreadyExistsError(errText) { + const t = String(errText || '').toLowerCase(); + return ( + t.includes('already tracked') || + t.includes('already exists') || + t.includes('already present') || + t.includes('already been tracked') || + (t.includes('permission') && t.includes('already exists')) + ); +} + +function isMissingRelationError(errText) { + const t = String(errText || '').toLowerCase(); + return t.includes('does not exist') || t.includes('not found') || t.includes('not tracked') || t.includes('already untracked'); +} + +function normalizeName(name) { + const v = String(name || '') + .trim() + .toLowerCase(); + if (!v) return undefined; + if (!/^[a-z][a-z0-9_]*$/.test(v)) throw new Error(`Invalid name: ${name}`); + return v; +} + +async function metadata(op) { + const url = `${HASURA_URL.replace(/\/$/, '')}/v1/metadata`; + const res = await httpJson(url, op); + if (res.ok) return { status: 'ok', res }; + + const errText = res.json?.error || res.text; + if (isAlreadyExistsError(errText)) return { status: 'skip', res }; + + throw new Error(`Hasura metadata error (${res.status}): ${JSON.stringify(res.json)}`); +} + +async function metadataIgnore(op) { + const url = `${HASURA_URL.replace(/\/$/, '')}/v1/metadata`; + const res = await httpJson(url, op); + if (res.ok) return { status: 'ok', res }; + + const errText = res.json?.error || res.text; + if (isAlreadyExistsError(errText) || isMissingRelationError(errText)) return { status: 'skip', res }; + + throw new Error(`Hasura metadata error (${res.status}): ${JSON.stringify(res.json)}`); +} + +async function main() { + console.log(`[hasura-bootstrap] HASURA_URL=${HASURA_URL}`); + await waitForHasura(); + + const PUBLIC_DLOB_SOURCE_HEADER = 'X-Hasura-Dlob-Source'; + + const apiTokensTable = { schema: 'public', name: 'api_tokens' }; + const botConfigTable = { schema: 'public', name: 'bot_config' }; + const botStateTable = { schema: 'public', name: 'bot_state' }; + const botEventsTable = { schema: 'public', name: 'bot_events' }; + const source = 'default'; + + const baseTicks = { schema: 'public', name: 'drift_ticks' }; + const dlobL2LatestTable = { schema: 'public', name: 'dlob_l2_latest' }; + const dlobStatsLatestTable = { schema: 'public', name: 'dlob_stats_latest' }; + const dlobDepthBpsLatestTable = { schema: 'public', name: 'dlob_depth_bps_latest' }; + const dlobSlippageLatestTable = { schema: 'public', name: 'dlob_slippage_latest' }; + const dlobSlippageLatestV2Table = { schema: 'public', name: 'dlob_slippage_latest_v2' }; + const candlesCacheTable = { schema: 'public', name: 'drift_candles_cache' }; + const dlobStatsTsTable = { schema: 'public', name: 'dlob_stats_ts' }; + const dlobDepthBpsTsTable = { schema: 'public', name: 'dlob_depth_bps_ts' }; + const dlobSlippageTsTable = { schema: 'public', name: 'dlob_slippage_ts' }; + const dlobSlippageTsV2Table = { schema: 'public', name: 'dlob_slippage_ts_v2' }; + const baseCandlesFn = { schema: 'public', name: 'get_drift_candles' }; + const candlesReturnTable = { schema: 'public', name: 'drift_candles' }; + + const extraTicksName = normalizeName(TARGET_TICKS_TABLE); + const extraCandlesName = normalizeName(TARGET_CANDLES_FUNCTION); + + const tickTables = [baseTicks]; + if (extraTicksName && extraTicksName !== baseTicks.name) { + tickTables.push({ schema: 'public', name: extraTicksName }); + } + + const candleFns = [baseCandlesFn]; + if (extraCandlesName && extraCandlesName !== baseCandlesFn.name) { + candleFns.push({ schema: 'public', name: extraCandlesName }); + } + + const ensureTickTable = async (table) => { + await metadata({ type: 'pg_track_table', args: { source, table } }); + + // Ensure latest permission definition (drop+create avoids stale column sets). + await metadataIgnore({ type: 'pg_drop_select_permission', args: { source, table, role: 'public' } }); + await metadata({ + type: 'pg_create_select_permission', + args: { + source, + table, + role: 'public', + permission: { + columns: ['ts', 'market_index', 'symbol', 'oracle_price', 'mark_price', 'oracle_slot', 'source'], + filter: {}, + }, + }, + }); + + await metadataIgnore({ type: 'pg_drop_insert_permission', args: { source, table, role: 'ingestor' } }); + await metadata({ + type: 'pg_create_insert_permission', + args: { + source, + table, + role: 'ingestor', + permission: { + check: {}, + set: {}, + columns: ['ts', 'market_index', 'symbol', 'oracle_price', 'mark_price', 'oracle_slot', 'source', 'raw'], + }, + }, + }); + + await metadataIgnore({ type: 'pg_drop_update_permission', args: { source, table, role: 'ingestor' } }); + await metadata({ + type: 'pg_create_update_permission', + args: { + source, + table, + role: 'ingestor', + permission: { + filter: {}, + check: {}, + columns: ['oracle_price', 'mark_price', 'oracle_slot', 'source', 'raw'], + }, + }, + }); + }; + + for (const t of tickTables) { + await ensureTickTable(t); + } + + // Cached candles table (precomputed by worker; public read). + await ensurePublicSelectTable(candlesCacheTable, [ + 'bucket', + 'bucket_seconds', + 'symbol', + 'source', + 'open', + 'high', + 'low', + 'close', + 'oracle_close', + 'ticks', + 'updated_at', + ]); + + const ensureDlobTable = async (table, columns, { publicFilter } = {}) => { + await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } }); + await metadata({ type: 'pg_track_table', args: { source, table } }); + + await metadataIgnore({ type: 'pg_drop_select_permission', args: { source, table, role: 'public' } }); + await metadata({ + type: 'pg_create_select_permission', + args: { + source, + table, + role: 'public', + permission: { + columns, + filter: publicFilter || {}, + }, + }, + }); + + await metadataIgnore({ type: 'pg_drop_insert_permission', args: { source, table, role: 'ingestor' } }); + await metadata({ + type: 'pg_create_insert_permission', + args: { + source, + table, + role: 'ingestor', + permission: { + check: {}, + set: {}, + columns, + }, + }, + }); + + await metadataIgnore({ type: 'pg_drop_update_permission', args: { source, table, role: 'ingestor' } }); + await metadata({ + type: 'pg_create_update_permission', + args: { + source, + table, + role: 'ingestor', + permission: { + filter: {}, + check: {}, + columns, + }, + }, + }); + }; + + async function ensurePublicSelectTable(table, columns, { publicFilter } = {}) { + await metadataIgnore({ type: 'pg_untrack_table', args: { source, table } }); + await metadata({ type: 'pg_track_table', args: { source, table } }); + + await metadataIgnore({ type: 'pg_drop_select_permission', args: { source, table, role: 'public' } }); + await metadata({ + type: 'pg_create_select_permission', + args: { + source, + table, + role: 'public', + permission: { + columns, + filter: publicFilter || {}, + }, + }, + }); + + // Computed/archived tables are written by workers using admin secret; keep ingestor off by default. + await metadataIgnore({ type: 'pg_drop_insert_permission', args: { source, table, role: 'ingestor' } }); + await metadataIgnore({ type: 'pg_drop_update_permission', args: { source, table, role: 'ingestor' } }); + } + + const dlobPublicFilter = { source: { _eq: PUBLIC_DLOB_SOURCE_HEADER } }; + + await ensureDlobTable(dlobL2LatestTable, [ + 'source', + 'market_name', + 'market_type', + 'market_index', + 'ts', + 'slot', + 'mark_price', + 'oracle_price', + 'best_bid_price', + 'best_ask_price', + 'bids', + 'asks', + 'raw', + 'updated_at', + ], { publicFilter: dlobPublicFilter }); + + await ensureDlobTable(dlobStatsLatestTable, [ + 'source', + 'market_name', + 'market_type', + 'market_index', + 'ts', + 'slot', + 'mark_price', + 'oracle_price', + 'best_bid_price', + 'best_ask_price', + 'mid_price', + 'spread_abs', + 'spread_bps', + 'depth_levels', + 'depth_bid_base', + 'depth_ask_base', + 'depth_bid_usd', + 'depth_ask_usd', + 'imbalance', + 'raw', + 'updated_at', + ], { publicFilter: dlobPublicFilter }); + + await ensurePublicSelectTable(dlobDepthBpsLatestTable, [ + 'source', + 'market_name', + 'band_bps', + 'market_type', + 'market_index', + 'ts', + 'slot', + 'mid_price', + 'best_bid_price', + 'best_ask_price', + 'bid_base', + 'ask_base', + 'bid_usd', + 'ask_usd', + 'imbalance', + 'raw', + 'updated_at', + ], { publicFilter: dlobPublicFilter }); + + await ensurePublicSelectTable(dlobSlippageLatestTable, [ + 'source', + 'market_name', + 'side', + 'size_usd', + 'market_type', + 'market_index', + 'ts', + 'slot', + 'mid_price', + 'best_bid_price', + 'best_ask_price', + 'vwap_price', + 'worst_price', + 'filled_usd', + 'filled_base', + 'impact_bps', + 'levels_consumed', + 'fill_pct', + 'raw', + 'updated_at', + ], { publicFilter: dlobPublicFilter }); + + await ensurePublicSelectTable(dlobSlippageLatestV2Table, [ + 'source', + 'market_name', + 'side', + 'size_usd', + 'market_type', + 'market_index', + 'ts', + 'slot', + 'mid_price', + 'best_bid_price', + 'best_ask_price', + 'vwap_price', + 'worst_price', + 'filled_usd', + 'filled_base', + 'impact_bps', + 'levels_consumed', + 'fill_pct', + 'raw', + 'updated_at', + ], { publicFilter: dlobPublicFilter }); + + await ensurePublicSelectTable(dlobStatsTsTable, [ + 'ts', + 'id', + 'source', + 'market_name', + 'market_type', + 'market_index', + 'source_ts', + 'slot', + 'mark_price', + 'oracle_price', + 'best_bid_price', + 'best_ask_price', + 'mid_price', + 'spread_abs', + 'spread_bps', + 'depth_levels', + 'depth_bid_base', + 'depth_ask_base', + 'depth_bid_usd', + 'depth_ask_usd', + 'imbalance', + 'raw', + ], { publicFilter: dlobPublicFilter }); + + await ensurePublicSelectTable(dlobDepthBpsTsTable, [ + 'ts', + 'id', + 'source', + 'market_name', + 'band_bps', + 'market_type', + 'market_index', + 'source_ts', + 'slot', + 'mid_price', + 'best_bid_price', + 'best_ask_price', + 'bid_base', + 'ask_base', + 'bid_usd', + 'ask_usd', + 'imbalance', + 'raw', + ], { publicFilter: dlobPublicFilter }); + + await ensurePublicSelectTable(dlobSlippageTsTable, [ + 'ts', + 'id', + 'source', + 'market_name', + 'side', + 'size_usd', + 'market_type', + 'market_index', + 'source_ts', + 'slot', + 'mid_price', + 'vwap_price', + 'worst_price', + 'filled_usd', + 'filled_base', + 'impact_bps', + 'levels_consumed', + 'fill_pct', + 'raw', + ], { publicFilter: dlobPublicFilter }); + + await ensurePublicSelectTable(dlobSlippageTsV2Table, [ + 'ts', + 'id', + 'source', + 'market_name', + 'side', + 'size_usd', + 'market_type', + 'market_index', + 'source_ts', + 'slot', + 'mid_price', + 'vwap_price', + 'worst_price', + 'filled_usd', + 'filled_base', + 'impact_bps', + 'levels_consumed', + 'fill_pct', + 'raw', + ], { publicFilter: dlobPublicFilter }); + + // Bot control-plane tables (tracked; permissions are intentionally not created here by default). + for (const t of [botConfigTable, botStateTable, botEventsTable]) { + await metadataIgnore({ type: 'pg_untrack_table', args: { source, table: t } }); + await metadata({ type: 'pg_track_table', args: { source, table: t } }); + } + + // Return table type for candle functions (needed for Hasura to track the function). + await metadataIgnore({ type: 'pg_track_table', args: { source, table: candlesReturnTable } }); + + try { + await metadata({ type: 'pg_track_table', args: { source, table: apiTokensTable } }); + } catch (err) { + const msg = String(err?.message || err); + if (msg.toLowerCase().includes('api_tokens') && isMissingRelationError(msg)) { + console.log('[hasura-bootstrap] api_tokens missing (run initdb SQL to create it); skipping track'); + } else { + throw err; + } + } + + for (const fn of candleFns) { + // Function for aggregated candle queries (used by trade-api). + await metadataIgnore({ type: 'pg_untrack_function', args: { source, function: fn } }); + await metadata({ type: 'pg_track_function', args: { source, function: fn } }); + } + + console.log('[hasura-bootstrap] ok'); +} + +main().catch((err) => { + console.error(String(err?.stack || err)); + process.exitCode = 1; +}); diff --git a/environments/sol/trade-r001-canary/assets/postgres/001_init.sql b/environments/sol/trade-r001-canary/assets/postgres/001_init.sql new file mode 100644 index 0000000..d7a2b32 --- /dev/null +++ b/environments/sol/trade-r001-canary/assets/postgres/001_init.sql @@ -0,0 +1,860 @@ +CREATE EXTENSION IF NOT EXISTS timescaledb; +CREATE EXTENSION IF NOT EXISTS pgcrypto; + +-- `drift_ticks` is an append-only tick log. +-- +-- TimescaleDB hypertables require every UNIQUE index / PRIMARY KEY to include the partitioning column (`ts`). +-- Therefore we use a composite primary key (ts, id) instead of PRIMARY KEY(id). +CREATE TABLE IF NOT EXISTS drift_ticks ( + ts TIMESTAMPTZ NOT NULL, + id BIGSERIAL NOT NULL, + market_index INTEGER NOT NULL, + symbol TEXT NOT NULL, + oracle_price NUMERIC NOT NULL, + mark_price NUMERIC, + oracle_slot BIGINT, + source TEXT NOT NULL DEFAULT 'drift_oracle', + raw JSONB, + inserted_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (ts, id) +); + +-- Schema upgrades (idempotent for existing volumes) +ALTER TABLE drift_ticks ADD COLUMN IF NOT EXISTS mark_price NUMERIC; +ALTER TABLE drift_ticks ADD COLUMN IF NOT EXISTS id BIGSERIAL; + +-- Migrate price columns to NUMERIC (Hasura `numeric` scalar returns strings; see app code). +DO $$ +DECLARE + oracle_type text; + mark_type text; +BEGIN + SELECT data_type INTO oracle_type + FROM information_schema.columns + WHERE table_schema='public' AND table_name='drift_ticks' AND column_name='oracle_price' + LIMIT 1; + + IF oracle_type IS NOT NULL AND oracle_type <> 'numeric' THEN + EXECUTE 'ALTER TABLE public.drift_ticks ALTER COLUMN oracle_price TYPE numeric USING oracle_price::numeric'; + END IF; + + SELECT data_type INTO mark_type + FROM information_schema.columns + WHERE table_schema='public' AND table_name='drift_ticks' AND column_name='mark_price' + LIMIT 1; + + IF mark_type IS NOT NULL AND mark_type <> 'numeric' THEN + EXECUTE 'ALTER TABLE public.drift_ticks ALTER COLUMN mark_price TYPE numeric USING mark_price::numeric'; + END IF; +END $$; + +-- Ensure PRIMARY KEY is (ts, id) (Timescale hypertables require partition column in any UNIQUE/PK). +-- IMPORTANT: keep this idempotent so we can run migrations while the ingestor keeps writing ticks. +DO $$ +DECLARE + pk_name text; + pk_cols text[]; +BEGIN + SELECT + con.conname, + array_agg(att.attname ORDER BY ord.ordinality) + INTO pk_name, pk_cols + FROM pg_constraint con + JOIN pg_class rel ON rel.oid = con.conrelid + JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace + JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true + JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum + WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'drift_ticks' + GROUP BY con.conname; + + IF pk_name IS NULL THEN + EXECUTE 'ALTER TABLE public.drift_ticks ADD CONSTRAINT drift_ticks_pkey PRIMARY KEY (ts, id)'; + ELSIF pk_cols <> ARRAY['ts','id'] THEN + EXECUTE format('ALTER TABLE public.drift_ticks DROP CONSTRAINT %I', pk_name); + EXECUTE 'ALTER TABLE public.drift_ticks ADD CONSTRAINT drift_ticks_pkey PRIMARY KEY (ts, id)'; + END IF; +END $$; + +-- Convert to hypertable (migrate existing rows if any). +SELECT create_hypertable('drift_ticks', 'ts', if_not_exists => TRUE, migrate_data => TRUE); + +-- Historical note: earlier versions used a UNIQUE(market_index, ts) upsert model with ts rounded to seconds. +-- For "full ticks" (ms precision + multiple sources), we keep drift_ticks as an append-only event log. +ALTER TABLE drift_ticks DROP CONSTRAINT IF EXISTS drift_ticks_market_ts_unique; + +CREATE INDEX IF NOT EXISTS drift_ticks_market_ts_desc_idx + ON drift_ticks (market_index, ts DESC); + +CREATE INDEX IF NOT EXISTS drift_ticks_symbol_ts_desc_idx + ON drift_ticks (symbol, ts DESC); + +CREATE INDEX IF NOT EXISTS drift_ticks_market_source_ts_desc_idx + ON drift_ticks (market_index, source, ts DESC); + +CREATE INDEX IF NOT EXISTS drift_ticks_symbol_source_ts_desc_idx + ON drift_ticks (symbol, source, ts DESC); + +-- Revocable API tokens for external algs (store only hashes, never raw tokens). +CREATE TABLE IF NOT EXISTS api_tokens ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name TEXT NOT NULL, + token_hash TEXT NOT NULL UNIQUE, + scopes TEXT[] NOT NULL DEFAULT ARRAY[]::text[], + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + revoked_at TIMESTAMPTZ, + last_used_at TIMESTAMPTZ, + meta JSONB +); + +ALTER TABLE api_tokens ADD COLUMN IF NOT EXISTS scopes TEXT[] NOT NULL DEFAULT ARRAY[]::text[]; + +CREATE INDEX IF NOT EXISTS api_tokens_revoked_at_idx + ON api_tokens (revoked_at); + +-- Bot control-plane (desired state) + executor telemetry (state/events). +-- +-- MVP intent: +-- - `bot_config`: desired state (mode/market/limits/kill switch) +-- - `bot_state`: heartbeat + last error/action +-- - `bot_events`: append-only audit log +CREATE TABLE IF NOT EXISTS public.bot_config ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name TEXT NOT NULL UNIQUE, + market_name TEXT NOT NULL, + market_type TEXT NOT NULL DEFAULT 'perp', + mode TEXT NOT NULL DEFAULT 'off', + kill_switch BOOLEAN NOT NULL DEFAULT FALSE, + params JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS bot_config_updated_at_desc_idx + ON public.bot_config (updated_at DESC); + +CREATE TABLE IF NOT EXISTS public.bot_state ( + bot_id UUID PRIMARY KEY REFERENCES public.bot_config(id) ON DELETE CASCADE, + last_heartbeat_at TIMESTAMPTZ, + last_action_at TIMESTAMPTZ, + last_error TEXT, + state JSONB, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS bot_state_updated_at_desc_idx + ON public.bot_state (updated_at DESC); + +CREATE TABLE IF NOT EXISTS public.bot_events ( + id BIGSERIAL PRIMARY KEY, + bot_id UUID NOT NULL REFERENCES public.bot_config(id) ON DELETE CASCADE, + ts TIMESTAMPTZ NOT NULL DEFAULT now(), + type TEXT NOT NULL, + payload JSONB +); + +CREATE INDEX IF NOT EXISTS bot_events_bot_ts_desc_idx + ON public.bot_events (bot_id, ts DESC); + +-- Compute OHLC candles from `drift_ticks` for a symbol and bucket size. +-- Exposed via Hasura (track function) and used by trade-api to compute indicators server-side. +-- Hasura tracks functions only if they return SETOF a table/view type. +-- This table is used purely as the return type for candle functions. +CREATE TABLE IF NOT EXISTS public.drift_candles ( + bucket timestamptz, + open numeric, + high numeric, + low numeric, + close numeric, + oracle_open numeric, + oracle_high numeric, + oracle_low numeric, + oracle_close numeric, + ticks bigint +); + +ALTER TABLE public.drift_candles ADD COLUMN IF NOT EXISTS oracle_open numeric; +ALTER TABLE public.drift_candles ADD COLUMN IF NOT EXISTS oracle_high numeric; +ALTER TABLE public.drift_candles ADD COLUMN IF NOT EXISTS oracle_low numeric; + +-- Precomputed candle cache (materialized by a worker). +-- Purpose: make tf switching instant by reading ready-made candles instead of aggregating `drift_ticks` on demand. +-- NOTE: `source=''` means "any source" (no source filter). +CREATE TABLE IF NOT EXISTS public.drift_candles_cache ( + bucket timestamptz NOT NULL, + bucket_seconds integer NOT NULL, + symbol text NOT NULL, + source text NOT NULL DEFAULT '', + open numeric NOT NULL, + high numeric NOT NULL, + low numeric NOT NULL, + close numeric NOT NULL, + oracle_open numeric, + oracle_high numeric, + oracle_low numeric, + oracle_close numeric, + ticks bigint NOT NULL DEFAULT 0, + updated_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (bucket, bucket_seconds, symbol, source) +); + +ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_open numeric; +ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_high numeric; +ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_low numeric; + +SELECT create_hypertable('drift_candles_cache', 'bucket', if_not_exists => TRUE, migrate_data => TRUE); + +CREATE INDEX IF NOT EXISTS drift_candles_cache_symbol_source_bucket_idx + ON public.drift_candles_cache (symbol, source, bucket_seconds, bucket DESC); + +-- If an older version of the function exists with an incompatible return type, +-- CREATE OR REPLACE will fail. Drop the old signature first (safe/idempotent). +DROP FUNCTION IF EXISTS public.get_drift_candles(text, integer, integer, text); + +CREATE OR REPLACE FUNCTION public.get_drift_candles( + p_symbol text, + p_bucket_seconds integer, + p_limit integer DEFAULT 500, + p_source text DEFAULT NULL +) +RETURNS SETOF public.drift_candles +LANGUAGE sql +STABLE +AS $$ + -- Zwraca zawsze "ciągłe" buckety (fill forward), nawet jeśli nie było ticków w danej sekundzie/minucie. + -- Dzięki temu frontend może rysować regularną oś czasu (np. 1px = 1s) bez dziwnych przeskoków. + WITH src AS ( + SELECT COALESCE(p_source, '') AS source_key + ), + raw_cached AS ( + SELECT + c.bucket, + c.open, + c.high, + c.low, + c.close, + c.oracle_open, + c.oracle_high, + c.oracle_low, + c.oracle_close, + c.ticks + FROM public.drift_candles_cache c, src + WHERE c.symbol = p_symbol + AND c.bucket_seconds = p_bucket_seconds + AND c.source = src.source_key + ORDER BY c.bucket DESC + LIMIT p_limit + ), + raw_fallback AS ( + SELECT + time_bucket(make_interval(secs => p_bucket_seconds), ts) AS bucket, + ts, + COALESCE(mark_price, oracle_price) AS px, + oracle_price AS oracle_px + FROM public.drift_ticks, src + WHERE symbol = p_symbol + AND (src.source_key = '' OR source = src.source_key) + AND ts >= now() - make_interval(secs => (p_bucket_seconds * p_limit * 2)) + ), + computed AS ( + SELECT + bucket, + (array_agg(px ORDER BY ts ASC))[1] AS open, + max(px) AS high, + min(px) AS low, + (array_agg(px ORDER BY ts DESC))[1] AS close, + (array_agg(oracle_px ORDER BY ts ASC))[1] AS oracle_open, + max(oracle_px) AS oracle_high, + min(oracle_px) AS oracle_low, + (array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close, + count(*) AS ticks + FROM raw_fallback + GROUP BY bucket + ), + data AS ( + SELECT * FROM raw_cached + UNION ALL + SELECT * FROM computed + WHERE NOT EXISTS (SELECT 1 FROM raw_cached) + ), + bounds AS ( + SELECT max(bucket) AS end_bucket FROM data + ), + params AS ( + SELECT + make_interval(secs => p_bucket_seconds) AS step, + make_interval(secs => (p_bucket_seconds * (p_limit - 1))) AS span + ), + series AS ( + SELECT generate_series( + bounds.end_bucket - params.span, + bounds.end_bucket, + params.step + ) AS bucket + FROM bounds, params + WHERE bounds.end_bucket IS NOT NULL + ), + joined AS ( + SELECT + s.bucket, + d.open, + d.high, + d.low, + d.close, + d.oracle_open, + d.oracle_high, + d.oracle_low, + d.oracle_close, + d.ticks + FROM series s + LEFT JOIN data d USING (bucket) + ORDER BY s.bucket ASC + ), + grouped AS ( + SELECT + *, + sum(CASE WHEN close IS NOT NULL THEN 1 ELSE 0 END) OVER (ORDER BY bucket ASC) AS grp_close, + sum(CASE WHEN oracle_close IS NOT NULL THEN 1 ELSE 0 END) OVER (ORDER BY bucket ASC) AS grp_oracle + FROM joined + ), + first_vals AS ( + SELECT + (SELECT close FROM grouped WHERE close IS NOT NULL ORDER BY bucket ASC LIMIT 1) AS first_close, + (SELECT oracle_close FROM grouped WHERE oracle_close IS NOT NULL ORDER BY bucket ASC LIMIT 1) AS first_oracle + ), + ff AS ( + SELECT + g.bucket, + g.open, + g.high, + g.low, + g.close, + g.oracle_open, + g.oracle_high, + g.oracle_low, + g.oracle_close, + g.ticks, + COALESCE( + g.close, + max(g.close) OVER (PARTITION BY g.grp_close), + f.first_close + ) AS ff_close, + COALESCE( + g.oracle_close, + max(g.oracle_close) OVER (PARTITION BY g.grp_oracle), + f.first_oracle + ) AS ff_oracle + FROM grouped g + CROSS JOIN first_vals f + ) + SELECT + bucket, + COALESCE(open, ff_close) AS open, + COALESCE(high, ff_close) AS high, + COALESCE(low, ff_close) AS low, + COALESCE(close, ff_close) AS close, + COALESCE(oracle_open, ff_oracle) AS oracle_open, + COALESCE(oracle_high, ff_oracle) AS oracle_high, + COALESCE(oracle_low, ff_oracle) AS oracle_low, + COALESCE(oracle_close, ff_oracle) AS oracle_close, + COALESCE(ticks, 0) AS ticks + FROM ff + ORDER BY bucket DESC + LIMIT p_limit; +$$; + +-- Latest DLOB orderbook snapshots (top-N levels), per market. +-- Filled by a VPS worker (collector) and consumed by the UI via Hasura subscriptions. +CREATE TABLE IF NOT EXISTS public.dlob_l2_latest ( + source TEXT NOT NULL DEFAULT 'mevnode', + market_name TEXT NOT NULL, + market_type TEXT NOT NULL DEFAULT 'perp', + market_index INTEGER, + ts BIGINT, + slot BIGINT, + mark_price NUMERIC, + oracle_price NUMERIC, + best_bid_price NUMERIC, + best_ask_price NUMERIC, + bids JSONB, + asks JSONB, + raw JSONB, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (source, market_name) +); + +-- Schema upgrades (idempotent for existing volumes) +ALTER TABLE public.dlob_l2_latest ADD COLUMN IF NOT EXISTS source TEXT; +ALTER TABLE public.dlob_l2_latest ALTER COLUMN source SET DEFAULT 'mevnode'; +UPDATE public.dlob_l2_latest SET source = 'mevnode' WHERE source IS NULL; +ALTER TABLE public.dlob_l2_latest ALTER COLUMN source SET NOT NULL; + +-- Ensure PRIMARY KEY is (source, market_name) (required to keep 2 sources in parallel). +DO $$ +DECLARE + pk_name text; + pk_cols text[]; +BEGIN + SELECT + con.conname, + array_agg(att.attname ORDER BY ord.ordinality) + INTO pk_name, pk_cols + FROM pg_constraint con + JOIN pg_class rel ON rel.oid = con.conrelid + JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace + JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true + JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum + WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_l2_latest' + GROUP BY con.conname; + + IF pk_name IS NULL THEN + EXECUTE 'ALTER TABLE public.dlob_l2_latest ADD CONSTRAINT dlob_l2_latest_pkey PRIMARY KEY (source, market_name)'; + ELSIF pk_cols <> ARRAY['source','market_name'] THEN + EXECUTE format('ALTER TABLE public.dlob_l2_latest DROP CONSTRAINT %I', pk_name); + EXECUTE 'ALTER TABLE public.dlob_l2_latest ADD CONSTRAINT dlob_l2_latest_pkey PRIMARY KEY (source, market_name)'; + END IF; +END $$; + +CREATE INDEX IF NOT EXISTS dlob_l2_latest_updated_at_idx + ON public.dlob_l2_latest (updated_at DESC); + +CREATE INDEX IF NOT EXISTS dlob_l2_latest_source_updated_at_idx + ON public.dlob_l2_latest (source, updated_at DESC); + +-- Derived stats for fast UI display. +CREATE TABLE IF NOT EXISTS public.dlob_stats_latest ( + source TEXT NOT NULL DEFAULT 'mevnode', + market_name TEXT NOT NULL, + market_type TEXT NOT NULL DEFAULT 'perp', + market_index INTEGER, + ts BIGINT, + slot BIGINT, + mark_price NUMERIC, + oracle_price NUMERIC, + best_bid_price NUMERIC, + best_ask_price NUMERIC, + mid_price NUMERIC, + spread_abs NUMERIC, + spread_bps NUMERIC, + depth_levels INTEGER, + depth_bid_base NUMERIC, + depth_ask_base NUMERIC, + depth_bid_usd NUMERIC, + depth_ask_usd NUMERIC, + imbalance NUMERIC, + raw JSONB, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (source, market_name) +); + +-- Schema upgrades (idempotent for existing volumes) +ALTER TABLE public.dlob_stats_latest ADD COLUMN IF NOT EXISTS source TEXT; +ALTER TABLE public.dlob_stats_latest ALTER COLUMN source SET DEFAULT 'mevnode'; +UPDATE public.dlob_stats_latest SET source = 'mevnode' WHERE source IS NULL; +ALTER TABLE public.dlob_stats_latest ALTER COLUMN source SET NOT NULL; + +-- Ensure PRIMARY KEY is (source, market_name) (required to keep 2 sources in parallel). +DO $$ +DECLARE + pk_name text; + pk_cols text[]; +BEGIN + SELECT + con.conname, + array_agg(att.attname ORDER BY ord.ordinality) + INTO pk_name, pk_cols + FROM pg_constraint con + JOIN pg_class rel ON rel.oid = con.conrelid + JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace + JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true + JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum + WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_stats_latest' + GROUP BY con.conname; + + IF pk_name IS NULL THEN + EXECUTE 'ALTER TABLE public.dlob_stats_latest ADD CONSTRAINT dlob_stats_latest_pkey PRIMARY KEY (source, market_name)'; + ELSIF pk_cols <> ARRAY['source','market_name'] THEN + EXECUTE format('ALTER TABLE public.dlob_stats_latest DROP CONSTRAINT %I', pk_name); + EXECUTE 'ALTER TABLE public.dlob_stats_latest ADD CONSTRAINT dlob_stats_latest_pkey PRIMARY KEY (source, market_name)'; + END IF; +END $$; + +CREATE INDEX IF NOT EXISTS dlob_stats_latest_updated_at_idx + ON public.dlob_stats_latest (updated_at DESC); + +CREATE INDEX IF NOT EXISTS dlob_stats_latest_source_updated_at_idx + ON public.dlob_stats_latest (source, updated_at DESC); + +-- Depth snapshots within bps bands around mid-price (per market, per band). +-- Filled by a derived worker that reads `dlob_l2_latest`. +CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_latest ( + source TEXT NOT NULL DEFAULT 'mevnode', + market_name TEXT NOT NULL, + band_bps INTEGER NOT NULL, + market_type TEXT NOT NULL DEFAULT 'perp', + market_index INTEGER, + ts BIGINT, + slot BIGINT, + mid_price NUMERIC, + best_bid_price NUMERIC, + best_ask_price NUMERIC, + bid_base NUMERIC, + ask_base NUMERIC, + bid_usd NUMERIC, + ask_usd NUMERIC, + imbalance NUMERIC, + raw JSONB, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (source, market_name, band_bps) +); + +-- Schema upgrades (idempotent for existing volumes) +ALTER TABLE public.dlob_depth_bps_latest ADD COLUMN IF NOT EXISTS source TEXT; +ALTER TABLE public.dlob_depth_bps_latest ALTER COLUMN source SET DEFAULT 'mevnode'; +UPDATE public.dlob_depth_bps_latest SET source = 'mevnode' WHERE source IS NULL; +ALTER TABLE public.dlob_depth_bps_latest ALTER COLUMN source SET NOT NULL; + +-- Ensure PRIMARY KEY is (source, market_name, band_bps) (required to keep 2 sources in parallel). +DO $$ +DECLARE + pk_name text; + pk_cols text[]; +BEGIN + SELECT + con.conname, + array_agg(att.attname ORDER BY ord.ordinality) + INTO pk_name, pk_cols + FROM pg_constraint con + JOIN pg_class rel ON rel.oid = con.conrelid + JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace + JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true + JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum + WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_depth_bps_latest' + GROUP BY con.conname; + + IF pk_name IS NULL THEN + EXECUTE 'ALTER TABLE public.dlob_depth_bps_latest ADD CONSTRAINT dlob_depth_bps_latest_pkey PRIMARY KEY (source, market_name, band_bps)'; + ELSIF pk_cols <> ARRAY['source','market_name','band_bps'] THEN + EXECUTE format('ALTER TABLE public.dlob_depth_bps_latest DROP CONSTRAINT %I', pk_name); + EXECUTE 'ALTER TABLE public.dlob_depth_bps_latest ADD CONSTRAINT dlob_depth_bps_latest_pkey PRIMARY KEY (source, market_name, band_bps)'; + END IF; +END $$; + +CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_updated_at_idx + ON public.dlob_depth_bps_latest (updated_at DESC); + +CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_market_name_idx + ON public.dlob_depth_bps_latest (market_name); + +CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_source_market_name_idx + ON public.dlob_depth_bps_latest (source, market_name); + +-- Slippage/impact estimates for "market" orders at common USD sizes. +-- Filled by a derived worker that reads `dlob_l2_latest`. +CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest ( + source TEXT NOT NULL DEFAULT 'mevnode', + market_name TEXT NOT NULL, + side TEXT NOT NULL, + size_usd INTEGER NOT NULL, + market_type TEXT NOT NULL DEFAULT 'perp', + market_index INTEGER, + ts BIGINT, + slot BIGINT, + mid_price NUMERIC, + best_bid_price NUMERIC, + best_ask_price NUMERIC, + vwap_price NUMERIC, + worst_price NUMERIC, + filled_usd NUMERIC, + filled_base NUMERIC, + impact_bps NUMERIC, + levels_consumed INTEGER, + fill_pct NUMERIC, + raw JSONB, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (source, market_name, side, size_usd), + CONSTRAINT dlob_slippage_latest_side_chk CHECK (side IN ('buy', 'sell')) +); + +-- Schema upgrades (idempotent for existing volumes) +ALTER TABLE public.dlob_slippage_latest ADD COLUMN IF NOT EXISTS source TEXT; +ALTER TABLE public.dlob_slippage_latest ALTER COLUMN source SET DEFAULT 'mevnode'; +UPDATE public.dlob_slippage_latest SET source = 'mevnode' WHERE source IS NULL; +ALTER TABLE public.dlob_slippage_latest ALTER COLUMN source SET NOT NULL; + +-- Ensure PRIMARY KEY is (source, market_name, side, size_usd) (required to keep 2 sources in parallel). +DO $$ +DECLARE + pk_name text; + pk_cols text[]; +BEGIN + SELECT + con.conname, + array_agg(att.attname ORDER BY ord.ordinality) + INTO pk_name, pk_cols + FROM pg_constraint con + JOIN pg_class rel ON rel.oid = con.conrelid + JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace + JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true + JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum + WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_slippage_latest' + GROUP BY con.conname; + + IF pk_name IS NULL THEN + EXECUTE 'ALTER TABLE public.dlob_slippage_latest ADD CONSTRAINT dlob_slippage_latest_pkey PRIMARY KEY (source, market_name, side, size_usd)'; + ELSIF pk_cols <> ARRAY['source','market_name','side','size_usd'] THEN + EXECUTE format('ALTER TABLE public.dlob_slippage_latest DROP CONSTRAINT %I', pk_name); + EXECUTE 'ALTER TABLE public.dlob_slippage_latest ADD CONSTRAINT dlob_slippage_latest_pkey PRIMARY KEY (source, market_name, side, size_usd)'; + END IF; +END $$; + +CREATE INDEX IF NOT EXISTS dlob_slippage_latest_updated_at_idx + ON public.dlob_slippage_latest (updated_at DESC); + +CREATE INDEX IF NOT EXISTS dlob_slippage_latest_market_name_idx + ON public.dlob_slippage_latest (market_name); + +CREATE INDEX IF NOT EXISTS dlob_slippage_latest_source_market_name_idx + ON public.dlob_slippage_latest (source, market_name); + +-- Slippage v2: supports fractional order sizes (e.g. 0.1/0.2/0.5 USD), per market and side. +-- Keep v1 intact for backward compatibility and to avoid data loss. +CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest_v2 ( + source TEXT NOT NULL DEFAULT 'mevnode', + market_name TEXT NOT NULL, + side TEXT NOT NULL, -- buy|sell + size_usd NUMERIC NOT NULL, + market_type TEXT NOT NULL DEFAULT 'perp', + market_index INTEGER, + ts BIGINT, + slot BIGINT, + mid_price NUMERIC, + best_bid_price NUMERIC, + best_ask_price NUMERIC, + vwap_price NUMERIC, + worst_price NUMERIC, + filled_usd NUMERIC, + filled_base NUMERIC, + impact_bps NUMERIC, + levels_consumed INTEGER, + fill_pct NUMERIC, + raw JSONB, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (source, market_name, side, size_usd), + CONSTRAINT dlob_slippage_latest_v2_side_chk CHECK (side IN ('buy', 'sell')) +); + +-- Schema upgrades (idempotent for existing volumes) +ALTER TABLE public.dlob_slippage_latest_v2 ADD COLUMN IF NOT EXISTS source TEXT; +ALTER TABLE public.dlob_slippage_latest_v2 ALTER COLUMN source SET DEFAULT 'mevnode'; +UPDATE public.dlob_slippage_latest_v2 SET source = 'mevnode' WHERE source IS NULL; +ALTER TABLE public.dlob_slippage_latest_v2 ALTER COLUMN source SET NOT NULL; + +-- Ensure PRIMARY KEY is (source, market_name, side, size_usd) (required to keep 2 sources in parallel). +DO $$ +DECLARE + pk_name text; + pk_cols text[]; +BEGIN + SELECT + con.conname, + array_agg(att.attname ORDER BY ord.ordinality) + INTO pk_name, pk_cols + FROM pg_constraint con + JOIN pg_class rel ON rel.oid = con.conrelid + JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace + JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true + JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum + WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_slippage_latest_v2' + GROUP BY con.conname; + + IF pk_name IS NULL THEN + EXECUTE 'ALTER TABLE public.dlob_slippage_latest_v2 ADD CONSTRAINT dlob_slippage_latest_v2_pkey PRIMARY KEY (source, market_name, side, size_usd)'; + ELSIF pk_cols <> ARRAY['source','market_name','side','size_usd'] THEN + EXECUTE format('ALTER TABLE public.dlob_slippage_latest_v2 DROP CONSTRAINT %I', pk_name); + EXECUTE 'ALTER TABLE public.dlob_slippage_latest_v2 ADD CONSTRAINT dlob_slippage_latest_v2_pkey PRIMARY KEY (source, market_name, side, size_usd)'; + END IF; +END $$; + +CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_updated_at_idx + ON public.dlob_slippage_latest_v2 (updated_at DESC); + +CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_market_name_idx + ON public.dlob_slippage_latest_v2 (market_name); + +CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_source_market_name_idx + ON public.dlob_slippage_latest_v2 (source, market_name); + +-- Time-series tables for UI history (start: 7 days). +-- Keep these append-only; use Timescale hypertables. + +CREATE TABLE IF NOT EXISTS public.dlob_stats_ts ( + ts TIMESTAMPTZ NOT NULL, + id BIGSERIAL NOT NULL, + source TEXT NOT NULL DEFAULT 'mevnode', + market_name TEXT NOT NULL, + market_type TEXT NOT NULL DEFAULT 'perp', + market_index INTEGER, + source_ts BIGINT, + slot BIGINT, + mark_price NUMERIC, + oracle_price NUMERIC, + best_bid_price NUMERIC, + best_ask_price NUMERIC, + mid_price NUMERIC, + spread_abs NUMERIC, + spread_bps NUMERIC, + depth_levels INTEGER, + depth_bid_base NUMERIC, + depth_ask_base NUMERIC, + depth_bid_usd NUMERIC, + depth_ask_usd NUMERIC, + imbalance NUMERIC, + raw JSONB, + PRIMARY KEY (ts, id) +); + +-- Schema upgrades (idempotent for existing volumes) +ALTER TABLE public.dlob_stats_ts ADD COLUMN IF NOT EXISTS source TEXT; +ALTER TABLE public.dlob_stats_ts ALTER COLUMN source SET DEFAULT 'mevnode'; +UPDATE public.dlob_stats_ts SET source = 'mevnode' WHERE source IS NULL; +ALTER TABLE public.dlob_stats_ts ALTER COLUMN source SET NOT NULL; + +SELECT create_hypertable('dlob_stats_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE); + +CREATE INDEX IF NOT EXISTS dlob_stats_ts_market_ts_desc_idx + ON public.dlob_stats_ts (market_name, ts DESC); + +CREATE INDEX IF NOT EXISTS dlob_stats_ts_source_market_ts_desc_idx + ON public.dlob_stats_ts (source, market_name, ts DESC); + +CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_ts ( + ts TIMESTAMPTZ NOT NULL, + id BIGSERIAL NOT NULL, + source TEXT NOT NULL DEFAULT 'mevnode', + market_name TEXT NOT NULL, + band_bps INTEGER NOT NULL, + market_type TEXT NOT NULL DEFAULT 'perp', + market_index INTEGER, + source_ts BIGINT, + slot BIGINT, + mid_price NUMERIC, + best_bid_price NUMERIC, + best_ask_price NUMERIC, + bid_base NUMERIC, + ask_base NUMERIC, + bid_usd NUMERIC, + ask_usd NUMERIC, + imbalance NUMERIC, + raw JSONB, + PRIMARY KEY (ts, id) +); + +-- Schema upgrades (idempotent for existing volumes) +ALTER TABLE public.dlob_depth_bps_ts ADD COLUMN IF NOT EXISTS source TEXT; +ALTER TABLE public.dlob_depth_bps_ts ALTER COLUMN source SET DEFAULT 'mevnode'; +UPDATE public.dlob_depth_bps_ts SET source = 'mevnode' WHERE source IS NULL; +ALTER TABLE public.dlob_depth_bps_ts ALTER COLUMN source SET NOT NULL; + +SELECT create_hypertable('dlob_depth_bps_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE); + +CREATE INDEX IF NOT EXISTS dlob_depth_bps_ts_market_ts_desc_idx + ON public.dlob_depth_bps_ts (market_name, ts DESC); + +CREATE INDEX IF NOT EXISTS dlob_depth_bps_ts_source_market_ts_desc_idx + ON public.dlob_depth_bps_ts (source, market_name, ts DESC); + +CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts ( + ts TIMESTAMPTZ NOT NULL, + id BIGSERIAL NOT NULL, + source TEXT NOT NULL DEFAULT 'mevnode', + market_name TEXT NOT NULL, + side TEXT NOT NULL, + size_usd INTEGER NOT NULL, + market_type TEXT NOT NULL DEFAULT 'perp', + market_index INTEGER, + source_ts BIGINT, + slot BIGINT, + mid_price NUMERIC, + vwap_price NUMERIC, + worst_price NUMERIC, + filled_usd NUMERIC, + filled_base NUMERIC, + impact_bps NUMERIC, + levels_consumed INTEGER, + fill_pct NUMERIC, + raw JSONB, + PRIMARY KEY (ts, id) +); + +-- Schema upgrades (idempotent for existing volumes) +ALTER TABLE public.dlob_slippage_ts ADD COLUMN IF NOT EXISTS source TEXT; +ALTER TABLE public.dlob_slippage_ts ALTER COLUMN source SET DEFAULT 'mevnode'; +UPDATE public.dlob_slippage_ts SET source = 'mevnode' WHERE source IS NULL; +ALTER TABLE public.dlob_slippage_ts ALTER COLUMN source SET NOT NULL; + +SELECT create_hypertable('dlob_slippage_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE); + +CREATE INDEX IF NOT EXISTS dlob_slippage_ts_market_ts_desc_idx + ON public.dlob_slippage_ts (market_name, ts DESC); + +CREATE INDEX IF NOT EXISTS dlob_slippage_ts_source_market_ts_desc_idx + ON public.dlob_slippage_ts (source, market_name, ts DESC); + +CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts_v2 ( + ts TIMESTAMPTZ NOT NULL, + id BIGSERIAL NOT NULL, + source TEXT NOT NULL DEFAULT 'mevnode', + market_name TEXT NOT NULL, + side TEXT NOT NULL, + size_usd NUMERIC NOT NULL, + market_type TEXT NOT NULL DEFAULT 'perp', + market_index INTEGER, + source_ts BIGINT, + slot BIGINT, + mid_price NUMERIC, + vwap_price NUMERIC, + worst_price NUMERIC, + filled_usd NUMERIC, + filled_base NUMERIC, + impact_bps NUMERIC, + levels_consumed INTEGER, + fill_pct NUMERIC, + raw JSONB, + PRIMARY KEY (ts, id) +); + +-- Schema upgrades (idempotent for existing volumes) +ALTER TABLE public.dlob_slippage_ts_v2 ADD COLUMN IF NOT EXISTS source TEXT; +ALTER TABLE public.dlob_slippage_ts_v2 ALTER COLUMN source SET DEFAULT 'mevnode'; +UPDATE public.dlob_slippage_ts_v2 SET source = 'mevnode' WHERE source IS NULL; +ALTER TABLE public.dlob_slippage_ts_v2 ALTER COLUMN source SET NOT NULL; + +SELECT create_hypertable('dlob_slippage_ts_v2', 'ts', if_not_exists => TRUE, migrate_data => TRUE); + +CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_market_ts_desc_idx + ON public.dlob_slippage_ts_v2 (market_name, ts DESC); + +CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_source_market_ts_desc_idx + ON public.dlob_slippage_ts_v2 (source, market_name, ts DESC); +-- Retention policies (best-effort; safe if Timescale is present). +DO $$ +BEGIN + PERFORM add_retention_policy('dlob_stats_ts', INTERVAL '7 days'); +EXCEPTION WHEN OTHERS THEN + -- ignore if policy exists or function unavailable +END $$; +DO $$ +BEGIN + PERFORM add_retention_policy('dlob_depth_bps_ts', INTERVAL '7 days'); +EXCEPTION WHEN OTHERS THEN +END $$; +DO $$ +BEGIN + PERFORM add_retention_policy('dlob_slippage_ts', INTERVAL '7 days'); +EXCEPTION WHEN OTHERS THEN +END $$; +DO $$ +BEGIN + PERFORM add_retention_policy('dlob_slippage_ts_v2', INTERVAL '7 days'); +EXCEPTION WHEN OTHERS THEN +END $$; diff --git a/environments/sol/trade-r001-canary/hasura-bootstrap-job.yaml b/environments/sol/trade-r001-canary/hasura-bootstrap-job.yaml new file mode 100644 index 0000000..00aae01 --- /dev/null +++ b/environments/sol/trade-r001-canary/hasura-bootstrap-job.yaml @@ -0,0 +1,52 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: hasura-bootstrap + namespace: trade-r001-canary +spec: + backoffLimit: 5 + template: + spec: + restartPolicy: OnFailure + initContainers: + - name: wait-db + image: postgres:16 + imagePullPolicy: IfNotPresent + envFrom: + - secretRef: + name: trade-postgres + command: + - sh + - -ec + - | + export PGPASSWORD="$POSTGRES_PASSWORD" + until pg_isready -h postgres -U "$POSTGRES_USER" -d "$POSTGRES_DB"; do sleep 1; done + until psql -h postgres -U "$POSTGRES_USER" -d "$POSTGRES_DB" -v ON_ERROR_STOP=1 -c "select 1 from public.drift_ticks limit 1" >/dev/null 2>&1; do sleep 1; done + containers: + - name: hasura-bootstrap + image: node:20-slim + imagePullPolicy: IfNotPresent + env: + - name: HASURA_URL + value: http://hasura:8080 + - name: HASURA_ADMIN_SECRET + valueFrom: + secretKeyRef: + name: trade-hasura + key: HASURA_GRAPHQL_ADMIN_SECRET + - name: TICKS_TABLE + value: drift_ticks + - name: CANDLES_FUNCTION + value: get_drift_candles + command: + - node + - /app/hasura-bootstrap.mjs + volumeMounts: + - name: script + mountPath: /app/hasura-bootstrap.mjs + subPath: hasura-bootstrap.mjs + readOnly: true + volumes: + - name: script + configMap: + name: hasura-bootstrap-script diff --git a/environments/sol/trade-r001-canary/hasura-deployment.yaml b/environments/sol/trade-r001-canary/hasura-deployment.yaml new file mode 100644 index 0000000..759ff7b --- /dev/null +++ b/environments/sol/trade-r001-canary/hasura-deployment.yaml @@ -0,0 +1,61 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: hasura + namespace: trade-r001-canary +spec: + replicas: 1 + selector: + matchLabels: + app: hasura + template: + metadata: + labels: + app: hasura + spec: + containers: + - name: hasura + image: hasura/graphql-engine:v2.40.1 + imagePullPolicy: IfNotPresent + ports: + - name: http + containerPort: 8080 + envFrom: + - secretRef: + name: trade-postgres + - secretRef: + name: trade-hasura + env: + - name: HASURA_GRAPHQL_DATABASE_URL + value: postgres://$(POSTGRES_USER):$(POSTGRES_PASSWORD)@postgres:5432/$(POSTGRES_DB) + - name: HASURA_GRAPHQL_JWT_SECRET + value: '{"type":"HS256","key":"$(HASURA_JWT_KEY)"}' + - name: HASURA_GRAPHQL_ENABLE_CONSOLE + value: "false" + - name: HASURA_GRAPHQL_DEV_MODE + value: "false" + - name: HASURA_GRAPHQL_CORS_DOMAIN + value: http://localhost:5173,http://127.0.0.1:5173 + - name: HASURA_GRAPHQL_UNAUTHORIZED_ROLE + value: public + resources: + requests: + cpu: 250m + memory: 512Mi + limits: + cpu: "1" + memory: 1Gi + readinessProbe: + httpGet: + path: /healthz + port: http + initialDelaySeconds: 5 + periodSeconds: 10 + timeoutSeconds: 3 + livenessProbe: + httpGet: + path: /healthz + port: http + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 3 diff --git a/environments/sol/trade-r001-canary/hasura-service.yaml b/environments/sol/trade-r001-canary/hasura-service.yaml new file mode 100644 index 0000000..0005702 --- /dev/null +++ b/environments/sol/trade-r001-canary/hasura-service.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Service +metadata: + name: hasura + namespace: trade-r001-canary +spec: + type: ClusterIP + selector: + app: hasura + ports: + - name: http + port: 8080 + targetPort: http diff --git a/environments/sol/trade-r001-canary/kustomization.yaml b/environments/sol/trade-r001-canary/kustomization.yaml index c2b0dd4..61d2e02 100644 --- a/environments/sol/trade-r001-canary/kustomization.yaml +++ b/environments/sol/trade-r001-canary/kustomization.yaml @@ -1,7 +1,35 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization +namespace: trade-r001-canary + +generatorOptions: + disableNameSuffixHash: true + resources: - namespace.yaml - resourcequota.yaml - limitrange.yaml + - postgres-alias-service.yaml + - hasura-service.yaml + - hasura-deployment.yaml + - postgres-migrate-job.yaml + - hasura-bootstrap-job.yaml + - trade-api-service.yaml + - trade-api-deployment.yaml + - trade-frontend-service.yaml + - trade-frontend-deployment.yaml + +configMapGenerator: + - name: postgres-initdb + files: + - assets/postgres/001_init.sql + - name: hasura-bootstrap-script + files: + - assets/hasura/hasura-bootstrap.mjs + - name: trade-api-wrapper + files: + - assets/api/wrapper.mjs + - name: trade-api-upstream + files: + - assets/api/server.mjs diff --git a/environments/sol/trade-r001-canary/postgres-alias-service.yaml b/environments/sol/trade-r001-canary/postgres-alias-service.yaml new file mode 100644 index 0000000..901a27f --- /dev/null +++ b/environments/sol/trade-r001-canary/postgres-alias-service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + name: postgres + namespace: trade-r001-canary +spec: + type: ExternalName + externalName: postgres-host.trade-infra.svc.cluster.local + ports: + - name: postgres + port: 5432 + targetPort: 5432 diff --git a/environments/sol/trade-r001-canary/postgres-migrate-job.yaml b/environments/sol/trade-r001-canary/postgres-migrate-job.yaml new file mode 100644 index 0000000..25124ba --- /dev/null +++ b/environments/sol/trade-r001-canary/postgres-migrate-job.yaml @@ -0,0 +1,33 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: postgres-migrate + namespace: trade-r001-canary +spec: + backoffLimit: 3 + template: + spec: + restartPolicy: OnFailure + containers: + - name: postgres-migrate + image: postgres:16 + imagePullPolicy: IfNotPresent + envFrom: + - secretRef: + name: trade-postgres + command: + - sh + - -ec + - | + export PGPASSWORD="$POSTGRES_PASSWORD" + until pg_isready -h postgres -U "$POSTGRES_USER" -d "$POSTGRES_DB"; do sleep 1; done + psql -h postgres -U "$POSTGRES_USER" -d "$POSTGRES_DB" -v ON_ERROR_STOP=1 -f /migrations/001_init.sql + volumeMounts: + - name: migrations + mountPath: /migrations/001_init.sql + subPath: 001_init.sql + readOnly: true + volumes: + - name: migrations + configMap: + name: postgres-initdb diff --git a/environments/sol/trade-r001-canary/scripts/create-gitea-registry-secret.sh b/environments/sol/trade-r001-canary/scripts/create-gitea-registry-secret.sh new file mode 100755 index 0000000..b1d7536 --- /dev/null +++ b/environments/sol/trade-r001-canary/scripts/create-gitea-registry-secret.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +set -euo pipefail + +TARGET_HOST="${TARGET_HOST:-mevnode}" +TARGET_NAMESPACE="${TARGET_NAMESPACE:-trade-r001-canary}" +REGISTRY_HOST="${REGISTRY_HOST:-gitea.mpabi.pl}" +REGISTRY_USER="${REGISTRY_USER:-u1}" +REGISTRY_TOKEN_FILE="${REGISTRY_TOKEN_FILE:-/home/user/dev/trade/tokens/gitea-registry.token}" + +ssh_target() { + ssh -o StrictHostKeyChecking=no "$TARGET_HOST" "$@" +} + +REGISTRY_TOKEN="$(tr -d '\r\n' < "$REGISTRY_TOKEN_FILE")" +if [ -z "$REGISTRY_TOKEN" ]; then + echo "Registry token is empty" >&2 + exit 1 +fi + +ssh_target "sudo k3s kubectl get ns ${TARGET_NAMESPACE} >/dev/null 2>&1 || sudo k3s kubectl create ns ${TARGET_NAMESPACE} >/dev/null" + +ssh_target "sudo k3s kubectl -n ${TARGET_NAMESPACE} create secret docker-registry gitea-registry --docker-server=${REGISTRY_HOST} --docker-username=${REGISTRY_USER} --docker-password='${REGISTRY_TOKEN}' --dry-run=client -o yaml | sudo k3s kubectl apply -f - >/dev/null" + +echo "Updated imagePullSecret gitea-registry in ${TARGET_NAMESPACE} on ${TARGET_HOST}" diff --git a/environments/sol/trade-r001-canary/scripts/prepare-sol-postgres.sh b/environments/sol/trade-r001-canary/scripts/prepare-sol-postgres.sh new file mode 100755 index 0000000..7e521c6 --- /dev/null +++ b/environments/sol/trade-r001-canary/scripts/prepare-sol-postgres.sh @@ -0,0 +1,59 @@ +#!/usr/bin/env bash +set -euo pipefail + +SOURCE_HOST="${SOURCE_HOST:-mevnode_bot}" +SOURCE_NAMESPACE="${SOURCE_NAMESPACE:-trade-staging}" +TARGET_HOST="${TARGET_HOST:-mevnode}" +PG_VERSION="${PG_VERSION:-16}" + +ssh_source() { + ssh -o StrictHostKeyChecking=no "$SOURCE_HOST" "$@" +} + +ssh_target() { + ssh -o StrictHostKeyChecking=no "$TARGET_HOST" "$@" +} + +SRC_SECRET_JSON="$(ssh_source "sudo k3s kubectl -n ${SOURCE_NAMESPACE} get secret trade-postgres -o json")" +POSTGRES_USER="$(printf '%s' "$SRC_SECRET_JSON" | jq -r '.data.POSTGRES_USER' | base64 -d)" +POSTGRES_PASSWORD="$(printf '%s' "$SRC_SECRET_JSON" | jq -r '.data.POSTGRES_PASSWORD' | base64 -d)" +POSTGRES_DB="$(printf '%s' "$SRC_SECRET_JSON" | jq -r '.data.POSTGRES_DB' | base64 -d)" + +ssh_target "if ! dpkg -l | grep -q '^ii timescaledb-2-postgresql-${PG_VERSION} '; then curl -fsSL https://packagecloud.io/install/repositories/timescale/timescaledb/script.deb.sh | sudo bash && sudo apt-get update && sudo DEBIAN_FRONTEND=noninteractive apt-get install -y timescaledb-2-postgresql-${PG_VERSION}; fi" + +CURRENT_PRELOAD="$(ssh_target "sudo -u postgres psql -Atqc \"show shared_preload_libraries\"")" +case ",${CURRENT_PRELOAD}," in + *,timescaledb,*) + NEW_PRELOAD="${CURRENT_PRELOAD}" + ;; + ,,) + NEW_PRELOAD="timescaledb" + ;; + *) + NEW_PRELOAD="${CURRENT_PRELOAD},timescaledb" + ;; +esac + +ssh_target "sudo -u postgres psql -Atqc \"ALTER SYSTEM SET shared_preload_libraries = '${NEW_PRELOAD}';\" && sudo systemctl restart postgresql" + +APP_USER_SQL=$(printf "%s" "$POSTGRES_USER" | sed "s/'/''/g") +APP_PASSWORD_SQL=$(printf "%s" "$POSTGRES_PASSWORD" | sed "s/'/''/g") +APP_DB_SQL=$(printf "%s" "$POSTGRES_DB" | sed "s/'/''/g") + +ssh_target "sudo -u postgres psql -v ON_ERROR_STOP=1 <<'SQL' +DO \$\$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = '${APP_USER_SQL}') THEN + EXECUTE format('CREATE ROLE %I LOGIN PASSWORD %L', '${APP_USER_SQL}', '${APP_PASSWORD_SQL}'); + ELSE + EXECUTE format('ALTER ROLE %I WITH LOGIN PASSWORD %L', '${APP_USER_SQL}', '${APP_PASSWORD_SQL}'); + END IF; +END +\$\$; +SELECT format('CREATE DATABASE %I OWNER %I', '${APP_DB_SQL}', '${APP_USER_SQL}') +WHERE NOT EXISTS (SELECT 1 FROM pg_database WHERE datname = '${APP_DB_SQL}') +\\gexec +ALTER DATABASE \"${POSTGRES_DB}\" OWNER TO \"${POSTGRES_USER}\"; +SQL" + +echo "Prepared host Postgres on ${TARGET_HOST} for ${POSTGRES_USER}/${POSTGRES_DB} with TimescaleDB enabled" diff --git a/environments/sol/trade-r001-canary/scripts/sync-live-secrets.sh b/environments/sol/trade-r001-canary/scripts/sync-live-secrets.sh new file mode 100755 index 0000000..684e113 --- /dev/null +++ b/environments/sol/trade-r001-canary/scripts/sync-live-secrets.sh @@ -0,0 +1,33 @@ +#!/usr/bin/env bash +set -euo pipefail + +SOURCE_HOST="${SOURCE_HOST:-mevnode_bot}" +SOURCE_NAMESPACE="${SOURCE_NAMESPACE:-trade-staging}" +TARGET_HOST="${TARGET_HOST:-mevnode}" +TARGET_NAMESPACE="${TARGET_NAMESPACE:-trade-r001-canary}" + +SECRETS=( + trade-postgres + trade-hasura + trade-api + trade-frontend-tokens + trade-basic-auth +) + +ssh_source() { + ssh -o StrictHostKeyChecking=no "$SOURCE_HOST" "$@" +} + +ssh_target() { + ssh -o StrictHostKeyChecking=no "$TARGET_HOST" "$@" +} + +ssh_target "sudo k3s kubectl get ns ${TARGET_NAMESPACE} >/dev/null 2>&1 || sudo k3s kubectl create ns ${TARGET_NAMESPACE} >/dev/null" + +for secret_name in "${SECRETS[@]}"; do + SECRET_JSON="$(ssh_source "sudo k3s kubectl -n ${SOURCE_NAMESPACE} get secret ${secret_name} -o json")" + printf '%s' "$SECRET_JSON" \ + | jq --arg ns "$TARGET_NAMESPACE" 'del(.metadata.uid,.metadata.resourceVersion,.metadata.creationTimestamp,.metadata.managedFields,.metadata.ownerReferences,.metadata.selfLink,.metadata.annotations["kubectl.kubernetes.io/last-applied-configuration"]) | .metadata.namespace = $ns' \ + | ssh_target "sudo k3s kubectl apply -f - >/dev/null" + echo "Synced ${secret_name} to ${TARGET_NAMESPACE}" +done diff --git a/environments/sol/trade-r001-canary/trade-api-deployment.yaml b/environments/sol/trade-r001-canary/trade-api-deployment.yaml new file mode 100644 index 0000000..fdf499c --- /dev/null +++ b/environments/sol/trade-r001-canary/trade-api-deployment.yaml @@ -0,0 +1,87 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: trade-api + namespace: trade-r001-canary +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: trade-api + template: + metadata: + labels: + app.kubernetes.io/name: trade-api + spec: + imagePullSecrets: + - name: gitea-registry + volumes: + - name: trade-api-wrapper + configMap: + name: trade-api-wrapper + - name: trade-api-upstream + configMap: + name: trade-api-upstream + containers: + - name: api + image: gitea.mpabi.pl/trade/trade-api:k3s-20260111095435 + imagePullPolicy: IfNotPresent + command: + - node + - /override/wrapper.mjs + ports: + - name: http + containerPort: 8787 + volumeMounts: + - name: trade-api-wrapper + mountPath: /override/wrapper.mjs + subPath: wrapper.mjs + readOnly: true + - name: trade-api-upstream + mountPath: /app/services/api/server.mjs + subPath: server.mjs + readOnly: true + env: + - name: PORT + value: "8787" + - name: UPSTREAM_PORT + value: "8788" + - name: UPSTREAM_ENTRY + value: /app/services/api/server.mjs + - name: UPSTREAM_HOST + value: 127.0.0.1 + - name: APP_VERSION + value: canary-r001 + - name: HASURA_GRAPHQL_URL + value: http://hasura:8080/v1/graphql + - name: HASURA_ADMIN_SECRET + valueFrom: + secretKeyRef: + name: trade-hasura + key: HASURA_GRAPHQL_ADMIN_SECRET + - name: API_ADMIN_SECRET + valueFrom: + secretKeyRef: + name: trade-api + key: API_ADMIN_SECRET + - name: CORS_ORIGIN + value: "*" + resources: + requests: + cpu: 100m + memory: 256Mi + limits: + cpu: "1" + memory: 1Gi + readinessProbe: + httpGet: + path: /healthz + port: http + initialDelaySeconds: 2 + periodSeconds: 5 + livenessProbe: + httpGet: + path: /healthz + port: http + initialDelaySeconds: 10 + periodSeconds: 10 diff --git a/environments/sol/trade-r001-canary/trade-api-service.yaml b/environments/sol/trade-r001-canary/trade-api-service.yaml new file mode 100644 index 0000000..e794bfd --- /dev/null +++ b/environments/sol/trade-r001-canary/trade-api-service.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Service +metadata: + name: trade-api + namespace: trade-r001-canary +spec: + type: ClusterIP + selector: + app.kubernetes.io/name: trade-api + ports: + - name: http + port: 8787 + targetPort: http diff --git a/environments/sol/trade-r001-canary/trade-frontend-deployment.yaml b/environments/sol/trade-r001-canary/trade-frontend-deployment.yaml new file mode 100644 index 0000000..0a8444f --- /dev/null +++ b/environments/sol/trade-r001-canary/trade-frontend-deployment.yaml @@ -0,0 +1,75 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: trade-frontend + namespace: trade-r001-canary +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: trade-frontend + template: + metadata: + labels: + app.kubernetes.io/name: trade-frontend + spec: + imagePullSecrets: + - name: gitea-registry + containers: + - name: frontend + image: gitea.mpabi.pl/trade/trade-frontend:pg-derived-phase1-20260319-030252 + imagePullPolicy: IfNotPresent + ports: + - name: http + containerPort: 8081 + env: + - name: PORT + value: "8081" + - name: APP_VERSION + value: canary-r001 + - name: API_UPSTREAM + value: http://trade-api:8787 + - name: BASIC_AUTH_FILE + value: /tokens/frontend.json + - name: API_READ_TOKEN_FILE + value: /tokens/read.json + - name: BASIC_AUTH_MODE + value: "off" + - name: AUTH_MODE + value: session + - name: HTPASSWD_FILE + value: /auth/users + volumeMounts: + - name: auth + mountPath: /auth/users + readOnly: true + subPath: users + - name: tokens + mountPath: /tokens + readOnly: true + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: 500m + memory: 512Mi + readinessProbe: + httpGet: + path: /healthz + port: http + initialDelaySeconds: 2 + periodSeconds: 5 + livenessProbe: + httpGet: + path: /healthz + port: http + initialDelaySeconds: 10 + periodSeconds: 10 + volumes: + - name: auth + secret: + secretName: trade-basic-auth + - name: tokens + secret: + secretName: trade-frontend-tokens diff --git a/environments/sol/trade-r001-canary/trade-frontend-service.yaml b/environments/sol/trade-r001-canary/trade-frontend-service.yaml new file mode 100644 index 0000000..18bde18 --- /dev/null +++ b/environments/sol/trade-r001-canary/trade-frontend-service.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Service +metadata: + name: trade-frontend + namespace: trade-r001-canary +spec: + type: ClusterIP + selector: + app.kubernetes.io/name: trade-frontend + ports: + - name: http + port: 8081 + targetPort: http