Go module to push data to Kinesis respecting the limits
Darshil Chanpura 214d8d2393 using the same in main.go 4 months ago

Kinesis Pusher

This is intended to be used as a Go module. The sections below describe the functions to call to set it up.

Introduction

The module provides runner.Runner, which holds the AWS Kinesis client along with other details such as the AWS configuration, the Kinesis stream name, and the initial channel sizes.

runner.Runner has a few methods that need to be run as goroutines to start the tickers and consume the channels.

Prerequisites

This module is designed for AWS Kinesis. To use it, you will need the following:

  • An AWS account
  • A Kinesis stream
  • AWS credentials, for example via an IAM user or an IAM role attached through an instance profile
  • Credentials that allow the PutRecords and Describe* actions on the given Kinesis stream

Usage

Create runner.Runner

This creates a new runner.Runner with a channel buffer size of 1000 for pushing records and 10 for PutRecords API calls.

r := runner.New(1000, 10)

The Runner also has some builder-pattern options:

  • WithIntervals sets the intervals for checking shards and checking records.
  • WithAsync enables the async flag, so data is sent asynchronously without waiting for the PutRecords response.
  • WithDebug enables the debug flag, which writes metrics to the logger.

Configure the AWS Configuration

The AWS details needed are the region, the profile (when using a shared profile), and the Kinesis stream name.

// configure the underlying kinesis client
r.Configure(
	runner.NewAWSConfig("us-east-1", "default", "test-stream"),
)

(Optional) To enable metrics, use ConfigureWithMetrics() instead.

r.ConfigureWithMetrics(
	runner.NewAWSConfig("us-east-1", "default", "test-stream"),
)

Refresh the number of open shards first

A Kinesis stream can have some shards open (for reading and writing) and some closed (for reading only). Because rate limiting depends heavily on the number of shards, the runner first checks how many shards are open and applies its limits based on that count, so that data is pushed efficiently from the first cycle.

This call also returns an error if there is a problem with the credentials or if the stream does not exist, so there is no point in continuing when it fails.

if err := r.CheckShards(); err != nil {
	log.Fatal(err)
}
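To make the link between open shards and limits concrete, here is a minimal sketch of how a per-second write budget could be derived from the shard count. It is not the module's implementation: WriteBudget and budgetFor are hypothetical names, and the per-shard figures (1,000 records and 1 MiB per second) are the commonly documented Kinesis write limits, which should be checked against the current AWS quotas.

```go
package main

import "fmt"

// Assumed per-shard write limits; verify these against the current AWS Kinesis quotas.
const (
	recordsPerShardPerSec = 1000
	bytesPerShardPerSec   = 1 << 20 // 1 MiB
)

// WriteBudget is a hypothetical type describing how much may be written per second.
type WriteBudget struct {
	Records int
	Bytes   int
}

// budgetFor scales the assumed per-shard limits by the number of open shards.
// Negative inputs are treated as zero open shards.
func budgetFor(openShards int) WriteBudget {
	if openShards < 0 {
		openShards = 0
	}
	return WriteBudget{
		Records: openShards * recordsPerShardPerSec,
		Bytes:   openShards * bytesPerShardPerSec,
	}
}

func main() {
	b := budgetFor(4)
	fmt.Printf("records/s=%d bytes/s=%d\n", b.Records, b.Bytes)
}
```

Closed shards do not accept writes, which is why only the open ones count toward the budget.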

Start the runner

That is all it takes: a long-running goroutine starts, batches the incoming data, and pushes each batch to Kinesis using the PutRecords API.

// start the runner so that incoming data can be pushed
go r.Run()
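For readers curious about what such a batching goroutine might look like, the following is a rough sketch and not the code behind r.Run(). The Record type is a local stand-in with the two fields shown in this README, batchLoop and batchSizes are hypothetical names, and the limit of 500 records per batch is the documented PutRecords per-request maximum, treated here as an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// Record is a local stand-in mirroring the fields shown for runner.Record.
type Record struct {
	Data         []byte
	PartitionKey string
}

// maxBatch is the assumed PutRecords limit on records per request.
const maxBatch = 500

// batchLoop reads from in and calls flush when the batch is full,
// when the ticker fires, or when in is closed.
func batchLoop(in <-chan Record, interval time.Duration, flush func([]Record)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	batch := make([]Record, 0, maxBatch)
	emit := func() {
		if len(batch) == 0 {
			return
		}
		flush(batch)
		batch = make([]Record, 0, maxBatch) // fresh slice so flush may keep the old one
	}

	for {
		select {
		case rec, ok := <-in:
			if !ok {
				emit() // push whatever is left, then stop
				return
			}
			batch = append(batch, rec)
			if len(batch) == maxBatch {
				emit()
			}
		case <-ticker.C:
			emit()
		}
	}
}

// batchSizes feeds n records through batchLoop and returns the size of each flushed batch.
func batchSizes(n int) []int {
	in := make(chan Record, n)
	for i := 0; i < n; i++ {
		in <- Record{Data: []byte("hello\n"), PartitionKey: fmt.Sprintf("pk-%d", i%1000)}
	}
	close(in)

	var sizes []int
	batchLoop(in, time.Hour, func(b []Record) { sizes = append(sizes, len(b)) })
	return sizes
}

func main() {
	fmt.Println(batchSizes(1200))
}
```

The ticker keeps small trickles of records from waiting indefinitely for a full batch, while the size check keeps each request within the assumed per-request limit.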

Sending data

runner.Runner has an exported channel, RecordChan, which accepts records of type runner.Record. Sending a record looks like this:

r.RecordChan <- runner.Record{
	Data:         []byte("hello\n"),
	PartitionKey: fmt.Sprintf("pk-%d", time.Now().UnixNano()%1000),
}
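A plain channel send blocks once the buffer is full. If the caller would rather drop or retry than wait, a non-blocking send is one option. The sketch below uses a local Record type and a small buffered channel as stand-ins for runner.Record and r.RecordChan; trySend is a hypothetical helper, not part of the module.

```go
package main

import "fmt"

// Record is a local stand-in mirroring the fields shown for runner.Record.
type Record struct {
	Data         []byte
	PartitionKey string
}

// trySend enqueues rec without blocking and reports false when the buffer is full.
func trySend(ch chan<- Record, rec Record) bool {
	select {
	case ch <- rec:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan Record, 2) // stand-in for r.RecordChan with a tiny buffer
	for i := 0; i < 3; i++ {
		ok := trySend(ch, Record{
			Data:         []byte("hello\n"),
			PartitionKey: fmt.Sprintf("pk-%d", i),
		})
		fmt.Println(i, ok)
	}
}
```

Whether dropping is acceptable depends on the application; a blocking send, as in the example above, applies backpressure to the producer instead.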

Metrics (Optional)

Setup

Metrics matter here because records can fail and later be recovered, and both are worth tracking. Setting up metrics is optional. It is most convenient when the application already serves HTTP on a port, since the metrics handler can be added as a route on that server.

To enable metrics, configure the Runner via ConfigureWithMetrics().

Once enabled, register the handler with a call to r.Metrics.RegisterHandler(mux). In the future this may accept options such as Basic Auth.

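r := runner.New(1000, 10)

r.ConfigureWithMetrics(
	runner.NewAWSConfig("us-east-1", "default", "test-stream"),
)

// register on the default mux; a mux created with http.NewServeMux() can be passed the same way
r.Metrics.RegisterHandler(http.DefaultServeMux)

go r.Run()

// nil serves http.DefaultServeMux; log.Fatal reports a failed listen instead of silently dropping it
log.Fatal(http.ListenAndServe(":8001", nil))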
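When the application already has its own mux, the metrics route can sit next to the existing routes. The sketch below shows the idea with the standard library only: registerMetrics is a hypothetical stand-in for r.Metrics.RegisterHandler that serves a placeholder body, and /healthz is an invented application route.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// registerMetrics is a hypothetical stand-in for r.Metrics.RegisterHandler.
// It only serves a placeholder body on /metrics.
func registerMetrics(mux *http.ServeMux) {
	mux.HandleFunc("/metrics", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprintln(w, "# metrics placeholder")
	})
}

// newMux builds a dedicated mux carrying an application route and the metrics route.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusNoContent)
	})
	registerMetrics(mux)
	return mux
}

// statusOf performs an in-memory GET against h and returns the response status code.
func statusOf(h http.Handler, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	mux := newMux()
	fmt.Println(statusOf(mux, "/metrics"), statusOf(mux, "/healthz"), statusOf(mux, "/missing"))
}
```

A dedicated mux like this would then be passed to http.ListenAndServe as its second argument in place of nil.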

Details

Currently, two metrics are exposed via the /metrics endpoint.

  • Kinesis Push Delay: the time between sending data to r.echan for a Kinesis PutRecords call and that call finishing.
  • Kinesis Records Count: the number of records, broken down by success, failed, and recovered. Ideally the difference between failed and recovered is zero, meaning every failed record was recovered.
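The failed-minus-recovered check can be illustrated with a small tally. This is a sketch with hypothetical names (recordCounts, add, unrecovered), not the module's metrics code, and the numbers in main are made up for the example.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// recordCounts is a hypothetical tally mirroring the three labels of the records-count metric.
type recordCounts struct {
	success   int64
	failed    int64
	recovered int64
}

// add increments one counter atomically, so several goroutines may report results.
func add(counter *int64, n int64) {
	atomic.AddInt64(counter, n)
}

// unrecovered returns failed minus recovered; zero means every failed record was recovered.
func (c *recordCounts) unrecovered() int64 {
	return atomic.LoadInt64(&c.failed) - atomic.LoadInt64(&c.recovered)
}

func main() {
	var c recordCounts
	add(&c.success, 497)
	add(&c.failed, 3)
	add(&c.recovered, 2)
	fmt.Println("unrecovered:", c.unrecovered())
}
```

A value that stays above zero over time suggests records that failed and were never successfully retried.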

License

MIT