#!/usr/bin/env python3
"""
Floor Finder Container Entrypoint
Orchestrates the pipeline:
1. Convert input images (.8ij or other) to JPEG
2. Detect checkerboard corners in each image
3. Compute 4x4 floor grounding transform
4. Write result to output location
Usage:
python entrypoint.py --input <images_dir> --calibration <calib_dir> --output <output.json>
Or with environment variables:
INPUT_DIR=/data/input CALIBRATION_DIR=/data/calibration OUTPUT_FILE=/data/output/transform.json python entrypoint.py
"""
import argparse
import json
import os
import shutil
import sys
from pathlib import Path
# Import our modules
from floorfinder.converter import process_directory as convert_8ij_to_jpeg, HAS_IMAGECODECS
from floorfinder.detector import process_image as detect_checkerboard, parse_pattern_size
from floorfinder.calculator import process_all_grids
def _is_grid_json(path: Path) -> bool:
    """Return True when *path* parses as a JSON object holding grid data.

    A grid file is recognised by the presence of both the ``corners`` and
    ``pattern_size`` keys at the top level of a JSON object.
    """
    try:
        with open(path) as f:
            data = json.load(f)
    except (OSError, ValueError, RecursionError):
        # Unreadable, non-UTF-8, malformed or pathologically nested files are
        # simply "not a grid file"; detection stays best-effort.
        return False
    # Require a dict: ``in`` on a JSON string would be a substring test and
    # on a list an element test, neither of which means "has this key".
    return isinstance(data, dict) and "corners" in data and "pattern_size" in data


def find_images(input_dir: Path) -> tuple[list[Path], str]:
    """Find all usable input files below ``input_dir`` (searched recursively).

    Pre-computed grid JSON files take priority: if at least one is present,
    only those are returned and image files are ignored.

    Args:
        input_dir: Directory to scan (a ``str`` is accepted as well).

    Returns:
        Tuple of (list of paths, input type). The type is one of:
        ``'grids'``  - grid JSON files were found,
        ``'8ij'``    - only ``.8ij`` files were found,
        ``'jpeg'``   - only ``.jpg`` / ``.jpeg`` / ``.png`` files were found,
        ``'mixed'``  - both ``.8ij`` and regular images were found
                       (``.8ij`` paths come first in the list),
        ``'none'``   - nothing usable was found (the list is empty).
    """
    input_dir = Path(input_dir)

    # Check if input contains pre-computed grid JSONs
    grid_files = [jf for jf in input_dir.rglob("*.json") if _is_grid_json(jf)]
    if grid_files:
        return grid_files, "grids"

    # Find different image types
    eij_files = list(input_dir.rglob("*.8ij"))
    all_images = [
        *input_dir.rglob("*.jpg"),
        *input_dir.rglob("*.jpeg"),
        *input_dir.rglob("*.png"),
    ]

    if eij_files and all_images:
        return eij_files + all_images, "mixed"
    if eij_files:
        return eij_files, "8ij"
    if all_images:
        return all_images, "jpeg"
    return [], "none"
def convert_images_to_jpeg(input_dir: Path, output_dir: Path) -> list[Path]:
    """Stage every input image as a ``.jpg`` file under ``output_dir``.

    ``.8ij`` files are handed to the floorfinder converter, PNG files are
    re-encoded with Pillow and JPEG files are copied unchanged. PNG/JPEG
    outputs keep their path relative to ``input_dir``, with the suffix
    replaced by ``.jpg``.

    Args:
        input_dir: Directory scanned recursively by ``find_images``.
        output_dir: Destination directory; created if missing.

    Returns:
        Sorted, de-duplicated list of the JPEG paths available in
        ``output_dir`` after conversion.
    """
    input_dir = Path(input_dir)
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    images, img_type = find_images(input_dir)
    output_images: set[Path] = set()

    if img_type in ("8ij", "mixed"):
        if not HAS_IMAGECODECS:
            print("Warning: imagecodecs not available, .8ij conversion may fail")

        # Convert .8ij files
        eij_files = [f for f in images if f.suffix.lower() == ".8ij"]
        if eij_files:
            print(f"Converting {len(eij_files)} .8ij files to JPEG...")
            convert_8ij_to_jpeg(input_dir, output_dir, verbose=True)
            # The converter's return value is not used; its results are
            # collected by globbing the output directory afterwards.
            output_images.update(output_dir.rglob("*.jpg"))

    # Modes Pillow's JPEG writer accepts as-is; everything else (RGBA, P,
    # LA, I;16, ...) has to be converted before saving.
    jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")

    # Copy/convert existing JPEG/PNG files
    other_images = [f for f in images if f.suffix.lower() in (".jpg", ".jpeg", ".png")]
    for img_path in other_images:
        rel_path = img_path.relative_to(input_dir)
        # NOTE: files that differ only by extension (a.png / a.jpg) map to
        # the same output path; the one processed last wins.
        out_path = output_dir / rel_path.with_suffix(".jpg")
        out_path.parent.mkdir(parents=True, exist_ok=True)

        if img_path.suffix.lower() == ".png":
            # Convert PNG to JPEG. Pillow is imported lazily so that
            # JPEG-only inputs do not need it.
            from PIL import Image

            # The context manager closes the underlying file handle.
            with Image.open(img_path) as img:
                writable = img if img.mode in jpeg_modes else img.convert("RGB")
                writable.save(out_path, "JPEG", quality=95)
        else:
            # Copy JPEG
            shutil.copy2(img_path, out_path)
        output_images.add(out_path)

    # Sorted so callers get a stable order from run to run.
    return sorted(output_images)
def detect_checkerboards(
    image_dir: Path,
    output_dir: Path,
    pattern_size: tuple[int, int] = (8, 6)
) -> list[Path]:
    """Run checkerboard detection over every JPEG below ``image_dir``.

    Each image gets a grid JSON at the same relative location under
    ``output_dir`` (suffix swapped for ``.json``).

    Returns:
        Paths of the grid JSON files for images where detection succeeded.
    """
    src_root = Path(image_dir)
    dst_root = Path(output_dir)
    dst_root.mkdir(parents=True, exist_ok=True)

    candidates = [*src_root.rglob("*.jpg"), *src_root.rglob("*.jpeg")]
    print(f"Detecting checkerboards in {len(candidates)} images (pattern: {pattern_size[0]}x{pattern_size[1]})...")

    detected = []
    for source in sorted(candidates):
        # Mirror the input tree below the output root.
        target = dst_root / source.relative_to(src_root).with_suffix(".json")
        target.parent.mkdir(parents=True, exist_ok=True)

        found = detect_checkerboard(str(source), pattern_size, str(target), visualize=False)
        if not found:
            continue
        detected.append(target)

    print(f"Successfully detected {len(detected)}/{len(candidates)} checkerboards")
    return detected
def compute_floor_transform(
    grids_dir: Path,
    calibration_dir: Path,
    square_size: float = 10.0,
    platform_height: float = 50.0
) -> dict:
    """Compute the 4x4 floor grounding transform from detected grids.

    Delegates to ``process_all_grids``. When ``platform_height`` is not 50.0
    and the result contains a ``floor_transform``, that entry and two
    ``summary["floor_estimate"]`` fields are rebuilt for the given height.

    Returns:
        Result dictionary with transform and statistics
    """
    print(f"Computing floor transform from {grids_dir}...")
    result = process_all_grids(Path(grids_dir), Path(calibration_dir), square_size)

    # Nothing to patch for the default height or when no transform exists.
    # NOTE(review): presumably 50.0 is the height process_all_grids assumes
    # internally - confirm against the calculator module.
    if platform_height == 50.0 or not result.get("floor_transform"):
        return result

    import numpy as np

    summary = result["summary"]
    board_z = summary["checkerboard_level"]["z_mean_cm"]
    floor_z = board_z - platform_height
    shift = -floor_z

    # Translation sits in the last row.
    rows = [
        [1.0, 0.0, 0.0, 0.0],
        [0.0, 0.0, -1.0, 0.0],
        [0.0, 1.0, 0.0, 0.0],
        [0.0, shift, 0.0, 1.0],
    ]
    matrix = np.array(rows)
    flat = matrix.flatten()
    formatted = [f'{value:.15e}' for value in flat]
    braced = '{' + ' '.join(formatted) + '}'

    result["floor_transform"] = {
        "matrix_4x4": matrix.tolist(),
        "matrix_4x4_flat": flat.tolist(),
        "mat_8i_format": braced,
        "y_translation_cm": shift,
        "description": "Transforms rig (Z-up) to world (Y-up) with floor at Y=0. Translation in last row (8i format)."
    }

    floor_estimate = summary["floor_estimate"]
    floor_estimate["assumed_platform_height_cm"] = platform_height
    floor_estimate["floor_z_rig_cm"] = floor_z
    return result
def write_output(result: dict, output_path: Path, format: str = "full"):
    """Serialise the pipeline result as indented JSON.

    Args:
        result: Full result dictionary
        output_path: Output file path; missing parent directories are created
        format: '8i' writes only ``{"mat": ...}`` (or ``{"error": ...}`` when
            no transform is present); any other value writes ``result`` as-is
    """
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Default to the complete result; narrow it down for the 8i format.
    payload = result
    if format == "8i":
        transform = result.get("floor_transform")
        if not transform:
            payload = {"error": "No transform computed"}
        else:
            payload = {"mat": transform["mat_8i_format"]}

    with output_path.open("w") as handle:
        json.dump(payload, handle, indent=2)

    print(f"Results written to {output_path}")
def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser; most options fall back to environment variables."""
    parser = argparse.ArgumentParser(
        description="Floor Finder: Compute floor grounding transform from checkerboard images"
    )
    parser.add_argument(
        "--input", "-i",
        type=Path,
        default=Path(os.environ.get("INPUT_DIR", "/data/input")),
        help="Input directory containing images (.8ij, .jpg, .png)"
    )
    parser.add_argument(
        "--calibration", "-c",
        type=Path,
        default=Path(os.environ.get("CALIBRATION_DIR", "/data/calibration")),
        help="Directory containing camera calibration files (camera_*.json)"
    )
    parser.add_argument(
        "--output", "-o",
        type=Path,
        default=Path(os.environ.get("OUTPUT_FILE", "/data/output/floor_transform.json")),
        help="Output JSON file path"
    )
    parser.add_argument(
        "--pattern", "-p",
        type=str,
        default=os.environ.get("PATTERN_SIZE", "8x6"),
        help="Checkerboard pattern size as WxH (default: 8x6)"
    )
    # String defaults are run through ``type`` by argparse, so a malformed
    # SQUARE_SIZE / PLATFORM_HEIGHT yields a usage error, not a traceback.
    parser.add_argument(
        "--square-size", "-s",
        type=float,
        default=os.environ.get("SQUARE_SIZE", "10.0"),
        help="Checkerboard square size in cm (default: 10.0)"
    )
    parser.add_argument(
        "--platform-height",
        type=float,
        default=os.environ.get("PLATFORM_HEIGHT", "50.0"),
        help="Height of checkerboard platform above floor in cm (default: 50.0)"
    )
    parser.add_argument(
        "--format", "-f",
        choices=["full", "8i"],
        default=os.environ.get("OUTPUT_FORMAT", "8i"),
        help="Output format: 'full' for complete JSON, '8i' for just mat string (default: 8i)"
    )
    parser.add_argument(
        "--work-dir", "-w",
        type=Path,
        default=Path("/work"),
        help="Working directory for intermediate files"
    )
    parser.add_argument(
        "--keep-intermediate",
        action="store_true",
        help="Keep intermediate files (converted images, grid JSONs)"
    )
    return parser


def main():
    """Run the floor-finder pipeline from the command line.

    Steps: convert input images to JPEG, detect checkerboard corners,
    compute the floor transform, write it to the output file. Inputs that
    already contain grid JSON files skip the first two steps.

    Exits with status 1 when the input or calibration path is missing, no
    images could be staged, or no checkerboard was detected; argparse exits
    with status 2 on invalid options or environment values.
    """
    parser = _build_parser()
    args = parser.parse_args()

    # argparse does not apply ``choices`` to defaults, so a value that came
    # from OUTPUT_FORMAT has to be validated explicitly.
    if args.format not in ("full", "8i"):
        parser.error(
            f"invalid output format {args.format!r} (choose from 'full', '8i')"
        )

    # Validate inputs
    if not args.input.is_dir():
        print(f"Error: Input directory not found: {args.input}", file=sys.stderr)
        sys.exit(1)
    if not args.calibration.exists():
        print(f"Error: Calibration directory not found: {args.calibration}", file=sys.stderr)
        sys.exit(1)

    pattern_size = parse_pattern_size(args.pattern)

    # Set up working directories
    images_dir = args.work_dir / "images"
    grids_dir = args.work_dir / "grids"

    print("=" * 60)
    print("Floor Finder Pipeline")
    print("=" * 60)
    print(f"Input: {args.input}")
    print(f"Calibration: {args.calibration}")
    print(f"Output: {args.output}")
    print(f"Pattern: {pattern_size[0]}x{pattern_size[1]}")
    print(f"Square size: {args.square_size} cm")
    print(f"Platform: {args.platform_height} cm")
    print("=" * 60)

    # Check what type of input we have
    found_files, input_type = find_images(args.input)
    # Tracks whether this run wrote anything below the work directory.
    used_work_dir = False

    if input_type == "grids":
        # Input already contains grid JSONs - skip steps 1 and 2
        print("\n[1/3] Skipping - input contains pre-computed grid JSONs")
        print("[2/3] Skipping - grids already available")
        grids_dir = args.input  # Use input directly
        # Report the files find_images validated as grids, not every *.json.
        grid_files = found_files
        print(f" Found {len(grid_files)} grid files")
    else:
        used_work_dir = True

        # Step 1: Convert images to JPEG
        print("\n[1/3] Converting images to JPEG...")
        jpeg_images = convert_images_to_jpeg(args.input, images_dir)
        if not jpeg_images:
            print("Error: No images found or converted", file=sys.stderr)
            sys.exit(1)
        print(f" {len(jpeg_images)} images ready")

        # Step 2: Detect checkerboard corners
        print("\n[2/3] Detecting checkerboard corners...")
        grid_files = detect_checkerboards(images_dir, grids_dir, pattern_size)
        if not grid_files:
            print("Error: No checkerboards detected", file=sys.stderr)
            sys.exit(1)

    # Step 3: Compute floor transform
    print("\n[3/3] Computing floor transform...")
    result = compute_floor_transform(
        grids_dir,
        args.calibration,
        args.square_size,
        args.platform_height
    )

    # Write output
    print("\n" + "=" * 60)
    write_output(result, args.output, args.format)

    # Print summary
    if result.get("floor_transform"):
        ft = result["floor_transform"]
        print(f"\nFloor transform Y translation: {ft['y_translation_cm']:.2f} cm")
        print(f"8i format: {ft['mat_8i_format'][:60]}...")

    summary = result.get("summary", {})
    print(f"\nProcessed {summary.get('total_processed', 0)} cameras")
    if cl := summary.get("checkerboard_level"):
        print(f"Checkerboard Z: {cl['z_mean_cm']:.1f} +/- {cl['z_std_cm']:.1f} cm")
    if fe := summary.get("floor_estimate"):
        print(f"Floor Z estimate: {fe['floor_z_rig_cm']:.1f} cm")

    # Cleanup: only when this run actually used the work directory, so a
    # grids-only run never deletes a directory it did not touch.
    if used_work_dir and not args.keep_intermediate:
        shutil.rmtree(args.work_dir, ignore_errors=True)

    print("\nDone!")


if __name__ == "__main__":
    main()