Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions include/cascade/detail/persistent_store_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,35 @@ version_tuple PersistentCascadeStore<KT, VT, IK, IV, ST>::put(const VT& value, b
return ret;
}

template <typename KT, typename VT, KT* IK, VT* IV, persistent::StorageType ST>
version_tuple PersistentCascadeStore<KT, VT, IK, IV, ST>::put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be safe, the server should reject the message here if the timestamp is too old. (The client's put_by_time function shouldn't send the message with an old timestamp in the first place, but someone might write a nonstandard client, and other server functions are similarly defensive in rejecting invalid arguments).

debug_enter_func_with_args("value.get_key_ref()={}, timestamp_us={}", value.get_key_ref(), timestamp_us);

uint64_t now_us = get_walltime() / 1000ULL; // Convert nanoseconds to microseconds
uint64_t client_server_epsilon_us = derecho::getConfUInt64(derecho::Conf::PERS_CLIENT_SERVER_EPSILON_US);
uint64_t server_clock_skew_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_SERVER_CLOCK_SKEW_DELTA_US);
uint64_t threshold_us = now_us - client_server_epsilon_us - server_clock_skew_delta_us;
if (timestamp_us < threshold_us) {
dbg_default_warn("put_with_timestamp: rejecting timestamp {} us (threshold: {} us, now: {} us)",
timestamp_us, threshold_us, now_us);
throw derecho::derecho_exception("put_with_timestamp: timestamp is too old (older than now - client_server_epsilon - server_clock_skew_delta)");
}

LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_PUT_START, group, value);

derecho::Replicated<PersistentCascadeStore>& subgroup_handle = group->template get_subgroup<PersistentCascadeStore>(this->subgroup_index);
auto results = subgroup_handle.template ordered_send_with_timestamp<RPC_NAME(ordered_put)>(timestamp_us, value, as_trigger);
auto& replies = results.get();
version_tuple ret{CURRENT_VERSION, 0};
for(auto& reply_pair : replies) {
ret = reply_pair.second.get();
}

LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_PUT_END, group, value);
debug_leave_func_with_value("version=0x{:x},timestamp={}us", std::get<0>(ret), std::get<1>(ret));
return ret;
}

template <typename KT, typename VT, KT* IK, VT* IV, persistent::StorageType ST>
void PersistentCascadeStore<KT, VT, IK, IV, ST>::put_and_forget(const VT& value, bool as_trigger) const {
debug_enter_func_with_args("value.get_key_ref()={}", value.get_key_ref());
Expand Down
108 changes: 107 additions & 1 deletion include/cascade/detail/service_client_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <cascade/config.h>
#include <cascade/utils.hpp>
#include <derecho/conf/conf.hpp>
#include <derecho/core/derecho.hpp>
#include <mutex>
#include <typeindex>
Expand Down Expand Up @@ -64,7 +65,8 @@ std::unique_ptr<CascadeType> client_stub_factory() {
template <typename... CascadeTypes>
ServiceClient<CascadeTypes...>::ServiceClient(derecho::Group<CascadeMetadataService<CascadeTypes...>, CascadeTypes...>* _group_ptr)
: external_group_ptr(nullptr),
group_ptr(_group_ptr) {
group_ptr(_group_ptr),
server_clock_skew_delta_us(derecho::getConfUInt64(derecho::Conf::PERS_SERVER_CLOCK_SKEW_DELTA_US)) {
if(group_ptr == nullptr) {
this->external_group_ptr = std::make_unique<derecho::ExternalGroupClient<CascadeMetadataService<CascadeTypes...>, CascadeTypes...>>(
client_stub_factory<CascadeMetadataService<CascadeTypes...>>,
Expand Down Expand Up @@ -758,6 +760,110 @@ void ServiceClient<CascadeTypes...>::collective_trigger_put(
}
}

template <typename... CascadeTypes>
template <typename ObjectType>
derecho::rpc::QueryResults<version_tuple> ServiceClient<CascadeTypes...>::put_by_time(
const ObjectType& value, const uint64_t& timestamp_us, bool as_trigger) {
// STEP 1 - get key
if constexpr (!std::is_base_of_v<ICascadeObject<std::string,ObjectType>,ObjectType>) {
throw derecho::derecho_exception(std::string("ServiceClient<>::put_by_time() only support object of type ICascadeObject<std::string,ObjectType>,but we got ") + typeid(ObjectType).name());
}

// STEP 2 - validate timestamp: reject if timestamp_us < get_walltime() - Delta
uint64_t now_us = get_walltime() / 1000ULL; // Convert nanoseconds to microseconds
uint64_t delta_us = server_clock_skew_delta_us / 1000ULL; // Convert nanoseconds to microseconds

if (timestamp_us < (now_us - delta_us)) {
dbg_default_warn("put_by_time: timestamp {}us is too old (now={}us, delta={}us), rejecting",
timestamp_us, now_us, delta_us);
throw derecho::derecho_exception("put_by_time: timestamp is too old (older than now - delta)");
}

// STEP 3 - get shard
uint32_t subgroup_type_index,subgroup_index,shard_index;
std::tie(subgroup_type_index,subgroup_index,shard_index) = this->template key_to_shard(value.get_key_ref());

// STEP 4 - call recursive put_by_time
return this->template type_recursive_put_by_time<ObjectType,CascadeTypes...>(subgroup_type_index,value,timestamp_us,subgroup_index,shard_index,as_trigger);
}

template <typename... CascadeTypes>
template <typename SubgroupType>
derecho::rpc::QueryResults<version_tuple> ServiceClient<CascadeTypes...>::put_by_time(
const typename SubgroupType::ObjectType& value,
const uint64_t& timestamp_us,
uint32_t subgroup_index,
uint32_t shard_index,
bool as_trigger) {
LOG_SERVICE_CLIENT_TIMESTAMP(TLT_SERVICE_CLIENT_PUT_BY_TIME_START,
(std::is_base_of<IHasMessageID,typename SubgroupType::ObjectType>::value?value.get_message_id():0));
// Validate timestamp: timestamp_us must be >= (now - delta)
uint64_t now_ns = get_walltime();
uint64_t timestamp_ns = timestamp_us * 1000ULL; // Convert microseconds to nanoseconds
uint64_t delta_ns = server_clock_skew_delta_us * 1000ULL;

if (timestamp_ns < now_ns - delta_ns) {
// Timestamp is too old, reject the request
throw derecho::derecho_exception("put_by_time: timestamp is too old (older than now - delta)");
}
if (!is_external_client()) {
std::lock_guard<std::mutex> lck(this->group_ptr_mutex);
if (static_cast<uint32_t>(group_ptr->template get_my_shard<SubgroupType>(subgroup_index)) == shard_index) {
// ordered put_by_time as a shard member - use ordered_send with timestamp
auto& subgroup_handle = group_ptr->template get_subgroup<SubgroupType>(subgroup_index);
return subgroup_handle.template ordered_send_with_timestamp<RPC_NAME(ordered_put)>(timestamp_us, value, as_trigger);
} else {
// p2p put_with_timestamp - send to a shard member who will do ordered_send_with_timestamp
node_id_t node_id = pick_member_by_policy<SubgroupType>(subgroup_index,shard_index,value.get_key_ref());
try {
auto& subgroup_handle = group_ptr->template get_subgroup<SubgroupType>(subgroup_index);
return subgroup_handle.template p2p_send<RPC_NAME(put_with_timestamp)>(node_id,value,timestamp_us,as_trigger);
} catch (derecho::invalid_subgroup_exception& ex) {
auto& subgroup_handle = group_ptr->template get_nonmember_subgroup<SubgroupType>(subgroup_index);
return subgroup_handle.template p2p_send<RPC_NAME(put_with_timestamp)>(node_id,value,timestamp_us,as_trigger);
}
}
} else {
// External client - use put_with_timestamp P2P call
std::lock_guard<std::mutex> lck(this->external_group_ptr_mutex);
auto& caller = external_group_ptr->template get_subgroup_caller<SubgroupType>(subgroup_index);
node_id_t node_id = pick_member_by_policy<SubgroupType>(subgroup_index,shard_index,value.get_key_ref());
return caller.template p2p_send<RPC_NAME(put_with_timestamp)>(node_id,value,timestamp_us,as_trigger);
}
}

template <typename... CascadeTypes>
template <typename ObjectType, typename FirstType, typename SecondType, typename... RestTypes>
derecho::rpc::QueryResults<version_tuple> ServiceClient<CascadeTypes...>::type_recursive_put_by_time(
uint32_t type_index,
const ObjectType& object,
const uint64_t& timestamp_us,
uint32_t subgroup_index,
uint32_t shard_index,
bool as_trigger) {
if (type_index == 0) {
return this->template put_by_time<FirstType>(object, timestamp_us, subgroup_index, shard_index, as_trigger);
} else {
return this->template type_recursive_put_by_time<ObjectType,SecondType,RestTypes...>(type_index-1,object,timestamp_us,subgroup_index,shard_index,as_trigger);
}
}

template <typename... CascadeTypes>
template <typename ObjectType, typename LastType>
derecho::rpc::QueryResults<version_tuple> ServiceClient<CascadeTypes...>::type_recursive_put_by_time(
uint32_t type_index,
const ObjectType& object,
const uint64_t& timestamp_us,
uint32_t subgroup_index,
uint32_t shard_index,
bool as_trigger) {
if (type_index == 0) {
return this->template put_by_time<LastType>(object, timestamp_us, subgroup_index, shard_index, as_trigger);
} else {
throw derecho::derecho_exception(std::string(__PRETTY_FUNCTION__) + ": type index is out of boundary.");
}
}

template <typename... CascadeTypes>
template <typename SubgroupType>
derecho::rpc::QueryResults<version_tuple> ServiceClient<CascadeTypes...>::remove(
Expand Down
6 changes: 6 additions & 0 deletions include/cascade/detail/trigger_store_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ version_tuple TriggerCascadeNoStore<KT, VT, IK, IV>::put(const VT& value, bool a
return {persistent::INVALID_VERSION, 0};
}

template <typename KT, typename VT, KT* IK, VT* IV>
version_tuple TriggerCascadeNoStore<KT, VT, IK, IV>::put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const {
dbg_default_warn("Calling unsupported func:{}", __PRETTY_FUNCTION__);
return {persistent::INVALID_VERSION, 0};
}

template <typename KT, typename VT, KT* IK, VT* IV>
void TriggerCascadeNoStore<KT, VT, IK, IV>::put_and_forget(const VT& value, bool as_trigger) const {
dbg_default_warn("Calling unsupported func:{}", __PRETTY_FUNCTION__);
Expand Down
29 changes: 29 additions & 0 deletions include/cascade/detail/volatile_store_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,35 @@ version_tuple VolatileCascadeStore<KT, VT, IK, IV>::put(const VT& value, bool as
return ret;
}

template <typename KT, typename VT, KT* IK, VT* IV>
version_tuple VolatileCascadeStore<KT, VT, IK, IV>::put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const {
debug_enter_func_with_args("value.get_key_ref={}, timestamp_us={}", value.get_key_ref(), timestamp_us);

uint64_t now_us = get_walltime() / 1000ULL; // Convert nanoseconds to microseconds
uint64_t client_server_epsilon_us = derecho::getConfUInt64(derecho::Conf::PERS_CLIENT_SERVER_EPSILON_US);
uint64_t server_clock_skew_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_SERVER_CLOCK_SKEW_DELTA_US);
uint64_t threshold_us = now_us - client_server_epsilon_us - server_clock_skew_delta_us;
if (timestamp_us < threshold_us) {
dbg_default_warn("put_with_timestamp: rejecting timestamp {} us (threshold: {} us, now: {} us)",
timestamp_us, threshold_us, now_us);
throw derecho::derecho_exception("put_with_timestamp: timestamp is too old (older than now - client_server_epsilon - server_clock_skew_delta)");
}

LOG_TIMESTAMP_BY_TAG(TLT_VOLATILE_PUT_START, group, value);

derecho::Replicated<VolatileCascadeStore>& subgroup_handle = group->template get_subgroup<VolatileCascadeStore>(this->subgroup_index);
auto results = subgroup_handle.template ordered_send_with_timestamp<RPC_NAME(ordered_put)>(timestamp_us, value, as_trigger);
auto& replies = results.get();
version_tuple ret{CURRENT_VERSION, 0};
for(auto& reply_pair : replies) {
ret = reply_pair.second.get();
}

LOG_TIMESTAMP_BY_TAG(TLT_VOLATILE_PUT_END, group, value);
debug_leave_func_with_value("version=0x{:x},timestamp={}us", std::get<0>(ret), std::get<1>(ret));
return ret;
}

template <typename KT, typename VT, KT* IK, VT* IV>
void VolatileCascadeStore<KT, VT, IK, IV>::put_and_forget(const VT& value, bool as_trigger) const {
debug_enter_func_with_args("value.get_key_ref={}", value.get_key_ref());
Expand Down
2 changes: 2 additions & 0 deletions include/cascade/persistent_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class PersistentCascadeStore : public ICascadeStore<KT, VT, IK, IV>,
REGISTER_RPC_FUNCTIONS_WITH_NOTIFICATION(PersistentCascadeStore,
P2P_TARGETS(
put,
put_with_timestamp,
put_and_forget,
#ifdef ENABLE_EVALUATION
perf_put,
Expand Down Expand Up @@ -86,6 +87,7 @@ class PersistentCascadeStore : public ICascadeStore<KT, VT, IK, IV>,
#endif // ENABLE_EVALUATION
virtual void trigger_put(const VT& value) const override;
virtual version_tuple put(const VT& value, bool as_trigger) const override;
virtual version_tuple put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const;
virtual void put_and_forget(const VT& value, bool as_trigger) const override;
#ifdef ENABLE_EVALUATION
virtual double perf_put(const uint32_t max_payload_size, const uint64_t duration_sec) const override;
Expand Down
62 changes: 62 additions & 0 deletions include/cascade/service_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <derecho/core/notification.hpp>
#include <derecho/mutils-serialization/SerializationSupport.hpp>
#include <derecho/persistent/PersistentInterface.hpp>
#include <derecho/utils/time.h>
#include <functional>
#include <hs/hs.h>
#include <memory>
Expand Down Expand Up @@ -118,6 +119,8 @@ class ServiceClient {
do_hash<std::tuple<std::type_index, uint32_t, uint32_t>>>
member_cache;
mutable std::shared_mutex member_cache_mutex;
// config clock skew delta (in microseconds) for time consistency
const uint64_t server_clock_skew_delta_us;
/**
* 'object_pool_info_cache' is a local cache for object pool metadata. This cache is used to accelerate the
* object access process. If an object pool does not exists, it will be loaded from metadata service.
Expand Down Expand Up @@ -519,7 +522,66 @@ class ServiceClient {
void collective_trigger_put(const typename SubgroupType::ObjectType& object,
uint32_t subgroup_index,
std::unordered_map<node_id_t, std::unique_ptr<derecho::rpc::QueryResults<void>>>& nodes_and_futures);
/**
* "put_by_time" writes an object to a given subgroup/shard with a custom timestamp.
*
* @param[in] object the object to write.
* @param[in] timestamp_us the timestamp in microseconds to use for the message header.
* The request will be rejected if timestamp_us < get_walltime() - Delta.
* @param[in] as_trigger If true, the object will NOT apply to the K/V store. The object will only be
* used to update the state.
*/
template <typename ObjectType>
derecho::rpc::QueryResults<version_tuple> put_by_time(const ObjectType& object, const uint64_t& timestamp_us, bool as_trigger = false);

/**
* "put_by_time" writes an object to a given subgroup/shard with a custom timestamp.
*
* @param[in] object the object to write.
* @param[in] timestamp_us the timestamp in microseconds to use for the message header.
* The request will be rejected if timestamp_us < get_walltime() - Delta.
* @param[in] subgroup_index the subgroup index of CascadeType
* @param[in] shard_index the shard index.
* @param[in] as_trigger If true, the object will NOT apply to the K/V store. The object will only be
* used to update the state.
*/
template <typename SubgroupType>
derecho::rpc::QueryResults<version_tuple> put_by_time(const typename SubgroupType::ObjectType& object,
const uint64_t& timestamp_us, uint32_t subgroup_index, uint32_t shard_index, bool as_trigger = false);

protected:
/**
* "type_recursive_put" is a helper function for internal use only.
* @param[in] type_index the index of the subgroup type in the CascadeTypes... list. And the FirstType,
* SecondType, ..., RestTypes should be in the same order.
* @param[in] object the object to write
* @param[in] timestamp_us the timestamp in microseconds to use for the message header.
* @param[in] subgroup_index
* the subgroup index in the subgroup type designated by type_index
* @param[in] shard_index the shard index
* @param[in] as_trigger If true, the object will NOT apply to the K/V store. The object will only be
* used to update the state.
*
* @return a future to the version and timestamp of the put operation.
*/
template <typename ObjectType, typename FirstType, typename SecondType, typename... RestTypes>
derecho::rpc::QueryResults<version_tuple> type_recursive_put_by_time(
uint32_t type_index,
const ObjectType& object,
const uint64_t& timestamp_us,
uint32_t subgroup_index,
uint32_t shard_index,
bool as_trigger = false);

template <typename ObjectType, typename LastType>
derecho::rpc::QueryResults<version_tuple> type_recursive_put_by_time(
uint32_t type_index,
const ObjectType& object,
const uint64_t& timestamp_us,
uint32_t subgroup_index,
uint32_t shard_index,
bool as_trigger = false);
public:
/**
* "remove" deletes an object with the given key.
*
Expand Down
2 changes: 2 additions & 0 deletions include/cascade/trigger_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class TriggerCascadeNoStore : public ICascadeStore<KT, VT, IK, IV>,
REGISTER_RPC_FUNCTIONS_WITH_NOTIFICATION(TriggerCascadeNoStore,
P2P_TARGETS(
put,
put_with_timestamp,
put_and_forget,
#ifdef ENABLE_EVALUATION
perf_put,
Expand Down Expand Up @@ -81,6 +82,7 @@ class TriggerCascadeNoStore : public ICascadeStore<KT, VT, IK, IV>,
#endif // ENABLE_EVALUATION
virtual void trigger_put(const VT& value) const override;
virtual version_tuple put(const VT& value, bool as_trigger) const override;
virtual version_tuple put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const;
virtual void put_and_forget(const VT& value, bool as_trigger) const override;
#ifdef ENABLE_EVALUATION
virtual double perf_put(const uint32_t max_payload_size, const uint64_t duration_sec) const override;
Expand Down
2 changes: 1 addition & 1 deletion include/cascade/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ inline uint64_t get_time_us(bool use_wall_clock = true) {
return get_time_ns(use_wall_clock)/INT64_1E3;
}


/**
* decompose the prefix into tokens. Please note that the token after the last separator is not considered a part of
* the prefix and hence dropped if the "prefix_only" is true
Expand Down Expand Up @@ -204,6 +203,7 @@ class OpenLoopLatencyCollector: public OpenLoopLatencyCollectorClient {
#define TLT_SERVICE_CLIENT_MULTI_LIST_KEYS_START (1009)
#define TLT_SERVICE_CLIENT_GET_SIZE_START (1010)
#define TLT_SERVICE_CLIENT_MULTI_GET_SIZE_START (1011)
#define TLT_SERVICE_CLIENT_PUT_BY_TIME_START (1012)

/* For VolatileCascadeStore:
* ::put():
Expand Down
Loading
Loading