576 строки
23 KiB
Python
576 строки
23 KiB
Python
# coding: utf-8
|
|
import unittest
|
|
import os
|
|
import base64
|
|
import numpy as np
|
|
import onnx
|
|
from numpy.testing import assert_almost_equal, assert_equal
|
|
from onnx import helper, onnx_pb as onnx_proto
|
|
from transformers import AutoTokenizer
|
|
import onnxruntime as _ort
|
|
from onnxruntime_extensions import (
|
|
util,
|
|
onnx_op,
|
|
PyCustomOpDef,
|
|
OrtPyFunction,
|
|
make_onnx_model,
|
|
get_library_path as _get_library_path)
|
|
|
|
|
|
_is_tensorflow_available = False
|
|
try:
|
|
import tensorflow as tf
|
|
from tensorflow_text import SentencepieceTokenizer
|
|
_is_tensorflow_available = True
|
|
except ImportError:
|
|
pass
|
|
|
|
|
|
def load_piece(name):
|
|
fullname = os.path.join(
|
|
os.path.dirname(__file__), "data",
|
|
"%s_%s.txt" % (
|
|
os.path.splitext(os.path.split(__file__)[-1])[0],
|
|
name))
|
|
with open(fullname, "r") as f:
|
|
content = f.read()
|
|
t = base64.decodebytes(content.encode())
|
|
b64 = base64.b64encode(t)
|
|
return np.array(list(t), dtype=np.uint8), b64
|
|
|
|
|
|
def _create_test_model_sentencepiece(
|
|
prefix, model_b64, domain='ai.onnx.contrib'):
|
|
nodes = []
|
|
mkv = helper.make_tensor_value_info
|
|
if model_b64 is None:
|
|
nodes.append(helper.make_node(
|
|
'%sSentencepieceTokenizer' % prefix,
|
|
inputs=[
|
|
'model', # model__6
|
|
'inputs', # inputs
|
|
'nbest_size',
|
|
'alpha',
|
|
'add_bos',
|
|
'add_eos',
|
|
'reverse',
|
|
],
|
|
outputs=['out0', 'out1'],
|
|
name='SentencepieceTokenizeOpName',
|
|
domain=domain,
|
|
))
|
|
inputs = [
|
|
mkv('model', onnx_proto.TensorProto.UINT8, [None]),
|
|
mkv('inputs', onnx_proto.TensorProto.STRING, [None]),
|
|
mkv('nbest_size', onnx_proto.TensorProto.INT64, [None]),
|
|
mkv('alpha', onnx_proto.TensorProto.FLOAT, [None]),
|
|
mkv('add_bos', onnx_proto.TensorProto.BOOL, [None]),
|
|
mkv('add_eos', onnx_proto.TensorProto.BOOL, [None]),
|
|
mkv('reverse', onnx_proto.TensorProto.BOOL, [None])
|
|
]
|
|
else:
|
|
nodes.append(helper.make_node(
|
|
'%sSentencepieceTokenizer' % prefix,
|
|
inputs=[
|
|
'inputs', # inputs
|
|
'nbest_size',
|
|
'alpha',
|
|
'add_bos',
|
|
'add_eos',
|
|
'reverse',
|
|
],
|
|
outputs=['out0', 'out1'],
|
|
model=model_b64,
|
|
name='SentencepieceTokenizeOpName',
|
|
domain='ai.onnx.contrib',
|
|
))
|
|
inputs = [
|
|
mkv('inputs', onnx_proto.TensorProto.STRING, [None]),
|
|
mkv('nbest_size', onnx_proto.TensorProto.INT64, [None]),
|
|
mkv('alpha', onnx_proto.TensorProto.FLOAT, [None]),
|
|
mkv('add_bos', onnx_proto.TensorProto.BOOL, [None]),
|
|
mkv('add_eos', onnx_proto.TensorProto.BOOL, [None]),
|
|
mkv('reverse', onnx_proto.TensorProto.BOOL, [None])
|
|
]
|
|
|
|
graph = helper.make_graph(
|
|
nodes, 'test0', inputs, [
|
|
mkv('out0', onnx_proto.TensorProto.INT32, [None]),
|
|
mkv('out1', onnx_proto.TensorProto.INT64, [None])
|
|
])
|
|
model = make_onnx_model(graph)
|
|
return model
|
|
|
|
|
|
def _create_test_model_ragged_to_sparse(
|
|
prefix, model_b64, domain='ai.onnx.contrib'):
|
|
nodes = []
|
|
mkv = helper.make_tensor_value_info
|
|
if model_b64 is None:
|
|
nodes.append(helper.make_node(
|
|
'%sSentencepieceTokenizer' % prefix,
|
|
inputs=[
|
|
'model', # model__6
|
|
'inputs', # inputs
|
|
'nbest_size',
|
|
'alpha',
|
|
'add_bos',
|
|
'add_eos',
|
|
'reverse',
|
|
],
|
|
outputs=['tokout0', 'tokout1'],
|
|
name='SentencepieceTokenizeOpName',
|
|
domain=domain,
|
|
))
|
|
inputs = [
|
|
mkv('model', onnx_proto.TensorProto.UINT8, [None]),
|
|
mkv('inputs', onnx_proto.TensorProto.STRING, [None]),
|
|
mkv('nbest_size', onnx_proto.TensorProto.INT64, [None]),
|
|
mkv('alpha', onnx_proto.TensorProto.FLOAT, [None]),
|
|
mkv('add_bos', onnx_proto.TensorProto.BOOL, [None]),
|
|
mkv('add_eos', onnx_proto.TensorProto.BOOL, [None]),
|
|
mkv('reverse', onnx_proto.TensorProto.BOOL, [None])
|
|
]
|
|
|
|
nodes.append(helper.make_node(
|
|
'%sRaggedTensorToSparse' % prefix,
|
|
inputs=['tokout1', 'tokout0'],
|
|
outputs=['out0', 'out1', 'out2'],
|
|
name='RaggedTensorToSparse',
|
|
domain='ai.onnx.contrib',
|
|
))
|
|
else:
|
|
nodes.append(helper.make_node(
|
|
'%sSentencepieceTokenizer' % prefix,
|
|
inputs=[
|
|
'inputs', # inputs
|
|
'nbest_size',
|
|
'alpha',
|
|
'add_bos',
|
|
'add_eos',
|
|
'reverse',
|
|
],
|
|
outputs=['tokout0', 'tokout1'],
|
|
model=model_b64,
|
|
name='SentencepieceTokenizeOpName',
|
|
domain='ai.onnx.contrib',
|
|
))
|
|
inputs = [
|
|
mkv('inputs', onnx_proto.TensorProto.STRING, [None]),
|
|
mkv('nbest_size', onnx_proto.TensorProto.INT64, [None]),
|
|
mkv('alpha', onnx_proto.TensorProto.FLOAT, [None]),
|
|
mkv('add_bos', onnx_proto.TensorProto.BOOL, [None]),
|
|
mkv('add_eos', onnx_proto.TensorProto.BOOL, [None]),
|
|
mkv('reverse', onnx_proto.TensorProto.BOOL, [None])
|
|
]
|
|
|
|
nodes.append(helper.make_node(
|
|
'Shape', inputs=['tokout1'], outputs=['n_els']))
|
|
|
|
nodes.append(helper.make_node(
|
|
'RaggedTensorToSparse',
|
|
inputs=['tokout1'],
|
|
outputs=['out0', 'out2'],
|
|
name='RaggedTensorToSparse',
|
|
domain='ai.onnx.contrib',
|
|
))
|
|
|
|
nodes.append(helper.make_node(
|
|
'Identity', inputs=['tokout0'], outputs=['out1']))
|
|
|
|
graph = helper.make_graph(
|
|
nodes, 'test0', inputs, [
|
|
mkv('out0', onnx_proto.TensorProto.INT64, [None]),
|
|
mkv('out1', onnx_proto.TensorProto.INT32, [None]),
|
|
mkv('out2', onnx_proto.TensorProto.INT64, [None])
|
|
])
|
|
model = make_onnx_model(graph)
|
|
return model
|
|
|
|
|
|
def _create_test_model_ragged_to_dense(
|
|
prefix, model_b64, domain='ai.onnx.contrib'):
|
|
nodes = []
|
|
mkv = helper.make_tensor_value_info
|
|
nodes.append(helper.make_node(
|
|
'%sSentencepieceTokenizer' % prefix,
|
|
inputs=[
|
|
'inputs', # inputs
|
|
'nbest_size',
|
|
'alpha',
|
|
'add_bos',
|
|
'add_eos',
|
|
'reverse',
|
|
],
|
|
outputs=['tokout0', 'tokout1'],
|
|
model=model_b64,
|
|
name='SentencepieceTokenizeOpName',
|
|
domain=domain,
|
|
))
|
|
inputs = [
|
|
mkv('inputs', onnx_proto.TensorProto.STRING, [None]),
|
|
mkv('nbest_size', onnx_proto.TensorProto.INT64, [None]),
|
|
mkv('alpha', onnx_proto.TensorProto.FLOAT, [None]),
|
|
mkv('add_bos', onnx_proto.TensorProto.BOOL, [None]),
|
|
mkv('add_eos', onnx_proto.TensorProto.BOOL, [None]),
|
|
mkv('reverse', onnx_proto.TensorProto.BOOL, [None])
|
|
]
|
|
|
|
nodes.append(helper.make_node(
|
|
'Shape', inputs=['tokout1'], outputs=['n_els']))
|
|
nodes.append(helper.make_node(
|
|
'Cast', inputs=['tokout0'], outputs=['tokout064'], to=onnx_proto.TensorProto.INT64))
|
|
|
|
default_value = helper.make_tensor("default_value", onnx_proto.TensorProto.INT64, [1, ], [-1])
|
|
unused = helper.make_tensor("unused", onnx_proto.TensorProto.INT64, [0, ], [])
|
|
|
|
nodes.append(helper.make_node(
|
|
'%sRaggedTensorToDense' % prefix,
|
|
inputs=['unused', 'tokout064', 'default_value', 'tokout1'],
|
|
outputs=['out0'],
|
|
name='RaggedTensorToDense',
|
|
domain='ai.onnx.contrib',
|
|
))
|
|
|
|
nodes.append(helper.make_node(
|
|
'Identity', inputs=['tokout0'], outputs=['out1']))
|
|
|
|
graph = helper.make_graph(
|
|
nodes, 'test0', inputs, [
|
|
mkv('out0', onnx_proto.TensorProto.INT64, [None]),
|
|
mkv('out1', onnx_proto.TensorProto.INT32, [None]),
|
|
], [default_value, unused])
|
|
model = make_onnx_model(graph)
|
|
return model
|
|
|
|
def _create_test_model_sentencepiece_fairseq(
|
|
model, domain='ai.onnx.contrib'):
|
|
nodes = []
|
|
mkv = helper.make_tensor_value_info
|
|
nodes.append(helper.make_node(
|
|
'SentencepieceTokenizer',
|
|
inputs=[
|
|
'inputs', # inputs
|
|
'nbest_size',
|
|
'alpha',
|
|
'add_bos',
|
|
'add_eos',
|
|
'reverse',
|
|
'fairseq'
|
|
],
|
|
outputs=['out0', 'out1', 'out2'],
|
|
model=model,
|
|
name='SentencepieceTokenizeOpName',
|
|
domain='ai.onnx.contrib'
|
|
))
|
|
inputs = [
|
|
mkv('inputs', onnx_proto.TensorProto.STRING, [None]),
|
|
mkv('nbest_size', onnx_proto.TensorProto.INT64, [None]),
|
|
mkv('alpha', onnx_proto.TensorProto.FLOAT, [None]),
|
|
mkv('add_bos', onnx_proto.TensorProto.BOOL, [None]),
|
|
mkv('add_eos', onnx_proto.TensorProto.BOOL, [None]),
|
|
mkv('reverse', onnx_proto.TensorProto.BOOL, [None]),
|
|
mkv('fairseq', onnx_proto.TensorProto.BOOL, [None])
|
|
]
|
|
|
|
graph = helper.make_graph(
|
|
nodes, 'test0', inputs, [
|
|
mkv('out0', onnx_proto.TensorProto.INT32, [None]),
|
|
mkv('out1', onnx_proto.TensorProto.INT64, [None]),
|
|
mkv('out2', onnx_proto.TensorProto.INT32, [None])
|
|
])
|
|
model = make_onnx_model(graph)
|
|
return model
|
|
|
|
def _create_test_model_sentencepiece_fairseq_decoder(
|
|
model, domain='ai.onnx.contrib'):
|
|
nodes = []
|
|
mkv = helper.make_tensor_value_info
|
|
nodes.append(helper.make_node(
|
|
'SentencepieceDecoder',
|
|
inputs=['ids', 'fairseq'],
|
|
outputs=['str'],
|
|
model=model,
|
|
name='SentencepieceDecodeOpName',
|
|
domain='ai.onnx.contrib'
|
|
))
|
|
inputs = [
|
|
mkv('ids', onnx.TensorProto.INT64, [None]),
|
|
mkv('fairseq', onnx_proto.TensorProto.BOOL, [None])
|
|
]
|
|
|
|
graph = helper.make_graph(
|
|
nodes, 'test0', inputs, [
|
|
mkv('str', onnx_proto.TensorProto.STRING, [None])
|
|
])
|
|
model = make_onnx_model(graph)
|
|
return model
|
|
|
|
@unittest.skipIf(not _is_tensorflow_available, "tensorflow/tensorflow-text is unavailable")
|
|
class TestPythonOpSentencePiece(unittest.TestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
@onnx_op(op_type="PySentencepieceTokenizer",
|
|
inputs=[PyCustomOpDef.dt_uint8, # 0: input,
|
|
PyCustomOpDef.dt_string, # 1: input
|
|
PyCustomOpDef.dt_int64, # 2: nbest_size
|
|
PyCustomOpDef.dt_float, # 3: alpha
|
|
PyCustomOpDef.dt_bool, # 4: add_bos
|
|
PyCustomOpDef.dt_bool, # 5: add_eos
|
|
PyCustomOpDef.dt_bool], # 6: reverse
|
|
outputs=[PyCustomOpDef.dt_int32,
|
|
PyCustomOpDef.dt_int64])
|
|
def sentence_piece_tokenizer_op(model, inputs, nbest_size,
|
|
alpha, add_bos, add_eos, reverse):
|
|
"""Implements `text.SentencepieceTokenizer
|
|
<https://github.com/tensorflow/text/blob/master/docs/
|
|
api_docs/python/text/SentencepieceTokenizer.md>`_."""
|
|
# The custom op implementation.
|
|
tokenizer = SentencepieceTokenizer(
|
|
model=model.tobytes(),
|
|
reverse=reverse[0],
|
|
add_bos=add_bos[0],
|
|
add_eos=add_eos[0],
|
|
alpha=alpha[0],
|
|
nbest_size=nbest_size[0])
|
|
ragged_tensor = tokenizer.tokenize(inputs)
|
|
output_values = ragged_tensor.flat_values.numpy()
|
|
output_splits = ragged_tensor.nested_row_splits[0].numpy()
|
|
return output_values, output_splits
|
|
|
|
cls.SentencepieceTokenizer = sentence_piece_tokenizer_op
|
|
|
|
@onnx_op(op_type="PyRaggedTensorToSparse",
|
|
inputs=[PyCustomOpDef.dt_int64,
|
|
PyCustomOpDef.dt_int32],
|
|
outputs=[PyCustomOpDef.dt_int64,
|
|
PyCustomOpDef.dt_int32,
|
|
PyCustomOpDef.dt_int64])
|
|
def ragged_tensor_to_sparse(nested_splits, dense_values):
|
|
sparse_indices, sparse_values, sparse_dense_shape = \
|
|
tf.raw_ops.RaggedTensorToSparse(
|
|
rt_nested_splits=[nested_splits],
|
|
rt_dense_values=dense_values)
|
|
return (sparse_indices.numpy(),
|
|
sparse_values.numpy(),
|
|
sparse_dense_shape.numpy())
|
|
|
|
cls.RaggedTensorToSparse = ragged_tensor_to_sparse
|
|
|
|
def test_string_ragged_string_to_sparse_python(self):
|
|
so = _ort.SessionOptions()
|
|
so.register_custom_ops_library(_get_library_path())
|
|
model, model_b64 = load_piece('model__6')
|
|
onnx_model = _create_test_model_ragged_to_sparse('Py', None)
|
|
self.assertIn('op_type: "PyRaggedTensorToSparse"', str(onnx_model))
|
|
sess = _ort.InferenceSession(onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
|
|
|
|
inputs = dict(
|
|
model=model,
|
|
inputs=np.array(
|
|
["Hello world", "Hello world louder"], dtype=object),
|
|
nbest_size=np.array([0], dtype=np.int64),
|
|
alpha=np.array([0], dtype=np.float32),
|
|
add_bos=np.array([0], dtype=np.bool_),
|
|
add_eos=np.array([0], dtype=np.bool_),
|
|
reverse=np.array([0], dtype=np.bool_))
|
|
txout = sess.run(None, inputs)
|
|
temp = self.SentencepieceTokenizer(**inputs)
|
|
exp = self.RaggedTensorToSparse(temp[1], temp[0])
|
|
for i in range(0, 3):
|
|
assert_almost_equal(exp[i], txout[i])
|
|
|
|
def test_string_ragged_string_to_sparse_cc(self):
|
|
so = _ort.SessionOptions()
|
|
so.register_custom_ops_library(_get_library_path())
|
|
model, model_b64 = load_piece('model__6')
|
|
onnx_model = _create_test_model_ragged_to_sparse('', model_b64)
|
|
self.assertIn('op_type: "RaggedTensorToSparse"', str(onnx_model))
|
|
sess = _ort.InferenceSession(onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
|
|
|
|
inputs = dict(
|
|
model=model,
|
|
inputs=np.array(
|
|
["Hello world", "Hello world louder"], dtype=object),
|
|
nbest_size=np.array([0], dtype=np.int64),
|
|
alpha=np.array([0], dtype=np.float32),
|
|
add_bos=np.array([0], dtype=np.bool_),
|
|
add_eos=np.array([0], dtype=np.bool_),
|
|
reverse=np.array([0], dtype=np.bool_))
|
|
temp = self.SentencepieceTokenizer(**inputs)
|
|
exp = self.RaggedTensorToSparse(temp[1], temp[0])
|
|
del inputs['model']
|
|
txout = sess.run(None, inputs)
|
|
assert_almost_equal(exp[0], txout[0])
|
|
assert_almost_equal(exp[1], txout[1])
|
|
assert_almost_equal(exp[2], txout[2])
|
|
|
|
def test_string_ragged_string_to_dense_cc(self):
|
|
so = _ort.SessionOptions()
|
|
so.register_custom_ops_library(_get_library_path())
|
|
model, model_b64 = load_piece('model__6')
|
|
onnx_model = _create_test_model_ragged_to_dense('', model_b64)
|
|
self.assertIn('op_type: "RaggedTensorToDense"', str(onnx_model))
|
|
sess = _ort.InferenceSession(onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
|
|
|
|
inputs = dict(
|
|
model=model,
|
|
inputs=np.array(
|
|
["Hello world", "Hello world louder"], dtype=object),
|
|
nbest_size=np.array([0], dtype=np.int64),
|
|
alpha=np.array([0], dtype=np.float32),
|
|
add_bos=np.array([0], dtype=np.bool_),
|
|
add_eos=np.array([0], dtype=np.bool_),
|
|
reverse=np.array([0], dtype=np.bool_))
|
|
del inputs['model']
|
|
txout = sess.run(None, inputs)
|
|
assert_almost_equal(
|
|
txout[0], np.array([[17486, 1017, -1, -1], [17486, 1017, 155, 21869]], dtype=np.int64))
|
|
assert_almost_equal(
|
|
txout[1], np.array([17486, 1017, 17486, 1017, 155, 21869], dtype=np.int32))
|
|
|
|
def test_string_sentencepiece_tokenizer(self):
|
|
so = _ort.SessionOptions()
|
|
so.register_custom_ops_library(_get_library_path())
|
|
model, model_b64 = load_piece('model__6')
|
|
py_onnx_model = _create_test_model_sentencepiece('Py', None)
|
|
self.assertIn(
|
|
'op_type: "PySentencepieceTokenizer"', str(py_onnx_model))
|
|
cc_onnx_model = _create_test_model_sentencepiece('', model_b64)
|
|
self.assertIn('op_type: "SentencepieceTokenizer"', str(cc_onnx_model))
|
|
py_sess = _ort.InferenceSession(py_onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
|
|
cc_sess = _ort.InferenceSession(cc_onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
|
|
|
|
for alpha in [0, 0.5]:
|
|
for nbest_size in [0, 1]:
|
|
for bools in range(0, 8):
|
|
with self.subTest(
|
|
alpha=alpha, nbest_size=nbest_size, bools=bools):
|
|
inputs = dict(
|
|
model=model,
|
|
inputs=np.array(
|
|
["Hello world", "Hello world louder"],
|
|
dtype=object),
|
|
nbest_size=np.array(
|
|
[nbest_size], dtype=np.int64),
|
|
alpha=np.array([alpha], dtype=np.float32),
|
|
add_bos=np.array([bools & 1], dtype=np.bool_),
|
|
add_eos=np.array([bools & 2], dtype=np.bool_),
|
|
reverse=np.array([bools & 4], dtype=np.bool_))
|
|
exp = self.SentencepieceTokenizer(**inputs)
|
|
py_txout = py_sess.run(None, inputs)
|
|
del inputs['model']
|
|
cc_txout = cc_sess.run(None, inputs)
|
|
for i in range(0, 2):
|
|
assert_almost_equal(exp[i], py_txout[i])
|
|
assert_almost_equal(exp[i], cc_txout[i])
|
|
|
|
def test_string_sentencepiece_tokenizer_bin(self):
|
|
so = _ort.SessionOptions()
|
|
so.register_custom_ops_library(_get_library_path())
|
|
model, model_b64 = load_piece('model__6')
|
|
modelb = bytes(model)
|
|
py_onnx_model = _create_test_model_sentencepiece('Py', None)
|
|
self.assertIn(
|
|
'op_type: "PySentencepieceTokenizer"', str(py_onnx_model))
|
|
cc_onnx_model = _create_test_model_sentencepiece('', modelb)
|
|
self.assertIn('op_type: "SentencepieceTokenizer"', str(cc_onnx_model))
|
|
py_sess = _ort.InferenceSession(py_onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
|
|
cc_sess = _ort.InferenceSession(cc_onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
|
|
|
|
alpha = 0
|
|
nbest_size = 0
|
|
bools = 0
|
|
inputs = dict(
|
|
model=model,
|
|
inputs=np.array(
|
|
["Hello world", "Hello world louder"],
|
|
dtype=object),
|
|
nbest_size=np.array(
|
|
[nbest_size], dtype=np.int64),
|
|
alpha=np.array([alpha], dtype=np.float32),
|
|
add_bos=np.array([bools & 1], dtype=np.bool_),
|
|
add_eos=np.array([bools & 2], dtype=np.bool_),
|
|
reverse=np.array([bools & 4], dtype=np.bool_))
|
|
exp = self.SentencepieceTokenizer(**inputs)
|
|
py_txout = py_sess.run(None, inputs)
|
|
del inputs['model']
|
|
cc_txout = cc_sess.run(None, inputs)
|
|
for i in range(0, 2):
|
|
assert_almost_equal(exp[i], py_txout[i])
|
|
assert_almost_equal(exp[i], cc_txout[i])
|
|
|
|
def test_xlm_roberta_tokenizer(self):
|
|
so = _ort.SessionOptions()
|
|
so.register_custom_ops_library(_get_library_path())
|
|
tokenizer = AutoTokenizer.from_pretrained("xlm-roberta-base", use_fast=False)
|
|
text = "Wow, these models are getting popular."
|
|
ids = tokenizer.encode(text, return_tensors="np")
|
|
model = util.read_file(tokenizer.vocab_file, 'rb')
|
|
cc_onnx_model = _create_test_model_sentencepiece_fairseq(model)
|
|
cc_decoder_onnx_model = _create_test_model_sentencepiece_fairseq_decoder(model)
|
|
self.assertIn('op_type: "SentencepieceTokenizer"', str(cc_onnx_model))
|
|
self.assertIn('op_type: "SentencepieceDecoder"', str(cc_decoder_onnx_model))
|
|
cc_sess = _ort.InferenceSession(cc_onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
|
|
cc_decoder_sess = _ort.InferenceSession(cc_decoder_onnx_model.SerializeToString(), so, providers=['CPUExecutionProvider'])
|
|
|
|
inputs = dict(
|
|
model=model,
|
|
inputs=np.array(
|
|
[text],
|
|
dtype=object),
|
|
nbest_size=np.array(
|
|
[0], dtype=np.int64),
|
|
alpha=np.array([0], dtype=np.float32),
|
|
add_bos=np.array([True], dtype=np.bool_),
|
|
add_eos=np.array([True], dtype=np.bool_),
|
|
reverse=np.array([False], dtype=np.bool_),
|
|
fairseq=np.array([True], dtype=np.bool_))
|
|
del inputs['model']
|
|
cc_txout = cc_sess.run(None, inputs)
|
|
|
|
decoder_inputs = dict(
|
|
ids=np.array(
|
|
cc_txout[0],
|
|
dtype=np.int64),
|
|
fairseq=np.array([True], dtype=np.bool_))
|
|
cc_decoder_out = cc_decoder_sess.run(None, decoder_inputs)
|
|
|
|
assert_equal(ids[0], cc_txout[0])
|
|
assert_equal(text, cc_decoder_out[0])
|
|
assert_equal(cc_txout[2], [0, 0, 3, 4, 10, 17, 21, 29, 37, 38])
|
|
|
|
|
|
class TestOrtXSentencePiece(unittest.TestCase):
|
|
def test_external_pretrained_model(self):
|
|
fullname = util.get_test_data_file('data', 'en.wiki.bpe.vs100000.model')
|
|
ofunc = OrtPyFunction.from_customop('SentencepieceTokenizer', model=open(fullname, 'rb').read())
|
|
|
|
alpha = 0
|
|
nbest_size = 0
|
|
flags = 0
|
|
tokens, instance_indices, token_indices = ofunc(
|
|
np.array(['best hotel in bay area.']),
|
|
np.array(
|
|
[nbest_size], dtype=np.int64),
|
|
np.array([alpha], dtype=np.float32),
|
|
np.array([flags & 1], dtype=np.bool_),
|
|
np.array([flags & 2], dtype=np.bool_),
|
|
np.array([flags & 4], dtype=np.bool_),
|
|
np.array([False], dtype=np.bool_))
|
|
self.assertEqual(tokens.tolist(), [1095, 4054, 26, 2022, 755, 99935])
|
|
|
|
|
|
def test_spm_decoder(self):
|
|
fullname = util.get_test_data_file('data', 'en.wiki.bpe.vs100000.model')
|
|
ofunc = OrtPyFunction.from_customop('SentencepieceDecoder', model=open(fullname, 'rb').read())
|
|
|
|
result = ofunc(np.array([1095, 4054, 26, 2022, 755, 99935], dtype=np.int64))
|
|
self.assertEqual(' '.join(result), 'best hotel in bay area.')
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|