AWS

内部専用環境で安全に実現する音声合成サーバーレスシステム

内山浩佑

はじめに

本記事では、Slack のスラッシュコマンドからのリクエストを受け、VOICEVOX エンジンで音声合成を行い、その結果をSlackにアップロードするシステムを、AWS の各種サービス(ECS、Lambda、SQS、Secrets Manager など)を活用して構築した事例をご紹介します。

特に、外部にエンドポイントが公開されない内部専用のネットワーク構成、VPCエンドポイントを利用したSQSアクセス、さらにSlackリクエストの署名検証など、セキュリティ面にも十分な配慮を行っています。

システム全体の構成

主な構成要素

Slackコマンド処理用Lambda

Slack のスラッシュコマンドからリクエストを受信。受信後、API Gateway 経由で実行される API Lambda が署名検証を行い、リクエスト内容を SQS に投入します。

SQS と 音声合成用Lambda

SQS キューに投入されたメッセージをトリガーとして、音声合成用Lambda が起動し、VOICEVOX エンジン(ECS Fargate 上)へリクエストを送り、音声合成を実施。生成された音声ファイルは、Slack へアップロードされ、結果がSlackに通知されます。また、入力テキストの長さチェックやエラー時の通知も実装しています。

VOICEVOXエンジン(ECS Fargate)

https://voicevox.hiroshiba.jp/

VOICEVOXは、無料で使えるテキスト読み上げソフトウェアです。OSは、Windows/Mac/Linuxに対応しています。今回は、「VOICEVOX: ずんだもん」の音声を利用した例となっています。

今回の構成では、Docker コンテナで稼働する VOICEVOX エンジンを ECS Fargate でデプロイ。内部向けのALBを利用して、直接インターネットにエンドポイントが公開されないようにしています。

Secrets Manager & Lambda Layers

Slack Bot Token と Slack 署名シークレットは Secrets Manager により安全に管理。また、Python の依存ライブラリ(requests、slack_sdk)は Lambda Layer で管理し、デプロイメントパッケージのサイズ軽減と管理の効率化を実現しています。

AWS構成とセキュリティ対策

VPC・サブネット・ALBの内部配置

VPC構成では、パブリックサブネットとプライベートサブネットを作成し、ECSタスクとLambda関数はプライベートサブネット内で実行することで、直接外部に公開されない構成にしています。

内部ALBは内部専用に配置。これにより、VoiceVoxエンジンのエンドポイントはVPC内からのみアクセス可能です。

ALB用セキュリティグループは、VPC内からのHTTP(ポート80)アクセスのみを許可。ECSタスク用セキュリティグループは、ALBからのアクセス(ポート50021)のみを許可しています。

VPC Interface Endpointを追加することで、Lambda関数からSQSへの通信がインターネットを経由せずに行われるようになります。

Secrets ManagerとLambdaの連携

SecretsManagerには、Slack Bot TokenとSlack署名シークレットを登録。Lambda関数では、環境変数にシークレットのARNを設定し、boto3を利用して安全に取得します。

Lambda Layersを使用し、Python依存ライブラリをレイヤーとして管理。これにより、各Lambda関数は必要なライブラリ(requests、slack_sdk)を含む形で実行されます。

Slack署名検証

API Lambdaでは、Slackからのリクエストヘッダーに含まれる署名(X-Slack-Signature)とタイムスタンプ(X-Slack-Request-Timestamp)を検証し、不正なリクエストを弾く仕組みを実装しています。署名シークレットはSecrets Managerから取得するため、安全性が向上しています。

Slackの実行例とLambda実装コードの紹介

Slackでの利用フロー

1. ユーザーがSlack上でSlashコマンドを入力

例: /zundamonize これはテキストをずんだもんの音声にするアプリなのだ

このとき、Slackは、API Gateway 経由でSlackコマンド処理用Lambdaに対してリクエストを送信します。

2. Slackコマンド処理用Lambdaの動作

Slackからのリクエストは、まず署名検証が行われ、不正なリクエストは拒否されます。

正当なリクエストの場合、リクエスト内容(テキスト、チャンネルID、response_url)がSQSキューに投入され、Slackには「変換開始なのだ」とエフェメラルメッセージで即時応答されます。

3. 音声合成用Lambdaの動作

SQSにメッセージが投入されると、音声合成用Lambdaがトリガーされ、VOICEVOXエンジン(ECS Fargate上)に対して変換リクエストを行います。

変換結果の音声ファイルはSlackにアップロードされ、成功または失敗の結果がSlackに通知されます。

以下は、実際の実行結果イメージです。

Lambda 実装コードの紹介

Slackコマンド処理用Lambda

import json
import urllib.parse
import os
import boto3
import uuid
import time
import hmac
import hashlib

sqs = boto3.client('sqs')
QUEUE_URL = os.environ.get('SQS_QUEUE_URL')

def get_slack_signing_secret():
    # Secrets ManagerからSlack署名シークレットを取得する処理(コード例は別途実装)
    # ここでは環境変数等で取得したARNをもとに、Secrets Managerからシークレット内容を取得する想定です
    # 詳細はLambda側の署名検証実装を参照
    pass

def verify_slack_request(event):
    headers = event.get("headers", {})
    slack_signature = headers.get("X-Slack-Signature") or headers.get("x-slack-signature")
    timestamp = headers.get("X-Slack-Request-Timestamp") or headers.get("x-slack-request-timestamp")
    if not slack_signature or not timestamp:
        raise Exception("Missing Slack signature or timestamp.")
    if abs(time.time() - int(timestamp)) > 300:
        raise Exception("Request timestamp is too old.")
    body = event.get("body", "")
    basestring = f"v0:{timestamp}:{body}"
    # シークレット取得処理(詳細は実装済みの関数を利用)
    signing_secret = get_slack_signing_secret()
    computed_signature = "v0=" + hmac.new(
        signing_secret.encode('utf-8'),
        basestring.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()
    if not hmac.compare_digest(computed_signature, slack_signature):
        raise Exception("Invalid Slack signature.")

def lambda_handler(event, context):
    try:
        verify_slack_request(event)
    except Exception as e:
        return {
            "statusCode": 403,
            "body": json.dumps({"error": str(e)})
        }
    
    parsed_body = urllib.parse.parse_qs(event.get("body", ""))
    text = parsed_body.get("text", [None])[0]
    channel_id = parsed_body.get("channel_id", [None])[0]
    response_url = parsed_body.get("response_url", [None])[0]
    
    if not text or not channel_id or not response_url:
        return {
            "statusCode": 400,
            "body": "Missing required parameters."
        }
    
    message_body = json.dumps({
        "text": text,
        "channel_id": channel_id,
        "response_url": response_url,
        "request_id": str(uuid.uuid4())
    })
    
    sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=message_body)
    
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({
            "response_type": "ephemeral",
            "text": "変換開始なのだ"
        })
    }

Slackのリクエストを受信した後、署名検証(実装例は一部省略)を行い、必要パラメータ(text, channel_id, response_url)をパースして、SQSにメッセージを投入します。署名検証では、Secrets ManagerからSlack署名シークレットを取得し、リクエストの正当性を検証しています。

音声合成用Lambda

import json
import os
import requests
import boto3
import secrets
import string
from urllib.parse import urlparse, urlunparse
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

def generate_random_filename(length=12):
    allowed_chars = string.ascii_letters + string.digits
    return ''.join(secrets.choice(allowed_chars) for _ in range(length))

def get_base_url_without_port(url: str) -> str:
    parsed = urlparse(url)
    new_netloc = parsed.hostname
    new_parts = parsed._replace(netloc=new_netloc)
    return urlunparse(new_parts)

def get_slack_token():
    secret_arn = os.environ.get("SLACK_SECRET_ARN")
    if not secret_arn:
        raise Exception("SLACK_SECRET_ARN is not configured.")
    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_arn)
    secret_string = response.get("SecretString")
    if not secret_string:
        raise Exception("SecretString is empty.")
    secret = json.loads(secret_string)
    token = secret.get("token")
    if not token:
        raise Exception("Token not found in secret.")
    return token

def convert_text_to_voicevox_audio(text):
    base_url = os.environ.get("VOICEVOX_API_URL")
    if not base_url:
        raise Exception("VOICEVOX_API_URL is not configured.")
    base_url = get_base_url_without_port(base_url)
    
    params = {"text": text, "speaker": 1}
    query_resp = requests.post(f"{base_url}/audio_query", params=params, timeout=30)
    if query_resp.status_code != 200:
        raise Exception("audio_query failed: " + query_resp.text)
    query_json = query_resp.json()
    
    synthesis_params = {"speaker": params["speaker"]}
    synthesis_resp = requests.post(f"{base_url}/synthesis", params=synthesis_params, json=query_json, timeout=60)
    if synthesis_resp.status_code != 200:
        raise Exception("synthesis failed: " + synthesis_resp.text)
    return synthesis_resp.content

def lambda_handler(event, context):
    for record in event['Records']:
        payload = json.loads(record['body'])
        text = payload.get("text")
        channel_id = payload.get("channel_id")
        response_url = payload.get("response_url")
        
        # 長すぎるテキストの場合はエラー通知
        if len(text) > 200:
            error_message = "テキストが長すぎるのだ"
            try:
                requests.post(response_url, json={"replace_original": True, "text": error_message})
            except Exception as e:
                print("Error sending text-length error message to Slack:", e)
            continue
        
        try:
            requests.post(response_url, json={"text": "変換中なのだ...", "replace_original": False})
        except Exception as e:
            print("Error posting in-progress message:", e)
        
        try:
            audio_data = convert_text_to_voicevox_audio(text)
        except Exception as e:
            error_message = f"変換失敗: {str(e)}"
            print(error_message)
            try:
                requests.post(response_url, json={"replace_original": True, "text": error_message})
            except Exception as slack_err:
                print("Error sending error message to Slack:", slack_err)
            continue
        
        try:
            slack_token = get_slack_token()
        except Exception as e:
            error_message = "Slack token retrieval failed: " + str(e)
            print(error_message)
            try:
                requests.post(response_url, json={"replace_original": True, "text": error_message})
            except Exception as slack_err:
                print("Error sending token error message to Slack:", slack_err)
            continue
        
        client = WebClient(token=slack_token)
        random_filename = f"{generate_random_filename()}.wav"
        try:
            client.files_upload_v2(
                channel=channel_id,
                file=audio_data,
                filename=random_filename,
                title="ずんだもん",
                initial_comment=text
            )
        except SlackApiError as e:
            error_message = f"Slack upload failed: {e.response['error']}"
            print(error_message)
            try:
                requests.post(response_url, json={"replace_original": True, "text": error_message})
            except Exception as err:
                print("Error sending Slack upload error message:", err)
            continue
        
        try:
            requests.post(response_url, json={"replace_original": True, "text": "変換完了!"})
        except Exception as e:
            print("Error updating message:", e)
            
    return {"statusCode": 200, "body": "Processed"}

SQSからトリガーされるLambda関数です。入力テキストが200文字を超える場合は、エラーメッセージ「テキストが長すぎるのだ」をSlackに送信し、処理を中断します。正常な場合は、VOICEVOXのAPI(audio_query と synthesis)をQUERY_STRINGで呼び出して音声変換を実行し、生成された音声データをSlackにアップロードします。ファイル名は、ランダムな12文字+拡張子”.wav”となるように生成しています。

AWS CDKの実装コード

以下は、AWS CDKを用いて実装した VoiceVoxSlackStack クラスのソースコードです。このコードでは、VPC、ECS、ALB、SQS、Secrets Manager、およびLambda関数を構成しています。

export class VoiceVoxSlackStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // VPCの作成:PublicとPrivateサブネットを作成
    const vpc = new ec2.Vpc(this, 'VoiceVoxVpc', {
      maxAzs: 2,
      subnetConfiguration: [
        {
          name: 'Public',
          subnetType: ec2.SubnetType.PUBLIC,
          cidrMask: 24,
        },
        {
          name: 'Private',
          subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS,
          cidrMask: 24,
        },
      ],
    });

    // SQSへのアクセス用にVPC Gateway Endpointを追加(これによりLambdaはインターネット経由せずSQSにアクセス可能)
    vpc.addInterfaceEndpoint('SQSEndpoint', {
      service: ec2.InterfaceVpcEndpointAwsService.SQS,
    });
    vpc.addInterfaceEndpoint('SecretsManagerEndpoint', {
      service: ec2.InterfaceVpcEndpointAwsService.SECRETS_MANAGER,
    });

    // ALB用セキュリティグループ(内部からのアクセスのみ許可)
    const albSG = new ec2.SecurityGroup(this, 'ALBSG', {
      vpc,
      description: 'Allow internal HTTP traffic on port 80',
      allowAllOutbound: true,
    });
    // VPC内からのHTTPアクセスのみ許可(必要に応じてCIDRを絞り込む)
    albSG.addIngressRule(ec2.Peer.ipv4(vpc.vpcCidrBlock), ec2.Port.tcp(80), 'Allow internal HTTP traffic');

    // ECSタスク用セキュリティグループ(ALBからのアクセスのみを許可)
    const ecsSG = new ec2.SecurityGroup(this, 'ECSSG', {
      vpc,
      description: 'Allow traffic from ALB on port 50021',
      allowAllOutbound: true,
    });
    ecsSG.addIngressRule(albSG, ec2.Port.tcp(50021), 'Allow ALB access to ECS tasks on port 50021');

    // ECSクラスタの作成
    const cluster = new ecs.Cluster(this, 'VoiceVoxCluster', { vpc });

    // Fargateサービス:VoiceVoxエンジンを内部ALB経由でデプロイ
    const voiceVoxService = new ecs_patterns.ApplicationLoadBalancedFargateService(this, 'VoiceVoxService', {
      cluster,
      desiredCount: 1,
      cpu: 1024,
      memoryLimitMiB: 2048,
      taskImageOptions: {
        image: ecs.ContainerImage.fromRegistry('voicevox/voicevox_engine'),
        containerPort: 50021,
        enableLogging: true,
      },
      // ALBを内部向けに設定
      publicLoadBalancer: false,
      listenerPort: 80,
      securityGroups: [ecsSG],
    });
    // ALBに対して内部用のセキュリティグループを追加
    voiceVoxService.loadBalancer.addSecurityGroup(albSG);
    const voiceVoxEndpoint = voiceVoxService.loadBalancer.loadBalancerDnsName;

    // Secrets Manager: Slack Bot Token用シークレット
    const slackBotSecret = new secretsmanager.Secret(this, 'SlackBotTokenSecret', {
      secretName: 'SlackBotToken',
      generateSecretString: {
        secretStringTemplate: JSON.stringify({ token: '<YOUR_SLACK_BOT_TOKEN>' }),
        generateStringKey: 'token',
      },
    });

    // Secrets Manager: Slack署名シークレット用シークレット
    const slackSigningSecret = new secretsmanager.Secret(this, 'SlackSigningSecret', {
      secretName: 'SlackSigningSecret',
      generateSecretString: {
        secretStringTemplate: JSON.stringify({ signing_secret: '<YOUR_SLACK_SIGNING_SECRET>' }),
        generateStringKey: 'signing_secret',
        excludePunctuation: true,
      },
    });

    // SQSキュー(非同期処理用)
    const slackQueue = new sqs.Queue(this, 'SlackCommandQueue', {
      visibilityTimeout: cdk.Duration.hours(1),
    });

    // API Lambda(同期処理):SlackのSlashコマンド受信後、SQSに投入し即時応答
    const slackApiLambda = new lambda.Function(this, 'SlackCommandApiLambda', {
      runtime: lambda.Runtime.PYTHON_3_9,
      handler: 'api_lambda.lambda_handler',
      code: lambda.Code.fromAsset('lambda/api'),
      vpc,
      vpcSubnets: { subnetType: ec2.SubnetType.PRIVATE_WITH_NAT },
      environment: {
        SQS_QUEUE_URL: slackQueue.queueUrl,
        SLACK_SIGNING_SECRET_ARN: slackSigningSecret.secretArn,
      },
      timeout: cdk.Duration.seconds(10),
    });
    slackSigningSecret.grantRead(slackApiLambda);
    slackQueue.grantSendMessages(slackApiLambda);

    // Lambdaレイヤー: requestsライブラリ
    const requestsLayer = new lambda.LayerVersion(this, 'RequestsLayer', {
      code: lambda.Code.fromAsset(path.join(__dirname, '..', 'lambda_package', 'requests', 'requests_layer.zip')),
      compatibleRuntimes: [lambda.Runtime.PYTHON_3_9],
      description: 'A layer containing the requests library',
    });

    // Lambdaレイヤー: slack_sdkライブラリ
    const slackLayer = new lambda.LayerVersion(this, 'SlackLayer', {
      code: lambda.Code.fromAsset(path.join(__dirname, '..', 'lambda_package', 'slack_sdk', 'slack_layer.zip')),
      compatibleRuntimes: [lambda.Runtime.PYTHON_3_9],
      description: 'A layer containing the slack_sdk library',
    });

    // Processor Lambda(非同期処理):SQSトリガーで処理実行
    const slackProcessorLambda = new lambda.Function(this, 'SlackVoiceVoxProcessorLambda', {
      runtime: lambda.Runtime.PYTHON_3_9,
      handler: 'processor_lambda.lambda_handler',
      code: lambda.Code.fromAsset('lambda/processor'),
      layers: [requestsLayer, slackLayer],
      vpc,
      vpcSubnets: { subnetType: ec2.SubnetType.PRIVATE_WITH_NAT },
      environment: {
        // VOICEVOX_API_URLは内部ALBのDNS名を使い、ポート指定は不要(リスナーは80)
        VOICEVOX_API_URL: `http://${voiceVoxEndpoint}`,
        SLACK_SECRET_ARN: slackBotSecret.secretArn,
      },
      timeout: cdk.Duration.minutes(15),
    });
    slackBotSecret.grantRead(slackProcessorLambda);
    slackProcessorLambda.addEventSource(new lambdaEventSources.SqsEventSource(slackQueue));

    // API Gateway:Slackのスラッシュコマンド用エンドポイント(API LambdaはVPC内にあるが、NAT経由で外部と通信可能)
    new apigateway.LambdaRestApi(this, 'SlackCommandApi', {
      handler: slackApiLambda,
      restApiName: 'SlackVoiceVoxApi',
      proxy: true,
    });

    new cdk.CfnOutput(this, 'VoiceVoxEndpoint', {
      value: voiceVoxEndpoint,
      description: '内部ALBのDNS名(VoiceVoxエンジン用)',
    });
  }
}

まとめ

今回のシステムは、Slackとの連携からVOICEVOXによる音声合成、そして生成された音声ファイルのSlackアップロードまでを、厳格なセキュリティ対策のもと内部専用ネットワークで実現しました。

  • 内部環境構築: ECS、Lambda、ALBをすべてプライベートサブネット内で運用し、VPC Interface EndpointによりSQSへのアクセスも安全に
  • Secrets Managerによる認証情報管理: Slack Bot Tokenおよび署名シークレットはSecrets Managerで管理し、Lambda関数から安全に参照
  • Slack署名検証とエラーハンドリング: API Lambdaでの署名検証により不正リクエストを排除し、入力テキストの長さチェックやエラー発生時の通知も実装
  • 実装の工夫: 入力テキストの長さチェック、ランダムなファイル名生成、エラーハンドリングなど、実用性と安全性を両立

この記事が、セキュリティと柔軟性を両立したAWS構成でのシステム構築の参考になれば幸いです。

AUTHOR
uchiko
uchiko
記事URLをコピーしました