diff --git a/.env.example b/.env.example index 9dc219624..a67aafe93 100644 --- a/.env.example +++ b/.env.example @@ -24,13 +24,18 @@ ATLAN_START_TO_CLOSE_TIMEOUT_SECONDS=7200 # 2 hours # if you plan to deploy the app in customer environment, below is a sample environment variables you need to set ATLAN_WORKFLOW_HOST="tenant-temporal.atlan.com" ATLAN_WORKFLOW_PORT="443" -ATLAN_WORKFLOW_AUTH_ENABLED="true" +ATLAN_AUTH_ENABLED="true" +ATLAN_AUTH_URL="https://tenant.atlan.com/auth/realms/default/protocol/openid-connect/token" +ATLAN_WORKFLOW_TLS_ENABLED="true" ENABLE_ATLAN_UPLOAD="true" -ATLAN_DEPLOYMENT_SECRETS='{"appName_app_client_id":"enter_client_id","appName_app_client_secret":"enter_secret","atlan_auth_url":"https://tenant.atlan.com/auth/realms/default/protocol/openid-connect/token","workflow_tls_enabled":"true","deployment_name":"agent-v2"}' # Storage: DEPLOYMENT_OBJECT_STORE_NAME="objectstore" -UPSTREAM_OBJECT_STORE_NAME="atlan-storage" +UPSTREAM_OBJECT_STORE_NAME="atlan-objectstore" # Secret Store -SECRET_STORE_NAME="aws-secrets" \ No newline at end of file +SECRET_STORE_NAME="aws-secrets" + +# Client ID/Secret key fields, the value is the name of the key in the secret store +ATLAN_AUTH_CLIENT_ID_KEY="ATLAN_AUTH_CLIENT_ID" +ATLAN_AUTH_CLIENT_SECRET_KEY="ATLAN_AUTH_CLIENT_SECRET" \ No newline at end of file diff --git a/application_sdk/clients/atlan_auth.py b/application_sdk/clients/atlan_auth.py index 9778c09f5..2290a02fc 100644 --- a/application_sdk/clients/atlan_auth.py +++ b/application_sdk/clients/atlan_auth.py @@ -1,17 +1,17 @@ """OAuth2 token manager with automatic secret store discovery.""" import time -from typing import Any, Dict, Optional +from typing import Dict, Optional import aiohttp from application_sdk.common.error_codes import ClientError from application_sdk.constants import ( APPLICATION_NAME, + AUTH_ENABLED, + AUTH_URL, WORKFLOW_AUTH_CLIENT_ID_KEY, WORKFLOW_AUTH_CLIENT_SECRET_KEY, - WORKFLOW_AUTH_ENABLED, - WORKFLOW_AUTH_URL_KEY, ) from 
application_sdk.observability.logger_adaptor import get_logger from application_sdk.services.secretstore import SecretStore @@ -39,9 +39,8 @@ def __init__(self): (environment variables, AWS Secrets Manager, Azure Key Vault, etc.) """ self.application_name = APPLICATION_NAME - self.auth_config: Dict[str, Any] = SecretStore.get_deployment_secret() - self.auth_enabled: bool = WORKFLOW_AUTH_ENABLED - self.auth_url: Optional[str] = None + self.auth_enabled: bool = AUTH_ENABLED + self.auth_url: Optional[str] = AUTH_URL # Secret store credentials (cached after first fetch) self.credentials: Optional[Dict[str, str]] = None @@ -175,18 +174,17 @@ def get_time_until_expiry(self) -> Optional[float]: async def _extract_auth_credentials(self) -> Optional[Dict[str, str]]: """Fetch app credentials from secret store - auth-specific logic""" - if ( - WORKFLOW_AUTH_CLIENT_ID_KEY in self.auth_config - and WORKFLOW_AUTH_CLIENT_SECRET_KEY in self.auth_config - ): + client_id = SecretStore.get_deployment_secret(WORKFLOW_AUTH_CLIENT_ID_KEY) + client_secret = SecretStore.get_deployment_secret( + WORKFLOW_AUTH_CLIENT_SECRET_KEY + ) + + if client_id and client_secret: credentials = { - "client_id": self.auth_config[WORKFLOW_AUTH_CLIENT_ID_KEY], - "client_secret": self.auth_config[WORKFLOW_AUTH_CLIENT_SECRET_KEY], + "client_id": client_id, + "client_secret": client_secret, } - if WORKFLOW_AUTH_URL_KEY in self.auth_config: - self.auth_url = self.auth_config[WORKFLOW_AUTH_URL_KEY] - return credentials return None @@ -199,10 +197,8 @@ def clear_cache(self) -> None: """ # we are doing this to force a fetch of the credentials from secret store self.credentials = None - self.auth_url = None self._access_token = None self._token_expiry = 0 - self.auth_config = {} def calculate_refresh_interval(self) -> int: """Calculate the optimal token refresh interval based on token expiry. 
diff --git a/application_sdk/clients/temporal.py b/application_sdk/clients/temporal.py index f024c5c05..bad4e05ca 100644 --- a/application_sdk/clients/temporal.py +++ b/application_sdk/clients/temporal.py @@ -18,14 +18,13 @@ from application_sdk.constants import ( APPLICATION_NAME, DEPLOYMENT_NAME, - DEPLOYMENT_NAME_KEY, IS_LOCKING_DISABLED, MAX_CONCURRENT_ACTIVITIES, WORKFLOW_HOST, WORKFLOW_MAX_TIMEOUT_HOURS, WORKFLOW_NAMESPACE, WORKFLOW_PORT, - WORKFLOW_TLS_ENABLED_KEY, + WORKFLOW_TLS_ENABLED, ) from application_sdk.events.models import ( ApplicationEventNames, @@ -97,7 +96,6 @@ def __init__( self.port = port if port else WORKFLOW_PORT self.namespace = namespace if namespace else WORKFLOW_NAMESPACE - self.deployment_config: Dict[str, Any] = SecretStore.get_deployment_secret() self.worker_task_queue = self.get_worker_task_queue() self.auth_manager = AtlanAuthClient() @@ -118,12 +116,8 @@ def get_worker_task_queue(self) -> str: Returns: str: The task queue name in format "app_name-deployment_name". 
""" - deployment_name = self.deployment_config.get( - DEPLOYMENT_NAME_KEY, DEPLOYMENT_NAME - ) - - if deployment_name: - return f"atlan-{self.application_name}-{deployment_name}" + if DEPLOYMENT_NAME: + return f"atlan-{self.application_name}-{DEPLOYMENT_NAME}" else: return self.application_name @@ -228,12 +222,9 @@ async def load(self) -> None: connection_options: Dict[str, Any] = { "target_host": self.get_connection_string(), "namespace": self.namespace, - "tls": False, + "tls": WORKFLOW_TLS_ENABLED, } - connection_options["tls"] = self.deployment_config.get( - WORKFLOW_TLS_ENABLED_KEY, False - ) self.worker_task_queue = self.get_worker_task_queue() if self.auth_manager.auth_enabled: diff --git a/application_sdk/constants.py b/application_sdk/constants.py index f51ed6412..2f57345de 100644 --- a/application_sdk/constants.py +++ b/application_sdk/constants.py @@ -105,16 +105,23 @@ DEPLOYMENT_SECRET_PATH = os.getenv( "ATLAN_DEPLOYMENT_SECRET_PATH", "ATLAN_DEPLOYMENT_SECRETS" ) -WORKFLOW_AUTH_ENABLED = ( - os.getenv("ATLAN_WORKFLOW_AUTH_ENABLED", "false").lower() == "true" +AUTH_ENABLED = os.getenv("ATLAN_AUTH_ENABLED", "false").lower() == "true" +#: OAuth2 authentication URL for workflow services +AUTH_URL = os.getenv("ATLAN_AUTH_URL") +#: Whether to enable TLS for Temporal workflow connections +WORKFLOW_TLS_ENABLED = ( + os.getenv("ATLAN_WORKFLOW_TLS_ENABLED", "false").lower() == "true" ) # Deployment Secret Store Key Names -WORKFLOW_AUTH_CLIENT_ID_KEY = f"{APPLICATION_NAME}_app_client_id" -WORKFLOW_AUTH_CLIENT_SECRET_KEY = f"{APPLICATION_NAME}_app_client_secret" -WORKFLOW_AUTH_URL_KEY = "atlan_auth_url" -WORKFLOW_TLS_ENABLED_KEY = "workflow_tls_enabled" -DEPLOYMENT_NAME_KEY = "deployment_name" +#: Key name for OAuth2 client ID in deployment secrets (can be overridden via ATLAN_AUTH_CLIENT_ID_KEY) +WORKFLOW_AUTH_CLIENT_ID_KEY = os.getenv( + "ATLAN_AUTH_CLIENT_ID_KEY", "ATLAN_AUTH_CLIENT_ID" +) +#: Key name for OAuth2 client secret in deployment secrets (can be 
overridden via ATLAN_AUTH_CLIENT_SECRET_KEY) +WORKFLOW_AUTH_CLIENT_SECRET_KEY = os.getenv( + "ATLAN_AUTH_CLIENT_SECRET_KEY", "ATLAN_AUTH_CLIENT_SECRET" +) # Workflow Constants #: Timeout duration for activity heartbeats diff --git a/application_sdk/events/models.py b/application_sdk/events/models.py index 03c798415..dc5ad9771 100644 --- a/application_sdk/events/models.py +++ b/application_sdk/events/models.py @@ -155,6 +155,7 @@ class WorkerStartEventData(BaseModel): Attributes: application_name: Name of the application the worker belongs to. + deployment_name: Name of the deployment the worker belongs to. task_queue: Task queue name for the worker. namespace: Temporal namespace for the worker. host: Host address of the Temporal server. @@ -167,6 +168,7 @@ class WorkerStartEventData(BaseModel): version: str = WORKER_START_EVENT_VERSION application_name: str + deployment_name: str task_queue: str namespace: str host: str diff --git a/application_sdk/services/secretstore.py b/application_sdk/services/secretstore.py index 34d82aab0..6933adbd2 100644 --- a/application_sdk/services/secretstore.py +++ b/application_sdk/services/secretstore.py @@ -235,41 +235,59 @@ def resolve_credentials( return credentials @classmethod - def get_deployment_secret(cls) -> Dict[str, Any]: - """Get deployment configuration from the deployment secret store. + def get_deployment_secret(cls, key: str) -> Any: + """Get a specific key from deployment configuration in the deployment secret store. Validates that the deployment secret store component is registered before attempting to fetch secrets to prevent errors. This method - is commonly used to retrieve environment-specific configuration. + fetches only the specified key from the deployment secret, rather than + the entire secret dictionary. + + Args: + key (str): The key to fetch from the deployment secret. Returns: - Dict[str, Any]: Deployment configuration data, or empty dict if - component is unavailable or fetch fails. 
+ Any: The value for the specified key, or None if the key is not found + or the component is unavailable. Examples: - >>> # Get deployment configuration - >>> config = SecretStore.get_deployment_secret() - >>> if config: - ... print(f"Environment: {config.get('environment')}") - ... print(f"Region: {config.get('region')}") - >>> else: - ... print("No deployment configuration available") - >>> # Use in application initialization - >>> deployment_config = SecretStore.get_deployment_secret() - >>> if deployment_config.get('debug_mode'): - ... logging.getLogger().setLevel(logging.DEBUG) + >>> # Get a specific deployment configuration value + >>> client_id = SecretStore.get_deployment_secret("ATLAN_AUTH_CLIENT_ID") + >>> if client_id: + ... print(f"Client ID: {client_id}") + >>> # Get deployment name + >>> deployment_name = SecretStore.get_deployment_secret("deployment_name") + >>> if deployment_name: + ... print(f"Deployment: {deployment_name}") """ if not is_component_registered(DEPLOYMENT_SECRET_STORE_NAME): logger.warning( f"Deployment secret store component '{DEPLOYMENT_SECRET_STORE_NAME}' not registered." 
) - return {} + return None try: - return cls.get_secret(DEPLOYMENT_SECRET_PATH, DEPLOYMENT_SECRET_STORE_NAME) + secret_data = cls.get_secret( + DEPLOYMENT_SECRET_PATH, DEPLOYMENT_SECRET_STORE_NAME + ) + if isinstance(secret_data, dict) and key in secret_data: + return secret_data[key] + + logger.debug(f"Multi-key not found, checking single-key secret for '{key}'") + single_secret_data = cls.get_secret(key, DEPLOYMENT_SECRET_STORE_NAME) + if isinstance(single_secret_data, dict): + # Handle both {key:value} and {"value": "..."} cases + if key in single_secret_data: + return single_secret_data[key] + elif len(single_secret_data) == 1: + # extract single value + return list(single_secret_data.values())[0] + + return None + except Exception as e: - logger.error(f"Failed to fetch deployment config: {e}") - return {} + logger.error(f"Failed to fetch deployment config key '{key}': {e}") + return None @classmethod def get_secret( diff --git a/application_sdk/worker.py b/application_sdk/worker.py index 9e503825a..c30dc041f 100644 --- a/application_sdk/worker.py +++ b/application_sdk/worker.py @@ -14,7 +14,7 @@ from temporalio.worker import Worker as TemporalWorker from application_sdk.clients.workflow import WorkflowClient -from application_sdk.constants import MAX_CONCURRENT_ACTIVITIES +from application_sdk.constants import DEPLOYMENT_NAME, MAX_CONCURRENT_ACTIVITIES from application_sdk.events.models import ( ApplicationEventNames, Event, @@ -122,6 +122,7 @@ def __init__( if self.workflow_client: self._worker_creation_event_data = WorkerStartEventData( application_name=self.workflow_client.application_name, + deployment_name=DEPLOYMENT_NAME, task_queue=self.workflow_client.worker_task_queue, namespace=self.workflow_client.namespace, host=self.workflow_client.host, diff --git a/components/atlan-storage.yaml b/components/atlan-storage.yaml index 9e510a083..90ef68710 100644 --- a/components/atlan-storage.yaml +++ b/components/atlan-storage.yaml @@ -1,21 +1,26 @@ apiVersion: 
dapr.io/v1alpha1 kind: Component metadata: - name: atlan-storage + name: atlan-objectstore spec: - version: v1 type: bindings.aws.s3 - ignoreErrors: true + version: v1 metadata: + - name: bucket + value: atlan-bucket + - name: region + value: us-east-1 + - name: endpoint + value: https://{{tenant}}.atlan.com/api/blobstorage - name: accessKey - value: "{{clientId}}" + secretKeyRef: + name: ATLAN_AUTH_CLIENT_ID + key: ATLAN_AUTH_CLIENT_ID - name: secretKey - value: "{{clientSecret}}" - - name: endpoint - value: "https://{{tenant}}.atlan.com/api/blobstorage" - - name: bucket - value: "atlan-default-bucket" + secretKeyRef: + name: ATLAN_AUTH_CLIENT_SECRET + key: ATLAN_AUTH_CLIENT_SECRET - name: forcePathStyle value: "true" - - name: region - value: "us-east-1" \ No newline at end of file +auth: + secretStore: deployment-secret-store \ No newline at end of file diff --git a/docs/docs/concepts/temporal_auth.md b/docs/docs/concepts/temporal_auth.md index 1c77e2016..b8eb813e6 100644 --- a/docs/docs/concepts/temporal_auth.md +++ b/docs/docs/concepts/temporal_auth.md @@ -58,8 +58,8 @@ The authentication system uses a Dapr secret store component that can be configu **Environment Variables:** ```bash # Authentication settings -ATLAN_WORKFLOW_AUTH_ENABLED=true -ATLAN_WORKFLOW_AUTH_URL=https://your-oauth-provider.com/oauth/token +ATLAN_AUTH_ENABLED=true +ATLAN_AUTH_URL=https://tenant.atlan.com/auth/realms/default/protocol/openid-connect/token # Secret store component configuration ATLAN_DEPLOYMENT_SECRET_COMPONENT=deployment-secret-store @@ -329,7 +329,7 @@ dapr invoke --app-id your-app --method get-secret --data '{"key": "atlan-deploym **Symptom**: Token refresh failures ```bash # Verify auth URL accessibility -curl -X POST $ATLAN_WORKFLOW_AUTH_URL \ +curl -X POST $ATLAN_AUTH_URL \ -d "grant_type=client_credentials&client_id=...&client_secret=..." 
# Check credential validity in secret store @@ -507,8 +507,8 @@ spec: **Environment Variables:** ```bash # Authentication settings -ATLAN_WORKFLOW_AUTH_ENABLED=true -ATLAN_WORKFLOW_AUTH_URL=https://your-oauth-server.com/oauth/token +ATLAN_AUTH_ENABLED=true +ATLAN_AUTH_URL=https://tenant.atlan.com/auth/realms/default/protocol/openid-connect/token # Secret store configuration ATLAN_DEPLOYMENT_SECRET_COMPONENT=deployment-secret-store diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md index 641eedd43..5ce8882bd 100644 --- a/docs/docs/configuration.md +++ b/docs/docs/configuration.md @@ -30,7 +30,7 @@ The Application SDK uses environment variables for configuration. These can be s | `ATLAN_MAX_CONCURRENT_ACTIVITIES` | Maximum number of activities that can run concurrently | `5` | Controls resource usage and prevents overwhelming target systems | | `ATLAN_HEARTBEAT_TIMEOUT_SECONDS` | Timeout duration for activity heartbeats (in seconds) | `300` | Detects stuck activities and enables recovery | | `ATLAN_START_TO_CLOSE_TIMEOUT_SECONDS` | Maximum duration an activity can run before timing out (in seconds) | `7200` | Prevents activities from running indefinitely | -| `ATLAN_WORKFLOW_AUTH_ENABLED` | Whether to enable authentication for Temporal workflows | `false` | Used in production deployments with secure Temporal clusters | +| `ATLAN_AUTH_ENABLED` | Whether to enable authentication for Temporal workflows | `false` | Used in production deployments with secure Temporal clusters | | `ATLAN_DEPLOYMENT_SECRET_PATH` | Path to deployment secrets in secret store | `ATLAN_DEPLOYMENT_SECRETS` | Contains authentication credentials for production | ## SQL Client Configuration @@ -129,7 +129,7 @@ For local development, most defaults work out of the box. 
Key configurations to ### Production Deployment For production deployments, consider these essential configurations: -- `ATLAN_WORKFLOW_AUTH_ENABLED=true`: Enable authentication for Temporal +- `ATLAN_AUTH_ENABLED=true`: Enable authentication for Temporal - `ENABLE_ATLAN_UPLOAD=true`: Enable data upload to Atlan platform - `ATLAN_DEPLOYMENT_SECRETS`: Configure authentication secrets - `ATLAN_WORKFLOW_HOST` and `ATLAN_WORKFLOW_PORT`: Point to production Temporal cluster diff --git a/tests/conftest.py b/tests/conftest.py index 9bb0655db..a857490ff 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,9 +8,14 @@ @pytest.fixture(autouse=True) def mock_secret_store(): """Automatically mock SecretStore.get_deployment_secret for all tests.""" + + def mock_get_deployment_secret(key: str): + """Default mock that returns None for all keys.""" + return None + with patch( "application_sdk.services.secretstore.SecretStore.get_deployment_secret", - return_value={}, + side_effect=mock_get_deployment_secret, ): yield diff --git a/tests/unit/clients/test_atlan_auth.py b/tests/unit/clients/test_atlan_auth.py index 15bc382c0..1ff82f81f 100644 --- a/tests/unit/clients/test_atlan_auth.py +++ b/tests/unit/clients/test_atlan_auth.py @@ -11,15 +11,18 @@ @pytest.fixture async def auth_client() -> AtlanAuthClient: """Create an AtlanAuthClient instance for testing.""" - mock_config = { - "test_app_client_id": "test-client", - "test_app_client_secret": "test-secret", - "workflow_auth_enabled": True, - "workflow_auth_url": "http://auth.test/token", - } - - with patch("application_sdk.constants.WORKFLOW_AUTH_ENABLED", True), patch( - "application_sdk.constants.WORKFLOW_AUTH_URL_KEY", "workflow_auth_url" + + def mock_get_deployment_secret(key: str): + """Mock get_deployment_secret to return values based on key.""" + mock_config = { + "test_app_client_id": "test-client", + "test_app_client_secret": "test-secret", + "workflow_auth_url": "http://auth.test/token", + } + return 
mock_config.get(key) + + with patch("application_sdk.constants.AUTH_ENABLED", True), patch( + "application_sdk.constants.AUTH_URL", "http://auth.test/token" ), patch("application_sdk.constants.APPLICATION_NAME", "test-app"), patch( "application_sdk.clients.atlan_auth.APPLICATION_NAME", "test-app" ), patch( @@ -29,7 +32,7 @@ async def auth_client() -> AtlanAuthClient: "test_app_client_secret", ), patch( "application_sdk.clients.atlan_auth.SecretStore.get_deployment_secret", - return_value=mock_config, + side_effect=mock_get_deployment_secret, ): client = AtlanAuthClient() return client @@ -52,7 +55,7 @@ async def test_credential_discovery_failure(auth_client: AtlanAuthClient) -> Non with patch( "application_sdk.clients.atlan_auth.SecretStore.get_deployment_secret", - return_value={}, # Empty config means no credentials + return_value=None, # Empty config means no credentials ): credentials = await auth_client_no_fallback._extract_auth_credentials() assert credentials is None diff --git a/tests/unit/clients/test_atlanauth.py b/tests/unit/clients/test_atlanauth.py index 092da51e3..de0b35b1d 100644 --- a/tests/unit/clients/test_atlanauth.py +++ b/tests/unit/clients/test_atlanauth.py @@ -11,16 +11,18 @@ @pytest.fixture async def auth_client() -> AtlanAuthClient: """Create an AtlanAuthClient instance for testing.""" - mock_config = { - "test_app_client_id": "test-client", - "test_app_client_secret": "test-secret", - "workflow_auth_enabled": True, - "workflow_auth_url": "http://auth.test/token", - } + + def mock_get_deployment_secret(key: str): + mock_config = { + "test_app_client_id": "test-client", + "test_app_client_secret": "test-secret", + "workflow_auth_url": "http://auth.test/token", + } + return mock_config.get(key) with patch( "application_sdk.clients.atlan_auth.SecretStore.get_deployment_secret", - return_value=mock_config, + side_effect=mock_get_deployment_secret, ): client = AtlanAuthClient() return client @@ -40,7 +42,7 @@ async def 
test_credential_discovery_failure(auth_client: AtlanAuthClient) -> Non # Create an auth client with empty config with patch( "application_sdk.clients.atlan_auth.SecretStore.get_deployment_secret", - return_value={}, # Empty config means no credentials + return_value=None, # Empty config means no credentials ): auth_client_no_fallback = AtlanAuthClient() diff --git a/tests/unit/clients/test_temporal_client.py b/tests/unit/clients/test_temporal_client.py index ab2df1c54..49389b162 100644 --- a/tests/unit/clients/test_temporal_client.py +++ b/tests/unit/clients/test_temporal_client.py @@ -17,9 +17,14 @@ class MockWorkflow(WorkflowInterface): @pytest.fixture def temporal_client() -> TemporalWorkflowClient: """Create a TemporalWorkflowClient instance for testing.""" + + def mock_get_deployment_secret(key: str): + # Return None for all keys by default (tests can override if needed) + return None + with patch( "application_sdk.clients.temporal.SecretStore.get_deployment_secret", - return_value={}, + side_effect=mock_get_deployment_secret, ): return TemporalWorkflowClient( host="localhost", @@ -57,8 +62,8 @@ async def test_load( temporal_client: TemporalWorkflowClient, ): """Test loading the temporal client.""" - # Mock the deployment config to return empty dict (auth disabled) - mock_get_config.return_value = {} + # Mock the deployment config to return None for all keys (auth disabled) + mock_get_config.return_value = None # Mock the client connection mock_client = AsyncMock()