-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathprecompute_pathfinding.py
More file actions
201 lines (144 loc) · 6.62 KB
/
precompute_pathfinding.py
File metadata and controls
201 lines (144 loc) · 6.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import osmnx as ox
from datetime import datetime, timedelta
import pathlib
import networkx as nx
from typing import Dict
import igraph
import geopandas as gpd
from tqdm import tqdm
from quadtree_buildings import PolygonQuery
from calculate_polygon_shade import calculate_polygon_shade
from shapely.geometry import Polygon, LineString
import haversine as hs
def generate_times() -> list[str]:
    """
    Returns ["1 09:00", "1 10:00", ..., "12 18:00", "12 19:00"]

    Each entry is "<month> <HH:MM>": the month number (1-12, no zero padding)
    followed by an hourly time of day from 09:00 to 19:00, both ends included.
    The entries are ordered by month first, then by time of day.
    """
    # Define start and end times (both inclusive)
    start_time = datetime.strptime('09:00', '%H:%M')
    end_time = datetime.strptime('19:00', '%H:%M')

    # Derive the number of hourly steps from the bounds so that the end time
    # is actually part of the result (a fixed range(10) stops at 18:00).
    hour_steps = int((end_time - start_time) / timedelta(hours=1))
    time_stages = [
        (start_time + timedelta(hours=h)).strftime('%H:%M')
        for h in range(hour_steps + 1)
    ]

    # Combine every month with every time of day
    return [
        f"{month} {time}"
        for month in range(1, 13)
        for time in time_stages
    ]
# All "<month> <HH:MM>" labels, computed once at import time; used below as
# the keys of the per-time graphs and in their cache file names.
TIMES: list[str] = generate_times()
class PrecomputedPathfinder:
    """
    Class for precomputing and storing pathfinding data for a given city.
    Contains an ig.graph for each time of day and month of the year (9:00 - 19:00).
    We differentiate between primary graph and a list of igraphs.
    - The primary graph is the graph that is downloaded from OSM and saved as a graphml file.
    - The list of igraphs is a dictionary of igraphs for each time of day and month of the year.
    NOTE: Loading the Primary Graph of Gdansk takes around 25s.

    Attributes:
        verbose: when true, progress messages and progress bars are shown.
        city_name: name of the city, used for file names and the OSM query.
        shade_impact: multiplier applied to the shaded share of an edge when
            computing its weight (see ``_calculate_edge``).
        was_loaded: True when the per-time graphs came from the on-disk cache
            instead of being computed in this run.
        qtb: ``PolygonQuery`` built from ``data/buildings_<city>_clean.csv``.
        graphs: dictionary mapping every entry of ``TIMES`` to its igraph.
    """

    def __init__(self, city_name, shade_impact = 1, verbose = True):
        """
        Loads the building query module and then loads or builds the graphs.

        Args:
            city_name: city to work on; also used to build the data file paths.
            shade_impact: weight multiplier for the shaded share of an edge.
            verbose: whether to print progress information.
        """
        self.verbose = verbose
        self.city_name = city_name
        # Both attributes must exist before _create_graph runs: building the
        # graphs reads shade_impact, and loading them sets was_loaded.
        self.shade_impact = shade_impact
        self.was_loaded = False

        self._log("Loading Polygon Query module...")
        csv_name = f'data/buildings_{city_name}_clean.csv'
        self.qtb = PolygonQuery(csv_name)
        self._log("Polygon Query module loaded.")

        self.graphs = self._create_graph(city_name)

    def _log(self, *args):
        """Prints the given values, but only when verbose output is enabled."""
        if self.verbose:
            print(*args)

    def _create_graph(self, city_name):
        """
        Loads the cached per-time igraphs, or builds and caches them.

        The cache lives in ``data/<city_name>_modified/`` with one GraphML
        file per entry of ``TIMES``. It is used only when every expected file
        is present; otherwise the primary graph is loaded from
        ``data/<city_name>.graphml`` (or downloaded and saved there), turned
        into a dictionary of igraphs and written to the cache.

        Returns:
            Dictionary mapping every entry of ``TIMES`` to its igraph.
        """
        graph_path = pathlib.Path(f"data/{city_name}.graphml")
        modified_graph_dir = pathlib.Path(f"data/{city_name}_modified")
        # NOTE(review): the labels contain ':' which is not a valid file name
        # character on Windows - confirm the target platforms.
        cache_files = {
            time: modified_graph_dir / f"{time}.graphml" for time in TIMES
        }
        start_time = datetime.now()

        if all(path.exists() for path in cache_files.values()):
            graphs = {}
            for time, path in tqdm(cache_files.items(), disable=not self.verbose):
                self._log(f"Loading graph for {time}...")
                # The cache was written by igraph, so it is read by igraph too.
                graphs[time] = igraph.Graph.Read_GraphML(str(path))
                self._log(f"Graph for {time} loaded.")
            self.was_loaded = True
            self._log("Dict of graphs loaded in:", datetime.now() - start_time, "s")
        else:
            if graph_path.exists():
                self._log("Primary graph already exists, loading...")
                graph = ox.load_graphml(filepath=graph_path)
                self._log("Primary graph loaded.")
            else:
                self._log("Downloading primary graph...")
                graph = ox.graph_from_place(city_name, network_type='walk')
                self._log("Primary graph downloaded.")
                # Keep the download so that later runs can take the branch above.
                ox.save_graphml(graph, filepath=graph_path)

            # Change the graph to a dictionary of igraphs for each time of day
            # and month of the year
            graphs = self._modify_graph(graph)

            modified_graph_dir.mkdir(parents=True, exist_ok=True)
            for time, time_graph in tqdm(graphs.items(), disable=not self.verbose):
                time_graph.save(str(cache_files[time]))
                self._log(f"Graph for {time} saved.")

        self._log("Graph loaded/downloaded in", datetime.now() - start_time, "s")
        return graphs

    def _modify_graph(self, graph: "nx.MultiDiGraph") -> "Dict[str, igraph.Graph]":
        """
        Modifies the graph to create a dictionary of igraphs
        for each time of day and month of the year.

        Every igraph is undirected and stores the value computed by
        ``_calculate_edge`` in the ``weight`` edge attribute.
        """
        # The result is undirected and the weight depends only on the two end
        # nodes, so every unordered node pair needs to be evaluated only once
        # per time, no matter how many parallel/reverse edges connect it.
        seen_pairs = set()
        node_pairs = []
        for u, v in graph.edges():
            pair = frozenset((u, v))
            if pair not in seen_pairs:
                seen_pairs.add(pair)
                node_pairs.append((u, v))

        graphs = {}
        for time in tqdm(TIMES, disable=not self.verbose):
            time_value = datetime.strptime(time, '%m %H:%M')
            edges = [
                (u, v, self._calculate_edge(graph.nodes[u], graph.nodes[v], time_value))
                for u, v in node_pairs
            ]
            # weights=True stores the third tuple item as the "weight" edge
            # attribute; without it the computed values are discarded.
            graphs[time] = igraph.Graph.TupleList(
                edges=edges, directed=False, weights=True
            )
        return graphs

    def _calculate_edge(self, start, end, time) -> float:
        """
        Calculates the value of the edge for a given time.

        The value is ``distance + distance * proportion_in_shade * shade_impact``,
        where ``distance`` is the haversine distance between the two nodes and
        ``proportion_in_shade`` is the share of the straight line between
        them that intersects the computed building shades.

        Args:
            start: node data of the first end point (reads ``'x'`` and ``'y'``).
            end: node data of the second end point (reads ``'x'`` and ``'y'``).
            time: datetime passed on to ``calculate_polygon_shade``.
        """
        x1, y1 = start['x'], start['y']
        x2, y2 = end['x'], end['y']

        # haversine expects (latitude, longitude) pairs, while OSMnx nodes
        # keep the longitude in 'x' and the latitude in 'y'.
        distance = hs.haversine((y1, x1), (y2, x2))

        line = LineString([(x1, y1), (x2, y2)])
        if line.length == 0:
            # Both end points coincide: nothing can be shaded and the
            # proportion below would divide by zero.
            return distance

        potential_polygons = self.qtb.query(x1, y1, x2, y2)
        if len(potential_polygons) == 0:
            return distance

        # NOTE(review): no CRS is set on this GeoDataFrame - confirm that
        # calculate_polygon_shade does not rely on one.
        potential_polygons_gdf = gpd.GeoDataFrame(
            geometry=gpd.GeoSeries(potential_polygons),
        )
        list_of_shades = calculate_polygon_shade(potential_polygons_gdf, time)

        shaded_length = sum(
            shade.intersection(line).length for shade in list_of_shades
        )
        # Overlapping shades are counted once each, so cap the share at 1.
        proportion_in_shade = min(1.0, shaded_length / line.length)

        return distance + distance * proportion_in_shade * self.shade_impact

    def find_path(self, start, end):
        """
        Finds the path between two points using the precomputed data.

        Not implemented yet: always returns None.
        """
        # TODO
        return None
def test():
    """Builds the pathfinder for Gdansk and prints the result of one path query."""
    # Example start and end points
    origin = "Brama Wyżynna, Wały Jagiellońskie 2A, 80-887 Gdańsk"
    destination = "Museum of the Second World War, plac Władysława Bartoszewskiego 1, 80-862 Gdańsk"

    finder = PrecomputedPathfinder("gdansk")

    # Find path
    print("Path found:", finder.find_path(origin, destination))
# Run the Gdansk example only when this file is executed as a script
if __name__ == "__main__":
    test()