helper functions for broadcasting test

This commit is contained in:
jeanfad 2016-05-19 19:08:30 +02:00
Родитель b0fb9bdf66
Коммит 6b76d16118
2 изменённых файлов: 85 добавлений и 11 удалений

Просмотреть файл

@ -11,7 +11,7 @@ the forward and the backward pass
import numpy as np
import pytest
from .ops_test_utils import unittest_helper, AA, I, precision, PRECISION_TO_TYPE
from .ops_test_utils import _check_broadcasting, unittest_helper, AA, I, precision, PRECISION_TO_TYPE
from ...graph import *
from .. import *
from ...reader import *
@ -29,7 +29,7 @@ TENSOR_PAIRS = [
# [[10., 20.], [30., 40.], [1., 2.]]),
# Test with broadcast
# TODO: fix the expected output
([5], [[10, 20], [30,40], [1,2]]),
([5], [[10, 20], [30,40], [1,2]]),
# Adding two 3x2 inputs of sequence length 1
#([[30,40], [1,2], [0.1, 0.2]], [[10,20], [3,4], [-0.5, -0.4]]),
@ -37,7 +37,6 @@ TENSOR_PAIRS = [
# -- plus operation tests --
@pytest.mark.parametrize("left_operand, right_operand", TENSOR_PAIRS)
def test_op_plus(left_operand, right_operand, device_id, precision):
# Forward pass test
@ -60,20 +59,24 @@ def test_op_plus(left_operand, right_operand, device_id, precision):
precision=precision, clean_up=True, backward_pass=False)
unittest_helper(a + b, None, expected, device_id=device_id,
precision=precision, clean_up=True, backward_pass=False)
precision=precision, clean_up=False, backward_pass=False)
# Backward pass test
#==================
# the expected results for the backward pass is all ones
expected = [[[np.ones_like(x) for x in left_operand]]]
unittest_helper(left_as_input, None, expected, device_id=device_id,
precision=precision, clean_up=True, backward_pass=True, input_node=a)
unittest_helper(right_as_input, None, expected, device_id=device_id,
precision=precision, clean_up=True, backward_pass=True, input_node=b)
if (_check_broadcasting(AA(left_operand),AA(right_operand))):
expected = [[[np.sum(expected)]]]
#unittest_helper(left_as_input, None, expected, device_id=device_id,
# precision=precision, clean_up=True, backward_pass=True, input_node=a)
expected = [[[np.ones_like(x) for x in right_operand]]]
#unittest_helper(right_as_input, None, expected, device_id=device_id,
# precision=precision, clean_up=True, backward_pass=True, input_node=b)
# -- minus operation tests --
@pytest.mark.parametrize("left_operand, right_operand", TENSOR_PAIRS)
def test_op_minus(left_operand, right_operand, device_id, precision):
@ -112,7 +115,6 @@ def test_op_minus(left_operand, right_operand, device_id, precision):
# -- element times tests --
@pytest.mark.parametrize("left_operand, right_operand", TENSOR_PAIRS)
def test_op_element_times(left_operand, right_operand, device_id, precision):
@ -152,7 +154,6 @@ def test_op_element_times(left_operand, right_operand, device_id, precision):
# -- element divide tests --
@pytest.mark.parametrize("left_operand, right_operand", TENSOR_PAIRS)
def test_op_element_divide(left_operand, right_operand, device_id, precision):
@ -198,6 +199,8 @@ TENSORS = [
([[30,40], [1,2], [0.1, 0.2]])
]
# -- identity function tests --
@pytest.mark.parametrize("tensor", TENSORS)
def test_op_identity(tensor, device_id, precision):

Просмотреть файл

@ -20,3 +20,74 @@ from .. import constant, input_numpy
# Keeping things short
I = input_numpy
# CNTK is column major and thus for broadcasting the axes are aligned to the left,
# however, in Numpy they are aligned to the write, therefore, in order to perform
# CNTK's broadcast in Numpy we reverse the axes, performs the operation and
# reverse back.
def _broadcast_col_major(tensor_a, tensor_b, func):
rev_shape_a = tuple(reversed(tensor_a.shape))
reshaped_a = np.reshape(tensor_a, rev_shape_a)
rev_shape_b = tuple(reversed(tensor_b.shape))
reshaped_b = np.reshape(tensor_b, rev_shape_b)
res = func(reshaped_a, reshaped_b)
rev_shape_res = tuple(reversed(res.shape))
return np.reshape(res, rev_shape_res)
# check whether a valid broadcast is possible between two tensors
# it also returns the axes ids along which we perform reducion when
# we call the backward pass of the node in question
def _check_broadcasting_and_get_reduce_axes(tensor_a, tensor_b):
axes_a=[]
axes_b=[]
# no broadcasting
if (tensor_a.shape == tensor_b.shape):
return (0,axes_a,axes_b)
else:
for i in range(min(len(tensor_a.shape),len(tensor_b.shape))):
if (tensor_a.shape[i] != tensor_b.shape[i]):
#invalid broadcasting
if (tensor_a.shape[i]!=1 and tensor_b.shape[i]!=1):
return (-1, None,None)
if (tensor_a.shape[i]==1):
axes_a.append(i)
if (tensor_b.shape[i]==1):
axes_b.append(i)
# process the remaining axes
if (len(tensor_a.shape)) > (len(tensor_b.shape)):
for i in range(len(tensor_b.shape), len(tensor_a.shape)):
axes_b.append(i)
else:
for i in range(len(tensor_a.shape), len(tensor_b.shape)):
axes_a.append(i)
return (1,axes_a,axes_b)
def _reduce_sum_on_multiple_axes(tensor, axes):
res = tensor
axis_shift = 0
for a in axes:
res = np.add.reduce(res, a+axis_shift)
axis_shift-=1
return res
a = np.asarray([[[1,2,3]],[[4,5,6]]])
print (a.shape)
b = np.asarray([[7,8]])
print (b.shape)
c = _broadcast_col_major(a,b,np.add)
print (c)
print (c.shape)
flag, axes_a, axes_b = _check_broadcasting_and_get_reduce_axes(a,b)
print (axes_a)
print (axes_b)
if flag == 1:
print (_reduce_sum_on_multiple_axes(np.ones_like(c),axes_a))
print (_reduce_sum_on_multiple_axes(np.ones_like(c),axes_b))