AWS

セキュリティグループの棚卸しの実践

sho

はじめに

西藤です。

AWSクラウドを活用するに際して、ワークロードに必要な機能を実現するためにAWSのリソースを組み合わせて利用することが多いと思います。

その中でもセキュリティグループは、通信制御を司る重要な仕組みでありセキュリティを担保する上で欠かせないものですが、ワークロードが複雑になるにつれて、その数も増えていき管理が大変になります。

私が携わった業務でも「セキュリティグループのルールを把握したい。何か一覧で見やすい方法はないだろうか。」というニーズに対面することがありました。
今回はそういった課題を解決するために、セキュリティグループの棚卸しを行うための仕組みづくりを実践したので、その内容を紹介します。

前提条件

今回の実践では、以下のような前提条件を設定します。

  1. AWS ConfigにてAWSリソースの構成情報が収集されている
  2. AWS Configの収集対象にセキュリティグループが含まれている
  3. セキュリティグループのルール単位で、どのような通信が許可されているかを把握ためのリストを出力したい
  4. Ingress/Egressの両方を対象とする

各セキュリティグループのルールを把握するためには、AWS VPCの管理画面で詳細を見ることは可能です。
しかし、対象が増えてくるとマネジメントコンソール画面だけでは効率が悪く、現実的な対応として、Excelなどに出力して整理したいというニーズが出てきます。

今回は、そのニーズに応えるためのリスト出力を目指します。

アーキテクチャ

実装される仕組みのアーキテクチャは以下のようになります。

番号ごとに以下のような仕組みです。

(1)AWS Configに収集されているセキュリティグループルール情報を取得

(2)(1)で取得した情報を元に、セキュリティグループのルールの整形されたリストをcsvファイルとしてS3に出力

(3)(2)でS3に配備されたcsvファイルのpre-signed URLを取得し、webhookでチャットツール(今回はSlack)に通知

(4)(3)で通知されたURLをクリックすることで、csvファイルをダウンロードできる

実装内容

上記の構成を実現するためにAWS LambdaをServerless Frameworkで実装します。

ディレクトリ構成

Serverless Frameworkでデプロイするためのファイル群のディレクトリ構成は以下の通りです。

project
├── .env
├── handler.py
└── serverless.yml

コード内容

各ファイルの内容は以下の通りです。

.env

S3_BUCKET=<S3のバケット名を指定>
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/xxxxxxxxxxx # SlackのWebhook URLを指定

handler.py

import json
import boto3
import csv
import os
import logging
from urllib.request import Request
from urllib.error import HTTPError
from urllib.error import URLError
from urllib.request import urlopen


# loggerを定義する
logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_bucket = os.environ['S3_BUCKET']
slack_webhook_url = os.environ['SLACK_WEBHOOK_URL']


def get_security_group_ids():
    client = boto3.client('config')
    response = client.list_discovered_resources(
        resourceType='AWS::EC2::SecurityGroup'
    )
    security_group_ids = []
    for resource in response['resourceIdentifiers']:
        security_group_ids.append(resource['resourceId'])
    return security_group_ids


def get_security_group_rules(security_group_ids):
    client = boto3.client('ec2')
    security_group_rules = []
    response = client.describe_security_groups(
        GroupIds=security_group_ids
    )
    security_groups = response['SecurityGroups']
    for security_group in security_groups:
        for ip_permission in security_group['IpPermissions']:
            SecurityGroupId = security_group['GroupId']
            SecurityGroupName = security_group['GroupName']
            Description = security_group['Description']
            IngressFromPort = ip_permission.get('FromPort', "All")
            IngressToPort = ip_permission.get('ToPort', "All")
            if len(ip_permission['IpRanges']) != 0:
                IngressIpRanges = ip_permission['IpRanges'][0]['CidrIp']
            else:
                IngressIpRanges = ""
            if len(ip_permission['UserIdGroupPairs']) != 0:
                IngressUserIdGroupPairs = ip_permission['UserIdGroupPairs'][0]['GroupId']
            else:
                IngressUserIdGroupPairs = ""
            security_group_rule = {
                'SecurityGroupId': SecurityGroupId,
                'SecurityGroupName': SecurityGroupName,
                'Description': Description,
                'RuleType': 'Ingress',
                'IngressFromPort': IngressFromPort,
                'IngressToPort': IngressToPort,
                'IngressIpRanges': IngressIpRanges,
                'IngressSecurityGroupId': IngressUserIdGroupPairs,
                'EgressFromPort': "",
                'EgressToPort': "",
                'EgressIpRanges': "",
                'EgressSecurityGroupId': ""
            }
            security_group_rules.append(security_group_rule)
        for ip_permission in security_group['IpPermissionsEgress']:
            SecurityGroupId = security_group['GroupId']
            SecurityGroupName = security_group['GroupName']
            Description = security_group['Description']
            EgressFromPort = ip_permission.get('FromPort', "All")
            EgressToPort = ip_permission.get('ToPort', "All")
            if len(ip_permission['IpRanges']) != 0:
                EgressIpRanges = ip_permission['IpRanges'][0]['CidrIp']
            else:
                EgressIpRanges = ""
            if len(ip_permission['UserIdGroupPairs']) != 0:
                EgressUserIdGroupPairs = ip_permission['UserIdGroupPairs'][0]['GroupId']
            else:
                EgressUserIdGroupPairs = ""
            security_group_rule = {
                'SecurityGroupId': SecurityGroupId,
                'SecurityGroupName': SecurityGroupName,
                'Description': Description,
                'RuleType': 'Egress',
                'IngressFromPort': "",
                'IngressToPort': "",
                'IngressIpRanges': "",
                'IngressSecurityGroupId': "",
                'EgressFromPort': EgressFromPort,
                'EgressToPort': EgressToPort,
                'EgressIpRanges': EgressIpRanges,
                'EgressSecurityGroupId': EgressUserIdGroupPairs
            }
            security_group_rules.append(security_group_rule)
    return security_group_rules


def generate_s3_presigned_url(security_group_rules):
    # CSVに変換する
    with open('/tmp/security_group_rules.csv', 'w') as f:
        writer = csv.DictWriter(f, fieldnames=security_group_rules[0].keys())
        writer.writeheader()
        writer.writerows(security_group_rules)
    # S3にアップロードする
    s3 = boto3.resource('s3')
    s3.meta.client.upload_file('/tmp/security_group_rules.csv',
                               s3_bucket,
                               'security_group_rules.csv')
    # S3バケットからダウンロードするためのURLを取得する
    url = s3.meta.client.generate_presigned_url(
        ClientMethod='get_object',
        Params={
            'Bucket': s3_bucket,
            'Key': 'security_group_rules.csv',
        },
        ExpiresIn=300,
    )
    return url


def notify_to_slack(url, aws_account_id, region):
    # ダウンロード用のURLをSlackのWebhook URLにPOSTする
    slack_message = {
        'text': "AWSアカウントID: " + aws_account_id + "\nリージョン: " + region + "\nにおけるセキュリティグループルールの一覧です。",
        'attachments': [
            {
                'title': "ダウンロード用のURL(5分間のみ有効)",
                'title_link': url,
                'color': '#2eb886'
            }
        ]
    }
    req = Request(slack_webhook_url, json.dumps(slack_message).encode('utf-8'))
    try:
        response = urlopen(req)
        response.read()
        logger.info("メッセージを %s に投稿しました", slack_webhook_url)
    except HTTPError as e:
        logger.error("リクエストが失敗しました: %d %s", e.code, e.reason)
    except URLError as e:
        logger.error("サーバーへの接続が失敗しました: %s", e.reason)
    return


def main(event, context):
    # AWS ConfigからセキュリティグループIDを取得する
    security_group_ids = get_security_group_ids()
    # セキュリティグループルールを取得する
    security_group_rules = get_security_group_rules(security_group_ids)
    # S3に保存する
    url = generate_s3_presigned_url(security_group_rules)
    # 現在のAWSアカウントIDを取得する
    aws_account_id = context.invoked_function_arn.split(":")[4]
    # 現在のリージョンを取得する
    region = context.invoked_function_arn.split(":")[3]
    # Slackに通知する
    notify_to_slack(url, aws_account_id, region)

    return {
        'statusCode': 200
    }

serverless.yml

service: security-group
frameworkVersion: "3"

useDotenv: true

provider:
  name: aws
  runtime: python3.9
  stage: dev
  region: ap-northeast-1
  iam:
    role:
      statements:
        - Effect: "Allow"
          Action:
            - "config:ListDiscoveredResources"
            - "ec2:DescribeSecurityGroups"
            - "s3:PutObject"
            - "s3:GetObject"
            - "config:DescribeConfigRules"
          Resource: "*"

functions:
  list-from-config:
    handler: handler.main
    environment:
      S3_BUCKET: ${env:S3_BUCKET}
      SLACK_WEBHOOK_URL: ${env:SLACK_WEBHOOK_URL}

実行結果

今回作られたLambdaを実行した際の結果を見ていきます。

まず、Lambdaを実行すると、以下のようにSlackに通知が届きます。

Slackに通知されたURLをクリックすると、以下のようにcsvファイルがダウンロードできます。

ダウンロードしたcsvファイルをExcelで開くと、以下のようにセキュリティグループのルールを一覧できます。

1行に対して1つのルールが記載されているようになっており、CIDRブロックの他、セキュリティグループIDで、inbound/outboundを指定している際には、そのセキュリティグループIDを表示するようにしました。

各列の内容は以下の通りです。

以下はセキュリティグループの棚卸しに使用するテーブルの形式です。

フィールド名説明
SecurityGroupIdルールが設定されているセキュリティグループID
SecurityGroupNameルールが設定されているセキュリティグループ名
Descriptionセキュリティグループの説明欄の内容
RuleTypeIngress/Egressのどちらか
IngressFromPortIngressルールのFromポート番号
IngressToPortIngressルールのToポート番号
IngressIpRangesIngressルールでCIDRブロックを指定している場合のCIDRブロック
IngressSecurityGroupIdIngressルールでセキュリティグループIDを指定している場合のセキュリティグループID
EgressFromPortEgressルールのFromポート番号
EgressToPortEgressルールのToポート番号
EgressIpRangesEgressルールでCIDRブロックを指定している場合のCIDRブロック
EgressSecurityGroupIdEgressルールでセキュリティグループIDを指定している場合のセキュリティグループID

日常業務においてセキュリティグループルールの棚卸しする際には、AWSマネジメントコンソール上で完結するのが理想ではありますが、現実的にはこのようにExcelなどの表計算ツールで整理することが見込まれます。その際に便利になるようにするための工夫として、このような形式にしました。

まとめ

以上、セキュリティグループの棚卸しを実践するための仕組みを紹介しました。
簡易的なものではありますが、業務上の現実的なニーズに応えることができるものになっていると思います。

AWS Configの構成情報を活用した作りになっておりますので、AWS Config Ruleとの適合状況なども付加しながら、より実用的にセキュリティグループの棚卸しをできればと思います。

本記事が、AWSクラウドのセキュリティ担当者の業務の効率化に少しでも役立てば幸いです。

AUTHOR
sho
sho
記事URLをコピーしました