Pedro.loop without explicit extraction (#3051)
Co-authored-by: Shuchang Zheng <shu@skyvern.com>
This commit is contained in:
@@ -72,7 +72,7 @@ export const helpTooltips = {
|
|||||||
loop: {
|
loop: {
|
||||||
...baseHelpTooltipContent,
|
...baseHelpTooltipContent,
|
||||||
loopValue:
|
loopValue:
|
||||||
"Define this parameterized field with a parameter key to let Skyvern know the core value you're iterating over. Use {{ current_value }} elsewhere in the loop to get the current value for a given iteration.",
|
"Define the values to iterate over. Use a parameter reference or natural language (e.g., 'Extract links of the top 2 posts'). Natural language automatically creates an extraction block that generates a list of string values. Use {{ current_value }} in the loop to get the current iteration value.",
|
||||||
},
|
},
|
||||||
sendEmail: {
|
sendEmail: {
|
||||||
...baseHelpTooltipContent,
|
...baseHelpTooltipContent,
|
||||||
|
|||||||
@@ -0,0 +1,12 @@
|
|||||||
|
Analyze the current webpage and extract information based on this request: {{ natural_language_prompt }}
|
||||||
|
|
||||||
|
You need to identify what values should be iterated over (loop_values). Each value should be the primary data that will be used in each loop iteration.
|
||||||
|
|
||||||
|
For example:
|
||||||
|
- If the request is "go to each product page", extract product URLs as strings
|
||||||
|
- If the request is "extract the authors of the top 4 posts", extract author names as strings
|
||||||
|
- If the request is "summarize the text of each article", extract article URLs as strings
|
||||||
|
- If the request is "download each file", extract file URLs as strings
|
||||||
|
- If the request is "check if these articles are AI-related", extract article titles as strings
|
||||||
|
|
||||||
|
Return the results in the specified schema format with loop_values containing an array of strings, where each string is the primary value to be used in the loop iteration.
|
||||||
@@ -6,11 +6,14 @@ import asyncio
|
|||||||
import csv
|
import csv
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
import random
|
||||||
import smtplib
|
import smtplib
|
||||||
|
import string
|
||||||
import textwrap
|
import textwrap
|
||||||
import uuid
|
import uuid
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime
|
||||||
from email.message import EmailMessage
|
from email.message import EmailMessage
|
||||||
from enum import StrEnum
|
from enum import StrEnum
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -71,6 +74,7 @@ from skyvern.forge.sdk.workflow.models.parameter import (
|
|||||||
AWSSecretParameter,
|
AWSSecretParameter,
|
||||||
ContextParameter,
|
ContextParameter,
|
||||||
OutputParameter,
|
OutputParameter,
|
||||||
|
ParameterType,
|
||||||
WorkflowParameter,
|
WorkflowParameter,
|
||||||
)
|
)
|
||||||
from skyvern.schemas.runs import RunEngine
|
from skyvern.schemas.runs import RunEngine
|
||||||
@@ -82,6 +86,11 @@ LOG = structlog.get_logger()
|
|||||||
jinja_sandbox_env = SandboxedEnvironment()
|
jinja_sandbox_env = SandboxedEnvironment()
|
||||||
|
|
||||||
|
|
||||||
|
def _generate_random_string(length: int = 8) -> str:
|
||||||
|
"""Generate a random string for unique identifiers."""
|
||||||
|
return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))
|
||||||
|
|
||||||
|
|
||||||
class BlockType(StrEnum):
|
class BlockType(StrEnum):
|
||||||
TASK = "task"
|
TASK = "task"
|
||||||
TaskV2 = "task_v2"
|
TaskV2 = "task_v2"
|
||||||
@@ -892,6 +901,7 @@ class ForLoopBlock(Block):
|
|||||||
|
|
||||||
def get_loop_block_context_parameters(self, workflow_run_id: str, loop_data: Any) -> list[ContextParameter]:
|
def get_loop_block_context_parameters(self, workflow_run_id: str, loop_data: Any) -> list[ContextParameter]:
|
||||||
context_parameters = []
|
context_parameters = []
|
||||||
|
|
||||||
for loop_block in self.loop_blocks:
|
for loop_block in self.loop_blocks:
|
||||||
# todo: handle the case where the loop_block is a ForLoopBlock
|
# todo: handle the case where the loop_block is a ForLoopBlock
|
||||||
|
|
||||||
@@ -922,17 +932,116 @@ class ForLoopBlock(Block):
|
|||||||
|
|
||||||
return context_parameters
|
return context_parameters
|
||||||
|
|
||||||
def get_loop_over_parameter_values(self, workflow_run_context: WorkflowRunContext) -> list[Any]:
|
async def get_loop_over_parameter_values(
|
||||||
|
self,
|
||||||
|
workflow_run_context: WorkflowRunContext,
|
||||||
|
workflow_run_id: str,
|
||||||
|
workflow_run_block_id: str,
|
||||||
|
organization_id: str | None = None,
|
||||||
|
) -> list[Any]:
|
||||||
# parse the value from self.loop_variable_reference and then from self.loop_over
|
# parse the value from self.loop_variable_reference and then from self.loop_over
|
||||||
if self.loop_variable_reference:
|
if self.loop_variable_reference:
|
||||||
value_template = f"{{{{ {self.loop_variable_reference.strip(' {}')} | tojson }}}}"
|
LOG.debug("Processing loop variable reference", loop_variable_reference=self.loop_variable_reference)
|
||||||
try:
|
|
||||||
value_json = self.format_block_parameter_template_from_workflow_run_context(
|
# Check if this looks like a parameter path (contains dots and/or _output)
|
||||||
value_template, workflow_run_context
|
is_likely_parameter_path = "extracted_information." in self.loop_variable_reference
|
||||||
)
|
|
||||||
except Exception as e:
|
# Try parsing as Jinja template
|
||||||
raise FailedToFormatJinjaStyleParameter(value_template, str(e))
|
parameter_value = self.try_parse_jinja_template(workflow_run_context)
|
||||||
parameter_value = json.loads(value_json)
|
|
||||||
|
if parameter_value is None and not is_likely_parameter_path:
|
||||||
|
try:
|
||||||
|
# Create and execute extraction block using the current block's workflow_id
|
||||||
|
extraction_block = self._create_initial_extraction_block(self.loop_variable_reference)
|
||||||
|
|
||||||
|
LOG.info(
|
||||||
|
"Processing natural language loop input",
|
||||||
|
prompt=self.loop_variable_reference,
|
||||||
|
extraction_goal=extraction_block.data_extraction_goal,
|
||||||
|
)
|
||||||
|
|
||||||
|
extraction_result = await extraction_block.execute(
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
workflow_run_block_id=workflow_run_block_id,
|
||||||
|
organization_id=organization_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not extraction_result.success:
|
||||||
|
LOG.error("Extraction block failed", failure_reason=extraction_result.failure_reason)
|
||||||
|
raise ValueError(f"Extraction block failed: {extraction_result.failure_reason}")
|
||||||
|
|
||||||
|
LOG.debug("Extraction block succeeded", output=extraction_result.output_parameter_value)
|
||||||
|
|
||||||
|
# Store the extraction result in the workflow context
|
||||||
|
await extraction_block.record_output_parameter_value(
|
||||||
|
workflow_run_context=workflow_run_context,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
value=extraction_result.output_parameter_value,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get the extracted information
|
||||||
|
if not isinstance(extraction_result.output_parameter_value, dict):
|
||||||
|
LOG.error(
|
||||||
|
"Extraction result output_parameter_value is not a dict",
|
||||||
|
output_parameter_value=extraction_result.output_parameter_value,
|
||||||
|
)
|
||||||
|
raise ValueError("Extraction result output_parameter_value is not a dictionary")
|
||||||
|
|
||||||
|
if "extracted_information" not in extraction_result.output_parameter_value:
|
||||||
|
LOG.error(
|
||||||
|
"Extraction result missing extracted_information key",
|
||||||
|
output_parameter_value=extraction_result.output_parameter_value,
|
||||||
|
)
|
||||||
|
raise ValueError("Extraction result missing extracted_information key")
|
||||||
|
|
||||||
|
extracted_info = extraction_result.output_parameter_value["extracted_information"]
|
||||||
|
|
||||||
|
# Handle different possible structures of extracted_info
|
||||||
|
if isinstance(extracted_info, list):
|
||||||
|
# If it's a list, take the first element
|
||||||
|
if len(extracted_info) > 0:
|
||||||
|
extracted_info = extracted_info[0]
|
||||||
|
else:
|
||||||
|
LOG.error("Extracted information list is empty")
|
||||||
|
raise ValueError("Extracted information list is empty")
|
||||||
|
|
||||||
|
# At this point, extracted_info should be a dict
|
||||||
|
if not isinstance(extracted_info, dict):
|
||||||
|
LOG.error("Invalid extraction result structure - not a dict", extracted_info=extracted_info)
|
||||||
|
raise ValueError("Extraction result is not a dictionary")
|
||||||
|
|
||||||
|
# Extract the loop values
|
||||||
|
loop_values = extracted_info.get("loop_values", [])
|
||||||
|
|
||||||
|
if not loop_values:
|
||||||
|
LOG.error("No loop values found in extraction result")
|
||||||
|
raise ValueError("No loop values found in extraction result")
|
||||||
|
|
||||||
|
LOG.info("Extracted loop values", count=len(loop_values), values=loop_values)
|
||||||
|
|
||||||
|
# Update the loop variable reference to point to the extracted loop values
|
||||||
|
# We'll use a temporary key that we can reference
|
||||||
|
temp_key = f"extracted_loop_values_{_generate_random_string()}"
|
||||||
|
workflow_run_context.set_value(temp_key, loop_values)
|
||||||
|
self.loop_variable_reference = temp_key
|
||||||
|
|
||||||
|
# Now try parsing again with the updated reference
|
||||||
|
parameter_value = self.try_parse_jinja_template(workflow_run_context)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
LOG.error("Failed to process natural language loop input", error=str(e))
|
||||||
|
raise FailedToFormatJinjaStyleParameter(self.loop_variable_reference, str(e))
|
||||||
|
|
||||||
|
if parameter_value is None:
|
||||||
|
# Fall back to the original Jinja template approach
|
||||||
|
value_template = f"{{{{ {self.loop_variable_reference.strip(' {}')} | tojson }}}}"
|
||||||
|
try:
|
||||||
|
value_json = self.format_block_parameter_template_from_workflow_run_context(
|
||||||
|
value_template, workflow_run_context
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
raise FailedToFormatJinjaStyleParameter(value_template, str(e))
|
||||||
|
parameter_value = json.loads(value_json)
|
||||||
|
|
||||||
elif self.loop_over is not None:
|
elif self.loop_over is not None:
|
||||||
if isinstance(self.loop_over, WorkflowParameter):
|
if isinstance(self.loop_over, WorkflowParameter):
|
||||||
@@ -971,6 +1080,90 @@ class ForLoopBlock(Block):
|
|||||||
# TODO (kerem): Should we raise an error here?
|
# TODO (kerem): Should we raise an error here?
|
||||||
return [parameter_value]
|
return [parameter_value]
|
||||||
|
|
||||||
|
def try_parse_jinja_template(self, workflow_run_context: WorkflowRunContext) -> Any | None:
|
||||||
|
"""Try to parse the loop variable reference as a Jinja template."""
|
||||||
|
try:
|
||||||
|
# Try the exact reference first
|
||||||
|
try:
|
||||||
|
if self.loop_variable_reference is None:
|
||||||
|
return None
|
||||||
|
value_template = f"{{{{ {self.loop_variable_reference.strip(' {}')} | tojson }}}}"
|
||||||
|
value_json = self.format_block_parameter_template_from_workflow_run_context(
|
||||||
|
value_template, workflow_run_context
|
||||||
|
)
|
||||||
|
parameter_value = json.loads(value_json)
|
||||||
|
if parameter_value is not None:
|
||||||
|
return parameter_value
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# If that fails, try common access patterns for extraction results
|
||||||
|
if self.loop_variable_reference is None:
|
||||||
|
return None
|
||||||
|
access_patterns = [
|
||||||
|
f"{self.loop_variable_reference}.extracted_information",
|
||||||
|
f"{self.loop_variable_reference}.extracted_information.results",
|
||||||
|
f"{self.loop_variable_reference}.results",
|
||||||
|
]
|
||||||
|
|
||||||
|
for pattern in access_patterns:
|
||||||
|
try:
|
||||||
|
value_template = f"{{{{ {pattern.strip(' {}')} | tojson }}}}"
|
||||||
|
value_json = self.format_block_parameter_template_from_workflow_run_context(
|
||||||
|
value_template, workflow_run_context
|
||||||
|
)
|
||||||
|
parameter_value = json.loads(value_json)
|
||||||
|
if parameter_value is not None:
|
||||||
|
return parameter_value
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
|
||||||
|
return None
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _create_initial_extraction_block(self, natural_language_prompt: str) -> ExtractionBlock:
|
||||||
|
"""Create an extraction block to process natural language input."""
|
||||||
|
|
||||||
|
# Create a schema that only extracts loop values
|
||||||
|
data_schema = {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"loop_values": {
|
||||||
|
"type": "array",
|
||||||
|
"description": "Array of values to iterate over. Each value should be the primary data needed for the loop blocks.",
|
||||||
|
"items": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "The primary value to be used in the loop iteration (e.g., URL, text, identifier, etc.)",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
# Create extraction goal that includes the natural language prompt
|
||||||
|
extraction_goal = prompt_engine.load_prompt(
|
||||||
|
"extraction_prompt_for_nat_language_loops", natural_language_prompt=natural_language_prompt
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a temporary output parameter using the current block's workflow_id
|
||||||
|
|
||||||
|
output_param = OutputParameter(
|
||||||
|
output_parameter_id=str(uuid.uuid4()),
|
||||||
|
key=f"natural_lang_extraction_{_generate_random_string()}",
|
||||||
|
workflow_id=self.output_parameter.workflow_id,
|
||||||
|
created_at=datetime.now(),
|
||||||
|
modified_at=datetime.now(),
|
||||||
|
parameter_type=ParameterType.OUTPUT,
|
||||||
|
description="Natural language extraction result",
|
||||||
|
)
|
||||||
|
|
||||||
|
return ExtractionBlock(
|
||||||
|
label=f"natural_lang_extraction_{_generate_random_string()}",
|
||||||
|
data_extraction_goal=extraction_goal,
|
||||||
|
data_schema=data_schema,
|
||||||
|
output_parameter=output_param,
|
||||||
|
)
|
||||||
|
|
||||||
async def execute_loop_helper(
|
async def execute_loop_helper(
|
||||||
self,
|
self,
|
||||||
workflow_run_id: str,
|
workflow_run_id: str,
|
||||||
@@ -985,6 +1178,7 @@ class ForLoopBlock(Block):
|
|||||||
current_block: BlockTypeVar | None = None
|
current_block: BlockTypeVar | None = None
|
||||||
|
|
||||||
for loop_idx, loop_over_value in enumerate(loop_over_values):
|
for loop_idx, loop_over_value in enumerate(loop_over_values):
|
||||||
|
LOG.info("Starting loop iteration", loop_idx=loop_idx, loop_over_value=loop_over_value)
|
||||||
context_parameters_with_value = self.get_loop_block_context_parameters(workflow_run_id, loop_over_value)
|
context_parameters_with_value = self.get_loop_block_context_parameters(workflow_run_id, loop_over_value)
|
||||||
for context_parameter in context_parameters_with_value:
|
for context_parameter in context_parameters_with_value:
|
||||||
workflow_run_context.set_value(context_parameter.key, context_parameter.value)
|
workflow_run_context.set_value(context_parameter.key, context_parameter.value)
|
||||||
@@ -1015,6 +1209,14 @@ class ForLoopBlock(Block):
|
|||||||
if workflow_run_context.has_value(block_output.output_parameter.key)
|
if workflow_run_context.has_value(block_output.output_parameter.key)
|
||||||
else None
|
else None
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Log the output value for debugging
|
||||||
|
if block_output.output_parameter.key.endswith("_output"):
|
||||||
|
LOG.debug("Block output", block_type=loop_block.block_type, output_value=output_value)
|
||||||
|
|
||||||
|
# Log URL information for goto_url blocks
|
||||||
|
if loop_block.block_type == BlockType.GOTO_URL:
|
||||||
|
LOG.info("Goto URL block executed", url=loop_block.url, loop_idx=loop_idx)
|
||||||
each_loop_output_values.append(
|
each_loop_output_values.append(
|
||||||
{
|
{
|
||||||
"loop_value": loop_over_value,
|
"loop_value": loop_over_value,
|
||||||
@@ -1089,7 +1291,12 @@ class ForLoopBlock(Block):
|
|||||||
) -> BlockResult:
|
) -> BlockResult:
|
||||||
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
|
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
|
||||||
try:
|
try:
|
||||||
loop_over_values = self.get_loop_over_parameter_values(workflow_run_context)
|
loop_over_values = await self.get_loop_over_parameter_values(
|
||||||
|
workflow_run_context=workflow_run_context,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
workflow_run_block_id=workflow_run_block_id,
|
||||||
|
organization_id=organization_id,
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return await self.build_block_result(
|
return await self.build_block_result(
|
||||||
success=False,
|
success=False,
|
||||||
|
|||||||
Reference in New Issue
Block a user