텍스트 분류를 넘어: 시퀀스-투-시퀀스 학습
text_file = "spa-eng/spa.txt"
with open(text_file) as f:
lines = f.read().split("\n")[:-1]
text_pairs = []
for line in lines:
english, spanish = line.split("\t")
spanish = "[start] " + spanish + " [end]"
text_pairs.append((english, spanish))
import random
print(random.choice(text_pairs))
import random
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples:]
num_train_samples = len(text_pairs) - 2 * num_val_samples
-> num_train_samples는 전체 데이터셋에서 검증 데이터셋과 테스트 데이터셋의 샘플 수를 제외한 나머지 샘플 수를 계산
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
영어와 스페인어 텍스트 쌍을 벡터화하기
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")
string.punctuation은 string 모듈에 포함된 구두점 문자들을 나타내는 문자열입니다. 예를 들면 "!", ".", "," 등이 포함
"¿"는 스페인어 문장에서 사용되는 특수 문자 중 하나입니다.
strip_chars 변수에는 구두점 문자들과 "¿" 문자가 포함
strip_chars 문자열에서 "["와 "]"를 제거
후 최종적인 strip_chars가 생긴다.
def custom_standardization(input_string):
lowercase = tf.strings.lower(input_string) # 중요 ,
return tf.strings.regex_replace(
lowercase, f"[{re.escape(strip_chars)}]", "")
lowercase = tf.strings.lower(input_string)
-> 입력된 문자열을 소문자로 변환하는 역할
tf.strings.regex_replace(lowercase, f"[{re.escape(strip_chars)}]", "")
-> strip_chars에 해당하는 문자들을 제거하는 역할
re.escape
source_vectorization = layers.TextVectorization(
max_tokens=vocab_size,
output_mode="int",
output_sequence_length=sequence_length,
)
target_vectorization = layers.TextVectorization(
max_tokens=vocab_size,
output_mode="int",
output_sequence_length=sequence_length + 1,
standardize=custom_standardization,
)
max_tokens 매개변수는 각각의 TextVectorization 레이어에서 사용할 최대 토큰 수를 지정
output_sequence_length=sequence_length + 1
-> target에서 , 이런이유는 start나 end와 같은 특수 토큰을 추가하기 때문이다.
train_english_texts = [pair[0] for pair in train_pairs]
train_spanish_texts = [pair[1] for pair in train_pairs]
source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_spanish_texts)
-> train_pairs에서 각 문장 쌍의 첫 번째 요소, 즉 영어 문장을 추출하여 리스트로 저장하는 과정
이걸 대신해서 쓸수 있는것 ,
-> english_texts, spanish_texts = zip(*text_pairs)
이 코드를 사용하면 한번에 할 수 있다.
번역 작업을 위한 데이터셋 준비하기
batch_size = 64
def format_dataset(eng, spa):
eng = source_vectorization(eng)
spa = target_vectorization(spa)
return ({
"english": eng,
"spanish": spa[:, :-1],
}, spa[:, 1:])
def make_dataset(pairs):
eng_texts, spa_texts = zip(*pairs)
eng_texts = list(eng_texts)
spa_texts = list(spa_texts)
dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
dataset = dataset.batch(batch_size)
dataset = dataset.map(format_dataset, num_parallel_calls=4)
return dataset.shuffle(2048).prefetch(16).cache()
train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)
[:, :-1]를 사용하여 디코더의 입력 시퀀스에서 마지막 토큰을 제외
spa[:, 1:] -> 첫번쨰 토큰을 제외한 시퀀스를 선택함으로써 디코더의 타깃 시퀀스
def make_dataset(pairs):
eng_texts, spa_texts = zip(*pairs)
eng_texts = list(eng_texts)
spa_texts = list(spa_texts)
dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
dataset = dataset.batch(batch_size)
dataset = dataset.map(format_dataset, num_parallel_calls=4)
return dataset.shuffle(2048).prefetch(16).cache()
list(eng_texts)
-> 이는 데이터셋 생성을 위해 텐서플로우 데이터셋에 사용하기 위한 단계
tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
-> 텐서플로우 데이터셋의 텐서 슬라이스로 변환
dataset.batch(batch_size)
-> 데이터셋을 배치 단위로 나눕니다. batch_size는 배치의 크기를 결정하는 파라미터입니다. 배치 단위로 데이터를 처리하는 것은 학습과정에서 효율적인 연산을 수행하기 위한 방법
dataset.prefetch(16)
-> 데이터 전처리와 모델 학습/평가 과정에서 데이터 로딩에 소요되는 대기 시간을 줄일 수 있다.
dataset.cache()
-> 데이터셋을 캐싱합니다. 이는 데이터셋을 메모리나 로컬 저장소에 저장하여 데이터 로딩 속도를 향상시키는 역할
for inputs, targets in train_ds.take(1):
print(f"inputs['english'].shape: {inputs['english'].shape}")
print(f"inputs['spanish'].shape: {inputs['spanish'].shape}")
print(f"targets.shape: {targets.shape}")
첫 번째 배치에는 64개의 데이터 포인트가 있습니다.
"english" 특성은 각 데이터 포인트마다 길이가 100인 시퀀스로 표현되었습니다.
"spanish" 특성은 각 데이터 포인트마다 길이가 100인 시퀀스로 표현되었습니다.
타깃은 각 데이터 포인트마다 길이가 100인 시퀀스로 표현되었습니다.
RNN을 사용한 시퀀스-투-시퀀스 모델
from tensorflow import keras
from tensorflow.keras import layers
embed_dim = 256
latent_dim = 1024
source = keras.Input(shape=(None,), dtype="int64", name="english")
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(source)
encoded_source = layers.Bidirectional(
layers.GRU(latent_dim), merge_mode="sum")(x)
-> 양방향 GRU(Gated Recurrent Unit) 층을 사용하여 입력 시퀀스를 인코딩합니다
-> GRU 셀은 순환 신경망에서 이전 시점의 은닉 상태와 현재 시점의 입력을 기반으로 계산
-> Bidirectional 래퍼는 입력 시퀀스를 양방향으로 처리하고,
-> merge_mode="sum"은 양방향 처리의 결과를 더하는 방식으로 합칩니다
past_target = keras.Input(shape=(None,), dtype="int64", name="spanish")
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(past_target)
decoder_gru = layers.GRU(latent_dim, return_sequences=True)
x = decoder_gru(x, initial_state=encoded_source)
x = layers.Dropout(0.5)(x)
target_next_step = layers.Dense(vocab_size, activation="softmax")(x)
seq2seq_rnn = keras.Model([source, past_target], target_next_step)
decoder_gru = layers.GRU(latent_dim, return_sequences=True)
-> return_sequences=True 설정은 GRU 층이 시퀀스를 출력
x = decoder_gru(x, initial_state=encoded_source)
-> 디코더의 GRU 층을 사용하여 이전 타겟 시퀀스를 디코딩
-> 인코딩된 소스 시퀀스 encoded_source를 초기 은닉 상태로 사용
seq2seq_rnn = keras.Model([source, past_target], target_next_step)
-> [source, past_target]: 모델의 입력을 나타내는 리스트입니다. 첫 번째 원소는 "english" 특성인 source이고, 두 번째 원소는 이전 타겟 시퀀스인 past_target입니다
-> 이 모델은 입력으로 source와 past_target을 받아 다음 단어를 예측하는 작업을 수행
target_next_step
-> source와 past_target에 기반하여 다음 단어를 예측
입력으로 [source, past_target]
출력으로 target_next_step
seq2seq_rnn.compile(
optimizer="rmsprop",
loss="sparse_categorical_crossentropy",
metrics=["accuracy"]) #중요하다 -> BLEU_scores를 사용해야한다 -> 번역할때
seq2seq_rnn.fit(train_ds, epochs=15, validation_data=val_ds)
RNN 인코더와 디코더로 새로운 문장 번역하기
import numpy as np
spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab)) #튜플이 나온다.
max_decoded_sentence_length = 20
target_vectorization.get_vocabulary()를 사용하여 스페인어 어휘 사전을 가져옵니다
range(len(spa_vocab))를 사용하여 어휘 사전의 인덱스 범위를 생성
zip() 함수를 사용하여 인덱스와 어휘 사전의 단어를 짝지어서 튜플로 묶습니다. 예를 들어, (0, 'word_1'), (1, 'word_2'), (2, 'word_3')과 같은 형태로 매핑
dict() 함수를 사용하여 튜플의 리스트를 딕셔너리로 변환
max_decoded_sentence_length는 디코딩된 문장의 최대 길이
decode_sequence
def decode_sequence(input_sentence):
tokenized_input_sentence = source_vectorization([input_sentence])
decoded_sentence = "[start]"
for i in range(max_decoded_sentence_length):
tokenized_target_sentence = target_vectorization([decoded_sentence])
next_token_predictions = seq2seq_rnn.predict(
[tokenized_input_sentence, tokenized_target_sentence])
sampled_token_index = np.argmax(next_token_predictions[0, i, :])
sampled_token = spa_index_lookup[sampled_token_index]
decoded_sentence += " " + sampled_token
if sampled_token == "[end]":
break
return decoded_sentence
test_eng_texts = [pair[0] for pair in test_pairs]
-> test_eng_texts = [pair[0] for pair in test_pairs]
영어 문장만 추출하여 리스트에 저장
트랜스 포머를 사용한 시퀀스 투 시퀀스 모델
class TransformerDecoder(layers.Layer):
def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
super().__init__(**kwargs)
self.embed_dim = embed_dim
self.dense_dim = dense_dim
self.num_heads = num_heads
self.attention_1 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim)
self.attention_2 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim)
self.dense_proj = keras.Sequential(
[layers.Dense(dense_dim, activation="relu"),
layers.Dense(embed_dim),]
)
self.layernorm_1 = layers.LayerNormalization()
self.layernorm_2 = layers.LayerNormalization()
self.layernorm_3 = layers.LayerNormalization()
self.supports_masking = True
-attention layer가 두개이다.
첫번쨰는 self-attention , 두번쨰는 인코더와 디코더 간의 attention을 수행한다.
def get_causal_attention_mask(self, inputs):
input_shape = tf.shape(inputs)
batch_size, sequence_length = input_shape[0], input_shape[1]
i = tf.range(sequence_length)[:, tf.newaxis]
j = tf.range(sequence_length)
mask = tf.cast(i >= j, dtype="int32")
mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
mult = tf.concat(
[tf.expand_dims(batch_size, -1),
tf.constant([1, 1], dtype=tf.int32)], axis=0)
return tf.tile(mask, mult
inputs.shape를 이용해서 batch_size와 sequence_length를 추출한다.
i = tf.range(sequence_length)[:, tf.newaxis]
-> 생성된 벡터의 모든 원소에 대해 차원을 추가
mask = tf.cast(i >= j, dtype="int32")
-> i>=j를 만족하는 원소 1 그렇지 않으면 0으로 이루어진 마스크 행렬을 생성
이 마스크는 현재 위치 이후의 원소에 대해서만 정보를 전달 , 이전 원소들은 가린다.
이를 통해 인과적 어텐션 구현
-tf.reshape(mask, (1, input_shape[1], input_shape[1]))는 mask 텐서의 모양을 (1, input_shape[1], input_shape[1]) 로 변환시키는 작업
tf.expand_dims(batch_size, -1) -> batch 텐서에 차원을 추가하여 열벡터로 전환
tf.constant([1, 1], dtype=tf.int32) -> 1,1을 가지는 텐서를 생성
tf.concat 이걸 통해서 결합된다 -> (3,1)모양의 mult생성
def call(self, inputs, encoder_outputs, mask=None):
causal_mask = self.get_causal_attention_mask(inputs)
if mask is not None:
padding_mask = tf.cast(
mask[:, tf.newaxis, :], dtype="int32")
padding_mask = tf.minimum(padding_mask, causal_mask)
attention_output_1 = self.attention_1(
query=inputs,
value=inputs,
key=inputs,
attention_mask=causal_mask)
attention_output_1 = self.layernorm_1(inputs + attention_output_1)
attention_output_2 = self.attention_2(
query=attention_output_1,
value=encoder_outputs,
key=encoder_outputs,
attention_mask=padding_mask,
)
attention_output_2 = self.layernorm_2(
attention_output_1 + attention_output_2)
proj_output = self.dense_proj(attention_output_2)
return self.layernorm_3(attention_output_2 + proj_output)
attention_output_1 = self.attention_1(
query=inputs,
value=inputs,
key=inputs
-> 입력과 자기 어텐션을 수행한다.
자기 어텐션 결과와 원래 입력을 더한 후, layernorm_1을 적용합니다.
인코더 출력(encoder_outputs)과 어텐션을 수행합니다. attention_2를 사용하여 자기 어텐션 결과의 쿼리(query)로 인코더 출력의 값(value)과 키(key)에 대한 어텐션을 계산
자기 어텐션 결과와 인코더 어텐션 결과를 더한 후, layernorm_2를 적용합니다
proj_output = self.dense_proj(attention_output_2) 을 통해 어텐션 결과를 변환
변환된 결과와 어텐션 결과를 더한 후, layernorm_3을 적용
PositionalEmbedding 층
class PositionalEmbedding(layers.Layer):
def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
super().__init__(**kwargs)
self.token_embeddings = layers.Embedding(
input_dim=input_dim, output_dim=output_dim)
self.position_embeddings = layers.Embedding(
input_dim=sequence_length, output_dim=output_dim)
self.sequence_length = sequence_length
self.input_dim = input_dim
self.output_dim = output_dim
def call(self, inputs):
length = tf.shape(inputs)[-1]
positions = tf.range(start=0, limit=length, delta=1)
embedded_tokens = self.token_embeddings(inputs)
embedded_positions = self.position_embeddings(positions)
return embedded_tokens + embedded_positions
def compute_mask(self, inputs, mask=None):
return tf.math.not_equal(inputs, 0)
tf.math.not_equal 함수를 사용하여 입력 텐서 inputs와 0을 비교하여 불일치하는 위치를 찾습니다
값이 0인 위치에는 False, 값이 0이 아닌 위치에는 True가 있는 마스크
-> 이를 통해 모델에서 0이 아닌 요소들에 대한 유효한 마스킹을 수행
엔드-투-엔드 트랜스포머
-> ransformer는 입력으로 encoder_inputs와 decoder_inputs를 받고, 출력으로 decoder_outputs를 반환
트랜스포머 모델을 사용해 새로운 문장 번역하기
tokenized_target_sentence = target_vectorization(
[decoded_sentence])[:, :-1]
-Rnn과 차이는 마지막것을 넣지 않는다는 것이다.