Amazon CognitoのMFA登録(TOTP)〜MFAログインをLambdaで行う

AWS

はじめに

Amazon CognitoのMFA登録をLambdaで実装してみます。具体的には以下の処理です。
・ Cognitoのアクセストークンを利用してQRコードを生成
・ クライアントからMFAコードを受け取り、クライアントのMFA認証を有効にする
・ MFA認証を利用したログイン

前提

・認証方式には「TOTP」を利用します。
・今回はユーザ毎にMFA有効/無効を選択できる前提とするためユーザープールのMFA有効化は「OPTIONAL」に設定しておきます。
・Lambdaプロキシ統合を有効

MFA登録の流れ

CognitoIdentityProvider.Clientを利用します。

  • 1. associate_software_tokenでシークレットキーを取得
  • 「1」で取得した値をもとにQRコードを生成
  • クライアントでQRコードを読み込みMFAコードを生成
  • verify_software_tokenでMFAコード(TOTPトークン)を検証

公式ドキュメントの詳細な説明は以下を参照
CognitoIdentityProviderのAPI仕様
TOTP ソフトウェアトークン MF

MFAログインの流れ

  1. admin_initiate_authで認証パラメータを取得
  2. 「1」で取得した認証パラメータとユーザが入力したMFAコードを使い   admin_respond_to_auth_challengeで認証を行う

事前準備

Cognitoユーザの登録

以下の記事を参照ください。

Lambdaのレイヤー作成

今回はQRコード生成にqrcode/PILを利用し、IDトークンのデコードや検証にpyjwtを利用するのでLambdaのレイヤーを作成しておきます。
レイヤー作成方法については以下の記事を参照してください。
以下の記事ではqrcode/PILだけですが、requirements.txtにpyjwtを追加すればOKです。

MFA登録用のQRコードの仕様

QRコードにはCognitoより発行されたシークレットキーを埋め込みますが
それ以外にもTOTPの仕様(RFC6238)に従い設定する項目があります。
今回は以下の「google-authenticator」の仕様に従い実装してみます。
https://github.com/google/google-authenticator/wiki/Key-Uri-Format

■基本フォーマット

otpauth://TYPE/LABEL?PARAMETERS
(例)otpauth://totp/Example:alice@google.com?secret=JBSWY3DPEHPK3PXP&issuer=Example

■TYPE
認証方式を指定。
hotp または totp が指定可能。今回は totp を設定。

■LABEL
accountnameのみ、またはaccountnameとissuerを指定。
accountnameはアカウント、issuerは発行者の識別情報になります。今回は両方の値を設定します。

仕様は以下のとおりです。
・2つの値は「:」(コロン)で区切ります。
・accountname、issuerに「:」(コロン)は使用出来ません
・accountname、issuerはURLエンコードする必要があります。

どちらもユニークにする必要がありますが今回はCognitoの以下の情報を設定します。

項目設定値
accountnameアクセストークンの「username」クレーム
issuer任意の値

■PARAMETERS
必須項目の「Secret」と強い推奨の「Issuer」を設定します。
値はBase32でエンコードする必要があります。

項目名内容設定値
Secretシークレットキー(秘密鍵)boto3.client.associate_software_tokenで取得した値
Issuer発行者名任意の値

Lambda関数:MFA登録用のQRコード生成

処理内容
・ クライアントからアクセストークンを取得
・ associate_software_tokenでQRコード用のシークレットキーを生成
・ QRコードのLABEL用にアクセストークンからアカウント名を取得
・ 上記を元にSVG形式のQRコードを生成

QRコードを生成するためにアクセストークンのユーザー名(username)クレームを取得しています。
アクセストークンの中身は以下ドキュメントを参照。
アクセストークンの使用

import boto3
import io
import qrcode
import base64
import json
import qrcode.image.svg
import urllib.parse
import logging
import jwt

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):

    client = boto3.client('cognito-idp', region_name='ap-northeast-1')
    body_json = json.loads(event['body'])

    # アクセストークンを取得
    access_token = body_json['AccessToken']

    # アクセストークンのペイロードを取得
    access_token_payload = jwt.decode(access_token, options={"verify_signature": False})
    logger.info(access_token_payload)

    # Cognitoのアカウント名を取得
    account_name = access_token_payload['username']
    logger.info("sub is [" + account_name + "]")

    # 発行元。
    issuer = 'issuerName'

    # LABELパラメータを生成。アカウント名、発行者名に区切り文字であるコロン(:)は使用出来ない。
    # URLエンコードを実施。「/」もエンコード対象にするためsafeをクリア。
    label_str = urllib.parse.quote(account_name + ':' + issuer, safe='')

    # シークレットキーを取得
    response = client.associate_software_token(
        AccessToken=access_token,
    )
    logger.info("secret key is [" + response['SecretCode'] + "]")
#    base32_secret_code = base64.b32encode(response['SecretCode'].encode()).decode()
    base32_secret_code = response['SecretCode']

    # 発行元。
#    base32_issuer = base64.b32encode(issuer.encode()).decode()
    base32_issuer = issuer

    # QRコード用パラメータを生成
    parameters = 'secret=' +  base32_secret_code + '&issuer=' + base32_issuer

    # TOTPの仕様(RFC6238)に従ったQRコード用文字列
    qrcode_str = 'otpauth://totp/' + label_str + '?' + parameters
    logger.info("qrcode is [" + qrcode_str + "]")

    # QRコードの生成(SVG形式)
    encoded_image = make_base64_svg(qrcode_str)

    return {
        "statusCode": 200,
        "headers": {
            'Access-Control-Allow-Headers': 'Content-Type',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'OPTIONS,POST,GET',
        },
        "isBase64Encoded": True,
        "body": encoded_image,
    }

def make_base64_svg(data):
    qr = qrcode.QRCode(
        box_size=400,
        border=2,
    )
    qr.add_data(data)
    qr.make(fit=True)
    img = qr.make_image(image_factory=qrcode.image.svg.SvgPathImage,fill_color="black", back_color="white")
    buffered = io.BytesIO()
    img.save(buffered)
    buffered.flush()
    base64str = base64.b64encode(buffered.getvalue()).decode("utf-8")

    return base64str 

Lambda関数:MFAの有効化

・ クライアントからアクセストークンとMFAコードを受け取る
・ verify_software_tokenでMFAコードを検証
・ set_user_mfa_preferenceでMFAを有効化

import json
import logging
import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):

    body_json = json.loads(event['body'])
    access_token = body_json['accessToken']
    mfa_code = body_json['mfaCode']

    client = boto3.client('cognito-idp', region_name='ap-northeast-1')

    # MFAコードを検証する
    response = client.verify_software_token(
        AccessToken=access_token,
        UserCode=mfa_code,
    )

    # MFA(TOTP)を有効にする
    response = client.set_user_mfa_preference(
        SoftwareTokenMfaSettings={
            'Enabled': True,
            'PreferredMfa': True
        },
        AccessToken=access_token
    )

    return {
        "statusCode": 200,
        "headers": {
            'Access-Control-Allow-Headers': 'Content-Type',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'OPTIONS,POST,GET'
        },
        "isBase64Encoded": False,
        "body": json.dumps(response),
    }

Lambda関数:サインイン

・ admin_initiate_authを呼び出しサインインを行う
・ MFAを有効にしていないユーザの場合はこの処理で認証が完了し 
レスポンスでAuthenticationResultが返却される。
・ AuthenticationResultにはアクセストークン、IDトークン、リフレッシュトークンが含まれう
・MFAを有効にしているユーザの場合は、レスポンスに
 次のMFA認証に必要な認証パラメータ(Session、ChallengeParameters)が設定され
「ChallengeName」に”SOFTWARE_TOKEN_MFA”が設定される。
 この時点では認証完了ではないのでAuthenticationResultはレスポンスに含まれない
・認証フローは様々なタイプがあるが今回はサーバー側の認証で、SRPも利用しないので
 「ADMIN_USER_PASSWORD_AUTH」を設定
・管理系のAPIを利用するので「AmazonCognitoPowerUser」ポリシーをアタッチしておく

import json
import boto3
import logging
import botocore

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):

    # ユーザプールID
    user_poolid = 'yourUserPoolId'
    # クライアントID
    client_id = 'yourClientId'

    # アカウント情報
    username = ''
    userpass = ''
    if 'body' in event:
        body_json = json.loads(event['body'])
        username = body_json['userId'] if 'userId' in body_json else 'nobody'
        userpass = body_json['userPassword'] if 'userPassword' in body_json else 'nopassword'
    else:
        return {
            'statusCode': 400,
            'body': json.dumps('bad request.')
        }

    try:
        client = boto3.client('cognito-idp', region_name='ap-northeast-1')

        response = client.admin_initiate_auth(
            UserPoolId = user_poolid,
            ClientId = client_id,
            AuthFlow = "ADMIN_USER_PASSWORD_AUTH",
            AuthParameters = {
                "USERNAME": username,
                "PASSWORD": userpass,
            }
        )

        logger.info(response)

        return {
            'statusCode': 200,
            'isBase64Encoded': False,
            'headers': {
                'Access-Control-Allow-Headers': 'accept-encoding,content-length,sec-ch-ua,accept,accept-language,origin,user-agen,Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token',
                'Access-Control-Allow-Origin': '*',
                'Access-Control-Allow-Methods': 'GET,POST,OPTIONS',
                'Access-Control-Allow-Credentials' : True,
            },
            'body': json.dumps(response)
        }

    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] == 'UserNotFoundException':
            return {
                'statusCode': 200,
                'isBase64Encoded': False,
                'headers': {
                    'Access-Control-Allow-Headers': 'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token',
                    'Access-Control-Allow-Origin': '*',
                    'Access-Control-Allow-Methods': 'GET,POST,OPTIONS',
                    'Access-Control-Allow-Credentials' : True,    
                },
                'body': json.dumps('user not found.')
            }
        elif error.response['Error']['Code'] == 'NotAuthorizedException':
            return {
                'statusCode': 403,
                'isBase64Encoded': False,
                'headers': {
                    'Access-Control-Allow-Headers': 'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token',
                    'Access-Control-Allow-Origin': '*',
                    'Access-Control-Allow-Methods': 'GET,POST,OPTIONS',
                    'Access-Control-Allow-Credentials' : True,
                },
                'body': json.dumps('AuthorizeError.')
            }
        else:
            return {
                'statusCode': 403,
                'isBase64Encoded': False,
                'headers': {
                    'Access-Control-Allow-Headers': 'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token',
                    'Access-Control-Allow-Origin': '*',
                    'Access-Control-Allow-Methods': 'GET,POST,OPTIONS',
                    'Access-Control-Allow-Credentials' : True,
                },
                'body': json.dumps('UnknownError.')
            }

Lambda関数:サインイン/MFAコード送信

・ admin_initiate_authのレスポンスで返却された認証パラメータ(Sessionとアカウント名)と
 ユーザーが入力したMFAコードを受け取る
・ アカウント名は「ChallengeParameters」内の「USER_ID_FOR_SRP」で取得可能
・ 上記の値をadmin_respond_to_auth_challengeに設定し、認証を行う
・管理系のAPIを利用するので「AmazonCognitoPowerUser」ポリシーをアタッチしておく
・ boto3のadmin_respond_to_auth_challengeの仕様には設定パラメータの詳細記載が無いので非管理系のrespond_to_auth_challengeの仕様書を確認する

import json
import boto3
import logging
import botocore

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):

    # ユーザプールID
    user_poolid = 'yourUserPoolId'
    # クライアントID
    client_id = 'yourClientId'
    # 認証パラメータ
    body_json = json.loads(event['body'])
    mfa_code = body_json["mfaCode"]
    authParamSession = body_json["authParamSession"]
    authParamUserId = body_json["authParamUserId"]

    logger.info("mfa_code [" + mfa_code + "]")
    logger.info("authParamSession [" + authParamSession + "]")
    logger.info("authParamUserId [" + authParamUserId + "]")

    try:
        client = boto3.client('cognito-idp', region_name='ap-northeast-1')

        response = client.admin_respond_to_auth_challenge(
            UserPoolId = user_poolid,
            ClientId = client_id,
            ChallengeName = "SOFTWARE_TOKEN_MFA",
            Session = authParamSession,
            ChallengeResponses = {
                "USERNAME": authParamUserId,
                "SOFTWARE_TOKEN_MFA_CODE": mfa_code,
            }
        )

        return {
            'statusCode': 200,
            'headers': {
                'Access-Control-Allow-Headers': 'Content-Type',
                'Access-Control-Allow-Origin': '*',
                'Access-Control-Allow-Methods': 'OPTIONS,POST,GET'
            },
            'body': json.dumps(response)
        }

    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] == 'CodeMismatchException':
            return {
                'statusCode': 200,
                'body': json.dumps('AuthError...') + json.dumps(response)
            }
        else:
            raise error

テスト用HTML:サインイン

・ 「SignIn」ボタンでサインイン
・「MFA設定」ボタンでMFA設定用の画面に遷移
・ レスポンスと、各種トークン情報をテキストボックスで表示

<!DOCTYPE html>
<html>
  <head>
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/3.2.1/jquery.min.js"></script>
    <meta charset="UTF-8" />
    <title>Cognito SignIn</title>

    <script type="text/javascript">
    $(function(){
      //サインイン
      $("#signInButton").click(
        function(){
          var url = "https://xxx/signin";
          var JSONdata = {
            "userId": $("#userId").val(),
            "userPassword": $("#userPassword").val(),
          };

        $.ajax({
          type : 'post',
          url : url,
          data : JSON.stringify(JSONdata),
          contentType: 'application/json',
          dataType : 'json',
          scriptCharset: 'utf-8',
          success : function(data) {
            // MFAが有効かを判断
            if (data["ChallengeName"]) {
              if (data["ChallengeName"]=="SOFTWARE_TOKEN_MFA") {
                // MFA(TOTP)が有効の場合
                // RespondToAuthChallenge用の認証パラメータを取得
                sessionStorage.setItem("authParamSession", data["Session"]);
                sessionStorage.setItem("authParamUserId", data["ChallengeParameters"]["USER_ID_FOR_SRP"]);
                window.location.href = './mfalogin.html';
              }
            } else if (data["AuthenticationResult"]) {
              // MFAが無効の場合
              $("#AccessToken").val(data["AuthenticationResult"]["AccessToken"]);
              $("#RefreshToken").val(data["AuthenticationResult"]["RefreshToken"]);
              $("#IdToken").val(data["AuthenticationResult"]["IdToken"]);
            }
            $("#response").html(JSON.stringify(data));              
          },
          error : function(data) {
            $("#response").html(JSON.stringify(data));
          }
        });
      })

      // MFA設定
      $("#mfaSettingButton").click(
        function(){
          var url = "https://xxx/mfa"
          var JSONdata = {
            "AccessToken": $("#AccessToken").val(),
            "RefreshToken": $("#RefreshToken").val(),
            "IdToken": $("#IdToken").val(),
          };

          $.ajax({
            type : 'post',
            url : url,
            data : JSON.stringify(JSONdata),
            contentType: 'application/json',
            dataType : 'json',
            scriptCharset: 'utf-8',
            success : function(data) {
              sessionStorage.setItem("qrcode", data);
              sessionStorage.setItem("accessToken", $("#AccessToken").val());
              window.location.href = './mfa.html';
            },
            error : function(data) {
              $("#response").html(JSON.stringify(data));
            }
          })
      });
    });
    </script>
</head>
  <body>
    <h2>Cognito SignIn</h2>
    <p>User: <input size="30" type="text" id="userId"></p>
    <p>Password: <input size="30" type="password" id="userPassword"></p>
    <p>ResponsData</p>
    <textarea id="response" cols=120 rows=10 disabled></textarea>
    <p><button id="signInButton">SignIn</button></p>
    <p><button id="mfaSettingButton">MFA設定</button></p>
    <p>
      <label for="AccessToken">AccessToken</label>
      <input type="text" id="AccessToken" readonly size="100">
    </p>
    <p>
      <label for="RefreshToken">RefreshToken</label>
      <input type="text" id="RefreshToken" readonly size="100">
    </p>
    <p>
      <label for="IdToken">IdToken</label>
      <input type="text" id="IdToken" readonly size="100">
    </p>

  </body>
</html>

テスト用HTML:MFA設定(QRコード表示)

・MFA用のQRコードを表示
・MFAコードを入力し、「検証ボタン」を押下するとMFAが有効化される

<html>
    <head>
        <script src="https://ajax.googleapis.com/ajax/libs/jquery/3.2.1/jquery.min.js"></script>
        <meta charset="UTF-8" />
        <title>MFA登録</title>
        <script type="text/javascript">
            $(document).ready(function(){
            var qrcodeSvgText = sessionStorage.getItem("qrcode");
            $("#qrcodeImg").attr("src", "data:image/svg+xml;base64," + qrcodeSvgText);
        });

        $(function(){
            $("#mfaCodeVefifyButton").click(
                function(){
                    var url = "https://xxx/verify";
                    var accessToken = sessionStorage.getItem("accessToken");
                    var JSONdata = {
                        "mfaCode": $("#mfaCode").val(),
                        "accessToken": accessToken,
                };

                $.ajax({
                    type : 'post',
                    url : url,
                    data : JSON.stringify(JSONdata),
                    contentType: 'application/json',
                    dataType : 'json',
                    scriptCharset: 'utf-8',
                    success : function(data) {
                        $("#response").html(JSON.stringify(data));
                    },
                    error : function(data) {
                        $("#response").html(JSON.stringify(data));
                    }
                });
            })
        });

        </script>
    </head>
<body>
    <center>
    <img id="qrcodeImg" src="#" alt="MFACode" height="200" width="200" />
    <p>
      <label for="mfaCode">MFAコード</label>
      <input type="text" name="mfaCode" id="mfaCode" size="10"><button id="mfaCodeVefifyButton">検証</button>
    </p>
    </center>
    <p>ResponsData</p>
    <textarea id="response" cols=120 rows=10 disabled></textarea>
</body>
</html>

テスト用HTML:サインイン(MFAコード送信)

・ MFAが有効になっている場合、サインイン画面から遷移してくる
・ MFAコードを入力し、「送信」ボタンを押下するとサインインを実行

<html>
    <head>
        <script src="https://ajax.googleapis.com/ajax/libs/jquery/3.2.1/jquery.min.js"></script>
        <meta charset="UTF-8" />
        <title>MFAコード入力</title>
        <script type="text/javascript">
        $(function(){
            $("#mfaLogin").click(
                function(){
                    var url = "https://xxx/mfasignin";
                    var JSONdata = {
                        "mfaCode": $("#mfaCode").val(),
                        "authParamSession": sessionStorage.getItem("authParamSession"),
                        "authParamUserId": sessionStorage.getItem("authParamUserId")
                    };

                $.ajax({
                    type : 'post',
                    url : url,
                    data : JSON.stringify(JSONdata),
                    contentType: 'application/json',
                    dataType : 'json',
                    scriptCharset: 'utf-8',
                    success : function(data) {
                        $("#response").html(JSON.stringify(data));
                    },
                    error : function(data) {
                        $("#response").html(JSON.stringify(data));
                    }
                });
            })
        });

        </script>
    </head>
<body>
    <center>
    <p>
      <label for="mfaCode">MFAコード</label>
      <input type="text" name="mfaCode" id="mfaCode" size="10"><button id="mfaLogin">送信</button>
    </p>
    </center>
    <p>ResponsData</p>
    <textarea id="response" cols=120 rows=10 disabled></textarea>
</body>
</html>

以上

タイトルとURLをコピーしました