feat(db): add initdb SQL

This commit is contained in:
u1
2026-01-06 00:19:38 +00:00
parent 66b300f638
commit d54405e32a

View File

@@ -0,0 +1,165 @@
CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- `drift_ticks` is an append-only tick log.
--
-- TimescaleDB hypertables require every UNIQUE index / PRIMARY KEY to include the partitioning column (`ts`).
-- Therefore we use a composite primary key (ts, id) instead of PRIMARY KEY(id).
CREATE TABLE IF NOT EXISTS drift_ticks (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
market_index INTEGER NOT NULL,
symbol TEXT NOT NULL,
oracle_price NUMERIC NOT NULL,
mark_price NUMERIC,
oracle_slot BIGINT,
source TEXT NOT NULL DEFAULT 'drift_oracle',
raw JSONB,
inserted_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (ts, id)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE drift_ticks ADD COLUMN IF NOT EXISTS mark_price NUMERIC;
ALTER TABLE drift_ticks ADD COLUMN IF NOT EXISTS id BIGSERIAL;
-- Migrate price columns to NUMERIC (Hasura `numeric` scalar returns strings; see app code).
DO $$
DECLARE
oracle_type text;
mark_type text;
BEGIN
SELECT data_type INTO oracle_type
FROM information_schema.columns
WHERE table_schema='public' AND table_name='drift_ticks' AND column_name='oracle_price'
LIMIT 1;
IF oracle_type IS NOT NULL AND oracle_type <> 'numeric' THEN
EXECUTE 'ALTER TABLE public.drift_ticks ALTER COLUMN oracle_price TYPE numeric USING oracle_price::numeric';
END IF;
SELECT data_type INTO mark_type
FROM information_schema.columns
WHERE table_schema='public' AND table_name='drift_ticks' AND column_name='mark_price'
LIMIT 1;
IF mark_type IS NOT NULL AND mark_type <> 'numeric' THEN
EXECUTE 'ALTER TABLE public.drift_ticks ALTER COLUMN mark_price TYPE numeric USING mark_price::numeric';
END IF;
END $$;
-- Ensure PRIMARY KEY is (ts, id) (Timescale hypertables require partition column in any UNIQUE/PK).
-- IMPORTANT: keep this idempotent so we can run migrations while the ingestor keeps writing ticks.
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'drift_ticks'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.drift_ticks ADD CONSTRAINT drift_ticks_pkey PRIMARY KEY (ts, id)';
ELSIF pk_cols <> ARRAY['ts','id'] THEN
EXECUTE format('ALTER TABLE public.drift_ticks DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.drift_ticks ADD CONSTRAINT drift_ticks_pkey PRIMARY KEY (ts, id)';
END IF;
END $$;
-- Convert to hypertable (migrate existing rows if any).
SELECT create_hypertable('drift_ticks', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
-- Historical note: earlier versions used a UNIQUE(market_index, ts) upsert model with ts rounded to seconds.
-- For "full ticks" (ms precision + multiple sources), we keep drift_ticks as an append-only event log.
ALTER TABLE drift_ticks DROP CONSTRAINT IF EXISTS drift_ticks_market_ts_unique;
CREATE INDEX IF NOT EXISTS drift_ticks_market_ts_desc_idx
ON drift_ticks (market_index, ts DESC);
CREATE INDEX IF NOT EXISTS drift_ticks_symbol_ts_desc_idx
ON drift_ticks (symbol, ts DESC);
CREATE INDEX IF NOT EXISTS drift_ticks_market_source_ts_desc_idx
ON drift_ticks (market_index, source, ts DESC);
CREATE INDEX IF NOT EXISTS drift_ticks_symbol_source_ts_desc_idx
ON drift_ticks (symbol, source, ts DESC);
-- Revocable API tokens for external algs (store only hashes, never raw tokens).
CREATE TABLE IF NOT EXISTS api_tokens (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
token_hash TEXT NOT NULL UNIQUE,
scopes TEXT[] NOT NULL DEFAULT ARRAY[]::text[],
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
revoked_at TIMESTAMPTZ,
last_used_at TIMESTAMPTZ,
meta JSONB
);
ALTER TABLE api_tokens ADD COLUMN IF NOT EXISTS scopes TEXT[] NOT NULL DEFAULT ARRAY[]::text[];
CREATE INDEX IF NOT EXISTS api_tokens_revoked_at_idx
ON api_tokens (revoked_at);
-- Compute OHLC candles from `drift_ticks` for a symbol and bucket size.
-- Exposed via Hasura (track function) and used by trade-api to compute indicators server-side.
-- Hasura tracks functions only if they return SETOF a table/view type.
-- This table is used purely as the return type for candle functions.
CREATE TABLE IF NOT EXISTS public.drift_candles (
bucket timestamptz,
open numeric,
high numeric,
low numeric,
close numeric,
oracle_close numeric,
ticks bigint
);
-- If an older version of the function exists with an incompatible return type,
-- CREATE OR REPLACE will fail. Drop the old signature first (safe/idempotent).
DROP FUNCTION IF EXISTS public.get_drift_candles(text, integer, integer, text);
CREATE OR REPLACE FUNCTION public.get_drift_candles(
p_symbol text,
p_bucket_seconds integer,
p_limit integer DEFAULT 500,
p_source text DEFAULT NULL
)
RETURNS SETOF public.drift_candles
LANGUAGE sql
STABLE
AS $$
WITH base AS (
SELECT
time_bucket(make_interval(secs => p_bucket_seconds), ts) AS bucket,
ts,
COALESCE(mark_price, oracle_price) AS px,
oracle_price AS oracle_px
FROM public.drift_ticks
WHERE symbol = p_symbol
AND (p_source IS NULL OR source = p_source)
AND ts >= now() - make_interval(secs => (p_bucket_seconds * p_limit * 2))
)
SELECT
bucket,
(array_agg(px ORDER BY ts ASC))[1] AS open,
max(px) AS high,
min(px) AS low,
(array_agg(px ORDER BY ts DESC))[1] AS close,
(array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close,
count(*) AS ticks
FROM base
GROUP BY bucket
ORDER BY bucket DESC
LIMIT p_limit;
$$;