Folders, Persistent Import Progress Tracking & UX Enhancements (#3841)

Co-authored-by: Jonathan Dobson <jon.m.dobson@gmail.com>
This commit is contained in:
Celal Zamanoglu
2025-11-05 18:37:18 +03:00
committed by GitHub
parent fcc3f30ba4
commit 75948053b9
32 changed files with 2886 additions and 538 deletions

View File

@@ -1,9 +1,22 @@
import asyncio
from enum import Enum
from typing import Annotated, Any
import structlog
import yaml
from fastapi import BackgroundTasks, Depends, Header, HTTPException, Path, Query, Request, Response, UploadFile, status
from fastapi import (
BackgroundTasks,
Body,
Depends,
Header,
HTTPException,
Path,
Query,
Request,
Response,
UploadFile,
)
from fastapi import status as http_status
from fastapi.responses import ORJSONResponse
from skyvern import analytics
@@ -84,6 +97,7 @@ from skyvern.forge.sdk.workflow.models.workflow import (
WorkflowRunWithWorkflowResponse,
)
from skyvern.schemas.artifacts import EntityType, entity_type_to_param
from skyvern.schemas.folders import Folder, FolderCreate, FolderUpdate, UpdateWorkflowFolderRequest
from skyvern.schemas.runs import (
CUA_ENGINES,
BlockRunRequest,
@@ -410,7 +424,7 @@ async def get_run(
run_response = await run_service.get_run_response(run_id, organization_id=current_org.organization_id)
if not run_response:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
status_code=http_status.HTTP_404_NOT_FOUND,
detail=f"Task run not found {run_id}",
)
return run_response
@@ -604,6 +618,22 @@ async def create_workflow_from_prompt(
return workflow.model_dump(by_alias=True)
async def _validate_file_size(file: UploadFile) -> UploadFile:
    """Reject uploads larger than MAX_UPLOAD_FILE_SIZE; return the file untouched otherwise.

    Intended for use as a FastAPI dependency on upload endpoints.
    """
    try:
        # Size is measured by seeking to the end of the underlying spooled file
        # and reading the offset; rewind afterwards so downstream readers see
        # the full content from the beginning.
        underlying = file.file
        underlying.seek(0, 2)
        size = underlying.tell()
        underlying.seek(0)
    except Exception as e:
        raise HTTPException(status_code=500, detail="Could not determine file size.") from e
    max_bytes = app.SETTINGS_MANAGER.MAX_UPLOAD_FILE_SIZE
    if size > max_bytes:
        raise HTTPException(
            status_code=413,
            detail=f"File size exceeds the maximum allowed size ({max_bytes / 1024 / 1024} MB)",
        )
    return file
@legacy_base_router.post(
    "/workflows/import-pdf",
    response_model=dict[str, Any],
    include_in_schema=False,
)
async def import_workflow_from_pdf(
    background_tasks: BackgroundTasks,
    file: UploadFile = Depends(_validate_file_size),
    current_org: Organization = Depends(org_auth_service.get_current_org),
) -> dict[str, Any]:
    """Import a workflow from a PDF file containing Standard Operating Procedures.

    The PDF is validated and its text extracted synchronously (so invalid files
    fail fast); the slow LLM step then runs in a background task against a
    placeholder workflow created with status='importing', letting the endpoint
    return immediately with identifiers the client can poll.
    """
    analytics.capture("skyvern-oss-workflow-import-pdf")
    # Read file and validate early (before creating import record)
    if not file.filename or not file.filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF files are supported.")
    try:
        file_contents = await file.read()
        file_name = file.filename
    finally:
        # Release underlying SpooledTemporaryFile ASAP
        await file.close()
    # Extract text in a worker thread to avoid blocking the event loop (1-2 seconds).
    # Validation errors raised here (HTTPException) propagate straight to the
    # client — no import record exists yet, so nothing needs cleanup.
    sop_text = await asyncio.to_thread(
        pdf_import_service.extract_text_from_pdf,
        file_contents,
        file_name,
    )
    # Validation passed! Create empty workflow v1 with status='importing'
    empty_workflow = await app.DATABASE.create_workflow(
        title=f"Importing {file_name}",
        workflow_definition={"parameters": [], "blocks": []},
        organization_id=current_org.organization_id,
        status=WorkflowStatus.importing,
    )

    # Process PDF import in background (LLM call is the slow part)
    async def process_pdf_import() -> None:
        """Run the LLM import and record success or failure on the placeholder workflow."""
        try:
            # Create workflow from extracted text (LLM processing)
            result = await pdf_import_service.create_workflow_from_sop_text(sop_text, current_org)
            # Create v2 with real content under the same permanent id
            await app.WORKFLOW_SERVICE.create_workflow_from_request(
                organization=current_org,
                request=WorkflowCreateYAMLRequest.model_validate(result),
                workflow_permanent_id=empty_workflow.workflow_permanent_id,
            )
            # Update v1 status to published (v1 won't show in list since v2 is latest version)
            await app.DATABASE.update_workflow(
                workflow_id=empty_workflow.workflow_id,
                organization_id=current_org.organization_id,
                status=WorkflowStatus.published,
            )
            LOG.info(
                "Workflow import completed",
                workflow_permanent_id=empty_workflow.workflow_permanent_id,
                organization_id=current_org.organization_id,
            )
        except Exception as e:
            # Log full error server-side for debugging
            LOG.exception(
                "Workflow import failed",
                workflow_permanent_id=empty_workflow.workflow_permanent_id,
                error=str(e),
                organization_id=current_org.organization_id,
            )
            # Provide sanitized user-facing error message (don't expose internal details/PII)
            sanitized_error = "Import failed. Please verify the PDF content and try again."
            # Mark v1 as import_failed with sanitized error
            await app.DATABASE.update_workflow(
                workflow_id=empty_workflow.workflow_id,
                organization_id=current_org.organization_id,
                status=WorkflowStatus.import_failed,
                import_error=sanitized_error,
            )

    background_tasks.add_task(process_pdf_import)
    return {
        "workflow_permanent_id": empty_workflow.workflow_permanent_id,
        "status": "importing",
        # Use the captured name: the UploadFile was closed above.
        "file_name": file_name,
        "organization_id": current_org.organization_id,
        "created_at": empty_workflow.created_at.isoformat(),
    }
@legacy_base_router.put(
@@ -817,6 +931,251 @@ async def delete_workflow(
await app.WORKFLOW_SERVICE.delete_workflow_by_permanent_id(workflow_id, current_org.organization_id)
################# Folder Endpoints #################
@legacy_base_router.post("/folders", response_model=Folder, tags=["agent"])
@legacy_base_router.post("/folders/", response_model=Folder, include_in_schema=False)
@base_router.post(
    "/folders",
    response_model=Folder,
    tags=["Workflows"],
    description="Create a new folder to organize workflows",
    summary="Create folder",
    responses={
        200: {"description": "Successfully created folder"},
        400: {"description": "Invalid request"},
    },
)
@base_router.post("/folders/", response_model=Folder, include_in_schema=False)
async def create_folder(
    data: FolderCreate,
    current_org: Organization = Depends(org_auth_service.get_current_org),
) -> Folder:
    """Create a folder for the caller's organization and return it with its workflow count."""
    analytics.capture("skyvern-oss-folder-create")
    org_id = current_org.organization_id
    created = await app.DATABASE.create_folder(
        organization_id=org_id,
        title=data.title,
        description=data.description,
    )
    count = await app.DATABASE.get_folder_workflow_count(
        folder_id=created.folder_id,
        organization_id=org_id,
    )
    return Folder(
        folder_id=created.folder_id,
        organization_id=created.organization_id,
        title=created.title,
        description=created.description,
        workflow_count=count,
        created_at=created.created_at,
        modified_at=created.modified_at,
    )
@legacy_base_router.get("/folders/{folder_id}", response_model=Folder, tags=["agent"])
@legacy_base_router.get("/folders/{folder_id}/", response_model=Folder, include_in_schema=False)
@base_router.get(
    "/folders/{folder_id}",
    response_model=Folder,
    tags=["Workflows"],
    description="Get a specific folder by ID",
    summary="Get folder",
    responses={
        200: {"description": "Successfully retrieved folder"},
        404: {"description": "Folder not found"},
    },
)
@base_router.get("/folders/{folder_id}/", response_model=Folder, include_in_schema=False)
async def get_folder(
    folder_id: str = Path(..., description="Folder ID", examples=["fld_123"]),
    current_org: Organization = Depends(org_auth_service.get_current_org),
) -> Folder:
    """Fetch a single folder scoped to the caller's organization; 404 when missing."""
    org_id = current_org.organization_id
    record = await app.DATABASE.get_folder(folder_id=folder_id, organization_id=org_id)
    if not record:
        raise HTTPException(status_code=http_status.HTTP_404_NOT_FOUND, detail=f"Folder {folder_id} not found")
    count = await app.DATABASE.get_folder_workflow_count(
        folder_id=record.folder_id,
        organization_id=org_id,
    )
    return Folder(
        folder_id=record.folder_id,
        organization_id=record.organization_id,
        title=record.title,
        description=record.description,
        workflow_count=count,
        created_at=record.created_at,
        modified_at=record.modified_at,
    )
@legacy_base_router.get("/folders", response_model=list[Folder], tags=["agent"])
@legacy_base_router.get("/folders/", response_model=list[Folder], include_in_schema=False)
@base_router.get(
    "/folders",
    response_model=list[Folder],
    tags=["Workflows"],
    description="Get all folders for the organization",
    summary="Get folders",
    responses={
        200: {"description": "Successfully retrieved folders"},
    },
)
@base_router.get("/folders/", response_model=list[Folder], include_in_schema=False)
async def get_folders(
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(100, ge=1, le=500, description="Number of folders per page"),
    search: str | None = Query(None, description="Search folders by title or description"),
    current_org: Organization = Depends(org_auth_service.get_current_org),
) -> list[Folder]:
    """List folders for the organization (paginated, optionally filtered by a search term),
    each annotated with its workflow count."""
    folders = await app.DATABASE.get_folders(
        organization_id=current_org.organization_id,
        page=page,
        page_size=page_size,
        search_query=search,
    )
    # Get workflow counts for all folders in a single batched query; folders
    # absent from the mapping default to 0 below.
    workflow_counts: dict = {}
    if folders:
        workflow_counts = await app.DATABASE.get_folder_workflow_counts_batch(
            folder_ids=[folder.folder_id for folder in folders],
            organization_id=current_org.organization_id,
        )
    # Build the response models with their workflow counts attached.
    return [
        Folder(
            folder_id=folder.folder_id,
            organization_id=folder.organization_id,
            title=folder.title,
            description=folder.description,
            workflow_count=workflow_counts.get(folder.folder_id, 0),
            created_at=folder.created_at,
            modified_at=folder.modified_at,
        )
        for folder in folders
    ]
@legacy_base_router.put("/folders/{folder_id}", response_model=Folder, tags=["agent"])
@legacy_base_router.put("/folders/{folder_id}/", response_model=Folder, include_in_schema=False)
@base_router.put(
    "/folders/{folder_id}",
    response_model=Folder,
    tags=["Workflows"],
    description="Update a folder's title or description",
    summary="Update folder",
    responses={
        200: {"description": "Successfully updated folder"},
        404: {"description": "Folder not found"},
    },
)
@base_router.put("/folders/{folder_id}/", response_model=Folder, include_in_schema=False)
async def update_folder(
    folder_id: str = Path(..., description="Folder ID", examples=["fld_123"]),
    data: FolderUpdate = Body(...),
    current_org: Organization = Depends(org_auth_service.get_current_org),
) -> Folder:
    """Update a folder's title/description; 404 when the folder does not exist in this org."""
    org_id = current_org.organization_id
    updated = await app.DATABASE.update_folder(
        folder_id=folder_id,
        organization_id=org_id,
        title=data.title,
        description=data.description,
    )
    if not updated:
        raise HTTPException(status_code=http_status.HTTP_404_NOT_FOUND, detail=f"Folder {folder_id} not found")
    count = await app.DATABASE.get_folder_workflow_count(
        folder_id=updated.folder_id,
        organization_id=org_id,
    )
    return Folder(
        folder_id=updated.folder_id,
        organization_id=updated.organization_id,
        title=updated.title,
        description=updated.description,
        workflow_count=count,
        created_at=updated.created_at,
        modified_at=updated.modified_at,
    )
@legacy_base_router.delete("/folders/{folder_id}", tags=["agent"])
@legacy_base_router.delete("/folders/{folder_id}/", include_in_schema=False)
@base_router.delete(
    "/folders/{folder_id}",
    tags=["Workflows"],
    description="Delete a folder. Optionally delete all workflows in the folder.",
    summary="Delete folder",
    responses={
        200: {"description": "Successfully deleted folder"},
        404: {"description": "Folder not found"},
    },
)
@base_router.delete("/folders/{folder_id}/", include_in_schema=False)
async def delete_folder(
    folder_id: str = Path(..., description="Folder ID", examples=["fld_123"]),
    delete_workflows: bool = Query(False, description="If true, also delete all workflows in this folder"),
    current_org: Organization = Depends(org_auth_service.get_current_org),
) -> dict:
    """Soft-delete a folder (and optionally its workflows); 404 when the folder is unknown."""
    analytics.capture("skyvern-oss-folder-delete")
    deleted = await app.DATABASE.soft_delete_folder(
        folder_id=folder_id,
        organization_id=current_org.organization_id,
        delete_workflows=delete_workflows,
    )
    if not deleted:
        raise HTTPException(status_code=http_status.HTTP_404_NOT_FOUND, detail=f"Folder {folder_id} not found")
    return {"status": "deleted", "folder_id": folder_id, "workflows_deleted": delete_workflows}
@legacy_base_router.put("/workflows/{workflow_permanent_id}/folder", response_model=Workflow, tags=["agent"])
@legacy_base_router.put("/workflows/{workflow_permanent_id}/folder/", response_model=Workflow, include_in_schema=False)
@base_router.put(
    "/workflows/{workflow_permanent_id}/folder",
    response_model=Workflow,
    tags=["Workflows"],
    description="Update a workflow's folder assignment for the latest version",
    summary="Update workflow folder",
    responses={
        200: {"description": "Successfully updated workflow folder"},
        404: {"description": "Workflow not found"},
        400: {"description": "Folder not found"},
    },
)
@base_router.put("/workflows/{workflow_permanent_id}/folder/", response_model=Workflow, include_in_schema=False)
async def update_workflow_folder(
    workflow_permanent_id: str = Path(..., description="Workflow permanent ID", examples=["wpid_123"]),
    data: UpdateWorkflowFolderRequest = Body(...),
    current_org: Organization = Depends(org_auth_service.get_current_org),
) -> Workflow:
    """Move a workflow into a folder (or out of one, when folder_id is cleared).

    404 when the workflow is unknown; 400 when the DB layer rejects the folder
    (surfaced as ValueError).
    """
    try:
        updated = await app.DATABASE.update_workflow_folder(
            workflow_permanent_id=workflow_permanent_id,
            organization_id=current_org.organization_id,
            folder_id=data.folder_id,
        )
    except ValueError as e:
        # DB layer signals an invalid/unknown folder with ValueError.
        raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    if not updated:
        raise HTTPException(
            status_code=http_status.HTTP_404_NOT_FOUND, detail=f"Workflow {workflow_permanent_id} not found"
        )
    return updated
@legacy_base_router.post(
"/utilities/curl-to-http",
tags=["Utilities"],
@@ -896,7 +1255,7 @@ async def get_artifact(
)
if not artifact:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
status_code=http_status.HTTP_404_NOT_FOUND,
detail=f"Artifact not found {artifact_id}",
)
if settings.ENV != "local" or settings.GENERATE_PRESIGNED_URLS:
@@ -1027,7 +1386,7 @@ async def get_run_timeline(
run_response = await run_service.get_run_response(run_id, organization_id=current_org.organization_id)
if not run_response:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
status_code=http_status.HTTP_404_NOT_FOUND,
detail=f"Run not found {run_id}",
)
@@ -1040,13 +1399,13 @@ async def get_run_timeline(
task_v2 = await app.DATABASE.get_task_v2(task_v2_id=run_id, organization_id=current_org.organization_id)
if not task_v2:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
status_code=http_status.HTTP_404_NOT_FOUND,
detail=f"Task v2 not found {run_id}",
)
if not task_v2.workflow_run_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
status_code=http_status.HTTP_400_BAD_REQUEST,
detail=f"Task v2 {run_id} has no associated workflow run",
)
@@ -1054,7 +1413,7 @@ async def get_run_timeline(
# Timeline not available for other run types
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
status_code=http_status.HTTP_400_BAD_REQUEST,
detail=f"Timeline not available for run type {run_response.run_type}",
)
@@ -1149,7 +1508,7 @@ async def webhook(
payload=payload,
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
status_code=http_status.HTTP_400_BAD_REQUEST,
detail="Missing webhook signature or timestamp",
)
@@ -1270,7 +1629,7 @@ async def cancel_task(
task_obj = await app.DATABASE.get_task(task_id, organization_id=current_org.organization_id)
if not task_obj:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
status_code=http_status.HTTP_404_NOT_FOUND,
detail=f"Task not found {task_id}",
)
task = await app.agent.update_task(task_obj, status=TaskStatus.canceled)
@@ -1288,7 +1647,7 @@ async def _cancel_workflow_run(workflow_run_id: str, organization_id: str, x_api
if not workflow_run:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
status_code=http_status.HTTP_404_NOT_FOUND,
detail=f"Workflow run not found {workflow_run_id}",
)
@@ -1324,7 +1683,7 @@ async def _continue_workflow_run(workflow_run_id: str, organization_id: str) ->
if not workflow_run:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
status_code=http_status.HTTP_404_NOT_FOUND,
detail=f"Workflow run not found {workflow_run_id}",
)
@@ -1398,7 +1757,7 @@ async def retry_webhook(
task_obj = await app.DATABASE.get_task(task_id, organization_id=current_org.organization_id)
if not task_obj:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
status_code=http_status.HTTP_404_NOT_FOUND,
detail=f"Task not found {task_id}",
)
@@ -1452,7 +1811,7 @@ async def get_tasks(
analytics.capture("skyvern-oss-agent-tasks-get")
if only_standalone_tasks and workflow_run_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
status_code=http_status.HTTP_400_BAD_REQUEST,
detail="only_standalone_tasks and workflow_run_id cannot be used together",
)
tasks = await app.DATABASE.get_tasks(
@@ -1560,7 +1919,7 @@ async def get_artifacts(
if entity_type not in entity_type_to_param:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
status_code=http_status.HTTP_400_BAD_REQUEST,
detail=f"Invalid entity_type: {entity_type}",
)
@@ -1828,7 +2187,7 @@ async def get_workflow_and_run_from_workflow_run_id(
if not workflow:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
status_code=http_status.HTTP_404_NOT_FOUND,
detail=f"Workflow run not found {workflow_run_id}",
)
@@ -1926,9 +2285,11 @@ async def get_workflows(
only_workflows: bool = Query(False),
search_key: str | None = Query(
None,
description="Unified search across workflow title and parameter metadata (key, description, default_value).",
description="Unified search across workflow title, folder name, and parameter metadata (key, description, default_value).",
),
title: str = Query("", deprecated=True, description="Deprecated: use search_key instead."),
folder_id: str | None = Query(None, description="Filter workflows by folder ID"),
status: Annotated[list[WorkflowStatus] | None, Query()] = None,
current_org: Organization = Depends(org_auth_service.get_current_org),
template: bool = Query(False),
) -> list[Workflow]:
@@ -1936,8 +2297,8 @@ async def get_workflows(
Get all workflows with the latest version for the organization.
Search semantics:
- If `search_key` is provided, its value is used as a unified search term for both
`workflows.title` and workflow parameter metadata (key, description, and default_value for
- If `search_key` is provided, its value is used as a unified search term for
`workflows.title`, `folders.title`, and workflow parameter metadata (key, description, and default_value for
`WorkflowParameterModel`).
- Falls back to deprecated `title` (title-only search) if `search_key` is not provided.
- Parameter metadata search excludes soft-deleted parameter rows across all parameter tables.
@@ -1947,6 +2308,9 @@ async def get_workflows(
# Determine the effective search term: prioritize search_key, fallback to title
effective_search = search_key or (title if title else None)
# Default to published and draft if no status filter provided
effective_statuses = status if status else [WorkflowStatus.published, WorkflowStatus.draft]
if template:
global_workflows_permanent_ids = await app.STORAGE.retrieve_global_workflows()
if not global_workflows_permanent_ids:
@@ -1956,13 +2320,13 @@ async def get_workflows(
page=page,
page_size=page_size,
search_key=effective_search or "",
statuses=[WorkflowStatus.published, WorkflowStatus.draft],
statuses=effective_statuses,
)
return workflows
if only_saved_tasks and only_workflows:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
status_code=http_status.HTTP_400_BAD_REQUEST,
detail="only_saved_tasks and only_workflows cannot be used together",
)
@@ -1973,7 +2337,8 @@ async def get_workflows(
only_saved_tasks=only_saved_tasks,
only_workflows=only_workflows,
search_key=effective_search,
statuses=[WorkflowStatus.published, WorkflowStatus.draft],
folder_id=folder_id,
statuses=effective_statuses,
)
@@ -2182,22 +2547,6 @@ async def get_api_keys(
return GetOrganizationAPIKeysResponse(api_keys=api_keys)
async def _validate_file_size(file: UploadFile) -> UploadFile:
    """FastAPI dependency: ensure the upload stays within MAX_UPLOAD_FILE_SIZE.

    Raises 500 when the size cannot be determined and 413 when the limit is
    exceeded; otherwise hands the file back unchanged.
    """
    try:
        file.file.seek(0, 2)  # jump to the end of the spooled file
        size = file.file.tell()  # offset at the end == total byte count
        file.file.seek(0)  # rewind so downstream consumers read from the start
    except Exception as e:
        raise HTTPException(status_code=500, detail="Could not determine file size.") from e
    limit = app.SETTINGS_MANAGER.MAX_UPLOAD_FILE_SIZE
    if size > limit:
        raise HTTPException(
            status_code=413,
            detail=f"File size exceeds the maximum allowed size ({limit / 1024 / 1024} MB)",
        )
    return file
@legacy_base_router.post(
"/upload_file",
tags=["server"],