Stream microservices logs – how Riemann simplifies things


This article is inspired by my first steps in Clojure. I want to revisit my considerations about streaming and analysing microservices logs. It is worth repeating: application logs are one of the most important tools for debugging problems on a production system. Nowadays it’s also important for a DevOps team to react when bad things happen in the system, so log analysis is a must-have if you want to detect problems in your application before your business client calls you at night. I think that a modern approach to log analysis would involve machine learning algorithms which can detect real issues more precisely and ignore the “noise” which is in the system all the time. This article shows an improved version of the architecture which originated there: 

This time I’m improving it by simplifying the whole architecture and using existing frameworks to achieve the goals.


As you can see in that design, the microservices (Python 3 programs in my example) push JSON logs directly to Kafka without using Fluentd to collect them. I think that’s a better approach because:

  1. Fluentd is out – one technology less, one problem less
  2. It’s easier to keep the configuration consistent between deployments and the local developer’s environment. You can use the same configuration on a Kubernetes cluster in the cloud and on local Docker containers. With Fluentd it’s not so straightforward, as Fluentd usually attaches to the microservices’ log files, which is hard to simulate on local Docker containers

An important point here is that microservices should have a standardized and consistent log structure. I would really advise creating logs in JSON format, as it explicitly declares what is in your logs and so eliminates parsing problems. Here is an example log:

{
  "level" : "info",
  "timestamp" : "2018-08-19T16:41:41+00:00",
  "path" : "",
  "line" : "10",
  "message": "My log"
}
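
A minimal sketch of how such a record could be produced with Python’s standard logging module. The JsonFormatter class and make_logger helper are hypothetical names used only for illustration, and the field names simply mirror the example log; the project itself may build its JSON differently.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Hypothetical formatter: renders each record as one JSON object."""

    def format(self, record):
        # record.created is a Unix timestamp; render it as ISO 8601 in UTC
        timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc)
        return json.dumps({
            "level": record.levelname.lower(),
            "timestamp": timestamp.isoformat(timespec="seconds"),
            "path": record.pathname,
            "line": str(record.lineno),
            "message": record.getMessage(),
        })


def make_logger(name="demo"):
    """Return a logger which prints JSON lines to stderr."""
    logger = logging.getLogger(name)
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
```

Calling make_logger().info("My log") would then print one JSON object per log call, which a downstream consumer can parse without any custom rules.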

That approach also has one disadvantage – the size of logs grows. If your application produces a large number of logs and you are afraid that repeating the name of every field in every log may be a problem, you can consider Avro or Protocol Buffers, which serialize data to a binary format and use a schema. Sadly they are not human readable, so consider adding a “switch” between production and non-production environments.

Again, Kafka acts in this design as an efficient buffer for logs and as a mechanism for decoupling infrastructure. If we stream logs to Kafka, they are kept even when a log consumer has problems or is down. As Kafka handles high throughput well, we don’t need to worry about our logs. We can also attach many consumers to the same logs, so alerting, storing them in S3 or Elasticsearch, feeding machine learning models, etc. are all possible. Kafka also decouples parts of the infrastructure, so we can avoid “infrastructure spaghetti” where X infrastructure components talk directly to Y other infrastructure components, causing complexity on the level of X*Y.
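
The X*Y remark can be made concrete with a little arithmetic. This is only a sketch with illustrative function names; it assumes that with a shared bus every component needs exactly one connection, to the bus itself.

```python
def direct_links(producers, consumers):
    """Point-to-point integration: every producer is wired to every consumer."""
    return producers * consumers


def links_via_bus(producers, consumers):
    """Integration through a shared bus such as Kafka: one connection each."""
    return producers + consumers
```

For example, 5 producers and 4 consumers need 20 direct integrations but only 9 connections when everything goes through the bus.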

The next part of the technology stack in this example is Riemann – a server/framework for handling event streams and attaching logic/alerts to them. Riemann is written in Clojure and lets you direct data streams from one source to another and analyse them on the way. You can find a great introduction to Riemann on that site:

I need to quote first paragraph from that article:

“For the last year I’ve been using nights and weekends to look to a variety of monitoring and logging tools. For reasons. I’ve spent a lot of hours playing with Nagios again (some years ago I wrote a book about it) as well as looking at tools like Sensu and Heka. One of the tools I am reviewing and am quite excited about is Riemann.”

My feelings about Riemann are the same. In this article Riemann will consume logs from Kafka, push them to Elasticsearch to allow searching using Kibana, and analyse them to alert the developer when a high number of errors happens in a particular Python file.

As always, the full source code can be found on GitHub:

Python microservice

So at first we need a microservice which will generate logs. I wrote a really dummy implementation which generates quite a big number of logs on info and error level. The first part is to create a Python logging handler which will push logs to Kafka:

import logging

from confluent_kafka import Producer


class KafkaHandler(logging.Handler):
    """Logging handler which publishes every formatted record to a Kafka topic."""

    def __init__(self, kafka_bootstrap_servers="kafka:9092", kafka_topic="program-logs", data_serializer_func=lambda v: v.encode('utf-8')):
        logging.Handler.__init__(self)
        self.producer = Producer({'bootstrap.servers': kafka_bootstrap_servers})
        self.kafka_topic = kafka_topic
        self.data_serializer_func = data_serializer_func

    def delivery_report(self, err, msg):
        # Called by the producer once per message, from poll() or flush()
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

    def emit(self, record):
        try:
            msg = self.format(record)
            self.producer.produce(self.kafka_topic, self.data_serializer_func(msg), callback=self.delivery_report)
            self.producer.poll(0)  # serve pending delivery callbacks without blocking
        except Exception:
            logging.Handler.handleError(self, record)

    def close(self):
        self.producer.flush()  # send anything still buffered before shutdown
        logging.Handler.close(self)

As you can see, it simply extends the logging handler by adding a Confluent Kafka producer which takes the JSON log string and pushes it to the program-logs Kafka topic. The delivery report is added to make diagnosis of the POC easier, but it should probably be removed in a real application.

Next part is to create JSON logs – I used for that example from – our logs will contain fields like:

  • message – log message
  • lineno – line number
  • pathname – name of python file
  • level – level of logs e.g. info, error

After configuration I simply start an infinite loop which calls 2 subprograms, to get different pathname values which will be used in Riemann.

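while True:
    print("Logging loop start")
    subprogram_1.random_log()  # hypothetical module name
    subprogram_2.random_log()  # hypothetical module name
    time.sleep(0.01)  # 10 ms between iterations
    print("Logging loop end")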

So that loop generates 2 log records every 10 ms, which is quite a big number.

from random import random
import structlog

def random_log():
    log = structlog.getLogger(__name__)
    if random() > 0.7:
        log.error("Fatal error")
    else:
        log.info("I'm fine")

Random is used to generate errors with lower probability than regular logs: roughly 30% of the records are errors.


To run that application as a POC we of course need a local environment which we can play with on a local machine. As I wrote in

“Docker is one of my favorite tool in IT – it gives me really fast access to already configured&installed tools.”

So of course we will use a docker-compose file to run the local environment:

version: '3'

    image: confluent/zookeeper:3.4.6-cp1

    image: confluent/kafka:
      - zookeeper:zookeeper

    image: riemannio/riemann:0.3.1
      - ./riemann.config:/etc/riemann.config
      - HOST_IP=


      - 5601:5601
      context: .

Comments to lines:

4, 7 – run Kafka with Zookeeper

15 – map the riemann.config Clojure file into the container to configure Riemann

17 – a little trick to access the host from a Docker container: this is my Wi-Fi address, so you should put your host IP there

19, 22 – run Elasticsearch and Kibana to store and browse logs

25 – publish Kibana on localhost:5601 to browse logs

26 – Python microservices, built using a Dockerfile which adds all Python files and installs requirements

Riemann in action

Here comes riemann.config. First we need to consume JSON logs from the Kafka topic. It’s very easy, as the Kafka consumer is part of Riemann core, so we just need:

(kafka-consumer {:consumer.config {:bootstrap.servers "kafka:9092"
                                   :group.id "riemann"} ; group name is an assumption
                 :topics ["program-logs"]})

So now Riemann consumes messages from Kafka and allows us to manipulate that stream. As the next step we want to push that stream to Elasticsearch.

(def elastic
  (elasticsearch {:es-endpoint "http://elasticsearch:9200"
                  :es-index "riemann"
                  :index-suffix "-yyyy.MM.dd"
                  :type "event"}
                  (fn [event]
                        (merge event {}))))
(streams
  (where (not (tagged "riemann")) elastic))

streams is a Riemann function which lets you attach logic to events. In this case we filter events to exclude those tagged “riemann”, as we don’t want to collect Riemann’s own events which report its internal metrics – we want only application events/logs. After that operation we can browse logs in Kibana. Please note that the JSON is exploded into fields in Elasticsearch, so we can easily filter by level or path.

Screenshot from 2018-08-18 23-05-02

The last part is to push notifications to the developer. On Ubuntu you can send a notification to the GUI from the terminal using the notify-send command. Sadly it’s not easy to use that command from a Docker container to send a notification to the host’s GUI. As a workaround I started the following bash script on the host machine (script found on Stack Overflow):


#!/bin/bash

# no multiple connections: needs to improve
while true; do
    line="$(netcat -l -p 10000)"
    notify-send -- "Received Message" "$line"
done

So that code listens on TCP port 10000 and sends a notification with the string received. All we need to do is send a string from the Riemann container to the host IP on port 10000 – that explains why I placed HOST_IP in the docker-compose file.

To send a string over a TCP socket we will use Java classes from Clojure/Riemann, as I did not find a Clojure-native way.

(defn- transmit
  [writer socket message]
  (let [bytes-out   (bytes (byte-array (map byte message)))]
    (.write writer bytes-out 0 (count bytes-out))
    (.flush writer)))

(defn send-warning
  [event]
  ; the output stream class is an assumption; any java.io.OutputStream works here
  (with-open [socket (java.net.Socket. (System/getenv "HOST_IP") 10000)
              writer (java.io.DataOutputStream. (.getOutputStream socket))]
    (transmit writer socket
              (str "FOUND PROBLEM IN " (get event :pathname) ":" (get event :lineno)))))

That program opens a TCP socket to HOST_IP:10000 and writes a warning string with pathname and lineno from the event, to inform the developer that something bad happened in their code.
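
For readers more at home in Python than Clojure, the same idea can be sketched as below. This is not the code running inside Riemann; the function names are made up for illustration, and the event is assumed to be a plain dict with pathname and lineno keys.

```python
import socket


def format_warning(event):
    """Build the same warning text as the Clojure send-warning function."""
    return "FOUND PROBLEM IN {}:{}".format(event["pathname"], event["lineno"])


def send_warning(event, host, port=10000, timeout=2.0):
    """Open a TCP connection, send the warning as UTF-8 bytes, then close it."""
    payload = format_warning(event).encode("utf-8")
    with socket.create_connection((host, port), timeout=timeout) as conn:
        conn.sendall(payload)
    return payload
```

With the netcat listener running on the host, send_warning({"pathname": "/app/a.py", "lineno": 10}, host_ip) should produce one desktop notification, where host_ip is whatever address the container can reach the host on.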

The final step is to attach alerting logic to the event stream. To make the example a little more advanced, let’s say that we want to send a warning only if logs with level error occurred more than 25 times within a 10-second window for a particular Python file.

(streams
    (where (= (:level event) "error")
        (with :metric 1
            (by :pathname
                (rate 10
                      (where (> metric 25)
                             send-warning))))))

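So to achieve that goal we add an additional field :metric to every event with error level. Then “by” splits the event stream into N streams grouped by :pathname. rate sums :metric over each 10-second window and divides the sum by the window length, so the emitted metric is a per-second rate. Strictly speaking, the check above therefore fires when a file logs more than 25 errors per second on average; a threshold of 2.5 would correspond to 25 errors per window. When the threshold is exceeded, a warning is sent over the TCP socket.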
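
The windowing logic can be approximated in a few lines of Python. This is a simplified sketch, not Riemann’s implementation: it assumes each event is a dict with level, pathname and an integer time in seconds, and it uses fixed windows aligned to multiples of the window size. The function names are hypothetical.

```python
from collections import defaultdict


def error_rates(events, window=10):
    """Group error events by pathname into fixed windows and return
    {(pathname, window_start): errors_per_second}."""
    sums = defaultdict(int)
    for event in events:
        if event["level"] != "error":
            continue
        window_start = (event["time"] // window) * window
        sums[(event["pathname"], window_start)] += 1  # plays the role of :metric 1
    # Dividing by the window length mirrors how rate reports a per-second value
    return {key: total / window for key, total in sums.items()}


def alerts(events, window=10, threshold=25):
    """Return the (pathname, window_start) keys whose rate exceeds the threshold."""
    rates = error_rates(events, window)
    return sorted(key for key, rate in rates.items() if rate > threshold)
```

With 300 errors from one file inside a single 10-second window the rate is 30 per second and the file is reported; 100 errors give a rate of 10 and stay below the threshold.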


So that’s the first article showing the power of Riemann – I promise I will come back to Riemann in the next article with some more advanced alerting strategy, as it’s real fun to write in Clojure.

Form your cloud#1 Please smile! Highly available web camera photo upload to s3 bucket


This article shows how to create a simple, highly available web application which allows you to upload a photo from your web camera to an S3 bucket. The whole example can be done using the AWS Free Tier. Technology stack:

  • Terraform
  • Ansible
  • Packer
  • AWS EC2
  • AWS S3
  • A piece of HTML5 & JS
  • A piece of Go
  • Certbot

Full source can be found there: 

The application contains 3 main parts:

  • Frontend
    • HTML5 video capture from camera
    • Snap photo and send it to backend using JQuery Ajax call
    • List existing photos in bucket as hyperlinks
  • Backend
    • Serve frontend
    • Integrate with S3
    • Expose actions like upload, list, get particular image
  • Infrastructure
    • Bring up EC2 instances in ASG from baked AMI with application
    • Spread across 3 AZs
    • Spawn ALB to access application

Proposed design:

form-you-cloud-1 (1)

Go app

Our application is a really simple web application, exposing some endpoints over HTTP and integrating with AWS S3.

Main libraries:

Endpoints :

  • GET / – dummy web UI with camera capture using HTML5 video and list of photos
  • GET /image/{image_name} – get particular image/photo
  • POST /image – save new image/photo

Screen of simple UI – you can see A4 paper which I’m showing to camera

Screenshot from 2018-07-27 19-12-04

That UI is served directly from Go, as it’s stored in the Go application as a big multi-line string. The list of images is generated using Go’s built-in template engine:

{{ range $key, $value := . }}
	<li><a href="/image/{{ $value.Key }}">{{ $value.Key }}</a></li>
{{ end }}

The list to iterate over is generated in the following way:

func GetRootSite(ResponseWriter http.ResponseWriter, _ *http.Request) {
	svc := SpawnAwsSdkS3Service()
	resp, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: aws.String(bucket)})
	if err != nil {
		panic(err)
	}

	tmpl, err := template.New("hello").Parse(site_template)
	if err != nil { panic(err) }
	err = tmpl.Execute(ResponseWriter, resp.Contents)
	if err != nil { panic(err) }
}

Comments to lines:

2 – my wrapper for S3 session from AWS SDK

3 – list bucket to get existing images

8 – parse site_template – big string in code with HTML/JS UI

10 – render template directly to HTTP response, passing as parameter list from line 3

Trick with HTML5/JS camera comes from:

I slightly modified the script to snap the camera view into a hidden canvas and send it to the endpoint using a jQuery AJAX call after clicking the Snap&Upload button. To see the effect I simply reload the site after upload, so the new image appears on the list. What is interesting here


returns the snapped photo as base64 with a content-type header. So I just send that to the backend, then remove the header and convert the rest to bytes:

decoded, err := base64.StdEncoding.DecodeString(strings.Split(string(body), ",")[1])
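
The same decoding step, sketched in Python with a little validation added. It assumes the body has the usual data URL shape (data:<mime>;base64,<payload>); the function name is hypothetical.

```python
import base64


def decode_data_url(data_url):
    """Split 'data:<mime>;base64,<payload>' and return (mime_type, raw_bytes)."""
    header, _, payload = data_url.partition(",")
    if not header.startswith("data:") or not header.endswith(";base64"):
        raise ValueError("expected a base64 data URL")
    mime_type = header[len("data:"):-len(";base64")]
    return mime_type, base64.b64decode(payload)
```

Checking the header before decoding avoids the index-out-of-range failure that a bare split on the comma would hit when the request body is empty or malformed.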

The S3 file upload looks like this:

_ , err1 := svc.PutObject(&s3.PutObjectInput{Key: aws.String(strconv.FormatInt(time.Now().UnixNano(), 10)+".jpg"), Bucket: aws.String(bucket), Body: bytes.NewReader(decoded)})

so it creates a new file in the S3 bucket, named with a numeric timestamp.

One last important note: the application takes the S3 bucket name as a startup argument, to allow dynamic configuration and avoid hardcoding it.

How about state?

Now that we have the application, we can take a look at infrastructure/deployment. Firstly, before working with Terraform, it’s good practice to store state in an S3 bucket.

variable "state_bucket_name" {}

resource "aws_s3_bucket" "state" {
  bucket = "${var.state_bucket_name}"
  acl    = "private"

  versioning {
    enabled = true
  }
}

Enabling versioning is important, especially when state gets damaged: you can roll back to a previous version, as fixing damaged state in Terraform by hand is something which definitely should be avoided. We also want to keep that bucket private, as the state reveals internals of the infrastructure which would be useful for attackers.

OK, so we have a bucket for state, but building that S3 bucket also generates state – a classic chicken-and-egg problem: where should I store that state? As it’s a one-time operation it’s safe enough to store that state on the file system, so I usually commit it to the git repository.

To tell Terraform to use that state bucket we need the following config:

terraform {
  backend "s3" {
    key = "deployment"
  }
}

Where key is the path to the state file inside the bucket.

After adding that config we need to initialize Terraform and activate the S3 state store with the command:

terraform init -backend-config="bucket=bucket_name" -backend-config="region=bucket_region"

Bake image with Ansible and Packer

As the application is ready, it needs to be delivered to AWS somehow. To achieve that we are going to use Ansible and Packer to build the application and create an AMI, following the immutable infrastructure pattern.

To start with Packer we need a subnet in AWS, as the flow which Packer runs for you is:

  • start new EC2 instance
  • apply Ansible playbook
  • create AMI
  • destroy EC2 instance

Code for placing subnet in TF:

resource "aws_subnet" "packer" {
  vpc_id = "${}"
  cidr_block = "${local.packer_cidr}"
  availability_zone = "${local.region}a"

  tags {
    Name = "packer"
  }
}

resource "aws_route_table_association" "packer" {
  subnet_id = "${}"
  route_table_id = "${}"
}

Comments to lines:

3 – it’s good practice to define your network CIDRs in one file and refer to them using TF locals, as it gives you a better overview of the network design

Now we need an Ansible playbook to build the application and transfer it to the destination machine. This can be done in a naive way, as Packer actually starts Ansible on your machine to provision the EC2 instance for building the AMI.

---
- hosts: all
  become: yes
  gather_facts: yes
  vars:
    ansible_python_interpreter: /usr/bin/python3
  tasks:
    - name: build app
      become_user: jbujny
      local_action: shell go build chdir=../../goapp
    - name: copy go app
      copy:
        src: ../../goapp/goapp
        dest: /opt/goapp
        mode: 0777

Comments to lines:

6 – you need this if you want to run Ansible against a fresh Ubuntu, to tell Ansible that it should use python3 instead of python2

10 – build our Go app on the host machine (my PC)

11 – copy that application to the remote host

So finally the Packer JSON file:

{
  "description": "Go app for uploading files",
  "builders": [
    {
      "type": "amazon-ebs",
      "ami_name": "go-app-{{ timestamp }}",
      "region": "eu-central-1",
      "source_ami_filter": {
        "filters": {
          "virtualization-type": "hvm",
          "name": "ubuntu/images/*ubuntu-bionic-18.04-amd64-server-*",
          "root-device-type": "ebs"
        },
        "owners": ["099720109477"],
        "most_recent": true
      },
      "instance_type": "t2.micro",
      "ssh_username": "ubuntu"
    }
  ],
  "provisioners": [
    {
      "type": "ansible",
      "playbook_file": "playbook.yml",
      "user": "ubuntu"
    }
  ]
}

Comments to lines:

6 – that name is important as we will refer to it in Terraform

8 – find Ubuntu 18.04 as the base image

18 – that user has sudo on Ubuntu machines in AWS

21 – tell Packer to use Ansible to provision the AMI

Now we can start Packer by just typing:

packer build packer.json

and Packer should create the AMI for us, so we have a ready image with the application – now it’s time for infrastructure.


Our application needs the following network to operate:

  • VPC
  • 3 subnets for auto scaling group to spread EC2 instances and make multi-az HA application
  • 3 subnets for application load balancer
  • security group for ec2 instances
  • security group for ALB

We are going to place the network in a separate Terraform state to get better isolation between different parts of the application.

Firstly, to allow communication with clients we need an internet gateway and proper routing:

resource "aws_internet_gateway" "internet" {
  vpc_id = "${}"

  tags {
    Name = "main"
  }
}

resource "aws_route_table" "internet" {
  vpc_id = "${}"

  route {
    cidr_block = ""
    gateway_id = "${}"
  }

  tags {
    Name = "internet"
  }
}

Here is an example of a single subnet which is used by the ASG.

resource "aws_subnet" "asg_a" {
  vpc_id = "${}"
  cidr_block = "${local.asg_a}"
  availability_zone = "${local.region}a"

  tags {
    Name = "asg_a"
  }
}

output "asg_a_subnet_id" {
  value = "${}"
}

resource "aws_route_table_association" "asg_a" {
  subnet_id = "${}"
  route_table_id = "${}"
}

I don’t like loops in TF as I think that declarative language should avoid imperative things so when I need to define <= 3 resources related to AZs I’m just using copy&paste approach.

Next we need an SG for the ASG: egress in this form is required to access AWS S3, ingress is required to allow incoming traffic from the ALB.

resource "aws_security_group" "asg" {
  vpc_id = "${}"
  name = "asg"
  ingress {
    from_port = 8000
    to_port = 8000
    protocol = "tcp"
    cidr_blocks = ["${aws_subnet.alb_a.cidr_block}","${aws_subnet.alb_b.cidr_block}","${aws_subnet.alb_c.cidr_block}"]
  }
  egress {
    from_port = 443
    to_port = 443
    protocol = "tcp"
    cidr_blocks = [""]
  }
}

And the SG for the ALB: egress is required to reach our application in the ASG, ingress is required to allow clients to connect to our application. We are allowing the HTTP port here.

resource "aws_security_group" "alb" {
  vpc_id = "${}"
  name = "alb"
  egress {
    from_port = 8000
    to_port = 8000
    protocol = "tcp"
    cidr_blocks = ["${aws_subnet.asg_a.cidr_block}","${aws_subnet.asg_b.cidr_block}","${aws_subnet.asg_c.cidr_block}"]
  }
  ingress {
    from_port = 80
    to_port = 80
    protocol = "tcp"
    cidr_blocks = [""]
  }
}


We have the AMI with the application in place and the network created, so now it’s time to run the infrastructure and expose the application to the Internet. To achieve that we need:

  • S3 buckets for photos
  • IAM role with policy which can be attached to EC2 instance allowing access to S3 bucket with photos
  • ASG definition with spread across AZs
  • ALB exposing UI and API

The S3 bucket definition is quite obvious and very similar to the state bucket, so let’s take a look at the IAM role definition:

resource "aws_iam_role_policy" "iam_role_policy" {
  name = "web_iam_role_policy"
  role = "${}"
  policy = <<EOF
  "Version": "2012-10-17",
  "Statement": [
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": [":${}"]
      "Effect": "Allow",
      "Action": [
      "Resource": ["${}/*"]

We need to allow our application to:

  • list bucket to get existing photos
  • put new object in bucket to upload new photo
  • get object from bucket to serve uploaded photo

Please note the asterisk in the second statement – it’s quite important and a little tricky: put/get are object-level actions, so granting them on just the bucket ARN without /* results in access denied errors from AWS.
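
To make the bucket-versus-object distinction explicit, here is a sketch which builds an equivalent policy document in Python. The function name and the example bucket name are hypothetical; the action names follow the three permissions listed above.

```python
import json


def photo_bucket_policy(bucket_name):
    """Build a policy document: ListBucket on the bucket, Get/Put on its objects."""
    bucket_arn = "arn:aws:s3:::{}".format(bucket_name)
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [bucket_arn],  # bucket-level action
            },
            {
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:GetObject"],
                "Resource": [bucket_arn + "/*"],  # object-level actions need /*
            },
        ],
    }


def policy_json(bucket_name):
    """Serialize the policy the way it would be pasted into an IAM definition."""
    return json.dumps(photo_bucket_policy(bucket_name), indent=2)
```

Printing policy_json("my-photos") gives a document in which the first statement targets the bucket itself and the second targets every key inside it.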

To start EC2 instances in the ASG we need a launch configuration, which defines attributes of a single EC2 instance, so it’s comparable to the aws_instance resource with a few additional parameters. We also need a way to find the built AMI, as we don’t want to manually read the created image ID and hardcode it in the configuration.

data "aws_ami" "goapp" {
  most_recent = true

  filter {
    name = "name"
    values = [

  filter {
    name = "virtualization-type"
    values = [

  owners = [

resource "aws_launch_configuration" "goapp" {

  lifecycle {
    create_before_destroy = true
  name_prefix = "goapp-"
  iam_instance_profile = "${}"
  image_id = "${}"
  instance_type = "t2.micro"
  associate_public_ip_address = true
  security_groups = ["${data.terraform_remote_state.network_state.asg_sg_id}"]

  root_block_device = {
    volume_type = "gp2"
    volume_size = 8
    delete_on_termination = true
  user_data = <<EOF1
cd /opt/
./goapp ${}

Comments to lines:

1 – the data block finds the latest AMI with the proper name prefix, which we can refer to in other blocks

23 – as the Terraform documentation says: "In order to update a Launch Configuration, Terraform will destroy the existing resource and create a replacement. In order to effectively use a Launch Configuration resource with an AutoScaling Group resource, it's recommended to specify create_before_destroy in a lifecycle block."

40 – just start the application in user data – in a real application we should use some other mechanism, like starting the application as a systemd service or Docker container, as in the current design our app won't come back up after a machine reboot.

Line 30 needs a deeper description, as we are referring to the networking module there using a trick with remote state.

data "terraform_remote_state" "network_state" {
  backend = "s3"
  config {
    bucket = "jakubbujny-article-form-your-cloud-1"
    region = "eu-central-1"
    key = "network"
  }
}

So in Terraform you can use output variables from another state by specifying the state bucket name and state file name, which allows you to divide complex environments into smaller parts.

Next step is to define ASG

 resource "aws_placement_group" "placement" {
  name = "goapp"
  strategy = "spread"
resource "aws_autoscaling_group" "goapp" {

  name = "goapp"
  max_size = 3
  min_size = 3
  placement_group = "${}"
  desired_capacity = 3
  launch_configuration = "${}"
  vpc_zone_identifier = [

  tag {
    key = "Name"
    value = "goapp"
    propagate_at_launch = true

Comments to lines:

1 – a placement group lets us say whether we want to place ASG instances in a cluster, so they are deployed "near" each other to allow fast data exchange, or to use "spread", which means that instances are placed on distinct hardware and can be spread across AZs

Finally the ALB can be attached to the ASG to expose the application.

resource "aws_alb" "goapp_alb" {
  name = "goapp-alb"
  security_groups = [
  internal = false
  subnets = [


resource "aws_alb_target_group" "alb_target_group" {
  name     = "goapp-alb"
  port     = "8000"
  protocol = "HTTP"
  vpc_id   = "${data.terraform_remote_state.network_state.vpc_id}"
  tags {
    name = "goapp"
  }

  health_check {
    healthy_threshold   = 3
    unhealthy_threshold = 10
    timeout             = 5
    interval            = 10
    path                = "/"
    port                = "8000"
  }
}

resource "aws_alb_listener" "alb_listener" {
  load_balancer_arn = "${aws_alb.goapp_alb.arn}"
  port              = "80"
  protocol          = "HTTP"

  default_action {
    target_group_arn = "${aws_alb_target_group.alb_target_group.arn}"
    type             = "forward"
  }
}

resource "aws_autoscaling_attachment" "attachement" {
  alb_target_group_arn   = "${aws_alb_target_group.alb_target_group.arn}"
  autoscaling_group_name = "${}"
}

The first block is quite obvious – it creates the ALB and places it in the proper subnets. The second block defines the target group – where the ALB sends traffic – so we need to specify the application protocol/port and a simple health check. The third block defines the listener, the part of the ALB which faces clients coming from the internet – the default action is to forward requests to the target group. The last block is the obvious attachment of the target group to the ASG.
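
As a rough illustration of what the health check values in the target group imply, assuming a target changes state after the configured number of consecutive checks, one check per interval (the function name is made up for this sketch):

```python
def seconds_to_flip(threshold, interval_seconds):
    """Approximate time for a target to change state:
    number of consecutive checks multiplied by the check interval."""
    return threshold * interval_seconds


# Values taken from the health_check block
HEALTHY_AFTER = seconds_to_flip(3, 10)     # consecutive successes needed
UNHEALTHY_AFTER = seconds_to_flip(10, 10)  # consecutive failures needed
```

So with these settings a fresh instance starts receiving traffic after about half a minute, while a broken one can keep receiving requests for well over a minute before the ALB stops routing to it.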

So finally we have all part of our application and we can see the effect but…

Oh no! SSL!

With that configuration our application is exposed over plain-text HTTP, which sadly is a real problem, because Chrome won’t let you grant a site permission to use your camera if that site is not secured with SSL – we need some fast workaround!

So I’m going to do it in a slightly hacky way – please do not use that in any real application.

A simple workaround is to use Certbot to issue an SSL certificate for our application, then attach that certificate to the ALB and expose an HTTPS/443 connection. It’s not so easy, as AWS domains are on a blacklist in Certbot, probably to stop attackers issuing certificates for bad purposes like MITM attacks.

I’m going to do that using the manual procedure for issuing a certificate, which means that Certbot generates a file name and file content which need to be served by the web server on the domain for which you want the certificate. So we need to:

  • Create another S3 bucket for that Certbot file
  • Modify policy
  • Modify ALB SG to allow 443
  • Add endpoint in application to expose that file on demand
  • Add some domain to our application – the easiest way is to make CNAME in some existing domain
  • Issue certificate
  • Create HTTPS forward in ALB using that cert

Looks like a lot of work? Actually it’s really easy. Creating the S3 bucket is quite obvious – we need to remember to add the new S3 bucket ARN to the policy to allow EC2 instances to access that bucket. We need to add a 2nd parameter to the application to pass the bucket name, plus the following lines of code:

func GetCertbotAuth(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	svc := SpawnAwsSdkS3Service()
	resp, err := svc.GetObject(&s3.GetObjectInput{Bucket: aws.String(bucket_auth), Key: aws.String(vars["auth"])})
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	io.Copy(w, resp.Body)
}
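
The handler above only has to answer one kind of request: in the HTTP-based validation the file is fetched from /.well-known/acme-challenge/<file name> on the domain. A small Python sketch of that lookup, with a plain dict standing in for the S3 bucket (the function name and the sample token are hypothetical):

```python
CHALLENGE_PREFIX = "/.well-known/acme-challenge/"


def challenge_response(path, stored_files):
    """Return (status, body) for a validation request.

    stored_files maps the file name printed by Certbot to the expected
    file content; in the article this role is played by the S3 bucket.
    """
    if not path.startswith(CHALLENGE_PREFIX):
        return 404, ""
    token = path[len(CHALLENGE_PREFIX):]
    if token not in stored_files:
        return 404, ""
    return 200, stored_files[token]
```

Returning 404 for unknown names, instead of failing hard, keeps the endpoint harmless when it is probed with paths that were never uploaded.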

For the CNAME record I just used the domain on which I have my blog. As that domain is parked on WordPress DNS, I just added the record in the panel:

Screenshot from 2018-07-27 19-13-33

So that CNAME refers to the ALB domain. After that I can issue a certificate for the domain by running the certbot command, manually uploading the required file to the S3 bucket and confirming in Certbot that it’s ready for verification – the validation request comes in on port 80 and looks for the specified file with the specified content. After that I have:

Screenshot from 2018-07-27 19-12-42

So far so good. In a normal application you would use a Route53 domain and a regular SSL certificate, or one issued for you by Amazon, and define everything in Terraform. But we are hacking a little here, so I’m going to add the HTTPS listener in the ALB using the console, as it lets you define the forward and paste the certificate content in one form, which will then be uploaded to ACM:

Screenshot from 2018-07-27 19-15-51


So now finally I can access and upload my face to S3 – easy-peasy! 😀

Stream microservices logs from Kubernetes to Kafka


Application logs are very important for investigating problems and monitoring application health. Nowadays you want to monitor your logs and react to problems before your client knows that something is wrong. Also, when your system loses stability you need to react immediately and keep your logs available and up to date, as they are your ‘sight’ into the application – let’s say that logs are the first eye and metrics are the second. In the microservices world log aggregation is even harder, because you have X instances of a particular microservice, every instance prints a huge number of logs to tell you what’s happening in the application, and those microservices are typically distributed across many different ‘nodes’ – how to deal with that?

This article describes how to use Fluentd on Kubernetes as a Kafka producer to stream logs, and how to use Fluentd on a virtual machine as a Kafka consumer to push logs to Elasticsearch. I also want to show the possibilities that an application log stream on Kafka gives you, with a dummy Python implementation which sends notifications to Slack to inform developers/DevOps about detected exceptions.

Why Kafka?

Streaming logs via Kafka gives you the following advantages:

  1. As Kafka is extremely efficient, it’s a safe ‘buffer’ for your logs when your log storage cannot consume the huge amount of logs during load spikes
  2. Logs on Kafka are ready to integrate – you can attach many consumers and place logs in different storage engines, or attach some analysis directly, e.g. Apache Spark
  3. If you treat Kafka as an integration bus for your system, you can avoid ‘infrastructure spaghetti’ by integrating different infrastructure modules through a single point.

Proposed design


Fluentd on Kubernetes

Fluentd is deployed on Kubernetes as a DaemonSet. Please see the deployment descriptor below:

apiVersion: extensions/v1beta1
kind: DaemonSet
  name: fluent
  namespace: fluent
    k8s-app: fluent
    version: v1 "true"
        k8s-app: fluent
        version: v1 "true"
      - key:
        effect: NoSchedule
      - name: fluentd
        image: fluent/fluentd-kubernetes-daemonset:v1.1.3-debian-elasticsearch
        command: ["/bin/sh"]
        args: ["-c", "gem install fluent-plugin-kafka && cp /fluent-config/fluent.conf /fluentd/etc/ && /fluentd/"]
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: fluent-config
          mountPath: /fluent-config
      terminationGracePeriodSeconds: 30
      - name: varlog
          path: /var/log
      - name: varlibdockercontainers
          path: /var/lib/docker/containers
      - name: fluent-config
          name: fluent
          - key: fluent_conf
            path: fluent.conf

Please keep an eye on the important sections in that file:

  • The toleration means that we want to deploy Fluentd on Kubernetes masters as well
  • The command override installs the Kafka plugin and copies fluent.conf, which we attach as a config map

Here you can find the config map:

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent
  namespace: fluent
data:
  fluent_conf: |
    @include kubernetes.conf

    # match pattern is an assumption: forward every collected event
    <match **>
      @type kafka_buffered

      brokers             your-kafka-broker-host:9092

      topic_key             your-logs-topic
      default_topic         your-logs-topic
      output_data_type      json
      output_include_tag    true
      output_include_time   true
      get_kafka_client_log  false

      required_acks         1
      compression_codec     gzip
    </match>

In that config we simply tell fluentd: please produce all collected logs as messages on Kafka. We also activate gzip compression to reduce the size of the messages. We require at least one ack, so you should set the replication factor of your-logs-topic to at least 2 to keep your logs when one broker fails.
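To get a feeling for why gzip pays off for JSON logs, here is a minimal sketch that uses only the Python standard library. It does not talk to Kafka or fluentd at all: it just compresses a batch of log lines shaped like the example log from this article. The helper make_log and the batch size of 200 are assumptions made for illustration, and real savings depend on your own logs.

```python
import gzip
import json


def make_log(i):
    """Build one JSON log line shaped like the example log in this article."""
    return json.dumps({
        "level": "info",
        "timestamp": "2018-08-19T16:41:41+00:00",
        "path": "",
        "line": str(i % 50),
        "message": f"My log {i}",
    })


# A batch of 200 log lines separated by newlines (the batch size is arbitrary).
batch = "\n".join(make_log(i) for i in range(200)).encode("utf-8")
compressed = gzip.compress(batch)

raw_size = len(batch)
gzip_size = len(compressed)
print(f"raw: {raw_size} bytes, gzip: {gzip_size} bytes")
```

Field names repeat in every line, which is exactly the kind of redundancy that compression removes, so the cost of naming every field is smaller on the wire than it looks in a text editor.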

Kafka consumer – fluentd and Elasticsearch

After streaming logs to Kafka you can easily consume them using another fluentd instance. My advice here is to place that consumer on the same machine where you have Elasticsearch or, if you are using an Elasticsearch SaaS like AWS ES, at least in the same subnet. Here you can find the config:

<source>
 @type kafka_group
 brokers your-kafka-broker-host:9092
 consumer_group fluentd
 topics your-logs-topic
 format json
 use_record_time true
</source>

<match **>
 @type elasticsearch
 @id out_es
 log_level info
 include_tag_key true
 host your-elasticsearch-host
 port 443
 scheme https
 logstash_format true
 logstash_prefix kubernetes
 <buffer>
   flush_thread_count 8
   flush_interval 5s
   chunk_limit_size 6M
   queue_limit_length 256M
   retry_max_interval 30
   retry_forever true
 </buffer>
</match>


Please keep an eye on the important sections in that file:

  • We are consuming logs as the fluentd consumer group – if your-logs-topic has more than one partition, you can consume logs using multiple consumer instances
  • We are simply saving logs in Elasticsearch indices with the ‘kubernetes’ prefix (with logstash_format enabled, the date is appended to the prefix)
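With logstash_format set to true, the Elasticsearch output plugin does not write to one fixed index: it builds a daily index name from logstash_prefix and the record time. The sketch below only mimics that naming rule in plain Python so you know which index to query. The function name logstash_index_name is hypothetical, and the "-" separator and %Y.%m.%d date format are the plugin defaults as far as I know, so check them against your plugin version.

```python
from datetime import datetime, timezone


def logstash_index_name(prefix, timestamp, separator="-", dateformat="%Y.%m.%d"):
    """Return the daily index name for a record with the given timestamp (in UTC)."""
    day = timestamp.astimezone(timezone.utc).strftime(dateformat)
    return f"{prefix}{separator}{day}"


# Timestamp taken from the example log in this article.
ts = datetime(2018, 8, 19, 16, 41, 41, tzinfo=timezone.utc)
index_name = logstash_index_name("kubernetes", ts)
print(index_name)  # kubernetes-2018.08.19
```

In Kibana this means an index pattern like kubernetes-* covers all days.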

Kafka consumer – search & notify

The last step is to attach a consumer for log analysis. For that I’m going to modify the example from the confluent-kafka-python library, using Python 3.6:

from confluent_kafka import Consumer, KafkaError
import json
import requests

c = Consumer({
    'bootstrap.servers': 'your-kafka-broker-host',
    'group.id': 'logs-inspector',
    'default.topic.config': {
        'auto.offset.reset': 'smallest'
    }
})

c.subscribe(['your-logs-topic'])

while True:
    msg = c.poll(1.0)

    # Nothing arrived within the timeout, poll again
    if msg is None:
        continue
    if msg.error():
        # Reaching the end of a partition is not a real error
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    message_dict = json.loads(msg.value().decode('utf-8'))
    if "log" in message_dict.keys():
      if "exception" in message_dict["log"].lower():
        notification = f"Found problem in log\nContainer: {message_dict['kubernetes']['container_name']}\nNamespace: {message_dict['kubernetes']['namespace_name']} \nPod: {message_dict['kubernetes']['pod_name']} \nHost: {message_dict['kubernetes']['host']}"
        # Placeholder: put your own Slack webhook URL here
        requests.post('your-slack-webhook-url',
          data=json.dumps({"attachments": [{
            'text': notification,
            'color': "#00FF00"
          }]}),
          headers={'Content-Type': 'application/json'})

c.close()

That script is very simple. It consumes a log message from the topic and checks whether the log key exists and contains the word “exception”. If yes, it sends a notification via a Slack webhook to your team members. Please note the additional information which Kubernetes gives you, like the node, namespace and pod name.
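The detection part of such a script can be pulled out into a pure function, which makes it easy to test without Kafka or Slack. This is only a sketch under my own assumptions: the function name build_notification, the keyword parameter and the 'unknown' fallbacks are not part of the original script, and the sample record is made up.

```python
import json


def build_notification(message_dict, keyword="exception"):
    """Return a Slack payload (JSON string) for a suspicious log record, or None."""
    log_line = message_dict.get("log")
    if not isinstance(log_line, str) or keyword not in log_line.lower():
        return None
    k8s = message_dict.get("kubernetes", {})
    text = (
        "Found problem in log\n"
        f"Container: {k8s.get('container_name', 'unknown')}\n"
        f"Namespace: {k8s.get('namespace_name', 'unknown')}\n"
        f"Pod: {k8s.get('pod_name', 'unknown')}\n"
        f"Host: {k8s.get('host', 'unknown')}"
    )
    return json.dumps({"attachments": [{"text": text, "color": "#00FF00"}]})


# Hypothetical record, shaped like the messages fluentd produces on Kubernetes.
sample = {
    "log": "java.lang.IllegalStateException: boom",
    "kubernetes": {
        "container_name": "orders",
        "namespace_name": "shop",
        "pod_name": "orders-1",
        "host": "node-a",
    },
}
payload = build_notification(sample)
quiet = build_notification({"log": "all good"})
print(payload)
print(quiet)
```

The consumer loop then only has to call the function and post the payload when it is not None.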

Useful Docker #1 – Seed container at runtime

Docker is one of my favorite tools in IT – it gives me really fast access to tools that are already installed and configured. Thanks to that I can cover many cases using a simple docker container and some magic around it. In this article series I’m going to show you how to use docker in a creative way and how to debug problems inside docker containers.

Today: something about CMD, runtime magic and docker commit.

I assume that you have basic knowledge about docker and some experience with that great tool – if not, I suggest checking that site:

My environment in this article:

  • Docker version 17.12.0-ce, build c97c6d6
  • Ubuntu 17.10
  • zsh 5.2 (x86_64-ubuntu-linux-gnu)
  • tmux 2.5

Everything should also work in docker on Windows (the one using Hyper-V) – if there is a problem somewhere, please contact me on my LinkedIn


Seed your container in runtime

Docker Hub provides many images (as mentioned here, there were 400k in 2016) with useful defaults, but these defaults rarely meet our requirements. We can deal with that situation using many strategies, but one of my favorites is overriding the default docker command with some magic.

Let’s say that we want to host my blog in a docker container and switch my site logo to


just for fun!

Required steps:

  • Start container
  • Download site content recursively
  • Start web hosting
  • Change image

We need to start with some static web content hosting like

We take the command from that site (with port publishing added):

docker run -p 8080:80 --name some-nginx -v /some/content:/usr/share/nginx/html:ro -d nginx

Ok, but after starting this container we can see only the nginx welcome page – how do we inject content there? There are 2 ways:

  • Add the content to the docker image – useful if we want to use that image many times
  • Override the run command and download the content before the web server starts – our approach because we are fast&furious

So what’s the default command in the nginx image? To check that we should go to the original Dockerfile, like this one, and at the end we see:

CMD ["nginx", "-g", "daemon off;"]

Cool, let’s try this way:

docker run -it -p 8080:80 nginx bash -c "cd /usr/share/nginx/html && wget --recursive --no-parent --no-check-certificate ; nginx -g 'daemon off;'"

What’s happening there:

  • docker – 😉
  • run – start a new container
  • -it – interactive (get stdout, pass stdin)
  • -p 8080:80 – publish the internal nginx port 80 on port 8080 of our machine so we can access nginx using localhost:8080
  • bash -c "..." – that’s a little magic to pass a long command with && and ; – the bash man page says:
    -c string If the -c option is present,  then  commands  are  read  from
                 string.   If  there  are arguments after the string, they are
                 assigned to the positional parameters, starting with $0.
  • cd… – sure
  • wget – we need to download my site recursively
  • nginx -g – start nginx (use the default command from the Dockerfile)

Ok so let’s run it… woooops!!! Something is not working ;(

bash: wget: command not found

Please remember that every docker container is an isolated virtual OS (actually it’s not, but you can think about docker in this way 😉 ) – it means that every docker image has different tools installed. Sadly the nginx image doesn’t contain wget, but okey dokey – we can install wget using apt-get inside the container, no worries:

docker run -it -p 8080:80 nginx bash -c "apt-get update && apt-get install --no-install-recommends --no-install-suggests -y wget && cd /usr/share/nginx/html && wget --recursive --no-parent --no-check-certificate ; nginx -g 'daemon off;'"

What’s happening there:

  • sudo is not needed because inside the container we are root
  • apt-get update – very important! Please remember to run that command before any apt-get install inside containers, so that the package lists are refreshed and the install gives a valid result
  • apt-get install – here -y is important because that command runs in non-interactive mode, which means that we need to auto-confirm that we really want to install the package
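The difference between && and ; in that long command is easy to miss, so here is a small sketch that builds the same docker run invocation from Python and makes the rule explicit: the steps joined with && stop at the first failure, while nginx after ; starts no matter how wget finished. The helper names seed_script and docker_run_argv are mine, and https://example.com/ is only a placeholder for the site you want to download.

```python
import shlex


def seed_script(site_url, web_root="/usr/share/nginx/html"):
    """Build the shell script that the container runs instead of the default CMD."""
    # Steps chained with && : each one runs only if the previous one succeeded.
    chained = [
        "apt-get update",
        "apt-get install --no-install-recommends --no-install-suggests -y wget",
        f"cd {shlex.quote(web_root)}",
        f"wget --recursive --no-parent --no-check-certificate {shlex.quote(site_url)}",
    ]
    # After ; nginx starts regardless of the exit code of the chain above.
    return " && ".join(chained) + " ; nginx -g 'daemon off;'"


def docker_run_argv(site_url, host_port=8080):
    """Return the argument list for docker run with the overridden command."""
    return ["docker", "run", "-it", "-p", f"{host_port}:80", "nginx",
            "bash", "-c", seed_script(site_url)]


argv = docker_run_argv("https://example.com/")  # placeholder URL
print(shlex.join(argv))
```

Passing the list to subprocess.run(argv) would start the container without any extra shell quoting on the host side.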

So after running this command everything seems fine – wget is downloading my site and nginx is starting but at localhost:8080 I see:

Screenshot from 2018-02-18 12-14-04

Damn that’s not cool….


To debug that situation we need to get into the container and see what’s happening in the file system after the wget command. To do that we need to start with:

docker ps

And output:

Screenshot from 2018-02-18 12-16-45

The most important thing there is the Container ID – we’re going to use that ID to get into the container. Open a new console and try (change the container ID to yours because it’s a random string):

docker exec -it 69cb502ab0d1 bash

After that command we are in a new shell process inside the container, in interactive mode. Let’s check what’s under /usr/share/nginx/html:

drwxr-xr-x 1 root root 4.0K Feb 18 11:20 . 
drwxr-xr-x 1 root root 4.0K Dec 26 18:16 .. 
-rw-r--r-- 1 root root 537 Dec 26 11:11 50x.html 
-rw-r--r-- 1 root root 612 Dec 26 11:11 index.html 
drwxr-xr-x 7 root root 4.0K Feb 18 11:20

Ok, so wget downloaded my site into a directory – try it in a web browser:


You should see my blog 🙂

Change image

Please stay in our shell process inside the container. Changing the site logo is so easy, just type: vim /usr/share/nginx/html/ omg there is no vim


<disclaimer> Many popular images don’t have basic tools like vim or even bash because every additional tool not needed at runtime means bigger image size to download for users </disclaimer>

apt-get install vim

I believe that you are VIM WIZARD LVL 300 but if not just type:

ESC /site-logo ENTER

This finds my logo section. Then change the src of the next <img> tag to (should help: ESC a ) :



Go to web browser and hit F5:

Screenshot from 2018-02-18 12-36-26.png

That was so easy, wasn’t it?

Commit container into image

The last step is to convert the container into an image.


It means that we can say to docker: hey! I have a container here where I installed some tools and modified some state, please convert that container into an image so I can reuse that manual configuration many times


docker commit 69cb502ab0d1 my-awesome-image:1.0

So again we are using the Container ID, and docker creates for us an image named my-awesome-image:1.0 – let’s check if that works:

docker run -it -p 8081:80 my-awesome-image:1.0 bash -c "nginx -g 'daemon off;'"

What’s happening:

  • Run a container from the committed image
  • Override the command again – docker commit kept the container’s command (the one with wget) as the CMD of my-awesome-image, but we don’t need it here! We committed the whole container, so the site is already in the image – if we don’t need the latest version of the site, we can override the CMD again to start only the web server

Go under:


And you should see my blog with the docker logo instead of mine.


As you can see, docker is quite powerful – when used with care it can really do magic. I hope that was helpful for you – I decided to split the docker article into a series because I have many more examples of how to use that tool in a creative way. See you in the next article!

How GPU can help you if your algorithm is not efficient


Nowadays the IT world is quite complex – programmers must learn how to deal with data integrity, how to talk to a database using an ORM, how to design an application using DDD or how to call a microservice using a REST request. We should remember that computer programming is more about writing algorithms for computers and less about learning APIs of libraries – I’m actually asking myself: is it true? Programmers today don’t have enough time to write efficient algorithms because:

  • please add new feature
  • please remember that you have 100 features in backlog
  • please make it now working and we will go back there in future for performance improvements

Writing a fast algorithm is a very advanced issue – you must really understand the problem domain and know a lot about the complexity of algorithms. Look at the size of the book “Introduction to Algorithms” by Thomas H. Cormen, Charles E. Leiserson, Ronald L. Rivest, Clifford Stein – 1300 pages!


In this article I’m going to use a simple problem from that book: find the maximum subarray.

Assumption: we have a 1D array of integers

Task: find startIndex and endIndex of the maximum subarray (the contiguous run of elements with the largest sum) in that array and provide that sum

Ok so let’s say that we have following array:

array = [1, 2, 10, 12]

The answer for that case is very easy – the whole array is the maximum subarray, so startIndex is 0, endIndex is 3 and the result is 25. When all array items are positive the case is boring, so take a look at something more interesting:

array = [13,-3, -25, 20, -3, -16, -23, 18, 20, -7, 12, -5, -22, 15, -4, 7]

Here the answer is not so easy – when we just add all items our result is: -3

But when we add the items from index 7 to index 10 our result is 43, which is the maximum subarray for that array. How to code that algorithm for the best performance?
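The two numbers above are easy to check by hand, but a few lines of Python do it faster. Nothing here is part of the measured programs, it only verifies the example (remember that a Python slice excludes its end index, so array[7:11] covers indices 7 to 10).

```python
array = [13, -3, -25, 20, -3, -16, -23, 18, 20, -7, 12, -5, -22, 15, -4, 7]

total = sum(array)        # sum of the whole array
best_slice = array[7:11]  # items at indices 7, 8, 9 and 10
best_sum = sum(best_slice)

print(total)       # -3
print(best_slice)  # [18, 20, -7, 12]
print(best_sum)    # 43
```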

Right solution

The proper and efficient solution for such a problem is to use a divide and conquer algorithm. In this article I’m not going to focus on explaining how that algorithm works because it’s not the main point. In simple words – it uses recursion and divides your array into smaller parts in each step, which allows it to find the result really fast. I’ve implemented that algorithm in golang, here is part of the code:

func findMaximumSubarray(array []int, low int, high int) (int, int, int) {
        if high == low {
                // Base case: a single element
                return low, high, array[low]
        } else {
                mid := (low + high) / 2
                leftLow, leftHigh, leftSum := findMaximumSubarray(array, low, mid)
                rightLow, rightHigh, rightSum := findMaximumSubarray(array, mid+1, high)
                crossLow, crossHigh, crossSum := findMaxCrossingSubarray(array, low, mid, high)
                if leftSum >= rightSum && leftSum >= crossSum {
                        return leftLow, leftHigh, leftSum
                } else if rightSum >= leftSum && rightSum >= crossSum {
                        return rightLow, rightHigh, rightSum
                } else {
                        return crossLow, crossHigh, crossSum
                }
        }
}

func findMaxCrossingSubarray(array []int, low int, mid int, high int) (int, int, int) {
        // Best sum of a run that ends at mid and extends to the left
        leftSum := math.MinInt32
        sum := 0
        maxLeft := 0
        // i >= low: the run may start at the very first element of the range
        for i := mid; i >= low; i-- {
                sum += array[i]
                if sum > leftSum {
                        leftSum = sum
                        maxLeft = i
                }
        }
        // Best sum of a run that starts at mid+1 and extends to the right
        rightSum := math.MinInt32
        sum = 0
        maxRight := 0
        for i := mid + 1; i <= high; i++ {
                sum += array[i]
                if sum > rightSum {
                        rightSum = sum
                        maxRight = i
                }
        }
        return maxLeft, maxRight, leftSum + rightSum
}

Full source code here:
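If you prefer to read Python, here is a small port of the same divide and conquer idea. It is only a sketch for checking results on short arrays, not the program measured in this article, and the snake_case names are mine. On ties it prefers the left half, then the right half, like the Go version.

```python
def find_max_crossing_subarray(array, low, mid, high):
    """Best subarray that contains both array[mid] and array[mid + 1]."""
    left_sum = float("-inf")
    total = 0
    max_left = mid
    for i in range(mid, low - 1, -1):  # walk left from mid down to low
        total += array[i]
        if total > left_sum:
            left_sum, max_left = total, i
    right_sum = float("-inf")
    total = 0
    max_right = mid + 1
    for i in range(mid + 1, high + 1):  # walk right from mid + 1 up to high
        total += array[i]
        if total > right_sum:
            right_sum, max_right = total, i
    return max_left, max_right, left_sum + right_sum


def find_maximum_subarray(array, low, high):
    """Return (start_index, end_index, sum) of the maximum subarray in array[low..high]."""
    if low == high:
        return low, high, array[low]
    mid = (low + high) // 2
    left = find_maximum_subarray(array, low, mid)
    right = find_maximum_subarray(array, mid + 1, high)
    cross = find_max_crossing_subarray(array, low, mid, high)
    # max() returns the first of equal candidates: left, then right, then cross
    return max(left, right, cross, key=lambda candidate: candidate[2])


array = [13, -3, -25, 20, -3, -16, -23, 18, 20, -7, 12, -5, -22, 15, -4, 7]
result = find_maximum_subarray(array, 0, len(array) - 1)
print(result)  # (7, 10, 43)
```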

To analyze the execution time of that algorithm we should run it on a random integer array and measure the execution using the Linux time program. Here are some examples for an array of 50 000 000 elements:

./divide-and-conquer-recursive 4,73s user 0,08s system 100% cpu 4,808 total

./divide-and-conquer-recursive 4,67s user 0,14s system 100% cpu 4,807 total

./divide-and-conquer-recursive 4,68s user 0,12s system 100% cpu 4,794 total

It’s very fast and works well, but it requires a deep understanding of the problem and a lot of knowledge about algorithms – how to implement that in a simpler way?

Bad solution – brute-force

The easiest way to solve that problem is to use brute force – in other words just move across the whole array item by item and find the maximum result by adding the next items. Take a look at a simple implementation in Python:

def algorithm(array):
    bestStartIndex = 0
    bestEndIndex = 0
    # Start from the first element so that arrays with only negative numbers work too
    bestSum = array[0]
    for i in range(0, len(array)):
        currentSum = 0
        # Start at j = i so that single-element subarrays are considered as well
        for j in range(i, len(array)):
            currentSum += array[j]
            if currentSum > bestSum:
                bestSum = currentSum
                bestStartIndex = i
                bestEndIndex = j
    return bestStartIndex, bestEndIndex, bestSum

Full source code here:

So easy, right? But look what’s happening – we have a nested for loop, which means that for an array of 50 000 000 elements we get this number of inner loop iterations (programmers&mathematicians, please don’t be angry because I rounded some numbers for easier calculation):

\sum_{n=1}^{50\,000\,000} (50\,000\,000 - n) = 0 + 1 + 2 + 3 + 4 + \dots + 49\,999\,999 = \frac{0 + 49\,999\,999}{2} \cdot 50\,000\,000 = 1{,}249999975 \cdot 10^{15}
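The sum above is an arithmetic series, so it has a closed form that Python can evaluate exactly with integers. The sketch also counts the iterations with a real loop for a tiny array to show that the closed form matches; the function name is mine.

```python
def count_inner_iterations(n):
    """Count inner loop iterations for an array of n elements with real loops."""
    count = 0
    for i in range(n):
        for _ in range(i + 1, n):
            count += 1
    return count


N = 50_000_000

# Closed form of 0 + 1 + 2 + ... + (N - 1)
inner_iterations = N * (N - 1) // 2

print(count_inner_iterations(10), 10 * 9 // 2)  # 45 45
print(inner_iterations)                         # 1249999975000000
print(f"{inner_iterations:.3e}")                # 1.250e+15
```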

Quite a big number – so what happens when we run that program? Let’s start with an array of 25 000 elements:

python3 21,82s user 0,03s system 99% cpu 21,862 total

python3 22,48s user 0,00s system 99% cpu 22,487 total

python3 21,82s user 0,00s system 99% cpu 21,821 total

It’s soo bad. Even a small number of elements makes that program very, very, VERY slow. Here is some magic from Python – out of the box Python is not a very good language for mathematical operations. To solve that problem we need an external library such as Numba. Its documentation says:

Numba works by generating optimized machine code using the LLVM compiler infrastructure at import time, runtime, or statically (using the included pycc tool).

So just modify our program this way:

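from numba import jit

# The decorator asks Numba to compile this function to machine code
@jit
def algorithm(array):
    bestStartIndex = 0
    bestEndIndex = 0
    # Start from the first element so that arrays with only negative numbers work too
    bestSum = array[0]
    for i in range(0, len(array)):
        currentSum = 0
        # Start at j = i so that single-element subarrays are considered as well
        for j in range(i, len(array)):
            currentSum += array[j]
            if currentSum > bestSum:
                bestSum = currentSum
                bestStartIndex = i
                bestEndIndex = j
    return bestStartIndex, bestEndIndex, bestSum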

And now:

python3 0,95s user 0,52s system 195% cpu 0,754 total

python3 0,92s user 0,58s system 191% cpu 0,787 total

python3 0,95s user 0,56s system 189% cpu 0,793 total

So far so good!!! Let’s try a bigger size, 250 000:

python3 34,00s user 0,42s system 101% cpu 33,906 total

python3 32,83s user 0,58s system 102% cpu 32,688 total

python3 34,43s user 0,57s system 102% cpu 34,284 total

Not really bad, but how can we speed up that algorithm? Maybe some parallelism would help?
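To see why 34 seconds is still a problem, we can extrapolate that measurement to the 50 000 000 elements used for the divide and conquer program. This is a back-of-the-envelope sketch, not a measurement: it assumes the running time grows exactly with the square of the array length and ignores things like JIT compilation time and memory effects. The function name is mine.

```python
def extrapolate_quadratic(measured_seconds, measured_n, target_n):
    """Estimate the running time for target_n, assuming time grows like n^2."""
    scale = target_n / measured_n
    return measured_seconds * scale ** 2


# About 34 s for 250 000 elements, taken from the timings above.
estimate = extrapolate_quadratic(34.0, 250_000, 50_000_000)
print(f"{estimate:.0f} s, about {estimate / 86_400:.1f} days")
# 1360000 s, about 15.7 days
```

Compare that with under 5 seconds for divide and conquer on the same array size.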


How can a GPU help us? Here is some nice info from Wikipedia:

The graphics processing unit (GPU), as a specialized computer processor, addresses the demands of real-time high-resolution 3D graphics compute-intensive tasks. By 2012, GPUs had evolved into highly parallel multi-core systems allowing very efficient manipulation of large blocks of data. This design is more effective than general-purpose central processing unit (CPUs) for algorithms in situations where processing large blocks of data is done in parallel

Great, what about the API? If you have a GeForce in your PC it’s soo easy to write a program for the GPU using CUDA… no… actually not! There is a library for CUDA in Python (PyCUDA), but that library lets you write the GPU program in… C! The best strategy here is to prepare the whole data set in Python and then pass it to the GPU for the expensive computation. CUDA uses a grid concept for computing on the GPU, but I’m definitely not the right person to explain that, just go there:

theory.stop(); practice.start();

Still bad solution – but parallel!

How can we run our brute-force algorithm in parallel? Easy peasy lemon squeezy – just run every nested loop in parallel. How can we run that on the GPU using CUDA? Easy pea… no… actually not! Please find the code and explanation below:

import pycuda.driver as cuda
import pycuda.autoinit
from pycuda.compiler import SourceModule
import numpy
from random import randint
import operator

def generateRandomArray(length):
    array = []
    for i in range(0, length):
        array.append(randint(-1000, 1000))
    return array

# Define CUDA function
mod = SourceModule("""
__global__ void find(int *array, int *result, int *result_end_index, int *N)  {
  int id = blockIdx.x*blockDim.x + threadIdx.x;
  // The grid is rounded up to whole blocks, so skip threads past the end of the array
  if (id >= *N) return;
  int bestEndIndex = id;
  int bestSum = array[id];
  int currentSum = array[id];
  for(int j=id+1; j < *N; j++) {
      currentSum += array[j];
      if(currentSum > bestSum) {
         bestSum = currentSum;
         bestEndIndex = j;
      }
  }
  result[id] = bestSum;
  result_end_index[id] = bestEndIndex;
}
""")

N = 1000000
N_numpy = numpy.array([N]).astype(numpy.int32)
testArray = generateRandomArray(N)
array = numpy.array(testArray).astype(numpy.int32)
result = numpy.zeros(N).astype(numpy.int32)
result_end_index = numpy.zeros(N).astype(numpy.int32)
find_func = mod.get_function("find")

# Allocate on device
array_gpu = cuda.mem_alloc(array.size * array.dtype.itemsize)
result_gpu = cuda.mem_alloc(result.size * result.dtype.itemsize)
result_end_index_gpu = cuda.mem_alloc(result_end_index.size * result_end_index.dtype.itemsize)
N_gpu = cuda.mem_alloc(N_numpy.size * N_numpy.dtype.itemsize)

# Copy from host to device
cuda.memcpy_htod(array_gpu, array)
cuda.memcpy_htod(N_gpu, N_numpy)

# Number of threads per block
threadCount = 128

# Number of blocks per grid
blockCount = int(numpy.ceil(float(N) / threadCount))

find_func(array_gpu, result_gpu, result_end_index_gpu, N_gpu, block=(threadCount, 1, 1), grid=(blockCount, 1))

# Copy result to host
cuda.memcpy_dtoh(result, result_gpu)
cuda.memcpy_dtoh(result_end_index, result_end_index_gpu)

index, value = max(enumerate(result), key=operator.itemgetter(1))

print(value, index, result_end_index[index])

import numpy: the numpy library is the basic library for mathematical computations in Python, see:

generateRandomArray: a simple function which generates an array of the given length filled with random items

N_numpy: a 1-element numpy array used to pass the array length to the GPU

testArray, array: generate the random array and convert it to a numpy array so it can be passed to the GPU

result: the array for results – at index i it will hold the maximum sum of a subarray that starts at i and ends anywhere up to the end of the array

result_end_index: the array for the best end index – at index i it will hold the index x of the maximum subarray between i and x

cuda.mem_alloc: allocate memory on the GPU – do you remember malloc from C 😉 ?

cuda.memcpy_htod: copy the source array and its length onto the GPU – after that operation, the program running on the GPU can access the input array

threadCount = 128: I picked it at random, it probably should be somehow related to the graphics card parameters but who cares?

find_func(...): run the program on the GPU – take a look at that program

__global__ void find: crème de la crème, a program written in C running on the GPU. As parameters we pass the input array, the result array, the result end index array and the array length (it is also a pointer because we pass it as a one-element numpy array). Inside that function we calculate the id of that execution, which for us is also the index in the input and output arrays. After making the calculations we simply write the best sum and its end index to the output arrays.

cuda.memcpy_dtoh: copy the output from the GPU back to Python

max(enumerate(result), ...): find the maximum value in the output array – that array holds the best sum for every start index, so finding the biggest value solves our problem
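If you don’t have a CUDA card at hand, the idea of the kernel can be emulated on the CPU: one loop iteration plays the role of one GPU thread, and the final max plays the role of the reduction done back in Python. This is a plain Python sketch with names of my own choosing, useful only for checking the logic on small arrays. The last lines show how the grid size is rounded up to whole blocks, which is why the real kernel has to be careful with ids beyond the array length.

```python
import math


def find_per_start(array):
    """CPU emulation of the kernel: entry i holds the best sum of a subarray starting at i."""
    n = len(array)
    result = [0] * n
    result_end_index = [0] * n
    for thread_id in range(n):  # on the GPU each iteration is a separate thread
        best_sum = current_sum = array[thread_id]
        best_end = thread_id
        for j in range(thread_id + 1, n):
            current_sum += array[j]
            if current_sum > best_sum:
                best_sum, best_end = current_sum, j
        result[thread_id] = best_sum
        result_end_index[thread_id] = best_end
    return result, result_end_index


array = [13, -3, -25, 20, -3, -16, -23, 18, 20, -7, 12, -5, -22, 15, -4, 7]
result, result_end_index = find_per_start(array)

# Reduction: the start index with the largest per-start sum wins
start = max(range(len(result)), key=result.__getitem__)
print(result[start], start, result_end_index[start])  # 43 7 10

# Grid size for 1 000 000 elements and 128 threads per block
thread_count = 128
block_count = math.ceil(1_000_000 / thread_count)
print(block_count, block_count * thread_count)  # 7813 1000064
```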

How about time? 1 000 000 elements on my GeForce GTX 970M:

python3 8,38s user 3,22s system 105% cpu 11,011 total

python3 8,38s user 3,33s system 106% cpu 10,993 total

python3 8,39s user 3,26s system 106% cpu 10,947 total


As you can see in this article, writing efficient algorithms is the best solution – the divide and conquer algorithm has computational complexity of around n*log(n), while the brute-force algorithm has around n^2, which means that a good algorithm can make ‘things’ much faster using fewer resources. Anyway, by using the GPU power in your PC and simple ( 😉 ) parallelization you can speed up even the worst algorithm – maybe after that operation it will meet your requirements? Just write simple performance tests and check!

Github repo for this article: