MasteryMade · Foundation PRD
One service ingests all content types into a standard schema. Replaces three previous PRDs (RSS Intelligence Router, Content Notebook, Signal Engine ingestion). Python Docker container on Forge. What changes per use case isn't the ingestion — it's the gate tag and the lens applied downstream.
Forge VPS (Docker)
├── ingest-service/
│ ├── main.py # FastAPI app
│ ├── extractors/
│ │ ├── youtube.py # Gemini API (visual + spoken)
│ │ ├── podcast.py # Whisper transcription
│ │ ├── meta_ads.py # Puppeteer/Playwright
│ │ ├── social.py # Platform API + scraper
│ │ ├── web.py # Readability + Cheerio
│ │ ├── rss.py # feedparser + full text
│ │ ├── email_cal.py # Gmail + Calendar MCP proxy
│ │ ├── transcript.py # Tactiq/Fireflies webhook
│ │ └── document.py # PDF/DOCX extraction
│ ├── processors/
│ │ ├── gate_tagger.py # Gate assignment (PRD 2 rules)
│ │ ├── annotator.py # Gemini annotation, hook class
│ │ ├── embedder.py # pgvector embedding gen
│ │ └── deduplicator.py # Source URL + content hash
│ ├── research/
│ │ └── research_service.py # Pre-demo intel (Gate 2 trigger)
│ ├── config/
│ │ ├── sources.yaml # URLs, schedules, gate maps
│ │ └── api_keys.env
│ ├── Dockerfile
│ └── requirements.txt
CREATE TABLE content (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_type TEXT NOT NULL CHECK (source_type IN (
'youtube','podcast','meta_ads','social','web','rss',
'email','calendar','session','transcript','document'
)),
source_url TEXT,
source_hash TEXT, -- SHA256 for dedup
gate INT NOT NULL CHECK (gate IN (1,2,3,4)),
entity_id UUID REFERENCES entities(id),
title TEXT,
raw_text TEXT NOT NULL,
summary TEXT, -- AI-generated 2-3 sentences
annotations JSONB DEFAULT '{}',
-- youtube: {hooks:[],pattern_interrupts:[],format:"",on_screen_text:[]}
-- meta_ads: {ad_copy:"",cta:"",offer:"",creative_format:""}
-- social: {platform:"",post_type:""}
-- rss: {category:"",publication:""}
metadata JSONB DEFAULT '{}',
-- Common: {publish_date,author,duration_seconds,word_count}
-- youtube: {views,likes,comments,shares}
-- social: {likes,comments,shares,saves,reach}
-- meta_ads: {spend,impressions,ctr,cpc,roas}
embedding vector(1536),
status TEXT NOT NULL DEFAULT 'raw' CHECK (status IN (
'raw','annotated','embedded','scored','processed'
)),
ingested_at TIMESTAMPTZ NOT NULL DEFAULT now(),
source_published_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX idx_content_gate ON content(gate);
CREATE INDEX idx_content_entity ON content(entity_id);
CREATE INDEX idx_content_source_type ON content(source_type);
CREATE INDEX idx_content_status ON content(status);
CREATE INDEX idx_content_source_hash ON content(source_hash);
CREATE INDEX idx_content_ingested ON content(ingested_at DESC);
CREATE INDEX idx_content_embedding ON content
USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
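As a sketch of how the embedding index would be queried downstream (the gate filter and `LIMIT` are illustrative; `$1` stands for a 1536-dim query embedding supplied by the application):

```sql
-- Top 10 most similar items within Gate 4, by cosine distance.
-- vector_cosine_ops makes <=> the cosine-distance operator.
SELECT id, title, 1 - (embedding <=> $1) AS cosine_similarity
FROM content
WHERE gate = 4 AND embedding IS NOT NULL
ORDER BY embedding <=> $1
LIMIT 10;
```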
Input: YouTube URL or channel ID. Process: 1) YouTube Data API: metadata (title, description, date, views, likes). 2) Gemini API (Flash or Pro): visual + spoken annotation — full transcript, on-screen text, hook identification (first 3s, first 30s), pattern interrupts, format classification. 3) Store transcript as raw_text, annotations as JSONB. Rate limits: YouTube API 10K units/day. Gemini: batch in groups of 5 with a 2s delay between batches. Cost: ~$0.01-0.05/video (Flash). ~$5/day for 100-500 videos.
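The batch-of-5 / 2s pacing above can be sketched with a generic helper. `process_in_batches` and its `handler` callback are illustrative names, not part of the service:

```python
import time
from typing import Callable, List

def process_in_batches(items: List[str], handler: Callable[[List[str]], None],
                       batch_size: int = 5, delay_s: float = 2.0) -> int:
    """Dispatch items to `handler` in fixed-size batches, pausing between batches.

    Mirrors the Gemini pacing above: batches of 5 with a 2s delay.
    Returns the number of batches dispatched.
    """
    batches = 0
    for start in range(0, len(items), batch_size):
        handler(items[start:start + batch_size])
        batches += 1
        if start + batch_size < len(items):  # no pause after the final batch
            time.sleep(delay_s)
    return batches
```

In the real extractor the handler would wrap the Gemini call; here it is just a callback so the pacing logic stays reusable across extractors.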
Input: Advertiser name or page ID. Process: 1) Playwright: navigate Meta Ad Library. 2) Search advertiser. 3) Per active ad: extract copy, creative screenshot, CTA, launch date, platforms. 4) Store copy as raw_text, metadata as annotations. Gate: Always Gate 4. Rate limit: 2s between pages, max 50 ads/advertiser/run. Reference: Merci Larry pattern — 213 ads across 4 competitors in 1 hour.
Input: Feed URL from sources.yaml. Process: feedparser → new entries since last check → fetch full HTML → Readability extraction → dedup check → store. Gate: Gate 1 (Jason's feeds). Exception: expert niche feeds → Gate 2 or 4. Schedule: Every 6h for daily, every 1h for HN/breaking. Lookback: 7 days first run, then incremental.
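The 7-day-then-incremental lookback above can be sketched as a pure filter. `entries_to_ingest` is an illustrative name, and entries are assumed to carry a `published` datetime (feedparser's `published_parsed` struct would be converted upstream):

```python
from datetime import datetime, timedelta, timezone

def entries_to_ingest(entries, last_checked=None, lookback_days=7):
    """Select feed entries newer than the cutoff.

    First run (last_checked is None): look back `lookback_days`.
    Subsequent runs: everything published since `last_checked`.
    """
    now = datetime.now(timezone.utc)
    cutoff = last_checked or (now - timedelta(days=lookback_days))
    return [e for e in entries if e["published"] > cutoff]
```

Keeping the cutoff logic separate from feedparser makes the incremental behavior trivially testable without fetching a feed.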
Input: Podcast RSS or audio URL. Process: Download → Whisper API (or local whisper.cpp) → speaker diarization → topic segmentation. Cost: API ~$0.006/min. Local: free but slower.
Platforms: Instagram (Graph API), LinkedIn (scrape or manual), X (API v2). Per post: content, engagement, media type, timestamp. Image/video posts get Gemini Vision description. Rate limits: IG 200/hr, X 300 reads/15min.
Proxy to Gmail + Google Calendar MCP integrations. Extract threads, participants, action items, events. Always Gate 1.
PDF/DOCX extraction via existing skills. Full text, structure, embedded images. Gate by entity_id.
import hashlib

def check_duplicate(source_url: str, raw_text: str):
    """Return (is_duplicate, existing_id) using a hash of URL + first 500 chars."""
    content_hash = hashlib.sha256(
        f"{source_url}:{raw_text[:500]}".encode("utf-8")
    ).hexdigest()
    existing = (
        supabase.table('content')
        .select('id')
        .eq('source_hash', content_hash)
        .execute()
    )
    if existing.data:
        return True, existing.data[0]['id']
    return False, None

# If dup found: skip insert, optionally update metadata (engagement may change)
Model: text-embedding-3-small (OpenAI, 1536 dims). Chunking: Long content (>2000 tokens): split into 1500-token chunks with 200-token overlap. Each chunk gets own embedding + parent link. Batch: Groups of 50. Rate: 3000 RPM.
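The 1500-token / 200-token-overlap rule above can be sketched over a pre-tokenized list. `chunk_tokens` is an illustrative name; a real implementation would count tokens with the embedding model's tokenizer (e.g. tiktoken) rather than receive them pre-split:

```python
def chunk_tokens(tokens, chunk_size=1500, overlap=200):
    """Split a token list into overlapping chunks.

    Step = chunk_size - overlap, so consecutive chunks share `overlap` tokens.
    The caller applies this only to content longer than the 2000-token threshold.
    """
    if chunk_size <= overlap:
        raise ValueError("chunk_size must exceed overlap")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):  # final chunk reached the end
            break
    return chunks
```

Each returned chunk maps to one row in content_chunks, with its list position as chunk_index and the source row as parent_id.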
CREATE TABLE content_chunks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
parent_id UUID NOT NULL REFERENCES content(id) ON DELETE CASCADE,
chunk_index INT NOT NULL,
chunk_text TEXT NOT NULL,
embedding vector(1536),
UNIQUE(parent_id, chunk_index)
);
CREATE INDEX idx_chunks_parent ON content_chunks(parent_id);
CREATE INDEX idx_chunks_embedding ON content_chunks
USING ivfflat (embedding vector_cosine_ops);
| Workflow | Trigger | Action |
|---|---|---|
| RSS Poll | Cron: every 6h | Read sources.yaml → call /api/ingest/rss per feed |
| YouTube Channel Check | Daily 6 AM CT | Read tracked channels → call /api/ingest/youtube per new video |
| Competitor Ad Refresh | Weekly Mon 3 AM CT | Read Gate 4 entities → call /api/ingest/meta-ads per competitor |
| On-demand Ingest | Webhook | Accept { source_type, url, entity_id, gate } → route to extractor |
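The on-demand route in the table above can be sketched as a plain dispatch table. The extractor stubs here are hypothetical stand-ins for the modules in extractors/; only the routing logic is the point:

```python
# Hypothetical stubs standing in for extractors/*.py.
def ingest_youtube(payload): return ("youtube", payload["url"])
def ingest_rss(payload): return ("rss", payload["url"])
def ingest_web(payload): return ("web", payload["url"])

EXTRACTORS = {
    "youtube": ingest_youtube,
    "rss": ingest_rss,
    "web": ingest_web,
}

def route_ingest(payload: dict):
    """Route an on-demand payload { source_type, url, entity_id, gate }."""
    try:
        handler = EXTRACTORS[payload["source_type"]]
    except KeyError:
        raise ValueError(f"unknown source_type: {payload.get('source_type')!r}")
    return handler(payload)
```

A dict keyed by source_type keeps the webhook handler closed for modification: adding an extractor means adding one entry, not another branch.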
Trigger: New entity with type='prospect' AND gate=2 created in entities table.
Auto-runs: 1) Web scraper: company website → copy, offers, positioning, tech stack. 2) Social discovery: find all profiles → store URLs. 3) YouTube scan: find channel → queue videos. 4) Podcast scan: search directories for appearances. 5) Competitor ID: ask Claude for top 5 competitors → create Gate 4 entities. 6) LinkedIn profile extraction. 7) Compile research brief.
{
"company": { "name":"", "website":"", "founded":"", "team_size":"" },
"audience": { "size_estimate":"", "demographics":"", "primary_platform":"" },
"content": { "youtube_videos":0, "podcast_episodes":0, "blog_posts":0, "social_cadence":"" },
"offers": [{ "name":"", "price":"", "format":"" }],
"positioning": "",
"pain_points_inferred": [],
"competitors_identified": [],
"channels": { "youtube":"", "instagram":"", "linkedin":"", "twitter":"", "tiktok":"" }
}
POST /api/ingest/youtube { url, entity_id, gate }
POST /api/ingest/rss { feed_url, gate }
POST /api/ingest/meta-ads { advertiser, entity_id }
POST /api/ingest/social { platform, handle, entity_id, gate }
POST /api/ingest/web { url, entity_id, gate }
POST /api/ingest/document { file_path, entity_id, gate }
POST /api/ingest/transcript { webhook payload from Tactiq/Fireflies }
POST /api/research/run { entity_id }
GET /api/status # health + queue depth
GET /api/stats # counts by gate, source_type, date
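Since the content table enforces source_type and gate via CHECK constraints, the ingest endpoints can reject bad payloads at the API boundary instead of at insert time. A minimal sketch (`validate_ingest_request` and its error strings are assumptions, not the service's actual validator):

```python
# Mirrors the CHECK constraints on the content table.
SOURCE_TYPES = {
    "youtube", "podcast", "meta_ads", "social", "web", "rss",
    "email", "calendar", "session", "transcript", "document",
}

def validate_ingest_request(payload: dict) -> list:
    """Return a list of problems with an ingest payload (empty list = valid)."""
    errors = []
    if payload.get("source_type") not in SOURCE_TYPES:
        errors.append(f"invalid source_type: {payload.get('source_type')!r}")
    if payload.get("gate") not in (1, 2, 3, 4):
        errors.append(f"gate must be 1-4, got {payload.get('gate')!r}")
    if not payload.get("url") and not payload.get("file_path"):
        errors.append("one of url or file_path is required")
    return errors
```

In FastAPI this shape of check would typically live in a Pydantic request model; a plain function keeps the sketch framework-free.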
| Previous PRD | What we keep | What changes |
|---|---|---|
| RSS Intelligence Router (Feb 2026) | YouTube API + RSS extraction logic, 7-day lookback | File-based → Supabase. Hardcoded → sources.yaml. Single-tenant → multi-gate. |
| Content Notebook (Feb 2026) | n8n pipeline design, dedup, embedding pipeline | Standalone n8n → n8n as scheduler for Python service. RSS-only → all sources. |
| Signal Engine concept | HN/Medium/Substack sources, sprint filtering | Becomes Gate 1 config. Sprint filtering moves to lens layer. |
MASTERYMADE — PRD 3 of 12 — plan.jasondmacdonald.com
Dominia Facta. Build what compounds.