426 lines
16 KiB
Python
426 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Contact Enrichment Pipeline v1.0
|
|
Enriches scraped contacts using several combined methods, primarily:
|
|
1. WHOIS lookup (company/org from email domain)
|
|
2. LinkedIn profile parsing (name/job/company from source_url)
|
|
3. Reverse email pattern analysis (first/last name from email format)
|
|
|
|
Runs against admin.scrapping_results in adx_system DB.
|
|
"""
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
import re
|
|
import subprocess
|
|
import json
|
|
import time
|
|
import sys
|
|
import os
|
|
from collections import defaultdict
|
|
|
|
# ── DB Connection ──
# Credentials are read from the environment when set; the previous literal
# values remain as fallbacks so existing local deployments keep working.
# TODO(review): rotate these defaults out of source control entirely.
DB = dict(
    host=os.environ.get('ADX_DB_HOST', 'localhost'),
    dbname=os.environ.get('ADX_DB_NAME', 'adx_system'),
    user=os.environ.get('ADX_DB_USER', 'admin'),
    password=os.environ.get('ADX_DB_PASSWORD', 'admin123'),
)


def get_db():
    """Open an autocommit connection to the enrichment database.

    Returns:
        (connection, cursor) tuple. The cursor is a RealDictCursor (rows
        come back as dicts) and search_path is set to "admin, public" so
        unqualified table names resolve to the admin schema.
    """
    conn = psycopg2.connect(**DB)
    conn.autocommit = True
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute("SET search_path TO admin, public")
    return conn, cur
|
|
|
|
# ══════════════════════════════════════════
|
|
# METHOD 1: Email Pattern → Name Extraction
|
|
# ══════════════════════════════════════════
|
|
# Patterns: john.doe@, john_doe@, johndoe@, j.doe@, firstname.lastname@
|
|
|
|
# Separators commonly used between name parts in email local parts.
NAME_SEPARATORS = re.compile(r'[._\-]')

# Local parts that denote role/system mailboxes rather than people.
COMMON_PREFIXES = {'info','contact','admin','support','sales','hello','hi','office',
    'team','noreply','no-reply','webmaster','postmaster','marketing','billing',
    'help','service','mail','email','enquiry','enquiries','press','media',
    'newsletter','abuse','spam','security','feedback','customerservice',
    'developer','dev','test','demo','api','bot','system','root','www',
    'subscribe','unsubscribe','reply','bounce','return','mailer-daemon'}


def extract_name_from_email(email):
    """Extract probable first/last name from email local part."""
    if not email or '@' not in email:
        return None, None

    local = email.split('@')[0].lower().strip()

    # Reject role mailboxes, bot-like ids (e.g. "a12345"), and very short parts.
    if local in COMMON_PREFIXES:
        return None, None
    if re.match(r'^[a-z]{1,2}\d{3,}', local):
        return None, None
    if len(local) < 3:
        return None, None

    # Split on separators; drop empty, purely-numeric, and 1-char tokens.
    tokens = [t for t in NAME_SEPARATORS.split(local)
              if t and not t.isdigit() and len(t) > 1]

    if len(tokens) >= 2:
        # First and last tokens are the name candidates; strip stray digits.
        first = re.sub(r'\d+', '', tokens[0].capitalize())
        last = re.sub(r'\d+', '', tokens[-1].capitalize())
        if len(first) > 1 and len(last) > 1 and first.lower() not in COMMON_PREFIXES:
            return first, last
    elif len(tokens) == 1:
        # A single alphabetic token is stored as a first name only.
        candidate = tokens[0]
        if len(candidate) >= 3 and candidate.isalpha() and candidate.lower() not in COMMON_PREFIXES:
            return candidate.capitalize(), None

    return None, None
|
|
|
|
# ══════════════════════════════════════════
|
|
# METHOD 2: WHOIS → Company/Org from Domain
|
|
# ══════════════════════════════════════════
|
|
|
|
# Cache WHOIS results per domain. Module-level so lookups are memoized for
# the lifetime of the process; failed/skipped lookups are cached as None so
# a bad domain is never queried twice.
_whois_cache = {}

# Known ISP/freemail domains → skip WHOIS (there is no customer company
# behind these). Grouped roughly by provider family / country.
FREEMAIL_DOMAINS = {
    'gmail.com','yahoo.com','hotmail.com','outlook.com','aol.com','icloud.com',
    'mail.com','protonmail.com','zoho.com','yandex.com','gmx.de','gmx.net',
    'web.de','t-online.de','freenet.de','arcor.de','hotmail.de','hotmail.fr',
    'yahoo.de','yahoo.fr','live.com','live.de','live.fr','msn.com',
    'videotron.ca','bell.net','rogers.com','shaw.ca','telus.net',
    'orange.fr','free.fr','sfr.fr','laposte.net','wanadoo.fr',
    'btinternet.com','sky.com','virgin.net','ntlworld.com','talktalk.net',
    'bluewin.ch','hotmail.ch','gmx.ch','sunrise.ch',
    'hotmail.co.uk','yahoo.co.uk','googlemail.com',
    'comcast.net','verizon.net','att.net','cox.net','sbcglobal.net',
    'earthlink.net','charter.net','optonline.net','frontier.com',
    'mail.ru','inbox.ru','list.ru','bk.ru',
    'wp.pl','o2.pl','interia.pl','onet.pl',
    'libero.it','virgilio.it','alice.it','tin.it','tiscali.it',
    'terra.com.br','uol.com.br','bol.com.br',
    'rediffmail.com','163.com','126.com','qq.com','sina.com',
    'naver.com','daum.net','hanmail.net',
}
|
|
|
|
def whois_lookup_company(domain):
    """Get organization/company from WHOIS for business domains.

    Results — including failures, stored as None — are memoized in
    _whois_cache, and freemail domains are never queried over the network.
    Returns the organization string or None.
    """
    try:
        return _whois_cache[domain]
    except KeyError:
        pass

    if domain.lower() in FREEMAIL_DOMAINS:
        _whois_cache[domain] = None
        return None

    try:
        import whois  # third-party; imported lazily so absence only disables WHOIS
        record = whois.whois(domain)

        org = None
        raw = getattr(record, 'org', None)
        if raw:
            org = raw if isinstance(raw, str) else raw[0]
        elif getattr(record, 'registrant_name', None):
            org = record.registrant_name

        if org:
            org = org.strip()
            # Registrar privacy services are not real company names — drop them.
            privacy_keywords = ['privacy','proxy','whoisguard','domains by proxy',
                'contact privacy','redacted','data protected','withheld',
                'identity protect','domain protection','not disclosed']
            lowered = org.lower()
            if any(kw in lowered for kw in privacy_keywords):
                org = None

        _whois_cache[domain] = org
        return org
    except Exception:
        # Network errors, missing module, malformed records: cache the miss.
        _whois_cache[domain] = None
        return None
|
|
|
|
# ══════════════════════════════════════════
|
|
# METHOD 3: LinkedIn Profile → Name/Job/Company
|
|
# ══════════════════════════════════════════
|
|
|
|
def parse_linkedin_source(source_url):
    """Extract (first_name, last_name, job_title) from a LinkedIn profile URL.

    Two slug shapes are handled:
      * scraper-generated "firstname-lastname-at-domain" slugs (email-like),
      * standard "firstname-lastname-<hexid>" profile slugs.

    The job title cannot be derived from the URL alone, so the third tuple
    element is always None (kept for interface stability with callers).
    Returns (None, None, None) when nothing can be extracted.

    Fix: the previous revision computed a dotted domain from the "-at-"
    suffix but never used it; that dead code is removed.
    """
    if not source_url or 'linkedin.com/in/' not in source_url:
        return None, None, None

    # Extract the profile slug, stopping at path/query/fragment boundaries.
    m = re.search(r'linkedin\.com/in/([^/?#]+)', source_url)
    if not m:
        return None, None, None
    slug = m.group(1)

    # Email-like slugs from the scraper: "firstname-lastname-at-domain".
    at_match = re.match(r'^(.+?)-at-(.+)$', slug)
    if at_match:
        name_parts = [p.capitalize()
                      for p in re.split(r'[.\-_]', at_match.group(1))
                      if p and len(p) > 1]
        if len(name_parts) >= 2:
            return name_parts[0], name_parts[-1], None
        if len(name_parts) == 1:
            return name_parts[0], None, None
        # No usable name tokens: fall through to the standard-slug parser.

    # Standard LinkedIn slug: firstname-lastname-<id>. Drop hex-looking id
    # fragments (6+ hex chars) and single characters.
    clean = [p for p in slug.split('-')
             if p and not re.match(r'^[0-9a-f]{6,}$', p) and len(p) > 1]

    if len(clean) >= 2:
        return clean[0].capitalize(), clean[1].capitalize(), None
    if len(clean) == 1:
        return clean[0].capitalize(), None, None
    return None, None, None
|
|
|
|
# ══════════════════════════════════════════
|
|
# METHOD 4: Domain → Company name heuristic
|
|
# ══════════════════════════════════════════
|
|
|
|
def domain_to_company(domain):
    """Guess company name from domain (for business domains only)."""
    # Freemail providers are not companies our contacts work for.
    if domain.lower() in FREEMAIL_DOMAINS:
        return None

    base = domain.split('.')[0]  # strip TLD, keep the leftmost label
    if len(base) < 3:
        return None

    # hyphen/underscore → space, break camelCase, then Title Case it.
    spaced = re.sub(r'[-_]', ' ', base)
    spaced = re.sub(r'([a-z])([A-Z])', r'\1 \2', spaced)
    company = spaced.strip().title()

    return company if len(company) >= 3 else None
|
|
|
|
# ══════════════════════════════════════════
|
|
# ISP Detection from email domain
|
|
# ══════════════════════════════════════════
|
|
|
|
# Email domain → canonical ISP brand name.
ISP_MAP = {
    't-online.de': 'T-Online', 'web.de': 'Web.de', 'gmx.de': 'GMX',
    'gmx.net': 'GMX', 'freenet.de': 'Freenet', 'arcor.de': 'Arcor',
    'gmail.com': 'Gmail', 'googlemail.com': 'Gmail',
    'hotmail.com': 'Hotmail', 'hotmail.de': 'Hotmail', 'hotmail.fr': 'Hotmail',
    'hotmail.co.uk': 'Hotmail', 'hotmail.ch': 'Hotmail',
    'outlook.com': 'Outlook', 'live.com': 'Outlook', 'live.de': 'Outlook',
    'yahoo.com': 'Yahoo', 'yahoo.de': 'Yahoo', 'yahoo.fr': 'Yahoo',
    'yahoo.co.uk': 'Yahoo',
    'videotron.ca': 'Videotron', 'bell.net': 'Bell', 'rogers.com': 'Rogers',
    'orange.fr': 'Orange', 'free.fr': 'Free', 'sfr.fr': 'SFR',
    'bluewin.ch': 'Bluewin',
    'comcast.net': 'Comcast', 'verizon.net': 'Verizon', 'att.net': 'AT&T',
    'aol.com': 'AOL', 'icloud.com': 'iCloud',
    'btinternet.com': 'BT', 'sky.com': 'Sky', 'ntlworld.com': 'NTL',
    'virgin.net': 'Virgin Media', 'talktalk.net': 'TalkTalk',
}


def detect_isp(email):
    """Detect ISP from email domain; None for unknown/business domains."""
    domain = email.rsplit('@', 1)[-1].lower()
    return ISP_MAP.get(domain)
|
|
|
|
# ══════════════════════════════════════════
|
|
# Country detection from domain TLD
|
|
# ══════════════════════════════════════════
|
|
|
|
# Domain suffix → country. Generic TLDs map to None (no signal).
TLD_COUNTRY = {
    '.de': 'Germany', '.fr': 'France', '.uk': 'United Kingdom', '.co.uk': 'United Kingdom',
    '.ca': 'Canada', '.ch': 'Switzerland', '.at': 'Austria', '.nl': 'Netherlands',
    '.be': 'Belgium', '.it': 'Italy', '.es': 'Spain', '.pt': 'Portugal',
    '.pl': 'Poland', '.cz': 'Czech Republic', '.se': 'Sweden', '.no': 'Norway',
    '.dk': 'Denmark', '.fi': 'Finland', '.ie': 'Ireland', '.ru': 'Russia',
    '.com': None, '.net': None, '.org': None,  # generic
    '.us': 'United States', '.au': 'Australia', '.nz': 'New Zealand',
    '.jp': 'Japan', '.br': 'Brazil', '.mx': 'Mexico',
    '.ma': 'Morocco', '.tn': 'Tunisia', '.dz': 'Algeria',
    '.ae': 'UAE', '.sa': 'Saudi Arabia', '.za': 'South Africa',
}

# Longest-suffix-first order, computed once at import time instead of
# re-sorting the table on every call (it was loop-invariant). Compound TLDs
# such as .co.uk must be tested before .uk/.com.
_TLDS_BY_LENGTH = sorted(TLD_COUNTRY.items(), key=lambda kv: -len(kv[0]))


def detect_country(email):
    """Detect likely country from email domain TLD; None when unknown/generic."""
    domain = email.split('@')[-1].lower()
    for tld, country in _TLDS_BY_LENGTH:
        # country is None for generic TLDs — skip those cheaply first.
        if country and domain.endswith(tld):
            return country
    return None
|
|
|
|
# ══════════════════════════════════════════
|
|
# MAIN ENRICHMENT PIPELINE
|
|
# ══════════════════════════════════════════
|
|
|
|
def enrich_contact(row):
    """Enrich a single contact using the combined methods.

    Expects a row dict with at least 'email' and 'source_url'. Returns a
    dict of column → value updates (possibly empty); never mutates the row.
    """
    updates = {}
    email = row.get('email', '')
    if not email:
        return updates

    source = row.get('source_url', '')
    domain = email.split('@')[-1].lower() if '@' in email else ''

    # Method 1: probable name from the email's local part.
    first, last = extract_name_from_email(email)
    if first:
        updates['full_name'] = f"{first} {last}" if last else first

    # Method 2: LinkedIn slug — overrides the email-derived name when present.
    ln_first, ln_last, _ln_job = parse_linkedin_source(source)
    if ln_first:
        updates['full_name'] = f"{ln_first} {ln_last}" if ln_last else ln_first

    # Method 3: company for business domains — WHOIS first, then a
    # heuristic guess from the domain name itself.
    if domain and domain not in FREEMAIL_DOMAINS:
        company = whois_lookup_company(domain) or domain_to_company(domain)
        if company:
            updates['company'] = company

    # ISP detection for freemail addresses. NOTE(review): the ISP is stored
    # in the revenue_range column — schema reuse inherited from the original.
    isp = detect_isp(email)
    if isp and not updates.get('company'):
        updates['revenue_range'] = isp

    # Country inferred from the domain's TLD.
    country = detect_country(email)
    if country:
        updates['location'] = country

    return updates
|
|
|
|
def run_enrichment(batch_size=100, limit=None, dry_run=False):
    """Run enrichment on all contacts missing metadata.

    Args:
        batch_size: number of buffered UPDATE statements per flush.
        limit: optional cap on the number of rows fetched.
        dry_run: when True, compute statistics but write nothing.

    Returns:
        dict with counters: total, enriched, names, companies, locations,
        errors.

    Fixes: limit is int()-coerced before SQL interpolation (injection
    guard); the WHOIS pre-cache loop no longer uses a bare except (Ctrl-C
    works again); the connection is closed via try/finally.
    """
    conn, cur = get_db()
    try:
        # Contacts that still lack a name are the enrichment candidates.
        sql = """SELECT id, email, source_url, full_name, company, location
                 FROM scrapping_results
                 WHERE (full_name IS NULL OR full_name = '')
                 ORDER BY id"""
        if limit:
            sql += f" LIMIT {int(limit)}"  # int() guards against SQL injection

        cur.execute(sql)
        rows = cur.fetchall()
        total = len(rows)

        print(f"[ENRICH] {total} contacts to enrich")

        enriched = 0
        whois_count = 0
        name_count = 0
        location_count = 0
        errors = 0

        # Collect unique business domains so each WHOIS lookup runs once.
        domains = set()
        for r in rows:
            if r['email'] and '@' in r['email']:
                d = r['email'].split('@')[-1].lower()
                if d not in FREEMAIL_DOMAINS:
                    domains.add(d)

        print(f"[WHOIS] {len(domains)} unique business domains to lookup")

        # Pre-fill the WHOIS cache (rate-limited; failures cached as None).
        for i, domain in enumerate(domains):
            if i % 10 == 0:
                print(f"  WHOIS {i}/{len(domains)}...")
            try:
                whois_lookup_company(domain)
                time.sleep(0.3)  # Rate limit
            except Exception:  # narrowed from bare except: keep Ctrl-C usable
                pass

        print(f"[WHOIS] Cache filled: {len(_whois_cache)} domains")

        # Enrich each contact, buffering UPDATEs and flushing per batch_size.
        batch = []

        def _flush():
            # Apply every buffered UPDATE and empty the buffer in place.
            for sets, vals in batch:
                cur.execute(
                    f"UPDATE scrapping_results SET {', '.join(sets)} WHERE id = %s",
                    vals)
            batch.clear()

        for i, row in enumerate(rows):
            if i % 500 == 0 and i > 0:
                print(f"  Processing {i}/{total}...")

            try:
                updates = enrich_contact(row)
                if updates:
                    enriched += 1
                    if 'full_name' in updates:
                        name_count += 1
                    if 'company' in updates:
                        whois_count += 1
                    if 'location' in updates:
                        location_count += 1

                    if not dry_run:
                        # Column values go through psycopg2 parameters; only
                        # the column names are interpolated (from our own keys).
                        sets = [f"{k} = %s" for k in updates]
                        vals = list(updates.values())
                        vals.append(row['id'])
                        batch.append((sets, vals))
                        if len(batch) >= batch_size:
                            _flush()
            except Exception as e:
                errors += 1
                if errors < 5:
                    print(f"  ERROR on {row.get('email','?')}: {e}")

        # Flush remaining batch
        if batch and not dry_run:
            _flush()
    finally:
        conn.close()

    print(f"\n{'='*50}")
    print(f"ENRICHMENT COMPLETE")
    print(f"{'='*50}")
    print(f"Total processed: {total}")
    print(f"Enriched: {enriched} ({enriched*100//max(total,1)}%)")
    print(f"  Names found: {name_count}")
    print(f"  Companies: {whois_count}")
    print(f"  Locations: {location_count}")
    print(f"Errors: {errors}")
    print(f"WHOIS cache: {len(_whois_cache)} domains")

    return {'total': total, 'enriched': enriched, 'names': name_count,
            'companies': whois_count, 'locations': location_count, 'errors': errors}
|
|
|
|
if __name__ == '__main__':
    # CLI: any bare integer argument is a row limit (last one wins, as
    # before); --dry-run computes stats without writing to the database.
    dry = '--dry-run' in sys.argv
    numeric_args = [int(a) for a in sys.argv[1:] if a.isdigit()]
    lim = numeric_args[-1] if numeric_args else None

    if dry:
        print("[DRY RUN] No database changes will be made")

    run_enrichment(limit=lim, dry_run=dry)
|