feat(staging): switch ingestor to fake

- Replaces Drift/RPC ingest with a seeded fake generator in the staging overlay.

- Seeds from existing ticks via trade-api /v1/ticks and writes new ticks via /v1/ingest/tick.
This commit is contained in:
u1
2026-01-09 01:13:35 +01:00
parent ac50ca2117
commit 0eef6bca12
3 changed files with 448 additions and 0 deletions

View File

@@ -0,0 +1,407 @@
import fs from 'node:fs';
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function envString(name, fallback) {
const v = process.env[name];
if (v == null) return fallback;
const s = String(v).trim();
return s ? s : fallback;
}
function envNumber(name, fallback) {
const v = process.env[name];
if (v == null) return fallback;
const n = Number(v);
return Number.isFinite(n) ? n : fallback;
}
function envInt(name, fallback, { min, max } = {}) {
const v = process.env[name];
if (v == null) return fallback;
const n = Number.parseInt(String(v), 10);
if (!Number.isInteger(n)) return fallback;
const nn = Math.max(min ?? n, Math.min(max ?? n, n));
return nn;
}
function readJson(filePath) {
try {
const raw = fs.readFileSync(filePath, 'utf8');
return JSON.parse(raw);
} catch {
return undefined;
}
}
function readTokenFromFile(filePath) {
const json = readJson(filePath);
const raw = json?.token || json?.jwt || json?.authToken;
const tok = typeof raw === 'string' ? raw.trim() : '';
return tok ? tok : undefined;
}
function urlWithPath(baseUrl, path) {
const u = new URL(baseUrl);
const basePath = u.pathname && u.pathname !== '/' ? u.pathname.replace(/\/$/, '') : '';
u.pathname = `${basePath}${path}`;
return u;
}
async function httpJson(url, options) {
const res = await fetch(url, options);
const text = await res.text();
let json;
try {
json = JSON.parse(text);
} catch {
json = undefined;
}
return { ok: res.ok, status: res.status, json, text };
}
function mean(values) {
if (!values.length) return 0;
return values.reduce((a, b) => a + b, 0) / values.length;
}
function stddev(values) {
if (!values.length) return 0;
const m = mean(values);
const v = values.reduce((acc, x) => acc + (x - m) * (x - m), 0) / values.length;
return Math.sqrt(v);
}
function quantile(values, q) {
if (!values.length) return 0;
const sorted = values.slice().sort((a, b) => a - b);
const pos = (sorted.length - 1) * q;
const base = Math.floor(pos);
const rest = pos - base;
if (sorted[base + 1] == null) return sorted[base];
return sorted[base] + rest * (sorted[base + 1] - sorted[base]);
}
function randn() {
// BoxMuller
let u = 0;
let v = 0;
while (u === 0) u = Math.random();
while (v === 0) v = Math.random();
return Math.sqrt(-2 * Math.log(u)) * Math.cos(2 * Math.PI * v);
}
function clamp(v, min, max) {
if (v < min) return min;
if (v > max) return max;
return v;
}
function parsePriceFromTick(t) {
const mp = t?.mark_price ?? t?.markPrice;
const op = t?.oracle_price ?? t?.oraclePrice ?? t?.price;
const v = mp != null ? Number(mp) : Number(op);
return Number.isFinite(v) && v > 0 ? v : null;
}
function computeLogReturns(prices) {
const out = [];
for (let i = 1; i < prices.length; i++) {
const a = prices[i - 1];
const b = prices[i];
if (!(a > 0) || !(b > 0)) continue;
const r = Math.log(b / a);
if (Number.isFinite(r)) out.push(r);
}
return out;
}
class BlockSampler {
#values;
#minBlock;
#maxBlock;
#idx = 0;
#remaining = 0;
constructor(values, { minBlock, maxBlock }) {
this.#values = values.slice();
this.#minBlock = Math.max(1, minBlock);
this.#maxBlock = Math.max(this.#minBlock, maxBlock);
this.#startNewBlock();
}
#startNewBlock() {
const n = this.#values.length;
if (n <= 1) {
this.#idx = 0;
this.#remaining = 1;
return;
}
const start = Math.floor(Math.random() * n);
const span = this.#maxBlock - this.#minBlock + 1;
const len = this.#minBlock + Math.floor(Math.random() * span);
this.#idx = start;
this.#remaining = Math.min(len, n);
}
next() {
const n = this.#values.length;
if (n === 0) return 0;
if (this.#remaining <= 0) this.#startNewBlock();
const v = this.#values[this.#idx];
this.#idx = (this.#idx + 1) % n;
this.#remaining -= 1;
return Number.isFinite(v) ? v : 0;
}
}
async function fetchSeedTicks({ apiBase, readToken, symbol, source, limit }) {
const u = urlWithPath(apiBase, '/v1/ticks');
u.searchParams.set('symbol', symbol);
u.searchParams.set('limit', String(limit));
if (source) u.searchParams.set('source', source);
const res = await httpJson(u.toString(), {
method: 'GET',
headers: {
authorization: `Bearer ${readToken}`,
},
});
if (!res.ok) throw new Error(`seed_ticks_http_${res.status}: ${res.text}`);
if (!res.json?.ok) throw new Error(`seed_ticks_error: ${res.json?.error || res.text}`);
const ticks = Array.isArray(res.json?.ticks) ? res.json.ticks : [];
return ticks;
}
async function ingestTick({ apiBase, writeToken, tick }) {
const u = urlWithPath(apiBase, '/v1/ingest/tick');
const res = await httpJson(u.toString(), {
method: 'POST',
headers: {
'content-type': 'application/json',
authorization: `Bearer ${writeToken}`,
},
body: JSON.stringify(tick),
});
if (!res.ok) throw new Error(`ingest_http_${res.status}: ${res.text}`);
if (!res.json?.ok) throw new Error(`ingest_error: ${res.json?.error || res.text}`);
return res.json?.id || null;
}
function formatNumeric(value) {
if (!Number.isFinite(value)) return '0';
// Avoid scientific notation for small values while keeping reasonable precision.
const abs = Math.abs(value);
if (abs === 0) return '0';
if (abs >= 1) return value.toFixed(8).replace(/\.?0+$/, '');
if (abs >= 0.01) return value.toFixed(10).replace(/\.?0+$/, '');
if (abs >= 0.0001) return value.toFixed(12).replace(/\.?0+$/, '');
return value.toFixed(16).replace(/\.?0+$/, '');
}
async function main() {
const apiBase = envString('INGEST_API_URL', 'http://trade-api:8787');
const symbol = envString('MARKET_NAME', envString('SYMBOL', 'PUMP-PERP'));
const source = envString('SOURCE', 'drift_oracle');
const intervalMs = envInt('INTERVAL_MS', 1000, { min: 50, max: 60_000 });
const readTokenFile = envString('FAKE_READ_TOKEN_FILE', envString('READ_TOKEN_FILE', '/tokens/read.json'));
const writeTokenFile = envString('FAKE_WRITE_TOKEN_FILE', envString('WRITE_TOKEN_FILE', '/app/tokens/alg.json'));
const seedLimit = envInt('FAKE_SEED_LIMIT', 5000, { min: 50, max: 5000 });
const seedSourceRaw = process.env.FAKE_SEED_SOURCE;
const seedSource = seedSourceRaw == null ? source : String(seedSourceRaw).trim();
const minBlock = envInt('FAKE_BLOCK_MIN', 30, { min: 1, max: 5000 });
const maxBlock = envInt('FAKE_BLOCK_MAX', 240, { min: 1, max: 5000 });
const volScale = envNumber('FAKE_VOL_SCALE', 1);
const noiseScale = envNumber('FAKE_NOISE_SCALE', 0.15);
const meanReversion = envNumber('FAKE_MEAN_REVERSION', 0.001);
const markNoiseBps = envNumber('FAKE_MARK_NOISE_BPS', 2);
const logEvery = envInt('FAKE_LOG_EVERY', 30, { min: 1, max: 10_000 });
const marketIndexEnv = process.env.MARKET_INDEX;
const marketIndexFallback = Number.isInteger(Number(marketIndexEnv)) ? Number(marketIndexEnv) : 0;
const startPriceEnv = envNumber('FAKE_START_PRICE', 0);
const readToken = readTokenFromFile(readTokenFile);
const writeToken = readTokenFromFile(writeTokenFile);
if (!writeToken) throw new Error(`Missing write token (expected JSON token at ${writeTokenFile})`);
let seedTicks = [];
let seedPrices = [];
let marketIndex = marketIndexFallback;
let seedFromTs = null;
let seedToTs = null;
if (!readToken) {
console.warn(`[fake-ingestor] No read token at ${readTokenFile}; running without seed data.`);
} else {
seedTicks = await fetchSeedTicks({
apiBase,
readToken,
symbol,
source: seedSource ? seedSource : undefined,
limit: seedLimit,
});
seedFromTs = seedTicks.length ? String(seedTicks[0]?.ts || '') : null;
seedToTs = seedTicks.length ? String(seedTicks[seedTicks.length - 1]?.ts || '') : null;
for (const t of seedTicks) {
const p = parsePriceFromTick(t);
if (p != null) seedPrices.push(p);
}
const lastTick = seedTicks.length ? seedTicks[seedTicks.length - 1] : null;
const mi = lastTick?.market_index ?? lastTick?.marketIndex;
if (Number.isInteger(Number(mi))) marketIndex = Number(mi);
}
if (!seedPrices.length && startPriceEnv > 0) {
seedPrices = [startPriceEnv];
}
if (!seedPrices.length) {
throw new Error(
'No seed prices available. Provide FAKE_START_PRICE (and optionally MARKET_INDEX) or mount a read token to seed from /v1/ticks.'
);
}
const returnsRaw = computeLogReturns(seedPrices);
const mu = mean(returnsRaw);
const sigma = stddev(returnsRaw);
const center = envString('FAKE_CENTER_RETURNS', '1') !== '0';
const returns = returnsRaw.length
? center
? returnsRaw.map((r) => r - mu)
: returnsRaw
: [];
const sampler = new BlockSampler(returns.length ? returns : [0], { minBlock, maxBlock });
const p05 = quantile(seedPrices, 0.05);
const p50 = quantile(seedPrices, 0.5);
const p95 = quantile(seedPrices, 0.95);
const clampMin = p05 > 0 ? p05 * 0.8 : Math.min(...seedPrices) * 0.8;
const clampMax = p95 > 0 ? p95 * 1.2 : Math.max(...seedPrices) * 1.2;
let logPrice = Math.log(seedPrices[seedPrices.length - 1]);
const targetLog = Math.log(p50 > 0 ? p50 : seedPrices[seedPrices.length - 1]);
console.log(
JSON.stringify(
{
service: 'trade-fake-ingestor',
apiBase,
symbol,
source,
intervalMs,
marketIndex,
seed: {
ok: Boolean(seedTicks.length),
limit: seedLimit,
source: seedSource || null,
ticks: seedTicks.length,
prices: seedPrices.length,
fromTs: seedFromTs,
toTs: seedToTs,
},
model: {
type: 'block-bootstrap-returns',
centerReturns: center,
mu,
sigma,
volScale,
noiseScale,
meanReversion,
block: { minBlock, maxBlock },
clamp: { min: clampMin, max: clampMax },
},
},
null,
2
)
);
let stopping = false;
process.on('SIGINT', () => {
stopping = true;
});
process.on('SIGTERM', () => {
stopping = true;
});
let tickCount = 0;
let nextAt = Date.now();
while (!stopping) {
const now = Date.now();
if (now < nextAt) await sleep(nextAt - now);
nextAt += intervalMs;
const r = sampler.next();
const noise = (Number.isFinite(sigma) ? sigma : 0) * noiseScale * randn();
const revert = Number.isFinite(meanReversion) ? meanReversion * (targetLog - logPrice) : 0;
const step = (Number.isFinite(r) ? r : 0) * volScale + noise + revert;
logPrice += step;
let oraclePrice = Math.exp(logPrice);
if (Number.isFinite(clampMin) && Number.isFinite(clampMax) && clampMax > clampMin) {
oraclePrice = clamp(oraclePrice, clampMin, clampMax);
logPrice = Math.log(oraclePrice);
}
const markNoise = (markNoiseBps / 10_000) * randn();
const markPrice = oraclePrice * (1 + markNoise);
const ts = new Date().toISOString();
const tick = {
ts,
market_index: marketIndex,
symbol,
oracle_price: formatNumeric(oraclePrice),
mark_price: formatNumeric(markPrice),
source,
raw: {
fake: true,
model: 'block-bootstrap-returns',
seeded: seedTicks.length
? { symbol, source: seedSource || null, limit: seedLimit, fromTs: seedFromTs, toTs: seedToTs }
: { symbol, source: null, limit: 0, fromTs: null, toTs: null },
},
};
try {
await ingestTick({ apiBase, writeToken, tick });
tickCount += 1;
if (tickCount % logEvery === 0) {
console.log(
`[fake-ingestor] ok ticks=${tickCount} ts=${ts} px=${tick.oracle_price} mark=${tick.mark_price} clamp=[${formatNumeric(
clampMin
)},${formatNumeric(clampMax)}]`
);
}
} catch (err) {
console.warn(`[fake-ingestor] ingest failed: ${String(err?.message || err)}`);
await sleep(Math.min(5000, Math.max(250, intervalMs)));
}
}
console.log('[fake-ingestor] stopped');
}
main().catch((err) => {
console.error(String(err?.stack || err));
process.exitCode = 1;
});

View File

@@ -0,0 +1,32 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: trade-ingestor
spec:
template:
spec:
containers:
- name: ingestor
env:
- name: FAKE_READ_TOKEN_FILE
value: "/tokens/read.json"
- name: FAKE_WRITE_TOKEN_FILE
value: "/app/tokens/alg.json"
- name: FAKE_SEED_LIMIT
value: "5000"
command: ["node"]
args: ["/opt/fake/fake-ingestor.mjs"]
volumeMounts:
- name: fake-script
mountPath: /opt/fake
readOnly: true
- name: read-tokens
mountPath: /tokens
readOnly: true
volumes:
- name: fake-script
configMap:
name: trade-fake-ingestor-script
- name: read-tokens
secret:
secretName: trade-frontend-tokens

View File

@@ -12,6 +12,15 @@ resources:
patchesStrategicMerge:
- hasura-patch.yaml
- frontend-auth-patch.yaml
- ingestor-fake-patch.yaml
configMapGenerator:
- name: trade-fake-ingestor-script
files:
- fake-ingestor.mjs
generatorOptions:
disableNameSuffixHash: true
commonLabels:
env: staging