Stream microservices logs from Kubernetes to Kafka


Application logs are essential for investigating problems and monitoring application health. You want to watch your logs and react to problems before your client notices that something is wrong. When your system loses stability you need to react immediately, so your logs must stay available and up to date: they are your 'sight' into the application. Let's say that logs are the first eye and metrics are the second. In a microservices world log aggregation is even harder: you have X instances of each microservice, every instance prints a huge number of logs to tell you what is happening, and those instances are typically distributed across many different nodes. How do you deal with that?

This article describes how to use Fluentd on Kubernetes as a Kafka producer to stream logs, and how to use Fluentd on a virtual machine as a Kafka consumer to push those logs to Elasticsearch. I also want to show what a stream of application logs on Kafka makes possible, using a dummy Python implementation that sends Slack notifications to inform developers and DevOps engineers about detected exceptions.

Why Kafka?

Streaming logs via Kafka gives you the following advantages:

  1. Kafka is extremely efficient, so it is a safe 'buffer' for your logs when your log storage cannot keep up with the volume during load spikes.
  2. Logs on Kafka are ready to integrate: you can attach many consumers and place logs in different storage engines, or attach analysis tools such as Apache Spark directly.
  3. If you treat Kafka as the integration bus for your system, you avoid 'infrastructure spaghetti' by integrating different infrastructure modules through a single point.
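To make the first two points concrete, here is a minimal in-memory sketch. It is not Kafka and uses no Kafka API: `ToyTopic` and the group names are made up for illustration, and the only idea it models is that every consumer group keeps its own read position in the same log.

```python
from collections import defaultdict


class ToyTopic:
    """In-memory stand-in for a topic: an append-only list of messages."""

    def __init__(self):
        self.messages = []
        # Each consumer group keeps its own read position (offset).
        self.offsets = defaultdict(int)

    def produce(self, message):
        self.messages.append(message)

    def consume(self, group, max_messages):
        """Return up to max_messages unread messages for the given group."""
        start = self.offsets[group]
        batch = self.messages[start:start + max_messages]
        self.offsets[group] = start + len(batch)
        return batch


topic = ToyTopic()
for i in range(5):
    topic.produce(f"log line {i}")

# A slow storage consumer drains the buffer in small batches...
first_batch = topic.consume("elasticsearch-writer", max_messages=2)
# ...while an analysis consumer reads everything independently.
all_lines = topic.consume("exception-detector", max_messages=10)
remaining = topic.consume("elasticsearch-writer", max_messages=10)

print(first_batch)
print(len(all_lines), len(remaining))
```

The slow group loses nothing while it lags behind, and the second group is not affected by the first one at all.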

Proposed design


Fluentd on Kubernetes

Fluentd is deployed on Kubernetes as a DaemonSet. Here is the deployment descriptor:

apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
  name: fluent
  namespace: fluent
  labels:
    k8s-app: fluent
    version: v1 "true"
spec:
  template:
    metadata:
      labels:
        k8s-app: fluent
        version: v1 "true"
    spec:
      tolerations:
      - key:
        effect: NoSchedule
      containers:
      - name: fluentd
        image: fluent/fluentd-kubernetes-daemonset:v1.1.3-debian-elasticsearch
        command: ["/bin/sh"]
        args: ["-c", "gem install fluent-plugin-kafka && cp /fluent-config/fluent.conf /fluentd/etc/ && /fluentd/"]
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: fluent-config
          mountPath: /fluent-config
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: fluent-config
        configMap:
          name: fluent
          items:
          - key: fluent_conf
            path: fluent.conf

Pay attention to two sections in that file:

  • The toleration means that Fluentd is also scheduled on the Kubernetes masters
  • The command override installs the Kafka plugin and copies fluent.conf, which we attach as a config map

Here you can find config map:

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent
  namespace: fluent
data:
  fluent_conf: |
    @include kubernetes.conf

    # The match pattern was lost from this listing; ** (all events) is assumed
    <match **>
      @type kafka_buffered

      brokers             your-kafka-broker-host:9092

      topic_key             your-logs-topic
      default_topic         your-logs-topic
      output_data_type      json
      output_include_tag    true
      output_include_time   true
      get_kafka_client_log  false

      required_acks         1
      compression_codec     gzip
    </match>

In that config we simply tell Fluentd to produce all collected logs as messages on Kafka. We also enable gzip compression to reduce the amount of data sent. The producer requires at least one ack. You should set the replication factor of your-logs-topic to at least 2 so that you keep your logs when one broker fails.

Kafka consumer – fluentd and Elasticsearch

After streaming logs to Kafka you can easily consume them using another Fluentd instance. My advice is to place that consumer on the same machine as Elasticsearch or, if you are using an Elasticsearch SaaS such as AWS ES, at least in the same subnet. Here is the config:

<source>
 @type kafka_group
 brokers your-kafka-broker-host:9092
 consumer_group fluentd
 topics your-logs-topic
 format json
 use_record_time true
</source>

# The match pattern was lost from this listing; ** (all events) is assumed
<match **>
 @type elasticsearch
 @id out_es
 log_level info
 include_tag_key true
 host your-elasticsearch-host
 port 443
 scheme https
 logstash_format true
 logstash_prefix kubernetes
 <buffer>
   flush_thread_count 8
   flush_interval 5s
   chunk_limit_size 6M
   queue_limit_length 256M
   retry_max_interval 30
   retry_forever true
 </buffer>
</match>


Pay attention to two sections in that file:

  • We consume logs as the fluentd consumer group. If your-logs-topic has more than one partition, you can consume logs using multiple consumer instances
  • We simply save logs in daily Elasticsearch indices prefixed with 'kubernetes'
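Why the number of partitions matters can be sketched in a few lines. This is a simplified round-robin model, not Kafka's actual assignment code, and `assign_partitions` and the consumer names are made up for illustration. The one rule it relies on is real: within a consumer group, each partition is read by at most one consumer.

```python
def assign_partitions(partition_count, consumers):
    """Spread partition numbers over the consumers of one group, round-robin."""
    assignment = {consumer: [] for consumer in consumers}
    for partition in range(partition_count):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


group = ["fluentd-a", "fluentd-b", "fluentd-c"]
balanced = assign_partitions(6, group)
too_few_partitions = assign_partitions(2, group)

print(balanced)
print(too_few_partitions)
```

With two partitions and three consumers, the third consumer gets nothing to read, so adding consumer instances only helps up to the number of partitions.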

Kafka consumer – search & notify

The last step is to attach a consumer for log analysis. For that I'm going to modify the example from the confluent-kafka-python library, using Python 3.6:

from confluent_kafka import Consumer, KafkaError
import json
import requests

# Placeholder: replace with your Slack incoming webhook URL
SLACK_WEBHOOK_URL = 'your-slack-webhook-url'

c = Consumer({
    'bootstrap.servers': 'your-kafka-broker-host',
    'group.id': 'logs-inspector',
    'default.topic.config': {
        'auto.offset.reset': 'smallest'
    }
})

c.subscribe(['your-logs-topic'])

while True:
    msg = c.poll(1.0)

    # No message within the timeout: poll again
    if msg is None:
        continue
    if msg.error():
        # Reaching the end of a partition is not a real error
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    message_dict = json.loads(msg.value().decode('utf-8'))
    if "log" in message_dict.keys():
      if "exception" in message_dict["log"].lower():
        notification = f"Found problem in log\nContainer: {message_dict['kubernetes']['container_name']}\nNamespace: {message_dict['kubernetes']['namespace_name']} \nPod: {message_dict['kubernetes']['pod_name']} \nHost: {message_dict['kubernetes']['host']}"
        requests.post(
          SLACK_WEBHOOK_URL,
          data=json.dumps({"attachments": [{
            'text': notification,
            'color': "#00FF00"
          }]}),
          headers={'Content-Type': 'application/json'})

c.close()

That script is very simple: it consumes a log message from the topic and checks whether the log key exists and contains the word "exception". If it does, it sends a notification to your team via a Slack webhook. Note the additional information that Kubernetes gives you, such as host, namespace and pod name.
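The detection step can be separated from Kafka and Slack, which makes it easy to try on a single record. The following is only a sketch: `build_slack_payload`, the `keyword` parameter, the "unknown" fallback and the sample record are my own illustration, not part of the script above.

```python
import json


def build_slack_payload(record, keyword="exception"):
    """Return the JSON body for a Slack webhook, or None if the record is clean."""
    log_line = record.get("log")
    if log_line is None or keyword not in log_line.lower():
        return None
    # Kubernetes metadata may be missing, so fall back to "unknown".
    meta = record.get("kubernetes", {})
    text = (
        "Found problem in log\n"
        f"Container: {meta.get('container_name', 'unknown')}\n"
        f"Namespace: {meta.get('namespace_name', 'unknown')}\n"
        f"Pod: {meta.get('pod_name', 'unknown')}\n"
        f"Host: {meta.get('host', 'unknown')}"
    )
    return json.dumps({"attachments": [{"text": text, "color": "#00FF00"}]})


# Hypothetical record with the same keys the consumer script reads.
sample = {
    "log": "java.lang.IllegalStateException: boom\n",
    "kubernetes": {
        "container_name": "orders",
        "namespace_name": "shop",
        "pod_name": "orders-7d9f",
        "host": "node-1",
    },
}
payload = build_slack_payload(sample)
ignored = build_slack_payload({"log": "request served in 12 ms"})

print(payload)
print(ignored)
```

Keeping this part as a pure function means the matching rule can be changed and tested without a running broker.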

Useful Docker #1 – Seed container at runtime

Docker is one of my favorite tools in IT: it gives me really fast access to tools that are already installed and configured. Thanks to that I can cover many cases using a simple Docker container and some magic around it. In this article series I'm going to show you how to use Docker in creative ways and how to debug problems inside Docker containers.

Today: something about CMD, magic at runtime and docker commit.

I assume that you have basic knowledge of Docker and some experience using that great tool. If not, I suggest checking this site:

My environment in this article:

  • Docker version 17.12.0-ce, build c97c6d6
  • Ubuntu 17.10
  • zsh 5.2 (x86_64-ubuntu-linux-gnu)
  • tmux 2.5

Everything should also work in Docker on Windows (the Hyper-V based one). If there is a problem somewhere, please contact me on my LinkedIn.


Seed your container at runtime

Docker Hub provides many images (as mentioned here, there were 400k in 2016) with useful defaults, but these defaults rarely meet our requirements. There are many strategies for dealing with that, but one of my favorites is overriding the default Docker command with some magic.

Let's say that we want to host my blog in a Docker container and switch my site logo to the Docker logo, just for fun!

Required steps:

  • Start container
  • Download site content recursively
  • Start web hosting
  • Change image

We need to start with some static web content hosting, such as nginx.

We take the command from its site (with port publishing added):

docker run -p 8080:80 --name some-nginx -v /some/content:/usr/share/nginx/html:ro -d nginx

OK, but after starting this container we can see only the nginx welcome page. How do we inject content there? There are 2 ways:

  • Add content to the Docker image: useful if we want to use that image many times
  • Override the run command and download content before the web server starts: our approach, because we are fast&furious

So what's the default command in nginx? To check that we go to the original Dockerfile, like this one, and at the end we see:

CMD ["nginx", "-g", "daemon off;"]

Cool, let’s try this way:

docker run -it -p 8080:80 nginx bash -c "cd /usr/share/nginx/html && wget --recursive --no-parent --no-check-certificate ; nginx -g 'daemon off;'"

What’s happening there:

  • docker – 😉
  • run – start a new container
  • -it – interactive mode (-i keeps stdin open, -t allocates a terminal)
  • -p 8080:80 – publish nginx's internal port 80 on port 8080 of our machine, so we can access nginx at localhost:8080
  • bash -c "…" – a little magic to pass a long command chained with && and ; – the bash man page says:
    -c string If the -c option is present,  then  commands  are  read  from
                 string.   If  there  are arguments after the string, they are
                 assigned to the positional parameters, starting with $0.
  • cd… – sure
  • wget – we need to download my site recursively
  • nginx -g – start nginx (the default command from the Dockerfile)
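To see why the quoting matters, here is a small sketch that splits a shortened version of that command line the way a POSIX shell would. The wget step is left out on purpose and the variable names are mine. Everything after the image name replaces the image's default CMD, and the whole quoted string reaches bash as one argument.

```python
import shlex

# Shortened version of the command above, without the wget step.
command_line = (
    "docker run -it -p 8080:80 nginx "
    "bash -c \"cd /usr/share/nginx/html && nginx -g 'daemon off;'\""
)

argv = shlex.split(command_line)
image_index = argv.index("nginx")

docker_part = argv[:image_index + 1]        # options and image name
container_command = argv[image_index + 1:]  # overrides the image's CMD

print(docker_part)
print(container_command)
```

The && and the inner single quotes are not interpreted by the shell on our machine. They survive inside the string and are only interpreted by bash inside the container.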

Ok so let’s run it… woooops!!! Something is not working ;(

bash: wget: command not found

Please remember that every Docker container behaves like an isolated virtual OS (actually it isn't one, but you can think about Docker this way 😉), which means that every Docker image has a different set of tools installed. Sadly the nginx image doesn't contain wget, but okey dokey, we can install wget using apt-get inside the container, no worries:

docker run -it -p 8080:80 nginx bash -c "apt-get update && apt-get install --no-install-recommends --no-install-suggests -y wget && cd /usr/share/nginx/html && wget --recursive --no-parent --no-check-certificate ; nginx -g 'daemon off;'"

What’s happening there:

  • sudo is not needed because inside the container we are root
  • apt-get update – very important! Remember to run that command before any apt-get install inside a container, because images usually ship without package lists
  • apt-get install – here -y is important: the command runs in non-interactive mode, so we need to auto-confirm that we want to install the package

So after running this command everything seems fine – wget is downloading my site and nginx is starting but at localhost:8080 I see:

[Screenshot: the page shown at localhost:8080]

Damn that’s not cool….


To debug that situation we need to get into the container and see what's happening in the file system after the wget command. We start with:

docker ps

And output:

[Screenshot: docker ps output listing the running container]

The most important column there is the Container ID. We're going to use that ID to get into the container. Open a new console and try (replace the container ID with yours, because it's a random string):

docker exec -it 69cb502ab0d1 bash

After that command we are in a new shell process inside the container, in interactive mode. Let's check what's under /usr/share/nginx/html:

drwxr-xr-x 1 root root 4.0K Feb 18 11:20 . 
drwxr-xr-x 1 root root 4.0K Dec 26 18:16 .. 
-rw-r--r-- 1 root root 537 Dec 26 11:11 50x.html 
-rw-r--r-- 1 root root 612 Dec 26 11:11 index.html 
drwxr-xr-x 7 root root 4.0K Feb 18 11:20

OK, so wget downloaded my site into a subdirectory. Try it in the web browser:


You should see my blog 🙂
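The reason nginx kept serving its default page is that wget --recursive saves files under a directory named after the site's host, next to the original index.html. Here is a small sketch of where the downloaded site ends up. The URL blog.example.com is a placeholder and `local_url_after_wget` is a made-up helper. It assumes the site runs on the default port and that wget was started in the nginx web root.

```python
from urllib.parse import urlparse


def local_url_after_wget(site_url, published_port=8080):
    """URL under which nginx serves a site mirrored with wget --recursive."""
    # wget creates a directory named after the host of the downloaded site.
    host = urlparse(site_url).netloc
    return f"http://localhost:{published_port}/{host}/"


mirrored = local_url_after_wget("https://blog.example.com/")
print(mirrored)
```

Passing --no-host-directories (-nH) to wget would put the files directly into the current directory instead.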

Change image

Please stay in our shell process inside the container. Changing the site logo is so easy, just type: vim /usr/share/nginx/html/ omg there is no vim


<disclaimer> Many popular images don't include basic tools like vim or even bash, because every additional tool not needed at runtime means a bigger image for users to download </disclaimer>

apt-get install vim

I believe that you are VIM WIZARD LVL 300 but if not just type:

ESC /site-logo ENTER

to find my logo section, then change the src of the next <img> tag to (this should help: ESC a):



Go to web browser and hit F5:

[Screenshot: the blog after changing the logo]

That was so easy, wasn't it?

Commit container into image

The last step is to convert the container into an image.

It means that we can say to Docker: hey! I have a container here where I installed some tools and modified some state, please convert that container into an image so I can reuse that manual configuration many times.


docker commit 69cb502ab0d1 my-awesome-image:1.0

Again we use the Container ID, and Docker creates an image named my-awesome-image:1.0 for us. Let's check if that works:

docker run -it -p 8081:80 my-awesome-image:1.0 bash -c "nginx -g 'daemon off;'"

What’s happening:

  • Run container from committed image
  • Override the command again. Docker set my-awesome-image's CMD to the one with wget, but we don't need that here! We committed the whole container, so the site is already in the image. If we don't need the latest version of the site, we can override CMD again to start only the web server

Go to:


And you should see my blog with the Docker logo instead of mine.


As you can see, Docker is quite powerful. When used with care it can really do magic. I hope that was helpful for you. I decided to split the Docker article into a series because I have many more examples of how to use that tool in creative ways. See you in the next article!

How GPU can help you if your algorithm is not efficient


Nowadays the IT world is quite complex: programmers must learn how to deal with data integrity, how to talk to a database using an ORM, how to design an application using DDD or how to call a microservice with a REST request. We should remember that computer programming is more about writing algorithms for computers and less about learning library APIs. Actually, I'm asking myself: is that true? Programmers today do not have enough time to write efficient algorithms because:

  • please add a new feature
  • please remember that you have 100 features in the backlog
  • please just make it work now and we will come back to it later for performance improvements

Writing a fast algorithm is a very advanced task: you must really understand the problem domain and know a lot about algorithmic complexity. Look at the size of the book "Introduction to Algorithms" by Thomas H. Cormen, Charles E. Leiserson, Ronald L. Rivest and Clifford Stein: 1300 pages!


In this article I'm going to use a simple algorithm from that book: find the maximum subarray.

Assumption: we have a 1D array of integers

Task: find the startIndex and endIndex of the maximum subarray (the contiguous run of elements with the largest sum) in that array, and provide that sum

OK, so let's say that we have the following array:

array = [1, 2, 10, 12]

The answer for that case is very easy: the whole array is the maximum subarray, so startIndex is 0, endIndex is 3 and the result is 25. When all array items are positive the case is boring, so take a look at something more interesting:

array = [13,-3, -25, 20, -3, -16, -23, 18, 20, -7, 12, -5, -22, 15, -4, 7]

Here the answer is not so easy. When we just add all items the result is: -3

But when we add the items from index 7 to index 10 the result is 43, which is the maximum subarray sum for that array. How do we code that algorithm for best performance?
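The numbers for this example are easy to check. The snippet below is just a sanity check on the sample array: it tries every possible start and end index, which is fine for 16 elements.

```python
array = [13, -3, -25, 20, -3, -16, -23, 18, 20, -7, 12, -5, -22, 15, -4, 7]

total = sum(array)
# Indexes 7..10 inclusive; Python slices exclude the end index.
best_slice = array[7:11]

# Try every (start, end) pair and keep the one with the largest sum.
n = len(array)
best_sum, start_index, end_index = max(
    (sum(array[i:j + 1]), i, j) for i in range(n) for j in range(i, n)
)

print(total)
print(best_slice, sum(best_slice))
print(start_index, end_index, best_sum)
```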

Right solution

The proper and efficient solution for this problem is a divide and conquer algorithm. In this article I'm not going to explain in detail how that algorithm works, because it's not the main point. In simple words: it uses recursion and splits your array into smaller parts at each step, which lets it find the result really fast. I've implemented that algorithm in Go; here is part of the code:

func findMaximumSubarray(array []int, low int, high int) (int, int, int) {
        if high == low {
                // Base case: a single element
                return low, high, array[low]
        } else {
                mid := (low + high) / 2
                leftLow, leftHigh, leftSum := findMaximumSubarray(array, low, mid)
                rightLow, rightHigh, rightSum := findMaximumSubarray(array, mid+1, high)
                crossLow, crossHigh, crossSum := findMaxCrossingSubarray(array, low, mid, high)
                if leftSum >= rightSum && leftSum >= crossSum {
                        return leftLow, leftHigh, leftSum
                } else if rightSum >= leftSum && rightSum >= crossSum {
                        return rightLow, rightHigh, rightSum
                } else {
                        return crossLow, crossHigh, crossSum
                }
        }
}

func findMaxCrossingSubarray(array []int, low int, mid int, high int) (int, int, int) {
        // Best sum of a run that ends at mid and extends to the left
        leftSum := math.MinInt32
        sum := 0
        maxLeft := 0
        // i >= low so that the element at index low is included
        for i := mid; i >= low; i-- {
                sum += array[i]
                if sum > leftSum {
                        leftSum = sum
                        maxLeft = i
                }
        }
        // Best sum of a run that starts at mid+1 and extends to the right
        rightSum := math.MinInt32
        sum = 0
        maxRight := 0
        for i := mid + 1; i <= high; i++ {
                sum += array[i]
                if sum > rightSum {
                        rightSum = sum
                        maxRight = i
                }
        }
        return maxLeft, maxRight, leftSum + rightSum
}

Full source code here:

To analyze the execution time of that algorithm we run it on a random integer array and measure the execution with the Linux time program. Here are some results for an array of 50 000 000 elements:

./divide-and-conquer-recursive 4,73s user 0,08s system 100% cpu 4,808 total

./divide-and-conquer-recursive 4,67s user 0,14s system 100% cpu 4,807 total

./divide-and-conquer-recursive 4,68s user 0,12s system 100% cpu 4,794 total

It's very fast and works well, but it requires a deep understanding of the problem and good knowledge of algorithms. How can we implement it more simply?

Bad solution – brute-force

The easiest way to solve that problem is brute force: in other words, just walk across the whole array item by item and find the maximum result by adding the following items. Take a look at a simple implementation in Python:

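def algorithm(array):
    bestStartIndex = 0
    bestEndIndex = 0
    # Start from the first element so that all-negative arrays work too
    bestSum = array[0]
    for i in range(0, len(array)):
        currentSum = array[i]
        # A single element can be the best subarray on its own
        if currentSum > bestSum:
            bestSum = currentSum
            bestStartIndex = i
            bestEndIndex = i
        for j in range(i + 1, len(array)):
            currentSum += array[j]
            if currentSum > bestSum:
                bestSum = currentSum
                bestStartIndex = i
                bestEndIndex = j
    return bestStartIndex, bestEndIndex, bestSum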

Full source code here:

So easy, right? But look what's happening: we have a nested for loop, which means that for a 50 000 000 element array we have this number of loop iterations (programmers and mathematicians, please don't be angry that I rounded some numbers for easier calculation):

\sum_{n=1}^{50\,000\,000} (50\,000\,000 - n) = 0 + 1 + 2 + 3 + 4 + \dots + 49\,999\,999 = \frac{0 + 49\,999\,999}{2} \cdot 50\,000\,000 = 1.249999975 \cdot 10^{15}
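That sum can be double-checked in a few lines. The closed form n(n-1)/2 is the standard arithmetic series formula; the function names are mine, and the loop version just counts inner-loop iterations for a small n to confirm that both agree.

```python
def inner_iterations(n):
    """Closed form for 0 + 1 + ... + (n - 1)."""
    return n * (n - 1) // 2


def counted_iterations(n):
    """Count the inner-loop iterations of the brute-force algorithm directly."""
    count = 0
    for i in range(n):
        for j in range(i + 1, n):
            count += 1
    return count


small_check = inner_iterations(1000) == counted_iterations(1000)
full_size = inner_iterations(50_000_000)

print(small_check)
print(full_size)
```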

Quite a big number. So what happens when we run that program? Let's start with an array length of 25 000:

python3 21,82s user 0,03s system 99% cpu 21,862 total

python3 22,48s user 0,00s system 99% cpu 22,487 total

python3 21,82s user 0,00s system 99% cpu 21,821 total

It's sooo bad. Even a small number of elements makes that program very, very, VERY slow. Here is some magic from Python: out of the box, Python is not a very good language for heavy numerical loops. To solve that problem we need an external library such as Numba:

Numba works by generating optimized machine code using the LLVM compiler infrastructure at import time, runtime, or statically (using the included pycc tool).

So just modify our program this way:

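from numba import jit

# Compile the function to machine code the first time it is called
@jit
def algorithm(array):
    bestStartIndex = 0
    bestEndIndex = 0
    # Start from the first element so that all-negative arrays work too
    bestSum = array[0]
    for i in range(0, len(array)):
        currentSum = array[i]
        # A single element can be the best subarray on its own
        if currentSum > bestSum:
            bestSum = currentSum
            bestStartIndex = i
            bestEndIndex = i
        for j in range(i + 1, len(array)):
            currentSum += array[j]
            if currentSum > bestSum:
                bestSum = currentSum
                bestStartIndex = i
                bestEndIndex = j
    return bestStartIndex, bestEndIndex, bestSum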

And now:

python3 0,95s user 0,52s system 195% cpu 0,754 total

python3 0,92s user 0,58s system 191% cpu 0,787 total

python3 0,95s user 0,56s system 189% cpu 0,793 total

So far so good!!! Let's try a bigger size, 250 000:

python3 34,00s user 0,42s system 101% cpu 33,906 total

python3 32,83s user 0,58s system 102% cpu 32,688 total

python3 34,43s user 0,57s system 102% cpu 34,284 total

Not really bad, but how can we speed up that algorithm? Maybe some parallelism would help?


How can a GPU help us? Here is some nice info from Wikipedia:

The graphics processing unit (GPU), as a specialized computer processor, addresses the demands of real-time high-resolution 3D graphics compute-intensive tasks. By 2012, GPUs had evolved into highly parallel multi-core systems allowing very efficient manipulation of large blocks of data. This design is more effective than general-purpose central processing unit (CPUs) for algorithms in situations where processing large blocks of data is done in parallel

Great, and the API? If you have a GeForce in your PC it's sooo easy to write a program for the GPU using CUDA… no… actually not! There is a CUDA library for Python (PyCUDA), but that library lets you write the GPU program in… C! The best strategy here is to prepare the whole data set in Python and then pass it to the GPU for the expensive computation. CUDA uses a grid concept for computing on the GPU, but I'm definitely not the right person to explain that, just go there:

theory.stop(); practice.start();

Still bad solution – but parallel!

How can we run our brute-force algorithm in parallel? Easy peasy lemon squeezy: just run every inner loop in parallel. How can we run that on the GPU using CUDA? Easy pea… no… actually not! Please find the code and explanation below:

import pycuda.driver as cuda
import pycuda.autoinit
from pycuda.compiler import SourceModule
import numpy
from random import randint
import operator

def generateRandomArray(length):
    array = []
    for i in range(0, length):
        array.append(randint(-1000, 1000))
    return array

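# Define CUDA function
mod = SourceModule("""
__global__ void find(int *array, int *result, int *result_end_index, int *N)  {
  int id = blockIdx.x*blockDim.x + threadIdx.x;
  // The grid is rounded up to whole blocks, so skip threads past the end of the array
  if (id >= N[0]) return;
  // A single-element subarray starting and ending at id is the initial best
  int bestEndIndex = id;
  int bestSum = array[id];
  int currentSum = array[id];
  for(int j=id+1; j < N[0]; j++) {
      currentSum += array[j];
      if(currentSum > bestSum) {
         bestSum = currentSum;
         bestEndIndex = j;
      }
  }
  result[id] = bestSum;
  result_end_index[id] = bestEndIndex;
}
""")

N = 1000000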
N_numpy = numpy.array([N]).astype(numpy.int32)
testArray = generateRandomArray(N)
array = numpy.array(testArray).astype(numpy.int32)
result = numpy.zeros(N).astype(numpy.int32)
result_end_index = numpy.zeros(N).astype(numpy.int32)
find_func = mod.get_function("find")

# Allocate on device
array_gpu = cuda.mem_alloc(array.size * array.dtype.itemsize)
result_gpu = cuda.mem_alloc(result.size * result.dtype.itemsize)
result_end_index_gpu = cuda.mem_alloc(result_end_index.size * result_end_index.dtype.itemsize)
N_gpu = cuda.mem_alloc(N_numpy.size * N_numpy.dtype.itemsize)

# Copy from host to device
cuda.memcpy_htod(array_gpu, array)
cuda.memcpy_htod(N_gpu, N_numpy)

# Number of threads per block
threadCount = 128

# Number of blocks per grid
blockCount = int(numpy.ceil(float(N) / threadCount))

find_func(array_gpu, result_gpu, result_end_index_gpu, N_gpu, block=(threadCount, 1, 1), grid=(blockCount, 1))

# Copy result to host
cuda.memcpy_dtoh(result, result_gpu)
cuda.memcpy_dtoh(result_end_index, result_end_index_gpu)

index, value = max(enumerate(result), key=operator.itemgetter(1))

print(value, index, result_end_index[index])

import numpy: numpy is the basic library for mathematical computations in Python, see:

generateRandomArray: a simple function that generates a random array of the given length

N_numpy: a one-element numpy array used to pass the array length to the GPU

testArray, array: generate a random array and convert it to numpy so it can be passed to the GPU

result: the result array. Index i will hold the maximum sum of a subarray that starts at i

result_end_index: the array of best end indexes. Index i will hold the index x of the maximum subarray between i and x

cuda.mem_alloc: allocate memory on the GPU. Do you remember malloc from C 😉 ?

cuda.memcpy_htod: copy the source array onto the GPU. After that operation the program running on the GPU can access the input array

threadCount: I picked it at random. It should probably be related to the graphics card parameters somehow (a multiple of the warp size, 32, is the usual choice), but who cares?

find_func(...): run the program on the GPU. Take a look at that program:

find: crème de la crème, a program written in C running on the GPU. As parameters we pass the input array, the result array, the result end index array and the array length (it is also a pointer, because it is passed as the one-element array N_numpy). Inside that function we calculate the id of the current execution, which is also our index into the input and output arrays. After the calculations we simply write the best sum and end index to the result arrays.

cuda.memcpy_dtoh: copy the output from the GPU back to Python

max(enumerate(result), ...): find the maximum value in the output array. That array holds the best sum for every start index, so finding the biggest value solves our problem
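If you have no GPU at hand, the same decomposition can be simulated on the CPU. The sketch below is plain Python, not PyCUDA: `simulate_launch` is a made-up helper that walks over the blocks and threads one by one, computes the thread id the same way the kernel does, and counts the threads that fall past the end of the array because the grid is rounded up to whole blocks.

```python
import math


def simulate_launch(array, threads_per_block=128):
    """Run the per-start-index search sequentially, one 'thread' at a time."""
    n = len(array)
    block_count = math.ceil(n / threads_per_block)
    result = [0] * n
    result_end_index = [0] * n
    idle_threads = 0
    for block_idx in range(block_count):
        for thread_idx in range(threads_per_block):
            thread_id = block_idx * threads_per_block + thread_idx
            if thread_id >= n:
                # This thread has no element to work on.
                idle_threads += 1
                continue
            best_sum = current_sum = array[thread_id]
            best_end = thread_id
            for j in range(thread_id + 1, n):
                current_sum += array[j]
                if current_sum > best_sum:
                    best_sum, best_end = current_sum, j
            result[thread_id] = best_sum
            result_end_index[thread_id] = best_end
    # Final reduction on the host: pick the start index with the largest sum.
    start = max(range(n), key=result.__getitem__)
    return start, result_end_index[start], result[start], idle_threads


array = [13, -3, -25, 20, -3, -16, -23, 18, 20, -7, 12, -5, -22, 15, -4, 7]
outcome = simulate_launch(array, threads_per_block=6)
print(outcome)
```

With 16 elements and 6 threads per block, 3 blocks are launched and 2 threads have nothing to do. For N = 1 000 000 and 128 threads per block the same rounding gives 7813 blocks and 64 surplus threads.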

How about time? 1 000 000 elements on my GeForce GTX 970M:

python3 8,38s user 3,22s system 105% cpu 11,011 total

python3 8,38s user 3,33s system 106% cpu 10,993 total

python3 8,39s user 3,26s system 106% cpu 10,947 total


As you can see in this article, writing efficient algorithms is the best solution: the divide and conquer algorithm has a computational complexity of around n*log(n), while the brute-force algorithm has around n^2, which means that a good algorithm can make 'things' much faster using fewer resources. Anyway, by using the GPU power in your PC and simple ( 😉 ) parallelization you can speed up even the worst algorithm. Maybe after that it will meet your requirements? Just write simple performance tests and check!
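To get a feeling for the gap between the two complexity classes, here is a rough estimate for the 50 000 000 element array. It ignores constant factors and uses a base-2 logarithm, which is my assumption; it says nothing about real running times.

```python
import math


def estimated_steps(n):
    """Rough step counts for n*log2(n) and n^2, ignoring constant factors."""
    return n * math.log2(n), n * n


n = 50_000_000
fast, slow = estimated_steps(n)
ratio = slow / fast

print(f"n*log2(n) ~ {fast:.3e}")
print(f"n^2       ~ {slow:.3e}")
print(f"ratio     ~ {ratio:.3e}")
```

The quadratic algorithm needs roughly two million times more steps at this size, which is far more than parallel hardware can win back.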

Github repo for this article: