-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTreap.py
More file actions
344 lines (250 loc) · 10.6 KB
/
Treap.py
File metadata and controls
344 lines (250 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#!/usr/bin/python3
from Dictionary import Dictionary
import random
import unittest
import string
class _TreapNode(object):
def __init__(self, key, value, left_son=None, right_son=None, priority=None):
self.key = key
self.value = value
self.priority = priority
if priority is None:
self.priority = random.random()
self.left_son = left_son
self.right_son = right_son
self.weight_of_subtree = 1
self.weight_of_subtree += left_son.weight_of_subtree if left_son is not None else 0
self.weight_of_subtree += right_son.weight_of_subtree if right_son is not None else 0
self.min_key = self.max_key = key
if left_son is not None:
self.min_key = min(self.min_key, left_son.min_key)
self.max_key = max(self.max_key, left_son.max_key)
if right_son is not None:
self.min_key = min(self.min_key, right_son.min_key)
self.max_key = max(self.max_key, right_son.max_key)
def rotate_right(self):
his_left_node = self.left_son
self.left_son = his_left_node.right_son
his_left_node.right_son = self
self.update_fields()
return his_left_node
def rotate_left(self):
his_right_node = self.right_son
self.right_son = his_right_node.left_son
his_right_node.left_son = self
self.update_fields()
return his_right_node
# Assumes that his sons are already updated
def update_fields(self):
self.weight_of_subtree = 1
self.weight_of_subtree += self.left_son.weight_of_subtree if self.left_son is not None else 0
self.weight_of_subtree += self.right_son.weight_of_subtree if self.right_son is not None else 0
self.min_key = self.max_key = self.key
if self.left_son is not None:
self.min_key = min(self.min_key, self.left_son.min_key)
self.max_key = max(self.max_key, self.left_son.max_key)
if self.right_son is not None:
self.min_key = min(self.min_key, self.right_son.min_key)
self.max_key = max(self.max_key, self.right_son.max_key)
class Treap(Dictionary):
    """Key/value container stored as a treap.

    The tree is a binary search tree on the keys and a max-heap on the node
    priorities (the son with the larger priority is rotated above its parent).
    Supports insertion, erasure, split/join and order-statistic queries.

    NOTE(review): ``Dictionary`` is imported from another module; presumably
    it builds the mapping-style interface on top of ``insert`` and
    ``get_root`` -- confirm against that module.
    """

    def __init__(self, root=None):
        """Wrap an existing node structure, or create an empty treap.

        Args:
            root: Root ``_TreapNode`` of an already valid treap, or None.
        """
        self._root = root

    def get_root(self):
        """Return the root node, or None when the treap is empty."""
        return self._root

    @staticmethod
    def _balance(node):
        """Restore the heap order between ``node`` and its sons.

        The son with the larger priority is rotated above ``node`` when its
        priority is at least the priority of ``node``.

        Returns:
            The root of the subtree after the (possible) rotation.
        """
        left, right = node.left_son, node.right_son
        if (left is not None and
                (right is None or left.priority >= right.priority) and
                left.priority >= node.priority):
            return node.rotate_right()
        if (right is not None and
                (left is None or right.priority >= left.priority) and
                right.priority >= node.priority):
            return node.rotate_left()
        return node

    # Support insertion
    @staticmethod
    def _insert(node, key, value, _priority):
        """Insert ``key`` into the subtree rooted at ``node``.

        Returns:
            The root of the updated subtree.
        """
        if node is None:
            return _TreapNode(key, value, priority=_priority)
        if node.key == key:
            # Replace the old value with the new one; the shape is unchanged.
            node.value = value
            return node
        if node.key > key:
            node.left_son = Treap._insert(node.left_son, key, value, _priority)
        else:
            node.right_son = Treap._insert(node.right_son, key, value, _priority)
        node = Treap._balance(node)
        # A rotation may have put a new node on top, so refresh its aggregates.
        node.update_fields()
        return node

    def insert(self, key, value=None, priority=None):
        """Insert ``key`` with ``value``, or overwrite the value of an existing key.

        Args:
            key: Key to insert.
            value: Value to associate with the key.
            priority: Heap priority for a newly created node; random when
                None.  Ignored when the key is already present.
        """
        self._root = Treap._insert(self._root, key, value, priority)

    # Support erasing
    @staticmethod
    def _erase(node, key):
        """Remove ``key`` from the subtree rooted at ``node``, if present.

        Returns:
            The root of the updated subtree (None when it becomes empty).
        """
        if node is None:
            return None
        if key < node.key:
            node.left_son = Treap._erase(node.left_son, key)
        elif key > node.key:
            node.right_son = Treap._erase(node.right_son, key)
        else:
            # Drop this node by merging its two subtrees; this yields the same
            # tree as rotating the node down to a leaf and cutting it off.
            return Treap._merge(node.left_son, node.right_son)
        node.update_fields()
        return node

    def erase(self, key):
        """Remove ``key`` from the treap; do nothing when it is absent."""
        self._root = Treap._erase(self._root, key)

    # Support split
    @staticmethod
    def _split(node, key):
        """Split the subtree at ``node`` into keys ``<= key`` and keys ``> key``.

        Returns:
            A ``(lower_root, higher_root)`` pair; either may be None.
        """
        if node is None:
            return None, None
        if node.key <= key:
            lower, higher = Treap._split(node.right_son, key)
            node.right_son = lower
            node.update_fields()
            return node, higher
        lower, higher = Treap._split(node.left_son, key)
        node.left_son = higher
        node.update_fields()
        return lower, higher

    def split(self, key):
        """Split the treap into two treaps around ``key``.

        The nodes are moved, not copied: afterwards this treap is empty.

        Args:
            key: Pivot.  It only has to be comparable with the stored keys
                and does not need to be present in the treap.

        Returns:
            A pair of treaps: the first holds every key ``<= key``, the
            second every key ``> key``.
        """
        lower, higher = Treap._split(self._root, key)
        self._root = None
        return Treap(lower), Treap(higher)

    # Support join
    @staticmethod
    def _merge(lower, higher):
        """Merge two subtrees where every key of ``lower`` precedes ``higher``.

        Returns:
            The root of the merged subtree (None when both inputs are None).
        """
        if lower is None:
            return higher
        if higher is None:
            return lower
        if lower.priority >= higher.priority:
            lower.right_son = Treap._merge(lower.right_son, higher)
            lower.update_fields()
            return lower
        higher.left_son = Treap._merge(lower, higher.left_son)
        higher.update_fields()
        return higher

    def join(self, treap_higher):
        """Concatenate this treap with one whose keys are all strictly higher.

        The result reuses (and relinks) the nodes of both operands, so the
        operands must not be used after the call.

        Args:
            treap_higher: Treap whose keys are all greater than the keys of
                this treap.  Either treap may be empty.

        Returns:
            A new ``Treap`` holding the keys of both operands.

        Raises:
            ValueError: If the key ranges touch or overlap.
        """
        lower_max = self.get_max_key()
        higher_min = treap_higher.get_min_key()
        if (lower_max is not None and higher_min is not None and
                lower_max >= higher_min):
            raise ValueError("All keys from the current treap must be lower than those of the argument treap.")
        return Treap(Treap._merge(self._root, treap_higher.get_root()))

    def __add__(self, other):
        """Implement ``self + other`` as ``self.join(other)``."""
        return self.join(other)

    # Specific queries
    def size(self):
        """Return the number of stored keys (0 for an empty treap)."""
        if self._root is None:
            return 0
        return self._root.weight_of_subtree

    def choose_element(self):
        """Return a uniformly random ``(key, value)`` pair.

        Raises:
            IndexError: If the treap is empty.
        """
        count = self.size()
        if count == 0:
            raise IndexError("cannot choose an element from an empty treap")
        return self.get_kth_element(random.randint(0, count - 1))

    @staticmethod
    def _get_min_key(node):
        """Return the smallest key in the non-empty subtree rooted at ``node``."""
        while node.left_son is not None:
            node = node.left_son
        return node.key

    def get_min_key(self):
        """Return the smallest key, or None when the treap is empty."""
        if self._root is None:
            return None
        return Treap._get_min_key(self._root)

    @staticmethod
    def _get_max_key(node):
        """Return the largest key in the non-empty subtree rooted at ``node``."""
        while node.right_son is not None:
            node = node.right_son
        return node.key

    def get_max_key(self):
        """Return the largest key, or None when the treap is empty."""
        if self._root is None:
            return None
        return Treap._get_max_key(self._root)

    @staticmethod
    def _get_kth_element(node, k):
        """Return the ``(key, value)`` with 0-based rank ``k`` below ``node``.

        Returns None when ``k`` is outside the subtree.
        """
        while node is not None:
            left_weight = 0
            if node.left_son is not None:
                left_weight = node.left_son.weight_of_subtree
            if k == left_weight:
                return node.key, node.value
            if k < left_weight:
                node = node.left_son
            else:
                # Skip the left subtree and the current node.
                k -= left_weight + 1
                node = node.right_son
        return None

    def get_kth_element(self, k):
        """Return the ``(key, value)`` pair with 0-based rank ``k`` in key order.

        Returns None when ``k`` is negative or not smaller than ``size()``.
        """
        return Treap._get_kth_element(self._root, k)
########################## Testing
class TestTreapOperations(unittest.TestCase):
    """Randomized checks of the key order, heap order and queries of Treap."""

    def setUp(self):
        # Populate the treap with random integer keys and 50-character values.
        self.treap = Treap()
        self.number_of_insertions = 10000
        alphabet = string.ascii_lowercase + string.digits
        for _ in range(self.number_of_insertions):
            key = random.randint(1, 1000000)
            value = ''.join(random.choice(alphabet) for _ in range(50))
            self.treap[key] = value

    @staticmethod
    def is_sorted(L):
        """Return True when the list ``L`` is in non-decreasing order."""
        return all(a <= b for a, b in zip(L, L[1:]))

    @staticmethod
    def check_treap_priorities(node):
        """Return True when no node below ``node`` has a son with a larger priority."""
        if node is None:
            return True
        for son in (node.left_son, node.right_son):
            if son is not None and son.priority > node.priority:
                return False
        return (TestTreapOperations.check_treap_priorities(node.left_son) and
                TestTreapOperations.check_treap_priorities(node.right_son))

    def test_insert(self):
        # The keys list must be sorted.
        self.assertTrue(TestTreapOperations.is_sorted(self.treap.keys()))
        print ("The height of the treap is: " + str(self.treap.get_height()))
        # The treap must have the appropriate structure.
        self.assertTrue(TestTreapOperations.check_treap_priorities(self.treap._root))

    def test_erase(self):
        print ("Erasing elements...")
        number_of_deletions = 50
        for _ in range(number_of_deletions):
            k, _value = self.treap.choose_element()
            self.treap.erase(k)
        # The keys list must be sorted.
        self.assertTrue(TestTreapOperations.is_sorted(self.treap.keys()))
        # The treap must have the appropriate structure.
        self.assertTrue(TestTreapOperations.check_treap_priorities(self.treap._root))

    def test_split_and_join(self):
        k, _value = self.treap.choose_element()
        t1, t2 = self.treap.split(k)

        # Test the split.
        # The keys lists must be sorted.
        self.assertTrue(TestTreapOperations.is_sorted(t1.keys()))
        self.assertTrue(TestTreapOperations.is_sorted(t2.keys()))
        # The treaps must have the appropriate structure.
        self.assertTrue(TestTreapOperations.check_treap_priorities(t1._root))
        self.assertTrue(TestTreapOperations.check_treap_priorities(t2._root))
        # The elements must all be lower-or-equal / higher than k.
        self.assertTrue(all(current_key <= k for current_key in t1.keys()))
        self.assertTrue(all(current_key > k for current_key in t2.keys()))

        # Test the join.
        t = t1 + t2
        # The keys list must be sorted.
        self.assertTrue(TestTreapOperations.is_sorted(t.keys()))
        # The treap must have the appropriate structure.
        self.assertTrue(TestTreapOperations.check_treap_priorities(t._root))

    def test_queries(self):
        number_of_deletions = 50
        for _ in range(number_of_deletions):
            k, _value = self.treap.choose_element()
            self.treap.erase(k)
        self.assertEqual(self.treap.size(), len(self.treap.items()))

        # The treap is not modified below, so the reference key list is built once.
        sorted_keys = self.treap.keys()
        number_of_indexing_queries = 1000
        for _ in range(number_of_indexing_queries):
            # Indices beyond the size are queried on purpose: both sides must give None.
            query_index = random.randint(0, 11000)
            found = self.treap.get_kth_element(query_index)
            k = None if found is None else found[0]
            correct_k = None
            if query_index < len(sorted_keys):
                correct_k = sorted_keys[query_index]
            self.assertEqual(k, correct_k)

        number_of_getitem_queries = 1000
        for _ in range(number_of_getitem_queries):
            k, v = self.treap.choose_element()
            self.assertEqual(self.treap.look_up(k), v)
            self.assertEqual(self.treap[k], v)
# Run the unit tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()