-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsolver.py
More file actions
150 lines (131 loc) · 6.95 KB
/
solver.py
File metadata and controls
150 lines (131 loc) · 6.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import parameters
from ortools.constraint_solver import pywrapcp
import sys
class Solver:
solver = None
decision_builder = None
data = None
# -- VARS
x_var = dict() # employee -> dict(): shift -> variable
we_var = dict() # employee -> dict(): int representing week -> variable
all_vars = list()
# -- CONSTRS
cover_shift = dict() # shift -> constr
shift_per_week = dict() # employee, int (week) -> constr
adjacent_shifts = dict() # employee, shift -> constr
shifts_max_week = dict() # employee, shift name, int (week) -> constr
we_var_definition = dict() # employee, int(week) -> constr
we_consecutive = dict() # employee, int(week) -> constr
def __init__(self, data):
self.data = data
self.solver = pywrapcp.Solver("simple_example")
self.create_x_vars()
self.create_we_vars()
self.create_cover_shift_constraints()
self.create_shift_per_week_constraints()
self.create_adjacent_shifts_constraints()
self.create_shifts_max_week_constraints()
self.create_we_var_definition_constraints()
self.create_we_consecutive_constraints()
def create_x_vars(self):
for employee in self.data.employees:
self.x_var[employee] = dict()
for shift in self.data.shifts:
if not employee.unavailable[shift]:
self.x_var[employee][shift] = self.solver.IntVar(0,1,"x-E{0}-S{1}-D{2}".format(employee.id, shift.name, shift.day))
self.all_vars.append(self.x_var[employee][shift])
def create_we_vars(self):
for employee in self.data.employees:
self.we_var[employee] = dict()
for we in range(self.data.weeks):
self.we_var[employee][we] = self.solver.IntVar(0,1,"w-{0}-{1}".format(employee.name, we))
self.all_vars.append(self.we_var[employee][we])
def create_cover_shift_constraints(self):
# -- mandatory shifts must be taken
for shift in self.data.shifts:
if shift.mandatory:
self.solver.Add(sum(self.x_var[e][shift] for e in self.data.employees if not e.unavailable[shift]) > 0)
def create_shift_per_week_constraints(self):
# -- employee must take correct number of shifts per week
for e in self.data.employees:
for w in range(self.data.weeks):
self.solver.Add(sum(self.x_var[e][s] for s in self.data.shifts.select(name='*', mandatory='*', day='*', week=w) if not e.unavailable[s]) == e.shifts_per_week[w])
def create_adjacent_shifts_constraints(self):
# -- for each set of adjacent shifts, select at most 1, for each employee
# counter = 0
# for shift in self.data.shifts:
# adjacents = self.data.shifts.get_adjacent(shift)
# print "shift", shift
# print "adjacents", [(s.name, s.day) for s in adjacents]
# for e in self.data.employees:
# if not e.unavailable[shift]:
# print e.name, [(s.name, s.day) for s in adjacents if not e.unavailable[s]]
# self.solver.Add(sum(self.x_var[e][s] for s in adjacents if not e.unavailable[s]) <= 1)
for d in range(self.data.days):
if not self.data.holidays[d]:
shifts = self.data.shifts.select(name='*', mandatory='*', day=d,week='*')
for e in self.data.employees:
shifts_e = [s for s in shifts if not e.unavailable[s]]
if shifts_e and e.id in [0]:
self.solver.Add(sum(self.x_var[e][s] for s in shifts_e) < 2)
print d, e.name, ", ".join(map(lambda x: getattr(x, 'name'), shifts_e))
def create_shifts_max_week_constraints(self):
# -- for each shift type, employee can take a maximum per week
for shift_name, max_shifts in parameters.shifts_max_week.items():
for e in self.data.employees:
for w in range(self.data.weeks):
self.shifts_max_week[e, shift_name, w] = self.solver.Add(
sum(self.x_var[e][s] for s in self.data.shifts.select(name=shift_name, mandatory='*', day='*', week=w)) <= max_shifts)
def create_we_var_definition_constraints(self):
# -- get number of shifts in a weekend
for w in range(self.data.weeks):
we_shifts = self.data.shifts.select(name='*', mandatory='*', day= w*7 +5, week=w) + self.data.shifts.select(
name='*', mandatory='*', day=w*7+ 6, week=w)
for e in self.data.employees:
self.we_var_definition[e,w] = self.solver.Add(sum(self.x_var[e][s] for s in we_shifts if not e.unavailable[s]) < len(we_shifts)*self.we_var[e][w])
def create_we_consecutive_constraints(self):
for w in range(self.data.weeks - parameters.max_we_consecutive):
for e in self.data.employees:
self.we_consecutive[e,w] = self.solver.Add(sum(self.we_var[e][w + i] for i in range(parameters.max_we_consecutive)) <= parameters.max_we_consecutive)
def solve(self):
self.decision_builder = self.solver.Phase(self.all_vars, self.solver.CHOOSE_RANDOM, self.solver.ASSIGN_MIN_VALUE)
solutions_limit = self.solver.SolutionsLimit(1)
time_limit = self.solver.TimeLimit(3 * 1000)
collector = self.solver.AllSolutionCollector()
collector.Add(self.all_vars)
if self.solver.Solve(self.decision_builder, [solutions_limit, collector]):
print "Some solutions found"
# self.print_solution()
sol = 0
while self.solver.NextSolution():
print "writing ..."
with open("solutions/sol-%s.txt"%sol, 'w') as target:
for day in range(self.data.days):
target.write("DAY %s\n" %day)
shifts = self.data.shifts.select(name='*', mandatory='*', day=day, week='*')
for s in shifts:
for e in self.data.employees:
if not e.unavailable[s] and self.x_var[e][s].Value() > 0:
target.write("{0}: {1}\n".format(s.name, e.name))
sys.exit()
sol += 1
else:
print "No solutions found"
def print_solution(self):
sol = 0
while self.solver.NextSolution():
print "Solution", sol
sol += 1
for day in range(self.data.days):
print "DAY",day
shifts = self.data.shifts.select(name='*',mandatory='*',day=day,week='*')
for s in shifts:
for e in self.data.employees:
if not e.unavailable[s] and self.x_var[e][s].Value() > 0:
print "{0}: {1}".format(s, e.name)
# sys.exit()
print("\nNumber of solutions found:", sol)
def populate_solution(self):
pass
def to_string(self):
pass