Data parser

Here, we provide the documentation for the data parser.
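A minimal usage sketch (the import path assumes the toolkit repository root is on your PYTHONPATH, and the scene and video identifiers are placeholders to replace with your own):

from utils.data_parser import DataParser

data_parser = DataParser("/path/to/scenefun3d/data")  # root directory of the downloaded dataset

visit_id = "420673"    # placeholder scene identifier
video_id = "42445198"  # placeholder video sequence identifier

# Load the laser scan of the scene as an XYZRGB point cloud
laser_scan = data_parser.get_laser_scan(visit_id)

The per-method examples further below continue from this snippet.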

DataParser

A class for parsing data files in the SceneFun3D dataset.

Source code in scenefun3d/utils/data_parser.py

Note: the listing below starts at the class definition; the module-level imports it relies on (e.g., os, glob, json, numpy as np, open3d as o3d, imageio) and the project helpers (convert_angle_axis_to_matrix3, rigid_interp_split, rigid_interp_geodesic, hm, data_asset_to_path) are imported or defined earlier in that file.
class DataParser:
    """
    A class for parsing data files in the SceneFun3D dataset.
    """

    def __init__(self, data_root_path):
        """
        Initialize the DataParser instance with the root path.

        Args:
            data_root_path (str): The root path where data is located.
        """
        self.data_root_path = data_root_path

    def TrajStringToMatrix(self, traj_str):
        """ 
        Converts a line from the camera trajectory file into translation and rotation matrices.

        Args:
            traj_str (str): A space-delimited string where each line represents a camera pose at a particular timestamp. 
                            The line consists of seven columns:
                - Column 1: timestamp
                - Columns 2-4: rotation (axis-angle representation in radians)
                - Columns 5-7: translation (in meters)

        Returns:
            (tuple): A tuple containing:
                - ts (str): Timestamp.
                - Rt (numpy.ndarray): 4x4 transformation matrix representing rotation and translation.

        Raises:
            AssertionError: If the input string does not have exactly seven columns.
        """
        tokens = traj_str.split()
        assert len(tokens) == 7
        ts = tokens[0]

        # Rotation in angle axis
        angle_axis = [float(tokens[1]), float(tokens[2]), float(tokens[3])]
        r_w_to_p = convert_angle_axis_to_matrix3(np.asarray(angle_axis))

        # Translation
        t_w_to_p = np.asarray([float(tokens[4]), float(tokens[5]), float(tokens[6])])
        extrinsics = np.eye(4, 4)
        extrinsics[:3, :3] = r_w_to_p
        extrinsics[:3, -1] = t_w_to_p
        Rt = np.linalg.inv(extrinsics)

        return (ts, Rt)

    def get_camera_trajectory(self, visit_id, video_id, pose_source="colmap"):
        """
        Retrieve the camera trajectory from a file and convert it into a dictionary whose keys are timestamps and 
        values are the corresponding camera poses.

        Args:
            visit_id (str): The identifier of the scene.
            video_id (str): The identifier of the video sequence.
            pose_source (str, optional): Specifies the trajectory asset type, either "colmap" or "arkit". Defaults to "colmap".

        Returns:
            (dict): A dictionary where keys are timestamps as strings (rounded to 3 decimal places for the "arkit" pose source) and values are 4x4 transformation matrices representing camera poses.

        Raises:
            AssertionError: If an unsupported trajectory asset type is provided.
        """
        assert pose_source in ["colmap", "arkit"], f"Unknown option {pose_source}"

        data_asset_identifier = "hires_poses" if pose_source == "colmap" else "lowres_poses"
        traj_file_path = self.get_data_asset_path(data_asset_identifier=f"{data_asset_identifier}", visit_id=visit_id, video_id=video_id)

        with open(traj_file_path) as f:
            traj = f.readlines()

        # Convert trajectory to a dictionary
        poses_from_traj = {}
        for line in traj:
            traj_timestamp = line.split(" ")[0]

            if pose_source == "colmap":
                poses_from_traj[f"{float(traj_timestamp)}"] = np.array(self.TrajStringToMatrix(line)[1].tolist())
            elif pose_source == "arkit":
                poses_from_traj[f"{round(float(traj_timestamp), 3):.3f}"] = np.array(self.TrajStringToMatrix(line)[1].tolist())

        return poses_from_traj

    def get_laser_scan(self, visit_id):
        """
        Load a point cloud from a .ply file containing laser scan data.

        Args:
            visit_id (str): The identifier of the scene.

        Returns:
            (open3d.geometry.PointCloud): A point cloud object containing the laser scan data (i.e., XYZRGB point cloud).
        """
        laser_scan_path = self.get_data_asset_path(data_asset_identifier="laser_scan_5mm", visit_id=visit_id)

        pcd = o3d.io.read_point_cloud(laser_scan_path)

        return pcd

    def get_arkit_reconstruction(self, visit_id, video_id, format="point_cloud"):
        """
        Load ARKit mesh reconstruction data based on the iPad video sequence from a .ply file.

        Args:
            visit_id (str): The identifier of the scene.
            video_id (str): The identifier of the video sequence.
            format (str, optional): The format of the mesh reconstruction data to load. 
                                    Supported formats are "point_cloud" and "mesh". 
                                    Defaults to "point_cloud".

        Returns:
            (Union[open3d.geometry.PointCloud, open3d.geometry.TriangleMesh]): 
                The loaded mesh reconstruction data in the specified format.

        Raises:
            ValueError: If an unsupported 3D data format is specified.
        """
        mesh_path = self.get_data_asset_path(data_asset_identifier="arkit_mesh", visit_id=visit_id, video_id=video_id)

        mesh = None 

        if format == "point_cloud":
            mesh = o3d.io.read_point_cloud(mesh_path)
        elif format == "mesh":
            mesh = o3d.io.read_triangle_mesh(mesh_path)
        else: 
            raise ValueError(f"Unknown mesh format {format}")

        return mesh

    def get_rgb_frames(self, visit_id, video_id, data_asset_identifier="hires_wide"):
        """
        Retrieve the paths to the RGB frames for a given scene and video sequence.

        Args:
            visit_id (str): The identifier of the scene.
            video_id (str): The identifier of the video sequence.
            data_asset_identifier (str, optional): The data asset type for the RGB frames.
                                                   Can be either "hires_wide" or "lowres_wide". 
                                                   Defaults to "hires_wide".

        Returns:
            (dict): A dictionary mapping frame timestamps to their corresponding file paths.

        Raises:
            ValueError: If an unsupported data asset identifier is provided.
            FileNotFoundError: If no frames are found at the specified path.
        """
        frame_mapping = {}
        if data_asset_identifier == "hires_wide":
            rgb_frames_path = self.get_data_asset_path(data_asset_identifier="hires_wide", visit_id=visit_id, video_id=video_id)

            frames = sorted(glob.glob(os.path.join(rgb_frames_path, "*.jpg")))
            if not frames:
                raise FileNotFoundError(f"No RGB frames found in {rgb_frames_path}")
            frame_timestamps = [os.path.basename(x).split(".jpg")[0].split("_")[1] for x in frames]

        elif data_asset_identifier == "lowres_wide":
            rgb_frames_path = self.get_data_asset_path(data_asset_identifier="lowres_wide", visit_id=visit_id, video_id=video_id)

            frames = sorted(glob.glob(os.path.join(rgb_frames_path, "*.png")))
            if not frames:
                raise FileNotFoundError(f"No RGB frames found in {rgb_frames_path}")
            frame_timestamps = [os.path.basename(x).split(".png")[0].split("_")[1] for x in frames]
        else: 
            raise ValueError(f"Unknown data_asset_identifier {data_asset_identifier} for RGB frames")

        # Create mapping from timestamp to full path
        frame_mapping = {timestamp: frame for timestamp, frame in zip(frame_timestamps, frames)}

        return frame_mapping

    def get_depth_frames(self, visit_id, video_id, data_asset_identifier="hires_depth"):
        """
        Retrieve the paths to the depth frames for a given scene and video sequence.

        Args:
            visit_id (str): The identifier of the scene.
            video_id (str): The identifier of the video sequence.
            data_asset_identifier (str, optional): The data asset type for the depth frames.
                                                   Can be either "hires_depth" or "lowres_depth". 
                                                   Defaults to "hires_depth".

        Returns:
            (dict): A dictionary mapping frame timestamps to their corresponding file paths.

        Raises:
            ValueError: If an unsupported data asset identifier is provided.
            FileNotFoundError: If no depth frames are found at the specified path.
        """
        frame_mapping = {}
        if data_asset_identifier == "hires_depth":
            depth_frames_path = self.get_data_asset_path(data_asset_identifier="hires_depth", visit_id=visit_id, video_id=video_id)

        elif data_asset_identifier == "lowres_depth":
            depth_frames_path = self.get_data_asset_path(data_asset_identifier="lowres_depth", visit_id=visit_id, video_id=video_id)

        else: 
            raise ValueError(f"Unknown data_asset_identifier {data_asset_identifier} for depth frames")

        frames = sorted(glob.glob(os.path.join(depth_frames_path, "*.png")))
        if not frames:
            raise FileNotFoundError(f"No depth frames found in {depth_frames_path}")
        frame_timestamps = [os.path.basename(x).split(".png")[0].split("_")[1] for x in frames]

        # Create mapping from timestamp to full path
        frame_mapping = {timestamp: frame for timestamp, frame in zip(frame_timestamps, frames)}

        return frame_mapping

    def get_camera_intrinsics(self, visit_id, video_id, data_asset_identifier="hires_wide_intrinsics"):
        """
        Retrieve the camera intrinsics for a given scene and video sequence.

        Args:
            visit_id (str): The identifier of the scene.
            video_id (str): The identifier of the video sequence.
            data_asset_identifier (str, optional): The data asset type for camera intrinsics.
                                                   Can be either "hires_wide_intrinsics" or "lowres_wide_intrinsics". 
                                                   Defaults to "hires_wide_intrinsics".

        Returns:
            (dict): A dictionary mapping timestamps to file paths of camera intrinsics data.

        Raises:
            ValueError: If an unsupported data asset identifier is provided.
            FileNotFoundError: If no intrinsics files are found at the specified path.
        """
        intrinsics_mapping = {}
        if data_asset_identifier == "hires_wide_intrinsics":
            intrinsics_path = self.get_data_asset_path(data_asset_identifier="hires_wide_intrinsics", visit_id=visit_id, video_id=video_id)

        elif data_asset_identifier == "lowres_wide_intrinsics":
            intrinsics_path = self.get_data_asset_path(data_asset_identifier="lowres_wide_intrinsics", visit_id=visit_id, video_id=video_id)

        else: 
            raise ValueError(f"Unknown data_asset_identifier {data_asset_identifier} for camera intrinsics")

        intrinsics = sorted(glob.glob(os.path.join(intrinsics_path, "*.pincam")))

        if not intrinsics:
            raise FileNotFoundError(f"No camera intrinsics found in {intrinsics_path}")

        intrinsics_timestamps = [os.path.basename(x).split(".pincam")[0].split("_")[1] for x in intrinsics]

        # Create mapping from timestamp to full path
        intrinsics_mapping = {timestamp: cur_intrinsics for timestamp, cur_intrinsics in zip(intrinsics_timestamps, intrinsics)}

        return intrinsics_mapping

    def get_nearest_pose(self, 
                            desired_timestamp,
                            poses_from_traj, 
                            time_distance_threshold = np.inf):
        """
        Get the nearest pose to a desired timestamp from a dictionary of poses.

        Args:
            desired_timestamp (str): The timestamp of the desired pose.
            poses_from_traj (dict): A dictionary where keys are timestamps (as strings) 
                                    and values are 4x4 transformation matrices representing poses.
            time_distance_threshold (float, optional): The maximum allowable time difference 
                                                    between the desired timestamp and the nearest pose timestamp. Defaults to np.inf.

        Returns:
            (Union[numpy.ndarray, None]): The nearest pose as a 4x4 transformation matrix if found within the specified threshold, else None.

        Note:
            The function will return the pose closest to the desired timestamp if it exists in the provided poses.
            If the closest pose is further away than the specified `time_distance_threshold`, the function returns `None`.
        """
        max_pose_timestamp = max(float(key) for key in poses_from_traj.keys())
        min_pose_timestamp = min(float(key) for key in poses_from_traj.keys()) 

        if float(desired_timestamp) < min_pose_timestamp or \
            float(desired_timestamp) > max_pose_timestamp:
            return None

        if desired_timestamp in poses_from_traj.keys():
            H = poses_from_traj[desired_timestamp]
        else:
            closest_timestamp = min(
                poses_from_traj.keys(), 
                key=lambda x: abs(float(x) - float(desired_timestamp))
            )

            if abs(float(closest_timestamp) - float(desired_timestamp)) > time_distance_threshold:
                return None

            H = poses_from_traj[closest_timestamp]

        desired_pose = H

        assert desired_pose.shape == (4, 4)

        return desired_pose



    def get_interpolated_pose(self, 
                                desired_timestamp,
                                poses_from_traj, 
                                time_distance_threshold = np.inf,
                                interpolation_method = 'split',
                                frame_distance_threshold = np.inf):
        """
        Get the interpolated pose for a desired timestamp from a dictionary of poses.

        Args:
            desired_timestamp (str): The timestamp of the desired pose.
            poses_from_traj (dict): A dictionary where keys are timestamps (as strings) 
                                    and values are 4x4 transformation matrices representing poses.
            time_distance_threshold (float, optional): The maximum allowable time difference 
                                                    between the desired timestamp and the nearest pose timestamps. Defaults to np.inf.
            interpolation_method (str, optional): Method used for interpolation. Defaults to 'split'.
                - "split": Performs rigid body motion interpolation in SO(3) x R^3.
                - "geodesic_path": Performs rigid body motion interpolation in SE(3).
            frame_distance_threshold (float, optional): Maximum allowable frame distance between two consecutive poses. Defaults to np.inf.

        Returns:
            (Union[numpy.ndarray, None]): The interpolated pose as a 4x4 transformation matrix, or None if not found within thresholds.

        Raises:
            ValueError: If an unsupported interpolation method is specified.

        Note:
            This function uses interpolation between two nearest poses if `desired_timestamp` is not directly available.
            The interpolation method can be either "split" (for rigid body interpolation in SO(3) x R^3) or "geodesic_path" (for SE(3)).
            If the difference between the timestamps or poses is beyond the specified thresholds, the function will return None.
        """

        max_pose_timestamp = max(float(key) for key in poses_from_traj.keys())
        min_pose_timestamp = min(float(key) for key in poses_from_traj.keys()) 

        if float(desired_timestamp) < min_pose_timestamp or \
            float(desired_timestamp) > max_pose_timestamp:
            return None

        if desired_timestamp in poses_from_traj.keys():
            H = poses_from_traj[desired_timestamp]
        else:
            greater_closest_timestamp = min(
                [x for x in poses_from_traj.keys() if float(x) > float(desired_timestamp) ], 
                key=lambda x: abs(float(x) - float(desired_timestamp))
            )
            smaller_closest_timestamp = min(
                [x for x in poses_from_traj.keys() if float(x) < float(desired_timestamp) ], 
                key=lambda x: abs(float(x) - float(desired_timestamp))
            )

            if abs(float(greater_closest_timestamp) - float(desired_timestamp)) > time_distance_threshold or \
                abs(float(smaller_closest_timestamp) - float(desired_timestamp)) > time_distance_threshold:
                return None

            H0 = poses_from_traj[smaller_closest_timestamp]
            H1 = poses_from_traj[greater_closest_timestamp]
            H0_t = hm.trans(H0)
            H1_t = hm.trans(H1)

            if np.linalg.norm(H0_t - H1_t) > frame_distance_threshold:
                return None

            if interpolation_method == "split":
                H = rigid_interp_split(
                    float(desired_timestamp), 
                    poses_from_traj[smaller_closest_timestamp], 
                    float(smaller_closest_timestamp), 
                    poses_from_traj[greater_closest_timestamp], 
                    float(greater_closest_timestamp)
                )
            elif interpolation_method == "geodesic_path":
                H = rigid_interp_geodesic(
                    float(desired_timestamp), 
                    poses_from_traj[smaller_closest_timestamp], 
                    float(smaller_closest_timestamp), 
                    poses_from_traj[greater_closest_timestamp], 
                    float(greater_closest_timestamp)
                )
            else:
                raise ValueError(f"Unknown interpolation method {interpolation_method}")

        desired_pose = H

        assert desired_pose.shape == (4, 4)

        return desired_pose


    def get_transform(self, visit_id, video_id):
        """
        Load the transformation matrix from a .npy file. This transformation matrix converts coordinates from the Faro laser scan coordinate system to the ARKit coordinate system.

        Args:
            visit_id (str): The identifier of the scene.
            video_id (str): The identifier of the video sequence.

        Returns:
            (numpy.ndarray): The estimated transformation matrix loaded from the file.
        """
        transform_path = self.get_data_asset_path(data_asset_identifier="transform", visit_id=visit_id, video_id=video_id)
        transform = np.load(transform_path) 
        return transform


    def read_rgb_frame(self, rgb_frame_path, normalize=False):
        """
        Read an RGB frame from the specified path.

        Args:
            rgb_frame_path (str): The full path to the RGB frame file.
            normalize (bool, optional): Whether to normalize the pixel values to the range [0, 1]. Defaults to False.

        Returns:
            (numpy.ndarray): The RGB frame as a NumPy array with the RGB color values.

        """
        color = imageio.v2.imread(rgb_frame_path)

        if normalize:
            color = color / 255.

        return color

    def read_depth_frame(self, depth_frame_path, conversion_factor=1000):
        """
        Read a depth frame from the specified path and convert it to depth values.

        Args:
            depth_frame_path (str): The full path to the depth frame file.
            conversion_factor (float, optional): The conversion factor to convert pixel values to depth values. Defaults to 1000 to convert millimeters to meters.

        Returns:
            (numpy.ndarray): The depth frame as a NumPy array with the depth values.
        """

        depth = imageio.v2.imread(depth_frame_path) / conversion_factor

        return depth

    def read_camera_intrinsics(self, intrinsics_file_path, format="tuple"):
        """
        Parses a file containing camera intrinsic parameters and returns them in the specified format.

        Args:
            intrinsics_file_path (str): The path to the file containing camera intrinsic parameters.
            format (str, optional): The format in which to return the camera intrinsic parameters.
                                    Supported formats are "tuple" and "matrix". Defaults to "tuple".

        Returns:
            (Union[tuple, numpy.ndarray]): Camera intrinsic parameters in the specified format.

                - If format is "tuple", returns a tuple \\(w, h, fx, fy, hw, hh\\).
                - If format is "matrix", returns a 3x3 numpy array representing the camera matrix.

        Raises:
            ValueError: If an unsupported format is specified.
        """
        w, h, fx, fy, hw, hh = np.loadtxt(intrinsics_file_path)

        if format == "tuple":
            return (w, h, fx, fy, hw, hh)
        elif format == "matrix":
            return np.asarray([[fx, 0, hw], [0, fy, hh], [0, 0, 1]])
        else:
            raise ValueError(f"Unknown format {format}")

    def get_crop_mask(self, visit_id, return_indices=False):
        """
        Load the crop mask from a .npy file.

        Args:
            visit_id (str): The identifier of the scene.
            return_indices (bool, optional): Whether to return the indices of the cropped points. Defaults to False.

        Returns:
            (numpy.ndarray): The crop mask loaded from the file. If `return_indices` is False, returns a binary mask over the laser scan points. If `return_indices` is True, returns an array containing the indices of the points to keep.
        """
        crop_mask_path = self.get_data_asset_path(data_asset_identifier="crop_mask", visit_id=visit_id)
        crop_mask = np.load(crop_mask_path)

        if return_indices:
            return np.where(crop_mask)[0]
        else:
            return crop_mask

    def get_cropped_laser_scan(self, visit_id, laser_scan):
        """
        Crop a laser scan using a crop mask.

        Args:
            visit_id (str): The identifier of the scene.
            laser_scan (open3d.geometry.PointCloud): The laser scan point cloud to be cropped.

        Returns:
            (open3d.geometry.PointCloud): The cropped laser scan point cloud.
        """
        filtered_idx_list = self.get_crop_mask(visit_id, return_indices=True)

        laser_scan_points = np.array(laser_scan.points)
        laser_scan_colors = np.array(laser_scan.colors)
        laser_scan_points = laser_scan_points[filtered_idx_list]
        laser_scan_colors = laser_scan_colors[filtered_idx_list]

        cropped_laser_scan = o3d.geometry.PointCloud()
        cropped_laser_scan.points = o3d.utility.Vector3dVector(laser_scan_points)
        cropped_laser_scan.colors = o3d.utility.Vector3dVector(laser_scan_colors)

        return cropped_laser_scan


    def get_data_asset_path(self, data_asset_identifier, visit_id, video_id=None):
        """
        Get the file path for a specified data asset.

        Args:
            data_asset_identifier (str): A string identifier for the data asset.
            visit_id (str or int): The identifier for the visit (scene).
            video_id (str or int, optional): The identifier for the video sequence. Required if the specified data asset requires a video identifier.

        Returns:
            (str): The file path to the specified data asset.

        Raises:
            AssertionError: If the `data_asset_identifier` is not valid or if `video_id` is required but not provided.
        """
        assert data_asset_identifier in data_asset_to_path, f"Data asset identifier '{data_asset_identifier}' is not valid"

        data_path = data_asset_to_path[data_asset_identifier]

        if ("<video_id>" in data_path) and (video_id is None):
            assert False, f"video_id must be specified for the data asset identifier '{data_asset_identifier}'"

        visit_id = str(visit_id)

        data_path = (
            data_path
                .replace("<data_dir>", self.data_root_path)
                .replace("<visit_id>", visit_id)
        )

        if "<video_id>" in data_path:
            video_id = str(video_id)
            data_path = data_path.replace("<video_id>", video_id)

        return data_path


    def get_annotations(self, visit_id, group_excluded_points=True):
        """
        Retrieve the functionality annotations for a specified scene.

        Args:
            visit_id (str or int): The identifier for the scene.
            group_excluded_points (bool, optional): If True, all annotations with the label "exclude" will be grouped together 
                                                    into a single annotation instance. Defaults to True.

        Returns:
            (list): A list of annotations, each represented as a dictionary.

        """
        annotations_path = self.get_data_asset_path(data_asset_identifier="annotations", visit_id=visit_id)

        annotations_data = None
        with open(annotations_path, 'r') as f:
            annotations_data = json.load(f)["annotations"]

        if group_excluded_points:
            # group the excluded points into a single annotation instance
            exclude_indices_set = set()
            first_exclude_annotation = None
            filtered_annotation_data = []

            for annotation in annotations_data:
                if annotation["label"] == "exclude":
                    if first_exclude_annotation is None:
                        first_exclude_annotation = annotation
                    exclude_indices_set.update(annotation["indices"])
                else:
                    filtered_annotation_data.append(annotation)

            if first_exclude_annotation:
                first_exclude_annotation["indices"] = sorted(list(exclude_indices_set))
                filtered_annotation_data.append(first_exclude_annotation)

            annotations_data = filtered_annotation_data

        return annotations_data


    def get_descriptions(self, visit_id):
        """
        Retrieve the natural language task descriptions for a specified scene.

        Args:
            visit_id (str or int): The identifier for the scene.

        Returns:
            (list): A list of descriptions, each represented as a dictionary.
        """
        descriptions_path = self.get_data_asset_path(data_asset_identifier="descriptions", visit_id=visit_id)

        with open(descriptions_path, 'r') as f:
            descriptions_data = json.load(f)["descriptions"]

        return descriptions_data


    def get_motions(self, visit_id):
        """
        Retrieve the motion annotations for a specified scene.

        Args:
            visit_id (str or int): The identifier for the scene.

        Returns:
            (list): A list of motions, each represented as a dictionary.
        """
        motions_path = self.get_data_asset_path(data_asset_identifier="motions", visit_id=visit_id)

        with open(motions_path, 'r') as f:
            motions_data = json.load(f)["motions"]

        return motions_data

TrajStringToMatrix(traj_str)

Converts a line from the camera trajectory file into translation and rotation matrices.

Parameters:

- traj_str (str, required): A space-delimited string where each line represents a camera pose at a particular timestamp. The line consists of seven columns: timestamp (column 1), rotation as an axis-angle representation in radians (columns 2-4), and translation in meters (columns 5-7).

Returns:

- tuple: A tuple containing ts (str), the timestamp, and Rt (numpy.ndarray), a 4x4 transformation matrix representing rotation and translation.

Raises:

- AssertionError: If the input string does not have exactly seven columns.

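A short sketch of parsing a single trajectory line (the numeric values are made up for illustration):

# timestamp, axis-angle rotation (radians), translation (meters)
traj_line = "101.348 0.1 -0.2 0.05 1.0 2.0 0.5"
ts, Rt = data_parser.TrajStringToMatrix(traj_line)
print(ts)        # "101.348"
print(Rt.shape)  # (4, 4)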

__init__(data_root_path)

Initialize the DataParser instance with the root path.

Parameters:

- data_root_path (str, required): The root path where data is located.

get_annotations(visit_id, group_excluded_points=True)

Retrieve the functionality annotations for a specified scene.

Parameters:

- visit_id (str or int, required): The identifier for the scene.
- group_excluded_points (bool, optional): If True, all annotations with the label "exclude" are grouped together into a single annotation instance. Defaults to True.

Returns:

- list: A list of annotations, each represented as a dictionary.

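A sketch of iterating over the returned annotations ("label" and "indices" are the keys the grouping logic relies on):

annotations = data_parser.get_annotations(visit_id)
for annotation in annotations:
    print(annotation["label"], len(annotation["indices"]))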

get_arkit_reconstruction(visit_id, video_id, format='point_cloud')

Load ARKit mesh reconstruction data based on the iPad video sequence from a .ply file.

Parameters:

- visit_id (str, required): The identifier of the scene.
- video_id (str, required): The identifier of the video sequence.
- format (str, optional): The format of the mesh reconstruction data to load. Supported formats are "point_cloud" and "mesh". Defaults to "point_cloud".

Returns:

- Union[open3d.geometry.PointCloud, open3d.geometry.TriangleMesh]: The loaded mesh reconstruction data in the specified format.

Raises:

- ValueError: If an unsupported 3D data format is specified.

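For example, the same asset can be loaded in either representation:

arkit_pcd = data_parser.get_arkit_reconstruction(visit_id, video_id)                  # point cloud (default)
arkit_mesh = data_parser.get_arkit_reconstruction(visit_id, video_id, format="mesh")  # triangle mesh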

get_camera_intrinsics(visit_id, video_id, data_asset_identifier='hires_wide_intrinsics')

Retrieve the camera intrinsics for a given scene and video sequence.

Parameters:

- visit_id (str, required): The identifier of the scene.
- video_id (str, required): The identifier of the video sequence.
- data_asset_identifier (str, optional): The data asset type for camera intrinsics. Can be either "hires_wide_intrinsics" or "lowres_wide_intrinsics". Defaults to "hires_wide_intrinsics".

Returns:

- dict: A dictionary mapping timestamps to file paths of camera intrinsics data.

Raises:

- ValueError: If an unsupported data asset identifier is provided.
- FileNotFoundError: If no intrinsics files are found at the specified path.

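A sketch that pairs this method with read_camera_intrinsics (documented in the class source above) to obtain a 3x3 camera matrix:

intrinsics_paths = data_parser.get_camera_intrinsics(visit_id, video_id)
timestamp, intrinsics_path = next(iter(intrinsics_paths.items()))
K = data_parser.read_camera_intrinsics(intrinsics_path, format="matrix")  # 3x3 numpy array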

get_camera_trajectory(visit_id, video_id, pose_source='colmap')

Retrieve the camera trajectory from a file and convert it into a dictionary whose keys are timestamps and values are the corresponding camera poses.

Parameters:

- visit_id (str, required): The identifier of the scene.
- video_id (str, required): The identifier of the video sequence.
- pose_source (str, optional): Specifies the trajectory asset type, either "colmap" or "arkit". Defaults to "colmap".

Returns:

- dict: A dictionary where keys are timestamps as strings (rounded to 3 decimal places for the "arkit" pose source) and values are 4x4 transformation matrices representing camera poses.

Raises:

- AssertionError: If an unsupported trajectory asset type is provided.

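For example:

poses = data_parser.get_camera_trajectory(visit_id, video_id, pose_source="colmap")
some_timestamp = next(iter(poses))
print(len(poses), poses[some_timestamp].shape)  # number of poses, (4, 4)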

get_crop_mask(visit_id, return_indices=False)

Load the crop mask from a .npy file.

Parameters:

- visit_id (str, required): The identifier of the scene.
- return_indices (bool, optional): Whether to return the indices of the points to keep instead of a binary mask. Defaults to False.

Returns:

- numpy.ndarray: The crop mask loaded from the file. If return_indices is False, a binary mask over the laser scan points; if True, an array containing the indices of the points to keep.

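For example, the two return modes look like this:

crop_mask = data_parser.get_crop_mask(visit_id)                           # binary mask over the scan points
keep_indices = data_parser.get_crop_mask(visit_id, return_indices=True)  # indices of the points to keep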

get_cropped_laser_scan(visit_id, laser_scan)

Crop a laser scan using a crop mask.

Parameters:

- visit_id (str, required): The identifier of the scene.
- laser_scan (open3d.geometry.PointCloud, required): The laser scan point cloud to be cropped.

Returns:

- open3d.geometry.PointCloud: The cropped laser scan point cloud.

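A sketch of cropping the laser scan loaded in the quickstart:

laser_scan = data_parser.get_laser_scan(visit_id)
cropped_laser_scan = data_parser.get_cropped_laser_scan(visit_id, laser_scan)
print(len(cropped_laser_scan.points), "of", len(laser_scan.points), "points kept")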

get_data_asset_path(data_asset_identifier, visit_id, video_id=None)

Get the file path for a specified data asset.

Parameters:

- data_asset_identifier (str, required): A string identifier for the data asset.
- visit_id (str or int, required): The identifier for the visit (scene).
- video_id (str or int, optional): The identifier for the video sequence. Required if the specified data asset requires a video identifier. Defaults to None.

Returns:

- str: The file path to the specified data asset.

Raises:

- AssertionError: If the data_asset_identifier is not valid or if video_id is required but not provided.

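For example (the identifiers follow the asset names used throughout this page):

annotations_path = data_parser.get_data_asset_path("annotations", visit_id)           # no video_id needed
rgb_dir = data_parser.get_data_asset_path("hires_wide", visit_id, video_id=video_id)  # video_id required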

get_depth_frames(visit_id, video_id, data_asset_identifier='hires_depth')

Retrieve the paths to the depth frames for a given scene and video sequence.

Parameters:

- visit_id (str, required): The identifier of the scene.
- video_id (str, required): The identifier of the video sequence.
- data_asset_identifier (str, optional): The data asset type for the depth frames. Can be either "hires_depth" or "lowres_depth". Defaults to "hires_depth".

Returns:

- dict: A dictionary mapping frame timestamps to their corresponding file paths.

Raises:

- ValueError: If an unsupported data asset identifier is provided.
- FileNotFoundError: If no depth frames are found at the specified path.

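A sketch that pairs this method with read_depth_frame (documented in the class source above) to obtain metric depth:

depth_paths = data_parser.get_depth_frames(visit_id, video_id)
timestamp, depth_path = next(iter(depth_paths.items()))
depth = data_parser.read_depth_frame(depth_path)  # numpy array, depth in meters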

get_descriptions(visit_id)

Retrieve the natural language task descriptions for a specified scene.

Parameters:

- visit_id (str or int, required): The identifier for the scene.

Returns:

- list: A list of descriptions, each represented as a dictionary.

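For example (each entry is a dictionary; the exact field names depend on the dataset release):

descriptions = data_parser.get_descriptions(visit_id)
for description in descriptions:
    print(description)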

get_interpolated_pose(desired_timestamp, poses_from_traj, time_distance_threshold=np.inf, interpolation_method='split', frame_distance_threshold=np.inf)

Get the interpolated pose for a desired timestamp from a dictionary of poses.

Parameters:

- desired_timestamp (str, required): The timestamp of the desired pose.
- poses_from_traj (dict, required): A dictionary where keys are timestamps (as strings) and values are 4x4 transformation matrices representing poses.
- time_distance_threshold (float, optional): The maximum allowable time difference between the desired timestamp and the nearest pose timestamps. Defaults to np.inf.
- interpolation_method (str, optional): Method used for interpolation: "split" performs rigid body motion interpolation in SO(3) x R^3, "geodesic_path" performs rigid body motion interpolation in SE(3). Defaults to "split".
- frame_distance_threshold (float, optional): Maximum allowable frame distance between two consecutive poses. Defaults to np.inf.

Returns:

- Union[numpy.ndarray, None]: The interpolated pose as a 4x4 transformation matrix, or None if not found within thresholds.

Raises:

- ValueError: If an unsupported interpolation method is specified.

Note:

This function interpolates between the two nearest poses if desired_timestamp is not directly available. If the difference between the timestamps or poses exceeds the specified thresholds, the function returns None.

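A sketch with explicit thresholds (the timestamp is a placeholder assumed to fall between two pose timestamps):

poses = data_parser.get_camera_trajectory(visit_id, video_id)
pose = data_parser.get_interpolated_pose(
    "101.348",                    # placeholder timestamp
    poses,
    time_distance_threshold=0.5,  # give up if the bracketing poses are more than 0.5 s away
    interpolation_method="split",
)
if pose is None:
    print("no pose within the thresholds")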

get_laser_scan(visit_id)

Load a point cloud from a .ply file containing laser scan data.

Parameters:

- visit_id (str, required): The identifier of the scene.

Returns:

- open3d.geometry.PointCloud: A point cloud object containing the laser scan data (i.e., an XYZRGB point cloud).


get_motions(visit_id)

Retrieve the motion annotations for a specified scene.

Parameters:

- visit_id (str or int, required): The identifier for the scene.

Returns:

- list: A list of motions, each represented as a dictionary.

Source code in scenefun3d/utils/data_parser.py
def get_motions(self, visit_id):
    """
    Retrieve the motion annotations for a specified scene.

    Args:
        visit_id (str or int): The identifier for the scene.

    Returns:
        (list): A list of motions, each represented as a dictionary.
    """
    motions_path = self.get_data_asset_path(data_asset_identifier="motions", visit_id=visit_id)

    with open(motions_path, 'r') as f:
        motions_data = json.load(f)["motions"]

    return motions_data
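
A short sketch of iterating over the returned annotations (illustrative only; the visit identifier is a placeholder, and the keys inside each motion dictionary depend on the annotation schema):

motions = parser.get_motions("422203")  # parser: DataParser instance, placeholder visit_id
print(f"{len(motions)} motion annotations")
for motion in motions:
    # each entry is a plain dict parsed from the motions JSON
    print(sorted(motion.keys()))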

get_nearest_pose(desired_timestamp, poses_from_traj, time_distance_threshold=np.inf)

Get the nearest pose to a desired timestamp from a dictionary of poses.

Parameters:

    desired_timestamp (str): The timestamp of the desired pose. Required.
    poses_from_traj (dict): A dictionary where keys are timestamps (as strings) and values are 4x4 transformation matrices representing poses. Required.
    time_distance_threshold (float): The maximum allowable time difference between the desired timestamp and the nearest pose timestamp. Defaults to np.inf.

Returns:

    (Union[numpy.ndarray, None]): The nearest pose as a 4x4 transformation matrix if found within the specified threshold, else None.

Note

The function will return the pose closest to the desired timestamp if it exists in the provided poses. If the closest pose is further away than the specified time_distance_threshold, the function returns None.

Source code in scenefun3d/utils/data_parser.py
def get_nearest_pose(self, 
                        desired_timestamp,
                        poses_from_traj, 
                        time_distance_threshold = np.inf):
    """
    Get the nearest pose to a desired timestamp from a dictionary of poses.

    Args:
        desired_timestamp (str): The timestamp of the desired pose.
        poses_from_traj (dict): A dictionary where keys are timestamps (as strings) 
                                and values are 4x4 transformation matrices representing poses.
        time_distance_threshold (float, optional): The maximum allowable time difference 
                                                between the desired timestamp and the nearest pose timestamp. Defaults to np.inf.

    Returns:
        (Union[numpy.ndarray, None]): The nearest pose as a 4x4 transformation matrix if found within the specified threshold, else None.

    Note:
        The function will return the pose closest to the desired timestamp if it exists in the provided poses.
        If the closest pose is further away than the specified `time_distance_threshold`, the function returns `None`.
    """
    max_pose_timestamp = max(float(key) for key in poses_from_traj.keys())
    min_pose_timestamp = min(float(key) for key in poses_from_traj.keys()) 

    if float(desired_timestamp) < min_pose_timestamp or \
        float(desired_timestamp) > max_pose_timestamp:
        return None

    if desired_timestamp in poses_from_traj.keys():
        H = poses_from_traj[desired_timestamp]
    else:
        closest_timestamp = min(
            poses_from_traj.keys(), 
            key=lambda x: abs(float(x) - float(desired_timestamp))
        )

        if abs(float(closest_timestamp) - float(desired_timestamp)) > time_distance_threshold:
            return None

        H = poses_from_traj[closest_timestamp]

    desired_pose = H

    assert desired_pose.shape == (4, 4)

    return desired_pose
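
A toy sketch of the lookup behavior, using hand-built identity poses instead of a real trajectory file:

import numpy as np

poses = {"101.0": np.eye(4), "101.5": np.eye(4)}

# exact key: the stored pose is returned directly
assert parser.get_nearest_pose("101.0", poses) is not None

# 101.2 is not a key; the nearest pose (at 101.0) is within the threshold
assert parser.get_nearest_pose("101.2", poses, time_distance_threshold=0.5) is not None

# outside the [min, max] timestamp range of the trajectory: returns None
assert parser.get_nearest_pose("99.0", poses) is None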

get_rgb_frames(visit_id, video_id, data_asset_identifier='hires_wide')

Retrieve the paths to the RGB frames for a given scene and video sequence.

Parameters:

    visit_id (str): The identifier of the scene. Required.
    video_id (str): The identifier of the video sequence. Required.
    data_asset_identifier (str): The data asset type for the RGB frames. Can be either "hires_wide" or "lowres_wide". Defaults to "hires_wide".

Returns:

    (dict): A dictionary mapping frame timestamps to their corresponding file paths.

Raises:

    ValueError: If an unsupported data asset identifier is provided.
    FileNotFoundError: If no frames are found at the specified path.

Source code in scenefun3d/utils/data_parser.py
def get_rgb_frames(self, visit_id, video_id, data_asset_identifier="hires_wide"):
    """
    Retrieve the paths to the RGB frames for a given scene and video sequence.

    Args:
        visit_id (str): The identifier of the scene.
        video_id (str): The identifier of the video sequence.
        data_asset_identifier (str, optional): The data asset type for the RGB frames.
                                               Can be either "hires_wide" or "lowres_wide". 
                                               Defaults to "hires_wide".

    Returns:
        (dict): A dictionary mapping frame timestamps to their corresponding file paths.

    Raises:
        ValueError: If an unsupported data asset identifier is provided.
        FileNotFoundError: If no frames are found at the specified path.
    """
    frame_mapping = {}
    if data_asset_identifier == "hires_wide":
        rgb_frames_path = self.get_data_asset_path(data_asset_identifier="hires_wide", visit_id=visit_id, video_id=video_id)

        frames = sorted(glob.glob(os.path.join(rgb_frames_path, "*.jpg")))
        if not frames:
            raise FileNotFoundError(f"No RGB frames found in {rgb_frames_path}")
        frame_timestamps = [os.path.basename(x).split(".jpg")[0].split("_")[1] for x in frames]

    elif data_asset_identifier == "lowres_wide":
        rgb_frames_path = self.get_data_asset_path(data_asset_identifier="lowres_wide", visit_id=visit_id, video_id=video_id)

        frames = sorted(glob.glob(os.path.join(rgb_frames_path, "*.png")))
        if not frames:
            raise FileNotFoundError(f"No RGB frames found in {rgb_frames_path}")
        frame_timestamps = [os.path.basename(x).split(".png")[0].split("_")[1] for x in frames]
    else: 
        raise ValueError(f"Unknown data_asset_identifier {data_asset_identifier} for RGB frames")

    # Create mapping from timestamp to full path
    frame_mapping = {timestamp: frame for timestamp, frame in zip(frame_timestamps, frames)}

    return frame_mapping
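
Combined with read_rgb_frame, the returned mapping makes it easy to walk a sequence in timestamp order (sketch; the identifiers are placeholders):

frames = parser.get_rgb_frames("422203", "42445198")  # placeholder visit_id, video_id
for timestamp in sorted(frames, key=float):
    rgb = parser.read_rgb_frame(frames[timestamp])
    print(timestamp, rgb.shape)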

get_transform(visit_id, video_id)

Load the transformation matrix from a .npy file. This transformation matrix converts coordinates from the Faro laser scan coordinate system to the ARKit coordinate system.

Parameters:

    visit_id (str): The identifier of the scene. Required.
    video_id (str): The identifier of the video sequence. Required.

Returns:

    (numpy.ndarray): The estimated transformation matrix loaded from the file.

Source code in scenefun3d/utils/data_parser.py
def get_transform(self, visit_id, video_id):
    """
    Load the transformation matrix from a .npy file. This transformation matrix converts coordinates from the Faro laser scan coordinate system to the ARKit coordinate system.

    Args:
        visit_id (str): The identifier of the scene.
        video_id (str): The identifier of the video sequence.

    Returns:
        (numpy.ndarray): The estimated transformation matrix loaded from the file.
    """
    transform_path = self.get_data_asset_path(data_asset_identifier="transform", visit_id=visit_id, video_id=video_id)
    transform = np.load(transform_path) 
    return transform
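
Assuming the stored matrix is a 4x4 homogeneous transform (consistent with the 4x4 poses used elsewhere in this parser), applying it to the laser scan looks like this sketch:

import numpy as np

pcd = parser.get_laser_scan("422203")             # placeholder visit_id
T = parser.get_transform("422203", "42445198")    # laser scan -> ARKit

pts = np.asarray(pcd.points)                      # (N, 3), Faro coordinates
pts_h = np.hstack([pts, np.ones((len(pts), 1))])  # homogeneous (N, 4)
pts_arkit = (T @ pts_h.T).T[:, :3]                # (N, 3), ARKit coordinates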

read_camera_intrinsics(intrinsics_file_path, format='tuple')

Parses a file containing camera intrinsic parameters and returns them in the specified format.

Parameters:

    intrinsics_file_path (str): The path to the file containing camera intrinsic parameters. Required.
    format (str): The format in which to return the camera intrinsic parameters. Supported formats are "tuple" and "matrix". Defaults to "tuple".

Returns:

    (Union[tuple, numpy.ndarray]): Camera intrinsic parameters in the specified format.

        - If format is "tuple", returns a tuple (w, h, fx, fy, hw, hh).
        - If format is "matrix", returns a 3x3 numpy array representing the camera matrix.

Raises:

    ValueError: If an unsupported format is specified.

Source code in scenefun3d/utils/data_parser.py
def read_camera_intrinsics(self, intrinsics_file_path, format="tuple"):
    """
    Parses a file containing camera intrinsic parameters and returns them in the specified format.

    Args:
        intrinsics_file_path (str): The path to the file containing camera intrinsic parameters.
        format (str, optional): The format in which to return the camera intrinsic parameters.
                                Supported formats are "tuple" and "matrix". Defaults to "tuple".

    Returns:
        (Union[tuple, numpy.ndarray]): Camera intrinsic parameters in the specified format.

            - If format is "tuple", returns a tuple \\(w, h, fx, fy, hw, hh\\).
            - If format is "matrix", returns a 3x3 numpy array representing the camera matrix.

    Raises:
        ValueError: If an unsupported format is specified.
    """
    w, h, fx, fy, hw, hh = np.loadtxt(intrinsics_file_path)

    if format == "tuple":
        return (w, h, fx, fy, hw, hh)
    elif format == "matrix":
        return np.asarray([[fx, 0, hw], [0, fy, hh], [0, 0, 1]])
    else:
        raise ValueError(f"Unknown format {format}")
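
Both formats carry the same parameters, as this sketch shows (intrinsics_file_path is assumed to point at an existing intrinsics file):

w, h, fx, fy, hw, hh = parser.read_camera_intrinsics(intrinsics_file_path)
K = parser.read_camera_intrinsics(intrinsics_file_path, format="matrix")

# the matrix form rearranges the tuple as
# [[fx, 0, hw],
#  [ 0, fy, hh],
#  [ 0,  0,  1]]
assert K[0, 0] == fx and K[1, 2] == hh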

read_depth_frame(depth_frame_path, conversion_factor=1000)

Read a depth frame from the specified path and convert it to depth values.

Parameters:

    depth_frame_path (str): The full path to the depth frame file. Required.
    conversion_factor (float): The conversion factor that maps stored pixel values to depth values. Defaults to 1000, which converts millimeters to meters.

Returns:

    (numpy.ndarray): The depth frame as a NumPy array of depth values.

Source code in scenefun3d/utils/data_parser.py
def read_depth_frame(self, depth_frame_path, conversion_factor=1000):
    """
    Read a depth frame from the specified path and convert it to depth values.

    Args:
        depth_frame_path (str): The full path to the depth frame file.
        conversion_factor (float, optional): The conversion factor to convert pixel values to depth values. Defaults to 1000 to convert millimeters to meters.

    Returns:
        (numpy.ndarray): The depth frame as a NumPy array with the depth values.
    """

    depth = imageio.v2.imread(depth_frame_path) / conversion_factor

    return depth
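
A sketch of back-projecting a single pixel to a camera-space 3D point, treating (hw, hh) from the intrinsics as the principal point (an assumption; the paths are placeholders):

depth = parser.read_depth_frame(depth_frame_path)  # depth in meters after conversion
w, h, fx, fy, hw, hh = parser.read_camera_intrinsics(intrinsics_file_path)

v, u = 240, 320          # example pixel (row, column)
z = depth[v, u]
x = (u - hw) * z / fx    # pinhole back-projection
y = (v - hh) * z / fy
print("camera-space point:", x, y, z)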

read_rgb_frame(rgb_frame_path, normalize=False)

Read an RGB frame from the specified path.

Parameters:

    rgb_frame_path (str): The full path to the RGB frame file. Required.
    normalize (bool): Whether to normalize the pixel values to the range [0, 1]. Defaults to False.

Returns:

    (numpy.ndarray): The RGB frame as a NumPy array of RGB color values.

Source code in scenefun3d/utils/data_parser.py
def read_rgb_frame(self, rgb_frame_path, normalize=False):
    """
    Read an RGB frame from the specified path.

    Args:
        rgb_frame_path (str): The full path to the RGB frame file.
        normalize (bool, optional): Whether to normalize the pixel values to the range [0, 1]. Defaults to False.

    Returns:
        (numpy.ndarray): The RGB frame as a NumPy array with the RGB color values.

    """
    color = imageio.v2.imread(rgb_frame_path)

    if normalize:
        color = color / 255.

    return color
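
For instance, normalization is handy when feeding frames to a model that expects inputs in [0, 1] (sketch; rgb_frame_path is a placeholder):

rgb = parser.read_rgb_frame(rgb_frame_path, normalize=True)
assert rgb.min() >= 0.0 and rgb.max() <= 1.0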

convert_angle_axis_to_matrix3(angle_axis)

Converts a rotation from angle-axis representation to a 3x3 rotation matrix.

Parameters:

    angle_axis (numpy.ndarray): A 3-element array representing the rotation in angle-axis form. Required.

Returns:

    (numpy.ndarray): A 3x3 rotation matrix representing the same rotation as the input angle-axis.

Raises:

    ValueError: If the input is not a valid 3-element numpy array.

Source code in scenefun3d/utils/data_parser.py
def convert_angle_axis_to_matrix3(angle_axis):
    """
    Converts a rotation from angle-axis representation to a 3x3 rotation matrix.

    Args:
        angle_axis (numpy.ndarray): A 3-element array representing the rotation in angle-axis form.

    Returns:
        (numpy.ndarray): A 3x3 rotation matrix representing the same rotation as the input angle-axis.

    Raises:
        ValueError: If the input is not a valid 3-element numpy array.
    """
    # Check if input is a numpy array
    if not isinstance(angle_axis, np.ndarray):
        raise ValueError("Input must be a numpy array.")

    # Check if the input is of shape (3,)
    if angle_axis.shape != (3,):
        raise ValueError("Input must be a 3-element array representing the rotation in angle-axis representation.")

    matrix, jacobian = cv2.Rodrigues(angle_axis)
    return matrix

convert_matrix3_to_angle_axis(matrix)

Converts a 3x3 rotation matrix to angle-axis representation (rotation vector).

Parameters:

    matrix (numpy.ndarray): A 3x3 rotation matrix representing the rotation. Required.

Returns:

    (numpy.ndarray): A 3-element array representing the rotation in angle-axis form.

Raises:

    ValueError: If the input is not a valid 3x3 numpy array.

Source code in scenefun3d/utils/data_parser.py
def convert_matrix3_to_angle_axis(matrix):
    """
    Converts a 3x3 rotation matrix to angle-axis representation (rotation vector).

    Args:
        matrix (numpy.ndarray): A 3x3 rotation matrix representing the rotation.

    Returns:
        (numpy.ndarray): A 3-element array representing the rotation in angle-axis form

    Raises:
        ValueError: If the input is not a valid 3x3 numpy array.
    """
    # Check if input is a numpy array
    if not isinstance(matrix, np.ndarray):
        raise ValueError("Input must be a numpy array.")

    # Check if the input is of shape (3, 3)
    if matrix.shape != (3, 3):
        raise ValueError("Input must be a 3x3 matrix representing the rotation.")

    # Convert the 3x3 rotation matrix to an angle-axis (rotation vector)
    angle_axis, jacobian = cv2.Rodrigues(matrix)

    return angle_axis.flatten()  # Return as a 1D array (rotation vector)
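
The two converters are inverses of each other (up to the usual angle-axis ambiguities), as this round-trip sketch illustrates:

import numpy as np

rvec = np.array([0.0, 0.0, np.pi / 2])        # 90-degree rotation about z
R = convert_angle_axis_to_matrix3(rvec)       # 3x3 rotation matrix
rvec_back = convert_matrix3_to_angle_axis(R)  # back to a rotation vector

assert np.allclose(rvec, rvec_back)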

decide_pose(pose)

Determines the orientation of a 3D pose based on the alignment of its z-vector with predefined orientations.

Parameters:

    pose (numpy.ndarray): A 4x4 NumPy array representing a 3D pose transformation matrix. Required.

Returns:

    (int): Index of the closest predefined orientation: 0 for upright, 1 for left, 2 for upside-down, and 3 for right.

Source code in scenefun3d/utils/data_parser.py
def decide_pose(pose):
    """
    Determines the orientation of a 3D pose based on the alignment of its z-vector with predefined orientations.

    Args:
        pose (np.ndarray): A 4x4 NumPy array representing a 3D pose transformation matrix.

    Returns:
        (int): Index representing the closest predefined orientation:
             0 for upright, 1 for left, 2 for upside-down, and 3 for right.
    """

    # z_vec: third row of the rotation part of the pose
    z_vec = pose[2, :3]
    z_orien = np.array(
        [
            [0.0, -1.0, 0.0], # upright
            [-1.0, 0.0, 0.0], # left
            [0.0, 1.0, 0.0], # upside-down
            [1.0, 0.0, 0.0], # right
        ]  
    )
    corr = np.matmul(z_orien, z_vec)
    corr_max = np.argmax(corr)
    return corr_max

rotate_pose(im, rot_index)

Rotates an image by a specified angle based on the rotation index.

Parameters:

    im (numpy.ndarray): The input image to be rotated, with shape (height, width, channels). Required.
    rot_index (int): Index representing the rotation angle: 0 for no rotation, 1 for 90 degrees clockwise, 2 for 180 degrees, and 3 for 90 degrees counterclockwise. Required.

Returns:

    (numpy.ndarray): The rotated image.

Source code in scenefun3d/utils/data_parser.py
def rotate_pose(im, rot_index):
    """
    Rotates an image by a specified angle based on the rotation index.

    Args:
        im (numpy.ndarray): The input image to be rotated. It should have shape (height, width, channels).
        rot_index (int): Index representing the rotation angle:
                         0 for no rotation, 1 for 90 degrees clockwise rotation,
                         2 for 180 degrees rotation, and 3 for 90 degrees counterclockwise rotation.

    Returns:
        (numpy.ndarray): The rotated image.
    """
    h, w, d = im.shape
    if d != 3:
        raise ValueError("Expected a 3-channel image of shape (height, width, 3).")

    if rot_index == 0:
        new_im = im
    elif rot_index == 1:
        new_im = cv2.rotate(im, cv2.ROTATE_90_CLOCKWISE)
    elif rot_index == 2:
        new_im = cv2.rotate(im, cv2.ROTATE_180)
    elif rot_index == 3:
        new_im = cv2.rotate(im, cv2.ROTATE_90_COUNTERCLOCKWISE)
    else:
        raise ValueError(f"Unknown rot_index {rot_index}")

    return new_im