Deeplearning - chap 11-4

심준보·2023년 6월 17일
0
post-thumbnail

텍스트 분류를 넘어: 시퀀스-투-시퀀스 학습

기계번역 예제

text_file = "spa-eng/spa.txt"
with open(text_file) as f:
    lines = f.read().split("\n")[:-1]
text_pairs = []
for line in lines:
    english, spanish = line.split("\t")
    spanish = "[start] " + spanish + " [end]"
    text_pairs.append((english, spanish))
  • open(text_file): 파일을 연다
  • f.read()를 사용하여 파일의 내용을 읽는다
  • split("\n")을 사용하여 읽은 내용을 줄 바꿈 기준으로 나눕니다.
  • [:-1]을 사용하여 마지막 빈 줄을 제외한 모든 줄을 선택
  • text_pairs라는 빈 리스트를 생성합니다. 이 리스트는 각 줄에서 추출한 영어-스페인어 문장 쌍을 저장할 것
  • english, spanish = line.split("\t")
    line.split("\t")을 사용하여 탭 문자(\t)를 기준으로 영어 문장과 스페인어 문장을 분리
  • spanish = "[start] " + spanish + " [end]"을 사용하여 스페인어 문장 앞뒤에 "[start]"와 "[end]"를 추가
    -text_pairs.append((english, spanish))
    -> text_pairs에 넣어준다.
import random
print(random.choice(text_pairs))
  • random.choice(text_pairs)
    -> 무작위로 하나의 요소를 선택한다. -> 영어 , 스페인어 문장쌍으로 튜플 형태로 반환
import random
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples:]
  • num_train_samples = len(text_pairs) - 2 * num_val_samples
    -> num_train_samples는 전체 데이터셋에서 검증 데이터셋과 테스트 데이터셋의 샘플 수를 제외한 나머지 샘플 수를 계산

  • val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]

영어와 스페인어 텍스트 쌍을 벡터화하기

strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")
  • string.punctuation은 string 모듈에 포함된 구두점 문자들을 나타내는 문자열입니다. 예를 들면 "!", ".", "," 등이 포함

  • "¿"는 스페인어 문장에서 사용되는 특수 문자 중 하나입니다.

  • strip_chars 변수에는 구두점 문자들과 "¿" 문자가 포함

  • strip_chars 문자열에서 "["와 "]"를 제거

후 최종적인 strip_chars가 생긴다.

def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)  # 중요 , 
    return tf.strings.regex_replace(
        lowercase, f"[{re.escape(strip_chars)}]", "")   
  • lowercase = tf.strings.lower(input_string)
    -> 입력된 문자열을 소문자로 변환하는 역할

  • tf.strings.regex_replace(lowercase, f"[{re.escape(strip_chars)}]", "")
    -> strip_chars에 해당하는 문자들을 제거하는 역할

  • re.escape

source_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
target_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1, 
    standardize=custom_standardization,
)
  • max_tokens 매개변수는 각각의 TextVectorization 레이어에서 사용할 최대 토큰 수를 지정

  • output_sequence_length=sequence_length + 1
    -> target에서 , 이런이유는 start나 end와 같은 특수 토큰을 추가하기 때문이다.

train_english_texts = [pair[0] for pair in train_pairs] 
train_spanish_texts = [pair[1] for pair in train_pairs]
source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_spanish_texts)  
  • train_english_texts = [pair[0] for pair in train_pairs]

-> train_pairs에서 각 문장 쌍의 첫 번째 요소, 즉 영어 문장을 추출하여 리스트로 저장하는 과정

이걸 대신해서 쓸수 있는것 ,

-> english_texts, spanish_texts = zip(*text_pairs)

이 코드를 사용하면 한번에 할 수 있다.

번역 작업을 위한 데이터셋 준비하기

batch_size = 64

def format_dataset(eng, spa):  
    eng = source_vectorization(eng)  
    spa = target_vectorization(spa)
    return ({
        "english": eng,
        "spanish": spa[:, :-1],  
    }, spa[:, 1:])  

def make_dataset(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    return dataset.shuffle(2048).prefetch(16).cache()

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)  
  • [:, :-1]를 사용하여 디코더의 입력 시퀀스에서 마지막 토큰을 제외

  • spa[:, 1:] -> 첫번쨰 토큰을 제외한 시퀀스를 선택함으로써 디코더의 타깃 시퀀스

def make_dataset(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    return dataset.shuffle(2048).prefetch(16).cache()
  • list(eng_texts)
    -> 이는 데이터셋 생성을 위해 텐서플로우 데이터셋에 사용하기 위한 단계

  • tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    -> 텐서플로우 데이터셋의 텐서 슬라이스로 변환

  • dataset.batch(batch_size)
    -> 데이터셋을 배치 단위로 나눕니다. batch_size는 배치의 크기를 결정하는 파라미터입니다. 배치 단위로 데이터를 처리하는 것은 학습과정에서 효율적인 연산을 수행하기 위한 방법

  • dataset.prefetch(16)
    -> 데이터 전처리와 모델 학습/평가 과정에서 데이터 로딩에 소요되는 대기 시간을 줄일 수 있다.

  • dataset.cache()
    -> 데이터셋을 캐싱합니다. 이는 데이터셋을 메모리나 로컬 저장소에 저장하여 데이터 로딩 속도를 향상시키는 역할

for inputs, targets in train_ds.take(1):
    print(f"inputs['english'].shape: {inputs['english'].shape}")
    print(f"inputs['spanish'].shape: {inputs['spanish'].shape}")
    print(f"targets.shape: {targets.shape}")

첫 번째 배치에는 64개의 데이터 포인트가 있습니다.
"english" 특성은 각 데이터 포인트마다 길이가 100인 시퀀스로 표현되었습니다.
"spanish" 특성은 각 데이터 포인트마다 길이가 100인 시퀀스로 표현되었습니다.
타깃은 각 데이터 포인트마다 길이가 100인 시퀀스로 표현되었습니다.

RNN을 사용한 시퀀스-투-시퀀스 모델

  • Encoder
from tensorflow import keras
from tensorflow.keras import layers

embed_dim = 256
latent_dim = 1024

source = keras.Input(shape=(None,), dtype="int64", name="english")  
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(source)  
encoded_source = layers.Bidirectional(  
    layers.GRU(latent_dim), merge_mode="sum")(x)
    
  • encoded_source = layers.Bidirectional(layers.GRU(latent_dim), merge_mode="sum")(x)

-> 양방향 GRU(Gated Recurrent Unit) 층을 사용하여 입력 시퀀스를 인코딩합니다

-> GRU 셀은 순환 신경망에서 이전 시점의 은닉 상태와 현재 시점의 입력을 기반으로 계산
-> Bidirectional 래퍼는 입력 시퀀스를 양방향으로 처리하고,
-> merge_mode="sum"은 양방향 처리의 결과를 더하는 방식으로 합칩니다

GRU 기반 디코더와 엔드-투-엔드 모델

past_target = keras.Input(shape=(None,), dtype="int64", name="spanish")  
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(past_target)
decoder_gru = layers.GRU(latent_dim, return_sequences=True)
x = decoder_gru(x, initial_state=encoded_source)  
x = layers.Dropout(0.5)(x)
target_next_step = layers.Dense(vocab_size, activation="softmax")(x) 
seq2seq_rnn = keras.Model([source, past_target], target_next_step)
  • decoder_gru = layers.GRU(latent_dim, return_sequences=True)
    -> return_sequences=True 설정은 GRU 층이 시퀀스를 출력

  • x = decoder_gru(x, initial_state=encoded_source)
    -> 디코더의 GRU 층을 사용하여 이전 타겟 시퀀스를 디코딩
    -> 인코딩된 소스 시퀀스 encoded_source를 초기 은닉 상태로 사용

  • seq2seq_rnn = keras.Model([source, past_target], target_next_step)

-> [source, past_target]: 모델의 입력을 나타내는 리스트입니다. 첫 번째 원소는 "english" 특성인 source이고, 두 번째 원소는 이전 타겟 시퀀스인 past_target입니다

-> 이 모델은 입력으로 source와 past_target을 받아 다음 단어를 예측하는 작업을 수행

  • target_next_step
    -> source와 past_target에 기반하여 다음 단어를 예측

  • 입력으로 [source, past_target]

  • 출력으로 target_next_step

seq2seq_rnn.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])  #중요하다 -> BLEU_scores를 사용해야한다 -> 번역할때
seq2seq_rnn.fit(train_ds, epochs=15, validation_data=val_ds)
  • loss : sparse_categorical_crossentropy 를 사용한다.

RNN 인코더와 디코더로 새로운 문장 번역하기

import numpy as np
spa_vocab = target_vectorization.get_vocabulary()  
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))  #튜플이 나온다. 
max_decoded_sentence_length = 20  
  • target_vectorization.get_vocabulary()를 사용하여 스페인어 어휘 사전을 가져옵니다

  • range(len(spa_vocab))를 사용하여 어휘 사전의 인덱스 범위를 생성

  • zip() 함수를 사용하여 인덱스와 어휘 사전의 단어를 짝지어서 튜플로 묶습니다. 예를 들어, (0, 'word_1'), (1, 'word_2'), (2, 'word_3')과 같은 형태로 매핑

  • dict() 함수를 사용하여 튜플의 리스트를 딕셔너리로 변환

  • max_decoded_sentence_length는 디코딩된 문장의 최대 길이

decode_sequence

def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence]) 
    decoded_sentence = "[start]"  
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization([decoded_sentence])
        next_token_predictions = seq2seq_rnn.predict(  
            [tokenized_input_sentence, tokenized_target_sentence])  
        sampled_token_index = np.argmax(next_token_predictions[0, i, :])
        sampled_token = spa_index_lookup[sampled_token_index] 
        decoded_sentence += " " + sampled_token  
        if sampled_token == "[end]":  
            break
    return decoded_sentence   
  • target_vectorization
    -> 토큰화된 문장은 tokenized_target_sentence에 저장
  • sampled_token_index = np.argmax(next_token_predictions[0, i, :])
    -> 이 확률 분포에서 가장 큰 값의 인덱스를 선택
  • sampled_token = spa_index_lookup[sampled_token_index]
test_eng_texts = [pair[0] for pair in test_pairs]  

-> test_eng_texts = [pair[0] for pair in test_pairs]
영어 문장만 추출하여 리스트에 저장

트랜스 포머를 사용한 시퀀스 투 시퀀스 모델

트랜스 포머 디코더

class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(   
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()  
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

-attention layer가 두개이다.

첫번쨰는 self-attention , 두번쨰는 인코더와 디코더 간의 attention을 수행한다.

  • self.layernorm_1 = layers.LayerNormalization()
    self.layernorm_2 = layers.LayerNormalization()
    self.layernorm_3 = layers.LayerNormalization()
    -> 각각의 레이어에서의 레이어 정규화를 수행
    이는 , 그래디언트 소실과 폭주 문제르 완화

def get_causal_attention_mask(self, inputs): 
        input_shape = tf.shape(inputs) 
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")  
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult
  • inputs.shape를 이용해서 batch_size와 sequence_length를 추출한다.

  • i = tf.range(sequence_length)[:, tf.newaxis]
    -> 생성된 벡터의 모든 원소에 대해 차원을 추가

  • mask = tf.cast(i >= j, dtype="int32")
    -> i>=j를 만족하는 원소 1 그렇지 않으면 0으로 이루어진 마스크 행렬을 생성

이 마스크는 현재 위치 이후의 원소에 대해서만 정보를 전달 , 이전 원소들은 가린다.
이를 통해 인과적 어텐션 구현

-tf.reshape(mask, (1, input_shape[1], input_shape[1]))는 mask 텐서의 모양을 (1, input_shape[1], input_shape[1]) 로 변환시키는 작업

  • mult = tf.concat([tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], axis=0)

tf.expand_dims(batch_size, -1) -> batch 텐서에 차원을 추가하여 열벡터로 전환
tf.constant([1, 1], dtype=tf.int32) -> 1,1을 가지는 텐서를 생성

tf.concat 이걸 통해서 결합된다 -> (3,1)모양의 mult생성

def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(   
            query=attention_output_1,
            value=encoder_outputs,  
            key=encoder_outputs,  
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output) 
  • attention_output_1 = self.attention_1(
    query=inputs,
    value=inputs,
    key=inputs
    -> 입력과 자기 어텐션을 수행한다.

  • 자기 어텐션 결과와 원래 입력을 더한 후, layernorm_1을 적용합니다.

  • 인코더 출력(encoder_outputs)과 어텐션을 수행합니다. attention_2를 사용하여 자기 어텐션 결과의 쿼리(query)로 인코더 출력의 값(value)과 키(key)에 대한 어텐션을 계산

  • 자기 어텐션 결과와 인코더 어텐션 결과를 더한 후, layernorm_2를 적용합니다

  • proj_output = self.dense_proj(attention_output_2) 을 통해 어텐션 결과를 변환

  • 변환된 결과와 어텐션 결과를 더한 후, layernorm_3을 적용

PositionalEmbedding 층

class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim
  • token_embeddings : 입력 토큰에 대한 임베딩을 담당하는 Embedding 레이어
  • position_embeddings : 이 레이어는 시퀀스 길이(sequence_length)의 크기를 가지며, 출력 차원(output_dim)의 임베딩 벡터로 위치를 매핑
    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions
  • length = tf.shape(inputs)[-1]
    -> 입력 시퀀스의 길이를 inputs의 마지막 차원으로부터 추출한다.
  • positions = tf.range(start=0, limit=length, delta=1)
    -> 0부터 4까지 길이가 1씩 커지는
  • embedded_tokens = self.token_embeddings(inputs)
  • embedded_positions = self.position_embeddings(positions)
  • embedded_tokens + embedded_positions: 토큰 임베딩과 위치 임베딩을 더하여 최종 임베딩 벡터를 얻음
 def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)
  • tf.math.not_equal 함수를 사용하여 입력 텐서 inputs와 0을 비교하여 불일치하는 위치를 찾습니다

  • 값이 0인 위치에는 False, 값이 0이 아닌 위치에는 True가 있는 마스크

-> 이를 통해 모델에서 0이 아닌 요소들에 대한 유효한 마스킹을 수행

엔드-투-엔드 트랜스포머

  • transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

-> ransformer는 입력으로 encoder_inputs와 decoder_inputs를 받고, 출력으로 decoder_outputs를 반환

트랜스포머 모델을 사용해 새로운 문장 번역하기

tokenized_target_sentence = target_vectorization(
            [decoded_sentence])[:, :-1] 

-Rnn과 차이는 마지막것을 넣지 않는다는 것이다.

profile
밑거름이라고생각합니다

0개의 댓글