English 中文(简体)
Python lambda function returns ErrorType KeyError
原标题:

i am creating a lambda function for Certification expiration Alert, and getting this error message when Test run

Response

{
  "errorMessage": " detail-type ",
  "errorType": "KeyError",
  "requestId": "5449b430-e32a-4645-93b0-f204e92ef6e6",
  "stackTrace": [
    "  File "/var/task/lambda_function.py", line 25, in lambda_handler
    if (event [ detail-type ] == "ACM Certificate Approaching Expiration"):
"
  ]
}

Function Logs

START RequestId: 5449b430-e32a-4645-93b0-f204e92ef6e6 Version: $LATEST
[ERROR] KeyError:  detail-type 
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 25, in lambda_handler
    if (event [ detail-type ] == "ACM Certificate Approaching Expiration"):END RequestId: 5449b430-e32a-4645-93b0-f204e92ef6e6
REPORT RequestId: 5449b430-e32a-4645-93b0-f204e92ef6e6  Duration: 1.54 ms   Billed Duration: 2 ms   Memory Size: 128 MB Max Memory Used: 53 MB  Init Duration: 233.48 ms

Request ID 5449b430-e32a-4645-93b0-f204e92ef6e6

here it is the full lambda funtion

import json
import boto3
import os
from datetime import datetime, timedelta, timezone

# -------------------------------------------
# setup global data
# -------------------------------------------
utc = timezone.utc

# make today timezone aware
today = datetime.now().replace(tzinfo=utc)

# set up time window for alert - default to 45 if its missing
if os.environ.get( EXPIRY_DAYS ) is None:
    expiry_days = 45
else:
    expiry_days = int(os.environ[ EXPIRY_DAYS ])

expiry_window = today + timedelta(days = expiry_days)

def lambda_handler(event, context):

    # if this is coming from the ACM event, its for a single certificate
    if (event [ detail-type ] == "ACM Certificate Approaching Expiration"):
        
        response = handle_single_cert (event, context.invoked_function_arn)

    # otherwise, we need to get all the expiring certs that are expiring from CloudWatch Metrics
    else:
        response = handle_multiple_certs(event, context.invoked_function_arn)
    
    return {
         statusCode : 200,
         body : response 
    }


def handle_single_cert(event, context_arn):
    cert_client = boto3.client( acm )

    cert_details = cert_client.describe_certificate(CertificateArn=event[ resources ][0])

    result =  The following certificate is expiring within   + str(expiry_days) +   days:   + cert_details[ Certificate ][ DomainName ]
    
    # check the expiry window before logging to Security Hub and sending an SNS
    if cert_details[ Certificate ][ NotAfter ] < expiry_window:
        # This call is the text going into the SNS notification
        result = result +   (  + cert_details[ Certificate ][ CertificateArn ] +  )  

        # this call is publishing to SH
        result = result +   -   + log_finding_to_sh(event, cert_details, context_arn)
        
        # if there s an SNS topic, publish a notification to it
        if os.environ.get( SNS_TOPIC_ARN ) is None:
            response = result
        else:
            sns_client = boto3.client( sns )
            response = sns_client.publish(TopicArn=os.environ[ SNS_TOPIC_ARN ], Message=result, Subject= Certificate Expiration Notification )
        
    return result

def handle_multiple_certs(event, context_arn):
    cert_client = boto3.client( acm )

    cert_list = json.loads(get_expiring_cert_arns())
    
    if cert_list is None:
        response =  No certificates are expiring within   + str(expiry_days) +   days. 

    else:
        response =  The following certificates are expiring within   + str(expiry_days) +   days: 
 

        # loop through the cert list and pull out certs that are expiring within the expiry window
        for csl in cert_list:
            cert_arn = json.dumps(csl[ Dimensions ][0][ Value ]).replace( " ,   )
            cert_details = cert_client.describe_certificate(CertificateArn=cert_arn)

            if cert_details[ Certificate ][ NotAfter ] < expiry_window:
                current_cert =  Domain:  + cert_details[ Certificate ][ DomainName ] +   (  + cert_details[ Certificate ][ CertificateArn ] +  ), 
 
                print(current_cert)

                # this is publishing to SH
                result = log_finding_to_sh(event, cert_details, context_arn)

                # This is the text going into the SNS notification
                response = response + current_cert
                
    # if there s an SNS topic, publish a notification to it
    if os.environ.get( SNS_TOPIC_ARN ) is not None:
        sns_client = boto3.client( sns )
        response = sns_client.publish(TopicArn=os.environ[ SNS_TOPIC_ARN ], Message=response.rstrip( , 
 ), Subject= Certificate Expiration Notification )

    return response

def log_finding_to_sh(event, cert_details, context_arn):
    # setup for security hub
    sh_region = get_sh_region(event[ region ])
    sh_hub_arn = "arn:aws:securityhub:{0}:{1}:hub/default".format(sh_region, event[ account ])
    sh_product_arn = "arn:aws:securityhub:{0}:{1}:product/{1}/default".format(sh_region, event[ account ])

    # check if security hub is enabled, and if the hub arn exists
    sh_client = boto3.client( securityhub , region_name = sh_region)
    try:
        sh_enabled = sh_client.describe_hub(HubArn = sh_hub_arn)

    # the previous command throws an error indicating the hub doesn t exist or lambda doesn t have rights to it so we ll stop attempting to use it
    except Exception as error:
        sh_enabled = None
        print ( Default Security Hub product doesn t exist )
        response =  Security Hub disabled 
    
    # This is used to generate the URL to the cert in the Security Hub Findings to link directly to it
    cert_id = right(cert_details[ Certificate ][ CertificateArn ], 36)

    if sh_enabled:
        # set up a new findings list
        new_findings = []
    
            # add expiring certificate to the new findings list
        new_findings.append({
            "SchemaVersion": "2018-10-08",
            "Id": cert_id,
            "ProductArn": sh_product_arn,
            "GeneratorId": context_arn,
            "AwsAccountId": event[ account ],
            "Types": [
                "Software and Configuration Checks/AWS Config Analysis"
            ],
            "CreatedAt": event[ time ],
            "UpdatedAt": event[ time ],
            "Severity": {
                "Original":  89.0 ,
                "Label":  HIGH 
            },
            "Title":  Certificate expiration ,
            "Description":  cert expiry ,
             Remediation : {
                 Recommendation : {
                     Text :  A new certificate for   + cert_details[ Certificate ][ DomainName ] +   should be imported to replace the existing imported certificate before expiration ,
                     Url : "https://console.aws.amazon.com/acm/home?region=" + event[ region ] + "#/?id=" + cert_id
                }
            },
             Resources : [
                {
                     Id : event[ id ],
                     Type :  ACM Certificate ,
                     Partition :  aws ,
                     Region : event[ region ]
                }
            ],
             Compliance : { Status :  WARNING }
        })
    
        # push any new findings to security hub
        if new_findings:
            try:
                response = sh_client.batch_import_findings(Findings=new_findings)
    
                if response[ FailedCount ] > 0:
                    print("Failed to import {} findings".format(response[ FailedCount ]))
    
            except Exception as error:
                print("Error: ", error)
                raise
            
    return json.dumps(response)

def get_expiring_cert_arns():
    cert_list = []
    
    # Create CloudWatch client
    cloudwatch = boto3.client( cloudwatch )
    
    paginator = cloudwatch.get_paginator( list_metrics )
    
    for response in paginator.paginate(
        MetricName= DaysToExpiry ,
        Namespace= AWS/CertificateManager ,
        Dimensions=[{ Name :  CertificateArn }],):
            cert_list = cert_list + (response[ Metrics ])
            
    # return all certs that are expiring according to CW
    return json.dumps(cert_list)

# function to setup the sh region    
def get_sh_region(event_region):
    # security hub findings may need to go to a different region so set that here
    if os.environ.get( SECURITY_HUB_REGION ) is None:
        sh_region_local = event_region
    else:
        sh_region_local = os.environ[ SECURITY_HUB_REGION ]
    
    return sh_region_local
    
# quick function to trim off right side of a string
def right(value, count):
    # To get right part of string, use negative first index in slice.
    return value[-count:] 


  [1]: https://i.stack.imgur.com/uGjJ4.png
问题回答

You can first print the event to help you troubleshoot it by checking whether you are actually getting the key you expect ( detail-type ):

import json
print(f"Received: {json.dumps(obj=event, indent=2)})"

If there is a chance that you will not have detail-type , then you can either use try/except to deal with the key error, or use if event.get( detail-type ) to guard against it.

When you test your lambda function, you can create a JSON string to be used as a test event. Make sure that the test event has all the attributes that it needs. For example, a test lambda event is initially set to this:

{
  "key1": "value1",
  "key2": "value2",
  "key3": "value3"
}

You should edit the test event and add all the items that your code expects (find all your occurrences of event[ someKey ]. In your code, the list of keys I see are:

{
  "detail-type": "some value",
  "resources": "some list maybe with square brackets here",
  "region": "some-regions",
  "time": "some time",
  "id": "998877",
  "account": "12344567",
}

Once all the event[ keys ] are part of the test event, you should not see your error anymore.





相关问题
Can Django models use MySQL functions?

Is there a way to force Django models to pass a field to a MySQL function every time the model data is read or loaded? To clarify what I mean in SQL, I want the Django model to produce something like ...

An enterprise scheduler for python (like quartz)

I am looking for an enterprise tasks scheduler for python, like quartz is for Java. Requirements: Persistent: if the process restarts or the machine restarts, then all the jobs must stay there and ...

How to remove unique, then duplicate dictionaries in a list?

Given the following list that contains some duplicate and some unique dictionaries, what is the best method to remove unique dictionaries first, then reduce the duplicate dictionaries to single ...

What is suggested seed value to use with random.seed()?

Simple enough question: I m using python random module to generate random integers. I want to know what is the suggested value to use with the random.seed() function? Currently I am letting this ...

How can I make the PyDev editor selectively ignore errors?

I m using PyDev under Eclipse to write some Jython code. I ve got numerous instances where I need to do something like this: import com.work.project.component.client.Interface.ISubInterface as ...

How do I profile `paster serve` s startup time?

Python s paster serve app.ini is taking longer than I would like to be ready for the first request. I know how to profile requests with middleware, but how do I profile the initialization time? I ...

Pragmatically adding give-aways/freebies to an online store

Our business currently has an online store and recently we ve been offering free specials to our customers. Right now, we simply display the special and give the buyer a notice stating we will add the ...

Converting Dictionary to List? [duplicate]

I m trying to convert a Python dictionary into a Python list, in order to perform some calculations. #My dictionary dict = {} dict[ Capital ]="London" dict[ Food ]="Fish&Chips" dict[ 2012 ]="...

热门标签