HackWatch

Production architecture

HackWatch CTI Platform v1

Production specification for hackwatch.io with strict source-contract enforcement, canonical normalization, analytics-first serving and legal-safe public visibility for commercial operation.

SQL schema · Kafka topology · Sync matrix · API contracts · Compliance rules

Hard production rules

  • No connector writes directly to a public endpoint.
  • Every source passes through source_contract and canonical normalization.
  • Public pages expose aggregates and derived views by default.
  • Raw redistribution is governed per source license policy.

Core source coverage

This v1 model is designed around these source families:

URLhaus

ThreatFox

MalwareBazaar

SSLBL

Feodo

OpenPhish

AbuseIPDB

CrowdSec CTI

Spamhaus SIA

CISA KEV

NVD

EPSS

MITRE ATT&CK TAXII

Shodan

Censys

Netlas

Ransomware.live

Tor exit bulk list

Platform stack

  • Redpanda or Kafka for ingest bus
  • PostgreSQL for canonical entities and source contracts
  • ClickHouse for sighting events and map/stats rollups
  • MinIO or S3 for immutable raw payload storage
  • Redis for API and rollup cache
  • OpenSearch for IOC/CVE/brand full-text search
  • FastAPI or Go for serving APIs
  • Dedicated workers for ingestion, normalization, enrichment and rollups

PostgreSQL canonical schema

This is the v1 core SQL for canonical entities, ingest tracking and source contracts.

CREATE SCHEMA IF NOT EXISTS hw_core;
CREATE EXTENSION IF NOT EXISTS pgcrypto;

CREATE TABLE hw_core.source (
  id SMALLSERIAL PRIMARY KEY,
  code TEXT NOT NULL UNIQUE,
  name TEXT NOT NULL,
  tier TEXT NOT NULL CHECK (tier IN ('community','premium','enterprise','internal')),
  mode TEXT NOT NULL CHECK (mode IN ('pull','stream','manual')),
  enabled BOOLEAN NOT NULL DEFAULT TRUE,
  docs_ref TEXT,
  notes TEXT,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE TABLE hw_core.source_contract (
  source_id SMALLINT PRIMARY KEY REFERENCES hw_core.source(id) ON DELETE CASCADE,
  min_poll_interval_sec INTEGER NOT NULL DEFAULT 300,
  can_store_raw BOOLEAN NOT NULL DEFAULT TRUE,
  can_show_public_raw BOOLEAN NOT NULL DEFAULT FALSE,
  can_show_public_derived BOOLEAN NOT NULL DEFAULT TRUE,
  can_redistribute_raw BOOLEAN NOT NULL DEFAULT FALSE,
  needs_license_review BOOLEAN NOT NULL DEFAULT TRUE,
  public_policy TEXT NOT NULL DEFAULT 'derived_only' CHECK (public_policy IN ('disabled','derived_only','public_raw_allowed')),
  cache_ttl_sec INTEGER NOT NULL DEFAULT 300,
  last_reviewed_at TIMESTAMPTZ,
  notes TEXT
);

CREATE TABLE hw_core.ingest_run (
  id BIGSERIAL PRIMARY KEY,
  source_id SMALLINT NOT NULL REFERENCES hw_core.source(id),
  job_name TEXT NOT NULL,
  requested_from TIMESTAMPTZ,
  requested_to TIMESTAMPTZ,
  started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  finished_at TIMESTAMPTZ,
  status TEXT NOT NULL DEFAULT 'running' CHECK (status IN ('running','ok','partial','failed','cancelled')),
  cursor_in TEXT,
  cursor_out TEXT,
  records_seen INTEGER NOT NULL DEFAULT 0,
  records_parsed INTEGER NOT NULL DEFAULT 0,
  records_failed INTEGER NOT NULL DEFAULT 0,
  error_summary TEXT
);

CREATE TABLE hw_core.raw_object (
  id BIGSERIAL PRIMARY KEY,
  source_id SMALLINT NOT NULL REFERENCES hw_core.source(id),
  ingest_run_id BIGINT REFERENCES hw_core.ingest_run(id) ON DELETE SET NULL,
  external_id TEXT,
  content_sha256 CHAR(64) NOT NULL,
  observed_at TIMESTAMPTZ,
  fetched_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  object_uri TEXT,
  payload_json JSONB,
  headers_json JSONB NOT NULL DEFAULT '{}'::jsonb,
  license_snapshot_json JSONB NOT NULL DEFAULT '{}'::jsonb,
  normalized BOOLEAN NOT NULL DEFAULT FALSE,
  parse_error TEXT,
  UNIQUE (source_id, content_sha256)
);

CREATE TABLE hw_core.observable (
  id BIGSERIAL PRIMARY KEY,
  public_id UUID NOT NULL DEFAULT gen_random_uuid(),
  type TEXT NOT NULL CHECK (type IN ('ip','domain','fqdn','url','sha256','sha1','md5','ja3','cert_sha256','cve','asn','email','onion')),
  value_raw TEXT NOT NULL,
  value_normalized TEXT NOT NULL,
  value_sha256 CHAR(64) NOT NULL,
  first_seen_at TIMESTAMPTZ,
  last_seen_at TIMESTAMPTZ,
  current_confidence SMALLINT NOT NULL DEFAULT 0 CHECK (current_confidence BETWEEN 0 AND 100),
  current_risk_score SMALLINT NOT NULL DEFAULT 0 CHECK (current_risk_score BETWEEN 0 AND 100),
  geo_country_code CHAR(2),
  asn BIGINT,
  public_allowed BOOLEAN NOT NULL DEFAULT FALSE,
  source_first_id SMALLINT REFERENCES hw_core.source(id),
  source_last_id SMALLINT REFERENCES hw_core.source(id),
  attributes JSONB NOT NULL DEFAULT '{}'::jsonb,
  UNIQUE (type, value_normalized),
  UNIQUE (public_id)
);

CREATE TABLE hw_core.observable_relation (
  id BIGSERIAL PRIMARY KEY,
  src_observable_id BIGINT NOT NULL REFERENCES hw_core.observable(id) ON DELETE CASCADE,
  dst_observable_id BIGINT NOT NULL REFERENCES hw_core.observable(id) ON DELETE CASCADE,
  relation_type TEXT NOT NULL CHECK (relation_type IN ('resolves_to','hosts','delivers','drops','uses_cert','uses_ja3','related_to','exploits_cve','targets','signed_by','derived_from','seen_with')),
  source_id SMALLINT NOT NULL REFERENCES hw_core.source(id),
  confidence SMALLINT NOT NULL DEFAULT 50 CHECK (confidence BETWEEN 0 AND 100),
  first_seen_at TIMESTAMPTZ,
  last_seen_at TIMESTAMPTZ,
  evidence_count INTEGER NOT NULL DEFAULT 1,
  attributes JSONB NOT NULL DEFAULT '{}'::jsonb,
  UNIQUE (src_observable_id, dst_observable_id, relation_type, source_id)
);

CREATE TABLE hw_core.cve (
  cve_id TEXT PRIMARY KEY,
  published_at TIMESTAMPTZ,
  modified_at TIMESTAMPTZ,
  vendor TEXT,
  product TEXT,
  cvss_v2 NUMERIC(4,1),
  cvss_v31 NUMERIC(4,1),
  cvss_v40 NUMERIC(4,1),
  cwe_ids TEXT[] NOT NULL DEFAULT ARRAY[]::TEXT[],
  description TEXT,
  attributes JSONB NOT NULL DEFAULT '{}'::jsonb
);

CREATE TABLE hw_core.cve_epss_daily (
  cve_id TEXT NOT NULL REFERENCES hw_core.cve(cve_id) ON DELETE CASCADE,
  score_date DATE NOT NULL,
  epss NUMERIC(8,7) NOT NULL,
  percentile NUMERIC(8,7) NOT NULL,
  PRIMARY KEY (cve_id, score_date)
);

CREATE TABLE hw_core.cve_kev (
  cve_id TEXT PRIMARY KEY REFERENCES hw_core.cve(cve_id) ON DELETE CASCADE,
  vendor_project TEXT,
  product TEXT,
  vulnerability_name TEXT,
  short_description TEXT,
  date_added DATE,
  due_date DATE,
  known_ransomware_campaign_use TEXT,
  notes TEXT,
  last_synced_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
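
The observable table keys each entity on (type, value_normalized) and stores a value_sha256 alongside it. The sketch below shows one way a normalizer might derive those columns. The per-type rules and the choice to hash only the normalized value are assumptions for illustration, not rules stated in this specification, and the function names are hypothetical.

```python
import hashlib
import ipaddress


def normalize_observable(obs_type: str, value_raw: str) -> str:
    """Return a canonical form of a raw observable value (illustrative rules only)."""
    value = value_raw.strip()
    if obs_type == "ip":
        # ipaddress collapses equivalent spellings, e.g. IPv6 zero compression.
        return str(ipaddress.ip_address(value))
    if obs_type in {"domain", "fqdn"}:
        # Lowercase and drop the trailing root dot.
        return value.lower().rstrip(".")
    if obs_type in {"sha256", "sha1", "md5", "cert_sha256", "ja3"}:
        return value.lower()
    if obs_type == "cve":
        return value.upper()
    # Assumption: other types (url, email, onion, asn) are only trimmed here.
    return value


def observable_columns(obs_type: str, value_raw: str) -> dict:
    """Build the value_* columns for an hw_core.observable upsert."""
    normalized = normalize_observable(obs_type, value_raw)
    return {
        "type": obs_type,
        "value_raw": value_raw,
        "value_normalized": normalized,
        # Assumption: the digest covers the normalized value only.
        "value_sha256": hashlib.sha256(normalized.encode("utf-8")).hexdigest(),
    }
```

Because the unique constraint is on (type, value_normalized), two feeds that report "Example.COM." and "example.com" would resolve to the same row under these rules.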

ClickHouse analytics schema

Append-only events with rollups for map layers and hourly metrics.

CREATE DATABASE IF NOT EXISTS hw_analytics;

CREATE TABLE hw_analytics.sighting_event
(
  observed_at DateTime64(3, 'UTC'),
  bucket_5m DateTime('UTC'),
  bucket_1h DateTime('UTC'),
  source_code LowCardinality(String),
  observable_id UInt64,
  observable_type LowCardinality(String),
  threat_class LowCardinality(String),
  subtype LowCardinality(String),
  geo_role LowCardinality(String),
  country_code FixedString(2),
  asn UInt32,
  lat Float32,
  lon Float32,
  brand_slug LowCardinality(String),
  malware_slug LowCardinality(String),
  cve_id String,
  confidence UInt8,
  risk_score UInt8,
  public_allowed UInt8,
  raw_object_id UInt64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(observed_at)
ORDER BY (threat_class, observable_type, observable_id, observed_at, source_code)
TTL toDateTime(observed_at) + INTERVAL 365 DAY;

CREATE TABLE hw_analytics.map_cell_5m
(
  bucket_5m DateTime('UTC'),
  layer LowCardinality(String),
  geo_role LowCardinality(String),
  cell_id String,
  country_code FixedString(2),
  events UInt64,
  observables UInt64,
  max_risk UInt8,
  top_brand String,
  top_malware String,
  updated_at DateTime('UTC')
)
ENGINE = ReplacingMergeTree(updated_at)
PARTITION BY toYYYYMM(bucket_5m)
ORDER BY (layer, geo_role, bucket_5m, cell_id);

CREATE TABLE hw_analytics.metric_1h
(
  bucket_1h DateTime('UTC'),
  metric_name LowCardinality(String),
  layer LowCardinality(String),
  source_code LowCardinality(String),
  country_code FixedString(2),
  metric_value UInt64,
  updated_at DateTime('UTC')
)
ENGINE = ReplacingMergeTree(updated_at)
PARTITION BY toYYYYMM(bucket_1h)
ORDER BY (metric_name, layer, bucket_1h, source_code, country_code);
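
The bucket_5m and bucket_1h columns hold the start of the window that contains observed_at. A minimal sketch of how an event builder might compute them before writing to ClickHouse follows. The helper name floor_to_bucket is hypothetical, and computing the buckets on the producer side rather than in ClickHouse is an assumption.

```python
from datetime import datetime, timezone


def floor_to_bucket(ts: datetime, minutes: int) -> datetime:
    """Floor a timezone-aware timestamp to the start of its N-minute UTC bucket."""
    if ts.tzinfo is None:
        raise ValueError("observed_at must be timezone-aware")
    epoch = int(ts.astimezone(timezone.utc).timestamp())
    width = minutes * 60
    return datetime.fromtimestamp(epoch - epoch % width, tz=timezone.utc)


observed_at = datetime(2024, 5, 1, 12, 37, 42, tzinfo=timezone.utc)
bucket_5m = floor_to_bucket(observed_at, 5)    # 2024-05-01 12:35:00 UTC
bucket_1h = floor_to_bucket(observed_at, 60)   # 2024-05-01 12:00:00 UTC
```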

Kafka or Redpanda topic plan

Topic                             Key                            Role
hw.raw.*.v1                       content_sha256 or external_id  Raw source events from connectors
hw.normalize.observable.v1        type:value_normalized          Canonical observable upsert stream
hw.normalize.observable_state.v1  type:value_normalized          Compacted latest entity state
hw.normalize.relation.v1          src:rel:dst                    Entity relation stream
hw.enrich.score.v1                observable_id                  Confidence/risk scoring results
hw.analytics.sighting.v1          observable_id                  Analytics-ready sighting events
hw.rollup.map.5m.v1               layer:bucket:cell              5-minute map cell rollups
hw.rollup.metric.1h.v1            metric:bucket:layer            Hourly metric rollups
hw.alert.watchlist.v1             watchlist_id                   Watchlist detections and alert fan-out
hw.dlq.v1                         source:failed_key              Dead-letter stream for failed events
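
The key patterns in the topic plan can be produced by small helper functions, so that every producer builds keys the same way and records for one entity land on one partition. This is a sketch only: the function names are hypothetical, and the ISO 8601 bucket format is an assumption the plan does not specify.

```python
def raw_topic(source_code: str) -> str:
    """Topic name for a connector's raw events, e.g. hw.raw.urlhaus.v1."""
    return f"hw.raw.{source_code}.v1"


def observable_message_key(obs_type: str, value_normalized: str) -> bytes:
    """Key for hw.normalize.observable.v1 and the compacted state topic."""
    return f"{obs_type}:{value_normalized}".encode("utf-8")


def relation_message_key(src_id: int, relation_type: str, dst_id: int) -> bytes:
    """Key for hw.normalize.relation.v1 in src:rel:dst form."""
    return f"{src_id}:{relation_type}:{dst_id}".encode("utf-8")


def map_rollup_message_key(layer: str, bucket_iso: str, cell_id: str) -> bytes:
    """Key for hw.rollup.map.5m.v1 in layer:bucket:cell form."""
    # Assumption: the bucket is an ISO 8601 UTC string.
    return f"{layer}:{bucket_iso}:{cell_id}".encode("utf-8")
```

Values such as URLs, IPv6 addresses and ISO timestamps contain colons themselves, so consumers should treat these keys as opaque identifiers rather than splitting them on ":".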

Sync schedule matrix

Near-real-time feeds are tuned to official cadence or safe polling windows, while reference sources run in controlled batch windows.

Job                        Cadence (cron)  Output topic
urlhaus_recent_pull        */5 * * * *     hw.raw.urlhaus.v1
threatfox_recent_pull      */5 * * * *     hw.raw.threatfox.v1
malwarebazaar_recent_pull  */15 * * * *    hw.raw.malwarebazaar.v1
sslbl_pull                 */5 * * * *     hw.raw.sslbl.v1
feodo_pull                 */5 * * * *     hw.raw.feodotracker.v1
openphish_pull             */5 * * * *     hw.raw.openphish.v1
kev_sync                   */15 * * * *    hw.raw.cisa_kev.v1
nvd_cvehistory_sync        17 * * * *      hw.raw.nvd.v1
epss_daily_sync            15 4 * * *      hw.raw.epss.v1
attack_daily_sync          35 3 * * *      hw.raw.mitre_attack.v1
ransomware_live_sync       */15 * * * *    hw.raw.ransomware_live.v1
tor_exit_sync              5 * * * *       hw.raw.tor_exit.v1
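
One way to keep this matrix consistent with source_contract.min_poll_interval_sec is to check each cron expression against the contract before a job is scheduled. The sketch below handles only the three cron shapes that appear in the matrix and is not a general cron parser. Both function names are hypothetical.

```python
def cron_min_interval_sec(expr: str) -> int:
    """Smallest gap between runs, for '*/N * * * *', 'M * * * *' and 'M H * * *' only."""
    minute, hour, dom, month, dow = expr.split()
    if (dom, month, dow) != ("*", "*", "*"):
        raise ValueError(f"unsupported cron expression: {expr!r}")
    if minute.startswith("*/") and hour == "*":
        step = int(minute[2:])
        if step <= 0:
            raise ValueError(f"invalid step in {expr!r}")
        fires = list(range(0, 60, step))
        if len(fires) == 1:
            return 3600
        # The gap across the hour boundary can be shorter than the step.
        return min(step, 60 - fires[-1]) * 60
    if minute.isdigit() and hour == "*":
        return 3600   # once per hour
    if minute.isdigit() and hour.isdigit():
        return 86400  # once per day
    raise ValueError(f"unsupported cron expression: {expr!r}")


def respects_contract(expr: str, min_poll_interval_sec: int) -> bool:
    """True if the schedule never polls faster than the contract allows."""
    return cron_min_interval_sec(expr) >= min_poll_interval_sec
```

For example, respects_contract("*/5 * * * *", 300) holds with the default contract value of 300 seconds, while the same schedule would be rejected for a source whose contract requires 900 seconds.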

Public API surface

Public read endpoints accept either an API key or anonymous access, both under strict rate limits. Watchlist and admin operations are available only through authenticated endpoints.

Method  Path                                     Purpose
GET     /api/v1/health                           Healthcheck endpoint
GET     /api/v1/sources                          Active sources and freshness
GET     /api/v1/stats/overview                   Global counters by window
GET     /api/v1/map/layers                       Available map layers
GET     /api/v1/map/events                       Filtered event list for map and debug
GET     /api/v1/search                           Federated search for IOC/CVE/brand/malware
GET     /api/v1/iocs/{type}/{value}              IOC profile card
GET     /api/v1/iocs/{type}/{value}/relations    Graph pivot relations
GET     /api/v1/cves/{cve_id}                    CVE profile with KEV/EPSS context
GET     /api/v1/ransomware/victims               Victim list by filters
GET     /api/v1/exposure/hosts/{ip}              Current exposure profile
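
The rate limits on public reads could be enforced with a token bucket per client. The sketch below is one possible approach, not the platform's stated mechanism. The limits for anonymous and keyed clients are hypothetical placeholder numbers, and the caller passes in the clock value so the logic stays testable.

```python
class TokenBucket:
    """Allow bursts up to `capacity`, refilled at `refill_per_sec` tokens per second."""

    def __init__(self, capacity: int, refill_per_sec: float, now: float = 0.0):
        self.capacity = capacity
        self.refill_per_sec = refill_per_sec
        self.tokens = float(capacity)
        self.updated = now

    def allow(self, now: float) -> bool:
        """Consume one token if available; `now` is a monotonic time in seconds."""
        elapsed = max(0.0, now - self.updated)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_sec)
        self.updated = max(self.updated, now)
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


# Hypothetical limits as (burst capacity, refill per second); not from the spec.
LIMITS = {"anonymous": (30, 0.5), "api_key": (300, 5.0)}


def bucket_for(client_kind: str, now: float) -> TokenBucket:
    capacity, refill = LIMITS[client_kind]
    return TokenBucket(capacity, refill, now)
```

In a multi-instance deployment the bucket state would need to live in a shared store such as the Redis cache listed in the platform stack. The in-memory class above only illustrates the accounting.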

End-to-end processing flow

  1. Connector pulls feed/API and publishes raw event to hw.raw.<source>.v1
  2. Raw writer stores immutable blob and metadata in raw_object
  3. Normalizer upserts canonical observables
  4. Relation linker attaches IOC, malware, CVE, ATT&CK and brand links
  5. Score engine computes confidence and risk values
  6. Event builder emits analytics sighting stream
  7. ClickHouse writer persists sighting_event and rollups
  8. API reads canonical entities from PostgreSQL and stats/maps from ClickHouse
  9. Alert engine compares watchlists against new events and emits watchlist alerts
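
Step 2 relies on the UNIQUE (source_id, content_sha256) constraint on raw_object to make re-fetched payloads idempotent. The sketch below imitates that behaviour with an in-memory store. Hashing a canonical JSON rendering instead of the exact bytes received is an assumption made for illustration, and the class and function names are hypothetical.

```python
import hashlib
import json


def content_sha256(payload: dict) -> str:
    """Digest of a canonical JSON rendering, so key order does not change the hash."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class RawObjectStore:
    """In-memory stand-in for hw_core.raw_object and its uniqueness rule."""

    def __init__(self) -> None:
        self._ids = {}      # (source_id, content_sha256) -> raw_object id
        self._next_id = 1

    def store(self, source_id: int, payload: dict) -> tuple:
        """Return (raw_object_id, created); duplicates return the existing id."""
        key = (source_id, content_sha256(payload))
        if key in self._ids:
            return self._ids[key], False
        raw_id = self._next_id
        self._ids[key] = raw_id
        self._next_id += 1
        return raw_id, True
```

With this behaviour a connector can safely re-poll an overlapping window: payloads that were already stored return their existing id and do not need to be normalized again.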

License and policy enforcement

  • No connector writes directly to the public API. Every record passes through source_contract checks and canonical normalization first.
  • Public frontend shows derived and aggregated views by default, not unrestricted raw feed mirrors.
  • Set URLhaus Community to derived-only for public output and keep raw redistribution disabled.
  • Disable VirusTotal Public API in commercial production mode unless enterprise terms allow usage.
  • Keep Ransomware.live as derived-only until legal review confirms business-safe policy.
  • Allow broader public use only where licensing is explicitly permissive, for example CC0-like sources such as SSLBL data.
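
These rules map onto the source_contract columns. The sketch below shows one way a serving layer might reduce a contract row to a single visibility decision. How public_policy combines with the boolean flags, and the choice to block raw output while a license review is pending, are assumptions rather than behaviour defined by this specification.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SourceContract:
    """Subset of hw_core.source_contract, using the same column defaults."""
    public_policy: str = "derived_only"
    can_show_public_raw: bool = False
    can_show_public_derived: bool = True
    needs_license_review: bool = True


def public_view(contract: SourceContract) -> str:
    """Return 'none', 'derived' or 'raw' for public output from this source."""
    if contract.public_policy == "disabled":
        return "none"
    raw_ok = (
        contract.public_policy == "public_raw_allowed"
        and contract.can_show_public_raw
        and not contract.needs_license_review  # assumption: review must be complete
    )
    if raw_ok:
        return "raw"
    return "derived" if contract.can_show_public_derived else "none"
```

Under these assumptions a newly registered source with default column values resolves to "derived", which matches the derived-only default described above.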

Recommended rollout order

  • Start with source, source_contract, ingest_run, raw_object, observable and observable_relation in PostgreSQL.
  • Add ClickHouse sighting_event, map_cell_5m and metric_1h.
  • Enable raw topics plus normalize.observable and analytics.sighting streams.
  • Launch connectors first for URLhaus, ThreatFox, SSLBL, Feodo, OpenPhish, KEV, NVD and EPSS.
  • Expand later with Shodan stream, Censys/Netlas exposure, CrowdSec/Spamhaus enrichment, Ransomware.live and MISP export.

This sequence delivers the fastest route to a usable threat map, IOC explorer and vulnerability board without blocking the platform on premium feed onboarding.