Twilio Streamingデータを用いたユーザー発話へのリアルタイム音声処理

はじめに

こんにちは、AIチームの杉山です。
前回の記事では、インターネット経由で電話による通信を行うことのできるTwilio社のサービスでの録音データを用いたユーザー発話への音声処理の方法を紹介しましたが、今回は通話中のユーザーの発話に対しリアルタイムに音声認識などの処理を行うためのデータハンドリングについて紹介します。

Twilioでの音声ストリーミングデータの取得

TwilioはWeb API経由で電話の発着信などを行うことのできるサービスです。
電話着信時やSMSの送信などTwilioのサービスとやり取りを行うには、TwiML(Twilio Markup Language)という専用のMarkup Languageを使用しますが、その要素である<Stream>を設定することで通話からWebSocket経由でリアルタイムに音声ストリームを受信することができます。
また、今回はTwilioから一方的にデータを取得するのではなく双方向ストリーミングにすることとします。その場合、通話開始時に通常用いられる<Start>ではなく<Connect>要素でのストリーミングの設定が必要になります。それぞれの要素はXML形式で指定することもできますが、Twilio SDKでコードからも指定することができます。

なお今回の記事でも前回の記事同様Python(+FastAPI)+Twilio clientでコード例を示します。

まず、Twilioに着信があった際のWebHookを指定します。
WebHookで設定したURLへのリクエストを受け付けるエンドポイントを作成し、その中で上記の<Stream><Connect>の設定を行います。その際、WebSocket通信のためのエンドポイントの指定が必要になるため、以降でそちらも作成します。

from twilio.twiml.voice_response import Connect, Stream, VoiceResponse

@router.post("YOUR_WEBHOOK_ENDPOINT")
async def request_webhook():
    response = VoiceResponse()
    connect = Connect()
    stream = Stream(url="wss://YOUR_WSS_SERVER")

    connect.append(stream)
    response.append(connect)

    return Response(
        content=str(response),
        status_code=200,
        headers={"Content-Type": "text/html"},
    )

以降、通話による通信データはStream()に指定したURLにWebSocket経由で流れてきます。
受信したデータはConnected, Start, Media, Stop, (Mark)というeventを持ちます。
Connectedでは繋がったという情報、その直後にStartで通信のメタデータに関する情報が得られるので、必要であればそれぞれの情報で処理を分岐するなどができ、実際の音声データはMedia eventのpayloadとして取得できます。

from starlette.websockets import WebSocket, WebSocketState

@router.websocket("YOUR_WSS_SERVER_URL")
async def receive_websocket(ws: WebSocket) -> None:
    await ws.accept()

    # 初回はdata["event"] == "connected"
    data = await ws.receive_json()

    # その直後にdata["event"] == "start"
    data = await ws.receive_json()

    while ws.application_state == WebSocketState.CONNECTED:
        # 以降、WebSocketが繋がっている間は基本的にdata["event"] == "media"
        data = await ws.receive_json()
        raw_audio = data["media"]["payload"]

        # stop eventを受信した場合は接続を閉じる
        if data["event"] == "stop":
            break

取得した音声はBase64エンコーディングされているため、必要に応じてデコードして使用します。

import base64 

wave = bytes(base64.b64decode(raw_audio))

これでストリーミングで流れてくる音声データを取得し、任意の音声処理を適用することができるようになりました。例えば、自作のストリーミング発話区間認識モデルを適用したり、並行してキューなどに溜めていた音声から発話区間の音声に対し終話判定を行ったタイミングで任意のstaticな音声認識モデルをセミリアルタイムに適用する、といったことなどができるようになりました。

双方向ストリーミングでのTwilioへのデータ送信

タイトルからは少し脱線しますが、双方向ストリーミングの場合、Media, Mark, Clearの3eventをアプリケーション側からTwilioに与えることができます。アプリケーション側で合成した応答メッセージの音声を送る際は、音声を受け取るときと同様Media eventのpayloadに設定することで実現できます。その際、音声は電話の形式に合わせてaudio/x-mulaw形式、サンプリングレート=8000、base64エンコードする必要があります。
音声の送信が完了すると音声が再生されます。また、Clear eventを送ることでその再生をキャンセルすることができます。各種event送信後に任意の名前を付けたMark eventを送信することで、各種event終了後にその名前のついたMark eventを受信できるので、これにより通話を続けるかどうかなどの状態を管理することができます。

import audioop
import json

fragment = "YOUR_AUDIO_DATA" # numpy.frombufferなどで読み込んだ音声配列
width = "サンプル幅"

ulaw_data = audioop.lin2ulaw(fragment, width)
payload = base64.b64encode(ulaw_data).decode("ascii")

out_media = json.dumps({
    "event": "media",
    "media": {"payload": payload},
    "streamSid": STREAMING_SID,
})
await ws.send_text(out_media)

out_mark = json.dumps(
    {
        "event": "mark",
        "streamSid": STREAMING_SID,
        "mark": {"name": "任意の文字列"},
    }
)
await ws.send_text(out_mark)

終わりに

今回の記事では、Twilioとの双方向ストリーミングの中で流れてくるユーザー音声の扱い方と、アプリケーション側から音声を流す方法を紹介しました。
前回の記事と合わせて、static/dynamicに任意の音声処理を行うことができるようになりましたが、実際には音声の通信を行うアプリケーションと、各種機械学習などの重たい処理を行うものは別インフラ/アプリケーションになっていることも多いと思います。我々もGPU on GKEでの推論のためにgRPCでストリーミング音声の通信を行なっていますが、そこでもいくつかハマるポイントがあったため機会があればそちらも紹介したいと思います。                 

参考

https://www.twilio.com/docs/voice/twiml/stream

PICK UP

TAG