71 changes: 69 additions & 2 deletions CHANGELOG.md
@@ -1,5 +1,72 @@
# kernelci-storage ChangeLog

## 0.1.0 (2025-01-06)
All notable changes to this project will be documented in this file.

- Initial release
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.2.0] - 2025-10-23

### Added
- Streaming multipart upload support for large files (200MB+)
- New `write_file_streaming()` async method in the Driver trait (sketched below)
- `FieldStream` wrapper to convert multipart fields to AsyncRead
- Streaming implementations for both Azure and Local storage backends
- `async-trait` dependency (0.1) for trait async methods
- `bytes` dependency (1.9) for efficient byte buffer handling
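
The trait addition itself is spread across the backend diffs below; as a hedged reconstruction from the impl signatures visible there (the trait's pre-existing methods are elided), it looks roughly like this:

```rust
// Hedged sketch of the Driver trait extension, reconstructed from the
// impl signatures in src/azure.rs and src/local.rs in this diff.
use async_trait::async_trait;

#[async_trait]
pub trait Driver {
    // ... existing synchronous methods (write_file, get_file, tag_file, ...) ...

    /// Stream an upload into the backend without buffering the whole file,
    /// returning the stored filename and the number of bytes written.
    async fn write_file_streaming(
        &self,
        filename: String,
        data: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
        cont_type: String,
        owner_email: Option<String>,
    ) -> (String, usize);
}
```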

### Changed
- Refactored `ax_post_file` handler to stream uploads directly, without loading the entire file into memory
- Azure backend now streams uploads in 10MB chunks directly to blob storage
- Local backend now streams uploads in 10MB chunks directly to filesystem
- Upload handler processes multipart fields sequentially and starts streaming immediately (see the `FieldStream` sketch below)
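
The `FieldStream` wrapper itself is not shown in this diff; as a minimal sketch, one way to adapt an axum multipart `Field` into `AsyncRead` would be via `tokio-util`'s `StreamReader` (assuming `tokio-util` with the `io` feature and `futures-util`, neither of which is confirmed by the truncated dependency list):

```rust
// Hedged sketch only: the real FieldStream may be implemented differently.
use axum::extract::multipart::Field;
use futures_util::TryStreamExt;
use tokio::io::AsyncRead;
use tokio_util::io::StreamReader;

fn field_reader(field: Field<'_>) -> impl AsyncRead + Unpin + Send + '_ {
    // Field is a Stream of Result<Bytes, MultipartError>; StreamReader
    // expects io::Error items, so map the error type first, then Box::pin
    // to guarantee Unpin for the &mut dyn AsyncRead parameter above.
    Box::pin(StreamReader::new(field.map_err(std::io::Error::other)))
}
```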

### Performance Improvements
- Reduced memory usage: only a 10MB buffer is held in memory at any time, instead of the entire file
- Improved upload performance for large files by eliminating full memory buffering
- Removed temporary file creation during Azure uploads (streaming directly from multipart)
- Better scalability: can handle files larger than available RAM

### Technical Details
- No breaking changes to the existing API; fully backward compatible
- Existing upload clients continue to work without modifications
- All existing functionality preserved (permissions, content-type, owner tags, file locking)
- Proper lifetime management for multipart field streaming
- Uses async/await with tokio's `AsyncRead` trait (see the handler sketch below)
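
The refactored `ax_post_file` handler is not part of the hunks shown here; a hedged sketch of how the pieces could fit together, reusing the `Driver` trait and `field_reader` sketches above (handler shape, field handling, and error strategy are all assumptions):

```rust
// Hedged sketch: wiring axum Multipart to Driver::write_file_streaming().
// `Driver` and `field_reader` refer to the sketches earlier in this file.
use axum::extract::Multipart;

async fn upload(mut multipart: Multipart, driver: &dyn Driver) {
    while let Some(field) = multipart.next_field().await.unwrap_or(None) {
        // Capture metadata before the field is consumed by the reader.
        let name = field.file_name().unwrap_or("upload").to_owned();
        let ctype = field
            .content_type()
            .unwrap_or("application/octet-stream")
            .to_owned();
        // Start streaming immediately, so only one chunk is ever buffered.
        let mut reader = field_reader(field);
        let (stored, size) = driver
            .write_file_streaming(name, &mut reader, ctype, None)
            .await;
        println!("stored {stored} ({size} bytes)");
    }
}
```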

## [0.1.0] - 2025-01-06

### Added
- Initial release of KernelCI Storage Server
- JWT token-based authentication with HMAC-SHA256 (token sketch below)
- File upload/download with local caching
- Azure Blob Storage backend support with chunked uploads (10MB blocks)
- Local filesystem backend support
- Range request support for partial content downloads
- Prometheus metrics endpoint (`/metrics`)
- File locking mechanism to prevent concurrent uploads to same path
- Automatic cache cleanup when disk space < 12%
- LRU-style cache deletion (removes files older than 60 minutes)
- User-specific upload path restrictions via configuration
- SSL/TLS support with certificate configuration
- Docker support with Dockerfile
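
For reference, a hedged sketch of HMAC-SHA256 token handling, assuming the `jsonwebtoken` crate (the server's actual claim layout and full dependency list are not visible in this diff):

```rust
// Hedged sketch: HS256 (HMAC-SHA256) JWT issue/verify with `jsonwebtoken`.
// Claim names and the one-hour lifetime are assumptions.
use jsonwebtoken::{decode, encode, DecodingKey, EncodingKey, Header, Validation};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Claims {
    sub: String, // user the token was issued for (assumed claim name)
    exp: usize,  // Unix-timestamp expiry; checked by Validation::default()
}

fn issue_token(user: &str, secret: &[u8]) -> jsonwebtoken::errors::Result<String> {
    let claims = Claims {
        sub: user.to_owned(),
        exp: (chrono::Utc::now().timestamp() + 3600) as usize,
    };
    // Header::default() selects HS256, i.e. HMAC-SHA256.
    encode(&Header::default(), &claims, &EncodingKey::from_secret(secret))
}

fn check_token(token: &str, secret: &[u8]) -> jsonwebtoken::errors::Result<Claims> {
    decode::<Claims>(token, &DecodingKey::from_secret(secret), &Validation::default())
        .map(|data| data.claims)
}
```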

### Features
- RESTful API with endpoints:
- `GET /` - Server status
- `POST /upload` or `POST /v1/file` - File upload (requires JWT; client example below)
- `GET /*filepath` - File download (public)
- `GET /v1/checkauth` - Validate JWT token
- `GET /v1/list` - List all files (public)
- `GET /metrics` - Prometheus metrics
- Header preservation for HTTP caching (ETag, Last-Modified)
- Content-type detection with heuristics
- Configurable via TOML configuration file
- Environment variable override for config path (`KCI_STORAGE_CONFIG`)
- Verbose logging support via `--verbose` flag
- Command-line utilities:
- `--generate-jwt-secret` - Generate JWT secret
- `--generate-jwt-token` - Generate JWT token for user
- SHA-512 based cache filenames to avoid path conflicts
- Client IP detection with X-Forwarded-For support
- Disk space monitoring with hysteresis
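
As a client-side illustration of the upload endpoint, a hedged sketch using `reqwest` with the `multipart` feature; the multipart field name is an assumption, so consult the upload handler for the exact layout:

```rust
// Hedged sketch: uploading to POST /v1/file with a bearer JWT.
// The "file0" field name is an assumption, not confirmed by this diff.
use reqwest::multipart::{Form, Part};

async fn upload_file(base: &str, jwt: &str, path: &str, bytes: Vec<u8>) -> reqwest::Result<()> {
    let form = Form::new().part("file0", Part::bytes(bytes).file_name(path.to_owned()));
    let resp = reqwest::Client::new()
        .post(format!("{base}/v1/file"))
        .bearer_auth(jwt) // uploads require a JWT; downloads are public
        .multipart(form)
        .send()
        .await?;
    println!("upload status: {}", resp.status());
    Ok(())
}
```
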
4 changes: 3 additions & 1 deletion Cargo.toml
@@ -1,14 +1,16 @@
[package]
name = "kernelci-storage"
version = "0.1.0"
version = "0.2.0"
edition = "2021"

[dependencies]
async-trait = "0.1"
axum = { version = "0.7.9", features = ["tracing", "multipart", "macros"] }
axum-server = { version = "0.7.1", features = ["rustls", "rustls-pemfile", "tls-rustls"] }
azure_blob_uploader = "0.1.4"
azure_storage = "0.21.0"
azure_storage_blobs = "0.21.0"
bytes = "1.9"
chksum-hash-sha2-512 = "0.0.1"
chrono = "0.4.39"
clap = { version = "4.5.23", features = ["derive"] }
118 changes: 117 additions & 1 deletion src/azure.rs
@@ -11,6 +11,7 @@ impl AzureDriver {
}

use crate::{debug_log, get_config_content, ReceivedFile};
use async_trait::async_trait;
use axum::http::{HeaderName, HeaderValue};
use azure_storage::StorageCredentials;
use azure_storage_blobs::container::operations::BlobItem;
@@ -26,6 +27,7 @@ use std::io::Read;
use std::io::Write;
use std::sync::Arc;
use tempfile::Builder;
use tokio::io::AsyncReadExt;
use toml::Table;

#[derive(Deserialize)]
@@ -75,7 +77,110 @@ fn calculate_checksum(filename: &String, data: &[u8]) {
debug_log!("File: {} Checksum: {}", filename, digest.to_hex_lowercase());
}

/// Write file to Azure blob storage
/// Write file to Azure blob storage using streaming (new version)
async fn write_file_to_blob_streaming(
filename: String,
data: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
cont_type: String,
owner_email: Option<String>,
) -> (&'static str, usize) {
let azure_cfg = Arc::new(get_azure_credentials("azure"));

let storage_account = azure_cfg.account.as_str();
let storage_key = azure_cfg.key.clone();
let storage_container = azure_cfg.container.as_str();
let storage_blob = filename.as_str();
let storage_credential = StorageCredentials::access_key(storage_account, storage_key);
let blob_client = ClientBuilder::new(storage_account, storage_credential)
.blob_client(storage_container, storage_blob);

let mut total_bytes_uploaded: usize = 0;
let chunk_size = 10 * 1024 * 1024; // 10MB chunks
let mut blocks = BlockList::default();

loop {
let mut buffer = vec![0u8; chunk_size];
let bytes_read = data.read(&mut buffer).await;
match bytes_read {
Ok(bytes_read) => {
if bytes_read == 0 {
break;
}
buffer.truncate(bytes_read);
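// Hex-encoding the little-endian byte offset gives unique block IDs of a
// fixed length, which Azure requires for all blocks within one blob.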
let block_id = BlockId::new(hex::encode(total_bytes_uploaded.to_le_bytes()));
blocks
.blocks
.push(BlobBlockType::Uncommitted(block_id.clone()));
match blob_client.put_block(block_id, buffer).await {
Ok(_) => {
total_bytes_uploaded += bytes_read;
debug_log!("Uploaded {} bytes", total_bytes_uploaded);
}
Err(e) => {
eprintln!("Error uploading block: {:?}", e);
// Abort instead of falling through: committing a partial
// block list would publish a truncated blob.
return ("ERR", total_bytes_uploaded);
}
}
}
Err(e) => {
eprintln!("Error reading stream: {:?}", e);
// Abort instead of committing a truncated upload.
return ("ERR", total_bytes_uploaded);
}
}
}
match blob_client
.put_block_list(blocks)
.content_type(cont_type)
.await
{
Ok(_) => {
debug_log!("Block list uploaded");
let blob_url_res = blob_client.url();
match blob_url_res {
Ok(blob_url) => {
debug_log!("Blob URL: {}", blob_url);
}
Err(e) => {
eprintln!("Error getting blob URL: {:?}", e);
}
}

// Set owner tag if email is provided
if let Some(email) = owner_email {
let mut tags = Tags::new();
let sanitized = sanitize_tag_component(&email);
if sanitized != email {
debug_log!(
"Sanitized owner tag value from '{}' to '{}'",
email,
sanitized
);
}
// Ensure non-empty value
let final_value = if sanitized.is_empty() {
"_".to_string()
} else {
sanitized
};
tags.insert("owner".to_string(), final_value);
match blob_client.set_tags(tags).await {
Ok(_) => {
debug_log!("Owner tag set successfully");
}
Err(e) => {
eprintln!("Error setting owner tag: {:?}", e);
}
}
}
}
Err(e) => {
eprintln!("Error uploading block list: {:?}", e);
}
}
("OK", total_bytes_uploaded)
}

/// Write file to Azure blob storage (legacy version using Vec<u8>)
/// TBD: Rework, do not keep whole file as Vec<u8> in memory!!!
async fn write_file_to_blob(
filename: String,
@@ -429,6 +534,7 @@ async fn azure_list_files(directory: String) -> Vec<String> {
}

/// Implement Driver trait for AzureDriver
#[async_trait]
impl super::Driver for AzureDriver {
fn write_file(
&self,
@@ -445,6 +551,16 @@ impl super::Driver for AzureDriver {
});
filenameret
}
async fn write_file_streaming(
&self,
filename: String,
data: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
cont_type: String,
owner_email: Option<String>,
) -> (String, usize) {
let (_status, size) = write_file_to_blob_streaming(filename.clone(), data, cont_type, owner_email).await;
(filename, size)
}
fn tag_file(
&self,
filename: String,
71 changes: 70 additions & 1 deletion src/local.rs
@@ -3,13 +3,15 @@
// Author: Denys Fedoryshchenko <[email protected]>

use crate::{debug_log, get_config_content, ReceivedFile};
use async_trait::async_trait;
use axum::http::{HeaderName, HeaderValue};
use chksum_hash_sha2_512 as sha2_512;
use headers::HeaderMap;
use serde::Deserialize;
use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use tokio::io::AsyncReadExt;
use toml::Table;

pub struct LocalDriver;
@@ -91,7 +93,57 @@ fn calculate_checksum(filename: &str, data: &[u8]) {
debug_log!("File: {} Checksum: {}", filename, digest.to_hex_lowercase());
}

/// Write file to local storage
/// Write file to local storage using streaming (new version)
async fn write_file_to_local_streaming(
filename: String,
data: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
cont_type: String,
owner_email: Option<String>,
) -> Result<(String, usize), String> {
let file_path = get_storage_file_path(&filename);

// Ensure directory structure exists
if let Err(e) = ensure_directory_exists(&file_path) {
return Err(format!("Failed to create directory structure: {}", e));
}

// Write the file with streaming
let mut file = tokio::fs::File::create(&file_path)
.await
.map_err(|e| format!("Failed to create file: {}", e))?;

let mut buffer = vec![0u8; 10 * 1024 * 1024]; // 10MB buffer
let mut total_bytes = 0;

loop {
match data.read(&mut buffer).await {
Ok(0) => break, // EOF
Ok(n) => {
tokio::io::AsyncWriteExt::write_all(&mut file, &buffer[..n])
.await
.map_err(|e| format!("Failed to write file: {}", e))?;
total_bytes += n;
debug_log!("Written {} bytes to local storage", total_bytes);
}
Err(e) => return Err(format!("Failed to read stream: {}", e)),
}
}

// Create and write metadata (headers and owner tag)
let metadata_path = get_metadata_file_path(&filename);
if let Ok(mut metadata_file) = File::create(&metadata_path) {
let mut metadata_content = format!("content-type:{}\n", cont_type);
if let Some(email) = owner_email {
metadata_content.push_str(&format!("tag-owner:{}\n", email));
}
let _ = metadata_file.write_all(metadata_content.as_bytes());
}

debug_log!("File written to local storage: {}", file_path.display());
Ok((filename, total_bytes))
}

/// Write file to local storage (legacy version using Vec<u8>)
fn write_file_to_local(
filename: String,
data: Vec<u8>,
@@ -272,6 +324,7 @@ fn set_tags_for_local_file(
}

/// Implement Driver trait for LocalDriver
#[async_trait]
impl super::Driver for LocalDriver {
fn write_file(
&self,
@@ -289,6 +342,22 @@ impl super::Driver for LocalDriver {
}
}

async fn write_file_streaming(
&self,
filename: String,
data: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
cont_type: String,
owner_email: Option<String>,
) -> (String, usize) {
match write_file_to_local_streaming(filename.clone(), data, cont_type, owner_email).await {
Ok((fname, size)) => (fname, size),
Err(e) => {
eprintln!("Local storage streaming write error: {}", e);
(String::new(), 0)
}
}
}

fn get_file(&self, filename: String) -> ReceivedFile {
get_file_from_local(filename)
}