Source code for behavenet.data.preprocess

"""Utility functions for automatically constructing hdf5 files."""

import cv2
import h5py
from numpy import genfromtxt
import numpy as np
import os

# to ignore imports for sphinx-autoapidoc
__all__ = ['build_hdf5', 'load_raw_labels', 'resize_labels', 'get_frames_from_idxs']


[docs]def build_hdf5( save_file, video_file, label_file=None, pose_algo=None, batch_size=128, xpix=None, ypix=None, label_likelihood_thresh=0.9, zscore=True): """Build Behavenet-style HDF5 file from video file and optional label file. This function provides a basic example for how to convert raw video and label files into the processed version required by Behavenet. In doing so no additional assumptions are made about a possible trial structure; equally-sized batches are created. For more complex data, users will need to adapt this function to suit their own needs. Parameters ---------- save_file : :obj:`str` absolute file path of new HDF5 file; the directory does not need to be created beforehand video_file : :obj:`str` absolute file path of the video (.mp4, .avi) label_file : :obj:`str`, optional absolute file path of the labels; current formats include DLC/DGP csv or h5 files pose_algo : :obj:`str`, optional 'dlc' | 'dgp' batch_size : :obj:`int`, optional uniform batch size of data xpix : :obj:`int`, optional if not None, video frames will be reshaped before storing in the HDF5 ypix : :obj:`int`, optional if not None, video frames will be reshaped before storing in the HDF5 label_likelihood_thresh : :obj:`float`, optional likelihood threshold used to define masks; any labels/timepoints with a likelihood below this value will be set to NaN zscore : :obj:`bool`, optional individually z-score each label before saving in the HDF5 """ # load video capture video_cap = cv2.VideoCapture(video_file) n_total_frames = video_cap.get(cv2.CAP_PROP_FRAME_COUNT) xpix_og = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH)) ypix_og = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # load labels if label_file is not None: labels, masks = load_raw_labels( label_file, pose_algo=pose_algo, likelihood_thresh=label_likelihood_thresh) # error check n_total_labels = labels.shape[0] assert n_total_frames == n_total_labels, 'Number of frames does not match number of labels' else: labels = None n_trials = int(np.ceil(n_total_frames / batch_size)) trials = np.arange(n_trials) timestamps = np.arange(n_total_frames) # compute z-score params if label_file is not None and zscore: means = np.nanmean(labels, axis=0) stds = np.nanstd(labels, axis=0) else: means = None stds = None # create directory for hdf5 if it doesn't already exist if not os.path.exists(os.path.dirname(save_file)): os.makedirs(os.path.dirname(save_file)) with h5py.File(save_file, 'w', libver='latest', swmr=True) as f: # single write multi-read f.swmr_mode = True # create image group group_i = f.create_group('images') if label_file is not None: # create labels group (z-scored) group_l = f.create_group('labels') # create label mask group group_m = f.create_group('labels_masks') # create labels group (not z-scored, but downsampled if necessary) group_ls = f.create_group('labels_sc') # create a dataset for each trial within groups for tr_idx, trial in enumerate(trials): # find video timestamps during this trial trial_beg = trial * batch_size trial_end = (trial + 1) * batch_size ts_idxs = np.where((timestamps >= trial_beg) & (timestamps < trial_end))[0] # ---------------------------------------------------------------------------- # image data # ---------------------------------------------------------------------------- # collect from video capture, downsample frames_tmp = get_frames_from_idxs(video_cap, ts_idxs) if xpix is not None and ypix is not None: # Nones to add batch/channel dims frames_tmp = [cv2.resize(f[0], (xpix, ypix))[None, None, ...] for f in frames_tmp] else: frames_tmp = [f[None, ...] for f in frames_tmp] group_i.create_dataset( 'trial_%04i' % tr_idx, data=np.vstack(frames_tmp), dtype='uint8') # ---------------------------------------------------------------------------- # label data # ---------------------------------------------------------------------------- if label_file is not None: # label masks group_m.create_dataset('trial_%04i' % tr_idx, data=masks[ts_idxs], dtype='float32') # label data (zscored, masked) labels_tmp = (labels[ts_idxs] - means) / stds labels_tmp[masks[ts_idxs] == 0] = 0 # pytorch doesn't play well with nans assert ~np.any(np.isnan(labels_tmp)) group_l.create_dataset('trial_%04i' % tr_idx, data=labels_tmp, dtype='float32') # label data (non-zscored, masked) labels_tmp = labels[ts_idxs] labels_tmp = resize_labels(labels_tmp, xpix, ypix, xpix_og, ypix_og) labels_tmp[masks[ts_idxs] == 0] = 0 group_ls.create_dataset('trial_%04i' % tr_idx, data=labels_tmp, dtype='float32')
[docs]def load_raw_labels(file_path, pose_algo, likelihood_thresh=0.9): """Load labels and build masks from a variety of standardized source files. This function currently supports the loading of csv and h5 files output by DeepLabCut (DLC) and Deep Graph Pose (DGP). Parameters ---------- file_path : :obj:`str` absolute file path of label file pose_algo : :obj:`str` 'dlc' | 'dgp' likelihood_thresh : :obj:`float` likelihood threshold used to define masks; any labels/timepoints with a likelihood below this value will be set to NaN and the corresponding masks file with have a 0 Returns ------- :obj:`tuple` - (array-like): labels, all x-values first, then all y-values - (array-like): masks; 1s correspond to good values, 0s correspond to bad values """ if pose_algo == 'dlc' or pose_algo == 'dgp': file_ext = file_path.split('.')[-1] if file_ext == 'csv': labels_tmp = genfromtxt(file_path, delimiter=',', dtype=None, encoding=None) labels_tmp = labels_tmp[3:, 1:].astype('float') # get rid of headers, etc. elif file_ext == 'h5': with h5py.File(file_path, 'r') as f: t = f['df_with_missing']['table'][()] labels_tmp = np.concatenate([t[i][1][None, :] for i in range(len(t))]) else: raise NotImplementedError( '"%s" is an unsupported file extentsion for %s' % (file_ext, pose_algo)) xvals = labels_tmp[:, 0::3] yvals = labels_tmp[:, 1::3] likes = labels_tmp[:, 2::3] labels = np.hstack([xvals, yvals]) likes = np.hstack([likes, likes]) masks = 1.0 * (likes >= likelihood_thresh) labels[masks != 1] = np.nan elif pose_algo == 'dpk': raise NotImplementedError elif pose_algo == 'leap': raise NotImplementedError else: raise NotImplementedError('the pose algorithm "%s" is currently unsupported' % pose_algo) return labels, masks
[docs]def resize_labels(labels, xpix_new, ypix_new, xpix_old, ypix_old): """Update label values to reflect scale of corresponding images. Parameters ---------- labels : :obj:`array-like` np.ndarray of shape (n_time, 2 * n_labels); for a given row, all x-values come first, followed by all y-values xpix_new : :obj:`int` xpix of new images ypix_new : :obj:`int` ypix of new images xpix_old : :obj:`int` xpix of original images ypix_old : :obj:`int` ypix of original images Returns ------- array-like resized label values """ if xpix_new is None or ypix_new is None: return labels else: n_labels = labels.shape[1] // 2 old = np.array([xpix_old] * n_labels + [ypix_old] * n_labels) new = np.array([xpix_new] * n_labels + [ypix_new] * n_labels) labels_scale = (labels / old) * new return labels_scale
[docs]def get_frames_from_idxs(cap, idxs): """Helper function to load video segments. Parameters ---------- cap : :obj:`cv2.VideoCapture` object idxs : :obj:`array-like` frame indices into video Returns ------- obj:`array-like` returned frames of shape shape (n_frames, y_pix, x_pix) """ is_contiguous = np.sum(np.diff(idxs)) == (len(idxs) - 1) n_frames = len(idxs) for fr, i in enumerate(idxs): if fr == 0 or not is_contiguous: cap.set(1, i) ret, frame = cap.read() if ret: if fr == 0: height, width, _ = frame.shape frames = np.zeros((n_frames, 1, height, width), dtype='uint8') frames[fr, 0, :, :] = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) else: print( 'warning! reached end of video; returning blank frames for remainder of ' + 'requested indices') break return frames