NTTドコモR&Dの技術ブログです。

Security Hubのベストプラクティスに違反したAWSリソースの検出

はじめに

NTTドコモR&Dイノベーション本部サービスイノベーション部ビッグデータ基盤担当一年目の筒井です。今日ではシステムの運用を可能な限り自動化しようという流れがあります。私の所属するチームでも積極的に推進しており、様々なシステムの運用の自動化に努めています。そのためAWSリソースのセキュリティチェックを自動化するためのアプリを作成したのでご紹介します。

課題

私の所属するチームでは社内用アプリ開発プラットフォームを運用しており、管理項目が多く運用に大きなヒューマンリソースを割いています。このプラットフォームはAWSをインフラ基盤として構築されており、社内のユーザーがこのAWS環境上に沢山のリソースを作成します。従来はセキュリティの観点で脆弱性のあるリソースを手動で確認していましたが、確認するリソース数が多く自動化し効率化する必要がありました。そのため、EventBridgeとLambdaを用いて、Security Hubのベストプラクティスに違反したリソースを検出し、Slackに通知するようなアプリを開発しました。

構成

本アプリのアーキテクチャは以下の通りです。まず、EventBridgeのSchedulerでLambdaを起動します。次に、LambdaがSecurity Hub上の違反リソースのリストを取得します。最後に、LambdaがSecrets ManagerからSlack通知用のトークンを取得し、違反リソースの一覧をSlackに通知する設計としました。

アーキテクチャ

Security Hub

AWSリソースの設定が、AWSの定めるベストプラクティスに違反している場合、Security Hubでそのリソースが検出・列挙されます。 ここで言うベストプラクティスとは、「AWS 基礎セキュリティのベストプラクティス(FSBP)」などの、AWSが推奨するセキュリティ基準のことです。 例えば、「S3.1」というコントロールIDでは、「S3バケットのパブリックアクセスブロック設定が有効になっているか」 が確認されます。これが無効になっていると、意図せずデータがインターネットに公開されるリスクがあるため違反となります。 本アプリでは、このように数あるチェック項目の中から特定の項目のIDを指定し、それに違反しているリソースのみをアカウント内から抽出して通知します。

EventBridge

EventBridge Schedulerは、指定したターゲットを任意のスケジュールで定期実行できるサービスです。 本アプリでは、ターゲットとしてLambda関数を登録し、定期的に起動するように設定しました。 構築時の注意点として、SchedulerがLambdaを起動できるように、適切な権限を持ったIAMロールを作成し、Schedulerに割り当てる必要があります。

Secrets Manager

Secrets ManagerはAPIキーやパスワードなどの機密情報を安全に保護・管理するサービスで、今回はSlackのアクセストークンを格納するために利用しました。 本アプリの動作にSecrets Managerは必須ではないのですが、Lambdaのコードのハードコーディングを避けるために用いました。

Lambda

Lambdaはサーバーの管理なしでコードを実行できるサービスで、本アプリではSecurity Hubからの情報取得やSlackへの通知といったメインの処理を担います。 Schedulerによって起動されたLambdaは、具体的に以下の3つのステップを実行します。

  1. Security Hubにアクセスし、違反リソースのリストを取得した後にcsvファイルにまとめる
  2. SlackへのtokenをSecrets Managerから取得する
  3. Slackへの通知

以下、それぞれのタスクのコードについて解説します。

1.Security Hubにアクセスし、違反リソースのリストを取得した後にcsvファイルにまとめる

LambdaはPythonのboto3ライブラリを用いてSecurity Hubにアクセスし、paginator.paginate() でSecurity Hub上の検出結果を取得します。Filtersの部分で取得するイベントを絞ります。 具体的には、セキュリティ基準に違反している(ComplianceStatus が FAILED)リソースの中で、まだ対応が完了しておらず(WorkflowState が NEW または NOTIFIED)、かつ現在も有効なデータ(RecordState が ACTIVE)であるものだけを抽出しています。

import boto3
import csv
import io

# 例: 'S3.1' や 'IAM.1' など、検出したいID
TARGET_CONTROL_IDS = ('S3.1', 'IAM.1', 'CR.1')

# Security Hubへの接続
securityhub_client = boto3.client('securityhub')
paginator = securityhub_client.get_paginator('get_findings')

# Security Hubのページ内を設定したフィルタで検索
response_iterator = paginator.paginate(
    Filters={
        'ComplianceStatus': [{'Value': 'FAILED', 'Comparison': 'EQUALS'}],
        'WorkflowState': [
            {'Value': 'NEW', 'Comparison': 'EQUALS'},
            {'Value': 'NOTIFIED', 'Comparison': 'EQUALS'}
        ],
        'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}]
    }
)

csv_output_data = []

for page in response_iterator:
    for finding in page['Findings']:
        generator_id = finding.get('GeneratorId', '')
        control_id = generator_id.split('/')[-1]

        if control_id in TARGET_CONTROL_IDS: 
            if finding.get('Resources'):
                resource_id = finding['Resources'][0]['Id']
            else:
                resource_id = 'N/A'
            
            violation_title = finding['Title']
            severity = finding['Severity']['Label']
            csv_output_data.append([resource_id, violation_title, control_id, severity])

# 検出結果をcsvにまとめる
with io.StringIO() as string_buffer:
    writer = csv.writer(string_buffer)
    writer.writerow(['リソース名', '違反事項', 'コントロールID', '重要度'])
    writer.writerows(csv_output_data)
    
    csv_content = string_buffer.getvalue()


print(csv_content)
2.SlackへのtokenをSecrets Managerから取得する

ハードコーディングを避けるためにSlackのワークスペースへのtokenをSecrets Managerに格納しました。tokenを取り出す際もSecurity Hubへのアクセスの時と同じくboto3を使用しました。

from slack-sdk import WebClient
import json 
import boto3

SLACK_SECRET_ARN = 'arn:aws:secretsmanager:ap-northeast-1:123456789012:secret:your-secret-name'

# Secrets Managerへの接続
secrets_manager_client = boto3.client('secretsmanager')

# シークレット値を取得
response = secrets_manager_client.get_secret_value(SecretId=SLACK_SECRET_ARN)

secret_string = response['SecretString']
secret_json = json.loads(secret_string)
SLACK_BOT_TOKEN = secret_json.get('token')

# 取得できているか以下のコメントを外して確認できますが、確認後は戻しましょう。
# print(SLACK_BOT_TOKEN)
3.Slackへの通知

Slackにメッセージと共にcsvファイルを通知します。slackへの通知はPythonのslack_sdkライブラリを使用しました。

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

SLACK_CHANNEL = 'C1234567890' # チャンネルID
ENVIRONMENT_NAME = 'アカウント名'
comment = f'🚨 {ENVIRONMENT_NAME} 環境で {len(csv_output_data)} 件の違反を検出しました。詳細は添付のCSVファイルを確認してください。'
file_name = 'security_hub_findings.csv'
file_content = csv_content 

slack_client = WebClient(token=SLACK_BOT_TOKEN)

# Slackの指定したチャンネルにメッセージを送信
try:
    response = slack_client.files_upload_v2(
        channel=SLACK_CHANNEL,      # 通知するチャンネルのID
        initial_comment=comment,    # 通知する際のコメント
        filename=file_name,         # ファイルの名前
        content=file_content        # CSVファイルの中身(テキストデータ)
    )
    print(f"File uploaded: {response.get('file').get('permalink')}")

except SlackApiError as e:
    print(f"Error uploading file: {e.response['error']}")

次の画像はSlackでの通知の様子です。無事にcsvファイルとして通知されています。

slackへの通知の様子

今後の展望

今回は定期的にSecurity Hubにアクセスし、違反リソースを検出するアプリを開発しました。 次のステップとして、違反リソースのオーナーに対し、Slackで自動的にメンションを送る機能を追加する予定です。

また、本アプリは既存の全リソースをチェックできる利点がある一方で、「作成後に検知するため即時性に欠ける」「修正の手戻りが発生する」といった課題もあります。 そのため、cdk-nagを用いてデプロイ前の段階で違反を検出できるよう、CI/CDパイプラインへの組み込みについても検討を進めています。

ここまで読んでいただき、ありがとうございました。