support multi tabs (#692)

This commit is contained in:
LawyZheng
2024-08-09 10:46:52 +08:00
committed by GitHub
parent e5c7d8b4dd
commit 6c0f94ac0a
5 changed files with 126 additions and 90 deletions

View File

@@ -105,17 +105,18 @@ class ForgeAgent:
task_url = task_block.url task_url = task_block.url
if task_url is None: if task_url is None:
browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(workflow_run=workflow_run) browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(workflow_run=workflow_run)
if not browser_state.page: working_page = await browser_state.get_working_page()
if not working_page:
LOG.error( LOG.error(
"BrowserState has no page", "BrowserState has no page",
workflow_run_id=workflow_run.workflow_run_id, workflow_run_id=workflow_run.workflow_run_id,
) )
raise MissingBrowserStatePage(workflow_run_id=workflow_run.workflow_run_id) raise MissingBrowserStatePage(workflow_run_id=workflow_run.workflow_run_id)
if browser_state.page.url == "about:blank": if working_page.url == "about:blank":
raise InvalidWorkflowTaskURLState(workflow_run.workflow_run_id) raise InvalidWorkflowTaskURLState(workflow_run.workflow_run_id)
task_url = browser_state.page.url task_url = working_page.url
task = await app.DATABASE.create_task( task = await app.DATABASE.create_task(
url=task_url, url=task_url,
@@ -258,8 +259,8 @@ class ForgeAgent:
detailed_output, detailed_output,
) = await self._initialize_execution_state(task, step, workflow_run) ) = await self._initialize_execution_state(task, step, workflow_run)
if browser_state.page: if page := await browser_state.get_working_page():
self.register_async_operations(organization, task, browser_state.page) self.register_async_operations(organization, task, page)
step, detailed_output = await self.agent_step(task, step, browser_state, organization=organization) step, detailed_output = await self.agent_step(task, step, browser_state, organization=organization)
task = await self.update_task_errors_from_detailed_output(task, detailed_output) task = await self.update_task_errors_from_detailed_output(task, detailed_output)
@@ -796,7 +797,8 @@ class ForgeAgent:
return failed_step, detailed_agent_step_output.get_clean_detailed_output() return failed_step, detailed_agent_step_output.get_clean_detailed_output()
async def record_artifacts_after_action(self, task: Task, step: Step, browser_state: BrowserState) -> None: async def record_artifacts_after_action(self, task: Task, step: Step, browser_state: BrowserState) -> None:
if not browser_state.page: working_page = await browser_state.get_working_page()
if not working_page:
raise BrowserStateMissingPage() raise BrowserStateMissingPage()
try: try:
screenshot = await browser_state.take_screenshot(full_page=True) screenshot = await browser_state.take_screenshot(full_page=True)
@@ -814,7 +816,7 @@ class ForgeAgent:
) )
try: try:
skyvern_frame = await SkyvernFrame.create_instance(frame=browser_state.page) skyvern_frame = await SkyvernFrame.create_instance(frame=working_page)
html = await skyvern_frame.get_content() html = await skyvern_frame.get_content()
await app.ARTIFACT_MANAGER.create_artifact( await app.ARTIFACT_MANAGER.create_artifact(
step=step, step=step,
@@ -830,12 +832,15 @@ class ForgeAgent:
) )
try: try:
video_data = await app.BROWSER_MANAGER.get_video_data(task_id=task.task_id, browser_state=browser_state) video_artifacts = await app.BROWSER_MANAGER.get_video_artifacts(
await app.ARTIFACT_MANAGER.update_artifact_data( task_id=task.task_id, browser_state=browser_state
artifact_id=browser_state.browser_artifacts.video_artifact_id,
organization_id=task.organization_id,
data=video_data,
) )
for video_artifact in video_artifacts:
await app.ARTIFACT_MANAGER.update_artifact_data(
artifact_id=video_artifact.video_artifact_id,
organization_id=task.organization_id,
data=video_artifact.video_data,
)
except Exception: except Exception:
LOG.error( LOG.error(
"Failed to record video after action", "Failed to record video after action",
@@ -854,14 +859,20 @@ class ForgeAgent:
else: else:
browser_state = await app.BROWSER_MANAGER.get_or_create_for_task(task) browser_state = await app.BROWSER_MANAGER.get_or_create_for_task(task)
# Initialize video artifact for the task here, afterwards it'll only get updated # Initialize video artifact for the task here, afterwards it'll only get updated
if browser_state and not browser_state.browser_artifacts.video_artifact_id: if browser_state and browser_state.browser_artifacts:
video_data = await app.BROWSER_MANAGER.get_video_data(task_id=task.task_id, browser_state=browser_state) video_artifacts = await app.BROWSER_MANAGER.get_video_artifacts(
video_artifact_id = await app.ARTIFACT_MANAGER.create_artifact( task_id=task.task_id, browser_state=browser_state
step=step,
artifact_type=ArtifactType.RECORDING,
data=video_data,
) )
app.BROWSER_MANAGER.set_video_artifact_for_task(task, video_artifact_id) for idx, video_artifact in enumerate(video_artifacts):
if video_artifact.video_artifact_id:
continue
video_artifact_id = await app.ARTIFACT_MANAGER.create_artifact(
step=step,
artifact_type=ArtifactType.RECORDING,
data=video_artifact.video_data,
)
video_artifacts[idx].video_artifact_id = video_artifact_id
app.BROWSER_MANAGER.set_video_artifact_for_task(task, video_artifacts)
detailed_output = DetailedAgentStepOutput( detailed_output = DetailedAgentStepOutput(
scraped_page=None, scraped_page=None,
@@ -1005,9 +1016,8 @@ class ForgeAgent:
# Generate the extract action prompt # Generate the extract action prompt
navigation_goal = task.navigation_goal navigation_goal = task.navigation_goal
starting_url = task.url starting_url = task.url
current_url = ( page = await browser_state.get_working_page()
await browser_state.page.evaluate("() => document.location.href") if browser_state.page else starting_url current_url = await page.evaluate("() => document.location.href") if page else starting_url
)
final_navigation_payload = self._build_navigation_payload( final_navigation_payload = self._build_navigation_payload(
task, expire_verification_code=expire_verification_code task, expire_verification_code=expire_verification_code
) )
@@ -1346,12 +1356,14 @@ class ForgeAgent:
browser_state = await app.BROWSER_MANAGER.cleanup_for_task(task.task_id, close_browser_on_completion) browser_state = await app.BROWSER_MANAGER.cleanup_for_task(task.task_id, close_browser_on_completion)
if browser_state: if browser_state:
# Update recording artifact after closing the browser, so we can get an accurate recording # Update recording artifact after closing the browser, so we can get an accurate recording
video_data = await app.BROWSER_MANAGER.get_video_data(task_id=task.task_id, browser_state=browser_state) video_artifacts = await app.BROWSER_MANAGER.get_video_artifacts(
if video_data: task_id=task.task_id, browser_state=browser_state
)
for video_artifact in video_artifacts:
await app.ARTIFACT_MANAGER.update_artifact_data( await app.ARTIFACT_MANAGER.update_artifact_data(
artifact_id=browser_state.browser_artifacts.video_artifact_id, artifact_id=video_artifact.video_artifact_id,
organization_id=task.organization_id, organization_id=task.organization_id,
data=video_data, data=video_artifact.video_data,
) )
har_data = await app.BROWSER_MANAGER.get_har_data(task_id=task.task_id, browser_state=browser_state) har_data = await app.BROWSER_MANAGER.get_har_data(task_id=task.task_id, browser_state=browser_state)

View File

@@ -259,7 +259,8 @@ class TaskBlock(Block):
browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(
workflow_run=workflow_run, url=self.url workflow_run=workflow_run, url=self.url
) )
if not browser_state.page: working_page = await browser_state.get_working_page()
if not working_page:
LOG.error( LOG.error(
"BrowserState has no page", "BrowserState has no page",
workflow_run_id=workflow_run.workflow_run_id, workflow_run_id=workflow_run.workflow_run_id,
@@ -278,7 +279,7 @@ class TaskBlock(Block):
if self.url: if self.url:
try: try:
await browser_state.page.goto(self.url, timeout=settings.BROWSER_LOADING_TIMEOUT_MS) await working_page.goto(self.url, timeout=settings.BROWSER_LOADING_TIMEOUT_MS)
except Error as playright_error: except Error as playright_error:
LOG.warning(f"Error while navigating to url: {str(playright_error)}") LOG.warning(f"Error while navigating to url: {str(playright_error)}")
# Make sure the task is marked as failed in the database before raising the exception # Make sure the task is marked as failed in the database before raising the exception
@@ -521,8 +522,9 @@ class CodeBlock(Block):
# get all parameters into a dictionary # get all parameters into a dictionary
parameter_values = {} parameter_values = {}
maybe_browser_state = await app.BROWSER_MANAGER.get_for_workflow_run(workflow_run_id) maybe_browser_state = await app.BROWSER_MANAGER.get_for_workflow_run(workflow_run_id)
if maybe_browser_state and maybe_browser_state.page: if maybe_browser_state:
parameter_values["skyvern_page"] = maybe_browser_state.page if page := await maybe_browser_state.get_working_page():
parameter_values["skyvern_page"] = await page
for parameter in self.parameters: for parameter in self.parameters:
value = workflow_run_context.get_value(parameter.key) value = workflow_run_context.get_value(parameter.key)

View File

@@ -744,16 +744,16 @@ class WorkflowService:
self, browser_state: BrowserState, workflow: Workflow, workflow_run: WorkflowRun self, browser_state: BrowserState, workflow: Workflow, workflow_run: WorkflowRun
) -> None: ) -> None:
# Create recording artifact after closing the browser, so we can get an accurate recording # Create recording artifact after closing the browser, so we can get an accurate recording
video_data = await app.BROWSER_MANAGER.get_video_data( video_artifacts = await app.BROWSER_MANAGER.get_video_artifacts(
workflow_id=workflow.workflow_id, workflow_id=workflow.workflow_id,
workflow_run_id=workflow_run.workflow_run_id, workflow_run_id=workflow_run.workflow_run_id,
browser_state=browser_state, browser_state=browser_state,
) )
if video_data: for video_artifact in video_artifacts:
await app.ARTIFACT_MANAGER.update_artifact_data( await app.ARTIFACT_MANAGER.update_artifact_data(
artifact_id=browser_state.browser_artifacts.video_artifact_id, artifact_id=video_artifact.video_artifact_id,
organization_id=workflow.organization_id, organization_id=workflow.organization_id,
data=video_data, data=video_artifact.video_data,
) )
async def persist_har_data( async def persist_har_data(

View File

@@ -85,15 +85,13 @@ class BrowserContextFactory:
@staticmethod @staticmethod
def build_browser_artifacts( def build_browser_artifacts(
video_path: str | None = None, video_artifacts: list[VideoArtifact] | None = None,
har_path: str | None = None, har_path: str | None = None,
video_artifact_id: str | None = None,
traces_dir: str | None = None, traces_dir: str | None = None,
) -> BrowserArtifacts: ) -> BrowserArtifacts:
return BrowserArtifacts( return BrowserArtifacts(
video_path=video_path, video_artifacts=video_artifacts or [],
har_path=har_path, har_path=har_path,
video_artifact_id=video_artifact_id,
traces_dir=traces_dir, traces_dir=traces_dir,
) )
@@ -130,9 +128,14 @@ class BrowserContextFactory:
return await cls._validator(page) return await cls._validator(page)
class BrowserArtifacts(BaseModel): class VideoArtifact(BaseModel):
video_path: str | None = None video_path: str | None = None
video_artifact_id: str | None = None video_artifact_id: str | None = None
video_data: bytes = bytes()
class BrowserArtifacts(BaseModel):
video_artifacts: list[VideoArtifact] = []
har_path: str | None = None har_path: str | None = None
traces_dir: str | None = None traces_dir: str | None = None
@@ -175,24 +178,26 @@ class BrowserState:
browser_artifacts: BrowserArtifacts = BrowserArtifacts(), browser_artifacts: BrowserArtifacts = BrowserArtifacts(),
browser_cleanup: BrowserCleanupFunc = None, browser_cleanup: BrowserCleanupFunc = None,
): ):
self.__page = page
self.pw = pw self.pw = pw
self.browser_context = browser_context self.browser_context = browser_context
self.page = page
self.browser_artifacts = browser_artifacts self.browser_artifacts = browser_artifacts
self.browser_cleanup = browser_cleanup self.browser_cleanup = browser_cleanup
def __assert_page(self) -> Page: async def __assert_page(self) -> Page:
if self.page is not None: page = await self.get_working_page()
return self.page if page is not None:
return page
LOG.error("BrowserState has no page") LOG.error("BrowserState has no page")
raise MissingBrowserStatePage() raise MissingBrowserStatePage()
async def _close_all_other_pages(self) -> None: async def _close_all_other_pages(self) -> None:
if not self.browser_context or not self.page: cur_page = await self.get_working_page()
if not self.browser_context or not cur_page:
return return
pages = self.browser_context.pages pages = self.browser_context.pages
for page in pages: for page in pages:
if page != self.page: if page != cur_page:
await page.close() await page.close()
async def check_and_fix_state( async def check_and_fix_state(
@@ -226,21 +231,22 @@ class BrowserState:
assert self.browser_context is not None assert self.browser_context is not None
if self.page is None: if await self.get_working_page() is None:
success = False success = False
retries = 0 retries = 0
while not success and retries < 3: while not success and retries < 3:
try: try:
LOG.info("Creating a new page") LOG.info("Creating a new page")
self.page = await self.browser_context.new_page() page = await self.browser_context.new_page()
await self.set_working_page(page, 0)
await self._close_all_other_pages() await self._close_all_other_pages()
LOG.info("A new page is created") LOG.info("A new page is created")
if url: if url:
LOG.info(f"Navigating page to {url} and waiting for 5 seconds") LOG.info(f"Navigating page to {url} and waiting for 5 seconds")
try: try:
start_time = time.time() start_time = time.time()
await self.page.goto(url, timeout=settings.BROWSER_LOADING_TIMEOUT_MS) await page.goto(url, timeout=settings.BROWSER_LOADING_TIMEOUT_MS)
end_time = time.time() end_time = time.time()
LOG.info( LOG.info(
"Page loading time", "Page loading time",
@@ -270,8 +276,33 @@ class BrowserState:
raise e raise e
LOG.info(f"Retrying to create a new page. Retry count: {retries}") LOG.info(f"Retrying to create a new page. Retry count: {retries}")
if self.browser_artifacts.video_path is None: async def get_working_page(self) -> Page | None:
self.browser_artifacts.video_path = await self.page.video.path() if self.page and self.page.video else None # HACK: currently, assuming the last page is always the working page.
# Need to refactor this logic when we want to manipulate multi pages together
if self.__page is None or self.browser_context is None or len(self.browser_context.pages) == 0:
return None
last_page = self.browser_context.pages[-1]
if self.__page == last_page:
return self.__page
await self.set_working_page(last_page, len(self.browser_context.pages) - 1)
return last_page
async def set_working_page(self, page: Page | None, index: int = 0) -> None:
self.__page = page
if page is None:
return
if len(self.browser_artifacts.video_artifacts) > index:
if self.browser_artifacts.video_artifacts[index].video_path is None:
self.browser_artifacts.video_artifacts[index].video_path = await page.video.path()
return
target_lenght = index + 1
self.browser_artifacts.video_artifacts.extend(
[VideoArtifact()] * (target_lenght - len(self.browser_artifacts.video_artifacts))
)
self.browser_artifacts.video_artifacts[index].video_path = await page.video.path()
return
async def get_or_create_page( async def get_or_create_page(
self, self,
@@ -280,8 +311,9 @@ class BrowserState:
task_id: str | None = None, task_id: str | None = None,
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
) -> Page: ) -> Page:
if self.page is not None: page = await self.get_working_page()
return self.page if page is not None:
return page
try: try:
await self.check_and_fix_state( await self.check_and_fix_state(
@@ -295,26 +327,26 @@ class BrowserState:
await self.check_and_fix_state( await self.check_and_fix_state(
url=url, proxy_location=proxy_location, task_id=task_id, workflow_run_id=workflow_run_id url=url, proxy_location=proxy_location, task_id=task_id, workflow_run_id=workflow_run_id
) )
assert self.page is not None await self.__assert_page()
if not await BrowserContextFactory.validate_browser_context(self.page): if not await BrowserContextFactory.validate_browser_context(await self.get_working_page()):
await self.close_current_open_page() await self.close_current_open_page()
await self.check_and_fix_state( await self.check_and_fix_state(
url=url, proxy_location=proxy_location, task_id=task_id, workflow_run_id=workflow_run_id url=url, proxy_location=proxy_location, task_id=task_id, workflow_run_id=workflow_run_id
) )
assert self.page is not None await self.__assert_page()
return self.page return page
async def close_current_open_page(self) -> None: async def close_current_open_page(self) -> None:
await self._close_all_other_pages() await self._close_all_other_pages()
if self.browser_context is not None: if self.browser_context is not None:
await self.browser_context.close() await self.browser_context.close()
self.browser_context = None self.browser_context = None
self.page = None await self.set_working_page(None)
async def stop_page_loading(self) -> None: async def stop_page_loading(self) -> None:
page = self.__assert_page() page = await self.__assert_page()
try: try:
await page.evaluate("window.stop()") await page.evaluate("window.stop()")
except Exception as e: except Exception as e:
@@ -322,7 +354,7 @@ class BrowserState:
raise FailedToStopLoadingPage(url=page.url, error_message=repr(e)) raise FailedToStopLoadingPage(url=page.url, error_message=repr(e))
async def reload_page(self) -> None: async def reload_page(self) -> None:
page = self.__assert_page() page = await self.__assert_page()
LOG.info(f"Reload page {page.url} and waiting for 5 seconds") LOG.info(f"Reload page {page.url} and waiting for 5 seconds")
try: try:
@@ -353,5 +385,5 @@ class BrowserState:
LOG.info("Playwright is stopped") LOG.info("Playwright is stopped")
async def take_screenshot(self, full_page: bool = False, file_path: str | None = None) -> bytes: async def take_screenshot(self, full_page: bool = False, file_path: str | None = None) -> bytes:
page = self.__assert_page() page = await self.__assert_page()
return await SkyvernFrame.take_screenshot(page=page, full_page=full_page, file_path=file_path) return await SkyvernFrame.take_screenshot(page=page, full_page=full_page, file_path=file_path)

View File

@@ -6,7 +6,7 @@ from playwright.async_api import async_playwright
from skyvern.exceptions import MissingBrowserState from skyvern.exceptions import MissingBrowserState
from skyvern.forge.sdk.schemas.tasks import ProxyLocation, Task from skyvern.forge.sdk.schemas.tasks import ProxyLocation, Task
from skyvern.forge.sdk.workflow.models.workflow import WorkflowRun from skyvern.forge.sdk.workflow.models.workflow import WorkflowRun
from skyvern.webeye.browser_factory import BrowserContextFactory, BrowserState from skyvern.webeye.browser_factory import BrowserContextFactory, BrowserState, VideoArtifact
LOG = structlog.get_logger() LOG = structlog.get_logger()
@@ -96,52 +96,42 @@ class BrowserManager:
return self.pages[workflow_run_id] return self.pages[workflow_run_id]
return None return None
def set_video_artifact_for_task(self, task: Task, artifact_id: str) -> None: def set_video_artifact_for_task(self, task: Task, artifacts: list[VideoArtifact]) -> None:
if task.workflow_run_id and task.workflow_run_id in self.pages: if task.workflow_run_id and task.workflow_run_id in self.pages:
if self.pages[task.workflow_run_id].browser_artifacts.video_artifact_id: self.pages[task.workflow_run_id].browser_artifacts.video_artifacts = artifacts
LOG.warning(
"Video artifact is already set for workflow run. Overwriting",
workflow_run_id=task.workflow_run_id,
old_artifact_id=self.pages[task.workflow_run_id].browser_artifacts.video_artifact_id,
new_artifact_id=artifact_id,
)
self.pages[task.workflow_run_id].browser_artifacts.video_artifact_id = artifact_id
return return
if task.task_id in self.pages: if task.task_id in self.pages:
if self.pages[task.task_id].browser_artifacts.video_artifact_id: self.pages[task.task_id].browser_artifacts.video_artifacts = artifacts
LOG.warning(
"Video artifact is already set for task. Overwriting",
task_id=task.task_id,
old_artifact_id=self.pages[task.task_id].browser_artifacts.video_artifact_id,
new_artifact_id=artifact_id,
)
self.pages[task.task_id].browser_artifacts.video_artifact_id = artifact_id
return return
raise MissingBrowserState(task_id=task.task_id) raise MissingBrowserState(task_id=task.task_id)
async def get_video_data( async def get_video_artifacts(
self, self,
browser_state: BrowserState, browser_state: BrowserState,
task_id: str = "", task_id: str = "",
workflow_id: str = "", workflow_id: str = "",
workflow_run_id: str = "", workflow_run_id: str = "",
) -> bytes: ) -> list[VideoArtifact]:
if browser_state: if len(browser_state.browser_artifacts.video_artifacts) == 0:
path = browser_state.browser_artifacts.video_path LOG.warning(
"Video data not found for task",
task_id=task_id,
workflow_id=workflow_id,
workflow_run_id=workflow_run_id,
)
return []
for i, video_artifact in enumerate(browser_state.browser_artifacts.video_artifacts):
path = video_artifact.video_path
if path: if path:
try: try:
with open(path, "rb") as f: with open(path, "rb") as f:
return f.read() browser_state.browser_artifacts.video_artifacts[i].video_data = f.read()
except FileNotFoundError: except FileNotFoundError:
pass pass
LOG.warning( return browser_state.browser_artifacts.video_artifacts
"Video data not found for task",
task_id=task_id,
workflow_id=workflow_id,
workflow_run_id=workflow_run_id,
)
return b""
async def get_har_data( async def get_har_data(
self, self,