#!/usr/bin/env python3 # V98 Doctrine #100 - LinkedIn browser automation via Playwright # Piloted by WEVIA Master chat. No API token required - uses persistent cookie session. import asyncio, json, sys, os, time from pathlib import Path QUEUE = Path('/opt/weval-l99/linkedin-post-queue.jsonl') PUBLISHED = Path('/opt/weval-l99/linkedin-published.jsonl') SCHEDULED = Path('/opt/weval-l99/linkedin-scheduled.jsonl') SESSION_DIR = Path('/opt/weval-l99/browser-sessions/linkedin') SCREENSHOT_DIR = Path('/var/www/html/api/playwright-results/v98-linkedin-publish') SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True) LOG = Path('/var/log/v98-linkedin-browser.log') def log(msg): LOG.open('a').write(f"[{time.strftime('%Y-%m-%dT%H:%M:%S')}] {msg}\n") print(msg) def entries(f): if not f.exists(): return [] out=[] for line in f.read_text().splitlines(): if line.strip(): try: out.append(json.loads(line)) except: pass return out def write_all(f, entries): f.write_text('\n'.join(json.dumps(e) for e in entries) + ('\n' if entries else '')) async def check_login_and_publish(post_content, post_id): from playwright.async_api import async_playwright ts = time.strftime('%Y%m%d-%H%M%S') async with async_playwright() as p: ctx = await p.chromium.launch_persistent_context( str(SESSION_DIR), headless=True, viewport={'width': 1400, 'height': 900}, args=['--disable-blink-features=AutomationControlled'], user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', ) page = await ctx.new_page() try: # Try direct navigation to page admin await page.goto('https://www.linkedin.com/company/69533182/admin/page-posts/published/', timeout=25000, wait_until='domcontentloaded') await page.wait_for_timeout(3000) # Check login status url = page.url screenshot_path = SCREENSHOT_DIR / f'{post_id}-{ts}-01-init.png' await page.screenshot(path=str(screenshot_path)) if 'login' in url or 'checkpoint' in url or 'uas/login' in url: log(f"LOGIN_WALL for {post_id}: url={url}") return {'ok': False, 'err': 'LINKEDIN_SESSION_EXPIRED', 'url': url, 'screenshot': str(screenshot_path), 'action_required': 'Yacine one-time login via /api/v98-linkedin-login.php'} # Click "Start a post" button (or similar variant) clicked = False for sel in [ 'button:has-text("Start a post")', 'button:has-text("Commencer un post")', 'button:has-text("Start")', '[aria-label*="post"]', '.share-box-feed-entry__trigger', ]: try: await page.wait_for_selector(sel, timeout=4000) await page.click(sel) clicked = True break except: continue if not clicked: await page.screenshot(path=str(SCREENSHOT_DIR / f'{post_id}-{ts}-02-nobutton.png')) log(f"NO_START_POST_BUTTON for {post_id}") return {'ok': False, 'err': 'START_POST_BUTTON_NOT_FOUND', 'screenshot': str(screenshot_path)} await page.wait_for_timeout(3000) await page.screenshot(path=str(SCREENSHOT_DIR / f'{post_id}-{ts}-03-editor.png')) # Type post content into editor typed = False for sel in [ '.ql-editor', '[role="textbox"]', 'div[contenteditable="true"]', ]: try: await page.wait_for_selector(sel, timeout=4000) await page.fill(sel, post_content) typed = True break except: try: await page.wait_for_selector(sel, timeout=2000) await page.click(sel) await page.keyboard.type(post_content, delay=20) typed = True break except: continue if not typed: log(f"CANT_TYPE for {post_id}") return {'ok': False, 'err': 'EDITOR_NOT_FOUND'} await page.wait_for_timeout(2000) await page.screenshot(path=str(SCREENSHOT_DIR / f'{post_id}-{ts}-04-typed.png')) # Click "Post" / "Publier" button posted = False for sel in [ 'button:has-text("Post")', 'button:has-text("Publier")', 'button.share-actions__primary-action', '[aria-label*="Post"]', ]: try: await page.wait_for_selector(sel, timeout=4000) await page.click(sel) posted = True break except: continue if not posted: log(f"NO_POST_BUTTON for {post_id}") return {'ok': False, 'err': 'POST_BUTTON_NOT_FOUND'} await page.wait_for_timeout(6000) await page.screenshot(path=str(SCREENSHOT_DIR / f'{post_id}-{ts}-05-published.png')) log(f"PUBLISHED {post_id} via browser") return {'ok': True, 'via': 'browser_playwright', 'post_id': post_id, 'screenshot_proof': str(SCREENSHOT_DIR / f'{post_id}-{ts}-05-published.png')} except Exception as e: log(f"ERROR {post_id}: {e}") return {'ok': False, 'err': str(e)[:200]} finally: await ctx.close() async def main(): action = sys.argv[1] if len(sys.argv) > 1 else 'publish_due' results = {'action': action, 'results': []} if action == 'check_session': from playwright.async_api import async_playwright async with async_playwright() as p: ctx = await p.chromium.launch_persistent_context(str(SESSION_DIR), headless=True) page = await ctx.new_page() await page.goto('https://www.linkedin.com/feed/', timeout=20000, wait_until='domcontentloaded') await page.wait_for_timeout(3000) url = page.url logged = 'login' not in url and 'checkpoint' not in url await page.screenshot(path=str(SCREENSHOT_DIR / 'session-check.png')) await ctx.close() results = {'logged_in': logged, 'url': url, 'session_dir': str(SESSION_DIR)} elif action == 'publish_due': # Find scheduled posts due scheduled = entries(SCHEDULED) remaining = [] published_list = entries(PUBLISHED) now = time.time() for p in scheduled: due = time.mktime(time.strptime(p.get('scheduled_at', '2099-01-01T00:00:00')[:19], '%Y-%m-%dT%H:%M:%S')) if due <= now: r = await check_login_and_publish(p.get('post', ''), p.get('id', 'unk')) results['results'].append({'id': p['id'], **r}) if r.get('ok'): p['published_at'] = time.strftime('%Y-%m-%dT%H:%M:%S') p['published_via'] = 'browser_playwright_v98' p['status'] = 'published' p['screenshot'] = r.get('screenshot_proof', '') published_list.append(p) else: p['status'] = 'publish_failed' p['last_error'] = r.get('err', '') remaining.append(p) else: remaining.append(p) write_all(SCHEDULED, remaining) write_all(PUBLISHED, published_list) elif action == 'publish_id': target_id = sys.argv[2] if len(sys.argv) > 2 else '' queue = entries(QUEUE) for p in queue: if p.get('id') == target_id: r = await check_login_and_publish(p.get('post', ''), target_id) results['results'].append({'id': target_id, **r}) if r.get('ok'): p['published_at'] = time.strftime('%Y-%m-%dT%H:%M:%S') p['published_via'] = 'browser_playwright_v98' p['status'] = 'published' p['screenshot'] = r.get('screenshot_proof', '') published = entries(PUBLISHED) published.append(p) write_all(PUBLISHED, published) queue = [q for q in queue if q.get('id') != target_id] write_all(QUEUE, queue) break print(json.dumps(results, indent=2)) if __name__ == '__main__': asyncio.run(main())