Further updates to tracking 01 notebook

This commit is contained in:
Karine Ip 2020-07-06 17:55:28 -04:00 коммит произвёл GitHub
Родитель 965e14dbdc
Коммит 07ce0f1893
5 изменённых файлов: 706 добавлений и 212 удалений

Различия файлов скрыты, потому что одна или несколько строк слишком длинны

1
utils_cv/tracking/data.py Normal file → Executable file
Просмотреть файл

@ -10,7 +10,6 @@ class Urls:
fridge_objects_path = urljoin(base, "odFridgeObjects_FairMOT-Format.zip")
carcans_annotations_path = urljoin(base, "carcans_vott-csv-export.zip")
carcans_video_path = urljoin(base, "car_cans_8s.mp4")
@classmethod
def all(cls) -> List[str]:

Просмотреть файл

@ -2,16 +2,19 @@
# Licensed under the MIT License.
from collections import OrderedDict
import numpy as np
import os
import os.path as osp
from typing import Dict, List
import numpy as np
from torch.utils.data import DataLoader
from torchvision.transforms import transforms as T
from .bbox import TrackingBbox
from .references.fairmot.datasets.dataset.jde import JointDataset
from .opts import opts
from ..common.gpu import db_num_workers
from .opts import opts
from .references.fairmot.datasets.dataset.jde import JointDataset
class TrackingDataset:
@ -74,10 +77,10 @@ def boxes_to_mot(results: Dict[int, List[TrackingBbox]]) -> None:
[
bb.frame_id,
bb.track_id,
bb.top,
bb.left,
bb.bottom - bb.top,
bb.top,
bb.right - bb.left,
bb.bottom - bb.top,
1,
-1,
-1,

Просмотреть файл

@ -1,26 +1,26 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import argparse
from collections import OrderedDict, defaultdict
from collections import defaultdict
from copy import deepcopy
import glob
import requests
import os
import os.path as osp
import tempfile
from typing import Dict, List, Optional, Tuple
import cv2
import matplotlib.pyplot as plt
import motmetrics as mm
import numpy as np
import torch
import torch.cuda as cuda
import torch.nn as nn
from torch.utils.data import DataLoader
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import motmetrics as mm
from .bbox import TrackingBbox
from ..common.gpu import torch_device
from .dataset import TrackingDataset, boxes_to_mot
from .opts import opts
from .references.fairmot.datasets.dataset.jde import LoadImages, LoadVideo
from .references.fairmot.models.model import (
@ -32,12 +32,6 @@ from .references.fairmot.tracker.multitracker import JDETracker
from .references.fairmot.tracking_utils.evaluation import Evaluator
from .references.fairmot.trains.train_factory import train_factory
from .bbox import TrackingBbox
from .dataset import TrackingDataset, boxes_to_mot
from .opts import opts
from .plot import draw_boxes, assign_colors
from ..common.gpu import torch_device
def _get_gpu_str():
if cuda.is_available():
@ -47,79 +41,43 @@ def _get_gpu_str():
return "-1" # cpu
def write_video(results: Dict[int, List[TrackingBbox]],
input_video: str,
output_video: str) -> None:
"""
Plot the predicted tracks on the input video. Write the output to {output_path}.
Args:
results: dictionary mapping frame id to a list of predicted TrackingBboxes
input_video: path to the input video
output_video: path to write out the output video
"""
results = OrderedDict(sorted(results.items()))
# read video and initialize new tracking video
def _get_frame(input_video: str, frame_id: int):
video = cv2.VideoCapture()
video.open(input_video)
image_width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
image_height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*"MP4V")
frame_rate = int(video.get(cv2.CAP_PROP_FPS))
writer = cv2.VideoWriter(
output_video, fourcc, frame_rate, (image_width, image_height)
)
# assign bbox color per id
unique_ids = list(
set([bb.track_id for frame in results.values() for bb in frame])
)
color_map = assign_colors(unique_ids)
# create images and add to video writer, adapted from https://github.com/ZQPei/deep_sort_pytorch
frame_idx = 0
while video.grab():
_, cur_image = video.retrieve()
cur_tracks = results[frame_idx]
if len(cur_tracks) > 0:
cur_image = draw_boxes(cur_image, cur_tracks, color_map)
writer.write(cur_image)
frame_idx += 1
print(f"Output saved to {output_video}.")
video.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
_, im = video.read()
im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
return im
def savetxt_results(results: Dict[int, List[TrackingBbox]],
exp_name: str = 'results',
root_path: str = None,
result_filename: str = 'results.txt') -> str:
"""Save tracking results to txt in tmp directory or provided path.
def savetxt_results(
results: Dict[int, List[TrackingBbox]],
exp_name: str,
root_path: str,
result_filename: str,
) -> str:
"""Save tracking results to txt in provided path.
Args:
results: prediction results from predict() function, i.e. Dict[int, List[TrackingBbox]]
exp_name: subfolder for each experiment
root_path: results saved root path. Default: None
result_filename: saved prediction results txt file; End with '.txt'
root_path: root path for results saved
result_filename: saved prediction results txt file; end with '.txt'
Returns:
result_path: saved prediction results txt file path
"""
if not root_path:
with tempfile.TemporaryDirectory() as tmpdir1:
os.makedirs(osp.join(tmpdir1, exp_name))
result_path = osp.join(tmpdir1, exp_name, result_filename)
else:
result_path = osp.join(root_path, exp_name, result_filename)
# Save results in MOT format for evaluation
# Convert prediction results to mot format
bboxes_mot = boxes_to_mot(results)
# Save results
result_path = osp.join(root_path, exp_name, result_filename)
np.savetxt(result_path, bboxes_mot, delimiter=",", fmt="%s")
return result_path
def evaluate_mot(gt_root_path: str,
exp_name: str,
result_path: str) -> object:
def evaluate_mot(gt_root_path: str, exp_name: str, result_path: str) -> object:
""" eval code that calls on 'motmetrics' package in referenced FairMOT script, to produce MOT metrics on inference, given ground-truth.
Args:
gt_root_path: path of dataset containing GT annotations in MOTchallenge format (xywh)
@ -137,15 +95,14 @@ def evaluate_mot(gt_root_path: str,
return mot_accumulator
def mot_summary(accumulators: list,
exp_names: list) -> str:
def mot_summary(accumulators: list, exp_names: list) -> str:
"""Given a list of MOTAccumulators, get total summary by method in 'motmetrics', containing metrics scores
Args:
accumulators: list of MOTAccumulators
exp_names: list of experiment names (str) corresponds to MOTAccumulators
Returns:
strsummary: pandas.DataFrame output by method in 'motmetrics', containing metrics scores
strsummary: str output by method in 'motmetrics', containing metrics scores
"""
metrics = mm.metrics.motchallenge_metrics
mh = mm.metrics.create()
@ -154,7 +111,7 @@ def mot_summary(accumulators: list,
strsummary = mm.io.render_summary(
summary,
formatters=mh.formatters,
namemap=mm.io.motchallenge_metric_names
namemap=mm.io.motchallenge_metric_names,
)
return strsummary
@ -164,11 +121,11 @@ class TrackingLearner(object):
"""Tracking Learner for Multi-Object Tracking"""
def __init__(
self,
dataset: Optional[TrackingDataset] = None,
model_path: Optional[str] = None,
arch: str = "dla_34",
head_conv: int = None,
self,
dataset: Optional[TrackingDataset] = None,
model_path: Optional[str] = None,
arch: str = "dla_34",
head_conv: int = None,
) -> None:
"""
Initialize learner object.
@ -209,7 +166,7 @@ class TrackingLearner(object):
self.opt.load_model = model_path
def fit(
self, lr: float = 1e-4, lr_step: str = "20,27", num_epochs: int = 30
self, lr: float = 1e-4, lr_step: str = "20,27", num_epochs: int = 30
) -> None:
"""
The main training loop.
@ -238,10 +195,12 @@ class TrackingLearner(object):
# initialize dataloader
train_loader = self.dataset.train_dl
self.model = create_model(
self.opt.arch, self.opt.heads, self.opt.head_conv
)
self.model = load_model(self.model, opt_fit.load_model)
self.optimizer = torch.optim.Adam(self.model.parameters(), opt_fit.lr)
start_epoch = 0
self.model = create_model(self.opt.arch, self.opt.heads, self.opt.head_conv)
self.model = load_model(self.model, opt_fit.load_model)
Trainer = train_factory[opt_fit.task]
trainer = Trainer(opt_fit.opt, self.model, self.optimizer)
@ -252,7 +211,7 @@ class TrackingLearner(object):
# training loop
for epoch in range(
start_epoch + 1, start_epoch + opt_fit.num_epochs + 1
start_epoch + 1, start_epoch + opt_fit.num_epochs + 1
):
print(
"=" * 5,
@ -262,7 +221,10 @@ class TrackingLearner(object):
self.epoch = epoch
log_dict_train, _ = trainer.train(epoch, train_loader)
for k, v in log_dict_train.items():
print(f"{k}: {v}")
if k == "time":
print(f"{k}:{v} min")
else:
print(f"{k}: {v}")
if epoch in opt_fit.lr_step:
lr = opt_fit.lr * (0.1 ** (opt_fit.lr_step.index(epoch) + 1))
for param_group in optimizer.param_groups:
@ -309,7 +271,7 @@ class TrackingLearner(object):
print(f"Model saved to {path}")
def evaluate(
self, results: Dict[int, List[TrackingBbox]], gt_root_path: str
self, results: Dict[int, List[TrackingBbox]], gt_root_path: str
) -> str:
"""
@ -324,14 +286,25 @@ class TrackingLearner(object):
"""
# Implementation inspired from code found here: https://github.com/ifzhang/FairMOT/blob/master/src/track.py
result_path = savetxt_results(results, exp_name="single_vid")
result_path = savetxt_results(
results, "single_vid", gt_root_path, "results.txt"
)
# Save tracking results in tmp
mot_accumulator = evaluate_mot(gt_root_path, "single_vid", result_path)
strsummary = mot_summary([mot_accumulator], ("single_vid",))
return strsummary
def eval_mot(self, conf_thres: float, track_buffer: int, im_size: Tuple[int, int], data_root: str,
seqs: list, result_root: str, exp_name: str, run_eval: bool = True) -> str:
def eval_mot(
self,
conf_thres: float,
track_buffer: int,
im_size: Tuple[int, int],
data_root: str,
seqs: list,
result_root: str,
exp_name: str,
run_eval: bool = True,
) -> str:
"""
Call the prediction function, saves the tracking results to txt file and provides the evaluation results with motmetrics format.
Args:
@ -351,24 +324,26 @@ class TrackingLearner(object):
os.makedirs(eval_path)
accumulators = []
for seq in seqs:
im_path = osp.join(data_root, seq, 'img1')
result_filename = '{}.txt'.format(seq)
im_path = osp.join(data_root, seq, "img1")
result_filename = "{}.txt".format(seq)
result_path = osp.join(result_root, exp_name, result_filename)
with open(osp.join(data_root, seq, 'seqinfo.ini')) as seqinfo_file:
with open(osp.join(data_root, seq, "seqinfo.ini")) as seqinfo_file:
meta_info = seqinfo_file.read()
# frame_rate is set from seqinfo.ini by frameRate
frame_rate = int(
meta_info[meta_info.find('frameRate') + 10:meta_info.find('\nseqLength')])
meta_info[
meta_info.find("frameRate")
+ 10 : meta_info.find("\nseqLength")
]
)
if not osp.exists(result_path):
# Run tracking.
eval_results = self.predict(
im_path,
conf_thres,
track_buffer,
im_size,
frame_rate)
result_path = savetxt_results(eval_results, exp_name, result_root,
result_filename)
im_path, conf_thres, track_buffer, im_size, frame_rate
)
result_path = savetxt_results(
eval_results, exp_name, result_root, result_filename
)
print(f"Saved tracking results to {result_path}")
if run_eval:
# eval
@ -382,14 +357,14 @@ class TrackingLearner(object):
return strsummary
def predict(
self,
im_or_video_path: str,
conf_thres: float = 0.6,
det_thres: float = 0.3,
nms_thres: float = 0.4,
track_buffer: int = 30,
min_box_area: float = 200,
frame_rate: int = 30,
self,
im_or_video_path: str,
conf_thres: float = 0.6,
det_thres: float = 0.3,
nms_thres: float = 0.4,
track_buffer: int = 30,
min_box_area: float = 200,
frame_rate: int = 30,
) -> Dict[int, List[TrackingBbox]]:
"""
Run inference on an image or video path.
@ -440,7 +415,7 @@ class TrackingLearner(object):
vertical = tlwh[2] / tlwh[3] > 1.6
if tlwh[2] * tlwh[3] > opt_pred.min_box_area and not vertical:
bb = TrackingBbox(
tlbr[1], tlbr[0], tlbr[3], tlbr[2], frame_id, tid
tlbr[0], tlbr[1], tlbr[2], tlbr[3], frame_id, tid
)
online_bboxes.append(bb)
out[frame_id] = online_bboxes
@ -470,28 +445,28 @@ class TrackingLearner(object):
# if path is to a root directory of images
if (
osp.isdir(im_or_video_path)
and len(
list(
filter(
lambda x: osp.splitext(x)[1].lower() in im_format,
sorted(glob.glob("%s/*.*" % im_or_video_path)),
osp.isdir(im_or_video_path)
and len(
list(
filter(
lambda x: osp.splitext(x)[1].lower() in im_format,
sorted(glob.glob("%s/*.*" % im_or_video_path)),
)
)
)
)
> 0
> 0
):
return LoadImages(im_or_video_path)
# if path is to a single video file
elif (
osp.isfile(im_or_video_path)
and osp.splitext(im_or_video_path)[1] in video_format
osp.isfile(im_or_video_path)
and osp.splitext(im_or_video_path)[1] in video_format
):
return LoadVideo(im_or_video_path)
# if path is to a single image file
elif (
osp.isfile(im_or_video_path)
and osp.splitext(im_or_video_path)[1] in im_format
osp.isfile(im_or_video_path)
and osp.splitext(im_or_video_path)[1] in im_format
):
return LoadImages(im_or_video_path)
else:

Просмотреть файл

@ -1,12 +1,131 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import os.path as osp
from collections import OrderedDict
from typing import Dict, List, Tuple
import cv2
import decord
import io
import IPython.display
import numpy as np
from PIL import Image
from time import sleep
from .bbox import TrackingBbox
from .model import _get_frame
def plot_single_frame(
results: Dict[int, List[TrackingBbox]], input_video: str, frame_id: int
) -> None:
"""
Plot the bounding box and id on a wanted frame. Display as image to front end.
Args:
results: dictionary mapping frame id to a list of predicted TrackingBboxes
input_video: path to the input video
frame_id: frame_id for frame to show tracking result
"""
results = OrderedDict(sorted(results.items()))
# Assign bbox color per id
unique_ids = list(
set([bb.track_id for frame in results.values() for bb in frame])
)
color_map = assign_colors(unique_ids)
# Get frame from video
im = _get_frame(input_video, frame_id)
# Extract tracking results for wanted frame, and draw bboxes+tracking id, display frame
cur_tracks = results[frame_id]
if len(cur_tracks) > 0:
im = draw_boxes(im, cur_tracks, color_map)
im = Image.fromarray(im)
IPython.display.display(im)
def play_video(
results: Dict[int, List[TrackingBbox]], input_video: str
) -> None:
"""
Plot the predicted tracks on the input video. Displays to front-end as sequence of images stringed together in a video.
Args:
results: dictionary mapping frame id to a list of predicted TrackingBboxes
input_video: path to the input video
"""
results = OrderedDict(sorted(results.items()))
# assign bbox color per id
unique_ids = list(
set([bb.track_id for frame in results.values() for bb in frame])
)
color_map = assign_colors(unique_ids)
# read video and initialize new tracking video
video_reader = decord.VideoReader(input_video)
# set up ipython jupyter display
d_video = IPython.display.display("", display_id=1)
# Read each frame, add bbox+track id, display frame
for frame_idx in range(len(results)):
im = video_reader.next().asnumpy()
cur_tracks = results[frame_idx]
if len(cur_tracks) > 0:
cur_image = draw_boxes(im, cur_tracks, color_map)
f = io.BytesIO()
im = Image.fromarray(im)
im.save(f, "jpeg")
d_video.update(IPython.display.Image(data=f.getvalue()))
sleep(0.000001)
def write_video(
results: Dict[int, List[TrackingBbox]], input_video: str, output_video: str
) -> None:
"""
Plot the predicted tracks on the input video. Write the output to {output_path}.
Args:
results: dictionary mapping frame id to a list of predicted TrackingBboxes
input_video: path to the input video
output_video: path to write out the output video
"""
results = OrderedDict(sorted(results.items()))
# read video and initialize new tracking video
video = cv2.VideoCapture()
video.open(input_video)
im_width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
im_height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*"MP4V")
frame_rate = int(video.get(cv2.CAP_PROP_FPS))
writer = cv2.VideoWriter(
output_video, fourcc, frame_rate, (im_width, im_height)
)
# assign bbox color per id
unique_ids = list(
set([bb.track_id for frame in results.values() for bb in frame])
)
color_map = assign_colors(unique_ids)
# create images and add to video writer, adapted from https://github.com/ZQPei/deep_sort_pytorch
frame_idx = 0
while video.grab():
_, im = video.retrieve()
cur_tracks = results[frame_idx]
if len(cur_tracks) > 0:
im = draw_boxes(im, cur_tracks, color_map)
writer.write(im)
frame_idx += 1
print(f"Output saved to {output_video}.")
def draw_boxes(
@ -65,7 +184,7 @@ def assign_colors(id_list: List[int],) -> Dict[int, Tuple[int, int, int]]:
# adapted from https://github.com/ZQPei/deep_sort_pytorch
for i in id_list2:
color = [int((p * ((i + 1) ** 5 - i + 1)) % 255) for p in palette]
color = [int((p * ((i + 1) ** 4 - i + 1)) % 255) for p in palette]
color_list.append(tuple(color))
color_map = dict(zip(id_list, color_list))