#!/usr/bin/env python3
"""Send an audio file to PersonaPlex and save the voice response."""
import argparse
import asyncio
import struct
import numpy as np
import opuslib
import soundfile as sf
import websockets
# ---------------------------------------------------------------------------
# Wire protocol tags — the first byte of every binary WebSocket message
# ---------------------------------------------------------------------------
# Server → Client: session ready
TAG_HANDSHAKE: int = 0x00
# Both directions: Opus audio in Ogg container
TAG_AUDIO: int = 0x01
# Both directions: UTF-8 text
TAG_TEXT: int = 0x02
# Client → Server: stream boundary (eos)
TAG_CONTROL: int = 0x03
# ---------------------------------------------------------------------------
# Ogg helpers — build a minimal Ogg/Opus bitstream from raw Opus frames
# ---------------------------------------------------------------------------
def _ogg_page(serial: int, granule: int, seq: int, bos: bool, eos: bool,
              *segments: bytes) -> bytes:
    """Build one Ogg page whose body holds the given complete packets.

    Each element of *segments* is one complete packet. Its length is written
    to the segment (lacing) table as a run of 255s followed by the remainder
    (0-254). So a packet of exactly 255 bytes is encoded as [255, 0] and a
    300-byte packet as [255, 45], as Ogg framing requires.

    Args:
        serial: Bitstream serial number.
        granule: Granule position stored in the page header.
        seq: Page sequence number.
        bos: Set the beginning-of-stream flag (0x02).
        eos: Set the end-of-stream flag (0x04).
        *segments: Complete packets to place on this page.

    Returns:
        The serialized page with its CRC filled in.

    Raises:
        ValueError: If the packets need more than 255 lacing values, which is
            the maximum a single page can hold.
    """
    lacing = bytearray()
    for packet in segments:
        full, rest = divmod(len(packet), 255)
        lacing.extend(b"\xff" * full)
        # The trailing value (< 255) marks the end of the packet; it is 0 when
        # the length is an exact multiple of 255.
        lacing.append(rest)
    if len(lacing) > 255:
        raise ValueError(
            f"packets need {len(lacing)} lacing values; an Ogg page holds at most 255"
        )

    body = b"".join(segments)
    header_type = (0x02 if bos else 0x00) | (0x04 if eos else 0x00)
    # Ogg page header (27 bytes + segment table + body); checksum patched below
    header = struct.pack(
        "<4sBBqIIIB",
        b"OggS",       # capture pattern
        0,             # version
        header_type,
        granule,
        serial,
        seq,
        0,             # checksum placeholder (bytes 22..25)
        len(lacing),
    )
    page = bytearray(header + lacing + body)
    # The CRC is computed over the whole page with the checksum field zeroed.
    struct.pack_into("<I", page, 22, _ogg_crc(page))
    return bytes(page)


def _build_ogg_crc_table() -> list:
    """Return the 256-entry lookup table for the Ogg page CRC-32.

    Ogg uses polynomial 0x04C11DB7, MSB-first (no bit reflection); it is not
    interchangeable with zlib.crc32.
    """
    table = []
    for i in range(256):
        r = i << 24
        for _ in range(8):
            if r & 0x80000000:
                r = ((r << 1) ^ 0x04C11DB7) & 0xFFFFFFFF
            else:
                r = (r << 1) & 0xFFFFFFFF
        table.append(r)
    return table


# CRC-32 lookup table used by the Ogg framing spec, built once at import time.
_OGG_CRC_TABLE = _build_ogg_crc_table()


def _ogg_crc(data: bytes) -> int:
    """Return the Ogg CRC-32 of *data* (zero initial value, no final XOR)."""
    table = _OGG_CRC_TABLE
    crc = 0
    for b in data:
        crc = ((crc << 8) & 0xFFFFFFFF) ^ table[(crc >> 24) ^ b]
    return crc
def encode_audio_to_ogg_opus(pcm: np.ndarray, sample_rate: int = 24000,
                             frame_ms: int = 20) -> bytes:
    """Encode PCM float mono audio into an Ogg/Opus bytestream.

    The audio is linearly resampled to 24 kHz if needed and clipped to
    [-1.0, 1.0]. It is then converted to 16-bit PCM and encoded frame by frame
    with opuslib (VOIP application). The result is wrapped in a minimal Ogg
    container: an OpusHead page, an OpusTags page, then one page per Opus
    packet. The last audio page carries the end-of-stream flag. The final
    partial frame is zero-padded. Empty input still yields one silent frame,
    so the stream always ends with an EOS page.

    Args:
        pcm: 1-D float audio samples.
        sample_rate: Sample rate of *pcm* in Hz.
        frame_ms: Opus frame duration in milliseconds; assumed to be a
            duration Opus accepts (e.g. 10, 20, 40 or 60).

    Returns:
        The complete Ogg/Opus stream as bytes.
    """
    target_rate = 24000

    # Resample to 24 kHz if needed (np.interp rejects empty inputs, so skip it then).
    if sample_rate != target_rate and len(pcm) > 0:
        from fractions import Fraction
        n_out = int(len(pcm) * Fraction(target_rate, sample_rate))
        positions = np.linspace(0, len(pcm) - 1, n_out)
        pcm = np.interp(positions, np.arange(len(pcm)), pcm).astype(np.float32)
    sample_rate = target_rate

    encoder = opuslib.Encoder(sample_rate, 1, opuslib.APPLICATION_VOIP)
    frame_size = sample_rate * frame_ms // 1000  # samples per frame at sample_rate
    # Ogg Opus granule positions always count 48 kHz samples (RFC 7845),
    # independent of the encoder's sample rate.
    granule_step = 48000 * frame_ms // 1000
    serial = 1
    pre_skip = 312  # in 48 kHz samples, as stored in OpusHead
    seq = 0
    pages = []

    # --- BOS page: OpusHead ---
    # _ogg_page takes bos/eos positionally because the packets follow as *segments.
    opus_head = struct.pack("<8sBBHIhB", b"OpusHead", 1, 1, pre_skip, sample_rate, 0, 0)
    pages.append(_ogg_page(serial, 0, seq, True, False, opus_head))
    seq += 1

    # --- Comment page: OpusTags ---
    vendor = b"PolarGrid"
    opus_tags = struct.pack("<8sI", b"OpusTags", len(vendor)) + vendor + struct.pack("<I", 0)
    pages.append(_ogg_page(serial, 0, seq, False, False, opus_tags))
    seq += 1

    # --- Audio pages (one Opus packet per page) ---
    pcm_i16 = (np.clip(pcm, -1.0, 1.0) * 32767).astype(np.int16)
    n_frames = max(1, -(-len(pcm_i16) // frame_size))  # ceil division, at least one frame
    padded = np.zeros(n_frames * frame_size, dtype=np.int16)
    padded[: len(pcm_i16)] = pcm_i16

    granule = 0
    for index in range(n_frames):
        start = index * frame_size
        opus_frame = encoder.encode(padded[start : start + frame_size].tobytes(), frame_size)
        granule += granule_step
        is_last = index == n_frames - 1
        pages.append(_ogg_page(serial, granule, seq, False, is_last, opus_frame))
        seq += 1

    return b"".join(pages)
# ---------------------------------------------------------------------------
# Main client
# ---------------------------------------------------------------------------
async def run(api_key: str, audio_path: str, region: str, voice: str,
              persona: str, output_path: str):
    """Send *audio_path* to PersonaPlex and save the voice response.

    Steps:
      1. Read the audio file, keep its first channel and encode it as Ogg/Opus.
      2. Connect to the regional PersonaPlex WebSocket endpoint and wait for
         the TAG_HANDSHAKE message.
      3. Send the encoded audio in 4096-byte chunks, each prefixed with
         TAG_AUDIO, followed by a TAG_CONTROL "eos" message.
      4. Collect TAG_AUDIO payloads and TAG_TEXT transcript fragments until the
         connection ends. Then write the audio to *output_path* and print the
         transcript.

    Args:
        api_key: API key sent as the ``token`` query parameter.
        audio_path: Path of the input audio file (anything soundfile reads).
        region: Edge region inserted into the host name.
        voice: Voice ID query parameter.
        persona: Persona prompt query parameter.
        output_path: File that receives the concatenated response audio.

    Raises:
        RuntimeError: If the first server message is not a handshake.
    """
    from urllib.parse import quote, urlencode

    # Load and prepare audio
    pcm, sr = sf.read(audio_path, dtype="float32", always_2d=True)
    pcm = pcm[:, 0]  # mono: keep the first channel
    print(f"Loaded {audio_path}: {len(pcm)/sr:.1f}s at {sr} Hz")
    ogg_data = encode_audio_to_ogg_opus(pcm, sample_rate=sr)
    print(f"Encoded to Ogg/Opus: {len(ogg_data)} bytes")

    # Build WebSocket URL. Every query value is percent-encoded (spaces as %20)
    # so characters like '&' or '#' in the voice ID or key cannot break the query.
    # NOTE(review): the key travels in the query string and may therefore show
    # up in proxy/server logs — confirm the service offers no header-based auth.
    query = urlencode(
        {"voice": voice, "persona": persona, "token": api_key},
        quote_via=quote,
    )
    url = f"wss://api.{region}.edge.polargrid.ai/v1/voice/personaplex?{query}"

    response_audio = bytearray()
    transcript_parts = []

    # Connect — disable ping to avoid teardown (moshi does not pong)
    async with websockets.connect(url, ping_interval=None,
                                  max_size=None) as ws:
        # Step 1: Wait for the handshake (0x00)
        print("Waiting for handshake...")
        msg = await ws.recv()
        # `msg` must be non-empty bytes; msg[0] on b"" would raise IndexError.
        if isinstance(msg, bytes) and msg and msg[0] == TAG_HANDSHAKE:
            print("Handshake received — session is ready")
        else:
            raise RuntimeError(f"Expected handshake, got: {msg[:20]!r}")

        # Step 2: Send audio frames with 0x01 tag prefix
        chunk_size = 4096
        for i in range(0, len(ogg_data), chunk_size):
            await ws.send(bytes([TAG_AUDIO]) + ogg_data[i : i + chunk_size])
        print(f"Sent {len(ogg_data)} bytes of audio")

        # Step 3: Send end-of-stream control frame
        await ws.send(bytes([TAG_CONTROL]) + b"eos")
        print("Sent eos — waiting for response...")

        # Step 4: Receive response audio and transcript
        try:
            async for msg in ws:
                if not isinstance(msg, bytes) or not msg:
                    continue
                tag, payload = msg[0], msg[1:]
                if tag == TAG_AUDIO:
                    response_audio.extend(payload)
                elif tag == TAG_TEXT:
                    # Replace undecodable bytes rather than abort the session
                    # and lose the audio received so far.
                    text = payload.decode("utf-8", errors="replace")
                    transcript_parts.append(text)
                    print(f"  transcript: {text}")
        except websockets.exceptions.ConnectionClosed as exc:
            # Presumably an abnormal close; report it but keep what was received.
            print(f"Connection closed: {exc}")

    # Save response audio (assumed to be an Ogg/Opus stream from the server —
    # playable with ffplay, mpv, VLC)
    if response_audio:
        with open(output_path, "wb") as f:
            f.write(response_audio)
        print(f"\nSaved response audio to {output_path} ({len(response_audio)} bytes)")
    else:
        print("\nNo response audio received")
    if transcript_parts:
        print(f"Full transcript: {''.join(transcript_parts)}")
if __name__ == "__main__":
    # Command-line entry point: parse the options and drive one client session.
    cli = argparse.ArgumentParser(description="PersonaPlex client example")
    # (flag, add_argument keyword options) — registered in this order so the
    # --help output lists them the same way.
    option_specs = (
        ("--api-key", {"required": True, "help": "Your pg_* API key"}),
        ("--audio", {"required": True, "help": "Path to input audio file (WAV)"}),
        ("--region", {"default": "yto-01", "help": "Edge region (default: yto-01)"}),
        ("--voice", {"default": "NATF0", "help": "Voice ID (default: NATF0)"}),
        ("--persona", {"default": "A helpful voice assistant.",
                       "help": "Persona prompt"}),
        ("--output", {"default": "response.ogg", "help": "Output file path"}),
    )
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    opts = cli.parse_args()
    asyncio.run(
        run(
            api_key=opts.api_key,
            audio_path=opts.audio,
            region=opts.region,
            voice=opts.voice,
            persona=opts.persona,
            output_path=opts.output,
        )
    )