RNN Seq2Seq

Seq2Seq 이론

out, put = LSTM(x) 예시

  • Many to One
    • out[ : , -1, : ]

  • Many to Many
    • out[ : , : , : ]

  • Many to Many
    • 모두 순차 입력 > Context vector > 모두 순차 출력
    • Context vector
      • 입력을 모두 받은 뒤에 처음으로 나가는 하나의 벡터
      • 메모리 기능을 한다.
    • hidden(short term), cell(long term) > Context vector > out[ : , : , : ]
    • 주의할 점은 LSTM과 같이 hidden과 cell 2개가 나오는 경우에 out[ : , -1, : ] 을 사용할 수 없다는 것이다.
      • LSTM은 일반적으로 입력인자를 2개 받는다. 디폴트는 (토큰, 0) 

데이터 예시

  1. 한글 입력 데이터:
    [나는], [학생], [이다], [<eos>]
    <eos> = 끝신호 토큰, recurrent 과정에서 이게 나오면 끝.

  2. 영어 입력 데이터(학습용):
    [<sos>], [I], [am], [a], [student]
    학습할 때만 사용한다. 첫 단어가 'I'라는 것을 알려준다.
    teacher forcing: 강제적 교사

  3. 영어 정답 데이터:
    [I], [am], [a], [student], [<eos>]

인코더와 디코더

  • 인코더: 기존 LSTM, many to one과 동일

  • 디코더: 모두 순차출력
    • 첫번째 순차:
      • in: <sos> + ContextVector
      • out1: h1, c1
      • out2: h1 > fc(완전연결계층) > 예측값

    • 두번째 순차: 
      • in:
        • 학습시: 첫번째 정답 예측값을 전달해준다.
          (첫번째부터 오답을 하면, 다음 시퀀스의 학습이 의미 없기 때문)
        • 검증시: out2(예측값)

  • RNN은 seq가 길어지면, 단기기억력(기울기 소실) 이슈가 존재한다.
    Seq2Seq는 Many to one 과 Many to Many 이기 때문에 Many to one의 ContextVector가 잘못 나올 가능성이 높다. 

  • 즉, RNN에서 LSTM으로 바뀌었다고 해도 기울기 소실 문제를 완전히 해결하지 못한 것처럼, Seq2Seq도 같은 문제를 갖고 있다. 그래서 이를 보완하고자 Attention을 사용한다.


Seq2Seq 실습

1. 토큰화 (아래 코드에서는 입력에 따로 <eos>를 넣지 않았다.)

# 원문(한국어) 토큰화
def preprocess_src(text):
    tokens = tokenizer.tokenize(text)
    return tokens

# 번역문(영어) 입력, 출력 토큰화

def preprocess_trg_in(text):
  text = ['<sos>'] + tokenizer.tokenize((text))
  return text

def preprocess_trg_out(text):
  text = tokenizer.tokenize((text)) + ['<eos>']
  return text

2. 단어집합

def build_vocab(sents):
  word_list = []

  for sent in sents:
      for word in sent:
        word_list.append(word)

  # 각 단어별 등장 빈도를 계산하여 등장 빈도가 높은 순서로 정렬
  # Counter는 단어를 키(key), 등장 빈도를 값(value)으로 가지도록 반환
  word_counts = Counter(word_list)
  vocab = sorted(word_counts, key=word_counts.get, reverse=True)

  word_to_index = {}
  word_to_index['<PAD>'] = 0
  word_to_index['<UNK>'] = 1

  # 등장 빈도가 높은 단어일수록 낮은 정수를 부여
  for index, word in enumerate(vocab) :
    if word != '<PAD>':
        word_to_index[word] = index + 2

  return word_to_index

word to index (src, tar)
src_vocab = build_vocab(src_input)
tar_vocab = build_vocab(trg_input + trg_output)

index to word (src, tar)
index_to_src = {v: k for k, v in src_vocab.items()}
index_to_tar = {v: k for k, v in tar_vocab.items()}

3. Seq 숫자 정규화

# 원문의 문장별 토큰 길이에 대한 집계함수 출력
new_df['src_in_len'].describe()

# src_in_len이 23보다 큰 행 제거
new_df = new_df[new_df['src_in_len'] <= 23].reset_index(drop=True)

4. 패딩처리

# 원문과 번역문 최대 길이 확인

src_length= int(new_df['src_in_len'].max())
trg_length = int(new_df['trg_in_len'].max())
src_length, trg_length

원문 패딩
# 원문 패딩 및 인코딩
def src_encoding(tokens):
    # (src_length - 토큰 수) 만큼 패딩토큰 추가
    tokens = tokens + ['<PAD>'] *  (src_length - len(tokens))

    # 인코딩 된 숫자를 담아 둘 리스트
    index_sequences = []

    # 문장에서 토큰을 꺼내오며
    for word in tokens:
      try: # 토큰 인코딩 (단어 사전에 없는 토큰이 들어오면 except로)
          index_sequences.append(src_vocab[word])
      except KeyError: # 단어 사전에 없는 토큰이 들어오면 '<UNK>' 토큰의 숫자로 변환
          index_sequences.append(src_vocab['<UNK>'])

    return index_sequences

번역문 (input: <sos>, , )
# 번역문 패딩 및 인코딩
def trg_encoding(tokens):
    tokens = tokens + ['<PAD>'] * (trg_length - len(tokens))
    index_sequences = []
    for word in tokens:
      try:
          index_sequences.append(tar_vocab[word])
      except KeyError:
          index_sequences.append(tar_vocab['<UNK>'])

    return index_sequences

5. 데이터셋 준비

from sklearn.model_selection import train_test_split

# 데이터프레임에서 무작위로 80%를 훈련 데이터로, 20%를 검증 데이터로 분할
train_df, valid_df = train_test_split(new_df, test_size=0.2, random_state=42)

# 결과 확인
print(f"훈련 데이터 크기: {len(train_df)}")
print(f"검증 데이터 크기: {len(valid_df)}")

텐서화(TranslationDataset), 데이터로더(DataLoader)
# 하이퍼파라미터 설정
BATCH_SIZE = 256

# 훈련 데이터셋 및 데이터로더
train_dataset = TranslationDataset(train_df)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# 검증 데이터셋 및 데이터로더
valid_dataset = TranslationDataset(valid_df)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False)

6. 인코더 모델 정의

import torch
import torch.nn as nn
import torch.optim as optim

embedding_dim = 256
hidden_units = 512

class Encoder(nn.Module):
    def __init__(self, src_vocab_size, embedding_dim, hidden_units):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(src_vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_units, num_layers=1, batch_first=True)

    def forward(self, x):
        # 입력 데이터 차원 : (batch_size, seq_len, embedding_dim)
        x = self.embedding(x)

        # hidden.shape : (1, batch_size, hidden_units), cell.shape : (1, batch_size, hidden_units)
        _, (hidden, cell) = self.lstm(x)

        # 인코더의 출력은 hidden state, cell state
        return hidden, cell


7. 디코더 모델 정의

class Decoder(nn.Module):
    def __init__(self, tar_vocab_size, embedding_dim, hidden_units):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(tar_vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_units, num_layers=1, batch_first=True)
        self.fc = nn.Linear(hidden_units, tar_vocab_size)

    def forward(self, x, hidden, cell):

        # x.shape : (batch_size, seq_len, embedding_dim)
        x = self.embedding(x)

        # 디코더의 LSTM으로 인코더의 hidden state, cell state를 전달.
        # output.shape : (batch_size, seq_len, hidden_units)
        # hidden.shape : (1, batch_size, hidden_units)
        # cell.shape : (1, batch_size, hidden_units)
        output, (hidden, cell) = self.lstm(x, (hidden, cell))

        # output.shape: (batch_size, seq_len, tar_vocab_size)
        output = self.fc(output)

        # 디코더의 출력은 예측값, hidden state, cell state
        return output, hidden, cell

8. Seq2Seq = 인코더와 디코더 결합

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg):
        hidden, cell = self.encoder(src)

        # 훈련 중에는 디코더의 출력 중 오직 output만 사용한다.
        output, _, _ = self.decoder(trg, hidden, cell)
        return output

encoder = Encoder(src_vocab_size, embedding_dim, hidden_units)
decoder = Decoder(tar_vocab_size, embedding_dim, hidden_units)
model = Seq2Seq(encoder, decoder)

# ignore_index는 손실 계산에서 특정 인덱스를 무시하도록 설정하는 파라미터
loss_function = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters())

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(3488, 256, padding_idx=0)
    (lstm): LSTM(256, 512, batch_first=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(11971, 256, padding_idx=0)
    (lstm): LSTM(256, 512, batch_first=True)
    (fc): Linear(in_features=512, out_features=11971, bias=True)
  )
)

9. 훈련과정에서 검증함수

def evaluation(model, dataloader, loss_function, device):
    model.eval()
    total_loss = 0.0
    total_correct = 0
    total_count = 0

    with torch.no_grad():
        for encoder_inputs, decoder_inputs, decoder_targets in dataloader:
            encoder_inputs = encoder_inputs.to(device)
            decoder_inputs = decoder_inputs.to(device)
            decoder_targets = decoder_targets.to(device)

            # 순전파
            # outputs.shape : (batch_size, seq_len, tar_vocab_size)
            outputs = model(encoder_inputs, decoder_inputs)

            # 손실 계산
            # outputs.view(-1, outputs.size(-1))의 shape는 (batch_size * seq_len, tar_vocab_size)
            # decoder_targets.view(-1)의 shape는 (batch_size * seq_len)
            loss = loss_function(outputs.view(-1, outputs.size(-1)), decoder_targets.view(-1))
            total_loss += loss.item()

            # 정확도 계산 (패딩 토큰 제외)
            mask = (decoder_targets != 0)
            # outputs.argmax(dim=-1) : 각 시퀀스의 각 타임스텝에서, 모델이 가장 높은 확률로 예측한 단어의 인덱스를 반환 ->(batch_size, seq_len)
            total_correct += ((outputs.argmax(dim=-1) == decoder_targets) * mask).sum().item()
            # 패딩 토큰을 제외한 전체 정답 토큰의 개수 체크
            total_count += mask.sum().item()

    return total_loss / len(dataloader), total_correct / total_count


10. 훈련 (검증 함수를 사용하기 위해, 훈련 반복문이 끝나고 검증 값을 구한다.)

# Training loop
best_val_loss = 99999 # 최대한 큰 값으로 초기값 설정

for epoch in range(num_epochs):
    start_time = time.time()
    # 훈련 모드
    model.train()

    for encoder_inputs, decoder_inputs, decoder_targets in train_loader:
        encoder_inputs = encoder_inputs.to(device)
        decoder_inputs = decoder_inputs.to(device)
        decoder_targets = decoder_targets.to(device)

        # 기울기 초기화
        optimizer.zero_grad()

        # 순방향 전파
        # outputs.shape == (batch_size, seq_len, tar_vocab_size)
        outputs = model(encoder_inputs, decoder_inputs)

        # 손실 계산 및 역방향 전파
        # outputs.view(-1, outputs.size(-1))의 shape는 (batch_size * seq_len, tar_vocab_size)
        # decoder_targets.view(-1)의 shape는 (batch_size * seq_len)
        loss = loss_function(outputs.view(-1, outputs.size(-1)), decoder_targets.view(-1))
        loss.backward()

        # 가중치 업데이트
        optimizer.step()

    train_loss, train_acc = evaluation(model, train_loader, loss_function, device)
    valid_loss, valid_acc = evaluation(model, valid_loader, loss_function, device)

    # Epoch 종료 시간 기록 및 소요 시간 계산
    end_time = time.time()
    epoch_time = end_time - start_time

    minutes = int(epoch_time // 60)
    seconds = int(epoch_time % 60)

    print(f'Epoch: {epoch+1}/{num_epochs} | Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} | Valid Loss: {valid_loss:.4f} | Valid Acc: {valid_acc:.4f}')
    print(f'소요시간 : {minutes}{seconds}초')
    print('')

    # valid_loss가 갱신될 때 체크포인트 저장
    if valid_loss < best_val_loss:
        print(f'Validation loss improved from {best_val_loss:.4f} to {valid_loss:.4f}. 체크포인트를 저장합니다.')
        best_val_loss = valid_loss
        torch.save(model.state_dict(), 'best_model_checkpoint.pth')


11. 학습된 모델 불러오기

# 모델 로드
model.load_state_dict(torch.load('best_model_checkpoint.pth'))

# 모델을 device에 올립니다.
model.to(device)

# 검증 데이터에 대한 정확도와 손실 계산
val_loss, val_accuracy = evaluation(model, valid_loader, loss_function, device)

print(f'Best model validation loss: {val_loss:.4f}')
print(f'Best model validation accuracy: {val_accuracy:.4f}')


12. 평가(Test)

# 원문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_src(input_seq):
  sentence = ''
  for encoded_word in input_seq:
    if(encoded_word != 0):
      sentence = sentence + index_to_src[encoded_word] + ' '
  return sentence

# 번역문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_tar(input_seq):
  sentence = ''
  for encoded_word in input_seq:
    if(encoded_word != 0 and encoded_word != tar_vocab['<sos>'] and encoded_word != tar_vocab['<eos>']):
      sentence = sentence + index_to_tar[encoded_word] + ' '
  return sentence

평가용 디코더 준비
def decode_sequence(input_seq, model, src_vocab_size, tar_vocab_size, max_output_len, int_to_src_token, int_to_tar_token):
    encoder_inputs = torch.tensor(input_seq, dtype=torch.long).unsqueeze(0).to(device)

    # 인코더의 초기 상태 설정
    hidden, cell = model.encoder(encoder_inputs)

    # 시작 토큰 <sos>을 디코더의 첫 입력으로 설정
    # unsqueeze(0)는 배치 차원을 추가하기 위함.
    decoder_input = torch.tensor([6], dtype=torch.long).unsqueeze(0).to(device)

    decoded_tokens = []

    # for문을 도는 것 == 디코더의 각 시점
    for _ in range(max_output_len):
        output, hidden, cell = model.decoder(decoder_input, hidden, cell)

        # 소프트맥스 회귀를 수행. 예측 단어의 인덱스
        output_token = output.argmax(dim=-1).item()

        # 종료 토큰 <eos>
        if output_token == 7:
            break

        # 각 시점의 단어(정수) decoded_tokens에 누적하였다가 최종 번역 시퀀스로 리턴합니다.
        decoded_tokens.append(output_token)

        # 현재 시점의 예측. 다음 시점의 입력으로 사용된다.
        decoder_input = torch.tensor([output_token], dtype=torch.long).unsqueeze(0).to(device)
        print(decoded_tokens[0])
        trg_tokens = [int_to_tar_token[token] for token in decoded_tokens]
    return ''.join(token if token.startwith('#') else token+' ' for token in trg_tokens)


평가(test)
for seq_index in [3, 50, 100, 300, 1001]:
  input_seq = valid_df['src_in'].iloc[seq_index]
  translated_text = decode_sequence(input_seq, model, src_vocab_size, tar_vocab_size, 20, index_to_src, index_to_tar)

  print("입력문장 :",valid_df['원문'].iloc[seq_index])
  print("정답문장 :",valid_df['번역문'].iloc[seq_index])
  print("번역문장 :",translated_text)
  print("-"*50)



댓글 쓰기

다음 이전