In [1]:
import torch

import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from tqdm import tqdm

In [2]:
# Check if CUDA is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [3]:
data = pd.read_csv('data/bel_data_test.csv')
data = np.array(data)

# Split features and labels
X = data[:, 1:]  # All columns except the first one
y = data[:, 0].astype(int)  # First column as labels

# Create test set from the first thousand rows
X_test = X[:1000]
y_test = y[:1000]

# Use the remaining data for train and validation
X_remaining = X[1000:]
y_remaining = y[1000:]

# Split the remaining data into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X_remaining, y_remaining, test_size=0.2, random_state=42)

# Print the shapes of the resulting sets
print(f"Train set shape: {X_train.shape}")
print(f"Validation set shape: {X_val.shape}")
print(f"Test set shape: {X_test.shape}")

Train set shape: (2860, 1024)
Validation set shape: (715, 1024)
Test set shape: (1000, 1024)


In [4]:
# # Convert to PyTorch tensors
# X_train_tensor = torch.FloatTensor(X_train)
# y_train_tensor = torch.LongTensor(y_train)
# X_test_tensor = torch.FloatTensor(X_test)
# y_test_tensor = torch.LongTensor(y_test)

# # Create DataLoader objects
# train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
# test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
# test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [5]:
class SemanticsMLP:
    def __init__(self, input_size=1024, hidden_sizes=[512, 256, 128], num_classes=62):
        self.input_size = input_size
        self.hidden_sizes = hidden_sizes
        self.num_classes = num_classes

        # Initialize weights and biases
        self.encoder_weights = []
        self.encoder_biases = []
        prev_size = input_size
        for hidden_size in hidden_sizes:
            self.encoder_weights.append(np.random.randn(prev_size, hidden_size) * np.sqrt(2. / prev_size))
            self.encoder_biases.append(np.zeros(hidden_size))
            prev_size = hidden_size

        self.classifier_weight = np.random.randn(hidden_sizes[-1], num_classes) * np.sqrt(2. / hidden_sizes[-1])
        self.classifier_bias = np.zeros(num_classes)

        self.decoder_weights = []
        self.decoder_biases = []
        reversed_hidden_sizes = list(reversed(hidden_sizes))
        prev_size = hidden_sizes[-1]
        for hidden_size in reversed_hidden_sizes[1:] + [input_size]:
            self.decoder_weights.append(np.random.randn(prev_size, hidden_size) * np.sqrt(2. / prev_size))
            self.decoder_biases.append(np.zeros(hidden_size))
            prev_size = hidden_size

    def relu(self, x):
        return np.maximum(0, x)

    def encode(self, x):
        for weight, bias in zip(self.encoder_weights, self.encoder_biases):
            x = self.relu(np.dot(x, weight) + bias)
        return x

    def decode(self, x):
        for weight, bias in zip(self.decoder_weights[:-1], self.decoder_biases[:-1]):
            x = self.relu(np.dot(x, weight) + bias)
        x = np.dot(x, self.decoder_weights[-1]) + self.decoder_biases[-1]  # No activation on the final layer
        return x

    def forward(self, x):
        encoded = self.encode(x)
        logits = np.dot(encoded, self.classifier_weight) + self.classifier_bias
        reconstructed = self.decode(encoded)
        return logits, reconstructed

In [6]:
def softmax(x):
    exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
    return exp_x / np.sum(exp_x, axis=1, keepdims=True)

def cross_entropy_loss(y_pred, y_true):
    m = y_true.shape[0]
    p = softmax(y_pred)
    log_likelihood = -np.log(p[range(m), y_true])
    loss = np.sum(log_likelihood) / m
    return loss

def cross_entropy_gradient(y_pred, y_true):
    m = y_true.shape[0]
    grad = softmax(y_pred)
    grad[range(m), y_true] -= 1
    grad = grad / m
    return grad

def mse_loss(y_pred, y_true):
    return np.mean((y_pred - y_true) ** 2)

def mse_gradient(y_pred, y_true):
    return 2 * (y_pred - y_true) / y_true.shape[0]

def train_step(model, X, y, learning_rate):
    # Forward pass
    logits, reconstructed = model.forward(X)
    
    # Compute gradients
    ce_grad = cross_entropy_gradient(logits, y)
    mse_grad = mse_gradient(reconstructed, X)
    
    # Backpropagation (simplified, not computing full gradients for all layers)
    encoded = model.encode(X)
    
    # Update classifier
    model.classifier_weight -= learning_rate * np.dot(encoded.T, ce_grad)
    model.classifier_bias -= learning_rate * np.sum(ce_grad, axis=0)
    
    # Update decoder (last layer only for simplicity)
    decoder_grad = np.dot(encoded.T, mse_grad)
    if decoder_grad.shape != model.decoder_weights[-1].shape:
        raise ValueError(f"Shape mismatch: decoder_grad {decoder_grad.shape}, decoder_weights[-1] {model.decoder_weights[-1].shape}")
    model.decoder_weights[-1] -= learning_rate * decoder_grad
    model.decoder_biases[-1] -= learning_rate * np.sum(mse_grad, axis=0)
    
    # Compute loss
    ce_loss = cross_entropy_loss(logits, y)
    mse_loss_val = mse_loss(reconstructed, X)
    
    return ce_loss, mse_loss_val

def evaluate(model, X, y):
    logits, reconstructed = model.forward(X)
    ce_loss = cross_entropy_loss(logits, y)
    mse_loss_val = mse_loss(reconstructed, X)
    accuracy = np.mean(np.argmax(logits, axis=1) == y)
    return ce_loss, mse_loss_val, accuracy

In [7]:
def show_image_comparison(original, reconstructed, label, prediction):
    """
    Display the original and reconstructed images side by side.
    
    :param original: Original image (1D tensor of 1024 elements)
    :param reconstructed: Reconstructed image (1D tensor of 1024 elements)
    :param label: True label of the image
    :param prediction: Predicted label of the image
    """
    # Convert to numpy arrays and move to CPU if they're on GPU
    original = original.cpu().numpy()
    reconstructed = reconstructed.cpu().numpy()
    
    # Reshape the 1D arrays to 32x32 images
    original_img = original.reshape(32, 32)
    reconstructed_img = reconstructed.reshape(32, 32)
    
    # Create a figure with two subplots side by side
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5))
    
    # Show original image
    ax1.imshow(original_img, cmap='gray')
    ax1.set_title(f'Original (Label: {label})')
    ax1.axis('off')
    
    # Show reconstructed image
    ax2.imshow(reconstructed_img, cmap='gray')
    ax2.set_title(f'Reconstructed (Predicted: {prediction})')
    ax2.axis('off')
    
    plt.tight_layout()
    plt.show()

In [8]:
input_size = X_train.shape[1]
num_classes = len(np.unique(y))
model = SemanticsMLP(input_size=input_size, hidden_sizes=[512, 256, 128], num_classes=num_classes)

In [9]:
epochs = 100
batch_size = 32
learning_rate = 0.001

for epoch in range(epochs):
    # Shuffle the training data
    indices = np.arange(X_train.shape[0])
    np.random.shuffle(indices)
    X_train = X_train[indices]
    y_train = y_train[indices]
    
    # Mini-batch training
    for i in range(0, X_train.shape[0], batch_size):
        X_batch = X_train[i:i+batch_size]
        y_batch = y_train[i:i+batch_size]
        
        try:
            ce_loss, mse_loss_val = train_step(model, X_batch, y_batch, learning_rate)
        except ValueError as e:
            print(f"Error in batch {i // batch_size}: {e}")
            print(f"X_batch shape: {X_batch.shape}")
            print(f"y_batch shape: {y_batch.shape}")
            raise
    
    # Evaluate on validation set
    val_ce_loss, val_mse_loss, val_accuracy = evaluate(model, X_val, y_val)
    
    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Val CE Loss: {val_ce_loss:.4f}, Val MSE Loss: {val_mse_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")



ValueError: operands could not be broadcast together with shapes (512,1024) (128,1024) (512,1024) 

In [None]:
test_ce_loss, test_mse_loss, test_accuracy = evaluate(model, X_test, y_test)
print(f"Final Test CE Loss: {test_ce_loss:.4f}, Test MSE Loss: {test_mse_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")