AWS

AWS上のJP1にクロスアカウントでCloudWatch Alarmのアラート情報を集約させる方法について考えてみた

kim

おはようございます!DWSの木村です!

今回は、AWS上でJP1を使う際に、「どうやって様々なアカウントのアラート情報を集約させるか?」について考えてみました!

いくつかのパターンを考えてみましたので、同じような課題に直面した方は、ご自身の要件に当てはまる方法を選出して試していただければと思います!

JP1とは?

JP1とは、企業のITシステム運用を統合的に管理するソフトウェアの総称です。システムの稼働監視、業務の自動化、資産管理、セキュリティ管理などを一元的に行うことで、運用効率化と安定稼働を支援します。

今回はJP1の中でも、JP1/IM3という統合監視ツールを使って、オンプレからAWSまでのシステム全体の状況を可視化して把握する「オブザーバビリティ」を実現する場合の、マルチアカウントでの対応について検証パターンを見ていきたいと思います。

今回検証したことと環境について

検証内容

file
別アカウント(アカウントA)にあるCloudWatch AlarmのアラートをアカウントBのJP1に集約させるにはどうすればいいか?

環境

  • JP1:IM3(13.5)
  • Python(EC2):3.13.7
  • Python(Lambda):3.13
  • AWS CLI:2.28.20

1. 検証パターン1(JP1の転送機能を使う)

file
JP1には自身が収集したイベント情報を転送できる機能がございます。その機能を使えばアカウント間でアラート情報を転送することが可能です。

私が試みた、イベント転送の手順について下記にて記載させていただきます。


※アカウント内のCloudWatch Alarmのイベント情報をJP1で収集方法は、CloudWatch連携ツールというものを使用します。詳しくは下記をご参照ください。

参考:11.6 AWS CloudWatch連携ツール

https://itpfdoc.hitachi.co.jp/manuals/3021/30213L0600/IMRB0530.HTM

メリット/デメリット

  • メリット
    • JP1の設定はそれほど難しくない
  • デメリット
    • アカウントAに別途JP1を立てる必要があり、手間と、ライセンス費用が発生する

設定手順

1-1. フォルダーの検索欄から下記ファイルを開く

<JP1フォルダ>\JP1Base\conf\event\servers\default\forward

1-2. forwardファイルを下記の通り編集し、cmd + sでセーブする

to オンプレ環境ホスト名
B.ID NOTIN FFFFFFFF
end-to

※<ホスト名>には実際のホスト名を入れてください。IPやドメイン名でOKです。
※送信ポートは自動で20098になります。アカウントB側でのファイアーウォールでブロックされる場合がございますので、許可しておいてください。
B.ID NOTIN FFFFFFFFはすべてのイベントを転送するという設定です。他にも設定がございますので下記JP1/Base 運用ガイドをご確認ください。

参考:JP1/Base 運用ガイド(6.5.3 フィルターの文法)

https://itpfdoc.hitachi.co.jp/manuals/3020/30203k0660/BASE0111.HTM

1-3. PowerShellで下記コマンドを実行し、転送ファイルを有効化する

$ jevreload

2. 検証パターン2(CloudWatchのメトリクス共有機能を使う)

file
CloudWatchにはMetricsを他のアカウントに共有する機能があります。この機能を使えば、一つのアカウントにMetricsを集約させ、集約させたMetricsからAlarmを作成し、検証パターン1で説明したCloudWatch連携ツールを使えば、JP1で複数アカウントのAlarmの監視を行うことができます。

Metricsのアカウント間共有の設定方法については下記記事が大変わかりやすいので、ご参照ください。

参考:CloudWatchを別アカウントに共有して1アカウントでまとめてモニタリングしたい

https://dev.classmethod.jp/articles/shared-cloudwatch-to-other-account/

メリット/デメリット

  • メリット
    • アカウントAにJP1を立てなくていいので、手間とコストが下がる
  • デメリット
    • Metricsのアカウント共有の手順が少し手間
    • アカウントAに共有したくないMetricsが存在する場合に、アカウントA内で収集したいMetricsを一個一個Policyに追加することが必要になり、少し手間になる

3. 検証パターン3(CloudWatch連携ツールの収集先を別アカウントで指定する)

file
検証パターン1でも紹介しましたが、JP1/IM3ではCloudWatch連携ツールというCloudWatch Alarmの情報を吸い出すツールがあります。

CloudWatch連携ツールの実行ファイルは、AWS CLIを実行して、アラート情報を収集しているので、実行ファイル内の環境変数に、アカウントAで作成されているCloudWatch Alarmの情報を収集できるIAMアクセスキーのシークレット情報を入れれば、他アカウントのCloudWatch Alarmの情報を収集できます。

環境変数の設定箇所は下記ページの「(3) セットアップ」をご確認ください。

参考:11.6.2 導入(AWS CloudWatch連携ツール)

https://itpfdoc.hitachi.co.jp/manuals/3021/30213L0600/IMRB0532.HTM

メリット/デメリット

  • メリット
    • アカウントAではIAMアクセスキーの設定だけなので簡単
  • デメリット
    • 一つのアカウントからしか収集できない
    • アカウントAに共有したくないAlarmが存在する場合に、アカウントA内で収集したいAlarmを一個一個Policyに追加することが必要になり、少し手間になる

4. 検証パターン4(SQSにアラートメッセージを集約させ、連携ツールで読み取る)

file
CloudWatch Alarmに紐づくSNSでは、SQSの通知先をSQSに指定することで、アラート情報をSQSに貯めることができます。SQSに貯めたCloudWatchのAlert情報は、CloudWatch連携ツールを改修して、SQSから情報を吸い出せるようにすれば、利用可能です。

CloudWatch連携ツールの改修方法など、参照資料がなかったので、下記にて私が試した手順を記載させていただきます。

メリット/デメリット

  • メリット
    • アカウントAでは、SNSの通知先をSQSに向けるだけの簡単な作業だけになる
  • デメリット
    • JP1のCloudWatch連携ツールの改修が必要になる

手順

4-1. SQSの設定

4-1-1. 下記の設定を行い、「キューの作成」をクリックする

  • タイプ:FIFO
  • 名前:任意の名前.fifo
file

4-1-2. 「キューポリシー」 -> 「編集」をクリックする
file

4-1-3. 既存の設定にLambdaのサービスロールに権限を許可する下記ポリシーを追加する

{
      "Sid": "AllowSNSFromSpecificTopic",
      "Effect": "Allow",
      "Principal": {
        "Service": "sns.amazonaws.com"
      },
      "Action": "sqs:SendMessage",
      "Resource": "<SQSのARN>",
      "Condition": {
        "ArnEquals": {
          "aws:SourceArn": "<SNSのARN>"
        }
      }
    }

4-2. SNSの設定

4-2-1. CloudWatch Alarmに紐づくSNSに、下記の設定でサブスクリプションを作成する

  • プロトコル:Amazon SQS
  • エンドポイント:SQSのARN
    file

4-3. CloudWatch連携ツールの設定

4-3-1. JP1のCloudWatch連携ツールの実行ファイル(sendevent_aws.py)を下記のコードに置き換える

コード(sendevent_aws.py)

#!/usr/bin/env python3
"""
CloudWatch 連携ツール

CloudWatchからアラーム情報を取得し、イベント定義に従ってJP1イベントを登録する
"""

import sys
import os
import subprocess
import datetime
import csv
import json
import re
import argparse
import traceback
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
from logging import config, getLogger

###############################################################
# ユーザ可変値(値はダブルクォートで括る)
###############################################################
BASE_DIR = r"C:\Program Files (x86)\Hitachi\JP1Base"
DEFAULT_REGION = ""
TIMESTAMP_DIR_PATH = ""
SQS_QUEUE_URL = ""
SQS_MESSAGE_FILE = "sqs-message.log"
###############################################################

# バージョン
__version__ = "1.3"

# 内部定数(変更不可)
LOCAL_TIME_ZONE = datetime.timezone(datetime.timedelta(hours=+9))  # JST

LOGGING_CONF_NAME = "logging.conf"
TIMESTAMP_NAME = "timestamp"
EVENTCONF_CSV_NAME = "eventconf.csv"
EVENTCONF_CSV_COLUMN_COUNT_MIN = 8
EVENTIGNORE_CSV_NAME = "eventignore.csv"
EVENTIGNORE_CSV_COLUMN_COUNT = 3

ISO_DATE_FIX_REGEXP = re.compile("Z$")
ISO_DATE_FIX_AFTER = "+00:00"

NOW_DATETIME = datetime.datetime.now(datetime.timezone.utc)

SCRIPT_IO_ENCODING = "utf-8"

CONF_COMMENT_START_STRING = "#"

CSV_ESCAPE_CHAR = "\\"

SCRIPT_OPTION_NOW_NAME = "now"
SCRIPT_OPTION_NOW_SHORT = "-n"
SCRIPT_OPTION_NOW_LONG = "--%s" % (SCRIPT_OPTION_NOW_NAME)

EXIT_CODE_SUCCESS = 0
EXIT_CODE_ERROR = 255


ENV_KEY_PYTHONIOENCODING = "PYTHONIOENCODING"

JP1_JEVSEND_COMMAND = os.path.join(BASE_DIR, "bin", "jevsend")
JP1_JEVSEND_OPTION_EVENT_ID = "-i"
JP1_JEVSEND_OPTION_MESSAGE = "-m"
JP1_JEVSEND_OPTION_DESTINATION = "-d"
JP1_JEVSEND_OPTION_SOURCE = "-s"
JP1_JEVSEND_OPTION_EXATTR = "-e"
JP1_JEVSEND_OPTION_EXATTR_FORMAT = "%s=%s"
JP1_JEVSEND_OPTION_EXATTR_KEY_ALARM_TIMESTAMP = "AWS_ALARM_TIMESTAMP"
JP1_JEVSEND_OPTION_EXATTR_KEY_ALARM_DATETIME = "AWS_ALARM_DATETIME"
JP1_JEVSEND_OPTION_EXATTR_VALUE_DATETIME_FORMAT = "%Y/%m/%d %H:%M:%S"
JP1_JEVSEND_OPTION_MESSAGE_MAX_LENGTH = 1023

ARGUMENT_PARSER_ADD_ARGUMENT_ACTION_STORE_TRUE = "store_true"

MESSAGE_SANITIZE_RULE = {"\r": " ", "\n": " ", "\xa0": " "}

# グローバル変数
logger = None
jp1_cli_env = None
home_dir_path = None
logging_conf_path = None
timestamp_path = None
eventconf_csv_path = None
eventignore_csv_path = None
last_timestamp = None
eventconf_entries = []
eventignore_entries = []
use_now_timestamp = False
updated_timestamp = False


# クラス定義
class ParseException(Exception):
    """
    パース例外クラス
    """

    def __init__(self, msg, file, num=None):
        """
        コンストラクタ
        """

        self._msg = msg
        self._file = file
        self._num = num

    def __str__(self):
        """
        文字列化
        """

        if self._num is None:
            return "%s(file:%s)" % (self._msg, self._file)
        else:
            return "%s(file:%s, line_num:%d)" % (self._msg, self._file, self._num)


class EventConditionEntry:
    """
    イベント条件情報保持クラス
    """

    def __init__(self, alarm_name, name_space, metric_name):
        """
        コンストラクタ
        """

        self._alarm_name = alarm_name
        self._name_space = name_space
        self._metric_name = metric_name

    def __str__(self):
        """
        文字列化
        """

        return "%s:{alarm_name:%s, name_space:%s, metric_name:%s}" % (
            "EventConditionEntry",
            self._alarm_name,
            self._name_space,
            self._metric_name,
        )

    @property
    def alarm_name(self):
        """
        アラーム名取得
        """

        return self._alarm_name

    @property
    def name_space(self):
        """
        ネームスペース取得
        """

        return self._name_space

    @property
    def metric_name(self):
        """
        メトリック名取得
        """

        return self._metric_name


class EventIgnoreEntry(EventConditionEntry):
    """
    イベント除外ファイル情報保持クラス
    """

    def __str__(self):
        """
        文字列化
        """

        return "%s:{%s}" % ("EventIgnoreEntry", super().__str__())

    @classmethod
    def parse(cls, row):
        """
        パース
        """

        return cls(row[0], row[1], row[2])


class Jp1EventEntry:
    """
    JP1イベント情報保持クラス
    """

    def __init__(self, event_id, msg, dest, src, ex_attrs):
        """
        コンストラクタ
        """

        self._event_id = event_id
        self._msg = msg
        self._dest = dest
        self._src = src
        self._ex_attrs = list(ex_attrs)

    def __str__(self):
        """
        文字列化
        """

        return "%s:{event_id:%s, msg:%s, dest:%s, src:%s, ex_attrs:%s}" % (
            "Jp1EventEntry",
            self._event_id,
            self._msg,
            self._dest,
            self._src,
            str(self._ex_attrs),
        )

    @property
    def event_id(self):
        """
        イベントID取得
        """

        return self._event_id

    @property
    def msg(self):
        """
        メッセージ取得
        """

        return self._msg

    @property
    def dest(self):
        """
        送信先イベントサーバ名取得
        """

        return self._dest

    @property
    def src(self):
        """
        送信元イベントサーバ名取得
        """

        return self._src

    @property
    def ex_attrs(self):
        """
        拡張属性取得取得
        """

        return self._ex_attrs


class EventConfEntry(EventConditionEntry, Jp1EventEntry):
    """
    イベント定義ファイル情報保持クラス
    """

    def __init__(
        self, alarm_name, name_space, metric_name, event_id, msg, dest, src, ex_attrs
    ):
        """
        コンストラクタ
        """

        EventConditionEntry.__init__(self, alarm_name, name_space, metric_name)
        Jp1EventEntry.__init__(self, event_id, msg, dest, src, ex_attrs)

    def __str__(self):
        """
        文字列化
        """

        return "%s:{%s, %s}" % (
            "EventConfEntry",
            EventConditionEntry.__str__(self),
            Jp1EventEntry.__str__(self),
        )

    @classmethod
    def parse(cls, row):
        """
        パース
        """

        return cls(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7:])


# 関数定義
def align_message_length(message):
    """
    メッセージの長さを揃える
    """

    lenb = 0
    all_lenb = 0
    result = ""

    # 全文字ループ
    for ch in list(message):
        lenb = len(ch.encode(SCRIPT_IO_ENCODING))
        if (lenb + all_lenb) > JP1_JEVSEND_OPTION_MESSAGE_MAX_LENGTH:
            # 最大バイト数を越えた場合
            break
        else:
            # 最大バイト数を越えない場合
            result += ch
            all_lenb += lenb

    # リターン
    return result


def sanitize_message(message):
    """
    メッセージの無害化
    """

    return (
        message.strip().translate(str.maketrans(MESSAGE_SANITIZE_RULE))
        if message
        else ""
    )


def iso_date_to_datetime(iso_date):
    """
    ISO日時文字列からdatetimeオブジェクトを生成
    """

    return datetime.datetime.fromisoformat(
        ISO_DATE_FIX_REGEXP.sub(ISO_DATE_FIX_AFTER, iso_date)
    )




def execute_jp1_cli(command_line_tokens):
    """
    JP1 CLI コマンド実行
    """

    return execute_cli(command_line_tokens, jp1_cli_env)


def execute_cli(command_line_tokens, command_env):
    """
    CLI コマンド実行
    """

    try:
        logger.debug("execute command line: %s" % (str(command_line_tokens)))
        return subprocess.run(
            command_line_tokens,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=command_env,
            check=True,
            shell=False,
            text=True
        )
    except (FileNotFoundError, subprocess.CalledProcessError) as e:
        logger.error("failed to execute command: %s" %
                     (str(command_line_tokens)))
        logger.error("    stdout: %s" % (e.stdout))
        logger.error("    stderr: %s" % (e.stderr))
        logger.error("    retcode: %s" % str(e.returncode))
        raise e


def build_jp1_jevsend_command_line_tokens(jp1_event_entry, state_reason, add_ex_attrs=None, account_id=None):
    """
    JP1イベント登録コマンドライン組み立て
    """

    result = []
    all_ex_attrs = []

    # JP1イベント登録コマンド
    result.append(JP1_JEVSEND_COMMAND)

    # イベントID
    if jp1_event_entry.event_id:
        result.append(JP1_JEVSEND_OPTION_EVENT_ID)
        result.append(jp1_event_entry.event_id)

    # メッセージ
    jp1_event_msg = None
    if jp1_event_entry.msg:
        jp1_event_msg = jp1_event_entry.msg
    elif state_reason:
        jp1_event_msg = state_reason

    if jp1_event_msg:
        # アカウントIDが提供されている場合は、メッセージに追加
        if account_id:
            jp1_event_msg = f"{jp1_event_msg} (Account ID:{account_id})"
        
        jp1_event_msg = align_message_length(sanitize_message(jp1_event_msg))
        if jp1_event_msg:
            result.append(JP1_JEVSEND_OPTION_MESSAGE)
            result.append(jp1_event_msg)

    # 送信先イベントサーバ名
    if jp1_event_entry.dest:
        result.append(JP1_JEVSEND_OPTION_DESTINATION)
        result.append(jp1_event_entry.dest)

    # 送信元イベントサーバ名
    if jp1_event_entry.src:
        result.append(JP1_JEVSEND_OPTION_SOURCE)
        result.append(jp1_event_entry.src)

    # 拡張属性
    if jp1_event_entry.ex_attrs:
        all_ex_attrs += jp1_event_entry.ex_attrs
    if add_ex_attrs:
        all_ex_attrs += add_ex_attrs
    if all_ex_attrs:
        for ex_attr in all_ex_attrs:
            if ex_attr:
                result.append(JP1_JEVSEND_OPTION_EXATTR)
                result.append(ex_attr)

    # リターン
    return result


def print_stderr(message, exc_info=None):
    """
    標準エラー出力
    """

    # メッセージを出力
    print(message, file=sys.stderr, end="")

    # 例外を出力
    if exc_info is None:
        # 例外存在しない場合、改行のみ出力
        print("", file=sys.stderr)
    else:
        # 例外が存在する場合のみ出力
        traceback.print_exception(*exc_info)


def log_aws_response(response_type, response_data):
    """
    レスポンスをログファイルに出力
    """
    
    if logger:
        logger.info("=== regist_jp1_event Response: %s ===" % response_type)
        
        # オブジェクトを文字列に変換してからログ出力
        if isinstance(response_data, dict):
            # 辞書の場合は、値を文字列に変換
            serializable_data = {}
            for key, value in response_data.items():
                if hasattr(value, '__str__'):
                    serializable_data[key] = str(value)
                else:
                    serializable_data[key] = value
            logger.info("Raw Response: %s" % json.dumps(serializable_data, ensure_ascii=False, indent=2))
        else:
            # その他の場合は文字列として出力
            logger.info("Raw Response: %s" % str(response_data))
            
        logger.info("=== End of regist_jp1_event Response: %s ===" % response_type)


def log_jp1_event_format(eventconf_entry, state_reason, add_ex_attrs, command_line_tokens, account_id=None):
    """
    JP1イベント登録用フォーマットをログファイルに出力
    """
    
    if logger:
        logger.info("=== JP1 Event Registration ===")
        logger.info("Event ID: %s" % eventconf_entry.event_id)
        
        # メッセージにアカウントIDを含める
        message = state_reason or eventconf_entry.msg
        if account_id:
            message = f"{message} (Account ID:{account_id})"
        logger.info("Message: %s" % message)
        
        logger.info("Destination: %s" % eventconf_entry.dest)
        logger.info("Source: %s" % eventconf_entry.src)
        logger.info("Extended Attributes: %s" % str(add_ex_attrs))
        logger.info("Command Line: %s" % " ".join(command_line_tokens))
        logger.info("=== End of JP1 Event Registration ===")



def load_logging_conf():
    """
    logging.conf読み込み
    """

    global logger

    # logging.conf読み込み
    config.fileConfig(logging_conf_path)

    # ロガー取得
    logger = getLogger(__name__)


def load_timestamp():
    """
    タイムスタンプ記録ファイル読み込み
    """

    global last_timestamp
    global updated_timestamp

    if use_now_timestamp:
        # 現在日時を使用する場合、現在日時を使用
        last_timestamp = NOW_DATETIME
        updated_timestamp = True
    else:
        # 現在日時を使用しない場合、タイムスタンプ記録ファイルから取得
        if os.path.isfile(timestamp_path):
            # ファイルが存在する場合
            with open(
                timestamp_path, "r", encoding=SCRIPT_IO_ENCODING
            ) as timestamp_file:
                # ファイル読み込み
                last_timestamp_str = timestamp_file.read().strip()

                # デバッグ
                logger.debug("last_timestamp_str: %s" % (last_timestamp_str))
                if last_timestamp_str:
                    # タイムスタンプ文字列が取得できた場合
                    last_timestamp = iso_date_to_datetime(last_timestamp_str)

    # デバッグ
    logger.debug("load timestamp: %s" % (str(last_timestamp or "")))


def load_eventignore_csv():
    """
    イベント除外ファイル読み込み
    """

    global eventignore_entries

    eventignore_entries = []

    if os.path.isfile(eventignore_csv_path):
        # イベント除外ファイルが存在する場合
        with open(
            eventignore_csv_path, "r", encoding=SCRIPT_IO_ENCODING
        ) as eventignore_csv_file:
            # ファイル読み込み
            csv_reader = csv.reader(
                eventignore_csv_file, doublequote=False, escapechar=CSV_ESCAPE_CHAR
            )
            for row in csv_reader:
                if len(row) > 0 and not row[0].startswith(CONF_COMMENT_START_STRING):
                    # 空行、コメント以外の場合
                    colmun_length = len(row)
                    if colmun_length != EVENTIGNORE_CSV_COLUMN_COUNT:
                        # カラム数が不正な場合
                        raise ParseException(
                            "invalid column count.(column_count:%d, columns:%s)"
                            % (colmun_length, str(row)),
                            eventignore_csv_path,
                            csv_reader.line_num,
                        )

                    # エントリ生成
                    eventignore_entry = EventIgnoreEntry.parse(row)
                    if (not eventignore_entry.alarm_name) and (
                        not eventignore_entry.name_space
                    ):
                        # 必須パラメータの指定がない
                        raise ParseException(
                            "Required parameter is not specified.",
                            eventignore_csv_path,
                            csv_reader.line_num,
                        )

                    # パース(リストに要素を追加)
                    eventignore_entries.append(eventignore_entry)
    else:
        # イベント除外ファイルが存在しない場合
        logger.error("eventignore.csv not exists.(path:%s)" %
                     (eventignore_csv_path))

    # デバッグ
    for eventignore_entry in eventignore_entries:
        logger.debug("eventignore_entry: %s" % (str(eventignore_entry)))


def load_eventconf_csv():
    """
    イベント定義ファイル読み込み
    """

    global eventconf_entries

    eventconf_entries = []

    if os.path.isfile(eventconf_csv_path):
        # イベント除外ファイルが存在する場合
        with open(
            eventconf_csv_path, "r", encoding=SCRIPT_IO_ENCODING
        ) as eventconf_csv_file:
            # ファイル読み込み
            csv_reader = csv.reader(
                eventconf_csv_file, doublequote=False, escapechar=CSV_ESCAPE_CHAR
            )
            for row in csv_reader:
                if len(row) > 0 and not row[0].startswith(CONF_COMMENT_START_STRING):
                    # 空行、コメント以外の場合
                    colmun_length = len(row)
                    if colmun_length < EVENTCONF_CSV_COLUMN_COUNT_MIN:
                        # カラム数が不正な場合
                        raise ParseException(
                            "invalid column count.(column_count:%d, columns:%s)"
                            % (colmun_length, str(row)),
                            eventconf_csv_path,
                            csv_reader.line_num,
                        )

                    # エントリ生成
                    eventconf_entry = EventConfEntry.parse(row)
                    if (not eventconf_entry.alarm_name) and (
                        not eventconf_entry.name_space
                    ):
                        # 必須パラメータの指定がない
                        raise ParseException(
                            "Required parameter is not specified.",
                            eventconf_csv_path,
                            csv_reader.line_num,
                        )

                    # パース(リストに要素を追加)
                    eventconf_entries.append(eventconf_entry)
    else:
        # イベント除外ファイルが存在しない場合
        logger.error("eventconf.csv not exists.(path:%s)" %
                     (eventconf_csv_path))

    # デバッグ
    for eventconf_entry in eventconf_entries:
        logger.debug("eventconf_entry: %s" % (str(eventconf_entry)))




def get_sqs_messages():
    """
    SQSからメッセージを取得(1回の呼び出しで1メッセージ)
    """
    
    result = []
    
    if SQS_QUEUE_URL:
        # SQSキューからメッセージを取得
        try:
            sqs_client = boto3.client(
                'sqs',
                region_name=DEFAULT_REGION
            )
            
            # メッセージを受信(1メッセージのみ)
            response = sqs_client.receive_message(
                QueueUrl=SQS_QUEUE_URL,
                MaxNumberOfMessages=1,
                WaitTimeSeconds=1
            )
            
            if 'Messages' in response:
                for message in response['Messages']:
                    try:
                        # メッセージボディをJSONとしてパース
                        message_body = json.loads(message['Body'])
                        result.append(message_body)
                        
                        # メッセージを削除(処理完了後)
                        sqs_client.delete_message(
                            QueueUrl=SQS_QUEUE_URL,
                            ReceiptHandle=message['ReceiptHandle']
                        )
                    except json.JSONDecodeError as e:
                        logger.error("Failed to parse SQS message: %s" % str(e))
                        continue
                        
        except (ClientError, NoCredentialsError) as e:
            logger.error("Failed to get SQS messages: %s" % str(e))
            raise e
    else:
        # ファイルからメッセージを読み取り(テスト用)
        if os.path.isfile(SQS_MESSAGE_FILE):
            with open(SQS_MESSAGE_FILE, "r", encoding=SCRIPT_IO_ENCODING) as f:
                try:
                    message_data = json.load(f)
                    result.append(message_data)
                except json.JSONDecodeError as e:
                    logger.error("Failed to parse SQS message file: %s" % str(e))
                    raise e
        else:
            logger.error("SQS message file not found: %s" % SQS_MESSAGE_FILE)
    
    return result


def parse_sns_message_to_alarm_data(sns_message):
    """
    SNSメッセージからアラームデータを抽出
    """
    
    alarm_data = {}
    
    try:
        # SNSメッセージのMessageフィールドをパース
        if 'Message' in sns_message:
            message_content = json.loads(sns_message['Message'])
            
            # アラーム情報を抽出
            alarm_data = {
                'AlarmName': message_content.get('AlarmName', ''),
                'AlarmDescription': message_content.get('AlarmDescription', ''),
                'AWSAccountId': message_content.get('AWSAccountId', ''),
                'NewStateValue': message_content.get('NewStateValue', ''),
                'NewStateReason': message_content.get('NewStateReason', ''),
                'StateChangeTime': message_content.get('StateChangeTime', ''),
                'Region': message_content.get('Region', ''),
                'AlarmArn': message_content.get('AlarmArn', ''),
                'OldStateValue': message_content.get('OldStateValue', ''),
                'Trigger': message_content.get('Trigger', {})
            }
            
            # メトリック情報を追加
            if 'Trigger' in message_content:
                trigger = message_content['Trigger']
                alarm_data['Namespace'] = trigger.get('Namespace', '')
                alarm_data['MetricName'] = trigger.get('MetricName', '')
                
    except (KeyError, json.JSONDecodeError) as e:
        logger.error("Failed to parse SNS message: %s" % str(e))
        raise e
    
    return alarm_data


def get_alarm_data_from_sqs():
    """
    SQSメッセージからアラームデータを取得(1回の呼び出しで1メッセージ)
    """
    
    result = []
    sqs_messages = get_sqs_messages()
    
    for message in sqs_messages:
        try:
            alarm_data = parse_sns_message_to_alarm_data(message)
            result.append(alarm_data)
        except Exception as e:
            logger.error("Failed to process SQS message: %s" % str(e))
            continue
    
    return result




def make_jp1_event_condition(alarm_data_list, event_entries):
    """
    JP1イベント条件作成(SQSメッセージ対応)
    """

    result = {}
    alarm_name = None

    for event_entry in event_entries:
        alarm_name = None
        if event_entry.alarm_name:
            # アラーム名が有効な場合
            alarm_name = event_entry.alarm_name
            if alarm_name and not alarm_name in result:
                # アラーム名が設定されている場合
                result[alarm_name] = event_entry

        elif event_entry.name_space:
            # ネームスペースが有効な場合
            for alarm_data in alarm_data_list:
                alarm_name = None

                if (
                    'Namespace' in alarm_data
                    and 'AlarmName' in alarm_data
                ):
                    # Namespaceが存在する場合
                    if (
                        event_entry.name_space
                        == alarm_data['Namespace']
                    ):
                        # ネームスペースと一致した場合
                        if event_entry.metric_name:
                            # メトリック名が有効な場合
                            if (
                                'MetricName' in alarm_data
                                and event_entry.metric_name
                                == alarm_data['MetricName']
                            ):
                                alarm_name = alarm_data['AlarmName']

                        else:
                            # メトリック名が無効な場合
                            alarm_name = alarm_data['AlarmName']

                    if alarm_name and not alarm_name in result:
                        # アラーム名が設定されている場合
                        result[alarm_name] = event_entry

    # リターン
    return result


def make_jp1_event_ignore_condition(alarm_data_list):
    """
    JP1イベント除外条件作成(SQSメッセージ対応)
    """

    result = None

    # 条件生成
    result = make_jp1_event_condition(alarm_data_list, eventignore_entries)

    # デバッグ
    for result_entry in result.items():
        logger.debug(
            "jp1_event_ignore_condition: %s:%s"
            % (result_entry[0], str(result_entry[1]))
        )

    return result


def make_jp1_event_regist_condition(alarm_data_list):
    """
    JP1イベント登録条件作成(SQSメッセージ対応)
    """

    result = None

    # 条件生成
    result = make_jp1_event_condition(alarm_data_list, eventconf_entries)

    log_aws_response("登録イベント条件", result)

    # デバッグ
    for result_entry in result.items():
        logger.debug(
            "jp1_event_regist_condition: %s:%s"
            % (result_entry[0], str(result_entry[1]))
        )

    return result


def regist_jp1_event_from_sqs(
    jp1_event_regist_condition, jp1_event_ignore_condition, alarm_data_list
):
    """
    JP1イベント登録(SQSメッセージ対応)
    """

    global last_timestamp
    global updated_timestamp

    registed_jp1_event_count = 0
    alarm_count = 0

    state_reason = None
    timestamp = None
    add_ex_attrs = None

    for alarm_data in alarm_data_list:
        alarm_count += 1

        # タイムスタンプの取得
        timestamp = None
        if 'StateChangeTime' in alarm_data and alarm_data['StateChangeTime']:
            # タイムスタンプが存在する場合
            timestamp = iso_date_to_datetime(alarm_data['StateChangeTime'])

        # 判定処理
        if 'AlarmName' in alarm_data and 'NewStateValue' in alarm_data:
            # AlarmName、NewStateValueが存在する場合
            alarm_name = alarm_data['AlarmName']
            
            if alarm_name in jp1_event_ignore_condition:
                # イベント除外条件に一致したらスキップ
                logger.debug("jp1 event ignore: %s" % (str(alarm_data)))
                continue

            if (
                alarm_name
                and alarm_data['NewStateValue'] == 'ALARM'
                and alarm_name in jp1_event_regist_condition
            ):
                # JP1イベント登録対象の場合
                eventconf_entry = jp1_event_regist_condition[alarm_name]

                # stateReasonの取得
                state_reason = alarm_data.get('NewStateReason', '')

                # 追加の拡張属性を生成
                add_ex_attrs = []

                if timestamp:
                    # アラーム発生日時(タイムスタンプ)
                    add_ex_attrs.append(JP1_JEVSEND_OPTION_EXATTR_FORMAT % (
                        JP1_JEVSEND_OPTION_EXATTR_KEY_ALARM_TIMESTAMP, str(int(timestamp.timestamp()))))

                    # アラーム発生日時(日付文字列)
                    add_ex_attrs.append(JP1_JEVSEND_OPTION_EXATTR_FORMAT % (JP1_JEVSEND_OPTION_EXATTR_KEY_ALARM_DATETIME, timestamp.astimezone(
                        LOCAL_TIME_ZONE).strftime(JP1_JEVSEND_OPTION_EXATTR_VALUE_DATETIME_FORMAT)))

                # コマンドライン組み立て
                account_id = alarm_data.get('AWSAccountId', '')
                command_line_tokens = build_jp1_jevsend_command_line_tokens(
                    eventconf_entry, state_reason, add_ex_attrs, account_id)

                # JP1イベント登録用フォーマットをログファイルに出力
                log_jp1_event_format(eventconf_entry, state_reason, add_ex_attrs, command_line_tokens, account_id)

                # コマンド実行
                execute_jp1_cli(command_line_tokens)

                # JP1イベント登録数カウント
                registed_jp1_event_count += 1

        if timestamp:
            # タイムスタンプが有効な値の場合、最終タイムスタンプを更新
            last_timestamp = timestamp
            updated_timestamp = True

    # JP1イベント登録数出力
    logger.info("alarm messages     : %d" % (alarm_count))
    logger.info("registed jp1 events: %d" % (registed_jp1_event_count))


def update_timestamp():
    """
    タイムスタンプ記録ファイル更新
    """

    if updated_timestamp:
        # タイムスタンプが更新されている場合のみ、ファイルへ書き込む
        last_timestamp_str = last_timestamp.isoformat() if last_timestamp else ""
        if last_timestamp_str:
            with open(
                timestamp_path, "w", encoding=SCRIPT_IO_ENCODING
            ) as timestamp_file:
                timestamp_file.write(last_timestamp_str)
            logger.debug("update timestamp: %s" % (last_timestamp_str))


def init():
    """
    初期化関数
    """

    global jp1_cli_env
    global home_dir_path
    global logging_conf_path
    global timestamp_path
    global eventconf_csv_path
    global eventignore_csv_path

    timestamp_dir_path = ""

    # 共通環境変数設定
    os.environ[ENV_KEY_PYTHONIOENCODING] = SCRIPT_IO_ENCODING

    # JP1 CLIコマンド用環境変数設定
    jp1_cli_env = os.environ.copy()

    # ホームディレクトリ設定
    home_dir_path = os.path.dirname(os.path.abspath(__file__))

    # タイムスタンプファイルパス設定
    timestamp_dir_path = TIMESTAMP_DIR_PATH if TIMESTAMP_DIR_PATH else home_dir_path
    timestamp_path = os.path.join(timestamp_dir_path, TIMESTAMP_NAME)

    # 各種ファイルパス設定
    logging_conf_path = os.path.join(home_dir_path, LOGGING_CONF_NAME)
    eventconf_csv_path = os.path.join(home_dir_path, EVENTCONF_CSV_NAME)
    eventignore_csv_path = os.path.join(home_dir_path, EVENTIGNORE_CSV_NAME)


def analyze_arguments():
    """
    引数解析
    """

    global use_now_timestamp

    parser = None
    args = None
    args_vars = None

    # 引数情報
    parser = argparse.ArgumentParser()
    parser.add_argument(
        SCRIPT_OPTION_NOW_SHORT,
        SCRIPT_OPTION_NOW_LONG,
        dest=SCRIPT_OPTION_NOW_NAME,
        action=ARGUMENT_PARSER_ADD_ARGUMENT_ACTION_STORE_TRUE,
    )

    # 引数解析
    args = parser.parse_args()

    # 引数から値を取得
    args_vars = vars(args)
    if SCRIPT_OPTION_NOW_NAME in args_vars and args_vars[SCRIPT_OPTION_NOW_NAME]:
        # -n/--nowオプションが指定されている場合
        use_now_timestamp = True


def dump_init_params():
    """
    初期パラメータのダンプ
    """

    global_vars = None
    param_value = ""
    target_param_names = [
        "BASE_DIR",
        "DEFAULT_REGION",
        "SQS_QUEUE_URL",
        "SQS_MESSAGE_FILE",
        "TIMESTAMP_DIR_PATH",
        "jp1_cli_env",
        "home_dir_path",
        "logging_conf_path",
        "timestamp_path",
        "eventconf_csv_path",
        "eventignore_csv_path",
    ]

    global_vars = globals()
    logger.debug("----------")
    for param_name in target_param_names:
        param_value = ""
        if param_name in global_vars:
            param_value = global_vars[param_name]
        logger.debug("%s: %s" % (param_name, param_value))
    logger.debug("----------")


def main():
    """
    メイン関数(SQSメッセージ対応)
    """

    registed_jp1_event_count = 0
    alarm_data_list = None

    # 引数解析
    analyze_arguments()

    # 初期処理
    init()

    # ログ設定ファイル読み込み
    try:
        load_logging_conf()
    except (KeyError, FileNotFoundError, IOError):
        print_stderr("invalid logging conf file.", sys.exc_info())
        return EXIT_CODE_ERROR

    # 開始ログ出力
    logger.info("start.(now_option:%s)" % (str(use_now_timestamp)))

    # デバッグ
    dump_init_params()

    # タイムスタンプ記録ファイル読み込み
    try:
        load_timestamp()
    except (IOError, ValueError):
        logger.exception("invalid timestamp file.")
        return EXIT_CODE_ERROR

    # イベント除外ファイル読み込み
    try:
        load_eventignore_csv()
    except (FileNotFoundError, IOError, ParseException):
        logger.exception("invalid eventignore csv file.")
        return EXIT_CODE_ERROR

    # イベント定義ファイル読み込み
    try:
        load_eventconf_csv()
    except (FileNotFoundError, IOError, ParseException):
        logger.exception("invalid eventconf csv file.")
        return EXIT_CODE_ERROR

    # SQSメッセージ処理ループ
    while True:
        # SQSからアラーム情報を取得(1メッセージずつ)
        try:
            alarm_data_list = get_alarm_data_from_sqs()
        except (FileNotFoundError, json.JSONDecodeError, ClientError, NoCredentialsError):
            logger.exception("failed to get alarm data from SQS.")
            return EXIT_CODE_ERROR

        # メッセージが取得できない場合は終了
        if not alarm_data_list:
            break

        # JP1イベント除外条件を作成
        jp1_event_ignore_condition = make_jp1_event_ignore_condition(
            alarm_data_list)

        # JP1イベント登録条件を作成
        jp1_event_regist_condition = make_jp1_event_regist_condition(
            alarm_data_list)

        # JP1イベント登録
        try:
            regist_jp1_event_from_sqs(
                jp1_event_regist_condition, jp1_event_ignore_condition, alarm_data_list
            )
            registed_jp1_event_count += len([data for data in alarm_data_list if data.get('NewStateValue') == 'ALARM'])
        except (FileNotFoundError, subprocess.CalledProcessError):
            logger.exception("failed to regist jp1 event.")
            return EXIT_CODE_ERROR

    # タイムスタンプ記録ファイル更新
    try:
        update_timestamp()
    except IOError:
        logger.exception("failed to update timestamp file.")
        return EXIT_CODE_ERROR

    # 情報ログ出力
    logger.info("registed jp1 events: %d" % (registed_jp1_event_count))

    # 終了ログ出力
    logger.info("finish")

    # 正常終了
    return EXIT_CODE_SUCCESS


# メイン処理実行
if __name__ == "__main__":
    sys.exit(main())


4-3-2. 以下環境変数を置き換える

  • DEFAULT_REGION = "<SQSのリージョン>"
  • SQS_QUEUE_URL = "<SQSのURL>"

4-3-3. 下記コマンドで実行し、SQSからメッセージが取得できれば完了
※キューにメッセージがない場合、この後の補足の手順を実施し、テスト用のメッセージをSQSに送信する

cd  <Pythonのファイルの場所>
python sendevent_aws_lambda.py

補足手順. テスト用にCloudWatch Alarmを一時的にAlarm状態にする

補足手順1. 下記コマンドを実行し、CloudWatch Alarmを一時的にAlarm状態にする

※<Alarm名>のところに実際のCloudWatch Alarmの名前を入れてください

$ aws cloudwatch set-alarm-state \
   --alarm-name <Alarm名> \
   --state-value ALARM \
   --state-reason "Temporarily putting to ALARM state for testing"

補足手順2. SQSにメッセージが格納されれば完了
file

まとめ

いかがでしょうか?JP1で他アカウントのAlertのイベント情報の収集方法について考えてみました。
JP1のAWSでのクロスアカウント対応についてはあまりネットに情報がなかったので、同じような境遇な方の参考になれば幸いです。

AUTHOR
kim
kim
記事URLをコピーしました