In [1]:
import torch

import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from tqdm import tqdm

In [None]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

In [3]:
# Read the CSV and convert it to a plain NumPy array in one step
data = np.array(pd.read_csv('data/bel_data_test.csv'))

# Column 0 holds the integer class label; every remaining column is a feature
y = data[:, 0].astype(int)
X = data[:, 1:]

# Hold out 20% of the rows for testing; the fixed seed keeps the split repeatable
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

In [4]:
# Wrap the NumPy splits as tensors: float features, integer (long) class labels
X_train_tensor, y_train_tensor = torch.FloatTensor(X_train), torch.LongTensor(y_train)
X_test_tensor, y_test_tensor = torch.FloatTensor(X_test), torch.LongTensor(y_test)

# Pair each feature row with its label
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Mini-batches of 64; only the training split is shuffled
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)

In [5]:
# Define the MLP
class MLP(nn.Module):
    """Three-layer perceptron: 1024 inputs -> 512 -> 64 -> 62 output logits.

    ``forward`` applies a ReLU after both hidden layers. This matches the way
    ``MLPWithDecoder.forward`` drives the same layers, so a given set of
    weights yields the same logits through either code path.
    """

    def __init__(self):
        super(MLP, self).__init__()
        self.input_layer = nn.Linear(1024, 512)
        self.h1_layer = nn.Linear(512, 64)
        self.h2_layer = nn.Linear(64, 62)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return the 62 raw (pre-softmax) scores for each row of ``x``.

        :param x: Tensor whose last dimension has 1024 elements
        :return: Tensor whose last dimension has 62 elements
        """
        x = self.relu(self.input_layer(x))
        # Without this non-linearity h1_layer and h2_layer would collapse into
        # a single linear map, and the result would disagree with the combined
        # model's forward pass.
        x = self.relu(self.h1_layer(x))
        x = self.h2_layer(x)
        return x

# Define the Decoder
class Decoder(nn.Module):
    """Maps a 64-element hidden vector back to 1024 elements via a 512-wide layer."""

    def __init__(self):
        super().__init__()
        self.h2_h1 = nn.Linear(64, 512)
        self.h1_input = nn.Linear(512, 1024)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Expand ``x`` (last dimension 64) to a tensor with last dimension 1024."""
        # ReLU only on the hidden expansion; the final layer stays linear
        expanded = self.relu(self.h2_h1(x))
        return self.h1_input(expanded)

class MLPWithDecoder(nn.Module):
    """Classifier and decoder sharing one hidden representation.

    The MLP's layers are invoked individually (rather than through
    ``MLP.forward``) so that the activation feeding the final classification
    layer can also be passed to the decoder.
    """

    def __init__(self):
        super().__init__()
        self.mlp = MLP()
        self.decoder = Decoder()

    def forward(self, x):
        """Return ``(output, reconstruction)`` for the input batch ``x``."""
        encoder = self.mlp
        activate = encoder.relu

        # Encoder: two ReLU-activated hidden layers
        hidden = activate(encoder.input_layer(x))
        bottleneck = activate(encoder.h1_layer(hidden))

        # Classification scores first, then the decoder's rebuild of the input,
        # both computed from the same bottleneck activation
        return encoder.h2_layer(bottleneck), self.decoder(bottleneck)

In [6]:
# Function to reconstruct an image
def reconstruct_image(model, image):
    """Run a single image through ``model`` and return its reconstruction.

    The model is switched to evaluation mode for the call and then put back
    into whatever mode (train/eval) it was in before, so calling this in the
    middle of training does not silently leave the model in eval mode.

    :param model: Module whose forward pass returns a 2-tuple whose second
        element is the reconstruction
    :param image: Single un-batched image tensor; a batch dimension of size 1
        is added before the forward pass
    :return: Reconstruction with the batch dimension removed again
    """
    was_training = model.training

    # Put the input on the same device as the model's parameters. A model
    # without parameters gives no device to target, so the image is left as is.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        image = image.to(first_param.device)

    model.eval()
    try:
        with torch.no_grad():
            _, reconstruction = model(image.unsqueeze(0))
    finally:
        # Restore the caller's mode even if the forward pass raised
        model.train(was_training)
    return reconstruction.squeeze(0)

In [7]:
def show_image_comparison(original, reconstructed, label, prediction, image_shape=(32, 32)):
    """
    Display the original and reconstructed images side by side.

    :param original: Original image (tensor with as many elements as image_shape implies)
    :param reconstructed: Reconstructed image (tensor with the same number of elements)
    :param label: True label of the image
    :param prediction: Predicted label of the image
    :param image_shape: 2D shape each tensor is reshaped to before plotting;
        defaults to (32, 32), i.e. 1024 elements
    """
    # detach() first: numpy() refuses tensors that are part of an autograd
    # graph, so this also works outside a torch.no_grad() block.
    # cpu() moves the data off the GPU if needed.
    original_img = original.detach().cpu().numpy().reshape(image_shape)
    reconstructed_img = reconstructed.detach().cpu().numpy().reshape(image_shape)

    # Create a figure with two subplots side by side
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5))

    # Show original image
    ax1.imshow(original_img, cmap='gray')
    ax1.set_title(f'Original (Label: {label})')
    ax1.axis('off')

    # Show reconstructed image
    ax2.imshow(reconstructed_img, cmap='gray')
    ax2.set_title(f'Reconstructed (Predicted: {prediction})')
    ax2.axis('off')

    plt.tight_layout()
    plt.show()

In [8]:
# Build the network on the target device first, then hand its parameters to Adam
model = MLPWithDecoder().to(device)
optimizer = optim.Adam(model.parameters())

# Cross-entropy scores the class predictions; MSE scores the reconstruction
criterion = nn.CrossEntropyLoss().to(device)
reconstruction_criterion = nn.MSELoss().to(device)

In [None]:
# Joint training: every step minimises classification loss + reconstruction loss
num_epochs = 250
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    running_loss = 0.0

    # One progress bar per epoch; the label is constant within an epoch, so it
    # is set once here instead of on every batch.
    with tqdm(train_loader, unit="batch", desc=f"Epoch {epoch+1}") as tepoch:
        for images, labels in tepoch:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs, reconstructions = model(images)

            # The reconstruction target is the input batch itself
            classification_loss = criterion(outputs, labels)
            reconstruction_loss = reconstruction_criterion(reconstructions, images)
            total_loss = classification_loss + reconstruction_loss

            total_loss.backward()
            optimizer.step()

            # Read the scalar once and reuse it for both the running sum and the bar
            batch_loss = total_loss.item()
            running_loss += batch_loss

            tepoch.set_postfix(loss=batch_loss)

        # Mean loss per batch for this epoch, shown on the finished bar.
        # max(..., 1) avoids a ZeroDivisionError when the loader has no batches.
        epoch_loss = running_loss / max(len(train_loader), 1)
        tepoch.set_postfix(epoch_loss=f"{epoch_loss:.4f}")

In [None]:
# Evaluate on the whole test set and visualise a few reconstructions.
# Accuracy is accumulated over every batch of test_loader, so the printed
# "Test Accuracy" describes the full test split rather than one batch.
# Errors are left to propagate so the notebook shows the full traceback.
model.eval()  # Set the model to evaluation mode
num_images_to_show = 5
correct = 0
total = 0

with torch.no_grad():
    for batch_idx, (images, labels) in enumerate(test_loader):
        # Move data to the same device as the model
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass through the model
        outputs, reconstructions = model(images)

        # Predicted label = index of the highest score in each row
        _, predicted = torch.max(outputs, 1)

        # Display the first few images of the first batch only
        if batch_idx == 0:
            for i in range(min(num_images_to_show, len(images))):
                show_image_comparison(
                    images[i],
                    reconstructions[i],
                    labels[i].item(),
                    predicted[i].item()
                )

        # Accumulate hit counts across batches
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

# Calculate and print accuracy
if total > 0:
    accuracy = 100 * correct / total
    print(f'Test Accuracy: {accuracy:.2f}%')
else:
    print("Test set is empty; accuracy is undefined.")