Stream microservices logs from Kubernetes to Kafka

Introduction

Application logs are essential for problem investigation and application health monitoring. Ideally you want to watch your logs and react to problems before your clients notice that something is wrong, and when your system loses stability you need your logs to stay available and up to date – they are your ‘sight’ into the application. Let’s say that logs are one eye and metrics are the other. In a microservices world, log aggregation is even harder: you have many instances of each microservice, every instance prints a huge number of logs to tell you what is happening, and those instances are typically spread across many different nodes – how do you deal with that?

This article describes how to use Fluentd on Kubernetes as a Kafka producer to stream logs, and how to use Fluentd on a virtual machine as a Kafka consumer that pushes logs to Elasticsearch. I also want to show what a log stream on Kafka makes possible, with a simple Python consumer that sends Slack notifications to inform developers/devops about detected exceptions.

Why Kafka?

Streaming logs via Kafka gives you the following advantages:

  1. Kafka is extremely efficient, so it acts as a safe ‘buffer’ for your logs when your log storage cannot keep up with load spikes
  2. Logs on Kafka are ready to integrate – you can attach many consumers to place logs in different storage engines, or plug in analysis tools such as Apache Spark
  3. If you treat Kafka as an integration bus for your system, you can avoid ‘infrastructure spaghetti’ by integrating different infrastructure modules through a single point.

Proposed design

(Diagram: fluentd-kafka-kubernetes – the proposed design for streaming logs from Kubernetes through Kafka to Elasticsearch)

Fluentd on Kubernetes

Fluentd is deployed on Kubernetes as a DaemonSet. Please see the deployment descriptor below:

apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
  name: fluent
  namespace: fluent
  labels:
    k8s-app: fluent
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  template:
    metadata:
      labels:
        k8s-app: fluent
        version: v1
        kubernetes.io/cluster-service: "true"
    spec:
      tolerations:
      - key: node-role.kubernetes.io/master
        effect: NoSchedule
      containers:
      - name: fluentd
        image: fluent/fluentd-kubernetes-daemonset:v1.1.3-debian-elasticsearch
        command: ["/bin/sh"]
        args: ["-c", "gem install fluent-plugin-kafka && cp /fluent-config/fluent.conf /fluentd/etc/ && /fluentd/entrypoint.sh"]
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: fluent-config
          mountPath: /fluent-config
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: fluent-config
        configMap:
          name: fluent
          items:
          - key: fluent_conf
            path: fluent.conf

Please keep an eye on the important sections in that file:

  • The toleration means that we also want to deploy Fluentd on Kubernetes master nodes
  • The command override installs the Kafka plugin and copies fluent.conf, which we attach as a ConfigMap

Here you can find the ConfigMap:

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent
  namespace: fluent
data:
  fluent_conf: |
    @include kubernetes.conf

    <match **>
      @type kafka_buffered

      brokers             your-kafka-broker-host:9092

      topic_key             your-logs-topic
      default_topic         your-logs-topic
      output_data_type      json
      output_include_tag    true
      output_include_time   true
      get_kafka_client_log  false

      required_acks         1
      compression_codec     gzip
    </match>
     

In that config we simply tell Fluentd: produce all collected logs as messages on Kafka. We also enable gzip compression as an optimization. We require at least one ack – and you should set the your-logs-topic replication factor to at least 2 so your logs survive when one broker fails.
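
If the topic does not exist yet, you can create it with the desired partition count and replication factor up front. Below is a minimal sketch using the AdminClient from the same confluent-kafka-python library used later in this article – the broker address, topic name and partition count are assumptions you should adapt to your cluster:

from confluent_kafka.admin import AdminClient, NewTopic

# Assumed broker address – the same one configured in fluent.conf above.
admin = AdminClient({'bootstrap.servers': 'your-kafka-broker-host:9092'})

# Several partitions let multiple consumers share the work;
# replication factor 2 keeps logs available when a single broker fails.
topic = NewTopic('your-logs-topic', num_partitions=3, replication_factor=2)

for name, future in admin.create_topics([topic]).items():
    try:
        future.result()  # block until the broker confirms creation
        print(f'Topic {name} created')
    except Exception as exc:
        print(f'Failed to create topic {name}: {exc}')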

Kafka consumer – fluentd and Elasticsearch

After streaming logs to Kafka you can easily consume them with another Fluentd instance – my advice here is to place that consumer on the same machine as Elasticsearch or, if you are using an Elasticsearch SaaS such as AWS ES, at least in the same subnet. Here you can find the config:

<source>
 @type kafka_group
 brokers your-kafka-broker-host:9092
 consumer_group fluentd
 topics your-logs-topic
 format json
 use_record_time true
</source>


<match **>
 @type elasticsearch
 @id out_es
 log_level info
 include_tag_key true
 host your-elasticsearch-host
 port 443
 scheme https
 logstash_format true
 logstash_prefix kubernetes
 <buffer>
   flush_thread_count 8
   flush_interval 5s
   chunk_limit_size 6M
   queue_limit_length 256
   retry_max_interval 30
   retry_forever true
 </buffer>
</match>
 

Please keep an eye on the important sections in that file:

  • We are consuming logs as the fluentd consumer group – if your-logs-topic has more than one partition, you can consume logs with multiple consumer instances
  • We are saving logs in daily ‘kubernetes-YYYY.MM.DD’ indices in Elasticsearch (logstash_format with the kubernetes prefix) – you can verify this with the quick query sketched below
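
To sanity-check that logs are actually arriving, you can query those indices directly. Here is a minimal sketch with Python and requests – the Elasticsearch endpoint is an assumption, and a managed service such as AWS ES may additionally require signed requests:

import requests

# Assumed endpoint – same host, scheme and port as in the Elasticsearch output above.
ES_URL = 'https://your-elasticsearch-host:443'

# Fetch the newest document from the daily kubernetes-* indices.
response = requests.get(
    f'{ES_URL}/kubernetes-*/_search',
    params={'size': 1, 'sort': '@timestamp:desc'},
)
response.raise_for_status()

hits = response.json()['hits']['hits']
print(hits[0]['_source'] if hits else 'No logs indexed yet')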

Kafka consumer – search & notify

The last step is to attach a consumer for log analysis. For that I’m going to modify an example from the confluent-kafka-python library, using Python 3.6:

from confluent_kafka import Consumer, KafkaError
import json
import requests

c = Consumer({
    'bootstrap.servers': 'your-kafka-broker-host',
    'group.id': 'logs-inspector',
    'default.topic.config': {
        'auto.offset.reset': 'smallest'
    }
})

c.subscribe(['your-logs-topic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    # Fluentd puts the raw log line under the "log" key and adds Kubernetes metadata.
    message_dict = json.loads(msg.value().decode('utf-8'))
    if "exception" in message_dict.get("log", "").lower():
        notification = f"Found problem in log\nContainer: {message_dict['kubernetes']['container_name']}\nNamespace: {message_dict['kubernetes']['namespace_name']}\nPod: {message_dict['kubernetes']['pod_name']}\nHost: {message_dict['kubernetes']['host']}"
        # Post the notification to a Slack incoming webhook.
        requests.post(
            "https://your-slack-webhook-url.com",
            data=json.dumps({"attachments": [{
                'text': notification,
                'color': "#00FF00"
            }]}),
            headers={'Content-Type': 'application/json'})
c.close()

That script is very simple: consume a log message from the topic, check whether the log key exists and contains “exception” – if yes, send a notification to your team members via a Slack webhook. Please note the additional information which Kubernetes gives you, such as node, namespace and pod name.
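
For reference, a decoded message from that topic looks roughly like the sketch below – the values are made up, but these are the keys the script relies on (the raw line under log plus the metadata added by the Fluentd Kubernetes plugin):

# Hypothetical example of a decoded log record – values are illustrative only.
message_dict = {
    'log': 'java.lang.IllegalStateException: connection refused ...',
    'stream': 'stdout',
    'kubernetes': {
        'container_name': 'orders-service',
        'namespace_name': 'production',
        'pod_name': 'orders-service-5d9c7b-x2k4f',
        'host': 'worker-node-1'
    }
}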