Warning – this Kinesis Firehose stream and Lambda function tutorial can incur AWS costs. Unless you plan on performing the other tutorials, delete your AWS resources when finished to avoid incurring charges.
This tutorial was tested on macOS and Windows 10.
In this tutorial you create a semi-realistic example of using AWS Kinesis Firehose. Using the AWS Toolkit for PyCharm, you create a Lambda transformation function and deploy it to AWS CloudFormation using a Serverless Application Model (SAM) template. You then create the Kinesis Firehose stream and attach the Lambda function to the stream to transform the data.
Introduction
Amazon Web Services Kinesis Firehose is a service offered by Amazon for streaming large amounts of data in near real-time. Streaming data is continuously generated data that originates from many sources and is sent simultaneously in small payloads. Logs, Internet of Things (IoT) devices, and stock market data are three obvious data stream examples. Kinesis Firehose manages scaling for you transparently. Firehose allows you to load streaming data into Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk. You can also transform the data using a Lambda function. Firehose also supports encrypting and compressing the data so that it is secure and takes less space. For more information, refer to Amazon’s introduction to Kinesis Firehose.
If you prefer watching a video introduction, the following is a good Kinesis Firehose overview.
Other Tutorials
Although this tutorial stands alone, you might wish to view some more straightforward tutorials on Kinesis Firehose before continuing with this one. Here we add complexity by using PyCharm and an AWS Serverless Application Model (SAM) template to deploy a Lambda function.
- AWS Tutorial on Creating a Kinesis Firehose Stream
- Delivering Real-time Streaming Data to Amazon S3 Using Amazon Kinesis Data Firehose
The following is a good video demonstration of using Kinesis Firehose by Arpan Solanki. The example project focuses on the out-of-the-box functionality of Kinesis Firehose and will make this tutorial easier to understand.
Tasks Performed Here
In this tutorial you add more complexity to the more straightforward demonstrations of Kinesis Firehose. Rather than creating the Lambda function while creating the Kinesis stream, you create a more realistic Lambda function using PyCharm. Moreover, you deploy that function using an AWS Serverless Application Model (SAM) template. We perform the following tasks in this tutorial.
- Create and test a Kinesis Firehose stream.
- Create a Lambda function that applies a transformation to the stream data.
- Deploy the Lambda function using a Serverless Application Model (SAM) template.
- Modify the Kinesis Firehose stream to use the Lambda data transformer.
- Test the Kinesis Firehose stream.
- Trace and fix an error in the Lambda function.
- Redeploy the Lambda function.
- Test the Kinesis Firehose stream again.
Sample Project Architecture
Assume we have many locations that record the ambient temperature. We need to aggregate this data from the many different locations in almost real-time. We decide to use AWS Kinesis Firehose to stream data to an S3 bucket for further back-end processing.
Data is recorded as either Fahrenheit or Celsius depending upon the location sending the data. But the back-end needs the data standardized as Kelvin. To transform data in a Kinesis Firehose stream we use a Lambda transform function. The following illustrates the application’s architecture.
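The conversions the back-end requires can be sketched in plain Python. The helper name to_kelvin is hypothetical; it simply applies the standard Fahrenheit-to-Kelvin and Celsius-to-Kelvin formulas used later in this tutorial.

```python
# Hypothetical helper illustrating the conversions the stream must perform.
def to_kelvin(reading: str) -> float:
    """Convert a reading like '99.55F' or '33.22C' to Kelvin."""
    value, unit = float(reading[:-1]), reading[-1].upper()
    if unit == 'F':
        # Fahrenheit to Kelvin: (F + 459.67) * 5/9
        return (value + 459.67) * 5.0 / 9.0
    # Celsius to Kelvin: C + 273.15
    return value + 273.15

print(round(to_kelvin('32.00F'), 2))  # freezing point: 273.15
print(round(to_kelvin('0.00C'), 2))   # freezing point: 273.15
```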
Prerequisites
This tutorial expects you to have an AWS developer account and knowledge of the AWS console. You should have PyCharm with the AWS Toolkit installed and the AWS CLI also installed.
This tutorial requires a rudimentary knowledge of S3, CloudFormation and SAM templates, Lambda functions, and of course, Python. The following links should help if you are missing prerequisites.
- Creating and Activating an AWS Developer Account
- Getting Started with AWS
- Run a Serverless Hello World Application Tutorial
- AWS S3 Getting Started Guide
- CloudFormation
- Serverless Application Model (SAM)
- Python SDK for AWS
- AWS CLI
- JetBrains AWS Toolkit
Kinesis Firehose
AWS Kinesis Firehose is a fully managed service, so you create and configure a stream entirely through the AWS Console.
Create Stream
- Log in to the AWS Console and select Services and then Kinesis.
- Click Get Started if first time visiting Kinesis.
- Click Create delivery stream in the Firehose panel.
Name the Stream
- Name the delivery stream temperatureStream.
- Accept the default values for the remaining settings.
- Click Next.
A data producer is any application that sends data records to Kinesis Firehose. By selecting Direct PUT or other sources you are allowing producers to write records directly to the stream.
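For example, a producer could write directly to the stream from Python using boto3. This is only a sketch: the send_reading and make_record helper names are illustrative, and the put_record call requires boto3 installed and AWS credentials configured.

```python
import json

def make_record(station: str, temp: str) -> dict:
    # Firehose expects the payload under the 'Data' key, as bytes.
    payload = json.dumps({'station': station, 'temp': temp})
    return {'Data': payload.encode('utf-8')}

def send_reading(station: str, temp: str, stream: str = 'temperatureStream'):
    # Hypothetical producer: requires boto3 and configured AWS credentials.
    import boto3
    firehose = boto3.client('firehose')
    return firehose.put_record(DeliveryStreamName=stream,
                               Record=make_record(station, temp))

print(make_record('A1', '57.99f'))
```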
- Accept the default setting of Disabled for Transform source records with AWS Lambda and Convert record format.
- Click Next.
The Transform source records with AWS Lambda setting allows you to define a Lambda function. Later in this tutorial you will change this setting and define a Lambda function. For now, leave it disabled.
Configure S3 Bucket
- Select Amazon S3 as the Destination.
- Under the S3 destination, click Create new.
- Name the S3 bucket with a reasonable name (remember all names must be globally unique in S3). Here I use the name temperaturebucket123 as the bucket name and select the appropriate Region.
Configure Permissions
- Click Next.
- Accept the defaults and scroll to the Permissions section.
- Click Create new or choose to associate an IAM role to the stream.
- Create a role named temperature_stream_role (we return to this role in a moment) by accepting the defaults.
- Click Allow.
- Click Next after you are returned to the stream creation screen.
- Review the delivery stream and click Create delivery stream to create the stream.
- You should be taken to the list of streams and the Status of temperatureStream should be …Creating.
- After the stream’s status is Active, click on temperatureStream to be taken to the stream’s configuration page.
- Click on the IAM role to return to the role settings in IAM.
- Now, we are being very lazy (you would not do this in production): delete the attached policy and attach the AWSLambdaFullAccess, AmazonS3FullAccess, and AmazonKinesisFirehoseFullAccess policies.
Here we are granting the role too much access. In reality, you should grant the minimal access needed in a production setting.
Test Stream
For a simple stream such as the one you just created, AWS provides an easy means of testing your data. Let’s test your stream before continuing development.
- If not on the stream configuration screen, select the stream on the Kinesis dashboard to navigate to the stream’s configuration screen.
- Expand the Test with demo data section.
- Click the Start sending demo data button.
- Wait about a minute and click the Stop sending demo data button.
- From the Amazon S3 destination section click on the bucket name to navigate to the S3 bucket. Be certain to wait five minutes to give the data time to stream to the S3 bucket.
If you tire of waiting five minutes, return to the stream’s configuration and change the buffer time to a smaller interval than 300 seconds.
- Click on the sub-folders until taken to the data file. If you do not see the top level folder, then wait five minutes and refresh the page. Remember, the data is buffered.
- Open the file and you should see the test records written to the file.
- Navigate to the top level folder and delete the test data. Be certain you delete the top level folder and not the bucket itself.
- Open a command-line terminal on your computer and enter the following aws firehose put-record commands.
These commands worked with AWS CLI 1.18.11 on macOS, and they worked in Git-Bash on Windows 10. If you can get these working in the Windows 10 command-line, please post in the comments, as I wasted hours trying to send using cmd.
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="99.55F"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="33.22C"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="57.99f"'
You should see something similar to the following in your command-line terminal.
For details on the put-record command refer to the AWS reference page on the command (AWS CLI Command Reference: put-record).
- Return to the AWS Console and navigate to the S3 bucket and note the data was written to the bucket. Remember to allow the records time to process by waiting five minutes.
- Rather than sending a simple string, modify the commands to send JSON. Note that you escape the double-quotes.
See the warning above regarding the CLI's handling of input.
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"57.99f\"}"'
- Return to the AWS Console and you should see a file in the S3 bucket with data formatted as follows. Do not forget to give the record time to stream before checking the S3 bucket.
{"station":"A1","temp":"57.99f"}{"station":"A1","temp":"57.99f"}
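Note that Firehose concatenates records without adding any delimiter, which is why the two JSON objects run together in the file above. A back-end consumer has to split them itself; a minimal sketch using only the standard library:

```python
import json

def split_records(blob: str) -> list:
    """Split a string of concatenated JSON objects into a list of dicts."""
    decoder, pos, records = json.JSONDecoder(), 0, []
    while pos < len(blob):
        # raw_decode parses one object and returns where it ended.
        obj, pos = decoder.raw_decode(blob, pos)
        records.append(obj)
    return records

blob = '{"station":"A1","temp":"57.99f"}{"station":"A1","temp":"57.99f"}'
print(split_records(blob))  # two separate dicts
```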
In the sample architecture, note that you need to convert the temperature data to Kelvin. To accomplish this transformation you create a Lambda transform function for the Kinesis Firehose stream.
Lambda Function
Recall when creating the stream you were provided the option of transforming the data.
Although you left this feature disabled, the requirements dictate that you need to convert temperature readings from Fahrenheit or Celsius to Kelvin. Kinesis Firehose provides an easy way to transform data using a Lambda function. If you referred to any of the linked tutorials above, then you know that you can create and edit the Lambda function directly in the AWS console.
Here, instead, you develop a Python Lambda function in a local development environment, debug it, and then deploy it to AWS using a CloudFormation SAM template.
PyCharm
Hopefully you have installed PyCharm and the AWS Toolkit. If not, do so now. Refer to the prerequisites above for information on installing both.
- Start PyCharm.
- Create a new AWS Serverless Application named kelvinTempConversion.
- Click No if the following Create Project popup appears.
- Open the template.yaml file and notice the generated SAM template.
- Modify the timeout from 3 to 60 seconds (Kinesis Firehose requires a 60 second timeout).
- Right click the hello_world folder and select Refactor | Rename to rename the folder to kelvinConversion.
- After reviewing the changes to be made, click the Do Refactor button.
- Replace all instances of HelloWorld with KelvinConversion in template.yaml.
- Confirm the function timeout (Globals:Function:Timeout:) is set to 60 seconds.
- Remove the Events section and the KelvinConversionApi section. These two sections are for building a public REST API. As we are developing a transformation function for our stream, neither is needed.
- After modifying all instances of the hello world text, template.yaml should appear similar to the following.
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  AWS
  Sample SAM Template for AWS

Globals:
  Function:
    Timeout: 60

Resources:
  KelvinConversionFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: kelvinConversion/
      Handler: app.lambda_handler
      Runtime: python3.8

Outputs:
  KelvinConversionFunction:
    Description: "Kelvin Conversion Lambda Function ARN"
    Value: !GetAtt KelvinConversionFunction.Arn
  KelvinConversionFunctionIamRole:
    Description: "Implicit IAM Role created for Kelvin Conversion function"
    Value: !GetAtt KelvinConversionFunctionRole.Arn
- From the upper right drop down, select Edit Configurations.
- Modify the template to reflect the new folder.
- Click Ok.
- Select the dropdown item and click the green arrow to run the application.
/usr/local/bin/sam local invoke --template /Users/jamesabrannan/PycharmProjects/kelvinTempConversion/.aws-sam/build/template.yaml --event "/private/var/folders/xr/j9kyhs2n3gqcc0n1mct4g3lr0000gp/T/[Local] KelvinConversionFunction-event.json" KelvinConversionFunction
Invoking app.lambda_handler (python3.8)
Fetching lambci/lambda:python3.8 Docker container image......
Mounting /Users/jamesabrannan/PycharmProjects/kelvinTempConversion/.aws-sam/build/KelvinConversionFunction as /var/task:ro,delegated inside runtime container
START RequestId: 1ffa20fa-486e-1827-e987-e92f16101778 Version: $LATEST
END RequestId: 1ffa20fa-486e-1827-e987-e92f16101778
REPORT RequestId: 1ffa20fa-486e-1827-e987-e92f16101778 Init Duration: 531.94 ms Duration: 14.75 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 24 MB
{"statusCode":200,"body":"{\"message\": \"hello world\"}"}
- Now that you are assured the project is configured correctly and executes locally, open app.py and replace the sample code with the following. Note that the line using the index string function is in error. This error is by design and you will fix it later in the tutorial.
import base64
import json


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data']).decode('utf-8')
        print(payload)
        reading = json.loads(payload)
        print(reading)
        temp = reading['temp']
        print(temp)

        # note: this is in error; if Celsius, this causes an error
        # this is fixed later in the tutorial
        isfarenheit = bool(temp.upper().index('F') > 0)

        kelvin = 0
        if isfarenheit:
            print(float(temp.upper().strip('F')))
            kelvin = (float(temp.upper().strip('F')) + 459.67) * 5.0 / 9.0
        else:
            kelvin = float(temp.upper().strip('C')) + 273.15

        print("{:.2f}".format(kelvin))
        reading['temp'] = str("{:.2f}".format(kelvin))
        print(reading)

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(reading).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print('Processed {} records.'.format(len(event['records'])))

    return {'records': output}
Local Testing
To test the function locally you need an event template. There are several event types you can choose from, depending upon how the Lambda function is to be used.
- From Event Templates select Kinesis Firehose.
- Create the sample record {"station":"A1","temp":"99.33F"} and base64 encode the record. A good site to encode and decode is the base64encode.org website.
- Replace the data string generated by the Kinesis Firehose event template with the base64-encoded string.
- Run the application locally and you should see the returned record.
- Copy the data string and decode the record from base64.
- Validate the converted kelvin measurement is correct.
Note that you only tested Fahrenheit. This is by design, to illustrate debugging in the AWS Console. You fix this error later in this tutorial.
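If you prefer scripting the encoding rather than using a website, the same round trip is a few lines of Python. The sample record values are the ones used in this tutorial.

```python
import base64
import json

# Build the base64-encoded 'data' field for a Kinesis Firehose test event.
record = {'station': 'A1', 'temp': '99.33F'}
encoded = base64.b64encode(json.dumps(record).encode('utf-8')).decode('utf-8')
print(encoded)

# Decoding the function's output reverses the steps.
decoded = json.loads(base64.b64decode(encoded).decode('utf-8'))
print(decoded)
```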
Deploying Serverless Application
- Right click on template.yaml and select Deploy Serverless Application from the popup menu.
- Select Create Stack and name the stack kelvinTemperatureConversionStack.
- Select or create an S3 Bucket.
- Click Deploy.
- If you receive a credentials error, then you need to configure the AWS Toolkit correctly.
- At the extreme lower right of the window, click the message telling you the issue.
- After fixing credentials (if applicable) then try again. A dialog window should appear informing you of the deployment progress.
- Notice that the window is using SAM CLI commands to deploy the function to AWS.
Verifying the Lambda Function
After deploying you should verify the function was deployed correctly.
Lambda Function
- Navigate to the AWS Lambda service and you should see the newly created Lambda function.
S3 Bucket
- Navigate to the S3 buckets and you should see the newly created bucket used for deploying the Lambda function.
AWS CloudFormation
- Navigate to CloudFormation and you should see the created stack.
- Return to the Kinesis Firehose stream to add the Lambda function to the stream.
Modifying Kinesis Firehose Stream
- Navigate to the temperatureStream configuration page.
- Click Edit.
- Enable source record transformation in the Transform source records with AWS Lambda section.
- Select the Lambda function created and deployed by PyCharm.
- Click Save.
Testing Kinesis Firehose Stream Using CLI
- Open a command-line window and send several records to the stream. Be certain to escape the double-quotes, with the exception of the double quotes surrounding the data record.
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"57.99f\"}"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"89.90F\"}"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"22.20C\"}"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"12.76C\"}"'
- After waiting five minutes, navigate to the S3 bucket and you should see a new folder entitled processing-failed.
- Navigate down the processing-failed folder hierarchy and open the failure records.
- The error messages are not very informative. But at least they tell you the Lambda function processing caused the error.
- Navigate to the stream and select Amazon S3 Logs.
- The log message is also not very informative.
- Navigate to the Lambda function details.
- Select the LogStream from the most recent invocation of the Lambda function.
- The detailed log records the exact cause of the error: the index function. Unlike some languages such as Java, Python's index string method raises a ValueError if the substring is not found.
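The difference between the two string methods is easy to demonstrate with one of the tutorial's Celsius readings:

```python
# str.index raises ValueError when the substring is missing, while str.find
# returns -1. The Celsius readings therefore crashed the handler.
temp = '22.20C'

print(temp.upper().find('F'))   # -1, no exception raised

try:
    temp.upper().index('F')
except ValueError as err:
    print('index raised ValueError:', err)
```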
Fixing Error
- Return to the PyCharm project to fix the error and redeploy the Lambda function to AWS.
You might notice that you can edit a function directly in the AWS Console. DO NOT EDIT! Remember, you deployed this application using SAM in CloudFormation. The correct process is to fix the function and then redeploy it using SAM.
- Modify the function to use find rather than the index function.
isfarenheit = bool(temp.upper().find('F') > 0)
- Run the application locally using a Celsius value. As before, encode the input, decode the output, and verify the converted value.
- After testing, right click on template.yaml and redeploy the serverless application.
- Accept the Update Stack defaults.
- After clicking Deploy a popup window informs you of the deployment progress.
- Navigate to the Lambda function details in the AWS Console and you should see the corrected source code.
- From your command-line send several records to the stream.
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"12.76C\"}"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"57.99f\"}"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"89.90F\"}"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"22.20C\"}"'
- Navigate to the S3 bucket and you should see the transformed records.
Summary
In this tutorial you created a Kinesis Firehose stream and a Lambda transformation function. You configured the stream manually and used SAM to deploy the Lambda function. An obvious next step would be to add the creation of the Kinesis Firehose stream and associated bucket to the CloudFormation template in your PyCharm project. This tutorial was sparse on explanation, so refer to the many linked resources to better understand the technologies demonstrated here. However, this tutorial was intended to provide a variation on the numerous more straightforward Kinesis Firehose tutorials available.