Files
wevads-platform/scripts/enrich_contacts.py
2026-02-26 04:53:11 +01:00

426 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Contact Enrichment Pipeline v1.0
Enriches scraped contacts using 4 combined methods:
1. WHOIS lookup (company/org from email domain)
2. LinkedIn profile parsing (name/job/company from source_url)
3. Reverse email pattern analysis (first/last name from email format)
4. Domain-name heuristic (company guess from business email domains)
Runs against admin.scrapping_results in adx_system DB.
"""
import psycopg2
import psycopg2.extras
import re
import subprocess
import json
import time
import sys
import os
from collections import defaultdict
# ── DB Connection ──
DB = dict(host='localhost', dbname='adx_system', user='admin', password='admin123')


def get_db():
    """Open an autocommit connection to adx_system and return (conn, cursor).

    The cursor yields rows as dicts (RealDictCursor), and the search path
    is pinned so unqualified table names resolve in the admin schema.
    """
    connection = psycopg2.connect(**DB)
    connection.autocommit = True
    cursor = connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cursor.execute("SET search_path TO admin, public")
    return connection, cursor
# ══════════════════════════════════════════
# METHOD 1: Email Pattern → Name Extraction
# ══════════════════════════════════════════
# Patterns: john.doe@, john_doe@, johndoe@, j.doe@, firstname.lastname@
# Characters that commonly separate first/last name in a local part.
NAME_SEPARATORS = re.compile(r'[._\-]')
# Generic role mailboxes that never correspond to a person's name.
COMMON_PREFIXES = {
    'info', 'contact', 'admin', 'support', 'sales', 'hello', 'hi', 'office',
    'team', 'noreply', 'no-reply', 'webmaster', 'postmaster', 'marketing', 'billing',
    'help', 'service', 'mail', 'email', 'enquiry', 'enquiries', 'press', 'media',
    'newsletter', 'abuse', 'spam', 'security', 'feedback', 'customerservice',
    'developer', 'dev', 'test', 'demo', 'api', 'bot', 'system', 'root', 'www',
    'subscribe', 'unsubscribe', 'reply', 'bounce', 'return', 'mailer-daemon',
}


def extract_name_from_email(email):
    """Derive a probable (first, last) name pair from an email's local part.

    Role mailboxes (info@, support@, ...) and bot-like addresses yield
    (None, None). A single usable token is returned as a first name only.
    """
    if not email or '@' not in email:
        return None, None
    mailbox = email.split('@')[0].lower().strip()
    # Generic role addresses carry no personal name.
    if mailbox in COMMON_PREFIXES:
        return None, None
    # Bot-like local part such as "a12345".
    if re.match(r'^[a-z]{1,2}\d{3,}', mailbox):
        return None, None
    if len(mailbox) < 3:
        return None, None
    tokens = [t for t in NAME_SEPARATORS.split(mailbox)
              if t and not t.isdigit() and len(t) > 1]
    if len(tokens) >= 2:
        # Capitalize first, then strip digits (preserves original ordering
        # so e.g. "doe123" -> "Doe").
        given = re.sub(r'\d+', '', tokens[0].capitalize())
        family = re.sub(r'\d+', '', tokens[-1].capitalize())
        if len(given) > 1 and len(family) > 1 and given.lower() not in COMMON_PREFIXES:
            return given, family
    elif len(tokens) == 1:
        sole = tokens[0]
        # A lone alphabetic token is treated as a first name.
        if len(sole) >= 3 and sole.isalpha() and sole.lower() not in COMMON_PREFIXES:
            return sole.capitalize(), None
    return None, None
# ══════════════════════════════════════════
# METHOD 2: WHOIS → Company/Org from Domain
# ══════════════════════════════════════════
# Cache WHOIS results per domain
_whois_cache = {}
# Known ISP/freemail domains → skip WHOIS
FREEMAIL_DOMAINS = {
'gmail.com','yahoo.com','hotmail.com','outlook.com','aol.com','icloud.com',
'mail.com','protonmail.com','zoho.com','yandex.com','gmx.de','gmx.net',
'web.de','t-online.de','freenet.de','arcor.de','hotmail.de','hotmail.fr',
'yahoo.de','yahoo.fr','live.com','live.de','live.fr','msn.com',
'videotron.ca','bell.net','rogers.com','shaw.ca','telus.net',
'orange.fr','free.fr','sfr.fr','laposte.net','wanadoo.fr',
'btinternet.com','sky.com','virgin.net','ntlworld.com','talktalk.net',
'bluewin.ch','hotmail.ch','gmx.ch','sunrise.ch',
'hotmail.co.uk','yahoo.co.uk','googlemail.com',
'comcast.net','verizon.net','att.net','cox.net','sbcglobal.net',
'earthlink.net','charter.net','optonline.net','frontier.com',
'mail.ru','inbox.ru','list.ru','bk.ru',
'wp.pl','o2.pl','interia.pl','onet.pl',
'libero.it','virgilio.it','alice.it','tin.it','tiscali.it',
'terra.com.br','uol.com.br','bol.com.br',
'rediffmail.com','163.com','126.com','qq.com','sina.com',
'naver.com','daum.net','hanmail.net',
}
def whois_lookup_company(domain):
"""Get organization/company from WHOIS for business domains."""
if domain in _whois_cache:
return _whois_cache[domain]
if domain.lower() in FREEMAIL_DOMAINS:
_whois_cache[domain] = None
return None
try:
import whois
w = whois.whois(domain)
org = None
if hasattr(w, 'org') and w.org:
org = w.org if isinstance(w.org, str) else w.org[0]
elif hasattr(w, 'registrant_name') and w.registrant_name:
org = w.registrant_name
# Clean up
if org:
org = org.strip()
# Skip privacy services
privacy_keywords = ['privacy','proxy','whoisguard','domains by proxy',
'contact privacy','redacted','data protected','withheld',
'identity protect','domain protection','not disclosed']
if any(kw in org.lower() for kw in privacy_keywords):
org = None
_whois_cache[domain] = org
return org
except Exception:
_whois_cache[domain] = None
return None
# ══════════════════════════════════════════
# METHOD 3: LinkedIn Profile → Name/Job/Company
# ══════════════════════════════════════════
def parse_linkedin_source(source_url):
    """Pull (first, last, job) out of a LinkedIn profile URL.

    Handles scraper-generated "firstname-lastname-at-domain" slugs as
    well as standard "firstname-lastname-<id>" slugs. The job component
    is never derivable from the slug, so the third element is always None.
    """
    if not source_url or 'linkedin.com/in/' not in source_url:
        return None, None, None
    found = re.search(r'linkedin\.com/in/([^/?#]+)', source_url)
    if not found:
        return None, None, None
    slug = found.group(1)
    # Scraper-style slug: "<name>-at-<domain>" (an email turned into a slug).
    email_style = re.match(r'^(.+?)-at-(.+)$', slug)
    if email_style:
        pieces = [p.capitalize()
                  for p in re.split(r'[.\-_]', email_style.group(1))
                  if p and len(p) > 1]
        if len(pieces) >= 2:
            return pieces[0], pieces[-1], None
        if len(pieces) == 1:
            return pieces[0], None, None
        # No usable name tokens — fall through to generic slug parsing.
    # Standard slug: drop hex-looking id suffixes and 1-char fragments.
    keep = [p for p in slug.split('-')
            if p and not re.match(r'^[0-9a-f]{6,}$', p) and len(p) > 1]
    if len(keep) >= 2:
        return keep[0].capitalize(), keep[1].capitalize(), None
    if len(keep) == 1:
        return keep[0].capitalize(), None, None
    return None, None, None
# ══════════════════════════════════════════
# METHOD 4: Domain → Company name heuristic
# ══════════════════════════════════════════
def domain_to_company(domain):
    """Heuristically turn a business domain into a display company name.

    Freemail domains yield None. Otherwise the left-most DNS label is
    de-hyphenated, split on camelCase boundaries, and title-cased.
    """
    if domain.lower() in FREEMAIL_DOMAINS:
        return None
    # Keep only the left-most label (drops TLD and any further labels).
    label = domain.split('.')[0]
    if len(label) < 3:
        return None
    label = re.sub(r'[-_]', ' ', label)
    # Insert a space at lower→upper transitions (camelCase split).
    label = re.sub(r'([a-z])([A-Z])', r'\1 \2', label)
    label = label.strip().title()
    return label if len(label) >= 3 else None
# ══════════════════════════════════════════
# ISP Detection from email domain
# ══════════════════════════════════════════
# Freemail/ISP domain → display name of the provider.
ISP_MAP = {
    't-online.de': 'T-Online', 'web.de': 'Web.de', 'gmx.de': 'GMX',
    'gmx.net': 'GMX', 'freenet.de': 'Freenet', 'arcor.de': 'Arcor',
    'gmail.com': 'Gmail', 'googlemail.com': 'Gmail',
    'hotmail.com': 'Hotmail', 'hotmail.de': 'Hotmail', 'hotmail.fr': 'Hotmail',
    'hotmail.co.uk': 'Hotmail', 'hotmail.ch': 'Hotmail',
    'outlook.com': 'Outlook', 'live.com': 'Outlook', 'live.de': 'Outlook',
    'yahoo.com': 'Yahoo', 'yahoo.de': 'Yahoo', 'yahoo.fr': 'Yahoo',
    'yahoo.co.uk': 'Yahoo',
    'videotron.ca': 'Videotron', 'bell.net': 'Bell', 'rogers.com': 'Rogers',
    'orange.fr': 'Orange', 'free.fr': 'Free', 'sfr.fr': 'SFR',
    'bluewin.ch': 'Bluewin',
    'comcast.net': 'Comcast', 'verizon.net': 'Verizon', 'att.net': 'AT&T',
    'aol.com': 'AOL', 'icloud.com': 'iCloud',
    'btinternet.com': 'BT', 'sky.com': 'Sky', 'ntlworld.com': 'NTL',
    'virgin.net': 'Virgin Media', 'talktalk.net': 'TalkTalk',
}


def detect_isp(email):
    """Return the provider name for a known ISP/freemail domain, else None."""
    return ISP_MAP.get(email.split('@')[-1].lower())
# ══════════════════════════════════════════
# Country detection from domain TLD
# ══════════════════════════════════════════
# TLD suffix → country; generic TLDs map to None (no country signal).
TLD_COUNTRY = {
    '.de': 'Germany', '.fr': 'France', '.uk': 'United Kingdom', '.co.uk': 'United Kingdom',
    '.ca': 'Canada', '.ch': 'Switzerland', '.at': 'Austria', '.nl': 'Netherlands',
    '.be': 'Belgium', '.it': 'Italy', '.es': 'Spain', '.pt': 'Portugal',
    '.pl': 'Poland', '.cz': 'Czech Republic', '.se': 'Sweden', '.no': 'Norway',
    '.dk': 'Denmark', '.fi': 'Finland', '.ie': 'Ireland', '.ru': 'Russia',
    '.com': None, '.net': None, '.org': None,  # generic
    '.us': 'United States', '.au': 'Australia', '.nz': 'New Zealand',
    '.jp': 'Japan', '.br': 'Brazil', '.mx': 'Mexico',
    '.ma': 'Morocco', '.tn': 'Tunisia', '.dz': 'Algeria',
    '.ae': 'UAE', '.sa': 'Saudi Arabia', '.za': 'South Africa',
}


def detect_country(email):
    """Detect likely country from the email domain's TLD, else None."""
    domain = email.split('@')[-1].lower()
    # Longest suffix first so compound TLDs ('.co.uk') win over '.uk'.
    ordered = sorted(TLD_COUNTRY.items(), key=lambda kv: len(kv[0]), reverse=True)
    for suffix, country in ordered:
        if country and domain.endswith(suffix):
            return country
    return None
# ══════════════════════════════════════════
# MAIN ENRICHMENT PIPELINE
# ══════════════════════════════════════════
def enrich_contact(row):
    """Enrich one contact row with every available method.

    Expects a dict-like row with 'email' and 'source_url' keys and
    returns a dict of column → value updates (possibly empty).
    """
    email = row.get('email', '')
    source = row.get('source_url', '')
    updates = {}
    if not email:
        return updates
    domain = email.split('@')[-1].lower() if '@' in email else ''
    # Method 1: name guessed from the email's local part.
    first, last = extract_name_from_email(email)
    if first:
        updates['full_name'] = f"{first} {last}" if last else first
    # Method 2: a LinkedIn-derived name overrides the email-derived one.
    ln_first, ln_last, _ln_job = parse_linkedin_source(source)
    if ln_first:
        updates['full_name'] = f"{ln_first} {ln_last}" if ln_last else ln_first
    # Method 3: WHOIS org for business domains, with the domain-name
    # heuristic as fallback when WHOIS yields nothing.
    if domain and domain not in FREEMAIL_DOMAINS:
        company = whois_lookup_company(domain) or domain_to_company(domain)
        if company:
            updates['company'] = company
    # ISP detection for freemail — stored in revenue_range (schema reuse).
    isp = detect_isp(email)
    if isp and not updates.get('company'):
        updates['revenue_range'] = isp
    # Country inferred from the TLD.
    country = detect_country(email)
    if country:
        updates['location'] = country
    return updates
def run_enrichment(batch_size=100, limit=None, dry_run=False):
    """Run enrichment on all contacts missing metadata.

    Args:
        batch_size: number of pending UPDATEs to buffer before executing.
        limit: optional cap on the number of rows fetched.
        dry_run: when True, compute statistics but write nothing.

    Returns:
        Dict of counters: total/enriched/names/companies/locations/errors.
    """
    conn, cur = get_db()
    # Get contacts needing enrichment.
    sql = """SELECT id, email, source_url, full_name, company, location
FROM scrapping_results
WHERE (full_name IS NULL OR full_name = '')
ORDER BY id"""
    # FIX: parameterize LIMIT instead of f-string interpolation into SQL.
    params = []
    if limit:
        sql += " LIMIT %s"
        params.append(limit)
    cur.execute(sql, params)
    rows = cur.fetchall()
    total = len(rows)
    print(f"[ENRICH] {total} contacts to enrich")
    enriched = 0
    whois_count = 0
    name_count = 0
    location_count = 0
    errors = 0
    # Pre-collect unique business domains so WHOIS runs once per domain.
    domains = set()
    for r in rows:
        if r['email'] and '@' in r['email']:
            d = r['email'].split('@')[-1].lower()
            if d not in FREEMAIL_DOMAINS:
                domains.add(d)
    print(f"[WHOIS] {len(domains)} unique business domains to lookup")
    # Pre-cache WHOIS for all business domains.
    for i, domain in enumerate(domains):
        if i % 10 == 0:
            print(f" WHOIS {i}/{len(domains)}...")
        try:
            whois_lookup_company(domain)
            time.sleep(0.3)  # Rate limit
        except Exception:
            # FIX: bare 'except:' also swallowed KeyboardInterrupt/SystemExit;
            # lookups are best-effort, so only Exception is ignored.
            pass
    print(f"[WHOIS] Cache filled: {len(_whois_cache)} domains")

    batch = []

    def _flush():
        # Execute buffered UPDATEs. Column names come from enrich_contact's
        # own keys (never user input), so the f-string SET clause is safe;
        # all values go through %s placeholders.
        for sets, vals in batch:
            cur.execute(
                f"UPDATE scrapping_results SET {', '.join(sets)} WHERE id = %s",
                vals)
        batch.clear()

    # Now enrich each contact.
    for i, row in enumerate(rows):
        if i % 500 == 0 and i > 0:
            print(f" Processing {i}/{total}...")
        try:
            updates = enrich_contact(row)
            if updates:
                enriched += 1
                if 'full_name' in updates:
                    name_count += 1
                if 'company' in updates:
                    whois_count += 1
                if 'location' in updates:
                    location_count += 1
                if not dry_run:
                    sets = [f"{k} = %s" for k in updates]
                    vals = list(updates.values())
                    vals.append(row['id'])
                    batch.append((sets, vals))
                    if len(batch) >= batch_size:
                        _flush()
        except Exception as e:
            errors += 1
            if errors < 5:  # only report the first few failures
                print(f" ERROR on {row.get('email','?')}: {e}")
    # Flush remaining batch.
    if batch and not dry_run:
        _flush()
    conn.close()
    print(f"\n{'='*50}")
    print(f"ENRICHMENT COMPLETE")
    print(f"{'='*50}")
    print(f"Total processed: {total}")
    print(f"Enriched: {enriched} ({enriched*100//max(total,1)}%)")
    print(f" Names found: {name_count}")
    print(f" Companies: {whois_count}")
    print(f" Locations: {location_count}")
    print(f"Errors: {errors}")
    print(f"WHOIS cache: {len(_whois_cache)} domains")
    return {'total': total, 'enriched': enriched, 'names': name_count,
            'companies': whois_count, 'locations': location_count, 'errors': errors}
if __name__ == '__main__':
    # CLI: optional '--dry-run' flag plus an optional numeric row limit.
    dry = '--dry-run' in sys.argv
    numeric_args = [int(a) for a in sys.argv[1:] if a.isdigit()]
    # Last numeric argument wins, matching the original loop's behavior.
    lim = numeric_args[-1] if numeric_args else None
    if dry:
        print("[DRY RUN] No database changes will be made")
    run_enrichment(limit=lim, dry_run=dry)