-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
68 lines (52 loc) · 1.93 KB
/
main.py
File metadata and controls
68 lines (52 loc) · 1.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# A demo of the pipline
import numpy as np
import networkx as nx
from collections import defaultdict, deque
from ai2thor.controller import Controller
from scipy.spatial import distance
from typing import Tuple
import random
import os
from utils.task_agent import generate_task_sequence, parse_task_sequence
from utils.graph_utils import *
from utils.control_policy import *
# Initialize the environment
floor_no = 1
c = Controller(height=1000, width=1000)
c.reset(f"FloorPlan{floor_no}")
cp = ControlPolicy(c, "tag")
# Define robot capabilities
robot_activities = ["GoToObject", "PickupObject", "PutObject", "SwitchOn", "SwitchOff", "SliceObject"]
robots = [
{"name": "robot1", "skills": robot_activities},
]
cp.init_robots(robots)
print("Robot initialized!")
def main():
enable_safety_aware = False
method = "graphomer"
# 1. Retrieve object information
env_objects, obj_lists = get_environment_data(controller=c)
safety_notice = 'Safe'
if enable_safety_aware:
if method == 'graphomer':
# 2. Construct the environment graph
nodes, edges = build_environment_graph(env_objects)
# 3. Receive safety notifications
safety_notice = receive_safety_notice(nodes, edges)
elif method == 'ltl':
# Note: please modify the safety rules
# 3. Receive safety notifications
safety_notice = receive_safety_notice_ltl(obj_lists)
# 4. Generate task sequence
task_description = "pick up an apple and put it on any container"
task_sequence_json = generate_task_sequence(task_description, robot_activities, obj_lists, safety_notice)
# 5. Parse task sequence
action_queue = parse_task_sequence(task_sequence_json)
print(action_queue)
cp.add_action_list(action_queue)
print("Starting execution!")
# 6. Execute tasks
cp.run_task_thread()
if __name__ == "__main__":
main()