-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrecorder.py
More file actions
181 lines (126 loc) · 5.78 KB
/
recorder.py
File metadata and controls
181 lines (126 loc) · 5.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
from winapi_functions import *
from mss import mss
from PIL import Image
import pickle
import time
from input_map import vk_to_unicode
import matplotlib.pyplot as plt
import dxcam
import warnings
import cv2
import numpy as np
import keyboard
from copy import deepcopy
from win11toast import notify
import os
FPS = 20
SCALE_DOWN = 10
SHOW_SCREEN_CAPTURE = False
def capture_screenshot():
with mss(with_cursor=True) as sct:
monitor = sct.monitors[1]
sct_img = sct.grab(monitor)
return Image.frombytes('RGB', sct_img.size, sct_img.bgra, 'raw', 'BGRX')
def win_input_recorder(msg_pointer, frame_mouse_movement: list, frame_mouse_events: list, frame_keyboard_events: list):
while peek_message(msg_pointer, None, 0, 0, 1):
msg = msg_pointer.contents
if msg.message == 0x00FF:
size = wintypes.UINT(0)
get_raw_input_data(ctypes.cast(msg.lParam, PRAWINPUT), 0x10000003, None, ctypes.byref(size), ctypes.sizeof(RAWINPUTHEADER))
buffer = ctypes.create_string_buffer(size.value)
get_raw_input_data(ctypes.cast(msg.lParam, PRAWINPUT), 0x10000003, buffer, ctypes.byref(size), ctypes.sizeof(RAWINPUTHEADER))
rawinput = ctypes.cast(buffer, PRAWINPUT).contents
if rawinput.header.dwType == 0: # MOUSE
mouse = rawinput.data.mouse
frame_mouse_movement[0] += mouse.lLastX
frame_mouse_movement[1] += mouse.lLastY
frame_mouse_events.append(mouse.DUMMYUNIONNAME.DUMMYSTRUCTNAME.usButtonFlags)
if rawinput.header.dwType == 1: # KEYBOARD
keyboard = rawinput.data.keyboard
frame_keyboard_events.append((keyboard.VKey, keyboard.Flags))
#print(vk_to_unicode[keyboard.VKey], keyboard.Flags)
def recorder_loop(screen_capture: dxcam.DXCamera, cv_window=None):
msg = wintypes.MSG()
msg_pointer = ctypes.pointer(msg)
is_recording = False
is_record_button_pressed = False
keyboard_events = []
mouse_movements = []
mouse_events = []
frames = []
frames_passed = 0
lenfiles = 0
for roots, dirs, files in os.walk('./val_range_data/'):
lenfiles = len(files)
current_save_number = lenfiles + 1
while True:
if keyboard.is_pressed('ctrl + r') and not is_record_button_pressed:
is_record_button_pressed = True
is_recording = not is_recording
if is_recording:
notify('STARTED RECORDING!')
else:
notify('STOPPED RECORDING!')
# SAVE
with open("./val_range_data/save" + str(current_save_number) + '.pkl', 'wb') as f:
pickle.dump((frames, keyboard_events, mouse_events, mouse_movements), f)
current_save_number += 1
keyboard_events.clear()
mouse_movements.clear()
mouse_events.clear()
frames.clear()
frame_keyboard_events.clear()
frame_mouse_events.clear()
frame_mouse_movement = [0, 0]
frames_passed = 0
elif not keyboard.is_pressed('ctrl + r'):
is_record_button_pressed = False
start_time = time.perf_counter()
frame_mouse_movement = [0, 0] # x, y
frame_keyboard_events = []
frame_mouse_events = []
if is_recording:
win_input_recorder(msg_pointer, frame_mouse_movement, frame_mouse_events, frame_keyboard_events)
screenshot = screen_capture.get_latest_frame()
frame = cv2.resize(screenshot, (1920//5, 1080//5))
frames.append(frame)
keyboard_events.append(frame_keyboard_events)
# not clear() (this is like reassigning the pointer) that way no copying is needed
frame_keyboard_events = []
mouse_events.append(frame_mouse_events)
frame_mouse_events = []
mouse_movements.append(frame_mouse_movement)
frame_mouse_movement = [0, 0]
frames_passed += 1
if frames_passed > 3000: # equivalent to 2.5 minutes at 20 fps
notify('SAVED!')
with open("./val_range_data/save" + str(current_save_number) + '.pkl', 'wb') as f:
pickle.dump((frames, keyboard_events, mouse_events, mouse_movements), f)
current_save_number += 1
keyboard_events.clear()
mouse_movements.clear()
mouse_events.clear()
frames.clear()
frame_keyboard_events.clear()
frame_mouse_events.clear()
frame_mouse_movement = [0, 0]
frames_passed = 0
# REACH TARGET FPS - (not needed as screen capture can handle)
if not is_recording:
end_time = time.perf_counter()
amount_time_to_wait = (1/FPS) - (end_time - start_time)
if amount_time_to_wait > 0:
time.sleep(amount_time_to_wait)
end_time = time.perf_counter()
print((1 / (end_time - start_time)), end="\r") # SHOULD BE AROUND FPS
if __name__ == "__main__":
print("\033[31m","MAKE SURE THE TERMINAL THIS SCRIPT IS RUNNING IN IS IN THE MONITOR TO CAPTURE!!!", "\n\033[0m")
# everything needs to run on single thread to ENSURE FRAMES and INPUT ARE SYNCED
initialize_input_recorder_window()
# MAKE SURE TO RUN THE TERMINAL WHICH IS RUNNING THE SCRIPT IN THE MONITOR TO CAPTURE!!! idk why
screen_capture = dxcam.create(output_idx = 0)
screen_capture.start(target_fps=FPS)
print(dxcam.device_info(), dxcam.output_info())
recorder_loop(screen_capture)
print('done')
screen_capture.stop()