
RecordingSink

```python
from voxtra.recording import (
    RecordingSink,
    RecordingMetadata,
    LocalFileSink,
    WebhookSink,
    CompositeSink,
)

# Also re-exported from the top-level package:
from voxtra import RecordingSink, WebhookSink
```

Pluggable destinations for finished call recordings. Voxtra writes recordings via Asterisk; what’s pluggable is what happens after the recording stops.

RecordingMetadata

```python
@dataclass
class RecordingMetadata:
    session_id: str
    name: str
    file_path: str = ""
    duration_seconds: float | None = None
    format: str = "wav"
    extra: dict[str, Any] = field(default_factory=dict)
```

| Field | Notes |
|---|---|
| `session_id` | The owning `CallSession.id`. |
| `name` | Recording name passed to `record_start`. |
| `file_path` | Best-effort guess: `/var/spool/asterisk/recording/{name}.{format}`. |
| `duration_seconds` | Wall-clock seconds between start and stop (`None` if unknown). |
| `format` | Audio format (e.g. `wav`, `gsm`). |
| `extra` | Arbitrary key-value extension data. |

RecordingSink (ABC)

```python
class RecordingSink(ABC):
    @abstractmethod
    async def on_recording_complete(self, metadata: RecordingMetadata) -> None: ...
```

Implementations should be idempotent — sinks may be invoked more than once for the same recording during reconnects or retries.

Implementations should never raise into the call pipeline. The framework wraps every call in a try/except as a safety net, but well-behaved sinks catch their own errors and log them.

LocalFileSink

```python
LocalFileSink(base_dir: str | Path = "/var/spool/asterisk/recording")
```

Default no-op sink: it logs a debug line and returns. Use it as a placeholder when recording is enabled but you don't yet have an upload path.

```python
from voxtra.recording import LocalFileSink

app = VoxtraApp(..., recording_sink=LocalFileSink())
```

WebhookSink

```python
WebhookSink(
    url: str,
    *,
    signing_secret: str = "",
    timeout_seconds: float = 10.0,
    http_client: httpx.AsyncClient | None = None,
)
```

POSTs the recording metadata as JSON to url. The receiver fetches the audio (via Asterisk’s HTTP recording endpoint, scp, or other means) and uploads it to long-term storage.

```python
from voxtra.recording import WebhookSink

sink = WebhookSink(
    "https://api.example.com/webhooks/recording",
    signing_secret="please-rotate-me",
)
```

Payload

```json
{
  "session_id": "ch-1234",
  "name": "voxtra-rec-...",
  "file_path": "/var/spool/asterisk/recording/voxtra-rec-....wav",
  "duration_seconds": 42.3,
  "format": "wav",
  "extra": {}
}
```

Headers

| Header | Value |
|---|---|
| `Content-Type` | `application/json` |
| `X-Voxtra-Signature` | `hmac_sha256(secret, body)` (when secret is set) |
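On the receiving side, the signature can be checked by recomputing the HMAC over the raw request body and comparing in constant time. The sketch below uses only the standard library. It assumes the header carries a hex-encoded digest with no prefix. The table above only says `hmac_sha256(secret, body)`, so confirm the actual encoding against a real request before relying on this. `sign` and `verify_signature` are hypothetical helper names.

```python
import hashlib
import hmac


def sign(secret: str, body: bytes) -> str:
    """HMAC-SHA256 of the raw body, hex-encoded (the encoding is an assumption)."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def verify_signature(secret: str, body: bytes, header_value: str) -> bool:
    """Return True if header_value matches the signature of body."""
    expected = sign(secret, body).encode("ascii")
    received = header_value.strip().lower().encode("utf-8")
    # compare_digest avoids leaking the match position through timing.
    return hmac.compare_digest(expected, received)
```

Verify against the exact bytes received, before any JSON parsing: re-serializing the parsed payload can change whitespace or key order and invalidate the signature.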

await sink.aclose()

Closes the underlying HTTP client, but only if the sink owns it. A client passed in through `http_client` is left for the caller to close.

CompositeSink

```python
CompositeSink(*sinks: RecordingSink)
```

Fan a single recording event out to multiple sinks. Per-sink errors are isolated, so a slow or broken sink can’t starve the others.

```python
from voxtra.recording import CompositeSink, WebhookSink, LocalFileSink

sink = CompositeSink(
    LocalFileSink(),
    WebhookSink("https://crm.example.com/recordings"),
    WebhookSink("https://archive.example.com/recordings"),
)
```
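To make the isolation guarantee concrete, here is one way such a fan-out could be written with `asyncio.gather`. This is an illustrative sketch, not Voxtra's actual `CompositeSink` implementation. `fan_out` is a hypothetical function, and it accepts any object that has an async `on_recording_complete` method.

```python
import asyncio
import logging
from typing import Any, Sequence

logger = logging.getLogger("recording.fanout")


async def fan_out(sinks: Sequence[Any], metadata: Any) -> list[bool]:
    """Deliver metadata to every sink concurrently; return per-sink success."""
    # Running the sinks concurrently keeps a slow one from delaying the rest;
    # return_exceptions=True keeps one failure from cancelling the others.
    results = await asyncio.gather(
        *(sink.on_recording_complete(metadata) for sink in sinks),
        return_exceptions=True,
    )
    outcomes = []
    for sink, result in zip(sinks, results):
        failed = isinstance(result, BaseException)
        if failed:
            logger.error("sink %s failed: %r", type(sink).__name__, result)
        outcomes.append(not failed)
    return outcomes
```

Concurrency alone does not bound how long the whole fan-out takes. If a sink can hang indefinitely, wrapping each call in `asyncio.wait_for` with a timeout is a common addition.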

Custom sinks

Any class that implements on_recording_complete(metadata) works:

```python
from voxtra.recording import RecordingSink, RecordingMetadata


class MySink(RecordingSink):
    async def on_recording_complete(self, metadata: RecordingMetadata) -> None:
        # Upload to your storage, kick off transcription, ...
        ...
```

For a working GCS implementation, see the Recording guide.

Wiring

App-level default

```python
app = VoxtraApp(..., recording_sink=my_sink)
```

Every session inherits this sink. Set it on the app, rather than per session, when you want all recordings to follow the same pipeline.

Per-call override

```python
@app.default()
async def handle(call):
    await call.answer()
    await call.record_start(sink=PrioritySink())  # overrides the app default
    ...
    await call.record_stop()
```

Lifecycle

| Phase | Sink behaviour |
|---|---|
| `record_start()` | Sink is captured but not invoked. |
| Mid-call | Sink is dormant. |
| `record_stop()` | Sink is invoked once with `RecordingMetadata`. |
| Hangup mid-recording | If recording is still active, the framework calls `record_stop` (and the sink) as part of cleanup. |
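The table can be read as a small state machine. The toy model below reproduces it so the behaviour can be checked in isolation. `ToyCall` is emphatically not Voxtra's `CallSession`, and `CollectingSink` is a hypothetical in-memory sink of the kind that is handy in unit tests. Both use a plain dict in place of `RecordingMetadata` to stay self-contained.

```python
import asyncio


class CollectingSink:
    """In-memory sink for tests: remembers everything it receives."""

    def __init__(self) -> None:
        self.received: list[dict] = []

    async def on_recording_complete(self, metadata: dict) -> None:
        self.received.append(metadata)


class ToyCall:
    """Toy model of the lifecycle table; NOT the real call session."""

    def __init__(self) -> None:
        self._sink = None
        self._name = None

    async def record_start(self, name: str, sink: CollectingSink) -> None:
        # The sink is captured here but not invoked.
        self._name, self._sink = name, sink

    async def record_stop(self) -> None:
        if self._sink is None:
            return  # no active recording, nothing to deliver
        sink, name = self._sink, self._name
        # Clear state first so a second stop cannot invoke the sink again.
        self._sink = self._name = None
        await sink.on_recording_complete({"name": name})

    async def hangup(self) -> None:
        # Cleanup stops any recording that is still active.
        await self.record_stop()
```

In this model, a hangup followed by an explicit `record_stop()` still delivers exactly one event, which is the behaviour the last two rows describe.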