Не подтверждена Коммит bbe8c952 создал по автору Durman's avatar Durman Зафиксировано автором GitHub
Просмотр файлов

Merge pull request #4068 from nortikin/vectorization_decorator

Vectorization decorator
владельцы 74bfa4b3 714a89bc
......@@ -16,14 +16,106 @@
#
# ##### END GPL LICENSE BLOCK #####
from typing import List, Tuple
import numpy as np
import bpy
from bpy.props import BoolProperty, EnumProperty
from mathutils import Matrix
from sverchok.node_tree import SverchCustomTreeNode
from sverchok.data_structure import updateNode
from sverchok.data_structure import repeat_last
from sverchok.utils.mesh_functions import join_meshes, apply_matrix, meshes_py, to_elements, repeat_meshes, \
apply_matrices, meshes_np
from sverchok.utils.mesh_functions import apply_matrix_to_vertices_py
from sverchok.utils.vectorize import vectorize, devectorize, SvVerts, SvEdges, SvPolys
from sverchok.utils.modules.matrix_utils import matrix_apply_np
def apply_matrices(
*,
vertices: SvVerts,
edges: SvEdges,
polygons: SvPolys,
matrices: List[Matrix],
implementation: str = 'Python') -> Tuple[SvVerts, SvEdges, SvPolys]:
"""several matrices can be applied to a mesh
in this case each matrix will populate geometry inside object"""
if not matrices or (vertices is None or not len(vertices)):
return vertices, edges, polygons
if implementation == 'NumPy':
vertices = np.asarray(vertices, dtype=np.float32)
_apply_matrices = matrix_apply_np if isinstance(vertices, np.ndarray) else apply_matrix_to_vertices_py
sub_vertices = []
sub_edges = [edges] * len(matrices) if edges else None
sub_polygons = [polygons] * len(matrices) if polygons else None
for matrix in matrices:
sub_vertices.append(_apply_matrices(vertices, matrix))
out_vertices, out_edges, out_polygons = join_meshes(vertices=sub_vertices, edges=sub_edges, polygons=sub_polygons)
return out_vertices, out_edges, out_polygons
def apply_matrix(
*,
vertices: SvVerts,
edges: SvEdges,
polygons: SvPolys,
matrix: Matrix,
implementation: str = 'Python') -> Tuple[SvVerts, SvEdges, SvPolys]:
"""several matrices can be applied to a mesh
in this case each matrix will populate geometry inside object"""
if not matrix or (vertices is None or not len(vertices)):
return vertices, edges, polygons
if implementation == 'NumPy':
vertices = np.asarray(vertices, dtype=np.float32)
_apply_matrices = matrix_apply_np if isinstance(vertices, np.ndarray) else apply_matrix_to_vertices_py
new_vertices = _apply_matrices(vertices, matrix)
return new_vertices, edges, polygons
def join_meshes(*, vertices: List[SvVerts], edges: List[SvEdges], polygons: List[SvPolys]):
joined_vertices = []
joined_edges = []
joined_polygons = []
if not vertices:
return joined_vertices, joined_edges, joined_polygons
else:
if isinstance(vertices[0], np.ndarray):
joined_vertices = np.concatenate(vertices)
else:
joined_vertices = [v for vs in vertices for v in vs]
if edges:
vertexes_number = 0
for i, es in enumerate(edges):
if es:
if isinstance(es, np.ndarray):
joined_edges.extend((es + vertexes_number).tolist())
else:
joined_edges.extend([(e[0] + vertexes_number, e[1] + vertexes_number) for e in es])
vertexes_number += len(vertices[i])
if polygons:
vertexes_number = 0
for i, ps in enumerate(polygons):
if ps:
if isinstance(ps, np.ndarray):
joined_polygons.extend((ps + vertexes_number).tolist())
else:
joined_polygons.extend([[i + vertexes_number for i in p] for p in ps])
vertexes_number += len(vertices[i])
return joined_vertices, joined_edges, joined_polygons
class SvMatrixApplyJoinNode(bpy.types.Node, SverchCustomTreeNode):
......@@ -78,18 +170,30 @@ class SvMatrixApplyJoinNode(bpy.types.Node, SverchCustomTreeNode):
faces = self.inputs['Faces'].sv_get(default=[], deepcopy=False)
matrices = self.inputs['Matrices'].sv_get(default=[], deepcopy=False)
object_number = max([len(vertices), len(matrices)]) if vertices else 0
meshes = (meshes_py if self.implementation == 'Python' else meshes_np)(vertices, edges, faces)
meshes = repeat_meshes(meshes, object_number)
# fixing matrices nesting level if necessary, this is for back capability, can be removed later on
if matrices:
is_flat_list = not isinstance(matrices[0], (list, tuple))
meshes = (apply_matrix if is_flat_list else apply_matrices)(meshes, repeat_last(matrices))
if is_flat_list:
_apply_matrix = vectorize(apply_matrix, match_mode='REPEAT')
out_vertices, out_edges, out_polygons = _apply_matrix(
vertices=vertices, edges=edges, polygons=faces, matrix=matrices, implementation=self.implementation)
else:
_apply_matrix = vectorize(apply_matrices, match_mode="REPEAT")
out_vertices, out_edges, out_polygons = _apply_matrix(
vertices=vertices or None, edges=edges or None, polygons=faces or None, matrices=matrices or None,
implementation=self.implementation)
else:
out_vertices, out_edges, out_polygons = vertices, edges, faces
if self.do_join:
meshes = join_meshes(meshes)
_join_mesh = devectorize(join_meshes, match_mode="REPEAT")
out_vertices, out_edges, out_polygons = _join_mesh(
vertices=out_vertices, edges=out_edges, polygons=out_polygons)
out_vertices, out_edges, out_polygons = (
[out_vertices] if out_vertices is not None and len(out_vertices) else out_vertices,
[out_edges] if out_edges is not None and len(out_edges) else out_edges,
[out_polygons] if out_polygons is not None and len(out_polygons) else out_polygons)
out_vertices, out_edges, out_polygons = to_elements(meshes)
self.outputs['Vertices'].sv_set(out_vertices)
self.outputs['Edges'].sv_set(out_edges)
self.outputs['Faces'].sv_set(out_polygons)
......
from typing import List
from itertools import product
import numpy as np
from mathutils import Matrix
from sverchok.utils.testing import SverchokTestCase
import sverchok.nodes.matrix.apply_and_join as apply_mat
from sverchok.utils.vectorize import SvVerts, SvEdges, SvPolys
class NodeCommonTest(SverchokTestCase):
def setUp(self):
self.node_funcs = [apply_mat.apply_matrices, apply_mat.apply_matrix, apply_mat.join_meshes]
self.vertices = [[-1.0, -0.5, 0.0], [0.0, -0.5, 0.0], [1.0, -0.5, 0.0], [-1.0, 0.5, 0.0], [0.0, 0.5, 0.0], [1.0, 0.5, 0.0]]
self.vertices_np = np.array(self.vertices, dtype=np.float32)
self.edges = [[0, 3], [1, 4], [2, 5], [0, 1], [1, 2], [3, 4], [4, 5]]
self.polys = [[1, 4, 3, 0], [2, 5, 4, 1]]
self.matrix = Matrix.Translation((1.0, 0., 0.))
self.args_data = {
SvVerts: self.vertices_np,
SvEdges: self.edges,
SvPolys: self.polys,
List[SvVerts]: [self.vertices_np, self.vertices_np],
List[SvEdges]: [self.edges, self.edges],
List[SvPolys]: [self.polys, self.edges],
Matrix: self.matrix,
List[Matrix]: [self.matrix, self.matrix]
}
def function_parameters_generator(self, func) -> dict:
kwargs = {key: self.args_data[an] for key, an in func.__annotations__.items() if an in self.args_data}
for mask in product([False, True], repeat=len(kwargs)):
yield {key: data if m else None for (key, data), m in zip(kwargs.items(), mask)}
def test_empty_data(self):
"""Assume that all node functions should be able to handle cases when one or more of input parameters are None
it's quite fair since user may not add connections to a node"""
for func in self.node_funcs:
for kwargs in self.function_parameters_generator(func):
with self.subTest(msg=f"Function: {func.__name__}; Module: {func.__module__}, Arguments: {kwargs}"):
func(**kwargs)
# todo add passing vertices edges and polygons parameters
class ApplyMatrixNodeTest(SverchokTestCase):
def setUp(self):
self.vertices = [[-1.0, -0.5, 0.0], [0.0, -0.5, 0.0], [1.0, -0.5, 0.0], [-1.0, 0.5, 0.0], [0.0, 0.5, 0.0], [1.0, 0.5, 0.0]]
self.polygons = [[1, 4, 3, 0], [2, 5, 4, 1]]
self.matrices = [Matrix.Translation((0, 0, 0)), Matrix.Translation((1, 0, 0))]
def test_apply_matrices(self):
res_vertices = [(-1.0, -0.5, 0.0), (0.0, -0.5, 0.0), (1.0, -0.5, 0.0), (-1.0, 0.5, 0.0), (0.0, 0.5, 0.0), (1.0, 0.5, 0.0), (0.0, -0.5, 0.0), (1.0, -0.5, 0.0), (2.0, -0.5, 0.0), (0.0, 0.5, 0.0), (1.0, 0.5, 0.0), (2.0, 0.5, 0.0)]
res_polygons = [[1, 4, 3, 0], [2, 5, 4, 1], [7, 10, 9, 6], [8, 11, 10, 7]]
vertices, edges, polygons = apply_mat.apply_matrices(vertices=self.vertices, edges=None, polygons=self.polygons, matrices=self.matrices)
self.assert_sverchok_data_equal(vertices, res_vertices, 5)
self.assert_sverchok_data_equal(polygons, res_polygons)
# todo other functions?
if __name__ == '__main__':
import unittest
unittest.main(exit=False)
from typing import Tuple, List
from sverchok.utils.testing import SverchokTestCase
from sverchok.utils.vectorize import DataWalker, walk_data, vectorize
class VectorizeTest(SverchokTestCase):
def test_parameters_matching(self):
wa = DataWalker(1)
wb = DataWalker([1, 2, 3])
walker = walk_data([wa, wb], [[]])
self.assertEqual([v for v, _ in walker], [[1, 1], [1, 2], [1, 3]])
wa = DataWalker([[1, 2], 3])
wb = DataWalker([1, [2, 3], 4])
walker = walk_data([wa, wb], [[]])
self.assertEqual([v for v, _ in walker], [[1, 1], [2, 1], [3, 2], [3, 3], [3, 4]])
wa = DataWalker([[1, 2], [3, 4, 5]])
wb = DataWalker([1, [2, 3], 4])
walker = walk_data([wa, wb], [[]])
self.assertEqual([v for v, _ in walker], [[1, 1], [2, 1], [3, 2], [4, 3], [5, 3], [3, 4], [4, 4], [5, 4]])
wa = DataWalker(1)
wb = DataWalker([1, 2, 3])
out = []
walker = walk_data([wa, wb], [out])
[l[0].append((a, b)) for (a, b), l in walker]
self.assertEqual(out, [(1, 1), (1, 2), (1, 3)])
wa = DataWalker([[1, 2], 3])
wb = DataWalker([1, [2, 3], 4])
out = []
walker = walk_data([wa, wb], [out])
[l[0].append((a, b)) for (a, b), l in walker]
self.assertEqual(out, [[(1, 1), (2, 1)], [(3, 2), (3, 3)], (3, 4)])
wa = DataWalker([[1, 2], [3, 4, 5]])
wb = DataWalker([1, [2, 3], 4])
out = []
walker = walk_data([wa, wb], [out])
[l[0].append((a, b)) for (a, b), l in walker]
self.assertEqual(out, [[(1, 1), (2, 1)], [(3, 2), (4, 3), (5, 3)], [(3, 4), (4, 4), (5, 4)]])
def test_decorator(self):
def math(*, a: float, b: float, mode='SUM'):
if mode == 'SUM':
return a + b
elif mode == 'MUL':
return a * b
a_values = [[1, 2], [3, 4, 5]]
b_values = [1, [2, 3], 4]
math1 = vectorize(math, match_mode="REPEAT")
self.assertEqual(math1(a=a_values, b=b_values, mode='SUM'), [[2, 3], [5, 7, 8], [7, 8, 9]])
a_values = 10
b_values = [1, [2, 3], 4]
math2 = vectorize(math, match_mode="REPEAT")
self.assertEqual(math2(a=a_values, b=b_values, mode='SUM'), [11, [12, 13], 14])
def some_list_statistic(*, a: list, b: list) -> Tuple[list, list]:
a_grater_b = [_a > _b for _a, _b in zip(a, b)]
a_in_b = [_a in b for _a in a]
return a_grater_b, a_in_b
a_values = [[1, 3, 7, 3, 7, 1], [[1, 2], [3, 4]]]
b_values = [[5, 7, 3, 4, 6, 7], [[2, 3], [4, 5]]]
some_list_statistic1 = vectorize(some_list_statistic, match_mode='REPEAT')
a_grater_b, a_in_b = some_list_statistic1(a=a_values, b=b_values)
self.assertEqual(a_grater_b, [[False, False, True, False, True, False], [[False, False], [False, False]]])
self.assertEqual(a_in_b, [[False, True, True, True, True, False], [[False, True], [False, True]]])
def zeros(*, length: int) -> list:
return [0 for _ in range(length)]
lengths = [4, [[3], 1], 5]
zeros1 = vectorize(zeros, match_mode='REPEAT')
self.assertEqual(zeros1(length=lengths), [[0, 0, 0, 0], [[[0, 0, 0]], [0]], [0, 0, 0, 0, 0]])
def vector(*, length) -> List[int]:
return list(range(length))
vector1 = vectorize(vector, match_mode='REPEAT')
self.assertEqual(vector1(length=lengths), [[0, 1, 2, 3], [[[0, 1, 2]], [0]], [0, 1, 2, 3, 4]])
if __name__ == '__main__':
import unittest
unittest.main(exit=False)
from functools import wraps
from typing import List, Tuple
import numpy as np
from mathutils import Matrix
from sverchok.data_structure import levels_of_list_or_np
SvVerts = List[Tuple[float, float, float]]
SvEdges = List[Tuple[int, int]]
SvPolys = List[List[int]]
def vectorize(func=None, *, match_mode="REPEAT"):
"""
If there is function which takes some values
with this decorator it's possible to call the function by passing list of values of any shape
Take care of properly annotating of decorated function
Use Tuple[] in return annotation only if you want the decorator splits the return values into different lists
++ Example ++
from sverchok.utils import vectorize
def main_node_logic(*, prop_a: List[float], prop_b: Matrix, mode_a: str) -> Tuple[list, list]:
...
return data1, data2
class MyNode:
...
def process(self):
input_a = self.inputs[0].sv_get(default=None)
input_b = self.inputs[1].sv_get(default=None)
main_node_logic = vectorize(main_node_logic, match_mode=self.match_mode)
out1, out2 = main_node_logic(input_a, input_b, mode_a = self.mode_a)
self.outputs[0].sv_set(out1)
self.outputs[1].sv_set(out2)
"""
# this condition only works when used via "@" syntax
if func is None:
return lambda f: vectorize(f, match_mode=match_mode)
@wraps(func)
def wrap(*args, **kwargs):
# it's better not to use positional arguments for backward compatibility
# in this case a function can get new arguments
if args:
raise TypeError(f'Vectorized function {func.__name__} should not have positional arguments')
walkers = []
for key, data in zip(kwargs, kwargs.values()):
if data is None or data == []:
walkers.append(EmptyDataWalker(data, key))
else:
annotation = func.__annotations__.get(key)
nesting_level = _get_nesting_level(annotation) if annotation else 0
walkers.append(DataWalker(data, output_nesting=nesting_level, mode=match_mode, data_name=key))
# this is corner case, it can't be handled via walk data iterator
if all([w.what_is_next() == DataWalker.VALUE for w in walkers]):
return func(*args, **kwargs)
out_number = _get_output_number(func)
# handle case when return value of decorated function is simple one value
if out_number == 1:
out_list = []
for match_args, result in walk_data(walkers, [out_list]):
match_args, match_kwargs = match_args[:len(args)], match_args[len(args):]
match_kwargs = {n: d for n, d in zip(kwargs, match_kwargs)}
func_out = func(*match_args, **match_kwargs)
if not is_empty_out(func_out):
result[0].append(func_out)
return out_list
# the case when return value is tuple of multiple values
else:
out_lists = [[] for _ in range(out_number)]
for match_args, result in walk_data(walkers, out_lists):
match_args, match_kwargs = match_args[:len(args)], match_args[len(args):]
match_kwargs = {n: d for n, d in zip(kwargs, match_kwargs)}
func_out = func(*match_args, **match_kwargs)
[r.append(out) for r, out in zip(result, func_out) if not is_empty_out(out)]
return out_lists
def is_empty_out(value):
if value is None:
return True
try:
return not bool(len(value))
except TypeError:
return False
return wrap
def devectorize(func=None, *, match_mode="REPEAT"):
"""It takes list of values of arbitrary shape, flatten it
and call the decorated function once with flattened data
This needs for functions (nodes) which breaks vectorization"""
# this condition only works when used via "@" syntax
if func is None:
return lambda f: vectorize(f, match_mode=match_mode)
@wraps(func)
def wrap(*args, **kwargs):
# it's better not to use positional arguments for backward compatibility
# in this case a function can get new arguments
if args:
raise TypeError(f'Vectorized function {func.__name__} should not have positional arguments')
walkers = []
for key, data in zip(kwargs, kwargs.values()):
if data is None or data == []:
walkers.append(EmptyDataWalker(data, key))
else:
annotation = func.__annotations__.get(key)
nesting_level = _get_nesting_level(annotation) if annotation else 0
walkers.append(DataWalker(data, output_nesting=nesting_level - 1, mode=match_mode, data_name=key))
flat_data = {key: [] for key in kwargs}
for match_args, _ in walk_data(walkers, []):
match_args, match_kwargs = match_args[:len(args)], match_args[len(args):]
[container.append(data) for container, data in zip(flat_data.values(), match_kwargs)]
return func(**flat_data)
return wrap
def _get_nesting_level(annotation) -> int:
"""It measures how many nested types the annotation has
simple annotations like string, float have 0 level
list without arguments gives 1 level
List[list] such thing returns 2 level"""
if not hasattr(annotation, '__origin__'):
if annotation in [list, tuple]:
return 1
elif annotation in [float, int, bool, Matrix, str]:
return 0
elif annotation.__origin__ is list:
return 1 + _get_nesting_level(annotation.__args__[0])
elif annotation.__origin__ is tuple:
# not sure how this should act if arguments of the tuple have different level of nesting
return 1 + max([_get_nesting_level(arg) for arg in annotation.__args__])
raise NotImplementedError(f'Given annotation: {annotation} is not supported yet')
def _get_output_number(function):
"""Returns number of arguments returning by given function
the function should have returning annotation with Tuple value - Tuple[list, list]"""
annotation = function.__annotations__.get('return')
if annotation:
if hasattr(annotation, '__origin__') and annotation.__origin__ == tuple:
if hasattr(annotation, '__args__'):
return len(annotation.__args__)
return 1
def _what_is_next_catch(func):
"""It's exclusively for using in DataWalker class for optimization performance"""
@wraps(func)
def what_is_next_catcher(self):
next_val_id = id(self._stack[-1])
if next_val_id not in self._catch:
# this should not conflict with float, string, integer and other values
self._catch[next_val_id] = func(self)
return self._catch[next_val_id]
return what_is_next_catcher
class DataWalker:
"""This class allows walk over a list of arbitrary shape like over a tree data structure
Input data can be a value or list
the list can include values and / or other lists
the value itself can be just a number, list of numbers, list of list of numbers etc.
values should be consistent and should not include other values
for example inside list of vertices there should be other lists of vertices or any thing else
there is no way of handling such data structure efficiently"""
# match modes
SHORT, CYCLE, REPEAT, XREF, XREF2 = "SHORT", "CYCLE", "REPEAT", "XREF", "XREF2"
# node types
VALUE, END, SUB_TREE = "VALUE", "END", "SUB_TREE"
EXIT_VALUE = type('ExitValue', (), {'__repr__': lambda s: "<ExitValue>"})()
def __init__(self, data, output_nesting=0, mode=REPEAT, data_name=None):
self.match_mode = mode
self._stack = [data]
self._output_nesting = output_nesting
self._name = data_name
self._catch = dict() # for optimization
def step_down_matching(self, match_len, match_mode):
# todo protection from little nesting
if self.what_is_next() == DataWalker.SUB_TREE:
current_node = self._stack.pop()
elif self.what_is_next() == DataWalker.VALUE:
current_node = [self._stack.pop()]
else:
raise RuntimeError(f'Step down is impossible current position is: {self._stack[-1]}')
self._stack.append(DataWalker.EXIT_VALUE)
self._stack.extend(list(reversed(self._match_values(current_node, match_len, match_mode))))
def step_up(self):
if self.what_is_next() != DataWalker.END:
raise RuntimeError(f'There are still values to read: {self._stack}')
self._stack.pop()
def pop_next_value(self):
return self._stack.pop()
# this method is used most extensively
@_what_is_next_catch
def what_is_next(self):
if self._stack[-1] is DataWalker.EXIT_VALUE:
return DataWalker.END
if isinstance(self._stack[-1], (list, tuple, np.ndarray)):
nesting = levels_of_list_or_np(self._stack[-1])
else:
nesting = 0
if nesting == self._output_nesting:
return DataWalker.VALUE
else: # todo add the case when next element has too less nested levels
return DataWalker.SUB_TREE
@property
def next_values_number(self):
try:
if self.what_is_next() == DataWalker.VALUE:
return 1
last = self._stack[-1]
return len(last)
except (IndexError, TypeError):
return 0
@property
def is_exhausted(self):
return not bool(self._stack)
@staticmethod
def _match_values(data, match_len, match_mode):
if len(data) > match_len:
return data[:match_len]
elif len(data) == match_len:
return data
else:
if match_mode == DataWalker.REPEAT:
return data + [data[-1]] * (match_len - len(data)) # todo deepcopy ??
# todo add other modes
def __repr__(self):
return f"<DataWalker {self._name if self._name else 'data'}: {self._stack}>"
class EmptyDataWalker:
"""Use this (instead of DataWalker) if a channel does not has any data
It is needed not to overcomplicate logic of DataWalker"""
def __init__(self, data=None, data_name=None):
self._data = data
self._name = data_name
def step_down_matching(self, *_, **__):
pass
def step_up(self):
pass
def pop_next_value(self):
return self._data
def what_is_next(self):
return DataWalker.VALUE
@property
def next_values_number(self):
return 0
@property
def is_exhausted(self):
return True
def __repr__(self):
return f"<EmptyDataWalker {self._name if self._name else 'data'}: {self._data}>"
class ListTreeGenerator:
"""Generates tree from nested lists with step up/down interface"""
def __init__(self, root_list):
self.data = root_list
self._stack = [root_list]
def step_down(self):
new_node = []
self._stack.append(new_node)
def step_up(self):
last_node = self._stack.pop()
if last_node and self._stack:
current_node = self._stack[-1]
current_node.append(last_node)
@property
def current_list(self):
return self._stack[-1]
def __repr__(self):
return f'<TreeGen data: {self.data}>'
def walk_data(walkers: List[DataWalker], out_list: List[list]) -> Tuple[list, List[list]]:
"""It walks over data in given walkers in proper order
match data between each other if necessary
and gives output containers where to put result of handled data"""
match_mode = DataWalker.REPEAT # todo should be determined by modes of input walkers
result_data = [ListTreeGenerator(l) for l in out_list]
# first step is always step down because walkers create extra wrapping list (for the algorithm simplicity)
max_value_len = max(w.next_values_number for w in walkers)
[w.step_down_matching(max_value_len, match_mode) for w in walkers]
while any(not w.is_exhausted for w in walkers):
if all(w.what_is_next() == DataWalker.VALUE for w in walkers):
yield [w.pop_next_value() for w in walkers], [t.current_list for t in result_data]
elif any(w.what_is_next() == DataWalker.END for w in walkers):
[w.step_up() for w in walkers]
[t.step_up() for t in result_data]
elif any(w.what_is_next() == DataWalker.SUB_TREE for w in walkers):
max_value_len = max(w.next_values_number for w in walkers)
[w.step_down_matching(max_value_len, match_mode) for w in walkers]
[t.step_down() for t in result_data]
Поддерживает Markdown
0% или .
You are about to add 0 people to the discussion. Proceed with caution.
Сначала завершите редактирование этого сообщения!
Пожалуйста, зарегистрируйтесь или чтобы прокомментировать