-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathParallelizer.py
More file actions
51 lines (46 loc) · 2.31 KB
/
Parallelizer.py
File metadata and controls
51 lines (46 loc) · 2.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import concurrent.futures
import os
from functools import wraps
def make_parallel(func):
    """
    Decorator that runs ``func`` once per element of a list, in parallel threads.

    The decorated function is called as ``wrapped(lst, *args, **kwargs)``:
    each element of ``lst`` becomes the first positional argument of a
    separate call to ``func``, and any extra positional/keyword arguments
    are forwarded unchanged to every call.

    :param func: function
        The function that needs to be parallelized.
    :return: function
        The wrapped function. It returns a list of results collected in
        COMPLETION order, which is not necessarily the input order.
    """
    @wraps(func)
    def wrapper(lst, *args, **kwargs):
        """
        :param lst: list
            The inputs of the function, one element per call.
        :return: list
            Results of the calls, in completion order.
        """
        # Cap the pool at a small multiple of the CPU count: spawning far
        # more threads than cores just adds creation/switching overhead.
        number_of_threads_multiple = 2  # tune this multiple to your workload
        # os.cpu_count() may return None on some platforms; fall back to 1
        # so the arithmetic below cannot raise a TypeError.
        number_of_workers = (os.cpu_count() or 1) * number_of_threads_multiple
        if len(lst) < number_of_workers:
            # Short input list: never create more threads than tasks.
            number_of_workers = len(lst)
        if not number_of_workers:
            # Empty input: nothing to run.
            return []
        if number_of_workers == 1:
            # A single task gains nothing from a thread pool; run it inline.
            return [func(lst[0], *args, **kwargs)]
        # Core path: fan the calls out over the pool and gather results
        # as each future completes.
        result = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=number_of_workers) as executor:
            futures = [executor.submit(func, item, *args, **kwargs) for item in lst]
            for future in concurrent.futures.as_completed(futures):
                result.append(future.result())
        return result
    return wrapper