-->

    AWS Lambda Error: Data Transformation in Firehose. The Lambda function .. it returned an error result

    When you use a Lambda function to transform data in Firehose, you may come across this error in the destination error logs: "The Lambda function was successfully invoked but it returned an error result". If you check the related Lambda logs, you will most likely find a key error such as "lambda_handler  for record in event['Records']:  KeyError: 'Records'".

    Solution: 

    It basically boils down to how you have set up the Kinesis, Firehose and Lambda pipeline. If your Kinesis stream triggers a Lambda that delivers the data to Firehose, then the event your function receives is a Kinesis record event, with the records under the 'Records' key (see Using AWS Lambda with Amazon Kinesis). A sample event is below.

    {
        "Records": [
            {
                "kinesis": {
                    "kinesisSchemaVersion": "1.0",
                    "partitionKey": "1",
                    "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                    "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                    "approximateArrivalTimestamp": 1545084650.987
                },
                "eventSource": "aws:kinesis",
                "eventVersion": "1.0",
                "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
                "eventName": "aws:kinesis:record",
                "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
                "awsRegion": "us-east-2",
                "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
            },
            {
                "kinesis": {
                    "kinesisSchemaVersion": "1.0",
                    "partitionKey": "1",
                    "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                    "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                    "approximateArrivalTimestamp": 1545084711.166
                },
                "eventSource": "aws:kinesis",
                "eventVersion": "1.0",
                "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
                "eventName": "aws:kinesis:record",
                "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
                "awsRegion": "us-east-2",
                "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
            }
        ]
    }
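For this first setup, the record payload sits under record['kinesis']['data'] and is base64-encoded. The following is a minimal sketch of reading it, not the pipeline's actual handler; the function name decode_kinesis_records is hypothetical, and the event is a trimmed copy of the sample above that keeps only the fields the sketch reads.

```python
import base64


def decode_kinesis_records(event):
    """Return the decoded UTF-8 payload of every record in a Kinesis-triggered event."""
    payloads = []
    # A Kinesis trigger delivers records under the capitalised 'Records' key
    for record in event['Records']:
        # The payload is base64-encoded and nested under record['kinesis']['data']
        raw = base64.b64decode(record['kinesis']['data'])
        payloads.append(raw.decode('utf-8'))
    return payloads


# Trimmed copy of the sample event (hypothetical test input)
sample_event = {
    'Records': [
        {'kinesis': {'data': 'SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=='}},
        {'kinesis': {'data': 'VGhpcyBpcyBvbmx5IGEgdGVzdC4='}},
    ]
}

print(decode_kinesis_records(sample_event))
# prints ['Hello, this is a test.', 'This is only a test.']
```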
    

    The other setup is to have Firehose poll the Kinesis stream. This also gives you the flexibility to set up a transformation Lambda for Firehose (Amazon Kinesis Data Firehose Data Transformation). In this setup the records arrive under the lowercase 'records' key, and the sample event is as follows (see Using AWS Lambda with Amazon Kinesis Data Firehose).

    {
      "invocationId": "invoked123",
      "deliveryStreamArn": "aws:lambda:events",
      "region": "us-west-2",
      "records": [
        {
          "data": "SGVsbG8gV29ybGQ=",
          "recordId": "record1",
          "approximateArrivalTimestamp": 1510772160000,
          "kinesisRecordMetadata": {
            "shardId": "shardId-000000000000",
            "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
            "approximateArrivalTimestamp": "2012-04-23T18:25:43.511Z",
            "sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
            "subsequenceNumber": ""
          }
        },
        {
          "data": "SGVsbG8gV29ybGQ=",
          "recordId": "record2",
          "approximateArrivalTimestamp": 151077216000,
          "kinesisRecordMetadata": {
            "shardId": "shardId-000000000001",
            "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
            "approximateArrivalTimestamp": "2012-04-23T19:25:43.511Z",
            "sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
            "subsequenceNumber": ""
          }
        }
      ]
    }
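Since the two event shapes differ in the case of their top-level key, a handler written for one raises a KeyError on the other. The sketch below shows one way to check which shape arrived before looping; detect_event_source is a hypothetical helper, and it only tells these two setups apart (other AWS services also use a 'Records' key).

```python
def detect_event_source(event):
    """Classify a Lambda event as 'kinesis' or 'firehose' from its top-level key."""
    if 'Records' in event:
        # Kinesis stream trigger: payload is at record['kinesis']['data']
        return 'kinesis'
    if 'records' in event:
        # Firehose data transformation: payload is at record['data']
        return 'firehose'
    raise ValueError(
        'Unrecognised event shape, top-level keys: {}'.format(sorted(event))
    )


print(detect_event_source({'records': []}))
# prints firehose
```

Logging the top-level keys in the error message makes the mismatch visible in the Lambda logs instead of a bare KeyError.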

    In my use case I had the second type of setup, but I was still getting the key
    errors. I was running Python 3.9 and not decoding the record data properly. The
    working code is given below for reference. I'll explain the code in a later post.

    import base64
    import json
    import calendar
    import time

    print('Loading function')


    def lambda_handler(event, context):
        output = []
        now = calendar.timegm(time.gmtime())
        # Firehose passes records under the lowercase 'records' key
        for record in event['records']:
            print(record['recordId'])
            # Record data is base64-encoded: decode to text, then parse the JSON
            payload = base64.b64decode(record['data']).decode('utf-8')

            # Do custom processing on the payload here
            message = json.loads(payload)
            change = float(message['CHANGE'])

            if change < 0:
                trend = 'DOWN'
            elif change > 0:
                trend = 'POSITIVE'
            else:
                # CHANGE == 0 was unhandled before; 'FLAT' is a placeholder label
                trend = 'FLAT'

            # Construct output
            data_field = {
                'timestamp': now,
                'trend': trend,
                'SECTOR': message['SECTOR'],
                'CHANGE': change,
                'TICKER_SYMBOL': message['TICKER_SYMBOL'],
                'PRICE': float(message['PRICE'])
            }

            output_record = {
                'recordId': record['recordId'],
                'result': 'Ok',
                # To pass the record through unchanged instead:
                # 'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
                'data': base64.b64encode(json.dumps(data_field).encode('utf-8')).decode('utf-8')
            }
            output.append(output_record)

        print('Successfully processed {} records.'.format(len(event['records'])))

        return {'records': output}
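An uncaught exception anywhere in the handler fails the whole invocation, which is what surfaces as the "returned an error result" message. One possible way to soften this, sketched here under assumptions, is to catch errors per record and return that record with the result 'ProcessingFailed', which is one of the statuses Firehose accepts alongside 'Ok' and 'Dropped'. The name transform_safely and the transform callback are hypothetical and not part of the code above.

```python
import base64
import json


def transform_safely(event, transform):
    """Apply `transform` to each Firehose record, flagging bad records instead of raising."""
    output = []
    for record in event['records']:
        try:
            message = json.loads(base64.b64decode(record['data']).decode('utf-8'))
            data = json.dumps(transform(message)).encode('utf-8')
            output.append({
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(data).decode('utf-8'),
            })
        except (ValueError, KeyError, TypeError) as exc:
            # Bad base64, bad UTF-8 and bad JSON all raise ValueError subclasses
            print('Failed to process {}: {!r}'.format(record['recordId'], exc))
            output.append({
                'recordId': record['recordId'],
                'result': 'ProcessingFailed',
                'data': record['data'],  # hand the original data back untouched
            })
    return {'records': output}


# Hypothetical test event: one valid JSON record and one that is not JSON
event = {'records': [
    {'recordId': 'record1', 'data': base64.b64encode(b'{"PRICE": "1.5"}').decode('utf-8')},
    {'recordId': 'record2', 'data': base64.b64encode(b'not json').decode('utf-8')},
]}
result = transform_safely(event, lambda m: {'PRICE': float(m['PRICE'])})
print([r['result'] for r in result['records']])
# prints ['Ok', 'ProcessingFailed']
```

Building a small event like this and calling the function locally is also a quick way to check the decoding logic before deploying.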
