CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- ---------------------------------------------------------------------------
-- drift_ticks: append-only tick log.
--
-- A TimescaleDB hypertable needs its partitioning column (`ts`) in every
-- UNIQUE index / PRIMARY KEY, so the key is the composite (ts, id) rather
-- than PRIMARY KEY (id).
-- ---------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS drift_ticks (
    ts           timestamptz NOT NULL,
    id           bigserial   NOT NULL,
    market_index integer     NOT NULL,
    symbol       text        NOT NULL,
    oracle_price numeric     NOT NULL,
    mark_price   numeric,
    oracle_slot  bigint,
    source       text        NOT NULL DEFAULT 'drift_oracle',
    raw          jsonb,
    inserted_at  timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT drift_ticks_pkey PRIMARY KEY (ts, id)
);

-- Column upgrades for volumes created by an earlier schema (safe to re-run).
ALTER TABLE drift_ticks
    ADD COLUMN IF NOT EXISTS mark_price numeric;
ALTER TABLE drift_ticks
    ADD COLUMN IF NOT EXISTS id bigserial;

-- Convert the price columns to NUMERIC when they still have another type.
-- (Hasura's `numeric` scalar returns strings; see app code.)
DO $$
DECLARE
    oracle_price_type text;
    mark_price_type   text;
BEGIN
    -- oracle_price: a NULL type means the column was not found, so do nothing.
    SELECT c.data_type
    INTO oracle_price_type
    FROM information_schema.columns AS c
    WHERE c.table_schema = 'public'
      AND c.table_name = 'drift_ticks'
      AND c.column_name = 'oracle_price'
    LIMIT 1;

    IF oracle_price_type IS NOT NULL AND oracle_price_type <> 'numeric' THEN
        EXECUTE 'ALTER TABLE public.drift_ticks ALTER COLUMN oracle_price TYPE numeric USING oracle_price::numeric';
    END IF;

    -- mark_price: same check, same conversion.
    SELECT c.data_type
    INTO mark_price_type
    FROM information_schema.columns AS c
    WHERE c.table_schema = 'public'
      AND c.table_name = 'drift_ticks'
      AND c.column_name = 'mark_price'
    LIMIT 1;

    IF mark_price_type IS NOT NULL AND mark_price_type <> 'numeric' THEN
        EXECUTE 'ALTER TABLE public.drift_ticks ALTER COLUMN mark_price TYPE numeric USING mark_price::numeric';
    END IF;
END
$$;

-- Make sure the PRIMARY KEY is exactly (ts, id): Timescale hypertables need the
-- partition column in any UNIQUE/PK.
-- IMPORTANT: this must stay idempotent so migrations can run while the ingestor
-- keeps writing ticks.
DO $$
DECLARE
    current_pk_name text;
    current_pk_cols text[];
BEGIN
    -- Read the current primary key of public.drift_ticks (if any) together
    -- with its column names in key order.
    SELECT
        c.conname,
        array_agg(a.attname ORDER BY k.ordinality)
    INTO current_pk_name, current_pk_cols
    FROM pg_constraint AS c
    INNER JOIN pg_class AS t
        ON t.oid = c.conrelid
    INNER JOIN pg_namespace AS n
        ON n.oid = t.relnamespace
    CROSS JOIN LATERAL unnest(c.conkey) WITH ORDINALITY AS k (attnum, ordinality)
    INNER JOIN pg_attribute AS a
        ON a.attrelid = t.oid
        AND a.attnum = k.attnum
    WHERE c.contype = 'p'
      AND n.nspname = 'public'
      AND t.relname = 'drift_ticks'
    GROUP BY c.conname;

    -- A key on the wrong columns is dropped first ...
    IF current_pk_name IS NOT NULL AND current_pk_cols <> ARRAY['ts','id'] THEN
        EXECUTE format('ALTER TABLE public.drift_ticks DROP CONSTRAINT %I', current_pk_name);
        current_pk_name := NULL;
    END IF;

    -- ... and whenever no key is left, the expected one is (re)created.
    IF current_pk_name IS NULL THEN
        EXECUTE 'ALTER TABLE public.drift_ticks ADD CONSTRAINT drift_ticks_pkey PRIMARY KEY (ts, id)';
    END IF;
END
$$;

-- Turn the table into a hypertable partitioned on `ts`, migrating existing rows if any.
SELECT create_hypertable(
    'drift_ticks',
    'ts',
    if_not_exists => TRUE,
    migrate_data  => TRUE
);

-- Historical note: earlier versions used a UNIQUE(market_index, ts) upsert model
-- with ts rounded to seconds. For "full ticks" (ms precision + multiple sources)
-- drift_ticks is kept as an append-only event log, so that constraint is removed.
ALTER TABLE drift_ticks
    DROP CONSTRAINT IF EXISTS drift_ticks_market_ts_unique;

CREATE INDEX IF NOT EXISTS drift_ticks_market_ts_desc_idx
    ON drift_ticks (market_index, ts DESC);

CREATE INDEX IF NOT EXISTS drift_ticks_symbol_ts_desc_idx
    ON drift_ticks (symbol, ts DESC);

CREATE INDEX IF NOT EXISTS drift_ticks_market_source_ts_desc_idx
    ON drift_ticks (market_index, source, ts DESC);

CREATE INDEX IF NOT EXISTS drift_ticks_symbol_source_ts_desc_idx
    ON drift_ticks (symbol, source, ts DESC);

-- Revocable API tokens for external algorithms (only hashes are stored, never raw tokens).
CREATE TABLE IF NOT EXISTS api_tokens (
    id           uuid        DEFAULT gen_random_uuid(),
    name         text        NOT NULL,
    token_hash   text        NOT NULL,
    scopes       text[]      NOT NULL DEFAULT ARRAY[]::text[],
    created_at   timestamptz NOT NULL DEFAULT now(),
    revoked_at   timestamptz,
    last_used_at timestamptz,
    meta         jsonb,
    CONSTRAINT api_tokens_pkey PRIMARY KEY (id),
    CONSTRAINT api_tokens_token_hash_key UNIQUE (token_hash)
);

-- Upgrade path for tables created before `scopes` existed.
ALTER TABLE api_tokens
    ADD COLUMN IF NOT EXISTS scopes text[] NOT NULL DEFAULT ARRAY[]::text[];

CREATE INDEX IF NOT EXISTS api_tokens_revoked_at_idx
    ON api_tokens (revoked_at);

-- ---------------------------------------------------------------------------
-- drift_candles: row type for the candle functions.
--
-- OHLC candles are computed from `drift_ticks` for a symbol and bucket size,
-- exposed via Hasura (track function) and used by trade-api to compute
-- indicators server-side. Hasura tracks functions only if they return SETOF a
-- table/view type, so this table exists purely as that return type.
-- ---------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS public.drift_candles (
    bucket       timestamptz,
    open         numeric,
    high         numeric,
    low          numeric,
    close        numeric,
    oracle_open  numeric,
    oracle_high  numeric,
    oracle_low   numeric,
    oracle_close numeric,
    ticks        bigint
);

-- Upgrade path for an already existing table.
-- NOTE(review): there is no matching statement for `oracle_close`; presumably it
-- pre-dates the other oracle_* columns -- confirm against older volumes.
ALTER TABLE public.drift_candles
    ADD COLUMN IF NOT EXISTS oracle_open numeric;
ALTER TABLE public.drift_candles
    ADD COLUMN IF NOT EXISTS oracle_high numeric;
ALTER TABLE public.drift_candles
    ADD COLUMN IF NOT EXISTS oracle_low numeric;

-- Precomputed candle cache (materialized by a worker).
-- Purpose: make timeframe switching instant by reading ready-made candles
-- instead of aggregating `drift_ticks` on demand.
-- NOTE: `source=''` means "any source" (no source filter).
CREATE TABLE IF NOT EXISTS public.drift_candles_cache (
    bucket         timestamptz NOT NULL,
    bucket_seconds integer     NOT NULL,
    symbol         text        NOT NULL,
    source         text        NOT NULL DEFAULT '',
    open           numeric     NOT NULL,
    high           numeric     NOT NULL,
    low            numeric     NOT NULL,
    close          numeric     NOT NULL,
    oracle_open    numeric,
    oracle_high    numeric,
    oracle_low     numeric,
    oracle_close   numeric,
    ticks          bigint      NOT NULL DEFAULT 0,
    updated_at     timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (bucket, bucket_seconds, symbol, source)
);

-- Schema upgrades (idempotent for existing volumes).
ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_open numeric;
ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_high numeric;
ALTER TABLE public.drift_candles_cache ADD COLUMN IF NOT EXISTS oracle_low numeric;

-- Schema-qualified like every other statement on this table, so the call does
-- not depend on the session's search_path.
SELECT create_hypertable(
    'public.drift_candles_cache',
    'bucket',
    if_not_exists => TRUE,
    migrate_data  => TRUE
);

CREATE INDEX IF NOT EXISTS drift_candles_cache_symbol_source_bucket_idx
    ON public.drift_candles_cache (symbol, source, bucket_seconds, bucket DESC);

-- If an older version of the function exists with an incompatible return type,
-- CREATE OR REPLACE will fail. Drop the old signature first (safe/idempotent).
DROP FUNCTION IF EXISTS public.get_drift_candles(text, integer, integer, text);

-- get_drift_candles: up to p_limit OHLC candles for one symbol, newest first.
--
-- Parameters:
--   p_symbol          symbol to read candles for.
--   p_bucket_seconds  candle width in seconds.
--   p_limit           maximum number of candles returned (default 500).
--   p_source          tick source filter; NULL is treated as '' ("any source").
--
-- Data comes from public.drift_candles_cache when it holds at least one
-- matching row; otherwise candles are aggregated on the fly from
-- public.drift_ticks. Gaps between buckets are filled with the previous close.
CREATE OR REPLACE FUNCTION public.get_drift_candles(
    p_symbol text,
    p_bucket_seconds integer,
    p_limit integer DEFAULT 500,
    p_source text DEFAULT NULL
)
RETURNS SETOF public.drift_candles
LANGUAGE sql
STABLE
AS $$
    -- Always returns "continuous" buckets (forward fill), even when there were no
    -- ticks in a given second/minute. This lets the frontend draw a regular time
    -- axis (e.g. 1px = 1s) without odd jumps.
    WITH src AS (
        -- Normalised source filter: '' means "no source filter".
        SELECT COALESCE(p_source, '') AS source_key
    ),
    raw_cached AS (
        -- Newest precomputed candles for this symbol / bucket size / source.
        SELECT
            c.bucket,
            c.open,
            c.high,
            c.low,
            c.close,
            c.oracle_open,
            c.oracle_high,
            c.oracle_low,
            c.oracle_close,
            c.ticks
        FROM public.drift_candles_cache AS c
        CROSS JOIN src
        WHERE c.symbol = p_symbol
          AND c.bucket_seconds = p_bucket_seconds
          AND c.source = src.source_key
        ORDER BY c.bucket DESC
        LIMIT p_limit
    ),
    raw_fallback AS (
        -- Raw ticks from a look-back window of twice the requested span.
        -- The product is computed in double precision: as a 32-bit integer
        -- expression it fails with "integer out of range" for large windows.
        SELECT
            time_bucket(make_interval(secs => p_bucket_seconds), t.ts) AS bucket,
            t.ts,
            COALESCE(t.mark_price, t.oracle_price) AS px,
            t.oracle_price AS oracle_px
        FROM public.drift_ticks AS t
        CROSS JOIN src
        WHERE t.symbol = p_symbol
          AND (src.source_key = '' OR t.source = src.source_key)
          AND t.ts >= now() - make_interval(secs => p_bucket_seconds::double precision * p_limit * 2)
    ),
    computed AS (
        -- OHLC per bucket: open/close are the first/last value ordered by ts.
        SELECT
            bucket,
            (array_agg(px ORDER BY ts ASC))[1] AS open,
            max(px) AS high,
            min(px) AS low,
            (array_agg(px ORDER BY ts DESC))[1] AS close,
            (array_agg(oracle_px ORDER BY ts ASC))[1] AS oracle_open,
            max(oracle_px) AS oracle_high,
            min(oracle_px) AS oracle_low,
            (array_agg(oracle_px ORDER BY ts DESC))[1] AS oracle_close,
            count(*) AS ticks
        FROM raw_fallback
        GROUP BY bucket
    ),
    data AS (
        -- Cached candles win; tick aggregation is used only when the cache has no rows.
        SELECT
            bucket, open, high, low, close,
            oracle_open, oracle_high, oracle_low, oracle_close, ticks
        FROM raw_cached
        UNION ALL
        SELECT
            bucket, open, high, low, close,
            oracle_open, oracle_high, oracle_low, oracle_close, ticks
        FROM computed
        WHERE NOT EXISTS (SELECT 1 FROM raw_cached)
    ),
    bounds AS (
        SELECT max(bucket) AS end_bucket
        FROM data
    ),
    params AS (
        -- step = one bucket; span = distance from the oldest to the newest bucket.
        -- Double precision again, to avoid integer overflow in the product.
        SELECT
            make_interval(secs => p_bucket_seconds) AS step,
            make_interval(secs => p_bucket_seconds::double precision * (p_limit - 1)) AS span
    ),
    series AS (
        -- Gap-free list of p_limit buckets ending at the newest bucket with data.
        SELECT
            generate_series(
                bounds.end_bucket - params.span,
                bounds.end_bucket,
                params.step
            ) AS bucket
        FROM bounds
        CROSS JOIN params
        WHERE bounds.end_bucket IS NOT NULL
    ),
    joined AS (
        -- Buckets without data keep NULLs here and are filled further down.
        SELECT
            s.bucket,
            d.open,
            d.high,
            d.low,
            d.close,
            d.oracle_open,
            d.oracle_high,
            d.oracle_low,
            d.oracle_close,
            d.ticks
        FROM series AS s
        LEFT JOIN data AS d USING (bucket)
    ),
    grouped AS (
        -- Running count of non-NULL closes: every empty bucket gets the same
        -- group number as the last bucket before it that had a value.
        SELECT
            j.*,
            sum(CASE WHEN j.close IS NOT NULL THEN 1 ELSE 0 END)
                OVER (ORDER BY j.bucket ASC) AS grp_close,
            sum(CASE WHEN j.oracle_close IS NOT NULL THEN 1 ELSE 0 END)
                OVER (ORDER BY j.bucket ASC) AS grp_oracle
        FROM joined AS j
    ),
    first_vals AS (
        -- Earliest known values, used for empty buckets that precede any data.
        SELECT
            (
                SELECT g.close
                FROM grouped AS g
                WHERE g.close IS NOT NULL
                ORDER BY g.bucket ASC
                LIMIT 1
            ) AS first_close,
            (
                SELECT g.oracle_close
                FROM grouped AS g
                WHERE g.oracle_close IS NOT NULL
                ORDER BY g.bucket ASC
                LIMIT 1
            ) AS first_oracle
    ),
    ff AS (
        -- Fill value per bucket: own close, else the close of its group, else
        -- the earliest known close.
        SELECT
            g.bucket,
            g.open,
            g.high,
            g.low,
            g.close,
            g.oracle_open,
            g.oracle_high,
            g.oracle_low,
            g.oracle_close,
            g.ticks,
            COALESCE(
                g.close,
                max(g.close) OVER (PARTITION BY g.grp_close),
                f.first_close
            ) AS ff_close,
            COALESCE(
                g.oracle_close,
                max(g.oracle_close) OVER (PARTITION BY g.grp_oracle),
                f.first_oracle
            ) AS ff_oracle
        FROM grouped AS g
        CROSS JOIN first_vals AS f
    )
    SELECT
        bucket,
        COALESCE(open, ff_close) AS open,
        COALESCE(high, ff_close) AS high,
        COALESCE(low, ff_close) AS low,
        COALESCE(close, ff_close) AS close,
        COALESCE(oracle_open, ff_oracle) AS oracle_open,
        COALESCE(oracle_high, ff_oracle) AS oracle_high,
        COALESCE(oracle_low, ff_oracle) AS oracle_low,
        COALESCE(oracle_close, ff_oracle) AS oracle_close,
        COALESCE(ticks, 0) AS ticks
    FROM ff
    ORDER BY bucket DESC
    LIMIT p_limit;
$$;

-- Latest DLOB orderbook snapshots (top-N levels), per market.
-- Filled by a VPS worker (collector) and consumed by the UI via Hasura subscriptions.
CREATE TABLE IF NOT EXISTS public.dlob_l2_latest (
    source         text        NOT NULL DEFAULT 'mevnode',
    market_name    text        NOT NULL,
    market_type    text        NOT NULL DEFAULT 'perp',
    market_index   integer,
    ts             bigint,
    slot           bigint,
    mark_price     numeric,
    oracle_price   numeric,
    best_bid_price numeric,
    best_ask_price numeric,
    bids           jsonb,
    asks           jsonb,
    raw            jsonb,
    updated_at     timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT dlob_l2_latest_pkey PRIMARY KEY (source, market_name)
);

-- Upgrade path for existing volumes (safe to re-run): add `source`, backfill
-- rows that have none with 'mevnode', then make the column mandatory.
ALTER TABLE public.dlob_l2_latest
    ADD COLUMN IF NOT EXISTS source text;
ALTER TABLE public.dlob_l2_latest
    ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_l2_latest
SET source = 'mevnode'
WHERE source IS NULL;
ALTER TABLE public.dlob_l2_latest
    ALTER COLUMN source SET NOT NULL;

-- The PRIMARY KEY has to be (source, market_name) so that two sources can be
-- kept in parallel.
DO $$
DECLARE
    current_pk_name text;
    current_pk_cols text[];
BEGIN
    -- Current primary key (if any) and its columns in key order.
    SELECT
        c.conname,
        array_agg(a.attname ORDER BY k.ordinality)
    INTO current_pk_name, current_pk_cols
    FROM pg_constraint AS c
    INNER JOIN pg_class AS t
        ON t.oid = c.conrelid
    INNER JOIN pg_namespace AS n
        ON n.oid = t.relnamespace
    CROSS JOIN LATERAL unnest(c.conkey) WITH ORDINALITY AS k (attnum, ordinality)
    INNER JOIN pg_attribute AS a
        ON a.attrelid = t.oid
        AND a.attnum = k.attnum
    WHERE c.contype = 'p'
      AND n.nspname = 'public'
      AND t.relname = 'dlob_l2_latest'
    GROUP BY c.conname;

    -- Drop a key on the wrong columns, then create the expected key if none is left.
    IF current_pk_name IS NOT NULL AND current_pk_cols <> ARRAY['source','market_name'] THEN
        EXECUTE format('ALTER TABLE public.dlob_l2_latest DROP CONSTRAINT %I', current_pk_name);
        current_pk_name := NULL;
    END IF;

    IF current_pk_name IS NULL THEN
        EXECUTE 'ALTER TABLE public.dlob_l2_latest ADD CONSTRAINT dlob_l2_latest_pkey PRIMARY KEY (source, market_name)';
    END IF;
END
$$;

CREATE INDEX IF NOT EXISTS dlob_l2_latest_updated_at_idx
    ON public.dlob_l2_latest (updated_at DESC);

CREATE INDEX IF NOT EXISTS dlob_l2_latest_source_updated_at_idx
    ON public.dlob_l2_latest (source, updated_at DESC);

-- Derived stats for fast UI display.
CREATE TABLE IF NOT EXISTS public.dlob_stats_latest (
    source         text        NOT NULL DEFAULT 'mevnode',
    market_name    text        NOT NULL,
    market_type    text        NOT NULL DEFAULT 'perp',
    market_index   integer,
    ts             bigint,
    slot           bigint,
    mark_price     numeric,
    oracle_price   numeric,
    best_bid_price numeric,
    best_ask_price numeric,
    mid_price      numeric,
    spread_abs     numeric,
    spread_bps     numeric,
    depth_levels   integer,
    depth_bid_base numeric,
    depth_ask_base numeric,
    depth_bid_usd  numeric,
    depth_ask_usd  numeric,
    imbalance      numeric,
    raw            jsonb,
    updated_at     timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT dlob_stats_latest_pkey PRIMARY KEY (source, market_name)
);

-- Upgrade path for existing volumes (safe to re-run): add `source`, backfill
-- rows that have none with 'mevnode', then make the column mandatory.
ALTER TABLE public.dlob_stats_latest
    ADD COLUMN IF NOT EXISTS source text;
ALTER TABLE public.dlob_stats_latest
    ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_stats_latest
SET source = 'mevnode'
WHERE source IS NULL;
ALTER TABLE public.dlob_stats_latest
    ALTER COLUMN source SET NOT NULL;

-- The PRIMARY KEY has to be (source, market_name) so that two sources can be
-- kept in parallel.
DO $$
DECLARE
    current_pk_name text;
    current_pk_cols text[];
BEGIN
    -- Current primary key of public.dlob_stats_latest (if any) and its columns
    -- in key order.
    SELECT
        c.conname,
        array_agg(a.attname ORDER BY k.ordinality)
    INTO current_pk_name, current_pk_cols
    FROM pg_constraint AS c
    INNER JOIN pg_class AS t
        ON t.oid = c.conrelid
    INNER JOIN pg_namespace AS n
        ON n.oid = t.relnamespace
    CROSS JOIN LATERAL unnest(c.conkey) WITH ORDINALITY AS k (attnum, ordinality)
    INNER JOIN pg_attribute AS a
        ON a.attrelid = t.oid
        AND a.attnum = k.attnum
    WHERE c.contype = 'p'
      AND n.nspname = 'public'
      AND t.relname = 'dlob_stats_latest'
    GROUP BY c.conname;

    -- Drop a key on the wrong columns, then create the expected key if none is left.
    IF current_pk_name IS NOT NULL AND current_pk_cols <> ARRAY['source','market_name'] THEN
        EXECUTE format('ALTER TABLE public.dlob_stats_latest DROP CONSTRAINT %I', current_pk_name);
        current_pk_name := NULL;
    END IF;

    IF current_pk_name IS NULL THEN
        EXECUTE 'ALTER TABLE public.dlob_stats_latest ADD CONSTRAINT dlob_stats_latest_pkey PRIMARY KEY (source, market_name)';
    END IF;
END
$$;

CREATE INDEX IF NOT EXISTS dlob_stats_latest_updated_at_idx
    ON public.dlob_stats_latest (updated_at DESC);

CREATE INDEX IF NOT EXISTS dlob_stats_latest_source_updated_at_idx
    ON public.dlob_stats_latest (source, updated_at DESC);

-- Depth snapshots within bps bands around mid-price (per market, per band).
-- Filled by a derived worker that reads `dlob_l2_latest`.
CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_latest (
    source         text        NOT NULL DEFAULT 'mevnode',
    market_name    text        NOT NULL,
    band_bps       integer     NOT NULL,
    market_type    text        NOT NULL DEFAULT 'perp',
    market_index   integer,
    ts             bigint,
    slot           bigint,
    mid_price      numeric,
    best_bid_price numeric,
    best_ask_price numeric,
    bid_base       numeric,
    ask_base       numeric,
    bid_usd        numeric,
    ask_usd        numeric,
    imbalance      numeric,
    raw            jsonb,
    updated_at     timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT dlob_depth_bps_latest_pkey PRIMARY KEY (source, market_name, band_bps)
);

-- Upgrade path for existing volumes (safe to re-run): add `source`, backfill
-- rows that have none with 'mevnode', then make the column mandatory.
ALTER TABLE public.dlob_depth_bps_latest
    ADD COLUMN IF NOT EXISTS source text;
ALTER TABLE public.dlob_depth_bps_latest
    ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_depth_bps_latest
SET source = 'mevnode'
WHERE source IS NULL;
ALTER TABLE public.dlob_depth_bps_latest
    ALTER COLUMN source SET NOT NULL;

-- The PRIMARY KEY has to be (source, market_name, band_bps) so that two
-- sources can be kept in parallel.
DO $$
DECLARE
    current_pk_name text;
    current_pk_cols text[];
BEGIN
    -- Current primary key (if any) and its columns in key order.
    SELECT
        c.conname,
        array_agg(a.attname ORDER BY k.ordinality)
    INTO current_pk_name, current_pk_cols
    FROM pg_constraint AS c
    INNER JOIN pg_class AS t
        ON t.oid = c.conrelid
    INNER JOIN pg_namespace AS n
        ON n.oid = t.relnamespace
    CROSS JOIN LATERAL unnest(c.conkey) WITH ORDINALITY AS k (attnum, ordinality)
    INNER JOIN pg_attribute AS a
        ON a.attrelid = t.oid
        AND a.attnum = k.attnum
    WHERE c.contype = 'p'
      AND n.nspname = 'public'
      AND t.relname = 'dlob_depth_bps_latest'
    GROUP BY c.conname;

    -- Drop a key on the wrong columns, then create the expected key if none is left.
    IF current_pk_name IS NOT NULL AND current_pk_cols <> ARRAY['source','market_name','band_bps'] THEN
        EXECUTE format('ALTER TABLE public.dlob_depth_bps_latest DROP CONSTRAINT %I', current_pk_name);
        current_pk_name := NULL;
    END IF;

    IF current_pk_name IS NULL THEN
        EXECUTE 'ALTER TABLE public.dlob_depth_bps_latest ADD CONSTRAINT dlob_depth_bps_latest_pkey PRIMARY KEY (source, market_name, band_bps)';
    END IF;
END
$$;

CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_updated_at_idx
    ON public.dlob_depth_bps_latest (updated_at DESC);

CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_market_name_idx
    ON public.dlob_depth_bps_latest (market_name);

CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_source_market_name_idx
    ON public.dlob_depth_bps_latest (source, market_name);

-- Slippage/impact estimates for "market" orders at common USD sizes.
-- Filled by a derived worker that reads `dlob_l2_latest`.
CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest (
    source          text        NOT NULL DEFAULT 'mevnode',
    market_name     text        NOT NULL,
    side            text        NOT NULL,
    size_usd        integer     NOT NULL,
    market_type     text        NOT NULL DEFAULT 'perp',
    market_index    integer,
    ts              bigint,
    slot            bigint,
    mid_price       numeric,
    best_bid_price  numeric,
    best_ask_price  numeric,
    vwap_price      numeric,
    worst_price     numeric,
    filled_usd      numeric,
    filled_base     numeric,
    impact_bps      numeric,
    levels_consumed integer,
    fill_pct        numeric,
    raw             jsonb,
    updated_at      timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT dlob_slippage_latest_pkey PRIMARY KEY (source, market_name, side, size_usd),
    CONSTRAINT dlob_slippage_latest_side_chk CHECK (side IN ('buy', 'sell'))
);

-- Upgrade path for existing volumes (safe to re-run): add `source`, backfill
-- rows that have none with 'mevnode', then make the column mandatory.
ALTER TABLE public.dlob_slippage_latest
    ADD COLUMN IF NOT EXISTS source text;
ALTER TABLE public.dlob_slippage_latest
    ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_latest
SET source = 'mevnode'
WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_latest
    ALTER COLUMN source SET NOT NULL;

-- The PRIMARY KEY has to be (source, market_name, side, size_usd) so that two
-- sources can be kept in parallel.
DO $$
DECLARE
    current_pk_name text;
    current_pk_cols text[];
BEGIN
    -- Current primary key of public.dlob_slippage_latest (if any) and its
    -- columns in key order.
    SELECT
        c.conname,
        array_agg(a.attname ORDER BY k.ordinality)
    INTO current_pk_name, current_pk_cols
    FROM pg_constraint AS c
    INNER JOIN pg_class AS t
        ON t.oid = c.conrelid
    INNER JOIN pg_namespace AS n
        ON n.oid = t.relnamespace
    CROSS JOIN LATERAL unnest(c.conkey) WITH ORDINALITY AS k (attnum, ordinality)
    INNER JOIN pg_attribute AS a
        ON a.attrelid = t.oid
        AND a.attnum = k.attnum
    WHERE c.contype = 'p'
      AND n.nspname = 'public'
      AND t.relname = 'dlob_slippage_latest'
    GROUP BY c.conname;

    -- Drop a key on the wrong columns, then create the expected key if none is left.
    IF current_pk_name IS NOT NULL AND current_pk_cols <> ARRAY['source','market_name','side','size_usd'] THEN
        EXECUTE format('ALTER TABLE public.dlob_slippage_latest DROP CONSTRAINT %I', current_pk_name);
        current_pk_name := NULL;
    END IF;

    IF current_pk_name IS NULL THEN
        EXECUTE 'ALTER TABLE public.dlob_slippage_latest ADD CONSTRAINT dlob_slippage_latest_pkey PRIMARY KEY (source, market_name, side, size_usd)';
    END IF;
END
$$;

CREATE INDEX IF NOT EXISTS dlob_slippage_latest_updated_at_idx
    ON public.dlob_slippage_latest (updated_at DESC);

CREATE INDEX IF NOT EXISTS dlob_slippage_latest_market_name_idx
    ON public.dlob_slippage_latest (market_name);

CREATE INDEX IF NOT EXISTS dlob_slippage_latest_source_market_name_idx
    ON public.dlob_slippage_latest (source, market_name);

-- Slippage v2: supports fractional order sizes (e.g. 0.1/0.2/0.5 USD), per market and side.
-- v1 is kept intact for backward compatibility and to avoid data loss.
CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest_v2 (
    source          text        NOT NULL DEFAULT 'mevnode',
    market_name     text        NOT NULL,
    side            text        NOT NULL,  -- buy|sell
    size_usd        numeric     NOT NULL,
    market_type     text        NOT NULL DEFAULT 'perp',
    market_index    integer,
    ts              bigint,
    slot            bigint,
    mid_price       numeric,
    best_bid_price  numeric,
    best_ask_price  numeric,
    vwap_price      numeric,
    worst_price     numeric,
    filled_usd      numeric,
    filled_base     numeric,
    impact_bps      numeric,
    levels_consumed integer,
    fill_pct        numeric,
    raw             jsonb,
    updated_at      timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT dlob_slippage_latest_v2_pkey PRIMARY KEY (source, market_name, side, size_usd),
    CONSTRAINT dlob_slippage_latest_v2_side_chk CHECK (side IN ('buy', 'sell'))
);

-- Upgrade path for existing volumes (safe to re-run): add `source`, backfill
-- rows that have none with 'mevnode', then make the column mandatory.
ALTER TABLE public.dlob_slippage_latest_v2
    ADD COLUMN IF NOT EXISTS source text;
ALTER TABLE public.dlob_slippage_latest_v2
    ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_latest_v2
SET source = 'mevnode'
WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_latest_v2
    ALTER COLUMN source SET NOT NULL;

-- The PRIMARY KEY has to be (source, market_name, side, size_usd) so that two
-- sources can be kept in parallel.
DO $$
DECLARE
    current_pk_name text;
    current_pk_cols text[];
BEGIN
    -- Current primary key of public.dlob_slippage_latest_v2 (if any) and its
    -- columns in key order.
    SELECT
        c.conname,
        array_agg(a.attname ORDER BY k.ordinality)
    INTO current_pk_name, current_pk_cols
    FROM pg_constraint AS c
    INNER JOIN pg_class AS t
        ON t.oid = c.conrelid
    INNER JOIN pg_namespace AS n
        ON n.oid = t.relnamespace
    CROSS JOIN LATERAL unnest(c.conkey) WITH ORDINALITY AS k (attnum, ordinality)
    INNER JOIN pg_attribute AS a
        ON a.attrelid = t.oid
        AND a.attnum = k.attnum
    WHERE c.contype = 'p'
      AND n.nspname = 'public'
      AND t.relname = 'dlob_slippage_latest_v2'
    GROUP BY c.conname;

    -- Drop a key on the wrong columns, then create the expected key if none is left.
    IF current_pk_name IS NOT NULL AND current_pk_cols <> ARRAY['source','market_name','side','size_usd'] THEN
        EXECUTE format('ALTER TABLE public.dlob_slippage_latest_v2 DROP CONSTRAINT %I', current_pk_name);
        current_pk_name := NULL;
    END IF;

    IF current_pk_name IS NULL THEN
        EXECUTE 'ALTER TABLE public.dlob_slippage_latest_v2 ADD CONSTRAINT dlob_slippage_latest_v2_pkey PRIMARY KEY (source, market_name, side, size_usd)';
    END IF;
END
$$;

CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_updated_at_idx
    ON public.dlob_slippage_latest_v2 (updated_at DESC);

CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_market_name_idx
    ON public.dlob_slippage_latest_v2 (market_name);

CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_source_market_name_idx
    ON public.dlob_slippage_latest_v2 (source, market_name);

-- Time-series tables for UI history (start: 7 days).
-- These are append-only Timescale hypertables.
-- dlob_stats_ts: history of the derived orderbook stats, one row per sample.
CREATE TABLE IF NOT EXISTS public.dlob_stats_ts (
    ts             TIMESTAMPTZ NOT NULL,
    id             BIGSERIAL   NOT NULL,
    source         TEXT        NOT NULL DEFAULT 'mevnode',
    market_name    TEXT        NOT NULL,
    market_type    TEXT        NOT NULL DEFAULT 'perp',
    market_index   INTEGER,
    source_ts      BIGINT,
    slot           BIGINT,
    mark_price     NUMERIC,
    oracle_price   NUMERIC,
    best_bid_price NUMERIC,
    best_ask_price NUMERIC,
    mid_price      NUMERIC,
    spread_abs     NUMERIC,
    spread_bps     NUMERIC,
    depth_levels   INTEGER,
    depth_bid_base NUMERIC,
    depth_ask_base NUMERIC,
    depth_bid_usd  NUMERIC,
    depth_ask_usd  NUMERIC,
    imbalance      NUMERIC,
    raw            JSONB,
    PRIMARY KEY (ts, id)
);

-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_stats_ts ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_stats_ts ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_stats_ts SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_stats_ts ALTER COLUMN source SET NOT NULL;

SELECT create_hypertable('dlob_stats_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE);

CREATE INDEX IF NOT EXISTS dlob_stats_ts_market_ts_desc_idx
    ON public.dlob_stats_ts (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_stats_ts_source_market_ts_desc_idx
    ON public.dlob_stats_ts (source, market_name, ts DESC);

-- dlob_depth_bps_ts: history of depth per bps band.
CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_ts (
    ts             TIMESTAMPTZ NOT NULL,
    id             BIGSERIAL   NOT NULL,
    source         TEXT        NOT NULL DEFAULT 'mevnode',
    market_name    TEXT        NOT NULL,
    band_bps       INTEGER     NOT NULL,
    market_type    TEXT        NOT NULL DEFAULT 'perp',
    market_index   INTEGER,
    source_ts      BIGINT,
    slot           BIGINT,
    mid_price      NUMERIC,
    best_bid_price NUMERIC,
    best_ask_price NUMERIC,
    bid_base       NUMERIC,
    ask_base       NUMERIC,
    bid_usd        NUMERIC,
    ask_usd        NUMERIC,
    imbalance      NUMERIC,
    raw            JSONB,
    PRIMARY KEY (ts, id)
);

-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_depth_bps_ts ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_depth_bps_ts ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_depth_bps_ts SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_depth_bps_ts ALTER COLUMN source SET NOT NULL;

SELECT create_hypertable('dlob_depth_bps_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE);

CREATE INDEX IF NOT EXISTS dlob_depth_bps_ts_market_ts_desc_idx
    ON public.dlob_depth_bps_ts (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_ts_source_market_ts_desc_idx
    ON public.dlob_depth_bps_ts (source, market_name, ts DESC);

-- dlob_slippage_ts: history of slippage estimates (integer USD sizes).
CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts (
    ts              TIMESTAMPTZ NOT NULL,
    id              BIGSERIAL   NOT NULL,
    source          TEXT        NOT NULL DEFAULT 'mevnode',
    market_name     TEXT        NOT NULL,
    side            TEXT        NOT NULL,
    size_usd        INTEGER     NOT NULL,
    market_type     TEXT        NOT NULL DEFAULT 'perp',
    market_index    INTEGER,
    source_ts       BIGINT,
    slot            BIGINT,
    mid_price       NUMERIC,
    vwap_price      NUMERIC,
    worst_price     NUMERIC,
    filled_usd      NUMERIC,
    filled_base     NUMERIC,
    impact_bps      NUMERIC,
    levels_consumed INTEGER,
    fill_pct        NUMERIC,
    raw             JSONB,
    PRIMARY KEY (ts, id)
);

-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_slippage_ts ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_ts ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_ts SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_ts ALTER COLUMN source SET NOT NULL;

SELECT create_hypertable('dlob_slippage_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE);

CREATE INDEX IF NOT EXISTS dlob_slippage_ts_market_ts_desc_idx
    ON public.dlob_slippage_ts (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_source_market_ts_desc_idx
    ON public.dlob_slippage_ts (source, market_name, ts DESC);

-- dlob_slippage_ts_v2: same layout as dlob_slippage_ts, but size_usd is NUMERIC.
CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts_v2 (
    ts              TIMESTAMPTZ NOT NULL,
    id              BIGSERIAL   NOT NULL,
    source          TEXT        NOT NULL DEFAULT 'mevnode',
    market_name     TEXT        NOT NULL,
    side            TEXT        NOT NULL,
    size_usd        NUMERIC     NOT NULL,
    market_type     TEXT        NOT NULL DEFAULT 'perp',
    market_index    INTEGER,
    source_ts       BIGINT,
    slot            BIGINT,
    mid_price       NUMERIC,
    vwap_price      NUMERIC,
    worst_price     NUMERIC,
    filled_usd      NUMERIC,
    filled_base     NUMERIC,
    impact_bps      NUMERIC,
    levels_consumed INTEGER,
    fill_pct        NUMERIC,
    raw             JSONB,
    PRIMARY KEY (ts, id)
);

-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_slippage_ts_v2 ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_ts_v2 ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_ts_v2 SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_ts_v2 ALTER COLUMN source SET NOT NULL;

SELECT create_hypertable('dlob_slippage_ts_v2', 'ts', if_not_exists => TRUE, migrate_data => TRUE);

CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_market_ts_desc_idx
    ON public.dlob_slippage_ts_v2 (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_source_market_ts_desc_idx
    ON public.dlob_slippage_ts_v2 (source, market_name, ts DESC);

-- Retention policies: keep 7 days of history in each time-series table.
-- Still best-effort: a failure for one table never aborts the migration.
-- Unlike a silent catch-all, though:
--   * `if_not_exists => TRUE` makes "policy already exists" a non-error, so
--     re-running the migration is expected to stay quiet;
--   * any other failure (e.g. add_retention_policy unavailable) is reported as
--     a WARNING instead of being swallowed.
DO $$
DECLARE
    ts_table text;
BEGIN
    FOREACH ts_table IN ARRAY ARRAY[
        'dlob_stats_ts',
        'dlob_depth_bps_ts',
        'dlob_slippage_ts',
        'dlob_slippage_ts_v2'
    ]
    LOOP
        -- Inner block: an error for one table is trapped here so the loop continues.
        BEGIN
            PERFORM add_retention_policy(
                ts_table::regclass,
                INTERVAL '7 days',
                if_not_exists => TRUE
            );
        EXCEPTION
            WHEN OTHERS THEN
                RAISE WARNING 'retention policy for % not applied: % (SQLSTATE %)',
                    ts_table, SQLERRM, SQLSTATE;
        END;
    END LOOP;
END
$$;