From 427d434d9de3c4e7a1038facf6a0a22202a28119 Mon Sep 17 00:00:00 2001 From: Christophe Oudar Date: Fri, 10 Oct 2025 00:38:22 +0200 Subject: [PATCH] Add dbt-deltastream documentation --- website/docs/docs/community-adapters.md | 19 +- .../deltastream-setup.md | 96 +++ .../resource-configs/deltastream-configs.md | 738 ++++++++++++++++++ website/sidebars.js | 2 + 4 files changed, 846 insertions(+), 9 deletions(-) create mode 100644 website/docs/docs/core/connect-data-platform/deltastream-setup.md create mode 100644 website/docs/reference/resource-configs/deltastream-configs.md diff --git a/website/docs/docs/community-adapters.md b/website/docs/docs/community-adapters.md index ffd91f633c7..b3c81a11a99 100644 --- a/website/docs/docs/community-adapters.md +++ b/website/docs/docs/community-adapters.md @@ -7,12 +7,13 @@ Community adapters are adapter plugins contributed and maintained by members of | Data platforms (click to view setup guide) ||| | ------------------------------------------ | -------------------------------- | ------------------------------------- | -| [CrateDB](/docs/core/connect-data-platform/cratedb-setup) | [MaxCompute](/docs/core/connect-data-platform/maxcompute-setup) | [Databend Cloud](/docs/core/connect-data-platform/databend-setup) | -| [Doris & SelectDB](/docs/core/connect-data-platform/doris-setup) | [DuckDB](/docs/core/connect-data-platform/duckdb-setup) | [Exasol Analytics](/docs/core/connect-data-platform/exasol-setup) | -|[Extrica](/docs/core/connect-data-platform/extrica-setup) | [Hive](/docs/core/connect-data-platform/hive-setup) | [IBM DB2](/docs/core/connect-data-platform/ibmdb2-setup) | -| [Impala](/docs/core/connect-data-platform/impala-setup) | [Infer](/docs/core/connect-data-platform/infer-setup) | [iomete](/docs/core/connect-data-platform/iomete-setup) | -| [MindsDB](/docs/core/connect-data-platform/mindsdb-setup) | [MySQL](/docs/core/connect-data-platform/mysql-setup) | [RisingWave](/docs/core/connect-data-platform/risingwave-setup) | -| [Rockset](/docs/core/connect-data-platform/rockset-setup) | [SingleStore](/docs/core/connect-data-platform/singlestore-setup)| [SQL Server & Azure SQL](/docs/core/connect-data-platform/mssql-setup) | -| [SQLite](/docs/core/connect-data-platform/sqlite-setup) | [Starrocks](/docs/core/connect-data-platform/starrocks-setup) | [TiDB](/docs/core/connect-data-platform/tidb-setup)| -| [TimescaleDB](https://dbt-timescaledb.debruyn.dev/) | [Upsolver](/docs/core/connect-data-platform/upsolver-setup) | [Vertica](/docs/core/connect-data-platform/vertica-setup) | -| [Watsonx-Presto](/docs/core/connect-data-platform/watsonx-presto-setup) | [IBM watsonx.data - Spark](/docs/core/connect-data-platform/watsonx-spark-setup) | [Yellowbrick](/docs/core/connect-data-platform/yellowbrick-setup) | +| [CrateDB](/docs/core/connect-data-platform/cratedb-setup) | [Databend Cloud](/docs/core/connect-data-platform/databend-setup) | [DeltaStream](/docs/core/connect-data-platform/deltastream-setup) | +| [Doris & SelectDB](/docs/core/connect-data-platform/doris-setup) | [DuckDB](/docs/core/connect-data-platform/duckdb-setup) | [Exasol Analytics](/docs/core/connect-data-platform/exasol-setup) | +| [Extrica](/docs/core/connect-data-platform/extrica-setup) | [Hive](/docs/core/connect-data-platform/hive-setup) | [IBM DB2](/docs/core/connect-data-platform/ibmdb2-setup) | +| [IBM watsonx.data - Spark](/docs/core/connect-data-platform/watsonx-spark-setup) | [Impala](/docs/core/connect-data-platform/impala-setup) | 
[Infer](/docs/core/connect-data-platform/infer-setup) |
+| [iomete](/docs/core/connect-data-platform/iomete-setup) | [MaxCompute](/docs/core/connect-data-platform/maxcompute-setup) | [MindsDB](/docs/core/connect-data-platform/mindsdb-setup) |
+| [MySQL](/docs/core/connect-data-platform/mysql-setup) | [RisingWave](/docs/core/connect-data-platform/risingwave-setup) | [Rockset](/docs/core/connect-data-platform/rockset-setup) |
+| [SingleStore](/docs/core/connect-data-platform/singlestore-setup) | [SQL Server & Azure SQL](/docs/core/connect-data-platform/mssql-setup) | [SQLite](/docs/core/connect-data-platform/sqlite-setup) |
+| [Starrocks](/docs/core/connect-data-platform/starrocks-setup) | [TiDB](/docs/core/connect-data-platform/tidb-setup) | [TimescaleDB](https://dbt-timescaledb.debruyn.dev/) |
+| [Upsolver](/docs/core/connect-data-platform/upsolver-setup) | [Vertica](/docs/core/connect-data-platform/vertica-setup) | [Watsonx-Presto](/docs/core/connect-data-platform/watsonx-presto-setup) |
+| [Yellowbrick](/docs/core/connect-data-platform/yellowbrick-setup) | | |
diff --git a/website/docs/docs/core/connect-data-platform/deltastream-setup.md b/website/docs/docs/core/connect-data-platform/deltastream-setup.md
new file mode 100644
index 00000000000..6f01e05eaa5
--- /dev/null
+++ b/website/docs/docs/core/connect-data-platform/deltastream-setup.md
@@ -0,0 +1,96 @@
+---
+title: "DeltaStream setup"
+description: "Read this guide to learn about the DeltaStream warehouse setup in dbt."
+meta:
+  maintained_by: Community
+  authors: 'DeltaStream Team'
+  github_repo: 'deltastreaminc/dbt-deltastream'
+  pypi_package: 'dbt-deltastream'
+  min_core_version: 'v1.10.0'
+  cloud_support: Not Supported
+  min_supported_version: '?'
+  slack_channel_name: '#db-deltastream'
+  platform_name: 'DeltaStream'
+  config_page: '/reference/resource-configs/deltastream-configs'
+---
+
+import SetUpPages from '/snippets/_setup-pages-intro.md';
+
+<SetUpPages meta={frontMatter.meta} />
+
+## Connecting to DeltaStream with **dbt-deltastream**
+
+To connect to DeltaStream from dbt, you'll need to add a [profile](/docs/core/connect-data-platform/connection-profiles)
+to your `profiles.yml` file. A DeltaStream profile conforms to the following syntax:
+
+<File name='profiles.yml'>
+
+```yaml
+<profile-name>:
+  target: <target-name>
+  outputs:
+    <target-name>:
+      type: deltastream
+
+      # Required Parameters
+      token: [ your-api-token ] # Authentication token for DeltaStream API
+      database: [ your-database ] # Target database name
+      schema: [ your-schema ] # Target schema name
+      organization_id: [ your-org-id ] # Organization identifier
+
+      # Optional Parameters
+      url: [ https://api.deltastream.io/v2 ] # DeltaStream API URL, defaults to https://api.deltastream.io/v2
+      timezone: [ UTC ] # Timezone for operations, defaults to UTC
+      session_id: [ ] # Custom session identifier for debugging purposes
+      role: [ ] # User role
+      store: [ ] # Target store name
+      compute_pool: [ ] # Compute pool to use; if not set, the default compute pool is used
+```
+
+</File>
+
+### Description of DeltaStream Profile Fields
+
+| Field             | Required | Description |
+|-------------------|----------|-------------|
+| `type`            | ✅ | This must be included either in `profiles.yml` or in the `dbt_project.yml` file. Must be set to `deltastream`. |
+| `token` | ✅ | Authentication token for DeltaStream API. This should be stored securely, preferably as an environment variable. |
+| `database` | ✅ | Target default database name in DeltaStream where your dbt models will be created. |
+| `schema` | ✅ | Target default schema name within the specified database. |
+| `organization_id` | ✅ | Organization identifier that determines which DeltaStream organization you're connecting to. |
+| `url` | ❌ | DeltaStream API URL. Defaults to `https://api.deltastream.io/v2` if not specified. |
+| `timezone` | ❌ | Timezone for operations. Defaults to `UTC` if not specified. |
+| `session_id` | ❌ | Custom session identifier for debugging purposes. Helps track operations in DeltaStream logs. |
+| `role` | ❌ | User role within the DeltaStream organization. If not specified, uses the default role associated with the token. |
+| `store` | ❌ | Target default store name. Stores represent external system connections (Kafka, PostgreSQL, etc.) in DeltaStream. |
+| `compute_pool` | ❌ | Compute pool name to be used for models that require computational resources. If not specified, uses the default compute pool. |
+
+## Security Best Practices
+
+When configuring your project for production, it is strongly recommended to use environment variables to store sensitive information such as the authentication token:
+
+<File name='profiles.yml'>
+
+```yaml
+your_profile_name:
+  target: prod
+  outputs:
+    prod:
+      type: deltastream
+      token: "{{ env_var('DELTASTREAM_API_TOKEN') }}"
+      database: "{{ env_var('DELTASTREAM_DATABASE') }}"
+      schema: "{{ env_var('DELTASTREAM_SCHEMA') }}"
+      organization_id: "{{ env_var('DELTASTREAM_ORG_ID') }}"
+```
+
+</File>
+
+## Troubleshooting Connections
+
+If you encounter issues connecting to DeltaStream from dbt, verify the following:
+
+### Authentication Issues
+
+- Ensure your API token is valid and has not expired
+- Verify the token has appropriate permissions for the target organization
+- Check that the `organization_id` matches your DeltaStream organization
diff --git a/website/docs/reference/resource-configs/deltastream-configs.md b/website/docs/reference/resource-configs/deltastream-configs.md
new file mode 100644
index 00000000000..eea3d85f5ba
--- /dev/null
+++ b/website/docs/reference/resource-configs/deltastream-configs.md
@@ -0,0 +1,738 @@
+---
+title: "DeltaStream configurations"
+description: "DeltaStream Configurations - Read this in-depth guide to learn about configurations in dbt."
+id: "deltastream-configs"
+---
+
+# DeltaStream Resource Configurations
+
+## Supported Materializations
+
+DeltaStream supports several unique materialization types that align with its stream processing capabilities:
+
+### Standard Materializations
+
+| Materialization | Description |
+|---------------------|-------------|
+| `ephemeral` | This materialization uses common table expressions in DeltaStream under the hood. |
+| `table` | Traditional batch table materialization |
+| `materialized_view` | Continuously updated view that automatically refreshes as underlying data changes |
+
+### Streaming Materializations
+
+| Materialization | Description |
+|-----------------|-------------|
+| `stream` | Pure streaming transformation that processes data in real-time |
+| `changelog` | Change data capture (CDC) stream that tracks changes in data |
+
+### Infrastructure Materializations
+
+| Materialization | Description |
+|----------------------|-------------|
+| `store` | External system connection (Kafka, PostgreSQL, etc.) |
+| `entity` | Entity definition within a store |
+| `database` | Database definition |
+| `compute_pool` | Compute pool definition for resource management |
+| `function` | User-defined functions (UDFs) in Java |
+| `function_source` | JAR file sources for UDFs |
+| `descriptor_source` | Protocol buffer schema sources |
+| `schema_registry` | Schema registry connections (Confluent, etc.) |
+
+## SQL Model Configurations
+
+### Table Materialization
+
+Creates a traditional batch table for aggregated data:
+
+**Project file configuration:**
+```yaml
+models:
+  <resource-path>:
+    +materialized: table
+```
+
+**Config block configuration:**
+```sql
+{{ config(materialized = "table") }}
+
+SELECT
+    date,
+    SUM(amount) as daily_total
+FROM {{ ref('transactions') }}
+GROUP BY date
+```
+
+### Stream Materialization
+
+Creates a continuous streaming transformation:
+
+**Project file configuration:**
+```yaml
+models:
+  <resource-path>:
+    +materialized: stream
+    +parameters:
+      topic: 'stream_topic'
+      value.format: 'json'
+      key.format: 'primitive'
+      key.type: 'VARCHAR'
+      timestamp: 'event_time'
+```
+
+**Config block configuration:**
+```sql
+{{ config(
+    materialized='stream',
+    parameters={
+        'topic': 'purchase_events',
+        'value.format': 'json',
+        'key.format': 'primitive',
+        'key.type': 'VARCHAR',
+        'timestamp': 'event_time'
+    }
+) }}
+
+SELECT
+    event_time,
+    user_id,
+    action
+FROM {{ ref('source_stream') }}
+WHERE action = 'purchase'
+```
+
+#### Stream Configuration Options
+
+| Option | Description | Required? |
+|----------------|-------------|-----------|
+| `materialized` | How the model will be materialized. Must be `stream` to create a streaming model. | Required |
+| `topic` | The topic name for the stream output. | Required |
+| `value.format` | Format for the stream values (e.g., 'json', 'avro'). | Required |
+| `key.format` | Format for the stream keys (e.g., 'primitive', 'json'). | Optional |
+| `key.type` | Data type for the stream keys (e.g., 'VARCHAR', 'BIGINT'). | Optional |
+| `timestamp` | Column name to use as the event timestamp. | Optional |
+
+### Changelog Materialization
+
+Captures changes in the data stream:
+
+**Project file configuration:**
+```yaml
+models:
+  <resource-path>:
+    +materialized: changelog
+    +parameters:
+      topic: 'changelog_topic'
+      value.format: 'json'
+    +primary_key: [column_name]
+```
+
+**Config block configuration:**
+```sql
+{{ config(
+    materialized='changelog',
+    parameters={
+        'topic': 'order_updates',
+        'value.format': 'json'
+    },
+    primary_key=['order_id']
+) }}
+
+SELECT
+    order_id,
+    status,
+    updated_at
+FROM {{ ref('orders_stream') }}
+```
+
+#### Changelog Configuration Options
+
+| Option | Description | Required? |
+|----------------|-------------|-----------|
+| `materialized` | How the model will be materialized. Must be `changelog` to create a changelog model. | Required |
+| `topic` | The topic name for the changelog output. | Required |
+| `value.format` | Format for the changelog values (e.g., 'json', 'avro'). | Required |
+| `primary_key` | List of column names that uniquely identify rows for change tracking. | Required |
+
+### Materialized View
+
+Creates a continuously updated view:
+
+**Config block configuration:**
+```sql
+{{ config(materialized='materialized_view') }}
+
+SELECT
+    product_id,
+    COUNT(*) as purchase_count
+FROM {{ ref('purchase_events') }}
+GROUP BY product_id
+```
+
+## YAML-Only Resource Configurations
+
+DeltaStream supports two types of model definitions for infrastructure components:
+
+1. **Managed Resources (Models)** - Automatically included in the dbt DAG
+2. **Unmanaged Resources (Sources)** - Created on-demand using specific macros
+
+### Should You Use Managed or Unmanaged Resources?
+
+Use managed resources if you need to recreate all of your infrastructure across environments, or if you want to use dbt's graph operators to build only specific resources together with their downstream transformations. Otherwise, unmanaged resources can be simpler because they don't require placeholder files.
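+
+For example, because managed resources are regular dbt models, you can target them with dbt's standard graph operators. The following is an illustrative sketch only; the model names are taken from the managed-resources example below:
+
+```bash
+# Build the Kafka store model and everything downstream of it
+dbt run --select my_kafka_store+
+
+# Build the stream definition and its downstream transformations
+dbt run --select user_events_stream+
+```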
+ +### Managed Resources (Models) + +Managed resources are automatically included in the dbt DAG and defined as models: + +```yaml +version: 2 +models: + - name: my_kafka_store + config: + materialized: store + parameters: + type: KAFKA + access_region: "AWS us-east-1" + uris: "kafka.broker1.url:9092,kafka.broker2.url:9092" + tls.ca_cert_file: "@/certs/us-east-1/self-signed-kafka-ca.crt" + + - name: ps_store + config: + materialized: store + parameters: + type: POSTGRESQL + access_region: "AWS us-east-1" + uris: "postgresql://mystore.com:5432/demo" + postgres.username: "user" + postgres.password: "password" + + - name: user_events_stream + config: + materialized: stream + columns: + event_time: + type: TIMESTAMP + not_null: true + user_id: + type: VARCHAR + action: + type: VARCHAR + parameters: + topic: 'user_events' + value.format: 'json' + key.format: 'primitive' + key.type: 'VARCHAR' + timestamp: 'event_time' + + - name: order_changes + config: + materialized: changelog + columns: + order_id: + type: VARCHAR + not_null: true + status: + type: VARCHAR + updated_at: + type: TIMESTAMP + primary_key: + - order_id + parameters: + topic: 'order_updates' + value.format: 'json' + + - name: pv_kinesis + config: + materialized: entity + store: kinesis_store + parameters: + 'kinesis.shards': 3 + + - name: my_compute_pool + config: + materialized: compute_pool + parameters: + 'compute_pool.size': 'small' + 'compute_pool.timeout_min': 5 + + - name: my_function_source + config: + materialized: function_source + parameters: + file: '@/path/to/my-functions.jar' + description: 'Custom utility functions' + + - name: my_descriptor_source + config: + materialized: descriptor_source + parameters: + file: '@/path/to/schemas.desc' + description: 'Protocol buffer schemas for data structures' + + - name: my_custom_function + config: + materialized: function + parameters: + args: + - name: input_text + type: VARCHAR + returns: VARCHAR + language: JAVA + source.name: 'my_function_source' + class.name: 'com.example.TextProcessor' + + - name: my_schema_registry + config: + materialized: schema_registry + parameters: + type: "CONFLUENT" + access_region: "AWS us-east-1" + uris: "https://url.to.schema.registry.listener:8081" + 'confluent.username': 'fake_username' + 'confluent.password': 'fake_password' + 'tls.client.cert_file': '@/path/to/tls/client_cert_file' + 'tls.client.key_file': '@/path/to/tls_key' +``` + +**Note:** Due to current dbt limitations, managed YAML-only resources require a placeholder .sql file that doesn't contain a SELECT statement. 
For example, create `my_kafka_store.sql` with: + +```sql +-- Placeholder +``` + +### Unmanaged Resources (Sources) + +Unmanaged resources are defined as sources and created on-demand using specific macros: + +```yaml +version: 2 +sources: + - name: infrastructure + tables: + - name: my_kafka_store + config: + materialized: store + parameters: + type: KAFKA + access_region: "AWS us-east-1" + uris: "kafka.broker1.url:9092,kafka.broker2.url:9092" + tls.ca_cert_file: "@/certs/us-east-1/self-signed-kafka-ca.crt" + + - name: ps_store + config: + materialized: store + parameters: + type: POSTGRESQL + access_region: "AWS us-east-1" + uris: "postgresql://mystore.com:5432/demo" + postgres.username: "user" + postgres.password: "password" + + - name: user_events_stream + config: + materialized: stream + columns: + event_time: + type: TIMESTAMP + not_null: true + user_id: + type: VARCHAR + action: + type: VARCHAR + parameters: + topic: 'user_events' + value.format: 'json' + key.format: 'primitive' + key.type: 'VARCHAR' + timestamp: 'event_time' + + - name: order_changes + config: + materialized: changelog + columns: + order_id: + type: VARCHAR + not_null: true + status: + type: VARCHAR + updated_at: + type: TIMESTAMP + primary_key: + - order_id + parameters: + topic: 'order_updates' + value.format: 'json' + + - name: pv_kinesis + config: + materialized: entity + store: kinesis_store + parameters: + 'kinesis.shards': 3 + + - name: compute_pool_small + config: + materialized: compute_pool + parameters: + 'compute_pool.size': 'small' + 'compute_pool.timeout_min': 5 + + - name: my_function_source + config: + materialized: function_source + parameters: + file: '@/path/to/my-functions.jar' + description: 'Custom utility functions' + + - name: my_descriptor_source + config: + materialized: descriptor_source + parameters: + file: '@/path/to/schemas.desc' + description: 'Protocol buffer schemas for data structures' + + - name: my_custom_function + config: + materialized: function + parameters: + args: + - name: input_text + type: VARCHAR + returns: VARCHAR + language: JAVA + source.name: 'my_function_source' + class.name: 'com.example.TextProcessor' + + - name: my_schema_registry + config: + materialized: schema_registry + parameters: + type: "CONFLUENT" + access_region: "AWS us-east-1" + uris: "https://url.to.schema.registry.listener:8081" + 'confluent.username': 'fake_username' + 'confluent.password': 'fake_password' + 'tls.client.cert_file': '@/path/to/tls/client_cert_file' + 'tls.client.key_file': '@/path/to/tls_key' +``` + +To create unmanaged resources: + +```bash +# Create all sources +dbt run-operation create_sources + +# Create a specific source +dbt run-operation create_source_by_name --args '{source_name: infrastructure}' +``` + +## Store Configurations + +### Kafka Store + +```yaml +- name: my_kafka_store + config: + materialized: store + parameters: + type: KAFKA + access_region: "AWS us-east-1" + uris: "kafka.broker1.url:9092,kafka.broker2.url:9092" + tls.ca_cert_file: "@/certs/us-east-1/self-signed-kafka-ca.crt" +``` + +### PostgreSQL Store + +```yaml +- name: postgres_store + config: + materialized: store + parameters: + type: POSTGRESQL + access_region: "AWS us-east-1" + uris: "postgresql://mystore.com:5432/demo" + postgres.username: "user" + postgres.password: "password" +``` + +## Entity Configuration + +```yaml +- name: kinesis_entity + config: + materialized: entity + store: kinesis_store + parameters: + 'kinesis.shards': 3 +``` + +## Compute Pool Configuration + +```yaml +- name: 
processing_pool + config: + materialized: compute_pool + parameters: + 'compute_pool.size': 'small' + 'compute_pool.timeout_min': 5 +``` + +## Referencing Resources + +### Managed Resources + +Use the standard `ref()` function: + +```sql +SELECT * FROM {{ ref('my_kafka_stream') }} +``` + +### Unmanaged Resources + +Use the `source()` function: + +```sql +SELECT * FROM {{ source('infrastructure', 'user_events_stream') }} +``` + +## Seeds + +Load CSV data into existing DeltaStream entities using the `seed` materialization. Unlike traditional dbt seeds that create new tables, DeltaStream seeds insert data into pre-existing entities. + +### Configuration + +Seeds must be configured in YAML with the following properties: + +**Required:** + +- `entity`: The name of the target entity to insert data into + +**Optional:** + +- `store`: The name of the store containing the entity (omit if entity is not in a store) +- `with_params`: A dictionary of parameters for the WITH clause +- `quote_columns`: Control which columns get quoted. Default: `false` (no columns quoted). Can be: + - `true`: Quote all columns + - `false`: Quote no columns (default) + - `string`: If set to `'*'`, quote all columns + - `list`: List of column names to quote + +### Example Configuration + +**With Store (quoting enabled):** + +```yaml +# seeds.yml +version: 2 + +seeds: + - name: user_data_with_store_quoted + config: + entity: 'user_events' + store: 'kafka_store' + with_params: + kafka.topic.retention.ms: '86400000' + partitioned: true + quote_columns: true # Quote all columns +``` + +### Usage + +1. Place CSV files in your `seeds/` directory +2. Configure seeds in YAML with the required `entity` parameter +3. Optionally specify `store` if the entity is in a store +4. Run `dbt seed` to load the data + +:::info Important +The target entity must already exist in DeltaStream before running seeds. Seeds only insert data, they do not create entities. +::: + +## Function and Source Materializations + +DeltaStream supports user-defined functions (UDFs) and their dependencies through specialized materializations. + +### File Attachment Support + +The adapter provides seamless file attachment for function sources and descriptor sources: + +- **Standardized Interface**: Common file handling logic for both function sources and descriptor sources +- **Path Resolution**: Supports both absolute paths and relative paths (including `@` syntax for project-relative paths) +- **Automatic Validation**: Files are validated for existence and accessibility before attachment + +### Function Source + +Creates a function source from a JAR file containing Java functions: + +**Config block configuration:** + +```sql +{{ config( + materialized='function_source', + parameters={ + 'file': '@/path/to/my-functions.jar', + 'description': 'Custom utility functions' + } +) }} + +SELECT 1 as placeholder +``` + +### Descriptor Source + +Creates a descriptor source from compiled protocol buffer descriptor files: + +**Config block configuration:** + +```sql +{{ config( + materialized='descriptor_source', + parameters={ + 'file': '@/path/to/schemas.desc', + 'description': 'Protocol buffer schemas for data structures' + } +) }} + +SELECT 1 as placeholder +``` + +:::info Note +Descriptor sources require compiled `.desc` files, not raw `.proto` files. 
Compile your protobuf schemas using: + +```bash +protoc --descriptor_set_out=schemas/my_schemas.desc schemas/my_schemas.proto +``` + +::: + +### Function + +Creates a user-defined function that references a function source: + +**Config block configuration:** + +```sql +{{ config( + materialized='function', + parameters={ + 'args': [ + {'name': 'input_text', 'type': 'VARCHAR'} + ], + 'returns': 'VARCHAR', + 'language': 'JAVA', + 'source.name': 'my_function_source', + 'class.name': 'com.example.TextProcessor' + } +) }} + +SELECT 1 as placeholder +``` + +### Schema Registry + +Creates a schema registry connection: + +**Config block configuration:** + +```sql +{{ config( + materialized='schema_registry', + parameters={ + 'type': 'CONFLUENT', + 'access_region': 'AWS us-east-1', + 'uris': 'https://url.to.schema.registry.listener:8081', + 'confluent.username': 'fake_username', + 'confluent.password': 'fake_password', + 'tls.client.cert_file': '@/path/to/tls/client_cert_file', + 'tls.client.key_file': '@/path/to/tls_key' + } +) }} + +SELECT 1 as placeholder +``` + +## Query Management Macros + +DeltaStream dbt adapter provides macros to help you manage and terminate running queries directly from dbt. + +### List All Queries + +The `list_all_queries` macro displays all queries currently known to DeltaStream, including their state, owner, and SQL: + +```bash +dbt run-operation list_all_queries +``` + +### Describe Query + +Use the `describe_query` macro to check the logs and details of a specific query: + +```bash +dbt run-operation describe_query --args '{query_id: ""}' +``` + +### Terminate a Specific Query + +Use the `terminate_query` macro to terminate a query by its ID: + +```bash +dbt run-operation terminate_query --args '{query_id: ""}' +``` + +### Terminate All Running Queries + +Use the `terminate_all_queries` macro to terminate all currently running queries: + +```bash +dbt run-operation terminate_all_queries +``` + +### Restart a Query + +Use the `restart_query` macro to restart a failed query by its ID: + +```bash +dbt run-operation restart_query --args '{query_id: ""}' +``` + +## Application Macro + +### Execute Multiple Statements as a Unit + +The `application` macro allows you to execute multiple DeltaStream SQL statements as a single unit of work with all-or-nothing semantics: + +```bash +dbt run-operation application --args '{ + application_name: "my_data_pipeline", + statements: [ + "USE DATABASE my_db", + "CREATE STREAM user_events WITH (topic='"'"'events'"'"', value.format='"'"'json'"'"')", + "CREATE MATERIALIZED VIEW user_counts AS SELECT user_id, COUNT(*) FROM user_events GROUP BY user_id" + ] +}' +``` + +## Troubleshooting + +### Function Source Readiness + +If you encounter "function source is not ready" errors when creating functions: + +1. **Automatic Retry**: The adapter automatically retries function creation with exponential backoff +2. **Timeout Configuration**: The default 30-second timeout can be extended if needed for large JAR files +3. **Dependency Order**: Ensure function sources are created before dependent functions +4. **Manual Retry**: If automatic retry fails, wait a few minutes and retry the operation + +### File Attachment Issues + +For problems with file attachments in function sources and descriptor sources: + +1. **File Paths**: Use `@/path/to/file` syntax for project-relative paths +2. **File Types**: + - Function sources require `.jar` files + - Descriptor sources require compiled `.desc` files (not `.proto`) +3. 
**File Validation**: The adapter validates file existence before attempting attachment +4. **Compilation**: For descriptor sources, ensure protobuf files are compiled: + + ```bash + protoc --descriptor_set_out=output.desc input.proto + ``` + diff --git a/website/sidebars.js b/website/sidebars.js index f6b7ba18a05..3cf2603b2c7 100644 --- a/website/sidebars.js +++ b/website/sidebars.js @@ -271,6 +271,7 @@ const sidebarSettings = { "docs/core/connect-data-platform/spark-setup", "docs/core/connect-data-platform/bigquery-setup", "docs/core/connect-data-platform/databricks-setup", + "docs/core/connect-data-platform/deltastream-setup", "docs/core/connect-data-platform/fabric-setup", "docs/core/connect-data-platform/fabricspark-setup", "docs/core/connect-data-platform/postgres-setup", @@ -1114,6 +1115,7 @@ const sidebarSettings = { "reference/resource-configs/bigquery-configs", "reference/resource-configs/clickhouse-configs", "reference/resource-configs/databricks-configs", + "reference/resource-configs/deltastream-configs", "reference/resource-configs/doris-configs", "reference/resource-configs/duckdb-configs", "reference/resource-configs/fabric-configs",