RNN의 모델구조는 다음과 같다.
$x_0$ ~ $x_t$는 각 글자이다. $x_0$의경우 첫 글자가 되는 것이고 $h_t$와 $y_t$는 다음과 같이 계산된다.
$h_t = \tanh\left( \mathbf{W}_h \cdot \left[ h_{t-1}, x_t \right] + \mathbf{b}_h \right)$
$y_t = \text{softmax}\left( \mathbf{W}_y \cdot h_t + \mathbf{b}_y \right)$
즉 $x_t$와 $h_{t-1}$를 합쳐서 $W_h$에 곱해주고, $\tanh$ 에 넣어준 값이 $h_t$ 이고, 여기서 나온 $h_t$ 를 $W_y$에 곱해준값이 출력 $y_t$ 인 것이다.
여기서 $h_t$ 는 그다음 $W_{h+1}$과 곱해지는 $x_{t+1}$과 합쳐져 입력으로 들어가게 된다.
이를 구현해 보기 전에 기본적인 텍스트의 인풋을 간단하게 다음과 같이 정의한다.
문장들의 리스트명을 sentences라 가정. 모든 문장을 모아 다음과 같은 처리를 한다.
word_set = set([word for sentence in sentences for word in sentence.split()])
vocap_size = len(word_set)
idx_to_word = {idx: word for idx, word in enumerate(word_set)}
word_to_idx = {word:idx for idx, word in enumerate(word_set)}
여기서 vocap_size가 충분히 크기 때문에, 원핫인코딩을 사용할 수 없다.
그래서 등장한 개념이 embedding이다.
embedding 은 간단히 각 word를 수용가능한 크기의 벡터로 변환시켜 주도록 학습이 되는 개념으로 보면 되고, 학습이 진행되면서 이 embedding 벡터 또한 같이 학습이 된다. (자세한 개념은 더 자세히 찾아보길 바람)
그럼 대충 위의 수식 구조를 가지고 모델을 설계해 보자
class RNN(nn.Module):
def __init__(self, hidden_size, input_size, vocab_size):
super(RNN, self).__init__()
self.embedding = nn.Embedding(vocab_size, input_size)
self.W_h = nn.Linear(input_size + hidden_size, hidden_size)
self.W_y = nn.Linear(hidden_size, vocab_size)
self.tanh = nn.Tanh()
self.softmax = nn.Softmax(dim=1)
def forward(self, x, b_ht):
x = self.embedding(x)
ht = self.W_h(torch.cat((x, b_ht), dim=1))
ht = self.tanh(ht)
yt = self.W_y(ht)
yt = self.softmax(yt)
return yt, ht
대충 수식에 맞춘다고 맞췄는데 문제가 생겼다.
RNN의 경우는 문장을 그대로 넣고 다음 문장을 내뱉는 구조가 아니라, 문장의 첫 단어부터 시작해서 예측 이전단어까지 순서대로 연산을 진행하고 체이닝이 되어야 하기 때문이다.
따라서 내가 작성한 forward는 forward 함수라기보단 forward에서 진행하는 각 단어단위의 작업이라고 보는 게 맞을 것 같다. (이래서 트랜스포머가 나온 거구나.. 연속적으로 진행돼 야해서 비효율적임)
작명을 둘째치고 함수를 완성해 보자.
class RNN(nn.Module):
def __init__(self, hidden_size, input_size, vocab_size):
super(RNN, self).__init__()
self.embedding = nn.Embedding(vocab_size, input_size)
self.W_h = nn.Linear(input_size + hidden_size, hidden_size)
self.W_y = nn.Linear(hidden_size, vocab_size)
self.tanh = nn.Tanh()
self.softmax = nn.Softmax(dim=1)
def fd(self, x, b_ht):
x = self.embedding(x)
ht = self.W_h(torch.cat((x, b_ht), dim=1))
ht = self.tanh(ht)
yt = self.W_y(ht)
yt = self.softmax(yt)
return yt, ht
def forward(self, x, b_ht):
b_ht = None
for i in range(x.size(0)):
yt, ht = self.fd(x[i], b_ht)
b_ht = ht
return yt, ht
x가 문장이라면 x의 문장의 길이만큼 for 문을 돌면서 한 단어씩 아까 정의한 함수에 넣고 b_ht를 받아오려고 했다.
여기서 다시 두 가지 문제가 생긴다.
x는 단일 문장이 아니라 배치를 기대한다.
그렇다면 x라는 문장의 단어수만큼 어떠한 연산을 진행하기 어렵다는 것이다. (배치 내의 각 문장은 길이가 다르기 때문)
또 하나는 b_ht를 처음 초기화해줘야 하는데 0 벡터로 초기화하려고 했더니 형상이 기존 ht과 같으면서 배치 사이즈를 가져야 하기 때문에 배치사이즈 x hidden_size (ht의 사이즈)의 행렬이 되어야 해서 RNN을 초기화할 때 batch_size를 받아와야 했다. (이는 간단히 추가)
남은 문제는 배치 내 문장들마다 다른 단어수를 맞혀줘야 한다는 건데 이건 찾아보니 PAD 등을 넣는다고 한다.
그럼 또 궁금한 점은 가장 긴 문장을 기준으로 PAD를 넣을 수 없지 않은가..
그래서 이걸 해결하려면 문장들을 단어수 기준으로 정렬해서 비슷한 단어수의 문장끼리 배치를 묶이도록 묶어준다는 것..!
(Padding 이 배치 때문에 나온 개념이 자연스럽게 이해되고 문제점과 효율성의 장단점이 이해됨..!)
잠시 패딩처리와 관련해서 텍스트데이터 로드, 전처리 코드는 아래와 같다
from sklearn.datasets import fetch_20newsgroups
# 20 Newsgroups 데이터셋 불러오기 (텍스트만 로드)
newsgroups_train = fetch_20newsgroups(subset='train', remove=('headers', 'footers', 'quotes'))
# 데이터 확인
print(f"Number of documents: {len(newsgroups_train.data)}")
sentences = []
# 10개 문서 출력
for i, doc in enumerate(newsgroups_train.data):
sentences.append(doc)
sentence_list = []
for sentence in sentences:
sentence.replace('\n', '.')
while '..' in sentence:
sentence = sentence.replace('..', '.')
sentence_list.extend(sentence.split('.'))
sentence_list = [sentence.strip() for sentence in sentence_list if sentence.strip() != '']
sentences = sentence_list
word_set = set([word for sentence in sentences for word in sentence.split()])
vocab_size = len(word_set)
idx_to_word = {idx+1: word for idx, word in enumerate(word_set)}
word_to_idx = {word:idx+1 for idx, word in enumerate(word_set)}
# 이제 단어를 idx 리스트로 만들고 단어가 너무 적은 문장은 제외하자.
word_idx_inputs = [[word_to_idx[word] for word in sentence.split()] for sentence in sentences]
word_idx_inputs = [inputs for inputs in word_idx_inputs if len(inputs)>=5 and len(inputs) <=15]
# 각 문장당 단어수를 기준으로 정렬해주자
word_idx_inputs = sorted(word_idx_inputs, key=len)
x = []
for i in range(len(word_idx_inputs)):
x.append(word_idx_inputs[i][:-1])
y = []
for i in range(len(word_idx_inputs)):
y.append(word_idx_inputs[i][1:])
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)
# 먼저 X_train과 y_train을 함께 묶습니다.
train_data = list(zip(X_train, y_train))
test_data = list(zip(X_test, y_test))
# 길이를 기준으로 train_data를 정렬합니다.
train_data = sorted(train_data, key=lambda pair: len(pair[0]))
test_data = sorted(test_data, key=lambda pair: len(pair[0]))
# 정렬된 데이터를 다시 X_train과 y_train으로 나눕니다.
X_train, y_train = zip(*train_data)
X_test, y_test = zip(*test_data)
# 리스트로 변환 (zip 객체는 튜플이기 때문에 리스트로 변환해 줍니다)
X_train = list(X_train)
y_train = list(y_train)
X_test = list(X_test)
y_test = list(y_test)
padding_value = 0
# 미리 배치단위로 패딩 토큰을 처리해주자
def pad_sequences(sequences, max_len, padding_value=padding_value, curr_idx=None):
padded_sequences = []
for seq in sequences:
if len(seq) < max_len:
seq = seq + [padding_value] * (max_len - len(seq))
padded_sequences.append(seq)
return padded_sequences
batch_size = 64
def get_padded_word_idx_inputs(before_inputs):
padded_word_idx_inputs = []
for i in range(0, len(before_inputs), batch_size):
batch = before_inputs[i:i + batch_size]
max_len = max([len(seq) for seq in batch]) # 배치 내 최대 길이 계산
padded_batch = pad_sequences(batch, max_len,padding_value, i) # 배치 내 최대 길이로 패딩
padded_word_idx_inputs.extend(padded_batch)
check_len = len(padded_batch[0])
for seq in padded_batch:
assert len(seq) == check_len
return padded_word_idx_inputs
X_train = get_padded_word_idx_inputs(X_train)
X_test = get_padded_word_idx_inputs(X_test)
y_train = get_padded_word_idx_inputs(y_train)
y_test = get_padded_word_idx_inputs(y_test)
#Todo 문장단위 정렬 후 데이터 로더의 Shuffle을 끄게 되면 학습의 악영향이 있을 수 있다고 한다. 동적인 패딩처리를 고려해 보자.
전처리가 조금 복잡한데 pad 토큰을 추가하기 위해서 조금 복잡해졌다.
이렇게 처리하고 나면 각 배치별로 인풋 길이를 맞춰줄 수 있다.
또한 dataloader에 넣는 코드
class TextDataset(Dataset):
def __init__(self, x, y):
self.x = x
self.y = y
def __len__(self):
return len(self.x)
def __getitem__(self, idx):
return torch.tensor(self.x[idx]) ,torch.tensor(self.y[idx])
train_dataset = TextDataset(X_train, y_train)
test_dataset = TextDataset(X_test, y_test)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
다시 모델로 돌아와서
class RNN(nn.Module):
def __init__(self, hidden_size, input_size, vocab_size, batch_size):
super(RNN, self).__init__()
self.embedding = nn.Embedding(vocab_size, input_size)
self.W_h = nn.Linear(input_size + hidden_size, hidden_size)
self.W_y = nn.Linear(hidden_size, vocab_size)
self.tanh = nn.Tanh()
self.softmax = nn.Softmax(dim=1)
self.batch_size = batch_size
self.hidden_size = hidden_size
def fw(self, x, b_ht):
x = self.embedding(x)
b_ht = b_ht.to(x.device)
ht = self.W_h(torch.cat((x, b_ht), dim=1))
ht = self.tanh(ht)
yt = self.W_y(ht)
# yt = self.softmax(yt)
return yt, ht
# 입력 문장의 길이가 비슷한 배치가 섞여왔을것이라고 가정하고 진행.
def forward(self, x):
max_length = len(x[0])
b_ht = torch.zeros(len(x), self.hidden_size)
b_ht = b_ht.to(x.device)
outputs = []
for t in range(max_length):
x_t = x[:,t]
mask = (x_t != 0).float().unsqueeze(1) # 패딩 여부에 따른 마스크 계산 (배치 크기, 1)
yt, ht = self.fw(x[:,t], b_ht)
b_ht = ht * mask + b_ht * (1 - mask)
outputs.append(yt.unsqueeze(1))
outputs = torch.cat(outputs, dim=1)
return outputs
forward 부분을 보면 b_ht는 x의 len을 따라가고 (batch_size로 하니까 마지막 배치에서 에러남.. 개수가 부족해서)
첫 단어에는 0로 초기화해 준다.
그리고 여기서도 패딩여부에 따른 마스크 계산을 하는데 0이 아니면 1 0이면 0으로 만들어서 마지막에 이전벡터를 갱신할지, 또는 그대로 둘 지를 정한다.
여기까지 했으면 loss함수를 따로 정의해야 한다.
criterion = nn.CrossEntropyLoss(reduction='none', ignore_index=0) # 패딩 값을 무시하도록 설정
# 손실 함수 계산
def compute_loss(outputs, targets):
# targets는 시퀀스 전체에 대해 각 타임스텝에서의 정답 레이블을 포함
# outputs의 shape: [batch_size, seq_len, vocab_size]
# targets의 shape: [batch_size, seq_len]
mask = (targets != padding_value).float() # padding_token에 해당하는 부분을 0으로
# 타임스텝별로 손실 계산
loss = 0
for t in range(targets.size(1)):
output_t = outputs[:, t, :] # [batch_size, vocab_size]
target_t = targets[:, t] # [batch_size]
mask_t = mask[:, t] # [batch_size]
loss_t = criterion(output_t, target_t)
loss += (loss_t * mask_t).sum() # 마스킹된 손실 합산
return loss / mask.sum()
여기서도 패딩에 해당하는 부분들을 0으로 만들어주고
reduction = 'none'이라는 옵션을 주어서 배치 내 요소들의 개별 loss를 받아 mssk_t를 곱해줌으로써 마스킹된 것을 계산하지 않도록 만든다
또한 ignore_index=0으로 y 레이블에 패딩이 들어가도 무시하도록 하였다.
최종적으로 아래는 학습 코드이다.
model = RNN(256, 32, vocab_size, batch_size)
def train(train_loader, model, loss_fn, optimizer):
model.train()
size = len(train_loader.dataset)
for batch, (X, y) in enumerate(train_loader):
try:
X, y = X.to(DEVICE), y.to(DEVICE)
except:
print(f"x:{x}")
print(f"y:{y}")
continue
pred = model(X)
# 손실 계산
loss = loss_fn(pred, y)
# 역전파
optimizer.zero_grad()
loss.backward()
optimizer.step()
if batch % 100 == 0:
loss, current = loss.item(), batch * len(X)
print(f'loss: {loss:>7f} [{current:>5d}]/{size:5d}')
def test(test_loader, model, loss_fn, optimizer):
model.eval()
size = len(test_loader.dataset)
num_batches = len(test_loader)
test_loss, correct = 0, 0
total_elements = 0
with torch.no_grad():
for batch, (X, y) in enumerate(test_loader):
try:
X, y = X.to(DEVICE), y.to(DEVICE)
except:
print(f"x:{x}")
print(f"y:{y}")
continue
pred = model(X)
# 손실 계산
loss = loss_fn(pred, y)
test_loss += loss.item()
mask = (y != padding_value).float()
# correct 계산
pred_labels = pred.argmax(dim=2) # 시퀀스 길이를 기준으로 argmax 계산
correct += ((pred_labels == y) * mask).sum().item() # 패딩된 부분 무시하고 정확도 계산
test_loss /= num_batches
correct /= total_elements
print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:8f}\n")
return test_loss
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
EPOCH = 10
BATCH_SIZE = batch_size
LEARNING_RATE = 1e-3
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(DEVICE)
model = model.to(DEVICE)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
# StepLR 스케줄러 정의
scheduler = StepLR(optimizer, step_size=8, gamma=0.2)
print(model)
학습이 잘되는 거 같지 않고 문제도 다시 찾아야 하지만
RNN 구조를 직접 만들면서 언어모델에서의 패딩처리나 masking 처리가 왜 필요하고 어떤 식으로 이를 무시하게 할지 고민해 보는 게 좋은 경험이 된 것 같다
또한 RNN 모델의 구조적인 이해를 조금 더 깊이 한 것 같다.
코랩 링크
https://colab.research.google.com/drive/1hjD7SEvFxlrN4HSlUK_sPzq7oX7DLE4E?usp=sharing
'코딩 - > 인공지능' 카테고리의 다른 글
파이토치로 기본 사용법, 파이토치 입문 데이터 로드부터 mlp 모델까지 (0) | 2024.08.05 |
---|---|
Seaborn 사용법 Seaborn 그래프 종류별로 알아보기 파이썬 데이터 시각화 (1) | 2024.08.04 |
인공지능 용어 및 요약정리 (0) | 2021.12.13 |