canva-render/render.py
2023-12-19 14:39:13 +08:00

194 lines
8.2 KiB
Python

# from playwright.sync_api import sync_playwright
from playwright.async_api import async_playwright, Playwright, Browser, BrowserContext, Page, Route
from collections import Counter
from functools import partial
from typing import Optional
import os
import re
import time
import logging
import uuid
import asyncio
import copy
class RoutePage(object):
    """Install a request route on a page for the duration of a ``with`` block.

    On entry the handler ``func`` is registered for ``route`` through
    ``page.route``; on exit it is removed again through ``page.unroute``,
    whether or not the block raised.

    Two protocols are offered because Playwright ships two APIs:

    * ``with RoutePage(...)`` calls ``page.route`` / ``page.unroute`` directly
      and is only correct for pages whose methods are synchronous.
    * ``async with RoutePage(...)`` awaits both calls and must be used for
      pages whose ``route`` / ``unroute`` are coroutines.

    Args:
        page: The page object to route on.
        route: URL matcher forwarded unchanged to ``page.route``.
        func: Route handler forwarded unchanged to ``page.route``.
    """

    def __init__(self, page: "Page", route, func):
        self.page = page
        # Both values are needed again on entry and exit, so keep them.
        self.route = route
        self.func = func

    def __enter__(self):
        self.page.route(self.route, self.func)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.page.unroute(self.route)
        # Never suppress an exception raised inside the block.
        return False

    async def __aenter__(self):
        await self.page.route(self.route, self.func)
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.page.unroute(self.route)
        return False
class Renderer:
    """Render CDF payloads to screenshots using a pool of browser pages.

    ``run()`` launches Firefox through Playwright, opens one page per worker
    and keeps the workers consuming ``self.queue``. ``render_page()`` is the
    client entry point: it enqueues a payload and waits for the worker's
    answer.

    A render works by hijacking the template download: while a render is in
    progress, every request matching ``TEMPLATE_CDF_PATTERN`` is fulfilled
    with the caller's bytes instead of being sent to the network, then the
    starred template is clicked and the resulting canvas is screenshotted.
    """

    # TODO: URL decide the size of canvas
    EDIT_URL = os.environ.get("CANVA_EDIT_URL", "")

    # Accessible label of the starred template that is clicked to trigger
    # the (hijacked) template download.
    TEMPLATE_LABEL = "Black Million Waves Logo"

    # Requests matching this pattern are answered with the caller's payload.
    TEMPLATE_CDF_PATTERN = re.compile(r"https://template.canva.com/.*\.cdf")

    # Element clicked at its top-left corner before every capture.
    # NOTE(review): presumably this clears the current selection - confirm.
    CLICK_SELECTOR = "css=.HTh_Cg"

    # Element whose screenshot is returned as the render result.
    RENDER_SELECTOR = "css=._2y_DBA"

    # Regexes of requests that are aborted on every worker page.
    BLOCKED_URL = [
        r"www.canva.com/_ajax/ae/createBatch.*",
        r"telemetry.canva.com/v1/traces.*",
        r"www.canva.com/_ajax/search/media/usage.*",
        # A missing comma here used to merge this entry with the next one.
        r"www.canva.com/_ajax/reaction/reactions/summaries.*",
        r"www.canva.com/_ajax/search/templates.*",
        r"www.canva.com/_ajax/search/related-items.*",
        r"www.canva.com/_ajax/profile/v2/brands/summary.*",
        r"sentry.io/api/1766513/envelope.*",
        r"www.canva.com/cdn-cgi/rum.*",
        r"www.canva.com/_ajax/usage/struct.*",
        r"www.canva.com/_ajax/alerts.*",
    ]

    def __init__(self, storage_state: str = "me@xinyang.life.json", debug: bool = False, concurrent_workers: int = 1):
        """Store configuration; no browser is started until ``run()``.

        Args:
            storage_state: Passed to ``Browser.new_context(storage_state=...)``.
            debug: When true, ``_new_page`` saves ``start_page.png``.
            concurrent_workers: Number of worker pages ``run()`` starts.
        """
        self.storage_state = storage_state
        self.browser: Optional["Browser"] = None
        self.context: Optional["BrowserContext"] = None
        # Items are (payload, reply_queue); the worker puts exactly one
        # answer on reply_queue.
        self.queue: "asyncio.Queue[tuple[bytes, asyncio.Queue]]" = asyncio.Queue(maxsize=1000)
        self.debug = debug
        self.logger = logging.getLogger(__name__)
        self.concurrent_workers = concurrent_workers
        self.worker_pages = dict()
        # Both are (re)assigned by every _new_page() call.
        self.template_selector = None
        self.init_render: Optional[bytes] = None

    async def render_page(self, cdf: bytes) -> Optional[bytes]:
        """Queue ``cdf`` for rendering and wait for the worker's answer.

        Returns:
            The screenshot bytes, or ``None`` when the worker reported a
            failure.
        """
        reply: asyncio.Queue = asyncio.Queue(maxsize=1)
        await self.queue.put((cdf, reply))
        # bytes are immutable, so the answer can be handed out as-is.
        return await reply.get()

    async def worker_screenshot(self, worker_id: int) -> bytes:
        """Return a screenshot of the page owned by worker ``worker_id``.

        Raises:
            KeyError: If that worker has not created its page yet.
        """
        return await self.worker_pages[worker_id].screenshot()

    async def worker_content(self, worker_id: int) -> str:
        """Return ``page.content()`` of the page owned by worker ``worker_id``.

        Raises:
            KeyError: If that worker has not created its page yet.
        """
        return await self.worker_pages[worker_id].content()

    async def run(self):
        """Launch the browser and run all workers until they stop.

        Worker ``i`` is started with a delay of ``i * 15`` seconds. An
        exception raised by a worker propagates out of this method;
        cancellation is logged and swallowed.
        """
        async with async_playwright() as p:
            self.browser = await p.firefox.launch()
            self.context = await self.browser.new_context(
                storage_state=self.storage_state,
                viewport={"width": 2000, "height": 2500},
            )
            workers = [
                self._workers(worker_id, delay=worker_id * 15)
                for worker_id in range(self.concurrent_workers)
            ]
            try:
                await asyncio.gather(*workers, return_exceptions=False)
            except asyncio.CancelledError:
                self.logger.error("a worker cancelled")

    async def _workers(self, worker_id: int, delay: float = 0):
        """Worker loop: take payloads from the queue and answer each one.

        Args:
            worker_id: Key under which the worker's page is published in
                ``self.worker_pages``.
            delay: Seconds to sleep before the page is created.
        """
        await asyncio.sleep(delay)
        page = await self._new_page()
        self.worker_pages[worker_id] = page
        self.logger.info(f"Created page in worker {worker_id}")
        logger = self.logger.getChild(f"worker-{worker_id}")
        remain_time = 100000 * 60  # Won't reach memory limit on Singularity, so disable this
        # Monotonic clock: the elapsed time must not jump with wall-clock changes.
        start_time = time.monotonic()
        while True:
            cdf, result = await self.queue.get()
            logger.info("Got cdf from queue")
            # Recreate the page once it has lived longer than remain_time
            # (guards against memory growth of a long-lived page).
            if time.monotonic() - start_time > remain_time:
                await page.close()
                page = await self._new_page()
                self.worker_pages[worker_id] = page
                logger.info(f"Restart page in worker {worker_id}")
                start_time = time.monotonic()
            try:
                rendered = await self._render(page, cdf, logger=logger)
            except Exception as e:
                logger.error(f"Error: {e}")
                rendered = None
            # Exactly one answer per request, success or failure.
            await result.put(rendered)
            if rendered is not None:
                logger.info("Put result to queue")
            self.queue.task_done()

    async def _new_page(self):
        """Open and prepare a page, recording its initial canvas render.

        Side effects: assigns ``self.template_selector`` and
        ``self.init_render``, installs the ``BLOCKED_URL`` routes on the page
        and writes ``final_start_page.png`` (plus ``start_page.png`` in debug
        mode).

        Raises:
            RuntimeError: If no browser context exists yet.
        """
        if self.context is None:
            raise RuntimeError("browser context is not initialised; call run() first")
        page = await self.context.new_page()
        # Disable websocket here!
        await page.add_init_script("WebSocket = () => {};")
        await page.goto(self.EDIT_URL)
        # Wait for search panel to be loaded
        await page.wait_for_timeout(10000)
        if self.debug:
            await page.screenshot(path="start_page.png")
        await page.get_by_role("tab", name="Starred").click()
        # make sure you got something in starred tab
        self.template_selector = page.get_by_label(self.TEMPLATE_LABEL).first
        await self.template_selector.click()
        await page.wait_for_timeout(5000)
        # Baseline used by _render to detect a render that changed nothing.
        self.init_render = await self._capture(page)
        await self.block(page, self.BLOCKED_URL)
        await page.screenshot(path="final_start_page.png")
        return page

    async def _capture(self, page: "Page") -> bytes:
        """Click ``CLICK_SELECTOR`` at (0, 0), wait 50 ms, screenshot ``RENDER_SELECTOR``."""
        await page.locator(self.CLICK_SELECTOR).first.click(force=True, position={"x": 0, "y": 0})
        await page.wait_for_timeout(50)
        return await page.locator(self.RENDER_SELECTOR).first.screenshot()

    async def _open_template(self, page: "Page", transaction_id, logger: logging.Logger) -> bool:
        """Click the template, retrying once through the "continue editing" button.

        Returns:
            ``True`` when the template was clicked, ``False`` when both
            attempts failed (error screenshots are saved in that case).
        """
        try:
            await page.get_by_label(self.TEMPLATE_LABEL, exact=True).click()
            return True
        except Exception as e:
            logger.error(f"Exception({transaction_id}): {e} (trying to click continue editing)")
            await page.screenshot(path=f"{transaction_id}-error-open-template.png")
        try:
            await page.get_by_text("continue editing").click()
            await page.get_by_label(self.TEMPLATE_LABEL, exact=True).click()
            return True
        except Exception:
            await page.get_by_text("continue editing").highlight()
            logger.error(f"Exception({transaction_id}): no `continue editing` button")
            await page.screenshot(path=f"{transaction_id}-error-continue-editing.png")
            return False

    async def _render(self, page: "Page", cdf: bytes, logger: Optional[logging.Logger] = None) -> Optional[bytes]:
        """Render ``cdf`` on ``page`` and return the canvas screenshot.

        While the render runs, requests matching ``TEMPLATE_CDF_PATTERN`` are
        fulfilled with ``cdf``; the route is always removed afterwards.

        Args:
            page: Worker page prepared by ``_new_page``.
            cdf: Bytes served as the body of the hijacked template request.
            logger: Logger to use; defaults to ``self.logger``.

        Returns:
            The screenshot bytes, or ``None`` when the template could not be
            opened or the screenshot equals the initial render.
        """
        transaction_id = uuid.uuid4()
        if logger is None:
            logger = self.logger
        logger.info("start rendering page")
        handler = partial(self.handle, cdf)
        # The page is an async Playwright page, so the route must be awaited
        # to actually be installed before the template is clicked.
        await page.route(self.TEMPLATE_CDF_PATTERN, handler)
        try:
            if not await self._open_template(page, transaction_id, logger):
                return None
            logger.info("template item opened")
            try:
                await page.get_by_role("button", name="Replace current page").click(timeout=5000)
                logger.info("Replace current page popup, clicked")
                await page.get_by_label(self.TEMPLATE_LABEL, exact=True).click()
            except Exception as e:
                # Best effort: the popup is optional. Catching Exception (not
                # a bare except) lets task cancellation propagate.
                logger.debug(f"Replace current page step skipped: {e}")
            await page.wait_for_timeout(8000)
            result = await self._capture(page)
            logger.info("screenshot taken")
        finally:
            await page.unroute(self.TEMPLATE_CDF_PATTERN, handler)
        # Assert the new page is different from init_render
        if result == self.init_render:
            logger.error(f"Exception({transaction_id}): result is the same as init_render")
            return None
        return result

    async def block(self, page: "Page", block_list=None):
        """Abort every request on ``page`` whose URL matches a regex in ``block_list``.

        ``None`` (the default) and an empty list both install nothing.
        """
        async def log_abort(route: "Route"):
            # self.logger.info(f"BLOCKED {route.request.url}")
            await route.abort()

        for url in block_list or ():
            await page.route(re.compile(url), log_abort)

    async def unblock(self, page: "Page", block_list=None):
        """Register a pass-through route for every regex in ``block_list``.

        The existing abort routes are not removed; a handler that continues
        the request is added for the same pattern.
        """
        for url in block_list or ():
            await page.route(re.compile(url), lambda x: x.continue_())

    async def handle(self, cdf: bytes, route: "Route"):
        """Route handler: log the intercepted URL and fulfil it with ``cdf``."""
        self.logger.info(f"HIJACKED {route.request.url}")
        await route.fulfill(body=cdf)