MasteryMade · Foundation PRD

PRD 3: Universal Content Ingestion Service

PRD 3 of 12 · Depends on PRD 1 + 2 · Owner: Lane A + C
Parent: Master Registry v1.0 — Section 04

3.1 Purpose

One service ingests every content type into a standard schema. It replaces three previous PRDs (RSS Intelligence Router, Content Notebook, Signal Engine ingestion) and runs as a Python Docker container on Forge. Ingestion itself does not change per use case; what changes is the gate tag and the lens applied downstream.

3.2 Service Architecture

Forge VPS (Docker)
├── ingest-service/
│   ├── main.py                  # FastAPI app
│   ├── extractors/
│   │   ├── youtube.py           # Gemini API (visual + spoken)
│   │   ├── podcast.py           # Whisper transcription
│   │   ├── meta_ads.py          # Puppeteer/Playwright
│   │   ├── social.py            # Platform API + scraper
│   │   ├── web.py               # Readability + Cheerio
│   │   ├── rss.py               # feedparser + full text
│   │   ├── email_cal.py         # Gmail + Calendar MCP proxy
│   │   ├── transcript.py        # Tactiq/Fireflies webhook
│   │   └── document.py          # PDF/DOCX extraction
│   ├── processors/
│   │   ├── gate_tagger.py       # Gate assignment (PRD 2 rules)
│   │   ├── annotator.py         # Gemini annotation, hook class
│   │   ├── embedder.py          # pgvector embedding gen
│   │   └── deduplicator.py      # Source URL + content hash
│   ├── research/
│   │   └── research_service.py  # Pre-demo intel (Gate 2 trigger)
│   ├── config/
│   │   ├── sources.yaml         # URLs, schedules, gate maps
│   │   └── api_keys.env
│   ├── Dockerfile
│   └── requirements.txt

3.3 Content Table — Full Schema

CREATE TABLE content (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  source_type TEXT NOT NULL CHECK (source_type IN (
    'youtube','podcast','meta_ads','social','web','rss',
    'email','calendar','session','transcript','document'
  )),
  source_url TEXT,
  source_hash TEXT,  -- SHA256 for dedup

  gate INT NOT NULL CHECK (gate IN (1,2,3,4)),
  entity_id UUID REFERENCES entities(id),

  title TEXT,
  raw_text TEXT NOT NULL,
  summary TEXT,  -- AI-generated 2-3 sentences

  annotations JSONB DEFAULT '{}',
  -- youtube: {hooks:[],pattern_interrupts:[],format:"",on_screen_text:[]}
  -- meta_ads: {ad_copy:"",cta:"",offer:"",creative_format:""}
  -- social: {platform:"",post_type:""}
  -- rss: {category:"",publication:""}

  metadata JSONB DEFAULT '{}',
  -- Common: {publish_date,author,duration_seconds,word_count}
  -- youtube: {views,likes,comments,shares}
  -- social: {likes,comments,shares,saves,reach}
  -- meta_ads: {spend,impressions,ctr,cpc,roas}

  embedding vector(1536),

  status TEXT NOT NULL DEFAULT 'raw' CHECK (status IN (
    'raw','annotated','embedded','scored','processed'
  )),

  ingested_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  source_published_at TIMESTAMPTZ,
  updated_at TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX idx_content_gate ON content(gate);
CREATE INDEX idx_content_entity ON content(entity_id);
CREATE INDEX idx_content_source_type ON content(source_type);
CREATE INDEX idx_content_status ON content(status);
CREATE INDEX idx_content_source_hash ON content(source_hash);
CREATE INDEX idx_content_ingested ON content(ingested_at DESC);
CREATE INDEX idx_content_embedding ON content
  USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
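
The CHECK constraints above can be mirrored in the service so that bad rows are rejected before they reach Postgres. The following is a minimal sketch, not part of the PRD: `validate_content_row` is a hypothetical helper, and the allowed values are copied from the schema. It treats an empty raw_text as missing, which is stricter than NOT NULL.

```python
# Hypothetical pre-insert validator mirroring the CHECK constraints in 3.3.
SOURCE_TYPES = {
    "youtube", "podcast", "meta_ads", "social", "web", "rss",
    "email", "calendar", "session", "transcript", "document",
}
GATES = {1, 2, 3, 4}
STATUSES = {"raw", "annotated", "embedded", "scored", "processed"}


def validate_content_row(row: dict) -> list[str]:
    """Return a list of constraint violations; an empty list means the row is valid."""
    errors = []
    if row.get("source_type") not in SOURCE_TYPES:
        errors.append(f"invalid source_type: {row.get('source_type')!r}")
    if row.get("gate") not in GATES:
        errors.append(f"invalid gate: {row.get('gate')!r}")
    # Assumption: empty text is rejected too (stricter than NOT NULL).
    if not row.get("raw_text"):
        errors.append("raw_text is required")
    # status defaults to 'raw' in the schema, so only check it when present.
    if "status" in row and row["status"] not in STATUSES:
        errors.append(f"invalid status: {row['status']!r}")
    return errors
```

An extractor could call this just before insert and log the returned messages instead of relying on a database error.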

3.4 Extractor Module Specs

YouTube (youtube.py)

Input: YouTube URL or channel ID.
Process:
1) YouTube Data API: metadata (title, description, date, views, likes).
2) Gemini API (Flash or Pro): visual + spoken annotation, covering full transcript, on-screen text, hook identification (first 3s, first 30s), pattern interrupts, and format classification.
3) Store transcript as raw_text, annotations as JSONB.
Rate limits: YouTube API 10K units/day. Gemini: batches of 5 videos, 2s delay between batches.
Cost: ~$0.01-0.05/video (Flash), roughly $5/day for 100-500 videos.
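
The Gemini rate-limit note can be sketched as a small batching loop. This assumes the note means batches of five videos with a 2-second pause between batches; `annotate` is a hypothetical wrapper around the Gemini call, and `sleep` is injectable so the loop can be tested without waiting.

```python
import time
from typing import Callable, Iterable, Iterator


def batched(items: Iterable[str], size: int = 5) -> Iterator[list[str]]:
    """Yield successive lists of at most `size` items."""
    batch: list[str] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def annotate_in_batches(
    video_urls: Iterable[str],
    annotate: Callable[[str], dict],  # hypothetical wrapper around the Gemini call
    batch_size: int = 5,
    delay_seconds: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> list[dict]:
    """Annotate videos batch by batch, pausing between batches (not after the last)."""
    results: list[dict] = []
    first = True
    for batch in batched(video_urls, batch_size):
        if not first:
            sleep(delay_seconds)
        first = False
        results.extend(annotate(url) for url in batch)
    return results
```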

Meta Ad Library (meta_ads.py)

Input: Advertiser name or page ID.
Process:
1) Playwright: navigate Meta Ad Library.
2) Search advertiser.
3) Per active ad: extract copy, creative screenshot, CTA, launch date, platforms.
4) Store copy as raw_text, metadata as annotations.
Gate: Always Gate 4.
Rate limit: 2s between pages, max 50 ads/advertiser/run.
Reference: Merci Larry pattern, 213 ads across 4 competitors in 1 hour.
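
Step 4 and the per-run cap might look like the sketch below. The shape of the scraped `ads` dicts (keys `copy`, `cta`, `creative_format`, `launch_date`, `platforms`) is an assumption for illustration; the PRD does not define the scraper's output.

```python
MAX_ADS_PER_RUN = 50  # cap from the rate-limit note above


def ads_to_content_rows(ads: list[dict], entity_id: str) -> list[dict]:
    """Convert scraped ads (hypothetical dict shape) into Gate 4 content rows."""
    rows = []
    for ad in ads[:MAX_ADS_PER_RUN]:
        rows.append({
            "source_type": "meta_ads",
            "gate": 4,                   # Meta Ad Library content is always Gate 4
            "entity_id": entity_id,
            "raw_text": ad["copy"],      # ad copy is stored as raw_text
            "annotations": {             # extracted ad metadata goes to annotations
                "ad_copy": ad["copy"],
                "cta": ad.get("cta", ""),
                "creative_format": ad.get("creative_format", ""),
                "launch_date": ad.get("launch_date"),
                "platforms": ad.get("platforms", []),
            },
        })
    return rows
```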

RSS (rss.py)

Input: Feed URL from sources.yaml.
Process: feedparser → new entries since last check → fetch full HTML → Readability extraction → dedup check → store.
Gate: Gate 1 (Jason's feeds). Exception: expert niche feeds → Gate 2 or 4.
Schedule: Every 6h for daily feeds, every 1h for HN/breaking.
Lookback: 7 days on first run, then incremental.
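
The lookback rule can be sketched as a cutoff filter. This assumes each entry has already been converted to a dict with a timezone-aware `published` datetime (feedparser itself exposes parsed dates in a different form), and `select_new_entries` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

FIRST_RUN_LOOKBACK = timedelta(days=7)


def select_new_entries(
    entries: list[dict],
    last_checked: Optional[datetime],
    now: Optional[datetime] = None,
) -> list[dict]:
    """Keep entries newer than the cutoff: 7 days back on first run, else the last check."""
    now = now or datetime.now(timezone.utc)
    cutoff = last_checked if last_checked is not None else now - FIRST_RUN_LOOKBACK
    return [e for e in entries if e["published"] > cutoff]
```

Passing `last_checked=None` models the first run; later runs pass the timestamp stored after the previous poll.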

Podcasts (podcast.py)

Input: Podcast RSS or audio URL. Process: Download → Whisper API (or local whisper.cpp) → speaker diarization → topic segmentation. Cost: API ~$0.006/min. Local: free but slower.
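
To make the API cost figure concrete, here is a short estimate helper. It uses only the ~$0.006/min rate quoted above; the function name is hypothetical.

```python
WHISPER_API_USD_PER_MINUTE = 0.006  # rate quoted in the spec above


def estimate_transcription_cost(duration_seconds: float) -> float:
    """Estimate Whisper API cost in USD for one audio file."""
    return round(duration_seconds / 60 * WHISPER_API_USD_PER_MINUTE, 4)
```

At that rate a 60-minute episode comes to about $0.36.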

Social (social.py)

Platforms: Instagram (Graph API), LinkedIn (scrape or manual), X (API v2). Per post: content, engagement, media type, timestamp. Image/video posts get Gemini Vision description. Rate limits: IG 200/hr, X 300 reads/15min.
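
The rate limits above translate into a minimum spacing between calls if requests are spread evenly. A small sketch, where the platform keys and helper name are illustrative assumptions:

```python
# (max calls, window in seconds), taken from the rate limits above.
RATE_LIMITS = {
    "instagram": (200, 3600),  # 200 calls per hour
    "x": (300, 900),           # 300 reads per 15 minutes
}


def min_interval_seconds(platform: str) -> float:
    """Smallest even spacing between calls that stays within the platform limit."""
    calls, window = RATE_LIMITS[platform]
    return window / calls
```

Evenly spaced, that is one Instagram call every 18 seconds and one X read every 3 seconds.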

Email/Calendar (email_cal.py)

Proxy to Gmail + Google Calendar MCP integrations. Extract threads, participants, action items, events. Always Gate 1.

Documents (document.py)

PDF/DOCX extraction via existing skills. Full text, structure, embedded images. Gate by entity_id.

3.5 Deduplication

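import hashlib

def check_duplicate(source_url, raw_text):
    """Return (is_duplicate, existing_id) for a source URL + text prefix."""
    # sha256 needs bytes: hash the URL plus the first 500 chars of text
    key = f"{source_url}:{raw_text[:500]}".encode("utf-8")
    content_hash = hashlib.sha256(key).hexdigest()
    # `supabase` is the service's already-initialised Supabase client
    existing = (
        supabase.table('content')
        .select('id').eq('source_hash', content_hash).execute()
    )
    if existing.data:
        return True, existing.data[0]['id']
    return False, None
# If dup found: skip insert, optionally update metadata (engagement may change)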
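
The "skip insert, optionally update metadata" path can be illustrated without a database. In this sketch an in-memory dict keyed by source_hash stands in for the content table, and `upsert_content` is a hypothetical helper, not the service's actual API.

```python
import hashlib


def content_hash(source_url, raw_text: str) -> str:
    """Same hash recipe as check_duplicate: URL plus the first 500 chars of text."""
    return hashlib.sha256(f"{source_url}:{raw_text[:500]}".encode("utf-8")).hexdigest()


def upsert_content(store: dict, row: dict) -> tuple[str, bool]:
    """Insert a new row, or on duplicate merge fresh metadata into the existing row.

    `store` is an in-memory stand-in for the content table, keyed by source_hash.
    Returns (source_hash, inserted).
    """
    key = content_hash(row.get("source_url"), row["raw_text"])
    if key in store:
        # Duplicate: keep the stored text, refresh engagement numbers only.
        store[key]["metadata"].update(row.get("metadata", {}))
        return key, False
    store[key] = {**row, "source_hash": key, "metadata": dict(row.get("metadata", {}))}
    return key, True
```

Because only the first 500 characters are hashed, two items from the same URL that differ later in the text count as duplicates under this recipe.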

3.6 Embedding Pipeline

Model: text-embedding-3-small (OpenAI, 1536 dims).
Chunking: Long content (>2000 tokens) is split into 1500-token chunks with 200-token overlap. Each chunk gets its own embedding + parent link.
Batch: Groups of 50.
Rate: 3000 RPM.
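
The chunking rule can be sketched as a sliding window over a token list. This is a sketch under assumptions: `tokens` is any pre-tokenized sequence (a real pipeline would use the embedding model's tokenizer), and `chunk_tokens` is a hypothetical helper name.

```python
def chunk_tokens(
    tokens: list,
    chunk_size: int = 1500,
    overlap: int = 200,
    threshold: int = 2000,
) -> list[list]:
    """Split long token lists into overlapping chunks; short content stays whole."""
    if len(tokens) <= threshold:
        return [tokens]
    step = chunk_size - overlap  # each chunk starts 1300 tokens after the previous one
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # this chunk already reaches the end of the content
    return chunks
```

The position of each chunk in the returned list maps naturally onto chunk_index, with the parent content row's id as parent_id.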

Content chunks table

CREATE TABLE content_chunks (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  parent_id UUID NOT NULL REFERENCES content(id) ON DELETE CASCADE,
  chunk_index INT NOT NULL,
  chunk_text TEXT NOT NULL,
  embedding vector(1536),
  UNIQUE(parent_id, chunk_index)
);
CREATE INDEX idx_chunks_parent ON content_chunks(parent_id);
CREATE INDEX idx_chunks_embedding ON content_chunks
  USING ivfflat (embedding vector_cosine_ops);

3.7 n8n Scheduler Workflows

Workflow | Trigger | Action
RSS Poll | Cron: every 6h | Read sources.yaml → call /api/ingest/rss per feed
YouTube Channel Check | Daily 6 AM CT | Read tracked channels → call /api/ingest/youtube per new video
Competitor Ad Refresh | Weekly Mon 3 AM CT | Read Gate 4 entities → call /api/ingest/meta-ads per competitor
On-demand Ingest | Webhook | Accept { source_type, url, entity_id, gate } → route to extractor
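
The on-demand routing step could be sketched as a lookup table from source_type to extractor. The extractor functions here are stubs standing in for the modules under extractors/, and `route_ingest` is a hypothetical name; the payload fields are those listed in the workflow table.

```python
from typing import Callable, Optional


# Hypothetical stubs standing in for the real extractor modules.
def ingest_youtube(url: str, entity_id: Optional[str], gate: int) -> dict:
    return {"extractor": "youtube", "url": url, "gate": gate}


def ingest_web(url: str, entity_id: Optional[str], gate: int) -> dict:
    return {"extractor": "web", "url": url, "gate": gate}


EXTRACTORS: dict[str, Callable[..., dict]] = {
    "youtube": ingest_youtube,
    "web": ingest_web,
}


def route_ingest(payload: dict) -> dict:
    """Validate an on-demand payload and hand it to the matching extractor."""
    missing = [k for k in ("source_type", "url", "gate") if k not in payload]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    extractor = EXTRACTORS.get(payload["source_type"])
    if extractor is None:
        raise ValueError(f"unsupported source_type: {payload['source_type']!r}")
    return extractor(payload["url"], payload.get("entity_id"), payload["gate"])
```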

3.8 Research Service

Trigger: New entity with type='prospect' AND gate=2 created in entities table.

Auto-runs:
1) Web scraper: company website → copy, offers, positioning, tech stack.
2) Social discovery: find all profiles → store URLs.
3) YouTube scan: find channel → queue videos.
4) Podcast scan: search directories for appearances.
5) Competitor ID: ask Claude for top 5 competitors → create Gate 4 entities.
6) LinkedIn profile extraction.
7) Compile research brief.
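
The trigger condition and the ordered steps might be wired together as below. This is a sketch under assumptions: entities are plain dicts, each step is a callable taking the entity, and a failing step is recorded rather than aborting the run (a design choice the PRD does not specify).

```python
from typing import Callable


def should_run_research(entity: dict) -> bool:
    """Trigger rule: only new prospect entities tagged Gate 2."""
    return entity.get("type") == "prospect" and entity.get("gate") == 2


def run_research(entity: dict, steps: list[tuple[str, Callable[[dict], object]]]) -> dict:
    """Run each (name, step) in order and collect results keyed by step name."""
    results: dict = {}
    for name, step in steps:
        try:
            results[name] = step(entity)
        except Exception as exc:  # assumption: one failed step should not block the rest
            results[name] = {"error": str(exc)}
    return results
```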

Research brief output structure

{
  "company": { "name":"", "website":"", "founded":"", "team_size":"" },
  "audience": { "size_estimate":"", "demographics":"", "primary_platform":"" },
  "content": { "youtube_videos":0, "podcast_episodes":0, "blog_posts":0, "social_cadence":"" },
  "offers": [{ "name":"", "price":"", "format":"" }],
  "positioning": "",
  "pain_points_inferred": [],
  "competitors_identified": [],
  "channels": { "youtube":"", "instagram":"", "linkedin":"", "twitter":"", "tiktok":"" }
}

3.9 FastAPI Endpoints

POST /api/ingest/youtube       { url, entity_id, gate }
POST /api/ingest/rss           { feed_url, gate }
POST /api/ingest/meta-ads      { advertiser, entity_id }
POST /api/ingest/social        { platform, handle, entity_id, gate }
POST /api/ingest/web           { url, entity_id, gate }
POST /api/ingest/document      { file_path, entity_id, gate }
POST /api/ingest/transcript    { webhook payload from Tactiq/Fireflies }
POST /api/research/run         { entity_id }
GET  /api/status               # health + queue depth
GET  /api/stats                # counts by gate, source_type, date
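
The shape of the /api/stats response is not specified; one possible aggregation is sketched below. It assumes rows arrive as dicts with `gate`, `source_type`, and an ISO-8601 `ingested_at` string; in production the same counts would more likely come from GROUP BY queries against the content table.

```python
from collections import Counter


def summarize(rows: list[dict]) -> dict:
    """Count content rows by gate, source_type, and ingestion date (YYYY-MM-DD)."""
    return {
        "by_gate": dict(Counter(r["gate"] for r in rows)),
        "by_source_type": dict(Counter(r["source_type"] for r in rows)),
        # The first 10 characters of an ISO-8601 timestamp are the date part.
        "by_date": dict(Counter(r["ingested_at"][:10] for r in rows)),
    }
```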

3.10 PRD Reconciliation

Previous PRD | What we keep | What changes
RSS Intelligence Router (Feb 2026) | YouTube API + RSS extraction logic, 7-day lookback | File-based → Supabase. Hardcoded → sources.yaml. Single-tenant → multi-gate.
Content Notebook (Feb 2026) | n8n pipeline design, dedup, embedding pipeline | Standalone n8n → n8n as scheduler for Python service. RSS-only → all sources.
Signal Engine concept | HN/Medium/Substack sources, sprint filtering | Becomes Gate 1 config. Sprint filtering moves to lens layer.

3.11 Acceptance Criteria

MASTERYMADE — PRD 3 of 12 — plan.jasondmacdonald.com

Dominia Facta. Build what compounds.