Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ namespace DB
F(type_cancel_mpp_task, {{"type", "cancel_mpp_task"}}, ExpBuckets{0.0005, 2, 30}), \
F(type_run_mpp_task, {{"type", "run_mpp_task"}}, ExpBuckets{0.0005, 2, 30})) \
M(tiflash_coprocessor_response_bytes, "Total bytes of response body", Counter) \
M(tiflash_kvstore_lock_cf_size, "Total size of lock cf value in kvstore", Gauge) \
M(tiflash_kvstore_default_and_write_cf_size, "Total size of default cf and write cf value in kvstore", Gauge) \
M(tiflash_schema_version, "Current version of tiflash cached schema", Gauge) \
M(tiflash_schema_applying, "Whether the schema is applying or not (holding lock)", Gauge) \
M(tiflash_schema_apply_count, "Total number of each kinds of apply", Counter, F(type_diff, {"type", "diff"}), \
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Functions/GeoUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#endif

#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic ignored "-Wunused-but-set-variable"
#pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wunknown-warning-option"
Expand Down
23 changes: 23 additions & 0 deletions dbms/src/Storages/Transaction/BackgroundService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/TMTContext.h>
#include <Common/TiFlashMetrics.h>

namespace DB
{
Expand All @@ -36,6 +37,23 @@ BackgroundService::BackgroundService(TMTContext & tmt_)
},
false);

kvstore_metric_handle = background_pool.addTask(
[this] {
size_t lock_cf_size = 0;
size_t default_and_write_cf_size = 0;
size_t default_cf_num = 0;
tmt.getKVStore()->traverseRegions([&lock_cf_size, &default_and_write_cf_size, &default_cf_num](RegionID, const RegionPtr & region) {
lock_cf_size += region->lockInfoSize();
default_and_write_cf_size += region->dataSize();
default_cf_num += region->defaultCFCount();
});
GET_METRIC(tiflash_kvstore_lock_cf_size).Set(lock_cf_size);
GET_METRIC(tiflash_kvstore_default_and_write_cf_size).Set(default_and_write_cf_size);
LOG_FMT_INFO(log, "default cf num {}.", default_cf_num);
return false;
},
false);

if (!tmt.isBgFlushDisabled())
{
table_flush_handle = background_pool.addTask([this] {
Expand Down Expand Up @@ -120,6 +138,11 @@ BackgroundService::~BackgroundService()
background_pool.removeTask(storage_gc_handle);
storage_gc_handle = nullptr;
}
if (kvstore_metric_handle)
{
background_pool.removeTask(kvstore_metric_handle);
kvstore_metric_handle = nullptr;
}
}

} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/BackgroundService.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class BackgroundService : boost::noncopyable
BackgroundProcessingPool::TaskHandle table_flush_handle;
BackgroundProcessingPool::TaskHandle region_handle;
BackgroundProcessingPool::TaskHandle storage_gc_handle;
BackgroundProcessingPool::TaskHandle kvstore_metric_handle;
};

} // namespace DB
11 changes: 11 additions & 0 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,12 +434,23 @@ size_t Region::dataSize() const
return data.dataSize();
}

size_t Region::lockInfoSize() const
{
return data.lockCFDataSize();
}

size_t Region::writeCFCount() const
{
std::shared_lock<std::shared_mutex> lock(mutex);
return data.writeCF().getSize();
}

size_t Region::defaultCFCount() const
{
std::shared_lock<std::shared_mutex> lock(mutex);
return data.defaultCF().getSize();
}

std::string Region::dataInfo() const
{
std::shared_lock<std::shared_mutex> lock(mutex);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ class Region : public std::enable_shared_from_this<Region>
void setStateApplying();

size_t dataSize() const;
size_t lockInfoSize() const;
size_t writeCFCount() const;
size_t defaultCFCount() const;
std::string dataInfo() const;

void markCompactLog() const;
Expand Down
21 changes: 11 additions & 10 deletions dbms/src/Storages/Transaction/RegionCFDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,16 @@ RegionDataRes RegionCFDataBase<RegionLockCFDataTrait>::insert(TiKVKey && key, Ti
{
Pair kv_pair = RegionLockCFDataTrait::genKVPair(std::move(key), std::move(value));
// according to the process of pessimistic lock, just overwrite.
data.insert_or_assign(std::move(kv_pair.first), std::move(kv_pair.second));
return 0;
auto [it, ok] = data.insert_or_assign(std::move(kv_pair.first), std::move(kv_pair.second));
if (ok)
{
return calcTiKVKeyValueSize(it->second);
}
else
{
// TODO: calculate the updated size?
return 0;
}
}

template <typename Trait>
Expand All @@ -79,14 +87,7 @@ size_t RegionCFDataBase<Trait>::calcTiKVKeyValueSize(const Value & value)
template <typename Trait>
size_t RegionCFDataBase<Trait>::calcTiKVKeyValueSize(const TiKVKey & key, const TiKVValue & value)
{
if constexpr (std::is_same<Trait, RegionLockCFDataTrait>::value)
{
std::ignore = key;
std::ignore = value;
return 0;
}
else
return key.dataSize() + value.dataSize();
return key.dataSize() + value.dataSize();
}


Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/Transaction/RegionCFDataTrait.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include <map>

#include "absl/container/flat_hash_map.h"

namespace DB
{

Expand Down Expand Up @@ -95,7 +97,7 @@ struct RegionLockCFDataTrait
auto key = std::make_shared<const TiKVKey>(std::move(key_));
auto value = std::make_shared<const TiKVValue>(std::move(value_));
return {{key, std::string_view(key->data(), key->dataSize())},
Value{key, value, std::make_shared<const DecodedLockCFValue>(key, value)}};
Value{key, value, std::make_shared<const DecodedLockCFValue>(key, value)}};t
}
};

Expand Down
28 changes: 22 additions & 6 deletions dbms/src/Storages/Transaction/RegionData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void RegionData::insert(ColumnFamilyType cf, TiKVKey && key, TiKVValue && value)
}
case ColumnFamilyType::Lock:
{
lock_cf.insert(std::move(key), std::move(value));
lock_cf_data_size += lock_cf.insert(std::move(key), std::move(value));
return;
}
}
Expand Down Expand Up @@ -78,7 +78,8 @@ void RegionData::remove(ColumnFamilyType cf, const TiKVKey & key)
}
case ColumnFamilyType::Lock:
{
lock_cf.remove(RegionLockCFDataTrait::Key{nullptr, std::string_view(key.data(), key.dataSize())}, true);
lock_cf_data_size -= lock_cf.remove(RegionLockCFDataTrait::Key{nullptr, std::string_view(key.data(), key.dataSize())}, true);
lock_cf_data_size = std::max(lock_cf_data_size, 0);
return;
}
}
Expand Down Expand Up @@ -164,34 +165,45 @@ DecodedLockCFValuePtr RegionData::getLockInfo(const RegionLockReadQuery & query)
void RegionData::splitInto(const RegionRange & range, RegionData & new_region_data)
{
size_t size_changed = 0;
size_t lock_cf_size_changed = 0;
size_changed += default_cf.splitInto(range, new_region_data.default_cf);
size_changed += write_cf.splitInto(range, new_region_data.write_cf);
size_changed += lock_cf.splitInto(range, new_region_data.lock_cf);
lock_cf_size_changed += lock_cf.splitInto(range, new_region_data.lock_cf);
cf_data_size -= size_changed;
lock_cf_data_size -= lock_cf_size_changed;
new_region_data.cf_data_size += size_changed;
new_region_data.lock_cf_data_size += lock_cf_size_changed;
}

void RegionData::mergeFrom(const RegionData & ori_region_data)
{
size_t size_changed = 0;
size_t lock_cf_size_changed = 0;
size_changed += default_cf.mergeFrom(ori_region_data.default_cf);
size_changed += write_cf.mergeFrom(ori_region_data.write_cf);
size_changed += lock_cf.mergeFrom(ori_region_data.lock_cf);
lock_cf_size_changed += lock_cf.mergeFrom(ori_region_data.lock_cf);
cf_data_size += size_changed;
lock_cf_data_size += lock_cf_size_changed;
}

size_t RegionData::dataSize() const
{
return cf_data_size;
}

size_t RegionData::lockCFDataSize() const
{
return lock_cf_data_size;
}

void RegionData::assignRegionData(RegionData && new_region_data)
{
default_cf = std::move(new_region_data.default_cf);
write_cf = std::move(new_region_data.write_cf);
lock_cf = std::move(new_region_data.lock_cf);

cf_data_size = new_region_data.cf_data_size.load();
lock_cf_data_size = new_region_data.cf_data_size.load();
}

size_t RegionData::serialize(WriteBuffer & buf) const
Expand All @@ -208,11 +220,13 @@ size_t RegionData::serialize(WriteBuffer & buf) const
void RegionData::deserialize(ReadBuffer & buf, RegionData & region_data)
{
size_t total_size = 0;
size_t lock_cf_total_size = 0;
total_size += RegionDefaultCFData::deserialize(buf, region_data.default_cf);
total_size += RegionWriteCFData::deserialize(buf, region_data.write_cf);
total_size += RegionLockCFData::deserialize(buf, region_data.lock_cf);
lock_cf_total_size += RegionLockCFData::deserialize(buf, region_data.lock_cf);

region_data.cf_data_size += total_size;
region_data.lock_cf_data_size += lock_cf_total_size;
}

RegionWriteCFData & RegionData::writeCF()
Expand All @@ -239,14 +253,15 @@ const RegionLockCFData & RegionData::lockCF() const

bool RegionData::isEqual(const RegionData & r2) const
{
return default_cf == r2.default_cf && write_cf == r2.write_cf && lock_cf == r2.lock_cf && cf_data_size == r2.cf_data_size;
return default_cf == r2.default_cf && write_cf == r2.write_cf && lock_cf == r2.lock_cf && cf_data_size == r2.cf_data_size && lock_cf_data_size == r2.lock_cf_data_size;
}

RegionData::RegionData(RegionData && data)
: write_cf(std::move(data.write_cf))
, default_cf(std::move(data.default_cf))
, lock_cf(std::move(data.lock_cf))
, cf_data_size(data.cf_data_size.load())
, lock_cf_data_size(data.lock_cf_data_size.load())
{}

RegionData & RegionData::operator=(RegionData && rhs)
Expand All @@ -255,6 +270,7 @@ RegionData & RegionData::operator=(RegionData && rhs)
default_cf = std::move(rhs.default_cf);
lock_cf = std::move(rhs.lock_cf);
cf_data_size = rhs.cf_data_size.load();
lock_cf_data_size = rhs.lock_cf_data_size.load();
return *this;
}

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/Transaction/RegionData.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class RegionData

size_t dataSize() const;

size_t lockCFDataSize() const;

void assignRegionData(RegionData && new_region_data);

size_t serialize(WriteBuffer & buf) const;
Expand Down Expand Up @@ -83,6 +85,8 @@ class RegionData

// Size of data cf & write cf, without lock cf.
std::atomic<size_t> cf_data_size = 0;
// Size of lock cf.
std::atomic<size_t> lock_cf_data_size = 0;
};

} // namespace DB
Loading