chore: initial import
This commit is contained in:
5
.dockerignore
Normal file
5
.dockerignore
Normal file
@@ -0,0 +1,5 @@
|
||||
node_modules
|
||||
tokens/*.json
|
||||
!tokens/*.example.json
|
||||
.env
|
||||
*.log
|
||||
7
.gitignore
vendored
Normal file
7
.gitignore
vendored
Normal file
@@ -0,0 +1,7 @@
|
||||
node_modules/
|
||||
.env
|
||||
*.log
|
||||
|
||||
# Secrets (never commit)
|
||||
tokens/*
|
||||
!tokens/*.example.json
|
||||
28
Dockerfile
Normal file
28
Dockerfile
Normal file
@@ -0,0 +1,28 @@
|
||||
FROM node:20-slim
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Native deps for optional packages in lockfile (np. node-hid/usb).
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y --no-install-recommends \
|
||||
ca-certificates \
|
||||
python3 \
|
||||
make \
|
||||
g++ \
|
||||
pkg-config \
|
||||
libusb-1.0-0-dev \
|
||||
libudev-dev \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY package.json package-lock.json ./
|
||||
RUN npm ci
|
||||
|
||||
COPY scripts ./scripts
|
||||
COPY tokens/*.example.json ./tokens/
|
||||
|
||||
RUN mkdir -p /app/tokens
|
||||
|
||||
ENV NODE_ENV=production
|
||||
|
||||
# Default można nadpisać w K8s/compose przez command/env.
|
||||
CMD ["npm", "run", "ingest:oracle", "--", "--market-name", "PUMP-PERP", "--interval-ms", "1000"]
|
||||
17
README.md
Normal file
17
README.md
Normal file
@@ -0,0 +1,17 @@
|
||||
# trade-ingestor
|
||||
|
||||
Worker ingestujący ticki (Drift oracle) i zapisujący do `trade-api` (albo bezpośrednio do Hasury).
|
||||
|
||||
## Konfiguracja (najczęstsza)
|
||||
|
||||
- `INGEST_VIA=api`
|
||||
- `INGEST_API_URL=http://trade-api:8787` (base URL; endpoint to `/v1/ingest/tick`)
|
||||
- token write w pliku (domyślnie) `/app/tokens/alg.json` albo env `INGEST_AUTH_TOKEN`
|
||||
- RPC: `tokens/heliusN.json` (w k3s montowane jako Secret)
|
||||
|
||||
## Docker
|
||||
|
||||
```bash
|
||||
docker build -t trade-ingestor .
|
||||
docker run --rm trade-ingestor
|
||||
```
|
||||
6500
package-lock.json
generated
Normal file
6500
package-lock.json
generated
Normal file
File diff suppressed because it is too large
Load Diff
20
package.json
Normal file
20
package.json
Normal file
@@ -0,0 +1,20 @@
|
||||
{
|
||||
"name": "trade-ingestor",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"ingest:oracle": "tsx scripts/ingest-drift-oracle.ts"
|
||||
},
|
||||
"dependencies": {
|
||||
"@drift-labs/sdk": "^2.90.0",
|
||||
"@ledgerhq/hw-app-solana": "^7.2.3",
|
||||
"@ledgerhq/hw-transport-node-hid": "^6.28.4",
|
||||
"@solana/spl-token": "^0.4.8",
|
||||
"@solana/web3.js": "^1.95.2",
|
||||
"bn.js": "^5.2.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"tsx": "^4.19.2",
|
||||
"typescript": "^5.6.3"
|
||||
}
|
||||
}
|
||||
651
scripts/ingest-drift-oracle.ts
Normal file
651
scripts/ingest-drift-oracle.ts
Normal file
@@ -0,0 +1,651 @@
|
||||
import dns from 'node:dns';
|
||||
import fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import process from 'node:process';
|
||||
import { fileURLToPath } from 'node:url';
|
||||
import { Connection, Keypair, PublicKey } from '@solana/web3.js';
|
||||
import {
|
||||
BulkAccountLoader,
|
||||
calculateReservePrice,
|
||||
DriftClient,
|
||||
MarketType,
|
||||
OracleInfo,
|
||||
PRICE_PRECISION,
|
||||
configs,
|
||||
convertToNumber,
|
||||
} from '@drift-labs/sdk';
|
||||
|
||||
dns.setDefaultResultOrder('ipv4first');
|
||||
|
||||
process.stdout.on('error', (err: any) => {
|
||||
if (err?.code === 'EPIPE') process.exit(0);
|
||||
});
|
||||
|
||||
const HELIUS_RPC_BASES = [
|
||||
'https://mainnet.helius-rpc.com/?api-key=',
|
||||
'https://rpc.helius.xyz/?api-key=',
|
||||
];
|
||||
|
||||
const SCRIPT_DIR = path.dirname(fileURLToPath(import.meta.url));
|
||||
const PROJECT_ROOT = path.resolve(SCRIPT_DIR, '..');
|
||||
|
||||
function readArg(name: string): string | undefined {
|
||||
const idx = process.argv.indexOf(`--${name}`);
|
||||
if (idx === -1 || idx + 1 >= process.argv.length) return undefined;
|
||||
return process.argv[idx + 1];
|
||||
}
|
||||
|
||||
function readFlag(name: string): boolean {
|
||||
return process.argv.includes(`--${name}`);
|
||||
}
|
||||
|
||||
function readIntArg(name: string, fallback: number): number {
|
||||
const v = readArg(name);
|
||||
if (!v) return fallback;
|
||||
const n = Number.parseInt(v, 10);
|
||||
if (!Number.isInteger(n)) throw new Error(`Invalid --${name}: ${v}`);
|
||||
return n;
|
||||
}
|
||||
|
||||
function readJsonFile(filePath: string): unknown | undefined {
|
||||
if (!fs.existsSync(filePath)) return undefined;
|
||||
const raw = fs.readFileSync(filePath, 'utf8');
|
||||
return JSON.parse(raw);
|
||||
}
|
||||
|
||||
type HasuraTokensFile = {
|
||||
adminSecret?: string;
|
||||
hasuraAdminSecret?: string;
|
||||
graphqlUrl?: string;
|
||||
apiUrl?: string;
|
||||
};
|
||||
|
||||
function readHasuraTokensFile(): HasuraTokensFile | undefined {
|
||||
return readJsonFile(path.join(PROJECT_ROOT, 'tokens', 'hasura.json')) as HasuraTokensFile | undefined;
|
||||
}
|
||||
|
||||
type ApiTokensFile = {
|
||||
apiUrl?: string;
|
||||
url?: string;
|
||||
};
|
||||
|
||||
function readApiTokensFile(): ApiTokensFile | undefined {
|
||||
return readJsonFile(path.join(PROJECT_ROOT, 'tokens', 'api.json')) as ApiTokensFile | undefined;
|
||||
}
|
||||
|
||||
type AlgTokenFile = {
|
||||
token?: string;
|
||||
jwt?: string;
|
||||
authToken?: string;
|
||||
};
|
||||
|
||||
function readAlgTokenFile(filePath: string): AlgTokenFile | undefined {
|
||||
return readJsonFile(filePath) as AlgTokenFile | undefined;
|
||||
}
|
||||
|
||||
function resolveAuthToken(): string | undefined {
|
||||
const cli = readArg('auth-token');
|
||||
if (cli) return cli;
|
||||
if (process.env.INGEST_AUTH_TOKEN) return process.env.INGEST_AUTH_TOKEN;
|
||||
if (process.env.API_AUTH_TOKEN) return process.env.API_AUTH_TOKEN;
|
||||
if (process.env.HASURA_AUTH_TOKEN) return process.env.HASURA_AUTH_TOKEN;
|
||||
|
||||
const tokenFilePath =
|
||||
readArg('auth-token-file') ||
|
||||
process.env.HASURA_AUTH_TOKEN_FILE ||
|
||||
path.join(PROJECT_ROOT, 'tokens', 'alg.json');
|
||||
|
||||
const fromFile = readAlgTokenFile(tokenFilePath);
|
||||
const tok = fromFile?.token || fromFile?.jwt || fromFile?.authToken;
|
||||
return typeof tok === 'string' && tok.length ? tok : undefined;
|
||||
}
|
||||
|
||||
function redactSecrets(text: string): string {
|
||||
return text.replace(/api-key=([^&\s]+)/gi, 'api-key=***');
|
||||
}
|
||||
|
||||
function resolveRpcCandidates(): string[] {
|
||||
const cli = readArg('rpc');
|
||||
if (cli) return [cli];
|
||||
if (process.env.RPC_URL) return [process.env.RPC_URL];
|
||||
|
||||
const fromTokens = readJsonFile(path.join(PROJECT_ROOT, 'tokens', 'heliusN.json')) as
|
||||
| { heliusApiKey?: string; rpcUrl?: string }
|
||||
| undefined;
|
||||
if (fromTokens?.rpcUrl) return [fromTokens.rpcUrl];
|
||||
if (fromTokens?.heliusApiKey) return HELIUS_RPC_BASES.map((b) => `${b}${fromTokens.heliusApiKey}`);
|
||||
|
||||
const fallback = readJsonFile(path.join(PROJECT_ROOT, 'tokens', 'helius.json')) as
|
||||
| { heliusApiKey?: string; rpcUrl?: string }
|
||||
| undefined;
|
||||
if (fallback?.rpcUrl) return [fallback.rpcUrl];
|
||||
if (fallback?.heliusApiKey) return HELIUS_RPC_BASES.map((b) => `${b}${fallback.heliusApiKey}`);
|
||||
|
||||
throw new Error('RPC not configured. Put heliusApiKey in tokens/heliusN.json (or tokens/helius.json).');
|
||||
}
|
||||
|
||||
class NoBatchBulkAccountLoader extends BulkAccountLoader {
|
||||
// Helius free plans may block JSON-RPC *batch* requests. The SDK's default BulkAccountLoader
|
||||
// uses `_rpcBatchRequest`; we override to use `getMultipleAccountsInfoAndContext` instead.
|
||||
async loadChunk(accountsToLoadChunks: any[][]): Promise<void> {
|
||||
if (accountsToLoadChunks.length === 0) return;
|
||||
|
||||
for (const accountsToLoadChunk of accountsToLoadChunks) {
|
||||
const activeAccounts = accountsToLoadChunk.filter((a) => a?.callbacks?.size > 0);
|
||||
if (activeAccounts.length === 0) continue;
|
||||
|
||||
const pubkeys = activeAccounts.map((a) => a.publicKey);
|
||||
const res = await (this.connection as any).getMultipleAccountsInfoAndContext(pubkeys, this.commitment);
|
||||
const newSlot = res?.context?.slot ?? 0;
|
||||
if (newSlot > this.mostRecentSlot) this.mostRecentSlot = newSlot;
|
||||
|
||||
const values = (res?.value ?? []) as Array<{ data: Buffer } | null>;
|
||||
activeAccounts.forEach((accountToLoad, idx) => {
|
||||
const key = accountToLoad.publicKey.toBase58();
|
||||
const old = this.bufferAndSlotMap.get(key);
|
||||
if (old && newSlot < old.slot) return;
|
||||
|
||||
const info = values[idx] ?? null;
|
||||
const newBuffer: Buffer | undefined = info ? (info.data as any) : undefined;
|
||||
|
||||
if (!old) {
|
||||
this.bufferAndSlotMap.set(key, { slot: newSlot, buffer: newBuffer });
|
||||
this.handleAccountCallbacks(accountToLoad, newBuffer as any, newSlot);
|
||||
return;
|
||||
}
|
||||
|
||||
const oldBuffer = old.buffer;
|
||||
if (newBuffer && (!oldBuffer || !newBuffer.equals(oldBuffer))) {
|
||||
this.bufferAndSlotMap.set(key, { slot: newSlot, buffer: newBuffer });
|
||||
this.handleAccountCallbacks(accountToLoad, newBuffer as any, newSlot);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class KeypairWallet {
|
||||
payer: Keypair;
|
||||
publicKey: PublicKey;
|
||||
constructor(keypair: Keypair) {
|
||||
this.payer = keypair;
|
||||
this.publicKey = keypair.publicKey;
|
||||
}
|
||||
async signTransaction(tx: any): Promise<any> {
|
||||
tx.partialSign(this.payer);
|
||||
return tx;
|
||||
}
|
||||
async signAllTransactions(txs: any[]): Promise<any[]> {
|
||||
return txs.map((tx) => {
|
||||
tx.partialSign(this.payer);
|
||||
return tx;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
type Tick = {
|
||||
ts: string;
|
||||
market_index: number;
|
||||
symbol: string;
|
||||
oracle_price: string;
|
||||
mark_price: string;
|
||||
oracle_slot?: number;
|
||||
source: string;
|
||||
raw?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
function getIsoNow(): string {
|
||||
return new Date().toISOString();
|
||||
}
|
||||
|
||||
function resolveHasuraUrl(): string {
|
||||
const cli = readArg('api-url');
|
||||
if (cli) return cli;
|
||||
if (process.env.HASURA_GRAPHQL_URL) return process.env.HASURA_GRAPHQL_URL;
|
||||
|
||||
const fromTokens = readHasuraTokensFile();
|
||||
if (fromTokens?.graphqlUrl) return fromTokens.graphqlUrl;
|
||||
if (fromTokens?.apiUrl) return fromTokens.apiUrl;
|
||||
|
||||
return 'http://localhost:8080/v1/graphql';
|
||||
}
|
||||
|
||||
type IngestVia = 'hasura' | 'api';
|
||||
|
||||
function resolveIngestVia(): IngestVia {
|
||||
const cli = readArg('ingest-via');
|
||||
const env = process.env.INGEST_VIA;
|
||||
const v = (cli || env || 'hasura').toLowerCase();
|
||||
if (v === 'hasura' || v === 'api') return v;
|
||||
throw new Error(`Invalid --ingest-via: ${cli || env}`);
|
||||
}
|
||||
|
||||
function resolveIngestApiTickUrl(): string | undefined {
|
||||
const cli = readArg('ingest-url') || readArg('ingest-api-url');
|
||||
if (cli) {
|
||||
const u = new URL(cli);
|
||||
if (!u.pathname || u.pathname === '/') u.pathname = '/v1/ingest/tick';
|
||||
return u.toString();
|
||||
}
|
||||
|
||||
if (process.env.INGEST_API_URL) {
|
||||
const u = new URL(process.env.INGEST_API_URL);
|
||||
if (!u.pathname || u.pathname === '/') u.pathname = '/v1/ingest/tick';
|
||||
return u.toString();
|
||||
}
|
||||
|
||||
const fromTokens = readApiTokensFile();
|
||||
const base = fromTokens?.apiUrl || fromTokens?.url;
|
||||
if (!base) return undefined;
|
||||
const u = new URL(base);
|
||||
if (!u.pathname || u.pathname === '/') u.pathname = '/v1/ingest/tick';
|
||||
return u.toString();
|
||||
}
|
||||
|
||||
function resolveAdminSecret(): string | undefined {
|
||||
const cli = readArg('admin-secret');
|
||||
if (cli) return cli;
|
||||
if (process.env.HASURA_ADMIN_SECRET) return process.env.HASURA_ADMIN_SECRET;
|
||||
|
||||
const fromTokens = readHasuraTokensFile();
|
||||
return fromTokens?.adminSecret || fromTokens?.hasuraAdminSecret;
|
||||
}
|
||||
|
||||
type HasuraAuth = { bearerToken?: string; adminSecret?: string };
|
||||
|
||||
function buildHasuraGraphqlHeaders(auth: HasuraAuth): Record<string, string> {
|
||||
const headers: Record<string, string> = { 'content-type': 'application/json' };
|
||||
if (auth.bearerToken) headers.authorization = `Bearer ${auth.bearerToken}`;
|
||||
else if (auth.adminSecret) headers['x-hasura-admin-secret'] = auth.adminSecret;
|
||||
return headers;
|
||||
}
|
||||
|
||||
async function apiIngestTick(ingestTickUrl: string, bearerToken: string | undefined, tick: Tick) {
|
||||
if (!bearerToken) {
|
||||
throw new Error('Missing ingest auth token. Pass --auth-token / INGEST_AUTH_TOKEN or create tokens/alg.json.');
|
||||
}
|
||||
const res = await fetch(ingestTickUrl, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'content-type': 'application/json',
|
||||
authorization: `Bearer ${bearerToken}`,
|
||||
},
|
||||
body: JSON.stringify(tick),
|
||||
});
|
||||
const text = await res.text();
|
||||
if (!res.ok) throw new Error(`Ingest API HTTP ${res.status}: ${text}`);
|
||||
const json = JSON.parse(text) as { ok?: boolean; error?: string };
|
||||
if (!json.ok) throw new Error(`Ingest API error: ${json.error || text}`);
|
||||
}
|
||||
|
||||
async function hasuraInsertTick(hasuraUrl: string, auth: HasuraAuth, tick: Tick) {
|
||||
const query = `
|
||||
mutation InsertTick($object: drift_ticks_insert_input!) {
|
||||
insert_drift_ticks_one(object: $object) {
|
||||
id
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
||||
const headers = buildHasuraGraphqlHeaders(auth);
|
||||
|
||||
const attempt = async () => {
|
||||
const res = await fetch(hasuraUrl, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body: JSON.stringify({ query, variables: { object: tick } }),
|
||||
});
|
||||
|
||||
const bodyText = await res.text();
|
||||
if (!res.ok) {
|
||||
throw new Error(`Hasura HTTP ${res.status}: ${bodyText}`);
|
||||
}
|
||||
|
||||
const json = JSON.parse(bodyText) as { errors?: Array<{ message: string }>; data?: unknown };
|
||||
if (json.errors?.length) {
|
||||
throw new Error(`Hasura GraphQL error: ${json.errors.map((e) => e.message).join(' | ')}`);
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
await attempt();
|
||||
} catch (err: any) {
|
||||
const msg = String(err?.message || err);
|
||||
const looksUntracked =
|
||||
msg.includes('no mutations exist') ||
|
||||
msg.includes('Cannot query field "insert_drift_ticks_one"') ||
|
||||
msg.includes('Unknown field "insert_drift_ticks_one"') ||
|
||||
msg.includes('not found in type:') ||
|
||||
msg.includes('not found on type:');
|
||||
|
||||
if (!looksUntracked) throw err;
|
||||
|
||||
await ensureHasuraDriftTicksTracked(hasuraUrl, auth.adminSecret);
|
||||
await attempt();
|
||||
}
|
||||
}
|
||||
|
||||
async function ensureHasuraDriftTicksTracked(hasuraUrl: string, adminSecret: string | undefined) {
|
||||
if (!adminSecret) {
|
||||
throw new Error(
|
||||
'Hasura mutations missing (table likely not tracked). ' +
|
||||
'Start the stack with hasura-bootstrap or track drift_ticks in the Hasura console (requires admin secret).'
|
||||
);
|
||||
}
|
||||
|
||||
const url = new URL(hasuraUrl);
|
||||
url.pathname = '/v1/metadata';
|
||||
url.search = '';
|
||||
|
||||
const headers: Record<string, string> = { 'content-type': 'application/json' };
|
||||
headers['x-hasura-admin-secret'] = adminSecret;
|
||||
|
||||
// Track the table so Hasura exposes insert/update mutations.
|
||||
// If it's already tracked, Hasura returns an error; we treat that as success.
|
||||
const track = await fetch(url.toString(), {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body: JSON.stringify({
|
||||
type: 'pg_track_table',
|
||||
args: {
|
||||
source: 'default',
|
||||
table: { schema: 'public', name: 'drift_ticks' },
|
||||
},
|
||||
}),
|
||||
});
|
||||
if (track.ok) return;
|
||||
|
||||
const text = await track.text();
|
||||
const alreadyTracked =
|
||||
text.includes('already tracked') ||
|
||||
text.includes('already exists') ||
|
||||
text.includes('already present') ||
|
||||
text.includes('already been tracked');
|
||||
if (alreadyTracked) return;
|
||||
|
||||
throw new Error(`Hasura metadata track failed (${track.status}): ${text}`);
|
||||
}
|
||||
|
||||
type DriftBuildOptions = {
|
||||
pollingFrequencyMs: number;
|
||||
perpMarketIndexes?: number[];
|
||||
oracleInfos?: OracleInfo[];
|
||||
};
|
||||
|
||||
async function buildDriftClient(opts: DriftBuildOptions): Promise<DriftClient> {
|
||||
const rpcCandidates = resolveRpcCandidates();
|
||||
const errors: string[] = [];
|
||||
|
||||
for (const rpcUrl of rpcCandidates) {
|
||||
const connection = new Connection(rpcUrl, 'confirmed');
|
||||
let driftClient: DriftClient | undefined;
|
||||
try {
|
||||
await connection.getVersion();
|
||||
const wallet = new KeypairWallet(Keypair.generate());
|
||||
const authority = wallet.publicKey;
|
||||
const bulkAccountLoader = new NoBatchBulkAccountLoader(
|
||||
connection as any,
|
||||
'confirmed',
|
||||
opts.pollingFrequencyMs
|
||||
);
|
||||
driftClient = new DriftClient({
|
||||
connection,
|
||||
wallet,
|
||||
env: 'mainnet-beta',
|
||||
authority,
|
||||
skipLoadUsers: true,
|
||||
perpMarketIndexes: opts.perpMarketIndexes,
|
||||
oracleInfos: opts.oracleInfos,
|
||||
accountSubscription: {
|
||||
type: 'polling',
|
||||
accountLoader: bulkAccountLoader,
|
||||
},
|
||||
});
|
||||
await driftClient.subscribe();
|
||||
return driftClient;
|
||||
} catch (err: any) {
|
||||
if (driftClient) {
|
||||
try {
|
||||
await driftClient.unsubscribe();
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
errors.push(String(err?.stack || err));
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error(
|
||||
`RPC connection failed for all candidates.\n\n` +
|
||||
`Tried:\n- ${rpcCandidates.map((u) => redactSecrets(u)).join('\n- ')}\n\n` +
|
||||
`Errors:\n- ${errors.join('\n\n- ')}\n`
|
||||
);
|
||||
}
|
||||
|
||||
type PerpMarketResolution = {
|
||||
marketIndex: number;
|
||||
symbol: string;
|
||||
oracleInfo: OracleInfo;
|
||||
};
|
||||
|
||||
function resolvePerpMarketFromSdkConfig(
|
||||
marketName: string | undefined,
|
||||
marketIndex: number | undefined
|
||||
): PerpMarketResolution | undefined {
|
||||
const cfg = configs['mainnet-beta'];
|
||||
const markets = cfg.PERP_MARKETS;
|
||||
|
||||
if (marketName) {
|
||||
const wanted = marketName.toUpperCase();
|
||||
const m = markets.find((x) => x.symbol.toUpperCase() === wanted);
|
||||
if (!m) return undefined;
|
||||
return {
|
||||
marketIndex: m.marketIndex,
|
||||
symbol: m.symbol,
|
||||
oracleInfo: { publicKey: m.oracle, source: m.oracleSource },
|
||||
};
|
||||
}
|
||||
|
||||
if (marketIndex != null) {
|
||||
const m = markets.find((x) => x.marketIndex === marketIndex);
|
||||
if (!m) return undefined;
|
||||
return {
|
||||
marketIndex: m.marketIndex,
|
||||
symbol: m.symbol,
|
||||
oracleInfo: { publicKey: m.oracle, source: m.oracleSource },
|
||||
};
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const marketName = readArg('market-name') || process.env.MARKET_NAME;
|
||||
const marketIndexRaw = readArg('market-index') || process.env.MARKET_INDEX;
|
||||
const marketIndexArg = marketIndexRaw ? Number.parseInt(marketIndexRaw, 10) : undefined;
|
||||
if (marketIndexRaw && (!Number.isInteger(marketIndexArg) || (marketIndexArg as number) < 0)) {
|
||||
throw new Error(`Invalid --market-index: ${marketIndexRaw}`);
|
||||
}
|
||||
const symbolArg = readArg('symbol') || process.env.SYMBOL;
|
||||
const intervalMs = readIntArg('interval-ms', 1000);
|
||||
const once = readFlag('once');
|
||||
const dryRun = readFlag('dry-run');
|
||||
const source = readArg('source') || process.env.SOURCE || 'drift_oracle';
|
||||
|
||||
const bearerToken = resolveAuthToken();
|
||||
const ingestVia = resolveIngestVia();
|
||||
const ingestTickUrl = ingestVia === 'api' ? resolveIngestApiTickUrl() : undefined;
|
||||
const hasuraUrl = ingestVia === 'hasura' ? resolveHasuraUrl() : undefined;
|
||||
const adminSecret = ingestVia === 'hasura' ? resolveAdminSecret() : undefined;
|
||||
const hasuraAuth: HasuraAuth = ingestVia === 'hasura' ? { bearerToken, adminSecret } : { bearerToken };
|
||||
|
||||
if (!dryRun) {
|
||||
if (ingestVia === 'api') {
|
||||
if (!ingestTickUrl) {
|
||||
throw new Error(
|
||||
'Missing ingest API URL. Pass --ingest-url (or set INGEST_API_URL / tokens/api.json apiUrl) when using --ingest-via api.'
|
||||
);
|
||||
}
|
||||
if (!bearerToken) {
|
||||
throw new Error(
|
||||
'No ingest token configured. Pass --auth-token / INGEST_AUTH_TOKEN (or create tokens/alg.json).'
|
||||
);
|
||||
}
|
||||
} else {
|
||||
if (!hasuraAuth.bearerToken && !hasuraAuth.adminSecret) {
|
||||
throw new Error(
|
||||
'No Hasura auth configured. Create tokens/hasura.json (adminSecret) or a JWT via `npm run token:hasura-jwt`, ' +
|
||||
'or pass --auth-token / HASURA_AUTH_TOKEN.'
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!marketName && marketIndexArg == null) {
|
||||
throw new Error('Missing market. Provide --market-name (e.g. PUMP-PERP) or --market-index.');
|
||||
}
|
||||
|
||||
const resolvedFromConfig = resolvePerpMarketFromSdkConfig(marketName, marketIndexArg);
|
||||
let marketIndex: number;
|
||||
let symbol: string;
|
||||
let oracleInfo: OracleInfo;
|
||||
|
||||
if (resolvedFromConfig) {
|
||||
marketIndex = resolvedFromConfig.marketIndex;
|
||||
symbol = symbolArg || resolvedFromConfig.symbol;
|
||||
oracleInfo = resolvedFromConfig.oracleInfo;
|
||||
} else if (marketName) {
|
||||
// Fallback: one-time discovery (fetch all markets once), then resubscribe narrowly.
|
||||
const driftClientDiscovery = await buildDriftClient({ pollingFrequencyMs: 0 });
|
||||
try {
|
||||
const market = driftClientDiscovery.getMarketIndexAndType(marketName);
|
||||
if (!market) throw new Error(`Unknown market name: ${marketName}`);
|
||||
if (market.marketType !== MarketType.PERP) throw new Error(`Market is not PERP: ${marketName}`);
|
||||
const perp = driftClientDiscovery.getPerpMarketAccount(market.marketIndex);
|
||||
if (!perp) throw new Error(`Perp market account not loaded: marketIndex=${market.marketIndex}`);
|
||||
marketIndex = market.marketIndex;
|
||||
symbol = symbolArg || marketName;
|
||||
oracleInfo = { publicKey: perp.amm.oracle, source: perp.amm.oracleSource };
|
||||
} finally {
|
||||
await driftClientDiscovery.unsubscribe();
|
||||
}
|
||||
} else {
|
||||
// Fallback: market index given, fetch perp market once to get oracle info, then resubscribe narrowly.
|
||||
const driftClientDiscovery = await buildDriftClient({
|
||||
pollingFrequencyMs: 0,
|
||||
perpMarketIndexes: [marketIndexArg as number],
|
||||
});
|
||||
try {
|
||||
const perp = driftClientDiscovery.getPerpMarketAccount(marketIndexArg as number);
|
||||
if (!perp) throw new Error(`Perp market account not loaded: marketIndex=${marketIndexArg}`);
|
||||
marketIndex = marketIndexArg as number;
|
||||
symbol = symbolArg || `market_${marketIndex}`;
|
||||
oracleInfo = { publicKey: perp.amm.oracle, source: perp.amm.oracleSource };
|
||||
} finally {
|
||||
await driftClientDiscovery.unsubscribe();
|
||||
}
|
||||
}
|
||||
|
||||
const driftClient = await buildDriftClient({
|
||||
pollingFrequencyMs: intervalMs,
|
||||
perpMarketIndexes: [marketIndex],
|
||||
oracleInfos: [oracleInfo],
|
||||
});
|
||||
|
||||
let stopping = false;
|
||||
const stop = async () => {
|
||||
if (stopping) return;
|
||||
stopping = true;
|
||||
await driftClient.unsubscribe();
|
||||
};
|
||||
process.on('SIGINT', () => void stop().finally(() => process.exit(0)));
|
||||
process.on('SIGTERM', () => void stop().finally(() => process.exit(0)));
|
||||
|
||||
const appVersion = process.env.APP_VERSION;
|
||||
const buildTimestamp = process.env.BUILD_TIMESTAMP;
|
||||
|
||||
console.log(
|
||||
JSON.stringify(
|
||||
{
|
||||
version: appVersion,
|
||||
buildTimestamp,
|
||||
marketIndex,
|
||||
marketName,
|
||||
symbol,
|
||||
source,
|
||||
intervalMs,
|
||||
ingestVia,
|
||||
writeUrl: ingestVia === 'api' ? ingestTickUrl : hasuraUrl,
|
||||
auth: bearerToken ? 'bearer' : adminSecret ? 'admin-secret' : 'none',
|
||||
oracle: oracleInfo.publicKey?.toBase58?.() ?? undefined,
|
||||
rpc: (driftClient.connection as any)?.rpcEndpoint
|
||||
? redactSecrets(String((driftClient.connection as any).rpcEndpoint))
|
||||
: undefined,
|
||||
},
|
||||
null,
|
||||
2
|
||||
)
|
||||
);
|
||||
|
||||
let nextRunAt = Date.now();
|
||||
while (!stopping) {
|
||||
const ts = getIsoNow();
|
||||
const market = driftClient.getPerpMarketAccount(marketIndex);
|
||||
if (!market) {
|
||||
throw new Error(`Perp market account not available for marketIndex=${marketIndex}`);
|
||||
}
|
||||
const oracleData = driftClient.getOracleDataForPerpMarket(marketIndex);
|
||||
if (!oracleData?.price) {
|
||||
throw new Error(`Oracle data not available for marketIndex=${marketIndex}`);
|
||||
}
|
||||
|
||||
const price = convertToNumber(oracleData.price, PRICE_PRECISION);
|
||||
const oracleSlot = oracleData.slot?.toNumber?.() ?? undefined;
|
||||
const mmOracleData = driftClient.getMMOracleDataForPerpMarket(marketIndex);
|
||||
const markPrice = convertToNumber(calculateReservePrice(market, mmOracleData), PRICE_PRECISION);
|
||||
|
||||
const tick: Tick = {
|
||||
ts,
|
||||
market_index: marketIndex,
|
||||
symbol,
|
||||
// Hasura `numeric` columns expect string inputs (and return strings).
|
||||
oracle_price: String(price),
|
||||
mark_price: String(markPrice),
|
||||
oracle_slot: oracleSlot,
|
||||
source,
|
||||
raw: {
|
||||
confidence: oracleData.confidence?.toString?.(),
|
||||
hasSufficientNumberOfDataPoints: oracleData.hasSufficientNumberOfDataPoints,
|
||||
twap: oracleData.twap?.toString?.(),
|
||||
twapConfidence: oracleData.twapConfidence?.toString?.(),
|
||||
maxPrice: oracleData.maxPrice?.toString?.(),
|
||||
sequenceId: oracleData.sequenceId?.toString?.(),
|
||||
mmOracleActive: mmOracleData.isMMOracleActive,
|
||||
},
|
||||
};
|
||||
|
||||
if (dryRun) {
|
||||
console.log(`[dry-run] ${JSON.stringify(tick)}`);
|
||||
} else {
|
||||
if (ingestVia === 'api') {
|
||||
await apiIngestTick(ingestTickUrl as string, bearerToken, tick);
|
||||
} else {
|
||||
await hasuraInsertTick(hasuraUrl as string, hasuraAuth, tick);
|
||||
}
|
||||
console.log(`${tick.ts} ${tick.symbol} oracle=${tick.oracle_price} mark=${tick.mark_price}`);
|
||||
}
|
||||
|
||||
if (once) break;
|
||||
|
||||
nextRunAt += intervalMs;
|
||||
const sleepMs = nextRunAt - Date.now();
|
||||
if (sleepMs > 0) await new Promise((r) => setTimeout(r, sleepMs));
|
||||
}
|
||||
|
||||
await stop();
|
||||
}
|
||||
|
||||
main().catch((err) => {
|
||||
console.error(String(err?.stack || err));
|
||||
process.exitCode = 1;
|
||||
});
|
||||
7
tokens/alg.example.json
Normal file
7
tokens/alg.example.json
Normal file
@@ -0,0 +1,7 @@
|
||||
{
|
||||
"token": "alg_PASTE_TOKEN_HERE",
|
||||
"id": "00000000-0000-0000-0000-000000000000",
|
||||
"name": "algo1",
|
||||
"createdAt": "2030-01-01T00:00:00.000Z",
|
||||
"scopes": ["write"]
|
||||
}
|
||||
4
tokens/heliusN.example.json
Normal file
4
tokens/heliusN.example.json
Normal file
@@ -0,0 +1,4 @@
|
||||
{
|
||||
"rpcUrl": "https://mainnet.helius-rpc.com/?api-key=REDACTED",
|
||||
"heliusApiKey": "REDACTED"
|
||||
}
|
||||
Reference in New Issue
Block a user