195 lines
8.2 KiB
Python
195 lines
8.2 KiB
Python
|
# from playwright.sync_api import sync_playwright
|
||
|
from playwright.async_api import async_playwright, Playwright, Browser, BrowserContext, Page, Route
|
||
|
from collections import Counter
|
||
|
from functools import partial
|
||
|
from typing import Optional
|
||
|
import os
|
||
|
import re
|
||
|
import time
|
||
|
import logging
|
||
|
import uuid
|
||
|
import asyncio
|
||
|
import copy
|
||
|
|
||
|
class RoutePage(object):
    """Install a route handler on a page for the duration of a block.

    On entry the handler ``func`` is registered for ``route`` via
    ``page.route``; on exit it is removed again via ``page.unroute``,
    whether or not the block raised.

    Two protocols are offered:

    * ``with RoutePage(...)`` for a page whose ``route``/``unroute`` are
      plain (blocking) calls.
    * ``async with RoutePage(...)`` for a page whose ``route``/``unroute``
      return awaitables, as with the async Playwright ``Page`` imported at
      the top of this file.

    Args:
        page: Object exposing ``route(route, func)`` and ``unroute(route)``.
        route: URL matcher passed through unchanged to ``page.route`` and
            ``page.unroute``.
        func: Handler passed through unchanged to ``page.route``.
    """

    def __init__(self, page: "Page", route, func):
        self.page = page
        # Both values are read by the enter/exit methods below, so they
        # must be kept on the instance.
        self.route = route
        self.func = func

    def __enter__(self):
        self.page.route(self.route, self.func)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.page.unroute(self.route)
        # Never suppress an exception raised inside the block.
        return False

    async def __aenter__(self):
        await self.page.route(self.route, self.func)
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.page.unroute(self.route)
        # Never suppress an exception raised inside the block.
        return False
|
||
|
|
||
|
class Renderer:
    """Render CDF payloads to screenshots by driving the Canva editor.

    ``run()`` launches Firefox through Playwright and starts
    ``concurrent_workers`` worker coroutines, each owning one editor page.
    Callers submit work with ``render_page()``, which enqueues the payload
    together with a one-slot reply queue; a worker picks it up, serves the
    payload in place of the template's ``.cdf`` download, screenshots the
    canvas and sends the image (or ``None`` on failure) back.
    """

    # TODO: URL decide the size of canvas
    EDIT_URL = os.environ.get("CANVA_EDIT_URL", "")

    # Regex sources for requests that are aborted on every worker page
    # (see ``block``). Each entry is compiled with ``re.compile``.
    BLOCKED_URL = [
        r"www.canva.com/_ajax/ae/createBatch.*",
        r"telemetry.canva.com/v1/traces.*",
        r"www.canva.com/_ajax/search/media/usage.*",
        r"www.canva.com/_ajax/reaction/reactions/summaries.*",
        r"www.canva.com/_ajax/search/templates.*",
        r"www.canva.com/_ajax/search/related-items.*",
        r"www.canva.com/_ajax/profile/v2/brands/summary.*",
        r"sentry.io/api/1766513/envelope.*",
        r"www.canva.com/cdn-cgi/rum.*",
        r"www.canva.com/_ajax/usage/struct.*",
        r"www.canva.com/_ajax/alerts.*",
    ]

    def __init__(self, storage_state: str = "me@xinyang.life.json", debug: bool = False, concurrent_workers: int = 1):
        """Store configuration; no browser is started until ``run()``.

        Args:
            storage_state: Passed as ``storage_state`` to
                ``browser.new_context`` in ``run()``.
            debug: When true, ``_new_page`` saves ``start_page.png``.
            concurrent_workers: Number of worker coroutines ``run()`` starts.
        """
        self.storage_state = storage_state
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        # Work items are (cdf payload, reply queue) pairs.
        self.queue: asyncio.Queue[tuple[bytes, asyncio.Queue]] = asyncio.Queue(maxsize=1000)
        self.debug = debug
        self.logger = logging.getLogger(__name__)
        self.concurrent_workers = concurrent_workers
        # worker_id -> the page currently owned by that worker.
        self.worker_pages = dict()

    async def render_page(self, cdf: bytes) -> Optional[bytes]:
        """Queue ``cdf`` for rendering and wait for the result.

        Returns:
            The screenshot bytes produced by a worker, or ``None`` when the
            worker reported a failure.
        """
        reply: asyncio.Queue = asyncio.Queue(maxsize=1)
        await self.queue.put((cdf, reply))
        return await reply.get()

    async def worker_screenshot(self, worker_id: int) -> bytes:
        """Return a screenshot of the page currently owned by ``worker_id``.

        Raises:
            KeyError: If no page is registered for ``worker_id``.
        """
        return await self.worker_pages[worker_id].screenshot()

    async def worker_content(self, worker_id: int) -> str:
        """Return ``page.content()`` for the page owned by ``worker_id``.

        Raises:
            KeyError: If no page is registered for ``worker_id``.
        """
        return await self.worker_pages[worker_id].content()

    async def run(self):
        """Launch Firefox, create the shared context and run all workers.

        Worker ``i`` is started with a delay of ``i * 15`` seconds. The
        call only returns when the workers finish; a cancellation is
        logged and swallowed.
        """
        async with async_playwright() as p:
            self.browser = await p.firefox.launch()
            self.context = await self.browser.new_context(
                storage_state=self.storage_state,
                viewport={"width": 2000, "height": 2500},
            )
            workers = [
                self._workers(worker_id, delay=worker_id * 15)
                for worker_id in range(self.concurrent_workers)
            ]
            try:
                await asyncio.gather(*workers, return_exceptions=False)
            except asyncio.CancelledError:
                self.logger.error("a worker cancelled")

    async def _workers(self, worker_id: int, delay: float = 0):
        """Worker loop: own one page and serve render requests forever.

        Args:
            worker_id: Key under which the page is stored in
                ``worker_pages``; also used in the child logger name.
            delay: Seconds to sleep before creating the page.
        """
        await asyncio.sleep(delay)
        page = await self._new_page()
        self.worker_pages[worker_id] = page
        self.logger.info(f"Created page in worker {worker_id}")
        logger = self.logger.getChild(f"worker-{worker_id}")
        # Page-recycling interval in seconds. The value is so large that
        # recycling is effectively disabled (original note: won't reach the
        # memory limit on Singularity).
        remain_time = 100000 * 60
        start_time = time.time()
        while True:
            cdf, result = await self.queue.get()
            logger.info("Got cdf from queue")
            # Recycle the page once it has been alive longer than
            # remain_time, to guard against memory growth.
            if time.time() - start_time > remain_time:
                await page.close()
                page = await self._new_page()
                self.worker_pages[worker_id] = page
                logger.info(f"Restart page in worker {worker_id}")
                start_time = time.time()
            try:
                await result.put(await self._render(page, cdf, logger=logger))
                logger.info("Put result to queue")
            except Exception as e:
                logger.error(f"Error: {e}")
                # Always answer so the caller in render_page is not left
                # waiting forever.
                await result.put(None)
            self.queue.task_done()

    async def _capture_canvas(self, page: "Page") -> bytes:
        """Click the first ``.HTh_Cg`` element's corner and screenshot the first ``._2y_DBA``."""
        await page.locator("css=.HTh_Cg").first.click(force=True, position={"x": 0, "y": 0})
        await page.wait_for_timeout(50)
        return await page.locator("css=._2y_DBA").first.screenshot()

    async def _new_page(self):
        """Open and prepare a fresh editor page in the shared context.

        Navigates to ``EDIT_URL``, opens the "Starred" tab, clicks the
        template item, records the resulting canvas screenshot in
        ``self.init_render`` and installs the ``BLOCKED_URL`` aborts.

        Returns:
            The prepared page.
        """
        assert self.context is not None
        page = await self.context.new_page()

        # Disable websocket here!
        await page.add_init_script("WebSocket = () => {};")

        await page.goto(self.EDIT_URL)

        # Wait for search panel to be loaded
        await page.wait_for_timeout(10000)

        if self.debug:
            await page.screenshot(path="start_page.png")

        await page.get_by_role("tab", name="Starred").click()
        # make sure you got something in starred tab
        self.template_selector = page.get_by_label("Black Million Waves Logo").first
        await self.template_selector.click()
        await page.wait_for_timeout(5000)

        # Baseline render; _render compares against it to detect a page
        # that did not change.
        self.init_render = await self._capture_canvas(page)

        await self.block(page, self.BLOCKED_URL)
        await page.screenshot(path="final_start_page.png")
        return page

    async def _open_template(self, page: "Page", transaction_id, logger: logging.Logger) -> bool:
        """Click the template item, retrying once via "continue editing".

        Returns:
            ``True`` if a click on the template item succeeded, ``False``
            if both attempts failed (error screenshots are saved).
        """
        try:
            await page.get_by_label("Black Million Waves Logo", exact=True).click()
            return True
        except Exception as e:
            logger.error(f"Exception({transaction_id}): {e} (trying to click continue editing)")
            await page.screenshot(path=f"{transaction_id}-error-open-template.png")

        try:
            await page.get_by_text("continue editing").click()
            await page.get_by_label("Black Million Waves Logo", exact=True).click()
            return True
        except Exception:
            await page.get_by_text("continue editing").highlight()
            logger.error(f"Exception({transaction_id}): no `continue editing` button")
            await page.screenshot(path=f"{transaction_id}-error-continue-editing.png")
            return False

    async def _render(self, page: "Page", cdf: bytes, logger: Optional[logging.Logger] = None) -> Optional[bytes]:
        """Render ``cdf`` on ``page`` and return a screenshot of the canvas.

        While rendering, requests matching
        ``https://template.canva.com/.*\\.cdf`` are answered with ``cdf``
        (see ``handle``); the route is removed again before returning.

        Args:
            page: A page prepared by ``_new_page``.
            cdf: Payload served in place of the template download.
            logger: Logger to use; defaults to ``self.logger``.

        Returns:
            Screenshot bytes, or ``None`` if the template could not be
            opened or the screenshot equals ``self.init_render``.
        """
        transaction_id = uuid.uuid4()
        if logger is None:
            logger = self.logger
        logger.info("start rendering page")

        # The async Page API returns awaitables from route/unroute, so the
        # handler is installed and removed explicitly with await.
        template_route = re.compile(r"https://template.canva.com/.*\.cdf")
        await page.route(template_route, partial(self.handle, cdf))
        try:
            if not await self._open_template(page, transaction_id, logger):
                return None
            logger.info("template item opened")

            try:
                await page.get_by_role("button", name="Replace current page").click(timeout=5000)
                logger.info("Replace current page popup, clicked")
                await page.get_by_label("Black Million Waves Logo", exact=True).click()
            except Exception as e:
                # Best effort: the click above is allowed to fail (e.g. it
                # times out after 5 s). Exception (not a bare except) is
                # used so task cancellation still propagates.
                logger.debug(f"Replace current page step skipped: {e}")

            # Fixed delay before capturing; presumably gives the editor
            # time to load the served payload — TODO confirm.
            await page.wait_for_timeout(8000)

            result = await self._capture_canvas(page)
            logger.info("screenshot taken")

            # Assert the new page is different from init_render
            if result == self.init_render:
                logger.error(f"Exception({transaction_id}): result is the same as init_render")
                return None

            return result
        finally:
            await page.unroute(template_route)

    async def block(self, page: "Page", block_list=()):
        """Abort every request on ``page`` matching a pattern in ``block_list``.

        Args:
            page: Page to install the routes on.
            block_list: Iterable of regex source strings.
        """
        async def log_abort(route: "Route"):
            await route.abort()

        for url in block_list:
            await page.route(re.compile(url), log_abort)

    async def unblock(self, page: "Page", block_list=()):
        """Register a pass-through handler for each pattern in ``block_list``.

        This adds a handler calling ``continue_()``; it does not remove
        handlers installed by ``block``.
        NOTE(review): relies on Playwright running the most recently
        registered matching handler first — confirm.

        Args:
            page: Page to install the routes on.
            block_list: Iterable of regex source strings.
        """
        for url in block_list:
            await page.route(re.compile(url), lambda x: x.continue_())

    async def handle(self, cdf: bytes, route: "Route"):
        """Route handler: log the request URL and fulfill it with ``cdf``."""
        self.logger.info(f"HIJACKED {route.request.url}")
        await route.fulfill(body=cdf)
|