diff --git a/.gitea/workflows/deploy-trade-r001-canary.yaml b/.gitea/workflows/deploy-trade-r001-canary.yaml index 01f3f54..4dbd84c 100644 --- a/.gitea/workflows/deploy-trade-r001-canary.yaml +++ b/.gitea/workflows/deploy-trade-r001-canary.yaml @@ -34,7 +34,7 @@ jobs: env: KUBECONFIG: /tmp/kubeconfig run: | - kubectl -n trade-r001-canary get secret trade-postgres trade-hasura trade-api trade-frontend-tokens trade-basic-auth gitea-registry + kubectl -n trade-r001-canary get secret trade-postgres trade-hasura trade-api trade-frontend-tokens trade-basic-auth trade-ingestor-tokens gitea-registry - name: Recreate bootstrap jobs env: @@ -54,7 +54,7 @@ jobs: env: KUBECONFIG: /tmp/kubeconfig run: | - kubectl -n trade-r001-canary rollout restart deploy/hasura deploy/trade-api deploy/trade-frontend + kubectl -n trade-r001-canary rollout restart deploy/hasura deploy/trade-api deploy/trade-frontend deploy/trade-ingestor - name: Wait for database and metadata bootstrap env: @@ -70,8 +70,19 @@ jobs: kubectl -n trade-r001-canary rollout status deploy/hasura --timeout=300s kubectl -n trade-r001-canary rollout status deploy/trade-api --timeout=300s kubectl -n trade-r001-canary rollout status deploy/trade-frontend --timeout=300s + kubectl -n trade-r001-canary rollout status deploy/trade-ingestor --timeout=300s kubectl -n trade-r001-canary get deploy,pods -o wide + - name: Verify trade-ingestor runtime + env: + KUBECONFIG: /tmp/kubeconfig + run: | + sleep 10 + pod_name="$(kubectl -n trade-r001-canary get pod -l app.kubernetes.io/name=trade-ingestor -o jsonpath='{.items[0].metadata.name}')" + restart_count="$(kubectl -n trade-r001-canary get pod "$pod_name" -o jsonpath='{.status.containerStatuses[0].restartCount}')" + test "${restart_count}" = "0" + kubectl -n trade-r001-canary logs "$pod_name" --tail=20 + - name: Verify canary namespace connectivity env: KUBECONFIG: /tmp/kubeconfig diff --git a/environments/sol/trade-r001-canary/README.md b/environments/sol/trade-r001-canary/README.md index 60f72de..20acdbc 100644 --- a/environments/sol/trade-r001-canary/README.md +++ b/environments/sol/trade-r001-canary/README.md @@ -7,7 +7,7 @@ Minimal canary namespace for migration baseline `R001` on `sol`. - Reserve a dedicated namespace for the first reconstructed trade deployment. - Put hard upper bounds on namespace-level CPU, memory, object count, and PVC growth before application manifests land. - Verify that workloads in the namespace can resolve and reach the shared `trade-infra` services for `Postgres` and `Redis`. -- Recreate the `R001` application surface in a controlled way: `Hasura`, `trade-api`, and `trade-frontend`. +- Recreate the `R001` application surface in a controlled way: `Hasura`, `trade-api`, `trade-frontend`, and the first canary `trade-ingestor` path. ## Current Guardrails @@ -43,7 +43,9 @@ Minimal canary namespace for migration baseline `R001` on `sol`. - The canary workflow re-runs: - `postgres-migrate` - `hasura-bootstrap` - before it waits for `Hasura`, `trade-api`, and `trade-frontend` to become healthy. + before it waits for `Hasura`, `trade-api`, `trade-frontend`, and `trade-ingestor` to become healthy. +- The current canary `trade-ingestor` is intentionally pinned to the schema already reconstructed on `sol` and reads from `dlob_stats_latest`. +- The exact live `R001` ingestor path that reads `dlob_*_derived_latest` remains a follow-up substep after the DLOB writer chain is reconstructed. ## Operator Flow diff --git a/environments/sol/trade-r001-canary/assets/ingestor/dlob-ingestor.mjs b/environments/sol/trade-r001-canary/assets/ingestor/dlob-ingestor.mjs new file mode 100644 index 0000000..d49a9c0 --- /dev/null +++ b/environments/sol/trade-r001-canary/assets/ingestor/dlob-ingestor.mjs @@ -0,0 +1,218 @@ +import process from 'node:process'; +import { setTimeout as sleep } from 'node:timers/promises'; + +function getIsoNow() { + return new Date().toISOString(); +} + +function envString(name, fallback) { + const v = process.env[name]; + if (v == null) return fallback; + const s = String(v).trim(); + return s ? s : fallback; +} + +function envInt(name, fallback, { min, max } = {}) { + const v = process.env[name]; + if (v == null) return fallback; + const n = Number.parseInt(String(v), 10); + if (!Number.isFinite(n)) return fallback; + const low = typeof min === 'number' ? min : n; + const high = typeof max === 'number' ? max : n; + return Math.max(low, Math.min(high, n)); +} + +function envList(name, fallbackCsv) { + const raw = process.env[name] ?? fallbackCsv; + return String(raw) + .split(',') + .map((s) => s.trim()) + .filter(Boolean); +} + +function toIntOrNull(v) { + if (v == null) return null; + if (typeof v === 'number') return Number.isFinite(v) ? Math.trunc(v) : null; + if (typeof v === 'string') { + const s = v.trim(); + if (!s) return null; + const n = Number.parseInt(s, 10); + return Number.isFinite(n) ? n : null; + } + return null; +} + +function numStr(v) { + if (v == null) return null; + if (typeof v === 'number') return Number.isFinite(v) ? String(v) : null; + if (typeof v === 'string') { + const s = v.trim(); + return s ? s : null; + } + return null; +} + +function isoFromEpochMs(v) { + const n = typeof v === 'number' ? v : typeof v === 'string' ? Number(v.trim()) : NaN; + if (!Number.isFinite(n) || n <= 0) return null; + const d = new Date(n); + const ms = d.getTime(); + if (!Number.isFinite(ms)) return null; + return d.toISOString(); +} + +function resolveConfig() { + const hasuraUrl = envString('HASURA_GRAPHQL_URL', 'http://hasura:8080/v1/graphql'); + const hasuraAdminSecret = envString('HASURA_ADMIN_SECRET', ''); + if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET'); + + const markets = envList('DLOB_MARKETS', 'SOL-PERP,PUMP-PERP'); + const pollMs = envInt('TICKS_POLL_MS', 1000, { min: 250, max: 60_000 }); + const source = envString('TICKS_SOURCE', 'dlob_stats'); + + return { hasuraUrl, hasuraAdminSecret, markets, pollMs, source }; +} + +async function graphqlRequest(cfg, query, variables) { + const res = await fetch(cfg.hasuraUrl, { + method: 'POST', + headers: { + 'content-type': 'application/json', + 'x-hasura-admin-secret': cfg.hasuraAdminSecret, + }, + body: JSON.stringify({ query, variables }), + signal: AbortSignal.timeout(10_000), + }); + + const text = await res.text(); + if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`); + + let json; + try { + json = JSON.parse(text); + } catch { + throw new Error(`Hasura: invalid json: ${text}`); + } + + if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | ')); + return json.data; +} + +async function fetchStats(cfg) { + const query = ` + query DlobStatsLatest($markets: [String!]!) { + dlob_stats_latest(where: { market_name: { _in: $markets } }) { + market_name + market_index + ts + slot + oracle_price + mark_price + mid_price + best_bid_price + best_ask_price + updated_at + } + } + `; + + const data = await graphqlRequest(cfg, query, { markets: cfg.markets }); + return Array.isArray(data?.dlob_stats_latest) ? data.dlob_stats_latest : []; +} + +async function insertTicks(cfg, objects) { + if (!objects.length) return 0; + + const mutation = ` + mutation InsertTicks($objects: [drift_ticks_insert_input!]!) { + insert_drift_ticks(objects: $objects) { affected_rows } + } + `; + + const data = await graphqlRequest(cfg, mutation, { objects }); + return Number(data?.insert_drift_ticks?.affected_rows || 0); +} + +async function main() { + const cfg = resolveConfig(); + const lastUpdatedAtByMarket = new Map(); + + console.log( + JSON.stringify( + { + service: 'trade-ingestor', + mode: 'dlob_stats_ticks', + startedAt: getIsoNow(), + hasuraUrl: cfg.hasuraUrl, + markets: cfg.markets, + pollMs: cfg.pollMs, + source: cfg.source, + }, + null, + 2 + ) + ); + + while (true) { + try { + const rows = await fetchStats(cfg); + const nowIso = getIsoNow(); + + const objects = []; + for (const r of rows) { + const marketName = String(r?.market_name || '').trim(); + if (!marketName) continue; + + const updatedAt = r?.updated_at ? String(r.updated_at) : ''; + if (updatedAt && lastUpdatedAtByMarket.get(marketName) === updatedAt) continue; + if (updatedAt) lastUpdatedAtByMarket.set(marketName, updatedAt); + + const marketIndex = toIntOrNull(r?.market_index) ?? 0; + const dlobIso = isoFromEpochMs(r?.ts); + const tsIso = dlobIso || nowIso; + + const oraclePrice = numStr(r?.oracle_price) || numStr(r?.mark_price) || numStr(r?.mid_price); + const markPrice = numStr(r?.mark_price) || numStr(r?.mid_price) || oraclePrice; + if (!oraclePrice) continue; + + objects.push({ + ts: tsIso, + market_index: marketIndex, + symbol: marketName, + oracle_price: oraclePrice, + mark_price: markPrice, + oracle_slot: r?.slot == null ? null : String(r.slot), + source: cfg.source, + raw: { + from: 'dlob_stats_latest', + market_name: marketName, + market_index: marketIndex, + dlob: { + ts: r?.ts ?? null, + slot: r?.slot ?? null, + best_bid_price: r?.best_bid_price ?? null, + best_ask_price: r?.best_ask_price ?? null, + mid_price: r?.mid_price ?? null, + updated_at: updatedAt || null, + }, + }, + }); + } + + const inserted = await insertTicks(cfg, objects); + if (inserted) { + console.log(`[dlob-ticks] inserted=${inserted} ts=${nowIso}`); + } + } catch (err) { + console.error(`[dlob-ticks] error: ${String(err?.message || err)}`); + await sleep(2_000); + } + + await sleep(cfg.pollMs); + } +} + +main().catch((err) => { + console.error(String(err?.stack || err)); + process.exitCode = 1; +}); diff --git a/environments/sol/trade-r001-canary/kustomization.yaml b/environments/sol/trade-r001-canary/kustomization.yaml index 61d2e02..ca36f42 100644 --- a/environments/sol/trade-r001-canary/kustomization.yaml +++ b/environments/sol/trade-r001-canary/kustomization.yaml @@ -19,6 +19,7 @@ resources: - trade-api-deployment.yaml - trade-frontend-service.yaml - trade-frontend-deployment.yaml + - trade-ingestor-deployment.yaml configMapGenerator: - name: postgres-initdb @@ -33,3 +34,6 @@ configMapGenerator: - name: trade-api-upstream files: - assets/api/server.mjs + - name: trade-dlob-ingestor-script + files: + - assets/ingestor/dlob-ingestor.mjs diff --git a/environments/sol/trade-r001-canary/scripts/sync-live-secrets.sh b/environments/sol/trade-r001-canary/scripts/sync-live-secrets.sh index 684e113..e272578 100755 --- a/environments/sol/trade-r001-canary/scripts/sync-live-secrets.sh +++ b/environments/sol/trade-r001-canary/scripts/sync-live-secrets.sh @@ -12,6 +12,7 @@ SECRETS=( trade-api trade-frontend-tokens trade-basic-auth + trade-ingestor-tokens ) ssh_source() { diff --git a/environments/sol/trade-r001-canary/trade-ingestor-deployment.yaml b/environments/sol/trade-r001-canary/trade-ingestor-deployment.yaml new file mode 100644 index 0000000..8db8c33 --- /dev/null +++ b/environments/sol/trade-r001-canary/trade-ingestor-deployment.yaml @@ -0,0 +1,60 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: trade-ingestor + namespace: trade-r001-canary +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: trade-ingestor + template: + metadata: + labels: + app.kubernetes.io/name: trade-ingestor + spec: + imagePullSecrets: + - name: gitea-registry + volumes: + - name: dlob-script + configMap: + name: trade-dlob-ingestor-script + - name: tokens + secret: + secretName: trade-ingestor-tokens + containers: + - name: ingestor + image: node:20-slim + imagePullPolicy: IfNotPresent + command: + - node + - /opt/dlob/dlob-ingestor.mjs + volumeMounts: + - name: dlob-script + mountPath: /opt/dlob/dlob-ingestor.mjs + subPath: dlob-ingestor.mjs + readOnly: true + - name: tokens + mountPath: /app/tokens + readOnly: true + env: + - name: HASURA_GRAPHQL_URL + value: http://hasura:8080/v1/graphql + - name: HASURA_ADMIN_SECRET + valueFrom: + secretKeyRef: + name: trade-hasura + key: HASURA_GRAPHQL_ADMIN_SECRET + - name: DLOB_MARKETS + value: SOL-PERP,PUMP-PERP + - name: TICKS_POLL_MS + value: "1000" + - name: TICKS_SOURCE + value: dlob_stats + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: 500m + memory: 512Mi