Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@ ATLAN_START_TO_CLOSE_TIMEOUT_SECONDS=7200 # 2 hours
# if you plan to deploy the app in customer environment, below is a sample environment variables you need to set
ATLAN_WORKFLOW_HOST="tenant-temporal.atlan.com"
ATLAN_WORKFLOW_PORT="443"
ATLAN_WORKFLOW_AUTH_ENABLED="true"
ATLAN_AUTH_ENABLED="true"
ATLAN_AUTH_URL="https://tenant.atlan.com/auth/realms/default/protocol/openid-connect/token"
ATLAN_WORKFLOW_TLS_ENABLED="true"
ENABLE_ATLAN_UPLOAD="true"
ATLAN_DEPLOYMENT_SECRETS='{"appName_app_client_id":"enter_client_id","appName_app_client_secret":"enter_secret","atlan_auth_url":"https://tenant.atlan.com/auth/realms/default/protocol/openid-connect/token","workflow_tls_enabled":"true","deployment_name":"agent-v2"}'

# Storage:
DEPLOYMENT_OBJECT_STORE_NAME="objectstore"
UPSTREAM_OBJECT_STORE_NAME="atlan-storage"
UPSTREAM_OBJECT_STORE_NAME="atlan-objectstore"

# Secret Store
SECRET_STORE_NAME="aws-secrets"
SECRET_STORE_NAME="aws-secrets"

# Client ID/Secret key fields, the value is the name of the key in the secret store
ATLAN_AUTH_CLIENT_ID_KEY="ATLAN_AUTH_CLIENT_ID"
ATLAN_AUTH_CLIENT_SECRET_KEY="ATLAN_AUTH_CLIENT_SECRET"
30 changes: 13 additions & 17 deletions application_sdk/clients/atlan_auth.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
"""OAuth2 token manager with automatic secret store discovery."""

import time
from typing import Any, Dict, Optional
from typing import Dict, Optional

import aiohttp

from application_sdk.common.error_codes import ClientError
from application_sdk.constants import (
APPLICATION_NAME,
AUTH_ENABLED,
AUTH_URL,
WORKFLOW_AUTH_CLIENT_ID_KEY,
WORKFLOW_AUTH_CLIENT_SECRET_KEY,
WORKFLOW_AUTH_ENABLED,
WORKFLOW_AUTH_URL_KEY,
)
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.services.secretstore import SecretStore
Expand Down Expand Up @@ -39,9 +39,8 @@ def __init__(self):
(environment variables, AWS Secrets Manager, Azure Key Vault, etc.)
"""
self.application_name = APPLICATION_NAME
self.auth_config: Dict[str, Any] = SecretStore.get_deployment_secret()
self.auth_enabled: bool = WORKFLOW_AUTH_ENABLED
self.auth_url: Optional[str] = None
self.auth_enabled: bool = AUTH_ENABLED
self.auth_url: Optional[str] = AUTH_URL

# Secret store credentials (cached after first fetch)
self.credentials: Optional[Dict[str, str]] = None
Expand Down Expand Up @@ -175,18 +174,17 @@ def get_time_until_expiry(self) -> Optional[float]:

async def _extract_auth_credentials(self) -> Optional[Dict[str, str]]:
"""Fetch app credentials from secret store - auth-specific logic"""
if (
WORKFLOW_AUTH_CLIENT_ID_KEY in self.auth_config
and WORKFLOW_AUTH_CLIENT_SECRET_KEY in self.auth_config
):
client_id = SecretStore.get_deployment_secret(WORKFLOW_AUTH_CLIENT_ID_KEY)
client_secret = SecretStore.get_deployment_secret(
WORKFLOW_AUTH_CLIENT_SECRET_KEY
)

if client_id and client_secret:
credentials = {
"client_id": self.auth_config[WORKFLOW_AUTH_CLIENT_ID_KEY],
"client_secret": self.auth_config[WORKFLOW_AUTH_CLIENT_SECRET_KEY],
"client_id": client_id,
"client_secret": client_secret,
}

if WORKFLOW_AUTH_URL_KEY in self.auth_config:
self.auth_url = self.auth_config[WORKFLOW_AUTH_URL_KEY]

return credentials
return None

Expand All @@ -199,10 +197,8 @@ def clear_cache(self) -> None:
"""
# we are doing this to force a fetch of the credentials from secret store
self.credentials = None
self.auth_url = None
self._access_token = None
self._token_expiry = 0
self.auth_config = {}

def calculate_refresh_interval(self) -> int:
"""Calculate the optimal token refresh interval based on token expiry.
Expand Down
17 changes: 4 additions & 13 deletions application_sdk/clients/temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
from application_sdk.constants import (
APPLICATION_NAME,
DEPLOYMENT_NAME,
DEPLOYMENT_NAME_KEY,
IS_LOCKING_DISABLED,
MAX_CONCURRENT_ACTIVITIES,
WORKFLOW_HOST,
WORKFLOW_MAX_TIMEOUT_HOURS,
WORKFLOW_NAMESPACE,
WORKFLOW_PORT,
WORKFLOW_TLS_ENABLED_KEY,
WORKFLOW_TLS_ENABLED,
)
from application_sdk.events.models import (
ApplicationEventNames,
Expand Down Expand Up @@ -97,7 +96,6 @@ def __init__(
self.port = port if port else WORKFLOW_PORT
self.namespace = namespace if namespace else WORKFLOW_NAMESPACE

self.deployment_config: Dict[str, Any] = SecretStore.get_deployment_secret()
self.worker_task_queue = self.get_worker_task_queue()
self.auth_manager = AtlanAuthClient()

Expand All @@ -118,12 +116,8 @@ def get_worker_task_queue(self) -> str:
Returns:
str: The task queue name in format "app_name-deployment_name".
"""
deployment_name = self.deployment_config.get(
DEPLOYMENT_NAME_KEY, DEPLOYMENT_NAME
)

if deployment_name:
return f"atlan-{self.application_name}-{deployment_name}"
if DEPLOYMENT_NAME:
return f"atlan-{self.application_name}-{DEPLOYMENT_NAME}"
else:
return self.application_name

Expand Down Expand Up @@ -228,12 +222,9 @@ async def load(self) -> None:
connection_options: Dict[str, Any] = {
"target_host": self.get_connection_string(),
"namespace": self.namespace,
"tls": False,
"tls": WORKFLOW_TLS_ENABLED,
}

connection_options["tls"] = self.deployment_config.get(
WORKFLOW_TLS_ENABLED_KEY, False
)
self.worker_task_queue = self.get_worker_task_queue()

if self.auth_manager.auth_enabled:
Expand Down
21 changes: 14 additions & 7 deletions application_sdk/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,23 @@
DEPLOYMENT_SECRET_PATH = os.getenv(
"ATLAN_DEPLOYMENT_SECRET_PATH", "ATLAN_DEPLOYMENT_SECRETS"
)
WORKFLOW_AUTH_ENABLED = (
os.getenv("ATLAN_WORKFLOW_AUTH_ENABLED", "false").lower() == "true"
AUTH_ENABLED = os.getenv("ATLAN_AUTH_ENABLED", "false").lower() == "true"
#: OAuth2 authentication URL for workflow services
AUTH_URL = os.getenv("ATLAN_AUTH_URL")
#: Whether to enable TLS for Temporal workflow connections
WORKFLOW_TLS_ENABLED = (
os.getenv("ATLAN_WORKFLOW_TLS_ENABLED", "false").lower() == "true"
)

# Deployment Secret Store Key Names
WORKFLOW_AUTH_CLIENT_ID_KEY = f"{APPLICATION_NAME}_app_client_id"
WORKFLOW_AUTH_CLIENT_SECRET_KEY = f"{APPLICATION_NAME}_app_client_secret"
WORKFLOW_AUTH_URL_KEY = "atlan_auth_url"
WORKFLOW_TLS_ENABLED_KEY = "workflow_tls_enabled"
DEPLOYMENT_NAME_KEY = "deployment_name"
#: Key name for OAuth2 client ID in deployment secrets (can be overridden via ATLAN_AUTH_CLIENT_ID_KEY)
WORKFLOW_AUTH_CLIENT_ID_KEY = os.getenv(
"ATLAN_AUTH_CLIENT_ID_KEY", "ATLAN_AUTH_CLIENT_ID"
)
#: Key name for OAuth2 client secret in deployment secrets (can be overridden via ATLAN_AUTH_CLIENT_SECRET_KEY)
WORKFLOW_AUTH_CLIENT_SECRET_KEY = os.getenv(
"ATLAN_AUTH_CLIENT_SECRET_KEY", "ATLAN_AUTH_CLIENT_SECRET"
)

# Workflow Constants
#: Timeout duration for activity heartbeats
Expand Down
2 changes: 2 additions & 0 deletions application_sdk/events/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ class WorkerStartEventData(BaseModel):

Attributes:
application_name: Name of the application the worker belongs to.
deployment_name: Name of the deployment the worker belongs to.
task_queue: Task queue name for the worker.
namespace: Temporal namespace for the worker.
host: Host address of the Temporal server.
Expand All @@ -167,6 +168,7 @@ class WorkerStartEventData(BaseModel):

version: str = WORKER_START_EVENT_VERSION
application_name: str
deployment_name: str
task_queue: str
namespace: str
host: str
Expand Down
58 changes: 38 additions & 20 deletions application_sdk/services/secretstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,41 +235,59 @@ def resolve_credentials(
return credentials

@classmethod
def get_deployment_secret(cls) -> Dict[str, Any]:
"""Get deployment configuration from the deployment secret store.
def get_deployment_secret(cls, key: str) -> Any:
"""Get a specific key from deployment configuration in the deployment secret store.

Validates that the deployment secret store component is registered
before attempting to fetch secrets to prevent errors. This method
is commonly used to retrieve environment-specific configuration.
fetches only the specified key from the deployment secret, rather than
the entire secret dictionary.

Args:
key (str): The key to fetch from the deployment secret.

Returns:
Dict[str, Any]: Deployment configuration data, or empty dict if
component is unavailable or fetch fails.
Any: The value for the specified key, or None if the key is not found
or the component is unavailable.

Examples:
>>> # Get deployment configuration
>>> config = SecretStore.get_deployment_secret()
>>> if config:
... print(f"Environment: {config.get('environment')}")
... print(f"Region: {config.get('region')}")
>>> else:
... print("No deployment configuration available")
>>> # Use in application initialization
>>> deployment_config = SecretStore.get_deployment_secret()
>>> if deployment_config.get('debug_mode'):
... logging.getLogger().setLevel(logging.DEBUG)
>>> # Get a specific deployment configuration value
>>> auth_url = SecretStore.get_deployment_secret("ATLAN_AUTH_CLIENT_ID")
>>> if auth_url:
... print(f"Auth URL: {auth_url}")
>>> # Get deployment name
>>> deployment_name = SecretStore.get_deployment_secret("deployment_name")
>>> if deployment_name:
... print(f"Deployment: {deployment_name}")
"""
if not is_component_registered(DEPLOYMENT_SECRET_STORE_NAME):
logger.warning(
f"Deployment secret store component '{DEPLOYMENT_SECRET_STORE_NAME}' not registered."
)
return {}
return None

try:
return cls.get_secret(DEPLOYMENT_SECRET_PATH, DEPLOYMENT_SECRET_STORE_NAME)
secret_data = cls.get_secret(
DEPLOYMENT_SECRET_PATH, DEPLOYMENT_SECRET_STORE_NAME
)
if isinstance(secret_data, dict) and key in secret_data:
return secret_data[key]

logger.debug(f"Multi-key not found, checking single-key secret for '{key}'")
single_secret_data = cls.get_secret(key, DEPLOYMENT_SECRET_STORE_NAME)
if isinstance(single_secret_data, dict):
# Handle both {key:value} and {"value": "..."} cases
if key in single_secret_data:
return single_secret_data[key]
elif len(single_secret_data) == 1:
# extract single value
return list(single_secret_data.values())[0]

return None

except Exception as e:
logger.error(f"Failed to fetch deployment config: {e}")
return {}
logger.error(f"Failed to fetch deployment config key '{key}': {e}")
return None

@classmethod
def get_secret(
Expand Down
3 changes: 2 additions & 1 deletion application_sdk/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from temporalio.worker import Worker as TemporalWorker

from application_sdk.clients.workflow import WorkflowClient
from application_sdk.constants import MAX_CONCURRENT_ACTIVITIES
from application_sdk.constants import DEPLOYMENT_NAME, MAX_CONCURRENT_ACTIVITIES
from application_sdk.events.models import (
ApplicationEventNames,
Event,
Expand Down Expand Up @@ -122,6 +122,7 @@ def __init__(
if self.workflow_client:
self._worker_creation_event_data = WorkerStartEventData(
application_name=self.workflow_client.application_name,
deployment_name=DEPLOYMENT_NAME,
task_queue=self.workflow_client.worker_task_queue,
namespace=self.workflow_client.namespace,
host=self.workflow_client.host,
Expand Down
27 changes: 16 additions & 11 deletions components/atlan-storage.yaml
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: atlan-storage
name: atlan-objectstore
spec:
version: v1
type: bindings.aws.s3
ignoreErrors: true
version: v1
metadata:
- name: bucket
value: atlan-bucket
- name: region
value: us-east-1
- name: endpoint
value: https://{{tenant}}.atlan.com/api/blobstorage
- name: accessKey
value: "{{clientId}}"
secretKeyRef:
name: ATLAN_AUTH_CLIENT_ID
key: ATLAN_AUTH_CLIENT_ID
- name: secretKey
value: "{{clientSecret}}"
- name: endpoint
value: "https://{{tenant}}.atlan.com/api/blobstorage"
- name: bucket
value: "atlan-default-bucket"
secretKeyRef:
name: ATLAN_AUTH_CLIENT_SECRET
key: ATLAN_AUTH_CLIENT_SECRET
- name: forcePathStyle
value: "true"
- name: region
value: "us-east-1"
auth:
secretStore: deployment-secret-store
10 changes: 5 additions & 5 deletions docs/docs/concepts/temporal_auth.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ The authentication system uses a Dapr secret store component that can be configu
**Environment Variables:**
```bash
# Authentication settings
ATLAN_WORKFLOW_AUTH_ENABLED=true
ATLAN_WORKFLOW_AUTH_URL=https://your-oauth-provider.com/oauth/token
ATLAN_AUTH_ENABLED=true
ATLAN_AUTH_URL=https://tenant.atlan.com/auth/realms/default/protocol/openid-connect/token

# Secret store component configuration
ATLAN_DEPLOYMENT_SECRET_COMPONENT=deployment-secret-store
Expand Down Expand Up @@ -329,7 +329,7 @@ dapr invoke --app-id your-app --method get-secret --data '{"key": "atlan-deploym
**Symptom**: Token refresh failures
```bash
# Verify auth URL accessibility
curl -X POST $ATLAN_WORKFLOW_AUTH_URL \
curl -X POST $ATLAN_AUTH_URL \
-d "grant_type=client_credentials&client_id=...&client_secret=..."

# Check credential validity in secret store
Expand Down Expand Up @@ -507,8 +507,8 @@ spec:
**Environment Variables:**
```bash
# Authentication settings
ATLAN_WORKFLOW_AUTH_ENABLED=true
ATLAN_WORKFLOW_AUTH_URL=https://your-oauth-server.com/oauth/token
ATLAN_AUTH_ENABLED=true
ATLAN_AUTH_URL=https://tenant.atlan.com/auth/realms/default/protocol/openid-connect/token

# Secret store configuration
ATLAN_DEPLOYMENT_SECRET_COMPONENT=deployment-secret-store
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ The Application SDK uses environment variables for configuration. These can be s
| `ATLAN_MAX_CONCURRENT_ACTIVITIES` | Maximum number of activities that can run concurrently | `5` | Controls resource usage and prevents overwhelming target systems |
| `ATLAN_HEARTBEAT_TIMEOUT_SECONDS` | Timeout duration for activity heartbeats (in seconds) | `300` | Detects stuck activities and enables recovery |
| `ATLAN_START_TO_CLOSE_TIMEOUT_SECONDS` | Maximum duration an activity can run before timing out (in seconds) | `7200` | Prevents activities from running indefinitely |
| `ATLAN_WORKFLOW_AUTH_ENABLED` | Whether to enable authentication for Temporal workflows | `false` | Used in production deployments with secure Temporal clusters |
| `ATLAN_AUTH_ENABLED` | Whether to enable authentication for Temporal workflows | `false` | Used in production deployments with secure Temporal clusters |
| `ATLAN_DEPLOYMENT_SECRET_PATH` | Path to deployment secrets in secret store | `ATLAN_DEPLOYMENT_SECRETS` | Contains authentication credentials for production |

## SQL Client Configuration
Expand Down Expand Up @@ -129,7 +129,7 @@ For local development, most defaults work out of the box. Key configurations to

### Production Deployment
For production deployments, consider these essential configurations:
- `ATLAN_WORKFLOW_AUTH_ENABLED=true`: Enable authentication for Temporal
- `ATLAN_AUTH_ENABLED=true`: Enable authentication for Temporal
- `ENABLE_ATLAN_UPLOAD=true`: Enable data upload to Atlan platform
- `ATLAN_DEPLOYMENT_SECRETS`: Configure authentication secrets
- `ATLAN_WORKFLOW_HOST` and `ATLAN_WORKFLOW_PORT`: Point to production Temporal cluster
Expand Down
7 changes: 6 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@
@pytest.fixture(autouse=True)
def mock_secret_store():
"""Automatically mock SecretStore.get_deployment_secret for all tests."""

def mock_get_deployment_secret(key: str):
"""Default mock that returns None for all keys."""
return None

with patch(
"application_sdk.services.secretstore.SecretStore.get_deployment_secret",
return_value={},
side_effect=mock_get_deployment_secret,
):
yield

Expand Down
Loading
Loading