Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ prettyplease = { version = "0.2.4", optional = true }
schemars = { version = "0.8.0", optional = true }
syn = { version = "2.0.11", optional = true }
typify = { version = "0.0.16", optional = true }
url = "2.5.4"

[build-dependencies]
prettyplease = { version = "0.2.4", optional = true }
Expand Down
90 changes: 80 additions & 10 deletions python/generate_python_types.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import os
import shutil
import subprocess
import sys
from typing import Any, Optional, Union

import sentry_kafka_schemas
import sentry_kafka_schemas.sentry_kafka_schemas
from jsonschema_gentypes import cli, jsonschema_draft_06, jsonschema_draft_2020_12_applicator
from jsonschema_gentypes.resolver import RefResolver, UnRedolvedException
from sentry_kafka_schemas.codecs.json import file_handler


def run(target_folder: str = "python/sentry_kafka_schemas/schema_types/") -> None:
Expand Down Expand Up @@ -32,22 +36,88 @@ def run(target_folder: str = "python/sentry_kafka_schemas/schema_types/") -> Non
f"conflict: two schemas are ending up in module name {schema_tmp_module_name}"
)

cli.jsonschema_gentypes.resolver.RefResolver = FileRefResolver # type: ignore[attr-defined]
sys.argv = [
"generate_python_types",
"--json-schema",
schema_data["schema_filepath"],
"--python",
os.path.join(target_folder, f"{schema_tmp_module_name}.py"),
]

already_used_filenames.add(schema_tmp_module_name)
subprocess.check_call(
[
"jsonschema-gentypes",
"--json-schema",
schema_data["schema_filepath"],
"--python",
os.path.join(target_folder, f"{schema_tmp_module_name}.py"),
]
)
cli.main()

index_code_path = os.path.join(target_folder, "__init__.py")
with open(index_code_path, "w"):
# just touch file so it exists
pass


class FileRefResolver(RefResolver):
    """RefResolver that can additionally follow ``file://`` references.

    Documents reached through ``file://`` URIs are loaded with ``file_handler``
    and remembered in ``loaded_schemas``. Later ``#/...`` references that are
    missing from the root schema are then also looked up in every document
    loaded so far.
    """

    def __init__(
        self,
        base_url: str,
        schema: Optional[
            Union[
                jsonschema_draft_06.JSONSchemaItemD6,
                jsonschema_draft_2020_12_applicator.JSONSchemaItemD2020,
            ]
        ] = None,
        schemas_dir: str = "schemas",
    ):
        """Initialise the resolver.

        Args:
            base_url: Base URL forwarded to the parent resolver; its directory
                part is kept in ``base_path``.
            schema: Root schema, forwarded to the parent resolver.
            schemas_dir: Name of the schemas directory; stored unchanged.
        """
        super().__init__(base_url, schema)
        self.schemas_dir = schemas_dir
        self.base_path = os.path.dirname(base_url) if isinstance(base_url, str) else None
        # Maps the fragment-less ``file://`` URI to its parsed document.
        self.loaded_schemas: dict[str, Any] = {}

    def lookup(self, uri: str) -> Any:
        """Resolve ``uri`` to the schema node it points at.

        ``#/...`` references are tried against the root schema first and then
        against every loaded file. ``file://`` references load the file (once)
        and optionally descend into its fragment. Everything else, including
        ``#/...`` references nothing could resolve, is delegated to the parent
        resolver.

        Raises:
            UnRedolvedException: If the fragment of a ``file://`` reference
                does not exist in the referenced document.
        """
        if uri.startswith("#/"):
            fragment = uri[2:]
            # ``None`` makes the helper fall back to the root schema, so the
            # root is tried first, followed by every file loaded so far.
            for candidate in (None, *self.loaded_schemas.values()):
                try:
                    return self._resolve_internal_ref(fragment, candidate)
                except UnRedolvedException:
                    continue

        if uri.startswith("file://"):
            filename, _, fragment = uri.partition("#")

            # Read and parse each referenced file only once per resolver.
            if filename not in self.loaded_schemas:
                self.loaded_schemas[filename] = file_handler(filename)
            referenced_schema = self.loaded_schemas[filename]

            if not fragment:
                return referenced_schema

            current = referenced_schema
            # The fragment is expected to start with "/", so the (empty) first
            # piece produced by the split is skipped.
            for part in fragment.split("/")[1:]:
                if not isinstance(current, dict) or part not in current:
                    raise UnRedolvedException(f"Invalid reference path: {fragment}")
                current = current[part]
            return current

        return super().lookup(uri)

    def _resolve_internal_ref(self, ref_path: str, schema: Optional[Any] = None) -> Any:
        """Walk the "/"-separated ``ref_path`` through ``schema``.

        Args:
            ref_path: Path without the leading ``#/``.
            schema: Document to walk; the resolver's root schema when ``None``.

        Raises:
            UnRedolvedException: If any path component is missing.
        """
        if schema is None:
            schema = self.schema

        current = schema
        for part in ref_path.split("/"):
            if not isinstance(current, dict) or part not in current:
                raise UnRedolvedException(f"Reference not found: {ref_path}")
            current = current[part]  # type: ignore[literal-required]
        return current


# Script entry point: call run() with its default target folder.
if __name__ == "__main__":
    run()
13 changes: 12 additions & 1 deletion python/sentry_kafka_schemas/codecs/json.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import pathlib
from typing import Any, Optional, TypeVar, cast

import fastjsonschema
Expand All @@ -8,14 +9,24 @@

T = TypeVar("T")

BASE_DIR = pathlib.Path(__file__).parent.parent / "schemas"


def file_handler(uri: str) -> Any:
    """Load the JSON document that a ``file://`` URI refers to.

    The leading ``file://`` (the first seven characters of ``uri``) is dropped
    and the remainder is resolved relative to ``BASE_DIR``.

    Args:
        uri: Reference of the form ``file://<path relative to BASE_DIR>``.

    Returns:
        The parsed JSON content of the file.

    Raises:
        FileNotFoundError: If no file exists at the resolved location.
    """
    absolute_path = BASE_DIR / uri[len("file://") :]
    try:
        # JSON is UTF-8 by specification; do not rely on the locale default.
        schema_file = open(absolute_path, encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError) as err:
        # Opening directly (instead of checking exists() first) avoids a race
        # between the check and the read while keeping the same error.
        raise FileNotFoundError(f"Schema file not found: {absolute_path}") from err
    with schema_file:
        return rapidjson.load(schema_file)


class JsonCodec(Codec[T]):
def __init__(
self,
json_schema: Optional[object],
) -> None:
if json_schema is not None:
self.__validate = fastjsonschema.compile(json_schema)
self.__validate = fastjsonschema.compile(json_schema, handlers={"file": file_handler})
else:
self.__validate = lambda _: None

Expand Down
3 changes: 2 additions & 1 deletion python/sentry_kafka_schemas/codecs/msgpack.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import fastjsonschema
import msgpack
from sentry_kafka_schemas.codecs import Codec, ValidationError
from sentry_kafka_schemas.codecs.json import file_handler

T = TypeVar("T")

Expand All @@ -14,7 +15,7 @@ class MsgpackCodec(Codec[T]):

def __init__(self, json_schema: Optional[object]) -> None:
if json_schema is not None:
self.__validate = fastjsonschema.compile(json_schema)
self.__validate = fastjsonschema.compile(json_schema, handlers={"file": file_handler})
else:
self.__validate = lambda _: None

Expand Down
12 changes: 10 additions & 2 deletions python/tests/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import jsonschema
import pytest
import rapidjson
from jsonschema import RefResolver
from sentry_kafka_schemas import iter_examples
from sentry_kafka_schemas.codecs.json import BASE_DIR, file_handler
from sentry_kafka_schemas.sentry_kafka_schemas import _get_schema, get_codec, get_topic, list_topics
from sentry_kafka_schemas.types import Example

Expand Down Expand Up @@ -65,11 +67,17 @@ def test_json_examples(
example_data = example.load()

if jsonschema_library == "fastjsonschema":
compiled = fastjsonschema.compile(schema)
compiled = fastjsonschema.compile(schema, handlers={"file": file_handler})
compiled(example_data)
elif jsonschema_library == "jsonschema":
try:
jsonschema.validate(example_data, schema)
# TODO: Is this even necessary? Looks like we removed usage of jsonschema?
resolver = RefResolver(
base_uri=f"file:/{BASE_DIR}",
referrer=schema,
handlers={"file": file_handler}, # Use the custom_resolver function
)
jsonschema.validate(example_data, schema, resolver=resolver)
except jsonschema.ValidationError as e:
_get_most_specific_jsonschema_error(e)
elif jsonschema_library == "rapidjson":
Expand Down
54 changes: 48 additions & 6 deletions rust/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use jsonschema::JSONSchema;
use jsonschema::{JSONSchema, SchemaResolver, SchemaResolverError};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
use thiserror::Error;

use url::Url;
// This file is supposed to be auto-generated via rust/build.rs
pub mod schema_types {
include!(concat!(env!("OUT_DIR"), "/schema_types.rs"));
Expand Down Expand Up @@ -148,6 +150,37 @@ fn get_topic_schema(topic: &str, version: Option<u16>) -> Result<TopicSchema, Sc
Ok(schema_metadata)
}

/// Resolver used to look up schemas that are referenced through `file://` URLs.
///
/// The type carries no state of its own.
struct FileSchemaResolver {}

impl FileSchemaResolver {
    /// Builds a new resolver.
    fn new() -> Self {
        FileSchemaResolver {}
    }
}

impl SchemaResolver for FileSchemaResolver {
    /// Resolves a `file://` URL to one of the schemas stored in `SCHEMAS`.
    ///
    /// The text after `file://` (minus a trailing `/`, if the serialized URL
    /// has one) is used as the lookup key for `find_entry`. URLs with any
    /// other scheme are rejected with a `NotFound` I/O error.
    fn resolve(
        &self,
        _root_schema: &Value,
        url: &Url,
        _original_reference: &str,
    ) -> Result<Arc<Value>, SchemaResolverError> {
        if url.scheme() == "file" {
            let url_str = url.as_str();
            // Strip the scheme prefix and, only when present, the trailing
            // slash. Unconditionally cutting the last character would corrupt
            // URLs whose serialization does not end in `/`.
            let without_scheme = url_str.strip_prefix("file://").unwrap_or(url_str);
            let relative_path = without_scheme
                .strip_suffix('/')
                .unwrap_or(without_scheme);
            let schema = find_entry(SCHEMAS, relative_path).ok_or(SchemaError::InvalidSchema)?;
            let schema_json =
                serde_json::from_str(schema).map_err(|_| SchemaError::InvalidSchema)?;
            return Ok(Arc::new(schema_json));
        }

        Err(SchemaResolverError::new(Box::new(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            format!("Unsupported URL scheme: {}", url.scheme()),
        ))))
    }
}

/// Returns the schema for a topic. If `version` is passed, return the schema for
/// the specified version, otherwise the latest version is returned.
///
Expand All @@ -163,7 +196,11 @@ pub fn get_schema(topic: &str, version: Option<u16>) -> Result<Schema, SchemaErr
find_entry(SCHEMAS, &schema_metadata.resource).ok_or(SchemaError::InvalidSchema)?;

let s = serde_json::from_str(schema).map_err(|_| SchemaError::InvalidSchema)?;
let compiled_json_schema = JSONSchema::compile(&s).map_err(|_| SchemaError::InvalidSchema)?;
let resolver = FileSchemaResolver::new();
let compiled_json_schema = JSONSchema::options()
.with_resolver(resolver)
.compile(&s)
.map_err(|_| SchemaError::InvalidSchema)?;

// FIXME(swatinem): This assumes that there is only a single `examples` entry in the definition.
// If we would want to support multiple, we would have to either merge those in code generation,
Expand Down Expand Up @@ -211,9 +248,8 @@ mod tests {
get_schema("transactions", Some(1)).unwrap();
}

#[test]
fn test_validate() {
let schema = get_schema("snuba-queries", None).unwrap();
fn validate_schema(schema_name: &str) {
let schema = get_schema(schema_name, None).unwrap();

let examples = schema.examples();
assert!(!examples.is_empty());
Expand All @@ -226,4 +262,10 @@ mod tests {
Err(SchemaError::InvalidMessage)
));
}

#[test]
fn test_validate() {
    // Run the shared validation routine for each topic, in order.
    for &topic in ["snuba-queries", "uptime-results"].iter() {
        validate_schema(topic);
    }
}
}
Loading