Feature/versioning check rules #1044

Open
STEFANOVIVAS wants to merge 16 commits into databrickslabs:main from STEFANOVIVAS:feature/versioning_check_rules

Conversation

@STEFANOVIVAS
Contributor

Changes

Linked issues

Resolves #672

Tests

  • manually tested
  • added unit tests
  • added integration tests
  • added end-to-end tests
  • added performance tests

@STEFANOVIVAS STEFANOVIVAS requested a review from a team as a code owner February 24, 2026 00:50
@STEFANOVIVAS STEFANOVIVAS requested review from gergo-databricks and removed request for a team February 24, 2026 00:50
@alexott
Contributor

alexott commented Feb 26, 2026

Code review

Found 2 issues:

  1. spark.catalog.tableExists is re-introduced in the save path, regressing the fix from PR #1035 (Use WorkspaceClient to check table existence), which replaced it with ws.tables.get() because spark.catalog.tableExists fails for tables with special characters and is blocked in Spark Declarative Pipelines. The load path correctly continues to use ws.tables.get(), leaving the two methods inconsistent.

rule_set_fingerprint = first_row[0] if first_row else None
if self.spark.catalog.tableExists(config.location) and rule_set_fingerprint is not None:
    if (
        not self.spark.read.table(config.location)

  2. In _load_checks_from_lakebase, the "load latest version" path fetches the most recent rule_set_fingerprint via a subquery filtered by run_config_name, but the outer SELECT only filters by rule_set_fingerprint without also filtering by run_config_name. If two different configs share identical check content (same fingerprint), the query returns rows from both configs mixed together.

latest_rule_set_fingerprint = (
    select(table.c.rule_set_fingerprint)
    .where(table.c.run_config_name == config.run_config_name)
    .order_by(table.c.created_at.desc())
    .limit(1)
    .scalar_subquery()
)
stmt = select(table).where(table.c.rule_set_fingerprint == latest_rule_set_fingerprint)
with engine.connect() as conn:
    result = conn.execute(stmt)
    checks = result.mappings().all()
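The failure mode is easy to reproduce with an in-memory SQLite table standing in for the Lakebase table (column names mirror the snippet above; the data is invented for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE checks "
    "(run_config_name TEXT, rule_set_fingerprint TEXT, name TEXT, created_at INTEGER)"
)
# Two configs whose check content hashes to the same fingerprint.
conn.executemany(
    "INSERT INTO checks VALUES (?, ?, ?, ?)",
    [
        ("default", "abc123", "check_a", 1),
        ("other",   "abc123", "check_b", 1),
    ],
)

# Subquery: latest fingerprint for this run config (filtered correctly).
fp = conn.execute(
    "SELECT rule_set_fingerprint FROM checks "
    "WHERE run_config_name = ? ORDER BY created_at DESC LIMIT 1",
    ("default",),
).fetchone()[0]

# Buggy outer query: filters only on fingerprint, pulls rows from both configs.
buggy = conn.execute(
    "SELECT name FROM checks WHERE rule_set_fingerprint = ?", (fp,)
).fetchall()

# Fixed outer query: also filter by run_config_name.
fixed = conn.execute(
    "SELECT name FROM checks WHERE rule_set_fingerprint = ? AND run_config_name = ?",
    (fp, "default"),
).fetchall()

print(len(buggy), len(fixed))  # 2 1
```

In the SQLAlchemy version, the fix is simply adding table.c.run_config_name == config.run_config_name to the outer .where().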

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

@alexott
Contributor

alexott commented Feb 26, 2026

Additional code review findings (lower confidence)

Found 4 additional issues (scored below primary threshold):

  1. Lakebase deduplication dead code when mode="overwrite" — in _save_checks_to_lakebase, the overwrite DELETE runs first, then the fingerprint existence check queries for a row that was just deleted. The SELECT ... WHERE rule_set_fingerprint == ... always returns empty in overwrite mode, so the early-return skip can never trigger. The existence check should run before the delete.

)
self._ensure_rule_version_columns_exist(conn, config)
logger.info("Rule version columns exist or added.")
if config.mode == "overwrite":
    delete_stmt = delete(table).where(table.c.run_config_name == config.run_config_name)
    result = conn.execute(delete_stmt)
    logger.info(f"Deleted {result.rowcount} existing checks for run_config_name '{config.run_config_name}'")
normalized_checks = self._normalize_checks(checks, config)
rule_set_fingerprint = normalized_checks[0].get("rule_set_fingerprint")
exists_rule_set = (
    select(table.c.rule_set_fingerprint)
    .where(
        table.c.run_config_name == config.run_config_name,
        table.c.rule_set_fingerprint == rule_set_fingerprint,
    )
    .limit(1)
)
if conn.execute(exists_rule_set).first():
    logger.info(f"Checks with rule_set_fingerprint {rule_set_fingerprint} already exist — skipping")
    return
insert_stmt = insert(table)
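One way to see the fix is to run the existence probe before any destructive write. A sketch against an in-memory stand-in (the function name and schema are invented for illustration; only the ordering matters):

```python
import sqlite3

def save_checks(conn, run_config_name, fingerprint, rows, mode="append"):
    # Probe for an identical rule set BEFORE the overwrite DELETE;
    # probing afterwards can never find the rows it just deleted.
    exists = conn.execute(
        "SELECT 1 FROM checks "
        "WHERE run_config_name = ? AND rule_set_fingerprint = ? LIMIT 1",
        (run_config_name, fingerprint),
    ).fetchone()
    if exists:
        return "skipped"
    if mode == "overwrite":
        conn.execute(
            "DELETE FROM checks WHERE run_config_name = ?", (run_config_name,)
        )
    conn.executemany("INSERT INTO checks VALUES (?, ?)", rows)
    return "written"

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE checks (run_config_name TEXT, rule_set_fingerprint TEXT)")
rows = [("default", "abc123")]

first = save_checks(conn, "default", "abc123", rows, mode="overwrite")
second = save_checks(conn, "default", "abc123", rows, mode="overwrite")
print(first, second)  # written skipped
```

With the probe moved ahead of the DELETE, a re-save of unchanged checks short-circuits instead of rewriting identical rows.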

  2. rule_set_fingerprint = 'None' on schema migration — when latest_row.rule_set_fingerprint is Python None (pre-existing rows whose column was backfilled as NULL via mergeSchema), the f-string produces the Spark SQL predicate rule_set_fingerprint = 'None' — the literal string "None", not SQL NULL. This silently returns zero rows for any table that has pre-versioning data, breaking load after a schema migration.

if not rule_set_fingerprint:
    max_created_at = filtered_df.agg(F.max("created_at")).collect()[0][0]
    latest_row = filtered_df.where(F.col("created_at") == max_created_at).first()
    if latest_row:
        rule_set_fingerprint = latest_row.rule_set_fingerprint
filtered_df = filtered_df.where(f"rule_set_fingerprint = '{rule_set_fingerprint}'")
check_rows = filtered_df.collect()
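The None-to-'None' coercion is easy to demonstrate. A NULL-safe predicate builder (hypothetical helper) is one fix, though in Spark the cleaner route is the Column API, e.g. F.col("rule_set_fingerprint").isNull() versus an equality comparison:

```python
fp = None

# The buggy interpolation: Python None becomes the SQL string literal 'None',
# which matches no row whose column is actually NULL.
buggy = f"rule_set_fingerprint = '{fp}'"
print(buggy)  # rule_set_fingerprint = 'None'

def fingerprint_predicate(fp):
    """Build a predicate that treats None as SQL NULL (illustrative helper)."""
    if fp is None:
        return "rule_set_fingerprint IS NULL"
    return f"rule_set_fingerprint = '{fp}'"

print(fingerprint_predicate(None))      # rule_set_fingerprint IS NULL
print(fingerprint_predicate("abc123"))  # rule_set_fingerprint = 'abc123'
```

With this branch in place, tables whose pre-versioning rows were backfilled as NULL still load after the schema migration.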

  3. Docstring contradicts the new default — both TableChecksStorageConfig and LakebaseChecksStorageConfig change mode from "overwrite" to "append", but LakebaseChecksStorageConfig's docstring still explicitly states (default is 'overwrite'). This is a breaking behavioral change for existing callers that relied on the default.

    run_config_name: Name of the run configuration to use for checks (default is 'default').
    mode: The mode for writing checks to a table (e.g., 'append' or 'overwrite'). The *overwrite* mode
        only replaces checks for the specific run config and not all checks in the table (default is 'overwrite').
    rule_set_fingerprint: Optional SHA-256 fingerprint of the rule set to load.
        When provided, loads rules matching this specific fingerprint instead of the latest batch.
        When None (default), loads the latest batch.
    """

    location: str
    instance_name: str | None = None
    client_id: str | None = None
    port: str = "5432"
    run_config_name: str = "default"
    mode: str = "append"
    rule_set_fingerprint: str | None = None

  4. SQL injection via f-string for rule_set_fingerprint — the new filter in TableChecksStorageHandler.save embeds the user-supplied rule_set_fingerprint config field directly into a Spark SQL string. When the fingerprint is computed internally it is always a safe hex-SHA256, but since TableChecksStorageConfig.rule_set_fingerprint is an externally-supplied field, a crafted value can bypass the filter. The Column API (F.col("rule_set_fingerprint") == rule_set_fingerprint) avoids this, consistent with the rest of the codebase.

first_row = rules_df.select("rule_set_fingerprint").first()
rule_set_fingerprint = first_row[0] if first_row else None
if self.spark.catalog.tableExists(config.location) and rule_set_fingerprint is not None:
    if (
        not self.spark.read.table(config.location)
        .filter(
            f"run_config_name = '{config.run_config_name}' and rule_set_fingerprint = '{rule_set_fingerprint}'"
        )
        .isEmpty()
    ):
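The injection risk can be shown with SQLite as a stand-in for the Spark SQL filter (in Spark, the analogue of parameter binding is the Column API: F.col("rule_set_fingerprint") == config.rule_set_fingerprint):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE checks (run_config_name TEXT, rule_set_fingerprint TEXT)")
conn.executemany(
    "INSERT INTO checks VALUES (?, ?)",
    [("default", "abc123"), ("secret", "zzz999")],
)

crafted = "x' OR '1'='1"

# f-string interpolation: the crafted value rewrites the WHERE clause
# into ... = 'x' OR '1'='1', which matches every row in the table.
leaked = conn.execute(
    f"SELECT * FROM checks WHERE rule_set_fingerprint = '{crafted}'"
).fetchall()

# Parameter binding treats the value purely as data: no rows match.
safe = conn.execute(
    "SELECT * FROM checks WHERE rule_set_fingerprint = ?", (crafted,)
).fetchall()

print(len(leaked), len(safe))  # 2 0
```

The same reasoning motivates the review's suggestion: build the filter from Column expressions rather than interpolating config fields into a SQL string.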




Development

Successfully merging this pull request may close these issues.

[FEATURE]: Versioning of rules
