233 строки
9.1 KiB
Python
233 строки
9.1 KiB
Python
import os
|
|
import onnx
|
|
import unittest
|
|
import numpy as np
|
|
from numpy.testing import assert_almost_equal, assert_array_equal
|
|
from onnx import helper, onnx_pb as onnx_proto
|
|
import onnxruntime as _ort
|
|
from onnxruntime_extensions import (
|
|
onnx_op, PyCustomOpDef, make_onnx_model,
|
|
get_library_path as _get_library_path)
|
|
|
|
|
|
def _create_test_model_test():
|
|
nodes = [helper.make_node('CustomOpOne', ['input_1', 'input_2'], ['output_1'],
|
|
domain='ai.onnx.contrib'),
|
|
helper.make_node('CustomOpTwo', ['output_1'], ['output'],
|
|
domain='ai.onnx.contrib')]
|
|
|
|
input0 = helper.make_tensor_value_info(
|
|
'input_1', onnx_proto.TensorProto.FLOAT, [3, 5])
|
|
input1 = helper.make_tensor_value_info(
|
|
'input_2', onnx_proto.TensorProto.FLOAT, [3, 5])
|
|
output0 = helper.make_tensor_value_info(
|
|
'output', onnx_proto.TensorProto.INT32, [3, 5])
|
|
|
|
graph = helper.make_graph(nodes, 'test0', [input0, input1], [output0])
|
|
model = helper.make_model(
|
|
graph, opset_imports=[helper.make_operatorsetid('ai.onnx.contrib', 1)], ir_version=7)
|
|
return model
|
|
|
|
|
|
def _create_test_model():
|
|
nodes = []
|
|
nodes[0:] = [helper.make_node('Identity', ['input_1'], ['identity1'])]
|
|
nodes[1:] = [helper.make_node('PyReverseMatrix',
|
|
['identity1'], ['reversed'],
|
|
domain='ai.onnx.contrib')]
|
|
|
|
input0 = helper.make_tensor_value_info(
|
|
'input_1', onnx_proto.TensorProto.FLOAT, [None, 2])
|
|
output0 = helper.make_tensor_value_info(
|
|
'reversed', onnx_proto.TensorProto.FLOAT, [None, 2])
|
|
|
|
graph = helper.make_graph(nodes, 'test0', [input0], [output0])
|
|
model = make_onnx_model(graph)
|
|
return model
|
|
|
|
|
|
def _create_test_model_double(prefix, domain='ai.onnx.contrib'):
|
|
nodes = []
|
|
nodes[0:] = [helper.make_node('Identity', ['input_1'], ['identity1'])]
|
|
nodes[1:] = [helper.make_node('%sAddEpsilon' % prefix,
|
|
['identity1'], ['customout'],
|
|
domain=domain)]
|
|
|
|
input0 = helper.make_tensor_value_info(
|
|
'input_1', onnx_proto.TensorProto.DOUBLE, [None, None])
|
|
output0 = helper.make_tensor_value_info(
|
|
'customout', onnx_proto.TensorProto.DOUBLE, [None, None])
|
|
|
|
graph = helper.make_graph(nodes, 'test0', [input0], [output0])
|
|
model = make_onnx_model(graph)
|
|
return model
|
|
|
|
|
|
def _create_test_model_2outputs(prefix, domain='ai.onnx.contrib'):
|
|
nodes = [
|
|
helper.make_node('Identity', ['x'], ['identity1']),
|
|
helper.make_node(
|
|
'%sNegPos' % prefix, ['identity1'], ['neg', 'pos'],
|
|
domain=domain)
|
|
]
|
|
|
|
input0 = helper.make_tensor_value_info(
|
|
'x', onnx_proto.TensorProto.FLOAT, None)
|
|
output1 = helper.make_tensor_value_info(
|
|
'neg', onnx_proto.TensorProto.FLOAT, None)
|
|
output2 = helper.make_tensor_value_info(
|
|
'pos', onnx_proto.TensorProto.FLOAT, None)
|
|
|
|
graph = helper.make_graph(nodes, 'test0', [input0], [output1, output2])
|
|
model = make_onnx_model(graph)
|
|
return model
|
|
|
|
|
|
def _create_test_join():
|
|
nodes = []
|
|
nodes[0:] = [helper.make_node('Identity', ['input_1'], ['identity1'])]
|
|
nodes[1:] = [helper.make_node('PyOpJoin',
|
|
['identity1'], ['joined'],
|
|
sep=';',
|
|
domain='ai.onnx.contrib')]
|
|
|
|
input0 = helper.make_tensor_value_info(
|
|
'input_1', onnx_proto.TensorProto.STRING, [None, None])
|
|
output0 = helper.make_tensor_value_info(
|
|
'joined', onnx_proto.TensorProto.STRING, [None])
|
|
|
|
graph = helper.make_graph(nodes, 'test0', [input0], [output0])
|
|
model = make_onnx_model(graph)
|
|
return model
|
|
|
|
|
|
class TestPythonOp(unittest.TestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
@onnx_op(op_type="CustomOpOne",
|
|
inputs=[PyCustomOpDef.dt_float, PyCustomOpDef.dt_float])
|
|
def custom_one_op(x, y):
|
|
return np.add(x, y)
|
|
|
|
@onnx_op(op_type="CustomOpTwo",
|
|
outputs=[PyCustomOpDef.dt_int32])
|
|
def custom_two_op(f):
|
|
return np.round(f).astype(np.int32)
|
|
|
|
@onnx_op(op_type="PyReverseMatrix")
|
|
def reverse_matrix(x):
|
|
# The user custom op implementation here.
|
|
return np.flip(x, axis=0).astype(np.float32)
|
|
|
|
@onnx_op(op_type="PyAddEpsilon",
|
|
inputs=[PyCustomOpDef.dt_double],
|
|
outputs=[PyCustomOpDef.dt_double])
|
|
def add_epsilon(x):
|
|
# The user custom op implementation here.
|
|
return x + 1e-3
|
|
|
|
@onnx_op(op_type="PyNegPos",
|
|
inputs=[PyCustomOpDef.dt_float],
|
|
outputs=[PyCustomOpDef.dt_float, PyCustomOpDef.dt_float])
|
|
def negpos(x):
|
|
neg = x.copy()
|
|
pos = x.copy()
|
|
neg[x > 0] = 0
|
|
pos[x < 0] = 0
|
|
return neg, pos
|
|
|
|
@onnx_op(op_type="PyOpJoin",
|
|
inputs=[PyCustomOpDef.dt_string],
|
|
outputs=[PyCustomOpDef.dt_string],
|
|
attrs=['sep'])
|
|
def join(xs, **kwargs):
|
|
sep = kwargs.get('sep', '')
|
|
res = []
|
|
for x in xs:
|
|
res.append(sep.join(x))
|
|
return np.array(res, dtype=object)
|
|
|
|
def test_python_operator(self):
|
|
so = _ort.SessionOptions()
|
|
so.register_custom_ops_library(_get_library_path())
|
|
onnx_model = _create_test_model()
|
|
self.assertIn('op_type: "PyReverseMatrix"', str(onnx_model))
|
|
sess = _ort.InferenceSession(onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
|
|
input_1 = np.array(
|
|
[1, 2, 3, 4, 5, 6]).astype(np.float32).reshape([3, 2])
|
|
txout = sess.run(None, {'input_1': input_1})
|
|
assert_almost_equal(txout[0], np.array([[5., 6.], [3., 4.], [1., 2.]]))
|
|
|
|
def test_add_epsilon_python(self):
|
|
so = _ort.SessionOptions()
|
|
so.register_custom_ops_library(_get_library_path())
|
|
onnx_model = _create_test_model_double('Py')
|
|
self.assertIn('op_type: "PyAddEpsilon"', str(onnx_model))
|
|
sess = _ort.InferenceSession(onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
|
|
input_1 = np.array([[0., 1., 1.5], [7., 8., -5.5]])
|
|
txout = sess.run(None, {'input_1': input_1})
|
|
diff = txout[0] - input_1 - 1e-3
|
|
assert_almost_equal(diff, np.zeros(diff.shape))
|
|
|
|
def test_python_negpos(self):
|
|
so = _ort.SessionOptions()
|
|
so.register_custom_ops_library(_get_library_path())
|
|
onnx_model = _create_test_model_2outputs('Py')
|
|
self.assertIn('op_type: "PyNegPos"', str(onnx_model))
|
|
sess = _ort.InferenceSession(onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
|
|
x = np.array([[0., 1., 1.5], [7., 8., -5.5]]).astype(np.float32)
|
|
neg, pos = sess.run(None, {'x': x})
|
|
diff = x - (neg + pos)
|
|
assert_almost_equal(diff, np.zeros(diff.shape))
|
|
|
|
def test_cc_negpos(self):
|
|
so = _ort.SessionOptions()
|
|
so.register_custom_ops_library(_get_library_path())
|
|
onnx_model = _create_test_model_2outputs("")
|
|
self.assertIn('op_type: "NegPos"', str(onnx_model))
|
|
sess = _ort.InferenceSession(onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
|
|
x = np.array([[0., 1., 1.5], [7., 8., -5.5]]).astype(np.float32)
|
|
neg, pos = sess.run(None, {'x': x})
|
|
diff = x - (neg + pos)
|
|
assert_almost_equal(diff, np.zeros(diff.shape))
|
|
|
|
def test_check_saved_model(self):
|
|
this = os.path.dirname(__file__)
|
|
so = _ort.SessionOptions()
|
|
so.register_custom_ops_library(_get_library_path())
|
|
onnx_content = _create_test_model_test()
|
|
onnx_bytes = onnx_content.SerializeToString()
|
|
with open(os.path.join(this, 'data', 'custom_op_test.onnx'),
|
|
'rb') as f:
|
|
saved = f.read()
|
|
self.assertEqual(onnx_content, onnx.load(os.path.join(this, 'data', 'custom_op_test.onnx')))
|
|
|
|
def test_cc_operator(self):
|
|
so = _ort.SessionOptions()
|
|
so.register_custom_ops_library(_get_library_path())
|
|
onnx_content = _create_test_model_test()
|
|
self.assertIn('op_type: "CustomOpOne"', str(onnx_content))
|
|
ser = onnx_content.SerializeToString()
|
|
sess0 = _ort.InferenceSession(ser, so, providers=['CPUExecutionProvider'])
|
|
res = sess0.run(None, {
|
|
'input_1': np.random.rand(3, 5).astype(np.float32),
|
|
'input_2': np.random.rand(3, 5).astype(np.float32)})
|
|
self.assertEqual(res[0].shape, (3, 5))
|
|
|
|
def test_python_join(self):
|
|
so = _ort.SessionOptions()
|
|
so.register_custom_ops_library(_get_library_path())
|
|
onnx_model = _create_test_join()
|
|
self.assertIn('op_type: "PyOpJoin"', str(onnx_model))
|
|
sess = _ort.InferenceSession(onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
|
|
arr = np.array([["a", "b"]], dtype=object)
|
|
txout = sess.run(None, {'input_1': arr})
|
|
exp = np.array(["a;b"], dtype=object)
|
|
assert_array_equal(txout[0][0], exp[0])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|