From 708a8e7222b206015cb2323f8772b5d3e2469bc1 Mon Sep 17 00:00:00 2001 From: Murtadha Date: Fri, 27 Sep 2024 11:01:14 -0400 Subject: [PATCH] Rewrite code with pytorch code --- bel_semantics.ipynb | 308 ++++++++++++++++++++++++++++++-------------- 1 file changed, 213 insertions(+), 95 deletions(-) diff --git a/bel_semantics.ipynb b/bel_semantics.ipynb index 3904138..f0924cc 100644 --- a/bel_semantics.ipynb +++ b/bel_semantics.ipynb @@ -2,15 +2,21 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ - "import net.modules\n", + "import torch\n", "\n", + "import torch.nn as nn\n", + "import torch.optim as optim\n", "import numpy as np\n", + "import pandas as pd\n", + "import matplotlib.pyplot as plt\n", "\n", - "from net.transcoder import Transcoder" + "from torch.utils.data import DataLoader, TensorDataset\n", + "from sklearn.model_selection import train_test_split\n", + "from tqdm import tqdm" ] }, { @@ -19,138 +25,250 @@ "metadata": {}, "outputs": [], "source": [ - "filepath = 'data/bel_data_test.csv'\n", - "train_loader, test_loader, input_size = load_and_prepare_data(file_path=filepath)\n", - "\n", - "print(\"X_train shape:\", input_size.shape)" + "# Check if CUDA is available\n", + "device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n", + "print(f\"Using device: {device}\")" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "metadata": {}, "outputs": [], "source": [ - "# input_size = X_train.shape[0]\n", - "# hidden_size = 128\n", - "# output_size = 61\n", + "data = pd.read_csv('data/bel_data_test.csv')\n", + "# Load the data\n", + "data = np.array(data)\n", "\n", - "architecture = [input_size, [128], 61]\n", - "activations = ['leaky_relu','softmax']" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Initialize transcoder" + "# Split features and labels\n", + "X = data[:, 1:] # All columns except the first one\n", + "y = data[:, 0].astype(int) # First column as labels\n", + "\n", + "# Split the data into training and testing sets\n", + "X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "metadata": {}, "outputs": [], "source": [ - "# bl_transcoder = Transcoder(input_size, hidden_size, output_size, 'leaky_relu', 'softmax')\n", - "bl_transcoder = Transcoder(architecture, hidden_activation='relu', output_activation='softmax')" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Train Encoders and save weights\n" + "# Convert to PyTorch tensors\n", + "X_train_tensor = torch.FloatTensor(X_train)\n", + "y_train_tensor = torch.LongTensor(y_train)\n", + "X_test_tensor = torch.FloatTensor(X_test)\n", + "y_test_tensor = torch.LongTensor(y_test)\n", + "\n", + "# Create DataLoader objects\n", + "train_dataset = TensorDataset(X_train_tensor, y_train_tensor)\n", + "test_dataset = TensorDataset(X_test_tensor, y_test_tensor)\n", + "\n", + "train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)\n", + "test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "metadata": {}, "outputs": [], "source": [ - "# # Train the encoder if need\n", + "# Define the MLP\n", + "class MLP(nn.Module):\n", + " def __init__(self):\n", + " super(MLP, self).__init__()\n", + " self.input_layer = nn.Linear(1024, 512)\n", + " self.h1_layer = nn.Linear(512, 64)\n", + " self.h2_layer = nn.Linear(64, 62)\n", + " self.relu = nn.ReLU()\n", "\n", - "bl_transcoder.train_model(train_loader, test_loader, learning_rate=0.001, epochs=1000)\n", - "# bl_transcoder.train_with_validation(X_train, Y_train, alpha=0.1, iterations=1000)\n", - "bl_transcoder.save_results('bt_1h128n')" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Load weights" + " def forward(self, x):\n", + " x = self.relu(self.input_layer(x))\n", + " x = self.h1_layer(x)\n", + " x = self.h2_layer(x)\n", + " return x\n", + "\n", + "# Define the Decoder\n", + "class Decoder(nn.Module):\n", + " def __init__(self):\n", + " super(Decoder, self).__init__()\n", + " self.h2_h1 = nn.Linear(64, 512)\n", + " self.h1_input = nn.Linear(512, 1024)\n", + " self.relu = nn.ReLU()\n", + "\n", + " def forward(self, x):\n", + " x = self.relu(self.h2_h1(x))\n", + " x = self.h1_input(x)\n", + " return x\n", + "\n", + "class MLPWithDecoder(nn.Module):\n", + " def __init__(self):\n", + " super(MLPWithDecoder, self).__init__()\n", + " self.mlp = MLP()\n", + " self.decoder = Decoder()\n", + "\n", + " def forward(self, x):\n", + " # MLP forward pass\n", + " h1 = self.mlp.relu(self.mlp.input_layer(x))\n", + " h2 = self.mlp.relu(self.mlp.h1_layer(h1))\n", + " output = self.mlp.h2_layer(h2)\n", + " \n", + " # Reconstruction\n", + " reconstruction = self.decoder(h2)\n", + " \n", + " return output, reconstruction" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "metadata": {}, "outputs": [], "source": [ - "bl_transcoder.load_weights('weights/bt_1h128n_leaky_relu_weights.pth')" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Analysis" + "# Function to reconstruct an image\n", + "def reconstruct_image(model, image):\n", + " model.eval()\n", + " with torch.no_grad():\n", + " _, reconstruction = model(image.unsqueeze(0))\n", + " return reconstruction.squeeze(0)" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 7, "metadata": {}, "outputs": [], "source": [ - "# Plot learning curves\n", - "bl_transcoder.plot_learning_curves()\n", - "\n", - "# Visualize encoded space\n", - "bl_transcoder.plot_encoded_space(X_test, Y_test)\n", - "\n", - "print(X_test.shape)\n", - "print(X_train.shape)\n", - "# Check reconstructions\n", - "bl_transcoder.plot_reconstructions(X_test)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Transcode images" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "num_images = 2\n", - "indices = np.random.choice(X_test.shape[1], num_images, replace=False)\n", - "\n", - "for idx in indices:\n", - " original_image = X_test[:, idx]\n", + "def show_image_comparison(original, reconstructed, label, prediction):\n", + " \"\"\"\n", + " Display the original and reconstructed images side by side.\n", " \n", - " # Encode the image\n", - " encoded = bl_transcoder.encode_image(original_image.reshape(-1, 1))\n", + " :param original: Original image (1D tensor of 1024 elements)\n", + " :param reconstructed: Reconstructed image (1D tensor of 1024 elements)\n", + " :param label: True label of the image\n", + " :param prediction: Predicted label of the image\n", + " \"\"\"\n", + " # Convert to numpy arrays and move to CPU if they're on GPU\n", + " original = original.cpu().numpy()\n", + " reconstructed = reconstructed.cpu().numpy()\n", " \n", - " # Decode the image\n", - " decoded = bl_transcoder.decode_image(encoded)\n", + " # Reshape the 1D arrays to 32x32 images\n", + " original_img = original.reshape(32, 32)\n", + " reconstructed_img = reconstructed.reshape(32, 32)\n", + " \n", + " # Create a figure with two subplots side by side\n", + " fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5))\n", + " \n", + " # Show original image\n", + " ax1.imshow(original_img, cmap='gray')\n", + " ax1.set_title(f'Original (Label: {label})')\n", + " ax1.axis('off')\n", + " \n", + " # Show reconstructed image\n", + " ax2.imshow(reconstructed_img, cmap='gray')\n", + " ax2.set_title(f'Reconstructed (Predicted: {prediction})')\n", + " ax2.axis('off')\n", + " \n", + " plt.tight_layout()\n", + " plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "# Initialize the model, loss function, and optimizer\n", + "model = MLPWithDecoder()\n", + "criterion = nn.CrossEntropyLoss()\n", + "reconstruction_criterion = nn.MSELoss()\n", + "optimizer = optim.Adam(model.parameters())\n", "\n", - " # Visualize original, encoded, and decoded images\n", - " visualize_transcoding(original_image, encoded, decoded, idx)\n", - "\n", - " print(f\"Image {idx}:\")\n", - " print(\"Original shape:\", original_image.shape)\n", - " print(\"Encoded shape:\", encoded.shape)\n", - " print(\"Decoded shape:\", decoded.shape)\n", - " print(\"Encoded vector:\", encoded.flatten()) # Print flattened encoded vector\n", - " print(\"\\n\")" + "model = model.to(device)\n", + "criterion = criterion.to(device)\n", + "reconstruction_criterion = reconstruction_criterion.to(device)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "num_epochs = 250\n", + "for epoch in range(num_epochs):\n", + " model.train() # Set the model to training mode\n", + " running_loss = 0.0\n", + " \n", + " # Use tqdm for a progress bar\n", + " with tqdm(train_loader, unit=\"batch\") as tepoch:\n", + " for images, labels in tepoch:\n", + " tepoch.set_description(f\"Epoch {epoch+1}\")\n", + " \n", + " images, labels = images.to(device), labels.to(device)\n", + " \n", + " optimizer.zero_grad()\n", + " \n", + " outputs, reconstructions = model(images)\n", + " \n", + " classification_loss = criterion(outputs, labels)\n", + " reconstruction_loss = reconstruction_criterion(reconstructions, images)\n", + " total_loss = classification_loss + reconstruction_loss\n", + " \n", + " total_loss.backward()\n", + " optimizer.step()\n", + " \n", + " running_loss += total_loss.item()\n", + " \n", + " tepoch.set_postfix(loss=total_loss.item())\n", + " \n", + " epoch_loss = running_loss / len(train_loader)\n", + " \n", + " # print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "model.eval() # Set the model to evaluation mode\n", + "with torch.no_grad():\n", + " try:\n", + " # Get a batch of test data\n", + " images, labels = next(iter(test_loader))\n", + " \n", + " # Move data to the same device as the model\n", + " images = images.to(device)\n", + " labels = labels.to(device)\n", + " \n", + " # Forward pass through the model\n", + " outputs, reconstructions = model(images)\n", + " \n", + " # Get predicted labels\n", + " _, predicted = torch.max(outputs.data, 1)\n", + " \n", + " # Display the first few images in the batch\n", + " num_images_to_show = min(5, len(images))\n", + " for i in range(num_images_to_show):\n", + " show_image_comparison(\n", + " images[i], \n", + " reconstructions[i], \n", + " labels[i].item(), \n", + " predicted[i].item()\n", + " )\n", + " \n", + " # Calculate and print accuracy\n", + " correct = (predicted == labels).sum().item()\n", + " total = labels.size(0)\n", + " accuracy = 100 * correct / total\n", + " print(f'Test Accuracy: {accuracy:.2f}%')\n", + " \n", + " except Exception as e:\n", + " print(f\"An error occurred during evaluation: {str(e)}\")" ] }, {