From 7046dfe16aa8938adad18eb4a7af3c445856d9ef Mon Sep 17 00:00:00 2001 From: Nik Date: Thu, 29 Aug 2019 13:22:48 +0300 Subject: [PATCH 01/16] make human face for connector --- connection.h | 11 +++--- connector.cpp | 46 ++++++++++++++++++++++ connector.h | 82 +++++++++++++++++++++++++++++++++++++++ mp_reader.h | 16 +++++++- mp_writer.h | 12 +++++- tntevloop.cpp | 104 ++++++++++++++++++++++++++++++++++++++++++++++++++ tntevloop.h | 35 +++++++++++++++++ 7 files changed, 298 insertions(+), 8 deletions(-) create mode 100644 connector.cpp create mode 100644 connector.h create mode 100644 tntevloop.cpp create mode 100644 tntevloop.h diff --git a/connection.h b/connection.h index 6a90f09..9ed5647 100644 --- a/connection.h +++ b/connection.h @@ -9,7 +9,7 @@ #include #include "wtf_buffer.h" #include "unique_socket.h" -#include "fu2/function2.hpp" +#include "./third_party/fu2/function2.hpp" #include "cs_parser.h" /// Tarantool connector scope @@ -88,10 +88,6 @@ class connection fu2::unique_function _error_cb = nullptr; - void handle_error(std::string_view message = {}, - error internal_error = error::system, - uint32_t db_error = 0) noexcept; - // need to capture watcher, so movable functions preferred fu2::unique_function _socket_watcher_request_cb; fu2::unique_function _on_notify_request; @@ -100,6 +96,11 @@ class connection fu2::unique_function _connected_cb; fu2::unique_function _disconnected_cb; +protected: + void handle_error(std::string_view message = {}, + error internal_error = error::system, + uint32_t db_error = 0) noexcept; + public: connection(std::string_view connection_string = {}); ~connection(); diff --git a/connector.cpp b/connector.cpp new file mode 100644 index 0000000..6218ecc --- /dev/null +++ b/connector.cpp @@ -0,0 +1,46 @@ +#include "connector.h" + +#include "proto.h" + +namespace tnt +{ + + Connector::Connector() + : iproto_writer(*(connection*)this) + { + on_response(bind(&Connector::OnResponse, this, placeholders::_1)); + } + + void Connector::OnResponse(wtf_buffer &buf) + { + mp_reader bunch(buf); + while (mp_reader r = bunch.iproto_message()) + { + try + { + auto encoded_header = r.map(); + Header header; + encoded_header[tnt::header_field::SYNC] >> header.sync; + encoded_header[tnt::header_field::CODE] >> header.errCode; + auto handler = handlers_.find(header.sync); + if (handler == handlers_.end()) + handle_error("unexpected response"); + else + { + auto encoded_body = r.map(); + handler->second(header, encoded_body); + } + } + catch(const mp_reader_error &e) + { + handle_error(e.what()); + } + catch(const exception &e) + { + handle_error(e.what()); + } + } + input_processed(); + } + +} diff --git a/connector.h b/connector.h new file mode 100644 index 0000000..cb6d4b6 --- /dev/null +++ b/connector.h @@ -0,0 +1,82 @@ +#pragma once + +#include +#include + +#include "tntevloop.h" +#include "mp_writer.h" +#include "mp_reader.h" + +using namespace std; + +namespace tnt +{ + + class Stream + { + public: + + }; + + class Header + { + public: + uint64_t sync; + uint64_t errCode; + }; + + + //template + //void write(){}; + + class Connector : public TntEvLoop, public iproto_writer + { + public: + class FuncParamTuple + { + public: + template + FuncParamTuple(Args... args) + { + //std::make_tuple(args...); + } + + }; + + typedef fu2::unique_function OnFuncResult; + + Connector(); + + template + void Call(string_view name, OnFuncResult&& resultHundler, Args... args) + { + constexpr size_t countArgs = sizeof...(args); + begin_call(name); + begin_array(countArgs); + write(args...); + if (countArgs) + finalize(); + finalize(); + handlers_[last_request_id()] = move(resultHundler); + flush(); + } + + void write() + {} + + template + void write(T arg, Args... args) + { + *this< handlers_; + + void OnResponse(wtf_buffer &buf); + + }; + + +} diff --git a/mp_reader.h b/mp_reader.h index 1cdd62e..f142cce 100644 --- a/mp_reader.h +++ b/mp_reader.h @@ -4,7 +4,9 @@ #include #include #include -#include "msgpuck/msgpuck.h" +#include + +#include "./third_party/msgpuck/msgpuck.h" class wtf_buffer; class mp_map_reader; @@ -100,7 +102,7 @@ class mp_reader } else { - throw mp_reader_error("integer expected", *this); + throw mp_reader_error("integer expected but get " + std::to_string(int(mp_typeof(*_current_pos))) , *this); } throw mp_reader_error("value overflow", *this); } @@ -157,4 +159,14 @@ class mp_array_reader : public mp_reader size_t _cardinality; }; +template +mp_reader& operator>> (mp_reader& reader, std::vector& val) +{ + auto vector = reader.array(); + val.resize(vector.cardinality()); + for (size_t i = 0; i < vector.cardinality(); ++i) + vector >> val[i]; + return reader; +} + #endif // MP_READER_H diff --git a/mp_writer.h b/mp_writer.h index d9844c6..f22badf 100644 --- a/mp_writer.h +++ b/mp_writer.h @@ -3,7 +3,7 @@ #include #include -#include "msgpuck/msgpuck.h" +#include "./third_party/msgpuck/msgpuck.h" #include "wtf_buffer.h" namespace tnt { @@ -48,6 +48,16 @@ class mp_writer return *this; } + template + mp_writer& operator<< (const std::vector &val) noexcept + { + begin_array(val.size()); + for( auto& it : val ) + *this< && sizeof(T) < 16>> mp_writer& operator<< (const T &val) noexcept { diff --git a/tntevloop.cpp b/tntevloop.cpp new file mode 100644 index 0000000..0ef86cd --- /dev/null +++ b/tntevloop.cpp @@ -0,0 +1,104 @@ +#include "tntevloop.h" + +#include +#include + +using namespace std; + +namespace tnt +{ + + TntEvLoop::TntEvLoop() + { + on_opened(bind(&TntEvLoop::OnConnected, this)); + } + + void TntEvLoop::Attach(struct ev_loop* loop) + { + loop_ = loop; + + socketWatcher_.data = this; + ev_init(&socketWatcher_, OnSocketEvent_); + on_socket_watcher_request(bind(&TntEvLoop::OnSocketWatcherRequest, this, placeholders::_1)); + + ev_async_init(&asyncNotifier_, OnAsyncNotifier_); + asyncNotifier_.data = this; + ev_async_start(loop, &asyncNotifier_); + + ev_timer_init(&timer_, OnTimer_, 1, 1); + timer_.data = this; + ev_timer_start(loop, &timer_); + + on_notify_request(bind(ev_async_send, loop, &asyncNotifier_)); + + } + + void TntEvLoop::OnSocketWatcherRequest(int mode) + { + int events = (mode & tnt::socket_state::read ? EV_READ : EV_NONE); + events |= (mode & tnt::socket_state::write ? EV_WRITE : EV_NONE); + + if ((socketWatcher_.events & (EV_READ | EV_WRITE)) != events) + { + if (ev_is_active(&socketWatcher_)) + ev_io_stop(loop_, &socketWatcher_); + ev_io_set(&socketWatcher_, socket_handle(), events); + if (events) + ev_io_start(loop_, &socketWatcher_); + + if (mode & tnt::socket_state::read) + cout << "R"; + if (mode & tnt::socket_state::write) + cout << "W"; + if (!mode) + cout << "N"; + cout << endl; + } + + } + + void TntEvLoop::OnSocketEvent_(struct ev_loop* loop, ev_io* w, int revents) + { + ((TntEvLoop*)w->data)->OnSocketEvent(loop, w, revents); + } + + void TntEvLoop::OnSocketEvent(struct ev_loop* loop, ev_io* w, int revents) + { + (void)loop;(void)w; + if (revents & EV_ERROR) + handle_error("EV_ERROR soket state received"); + + if (revents & EV_WRITE) + write(); + if (revents & EV_READ) + read(); + } + + void TntEvLoop::OnAsyncNotifier_(struct ev_loop* loop, ev_async* w, int revents) + { + ((TntEvLoop*)w->data)->OnAsyncNotifier(loop, w, revents); + } + + void TntEvLoop::OnAsyncNotifier(struct ev_loop* loop, ev_async* w, int revents) + { + (void)loop;(void)w;(void)revents; + acquire_notifications(); + } + + void TntEvLoop::OnConnected() + { + } + + void TntEvLoop::OnTimer_(struct ev_loop* loop, ev_timer* w, int revents) + { + ((TntEvLoop*)w->data)->OnTimer(loop, w, revents); + } + + void TntEvLoop::OnTimer(struct ev_loop*, ev_timer*, int) + { + tick_1sec(); + } + + + +} diff --git a/tntevloop.h b/tntevloop.h new file mode 100644 index 0000000..7069bb5 --- /dev/null +++ b/tntevloop.h @@ -0,0 +1,35 @@ +#pragma once + + +#include + +#include "connection.h" + +namespace tnt +{ + + class TntEvLoop : public connection + { + public: + TntEvLoop(); + void Attach(struct ev_loop* loop); + + protected: + struct ev_loop* loop_; + ev_io socketWatcher_; + ev_async asyncNotifier_; + ev_timer timer_; + + static void OnSocketEvent_(struct ev_loop* loop, ev_io* w, int revents); + void OnSocketEvent(struct ev_loop* loop, ev_io* w, int revents); + static void OnAsyncNotifier_(struct ev_loop* loop, ev_async* w, int revents); + void OnAsyncNotifier(struct ev_loop* loop, ev_async* w, int revents); + static void OnTimer_(struct ev_loop* loop, ev_timer* w, int revents); + void OnTimer(struct ev_loop* loop, ev_timer* w, int revents); + + void OnSocketWatcherRequest(int mode); + void OnConnected(); + + }; + +} From 247bea7b522b6783e5ef8a06460531bd48189c2a Mon Sep 17 00:00:00 2001 From: Nik Date: Thu, 29 Aug 2019 14:10:36 +0300 Subject: [PATCH 02/16] degradation --- connection.h | 9 +++++---- mp_reader.h | 14 +++++++++++++- mp_writer.h | 10 ++++++++++ 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/connection.h b/connection.h index 6a90f09..25b2e5e 100644 --- a/connection.h +++ b/connection.h @@ -88,10 +88,6 @@ class connection fu2::unique_function _error_cb = nullptr; - void handle_error(std::string_view message = {}, - error internal_error = error::system, - uint32_t db_error = 0) noexcept; - // need to capture watcher, so movable functions preferred fu2::unique_function _socket_watcher_request_cb; fu2::unique_function _on_notify_request; @@ -100,6 +96,11 @@ class connection fu2::unique_function _connected_cb; fu2::unique_function _disconnected_cb; +protected: + void handle_error(std::string_view message = {}, + error internal_error = error::system, + uint32_t db_error = 0) noexcept; + public: connection(std::string_view connection_string = {}); ~connection(); diff --git a/mp_reader.h b/mp_reader.h index 1cdd62e..43446ac 100644 --- a/mp_reader.h +++ b/mp_reader.h @@ -4,6 +4,8 @@ #include #include #include +#include + #include "msgpuck/msgpuck.h" class wtf_buffer; @@ -100,7 +102,7 @@ class mp_reader } else { - throw mp_reader_error("integer expected", *this); + throw mp_reader_error("integer expected but get " + std::to_string(int(mp_typeof(*_current_pos))) , *this); } throw mp_reader_error("value overflow", *this); } @@ -157,4 +159,14 @@ class mp_array_reader : public mp_reader size_t _cardinality; }; +template +mp_reader& operator>> (mp_reader& reader, std::vector& val) +{ + auto vector = reader.array(); + val.resize(vector.cardinality()); + for (size_t i = 0; i < vector.cardinality(); ++i) + vector >> val[i]; + return reader; +} + #endif // MP_READER_H diff --git a/mp_writer.h b/mp_writer.h index d9844c6..0d89579 100644 --- a/mp_writer.h +++ b/mp_writer.h @@ -48,6 +48,16 @@ class mp_writer return *this; } + template + mp_writer& operator<< (const std::vector &val) noexcept + { + begin_array(val.size()); + for( auto& it : val ) + *this< && sizeof(T) < 16>> mp_writer& operator<< (const T &val) noexcept { From af96792fe8378a785be3957f3337aea25d9c39d5 Mon Sep 17 00:00:00 2001 From: Nik Date: Thu, 29 Aug 2019 14:30:01 +0300 Subject: [PATCH 03/16] degradation --- connection.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connection.h b/connection.h index 9ed5647..25b2e5e 100644 --- a/connection.h +++ b/connection.h @@ -9,7 +9,7 @@ #include #include "wtf_buffer.h" #include "unique_socket.h" -#include "./third_party/fu2/function2.hpp" +#include "fu2/function2.hpp" #include "cs_parser.h" /// Tarantool connector scope From 2e519045528a4d79447ee00348b9f3650416f3fe Mon Sep 17 00:00:00 2001 From: Nik Date: Tue, 3 Sep 2019 17:41:31 +0300 Subject: [PATCH 04/16] more improves --- connector.cpp | 29 ++++++++++++++++++++++++++++- connector.h | 33 ++++++++++++++++++++++++++------- mp_reader.cpp | 41 ++++++++++++++++++++++++++++++++++++----- mp_reader.h | 30 +++++++++++++++++++++++++++--- mp_writer.cpp | 4 ++++ mp_writer.h | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ tntevloop.cpp | 10 +--------- tntevloop.h | 8 +++++--- 8 files changed, 177 insertions(+), 28 deletions(-) diff --git a/connector.cpp b/connector.cpp index 6218ecc..91aa7fd 100644 --- a/connector.cpp +++ b/connector.cpp @@ -9,6 +9,8 @@ namespace tnt : iproto_writer(*(connection*)this) { on_response(bind(&Connector::OnResponse, this, placeholders::_1)); + on_closed(bind(&Connector::OnClosed, this)); + on_opened(bind(&Connector::OnOpened, this)); } void Connector::OnResponse(wtf_buffer &buf) @@ -22,13 +24,14 @@ namespace tnt Header header; encoded_header[tnt::header_field::SYNC] >> header.sync; encoded_header[tnt::header_field::CODE] >> header.errCode; + header.errCode &= 0x7fff; auto handler = handlers_.find(header.sync); if (handler == handlers_.end()) handle_error("unexpected response"); else { auto encoded_body = r.map(); - handler->second(header, encoded_body); + handler->second.handler_(header, encoded_body, (void*)&handler->second.userData_); } } catch(const mp_reader_error &e) @@ -43,4 +46,28 @@ namespace tnt input_processed(); } + void Connector::AddOnOpened(SimpleEventCallbak cb_) + { + onOpenedHandlers_.push_back(cb_); + } + + void Connector::AddOnClosed(SimpleEventCallbak cb_) + { + onClosedHandlers_.push_back(cb_); + } + + void Connector::OnOpened() + { + for (auto& it : onOpenedHandlers_) + it(); + } + + void Connector::OnClosed() + { + handlers_.clear(); + for (auto& it : onClosedHandlers_) + it(); + } + + } diff --git a/connector.h b/connector.h index cb6d4b6..526c6af 100644 --- a/connector.h +++ b/connector.h @@ -43,21 +43,26 @@ namespace tnt }; - typedef fu2::unique_function OnFuncResult; + typedef function OnFuncResult; + typedef function SimpleEventCallbak; Connector(); - template - void Call(string_view name, OnFuncResult&& resultHundler, Args... args) + template + void Call(string_view name, OnFuncResult&& resultHundler, UserData userData = (void*)nullptr, Args... args) { + static_assert (sizeof (HandlerData::userData_) >= sizeof (userData), "User data too big."); constexpr size_t countArgs = sizeof...(args); begin_call(name); begin_array(countArgs); write(args...); if (countArgs) finalize(); - finalize(); - handlers_[last_request_id()] = move(resultHundler); + finalize(); + HandlerData handler; + handler.handler_ = move(resultHundler); + *((decltype (userData)*)&handler.userData_) = move(userData); + handlers_[last_request_id()] = move(handler); flush(); } @@ -65,16 +70,30 @@ namespace tnt {} template - void write(T arg, Args... args) + void write(T& arg, Args... args) { *this< handlers_; + class HandlerData + { + public: + OnFuncResult handler_; + uint64_t userData_[2]; + }; + + map handlers_; + vector onOpenedHandlers_; + vector onClosedHandlers_; void OnResponse(wtf_buffer &buf); + void OnOpened(); + void OnClosed(); }; diff --git a/mp_reader.cpp b/mp_reader.cpp index a7fca0e..dd3ea9f 100644 --- a/mp_reader.cpp +++ b/mp_reader.cpp @@ -74,14 +74,14 @@ void mp_reader::skip(mp_type type, bool nullable) { auto actual_type = mp_typeof(*_current_pos); if (actual_type != type && (!nullable || actual_type != MP_NIL)) - throw mp_reader_error("unexpected field type", *this); + throw mp_reader_error(string("unexpected field type ") + ToString(actual_type) + " but required " + ToString(type), *this); skip(); } mp_map_reader mp_reader::map() { if (mp_typeof(*_current_pos) != MP_MAP) - throw mp_reader_error("map expected", *this); + throw mp_reader_error(string("map expected but get ") + ToString(mp_typeof(*_current_pos)), *this); auto head = _current_pos; if (mp_check(&_current_pos, _end)) @@ -94,7 +94,7 @@ mp_map_reader mp_reader::map() mp_array_reader mp_reader::array() { if (mp_typeof(*_current_pos) != MP_ARRAY) - throw mp_reader_error("array expected", *this); + throw mp_reader_error(string("array expected") + ToString(mp_typeof(*_current_pos)), *this); auto head = _current_pos; if (mp_check(&_current_pos, _end)) @@ -141,14 +141,14 @@ string_view mp_reader::to_string() return {}; // data() == nullptr } - throw mp_reader_error("string expected", *this); + throw mp_reader_error(string("string expected") + ToString(mp_typeof(*_current_pos)), *this); } mp_reader &mp_reader::operator>>(string &val) { string_view tmp = to_string(); if (!tmp.data()) - throw mp_reader_error("string expected", *this); + throw mp_reader_error("string expected but get nil", *this); val.assign(tmp.data(), tmp.size()); return *this; } @@ -240,3 +240,34 @@ size_t mp_array_reader::cardinality() const noexcept { return _cardinality; } + +const char* ToString(mp_type type) +{ + switch (type) + { + case MP_NIL: + return "nil"; + case MP_UINT: + return "uint"; + case MP_INT: + return "int"; + case MP_STR: + return "string"; + case MP_BIN: + return "binary"; + case MP_ARRAY: + return "array"; + case MP_MAP: + return "map"; + case MP_BOOL: + return "bool"; + case MP_FLOAT: + return "float"; + case MP_DOUBLE: + return "double"; + case MP_EXT: + return "ext"; + default: + return "unkown"; + } +} diff --git a/mp_reader.h b/mp_reader.h index 43446ac..5d70f69 100644 --- a/mp_reader.h +++ b/mp_reader.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include "msgpuck/msgpuck.h" @@ -15,6 +17,8 @@ class mp_reader; std::string hex_dump(const char *begin, const char *end, const char *pos = nullptr); +const char* ToString(mp_type type); + /// messagepack parsing error class mp_reader_error : public std::runtime_error { @@ -108,6 +112,11 @@ class mp_reader } } + template + mp_reader& operator>> (std::map& val); + template + mp_reader& operator>> (std::vector& val); + protected: const char *_begin, *_end, *_current_pos; }; @@ -160,13 +169,28 @@ class mp_array_reader : public mp_reader }; template -mp_reader& operator>> (mp_reader& reader, std::vector& val) +mp_reader& mp_reader::operator>> (std::vector& val) { - auto vector = reader.array(); + auto vector = array(); val.resize(vector.cardinality()); for (size_t i = 0; i < vector.cardinality(); ++i) vector >> val[i]; - return reader; + return *this; } +template +mp_reader& mp_reader::operator>> (std::map& val) +{ + auto map = this->map(); + for (size_t i = 0; i < map.cardinality(); ++i) + { + K k; + V v; + map >> k >> v; + val[k] = std::move(v); + } + return *this; +} + + #endif // MP_READER_H diff --git a/mp_writer.cpp b/mp_writer.cpp index 8d6badb..543ab7f 100644 --- a/mp_writer.cpp +++ b/mp_writer.cpp @@ -60,6 +60,9 @@ void mp_writer::finalize() auto &c = _opened_containers.pop(); char *head = _buf.data() + c.head_offset; + if (!_opened_containers.empty()) + ++_opened_containers.top().items_count; + if (static_cast(*head) == 0xce) // request head { size_t size = static_cast(_buf.end - head); @@ -132,6 +135,7 @@ void mp_writer::finalize() default: throw runtime_error("previously not implemented container cardinality"); } + } mp_writer& mp_writer::operator<<(const string_view &val) diff --git a/mp_writer.h b/mp_writer.h index f22badf..93b3d45 100644 --- a/mp_writer.h +++ b/mp_writer.h @@ -3,6 +3,8 @@ #include #include +#include +#include #include "./third_party/msgpuck/msgpuck.h" #include "wtf_buffer.h" @@ -58,6 +60,16 @@ class mp_writer return *this; } + template + mp_writer& operator<< (const std::map& val) noexcept + { + begin_map(val.size()); + for( auto& it : val ) + *this << it.first << it.second; + finalize(); + return *this; + } + template && sizeof(T) < 16>> mp_writer& operator<< (const T &val) noexcept { @@ -77,6 +89,44 @@ class mp_writer return *this; } + template + class TupleWriter + { + public: + static void out_tuple(mp_writer *stream, const Tuple &tuple) + { + TupleWriter::out_tuple(stream, tuple); + *stream << std::get(tuple); + } + }; + + template + class TupleWriter + { + public: + static void out_tuple(mp_writer *stream, const Tuple &tuple) + { + *stream << std::get<0>(tuple); + } + }; + + template + class TupleWriter + { + public: + static void out_tuple(mp_writer *stream, const Tuple &tuple) + {} + }; + + template + mp_writer& operator<<(const std::tuple &value) + { + begin_array(std::tuple_size>::value); + TupleWriter, sizeof...(Args)>::out_tuple(this, value); + finalize(); + return *this; + } + protected: template class wtf_stack diff --git a/tntevloop.cpp b/tntevloop.cpp index 0ef86cd..fec666b 100644 --- a/tntevloop.cpp +++ b/tntevloop.cpp @@ -25,7 +25,7 @@ namespace tnt asyncNotifier_.data = this; ev_async_start(loop, &asyncNotifier_); - ev_timer_init(&timer_, OnTimer_, 1, 1); + ev_timer_init(&timer_, OnTimer_, 0, 1); timer_.data = this; ev_timer_start(loop, &timer_); @@ -45,14 +45,6 @@ namespace tnt ev_io_set(&socketWatcher_, socket_handle(), events); if (events) ev_io_start(loop_, &socketWatcher_); - - if (mode & tnt::socket_state::read) - cout << "R"; - if (mode & tnt::socket_state::write) - cout << "W"; - if (!mode) - cout << "N"; - cout << endl; } } diff --git a/tntevloop.h b/tntevloop.h index 7069bb5..0f8a2c6 100644 --- a/tntevloop.h +++ b/tntevloop.h @@ -16,14 +16,16 @@ namespace tnt protected: struct ev_loop* loop_; - ev_io socketWatcher_; - ev_async asyncNotifier_; - ev_timer timer_; + ev_io socketWatcher_; static void OnSocketEvent_(struct ev_loop* loop, ev_io* w, int revents); void OnSocketEvent(struct ev_loop* loop, ev_io* w, int revents); + + ev_async asyncNotifier_; static void OnAsyncNotifier_(struct ev_loop* loop, ev_async* w, int revents); void OnAsyncNotifier(struct ev_loop* loop, ev_async* w, int revents); + + ev_timer timer_; static void OnTimer_(struct ev_loop* loop, ev_timer* w, int revents); void OnTimer(struct ev_loop* loop, ev_timer* w, int revents); From c562e36b685f83ea7211cfc8e291af294eaaa98d Mon Sep 17 00:00:00 2001 From: Nik Date: Thu, 5 Sep 2019 12:03:56 +0300 Subject: [PATCH 05/16] bugfix --- connector.cpp | 18 ++++++++++++++++++ connector.h | 43 +++++++++++++++++++++++++++++++------------ mp_writer.cpp | 11 +++++++---- 3 files changed, 56 insertions(+), 16 deletions(-) diff --git a/connector.cpp b/connector.cpp index 91aa7fd..65cccd0 100644 --- a/connector.cpp +++ b/connector.cpp @@ -32,6 +32,7 @@ namespace tnt { auto encoded_body = r.map(); handler->second.handler_(header, encoded_body, (void*)&handler->second.userData_); + handlers_.erase(handler); } } catch(const mp_reader_error &e) @@ -58,12 +59,29 @@ namespace tnt void Connector::OnOpened() { + isConnected_ = true; for (auto& it : onOpenedHandlers_) it(); } void Connector::OnClosed() { + isConnected_ = false; + wtf_buffer buff; + mp_writer writer(buff); + writer.begin_map(1); + writer< void Call(string_view name, OnFuncResult&& resultHundler, UserData userData = (void*)nullptr, Args... args) { - static_assert (sizeof (HandlerData::userData_) >= sizeof (userData), "User data too big."); - constexpr size_t countArgs = sizeof...(args); - begin_call(name); - begin_array(countArgs); - write(args...); - if (countArgs) + if (isConnected_) + { + static_assert (sizeof (HandlerData::userData_) >= sizeof (userData), "User data too big."); + constexpr size_t countArgs = sizeof...(args); + begin_call(name); + begin_array(countArgs); + write(args...); + if (countArgs) + finalize(); finalize(); - finalize(); - HandlerData handler; - handler.handler_ = move(resultHundler); - *((decltype (userData)*)&handler.userData_) = move(userData); - handlers_[last_request_id()] = move(handler); - flush(); + HandlerData handler; + handler.handler_ = move(resultHundler); + *((decltype (userData)*)&handler.userData_) = move(userData); + handlers_[last_request_id()] = move(handler); + flush(); + } + else + { + wtf_buffer buff; + mp_writer writer(buff); + writer.begin_map(1); + writer< handlers_; vector onOpenedHandlers_; vector onClosedHandlers_; + bool isConnected_ = false; void OnResponse(wtf_buffer &buf); void OnOpened(); diff --git a/mp_writer.cpp b/mp_writer.cpp index 543ab7f..cb385a8 100644 --- a/mp_writer.cpp +++ b/mp_writer.cpp @@ -40,6 +40,9 @@ mp_writer::mp_writer(wtf_buffer &buf) : _buf(buf) {} void mp_writer::begin_array(uint32_t max_cardinality) { + if (!_opened_containers.empty()) + ++_opened_containers.top().items_count; + if (max_cardinality) _opened_containers.push({_buf.size(), max_cardinality}); _buf.end = mp_encode_array(_buf.end, max_cardinality); @@ -47,6 +50,9 @@ void mp_writer::begin_array(uint32_t max_cardinality) void mp_writer::begin_map(uint32_t max_cardinality) { + if (!_opened_containers.empty()) + ++_opened_containers.top().items_count; + if (max_cardinality) _opened_containers.push({_buf.size(), max_cardinality * 2}); _buf.end = mp_encode_map(_buf.end, max_cardinality); @@ -60,9 +66,6 @@ void mp_writer::finalize() auto &c = _opened_containers.pop(); char *head = _buf.data() + c.head_offset; - if (!_opened_containers.empty()) - ++_opened_containers.top().items_count; - if (static_cast(*head) == 0xce) // request head { size_t size = static_cast(_buf.end - head); @@ -103,7 +106,7 @@ void mp_writer::finalize() else if (container_type == MP_MAP) { actual_cardinality = c.items_count / 2; // map cardinality - if (actual_cardinality == c.max_cardinality) + if (actual_cardinality * 2 == c.max_cardinality) return; // get current header size From 1a504546c136d2176c16342beee33d2030bb6966 Mon Sep 17 00:00:00 2001 From: Nik Date: Thu, 5 Sep 2019 17:05:48 +0300 Subject: [PATCH 06/16] fix bug --- connector.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector.cpp b/connector.cpp index 65cccd0..359c6b9 100644 --- a/connector.cpp +++ b/connector.cpp @@ -32,7 +32,7 @@ namespace tnt { auto encoded_body = r.map(); handler->second.handler_(header, encoded_body, (void*)&handler->second.userData_); - handlers_.erase(handler); + handlers_.erase(header.sync); } } catch(const mp_reader_error &e) From 044167f033f6eec7eacbf0e86fe0d8110148103b Mon Sep 17 00:00:00 2001 From: Nik Date: Fri, 13 Sep 2019 16:39:36 +0300 Subject: [PATCH 07/16] Refactoring. Handle Reconnect while processing replies. --- connector.cpp | 20 +++++++++++++++++++- connector.h | 13 +++++++++---- tntevloop.cpp | 8 ++++---- tntevloop.h | 5 ++--- 4 files changed, 34 insertions(+), 12 deletions(-) diff --git a/connector.cpp b/connector.cpp index 359c6b9..7bd458a 100644 --- a/connector.cpp +++ b/connector.cpp @@ -15,6 +15,7 @@ namespace tnt void Connector::OnResponse(wtf_buffer &buf) { + isProcessingReply_ = true; mp_reader bunch(buf); while (mp_reader r = bunch.iproto_message()) { @@ -32,7 +33,8 @@ namespace tnt { auto encoded_body = r.map(); handler->second.handler_(header, encoded_body, (void*)&handler->second.userData_); - handlers_.erase(header.sync); + //handlers_.erase(header.sync); + handlers_.erase(handler); } } catch(const mp_reader_error &e) @@ -45,6 +47,12 @@ namespace tnt } } input_processed(); + isProcessingReply_ = false; + if (isNeedsClose_) + { + connection::close(true, isNeedsReconnect_); + isNeedsClose_ = false; + } } void Connector::AddOnOpened(SimpleEventCallbak cb_) @@ -87,5 +95,15 @@ namespace tnt it(); } + void Connector::close(bool reconnect_soon) noexcept + { + if (isProcessingReply_) + { + isNeedsClose_ = true; + isNeedsReconnect_ = reconnect_soon; + } + else + connection::close(true, reconnect_soon); + } } diff --git a/connector.h b/connector.h index c872bae..b80cbc7 100644 --- a/connector.h +++ b/connector.h @@ -1,12 +1,12 @@ #pragma once #include -#include +#include -#include "tntevloop.h" #include "mp_writer.h" #include "mp_reader.h" #include "proto.h" +#include "connection.h" using namespace std; @@ -30,7 +30,7 @@ namespace tnt //template //void write(){}; - class Connector : public TntEvLoop, public iproto_writer + class Connector : public connection, public iproto_writer { public: class FuncParamTuple @@ -96,6 +96,7 @@ namespace tnt void AddOnOpened(SimpleEventCallbak cb_); void AddOnClosed(SimpleEventCallbak cb_); + void close(bool reconnect_soon = false) noexcept; protected: class HandlerData @@ -105,7 +106,7 @@ namespace tnt uint64_t userData_[2]; }; - map handlers_; + unordered_map handlers_; vector onOpenedHandlers_; vector onClosedHandlers_; bool isConnected_ = false; @@ -114,6 +115,10 @@ namespace tnt void OnOpened(); void OnClosed(); + bool isNeedsClose_ = false; + bool isNeedsReconnect_ = false; + bool isProcessingReply_ = false; + }; diff --git a/tntevloop.cpp b/tntevloop.cpp index f26c0ed..d173e70 100644 --- a/tntevloop.cpp +++ b/tntevloop.cpp @@ -10,7 +10,7 @@ namespace tnt TntEvLoop::TntEvLoop() { - on_opened(bind(&TntEvLoop::OnConnected, this)); + AddOnOpened(bind(&TntEvLoop::OnConnected, this)); } void TntEvLoop::Attach(struct ev_loop* loop) @@ -59,12 +59,12 @@ namespace tnt { (void)loop;(void)w; if (revents & EV_ERROR) - handle_error("EV_ERROR soket state received"); + connection::handle_error("EV_ERROR soket state received"); if (revents & EV_WRITE) - write(); + connection::write(); if (revents & EV_READ) - read(); + connection::read(); } void TntEvLoop::OnAsyncNotifier_(struct ev_loop* loop, ev_async* w, int revents) diff --git a/tntevloop.h b/tntevloop.h index 1b25f30..fae9e09 100644 --- a/tntevloop.h +++ b/tntevloop.h @@ -2,13 +2,12 @@ #include - -#include "connection.h" +#include "connector.h" namespace tnt { - class TntEvLoop : public connection + class TntEvLoop : public Connector { public: TntEvLoop(); From 2c32a835dd01d84b71f93745e0f882839c231bc1 Mon Sep 17 00:00:00 2001 From: Nik Date: Fri, 20 Sep 2019 16:00:27 +0300 Subject: [PATCH 08/16] connection status --- connector.cpp | 5 +++++ connector.h | 1 + tntevloop.cpp | 1 + 3 files changed, 7 insertions(+) diff --git a/connector.cpp b/connector.cpp index 7bd458a..953c8fa 100644 --- a/connector.cpp +++ b/connector.cpp @@ -106,4 +106,9 @@ namespace tnt connection::close(true, reconnect_soon); } + bool Connector::IsConnected() + { + return isConnected_; + } + } diff --git a/connector.h b/connector.h index b80cbc7..2f06ab6 100644 --- a/connector.h +++ b/connector.h @@ -97,6 +97,7 @@ namespace tnt void AddOnOpened(SimpleEventCallbak cb_); void AddOnClosed(SimpleEventCallbak cb_); void close(bool reconnect_soon = false) noexcept; + bool IsConnected(); protected: class HandlerData diff --git a/tntevloop.cpp b/tntevloop.cpp index d173e70..38ca76d 100644 --- a/tntevloop.cpp +++ b/tntevloop.cpp @@ -19,6 +19,7 @@ namespace tnt socketWatcher_.data = this; ev_init(&socketWatcher_, OnSocketEvent_); + socketWatcher_.events = 0; on_socket_watcher_request([this](int mode)noexcept{this->OnSocketWatcherRequest(mode);}); From ea9a842030221cf727825e6aa767a36571c53d6f Mon Sep 17 00:00:00 2001 From: Nik Date: Tue, 1 Oct 2019 16:40:05 +0300 Subject: [PATCH 09/16] fix finalize. --- connector.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector.h b/connector.h index 2f06ab6..c026b01 100644 --- a/connector.h +++ b/connector.h @@ -59,7 +59,7 @@ namespace tnt begin_call(name); begin_array(countArgs); write(args...); - if (countArgs) + //if (countArgs) finalize(); finalize(); HandlerData handler; From 3cc4496494df885245726a35c588c8607d68cfe7 Mon Sep 17 00:00:00 2001 From: Nik Date: Tue, 1 Oct 2019 17:39:02 +0300 Subject: [PATCH 10/16] fix crash on stop. --- connection.h | 2 +- tntevloop.cpp | 24 +++++++++++++++++------- tntevloop.h | 6 ++++-- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/connection.h b/connection.h index 596131c..76c8708 100644 --- a/connection.h +++ b/connection.h @@ -112,7 +112,7 @@ class connection public: connection(std::string_view connection_string = {}); - ~connection(); + virtual ~connection(); void open(int delay = 0); void close(bool call_disconnect_handler = true, bool reconnect_soon = false) noexcept; diff --git a/tntevloop.cpp b/tntevloop.cpp index 38ca76d..c84918d 100644 --- a/tntevloop.cpp +++ b/tntevloop.cpp @@ -8,12 +8,18 @@ using namespace std; namespace tnt { - TntEvLoop::TntEvLoop() - { - AddOnOpened(bind(&TntEvLoop::OnConnected, this)); - } - - void TntEvLoop::Attach(struct ev_loop* loop) + TntEvLoop::TntEvLoop() + { + AddOnOpened(bind(&TntEvLoop::OnConnected, this)); + } + + TntEvLoop::~TntEvLoop() + { + isStoped_ = true; + connection::close(false, false); + } + + void TntEvLoop::Attach(struct ev_loop* loop) { loop_ = loop; @@ -37,6 +43,8 @@ namespace tnt void TntEvLoop::OnSocketWatcherRequest(int mode) noexcept { + if (isStoped_) + return; int events = (mode & tnt::socket_state::read ? EV_READ : EV_NONE); events |= (mode & tnt::socket_state::write ? EV_WRITE : EV_NONE); @@ -58,7 +66,9 @@ namespace tnt void TntEvLoop::OnSocketEvent(struct ev_loop* loop, ev_io* w, int revents) { - (void)loop;(void)w; +// if (isStoped_) +// return; + (void)loop;(void)w; if (revents & EV_ERROR) connection::handle_error("EV_ERROR soket state received"); diff --git a/tntevloop.h b/tntevloop.h index fae9e09..fd41e25 100644 --- a/tntevloop.h +++ b/tntevloop.h @@ -10,11 +10,13 @@ namespace tnt class TntEvLoop : public Connector { public: - TntEvLoop(); - void Attach(struct ev_loop* loop); + TntEvLoop(); + ~TntEvLoop(); + void Attach(struct ev_loop* loop); protected: struct ev_loop* loop_; + bool isStoped_ = false; ev_io socketWatcher_; static void OnSocketEvent_(struct ev_loop* loop, ev_io* w, int revents); From cca5d46549e745de01ebd3416eea6afde24866c2 Mon Sep 17 00:00:00 2001 From: Nik Date: Fri, 10 Jan 2020 17:11:54 +0300 Subject: [PATCH 11/16] refactoring --- mp_reader.cpp | 2 +- mp_reader.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mp_reader.cpp b/mp_reader.cpp index 2b43e63..f817932 100644 --- a/mp_reader.cpp +++ b/mp_reader.cpp @@ -346,7 +346,7 @@ size_t mp_array_reader::cardinality() const noexcept return _cardinality; } -const char* ToString(mp_type type) +const char* to_string(mp_type type) { switch (type) { diff --git a/mp_reader.h b/mp_reader.h index 7607bcb..e280e0f 100644 --- a/mp_reader.h +++ b/mp_reader.h @@ -18,7 +18,7 @@ class mp_reader; std::string hex_dump(const char *begin, const char *end, const char *pos = nullptr); -const char* ToString(mp_type type); +const char* to_string(mp_type type); /// messagepack parsing error class mp_reader_error : public std::runtime_error From 2dd1cafebf65f5b98df2a7fd3aa52c6dfcd6d3c5 Mon Sep 17 00:00:00 2001 From: Nik Date: Mon, 13 Jan 2020 10:41:32 +0300 Subject: [PATCH 12/16] fixes after merge. --- connector.cpp | 6 +++--- connector.h | 2 +- mp_reader.h | 8 +++++++- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/connector.cpp b/connector.cpp index 953c8fa..5215d10 100644 --- a/connector.cpp +++ b/connector.cpp @@ -21,7 +21,7 @@ namespace tnt { try { - auto encoded_header = r.map(); + auto encoded_header = r.read(); Header header; encoded_header[tnt::header_field::SYNC] >> header.sync; encoded_header[tnt::header_field::CODE] >> header.errCode; @@ -31,7 +31,7 @@ namespace tnt handle_error("unexpected response"); else { - auto encoded_body = r.map(); + auto encoded_body = r.read(); handler->second.handler_(header, encoded_body, (void*)&handler->second.userData_); //handlers_.erase(header.sync); handlers_.erase(handler); @@ -83,7 +83,7 @@ namespace tnt Header header; header.errCode = 77; mp_reader reader(buff); - mp_map_reader body = reader.map(); + auto body = reader.read(); for (auto& it : handlers_) { mp_map_reader body2 = body; diff --git a/connector.h b/connector.h index c026b01..4c75fb7 100644 --- a/connector.h +++ b/connector.h @@ -79,7 +79,7 @@ namespace tnt header.errCode = 77; header.sync = 0; mp_reader reader(buff); - mp_map_reader body = reader.map(); + auto body = reader.read(); resultHundler(header, body, &userData); } } diff --git a/mp_reader.h b/mp_reader.h index 8a693e5..a4bd28e 100644 --- a/mp_reader.h +++ b/mp_reader.h @@ -313,7 +313,6 @@ class mp_array_reader : public mp_reader (std::is_integral_v && (sizeof(T) < 16)) || std::is_same_v || std::is_same_v || - std::is_same_v> || is_tuple::value >> mp_array_reader& operator>> (T &val) @@ -322,6 +321,13 @@ class mp_array_reader : public mp_reader return *this; } + template + mp_array_reader& operator>> (std::vector &val) + { + mp_reader::operator>>(val); + return *this; + } + template mp_array_reader& operator>> (std::map &val) { From fa40d2c28a0bb12367f4ba93fbe06fc0e00f1029 Mon Sep 17 00:00:00 2001 From: Nik Date: Tue, 14 Jan 2020 15:40:48 +0300 Subject: [PATCH 13/16] fix merge --- mp_reader.cpp | 8 -------- mp_reader.h | 21 --------------------- 2 files changed, 29 deletions(-) diff --git a/mp_reader.cpp b/mp_reader.cpp index bbf2474..e7610cf 100644 --- a/mp_reader.cpp +++ b/mp_reader.cpp @@ -234,13 +234,6 @@ bool mp_reader::has_next() const noexcept mp_reader &mp_reader::operator>>(string &val) { -<<<<<<< HEAD - string_view tmp; - *this >> tmp; - if (!tmp.data()) - throw mp_reader_error("string expected, got nil", *this); - val.assign(tmp.data(), tmp.size()); -======= if (mp_typeof(*_current_pos) == MP_EXT) { val = to_string(); @@ -253,7 +246,6 @@ mp_reader &mp_reader::operator>>(string &val) throw mp_reader_error("string expected, got no data", *this); val.assign(tmp.data(), tmp.size()); } ->>>>>>> 48245713e74d83df4580b8af8266e68d27b02bcb return *this; } diff --git a/mp_reader.h b/mp_reader.h index d1347bc..08055ac 100644 --- a/mp_reader.h +++ b/mp_reader.h @@ -306,38 +306,17 @@ class mp_array_reader : public mp_reader // We need to preserve mp_array_reader type after every reading operation. //using mp_reader::operator>>; -<<<<<<< HEAD template -======= - template && (sizeof(T) < 16)) || - std::is_same_v || - std::is_same_v || - is_tuple::value - >> ->>>>>>> 2dd1cafebf65f5b98df2a7fd3aa52c6dfcd6d3c5 mp_array_reader& operator>> (T &val) { mp_reader::operator>>(val); return *this; } -<<<<<<< HEAD template