Add support for s3 storage classes + added logging when we upload to s3 (#2610)
--- a/skyvern/forge/sdk/api/aws.py
+++ b/skyvern/forge/sdk/api/aws.py
@@ -10,6 +10,18 @@ from skyvern.config import settings
 LOG = structlog.get_logger()
 
 
+# We only include the storage classes that we want to use in our application.
+class S3StorageClass(StrEnum):
+    STANDARD = "STANDARD"
+    # REDUCED_REDUNDANCY = "REDUCED_REDUNDANCY"
+    # INTELLIGENT_TIERING = "INTELLIGENT_TIERING"
+    ONEZONE_IA = "ONEZONE_IA"
+    GLACIER = "GLACIER"
+    # DEEP_ARCHIVE = "DEEP_ARCHIVE"
+    # OUTPOSTS = "OUTPOSTS"
+    # STANDARD_IA = "STANDARD_IA"
+
+
 class AWSClientType(StrEnum):
     S3 = "s3"
     SECRETS_MANAGER = "secretsmanager"
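A note on the enum choice: because `S3StorageClass` subclasses `StrEnum`, every member is itself a `str`, which is what lets the upload methods below pass `str(storage_class)` straight through as the S3 `StorageClass` value. A minimal standalone sketch of the relevant semantics (Python 3.11+):

```python
from enum import StrEnum


class S3StorageClass(StrEnum):
    STANDARD = "STANDARD"
    ONEZONE_IA = "ONEZONE_IA"
    GLACIER = "GLACIER"


# StrEnum members are strings, so str() simply yields the member's value.
assert isinstance(S3StorageClass.GLACIER, str)
assert str(S3StorageClass.GLACIER) == "GLACIER"

# Members can be tested against the class, and raw values parse back into members.
assert S3StorageClass.STANDARD in S3StorageClass
assert S3StorageClass("ONEZONE_IA") is S3StorageClass.ONEZONE_IA
```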
@@ -68,21 +80,33 @@ class AsyncAWSClient:
             LOG.exception("Failed to delete secret.", secret_name=secret_name)
             raise e
 
-    async def upload_file(self, uri: str, data: bytes) -> str | None:
+    async def upload_file(
+        self, uri: str, data: bytes, storage_class: S3StorageClass = S3StorageClass.STANDARD
+    ) -> str | None:
+        if storage_class not in S3StorageClass:
+            raise ValueError(f"Invalid storage class: {storage_class}. Must be one of {list(S3StorageClass)}")
         try:
             async with self.session.client(AWSClientType.S3, region_name=self.region_name) as client:
                 parsed_uri = S3Uri(uri)
-                await client.put_object(Body=data, Bucket=parsed_uri.bucket, Key=parsed_uri.key)
+                await client.put_object(
+                    Body=data, Bucket=parsed_uri.bucket, Key=parsed_uri.key, StorageClass=str(storage_class)
+                )
                 return uri
         except Exception:
             LOG.exception("S3 upload failed.", uri=uri)
             return None
 
-    async def upload_file_stream(self, uri: str, file_obj: IO[bytes]) -> str | None:
+    async def upload_file_stream(
+        self, uri: str, file_obj: IO[bytes], storage_class: S3StorageClass = S3StorageClass.STANDARD
+    ) -> str | None:
+        if storage_class not in S3StorageClass:
+            raise ValueError(f"Invalid storage class: {storage_class}. Must be one of {list(S3StorageClass)}")
         try:
             async with self.session.client(AWSClientType.S3, region_name=self.region_name) as client:
                 parsed_uri = S3Uri(uri)
-                await client.upload_fileobj(file_obj, parsed_uri.bucket, parsed_uri.key)
+                await client.upload_fileobj(
+                    file_obj, parsed_uri.bucket, parsed_uri.key, StorageClass=str(storage_class)
+                )
                 LOG.debug("Upload file stream success", uri=uri)
                 return uri
         except Exception:
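A sketch of how call sites pick up the widened signature; since `storage_class` defaults to `STANDARD`, existing callers are unaffected. The bucket/key URI and the bare `AsyncAWSClient()` construction are illustrative assumptions, not taken from this diff:

```python
import asyncio

from skyvern.forge.sdk.api.aws import AsyncAWSClient, S3StorageClass


async def main() -> None:
    client = AsyncAWSClient()  # assumed default construction; region comes from settings

    # Unchanged behavior: defaults to the STANDARD storage class.
    await client.upload_file("s3://my-bucket/reports/run.json", b"{}")

    # Opt a cold artifact into a cheaper class.
    await client.upload_file(
        "s3://my-bucket/archive/run.json",
        b"{}",
        storage_class=S3StorageClass.GLACIER,
    )


asyncio.run(main())
```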
@@ -93,22 +117,21 @@ class AsyncAWSClient:
         self,
         uri: str,
         file_path: str,
+        storage_class: S3StorageClass = S3StorageClass.STANDARD,
         metadata: dict | None = None,
         raise_exception: bool = False,
     ) -> None:
         try:
             async with self.session.client(AWSClientType.S3, region_name=self.region_name) as client:
                 parsed_uri = S3Uri(uri)
-                params: dict[str, Any] = {
-                    "Filename": file_path,
-                    "Bucket": parsed_uri.bucket,
-                    "Key": parsed_uri.key,
-                }
-
-                if metadata:
-                    params["ExtraArgs"] = {"Metadata": metadata}
-
-                await client.upload_file(**params)
+                extra_args: dict[str, Any] = {"ExtraArgs": {"Metadata": metadata}} if metadata else {}
+                await client.upload_file(
+                    Filename=file_path,
+                    Bucket=parsed_uri.bucket,
+                    Key=parsed_uri.key,
+                    StorageClass=str(storage_class),
+                    **extra_args,
+                )
         except Exception as e:
             LOG.exception("S3 upload failed.", uri=uri)
             if raise_exception:
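One caveat worth flagging, hedged against the boto3 documentation rather than this codebase: `put_object` takes `StorageClass` as a top-level parameter, but the transfer-manager helpers (`upload_file`, `upload_fileobj`) document upload settings such as `StorageClass` and `Metadata` under `ExtraArgs`. A sketch of that documented convention; the client, bucket, and values are illustrative:

```python
from typing import Any


async def upload_with_storage_class(client: Any, file_path: str, bucket: str, key: str) -> None:
    # Per the boto3 transfer-manager docs, StorageClass rides inside ExtraArgs
    # for upload_file/upload_fileobj, alongside Metadata.
    extra_args: dict[str, Any] = {
        "StorageClass": "ONEZONE_IA",
        "Metadata": {"original_filename": "report.pdf"},
    }
    await client.upload_file(Filename=file_path, Bucket=bucket, Key=key, ExtraArgs=extra_args)
```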
--- a/skyvern/forge/sdk/artifact/storage/s3.py
+++ b/skyvern/forge/sdk/artifact/storage/s3.py
@@ -6,7 +6,7 @@ import structlog
 
 from skyvern.config import settings
 from skyvern.constants import DOWNLOAD_FILE_PREFIX
-from skyvern.forge.sdk.api.aws import AsyncAWSClient
+from skyvern.forge.sdk.api.aws import AsyncAWSClient, S3StorageClass
 from skyvern.forge.sdk.api.files import (
     calculate_sha256_for_file,
     create_named_temporary_file,
@@ -67,7 +67,18 @@ class S3Storage(BaseStorage):
         return f"s3://{self.bucket}/{settings.ENV}/ai_suggestions/{ai_suggestion.ai_suggestion_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
 
     async def store_artifact(self, artifact: Artifact, data: bytes) -> None:
-        await self.async_client.upload_file(artifact.uri, data)
+        sc = await self._get_storage_class_for_org(artifact.organization_id)
+        LOG.info(
+            "Storing artifact",
+            artifact_id=artifact.id,
+            organization_id=artifact.organization_id,
+            uri=artifact.uri,
+            storage_class=sc,
+        )
+        await self.async_client.upload_file(artifact.uri, data, storage_class=sc)
+
+    async def _get_storage_class_for_org(self, organization_id: str | None) -> S3StorageClass:
+        return S3StorageClass.STANDARD
 
     async def retrieve_artifact(self, artifact: Artifact) -> bytes | None:
         return await self.async_client.download_file(artifact.uri)
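`_get_storage_class_for_org` always returns `STANDARD` for now; it reads as a seam for future per-organization policy. A purely hypothetical sketch of what such a policy could look like (`ARCHIVAL_ORG_IDS` is invented for illustration and is not part of this change):

```python
# Hypothetical: route certain organizations to a colder storage class.
# ARCHIVAL_ORG_IDS is an invented example set, not part of the commit.
ARCHIVAL_ORG_IDS = {"o_123", "o_456"}


class TieredS3Storage(S3Storage):
    async def _get_storage_class_for_org(self, organization_id: str | None) -> S3StorageClass:
        if organization_id in ARCHIVAL_ORG_IDS:
            return S3StorageClass.ONEZONE_IA
        return S3StorageClass.STANDARD
```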
@@ -80,12 +91,30 @@ class S3Storage(BaseStorage):
         return await self.async_client.create_presigned_urls([artifact.uri for artifact in artifacts])
 
     async def store_artifact_from_path(self, artifact: Artifact, path: str) -> None:
-        await self.async_client.upload_file_from_path(artifact.uri, path)
+        sc = await self._get_storage_class_for_org(artifact.organization_id)
+        LOG.info(
+            "Storing artifact from path",
+            artifact_id=artifact.id,
+            organization_id=artifact.organization_id,
+            uri=artifact.uri,
+            storage_class=sc,
+            path=path,
+        )
+        await self.async_client.upload_file_from_path(artifact.uri, path, storage_class=sc)
 
     async def save_streaming_file(self, organization_id: str, file_name: str) -> None:
         from_path = f"{get_skyvern_temp_dir()}/{organization_id}/{file_name}"
         to_path = f"s3://{settings.AWS_S3_BUCKET_SCREENSHOTS}/{settings.ENV}/{organization_id}/{file_name}"
-        await self.async_client.upload_file_from_path(to_path, from_path)
+        sc = await self._get_storage_class_for_org(organization_id)
+        LOG.info(
+            "Saving streaming file",
+            organization_id=organization_id,
+            file_name=file_name,
+            from_path=from_path,
+            to_path=to_path,
+            storage_class=sc,
+        )
+        await self.async_client.upload_file_from_path(to_path, from_path, storage_class=sc)
 
     async def get_streaming_file(self, organization_id: str, file_name: str, use_default: bool = True) -> bytes | None:
         path = f"s3://{settings.AWS_S3_BUCKET_SCREENSHOTS}/{settings.ENV}/{organization_id}/{file_name}"
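The logging half of this change leans on structlog's keyword binding: each upload emits one structured event whose fields (`organization_id`, `storage_class`, paths) survive into whatever renderer is configured. A small standalone illustration of the pattern:

```python
import structlog

LOG = structlog.get_logger()

# Keyword arguments become structured fields on the event. With structlog's
# default console renderer this prints roughly:
#   [info] Saving streaming file file_name=screenshot.png storage_class=STANDARD
LOG.info("Saving streaming file", file_name="screenshot.png", storage_class="STANDARD")
```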
@@ -96,7 +125,16 @@ class S3Storage(BaseStorage):
         temp_zip_file = create_named_temporary_file()
         zip_file_path = shutil.make_archive(temp_zip_file.name, "zip", directory)
         browser_session_uri = f"s3://{settings.AWS_S3_BUCKET_BROWSER_SESSIONS}/{settings.ENV}/{organization_id}/{workflow_permanent_id}.zip"
-        await self.async_client.upload_file_from_path(browser_session_uri, zip_file_path)
+        sc = await self._get_storage_class_for_org(organization_id)
+        LOG.info(
+            "Storing browser session",
+            organization_id=organization_id,
+            workflow_permanent_id=workflow_permanent_id,
+            zip_file_path=zip_file_path,
+            browser_session_uri=browser_session_uri,
+            storage_class=sc,
+        )
+        await self.async_client.upload_file_from_path(browser_session_uri, zip_file_path, storage_class=sc)
 
     async def retrieve_browser_session(self, organization_id: str, workflow_permanent_id: str) -> str | None:
         browser_session_uri = f"s3://{settings.AWS_S3_BUCKET_BROWSER_SESSIONS}/{settings.ENV}/{organization_id}/{workflow_permanent_id}.zip"
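For context on the archive handling above: `shutil.make_archive` appends the format suffix itself and returns the full path it wrote, which is why `zip_file_path` (not `temp_zip_file.name`) is what gets uploaded. A stdlib sketch of the round-trip, with invented paths:

```python
import shutil

# Pack: writes /tmp/session.zip from the directory's contents and returns
# that full path.
zip_file_path = shutil.make_archive("/tmp/session", "zip", "/tmp/browser_profile")

# Unpack on restore.
shutil.unpack_archive(zip_file_path, "/tmp/restored_profile", "zip")
```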
@@ -117,6 +155,7 @@ class S3Storage(BaseStorage):
     ) -> None:
         download_dir = get_download_dir(workflow_run_id=workflow_run_id, task_id=task_id)
         files = os.listdir(download_dir)
+        sc = await self._get_storage_class_for_org(organization_id)
         for file in files:
             fpath = os.path.join(download_dir, file)
             if os.path.isfile(fpath):
@@ -124,11 +163,19 @@ class S3Storage(BaseStorage):
 
                 # Calculate SHA-256 checksum
                 checksum = calculate_sha256_for_file(fpath)
-                LOG.info("Calculated checksum for file", file=file, checksum=checksum)
-
+                LOG.info(
+                    "Calculated checksum for file",
+                    file=file,
+                    checksum=checksum,
+                    organization_id=organization_id,
+                    storage_class=sc,
+                )
                 # Upload file with checksum metadata
                 await self.async_client.upload_file_from_path(
-                    uri=uri, file_path=fpath, metadata={"sha256_checksum": checksum, "original_filename": file}
+                    uri=uri,
+                    file_path=fpath,
+                    metadata={"sha256_checksum": checksum, "original_filename": file},
+                    storage_class=sc,
                 )
 
     async def get_downloaded_files(
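Because the checksum is stored as S3 user metadata, the read side can verify a download against a HEAD request without refetching the object (S3 returns user metadata keys lowercased). A hedged sketch, assuming an aioboto3-style S3 client like the one `AsyncAWSClient` manages; bucket and key are illustrative:

```python
import hashlib
from typing import Any


def sha256_of_file(file_path: str) -> str:
    # Stream in chunks so large files never fully load into memory.
    digest = hashlib.sha256()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()


async def matches_uploaded_checksum(client: Any, bucket: str, key: str, local_path: str) -> bool:
    head = await client.head_object(Bucket=bucket, Key=key)
    # User metadata comes back under "Metadata" with lowercased keys.
    return head["Metadata"].get("sha256_checksum") == sha256_of_file(local_path)
```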