"""
@author: Yuan Mengwen
@contact: mwyuan94@gmail.com
@project: PyCharm
@filename: utils.py
@time:2021/4/1 14:33
@description:
"""
import os
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
from random import shuffle
from shutil import copy
import numpy as np
import scipy.signal as signal
import scipy.io.wavfile as wav
from scipy.ndimage import maximum_filter
# import soundfile as sf
# import cv2
import math
import struct
from ..utils import plot, gtgram
import matplotlib.pyplot as plt
import scipy.io.wavfile as wav
'''
==================audio preprocess method==================
'''
def wav_file_resample(file_path, dest_sample=16e3):
    """
    Read a WAV file and resample it to a target sampling rate.

    Args:
        file_path: path of the WAV file to read.
        dest_sample: target sampling rate.

    Returns:
        resampled: the signal after ``scipy.signal.resample``.
        dest_sample: the target sampling rate, returned unchanged.
    """
    source_rate, samples = wav.read(file_path)
    # Keep the duration constant: new length = old length / old rate * new rate.
    target_len = int((samples.shape[0]) / source_rate * dest_sample)
    return signal.resample(samples, target_len), dest_sample
def wav_file_cut(file_path, signal_num=16e3):
    """
    Load a WAV file and bring it to a fixed number of samples.

    Note that the signal is not truncated: it is passed through
    ``scipy.signal.resample`` so the whole recording is represented by
    ``int(signal_num)`` samples. The file's own sampling rate is read but
    not used.

    Args:
        file_path: path of the WAV file to read.
        signal_num: number of samples in the returned signal (cast to int).

    Returns:
        cropped_data: the signal resampled to ``int(signal_num)`` samples.
    """
    _, samples = wav.read(file_path)
    target_len = int(signal_num)
    return signal.resample(samples, target_len)
def _dataset_exists(root, class_labels):
    """
    Check that ``root`` holds a ``train`` and a ``test`` folder per class.

    Args:
        root: dataset root directory.
        class_labels: dict whose keys are the class folder names.

    Returns:
        True when ``root`` exists and both ``root/test/<key>`` and
        ``root/train/<key>`` are directories for every key, else False.
    """
    if not os.path.exists(root):
        return False
    return all(
        os.path.isdir(os.path.join(root, 'test', cls_id))
        and os.path.isdir(os.path.join(root, 'train', cls_id))
        for cls_id in class_labels.keys()
    )
def save_mfcc_feature(root, npz_name, sample_rate=16e3, signal_num=16e3, class_labels=None, **kwargs):
    """
    Extract MFCC features for every WAV file of a dataset and save them as .npz.

    The dataset is expected at ``root/<train|test>/<class name>/*.wav``.

    Args:
        root: dataset root directory; the .npz file is written there as well.
        npz_name: name of the output file, with or without the ``.npz``
            suffix. An empty name (or None) falls back to ``mfcc_feature``.
        sample_rate: sampling rate handed to ``mfcc``.
        signal_num: number of samples every recording is brought to by
            ``wav_file_cut`` before feature extraction.
        class_labels: dict mapping class folder name -> label.
        **kwargs: accepted for signature compatibility; not used here.

    Returns:
        The file name (``<feature name>.npz``) of the saved archive.

    Raises:
        ValueError: if ``class_labels`` is None.
    """
    # set class labels
    if class_labels is None:
        raise ValueError('Missing class labels dict')
    classes = class_labels

    # Imported lazily (and only once) so the module does not hard-depend on it.
    from python_speech_features import mfcc

    # Previously feature_name was only bound for npz_name == '', so every
    # explicit file name crashed with UnboundLocalError further down.
    if npz_name:
        feature_name = npz_name[:-len('.npz')] if npz_name.endswith('.npz') else npz_name
    else:
        feature_name = 'mfcc_feature'

    data = {
        'train_audios': [],
        'train_labels': [],
        'test_audios': [],
        'test_labels': [],
        'Time': [],
        'neuron_num': []
    }
    for subset in ('train', 'test'):
        audios_name = "{}_audios".format(subset)
        labels_name = "{}_labels".format(subset)
        for cls, label in classes.items():
            cur_dir = os.path.join(root, subset, cls)
            for file in os.listdir(cur_dir):
                if not file.endswith('wav'):
                    continue
                wavform = wav_file_cut(os.path.join(cur_dir, file), signal_num)
                feature_mfcc = mfcc(wavform, samplerate=sample_rate).flatten()
                data[audios_name].append(feature_mfcc)
                data[labels_name].append(label)

    # Store the features as a .npz file (np.savez appends the suffix itself).
    data_root = os.path.join(root, feature_name)
    npz_name = feature_name + '.npz'
    data['neuron_num'] = len(data['train_audios'][0])
    train_max = get_Max(data['train_audios'])
    test_max = get_Max(data['test_audios'])
    data['Time'] = max(train_max, test_max)
    arrays = {key: np.array(value, dtype=object) for key, value in data.items()}
    np.savez(data_root, train_audios=arrays['train_audios'], train_labels=arrays['train_labels'],
             test_audios=arrays['test_audios'], test_labels=arrays['test_labels'], Time=arrays['Time'],
             neuron_num=arrays['neuron_num'])
    print(">> mfcc_features saved")
    return npz_name
def save_kp_feature(root=None, npz_name=None, sample_rate=16e3, class_labels=None, **kwargs):
    """
    Extract key-point features for every WAV file of a dataset and save them as .npz.

    The dataset is expected at ``root/<train|test>/<class name>/*.wav``. Each
    file is resampled, converted with ``fetchGmSpectrogram`` and reduced to
    key points by ``extractKeyPoints``.

    Args:
        root: dataset root directory; the .npz file is written there as well.
        npz_name: name of the output file, with or without the ``.npz``
            suffix. An empty name or None falls back to ``kp_feature``.
        sample_rate: sampling rate every recording is resampled to.
        class_labels: dict mapping class folder name -> label.
        **kwargs: optional tuning values: ``window_size`` (0.016),
            ``stride`` (0.008), ``kernels_num`` (100), ``freq_min`` (20) are
            forwarded to ``fetchGmSpectrogram``; ``Dr`` (3), ``Dc`` (3) and
            ``significance_level`` (3) are forwarded to ``extractKeyPoints``.

    Returns:
        The file name (``<feature name>.npz``) of the saved archive.

    Raises:
        ValueError: if ``class_labels`` is None.
    """
    # parameters for extracting key points of audio
    window_size = kwargs.get('window_size', 0.016)
    stride = kwargs.get('stride', 0.008)
    kernels_num = kwargs.get('kernels_num', 100)
    freq_min = kwargs.get('freq_min', 20)
    Dr = kwargs.get('Dr', 3)
    Dc = kwargs.get('Dc', 3)
    significance_level = kwargs.get('significance_level', 3)

    # set class labels
    if class_labels is None:
        raise ValueError('Missing class labels dict')
    classes = class_labels

    # Previously feature_name was only bound for npz_name == '', so the
    # default (None) and every explicit name crashed with UnboundLocalError.
    if npz_name:
        feature_name = npz_name[:-len('.npz')] if npz_name.endswith('.npz') else npz_name
    else:
        feature_name = 'kp_feature'

    data = {
        'train_audios': [],
        'train_labels': [],
        'test_audios': [],
        'test_labels': [],
        'train_ids': [],
        'test_ids': [],
        'Time': [],
        'neuron_num': []
    }
    for subset in ('train', 'test'):
        audios_name = "{}_audios".format(subset)
        labels_name = "{}_labels".format(subset)
        id_name = "{}_ids".format(subset)
        for cls, label in classes.items():
            cur_dir = os.path.join(root, subset, cls)
            for file in os.listdir(cur_dir):
                if not file.endswith('wav'):
                    continue
                wavform, _ = wav_file_resample(os.path.join(cur_dir, file), sample_rate)
                gmspec = fetchGmSpectrogram(wavform, sample_rate, window_size, stride, kernels_num,
                                            freq_min)
                irow, _icol, ival = extractKeyPoints(gmspec, Dr, Dc, significance_level)
                data[audios_name].append(ival)
                data[labels_name].append(label)
                data[id_name].append(irow)

    # Store the features as a .npz file (np.savez appends the suffix itself).
    data_root = os.path.join(root, feature_name)
    npz_name = feature_name + '.npz'
    data['neuron_num'] = kernels_num
    train_max = get_Max(data['train_audios'])
    test_max = get_Max(data['test_audios'])
    data['Time'] = max(train_max, test_max)
    arrays = {key: np.array(value, dtype=object) for key, value in data.items()}
    np.savez(data_root, train_audios=arrays['train_audios'], train_labels=arrays['train_labels'],
             train_ids=arrays['train_ids'],
             test_audios=arrays['test_audios'], test_labels=arrays['test_labels'], test_ids=arrays['test_ids'],
             Time=arrays['Time'], neuron_num=arrays['neuron_num'])
    print(">> kp_feature saved")
    return npz_name
def load_kp_data(root, filename):
    """
    Load a key-point feature archive written by ``save_kp_feature``.

    Args:
        root: directory that contains the archive.
        filename: file name of the .npz archive inside ``root``.

    Returns:
        dict with the arrays stored under ``train_audios``, ``test_audios``,
        ``train_ids``, ``test_ids``, ``train_labels``, ``test_labels``,
        ``Time`` and ``neuron_num``.
    """
    keys = ('train_audios', 'test_audios', 'train_ids', 'test_ids',
            'train_labels', 'test_labels', 'Time', 'neuron_num')
    fileroot = os.path.join(root, filename)
    # NOTE: allow_pickle=True unpickles object arrays; only load trusted files.
    # The context manager closes the underlying zip handle, which the
    # previous version left open.
    with np.load(fileroot, allow_pickle=True) as archive:
        data = {key: archive[key] for key in keys}
    print(">> " + filename + " loaded")
    return data
def load_mfcc_data(root, filename):
    """
    Load an MFCC feature archive written by ``save_mfcc_feature``.

    Args:
        root: directory that contains the archive.
        filename: file name of the .npz archive inside ``root``.

    Returns:
        dict with the arrays stored under ``train_audios``, ``test_audios``,
        ``train_labels``, ``test_labels``, ``Time`` and ``neuron_num``.
    """
    keys = ('train_audios', 'test_audios', 'train_labels', 'test_labels',
            'Time', 'neuron_num')
    fileroot = os.path.join(root, filename)
    # NOTE: allow_pickle=True unpickles object arrays; only load trusted files.
    # The context manager closes the underlying zip handle, which the
    # previous version left open.
    with np.load(fileroot, allow_pickle=True) as archive:
        data = {key: archive[key] for key in keys}
    print(">> " + filename + " loaded")
    return data
def dataset_split(source_root, target_root, ratio, is_shuffle):
    """
    Copy a class-per-folder dataset into ``train`` and ``test`` subsets.

    For every class folder in ``source_root`` the first
    ``ceil(ratio * number_of_samples)`` files go to
    ``target_root/train/<class>`` and the remaining ones to
    ``target_root/test/<class>``.

    Args:
        source_root: directory containing one sub-directory per class.
        target_root: directory in which ``train`` and ``test`` are created.
        ratio: fraction of each class that is copied to the training set.
        is_shuffle: if exactly ``True``, samples are shuffled before the split.
    """
    # os.path.join instead of a hard-coded "\\" so this also works off Windows.
    train_root = os.path.join(target_root, 'train')
    test_root = os.path.join(target_root, 'test')
    os.makedirs(train_root, exist_ok=True)
    os.makedirs(test_root, exist_ok=True)

    for class_name in os.listdir(source_root):
        class_root = os.path.join(source_root, class_name)
        if not os.path.isdir(class_root):
            # Stray files next to the class folders are not classes.
            continue
        train_dir = os.path.join(train_root, class_name)
        test_dir = os.path.join(test_root, class_name)
        os.makedirs(train_dir, exist_ok=True)
        os.makedirs(test_dir, exist_ok=True)

        samples = os.listdir(class_root)
        if is_shuffle is True:
            shuffle(samples)

        # Number of leading samples that belong to the training set. Comparing
        # the index against it (rather than using a modulo) also copes with
        # ratio == 0, which used to raise ZeroDivisionError.
        split_num = math.ceil(ratio * len(samples))
        for index, data_name in enumerate(samples):
            to_path = train_dir if index < split_num else test_dir
            copy(os.path.join(class_root, data_name), to_path)
def reclassification(source_root, target_root, class_num, perperson_perclass_samplenum):
    '''
    Re-save a digit speech dataset organised per speaker as one organised per class.

    ``source_root`` holds one folder per speaker. Inside each speaker folder
    the files are taken in sorted name order and consecutive groups of
    ``perperson_perclass_samplenum`` files are copied to
    ``target_root/0``, ``target_root/1``, ...

    Args:
        source_root: directory with one sub-directory per speaker.
        target_root: directory in which the class folders are created.
        class_num: number of class folders (``0`` .. ``class_num - 1``) to create.
        perperson_perclass_samplenum: number of files per class per speaker.

    Raises:
        ValueError: if ``source_root`` does not exist.
    '''
    # Validate the input first so a bad path leaves no empty folders behind.
    if not os.path.exists(source_root):
        raise ValueError('The path' + source_root + ' is not exist')

    # Create the target class folders if they do not exist yet.
    for i in range(class_num):
        os.makedirs(os.path.join(target_root, str(i)), exist_ok=True)

    for file_name in os.listdir(source_root):
        file_root = os.path.join(source_root, file_name)
        # The class is derived purely from the position of a file in the
        # listing, and os.listdir order is unspecified, so sort to make the
        # grouping deterministic on every platform.
        for index, data_name in enumerate(sorted(os.listdir(file_root))):
            class_id = index // perperson_perclass_samplenum
            from_path = os.path.join(file_root, data_name)
            # os.path.join instead of a hard-coded "\\" separator.
            to_path = os.path.join(target_root, str(class_id))
            copy(from_path, to_path)
def datasetAlignment(source, maxNum):
    """
    Right-pad every sample of a dataset with zeros up to ``maxNum`` entries.

    Args:
        source (ndarray): samples of the dataset.
        maxNum (int): length every sample is padded to (the longest sample).

    Returns:
        ndarray built from the padded samples.
    """
    padded = [
        np.pad(sample, (0, maxNum - len(sample)), 'constant', constant_values=(0, 0))
        for sample in source.tolist()
    ]
    return np.array(padded)
def batchAlignment(source):
    """
    Right-pad every sample of a batch with zeros to the longest sample's length.

    Args:
        source (ndarray): samples of the batch.

    Returns:
        list of padded ndarrays, all of the same length.
    """
    samples = source.tolist()
    longest = max((len(sample) for sample in samples), default=0)
    return [
        np.pad(sample, (0, longest - len(sample)), 'constant', constant_values=(0, 0))
        for sample in samples
    ]
def fetchGmSpectrogram(sig, fs=16e3, window_size=0.016, stride=0.008, kernels_num=32, freq_min=20, log=False,
                       show=False):
    """
    Compute the spectrogram of a signal with ``gtgram.gtgram``.

    Args:
        sig: input signal.
        fs: sampling rate of ``sig``.
        window_size, stride, kernels_num, freq_min: forwarded unchanged to
            ``gtgram.gtgram``.
        log: if truthy, the natural logarithm of the spectrogram is returned.
        show: if truthy, the spectrogram is drawn into a figure named
            'spectrum' via ``plot.gtgram_plot`` (the figure is not shown here).

    Returns:
        The (optionally log-scaled) spectrogram.
    """
    gammatone_args = (sig, fs, window_size, stride, kernels_num, freq_min)
    gmspec = gtgram.gtgram(*gammatone_args)
    if log:
        gmspec = np.log(gmspec)
    if show:
        figure = plt.figure('spectrum', dpi=500)
        axes = figure.add_axes([0.1, 0.1, 0.9, 0.9])
        plot.gtgram_plot(gtgram.gtgram, axes, *gammatone_args)
    return gmspec
def get_Max(data):
    '''
    Return the largest value found in a collection of sequences.

    The search starts from 0, so the result is never below 0.

    Args:
        data (): sequence of sequences, can be spiking times or neuron ids

    Returns:
        The maximum over all inner sequences (0 if ``data`` is empty).
    '''
    largest = 0
    for item in data:
        largest = max(largest, max(item))
    return largest
'''
==================image preprocess method==================
'''
def RGBtoGray(image):
    """
    Convert an RGB image to a gray image with ``cv2.cvtColor``.

    Args:
        image: RGB image.

    Returns:
        Gray image.
    """
    import cv2
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    return gray
def GraytoBinary(image):
    """
    Binarise a gray image with ``cv2.threshold``.

    The call uses threshold 0, maximum value 1 and ``cv2.THRESH_BINARY``.

    Args:
        image: Gray image.

    Returns:
        Binary image (second element of the ``cv2.threshold`` result).
    """
    import cv2
    _, binary = cv2.threshold(image, 0, 1, cv2.THRESH_BINARY)
    return binary
def reshape(image, shape):
    """
    Rescale an image with ``cv2.resize``.

    Args:
        image: Image to be rescaled.
        shape: target size, passed to ``cv2.resize`` as its size argument
            (presumably (x, y), i.e. width first - confirm against callers).

    Returns:
        Re-scaled image.
    """
    import cv2
    resized = cv2.resize(image, shape)
    return resized
def im2col(img, kh, kw, stride, padding='same'):
    '''
    Unfold sliding kernel windows of a 4D image batch into columns.

    :param img: 4D array indexed as (N, C, H, W)
    :param kh: kernel_height
    :param kw: kernel_width
    :param stride: pair (stride along H, stride along W)
    :param padding: 'same' zero-pads H by kh // 2 and W by kw // 2 on both
        sides; any other value means no padding
    :return: array of shape (N, C * kh * kw, out_h * out_w); column
        ``y * out_w + x`` holds the flattened window at output position (y, x)
    '''
    if padding == 'same':
        pad_h, pad_w = kh // 2, kw // 2
        img = np.pad(img, ((0, 0), (0, 0), (pad_h, pad_h), (pad_w, pad_w)), 'constant')

    N, C, H, W = img.shape
    stride_h, stride_w = stride[0], stride[1]
    out_h = (H - kh) // stride_h + 1
    out_w = (W - kw) // stride_w + 1
    outsize = out_w * out_h

    col = np.empty((N, C, kw * kh, outsize))
    for out_row in range(out_h):
        rows = slice(out_row * stride_h, out_row * stride_h + kh)
        for out_col in range(out_w):
            cols = slice(out_col * stride_w, out_col * stride_w + kw)
            window = img[:, :, rows, cols]
            col[:, :, 0:, out_row * out_w + out_col] = window.reshape(N, C, kh * kw)
    return col.reshape(N, -1, outsize)
def un_tar(file_name, output_root):
    """
    Extract a tar archive into ``output_root/<archive name before first dot>``.

    Extraction is skipped when the target folder already contains as many
    entries as the archive has members.

    Args:
        file_name: path of the tar archive.
        output_root: directory in which the extraction folder is created.
    """
    import tarfile

    # The context manager closes the archive even if extraction fails; the
    # previous version leaked the handle on any exception.
    with tarfile.open(file_name) as tar:
        names = tar.getnames()
        archive_stem = os.path.basename(file_name).split('.')[0]
        extract_dir = os.path.join(output_root, archive_stem)
        # create folder if necessary
        os.makedirs(extract_dir, exist_ok=True)
        if len(os.listdir(extract_dir)) == len(names):
            return
        # NOTE(security): member names are extracted as-is; only use this on
        # trusted archives (a crafted archive can write outside extract_dir).
        for name in names:
            tar.extract(name, extract_dir)
'''
==================DVS preprocess method==================
'''
def load_aedat_v3(file_name: str):
    '''
    Read the polarity events of an AEDAT v3 file.

    Args:
        file_name(str): path of the aedat v3 file
    Returns:
        a dict whose keys are ['t', 'x', 'y', 'p'] and values are ``numpy.ndarray``

    Only packets whose event type is 1 (polarity events) are decoded; other
    packets are skipped.

    This function is written by referring to https://gitlab.com/inivation/dv/dv-python . It can be used for DVS128 Gesture.
    '''
    # AEDAT 3 stores integers little-endian; spelling that out (instead of the
    # native-order 'H' / 'I' formats) keeps the parser correct on any host.
    u16 = struct.Struct('<H')
    u32 = struct.Struct('<I')
    event_words = struct.Struct('<II')  # (address/data word, timestamp)

    txyp = {
        't': [],
        'x': [],
        'y': [],
        'p': []
    }
    with open(file_name, 'rb') as bin_f:
        # skip ascii header
        line = bin_f.readline()
        while line.startswith(b'#'):
            if line == b'#!END-HEADER\r\n':
                break
            line = bin_f.readline()

        while True:
            header = bin_f.read(28)
            if not header:
                break
            # read the fields of the 28-byte packet header that are needed
            e_type = u16.unpack_from(header, 0)[0]
            e_size = u32.unpack_from(header, 4)[0]
            e_tsoverflow = u32.unpack_from(header, 12)[0]
            e_capacity = u32.unpack_from(header, 16)[0]

            data = bin_f.read(e_capacity * e_size)
            if e_type != 1:
                # non-polarity event packet, not implemented
                continue

            counter = 0
            while counter < len(data):
                # unpack_from reads in place instead of slicing twice per event
                aer_data, raw_ts = event_words.unpack_from(data, counter)
                txyp['x'].append((aer_data >> 17) & 0x00007FFF)
                txyp['y'].append((aer_data >> 2) & 0x00007FFF)
                txyp['p'].append((aer_data >> 1) & 0x00000001)
                # the overflow counter extends the 31-bit timestamp
                txyp['t'].append(raw_ts | e_tsoverflow << 31)
                counter += e_size

    txyp['x'] = np.asarray(txyp['x'])
    txyp['y'] = np.asarray(txyp['y'])
    txyp['t'] = np.asarray(txyp['t'])
    txyp['p'] = np.asarray(txyp['p'])
    return txyp
def create_same_directory_structure(source_dir: str, target_dir: str) -> None:
    '''
    :param source_dir: Path of the directory that be copied from
    :type source_dir: str
    :param target_dir: Path of the directory that be copied to
    :type target_dir: str
    :return: None

    Create the same directory structure in ``target_dir`` with that of ``source_dir``.
    Only directories are mirrored; files are not copied. Sub-directories that
    already exist in ``target_dir`` are reused, so the function can be re-run.
    '''
    for sub_dir_name in os.listdir(source_dir):
        source_sub_dir = os.path.join(source_dir, sub_dir_name)
        if not os.path.isdir(source_sub_dir):
            continue
        target_sub_dir = os.path.join(target_dir, sub_dir_name)
        # os.mkdir used to raise FileExistsError on a second run.
        if not os.path.isdir(target_sub_dir):
            os.makedirs(target_sub_dir)
            print(f'Mkdir [{target_sub_dir}].')
        create_same_directory_structure(source_sub_dir, target_sub_dir)
def integrate_events_file_to_frames_file_by_fixed_frames_number(events_np_file: str, output_dir: str, split_by: str,
                                                                frames_num: int, H: int, W: int,
                                                                print_save: bool = False) -> None:
    '''
    :param events_np_file: path of the events np file
    :type events_np_file: str
    :param output_dir: output directory for saving the frames
    :type output_dir: str
    :param split_by: 'time' or 'number'
    :type split_by: str
    :param frames_num: the number of frames
    :type frames_num: int
    :param H: the height of frame
    :type H: int
    :param W: the width of frame
    :type W: int
    :param print_save: If ``True``, this function will print saved files' paths.
    :type print_save: bool
    :return: None

    Load an events file, integrate it to a fixed number of frames and save the
    result with ``np.savez`` under the key ``frames`` in ``output_dir``, using
    the base name of the events file. See ``integrate_events_by_fixed_frames_number``
    for how the frames are built.
    '''
    fname = os.path.join(output_dir, os.path.basename(events_np_file))
    events = np.load(events_np_file)
    frames = integrate_events_by_fixed_frames_number(events, split_by, frames_num, H, W)
    np.savez(fname, frames=frames)
    if print_save:
        print(f'Frames [{fname}] saved.')
def integrate_events_by_fixed_frames_number(events: dict, split_by: str, frames_num: int, H: int, W: int) -> np.ndarray:
    '''
    :param events: a dict whose keys are ['t', 'x', 'y', 'p'] and values are ``numpy.ndarray``
    :type events: Dict
    :param split_by: 'time' or 'number'
    :type split_by: str
    :param frames_num: the number of frames
    :type frames_num: int
    :param H: the height of frame
    :type H: int
    :param W: the width of frame
    :type W: int
    :return: frames, an array of shape ``[frames_num, 2, H, W]``
    :rtype: np.ndarray

    Split the events into ``frames_num`` index ranges with
    ``cal_fixed_frames_number_segment_index`` and integrate every range into
    one two-channel frame with ``integrate_events_segment_to_frame``.
    '''
    left_idx, right_idx = cal_fixed_frames_number_segment_index(events['t'], split_by, frames_num)
    frames = np.zeros([frames_num, 2, H, W])
    for frame_no, (start, stop) in enumerate(zip(left_idx, right_idx)):
        frames[frame_no] = integrate_events_segment_to_frame(events, H, W, start, stop)
    return frames
def cal_fixed_frames_number_segment_index(events_t: np.ndarray, split_by: str, frames_num: int) -> tuple:
    '''
    :param events_t: events' t
    :type events_t: numpy.ndarray
    :param split_by: 'time' or 'number'
    :type split_by: str
    :param frames_num: the number of frames
    :type frames_num: int
    :return: a tuple ``(j_l, j_r)`` of integer arrays of length ``frames_num``;
        frame ``j`` covers the events with indices in ``[j_l[j], j_r[j])``
    :rtype: tuple
    :raises NotImplementedError: if ``split_by`` is neither ``'time'`` nor ``'number'``

    Denote ``frames_num`` as :math:`M`, if ``split_by`` is ``'time'``, then

    .. math::
        \\Delta T & = [\\frac{t_{N-1} - t_{0}}{M}] \\\\
        j_{l} & = \\mathop{\\arg\\min}\\limits_{k} \\{t_{k} | t_{k} \\geq t_{0} + \\Delta T \\cdot j\\} \\\\
        j_{r} & = \\begin{cases} \\mathop{\\arg\\max}\\limits_{k} \\{t_{k} | t_{k} < t_{0} + \\Delta T \\cdot (j + 1)\\} + 1, & j < M - 1 \\cr N, & j = M - 1 \\end{cases}

    A time window that contains no event yields an empty range
    (``j_l == j_r``) starting where the previous range ended.

    If ``split_by`` is ``'number'``, then

    .. math::
        j_{l} & = [\\frac{N}{M}] \\cdot j \\\\
        j_{r} & = \\begin{cases} [\\frac{N}{M}] \\cdot (j + 1), & j < M - 1 \\cr N, & j = M - 1 \\end{cases}
    '''
    j_l = np.zeros(shape=[frames_num], dtype=int)
    j_r = np.zeros(shape=[frames_num], dtype=int)
    N = events_t.size

    if split_by == 'number':
        di = N // frames_num
        j_l[:] = np.arange(frames_num) * di
        j_r[:] = j_l + di
        # the last frame also takes the remainder of the division
        j_r[-1] = N
    elif split_by == 'time':
        dt = (events_t[-1] - events_t[0]) // frames_num
        idx = np.arange(N)
        prev_right = 0
        for i in range(frames_num):
            t_l = dt * i + events_t[0]
            t_r = t_l + dt
            idx_masked = idx[np.logical_and(events_t >= t_l, events_t < t_r)]
            if idx_masked.size:
                j_l[i] = idx_masked[0]
                j_r[i] = idx_masked[-1] + 1
            else:
                # No event falls into this window (this used to raise
                # IndexError): emit an empty range instead.
                j_l[i] = prev_right
                j_r[i] = prev_right
            prev_right = j_r[i]
        # the last frame also takes the events at or after the final boundary
        j_r[-1] = N
    else:
        raise NotImplementedError
    return j_l, j_r
def integrate_events_segment_to_frame(events: dict, H: int, W: int, j_l: int = 0, j_r: int = -1) -> np.ndarray:
    r'''
    :param events: a dict whose keys are ['t', 'x', 'y', 'p'] and values are ``numpy.ndarray``
    :type events: Dict
    :param H: height of the frame
    :type H: int
    :param W: width of the frame
    :type W: int
    :param j_l: the start index of the integral interval, which is included
    :type j_l: int
    :param j_r: the right index of the integral interval, which is not included.
        The events are sliced as ``[j_l:j_r]``, so the default ``-1`` leaves
        out the last event.
    :type j_r: int
    :return: frame of shape ``(2, H, W)``; channel 0 counts the events with
        ``p == 0``, channel 1 counts all other events
    :rtype: np.ndarray

    Denote a two channels frame as :math:`F` and a pixel at :math:`(p, x, y)` as :math:`F(p, x, y)`, the pixel value is integrated from the events data whose indices are in :math:`[j_{l}, j_{r})`:

    .. math::
        F(p, x, y) &= \sum_{i = j_{l}}^{j_{r} - 1} \mathcal{I}_{p, x, y}(p_{i}, x_{i}, y_{i})

    where :math:`\mathcal{I}_{p, x, y}(p_{i}, x_{i}, y_{i})` is an indicator function and it equals 1 only when :math:`(p, x, y) = (p_{i}, x_{i}, y_{i})`.
    '''
    # Events are accumulated with ``bincount`` rather than a plain ``+=`` on
    # fancy indices, because NumPy applies an in-place update only once per
    # duplicated index. See
    # https://stackoverflow.com/questions/15973827/handling-of-duplicate-indices-in-numpy-assignments
    #
    # Example with x = [1, 2, 1, 1], y = [1, 1, 1, 2], p = [0, 1, 0, 1]:
    #     frames[0, y, x] += (1 - p); frames[1, y, x] += p
    # counts pixel (p=0, y=1, x=1) once although two events hit it, whereas
    #     for i in range(len(p)): frames[p[i], y[i], x[i]] += 1
    # and the bincount approach below both count it twice.
    segment = slice(j_l, j_r)
    xs = events['x'][segment].astype(int)  # avoid overflow
    ys = events['y'][segment].astype(int)
    polarity = events['p'][segment]

    zero_polarity = polarity == 0
    channel_masks = (zero_polarity, np.logical_not(zero_polarity))

    frame = np.zeros(shape=[2, H * W])
    for channel, selected in enumerate(channel_masks):
        # flatten (y, x) to a single row-major pixel index
        flat_position = ys[selected] * W + xs[selected]
        counts = np.bincount(flat_position)
        frame[channel][np.arange(counts.size)] += counts
    return frame.reshape((2, H, W))
def integrate_events_file_to_frames_file_by_fixed_duration(events_np_file: str, output_dir: str, duration: int, H: int,
                                                           W: int, print_save: bool = False) -> int:
    '''
    :param events_np_file: path of the events np file
    :type events_np_file: str
    :param output_dir: output directory for saving the frames
    :type output_dir: str
    :param duration: the time duration of each frame
    :type duration: int
    :param H: the height of frame
    :type H: int
    :param W: the width of frame
    :type W: int
    :param print_save: If ``True``, this function will print saved files' paths.
    :type print_save: bool
    :return: the number of frames that were saved
    :rtype: int

    Integrate events to frames by fixed time duration of each frame and save
    them under the key ``frames`` in
    ``output_dir/<events file stem>_<number of frames>.npz``.
    '''
    frames = integrate_events_by_fixed_duration(np.load(events_np_file), duration, H, W)
    frames_count = frames.shape[0]
    stem, _ = os.path.splitext(os.path.basename(events_np_file))
    fname = os.path.join(output_dir, f'{stem}_{frames_count}.npz')
    np.savez(fname, frames=frames)
    if print_save:
        print(f'Frames [{fname}] saved.')
    # The frame count is returned; the former ``-> None`` annotation and
    # ``:return: None`` docstring entry contradicted this.
    return frames_count
def integrate_events_by_fixed_duration(events: dict, duration: int, H: int, W: int) -> np.ndarray:
    '''
    :param events: a dict whose keys are ['t', 'x', 'y', 'p'] and values are ``numpy.ndarray``
    :type events: Dict
    :param duration: the time duration of each frame
    :type duration: int
    :param H: the height of frame
    :type H: int
    :param W: the width of frame
    :type W: int
    :return: frames, an array of shape ``[number of frames, 2, H, W]``
    :rtype: np.ndarray
    :raises ValueError: if ``duration`` is negative

    Integrate events to frames by fixed time duration of each frame. Starting
    at the first event not yet used, a frame collects all following events
    whose timestamp is at most ``duration`` later than that first event.
    Empty ``events`` give an array with zero frames.
    '''
    t = events['t']
    N = t.size
    if N == 0:
        # nothing to integrate (indexing t[0] used to raise IndexError)
        return np.zeros([0, 2, H, W])
    if duration < 0:
        # a negative duration never advanced the window and looped forever
        raise ValueError('duration must be non-negative')

    frames = []
    left = 0
    while left < N:
        t_l = t[left]
        right = left
        while right < N and t[right] - t_l <= duration:
            right += 1
        # integrate from index [left, right)
        frames.append(integrate_events_segment_to_frame(events, H, W, left, right))
        left = right
    return np.stack(frames)
# if __name__ == "__main__":
# import numpy as np
# import matplotlib.pyplot as plt
# import os
# import wave
#
# # 读入音频。
# path = r"F:\GitCode\Python\datasets\TidigitsWAV1\train\zero"
# name = '1.wav'
# # 我音频的路径为E:\SpeechWarehouse\zmkm\zmkm0.wav
# filename = os.path.join(path, name)
#
# # 打开语音文件。
# f = wave.open(filename, 'rb')
# # 得到语音参数
# params = f.getparams()
# nchannels, sampwidth, framerate, nframes = params[:4]
# # ---------------------------------------------------------------#
# # 将字符串格式的数据转成int型
# print("reading wav file......")
# strData = f.readframes(nframes)
# waveData = np.fromstring(strData, dtype=np.short)
# # 归一化
# waveData = waveData * 1.0 / max(abs(waveData))
# # 将音频信号规整乘每行一路通道信号的格式,即该矩阵一行为一个通道的采样点,共nchannels行
# waveData = np.reshape(waveData, [nframes, nchannels]).T # .T 表示转置
# f.close() # 关闭文件
# print("file is closed!")
# # ----------------------------------------------------------------#
# '''绘制语音波形'''
# print("plotting signal wave...")
# time = np.arange(0, nframes) * (1.0 / framerate) # 计算时间
# time = np.reshape(time, [nframes, 1]).T
# plt.plot(time[0, :nframes], waveData[0, :nframes], c="b")
# plt.axis('off') # no axis
# plt.axes([0., 0., 1., 1.], frameon=False, xticks=[], yticks=[])
# # plt.xlabel("time")
# # plt.ylabel("amplitude")
# # plt.title("Original wave")
# plt.show()
# print('end')
# sroot = r'F:\GitCode\Python\datasets\TidigitsWAV'
# troot = r'F:\GitCode\Python\datasets\TidigitsWAV1'
# classes = {
# "zero": 0,
# "one": 1,
# "two": 2,
# "three": 3,
# "four": 4,
# "five": 5,
# "six": 6,
# "seven": 7,
# "eight": 8,
# "nine": 9,
# "oh": 10
# }
# save_mfcc_feature(root=troot, npz_name='mfcc_test.npz', class_labels=classes)
# dataset_split(sroot, troot, 0.7, True)
# print('end')
# sroot = r'C:\Users\hp\Desktop\AudioMNIST'
# troot = r'C:\Users\hp\Desktop\SpeechMNIST'
# reclassification(sroot, troot, 10, 50)
# time_now = time.time()
# path = r'F:\GitCode\Python\dataset\AudioMNIST\train\0\0_01_2.wav'
# sound, sampling_freq = wav_file_resample(path, 16e3)
# print(time.time() - time_now)
# time_now1 = time.time()
# sound1, fs = librosa.load(path, sr=16e3)
# print(time.time() - time_now1)
# root = r'F:\GitCode\Python\datasets\AudioMNIST'
# save_numpy_format(root, True, sample_rate=16e3)
# save_numpy_format(root, False, sample_rate=16e3)
# load_audio_data(root, True)
# load_audio_data(root, False)
# source = data['train_audios']
# maxNum = data['maxNum']
# datasetAlignment(source, maxNum)
# root = r'F:\GitCode\Python\datasets\AudioMNIST'
# filenameTr = 'train.npz'
# root = r'F:\GitCode\Python\datasets\DigitsVoices'
# filenameTr = 'train_kernels_num50significance_level10.npz'
# filerootTr = os.path.join(root, filenameTr)
# dataTr = np.load(filerootTr, allow_pickle=True)
# trian_audios0 = dataTr['train_ids'][0]
# trian_audios1 = dataTr['train_ids'][1]
# trian_audios2 = dataTr['train_ids'][2]
# trian_audios3 = dataTr['train_ids'][3]
# train_ids = dataTr['train_ids']
# maxvalue = get_Max(dataTr['train_audios'])
# filenameTe = 'test_kernels_num50significance_level10.npz'
# filerootTe = os.path.join(root, filenameTe)
# dataTe = np.load(filerootTe, allow_pickle=True)
# test_audios0 = dataTe['test_audios'][0]
# test_audios1 = dataTe['test_audios'][1]
# test_audios2 = dataTe['test_audios'][2]
# test_audios3 = dataTe['test_audios'][3]
# test_audios = dataTe['test_audios']
# test_ids = dataTe['test_ids']
# maxvalue = get_Max(dataTe['test_audios'])
# print(maxvalue)
# print('')