CRWU凯斯西储大学轴承数据,12k频率,十分类

admin2024-05-15  3

CRWU凯斯西储大学轴承数据,12k频率,十分类,在这里插入图片描述,第1张
CRWU凯斯西储大学轴承数据,12k频率,十分类。

from torch.utils.data import Dataset, DataLoader
from scipy.io import loadmat
import numpy as np
import os
from sklearn import preprocessing  # 0-1编码
from sklearn.model_selection import StratifiedShuffleSplit  # 随机划分,保证每一类比例相同
import torch
from torch import nn
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import torch.optim as optim

def prepro(d_path, length=0, number=0, normal=True, rate=[0, 0, 0], enc=False, enc_step=28):
    # 获得该文件夹下所有.mat文件名
    filenames = os.listdir(d_path)

    def capture(original_path):
        files = {}
        for i in filenames:
            # 文件路径
            file_path = os.path.join(d_path, i)
            file = loadmat(file_path)
            file_keys = file.keys()
            for key in file_keys:
                if 'DE' in key:
                    files[i] = file[key].ravel()
        return files

    def slice_enc(data, slice_rate= rate[1]):
        keys = data.keys()
        Train_Samples = {}
        Test_Samples = {}
        for i in keys:
            slice_data = data[i]

            all_lenght = len(slice_data)
            # end_index = int(all_lenght * (1 - slice_rate))
            samp_train = int(number * (1 - slice_rate))  # 1000(1-0.3)
            Train_sample = []
            Test_Sample = []

            for j in range(samp_train):
                sample = slice_data[j * 150: j * 150 + length]
                Train_sample.append(sample)

            # 抓取测试数据
            for h in range(number - samp_train):
                sample = slice_data[samp_train * 150 + length + h * 150: samp_train * 150 + length + h * 150 + length]
                Test_Sample.append(sample)
            Train_Samples[i] = Train_sample
            Test_Samples[i] = Test_Sample
        return Train_Samples, Test_Samples

    # 仅抽样完成,打标签
    def add_labels(train_test):
        X = []
        Y = []
        label = 0
        for i in filenames:
            x = train_test[i]
            X += x
            lenx = len(x)
            Y += [label] * lenx
            label += 1
        return X, Y

    def scalar_stand(Train_X, Test_X):
        # 用训练集标准差标准化训练集以及测试集
        data_all = np.vstack((Train_X, Test_X))
        scalar = preprocessing.StandardScaler().fit(data_all)
        Train_X = scalar.transform(Train_X)
        Test_X = scalar.transform(Test_X)
        return Train_X, Test_X

    def valid_test_slice(Test_X, Test_Y):

        test_size = rate[2] / (rate[1] + rate[2])
        ss = StratifiedShuffleSplit(n_splits=1, test_size=test_size)
        Test_Y = np.asarray(Test_Y, dtype=np.int32)

        for train_index, test_index in ss.split(Test_X, Test_Y):
            X_valid, X_test = Test_X[train_index], Test_X[test_index]
            Y_valid, Y_test = Test_Y[train_index], Test_Y[test_index]

            return X_valid, Y_valid, X_test, Y_test

    # 从所有.mat文件中读取出数据的字典
    data = capture(original_path=d_path)
    # 将数据切分为训练集、测试集
    train, test = slice_enc(data)
    # 为训练集制作标签,返回X,Y
    Train_X, Train_Y = add_labels(train)
    # 为测试集制作标签,返回X,Y
    Test_X, Test_Y = add_labels(test)
    # for i in Test_X:
    #     print(i.shape)
    # for i in Train_X:
    #     print(i.shape)
    # Train_X = np.stack(Train_X,axis=0)
    # Test_X = np.stack(Test_X,axis=0)
    # print(Train_X.shape,Test_X.shape)

    # 训练数据/测试数据 是否标准化.
    if normal:
        Train_X, Test_X = scalar_stand(Train_X, Test_X)

    # 将测试集切分为验证集和测试集.
    # Valid_X, Valid_Y, Test_X, Test_Y = valid_test_slice(Test_X, Test_Y)
    return Train_X, Train_Y,  Test_X, Test_Y


num_classes = 10  # 样本类别
length = 224*224  # 样本长度
number = 140  # 每类样本的数量
normal = True  # 是否标准化
rate = [0.5, 0.25, 0.25]  # 测试集验证集划分比例




class BearingDataset(Dataset):
    def __init__(self, data, labels):
        self.data = torch.tensor(data, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

def create_dataloader(data, labels, batch_size=32, shuffle=True, num_workers=0):
    dataset = BearingDataset(data, labels)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers)
    return dataloader

# 使用前面定义的函数处理数据
path = './data12k'  # 注意路径格式可能需要根据您的操作系统调整
x_train, y_train,  x_test, y_test = prepro(
    d_path=path,
    length=112*112,  # 样本长度
    number=250,  # 每类样本的数量
    normal=True,  # 是否标准化
    rate=[0.8, 0.2]  # 测试集验证集划分比例
)

# 创建 DataLoader
train_loader = create_dataloader(x_train, y_train, batch_size=32, shuffle=True, num_workers=0)
test_loader = create_dataloader(x_test, y_test, batch_size=32, shuffle=False, num_workers=0)


class AlexNet(nn.Module):
    def __init__(self):
        super(AlexNet, self).__init__()
        self.Conv1 = nn.Conv2d(1, 24, 15, 3, 2)  # [1, 48, 107, 107]
        self.relu1 = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(2)  # [1, 24, 35, 35]

        self.Conv2 = nn.Conv2d(24, 64, 5, 1, 2)  # [1, 64, 37, 37]
        self.relu2 = nn.ReLU()
        self.maxpool2 = nn.MaxPool2d(2)  # [1, 64, 18, 18]

        self.Conv3 = nn.Conv2d(64, 96, 2, 1, 1)  # [1, 96, 19, 19]
        self.relu3 = nn.ReLU()
        self.Conv4 = nn.Conv2d(96, 96, 2, 1, 1)  # [1, 96, 20, 20]
        self.relu4 = nn.ReLU()
        self.Conv5 = nn.Conv2d(96, 64, 2, 1, 1)  # [1, 64, 21, 21]
        self.relu5 = nn.ReLU()
        self.maxpool3 = nn.MaxPool2d(3)  # [1, 64, 7, 7]

        self.Dro1 = nn.Dropout(p=0.5)
        self.flatten = nn.Flatten()
        self.line1 = nn.Linear(64 * 3 * 3, 1000)
        self.relu6 = nn.ReLU()
        self.Dro2 = nn.Dropout(p=0.5)
        self.line2 = nn.Linear(1000, 1000)
        self.relu7 = nn.ReLU()
        self.line3 = nn.Linear(1000, 500)
        self.line4 = nn.Linear(500, 10)

    def forward(self, x):
        x = self.Conv1(x)
        x = self.relu1(x)
        x = self.maxpool1(x)

        x = self.Conv2(x)
        x = self.relu2(x)
        x = self.maxpool2(x)

        x = self.Conv3(x)
        x = self.relu3(x)
        x = self.Conv4(x)
        x = self.relu4(x)
        x = self.Conv5(x)
        x = self.relu5(x)
        x = self.maxpool3(x)

        x = self.Dro1(x)
        x = self.flatten(x)
        x = self.line1(x)
        x = self.relu6(x)
        x = self.Dro2(x)
        x = self.line2(x)
        x = self.relu7(x)
        x = self.line3(x)
        x = self.line4(x)

        return x


def train_model(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs.view(-1,1,112,112))
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    return running_loss / len(train_loader)

def evaluate_model(model, data_loader, device):
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs.view(-1,1,112,112))
            _, predicted = torch.max(outputs, 1)
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1_score, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted')
    return accuracy, precision, recall, f1_score


def main():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = AlexNet().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    num_epochs = 50

    # 假设 train_loader 和 test_loader 已经被创建
    best_acc=0
    for epoch in range(num_epochs):
        train_loss = train_model(model, train_loader, criterion, optimizer, device)
        print(f'Epoch {epoch+1}, Loss: {train_loss:.4f}')
        accuracy, precision, recall, f1_score = evaluate_model(model, test_loader, device)
        print(f'Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1-Score: {f1_score:.4f}')
        if best_acc<accuracy:
            best_acc = accuracy
            torch.save(model.state_dict(), 'best_alexnet.pth')

if __name__ == "__main__":
    main()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明原文出处。如若内容造成侵权/违法违规/事实不符,请联系SD编程学习网:675289112@qq.com进行投诉反馈,一经查实,立即删除!