diff --git a/kustomize/base/initdb/001_init.sql b/kustomize/base/initdb/001_init.sql new file mode 100644 index 0000000..8062f96 --- /dev/null +++ b/kustomize/base/initdb/001_init.sql @@ -0,0 +1,165 @@ +CREATE EXTENSION IF NOT EXISTS timescaledb; +CREATE EXTENSION IF NOT EXISTS pgcrypto; + +-- `drift_ticks` is an append-only tick log. +-- +-- TimescaleDB hypertables require every UNIQUE index / PRIMARY KEY to include the partitioning column (`ts`). +-- Therefore we use a composite primary key (ts, id) instead of PRIMARY KEY(id). +CREATE TABLE IF NOT EXISTS drift_ticks ( + ts TIMESTAMPTZ NOT NULL, + id BIGSERIAL NOT NULL, + market_index INTEGER NOT NULL, + symbol TEXT NOT NULL, + oracle_price NUMERIC NOT NULL, + mark_price NUMERIC, + oracle_slot BIGINT, + source TEXT NOT NULL DEFAULT 'drift_oracle', + raw JSONB, + inserted_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (ts, id) +); + +-- Schema upgrades (idempotent for existing volumes) +ALTER TABLE drift_ticks ADD COLUMN IF NOT EXISTS mark_price NUMERIC; +ALTER TABLE drift_ticks ADD COLUMN IF NOT EXISTS id BIGSERIAL; + +-- Migrate price columns to NUMERIC (Hasura `numeric` scalar returns strings; see app code). +DO $$ +DECLARE + oracle_type text; + mark_type text; +BEGIN + SELECT data_type INTO oracle_type + FROM information_schema.columns + WHERE table_schema='public' AND table_name='drift_ticks' AND column_name='oracle_price' + LIMIT 1; + + IF oracle_type IS NOT NULL AND oracle_type <> 'numeric' THEN + EXECUTE 'ALTER TABLE public.drift_ticks ALTER COLUMN oracle_price TYPE numeric USING oracle_price::numeric'; + END IF; + + SELECT data_type INTO mark_type + FROM information_schema.columns + WHERE table_schema='public' AND table_name='drift_ticks' AND column_name='mark_price' + LIMIT 1; + + IF mark_type IS NOT NULL AND mark_type <> 'numeric' THEN + EXECUTE 'ALTER TABLE public.drift_ticks ALTER COLUMN mark_price TYPE numeric USING mark_price::numeric'; + END IF; +END $$; + +-- Ensure PRIMARY KEY is (ts, id) (Timescale hypertables require partition column in any UNIQUE/PK). +-- IMPORTANT: keep this idempotent so we can run migrations while the ingestor keeps writing ticks. +DO $$ +DECLARE + pk_name text; + pk_cols text[]; +BEGIN + SELECT + con.conname, + array_agg(att.attname ORDER BY ord.ordinality) + INTO pk_name, pk_cols + FROM pg_constraint con + JOIN pg_class rel ON rel.oid = con.conrelid + JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace + JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true + JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum + WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'drift_ticks' + GROUP BY con.conname; + + IF pk_name IS NULL THEN + EXECUTE 'ALTER TABLE public.drift_ticks ADD CONSTRAINT drift_ticks_pkey PRIMARY KEY (ts, id)'; + ELSIF pk_cols <> ARRAY['ts','id'] THEN + EXECUTE format('ALTER TABLE public.drift_ticks DROP CONSTRAINT %I', pk_name); + EXECUTE 'ALTER TABLE public.drift_ticks ADD CONSTRAINT drift_ticks_pkey PRIMARY KEY (ts, id)'; + END IF; +END $$; + +-- Convert to hypertable (migrate existing rows if any). +SELECT create_hypertable('drift_ticks', 'ts', if_not_exists => TRUE, migrate_data => TRUE); + +-- Historical note: earlier versions used a UNIQUE(market_index, ts) upsert model with ts rounded to seconds. +-- For "full ticks" (ms precision + multiple sources), we keep drift_ticks as an append-only event log. +ALTER TABLE drift_ticks DROP CONSTRAINT IF EXISTS drift_ticks_market_ts_unique; + +CREATE INDEX IF NOT EXISTS drift_ticks_market_ts_desc_idx + ON drift_ticks (market_index, ts DESC); + +CREATE INDEX IF NOT EXISTS drift_ticks_symbol_ts_desc_idx + ON drift_ticks (symbol, ts DESC); + +CREATE INDEX IF NOT EXISTS drift_ticks_market_source_ts_desc_idx + ON drift_ticks (market_index, source, ts DESC); + +CREATE INDEX IF NOT EXISTS drift_ticks_symbol_source_ts_desc_idx + ON drift_ticks (symbol, source, ts DESC); + +-- Revocable API tokens for external algs (store only hashes, never raw tokens). +CREATE TABLE IF NOT EXISTS api_tokens ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name TEXT NOT NULL, + token_hash TEXT NOT NULL UNIQUE, + scopes TEXT[] NOT NULL DEFAULT ARRAY[]::text[], + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + revoked_at TIMESTAMPTZ, + last_used_at TIMESTAMPTZ, + meta JSONB +); + +ALTER TABLE api_tokens ADD COLUMN IF NOT EXISTS scopes TEXT[] NOT NULL DEFAULT ARRAY[]::text[]; + +CREATE INDEX IF NOT EXISTS api_tokens_revoked_at_idx + ON api_tokens (revoked_at); + +-- Compute OHLC candles from `drift_ticks` for a symbol and bucket size. +-- Exposed via Hasura (track function) and used by trade-api to compute indicators server-side. +-- Hasura tracks functions only if they return SETOF a table/view type. +-- This table is used purely as the return type for candle functions. +CREATE TABLE IF NOT EXISTS public.drift_candles ( + bucket timestamptz, + open numeric, + high numeric, + low numeric, + close numeric, + oracle_close numeric, + ticks bigint +); + +-- If an older version of the function exists with an incompatible return type, +-- CREATE OR REPLACE will fail. Drop the old signature first (safe/idempotent). +DROP FUNCTION IF EXISTS public.get_drift_candles(text, integer, integer, text); + +CREATE OR REPLACE FUNCTION public.get_drift_candles( + p_symbol text, + p_bucket_seconds integer, + p_limit integer DEFAULT 500, + p_source text DEFAULT NULL +) +RETURNS SETOF public.drift_candles +LANGUAGE sql +STABLE +AS $$ + WITH base AS ( + SELECT + time_bucket(make_interval(secs => p_bucket_seconds), ts) AS bucket, + ts, + COALESCE(mark_price, oracle_price) AS px, + oracle_price AS oracle_px + FROM public.drift_ticks + WHERE symbol = p_symbol + AND (p_source IS NULL OR source = p_source) + AND ts >= now() - make_interval(secs => (p_bucket_seconds * p_limit * 2)) + ) + SELECT + bucket, + (array_agg(px ORDER BY ts ASC))[1] AS open, + max(px) AS high, + min(px) AS low, + (array_agg(px ORDER BY ts DESC))[1] AS close, + (array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close, + count(*) AS ticks + FROM base + GROUP BY bucket + ORDER BY bucket DESC + LIMIT p_limit; +$$;