본문 바로가기

코딩 -/인공지능

RNN의 모델구조만 보고 RNN을 직접 만들어보자.

반응형

RNN의 모델구조는 다음과 같다.

$x_0$ ~ $x_t$는 각 글자이다. $x_0$의경우 첫 글자가 되는 것이고 $h_t$와 $y_t$는 다음과 같이 계산된다. 

 

$h_t = \tanh\left( \mathbf{W}_h \cdot \left[ h_{t-1}, x_t \right] + \mathbf{b}_h \right)$

$y_t = \text{softmax}\left( \mathbf{W}_y \cdot h_t + \mathbf{b}_y \right)$

 

즉 $x_t$와 $h_{t-1}$를 합쳐서 $W_h$에 곱해주고, $\tanh$ 에 넣어준 값이 $h_t$ 이고, 여기서 나온  $h_t$ 를 $W_y$에 곱해준값이 출력 $y_t$ 인 것이다.

여기서 $h_t$ 는 그다음 $W_{h+1}$과 곱해지는 $x_{t+1}$과 합쳐져 입력으로 들어가게 된다. 

 

 

이를 구현해 보기 전에 기본적인 텍스트의 인풋을 간단하게 다음과 같이 정의한다. 

 

문장들의 리스트명을 sentences라 가정. 모든 문장을 모아 다음과 같은 처리를 한다. 

word_set = set([word for sentence in sentences for word in sentence.split()])
vocap_size = len(word_set)
idx_to_word =  {idx: word for idx, word in enumerate(word_set)}
word_to_idx = {word:idx for idx, word in enumerate(word_set)}

 

여기서 vocap_size가 충분히 크기 때문에, 원핫인코딩을 사용할 수 없다. 

그래서 등장한 개념이 embedding이다. 

embedding 은 간단히 각 word를 수용가능한 크기의 벡터로 변환시켜 주도록 학습이 되는 개념으로 보면 되고, 학습이 진행되면서 이 embedding 벡터 또한 같이 학습이 된다. (자세한 개념은 더 자세히 찾아보길 바람)

 

 

그럼 대충 위의 수식 구조를 가지고 모델을 설계해 보자 

class RNN(nn.Module):
  def __init__(self, hidden_size, input_size, vocab_size):
    super(RNN, self).__init__()
    self.embedding = nn.Embedding(vocab_size, input_size)
    self.W_h = nn.Linear(input_size + hidden_size, hidden_size)
    self.W_y = nn.Linear(hidden_size, vocab_size)
    self.tanh = nn.Tanh()
    self.softmax = nn.Softmax(dim=1)

  def forward(self, x, b_ht):
    x = self.embedding(x)
    ht = self.W_h(torch.cat((x, b_ht), dim=1))
    ht = self.tanh(ht)
    yt = self.W_y(ht)
    yt = self.softmax(yt)
    return yt, ht

 

대충 수식에 맞춘다고 맞췄는데 문제가 생겼다.

RNN의 경우는 문장을 그대로 넣고 다음 문장을 내뱉는 구조가 아니라, 문장의 첫 단어부터 시작해서 예측 이전단어까지 순서대로 연산을 진행하고 체이닝이 되어야 하기 때문이다. 

 

따라서 내가 작성한 forward는 forward 함수라기보단 forward에서 진행하는 각 단어단위의 작업이라고 보는 게 맞을 것 같다.  (이래서 트랜스포머가 나온 거구나.. 연속적으로 진행돼 야해서 비효율적임)

 

작명을 둘째치고 함수를 완성해 보자.

 

class RNN(nn.Module):
  def __init__(self, hidden_size, input_size, vocab_size):
    super(RNN, self).__init__()
    self.embedding = nn.Embedding(vocab_size, input_size)
    self.W_h = nn.Linear(input_size + hidden_size, hidden_size)
    self.W_y = nn.Linear(hidden_size, vocab_size)
    self.tanh = nn.Tanh()
    self.softmax = nn.Softmax(dim=1)

  def fd(self, x, b_ht):
    x = self.embedding(x)
    ht = self.W_h(torch.cat((x, b_ht), dim=1))
    ht = self.tanh(ht)
    yt = self.W_y(ht)
    yt = self.softmax(yt)
    return yt, ht


  def forward(self, x, b_ht):
    b_ht = None
    for i in range(x.size(0)):
      yt, ht = self.fd(x[i], b_ht)
      b_ht = ht
    return yt, ht

 

x가 문장이라면 x의 문장의 길이만큼 for 문을 돌면서 한 단어씩 아까 정의한 함수에 넣고 b_ht를 받아오려고 했다. 

여기서 다시 두 가지 문제가 생긴다. 

x는 단일 문장이 아니라 배치를 기대한다. 

 

그렇다면 x라는 문장의 단어수만큼 어떠한 연산을 진행하기 어렵다는 것이다. (배치 내의 각 문장은 길이가 다르기 때문)

또 하나는 b_ht를 처음 초기화해줘야 하는데 0 벡터로 초기화하려고 했더니 형상이 기존 ht과 같으면서 배치 사이즈를 가져야 하기 때문에 배치사이즈 x hidden_size  (ht의 사이즈)의 행렬이 되어야 해서 RNN을 초기화할 때 batch_size를 받아와야 했다. (이는 간단히 추가)

 

남은 문제는 배치 내 문장들마다 다른 단어수를 맞혀줘야 한다는 건데 이건 찾아보니 PAD 등을 넣는다고 한다.

 

그럼 또 궁금한 점은 가장 긴 문장을 기준으로 PAD를 넣을 수 없지 않은가..

그래서 이걸 해결하려면 문장들을 단어수 기준으로 정렬해서 비슷한 단어수의 문장끼리 배치를 묶이도록 묶어준다는 것..! 

 

(Padding 이 배치 때문에 나온 개념이 자연스럽게 이해되고 문제점과 효율성의 장단점이 이해됨..!)

 

잠시 패딩처리와 관련해서 텍스트데이터 로드, 전처리 코드는 아래와 같다

from sklearn.datasets import fetch_20newsgroups

# 20 Newsgroups 데이터셋 불러오기 (텍스트만 로드)
newsgroups_train = fetch_20newsgroups(subset='train', remove=('headers', 'footers', 'quotes'))

# 데이터 확인
print(f"Number of documents: {len(newsgroups_train.data)}")

sentences = []
# 10개 문서 출력
for i, doc in enumerate(newsgroups_train.data):
    sentences.append(doc)

sentence_list = []

for sentence in sentences:
  sentence.replace('\n', '.')
  while '..' in sentence:
    sentence = sentence.replace('..', '.')
  sentence_list.extend(sentence.split('.'))

sentence_list = [sentence.strip() for sentence in sentence_list if sentence.strip() != '']
sentences = sentence_list

word_set = set([word for sentence in sentences for word in sentence.split()])
vocab_size = len(word_set)
idx_to_word =  {idx+1: word for idx, word in enumerate(word_set)}
word_to_idx = {word:idx+1 for idx, word in enumerate(word_set)}

# 이제 단어를 idx 리스트로 만들고 단어가 너무 적은 문장은 제외하자. 
word_idx_inputs = [[word_to_idx[word] for word in sentence.split()] for sentence in sentences]
word_idx_inputs = [inputs for inputs in word_idx_inputs if len(inputs)>=5 and len(inputs) <=15]

# 각 문장당 단어수를 기준으로 정렬해주자 
word_idx_inputs = sorted(word_idx_inputs, key=len)

x = []
for i in range(len(word_idx_inputs)):
  x.append(word_idx_inputs[i][:-1])

y = []
for i in range(len(word_idx_inputs)):
  y.append(word_idx_inputs[i][1:]) 

X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)

# 먼저 X_train과 y_train을 함께 묶습니다.
train_data = list(zip(X_train, y_train))
test_data = list(zip(X_test, y_test))

# 길이를 기준으로 train_data를 정렬합니다.
train_data = sorted(train_data, key=lambda pair: len(pair[0]))
test_data = sorted(test_data, key=lambda pair: len(pair[0]))

# 정렬된 데이터를 다시 X_train과 y_train으로 나눕니다.
X_train, y_train = zip(*train_data)
X_test, y_test = zip(*test_data)

# 리스트로 변환 (zip 객체는 튜플이기 때문에 리스트로 변환해 줍니다)
X_train = list(X_train)
y_train = list(y_train)
X_test = list(X_test)
y_test = list(y_test)

padding_value = 0

# 미리 배치단위로 패딩 토큰을 처리해주자
def pad_sequences(sequences, max_len, padding_value=padding_value, curr_idx=None):
    padded_sequences = []
    for seq in sequences:
        if len(seq) < max_len:
          seq = seq + [padding_value] * (max_len - len(seq))
        padded_sequences.append(seq)
    return padded_sequences

batch_size = 64

def get_padded_word_idx_inputs(before_inputs):
  padded_word_idx_inputs = []
  for i in range(0, len(before_inputs), batch_size):
      batch = before_inputs[i:i + batch_size]
      max_len = max([len(seq) for seq in batch])  # 배치 내 최대 길이 계산
      padded_batch = pad_sequences(batch, max_len,padding_value, i)  # 배치 내 최대 길이로 패딩
      padded_word_idx_inputs.extend(padded_batch)

      check_len = len(padded_batch[0])
      for seq in padded_batch:
        assert len(seq) == check_len
  return padded_word_idx_inputs

X_train = get_padded_word_idx_inputs(X_train)
X_test = get_padded_word_idx_inputs(X_test)
y_train = get_padded_word_idx_inputs(y_train)
y_test = get_padded_word_idx_inputs(y_test)

 

#Todo 문장단위 정렬 후 데이터 로더의 Shuffle을 끄게 되면 학습의 악영향이 있을 수 있다고 한다. 동적인 패딩처리를 고려해 보자. 

 

전처리가 조금 복잡한데 pad 토큰을 추가하기 위해서 조금 복잡해졌다.

이렇게 처리하고 나면 각 배치별로 인풋 길이를 맞춰줄 수 있다.

 

또한 dataloader에 넣는 코드

class TextDataset(Dataset):
  def __init__(self, x, y):
    self.x = x
    self.y = y

  def __len__(self):
    return len(self.x)

  def __getitem__(self, idx):
    return torch.tensor(self.x[idx]) ,torch.tensor(self.y[idx])
    
train_dataset = TextDataset(X_train, y_train)
test_dataset = TextDataset(X_test, y_test)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

 

다시 모델로 돌아와서

 

class RNN(nn.Module):
  def __init__(self, hidden_size, input_size, vocab_size, batch_size):
    super(RNN, self).__init__()
    self.embedding = nn.Embedding(vocab_size, input_size)
    self.W_h = nn.Linear(input_size + hidden_size, hidden_size)
    self.W_y = nn.Linear(hidden_size, vocab_size)
    self.tanh = nn.Tanh()
    self.softmax = nn.Softmax(dim=1)
    self.batch_size = batch_size
    self.hidden_size = hidden_size

  def fw(self, x, b_ht):
    x = self.embedding(x)
    b_ht = b_ht.to(x.device)  
    ht = self.W_h(torch.cat((x, b_ht), dim=1))
    ht = self.tanh(ht)
    yt = self.W_y(ht)
   # yt = self.softmax(yt)
    return yt, ht

  # 입력 문장의 길이가 비슷한 배치가 섞여왔을것이라고 가정하고 진행.
  def forward(self, x):
    max_length = len(x[0])
    b_ht = torch.zeros(len(x), self.hidden_size)
    b_ht = b_ht.to(x.device)
    outputs = []

    for t in range(max_length):
      x_t = x[:,t]
      mask = (x_t != 0).float().unsqueeze(1)  # 패딩 여부에 따른 마스크 계산 (배치 크기, 1)
      yt, ht = self.fw(x[:,t], b_ht)

      b_ht = ht * mask + b_ht * (1 - mask)
      outputs.append(yt.unsqueeze(1))

    outputs = torch.cat(outputs, dim=1)
    return outputs

 

forward 부분을 보면 b_ht는 x의 len을 따라가고 (batch_size로 하니까 마지막 배치에서 에러남.. 개수가 부족해서)

첫 단어에는 0로 초기화해 준다.

그리고 여기서도 패딩여부에 따른 마스크 계산을 하는데 0이 아니면 1 0이면 0으로 만들어서 마지막에 이전벡터를 갱신할지, 또는 그대로 둘 지를 정한다.

 

여기까지 했으면 loss함수를 따로 정의해야 한다. 

 

criterion = nn.CrossEntropyLoss(reduction='none', ignore_index=0)  # 패딩 값을 무시하도록 설정

# 손실 함수 계산
def compute_loss(outputs, targets):
    # targets는 시퀀스 전체에 대해 각 타임스텝에서의 정답 레이블을 포함
    # outputs의 shape: [batch_size, seq_len, vocab_size]
    # targets의 shape: [batch_size, seq_len]

    mask = (targets != padding_value).float()  # padding_token에 해당하는 부분을 0으로
    # 타임스텝별로 손실 계산
    loss = 0
    for t in range(targets.size(1)):
        output_t = outputs[:, t, :]  # [batch_size, vocab_size]
        target_t = targets[:, t]  # [batch_size]
        mask_t = mask[:, t]  # [batch_size]

        loss_t = criterion(output_t, target_t)
        loss += (loss_t * mask_t).sum()  # 마스킹된 손실 합산

    return loss / mask.sum()

 

여기서도 패딩에 해당하는 부분들을 0으로 만들어주고

reduction = 'none'이라는 옵션을 주어서 배치 내 요소들의 개별 loss를 받아 mssk_t를 곱해줌으로써 마스킹된 것을 계산하지 않도록 만든다 

 

또한 ignore_index=0으로  y 레이블에 패딩이 들어가도 무시하도록 하였다. 

 

 

최종적으로 아래는 학습 코드이다.

model = RNN(256, 32, vocab_size, batch_size)

def train(train_loader, model, loss_fn, optimizer):
    model.train()

    size = len(train_loader.dataset)

    for batch, (X, y) in enumerate(train_loader):
        try:
          X, y = X.to(DEVICE), y.to(DEVICE)
        except:
          print(f"x:{x}")
          print(f"y:{y}")
          continue
        pred = model(X)

        # 손실 계산
        loss = loss_fn(pred, y)

        # 역전파
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f'loss: {loss:>7f}  [{current:>5d}]/{size:5d}')
            
            
def test(test_loader, model, loss_fn, optimizer):
    model.eval()

    size = len(test_loader.dataset)
    num_batches = len(test_loader)
    test_loss, correct = 0, 0
    total_elements = 0

    with torch.no_grad():
      for batch, (X, y) in enumerate(test_loader):
          try:
            X, y = X.to(DEVICE), y.to(DEVICE)
          except:
            print(f"x:{x}")
            print(f"y:{y}")
          continue
          pred = model(X)

          # 손실 계산
          loss = loss_fn(pred, y)
          test_loss += loss.item()
          mask = (y != padding_value).float()  
           # correct 계산
          pred_labels = pred.argmax(dim=2)  # 시퀀스 길이를 기준으로 argmax 계산
          correct += ((pred_labels == y) * mask).sum().item()  # 패딩된 부분 무시하고 정확도 계산


    test_loss /= num_batches
    correct /= total_elements  
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:8f}\n")
    return test_loss
    
    import torch.optim as optim
from torch.optim.lr_scheduler import StepLR

EPOCH = 10
BATCH_SIZE = batch_size
LEARNING_RATE = 1e-3
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(DEVICE)
model = model.to(DEVICE)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
# StepLR 스케줄러 정의
scheduler = StepLR(optimizer, step_size=8, gamma=0.2)
print(model)

 

학습이 잘되는 거 같지 않고 문제도 다시 찾아야 하지만 

RNN 구조를 직접 만들면서 언어모델에서의 패딩처리나 masking 처리가 왜 필요하고 어떤 식으로 이를 무시하게 할지 고민해 보는 게 좋은 경험이 된 것 같다

또한 RNN 모델의 구조적인 이해를 조금 더 깊이 한 것 같다. 

 

코랩 링크

https://colab.research.google.com/drive/1hjD7SEvFxlrN4HSlUK_sPzq7oX7DLE4E?usp=sharing

반응형