import dns from 'node:dns';
import fs from 'node:fs';
import path from 'node:path';
import process from 'node:process';
import { fileURLToPath } from 'node:url';

import { Connection, Keypair, PublicKey } from '@solana/web3.js';
import {
  BulkAccountLoader,
  calculateReservePrice,
  DriftClient,
  MarketType,
  OracleInfo,
  PRICE_PRECISION,
  configs,
  convertToNumber,
} from '@drift-labs/sdk';

// Prefer IPv4 results when resolving host names.
dns.setDefaultResultOrder('ipv4first');

// Exit with status 0 when writing to stdout fails with EPIPE (the reader went away).
process.stdout.on('error', (err: any) => {
  if (err?.code === 'EPIPE') process.exit(0);
});

/** Helius RPC URL prefixes; the API key is appended to each to form a candidate endpoint. */
const HELIUS_RPC_BASES = [
  'https://mainnet.helius-rpc.com/?api-key=',
  'https://rpc.helius.xyz/?api-key=',
];

const SCRIPT_DIR = path.dirname(fileURLToPath(import.meta.url));
const PROJECT_ROOT = path.resolve(SCRIPT_DIR, '..');

/**
 * Returns the value following `--<name>` on the command line.
 *
 * @returns the next argv entry, or `undefined` when the flag is absent or is the last entry.
 */
function readArg(name: string): string | undefined {
  const idx = process.argv.indexOf(`--${name}`);
  if (idx === -1 || idx + 1 >= process.argv.length) return undefined;
  return process.argv[idx + 1];
}

/** Returns `true` when `--<name>` appears anywhere on the command line. */
function readFlag(name: string): boolean {
  return process.argv.includes(`--${name}`);
}

/**
 * Parses a base-10 integer, rejecting anything that is not purely an optionally signed
 * run of digits. `Number.parseInt` alone would silently accept `"1e3"` as 1 and
 * `"12abc"` as 12.
 *
 * @returns the parsed integer, or `undefined` when the text is not a safe decimal integer.
 */
function parseStrictInt(text: string): number | undefined {
  const trimmed = text.trim();
  if (!/^[+-]?\d+$/.test(trimmed)) return undefined;
  const n = Number.parseInt(trimmed, 10);
  return Number.isSafeInteger(n) ? n : undefined;
}

/**
 * Reads `--<name>` as an integer.
 *
 * @returns `fallback` when the flag is absent or empty.
 * @throws Error when the value is present but not a decimal integer.
 */
function readIntArg(name: string, fallback: number): number {
  const v = readArg(name);
  if (!v) return fallback;
  const n = parseStrictInt(v);
  if (n === undefined) throw new Error(`Invalid --${name}: ${v}`);
  return n;
}

/**
 * Reads and parses a JSON file.
 *
 * @returns the parsed value, or `undefined` when the file does not exist.
 * @throws SyntaxError (from `JSON.parse`) when the file is not valid JSON.
 */
function readJsonFile(filePath: string): unknown | undefined {
  if (!fs.existsSync(filePath)) return undefined;
  const raw = fs.readFileSync(filePath, 'utf8');
  return JSON.parse(raw);
}

type HasuraTokensFile = {
  adminSecret?: string;
  hasuraAdminSecret?: string;
  graphqlUrl?: string;
  apiUrl?: string;
};

/** Loads `tokens/hasura.json` from the project root, if present. */
function readHasuraTokensFile(): HasuraTokensFile | undefined {
  return readJsonFile(path.join(PROJECT_ROOT, 'tokens', 'hasura.json')) as HasuraTokensFile | undefined;
}

type ApiTokensFile = {
  apiUrl?: string;
  url?: string;
};

/** Loads `tokens/api.json` from the project root, if present. */
function readApiTokensFile(): ApiTokensFile | undefined {
  return readJsonFile(path.join(PROJECT_ROOT, 'tokens', 'api.json')) as ApiTokensFile | undefined;
}

type AlgTokenFile = {
  token?: string;
  jwt?: string;
  authToken?: string;
};

/** Loads a bearer-token file from `filePath`, if present. */
function readAlgTokenFile(filePath: string): AlgTokenFile | undefined {
  return readJsonFile(filePath) as AlgTokenFile | undefined;
}

/**
 * Resolves the bearer token. Precedence: `--auth-token`, then the env vars
 * `INGEST_AUTH_TOKEN`, `API_AUTH_TOKEN`, `HASURA_AUTH_TOKEN`, then a token file
 * (`--auth-token-file`, `HASURA_AUTH_TOKEN_FILE`, or `tokens/alg.json`).
 *
 * @returns the token, or `undefined` when none is configured.
 */
function resolveAuthToken(): string | undefined {
  const cli = readArg('auth-token');
  if (cli) return cli;
  if (process.env.INGEST_AUTH_TOKEN) return process.env.INGEST_AUTH_TOKEN;
  if (process.env.API_AUTH_TOKEN) return process.env.API_AUTH_TOKEN;
  if (process.env.HASURA_AUTH_TOKEN) return process.env.HASURA_AUTH_TOKEN;

  const tokenFilePath =
    readArg('auth-token-file') ||
    process.env.HASURA_AUTH_TOKEN_FILE ||
    path.join(PROJECT_ROOT, 'tokens', 'alg.json');
  const fromFile = readAlgTokenFile(tokenFilePath);
  const tok = fromFile?.token || fromFile?.jwt || fromFile?.authToken;
  return typeof tok === 'string' && tok.length ? tok : undefined;
}

/** Masks the value of every `api-key=` query parameter found in `text`. */
function redactSecrets(text: string): string {
  return text.replace(/api-key=([^&\s]+)/gi, 'api-key=***');
}

type HeliusTokensFile = { heliusApiKey?: string; rpcUrl?: string };

/**
 * Resolves the list of RPC endpoints to try, in order. Precedence: `--rpc`, `RPC_URL`,
 * then `tokens/heliusN.json`, then `tokens/helius.json`. Within a tokens file an
 * explicit `rpcUrl` wins over `heliusApiKey`; a key expands to one URL per Helius base.
 *
 * @throws Error when no source yields an endpoint.
 */
function resolveRpcCandidates(): string[] {
  const cli = readArg('rpc');
  if (cli) return [cli];
  if (process.env.RPC_URL) return [process.env.RPC_URL];

  // The second file is only read when the first one yields nothing.
  for (const fileName of ['heliusN.json', 'helius.json']) {
    const tokens = readJsonFile(path.join(PROJECT_ROOT, 'tokens', fileName)) as HeliusTokensFile | undefined;
    if (tokens?.rpcUrl) return [tokens.rpcUrl];
    const apiKey = tokens?.heliusApiKey;
    if (apiKey) return HELIUS_RPC_BASES.map((b) => `${b}${apiKey}`);
  }

  throw new Error('RPC not configured. Put heliusApiKey in tokens/heliusN.json (or tokens/helius.json).');
}

class NoBatchBulkAccountLoader extends BulkAccountLoader {
  // Helius free plans may block JSON-RPC *batch* requests. The SDK's default BulkAccountLoader
  // uses `_rpcBatchRequest`; we override to use `getMultipleAccountsInfoAndContext` instead.
  async loadChunk(accountsToLoadChunks: any[][]): Promise<void> {
    if (accountsToLoadChunks.length === 0) return;

    for (const accountsToLoadChunk of accountsToLoadChunks) {
      // Only fetch accounts that still have at least one registered callback.
      const activeAccounts = accountsToLoadChunk.filter((a) => a?.callbacks?.size > 0);
      if (activeAccounts.length === 0) continue;

      const pubkeys = activeAccounts.map((a) => a.publicKey);
      const res = await (this.connection as any).getMultipleAccountsInfoAndContext(pubkeys, this.commitment);

      const newSlot = res?.context?.slot ?? 0;
      if (newSlot > this.mostRecentSlot) this.mostRecentSlot = newSlot;

      const values = (res?.value ?? []) as Array<{ data: Buffer } | null>;
      activeAccounts.forEach((accountToLoad, idx) => {
        const key = accountToLoad.publicKey.toBase58();
        const old = this.bufferAndSlotMap.get(key);
        // Drop responses that are older than the data already cached for this account.
        if (old && newSlot < old.slot) return;

        const info = values[idx] ?? null;
        const newBuffer: Buffer | undefined = info ? (info.data as any) : undefined;

        // First observation: always record and notify, even when the account has no data.
        if (!old) {
          this.bufferAndSlotMap.set(key, { slot: newSlot, buffer: newBuffer });
          this.handleAccountCallbacks(accountToLoad, newBuffer as any, newSlot);
          return;
        }

        // Later observations: notify only when data is present and differs from the cache.
        const oldBuffer = old.buffer;
        if (newBuffer && (!oldBuffer || !newBuffer.equals(oldBuffer))) {
          this.bufferAndSlotMap.set(key, { slot: newSlot, buffer: newBuffer });
          this.handleAccountCallbacks(accountToLoad, newBuffer as any, newSlot);
        }
      });
    }
  }
}

/** Minimal wallet wrapper around a `Keypair` that signs with `partialSign`. */
class KeypairWallet {
  payer: Keypair;
  publicKey: PublicKey;

  constructor(keypair: Keypair) {
    this.payer = keypair;
    this.publicKey = keypair.publicKey;
  }

  /** Signs `tx` in place with the payer key and returns it. */
  async signTransaction(tx: any): Promise<any> {
    tx.partialSign(this.payer);
    return tx;
  }

  /** Signs every transaction in place with the payer key and returns them in order. */
  async signAllTransactions(txs: any[]): Promise<any[]> {
    return txs.map((tx) => {
      tx.partialSign(this.payer);
      return tx;
    });
  }
}

/** One price observation, in the shape sent to Hasura / the ingest API. */
type Tick = {
  ts: string;
  market_index: number;
  symbol: string;
  oracle_price: string;
  mark_price: string;
  oracle_slot?: number;
  source: string;
  raw?: Record<string, unknown>;
};

/** Current time as an ISO-8601 string. */
function getIsoNow(): string {
  return new Date().toISOString();
}

/**
 * Resolves the Hasura GraphQL endpoint. Precedence: `--api-url`, `HASURA_GRAPHQL_URL`,
 * `tokens/hasura.json` (`graphqlUrl`, then `apiUrl`), then a localhost default.
 */
function resolveHasuraUrl(): string {
  const cli = readArg('api-url');
  if (cli) return cli;
  if (process.env.HASURA_GRAPHQL_URL) return process.env.HASURA_GRAPHQL_URL;
  const fromTokens = readHasuraTokensFile();
  if (fromTokens?.graphqlUrl) return fromTokens.graphqlUrl;
  if (fromTokens?.apiUrl) return fromTokens.apiUrl;
  return 'http://localhost:8080/v1/graphql';
}

type IngestVia = 'hasura' | 'api';

/**
 * Resolves the write path from `--ingest-via` or `INGEST_VIA` (default `hasura`).
 *
 * @throws Error when the value is neither `hasura` nor `api`.
 */
function resolveIngestVia(): IngestVia {
  const cli = readArg('ingest-via');
  const env = process.env.INGEST_VIA;
  const v = (cli || env || 'hasura').toLowerCase();
  if (v === 'hasura' || v === 'api') return v;
  throw new Error(`Invalid --ingest-via: ${cli || env}`);
}

/**
 * Normalizes an ingest base URL: when it has no path (or just `/`), the tick endpoint
 * path `/v1/ingest/tick` is filled in. An explicit path is kept as given.
 *
 * @throws TypeError (from `new URL`) when `raw` is not a valid absolute URL.
 */
function withDefaultTickPath(raw: string): string {
  const u = new URL(raw);
  if (!u.pathname || u.pathname === '/') u.pathname = '/v1/ingest/tick';
  return u.toString();
}

/**
 * Resolves the ingest API tick URL. Precedence: `--ingest-url` / `--ingest-api-url`,
 * `INGEST_API_URL`, then `tokens/api.json` (`apiUrl`, then `url`).
 *
 * @returns the normalized URL, or `undefined` when nothing is configured.
 */
function resolveIngestApiTickUrl(): string | undefined {
  const cli = readArg('ingest-url') || readArg('ingest-api-url');
  if (cli) return withDefaultTickPath(cli);
  if (process.env.INGEST_API_URL) return withDefaultTickPath(process.env.INGEST_API_URL);

  const fromTokens = readApiTokensFile();
  const base = fromTokens?.apiUrl || fromTokens?.url;
  return base ? withDefaultTickPath(base) : undefined;
}

/**
 * Resolves the Hasura admin secret. Precedence: `--admin-secret`, `HASURA_ADMIN_SECRET`,
 * then `tokens/hasura.json` (`adminSecret`, then `hasuraAdminSecret`).
 */
function resolveAdminSecret(): string | undefined {
  const cli = readArg('admin-secret');
  if (cli) return cli;
  if (process.env.HASURA_ADMIN_SECRET) return process.env.HASURA_ADMIN_SECRET;
  const fromTokens = readHasuraTokensFile();
  return fromTokens?.adminSecret || fromTokens?.hasuraAdminSecret;
}

type HasuraAuth = { bearerToken?: string; adminSecret?: string };

/** Builds GraphQL request headers; a bearer token takes priority over the admin secret. */
function buildHasuraGraphqlHeaders(auth: HasuraAuth): Record<string, string> {
  const headers: Record<string, string> = { 'content-type': 'application/json' };
  if (auth.bearerToken) headers.authorization = `Bearer ${auth.bearerToken}`;
  else if (auth.adminSecret) headers['x-hasura-admin-secret'] = auth.adminSecret;
  return headers;
}

/**
 * POSTs one tick to the ingest API.
 *
 * @throws Error when the token is missing, the HTTP status is not OK, or the JSON
 *   response does not carry a truthy `ok`.
 */
async function apiIngestTick(ingestTickUrl: string, bearerToken: string | undefined, tick: Tick) {
  if (!bearerToken) {
    throw new Error('Missing ingest auth token. Pass --auth-token / INGEST_AUTH_TOKEN or create tokens/alg.json.');
  }

  const res = await fetch(ingestTickUrl, {
    method: 'POST',
    headers: {
      'content-type': 'application/json',
      authorization: `Bearer ${bearerToken}`,
    },
    body: JSON.stringify(tick),
  });

  const text = await res.text();
  if (!res.ok) throw new Error(`Ingest API HTTP ${res.status}: ${text}`);
  const json = JSON.parse(text) as { ok?: boolean; error?: string };
  if (!json.ok) throw new Error(`Ingest API error: ${json.error || text}`);
}

/**
 * Inserts one tick through the Hasura GraphQL mutation `insert_drift_ticks_one`.
 * When the failure message suggests the table is not tracked, the table is tracked via
 * the metadata API and the insert is retried once.
 *
 * @throws Error on HTTP or GraphQL errors that are not resolved by the single retry.
 */
async function hasuraInsertTick(hasuraUrl: string, auth: HasuraAuth, tick: Tick) {
  const query = ` mutation InsertTick($object: drift_ticks_insert_input!) { insert_drift_ticks_one(object: $object) { id } } `;
  const headers = buildHasuraGraphqlHeaders(auth);

  const attempt = async () => {
    const res = await fetch(hasuraUrl, {
      method: 'POST',
      headers,
      body: JSON.stringify({ query, variables: { object: tick } }),
    });

    const bodyText = await res.text();
    if (!res.ok) {
      throw new Error(`Hasura HTTP ${res.status}: ${bodyText}`);
    }

    const json = JSON.parse(bodyText) as { errors?: Array<{ message: string }>; data?: unknown };
    if (json.errors?.length) {
      throw new Error(`Hasura GraphQL error: ${json.errors.map((e) => e.message).join(' | ')}`);
    }
  };

  try {
    await attempt();
  } catch (err: any) {
    const msg = String(err?.message || err);
    // Heuristic: these fragments are treated as "the mutation field does not exist".
    const looksUntracked =
      msg.includes('no mutations exist') ||
      msg.includes('Cannot query field "insert_drift_ticks_one"') ||
      msg.includes('Unknown field "insert_drift_ticks_one"') ||
      msg.includes('not found in type:') ||
      msg.includes('not found on type:');
    if (!looksUntracked) throw err;

    await ensureHasuraDriftTicksTracked(hasuraUrl, auth.adminSecret);
    await attempt();
  }
}

/**
 * Asks the Hasura metadata API (same host as `hasuraUrl`, path `/v1/metadata`) to track
 * `public.drift_ticks` on the `default` source.
 *
 * @throws Error when no admin secret is available, or when tracking fails for a reason
 *   other than the table already being tracked.
 */
async function ensureHasuraDriftTicksTracked(hasuraUrl: string, adminSecret: string | undefined) {
  if (!adminSecret) {
    throw new Error(
      'Hasura mutations missing (table likely not tracked). ' +
        'Start the stack with hasura-bootstrap or track drift_ticks in the Hasura console (requires admin secret).'
    );
  }

  const url = new URL(hasuraUrl);
  url.pathname = '/v1/metadata';
  url.search = '';

  const headers: Record<string, string> = { 'content-type': 'application/json' };
  headers['x-hasura-admin-secret'] = adminSecret;

  // Track the table so Hasura exposes insert/update mutations.
  // If it's already tracked, Hasura returns an error; we treat that as success.
  const track = await fetch(url.toString(), {
    method: 'POST',
    headers,
    body: JSON.stringify({
      type: 'pg_track_table',
      args: {
        source: 'default',
        table: { schema: 'public', name: 'drift_ticks' },
      },
    }),
  });

  if (track.ok) return;
  const text = await track.text();
  const alreadyTracked =
    text.includes('already tracked') ||
    text.includes('already exists') ||
    text.includes('already present') ||
    text.includes('already been tracked');
  if (alreadyTracked) return;
  throw new Error(`Hasura metadata track failed (${track.status}): ${text}`);
}

type DriftBuildOptions = {
  pollingFrequencyMs: number;
  perpMarketIndexes?: number[];
  oracleInfos?: OracleInfo[];
};

/**
 * Connects to the first working RPC candidate and returns a subscribed, polling
 * `DriftClient` backed by a throwaway keypair.
 *
 * @throws Error listing every candidate (redacted) and every failure when none works.
 */
async function buildDriftClient(opts: DriftBuildOptions): Promise<DriftClient> {
  const rpcCandidates = resolveRpcCandidates();
  const errors: string[] = [];

  for (const rpcUrl of rpcCandidates) {
    const connection = new Connection(rpcUrl, 'confirmed');
    let driftClient: DriftClient | undefined;
    try {
      // Cheap probe so an unreachable endpoint fails before the client is built.
      await connection.getVersion();

      const wallet = new KeypairWallet(Keypair.generate());
      const authority = wallet.publicKey;
      const bulkAccountLoader = new NoBatchBulkAccountLoader(
        connection as any,
        'confirmed',
        opts.pollingFrequencyMs
      );

      driftClient = new DriftClient({
        connection,
        wallet,
        env: 'mainnet-beta',
        authority,
        skipLoadUsers: true,
        perpMarketIndexes: opts.perpMarketIndexes,
        oracleInfos: opts.oracleInfos,
        accountSubscription: {
          type: 'polling',
          accountLoader: bulkAccountLoader,
        },
      });

      await driftClient.subscribe();
      return driftClient;
    } catch (err: any) {
      if (driftClient) {
        try {
          await driftClient.unsubscribe();
        } catch {
          // ignore
        }
      }
      // Error text may embed the endpoint URL, so mask any api-key before keeping it.
      errors.push(redactSecrets(String(err?.stack || err)));
    }
  }

  throw new Error(
    `RPC connection failed for all candidates.\n\n` +
      `Tried:\n- ${rpcCandidates.map((u) => redactSecrets(u)).join('\n- ')}\n\n` +
      `Errors:\n- ${errors.join('\n\n- ')}\n`
  );
}

type PerpMarketResolution = {
  marketIndex: number;
  symbol: string;
  oracleInfo: OracleInfo;
};

/**
 * Looks a perp market up in the SDK's static mainnet-beta config. A market name, when
 * given, is matched case-insensitively on symbol and takes precedence over the index.
 *
 * @returns the resolved market, or `undefined` when it is not in the static config.
 */
function resolvePerpMarketFromSdkConfig(
  marketName: string | undefined,
  marketIndex: number | undefined
): PerpMarketResolution | undefined {
  const cfg = configs['mainnet-beta'];
  const markets = cfg.PERP_MARKETS;

  let found: (typeof markets)[number] | undefined;
  if (marketName) {
    const wanted = marketName.toUpperCase();
    found = markets.find((x) => x.symbol.toUpperCase() === wanted);
  } else if (marketIndex != null) {
    found = markets.find((x) => x.marketIndex === marketIndex);
  }
  if (!found) return undefined;

  return {
    marketIndex: found.marketIndex,
    symbol: found.symbol,
    oracleInfo: { publicKey: found.oracle, source: found.oracleSource },
  };
}

/**
 * Entry point: resolves configuration and the target perp market, subscribes a Drift
 * client, then emits one tick per interval (or a single tick with `--once`) to Hasura,
 * the ingest API, or stdout (`--dry-run`).
 */
async function main() {
  const marketName = readArg('market-name') || process.env.MARKET_NAME;
  const marketIndexRaw = readArg('market-index') || process.env.MARKET_INDEX;
  const marketIndexArg = marketIndexRaw ? parseStrictInt(marketIndexRaw) : undefined;
  if (marketIndexRaw && (marketIndexArg === undefined || marketIndexArg < 0)) {
    throw new Error(`Invalid --market-index: ${marketIndexRaw}`);
  }

  const symbolArg = readArg('symbol') || process.env.SYMBOL;
  const intervalMs = readIntArg('interval-ms', 1000);
  const once = readFlag('once');
  const dryRun = readFlag('dry-run');
  const source = readArg('source') || process.env.SOURCE || 'drift_oracle';

  // A non-positive interval would make the loop below spin without ever sleeping.
  if (!once && intervalMs <= 0) {
    throw new Error(`Invalid --interval-ms: ${intervalMs} (must be a positive integer)`);
  }

  const bearerToken = resolveAuthToken();
  const ingestVia = resolveIngestVia();
  const ingestTickUrl = ingestVia === 'api' ? resolveIngestApiTickUrl() : undefined;

  const hasuraUrl = ingestVia === 'hasura' ? resolveHasuraUrl() : undefined;
  const adminSecret = ingestVia === 'hasura' ? resolveAdminSecret() : undefined;
  const hasuraAuth: HasuraAuth = ingestVia === 'hasura' ? { bearerToken, adminSecret } : { bearerToken };

  if (!dryRun) {
    if (ingestVia === 'api') {
      if (!ingestTickUrl) {
        throw new Error(
          'Missing ingest API URL. Pass --ingest-url (or set INGEST_API_URL / tokens/api.json apiUrl) when using --ingest-via api.'
        );
      }
      if (!bearerToken) {
        throw new Error(
          'No ingest token configured. Pass --auth-token / INGEST_AUTH_TOKEN (or create tokens/alg.json).'
        );
      }
    } else {
      if (!hasuraAuth.bearerToken && !hasuraAuth.adminSecret) {
        throw new Error(
          'No Hasura auth configured. Create tokens/hasura.json (adminSecret) or a JWT via `npm run token:hasura-jwt`, ' +
            'or pass --auth-token / HASURA_AUTH_TOKEN.'
        );
      }
    }
  }

  if (!marketName && marketIndexArg == null) {
    throw new Error('Missing market. Provide --market-name (e.g. PUMP-PERP) or --market-index.');
  }

  const resolvedFromConfig = resolvePerpMarketFromSdkConfig(marketName, marketIndexArg);

  let marketIndex: number;
  let symbol: string;
  let oracleInfo: OracleInfo;

  if (resolvedFromConfig) {
    marketIndex = resolvedFromConfig.marketIndex;
    symbol = symbolArg || resolvedFromConfig.symbol;
    oracleInfo = resolvedFromConfig.oracleInfo;
  } else if (marketName) {
    // Fallback: one-time discovery (fetch all markets once), then resubscribe narrowly.
    const driftClientDiscovery = await buildDriftClient({ pollingFrequencyMs: 0 });
    try {
      const market = driftClientDiscovery.getMarketIndexAndType(marketName);
      if (!market) throw new Error(`Unknown market name: ${marketName}`);
      if (market.marketType !== MarketType.PERP) throw new Error(`Market is not PERP: ${marketName}`);

      const perp = driftClientDiscovery.getPerpMarketAccount(market.marketIndex);
      if (!perp) throw new Error(`Perp market account not loaded: marketIndex=${market.marketIndex}`);

      marketIndex = market.marketIndex;
      symbol = symbolArg || marketName;
      oracleInfo = { publicKey: perp.amm.oracle, source: perp.amm.oracleSource };
    } finally {
      await driftClientDiscovery.unsubscribe();
    }
  } else {
    // Fallback: market index given, fetch perp market once to get oracle info, then resubscribe narrowly.
    const driftClientDiscovery = await buildDriftClient({
      pollingFrequencyMs: 0,
      perpMarketIndexes: [marketIndexArg as number],
    });
    try {
      const perp = driftClientDiscovery.getPerpMarketAccount(marketIndexArg as number);
      if (!perp) throw new Error(`Perp market account not loaded: marketIndex=${marketIndexArg}`);

      marketIndex = marketIndexArg as number;
      symbol = symbolArg || `market_${marketIndex}`;
      oracleInfo = { publicKey: perp.amm.oracle, source: perp.amm.oracleSource };
    } finally {
      await driftClientDiscovery.unsubscribe();
    }
  }

  const driftClient = await buildDriftClient({
    pollingFrequencyMs: intervalMs,
    perpMarketIndexes: [marketIndex],
    oracleInfos: [oracleInfo],
  });

  // Idempotent shutdown: the first call unsubscribes, later calls are no-ops.
  let stopping = false;
  const stop = async () => {
    if (stopping) return;
    stopping = true;
    await driftClient.unsubscribe();
  };

  process.on('SIGINT', () => void stop().finally(() => process.exit(0)));
  process.on('SIGTERM', () => void stop().finally(() => process.exit(0)));

  const appVersion = process.env.APP_VERSION;
  const buildTimestamp = process.env.BUILD_TIMESTAMP;

  console.log(
    JSON.stringify(
      {
        version: appVersion,
        buildTimestamp,
        marketIndex,
        marketName,
        symbol,
        source,
        intervalMs,
        ingestVia,
        writeUrl: ingestVia === 'api' ? ingestTickUrl : hasuraUrl,
        auth: bearerToken ? 'bearer' : adminSecret ? 'admin-secret' : 'none',
        oracle: oracleInfo.publicKey?.toBase58?.() ?? undefined,
        rpc: (driftClient.connection as any)?.rpcEndpoint
          ? redactSecrets(String((driftClient.connection as any).rpcEndpoint))
          : undefined,
      },
      null,
      2
    )
  );

  try {
    let nextRunAt = Date.now();
    while (!stopping) {
      const ts = getIsoNow();

      const market = driftClient.getPerpMarketAccount(marketIndex);
      if (!market) {
        throw new Error(`Perp market account not available for marketIndex=${marketIndex}`);
      }

      const oracleData = driftClient.getOracleDataForPerpMarket(marketIndex);
      if (!oracleData?.price) {
        throw new Error(`Oracle data not available for marketIndex=${marketIndex}`);
      }

      const price = convertToNumber(oracleData.price, PRICE_PRECISION);
      const oracleSlot = oracleData.slot?.toNumber?.() ?? undefined;

      const mmOracleData = driftClient.getMMOracleDataForPerpMarket(marketIndex);
      const markPrice = convertToNumber(calculateReservePrice(market, mmOracleData), PRICE_PRECISION);

      const tick: Tick = {
        ts,
        market_index: marketIndex,
        symbol,
        // Hasura `numeric` columns expect string inputs (and return strings).
        oracle_price: String(price),
        mark_price: String(markPrice),
        oracle_slot: oracleSlot,
        source,
        raw: {
          confidence: oracleData.confidence?.toString?.(),
          hasSufficientNumberOfDataPoints: oracleData.hasSufficientNumberOfDataPoints,
          twap: oracleData.twap?.toString?.(),
          twapConfidence: oracleData.twapConfidence?.toString?.(),
          maxPrice: oracleData.maxPrice?.toString?.(),
          sequenceId: oracleData.sequenceId?.toString?.(),
          mmOracleActive: mmOracleData.isMMOracleActive,
        },
      };

      if (dryRun) {
        console.log(`[dry-run] ${JSON.stringify(tick)}`);
      } else {
        if (ingestVia === 'api') {
          await apiIngestTick(ingestTickUrl as string, bearerToken, tick);
        } else {
          await hasuraInsertTick(hasuraUrl as string, hasuraAuth, tick);
        }
        console.log(`${tick.ts} ${tick.symbol} oracle=${tick.oracle_price} mark=${tick.mark_price}`);
      }

      if (once) break;

      // Schedule against a fixed cadence so per-iteration work does not accumulate as drift.
      nextRunAt += intervalMs;
      const sleepMs = nextRunAt - Date.now();
      if (sleepMs > 0) await new Promise((r) => setTimeout(r, sleepMs));
    }
  } finally {
    // Always unsubscribe, including when an iteration throws, so the client does not
    // stay subscribed and keep the process from exiting after a failure.
    await stop();
  }
}

main().catch((err) => {
  // Stacks can embed the RPC URL; mask any api-key before printing.
  console.error(redactSecrets(String(err?.stack || err)));
  process.exitCode = 1;
});