Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dbms/src/AggregateFunctions/AggregateFunctionGroupArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ struct GroupArrayListNodeBase
UInt64 size;
readVarUInt(size, buf);

Node * node = reinterpret_cast<Node *>(arena->alloc(sizeof(Node) + size));
Node * node = reinterpret_cast<Node *>(arena->alignedAlloc(sizeof(Node) + size, alignof(Node)));
node->size = size;
buf.read(node->data(), size);
return node;
Expand All @@ -198,7 +198,7 @@ struct GroupArrayListNodeString : public GroupArrayListNodeBase<GroupArrayListNo
{
StringRef string = static_cast<const ColumnString &>(column).getDataAt(row_num);

Node * node = reinterpret_cast<Node *>(arena->alloc(sizeof(Node) + string.size));
Node * node = reinterpret_cast<Node *>(arena->alignedAlloc(sizeof(Node) + string.size, alignof(Node)));
node->next = nullptr;
node->size = string.size;
memcpy(node->data(), string.data, string.size);
Expand All @@ -215,7 +215,7 @@ struct GroupArrayListNodeGeneral : public GroupArrayListNodeBase<GroupArrayListN

static Node * allocate(const IColumn & column, size_t row_num, Arena * arena)
{
const char * begin = arena->alloc(sizeof(Node));
const char * begin = arena->alignedAlloc(sizeof(Node), alignof(Node));
StringRef value = column.serializeValueIntoArena(row_num, *arena, begin);

Node * node = reinterpret_cast<Node *>(const_cast<char *>(begin));
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Columns/ColumnAggregateFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ void ColumnAggregateFunction::insert(const Field & x)

Arena & arena = createOrGetArena();

getData().push_back(arena.alloc(function->sizeOfData()));
getData().push_back(arena.alignedAlloc(function->sizeOfData(), function->alignOfData()));
function->create(getData().back());
ReadBufferFromString read_buffer(x.get<const String &>());
function->deserialize(getData().back(), read_buffer, &arena);
Expand All @@ -309,7 +309,7 @@ void ColumnAggregateFunction::insertDefault()

Arena & arena = createOrGetArena();

getData().push_back(arena.alloc(function->sizeOfData()));
getData().push_back(arena.alignedAlloc(function->sizeOfData(), function->alignOfData()));
function->create(getData().back());
}

Expand Down Expand Up @@ -337,7 +337,7 @@ const char * ColumnAggregateFunction::deserializeAndInsertFromArena(
*/
Arena & dst_arena = createOrGetArena();

getData().push_back(dst_arena.alloc(function->sizeOfData()));
getData().push_back(dst_arena.alignedAlloc(function->sizeOfData(), function->alignOfData()));
function->create(getData().back());

/** We will read from src_arena.
Expand Down
19 changes: 9 additions & 10 deletions dbms/src/DataTypes/DataTypeAggregateFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ std::string DataTypeAggregateFunction::getName() const

void DataTypeAggregateFunction::serializeBinary(const Field & field, WriteBuffer & ostr) const
{
const String & s = get<const String &>(field);
const auto & s = get<const String &>(field);
writeVarUInt(s.size(), ostr);
writeString(s, ostr);
}
Expand All @@ -73,7 +73,7 @@ void DataTypeAggregateFunction::deserializeBinary(Field & field, ReadBuffer & is
UInt64 size;
readVarUInt(size, istr);
field = String();
String & s = get<String &>(field);
auto & s = get<String &>(field);
s.resize(size);
istr.readStrict(&s[0], size);
}
Expand All @@ -85,11 +85,11 @@ void DataTypeAggregateFunction::serializeBinary(const IColumn & column, size_t r

void DataTypeAggregateFunction::deserializeBinary(IColumn & column, ReadBuffer & istr) const
{
ColumnAggregateFunction & column_concrete = static_cast<ColumnAggregateFunction &>(column);
auto & column_concrete = static_cast<ColumnAggregateFunction &>(column);

Arena & arena = column_concrete.createOrGetArena();
size_t size_of_state = function->sizeOfData();
AggregateDataPtr place = arena.alloc(size_of_state);
AggregateDataPtr place = arena.alignedAlloc(size_of_state, function->alignOfData());

function->create(place);
try
Expand Down Expand Up @@ -143,8 +143,7 @@ void DataTypeAggregateFunction::deserializeBinaryBulk(
{
if (istr.eof())
break;

AggregateDataPtr place = arena.alloc(size_of_state);
AggregateDataPtr place = arena.alignedAlloc(size_of_state, function->alignOfData());

function->create(place);

Expand All @@ -171,11 +170,11 @@ static String serializeToString(const AggregateFunctionPtr & function, const ICo

static void deserializeFromString(const AggregateFunctionPtr & function, IColumn & column, const String & s)
{
ColumnAggregateFunction & column_concrete = static_cast<ColumnAggregateFunction &>(column);
auto & column_concrete = static_cast<ColumnAggregateFunction &>(column);

Arena & arena = column_concrete.createOrGetArena();
size_t size_of_state = function->sizeOfData();
AggregateDataPtr place = arena.alloc(size_of_state);
AggregateDataPtr place = arena.alignedAlloc(size_of_state, function->alignOfData());

function->create(place);

Expand Down Expand Up @@ -317,7 +316,7 @@ static DataTypePtr create(const ASTPtr & arguments)
"name of aggregate function and list of data types for arguments",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

if (const ASTFunction * parametric = typeid_cast<const ASTFunction *>(arguments->children[0].get()))
if (const auto * parametric = typeid_cast<const ASTFunction *>(arguments->children[0].get()))
{
if (parametric->parameters)
throw Exception("Unexpected level of parameters to aggregate function", ErrorCodes::SYNTAX_ERROR);
Expand All @@ -328,7 +327,7 @@ static DataTypePtr create(const ASTPtr & arguments)

for (size_t i = 0; i < parameters.size(); ++i)
{
const ASTLiteral * lit = typeid_cast<const ASTLiteral *>(parameters[i].get());
const auto * lit = typeid_cast<const ASTLiteral *>(parameters[i].get());
if (!lit)
throw Exception(
"Parameters to aggregate functions must be literals",
Expand Down
94 changes: 0 additions & 94 deletions dbms/src/Interpreters/AggregationCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,100 +235,6 @@ static inline UInt128 ALWAYS_INLINE hash128(
return key;
}


/// Copy keys to the pool. Then put into pool StringRefs to them and return the pointer to the first.
static inline StringRef * ALWAYS_INLINE placeKeysInPool(size_t keys_size, StringRefs & keys, Arena & pool)
{
for (size_t j = 0; j < keys_size; ++j)
{
char * place = pool.alloc(keys[j].size);
memcpy(place, keys[j].data, keys[j].size); /// TODO padding in Arena and memcpySmall
keys[j].data = place;
}

/// Place the StringRefs on the newly copied keys in the pool.
char * res = pool.alloc(keys_size * sizeof(StringRef));
memcpy(res, keys.data(), keys_size * sizeof(StringRef));

return reinterpret_cast<StringRef *>(res);
}

/*
/// Copy keys to the pool. Then put into pool StringRefs to them and return the pointer to the first.
static inline StringRef * ALWAYS_INLINE extractKeysAndPlaceInPool(
size_t i,
size_t keys_size,
const ColumnRawPtrs & key_columns,
StringRefs & keys,
Arena & pool)
{
for (size_t j = 0; j < keys_size; ++j)
{
keys[j] = key_columns[j]->getDataAtWithTerminatingZero(i);
char * place = pool.alloc(keys[j].size);
memcpy(place, keys[j].data, keys[j].size);
keys[j].data = place;
}

/// Place the StringRefs on the newly copied keys in the pool.
char * res = pool.alloc(keys_size * sizeof(StringRef));
memcpy(res, keys.data(), keys_size * sizeof(StringRef));

return reinterpret_cast<StringRef *>(res);
}


/// Copy the specified keys to a continuous memory chunk of a pool.
/// Subsequently append StringRef objects referring to each key.
///
/// [key1][key2]...[keyN][ref1][ref2]...[refN]
/// ^ ^ : | |
/// +-----|--------:-----+ |
/// : +--------:-----------+
/// : :
/// <-------------->
/// (1)
///
/// Return a StringRef object, referring to the area (1) of the memory
/// chunk that contains the keys. In other words, we ignore their StringRefs.
inline StringRef ALWAYS_INLINE extractKeysAndPlaceInPoolContiguous(
size_t i,
size_t keys_size,
const ColumnRawPtrs & key_columns,
StringRefs & keys,
const TiDB::TiDBCollators & collators,
std::vector<std::string> & sort_key_containers,
Arena & pool)
{
size_t sum_keys_size = 0;
for (size_t j = 0; j < keys_size; ++j)
{
keys[j] = key_columns[j]->getDataAtWithTerminatingZero(i);
if (!collators.empty() && collators[j] != nullptr)
{
// todo check if need to handle the terminating zero
keys[j] = collators[j]->sortKey(keys[j].data, keys[j].size - 1, sort_key_containers[j]);
}
sum_keys_size += keys[j].size;
}

char * res = pool.alloc(sum_keys_size + keys_size * sizeof(StringRef));
char * place = res;

for (size_t j = 0; j < keys_size; ++j)
{
memcpy(place, keys[j].data, keys[j].size);
keys[j].data = place;
place += keys[j].size;
}

/// Place the StringRefs on the newly copied keys in the pool.
memcpy(place, keys.data(), keys_size * sizeof(StringRef));

return {res, sum_keys_size};
}
*/

/** Serialize keys into a continuous chunk of memory.
*/
static inline StringRef ALWAYS_INLINE serializeKeysToPoolContiguous(
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/JoinPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void RowsNotInsertToMap::insertRow(Block * stored_block, size_t index, bool need
}
else
{
auto * elem = reinterpret_cast<RowRefList *>(pool.alloc(sizeof(RowRefList)));
auto * elem = reinterpret_cast<RowRefList *>(pool.alignedAlloc(sizeof(RowRefList), alignof(RowRefList)));
new (elem) RowRefList(stored_block, index);
insertRowToList(&head, elem);
}
Expand Down Expand Up @@ -496,7 +496,7 @@ struct Inserter<ASTTableJoin::Strictness::All, Map, KeyGetter>
* We will insert each time the element into the second place.
* That is, the former second element, if it was, will be the third, and so on.
*/
auto elem = reinterpret_cast<MappedType *>(pool.alloc(sizeof(MappedType)));
auto elem = reinterpret_cast<MappedType *>(pool.alignedAlloc(sizeof(MappedType), alignof(MappedType)));
new (elem) typename Map::mapped_type(stored_block, i);
insertRowToList(&emplace_result.getMapped(), elem);
}
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/SpecializedAggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ void NO_INLINE Aggregator::executeSpecializedCase(

method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);

AggregateDataPtr place = aggregates_pool->alloc(total_size_of_aggregate_states);
AggregateDataPtr place
= aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);

AggregateFunctionsList::forEach(
AggregateFunctionsCreator(aggregate_functions, offsets_of_aggregate_states, place));
Expand Down