# Listing metadata captured from the file viewer: 411 lines, 18 KiB, Python.
from __future__ import print_function
|
|
import sys
|
|
import time
|
|
import torch
|
|
import torch.nn as nn
|
|
import torch.nn.functional as F
|
|
import torch.optim as optim
|
|
import torch.backends.cudnn as cudnn
|
|
import numpy as np
|
|
import os
|
|
import random
|
|
import math
|
|
import shutil
|
|
import argparse
|
|
from torchvision import datasets, transforms
|
|
from torch.autograd import Variable # Useful info about autograd: http://pytorch.org/docs/master/notes/autograd.html
|
|
|
|
import dataset
|
|
from utils import *
|
|
from cfg import parse_cfg
|
|
from region_loss import RegionLoss
|
|
from darknet import Darknet
|
|
from MeshPly import MeshPly
|
|
|
|
import warnings
|
|
warnings.filterwarnings("ignore")
|
|
|
|
# Create new directory
|
|
def makedirs(path):
    """Create directory ``path``, including missing parents, if it is absent.

    The directory is created first and the outcome inspected afterwards, so
    a directory that appears between a check and the creation call (for
    example from a concurrently started run) is not treated as an error.

    Args:
        path: Directory to create.

    Raises:
        OSError: If creation fails and ``path`` is not an existing
            directory afterwards (e.g. permission denied, or a regular
            file already occupies ``path``).
    """
    try:
        os.makedirs(path)
    except OSError:
        # An already-existing directory is the only failure that is fine to
        # ignore; anything else is a real error and must surface.
        if not os.path.isdir(path):
            raise
|
|
|
|
# Adjust learning rate during training, learning schedule can be changed in network config file
|
|
def adjust_learning_rate(optimizer, batch):
    """Set the optimizer's learning rate for the given batch index.

    Starts from the module-level ``learning_rate`` and walks ``steps`` in
    order. For every step that ``batch`` has reached, the rate is multiplied
    by the matching entry of ``scales`` (or by 1 when ``scales`` has no entry
    at that position). The walk stops at the first step not yet reached, or
    right after a step that ``batch`` equals exactly.

    Args:
        optimizer: Object exposing ``param_groups``; each group's ``'lr'``
            is overwritten with the scheduled rate divided by ``batch_size``.
        batch: Number of batches processed so far.

    Returns:
        The scheduled rate before division by ``batch_size``.
    """
    current_lr = learning_rate
    for idx, boundary in enumerate(steps):
        if batch < boundary:
            # Steps are visited in order; this one has not been reached yet.
            break
        factor = scales[idx] if idx < len(scales) else 1
        current_lr = current_lr * factor
        if batch == boundary:
            break

    for group in optimizer.param_groups:
        group['lr'] = current_lr/batch_size
    return current_lr
|
|
|
|
def train(epoch):
    """Run one training epoch and return the global index of its last batch.

    A new training DataLoader is built on every call. For each batch the
    learning rate is refreshed, the forward pass and ``region_loss`` are
    evaluated, gradients are back-propagated and the optimizer is stepped.

    Module-level state touched: ``processed_batches``, ``model.seen`` and
    ``region_loss.seen`` are advanced; one entry per batch is appended to
    ``training_iters`` and ``training_losses``.

    Args:
        epoch: Index of the epoch being trained.

    Returns:
        ``epoch * ceil(len(dataset) / batch_size) + number_of_batches - 1``.
    """
    global processed_batches

    def global_iteration(local_index):
        # Position of a batch counted from the start of epoch 0.
        return epoch * math.ceil(len(train_loader.dataset) / float(batch_size) ) + local_index

    # Initialize timer
    tick_start = time.time()

    # Get the dataloader for training dataset
    to_tensor = transforms.Compose([transforms.ToTensor(),])
    train_set = dataset.listDataset(trainlist,
                                    shape=(init_width, init_height),
                                    shuffle=True,
                                    transform=to_tensor,
                                    train=True,
                                    seen=model.seen,
                                    batch_size=batch_size,
                                    num_workers=num_workers,
                                    bg_file_names=bg_file_names)
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=False, **kwargs)

    # Log the schedule state at the start of the epoch
    lr = adjust_learning_rate(optimizer, processed_batches)
    logging('epoch %d, processed %d samples, lr %f' % (epoch, epoch * len(train_loader.dataset), lr))

    # Switch the network to training mode
    model.train()

    # Per-stage timing report; permanently switched off, kept for profiling
    show_timing = False
    timing_labels = (
        ' load data : %f',
        ' cpu to cuda : %f',
        'cuda to variable : %f',
        ' zero_grad : %f',
        ' forward feature : %f',
        ' forward loss : %f',
        ' backward : %f',
        ' step : %f',
        ' total : %f',
    )
    tick_prev = time.time()
    timing_totals = torch.zeros(9)

    niter = 0
    for batch_idx, (data, target) in enumerate(train_loader):
        tick_loaded = time.time()

        # The schedule is keyed on the number of batches processed so far
        adjust_learning_rate(optimizer, processed_batches)
        processed_batches += 1

        # Only the input images are moved to the GPU here
        if use_cuda:
            data = data.cuda()
        tick_on_device = time.time()

        # Wrap tensors in Variable class for automatic differentiation
        data = Variable(data)
        target = Variable(target)
        tick_wrapped = time.time()

        # Clear gradients left over from the previous batch
        optimizer.zero_grad()
        tick_zeroed = time.time()

        # Forward pass
        output = model(data)
        tick_forward = time.time()

        samples_in_batch = data.data.size(0)
        model.seen = model.seen + samples_in_batch
        region_loss.seen = region_loss.seen + samples_in_batch

        # Compute the loss and record it for saving later on
        loss = region_loss(output, target, epoch)
        training_iters.append(global_iteration(niter))
        training_losses.append(convert2cpu(loss.data))
        niter += 1
        tick_loss = time.time()

        # Backprop: gradient of the loss with respect to model parameters
        loss.backward()
        tick_backward = time.time()

        # Update weights
        optimizer.step()
        tick_step = time.time()

        if show_timing and batch_idx > 1:
            ticks = (tick_prev, tick_loaded, tick_on_device, tick_wrapped, tick_zeroed,
                     tick_forward, tick_loss, tick_backward, tick_step)
            # Slots 0-7 hold the eight consecutive stage durations, slot 8 the total
            for slot in range(8):
                timing_totals[slot] = timing_totals[slot] + (ticks[slot + 1] - ticks[slot])
            timing_totals[8] = timing_totals[8] + (tick_step - tick_prev)
            print('-------------------------------')
            for slot, label in enumerate(timing_labels):
                print(label % (timing_totals[slot]/(batch_idx)))
        tick_prev = time.time()

    tick_prev = time.time()
    return global_iteration(niter) - 1
|
|
|
|
def test(epoch, niter):
    """Evaluate the model on ``test_loader`` and record pose-error statistics.

    For every ground-truth object the 2D keypoints of the label and of the
    prediction are scaled to pixel coordinates, a pose is recovered from each
    with ``pnp``, and corner, translation, angle, 2D-reprojection and 3D
    vertex errors are accumulated. Summary accuracies are logged, and the
    averaged errors are appended to the module-level ``testing_iters``,
    ``testing_errors_trans``, ``testing_errors_angle``,
    ``testing_errors_pixel`` and ``testing_accuracies`` lists.

    Args:
        epoch: Current epoch index (not used by the evaluation itself).
        niter: Global training iteration stored alongside the results.
    """
    def truths_length(truths):
        # The first row whose second entry is 0 is treated as the end of the
        # ground-truth list. If no such row exists, every row is a real
        # object, so the row count is returned instead of falling off the end
        # of the function (which yielded None) or indexing past the last row.
        num_rows = truths.size(0)
        for i in range(num_rows):
            if truths[i][1] == 0:
                return i
        return num_rows

    # Set the module in evaluation mode (turn off dropout, batch normalization etc.)
    model.eval()

    # Parameters
    num_classes = model.num_classes
    testtime = True
    testing_error_trans = 0.0
    testing_error_angle = 0.0
    testing_error_pixel = 0.0
    testing_samples = 0.0
    errs_2d = []
    errs_3d = []
    errs_trans = []
    errs_angle = []
    errs_corner2D = []

    logging(" Testing...")
    logging(" Number of test samples: %d" % len(test_loader.dataset))

    # Give the timers a value even if the loader yields no batch, so the
    # timing report below cannot hit an undefined name.
    t1 = t2 = t3 = t4 = t5 = time.time()

    # Iterate through test examples
    for batch_idx, (data, target) in enumerate(test_loader):
        t1 = time.time()
        # Pass the data to GPU
        if use_cuda:
            data = data.cuda()
            target = target.cuda()
        # Wrap tensors in Variable class; volatile=True requests inference mode.
        # NOTE(review): on PyTorch >= 0.4 `volatile` is reportedly ignored and
        # `torch.no_grad()` is the replacement - confirm the targeted version.
        data = Variable(data, volatile=True)
        t2 = time.time()
        # Forward pass
        output = model(data).data
        t3 = time.time()
        # Turn the raw network output into predicted keypoint coordinates
        all_boxes = get_region_boxes(output, num_classes, num_keypoints)
        t4 = time.time()

        # Only the first element of the batch is evaluated
        box_pr = all_boxes
        # One row per object: num_keypoints*2+3 values each
        truths = target[0].view(-1, num_keypoints*2+3)
        # Get how many objects are present in the scene
        num_gts = truths_length(truths)

        # Iterate through each ground-truth object
        for k in range(num_gts):
            # Keypoint coordinates sit in entries 1 .. 2*num_keypoints of a row.
            # `target` may have been moved to the GPU above, so convert each
            # entry to a plain float before handing the list to NumPy.
            box_gt = [float(truths[k][j]) for j in range(1, 2*num_keypoints+1)]

            # Denormalize the corner predictions
            corners2D_gt = np.array(np.reshape(box_gt, [num_keypoints, 2]), dtype='float32')
            corners2D_pr = np.array(np.reshape(box_pr[:num_keypoints*2], [num_keypoints, 2]), dtype='float32')
            corners2D_gt[:, 0] = corners2D_gt[:, 0] * im_width
            corners2D_gt[:, 1] = corners2D_gt[:, 1] * im_height
            corners2D_pr[:, 0] = corners2D_pr[:, 0] * im_width
            corners2D_pr[:, 1] = corners2D_pr[:, 1] * im_height

            # Compute corner prediction error
            corner_norm = np.linalg.norm(corners2D_gt - corners2D_pr, axis=1)
            corner_dist = np.mean(corner_norm)
            errs_corner2D.append(corner_dist)

            # Compute [R|t] by pnp; the 3D point set is a zero column followed by the 3D corners
            R_gt, t_gt = pnp(np.array(np.transpose(np.concatenate((np.zeros((3, 1)), corners3D[:3, :]), axis=1)), dtype='float32'), corners2D_gt, np.array(internal_calibration, dtype='float32'))
            R_pr, t_pr = pnp(np.array(np.transpose(np.concatenate((np.zeros((3, 1)), corners3D[:3, :]), axis=1)), dtype='float32'), corners2D_pr, np.array(internal_calibration, dtype='float32'))

            # Compute translation error
            trans_dist = np.sqrt(np.sum(np.square(t_gt - t_pr)))
            errs_trans.append(trans_dist)

            # Compute angle error
            angle_dist = calcAngularDistance(R_gt, R_pr)
            errs_angle.append(angle_dist)

            # Compute pixel error
            Rt_gt = np.concatenate((R_gt, t_gt), axis=1)
            Rt_pr = np.concatenate((R_pr, t_pr), axis=1)
            proj_2d_gt = compute_projection(vertices, Rt_gt, internal_calibration)
            proj_2d_pred = compute_projection(vertices, Rt_pr, internal_calibration)
            norm = np.linalg.norm(proj_2d_gt - proj_2d_pred, axis=0)
            pixel_dist = np.mean(norm)
            errs_2d.append(pixel_dist)

            # Compute 3D distances
            transform_3d_gt = compute_transformation(vertices, Rt_gt)
            transform_3d_pred = compute_transformation(vertices, Rt_pr)
            norm3d = np.linalg.norm(transform_3d_gt - transform_3d_pred, axis=0)
            vertex_dist = np.mean(norm3d)
            errs_3d.append(vertex_dist)

            # Sum errors
            testing_error_trans += trans_dist
            testing_error_angle += angle_dist
            testing_error_pixel += pixel_dist
            testing_samples += 1

        t5 = time.time()

    # Compute 2D projection, 6D pose and 5cm5degree scores
    px_threshold = 5 # 5 pixel threshold for 2D reprojection error is standard in recent sota 6D object pose estimation works
    eps = 1e-5  # keeps the divisions below defined when nothing was evaluated
    acc = len(np.where(np.array(errs_2d) <= px_threshold)[0]) * 100. / (len(errs_2d)+eps)
    acc3d = len(np.where(np.array(errs_3d) <= vx_threshold)[0]) * 100. / (len(errs_3d)+eps)
    acc5cm5deg = len(np.where((np.array(errs_trans) <= 0.05) & (np.array(errs_angle) <= 5))[0]) * 100. / (len(errs_trans)+eps)
    corner_acc = len(np.where(np.array(errs_corner2D) <= px_threshold)[0]) * 100. / (len(errs_corner2D)+eps)
    mean_corner_err_2d = np.mean(errs_corner2D)
    nts = float(testing_samples)

    # Timings refer to the last processed batch only
    if testtime:
        print('-----------------------------------')
        print(' tensor to cuda : %f' % (t2 - t1))
        print(' predict : %f' % (t3 - t2))
        print('get_region_boxes : %f' % (t4 - t3))
        print(' eval : %f' % (t5 - t4))
        print(' total : %f' % (t5 - t1))
        print('-----------------------------------')

    # Print test statistics
    logging(" Mean corner error is %f" % (mean_corner_err_2d))
    logging(' Acc using {} px 2D Projection = {:.2f}%'.format(px_threshold, acc))
    logging(' Acc using {} vx 3D Transformation = {:.2f}%'.format(vx_threshold, acc3d))
    logging(' Acc using 5 cm 5 degree metric = {:.2f}%'.format(acc5cm5deg))
    logging(' Translation error: %f, angle error: %f' % (testing_error_trans/(nts+eps), testing_error_angle/(nts+eps)) )

    # Register losses and errors for saving later on
    testing_iters.append(niter)
    testing_errors_trans.append(testing_error_trans/(nts+eps))
    testing_errors_angle.append(testing_error_angle/(nts+eps))
    testing_errors_pixel.append(testing_error_pixel/(nts+eps))
    testing_accuracies.append(acc)
|
|
|
|
if __name__ == "__main__":
    # Script entry point: read the data/network configuration, build the model,
    # loss, test loader and optimizer, then alternate training epochs with
    # periodic evaluation. Every name assigned here is a module-level global
    # that train(), test() and adjust_learning_rate() read directly, so names
    # must not be changed without updating those functions.

    # Parse command-line arguments
    parser = argparse.ArgumentParser(description='SingleShotPose')
    parser.add_argument('--datacfg', type=str, default='cfg/ape.data') # data config
    parser.add_argument('--modelcfg', type=str, default='cfg/yolo-pose.cfg') # network config
    parser.add_argument('--initweightfile', type=str, default='cfg/darknet19_448.conv.23') # imagenet initialized weights
    parser.add_argument('--pretrain_num_epochs', type=int, default=15) # how many epoch to pretrain
    args = parser.parse_args()
    datacfg = args.datacfg
    modelcfg = args.modelcfg
    initweightfile = args.initweightfile
    pretrain_num_epochs = args.pretrain_num_epochs

    # Parse configuration files
    data_options = read_data_cfg(datacfg)
    net_options = parse_cfg(modelcfg)[0]
    trainlist = data_options['train']
    testlist = data_options['valid']
    gpus = data_options['gpus']
    meshname = data_options['mesh']
    num_workers = int(data_options['num_workers'])
    backupdir = data_options['backup']
    vx_threshold = float(data_options['diam']) * 0.1 # threshold for the ADD metric
    if not os.path.exists(backupdir):
        makedirs(backupdir)
    batch_size = int(net_options['batch'])
    max_batches = int(net_options['max_batches'])
    learning_rate = float(net_options['learning_rate'])
    momentum = float(net_options['momentum'])
    decay = float(net_options['decay'])
    nsamples = file_lines(trainlist)
    nbatches = nsamples / batch_size
    # 'steps' in the config are multiplied by the batches per epoch to get batch counts
    steps = [float(step)*nbatches for step in net_options['steps'].split(',')]
    scales = [float(scale) for scale in net_options['scales'].split(',')]
    bg_file_names = get_all_files('VOCdevkit/VOC2012/JPEGImages')

    # Train parameters
    max_epochs = int(net_options['max_epochs'])
    num_keypoints = int(net_options['num_keypoints'])

    # Test parameters
    im_width = int(data_options['width'])
    im_height = int(data_options['height'])
    fx = float(data_options['fx'])
    fy = float(data_options['fy'])
    u0 = float(data_options['u0'])
    v0 = float(data_options['v0'])
    test_width = int(net_options['test_width'])
    test_height = int(net_options['test_height'])

    # Specify which gpus to use
    use_cuda = True
    seed = int(time.time())
    torch.manual_seed(seed)
    if use_cuda:
        os.environ['CUDA_VISIBLE_DEVICES'] = gpus
        torch.cuda.manual_seed(seed)

    # Specify the model and the loss.
    # The keypoint count and the number of pretraining epochs come from the
    # network config and the command line instead of being fixed at 9 and 15,
    # so the loss agrees with the values the rest of the script uses and the
    # --pretrain_num_epochs flag takes effect.
    model = Darknet(modelcfg)
    region_loss = RegionLoss(num_keypoints=num_keypoints, num_classes=1, anchors=[], num_anchors=1, pretrain_num_epochs=pretrain_num_epochs)

    # Model settings
    model.load_weights_until_last(initweightfile)
    model.print_network()
    model.seen = 0
    region_loss.iter = model.iter
    region_loss.seen = model.seen
    processed_batches = model.seen//batch_size
    init_width = model.width
    init_height = model.height
    init_epoch = model.seen//nsamples

    # Variables to save
    training_iters = []
    training_losses = []
    testing_iters = []
    testing_losses = []
    testing_errors_trans = []
    testing_errors_angle = []
    testing_errors_pixel = []
    testing_accuracies = []

    # Get the intrinsic camera matrix, mesh, vertices and corners of the model
    mesh = MeshPly(meshname)
    # Append a column of ones to the vertices, then transpose so each column is one vertex
    vertices = np.c_[np.array(mesh.vertices), np.ones((len(mesh.vertices), 1))].transpose()
    corners3D = get_3D_corners(vertices)
    internal_calibration = get_camera_intrinsic(u0, v0, fx, fy)

    # Specify the number of workers
    kwargs = {'num_workers': num_workers, 'pin_memory': True} if use_cuda else {}

    # Get the dataloader for test data
    test_loader = torch.utils.data.DataLoader(dataset.listDataset(testlist,
                                                                  shape=(test_width, test_height),
                                                                  shuffle=False,
                                                                  transform=transforms.Compose([transforms.ToTensor(),]),
                                                                  train=False),
                                              batch_size=1, shuffle=False, **kwargs)

    # Pass the model to GPU
    if use_cuda:
        model = model.cuda() # model = torch.nn.DataParallel(model, device_ids=[0]).cuda() # Multiple GPU parallelism

    # Get the optimizer
    params_dict = dict(model.named_parameters())
    params = []
    for key, value in params_dict.items():
        if key.find('.bn') >= 0 or key.find('.bias') >= 0:
            params += [{'params': [value], 'weight_decay': 0.0}]
        else:
            params += [{'params': [value], 'weight_decay': decay*batch_size}]
    # NOTE(review): the optimizer is built from model.parameters(), so the
    # per-parameter groups collected in `params` above (no weight decay for
    # '.bn' / '.bias' entries) are not used. Left unchanged because passing
    # `params` would alter training behaviour - confirm which was intended.
    optimizer = optim.SGD(model.parameters(), lr=learning_rate/batch_size, momentum=momentum, dampening=0, weight_decay=decay*batch_size)

    best_acc = -sys.maxsize
    for epoch in range(init_epoch, max_epochs):
        # TRAIN
        niter = train(epoch)
        # TEST and SAVE: only every 10th epoch, and only after epoch 15
        if (epoch % 10 == 0) and (epoch > 15):
            test(epoch, niter)
            logging('save training stats to %s/costs.npz' % (backupdir))
            # testing_errors_trans is stored as well; test() collects it next
            # to the pixel and angle errors but it was never written out.
            np.savez(os.path.join(backupdir, "costs.npz"),
                     training_iters=training_iters,
                     training_losses=training_losses,
                     testing_iters=testing_iters,
                     testing_accuracies=testing_accuracies,
                     testing_errors_pixel=testing_errors_pixel,
                     testing_errors_angle=testing_errors_angle,
                     testing_errors_trans=testing_errors_trans)
            # Keep only the weights of the best-scoring evaluation so far
            if (testing_accuracies[-1] > best_acc ):
                best_acc = testing_accuracies[-1]
                logging('best model so far!')
                logging('save weights to %s/model.weights' % (backupdir))
                model.save_weights('%s/model.weights' % (backupdir))
        # shutil.copy2('%s/model.weights' % (backupdir), '%s/model_backup.weights' % (backupdir))
|