
Commit 588b5fd

update(S3, Azure Blob Storage): Support Parquet format
https://emqx.atlassian.net/browse/EMQX-14831
1 parent: 9a7867f

4 files changed (+323, -17 lines)


en_US/data-integration/azure-blob-storage.md

Lines changed: 84 additions & 5 deletions
@@ -14,7 +14,7 @@ EMQX utilizes rules engines and Sinks to forward device events and data to Azure

1. **Device Connection to EMQX**: IoT devices trigger an online event upon successfully connecting via the MQTT protocol. The event includes device ID, source IP address, and other property information.
2. **Device Message Publishing and Receiving**: Devices publish telemetry and status data through specific topics. EMQX receives the messages and compares them within the rules engine.
3. **Rules Engine Processing Messages**: The built-in rules engine processes messages and events from specific sources based on topic matching. It matches corresponding rules and processes messages and events, such as data format transformation, filtering specific information, or enriching messages with context information.
-4. **Writing to Azure Blob Storage**: The rule triggers an action to write the message to Storage Container. Using the Azure Blob Storage Sink, users can extract data from processing results and send it to Blob Storage. Messages can be stored in text or binary format, or multiple lines of structured data can be aggregated into a single CSV or JSON Lines file, depending on the message content and the Sink configuration.
+4. **Writing to Azure Blob Storage**: The rule triggers an action to write the message to Storage Container. Using the Azure Blob Storage Sink, users can extract data from processing results and send it to Blob Storage. Messages can be stored in text or binary format, or multiple lines of structured data can be aggregated into a single CSV, JSON Lines, or Parquet file, depending on the message content and the Sink configuration.

After events and message data are written to Storage Container, you can connect to Azure Blob Storage to read the data for flexible application development, such as:

@@ -102,7 +102,7 @@ This section demonstrates how to create a rule in EMQX to process messages from

7. Set the **Container** by entering `iot-data`.

-9. Select the **Upload Method**. The differences between the two methods are as follows:
+8. Select the **Upload Method**. The differences between the two methods are as follows:

   - **Direct Upload**: Each time the rule is triggered, data is uploaded directly to Azure Storage according to the preset object key and content. This method is suitable for storing binary or large text data. However, it may generate a large number of files.
   - **Aggregated Upload**: This method packages the results of multiple rule triggers into a single file (such as a CSV file) and uploads it to Azure Storage, making it suitable for storing structured data. It can reduce the number of files and improve write efficiency.
@@ -137,11 +137,17 @@ This section demonstrates how to create a rule in EMQX to process messages from

Note that if all placeholders marked as required are not used in the template, these placeholders will be automatically added to the Blob Name as path suffixes to avoid duplication. All other placeholders are considered invalid.

-  - **Aggregation Type**: Currently, CSV and JSON Lines are supported.
+  - **Aggregation Type**: Defines the format of the data file used to store batched MQTT messages in Azure Storage. Supported values:

     - `CSV`: Data will be written to Azure Storage in comma-separated CSV format.

-    - `JSON Lines`: Data will be written to Azure Storage in [JSON Lines](https://jsonlines.org/) format.
-  - **Column Order** (applies only when the Aggregation Type is `CSV`): Adjust the order of rule result columns through a dropdown selection. The generated CSV file will first be sorted by the selected columns, with unselected columns sorted alphabetically following the selected columns.
+    - `JSON Lines`: Data will be written to Azure Storage in [JSON Lines](https://jsonlines.org/) format.
+
+    - `parquet`: Data will be written to Azure Storage in [Apache Parquet](https://parquet.apache.org/) format, which is column-based and optimized for analytical queries over large datasets.
+
+      > For detailed configuration options, including schema definition, compression, and row group settings, see [Parquet Format Options](#parquet-format-options).
+
+  - **Column Order** (for `CSV`): Adjust the order of rule result columns through a dropdown selection. The generated CSV file will first be sorted by the selected columns, with unselected columns sorted alphabetically following the selected columns.

  - **Max Records**: When the maximum number of records is reached, the aggregation of a single file will be completed and uploaded, resetting the time interval.

@@ -163,6 +169,79 @@ You have now successfully created the rule. You can see the newly created rule o

You can also click **Integration** -> **Flow Designer** to view the topology. The topology visually shows how messages under the topic `t/#` are written into Azure Storage container after being parsed by the rule `my_rule`.

### Parquet Format Options

When the **Aggregation Type** is set to `parquet`, EMQX stores aggregated rule results in the Apache Parquet format, a columnar, compressed file format optimized for analytical workloads.

This section describes all configurable options for the Parquet output format.

#### Parquet Schema (Avro)

This option defines how MQTT message fields are mapped to the columns in the Parquet file. EMQX uses the Apache Avro schema specification to describe the structure of the Parquet data.

You can choose one of the following options:

- **Avro Schema That Lives in Schema Registry**: Use an existing [Avro schema](./schema-registry-example-avro) managed in EMQX [Schema Registry](./schema-registry).

  When this option is chosen, you must also specify a **Schema Name**, which identifies the schema to use for serialization.

  ::: tip

  Use this option if you manage schemas centrally in a registry and want consistent schema evolution across multiple systems.

  :::

- **Avro Schema Defined**: Define the Avro schema directly within EMQX by entering the schema JSON structure in the **Schema Definition** field.

  Example:

  ```json
  {
    "type": "record",
    "name": "MessageRecord",
    "fields": [
      {"name": "clientid", "type": "string"},
      {"name": "timestamp", "type": "long"},
      {"name": "payload", "type": "string"}
    ]
  }
  ```

  ::: tip

  Ensure that the field names and data types match the fields returned by your rule SQL. Incorrect or missing fields may cause serialization errors when writing to Parquet.

  :::
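
For reference, here is a minimal rule SQL sketch that returns exactly the three fields declared in the example schema above (it uses the `t/#` topic from this tutorial; adapt the field list and topic filter to your own schema):

```sql
SELECT
  clientid,
  timestamp,
  payload
FROM
  "t/#"
```

If the rule selects fields that are not declared in the schema, or omits declared fields, serialization into Parquet may fail as described in the tip above.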

#### Parquet Default Compression

This option specifies the compression algorithm applied to Parquet data pages within each row group. Compression helps reduce storage space and can improve I/O efficiency when querying data.

Supported values:

| Value | Description |
| ------------------ | ----------- |
| `snappy` (default) | A balanced choice offering fast compression and decompression with a good compression ratio. Recommended for most cases. |
| `zstd` | Provides a higher compression ratio with moderate CPU usage. Ideal for large-scale analytical data or long-term storage. |
| `None` | Disables compression. Suitable only for debugging or when compression is not desired. |

#### Parquet Max Row Group Bytes

This option specifies the maximum size (in bytes) for each Parquet row group, which is the fundamental unit of work for reading and writing data. Once the buffered data size exceeds this threshold, EMQX flushes the current row group and starts a new one.

- **Default value:** `128 MB`

Guidelines:

- **Increase the value** to improve read performance for analytical queries, especially when using Athena or Spark.
- **Reduce the value** to limit memory usage during writing or when using small datasets.

::: tip

Parquet readers load data at the row group level. Larger row groups typically reduce metadata overhead and improve analytical query performance.

:::
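
To illustrate how the aggregated Parquet blobs might be consumed downstream, here is a hedged sketch using an Azure Synapse serverless SQL pool (an assumption; any engine that reads Parquet from Blob Storage works similarly). The storage account name and path are placeholders; `iot-data` is the container used in this tutorial:

```sql
-- Placeholder storage account and path; the blobs must be readable by the SQL pool
SELECT TOP 10
    clientid,
    [timestamp],
    payload
FROM OPENROWSET(
    BULK 'https://<storage-account>.blob.core.windows.net/iot-data/<path-to-parquet-blobs>/*.parquet',
    FORMAT = 'PARQUET'
) AS messages;
```
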
## Test the Rule

This section shows how to test the rule configured with the direct upload method.

en_US/data-integration/s3.md

Lines changed: 83 additions & 6 deletions
@@ -25,7 +25,7 @@ EMQX utilizes rules engines and Sinks to forward device events and data to Amazo
1. **Device Connection to EMQX**: IoT devices trigger an online event upon successfully connecting via the MQTT protocol. The event includes device ID, source IP address, and other property information.
2. **Device Message Publishing and Receiving**: Devices publish telemetry and status data through specific topics. EMQX receives the messages and compares them within the rules engine.
3. **Rules Engine Processing Messages**: The built-in rules engine processes messages and events from specific sources based on topic matching. It matches corresponding rules and processes messages and events, such as data format transformation, filtering specific information, or enriching messages with context information.
-4. **Writing to Amazon S3**: The rule triggers an action to write the message to S3. Using the Amazon S3 Sink, users can extract data from processing results and send it to S3. Messages can be stored in text or binary format, or multiple lines of structured data can be aggregated into a single CSV or JSON Lines file, depending on the message content and the Sink configuration.
+4. **Writing to Amazon S3**: The rule triggers an action to write the message to S3. Using the Amazon S3 Sink, users can extract data from processing results and send it to S3. Messages can be stored in text or binary format, or multiple lines of structured data can be aggregated into a single CSV, JSON Lines or Parquet file, depending on the message content and the Sink configuration.

After events and message data are written to Amazon S3, you can connect to Amazon S3 to read the data for flexible application development, such as:

@@ -212,11 +212,17 @@ This section demonstrates how to create a rule in EMQX to process messages from

Note that if all placeholders marked as required are not used in the template, these placeholders will be automatically added to the S3 object key as path suffixes to avoid duplication. All other placeholders are considered invalid.

-  - **Aggregation Type**: Currently, CSV and JSON Lines are supported.
+  - **Aggregation Type**: Defines the format of the data file used to store batched MQTT messages in S3. Supported values:

     - `CSV`: Data will be written to S3 in comma-separated CSV format.

     - `JSON Lines`: Data will be written to S3 in [JSON Lines](https://jsonlines.org/) format.

-  - **Column Order** (applies only when the Aggregation Type is `CSV`): Adjust the order of rule result columns through a dropdown selection. The generated CSV file will first be sorted by the selected columns, with unselected columns sorted alphabetically following the selected columns.
+    - `parquet`: Data will be written to S3 in [Apache Parquet](https://parquet.apache.org/) format, which is column-based and optimized for analytical queries over large datasets.
+
+      > For detailed configuration options, including schema definition, compression, and row group settings, see [Parquet Format Options](#parquet-format-options).
+
+  - **Column Order** (for `CSV`): Adjust the order of rule result columns through a dropdown selection. The generated CSV file will first be sorted by the selected columns, with unselected columns sorted alphabetically following the selected columns.

  - **Max Records**: When the maximum number of records is reached, the aggregation of a single file will be completed and uploaded, resetting the time interval.

@@ -226,8 +232,6 @@ This section demonstrates how to create a rule in EMQX to process messages from

::::

-10. Set the **Object Content**. By default, it is a JSON text format containing all fields. It supports `${var}` format placeholders. Here, enter `${payload}` to indicate using the message body as the object content. In this case, the object's storage format depends on the message body's format, supporting compressed packages, images, or other binary formats.

11. **Fallback Actions (Optional)**: If you want to improve reliability in case of message delivery failure, you can define one or more fallback actions. These actions will be triggered if the primary Sink fails to process a message. See [Fallback Actions](./data-bridges.md#fallback-actions) for more details.

12. Expand **Advanced Settings** and configure the advanced setting options as needed (optional). For more details, refer to [Advanced Settings](#advanced-settings).
@@ -240,6 +244,79 @@ You have now successfully created the rule. You can see the newly created rule o

You can also click **Integration** -> **Flow Designer** to view the topology. The topology visually shows how messages under the topic `t/#` are written into S3 after being parsed by the rule `my_rule`.

### Parquet Format Options

When the **Aggregation Type** is set to `parquet`, EMQX stores aggregated rule results in the Apache Parquet format, a columnar, compressed file format optimized for analytical workloads.

This section describes all configurable options for the Parquet output format.

#### Parquet Schema (Avro)

This option defines how MQTT message fields are mapped to the columns in the Parquet file. EMQX uses the Apache Avro schema specification to describe the structure of the Parquet data.

You can choose one of the following options:

- **Avro Schema That Lives in Schema Registry**: Use an existing [Avro schema](./schema-registry-example-avro) managed in EMQX [Schema Registry](./schema-registry).

  When this option is chosen, you must also specify a **Schema Name**, which identifies the schema to use for serialization.

  ::: tip

  Use this option if you manage schemas centrally in a registry and want consistent schema evolution across multiple systems.

  :::

- **Avro Schema Defined**: Define the Avro schema directly within EMQX by entering the schema JSON structure in the **Schema Definition** field.

  Example:

  ```json
  {
    "type": "record",
    "name": "MessageRecord",
    "fields": [
      {"name": "clientid", "type": "string"},
      {"name": "timestamp", "type": "long"},
      {"name": "payload", "type": "string"}
    ]
  }
  ```

  ::: tip

  Ensure that the field names and data types match the fields returned by your rule SQL. Incorrect or missing fields may cause serialization errors when writing to Parquet.

  :::
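
For reference, here is a minimal rule SQL sketch that returns exactly the three fields declared in the example schema above (it uses the `t/#` topic from this tutorial; adapt the field list and topic filter to your own schema):

```sql
SELECT
  clientid,
  timestamp,
  payload
FROM
  "t/#"
```

If the rule selects fields that are not declared in the schema, or omits declared fields, serialization into Parquet may fail as described in the tip above.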

#### Parquet Default Compression

This option specifies the compression algorithm applied to Parquet data pages within each row group. Compression helps reduce storage space and can improve I/O efficiency when querying data.

Supported values:

| Value | Description |
| ------------------ | ----------- |
| `snappy` (default) | A balanced choice offering fast compression and decompression with a good compression ratio. Recommended for most cases. |
| `zstd` | Provides a higher compression ratio with moderate CPU usage. Ideal for large-scale analytical data or long-term storage. |
| `None` | Disables compression. Suitable only for debugging or when compression is not desired. |

#### Parquet Max Row Group Bytes

This option specifies the maximum size (in bytes) for each Parquet row group, which is the fundamental unit of work for reading and writing data. Once the buffered data size exceeds this threshold, EMQX flushes the current row group and starts a new one.

- **Default value:** `128 MB`

Guidelines:

- **Increase the value** to improve read performance for analytical queries, especially when using Athena or Spark.
- **Reduce the value** to limit memory usage during writing or when using small datasets.

::: tip

Parquet readers load data at the row group level. Larger row groups typically reduce metadata overhead and improve analytical query performance.

:::
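
To illustrate how the aggregated Parquet objects might be consumed downstream (the guidelines above mention Athena), here is a hedged sketch of an Athena table and query; the table name, bucket, prefix, and column list are illustrative placeholders matching the example schema above:

```sql
-- Run the DDL and the query as separate Athena statements.
-- Placeholder bucket and prefix; point LOCATION at the objects written by the Sink.
CREATE EXTERNAL TABLE IF NOT EXISTS mqtt_messages (
  clientid    string,
  `timestamp` bigint,
  payload     string
)
STORED AS PARQUET
LOCATION 's3://<your-bucket>/<your-prefix>/';

-- EMQX rule timestamps are in milliseconds since the epoch.
SELECT clientid, from_unixtime("timestamp" / 1000) AS msg_time, payload
FROM mqtt_messages
LIMIT 10;
```
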
## Test the Rule

This section shows how to test the rule configured with the direct upload method.
