Distributed Training - Horovod (4)

steadycode·2022년 11월 24일
0

이전 포스팅에서 horovod class 중 하나인 _DistributedOptimizer() 를 실행할 때, 추가적으로 _register_hook() 을 실행한다는 것을 파악하고 분석을 진행했다. 너무 글이 길어져 추가 분석을 본 포스팅에서 진행한다.


def _register_hooks()

def _register_hooks(self):
        if self._groups is not None:
            p_list = []
            # Get list of parameters with grads
            for param_group in self.param_groups:
                for p in param_group['params']:
                    if p.requires_grad:
                        p_list.append(p)

            # To ensure parameter order and group formation is consistent, broadcast p_list order
            # from rank 0 and use for every worker
            p_list_names = [self._parameter_names.get(p) for p in p_list]
            p_list_names = broadcast_object(p_list_names, root_rank=0, process_set=self.process_set)
            p_list = sorted(p_list, key=lambda p: p_list_names.index(self._parameter_names.get(p)))

            # Form groups
            if isinstance(self._groups, list):
                p_groups = []
                grouped_id = set()
                p_list_ids = [id(p) for p in p_list]
                for group in self._groups:
                    p_groups.append([p for p in group if id(p) in p_list_ids])
                    for p in p_groups[-1]:
                        grouped_id.add(id(p))
                for p in p_list:
                    if id(p) not in grouped_id:
                        p_groups.append([p])
            else:
                p_groups = split_list(p_list, self._groups)

            p_groups = [tuple(p) for p in p_groups]
            for group in p_groups:
                for p in group:
                    self._p_to_group[p] = group
                self._group_counts[group] = 0

        for param_group in self.param_groups:
            for p in param_group['params']:
                if p.requires_grad:
                    self._requires_update.add(p)
                    p_tmp = p.expand_as(p)
                    grad_acc = p_tmp.grad_fn.next_functions[0][0]
                    grad_acc.register_hook(self._make_hook(p))
                    self._grad_accs.append(grad_acc)

위 과정을 쉽게 요약하면 다음과 같다.

  • p_list를 초기화
  • 파라미터중 gradient 계산이 되는 파라미터를 p_list에 append
  • p_list_names에 p_list에 존재하는 파라미터의 이름을 넣는다.
  • p_list_names 에 broadcast_object 함수의 리턴값을 넣는다.
  • p_list를 sorting 한 값을 p_list에 넣는다.

다음과 같은 궁금증이 생겼으며, 첫번째 궁금증을 해결했다.

  • self._requires_update 은 어떤 역할을 할까
  • expand_as 는 어떤 함수일까
  • grad_fn 은 무엇일까
  • grad_acc 은 무엇일까
  • self.param_groups 에서 굳이 'param' 으로 parameter 값을 저장하는 이유가 무엇일까
  • _make_hook() 은 무엇일까

expand_as

Pytorch 문서 상으로는 다음과 같이 설명되어있다.

Expand this tensor to the same size as other. self.expand_as(other) is equivalent to self.expand(other.size()).

아마 동일한 차원을 사용했으니, 동일한 tensor 를 return하지 싶다.

import torch

x = torch.randn(4, 4, requires_grad=True)
x_tmp = x.expand_as(x)
x_same = x

print(id(x), x)
print(id(x_tmp), x_tmp)
print(id(x_sname), x_same)
140455830053648 tensor([[ 0.0576,  1.8933,  1.0352, -1.1749],
        [ 0.5789, -0.4150, -2.0399, -0.3701],
        [-0.3757,  0.3018, -0.4324, -0.1325],
        [ 0.2515, -0.1606, -0.2203,  0.6161]], requires_grad=True)
140455829732304 tensor([[ 0.0576,  1.8933,  1.0352, -1.1749],
        [ 0.5789, -0.4150, -2.0399, -0.3701],
        [-0.3757,  0.3018, -0.4324, -0.1325],
        [ 0.2515, -0.1606, -0.2203,  0.6161]], grad_fn=<ExpandBackward0>)
140455830053648 tensor([[ 0.0576,  1.8933,  1.0352, -1.1749],
        [ 0.5789, -0.4150, -2.0399, -0.3701],
        [-0.3757,  0.3018, -0.4324, -0.1325],
        [ 0.2515, -0.1606, -0.2203,  0.6161]], requires_grad=True)

왜 그런 것인지는 모르겠으나, expand_as 된 텐서는 grad_fn 을 출력한다. 그리고 x 값을 그대로 넣은 텐서는 메모리 위치가 동일한 것을 확인할 수 있다. 다음의 이유로 활용하는 것 같다.

  • 다른 메모리 공간할당
  • grad_fn 초기화

grad_fn and grad_acc

궁금해서 다음의 코드로 출력을 해보았다.

import torch

x = torch.randn(4, 4, requires_grad=True)
x_tmp = x.expand_as(x)

print(type(x.grad_fn), x.grad_fn)
print(type(x_tmp.grad_fn), x_tmp.grad_fn)

x_grad_acc = x.grad_fn.next_functions[0][0]
tmp_grad_acc = x_tmp.grad_fn.next_functions[0][0]

print(x_grad_acc)
print(tmp_grad_acc
<class 'NoneType'> None
<class 'ExpandBackward0'> <ExpandBackward0 object at 0x7f56e9d58d30>
AttributeError: 'NoneType' object has no attribute 'next_functions'

에러가 나는 x_grad_acc 만 주석처리하고 출력해봤다.

<class 'NoneType'> None
<class 'ExpandBackward0'> <ExpandBackward0 object at 0x7f5c52e54d30>
<AccumulateGrad object at 0x7f5c52d54a30>

grad_accAccumulateGrad를 나타내는 객체인 것 같다. 한편, grad_fn은 gradient function을 가리키는 객체인 것 같다.

analysis of grad_fn

심층 분석을 위해 다음의 자료를 참고했다. 참고 1, 참고 2

참고 1

x = torch.randn(4, 4, requires_grad=True)
y = torch.randn(4, 4, requires_grad=True)
z = x * y
l = z.sum()
print("FP: \n")
print("X: \n",x, x.grad_fn)
print("Y: \n",y, y.grad_fn)
print("Z: \n",z, z.grad_fn)
print("L: \n",l, l.grad_fn)

dl = torch.tensor(1.)
back_sum = l.grad_fn
dz = back_sum(dl)
back_mul = back_sum.next_functions[0][0]
dx, dy = back_mul(dz)
back_x = back_mul.next_functions[0][0]
back_x(dx)
back_y = back_mul.next_functions[1][0]
back_y(dy)
FP: 

X: 
 tensor([[ 1.0804,  0.2147,  0.5602, -0.1632],
        [-0.8500, -1.5312, -1.4656,  0.4574],
        [-1.6296, -2.2056,  0.7183, -0.4765],
        [ 0.1588,  0.8230,  0.2368,  0.4921]], requires_grad=True) None
Y: 
 tensor([[-1.4711e+00, -1.3592e+00,  2.3243e-01,  1.2495e-01],
        [-1.2825e+00, -1.0158e+00,  9.2749e-01,  4.7613e-01],
        [-1.3274e+00, -4.5900e-02,  1.9388e-01,  2.3195e-01],
        [-1.9465e-01,  1.8027e-03, -6.9851e-01,  1.8149e+00]],
       requires_grad=True) None
Z: 
 tensor([[-1.5893e+00, -2.9182e-01,  1.3020e-01, -2.0395e-02],
        [ 1.0901e+00,  1.5553e+00, -1.3594e+00,  2.1777e-01],
        [ 2.1632e+00,  1.0124e-01,  1.3927e-01, -1.1053e-01],
        [-3.0919e-02,  1.4836e-03, -1.6540e-01,  8.9309e-01]],
       grad_fn=<MulBackward0>) <MulBackward0 object at 0x7f4c274b9fd0>
L: 
 tensor(2.7240, grad_fn=<SumBackward0>) <SumBackward0 object at 0x7f4c274b9fd0>

특징을 살펴보면 다음과 같다.

  • 연산이 진행되지 않은 텐서는 grad_fnNone 값을 가지고 있음.
  • 연산이 포함되어 나온 텐서는 grad_fn 이 생성됨
    - Z: mul 연산
    - L: sum 연산

Backprop 이후의 결과를 출력해보았다.

print("BP\n")
print(x, "\n", x.grad)
print(y, "\n", y.grad)
print(z, "\n", z.grad)
print(l, "\n", l.grad)
BP

tensor([[ 0.5812, -0.4268, -0.8694, -1.7837],
        [-0.1613, -0.5435, -0.3585, -0.0232],
        [ 0.2461, -1.1176,  1.8555, -0.7077],
        [-0.1033, -0.0230, -1.6810,  0.5903]], requires_grad=True) 
 tensor([[ 1.6810e+00,  7.3783e-01,  2.6994e-01,  1.2975e+00],
        [ 2.0548e-03, -1.2514e+00,  9.3488e-02, -1.8246e+00],
        [-2.2385e+00,  4.0009e-01,  8.1906e-01, -1.2315e+00],
        [ 9.2964e-01, -1.4634e+00,  1.5285e+00, -6.5143e-01]],
       grad_fn=<CopyBackwards>)
tensor([[ 1.6810e+00,  7.3783e-01,  2.6994e-01,  1.2975e+00],
        [ 2.0548e-03, -1.2514e+00,  9.3488e-02, -1.8246e+00],
        [-2.2385e+00,  4.0009e-01,  8.1906e-01, -1.2315e+00],
        [ 9.2964e-01, -1.4634e+00,  1.5285e+00, -6.5143e-01]],
       requires_grad=True) 
 tensor([[ 0.5812, -0.4268, -0.8694, -1.7837],
        [-0.1613, -0.5435, -0.3585, -0.0232],
        [ 0.2461, -1.1176,  1.8555, -0.7077],
        [-0.1033, -0.0230, -1.6810,  0.5903]], grad_fn=<CopyBackwards>)
tensor([[ 9.7707e-01, -3.1493e-01, -2.3467e-01, -2.3144e+00],
        [-3.3151e-04,  6.8017e-01, -3.3515e-02,  4.2267e-02],
        [-5.5088e-01, -4.4712e-01,  1.5197e+00,  8.7150e-01],
        [-9.6054e-02,  3.3667e-02, -2.5693e+00, -3.8454e-01]],
       grad_fn=<MulBackward0>) 
 None
tensor(-2.8213, grad_fn=<SumBackward0>) 
 None

특징을 요약하면 다음과 같다.

  • gradient는 연산이 끝나면 사라짐.
  • 연산결과가 아닌 텐서는 그 자체로는 grad_fn을 가지고 있지 않음
  • 그러나 grad에 grad_fn이 존재
  • 이러한 grad_fntorch/csrc/autograd 에 함수 형태로 저장

참고 2

하나하나 분석해보자

Autograd is reverse automatic differentiation system. Conceptually, autograd records a graph recording all of the operations that created the data as you execute operations, giving you a directed acyclic graph whose leaves are the input tensors and roots are the output tensors. By tracing this graph from roots to leaves, you can automatically compute the gradients using the chain rule.

  • autograd 는 모든 operations 를 추적하며, leaves 는 input tensor 그리고 root 는 output 으로 이루어진 directed acyclic graph를 제공함
  • root (output) 부터 leaves (input) 를 추적하여 chain rule 을 사용한 gradient 연산을 자동화함.

Internally, autograd represents this graph as a graph of Function objects (really expressions), which can be apply() ed to compute the result of evaluating the graph. When computing the forwards pass, autograd simultaneously performs the requested computations and builds up a graph representing the function that computes the gradient (the .grad_fn attribute of each torch.Tensor is an entry point into this graph). When the forwards pass is completed, we evaluate this graph in the backwards pass to compute the gradients.

  • 내부적으로 autograd 는 function object의 그래로 표현함.
  • forward pass 를 진행하면, autograd 는 요청된 연산을 수행하고 gradient를 연산하는 graph를 생성함.
  • 만약 모든 forward pass 가 수행되면 backward pass 로 이루어진 graph를 활용하여 gradient 계산.

analysis of grad_acc (accumulated grad)

다음의 참고 자료를 사용했다. 참고3

AccumulateGrad

위 참고자료에 따르면 AccumulatedGrad는 backprop 연산의 종료 조건이다. AccumulateGrad가 나오면 backprop을 종료한다고 한다.


_make_hook()

그렇다면 이제 make_hook() 을 알아보자. _register_hook()에서는 다음과 같이 make_hook()을 사용한다.

grad_acc = p_tmp.grad_fn.next_functions[0][0]
grad_acc.register_hook(self._make_hook(p))

그렇다면 여기서, register_hook() 의 역할은 무엇일까. 다음과 같다.

The hook will be called every time a gradient with respect to the Tensor is computed. The hook should have the following signature:

보통은 tensor에 hook을 등록하는데, 위 함수에서는 grad_acc 에 직접 거는 것 같다. 일단 함수에 걸어도 된다고 이해하고 넘어가자.

def _make_hook(self, p):
        def hook(*ignore):
            if p in self._handles and self._handles[p][0] is not None:
                if self._allreduce_delay[p] <= 0:
                    raise AssertionError(
                        "Gradients were computed more than "
                        "backward_passes_per_step times before call "
                        "to step(). Increase backward_passes_per_step to "
                        "accumulate gradients locally.")
            assert not p.grad.requires_grad
            assert self._allreduce_delay[p] > 0
            handle, ctx = None, None
            self._allreduce_delay[p] -= 1
            if self._allreduce_delay[p] == 0:
                if self._groups is not None:
                    group = self._p_to_group[p]
                    self._group_counts[group] += 1
                    if self._group_counts[group] == len(group):
                        handle, ctxs = self._grouped_allreduce_grad_async(group)
                        self._handles[group] = (handle, ctxs)
                        # Remove any None entries from previous no-op hook calls
                        for gp in group:
                            self._handles.pop(gp, None)
                        self._group_counts[group] = 0
                        return
                else:
                    handle, ctx = self._allreduce_grad_async(p)
            self._handles[p] = (handle, ctx)
        return hook

간단하게 요약하면 다음과 같다.

  • self._handles 그리고 self._handles[p][0]가 None이 아니면 gradient가 이미 계산되었다는 의미이다.
  • 위 두 값을 None으로 초기화해놓고 self._allreduce_grad_async(p) 를 진행한다.
  • 결과값을 self._handles[p]tuple형태로 저장한다.

self._handles

위 값은 __init__() 에 dict 형태로 정의되어있다.

self._handles = {}

2차원으로 정의를 할 수 있는건 tuple 형태이기 때문이다.

x = torch.randn(4, 4, requires_grad=True)

handles = {}
handles[x] = ("handles","ctx")
print(type(handles[x]))
print(handles[x][1])
print(handles[x][0])
<class 'tuple'>
ctx
handles

_allreduce_grad_async()

그렇다면 hook()이 호출되었을 때 실행되는 self._allreduce_grad_async()을 살펴보자

def _allreduce_grad_async(self, p):
        if p.grad is None:
            # Gradient was not computed, but we still need to submit a tensor to allreduce
            # as one of the other ranks may have computed it (due to dynamic forward functions).
            #
            # NOTE: this will not work if the gradient is sparse and we perform an allgather.
            # Unfrotunately, there doesn't appear to be a good way to detect that the parameter will
            # produce sparse gradients before computing the gradient.
            p.grad = p.data.new(p.size()).zero_()

        name = self._parameter_names.get(p)
        tensor = p.grad

        if p.grad.is_sparse:
            if self.sparse_as_dense:
                tensor = tensor.to_dense()
            else:
                return self._sparse_allreduce_grad_async(p, name)

        tensor_compressed, ctx = self._compression.compress(tensor)

        if self.op == Average:
            # Split average operation across pre/postscale factors
            # C++ backend will apply additional 1 / size() factor to postscale_factor for op == Average.
            prescale_factor = 1.0 / self.gradient_predivide_factor
            postscale_factor = self.gradient_predivide_factor
        else:
            prescale_factor = 1.0
            postscale_factor = 1.0

        handle = allreduce_async_(tensor_compressed, name=name, op=self.op,
                                  prescale_factor=prescale_factor,
                                  postscale_factor=postscale_factor,
                                  process_set=self.process_set)
        return handle, ctx

간단히 요약하면 다음과 같다.

  • p.grad가 None 이더라도 0을 채워서 보냄
  • handleallreduce_async_()를 넣음
  • ctx 는 compression 값

horovod/torch/mpi_ops/allreduce_async

위 디렉토리에 함수가 정의되어 있다. 잡다한 것은 제외하고 핵심만 보자.

def allreduce_async_(tensor, average=None, name=None, op=None,
                     prescale_factor=1.0, postscale_factor=1.0,
                     process_set=global_process_set):
    """
    A function that performs asynchronous in-place averaging or summation of the input
    tensor over all the Horovod processes.

    The reduction operation is keyed by the name. If name is not provided, an incremented
    auto-generated name is used. The tensor type and shape must be the same on all
    Horovod processes for a given name. The reduction will not start until all processes
    are ready to send and receive the tensor.

    Arguments:
        tensor: A tensor to reduce.
        average:
            .. warning:: .. deprecated:: 0.19.0

                Use `op` instead. Will be removed in v1.0.

        name: A name of the reduction operation.
        op: The reduction operation to combine tensors across different ranks. Defaults to
            Average if None is given.
        prescale_factor: Multiplicative factor to scale tensor before allreduce.
        postscale_factor: Multiplicative factor to scale tensor after allreduce.
        process_set: Process set object to limit this operation to a subset of
                     Horovod processes. Default is the global process set.

    Returns:
        A handle to the allreduce operation that can be used with `poll()` or
        `synchronize()`.
    """
    op = handle_average_backwards_compatibility(op, average)
    return _allreduce_async(tensor, tensor, name, op, prescale_factor, postscale_factor, process_set)
def _allreduce_async(tensor, output, name, op, prescale_factor, postscale_factor, process_set: ProcessSet):
   	
    ...
    
    function = _check_function(_allreduce_function_factory, tensor)
    try:
        handle = getattr(mpi_lib, function)(tensor, output, divisor,
                                            name.encode() if name is not None else _NULL, op,
                                            prescale_factor, postscale_factor, process_set.process_set_id)
    except RuntimeError as e:
        raise HorovodInternalError(e)
    _handle_map[handle] = (tensor, output)
    return handle
def _allreduce_function_factory(tensor):
    return 'horovod_torch_allreduce_async_' + tensor.type().replace('.', '_')
def _check_function(function_factory, tensor):
    function = function_factory(tensor)
    if not hasattr(mpi_lib, function):
        raise ValueError('Tensor type %s is not supported.' % tensor.type())
    if not tensor.is_contiguous():
        raise ValueError('Tensor is required to be contiguous.')
    return function

간단하게 요약하면 다음과 같다.

  • getattr(mpi_lib, function)으로 mpi_lib에서 함수를 가져옴.
  • functionhorovod_torch_allreduce.. 으로 정의됨
  • _check_function 에서 함수 유효성 검사

conclusion

다음엔 step() 함수를 포스팅하겠다.

profile
steadycode

0개의 댓글