# -*- coding: utf-8 -*-
import os
import matplotlib.pyplot as plt
import numpy as np
import json
import pickle
from .utils import load_kp_data, save_kp_feature, save_mfcc_feature, load_mfcc_data, load_aedat_v3, un_tar, \
create_same_directory_structure, integrate_events_file_to_frames_file_by_fixed_frames_number, \
integrate_events_file_to_frames_file_by_fixed_duration
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
import time
class Dataset(object):
    r"""
    Abstract base for map-style datasets (a mapping from keys to data samples).

    Subclasses are expected to override :meth:`__getitem__`, which fetches the
    sample for a given key, and :meth:`__len__`, which reports the number of
    samples. Both raise :class:`NotImplementedError` in this base class.
    """

    def __init__(self, **kwargs):
        # Keyword arguments are accepted for subclass convenience but unused here.
        super(Dataset, self).__init__()

    def __len__(self):
        # Return the size of the dataset.
        raise NotImplementedError

    def __getitem__(self, index):
        # Return the data content and the label stored at ``index``.
        raise NotImplementedError
class CustomDataset(Dataset):
    r"""
    Custom dataset wrapping personally collected real-valued samples.

    Args:
        data (list or np.ndarray): the samples, indexed along the first axis.
        label (list or np.ndarray): the labels, indexed like ``data``.

    Both arguments must be exactly a ``list`` or an ``np.ndarray`` (subclasses
    are rejected); otherwise an ``AssertionError`` is raised.
    """

    def __init__(self, data=None, label=None):
        super().__init__()
        accepted = (list, np.ndarray)
        # Identity comparison on the concrete type: subclasses do not qualify.
        assert any(type(data) is kind for kind in accepted), "The type of data should be list or np.ndarray"
        assert any(type(label) is kind for kind in accepted), "The type of label should be list or np.ndarray"
        self.data, self.label = data, label

    def __len__(self):
        # The dataset size is the number of samples.
        return len(self.data)

    def __getitem__(self, index):
        # Samples are served as float32 and labels as int64.
        sample = np.float32(self.data[index])
        target = np.int64(self.label[index])
        return sample, target
class CustomSpikeDataset(Dataset):
    r"""
    Custom dataset of already-encoded spike data.

    Only the ``[spike_time, neuron_ids]`` representation is supported.

    Args:
        spike_times (list or np.ndarray): spike times per sample; the expected
            shape is ``[sample_num, spikes_times_num]``.
        neuron_ids (list or np.ndarray): neuron ids per sample, indexed like
            ``spike_times``.
        label (list or np.ndarray): labels, indexed like ``spike_times``.

    All three arguments must be exactly a ``list`` or an ``np.ndarray``;
    otherwise an ``AssertionError`` is raised.
    """

    def __init__(self, spike_times=None, neuron_ids=None, label=None):
        super().__init__()
        # The shape of spike_times should be [sample_num, spikes_times_num]
        spike_times_type = type(spike_times)
        neuron_ids_type = type(neuron_ids)
        label_type = type(label)
        assert spike_times_type is list or spike_times_type is np.ndarray, "The type of data should be list or np.ndarray"
        assert neuron_ids_type is list or neuron_ids_type is np.ndarray, "The type of data should be list or np.ndarray"
        assert label_type is list or label_type is np.ndarray, "The type of label should be list or np.ndarray"
        self.spike_times = spike_times
        self.neuron_ids = neuron_ids
        self.label = label

    def __getitem__(self, index):
        """Return ``([spike_time, neuron_id], label)`` for sample ``index``."""
        spiking = [self.spike_times[index], self.neuron_ids[index]]
        return spiking, self.label[index]

    def __len__(self):
        """Return the number of samples."""
        # This class stores no ``data`` attribute; the sample count is the
        # length of ``spike_times``.
        return len(self.spike_times)
class SpecifiedDataset(Dataset):
    r"""
    Image dataset whose file names and labels are loaded from a JSON file.

    The JSON document must contain two equally long lists under the keys
    ``images`` (file names) and ``labels``.

    Args:
        image_file (str): prefix that is prepended verbatim to every image file
            name, so a directory must include its trailing path separator.
        label_file (str): path of the JSON annotation file.

    Raises:
        AssertionError: if the number of images and labels differ, which
            indicates a problem with the generated annotations.
    """

    def __init__(self, image_file, label_file):
        super().__init__()
        # Load the annotations; the context manager closes the file handle.
        with open(label_file, 'r') as fp:
            data_dict = json.load(fp)
        # A mismatch between image and label counts means the annotation file is broken.
        assert len(data_dict['images']) == len(data_dict['labels'])
        self.img_folder = image_file
        self.filenames = list(data_dict['images'])
        self.labels = list(data_dict['labels'])

    def __getitem__(self, index):
        """Read image ``index`` from disk and return ``(img, label)``."""
        # The path must stay a string: converting it with np.float32 raises ValueError.
        img_name = self.img_folder + self.filenames[index]
        label = np.int64(self.labels[index])
        img = plt.imread(img_name)
        return img, label

    def __len__(self):
        """Return the number of annotated images."""
        return len(self.filenames)
class cifar10(Dataset):
    r"""
    CIFAR-10 dataset read from the pickled batch files located in ``root``.

    Both the training batches and the test batch are loaded at construction.

    Args:
        root (str): directory holding ``data_batch_1`` ... ``data_batch_5`` and
            ``test_batch``.
        is_train (bool): serve the training split when True, else the test split.

    Raises:
        TypeError: if ``is_train`` is not a bool.
        ValueError: if ``root`` or one of the batch files does not exist.
    """
    files = {
        "train_dataset1": 'data_batch_1',
        "train_dataset2": 'data_batch_2',
        "train_dataset3": 'data_batch_3',
        "train_dataset4": 'data_batch_4',
        "train_dataset5": 'data_batch_5',
        "test_dataset": 'test_batch'
    }

    def __init__(self, root, is_train=True):
        super().__init__()
        self.root = root
        self._is_train = is_train
        self.data = dict(train_images=[], test_images=[], train_labels=[], test_labels=[])
        if not isinstance(self._is_train, bool):
            raise TypeError(">> is_train should be boolean value")
        if not self._dataset_exists():
            raise ValueError(">> Failed to load the set, file not exist. You should download the dataset firstly.")
        self._to_numpy_format()

    def __getitem__(self, index):
        # Scale to [0, 1], standardise each colour channel, then go HWC -> CHW.
        mean = (0.4914, 0.4822, 0.4465)
        std = (0.2023, 0.1994, 0.2010)
        split = 'train' if self._is_train else 'test'
        scaled = self.data[split + '_images'][index] / 255.0
        img = np.float32(((scaled - mean) / std).transpose(2, 0, 1))
        label = np.int64(self.data[split + '_labels'][index])
        return img, label

    def __len__(self):
        split = 'train' if self._is_train else 'test'
        return len(self.data[split + '_images'])

    @property
    def dataset_folder(self):
        return os.path.join(self.root, self.__class__.__name__)

    @property
    def data_dict(self):
        return self.data

    @property
    def is_train(self):
        return self._is_train

    @is_train.setter
    def is_train(self, is_train):
        assert is_train in [True, False], ">> Invalid is_train setting"
        self._is_train = is_train

    @staticmethod
    def _decode_batch(path):
        # Unpickle one batch file and return (images as N x 32 x 32 x 3, labels).
        # NOTE: pickle must only be used on trusted dataset files.
        with open(path, mode='rb') as file:
            batch = pickle.load(file, encoding='latin1')
        raw = batch['data']
        features = raw.reshape((len(raw), 3, 32, 32)).transpose(0, 2, 3, 1)
        return features, batch['labels']

    def load_cifar10_batch(self, folder_path, batch_id):
        """Load training batch ``batch_id`` from ``folder_path``; returns ``(features, labels)``."""
        return self._decode_batch(folder_path + '/data_batch_' + str(batch_id))

    def _to_numpy_format(self):
        # Gather the five training batches, then join them in batch order.
        image_parts, label_parts = [], []
        for batch_id in range(1, 6):
            features, labels = self.load_cifar10_batch(self.root, batch_id)
            image_parts.append(features)
            label_parts.append(labels)
        self.data['train_images'] = np.concatenate(image_parts)
        self.data['train_labels'] = np.concatenate(label_parts)

        test_features, test_labels = self._decode_batch(self.root + '/test_batch')
        self.data['test_images'] = test_features
        self.data['test_labels'] = test_labels
        print(">> Dataset loaded")

    def _dataset_exists(self):
        # True only when root exists and every expected batch file is present.
        if not os.path.exists(self.root):
            return False
        return all(os.path.isfile(os.path.join(self.root, name)) for name in cifar10.files.values())
class ImageNet(Dataset):
    r"""
    ImageNet (ILSVRC2012) archive organiser.

    Construction extracts the three archives listed in ``files`` from ``root``,
    extracts the per-class training archives into ``root/train`` and moves the
    validation images into per-class folders under ``root/val``.

    NOTE: the image/label lists in ``self.data`` are never populated by the
    visible code (the numpy conversion call is disabled), so ``len()`` is 0.

    Args:
        root (str): directory containing the ILSVRC2012 archives.
        is_train (bool): serve the training split when True, else the validation split.

    Raises:
        TypeError: if ``is_train`` is not a bool.
        ValueError: if ``root`` or one of the archives does not exist.
    """
    files = {
        "train_dataset": 'ILSVRC2012_img_train.tar',
        "val_dataset": 'ILSVRC2012_img_val.tar',
        "val_label": 'ILSVRC2012_devkit_t12.tar.gz'
    }

    def __init__(self, root, is_train=True):
        super().__init__()
        self.root = root
        self._is_train = is_train
        self.data = {
            'train_images': [],
            'val_images': [],
            'train_labels': [],
            'val_labels': []
        }
        if not isinstance(self._is_train, bool):
            raise TypeError(">> is_train should be boolean value")
        if self._dataset_exists():
            # Folder names are the archive names without their extensions.
            self.untar_train_tar(ImageNet.files['train_dataset'].split('.')[0])
            val_dir = ImageNet.files['val_dataset'].split('.')[0]
            devkit_dir = ImageNet.files['val_label'].split('.')[0]
            self.move_val_img(val_dir=val_dir, devkit_dir=devkit_dir)
            # self._to_numpy_format()
        else:
            raise ValueError(">> Failed to load the set, file not exist. You should download the dataset firstly.")

    def __getitem__(self, index):
        """Return ``(img / 255.0, label)`` for sample ``index`` of the active split."""
        # ``self.data`` stores the held-out split under 'val_*' keys; 'test_*'
        # keys do not exist and raised KeyError.
        split = 'train' if self._is_train else 'val'
        img = np.float32(self.data[split + '_images'][index])
        label = np.int64(self.data[split + '_labels'][index])
        # Scale pixel values to [0, 1].
        return img / 255.0, label

    def __len__(self):
        """Return the number of samples in the active split."""
        split = 'train' if self._is_train else 'val'
        return len(self.data[split + '_images'])

    @property
    def dataset_folder(self):
        return os.path.join(self.root, self.__class__.__name__)

    @property
    def data_dict(self):
        return self.data

    @property
    def is_train(self):
        return self._is_train

    @is_train.setter
    def is_train(self, is_train):
        assert is_train in [True, False], ">> Invalid is_train setting"
        self._is_train = is_train

    def untar_train_tar(self, train_tar):
        """
        untar images from train_tar and save in corresponding folders
        organize like:
        /train
            /n01440764
                images
            /n01443537
                images
            .....
        """
        # Only the files directly inside the extracted training folder are handled.
        root, _, files = next(os.walk(os.path.join(self.root, train_tar)))
        for file in files:
            un_tar(os.path.join(root, file), os.path.join(self.root, 'train'))

    def move_val_img(self, val_dir, devkit_dir):
        """
        move val_img to corresponding folders.
        val_id(start from 1) -> ILSVRC_ID(start from 1) -> WIND
        organize like:
        /val
            /n01440764
                images
            /n01443537
                images
            .....
        """
        # load synset, val ground truth and val images list
        from scipy import io
        import shutil
        devkit_data = os.path.join(self.root, devkit_dir, 'ILSVRC2012_devkit_t12', 'data')
        synset = io.loadmat(os.path.join(devkit_data, 'meta.mat'))
        # int() ignores surrounding whitespace, so the last label survives even
        # when the file has no trailing newline; blank lines are skipped.
        with open(os.path.join(devkit_data, 'ILSVRC2012_validation_ground_truth.txt')) as ground_truth:
            labels = [int(line) for line in ground_truth if line.strip()]
        val_dir_name = os.path.join(self.root, val_dir)
        root, _, filenames = next(os.walk(val_dir_name))
        for filename in filenames:
            # val image name -> ILSVRC ID -> WIND
            val_id = int(filename.split('.')[0].split('_')[-1])
            ILSVRC_ID = labels[val_id - 1]
            WIND = synset['synsets'][ILSVRC_ID - 1][0][1][0]
            print("val_id:%d, ILSVRC_ID:%d, WIND:%s" % (val_id, ILSVRC_ID, WIND))
            # move val images; makedirs also creates the parent 'val' folder
            output_dir = os.path.join(self.root, 'val', WIND)
            os.makedirs(output_dir, exist_ok=True)
            shutil.move(os.path.join(root, filename), os.path.join(output_dir, filename))

    def _dataset_exists(self):
        # Side effect: every archive that is found is extracted into root
        # before the next one is checked.
        if not os.path.exists(self.root):
            return False
        for file in ImageNet.files.values():
            archive = os.path.join(self.root, file)
            if not os.path.isfile(archive):
                return False
            un_tar(archive, self.root)
        return True
class MNIST(Dataset):
    r"""
    A 10-class multi-class classification dataset of handwritten digits.

    The four uncompressed idx files named in ``files`` are read from ``root``;
    every image is served as a flat vector of ``28 * 28`` values in [0, 1].

    Args:
        root (string): directory containing the uncompressed idx files.
        is_train (bool, optional): If True, serves the training split,
            otherwise the test split.

    Raises:
        TypeError: if ``is_train`` is not a bool.
        ValueError: if ``root`` or one of the idx files does not exist.
    """
    class_number = 10
    maxNum = 28 * 28
    resources = [
        "http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz",
        "http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz",
        "http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz",
        "http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz",
    ]
    files = {
        "train_images": 'train-images-idx3-ubyte',
        "test_images": 't10k-images-idx3-ubyte',
        "train_labels": 'train-labels-idx1-ubyte',
        "test_labels": 't10k-labels-idx1-ubyte',
    }
    # Class names read by ``class_to_idx``; without this attribute the property
    # raised AttributeError.
    classes = ['0 - zero', '1 - one', '2 - two', '3 - three', '4 - four',
               '5 - five', '6 - six', '7 - seven', '8 - eight', '9 - nine']

    def __init__(self, root, is_train=True):
        super().__init__()
        self.root = root
        self._is_train = is_train
        self.data = {}
        if not isinstance(self._is_train, bool):
            raise TypeError(">> is_train should be boolean value")
        if self._dataset_exists():
            self._to_numpy_format()
        else:
            self.download()

    def __getitem__(self, index):
        """Return ``(img, label)``: a float32 image scaled to [0, 1] and an int64 label."""
        split = 'train' if self._is_train else 'test'
        img = np.float32(self.data[split + '_images'][index])
        label = np.int64(self.data[split + '_labels'][index])
        return img / 255.0, label

    def __len__(self):
        """Return the number of samples in the active split."""
        split = 'train' if self._is_train else 'test'
        return len(self.data[split + '_images'])

    @property
    def dataset_folder(self):
        return os.path.join(self.root, self.__class__.__name__)

    @property
    def class_to_idx(self):
        return {_class: i for i, _class in enumerate(self.classes)}

    @property
    def data_dict(self):
        return self.data

    @property
    def is_train(self):
        return self._is_train

    @is_train.setter
    def is_train(self, is_train):
        assert is_train in [True, False], ">> Invalid is_train setting"
        self._is_train = is_train

    def _read_idx(self, key, offset):
        # Read the idx file registered under ``key`` as uint8, skipping its header bytes.
        with open(os.path.join(self.root, MNIST.files[key]), 'rb') as f:
            return np.frombuffer(f.read(), np.uint8, offset=offset)

    def _to_numpy_format(self):
        # Image files carry a 16-byte header, label files an 8-byte header.
        self.data['train_images'] = self._read_idx('train_images', 16).reshape(-1, 28 ** 2)
        self.data['train_labels'] = self._read_idx('train_labels', 8)
        self.data['test_images'] = self._read_idx('test_images', 16).reshape(-1, 28 ** 2)
        self.data['test_labels'] = self._read_idx('test_labels', 8)
        print(">> Dataset loaded")

    def _dataset_exists(self):
        # True only when root exists and every idx file is present.
        if not os.path.exists(self.root):
            return False
        return all(os.path.isfile(os.path.join(self.root, file)) for file in MNIST.files.values())

    def download(self):
        """Report an existing dataset; no download is performed.

        Raises:
            ValueError: if the dataset files are missing.
        """
        if self._dataset_exists():
            print(">> Dataset already exists. ")
            return
        raise ValueError(">> Failed to load the set, file not exist. You should download the dataset firstly.")
class FashionMNIST(Dataset):
    r"""
    A 10-class multi-class classification dataset of fashion article images.

    The four uncompressed idx files named in ``files`` are read from ``root``;
    every image is served as a flat vector of ``28 * 28`` values in [0, 1].

    Args:
        root (string): directory containing the uncompressed idx files.
        is_train (bool, optional): If True, serves the training split,
            otherwise the test split.

    Raises:
        TypeError: if ``is_train`` is not a bool.
        ValueError: if ``root`` or one of the idx files does not exist.
    """
    class_number = 10
    maxNum = 28 * 28
    resources = [
        "http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz",
        "http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz",
        "http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz",
        "http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz",
    ]
    files = {
        "train_images": 'train-images-idx3-ubyte',
        "test_images": 't10k-images-idx3-ubyte',
        "train_labels": 'train-labels-idx1-ubyte',
        "test_labels": 't10k-labels-idx1-ubyte',
    }
    # Class names read by ``class_to_idx``; without this attribute the property
    # raised AttributeError.
    classes = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

    def __init__(self, root, is_train=True):
        super().__init__()
        self.root = root
        self._is_train = is_train
        self.data = {}
        if not isinstance(self._is_train, bool):
            raise TypeError(">> is_train should be boolean value")
        if self._dataset_exists():
            self._to_numpy_format()
        else:
            self.download()

    def __getitem__(self, index):
        """Return ``(img, label)``: a float32 image scaled to [0, 1] and an int64 label."""
        split = 'train' if self._is_train else 'test'
        img = np.float32(self.data[split + '_images'][index])
        label = np.int64(self.data[split + '_labels'][index])
        return img / 255.0, label

    def __len__(self):
        """Return the number of samples in the active split."""
        split = 'train' if self._is_train else 'test'
        return len(self.data[split + '_images'])

    @property
    def dataset_folder(self):
        return os.path.join(self.root, self.__class__.__name__)

    @property
    def class_to_idx(self):
        return {_class: i for i, _class in enumerate(self.classes)}

    @property
    def data_dict(self):
        return self.data

    @property
    def is_train(self):
        return self._is_train

    @is_train.setter
    def is_train(self, is_train):
        assert is_train in [True, False], ">> Invalid is_train setting"
        self._is_train = is_train

    def _read_idx(self, key, offset):
        # Read the idx file registered under ``key`` as uint8, skipping its header bytes.
        with open(os.path.join(self.root, FashionMNIST.files[key]), 'rb') as f:
            return np.frombuffer(f.read(), np.uint8, offset=offset)

    def _to_numpy_format(self):
        # Image files carry a 16-byte header, label files an 8-byte header.
        self.data['train_images'] = self._read_idx('train_images', 16).reshape(-1, 28 ** 2)
        self.data['train_labels'] = self._read_idx('train_labels', 8)
        self.data['test_images'] = self._read_idx('test_images', 16).reshape(-1, 28 ** 2)
        self.data['test_labels'] = self._read_idx('test_labels', 8)
        print(">> Dataset loaded")

    def _dataset_exists(self):
        # True only when root exists and every idx file is present.
        if not os.path.exists(self.root):
            return False
        return all(os.path.isfile(os.path.join(self.root, file)) for file in FashionMNIST.files.values())

    def download(self):
        """Report an existing dataset; no download is performed.

        ``__init__`` calls this method when the files are missing, so it has to
        exist on the class; otherwise construction fails with AttributeError.

        Raises:
            ValueError: if the dataset files are missing.
        """
        if self._dataset_exists():
            print(">> Dataset already exists. ")
            return
        raise ValueError(">> Failed to load the set, file not exist. You should download the dataset firstly.")
class PathMNIST(Dataset):
    r"""
    A 9-class multi-class classification dataset loaded from ``pathmnist.npz``.

    Every image is served flattened and scaled to [0, 1] as float32.

    Args:
        root (str): directory containing ``pathmnist.npz``.
        is_train (bool): serve the training split when True, else the test split.

    Raises:
        TypeError: if ``is_train`` is not a bool.
        ValueError: if ``pathmnist.npz`` is not found in ``root``.
    """
    resources = 'https://drive.google.com/drive/folders/1Tl_SP-ffDQg-jDG_EWPlWKgZTmGbvFXU'
    class_number = 9
    maxNum = 28 * 28 * 3

    def __init__(self, root, is_train=True):
        super().__init__()
        self.root = root
        self._is_train = is_train
        self.data = {
            'train_images': [],
            'test_images': [],
            'train_labels': [],
            'test_labels': []
        }
        if not isinstance(self._is_train, bool):
            raise TypeError(">> is_train should be boolean value")
        if self._dataset_exists():
            self._to_numpy_format()
        else:
            raise ValueError(">> Failed to load the set, file not exist. You should download the dataset firstly.")

    def __getitem__(self, index):
        """Return ``(img, label)`` for sample ``index`` of the active split."""
        split = 'train' if self._is_train else 'test'
        img = np.float32(self.data[split + '_images'][index].flatten() / 255)
        label = np.int64(self.data[split + '_labels'][index])
        return img, label

    def __len__(self, ):
        """Return the number of samples in the active split."""
        split = 'train' if self._is_train else 'test'
        return len(self.data[split + '_images'])

    @property
    def data_dict(self):
        return self.data

    @property
    def is_train(self):
        return self._is_train

    @is_train.setter
    def is_train(self, is_train):
        assert is_train in [True, False], ">> Invalid is_train setting"
        self._is_train = is_train

    def _to_numpy_format(self):
        # Indexing the archive materialises each array in memory, so the
        # archive can be closed as soon as the four arrays have been read.
        with np.load(os.path.join(self.root, 'pathmnist.npz')) as unzip_data:
            self.data['train_images'] = unzip_data['train_images']
            self.data['test_images'] = unzip_data['test_images']
            self.data['train_labels'] = unzip_data['train_labels'].squeeze()
            self.data['test_labels'] = unzip_data['test_labels'].squeeze()
        return

    def _dataset_exists(self):
        return os.path.isfile(os.path.join(self.root, 'pathmnist.npz'))
class OctMNIST(Dataset):
    r"""
    A 4-class multi-class classification dataset loaded from ``octmnist.npz``.

    Every image is served flattened and scaled to [0, 1] as float32.

    Args:
        root (str): directory containing ``octmnist.npz``.
        is_train (bool): serve the training split when True, else the test split.

    Raises:
        TypeError: if ``is_train`` is not a bool.
        ValueError: if ``octmnist.npz`` is not found in ``root``.
    """
    resources = 'https://drive.google.com/drive/folders/1Tl_SP-ffDQg-jDG_EWPlWKgZTmGbvFXU'
    class_number = 4
    maxNum = 28 * 28

    def __init__(self, root, is_train=True):
        super().__init__()
        self.root = root
        self._is_train = is_train
        self.data = {
            'train_images': [],
            'test_images': [],
            'train_labels': [],
            'test_labels': []
        }
        if not isinstance(self._is_train, bool):
            raise TypeError(">> is_train should be boolean value")
        if self._dataset_exists():
            self._to_numpy_format()
        else:
            raise ValueError(">> Failed to load the set, file not exist. You should download the dataset firstly.")

    def __getitem__(self, index):
        """Return ``(img, label)`` for sample ``index`` of the active split."""
        split = 'train' if self._is_train else 'test'
        img = np.float32(self.data[split + '_images'][index].flatten() / 255)
        label = np.int64(self.data[split + '_labels'][index])
        return img, label

    def __len__(self, ):
        """Return the number of samples in the active split."""
        split = 'train' if self._is_train else 'test'
        return len(self.data[split + '_images'])

    @property
    def data_dict(self):
        return self.data

    @property
    def is_train(self):
        return self._is_train

    @is_train.setter
    def is_train(self, is_train):
        assert is_train in [True, False], ">> Invalid is_train setting"
        self._is_train = is_train

    def _to_numpy_format(self):
        # Indexing the archive materialises each array in memory, so the
        # archive can be closed as soon as the four arrays have been read.
        with np.load(os.path.join(self.root, 'octmnist.npz')) as unzip_data:
            self.data['train_images'] = unzip_data['train_images']
            self.data['test_images'] = unzip_data['test_images']
            self.data['train_labels'] = unzip_data['train_labels'].squeeze()
            self.data['test_labels'] = unzip_data['test_labels'].squeeze()
        return

    def _dataset_exists(self):
        return os.path.isfile(os.path.join(self.root, 'octmnist.npz'))
class RWCP10(Dataset):
    r"""
    Ten-class audio dataset; the class names are listed in ``classes``.

    A cached feature file (``kp_feature.npz`` or ``mfcc_feature.npz``) found in
    ``root`` is loaded directly. Otherwise the features are extracted from the
    raw ``train/<class>`` and ``test/<class>`` folders with the method selected
    by ``preprocessing`` and then loaded.

    Args:
        root (str): dataset directory.
        is_train (bool): serve the training split when True, else the test split.
        **kwargs:
            scale (float): factor applied to kp spike times, default 0.1.
            preprocessing (str): ``'mfcc'`` (default) or ``'kp'``, case-insensitive.
            npz_name (str): feature file to look for; by default
                ``mfcc_feature.npz`` is preferred over ``kp_feature.npz``.

    Raises:
        TypeError: if ``is_train`` is not a bool.
        ValueError: if ``root`` does not exist, if neither a cached feature file
            nor the raw class folders are available, or if ``preprocessing`` is
            neither ``kp`` nor ``mfcc``.
    """
    classes = {
        "ring": 0,
        "whistle1": 1,
        "phone4": 2,
        "cymbals": 3,
        "horn": 4,
        "bells5": 5,
        "buzzer": 6,
        "kara": 7,
        "metal15": 8,
        "bottle1": 9
    }
    class_number = 10

    def __init__(self, root, is_train=True, **kwargs):
        super().__init__()
        self.scale = kwargs.get('scale', 0.1)
        preprocessing = kwargs.get('preprocessing', 'mfcc')
        self.preprocessing = preprocessing.lower()
        self.root = root
        npz_name = kwargs.get('npz_name', ('mfcc_feature', 'kp_feature'))
        self._is_train = is_train
        self.data = {
            'train_audios': [],
            'test_audios': [],
            'train_ids': [],
            'test_ids': [],
            'train_labels': [],
            'test_labels': [],
            'Time': [],
            'neuron_num': []
        }
        if not isinstance(self._is_train, bool):
            raise TypeError(">> is_train should be boolean value")
        # Every unusable configuration raises here, so a constructed instance
        # always has npz_name, maxTime and maxNum set.
        if not self._dataset_exists():
            raise ValueError(">> Failed to load the set, file not exist. You should download the dataset firstly.")
        self._npz_exists(npz_name)
        if self.npz_name == 'kp_feature.npz':
            self._load_kp()
        elif self.npz_name == 'mfcc_feature.npz':
            self._load_mfcc()
        # No cached feature file: extract the features from the raw data.
        elif not self._classfile_exists():
            raise ValueError(">> Failed to load the set, file not exist. You should download the dataset firstly.")
        elif self.preprocessing == 'kp':
            self.npz_name = save_kp_feature(root=self.root, npz_name=self.npz_name, sample_rate=16e3,
                                            class_labels=RWCP10.classes)
            self._load_kp()
        elif self.preprocessing == 'mfcc':
            self.npz_name = save_mfcc_feature(root=self.root, npz_name=self.npz_name, sample_rate=16e3,
                                              class_labels=RWCP10.classes)
            self._load_mfcc()
        else:
            raise ValueError(">> Wrong preprocessing method. Please select kp or mfcc")

    def _load_kp(self):
        # Load kp features; the time window is the recorded duration times ``scale``.
        self.data = load_kp_data(self.root, self.npz_name)
        self.maxTime = int(np.ceil(self.data['Time'] * self.scale))
        self.maxNum = int(self.data['neuron_num'])

    def _load_mfcc(self):
        # Load MFCC features; the time window is fixed to 50 for this dataset.
        self.data = load_mfcc_data(self.root, self.npz_name)
        self.maxTime = 50
        self.maxNum = int(self.data['neuron_num'])

    def __getitem__(self, index):
        """Return ``(spiking, label)`` for sample ``index`` of the active split.

        For kp features ``spiking`` is ``[scaled spike times, neuron ids]``;
        otherwise it is the feature array cast to float.
        """
        split = 'train' if self._is_train else 'test'
        audio = self.data[split + '_audios'][index]
        if self.npz_name == 'kp_feature.npz':
            spiking = [audio * self.scale, self.data[split + '_ids'][index]]
        else:
            spiking = audio.astype(float)
        label = np.int64(self.data[split + '_labels'][index])
        return spiking, label

    def __len__(self):
        """Return the number of samples in the active split."""
        split = 'train' if self._is_train else 'test'
        return len(self.data[split + '_audios'])

    @property
    def data_dict(self):
        return self.data

    @property
    def is_train(self):
        return self._is_train

    @is_train.setter
    def is_train(self, is_train):
        assert is_train in [True, False], ">> Invalid is_train setting"
        self._is_train = is_train

    def _dataset_exists(self):
        return os.path.exists(self.root)

    # Check whether the raw data exists.
    def _classfile_exists(self):
        for cls_id in RWCP10.classes.keys():
            if not os.path.isdir(os.path.join(self.root, 'test', cls_id)):
                return False
            if not os.path.isdir(os.path.join(self.root, 'train', cls_id)):
                return False
        return True

    def _npz_exists(self, npz_name):
        # Set self.npz_name to the feature file found in root, or '' if none.
        file_name = os.listdir(self.root)
        if npz_name == ('mfcc_feature', 'kp_feature'):
            # Default request: prefer MFCC features over kp features.
            if 'mfcc_feature.npz' in file_name:
                self.npz_name = 'mfcc_feature.npz'
            elif 'kp_feature.npz' in file_name:
                self.npz_name = 'kp_feature.npz'
            else:
                self.npz_name = ''
        else:
            self.npz_name = npz_name if npz_name in file_name else ''
class MNISTVoices(Dataset):
    r"""
    Used to load any type of 0-9 audio dataset
    Class number: 10

    A cached feature file (``kp_feature.npz`` or ``mfcc_feature.npz``) found in
    ``root`` is loaded directly. Otherwise the features are extracted from the
    raw ``train/<class>`` and ``test/<class>`` folders with the method selected
    by ``preprocessing`` and then loaded.

    Args:
        root (str): dataset directory.
        is_train (bool): serve the training split when True, else the test split.
        **kwargs:
            scale (float): factor applied to kp spike times, default 0.1.
            preprocessing (str): ``'mfcc'`` (default) or ``'kp'``, case-insensitive.
            npz_name (str): feature file to look for; by default
                ``mfcc_feature.npz`` is preferred over ``kp_feature.npz``.

    Raises:
        TypeError: if ``is_train`` is not a bool.
        ValueError: if ``root`` does not exist, if neither a cached feature file
            nor the raw class folders are available, or if ``preprocessing`` is
            neither ``kp`` nor ``mfcc``.
    """
    class_number = 10
    classes = {
        "0": 0,
        "1": 1,
        "2": 2,
        "3": 3,
        "4": 4,
        "5": 5,
        "6": 6,
        "7": 7,
        "8": 8,
        "9": 9
    }

    def __init__(self, root, is_train=True, **kwargs):
        super().__init__()
        self.scale = kwargs.get('scale', 0.1)
        preprocessing = kwargs.get('preprocessing', 'mfcc')
        self.preprocessing = preprocessing.lower()
        self.root = root
        npz_name = kwargs.get('npz_name', ('mfcc_feature', 'kp_feature'))
        self._is_train = is_train
        self.data = {
            'train_audios': [],
            'test_audios': [],
            'train_ids': [],
            'test_ids': [],
            'train_labels': [],
            'test_labels': [],
            'Time': [],
            'neuron_num': []
        }
        if not isinstance(self._is_train, bool):
            raise TypeError(">> is_train should be boolean value")
        # Every unusable configuration raises here, so a constructed instance
        # always has npz_name, maxTime and maxNum set.
        if not self._dataset_exists():
            raise ValueError(">> Failed to load the set, file not exist. You should download the dataset firstly.")
        self._npz_exists(npz_name)
        if self.npz_name == 'kp_feature.npz':
            self._load_kp()
        elif self.npz_name == 'mfcc_feature.npz':
            self._load_mfcc()
        # No cached feature file: extract the features from the raw data.
        elif not self._classfile_exists():
            raise ValueError(">> Failed to load the set, file not exist. You should download the dataset firstly.")
        elif self.preprocessing == 'kp':
            self.npz_name = save_kp_feature(root=self.root, npz_name=self.npz_name, sample_rate=16e3,
                                            class_labels=MNISTVoices.classes)
            self._load_kp()
        elif self.preprocessing == 'mfcc':
            self.npz_name = save_mfcc_feature(root=self.root, npz_name=self.npz_name, sample_rate=16e3,
                                              class_labels=MNISTVoices.classes)
            self._load_mfcc()
        else:
            raise ValueError(">> Wrong preprocessing method. Please select kp or mfcc")

    def _load_kp(self):
        # Load kp features; the time window is the recorded duration times ``scale``.
        self.data = load_kp_data(self.root, self.npz_name)
        self.maxTime = int(np.ceil(self.data['Time'] * self.scale))
        self.maxNum = int(self.data['neuron_num'])

    def _load_mfcc(self):
        # Load MFCC features; the time window is fixed to 50 for this dataset.
        self.data = load_mfcc_data(self.root, self.npz_name)
        self.maxTime = 50
        self.maxNum = int(self.data['neuron_num'])

    def __getitem__(self, index):
        """Return ``(spiking, label)`` for sample ``index`` of the active split.

        For kp features ``spiking`` is ``[scaled spike times, neuron ids]``;
        otherwise it is the feature array cast to float.
        """
        split = 'train' if self._is_train else 'test'
        audio = self.data[split + '_audios'][index]
        if self.npz_name == 'kp_feature.npz':
            spiking = [audio * self.scale, self.data[split + '_ids'][index]]
        else:
            spiking = audio.astype(float)
        label = np.int64(self.data[split + '_labels'][index])
        return spiking, label

    def __len__(self):
        """Return the number of samples in the active split."""
        split = 'train' if self._is_train else 'test'
        return len(self.data[split + '_audios'])

    @property
    def data_dict(self):
        return self.data

    @property
    def is_train(self):
        return self._is_train

    @is_train.setter
    def is_train(self, is_train):
        assert is_train in [True, False], ">> Invalid is_train setting"
        self._is_train = is_train

    def _dataset_exists(self):
        return os.path.exists(self.root)

    # Check whether the raw data exists.
    def _classfile_exists(self):
        for cls_id in MNISTVoices.classes.keys():
            if not os.path.isdir(os.path.join(self.root, 'test', cls_id)):
                return False
            if not os.path.isdir(os.path.join(self.root, 'train', cls_id)):
                return False
        return True

    def _npz_exists(self, npz_name):
        # Set self.npz_name to the feature file found in root, or '' if none.
        file_name = os.listdir(self.root)
        if npz_name == ('mfcc_feature', 'kp_feature'):
            # Default request: prefer MFCC features over kp features.
            if 'mfcc_feature.npz' in file_name:
                self.npz_name = 'mfcc_feature.npz'
            elif 'kp_feature.npz' in file_name:
                self.npz_name = 'kp_feature.npz'
            else:
                self.npz_name = ''
        else:
            self.npz_name = npz_name if npz_name in file_name else ''
class TIDIGITS(Dataset):
    r"""
    Used to load any type of 0-9 and oh audio dataset
    Class number: 11

    A cached feature file (``kp_feature.npz`` or ``mfcc_feature.npz``) found in
    ``root`` is loaded directly. Otherwise the features are extracted from the
    raw ``train/<class>`` and ``test/<class>`` folders with the method selected
    by ``preprocessing`` and then loaded.

    Args:
        root (str): dataset directory.
        is_train (bool): serve the training split when True, else the test split.
        **kwargs:
            scale (float): factor applied to kp spike times, default 0.1.
            preprocessing (str): ``'mfcc'`` (default) or ``'kp'``, case-insensitive.
            npz_name (str): feature file to look for; by default
                ``mfcc_feature.npz`` is preferred over ``kp_feature.npz``.

    Raises:
        TypeError: if ``is_train`` is not a bool.
        ValueError: if ``root`` does not exist, if neither a cached feature file
            nor the raw class folders are available, or if ``preprocessing`` is
            neither ``kp`` nor ``mfcc``.
    """
    class_number = 11
    classes = {
        "zero": 0,
        "one": 1,
        "two": 2,
        "three": 3,
        "four": 4,
        "five": 5,
        "six": 6,
        "seven": 7,
        "eight": 8,
        "nine": 9,
        "oh": 10
    }

    def __init__(self, root, is_train=True, **kwargs):
        super().__init__()
        self.scale = kwargs.get('scale', 0.1)
        preprocessing = kwargs.get('preprocessing', 'mfcc')
        self.preprocessing = preprocessing.lower()
        self.root = root
        npz_name = kwargs.get('npz_name', ('mfcc_feature', 'kp_feature'))
        self._is_train = is_train
        self.data = {
            'train_audios': [],
            'test_audios': [],
            'train_ids': [],
            'test_ids': [],
            'train_labels': [],
            'test_labels': [],
            'Time': [],
            'neuron_num': []
        }
        if not isinstance(self._is_train, bool):
            raise TypeError(">> is_train should be boolean value")
        # Every unusable configuration raises here, so a constructed instance
        # always has npz_name, maxTime and maxNum set.
        if not self._dataset_exists():
            raise ValueError(">> Failed to load the set, file not exist. You should download the dataset firstly.")
        self._npz_exists(npz_name)
        if self.npz_name == 'kp_feature.npz':
            self._load_kp()
        elif self.npz_name == 'mfcc_feature.npz':
            self._load_mfcc()
        # No cached feature file: extract the features from the raw data.
        elif not self._classfile_exists():
            raise ValueError(">> Failed to load the set, file not exist. You should download the dataset firstly.")
        elif self.preprocessing == 'kp':
            self.npz_name = save_kp_feature(root=self.root, npz_name=self.npz_name, sample_rate=16e3,
                                            class_labels=TIDIGITS.classes)
            self._load_kp()
        elif self.preprocessing == 'mfcc':
            # MFCC extraction for this dataset uses 20e3 for both sample_rate and signal_num.
            self.npz_name = save_mfcc_feature(root=self.root, npz_name=self.npz_name, sample_rate=20e3,
                                              signal_num=20e3, class_labels=TIDIGITS.classes)
            self._load_mfcc()
        else:
            raise ValueError(">> Wrong preprocessing method. Please select kp or mfcc")

    def _load_kp(self):
        # Load kp features; the time window is the recorded duration times ``scale``.
        self.data = load_kp_data(self.root, self.npz_name)
        self.maxTime = int(np.ceil(self.data['Time'] * self.scale))
        self.maxNum = int(self.data['neuron_num'])

    def _load_mfcc(self):
        # Load MFCC features; the time window is fixed to 40 for this dataset.
        self.data = load_mfcc_data(self.root, self.npz_name)
        self.maxTime = 40
        self.maxNum = int(self.data['neuron_num'])

    def __getitem__(self, index):
        """Return ``(spiking, label)`` for sample ``index`` of the active split.

        For kp features ``spiking`` is ``[scaled spike times, neuron ids]``;
        otherwise it is the feature array cast to float.
        """
        split = 'train' if self._is_train else 'test'
        audio = self.data[split + '_audios'][index]
        if self.npz_name == 'kp_feature.npz':
            spiking = [audio * self.scale, self.data[split + '_ids'][index]]
        else:
            spiking = audio.astype(float)
        label = np.int64(self.data[split + '_labels'][index])
        return spiking, label

    def __len__(self):
        """Return the number of samples in the active split."""
        split = 'train' if self._is_train else 'test'
        return len(self.data[split + '_audios'])

    @property
    def data_dict(self):
        return self.data

    @property
    def is_train(self):
        return self._is_train

    @is_train.setter
    def is_train(self, is_train):
        assert is_train in [True, False], ">> Invalid is_train setting"
        self._is_train = is_train

    def _dataset_exists(self):
        return os.path.exists(self.root)

    # Check whether the raw data exists.
    def _classfile_exists(self):
        for cls_id in TIDIGITS.classes.keys():
            if not os.path.isdir(os.path.join(self.root, 'test', cls_id)):
                return False
            if not os.path.isdir(os.path.join(self.root, 'train', cls_id)):
                return False
        return True

    def _npz_exists(self, npz_name):
        # Set self.npz_name to the feature file found in root, or '' if none.
        file_name = os.listdir(self.root)
        if npz_name == ('mfcc_feature', 'kp_feature'):
            # Default request: prefer MFCC features over kp features.
            if 'mfcc_feature.npz' in file_name:
                self.npz_name = 'mfcc_feature.npz'
            elif 'kp_feature.npz' in file_name:
                self.npz_name = 'kp_feature.npz'
            else:
                self.npz_name = ''
        else:
            self.npz_name = npz_name if npz_name in file_name else ''
class SHD(Dataset):
    '''
    Spiking Heidelberg Digits Dataset
    max spiking time: 136.9 ms
    max neuron num: 700
    Class number: 20
    number of train samples: 8156
    number of test samples: 2264

    NOTE(review): the stated maximum spike time does not obviously agree with
    ``Time = 1.37``; confirm the intended unit.

    Only the split selected by ``is_train`` at construction is read from disk;
    switching ``is_train`` afterwards serves the other, still empty, split.

    Args:
        root (str): directory containing ``shd_train.h5`` and ``shd_test.h5``.
        is_train (bool): load the training split when True, else the test split.
        **kwargs:
            scale (number): factor applied to spike times, default 100.

    Raises:
        TypeError: if ``is_train`` is not a bool.
        ValueError: if ``root`` or one of the two h5 files does not exist.
    '''
    class_number = 20
    maxNum = 700
    Time = 1.37
    files = {
        "train_dataset": 'shd_train.h5',
        "test_dataset": 'shd_test.h5'
    }

    def __init__(self, root, is_train=True, **kwargs):
        super().__init__()
        self.scale = kwargs.get('scale', 100)
        self.maxTime = int(SHD.Time * self.scale)
        self.root = root
        self._is_train = is_train
        self.data = {
            'train_spiking': [],
            'test_spiking': [],
            'train_ids': [],
            'test_ids': [],
            'train_labels': [],
            'test_labels': []
        }
        if not isinstance(self._is_train, bool):
            raise TypeError(">> is_train should be boolean value")
        if self._dataset_exists():
            self._to_numpy_format()
        else:
            raise ValueError(">> Failed to load the set, file not exist. You should download the dataset firstly.")

    def __getitem__(self, index):
        """Return ``([scaled spike times, neuron ids], label)`` for sample ``index``."""
        split = 'train' if self._is_train else 'test'
        spiking = [self.data[split + '_spiking'][index] * self.scale, self.data[split + '_ids'][index]]
        label = np.int64(self.data[split + '_labels'][index])
        return spiking, label

    def __len__(self):
        """Return the number of samples in the active split."""
        split = 'train' if self._is_train else 'test'
        return len(self.data[split + '_spiking'])

    @property
    def dataset_folder(self):
        return os.path.join(self.root, self.__class__.__name__)

    @property
    def data_dict(self):
        return self.data

    @property
    def is_train(self):
        return self._is_train

    @is_train.setter
    def is_train(self, is_train):
        assert is_train in [True, False], ">> Invalid is_train setting"
        self._is_train = is_train

    def _to_numpy_format(self):
        # h5py is imported lazily so the module can be imported without it.
        import h5py
        split = 'train' if self._is_train else 'test'
        path = os.path.join(self.root, SHD.files[split + '_dataset'])
        # ``[:]`` copies each dataset into memory, so the file can be closed
        # as soon as the three arrays have been read.
        with h5py.File(path, 'r') as fileh:
            neuron_ids = fileh['spikes']['units'][:]
            spike_times = fileh['spikes']['times'][:]
            labels = fileh['labels'][:]
        self.data[split + '_spiking'] = spike_times
        self.data[split + '_ids'] = neuron_ids
        self.data[split + '_labels'] = labels
        print(">> Dataset loaded")

    def _dataset_exists(self):
        # Both h5 files must be present, regardless of the requested split.
        if not os.path.exists(self.root):
            return False
        return all(os.path.isfile(os.path.join(self.root, file)) for file in SHD.files.values())
class SSC(Dataset):
    """
    Spiking Speech Command (SSC) dataset read from HDF5 files.

    Only the split selected by ``is_train`` is read at construction time; the
    other split is read on demand when ``is_train`` is switched afterwards.
    The validation file must be present (it is part of the existence check)
    but is never read by this class.

    Figures kept from the original notes (not verified here):
        max neuron num: 700
        class number: 35
        number of train samples: 75466
        number of test samples: 30363
    NOTE(review): the original notes gave "max spiking time: 99.95ms" while
    ``Time`` is 1; 99.95 is close to ``Time`` multiplied by the default
    ``scale`` of 100, so the unit of that figure should be confirmed.

    Args:
        root: Directory containing ``ssc_train.h5``, ``ssc_valid.h5`` and
            ``ssc_test.h5``.
        is_train: ``True`` to serve the training split, ``False`` for test.
        **kwargs: ``scale`` (default 100) multiplies every spike time returned
            by ``__getitem__`` and is used to derive ``maxTime``.

    Raises:
        TypeError: If ``is_train`` is not a ``bool``.
        ValueError: If ``root`` or one of the dataset files does not exist.
    """
    class_number = 35
    maxNum = 700
    Time = 1
    files = {
        "train_dataset": 'ssc_train.h5',
        "valid_dataset": 'ssc_valid.h5',
        "test_dataset": 'ssc_test.h5'
    }

    def __init__(self, root, is_train=True, **kwargs):
        super().__init__()
        self.scale = kwargs.get('scale', 100)
        self.maxTime = int(SSC.Time * self.scale)
        self.root = root
        self._is_train = is_train
        self.data = {
            'train_spiking': [],
            'test_spiking': [],
            'train_ids': [],
            'test_ids': [],
            'train_labels': [],
            'test_labels': []
        }
        if not isinstance(self._is_train, bool):
            raise TypeError(">> is_train should be boolean value")
        if not self._dataset_exists():
            raise ValueError(">> Failed to load the set, file not exist. You should download the dataset firstly.")
        self._to_numpy_format()

    def _active_prefix(self):
        """Return ``'train'`` or ``'test'`` depending on the active split."""
        return 'train' if self._is_train else 'test'

    def __getitem__(self, index):
        """Return ``([spike_times * scale, neuron_ids], label)`` for one sample of the active split."""
        prefix = self._active_prefix()
        times = self.data[prefix + '_spiking'][index] * self.scale
        spiking = [times, self.data[prefix + '_ids'][index]]
        label = np.int64(self.data[prefix + '_labels'][index])
        return spiking, label

    def __len__(self):
        """Return the number of loaded samples in the active split."""
        return len(self.data[self._active_prefix() + '_spiking'])

    @property
    def dataset_folder(self):
        """Path ``<root>/<class name>``."""
        return os.path.join(self.root, self.__class__.__name__)

    @property
    def data_dict(self):
        """The dictionary holding the loaded spike times, neuron ids and labels."""
        return self.data

    @property
    def is_train(self):
        """Whether the training split is the active one."""
        return self._is_train

    @is_train.setter
    def is_train(self, is_train):
        assert is_train in [True, False], ">> Invalid is_train setting"
        self._is_train = is_train
        # Only one split is read in __init__; read the newly selected one when
        # it has not been loaded yet, otherwise the dataset would appear empty.
        if len(self.data[self._active_prefix() + '_spiking']) == 0:
            self._to_numpy_format()

    def _to_numpy_format(self):
        """Read the active split from its HDF5 file into ``self.data``."""
        # Kept as a local import so h5py is only required when this dataset is used.
        import h5py
        prefix = self._active_prefix()
        path = os.path.join(self.root, SSC.files[prefix + '_dataset'])
        # [:] materialises each dataset in memory, which makes it safe to
        # close the file before the arrays are stored on the instance.
        with h5py.File(path, 'r') as fileh:
            neuron_ids = fileh['spikes']['units'][:]
            spike_times = fileh['spikes']['times'][:]
            labels = fileh['labels'][:]
        self.data[prefix + '_spiking'] = spike_times
        self.data[prefix + '_ids'] = neuron_ids
        self.data[prefix + '_labels'] = labels
        print(">> Dataset loaded")

    def _dataset_exists(self):
        """Return ``True`` when ``root`` exists and contains every file listed in ``files``."""
        if not os.path.exists(self.root):
            return False
        for name in SSC.files.values():
            if not os.path.isfile(os.path.join(self.root, name)):
                return False
        return True
[docs]class DVS128Gesture(Dataset):
"""
DVS128 Gesture dataset served either as event lists or as integrated frames.

Reference: https://spikingjelly.readthedocs.io/zh_CN/latest/clock_driven/13_neuromorphic_datasets.html
"""
# Location the raw dataset can be downloaded from.
resources = 'https://ibm.ent.box.com/s/3hiq58ww1pbbjrinh367ykfdf60xsfm8/folder/50167556794'
# Factor applied to event timestamps in __getitem__ (presumably us -> ms -- TODO confirm).
scale = 0.001
# Spatial down-sampling factor applied to the x/y coordinates in __getitem__.
resize = 1
class_number = 11
# Number of input neurons: one per pixel of the (down-sampled) 128 x 128 sensor.
maxNum = (128 // resize) * (128 // resize)
# Longest sample: 18456.951 ms (timestamps in the dataset are in microseconds)
origin_maxTime = 18457
maxTime = 1000 # the mstb encoding is also capped at 1 second
# NOTE(review): only referenced by a commented-out normalisation in __getitem__.
maxFrameValue = 65.0
# Text files (relative to the dataset root) listing the recordings of each split.
files = {
"train_dataset": 'trials_to_train.txt',
"test_dataset": 'trials_to_test.txt'
}
def __init__(self, dataset_root, is_train=True, data_type='event', event_cut=None, frames_number=None,
             split_by=None, duration=None, time_step=None):
    """
    Prepare the dataset on disk (if needed) and index its sample files.

    The raw recordings are first split into per-sample ``npz`` event files.
    For ``data_type='frame'`` those event files are additionally integrated
    into frame files, either with a fixed number of frames per sample
    (``frames_number`` + ``split_by``) or with a fixed frame duration
    (``duration``). Already existing output directories are reused.

    Args:
        dataset_root: Directory holding the raw recordings and the trial lists.
        is_train: ``True`` to serve the training split, ``False`` for test.
        data_type: ``'event'`` or ``'frame'``.
        event_cut: Events that are too long need to be cut; length of each
            piece in ms. Passed on to ``create_events_np_files``.
        frames_number: Number of frames per sample (positive ``int``).
        split_by: ``'time'`` or ``'number'``; required with ``frames_number``.
        duration: Frame duration in microseconds (positive ``int``).
        time_step: Number of consecutive frames ``__getitem__`` returns when
            ``duration`` is used.

    Raises:
        TypeError: If ``is_train`` is not a ``bool``.
        ValueError: If the raw dataset is missing, or if ``data_type`` is
            ``'frame'`` and both ``frames_number`` and ``duration`` are None.
    """
    super().__init__()
    self.root = dataset_root
    self._is_train = is_train
    self.data_type = data_type
    self.frame_number = frames_number
    self.duration = duration
    self.time_step = time_step
    self.data = {
        'train_spiking': [],
        'test_spiking': [],
        'train_ids': [],
        'test_ids': [],
        'train_labels': [],
        'test_labels': []
    }
    self.file_list = {
        'train_files': [],
        'test_files': []
    }
    if event_cut is None:
        events_np_root = os.path.join(dataset_root, 'events_np')
    else:
        events_np_root = os.path.join(dataset_root, f'events_np_{event_cut}')
    self.events_np_root = events_np_root
    if not isinstance(self._is_train, bool):
        raise TypeError(">> is_train should be boolean value")
    if not self._dataset_exists():
        raise ValueError(">> Faild to load the set, file not exist. You should download the dataset firstly.")
    if os.path.exists(os.path.join(events_np_root, 'train')) and \
            os.path.exists(os.path.join(events_np_root, 'test')):
        print("DVS128 Gesture Dataset had been transferred into npz format before.")
    else:
        os.mkdir(events_np_root)
        print(f'Mkdir [{events_np_root}].')
        self.create_events_np_files(events_np_root, event_cut)
    self.H, self.W = self.get_H_W()
    if data_type == 'event':
        self._index_sample_files(events_np_root)
    elif data_type == 'frame':
        if frames_number is not None:
            # Check the type first so a non-numeric value fails the assert
            # instead of raising TypeError from the comparison.
            assert isinstance(frames_number, int) and frames_number > 0
            assert split_by == 'time' or split_by == 'number'
            frames_np_root = os.path.join(dataset_root, f'frames_number_{frames_number}_split_by_{split_by}')
            self._prepare_frames(frames_np_root, integrate_events_file_to_frames_file_by_fixed_frames_number,
                                 split_by, frames_number)
        elif duration is not None:
            # duration is given in microseconds
            assert isinstance(duration, int) and duration > 0
            frames_np_root = os.path.join(dataset_root, f'duration_{duration}')
            self._prepare_frames(frames_np_root, integrate_events_file_to_frames_file_by_fixed_duration,
                                 duration)
        else:
            raise ValueError('frames_number and duration can not both be None.')
        self._index_sample_files(frames_np_root)

def _index_sample_files(self, data_root):
    """Append every file found below ``data_root``/train and ``data_root``/test to ``self.file_list``."""
    for split in ('train', 'test'):
        for root, _dirs, files in os.walk(os.path.join(data_root, split)):
            self.file_list[f'{split}_files'].extend(os.path.join(root, file) for file in files)

def _prepare_frames(self, frames_np_root, integrate_fn, *integrate_args):
    """Integrate every events file into a frames file below ``frames_np_root`` unless that directory exists."""
    if os.path.exists(frames_np_root):
        print(f'The directory [{frames_np_root}] already exists.')
        return
    os.mkdir(frames_np_root)
    print(f'Mkdir [{frames_np_root}].')
    # create the same directory structure
    create_same_directory_structure(self.events_np_root, frames_np_root)
    # use multi-thread to accelerate
    t_ckp = time.time()
    max_workers = min(multiprocessing.cpu_count(), 64)
    submitted = {}
    with ThreadPoolExecutor(max_workers=max_workers) as tpe:
        print(f'Start ThreadPoolExecutor with max workers = [{max_workers}].')
        for e_root, _e_dirs, e_files in os.walk(self.events_np_root):
            if not e_files:
                continue
            output_dir = os.path.join(frames_np_root, os.path.relpath(e_root, self.events_np_root))
            for e_file in e_files:
                events_np_file = os.path.join(e_root, e_file)
                print(
                    f'Start to integrate [{events_np_file}] to frames and save to [{output_dir}].')
                future = tpe.submit(integrate_fn, events_np_file, output_dir, *integrate_args,
                                    self.H, self.W, True)
                submitted[future] = events_np_file
    # The executor has finished all jobs here. Exceptions raised inside a
    # worker stay hidden in its future, so report them instead of silently
    # ending up with an incomplete frames directory.
    for future, events_np_file in submitted.items():
        error = future.exception()
        if error is not None:
            print(f'Failed to integrate [{events_np_file}]: {error!r}')
    print(f'Used time = [{round(time.time() - t_ckp, 2)}s].')
def calculate_max_time(self):
    """
    Read the label CSV files of both splits and compute the longest sample duration.

    For every recording named in the train and test trial lists the matching
    ``<recording>_labels.csv`` is read; ``self.maxTime`` is raised to the
    largest ``row[2] - row[1]`` found (never lowered), and ``maxTime / 1000``
    is printed.

    Raises:
        ValueError: If the dataset root does not exist.
    """
    import csv

    if not os.path.exists(self.root):
        raise ValueError(">> Faild to load the set, file not exist. You should download the dataset firstly.")
    label_files = []
    for list_key in ('train_dataset', 'test_dataset'):
        with open(os.path.join(self.root, self.files[list_key])) as list_f:
            for line in list_f:
                name = line.strip()
                # Blank lines in the trial list do not name a recording.
                if name:
                    # splitext only drops the extension, so dots elsewhere in
                    # the path do not truncate the name.
                    label_files.append(os.path.splitext(name)[0] + '_labels.csv')
    for label_file in label_files:
        with open(os.path.join(self.root, label_file), 'r') as cf:
            rows = csv.reader(cf)
            next(rows, None)  # skip the header line; tolerate an empty file
            for row in rows:
                if not row:
                    continue  # csv.reader yields [] for blank lines
                self.maxTime = max(self.maxTime, int(row[2]) - int(row[1]))
    print('maxTime:', self.maxTime / 1000)  # maxTime: 18456.951
def calculate_max_value_from_frame(self):
    """
    Print and return the largest value stored in the ``frames`` arrays of all indexed files.

    Both ``self.file_list['train_files']`` and ``self.file_list['test_files']``
    are scanned. The running maximum starts at 0, so 0 is reported when no
    file is indexed or no value is positive.

    Returns:
        The maximum frame value found.
    """
    maxFrameValue = 0
    for path in self.file_list['train_files'] + self.file_list['test_files']:
        # Close each archive after reading; otherwise one file handle per
        # sample stays open until garbage collection.
        with np.load(path) as data:
            maxFrameValue = max(maxFrameValue, np.max(data['frames']))
    print('maxFrameValue:', maxFrameValue)  # maxFrameValue: 53.0
    return maxFrameValue
def calculate_max_time_from_npz(self):
    """
    Read the converted event files and compute the longest sample duration.

    Every file below ``<events_np_root>/train`` and ``<events_np_root>/test``
    is loaded; ``self.maxTime`` is raised to the largest ``max(t) - min(t)``
    found (never lowered), and ``maxTime / 1000`` is printed. Files without
    any event are skipped.

    Raises:
        ValueError: If ``self.events_np_root`` does not exist.
    """
    if not os.path.exists(self.events_np_root):
        raise ValueError(">> Faild to load the set, file not exist. You should download the dataset firstly.")
    for split in ('train', 'test'):
        for root, _dirs, files in os.walk(os.path.join(self.events_np_root, split)):
            for file in files:
                # Close each archive after reading the timestamps.
                with np.load(os.path.join(root, file)) as data:
                    t = data['t']
                if t.size == 0:
                    continue  # np.max / np.min raise on empty arrays
                self.maxTime = max(self.maxTime, np.max(t) - np.min(t))
    print('maxTime:', self.maxTime / 1000)  # maxTime: 18456.951 ms
def __getitem__(self, index):
    """
    Load one sample of the active split.

    The label is parsed from the ``_``-separated tokens of the file name.

    * ``data_type == 'event'``: only events with polarity 1 are kept and the
      sample is cut at the first event later than 1e6 timestamp units after
      the first kept event (the original note says "only take one second").
      Returns ``[firing_times * scale, neuron_index]`` where
      ``neuron_index`` is a list of ``y * (W // resize) + x`` values.
    * ``data_type == 'frame'`` with ``duration``: a random window of
      ``time_step`` consecutive frames, binarised (``> 0``) as float32.
      ``time_step`` must be set and must not exceed the number of frames.
    * ``data_type == 'frame'`` with ``frames_number``: all frames as float32.

    Returns:
        Tuple ``(spiking, label)``.

    Raises:
        NotImplementedError: For any other configuration.
    """
    split_key = 'train_files' if self._is_train else 'test_files'
    file_name = self.file_list[split_key][index]
    name_tokens = os.path.split(file_name)[-1].split('.')[0].split('_')
    # Close the archive once the arrays are read; a dataset is indexed many
    # times and would otherwise leave one open file handle per access.
    with np.load(file_name) as data:
        if self.data_type == 'event':
            label = int(name_tokens[-1])
            keep = data['p'] == 1
            t = data['t'][keep]
            x = (data['x'] // self.resize)[keep]
            y = (data['y'] // self.resize)[keep]
            new_w = self.W // self.resize
            if t.size:
                elapsed = t - t[0]
                # Stop at the first event beyond the limit, exactly like a
                # sequential scan that breaks there.
                late = np.flatnonzero(elapsed > 1e6)
                stop = late[0] if late.size else elapsed.size
                firing_time = elapsed[:stop]
                # Widen before multiplying so narrow coordinate dtypes
                # cannot overflow when forming the flat pixel index.
                neuron_index = (y[:stop].astype(np.int64) * new_w + x[:stop]).tolist()
            else:
                firing_time = np.asarray([])
                neuron_index = []
            spiking = [firing_time * self.scale, neuron_index]
        elif self.data_type == 'frame' and self.duration is not None:
            import random
            frames = data['frames']
            # take a random window of time_step frames
            nums = frames.shape[0]
            left = random.randint(0, nums - self.time_step)
            frames = frames[left: left + self.time_step, ...]
            label = int(name_tokens[-2])
            # Original note: shape [batch, time_step, channel, H, W]; the
            # dimensions are swapped later in the encoder -- TODO confirm.
            spiking = ((frames > 0) * 1).astype(np.float32)
        elif self.data_type == 'frame' and self.frame_number is not None:
            spiking = data['frames'].astype(np.float32)
            label = int(name_tokens[-1])
        else:
            raise NotImplementedError
    return spiking, label
def __len__(self):
    """Return how many sample files are indexed for the currently active split."""
    split_key = 'train_files' if self._is_train else 'test_files'
    return len(self.file_list[split_key])
def split_event_to_short(self, spiking):
    """
    Compute how many ``maxTime``-sized bins fit into the last axis of ``spiking``.

    NOTE(review): the value is computed and then discarded and nothing is
    returned; this looks like an unfinished stub.
    """
    last_axis_len = spiking.shape[-1]
    bins = last_axis_len // self.maxTime
@property
def dataset_folder(self):
    """Path formed by joining the dataset root with the name of this object's class."""
    class_name = self.__class__.__name__
    return os.path.join(self.root, class_name)
@property
def data_dict(self):
    """The ``data`` dictionary of this instance (returned as-is, not copied)."""
    return self.data
@property
def is_train(self):
    """Whether the training split (``True``) or the test split (``False``) is active."""
    return self._is_train

@is_train.setter
def is_train(self, is_train):
    # Anything that does not compare equal to True or False is rejected.
    allowed = [True, False]
    assert is_train in allowed, ">> Invalid is_train setting"
    self._is_train = is_train
def _dataset_exists(self):
    """
    Check that the raw dataset is present below ``self.root``.

    The dataset counts as present when the root exists, both trial list
    files exist, and every recording named in them is a file below the root.
    Blank lines in the trial lists are ignored and surrounding whitespace
    (including a trailing carriage return) is stripped, matching how
    ``create_events_np_files`` reads the same lists.

    Returns:
        ``True`` when everything is in place, ``False`` otherwise.
    """
    if not os.path.exists(self.root):
        return False
    for list_key in ('train_dataset', 'test_dataset'):
        list_path = os.path.join(self.root, self.files[list_key])
        # A missing trial list means the dataset is absent; report that
        # rather than failing with FileNotFoundError.
        if not os.path.isfile(list_path):
            return False
        with open(list_path) as list_f:
            for line in list_f:
                name = line.strip()
                if name and not os.path.isfile(os.path.join(self.root, name)):
                    return False
    return True
@staticmethod
def split_aedat_files_to_np(fname: str, aedat_file: str, csv_file: str, output_dir: str, event_split=None):
    """
    Split one recording into per-sample ``npz`` event files.

    Each data row of ``csv_file`` is read as ``(label, t_start, t_end)``. The
    events with ``t_start <= t < t_end`` are written below
    ``output_dir/<label>/`` with keys ``t``, ``x``, ``y`` and ``p``. The label
    is always the last ``_``-separated token of the file name.

    Some recordings contain several samples with the same label (e.g.
    ``user26_fluorescent_labels.csv``). The first one keeps the plain name
    ``<fname>_<label>.npz``; later ones get ``<fname>_rep<k>_<label>.npz`` so
    they no longer overwrite each other.

    Args:
        fname: Recording name without extension, used as file name prefix.
        aedat_file: Path of the recording, read with ``load_aedat_v3``.
        csv_file: Path of the label CSV (first line is a header).
        output_dir: Split directory containing one sub-directory per label.
        event_split: Optional piece length in ms. When given, each sample is
            cut into ``duration // event_split`` pieces (at least one) holding
            an equal number of events each, saved as
            ``<fname>[_rep<k>]_<piece>_<label>.npz``; events left over after
            the even division are not saved.
    """
    events = load_aedat_v3(aedat_file)
    print(f'Start to split [{aedat_file}] to samples.')
    # Read csv file and get time stamp and label of each sample. ndmin=2 keeps
    # a CSV with a single data row two-dimensional so rows can be iterated.
    csv_data = np.loadtxt(csv_file, dtype=np.uint32, delimiter=',', skiprows=1, ndmin=2)
    # Number of samples already written per label for this recording.
    label_file_num = [0] * 11
    if event_split is not None:
        event_split = int(event_split * 1000)  # convert ms to us
    for row in csv_data:
        # the label of DVS128 Gesture is 1, 2, ..., 11. We set 0 as the first label, rather than 1
        label = int(row[0]) - 1
        t_start = row[1]
        t_end = row[2]
        mask = np.logical_and(events['t'] >= t_start, events['t'] < t_end)
        t = events['t'][mask]
        x = events['x'][mask]
        y = events['y'][mask]
        p = events['p'][mask]
        repeat = label_file_num[label]
        prefix = fname if repeat == 0 else f'{fname}_rep{repeat}'
        label_file_num[label] += 1
        if event_split is None:
            file_name = os.path.join(output_dir, str(label), f'{prefix}_{label}.npz')
            np.savez(file_name, t=t, x=x, y=y, p=p)
            print(f'[{file_name}] saved.')
            continue
        # A sample shorter than the requested piece length is kept as one piece.
        bins = max(int((t_end - t_start) // event_split), 1)
        per_bin = t.shape[0] // bins
        for j in range(bins):
            left = j * per_bin
            right = left + per_bin
            file_name = os.path.join(output_dir, str(label), f'{prefix}_{j}_{label}.npz')
            np.savez(file_name,
                     t=t[left: right],
                     x=x[left: right],
                     y=y[left: right],
                     p=p[left: right],
                     )
            print(f'[{file_name}] saved.')
def create_events_np_files(self, events_np_root: str, event_split=None):
    """
    Convert the raw recordings below ``self.root`` into per-sample ``npz`` event files.

    Creates ``train`` and ``test`` directories below ``events_np_root``, each
    with one sub-directory per label (0..10), then splits every recording
    named in the train/test trial lists with ``split_aedat_files_to_np``
    using a thread pool. Recordings whose conversion raised are reported on
    stdout after the pool has finished.

    Args:
        events_np_root: Root directory that receives the ``npz`` event files.
        event_split: Forwarded unchanged to ``split_aedat_files_to_np``
            (piece length in ms there); ``None`` keeps each sample whole.
    """
    train_dir = os.path.join(events_np_root, 'train')
    test_dir = os.path.join(events_np_root, 'test')
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)
    print(f'Mkdir {train_dir, test_dir}.')
    for label in range(11):
        os.makedirs(os.path.join(train_dir, str(label)), exist_ok=True)
        os.makedirs(os.path.join(test_dir, str(label)), exist_ok=True)
    print(f'Mkdir {os.listdir(train_dir)} in [{train_dir}] and {os.listdir(test_dir)} in [{test_dir}].')
    # Read both trial lists up front so a missing list fails before any
    # conversion work is started.
    trials = []
    for list_key, output_dir in (('train_dataset', train_dir), ('test_dataset', test_dir)):
        with open(os.path.join(self.root, self.files[list_key])) as trials_txt:
            for line in trials_txt:
                name = line.strip()
                if name:
                    trials.append((name, output_dir))
    # use multi-thread to accelerate
    max_workers = min(multiprocessing.cpu_count(), 64)
    submitted = {}
    with ThreadPoolExecutor(max_workers=max_workers) as tpe:
        print(f'Start the ThreadPoolExecutor with max workers = [{max_workers}].')
        for name, output_dir in trials:
            aedat_file = os.path.join(self.root, name)
            fname = os.path.splitext(name)[0]
            # event_split has to be passed on, otherwise the requested cut
            # would be ignored and whole samples written.
            future = tpe.submit(self.split_aedat_files_to_np, fname, aedat_file,
                                os.path.join(self.root, fname + '_labels.csv'), output_dir, event_split)
            submitted[future] = aedat_file
    # Exceptions raised inside a worker stay hidden in its future; report them
    # so an incomplete conversion does not go unnoticed.
    for future, aedat_file in submitted.items():
        error = future.exception()
        if error is not None:
            print(f'Failed to split [{aedat_file}]: {error!r}')
    print(f'All aedat files have been split to samples and saved into [{train_dir, test_dir}].')
def get_H_W(self):
    """
    Return the spatial size of the data.

    :return: A tuple ``(H, W)``, where ``H`` is the height and ``W`` is the
        width of the data; always ``(128, 128)`` here.
    :rtype: tuple
    """
    height = width = 128
    return height, width