From 7d6b75d7e4b4d96fa49a8d6bc7a626717d53fe7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A1n=20Espejo?= Date: Thu, 10 Oct 2019 10:16:34 +0200 Subject: [PATCH 01/19] new partitioner --- hecuba_py/hecuba/IStorage.py | 75 +--- hecuba_py/hecuba/partitioner.py | 334 ++++++++++++++++++ hecuba_py/tests/storageobj_tests.py | 2 + ...ty.py => istorage_split_locality_tests.py} | 65 ++-- storageAPI/storage/api.py | 47 ++- 5 files changed, 426 insertions(+), 97 deletions(-) create mode 100644 hecuba_py/hecuba/partitioner.py rename hecuba_py/tests/withcassandra/{istorage_split_locality.py => istorage_split_locality_tests.py} (66%) diff --git a/hecuba_py/hecuba/IStorage.py b/hecuba_py/hecuba/IStorage.py index fbc6cdb1..7fab0414 100644 --- a/hecuba_py/hecuba/IStorage.py +++ b/hecuba_py/hecuba/IStorage.py @@ -1,10 +1,9 @@ import re -import uuid from bisect import bisect_right from collections import namedtuple, defaultdict -from time import time -from hecuba import config, log +from hecuba import config +from hecuba.partitioner import partitioner_split class AlreadyPersistentError(RuntimeError): @@ -75,61 +74,6 @@ def process_path(module_path): return class_name, module -def _tokens_partitions(ksp, table, tokens_ranges, splits_per_node, token_range_size, target_token_range_size): - """ - Method that calculates the new token partitions for a given object - Args: - tokens: current number of tokens of the object - min_tokens_per_worker: defined minimum number of tokens - number_of_workers: defined - Returns: - a partition every time it's called - :type tokens_ranges: list[(long,long)] - """ - - tm = config.cluster.metadata.token_map - tmap = tm.tokens_to_hosts_by_ks.get(ksp, None) - from cassandra.metadata import Murmur3Token - tokens_murmur3 = map(lambda a: (Murmur3Token(a[0]), a[1]), tokens_ranges) - if not tmap: - tm.rebuild_keyspace(ksp, build_if_absent=True) - tmap = tm.tokens_to_hosts_by_ks[ksp] - - tokens_per_node = defaultdict(list) - for tmumur, t_to in tokens_murmur3: - point = bisect_right(tm.ring, tmumur) - if point == len(tm.ring): - tokens_per_node[tmap[tm.ring[0]][0]].append((tmumur.value, t_to)) - else: - tokens_per_node[tmap[tm.ring[point]][0]].append((tmumur.value, t_to)) - - n_nodes = len(tokens_per_node) - step_size = _max_token // (splits_per_node * n_nodes) - if token_range_size: - step_size = token_range_size - elif target_token_range_size: - one = config.session.execute(_size_estimates, [ksp, table]).one() - if one: - (mean_p_size, p_count) = one - estimated_size = mean_p_size * p_count - if estimated_size > 0: - step_size = _max_token // ( - max(estimated_size / target_token_range_size, - splits_per_node * n_nodes) - ) - - for tokens_in_node in tokens_per_node.values(): - partition = [] - for fraction, to in tokens_in_node: - while fraction < to - step_size: - partition.append((fraction, fraction + step_size)) - fraction += step_size - partition.append((fraction, to)) - group_size = max(len(partition) // splits_per_node, 1) - for i in range(0, len(partition), group_size): - yield partition[i:i + group_size] - - def _discrete_token_ranges(tokens): """ Makes proper tokens ranges ensuring that in a tuple (a,b) a <= b @@ -181,20 +125,7 @@ def split(self): Returns: a subobject everytime is called """ - st = time() - tokens = self._build_args.tokens - - for token_split in _tokens_partitions(self._ksp, self._table, tokens, - config.splits_per_node, - config.token_range_size, - config.target_token_range_size): - storage_id = uuid.uuid4() - log.debug('assigning to %s %d tokens', str(storage_id), len(token_split)) - new_args = self._build_args._replace(tokens=token_split, storage_id=storage_id) - args_dict = new_args._asdict() - args_dict["built_remotely"] = False - yield self.__class__.build_remotely(args_dict) - log.debug('completed split of %s in %f', self.__class__.__name__, time() - st) + return partitioner_split(self) def _get_istorage_attrs(self, storage_id): return list(config.session.execute(_select_istorage_meta, [storage_id])) diff --git a/hecuba_py/hecuba/partitioner.py b/hecuba_py/hecuba/partitioner.py new file mode 100644 index 00000000..3ed8f2be --- /dev/null +++ b/hecuba_py/hecuba/partitioner.py @@ -0,0 +1,334 @@ +import os +import time +import uuid +from bisect import bisect_right +from collections import defaultdict, deque + +from hecuba import config, log + +_select_istorage_meta = config.session.prepare("SELECT * FROM hecuba.istorage WHERE storage_id = ?") +_size_estimates = config.session.prepare(("SELECT mean_partition_size, partitions_count " + "FROM system.size_estimates WHERE keyspace_name=? and table_name=?")) + +_max_token = int(((2 ** 63) - 1)) # type: int +_min_token = int(-2 ** 63) # type: int + + +def partitioner_split(father): + if hasattr(config, "partition_strategy"): + return Partitioner(father, config.partition_strategy).split() + else: + return Partitioner(father, "SIMPLE").split() + + +class Partitioner: + + def __init__(self, father, strategy): + self._father = father + self._strategy = strategy + self._rebuild_token_ring(self._father._ksp, self._father._build_args.tokens) + if strategy == "DYNAMIC": + self._setup_dynamic_structures() + + def _rebuild_token_ring(self, ksp, tokens_ranges): + tm = config.cluster.metadata.token_map + tmap = tm.tokens_to_hosts_by_ks.get(ksp, None) + from cassandra.metadata import Murmur3Token + tokens_murmur3 = map(lambda a: (Murmur3Token(a[0]), a[1]), tokens_ranges) + if not tmap: + tm.rebuild_keyspace(ksp, build_if_absent=True) + tmap = tm.tokens_to_hosts_by_ks[ksp] + + self._tokens_per_node = defaultdict(list) + for tmumur, t_to in tokens_murmur3: + point = bisect_right(tm.ring, tmumur) + if point == len(tm.ring): + self._tokens_per_node[tmap[tm.ring[0]][0]].append((tmumur.value, t_to)) + else: + self._tokens_per_node[tmap[tm.ring[point]][0]].append((tmumur.value, t_to)) + + self._nodes_number = len(self._tokens_per_node) + + def _setup_dynamic_structures(self): + try: + config.session.execute("""CREATE TABLE IF NOT EXISTS hecuba.partitioning( + partitioning_uuid uuid, + storage_id uuid, + number_of_partitions int, + start_time double, + end_time double, + PRIMARY KEY (storage_id)) + WITH default_time_to_live = 86400""") + except Exception as ex: + print("Could not create table hecuba.partitioning.") + raise ex + + self._prepared_store_id = \ + config.session.prepare("""INSERT INTO hecuba.partitioning + (partitioning_uuid, storage_id, number_of_partitions) + VALUES (?, ?, ?)""") + self._partitioning_uuid = uuid.uuid4() + self._partitions_time = defaultdict(list) + self._partitions_nodes = dict() + self._idle_cassandra_nodes = deque() + self._partitions_size = dict() + self._best_granularity = None + + self._select_partitions_times = \ + config.session.prepare("""SELECT storage_id, number_of_partitions, start_time, end_time + FROM hecuba.partitioning + WHERE partitioning_uuid = ? ALLOW FILTERING""") + + try: + self._nodes_number = len(os.environ["PYCOMPSS_NODES"].split(",")) - 1 + except KeyError: + self._nodes_number = int(os.environ["NODES_NUMBER"]) - 1 # master and worker + + self._n_idle_nodes = self._nodes_number + self._initial_send = self._nodes_number + # generate list of basic number of partitions + partitions = [(int(2 ** (x / 2)) // len(self._tokens_per_node)) for x in range(10, 21)] + # as many basic number of partitions as the number of nodes + # 11 basic number of partitions, repeating them when more than 11 nodes + self._basic_partitions = (partitions * (self._nodes_number // len(partitions) + 1))[0:self._nodes_number] + + def _tokens_partitions(self, ksp, table, token_range_size, target_token_range_size): + """ + Method that calculates the new token partitions for a given object + Returns: + a tuple (node, partition) every time it's called + """ + partitions_per_node = self._compute_partitions_per_node(ksp, table, token_range_size, target_token_range_size) + + if self._strategy == "DYNAMIC": + for partition_tokens in self._dynamic_tokens_partitions(partitions_per_node): + yield partition_tokens + else: + for final_tokens in self._send_final_tasks(partitions_per_node): + yield final_tokens + + def _compute_partitions_per_node(self, ksp, table, token_range_size, target_token_range_size): + """ + Compute all the partitions per node. If the strategy is simple partitioning, each node will have + (config.splits_per_node (default 32) * self._nodes_number) partitions. If the strategy is dynamic partitioning, + each node will have 1024 partitions, because if there is only one Cassandra node this will be the minimum + granularity. If there are more nodes, some partitions will be grouped. + Returns: + a dictionary with hosts as keys and partitions of tokens as values + """ + step_size = _max_token // (config.splits_per_node * self._nodes_number) + + if token_range_size: + step_size = token_range_size + elif target_token_range_size: + res = config.session.execute(_size_estimates, [ksp, table]) + if res: + one = res.one() + else: + one = 0 + if one: + (mean_p_size, p_count) = one + estimated_size = mean_p_size * p_count + if estimated_size > 0: + step_size = _max_token // ( + max(estimated_size / target_token_range_size, + config.splits_per_node * self._nodes_number) + ) + if self._strategy == "DYNAMIC": + # 1024 because it is the maximum number of splits per node, in the case of only one Cassandra node + step_size = _max_token // 1024 + + partitions_per_node = defaultdict(list) + for node, tokens_in_node in self._tokens_per_node.items(): + for fraction, to in tokens_in_node: + while fraction < to - step_size: + partitions_per_node[node].append((fraction, fraction + step_size)) + fraction += step_size + partitions_per_node[node].append((fraction, to)) + return partitions_per_node + + def _dynamic_tokens_partitions(self, partitions_per_node): + """ + Main loop of the dynamic partitioning strategy. There are 3 stages: + Sending of initial tasks: it returns as much initial tasks as pycompss nodes + Sending of intermediate tasks: until a best granularity is chosen, it return partitions using the granularity + with the best performance + Sending of final tasks: when a best granularity is chosen, it returns all the remaining partitions using the + best granularity + Returns: + a tuple (node, partition) every time it's called + """ + while True: + if self._initial_send > 0: + for initial_tokens in self._send_initial_tasks(partitions_per_node): + yield initial_tokens + elif self._best_granularity is None: + for intermediate_tokens in self._send_intermediate_tasks(partitions_per_node): + yield intermediate_tokens + if sum([len(partitions) for partitions in partitions_per_node.values()]) == 0: + break + else: + for final_tokens in self._send_final_tasks(partitions_per_node): + yield final_tokens + break + + def _send_initial_tasks(self, partitions_per_node): + for node in self._tokens_per_node.keys(): + config.splits_per_node = self._basic_partitions[self._initial_send * -1] + group_size = max(len(partitions_per_node[node]) // config.splits_per_node, 1) + if config.splits_per_node not in self._partitions_size: + self._partitions_size[config.splits_per_node] = group_size + + yield node, partitions_per_node[node][0:group_size] + del partitions_per_node[node][0:group_size] + + self._partitions_time[config.splits_per_node] = [] + self._initial_send -= 1 + if self._initial_send == 0: + break + + def _send_intermediate_tasks(self, partitions_per_node): + self._update_partitions_time() + while not self._all_tasks_finished(): + if self._n_idle_nodes > 0: + # if there is an idle node, send a new task without choosing the best granularity + config.splits_per_node, set_best = self._best_time_per_token() + if [] not in self._partitions_time.values() and set_best: + self._best_granularity = config.splits_per_node + break + time.sleep(1) + self._update_partitions_time() + else: + self._best_granularity, _ = self._best_time_per_token() + config.splits_per_node = self._best_granularity + + node = self._idle_cassandra_nodes.popleft() + group_size = max(len(partitions_per_node[node]) // config.splits_per_node, 1) + if config.splits_per_node not in self._partitions_size: + self._partitions_size[config.splits_per_node] = group_size + + yield node, partitions_per_node[node][0:group_size] + del partitions_per_node[node][0:group_size] + + def _send_final_tasks(self, partitions_per_node): + for partition in partitions_per_node.values(): + group_size = max(len(partition) // config.splits_per_node, 1) + for i in range(0, len(partition), group_size): + yield -1, partition[i:i + group_size] + + def _all_tasks_finished(self): + """ + Checks that there is at least one end_time set for all the granularities + """ + if [] in self._partitions_time.values(): + return False + + for _, partition_times in self._partitions_time.items(): + if not any(times["end_time"] for times in partition_times): + return False + return True + + @staticmethod + def _set_best_granularity(best, unfinished): + for _, time_per_token in unfinished.items(): + if time_per_token < best: + return False + return True + + def _best_time_per_token(self): + """ + The time is not a good measure, because the smaller tasks will be the shortest. + We use a time / tokens proportion + """ + times_per_token = dict() + unfinished_tasks = dict() + actual_time = time.time() + + for splits_per_node, partition_times in self._partitions_time.items(): + if len(partition_times) > 0: + group_size = self._partitions_size[splits_per_node] + partition_time = 0.0 + if not any(times["end_time"] for times in partition_times): + """ + If there isn't at least one end_time set for this granularity, takes the actual time as the + finishing time. If there is already a granularity with better performance, it is selected as the + best granularity. + A granularity with this condition cannot be set as the best granularity. + """ + for t in partition_times: + partition_time += actual_time - t["start_time"] + + partition_time = partition_time / float(len(partition_times)) + try: + unfinished_tasks[splits_per_node] = partition_time / group_size + except ZeroDivisionError: + pass + else: + # at least one task finished + for t in partition_times: + if t["end_time"] is not None: + partition_time += t["end_time"] - t["start_time"] + + partition_time = partition_time / float(len(partition_times)) + if partition_time >= 2.0: + # to avoid having too much overhead, granularities lasting less than two seconds are discarded + try: + times_per_token[splits_per_node] = partition_time / group_size + except ZeroDivisionError: + pass + + sorted_times = sorted(times_per_token.items(), key=lambda item: item[1]) + + if len(sorted_times) > 0: + best_granularity, best_time = sorted_times[0] + set_best = self._set_best_granularity(best_time, unfinished_tasks) + else: + # if no task lasted at least two seconds, pick the biggest granularity + best_granularity = min(set(self._partitions_time.keys()) - set(unfinished_tasks.keys())) + set_best = False + + return best_granularity, set_best + + def _update_partitions_time(self): + partitions_times = config.session.execute(self._select_partitions_times, [self._partitioning_uuid]) + + for storage_id, partitions, start, end in partitions_times: + if start is not None: + for i, times in enumerate(self._partitions_time[partitions]): + if start == times["start_time"]: + if end is not None and times["end_time"] != end: + self._partitions_time[partitions][i]["end_time"] = end + self._n_idle_nodes += 1 + self._idle_cassandra_nodes.append(self._partitions_nodes[storage_id]) + break + else: + if end is not None: + self._n_idle_nodes += 1 + self._idle_cassandra_nodes.append(self._partitions_nodes[storage_id]) + total_time = {"start_time": start, "end_time": end} + self._partitions_time[partitions].append(total_time) + + def split(self): + """ + Method used to divide an object into sub-objects. + Returns: + a subobject everytime is called + """ + st = time.time() + + for node, token_split in self._tokens_partitions(self._father._ksp, self._father._table, + config.token_range_size, + config.target_token_range_size): + storage_id = uuid.uuid4() + log.debug('assigning to %s %d tokens', str(storage_id), len(token_split)) + new_args = self._father._build_args._replace(tokens=token_split, storage_id=storage_id) + args_dict = new_args._asdict() + args_dict["built_remotely"] = False + if self._strategy == "DYNAMIC": + config.session.execute(self._prepared_store_id, + [self._partitioning_uuid, storage_id, config.splits_per_node]) + self._n_idle_nodes -= 1 + self._partitions_nodes[storage_id] = node + yield self._father.__class__.build_remotely(args_dict) + + log.debug('completed split of %s in %f', self.__class__.__name__, time.time() - st) diff --git a/hecuba_py/tests/storageobj_tests.py b/hecuba_py/tests/storageobj_tests.py index 4e03a4a6..e941b292 100644 --- a/hecuba_py/tests/storageobj_tests.py +++ b/hecuba_py/tests/storageobj_tests.py @@ -88,9 +88,11 @@ def test_parse_3(self): def test_init(self): # still in development + original_exec = config.session.execute config.session.execute = Mock(return_value=None) nopars = Words() config.session.execute.assert_not_called() + config.session.execute = original_exec def test_init_pdict(self): t = TestStorageObj() diff --git a/hecuba_py/tests/withcassandra/istorage_split_locality.py b/hecuba_py/tests/withcassandra/istorage_split_locality_tests.py similarity index 66% rename from hecuba_py/tests/withcassandra/istorage_split_locality.py rename to hecuba_py/tests/withcassandra/istorage_split_locality_tests.py index f5c6210f..70c96ef3 100644 --- a/hecuba_py/tests/withcassandra/istorage_split_locality.py +++ b/hecuba_py/tests/withcassandra/istorage_split_locality_tests.py @@ -1,12 +1,19 @@ import unittest from collections import defaultdict -from hecuba import config -from hecuba.IStorage import _tokens_partitions, _discrete_token_ranges +from hecuba import config, StorageDict +from hecuba.IStorage import _discrete_token_ranges +from hecuba.partitioner import Partitioner from .. import test_config +class SimpleObj(StorageDict): + ''' + @TypeSpec dict<, v:int> + ''' + + class IStorageSplitLocalityTest(unittest.TestCase): @classmethod def setUpClass(cls): @@ -16,36 +23,44 @@ def setUpClass(cls): config.session.execute("CREATE TABLE IF NOT EXISTS test_ksp.tab(k int PRIMARY KEY,v int)") def test_enough_token(self): - all_tokens = _discrete_token_ranges(map(lambda a: a.value, config.cluster.metadata.token_map.ring)) - tkns_p = list(_tokens_partitions("test_ksp", "tab", all_tokens, splits_per_node=10, token_range_size=None, - target_token_range_size=64 * 1024 * 1024)) + obj = SimpleObj("test_ksp.tab") + partitioner = Partitioner(obj, "SIMPLE") + config.splits_per_node = 10 + tkns_p = list(partitioner._tokens_partitions("test_ksp", "tab", token_range_size=None, + target_token_range_size=64 * 1024 * 1024)) + tkns_p = [i[1] for i in tkns_p] self.check_all(tkns_p, 10, 20) def test_too_little_tokens(self): - all_tokens = _discrete_token_ranges(map(lambda a: a.value, config.cluster.metadata.token_map.ring)) - tkns_p = list(_tokens_partitions("test_ksp", "tab", all_tokens, splits_per_node=1000, token_range_size=None, - target_token_range_size=64 * 1024)) + obj = SimpleObj("test_ksp.tab") + partitioner = Partitioner(obj, "SIMPLE") + config.splits_per_node = 1000 + tkns_p = list(partitioner._tokens_partitions("test_ksp", "tab", token_range_size=None, + target_token_range_size=64 * 1024)) + tkns_p = [i[1] for i in tkns_p] self.check_all(tkns_p, 1000, 1000) def test_splitting_tokens(self): - all_tokens = _discrete_token_ranges(map(lambda a: a.value, config.cluster.metadata.token_map.ring)) - tkns_p = list( - _tokens_partitions("test_ksp", "tab", all_tokens, splits_per_node=1, - token_range_size=int((2 ** 64) / 1000), - target_token_range_size=None)) + obj = SimpleObj("test_ksp.tab") + partitioner = Partitioner(obj, "SIMPLE") + config.splits_per_node = 1 + tkns_p = list(partitioner._tokens_partitions("test_ksp", "tab", token_range_size=int((2 ** 64) / 1000), + target_token_range_size=None)) + tkns_p = [i[1] for i in tkns_p] self.check_all(tkns_p, 1, 1000) def test_using_size_estimates(self): - for i in xrange(100000): + for i in range(100000): config.session.execute("INSERT INTO test_ksp.tab(k,v) values(%s,%s)", [i, i]) test_config.ccm_cluster.flush() test_config.ccm_cluster.compact() - all_tokens = _discrete_token_ranges(map(lambda a: a.value, config.cluster.metadata.token_map.ring)) + + obj = SimpleObj("test_ksp.tab") + partitioner = Partitioner(obj, "SIMPLE") + config.splits_per_node = 1 tkns_p = list( - _tokens_partitions("test_ksp", "tab", all_tokens, splits_per_node=1, - token_range_size=None, - target_token_range_size=64)) - #self.check_all(tkns_p, 1, 1000) + partitioner._tokens_partitions("test_ksp", "tab", token_range_size=None, target_token_range_size=64)) + # self.check_all(tkns_p, 1, 1000) def check_all(self, tkns_p, split_per_node, expected_total_tkns): self.assertGreaterEqual(len(tkns_p), len(test_config.ccm_cluster.nodes) * split_per_node) @@ -58,11 +73,11 @@ def check_all(self, tkns_p, split_per_node, expected_total_tkns): def check_full_range(self, list_of_ranges): list_of_ranges.sort() - start = map(lambda a: a[0], list_of_ranges) - counts = filter(lambda size: size[1] > 1, map(lambda number: (number, start.count(number)), start)) + start = list(map(lambda a: a[0], list_of_ranges)) + counts = list(filter(lambda size: size[1] > 1, map(lambda number: (number, start.count(number)), start))) self.assertEqual(0, len(counts), "duplicated starts") - end = map(lambda a: a[0], list_of_ranges) - counts = filter(lambda size: size[1] > 1, map(lambda number: (number, end.count(number)), end)) + end = list(map(lambda a: a[0], list_of_ranges)) + counts = list(filter(lambda size: size[1] > 1, map(lambda number: (number, end.count(number)), end))) self.assertEqual(0, len(counts), "duplicated ends") first, last = list_of_ranges[0] @@ -93,6 +108,10 @@ def checkToken(self, tokens): # type: (List[Long]) -> Host from cassandra.metadata import Token tm = config.cluster.metadata.token_map + + # only the first token of each partition is not assigned correctly + tokens = [(tok[0] + 1, tok[1]) for tok in tokens] + hosts = set(map(lambda token: tm.get_replicas("test_ksp", token)[0], map(lambda a: Token(a[0]), tokens))) self.assertEqual(len(hosts), 1, "A token range is local in 2 nodes") diff --git a/storageAPI/storage/api.py b/storageAPI/storage/api.py index 4b3cf603..5b8662bd 100755 --- a/storageAPI/storage/api.py +++ b/storageAPI/storage/api.py @@ -1,5 +1,6 @@ import uuid + def init(config_file_path=None): """ Function that can be useful when running the application with COMPSs >= 2.0 @@ -38,7 +39,28 @@ def start_task(params): Args: params: a list of objects (Blocks, StorageObjs, strings, ints, ...) """ - pass + from hecuba import config + import time + for param in params: + uid = None + try: + uid = uuid.UUID(param.__dict__["key"]) + except KeyError: + pass + try: + uid = param._storage_id + except AttributeError: + pass + + if uid is not None: + try: + prepare = config.session.prepare("""UPDATE hecuba.partitioning + SET start_time = ? + WHERE storage_id = ?""") + config.session.execute(prepare, [time.time(), uid]) + except: + pass + break def end_task(params): @@ -48,7 +70,28 @@ def end_task(params): Args: params: a list of objects (Blocks, StorageObjs, strings, ints, ...) """ - pass + from hecuba import config + import time + for param in params: + uid = None + try: + uid = uuid.UUID(param.__dict__["key"]) + except KeyError: + pass + try: + uid = param._storage_id + except AttributeError: + pass + + if uid is not None: + try: + prepare = config.session.prepare("""UPDATE hecuba.partitioning + SET end_time = ? + WHERE storage_id = ?""") + config.session.execute(prepare, [time.time(), uid]) + except: + pass + break class TaskContext(object): From b138fad68671e8aa8e5400579217ba485ada9d0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A1n=20Espejo?= Date: Thu, 10 Oct 2019 10:16:34 +0200 Subject: [PATCH 02/19] new partitioner --- hecuba_py/hecuba/IStorage.py | 75 +--- hecuba_py/hecuba/partitioner.py | 334 ++++++++++++++++++ hecuba_py/tests/storageobj_tests.py | 2 + ...ty.py => istorage_split_locality_tests.py} | 65 ++-- storageAPI/storage/api.py | 47 ++- 5 files changed, 426 insertions(+), 97 deletions(-) create mode 100644 hecuba_py/hecuba/partitioner.py rename hecuba_py/tests/withcassandra/{istorage_split_locality.py => istorage_split_locality_tests.py} (66%) diff --git a/hecuba_py/hecuba/IStorage.py b/hecuba_py/hecuba/IStorage.py index fbc6cdb1..7fab0414 100644 --- a/hecuba_py/hecuba/IStorage.py +++ b/hecuba_py/hecuba/IStorage.py @@ -1,10 +1,9 @@ import re -import uuid from bisect import bisect_right from collections import namedtuple, defaultdict -from time import time -from hecuba import config, log +from hecuba import config +from hecuba.partitioner import partitioner_split class AlreadyPersistentError(RuntimeError): @@ -75,61 +74,6 @@ def process_path(module_path): return class_name, module -def _tokens_partitions(ksp, table, tokens_ranges, splits_per_node, token_range_size, target_token_range_size): - """ - Method that calculates the new token partitions for a given object - Args: - tokens: current number of tokens of the object - min_tokens_per_worker: defined minimum number of tokens - number_of_workers: defined - Returns: - a partition every time it's called - :type tokens_ranges: list[(long,long)] - """ - - tm = config.cluster.metadata.token_map - tmap = tm.tokens_to_hosts_by_ks.get(ksp, None) - from cassandra.metadata import Murmur3Token - tokens_murmur3 = map(lambda a: (Murmur3Token(a[0]), a[1]), tokens_ranges) - if not tmap: - tm.rebuild_keyspace(ksp, build_if_absent=True) - tmap = tm.tokens_to_hosts_by_ks[ksp] - - tokens_per_node = defaultdict(list) - for tmumur, t_to in tokens_murmur3: - point = bisect_right(tm.ring, tmumur) - if point == len(tm.ring): - tokens_per_node[tmap[tm.ring[0]][0]].append((tmumur.value, t_to)) - else: - tokens_per_node[tmap[tm.ring[point]][0]].append((tmumur.value, t_to)) - - n_nodes = len(tokens_per_node) - step_size = _max_token // (splits_per_node * n_nodes) - if token_range_size: - step_size = token_range_size - elif target_token_range_size: - one = config.session.execute(_size_estimates, [ksp, table]).one() - if one: - (mean_p_size, p_count) = one - estimated_size = mean_p_size * p_count - if estimated_size > 0: - step_size = _max_token // ( - max(estimated_size / target_token_range_size, - splits_per_node * n_nodes) - ) - - for tokens_in_node in tokens_per_node.values(): - partition = [] - for fraction, to in tokens_in_node: - while fraction < to - step_size: - partition.append((fraction, fraction + step_size)) - fraction += step_size - partition.append((fraction, to)) - group_size = max(len(partition) // splits_per_node, 1) - for i in range(0, len(partition), group_size): - yield partition[i:i + group_size] - - def _discrete_token_ranges(tokens): """ Makes proper tokens ranges ensuring that in a tuple (a,b) a <= b @@ -181,20 +125,7 @@ def split(self): Returns: a subobject everytime is called """ - st = time() - tokens = self._build_args.tokens - - for token_split in _tokens_partitions(self._ksp, self._table, tokens, - config.splits_per_node, - config.token_range_size, - config.target_token_range_size): - storage_id = uuid.uuid4() - log.debug('assigning to %s %d tokens', str(storage_id), len(token_split)) - new_args = self._build_args._replace(tokens=token_split, storage_id=storage_id) - args_dict = new_args._asdict() - args_dict["built_remotely"] = False - yield self.__class__.build_remotely(args_dict) - log.debug('completed split of %s in %f', self.__class__.__name__, time() - st) + return partitioner_split(self) def _get_istorage_attrs(self, storage_id): return list(config.session.execute(_select_istorage_meta, [storage_id])) diff --git a/hecuba_py/hecuba/partitioner.py b/hecuba_py/hecuba/partitioner.py new file mode 100644 index 00000000..3ed8f2be --- /dev/null +++ b/hecuba_py/hecuba/partitioner.py @@ -0,0 +1,334 @@ +import os +import time +import uuid +from bisect import bisect_right +from collections import defaultdict, deque + +from hecuba import config, log + +_select_istorage_meta = config.session.prepare("SELECT * FROM hecuba.istorage WHERE storage_id = ?") +_size_estimates = config.session.prepare(("SELECT mean_partition_size, partitions_count " + "FROM system.size_estimates WHERE keyspace_name=? and table_name=?")) + +_max_token = int(((2 ** 63) - 1)) # type: int +_min_token = int(-2 ** 63) # type: int + + +def partitioner_split(father): + if hasattr(config, "partition_strategy"): + return Partitioner(father, config.partition_strategy).split() + else: + return Partitioner(father, "SIMPLE").split() + + +class Partitioner: + + def __init__(self, father, strategy): + self._father = father + self._strategy = strategy + self._rebuild_token_ring(self._father._ksp, self._father._build_args.tokens) + if strategy == "DYNAMIC": + self._setup_dynamic_structures() + + def _rebuild_token_ring(self, ksp, tokens_ranges): + tm = config.cluster.metadata.token_map + tmap = tm.tokens_to_hosts_by_ks.get(ksp, None) + from cassandra.metadata import Murmur3Token + tokens_murmur3 = map(lambda a: (Murmur3Token(a[0]), a[1]), tokens_ranges) + if not tmap: + tm.rebuild_keyspace(ksp, build_if_absent=True) + tmap = tm.tokens_to_hosts_by_ks[ksp] + + self._tokens_per_node = defaultdict(list) + for tmumur, t_to in tokens_murmur3: + point = bisect_right(tm.ring, tmumur) + if point == len(tm.ring): + self._tokens_per_node[tmap[tm.ring[0]][0]].append((tmumur.value, t_to)) + else: + self._tokens_per_node[tmap[tm.ring[point]][0]].append((tmumur.value, t_to)) + + self._nodes_number = len(self._tokens_per_node) + + def _setup_dynamic_structures(self): + try: + config.session.execute("""CREATE TABLE IF NOT EXISTS hecuba.partitioning( + partitioning_uuid uuid, + storage_id uuid, + number_of_partitions int, + start_time double, + end_time double, + PRIMARY KEY (storage_id)) + WITH default_time_to_live = 86400""") + except Exception as ex: + print("Could not create table hecuba.partitioning.") + raise ex + + self._prepared_store_id = \ + config.session.prepare("""INSERT INTO hecuba.partitioning + (partitioning_uuid, storage_id, number_of_partitions) + VALUES (?, ?, ?)""") + self._partitioning_uuid = uuid.uuid4() + self._partitions_time = defaultdict(list) + self._partitions_nodes = dict() + self._idle_cassandra_nodes = deque() + self._partitions_size = dict() + self._best_granularity = None + + self._select_partitions_times = \ + config.session.prepare("""SELECT storage_id, number_of_partitions, start_time, end_time + FROM hecuba.partitioning + WHERE partitioning_uuid = ? ALLOW FILTERING""") + + try: + self._nodes_number = len(os.environ["PYCOMPSS_NODES"].split(",")) - 1 + except KeyError: + self._nodes_number = int(os.environ["NODES_NUMBER"]) - 1 # master and worker + + self._n_idle_nodes = self._nodes_number + self._initial_send = self._nodes_number + # generate list of basic number of partitions + partitions = [(int(2 ** (x / 2)) // len(self._tokens_per_node)) for x in range(10, 21)] + # as many basic number of partitions as the number of nodes + # 11 basic number of partitions, repeating them when more than 11 nodes + self._basic_partitions = (partitions * (self._nodes_number // len(partitions) + 1))[0:self._nodes_number] + + def _tokens_partitions(self, ksp, table, token_range_size, target_token_range_size): + """ + Method that calculates the new token partitions for a given object + Returns: + a tuple (node, partition) every time it's called + """ + partitions_per_node = self._compute_partitions_per_node(ksp, table, token_range_size, target_token_range_size) + + if self._strategy == "DYNAMIC": + for partition_tokens in self._dynamic_tokens_partitions(partitions_per_node): + yield partition_tokens + else: + for final_tokens in self._send_final_tasks(partitions_per_node): + yield final_tokens + + def _compute_partitions_per_node(self, ksp, table, token_range_size, target_token_range_size): + """ + Compute all the partitions per node. If the strategy is simple partitioning, each node will have + (config.splits_per_node (default 32) * self._nodes_number) partitions. If the strategy is dynamic partitioning, + each node will have 1024 partitions, because if there is only one Cassandra node this will be the minimum + granularity. If there are more nodes, some partitions will be grouped. + Returns: + a dictionary with hosts as keys and partitions of tokens as values + """ + step_size = _max_token // (config.splits_per_node * self._nodes_number) + + if token_range_size: + step_size = token_range_size + elif target_token_range_size: + res = config.session.execute(_size_estimates, [ksp, table]) + if res: + one = res.one() + else: + one = 0 + if one: + (mean_p_size, p_count) = one + estimated_size = mean_p_size * p_count + if estimated_size > 0: + step_size = _max_token // ( + max(estimated_size / target_token_range_size, + config.splits_per_node * self._nodes_number) + ) + if self._strategy == "DYNAMIC": + # 1024 because it is the maximum number of splits per node, in the case of only one Cassandra node + step_size = _max_token // 1024 + + partitions_per_node = defaultdict(list) + for node, tokens_in_node in self._tokens_per_node.items(): + for fraction, to in tokens_in_node: + while fraction < to - step_size: + partitions_per_node[node].append((fraction, fraction + step_size)) + fraction += step_size + partitions_per_node[node].append((fraction, to)) + return partitions_per_node + + def _dynamic_tokens_partitions(self, partitions_per_node): + """ + Main loop of the dynamic partitioning strategy. There are 3 stages: + Sending of initial tasks: it returns as much initial tasks as pycompss nodes + Sending of intermediate tasks: until a best granularity is chosen, it return partitions using the granularity + with the best performance + Sending of final tasks: when a best granularity is chosen, it returns all the remaining partitions using the + best granularity + Returns: + a tuple (node, partition) every time it's called + """ + while True: + if self._initial_send > 0: + for initial_tokens in self._send_initial_tasks(partitions_per_node): + yield initial_tokens + elif self._best_granularity is None: + for intermediate_tokens in self._send_intermediate_tasks(partitions_per_node): + yield intermediate_tokens + if sum([len(partitions) for partitions in partitions_per_node.values()]) == 0: + break + else: + for final_tokens in self._send_final_tasks(partitions_per_node): + yield final_tokens + break + + def _send_initial_tasks(self, partitions_per_node): + for node in self._tokens_per_node.keys(): + config.splits_per_node = self._basic_partitions[self._initial_send * -1] + group_size = max(len(partitions_per_node[node]) // config.splits_per_node, 1) + if config.splits_per_node not in self._partitions_size: + self._partitions_size[config.splits_per_node] = group_size + + yield node, partitions_per_node[node][0:group_size] + del partitions_per_node[node][0:group_size] + + self._partitions_time[config.splits_per_node] = [] + self._initial_send -= 1 + if self._initial_send == 0: + break + + def _send_intermediate_tasks(self, partitions_per_node): + self._update_partitions_time() + while not self._all_tasks_finished(): + if self._n_idle_nodes > 0: + # if there is an idle node, send a new task without choosing the best granularity + config.splits_per_node, set_best = self._best_time_per_token() + if [] not in self._partitions_time.values() and set_best: + self._best_granularity = config.splits_per_node + break + time.sleep(1) + self._update_partitions_time() + else: + self._best_granularity, _ = self._best_time_per_token() + config.splits_per_node = self._best_granularity + + node = self._idle_cassandra_nodes.popleft() + group_size = max(len(partitions_per_node[node]) // config.splits_per_node, 1) + if config.splits_per_node not in self._partitions_size: + self._partitions_size[config.splits_per_node] = group_size + + yield node, partitions_per_node[node][0:group_size] + del partitions_per_node[node][0:group_size] + + def _send_final_tasks(self, partitions_per_node): + for partition in partitions_per_node.values(): + group_size = max(len(partition) // config.splits_per_node, 1) + for i in range(0, len(partition), group_size): + yield -1, partition[i:i + group_size] + + def _all_tasks_finished(self): + """ + Checks that there is at least one end_time set for all the granularities + """ + if [] in self._partitions_time.values(): + return False + + for _, partition_times in self._partitions_time.items(): + if not any(times["end_time"] for times in partition_times): + return False + return True + + @staticmethod + def _set_best_granularity(best, unfinished): + for _, time_per_token in unfinished.items(): + if time_per_token < best: + return False + return True + + def _best_time_per_token(self): + """ + The time is not a good measure, because the smaller tasks will be the shortest. + We use a time / tokens proportion + """ + times_per_token = dict() + unfinished_tasks = dict() + actual_time = time.time() + + for splits_per_node, partition_times in self._partitions_time.items(): + if len(partition_times) > 0: + group_size = self._partitions_size[splits_per_node] + partition_time = 0.0 + if not any(times["end_time"] for times in partition_times): + """ + If there isn't at least one end_time set for this granularity, takes the actual time as the + finishing time. If there is already a granularity with better performance, it is selected as the + best granularity. + A granularity with this condition cannot be set as the best granularity. + """ + for t in partition_times: + partition_time += actual_time - t["start_time"] + + partition_time = partition_time / float(len(partition_times)) + try: + unfinished_tasks[splits_per_node] = partition_time / group_size + except ZeroDivisionError: + pass + else: + # at least one task finished + for t in partition_times: + if t["end_time"] is not None: + partition_time += t["end_time"] - t["start_time"] + + partition_time = partition_time / float(len(partition_times)) + if partition_time >= 2.0: + # to avoid having too much overhead, granularities lasting less than two seconds are discarded + try: + times_per_token[splits_per_node] = partition_time / group_size + except ZeroDivisionError: + pass + + sorted_times = sorted(times_per_token.items(), key=lambda item: item[1]) + + if len(sorted_times) > 0: + best_granularity, best_time = sorted_times[0] + set_best = self._set_best_granularity(best_time, unfinished_tasks) + else: + # if no task lasted at least two seconds, pick the biggest granularity + best_granularity = min(set(self._partitions_time.keys()) - set(unfinished_tasks.keys())) + set_best = False + + return best_granularity, set_best + + def _update_partitions_time(self): + partitions_times = config.session.execute(self._select_partitions_times, [self._partitioning_uuid]) + + for storage_id, partitions, start, end in partitions_times: + if start is not None: + for i, times in enumerate(self._partitions_time[partitions]): + if start == times["start_time"]: + if end is not None and times["end_time"] != end: + self._partitions_time[partitions][i]["end_time"] = end + self._n_idle_nodes += 1 + self._idle_cassandra_nodes.append(self._partitions_nodes[storage_id]) + break + else: + if end is not None: + self._n_idle_nodes += 1 + self._idle_cassandra_nodes.append(self._partitions_nodes[storage_id]) + total_time = {"start_time": start, "end_time": end} + self._partitions_time[partitions].append(total_time) + + def split(self): + """ + Method used to divide an object into sub-objects. + Returns: + a subobject everytime is called + """ + st = time.time() + + for node, token_split in self._tokens_partitions(self._father._ksp, self._father._table, + config.token_range_size, + config.target_token_range_size): + storage_id = uuid.uuid4() + log.debug('assigning to %s %d tokens', str(storage_id), len(token_split)) + new_args = self._father._build_args._replace(tokens=token_split, storage_id=storage_id) + args_dict = new_args._asdict() + args_dict["built_remotely"] = False + if self._strategy == "DYNAMIC": + config.session.execute(self._prepared_store_id, + [self._partitioning_uuid, storage_id, config.splits_per_node]) + self._n_idle_nodes -= 1 + self._partitions_nodes[storage_id] = node + yield self._father.__class__.build_remotely(args_dict) + + log.debug('completed split of %s in %f', self.__class__.__name__, time.time() - st) diff --git a/hecuba_py/tests/storageobj_tests.py b/hecuba_py/tests/storageobj_tests.py index 4e03a4a6..e941b292 100644 --- a/hecuba_py/tests/storageobj_tests.py +++ b/hecuba_py/tests/storageobj_tests.py @@ -88,9 +88,11 @@ def test_parse_3(self): def test_init(self): # still in development + original_exec = config.session.execute config.session.execute = Mock(return_value=None) nopars = Words() config.session.execute.assert_not_called() + config.session.execute = original_exec def test_init_pdict(self): t = TestStorageObj() diff --git a/hecuba_py/tests/withcassandra/istorage_split_locality.py b/hecuba_py/tests/withcassandra/istorage_split_locality_tests.py similarity index 66% rename from hecuba_py/tests/withcassandra/istorage_split_locality.py rename to hecuba_py/tests/withcassandra/istorage_split_locality_tests.py index f5c6210f..70c96ef3 100644 --- a/hecuba_py/tests/withcassandra/istorage_split_locality.py +++ b/hecuba_py/tests/withcassandra/istorage_split_locality_tests.py @@ -1,12 +1,19 @@ import unittest from collections import defaultdict -from hecuba import config -from hecuba.IStorage import _tokens_partitions, _discrete_token_ranges +from hecuba import config, StorageDict +from hecuba.IStorage import _discrete_token_ranges +from hecuba.partitioner import Partitioner from .. import test_config +class SimpleObj(StorageDict): + ''' + @TypeSpec dict<, v:int> + ''' + + class IStorageSplitLocalityTest(unittest.TestCase): @classmethod def setUpClass(cls): @@ -16,36 +23,44 @@ def setUpClass(cls): config.session.execute("CREATE TABLE IF NOT EXISTS test_ksp.tab(k int PRIMARY KEY,v int)") def test_enough_token(self): - all_tokens = _discrete_token_ranges(map(lambda a: a.value, config.cluster.metadata.token_map.ring)) - tkns_p = list(_tokens_partitions("test_ksp", "tab", all_tokens, splits_per_node=10, token_range_size=None, - target_token_range_size=64 * 1024 * 1024)) + obj = SimpleObj("test_ksp.tab") + partitioner = Partitioner(obj, "SIMPLE") + config.splits_per_node = 10 + tkns_p = list(partitioner._tokens_partitions("test_ksp", "tab", token_range_size=None, + target_token_range_size=64 * 1024 * 1024)) + tkns_p = [i[1] for i in tkns_p] self.check_all(tkns_p, 10, 20) def test_too_little_tokens(self): - all_tokens = _discrete_token_ranges(map(lambda a: a.value, config.cluster.metadata.token_map.ring)) - tkns_p = list(_tokens_partitions("test_ksp", "tab", all_tokens, splits_per_node=1000, token_range_size=None, - target_token_range_size=64 * 1024)) + obj = SimpleObj("test_ksp.tab") + partitioner = Partitioner(obj, "SIMPLE") + config.splits_per_node = 1000 + tkns_p = list(partitioner._tokens_partitions("test_ksp", "tab", token_range_size=None, + target_token_range_size=64 * 1024)) + tkns_p = [i[1] for i in tkns_p] self.check_all(tkns_p, 1000, 1000) def test_splitting_tokens(self): - all_tokens = _discrete_token_ranges(map(lambda a: a.value, config.cluster.metadata.token_map.ring)) - tkns_p = list( - _tokens_partitions("test_ksp", "tab", all_tokens, splits_per_node=1, - token_range_size=int((2 ** 64) / 1000), - target_token_range_size=None)) + obj = SimpleObj("test_ksp.tab") + partitioner = Partitioner(obj, "SIMPLE") + config.splits_per_node = 1 + tkns_p = list(partitioner._tokens_partitions("test_ksp", "tab", token_range_size=int((2 ** 64) / 1000), + target_token_range_size=None)) + tkns_p = [i[1] for i in tkns_p] self.check_all(tkns_p, 1, 1000) def test_using_size_estimates(self): - for i in xrange(100000): + for i in range(100000): config.session.execute("INSERT INTO test_ksp.tab(k,v) values(%s,%s)", [i, i]) test_config.ccm_cluster.flush() test_config.ccm_cluster.compact() - all_tokens = _discrete_token_ranges(map(lambda a: a.value, config.cluster.metadata.token_map.ring)) + + obj = SimpleObj("test_ksp.tab") + partitioner = Partitioner(obj, "SIMPLE") + config.splits_per_node = 1 tkns_p = list( - _tokens_partitions("test_ksp", "tab", all_tokens, splits_per_node=1, - token_range_size=None, - target_token_range_size=64)) - #self.check_all(tkns_p, 1, 1000) + partitioner._tokens_partitions("test_ksp", "tab", token_range_size=None, target_token_range_size=64)) + # self.check_all(tkns_p, 1, 1000) def check_all(self, tkns_p, split_per_node, expected_total_tkns): self.assertGreaterEqual(len(tkns_p), len(test_config.ccm_cluster.nodes) * split_per_node) @@ -58,11 +73,11 @@ def check_all(self, tkns_p, split_per_node, expected_total_tkns): def check_full_range(self, list_of_ranges): list_of_ranges.sort() - start = map(lambda a: a[0], list_of_ranges) - counts = filter(lambda size: size[1] > 1, map(lambda number: (number, start.count(number)), start)) + start = list(map(lambda a: a[0], list_of_ranges)) + counts = list(filter(lambda size: size[1] > 1, map(lambda number: (number, start.count(number)), start))) self.assertEqual(0, len(counts), "duplicated starts") - end = map(lambda a: a[0], list_of_ranges) - counts = filter(lambda size: size[1] > 1, map(lambda number: (number, end.count(number)), end)) + end = list(map(lambda a: a[0], list_of_ranges)) + counts = list(filter(lambda size: size[1] > 1, map(lambda number: (number, end.count(number)), end))) self.assertEqual(0, len(counts), "duplicated ends") first, last = list_of_ranges[0] @@ -93,6 +108,10 @@ def checkToken(self, tokens): # type: (List[Long]) -> Host from cassandra.metadata import Token tm = config.cluster.metadata.token_map + + # only the first token of each partition is not assigned correctly + tokens = [(tok[0] + 1, tok[1]) for tok in tokens] + hosts = set(map(lambda token: tm.get_replicas("test_ksp", token)[0], map(lambda a: Token(a[0]), tokens))) self.assertEqual(len(hosts), 1, "A token range is local in 2 nodes") diff --git a/storageAPI/storage/api.py b/storageAPI/storage/api.py index 4b3cf603..5b8662bd 100755 --- a/storageAPI/storage/api.py +++ b/storageAPI/storage/api.py @@ -1,5 +1,6 @@ import uuid + def init(config_file_path=None): """ Function that can be useful when running the application with COMPSs >= 2.0 @@ -38,7 +39,28 @@ def start_task(params): Args: params: a list of objects (Blocks, StorageObjs, strings, ints, ...) """ - pass + from hecuba import config + import time + for param in params: + uid = None + try: + uid = uuid.UUID(param.__dict__["key"]) + except KeyError: + pass + try: + uid = param._storage_id + except AttributeError: + pass + + if uid is not None: + try: + prepare = config.session.prepare("""UPDATE hecuba.partitioning + SET start_time = ? + WHERE storage_id = ?""") + config.session.execute(prepare, [time.time(), uid]) + except: + pass + break def end_task(params): @@ -48,7 +70,28 @@ def end_task(params): Args: params: a list of objects (Blocks, StorageObjs, strings, ints, ...) """ - pass + from hecuba import config + import time + for param in params: + uid = None + try: + uid = uuid.UUID(param.__dict__["key"]) + except KeyError: + pass + try: + uid = param._storage_id + except AttributeError: + pass + + if uid is not None: + try: + prepare = config.session.prepare("""UPDATE hecuba.partitioning + SET end_time = ? + WHERE storage_id = ?""") + config.session.execute(prepare, [time.time(), uid]) + except: + pass + break class TaskContext(object): From b34189c730fe5843b3fb60dfa228b873f3ed3efa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A1n=20Espejo?= Date: Mon, 14 Oct 2019 08:35:15 +0200 Subject: [PATCH 03/19] added tests --- .../tests/withcassandra/partitioner_tests.py | 304 ++++++++++++++++++ 1 file changed, 304 insertions(+) create mode 100644 hecuba_py/tests/withcassandra/partitioner_tests.py diff --git a/hecuba_py/tests/withcassandra/partitioner_tests.py b/hecuba_py/tests/withcassandra/partitioner_tests.py new file mode 100644 index 00000000..7382d0b0 --- /dev/null +++ b/hecuba_py/tests/withcassandra/partitioner_tests.py @@ -0,0 +1,304 @@ +import os +import time +import unittest +from random import randint + +from hecuba import config, StorageDict + + +class MyDict(StorageDict): + ''' + @TypeSpec dict<, val0:str> + ''' + + +class PartitionerTest(unittest.TestCase): + + def computeItems(self, SDict): + counter = 0 + for _ in SDict.keys(): + counter = counter + 1 + return counter + + def test_simple(self): + config.splits_per_node = 32 + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + d = MyDict("my_app.mydict") + nitems = 10000 + for i in range(0, nitems): + d[i] = "RandomText" + str(i) + + time.sleep(2) + # assert all the data has been written + self.assertEqual(len(list(d.keys())), nitems) + + acc = 0 + nsplits = 0 + config.partition_strategy = "SIMPLE" + for partition in d.split(): + nsplits += 1 + acc += self.computeItems(partition) + + nodes_number = 2 + print("number of splits: %s, best is %s" % (nsplits, config.splits_per_node * nodes_number)) + self.assertEqual(nitems, acc) + + def test_dynamic_simple(self): + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + d = MyDict("my_app.mydict") + nitems = 10000 + for i in range(0, nitems): + d[i] = "RandomText" + str(i) + + time.sleep(2) + # assert all the data has been written + self.assertEqual(len(list(d.keys())), nitems) + + acc = 0 + os.environ["NODES_NUMBER"] = "3" + config.partition_strategy = "DYNAMIC" + granularity = [32, 45] + times = [(0, 80), (0, 5)] + nsplits = 0 + for partition in d.split(): + if nsplits <= 1: + # this will be done by the compss api + query = config.session.prepare("""UPDATE hecuba.partitioning + SET start_time = ?, end_time = ? + WHERE storage_id = ?""") + config.session.execute(query, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + nsplits += 1 + + acc += self.computeItems(partition) + + print("number of splits: %s, best is %s" % (nsplits, 45)) + self.assertEqual(nitems, acc) + self.assertEqual(config.splits_per_node, 45 // 2) + + def test_dynamic_simple_other(self): + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + d = MyDict("my_app.mydict") + nitems = 10000 + for i in range(0, nitems): + d[i] = "RandomText" + str(i) + + time.sleep(2) + # assert all the data has been written + self.assertEqual(len(list(d.keys())), nitems) + + acc = 0 + os.environ["NODES_NUMBER"] = "3" + config.partition_strategy = "DYNAMIC" + granularity = [32, 45] + times = [(0, 10), (0, 80)] + nsplits = 0 + for partition in d.split(): + if nsplits <= 1: + # this will be done by the compss api + query = config.session.prepare("""UPDATE hecuba.partitioning + SET start_time = ?, end_time = ? + WHERE storage_id = ?""") + config.session.execute(query, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + nsplits += 1 + + acc += self.computeItems(partition) + + print("number of splits: %s, best is %s" % (nsplits, 32)) + self.assertEqual(nitems, acc) + self.assertEqual(config.splits_per_node, 32 // 2) + + def test_dynamic_different_nodes(self): + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + d = MyDict("my_app.mydict") + nitems = 10000 + for i in range(0, nitems): + d[i] = "RandomText" + str(i) + + time.sleep(2) + # assert all the data has been written + self.assertEqual(len(list(d.keys())), nitems) + + acc = 0 + os.environ["NODES_NUMBER"] = "5" + config.partition_strategy = "DYNAMIC" + granularity = [32, 45, 64, 90] + times = [(0, 1000), (0, 1000), (0, 20), (0, 1000)] + nsplits = 0 + for partition in d.split(): + if nsplits <= 3: + # this will be done by the compss api + query = config.session.prepare("""UPDATE hecuba.partitioning + SET start_time = ?, end_time = ? + WHERE storage_id = ?""") + config.session.execute(query, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + nsplits += 1 + + acc += self.computeItems(partition) + + print("number of splits: %s, best is %s\n" % (nsplits, 64)) + self.assertEqual(nitems, acc) + self.assertEqual(config.splits_per_node, 64 // 2) + + def test_dynamic_best_without_finishing(self): + """ + Test if the best granularity is set without finishing all the initial granularities tasks. + This happens when all the unfinished tasks are worse than the best granularity with at least one finished task + """ + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + d = MyDict("my_app.mydict") + nitems = 10000 + for i in range(0, nitems): + d[i] = "RandomText" + str(i) + + time.sleep(2) + # assert all the data has been written + self.assertEqual(len(list(d.keys())), nitems) + + acc = 0 + os.environ["NODES_NUMBER"] = "3" + config.partition_strategy = "DYNAMIC" + granularity = [32, 45] + times = [(0, 80), (0, 40)] + nsplits = 0 + for partition in d.split(): + # pretending that task with gran=32 is taking a lot of time + if nsplits == 0: + # this will be done by the compss api + query = config.session.prepare("""UPDATE hecuba.partitioning + SET start_time = ? + WHERE storage_id = ?""") + config.session.execute(query, [0, partition._storage_id]) + elif nsplits == 1: + query = config.session.prepare("""UPDATE hecuba.partitioning + SET start_time = ?, end_time = ? + WHERE storage_id = ?""") + config.session.execute(query, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + else: + self.assertEqual(config.splits_per_node, 45 // 2) + + nsplits += 1 + acc += self.computeItems(partition) + + print("number of splits: %s, best is %s" % (nsplits, 45)) + self.assertEqual(nitems, acc) + self.assertEqual(config.splits_per_node, 45 // 2) + + def test_dynamic_best_idle_nodes(self): + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + d = MyDict("my_app.mydict") + nitems = 10000 + for i in range(0, nitems): + d[i] = "RandomText" + str(i) + + time.sleep(2) + # assert all the data has been written + self.assertEqual(len(list(d.keys())), nitems) + + acc = 0 + os.environ["NODES_NUMBER"] = "3" + config.partition_strategy = "DYNAMIC" + granularity = [32, 45] + times = [(0, 120), (0, 60)] + nsplits = 0 + for partition in d.split(): + # pretending that task with gran=32 is taking a lot of time + if nsplits == 0: + id_partition0 = partition._storage_id + # this will be done by the compss api + query = config.session.prepare("""UPDATE hecuba.partitioning + SET start_time = ? + WHERE storage_id = ?""") + config.session.execute(query, [time.time(), partition._storage_id]) + elif nsplits == 1: + query = config.session.prepare("""UPDATE hecuba.partitioning + SET start_time = ?, end_time = ? + WHERE storage_id = ?""") + config.session.execute(query, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + elif nsplits == 15: + query = config.session.prepare("""UPDATE hecuba.partitioning + SET end_time = ? + WHERE storage_id = ?""") + config.session.execute(query, [time.time() + 150, id_partition0]) + elif 1 < nsplits < 15: + query = config.session.prepare("""UPDATE hecuba.partitioning + SET start_time = ?, end_time = ? + WHERE storage_id = ?""") + start = randint(0, 200) + config.session.execute(query, [start, start + 60, partition._storage_id]) + + if nsplits > 1: + self.assertEqual(config.splits_per_node, 45 // 2) + + nsplits += 1 + acc += self.computeItems(partition) + + print("number of splits: %s, best is %s" % (nsplits, 45)) + self.assertEqual(nitems, acc) + self.assertEqual(config.splits_per_node, 45 // 2) + + def test_dynamic_idle_nodes_new_best(self): + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + d = MyDict("my_app.mydict") + nitems = 10000 + for i in range(0, nitems): + d[i] = "RandomText" + str(i) + + time.sleep(2) + # assert all the data has been written + self.assertEqual(len(list(d.keys())), nitems) + + acc = 0 + os.environ["NODES_NUMBER"] = "3" + config.partition_strategy = "DYNAMIC" + granularity = [32, 45] + times = [(0, 80), (0, 60)] + nsplits = 0 + for partition in d.split(): + if nsplits == 0: + id_partition0 = partition._storage_id + # this will be done by the compss api + query = config.session.prepare("""UPDATE hecuba.partitioning + SET start_time = ? + WHERE storage_id = ?""") + # time.time() to avoid choosing gran=64 when task with gran=32 taking a lot of time + # dynamic partitioning mode will use time.time() to check how much is taking + config.session.execute(query, [time.time(), partition._storage_id]) + elif nsplits == 1: + query = config.session.prepare("""UPDATE hecuba.partitioning + SET start_time = ?, end_time = ? + WHERE storage_id = ?""") + config.session.execute(query, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + elif nsplits == 10: + last_time = config.session.execute("""SELECT start_time FROM hecuba.partitioning + WHERE storage_id = %s""" % id_partition0)[0][0] + query = config.session.prepare("""UPDATE hecuba.partitioning + SET end_time = ? + WHERE storage_id = ?""") + config.session.execute(query, [last_time + 80, id_partition0]) + else: + query = config.session.prepare("""UPDATE hecuba.partitioning + SET start_time = ?, end_time = ? + WHERE storage_id = ?""") + start = randint(0, 200) + config.session.execute(query, [start, start + 60, partition._storage_id]) + + if 10 >= nsplits >= 2: + self.assertEqual(config.splits_per_node, 45 // 2) + elif nsplits > 10: + self.assertEqual(config.splits_per_node, 32 // 2) + nsplits += 1 + acc += self.computeItems(partition) + + print("number of splits: %s, best is %s" % (nsplits, 32)) + self.assertEqual(nitems, acc) + self.assertEqual(config.splits_per_node, 32 // 2) + + +if __name__ == "__main__": + unittest.main() From e2c7a1fa885916b0cfdce24fa3f1a327e7d2bafa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A1n=20Espejo?= Date: Mon, 14 Oct 2019 09:40:46 +0200 Subject: [PATCH 04/19] added assertion in test --- hecuba_py/tests/withcassandra/istorage_split_locality_tests.py | 1 + 1 file changed, 1 insertion(+) diff --git a/hecuba_py/tests/withcassandra/istorage_split_locality_tests.py b/hecuba_py/tests/withcassandra/istorage_split_locality_tests.py index 70c96ef3..67fb76fa 100644 --- a/hecuba_py/tests/withcassandra/istorage_split_locality_tests.py +++ b/hecuba_py/tests/withcassandra/istorage_split_locality_tests.py @@ -60,6 +60,7 @@ def test_using_size_estimates(self): config.splits_per_node = 1 tkns_p = list( partitioner._tokens_partitions("test_ksp", "tab", token_range_size=None, target_token_range_size=64)) + self.assertEqual(len(tkns_p), 2) # self.check_all(tkns_p, 1, 1000) def check_all(self, tkns_p, split_per_node, expected_total_tkns): From 07b3ee7f845912d1cc9fe835419aab03bc7b65b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A1n=20Espejo?= Date: Mon, 14 Oct 2019 10:24:27 +0200 Subject: [PATCH 05/19] reverted changes --- hecuba_py/tests/withcassandra/istorage_split_locality_tests.py | 1 - 1 file changed, 1 deletion(-) diff --git a/hecuba_py/tests/withcassandra/istorage_split_locality_tests.py b/hecuba_py/tests/withcassandra/istorage_split_locality_tests.py index 67fb76fa..70c96ef3 100644 --- a/hecuba_py/tests/withcassandra/istorage_split_locality_tests.py +++ b/hecuba_py/tests/withcassandra/istorage_split_locality_tests.py @@ -60,7 +60,6 @@ def test_using_size_estimates(self): config.splits_per_node = 1 tkns_p = list( partitioner._tokens_partitions("test_ksp", "tab", token_range_size=None, target_token_range_size=64)) - self.assertEqual(len(tkns_p), 2) # self.check_all(tkns_p, 1, 1000) def check_all(self, tkns_p, split_per_node, expected_total_tkns): From 4e1cbb6268e2a9e67883e3ac1ddcc5eb83fe166f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A1n=20Espejo?= Date: Mon, 14 Oct 2019 13:50:43 +0200 Subject: [PATCH 06/19] fixed tests and improved code --- hecuba_py/hecuba/partitioner.py | 29 +++--- .../tests/withcassandra/partitioner_tests.py | 88 +++++++------------ 2 files changed, 46 insertions(+), 71 deletions(-) diff --git a/hecuba_py/hecuba/partitioner.py b/hecuba_py/hecuba/partitioner.py index 3ed8f2be..9f31cb38 100644 --- a/hecuba_py/hecuba/partitioner.py +++ b/hecuba_py/hecuba/partitioner.py @@ -158,19 +158,20 @@ def _dynamic_tokens_partitions(self, partitions_per_node): Returns: a tuple (node, partition) every time it's called """ - while True: - if self._initial_send > 0: - for initial_tokens in self._send_initial_tasks(partitions_per_node): - yield initial_tokens - elif self._best_granularity is None: - for intermediate_tokens in self._send_intermediate_tasks(partitions_per_node): - yield intermediate_tokens - if sum([len(partitions) for partitions in partitions_per_node.values()]) == 0: - break - else: - for final_tokens in self._send_final_tasks(partitions_per_node): - yield final_tokens + + while self._initial_send > 0: + for initial_tokens in self._send_initial_tasks(partitions_per_node): + yield initial_tokens + + while self._best_granularity is None: + for intermediate_tokens in self._send_intermediate_tasks(partitions_per_node): + yield intermediate_tokens + if sum([len(partitions) for partitions in partitions_per_node.values()]) == 0: + self._best_granularity = config.splits_per_node break + else: + for final_tokens in self._send_final_tasks(partitions_per_node): + yield final_tokens def _send_initial_tasks(self, partitions_per_node): for node in self._tokens_per_node.keys(): @@ -302,11 +303,11 @@ def _update_partitions_time(self): self._idle_cassandra_nodes.append(self._partitions_nodes[storage_id]) break else: + total_time = {"start_time": start, "end_time": end} + self._partitions_time[partitions].append(total_time) if end is not None: self._n_idle_nodes += 1 self._idle_cassandra_nodes.append(self._partitions_nodes[storage_id]) - total_time = {"start_time": start, "end_time": end} - self._partitions_time[partitions].append(total_time) def split(self): """ diff --git a/hecuba_py/tests/withcassandra/partitioner_tests.py b/hecuba_py/tests/withcassandra/partitioner_tests.py index 7382d0b0..f9148265 100644 --- a/hecuba_py/tests/withcassandra/partitioner_tests.py +++ b/hecuba_py/tests/withcassandra/partitioner_tests.py @@ -12,6 +12,19 @@ class MyDict(StorageDict): ''' +set_start_time = """UPDATE hecuba.partitioning + SET start_time = %s + WHERE storage_id = %s""" + +set_time = """UPDATE hecuba.partitioning + SET start_time = %s, end_time = %s + WHERE storage_id = %s""" + +set_end_time = """UPDATE hecuba.partitioning + SET end_time = %s + WHERE storage_id = %s""" + + class PartitionerTest(unittest.TestCase): def computeItems(self, SDict): @@ -64,10 +77,7 @@ def test_dynamic_simple(self): for partition in d.split(): if nsplits <= 1: # this will be done by the compss api - query = config.session.prepare("""UPDATE hecuba.partitioning - SET start_time = ?, end_time = ? - WHERE storage_id = ?""") - config.session.execute(query, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition._storage_id]) nsplits += 1 acc += self.computeItems(partition) @@ -97,10 +107,7 @@ def test_dynamic_simple_other(self): for partition in d.split(): if nsplits <= 1: # this will be done by the compss api - query = config.session.prepare("""UPDATE hecuba.partitioning - SET start_time = ?, end_time = ? - WHERE storage_id = ?""") - config.session.execute(query, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition._storage_id]) nsplits += 1 acc += self.computeItems(partition) @@ -130,10 +137,7 @@ def test_dynamic_different_nodes(self): for partition in d.split(): if nsplits <= 3: # this will be done by the compss api - query = config.session.prepare("""UPDATE hecuba.partitioning - SET start_time = ?, end_time = ? - WHERE storage_id = ?""") - config.session.execute(query, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition._storage_id]) nsplits += 1 acc += self.computeItems(partition) @@ -168,15 +172,9 @@ def test_dynamic_best_without_finishing(self): # pretending that task with gran=32 is taking a lot of time if nsplits == 0: # this will be done by the compss api - query = config.session.prepare("""UPDATE hecuba.partitioning - SET start_time = ? - WHERE storage_id = ?""") - config.session.execute(query, [0, partition._storage_id]) + config.session.execute(set_start_time, [0, partition._storage_id]) elif nsplits == 1: - query = config.session.prepare("""UPDATE hecuba.partitioning - SET start_time = ?, end_time = ? - WHERE storage_id = ?""") - config.session.execute(query, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition._storage_id]) else: self.assertEqual(config.splits_per_node, 45 // 2) @@ -210,26 +208,14 @@ def test_dynamic_best_idle_nodes(self): if nsplits == 0: id_partition0 = partition._storage_id # this will be done by the compss api - query = config.session.prepare("""UPDATE hecuba.partitioning - SET start_time = ? - WHERE storage_id = ?""") - config.session.execute(query, [time.time(), partition._storage_id]) + config.session.execute(set_start_time, [time.time(), partition._storage_id]) elif nsplits == 1: - query = config.session.prepare("""UPDATE hecuba.partitioning - SET start_time = ?, end_time = ? - WHERE storage_id = ?""") - config.session.execute(query, [times[nsplits][0], times[nsplits][1], partition._storage_id]) - elif nsplits == 15: - query = config.session.prepare("""UPDATE hecuba.partitioning - SET end_time = ? - WHERE storage_id = ?""") - config.session.execute(query, [time.time() + 150, id_partition0]) - elif 1 < nsplits < 15: - query = config.session.prepare("""UPDATE hecuba.partitioning - SET start_time = ?, end_time = ? - WHERE storage_id = ?""") + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + elif nsplits == 5: + config.session.execute(set_end_time, [time.time() + 150, id_partition0]) + elif 1 < nsplits < 5: start = randint(0, 200) - config.session.execute(query, [start, start + 60, partition._storage_id]) + config.session.execute(set_time, [start, start + 60, partition._storage_id]) if nsplits > 1: self.assertEqual(config.splits_per_node, 45 // 2) @@ -263,34 +249,22 @@ def test_dynamic_idle_nodes_new_best(self): if nsplits == 0: id_partition0 = partition._storage_id # this will be done by the compss api - query = config.session.prepare("""UPDATE hecuba.partitioning - SET start_time = ? - WHERE storage_id = ?""") # time.time() to avoid choosing gran=64 when task with gran=32 taking a lot of time # dynamic partitioning mode will use time.time() to check how much is taking - config.session.execute(query, [time.time(), partition._storage_id]) + config.session.execute(set_start_time, [time.time(), partition._storage_id]) elif nsplits == 1: - query = config.session.prepare("""UPDATE hecuba.partitioning - SET start_time = ?, end_time = ? - WHERE storage_id = ?""") - config.session.execute(query, [times[nsplits][0], times[nsplits][1], partition._storage_id]) - elif nsplits == 10: + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + elif nsplits == 5: last_time = config.session.execute("""SELECT start_time FROM hecuba.partitioning WHERE storage_id = %s""" % id_partition0)[0][0] - query = config.session.prepare("""UPDATE hecuba.partitioning - SET end_time = ? - WHERE storage_id = ?""") - config.session.execute(query, [last_time + 80, id_partition0]) + config.session.execute(set_end_time, [last_time + 80, id_partition0]) else: - query = config.session.prepare("""UPDATE hecuba.partitioning - SET start_time = ?, end_time = ? - WHERE storage_id = ?""") start = randint(0, 200) - config.session.execute(query, [start, start + 60, partition._storage_id]) + config.session.execute(set_time, [start, start + 60, partition._storage_id]) - if 10 >= nsplits >= 2: + if 5 >= nsplits >= 2: self.assertEqual(config.splits_per_node, 45 // 2) - elif nsplits > 10: + elif nsplits > 5: self.assertEqual(config.splits_per_node, 32 // 2) nsplits += 1 acc += self.computeItems(partition) From 006968bf42bc4d3a95119e443d3784e72fb7e1e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A1n=20Espejo?= Date: Tue, 15 Oct 2019 12:35:08 +0200 Subject: [PATCH 07/19] storage api tests added --- hecuba_py/tests/storage_api_tests.py | 123 ++++++++++++++++++++++++++- 1 file changed, 120 insertions(+), 3 deletions(-) diff --git a/hecuba_py/tests/storage_api_tests.py b/hecuba_py/tests/storage_api_tests.py index d0d0579f..0861b598 100644 --- a/hecuba_py/tests/storage_api_tests.py +++ b/hecuba_py/tests/storage_api_tests.py @@ -1,6 +1,8 @@ import unittest +import uuid +import logging -from storage.api import getByID +from storage.api import getByID, TaskContext, start_task, end_task from hecuba import config, StorageDict @@ -10,12 +12,127 @@ class ApiTestSDict(StorageDict): ''' -class StorageApi_Tests(unittest.TestCase): +select_time = "SELECT * FROM hecuba.partitioning" +create_partitioning = """CREATE TABLE IF NOT EXISTS hecuba.partitioning( + partitioning_uuid uuid, + storage_id uuid, + number_of_partitions int, + start_time double, + end_time double, + PRIMARY KEY (storage_id)) + WITH default_time_to_live = 86400""" - def class_type_test(self): + +class SimpleObject: + pass + + +class StorageApiTest(unittest.TestCase): + + def test_class_type(self): base_dict = ApiTestSDict('test.api_sdict') storage_id = base_dict.getID() del base_dict rebuild_dict = getByID(storage_id) self.assertTrue(isinstance(rebuild_dict, ApiTestSDict)) + + def test_start_task_uuid(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(create_partitioning) + + storage_id = uuid.uuid4() + simple_obj = SimpleObject() + simple_obj._storage_id = storage_id + + start_task([simple_obj]) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].start_time, None) + + def test_end_task_uuid(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(create_partitioning) + + storage_id = uuid.uuid4() + simple_obj = SimpleObject() + simple_obj._storage_id = storage_id + + end_task([simple_obj]) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].end_time, None) + + def test_task_context_uuid(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(create_partitioning) + + storage_id = uuid.uuid4() + simple_obj = SimpleObject() + simple_obj._storage_id = storage_id + + task_context = TaskContext(logger=logging, values=[simple_obj]) + task_context.__enter__() + task_context.__exit__(type=None, value=None, traceback=None) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].start_time, None) + self.assertNotEqual(inserted[0].end_time, None) + + def test_start_task_key(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(create_partitioning) + + storage_id = uuid.uuid4() + simple_obj = SimpleObject() + simple_obj.__dict__["key"] = str(storage_id) + + start_task([simple_obj]) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].start_time, None) + + def test_end_task_key(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(create_partitioning) + + storage_id = uuid.uuid4() + simple_obj = SimpleObject() + simple_obj.__dict__["key"] = str(storage_id) + + end_task([simple_obj]) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].end_time, None) + + def test_task_context_key(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(create_partitioning) + + storage_id = uuid.uuid4() + simple_obj = SimpleObject() + simple_obj.__dict__["key"] = str(storage_id) + + task_context = TaskContext(logger=logging, values=[simple_obj]) + task_context.__enter__() + task_context.__exit__(type=None, value=None, traceback=None) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].start_time, None) + self.assertNotEqual(inserted[0].end_time, None) + + +if __name__ == "__main__": + unittest.main() From da073fb90c8a0e7ce679e214d276e84e72cbd791 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A1n=20Espejo?= Date: Mon, 21 Oct 2019 10:31:10 +0200 Subject: [PATCH 08/19] requested changes --- hecuba_py/hecuba/partitioner.py | 80 +++++++++++-------- .../tests/withcassandra/partitioner_tests.py | 16 ++++ 2 files changed, 62 insertions(+), 34 deletions(-) diff --git a/hecuba_py/hecuba/partitioner.py b/hecuba_py/hecuba/partitioner.py index 9f31cb38..054f3054 100644 --- a/hecuba_py/hecuba/partitioner.py +++ b/hecuba_py/hecuba/partitioner.py @@ -15,10 +15,13 @@ def partitioner_split(father): - if hasattr(config, "partition_strategy"): - return Partitioner(father, config.partition_strategy).split() + if hasattr(config, "partition_strategy") and ( + config.partition_strategy == "DYNAMIC" or config.partition_strategy == "SIMPLE"): + strategy = config.partition_strategy else: - return Partitioner(father, "SIMPLE").split() + strategy = "SIMPLE" + + return Partitioner(father, strategy).split() class Partitioner: @@ -82,7 +85,14 @@ def _setup_dynamic_structures(self): try: self._nodes_number = len(os.environ["PYCOMPSS_NODES"].split(",")) - 1 except KeyError: + pass + try: self._nodes_number = int(os.environ["NODES_NUMBER"]) - 1 # master and worker + except KeyError: + raise RuntimeError("You must set the environment variable PYCOMPSS_NODES|NODES_NUMBER before using " + "the dynamic task granularity scheduler." + "\nPYCOMPSS_NODES should be a list of nodes separated by commas." + "\nNODES_NUMBER should be an integer representing the number of hosts.") self._n_idle_nodes = self._nodes_number self._initial_send = self._nodes_number @@ -190,7 +200,7 @@ def _send_initial_tasks(self, partitions_per_node): def _send_intermediate_tasks(self, partitions_per_node): self._update_partitions_time() - while not self._all_tasks_finished(): + while not self._at_least_each_granularity_finished(): if self._n_idle_nodes > 0: # if there is an idle node, send a new task without choosing the best granularity config.splits_per_node, set_best = self._best_time_per_token() @@ -217,7 +227,7 @@ def _send_final_tasks(self, partitions_per_node): for i in range(0, len(partition), group_size): yield -1, partition[i:i + group_size] - def _all_tasks_finished(self): + def _at_least_each_granularity_finished(self): """ Checks that there is at least one end_time set for all the granularities """ @@ -245,38 +255,40 @@ def _best_time_per_token(self): unfinished_tasks = dict() actual_time = time.time() - for splits_per_node, partition_times in self._partitions_time.items(): - if len(partition_times) > 0: - group_size = self._partitions_size[splits_per_node] - partition_time = 0.0 - if not any(times["end_time"] for times in partition_times): - """ - If there isn't at least one end_time set for this granularity, takes the actual time as the - finishing time. If there is already a granularity with better performance, it is selected as the - best granularity. - A granularity with this condition cannot be set as the best granularity. - """ - for t in partition_times: - partition_time += actual_time - t["start_time"] - - partition_time = partition_time / float(len(partition_times)) + not_empty_partitions = ((splits, partition_times) for splits, partition_times + in self._partitions_time.items() if partition_times) + + for splits_per_node, partition_times in not_empty_partitions: + group_size = self._partitions_size[splits_per_node] + partition_time = 0.0 + if not any(times["end_time"] for times in partition_times): + """ + If there isn't at least one end_time set for this granularity, takes the actual time as the + finishing time. If there is already a granularity with better performance, it is selected as the + best granularity. + A granularity with this condition cannot be set as the best granularity. + """ + for t in partition_times: + partition_time += actual_time - t["start_time"] + + partition_time = partition_time / float(len(partition_times)) + try: + unfinished_tasks[splits_per_node] = partition_time / group_size + except ZeroDivisionError: + pass + else: + # at least one task finished + for t in partition_times: + if t["end_time"] is not None: + partition_time += t["end_time"] - t["start_time"] + + partition_time = partition_time / float(len(partition_times)) + if partition_time >= 2.0: + # to avoid having too much overhead, granularities lasting less than two seconds are discarded try: - unfinished_tasks[splits_per_node] = partition_time / group_size + times_per_token[splits_per_node] = partition_time / group_size except ZeroDivisionError: pass - else: - # at least one task finished - for t in partition_times: - if t["end_time"] is not None: - partition_time += t["end_time"] - t["start_time"] - - partition_time = partition_time / float(len(partition_times)) - if partition_time >= 2.0: - # to avoid having too much overhead, granularities lasting less than two seconds are discarded - try: - times_per_token[splits_per_node] = partition_time / group_size - except ZeroDivisionError: - pass sorted_times = sorted(times_per_token.items(), key=lambda item: item[1]) diff --git a/hecuba_py/tests/withcassandra/partitioner_tests.py b/hecuba_py/tests/withcassandra/partitioner_tests.py index f9148265..d91f1298 100644 --- a/hecuba_py/tests/withcassandra/partitioner_tests.py +++ b/hecuba_py/tests/withcassandra/partitioner_tests.py @@ -273,6 +273,22 @@ def test_dynamic_idle_nodes_new_best(self): self.assertEqual(nitems, acc) self.assertEqual(config.splits_per_node, 32 // 2) + def test_check_nodes_not_set(self): + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + d = MyDict("my_app.mydict") + + def raise_exception(): + return [partition for partition in d.split()] + + config.partition_strategy = "DYNAMIC" + + if "NODES_NUMBER" in os.environ: + del os.environ["NODES_NUMBER"] + if "PYCOMPSS_NODES" in os.environ: + del os.environ["PYCOMPSS_NODES"] + + self.assertRaises(RuntimeError, raise_exception) + if __name__ == "__main__": unittest.main() From a6fe14a1c9181a0b83f5e7e30d472832b92aa104 Mon Sep 17 00:00:00 2001 From: Pol Santamaria Date: Wed, 13 Nov 2019 17:47:17 +0100 Subject: [PATCH 09/19] Merge fix #1 --- hecuba_py/hecuba/partitioner.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/hecuba_py/hecuba/partitioner.py b/hecuba_py/hecuba/partitioner.py index 6dd0fa62..d8695483 100644 --- a/hecuba_py/hecuba/partitioner.py +++ b/hecuba_py/hecuba/partitioner.py @@ -123,24 +123,24 @@ def _compute_partitions_per_node(self, ksp, table, token_range_size, target_toke Returns: a dictionary with hosts as keys and partitions of tokens as values """ - - from collections import defaultdict - splits_per_node = config.splits_per_node + step_size = _max_token // (config.splits_per_node * self._nodes_number) - n_nodes = len(self._tokens_per_node) - step_size = _max_token // (splits_per_node * n_nodes) if token_range_size: step_size = token_range_size elif target_token_range_size: - one = config.session.execute(_size_estimates, [ksp, table]).one() + res = config.session.execute(_size_estimates, [ksp, table]) + if res: + one = res.one() + else: + one = 0 if one: (mean_p_size, p_count) = one estimated_size = mean_p_size * p_count if estimated_size > 0: step_size = _max_token // ( max(estimated_size / target_token_range_size, - splits_per_node * n_nodes) + splits_per_node * self._nodes_number) ) if self._strategy == "DYNAMIC": From f22293b4c511605cb58088cae329ffafd0a5c99b Mon Sep 17 00:00:00 2001 From: Pol Santamaria Date: Thu, 14 Nov 2019 19:28:26 +0100 Subject: [PATCH 10/19] Tests to evalaute Dynamic strategy --- hecuba_py/tests/storage_api_tests.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/hecuba_py/tests/storage_api_tests.py b/hecuba_py/tests/storage_api_tests.py index 34b31755..9bc62e1d 100644 --- a/hecuba_py/tests/storage_api_tests.py +++ b/hecuba_py/tests/storage_api_tests.py @@ -17,6 +17,15 @@ class ApiTestSDict(StorageDict): class StorageApiTest(unittest.TestCase): + def setUp(self) -> None: + config.partition_strategy = "DYNAMIC" + import os + os.environ['PYCOMPSS_NODES'] = '2' + os.environ['NODES_NUMBER'] = '2' + + def tearDown(self) -> None: + config.partition_strategy = "SIMPLE" + def test_class_type(self): base_dict = ApiTestSDict('test.api_sdict') storage_id = base_dict.getID() From 90c1119d46ea980e6e14e830f995b92948b725e7 Mon Sep 17 00:00:00 2001 From: Pol Santamaria Date: Thu, 14 Nov 2019 19:28:58 +0100 Subject: [PATCH 11/19] Merge fixes, part 2 --- hecuba_py/hecuba/partitioner.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/hecuba_py/hecuba/partitioner.py b/hecuba_py/hecuba/partitioner.py index d8695483..af9833e2 100644 --- a/hecuba_py/hecuba/partitioner.py +++ b/hecuba_py/hecuba/partitioner.py @@ -108,8 +108,9 @@ def tokens_partitions(self, ksp, table, token_range_size, target_token_range_siz [self._partitioning_uuid, storage_id, config.splits_per_node]) self._n_idle_nodes -= 1 # self._partitions_nodes[storage_id] = node TODO why? where do you get node? - - yield uuid.uuid4(), partition_tokens + storage_id = uuid.uuid4() + self._partitions_nodes[storage_id] = partition_tokens[0] + yield storage_id, partition_tokens[1] else: for final_tokens in self._send_final_tasks(partitions_per_node): yield uuid.uuid4(), final_tokens @@ -124,6 +125,9 @@ def _compute_partitions_per_node(self, ksp, table, token_range_size, target_toke a dictionary with hosts as keys and partitions of tokens as values """ splits_per_node = config.splits_per_node + token_range_size = config.token_range_size + target_token_range_size = config.target_token_range_size + step_size = _max_token // (config.splits_per_node * self._nodes_number) if token_range_size: @@ -155,10 +159,6 @@ def _compute_partitions_per_node(self, ksp, table, token_range_size, target_toke fraction += step_size partitions_per_node[node].append((fraction, to)) - #group_size = max(len(partition) // splits_per_node, 1) TODO - #for i in range(0, len(partition), group_size): - # yield partition[i:i + group_size] - return partitions_per_node def _dynamic_tokens_partitions(self, partitions_per_node): @@ -229,7 +229,7 @@ def _send_final_tasks(self, partitions_per_node): for partition in partitions_per_node.values(): group_size = max(len(partition) // config.splits_per_node, 1) for i in range(0, len(partition), group_size): - yield -1, partition[i:i + group_size] + yield partition[i:i + group_size] def _at_least_each_granularity_finished(self): """ From c58ba3de5ce6b1c95f51e976203dc88fa6057a76 Mon Sep 17 00:00:00 2001 From: Pol Santamaria Date: Mon, 18 Nov 2019 15:37:42 +0100 Subject: [PATCH 12/19] Remove Hecuba objects from PyCOMPSs API tests --- hecuba_py/hecuba/partitioner.py | 18 ++-- hecuba_py/tests/storage_api_tests.py | 82 +++++++++---------- .../tests/withcassandra/partitioner_tests.py | 26 +++--- 3 files changed, 63 insertions(+), 63 deletions(-) diff --git a/hecuba_py/hecuba/partitioner.py b/hecuba_py/hecuba/partitioner.py index af9833e2..893f6ee2 100644 --- a/hecuba_py/hecuba/partitioner.py +++ b/hecuba_py/hecuba/partitioner.py @@ -9,6 +9,15 @@ _size_estimates = config.session.prepare(("SELECT mean_partition_size, partitions_count " "FROM system.size_estimates WHERE keyspace_name=? and table_name=?")) +_dynamic_part_table_cql = """CREATE TABLE IF NOT EXISTS hecuba.partitioning( + partitioning_uuid uuid, + storage_id uuid, + number_of_partitions int, + start_time double, + end_time double, + PRIMARY KEY (storage_id)) + WITH default_time_to_live = 86400""" + _max_token = int(((2 ** 63) - 1)) # type: int _min_token = int(-2 ** 63) # type: int @@ -45,14 +54,7 @@ def _rebuild_token_ring(self, ksp, tokens_ranges): def _setup_dynamic_structures(self): try: - config.session.execute("""CREATE TABLE IF NOT EXISTS hecuba.partitioning( - partitioning_uuid uuid, - storage_id uuid, - number_of_partitions int, - start_time double, - end_time double, - PRIMARY KEY (storage_id)) - WITH default_time_to_live = 86400""") + config.session.execute(_dynamic_part_table_cql) except Exception as ex: print("Could not create table hecuba.partitioning.") raise ex diff --git a/hecuba_py/tests/storage_api_tests.py b/hecuba_py/tests/storage_api_tests.py index 9bc62e1d..6b50a24c 100644 --- a/hecuba_py/tests/storage_api_tests.py +++ b/hecuba_py/tests/storage_api_tests.py @@ -5,6 +5,8 @@ from storage.api import getByID, TaskContext, start_task, end_task from hecuba import config, StorageDict +from hecuba.partitioner import _dynamic_part_table_cql + class ApiTestSDict(StorageDict): ''' @@ -15,18 +17,30 @@ class ApiTestSDict(StorageDict): select_time = "SELECT * FROM hecuba.partitioning" +class PyCOMPSsArg: + def __init__(self, storage_id): + self.key = str(storage_id) + + class StorageApiTest(unittest.TestCase): - def setUp(self) -> None: - config.partition_strategy = "DYNAMIC" - import os - os.environ['PYCOMPSS_NODES'] = '2' - os.environ['NODES_NUMBER'] = '2' + def test_get_by_id_uuid(self): + base_dict = ApiTestSDict('test.api_sdict') + storage_id = base_dict.storage_id + del base_dict + + rebuild_dict = getByID(storage_id) + self.assertTrue(isinstance(rebuild_dict, ApiTestSDict)) + + def test_get_by_id_str(self): + base_dict = ApiTestSDict('test.api_sdict') + storage_id = str(base_dict.storage_id) + del base_dict - def tearDown(self) -> None: - config.partition_strategy = "SIMPLE" + rebuild_dict = getByID(storage_id) + self.assertTrue(isinstance(rebuild_dict, ApiTestSDict)) - def test_class_type(self): + def test_get_by_id_getID(self): base_dict = ApiTestSDict('test.api_sdict') storage_id = base_dict.getID() del base_dict @@ -36,42 +50,37 @@ def test_class_type(self): def test_start_task_uuid(self): config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) - base_obj = ApiTestSDict("test.compss_api") - simple_obj = next(base_obj.split()) - storage_id = simple_obj.getID() + storage_id = uuid.uuid4() - start_task([simple_obj]) + start_task([PyCOMPSsArg(storage_id)]) inserted = list(config.session.execute(select_time)) self.assertEqual(len(inserted), 1) self.assertEqual(inserted[0].storage_id, storage_id) self.assertNotEqual(inserted[0].start_time, None) - simple_obj.delete_persistent() def test_end_task_uuid(self): config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) - base_obj = ApiTestSDict("test.compss_api") - simple_obj = next(base_obj.split()) - storage_id = simple_obj.getID() + storage_id = uuid.uuid4() - end_task([simple_obj]) + end_task([PyCOMPSsArg(storage_id)]) inserted = list(config.session.execute(select_time)) self.assertEqual(len(inserted), 1) self.assertEqual(inserted[0].storage_id, storage_id) self.assertNotEqual(inserted[0].end_time, None) - simple_obj.delete_persistent() def test_task_context_uuid(self): config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) - base_obj = ApiTestSDict("test.compss_api") - simple_obj = next(base_obj.split()) - storage_id = simple_obj.getID() + storage_id = uuid.uuid4() - task_context = TaskContext(logger=logging, values=[simple_obj]) + task_context = TaskContext(logger=logging, values=[PyCOMPSsArg(storage_id)]) task_context.__enter__() task_context.__exit__(type=None, value=None, traceback=None) @@ -80,49 +89,39 @@ def test_task_context_uuid(self): self.assertEqual(inserted[0].storage_id, storage_id) self.assertNotEqual(inserted[0].start_time, None) self.assertNotEqual(inserted[0].end_time, None) - simple_obj.delete_persistent() def test_start_task_key(self): config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + storage_id = uuid.uuid4() - base_obj = ApiTestSDict("test.compss_api") - simple_obj = next(base_obj.split()) - storage_id = simple_obj.getID() - simple_obj.__dict__["key"] = str(storage_id) - - start_task([simple_obj]) + start_task([PyCOMPSsArg(storage_id)]) inserted = list(config.session.execute(select_time)) self.assertEqual(len(inserted), 1) self.assertEqual(inserted[0].storage_id, storage_id) self.assertNotEqual(inserted[0].start_time, None) - simple_obj.delete_persistent() def test_end_task_key(self): config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) - base_obj = ApiTestSDict("test.compss_api") - simple_obj = next(base_obj.split()) - storage_id = simple_obj.getID() - simple_obj.__dict__["key"] = str(storage_id) + storage_id = uuid.uuid4() - end_task([simple_obj]) + end_task([PyCOMPSsArg(storage_id)]) inserted = list(config.session.execute(select_time)) self.assertEqual(len(inserted), 1) self.assertEqual(inserted[0].storage_id, storage_id) self.assertNotEqual(inserted[0].end_time, None) - simple_obj.delete_persistent() def test_task_context_key(self): config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) - base_obj = ApiTestSDict("test.compss_api") - simple_obj = next(base_obj.split()) - storage_id = simple_obj.getID() - simple_obj.__dict__["key"] = str(storage_id) + storage_id = uuid.uuid4() - task_context = TaskContext(logger=logging, values=[simple_obj]) + task_context = TaskContext(logger=logging, values=[PyCOMPSsArg(storage_id)]) task_context.__enter__() task_context.__exit__(type=None, value=None, traceback=None) @@ -131,7 +130,6 @@ def test_task_context_key(self): self.assertEqual(inserted[0].storage_id, storage_id) self.assertNotEqual(inserted[0].start_time, None) self.assertNotEqual(inserted[0].end_time, None) - simple_obj.delete_persistent() if __name__ == "__main__": diff --git a/hecuba_py/tests/withcassandra/partitioner_tests.py b/hecuba_py/tests/withcassandra/partitioner_tests.py index d91f1298..98edd3d4 100644 --- a/hecuba_py/tests/withcassandra/partitioner_tests.py +++ b/hecuba_py/tests/withcassandra/partitioner_tests.py @@ -77,7 +77,7 @@ def test_dynamic_simple(self): for partition in d.split(): if nsplits <= 1: # this will be done by the compss api - config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition.storage_id]) nsplits += 1 acc += self.computeItems(partition) @@ -107,7 +107,7 @@ def test_dynamic_simple_other(self): for partition in d.split(): if nsplits <= 1: # this will be done by the compss api - config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition.storage_id]) nsplits += 1 acc += self.computeItems(partition) @@ -137,7 +137,7 @@ def test_dynamic_different_nodes(self): for partition in d.split(): if nsplits <= 3: # this will be done by the compss api - config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition.storage_id]) nsplits += 1 acc += self.computeItems(partition) @@ -172,9 +172,9 @@ def test_dynamic_best_without_finishing(self): # pretending that task with gran=32 is taking a lot of time if nsplits == 0: # this will be done by the compss api - config.session.execute(set_start_time, [0, partition._storage_id]) + config.session.execute(set_start_time, [0, partition.storage_id]) elif nsplits == 1: - config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition.storage_id]) else: self.assertEqual(config.splits_per_node, 45 // 2) @@ -206,16 +206,16 @@ def test_dynamic_best_idle_nodes(self): for partition in d.split(): # pretending that task with gran=32 is taking a lot of time if nsplits == 0: - id_partition0 = partition._storage_id + id_partition0 = partition.storage_id # this will be done by the compss api - config.session.execute(set_start_time, [time.time(), partition._storage_id]) + config.session.execute(set_start_time, [time.time(), partition.storage_id]) elif nsplits == 1: - config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition.storage_id]) elif nsplits == 5: config.session.execute(set_end_time, [time.time() + 150, id_partition0]) elif 1 < nsplits < 5: start = randint(0, 200) - config.session.execute(set_time, [start, start + 60, partition._storage_id]) + config.session.execute(set_time, [start, start + 60, partition.storage_id]) if nsplits > 1: self.assertEqual(config.splits_per_node, 45 // 2) @@ -247,20 +247,20 @@ def test_dynamic_idle_nodes_new_best(self): nsplits = 0 for partition in d.split(): if nsplits == 0: - id_partition0 = partition._storage_id + id_partition0 = partition.storage_id # this will be done by the compss api # time.time() to avoid choosing gran=64 when task with gran=32 taking a lot of time # dynamic partitioning mode will use time.time() to check how much is taking - config.session.execute(set_start_time, [time.time(), partition._storage_id]) + config.session.execute(set_start_time, [time.time(), partition.storage_id]) elif nsplits == 1: - config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition._storage_id]) + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition.storage_id]) elif nsplits == 5: last_time = config.session.execute("""SELECT start_time FROM hecuba.partitioning WHERE storage_id = %s""" % id_partition0)[0][0] config.session.execute(set_end_time, [last_time + 80, id_partition0]) else: start = randint(0, 200) - config.session.execute(set_time, [start, start + 60, partition._storage_id]) + config.session.execute(set_time, [start, start + 60, partition.storage_id]) if 5 >= nsplits >= 2: self.assertEqual(config.splits_per_node, 45 // 2) From 439e10ae06f8279c6bc6a244eda87961bd0c4b5f Mon Sep 17 00:00:00 2001 From: Pol Santamaria Date: Mon, 18 Nov 2019 15:48:34 +0100 Subject: [PATCH 13/19] Merge Storage API tests --- hecuba_py/tests/storage_api_tests.py | 136 ------------------ .../tests/withcassandra/storage_api_tests.py | 128 ++++++++++++++++- 2 files changed, 126 insertions(+), 138 deletions(-) delete mode 100644 hecuba_py/tests/storage_api_tests.py diff --git a/hecuba_py/tests/storage_api_tests.py b/hecuba_py/tests/storage_api_tests.py deleted file mode 100644 index 6b50a24c..00000000 --- a/hecuba_py/tests/storage_api_tests.py +++ /dev/null @@ -1,136 +0,0 @@ -import unittest -import uuid -import logging - -from storage.api import getByID, TaskContext, start_task, end_task -from hecuba import config, StorageDict - -from hecuba.partitioner import _dynamic_part_table_cql - - -class ApiTestSDict(StorageDict): - ''' - @TypeSpec dict<, value:double> - ''' - - -select_time = "SELECT * FROM hecuba.partitioning" - - -class PyCOMPSsArg: - def __init__(self, storage_id): - self.key = str(storage_id) - - -class StorageApiTest(unittest.TestCase): - - def test_get_by_id_uuid(self): - base_dict = ApiTestSDict('test.api_sdict') - storage_id = base_dict.storage_id - del base_dict - - rebuild_dict = getByID(storage_id) - self.assertTrue(isinstance(rebuild_dict, ApiTestSDict)) - - def test_get_by_id_str(self): - base_dict = ApiTestSDict('test.api_sdict') - storage_id = str(base_dict.storage_id) - del base_dict - - rebuild_dict = getByID(storage_id) - self.assertTrue(isinstance(rebuild_dict, ApiTestSDict)) - - def test_get_by_id_getID(self): - base_dict = ApiTestSDict('test.api_sdict') - storage_id = base_dict.getID() - del base_dict - - rebuild_dict = getByID(storage_id) - self.assertTrue(isinstance(rebuild_dict, ApiTestSDict)) - - def test_start_task_uuid(self): - config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") - config.session.execute(_dynamic_part_table_cql) - - storage_id = uuid.uuid4() - - start_task([PyCOMPSsArg(storage_id)]) - - inserted = list(config.session.execute(select_time)) - self.assertEqual(len(inserted), 1) - self.assertEqual(inserted[0].storage_id, storage_id) - self.assertNotEqual(inserted[0].start_time, None) - - def test_end_task_uuid(self): - config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") - config.session.execute(_dynamic_part_table_cql) - - storage_id = uuid.uuid4() - - end_task([PyCOMPSsArg(storage_id)]) - - inserted = list(config.session.execute(select_time)) - self.assertEqual(len(inserted), 1) - self.assertEqual(inserted[0].storage_id, storage_id) - self.assertNotEqual(inserted[0].end_time, None) - - def test_task_context_uuid(self): - config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") - config.session.execute(_dynamic_part_table_cql) - - storage_id = uuid.uuid4() - - task_context = TaskContext(logger=logging, values=[PyCOMPSsArg(storage_id)]) - task_context.__enter__() - task_context.__exit__(type=None, value=None, traceback=None) - - inserted = list(config.session.execute(select_time)) - self.assertEqual(len(inserted), 1) - self.assertEqual(inserted[0].storage_id, storage_id) - self.assertNotEqual(inserted[0].start_time, None) - self.assertNotEqual(inserted[0].end_time, None) - - def test_start_task_key(self): - config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") - config.session.execute(_dynamic_part_table_cql) - storage_id = uuid.uuid4() - - start_task([PyCOMPSsArg(storage_id)]) - - inserted = list(config.session.execute(select_time)) - self.assertEqual(len(inserted), 1) - self.assertEqual(inserted[0].storage_id, storage_id) - self.assertNotEqual(inserted[0].start_time, None) - - def test_end_task_key(self): - config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") - config.session.execute(_dynamic_part_table_cql) - - storage_id = uuid.uuid4() - - end_task([PyCOMPSsArg(storage_id)]) - - inserted = list(config.session.execute(select_time)) - self.assertEqual(len(inserted), 1) - self.assertEqual(inserted[0].storage_id, storage_id) - self.assertNotEqual(inserted[0].end_time, None) - - def test_task_context_key(self): - config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") - config.session.execute(_dynamic_part_table_cql) - - storage_id = uuid.uuid4() - - task_context = TaskContext(logger=logging, values=[PyCOMPSsArg(storage_id)]) - task_context.__enter__() - task_context.__exit__(type=None, value=None, traceback=None) - - inserted = list(config.session.execute(select_time)) - self.assertEqual(len(inserted), 1) - self.assertEqual(inserted[0].storage_id, storage_id) - self.assertNotEqual(inserted[0].start_time, None) - self.assertNotEqual(inserted[0].end_time, None) - - -if __name__ == "__main__": - unittest.main() diff --git a/hecuba_py/tests/withcassandra/storage_api_tests.py b/hecuba_py/tests/withcassandra/storage_api_tests.py index 9e8bd375..2a147c6c 100644 --- a/hecuba_py/tests/withcassandra/storage_api_tests.py +++ b/hecuba_py/tests/withcassandra/storage_api_tests.py @@ -1,9 +1,14 @@ import unittest +import uuid +import logging -from storage.api import getByID -from ..app.words import Words +from storage.api import getByID, TaskContext, start_task, end_task from hecuba import config, StorageDict +from hecuba.partitioner import _dynamic_part_table_cql + +from ..app.words import Words + class ApiTestSDict(StorageDict): ''' @@ -11,6 +16,14 @@ class ApiTestSDict(StorageDict): ''' +select_time = "SELECT * FROM hecuba.partitioning" + + +class PyCOMPSsArg: + def __init__(self, storage_id): + self.key = str(storage_id) + + class StorageApi_Tests(unittest.TestCase): def class_type_test(self): base_dict = ApiTestSDict('test.api_sdict') @@ -47,3 +60,114 @@ def test_getByID_storage_obj(self): b = Words('testspace.tt') new_block = getByID(b.storage_id) self.assertEqual(b, new_block) + + def test_get_by_id_uuid(self): + base_dict = ApiTestSDict('test.api_sdict') + storage_id = base_dict.storage_id + del base_dict + + rebuild_dict = getByID(storage_id) + self.assertTrue(isinstance(rebuild_dict, ApiTestSDict)) + + def test_get_by_id_str(self): + base_dict = ApiTestSDict('test.api_sdict') + storage_id = str(base_dict.storage_id) + del base_dict + + rebuild_dict = getByID(storage_id) + self.assertTrue(isinstance(rebuild_dict, ApiTestSDict)) + + def test_get_by_id_getID(self): + base_dict = ApiTestSDict('test.api_sdict') + storage_id = base_dict.getID() + del base_dict + + rebuild_dict = getByID(storage_id) + self.assertTrue(isinstance(rebuild_dict, ApiTestSDict)) + + def test_start_task_uuid(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + + storage_id = uuid.uuid4() + + start_task([PyCOMPSsArg(storage_id)]) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].start_time, None) + + def test_end_task_uuid(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + + storage_id = uuid.uuid4() + + end_task([PyCOMPSsArg(storage_id)]) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].end_time, None) + + def test_task_context_uuid(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + + storage_id = uuid.uuid4() + + task_context = TaskContext(logger=logging, values=[PyCOMPSsArg(storage_id)]) + task_context.__enter__() + task_context.__exit__(type=None, value=None, traceback=None) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].start_time, None) + self.assertNotEqual(inserted[0].end_time, None) + + def test_start_task_key(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + storage_id = uuid.uuid4() + + start_task([PyCOMPSsArg(storage_id)]) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].start_time, None) + + def test_end_task_key(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + + storage_id = uuid.uuid4() + + end_task([PyCOMPSsArg(storage_id)]) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].end_time, None) + + def test_task_context_key(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + + storage_id = uuid.uuid4() + + task_context = TaskContext(logger=logging, values=[PyCOMPSsArg(storage_id)]) + task_context.__enter__() + task_context.__exit__(type=None, value=None, traceback=None) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].start_time, None) + self.assertNotEqual(inserted[0].end_time, None) + + +if __name__ == "__main__": + unittest.main() From 986198297ff5e6882a33e9fe06c4314780adbb7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A1n=20Espejo=20Salda=C3=B1a?= Date: Fri, 10 Jan 2020 11:48:12 +0100 Subject: [PATCH 14/19] solved merge problems --- hecuba_py/hecuba/partitioner.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/hecuba_py/hecuba/partitioner.py b/hecuba_py/hecuba/partitioner.py index 893f6ee2..0fe11d03 100644 --- a/hecuba_py/hecuba/partitioner.py +++ b/hecuba_py/hecuba/partitioner.py @@ -104,17 +104,15 @@ def tokens_partitions(self, ksp, table, token_range_size, target_token_range_siz partitions_per_node = self._compute_partitions_per_node(ksp, table, token_range_size, target_token_range_size) if self._strategy == "DYNAMIC": - for partition_tokens in self._dynamic_tokens_partitions(partitions_per_node): + for node, partition_tokens in self._dynamic_tokens_partitions(partitions_per_node): storage_id = uuid.uuid4() config.session.execute(self._prepared_store_id, [self._partitioning_uuid, storage_id, config.splits_per_node]) self._n_idle_nodes -= 1 - # self._partitions_nodes[storage_id] = node TODO why? where do you get node? - storage_id = uuid.uuid4() - self._partitions_nodes[storage_id] = partition_tokens[0] - yield storage_id, partition_tokens[1] + self._partitions_nodes[storage_id] = node + yield storage_id, partition_tokens else: - for final_tokens in self._send_final_tasks(partitions_per_node): + for _, final_tokens in self._send_final_tasks(partitions_per_node): yield uuid.uuid4(), final_tokens def _compute_partitions_per_node(self, ksp, table, token_range_size, target_token_range_size): @@ -231,7 +229,7 @@ def _send_final_tasks(self, partitions_per_node): for partition in partitions_per_node.values(): group_size = max(len(partition) // config.splits_per_node, 1) for i in range(0, len(partition), group_size): - yield partition[i:i + group_size] + yield -1, partition[i:i + group_size] def _at_least_each_granularity_finished(self): """ From b4c00e203e7ed757d590a7fb60e7c5fdaa0ffbb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A1n=20Espejo=20Salda=C3=B1a?= Date: Mon, 13 Jan 2020 11:27:11 +0100 Subject: [PATCH 15/19] merge with master --- .../tests/withcassandra/partitioner_tests.py | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/hecuba_py/tests/withcassandra/partitioner_tests.py b/hecuba_py/tests/withcassandra/partitioner_tests.py index 98edd3d4..ae563b6b 100644 --- a/hecuba_py/tests/withcassandra/partitioner_tests.py +++ b/hecuba_py/tests/withcassandra/partitioner_tests.py @@ -24,6 +24,8 @@ class MyDict(StorageDict): SET end_time = %s WHERE storage_id = %s""" +number_nodes = 2 + class PartitionerTest(unittest.TestCase): @@ -84,7 +86,7 @@ def test_dynamic_simple(self): print("number of splits: %s, best is %s" % (nsplits, 45)) self.assertEqual(nitems, acc) - self.assertEqual(config.splits_per_node, 45 // 2) + self.assertEqual(config.splits_per_node, 45 // number_nodes) def test_dynamic_simple_other(self): config.session.execute("DROP TABLE IF EXISTS my_app.mydict") @@ -114,7 +116,7 @@ def test_dynamic_simple_other(self): print("number of splits: %s, best is %s" % (nsplits, 32)) self.assertEqual(nitems, acc) - self.assertEqual(config.splits_per_node, 32 // 2) + self.assertEqual(config.splits_per_node, 32 // number_nodes) def test_dynamic_different_nodes(self): config.session.execute("DROP TABLE IF EXISTS my_app.mydict") @@ -144,7 +146,7 @@ def test_dynamic_different_nodes(self): print("number of splits: %s, best is %s\n" % (nsplits, 64)) self.assertEqual(nitems, acc) - self.assertEqual(config.splits_per_node, 64 // 2) + self.assertEqual(config.splits_per_node, 64 // number_nodes) def test_dynamic_best_without_finishing(self): """ @@ -176,14 +178,14 @@ def test_dynamic_best_without_finishing(self): elif nsplits == 1: config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition.storage_id]) else: - self.assertEqual(config.splits_per_node, 45 // 2) + self.assertEqual(config.splits_per_node, 45 // number_nodes) nsplits += 1 acc += self.computeItems(partition) print("number of splits: %s, best is %s" % (nsplits, 45)) self.assertEqual(nitems, acc) - self.assertEqual(config.splits_per_node, 45 // 2) + self.assertEqual(config.splits_per_node, 45 // number_nodes) def test_dynamic_best_idle_nodes(self): config.session.execute("DROP TABLE IF EXISTS my_app.mydict") @@ -218,14 +220,14 @@ def test_dynamic_best_idle_nodes(self): config.session.execute(set_time, [start, start + 60, partition.storage_id]) if nsplits > 1: - self.assertEqual(config.splits_per_node, 45 // 2) + self.assertEqual(config.splits_per_node, 45 // number_nodes) nsplits += 1 acc += self.computeItems(partition) print("number of splits: %s, best is %s" % (nsplits, 45)) self.assertEqual(nitems, acc) - self.assertEqual(config.splits_per_node, 45 // 2) + self.assertEqual(config.splits_per_node, 45 // number_nodes) def test_dynamic_idle_nodes_new_best(self): config.session.execute("DROP TABLE IF EXISTS my_app.mydict") @@ -263,15 +265,15 @@ def test_dynamic_idle_nodes_new_best(self): config.session.execute(set_time, [start, start + 60, partition.storage_id]) if 5 >= nsplits >= 2: - self.assertEqual(config.splits_per_node, 45 // 2) + self.assertEqual(config.splits_per_node, 45 // number_nodes) elif nsplits > 5: - self.assertEqual(config.splits_per_node, 32 // 2) + self.assertEqual(config.splits_per_node, 32 // number_nodes) nsplits += 1 acc += self.computeItems(partition) print("number of splits: %s, best is %s" % (nsplits, 32)) self.assertEqual(nitems, acc) - self.assertEqual(config.splits_per_node, 32 // 2) + self.assertEqual(config.splits_per_node, 32 // number_nodes) def test_check_nodes_not_set(self): config.session.execute("DROP TABLE IF EXISTS my_app.mydict") From ae759c9356907b59914a0a1600eea4516ea00ba6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A1n=20Espejo=20Salda=C3=B1a?= Date: Mon, 13 Jan 2020 12:00:24 +0100 Subject: [PATCH 16/19] creates table partitioning before tests --- hecuba_py/tests/withcassandra/partitioner_tests.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/hecuba_py/tests/withcassandra/partitioner_tests.py b/hecuba_py/tests/withcassandra/partitioner_tests.py index ae563b6b..54e46dc0 100644 --- a/hecuba_py/tests/withcassandra/partitioner_tests.py +++ b/hecuba_py/tests/withcassandra/partitioner_tests.py @@ -26,6 +26,17 @@ class MyDict(StorageDict): number_nodes = 2 +_dynamic_part_table_cql = """CREATE TABLE IF NOT EXISTS hecuba.partitioning( + partitioning_uuid uuid, + storage_id uuid, + number_of_partitions int, + start_time double, + end_time double, + PRIMARY KEY (storage_id)) + WITH default_time_to_live = 86400""" + +config.session.execute(_dynamic_part_table_cql) + class PartitionerTest(unittest.TestCase): From 0c61e2084f9f138c00fa6438be8f78abcce10305 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A1n=20Espejo=20Salda=C3=B1a?= Date: Mon, 13 Jan 2020 12:36:56 +0100 Subject: [PATCH 17/19] partitioning table not creating --- hecuba_py/tests/withcassandra/partitioner_tests.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/hecuba_py/tests/withcassandra/partitioner_tests.py b/hecuba_py/tests/withcassandra/partitioner_tests.py index 54e46dc0..b5c76a4c 100644 --- a/hecuba_py/tests/withcassandra/partitioner_tests.py +++ b/hecuba_py/tests/withcassandra/partitioner_tests.py @@ -35,8 +35,6 @@ class MyDict(StorageDict): PRIMARY KEY (storage_id)) WITH default_time_to_live = 86400""" -config.session.execute(_dynamic_part_table_cql) - class PartitionerTest(unittest.TestCase): @@ -72,6 +70,7 @@ def test_simple(self): def test_dynamic_simple(self): config.session.execute("DROP TABLE IF EXISTS my_app.mydict") config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) d = MyDict("my_app.mydict") nitems = 10000 for i in range(0, nitems): @@ -102,6 +101,7 @@ def test_dynamic_simple(self): def test_dynamic_simple_other(self): config.session.execute("DROP TABLE IF EXISTS my_app.mydict") config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) d = MyDict("my_app.mydict") nitems = 10000 for i in range(0, nitems): @@ -132,6 +132,7 @@ def test_dynamic_simple_other(self): def test_dynamic_different_nodes(self): config.session.execute("DROP TABLE IF EXISTS my_app.mydict") config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) d = MyDict("my_app.mydict") nitems = 10000 for i in range(0, nitems): @@ -166,6 +167,7 @@ def test_dynamic_best_without_finishing(self): """ config.session.execute("DROP TABLE IF EXISTS my_app.mydict") config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) d = MyDict("my_app.mydict") nitems = 10000 for i in range(0, nitems): @@ -201,6 +203,7 @@ def test_dynamic_best_without_finishing(self): def test_dynamic_best_idle_nodes(self): config.session.execute("DROP TABLE IF EXISTS my_app.mydict") config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) d = MyDict("my_app.mydict") nitems = 10000 for i in range(0, nitems): @@ -243,6 +246,7 @@ def test_dynamic_best_idle_nodes(self): def test_dynamic_idle_nodes_new_best(self): config.session.execute("DROP TABLE IF EXISTS my_app.mydict") config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) d = MyDict("my_app.mydict") nitems = 10000 for i in range(0, nitems): From e0534dec637c09ccc5204f7bafde55fbdbb22774 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A1n=20Espejo=20Salda=C3=B1a?= Date: Tue, 14 Jan 2020 09:42:40 +0100 Subject: [PATCH 18/19] removed unused parameters --- hecuba_py/hecuba/IStorage.py | 4 +--- hecuba_py/hecuba/partitioner.py | 6 +++--- hecuba_py/tests/withcassandra/partitioner_tests.py | 3 +++ 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/hecuba_py/hecuba/IStorage.py b/hecuba_py/hecuba/IStorage.py index fb49084f..c42e01bb 100644 --- a/hecuba_py/hecuba/IStorage.py +++ b/hecuba_py/hecuba/IStorage.py @@ -111,9 +111,7 @@ def split(self): except AttributeError as ex: raise RuntimeError("Object {} does not have tokens".format(self._get_name())) - # TODO Fix len(tokens), this is wrong, should be tokens - for storage_id, token_split in p.tokens_partitions(self._ksp, self._table, len(tokens), - config.target_token_range_size): + for storage_id, token_split in p.tokens_partitions(self._ksp, self._table): log.debug('assigning to {} num tokens {}'.format(str(storage_id), len(token_split))) new_args = self._build_args._replace(tokens=token_split, storage_id=storage_id) args_dict = new_args._asdict() diff --git a/hecuba_py/hecuba/partitioner.py b/hecuba_py/hecuba/partitioner.py index 0fe11d03..e29e769e 100644 --- a/hecuba_py/hecuba/partitioner.py +++ b/hecuba_py/hecuba/partitioner.py @@ -95,13 +95,13 @@ def _setup_dynamic_structures(self): # 11 basic number of partitions, repeating them when more than 11 nodes self._basic_partitions = (partitions * (self._nodes_number // len(partitions) + 1))[0:self._nodes_number] - def tokens_partitions(self, ksp, table, token_range_size, target_token_range_size): + def tokens_partitions(self, ksp, table): """ Method that calculates the new token partitions for a given object Returns: a tuple (node, partition) every time it's called """ - partitions_per_node = self._compute_partitions_per_node(ksp, table, token_range_size, target_token_range_size) + partitions_per_node = self._compute_partitions_per_node(ksp, table) if self._strategy == "DYNAMIC": for node, partition_tokens in self._dynamic_tokens_partitions(partitions_per_node): @@ -115,7 +115,7 @@ def tokens_partitions(self, ksp, table, token_range_size, target_token_range_siz for _, final_tokens in self._send_final_tasks(partitions_per_node): yield uuid.uuid4(), final_tokens - def _compute_partitions_per_node(self, ksp, table, token_range_size, target_token_range_size): + def _compute_partitions_per_node(self, ksp, table): """ Compute all the partitions per node. If the strategy is simple partitioning, each node will have (config.splits_per_node (default 32) * self._nodes_number) partitions. If the strategy is dynamic partitioning, diff --git a/hecuba_py/tests/withcassandra/partitioner_tests.py b/hecuba_py/tests/withcassandra/partitioner_tests.py index b5c76a4c..e57cc581 100644 --- a/hecuba_py/tests/withcassandra/partitioner_tests.py +++ b/hecuba_py/tests/withcassandra/partitioner_tests.py @@ -35,6 +35,9 @@ class MyDict(StorageDict): PRIMARY KEY (storage_id)) WITH default_time_to_live = 86400""" +config.token_range_size = None +config.target_token_range_size = None + class PartitionerTest(unittest.TestCase): From 9366875d1a727e1728df831aed56676a5b6e2dda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A1n=20Espejo=20Salda=C3=B1a?= Date: Tue, 14 Jan 2020 12:19:47 +0100 Subject: [PATCH 19/19] added needed parameters --- hecuba_py/hecuba/IStorage.py | 3 ++- hecuba_py/hecuba/partitioner.py | 9 +++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/hecuba_py/hecuba/IStorage.py b/hecuba_py/hecuba/IStorage.py index c42e01bb..44ee756c 100644 --- a/hecuba_py/hecuba/IStorage.py +++ b/hecuba_py/hecuba/IStorage.py @@ -111,7 +111,8 @@ def split(self): except AttributeError as ex: raise RuntimeError("Object {} does not have tokens".format(self._get_name())) - for storage_id, token_split in p.tokens_partitions(self._ksp, self._table): + for storage_id, token_split in p.tokens_partitions(self._ksp, self._table, config.token_range_size, + config.target_token_range_size): log.debug('assigning to {} num tokens {}'.format(str(storage_id), len(token_split))) new_args = self._build_args._replace(tokens=token_split, storage_id=storage_id) args_dict = new_args._asdict() diff --git a/hecuba_py/hecuba/partitioner.py b/hecuba_py/hecuba/partitioner.py index e29e769e..c4ad5404 100644 --- a/hecuba_py/hecuba/partitioner.py +++ b/hecuba_py/hecuba/partitioner.py @@ -95,13 +95,13 @@ def _setup_dynamic_structures(self): # 11 basic number of partitions, repeating them when more than 11 nodes self._basic_partitions = (partitions * (self._nodes_number // len(partitions) + 1))[0:self._nodes_number] - def tokens_partitions(self, ksp, table): + def tokens_partitions(self, ksp, table, token_range_size, target_token_range_size): """ Method that calculates the new token partitions for a given object Returns: a tuple (node, partition) every time it's called """ - partitions_per_node = self._compute_partitions_per_node(ksp, table) + partitions_per_node = self._compute_partitions_per_node(ksp, table, token_range_size, target_token_range_size) if self._strategy == "DYNAMIC": for node, partition_tokens in self._dynamic_tokens_partitions(partitions_per_node): @@ -115,7 +115,7 @@ def tokens_partitions(self, ksp, table): for _, final_tokens in self._send_final_tasks(partitions_per_node): yield uuid.uuid4(), final_tokens - def _compute_partitions_per_node(self, ksp, table): + def _compute_partitions_per_node(self, ksp, table, token_range_size, target_token_range_size): """ Compute all the partitions per node. If the strategy is simple partitioning, each node will have (config.splits_per_node (default 32) * self._nodes_number) partitions. If the strategy is dynamic partitioning, @@ -125,9 +125,6 @@ def _compute_partitions_per_node(self, ksp, table): a dictionary with hosts as keys and partitions of tokens as values """ splits_per_node = config.splits_per_node - token_range_size = config.token_range_size - target_token_range_size = config.target_token_range_size - step_size = _max_token // (config.splits_per_node * self._nodes_number) if token_range_size: