forked from SelimEmre/webrtc-ant-media
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathhls.py
More file actions
111 lines (87 loc) · 3 KB
/
hls.py
File metadata and controls
111 lines (87 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib
import numpy as np
import requests
import time
Gst.init(None)
callback = None
def on_decoded_video_buffer(pad, info, streamId):
caps = pad.get_current_caps()
structure = None
if caps:
structure = caps.get_structure(0)
width = structure.get_int("width")[1]
height = structure.get_int("height")[1]
format = structure.get_string("format")
else:
print("No caps available")
return Gst.FlowReturn.ERROR
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer ")
return Gst.FlowReturn.ERROR
(result, mapinfo) = gst_buffer.map(Gst.MapFlags.READ)
assert result
stride = width * 3
if structure and structure.has_field("stride"):
stride = structure.get_int("stride")[1]
else:
# Calculate stride from buffer size if not explicitly provided
# stride = buffer_size / height
calculated_stride = mapinfo.size // height
if calculated_stride >= width * 3:
stride = calculated_stride
expected_size = height * width * 3
actual_size = mapinfo.size
if stride != width * 3 or actual_size != expected_size:
numpy_frame = np.zeros((height, width, 3), dtype=np.uint8)
buffer_view = memoryview(mapinfo.data)
for y in range(height):
src_start = y * stride
src_end = src_start + width * 3
row_data = bytes(buffer_view[src_start:src_end])
numpy_frame[y, :, :] = np.frombuffer(row_data, dtype=np.uint8).reshape(width, 3)
else:
numpy_frame = np.ndarray(
shape=(height, width, 3),
dtype=np.uint8,
buffer=mapinfo.data)
callback(numpy_frame)
gst_buffer.unmap(mapinfo)
return Gst.FlowReturn.OK
def url_exists(url, timeout=5):
try:
response = requests.head(url, allow_redirects=True, timeout=timeout)
return response.status_code == 200
except requests.RequestException:
return False
def start_hls_pipeline(url,on_video_frame_callback):
while True:
if url_exists(url):
break
print("waiting for hls url to be available")
time.sleep(3)
global callback
callback = on_video_frame_callback
pipeline_str = f"""
souphttpsrc location="{url}" !
hlsdemux !
decodebin ! videoconvert ! capsfilter
caps=video/x-raw,format=RGB !
fakesink name=vsink sync=false
"""
pipeline = Gst.parse_launch(pipeline_str)
sink = pipeline.get_by_name("vsink")
rgb_pad = sink.get_static_pad("sink")
rgb_pad.add_probe(
Gst.PadProbeType.BUFFER, on_decoded_video_buffer, "stream1")
pipeline.set_state(Gst.State.PLAYING)
print("Pipeline started…")
# Run main loop
loop = GLib.MainLoop()
try:
loop.run()
except KeyboardInterrupt:
print("Stopping…")
pipeline.set_state(Gst.State.NULL)