Skip to content

a8m/kinesis-producer

Repository files navigation

Amazon kinesis producer Build status License GoDoc

A KPL-like batch producer for Amazon Kinesis built on top of the official Go AWS SDK
and using the same aggregation format that KPL use.

Useful links

Example

package main

import (
	"time"

	"github.com/sirupsen/logrus"
	"github.com/a8m/kinesis-producer"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

func main() {
	client := kinesis.New(session.New(aws.NewConfig()))
	pr := producer.New(&producer.Config{
		StreamName:   "test",
		BacklogCount: 2000,
		Client:       client
	})

	pr.Start()

	// Handle failures
	go func() {
		for r := range pr.NotifyFailures() {
			// r contains `Data`, `PartitionKey` and `Error()`
			log.Error(r)
		}
	}()

	go func() {
		for i := 0; i < 5000; i++ {
			err := pr.Put([]byte("foo"), "bar")
			if err != nil {
				log.WithError(err).Fatal("error producing")
			}
		}
	}()

	time.Sleep(3 * time.Second)
	pr.Stop()
}

Specifying logger implementation

producer.Config takes an optional logging.Logger implementation.

Using a custom logger
customLogger := &CustomLogger{}

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       customLogger,
}

Using logrus

import (
	"github.com/sirupsen/logrus"
	producer "github.com/a8m/kinesis-producer"
	"github.com/a8m/kinesis-producer/loggers"
)

log := logrus.New()

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       loggers.Logrus(log),
}

kinesis-producer ships with three logger implementations.

  • producer.Standard used the standard library logger
  • loggers.Logrus uses logrus logger
  • loggers.Zap uses zap logger

License

MIT