Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.TickerType;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

Expand Down Expand Up @@ -75,6 +76,7 @@ public class RocksDBDAO {
private final String rocksDBBasePath;
private final transient ConcurrentHashMap<String, CustomSerializer<?>> columnFamilySerializers;
private transient WriteOptions defaultWriteOptions;
private transient Statistics statistics;
private final boolean disableWALForWrites;
@Getter
private long totalBytesWritten;
Expand Down Expand Up @@ -113,9 +115,10 @@ private void init() {
managedDescriptorMap = new ConcurrentHashMap<>();

// If already present, loads the existing column-family handles

final DBOptions dbOptions = new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true)
.setWalDir(rocksDBBasePath).setStatsDumpPeriodSec(300).setStatistics(new Statistics());
.setWalDir(rocksDBBasePath).setStatsDumpPeriodSec(300);
this.statistics = new Statistics();
dbOptions.setStatistics(statistics);
dbOptions.setLogger(new org.rocksdb.Logger(dbOptions) {
@Override
protected void log(InfoLogLevel infoLogLevel, String logMsg) {
Expand Down Expand Up @@ -465,6 +468,20 @@ public void addColumnFamily(String columnFamilyName) {
});
}

/**
* Retrieves a numeric property aggregated across all column families.
*/
public synchronized long getLongProperty(String property) throws RocksDBException {
return closed ? 0L : getRocksDB().getAggregatedLongProperty(property);
}

/**
* Retrieves the current ticker count.
*/
public synchronized long getTickerCount(TickerType tickerType) {
return closed || statistics == null ? 0L : statistics.getTickerCount(tickerType);
}

/**
* Note : Does not delete from underlying DB. Just closes the handle.
*
Expand Down Expand Up @@ -499,6 +516,10 @@ public synchronized void close() {
defaultWriteOptions.close();
}
getRocksDB().close();
if (statistics != null) {
statistics.close();
statistics = null;
}
try {
FileIOUtils.deleteDirectory(new File(rocksDBBasePath));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.metrics;

import org.apache.hudi.sink.partitioner.index.RocksDBIndexBackend;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.rocksdb.TickerType;

/**
* Metrics for RocksDB-backed index bootstrap state in flink bucket assign.
*/
public class FlinkRocksDBIndexMetrics extends HoodieFlinkMetrics {
private static final String TOTAL_SST_FILES_SIZE_PROPERTY = "rocksdb.total-sst-files-size";
private static final String LIVE_SST_FILES_SIZE_PROPERTY = "rocksdb.live-sst-files-size";
private static final String BLOCK_CACHE_CAPACITY_PROPERTY = "rocksdb.block-cache-capacity";
private static final String BLOCK_CACHE_USAGE_PROPERTY = "rocksdb.block-cache-usage";
private static final String ACTIVE_MEMTABLE_SIZE_PROPERTY = "rocksdb.cur-size-active-mem-table";
private static final String ALL_MEMTABLES_SIZE_PROPERTY = "rocksdb.cur-size-all-mem-tables";
private static final String IMMUTABLE_MEMTABLE_COUNT_PROPERTY = "rocksdb.num-immutable-mem-table";

public static final String ROCKSDB_DISK_TOTAL_SST_FILES_SIZE = "rocksdb.disk.total_sst_files_size";
public static final String ROCKSDB_DISK_LIVE_SST_FILES_SIZE = "rocksdb.disk.live_sst_files_size";
public static final String ROCKSDB_BLOCK_CACHE_CAPACITY = "rocksdb.block_cache.capacity";
public static final String ROCKSDB_BLOCK_CACHE_USAGE = "rocksdb.block_cache.usage";
public static final String ROCKSDB_BLOCK_CACHE_HIT_RATIO = "rocksdb.block_cache.hit_ratio";
public static final String ROCKSDB_BLOCK_CACHE_DATA_HIT_RATIO = "rocksdb.block_cache.data_hit_ratio";
public static final String ROCKSDB_BLOCK_CACHE_INDEX_HIT_RATIO = "rocksdb.block_cache.index_hit_ratio";
public static final String ROCKSDB_BLOCK_CACHE_FILTER_HIT_RATIO = "rocksdb.block_cache.filter_hit_ratio";

public static final String ROCKSDB_MEMTABLE_ACTIVE_SIZE = "rocksdb.memtable.active_size";
public static final String ROCKSDB_MEMTABLE_ALL_SIZE = "rocksdb.memtable.all_size";
public static final String ROCKSDB_MEMTABLE_IMMUTABLE_COUNT = "rocksdb.memtable.immutable_count";
public static final String ROCKSDB_MEMTABLE_HIT_RATIO = "rocksdb.memtable.hit_ratio";

private final RocksDBIndexBackend rocksDBIndexBackend;

public FlinkRocksDBIndexMetrics(MetricGroup metricGroup, RocksDBIndexBackend rocksDBIndexBackend) {
super(metricGroup);
this.rocksDBIndexBackend = rocksDBIndexBackend;
}

@Override
public void registerMetrics() {
// disk metric
metricGroup.gauge(ROCKSDB_DISK_TOTAL_SST_FILES_SIZE, (Gauge<Long>) this::getTotalSstFilesSize);
metricGroup.gauge(ROCKSDB_DISK_LIVE_SST_FILES_SIZE, (Gauge<Long>) this::getLiveSstFilesSize);

// block cache metric
metricGroup.gauge(ROCKSDB_BLOCK_CACHE_CAPACITY, (Gauge<Long>) this::getBlockCacheCapacity);
metricGroup.gauge(ROCKSDB_BLOCK_CACHE_USAGE, (Gauge<Long>) this::getBlockCacheUsage);
metricGroup.gauge(ROCKSDB_BLOCK_CACHE_HIT_RATIO, (Gauge<Double>) this::getBlockCacheHitRatio);
metricGroup.gauge(ROCKSDB_BLOCK_CACHE_DATA_HIT_RATIO, (Gauge<Double>) this::getBlockCacheDataHitRatio);
metricGroup.gauge(ROCKSDB_BLOCK_CACHE_INDEX_HIT_RATIO, (Gauge<Double>) this::getBlockCacheIndexHitRatio);
metricGroup.gauge(ROCKSDB_BLOCK_CACHE_FILTER_HIT_RATIO, (Gauge<Double>) this::getBlockCacheFilterHitRatio);

// mem-table metric
metricGroup.gauge(ROCKSDB_MEMTABLE_ACTIVE_SIZE, (Gauge<Long>) this::getActiveMemTableSize);
metricGroup.gauge(ROCKSDB_MEMTABLE_ALL_SIZE, (Gauge<Long>) this::getAllMemTablesSize);
metricGroup.gauge(ROCKSDB_MEMTABLE_IMMUTABLE_COUNT, (Gauge<Long>) this::getImmutableMemTableCount);
metricGroup.gauge(ROCKSDB_MEMTABLE_HIT_RATIO, (Gauge<Double>) this::getMemTableHitRatio);
}

private long getTotalSstFilesSize() {
return rocksDBIndexBackend.getLongMetric(TOTAL_SST_FILES_SIZE_PROPERTY);
}

private long getLiveSstFilesSize() {
return rocksDBIndexBackend.getLongMetric(LIVE_SST_FILES_SIZE_PROPERTY);
}

private long getBlockCacheCapacity() {
return rocksDBIndexBackend.getLongMetric(BLOCK_CACHE_CAPACITY_PROPERTY);
}

private long getBlockCacheUsage() {
return rocksDBIndexBackend.getLongMetric(BLOCK_CACHE_USAGE_PROPERTY);
}

private double getBlockCacheHitRatio() {
return rocksDBIndexBackend.getRatioMetric(TickerType.BLOCK_CACHE_HIT, TickerType.BLOCK_CACHE_MISS);
}

private double getBlockCacheDataHitRatio() {
return rocksDBIndexBackend.getRatioMetric(TickerType.BLOCK_CACHE_DATA_HIT, TickerType.BLOCK_CACHE_DATA_MISS);
}

private double getBlockCacheIndexHitRatio() {
return rocksDBIndexBackend.getRatioMetric(TickerType.BLOCK_CACHE_INDEX_HIT, TickerType.BLOCK_CACHE_INDEX_MISS);
}

private double getBlockCacheFilterHitRatio() {
return rocksDBIndexBackend.getRatioMetric(TickerType.BLOCK_CACHE_FILTER_HIT, TickerType.BLOCK_CACHE_FILTER_MISS);
}

private long getActiveMemTableSize() {
return rocksDBIndexBackend.getLongMetric(ACTIVE_MEMTABLE_SIZE_PROPERTY);
}

private long getAllMemTablesSize() {
return rocksDBIndexBackend.getLongMetric(ALL_MEMTABLES_SIZE_PROPERTY);
}

private long getImmutableMemTableCount() {
return rocksDBIndexBackend.getLongMetric(IMMUTABLE_MEMTABLE_COUNT_PROPERTY);
}

private double getMemTableHitRatio() {
return rocksDBIndexBackend.getRatioMetric(TickerType.MEMTABLE_HIT, TickerType.MEMTABLE_MISS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public void open(Configuration parameters) throws Exception {
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
this.indexBackend = IndexBackendFactory.create(conf, context, getRuntimeContext());
this.indexBackend.registerMetrics(getRuntimeContext().getMetricGroup());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.sink.event.Correspondent;

import org.apache.flink.metrics.MetricGroup;

import java.io.Closeable;
import java.io.IOException;

Expand Down Expand Up @@ -64,4 +66,13 @@ default void onCheckpoint(long checkpointId) {
default void onCheckpointComplete(Correspondent correspondent, long completedCheckpointId) {
// do nothing.
}

/**
* Registers metrics for this backend.
*
* @param metricGroup flink metric group
*/
default void registerMetrics(MetricGroup metricGroup) {
// do nothing.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,25 @@
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.serialization.CustomSerializer;
import org.apache.hudi.common.util.collection.RocksDBDAO;
import org.apache.hudi.metrics.FlinkRocksDBIndexMetrics;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.metrics.MetricGroup;
import org.rocksdb.RocksDBException;
import org.rocksdb.TickerType;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

/**
* An implementation of {@link IndexBackend} based on RocksDB.
*/
@Slf4j
public class RocksDBIndexBackend implements IndexBackend {
private static final String COLUMN_FAMILY = "index_cache";

private final RocksDBDAO rocksDBDAO;
private transient FlinkRocksDBIndexMetrics rocksDBIndexMetrics;

public RocksDBIndexBackend(String rocksDbBasePath) {
// Register custom serializer for HoodieRecordGlobalLocation to minimize storage overhead
Expand All @@ -52,6 +60,36 @@ public void update(String recordKey, HoodieRecordGlobalLocation recordGlobalLoca
this.rocksDBDAO.put(COLUMN_FAMILY, recordKey, recordGlobalLocation);
}

@Override
public void registerMetrics(MetricGroup metricGroup) {
if (rocksDBIndexMetrics != null) {
return;
}
this.rocksDBIndexMetrics = new FlinkRocksDBIndexMetrics(metricGroup, this);
this.rocksDBIndexMetrics.registerMetrics();
}

public long getLongMetric(String property) {
try {
return this.rocksDBDAO.getLongProperty(property);
} catch (RocksDBException | RuntimeException e) {
log.debug("Failed to read RocksDB metric property {}", property, e);
return 0L;
}
}

public double getRatioMetric(TickerType hitTicker, TickerType missTicker) {
try {
long hits = this.rocksDBDAO.getTickerCount(hitTicker);
long misses = this.rocksDBDAO.getTickerCount(missTicker);
long total = hits + misses;
return total == 0 ? 0D : (double) hits / total;
} catch (RuntimeException e) {
log.debug("Failed to read RocksDB ticker metrics {} and {}", hitTicker, missTicker, e);
return 0D;
}
}

@Override
public void close() throws IOException {
this.rocksDBDAO.close();
Expand Down
Loading