Handle 2FA codes that have the otpauth URIs (#1465)
This commit is contained in:
@@ -3,6 +3,7 @@ import json
|
|||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import urllib.parse
|
||||||
from enum import StrEnum
|
from enum import StrEnum
|
||||||
|
|
||||||
import structlog
|
import structlog
|
||||||
@@ -137,6 +138,43 @@ class BitwardenService:
|
|||||||
f"Bitwarden CLI failed after all retry attempts. Fail reasons: {fail_reasons}"
|
f"Bitwarden CLI failed after all retry attempts. Fail reasons: {fail_reasons}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def extract_totp_secret(totp_value: str) -> str:
|
||||||
|
"""
|
||||||
|
Extract the TOTP secret from either a raw secret or a TOTP URI.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
totp_value: Raw TOTP secret or URI (otpauth://totp/...)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The extracted TOTP secret
|
||||||
|
|
||||||
|
Example:
|
||||||
|
>>> BitwardenService.extract_totp_secret("AAAAAABBBBBBB")
|
||||||
|
"AAAAAABBBBBBB"
|
||||||
|
>>> BitwardenService.extract_totp_secret("otpauth://totp/user@domain.com?secret=AAAAAABBBBBBB")
|
||||||
|
"AAAAAABBBBBBB"
|
||||||
|
"""
|
||||||
|
if not totp_value:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
# Handle TOTP URI format
|
||||||
|
if totp_value.startswith("otpauth://"):
|
||||||
|
try:
|
||||||
|
# Parse the URI to extract the secret
|
||||||
|
query = urllib.parse.urlparse(totp_value).query
|
||||||
|
params = dict(urllib.parse.parse_qsl(query))
|
||||||
|
return params.get("secret", "")
|
||||||
|
except Exception:
|
||||||
|
LOG.error(
|
||||||
|
"Failed to parse TOTP URI",
|
||||||
|
totp_value=totp_value,
|
||||||
|
exc_info=True,
|
||||||
|
)
|
||||||
|
return ""
|
||||||
|
|
||||||
|
return totp_value
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def _get_secret_value_from_url(
|
async def _get_secret_value_from_url(
|
||||||
client_id: str,
|
client_id: str,
|
||||||
@@ -204,18 +242,24 @@ class BitwardenService:
|
|||||||
collection_id_str = f" in collection with ID: {collection_id}" if collection_id else ""
|
collection_id_str = f" in collection with ID: {collection_id}" if collection_id else ""
|
||||||
raise BitwardenListItemsError(f"No items found in Bitwarden for URL: {url}{collection_id_str}")
|
raise BitwardenListItemsError(f"No items found in Bitwarden for URL: {url}{collection_id_str}")
|
||||||
|
|
||||||
bitwarden_result: list[BitwardenQueryResult] = [
|
bitwarden_result: list[BitwardenQueryResult] = []
|
||||||
BitwardenQueryResult(
|
for item in items:
|
||||||
credential={
|
if "login" not in item:
|
||||||
BitwardenConstants.USERNAME: item.get("login", {}).get("username", ""),
|
continue
|
||||||
BitwardenConstants.PASSWORD: item.get("login", {}).get("password", ""),
|
|
||||||
BitwardenConstants.TOTP: item.get("login", {}).get("totp", "") or "",
|
login = item["login"]
|
||||||
},
|
totp = BitwardenService.extract_totp_secret(login.get("totp", ""))
|
||||||
uris=[uri.get("uri") for uri in item.get("login", {}).get("uris", []) if "uri" in uri],
|
|
||||||
|
bitwarden_result.append(
|
||||||
|
BitwardenQueryResult(
|
||||||
|
credential={
|
||||||
|
BitwardenConstants.USERNAME: login.get("username", ""),
|
||||||
|
BitwardenConstants.PASSWORD: login.get("password", ""),
|
||||||
|
BitwardenConstants.TOTP: totp,
|
||||||
|
},
|
||||||
|
uris=[uri.get("uri") for uri in login.get("uris", []) if "uri" in uri],
|
||||||
|
)
|
||||||
)
|
)
|
||||||
for item in items
|
|
||||||
if "login" in item
|
|
||||||
]
|
|
||||||
|
|
||||||
if len(bitwarden_result) == 0:
|
if len(bitwarden_result) == 0:
|
||||||
return {}
|
return {}
|
||||||
|
|||||||
Reference in New Issue
Block a user