feat(canary): add ingestor rollout on sol
Some checks failed
deploy-trade-r001-canary / apply (push) Failing after 56s
Some checks failed
deploy-trade-r001-canary / apply (push) Failing after 56s
This commit is contained in:
@@ -34,7 +34,7 @@ jobs:
|
|||||||
env:
|
env:
|
||||||
KUBECONFIG: /tmp/kubeconfig
|
KUBECONFIG: /tmp/kubeconfig
|
||||||
run: |
|
run: |
|
||||||
kubectl -n trade-r001-canary get secret trade-postgres trade-hasura trade-api trade-frontend-tokens trade-basic-auth gitea-registry
|
kubectl -n trade-r001-canary get secret trade-postgres trade-hasura trade-api trade-frontend-tokens trade-basic-auth trade-ingestor-tokens gitea-registry
|
||||||
|
|
||||||
- name: Recreate bootstrap jobs
|
- name: Recreate bootstrap jobs
|
||||||
env:
|
env:
|
||||||
@@ -54,7 +54,7 @@ jobs:
|
|||||||
env:
|
env:
|
||||||
KUBECONFIG: /tmp/kubeconfig
|
KUBECONFIG: /tmp/kubeconfig
|
||||||
run: |
|
run: |
|
||||||
kubectl -n trade-r001-canary rollout restart deploy/hasura deploy/trade-api deploy/trade-frontend
|
kubectl -n trade-r001-canary rollout restart deploy/hasura deploy/trade-api deploy/trade-frontend deploy/trade-ingestor
|
||||||
|
|
||||||
- name: Wait for database and metadata bootstrap
|
- name: Wait for database and metadata bootstrap
|
||||||
env:
|
env:
|
||||||
@@ -70,8 +70,19 @@ jobs:
|
|||||||
kubectl -n trade-r001-canary rollout status deploy/hasura --timeout=300s
|
kubectl -n trade-r001-canary rollout status deploy/hasura --timeout=300s
|
||||||
kubectl -n trade-r001-canary rollout status deploy/trade-api --timeout=300s
|
kubectl -n trade-r001-canary rollout status deploy/trade-api --timeout=300s
|
||||||
kubectl -n trade-r001-canary rollout status deploy/trade-frontend --timeout=300s
|
kubectl -n trade-r001-canary rollout status deploy/trade-frontend --timeout=300s
|
||||||
|
kubectl -n trade-r001-canary rollout status deploy/trade-ingestor --timeout=300s
|
||||||
kubectl -n trade-r001-canary get deploy,pods -o wide
|
kubectl -n trade-r001-canary get deploy,pods -o wide
|
||||||
|
|
||||||
|
- name: Verify trade-ingestor runtime
|
||||||
|
env:
|
||||||
|
KUBECONFIG: /tmp/kubeconfig
|
||||||
|
run: |
|
||||||
|
sleep 10
|
||||||
|
pod_name="$(kubectl -n trade-r001-canary get pod -l app.kubernetes.io/name=trade-ingestor -o jsonpath='{.items[0].metadata.name}')"
|
||||||
|
restart_count="$(kubectl -n trade-r001-canary get pod "$pod_name" -o jsonpath='{.status.containerStatuses[0].restartCount}')"
|
||||||
|
test "${restart_count}" = "0"
|
||||||
|
kubectl -n trade-r001-canary logs "$pod_name" --tail=20
|
||||||
|
|
||||||
- name: Verify canary namespace connectivity
|
- name: Verify canary namespace connectivity
|
||||||
env:
|
env:
|
||||||
KUBECONFIG: /tmp/kubeconfig
|
KUBECONFIG: /tmp/kubeconfig
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ Minimal canary namespace for migration baseline `R001` on `sol`.
|
|||||||
- Reserve a dedicated namespace for the first reconstructed trade deployment.
|
- Reserve a dedicated namespace for the first reconstructed trade deployment.
|
||||||
- Put hard upper bounds on namespace-level CPU, memory, object count, and PVC growth before application manifests land.
|
- Put hard upper bounds on namespace-level CPU, memory, object count, and PVC growth before application manifests land.
|
||||||
- Verify that workloads in the namespace can resolve and reach the shared `trade-infra` services for `Postgres` and `Redis`.
|
- Verify that workloads in the namespace can resolve and reach the shared `trade-infra` services for `Postgres` and `Redis`.
|
||||||
- Recreate the `R001` application surface in a controlled way: `Hasura`, `trade-api`, and `trade-frontend`.
|
- Recreate the `R001` application surface in a controlled way: `Hasura`, `trade-api`, `trade-frontend`, and the first canary `trade-ingestor` path.
|
||||||
|
|
||||||
## Current Guardrails
|
## Current Guardrails
|
||||||
|
|
||||||
@@ -43,7 +43,9 @@ Minimal canary namespace for migration baseline `R001` on `sol`.
|
|||||||
- The canary workflow re-runs:
|
- The canary workflow re-runs:
|
||||||
- `postgres-migrate`
|
- `postgres-migrate`
|
||||||
- `hasura-bootstrap`
|
- `hasura-bootstrap`
|
||||||
before it waits for `Hasura`, `trade-api`, and `trade-frontend` to become healthy.
|
before it waits for `Hasura`, `trade-api`, `trade-frontend`, and `trade-ingestor` to become healthy.
|
||||||
|
- The current canary `trade-ingestor` is intentionally pinned to the schema already reconstructed on `sol` and reads from `dlob_stats_latest`.
|
||||||
|
- The exact live `R001` ingestor path that reads `dlob_*_derived_latest` remains a follow-up substep after the DLOB writer chain is reconstructed.
|
||||||
|
|
||||||
## Operator Flow
|
## Operator Flow
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,218 @@
|
|||||||
|
import process from 'node:process';
|
||||||
|
import { setTimeout as sleep } from 'node:timers/promises';
|
||||||
|
|
||||||
|
function getIsoNow() {
|
||||||
|
return new Date().toISOString();
|
||||||
|
}
|
||||||
|
|
||||||
|
function envString(name, fallback) {
|
||||||
|
const v = process.env[name];
|
||||||
|
if (v == null) return fallback;
|
||||||
|
const s = String(v).trim();
|
||||||
|
return s ? s : fallback;
|
||||||
|
}
|
||||||
|
|
||||||
|
function envInt(name, fallback, { min, max } = {}) {
|
||||||
|
const v = process.env[name];
|
||||||
|
if (v == null) return fallback;
|
||||||
|
const n = Number.parseInt(String(v), 10);
|
||||||
|
if (!Number.isFinite(n)) return fallback;
|
||||||
|
const low = typeof min === 'number' ? min : n;
|
||||||
|
const high = typeof max === 'number' ? max : n;
|
||||||
|
return Math.max(low, Math.min(high, n));
|
||||||
|
}
|
||||||
|
|
||||||
|
function envList(name, fallbackCsv) {
|
||||||
|
const raw = process.env[name] ?? fallbackCsv;
|
||||||
|
return String(raw)
|
||||||
|
.split(',')
|
||||||
|
.map((s) => s.trim())
|
||||||
|
.filter(Boolean);
|
||||||
|
}
|
||||||
|
|
||||||
|
function toIntOrNull(v) {
|
||||||
|
if (v == null) return null;
|
||||||
|
if (typeof v === 'number') return Number.isFinite(v) ? Math.trunc(v) : null;
|
||||||
|
if (typeof v === 'string') {
|
||||||
|
const s = v.trim();
|
||||||
|
if (!s) return null;
|
||||||
|
const n = Number.parseInt(s, 10);
|
||||||
|
return Number.isFinite(n) ? n : null;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
function numStr(v) {
|
||||||
|
if (v == null) return null;
|
||||||
|
if (typeof v === 'number') return Number.isFinite(v) ? String(v) : null;
|
||||||
|
if (typeof v === 'string') {
|
||||||
|
const s = v.trim();
|
||||||
|
return s ? s : null;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
function isoFromEpochMs(v) {
|
||||||
|
const n = typeof v === 'number' ? v : typeof v === 'string' ? Number(v.trim()) : NaN;
|
||||||
|
if (!Number.isFinite(n) || n <= 0) return null;
|
||||||
|
const d = new Date(n);
|
||||||
|
const ms = d.getTime();
|
||||||
|
if (!Number.isFinite(ms)) return null;
|
||||||
|
return d.toISOString();
|
||||||
|
}
|
||||||
|
|
||||||
|
function resolveConfig() {
|
||||||
|
const hasuraUrl = envString('HASURA_GRAPHQL_URL', 'http://hasura:8080/v1/graphql');
|
||||||
|
const hasuraAdminSecret = envString('HASURA_ADMIN_SECRET', '');
|
||||||
|
if (!hasuraAdminSecret) throw new Error('Missing HASURA_ADMIN_SECRET');
|
||||||
|
|
||||||
|
const markets = envList('DLOB_MARKETS', 'SOL-PERP,PUMP-PERP');
|
||||||
|
const pollMs = envInt('TICKS_POLL_MS', 1000, { min: 250, max: 60_000 });
|
||||||
|
const source = envString('TICKS_SOURCE', 'dlob_stats');
|
||||||
|
|
||||||
|
return { hasuraUrl, hasuraAdminSecret, markets, pollMs, source };
|
||||||
|
}
|
||||||
|
|
||||||
|
async function graphqlRequest(cfg, query, variables) {
|
||||||
|
const res = await fetch(cfg.hasuraUrl, {
|
||||||
|
method: 'POST',
|
||||||
|
headers: {
|
||||||
|
'content-type': 'application/json',
|
||||||
|
'x-hasura-admin-secret': cfg.hasuraAdminSecret,
|
||||||
|
},
|
||||||
|
body: JSON.stringify({ query, variables }),
|
||||||
|
signal: AbortSignal.timeout(10_000),
|
||||||
|
});
|
||||||
|
|
||||||
|
const text = await res.text();
|
||||||
|
if (!res.ok) throw new Error(`Hasura HTTP ${res.status}: ${text}`);
|
||||||
|
|
||||||
|
let json;
|
||||||
|
try {
|
||||||
|
json = JSON.parse(text);
|
||||||
|
} catch {
|
||||||
|
throw new Error(`Hasura: invalid json: ${text}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (json.errors?.length) throw new Error(json.errors.map((e) => e.message).join(' | '));
|
||||||
|
return json.data;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function fetchStats(cfg) {
|
||||||
|
const query = `
|
||||||
|
query DlobStatsLatest($markets: [String!]!) {
|
||||||
|
dlob_stats_latest(where: { market_name: { _in: $markets } }) {
|
||||||
|
market_name
|
||||||
|
market_index
|
||||||
|
ts
|
||||||
|
slot
|
||||||
|
oracle_price
|
||||||
|
mark_price
|
||||||
|
mid_price
|
||||||
|
best_bid_price
|
||||||
|
best_ask_price
|
||||||
|
updated_at
|
||||||
|
}
|
||||||
|
}
|
||||||
|
`;
|
||||||
|
|
||||||
|
const data = await graphqlRequest(cfg, query, { markets: cfg.markets });
|
||||||
|
return Array.isArray(data?.dlob_stats_latest) ? data.dlob_stats_latest : [];
|
||||||
|
}
|
||||||
|
|
||||||
|
async function insertTicks(cfg, objects) {
|
||||||
|
if (!objects.length) return 0;
|
||||||
|
|
||||||
|
const mutation = `
|
||||||
|
mutation InsertTicks($objects: [drift_ticks_insert_input!]!) {
|
||||||
|
insert_drift_ticks(objects: $objects) { affected_rows }
|
||||||
|
}
|
||||||
|
`;
|
||||||
|
|
||||||
|
const data = await graphqlRequest(cfg, mutation, { objects });
|
||||||
|
return Number(data?.insert_drift_ticks?.affected_rows || 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function main() {
|
||||||
|
const cfg = resolveConfig();
|
||||||
|
const lastUpdatedAtByMarket = new Map();
|
||||||
|
|
||||||
|
console.log(
|
||||||
|
JSON.stringify(
|
||||||
|
{
|
||||||
|
service: 'trade-ingestor',
|
||||||
|
mode: 'dlob_stats_ticks',
|
||||||
|
startedAt: getIsoNow(),
|
||||||
|
hasuraUrl: cfg.hasuraUrl,
|
||||||
|
markets: cfg.markets,
|
||||||
|
pollMs: cfg.pollMs,
|
||||||
|
source: cfg.source,
|
||||||
|
},
|
||||||
|
null,
|
||||||
|
2
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
const rows = await fetchStats(cfg);
|
||||||
|
const nowIso = getIsoNow();
|
||||||
|
|
||||||
|
const objects = [];
|
||||||
|
for (const r of rows) {
|
||||||
|
const marketName = String(r?.market_name || '').trim();
|
||||||
|
if (!marketName) continue;
|
||||||
|
|
||||||
|
const updatedAt = r?.updated_at ? String(r.updated_at) : '';
|
||||||
|
if (updatedAt && lastUpdatedAtByMarket.get(marketName) === updatedAt) continue;
|
||||||
|
if (updatedAt) lastUpdatedAtByMarket.set(marketName, updatedAt);
|
||||||
|
|
||||||
|
const marketIndex = toIntOrNull(r?.market_index) ?? 0;
|
||||||
|
const dlobIso = isoFromEpochMs(r?.ts);
|
||||||
|
const tsIso = dlobIso || nowIso;
|
||||||
|
|
||||||
|
const oraclePrice = numStr(r?.oracle_price) || numStr(r?.mark_price) || numStr(r?.mid_price);
|
||||||
|
const markPrice = numStr(r?.mark_price) || numStr(r?.mid_price) || oraclePrice;
|
||||||
|
if (!oraclePrice) continue;
|
||||||
|
|
||||||
|
objects.push({
|
||||||
|
ts: tsIso,
|
||||||
|
market_index: marketIndex,
|
||||||
|
symbol: marketName,
|
||||||
|
oracle_price: oraclePrice,
|
||||||
|
mark_price: markPrice,
|
||||||
|
oracle_slot: r?.slot == null ? null : String(r.slot),
|
||||||
|
source: cfg.source,
|
||||||
|
raw: {
|
||||||
|
from: 'dlob_stats_latest',
|
||||||
|
market_name: marketName,
|
||||||
|
market_index: marketIndex,
|
||||||
|
dlob: {
|
||||||
|
ts: r?.ts ?? null,
|
||||||
|
slot: r?.slot ?? null,
|
||||||
|
best_bid_price: r?.best_bid_price ?? null,
|
||||||
|
best_ask_price: r?.best_ask_price ?? null,
|
||||||
|
mid_price: r?.mid_price ?? null,
|
||||||
|
updated_at: updatedAt || null,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const inserted = await insertTicks(cfg, objects);
|
||||||
|
if (inserted) {
|
||||||
|
console.log(`[dlob-ticks] inserted=${inserted} ts=${nowIso}`);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error(`[dlob-ticks] error: ${String(err?.message || err)}`);
|
||||||
|
await sleep(2_000);
|
||||||
|
}
|
||||||
|
|
||||||
|
await sleep(cfg.pollMs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
main().catch((err) => {
|
||||||
|
console.error(String(err?.stack || err));
|
||||||
|
process.exitCode = 1;
|
||||||
|
});
|
||||||
@@ -19,6 +19,7 @@ resources:
|
|||||||
- trade-api-deployment.yaml
|
- trade-api-deployment.yaml
|
||||||
- trade-frontend-service.yaml
|
- trade-frontend-service.yaml
|
||||||
- trade-frontend-deployment.yaml
|
- trade-frontend-deployment.yaml
|
||||||
|
- trade-ingestor-deployment.yaml
|
||||||
|
|
||||||
configMapGenerator:
|
configMapGenerator:
|
||||||
- name: postgres-initdb
|
- name: postgres-initdb
|
||||||
@@ -33,3 +34,6 @@ configMapGenerator:
|
|||||||
- name: trade-api-upstream
|
- name: trade-api-upstream
|
||||||
files:
|
files:
|
||||||
- assets/api/server.mjs
|
- assets/api/server.mjs
|
||||||
|
- name: trade-dlob-ingestor-script
|
||||||
|
files:
|
||||||
|
- assets/ingestor/dlob-ingestor.mjs
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ SECRETS=(
|
|||||||
trade-api
|
trade-api
|
||||||
trade-frontend-tokens
|
trade-frontend-tokens
|
||||||
trade-basic-auth
|
trade-basic-auth
|
||||||
|
trade-ingestor-tokens
|
||||||
)
|
)
|
||||||
|
|
||||||
ssh_source() {
|
ssh_source() {
|
||||||
|
|||||||
@@ -0,0 +1,60 @@
|
|||||||
|
apiVersion: apps/v1
|
||||||
|
kind: Deployment
|
||||||
|
metadata:
|
||||||
|
name: trade-ingestor
|
||||||
|
namespace: trade-r001-canary
|
||||||
|
spec:
|
||||||
|
replicas: 1
|
||||||
|
selector:
|
||||||
|
matchLabels:
|
||||||
|
app.kubernetes.io/name: trade-ingestor
|
||||||
|
template:
|
||||||
|
metadata:
|
||||||
|
labels:
|
||||||
|
app.kubernetes.io/name: trade-ingestor
|
||||||
|
spec:
|
||||||
|
imagePullSecrets:
|
||||||
|
- name: gitea-registry
|
||||||
|
volumes:
|
||||||
|
- name: dlob-script
|
||||||
|
configMap:
|
||||||
|
name: trade-dlob-ingestor-script
|
||||||
|
- name: tokens
|
||||||
|
secret:
|
||||||
|
secretName: trade-ingestor-tokens
|
||||||
|
containers:
|
||||||
|
- name: ingestor
|
||||||
|
image: node:20-slim
|
||||||
|
imagePullPolicy: IfNotPresent
|
||||||
|
command:
|
||||||
|
- node
|
||||||
|
- /opt/dlob/dlob-ingestor.mjs
|
||||||
|
volumeMounts:
|
||||||
|
- name: dlob-script
|
||||||
|
mountPath: /opt/dlob/dlob-ingestor.mjs
|
||||||
|
subPath: dlob-ingestor.mjs
|
||||||
|
readOnly: true
|
||||||
|
- name: tokens
|
||||||
|
mountPath: /app/tokens
|
||||||
|
readOnly: true
|
||||||
|
env:
|
||||||
|
- name: HASURA_GRAPHQL_URL
|
||||||
|
value: http://hasura:8080/v1/graphql
|
||||||
|
- name: HASURA_ADMIN_SECRET
|
||||||
|
valueFrom:
|
||||||
|
secretKeyRef:
|
||||||
|
name: trade-hasura
|
||||||
|
key: HASURA_GRAPHQL_ADMIN_SECRET
|
||||||
|
- name: DLOB_MARKETS
|
||||||
|
value: SOL-PERP,PUMP-PERP
|
||||||
|
- name: TICKS_POLL_MS
|
||||||
|
value: "1000"
|
||||||
|
- name: TICKS_SOURCE
|
||||||
|
value: dlob_stats
|
||||||
|
resources:
|
||||||
|
requests:
|
||||||
|
cpu: 100m
|
||||||
|
memory: 128Mi
|
||||||
|
limits:
|
||||||
|
cpu: 500m
|
||||||
|
memory: 512Mi
|
||||||
Reference in New Issue
Block a user