fix: vendor DAVE-compatible voice recv build

This commit is contained in:
2026-06-08 06:58:18 +00:00
parent b6223382dd
commit eecd129656
27 changed files with 3977 additions and 58 deletions
+132
View File
@@ -0,0 +1,132 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
.vscode/
*.code-*
+21
View File
@@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2015-present Imayhaveborkedit
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
+230
View File
@@ -0,0 +1,230 @@
![PyPI - Version](https://img.shields.io/pypi/v/discord-ext-voice-recv?color=dodgerblue&link=https%3A%2F%2Fpypi.org%2Fproject%2Fdiscord-ext-voice-recv%2F)
# discord-ext-voice-recv
Voice receive extension package for discord.py
## Warning
**This extension should be more or less functional, but the code is not yet feature complete. No guarantees are given for stability or random breaking changes.**
See the [update notes](update_notes.md) for a poor excuse for a changelog.
## Installing
**Python 3.8 or higher is required**, preferably at least 3.11 or whatever is latest
```
python -m pip install discord-ext-voice-recv
```
To install directly from github:
```
python -m pip install git+https://github.com/imayhaveborkedit/discord-ext-voice-recv
```
Naturally, this extension depends on `discord.py` being installed with voice support (`pynacl`).
## Example
See the [example script](examples/recv.py).
## Feature overview
### Custom VoiceProtocol client
No monkey patching or bizarre hacks required. Simply use the library feature to use `VoiceRecvClient` as the voice client class. See [Usage](#usage).
### New events
This extension adds the unimplemented voice websocket events and three virtual events. See [New Events](#new-events).
### Speaking state
It is now possible to determine if a member is speaking or not, using `VoiceRecvClient.get_speaking()`, or using the speaking events inside an `AudioSink`.
### Simple and familiar API
The overall API is designed to mirror the discord.py voice send API, with `AudioSink` being the counterpart to the existing `AudioSource`. See [Sinks](#sinks).
### Convenient included utilities
Batteries included in the form of useful built in `AudioSinks`. Some to match their `AudioSource` counterpart, some I merely considered useful. See... uh... TODO.
### Optional extras
Slightly more complex included batteries that depend on external packages. These live in `voice_recv.extras`. They can be installed by adding their optional dependency during install, ex: `pip install discord-ext-voice-recv[extras_thing]`, or all of them can be installed by specifying `extras` instead. See [Extras](#extras).
### More or less typed
It's probably fine.
## Usage
### VoiceRecvClient
The class `voice_recv.VoiceRecvClient` must be used in `VoiceChannel.connect()` to enable voice receive functionality.
```python
from discord.ext import voice_recv
voice_client = await voice_channel.connect(cls=voice_recv.VoiceRecvClient)
```
### New voice client functions
```python
def listen(sink: voice_recv.AudioSink, *, after=None) -> None
```
Receives audio data into an `AudioSink`. A sink is similar to the `AudioSource` class, where most of the logic is done in a single callback function, but in reverse. Sinks are explained in detail in the [Sinks](#sinks) section below.
The finalizer, `after` is called after the sink has been exhausted or an error occurred. The callback signature is the same as the after callback for `play()`, one parameter for an optional Exception object.
```python
def is_listening() -> bool
```
Returns `True` if the voice client is currently receiving audio. Specifically, if the bot is reading from the voice socket.
```python
def stop() -> None
```
This function now stops both receiving and sending of audio.
```python
def stop_listening() -> None
```
Stops receiving audio.
```python
def stop_playing() -> None
```
Stops playing audio. This function is identical to `discord.VoiceClient.stop()`.
```python
def get_speaking(member: discord.Member | discord.User) -> bool | None
```
Gets the speaking state (voice activity, the green circle) of a member. User is typed in for convenience. Returns None if the member was not found.
## Sinks
The API of this extension is designed to mirror the discord.py voice send API. Sending audio uses the `AudioSource` class, while receiving audio uses the `AudioSink` class. A sink is designed to be the inverse of a source. Essentially, a source is a callback called by discord.py to produce a chunk of audio data. Conversely, a sink is a callback called by the library to handle a chunk of audio. Sinks can be composed in the same fashion as sources, creating an audio processing pipeline. Sources and sinks can even combined into one object to handle both tasks, such as creating a feedback loop.
Special care should be taken not to write excessively computationally expensive code, as python is not particularly well suited to real-time audio processing.
Due to voice receive being somewhat more complex than voice sending, sinks have additional functionality compared to sources. However, the core sink functions should look relatively familiar.
```python
class MySink(voice_recv.AudioSink):
def __init__(self):
super().__init__()
def wants_opus(self) -> bool:
return False
def write(self, user: User | Member | None, data: VoiceData):
...
def cleanup(self):
...
```
These are the main functions of a sink, names and purpose reflecting that of their source counterparts. It is important to note that `super().__init__()` must be called when inheriting from `AudioSink`, in contrast to `AudioSource` which does not have a default `__init__` function.
- The `wants_opus()` function determines if the sink should receive opus packets or decoded PCM packets. Care should be taken not to unintentionally mix sinks that want different types.
- The `write()` function is the main callback, where the sink logic takes place. In a sink pipeline, this could alter, inspect, or log a packet, and then write it to a child sink. `VoiceData` is a simple container class with attributes for the origin member, opus data, optionally pcm data, and raw audio packet.
- The `cleanup()` function is identical to `AudioSource.cleanup()`, a finalizer to cleanup any loose ends when the sink has finished its job.
Additionally, sinks also have properties for their `client` and `voice_client`, as well as `parent` and `child`/`children` sinks.
### Built in Sinks
This extension comes with several useful built in sinks, as well as a few [extras](#extras) mentioned later. For a more information, you will have to [source dive](discord/ext/voice_recv/sinks.py) for now.
- `AudioSink` - The base class for most sinks, similar in purpose to the discord.py `AudioSource`.
- `MultiAudioSink` - A sink that supports writing to multiple destination sinks. Has no subclass implementations currently. Generally intended to be extended by the user.
- `BasicSink` - A simple sink that operates based on a user provided callback. Useful for testing or simple tasks not performed by other sinks.
- `WaveSink` - Writes audio data to a .wav file. It does not fill in silence or mix audio from multiple users on its own. `WavSink` is an alias for this sink.
- `FFmpegSink` - Uses ffmpeg to convert the audio stream to an arbitrary format, or whatever else ffmpeg can do to it. Requires ffmpeg, but you should already have it working for discord.py.
- `PCMVolumeTransformer` - The AudioSink analog to the discord.py AudioSource version. Does exactly the same thing: controls the volume.
- `ConditionalFilter` - Filters audio data based on a given predicate. If the predicate fails for a packet, it is not written to the destination sink.
- `UserFilter` - A conditional filter to check if data is from a given user.
- `TimedFilter` - A conditional filter with a timer for how long it should operate.
- `SilenceGeneratorSink` - Generates silence to fill in audio transmission downtime for a continuous data stream. **Note: This sink is pretty broken and buggy right now and slated for rewrite. Usage is not advised.**
### Sink event listeners
With AudioSinks being potentially more complex and stateful than AudioSources and the addition of new events, it is sometimes necessary to handle events in the context of a sink. It would be rather awkward to have to register a sink function with `commands.Bot.add_listener()` while dealing with thread safety, and even more so using `discord.Client`. To remedy this, listeners can be defined within sinks, similarly to how they work in Cogs.
```python
class MySink(AudioSink):
@AudioSink.listener()
def on_voice_member_disconnect(self, member: discord.Member, ssrc: int | None):
print(f"{member} has disconnected")
self.do_something_like_handle_disconnect(ssrc)
```
Note that these functions must be sync functions, as they are dispatched from a thread. Trying to use an async function will result in an error. This restriction only applies to sink listeners, and normal async event listeners will function as per usual. The event listener dispatch thread is different from the one used to dispatch the `write()` callback so potential thread safety issues should be considered. A decorator argument to run the event callback in the other thread *may* be added later.
## New events
```python
async def on_voice_member_speaking_state(member: discord.Member, ssrc: int, state: SpeakingState | int)
```
First and foremost, this event does **NOT** refer to the speaking indicator in discord (the green circle). For voice activity, see `on_voice_member_speaking_start`.
This event is fired when the speaking state (speaking mode) of a member changes. This happens when:
- A member first speaks (transmits audio) in a voice, but only once per session
- A member activates or deactivates priority speaker mode
This event is fired once initially to reveal the ssrc of a member, an identifier to map packets to their originating member. Any packets received from this member before this event fires can (probably) be safely ignored since they are likely just silence packets.
```python
async def on_voice_member_connect(member: discord.Member)
```
Called when a member connects to a voice channel. Also called on initial connection for every member in the channel.
```python
async def on_voice_member_disconnect(member: discord.Member, ssrc: int | None)
```
Called when a member disconnects from a voice channel. The `ssrc` parameter is the unique id a member has to identify which packets belong to them. This is useful when using custom sinks, particularly those that handle packets from multiple members.
```python
async def on_voice_member_video(member: discord.Member, data: voice_recv.VoiceVideoStreams)
```
Called when a member in voice channel toggles their webcam on or off, NOT screenshare. Screenshare status is only indicated in the `self_video` attribute of `discord.VoiceState`.
```python
async def on_voice_member_flags(member: discord.Member, flags: voice_recv.VoiceFlags)
```
An undocumented event dispatched when a member joins a voice channel containing a flags bitfield. Also called on initial connection for every member in the channel.
Flags:
- `VoiceFlags.clips_enabled`: User has [clips](https://support.discord.com/hc/en-us/articles/16861982215703-Clips) enabled
- `VoiceFlags.allow_voice_recording`: User has consented to their voice being clipped
- `VoiceFlags.allow_any_viewer_clips`: User has consented to stream viewers clipping them
```python
async def on_voice_member_platform(member: discord.Member, platform: voice_recv.VoicePlatform | None)
```
An undocumented event dispatched when a member joins a voice channel containing the member's platform. Also called on initial connection for every member in the channel.
Values:
- `VoicePlatform.desktop`
- `VoicePlatform.mobile`
- `VoicePlatform.xbox`
- `VoicePlatform.playstation`
```python
def on_rtcp_packet(packet: RTCPPacket, guild: discord.Guild)
```
A virtual event for when an RTCP packet is received. This event only works inside of sinks, so it cannot be async.
```python
def on_voice_member_speaking_start(member: discord.Member)
def on_voice_member_speaking_stop(member: discord.Member)
```
Virtual events for the state of the speaking indicator (the green circle). These events are synthesized from packet activity and may not exactly match what is displayed in the discord client. Due to performance issues with asyncio, this event is sink only and cannot be async.
## Extras
### `voice_recv.extras.speechrecognition`
- Optional dependency: `extras_speech`
- Requires package: `SpeechRecognition`
- Provides: `SpeechRecognitionSink`
A helper sink for using `SpeechRecognition` to perform speech-to-text conversion. Generally depends on third party services for reasonable quality. Results may vary.
### `voice_recv.extras.localplayback`
- Optional dependency: `extras_local`
- Requires package: `pyaudio`
- Provides: `LocalPlaybackSink`, `SimpleLocalPlaybackSink`
Helper sinks for playing audio through an audio output device the local system. Defaults to the system default device, but other output devices can also be specified.
## Currently missing or WIP features
- Silence generation (WIP, pending rewrite)
## Future plans
- Muxer AudioSink (mixes multiple audio streams into a single stream)
- Rust implementations of some components for improved performance
- Alternative voice client implementation with a minimal interface intended for use with external data processing
@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
from .voice_client import *
from .reader import *
from .sinks import *
from .video import *
from .opus import *
from .rtp import *
from .enums import *
from . import (
rtp as rtp,
extras as extras,
)
__title__ = 'discord.ext.voice_recv'
__author__ = 'Imayhaveborkedit'
__license__ = 'MIT'
__copyright__ = 'Copyright 2021-present Imayhaveborkedit'
__version__ = '0.5.3a'
@@ -0,0 +1,249 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import heapq
import logging
import threading
from .utils import gap_wrapped, add_wrapped
from typing import (
TYPE_CHECKING,
Protocol,
TypeVar,
)
from .rtp import _PacketCmpMixin
if TYPE_CHECKING:
from typing import Optional, List
from .rtp import AudioPacket
__all__ = [
'HeapJitterBuffer',
]
_T = TypeVar('_T')
PacketT = TypeVar('PacketT', bound=_PacketCmpMixin)
log = logging.getLogger(__name__)
class Buffer(Protocol[_T]):
"""The base class representing a simple buffer with no extra features."""
# fmt: off
def __len__(self) -> int: ...
def push(self, item: _T) -> None: ...
def pop(self) -> Optional[_T]: ...
def peek(self) -> Optional[_T]: ...
def flush(self) -> List[_T]: ...
def reset(self) -> None: ...
# fmt: on
class BaseBuffer(Buffer[PacketT]):
"""A basic buffer."""
def __init__(self):
self._buffer: List[PacketT] = []
def __len__(self) -> int:
return len(self._buffer)
def push(self, item: PacketT) -> None:
self._buffer.append(item)
def pop(self) -> Optional[PacketT]:
return self._buffer.pop()
def peek(self) -> Optional[PacketT]:
return self._buffer[-1] if self._buffer else None
def flush(self) -> List[PacketT]:
buf = self._buffer.copy()
self._buffer.clear()
return buf
def reset(self) -> None:
self._buffer.clear()
class HeapJitterBuffer(BaseBuffer[PacketT]):
"""Push item in, pop items out"""
_threshold: int = 10000
def __init__(self, maxsize: int = 10, *, prefsize: int = 1, prefill: int = 1):
if maxsize < 1:
raise ValueError(f'maxsize ({maxsize}) must be greater than 0')
if not 0 <= prefsize <= maxsize:
raise ValueError(f'prefsize must be between 0 and maxsize ({maxsize})')
self.maxsize: int = maxsize
self.prefsize: int = prefsize
self.prefill: int = prefill
self._prefill: int = prefill
self._last_tx_seq: int = -1
self._has_item: threading.Event = threading.Event()
# I sure hope I dont need to add a lock to this
self._buffer: List[AudioPacket] = []
def _push(self, packet: AudioPacket) -> None:
heapq.heappush(self._buffer, packet)
def _pop(self) -> AudioPacket:
return heapq.heappop(self._buffer)
def _get_packet_if_ready(self) -> Optional[AudioPacket]:
return self._buffer[0] if len(self._buffer) > self.prefsize else None
def _pop_if_ready(self) -> Optional[AudioPacket]:
return self._pop() if len(self._buffer) > self.prefsize else None
def _update_has_item(self) -> None:
prefilled = self._prefill == 0
packet_ready = len(self._buffer) > self.prefsize
if not prefilled or not packet_ready:
self._has_item.clear()
return
next_packet = self._buffer[0]
sequential = add_wrapped(self._last_tx_seq, 1) == next_packet.sequence
positive_seq = self._last_tx_seq >= 0
# We have the next packet ready
# OR we havent sent a packet out yet
# OR the buffer is full
if (sequential and positive_seq) or not positive_seq or len(self._buffer) >= self.maxsize:
self._has_item.set()
else:
self._has_item.clear()
def _cleanup(self) -> None:
# Logging this is pointless until I fix the stale remote buffer issue
# if len(self._buffer) > self.maxsize:
# log.debug("Buffer overfilled: %s > %s", len(self._buffer), self.maxsize)
# drop oldest packets if buffer overfilled
while len(self._buffer) > self.maxsize:
packet = heapq.heappop(self._buffer)
# log.debug("Dropped extra packet %s", packet)
def push(self, packet: AudioPacket) -> bool:
"""
Push a packet into the buffer. If the packet would make the buffer
exceed its maxsize, the oldest packet will be dropped.
"""
seq = packet.sequence
# for the gap between _last_tx_seq and the current one, a large gap is old, a small gap is new
# the gap for old packets will generally be very large since they wrap all the way around
if gap_wrapped(self._last_tx_seq, seq) > self._threshold and self._last_tx_seq != -1:
log.debug("Dropping old packet %s", packet)
return False
self._push(packet)
if self._prefill > 0:
self._prefill -= 1
self._cleanup()
self._update_has_item()
return True
def pop(self, *, timeout: float | None = 0) -> Optional[AudioPacket]:
"""
If timeout is a positive number, wait as long as timeout for a packet
to be ready and return that packet, otherwise return None.
"""
ok = self._has_item.wait(timeout)
if not ok:
return None
if self._prefill > 0:
return None
# This function should actually be redundant but i'll leave it for now
packet = self._pop_if_ready()
if packet is not None:
self._last_tx_seq = packet.sequence
self._update_has_item()
return packet
def peek(self, *, all: bool = False) -> Optional[AudioPacket]:
"""
Returns the next packet in the buffer only if it is ready, meaning it can
be popped. When `all` is set to True, it returns the next packet, if any.
"""
if not self._buffer:
return None
if all:
return self._buffer[0]
else:
return self._get_packet_if_ready()
def peek_next(self) -> Optional[AudioPacket]:
"""
Returns the next packet in the buffer only if it is sequential.
"""
packet = self.peek(all=True)
if packet is None:
return
if packet.sequence == add_wrapped(self._last_tx_seq, 1) or self._last_tx_seq < 0:
return packet
def gap(self) -> int:
"""
Returns the number of missing packets between the last packet to be
popped and the currently held next packet. Returns 0 otherwise.
"""
if self._buffer and self._last_tx_seq > 0:
return gap_wrapped(self._last_tx_seq, self._buffer[0].sequence)
return 0
def flush(self) -> List[AudioPacket]:
"""
Return all remaining packets.
"""
packets = sorted(self._buffer)
self._buffer.clear()
if packets:
self._last_tx_seq = packets[-1].sequence
self._prefill = self.prefill
self._has_item.clear()
return packets
def reset(self) -> None:
"""
Clear buffer and reset internal counters.
"""
self._buffer.clear()
self._has_item.clear()
self._prefill = self.prefill
self._last_tx_seq = -1
@@ -0,0 +1,30 @@
from discord.flags import BaseFlags, fill_with_flags, flag_value
from discord.enums import Enum
__all__ = (
'VoiceFlags',
'VoicePlatform',
)
@fill_with_flags()
class VoiceFlags(BaseFlags):
__slots__ = ()
@flag_value
def clips_enabled(self):
return 1 << 0
@flag_value
def allow_voice_recording(self):
return 1 << 1
@flag_value
def allow_any_viewer_clips(self):
return 1 << 2
class VoicePlatform(Enum):
desktop = 0
mobile = 1
xbox = 2
playstation = 3
@@ -0,0 +1,2 @@
from . import speechrecognition
from . import localplayback
@@ -0,0 +1,132 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import logging
from ..sinks import AudioSink
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..opus import VoiceData
from ..types import MemberOrUser
__all__ = [
'LocalPlaybackSink',
'SimpleLocalPlaybackSink',
]
log = logging.getLogger(__name__)
try:
import pyaudio
except ImportError:
def __getattr__(name: str):
if name in __all__:
raise RuntimeError('The pyaudio module is required to use this sink.')
else:
if TYPE_CHECKING:
from typing import Optional, Dict
from discord import Member
PyAudioStream = pyaudio._Stream
class _BaseLocalPlaybackSink(AudioSink):
pa: pyaudio.PyAudio = None # type: ignore
def __init__(self, output_device_id: Optional[int] = None, *, py_audio: Optional[pyaudio.PyAudio] = None):
self._init_pa(py_audio)
if output_device_id is None:
output_device_id = self.pa.get_default_output_device_info().get("index") # type: ignore
self.output_device_id = output_device_id
@classmethod
def _init_pa(cls, pa: Optional[pyaudio.PyAudio]) -> None:
if pa is None:
if cls.pa is None:
cls.pa = pyaudio.PyAudio()
else:
if cls.pa is None:
cls.pa = pa
elif cls.pa is not pa:
raise RuntimeError("Conflicting PyAudio objects")
def write(self, user: Optional[MemberOrUser], data: VoiceData) -> None:
raise NotImplementedError
def wants_opus(self) -> bool:
return False
@classmethod
def terminate_pyaudio(cls):
"""Call this when you are completely done using all instances of LocalPlayback sinks."""
cls.pa.terminate()
cls.pa = None # type: ignore
class SimpleLocalPlaybackSink(_BaseLocalPlaybackSink):
"""
A simplified version of LocalPlaybackSink that only supports one stream of audio.
Convenient for when you have already isolated a single member's audio.
"""
def __init__(self, output_device_id: Optional[int] = None, *, py_audio: Optional[pyaudio.PyAudio] = None):
super().__init__(output_device_id, py_audio=py_audio)
self._stream: PyAudioStream = self.pa.open(
rate=48000,
channels=2,
format=pyaudio.paInt16,
output=True,
output_device_index=output_device_id,
)
def write(self, user: Optional[MemberOrUser], data: VoiceData) -> None:
self._stream.write(data.pcm)
def cleanup(self) -> None:
self._stream.close()
class LocalPlaybackSink(_BaseLocalPlaybackSink):
"""
An AudioSink for playing received audio directly to one of the system's audio output devices using PyAudio.
This sink can handle playback of multiple users' audio without additional stream mixing beforehand.
The `output_device_id` parameter defaults to the system's default audio device, and can otherwise be
acquired via PyAudio functions. A specific `PyAudio` instance can also be passed to use a specific instance.
"""
def __init__(self, output_device_id: Optional[int] = None, *, py_audio: Optional[pyaudio.PyAudio] = None):
super().__init__(output_device_id, py_audio=py_audio)
self._streams: Dict[int, PyAudioStream] = {}
def _get_stream(self, user: MemberOrUser) -> PyAudioStream:
stream = self._streams.get(user.id)
if stream is None:
stream = self._streams[user.id] = self.pa.open(
rate=48000,
channels=2,
format=pyaudio.paInt16,
output=True,
output_device_index=self.output_device_id,
)
return stream
def write(self, user: Optional[MemberOrUser], data: VoiceData) -> None:
if user:
self._get_stream(user).write(data.pcm)
def cleanup(self) -> None:
for stream in tuple(self._streams.values()):
stream.close()
@AudioSink.listener()
def on_voice_member_disconnect(self, member: Member, ssrc: Optional[int]) -> None:
stream = self._streams.pop(member.id, None)
if stream:
stream.close()
@@ -0,0 +1,237 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import logging
from ..sinks import AudioSink
log = logging.getLogger(__name__)
__all__ = [
'SpeechRecognitionSink',
]
try:
import speech_recognition as sr
except ImportError:
def __getattr__(name: str):
if name in __all__:
raise RuntimeError('The SpeechRecognition module is required to use this sink.')
else:
import time
import array
import asyncio
import audioop
from collections import defaultdict
from ..rtp import SilencePacket
from typing import TYPE_CHECKING, TypedDict
if TYPE_CHECKING:
from concurrent.futures import Future as CFuture
from typing import Literal, Callable, Optional, Any, Final, Protocol, Awaitable, TypeVar
from discord import Member
from ..opus import VoiceData
from ..types import MemberOrUser as User
T = TypeVar('T')
# [r.split('_', 1)[1] for r in dir(sr.Recognizer()) if r.startswith("recognize")]
SRRecognizerMethod = Literal[
'amazon',
'api',
'assemblyai',
'azure',
'bing',
'faster_whisper',
'google',
'google_cloud',
'groq',
'houndify',
'ibm',
'lex',
'openai',
'sphinx',
'tensorflow',
'vosk',
'whisper',
'wit',
]
class SRStopper(Protocol):
def __call__(self, wait: bool = True, /) -> None: ...
SRProcessDataCB = Callable[[sr.Recognizer, sr.AudioData, User], Optional[str]]
SRTextCB = Callable[[User, str], Any]
class _StreamData(TypedDict):
stopper: Optional[SRStopper]
recognizer: sr.Recognizer
buffer: array.array[int]
class SpeechRecognitionSink(AudioSink):
def __init__(
self,
*,
process_cb: Optional[SRProcessDataCB] = None,
text_cb: Optional[SRTextCB] = None,
default_recognizer: SRRecognizerMethod = 'google',
phrase_time_limit: int = 10,
ignore_silence_packets: bool = True,
):
super().__init__(None)
self.process_cb: Optional[SRProcessDataCB] = process_cb
self.text_cb: Optional[SRTextCB] = text_cb
self.phrase_time_limmit: int = phrase_time_limit
self.ignore_silence_packets: bool = ignore_silence_packets
self.default_recognizer: SRRecognizerMethod = default_recognizer
self._stream_data: defaultdict[int, _StreamData] = defaultdict(
lambda: _StreamData(stopper=None, recognizer=sr.Recognizer(), buffer=array.array('B'))
)
def _await(self, coro: Awaitable[T]) -> CFuture[T]:
assert self.client is not None
return asyncio.run_coroutine_threadsafe(coro, self.client.loop) # type: ignore
def wants_opus(self) -> bool:
return False
def write(self, user: Optional[User], data: VoiceData) -> None:
if self.ignore_silence_packets and isinstance(data.packet, SilencePacket):
return
if user is None:
return
sdata = self._stream_data[user.id]
sdata['buffer'].extend(data.pcm)
if not sdata['stopper']:
sdata['stopper'] = sdata['recognizer'].listen_in_background(
DiscordSRAudioSource(sdata['buffer']), self.background_listener(user), self.phrase_time_limmit
)
def background_listener(self, user: User):
process_cb = self.process_cb or self.get_default_process_callback()
text_cb = self.text_cb or self.get_default_text_callback()
def callback(_recognizer: sr.Recognizer, _audio: sr.AudioData):
output = process_cb(_recognizer, _audio, user)
if output is not None:
text_cb(user, output)
return callback
def get_default_process_callback(self) -> SRProcessDataCB:
def cb(recognizer: sr.Recognizer, audio: sr.AudioData, user: Optional[User]) -> Optional[str]:
log.debug("Got %s, %s, %s", audio, audio.sample_rate, audio.sample_width)
text: Optional[str] = None
try:
# they changed recognize_google to be optionally assigned at runtime...
func = getattr(recognizer, 'recognize_' + self.default_recognizer, recognizer.recognize_google) # type: ignore
text = func(audio)
except sr.UnknownValueError:
log.debug("Bad speech chunk")
# self._debug_audio_chunk(audio)
return text
return cb
def get_default_text_callback(self) -> SRTextCB:
def cb(user: Optional[User], text: Optional[str]) -> Any:
log.info("%s said: %s", user.display_name if user else 'Someone', text)
return cb
@AudioSink.listener()
def on_voice_member_disconnect(self, member: Member, ssrc: Optional[int]) -> None:
if member is not None:
self._drop(member.id)
def cleanup(self) -> None:
for user_id in tuple(self._stream_data.keys()):
self._drop(user_id)
def _drop(self, user_id: int) -> None:
data = self._stream_data.pop(user_id, None)
if data is None:
log.debug("Cannot drop user id: %s, no data", user_id)
return
stopper = data.get('stopper')
if stopper:
stopper()
buffer = data.get('buffer')
if buffer:
# arrays don't have a clear function
del buffer[:]
def _debug_audio_chunk(self, audio: sr.AudioData, filename: str = 'sound.wav') -> None:
import io, wave, discord
with io.BytesIO() as b:
with wave.open(b, 'wb') as writer:
writer.setframerate(48000)
writer.setsampwidth(2)
writer.setnchannels(2)
writer.writeframes(audio.get_wav_data())
b.seek(0)
f = discord.File(b, filename)
self._await(self.voice_client.channel.send(file=f)) # type: ignore
class DiscordSRAudioSource(sr.AudioSource):
little_endian: Final[bool] = True
SAMPLE_RATE: Final[int] = 48_000
SAMPLE_WIDTH: Final[int] = 2
CHANNELS: Final[int] = 2
CHUNK: Final[int] = 960
def __init__(self, buffer: array.array[int]):
self.buffer = buffer
self._entered: bool = False
@property
def stream(self):
return self
def __enter__(self):
if self._entered:
log.warning('Already entered sr audio source')
self._entered = True
return self
def __exit__(self, *exc) -> None:
self._entered = False
if any(exc):
log.exception('Error closing sr audio source')
def read(self, size: int) -> bytes:
# TODO: make this timeout configurable
for _ in range(10):
if len(self.buffer) < size * self.CHANNELS:
time.sleep(0.1)
else:
break
else:
if len(self.buffer) == 0:
return b''
chunksize = size * self.CHANNELS
audiochunk = self.buffer[:chunksize].tobytes()
del self.buffer[: min(chunksize, len(audiochunk))]
audiochunk = audioop.tomono(audiochunk, 2, 1, 1)
return audiochunk
def close(self) -> None:
self.buffer.clear()
@@ -0,0 +1,122 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import logging
from discord.enums import SpeakingState, try_enum
from .enums import VoiceFlags, VoicePlatform
from .video import VoiceVideoStreams
from typing import TYPE_CHECKING, cast
if TYPE_CHECKING:
from typing import Dict, Any
from discord.gateway import DiscordVoiceWebSocket
from .voice_client import VoiceRecvClient
from .video import VoiceVideoPayload
log = logging.getLogger(__name__)
# https://cdn.discordapp.com/attachments/381887113391505410/1094473412623204533/image.png
# fmt: off
IDENTIFY = 0
SELECT_PROTOCOL = 1
READY = 2
HEARTBEAT = 3
SESSION_DESCRIPTION = 4 # (aka SELECT_PROTOCOL_ACK)
SPEAKING = 5
HEARTBEAT_ACK = 6
RESUME = 7
HELLO = 8
RESUMED = 9
CLIENT_CONNECT = 11
VIDEO = 12
CLIENT_DISCONNECT = 13
SESSION_UPDATE = 14 # (useless)
MEDIA_SINK_WANTS = 15 # (useless)
VOICE_BACKEND_VERSION = 16 # (useless)
CHANNEL_OPTIONS_UPDATE = 17 # (dead)
FLAGS = 18
SPEED_TEST = 19 # (dead)
PLATFORM = 20
# fmt: on
async def hook(self: DiscordVoiceWebSocket, msg: Dict[str, Any]):
op: int = msg['op']
data: Dict[str, Any] = msg.get('d', {})
vc: VoiceRecvClient = self._connection.voice_client # type: ignore
if op not in (3, 6):
from pprint import pformat
log.debug("Received op %s: \n%s", op, pformat(data, compact=True))
if len(msg.keys()) > 2:
m = msg.copy()
m.pop('op')
m.pop('d')
log.info("WS payload has extra keys: %s", m)
if op == self.READY:
vc._add_ssrc(vc.guild.me.id, data['ssrc'])
elif op == self.SESSION_DESCRIPTION:
if vc._reader:
# TODO: remove bytes cast once type is fixed in dpy
vc._reader.update_secret_key(bytes(self.secret_key)) # type: ignore
elif op == self.SPEAKING:
# this event refers to the speaking MODE, e.g. priority speaker
# it also sends the user's ssrc
uid = int(data['user_id'])
ssrc = data['ssrc']
vc._add_ssrc(uid, ssrc)
member = vc.guild.get_member(uid)
state = try_enum(SpeakingState, data['speaking'])
vc.dispatch("voice_member_speaking_state", member, ssrc, state)
elif op == CLIENT_CONNECT:
uids = [int(uid) for uid in data['user_ids']]
# Multiple user IDs means this is the initial member list
for uid in uids:
member = vc.guild.get_member(uid)
vc.dispatch("voice_member_connect", member)
elif op == VIDEO:
uid = int(data['user_id'])
vc._add_ssrc(uid, data['audio_ssrc'])
member = vc.guild.get_member(uid)
streams = VoiceVideoStreams(data=cast('VoiceVideoPayload', data), vc=vc)
vc.dispatch("voice_member_video", member, streams)
elif op == CLIENT_DISCONNECT:
uid = int(data['user_id'])
ssrc = vc._get_ssrc_from_id(uid)
if vc._reader and ssrc is not None:
log.debug("Destroying decoder for %s, ssrc=%s", uid, ssrc)
vc._reader.packet_router.destroy_decoder(ssrc)
vc._remove_ssrc(user_id=uid)
member = vc.guild.get_member(uid)
vc.dispatch("voice_member_disconnect", member, ssrc)
elif op == FLAGS:
uid = int(data['user_id'])
member = vc.guild.get_member(uid)
vc.dispatch("voice_member_flags", member, VoiceFlags._from_value(data['flags'] or 0))
elif op == PLATFORM:
uid = int(data['user_id'])
member = vc.guild.get_member(uid)
vc.dispatch(
"voice_member_platform",
member,
try_enum(VoicePlatform, data['platform']) if data['platform'] is not None else None,
)
@@ -0,0 +1,196 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Final
from .buffer import HeapJitterBuffer as JitterBuffer
from .rtp import FakePacket
from .utils import add_wrapped
from discord.opus import Decoder
try:
from davey import MediaType
has_dave = True
except ImportError:
has_dave = False
if TYPE_CHECKING:
from typing import Optional, Tuple, Dict, Callable, Any
from .rtp import AudioPacket
from .sinks import AudioSink
from .router import PacketRouter
from .voice_client import VoiceRecvClient
from .types import MemberOrUser as User
EventCB = Callable[..., Any]
EventData = Tuple[str, Tuple[Any, ...], Dict[str, Any]]
log = logging.getLogger(__name__)
__all__ = [
'VoiceData',
]
class VoiceData:
"""Container object for audio data and source user."""
__slots__ = ('packet', 'source', 'pcm')
def __init__(self, packet: AudioPacket, source: Optional[User], *, pcm: Optional[bytes] = None):
self.packet: AudioPacket = packet
self.source: Optional[User] = source
self.pcm: bytes = pcm if pcm else b''
@property
def opus(self) -> Optional[bytes]:
return self.packet.decrypted_data
class PacketDecoder:
def __init__(self, router: PacketRouter, ssrc: int):
self.router: PacketRouter = router
self.ssrc: int = ssrc
self._decoder: Optional[Decoder] = None if self.sink.wants_opus() else Decoder()
self._buffer: JitterBuffer = JitterBuffer()
self._cached_id: Optional[int] = None
self.vc: VoiceRecvClient = self.sink.voice_client # type: ignore
self.vc._connection.dave_session.set_passthrough_mode(True, 10)
self._last_seq: int = -1
self._last_ts: int = -1
@property
def sink(self) -> AudioSink:
return self.router.sink
def _get_user(self, user_id: int) -> Optional[User]:
vc: VoiceRecvClient = self.sink.voice_client # type: ignore
return vc.guild.get_member(user_id) or vc.client.get_user(user_id)
def _get_cached_member(self) -> Optional[User]:
return self._get_user(self._cached_id) if self._cached_id else None
def _flag_ready_state(self):
if self._buffer.peek():
self.router.waiter.register(self)
else:
self.router.waiter.unregister(self)
def push_packet(self, packet: AudioPacket) -> None:
self._buffer.push(packet)
self._flag_ready_state()
def pop_data(self, *, timeout: float = 0) -> Optional[VoiceData]:
packet = self._get_next_packet(timeout)
self._flag_ready_state()
if packet is None:
return
return self._process_packet(packet)
def set_user_id(self, user_id: int) -> None:
self._cached_id = user_id
def reset(self) -> None:
self._buffer.reset()
self._decoder = None if self.sink.wants_opus() else Decoder()
self._last_seq = self._last_ts = -1
self._flag_ready_state()
def destroy(self) -> None:
self._buffer.reset()
self._decoder = None
self._flag_ready_state()
def _get_next_packet(self, timeout: float) -> Optional[AudioPacket]:
packet = self._buffer.pop(timeout=timeout)
if packet is None:
# Gets the last (buffered) packet out (i think)
# TODO: revist this, might be an issue
if self._buffer:
packets = self._buffer.flush()
if any(packets[1:]):
log.warning(
"%s packets were lost being flushed in decoder-%s\n --> (last=%s) %s",
len(packets) - 1,
self.ssrc,
self._last_seq,
[p.sequence for p in packets],
)
return packets[0]
return
elif not packet:
packet = self._make_fakepacket()
return packet
def _make_fakepacket(self) -> FakePacket:
seq = add_wrapped(self._last_seq, 1)
ts = add_wrapped(self._last_ts, Decoder.SAMPLES_PER_FRAME, wrap=2**32)
return FakePacket(self.ssrc, seq, ts)
def _process_packet(self, packet: AudioPacket) -> VoiceData:
pcm = None
member = self._get_cached_member()
if member is None:
self._cached_id = self.sink.voice_client._get_id_from_ssrc(self.ssrc) # type: ignore
member = self._get_cached_member()
if has_dave and not packet.is_silence() and packet.decrypted_data is not None and self.vc._connection.dave_session is not None and self.vc._connection.dave_session.ready:
try:
packet.decrypted_data = self.vc._connection.dave_session.decrypt(member.id, MediaType.audio,
bytes(
packet.decrypted_data)) # type: ignore
except:
self._last_seq = packet.sequence
self._last_ts = packet.timestamp
return VoiceData(packet, None, pcm=b'')
if not self.sink.wants_opus():
packet, pcm = self._decode_packet(packet)
data = VoiceData(packet, member, pcm=pcm)
self._last_seq = packet.sequence
self._last_ts = packet.timestamp
return data
def _decode_packet(self, packet: AudioPacket) -> Tuple[AudioPacket, bytes]:
assert self._decoder is not None
# Decode as per usual
if packet:
try:
pcm = self._decoder.decode(packet.decrypted_data, fec=False)
except:
pcm = self._decoder.decode(None, fec=False)
return packet, pcm
# Fake packet, need to check next one to use fec
next_packet = self._buffer.peek_next()
if next_packet is not None:
nextdata: bytes = next_packet.decrypted_data # type: ignore
log.debug(
"Generating fec packet: fake=%s, fec=%s",
packet.sequence,
next_packet.sequence,
)
pcm = self._decoder.decode(nextdata, fec=True)
# Need to drop a packet
else:
pcm = self._decoder.decode(None, fec=False)
return packet, pcm
@@ -0,0 +1,422 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import time
import logging
import threading
from operator import itemgetter
from typing import TYPE_CHECKING
from . import rtp
from .sinks import AudioSink
from .router import PacketRouter, SinkEventRouter
try:
import nacl.secret
from nacl.exceptions import CryptoError
except ImportError as e:
raise RuntimeError("pynacl is required") from e
if TYPE_CHECKING:
from typing import Optional, Callable, Any, Dict, Literal, Union
from discord import Member
from discord.types.voice import SupportedModes
from .voice_client import VoiceRecvClient
from .rtp import RTPPacket
DecryptRTP = Callable[[RTPPacket], bytes]
DecryptRTCP = Callable[[bytes], bytes]
AfterCB = Callable[[Optional[Exception]], Any]
SpeakingEvent = Literal['voice_member_speaking_start', 'voice_member_speaking_stop']
EncryptionBox = Union[nacl.secret.SecretBox, nacl.secret.Aead]
log = logging.getLogger(__name__)
__all__ = [
'AudioReader',
]
class AudioReader:
def __init__(self, sink: AudioSink, voice_client: VoiceRecvClient, *, after: Optional[AfterCB] = None):
if after is not None and not callable(after):
raise TypeError('Expected a callable for the "after" parameter.')
self.sink: AudioSink = sink
self.voice_client: VoiceRecvClient = voice_client
self.after: Optional[AfterCB] = after
# No need for the whole set_sink() call
self.sink._voice_client = voice_client
self.active: bool = False
self.error: Optional[Exception] = None
self.packet_router: PacketRouter = PacketRouter(sink, self)
self.event_router: SinkEventRouter = SinkEventRouter(sink, self)
self.decryptor: PacketDecryptor = PacketDecryptor(voice_client.mode, bytes(voice_client.secret_key))
self.speaking_timer: SpeakingTimer = SpeakingTimer(self)
self.keepalive: UDPKeepAlive = UDPKeepAlive(voice_client)
def is_listening(self) -> bool:
return self.active
def update_secret_key(self, secret_key: bytes) -> None:
self.decryptor.update_secret_key(secret_key)
def start(self) -> None:
if self.active:
log.debug('Reader is already started', exc_info=True)
return
self.speaking_timer.start()
self.event_router.start()
self.packet_router.start()
self.voice_client._connection.add_socket_listener(self.callback)
self.keepalive.start()
self.active = True
def stop(self) -> None:
if not self.active:
log.debug('Tried to stop an inactive reader', exc_info=True)
return
self.voice_client._connection.remove_socket_listener(self.callback)
self.active = False
self.speaking_timer.notify()
threading.Thread(target=self._stop, name=f'audioreader-stopper-{id(self):x}').start()
def _stop(self) -> None:
try:
self.packet_router.stop()
except Exception as e:
self.error = e
log.exception('Error stopping packet router')
try:
self.event_router.stop()
except Exception as e:
self.error = e
log.exception('Error stopping event router')
self.speaking_timer.stop()
self.keepalive.stop()
if self.after:
try:
self.after(self.error)
except Exception:
log.exception('Error calling listener after function')
for sink in self.sink.root.walk_children(with_self=True):
try:
sink.cleanup()
except Exception:
log.exception('Error calling cleanup() for %s', sink)
def set_sink(self, sink: AudioSink) -> AudioSink:
"""Sets the new sink for the reader and returns the old one.
Does not call cleanup()
"""
# This whole function is potentially very racy
old_sink = self.sink
old_sink._voice_client = None
sink._voice_client = self.voice_client
self.packet_router.set_sink(sink)
self.sink = sink
return old_sink
def _is_ip_discovery_packet(self, data: bytes) -> bool:
return len(data) == 74 and data[1] == 0x02
def callback(self, packet_data: bytes) -> None:
packet = rtp_packet = rtcp_packet = None
try:
if not rtp.is_rtcp(packet_data):
packet = rtp_packet = rtp.decode_rtp(packet_data)
packet.decrypted_data = self.decryptor.decrypt_rtp(packet)
else:
packet = rtcp_packet = rtp.decode_rtcp(self.decryptor.decrypt_rtcp(packet_data))
if not isinstance(packet, rtp.ReceiverReportPacket):
log.info("Received unexpected rtcp packet: type=%s, %s", packet.type, type(packet))
log.debug("Packet info:\n packet=%s\n data=%s", packet, packet_data)
except CryptoError as e:
log.error("CryptoError decoding packet data")
log.debug("CryptoError details:\n data=%s\n secret_key=%s", packet_data, self.voice_client.secret_key)
return
except Exception as e:
if self._is_ip_discovery_packet(packet_data):
log.debug("Ignoring ip discovery packet")
return
log.exception("Error unpacking packet")
log.debug("Packet data: len=%s data=%s", len(packet_data), packet_data)
finally:
if self.error:
self.stop()
return
if not packet:
return
if rtcp_packet:
self.packet_router.feed_rtcp(rtcp_packet)
elif rtp_packet:
ssrc = rtp_packet.ssrc
if ssrc not in self.voice_client._ssrc_to_id:
if rtp_packet.is_silence():
# TODO: buffer packets from unknown ssrcs, 50 max?
# also remove this log later its pointless
log.debug("Skipping silence packet for unknown ssrc %s", ssrc)
return
else:
log.info("Received packet for unknown ssrc %s:\n%s", ssrc, rtp_packet)
self.speaking_timer.notify(ssrc)
try:
self.packet_router.feed_rtp(rtp_packet)
except Exception as e:
log.exception('Error processing rtp packet')
self.error = e
self.stop()
class PacketDecryptor:
supported_modes: list[SupportedModes] = [
'aead_xchacha20_poly1305_rtpsize',
'xsalsa20_poly1305_lite',
'xsalsa20_poly1305_suffix',
'xsalsa20_poly1305',
]
def __init__(self, mode: SupportedModes, secret_key: bytes) -> None:
self.mode: SupportedModes = mode
try:
self.decrypt_rtp: DecryptRTP = getattr(self, '_decrypt_rtp_' + mode)
self.decrypt_rtcp: DecryptRTCP = getattr(self, '_decrypt_rtcp_' + mode)
except AttributeError as e:
raise NotImplementedError(mode) from e
self.box: EncryptionBox = self._make_box(secret_key)
def _make_box(self, secret_key: bytes) -> EncryptionBox:
if self.mode.startswith("aead"):
return nacl.secret.Aead(secret_key)
else:
return nacl.secret.SecretBox(secret_key)
def update_secret_key(self, secret_key: bytes) -> None:
self.box = self._make_box(secret_key)
def _decrypt_rtp_xsalsa20_poly1305(self, packet: RTPPacket) -> bytes:
nonce = bytearray(24)
nonce[:12] = packet.header
result = self.box.decrypt(bytes(packet.data), bytes(nonce))
if packet.extended:
offset = packet.update_ext_headers(result)
result = result[offset:]
return result
def _decrypt_rtcp_xsalsa20_poly1305(self, data: bytes) -> bytes:
nonce = bytearray(24)
nonce[:8] = data[:8]
result = self.box.decrypt(data[8:], bytes(nonce))
return data[:8] + result
def _decrypt_rtp_xsalsa20_poly1305_suffix(self, packet: RTPPacket) -> bytes:
nonce = packet.data[-24:]
voice_data = packet.data[:-24]
result = self.box.decrypt(bytes(voice_data), bytes(nonce))
if packet.extended:
offset = packet.update_ext_headers(result)
result = result[offset:]
return result
def _decrypt_rtcp_xsalsa20_poly1305_suffix(self, data: bytes) -> bytes:
nonce = data[-24:]
header = data[:8]
result = self.box.decrypt(data[8:-24], nonce)
return header + result
def _decrypt_rtp_xsalsa20_poly1305_lite(self, packet: RTPPacket) -> bytes:
nonce = bytearray(24)
nonce[:4] = packet.data[-4:]
voice_data = packet.data[:-4]
result = self.box.decrypt(bytes(voice_data), bytes(nonce))
if packet.extended:
offset = packet.update_ext_headers(result)
result = result[offset:]
return result
def _decrypt_rtcp_xsalsa20_poly1305_lite(self, data: bytes) -> bytes:
nonce = bytearray(24)
nonce[:4] = data[-4:]
header = data[:8]
result = self.box.decrypt(data[8:-4], bytes(nonce))
return header + result
def _decrypt_rtp_aead_xchacha20_poly1305_rtpsize(self, packet: RTPPacket) -> bytes:
packet.adjust_rtpsize()
nonce = bytearray(24)
nonce[:4] = packet.nonce
voice_data = packet.data
# Blob vomit
assert isinstance(self.box, nacl.secret.Aead)
result = self.box.decrypt(bytes(voice_data), bytes(packet.header), bytes(nonce))
if packet.extended:
offset = packet.update_ext_headers(result)
result = result[offset:]
return result
def _decrypt_rtcp_aead_xchacha20_poly1305_rtpsize(self, data: bytes) -> bytes:
nonce = bytearray(24)
nonce[:4] = data[-4:]
header = data[:8]
assert isinstance(self.box, nacl.secret.Aead)
result = self.box.decrypt(data[8:-4], bytes(header), bytes(nonce))
return header + result
class SpeakingTimer(threading.Thread):
def __init__(self, reader: AudioReader):
super().__init__(daemon=True, name=f'speaking-timer-{id(self):x}')
self.reader: AudioReader = reader
self.voice_client = reader.voice_client
self.speaking_timeout_delay: float = 0.2
self.last_speaking_state: Dict[int, bool] = {}
self.speaking_cache: Dict[int, float] = {}
self.speaking_timer_event: threading.Event = threading.Event()
self._end_thread: threading.Event = threading.Event()
def _lookup_member(self, ssrc: int) -> Optional[Member]:
whoid = self.voice_client._get_id_from_ssrc(ssrc)
return self.voice_client.guild.get_member(whoid) if whoid else None
def maybe_dispatch_speaking_start(self, ssrc: int) -> None:
tlast = self.speaking_cache.get(ssrc)
if tlast is None or tlast + self.speaking_timeout_delay < time.perf_counter():
self.dispatch('voice_member_speaking_start', ssrc)
def dispatch(self, event: SpeakingEvent, ssrc: int) -> None:
who = self._lookup_member(ssrc)
if not who:
return
self.voice_client.dispatch_sink(event, who)
def notify(self, ssrc: Optional[int] = None) -> None:
if ssrc is not None:
self.last_speaking_state[ssrc] = True
self.maybe_dispatch_speaking_start(ssrc)
self.speaking_cache[ssrc] = time.perf_counter()
self.speaking_timer_event.set()
self.speaking_timer_event.clear()
def drop_ssrc(self, ssrc: int) -> None:
self.speaking_cache.pop(ssrc, None)
state = self.last_speaking_state.pop(ssrc, None)
if state:
self.dispatch('voice_member_speaking_stop', ssrc)
self.notify()
def get_speaking(self, ssrc: int) -> Optional[bool]:
return self.last_speaking_state.get(ssrc)
def stop(self) -> None:
self._end_thread.set()
self.notify()
def run(self) -> None:
_i1 = itemgetter(1)
def get_next_entry():
cache = sorted(self.speaking_cache.items(), key=_i1)
for ssrc, tlast in cache:
# only return pair if speaking
if self.last_speaking_state.get(ssrc):
return ssrc, tlast
return None, None
self.speaking_timer_event.wait()
while not self._end_thread.is_set():
if not self.speaking_cache:
self.speaking_timer_event.wait()
tnow = time.perf_counter()
ssrc, tlast = get_next_entry()
# no ssrc has been speaking, nothing to timeout
if ssrc is None or tlast is None:
self.speaking_timer_event.wait()
continue
self.speaking_timer_event.wait(tlast + self.speaking_timeout_delay - tnow)
if time.perf_counter() < tlast + self.speaking_timeout_delay:
continue
self.dispatch('voice_member_speaking_stop', ssrc)
self.last_speaking_state[ssrc] = False
# TODO: unify into a single thread that does all keepalives
class UDPKeepAlive(threading.Thread):
delay: int = 5000
def __init__(self, voice_client: VoiceRecvClient):
super().__init__(daemon=True, name=f"voice-udp-keepalive-{id(self):x}")
self.voice_client: VoiceRecvClient = voice_client
self.last_time: float = 0
self.counter: int = 0
self._end_thread: threading.Event = threading.Event()
def run(self) -> None:
self.voice_client.wait_until_connected()
while not self._end_thread.is_set():
vc = self.voice_client
try:
packet = self.counter.to_bytes(8, 'big')
except OverflowError:
self.counter = 0
continue
try:
vc._connection.socket.sendto(packet, (vc._connection.endpoint_ip, vc._connection.voice_port))
except Exception as e:
log.debug("Error sending keepalive to socket: %s: %s", e.__class__.__name__, e)
# TODO: test connection interruptions
vc.wait_until_connected()
if vc.is_connected():
continue
break
else:
self.counter += 1
time.sleep(self.delay)
def stop(self) -> None:
self._end_thread.set()
@@ -0,0 +1,203 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import queue
import logging
import threading
from collections import deque
from .utils import MultiDataEvent
from .opus import PacketDecoder
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Tuple, Dict, List, Callable, Any, Optional
from .rtp import RTPPacket, RTCPPacket
from .sinks import AudioSink
from .reader import AudioReader
EventCB = Callable[..., Any]
EventData = Tuple[str, Tuple[Any, ...], Dict[str, Any]]
log = logging.getLogger(__name__)
class PacketRouter(threading.Thread):
def __init__(self, sink: AudioSink, reader: AudioReader):
super().__init__(daemon=True, name=f"packet-router-{id(self):x}")
self.sink: AudioSink = sink
self.decoders: Dict[int, PacketDecoder] = {}
self.reader: AudioReader = reader
self.waiter: MultiDataEvent[PacketDecoder] = MultiDataEvent()
self._lock: threading.RLock = threading.RLock()
self._end_thread: threading.Event = threading.Event()
self._dropped_ssrcs: deque[int] = deque(maxlen=16)
def feed_rtp(self, packet: RTPPacket) -> None:
# TODO: stale packet check
if packet.ssrc in self._dropped_ssrcs:
log.debug("Ignoring packet from dropped ssrc %s", packet.ssrc)
return
with self._lock:
decoder = self.get_decoder(packet.ssrc)
if decoder is not None:
decoder.push_packet(packet)
def feed_rtcp(self, packet: RTCPPacket) -> None:
guild = self.sink.voice_client.guild if self.sink.voice_client else None
event_router = self.reader.event_router
event_router.dispatch('rtcp_packet', packet, guild)
def get_decoder(self, ssrc: int) -> Optional[PacketDecoder]:
with self._lock:
decoder = self.decoders.get(ssrc)
if decoder is None:
decoder = self.decoders[ssrc] = PacketDecoder(self, ssrc)
return decoder
def set_sink(self, sink: AudioSink) -> None:
with self._lock:
self.sink = sink
def set_user_id(self, ssrc: int, user_id: int) -> None:
with self._lock:
if ssrc in self._dropped_ssrcs:
self._dropped_ssrcs.remove(ssrc)
decoder = self.decoders.get(ssrc)
if decoder is not None:
decoder.set_user_id(user_id)
def destroy_decoder(self, ssrc: int) -> None:
with self._lock:
decoder = self.decoders.pop(ssrc, None)
if decoder is not None:
self._dropped_ssrcs.append(ssrc)
decoder.destroy()
def destroy_all_decoders(self) -> None:
with self._lock:
for ssrc in list(self.decoders.keys()):
self.destroy_decoder(ssrc)
def stop(self) -> None:
self._end_thread.set()
self.waiter.notify()
def run(self) -> None:
try:
self._do_run()
except Exception as e:
log.exception("Error in %s loop", self)
self.reader.error = e
finally:
self.reader.voice_client.stop_listening()
self.waiter.clear()
def _do_run(self) -> None:
while not self._end_thread.is_set():
self.waiter.wait()
with self._lock:
for decoder in self.waiter.items:
data = decoder.pop_data()
if data is not None and data.source is not None:
self.sink.write(data.source, data)
class SinkEventRouter(threading.Thread):
def __init__(self, sink: AudioSink, reader: AudioReader):
super().__init__(daemon=True, name=f"sink-event-router-{id(self):x}")
self.sink: AudioSink = sink
self.reader: AudioReader = reader
self._event_listeners: Dict[str, List[EventCB]] = {}
self._buffer: queue.SimpleQueue[EventData] = queue.SimpleQueue()
self._lock = threading.RLock()
self._end_thread: threading.Event = threading.Event()
self.register_events()
def dispatch(self, event: str, /, *args: Any, **kwargs: Any) -> None:
log.debug("Dispatching voice_client event %s", event)
self._buffer.put_nowait((event, args, kwargs))
def set_sink(self, sink: AudioSink) -> None:
with self._lock:
self.unregister_events()
self.sink = sink
self.register_events()
def register_events(self) -> None:
with self._lock:
self._register_listeners(self.sink)
for child in self.sink.walk_children():
self._register_listeners(child)
def unregister_events(self) -> None:
with self._lock:
self._unregister_listeners(self.sink)
for child in self.sink.walk_children():
self._unregister_listeners(child)
def _register_listeners(self, sink: AudioSink) -> None:
log.debug("Registering events for %s: %s ", sink, sink.__sink_listeners__)
for name, method_name in sink.__sink_listeners__:
func = getattr(sink, method_name)
log.debug("Registering event: %r, func: %r", name, method_name)
if name in self._event_listeners:
self._event_listeners[name].append(func)
else:
self._event_listeners[name] = [func]
def _unregister_listeners(self, sink: AudioSink):
for name, method_name in sink.__sink_listeners__:
func = getattr(sink, method_name)
if name in self._event_listeners:
try:
self._event_listeners[name].remove(func)
except ValueError:
pass
def _dispatch_to_listeners(self, event: str, *args: Any, **kwargs: Any) -> None:
for listener in self._event_listeners.get(f'on_{event}', []):
try:
listener(*args, **kwargs)
except Exception:
log.exception("Unhandled exception dispatching voice listener event %r", event)
log.debug("event=%r, args=%r, kwargs=%r, listener=%r", event, args, kwargs, listener)
def stop(self) -> None:
self._end_thread.set()
def run(self) -> None:
try:
self._do_run()
except Exception as e:
log.exception("Error in %s", self.name)
self.reader.error = e
self.reader.voice_client.stop_listening()
def _do_run(self) -> None:
while not self._end_thread.is_set():
try:
event, args, kwargs = self._buffer.get(timeout=0.5)
except queue.Empty:
continue
else:
with self._lock:
# this looks dumb
with self.reader.packet_router._lock:
self._dispatch_to_listeners(event, *args, **kwargs)
@@ -0,0 +1,471 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import struct
import logging
from math import ceil
from collections import namedtuple
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Optional, Literal, Union, Final, Dict, Any, Tuple
AudioPacket = Union['RTPPacket', 'FakePacket', 'SilencePacket']
RealPacket = Union['RTPPacket', 'RTCPPacket']
Packet = Union[RealPacket, 'FakePacket', 'SilencePacket']
PacketTypes = Union[
'SenderReportPacket',
'ReceiverReportPacket',
'SDESPacket',
'BYEPacket',
'APPPacket',
]
log = logging.getLogger(__name__)
__all__ = [
'RTPPacket',
'RTCPPacket',
'FakePacket',
'SilencePacket',
'ExtensionID',
]
OPUS_SILENCE: Final = b'\xf8\xff\xfe'
class ExtensionID:
audio_power: Final = 1
speaking_state: Final = 9
def decode(data: bytes) -> RealPacket:
"""Creates an :class:`RTPPacket` or an :class:`RTCPPacket`.
Parameters
-----------
data : bytes
The raw packet data.
"""
# While technically unreliable, discord RTP packets (should)
# always be distinguishable from RTCP packets. RTCP packets
# should always have 200-204 as their second byte, while RTP
# packet are (probably) always 73 (or at least not 200-204).
# check version bits
if not data[0] >> 6 == 2:
raise ValueError(f'Invalid packet header 0b{data[0]:0>8b}')
return _rtcp_map.get(data[1], RTPPacket)(data)
def decode_rtp(data: bytes) -> RTPPacket:
return decode(data) # type: ignore
def decode_rtcp(data: bytes) -> RTCPPacket:
return decode(data) # type: ignore
def is_rtcp(data: bytes) -> bool:
return 200 <= data[1] <= 204
def _parse_low(x: int, bitlen: int = 32) -> float:
return x / 2.0**bitlen
def _into_low(x: float, bitlen: int = 32) -> int:
return int(x * 2.0**bitlen)
class _PacketCmpMixin:
__slots__ = ('ssrc', 'sequence', 'timestamp')
def __lt__(self, other: _PacketCmpMixin) -> bool:
if self.ssrc != other.ssrc:
raise TypeError("packet ssrc mismatch (%s, %s)" % (self.ssrc, other.ssrc))
return self.sequence < other.sequence and self.timestamp < other.timestamp
def __gt__(self, other: _PacketCmpMixin) -> bool:
if self.ssrc != other.ssrc:
raise TypeError("packet ssrc mismatch (%s, %s)" % (self.ssrc, other.ssrc))
return self.sequence > other.sequence or self.timestamp > other.timestamp
def __eq__(self, other: _PacketCmpMixin) -> bool:
if self.ssrc != other.ssrc:
return False
return self.sequence == other.sequence and self.timestamp == other.timestamp
def is_silence(self) -> bool:
data = getattr(self, 'decrypted_data', None)
return data == OPUS_SILENCE
class FakePacket(_PacketCmpMixin):
__slots__ = ('ssrc', 'sequence', 'timestamp')
decrypted_data: bytes = b''
extension_data: dict = {}
def __init__(self, ssrc: int, sequence: int, timestamp: int):
self.ssrc: int = ssrc
self.sequence: int = sequence
self.timestamp: int = timestamp
def __repr__(self) -> str:
return '<FakePacket ssrc={0.ssrc}, sequence={0.sequence}, timestamp={0.timestamp}>'.format(self)
def __bool__(self) -> Literal[False]:
return False
class SilencePacket(_PacketCmpMixin):
__slots__ = ('ssrc', 'timestamp')
decrypted_data: Final = OPUS_SILENCE
extension_data: Final[Dict[int, Any]] = {}
sequence: int = -1
def __init__(self, ssrc: int, timestamp: int):
self.ssrc: int = ssrc
self.timestamp: int = timestamp
def __repr__(self) -> str:
return '<SilencePacket ssrc={0.ssrc}, timestamp={0.timestamp}>'.format(self)
def is_silence(self) -> bool:
return True
class RTPPacket(_PacketCmpMixin):
__slots__ = (
'version',
'padding',
'extended',
'cc',
'marker',
'payload',
'sequence',
'timestamp',
'ssrc',
'csrcs',
'header',
'data',
'decrypted_data',
'nonce',
'extension',
'extension_data',
'_rtpsize',
)
_hstruct = struct.Struct('>xxHII')
_ext_header = namedtuple("Extension", 'profile length values')
_ext_magic = b'\xbe\xde'
def __init__(self, data: bytes):
data = bytearray(data) # type: ignore
# fmt: off
self.version: int = data[0] >> 6
self.padding: bool = bool(data[0] & 0b00100000)
self.extended: bool = bool(data[0] & 0b00010000)
self.cc: int = data[0] & 0b00001111
self.marker: bool = bool(data[1] & 0b10000000)
self.payload: int = data[1] & 0b01111111
# fmt: on
sequence, timestamp, ssrc = self._hstruct.unpack_from(data)
self.sequence: int = sequence
self.timestamp: int = timestamp
self.ssrc: int = ssrc
self.csrcs: Tuple[int, ...] = ()
self.extension = None
self.extension_data: Dict[int, bytes] = {}
self.header = data[:12]
self.data = data[12:]
self.decrypted_data: Optional[bytes] = None
self.nonce: bytes = b''
self._rtpsize: bool = False
if self.cc:
fmt = '>%sI' % self.cc
offset = struct.calcsize(fmt) + 12
self.csrcs = struct.unpack(fmt, data[12:offset])
self.data = data[offset:]
# TODO?: impl padding calculations (though discord doesn't seem to use that bit)
def adjust_rtpsize(self):
"""Adjusts the packet header and data based on the rtpsize format."""
self._rtpsize = True
self.nonce = self.data[-4:]
if not self.extended:
self.data = self.data[:-4]
return
# rtpsize based formats are laid out similarly to SRTP packets, which includes the ext header now
# the nonce also needs to be removed from the end
self.header += self.data[:4]
self.data = self.data[4:-4]
def update_ext_headers(self, data: bytes) -> int:
"""Adds extended header data to this packet, returns payload offset"""
if not self.extended:
return 0
# rtpsize formats have the extension header in the rtp header instead of payload
if self._rtpsize:
data = self.header[-4:] + data
# data is the decrypted packet payload containing the extension header and opus data
profile, length = struct.unpack_from('>2sH', data)
if profile == self._ext_magic:
self._parse_bede_header(data, length)
values = struct.unpack('>%sI' % length, data[4 : 4 + length * 4])
self.extension = self._ext_header(profile, length, values)
offset = 4 + length * 4
if self._rtpsize:
# remove the extra offset from adding the header in
offset -= 4
return offset
# https://www.rfcreader.com/#rfc5285_line186
def _parse_bede_header(self, data: bytes, length: int) -> None:
offset = 4
n = 0
while n < length:
next_byte = data[offset : offset + 1]
if next_byte == b'\x00':
offset += 1
continue
header = struct.unpack('>B', next_byte)[0]
element_id = header >> 4
element_len = 1 + (header & 0b0000_1111)
self.extension_data[element_id] = data[offset + 1 : offset + 1 + element_len]
offset += 1 + element_len
n += 1
def _dump_info(self) -> str:
attrs = {name: getattr(self, name) for name in self.__slots__}
return ''.join(("<RTPPacket ", *['{}={}, '.format(n, v) for n, v in attrs.items()], '>'))
def __repr__(self) -> str:
return (
'<RTPPacket '
'ssrc={0.ssrc}, '
'sequence={0.sequence}, '
'timestamp={0.timestamp}, '
'size={1}, '
'ext={2}'
'>'.format(self, len(self.data), set(self.extension_data))
)
# http://www.rfcreader.com/#rfc3550_line855
class RTCPPacket:
__slots__ = ('version', 'padding', 'length')
_header = struct.Struct('>BBH')
_ssrc_fmt = struct.Struct('>I')
type = None
def __init__(self, data: bytes):
self.length: int
head, _, self.length = self._header.unpack_from(data)
self.version: int = head >> 6
self.padding: bool = bool(head & 0b00100000)
# dubious, yet devious
setattr(self, self.__slots__[0], head & 0b00011111)
def __repr__(self) -> str:
content = ', '.join("{}: {}".format(k, getattr(self, k, None)) for k in self.__slots__)
return "<{} {}>".format(self.__class__.__name__, content)
@classmethod
def from_data(cls, data: bytes) -> PacketTypes:
_, ptype, _ = cls._header.unpack_from(data)
return _rtcp_map[ptype](data)
# TODO?: consider moving repeated code to a ReportPacket type
# http://www.rfcreader.com/#rfc3550_line1614
class SenderReportPacket(RTCPPacket):
__slots__ = ('report_count', 'ssrc', 'info', 'reports', 'extension')
_info_fmt = struct.Struct('>5I')
_report_fmt = struct.Struct('>IB3x4I')
_24bit_int_fmt = struct.Struct('>4xI')
_info = namedtuple('RRSenderInfo', 'ntp_ts rtp_ts packet_count octet_count')
_report = namedtuple("RReport", 'ssrc perc_loss total_lost last_seq jitter lsr dlsr')
type = 200
def __init__(self, data):
super().__init__(data)
self.ssrc = self._ssrc_fmt.unpack_from(data, 4)[0]
self.info = self._read_sender_info(data, 8)
reports = []
for x in range(self.report_count):
offset = 28 + 24 * x
reports.append(self._read_report(data, offset))
self.reports = tuple(reports)
self.extension = None
if len(data) > 28 + 24 * self.report_count:
self.extension = data[28 + 24 * self.report_count :]
def _read_sender_info(self, data, offset):
nhigh, nlow, rtp_ts, pcount, ocount = self._info_fmt.unpack_from(data, offset)
ntotal = nhigh + _parse_low(nlow)
return self._info(ntotal, rtp_ts, pcount, ocount)
def _read_report(self, data, offset):
ssrc, flost, seq, jit, lsr, dlsr = self._report_fmt.unpack_from(data, offset)
clost = self._24bit_int_fmt.unpack_from(data, offset)[0] & 0xFFFFFF
return self._report(ssrc, flost, clost, seq, jit, lsr, dlsr)
# http://www.rfcreader.com/#rfc3550_line1879
class ReceiverReportPacket(RTCPPacket):
__slots__ = ('report_count', 'ssrc', 'reports', 'extension')
_report_fmt = struct.Struct('>IB3x4I')
_24bit_int_fmt = struct.Struct('>4xI')
_report = namedtuple("RReport", 'ssrc perc_loss total_lost last_seq jitter lsr dlsr')
type = 201
reports: Tuple[_report, ...]
def __init__(self, data: bytes):
super().__init__(data)
self.ssrc: int = self._ssrc_fmt.unpack_from(data, 4)[0]
reports = []
for x in range(self.report_count):
offset = 8 + 24 * x
reports.append(self._read_report(data, offset))
self.reports = tuple(reports)
self.extension: Optional[bytes] = None
if len(data) > 8 + 24 * self.report_count:
self.extension = data[8 + 24 * self.report_count :]
def _read_report(self, data: bytes, offset: int) -> _report:
ssrc, flost, seq, jit, lsr, dlsr = self._report_fmt.unpack_from(data, offset)
clost = self._24bit_int_fmt.unpack_from(data, offset)[0] & 0xFFFFFF
return self._report(ssrc, flost, clost, seq, jit, lsr, dlsr)
# UNFORTUNATELY it seems discord only uses the above ~~two packet types~~ packet type.
# Good thing I knew that when I made the rest of these. Haha yes.
# http://www.rfcreader.com/#rfc3550_line2024
class SDESPacket(RTCPPacket):
__slots__ = ('source_count', 'chunks', '_pos')
_item_header = struct.Struct('>BB')
_chunk = namedtuple("SDESChunk", 'ssrc items')
_item = namedtuple("SDESItem", 'type size length text')
type = 202
def __init__(self, data):
super().__init__(data)
_chunks = []
self._pos = 4
for _ in range(self.source_count):
_chunks.append(self._read_chunk(data))
self.chunks = tuple(_chunks)
def _read_chunk(self, data):
ssrc = self._ssrc_fmt.unpack_from(data, self._pos)[0]
self._pos += 4
# check for chunk with no items
if data[self._pos : self._pos + 4] == b'\x00\x00\x00\x00':
self._pos += 4
return self._chunk(ssrc, ())
items = [self._read_item(data)]
# Read items until END type is found
while items[-1].type != 0:
items.append(self._read_item(data))
# pad chunk to 4 bytes
if self._pos % 4:
self._pos = ceil(self._pos / 4) * 4
return self._chunk(ssrc, items)
def _read_item(self, data):
itype, ilen = self._item_header.unpack_from(data, self._pos)
self._pos += 2
text = None
if ilen:
text = data[self._pos : self._pos + ilen].decode()
self._pos += ilen
return self._item(itype, ilen + 2, ilen, text)
def _get_chunk_size(self, chunk):
return 4 + max(4, sum(i.size for i in chunk.items)) # + padding?
# http://www.rfcreader.com/#rfc3550_line2311
class BYEPacket(RTCPPacket):
__slots__ = ('source_count', 'ssrcs', 'reason')
type = 203
def __init__(self, data):
super().__init__(data)
self.ssrcs = struct.unpack_from('>%sI' % self.source_count, data, 4)
self.reason = None
body_length = 4 + len(self.ssrcs) * 4
if len(data) > body_length:
extra_len = struct.unpack_from('B', data, body_length)[0]
reason = struct.unpack_from('%ss' % extra_len, data, body_length + 1)
self.reason = reason.decode()
# http://www.rfcreader.com/#rfc3550_line2353
class APPPacket(RTCPPacket):
__slots__ = ('subtype', 'ssrc', 'name', 'data')
_packet_info = struct.Struct('>I4s')
type = 204
def __init__(self, data):
super().__init__(data)
self.ssrc, name = self._packet_info.unpack_from(data, 4)
self.name = name.decode('ascii')
self.data = data[12:] # should be a multiple of 32 bits but idc
_rtcp_map = {
200: SenderReportPacket,
201: ReceiverReportPacket,
202: SDESPacket,
203: BYEPacket,
204: APPPacket,
}
@@ -0,0 +1,152 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import time
import logging
import threading
from .opus import VoiceData
from .rtp import SilencePacket
from discord.utils import MISSING
from discord.opus import Decoder
from typing import TYPE_CHECKING, Tuple
if TYPE_CHECKING:
from typing import Callable, Any, Dict, Optional, Final, Union
from .rtp import AudioPacket
from .types import MemberOrUser as User
SilenceGenFN = Callable[[Optional[User], VoiceData], Any]
SSRCData = Tuple[float, Optional[User], AudioPacket]
log = logging.getLogger(__name__)
__all__ = [
'SilenceGenerator',
]
SILENCE_PCM: Final = b'\0' * Decoder.FRAME_SIZE
PACKET_INTERVAL: Final = 0.02
class SilenceGenerator(threading.Thread):
"""Generates and sends silence packets."""
def __init__(self, callback: SilenceGenFN, *, grace_period: float = 0.015):
super().__init__(daemon=True, name=f'silencegen-{id(self):x}')
self.callback: SilenceGenFN = callback
self.grace_period: float = grace_period
self._ssrc_data: Dict[int, SSRCData] = {} # {ssrc: (time, _, _)}
self._last_timestamp: Dict[int, int] = {} # {ssrc: timestamp}
self._user_map_backup: Dict[int, int] = {} # {id: ssrc}
self._end: threading.Event = threading.Event()
self._has_data: threading.Event = threading.Event()
self._lock: threading.Lock = threading.Lock()
def push(self, user: Optional[User], packet: AudioPacket) -> None:
"""Updates the last time a packet was received and from whom.
Calling this function will start generating silence packets for `packet.ssrc`
until `drop(ssrc)` or `stop()` is called.
"""
with self._lock:
self._ssrc_data[packet.ssrc] = (time.perf_counter(), user, packet)
self._last_timestamp[packet.ssrc] = packet.timestamp
if user:
self._user_map_backup[user.id] = packet.ssrc
self._has_data.set()
def _get_next_info(self) -> SSRCData:
return min(self._ssrc_data.values())
def drop(self, *, ssrc: Optional[int] = None, user: User = MISSING) -> None:
"""Stop generating silence packets for `ssrc`, or whatever is cached for `user`
if `ssrc` is None, if any.
"""
with self._lock:
if ssrc is None:
ssrc = self._user_map_backup.pop(user.id, None)
if ssrc is None:
return # weird but ok
self._last_timestamp.pop(ssrc, None)
last_data = self._ssrc_data.pop(ssrc, None)
if last_data is None and user is not MISSING:
ssrc = self._user_map_backup.pop(user.id)
self._ssrc_data.pop(ssrc, None)
if not self._ssrc_data:
self._has_data.clear()
def stop(self) -> None:
"""Stops generating silence for everything and clears the cache."""
self._end.set()
self._has_data.set()
with self._lock:
self._ssrc_data.clear()
self._user_map_backup.clear()
self._last_timestamp.clear()
self._has_data.clear()
self.join(1)
def start(self) -> None:
self._end.clear()
super().start()
def run(self) -> None:
try:
self._do_run()
except Exception as e:
log.exception("Error in %s", self)
def _do_run(self) -> None:
while not self._end.is_set():
self._has_data.wait()
if self._end.is_set():
return
with self._lock:
tlast, user, packet = self._get_next_info()
ssrc = packet.ssrc
# prepare the object before the sleep as a little micro optimization
next_packet = SilencePacket(
ssrc, self._last_timestamp.get(ssrc, packet.timestamp) + Decoder.SAMPLES_PER_FRAME
)
# TODO: check if destination wants opus or not
next_data = VoiceData(next_packet, user, pcm=SILENCE_PCM)
tnext = tlast + PACKET_INTERVAL
tnow = time.perf_counter()
# wait a little bit longer than when the next one should be
# so we don't have to race with the next packet
delay = tnext + self.grace_period - tnow
if delay > 0:
time.sleep(delay)
with self._lock:
tlast2, luser, lpacket = self._ssrc_data.get(ssrc, (-1, None, packet))
if next_packet.ssrc != lpacket.ssrc or tlast != tlast2 or self._end.is_set():
continue # another packet came in and bumped up the time
next_data.source = luser # is there any point in doing this?
self.callback(luser, next_data)
with self._lock:
# If there was no packet update during the sleep...
if tlast == tlast2 and ssrc in self._ssrc_data:
# update the existing packet time for the next window
self._ssrc_data[ssrc] = (tlast + PACKET_INTERVAL, user, packet)
self._last_timestamp[ssrc] += Decoder.SAMPLES_PER_FRAME
@@ -0,0 +1,634 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import io
import abc
import time
import wave
import shlex
import inspect
import audioop
import logging
import threading
import subprocess
from .opus import VoiceData
from .silence import SilenceGenerator
import discord
from discord.utils import MISSING, SequenceProxy
from discord.opus import Decoder as OpusDecoder
from typing import TYPE_CHECKING, overload
if TYPE_CHECKING:
from typing import Callable, Optional, Any, IO, Sequence, Tuple, Generator, Union, Dict, List
from .rtp import AudioPacket, RTCPPacket
from .voice_client import VoiceRecvClient
from .opus import VoiceData
from .types import MemberOrUser as User
BasicSinkWriteCB = Callable[[Optional[User], VoiceData], Any]
BasicSinkWriteRTCPCB = Callable[[RTCPPacket], Any]
ConditionalFilterFn = Callable[[Optional[User], VoiceData], bool]
FFmpegErrorCB = Callable[['FFmpegSink', Exception, Optional[VoiceData]], Any]
log = logging.getLogger(__name__)
__all__ = [
'AudioSink',
'MultiAudioSink',
'BasicSink',
'WaveSink',
'FFmpegSink',
'PCMVolumeTransformer',
'ConditionalFilter',
'TimedFilter',
'UserFilter',
'SilenceGeneratorSink',
]
# TODO: use this in more places
class VoiceRecvException(discord.DiscordException):
"""Generic exception for voice recv related errors"""
def __init__(self, message: str):
self.message: str = message
class SinkMeta(abc.ABCMeta):
__sink_listeners__: List[Tuple[str, str]]
def __new__(cls, name: str, bases: Tuple[type, ...], attrs: Dict[str, Any], **kwargs):
listeners: Dict[str, Any] = {}
new_cls = super().__new__(cls, name, bases, attrs, **kwargs)
for base in reversed(new_cls.__mro__):
for elem, value in base.__dict__.items():
# If it exists in a subclass, delete the higher level one
if elem in listeners:
del listeners[elem]
is_static_method = isinstance(value, staticmethod)
if is_static_method:
value = value.__func__
if not hasattr(value, '__sink_listener__'):
continue
listeners[elem] = value
listener_list = []
for listener in listeners.values():
for listener_name in listener.__sink_listener_names__:
listener_list.append((listener_name, listener.__name__))
new_cls.__sink_listeners__ = listener_list
return new_cls
class SinkABC(metaclass=SinkMeta):
__sink_listeners__: List[Tuple[str, str]]
@property
@abc.abstractmethod
def root(self) -> AudioSink:
raise NotImplementedError
@property
@abc.abstractmethod
def parent(self) -> Optional[AudioSink]:
raise NotImplementedError
@property
@abc.abstractmethod
def child(self) -> Optional[AudioSink]:
raise NotImplementedError
@property
@abc.abstractmethod
def children(self) -> Sequence[AudioSink]:
raise NotImplementedError
@property
@abc.abstractmethod
def voice_client(self) -> Optional[VoiceRecvClient]:
raise NotImplementedError
# handling opus vs pcm is not strictly mutually exclusive
# a sink could handle both but idk about that pattern
@abc.abstractmethod
def wants_opus(self) -> bool:
"""If sink handles opus data"""
raise NotImplementedError
@abc.abstractmethod
def write(self, user: Optional[User], data: VoiceData):
"""Callback for when the sink receives data"""
raise NotImplementedError
@abc.abstractmethod
def cleanup(self):
raise NotImplementedError
@abc.abstractmethod
def _register_child(self, child: AudioSink) -> None:
raise NotImplementedError
class AudioSink(SinkABC):
_voice_client: Optional[VoiceRecvClient]
_parent: Optional[AudioSink] = None
_child: Optional[AudioSink] = None
def __init__(self, destination: Optional[AudioSink] = None, /):
if destination is not None:
self._register_child(destination)
else:
self._child = None
def __del__(self):
self.cleanup()
def _register_child(self, child: AudioSink) -> None:
if child in self.root.walk_children():
raise RuntimeError('Sink is already registered.')
self._child = child
child._parent = self
@property
def root(self) -> AudioSink:
if self.parent is None:
return self
return self.parent.root
@property
def parent(self) -> Optional[AudioSink]:
return self._parent
@property
def child(self) -> Optional[AudioSink]:
return self._child
@property
def children(self) -> Sequence[AudioSink]:
return [self._child] if self._child else []
@property
def voice_client(self) -> Optional[VoiceRecvClient]:
"""Guaranteed to not be None inside write()"""
if self.parent is not None:
return self.parent.voice_client
else:
return self._voice_client
@property
def client(self) -> Optional[discord.Client]:
"""Guaranteed to not be None inside write()"""
return self.voice_client and self.voice_client.client
def walk_children(self, *, with_self: bool = False) -> Generator[AudioSink, None, None]:
"""Returns a generator of all the children of this sink, recursively, depth first."""
if with_self:
yield self
for child in self.children:
yield child
yield from child.walk_children()
@classmethod
def listener(cls, name: str = MISSING):
"""Marks a function as an event listener."""
if name is not MISSING and not isinstance(name, str):
raise TypeError(f'AudioSink.listener expected str but received {type(name).__name__} instead.')
def decorator(func):
actual = func
if isinstance(actual, staticmethod):
actual = actual.__func__
if inspect.iscoroutinefunction(actual):
raise TypeError('Listener function must not be a coroutine function.')
actual.__sink_listener__ = True
to_assign = name or actual.__name__
try:
actual.__sink_listener_names__.append(to_assign)
except AttributeError:
actual.__sink_listener_names__ = [to_assign]
return func
return decorator
class MultiAudioSink(AudioSink):
def __init__(self, destinations: Sequence[AudioSink], /):
# Intentionally not calling super().__init__ here
if destinations is not None:
for dest in destinations:
self._register_child(dest)
self._children: List[AudioSink] = list(destinations)
def _register_child(self, child: AudioSink) -> None:
if child in self.root.walk_children():
raise RuntimeError('Sink is already registered.')
child._parent = self
@property
def child(self) -> Optional[AudioSink]:
return self._children[0] if self._children else None
@property
def children(self) -> Sequence[AudioSink]:
return SequenceProxy(self._children)
# TODO: add functions to add/remove children?
class BasicSink(AudioSink):
"""Simple callback based sink."""
def __init__(
self,
event: BasicSinkWriteCB,
*,
rtcp_event: Optional[BasicSinkWriteRTCPCB] = None,
decode: bool = True,
):
super().__init__()
self.cb = event
self.cb_rtcp = rtcp_event
self.decode = decode
def wants_opus(self) -> bool:
return not self.decode
def write(self, user: Optional[User], data: VoiceData) -> None:
self.cb(user, data)
@AudioSink.listener()
def on_rtcp_packet(self, packet: RTCPPacket, guild: discord.Guild) -> None:
self.cb_rtcp(packet) if self.cb_rtcp else None
def cleanup(self) -> None:
pass
class WaveSink(AudioSink):
"""Endpoint AudioSink that generates a wav file.
Best used in conjunction with a silence generating sink. (TBD)
"""
CHANNELS = OpusDecoder.CHANNELS
SAMPLE_WIDTH = OpusDecoder.SAMPLE_SIZE // OpusDecoder.CHANNELS
SAMPLING_RATE = OpusDecoder.SAMPLING_RATE
def __init__(self, destination: wave._File):
super().__init__()
self._file: wave.Wave_write = wave.open(destination, 'wb')
self._file.setnchannels(self.CHANNELS)
self._file.setsampwidth(self.SAMPLE_WIDTH)
self._file.setframerate(self.SAMPLING_RATE)
def wants_opus(self) -> bool:
return False
def write(self, user: Optional[User], data: VoiceData) -> None:
self._file.writeframes(data.pcm)
def cleanup(self) -> None:
try:
self._file.close()
except Exception:
log.warning("WaveSink got error closing file on cleanup", exc_info=True)
WavSink = WaveSink
class FFmpegSink(AudioSink):
@overload
def __init__(
self,
*,
filename: str,
executable: str = 'ffmpeg',
stderr: Optional[IO[bytes]] = None,
before_options: Optional[str] = None,
options: Optional[str] = None,
on_error: Optional[FFmpegErrorCB] = None,
): ...
@overload
def __init__(
self,
*,
buffer: IO[bytes],
executable: str = 'ffmpeg',
stderr: Optional[IO[bytes]] = None,
before_options: Optional[str] = None,
options: Optional[str] = None,
on_error: Optional[FFmpegErrorCB] = None,
): ...
def __init__(
self,
*,
filename: str = MISSING,
buffer: IO[bytes] = MISSING,
executable: str = 'ffmpeg',
stderr: Optional[IO[bytes]] = None,
before_options: Optional[str] = None,
options: Optional[str] = None,
on_error: Optional[FFmpegErrorCB] = None,
):
super().__init__()
self.filename: str = filename or 'pipe:1'
self.buffer: IO[bytes] = buffer
self.on_error: FFmpegErrorCB = on_error or self._on_error
args = [executable, '-hide_banner']
subprocess_kwargs: Dict[str, Any] = {'stdin': subprocess.PIPE}
if self.buffer is not MISSING:
subprocess_kwargs['stdout'] = subprocess.PIPE
piping_stderr = False
if stderr is not None:
try:
stderr.fileno()
except Exception:
piping_stderr = True
subprocess_kwargs['stderr'] = subprocess.PIPE
if isinstance(before_options, str):
args.extend(shlex.split(before_options))
# fmt: off
args.extend((
'-f', 's16le',
'-ar', '48000',
'-ac', '2',
'-i', 'pipe:0',
'-loglevel', 'warning',
'-blocksize', str(discord.FFmpegAudio.BLOCKSIZE)
))
# fmt: on
if isinstance(options, str):
args.extend(shlex.split(options))
args.append(self.filename)
self._process: subprocess.Popen = MISSING
self._process = self._spawn_process(args, **subprocess_kwargs)
self._stdin: IO[bytes] = self._process.stdin # type: ignore
self._stdout: Optional[IO[bytes]] = None
self._stderr: Optional[IO[bytes]] = None
self._stdout_reader_thread: Optional[threading.Thread] = None
self._stderr_reader_thread: Optional[threading.Thread] = None
if self.buffer:
n = f'popen-stout-reader:pid-{self._process.pid}'
self._stdout = self._process.stdout
_args = (self._stdout, self.buffer)
self._stdout_reader_thread = threading.Thread(target=self._pipe_reader, args=_args, daemon=True, name=n)
self._stdout_reader_thread.start()
if piping_stderr:
n = f'popen-stderr-reader:pid-{self._process.pid}'
self._stderr = self._process.stderr
_args = (self._stderr, stderr)
self._stderr_reader_thread = threading.Thread(target=self._pipe_reader, args=_args, daemon=True, name=n)
self._stderr_reader_thread.start()
@staticmethod
def _on_error(_self: FFmpegSink, error: Exception, data: Optional[VoiceData]) -> None:
_self.voice_client.stop_listening() # type: ignore
def wants_opus(self) -> bool:
return False
def cleanup(self):
self._kill_process()
self._process = self._stdout = self._stdin = self._stderr = MISSING
def write(self, user: Optional[User], data: VoiceData):
if self._process and not self._stdin.closed:
audio = data.opus if self.wants_opus() else data.pcm
assert audio is not None
try:
self._stdin.write(audio)
except Exception as e:
log.exception('Error writing data to ffmpeg')
self._kill_process()
self.on_error(self, e, data)
def _spawn_process(self, args: Any, **subprocess_kwargs: Any) -> subprocess.Popen:
log.debug('Spawning ffmpeg process with command: %s, kwargs: %s', args, subprocess_kwargs)
process = None
try:
process = subprocess.Popen(args, creationflags=discord.player.CREATE_NO_WINDOW, **subprocess_kwargs)
except FileNotFoundError:
executable = args.partition(' ')[0] if isinstance(args, str) else args[0]
raise Exception(executable + ' was not found.') from None
except subprocess.SubprocessError as exc:
raise Exception(f'Popen failed: {exc.__class__.__name__}: {exc}') from exc
else:
return process
def _kill_process(self) -> None:
# this function gets called in __del__ so instance attributes might not even exist
proc: subprocess.Popen = getattr(self, '_process', MISSING)
if proc is MISSING:
return
log.debug('Terminating ffmpeg process %s.', proc.pid)
try:
self._stdin.close()
except Exception:
pass
# TODO: extract wait time
log.debug('Waiting for ffmpeg process %s for up to 5 seconds.', proc.pid)
try:
proc.wait(5)
except Exception:
pass
try:
proc.kill()
except Exception:
log.exception('Ignoring error attempting to kill ffmpeg process %s', proc.pid)
if proc.poll() is None:
log.info('ffmpeg process %s has not terminated. Waiting to terminate...', proc.pid)
proc.communicate()
log.info('ffmpeg process %s should have terminated with a return code of %s.', proc.pid, proc.returncode)
else:
log.info('ffmpeg process %s successfully terminated with return code of %s.', proc.pid, proc.returncode)
self._process = MISSING
def _pipe_reader(self, source: IO[bytes], dest: IO[bytes]) -> None:
while self._process:
if source.closed:
return
try:
data = source.read(discord.FFmpegAudio.BLOCKSIZE)
except (OSError, ValueError) as e:
log.debug('FFmpeg stdin pipe closed: %s', e)
return
except Exception:
log.debug('Read error for %s, this is probably not a problem', self, exc_info=True)
return
if data is None:
return
try:
dest.write(data)
except Exception as e:
log.exception('Write error for %s', self)
self._kill_process()
self.on_error(self, e, None)
return
class PCMVolumeTransformer(AudioSink):
"""AudioSink used to change the volume of PCM data, just like
:class:`discord.PCMVolumeTransformer`.
"""
def __init__(self, destination: AudioSink, volume: float = 1.0):
if not isinstance(destination, AudioSink):
raise TypeError(f'expected AudioSink not {type(destination).__name__}')
if destination.wants_opus():
raise VoiceRecvException('AudioSink must not request Opus encoding.')
super().__init__(destination)
self.destination: AudioSink = destination
self._volume: float = volume
def wants_opus(self) -> bool:
return False
@property
def volume(self) -> float:
"""Retrieves or sets the volume as a floating point percentage (e.g. 1.0 for 100%)."""
return self._volume
@volume.setter
def volume(self, value: float):
self._volume = max(value, 0.0)
def write(self, user: Optional[User], data: VoiceData) -> None:
data.pcm = audioop.mul(data.pcm, 2, min(self._volume, 2.0))
self.destination.write(user, data)
def cleanup(self) -> None:
pass
class ConditionalFilter(AudioSink):
"""AudioSink for filtering packets based on an arbitrary predicate function."""
def __init__(self, destination: AudioSink, predicate: ConditionalFilterFn):
super().__init__(destination)
self.destination: AudioSink = destination
self.predicate: ConditionalFilterFn = predicate
def wants_opus(self) -> bool:
return self.destination.wants_opus()
def write(self, user: Optional[User], data: VoiceData) -> None:
if self.predicate(user, data):
self.destination.write(user, data)
def cleanup(self) -> None:
del self.predicate
class UserFilter(ConditionalFilter):
"""A convenience class for a User based ConditionalFilter."""
def __init__(self, destination: AudioSink, user: User):
super().__init__(destination, self._predicate)
self.user: User = user
def _predicate(self, user: Optional[User], data: VoiceData) -> bool:
return user == self.user
class TimedFilter(ConditionalFilter):
"""A convenience class for a timed ConditionalFilter."""
def __init__(self, destination: AudioSink, duration: float, *, start_on_init: bool = False):
super().__init__(destination, self.predicate)
self.duration: float = duration
self.start_time: Optional[float]
if start_on_init:
self.start_time = self.get_time()
else:
self.start_time = None
self.write = self._write_once
def _write_once(self, user: Optional[User], data: VoiceData):
self.start_time = self.get_time()
super().write(user, data)
self.write = super().write
def predicate(self, user: Optional[User], data: VoiceData) -> bool:
return self.start_time is not None and self.get_time() - self.start_time < self.duration
def get_time(self) -> float:
"""Function to generate a timestamp. Defaults to `time.perf_counter()`.
Can be overridden.
"""
return time.perf_counter()
class SilenceGeneratorSink(AudioSink):
"""Generates intermittent silence packets during transmission downtime."""
def __init__(self, destination: AudioSink):
super().__init__(destination)
self.destination: AudioSink = destination
self.silencegen: SilenceGenerator = SilenceGenerator(self.destination.write)
self.silencegen.start()
def wants_opus(self) -> bool:
return self.destination.wants_opus()
def write(self, user: Optional[User], data: VoiceData) -> None:
self.silencegen.push(user, data.packet)
self.destination.write(user, data)
@AudioSink.listener()
def on_voice_member_disconnect(self, member: discord.Member, ssrc: Optional[int]) -> None:
self.silencegen.drop(ssrc=ssrc, user=member)
def cleanup(self) -> None:
self.silencegen.stop()
@@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING, List, Literal, Optional, TypedDict
from discord.types.snowflake import Snowflake
if TYPE_CHECKING:
from typing import Union
import discord
MemberOrUser = Union[discord.Member, discord.User]
ResolutionTypes = Literal['fixed', 'source']
StreamTypes = Literal['audio', 'video', 'screen', 'test'] # only video appears to be used
class VideoResolution(TypedDict):
height: int
width: int
type: ResolutionTypes
class VideoStream(TypedDict):
type: StreamTypes
active: bool
max_bitrate: int
max_framerate: int
max_resolution: VideoResolution
quality: int
rid: int
rtx_ssrc: int
ssrc: int
class VoiceVideoPayload(TypedDict):
audio_ssrc: int
video_ssrc: int
user_id: Snowflake
streams: list[VideoStream]
class VoiceClientConnectPayload(TypedDict):
user_ids: List[Snowflake]
class VoiceClientDisconnectPayload(TypedDict):
user_id: Snowflake
class VoiceFlagsPayload(TypedDict):
flags: Optional[int]
user_id: Snowflake
class VoicePlatformPayload(TypedDict):
platform: Optional[Literal[0, 1, 2, 3]]
user_id: Snowflake
@@ -0,0 +1,205 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import time
import threading
from collections import defaultdict
from typing import TYPE_CHECKING, Generic, TypeVar
if TYPE_CHECKING:
from typing import Callable, Sequence
TimeFunc = Callable[[], float]
_dataT = TypeVar("_dataT")
def gap_wrapped(a: int, b: int, *, wrap: int = 65536) -> int:
"""
Returns the gap between two numbers, acounting for unsigned integer wraparound.
"""
return (b - (a + 1) + wrap) % wrap
def add_wrapped(a: int, b: int, *, wrap: int = 65536) -> int:
"""
Returns the sum of two numbers, accounting for unsigned integer wraparound.
"""
return (a + b) % wrap
# May not even be needed if i dont use the dict subclasses
class Bidict(dict):
"""A bi-directional dict"""
_None = object()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
super().update({v: k for k, v in self.items()})
def __setitem__(self, key, value):
# Delete related mappings
# if we have 1 <-> 2 and we set 2 <-> 3, 2 is now unrelated to 1
if key in self:
del self[key]
if value in self:
del self[value]
super().__setitem__(key, value)
super().__setitem__(value, key)
def __delitem__(self, key):
value = super().__getitem__(key)
super().__delitem__(value)
if key == value:
return
super().__delitem__(key)
def to_dict(self):
return super().copy()
def pop(self, k, d=_None):
try:
v = super().pop(k)
super().pop(v, d)
return v
except KeyError:
if d is not self._None:
return d
raise
def popitem(self):
item = super().popitem()
super().__delitem__(item[1])
return item
def setdefault(self, k, d=None):
try:
return self[k]
except KeyError:
if d in self:
return d
self[k] = d
return d
def update(self, *args, **F):
try:
E = args[0]
if callable(getattr(E, 'keys', None)):
for k in E:
self[k] = E[k]
else:
for k, v in E:
self[k] = v
except IndexError:
pass
finally:
for k in F:
self[k] = F[k]
def copy(self):
return self.__class__(super().copy())
# incompatible
# https://docs.python.org/3/library/exceptions.html#NotImplementedError, Note 1
fromkeys = None # type: ignore
class Defaultdict(defaultdict):
def __missing__(self, key):
if self.default_factory is None:
raise KeyError((key,))
self[key] = value = self.default_factory(key) # type: ignore
return value
class LoopTimer:
def __init__(self, delay: float, *, timefunc: TimeFunc = time.perf_counter):
self._delay: float = delay
self._time: TimeFunc = timefunc
self._start: float = 0
self._loops: int = 0
@property
def delay(self) -> float:
return self._delay
@property
def loops(self) -> int:
return self._loops
@property
def start_time(self) -> float:
return self._start
@property
def remaining_time(self) -> float:
next_time = self._start + self._delay * self._loops
return self._delay + (next_time - self._time())
def start(self) -> None:
self._loops = 0
self._start = self._time()
def mark(self) -> None:
self._loops += 1
def sleep(self) -> None:
time.sleep(max(0, self.remaining_time))
class MultiDataEvent(Generic[_dataT]):
"""
Something like the inverse of a Condition. A 1-waiting-on-N type of object,
with accompanying data object for convenience.
"""
def __init__(self):
self._items: list[_dataT] = []
self._ready: threading.Event = threading.Event()
@property
def items(self) -> list[_dataT]:
"""A shallow copy of the currently ready objects."""
return self._items.copy()
def is_ready(self) -> bool:
return self._ready.is_set()
def _check_ready(self) -> None:
if self._items:
self._ready.set()
else:
self._ready.clear()
def notify(self) -> None:
self._ready.set()
self._check_ready()
def wait(self, timeout: float | None = None) -> bool:
self._check_ready()
return self._ready.wait(timeout)
def register(self, item: _dataT) -> None:
self._items.append(item)
self._ready.set()
def unregister(self, item: _dataT) -> None:
try:
self._items.remove(item)
except ValueError:
pass
self._check_ready()
def clear(self) -> None:
self._items.clear()
self._ready.clear()
@@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .types import (
VoiceVideoPayload,
VideoStream as VideoStreamPayload,
VideoResolution as VideoResolutionPayload,
)
from .voice_client import VoiceRecvClient
__all__ = [
'VoiceVideoStreams',
]
class VoiceVideoStreams:
__slots__ = (
'audio_ssrc',
'video_ssrc',
'member',
'streams',
)
def __init__(self, *, data: VoiceVideoPayload, vc: VoiceRecvClient):
self.audio_ssrc = data['audio_ssrc']
self.video_ssrc = data['video_ssrc']
self.member = vc.guild.get_member(int(data['user_id']))
self.streams = self._get_streams(data['streams'])
def __repr__(self) -> str:
return f"<VoiceVideoStreams member={self.member!s} streams={self._minify_streams()}>"
def _get_streams(self, data: list[VideoStreamPayload]) -> list[VideoStreamInfo]:
return [VideoStreamInfo(data=stream) for stream in data]
def _minify_streams(self) -> str:
streams = [f"<rid={s.rid} active={s.active}>" for s in self.streams]
return f"[{', '.join(streams)}]"
class VideoStreamInfo:
__slots__ = (
'type',
'active',
'max_bitrate',
'max_framerate',
'max_resolution',
'quality',
'rid',
'rtx_ssrc',
'ssrc',
)
def __init__(self, *, data: VideoStreamPayload):
self.type: str = data.get('type', 'video')
self.active = data['active']
self.max_bitrate = data.get('max_bitrate', 0)
self.max_framerate = data['max_framerate']
self.max_resolution = VideoStreamResolution(data['max_resolution'])
self.quality = data['quality']
self.rid = data['rid']
self.rtx_ssrc = data['rtx_ssrc']
self.ssrc = data['ssrc']
def __repr__(self) -> str:
attrs = [
('ssrc', self.ssrc),
('active', self.active),
('quality', self.quality),
('max_bitrate', self.max_bitrate),
('max_framerate', self.max_framerate),
('max_resolution', self.max_resolution),
]
inner = ' '.join('%s=%r' % t for t in attrs)
return f'<{self.__class__.__name__} {inner}>'
class VideoStreamResolution:
__slots__ = (
'height',
'width',
'type',
)
def __init__(self, data: VideoResolutionPayload):
self.height = data['height']
self.width = data['width']
self.type = data['type']
def __repr__(self) -> str:
return f"<VideoStreamResolution width={self.width!r} height={self.height!r} type={self.type!r}>"
@@ -0,0 +1,196 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import time
import asyncio
import logging
import discord
from discord.voice_state import VoiceConnectionState
from discord.utils import MISSING
from typing import TYPE_CHECKING
from .gateway import hook
from .reader import AudioReader
from .sinks import AudioSink
if TYPE_CHECKING:
from typing import Optional, Dict, Any, Union
from discord.ext.commands._types import CoroFunc
from .reader import AfterCB
from pprint import pformat
__all__ = [
'VoiceRecvClient',
]
log = logging.getLogger(__name__)
class VoiceRecvClient(discord.VoiceClient):
endpoint_ip: str
voice_port: int
def __init__(self, client: discord.Client, channel: discord.abc.Connectable):
super().__init__(client, channel)
self._reader: AudioReader = MISSING
self._ssrc_to_id: Dict[int, int] = {}
self._id_to_ssrc: Dict[int, int] = {}
self._event_listeners: Dict[str, list] = {}
def create_connection_state(self) -> VoiceConnectionState:
return VoiceConnectionState(self, hook=hook)
async def on_voice_state_update(self, data) -> None:
old_channel_id = self.channel.id if self.channel else None
await super().on_voice_state_update(data)
log.debug("Got voice_client VSU: \n%s", pformat(data, compact=True))
# this can be None
try:
channel_id = int(data['channel_id'])
except TypeError:
return
# if we joined, left, or switched channels, reset the decoders
if self._reader and channel_id != old_channel_id:
log.debug("Destroying all decoders in guild %s", self.guild.id)
self._reader.packet_router.destroy_all_decoders()
def add_listener(self, func: CoroFunc, *, name: str = MISSING) -> None:
name = func.__name__ if name is MISSING else name
if not asyncio.iscoroutinefunction(func):
raise TypeError('Listeners must be coroutines')
if name in self._event_listeners:
self._event_listeners[name].append(func)
else:
self._event_listeners[name] = [func]
def remove_listener(self, func: CoroFunc, *, name: str = MISSING) -> None:
name = func.__name__ if name is MISSING else name
if name in self._event_listeners:
try:
self._event_listeners[name].remove(func)
except ValueError:
pass
async def _run_event(self, coro: CoroFunc, event_name: str, *args: Any, **kwargs: Any) -> None:
try:
await coro(*args, **kwargs)
except asyncio.CancelledError:
pass
except Exception:
log.exception("Error calling %s", event_name)
def _schedule_event(self, coro: CoroFunc, event_name: str, *args: Any, **kwargs: Any) -> asyncio.Task:
wrapped = self._run_event(coro, event_name, *args, **kwargs)
return self.client.loop.create_task(wrapped, name=f"ext.voice_recv: {event_name}")
def dispatch(self, event: str, /, *args: Any, **kwargs: Any) -> None:
log.debug("Dispatching voice_client event %s", event)
event_name = f"on_{event}"
for coro in self._event_listeners.get(event_name, []):
self._schedule_event(coro, event_name, *args, **kwargs)
self.dispatch_sink(event, *args, **kwargs)
self.client.dispatch(event, *args, **kwargs)
def dispatch_sink(self, event: str, /, *args: Any, **kwargs: Any) -> None:
if self._reader:
self._reader.event_router.dispatch(event, *args, **kwargs)
def cleanup(self) -> None:
# TODO: Does the order here matter?
super().cleanup()
self._event_listeners.clear()
self.stop()
def _add_ssrc(self, user_id: int, ssrc: int) -> None:
self._ssrc_to_id[ssrc] = user_id
self._id_to_ssrc[user_id] = ssrc
if self._reader:
self._reader.packet_router.set_user_id(ssrc, user_id)
def _remove_ssrc(self, *, user_id: int) -> None:
ssrc = self._id_to_ssrc.pop(user_id, None)
if ssrc:
self._reader.speaking_timer.drop_ssrc(ssrc)
self._ssrc_to_id.pop(ssrc, None)
def _get_ssrc_from_id(self, user_id: int) -> Optional[int]:
return self._id_to_ssrc.get(user_id)
def _get_id_from_ssrc(self, ssrc: int) -> Optional[int]:
return self._ssrc_to_id.get(ssrc)
def listen(self, sink: AudioSink, *, after: Optional[AfterCB] = None) -> None:
"""Receives audio into a :class:`AudioSink`."""
# TODO: more info
if not self.is_connected():
raise discord.ClientException('Not connected to voice.')
if not isinstance(sink, AudioSink):
raise TypeError('sink must be an AudioSink not {0.__class__.__name__}'.format(sink))
if self.is_listening():
raise discord.ClientException('Already receiving audio.')
self._reader = AudioReader(sink, self, after=after)
self._reader.start()
def is_listening(self) -> bool:
"""Indicates if we're currently receiving audio."""
return self._reader and self._reader.is_listening()
def stop_listening(self) -> None:
"""Stops receiving audio."""
if self._reader:
self._reader.stop()
self._reader = MISSING
def stop_playing(self) -> None:
"""Stops playing audio."""
if self._player:
self._player.stop()
self._player = None
def stop(self) -> None:
"""Stops playing and receiving audio."""
self.stop_playing()
self.stop_listening()
@property
def sink(self) -> Optional[AudioSink]:
return self._reader.sink if self._reader else None
@sink.setter
def sink(self, sink: AudioSink) -> None:
if not isinstance(sink, AudioSink):
raise TypeError('expected AudioSink not {0.__class__.__name__}.'.format(sink))
if not self._reader:
raise ValueError('Not receiving anything.')
self._reader.set_sink(sink)
def get_speaking(self, member: Union[discord.Member, discord.User]) -> Optional[bool]:
"""Returns if a member is speaking (approximately), or None if not found."""
ssrc = self._get_ssrc_from_id(member.id)
if ssrc is None:
return
if self._reader:
return self._reader.speaking_timer.get_speaking(ssrc)
+47
View File
@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
import discord
from discord.ext import commands, voice_recv
discord.opus._load_default()
bot = commands.Bot(command_prefix=commands.when_mentioned, intents=discord.Intents.all())
class Testing(commands.Cog):
def __init__(self, bot):
self.bot = bot
@commands.command()
async def test(self, ctx):
def callback(user, data: voice_recv.VoiceData):
print(f"Got packet from {user}")
## voice power level, how loud the user is speaking
# ext_data = packet.extension_data.get(voice_recv.ExtensionID.audio_power)
# value = int.from_bytes(ext_data, 'big')
# power = 127-(value & 127)
# print('#' * int(power * (79/128)))
## instead of 79 you can use shutil.get_terminal_size().columns-1
vc = await ctx.author.voice.channel.connect(cls=voice_recv.VoiceRecvClient)
vc.listen(voice_recv.BasicSink(callback))
@commands.command()
async def stop(self, ctx):
await ctx.voice_client.disconnect()
@commands.command()
async def die(self, ctx):
ctx.voice_client.stop()
await ctx.bot.close()
@bot.event
async def on_ready():
print('Logged in as {0.id}/{0}'.format(bot.user))
print('------')
@bot.event
async def setup_hook():
await bot.add_cog(Testing(bot))
bot.run("token")
+27
View File
@@ -0,0 +1,27 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[tool.black]
line-length = 125
skip-string-normalization = true
[tool.isort]
profile = "black"
combine_as_imports = true
combine_star = true
line_length = 125
[tool.pyright]
include = [
"discord/ext/voice_recv",
]
exclude = [
"**/__pycache__",
"build",
"dist",
]
reportUnnecessaryTypeIgnoreComment = "warning"
# reportUnusedImport = "error"
pythonVersion = "3.8"
typeCheckingMode = "basic"
+1
View File
@@ -0,0 +1 @@
discord.py[voice]>=2.2.0
+70
View File
@@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
from setuptools import setup
import re
with open('discord/ext/voice_recv/__init__.py') as f:
version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]', f.read(), re.MULTILINE).group(1) # type: ignore
if not version:
raise RuntimeError('version is not set')
if version.endswith(('a', 'b', 'rc')):
# append version identifier based on commit count
try:
import subprocess
p = subprocess.Popen(['git', 'rev-list', '--count', 'HEAD'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if out:
version = version + out.decode('utf-8').strip()
except Exception:
pass
with open('README.md') as f:
readme = f.read()
extras_require = {
'extras_speech': [
'SpeechRecognition',
],
'extras_local': [
'pyaudio',
],
'extras': [
'SpeechRecognition',
'pyaudio',
],
}
setup(
name='discord-ext-voice_recv',
author='Imayhaveborkedit',
url='https://github.com/imayhaveborkedit/discord-ext-voice-recv',
version=version,
packages=['discord.ext.voice_recv', 'discord.ext.voice_recv.extras'],
license='MIT',
description='Experimental voice receive extension for discord.py',
long_description=readme,
long_description_content_type='text/markdown',
include_package_data=True,
python_requires='>=3.8',
install_requires=['discord.py[voice]>=2.5'],
extras_require=extras_require,
zip_safe=False,
classifiers=[
'Development Status :: 3 - Alpha',
'License :: OSI Approved :: MIT License',
'Intended Audience :: Developers',
'Natural Language :: English',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Operating System :: POSIX',
'Operating System :: Microsoft :: Windows',
'Operating System :: MacOS',
'Topic :: Multimedia :: Sound/Audio :: Capture/Recording',
],
)
+21
View File
@@ -0,0 +1,21 @@
# Update notes
Notably, not a changelog, just notes.
## 0.5.2
- Adds `extras.localplayback` module
- Adds info about the extras modules to the readme
- Adds `WavSink` as an alias to `WaveSink`
- Fixed a member cleanup error in SpeechRecognitionSink
- Changes the optional dependency format
- Previously it was a single optional dep, `extras`. Now there is a dependency per module, with `extras` installing all of them. See the readme for details.
## 0.5.1
- Fixes a build process related error
- Changes `voice_recv.extras` import semantics
- The `__all__` contents of the extras modules are no longer `*` imported into `voice_recv.extras` (this was only `extras.SpeechRecognitionSink`). You will have to access them directly, or import that specific extra module. Example:
```py
from discord.ext.voice_recv.extras.speechrecognition import SpeechRecognitionSink
# or
from discord.ext.voice_recv.extras import speechrecognition
sink = speechrecognition.SpeechRecognitionSink(...)
```