import os

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split

from net.activation import Activations as af
from net.optimizer import Optimizers as opt
from net.loss import Loss


class MLP:
    def __init__(self, architecture, activations, optimizer, loss_function):
        # architecture: layer sizes, e.g. [784, 128, 10].
        # activations: one activation name per non-input layer, e.g.
        # ["ReLU", "Softmax"]; resolved against the Activations class.
        self.architecture = architecture
        self.activations = activations
        self.optimizer = self.select_optimizer(optimizer)
        self.loss_function = getattr(Loss, loss_function)

        self.params = self.init_params()
        self.activation_funcs = self.select_activations()

        # (train, validation) metric pairs and test results, for saving.
        self.acc_store = []
        self.loss_store = []
        self.test_results = []

    def init_params(self):
        # Small random weights and zero biases for each layer.
        params = {}
        for i in range(1, len(self.architecture)):
            params[f'W{i}'] = np.random.randn(self.architecture[i], self.architecture[i-1]) * 0.01
            params[f'b{i}'] = np.zeros((self.architecture[i], 1))
        return params

    def select_activations(self):
        return [getattr(af, activation) for activation in self.activations]

    def select_optimizer(self, optimizer_name):
        return getattr(opt, optimizer_name)

    def forward_prop(self, X):
        # X has shape (n_features, m); activations flow column-wise.
        A = X
        caches = []
        for i in range(1, len(self.architecture)):
            A_prev = A
            W = self.params[f'W{i}']
            b = self.params[f'b{i}']
            Z = np.dot(W, A_prev) + b
            A = self.activation_funcs[i-1](Z)
            # Cache the layer *input* (A_prev), not its output: the
            # backward pass unpacks it as A_prev to compute dW.
            caches.append((A_prev, W, b, Z))
        return A, caches

    def backward_prop(self, AL, Y, caches):
        grads = {}
        L = len(caches)

        # One-hot encode the integer labels so Y matches AL's
        # (num_classes, m) shape.
        Y = self.one_hot(Y)

        # For softmax with cross-entropy loss, the gradient w.r.t. the
        # last layer's pre-activation simplifies to AL - Y.
        dAL = AL - Y
        current_cache = caches[L-1]
        grads[f"dA{L}"], grads[f"dW{L}"], grads[f"db{L}"] = self.linear_activation_backward(
            dAL, current_cache, self.activation_funcs[L-1].__name__)

        for l in reversed(range(L-1)):
            current_cache = caches[l]
            dA_prev_temp, dW_temp, db_temp = self.linear_activation_backward(
                grads[f"dA{l+2}"], current_cache, self.activation_funcs[l].__name__)
            grads[f"dA{l+1}"] = dA_prev_temp
            grads[f"dW{l+1}"] = dW_temp
            grads[f"db{l+1}"] = db_temp

        return grads

    def one_hot(self, Y):
        num_classes = self.architecture[-1]
        # Map integer labels to one-hot columns of shape (num_classes, m),
        # matching the column-major layout of the activations.
        return np.eye(num_classes)[Y.reshape(-1).astype(int)].T

    def linear_activation_backward(self, dA, cache, activation):
        A_prev, W, b, Z = cache
        m = A_prev.shape[1]

        if activation == "Softmax":
            # dA already holds AL - Y, the softmax/cross-entropy gradient.
            dZ = dA
        elif activation == "ReLU":
            dZ = dA * af.ReLU_deriv(Z)
        else:
            raise ValueError(f"Backward propagation not implemented for {activation}")

        dW = 1 / m * np.dot(dZ, A_prev.T)
        db = 1 / m * np.sum(dZ, axis=1, keepdims=True)
        dA_prev = np.dot(W.T, dZ)

        return dA_prev, dW, db

    def get_predictions(self, A):
        return np.argmax(A, axis=0)

    def get_accuracy(self, predictions, Y):
        return np.mean(predictions == Y)

    def train(self, X, Y, alpha, iterations, validation_split=0.2):
        # X arrives as (n_features, m); sklearn splits along rows, so
        # transpose for the split and back again afterwards.
        X_train, X_val, Y_train, Y_val = train_test_split(X.T, Y, test_size=validation_split, shuffle=True, random_state=42)
        X_train, X_val = X_train.T, X_val.T

        # Ensure Y_train and Y_val are 1D label arrays
        Y_train = Y_train.ravel()
        Y_val = Y_val.ravel()

        for i in range(iterations):
            AL, caches = self.forward_prop(X_train)
            grads = self.backward_prop(AL, Y_train, caches)

            self.params = self.optimizer(self.params, grads, alpha)

            # Log and store metrics every 10 iterations.
            if i % 10 == 0:
                train_preds = self.get_predictions(AL)
                train_acc = self.get_accuracy(train_preds, Y_train)
                train_loss = self.loss_function(self.one_hot(Y_train), AL)

                AL_val, _ = self.forward_prop(X_val)
                val_preds = self.get_predictions(AL_val)
                val_acc = self.get_accuracy(val_preds, Y_val)
                val_loss = self.loss_function(self.one_hot(Y_val), AL_val)

                print(f"Iteration {i}")
                print(f"Training Accuracy: {train_acc:.4f}, Validation Accuracy: {val_acc:.4f}")
                print(f"Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")
                print("-------------------------------------------------------")

                self.acc_store.append((train_acc, val_acc))
                self.loss_store.append((train_loss, val_loss))

        return self.params

    def test(self, X_test, Y_test):
        AL, _ = self.forward_prop(X_test)
        predictions = self.get_predictions(AL)
        test_accuracy = self.get_accuracy(predictions, Y_test)
        test_loss = self.loss_function(self.one_hot(Y_test), AL)

        self.test_results.append((test_accuracy, test_loss))

        print(f"Test Accuracy: {test_accuracy:.4f}")
        print(f"Test Loss: {test_loss:.4f}")

    def save_model(self, dataset):
        weights_file = f"weights/{dataset}_{self.activation_funcs[0].__name__}_weights.npz"
        results_file = f"results/{dataset}_{self.activation_funcs[0].__name__}_results.csv"

        # Create the output directories if they do not exist yet.
        os.makedirs("weights", exist_ok=True)
        os.makedirs("results", exist_ok=True)

        np.savez(weights_file, **self.params)

        train_df = pd.DataFrame(self.acc_store, columns=["training_accuracy", "validation_accuracy"])
        loss_df = pd.DataFrame(self.loss_store, columns=["training_loss", "validation_loss"])
        test_df = pd.DataFrame(self.test_results, columns=['test_accuracy', 'test_loss'])

        # test_df has fewer rows than the others; concat pads with NaN.
        combined_df = pd.concat([train_df, loss_df, test_df], axis=1)
        combined_df.to_csv(results_file, index=False)

        print(f"Weights saved to {weights_file}")
        print(f"Results saved to {results_file}")

    def load_weights(self, file_name):
        # Restore parameters previously saved by save_model.
        data = np.load(file_name)
        self.params = {key: data[key] for key in data.files}