diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.cpp b/heron/common/src/cpp/config/heron-internals-config-reader.cpp index 5ffe3dd8187..231f65bb1ac 100644 --- a/heron/common/src/cpp/config/heron-internals-config-reader.cpp +++ b/heron/common/src/cpp/config/heron-internals-config-reader.cpp @@ -200,6 +200,10 @@ sp_int32 HeronInternalsConfigReader::GetHeronStreammgrCacheDrainSizeMb() { return config_[HeronInternalsConfigVars::HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB].as(); } +sp_int32 HeronInternalsConfigReader::GetHeronStreammgrMempoolSizeMb() { + return config_[HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_SIZE_MB].as(); +} + sp_int32 HeronInternalsConfigReader::GetHeronStreammgrXormgrRotatingmapNbuckets() { return config_[HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS].as(); } diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.h b/heron/common/src/cpp/config/heron-internals-config-reader.h index c19ea9d702a..4102cf59e81 100644 --- a/heron/common/src/cpp/config/heron-internals-config-reader.h +++ b/heron/common/src/cpp/config/heron-internals-config-reader.h @@ -152,6 +152,9 @@ class HeronInternalsConfigReader : public YamlFileReader { // The sized based threshold in MB for draining the tuple cache sp_int32 GetHeronStreammgrCacheDrainSizeMb(); + // The max size of the memory pool for all types of messages + sp_int32 GetHeronStreammgrMempoolSizeMb(); + // Get the Nbucket value, for efficient acknowledgement sp_int32 GetHeronStreammgrXormgrRotatingmapNbuckets(); diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.cpp b/heron/common/src/cpp/config/heron-internals-config-vars.cpp index fd2ce84cc19..385f4d71be2 100644 --- a/heron/common/src/cpp/config/heron-internals-config-vars.cpp +++ b/heron/common/src/cpp/config/heron-internals-config-vars.cpp @@ -88,6 +88,8 @@ const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CACHE_DRAIN_FREQUENCY_ "heron.streammgr.cache.drain.frequency.ms"; const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB = "heron.streammgr.cache.drain.size.mb"; +const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_SIZE_MB = + "heron.streammgr.mempool.size.mb"; const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS = "heron.streammgr.xormgr.rotatingmap.nbuckets"; const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CLIENT_RECONNECT_INTERVAL_SEC = diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.h b/heron/common/src/cpp/config/heron-internals-config-vars.h index 8d4ab540cef..6de1eb9eeaa 100644 --- a/heron/common/src/cpp/config/heron-internals-config-vars.h +++ b/heron/common/src/cpp/config/heron-internals-config-vars.h @@ -140,6 +140,9 @@ class HeronInternalsConfigVars { // The sized based threshold in MB for draining the tuple cache static const sp_string HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB; + // The max size of the memory pool for all types of messages + static const sp_string HERON_STREAMMGR_MEMPOOL_SIZE_MB; + // For efficient acknowledgement static const sp_string HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS; diff --git a/heron/common/src/cpp/network/BUILD b/heron/common/src/cpp/network/BUILD index 222da5d2330..493d37f5475 100644 --- a/heron/common/src/cpp/network/BUILD +++ b/heron/common/src/cpp/network/BUILD @@ -19,6 +19,7 @@ cc_library( "networkoptions.cpp", "packet.cpp", "server.cpp", + "mempool.cpp", "regevent.h", "asyncdns.h", diff --git a/heron/common/src/cpp/network/client.h b/heron/common/src/cpp/network/client.h index 7c640f2b259..9cd22885c2e 100644 --- a/heron/common/src/cpp/network/client.h +++ b/heron/common/src/cpp/network/client.h @@ -174,9 +174,6 @@ class Client : public BaseClient { // Return the underlying EventLoop. EventLoop* getEventLoop() { return eventLoop_; } - // TODO(mfu): - MemPool _heron_message_pool; - template void release(M* m) { _heron_message_pool.release(m); @@ -214,6 +211,8 @@ class Client : public BaseClient { virtual void StopBackPressureConnectionCb(Connection* _connection); private: + MemPool _heron_message_pool; + //! Imlement methods of BaseClient virtual BaseConnection* CreateConnection(ConnectionEndPoint* endpoint, ConnectionOptions* options, EventLoop* eventLoop); diff --git a/heron/common/src/cpp/network/mempool.cpp b/heron/common/src/cpp/network/mempool.cpp new file mode 100644 index 00000000000..d8e2718fcc9 --- /dev/null +++ b/heron/common/src/cpp/network/mempool.cpp @@ -0,0 +1,26 @@ +/* + * Copyright 2016 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "network/mempool.h" +#include + +// All types of BaseMemPool should be derived from google::protobuf::Message +// otherwise this would not work. +template<> +sp_int32 BaseMemPool::size_ = 0; + +template<> +sp_int32 BaseMemPool::limit_ = 0; diff --git a/heron/common/src/cpp/network/mempool.h b/heron/common/src/cpp/network/mempool.h index a8467dfcbe1..77b166d4857 100644 --- a/heron/common/src/cpp/network/mempool.h +++ b/heron/common/src/cpp/network/mempool.h @@ -16,76 +16,89 @@ #ifndef MEM_POOL_H #define MEM_POOL_H -#include -#include +#include #include +#include +#include "basics/sptypes.h" template class BaseMemPool { public: - template - T* acquire(Args&&... args) { + static void set_limit(sp_int32 limit) { + if (limit_ == 0) + limit_ = limit; + } + + BaseMemPool() { + } + + ~BaseMemPool() { + for (auto& p : pool_) { + delete p; + } + pool_.clear(); + } + + template + S* acquire(S* unused) { if (pool_.empty()) { - return new T(std::forward(args)...); + return new S(); } - T* t = pool_.back(); + S* t = static_cast(pool_.back()); pool_.pop_back(); + size_ -= sizeof(S); return t; } - void release(T* t) { + + template + void release(S* t) { + if (limit_ == 0) { + delete t; + return; + } + if (size_ > limit_) { + auto first = pool_.front(); + pool_.pop_front(); + size_ -= sizeof(S); + delete first; + } pool_.push_back(t); + size_ += sizeof(S); } - BaseMemPool() { - } - ~BaseMemPool() { - for (auto& p : pool_) { - delete p; - } - pool_.clear(); - } + private: - std::vector pool_; + static sp_int32 limit_; + static sp_int32 size_; + std::deque pool_; }; -template +template class MemPool { public: MemPool() { } - // TODO(cwang): we have a memory leak here. ~MemPool() { - for (auto& m : map_) { - for (auto& n : m.second) { - delete n; - } - m.second.clear(); - } map_.clear(); } - template - M* acquire(M* m) { - std::type_index type = typeid(M); - std::vector& pool = map_[type]; - - if (pool.empty()) { - return new M(); - } - B* t = pool.back(); - pool.pop_back(); - return static_cast(t); + template + KeyType* acquire(KeyType* m) { + std::type_index type = typeid(KeyType); + auto& pool = map_[type]; + return pool.acquire(m); } - template - void release(M* ptr) { - std::type_index type = typeid(M); - map_[type].push_back(static_cast(ptr)); + template + void release(KeyType* ptr) { + std::type_index type = typeid(KeyType); + auto& pool = map_[type]; + pool.release(ptr); } private: - std::unordered_map> map_; + std::unordered_map> map_; }; #endif diff --git a/heron/common/src/cpp/network/server.h b/heron/common/src/cpp/network/server.h index ed78e03fe1d..bbf90f45f3f 100644 --- a/heron/common/src/cpp/network/server.h +++ b/heron/common/src/cpp/network/server.h @@ -208,8 +208,6 @@ class Server : public BaseServer { // Called when the connection is closed virtual void HandleConnectionClose_Base(BaseConnection* connection, NetworkErrorCode _status); - MemPool _heron_message_pool; - template void release(M* m) { _heron_message_pool.release(m); @@ -221,6 +219,8 @@ class Server : public BaseServer { } private: + MemPool _heron_message_pool; + // When a new packet arrives on the connection, this is invoked by the Connection void OnNewPacket(Connection* connection, IncomingPacket* packet); diff --git a/heron/config/src/yaml/conf/aurora/heron_internals.yaml b/heron/config/src/yaml/conf/aurora/heron_internals.yaml index 6abf8f539d0..57f11589a07 100644 --- a/heron/config/src/yaml/conf/aurora/heron_internals.yaml +++ b/heron/config/src/yaml/conf/aurora/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# The max size of the memory pool for all types of messages +heron.streammgr.mempool.size.mb: 100 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/examples/heron_internals.yaml b/heron/config/src/yaml/conf/examples/heron_internals.yaml index 6abf8f539d0..57f11589a07 100644 --- a/heron/config/src/yaml/conf/examples/heron_internals.yaml +++ b/heron/config/src/yaml/conf/examples/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# The max size of the memory pool for all types of messages +heron.streammgr.mempool.size.mb: 100 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/local/heron_internals.yaml b/heron/config/src/yaml/conf/local/heron_internals.yaml index ab44597c87f..9281ac43855 100644 --- a/heron/config/src/yaml/conf/local/heron_internals.yaml +++ b/heron/config/src/yaml/conf/local/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# The max size of the memory pool for all types of messages +heron.streammgr.mempool.size.mb: 100 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/localzk/heron_internals.yaml b/heron/config/src/yaml/conf/localzk/heron_internals.yaml index 6abf8f539d0..57f11589a07 100644 --- a/heron/config/src/yaml/conf/localzk/heron_internals.yaml +++ b/heron/config/src/yaml/conf/localzk/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# The max size of the memory pool for all types of messages +heron.streammgr.mempool.size.mb: 100 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/marathon/heron_internals.yaml b/heron/config/src/yaml/conf/marathon/heron_internals.yaml index 6abf8f539d0..57f11589a07 100644 --- a/heron/config/src/yaml/conf/marathon/heron_internals.yaml +++ b/heron/config/src/yaml/conf/marathon/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# The max size of the memory pool for all types of messages +heron.streammgr.mempool.size.mb: 100 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/mesos/heron_internals.yaml b/heron/config/src/yaml/conf/mesos/heron_internals.yaml index 6abf8f539d0..57f11589a07 100644 --- a/heron/config/src/yaml/conf/mesos/heron_internals.yaml +++ b/heron/config/src/yaml/conf/mesos/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# The max size of the memory pool for all types of messages +heron.streammgr.mempool.size.mb: 100 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/slurm/heron_internals.yaml b/heron/config/src/yaml/conf/slurm/heron_internals.yaml index b19ba51fbb1..b3383dd63bd 100644 --- a/heron/config/src/yaml/conf/slurm/heron_internals.yaml +++ b/heron/config/src/yaml/conf/slurm/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# The max size of the memory pool for all types of messages +heron.streammgr.mempool.size.mb: 100 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/test/test_heron_internals.yaml b/heron/config/src/yaml/conf/test/test_heron_internals.yaml index b0f449b5aae..cbe24af4390 100644 --- a/heron/config/src/yaml/conf/test/test_heron_internals.yaml +++ b/heron/config/src/yaml/conf/test/test_heron_internals.yaml @@ -49,6 +49,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgement heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# The max size of the memory pool for all types of messages +heron.streammgr.mempool.size.mb: 100 + # The reconnect interval to other stream managers in second for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/yarn/heron_internals.yaml b/heron/config/src/yaml/conf/yarn/heron_internals.yaml index 6abf8f539d0..57f11589a07 100644 --- a/heron/config/src/yaml/conf/yarn/heron_internals.yaml +++ b/heron/config/src/yaml/conf/yarn/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# The max size of the memory pool for all types of messages +heron.streammgr.mempool.size.mb: 100 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp index fced037edd0..53d9f933573 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp @@ -26,6 +26,7 @@ #include "network/network.h" #include "config/helper.h" #include "metrics/metrics.h" +#include "config/heron-internals-config-reader.h" namespace heron { namespace stmgr { diff --git a/heron/stmgr/src/cpp/server/stmgr-main.cpp b/heron/stmgr/src/cpp/server/stmgr-main.cpp index 79404b15802..e6e6272c556 100644 --- a/heron/stmgr/src/cpp/server/stmgr-main.cpp +++ b/heron/stmgr/src/cpp/server/stmgr-main.cpp @@ -70,6 +70,10 @@ int main(int argc, char* argv[]) { LOG(FATAL) << "Corrupt topology defn file" << std::endl; } + sp_int32 pool_limit = + heron::config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrMempoolSizeMb(); + BaseMemPool::set_limit(pool_limit * 1024 * 1024); + heron::stmgr::StMgr mgr(&ss, myport, topology_name, topology_id, topology, myid, instances, zkhostportlist, topdir, metricsmgr_port, shell_port); mgr.Init(); diff --git a/heron/stmgr/src/cpp/util/tuple-cache.h b/heron/stmgr/src/cpp/util/tuple-cache.h index adf7e47bbf0..81a70fc08a1 100644 --- a/heron/stmgr/src/cpp/util/tuple-cache.h +++ b/heron/stmgr/src/cpp/util/tuple-cache.h @@ -74,7 +74,8 @@ class TupleCache { std::function _drainer); proto::system::HeronTupleSet2* acquire() { - return heron_tuple_set_pool_.acquire(); + proto::system::HeronTupleSet2* unused = nullptr; + return heron_tuple_set_pool_.acquire(unused); } proto::system::HeronTupleSet2* acquire_clean_set() { @@ -88,7 +89,7 @@ class TupleCache { } private: - BaseMemPool heron_tuple_set_pool_; + BaseMemPool heron_tuple_set_pool_; std::deque tuples_; proto::system::HeronTupleSet2* current_; sp_uint64 current_size_;