diff --git a/CHANGELOG.md b/CHANGELOG.md index 676751d2..ccb314b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [3.6.1] - 2025-05-14 +## [3.6.2] - 2025-06-17 +## Fixed +- Extended the use of Hive's partition `createTime` to event-driven scheduling. Cleanup events are now scheduled based on the partition's actual creation time in Hive, not the event processing time. + +## [3.6.1] - 2025-05-14 ## Fixed - Use Hive partition creation time (`createTime`) for scheduling partition cleanup, ensuring accurate expiry timing for both new and existing partitions. - Updated workflows to use `Ubuntu 22.04` instead of `Ubuntu 20.04`. diff --git a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java index bd9d6853..53893e35 100644 --- a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java +++ b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java @@ -116,8 +116,9 @@ public MessageEventHandler unreferencedHousekeepingPathMessageEventHandler( @Bean(name = "expiredHousekeepingMetadataGenerator") public HousekeepingEntityGenerator expiredHousekeepingMetadataGenerator( - @Value("${properties.beekeeper.default-expiration-delay}") String cleanupDelay) { - return new ExpiredHousekeepingMetadataGenerator(cleanupDelay); + @Value("${properties.beekeeper.default-expiration-delay}") String cleanupDelay, + @Qualifier("hiveClientFactory") HiveClientFactory hiveClientFactory) { + return new ExpiredHousekeepingMetadataGenerator(cleanupDelay, hiveClientFactory); } 
@Bean(name = "expiredHousekeepingMetadataMessageEventHandler") diff --git a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/generator/ExpiredHousekeepingMetadataGenerator.java b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/generator/ExpiredHousekeepingMetadataGenerator.java index 159dc0d8..65e24879 100644 --- a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/generator/ExpiredHousekeepingMetadataGenerator.java +++ b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/generator/ExpiredHousekeepingMetadataGenerator.java @@ -26,6 +26,8 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -47,6 +49,9 @@ import com.expediagroup.beekeeper.core.model.LifecycleEventType; import com.expediagroup.beekeeper.core.model.PeriodDuration; import com.expediagroup.beekeeper.scheduler.apiary.generator.utils.CleanupDelayExtractor; +import com.expediagroup.beekeeper.scheduler.hive.HiveClient; +import com.expediagroup.beekeeper.scheduler.hive.HiveClientFactory; +import com.expediagroup.beekeeper.scheduler.hive.PartitionInfo; public class ExpiredHousekeepingMetadataGenerator implements HousekeepingEntityGenerator { @@ -57,16 +62,19 @@ public class ExpiredHousekeepingMetadataGenerator implements HousekeepingEntityG private final CleanupDelayExtractor cleanupDelayExtractor; private final Clock clock; + private final HiveClientFactory hiveClientFactory; - public ExpiredHousekeepingMetadataGenerator(String cleanupDelay) { + public ExpiredHousekeepingMetadataGenerator(String cleanupDelay, HiveClientFactory hiveClientFactory) { this(new CleanupDelayExtractor(EXPIRED_DATA_RETENTION_PERIOD_PROPERTY_KEY, cleanupDelay), - Clock.systemDefaultZone()); + Clock.systemDefaultZone(), hiveClientFactory); } 
@VisibleForTesting - ExpiredHousekeepingMetadataGenerator(CleanupDelayExtractor cleanupDelayExtractor, Clock clock) { + ExpiredHousekeepingMetadataGenerator(CleanupDelayExtractor cleanupDelayExtractor, Clock clock, + HiveClientFactory hiveClientFactory) { this.cleanupDelayExtractor = cleanupDelayExtractor; this.clock = clock; + this.hiveClientFactory = hiveClientFactory; } @Override @@ -136,10 +144,13 @@ private HousekeepingEntity generateHousekeepingEntity( String path, String partitionName) { PeriodDuration cleanupDelay = cleanupDelayExtractor.extractCleanupDelay(listenerEvent); + + LocalDateTime creationTime = getPartitionCreationTime(listenerEvent.getDbName(), listenerEvent.getTableName(), partitionName); + return HousekeepingMetadata .builder() .housekeepingStatus(SCHEDULED) - .creationTimestamp(LocalDateTime.now(clock)) + .creationTimestamp(creationTime) .cleanupDelay(cleanupDelay) .lifecycleType(LIFECYCLE_EVENT_TYPE.toString()) .clientId(clientId) @@ -163,4 +174,22 @@ private String generatePartitionName(List keys, List values) { .mapToObj(i -> keys.get(i) + "=" + values.get(i)) .collect(Collectors.joining("/")); } + + private LocalDateTime getPartitionCreationTime(String databaseName, String tableName, String partitionName) { + try (HiveClient hiveClient = hiveClientFactory.newInstance()) { + Optional partitionInfo = hiveClient.getSinglePartitionInfo(databaseName, tableName, partitionName); + + if (partitionInfo.isPresent()) { + return partitionInfo.get().getCreateTime(); + } + + log.warn("Partition {} not found in Hive for table {}.{}, using current time", + partitionName, databaseName, tableName); + return LocalDateTime.now(clock); + } catch (Exception e) { + log.warn("Failed to get partition creation time from Hive for {}.{}.{}, using current time", + databaseName, tableName, partitionName, e); + return LocalDateTime.now(clock); + } + } } diff --git 
a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java index ff1d5443..6211b5b5 100644 --- a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java +++ b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java @@ -46,6 +46,8 @@ import com.expediagroup.beekeeper.scheduler.apiary.handler.MessageEventHandler; import com.expediagroup.beekeeper.scheduler.apiary.messaging.BeekeeperEventReader; import com.expediagroup.beekeeper.scheduler.apiary.messaging.RetryingMessageReader; +import com.expediagroup.beekeeper.scheduler.hive.PartitionIteratorFactory; +import com.expediagroup.beekeeper.scheduler.hive.HiveClientFactory; import com.expediagroup.beekeeper.scheduler.service.SchedulerService; import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient; @@ -110,7 +112,8 @@ public void validateUnreferencedHousekeepingPathMessageEventHandler() { @Test public void validateExpiredHousekeepingMetadataGenerator() { - HousekeepingEntityGenerator generator = commonBeans.expiredHousekeepingMetadataGenerator("P30D"); + HiveClientFactory mockHiveClientFactory = mock(HiveClientFactory.class); + HousekeepingEntityGenerator generator = commonBeans.expiredHousekeepingMetadataGenerator("P30D", mockHiveClientFactory); assertThat(generator).isInstanceOf(ExpiredHousekeepingMetadataGenerator.class); } diff --git a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/generator/ExpiredHousekeepingMetadataGeneratorTest.java b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/generator/ExpiredHousekeepingMetadataGeneratorTest.java index 5041c910..34cfdc16 100644 --- 
a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/generator/ExpiredHousekeepingMetadataGeneratorTest.java +++ b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/generator/ExpiredHousekeepingMetadataGeneratorTest.java @@ -30,8 +30,10 @@ import static com.expediagroup.beekeeper.core.model.LifecycleEventType.EXPIRED; +import java.time.LocalDateTime; import java.util.List; import java.util.Map; +import java.util.Optional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -50,6 +52,9 @@ import com.expediagroup.beekeeper.core.error.BeekeeperException; import com.expediagroup.beekeeper.core.model.HousekeepingEntity; import com.expediagroup.beekeeper.core.model.HousekeepingMetadata; +import com.expediagroup.beekeeper.scheduler.hive.HiveClient; +import com.expediagroup.beekeeper.scheduler.hive.HiveClientFactory; +import com.expediagroup.beekeeper.scheduler.hive.PartitionInfo; @ExtendWith(MockitoExtension.class) public class ExpiredHousekeepingMetadataGeneratorTest extends HousekeepingEntityGeneratorTestBase { @@ -65,11 +70,13 @@ public class ExpiredHousekeepingMetadataGeneratorTest extends HousekeepingEntity @Mock private AlterTableEvent alterTableEvent; @Mock private AddPartitionEvent addPartitionEvent; @Mock private AlterPartitionEvent alterPartitionEvent; + @Mock private HiveClientFactory hiveClientFactory; + @Mock private HiveClient hiveClient; private ExpiredHousekeepingMetadataGenerator generator; @BeforeEach public void setup() { - generator = new ExpiredHousekeepingMetadataGenerator(cleanupDelayExtractor, clock); + generator = new ExpiredHousekeepingMetadataGenerator(cleanupDelayExtractor, clock, hiveClientFactory); } @Test @@ -101,6 +108,8 @@ public void typicalHandleAddPartitionEvent() { when(addPartitionEvent.getPartitionLocation()).thenReturn(PARTITION_PATH); when(addPartitionEvent.getPartitionKeys()).thenReturn(PARTITION_KEYS); 
when(addPartitionEvent.getPartitionValues()).thenReturn(PARTITION_VALUES); + when(hiveClientFactory.newInstance()).thenReturn(hiveClient); + when(hiveClient.getSinglePartitionInfo(DATABASE, TABLE, PARTITION_NAME)).thenReturn(Optional.empty()); List housekeepingEntities = generator.generate(addPartitionEvent, CLIENT_ID); assertThat(housekeepingEntities.size()).isEqualTo(1); @@ -114,6 +123,8 @@ public void typicalHandleAlterPartitionEvent() { when(alterPartitionEvent.getPartitionLocation()).thenReturn(PARTITION_PATH); when(alterPartitionEvent.getPartitionKeys()).thenReturn(PARTITION_KEYS); when(alterPartitionEvent.getPartitionValues()).thenReturn(PARTITION_VALUES); + when(hiveClientFactory.newInstance()).thenReturn(hiveClient); + when(hiveClient.getSinglePartitionInfo(DATABASE, TABLE, PARTITION_NAME)).thenReturn(Optional.empty()); List housekeepingEntities = generator.generate(alterPartitionEvent, CLIENT_ID); assertThat(housekeepingEntities.size()).isEqualTo(1); @@ -142,6 +153,26 @@ public void exceptionThrownOnUnhandledEvent() { } } + @Test + public void usesPartitionCreationTimeFromHive() { + setupListenerEvent(addPartitionEvent, ADD_PARTITION); + when(cleanupDelayExtractor.extractCleanupDelay(addPartitionEvent)).thenReturn(CLEANUP_DELAY); + when(addPartitionEvent.getPartitionLocation()).thenReturn(PARTITION_PATH); + when(addPartitionEvent.getPartitionKeys()).thenReturn(PARTITION_KEYS); + when(addPartitionEvent.getPartitionValues()).thenReturn(PARTITION_VALUES); + when(hiveClientFactory.newInstance()).thenReturn(hiveClient); + LocalDateTime hiveCreationTime = LocalDateTime.of(2023, 6, 15, 10, 30); + PartitionInfo partitionInfo = new PartitionInfo(PARTITION_PATH, hiveCreationTime); + when(hiveClient.getSinglePartitionInfo(DATABASE, TABLE, PARTITION_NAME)).thenReturn(Optional.of(partitionInfo)); + + List housekeepingEntities = generator.generate(addPartitionEvent, CLIENT_ID); + + assertThat(housekeepingEntities.size()).isEqualTo(1); + HousekeepingMetadata metadata = 
(HousekeepingMetadata) housekeepingEntities.get(0); + assertThat(metadata.getCreationTimestamp()).isEqualTo(hiveCreationTime); + assertThat(metadata.getPartitionName()).isEqualTo(PARTITION_NAME); + } + private void assertExpiredHousekeepingMetadataEntity(HousekeepingEntity housekeepingEntity, String partitionName) { HousekeepingMetadata housekeepingMetadata = (HousekeepingMetadata) housekeepingEntity; assertHousekeepingEntity(housekeepingMetadata, EXPIRED); diff --git a/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/hive/HiveClient.java b/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/hive/HiveClient.java index b8541909..25234381 100644 --- a/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/hive/HiveClient.java +++ b/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/hive/HiveClient.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -75,6 +76,36 @@ public Map getTablePartitionsInfo(String databaseName, St } } + /** + * Retrieves information for a single partition from Hive metastore. 
+ * + * @param databaseName the name of the Hive database + * @param tableName the name of the Hive table + * @param partitionName the partition identifier in Hive's standard format: "key1=value1/key2=value2" + * (e.g., "event_date=2024-01-01/event_hour=1"), or null for table-level events + * @return Optional containing PartitionInfo if found, empty if partition doesn't exist, on error, or if partitionName is null + */ + public Optional getSinglePartitionInfo(String databaseName, String tableName, String partitionName) { + if (partitionName == null) { + return Optional.empty(); + } + try { + List partitionValues = Warehouse.getPartValuesFromPartName(partitionName); + Partition partition = metaStoreClient.getPartition(databaseName, tableName, partitionValues); + String path = partition.getSd().getLocation(); + LocalDateTime createTime = extractCreateTime(partition); + + log.debug("Retrieved partition '{}' with path '{}' for table {}.{}", + partitionName, path, databaseName, tableName); + + return Optional.of(new PartitionInfo(path, createTime)); + } catch (TException e) { + log.warn("Failed to get partition info for {}.{}.{}", + databaseName, tableName, partitionName, e); + return Optional.empty(); + } + } + private LocalDateTime extractCreateTime(Partition partition) { if (partition.getCreateTime() > 0) { return LocalDateTime.ofInstant( diff --git a/beekeeper-scheduler/src/test/java/com/expediagroup/beekeeper/scheduler/hive/hive/HiveClientTest.java b/beekeeper-scheduler/src/test/java/com/expediagroup/beekeeper/scheduler/hive/hive/HiveClientTest.java index 13a49a00..8aff9458 100644 --- a/beekeeper-scheduler/src/test/java/com/expediagroup/beekeeper/scheduler/hive/hive/HiveClientTest.java +++ b/beekeeper-scheduler/src/test/java/com/expediagroup/beekeeper/scheduler/hive/hive/HiveClientTest.java @@ -25,6 +25,7 @@ import java.time.ZoneId; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.hadoop.hive.metastore.api.FieldSchema; 
import org.apache.hadoop.hive.metastore.api.Partition; @@ -213,4 +214,56 @@ public void partitionInfoWithNegativeCreateTime() throws TException { assertThat(createTime).isAfterOrEqualTo(beforeTest); assertThat(createTime).isBeforeOrEqualTo(afterTest); } + + @Test + public void getSinglePartitionInfoSuccess() throws TException { + when(metaStoreClient.getPartition(DATABASE_NAME, TABLE_NAME, List.of("2024-01-01", "1"))).thenReturn(partition); + when(partition.getSd()).thenReturn(storageDescriptor); + when(storageDescriptor.getLocation()).thenReturn(PARTITION_PATH); + when(partition.getCreateTime()).thenReturn(1234567890); + + Optional partitionInfo = hiveClient.getSinglePartitionInfo(DATABASE_NAME, TABLE_NAME, PARTITION_NAME); + + assertThat(partitionInfo).isPresent(); + assertThat(partitionInfo.get().getPath()).isEqualTo(PARTITION_PATH); + assertThat(partitionInfo.get().getCreateTime()) + .isEqualTo(LocalDateTime.ofInstant(Instant.ofEpochSecond(1234567890), ZoneId.systemDefault())); + } + + @Test + public void getSinglePartitionInfoWithMissingCreateTime() throws TException { + when(metaStoreClient.getPartition(DATABASE_NAME, TABLE_NAME, List.of("2024-01-01", "1"))).thenReturn(partition); + when(partition.getSd()).thenReturn(storageDescriptor); + when(storageDescriptor.getLocation()).thenReturn(PARTITION_PATH); + when(partition.getCreateTime()).thenReturn(0); + + LocalDateTime beforeTest = LocalDateTime.now(); + + Optional partitionInfo = hiveClient.getSinglePartitionInfo(DATABASE_NAME, TABLE_NAME, PARTITION_NAME); + + LocalDateTime afterTest = LocalDateTime.now(); + + assertThat(partitionInfo).isPresent(); + assertThat(partitionInfo.get().getPath()).isEqualTo(PARTITION_PATH); + LocalDateTime createTime = partitionInfo.get().getCreateTime(); + assertThat(createTime).isNotNull(); + assertThat(createTime).isAfterOrEqualTo(beforeTest); + assertThat(createTime).isBeforeOrEqualTo(afterTest); + } + + @Test + public void getSinglePartitionInfoNotFound() throws TException { 
+ when(metaStoreClient.getPartition(DATABASE_NAME, TABLE_NAME, List.of("2024-01-01", "1"))).thenThrow(TException.class); + + Optional partitionInfo = hiveClient.getSinglePartitionInfo(DATABASE_NAME, TABLE_NAME, PARTITION_NAME); + + assertThat(partitionInfo).isEmpty(); + } + + @Test + public void getSinglePartitionInfoWithNullPartitionName() { + Optional partitionInfo = hiveClient.getSinglePartitionInfo(DATABASE_NAME, TABLE_NAME, null); + + assertThat(partitionInfo).isEmpty(); + } }