ort-customops/test/test_pyops.py

233 строки
9.1 KiB
Python

import os
import onnx
import unittest
import numpy as np
from numpy.testing import assert_almost_equal, assert_array_equal
from onnx import helper, onnx_pb as onnx_proto
import onnxruntime as _ort
from onnxruntime_extensions import (
onnx_op, PyCustomOpDef, make_onnx_model,
get_library_path as _get_library_path)
def _create_test_model_test():
nodes = [helper.make_node('CustomOpOne', ['input_1', 'input_2'], ['output_1'],
domain='ai.onnx.contrib'),
helper.make_node('CustomOpTwo', ['output_1'], ['output'],
domain='ai.onnx.contrib')]
input0 = helper.make_tensor_value_info(
'input_1', onnx_proto.TensorProto.FLOAT, [3, 5])
input1 = helper.make_tensor_value_info(
'input_2', onnx_proto.TensorProto.FLOAT, [3, 5])
output0 = helper.make_tensor_value_info(
'output', onnx_proto.TensorProto.INT32, [3, 5])
graph = helper.make_graph(nodes, 'test0', [input0, input1], [output0])
model = helper.make_model(
graph, opset_imports=[helper.make_operatorsetid('ai.onnx.contrib', 1)], ir_version=7)
return model
def _create_test_model():
nodes = []
nodes[0:] = [helper.make_node('Identity', ['input_1'], ['identity1'])]
nodes[1:] = [helper.make_node('PyReverseMatrix',
['identity1'], ['reversed'],
domain='ai.onnx.contrib')]
input0 = helper.make_tensor_value_info(
'input_1', onnx_proto.TensorProto.FLOAT, [None, 2])
output0 = helper.make_tensor_value_info(
'reversed', onnx_proto.TensorProto.FLOAT, [None, 2])
graph = helper.make_graph(nodes, 'test0', [input0], [output0])
model = make_onnx_model(graph)
return model
def _create_test_model_double(prefix, domain='ai.onnx.contrib'):
nodes = []
nodes[0:] = [helper.make_node('Identity', ['input_1'], ['identity1'])]
nodes[1:] = [helper.make_node('%sAddEpsilon' % prefix,
['identity1'], ['customout'],
domain=domain)]
input0 = helper.make_tensor_value_info(
'input_1', onnx_proto.TensorProto.DOUBLE, [None, None])
output0 = helper.make_tensor_value_info(
'customout', onnx_proto.TensorProto.DOUBLE, [None, None])
graph = helper.make_graph(nodes, 'test0', [input0], [output0])
model = make_onnx_model(graph)
return model
def _create_test_model_2outputs(prefix, domain='ai.onnx.contrib'):
nodes = [
helper.make_node('Identity', ['x'], ['identity1']),
helper.make_node(
'%sNegPos' % prefix, ['identity1'], ['neg', 'pos'],
domain=domain)
]
input0 = helper.make_tensor_value_info(
'x', onnx_proto.TensorProto.FLOAT, None)
output1 = helper.make_tensor_value_info(
'neg', onnx_proto.TensorProto.FLOAT, None)
output2 = helper.make_tensor_value_info(
'pos', onnx_proto.TensorProto.FLOAT, None)
graph = helper.make_graph(nodes, 'test0', [input0], [output1, output2])
model = make_onnx_model(graph)
return model
def _create_test_join():
nodes = []
nodes[0:] = [helper.make_node('Identity', ['input_1'], ['identity1'])]
nodes[1:] = [helper.make_node('PyOpJoin',
['identity1'], ['joined'],
sep=';',
domain='ai.onnx.contrib')]
input0 = helper.make_tensor_value_info(
'input_1', onnx_proto.TensorProto.STRING, [None, None])
output0 = helper.make_tensor_value_info(
'joined', onnx_proto.TensorProto.STRING, [None])
graph = helper.make_graph(nodes, 'test0', [input0], [output0])
model = make_onnx_model(graph)
return model
class TestPythonOp(unittest.TestCase):
@classmethod
def setUpClass(cls):
@onnx_op(op_type="CustomOpOne",
inputs=[PyCustomOpDef.dt_float, PyCustomOpDef.dt_float])
def custom_one_op(x, y):
return np.add(x, y)
@onnx_op(op_type="CustomOpTwo",
outputs=[PyCustomOpDef.dt_int32])
def custom_two_op(f):
return np.round(f).astype(np.int32)
@onnx_op(op_type="PyReverseMatrix")
def reverse_matrix(x):
# The user custom op implementation here.
return np.flip(x, axis=0).astype(np.float32)
@onnx_op(op_type="PyAddEpsilon",
inputs=[PyCustomOpDef.dt_double],
outputs=[PyCustomOpDef.dt_double])
def add_epsilon(x):
# The user custom op implementation here.
return x + 1e-3
@onnx_op(op_type="PyNegPos",
inputs=[PyCustomOpDef.dt_float],
outputs=[PyCustomOpDef.dt_float, PyCustomOpDef.dt_float])
def negpos(x):
neg = x.copy()
pos = x.copy()
neg[x > 0] = 0
pos[x < 0] = 0
return neg, pos
@onnx_op(op_type="PyOpJoin",
inputs=[PyCustomOpDef.dt_string],
outputs=[PyCustomOpDef.dt_string],
attrs=['sep'])
def join(xs, **kwargs):
sep = kwargs.get('sep', '')
res = []
for x in xs:
res.append(sep.join(x))
return np.array(res, dtype=object)
def test_python_operator(self):
so = _ort.SessionOptions()
so.register_custom_ops_library(_get_library_path())
onnx_model = _create_test_model()
self.assertIn('op_type: "PyReverseMatrix"', str(onnx_model))
sess = _ort.InferenceSession(onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
input_1 = np.array(
[1, 2, 3, 4, 5, 6]).astype(np.float32).reshape([3, 2])
txout = sess.run(None, {'input_1': input_1})
assert_almost_equal(txout[0], np.array([[5., 6.], [3., 4.], [1., 2.]]))
def test_add_epsilon_python(self):
so = _ort.SessionOptions()
so.register_custom_ops_library(_get_library_path())
onnx_model = _create_test_model_double('Py')
self.assertIn('op_type: "PyAddEpsilon"', str(onnx_model))
sess = _ort.InferenceSession(onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
input_1 = np.array([[0., 1., 1.5], [7., 8., -5.5]])
txout = sess.run(None, {'input_1': input_1})
diff = txout[0] - input_1 - 1e-3
assert_almost_equal(diff, np.zeros(diff.shape))
def test_python_negpos(self):
so = _ort.SessionOptions()
so.register_custom_ops_library(_get_library_path())
onnx_model = _create_test_model_2outputs('Py')
self.assertIn('op_type: "PyNegPos"', str(onnx_model))
sess = _ort.InferenceSession(onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
x = np.array([[0., 1., 1.5], [7., 8., -5.5]]).astype(np.float32)
neg, pos = sess.run(None, {'x': x})
diff = x - (neg + pos)
assert_almost_equal(diff, np.zeros(diff.shape))
def test_cc_negpos(self):
so = _ort.SessionOptions()
so.register_custom_ops_library(_get_library_path())
onnx_model = _create_test_model_2outputs("")
self.assertIn('op_type: "NegPos"', str(onnx_model))
sess = _ort.InferenceSession(onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
x = np.array([[0., 1., 1.5], [7., 8., -5.5]]).astype(np.float32)
neg, pos = sess.run(None, {'x': x})
diff = x - (neg + pos)
assert_almost_equal(diff, np.zeros(diff.shape))
def test_check_saved_model(self):
this = os.path.dirname(__file__)
so = _ort.SessionOptions()
so.register_custom_ops_library(_get_library_path())
onnx_content = _create_test_model_test()
onnx_bytes = onnx_content.SerializeToString()
with open(os.path.join(this, 'data', 'custom_op_test.onnx'),
'rb') as f:
saved = f.read()
self.assertEqual(onnx_content, onnx.load(os.path.join(this, 'data', 'custom_op_test.onnx')))
def test_cc_operator(self):
so = _ort.SessionOptions()
so.register_custom_ops_library(_get_library_path())
onnx_content = _create_test_model_test()
self.assertIn('op_type: "CustomOpOne"', str(onnx_content))
ser = onnx_content.SerializeToString()
sess0 = _ort.InferenceSession(ser, so, providers=['CPUExecutionProvider'])
res = sess0.run(None, {
'input_1': np.random.rand(3, 5).astype(np.float32),
'input_2': np.random.rand(3, 5).astype(np.float32)})
self.assertEqual(res[0].shape, (3, 5))
def test_python_join(self):
so = _ort.SessionOptions()
so.register_custom_ops_library(_get_library_path())
onnx_model = _create_test_join()
self.assertIn('op_type: "PyOpJoin"', str(onnx_model))
sess = _ort.InferenceSession(onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
arr = np.array([["a", "b"]], dtype=object)
txout = sess.run(None, {'input_1': arr})
exp = np.array(["a;b"], dtype=object)
assert_array_equal(txout[0][0], exp[0])
if __name__ == "__main__":
unittest.main()