Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [3.6.1] - 2025-05-14
## [3.6.2] - 2025-06-17
## Fixed
- Extended the use of Hive's partition `createTime` to event-driven scheduling. Cleanup events are now scheduled based on the partition's actual creation time in Hive, not the event processing time.

- ## [3.6.1] - 2025-05-14
## Fixed
- Use Hive partition creation time (`createTime`) for scheduling partition cleanup, ensuring accurate expiry timing for both new and existing partitions.
- Updated workflows to use `Ubuntu 22.04` instead of `Ubuntu 20.04`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ public MessageEventHandler unreferencedHousekeepingPathMessageEventHandler(

@Bean(name = "expiredHousekeepingMetadataGenerator")
public HousekeepingEntityGenerator expiredHousekeepingMetadataGenerator(
@Value("${properties.beekeeper.default-expiration-delay}") String cleanupDelay) {
return new ExpiredHousekeepingMetadataGenerator(cleanupDelay);
@Value("${properties.beekeeper.default-expiration-delay}") String cleanupDelay,
@Qualifier("hiveClientFactory") HiveClientFactory hiveClientFactory) {
return new ExpiredHousekeepingMetadataGenerator(cleanupDelay, hiveClientFactory);
}

@Bean(name = "expiredHousekeepingMetadataMessageEventHandler")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -47,6 +49,9 @@
import com.expediagroup.beekeeper.core.model.LifecycleEventType;
import com.expediagroup.beekeeper.core.model.PeriodDuration;
import com.expediagroup.beekeeper.scheduler.apiary.generator.utils.CleanupDelayExtractor;
import com.expediagroup.beekeeper.scheduler.hive.HiveClient;
import com.expediagroup.beekeeper.scheduler.hive.HiveClientFactory;
import com.expediagroup.beekeeper.scheduler.hive.PartitionInfo;

public class ExpiredHousekeepingMetadataGenerator implements HousekeepingEntityGenerator {

Expand All @@ -57,16 +62,19 @@ public class ExpiredHousekeepingMetadataGenerator implements HousekeepingEntityG

private final CleanupDelayExtractor cleanupDelayExtractor;
private final Clock clock;
private final HiveClientFactory hiveClientFactory;

public ExpiredHousekeepingMetadataGenerator(String cleanupDelay) {
public ExpiredHousekeepingMetadataGenerator(String cleanupDelay, HiveClientFactory hiveClientFactory) {
this(new CleanupDelayExtractor(EXPIRED_DATA_RETENTION_PERIOD_PROPERTY_KEY, cleanupDelay),
Clock.systemDefaultZone());
Clock.systemDefaultZone(), hiveClientFactory);
}

@VisibleForTesting
ExpiredHousekeepingMetadataGenerator(CleanupDelayExtractor cleanupDelayExtractor, Clock clock) {
ExpiredHousekeepingMetadataGenerator(CleanupDelayExtractor cleanupDelayExtractor, Clock clock,
HiveClientFactory hiveClientFactory) {
this.cleanupDelayExtractor = cleanupDelayExtractor;
this.clock = clock;
this.hiveClientFactory = hiveClientFactory;
}

@Override
Expand Down Expand Up @@ -136,10 +144,13 @@ private HousekeepingEntity generateHousekeepingEntity(
String path,
String partitionName) {
PeriodDuration cleanupDelay = cleanupDelayExtractor.extractCleanupDelay(listenerEvent);

LocalDateTime creationTime = getPartitionCreationTime(listenerEvent.getDbName(), listenerEvent.getTableName(), partitionName);

return HousekeepingMetadata
.builder()
.housekeepingStatus(SCHEDULED)
.creationTimestamp(LocalDateTime.now(clock))
.creationTimestamp(creationTime)
.cleanupDelay(cleanupDelay)
.lifecycleType(LIFECYCLE_EVENT_TYPE.toString())
.clientId(clientId)
Expand All @@ -163,4 +174,22 @@ private String generatePartitionName(List<String> keys, List<String> values) {
.mapToObj(i -> keys.get(i) + "=" + values.get(i))
.collect(Collectors.joining("/"));
}

private LocalDateTime getPartitionCreationTime(String databaseName, String tableName, String partitionName) {
try (HiveClient hiveClient = hiveClientFactory.newInstance()) {
Optional<PartitionInfo> partitionInfo = hiveClient.getSinglePartitionInfo(databaseName, tableName, partitionName);

Comment thread
HamzaJugon marked this conversation as resolved.
if (partitionInfo.isPresent()) {
return partitionInfo.get().getCreateTime();
}

Comment thread
HamzaJugon marked this conversation as resolved.
log.warn("Partition {} not found in Hive for table {}.{}, using current time",
partitionName, databaseName, tableName);
return LocalDateTime.now(clock);
} catch (Exception e) {
log.warn("Failed to get partition creation time from Hive for {}.{}.{}, using current time",
databaseName, tableName, partitionName, e);
return LocalDateTime.now(clock);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import com.expediagroup.beekeeper.scheduler.apiary.handler.MessageEventHandler;
import com.expediagroup.beekeeper.scheduler.apiary.messaging.BeekeeperEventReader;
import com.expediagroup.beekeeper.scheduler.apiary.messaging.RetryingMessageReader;
import com.expediagroup.beekeeper.scheduler.hive.PartitionIteratorFactory;
import com.expediagroup.beekeeper.scheduler.hive.HiveClientFactory;
import com.expediagroup.beekeeper.scheduler.service.SchedulerService;

import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;
Expand Down Expand Up @@ -110,7 +112,8 @@ public void validateUnreferencedHousekeepingPathMessageEventHandler() {

@Test
public void validateExpiredHousekeepingMetadataGenerator() {
HousekeepingEntityGenerator generator = commonBeans.expiredHousekeepingMetadataGenerator("P30D");
HiveClientFactory mockHiveClientFactory = mock(HiveClientFactory.class);
HousekeepingEntityGenerator generator = commonBeans.expiredHousekeepingMetadataGenerator("P30D", mockHiveClientFactory);
assertThat(generator).isInstanceOf(ExpiredHousekeepingMetadataGenerator.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@

import static com.expediagroup.beekeeper.core.model.LifecycleEventType.EXPIRED;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -50,6 +52,9 @@
import com.expediagroup.beekeeper.core.error.BeekeeperException;
import com.expediagroup.beekeeper.core.model.HousekeepingEntity;
import com.expediagroup.beekeeper.core.model.HousekeepingMetadata;
import com.expediagroup.beekeeper.scheduler.hive.HiveClient;
import com.expediagroup.beekeeper.scheduler.hive.HiveClientFactory;
import com.expediagroup.beekeeper.scheduler.hive.PartitionInfo;

@ExtendWith(MockitoExtension.class)
public class ExpiredHousekeepingMetadataGeneratorTest extends HousekeepingEntityGeneratorTestBase {
Expand All @@ -65,11 +70,13 @@ public class ExpiredHousekeepingMetadataGeneratorTest extends HousekeepingEntity
@Mock private AlterTableEvent alterTableEvent;
@Mock private AddPartitionEvent addPartitionEvent;
@Mock private AlterPartitionEvent alterPartitionEvent;
@Mock private HiveClientFactory hiveClientFactory;
@Mock private HiveClient hiveClient;
private ExpiredHousekeepingMetadataGenerator generator;

@BeforeEach
public void setup() {
generator = new ExpiredHousekeepingMetadataGenerator(cleanupDelayExtractor, clock);
generator = new ExpiredHousekeepingMetadataGenerator(cleanupDelayExtractor, clock, hiveClientFactory);
}

@Test
Expand Down Expand Up @@ -101,6 +108,8 @@ public void typicalHandleAddPartitionEvent() {
when(addPartitionEvent.getPartitionLocation()).thenReturn(PARTITION_PATH);
when(addPartitionEvent.getPartitionKeys()).thenReturn(PARTITION_KEYS);
when(addPartitionEvent.getPartitionValues()).thenReturn(PARTITION_VALUES);
when(hiveClientFactory.newInstance()).thenReturn(hiveClient);
when(hiveClient.getSinglePartitionInfo(DATABASE, TABLE, PARTITION_NAME)).thenReturn(Optional.empty());

List<HousekeepingEntity> housekeepingEntities = generator.generate(addPartitionEvent, CLIENT_ID);
assertThat(housekeepingEntities.size()).isEqualTo(1);
Expand All @@ -114,6 +123,8 @@ public void typicalHandleAlterPartitionEvent() {
when(alterPartitionEvent.getPartitionLocation()).thenReturn(PARTITION_PATH);
when(alterPartitionEvent.getPartitionKeys()).thenReturn(PARTITION_KEYS);
when(alterPartitionEvent.getPartitionValues()).thenReturn(PARTITION_VALUES);
when(hiveClientFactory.newInstance()).thenReturn(hiveClient);
when(hiveClient.getSinglePartitionInfo(DATABASE, TABLE, PARTITION_NAME)).thenReturn(Optional.empty());

List<HousekeepingEntity> housekeepingEntities = generator.generate(alterPartitionEvent, CLIENT_ID);
assertThat(housekeepingEntities.size()).isEqualTo(1);
Expand Down Expand Up @@ -142,6 +153,26 @@ public void exceptionThrownOnUnhandledEvent() {
}
}

@Test
public void usesPartitionCreationTimeFromHive() {
setupListenerEvent(addPartitionEvent, ADD_PARTITION);
when(cleanupDelayExtractor.extractCleanupDelay(addPartitionEvent)).thenReturn(CLEANUP_DELAY);
when(addPartitionEvent.getPartitionLocation()).thenReturn(PARTITION_PATH);
when(addPartitionEvent.getPartitionKeys()).thenReturn(PARTITION_KEYS);
when(addPartitionEvent.getPartitionValues()).thenReturn(PARTITION_VALUES);
when(hiveClientFactory.newInstance()).thenReturn(hiveClient);
LocalDateTime hiveCreationTime = LocalDateTime.of(2023, 6, 15, 10, 30);
PartitionInfo partitionInfo = new PartitionInfo(PARTITION_PATH, hiveCreationTime);
when(hiveClient.getSinglePartitionInfo(DATABASE, TABLE, PARTITION_NAME)).thenReturn(Optional.of(partitionInfo));

List<HousekeepingEntity> housekeepingEntities = generator.generate(addPartitionEvent, CLIENT_ID);

assertThat(housekeepingEntities.size()).isEqualTo(1);
HousekeepingMetadata metadata = (HousekeepingMetadata) housekeepingEntities.get(0);
assertThat(metadata.getCreationTimestamp()).isEqualTo(hiveCreationTime);
assertThat(metadata.getPartitionName()).isEqualTo(PARTITION_NAME);
}

private void assertExpiredHousekeepingMetadataEntity(HousekeepingEntity housekeepingEntity, String partitionName) {
HousekeepingMetadata housekeepingMetadata = (HousekeepingMetadata) housekeepingEntity;
assertHousekeepingEntity(housekeepingMetadata, EXPIRED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
Expand Down Expand Up @@ -75,6 +76,36 @@ public Map<String, PartitionInfo> getTablePartitionsInfo(String databaseName, St
}
}

/**
* Retrieves information for a single partition from Hive metastore.
*
* @param databaseName the name of the Hive database
* @param tableName the name of the Hive table
* @param partitionName the partition identifier in Hive's standard format: "key1=value1/key2=value2"
* (e.g., "event_date=2024-01-01/event_hour=1"), or null for table-level events
* @return Optional containing PartitionInfo if found, empty if partition doesn't exist, on error, or if partitionName is null
*/
public Optional<PartitionInfo> getSinglePartitionInfo(String databaseName, String tableName, String partitionName) {
if (partitionName == null) {
return Optional.empty();
}
try {
List<String> partitionValues = Warehouse.getPartValuesFromPartName(partitionName);
Partition partition = metaStoreClient.getPartition(databaseName, tableName, partitionValues);
String path = partition.getSd().getLocation();
LocalDateTime createTime = extractCreateTime(partition);

log.debug("Retrieved partition '{}' with path '{}' for table {}.{}",
partitionName, path, databaseName, tableName);

return Optional.of(new PartitionInfo(path, createTime));
} catch (TException e) {
log.warn("Failed to get partition info for {}.{}.{}",
databaseName, tableName, partitionName, e);
return Optional.empty();
}
}

private LocalDateTime extractCreateTime(Partition partition) {
if (partition.getCreateTime() > 0) {
return LocalDateTime.ofInstant(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
Expand Down Expand Up @@ -213,4 +214,56 @@ public void partitionInfoWithNegativeCreateTime() throws TException {
assertThat(createTime).isAfterOrEqualTo(beforeTest);
assertThat(createTime).isBeforeOrEqualTo(afterTest);
}

@Test
public void getSinglePartitionInfoSuccess() throws TException {
when(metaStoreClient.getPartition(DATABASE_NAME, TABLE_NAME, List.of("2024-01-01", "1"))).thenReturn(partition);
when(partition.getSd()).thenReturn(storageDescriptor);
when(storageDescriptor.getLocation()).thenReturn(PARTITION_PATH);
when(partition.getCreateTime()).thenReturn(1234567890);

Optional<PartitionInfo> partitionInfo = hiveClient.getSinglePartitionInfo(DATABASE_NAME, TABLE_NAME, PARTITION_NAME);

assertThat(partitionInfo).isPresent();
assertThat(partitionInfo.get().getPath()).isEqualTo(PARTITION_PATH);
assertThat(partitionInfo.get().getCreateTime())
.isEqualTo(LocalDateTime.ofInstant(Instant.ofEpochSecond(1234567890), ZoneId.systemDefault()));
}

@Test
public void getSinglePartitionInfoWithMissingCreateTime() throws TException {
when(metaStoreClient.getPartition(DATABASE_NAME, TABLE_NAME, List.of("2024-01-01", "1"))).thenReturn(partition);
when(partition.getSd()).thenReturn(storageDescriptor);
when(storageDescriptor.getLocation()).thenReturn(PARTITION_PATH);
when(partition.getCreateTime()).thenReturn(0);

LocalDateTime beforeTest = LocalDateTime.now();

Optional<PartitionInfo> partitionInfo = hiveClient.getSinglePartitionInfo(DATABASE_NAME, TABLE_NAME, PARTITION_NAME);

LocalDateTime afterTest = LocalDateTime.now();

assertThat(partitionInfo).isPresent();
assertThat(partitionInfo.get().getPath()).isEqualTo(PARTITION_PATH);
LocalDateTime createTime = partitionInfo.get().getCreateTime();
assertThat(createTime).isNotNull();
assertThat(createTime).isAfterOrEqualTo(beforeTest);
assertThat(createTime).isBeforeOrEqualTo(afterTest);
}

@Test
public void getSinglePartitionInfoNotFound() throws TException {
when(metaStoreClient.getPartition(DATABASE_NAME, TABLE_NAME, List.of("2024-01-01", "1"))).thenThrow(TException.class);

Optional<PartitionInfo> partitionInfo = hiveClient.getSinglePartitionInfo(DATABASE_NAME, TABLE_NAME, PARTITION_NAME);

assertThat(partitionInfo).isEmpty();
}

@Test
public void getSinglePartitionInfoWithNullPartitionName() {
Optional<PartitionInfo> partitionInfo = hiveClient.getSinglePartitionInfo(DATABASE_NAME, TABLE_NAME, null);

assertThat(partitionInfo).isEmpty();
}
}