Dataset.map

Gamchan Kang·2024년 10월 2일
1

ML/DL

목록 보기
10/13
post-thumbnail

허깅페이스에 제공하는 여러가지 라이브러리 중 datasets은 허깅페이스에 있는 수 많은 데이터셋을 쉽게 사용할 수 있는 강력한 도구이다.

Dataset vs DatasetDict

허깅페이스 라이브러리를 사용하면서 두 클래스의 차이점을 경시하고 사용했다. Dataset.map을 조사하면서 두 클래스의 차이를 비로소 알게 됐다.

load_dataset() 함수로 데이터셋을 호출하면 구성에 따라 둘 중 하나의 객체로 생성된다.

Dataset 클래스

단일 데이터셋을 다루기 위한 클래스이다. 각 example은 같은 구조(column 혹은 필드)이며 행과 열로 구성된다. 마치 DataFrame 과 유사하다.

DatasetDict 클래스

데이터셋과 스플릿이라는 용어는 종종 혼용된다. 허깅페이스 문서에서는 데이터셋은 train, validation, test에 활용되는 데이터 전체를 의미하고, 스플릿은 각 작업에 사용되는 데이터 일부를 의미한다. 즉 스플릿은 데이터셋의 부분집합인 셈이다.

DatasetDict은 여러 개의 스플릿을 동시에 다루기 위한 클래스이다. 스플릿 이름을 키로, Dataset을 값으로 갖는 구조이다. 여기까지만 보면 파이썬 딕셔너리를 상속받는 클래스인가 싶지만, pop(), setdefault(), update() 등의 메소드 함수는 지원하지 않는다. 그 대신 select, flatten 등 학습에 필요한 기능을 지원하는 클래스이다.

참고한 허깅페이스 문서

map()

처리 과정 속도를 높이기 위한 함수이다. 개별 데이터 혹은 배치에 해당되는 example에 처리 함수(processing function)을 적용한다. 새로운 행, 열 생성이 가능하다.

  • remove_columns 파라미터로 열 제거가 가능하다.
updated_dataset = dataset.map(lambda example: {"new_sentence": example["sentence1"]}, remove_columns=["sentence1"])
updated_dataset.column_names
['sentence2', 'label', 'idx', 'new_sentence']

💡 데이터를 복사하지 않는 더 빠른 remove_columns() 함수가 있다.

  • with_indice=True 옵션으로 인덱스 추가가 가능하다.
updated_dataset = dataset.map(lambda example, idx: {"sentence2": f"{idx}: " + example["sentence2"]}, with_indices=True)
updated_dataset["sentence2"][:5]
updated_dataset["sentence2"][:5]
['0: Referring to him as only " the witness " , Amrozi accused his brother of deliberately distorting his evidence .',
 "1: Yucaipa bought Dominick 's in 1995 for $ 693 million and sold it to Safeway for $ 1.8 billion in 1998 .",
 "2: On June 10 , the ship 's owners had published an advertisement on the Internet , offering the explosives for sale .",
 '3: Tab shares jumped 20 cents , or 4.6 % , to set a record closing high at A $ 4.57 .',
 '4: PG & E Corp. shares jumped $ 1.63 or 8 percent to $ 21.03 on the New York Stock Exchange on Friday .'
]

멀티프로세싱

  • num_proc 파라미터로 병렬 처리할 코어 수를 설정할 수 있다.
updated_dataset = dataset.map(lambda example, idx: {"sentence2": f"{idx}: " + example["sentence2"]}, with_indices=True, num_proc=4)
  • with_rank=True로 프로세스 랭크(ID)를 같이 전달할 수 있다. 프로세스 rank에 따라 서로 다른 데이터 처리 가능하다.
import torch
from multiprocess import set_start_method
from transformers import AutoTokenizer, AutoModelForCausalLM 
from datasets import load_dataset

# Get an example dataset
dataset = load_dataset("fka/awesome-chatgpt-prompts", split="train")

# Get an example model and its tokenizer 
model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen1.5-0.5B-Chat").eval()
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen1.5-0.5B-Chat")
def gpu_computation(batch, rank):
    # Move the model on the right GPU if it's not there already
    device = f"cuda:{(rank or 0) % torch.cuda.device_count()}"
    model.to(device)
    
    # Your big GPU call goes here, for example:
    chats = [[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompt}
    ] for prompt in batch["prompt"]]
    texts = [tokenizer.apply_chat_template(
        chat,
        tokenize=False,
        add_generation_prompt=True
    ) for chat in chats]
    model_inputs = tokenizer(texts, padding=True, return_tensors="pt").to(device)
    with torch.no_grad():
        outputs = model.generate(**model_inputs, max_new_tokens=512)
    batch["output"] = tokenizer.batch_decode(outputs, skip_special_tokens=True)
    return batch
if __name__ == "__main__":
    set_start_method("spawn")
    updated_dataset = dataset.map(
        gpu_computation,
        batched=True,
        batch_size=16,
        with_rank=True,
        num_proc=torch.cuda.device_count(),  # one process per GPU
    )

Batch processing

  • batched=True 옵션으로 배치 처리가 가능하다.
  • batch_size 파라미터로 배치 크기 설정할 수 있다. 기본 값으로 1000을 갖는다.
def chunk_examples(examples):
    chunks = []
    for sentence in examples["sentence1"]:
        chunks += [sentence[i:i + 50] for i in range(0, len(sentence), 50)]
    return {"chunks": chunks}
    
chunked_dataset = dataset.map(chunk_examples, batched=True, remove_columns=dataset.column_names)
chunked_dataset[:10]
{'chunks': ['Amrozi accused his brother , whom he called " the ',
            'witness " , of deliberately distorting his evidenc',
            'e .',
            "Yucaipa owned Dominick 's before selling the chain",
            ' to Safeway in 1998 for $ 2.5 billion .',
            'They had published an advertisement on the Interne',
            't on June 10 , offering the cargo for sale , he ad',
            'ded .',
            'Around 0335 GMT , Tab shares were up 19 cents , or',
            ' 4.4 % , at A $ 4.56 , having earlier set a record']}

데이터 증강

RoBERTA를 활용한 마스킹 코드 예시이다.

from random import randint
from transformers import pipeline

fillmask = pipeline("fill-mask", model="roberta-base")
mask_token = fillmask.tokenizer.mask_token
smaller_dataset = dataset.filter(lambda e, i: i<100, with_indices=True)

def augment_data(examples):
    outputs = []
    for sentence in examples["sentence1"]:
        words = sentence.split(' ')
        K = randint(1, len(words)-1)
        masked_sentence = " ".join(words[:K]  + [mask_token] + words[K+1:])
        predictions = fillmask(masked_sentence)
        augmented_sequences = [predictions[i]["sequence"] for i in range(3)]
        outputs += [sentence] + augmented_sequences
    return {"data": outputs}
    
augmented_dataset = smaller_dataset.map(augment_data, batched=True, remove_columns=dataset.column_names, batch_size=8)
augmented_dataset[:9]["data"]
['Amrozi accused his brother , whom he called " the witness " , of deliberately distorting his evidence .',
 'Amrozi accused his brother, whom he called " the witness ", of deliberately withholding his evidence.',
 'Amrozi accused his brother, whom he called " the witness ", of deliberately suppressing his evidence.',
 'Amrozi accused his brother, whom he called " the witness ", of deliberately destroying his evidence.',
 "Yucaipa owned Dominick 's before selling the chain to Safeway in 1998 for $ 2.5 billion .",
 'Yucaipa owned Dominick Stores before selling the chain to Safeway in 1998 for $ 2.5 billion.',
 "Yucaipa owned Dominick's before selling the chain to Safeway in 1998 for $ 2.5 billion.",
 'Yucaipa owned Dominick Pizza before selling the chain to Safeway in 1998 for $ 2.5 billion.'
]

multiple splits

여러 스플릿에 함수 매핑이 가능하다.

from datasets import load_dataset

dataset = load_dataset('glue', 'mrpc')
encoded_dataset = dataset.map(lambda examples: tokenizer(examples["sentence1"]), batched=True)
encoded_dataset["train"][0]
{'sentence1': 'Amrozi accused his brother , whom he called " the witness " , of deliberately distorting his evidence .',
'sentence2': 'Referring to him as only " the witness " , Amrozi accused his brother of deliberately distorting his evidence .',
'label': 1,
'idx': 0,
'input_ids': [  101,  7277,  2180,  5303,  4806,  1117,  1711,   117,  2292, 1119,  1270,   107,  1103,  7737,   107,   117,  1104,  9938, 4267, 12223, 21811,  1117,  2554,   119,   102],
'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
}

분산 처리(Distributed Processing)

💡 분산처리란?
여러 프로세스가 동시에 협력해서 작업을 처리하는 방식. 여러 프로세스(혹은 노드)가 작업을 나눠서 병렬로 처리한다. 처리 속도를 높이고, 더 큰 데이터를 다룰 수 있다.

💡 torch.distributed 모듈
파이토치에서 제공하는 분산처리 지원 모듈.

  • rank: 각 프로세스에 부여되는 고유 번호. 분산 환경에서 프로세스를 구분하는 역할(ID)
  • local_rank: 각 노드 내에서 GPU나 프로세스를 구분할 때 사용되는 rank. 여러 GPU에서 병렬 처리를 하는 경우 각 GPU에 고유한 local_rank가 부여됨
  • torch.distributed.barrier(): 프로세스 간 동기화 함수. 각 프로세스의 실행 시간이 다를 수 있다. 중복 작업이 발생할 수 있다는 뜻이다. 이를 방지하기 위해 모든 프로세스가 특정 시점에서 서로를 기다렸다가 동시에 다음 단계로 넘어가도록 보장한다.
from datasets import Dataset
import torch.distributed

dataset1 = Dataset.from_dict({"a": [0, 1, 2]})

# 초기에 동기화 여부를 모르므로 확실히 하기 위해 동기화를 한다.
if training_args.local_rank > 0:
    print("Waiting for main process to perform the mapping")
    torch.distributed.barrier()

dataset2 = dataset1.map(lambda x: {"a": x["a"] + 1})

if training_args.local_rank == 0:
    print("Loading results from main process")
    torch.distributed.barrier()
profile
Someday, the dream will come true

0개의 댓글