from __future__ import annotations import logging import wave from pathlib import Path from typing import Callable from discord.opus import Decoder, OpusError from discord.ext import voice_recv log = logging.getLogger(__name__) class OpusWaveSink(voice_recv.AudioSink): """Decode raw opus packets per-SSRC and write PCM to a wav file.""" def __init__(self, output_path: str): super().__init__() self.output_path = output_path self._file: wave.Wave_write = wave.open(output_path, "wb") self._file.setnchannels(Decoder.CHANNELS) self._file.setsampwidth(Decoder.SAMPLE_SIZE // Decoder.CHANNELS) self._file.setframerate(Decoder.SAMPLING_RATE) self._decoders: dict[int, Decoder] = {} self._error_counts: dict[int, int] = {} def wants_opus(self) -> bool: return True def write(self, user, data) -> None: packet = data.packet ssrc = getattr(packet, "ssrc", 0) decoder = self._decoders.setdefault(ssrc, Decoder()) try: if packet: opus_bytes = data.opus if not opus_bytes: return pcm = decoder.decode(opus_bytes, fec=False) else: pcm = decoder.decode(None, fec=False) except OpusError as exc: count = self._error_counts.get(ssrc, 0) + 1 self._error_counts[ssrc] = count if count in {1, 10, 100}: log.warning( "Dropping corrupted opus packet for ssrc %s: %s (count=%s)", ssrc, exc, count, ) return self._file.writeframes(pcm) def cleanup(self) -> None: try: self._file.close() except Exception: log.warning("OpusWaveSink got error closing file on cleanup", exc_info=True) class MeetingRecorder: """Wrapper around discord-ext-voice-recv's listen/stop_listening API.""" def __init__(self, voice_client: voice_recv.VoiceRecvClient, output_path: str): self.vc = voice_client self.output_path = output_path self.recording = False self.sink: OpusWaveSink | None = None async def start(self, after_callback: Callable[[Exception | None], None]) -> None: if self.vc.is_listening(): raise RuntimeError("Voice client is already listening") Path(self.output_path).parent.mkdir(parents=True, exist_ok=True) self.sink = OpusWaveSink(self.output_path) self.vc.listen(self.sink, after=after_callback) self.recording = True async def stop(self) -> None: if not self.recording: return if self.vc.is_listening(): self.vc.stop_listening() self.recording = False