编写自定义数组容器

NumPy 的分派机制(在numpy版本v1.16中引入)是编写与numpy API兼容并提供numpy功能的自定义实现的自定义N维数组容器的推荐方法。 应用包括 dask 数组(分布在多个节点上的N维数组) 和 cupy 数组(GPU上的N维数组)。

为了获得编写自定义数组容器的感觉,我们将从一个简单的示例开始,该示例具有相当狭窄的实用程序,但说明了所涉及的概念。

  1. >>> import numpy as np
  2. >>> class DiagonalArray:
  3. ... def __init__(self, N, value):
  4. ... self._N = N
  5. ... self._i = value
  6. ... def __repr__(self):
  7. ... return f"{self.__class__.__name__}(N={self._N}, value={self._i})"
  8. ... def __array__(self):
  9. ... return self._i * np.eye(self._N)
  10. ...

我们的自定义数组可以实例化,如下所示:

  1. >>> arr = DiagonalArray(5, 1)
  2. >>> arr
  3. DiagonalArray(N=5, value=1)

我们可以使用 numpy.arraynumpy.asarray, 转换为numpy数组,这将调用它的 __array__ 方法来获得标准 numpy.ndarray

  1. >>> np.asarray(arr)
  2. array([[1., 0., 0., 0., 0.],
  3. [0., 1., 0., 0., 0.],
  4. [0., 0., 1., 0., 0.],
  5. [0., 0., 0., 1., 0.],
  6. [0., 0., 0., 0., 1.]])

如果我们使用 numpy 函数对 arr 进行操作,numpy 将再次使用 __array__接口将其转换为数组,然后以通常的方式应用该函数。

  1. >>> np.multiply(arr, 2)
  2. array([[2., 0., 0., 0., 0.],
  3. [0., 2., 0., 0., 0.],
  4. [0., 0., 2., 0., 0.],
  5. [0., 0., 0., 2., 0.],
  6. [0., 0., 0., 0., 2.]])

注意,返回类型是标准 numpy.ndarray

  1. >>> type(arr)
  2. numpy.ndarray

我们如何通过此函数传递我们的自定义数组类型?Numpy允许类指示它希望通过交互 __array_ufunc____array_function__ 以自定义方式处理计算。 让我们一次拿一个,从 __array_ufunc__ 开始。 此方法涵盖 Universal functions (ufunc), 这是一类函数,包括例如 numpy.multiplynumpy.sin

_array_ufunc_ 获得:

  • ufunc, 一个类似 numpy.multiply 的函数
  • method,一个字符串,区分 numpy.multiply(...)。 以及numpy.multiy.outernumpy.multiy.accumate等变体。对于常见情况,numpy.multiply(...)method='__call__'
  • inputs, 可能是不同类型的混合
  • kwargs, 传递给函数的关键字参数

对于这个例子,我们将只处理方法 '__call__

  1. >>> from numbers import Number
  2. >>> class DiagonalArray:
  3. ... def __init__(self, N, value):
  4. ... self._N = N
  5. ... self._i = value
  6. ... def __repr__(self):
  7. ... return f"{self.__class__.__name__}(N={self._N}, value={self._i})"
  8. ... def __array__(self):
  9. ... return self._i * np.eye(self._N)
  10. ... def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
  11. ... if method == '__call__':
  12. ... N = None
  13. ... scalars = []
  14. ... for input in inputs:
  15. ... if isinstance(input, Number):
  16. ... scalars.append(input)
  17. ... elif isinstance(input, self.__class__):
  18. ... scalars.append(input._i)
  19. ... if N is not None:
  20. ... if N != self._N:
  21. ... raise TypeError("inconsistent sizes")
  22. ... else:
  23. ... N = self._N
  24. ... else:
  25. ... return NotImplemented
  26. ... return self.__class__(N, ufunc(*scalars, **kwargs))
  27. ... else:
  28. ... return NotImplemented
  29. ...

现在让我们的自定义数组类型通过numpy的函数。

  1. >>> arr = DiagonalArray(5, 1)
  2. >>> np.multiply(arr, 3)
  3. DiagonalArray(N=5, value=3)
  4. >>> np.add(arr, 3)
  5. DiagonalArray(N=5, value=4)
  6. >>> np.sin(arr)
  7. DiagonalArray(N=5, value=0.8414709848078965)

此时 arr + 3 不起作用。

  1. >>> arr + 3
  2. TypeError: unsupported operand type(s) for *: 'DiagonalArray' and 'int'

为了支持它,我们需要定义Python接口 __add____lt__ 等,以便调度到相应的ufunc。 我们可以通过继承mixin NDArrayOperatorsMixin 来方便地实现这一点。

  1. >>> import numpy.lib.mixins
  2. >>> class DiagonalArray(numpy.lib.mixins.NDArrayOperatorsMixin):
  3. ... def __init__(self, N, value):
  4. ... self._N = N
  5. ... self._i = value
  6. ... def __repr__(self):
  7. ... return f"{self.__class__.__name__}(N={self._N}, value={self._i})"
  8. ... def __array__(self):
  9. ... return self._i * np.eye(self._N)
  10. ... def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
  11. ... if method == '__call__':
  12. ... N = None
  13. ... scalars = []
  14. ... for input in inputs:
  15. ... if isinstance(input, Number):
  16. ... scalars.append(input)
  17. ... elif isinstance(input, self.__class__):
  18. ... scalars.append(input._i)
  19. ... if N is not None:
  20. ... if N != self._N:
  21. ... raise TypeError("inconsistent sizes")
  22. ... else:
  23. ... N = self._N
  24. ... else:
  25. ... return NotImplemented
  26. ... return self.__class__(N, ufunc(*scalars, **kwargs))
  27. ... else:
  28. ... return NotImplemented
  29. ...
  1. >>> arr = DiagonalArray(5, 1)
  2. >>> arr + 3
  3. DiagonalArray(N=5, value=4)
  4. >>> arr > 0
  5. DiagonalArray(N=5, value=True)

现在让我们来解决 __array_function__。 我们将创建将 numpy 函数映射到我们的自定义变体的 dict。

  1. >>> HANDLED_FUNCTIONS = {}
  2. >>> class DiagonalArray(numpy.lib.mixins.NDArrayOperatorsMixin):
  3. ... def __init__(self, N, value):
  4. ... self._N = N
  5. ... self._i = value
  6. ... def __repr__(self):
  7. ... return f"{self.__class__.__name__}(N={self._N}, value={self._i})"
  8. ... def __array__(self):
  9. ... return self._i * np.eye(self._N)
  10. ... def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
  11. ... if method == '__call__':
  12. ... N = None
  13. ... scalars = []
  14. ... for input in inputs:
  15. ... # In this case we accept only scalar numbers or DiagonalArrays.
  16. ... if isinstance(input, Number):
  17. ... scalars.append(input)
  18. ... elif isinstance(input, self.__class__):
  19. ... scalars.append(input._i)
  20. ... if N is not None:
  21. ... if N != self._N:
  22. ... raise TypeError("inconsistent sizes")
  23. ... else:
  24. ... N = self._N
  25. ... else:
  26. ... return NotImplemented
  27. ... return self.__class__(N, ufunc(*scalars, **kwargs))
  28. ... else:
  29. ... return NotImplemented
  30. ... def __array_function__(self, func, types, args, kwargs):
  31. ... if func not in HANDLED_FUNCTIONS:
  32. ... return NotImplemented
  33. ... # Note: this allows subclasses that don't override
  34. ... # __array_function__ to handle DiagonalArray objects.
  35. ... if not all(issubclass(t, self.__class__) for t in types):
  36. ... return NotImplemented
  37. ... return HANDLED_FUNCTIONS[func](*args, **kwargs)
  38. ...

一个便捷的模式是定义一个可用于向 HANDLED_FUNCTIONS 添加函数的装饰器 实现

  1. >>> def implements(np_function):
  2. ... "Register an __array_function__ implementation for DiagonalArray objects."
  3. ... def decorator(func):
  4. ... HANDLED_FUNCTIONS[np_function] = func
  5. ... return func
  6. ... return decorator
  7. ...

现在我们为 DiagonalArray 编写numpy函数的实现。 为了完整性,为了支持使用 arr.sum(), 添加一个调用 numpy.sum(self) 的方法 sum,对于 mean 来说也是一样的。

  1. >>> @implements(np.sum)
  2. ... def sum(a):
  3. ... "Implementation of np.sum for DiagonalArray objects"
  4. ... return arr._i * arr._N
  5. ...
  6. >>> @implements(np.mean)
  7. ... def sum(a):
  8. ... "Implementation of np.mean for DiagonalArray objects"
  9. ... return arr._i / arr._N
  10. ...
  11. >>> arr = DiagonalArray(5, 1)
  12. >>> np.sum(arr)
  13. 5
  14. >>> np.mean(arr)
  15. 0.2

如果用户尝试使用 HANDLED_FUNCTIONS 中未包含的任何numpy函数, 则numpy将引发 TypeError,表示不支持此操作。 例如,连接两个 DiagonalArrays 不会产生另一个对角线数组,因此不支持它。

  1. >>> np.concatenate([arr, arr])
  2. TypeError: no implementation found for 'numpy.concatenate' on types that implement __array_function__: [<class '__main__.DiagonalArray'>]

另外,我们的 summean 实现不接受numpy实现的可选参数。

  1. >>> np.sum(arr, axis=0)
  2. TypeError: sum() got an unexpected keyword argument 'axis'

用户总是可以选择使用 numpy.asarray 转换为普通的 numpy.asarray,并使用标准的numpy。

  1. >>> np.concatenate([np.asarray(arr), np.asarray(arr)])
  2. array([[1., 0., 0., 0., 0.],
  3. [0., 1., 0., 0., 0.],
  4. [0., 0., 1., 0., 0.],
  5. [0., 0., 0., 1., 0.],
  6. [0., 0., 0., 0., 1.],
  7. [1., 0., 0., 0., 0.],
  8. [0., 1., 0., 0., 0.],
  9. [0., 0., 1., 0., 0.],
  10. [0., 0., 0., 1., 0.],
  11. [0., 0., 0., 0., 1.]])

有关自定义数组容器的更完整工作示例,请参阅dask源代码cupy源代码

另外可以看一下 NEP 18