Introduction
Application logs are essential for problem investigation and application health monitoring. Nowadays you want to monitor your logs and react to problems before your client notices that something is wrong. And when your system loses stability, you need to react immediately and keep your logs available and up to date, because they are your 'sight' into the application – let's say that logs are the first eye and metrics are the second. In the microservices world, log aggregation is even harder: you have X instances of a particular microservice, every instance prints a huge number of log lines to tell you what is happening, and those microservices are typically distributed across many different nodes. How do you deal with that?
This article describes how to use Fluentd on Kubernetes as a Kafka producer to stream logs, and how to use Fluentd on a virtual machine as a Kafka consumer to push those logs to Elasticsearch. I also want to show what a stream of application logs on Kafka makes possible, with a dummy Python implementation that sends Slack notifications to inform developers/devops about detected exceptions.
Why Kafka?
Streaming logs via Kafka gives you the following advantages:
- As Kafka is extremely efficient, it is a safe 'buffer' for your logs when your log storage cannot absorb the volume of logs produced during load spikes
- Logs on Kafka are ready to integrate – you can attach many consumers and place the logs in different storage engines, or attach analysis tools such as Apache Spark directly
- If you treat Kafka as the integration bus for your system, you can avoid 'infrastructure spaghetti' by integrating the different infrastructure modules through a single point.
Proposed design
Fluentd on Kubernetes
Fluentd is deployed on Kubernetes as a DaemonSet. Please see the deployment descriptor below:
apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
  name: fluent
  namespace: fluent
  labels:
    k8s-app: fluent
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  template:
    metadata:
      labels:
        k8s-app: fluent
        version: v1
        kubernetes.io/cluster-service: "true"
    spec:
      tolerations:
      - key: node-role.kubernetes.io/master
        effect: NoSchedule
      containers:
      - name: fluentd
        image: fluent/fluentd-kubernetes-daemonset:v1.1.3-debian-elasticsearch
        command: ["/bin/sh"]
        args: ["-c", "gem install fluent-plugin-kafka && cp /fluent-config/fluent.conf /fluentd/etc/ && /fluentd/entrypoint.sh"]
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: fluent-config
          mountPath: /fluent-config
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: fluent-config
        configMap:
          name: fluent
          items:
          - key: fluent_conf
            path: fluent.conf
Please keep an eye on the important sections of this file:
- The toleration means that we also want to deploy Fluentd on Kubernetes masters
- The command override installs the Kafka plugin and copies fluent.conf, which we attach as a ConfigMap (see the side note below)
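A side note on that command override: installing the fluent-plugin-kafka gem at container startup keeps this example self-contained, but it adds startup time and an external network dependency on every pod (re)start. For production you may prefer to bake the plugin into a custom image built on top of fluent/fluentd-kubernetes-daemonset.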
Here you can find the ConfigMap:
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent
  namespace: fluent
data:
  fluent_conf: |
    @include kubernetes.conf
    <match **>
      @type kafka_buffered
      brokers your-kafka-broker-host:9092
      topic_key your-logs-topic
      default_topic your-logs-topic
      output_data_type json
      output_include_tag true
      output_include_time true
      get_kafka_client_log false
      required_acks 1
      compression_codec gzip
    </match>
In this config we simply tell Fluentd: produce all collected logs as messages on Kafka. We also activate gzip compression as an optimization. We require at least one ack – and you should set the replication factor of your-logs-topic to at least 2, so your logs survive the failure of a single broker.
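Since durability depends on that replication factor, it is worth creating the topic explicitly rather than relying on broker-side auto-creation. Here is a minimal sketch using the AdminClient from confluent-kafka-python (the same library used later in this article); the partition count of 3 is an assumption – pick whatever matches your desired consumer parallelism:

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'your-kafka-broker-host:9092'})

# replication_factor=2 keeps logs available when a single broker fails;
# num_partitions=3 is an assumed value - it bounds consumer parallelism.
new_topic = NewTopic('your-logs-topic', num_partitions=3, replication_factor=2)

for topic, future in admin.create_topics([new_topic]).items():
    try:
        future.result()  # raises if topic creation failed
        print(f"Created topic {topic}")
    except Exception as e:
        print(f"Failed to create topic {topic}: {e}")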
Kafka consumer – fluentd and Elasticsearch
After streaming logs to Kafka you can easily consume them using another Fluentd instance. My advice here is to place that consumer on the same machine where Elasticsearch runs or, if you are using an Elasticsearch SaaS such as AWS ES, at least in the same subnet. Here you can find the config:
<source>
  @type kafka_group
  brokers your-kafka-broker-host:9092
  consumer_group fluentd
  topics your-logs-topic
  format json
  use_record_time true
</source>

<match **>
  @type elasticsearch
  @id out_es
  log_level info
  include_tag_key true
  host your-elasticsearch-host
  port 443
  scheme https
  logstash_format true
  logstash_prefix kubernetes
  <buffer>
    flush_thread_count 8
    flush_interval 5s
    chunk_limit_size 6M
    queue_limit_length 256M
    retry_max_interval 30
    retry_forever true
  </buffer>
</match>
Please keep an eye on the important sections of this file:
- We consume logs as the fluentd consumer group – if your-logs-topic has more than one partition, you can consume logs using multiple consumer instances
- We simply save logs in the 'kubernetes' index in Elasticsearch (with logstash_format enabled, this produces daily indices named kubernetes-YYYY.MM.DD) – a quick way to verify the result is sketched below
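If you want to verify that logs actually land in Elasticsearch, a quick sanity check is to query the daily indices over the same HTTPS endpoint the consumer writes to. A minimal sketch, assuming the placeholder host from the config above and a cluster reachable without authentication:

import requests

ES_URL = "https://your-elasticsearch-host:443"

# logstash_format with logstash_prefix 'kubernetes' yields daily indices
# named kubernetes-YYYY.MM.DD, hence the wildcard.
resp = requests.get(
    f"{ES_URL}/kubernetes-*/_search",
    params={"q": "log:exception", "size": 5},
)
resp.raise_for_status()

for hit in resp.json()["hits"]["hits"]:
    print(hit["_source"].get("log"))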
Kafka consumer – search & notify
The last step is to attach a consumer for log analysis. For that, I'm going to modify the example from the confluent-kafka-python library, using Python 3.6:
from confluent_kafka import Consumer, KafkaError
import json
import requests

c = Consumer({
    'bootstrap.servers': 'your-kafka-broker-host',
    'group.id': 'logs-inspector',
    'default.topic.config': {
        'auto.offset.reset': 'smallest'
    }
})

c.subscribe(['your-logs-topic'])

while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        # End of partition is not a real error - keep polling.
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    message_dict = json.loads(msg.value().decode('utf-8'))
    if "log" in message_dict.keys():
        if "exception" in message_dict["log"].lower():
            notification = (
                "Found problem in log\n"
                f"Container: {message_dict['kubernetes']['container_name']}\n"
                f"Namespace: {message_dict['kubernetes']['namespace_name']}\n"
                f"Pod: {message_dict['kubernetes']['pod_name']}\n"
                f"Host: {message_dict['kubernetes']['host']}"
            )
            # Post the alert to Slack via an incoming webhook.
            requests.post(
                "https://your-slack-webhook-url.com",
                data=json.dumps({"attachments": [{
                    'text': notification,
                    'color': "#00FF00"
                }]}),
                headers={'Content-Type': 'application/json'})

c.close()
The script is very simple: consume log messages from the topic, check whether the log key exists and contains the word "exception", and if so, send a notification via a Slack webhook to your team members. Please note the additional information Kubernetes gives you, such as the node, namespace, and pod name.
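To test the notifier end to end without waiting for a real exception, you can produce a synthetic log record shaped like the messages the consumer expects. A minimal sketch (the field values are made up; the field names mirror the ones read by the script above):

from confluent_kafka import Producer
import json

p = Producer({'bootstrap.servers': 'your-kafka-broker-host'})

# Synthetic record mimicking a fluentd-produced Kubernetes log message.
record = {
    "log": "java.lang.NullPointerException: something went wrong",
    "kubernetes": {
        "container_name": "demo-app",
        "namespace_name": "default",
        "pod_name": "demo-app-5d9c7b4f6-abcde",
        "host": "node-1"
    }
}

p.produce('your-logs-topic', json.dumps(record).encode('utf-8'))
p.flush()  # block until the message is delivered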
Why are you using another Fluentd here as a Kafka consumer, when Kafka can communicate directly with Elasticsearch?
Could you please describe which Kafka feature you mean? As far as I know, Kafka cannot write directly to Elasticsearch without an additional tool such as the Kafka Connect Elasticsearch connector, which must be deployed separately – the same scenario as with Fluentd, but when you use Fluentd on both sides you simplify your deployment.