diff --git a/en_US/data-integration/assets/alloydb_architecture.png b/en_US/data-integration/assets/alloydb_architecture.png index 004f7f442..aa6320de4 100644 Binary files a/en_US/data-integration/assets/alloydb_architecture.png and b/en_US/data-integration/assets/alloydb_architecture.png differ diff --git a/en_US/data-integration/assets/azure-blob-storage-architecture.png b/en_US/data-integration/assets/azure-blob-storage-architecture.png index 8fbf7a45c..df2d2b405 100644 Binary files a/en_US/data-integration/assets/azure-blob-storage-architecture.png and b/en_US/data-integration/assets/azure-blob-storage-architecture.png differ diff --git a/en_US/data-integration/assets/clickhouse_architecture.png b/en_US/data-integration/assets/clickhouse_architecture.png index 79ed2413c..ba4a24cf6 100644 Binary files a/en_US/data-integration/assets/clickhouse_architecture.png and b/en_US/data-integration/assets/clickhouse_architecture.png differ diff --git a/en_US/data-integration/assets/cockroachdb_architecture.png b/en_US/data-integration/assets/cockroachdb_architecture.png index aece3c78f..523210c2d 100644 Binary files a/en_US/data-integration/assets/cockroachdb_architecture.png and b/en_US/data-integration/assets/cockroachdb_architecture.png differ diff --git a/en_US/data-integration/assets/emqx-integration-s3-tables.png b/en_US/data-integration/assets/emqx-integration-s3-tables.png index 7c6d85e24..8e3666bcd 100644 Binary files a/en_US/data-integration/assets/emqx-integration-s3-tables.png and b/en_US/data-integration/assets/emqx-integration-s3-tables.png differ diff --git a/en_US/data-integration/assets/lindorm_architecture.png b/en_US/data-integration/assets/lindorm_architecture.png index 5d92ccba0..e07e3840c 100644 Binary files a/en_US/data-integration/assets/lindorm_architecture.png and b/en_US/data-integration/assets/lindorm_architecture.png differ diff --git a/en_US/data-integration/assets/mongodb_architecture.png b/en_US/data-integration/assets/mongodb_architecture.png new file mode 100644 index 000000000..c96adac60 Binary files /dev/null and b/en_US/data-integration/assets/mongodb_architecture.png differ diff --git a/en_US/data-integration/assets/redshift_architecture.png b/en_US/data-integration/assets/redshift_architecture.png index 5e0386bb5..98c2f6411 100644 Binary files a/en_US/data-integration/assets/redshift_architecture.png and b/en_US/data-integration/assets/redshift_architecture.png differ diff --git a/en_US/data-integration/assets/snowflake-architecture.png b/en_US/data-integration/assets/snowflake-architecture.png index e05116df6..e739f4f8b 100644 Binary files a/en_US/data-integration/assets/snowflake-architecture.png and b/en_US/data-integration/assets/snowflake-architecture.png differ diff --git a/en_US/data-integration/azure-blob-storage.md b/en_US/data-integration/azure-blob-storage.md index c43afdfe1..ca78bdaea 100644 --- a/en_US/data-integration/azure-blob-storage.md +++ b/en_US/data-integration/azure-blob-storage.md @@ -14,7 +14,7 @@ EMQX utilizes rules engines and Sinks to forward device events and data to Azure 1. **Device Connection to EMQX**: IoT devices trigger an online event upon successfully connecting via the MQTT protocol. The event includes device ID, source IP address, and other property information. 2. **Device Message Publishing and Receiving**: Devices publish telemetry and status data through specific topics. EMQX receives the messages and compares them within the rules engine. 3. 
**Rules Engine Processing Messages**: The built-in rules engine processes messages and events from specific sources based on topic matching. It matches corresponding rules and processes messages and events, such as data format transformation, filtering specific information, or enriching messages with context information. -4. **Writing to Azure Blob Storage**: The rule triggers an action to write the message to Storage Container. Using the Azure Blob Storage Sink, users can extract data from processing results and send it to Blob Storage. Messages can be stored in text or binary format, or multiple lines of structured data can be aggregated into a single CSV or JSON Lines file, depending on the message content and the Sink configuration. +4. **Writing to Azure Blob Storage**: The rule triggers an action to write the message to Storage Container. Using the Azure Blob Storage Sink, users can extract data from processing results and send it to Blob Storage. Messages can be stored in text or binary format, or multiple lines of structured data can be aggregated into a single CSV, JSON Lines, or Parquet file, depending on the message content and the Sink configuration. After events and message data are written to Storage Container, you can connect to Azure Blob Storage to read the data for flexible application development, such as: @@ -102,10 +102,10 @@ This section demonstrates how to create a rule in EMQX to process messages from 7. Set the **Container** by entering `iot-data`. -9. Select the **Upload Method**. The differences between the two methods are as follows: +8. Select the **Upload Method**. The differences between the two methods are as follows: - - **Direct Upload**: Each time the rule is triggered, data is uploaded directly to Azure Storage according to the preset object key and content. This method is suitable for storing binary or large text data. However, it may generate a large number of files. - - **Aggregated Upload**: This method packages the results of multiple rule triggers into a single file (such as a CSV file) and uploads it to Azure Storage, making it suitable for storing structured data. It can reduce the number of files and improve write efficiency. + - **Direct Upload**: Each time the rule is triggered, data is uploaded directly to Azure Blob Storage according to the preset object key and content. This method is suitable for storing binary or large text data. However, it may generate a large number of files. + - **Aggregated Upload**: This method packages the results of multiple rule triggers into a single file (such as a CSV file) and uploads it to Azure Blob Storage, making it suitable for storing structured data. It can reduce the number of files and improve write efficiency. The configuration parameters differ for each method. Please configure according to the selected method: @@ -137,11 +137,17 @@ This section demonstrates how to create a rule in EMQX to process messages from Note that if all placeholders marked as required are not used in the template, these placeholders will be automatically added to the Blob Name as path suffixes to avoid duplication. All other placeholders are considered invalid. - - **Aggregation Type**: Currently, CSV and JSON Lines are supported. - - `CSV`: Data will be written to Azure Storage in comma-separated CSV format. - - `JSON Lines`: Data will be written to Azure Storage in [JSON Lines](https://jsonlines.org/) format. 
+ - **Aggregation Type**: Defines the format of the data file used to store batched MQTT messages in Azure Storage. Supported values: - - **Column Order** (applies only when the Aggregation Type is `CSV`): Adjust the order of rule result columns through a dropdown selection. The generated CSV file will first be sorted by the selected columns, with unselected columns sorted alphabetically following the selected columns. + - `CSV`: Data will be written to Azure Blob Storage in comma-separated CSV format. + + - `JSON Lines`: Data will be written to Azure Blob Storage in [JSON Lines](https://jsonlines.org/) format. + + - `parquet`: Data will be written to Azure Blob Storage in [Apache Parquet](https://parquet.apache.org/) format, which is column-based and optimized for analytical queries over large datasets. + + > For detailed configuration options, including schema definition, compression, and row group settings, see [Parquet Format Options](#parquet-format-options). + + - **Column Order** (for `CSV`): Adjust the order of rule result columns through a dropdown selection. The generated CSV file will first be sorted by the selected columns, with unselected columns sorted alphabetically following the selected columns. - **Max Records**: When the maximum number of records is reached, the aggregation of a single file will be completed and uploaded, resetting the time interval. @@ -163,6 +169,79 @@ You have now successfully created the rule. You can see the newly created rule o You can also click **Integration** -> **Flow Designer** to view the topology. The topology visually shows how messages under the topic `t/#` are written into Azure Storage container after being parsed by the rule `my_rule`. +### Parquet Format Options + +When the **Aggregation Type** is set to `parquet`, EMQX stores aggregated rule results in the Apache Parquet format, a columnar, compressed file format optimized for analytical workloads. + +This section describes all configurable options for the Parquet output format. + +#### Parquet Schema (Avro) + +This option defines how MQTT message fields are mapped to the columns in the Parquet file. EMQX uses the Apache Avro schema specification to describe the structure of the Parquet data. + +You can choose one of the following options: + +- **Avro Schema That Lives in Schema Registry**: Use an existing [Avro schema](./schema-registry-example-avro.md) managed in EMQX [Schema Registry](./schema-registry.md). + + When this option is chosen, you must also specify a **Schema Name**, which identifies the schema to use for serialization. + + ::: tip + + Use this option if you manage schemas centrally in a registry and want consistent schema evolution across multiple systems. + + ::: + +- **Avro Schema Defined**: Define the Avro schema directly within EMQX by entering the schema JSON structure in the **Schema Definition** field. + + Example: + + ```json + { + "type": "record", + "name": "MessageRecord", + "fields": [ + {"name": "clientid", "type": "string"}, + {"name": "timestamp", "type": "long"}, + {"name": "payload", "type": "string"} + ] + } + ``` + +::: tip + +Ensure that the field names and data types match the fields returned by your rule SQL. Incorrect or missing fields may cause serialization errors when writing to Parquet. + +::: + +#### Parquet Default Compression + +This option specifies the compression algorithm applied to Parquet data pages within each row group. Compression helps reduce storage space and can improve I/O efficiency when querying data. 
+ +Supported values: + +| Value | Description | +| ------------------ | ------------------------------------------------------------ | +| `snappy` (default) | A balanced choice offering fast compression and decompression with good compression ratio. Recommended for most cases. | +| `zstd` | Provides a higher compression ratio with moderate CPU usage. Ideal for large-scale analytical data or long-term storage. | +| `None` | Disables compression. Suitable only for debugging or when compression is not desired. | + +#### Parquet Max Row Group Bytes + +This option specifies the maximum size (in bytes) for each Parquet row group, which is the fundamental unit of work for reading and writing data. Once the buffered data size exceeds this threshold, EMQX flushes the current row group and starts a new one. + +- **Default value:** `128 MB` + +Guidelines: + +- **Increase the value** to improve read performance for analytical queries, especially when using Athena or Spark. +- **Reduce the value** to limit memory usage during writing or when using small datasets. + +::: tip + +Parquet readers load data at the row group level. Larger row groups typically reduce metadata overhead and improve analytical query performance. + +::: + ## Test the Rule This section shows how to test the rule configured with the direct upload method. diff --git a/en_US/data-integration/s3.md b/en_US/data-integration/s3.md index 9e8ffe030..40ff5a8fe 100644 --- a/en_US/data-integration/s3.md +++ b/en_US/data-integration/s3.md @@ -25,7 +25,7 @@ EMQX utilizes rules engines and Sinks to forward device events and data to Amazo 1. **Device Connection to EMQX**: IoT devices trigger an online event upon successfully connecting via the MQTT protocol. The event includes device ID, source IP address, and other property information. 2. **Device Message Publishing and Receiving**: Devices publish telemetry and status data through specific topics. EMQX receives the messages and compares them within the rules engine. 3. **Rules Engine Processing Messages**: The built-in rules engine processes messages and events from specific sources based on topic matching. It matches corresponding rules and processes messages and events, such as data format transformation, filtering specific information, or enriching messages with context information. -4. **Writing to Amazon S3**: The rule triggers an action to write the message to S3. Using the Amazon S3 Sink, users can extract data from processing results and send it to S3. Messages can be stored in text or binary format, or multiple lines of structured data can be aggregated into a single CSV or JSON Lines file, depending on the message content and the Sink configuration. +4. **Writing to Amazon S3**: The rule triggers an action to write the message to S3. Using the Amazon S3 Sink, users can extract data from processing results and send it to S3. Messages can be stored in text or binary format, or multiple lines of structured data can be aggregated into a single CSV, JSON Lines or Parquet file, depending on the message content and the Sink configuration. After events and message data are written to Amazon S3, you can connect to Amazon S3 to read the data for flexible application development, such as: @@ -212,11 +212,16 @@ This section demonstrates how to create a rule in EMQX to process messages from Note that if all placeholders marked as required are not used in the template, these placeholders will be automatically added to the S3 object key as path suffixes to avoid duplication. 
All other placeholders are considered invalid. - - **Aggregation Type**: Currently, CSV and JSON Lines are supported. + - **Aggregation Type**: Defines the format of the data file used to store batched MQTT messages in S3. Supported values: - `CSV`: Data will be written to S3 in comma-separated CSV format. + - `JSON Lines`: Data will be written to S3 in [JSON Lines](https://jsonlines.org/) format. - - - **Column Order** (applies only when the Aggregation Type is `CSV`): Adjust the order of rule result columns through a dropdown selection. The generated CSV file will first be sorted by the selected columns, with unselected columns sorted alphabetically following the selected columns. + + - `parquet`: Data will be written to S3 in [Apache Parquet](https://parquet.apache.org/) format, which is column-based and optimized for analytical queries over large datasets. + + > For detailed configuration options, including schema definition, compression, and row group settings, see [Parquet Format Options](#parquet-format-options). + + - **Column Order** (for `CSV`): Adjust the order of rule result columns through a dropdown selection. The generated CSV file will first be sorted by the selected columns, with unselected columns sorted alphabetically following the selected columns. - **Max Records**: When the maximum number of records is reached, the aggregation of a single file will be completed and uploaded, resetting the time interval. @@ -226,8 +231,6 @@ This section demonstrates how to create a rule in EMQX to process messages from :::: -10. Set the **Object Content**. By default, it is a JSON text format containing all fields. It supports `${var}` format placeholders. Here, enter `${payload}` to indicate using the message body as the object content. In this case, the object's storage format depends on the message body's format, supporting compressed packages, images, or other binary formats. - 11. **Fallback Actions (Optional)**: If you want to improve reliability in case of message delivery failure, you can define one or more fallback actions. These actions will be triggered if the primary Sink fails to process a message. See [Fallback Actions](./data-bridges.md#fallback-actions) for more details. 12. Expand **Advanced Settings** and configure the advanced setting options as needed (optional). For more details, refer to [Advanced Settings](#advanced-settings). @@ -240,6 +243,79 @@ You have now successfully created the rule. You can see the newly created rule o You can also click **Integration** -> **Flow Designer** to view the topology. The topology visually shows how messages under the topic `t/#` are written into S3 after being parsed by the rule `my_rule`. +### Parquet Format Options + +When the **Aggregation Type** is set to `parquet`, EMQX stores aggregated rule results in the Apache Parquet format, a columnar, compressed file format optimized for analytical workloads. + +This section describes all configurable options for the Parquet output format. + +#### Parquet Schema (Avro) + +This option defines how MQTT message fields are mapped to the columns in the Parquet file. EMQX uses the Apache Avro schema specification to describe the structure of the Parquet data. + +You can choose one of the following options: + +- **Avro Schema That Lives in Schema Registry**: Use an existing [Avro schema](./schema-registry-example-avro.md) managed in EMQX [Schema Registry](./schema-registry.md).
+ + When this option is chosen, you must also specify a **Schema Name**, which identifies the schema to use for serialization. + + ::: tip + + Use this option if you manage schemas centrally in a registry and want consistent schema evolution across multiple systems. + + ::: + +- **Avro Schema Defined**: Define the Avro schema directly within EMQX by entering the schema JSON structure in the **Schema Definition** field. + + Example: + + ```json + { + "type": "record", + "name": "MessageRecord", + "fields": [ + {"name": "clientid", "type": "string"}, + {"name": "timestamp", "type": "long"}, + {"name": "payload", "type": "string"} + ] + } + ``` + +::: tip + +Ensure that the field names and data types match the fields returned by your rule SQL. Incorrect or missing fields may cause serialization errors when writing to Parquet. + +::: + +#### Parquet Default Compression + +This option specifies the compression algorithm applied to Parquet data pages within each row group. Compression helps reduce storage space and can improve I/O efficiency when querying data. + +Supported values: + +| Value | Description | +| ------------------ | ------------------------------------------------------------ | +| `snappy` (default) | A balanced choice offering fast compression and decompression with good compression ratio. Recommended for most cases. | +| `zstd` | Provides a higher compression ratio with moderate CPU usage. Ideal for large-scale analytical data or long-term storage. | +| `None` | Disables compression. Suitable only for debugging or when compression is not desired. | + +#### Parquet Max Row Group Bytes + +This option specifies the maximum size (in bytes) for each Parquet row group, which is the fundamental unit of work for reading and writing data. Once the buffered data size exceeds this threshold, EMQX flushes the current row group and starts a new one. + +- **Default value:** `128 MB` + +Guidelines: + +- **Increase the value** to improve read performance for analytical queries, especially when using Athena or Spark. +- **Reduce the value** to limit memory usage during writing or when using small datasets. + +::: tip + +Parquet readers load data at the row group level. Larger row groups typically reduce metadata overhead and improve analytical query performance. + +::: + ## Test the Rule This section shows how to test the rule configured with the direct upload method. 
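As a rough illustration of such a test (a minimal sketch assuming the MQTTX CLI is installed and the `my_rule` rule above matches the `t/#` topic; the payload and any connection flags are illustrative only):

```bash
# Publish a sample message to a topic matched by the t/# rule.
# Add -h/-p (host/port) and -u/-P (credentials) options for your broker if needed.
mqttx pub -t 't/1' -m '{"msg": "hello S3"}'
```

After publishing, you can check the configured bucket to confirm that the Sink uploaded the corresponding object.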
diff --git a/zh_CN/data-integration/assets/alloydb_architecture.png b/zh_CN/data-integration/assets/alloydb_architecture.png index 004f7f442..aa6320de4 100644 Binary files a/zh_CN/data-integration/assets/alloydb_architecture.png and b/zh_CN/data-integration/assets/alloydb_architecture.png differ diff --git a/zh_CN/data-integration/assets/azure-blob-storage-architecture.png b/zh_CN/data-integration/assets/azure-blob-storage-architecture.png index 8fbf7a45c..df2d2b405 100644 Binary files a/zh_CN/data-integration/assets/azure-blob-storage-architecture.png and b/zh_CN/data-integration/assets/azure-blob-storage-architecture.png differ diff --git a/zh_CN/data-integration/assets/clickhouse_architecture.png b/zh_CN/data-integration/assets/clickhouse_architecture.png index 79ed2413c..ba4a24cf6 100644 Binary files a/zh_CN/data-integration/assets/clickhouse_architecture.png and b/zh_CN/data-integration/assets/clickhouse_architecture.png differ diff --git a/zh_CN/data-integration/assets/cockroachdb_architecture.png b/zh_CN/data-integration/assets/cockroachdb_architecture.png index aece3c78f..523210c2d 100644 Binary files a/zh_CN/data-integration/assets/cockroachdb_architecture.png and b/zh_CN/data-integration/assets/cockroachdb_architecture.png differ diff --git a/zh_CN/data-integration/assets/emqx-integration-s3-tables.png b/zh_CN/data-integration/assets/emqx-integration-s3-tables.png index 7c6d85e24..8e3666bcd 100644 Binary files a/zh_CN/data-integration/assets/emqx-integration-s3-tables.png and b/zh_CN/data-integration/assets/emqx-integration-s3-tables.png differ diff --git a/zh_CN/data-integration/assets/lindorm_architecture.png b/zh_CN/data-integration/assets/lindorm_architecture.png index 5d92ccba0..e07e3840c 100644 Binary files a/zh_CN/data-integration/assets/lindorm_architecture.png and b/zh_CN/data-integration/assets/lindorm_architecture.png differ diff --git a/zh_CN/data-integration/assets/mongodb_architecture.png b/zh_CN/data-integration/assets/mongodb_architecture.png new file mode 100644 index 000000000..c96adac60 Binary files /dev/null and b/zh_CN/data-integration/assets/mongodb_architecture.png differ diff --git a/zh_CN/data-integration/assets/redshift_architecture.png b/zh_CN/data-integration/assets/redshift_architecture.png index 5e0386bb5..98c2f6411 100644 Binary files a/zh_CN/data-integration/assets/redshift_architecture.png and b/zh_CN/data-integration/assets/redshift_architecture.png differ diff --git a/zh_CN/data-integration/assets/snowflake-architecture.png b/zh_CN/data-integration/assets/snowflake-architecture.png index e05116df6..e739f4f8b 100644 Binary files a/zh_CN/data-integration/assets/snowflake-architecture.png and b/zh_CN/data-integration/assets/snowflake-architecture.png differ diff --git a/zh_CN/data-integration/azure-blob-storage.md b/zh_CN/data-integration/azure-blob-storage.md index 873b7ed8f..7659d85f0 100644 --- a/zh_CN/data-integration/azure-blob-storage.md +++ b/zh_CN/data-integration/azure-blob-storage.md @@ -15,7 +15,7 @@ EMQX 利用规则引擎和数据接收器将设备事件和数据转发到 Azure 1. **设备连接到 EMQX**:IoT 设备通过 MQTT 协议成功连接后触发上线事件。该事件包括设备 ID、来源 IP 地址以及其他属性信息。 2. **设备消息发布和接收**:设备通过特定主题发布遥测和状态数据。EMQX 接收这些消息,并在规则引擎中进行匹配。 3. **规则引擎处理消息**:内置规则引擎根据主题匹配处理来自特定来源的消息和事件。它匹配相应的规则,并处理消息和事件,如数据格式转换、过滤特定信息或用上下文信息丰富消息。 -4. **写入 Azure Blob Storage**:规则触发一个动作,将消息写入存储容器。使用 Azure Blob Storage Sink,用户可以从处理结果中提取数据并发送到 Blob Storage。根据消息内容和 Sink 中的配置,消息可以以文本或二进制格式存储,或将多行结构化数据汇总到单个 CSV 文件或 JSON Lines 文件中。 +4. 
**写入 Azure Blob Storage**:规则触发一个动作,将消息写入存储容器。使用 Azure Blob Storage Sink,用户可以从处理结果中提取数据并发送到 Blob Storage。根据消息内容和 Sink 中的配置,消息可以以文本或二进制格式存储,或将多行结构化数据汇总为单个 CSV 文件、JSON Lines 文件或 Parquet 格式文件。 事件和消息数据写入存储容器后,可以连接到 Azure Blob Storage 读取数据,以实现灵活的应用开发,例如: @@ -60,7 +60,7 @@ EMQX 利用规则引擎和数据接收器将设备事件和数据转发到 Azure 在添加 Azure Blob Storage 数据 Sink 之前,您需要创建相应的连接器。 -1. 转到 Dashboard **集成** -> **连接器** 页面。 +1. 转到 Dashboard **集成** -> **连接器**页面。 2. 点击右上角的**创建**按钮。 3. 选择 **Azure Blob Storage** 作为连接器类型,然后点击**下一步**。 4. 输入连接器名称,名称应为大小写字母和数字的组合。在这里,输入 `my-azure`。 @@ -74,7 +74,7 @@ EMQX 利用规则引擎和数据接收器将设备事件和数据转发到 Azure ## 创建 Azure Blob Storage Sink 规则 -本节演示如何在 EMQX 中创建规则,以处理来自源 MQTT 主题 `t/#` 的消息,并通过配置的 Sink 将处理结果写入 Azure Storage 中的 `iot-data` 容器。 +本节演示如何在 EMQX 中创建规则,以处理来自源 MQTT 主题 `t/#` 的消息,并通过配置的 Sink 将处理结果写入 Azure Blob Storage 中的 `iot-data` 容器。 1. 转到 Dashboard **集成** -> **规则**页面。 @@ -105,8 +105,8 @@ EMQX 利用规则引擎和数据接收器将设备事件和数据转发到 Azure 8. 选择 **上传方式**。两种方式的区别如下: - - **直接上传**:每次触发规则时,数据会根据预设的对象键和值直接上传到 Azure Storage。这种方式适合存储二进制或大型文本数据,但可能会生成大量文件。 - - **聚合上传**:此方式将多个规则触发结果打包到一个文件(如 CSV 文件)中,并上传到 Azure Storage,适合存储结构化数据。它可以减少文件数量并提高写入效率。 + - **直接上传**:每次触发规则时,数据会根据预设的对象键和值直接上传到 Azure Blob Storage。这种方式适合存储二进制或大型文本数据,但可能会生成大量文件。 + - **聚合上传**:此方式将多个规则触发结果打包到一个文件(如 CSV 文件)中,并上传到 Azure Blob Storage,适合存储结构化数据。它可以减少文件数量并提高写入效率。 每种方式的配置参数不同。请根据选择的方式进行配置: @@ -138,13 +138,17 @@ EMQX 利用规则引擎和数据接收器将设备事件和数据转发到 Azure 请注意,如果模板中未使用所有标记为必需的占位符,这些占位符将自动添加到 Blob 名称作为路径后缀,以避免重复。所有其他占位符被视为无效。 - - **增强类型**:目前仅支持 CSV 和 JSON Lines。 + - **聚合上传文件格式**:定义用于在 Azure Storage 中存储批量 MQTT 消息的数据文件格式。支持以下取值: - - `CSV`:数据将以逗号分隔的 CSV 格式写入 Azure Storage。 + - `CSV`:数据将以逗号分隔的 CSV 格式写入 Azure Blob Storage。 - - `JSON Lines`:数据将以 [JSON Lines](https://jsonlines.org/) 格式写入 Azure Storage。 + - `JSON Lines`:数据将以 [JSON Lines](https://jsonlines.org/) 格式写入 Azure Blob Storage。 - - **列排序**(仅用于增强类型为 `CSV` 时):通过下拉选择调整规则结果列的顺序。生成的 CSV 文件将首先按所选列排序,未选择的列按字母顺序排列在所选列之后。 + - `Parquet`: 数据将以 [Apache Parquet](https://parquet.apache.org/) 格式写入 Azure Blob Storage。该格式是一种列式存储格式,专为大规模数据集的分析型查询进行优化。 + + > 如需了解详细的配置选项(包括 **Schema 定义**、**压缩方式**和**行组大小设置**等),请参阅 [Parquet 格式选项](#parquet-格式选项)。 + + - **列排序**(当聚合上传文件格式为 `CSV` 时):通过下拉选择调整规则结果列的顺序。生成的 CSV 文件将首先按所选列排序,未选择的列按字母顺序排列在所选列之后。 - **最大记录数**:当达到最大记录数时,将完成单个文件的聚合并上传,重置时间间隔。 @@ -166,6 +170,77 @@ EMQX 利用规则引擎和数据接收器将设备事件和数据转发到 Azure 您还可以点击**集成** -> **Flow 设计器**查看拓扑。拓扑图形化地展示了主题 `t/#` 下的消息如何在被规则 `my_rule` 解析后写入 Azure Storage 容器中。 +## Parquet 格式选项 + +当**聚合上传文件格式** 设置为 `parquet` 时,EMQX 会将聚合后的规则处理结果以 Apache Parquet 格式存储到目标存储中。Parquet 是一种列式、支持压缩的文件格式,针对大规模分析型工作负载进行了优化。 + +本节介绍 Parquet 输出格式的可配置参数。 + +### Parquet Schema (Avro) + +该选项定义 MQTT 消息字段与 Parquet 文件列之间的映射关系。EMQX 使用 Apache Avro 的 Schema 规范描述 Parquet 数据结构。 + +可选择以下两种配置方式: + +- **Schema Registry 中已添加的 Avro Schema**: 使用已在 EMQX [Schema Registry](./schema-registry.md) 中注册的 [Avro Schema](./schema-registry-example-avro.md)。选择此项时,需要在 **Schema 名称** 字段中指定已注册的 Schema 名称。EMQX 会在写入 Parquet 文件时自动从注册表中加载对应的 Schema。 + + ::: tip + + 如果你希望集中管理数据结构并保持多系统间 Schema 的一致性,建议使用此方式。 + + ::: + +- **Avro Schema 定义**:在 EMQX 中直接定义 Avro Schema。在 **Schema 定义**字段中输入以 JSON 格式定义的 Avro Schema。 + + **示例:** + + ```json + { + "type": "record", + "name": "MessageRecord", + "fields": [ + {"name": "clientid", "type": "string"}, + {"name": "timestamp", "type": "long"}, + {"name": "payload", "type": "string"} + ] + } + ``` + + ::: tip + + 请确保字段名称与规则 SQL 输出字段一致,否则可能导致写入 Parquet 时序列化失败。 + + ::: + +### Parquet 默认压缩算法 + +该选项指定用于压缩 Parquet 行组中数据页的默认压缩算法。压缩有助于减少存储空间,并在执行分析查询时提升 I/O 效率。 + +支持的取值: + +| 取值 | 说明 | +| ---------------- | 
------------------------------------------------------ | +| `snappy`(默认) | 压缩速度快、解压高效、压缩率适中。推荐大多数场景使用。 | +| `zstd` | 压缩率更高但 CPU 开销略大,适合大规模或长期存储场景。 | +| `None` | 不使用压缩,仅用于调试或对压缩无需求的情况。 | + +### Parquet 最大行组大小 + +该选项指定每个 Parquet 行组的最大缓冲大小(单位:MB)。行组是 Parquet 文件中读写数据的基本单位,当行组大小超过此值时,EMQX 将刷新当前行组并开始新的行组。 + +- **默认值:** `128 MB` + +**使用建议:** + +- 增大该值可提高分析查询性能; +- 减小该值可降低写入时的内存占用,适合小规模数据。 + +::: tip + +Parquet 读取器以行组为单位读取数据。较大的行组可减少元数据开销,从而提升分析查询性能。 + +::: + ## 测试规则 本节展示如何测试配置了直接上传方式的规则。 diff --git a/zh_CN/data-integration/data-bridge-mongodb.md b/zh_CN/data-integration/data-bridge-mongodb.md index 67cf98788..5cae9b241 100644 --- a/zh_CN/data-integration/data-bridge-mongodb.md +++ b/zh_CN/data-integration/data-bridge-mongodb.md @@ -10,7 +10,7 @@ MongoDB 数据集成是 EMQX 中的开箱即用功能,旨在弥合基于 MQTT 下图展示了 EMQX 与 MongoDB 之间数据集成的典型架构。 -mongdb_bridge_architecture +mongodb_architecture 将 MQTT 数据写入 MongoDB 的过程如下: diff --git a/zh_CN/data-integration/s3.md b/zh_CN/data-integration/s3.md index 174511fc2..4283ee602 100644 --- a/zh_CN/data-integration/s3.md +++ b/zh_CN/data-integration/s3.md @@ -24,7 +24,7 @@ EMQX 通过规则引擎与 Sink 将设备事件和数据转发至 Amazon S3, 1. **设备连接到 EMQX**:物联网设备通过 MQTT 协议连接成功后将触发上线事件,事件包含设备 ID、来源 IP 地址以及其他属性等信息。 2. **设备消息发布和接收**:设备通过特定的主题发布遥测和状态数据,EMQX 接收到消息后将在规则引擎中进行比对。 3. **规则引擎处理消息**:通过内置的规则引擎,可以根据主题匹配处理特定来源的消息和事件。规则引擎会匹配对应的规则,并对消息和事件进行处理,例如转换数据格式、过滤掉特定信息或使用上下文信息丰富消息。 -4. **写入到 Amazon S3**:规则触发后,消息会被写入到 Amazon S3。通过使用 Amazon S3 Sink,用户可以从处理结果中提取数据并发送到 S3。消息可以以文本、二进制格式存储,或者将多行结构化数据聚合成一个 CSV 文件或 JSON Lines 文件,具体取决于消息内容和 Sink 的配置。 +4. **写入到 Amazon S3**:规则触发后,消息会被写入到 Amazon S3。通过使用 Amazon S3 Sink,用户可以从处理结果中提取数据并发送到 S3。消息可以以文本、二进制格式存储,或者将多行结构化数据聚合成一个 CSV 文件、 JSON Lines 文件或 Parquet 格式文件,具体取决于消息内容和 Sink 的配置。 事件和消息数据写入到 Amazon S3 后,您可以连接到 Amazon S3 读取数据,进行灵活的应用开发,例如: @@ -211,13 +211,17 @@ EMQX 支持 Amazon S3 以及兼容 S3 的存储服务,您可以使用 AWS 云 请注意,如果模板中没有使用所有标记为必需的占位符,这些占位符将作为路径后缀自动添加到 S3 对象键中,以避免重复。所有其他占位符均视为无效。 - - **聚合方式**:目前仅支持 CSV 和 JSON Lines。 + - **聚合上传文件格式**:定义用于在 S3 中存储批量 MQTT 消息的数据文件格式。支持以下取值: - `CSV`:数据将以逗号分隔的 CSV 格式写入到 S3。 - `JSON Lines`:数据将以 [JSON Lines](https://jsonlines.org/) 格式写入到 S3。 - - **列排序**(仅用于聚合方式为 `CSV` 时):通过下拉选择调整规则结果列的顺序。生成的 CSV 文件将首先按所选列排序,未选中的列将按字典顺序排在所选列之后。 + - `Parquet`: 数据将以 [Apache Parquet](https://parquet.apache.org/) 格式写入 S3。该格式是一种列式存储格式,专为大规模数据集的分析型查询进行优化。 + + > 如需了解详细的配置选项(包括 **Schema 定义**、**压缩方式**和**行组大小设置**等),请参阅 [Parquet 格式选项](#parquet-格式选项)。 + + - **列排序**(当聚合上传文件格式为 `CSV` 时):通过下拉选择调整规则结果列的顺序。生成的 CSV 文件将首先按所选列排序,未选中的列将按字典顺序排在所选列之后。 - **最大记录数**:达到最大记录数时将完成单个文件的聚合进行上传,并重置时间间隔。 @@ -238,6 +242,77 @@ EMQX 支持 Amazon S3 以及兼容 S3 的存储服务,您可以使用 AWS 云 您也可以点击 **集成** -> **Flow 设计器**查看拓扑,通过拓扑可以直观的看到,主题 `t/#` 下的消息在经过规则 `my_rule` 解析后被写入到 S3 中。 +## Parquet 格式选项 + +当**聚合上传文件格式** 设置为 `parquet` 时,EMQX 会将聚合后的规则处理结果以 Apache Parquet 格式存储到目标存储中。Parquet 是一种列式、支持压缩的文件格式,针对大规模分析型工作负载进行了优化。 + +本节介绍 Parquet 输出格式的可配置参数。 + +### Parquet Schema (Avro) + +该选项定义 MQTT 消息字段与 Parquet 文件列之间的映射关系。EMQX 使用 Apache Avro 的 Schema 规范描述 Parquet 数据结构。 + +可选择以下两种配置方式: + +- **Schema Registry 中已添加的 Avro Schema**: 使用已在 EMQX [Schema Registry](./schema-registry.md) 中注册的 [Avro Schema](./schema-registry-example-avro.md)。选择此项时,需要在 **Schema 名称** 字段中指定已注册的 Schema 名称。EMQX 会在写入 Parquet 文件时自动从注册表中加载对应的 Schema。 + + ::: tip + + 如果你希望集中管理数据结构并保持多系统间 Schema 的一致性,建议使用此方式。 + + ::: + +- **Avro Schema 定义**:在 EMQX 中直接定义 Avro Schema。在 **Schema 定义**字段中输入以 JSON 格式定义的 Avro Schema。 + + **示例:** + + ```json + { + "type": "record", + "name": "MessageRecord", + "fields": [ + {"name": "clientid", "type": "string"}, + {"name": "timestamp", "type": "long"}, + {"name": 
"payload", "type": "string"} + ] + } + ``` + + ::: tip + + 请确保字段名称与规则 SQL 输出字段一致,否则可能导致写入 Parquet 时序列化失败。 + + ::: + +### Parquet 默认压缩算法 + +该选项指定用于压缩 Parquet 行组中数据页的默认压缩算法。压缩有助于减少存储空间,并在执行分析查询时提升 I/O 效率。 + +支持的取值: + +| 取值 | 说明 | +| ---------------- | ------------------------------------------------------ | +| `snappy`(默认) | 压缩速度快、解压高效、压缩率适中。推荐大多数场景使用。 | +| `zstd` | 压缩率更高但 CPU 开销略大,适合大规模或长期存储场景。 | +| `None` | 不使用压缩,仅用于调试或对压缩无需求的情况。 | + +### Parquet 最大行组大小 + +该选项指定每个 Parquet 行组的最大缓冲大小(单位:MB)。行组是 Parquet 文件中读写数据的基本单位,当行组大小超过此值时,EMQX 将刷新当前行组并开始新的行组。 + +- **默认值:** `128 MB` + +**使用建议:** + +- 增大该值可提高分析查询性能; +- 减小该值可降低写入时的内存占用,适合小规模数据。 + +::: tip + +Parquet 读取器以行组为单位读取数据。较大的行组可减少元数据开销,从而提升分析查询性能。 + +::: + ## 测试规则 此处以直接上传为例进行测试。使用 MQTTX 向 `t/1` 主题发布消息: