AWS

Amazon ECRのイメージスキャン結果をSlack通知、Backlogに課題登録

gene

2019年10月に Amazon ECR の脆弱性のスキャン機能が実装された。

Announcing Image Scanning for Amazon ECR

今回は、下記の記事を参考に、

  • ECR へイメージをプッシュ
  • ECR のイメージスキャン
  • スキャン実行をトリガーに Lambda を実行して Slack 通知

という流れでやってみた。

【参考URL】

ECR のスキャン設定

まずは、 ECR の設定を行う。
ECR のリポジトリ作成時、もしくは既存でリポジトリがある場合は設定を編集して、 Scan on push (プッシュ時にスキャン) を有効にする。
これを設定することで、 ECR にイメージをプッシュするたびにスキャンが実行される。

Lambda Layers

今回は、 PythonLambda を実行する。
ECR のイメージのスキャン結果を取得するためには、最新の boto3 が必要になるため、 Lambda Layers を設定する。

 【参考URL】

ローカル環境にて、下記のように boto3./python ディレクトリ配下にインストールし、 zip で固める。

$ mkdir python
$ pip install -t ./python boto3
$ zip -r boto3-1.10.28.zip python

AWS Lambda のメニューより、 Layers をクリック。

Create Layer をクリックして、

先ほど作成した zip ファイルをアップロードして、 Layer を作成する。

IAM role 設定

ECR のスキャン結果取得のため、 Lambda を実行するロールに、 ecr:DescribeImages を付与する。

Lambda 関数

Lambda 関数は、参考にさせていただいた記事のものをベースに少し手を加えて、チャンネル名(CHANNEL)、会社名(COMPANY)、SlackのウェブフックURLの(WEBHOOK_URL)を環境変数から取得するようにした。
弊社では、 GitHubCircleCI などの通知を集約している Slack チャンネルがあり、複数の開発や運用が同時で動いているため、どのプロジェクトのものか判別できるように対応したものである。

from datetime import datetime
from logging import getLogger, INFO
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
import json
import os
from botocore.exceptions import ClientError
import boto3

logger = getLogger()
logger.setLevel(INFO)


def get_properties(s_counts):
    """Returns the color setting of severity"""
    if s_counts['CRITICAL'] != 0:
        properties = {'color': 'danger', 'icon': ':red_circle:'}
    elif s_counts['HIGH'] != 0:
        properties = {'color': 'warning', 'icon': ':large_orange_diamond:'}
    else:
        properties = {'color': 'good', 'icon': ':green_heart:'}
    return properties


def get_params(scan_result):
    """Slack message formatting"""
    region = 'ap-northeast-1'
    channel = os.environ['CHANNEL']
    company = os.environ['COMPANY']
    severity_list = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW', 'INFORMATIONAL', 'UNDEFINED']
    s_counts = scan_result['imageScanFindingsSummary']['findingSeverityCounts']

    for severity in severity_list:
        s_counts.setdefault(severity, 0)

    message = f"*ECRイメージスキャン結果 | {company} | Account:{scan_result['registryId']}*"
    description = scan_result['imageScanStatus']['description']
    text_properties = get_properties(s_counts)

    complete_at = datetime.strftime(
        scan_result['imageScanFindingsSummary']['imageScanCompletedAt'],
        '%Y-%m-%d %H:%M:%S %Z'
    )
    source_update_at = datetime.strftime(
        scan_result['imageScanFindingsSummary']['vulnerabilitySourceUpdatedAt'],
        '%Y-%m-%d %H:%M:%S %Z'
    )

    slack_message = {
        'username': 'Amazon ECR',
        'channels': channel,
        'icon_emoji': ':ecr:',
        'text': message,
        'attachments': [
            {
                'fallback': 'AmazonECR Image Scan Findings Description.',
                'color': text_properties['color'],
                'title': f'''{text_properties['icon']} {
                    scan_result['repositoryName']}:{
                    scan_result['imageTags'][0]}''',
                'title_link': f'''https://console.aws.amazon.com/ecr/repositories/{
                    scan_result['repositoryName']}/image/{
                    scan_result['imageDigest']}/scan-results?region={region}''',
                'text': f'''{description}
スキャン完了日時: {
                    complete_at}
脆弱性情報更新日時: {source_update_at}''',
                'fields': [
                    {'title': 'Critical', 'value': s_counts['CRITICAL'], 'short': True},
                    {'title': 'High', 'value': s_counts['HIGH'], 'short': True},
                    {'title': 'Medium', 'value': s_counts['MEDIUM'], 'short': True},
                    {'title': 'Low', 'value': s_counts['LOW'], 'short': True},
                    {'title': 'Info', 'value': s_counts['INFORMATIONAL'], 'short': True},
                    {'title': 'Undefined', 'value': s_counts['UNDEFINED'], 'short': True},
                ]
            }
        ]
    }
    return slack_message


def get_findings(detail):
    """Returns the image scan findings summary"""
    ecr = boto3.client('ecr')
    try:
        response = ecr.describe_images(
            repositoryName=detail['repository-name'],
            imageIds=[
                {'imageDigest': detail['image-digest']}
            ]
        )
    except ClientError as err:
        logger.error("Request failed: %s", err.response['Error']['Message'])
    else:
        return response['imageDetails'][0]


def lambda_handler(event, context):
    """AWS Lambda Function to send ECR Image Scan Findings to Slack"""
    response = 1
    scan_result = get_findings(event['detail'])

    slack_message = get_params(scan_result)
    req = Request(os.environ['WEBHOOK_URL'], json.dumps(slack_message).encode('utf-8'))
    try:
        with urlopen(req) as res:
            res.read()
            logger.info("Message posted.")
    except HTTPError as err:
        logger.error("Request failed: %d %s", err.code, err.reason)
    except URLError as err:
        logger.error("Server connection failed: %s", err.reason)
    else:
        response = 0

    return response

Lambda ファンクションに、先ほど作成した Layer を忘れずに設定する。

トリガー設定

Lambda 関数のトリガーとなる、 CloudWatch EventsRule を作成する。

サービス名に、 Elastic Container Registry(ECR) を選択。

イベントタイプとして、 ECR Image Scan を設定。

ターゲットとして、先ほど作成した Lambda ファンクションをセット。

動作確認

試しに、脆弱性を含んだイメージをプッシュしてみると、

無事通知が行われた。

Backlog に課題作成もやってみる

Slack の通知ではなくて、

  • 検出された脆弱性のレベルに CRITICAL が含まれていた場合は Backlog にて課題登録

ということもやってみる。

【参考URL】

Lambda ファンクションは下記のようにした。

from datetime import datetime
from logging import getLogger, INFO
import os
from botocore.exceptions import ClientError
import boto3
import requests

logger = getLogger()
logger.setLevel(INFO)


def get_findings(detail):
    """Returns the image scan findings summary"""
    ecr = boto3.client('ecr')
    try:
        response = ecr.describe_images(
            repositoryName=detail['repository-name'],
            imageIds=[
                {'imageDigest': detail['image-digest']}
            ]
        )
    except ClientError as err:
        logger.error("Request failed: %s", err.response['Error']['Message'])
    else:
        return response['imageDetails'][0]


def create_backlog_issue(scan_result):
    """Create Backlog Issue"""
    company = os.environ['COMPANY']
    severity = 2
    region = 'ap-northeast-1'
    title = "ECRイメージスキャンにて脆弱性が検出されました"
    url = f"https://{os.environ['BACKLOG_SPACE_ID']}.backlog.com/api/v2/issues"
    repo = f"{scan_result['repositoryName']}:{scan_result['imageTags'][0]}"

    summary = scan_result['imageScanFindingsSummary']['findingSeverityCounts']
    complete_at = datetime.strftime(
        scan_result['imageScanFindingsSummary']['imageScanCompletedAt'],
        '%Y-%m-%d %H:%M:%S %Z'
    )
    source_update_at = datetime.strftime(
        scan_result['imageScanFindingsSummary']['vulnerabilitySourceUpdatedAt'],
        '%Y-%m-%d %H:%M:%S %Z'
    )
    result_url = f'''https://console.aws.amazon.com/ecr/repositories/{
                    scan_result['repositoryName']}/image/{
                    scan_result['imageDigest']}/scan-results?region={region}'''

    description = """
会社名: %s
イメージ: %s
スキャン完了日時: %s
脆弱性情報更新日時: %s
スキャン結果: %s

URL: %s
    """ % (company, repo, complete_at, source_update_at, summary, result_url)

    payload = {
        'projectId' : os.environ['BACKLOG_PROJECT_ID'],
        'issueTypeId' : os.environ['BACKLOG_ISSUE_TYPE_ID'],
        'priorityId' : severity,
        'summary' : title,
        'description' : description,
        'apiKey' : os.environ['BACKLOG_API_KEY']
    }
    response = requests.post(url, params=payload)
    return response.json()


def lambda_handler(event, context):
    """ECR Image Scan Findings to Backlog"""
    logger.info(event)
    response = 1
    scan_result = get_findings(event['detail'])

    # create backlog issue if 'CRITICAL' found
    counts = scan_result['imageScanFindingsSummary']['findingSeverityCounts']
    logger.info(counts)
    if 'CRITICAL' in counts:
        logger.info("CRITICAL found. Creating Backlog Issue.")
        response = create_backlog_issue(scan_result)

    return response

※ 環境変数に下記の変数設定をする必要があるので注意。

COMPANY
BACKLOG_SPACE_ID
BACKLOG_PROJECT_ID
BACKLOG_ISSUE_TYPE_ID
BACKLOG_API_KEY

※ また、このファンクションでは、 requests を使用しているので、 Lambda Layerrequests を含めてやる必要があるので注意。

$ mkdir python
$ pip install -t ./python boto3 requests
$ zip -r boto3-1.10.28.zip python

動作確認

脆弱性を含んだイメージをプッシュしてみると、 無事 Backlog 上に課題が登録された。

上記の処理を組み合わせて、開発時に ECR へプッシュするたびにスキャン実行、スキャン結果を Slack へ通知、もし脆弱性のレベルに CRITICAL が含まれていたら Backlog へ課題登録する、という流れができる。
リンク先の URL でスキャン結果の詳細が確認できるようになっているので、確認および対応をすることができる。

AUTHOR
gene
gene
記事URLをコピーしました