Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,11 @@ SET json = JSON_SET(
0.2
)
WHERE name = 'tableDiff';

UPDATE installed_apps
SET json = JSON_SET(
json,
'$.appConfiguration.testCaseResultsRetentionPeriod', 1440,
'$.appConfiguration.profileDataRetentionPeriod', 1440
)
WHERE JSON_EXTRACT(json, '$.name') = 'DataRetentionApplication';
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,15 @@ SET json = json::jsonb || json_build_object(
'version', 0.2
)::jsonb
WHERE name = 'tableDiff';

UPDATE installed_apps
SET json = jsonb_set(
jsonb_set(
json,
'{appConfiguration, testCaseResultsRetentionPeriod}',
'1440'
),
'{appConfiguration, profileDataRetentionPeriod}',
'1440'
)
WHERE json->>'name' = 'DataRetentionApplication';
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.AbstractNativeApplication;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityTimeSeriesDAO;
import org.openmetadata.service.jdbi3.FeedRepository;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.socket.WebSocketManager;
Expand All @@ -45,11 +46,16 @@ public class DataRetention extends AbstractNativeApplication {
private final FeedRepository feedRepository;
private final CollectionDAO.FeedDAO feedDAO;

private final EntityTimeSeriesDAO testCaseResultsDAO;
private final EntityTimeSeriesDAO profileDataDAO;

public DataRetention(CollectionDAO collectionDAO, SearchRepository searchRepository) {
super(collectionDAO, searchRepository);
this.eventSubscriptionDAO = collectionDAO.eventSubscriptionDAO();
this.feedRepository = Entity.getFeedRepository();
this.feedDAO = Entity.getCollectionDAO().feedDAO();
this.testCaseResultsDAO = collectionDAO.testCaseResultTimeSeriesDao();
this.profileDataDAO = collectionDAO.profilerDataTimeSeriesDao();
}

@Override
Expand Down Expand Up @@ -134,6 +140,18 @@ public void executeCleanup(DataRetentionConfiguration config) {
"Starting cleanup for activity threads with retention period: {} days.",
threadRetentionPeriod);
cleanActivityThreads(threadRetentionPeriod);

int testCaseResultsRetentionPeriod = config.getTestCaseResultsRetentionPeriod();
LOG.info(
"Starting cleanup for test case results with retention period: {} days.",
testCaseResultsRetentionPeriod);
cleanTestCaseResults(testCaseResultsRetentionPeriod);

int profileDataRetentionPeriod = config.getProfileDataRetentionPeriod();
LOG.info(
"Starting cleanup for profile data with retention period: {} days.",
profileDataRetentionPeriod);
cleanProfileData(profileDataRetentionPeriod);
}

@Transaction
Expand Down Expand Up @@ -219,6 +237,29 @@ private void cleanOrphanedRelationshipsAndHierarchies() {
}
}

@Transaction
private void cleanTestCaseResults(int retentionPeriod) {
LOG.info("Initiating test case results cleanup: Retention = {} days.", retentionPeriod);
long cutoffMillis = getRetentionCutoffMillis(retentionPeriod);

executeWithStatsTracking(
"test_case_results",
() -> testCaseResultsDAO.deleteRecordsBeforeCutOff(cutoffMillis, BATCH_SIZE));

LOG.info("Test case results cleanup complete.");
}

@Transaction
private void cleanProfileData(int retentionPeriod) {
LOG.info("Initiating profile data cleanup: Retention = {} days.", retentionPeriod);
long cutoffMillis = getRetentionCutoffMillis(retentionPeriod);

executeWithStatsTracking(
"profile_data", () -> profileDataDAO.deleteRecordsBeforeCutOff(cutoffMillis, BATCH_SIZE));

LOG.info("Profile data cleanup complete.");
}

private void executeWithStatsTracking(String entity, Supplier<Integer> deleteFunction) {
int totalDeleted = 0;
int totalFailed = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,27 @@ default void storeTimeSeriesWithOperation(
}
}

@ConnectionAwareSqlUpdate(
value =
"DELETE FROM <table> "
+ "WHERE json->>'id' IN ( "
+ " SELECT json->>'id' FROM <table> "
+ " WHERE timestamp < :cutoffTs ORDER BY timestamp LIMIT :limit "
+ ")",
connectionType = POSTGRES)
@ConnectionAwareSqlUpdate(
value =
"""
DELETE FROM <table> WHERE timestamp < :cutoffTs ORDER BY timestamp LIMIT :limit
""",
connectionType = MYSQL)
int deleteRecordsBeforeCutOff(
@Define("table") String table, @Bind("cutoffTs") long cutoffTs, @Bind("limit") int limit);

default int deleteRecordsBeforeCutOff(long cutoffTs, int limit) {
return deleteRecordsBeforeCutOff(getTimeSeriesTableName(), cutoffTs, limit);
}

/** @deprecated */
@SqlQuery(
"SELECT DISTINCT entityFQN FROM <table> WHERE entityFQNHash = '' or entityFQNHash is null LIMIT :limit")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
"displayName": "Data Retention",
"appConfiguration": {
"changeEventRetentionPeriod": 7,
"activityThreadsRetentionPeriod": 60
"activityThreadsRetentionPeriod": 60,
"profileDataRetentionPeriod": 1440,
"testCaseResultsRetentionPeriod": 1440
},
"appSchedule": {
"scheduleTimeline": "Custom",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4569,6 +4569,56 @@ void test_testCaseResult_64bitIntegerFields(TestInfo testInfo)
assertNull(nullValueResult.getFailedRows(), "Failed rows should be null when not set");
}

@Test
@Order(999)
void delete_testCaseResults_verifyDeletionByTimestamp(TestInfo testInfo)
throws IOException, ParseException {
CreateTestCase create =
createRequest(testInfo)
.withEntityLink(TABLE_LINK)
.withTestDefinition(TEST_DEFINITION4.getFullyQualifiedName())
.withParameterValues(
List.of(new TestCaseParameterValue().withValue("100").withName("maxValue")));
TestCase testCase = createAndCheckEntity(create, ADMIN_AUTH_HEADERS);

long baseTimestamp = dateToTimestamp("2024-03-01");
long dayInMs = 24 * 60 * 60 * 1000L;

for (int i = 0; i < 5; i++) {
CreateTestCaseResult createTestCaseResult =
new CreateTestCaseResult()
.withResult("result " + i)
.withTestCaseStatus(TestCaseStatus.Success)
.withTimestamp(baseTimestamp + (i * dayInMs));
postTestCaseResult(
testCase.getFullyQualifiedName(), createTestCaseResult, ADMIN_AUTH_HEADERS);
}

long cutoffTs = baseTimestamp + (3 * dayInMs);
int limit = 10000;

int deletedCount =
org.openmetadata.service.Entity.getCollectionDAO()
.testCaseResultTimeSeriesDao()
.deleteRecordsBeforeCutOff(cutoffTs, limit);

ResultList<TestCaseResult> remainingResults =
getTestCaseResults(
testCase.getFullyQualifiedName(),
baseTimestamp,
baseTimestamp + (5 * dayInMs),
ADMIN_AUTH_HEADERS);

assertNotNull(remainingResults);

for (TestCaseResult result : remainingResults.getData()) {
long resultTimestamp = result.getTimestamp();
assertTrue(
resultTimestamp >= cutoffTs,
"All remaining test case results should have timestamps >= cutoff");
}
}

@Test
void test_testCaseFollowerInheritance(TestInfo testInfo)
throws IOException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.openmetadata.common.utils.CommonUtil.listOf;
import static org.openmetadata.service.exception.CatalogExceptionMessage.permissionNotAllowed;
import static org.openmetadata.service.util.TestUtils.ADMIN_AUTH_HEADERS;
Expand Down Expand Up @@ -62,6 +63,7 @@
import org.openmetadata.schema.type.TableProfile;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.utils.ResultList;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationTest;
import org.openmetadata.service.resources.databases.TableResourceTest;
import org.openmetadata.service.util.TestUtils;
Expand Down Expand Up @@ -470,6 +472,50 @@ void get_entityProfile_notFound_404() {
"table instance for nonexistent.table not found");
}

@Test
@Order(9999)
void delete_profileData_verifyDeletionByTimestamp() throws HttpResponseException, ParseException {
long baseTimestamp = dateToTimestamp("2024-02-01");
long dayInMs = 24 * 60 * 60 * 1000L;

for (int i = 0; i < 5; i++) {
ColumnProfile columnProfile =
new ColumnProfile().withName("age").withValuesCount(100.0 + i).withNullCount(10.0);

CreateEntityProfile createProfile =
new CreateEntityProfile()
.withTimestamp(baseTimestamp + (i * dayInMs))
.withProfileData(columnProfile)
.withProfileType(CreateEntityProfile.ProfileTypeEnum.COLUMN);

createEntityProfile(TEST_TABLE.getId(), createProfile, ADMIN_AUTH_HEADERS);
}

long cutoffTs = baseTimestamp + (3 * dayInMs);
int limit = 10000;

Entity.getCollectionDAO()
.profilerDataTimeSeriesDao()
.deleteRecordsBeforeCutOff(cutoffTs, limit);

ResultList<EntityProfile> remainingProfiles =
getEntityProfiles(
TEST_TABLE.getFullyQualifiedName(),
"table",
baseTimestamp,
baseTimestamp + (5 * dayInMs),
CreateEntityProfile.ProfileTypeEnum.COLUMN,
ADMIN_AUTH_HEADERS);

assertNotNull(remainingProfiles);

for (EntityProfile profile : remainingProfiles.getData()) {
long profileTimestamp = profile.getTimestamp();
assertTrue(
profileTimestamp >= cutoffTs, "All remaining profiles should have timestamps >= cutoff");
}
}

private void addSampleProfileData(UUID tableId) throws HttpResponseException, ParseException {
// Add table profile
TableProfile tableProfile = new TableProfile().withRowCount(1000.0).withColumnCount(3.0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,22 @@
"type": "integer",
"default": 60,
"minimum": 0
},
"testCaseResultsRetentionPeriod": {
"title": "Test Case Results Retention Period (days)",
"description": "Enter the retention period for Test Case Results in days (e.g., 30 for one month, 60 for two months).",
"type": "integer",
"default": 1440
},
"profileDataRetentionPeriod": {
"title": "Profile Data Retention Period (days)",
"description": "Enter the retention period for Profile Data in days (e.g., 30 for one month, 60 for two months).",
"type": "integer",
"default": 1440
}
},
"required": [
"changeEventRetentionPeriod", "activityThreadsRetentionPeriod"
"changeEventRetentionPeriod", "activityThreadsRetentionPeriod", "testCaseResultsRetentionPeriod", "profileDataRetentionPeriod"
],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,18 @@ $$section

Enter the retention period for Activity Threads of type = 'Conversation' records in days (e.g., 30 for one month, 60 for two months).

$$

$$section
### Test Case Results Retention Period (days) $(id="testCaseResultsRetentionPeriod")

Enter the retention period for Test Case Results in days (e.g., 30 for one month, 60 for two months).

$$

$$section
### Profile Data Retention Period (days) $(id="profileDataRetentionPeriod")

Enter the retention period for Profile Data in days (e.g., 30 for one month, 60 for two months).

$$
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,25 @@
"type": "integer",
"default": 60,
"minimum": 0
},
"testCaseResultsRetentionPeriod": {
"title": "Test Case Results Retention Period (days)",
"description": "Enter the retention period for Test Case Results in days (e.g., 30 for one month, 60 for two months).",
"type": "integer",
"default": 1440
},
"profileDataRetentionPeriod": {
"title": "Profile Data Retention Period (days)",
"description": "Enter the retention period for Profile Data in days (e.g., 30 for one month, 60 for two months).",
"type": "integer",
"default": 1440
}
},
"required": ["changeEventRetentionPeriod", "activityThreadsRetentionPeriod"],
"required": [
"changeEventRetentionPeriod",
"activityThreadsRetentionPeriod",
"testCaseResultsRetentionPeriod",
"profileDataRetentionPeriod"
],
"additionalProperties": false
}
Loading