diff --git a/.env.example b/.env.example
index 35ea12a8856..e21134a55ca 100644
--- a/.env.example
+++ b/.env.example
@@ -19,4 +19,14 @@ FORWARDED_ALLOW_IPS='*'
# DO NOT TRACK
SCARF_NO_ANALYTICS=true
DO_NOT_TRACK=true
-ANONYMIZED_TELEMETRY=false
\ No newline at end of file
+ANONYMIZED_TELEMETRY=false
+
+# WorkOS SSO Configuration
+# For easy enterprise SSO integration with SAML, OIDC, and OAuth providers
+# Get your credentials from https://dashboard.workos.com
+# WORKOS_CLIENT_ID='client_...'
+# WORKOS_API_KEY='sk_...'
+# Either organization_id OR connection_id is required:
+# WORKOS_ORGANIZATION_ID='org_...' # For organization-based SSO
+# WORKOS_CONNECTION_ID='conn_...' # For connection-based SSO
+# WORKOS_REDIRECT_URI='http://localhost:8080/oauth/workos/callback'
\ No newline at end of file
diff --git a/backend/open_webui/config.py b/backend/open_webui/config.py
index 5245a078141..fb0adc81779 100644
--- a/backend/open_webui/config.py
+++ b/backend/open_webui/config.py
@@ -545,6 +545,37 @@ def __getattr__(self, key):
os.environ.get("FEISHU_REDIRECT_URI", ""),
)
+# WorkOS SSO Configuration
+WORKOS_CLIENT_ID = PersistentConfig(
+ "WORKOS_CLIENT_ID",
+ "oauth.workos.client_id",
+ os.environ.get("WORKOS_CLIENT_ID", ""),
+)
+
+WORKOS_API_KEY = PersistentConfig(
+ "WORKOS_API_KEY",
+ "oauth.workos.api_key",
+ os.environ.get("WORKOS_API_KEY", ""),
+)
+
+WORKOS_ORGANIZATION_ID = PersistentConfig(
+ "WORKOS_ORGANIZATION_ID",
+ "oauth.workos.organization_id",
+ os.environ.get("WORKOS_ORGANIZATION_ID", ""),
+)
+
+WORKOS_CONNECTION_ID = PersistentConfig(
+ "WORKOS_CONNECTION_ID",
+ "oauth.workos.connection_id",
+ os.environ.get("WORKOS_CONNECTION_ID", ""),
+)
+
+WORKOS_REDIRECT_URI = PersistentConfig(
+ "WORKOS_REDIRECT_URI",
+ "oauth.workos.redirect_uri",
+ os.environ.get("WORKOS_REDIRECT_URI", ""),
+)
+
ENABLE_OAUTH_ROLE_MANAGEMENT = PersistentConfig(
"ENABLE_OAUTH_ROLE_MANAGEMENT",
"oauth.enable_role_mapping",
@@ -769,6 +800,14 @@ def feishu_oauth_register(oauth: OAuth):
"sub_claim": "user_id",
}
+ # WorkOS SSO - uses its own SDK, not standard OAuth registration
+ if WORKOS_CLIENT_ID.value and WORKOS_API_KEY.value:
+ OAUTH_PROVIDERS["workos"] = {
+ "name": "WorkOS SSO",
+ "redirect_uri": WORKOS_REDIRECT_URI.value,
+ "uses_workos_sdk": True, # Flag to indicate WorkOS uses its own SDK
+ }
+
configured_providers = []
if GOOGLE_CLIENT_ID.value:
configured_providers.append("Google")
@@ -778,9 +817,13 @@ def feishu_oauth_register(oauth: OAuth):
configured_providers.append("GitHub")
if FEISHU_CLIENT_ID.value:
configured_providers.append("Feishu")
+ if WORKOS_CLIENT_ID.value:
+ configured_providers.append("WorkOS")
- if configured_providers and not OPENID_PROVIDER_URL.value:
- provider_list = ", ".join(configured_providers)
+ # WorkOS handles logout differently, so exclude it from this warning
+ non_workos_providers = [p for p in configured_providers if p != "WorkOS"]
+ if non_workos_providers and not OPENID_PROVIDER_URL.value:
+ provider_list = ", ".join(non_workos_providers)
log.warning(
f"⚠️ OAuth providers configured ({provider_list}) but OPENID_PROVIDER_URL not set - logout will not work!"
)
diff --git a/backend/open_webui/test/apps/webui/utils/test_workos_oauth.py b/backend/open_webui/test/apps/webui/utils/test_workos_oauth.py
new file mode 100644
index 00000000000..2ffec1ab2d4
--- /dev/null
+++ b/backend/open_webui/test/apps/webui/utils/test_workos_oauth.py
@@ -0,0 +1,288 @@
+"""
+Tests for WorkOS SSO integration.
+"""
+
+import pytest
+from unittest.mock import MagicMock, patch, AsyncMock
+from datetime import datetime
+
+
+class TestWorkOSConfiguration:
+ """Test WorkOS configuration loading and validation."""
+
+ def test_workos_config_variables_exist(self):
+ """Test that WorkOS configuration variables are defined in config."""
+ from open_webui.config import (
+ WORKOS_CLIENT_ID,
+ WORKOS_API_KEY,
+ WORKOS_ORGANIZATION_ID,
+ WORKOS_CONNECTION_ID,
+ WORKOS_REDIRECT_URI,
+ )
+
+ # These should be PersistentConfig objects
+ assert hasattr(WORKOS_CLIENT_ID, "value")
+ assert hasattr(WORKOS_API_KEY, "value")
+ assert hasattr(WORKOS_ORGANIZATION_ID, "value")
+ assert hasattr(WORKOS_CONNECTION_ID, "value")
+ assert hasattr(WORKOS_REDIRECT_URI, "value")
+
+ def test_workos_provider_not_registered_without_credentials(self):
+ """Test that WorkOS provider is not registered without credentials."""
+ from open_webui.config import OAUTH_PROVIDERS, load_oauth_providers
+
+ # Temporarily clear and reload with empty credentials
+ with patch("open_webui.config.WORKOS_CLIENT_ID") as mock_client_id, patch(
+ "open_webui.config.WORKOS_API_KEY"
+ ) as mock_api_key:
+ mock_client_id.value = ""
+ mock_api_key.value = ""
+ load_oauth_providers()
+ # WorkOS should not be in providers when credentials are missing
+ # (we check it's not there OR if there it has the flag)
+ if "workos" in OAUTH_PROVIDERS:
+ assert OAUTH_PROVIDERS["workos"].get("uses_workos_sdk") is True
+
+
+class TestWorkOSOAuthManager:
+ """Test WorkOS-specific OAuth manager functionality."""
+
+ @pytest.fixture
+ def mock_workos_profile(self):
+ """Create a mock WorkOS profile object."""
+ profile = MagicMock()
+ profile.id = "prof_test123"
+ profile.email = "user@example.com"
+ profile.first_name = "Test"
+ profile.last_name = "User"
+ profile.connection_id = "conn_test123"
+ profile.connection_type = "OktaSAML"
+ profile.organization_id = "org_test123"
+ profile.idp_id = "idp_test123"
+ profile.profile_picture_url = None
+ profile.role = None
+ return profile
+
+ @pytest.fixture
+ def mock_workos_client(self, mock_workos_profile):
+ """Create a mock WorkOS client."""
+ client = MagicMock()
+ client.sso.get_authorization_url.return_value = (
+ "https://api.workos.com/sso/authorize?client_id=test"
+ )
+
+ # Mock profile_and_token response
+ profile_and_token = MagicMock()
+ profile_and_token.profile = mock_workos_profile
+ profile_and_token.access_token = "test_access_token"
+ client.sso.get_profile_and_token.return_value = profile_and_token
+
+ return client
+
+ def test_workos_sdk_import(self):
+ """Test that WorkOS SDK can be imported."""
+ try:
+ from workos import WorkOSClient
+
+ assert WorkOSClient is not None
+ except ImportError:
+ pytest.skip("WorkOS SDK not installed")
+
+ def test_workos_available_flag(self):
+ """Test WORKOS_AVAILABLE flag in oauth module."""
+ from open_webui.utils.oauth import WORKOS_AVAILABLE
+
+ # Should be True if workos is installed, False otherwise
+ assert isinstance(WORKOS_AVAILABLE, bool)
+
+ @patch("open_webui.utils.oauth.WORKOS_CLIENT_ID")
+ @patch("open_webui.utils.oauth.WORKOS_API_KEY")
+ @patch("open_webui.utils.oauth.WORKOS_ORGANIZATION_ID")
+ @patch("open_webui.utils.oauth.WORKOS_CONNECTION_ID")
+ @patch("open_webui.utils.oauth.OAUTH_PROVIDERS")
+ def test_handle_login_requires_org_or_connection(
+ self,
+ mock_providers,
+ mock_connection_id,
+ mock_org_id,
+ mock_api_key,
+ mock_client_id,
+ ):
+ """Test that WorkOS login requires organization_id or connection_id."""
+ from open_webui.utils.oauth import OAuthManager
+ from fastapi import HTTPException
+
+ # Setup mocks
+ mock_client_id.value = "client_test"
+ mock_api_key.value = "sk_test"
+ mock_org_id.value = ""
+ mock_connection_id.value = ""
+ mock_providers.__contains__ = lambda self, x: x == "workos"
+ mock_providers.__getitem__ = lambda self, x: {"uses_workos_sdk": True}
+
+ # Create manager with mocked WorkOS client
+ app = MagicMock()
+ manager = OAuthManager(app)
+ manager._workos_client = MagicMock()
+
+ request = MagicMock()
+ request.app.state.config.WEBUI_URL = "http://localhost:8080"
+
+ # Should raise error when neither org_id nor connection_id is set
+ with pytest.raises(HTTPException) as exc_info:
+ import asyncio
+
+ asyncio.get_event_loop().run_until_complete(
+ manager._handle_workos_login(request)
+ )
+ assert exc_info.value.status_code == 500
+
+
+class TestWorkOSProfileMapping:
+ """Test WorkOS profile to user data mapping."""
+
+ def test_profile_to_user_data_mapping(self):
+ """Test that WorkOS profile fields are correctly mapped to user data."""
+ # Create mock profile
+ profile = MagicMock()
+ profile.id = "prof_123"
+ profile.email = "test@example.com"
+ profile.first_name = "John"
+ profile.last_name = "Doe"
+ profile.connection_id = "conn_123"
+ profile.connection_type = "GoogleOAuth"
+ profile.organization_id = "org_123"
+ profile.idp_id = "idp_123"
+
+ # This simulates what _handle_workos_callback does
+ user_data = {
+ "sub": profile.id,
+ "email": profile.email,
+ "name": f"{profile.first_name or ''} {profile.last_name or ''}".strip()
+ or profile.email,
+ "first_name": profile.first_name,
+ "last_name": profile.last_name,
+ "picture": getattr(profile, "profile_picture_url", None),
+ "connection_id": profile.connection_id,
+ "connection_type": profile.connection_type,
+ "organization_id": profile.organization_id,
+ "idp_id": profile.idp_id,
+ }
+
+ assert user_data["sub"] == "prof_123"
+ assert user_data["email"] == "test@example.com"
+ assert user_data["name"] == "John Doe"
+ assert user_data["organization_id"] == "org_123"
+
+ def test_profile_with_role_mapping(self):
+ """Test that WorkOS profile roles are correctly mapped."""
+ # Create mock profile with role
+ profile = MagicMock()
+ profile.id = "prof_123"
+ profile.email = "admin@example.com"
+ profile.first_name = "Admin"
+ profile.last_name = "User"
+
+ # Mock role object
+ role = MagicMock()
+ role.slug = "admin"
+ profile.role = role
+
+ # This simulates role extraction logic
+ user_data = {"roles": []}
+ if hasattr(profile, "role") and profile.role:
+ user_data["roles"] = (
+ [profile.role.slug]
+ if hasattr(profile.role, "slug")
+ else [profile.role]
+ )
+
+ assert user_data["roles"] == ["admin"]
+
+
+class TestWorkOSEmailDomainValidation:
+ """Test email domain validation for WorkOS SSO."""
+
+ def test_email_domain_allowed_wildcard(self):
+ """Test that wildcard domain allows all emails."""
+ from open_webui.utils.oauth import auth_manager_config
+
+ allowed_domains = ["*"]
+ email = "user@anydomain.com"
+
+ # Simulate domain check logic
+ is_allowed = (
+ "*" in allowed_domains or email.split("@")[-1] in allowed_domains
+ )
+ assert is_allowed is True
+
+ def test_email_domain_allowed_specific(self):
+ """Test that specific domain restricts access."""
+ allowed_domains = ["example.com", "company.com"]
+
+ # Test allowed email
+ email_allowed = "user@example.com"
+ is_allowed = (
+ "*" in allowed_domains or email_allowed.split("@")[-1] in allowed_domains
+ )
+ assert is_allowed is True
+
+ # Test disallowed email
+ email_denied = "user@otherdomain.com"
+ is_denied = (
+ "*" in allowed_domains or email_denied.split("@")[-1] in allowed_domains
+ )
+ assert is_denied is False
+
+
+class TestWorkOSProviderSubject:
+ """Test WorkOS provider subject ID generation."""
+
+ def test_provider_sub_format(self):
+ """Test that provider_sub is correctly formatted."""
+ profile_id = "prof_01ABC123"
+ provider_sub = f"workos@{profile_id}"
+
+ assert provider_sub == "workos@prof_01ABC123"
+ assert provider_sub.startswith("workos@")
+
+ def test_provider_sub_uniqueness(self):
+ """Test that different profile IDs create different provider_subs."""
+ profile_ids = ["prof_123", "prof_456", "prof_789"]
+ provider_subs = [f"workos@{pid}" for pid in profile_ids]
+
+ assert len(set(provider_subs)) == len(profile_ids)
+
+
+class TestWorkOSAuthorizationURL:
+ """Test WorkOS authorization URL generation."""
+
+ def test_authorization_url_with_organization_id(self):
+ """Test authorization URL generation with organization_id."""
+ mock_client = MagicMock()
+ mock_client.sso.get_authorization_url.return_value = (
+ "https://api.workos.com/sso/authorize?organization_id=org_123"
+ )
+
+ auth_params = {
+ "redirect_uri": "http://localhost/callback",
+ "organization_id": "org_123",
+ }
+
+ url = mock_client.sso.get_authorization_url(**auth_params)
+ assert "organization_id=org_123" in url
+
+ def test_authorization_url_with_connection_id(self):
+ """Test authorization URL generation with connection_id."""
+ mock_client = MagicMock()
+ mock_client.sso.get_authorization_url.return_value = (
+ "https://api.workos.com/sso/authorize?connection_id=conn_123"
+ )
+
+ auth_params = {
+ "redirect_uri": "http://localhost/callback",
+ "connection_id": "conn_123",
+ }
+
+ url = mock_client.sso.get_authorization_url(**auth_params)
+ assert "connection_id=conn_123" in url
diff --git a/backend/open_webui/utils/oauth.py b/backend/open_webui/utils/oauth.py
index e0bf7582c6d..a28106a8fb0 100644
--- a/backend/open_webui/utils/oauth.py
+++ b/backend/open_webui/utils/oauth.py
@@ -25,6 +25,14 @@
from starlette.responses import RedirectResponse
from typing import Optional
+# WorkOS SDK - conditionally imported
+try:
+ from workos import WorkOSClient
+ WORKOS_AVAILABLE = True
+except ImportError:
+ WORKOS_AVAILABLE = False
+ WorkOSClient = None
+
from open_webui.models.auths import Auths
from open_webui.models.oauth_sessions import OAuthSessions
@@ -54,6 +62,12 @@
WEBHOOK_URL,
JWT_EXPIRES_IN,
AppConfig,
+ # WorkOS configuration
+ WORKOS_CLIENT_ID,
+ WORKOS_API_KEY,
+ WORKOS_ORGANIZATION_ID,
+ WORKOS_CONNECTION_ID,
+ WORKOS_REDIRECT_URI,
)
from open_webui.constants import ERROR_MESSAGES, WEBHOOK_MESSAGES
from open_webui.env import (
@@ -643,8 +657,28 @@ def __init__(self, app):
self.app = app
self._clients = {}
+ self._workos_client = None
+
+ # Initialize WorkOS client if configured
+ if (
+ WORKOS_AVAILABLE
+ and WORKOS_CLIENT_ID.value
+ and WORKOS_API_KEY.value
+ ):
+ try:
+ self._workos_client = WorkOSClient(
+ api_key=WORKOS_API_KEY.value,
+ client_id=WORKOS_CLIENT_ID.value,
+ )
+ log.info("WorkOS SSO client initialized successfully")
+ except Exception as e:
+ log.error(f"Failed to initialize WorkOS client: {e}")
+ self._workos_client = None
for name, provider_config in OAUTH_PROVIDERS.items():
+ # Skip WorkOS as it uses its own SDK
+ if provider_config.get("uses_workos_sdk"):
+ continue
if "register" not in provider_config:
log.error(f"OAuth provider {name} missing register function")
continue
@@ -1085,6 +1119,11 @@ async def _process_picture_url(
async def handle_login(self, request, provider):
if provider not in OAUTH_PROVIDERS:
raise HTTPException(404)
+
+ # Handle WorkOS SSO separately
+ if provider == "workos":
+ return await self._handle_workos_login(request)
+
# If the provider has a custom redirect URL, use that, otherwise automatically generate one
redirect_uri = OAUTH_PROVIDERS[provider].get("redirect_uri") or request.url_for(
"oauth_login_callback", provider=provider
@@ -1094,10 +1133,55 @@ async def handle_login(self, request, provider):
raise HTTPException(404)
return await client.authorize_redirect(request, redirect_uri)
+ async def _handle_workos_login(self, request):
+ """Handle WorkOS SSO login initiation."""
+ if not self._workos_client:
+ log.error("WorkOS client not initialized")
+ raise HTTPException(500, detail="WorkOS SSO not configured")
+
+ redirect_base_url = (
+ str(request.app.state.config.WEBUI_URL or request.base_url)
+ ).rstrip("/")
+
+ # Use configured redirect URI or generate one
+ redirect_uri = WORKOS_REDIRECT_URI.value or f"{redirect_base_url}/oauth/workos/callback"
+
+ try:
+ # Build authorization URL parameters
+ auth_params = {
+ "redirect_uri": redirect_uri,
+ }
+
+ # Use organization_id if provided, otherwise use connection_id
+ if WORKOS_ORGANIZATION_ID.value:
+ auth_params["organization_id"] = WORKOS_ORGANIZATION_ID.value
+ elif WORKOS_CONNECTION_ID.value:
+ auth_params["connection_id"] = WORKOS_CONNECTION_ID.value
+ else:
+ log.error("WorkOS requires either organization_id or connection_id")
+ raise HTTPException(
+ 500,
+ detail="WorkOS SSO requires either WORKOS_ORGANIZATION_ID or WORKOS_CONNECTION_ID",
+ )
+
+ # Generate authorization URL using WorkOS SDK
+ authorization_url = self._workos_client.sso.get_authorization_url(**auth_params)
+
+ log.debug(f"WorkOS authorization URL: {authorization_url}")
+ return RedirectResponse(url=authorization_url)
+
+ except Exception as e:
+ log.error(f"Error generating WorkOS authorization URL: {e}")
+ raise HTTPException(500, detail=f"Failed to initiate WorkOS SSO: {str(e)}")
+
async def handle_callback(self, request, provider, response):
if provider not in OAUTH_PROVIDERS:
raise HTTPException(404)
+ # Handle WorkOS SSO callback separately
+ if provider == "workos":
+ return await self._handle_workos_callback(request, response)
+
error_message = None
try:
client = self.get_client(provider)
@@ -1372,3 +1456,216 @@ async def handle_callback(self, request, provider, response):
log.error(f"Failed to store OAuth session server-side: {e}")
return response
+
+ async def _handle_workos_callback(self, request, response):
+ """Handle WorkOS SSO callback and authenticate user."""
+ if not self._workos_client:
+ log.error("WorkOS client not initialized")
+ raise HTTPException(500, detail="WorkOS SSO not configured")
+
+ error_message = None
+ user = None
+ jwt_token = None
+
+ try:
+ # Get the authorization code from the callback
+ code = request.query_params.get("code")
+ if not code:
+ error = request.query_params.get("error")
+ error_description = request.query_params.get("error_description", "")
+ log.warning(f"WorkOS callback error: {error} - {error_description}")
+ raise HTTPException(400, detail=error_description or ERROR_MESSAGES.INVALID_CRED)
+
+ # Exchange the code for user profile
+ try:
+ profile_and_token = self._workos_client.sso.get_profile_and_token(code)
+ profile = profile_and_token.profile
+ access_token = profile_and_token.access_token
+ except Exception as e:
+ log.warning(f"WorkOS authentication error: {e}")
+ raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
+
+ if not profile:
+ log.warning("WorkOS callback failed, profile is missing")
+ raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
+
+ # Validate organization_id if configured
+ if WORKOS_ORGANIZATION_ID.value and profile.organization_id != WORKOS_ORGANIZATION_ID.value:
+ log.warning(
+ f"WorkOS organization mismatch: expected {WORKOS_ORGANIZATION_ID.value}, got {profile.organization_id}"
+ )
+ raise HTTPException(403, detail=ERROR_MESSAGES.ACCESS_PROHIBITED)
+
+ # Extract user data from WorkOS profile
+ sub = profile.id # WorkOS profile ID
+ email = profile.email
+ if not email:
+ log.warning(f"WorkOS callback failed, email is missing: {profile}")
+ raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
+ email = email.lower()
+
+ # Check email domain restrictions
+ if (
+ "*" not in auth_manager_config.OAUTH_ALLOWED_DOMAINS
+ and email.split("@")[-1] not in auth_manager_config.OAUTH_ALLOWED_DOMAINS
+ ):
+ log.warning(
+ f"WorkOS callback failed, e-mail domain is not in the list of allowed domains: {email}"
+ )
+ raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
+
+ provider_sub = f"workos@{sub}"
+
+ # Build user_data dict for role management compatibility
+ user_data = {
+ "sub": sub,
+ "email": email,
+ "name": f"{profile.first_name or ''} {profile.last_name or ''}".strip() or email,
+ "first_name": profile.first_name,
+ "last_name": profile.last_name,
+ "picture": getattr(profile, "profile_picture_url", None),
+ "connection_id": profile.connection_id,
+ "connection_type": profile.connection_type,
+ "organization_id": profile.organization_id,
+ "idp_id": profile.idp_id,
+ }
+
+ # Add role if available from WorkOS profile
+ if hasattr(profile, "role") and profile.role:
+ user_data["roles"] = [profile.role.slug] if hasattr(profile.role, "slug") else [profile.role]
+
+ # Check if the user exists
+ user = Users.get_user_by_oauth_sub(provider_sub)
+ if not user:
+ # If the user does not exist, check if merging is enabled
+ if auth_manager_config.OAUTH_MERGE_ACCOUNTS_BY_EMAIL:
+ user = Users.get_user_by_email(email)
+ if user:
+ Users.update_user_oauth_sub_by_id(user.id, provider_sub)
+
+ if user:
+ determined_role = self.get_user_role(user, user_data)
+ if user.role != determined_role:
+ Users.update_user_role_by_id(user.id, determined_role)
+ # Update profile picture if enabled
+ if auth_manager_config.OAUTH_UPDATE_PICTURE_ON_LOGIN:
+ picture_url = user_data.get("picture")
+ if picture_url:
+ processed_picture_url = await self._process_picture_url(picture_url)
+ if processed_picture_url != user.profile_image_url:
+ Users.update_user_profile_image_url_by_id(user.id, processed_picture_url)
+ log.debug(f"Updated profile picture for user {user.email}")
+ else:
+ # If the user does not exist, check if signups are enabled
+ if auth_manager_config.ENABLE_OAUTH_SIGNUP:
+ existing_user = Users.get_user_by_email(email)
+ if existing_user:
+ raise HTTPException(400, detail=ERROR_MESSAGES.EMAIL_TAKEN)
+
+ picture_url = user_data.get("picture")
+ if picture_url:
+ picture_url = await self._process_picture_url(picture_url)
+ else:
+ picture_url = "/user.png"
+
+ name = user_data.get("name") or email
+
+ user = Auths.insert_new_auth(
+ email=email,
+ password=get_password_hash(str(uuid.uuid4())),
+ name=name,
+ profile_image_url=picture_url,
+ role=self.get_user_role(None, user_data),
+ oauth_sub=provider_sub,
+ )
+
+ if auth_manager_config.WEBHOOK_URL:
+ await post_webhook(
+ WEBUI_NAME,
+ auth_manager_config.WEBHOOK_URL,
+ WEBHOOK_MESSAGES.USER_SIGNUP(user.name),
+ {
+ "action": "signup",
+ "message": WEBHOOK_MESSAGES.USER_SIGNUP(user.name),
+ "user": user.model_dump_json(exclude_none=True),
+ },
+ )
+ else:
+ raise HTTPException(
+ status.HTTP_403_FORBIDDEN,
+ detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
+ )
+
+ jwt_token = create_token(
+ data={"id": user.id},
+ expires_delta=parse_duration(auth_manager_config.JWT_EXPIRES_IN),
+ )
+
+ if auth_manager_config.ENABLE_OAUTH_GROUP_MANAGEMENT and user.role != "admin":
+ self.update_user_groups(
+ user=user,
+ user_data=user_data,
+ default_permissions=request.app.state.config.USER_PERMISSIONS,
+ )
+
+ except Exception as e:
+ log.error(f"Error during WorkOS OAuth process: {e}")
+ error_message = (
+ e.detail
+ if isinstance(e, HTTPException) and e.detail
+ else ERROR_MESSAGES.DEFAULT("Error during WorkOS SSO process")
+ )
+
+ redirect_base_url = (
+ str(request.app.state.config.WEBUI_URL or request.base_url)
+ ).rstrip("/")
+ redirect_url = f"{redirect_base_url}/auth"
+
+ if error_message:
+ redirect_url = f"{redirect_url}?error={error_message}"
+ return RedirectResponse(url=redirect_url, headers=response.headers)
+
+ response = RedirectResponse(url=redirect_url, headers=response.headers)
+
+ # Set the cookie token
+ response.set_cookie(
+ key="token",
+ value=jwt_token,
+ httponly=False,
+ samesite=WEBUI_AUTH_COOKIE_SAME_SITE,
+ secure=WEBUI_AUTH_COOKIE_SECURE,
+ )
+
+ try:
+ # Create a pseudo-token for session storage
+ token_data = {
+ "access_token": access_token,
+ "issued_at": datetime.now().timestamp(),
+ "expires_at": datetime.now().timestamp() + 3600, # 1 hour default
+ }
+
+ # Clean up any existing sessions for this user/provider first
+ sessions = OAuthSessions.get_sessions_by_user_id(user.id)
+ for session in sessions:
+ if session.provider == "workos":
+ OAuthSessions.delete_session_by_id(session.id)
+
+ session = OAuthSessions.create_session(
+ user_id=user.id,
+ provider="workos",
+ token=token_data,
+ )
+
+ response.set_cookie(
+ key="oauth_session_id",
+ value=session.id,
+ httponly=True,
+ samesite=WEBUI_AUTH_COOKIE_SAME_SITE,
+ secure=WEBUI_AUTH_COOKIE_SECURE,
+ )
+
+ log.info(f"Stored WorkOS OAuth session server-side for user {user.id}")
+ except Exception as e:
+ log.error(f"Failed to store WorkOS OAuth session server-side: {e}")
+
+ return response
diff --git a/pyproject.toml b/pyproject.toml
index cd5a08fba21..89176fcbc75 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -113,6 +113,8 @@ dependencies = [
"azure-storage-blob==12.24.1",
"ldap3==2.9.1",
+
+ "workos>=5.0.0",
]
readme = "README.md"
requires-python = ">= 3.11, < 3.13.0a1"
diff --git a/src/routes/auth/+page.svelte b/src/routes/auth/+page.svelte
index a6dd15f9979..b300d247aa6 100644
--- a/src/routes/auth/+page.svelte
+++ b/src/routes/auth/+page.svelte
@@ -533,6 +533,34 @@
{$i18n.t('Continue with {{provider}}', { provider: 'Feishu' })}
{/if}
+ {#if $config?.oauth?.providers?.workos}
+
+ {/if}
{/if}