-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPSUShortestPath_Algorithm.py
More file actions
123 lines (99 loc) · 3.53 KB
/
PSUShortestPath_Algorithm.py
File metadata and controls
123 lines (99 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import csv
import heapq
import time
from math import sin, cos, sqrt, atan2, radians
def haversine_distance(lat1, lon1, lat2, lon2, radius=3958.8):
    """Return the great-circle distance between two latitude/longitude points.

    Args:
        lat1: Latitude of the first point, in decimal degrees.
        lon1: Longitude of the first point, in decimal degrees.
        lat2: Latitude of the second point, in decimal degrees.
        lon2: Longitude of the second point, in decimal degrees.
        radius: Sphere radius; the result is expressed in the same unit.
            Defaults to 3958.8, the radius of the Earth in miles.

    Returns:
        The distance along the surface of the sphere, in the unit of
        ``radius`` (miles by default).
    """
    # The trigonometric functions work in radians.
    lat1_rad = radians(lat1)
    lon1_rad = radians(lon1)
    lat2_rad = radians(lat2)
    lon2_rad = radians(lon2)

    dlat = lat2_rad - lat1_rad
    dlon = lon2_rad - lon1_rad

    # Haversine formula.
    a = sin(dlat / 2) ** 2 + cos(lat1_rad) * cos(lat2_rad) * sin(dlon / 2) ** 2
    # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal
    # points, which would make sqrt(1 - a) raise ValueError; clamp it.
    a = min(1.0, max(0.0, a))
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return radius * c
class Graph:
    """Directed graph built from a node CSV file and a link CSV file.

    Both files are read with ``csv.DictReader``, so each needs a header row
    containing the column names used below.
    """

    def __init__(self, nodes_file, links_file):
        """Load the nodes first, then the links that connect them.

        Args:
            nodes_file: Path to a CSV with ``node_id``, ``x_coord`` and
                ``y_coord`` columns.
            links_file: Path to a CSV with ``from_node_id``, ``to_node_id``
                and ``length`` columns.
        """
        # node_id -> (latitude, longitude)
        self.nodes = {}
        # node_id -> list of (to_node_id, length) for each outgoing link
        self.graph = {}
        self.load_nodes(nodes_file)
        self.load_links(links_file)

    def load_nodes(self, nodes_file):
        """Read node coordinates and create an empty adjacency list per node.

        ``y_coord`` is stored as the latitude and ``x_coord`` as the
        longitude.

        Raises:
            KeyError: If a required column is missing.
            ValueError: If an id or coordinate cannot be parsed as a number.
        """
        # newline='' is what the csv module requires of its input files;
        # 'utf-8-sig' strips a leading byte-order mark (common in spreadsheet
        # exports) that would otherwise corrupt the first header name.
        with open(nodes_file, 'r', newline='', encoding='utf-8-sig') as f:
            for row in csv.DictReader(f):
                node_id = int(row['node_id'])
                lat = float(row['y_coord'])
                lon = float(row['x_coord'])
                self.nodes[node_id] = (lat, lon)
                self.graph[node_id] = []

    def load_links(self, links_file):
        """Append each link to the adjacency list of its origin node.

        Links are stored in one direction only (from -> to).

        Raises:
            KeyError: If a required column is missing, or if a link's
                ``from_node_id`` was not loaded by ``load_nodes``.
            ValueError: If an id or length cannot be parsed as a number.
        """
        with open(links_file, 'r', newline='', encoding='utf-8-sig') as f:
            for row in csv.DictReader(f):
                from_node = int(row['from_node_id'])
                to_node = int(row['to_node_id'])
                length = float(row['length'])
                self.graph[from_node].append((to_node, length))
def a_star_search(graph, start, goal):
    """Find a lowest-cost path from ``start`` to ``goal`` with A* search.

    The heuristic is the haversine distance from a node to the goal.
    NOTE(review): the result is only guaranteed optimal if link costs are in
    the same unit as that distance and never shorter than it - confirm
    against the link data.

    Args:
        graph: Object with ``nodes`` (node id -> (lat, lon)) and ``graph``
            (node id -> iterable of (neighbor id, cost)) mappings.
        start: Id of the node to start from.
        goal: Id of the node to reach.

    Returns:
        ``(path, cost)`` where ``path`` is the list of node ids from
        ``start`` to ``goal`` inclusive and ``cost`` is the sum of the link
        costs along it, or ``(None, float('inf'))`` when either endpoint is
        unknown or the goal cannot be reached.
    """
    inf = float('inf')
    nodes = graph.nodes
    adjacency = graph.graph

    # Without coordinates for both endpoints there is nothing to search.
    if start not in nodes or goal not in nodes:
        return None, inf

    goal_lat, goal_lon = nodes[goal]
    heuristic_cache = {}

    def heuristic(node):
        # The goal is fixed, so each node's estimate is computed only once.
        estimate = heuristic_cache.get(node)
        if estimate is None:
            lat, lon = nodes[node]
            estimate = haversine_distance(lat, lon, goal_lat, goal_lon)
            heuristic_cache[node] = estimate
        return estimate

    # Best known cost from start; nodes not yet reached are implicitly inf.
    g_score = {start: 0}
    came_from = {}
    closed_set = set()
    # Heap of (f = g + h, node). The start node's priority is irrelevant
    # because it is the only entry when it is popped.
    frontier = [(0, start)]

    while frontier:
        _, current = heapq.heappop(frontier)
        # A node may be pushed several times; only its first pop counts.
        if current in closed_set:
            continue
        closed_set.add(current)

        if current == goal:
            # Walk the predecessor chain back to start, then flip it.
            path = [current]
            while current in came_from:
                current = came_from[current]
                path.append(current)
            path.reverse()
            return path, g_score[goal]

        for neighbor, cost in adjacency.get(current, ()):
            # Links to nodes without coordinates cannot be evaluated by the
            # heuristic, so they are ignored.
            if neighbor in closed_set or neighbor not in nodes:
                continue
            tentative_g = g_score[current] + cost
            if tentative_g < g_score.get(neighbor, inf):
                came_from[neighbor] = current
                g_score[neighbor] = tentative_g
                heapq.heappush(
                    frontier, (tentative_g + heuristic(neighbor), neighbor)
                )

    return None, inf
def find_shortest_path(start=1, goal=20, nodes_file='Nodes.csv',
                       links_file='Links.csv'):
    """Load the network from CSV files and time an A* search across it.

    Args:
        start: Id of the origin node.
        goal: Id of the destination node.
        nodes_file: Path to the node CSV passed to ``Graph``.
        links_file: Path to the link CSV passed to ``Graph``.

    Returns:
        ``(path, cost, running_time)``. ``running_time`` is the duration of
        the search alone, in seconds; loading the files is not included.
        ``(None, None, None)`` is returned when ``start`` or ``goal`` is not
        a node of the loaded graph. When both exist but no route connects
        them, ``path`` and ``cost`` are whatever ``a_star_search`` reports.
    """
    graph = Graph(nodes_file, links_file)
    if start not in graph.nodes or goal not in graph.nodes:
        return None, None, None

    # perf_counter is monotonic and high-resolution, unlike the wall clock,
    # so short searches are not reported as 0 or as a negative duration.
    started = time.perf_counter()
    path, cost = a_star_search(graph, start, goal)
    running_time = time.perf_counter() - started
    return path, cost, running_time
if __name__ == "__main__":
    # Run the default query (node 1 to node 20) and print the outcome.
    path, cost, running_time = find_shortest_path()
    if not path:
        print("No path found")
    else:
        report = (
            f"Shortest Path: {path}",
            f"Total Cost: {cost:.2f} miles",
            f"Running Time: {running_time:.4f} seconds",
        )
        for line in report:
            print(line)