セキュリティグループの棚卸しの実践
はじめに
西藤です。
AWSクラウドを活用するに際して、ワークロードに必要な機能を実現するためにAWSのリソースを組み合わせて利用することが多いと思います。
その中でもセキュリティグループは、通信制御を司る重要な仕組みでありセキュリティを担保する上で欠かせないものですが、ワークロードが複雑になるにつれて、その数も増えていき管理が大変になります。
私が携わった業務でも「セキュリティグループのルールを把握したい。何か一覧で見やすい方法はないだろうか。」というニーズに対面することがありました。
今回はそういった課題を解決するために、セキュリティグループの棚卸しを行うための仕組みづくりを実践したので、その内容を紹介します。
前提条件
今回の実践では、以下のような前提条件を設定します。
- AWS ConfigにてAWSリソースの構成情報が収集されている
- AWS Configの収集対象にセキュリティグループが含まれている
- セキュリティグループのルール単位で、どのような通信が許可されているかを把握ためのリストを出力したい
- Ingress/Egressの両方を対象とする
各セキュリティグループのルールを把握するためには、AWS VPCの管理画面で詳細を見ることは可能です。
しかし、対象が増えてくるとマネジメントコンソール画面だけでは効率が悪く、現実的な対応として、Excelなどに出力して整理したいというニーズが出てきます。
今回は、そのニーズに応えるためのリスト出力を目指します。
アーキテクチャ
実装される仕組みのアーキテクチャは以下のようになります。
番号ごとに以下のような仕組みです。
(1)AWS Configに収集されているセキュリティグループルール情報を取得
(2)(1)で取得した情報を元に、セキュリティグループのルールの整形されたリストをcsvファイルとしてS3に出力
(3)(2)でS3に配備されたcsvファイルのpre-signed URLを取得し、webhookでチャットツール(今回はSlack)に通知
(4)(3)で通知されたURLをクリックすることで、csvファイルをダウンロードできる
実装内容
上記の構成を実現するためにAWS LambdaをServerless Frameworkで実装します。
ディレクトリ構成
Serverless Frameworkでデプロイするためのファイル群のディレクトリ構成は以下の通りです。
project
├── .env
├── handler.py
└── serverless.yml
コード内容
各ファイルの内容は以下の通りです。
.env
S3_BUCKET=<S3のバケット名を指定>
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/xxxxxxxxxxx # SlackのWebhook URLを指定
handler.py
import json
import boto3
import csv
import os
import logging
from urllib.request import Request
from urllib.error import HTTPError
from urllib.error import URLError
from urllib.request import urlopen
# loggerを定義する
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3_bucket = os.environ['S3_BUCKET']
slack_webhook_url = os.environ['SLACK_WEBHOOK_URL']
def get_security_group_ids():
client = boto3.client('config')
response = client.list_discovered_resources(
resourceType='AWS::EC2::SecurityGroup'
)
security_group_ids = []
for resource in response['resourceIdentifiers']:
security_group_ids.append(resource['resourceId'])
return security_group_ids
def get_security_group_rules(security_group_ids):
client = boto3.client('ec2')
security_group_rules = []
response = client.describe_security_groups(
GroupIds=security_group_ids
)
security_groups = response['SecurityGroups']
for security_group in security_groups:
for ip_permission in security_group['IpPermissions']:
SecurityGroupId = security_group['GroupId']
SecurityGroupName = security_group['GroupName']
Description = security_group['Description']
IngressFromPort = ip_permission.get('FromPort', "All")
IngressToPort = ip_permission.get('ToPort', "All")
if len(ip_permission['IpRanges']) != 0:
IngressIpRanges = ip_permission['IpRanges'][0]['CidrIp']
else:
IngressIpRanges = ""
if len(ip_permission['UserIdGroupPairs']) != 0:
IngressUserIdGroupPairs = ip_permission['UserIdGroupPairs'][0]['GroupId']
else:
IngressUserIdGroupPairs = ""
security_group_rule = {
'SecurityGroupId': SecurityGroupId,
'SecurityGroupName': SecurityGroupName,
'Description': Description,
'RuleType': 'Ingress',
'IngressFromPort': IngressFromPort,
'IngressToPort': IngressToPort,
'IngressIpRanges': IngressIpRanges,
'IngressSecurityGroupId': IngressUserIdGroupPairs,
'EgressFromPort': "",
'EgressToPort': "",
'EgressIpRanges': "",
'EgressSecurityGroupId': ""
}
security_group_rules.append(security_group_rule)
for ip_permission in security_group['IpPermissionsEgress']:
SecurityGroupId = security_group['GroupId']
SecurityGroupName = security_group['GroupName']
Description = security_group['Description']
EgressFromPort = ip_permission.get('FromPort', "All")
EgressToPort = ip_permission.get('ToPort', "All")
if len(ip_permission['IpRanges']) != 0:
EgressIpRanges = ip_permission['IpRanges'][0]['CidrIp']
else:
EgressIpRanges = ""
if len(ip_permission['UserIdGroupPairs']) != 0:
EgressUserIdGroupPairs = ip_permission['UserIdGroupPairs'][0]['GroupId']
else:
EgressUserIdGroupPairs = ""
security_group_rule = {
'SecurityGroupId': SecurityGroupId,
'SecurityGroupName': SecurityGroupName,
'Description': Description,
'RuleType': 'Egress',
'IngressFromPort': "",
'IngressToPort': "",
'IngressIpRanges': "",
'IngressSecurityGroupId': "",
'EgressFromPort': EgressFromPort,
'EgressToPort': EgressToPort,
'EgressIpRanges': EgressIpRanges,
'EgressSecurityGroupId': EgressUserIdGroupPairs
}
security_group_rules.append(security_group_rule)
return security_group_rules
def generate_s3_presigned_url(security_group_rules):
# CSVに変換する
with open('/tmp/security_group_rules.csv', 'w') as f:
writer = csv.DictWriter(f, fieldnames=security_group_rules[0].keys())
writer.writeheader()
writer.writerows(security_group_rules)
# S3にアップロードする
s3 = boto3.resource('s3')
s3.meta.client.upload_file('/tmp/security_group_rules.csv',
s3_bucket,
'security_group_rules.csv')
# S3バケットからダウンロードするためのURLを取得する
url = s3.meta.client.generate_presigned_url(
ClientMethod='get_object',
Params={
'Bucket': s3_bucket,
'Key': 'security_group_rules.csv',
},
ExpiresIn=300,
)
return url
def notify_to_slack(url, aws_account_id, region):
# ダウンロード用のURLをSlackのWebhook URLにPOSTする
slack_message = {
'text': "AWSアカウントID: " + aws_account_id + "\nリージョン: " + region + "\nにおけるセキュリティグループルールの一覧です。",
'attachments': [
{
'title': "ダウンロード用のURL(5分間のみ有効)",
'title_link': url,
'color': '#2eb886'
}
]
}
req = Request(slack_webhook_url, json.dumps(slack_message).encode('utf-8'))
try:
response = urlopen(req)
response.read()
logger.info("メッセージを %s に投稿しました", slack_webhook_url)
except HTTPError as e:
logger.error("リクエストが失敗しました: %d %s", e.code, e.reason)
except URLError as e:
logger.error("サーバーへの接続が失敗しました: %s", e.reason)
return
def main(event, context):
# AWS ConfigからセキュリティグループIDを取得する
security_group_ids = get_security_group_ids()
# セキュリティグループルールを取得する
security_group_rules = get_security_group_rules(security_group_ids)
# S3に保存する
url = generate_s3_presigned_url(security_group_rules)
# 現在のAWSアカウントIDを取得する
aws_account_id = context.invoked_function_arn.split(":")[4]
# 現在のリージョンを取得する
region = context.invoked_function_arn.split(":")[3]
# Slackに通知する
notify_to_slack(url, aws_account_id, region)
return {
'statusCode': 200
}
serverless.yml
service: security-group
frameworkVersion: "3"
useDotenv: true
provider:
name: aws
runtime: python3.9
stage: dev
region: ap-northeast-1
iam:
role:
statements:
- Effect: "Allow"
Action:
- "config:ListDiscoveredResources"
- "ec2:DescribeSecurityGroups"
- "s3:PutObject"
- "s3:GetObject"
- "config:DescribeConfigRules"
Resource: "*"
functions:
list-from-config:
handler: handler.main
environment:
S3_BUCKET: ${env:S3_BUCKET}
SLACK_WEBHOOK_URL: ${env:SLACK_WEBHOOK_URL}
実行結果
今回作られたLambdaを実行した際の結果を見ていきます。
まず、Lambdaを実行すると、以下のようにSlackに通知が届きます。
Slackに通知されたURLをクリックすると、以下のようにcsvファイルがダウンロードできます。
ダウンロードしたcsvファイルをExcelで開くと、以下のようにセキュリティグループのルールを一覧できます。
1行に対して1つのルールが記載されているようになっており、CIDRブロックの他、セキュリティグループIDで、inbound/outboundを指定している際には、そのセキュリティグループIDを表示するようにしました。
各列の内容は以下の通りです。
以下はセキュリティグループの棚卸しに使用するテーブルの形式です。
フィールド名 | 説明 |
---|---|
SecurityGroupId | ルールが設定されているセキュリティグループID |
SecurityGroupName | ルールが設定されているセキュリティグループ名 |
Description | セキュリティグループの説明欄の内容 |
RuleType | Ingress/Egressのどちらか |
IngressFromPort | IngressルールのFromポート番号 |
IngressToPort | IngressルールのToポート番号 |
IngressIpRanges | IngressルールでCIDRブロックを指定している場合のCIDRブロック |
IngressSecurityGroupId | IngressルールでセキュリティグループIDを指定している場合のセキュリティグループID |
EgressFromPort | EgressルールのFromポート番号 |
EgressToPort | EgressルールのToポート番号 |
EgressIpRanges | EgressルールでCIDRブロックを指定している場合のCIDRブロック |
EgressSecurityGroupId | EgressルールでセキュリティグループIDを指定している場合のセキュリティグループID |
日常業務においてセキュリティグループルールの棚卸しする際には、AWSマネジメントコンソール上で完結するのが理想ではありますが、現実的にはこのようにExcelなどの表計算ツールで整理することが見込まれます。その際に便利になるようにするための工夫として、このような形式にしました。
まとめ
以上、セキュリティグループの棚卸しを実践するための仕組みを紹介しました。
簡易的なものではありますが、業務上の現実的なニーズに応えることができるものになっていると思います。
AWS Configの構成情報を活用した作りになっておりますので、AWS Config Ruleとの適合状況なども付加しながら、より実用的にセキュリティグループの棚卸しをできればと思います。
本記事が、AWSクラウドのセキュリティ担当者の業務の効率化に少しでも役立てば幸いです。