Container and cluster scaling on Kubernetes using Horizontal Pod Autoscaler and Cluster Autoscaler on AWS EKS

Introduction

Kubernetes is built for scaling, but that doesn't mean autoscaling works out of the box – we must activate and configure some additional components. In this article I want to show you a working example of scaling a simple service on Kubernetes using the Horizontal Pod Autoscaler. After reaching maximum cluster capacity we will automatically add more workers to the cluster using the Cluster Autoscaler. Everything will run on Amazon EKS – managed Kubernetes on AWS.

[Diagram: scaling on Kubernetes with the Horizontal Pod Autoscaler and Cluster Autoscaler]

As always, the full source code can be found on my GitHub: https://github.com/jakubbujny/article-scale-containers-and-eks

Simple deployment creating load

To test autoscaling we need a simple service that generates heavy load so scaling can be triggered. For that task we are going to use the following Python code:

import flask
import uuid
import hashlib
app = flask.Flask(__name__)
@app.route("/")
def hello():
    for i in range(0,800000):
      hashlib.sha224(uuid.uuid4().hex.upper()[0:6].encode()).hexdigest()
    return "Done"

app.run(host="0.0.0.0", threaded=True)

That code is really simple – it just creates a web endpoint using the Flask framework. A GET request on "/" triggers a long loop which calculates a lot of SHA hashes from random UUIDs, which should take about 5–10 seconds and consume a lot of CPU during that time.

To avoid building our own Docker image (and thus creating a Docker registry, which would complicate the example) we can use a simple trick: take the jazzdd/alpine-flask:python3 image, which is available on Docker Hub and has Python and Flask installed. We can then create our Python file in the "command" section and run it – see the full YAML below:

---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  namespace: default
  name: microservice
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: microservice
    spec:
      containers:
      - name: microservice
        image: jazzdd/alpine-flask:python3
        command: ["sh"]
        args: ["-c", "printf \"import flask\\nimport uuid\\nimport hashlib\\napp = flask.Flask(__name__)\\n@app.route(\\\"/\\\")\\ndef hello():\\n    for i in range(0,800000):\\n     hashlib.sha224(uuid.uuid4().hex.upper()[0:6].encode()).hexdigest()\\n    return \\\"Done\\\"\\napp.run(host=\\\"0.0.0.0\\\", threaded=True)\" > script.py && python3 script.py"]
        ports:
        - name: http-port
          containerPort: 5000
        resources:
          requests:
            cpu: 200m

The important thing here is the resources request block, which means that on a 1-CPU-core machine (which we are going to use in this article) we can schedule 5 microservice pods (200m x 5 = 1000m = 1 CPU); reaching that number exhausts the capacity of a particular node. Reaching cluster capacity will be the trigger for the Cluster Autoscaler.

Horizontal Pod Autoscaler

Horizontal scaling in the Kubernetes world means adding more pods to a particular deployment. To achieve that, the Horizontal Pod Autoscaler can be used, but we need to note one important thing: in the newest Kubernetes versions, metrics-server needs to be installed to use HPA – Heapster is deprecated and shouldn't be used anymore.

To test that on Minikube you just need to type:

minikube addons enable metrics-server

To deploy metrics-server on EKS you need to clone the following repository: https://github.com/kubernetes-incubator/metrics-server and then issue the command:

kubectl apply -f metrics-server/deploy/1.8+/
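After a minute or two it is worth checking that metrics are actually being collected – if metrics-server runs correctly, these standard kubectl commands should print CPU and memory figures instead of an error:

kubectl top nodes
kubectl top pods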

To activate HPA for our microservice we need to apply the following YAML file:

---
apiVersion: autoscaling/v2beta1
kind: HorizontalPodAutoscaler
metadata:
  namespace: default
  name: microservice
spec:
  scaleTargetRef:
    apiVersion: apps/v1beta1
    kind: Deployment
    name: microservice
  minReplicas: 1
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        targetAverageUtilization: 50

targetAverageUtilization: 50 means that Kubernetes will try to keep each pod's CPU usage at half of what our microservice requests (50% * 200m = 100m). E.g. when a single pod is using 200m of CPU, Kubernetes will create a new pod so the 200m can be divided between 2 pods (100m and 100m).
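Once the HPA is applied you can watch it react to load with plain kubectl – the TARGETS column shows the measured average CPU utilization against the 50% target and REPLICAS shows how many pods the autoscaler currently maintains:

kubectl get hpa microservice -w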

AWS EKS and Cluster Autoscaler

Disclaimer – why use the Cluster Autoscaler instead of an ASG scaling trigger based on CPU?

From Cluster Autoscaler FAQ:

“Cluster Autoscaler makes sure that all pods in the cluster have a place to run, no matter if there is any CPU load or not. Moreover, it tries to ensure that there are no unneeded nodes in the cluster.

CPU-usage-based (or any metric-based) cluster/node group autoscalers don’t care about pods when scaling up and down. As a result, they may add a node that will not have any pods, or remove a node that has some system-critical pods on it, like kube-dns. Usage of these autoscalers with Kubernetes is discouraged.”

For the EKS deployment we are going to use a modified version of the EKS setup from my previous article.

Cluster Autoscaler is a component which will be installed on the EKS cluster. It watches the Kubernetes API and makes requests to the AWS API to scale the worker nodes' ASG. It means that the node on which the Cluster Autoscaler resides needs a proper IAM policy which allows containers on that node to perform operations on the ASG.

 resource "aws_iam_role_policy" "for-autoscaler" {
  name = "for-autoscaler"
  policy = <<POLICY
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "autoscaling:DescribeAutoScalingGroups",
                "autoscaling:DescribeAutoScalingInstances",
                "autoscaling:SetDesiredCapacity",
                "autoscaling:DescribeTags",
                "autoscaling:TerminateInstanceInAutoScalingGroup"
            ],
            "Resource": "*"
        }
    ]
}
POLICY
  role = "${aws_iam_role.eks-node.name}"
}

That policy should probably be limited in the Resource section, but we will leave * to simplify the example.

We put some additional tags on the ASG so the Cluster Autoscaler can use them for discovery:

  tag {
    key = "k8s.io/cluster-autoscaler/enabled"
    value = "whatever"
    propagate_at_launch = false
  }

  tag {
    key                 = "kubernetes.io/cluster/eks"
    value               = "owned"
    propagate_at_launch = true
  }

We must also set up security groups to allow port 443 communication from the cluster control plane to the nodes, as mentioned in this issue: https://github.com/kubernetes-incubator/metrics-server/issues/45

For the Cluster Autoscaler we will slightly modify the example deployment from here: https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-autodiscover.yaml

We need to modify the tags which the Cluster Autoscaler will use to discover the ASG to be scaled:

- --node-group-auto-discovery=asg:tag=k8s.io/cluster-autoscaler/enabled,kubernetes.io/cluster/eks

Add an env variable with the region in which we are operating:

 env:
   - name: "AWS_REGION"
     value: eu-west-1

Change the certificate mount to the path required on EKS nodes:

 volumeMounts:
   - name: ssl-certs
     mountPath: /etc/ssl/certs/ca-bundle.crt
     readOnly: true

The Cluster Autoscaler is now ready to use and will scale the worker nodes' ASG up or down between 1 and 10 instances.
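To see why the autoscaler decides to add or remove nodes you can follow its logs – assuming the Deployment keeps the name cluster-autoscaler in the kube-system namespace, as in the example manifest:

kubectl -n kube-system logs -f deployment/cluster-autoscaler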

Testing

The last step is to create a load balancer attached to the microservice and test autoscaling by making some requests to generate load.

apiVersion: v1
kind: Service
metadata:
  name: microservice
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-type: nlb
  labels:
    app: microservice
spec:
  ports:
  - port: 80
    targetPort: http-port
  selector:
    app: microservice
  type: LoadBalancer

You can simply open the load balancer endpoint in a web browser and hit F5 a few times to generate load, or use a script like:

while true; do sleep 2; timeout 1 curl http://<elb_id>.elb.eu-west-1.amazonaws.com/; done

 

After that you should see in Kubernetes that HPA scaled your pods up and reached maximum node capacity. After a while the Cluster Autoscaler should scale the AWS ASG and add a new worker node so HPA can complete the pod scaling.

Replicate cloud AWS RDS MySQL to on-premise PostgreSQL in Docker – future is today! Debezium and Kafka on AWS EKS

Introduction

Nowadays applications have bigger requirements than older ones – many SaaS systems need to operate globally on all continents or in hybrid solutions, sharing some data between private and public clouds. That requirement is hard to meet because many systems and users require real-time responses – modern IT systems are expected to be fast, efficient and provide zero waiting time. Looking at the pure speed of light (in an ideal environment, that's the speed of signals running through cables), when some information needs to travel 10,000 km it will take around 30ms, and that's a limit coming from physics (best regards, Einstein!). We all know that in practice it takes much longer because of delays in electronic components, routers, protocol overhead, traffic congestion, etc. How do we deal with that?

As current IT systems are more about reads than writes (probably not in IoT, but that's a different story), there is a concept of replicating data from the "source of truth" and storing it near the client, which provides the following advantages:

  • data is near the client so reads are fast
  • when the "source of truth" database is down, reads from the replica still work
  • when the connection to the "source of truth" is broken, reads from the replica still work
  • load on the "source of truth" can be reduced by directing all reads to the replica

In this article I'll show you how to replicate changes happening in a database located in the AWS cloud to a database located on-prem. We can achieve that using Debezium, a tool which uses the CDC (change data capture) concept to stream changes onto Kafka, acting as the data producer. On the other side we can attach Kafka Connect, which can consume that stream and write it to the target database using JDBC. To make it more interesting and easier I deployed Kafka and Debezium on Amazon EKS – the new Kubernetes service in AWS. See the architecture diagram:

[Architecture diagram: AWS RDS MySQL → Debezium on EKS → Kafka → Kafka Connect JDBC sink → on-premise PostgreSQL]

As always, the full source code can be found on my GitHub:

https://github.com/jakubbujny/article-replicate-cloud-AWS-RDS-MySQL-to-on-premise-PostgreSQL-in-Docker-future-is-today

Disclaimer – security

The configuration shown in this article is definitely far away from the security standards required by production systems – e.g. lack of SSL/encryption, open public access to services, unrestricted networking, and weak passwords – so please keep in mind that it's a POC rather than an implementation you can copy/paste into your real/production system.

Configure AWS EKS using Terraform

There is a great tutorial describing EKS deployment using Terraform: https://www.terraform.io/docs/providers/aws/guides/eks-getting-started.html As EKS availability is limited for now, I chose the Ireland region and slightly modified the configuration from the tutorial.

The EKS master configuration is very easy and requires a few blocks:


resource "aws_iam_role" "eks-cluster" {
  name = "eks-cluster"

  assume_role_policy = <<POLICY
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "eks.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
POLICY
}

resource "aws_iam_role_policy_attachment" "AmazonEKSClusterPolicy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonEKSClusterPolicy"
  role       = "${aws_iam_role.eks-cluster.name}"
}

resource "aws_iam_role_policy_attachment" "AmazonEKSServicePolicy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonEKSServicePolicy"
  role       = "${aws_iam_role.eks-cluster.name}"
}

resource "aws_eks_cluster" "eks" {
  name            = "eks"
  role_arn        = "${aws_iam_role.eks-cluster.arn}"

  vpc_config {
    security_group_ids = ["${aws_security_group.eks-cluster.id}"]
    subnet_ids         = ["${aws_subnet.eks_a.id}","${aws_subnet.eks_b.id}"]
  }

  depends_on = [
    "aws_iam_role_policy_attachment.AmazonEKSClusterPolicy",
    "aws_iam_role_policy_attachment.AmazonEKSServicePolicy",
  ]
}

Important notes:

The aws_iam_role_policy_attachment resources – AWS provides the proper policies for us; we only need to attach them to the role.

The aws_eks_cluster resource – creates the EKS cluster (masters) and places it in the defined network. That operation may take 10-20 minutes, so be patient.

And that's almost everything required to create the EKS masters – the last thing we need is a kubectl config to administer Kubernetes. To obtain the config we will use the following Terraform output:

locals {
  kubeconfig = <<KUBECONFIG
apiVersion: v1
clusters:
- cluster:
    server: ${aws_eks_cluster.eks.endpoint}
    certificate-authority-data: ${aws_eks_cluster.eks.certificate_authority.0.data}
  name: kubernetes
contexts:
- context:
    cluster: kubernetes
    user: aws
  name: aws
current-context: aws
kind: Config
preferences: {}
users:
- name: aws
  user:
    exec:
      apiVersion: client.authentication.k8s.io/v1alpha1
      command: aws-iam-authenticator
      args:
        - "token"
        - "-i"
        - "eks"
KUBECONFIG
}

output "kubeconfig" {
  value = "${local.kubeconfig}"
}

There is one important thing here – the aws-iam-authenticator command will be executed on your PC. It means that you need to go to https://github.com/kubernetes-sigs/aws-iam-authenticator/releases, download the CLI and place it somewhere in your PATH, e.g. in /usr/bin under the name aws-iam-authenticator. After that, execute the following command to get your kubectl config:

 terraform output kubeconfig > kubeconfig.yaml
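To confirm that the generated config works (assuming the AWS credentials used by aws-iam-authenticator have access to the cluster), point kubectl at it and list the services – the default kubernetes service should be returned:

export KUBECONFIG=$(pwd)/kubeconfig.yaml
kubectl get svc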

Adding nodes to the EKS cluster is a little more complex.

Again, we attach the policies delivered by AWS to a role:

resource "aws_iam_role" "eks-node" {
  name = "eks-node"

  assume_role_policy = <<POLICY
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "ec2.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
POLICY
}

resource "aws_iam_role_policy_attachment" "eks-node-AmazonEKSWorkerNodePolicy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy"
  role       = "${aws_iam_role.eks-node.name}"
}

resource "aws_iam_role_policy_attachment" "eks-node-AmazonEKS_CNI_Policy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy"
  role       = "${aws_iam_role.eks-node.name}"
}

resource "aws_iam_role_policy_attachment" "eks-node-AmazonEC2ContainerRegistryReadOnly" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly"
  role       = "${aws_iam_role.eks-node.name}"
}
resource "aws_iam_instance_profile" "eks-node" {
name = "eks-node"
role = "${aws_iam_role.eks-node.name}"
}

Next, create an Auto Scaling Group to bring up a Kubernetes node:

data "aws_ami" "eks-worker" {
  filter {
    name   = "name"
    values = ["amazon-eks-node-v*"]
  }

  most_recent = true
  owners      = ["602401143452"] # Amazon Account ID
}

locals {
  eks-node-userdata = <<USERDATA
#!/bin/bash
set -o xtrace
/etc/eks/bootstrap.sh --apiserver-endpoint '${aws_eks_cluster.eks.endpoint}' --b64-cluster-ca '${aws_eks_cluster.eks.certificate_authority.0.data}' 'eks'
USERDATA
}

resource "aws_launch_configuration" "node" {
  associate_public_ip_address = true
  iam_instance_profile        = "${aws_iam_instance_profile.eks-node.name}"
  image_id                    = "${data.aws_ami.eks-worker.id}"
  instance_type               = "t2.medium"
  name_prefix                 = "terraform-eks"
  security_groups             = ["${aws_security_group.eks-node.id}"]
  user_data_base64            = "${base64encode(local.eks-node-userdata)}"

  root_block_device {
    delete_on_termination = true
    volume_size = 20
    volume_type = "gp2"
  }

  lifecycle {
    create_before_destroy = true
  }
}

resource "aws_autoscaling_group" "node" {
  desired_capacity     = 1
  launch_configuration = "${aws_launch_configuration.node.id}"
  max_size             = 1
  min_size             = 1
  name                 = "eks"
  vpc_zone_identifier  = ["${aws_subnet.eks_a.id}"]

  tag {
    key                 = "Name"
    value               = "eks"
    propagate_at_launch = true
  }

  tag {
    key                 = "kubernetes.io/cluster/eks"
    value               = "owned"
    propagate_at_launch = true
  }
}

Important notes:

The aws_ami data source – fetches the AMI built by AWS which can be used as an EKS node; AWS has already installed the proper scripts and configuration there.

The user data – the bootstrap command required by EKS to configure the node (kubelet, systemd service, etc.).

The instance type – t2.medium should be enough to run Kafka (Zookeeper + broker) and Debezium.

The root_block_device – we need at least a 20G disk, as the AMI is 20G in size.

The aws_autoscaling_group resource – creates the ASG which will spawn one node for us.

As the last step we need to apply the following ConfigMap, generated by Terraform, to allow nodes to join the cluster:

locals {
  config-map-aws-auth = <<CONFIGMAPAWSAUTH
apiVersion: v1
kind: ConfigMap
metadata:
  name: aws-auth
  namespace: kube-system
data:
  mapRoles: |
    - rolearn: ${aws_iam_role.eks-node.arn}
      username: system:node:{{EC2PrivateDNSName}}
      groups:
        - system:bootstrappers
        - system:nodes
CONFIGMAPAWSAUTH
}

output "config-map-aws-auth" {
  value = "${local.config-map-aws-auth}"
}

And create a service account with admin rights to use with the Kubernetes dashboard:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: eks-admin
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: eks-admin
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: cluster-admin
subjects:
- kind: ServiceAccount
  name: eks-admin
  namespace: kube-system

To get the admin token which can be used to log in to the Kubernetes Dashboard we can use the following command:

kubectl -n kube-system describe secret $(kubectl -n kube-system get secret | grep eks-admin | awk '{print $1}') | grep token

To get the full source code of those configs please take a look at: https://github.com/jakubbujny/article-replicate-cloud-AWS-RDS-MySQL-to-on-premise-PostgreSQL-in-Docker-future-is-today/tree/master/aws

Configure AWS RDS using Terraform

We need to configure Amazon RDS as our "source of truth" – the database which we will replicate to the on-prem deployment. Debezium will read the binlog from MySQL to stream changes onto Kafka.

resource "aws_db_instance" "rds" {
  apply_immediately = true
  publicly_accessible = true
  skip_final_snapshot = true
  identifier = "mymysql"
  availability_zone = "${local.region}a"
  allocated_storage    = 5
  storage_type         = "gp2"
  engine               = "mysql"
  engine_version       = "5.7"
  instance_class       = "db.t2.micro"
  name                 = "mydb"
  username             = "mymysql"
  password             = "mysqlmysql"
  db_subnet_group_name = "${aws_db_subnet_group.rds.name}"
  vpc_security_group_ids = ["${aws_security_group.rds.id}"]
  backup_retention_period = 1
  parameter_group_name = "${aws_db_parameter_group.rds.name}"
}

resource "aws_db_parameter_group" "rds" {
  family = "mysql5.7"
  name = "rds"

  parameter {
    name = "binlog_format"
    value = "ROW"
  }
}

Important notes:

publicly_accessible – makes the RDS instance public so we can easily access the DB from our PC – we need to insert some data.

db_subnet_group_name – we need to create a DB subnet group which contains at least 2 subnets, even when we create a single-AZ database – that's an AWS requirement coming from the possibility of promoting a single-AZ deployment to multi-AZ.

backup_retention_period – that's important – automatic backups enable binlog streaming, which is required by Debezium.

parameter_group_name – we attach a custom parameter group to stream the binlog in the ROW format required by Debezium.

Deploy Kafka and Zookeeper as StatefulSet on Kubernetes

Kafka requires Zookeeper to work, which is a well-known fact. We can deploy both as 2 separate StatefulSets with AWS EBS volumes attached for data. As we want to keep this POC simple we will just use one Kubernetes node and expose Kafka on a NodePort to allow the on-prem client to connect. For the Kafka deployment we will use the popular Docker images from wurstmeister.

To allow dynamic provisioning of EBS volumes by Kubernetes we need to define the following StorageClass:

kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: ebs
provisioner: kubernetes.io/aws-ebs
parameters:
  type: gp2
  zones: eu-west-1a
  fsType: ext4

In the Zookeeper Dockerfile we can find the following 2 volumes:

VOLUME ["/opt/zookeeper-${ZOOKEEPER_VERSION}/conf", "/opt/zookeeper-${ZOOKEEPER_VERSION}/data"]

So we need to attach EBS volumes to those 2 paths to keep our state safe.

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper
  labels:
    app: zookeeper
spec:
  selector:
    matchLabels:
      app: zookeeper
  serviceName: "zookeeper"
  replicas: 1
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      terminationGracePeriodSeconds: 30
      containers:
      - name: zookeeper
        image: wurstmeister/zookeeper:3.4.6
        ports:
        - containerPort: 2181
          name: zookeeper
        volumeMounts:
        - name: zookeeper-data-pv-claim
          mountPath: /opt/zookeeper-3.4.6/data
        - name: zookeeper-conf-pv-claim
          mountPath: /opt/zookeeper-3.4.6/conf
  volumeClaimTemplates:
    - metadata:
        name: zookeeper-data-pv-claim
      spec:
        accessModes: [ "ReadWriteOnce" ]
        storageClassName: ebs
        resources:
          requests:
            storage: 1Gi
    - metadata:
        name: zookeeper-conf-pv-claim
      spec:
        accessModes: [ "ReadWriteOnce" ]
        storageClassName: ebs
        resources:
          requests:
            storage: 1Gi

So by defining volumeClaimTemplates we tell Kubernetes that it should dynamically create EBS volumes using the AWS API, make an ext4 file system and mount them at the proper paths. All we still need is a service which will allow access to Zookeeper inside the cluster.

apiVersion: v1
kind: Service
metadata:
  name: zookeeper
  labels:
    app: zookeeper
spec:
  ports:
  - port: 2181
    name: zookeeper
  clusterIP: None
  selector:
    app: zookeeper

clusterIP: None means that the service is a headless service, so Kubernetes will expose the pod's IP directly under the service's DNS record.
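Once the StatefulSet is applied you can verify with plain kubectl that the claims were bound and that EBS volumes were really provisioned:

kubectl get pvc
kubectl get pv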

The Kafka deployment is a little tricky because of the "advertised listener" – it means that we need to provide the node's public IP which will be presented to clients. We cannot easily use e.g. the Kubernetes Downward API, as status.hostIP will be the node's private IP, not accessible from the on-prem infrastructure. So to keep the POC simple we will just fetch the node's public IP by querying on the tag Name=eks via the AWS CLI:

aws --region eu-west-1 ec2 describe-instances --filters "Name=tag:Name,Values=eks" "Name=instance-state-name,Values=running" --query "Reservations[0].Instances[0].PublicIpAddress"

and then inject it into the proper environment variable during deployment by replacing NODE_PUBLIC_IP in Kafka's YAML file with sed, using the following script:

kafka_file=$(cat kafka.yaml | sed --expression="s/NODE_PUBLIC_IP/$public_node_ip/g")
echo "$kafka_file" | kubectl apply -f -

That simple trick reads the Kafka deployment file from disk, applies sed on the fly and then applies it to the Kubernetes cluster by sending the changed file to kubectl's STDIN.

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: "kafka"
  replicas: 1
  template:
    metadata:
      labels:
        app: kafka
    spec:
      terminationGracePeriodSeconds: 30
      containers:
      - name: kafka
        image: wurstmeister/kafka:2.11-2.0.0
        env:
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper:2181
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://NODE_PUBLIC_IP:30002
        - name: KAFKA_LISTENERS
          value: PLAINTEXT://:9092
        ports:
        - containerPort: 9092
          name: kafka
        volumeMounts:
        - name: kafka-data-pv-claim
          mountPath: /kafka
  volumeClaimTemplates:
    - metadata:
        name: kafka-data-pv-claim
      spec:
        accessModes: [ "ReadWriteOnce" ]
        storageClassName: ebs
        resources:
          requests:
            storage: 1Gi

So again we attach an EBS disk for Kafka's data and we connect Kafka to Zookeeper using the service (zookeeper:2181). What we need next is a Kafka service so Debezium can access it locally for bootstrap, and we need to expose Kafka on a NodePort.

apiVersion: v1
kind: Service
metadata:
  name: kafka
  labels:
    app: kafka
spec:
  ports:
  - port: 9092
    name: kafka
  clusterIP: None
  selector:
    app: kafka
---

apiVersion: v1
kind: Service
metadata:
  name: kafka-node
  labels:
    app: kafka-node
spec:
  ports:
  - port: 9092
    targetPort: 9092
    name: kafka
    nodePort: 30002
  type: NodePort
  selector:
    app: kafka

Please note that we expose Kafka on the non-standard port 30002 due to the default NodePort range of 30000-32767.

After those operations there should be a working deployment of Kafka on Kubernetes, which can be tested by connecting from your local PC to node_public_ip:30002.
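For example, assuming you have kafkacat installed locally, listing the cluster metadata should show the broker advertised under the node's public IP together with the existing topics:

kafkacat -b <node_public_ip>:30002 -L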

Deploy and configure Debezium on Kubernetes

Debezium is a type of Kafka connector, which means that it stores its whole state in Kafka, so we can deploy it as a Kubernetes Deployment because from Kubernetes' point of view it's a stateless application.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: debezium-connect-source
spec:
  selector:
    matchLabels:
      app: debezium-connect-source
  replicas: 1
  template:
    metadata:
      labels:
        app: debezium-connect-source
    spec:
      terminationGracePeriodSeconds: 30
      containers:
      - name: debezium-connect-source
        image: debezium/connect:0.8
        env:
        - name: BOOTSTRAP_SERVERS
          value: kafka:9092
        - name: GROUP_ID
          value: debezium-connect-source
        - name: CONFIG_STORAGE_TOPIC
          value: debezium-connect-source_config
        - name: OFFSET_STORAGE_TOPIC
          value: debezium-connect-source_offset
        ports:
        - containerPort: 8083
          name: dm-c-source

There are 3 important env variables which are perfectly described on Debezium’s dockerhub site:

GROUP_ID
This environment variable is required when running the Kafka Connect service. Set this to an ID that uniquely identifies the Kafka Connect cluster the service and its workers belong to.

CONFIG_STORAGE_TOPIC
This environment variable is required when running the Kafka Connect service. Set this to the name of the Kafka topic where the Kafka Connect services in the group store connector configurations. The topic must have a single partition and be highly replicated (e.g., 3x or more).

OFFSET_STORAGE_TOPIC
This environment variable is required when running the Kafka Connect service. Set this to the name of the Kafka topic where the Kafka Connect services in the group store connector offsets. The topic must have a large number of partitions (e.g., 25 or 50), be highly replicated (e.g., 3x or more) and should be configured for compaction.

The last step on Kubernetes is to expose Debezium on a NodePort so we can easily configure it from our PC.

apiVersion: v1
kind: Service
metadata:
  name: debezium-connect-source
  labels:
    app: debezium-connect-source
spec:
  ports:
  - port: 8083
    targetPort: 8083
    name: debezium-connect-source
    nodePort: 30001
  type: NodePort
  selector:
    app: debezium-connect-source

Now it's time to configure Debezium – it can be done using the REST API exposed by Debezium, where we need to define a connector with the proper options. So first, the configuration JSON:

{
  "name": "my-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mymysql.clmru7r0arad.eu-west-1.rds.amazonaws.com",
    "database.port": "3306",
    "database.user": "mymysql",
    "database.password": "mysqlmysql",
    "database.server.id": "184054",
    "database.server.name": "mydebezium",
    "database.whitelist": "mydb",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.mydb",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}

Important notes:

database.hostname – this should be the RDS host created by Terraform.

database.whitelist – here we define which database we want to observe.

transforms.* – you can find a deeper explanation of those lines here: https://debezium.io/blog/2017/09/25/streaming-to-another-database/

A nice explanation of the other parameters: https://debezium.io/docs/connectors/mysql/#example-configuration

Save that JSON to a source.json file and then you can execute the following command:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json"  -d @source.json http://$public_node_ip:30001/connectors/

Where public_node_ip is the EKS node's IP. That curl will configure the connector, so the stream of database changes should start.
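You can verify that the connector was registered and is running by querying the same REST API:

curl http://$public_node_ip:30001/connectors/my-connector/status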

Load data

We need to put some data into the AWS MySQL database so it can be streamed to Kafka. To do that we can run the following SQL script:


 CREATE TABLE IF NOT EXISTS my_debezium_test (
  ID int NOT NULL,
  test varchar(255),
  PRIMARY KEY (ID)
);

INSERT INTO my_debezium_test VALUES(1, 'hello!');
INSERT INTO my_debezium_test VALUES(2, 'world!');
INSERT INTO my_debezium_test VALUES(3, 'it rox!!');

UPDATE my_debezium_test SET test = "my friend!" WHERE ID = 2;

DELETE FROM my_debezium_test WHERE ID = 1;

So we create the my_debezium_test table with the ID field as the primary key and then perform some inserts, an update and a delete to observe what happens. Debezium should capture those changes and stream them to Kafka in real time.

Tip: you can use Docker to run the MySQL client and execute that SQL:

sql=$(cat rds.sql)
docker run -it  --rm mysql bash -c "echo '$sql' | mysql -h${rds_endpoint_without_port} -umymysql -pmysqlmysql mydb"

Start on-prem deployment

To make a quick on-prem deployment we can use docker-compose, where we need to start a Kafka connector which will connect to Kafka in AWS and consume the change stream, pushing it to PostgreSQL.

version: '3'

volumes:
  postgres-data:

services:
  debezium:
    image: debezium/connect-jdbc:0.8
    build:
      context: .
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=${PUBLIC_NODE_IP}:30002
      - GROUP_ID=debezium-connect-sink
      - CONFIG_STORAGE_TOPIC=debezium-connect-sink_config
      - OFFSET_STORAGE_TOPIC=debezium-connect-sink_offset

  postgres:
    image: postgres
    volumes:
      - postgres-data:/var/lib/postgresql/data

Important notes:

The postgres-data volume – a Docker named volume to store the PostgreSQL data.

BOOTSTRAP_SERVERS and the other debezium environment variables – again we point to Kafka using the EKS node IP and provide the configuration required by the Kafka Connect interface.

The build section – it says that we build a custom connect image; let's take a look at that:

FROM debezium/connect:0.8
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc

# Deploy PostgreSQL JDBC Driver
RUN cd /kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-42.1.4.jar

# Deploy Kafka Connect JDBC
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
	curl -sO http://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/3.3.0/kafka-connect-jdbc-3.3.0.jar

So we extend the debezium/connect image to add the PostgreSQL-specific JDBC driver so our connector can consume changes from Kafka and push them to PostgreSQL.

The next step is to start those containers and configure the JDBC sink using the following JSON config:

 {
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "my_debezium_test",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "ID",
    "pk.mode": "record_value"
  }
}

Important notes:

topics – the name of the topic to consume, which matches the table created in AWS MySQL in the previous step (thanks to the route transform on the source side).

transforms and transforms.unwrap.type – explanation here: https://debezium.io/blog/2017/09/25/streaming-to-another-database/

pk.fields – the primary key field name, created in AWS MySQL in the previous step.
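Assuming the JSON above is saved as sink.json, the sink connector is registered exactly like the source one, this time against the on-prem Kafka Connect instance exposed by docker-compose on port 8083:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" -d @sink.json http://localhost:8083/connectors/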

After that configuration, captured changes should be pushed to the on-prem PostgreSQL database.
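A quick way to peek at the replicated data is to run psql inside the compose PostgreSQL container (default postgres user and database assumed):

docker-compose exec postgres psql -U postgres -c 'SELECT * FROM my_debezium_test;'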

Results

So our PostgreSQL should now contain the automatically created my_debezium_test table, but what about the data? There is a small surprise here. INSERT and UPDATE are perfectly replicated, but there is a problem with the DELETE operation as it's currently dropped by the JDBC sink connector. There is an open pull request for that on GitHub: https://github.com/confluentinc/kafka-connect-jdbc/pull/282

I asked about this on Twitter and received a response from Gunnar Morling that a workaround for that problem is coming in Debezium 0.9.x:

Edit:

The Debezium guys decided to deliver that feature faster than expected! Nice!

 

As a conclusion I can say that the solution is not perfect and, in the case of pure database replication, there are some missing pieces, but in my opinion it's a very big step forward – replicating a database from MySQL to PostgreSQL in a producer/consumer pattern via a queue is something very innovative and amazing, and it defines a new way to create reliable and highly available global applications distributed across the whole world.

What stores Kubernetes in Etcd?

Introduction

Kubernetes uses Etcd to store information about what's happening on the cluster – master nodes read/write data from/to the Etcd cluster to maintain the cluster state. Etcd itself is a simple distributed key-value store which uses the Raft consensus algorithm.

As I like to dig deeper and couldn't find a description of the data structure stored in Etcd, I decided to do a little reverse engineering on my local Minikube cluster. To go through this article yourself you need Minikube installed on your machine.

Kubernetes version used: 1.10

How to get into Etcd in Minikube

Firstly we need to discover a way to see the data stored in Etcd in Minikube – it's not so trivial, as Etcd has security mechanisms which block us from just reading the data from any container on Kubernetes or from the local machine. That security is natural, as Kubernetes stores there e.g. secrets and private keys as plain text if encryption is not activated (the default). Also, writing data directly into Etcd effectively gives us admin rights on the cluster.

Firstly, start your Minikube:

minikube start

And launch Kubernetes dashboard:

minikube dashboard

In the kube-system namespace we can find the etcd-minikube pod which we are interested in – in its start command section we can see some useful things:

etcd
--advertise-client-urls=https://127.0.0.1:2379
--cert-file=/var/lib/localkube/certs/etcd/server.crt
--trusted-ca-file=/var/lib/localkube/certs/etcd/ca.crt
--peer-cert-file=/var/lib/localkube/certs/etcd/peer.crt
--peer-key-file=/var/lib/localkube/certs/etcd/peer.key
--listen-client-urls=https://127.0.0.1:2379
--client-cert-auth=true
--peer-client-cert-auth=true
--data-dir=/data/minikube
--key-file=/var/lib/localkube/certs/etcd/server.key
--peer-trusted-ca-file=/var/lib/localkube/certs/etcd/ca.crt
  1. We see that the client interface is bound to 127.0.0.1, so we are able to access Etcd only via localhost (not, for example, via the pod IP)
  2. We see that Etcd listens on port 2379, which is the default Etcd port
  3. –client-cert-auth=true tells us that certificate authentication is activated, which means that we need a trusted client certificate to get into Etcd

That’s a little strange – how any Kubernetes service can access Etcd if it’s listening only on localhost? So answer is in POD descriptor:

"hostNetwork": true

What means that Etcd’s docker container bind to host’s network interface. So who is connecting on localhost:2379 on Minikube? Let’s go into Minikube VM by using:

minikube ssh

In the Minikube VM we have Sysdig available, so we can check who is using that port:

sudo sysdig fd.port=2379

....
194074 10:30:39.093997519 1 etcd (3379) > read fd=20(<4t>127.0.0.1:57152->127.0.0.1:2379) size=1024 
194075 10:30:39.094001352 1 etcd (3379) < read res=-11(EAGAIN) data= 
194219 10:30:39.095750456 0 etcd (3236) > write fd=20(<4t>127.0.0.1:57152->127.0.0.1:2379) size=46 
194229 10:30:39.095821582 0 etcd (3236) < write res=46 data=....)......#..+jk.....I5o..N{z....e.9.;..wf... 
194240 10:30:39.095941041 1 kube-apiserver (3360) > read fd=16(<4t>127.0.0.1:57152->127.0.0.1:2379) size=2048 
194244 10:30:39.095983703 1 kube-apiserver (3360) < read res=46 data=....)......#..+jk.....I5o..N{z....e.9.;..wf... 
194247 10:30:39.096011985 1 kube-apiserver (3360) > read fd=16(<4t>127.0.0.1:57152->127.0.0.1:2379) size=2048 
194249 10:30:39.096014883 1 kube-apiserver (3360) < read res=-11(EAGAIN) data= 
.....

We see that kube-apiserver (as expected) communicates with Etcd over localhost (so kube-apiserver is also bound to the host's network interface). So let's take a look at the kube-apiserver start command in the Dashboard:

kube-apiserver
--admission-control=Initializers,NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,DefaultTolerationSeconds,NodeRestriction,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,ResourceQuota
--enable-bootstrap-token-auth=true
--requestheader-extra-headers-prefix=X-Remote-Extra-
...
...
--etcd-servers=https://127.0.0.1:2379
--etcd-cafile=/var/lib/localkube/certs/etcd/ca.crt
--etcd-certfile=/var/lib/localkube/certs/apiserver-etcd-client.crt
--etcd-keyfile=/var/lib/localkube/certs/apiserver-etcd-client.key

What is important here:

  1. We have confirmation that etcd and kube-apiserver communicate over localhost
  2. A CA file is required to accept the Etcd server certificate
  3. Kube-apiserver has its own certificates to authenticate to Etcd

To see the data stored in Etcd we can borrow the certificates from kube-apiserver and inject them into the Etcd container, as etcdctl – a CLI which gives access to the data in Etcd – is installed there.

We need to find the etcd and apiserver pods. Kubectl is automatically configured to target Minikube:

kubectl get pods --namespace kube-system                                                                                              
NAME                                    READY     STATUS    RESTARTS   AGE
etcd-minikube                           1/1       Running   0          3m
kube-addon-manager-minikube             1/1       Running   2          4d
kube-apiserver-minikube                 1/1       Running   0          3m
kube-controller-manager-minikube        1/1       Running   0          3m
kube-dns-86f4d74b45-hpzx9               3/3       Running   9          4d
kube-proxy-tzlzs                        1/1       Running   0          2m
kube-scheduler-minikube                 1/1       Running   2          4d
kubernetes-dashboard-5498ccf677-fs5vk   1/1       Running   6          4d
storage-provisioner                     1/1       Running   6          4d

So we see the etcd and apiserver pods – we need to copy the certificates from apiserver to etcd:

kubectl cp --namespace kube-system kube-apiserver-minikube:var/lib/localkube/certs/apiserver-etcd-client.crt apiserver-etcd-client.crt
kubectl cp --namespace kube-system apiserver-etcd-client.crt etcd-minikube:var/lib/localkube/certs/

kubectl cp --namespace kube-system kube-apiserver-minikube:var/lib/localkube/certs/apiserver-etcd-client.key apiserver-etcd-client.key
kubectl cp --namespace kube-system apiserver-etcd-client.key etcd-minikube:var/lib/localkube/certs/

And go into Etcd container:

kubectl exec -it --namespace kube-system etcd-minikube sh

We need to set the etcdctl tool to the v3 API version using the following environment variable:

export ETCDCTL_API=3

And test if we have access to Etcd:

/var/lib/localkube/certs # etcdctl --cacert="etcd/ca.crt" --key=apiserver-etcd-client.key --cert=apiserver-etcd-client.crt endpoint status 
127.0.0.1:2379, 8e9e05c52164694d, 3.1.12, 2.2 MB, true, 3, 18349

Data structure description

To get all keys from Etcd we need to type:

/var/lib/localkube/certs # etcdctl --cacert="etcd/ca.crt" --key=apiserver-etcd-client.key --cert=apiserver-etcd-client.crt get / --prefix --keys-only

The output is really big and divided into several namespaces – let's go through them.
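Before going through them, note that you can also inspect a single entry together with its value by dropping --keys-only and passing a concrete key, for example:

etcdctl --cacert="etcd/ca.crt" --key=apiserver-etcd-client.key --cert=apiserver-etcd-client.crt get /registry/namespaces/default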

 

/registry/apiregistration.k8s.io/apiservices/{version}.{api name}

Contains the definitions of API Services in Kubernetes, so we can find there all existing core APIs used by Kubernetes, like /registry/apiregistration.k8s.io/apiservices/v1.batch or /registry/apiregistration.k8s.io/apiservices/v1beta1.rbac.authorization.k8s.io, as well as custom API definitions (see https://github.com/kubernetes-incubator/apiserver-builder/blob/master/docs/concepts/aggregation.md). You can get information about an API by reading the value of that key in Etcd (you will get human-readable JSON) or, in a friendlier way, by using kubectl get apiservice v1beta1.authorization.k8s.io -o json (the same value as with direct Etcd access).

/registry/clusterroles/{role name}

Contains the definitions of all cluster-wide roles in Kubernetes, so we can find there things like /registry/clusterroles/cluster-admin or /registry/clusterroles/system:kube-scheduler. The data in Etcd is human readable but hard to understand – we can see there actions like get, patch, update on some parts of the API.

/registry/clusterrolebindings/{entity name}

Contains the bindings between roles and users/groups/service accounts which are cluster-wide, so we can find there things like /registry/clusterrolebindings/cluster-admin or /registry/clusterrolebindings/kubeadm:kubelet-bootstrap. The data in Etcd is human readable but hard to understand.

/registry/roles/{namespace}/{role name} and /registry/rolebindings/{namespace}/{entity name}

The same story as with cluster roles/bindings but scoped by namespace, e.g. /registry/roles/kube-system/system:controller:token-cleaner

/registry/serviceaccounts/{namespace}/{name}

Definition of all service accounts

/registry/configmaps/{namespace}/{map name}

All ConfigMaps, stored as YAML

/registry/controllerrevisions/{namespace}/{pod}

I found that the ControllerRevision resource is used to provide rollback possibilities for DaemonSets and StatefulSets (https://kubernetes.io/docs/tasks/manage-daemon/rollback-daemon-set/). In Etcd we can find a snapshot of the pod spec.

/registry/daemonsets/{namespace}/{name} and /registry/deployments/{namespace}/{name} etc.

Under those keys Kubernetes stores information about the different workload types like DaemonSet, Deployment, ReplicaSet, Job, etc. What's interesting is that in the case of a Deployment we can see there the last-applied-configuration, described at https://kubernetes.io/docs/concepts/overview/object-management-kubectl/declarative-config/#merge-patch-calculation

/registry/minions/{node name}

Kubernetes nodes were previously called "minions", and in Etcd the name still hasn't changed. We see there a big amount of data describing the node, like:

  • CPU cores
  • Memory size
  • Status of kubelet: e.g. kubelet has sufficient disk space available or kubelet has sufficient PID available
  • IP address
  • Hostname
  • Docker version
  • Docker images available on the node

/registry/namespaces/{namespace}

Just the namespace definition. There is also the state of the particular namespace, like Active or Terminating.

/registry/pods/{namespace}/{pod name}

The state of every pod running in the cluster. Contains a lot of information like pod IP, mounted volumes, Docker image, etc.

/registry/ranges/serviceips

CIDR for services

/registry/ranges/servicenodeports

The port range for exposing services via NodePort

/registry/secrets/{namespace}/{pod}

All secrets in the cluster, stored as plain text in the default mode. For encryption see https://kubernetes.io/docs/tasks/administer-cluster/encrypt-data/

/registry/services/endpoints/{namespace}/{name}

Service endpoint definitions. Kubernetes calculates which pods are selected by a particular service and stores that information in the value, so we can see the pods' IP addresses and names there.

Stream microservices logs – how Riemann simplify things

Introduction

This article is inspired by my first steps in Clojure. I want to show you again my considerations about microservices log streaming and analysis. I need to write it once again: application logs are one of the most important things when it comes to debugging problems on a production system. Nowadays it's also important for the DevOps team to react when bad things happen in the system, so log analysis is a must-have if you want to detect problems in your application before your business client calls you at night. I think that a current, modern approach to log analysis would involve machine learning algorithms which can detect real issues more precisely and ignore the "noise" which is in the system all the time. This article shows an improved version of the architecture which originated here: https://jakubbujny.com/2018/07/02/stream-microservices-logs-from-kubernetes-to-kafka/

This time I’m making improvement by simplifying whole architecture and using existing frameworks to achieve goals.

[Architecture diagram: microservices → Kafka → Riemann → Elasticsearch/Kibana and alerting]

As you can see in that design, microservices (which are Python 3 programs in my example) push JSON logs directly to Kafka without using Fluentd to collect them. I think that's a better approach because:

  1. Fluentd is out – one technology less, one problem less
  2. It's easier to keep a configuration like this consistent between deployments and the local developer's environment. It means that you can use the same configuration on a Kubernetes cluster in the cloud and on local Docker – in the case of Fluentd it's not so straightforward, as Fluentd usually attaches to microservices' log files, which is hard to simulate on local Docker

An important thing to point out is that microservices should have a standardized and consistent log structure. I would really advise producing logs in JSON format, as it explicitly declares what is in your logs and so eliminates parsing problems. Here is an example log:

{
  "level" : "info",
  "timestamp" : "2018-08-19T16:41:41+00:00",
  "path" : "program.py",
  "line" : "10",
  "message": "My log"
}

That approach also has one disadvantage – the size of the logs grows. If your application produces a big number of logs and you are afraid that naming every field in the log may be a problem, you can consider Avro or Protocol Buffers, which serialize data to a binary format and use a schema; sadly they are not human readable, so please consider using some "switch" between production and non-production environments.

Again, Kafka will act in this design as an efficient buffer for logs and an infrastructure decoupling mechanism. If we stream logs to Kafka we are sure that they will be saved even when the log consumer has problems or is down. As Kafka is extremely efficient we can be calm about our logs. We can also attach many consumers to our logs, so alerting is possible, as well as storing them in S3 or Elasticsearch, feeding machine learning models, etc. Kafka also decouples parts of the infrastructure so we can avoid "infrastructure spaghetti", where X infrastructure components talk directly to Y other infrastructure components, causing complexity on the X*Y level.

The next part of the technology stack in this example is Riemann – a server/framework for handling event streams and attaching logic/alerts to them. Riemann is based on Clojure and allows directing data streams from one source to another and doing analysis in the middle. You can find a great introduction to Riemann on this site: https://www.kartar.net/2014/12/an-introduction-to-riemann/

I need to quote the first paragraph from that article:

“For the last year I’ve been using nights and weekends to look to a variety of monitoring and logging tools. For reasons. I’ve spent a lot of hours playing with Nagios again (some years ago I wrote a book about it) as well as looking at tools like Sensu and Heka. One of the tools I am reviewing and am quite excited about is Riemann.”

My feelings about Riemann are the same – in this article Riemann will consume logs from Kafka, push them to Elasticsearch to allow searching with Kibana, and analyze them to alert the developer that a high number of errors happened in a particular Python file.

As always, the full source code can be found on GitHub: https://github.com/jakubbujny/article-stream-microservices-logs-how-riemann-simplify-things

Python microservice

So first we need a microservice which will generate logs. I wrote a really dummy implementation which generates quite a big number of logs at info and error level. The first part is to create a Python logging handler which will push logs to Kafka:

import logging

from confluent_kafka import Producer

class KafkaHandler(logging.Handler):

    def __init__(self, kafka_bootstrap_servers="kafka:9092", kafka_topic="program-logs", data_serializer_func=lambda v: v.encode('utf-8')):
        logging.Handler.__init__(self)
        self.producer = Producer({'bootstrap.servers': kafka_bootstrap_servers})
        self.kafka_topic = kafka_topic
        self.data_serializer_func = data_serializer_func

    def delivery_report(self, err, msg):
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

    def emit(self, record):
        try:
            self.producer.poll(1.0)
            msg = self.format(record)
            self.producer.produce(self.kafka_topic, self.data_serializer_func(msg), callback=self.delivery_report)
        except Exception as e:
            print(e)
            logging.Handler.handleError(self, record)

    def close(self):
        self.producer.flush(timeout=1.0)

As you can see, it simply extends the logging handler by adding a Confluent Kafka producer which takes the log JSON string and pushes it onto the program-logs Kafka topic. The delivery report is added to ease POC diagnosis but should probably be deleted in a real application.

The next part is to create JSON logs – for that I used the example from https://blog.sneawo.com/blog/2017/07/28/json-logging-in-python/ – our logs will contain fields like:

  • message – log message
  • lineno – line number
  • pathname – name of python file
  • level – level of logs e.g. info, error

After configuration I simply start an infinite loop which calls 2 subprograms to get different pathname values, which will be used in Riemann.

while True:
    print("Logging loop start")
    sys.stdout.flush()
    subprogram1.random_log()
    subprogram2.random_log()
    print("Logging loop end")
    sys.stdout.flush()
    time.sleep(0.01)

So that loop will generate 2 log records every 10ms – quite a big number.

def random_log():
    log = structlog.getLogger(__name__)
    if random() > 0.7   :
        log.error("Fatal error")
    else:
        log.info("I'm fine")

Random is used to generate errors with lower probability than regular logs.

Environment

To run that application as a POC we of course need a local environment which we can play with on a local machine. As I wrote in https://jakubbujny.com/2018/02/18/useful-docker-1-seed-container-at-runtime/:

“Docker is one of my favorite tool in IT – it gives me really fast access to already configured&installed tools.”

So of course we will use a docker-compose file to run the local environment:

version: '3'

services:
  zookeeper:
    image: confluent/zookeeper:3.4.6-cp1

  kafka:
    image: confluent/kafka:0.10.0.0-cp1
    links:
      - zookeeper:zookeeper

  riemann:
    image: riemannio/riemann:0.3.1
    volumes:
      - ./riemann.config:/etc/riemann.config
    environment:
      - HOST_IP=192.168.1.20

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.3.2

  kibana:
    image: docker.elastic.co/kibana/kibana:6.3.2
    ports:
      - 5601:5601
  program:
    build:
      context: .

Comments:

zookeeper and kafka – run Kafka together with Zookeeper.

The riemann volume mapping – maps the riemann.config Clojure file into the container to configure Riemann.

HOST_IP – a little trick to reach the host from a Docker container; it's my WiFi address, so you should put your own host IP there.

elasticsearch and kibana – run Elasticsearch and Kibana to store and browse logs.

The kibana ports section – publishes Kibana on localhost:5601 so we can browse logs.

program – the Python microservice, built using a Dockerfile which adds all the Python files and installs the requirements.
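With those pieces in place the whole local stack can be started from the repository root (assuming riemann.config, shown in the next section, sits next to the compose file):

docker-compose up --build -d
docker-compose logs -f riemann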

Riemann in action

Here comes riemann.config. At first we need to consume the JSON logs from the Kafka topic. It's very easy, as a Kafka consumer is part of the Riemann core, so we just need:

(kafka-consumer {:consumer.config {:bootstrap.servers "kafka:9092"
                                   :group.id "riemann"}
                 :topics ["program-logs"]})

So now Riemann consumes messages from Kafka and allows us to manipulate that stream. As the next step we want to push that stream to Elasticsearch.

(def elastic
  (elasticsearch {:es-endpoint "http://elasticsearch:9200"
                  :es-index "riemann"
                  :index-suffix "-yyyy.MM.dd"
                  :type "event"}
                  (fn [event]
                        (merge event {}))))
(streams
  (where (not (tagged "riemann")) elastic)
)

streams is the Riemann function which allows attaching logic to events. In this case we are filtering events to exclude those tagged as "riemann", as we don't want to collect Riemann's own events which report its metrics – we want only application events/logs. After that operation we can browse logs in Kibana. Please note that the JSON is exploded into fields in Elasticsearch, so we can easily filter by level or path.

[Screenshot: application logs browsed in Kibana, with JSON fields such as level and pathname]

The last part will be to push notifications to the developer. On Ubuntu you can send a notification to the GUI from the terminal by using the notify-send command. Sadly it's not easy to use that command from a Docker container to send a notification to the host's GUI. As a workaround I started the following bash script on the host machine (script found on Stack Overflow):

#!/bin/bash

# no multiple connections: needs to improve
while true; do
    line="$(netcat -l -p 10000)"
    notify-send -- "Received Message" "$line"
done

So that code listens on TCP port 10000 and sends a notification with the received string. All we need to do is send a string from the Riemann container to the host IP on port 10000 – that explains why I placed HOST_IP in the docker-compose file.

To send a string over a TCP socket we will use Java classes within Clojure/Riemann, as I didn't find a Clojure-native way.

(defn- transmit
  [writer socket message]
  (let [bytes-out   (bytes (byte-array (map byte message)))]
    (.write writer bytes-out 0 (count bytes-out))
    (.flush writer)))

(defn send-warning
  [event]
  (with-open [socket (java.net.Socket.  (System/getenv "HOST_IP") 10000)
              writer (java.io.DataOutputStream. (.getOutputStream socket))]
    (transmit writer socket
    (str "FOUND PROBLEM IN "(get event :pathname)":"(get event :lineno) )
    )
    ))

That program opens a TCP socket to HOST_IP:10000 and writes a warning string with the pathname and lineno from the event to inform the developer that something bad happened in their code.

And the final step is to attach the alerting logic to the event stream. To make the example a little more advanced, let's say that we want to send a warning only if logs with level error occurred more than 25 times within a 10-second window for a particular Python file.

( streams
    (where (= (:level event) "error")
        (with :metric 1
            (by :pathname
                (rate 10
                      (where (> metric 25)
                            send-warning
                        )
                )
    )))
)

So to achieve that goal we add an additional field :metric to every event with error level. Then "by" splits the event stream into N streams grouped by "pathname". The rate command makes 10-second windows and sums the fields within each window, so we can observe whether the summed :metric is above 25 – if yes, just send a warning over the TCP socket. Effect:

[Screenshot: the desktop notification triggered by the warning]

So that’s first article showing power of Riemann – I promise I will comeback to Riemann in next article with some more advanced alerting strategy as it’s real fun to write on Clojure.

Form your cloud#1 Please smile! Highly available web camera photo upload to s3 bucket

Introduction

This article shows how to create a simple, highly available web application which allows uploading a photo from your web camera to an S3 bucket. The whole example can be done using the AWS Free Tier. Technology stack:

  • Terraform
  • Ansible
  • Packer
  • AWS VPC
  • AWS EC2
  • AWS ASG
  • AWS ALB
  • AWS S3
  • AWS IAM
  • Piece of HTML5&JS
  • Piece of go lang
  • Certbot

Full source code can be found here: https://github.com/jakubbujny/article-form-your-cloud-1

The application contains 3 main parts:

  • Frontend
    • HTML5 video capture from camera
    • Snap photo and send it to backend using JQuery Ajax call
    • List existing photos in bucket as hyperlinks
  • Backend
    • Serve frontend
    • Integrate with S3
    • Expose actions like upload, list, get particular image
  • Infrastructure
    • Bring up EC2 instances in ASG from baked AMI with application
    • Spread across 3 AZs
    • Spawn ALB to access application

Proposed design:

form-you-cloud-1 (1)

Go app

Our application is a really simple web application, exposing some HTTP endpoints and integrating with AWS S3.

Main libraries (judging by the code): the AWS SDK for Go for S3 access and gorilla/mux for HTTP routing.

Endpoints (a rough routing sketch follows the list):

  • GET / – dummy web UI with camera capture using HTML5 video and list of photos
  • GET /image/{image_name} – get particular image/photo
  • POST /image – save new image/photo
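
For context, here is a minimal, hypothetical sketch of how those three endpoints might be wired up with gorilla/mux – the handler stubs, the names and the port (8000, matching the security groups and target group later in the article) are assumptions, not the exact code from the repository:

package main

import (
	"fmt"
	"net/http"
	"os"

	"github.com/gorilla/mux"
)

// bucket name is passed as a starting argument, as described later in the article
var bucket string

// Stubs standing in for the real handlers described in this article.
func GetRootSite(w http.ResponseWriter, _ *http.Request) { fmt.Fprint(w, "UI with camera capture") }
func GetImage(w http.ResponseWriter, r *http.Request)    { fmt.Fprint(w, mux.Vars(r)["image_name"]) }
func PostImage(w http.ResponseWriter, _ *http.Request)   { w.WriteHeader(http.StatusOK) }

func main() {
	bucket = os.Args[1]

	r := mux.NewRouter()
	r.HandleFunc("/", GetRootSite).Methods("GET")
	r.HandleFunc("/image/{image_name}", GetImage).Methods("GET")
	r.HandleFunc("/image", PostImage).Methods("POST")
	http.ListenAndServe(":8000", r)
}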

Screenshot of the simple UI – you can see an A4 paper which I’m showing to the camera

Screenshot from 2018-07-27 19-12-04

The UI is served directly from Go, as it’s stored in the Go application as a big multi-line string. The list of images is generated using Go’s built-in template engine:

{{ range $key, $value := . }}
	<li><a href="/image/{{ $value.Key }}">{{ $value.Key }}</a></li>
{{ end }}

The collection to iterate over is generated in the following way:


func GetRootSite(ResponseWriter http.ResponseWriter, _ *http.Request) {
	svc := SpawnAwsSdkS3Service()
	resp, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: aws.String(bucket)})
	if err != nil {
		panic(err)
	}

	tmpl, err := template.New("hello").Parse(site_template)
	if err != nil { panic(err) }
	err = tmpl.Execute(ResponseWriter, resp.Contents)
	if err != nil { panic(err) }
}

Comments to lines:

2 – my wrapper for the S3 session from the AWS SDK

3 – list the bucket to get the existing images

8 – parse site_template – a big string in the code containing the HTML/JS UI

10 – render the template directly to the HTTP response, passing the list from line 3 as a parameter
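
SpawnAwsSdkS3Service itself is not shown above – as a rough idea, such a wrapper could look like this with the AWS SDK for Go (a hypothetical sketch, a fragment of the app’s main package; the region and the details are assumptions, the real implementation in the repository may differ):

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

// SpawnAwsSdkS3Service builds an S3 client from the default credential chain
// (on EC2 that resolves to the instance profile attached later in this article).
func SpawnAwsSdkS3Service() *s3.S3 {
	sess := session.Must(session.NewSession(&aws.Config{
		Region: aws.String("eu-central-1"), // assumption: same region as the rest of the setup
	}))
	return s3.New(sess)
}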

Trick with HTML5/JS camera comes from: https://davidwalsh.name/browser-camera

I modified the script a little to snap the camera view into a hidden canvas and send it to the endpoint using a jQuery AJAX call after clicking the Snap&Upload button. To see the effect I simply reload the site after the upload so the new image shows up on the list. What’s interesting here is that

canvas.toDataURL("image/jpeg")

returns the snapped photo as base64 with a content-type header. So I’m just sending that to the backend, which simply removes the header and converts the rest to bytes:

decoded, err := base64.StdEncoding.DecodeString(strings.Split(string(body), ",")[1])

The S3 file upload looks like this:

_ , err1 := svc.PutObject(&s3.PutObjectInput{Key: aws.String(strconv.FormatInt(time.Now().UnixNano(), 10)+".jpg"), Bucket: aws.String(bucket), Body: bytes.NewReader(decoded)})

so it creates a new file in the S3 bucket with a numeric timestamp as the name.
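
Putting the two lines above together, a hypothetical sketch of the whole POST /image handler could look like this (again a fragment of the main package, not the exact code from the repository):

import (
	"bytes"
	"encoding/base64"
	"io/ioutil"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)

func PostImage(w http.ResponseWriter, r *http.Request) {
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		panic(err)
	}
	// canvas.toDataURL() prepends "data:image/jpeg;base64," – drop it and decode the rest
	decoded, err := base64.StdEncoding.DecodeString(strings.Split(string(body), ",")[1])
	if err != nil {
		panic(err)
	}
	svc := SpawnAwsSdkS3Service()
	// numeric timestamp as the object name, exactly like the one-liner above
	_, err = svc.PutObject(&s3.PutObjectInput{
		Key:    aws.String(strconv.FormatInt(time.Now().UnixNano(), 10) + ".jpg"),
		Bucket: aws.String(bucket), // the package-level bucket name described below
		Body:   bytes.NewReader(decoded),
	})
	if err != nil {
		panic(err)
	}
	w.WriteHeader(http.StatusOK)
}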

One last important thing: the application takes the S3 bucket name as a starting argument, to allow dynamic configuration and avoid hardcoding it in the code.

How about state?

As we have the application, we can take a look at the infrastructure/deployment now. Before working with Terraform, it’s good practice to store the state in an S3 bucket.

variable "state_bucket_name" {

}

resource "aws_s3_bucket" "state" {
  bucket = "${var.state_bucket_name}"
  acl    = "private"

  versioning {
    enabled = true
  }

}

Activated versioning is important, especially in case of a damaged state, when you can roll the state back to a previous version – fixing a damaged state in Terraform is something that should definitely be avoided. Also we want to keep that bucket private, as it exposes some internals of the infrastructure construction which would be useful to attackers.

OK, so we have a bucket for the state, but building that S3 bucket also generates state – a classic chicken and egg problem: where should I store that state? As it’s a one-time operation it’s totally safe to store that state on the file system, so usually I commit it to the git repository.

To tell Terraform to use that state bucket we need to add the following config:

terraform {
  backend "s3" {
    key = "deployment"
  }
}

Where key is path to state file inside the bucket.

After adding that config we need to initialize Terraform properly and activate the S3 state store with the command:

terraform init -backend-config="bucket=bucket_name" -backend-config="region=bucket_region"

Bake image with Ansible and Packer

As the application is ready, it needs to be delivered to AWS somehow. To achieve that we are going to use Ansible and Packer to build the application and create an AMI, following the immutable infrastructure pattern.

To start with Packer we need a subnet in AWS, as the flow which Packer performs for you is:

  • start new EC2 instance
  • apply Ansible playbook
  • create AMI
  • destroy EC2 instance

Code for placing subnet in TF:

resource "aws_subnet" "packer" {
  vpc_id = "${aws_vpc.main.id}"
  cidr_block = "${local.packer_cidr}"
  availability_zone = "${local.region}a"

  tags {
    Name = "packer"
  }
}

resource "aws_route_table_association" "packer" {
  subnet_id = "${aws_subnet.packer.id}"
  route_table_id = "${aws_route_table.internet.id}"
}

Comments to lines:

3 – it’s good practice to define your network CIDRs in one file and refer to them using TF locals, as it gives you a better overview of the network design

Now we need an Ansible playbook to build the application and transfer it to the destination machine. This can be done in a naive way, as Packer actually starts Ansible on your machine to provision the EC2 instance used for building the AMI.

---
- hosts: all
  become: yes
  gather_facts: yes
  vars:
    ansible_python_interpreter: /usr/bin/python3
  tasks:
    - name: build app
      become_user: jbujny
      local_action: shell go build chdir=../../goapp
    - name: copy go app
      copy:
        src: ../../goapp/goapp
        dest: /opt/goapp
        mode: 0777

Comments to lines:

6 – you need this on a fresh Ubuntu to tell Ansible that it should use python3 instead of python2

10 – build our Go app on the host machine (my PC)

11 – copy the application to the remote host

And finally the Packer JSON file:

{
  "description": "Go app for uploading files",
  "builders": [
    {
      "type": "amazon-ebs",
      "ami_name": "go-app-{{ timestamp }}",
      "region": "eu-central-1",
      "source_ami_filter": {
        "filters": {
          "virtualization-type": "hvm",
          "name": "ubuntu/images/*ubuntu-bionic-18.04-amd64-server-*",
          "root-device-type": "ebs"
        },
        "owners": ["099720109477"],
        "most_recent": true
      },
      "instance_type": "t2.micro",
      "ssh_username": "ubuntu"
    }
  ],
  "provisioners": [
    {
      "type": "ansible",
      "playbook_file": "playbook.yml",
      "user": "ubuntu"
    }
  ]
}

Comments to lines:

6 – that name is important as we will refer to it in Terraform

8 – find Ubuntu 18.04 as the base image

18 – that user has sudo on Ubuntu machines in AWS

21 – tell Packer to use Ansible to provision the AMI

Now we can start packer by just typing:

packer build packer.json

and Packer should create the AMI for us, so we have a ready image with the application – now it’s time for the infrastructure.

Network

Our application needs the following network to operate:

  • VPC
  • 3 subnets for auto scaling group to spread EC2 instances and make multi-az HA application
  • 3 subnets for application load balancer
  • security group for ec2 instances
  • security group for ALB

We are going to place the network in a separate Terraform state to get better isolation between different parts of the application.

Firstly, to allow communication with clients, we need an internet gateway and proper routing:

resource "aws_internet_gateway" "internet" {
  vpc_id = "${aws_vpc.main.id}"

  tags {
    Name = "main"
  }
}

resource "aws_route_table" "internet" {
  vpc_id = "${aws_vpc.main.id}"

  route {
    cidr_block = "0.0.0.0/0"
    gateway_id = "${aws_internet_gateway.internet.id}"
  }

  tags {
    Name = "internet"
  }
}

Here is an example of a single subnet which is part of the ASG.

resource "aws_subnet" "asg_a" {
  vpc_id = "${aws_vpc.main.id}"
  cidr_block = "${local.asg_a}"
  availability_zone = "${local.region}a"

  tags {
    Name = "asg_a"
  }
}

output "asg_a_subnet_id" {
  value = "${aws_subnet.asg_a.id}"
}

resource "aws_route_table_association" "asg_a" {
  subnet_id = "${aws_subnet.asg_a.id}"
  route_table_id = "${aws_route_table.internet.id}"
}

I don’t like loops in TF as I think a declarative language should avoid imperative constructs, so when I need to define <= 3 resources related to AZs I just use the copy&paste approach.

Next we need the SG for the ASG: egress in this form is required to access AWS S3, and ingress is required to allow incoming traffic from the ALB.


 resource "aws_security_group" "asg" {
  vpc_id = "${aws_vpc.main.id}"
  name = "asg"
  ingress {
    from_port = 8000
    to_port = 8000
    protocol = "tcp"
    cidr_blocks = ["${aws_subnet.alb_a.cidr_block}","${aws_subnet.alb_b.cidr_block}","${aws_subnet.alb_c.cidr_block}"]
  }
  egress {
    from_port = 443
    to_port = 443
    protocol = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

And the SG for the ALB: egress is required to access our application in the ASG, and ingress is required to allow clients to connect to our application. We are allowing the HTTP port here.

resource "aws_security_group" "alb" {
  vpc_id = "${aws_vpc.main.id}"
  name = "alb"
  egress {
    from_port = 8000
    to_port = 8000
    protocol = "tcp"
    cidr_blocks = ["${aws_subnet.asg_a.cidr_block}","${aws_subnet.asg_b.cidr_block}","${aws_subnet.asg_c.cidr_block}"]
  }
  ingress {
    from_port = 80
    to_port = 80
    protocol = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

Deployment

We have the AMI with the application in place and the network created, so now it’s time to bring up the infrastructure and expose the application to the Internet. To achieve that we need:

  • S3 buckets for photos
  • IAM role with policy which can be attached to EC2 instance allowing access to S3 bucket with photos
  • ASG definition with spread across AZs
  • ALB exposing UI and API

The S3 bucket definition is quite obvious and very similar to the state bucket, so let’s take a look at the IAM role definition:

resource "aws_iam_role_policy" "iam_role_policy" {
  name = "web_iam_role_policy"
  role = "${aws_iam_role.iam_role.id}"
  policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": [":${aws_s3_bucket.photos.arn}"]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject"
      ],
      "Resource": ["${aws_s3_bucket.photos.arn}/*"]
    }
  ]
}
EOF
}

We need to allow our application to:

  • list bucket to get existing photos
  • put new object in bucket to upload new photo
  • get object from bucket to serve uploaded photo

Please note the asterisk in the second statement – that’s quite important and a little tricky: granting put/get on just the bucket ARN without the asterisk will cause access denied errors from AWS, because object-level actions apply to the objects (bucket-arn/*), not to the bucket itself.

To start EC2 instances in the ASG we need a launch configuration, which defines the attributes of a particular EC2 instance – so it’s comparable to the aws_instance resource with a few additional parameters. We also need an effective way to find the built AMI, as we don’t want to manually read the created image id and hardcode it in the configuration.

data "aws_ami" "goapp" {
  most_recent = true

  filter {
    name = "name"
    values = [
      "go-app-*"]
  }

  filter {
    name = "virtualization-type"
    values = [
      "hvm"]
  }

  owners = [
    "self"]
}

resource "aws_launch_configuration" "goapp" {

  lifecycle {
    create_before_destroy = true
  }
  name_prefix = "goapp-"
  iam_instance_profile = "${aws_iam_instance_profile.iam_instance_profile.id}"
  image_id = "${data.aws_ami.goapp.id}"
  instance_type = "t2.micro"
  associate_public_ip_address = true
  security_groups = ["${data.terraform_remote_state.network_state.asg_sg_id}"]

  root_block_device = {
    volume_type = "gp2"
    volume_size = 8
    delete_on_termination = true
  }
  user_data = <<EOF1
#!/bin/bash
cd /opt/
./goapp ${aws_s3_bucket.photos.bucket}
EOF1
}

Comments to lines:

1 – the data block finds the latest AMI with the proper name prefix, which we can then refer to in other blocks

23 – as the Terraform documentation says: "In order to update a Launch Configuration, Terraform will destroy the existing resource and create a replacement. In order to effectively use a Launch Configuration resource with an AutoScaling Group resource, it's recommended to specify create_before_destroy in a lifecycle block."

40 – just start the application in user data – in a real application we should use some other mechanism, like running the application as a systemd service or a Docker container, because in the current design the app won’t come back up after a machine reboot.

Line 30 needs a deeper description, as we refer there to the networking module using the remote state trick.

data "terraform_remote_state" "network_state" {
  backend = "s3"
  config {
    bucket = "jakubbujny-article-form-your-cloud-1"
    region = "eu-central-1"
    key = "network"
  }
}

So in Terraform you can use output variables from some other state by specifying the state bucket name and the state file name, which allows dividing complex environments into smaller parts.

The next step is to define the ASG:

 resource "aws_placement_group" "placement" {
  name = "goapp"
  strategy = "spread"
}
resource "aws_autoscaling_group" "goapp" {

  name = "goapp"
  max_size = 3
  min_size = 3
  placement_group = "${aws_placement_group.placement.name}"
  desired_capacity = 3
  launch_configuration = "${aws_launch_configuration.goapp.name}"
  vpc_zone_identifier = [
    "${data.terraform_remote_state.network_state.asg_a_subnet_id}",
    "${data.terraform_remote_state.network_state.asg_b_subnet_id}",
    "${data.terraform_remote_state.network_state.asg_c_subnet_id}"]

  tag {
    key = "Name"
    value = "goapp"
    propagate_at_launch = true
  }
}

Comments to lines:

1 – the placement group allows us to say whether we want to place the ASG instances in a cluster, so they are deployed “near” each other to allow fast data exchange, or to use “spread”, which means the instances will be spread out across AZs

Finally the ALB can be attached to the ASG to expose the application.

resource "aws_alb" "goapp_alb" {
  name = "goapp-alb"
  security_groups = [
    "${data.terraform_remote_state.network_state.alb_sg_id}"]
  internal = false
  subnets = [
    "${data.terraform_remote_state.network_state.alb_a_subnet_id}",
    "${data.terraform_remote_state.network_state.alb_b_subnet_id}",
    "${data.terraform_remote_state.network_state.alb_c_subnet_id}"]

}

resource "aws_alb_target_group" "alb_target_group" {
  name     = "goapp-alb"
  port     = "8000"
  protocol = "HTTP"
  vpc_id   = "${data.terraform_remote_state.network_state.vpc_id}"
  tags {
    name = "goapp"
  }

  health_check {
    healthy_threshold   = 3
    unhealthy_threshold = 10
    timeout             = 5
    interval            = 10
    path                = "/"
    port                = "8000"
  }
}

resource "aws_alb_listener" "alb_listener" {
  load_balancer_arn = "${aws_alb.goapp_alb.arn}"
  port              = "80"
  protocol          = "HTTP"

  default_action {
    target_group_arn = "${aws_alb_target_group.alb_target_group.arn}"
    type             = "forward"
  }
}

resource "aws_autoscaling_attachment" "attachement" {
  alb_target_group_arn   = "${aws_alb_target_group.alb_target_group.arn}"
  autoscaling_group_name = "${aws_autoscaling_group.goapp.id}"
}

The first block is quite obvious – create the ALB and place it in the proper network subnets. The second block defines the target group – the place where we attach our ALB – so we need to specify the application protocol/port and a simple health check. The third block defines the listener, i.e. the part of the ALB which sees clients coming from the internet – the default action is to forward requests to the target group. The last block is the obvious ALB-to-ASG attachment.

So finally we have all parts of our application and we can see the effect, but…

Oh no! SSL!

With that configuration our application is exposed over plain-text HTTP, which sadly is a real problem, because Chrome won’t let you grant a site permission to use your camera if that site is not secured with SSL – we need a fast workaround!

So I’m going to do it in a little hacky way – please do not use this in any real application.

A simple workaround is to use Certbot to issue an SSL certificate for our application, then attach that certificate to the ALB and expose an HTTPS/443 listener. It’s not that easy, as AWS domains are on a blacklist in Certbot, probably to avoid situations where attackers issue certificates to do bad things like MITM attacks.

I’m going to use the manual procedure of issuing a certificate, which means that Certbot generates a filename and file content for you, which need to be exposed by the web server on the domain for which you want the certificate. So we need to:

  • Create another S3 bucket for that Certbot file
  • Modify policy
  • Modify ALB SG to allow 443
  • Add endpoint in application to expose that file on demand
  • Add some domain to our application – the easiest way is to make CNAME in some existing domain
  • Issue certificate
  • Create HTTPS forward in ALB using that cert

Looks like a lot of work? Actually it’s really easy. Creating the S3 bucket is quite obvious – we just need to remember to add the new S3 bucket ARN to the IAM policy to allow EC2 instances to access that bucket. We need to add a 2nd parameter to the application to pass that bucket name, plus the following lines of code:

func GetCertbotAuth(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	w.WriteHeader(http.StatusOK)
	svc := SpawnAwsSdkS3Service()
	resp, err := svc.GetObject(&s3.GetObjectInput{Bucket: aws.String(bucket_auth), Key: aws.String(vars["auth"])})
	if err != nil {
		panic(err)
	}
	io.Copy(w, resp.Body)
}

For the CNAME record I just used the domain on which I have my blog. As that domain is parked on WordPress DNS, I just added the record in the panel:

Screenshot from 2018-07-27 19-13-33

So that CNAME refers to the ALB domain. After that operation I can issue a certificate for the article.jakubbujny.com domain by running the certbot command, manually uploading the required file to the S3 bucket and confirming in Certbot that it’s ready for verification – Certbot will connect on port 80 and look for the specified file with the specified content. After that I have:

Screenshot from 2018-07-27 19-12-42

So far so good. In a normal application you would use a Route53 domain, a regular SSL certificate (or one issued for you by Amazon) and define everything in Terraform. But we are hacking here a little, so I’m going to add the HTTPS listener to the ALB using the console, as it allows defining the forward and pasting the certificate content in one form, which will be uploaded to ACM:

Screenshot from 2018-07-27 19-15-51

 

So now finally I can access https://article.jakubbujny.com and upload my face to S3 – easy-peasy! 😀

Stream microservices logs from Kubernetes to Kafka

Introduction

Application logs are very important for problem investigation and application health monitoring. Nowadays you want to monitor your logs and react to problems before your client knows that something is wrong. Also, when your system loses stability you need to react immediately and keep your logs available and up-to-date, as they are your ‘sight’ into the application – let’s say that logs are the first eye and metrics are the second eye. In the microservices world log aggregation is even harder because you have X instances of a particular microservice, every instance prints a huge number of logs to inform you what’s happening in the application, and those microservices are typically distributed across many different nodes – how to deal with that?

This article describes how to use Fluentd on Kubernetes acting as a Kafka producer to stream logs, and how to use Fluentd on a virtual machine acting as a Kafka consumer to push logs to Elasticsearch. I also want to show what an application log stream on Kafka enables, by writing a dummy Python implementation which sends notifications to Slack to inform developers/devops about detected exceptions.

Why Kafka?

Streaming logs via Kafka gives you the following advantages:

  1. As Kafka is extremely efficient, it’s a safe ‘buffer’ for your logs when your log storage cannot consume a huge amount of logs during load spikes
  2. Logs on Kafka are ready to integrate – you can attach many consumers and place the logs in different storage engines, or attach some analysis directly, e.g. Apache Spark
  3. If you treat Kafka as an integration bus for your system, you can avoid ‘infrastructure spaghetti’ by integrating different infrastructure modules through a single point.

Proposed design

fluend-kafka-kubernetes

Fluentd on Kubernetes

Fluentd is deployed on Kubernetes as a DaemonSet. Please see the deployment descriptor below:

apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
  name: fluent
  namespace: fluent
  labels:
    k8s-app: fluent
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  template:
    metadata:
      labels:
        k8s-app: fluent
        version: v1
        kubernetes.io/cluster-service: "true"
    spec:
      tolerations:
      - key: node-role.kubernetes.io/master
        effect: NoSchedule
      containers:
      - name: fluentd
        image: fluent/fluentd-kubernetes-daemonset:v1.1.3-debian-elasticsearch
        command: ["/bin/sh"]
        args: ["-c", "gem install fluent-plugin-kafka && cp /fluent-config/fluent.conf /fluentd/etc/ && /fluentd/entrypoint.sh"]
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: fluent-config
          mountPath: /fluent-config
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: fluent-config
        configMap:
          name: fluent
          items:
          - key: fluent_conf
            path: fluent.conf

Please keep an eye on the important sections in that file:

  • The toleration means that we also want to deploy Fluentd on Kubernetes masters
  • The command override installs the Kafka plugin and copies fluent.conf, which we attach as a config map

Here you can find config map:

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent
  namespace: fluent
data:
  fluent_conf: |
    @include kubernetes.conf

    <match **>
      @type kafka_buffered

      brokers             your-kafka-broker-host:9092

      topic_key             your-logs-topic
      default_topic         your-logs-topic
      output_data_type      json
      output_include_tag    true
      output_include_time   true
      get_kafka_client_log  false

      required_acks         1
      compression_codec     gzip
    </match>
     

In that config we simply tell Fluentd: please produce all collected logs as messages on Kafka. We are also activating gzip compression for optimization. We require at least one ack – and you should set the your-logs-topic replication factor to at least 2 to keep your logs when one broker fails.

Kafka consumer – fluentd and Elasticsearch

After streaming logs to Kafka you can easily consume them using another Fluentd instance – my advice here is to place that consumer on the same machine where you have Elasticsearch, or, if you are using an Elasticsearch SaaS like AWS ES, at least in the same subnet. Here you can find the config:

<source>
 @type kafka_group
 brokers your-kafka-broker-host:9092
 consumer_group fluentd
 topics your-logs-topic
 format json
 use_record_time true
</source>


<match **>
 @type elasticsearch
 @id out_es
 log_level info
 include_tag_key true
 host your-elasticsearch-host
 port 443
 scheme https
 logstash_format true
 logstash_prefix kubernetes
 <buffer>
   flush_thread_count 8
   flush_interval 5s
   chunk_limit_size 6M
   queue_limit_length 256M
   retry_max_interval 30
   retry_forever true
 </buffer>
</match>
 

Please keep an eye on the important sections in that file:

  • We are consuming logs as the fluentd consumer group – if your-logs-topic has more than one partition you can consume logs using multiple consumer instances
  • We are simply saving the logs in the ‘kubernetes’ index in Elasticsearch

Kafka consumer – search & notify

The last step is to attach a consumer for log analysis. For that I’m going to modify an example from the confluent-kafka-python library, using Python 3.6:

from confluent_kafka import Consumer, KafkaError
import json
import requests

c = Consumer({
    'bootstrap.servers': 'your-kafka-broker-host',
    'group.id': 'logs-inspector',
    'default.topic.config': {
        'auto.offset.reset': 'smallest'
    }
})

c.subscribe(['your-logs-topic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    message_dict = json.loads(msg.value().decode('utf-8'))
    if "log" in message_dict.keys():
      if "exception" in message_dict["log"].lower():
        notification = f"Found problem in log\nContainer: {message_dict['kubernetes']['container_name']}\nNamespace: {message_dict['kubernetes']['namespace_name']} \nPod: {message_dict['kubernetes']['pod_name']} \nHost: {message_dict['kubernetes']['host']}"
        requests.post(
          "https://your-slack-webhook-url.com",
          data=json.dumps({"attachments": [{
            'text': notification,
            'color': "#00FF00"
            }]}),
          headers={'Content-Type': 'application/json'})
c.close()

That script is very simple: consume a log message from the topic, check if the log key exists and whether it contains “exception” – if yes, send a notification via a Slack webhook to your team members. Please note the additional information which Kubernetes gives you, like node, namespace and pod name.

Useful Docker #1 – Seed container at runtime

Docker is one of my favorite tools in IT – it gives me really fast access to already configured and installed tools. Using that feature I can cover many cases with a simple Docker container and some magic around it. In this article series I’m going to show you how to use Docker in a creative way and how to debug problems inside Docker containers.

Today something about CMD, magic in runtime and docker commit.

I assume that you have basic knowledge about Docker and some experience using that great tool – if not, I suggest checking this site: https://docs.docker.com/engine/docker-overview/

My environment in this article:

  • Docker version 17.12.0-ce, build c97c6d6
  • Ubuntu 17.10
  • zsh 5.2 (x86_64-ubuntu-linux-gnu)
  • tmux 2.5

Everything should also work with Docker on Windows (the one using Hyper-V) – if there is a problem somewhere, please contact me on my LinkedIn.

g-gif-update

Seed your container at runtime

Docker Hub provides many images (as mentioned here, in 2016 it was 400k) with useful defaults, but these defaults rarely meet our requirements. To deal with that situation we can take many strategies, and one of my favorites is overriding the default Docker command with some magic.

Let’s say that we want to host the jakubbujny.com blog in a Docker container and switch my site logo to

57761

just for fun!

Required steps:

  • Start container
  • Download jakubbujny.com site content recursive
  • Start web hosting
  • Change image

We need to start with some static web content hosting like https://hub.docker.com/_/nginx/

We have command from this site (with port publish):

docker run -p 8080:80 --name some-nginx -v /some/content:/usr/share/nginx/html:ro -d nginx

OK, but after starting this container we can see only the nginx welcome site – how to inject our content there? There are 2 ways:

  • Add the content to the Docker image – useful if we want to use that image many times
  • Override the run command and download the content before the web server starts – our approach, because we are fast&furious

So what’s the default command in nginx? To check that we should go to the original Dockerfile (like this) and at the end we see:

CMD ["nginx", "-g", "daemon off;"]

Cool, let’s try this way:

docker run -it -p 8080:80 nginx bash -c "cd /usr/share/nginx/html && wget --recursive --no-parent --no-check-certificate https://jakubbujny.com ; nginx -g 'daemon off;'"

What’s happening there:

  • docker – 😉
  • run – start new container
  • -it – interactive (get stdout, pass stdin)
  • -p 8080:80 – publish internal nginx 80 port onto our machine 8080 so we can access nginx using localhost:8080
  • bash -c “ ” – that’s a little magic to pass a long command containing && and ; – the bash man page says:
    -c string If the -c option is present,  then  commands  are  read  from
                 string.   If  there  are arguments after the string, they are
                 assigned to the positional parameters, starting with $0.
  • cd… – sure
  • wget – we need to recursively download my site
  • nginx -g – start nginx (using the default command from the Dockerfile)

Ok so let’s run it… woooops!!! Something is not working ;(

bash: wget: command not found

Please remember that every Docker container is like an isolated virtual OS (actually it’s not, but you can think about Docker this way 😉 ) – it means that every Docker image has different tools installed. Sadly the nginx image doesn’t contain the wget tool, but okey dokey – we can install wget using apt-get inside the container, no worries:

docker run -it -p 8080:80 nginx bash -c "apt-get update && apt-get install --no-install-recommends --no-install-suggests -y wget && cd /usr/share/nginx/html && wget --recursive --no-parent --no-check-certificate https://jakubbujny.com ; nginx -g 'daemon off;'"

What’s happening there:

  • sudo is not needed because inside the container we are root
  • apt-get update – very important! Please remember to run that command before any apt-get install inside containers to get a valid package index
  • apt-get install – here -y is important because the command runs in non-interactive mode, which means we need to auto-confirm that we are sure we want to install the package

So after running this command everything seems fine – wget is downloading my site and nginx is starting but at localhost:8080 I see:

Screenshot from 2018-02-18 12-14-04

Damn that’s not cool….

Debug

To debug that situation we need to get into the container and see what’s happening in the file system after the wget command. To do that we start with:

docker ps

And output:

Screenshot from 2018-02-18 12-16-45

The most important thing there is the Container ID – we’re going to use that ID to get into the container. Open a new console and try (change the container ID to yours because it’s a random string):

docker exec -it 69cb502ab0d1 bash

After that command we are in a new shell process inside the container, in interactive mode. Let’s check what’s under /usr/share/nginx/html:

drwxr-xr-x 1 root root 4.0K Feb 18 11:20 . 
drwxr-xr-x 1 root root 4.0K Dec 26 18:16 .. 
-rw-r--r-- 1 root root 537 Dec 26 11:11 50x.html 
-rw-r--r-- 1 root root 612 Dec 26 11:11 index.html 
drwxr-xr-x 7 root root 4.0K Feb 18 11:20 jakubbujny.com

OK, so wget downloaded my site into the jakubbujny.com directory – try it in a web browser:

http://localhost:8080/jakubbujny.com/

You should see my blog 🙂

Change image

Please stay in our shell process inside the container. Changing the site logo is easy, just type: vim /usr/share/nginx/html/jakubbujny.com/index.html … omg, there is no vim

tenor

<disclaimer> Many popular images don’t have basic tools like vim or even bash because every additional tool not needed at runtime means bigger image size to download for users </disclaimer>

apt-get install vim

I believe that you are VIM WIZARD LVL 300 but if not just type:

ESC /site-logo ENTER

to find my logo section, and change the next <img> tag’s src (this should help: ESC a) to: https://jakubbujny.files.wordpress.com/2018/02/57761.png

Then:

ESC :wq ENTER

Go to web browser and hit F5:

Screenshot from 2018-02-18 12-36-26.png

That was easy, wasn’t it?

Commit container into image

The last step is to convert the container into an image.

PLEASE DO NOT USE THAT IN PRODUCTION – FOLLOW INFRASTRUCTURE AS CODE PATTERN

It means that we can say to Docker: hey! I have a container here where I installed some tools and modified some state, please convert that container into an image so I can reuse that manual configuration many times.

Command:

docker commit 69cb502ab0d1 my-awesome-image:1.0

So again we are using the Container ID, and Docker creates an image named my-awesome-image:1.0 for us – let’s check if that works:

docker run -it -p 8081:80 my-awesome-image:1.0 bash -c "nginx -g 'daemon off;'"

What’s happening:

  • Run a container from the committed image
  • Override the command again – docker commit set my-awesome-image’s CMD to the one with wget, but we don’t need that here! We committed the whole container, so the jakubbujny.com site is already in the image – if we don’t need the latest version we can override the CMD again to start only the web server

Go under:

http://localhost:8081/jakubbujny.com/

And you should see my blog with the Docker logo instead of mine.

Conclusion

As you can see Docker is quite powerful – when used with care you can really do some magic. I hope that was helpful for you – I decided to divide the Docker articles into a series because I have many more examples of how to use this tool in a creative way. See you in the next article!