Binary file modified en_US/data-integration/assets/alloydb_architecture.png
Binary file modified en_US/data-integration/assets/clickhouse_architecture.png
Binary file modified en_US/data-integration/assets/cockroachdb_architecture.png
Binary file modified en_US/data-integration/assets/emqx-integration-s3-tables.png
Binary file modified en_US/data-integration/assets/lindorm_architecture.png
Binary file modified en_US/data-integration/assets/redshift_architecture.png
Binary file modified en_US/data-integration/assets/snowflake-architecture.png
95 changes: 87 additions & 8 deletions en_US/data-integration/azure-blob-storage.md
@@ -14,7 +14,7 @@ EMQX utilizes rules engines and Sinks to forward device events and data to Azure
1. **Device Connection to EMQX**: IoT devices trigger an online event upon successfully connecting via the MQTT protocol. The event includes device ID, source IP address, and other property information.
2. **Device Message Publishing and Receiving**: Devices publish telemetry and status data through specific topics. EMQX receives the messages and compares them within the rules engine.
3. **Rules Engine Processing Messages**: The built-in rules engine processes messages and events from specific sources based on topic matching. It matches corresponding rules and processes messages and events, such as data format transformation, filtering specific information, or enriching messages with context information.
4. **Writing to Azure Blob Storage**: The rule triggers an action to write the message to Storage Container. Using the Azure Blob Storage Sink, users can extract data from processing results and send it to Blob Storage. Messages can be stored in text or binary format, or multiple lines of structured data can be aggregated into a single CSV or JSON Lines file, depending on the message content and the Sink configuration.
4. **Writing to Azure Blob Storage**: The rule triggers an action to write the message to Storage Container. Using the Azure Blob Storage Sink, users can extract data from processing results and send it to Blob Storage. Messages can be stored in text or binary format, or multiple lines of structured data can be aggregated into a single CSV, JSON Lines, or Parquet file, depending on the message content and the Sink configuration.

After events and message data are written to Storage Container, you can connect to Azure Blob Storage to read the data for flexible application development, such as:

@@ -102,10 +102,10 @@ This section demonstrates how to create a rule in EMQX to process messages from

7. Set the **Container** by entering `iot-data`.

9. Select the **Upload Method**. The differences between the two methods are as follows:
8. Select the **Upload Method**. The differences between the two methods are as follows:

- **Direct Upload**: Each time the rule is triggered, data is uploaded directly to Azure Storage according to the preset object key and content. This method is suitable for storing binary or large text data. However, it may generate a large number of files.
- **Aggregated Upload**: This method packages the results of multiple rule triggers into a single file (such as a CSV file) and uploads it to Azure Storage, making it suitable for storing structured data. It can reduce the number of files and improve write efficiency.
- **Direct Upload**: Each time the rule is triggered, data is uploaded directly to Azure Blob Storage according to the preset object key and content. This method is suitable for storing binary or large text data. However, it may generate a large number of files.
- **Aggregated Upload**: This method packages the results of multiple rule triggers into a single file (such as a CSV file) and uploads it to Azure Blob Storage, making it suitable for storing structured data. It can reduce the number of files and improve write efficiency.

The configuration parameters differ for each method. Please configure according to the selected method:

@@ -137,11 +137,17 @@ This section demonstrates how to create a rule in EMQX to process messages from

Note that any placeholders marked as required that are not used in the template will be automatically appended to the Blob Name as path suffixes to avoid duplication. All other placeholders are considered invalid.

- **Aggregation Type**: Currently, CSV and JSON Lines are supported.
  - `CSV`: Data will be written to Azure Storage in comma-separated CSV format.
  - `JSON Lines`: Data will be written to Azure Storage in [JSON Lines](https://jsonlines.org/) format.
- **Column Order** (applies only when the Aggregation Type is `CSV`): Adjust the order of rule result columns through a dropdown selection. The generated CSV file will first be sorted by the selected columns, with unselected columns sorted alphabetically following the selected columns.

- **Aggregation Type**: Defines the format of the data file used to store batched MQTT messages in Azure Blob Storage. Supported values:

  - `CSV`: Data will be written to Azure Blob Storage in comma-separated CSV format.

  - `JSON Lines`: Data will be written to Azure Blob Storage in [JSON Lines](https://jsonlines.org/) format.

  - `parquet`: Data will be written to Azure Blob Storage in [Apache Parquet](https://parquet.apache.org/) format, which is column-based and optimized for analytical queries over large datasets.

    > For detailed configuration options, including schema definition, compression, and row group settings, see [Parquet Format Options](#parquet-format-options).

- **Column Order** (for `CSV`): Adjust the order of rule result columns through a dropdown selection. The generated CSV file will first be sorted by the selected columns, with unselected columns sorted alphabetically following the selected columns.

- **Max Records**: When the maximum number of records is reached, the aggregation of a single file will be completed and uploaded, resetting the time interval.

@@ -163,6 +169,79 @@ You have now successfully created the rule. You can see the newly created rule o

You can also click **Integration** -> **Flow Designer** to view the topology. The topology visually shows how messages under the topic `t/#` are written into the Azure Blob Storage container after being parsed by the rule `my_rule`.

### Parquet Format Options

When the **Aggregation Type** is set to `parquet`, EMQX stores aggregated rule results in the Apache Parquet format, a columnar, compressed file format optimized for analytical workloads.

This section describes all configurable options for the Parquet output format.

#### Parquet Schema (Avro)

This option defines how MQTT message fields are mapped to the columns in the Parquet file. EMQX uses the Apache Avro schema specification to describe the structure of the Parquet data.

You can choose one of the following options:

- **Avro Schema That Lives in Schema Registry**: Use an existing [Avro schema](./schema-registry-example-avro.md) managed in EMQX [Schema Registry](./schema-registry.md).

When this option is chosen, you must also specify a **Schema Name**, which identifies the schema to use for serialization.

::: tip

Use this option if you manage schemas centrally in a registry and want consistent schema evolution across multiple systems.

:::

- **Avro Schema Defined**: Define the Avro schema directly within EMQX by entering the schema JSON structure in the **Schema Definition** field.

Example:

```json
{
  "type": "record",
  "name": "MessageRecord",
  "fields": [
    {"name": "clientid", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "payload", "type": "string"}
  ]
}
```

::: tip

Ensure that the field names and data types match the fields returned by your rule SQL. Incorrect or missing fields may cause serialization errors when writing to Parquet.

:::
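
For example, before pointing the Sink at a production container, you can sanity-check that a record shaped like your rule output validates against the schema. The sketch below is illustrative only; it assumes the third-party `fastavro` Python package and a hypothetical rule that selects `clientid`, `timestamp`, and `payload`:

```python
# Offline check (not part of EMQX): validate a sample rule-output record
# against the Avro schema above before configuring the Sink.
# Requires the third-party `fastavro` package: pip install fastavro
from fastavro import parse_schema
from fastavro.validation import validate

# Same schema as the "Schema Definition" example above.
schema = parse_schema({
    "type": "record",
    "name": "MessageRecord",
    "fields": [
        {"name": "clientid", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "payload", "type": "string"},
    ],
})

# Hypothetical record shaped like the output of a rule such as
# SELECT clientid, timestamp, payload FROM "t/#".
sample = {"clientid": "emqx_c", "timestamp": 1700000000000, "payload": '{"temp": 23.5}'}

# Raises fastavro.validation.ValidationError if a field is missing or mistyped.
validate(sample, schema)
print("sample record matches the schema")
```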

#### Parquet Default Compression

This option specifies the compression algorithm applied to Parquet data pages within each row group. Compression helps reduce storage space and can improve I/O efficiency when querying data.

Supported values:

| Value | Description |
| ------------------ | ------------------------------------------------------------ |
| `snappy` (default) | A balanced choice offering fast compression and decompression with a good compression ratio. Recommended for most cases. |
| `zstd` | Provides a higher compression ratio with moderate CPU usage. Ideal for large-scale analytical data or long-term storage. |
| `None` | Disables compression. Suitable only for debugging or when compression is not desired. |

#### Parquet Max Row Group Bytes

This option specifies the maximum size (in bytes) for each Parquet row group, which is the fundamental unit of work for reading and writing data. Once the buffered data size exceeds this threshold, EMQX flushes the current row group and starts a new one.

- **Default value:** `128 MB`

Guidelines:

- **Increase the value** to improve read performance for analytical queries, especially when using Athena or Spark.
- **Reduce the value** to limit memory usage during writing or when using small datasets.

::: tip

Parquet readers load data at the row group level. Larger row groups typically reduce metadata overhead and improve analytical query performance.

:::
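
Once aggregated files start appearing in the container, you can confirm the compression codec and row-group sizing the Sink actually produced by downloading a blob and reading its Parquet metadata. The following is a minimal, illustrative sketch; it assumes the `azure-storage-blob` and `pyarrow` Python packages and uses a hypothetical connection string, container, and blob name:

```python
# Download one aggregated blob and inspect its Parquet metadata to confirm
# the compression codec and row-group sizes produced by the Sink.
import io

import pyarrow.parquet as pq
from azure.storage.blob import BlobServiceClient

# Hypothetical connection string and object names; replace with your own.
service = BlobServiceClient.from_connection_string("<your-connection-string>")
blob = service.get_blob_client(container="iot-data", blob="msgs/node1_0.parquet")

data = io.BytesIO(blob.download_blob().readall())
meta = pq.ParquetFile(data).metadata

print("row groups:", meta.num_row_groups)
for i in range(meta.num_row_groups):
    rg = meta.row_group(i)
    # ColumnChunkMetaData.compression reports the codec, e.g. 'SNAPPY' or 'ZSTD'.
    print(f"row group {i}: {rg.num_rows} rows, {rg.total_byte_size} bytes, "
          f"codec={rg.column(0).compression}")
```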

## Test the Rule

This section shows how to test the rule configured with the direct upload method.
88 changes: 82 additions & 6 deletions en_US/data-integration/s3.md
@@ -25,7 +25,7 @@ EMQX utilizes rules engines and Sinks to forward device events and data to Amazo
1. **Device Connection to EMQX**: IoT devices trigger an online event upon successfully connecting via the MQTT protocol. The event includes device ID, source IP address, and other property information.
2. **Device Message Publishing and Receiving**: Devices publish telemetry and status data through specific topics. EMQX receives the messages and compares them within the rules engine.
3. **Rules Engine Processing Messages**: The built-in rules engine processes messages and events from specific sources based on topic matching. It matches corresponding rules and processes messages and events, such as data format transformation, filtering specific information, or enriching messages with context information.
4. **Writing to Amazon S3**: The rule triggers an action to write the message to S3. Using the Amazon S3 Sink, users can extract data from processing results and send it to S3. Messages can be stored in text or binary format, or multiple lines of structured data can be aggregated into a single CSV or JSON Lines file, depending on the message content and the Sink configuration.
4. **Writing to Amazon S3**: The rule triggers an action to write the message to S3. Using the Amazon S3 Sink, users can extract data from processing results and send it to S3. Messages can be stored in text or binary format, or multiple lines of structured data can be aggregated into a single CSV, JSON Lines, or Parquet file, depending on the message content and the Sink configuration.

After events and message data are written to Amazon S3, you can connect to Amazon S3 to read the data for flexible application development, such as:

@@ -212,11 +212,16 @@ This section demonstrates how to create a rule in EMQX to process messages from

Note that any placeholders marked as required that are not used in the template will be automatically appended to the S3 object key as path suffixes to avoid duplication. All other placeholders are considered invalid.

- **Aggregation Type**: Currently, CSV and JSON Lines are supported.
- **Column Order** (applies only when the Aggregation Type is `CSV`): Adjust the order of rule result columns through a dropdown selection. The generated CSV file will first be sorted by the selected columns, with unselected columns sorted alphabetically following the selected columns.

- **Aggregation Type**: Defines the format of the data file used to store batched MQTT messages in S3. Supported values:

  - `CSV`: Data will be written to S3 in comma-separated CSV format.

  - `JSON Lines`: Data will be written to S3 in [JSON Lines](https://jsonlines.org/) format.

  - `parquet`: Data will be written to S3 in [Apache Parquet](https://parquet.apache.org/) format, which is column-based and optimized for analytical queries over large datasets.

    > For detailed configuration options, including schema definition, compression, and row group settings, see [Parquet Format Options](#parquet-format-options).

- **Column Order** (for `CSV`): Adjust the order of rule result columns through a dropdown selection. The generated CSV file will first be sorted by the selected columns, with unselected columns sorted alphabetically following the selected columns.

- **Max Records**: When the maximum number of records is reached, the aggregation of a single file will be completed and uploaded, resetting the time interval.

@@ -226,8 +231,6 @@ This section demonstrates how to create a rule in EMQX to process messages from

::::

10. Set the **Object Content**. By default, it is a JSON text format containing all fields. It supports `${var}` format placeholders. Here, enter `${payload}` to indicate using the message body as the object content. In this case, the object's storage format depends on the message body's format, supporting compressed packages, images, or other binary formats.

11. **Fallback Actions (Optional)**: If you want to improve reliability in case of message delivery failure, you can define one or more fallback actions. These actions will be triggered if the primary Sink fails to process a message. See [Fallback Actions](./data-bridges.md#fallback-actions) for more details.

12. Expand **Advanced Settings** and configure the advanced setting options as needed (optional). For more details, refer to [Advanced Settings](#advanced-settings).
@@ -240,6 +243,79 @@ You have now successfully created the rule. You can see the newly created rule o

You can also click **Integration** -> **Flow Designer** to view the topology. The topology visually shows how messages under the topic `t/#` are written into S3 after being parsed by the rule `my_rule`.

### Parquet Format Options

When the **Aggregation Type** is set to `parquet`, EMQX stores aggregated rule results in the Apache Parquet format, a columnar, compressed file format optimized for analytical workloads.

This section describes all configurable options for the Parquet output format.

#### Parquet Schema (Avro)

This option defines how MQTT message fields are mapped to the columns in the Parquet file. EMQX uses the Apache Avro schema specification to describe the structure of the Parquet data.

You can choose one of the following options:

- **Avro Schema That Lives in Schema Registry**: Use an existing [Avro schema](./schema-registry-example-avro.md) managed in EMQX [Schema Registry](./schema-registry.md).

When this option is chosen, you must also specify a **Schema Name**, which identifies the schema to use for serialization.

::: tip

Use this option if you manage schemas centrally in a registry and want consistent schema evolution across multiple systems.

:::

- **Avro Schema Defined**: Define the Avro schema directly within EMQX by entering the schema JSON structure in the **Schema Definition** field.

Example:

```json
{
  "type": "record",
  "name": "MessageRecord",
  "fields": [
    {"name": "clientid", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "payload", "type": "string"}
  ]
}
```

::: tip

Ensure that the field names and data types match the fields returned by your rule SQL. Incorrect or missing fields may cause serialization errors when writing to Parquet.

:::
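
As a quick consistency check, you can open one of the uploaded objects and compare its columns against the schema configured above. The sketch below is illustrative only; it assumes the `pyarrow` Python package, AWS credentials available in the environment, and hypothetical bucket, key, and region values:

```python
# Open an aggregated object directly from S3 and confirm that the Parquet
# columns match the Avro schema configured for the Sink.
import pyarrow.parquet as pq
from pyarrow import fs

# Hypothetical bucket, key, and region; replace with your own.
s3 = fs.S3FileSystem(region="us-east-1")
with s3.open_input_file("iot-data/msgs/node1_0.parquet") as f:
    schema = pq.ParquetFile(f).schema_arrow

print(schema)
expected = {"clientid", "timestamp", "payload"}
assert expected.issubset(set(schema.names)), "rule output and Parquet schema have drifted"
```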

#### Parquet Default Compression

This option specifies the compression algorithm applied to Parquet data pages within each row group. Compression helps reduce storage space and can improve I/O efficiency when querying data.

Supported values:

| Value | Description |
| ------------------ | ------------------------------------------------------------ |
| `snappy` (default) | A balanced choice offering fast compression and decompression with a good compression ratio. Recommended for most cases. |
| `zstd` | Provides a higher compression ratio with moderate CPU usage. Ideal for large-scale analytical data or long-term storage. |
| `None` | Disables compression. Suitable only for debugging or when compression is not desired. |

#### Parquet Max Row Group Bytes

This option specifies the maximum size (in bytes) for each Parquet row group, which is the fundamental unit of work for reading and writing data. Once the buffered data size exceeds this threshold, EMQX flushes the current row group and starts a new one.

- **Default value:** `128 MB`

Guidelines:

- **Increase the value** to improve read performance for analytical queries, especially when using Athena or Spark.
- **Reduce the value** to limit memory usage during writing or when using small datasets.

::: tip

Parquet readers load data at the row group level. Larger row groups typically reduce metadata overhead and improve analytical query performance.

:::
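
To see whether your data actually approaches the configured limit, you can read just the Parquet footer of an uploaded object and compare row-group sizes against it. The sketch below is illustrative; it assumes the `pyarrow` Python package, AWS credentials in the environment, and hypothetical bucket and key names. Note that `total_byte_size` reports the uncompressed size of a row group's column data, so treat the comparison as a rough guide:

```python
# Read the Parquet footer of an uploaded object and report how each row group
# compares to the configured "Parquet Max Row Group Bytes" limit.
import pyarrow.parquet as pq
from pyarrow import fs

LIMIT = 128 * 1024 * 1024  # default Parquet Max Row Group Bytes

# Hypothetical bucket, key, and region; replace with your own.
s3 = fs.S3FileSystem(region="us-east-1")
with s3.open_input_file("iot-data/msgs/node1_0.parquet") as f:
    meta = pq.ParquetFile(f).metadata

for i in range(meta.num_row_groups):
    rg = meta.row_group(i)
    # total_byte_size is the uncompressed size of the row group's column data.
    print(f"row group {i}: {rg.num_rows} rows, "
          f"{rg.total_byte_size / LIMIT:.0%} of the configured limit")
```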

## Test the Rule

This section shows how to test the rule configured with the direct upload method.
Binary file modified zh_CN/data-integration/assets/alloydb_architecture.png
Binary file modified zh_CN/data-integration/assets/clickhouse_architecture.png
Binary file modified zh_CN/data-integration/assets/cockroachdb_architecture.png
Binary file modified zh_CN/data-integration/assets/emqx-integration-s3-tables.png
Binary file modified zh_CN/data-integration/assets/lindorm_architecture.png
Binary file modified zh_CN/data-integration/assets/redshift_architecture.png
Binary file modified zh_CN/data-integration/assets/snowflake-architecture.png