# -*- coding: utf-8 -*-
import os
import numpy as np
import pandas as pd
import skimage.io as io
from skimage.util import img_as_uint
from skimage.util import img_as_ubyte
[docs]def relabel_trackID(label_table):
"""Relabel trackID in tracking table, starting from 1.
Args:
label_table (pandas.DataFrame): tracked object table.
Returns:
pandas.DataFrame: tracked object table with relabeled trackID.
"""
dic = {}
ori = list(np.unique(label_table['trackId']))
for i in range(1, len(ori) + 1):
dic[ori[i - 1]] = i
dic[0] = 0
for i in range(label_table.shape[0]):
label_table.loc[i, 'trackId'] = dic[label_table['trackId'][i]]
label_table.loc[i, 'parentTrackId'] = dic[label_table['parentTrackId'][i]]
label_table.loc[i, 'lineageId'] = dic[label_table['lineageId'][i]]
return label_table
[docs]def label_by_track(mask, label_table):
"""Label objects in mask with track ID
Args:
mask (numpy.ndarray): uint8 np array, output from main model.
label_table (pandas.DataFrame): track table.
Returns:
numpy.ndarray: uint8/16 dtype based on track count.
"""
assert mask.shape[0] == np.max(label_table['frame'] + 1)
if np.max(label_table['trackId']) * 2 > 254:
mask = mask.astype('uint16')
for i in np.unique(label_table['frame']):
sub_table = label_table[label_table['frame'] == i]
sl = mask[i, :, :].copy()
lbs = np.unique(sl).tolist()
'''
if lbs[-1] + 1 != len(lbs):
raise ValueError('Mask is not continuously or wrongly labeled.')
'''
ori_labels = set(lbs) - {0}
untracked = list(ori_labels - set(list(sub_table['continuous_label'])))
# remove untracked
for j in untracked:
sl[mask[i, :, :] == j] = 0
# update tracked
for j in sub_table.index:
sl[mask[i, :, :] == sub_table.loc[j, 'continuous_label']] = sub_table.loc[j, 'trackId']
mask[i, :, :] = sl.copy()
return mask
[docs]def get_lineage_txt(label_table):
"""Generate txt table in Cell Tracking Challenge (CTC) format.
Args:
label_table (pandas.DataFrame): table processed, should not has gaped tracks.
Returns:
pandas.DataFrame: lineage table in .txt format that fits CTC.
"""
dic = {'id': [], 'appear': [], 'disappear': [], 'parent': []}
for i in np.unique(label_table['trackId']):
sub = label_table[label_table['trackId'] == i]
begin = np.min(sub['frame'])
end = np.max(sub['frame'])
parent = np.unique(sub['parentTrackId'])
dic['id'].append(i)
dic['appear'].append(int(begin))
dic['disappear'].append(int(end))
dic['parent'].append(int(parent))
return pd.DataFrame(dic)
[docs]def break_track(label_table):
"""Break tracks in a lineage table into single tracks, where
NO gaped tracks allowed. All gaps will be transferred into parent-daughter
relationship.
Args:
label_table (pandas.DataFrame): tracked object table to process.
Algorithm:
1. Rename raw parentTrackId to mtParTrk.
2. Initiate new parentTrackId column with 0.
3. Separate all tracks individually.
Notes:
In original lineage table, single track can be gaped, lineage only associates
mitosis tracks, not gaped tracks.
Returns:
pandas.DataFrame: processed tracked object table.
"""
# For parent track that has one daughter extrude into the parent frame,
# e.g. parent: t1-10; daughter1: t8-20; daughter2: t11-20.
# re-organize the track by trimming parent and add to daughter,
# i.e. parent: t1-7; daughter1: t8-20; daughter2: t8-10, t11-20
# If both daughter extrude, e.g. daughter2: t9-20, then trim parent directly
# to t1-8. Since this indicates faulty track, warning shown
# *** this should NOT usually happen
for l in np.unique(label_table['trackId']):
daugs = np.unique(label_table[label_table['parentTrackId'] == l]['trackId'])
if len(daugs) == 2:
daug1 = label_table[label_table['trackId'] == daugs[0]]['frame'].iloc[0]
daug2 = label_table[label_table['trackId'] == daugs[1]]['frame'].iloc[0]
par = label_table[label_table['trackId'] == l]
par_frame = par['frame'].iloc[-1]
if par_frame >= daug1 and par_frame >= daug2:
label_table.drop(par[(par['frame'] >= daug1) | (par['frame'] >= daug2)].index, inplace=True)
raise UserWarning('Faluty mitosis, check parent: ' + str(l) +
', daughters: ' + str(daugs[0]) + '/' + str(daugs[1]))
elif par_frame >= daug1:
# migrate par to daug2
label_table.loc[par[par['frame'] >= daug1].index, 'trackId'] = daugs[1]
label_table.loc[par[par['frame'] >= daug1].index, 'parentTrackId'] = l
elif par_frame >= daug2:
# migrate par to daug1
label_table.loc[par[par['frame'] >= daug2].index, 'trackId'] = daugs[0]
label_table.loc[par[par['frame'] >= daug2].index, 'parentTrackId'] = l
label_table = label_table.sort_values(by=['trackId', 'frame'])
# break tracks individually
max_trackId = np.max(label_table['trackId'])
label_table['mtParTrk'] = label_table['parentTrackId']
label_table['parentTrackId'] = 0
label_table['ori_trackId'] = label_table['trackId']
new_table = pd.DataFrame()
for l in np.unique(label_table['trackId']):
tr = label_table[label_table['trackId'] == l].copy()
if np.max(tr['frame']) - np.min(tr['frame']) + 1 != tr.shape[0]:
sep, max_trackId = separate(list(tr['frame']).copy(), list(tr['mtParTrk']).copy(), l, base=max_trackId)
tr.loc[:, 'frame'] = sep['frame']
tr.loc[:, 'trackId'] = sep['trackId']
tr.loc[:, 'parentTrackId'] = sep['parentTrackId']
tr.loc[:, 'mtParTrk'] = sep['mtParTrk']
new_table = new_table.append(tr)
# For tracks that have mitosis parents, find new ID of their parents
for l in np.unique(new_table['trackId']):
tr = new_table[new_table['trackId'] == l].copy()
ori_par = list(tr['mtParTrk'])[0]
if ori_par != 0:
app = np.min(tr['frame'])
search = new_table[new_table['ori_trackId'] == ori_par]
new_par = search.iloc[np.argmin(abs(search['frame'] - app))]['trackId']
new_table.loc[tr.index, 'mtParTrk'] = new_par
for i in range(new_table.shape[0]):
new_table.loc[i, 'parentTrackId'] = np.max(
((new_table['parentTrackId'][i]), new_table['mtParTrk'][i])) # merge mitosis information in to parent
return new_table
[docs]def separate(frame_list, mtPar_list, ori_id, base):
"""For single gaped track, separate it into all complete tracks.
Args:
frame_list (list): frames list, length equals to label table.
mtPar_list (list): mitosis parent list, for solving mitosis relationship.
ori_id (int): original track ID.
base (int): base track ID, will assign new track ID sequentially from base + 1.
Returns:
dict: Dictionary of having following keys: frame, trackId, parentTrackId, mtParTrk.
"""
trackId = [ori_id for _ in range(len(frame_list))]
parentTrackId = [0 for _ in range(len(frame_list))]
for i in range(1, len(frame_list)):
if frame_list[i] - frame_list[i - 1] != 1:
trackId[i:] = [base + 1 for s in range(i, len(trackId))]
parentTrackId[i:] = [trackId[i - 1] for s in range(i, len(trackId))]
mtPar_list[i:] = [0 for s in range(i, len(trackId))]
base += 1
rt = {'frame': frame_list, 'trackId': trackId, 'parentTrackId': parentTrackId, 'mtParTrk': mtPar_list}
return rt, base
[docs]def save_seq(stack, out_dir, prefix, dig_num=3, dtype='uint16', base=0, img_format='.tif', keep_chn=True, sep='-'):
"""Save image stack and label sequentially.
Args:
stack (numpy array) : image stack in THW format (Time, Height, Width).
out_dir (str) : output directory.
prefix (str) : prefix of single slice, output will be prefix-000x.tif/png.
(see sep below for separator).
dig_num (int) : digit number (3 -> 00x) for labeling image sequentially.
dtype (numpy.dtype) : data type to save, either 'uint8' or 'uint16'.
base (int) : base number of the label (starting from).
img_format (str): image format, '.tif' or '.png', remind the dot.
keep_chn (bool): whether to keep full channel or not.
sep (str): separator between file name and id, default '-'.
"""
if len(stack.shape) == 4 and not keep_chn:
stack = stack[:, :, :, 0]
for i in range(stack.shape[0]):
fm = ("%0" + str(dig_num) + "d") % (i + base)
name = os.path.join(out_dir, prefix + sep + fm + img_format)
if dtype == 'uint16':
img = img_as_uint(stack[i, :])
elif dtype == 'uint8':
img = img_as_ubyte(stack[i, :])
else:
raise ValueError("Seq save only accepts uint8 or uint16 format.")
io.imsave(name, img)
return
[docs]def findM(gt_cls, direction='begin'):
"""Find M exit/entry from ground truth classification.
The method assumes that all mitosis classification is continuous, therefore only suitable.
for processing classification ground truth. For processing prediction, use `pcnaDeep.refiner.deduce_transition`.
Args:
gt_cls (list): list of classifications.
direction (str): begin/end, search M from which terminal of the classification list.
Returns:
int: index of the mitosis entry/exit.
"""
# possible for parent end with 'G', but daughter must begin with 'M'
i = 0
if direction == 'begin':
if gt_cls[0] != 'M':
return None
while gt_cls[i] == 'M':
i += 1
if i == len(gt_cls):
break
return i - 1
else:
gt_cls = gt_cls[::-1]
if 'M' not in gt_cls:
return None
i = gt_cls.index('M')
while gt_cls[i] == 'M':
i += 1
if i == len(gt_cls):
break
return -i
[docs]def check_continuous_track(table):
"""Check if every track is continuous (no gap). Returns trackID list that is gaped.
"""
out = []
for i in list(np.unique(table['trackId'])):
f = table[table['trackId'] == i]['frame'].tolist()
if f[-1] - f[0] != len(f) - 1:
out.append(i)
return out