Seq2Seq 이론
out, put = LSTM(x) 예시
- Many to One
- out[ : , -1, : ]
- Many to Many
- out[ : , : , : ]
- Many to Many
- 모두 순차 입력 > Context vector > 모두 순차 출력
- Context vector
- 입력을 모두 받은 뒤에 처음으로 나가는 하나의 벡터
- 메모리 기능을 한다.
- hidden(short term), cell(long term) > Context vector > out[ : , : , : ]
- 주의할 점은 LSTM과 같이 hidden과 cell 2개가 나오는 경우에 out[ : , -1, : ] 을 사용할 수 없다는 것이다.
- LSTM은 일반적으로 입력인자를 2개 받는다. 디폴트는 (토큰, 0)
데이터 예시
- 한글 입력 데이터:
[나는], [학생], [이다], [<eos>]
<eos> = 끝신호 토큰, recurrent 과정에서 이게 나오면 끝. - 영어 입력 데이터(학습용):
[<sos>], [I], [am], [a], [student]
학습할 때만 사용한다. 첫 단어가 'I'라는 것을 알려준다.
teacher forcing: 강제적 교사 - 영어 정답 데이터:
[I], [am], [a], [student], [<eos>]
인코더와 디코더
- 인코더: 기존 LSTM, many to one과 동일
- 디코더: 모두 순차출력
- 첫번째 순차:
- in: <sos> + ContextVector
- out1: h1, c1
- out2: h1 > fc(완전연결계층) > 예측값
- 두번째 순차:
- in:
- 학습시: 첫번째 정답 예측값을 전달해준다.
(첫번째부터 오답을 하면, 다음 시퀀스의 학습이 의미 없기 때문) - 검증시: out2(예측값)
- RNN은 seq가 길어지면, 단기기억력(기울기 소실) 이슈가 존재한다.
Seq2Seq는 Many to one 과 Many to Many 이기 때문에 Many to one의 ContextVector가 잘못 나올 가능성이 높다. - 즉, RNN에서 LSTM으로 바뀌었다고 해도 기울기 소실 문제를 완전히 해결하지 못한 것처럼, Seq2Seq도 같은 문제를 갖고 있다. 그래서 이를 보완하고자 Attention을 사용한다.
Seq2Seq 실습
1. 토큰화 (아래 코드에서는 입력에 따로 <eos>를 넣지 않았다.)
# 원문(한국어) 토큰화
def preprocess_src(text):
tokens = tokenizer.tokenize(text)
return tokens
# 번역문(영어) 입력, 출력 토큰화
def preprocess_trg_in(text):
text = ['<sos>'] + tokenizer.tokenize((text))
return text
def preprocess_trg_out(text):
text = tokenizer.tokenize((text)) + ['<eos>']
return text
2. 단어집합
def build_vocab(sents):
word_list = []
for sent in sents:
for word in sent:
word_list.append(word)
# 각 단어별 등장 빈도를 계산하여 등장 빈도가 높은 순서로 정렬
# Counter는 단어를 키(key), 등장 빈도를 값(value)으로 가지도록 반환
word_counts = Counter(word_list)
vocab = sorted(word_counts, key=word_counts.get, reverse=True)
word_to_index = {}
word_to_index['<PAD>'] = 0
word_to_index['<UNK>'] = 1
# 등장 빈도가 높은 단어일수록 낮은 정수를 부여
for index, word in enumerate(vocab) :
if word != '<PAD>':
word_to_index[word] = index + 2
return word_to_index
word to index (src, tar)
src_vocab = build_vocab(src_input)
tar_vocab = build_vocab(trg_input + trg_output)
index to word (src, tar)
index_to_src = {v: k for k, v in src_vocab.items()}
index_to_tar = {v: k for k, v in tar_vocab.items()}
3. Seq 숫자 정규화
# 원문의 문장별 토큰 길이에 대한 집계함수 출력
new_df['src_in_len'].describe()
# src_in_len이 23보다 큰 행 제거
new_df = new_df[new_df['src_in_len'] <= 23].reset_index(drop=True)
4. 패딩처리
# 원문과 번역문 최대 길이 확인
src_length= int(new_df['src_in_len'].max())
trg_length = int(new_df['trg_in_len'].max())
src_length, trg_length
원문 패딩
# 원문 패딩 및 인코딩
def src_encoding(tokens):
# (src_length - 토큰 수) 만큼 패딩토큰 추가
tokens = tokens + ['<PAD>'] * (src_length - len(tokens))
# 인코딩 된 숫자를 담아 둘 리스트
index_sequences = []
# 문장에서 토큰을 꺼내오며
for word in tokens:
try: # 토큰 인코딩 (단어 사전에 없는 토큰이 들어오면 except로)
index_sequences.append(src_vocab[word])
except KeyError: # 단어 사전에 없는 토큰이 들어오면 '<UNK>' 토큰의 숫자로 변환
index_sequences.append(src_vocab['<UNK>'])
return index_sequences
번역문 (input: <sos>, , )
# 번역문 패딩 및 인코딩
def trg_encoding(tokens):
tokens = tokens + ['<PAD>'] * (trg_length - len(tokens))
index_sequences = []
for word in tokens:
try:
index_sequences.append(tar_vocab[word])
except KeyError:
index_sequences.append(tar_vocab['<UNK>'])
return index_sequences
5. 데이터셋 준비
from sklearn.model_selection import train_test_split
# 데이터프레임에서 무작위로 80%를 훈련 데이터로, 20%를 검증 데이터로 분할
train_df, valid_df = train_test_split(new_df, test_size=0.2, random_state=42)
# 결과 확인
print(f"훈련 데이터 크기: {len(train_df)}")
print(f"검증 데이터 크기: {len(valid_df)}")
텐서화(TranslationDataset), 데이터로더(DataLoader)
# 하이퍼파라미터 설정
BATCH_SIZE = 256
# 훈련 데이터셋 및 데이터로더
train_dataset = TranslationDataset(train_df)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
# 검증 데이터셋 및 데이터로더
valid_dataset = TranslationDataset(valid_df)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False)
6. 인코더 모델 정의
import torch
import torch.nn as nn
import torch.optim as optim
embedding_dim = 256
hidden_units = 512
class Encoder(nn.Module):
def __init__(self, src_vocab_size, embedding_dim, hidden_units):
super(Encoder, self).__init__()
self.embedding = nn.Embedding(src_vocab_size, embedding_dim, padding_idx=0)
self.lstm = nn.LSTM(embedding_dim, hidden_units, num_layers=1, batch_first=True)
def forward(self, x):
# 입력 데이터 차원 : (batch_size, seq_len, embedding_dim)
x = self.embedding(x)
# hidden.shape : (1, batch_size, hidden_units), cell.shape : (1, batch_size, hidden_units)
_, (hidden, cell) = self.lstm(x)
# 인코더의 출력은 hidden state, cell state
return hidden, cell
7. 디코더 모델 정의
class Decoder(nn.Module):
def __init__(self, tar_vocab_size, embedding_dim, hidden_units):
super(Decoder, self).__init__()
self.embedding = nn.Embedding(tar_vocab_size, embedding_dim, padding_idx=0)
self.lstm = nn.LSTM(embedding_dim, hidden_units, num_layers=1, batch_first=True)
self.fc = nn.Linear(hidden_units, tar_vocab_size)
def forward(self, x, hidden, cell):
# x.shape : (batch_size, seq_len, embedding_dim)
x = self.embedding(x)
# 디코더의 LSTM으로 인코더의 hidden state, cell state를 전달.
# output.shape : (batch_size, seq_len, hidden_units)
# hidden.shape : (1, batch_size, hidden_units)
# cell.shape : (1, batch_size, hidden_units)
output, (hidden, cell) = self.lstm(x, (hidden, cell))
# output.shape: (batch_size, seq_len, tar_vocab_size)
output = self.fc(output)
# 디코더의 출력은 예측값, hidden state, cell state
return output, hidden, cell
8. Seq2Seq = 인코더와 디코더 결합
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder):
super(Seq2Seq, self).__init__()
self.encoder = encoder
self.decoder = decoder
def forward(self, src, trg):
hidden, cell = self.encoder(src)
# 훈련 중에는 디코더의 출력 중 오직 output만 사용한다.
output, _, _ = self.decoder(trg, hidden, cell)
return output
encoder = Encoder(src_vocab_size, embedding_dim, hidden_units)
decoder = Decoder(tar_vocab_size, embedding_dim, hidden_units)
model = Seq2Seq(encoder, decoder)
# ignore_index는 손실 계산에서 특정 인덱스를 무시하도록 설정하는 파라미터
loss_function = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters())
Seq2Seq(
(encoder): Encoder(
(embedding): Embedding(3488, 256, padding_idx=0)
(lstm): LSTM(256, 512, batch_first=True)
)
(decoder): Decoder(
(embedding): Embedding(11971, 256, padding_idx=0)
(lstm): LSTM(256, 512, batch_first=True)
(fc): Linear(in_features=512, out_features=11971, bias=True)
)
)
9. 훈련과정에서 검증함수
def evaluation(model, dataloader, loss_function, device):
model.eval()
total_loss = 0.0
total_correct = 0
total_count = 0
with torch.no_grad():
for encoder_inputs, decoder_inputs, decoder_targets in dataloader:
encoder_inputs = encoder_inputs.to(device)
decoder_inputs = decoder_inputs.to(device)
decoder_targets = decoder_targets.to(device)
# 순전파
# outputs.shape : (batch_size, seq_len, tar_vocab_size)
outputs = model(encoder_inputs, decoder_inputs)
# 손실 계산
# outputs.view(-1, outputs.size(-1))의 shape는 (batch_size * seq_len, tar_vocab_size)
# decoder_targets.view(-1)의 shape는 (batch_size * seq_len)
loss = loss_function(outputs.view(-1, outputs.size(-1)), decoder_targets.view(-1))
total_loss += loss.item()
# 정확도 계산 (패딩 토큰 제외)
mask = (decoder_targets != 0)
# outputs.argmax(dim=-1) : 각 시퀀스의 각 타임스텝에서, 모델이 가장 높은 확률로 예측한 단어의 인덱스를 반환 ->(batch_size, seq_len)
total_correct += ((outputs.argmax(dim=-1) == decoder_targets) * mask).sum().item()
# 패딩 토큰을 제외한 전체 정답 토큰의 개수 체크
total_count += mask.sum().item()
return total_loss / len(dataloader), total_correct / total_count
10. 훈련 (검증 함수를 사용하기 위해, 훈련 반복문이 끝나고 검증 값을 구한다.)
# Training loop
best_val_loss = 99999 # 최대한 큰 값으로 초기값 설정
for epoch in range(num_epochs):
start_time = time.time()
# 훈련 모드
model.train()
for encoder_inputs, decoder_inputs, decoder_targets in train_loader:
encoder_inputs = encoder_inputs.to(device)
decoder_inputs = decoder_inputs.to(device)
decoder_targets = decoder_targets.to(device)
# 기울기 초기화
optimizer.zero_grad()
# 순방향 전파
# outputs.shape == (batch_size, seq_len, tar_vocab_size)
outputs = model(encoder_inputs, decoder_inputs)
# 손실 계산 및 역방향 전파
# outputs.view(-1, outputs.size(-1))의 shape는 (batch_size * seq_len, tar_vocab_size)
# decoder_targets.view(-1)의 shape는 (batch_size * seq_len)
loss = loss_function(outputs.view(-1, outputs.size(-1)), decoder_targets.view(-1))
loss.backward()
# 가중치 업데이트
optimizer.step()
train_loss, train_acc = evaluation(model, train_loader, loss_function, device)
valid_loss, valid_acc = evaluation(model, valid_loader, loss_function, device)
# Epoch 종료 시간 기록 및 소요 시간 계산
end_time = time.time()
epoch_time = end_time - start_time
minutes = int(epoch_time // 60)
seconds = int(epoch_time % 60)
print(f'Epoch: {epoch+1}/{num_epochs} | Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} | Valid Loss: {valid_loss:.4f} | Valid Acc: {valid_acc:.4f}')
print(f'소요시간 : {minutes}분 {seconds}초')
print('')
# valid_loss가 갱신될 때 체크포인트 저장
if valid_loss < best_val_loss:
print(f'Validation loss improved from {best_val_loss:.4f} to {valid_loss:.4f}. 체크포인트를 저장합니다.')
best_val_loss = valid_loss
torch.save(model.state_dict(), 'best_model_checkpoint.pth')
11. 학습된 모델 불러오기
# 모델 로드
model.load_state_dict(torch.load('best_model_checkpoint.pth'))
# 모델을 device에 올립니다.
model.to(device)
# 검증 데이터에 대한 정확도와 손실 계산
val_loss, val_accuracy = evaluation(model, valid_loader, loss_function, device)
print(f'Best model validation loss: {val_loss:.4f}')
print(f'Best model validation accuracy: {val_accuracy:.4f}')
12. 평가(Test)
# 원문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_src(input_seq):
sentence = ''
for encoded_word in input_seq:
if(encoded_word != 0):
sentence = sentence + index_to_src[encoded_word] + ' '
return sentence
# 번역문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_tar(input_seq):
sentence = ''
for encoded_word in input_seq:
if(encoded_word != 0 and encoded_word != tar_vocab['<sos>'] and encoded_word != tar_vocab['<eos>']):
sentence = sentence + index_to_tar[encoded_word] + ' '
return sentence
평가용 디코더 준비
def decode_sequence(input_seq, model, src_vocab_size, tar_vocab_size, max_output_len, int_to_src_token, int_to_tar_token):
encoder_inputs = torch.tensor(input_seq, dtype=torch.long).unsqueeze(0).to(device)
# 인코더의 초기 상태 설정
hidden, cell = model.encoder(encoder_inputs)
# 시작 토큰 <sos>을 디코더의 첫 입력으로 설정
# unsqueeze(0)는 배치 차원을 추가하기 위함.
decoder_input = torch.tensor([6], dtype=torch.long).unsqueeze(0).to(device)
decoded_tokens = []
# for문을 도는 것 == 디코더의 각 시점
for _ in range(max_output_len):
output, hidden, cell = model.decoder(decoder_input, hidden, cell)
# 소프트맥스 회귀를 수행. 예측 단어의 인덱스
output_token = output.argmax(dim=-1).item()
# 종료 토큰 <eos>
if output_token == 7:
break
# 각 시점의 단어(정수) decoded_tokens에 누적하였다가 최종 번역 시퀀스로 리턴합니다.
decoded_tokens.append(output_token)
# 현재 시점의 예측. 다음 시점의 입력으로 사용된다.
decoder_input = torch.tensor([output_token], dtype=torch.long).unsqueeze(0).to(device)
print(decoded_tokens[0])
trg_tokens = [int_to_tar_token[token] for token in decoded_tokens]
return ''.join(token if token.startwith('#') else token+' ' for token in trg_tokens)
평가(test)
for seq_index in [3, 50, 100, 300, 1001]:
input_seq = valid_df['src_in'].iloc[seq_index]
translated_text = decode_sequence(input_seq, model, src_vocab_size, tar_vocab_size, 20, index_to_src, index_to_tar)
print("입력문장 :",valid_df['원문'].iloc[seq_index])
print("정답문장 :",valid_df['번역문'].iloc[seq_index])
print("번역문장 :",translated_text)
print("-"*50)
Tags:
AI개발_교육