python chunk implementation

Hyunchul·2021년 8월 31일
0

긴 시간 동안 실시간으로 어떤 데이터를 로깅하려면, 적절하게 끊어서 저장해줘야 한다. 실시간으로 파일을 쓰는 것은 많은 리소스를 필요로 하고, 한번에 모아서 저장하는 것은 데이터 전체를 잃을 가능성이 크다. 그래서 Chunk라는 클래스를 만들어서, 일정 갯수 이상 데이터가 쌓이면 자동으로 저장되게 구현했다. 저장하는 포맷은 대용량 데이터 처리에 용이한 hdf5이다. 데이터를 추가하는 코드가 다른 프로세스와 실시간으로 상호작용할 때는, 저장이 오래 걸리면, 이후 프로세스를 지연시킬 수 있다. 그래서 저장은 multiprocessing을 이용해서 구현했다(save_mp()).

import numpy as np
import h5py, time, datetime
import multiprocessing as mp


class Chunk():

    def __init__(self, size:tuple, path:str, dset_name:str, mode='mp'):
        self.size = size
        self.path = path
        self.dset_name = dset_name
        self.data = np.full(self.size, np.nan)
        self.row_idx = 0
        self.mode = mode
        self.processes = []

    def push(self, row:np.array):
        self.data[self.row_idx] = row
        self.row_idx += 1
        if self.row_idx >= self.size[0]:
            if self.mode == 'mp':
                self.save_mp()
            else:
                self.save() 

            self.data = np.full(self.size, np.nan)
            self.row_idx = 0

    def save(self):
        self._save(self.data, self.path, self.dset_name)
        with h5py.File(self.path, 'a')  as f:
            data = self.data[:self.row_idx]
            if self.dset_name in f:
                dset = f[self.dset_name]
                shape = [dset.shape[i] for i in range(len(dset.shape))]
                appending_idx = shape[0]
                shape[0] += data.shape[0]
                dset.resize(shape)
                dset[appending_idx:] = data
            else:
                shape = list(self.data.shape)
                shape[0] = None
                shape = tuple(shape)
                dset = f.create_dataset(self.dset_name, data=self.data, compression='gzip', maxshape=shape)

    def save_mp(self):
        # print(f"save_mp \t {datetime.datetime.now()}")
        p = mp.Process(target=Chunk._save, args=(self.data, self.path, self.dset_name,))
        p.start()
        self.processes.append(p)

    def wait_mp(self):
        for p in self.processes:
            p.join()

    @staticmethod
    def _save(data, path, dset_name):
        with h5py.File(path, 'a')  as f:
            _data_shape = list(data.shape)
            _data_shape[0] = -1
            data = data[~np.isnan(data)].reshape(_data_shape)
            if dset_name in f:
                dset = f[dset_name]
                shape = [dset.shape[i] for i in range(len(dset.shape))]
                appending_idx = shape[0]
                shape[0] += data.shape[0]
                dset.resize(shape)
                dset[appending_idx:] = data
            else:
                shape = list(data.shape)
                shape[0] = None
                shape = tuple(shape)
                dset = f.create_dataset(dset_name, data=data, compression='gzip', maxshape=shape)

0개의 댓글