RecordingSink
from voxtra.recording import (
RecordingSink,
RecordingMetadata,
LocalFileSink,
WebhookSink,
CompositeSink,
)
# Also re-exported from the top-level package:
from voxtra import RecordingSink, WebhookSinkPluggable destinations for finished call recordings. Voxtra writes recordings via Asterisk; what’s pluggable is what happens after the recording stops.
RecordingMetadata
@dataclass
class RecordingMetadata:
session_id: str
name: str
file_path: str = ""
duration_seconds: float | None = None
format: str = "wav"
extra: dict[str, Any] = field(default_factory=dict)| Field | Notes |
|---|---|
session_id | The owning CallSession.id. |
name | Recording name passed to record_start. |
file_path | Best-effort guess — /var/spool/asterisk/recording/{name}.{format}. |
duration_seconds | Wall-clock seconds between start and stop (None if unknown). |
format | Audio format (e.g. wav, gsm). |
extra | Arbitrary key-value extension data. |
RecordingSink (ABC)
class RecordingSink(ABC):
@abstractmethod
async def on_recording_complete(self, metadata: RecordingMetadata) -> None:
...Implementations should be idempotent — sinks may be invoked more than once for the same recording during reconnects or retries.
Implementations should never raise into the call pipeline. The framework wraps every call in a try/except as a safety net, but well-behaved sinks catch their own errors and log them.
LocalFileSink
LocalFileSink(base_dir: str | Path = "/var/spool/asterisk/recording")Default no-op sink — logs a debug line and exits. Use as a placeholder when recording is on but you don’t yet have an upload path.
from voxtra.recording import LocalFileSink
app = VoxtraApp(..., recording_sink=LocalFileSink())WebhookSink
WebhookSink(
url: str,
*,
signing_secret: str = "",
timeout_seconds: float = 10.0,
http_client: httpx.AsyncClient | None = None,
)POSTs the recording metadata as JSON to url. The receiver fetches
the audio (via Asterisk’s HTTP recording endpoint, scp, or other
means) and uploads it to long-term storage.
from voxtra.recording import WebhookSink
sink = WebhookSink(
"https://api.example.com/webhooks/recording",
signing_secret="please-rotate-me",
)Payload
{
"session_id": "ch-1234",
"name": "voxtra-rec-...",
"file_path": "/var/spool/asterisk/recording/voxtra-rec-....wav",
"duration_seconds": 42.3,
"format": "wav",
"extra": {}
}Headers
| Header | Value |
|---|---|
Content-Type | application/json |
X-Voxtra-Signature | hmac_sha256(secret, body) (when secret set) |
await sink.aclose()
Close the underlying HTTP client (only if owned).
CompositeSink
CompositeSink(*sinks: RecordingSink)Fan a single recording event out to multiple sinks. Per-sink errors are isolated, so a slow or broken sink can’t starve the others.
from voxtra.recording import CompositeSink, WebhookSink, LocalFileSink
sink = CompositeSink(
LocalFileSink(),
WebhookSink("https://crm.example.com/recordings"),
WebhookSink("https://archive.example.com/recordings"),
)Custom sinks
Any class that implements on_recording_complete(metadata) works:
from voxtra.recording import RecordingSink, RecordingMetadata
class MySink(RecordingSink):
async def on_recording_complete(self, metadata: RecordingMetadata) -> None:
# Upload to your storage, kick off transcription, ...
...For a working GCS implementation, see the Recording guide.
Wiring
App-level default
app = VoxtraApp(..., recording_sink=my_sink)Every session inherits this sink. Set on the app, not per-session, when you want all recordings to follow the same pipeline.
Per-call override
@app.default()
async def handle(call):
await call.answer()
await call.record_start(sink=PrioritySink()) # overrides the app default
...
await call.record_stop()Lifecycle
| Phase | Sink behaviour |
|---|---|
record_start() | Sink is captured but not invoked. |
| Mid-call | Sink is dormant. |
record_stop() | Sink is invoked once with RecordingMetadata. |
| Hangup mid-recording | If recording is still active, framework calls record_stop (and the sink) as part of cleanup. |