Cognito user pool カスタム認証チャレンジを利用した Email MFA
Cognito は MFA をサポートしていますが、SMS または TOTP のオプションからの選択のみが可能です。多くのウェブサイトでは、メールによる追加認証が提供されています。Cognito User Pool のカスタム認証チャレンジを使用して、この機能を実装できます。
前提条件
以下の手順で、GitHub からサンプルをローカルにクローンし、以下をインストールしてください。
ディレクトリ構成
./
|-- src/
| |-- create_auth_challenge/
| | `-- app.py
| |-- define_auth_challenge/
| | `-- app.py
| |-- example/
| | |-- main.py
| | `-- requirements.txt
| |-- layers/
| | `-- python/
| | `-- layers/
| | `-- cognito_custom_challenge_helper.py
| `-- verify_auth_challenge/
| `-- app.py
|-- tests/
|-- samconfig.toml
`-- template.yaml
Python スクリプト
Lambda Layer
カスタムチャレンジのリクエストとレスポンスを簡単に処理できるようにするために、以下のコードで src/layers/python/layers/cognito_custom_challenge_helper.py
を作成してください。
import copy
from enum import Enum
class CustomChallengeName(Enum):
SRP_A = 'SRP_A'
PASSWORD_VERIFIER = 'PASSWORD_VERIFIER'
CUSTOM_CHALLENGE = 'CUSTOM_CHALLENGE'
class Session:
def __init__(self, session: dict):
_session = copy.deepcopy(session)
self.challenge_name = _session['challengeName']
self.challenge_result = _session['challengeResult']
self.challenge_metadata = _session.get('challengeMetadata', '')
def is_srp_a(self) -> bool:
return self.challenge_name == CustomChallengeName.SRP_A.value \
and self.challenge_result is True
def is_password_verifier(self) -> bool:
return self.challenge_name == CustomChallengeName.PASSWORD_VERIFIER.value \
and self.challenge_result is True
def is_custom_challenge(self) -> bool:
return self.challenge_name == CustomChallengeName.CUSTOM_CHALLENGE.value
def can_issue_tokens(self) -> bool:
return self.is_custom_challenge() and self.challenge_result is True
class CustomChallengeRequest:
def __init__(self, event: dict):
_request = copy.deepcopy(event['request'])
_session = _request.get('session', [])
self.last_session = Session(_session[-1]) if _session else None
self.user_attributes = _request['userAttributes']
self.challenge_answer = _request.get('challengeAnswer', '')
self.private_challenge_parameters = _request.get('privateChallengeParameters', {})
def verify_answer(self) -> bool:
return self.private_challenge_parameters.get('answer') == self.challenge_answer
class CustomChallengeResponse:
def __init__(self, event: dict):
_response = copy.deepcopy(event['response'])
self._response = _response
def set_answer(self, answer: str) -> None:
self._response['privateChallengeParameters'] = {}
self._response['privateChallengeParameters']['answer'] = answer
def set_metadata(self, data: str) -> None:
self._response['challengeMetadata'] = data
def set_next_challenge(self, name: CustomChallengeName) -> None:
self._response['challengeName'] = name.value
self._response['issueTokens'] = False
self._response['failAuthentication'] = False
def set_answer_correct(self, correct: bool) -> None:
self._response['answerCorrect'] = correct
def issue_tokens(self) -> None:
self._response['challengeName'] = ''
self._response['issueTokens'] = True
self._response['failAuthentication'] = False
def fail(self) -> None:
self._response['issueTokens'] = False
self._response['failAuthentication'] = True
def __dict__(self) -> dict:
return self._response
Define Auth Challenge
カスタム認証フローを開始すると、Cognito は src/define_auth_challenge/app.py
に配置された “Define Auth challenge Lambda trigger” を呼びます。
from layers.cognito_custom_challenge_helper import CustomChallengeRequest, CustomChallengeResponse, CustomChallengeName
def lambda_handler(event: dict, context: dict) -> dict:
# Parse the event to create a request and response object.
request = CustomChallengeRequest(event)
response = CustomChallengeResponse(event)
last_session = request.last_session
if last_session.is_srp_a():
# When the last session is SRP_A, require the client to authenticate with a password.
response.set_next_challenge(CustomChallengeName.PASSWORD_VERIFIER)
elif last_session.is_password_verifier():
# When the last session is PASSWORD_VERIFIER, initiate the custom challenge.
response.set_next_challenge(CustomChallengeName.CUSTOM_CHALLENGE)
elif last_session.is_custom_challenge():
if last_session.can_issue_tokens():
# When the last session is CUSTOM_CHALLENGE and authentication has been completed, issue tokens.
response.issue_tokens()
else:
# When the last session is CUSTOM_CHALLENGE and the client is still during authentication flow,
# require the client to answer the next challenge.
response.set_next_challenge(CustomChallengeName.CUSTOM_CHALLENGE)
else:
# If the client is in an unexpected flow, the current authentication must fail.
response.fail()
event['response'] = response.__dict__()
return event
Create Auth Challenge
認証チャレンジフローでユーザーに送信される OTP コードを作成する際、Cognito は src/create_auth_challenge/app.py
に配置された “Create Auth challenge Lambda trigger” を呼び出します。
後述の AWS SAM テンプレートで指定された環境変数 CODE_LENGTH
および EMAIL_SENDER
を使用します。
import os
import random
import boto3
from layers.cognito_custom_challenge_helper import CustomChallengeRequest, CustomChallengeResponse
client = boto3.client('ses')
CODE_LENGTH = int(os.environ.get('CODE_LENGTH', 6))
EMAIL_SENDER = os.environ.get('EMAIL_SENDER')
def lambda_handler(event: dict, context: dict) -> dict:
# Parse the event to create a request object.
request = CustomChallengeRequest(event)
last_session = request.last_session
if last_session.is_custom_challenge():
# When the last session is a custom challenge, extract the otp code from the last session metadata.
code = last_session.challenge_metadata.replace('challenge-', '')
else:
# When the last session is not a custom challenge, generate an otp code and send it to the client.
code = generate_code()
message = create_message(code)
send_mail_to(request.user_attributes['email'], message)
# Create a response
response = CustomChallengeResponse(event)
response.set_answer(code)
response.set_metadata(f'challenge-{code}')
event['response'] = response.__dict__()
return event
def generate_code(length=CODE_LENGTH) -> str:
return str(random.randint(0, 10 ** length - 1)).zfill(length)
def create_message(code: str) -> str:
return f'Your authentication code: {code}'
def send_mail_to(email: str, body: str) -> None:
client.send_email(
Source=EMAIL_SENDER,
Destination={
'ToAddresses': [email],
},
Message={
'Subject': {
'Charset': 'UTF-8',
'Data': 'Authentication Code',
},
'Body': {
'Text': {
'Charset': 'UTF-8',
'Data': body,
},
},
},
)
Cognito では、認証フローセッションの期間を設定できます。
Verify Auth Challenge
OTP コードを検証する際、Cognito は src/verify_auth_challenge/app.py
に配置された “Verify Auth challenge Lambda trigger” を呼び出します。
from layers.cognito_custom_challenge_helper import CustomChallengeRequest, CustomChallengeResponse
def lambda_handler(event: dict, context: dict) -> dict:
# Parse the event to create a request and response object.
request = CustomChallengeRequest(event)
response = CustomChallengeResponse(event)
# Create a response.
correct = request.verify_answer()
response.set_answer_correct(correct)
event['response'] = response.__dict__()
return event
AWS リソース作成
AWS SAM テンプレート
カスタム認証チャレンジを有効にするには、ExplicitAuthFlows
内に ALLOW_CUSTOM_AUTH
(39行目)を設定します。各 AWS::Serverless::Function
内の CognitoEvent
が、Cognito Lambda トリガーとして機能する Lambda 関数にリンクされていることを確認してください。
Amazon SES を使用してメールを送信するためには、CreateAuthChallenge
関数内の Policies
の設定が必要です。
AWSTemplateFormatVersion: 2010-09-09
Transform: AWS::Serverless-2016-10-31
Description: Email MFA using Cognito User Pool custom authentication challenges
Globals:
Function:
Timeout: 3
MemorySize: 128
Parameters:
CodeLength:
Type: Number
Default: '6'
EmailSender:
Type: String
Resources:
CognitoUserPool:
Type: AWS::Cognito::UserPool
Properties:
UserPoolName: cognito-custom-auth-email-mfa
Policies:
PasswordPolicy:
MinimumLength: 8
UsernameAttributes:
- email
Schema:
- Name: email
AttributeDataType: String
Required: true
CognitoUserPoolClient:
Type: AWS::Cognito::UserPoolClient
Properties:
UserPoolId: !Ref CognitoUserPool
ClientName: client
GenerateSecret: false
ExplicitAuthFlows:
- ALLOW_CUSTOM_AUTH
- ALLOW_REFRESH_TOKEN_AUTH
- ALLOW_USER_SRP_AUTH
LambdaLayer:
Type: AWS::Serverless::LayerVersion
Properties:
LayerName: cognito_email_mfa_layer
ContentUri: src/layers
CompatibleRuntimes:
- python3.11
RetentionPolicy: Delete
CreateAuthChallenge:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/create_auth_challenge
Handler: app.lambda_handler
Runtime: python3.11
Layers:
- !Ref LambdaLayer
Environment:
Variables:
CODE_LENGTH: !Ref CodeLength
EMAIL_SENDER: !Ref EmailSender
Events:
CognitoEvent:
Type: Cognito
Properties:
Trigger: CreateAuthChallenge
UserPool: !Ref CognitoUserPool
Policies:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: ses:SendEmail
Resource: "*"
DefineAuthChallenge:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/define_auth_challenge
Handler: app.lambda_handler
Runtime: python3.11
Layers:
- !Ref LambdaLayer
Events:
CognitoEvent:
Type: Cognito
Properties:
Trigger: DefineAuthChallenge
UserPool: !Ref CognitoUserPool
VerifyAuthChallenge:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/verify_auth_challenge
Handler: app.lambda_handler
Runtime: python3.11
Layers:
- !Ref LambdaLayer
Events:
CognitoEvent:
Type: Cognito
Properties:
Trigger: VerifyAuthChallengeResponse
UserPool: !Ref CognitoUserPool
ビルドおよびデプロイ
<YOUR_SES_EMAIL_SENDER>
を送信元のメールアドレスに置き換え、次のコマンドでビルドおよびデプロイしてください。
OTP コードの長さを変更する場合は、CodeLength
パラメータも指定してください。
sam build
sam deploy --parameter-overrides EmailSender=<YOUR_SES_EMAIL_SENDER>
# sam deploy --parameter-overrides EmailSender=<YOUR_SES_EMAIL_SENDER> CodeLength=10
テスト
<YOUR_USER_POOL_ID>
および <YOUR_EMAIL>
を実際の値で置き換え、以下のコマンドで Cognito にテストユーザーを作成してください。
POOL_ID=<YOUR_USER_POOL_ID>
EMAIL=<YOUR_EMAIL>
# Add a Cognito user.
aws cognito-idp admin-create-user \
--user-pool-id $POOL_ID \
--username $EMAIL
# Make the user confirmation status "Confirmed"
echo -n 'Password: '
read password
aws cognito-idp admin-set-user-password \
--user-pool-id $POOL_ID \
--username $EMAIL \
--password $password \
--permanent
以下のコマンドを実行してください。
cd src/example
pip install -r requirements.txt
python main.py \
--pool-id <YOUR_USER_POOL_ID> \
--client-id <YOUR_CLIENT_ID> \
--username <YOUR_EMAIL> \
--password <YOUR_PASSWORD>
main.py
では、SRP - Secure Remote Password で必要な値を計算するために、以下のライブラリを使用しています。
AWS 公式のコード例もご参考ください。
クリーンアップ
以下のコマンドを使用して、プロビジョニングされた AWS リソースを削除してください。
sam delete
まとめ
Cognito のカスタム認証チャレンジを活用することで、メールによる MFA を実現できます。Cognito による正式対応が期待されます。
この投稿が皆様のお役に立てば幸いです。
Happy Coding! 🚀