"""Reliable low-latency audio playback and recording.
https://python-rtmixer.readthedocs.io/
"""
__version__ = '0.1.4'
import sounddevice as _sd
from pa_ringbuffer import init as _init_ringbuffer
from _rtmixer import ffi as _ffi, lib as _lib
RingBuffer = _init_ringbuffer(_ffi, _lib)
# Get constants from C library
for _k, _v in vars(_lib).items():
if _k.isupper():
globals()[_k] = _v
class _Base(_sd._StreamBase):
"""Base class for Mixer et al."""
def __init__(self, kind, qsize=16, **kwargs):
callback = _ffi.addressof(_lib, 'callback')
self._action_q = RingBuffer(_ffi.sizeof('struct action*'), qsize)
self._result_q = RingBuffer(_ffi.sizeof('struct action*'), qsize)
self._state = _ffi.new('struct state*', dict(
input_channels=0,
output_channels=0,
samplerate=0,
action_q=self._action_q._ptr,
result_q=self._result_q._ptr,
actions=_ffi.NULL,
))
_sd._StreamBase.__init__(
self, kind=kind, dtype='float32',
callback=callback, userdata=self._state, **kwargs)
self._state.samplerate = self.samplerate
self._actions = {}
self._temp_action_ptr = _ffi.new('struct action**')
@property
def actions(self):
"""The set of active "actions"."""
self._drain_result_q()
return self._actions.keys()
@property
def stats(self):
"""Get over-/underflow statistics from an *inactive* stream.
To get statistics from an :attr:`~sounddevice.Stream.active`
stream, use `fetch_and_reset_stats()`.
"""
if self.active:
raise RuntimeError('Accessing .stats on an active stream')
return _ffi.new('struct stats*', self._state.stats)
def cancel(self, action, time=0, allow_belated=True):
"""Initiate stopping a running action.
This creates another action that is sent to the callback in
order to stop the given *action*.
This function typically returns before the *action* is actually
stopped. Use `wait()` (on either one of the two actions) to
wait until it's done.
"""
cancel_action = _ffi.new('struct action*', dict(
type=CANCEL,
actual_time=-1.0 if allow_belated else 0.0,
requested_time=time,
action=action,
))
self._enqueue(cancel_action)
return cancel_action
def fetch_and_reset_stats(self, time=0, allow_belated=True):
"""Fetch and reset over-/underflow statistics of the stream.
The statistics will be available in the ``stats`` field of the
returned action.
"""
action = _ffi.new('struct action*', dict(
type=FETCH_AND_RESET_STATS,
actual_time=-1.0 if allow_belated else 0.0,
requested_time=time,
))
self._enqueue(action)
return action
def wait(self, action=None, sleeptime=10):
"""Wait for *action* to be finished.
Between repeatedly checking if the action is finished, this
waits for *sleeptime* milliseconds.
If no *action* is given, this waits for all actions.
"""
if action is None:
while self.actions:
_sd.sleep(sleeptime)
else:
while action in self.actions:
_sd.sleep(sleeptime)
def _check_channels(self, channels, kind):
"""Check if number of channels or mapping was given."""
assert kind in ('input', 'output')
try:
channels, mapping = len(channels), channels
except TypeError:
mapping = tuple(range(1, channels + 1))
max_channels = _sd._split(self.channels)[kind == 'output']
if max(mapping) > max_channels:
raise ValueError('Channel number too large')
if min(mapping) < 1:
raise ValueError('Channel numbers start with 1')
return channels, mapping
def _enqueue(self, action, keep_alive=None):
self._drain_result_q()
self._temp_action_ptr[0] = action
ret = self._action_q.write(self._temp_action_ptr)
if ret != 1:
raise RuntimeError('Action queue is full')
assert action not in self._actions
self._actions[action] = keep_alive
def _drain_result_q(self):
"""Get actions from the result queue and discard them."""
while self._result_q.readinto(self._temp_action_ptr):
try:
del self._actions[self._temp_action_ptr[0]]
except KeyError:
assert False
[docs]class Mixer(_Base):
"""PortAudio output stream for realtime mixing.
Takes the same keyword arguments as `sounddevice.OutputStream`,
except *callback* (a callback function implemented in C is used
internally) and *dtype* (which is always ``'float32'``).
Uses default values from `sounddevice.default` (except *dtype*,
which is always ``'float32'``).
Has the same methods and attributes as `sounddevice.OutputStream`
(except :meth:`~sounddevice.Stream.write` and
:attr:`~sounddevice.Stream.write_available`), plus the following:
"""
def __init__(self, **kwargs):
_Base.__init__(self, kind='output', **kwargs)
self._state.output_channels = self.channels
[docs] def play_buffer(self, buffer, channels, start=0, allow_belated=True):
"""Send a buffer to the callback to be played back.
After calling this, the *buffer* must not be written to anymore.
"""
channels, mapping = self._check_channels(channels, 'output')
buffer = _ffi.from_buffer(buffer)
_, samplesize = _sd._split(self.samplesize)
action = _ffi.new('struct action*', dict(
type=PLAY_BUFFER,
actual_time=-1.0 if allow_belated else 0.0,
requested_time=start,
buffer=_ffi.cast('float*', buffer),
total_frames=len(buffer) // channels // samplesize,
channels=channels,
mapping=mapping,
))
self._enqueue(action, keep_alive=buffer)
return action
[docs] def play_ringbuffer(self, ringbuffer, channels=None, start=0,
allow_belated=True):
"""Send a `RingBuffer` to the callback to be played back.
By default, the number of channels is obtained from the ring
buffer's :attr:`~RingBuffer.elementsize`.
"""
_, samplesize = _sd._split(self.samplesize)
if channels is None:
channels = ringbuffer.elementsize // samplesize
channels, mapping = self._check_channels(channels, 'output')
if ringbuffer.elementsize != samplesize * channels:
raise ValueError('Incompatible elementsize')
action = _ffi.new('struct action*', dict(
type=PLAY_RINGBUFFER,
actual_time=-1.0 if allow_belated else 0.0,
requested_time=start,
ringbuffer=ringbuffer._ptr,
total_frames=ULONG_MAX,
channels=channels,
mapping=mapping,
))
self._enqueue(action, keep_alive=ringbuffer)
return action
[docs]class Recorder(_Base):
"""PortAudio input stream for realtime recording.
Takes the same keyword arguments as `sounddevice.InputStream`,
except *callback* (a callback function implemented in C is used
internally) and *dtype* (which is always ``'float32'``).
Uses default values from `sounddevice.default` (except *dtype*,
which is always ``'float32'``).
Has the same methods and attributes as `Mixer`, except that
`play_buffer()` and `play_ringbuffer()` are replaced by:
"""
def __init__(self, **kwargs):
_Base.__init__(self, kind='input', **kwargs)
self._state.input_channels = self.channels
[docs] def record_buffer(self, buffer, channels, start=0, allow_belated=True):
"""Send a buffer to the callback to be recorded into.
"""
channels, mapping = self._check_channels(channels, 'input')
buffer = _ffi.from_buffer(buffer)
samplesize, _ = _sd._split(self.samplesize)
action = _ffi.new('struct action*', dict(
type=RECORD_BUFFER,
actual_time=-1.0 if allow_belated else 0.0,
requested_time=start,
buffer=_ffi.cast('float*', buffer),
total_frames=len(buffer) // channels // samplesize,
channels=channels,
mapping=mapping,
))
self._enqueue(action, keep_alive=buffer)
return action
[docs] def record_ringbuffer(self, ringbuffer, channels=None, start=0,
allow_belated=True):
"""Send a `RingBuffer` to the callback to be recorded into.
By default, the number of channels is obtained from the ring
buffer's :attr:`~RingBuffer.elementsize`.
"""
samplesize, _ = _sd._split(self.samplesize)
if channels is None:
channels = ringbuffer.elementsize // samplesize
channels, mapping = self._check_channels(channels, 'input')
if ringbuffer.elementsize != samplesize * channels:
raise ValueError('Incompatible elementsize')
action = _ffi.new('struct action*', dict(
type=RECORD_RINGBUFFER,
actual_time=-1.0 if allow_belated else 0.0,
requested_time=start,
ringbuffer=ringbuffer._ptr,
total_frames=ULONG_MAX,
channels=channels,
mapping=mapping,
))
self._enqueue(action, keep_alive=ringbuffer)
return action
[docs]class MixerAndRecorder(Mixer, Recorder):
"""PortAudio stream for realtime mixing and recording.
Takes the same keyword arguments as `sounddevice.Stream`, except
*callback* (a callback function implemented in C is used internally)
and *dtype* (which is always ``'float32'``).
Uses default values from `sounddevice.default` (except *dtype*,
which is always ``'float32'``).
Inherits all methods and attributes from `Mixer` and `Recorder`.
"""
def __init__(self, **kwargs):
_Base.__init__(self, kind='duplex', **kwargs)
self._state.input_channels = self.channels[0]
self._state.output_channels = self.channels[1]