# from playwright.sync_api import sync_playwright
import asyncio
import copy
import logging
import os
import re
import time
import uuid
from collections import Counter
from functools import partial
from typing import Optional

from playwright.async_api import async_playwright, Playwright, Browser, BrowserContext, Page, Route


class RoutePage(object):
    """Install a route handler on a page for the duration of a ``with`` block.

    Use ``async with`` for a ``playwright.async_api`` page (the only kind this
    module creates) so that ``route``/``unroute`` are actually awaited. The
    plain ``with`` form is kept for synchronous pages.

    Args:
        page: Page on which the route is installed.
        route: URL pattern handed to ``page.route``.
        func: Handler invoked for matching requests.
    """

    def __init__(self, page: Page, route, func):
        self.page = page
        self.route = route
        self.func = func

    def __enter__(self):
        self.page.route(self.route, self.func)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.page.unroute(self.route, self.func)
        return False

    async def __aenter__(self):
        await self.page.route(self.route, self.func)
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Always remove the handler, even when the body raised or returned early.
        await self.page.unroute(self.route, self.func)
        return False


class Renderer:
    """Render CDF payloads to screenshots through pooled browser pages.

    ``run`` launches the browser and the worker coroutines; ``render_page``
    submits one payload through ``self.queue`` and waits for its screenshot.
    """

    # TODO: the URL decides the size of the canvas
    EDIT_URL = os.environ.get("CANVA_EDIT_URL", "")
    # Accessible label of the starred template that workers click to trigger
    # the template download which `_render` intercepts.
    TEMPLATE_LABEL = "Black Million Waves Logo"
    # Regex patterns of requests that are aborted on every worker page.
    BLOCKED_URL = [
        r"www.canva.com/_ajax/ae/createBatch.*",
        r"telemetry.canva.com/v1/traces.*",
        r"www.canva.com/_ajax/search/media/usage.*",
        r"www.canva.com/_ajax/reaction/reactions/summaries.*",
        r"www.canva.com/_ajax/search/templates.*",
        r"www.canva.com/_ajax/search/related-items.*",
        r"www.canva.com/_ajax/profile/v2/brands/summary.*",
        r"sentry.io/api/1766513/envelope.*",
        r"www.canva.com/cdn-cgi/rum.*",
        r"www.canva.com/_ajax/usage/struct.*",
        r"www.canva.com/_ajax/alerts.*",
    ]

    def __init__(self, storage_state: str = "me@xinyang.life.json", debug: bool = False, concurrent_workers: int = 1):
        """Store configuration; the browser itself is only launched by ``run``.

        Args:
            storage_state: Path passed as ``storage_state`` when the browser
                context is created.
            debug: When true, ``_new_page`` saves a screenshot of the freshly
                loaded editor to ``start_page.png``.
            concurrent_workers: Number of worker coroutines (one page each).
        """
        self.storage_state = storage_state
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        # Each item is (cdf payload, single-slot queue used to send the result back).
        self.queue: asyncio.Queue[tuple[bytes, asyncio.Queue]] = asyncio.Queue(maxsize=1000)
        self.debug = debug
        self.logger = logging.getLogger(__name__)
        self.concurrent_workers = concurrent_workers
        # worker_id -> page currently owned by that worker
        self.worker_pages: dict = {}

    async def render_page(self, cdf: bytes) -> Optional[bytes]:
        """Queue ``cdf`` for rendering and wait for the outcome.

        Returns:
            The screenshot bytes produced by a worker, or ``None`` when the
            worker reported a failure.
        """
        result: asyncio.Queue = asyncio.Queue(maxsize=1)
        await self.queue.put((cdf, result))
        return await result.get()

    async def worker_screenshot(self, worker_id: int) -> bytes:
        """Return a screenshot of the page currently held by ``worker_id``."""
        return await self.worker_pages[worker_id].screenshot()

    async def worker_content(self, worker_id: int) -> str:
        """Return the HTML content of the page currently held by ``worker_id``."""
        return await self.worker_pages[worker_id].content()

    async def run(self):
        """Launch Firefox, create the shared context and run all workers.

        Workers are started 15 seconds apart (``delay=worker_id * 15``). This
        coroutine only returns when the workers stop.
        """
        async with async_playwright() as p:
            self.browser = await p.firefox.launch()
            self.context = await self.browser.new_context(
                storage_state=self.storage_state,
                viewport={"width": 2000, "height": 2500},
            )
            workers = [
                self._workers(worker_id, delay=worker_id * 15)
                for worker_id in range(self.concurrent_workers)
            ]
            try:
                await asyncio.gather(*workers, return_exceptions=False)
            except asyncio.CancelledError:
                # NOTE(review): cancellation is logged and swallowed here, as
                # in the original code; callers do not see CancelledError.
                self.logger.error("a worker cancelled")

    async def _workers(self, worker_id: int, delay: float = 0):
        """Serve render requests from ``self.queue`` forever on one page.

        Args:
            worker_id: Key under which the worker's page is stored in
                ``self.worker_pages``.
            delay: Seconds to sleep before the page is created.
        """
        await asyncio.sleep(delay)
        page = await self._new_page()
        self.worker_pages[worker_id] = page
        self.logger.info(f"Created page in worker {worker_id}")
        logger = self.logger.getChild(f"worker-{worker_id}")
        # Won't reach memory limit on Singularity, so the periodic restart is
        # effectively disabled by this very large interval (in seconds).
        remain_time = 100000 * 60
        start_time = time.monotonic()
        while True:
            cdf, result = await self.queue.get()
            logger.info("Got cdf from queue")
            try:
                # Restart the page once `remain_time` has elapsed to avoid
                # memory leaks. Kept inside the guarded section so that a
                # failed restart answers the requester with None instead of
                # killing the worker and leaving the requester waiting.
                if time.monotonic() - start_time > remain_time:
                    await page.close()
                    page = await self._new_page()
                    self.worker_pages[worker_id] = page
                    logger.info(f"Restart page in worker {worker_id}")
                    start_time = time.monotonic()
                rendered = await self._render(page, cdf, logger=logger)
            except Exception as e:
                logger.error(f"Error: {e}")
                await result.put(None)
            else:
                await result.put(rendered)
                logger.info("Put result to queue")
            self.queue.task_done()

    async def _new_page(self):
        """Open the editor in a new page and prepare it for rendering.

        Side effects: stores the template locator in ``self.template_selector``
        and a baseline canvas screenshot in ``self.init_render``, installs the
        ``BLOCKED_URL`` routes and writes ``final_start_page.png``.

        Returns:
            The prepared page.
        """
        assert self.context is not None
        page = await self.context.new_page()
        # Disable websocket here!
        await page.add_init_script("WebSocket = () => {};")
        await page.goto(self.EDIT_URL)
        # Wait for search panel to be loaded
        await page.wait_for_timeout(10000)
        if self.debug:
            await page.screenshot(path="start_page.png")
        await page.get_by_role("tab", name="Starred").click()
        # make sure you got something in starred tab
        self.template_selector = page.get_by_label(self.TEMPLATE_LABEL).first
        await self.template_selector.click()
        await page.wait_for_timeout(5000)
        await page.locator("css=.HTh_Cg").first.click(force=True, position={"x": 0, "y": 0})
        await page.wait_for_timeout(50)
        # Baseline that `_render` compares against to detect an unchanged canvas.
        self.init_render = await page.locator("css=._2y_DBA").first.screenshot()
        await self.block(page, self.BLOCKED_URL)
        # await page.route(re.compile(r"https://www.canva.com/_online/.*"), lambda x: x.fulfill(status=200, body=b""))
        # await self.context.storage_state(path="me@xinyang.life.json")
        await page.screenshot(path="final_start_page.png")
        return page

    async def _render(self, page: Page, cdf: bytes, logger: Optional[logging.Logger] = None) -> Optional[bytes]:
        """Load ``cdf`` into the editor on ``page`` and screenshot the canvas.

        While the template is (re)opened, requests matching
        ``https://template.canva.com/.*\\.cdf`` are answered with ``cdf``.

        Args:
            page: A page prepared by ``_new_page``.
            cdf: Payload served in place of the template download.
            logger: Logger to use; defaults to ``self.logger``.

        Returns:
            Screenshot bytes of the canvas element, or ``None`` when the
            template could not be opened or the canvas equals the baseline.
        """
        transaction_id = uuid.uuid4()
        if logger is None:
            logger = self.logger
        logger.info("start rendering page")
        template = page.get_by_label(self.TEMPLATE_LABEL, exact=True)
        cdf_route = re.compile(r"https://template.canva.com/.*\.cdf")
        async with RoutePage(page, cdf_route, partial(self.handle, cdf)):
            try:
                await template.click()
            except Exception as e:
                logger.error(f"Exception({transaction_id}): {e} (trying to click continue editing)")
                await page.screenshot(path=f"{transaction_id}-error-open-template.png")
                try:
                    await page.get_by_text("continue editing").click()
                    await template.click()
                except Exception:
                    await page.get_by_text("continue editing").highlight()
                    logger.error(f"Exception({transaction_id}): no `continue editing` button")
                    await page.screenshot(path=f"{transaction_id}-error-continue-editing.png")
                    return None
            logger.info("template item opened")
            try:
                await page.get_by_role("button", name="Replace current page").click(timeout=5000)
                logger.info("Replace current page popup, clicked")
                await template.click()
            except Exception as e:
                # Best effort: the popup does not always appear. Only
                # `Exception` is caught so task cancellation still propagates.
                logger.debug(f"Replace current page popup not handled: {e}")
            # try:
            #     await page.locator("css=.yzhVgQ").nth(1).wait_for(state="attached")
            # except Exception as e:
            #     logger.error(f"Exception({transaction_id}): {e}")
            #     await page.screenshot(path=f"{transaction_id}-error-yzhVgQ.png")
            await page.wait_for_timeout(8000)
            await page.locator("css=.HTh_Cg").first.click(force=True, position={"x": 0, "y": 0})
            await page.wait_for_timeout(50)
            result = await page.locator("css=._2y_DBA").first.screenshot()
            logger.info("screenshot taken")
        # Assert the new page is different from init_render
        if result == self.init_render:
            logger.error(f"Exception({transaction_id}): result is the same as init_render")
            return None
        return result

    async def block(self, page: Page, block_list=None):
        """Abort every request on ``page`` whose URL matches a pattern.

        Args:
            page: Page on which the routes are registered.
            block_list: Iterable of regex strings; ``None`` means no patterns.
        """

        async def log_abort(route: Route):
            # self.logger.info(f"BLOCKED {route.request.url}")
            await route.abort()

        for url in block_list or ():
            await page.route(re.compile(url), log_abort)

    async def unblock(self, page: Page, block_list=None):
        """Register pass-through routes on ``page`` for the given patterns.

        Args:
            page: Page on which the routes are registered.
            block_list: Iterable of regex strings; ``None`` means no patterns.
        """
        for url in block_list or ():
            await page.route(re.compile(url), lambda x: x.continue_())

    async def handle(self, cdf: bytes, route: Route):
        """Fulfill ``route`` with ``cdf`` as the response body."""
        self.logger.info(f"HIJACKED {route.request.url}")
        await route.fulfill(body=cdf)