# -*- coding: utf-8 -*-
"""
A python module for audio I/O based on `spAudio <https://www-ie.meijo-u.ac.jp/labs/rj001/spLibs/index.html>`_.
Example:
The following is the example to realize fullduplex audio I/O (version 0.7.15+).
::
import spaudio
with spaudio.open('rw', nchannels=2, samprate=44100, buffersize=2048) as a:
nloop = 500
b = bytearray(4096)
for i in range(nloop):
a.readraw(b)
a.writeraw(b)
"""
import locale
import warnings
import array
from collections import namedtuple
from _spaudio import spaudio_c as _spaudio_c
__version__ = '0.7.16'
OUTPUT_POSITION_CALLBACK = (1 << 0)
OUTPUT_BUFFER_CALLBACK = (1 << 2)
_audio_global = None
_encoding = locale.getpreferredencoding()
_defaultdrivername = _spaudio_c.xspGetAudioDriverName(0)
_ischarbinary = isinstance(_defaultdrivername, bytes)
# https://stackoverflow.com/questions/2166818/how-to-check-if-an-object-is-an-instance-of-a-namedtuple
def _isnamedtupleinstance(x):
t = type(x)
b = t.__bases__
if len(b) != 1 or b[0] != tuple:
return False
f = getattr(t, '_fields', None)
if not isinstance(f, tuple):
return False
return all(type(n) == str for n in f)
[docs]class Error(Exception):
"""Base exception class for spaudio."""
pass
[docs]class DriverError(Error):
"""Exception raised by audio driver problems."""
pass
[docs]class DeviceError(Error):
"""Exception raised by audio device problems."""
pass
_StandardLibParams = namedtuple('_standard_lib_params',
'nchannels sampwidth framerate nframes comptype compname')
_STANDARD_LIB_KEYS = ['nchannels', 'sampwidth', 'framerate',
'nframes', 'comptype', 'compname']
PARAMS_KEYS = ['nchannels', 'sampbit', 'samprate',
'blockingmode', 'buffersize', 'nbuffers',
'sampwidth', 'framerate']
[docs]def callbacksignature(audio, calltype, cbdata, args):
"""Signature of the callback function for :func:`~spaudio.SpAudio.setcallback` method.
Args:
audio (SpAudio): An instance of :class:`~spaudio.SpAudio` class.
calltype (int): The callback type. ``OUTPUT_POSITION_CALLBACK`` or
``OUTPUT_BUFFER_CALLBACK``.
cbdata: Callback-depend data. A position (int) on ``OUTPUT_POSITION_CALLBACK``
or a buffer (bytes) on ``OUTPUT_BUFFER_CALLBACK``.
args: Variable length argument list specified at :func:`~spaudio.SpAudio.setcallback`.
Note that this `args` variable does not require the prefix ``*``.
Returns:
bool: ``False`` if you don't want to fire callbacks anymore.
"""
return True
[docs]class SpAudio:
"""A class for audio I/O.
Args:
drivername (str): the driver name to initialize.
"""
def __init__(self, drivername=None):
self._audio = None
self._read_mode = ' '
self._write_mode = ' '
self._initdriver(drivername)
def __del__(self):
self.terminate()
def __enter__(self):
return self
def __exit__(self, *args):
self.terminate()
[docs] def reload(self, drivername=None):
"""Reloads a new audio driver."""
self.terminate()
self._initdriver(drivername)
[docs] def terminate(self):
"""Terminates the current audio driver."""
if self._read_mode[0] == 'r' or self._write_mode[0] == 'w':
self.close()
global _audio_global
if (self._audio is not None) and (self._audio is _audio_global):
_spaudio_c._spFreeAudioDriver(self._audio)
_audio_global = None
self._audio = None
def _initdriver(self, drivername=None):
"""Initializes the audio driver."""
if drivername is not None:
global _ischarbinary
if _ischarbinary:
global _encoding
encodedname = drivername.encode(_encoding)
else:
encodedname = drivername
else:
encodedname = None
global _audio_global
if _audio_global is not None:
self._audio = _audio_global
else:
self._audio = _spaudio_c.spInitAudioDriver(encodedname)
if self._audio is None:
raise DriverError('cannot load audio driver')
_audio_global = self._audio
[docs] def getndevices(self):
"""Gets the number of audio devices."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
flag, numdevice = _spaudio_c.spGetNumAudioDevice(self._audio)
if flag:
return numdevice
else:
raise DriverError('audio driver error')
[docs] def getdevicename(self, deviceindex):
"""Gets the name of the audio device."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
if deviceindex < 0 or deviceindex >= self.getndevices():
raise ValueError('0 <= index < %d is required'
% self.getndevices())
name = _spaudio_c.xspGetAudioDeviceName(self._audio, deviceindex)
if name is not None:
global _ischarbinary
if _ischarbinary:
global _encoding
return name.decode(_encoding)
else:
return name
else:
raise DriverError('audio driver error')
[docs] def getdevicelist(self):
"""Gets the list of audio devices."""
alist = []
for i in range(self.getndevices()):
alist.append(self.getdevicename(i))
return alist
[docs] def selectdevice(self, deviceindex):
"""Selects an audio device which has the index specified.
Args:
deviceindex (int): The index associated with an audio device.
Raises:
ValueError: If `deviceindex` is greater than or equal to
the number of devices.
DriverError: If the audio device cannot be selected.
"""
if self._audio is None:
raise RuntimeError('driver must be loaded')
if deviceindex < 0 or deviceindex >= self.getndevices():
raise ValueError('0 <= index < %d is required'
% self.getndevices())
flag = _spaudio_c.spSelectAudioDevice(self._audio, deviceindex)
if not flag:
raise DriverError('cannot select the audio device')
[docs] def setcallback(self, calltype, func, *args):
"""Sets a callback function.
Args:
calltype (int): A combination of callback types.
``OUTPUT_POSITION_CALLBACK`` and ``OUTPUT_BUFFER_CALLBACK``
are supported currently.
func (callable): Callback function. The callback must have
a signature in the :func:`~spaudio.callbacksignature` document.
*args: Variable length argument list.
Raises:
DriverError: If the callback function cannot be set.
"""
if self._audio is None:
raise RuntimeError('driver must be loaded')
flag = _spaudio_c.spSetAudioCallbackFunc_(self._audio, calltype,
(func, self, args))
if not flag:
raise DriverError('cannot set a callback function')
[docs] def setsamprate(self, samprate):
"""Sets sample rate of the current device."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
flag = _spaudio_c.spSetAudioSampleRate(self._audio, samprate)
if not flag:
raise DriverError('audio driver error')
[docs] def setframerate(self, samprate):
"""Sets sample rate of the current device."""
self.setsamprate(samprate)
[docs] def getsamprate(self):
"""Gets sample rate of the current device."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
flag, samprate = _spaudio_c.spGetAudioSampleRate(self._audio)
if flag:
return samprate
else:
raise DriverError('audio driver error')
[docs] def getframerate(self):
"""Gets sample rate of the current device."""
return self.getsamprate()
[docs] def setsampbit(self, sampbit):
"""Sets bits/sample of the current device. sampbit = 33 means
32bit float."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
if sampbit < 8:
raise ValueError('sampbit >= 8 is required')
flag = _spaudio_c.spSetAudioSampleBit(self._audio, sampbit)
if not flag:
raise DriverError('audio driver error')
[docs] def setsampwidth(self, sampwidth, floatflag=False):
"""Sets bytes/sample of the current device."""
sampbit = sampwidth * 8
if floatflag:
sampbit += 1
return self.setsampbit(sampbit)
[docs] def getsampbit(self):
"""Gets bits/sample of the current device. sampbit = 33 means
32bit float."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
flag, sampbit = _spaudio_c.spGetAudioSampleBit(self._audio)
if flag:
return sampbit
else:
raise DriverError('audio driver error')
[docs] def getsampwidth(self):
"""Gets bytes/sample of the current device."""
return self.getsampbit() // 8
[docs] def getrawsampbit(self):
"""Gets the bits/sample for a raw array."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
flag, sampbit = _spaudio_c.spGetAudioSpecifiedSampleBit(self._audio)
if flag:
if sampbit < 16:
return 16
elif 16 < sampbit <= 32:
return 32
else:
return sampbit
else:
raise DriverError('audio driver error')
[docs] def getrawsampwidth(self):
"""Gets the bytes/sample for a raw array."""
return self.getrawsampbit() // 8
[docs] def setnchannels(self, nchannels):
"""Sets the number of channels of the current device."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
if nchannels <= 0:
raise ValueError('nchannels >= 1 is required')
flag = _spaudio_c.spSetAudioChannel(self._audio, nchannels)
if not flag:
raise DriverError('audio driver error')
[docs] def getnchannels(self):
"""Gets the number of channels of the current device."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
flag, nchannels = _spaudio_c.spGetAudioChannel(self._audio)
if flag:
return nchannels
else:
raise DriverError('audio driver error')
[docs] def setbuffersize(self, buffersize):
"""Sets the buffer size of the current device."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
flag = _spaudio_c.spSetAudioBufferSize(self._audio, buffersize)
if not flag:
raise DriverError('audio driver error')
[docs] def getbuffersize(self):
"""Gets the buffer size of the current device."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
flag, buffersize = _spaudio_c.spGetAudioBufferSize(self._audio)
if flag:
return buffersize
else:
raise DriverError('audio driver error')
[docs] def setnbuffers(self, nbuffers):
"""Sets the number of buffers of the current device."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
if nbuffers <= 0:
raise ValueError('nbuffers >= 1 is required')
flag = _spaudio_c.spSetAudioNumBuffer(self._audio, nbuffers)
if not flag:
raise DriverError('audio driver error')
[docs] def getnbuffers(self):
"""Gets the number of buffers of the current device."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
flag, nbuffers = _spaudio_c.spGetAudioNumBuffer(self._audio)
if flag:
return nbuffers
else:
raise DriverError('audio driver error')
[docs] def setblockingmode(self, mode):
"""Sets the blocking mode of the current device.
Args:
mode (int): 0 = blocking mode (default), 1 = nonblocking mode."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
flag = _spaudio_c.spSetAudioBlockingMode(self._audio, mode)
if not flag:
raise DriverError('audio driver error')
[docs] def getblockingmode(self):
"""Gets the blocking mode of the current device.
Returns:
int: 0 = blocking mode (default), 1 = nonblocking mode.
"""
if self._audio is None:
raise RuntimeError('driver must be loaded')
flag, mode = _spaudio_c.spGetAudioBlockingMode(self._audio)
if flag:
return mode
else:
raise DriverError('audio driver error')
[docs] def getcomptype(self, decodebytes=False):
"""Returns compression type. Currently, ``'NONE'`` (decodebytes = ``True``)
or ``b'NONE'`` (decodebytes = ``False``) will be returned."""
if decodebytes:
return 'NONE'
else:
return b'NONE'
[docs] def getcompname(self, decodebytes=False):
"""Returns human-readable name of compression type. Currently,
``'not compressed'`` (decodebytes = ``True``) or ``b'not compressed'``
(decodebytes = ``False``) will be returned.
"""
if decodebytes:
return 'not compressed'
else:
return b'not compressed'
[docs] def setcomptype(self, encodestr=True):
"""Sets compression type. This parameter is ignored."""
pass
[docs] def setparams(self, params):
"""Sets supported all parameters described in dict or
namedtuple object to the device.
Args:
params (dict): a dict object of parameters whose keys are
``'nchannels'``, ``'sampbit'``, ``'samprate'``,
``'blockingmode'``, ``'buffersize'``, or ``'nbuffers'``.
The namedtuple generated by standard libraries
such as aifc, wave, or sunau is also acceptable.
"""
if self._audio is None:
raise RuntimeError('driver must be loaded')
# if isinstance(params, namedtuple):
if _isnamedtupleinstance(params):
# some versions of Python include _asdist() bug.
params = dict(zip(params._fields, params))
elif isinstance(params, tuple):
params = dict(zip(_STANDARD_LIB_KEYS, params))
sampbit = 0
for key, value in params.items():
if key == 'sampwidth':
if sampbit == 0:
sampbit = value * 8
elif key == 'sampbit':
sampbit = value
elif key == 'nchannels':
self.setnchannels(value)
elif key in ('samprate', 'framerate'):
self.setsamprate(value)
elif key == 'blockingmode':
self.setblockingmode(value)
elif key == 'buffersize':
self.setbuffersize(value)
elif key == 'nbuffers':
self.setnbuffers(value)
if sampbit > 0:
self.setsampbit(sampbit)
[docs] def getparams(self):
"""Gets supported all parameters of the current device in dict object.
Returns:
dict: A dict object including all parameters of the current device
whose keys are ``'nchannels'``, ``'sampbit'``, ``'samprate'``,
``'blockingmode'``, ``'buffersize'``, and ``'nbuffers'``.
"""
return dict(nchannels=self.getnchannels(), sampbit=self.getsampbit(),
samprate=self.getsamprate(), blockingmode=self.getblockingmode(),
buffersize=self.getbuffersize(), nbuffers=self.getnbuffers(),
sampwidth=self.getrawsampwidth(), framerate=self.getframerate())
[docs] def getparamstuple(self, decodebytes=False, nframes=0):
"""Gets supported all parameters of the current device in namedtuple object.
Args:
decodebytes (bool, optional): ``True`` decodes bytes objects into
string objects for ``'comptype'`` and ``'compname'``.
The standard libraries of wave and sunau expect a decoded
string object while the standard aifc library expects
a bytes object.
nframes (int, optional): Specify the number of frames of an audio
file, otherwise 4th element of the output tuple will be 0.
Returns:
namedtuple: A namedtuple object including all parameters of
the current device whose entries are
``(nchannels, sampwidth, framerate, nframes, comptype, compname)`` .
This object is compatible with the argument of ``setparams()``
of standard libraries such as aifc, wave, or sunau.
"""
return _StandardLibParams(self.getnchannels(), self.getsampwidth(),
self.getframerate(), nframes,
self.getcomptype(decodebytes),
self.getcompname(decodebytes))
def _checksecondopen(self, mode):
if (mode[0] == 'w' and self._read_mode[0] == 'r') \
or (mode[0] == 'r' and self._write_mode[0] == 'w'):
raise RuntimeError('cannot set parameters in 2nd open call')
[docs] def open(self, mode, *, callback=None, deviceindex=-1, samprate=0,
sampbit=0, nchannels=0, blockingmode=-1, buffersize=0,
nbuffers=0, params=None):
"""Opens the current audio device.
Args:
mode (str): Device opening mode. ``'r'`` means read mode, ``'w'``
means write mode. ``'ro'`` and ``'wo'`` which mean read only
and write only modes are also supported. Although these modes
validate only the specified mode, some environments recieve
a benefit that the processing becomes faster.
callback (tuple, optional): The callback function included in tuple
which contains all arguments for
:func:`~spaudio.SpAudio.setcallback` method.
deviceindex (int, optional): The index of the audio device.
samprate (double, optional): Sample rate of the device.
sampbit (int, optional): Bits/sample of the device.
nchannels (int, optional): The number of channels of the device.
blockingmode (int, optional): 0 = blocking mode (default),
1 = nonblocking mode.
buffersize (int, optional): The buffer size of the device.
nbuffers (int, optional): The number of buffers of the device.
params (dict, optional): A dict object which can contain any
above parameters from `samprate` to `nbuffers`.
Note:
Support for the keyword arguments was added in Version 0.7.15.
"""
if self._audio is None:
raise RuntimeError('driver must be loaded')
if callback is not None:
if isinstance(callback, tuple):
self.setcallback(*callback)
else:
self.setcallback(OUTPUT_POSITION_CALLBACK | OUTPUT_BUFFER_CALLBACK,
callback, None)
if params is not None:
self._checksecondopen(mode)
self.setparams(params)
if deviceindex != -1:
self._checksecondopen(mode)
self.selectdevice(deviceindex)
if samprate != 0:
self._checksecondopen(mode)
self.setsamprate(samprate)
if sampbit != 0:
self._checksecondopen(mode)
self.setsampbit(sampbit)
if nchannels != 0:
self._checksecondopen(mode)
self.setnchannels(nchannels)
if blockingmode != -1:
self._checksecondopen(mode)
self.setblockingmode(blockingmode)
if buffersize != 0:
self._checksecondopen(mode)
self.setbuffersize(buffersize)
if nbuffers != 0:
self._checksecondopen(mode)
self.setnbuffers(nbuffers)
if mode[0] == 'r':
if self._write_mode[-1] == 'o':
raise RuntimeError('device has been opened with write only mode')
elif self._read_mode[0] == 'r':
raise RuntimeError('device has already been opened')
self._read_mode = mode
elif mode[0] == 'w':
if self._read_mode[-1] == 'o':
raise RuntimeError('device has been opened with read only mode')
elif self._write_mode[0] == 'w':
raise RuntimeError('device has already been opened')
self._write_mode = mode
else:
raise RuntimeError('unknown open mode: %s' % mode)
global _ischarbinary
if _ischarbinary:
mode2 = mode.encode('utf-8')
else:
mode2 = mode
flag = _spaudio_c.spOpenAudioDevice(self._audio, mode2)
if not flag:
raise DeviceError('cannot open audio device')
[docs] def close(self):
"""Closes the current audio device."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
flag = _spaudio_c.spCloseAudioDevice(self._audio)
self._read_mode = ' '
self._write_mode = ' '
if not flag:
raise DeviceError('cannot close audio device')
[docs] def stop(self):
"""Stops audio I/O."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
flag = _spaudio_c.spStopAudio(self._audio)
if not flag:
raise DeviceError('cannot stop audio I/O')
[docs] def sync(self):
"""Synchronizes audio I/O."""
if self._audio is None:
raise RuntimeError('driver must be loaded')
flag = _spaudio_c.spSyncAudio(self._audio)
if not flag:
raise DeviceError('cannot synchronize audio I/O')
[docs] def getrawarraytypecode(self):
"""Gets the type code for python array to store raw data.
Returns:
char: A type code for the current settings.
"""
sampbit = self.getrawsampbit()
if sampbit >= 64:
typecode = 'd'
elif sampbit >= 33:
typecode = 'f'
elif sampbit > 16: # sampbit >= 32
typecode = 'l'
else:
typecode = 'h'
return typecode
[docs] def createrawarray(self, length, nframesflag=False):
"""Creates a raw array for the current device settings.
Args:
length: A length of the array. Note that this length is
not identical to the number of frames
(length = nframes * nchannels).
If you want to specify the number of frames,
the second argument must be ``True``.
nframesflag (bool, optional): ``True`` makes the first argument
be treated as the number of frames.
Returns:
array.array: An array class object for the current device settings.
"""
if nframesflag:
length = int(length) * self.getnchannels()
size = int(length) * self.getrawsampwidth()
buffer = bytearray(size)
return array.array(self.getrawarraytypecode(), buffer)
[docs] def getarraytypecode(self):
"""Gets the type code for python array to store double-precision data.
Returns:
char: A type code of a double-precision array for the current settings.
"""
return 'd'
[docs] def createarray(self, length, nframesflag=False):
"""Creates a double-precision array for the current device settings.
Args:
length: A length of the array. Note that this length is
not identical to the number of frames
(length = nframes * nchannels).
If you want to specify the number of frames,
the second argument must be ``True``.
nframesflag (bool, optional): ``True`` makes the first argument
be treated as the number of frames.
Returns:
array.array: An array class object for the current device settings.
"""
if nframesflag:
length = int(length) * self.getnchannels()
size = int(length) * 8
buffer = bytearray(size)
return array.array(self.getarraytypecode(), buffer)
[docs] def getrawndarraydtype(self):
"""Gets the dtype string for numpy ndarray to store raw data.
Returns:
string: A dtype string for the current settings.
"""
sampbit = self.getrawsampbit()
if sampbit >= 64:
dtypestr = 'f8'
elif sampbit >= 33:
dtypestr = 'f4'
elif sampbit > 16: # sampbit >= 32
dtypestr = 'i4'
else:
dtypestr = 'i2'
return dtypestr
[docs] def createrawndarray(self, length, nframesflag=False, channelwise=False):
"""Creates a raw numpy ndarray for the current device settings.
Args:
length: A length of the array. Note that this length is
not identical to the number of frames
(length = nframes * nchannels).
If you want to specify the number of frames,
the second argument must be ``True``.
nframesflag (bool, optional): ``True`` makes the first argument
be treated as the number of frames.
channelwise (bool, optional): ``True`` resizes the returned array
into (nframes, nchannels) matrix.
This argument is introduced in Version 0.7.16.
Returns:
numpy.ndarray: An ndarray class object for the current device
settings.
"""
import numpy as np
nchannels = self.getnchannels()
if nframesflag:
nframes = int(length)
length = nframes * nchannels
else:
length = int(length)
nframes = length // nchannels
size = length * self.getrawsampwidth()
buffer = bytearray(size)
oarray = np.frombuffer(buffer, dtype=self.getrawndarraydtype())
if channelwise:
oarray.resize((nframes, nchannels))
return oarray
[docs] def getndarraydtype(self):
"""Gets the dtype string for numpy ndarray to store double-precision data.
Returns:
string: A dtype string for the current settings.
"""
return 'f8'
[docs] def createndarray(self, length, nframesflag=False, channelwise=False):
"""Creates a numpy double-precision array for the current device settings.
Args:
length: A length of the array. Note that this length is
not identical to the number of frames
(length = nframes * nchannels).
If you want to specify the number of frames,
the second argument must be ``True``.
nframesflag (bool, optional): ``True`` makes the first argument
be treated as the number of frames.
channelwise (bool, optional): ``True`` resizes the returned array
into (nframes, nchannels) matrix.
This argument is introduced in Version 0.7.16.
Returns:
numpy.ndarray: An ndarray class object for the current device
settings.
"""
import numpy as np
nchannels = self.getnchannels()
if nframesflag:
nframes = int(length)
length = nframes * nchannels
else:
length = int(length)
nframes = length // nchannels
size = length * 8
buffer = bytearray(size)
oarray = np.frombuffer(buffer, dtype=self.getndarraydtype())
if channelwise:
oarray.resize((nframes, nchannels))
return oarray
[docs] def readraw(self, data, offset=0, length=0):
"""Reads raw data from the audio device.
Args:
data (bytearray, array.array or numpy.ndarray):
A raw array to receive raw data from the audio device.
offset (int, optional): Optional offset location for the array.
length (int, optional): Optional read length for the array.
Returns:
int: The read size if successful, -1 otherwise.
Note:
The keyword arguments of `offset` and `length` were
introduced in Version 0.7.15.
"""
if self._audio is None:
raise RuntimeError('driver must be loaded')
if data is None or len(data) <= 0:
raise ValueError('a valid buffer must be specified')
if self._read_mode[0] != 'r':
raise RuntimeError('device must be opened with read mode')
if isinstance(data, bytearray):
buffer = data
elif isinstance(data, array.array):
buffer = memoryview(data)
elif type(data).__name__ == 'ndarray':
import numpy as np
buffer = data.data
else:
raise RuntimeError('unsupported data type')
offsetbyte = int(offset) * self.getrawsampwidth() if offset > 0 else 0
lengthbyte = int(length) * self.getrawsampwidth() if length > 0 else 0
nread = _spaudio_c.spReadAudioBuffer_(self._audio, buffer,
offsetbyte, lengthbyte)
return nread // self.getrawsampwidth() if nread > 0 else nread
[docs] def writeraw(self, data, offset=0, length=0):
"""Writes data of a raw array to the audio device.
Args:
data (bytearray, array.array or numpy.ndarray):
A raw array to send data to the audio device.
offset (int, optional): Optional offset location for the array.
length (int, optional): Optional write length for the array.
Returns:
int: The written size if successful, -1 otherwise.
Note:
The keyword arguments of `offset` and `length` were
introduced in Version 0.7.15.
"""
if self._audio is None:
raise RuntimeError('driver must be loaded')
if data is None or len(data) <= 0:
raise ValueError('a valid buffer must be specified')
if self._write_mode[0] != 'w':
raise RuntimeError('device must be opened with write mode')
if isinstance(data, (bytearray, bytes)):
buffer = data
elif isinstance(data, array.array):
buffer = memoryview(data)
elif type(data).__name__ == 'ndarray':
import numpy as np
buffer = data.data
else:
raise RuntimeError('unsupported data type')
offsetbyte = int(offset) * self.getrawsampwidth() if offset > 0 else 0
lengthbyte = int(length) * self.getrawsampwidth() if length > 0 else 0
nwrite = _spaudio_c.spWriteAudioBuffer_(self._audio, buffer,
offsetbyte, lengthbyte)
return nwrite // self.getrawsampwidth() if nwrite > 0 else nwrite
[docs] def read(self, data, weight=1.0, offset=0, length=0):
"""Reads data to a double-precision array from the audio device.
Args:
data (bytearray, array.array or numpy.ndarray):
A double-precision array to receive data from the audio device.
weight (double, optional): A weighting factor multiplied
to data after reading.
offset (int, optional): Optional offset location for the array.
length (int, optional): Optional read length for the array.
Returns:
int: The read size if successful, -1 otherwise.
Note:
The keyword arguments of `offset` and `length` were
introduced in Version 0.7.15.
"""
if self._audio is None:
raise RuntimeError('driver must be loaded')
if data is None or len(data) <= 0:
raise ValueError('a valid buffer must be specified')
if self._read_mode[0] != 'r':
raise RuntimeError('device must be opened with read mode')
if isinstance(data, bytearray):
buffer = data
elif isinstance(data, array.array):
if data.typecode != 'd':
raise RuntimeError('the typecode must be \'d\'')
buffer = memoryview(data)
elif type(data).__name__ == 'ndarray':
import numpy as np
if not np.issubdtype('f8', data.dtype):
raise RuntimeError('the dtype must be \'f8\' (\'float64\')')
buffer = data.data
else:
raise RuntimeError('unsupported data type')
return _spaudio_c.spReadAudioDoubleBufferWeighted_(self._audio, buffer,
weight, offset, length)
[docs] def write(self, data, weight=1.0, offset=0, length=0):
"""Writes data of a double-precision array to the audio device.
Args:
data (bytearray, array.array or numpy.ndarray):
A double-precision array to send data to the audio device.
weight (double, optional): A weighting factor multiplied
to data before writing.
offset (int, optional): Optional offset location for the array.
length (int, optional): Optional write length for the array.
Returns:
int: The written size if successful, -1 otherwise.
Note:
The keyword arguments of `offset` and `length` were
introduced in Version 0.7.15.
"""
if self._audio is None:
raise RuntimeError('driver must be loaded')
if data is None or len(data) <= 0:
raise ValueError('a valid buffer must be specified')
if self._write_mode[0] != 'w':
raise RuntimeError('device must be opened with write mode')
if isinstance(data, (bytearray, bytes)):
buffer = data
elif isinstance(data, array.array):
if data.typecode != 'd':
raise RuntimeError('the typecode must be \'d\'')
buffer = memoryview(data)
elif type(data).__name__ == 'ndarray':
import numpy as np
if not np.issubdtype('f8', data.dtype):
raise RuntimeError('the dtype must be \'f8\' (\'float64\')')
buffer = data.data
else:
raise RuntimeError('unsupported data type')
return _spaudio_c.spWriteAudioDoubleBufferWeighted_(self._audio, buffer,
weight, offset, length)
[docs] def readframes(self, nframes, weight=1.0, arraytype='ndarray',
channelwise=False):
"""Reads and returns the next `nframes` data of a double-precision array.
Args:
nframes (int): The number of frames to read.
weight (double, optional): A weighting factor multiplied
to data after reading.
arraytype (str, optional): The type of output array. The value must
be ``'ndarray'`` (default), ``'array'``, or ``'bytearray'``.
channelwise (bool, optional): ``True`` resizes the returned ndarray
into (nframes, nchannels) matrix.
This argument is valid only in ``arraytype='ndarray'`` case.
Returns:
numpy.ndarray, array.array or bytearray: The output array object
containing read data.
Note:
This function was introduced in Version 0.7.16.
"""
if nframes <= 0:
raise RuntimeError('invalid nframes value')
length = int(nframes) * self.getnchannels()
if arraytype in ('ndarray', 'numpy.ndarray'):
data = self.createndarray(length, channelwise=channelwise)
elif arraytype in ('array', 'array.array'):
data = self.createarray(length)
elif arraytype == 'bytearray':
data = bytearray(length * 8)
else:
raise RuntimeError('unknown arraytype: %s' % arraytype)
nread = self.read(data, weight=weight)
if nread != length:
warnings.warn('The read length (%d) is different from the buffer length (%d)'
% (nread, length))
return data
[docs] def readrawframes(self, nframes, arraytype='ndarray', channelwise=False):
"""Reads and returns the next `nframes` data of a raw array.
Args:
nframes (int): The number of frames to read.
arraytype (str, optional): The type of output array. The value must
be ``'ndarray'`` (default), ``'array'``, or ``'bytearray'``.
channelwise (bool, optional): ``True`` resizes the returned ndarray
into (nframes, nchannels) matrix.
This argument is valid only in ``arraytype='ndarray'`` case.
Returns:
numpy.ndarray, array.array or bytearray: The output array object
containing read data.
Note:
This function was introduced in Version 0.7.16.
"""
if nframes <= 0:
raise RuntimeError('invalid nframes value')
length = int(nframes) * self.getnchannels()
if arraytype in ('ndarray', 'numpy.ndarray'):
data = self.createrawndarray(length, channelwise=channelwise)
elif arraytype in ('array', 'array.array'):
data = self.createrawarray(length)
elif arraytype == 'bytearray':
data = bytearray(length * self.getrawsampwidth())
else:
raise RuntimeError('unknown arraytype: %s' % arraytype)
nread = self.readraw(data)
if nread != length:
warnings.warn('The read length (%d) is different from the buffer length (%d)'
% (nread, length))
return data
[docs] def writeframes(self, data, weight=1.0):
"""Writes data of a double-precision array to the audio device.
Args:
data (bytearray, array.array or numpy.ndarray):
A double-precision array to send data to the audio device.
weight (double, optional): A weighting factor multiplied
to data before writing.
Returns:
int: The written number of frames if successful, -1 otherwise.
Note:
This function was introduced in Version 0.7.16.
"""
nwrite = self.write(data, weight=weight)
if nwrite > 0:
nwrite = nwrite // self.getnchannels()
return nwrite
[docs] def writerawframes(self, data):
"""Writes data of a raw array to the audio device.
Args:
data (bytearray, array.array or numpy.ndarray):
A raw array to send data to the audio device.
Returns:
int: The written number of frames if successful, -1 otherwise.
Note:
This function was introduced in Version 0.7.16.
"""
nwrite = self.writeraw(data)
if nwrite > 0:
nwrite = nwrite // self.getnchannels()
return nwrite
[docs]def getndrivers():
"""Gets the number of drivers."""
return _spaudio_c.spGetNumAudioDriver()
[docs]def getdrivername(index):
"""Gets the name of the driver which has the index specified.
Args:
index (int): The index associated with the audio driver.
Returns:
string: A string containing the driver name.
Raises:
ValueError: If `index` is greater than or equal to
the number of drivers.
"""
if index < 0 or index >= getndrivers():
raise ValueError('0 <= index < %d is required'
% getndrivers())
name = _spaudio_c.xspGetAudioDriverName(index)
if name is not None:
global _ischarbinary
if _ischarbinary:
return name.decode(locale.getpreferredencoding())
else:
return name
else:
raise DriverError('audio driver error')
[docs]def getdriverlist():
"""Gets a list of driver names."""
alist = []
for i in range(getndrivers()):
alist.append(getdrivername(i))
return alist
[docs]def getndriverdevices(drivername=None):
"""Gets the number of devices in the driver."""
if drivername is not None:
global _ischarbinary
if _ischarbinary:
encodedname = drivername.encode(locale.getpreferredencoding())
else:
encodedname = drivername
else:
encodedname = None
return _spaudio_c.spGetNumAudioDriverDevice(encodedname)
[docs]def getdriverdevicename(index, drivername=None):
"""Gets the name of the device in the driver.
Args:
index (int): The index associated with the audio device.
drivername (str): Optional driver name.
Returns:
string: A string containing the device name.
Raises:
ValueError: If `index` is greater than or equal to
the number of devices.
"""
global _ischarbinary
if index < 0 or index >= getndriverdevices(drivername):
raise ValueError('0 <= index < %d is required'
% getndriverdevices(drivername))
if drivername is not None:
if _ischarbinary:
encodedname = drivername.encode(locale.getpreferredencoding())
else:
encodedname = drivername
else:
encodedname = None
devicename = _spaudio_c.xspGetAudioDriverDeviceName(encodedname, index)
if devicename is not None:
if _ischarbinary:
return devicename.decode(locale.getpreferredencoding())
else:
return devicename
else:
return ''
[docs]def open(mode, *, drivername=None, callback=None, deviceindex=-1, samprate=0,
sampbit=0, nchannels=0, blockingmode=-1, buffersize=0, nbuffers=0,
params=None):
"""Opens an audio device. This function may be used in a ``with`` statement.
Args:
mode (str): Device opening mode. ``'r'`` means read mode, ``'w'``
means write mode. ``'ro'`` and ``'wo'`` which mean read only
and write only modes are also supported. Although these modes
validate only the specified mode, some environments recieve
a benefit that the processing becomes faster.
In this function, ``'rw'`` which means open with ``'w'``
after ``'r'`` is also supported.
drivername (str, optional): The driver name to initialize.
callback (tuple, optional): The callback function included in
tuple which contains all arguments for
:func:`~spaudio.SpAudio.setcallback` method.
deviceindex (int, optional): The index of the audio device.
samprate (double, optional): Sample rate of the device.
sampbit (int, optional): Bits/sample of the device.
nchannels (int, optional): The number of channels of the device.
blockingmode (int, optional): 0 = blocking mode (default),
1 = nonblocking mode.
buffersize (int, optional): The buffer size of the device.
nbuffers (int, optional): The number of buffers of the device.
params (dict, optional): A dict object which can contain any
above parameters from `samprate` to `nbuffers`.
Returns:
SpAudio: A new instance of :class:`~spaudio.SpAudio` class.
Note:
This function was introduced in Version 0.7.15.
"""
audio = SpAudio(drivername)
if mode[0] == 'r':
modefirst = 'ro' if mode[1] == 'o' else 'r'
elif mode[0] == 'w':
modefirst = 'wo' if mode[1] == 'o' else 'w'
else:
raise RuntimeError('unknown open mode: %s' % mode)
modesecond = None
if mode[0] == 'w':
if mode[1] == 'r':
modesecond = 'r'
elif mode[1] != 'o':
raise RuntimeError('unknown open mode: %s' % mode)
elif mode[0] == 'r':
if mode[1] == 'w':
modesecond = 'w'
elif mode[1] != 'o':
raise RuntimeError('unknown open mode: %s' % mode)
audio.open(modefirst, callback=callback, deviceindex=deviceindex,
samprate=samprate, sampbit=sampbit, nchannels=nchannels,
blockingmode=blockingmode, buffersize=buffersize,
nbuffers=nbuffers, params=params)
if modesecond is not None:
audio.open(modesecond)
return audio
if __name__ == '__main__':
driverlist = getdriverlist()
for i in range(getndrivers()):
print('Driver %d: %s' % (i, driverlist[i]))
for j in range(getndriverdevices(driverlist[i])):
print(' Device %d: %s'
% (j, getdriverdevicename(j, driverlist[i])))