Streaming microservices logs: how Riemann simplifies things


This article is inspired by my first steps in Clojure. I want to revisit my earlier thoughts on streaming and analysing microservices logs. It bears repeating: application logs are one of the most important tools for debugging problems on a production system. Nowadays it is also important for a DevOps team to react when bad things happen in the system, so log analysis is a must-have if you want to detect problems in your application before your business client calls you at night. I think a modern approach to log analysis would involve machine learning algorithms, which can detect real issues more precisely and ignore the “noise” that is always present in the system. This article shows an improved version of the architecture that originated here: 

This time I’m improving it by simplifying the whole architecture and using existing frameworks to achieve the goals.


As you can see in that design, the microservices (Python 3 programs in my example) push JSON logs directly to Kafka, without Fluentd collecting the logs. I think that’s a better approach because:

  1. Fluentd is out: one technology less, one problem less.
  2. It’s easier to keep this configuration consistent between deployments and a developer’s local environment. You can use the same configuration on a Kubernetes cluster in the cloud and in local Docker containers. With Fluentd that is not so straightforward, because Fluentd usually attaches to the microservices’ log files, which is hard to simulate in local Docker containers.
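One way to keep the same code working both on a cluster and in local Docker containers is to read the Kafka settings from the environment. The sketch below is only an illustration: the variable names KAFKA_BOOTSTRAP_SERVERS and KAFKA_LOG_TOPIC and the host broker-0.example are assumptions, not something taken from the original project.

```python
import os


def kafka_settings(env=None):
    """Read Kafka connection settings from environment variables.

    The variable names and defaults are illustrative assumptions.
    """
    env = os.environ if env is None else env
    return {
        "bootstrap.servers": env.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092"),
        "topic": env.get("KAFKA_LOG_TOPIC", "program-logs"),
    }


# Local docker-compose: nothing is set, so the compose service name is used.
local = kafka_settings({})
# Cluster deployment: same code, configured only through the environment.
cloud = kafka_settings({"KAFKA_BOOTSTRAP_SERVERS": "broker-0.example:9092"})
```

With this shape the microservice itself never needs to know where it is running.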

An important point here is that microservices should have a standardized, consistent log structure. I strongly advise writing logs in JSON format, because it explicitly declares what each log contains and so eliminates parsing problems. Here is an example log:

{
  "level" : "info",
  "timestamp" : "2018-08-19T16:41:41+00:00",
  "path" : "",
  "line" : "10",
  "message": "My log"
}

This approach also has one disadvantage: the logs grow in size. If your application produces a large number of logs and you are worried about repeating the name of every field in every record, you can consider Avro or Protocol Buffers, which serialize data to a binary format and use a schema. Sadly, they are not human-readable, so consider adding a “switch” between production and non-production environments.
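To get a feeling for how much the field names cost, here is a rough sketch that compares the example log as self-describing JSON with a values-only layout. The values-only layout is a hypothetical stand-in for a schema-based format; it is not Avro or Protocol Buffers, which are more compact still.

```python
import json

log = {
    "level": "info",
    "timestamp": "2018-08-19T16:41:41+00:00",
    "path": "",
    "line": "10",
    "message": "My log",
}

# Self-describing JSON: every record repeats the field names.
named = json.dumps(log, separators=(",", ":")).encode("utf-8")

# Hypothetical schema-based layout: only the values, in an agreed order.
positional = json.dumps(list(log.values()), separators=(",", ":")).encode("utf-8")

# Bytes spent on field names in a single record.
overhead = len(named) - len(positional)
print(f"named={len(named)} B, positional={len(positional)} B, overhead={overhead} B")
```

The overhead is paid on every record, so it matters most for short messages logged at a high rate.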

Again, Kafka acts in this design as an efficient buffer for logs and as a mechanism for decoupling infrastructure. If we stream logs to Kafka, they are retained even when a log consumer has problems or is down. Because Kafka is very efficient, we don’t have to worry about our logs. We can also attach many consumers to the same logs, which makes it possible to alert on them, store them in S3 or Elasticsearch, feed machine learning models, and so on. Kafka also decouples parts of the infrastructure, so we avoid “infrastructure spaghetti”, where X infrastructure components talk directly to Y other components and cause complexity on the order of X*Y.
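The X*Y argument can be made concrete with a little arithmetic. This sketch just counts integrations under the two designs; the component counts are made-up numbers for illustration.

```python
def direct_links(producers: int, consumers: int) -> int:
    """Every producer talks to every consumer directly."""
    return producers * consumers


def brokered_links(producers: int, consumers: int) -> int:
    """Every component only talks to the broker (Kafka in this design)."""
    return producers + consumers


# Made-up component counts, for illustration only.
for x, y in [(3, 3), (10, 4), (20, 5)]:
    print(f"{x} producers, {y} consumers: "
          f"direct={direct_links(x, y)}, via broker={brokered_links(x, y)}")
```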

The next part of the technology stack in this example is Riemann, a server/framework for handling event streams and attaching logic and alerts to them. Riemann is written in Clojure; it lets you direct data streams from one source to another and analyse them along the way. You can find a great introduction to Riemann on this site:

I have to quote the first paragraph of that article:

“For the last year I’ve been using nights and weekends to look to a variety of monitoring and logging tools. For reasons. I’ve spent a lot of hours playing with Nagios again (some years ago I wrote a book about it) as well as looking at tools like Sensu and Heka. One of the tools I am reviewing and am quite excited about is Riemann.”

My feelings about Riemann are the same. In this article Riemann will consume logs from Kafka, push them to Elasticsearch so they can be searched with Kibana, and analyse them to alert a developer when a high number of errors occurs in a particular Python file.

As always, the full source code can be found on GitHub:

Python microservice

First we need a microservice that generates logs. I wrote a really dummy implementation that generates quite a large number of logs at info and error level. The first part is a Python logging handler that pushes logs to Kafka:

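import logging

from confluent_kafka import Producer


class KafkaHandler(logging.Handler):
    """Logging handler that publishes every formatted record to a Kafka topic."""

    def __init__(self, kafka_bootstrap_servers="kafka:9092", kafka_topic="program-logs", data_serializer_func=lambda v: v.encode('utf-8')):
        # Initialise the base handler (level, formatter, lock) first
        logging.Handler.__init__(self)
        self.producer = Producer({'bootstrap.servers': kafka_bootstrap_servers})
        self.kafka_topic = kafka_topic
        self.data_serializer_func = data_serializer_func

    def delivery_report(self, err, msg):
        # Called by the producer once per message, from poll() or flush()
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

    def emit(self, record):
        try:
            msg = self.format(record)
            self.producer.produce(self.kafka_topic, self.data_serializer_func(msg), callback=self.delivery_report)
            # Serve delivery callbacks without blocking
            self.producer.poll(0)
        except Exception:
            logging.Handler.handleError(self, record)

    def close(self):
        # Deliver any buffered messages before the handler goes away
        self.producer.flush()
        logging.Handler.close(self)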

As you can see, it simply extends the logging handler with a Confluent Kafka producer, which takes the log JSON string and pushes it to the program-logs Kafka topic. The delivery report is there to make the POC easier to diagnose and should probably be removed in a real application.
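If you want to try the handler logic without a running broker, one option is to inject the producer. The sketch below is my own illustration, not code from the project: FakeProducer and ProducerHandler are hypothetical names, and the fake only mimics the produce(topic, value) call used by the handler.

```python
import logging


class FakeProducer:
    """Hypothetical in-memory stand-in for a Kafka producer (illustration only)."""

    def __init__(self):
        self.sent = []

    def produce(self, topic, value, callback=None):
        # Remember what would have been sent to Kafka.
        self.sent.append((topic, value))


class ProducerHandler(logging.Handler):
    """Same emit logic as a Kafka handler, with the producer passed in."""

    def __init__(self, producer, topic="program-logs"):
        super().__init__()
        self.producer = producer
        self.topic = topic

    def emit(self, record):
        try:
            msg = self.format(record)
            self.producer.produce(self.topic, msg.encode("utf-8"))
        except Exception:
            self.handleError(record)


fake = FakeProducer()
demo_log = logging.getLogger("handler-demo")
demo_log.setLevel(logging.INFO)
demo_log.propagate = False  # keep the demo record out of the root logger
demo_log.addHandler(ProducerHandler(fake))
demo_log.info("My log")
```

After the last line, fake.sent holds the topic and the encoded message, which makes the handler easy to check in a unit test.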

The next part is to create JSON logs. For that I used the example from – our logs will contain fields like:

  • message – log message
  • lineno – line number
  • pathname – name of python file
  • level – level of logs e.g. info, error
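For readers who want to see the shape of such a record without extra dependencies, here is a minimal sketch that builds the same four fields with the standard library only. It is not the configuration used in the project; JsonFormatter and the path /app/program1.py are assumptions made for the example.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render a log record as a JSON object with the four fields listed above."""

    def format(self, record):
        return json.dumps({
            "message": record.getMessage(),
            "lineno": record.lineno,
            "pathname": record.pathname,
            "level": record.levelname.lower(),
        })


# A hand-built record; the path is a made-up example value.
record = logging.LogRecord(
    name="demo", level=logging.ERROR, pathname="/app/program1.py",
    lineno=10, msg="Fatal error", args=(), exc_info=None,
)
line = JsonFormatter().format(record)
print(line)
```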

After configuration I simply start an infinite loop that calls 2 subprograms, so that the logs come with different pathname values, which will be used in Riemann.

while True:
    print("Logging loop start")
    print("Logging loop end")

That loop generates 2 log records every 10 ms, roughly 200 records per second, so quite a large number.

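from random import random
import structlog

def random_log():
    log = structlog.getLogger(__name__)
    # random() is uniform on [0, 1), so roughly 30% of calls log an error
    if random() > 0.7:
        log.error("Fatal error")
    else:
        log.info("I'm fine")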

random() is used so that errors are generated with lower probability than regular logs.
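A quick way to check what share of records ends up at error level is to simulate the same comparison many times. This is only a sketch; the function name, sample size and seed are arbitrary choices of mine.

```python
from random import Random


def error_fraction(samples: int, threshold: float = 0.7, seed: int = 42) -> float:
    """Estimate how often `random() > threshold` holds."""
    rng = Random(seed)  # seeded so the estimate is repeatable
    errors = sum(1 for _ in range(samples) if rng.random() > threshold)
    return errors / samples


fraction = error_fraction(100_000)
print(f"share of error logs: {fraction:.3f}")
```

The estimate should land close to 0.3, since random() is uniform and the threshold is 0.7.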


To run the application as a POC we of course need a local environment we can play with on a local machine. As I wrote in

“Docker is one of my favorite tool in IT – it gives me really fast access to already configured&installed tools.”

So of course we will use a docker-compose file to run the local environment:

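version: '3'
services:
  # Service names are assumptions based on the hostnames used in this article
  zookeeper:
    image: confluent/zookeeper:3.4.6-cp1
  kafka:
    image: confluent/kafka:
    links:
      - zookeeper:zookeeper
  riemann:
    image: riemannio/riemann:0.3.1
    volumes:
      - ./riemann.config:/etc/riemann.config
    environment:
      - HOST_IP=
  elasticsearch:
    image:
  kibana:
    image:
    ports:
      - 5601:5601
  microservices:
    build:
      context: .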

Comments on the file:

confluent/zookeeper and confluent/kafka images: run Kafka with Zookeeper

./riemann.config volume: maps the riemann.config Clojure file into the container to configure Riemann

HOST_IP: a little trick to access the host from a Docker container. In my case it is my Wi-Fi address; you should put your host IP there

Elasticsearch and Kibana: store and browse the logs

5601:5601 port mapping: publishes Kibana on localhost:5601 to browse the logs

build context: the Python microservices, built from a Dockerfile that adds all Python files and installs the requirements

Riemann in action

Here comes riemann.config. First we need to consume JSON logs from the Kafka topic. It’s very easy, as a Kafka consumer is part of Riemann core, so all we need is:

(kafka-consumer {:consumer.config {:bootstrap.servers "kafka:9092"}
                 :topics ["program-logs"]})

Now Riemann consumes messages from Kafka and lets us manipulate that stream. As the next step we want to push the stream to Elasticsearch.

(def elastic
  (elasticsearch {:es-endpoint "http://elasticsearch:9200"
                  :es-index "riemann"
                  :index-suffix "-yyyy.MM.dd"
                  :type "event"}
                 ; formatter: pass the event through unchanged
                 (fn [event]
                   (merge event {}))))

(streams
  (where (not (tagged "riemann")) elastic))

streams is the Riemann function that attaches logic to events. In this case we filter out events tagged “riemann”, because we don’t want to collect the events in which Riemann reports its own metrics; we want only application events/logs. After that we can browse the logs in Kibana. Note that the JSON is exploded into fields in Elasticsearch, so we can easily filter by level or path.
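The :index-suffix option means the logs land in one index per day. As a rough illustration, assuming the suffix is rendered from each event's timestamp, the resulting index names can be sketched like this (daily_index is a hypothetical helper, not part of Riemann):

```python
from datetime import datetime, timezone


def daily_index(prefix: str, event_time: datetime) -> str:
    """Build a per-day index name from a prefix and an event time."""
    # "-%Y.%m.%d" is the strftime counterpart of the Java-style "-yyyy.MM.dd".
    return prefix + event_time.strftime("-%Y.%m.%d")


name = daily_index("riemann", datetime(2018, 8, 19, 16, 41, 41, tzinfo=timezone.utc))
print(name)
```

In Kibana, an index pattern such as riemann-* would then cover all of the daily indices.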

[Screenshot: application logs browsed in Kibana]

The last part is to push notifications to the developer. On Ubuntu you can send a notification to the GUI from the terminal with the notify-send command. Sadly, it’s not easy to use that command from a Docker container to notify the host’s GUI. As a workaround I started the following bash script on the host machine (script found on Stack Overflow):


#!/bin/bash

# no multiple connections: needs to improve
while true; do
    line="$(netcat -l -p 10000)"
    notify-send -- "Received Message" "$line"
done

This script listens on TCP port 10000 and raises a notification with the string it receives. All we need to do is send a string from the Riemann container to the host IP on port 10000, which explains why I put HOST_IP in the docker-compose file.
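Before wiring Riemann in, it can help to check the listener by hand. The following is a small sketch of a TCP client in Python; send_message is a hypothetical helper written for this purpose and is not part of the project.

```python
import socket


def send_message(host: str, port: int, message: str, timeout: float = 2.0) -> None:
    """Open a TCP connection, send one UTF-8 message, and close the socket."""
    with socket.create_connection((host, port), timeout=timeout) as conn:
        conn.sendall(message.encode("utf-8"))
    # Closing the connection lets the listening side finish reading the message.
```

With the bash script running on the host, calling send_message("127.0.0.1", 10000, "test") from the same machine should produce a desktop notification, assuming nothing else is bound to that port.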

To send a string over a TCP socket we will use Java classes from within Clojure/Riemann, as I did not find a Clojure-native way.

(defn- transmit
  [writer socket message]
  (let [bytes-out   (bytes (byte-array (map byte message)))]
    (.write writer bytes-out 0 (count bytes-out))
    (.flush writer)))

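(defn send-warning
  [event]
  ; with-open closes the writer and the socket when the body finishes
  (with-open [socket (java.net.Socket. (System/getenv "HOST_IP") 10000)
              writer (java.io.BufferedOutputStream. (.getOutputStream socket))]
    (transmit writer socket
              (str "FOUND PROBLEM IN " (get event :pathname) ":" (get event :lineno)))))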

This code opens a TCP socket to HOST_IP:10000 and writes a warning string with the pathname and lineno from the event, to inform the developer that something bad happened in their code.

The final step is to attach alerting logic to the event stream. To make the example a little more advanced, let’s say we want to send a warning only if logs with level error occurred more than 25 times within a 10-second window for a particular Python file.

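(streams
  (where (= (:level event) "error")
    (with :metric 1
      (by :pathname
        ; rate emits the summed metric divided by the interval (errors per second),
        ; so 25 errors in 10 seconds correspond to 2.5
        (rate 10
          (where (> metric 2.5)
            send-warning))))))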

To achieve that, we add a :metric field with value 1 to every event with error level. Then by splits the event stream into N streams, one per :pathname. rate sums :metric over each 10-second window and divides the sum by the window length, so the metric it emits is errors per second: 25 errors in 10 seconds correspond to a rate of 2.5. When the threshold is exceeded, a warning is sent on the TCP socket. Effect:
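To make the windowing logic easier to follow, here is a simplified Python model of the same pipeline: filter errors, give each a metric of 1, group by pathname, sum per 10-second window, divide by the window length and compare with a threshold. It is a sketch under assumptions, not a reimplementation of Riemann: the alerts function and the sample events are hypothetical, and the windows here are aligned to fixed boundaries, whereas Riemann starts a window when the first event arrives.

```python
from collections import defaultdict


def alerts(events, interval=10, threshold=2.5):
    """Return (pathname, window_start, rate) for windows whose error rate exceeds threshold.

    events: iterable of dicts with "time" (seconds), "level" and "pathname".
    """
    sums = defaultdict(int)  # (pathname, window index) -> summed metric
    for event in events:
        if event["level"] != "error":           # keep only error events
            continue
        window = int(event["time"] // interval)
        sums[(event["pathname"], window)] += 1  # metric 1 per event, grouped by pathname
    found = []
    for (pathname, window), total in sorted(sums.items()):
        rate = total / interval                 # windowed sum divided by the interval
        if rate > threshold:
            found.append((pathname, window * interval, rate))
    return found


# Hypothetical data: a.py logs 30 errors in the first 10 s, b.py only 5.
events = [{"time": i / 3, "level": "error", "pathname": "a.py"} for i in range(30)]
events += [{"time": i * 2, "level": "error", "pathname": "b.py"} for i in range(5)]
events += [{"time": 1.0, "level": "info", "pathname": "b.py"}]
result = alerts(events)
```

Only a.py crosses the threshold in this data, so it is the only file that would trigger a warning.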


That’s the first article showing the power of Riemann. I promise I will come back to Riemann in the next article with a more advanced alerting strategy, as it’s real fun to write in Clojure.
