feat(dlob): support two sources + per-user switch

- Add "source" column + composite PKs for DLOB tables\n- Filter public Hasura selects by X-Hasura-Dlob-Source\n- Run parallel workers for mevnode + dlob.drift.trade\n- Frontend proxy sets x-hasura-dlob-source from cookie and injects UI switch
This commit is contained in:
u1
2026-02-13 10:48:20 +01:00
parent 9e7d7b88ac
commit 57433c7e75
17 changed files with 501 additions and 54 deletions

View File

@@ -321,7 +321,8 @@ $$;
-- Latest DLOB orderbook snapshots (top-N levels), per market.
-- Filled by a VPS worker (collector) and consumed by the UI via Hasura subscriptions.
CREATE TABLE IF NOT EXISTS public.dlob_l2_latest (
market_name TEXT PRIMARY KEY,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
@@ -333,15 +334,52 @@ CREATE TABLE IF NOT EXISTS public.dlob_l2_latest (
bids JSONB,
asks JSONB,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (source, market_name)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_l2_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_l2_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_l2_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_l2_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_l2_latest'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_l2_latest ADD CONSTRAINT dlob_l2_latest_pkey PRIMARY KEY (source, market_name)';
ELSIF pk_cols <> ARRAY['source','market_name'] THEN
EXECUTE format('ALTER TABLE public.dlob_l2_latest DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_l2_latest ADD CONSTRAINT dlob_l2_latest_pkey PRIMARY KEY (source, market_name)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_l2_latest_updated_at_idx
ON public.dlob_l2_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_l2_latest_source_updated_at_idx
ON public.dlob_l2_latest (source, updated_at DESC);
-- Derived stats for fast UI display.
CREATE TABLE IF NOT EXISTS public.dlob_stats_latest (
market_name TEXT PRIMARY KEY,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
ts BIGINT,
@@ -360,15 +398,52 @@ CREATE TABLE IF NOT EXISTS public.dlob_stats_latest (
depth_ask_usd NUMERIC,
imbalance NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (source, market_name)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_stats_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_stats_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_stats_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_stats_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_stats_latest'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_stats_latest ADD CONSTRAINT dlob_stats_latest_pkey PRIMARY KEY (source, market_name)';
ELSIF pk_cols <> ARRAY['source','market_name'] THEN
EXECUTE format('ALTER TABLE public.dlob_stats_latest DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_stats_latest ADD CONSTRAINT dlob_stats_latest_pkey PRIMARY KEY (source, market_name)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_stats_latest_updated_at_idx
ON public.dlob_stats_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_stats_latest_source_updated_at_idx
ON public.dlob_stats_latest (source, updated_at DESC);
-- Depth snapshots within bps bands around mid-price (per market, per band).
-- Filled by a derived worker that reads `dlob_l2_latest`.
CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_latest (
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
band_bps INTEGER NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
@@ -385,18 +460,54 @@ CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_latest (
imbalance NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (market_name, band_bps)
PRIMARY KEY (source, market_name, band_bps)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_depth_bps_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_depth_bps_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_depth_bps_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_depth_bps_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name, band_bps) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_depth_bps_latest'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_depth_bps_latest ADD CONSTRAINT dlob_depth_bps_latest_pkey PRIMARY KEY (source, market_name, band_bps)';
ELSIF pk_cols <> ARRAY['source','market_name','band_bps'] THEN
EXECUTE format('ALTER TABLE public.dlob_depth_bps_latest DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_depth_bps_latest ADD CONSTRAINT dlob_depth_bps_latest_pkey PRIMARY KEY (source, market_name, band_bps)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_updated_at_idx
ON public.dlob_depth_bps_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_market_name_idx
ON public.dlob_depth_bps_latest (market_name);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_latest_source_market_name_idx
ON public.dlob_depth_bps_latest (source, market_name);
-- Slippage/impact estimates for "market" orders at common USD sizes.
-- Filled by a derived worker that reads `dlob_l2_latest`.
CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest (
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
side TEXT NOT NULL,
size_usd INTEGER NOT NULL,
@@ -416,19 +527,55 @@ CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest (
fill_pct NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (market_name, side, size_usd),
PRIMARY KEY (source, market_name, side, size_usd),
CONSTRAINT dlob_slippage_latest_side_chk CHECK (side IN ('buy', 'sell'))
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_slippage_latest ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_latest ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_latest SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_latest ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name, side, size_usd) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_slippage_latest'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_slippage_latest ADD CONSTRAINT dlob_slippage_latest_pkey PRIMARY KEY (source, market_name, side, size_usd)';
ELSIF pk_cols <> ARRAY['source','market_name','side','size_usd'] THEN
EXECUTE format('ALTER TABLE public.dlob_slippage_latest DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_slippage_latest ADD CONSTRAINT dlob_slippage_latest_pkey PRIMARY KEY (source, market_name, side, size_usd)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_updated_at_idx
ON public.dlob_slippage_latest (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_market_name_idx
ON public.dlob_slippage_latest (market_name);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_source_market_name_idx
ON public.dlob_slippage_latest (source, market_name);
-- Slippage v2: supports fractional order sizes (e.g. 0.1/0.2/0.5 USD), per market and side.
-- Keep v1 intact for backward compatibility and to avoid data loss.
CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest_v2 (
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
side TEXT NOT NULL, -- buy|sell
size_usd NUMERIC NOT NULL,
@@ -448,22 +595,58 @@ CREATE TABLE IF NOT EXISTS public.dlob_slippage_latest_v2 (
fill_pct NUMERIC,
raw JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (market_name, side, size_usd),
PRIMARY KEY (source, market_name, side, size_usd),
CONSTRAINT dlob_slippage_latest_v2_side_chk CHECK (side IN ('buy', 'sell'))
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_slippage_latest_v2 ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_latest_v2 ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_latest_v2 SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_latest_v2 ALTER COLUMN source SET NOT NULL;
-- Ensure PRIMARY KEY is (source, market_name, side, size_usd) (required to keep 2 sources in parallel).
DO $$
DECLARE
pk_name text;
pk_cols text[];
BEGIN
SELECT
con.conname,
array_agg(att.attname ORDER BY ord.ordinality)
INTO pk_name, pk_cols
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS ord(attnum, ordinality) ON true
JOIN pg_attribute att ON att.attrelid = rel.oid AND att.attnum = ord.attnum
WHERE con.contype = 'p' AND nsp.nspname = 'public' AND rel.relname = 'dlob_slippage_latest_v2'
GROUP BY con.conname;
IF pk_name IS NULL THEN
EXECUTE 'ALTER TABLE public.dlob_slippage_latest_v2 ADD CONSTRAINT dlob_slippage_latest_v2_pkey PRIMARY KEY (source, market_name, side, size_usd)';
ELSIF pk_cols <> ARRAY['source','market_name','side','size_usd'] THEN
EXECUTE format('ALTER TABLE public.dlob_slippage_latest_v2 DROP CONSTRAINT %I', pk_name);
EXECUTE 'ALTER TABLE public.dlob_slippage_latest_v2 ADD CONSTRAINT dlob_slippage_latest_v2_pkey PRIMARY KEY (source, market_name, side, size_usd)';
END IF;
END $$;
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_updated_at_idx
ON public.dlob_slippage_latest_v2 (updated_at DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_market_name_idx
ON public.dlob_slippage_latest_v2 (market_name);
CREATE INDEX IF NOT EXISTS dlob_slippage_latest_v2_source_market_name_idx
ON public.dlob_slippage_latest_v2 (source, market_name);
-- Time-series tables for UI history (start: 7 days).
-- Keep these append-only; use Timescale hypertables.
CREATE TABLE IF NOT EXISTS public.dlob_stats_ts (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
market_index INTEGER,
@@ -486,14 +669,24 @@ CREATE TABLE IF NOT EXISTS public.dlob_stats_ts (
PRIMARY KEY (ts, id)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_stats_ts ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_stats_ts ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_stats_ts SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_stats_ts ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_stats_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_stats_ts_market_ts_desc_idx
ON public.dlob_stats_ts (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_stats_ts_source_market_ts_desc_idx
ON public.dlob_stats_ts (source, market_name, ts DESC);
CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_ts (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
band_bps INTEGER NOT NULL,
market_type TEXT NOT NULL DEFAULT 'perp',
@@ -512,14 +705,24 @@ CREATE TABLE IF NOT EXISTS public.dlob_depth_bps_ts (
PRIMARY KEY (ts, id)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_depth_bps_ts ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_depth_bps_ts ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_depth_bps_ts SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_depth_bps_ts ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_depth_bps_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_ts_market_ts_desc_idx
ON public.dlob_depth_bps_ts (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_depth_bps_ts_source_market_ts_desc_idx
ON public.dlob_depth_bps_ts (source, market_name, ts DESC);
CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
side TEXT NOT NULL,
size_usd INTEGER NOT NULL,
@@ -539,14 +742,24 @@ CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts (
PRIMARY KEY (ts, id)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_slippage_ts ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_ts ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_ts SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_ts ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_slippage_ts', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_market_ts_desc_idx
ON public.dlob_slippage_ts (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_source_market_ts_desc_idx
ON public.dlob_slippage_ts (source, market_name, ts DESC);
CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts_v2 (
ts TIMESTAMPTZ NOT NULL,
id BIGSERIAL NOT NULL,
source TEXT NOT NULL DEFAULT 'mevnode',
market_name TEXT NOT NULL,
side TEXT NOT NULL,
size_usd NUMERIC NOT NULL,
@@ -566,11 +779,19 @@ CREATE TABLE IF NOT EXISTS public.dlob_slippage_ts_v2 (
PRIMARY KEY (ts, id)
);
-- Schema upgrades (idempotent for existing volumes)
ALTER TABLE public.dlob_slippage_ts_v2 ADD COLUMN IF NOT EXISTS source TEXT;
ALTER TABLE public.dlob_slippage_ts_v2 ALTER COLUMN source SET DEFAULT 'mevnode';
UPDATE public.dlob_slippage_ts_v2 SET source = 'mevnode' WHERE source IS NULL;
ALTER TABLE public.dlob_slippage_ts_v2 ALTER COLUMN source SET NOT NULL;
SELECT create_hypertable('dlob_slippage_ts_v2', 'ts', if_not_exists => TRUE, migrate_data => TRUE);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_market_ts_desc_idx
ON public.dlob_slippage_ts_v2 (market_name, ts DESC);
CREATE INDEX IF NOT EXISTS dlob_slippage_ts_v2_source_market_ts_desc_idx
ON public.dlob_slippage_ts_v2 (source, market_name, ts DESC);
-- Retention policies (best-effort; safe if Timescale is present).
DO $$
BEGIN