PostgreSQL canonical schema
This is the v1 core SQL for canonical entities, ingest tracking and source contracts.
-- Namespace that holds every hw_core.* table created in this file.
CREATE SCHEMA IF NOT EXISTS hw_core;
-- hw_core.observable.public_id defaults to gen_random_uuid(). pgcrypto supplies
-- that function on PostgreSQL releases older than 13; from 13 on it is built in.
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- Catalog of sources. Other hw_core tables reference source(id); `code` is a
-- second, unique text key.
CREATE TABLE hw_core.source (
    id         SMALLSERIAL PRIMARY KEY,
    code       TEXT        NOT NULL UNIQUE,
    name       TEXT        NOT NULL,
    -- Closed value set; adding a tier means altering this CHECK.
    tier       TEXT        NOT NULL
        CHECK (tier IN ('community', 'premium', 'enterprise', 'internal')),
    -- Closed value set; adding a mode means altering this CHECK.
    mode       TEXT        NOT NULL
        CHECK (mode IN ('pull', 'stream', 'manual')),
    -- Rows are enabled unless the writer says otherwise.
    enabled    BOOLEAN     NOT NULL DEFAULT TRUE,
    -- Optional free-text fields (nullable).
    docs_ref   TEXT,
    notes      TEXT,
    -- Stamped by the database at insert time.
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Per-source handling contract. source_id is both the primary key and the
-- foreign key, so a source has at most one contract row, and that row is
-- removed together with its source (ON DELETE CASCADE).
CREATE TABLE hw_core.source_contract (
    source_id               SMALLINT    PRIMARY KEY
        REFERENCES hw_core.source(id) ON DELETE CASCADE,
    -- Presumably seconds, per the _sec suffix; no lower bound is enforced here.
    min_poll_interval_sec   INTEGER     NOT NULL DEFAULT 300,
    -- Defaults below: raw data may be stored but is neither shown publicly nor
    -- redistributed; derived data may be shown; license review is required.
    can_store_raw           BOOLEAN     NOT NULL DEFAULT TRUE,
    can_show_public_raw     BOOLEAN     NOT NULL DEFAULT FALSE,
    can_show_public_derived BOOLEAN     NOT NULL DEFAULT TRUE,
    can_redistribute_raw    BOOLEAN     NOT NULL DEFAULT FALSE,
    needs_license_review    BOOLEAN     NOT NULL DEFAULT TRUE,
    -- Closed value set. Nothing in this table ties it to the can_* flags above.
    public_policy           TEXT        NOT NULL DEFAULT 'derived_only'
        CHECK (public_policy IN ('disabled', 'derived_only', 'public_raw_allowed')),
    -- Presumably seconds, per the _sec suffix.
    cache_ttl_sec           INTEGER     NOT NULL DEFAULT 300,
    -- NULL until a review timestamp is recorded.
    last_reviewed_at        TIMESTAMPTZ,
    notes                   TEXT
);
-- Bookkeeping row for an ingest job run against a source.
CREATE TABLE hw_core.ingest_run (
    id             BIGSERIAL   PRIMARY KEY,
    -- No ON DELETE action is declared, so a source that still has runs
    -- cannot be deleted.
    source_id      SMALLINT    NOT NULL REFERENCES hw_core.source(id),
    job_name       TEXT        NOT NULL,
    -- Optional requested time window.
    requested_from TIMESTAMPTZ,
    requested_to   TIMESTAMPTZ,
    -- started_at is stamped at insert; finished_at stays NULL until set.
    started_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
    finished_at    TIMESTAMPTZ,
    -- A new row starts as 'running'; the CHECK lists every allowed status.
    status         TEXT        NOT NULL DEFAULT 'running'
        CHECK (status IN ('running', 'ok', 'partial', 'failed', 'cancelled')),
    -- Optional opaque cursor values.
    cursor_in      TEXT,
    cursor_out     TEXT,
    -- Counters start at zero.
    records_seen   INTEGER     NOT NULL DEFAULT 0,
    records_parsed INTEGER     NOT NULL DEFAULT 0,
    records_failed INTEGER     NOT NULL DEFAULT 0,
    error_summary  TEXT
);
-- Raw objects fetched from a source, unique per (source_id, content_sha256).
CREATE TABLE hw_core.raw_object (
    id                    BIGSERIAL   PRIMARY KEY,
    -- Covered by the leading column of the UNIQUE index declared below.
    source_id             SMALLINT    NOT NULL REFERENCES hw_core.source(id),
    -- Deleting an ingest run keeps its raw objects and clears this link.
    ingest_run_id         BIGINT      REFERENCES hw_core.ingest_run(id) ON DELETE SET NULL,
    external_id           TEXT,
    -- 64 characters; presumably the hex SHA-256 of the payload (format is not enforced).
    content_sha256        CHAR(64)    NOT NULL,
    observed_at           TIMESTAMPTZ,
    fetched_at            TIMESTAMPTZ NOT NULL DEFAULT now(),
    object_uri            TEXT,
    -- Nullable, unlike the two JSONB columns below which default to '{}'.
    payload_json          JSONB,
    headers_json          JSONB       NOT NULL DEFAULT '{}'::jsonb,
    license_snapshot_json JSONB       NOT NULL DEFAULT '{}'::jsonb,
    normalized            BOOLEAN     NOT NULL DEFAULT FALSE,
    parse_error           TEXT,
    UNIQUE (source_id, content_sha256)
);

-- PostgreSQL does not index the referencing side of a foreign key. Without this
-- index every DELETE on hw_core.ingest_run scans raw_object to apply SET NULL.
CREATE INDEX raw_object_ingest_run_id_idx
    ON hw_core.raw_object (ingest_run_id);
-- Canonical observables, unique per (type, value_normalized). Each row also
-- carries a unique, database-generated UUID in public_id.
CREATE TABLE hw_core.observable (
    id                 BIGSERIAL   PRIMARY KEY,
    public_id          UUID        NOT NULL DEFAULT gen_random_uuid(),
    -- Closed value set; adding a type means altering this CHECK.
    type               TEXT        NOT NULL
        CHECK (type IN ('ip', 'domain', 'fqdn', 'url', 'sha256', 'sha1', 'md5', 'ja3', 'cert_sha256', 'cve', 'asn', 'email', 'onion')),
    value_raw          TEXT        NOT NULL,
    value_normalized   TEXT        NOT NULL,
    -- 64 characters; presumably the hex SHA-256 of one of the value columns - confirm which.
    value_sha256       CHAR(64)    NOT NULL,
    first_seen_at      TIMESTAMPTZ,
    last_seen_at       TIMESTAMPTZ,
    -- Both scores are bounded to 0..100 and start at 0.
    current_confidence SMALLINT    NOT NULL DEFAULT 0
        CHECK (current_confidence BETWEEN 0 AND 100),
    current_risk_score SMALLINT    NOT NULL DEFAULT 0
        CHECK (current_risk_score BETWEEN 0 AND 100),
    geo_country_code   CHAR(2),
    asn                BIGINT,
    -- Not public unless explicitly allowed.
    public_allowed     BOOLEAN     NOT NULL DEFAULT FALSE,
    -- Optional links to sources; no ON DELETE action is declared.
    source_first_id    SMALLINT    REFERENCES hw_core.source(id),
    source_last_id     SMALLINT    REFERENCES hw_core.source(id),
    attributes         JSONB       NOT NULL DEFAULT '{}'::jsonb,
    -- NOTE(review): this is a btree index over unbounded text. Btree entries are
    -- capped at roughly a third of a page, so a very long value_normalized
    -- (e.g. a url) may be rejected at insert time - confirm upstream length limits.
    UNIQUE (type, value_normalized),
    UNIQUE (public_id)
);
-- Directed, typed edge between two observables as reported by one source.
-- Unique per (src, dst, relation_type, source); an edge is removed when either
-- endpoint observable is deleted.
CREATE TABLE hw_core.observable_relation (
    id                BIGSERIAL   PRIMARY KEY,
    src_observable_id BIGINT      NOT NULL
        REFERENCES hw_core.observable(id) ON DELETE CASCADE,
    dst_observable_id BIGINT      NOT NULL
        REFERENCES hw_core.observable(id) ON DELETE CASCADE,
    -- Closed value set; adding a relation type means altering this CHECK.
    relation_type     TEXT        NOT NULL
        CHECK (relation_type IN ('resolves_to','hosts','delivers','drops','uses_cert','uses_ja3','related_to','exploits_cve','targets','signed_by','derived_from','seen_with')),
    source_id         SMALLINT    NOT NULL REFERENCES hw_core.source(id),
    -- Bounded to 0..100, starting at 50.
    confidence        SMALLINT    NOT NULL DEFAULT 50
        CHECK (confidence BETWEEN 0 AND 100),
    first_seen_at     TIMESTAMPTZ,
    last_seen_at      TIMESTAMPTZ,
    evidence_count    INTEGER     NOT NULL DEFAULT 1,
    attributes        JSONB       NOT NULL DEFAULT '{}'::jsonb,
    -- This index leads with src_observable_id, so it also serves the src-side
    -- cascade lookup.
    UNIQUE (src_observable_id, dst_observable_id, relation_type, source_id)
);

-- dst_observable_id is not the leading column of any index above. Without this
-- index each observable delete scans the whole table to cascade dst-side edges.
CREATE INDEX observable_relation_dst_observable_id_idx
    ON hw_core.observable_relation (dst_observable_id);
-- CVE records keyed by their textual CVE identifier.
CREATE TABLE hw_core.cve (
    cve_id       TEXT         PRIMARY KEY,
    published_at TIMESTAMPTZ,
    modified_at  TIMESTAMPTZ,
    vendor       TEXT,
    product      TEXT,
    -- CVSS base scores range from 0.0 to 10.0. NUMERIC(4,1) alone would accept
    -- values up to 999.9, so the range is enforced explicitly. NULL (no score
    -- for that CVSS version) still passes the CHECK.
    cvss_v2      NUMERIC(4,1) CHECK (cvss_v2 BETWEEN 0 AND 10),
    cvss_v31     NUMERIC(4,1) CHECK (cvss_v31 BETWEEN 0 AND 10),
    cvss_v40     NUMERIC(4,1) CHECK (cvss_v40 BETWEEN 0 AND 10),
    -- Never NULL; an empty array means no CWE ids are recorded.
    cwe_ids      TEXT[]       NOT NULL DEFAULT ARRAY[]::TEXT[],
    description  TEXT,
    attributes   JSONB        NOT NULL DEFAULT '{}'::jsonb
);
-- One EPSS reading per CVE per day; rows are removed together with their CVE.
CREATE TABLE hw_core.cve_epss_daily (
    cve_id     TEXT         NOT NULL
        REFERENCES hw_core.cve(cve_id) ON DELETE CASCADE,
    score_date DATE         NOT NULL,
    -- EPSS score and percentile are fractions in [0, 1]. NUMERIC(8,7) alone
    -- would accept values up to 9.9999999, so the range is enforced explicitly.
    epss       NUMERIC(8,7) NOT NULL CHECK (epss BETWEEN 0 AND 1),
    percentile NUMERIC(8,7) NOT NULL CHECK (percentile BETWEEN 0 AND 1),
    -- cve_id leads the key, so the primary-key index also serves the FK cascade.
    PRIMARY KEY (cve_id, score_date)
);
-- At most one KEV row per CVE (cve_id is both primary key and foreign key);
-- the row is removed together with its CVE.
-- NOTE(review): column names look like they mirror the CISA Known Exploited
-- Vulnerabilities catalog fields - confirm against the loader.
CREATE TABLE hw_core.cve_kev (
    cve_id                        TEXT        PRIMARY KEY
        REFERENCES hw_core.cve(cve_id) ON DELETE CASCADE,
    vendor_project                TEXT,
    product                       TEXT,
    vulnerability_name            TEXT,
    short_description             TEXT,
    date_added                    DATE,
    due_date                      DATE,
    -- Free text; no value set is enforced.
    known_ransomware_campaign_use TEXT,
    notes                         TEXT,
    -- Stamped at insert time; not refreshed automatically on update.
    last_synced_at                TIMESTAMPTZ NOT NULL DEFAULT now()
);

ClickHouse analytics schema
Append-only events with rollups for map layers and hourly metrics.
-- ClickHouse's namespace object is the database, created with CREATE DATABASE.
-- The hw_analytics.* tables that follow are created inside it.
CREATE DATABASE IF NOT EXISTS hw_analytics;
-- Sighting events (described in this file as append-only), partitioned by
-- month of observed_at. No column is Nullable, so a missing value has to be
-- written as the type's default (empty string / 0).
CREATE TABLE hw_analytics.sighting_event
(
    observed_at     DateTime64(3, 'UTC'),
    -- Presumably observed_at truncated to 5 minutes / 1 hour by the writer;
    -- nothing in this table computes them.
    bucket_5m       DateTime('UTC'),
    bucket_1h       DateTime('UTC'),
    source_code     LowCardinality(String),
    -- Presumably hw_core.observable.id - confirm with the writer.
    observable_id   UInt64,
    observable_type LowCardinality(String),
    threat_class    LowCardinality(String),
    subtype         LowCardinality(String),
    geo_role        LowCardinality(String),
    country_code    FixedString(2),
    asn             UInt32,
    lat             Float32,
    lon             Float32,
    brand_slug      LowCardinality(String),
    malware_slug    LowCardinality(String),
    cve_id          String,
    confidence      UInt8,
    risk_score      UInt8,
    -- Presumably a 0/1 flag - confirm with the writer.
    public_allowed  UInt8,
    -- Presumably hw_core.raw_object.id - confirm with the writer.
    raw_object_id   UInt64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(observed_at)
ORDER BY (threat_class, observable_type, observable_id, observed_at, source_code)
-- Rows expire 365 days after observed_at. The value is converted to DateTime
-- because ClickHouse releases that require a Date/DateTime TTL result reject a
-- DateTime64 expression; the sub-second part is irrelevant at this granularity.
TTL toDateTime(observed_at) + INTERVAL 365 DAY;
-- Five-minute map rollup, keyed by (layer, geo_role, bucket_5m, cell_id).
-- ReplacingMergeTree(updated_at) keeps, per key, the row with the greatest
-- updated_at. Replacement happens during background merges, so duplicates can
-- be visible until then.
CREATE TABLE hw_analytics.map_cell_5m (
    bucket_5m    DateTime('UTC'),
    layer        LowCardinality(String),
    geo_role     LowCardinality(String),
    cell_id      String,
    -- Not part of the sorting key, so it does not distinguish rows for replacement.
    country_code FixedString(2),
    events       UInt64,
    observables  UInt64,
    max_risk     UInt8,
    top_brand    String,
    top_malware  String,
    -- Version column for the engine.
    updated_at   DateTime('UTC')
)
ENGINE = ReplacingMergeTree(updated_at)
PARTITION BY toYYYYMM(bucket_5m)
ORDER BY (layer, geo_role, bucket_5m, cell_id);
-- Hourly metric rollup, keyed by
-- (metric_name, layer, bucket_1h, source_code, country_code).
-- ReplacingMergeTree(updated_at) keeps, per key, the row with the greatest
-- updated_at. Replacement happens during background merges, so duplicates can
-- be visible until then.
CREATE TABLE hw_analytics.metric_1h (
    bucket_1h    DateTime('UTC'),
    metric_name  LowCardinality(String),
    layer        LowCardinality(String),
    source_code  LowCardinality(String),
    country_code FixedString(2),
    metric_value UInt64,
    -- Version column for the engine.
    updated_at   DateTime('UTC')
)
ENGINE = ReplacingMergeTree(updated_at)
PARTITION BY toYYYYMM(bucket_1h)
ORDER BY (metric_name, layer, bucket_1h, source_code, country_code);