Apache Kafka Container & Spring Boot Kommunikation

Die Kommunikation zwischen Microservices ist eines der wichtigsten Aspekte einer Microservice-Architektur. In diesem Tutorial geht es darum wie man mit Apache Kafka Nachrichten von einem Spring Boot Producer zu einem Spring Boot Consumer sendet.
Spring Boot Apache Kafka

Microservicekommunikation mit Apache Kafka & Spring Boot

Einleitung

In diesem Tutorial möchte ich euch gerne zeigen wie Ihr mit Spring Boot bzw Spring Cloud Nachrichten von einem Microserivce zum anderen mit Apache Kafka versenden könnt. Die Kommunikation zwischen den einzelnen Microservices ist ein wesentlicher Bestandteil einer Microservices architektur.

Wir werden einen Producer und einen Consumer Microservice erstellen um das versenden der Nachrichten zu demonstrieren. Der Producer Mircoservice wird so aufgebaut sein, dass er über eine POST Rest Schnittstelle einen String entgegegen nimmt und diesen dann an Apache Kafka sendet.

Der Consumer Microservice wird dann von Apache Kafka benachrichtigt, dass es eine neue Nachricht gibt. Diesen werden wir dann in der Console ausgeben.

Wichtig bei diesem Scenario ist es, dass Producer und Consumer sich nicht kennen. Beide haben keine Ahnung von der Existenz des anderen. Dieses Prinzip nennt man auch Lose-Kopplung. Durch die Lose-Kopplung erreicht man, dass Softwarearchitekturen bzw Komponenten von Softwarearchitekturen unabhängiger werden. Das auf Dauer gesehen dazu führt, dass die Software wartbarer wird.

Als erstes benötigen wir eine Spring Boot Applikation mit der wir über HTTP Post Werte entgegeben nehmen können. Dieses Tutorial basiert auf meinem vorherigen Tutorial, wo ich schon mal beschrieben habe wie man einen RestController mit Spring Boot erstellt.

Apache Kafka starten

Um mit Messagebrocker Apache Kafka Nachrichten von einem Microservice zum anderen zu versenden benötigen wir eine existierende Kafka Umgebung. Solltet ihr keinen eigenen Apache Kafka Server zur Verfügung haben, empfehle ich euch mein Tutorial Apache Kafka mit Docker-Compose.

Nachdem der Messagebrocker Apache Kafka gestartet ist können wir uns mit der Erstellung des Spring Boot Microservices befassen.

Producer erstellen

Um Messages versenden zu können brauchen wir einen Microservice, der in der Lage ist Messages zu versenden. Daher werden wir das Spring Boot Projekt aus dem Tutorial RestController mit Spring Boot um die Messaging Funktionen von Spring Cloud Stream erweitern.

Apache Maven pom.xml erwetiern

Spring Cloud Stream nutzen zu können müssen wir erst einmal unsere pom.xml wie folgt erweitern.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>de.ertantoker.tutorial</groupId>
  <artifactId>spring-boot-consumer</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>spring-boot-consumer2</name>
  <url>http://maven.apache.org</url>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.12.RELEASE</version>
  </parent>

  <properties>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
  </dependencies>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>Edgware.SR3</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>

Wie man an der pom.xml erkennen kann haben wir diese um ein Eintrag dependencyManagement erweitert. Dieser ist notwendig um die Spring Boot Libraries laden zu können. Als weiteren schritt wurde die „spring-cloud-starter-stream-kafka“ Library eingebunden als dependency. Hierdurch ist jetzt unser Projekt in der Lage die Klassen für Spring Cloud Stream zu nutzen.

Spring application.yml Datei erstellen

Nachdem wir die pom.xml Datei erweitert haben müssen wir die zentrale Konfigurationsdatei applicationyml von Spring Boot erweitern bzw neu anlegen, falls diese nicht vorhanden ist. In Maven Projekten wird die application.xml Datei in Verzeichnis „src/main/resources“ abgelegt.

Die application.xml sieht dann wie folgt aus:

server:
  port: 8080

spring:
  application:
    name: producer-service
  cloud:
    stream:
      bindings:
        producerOutput:
          destination: TopicProducerConsumerNew
          content-type: application/json
      kafka:
        binder:
          autoCreateTopics: true
          brokers: localhost

Wichtig hier ist, dass wir die Konfigutration spring.cloud.stream einbinden. Hier geben wir an wie der Name des Channels ist mit dem unsere Java Applikation später mit Spring Cloud Stream kommunizieren wird. In diesem Fall heißt sie „producerOutput“. Diesen Eintrag werden wir später auch im Java Code finden. Als nächstes geben wir in der „destination“ den Namen des Topics an, der im Kafka verwendet werden soll. Der Parameter content-type gibt an, dass die Payload (Die Objekte die in der Nachricht versendet werden) als JSON versendet werden soll. Das hat den Vorteil das Consumer die JSON bearbeiten können eben falls auf Kafka Topics reagieren können und die Nachrichten auswerten können.

Weiter unten haben wir die Kafka Konfiguration. Der Parameter binder definiert die Verbindung zum Kafka. Da wir Spring Cloud Stream nutzen hätten wir hier auch einen anderen Binder verwenden können. Es gibt Binder für unterschiedliche Systeme. Zum Beispiel RabbitMQ!

Der Parameter „autoCreateTopics: true“ gibt an, dass wenn ein Topic im Kafka nicht existiert, dass dieser dann automatisch angelegt wird. Das hat den Vorteil, dass man nicht noch separat den Kafka administrieren muss. Als brokers geben wir hier localhost an, da wir unseren lokalen Kafka nutzen werden. Wie man einen lokalen Kafka einrichtet habe ich in meinem Tutorial Apache Kafka mit Docker-Compose erstellen beschreiben.

Spring Cloud Stream einbinden & aktivieren

Nachdem die Konfiguration abgeschlossen ist müssen wir unserer Java Applikation diese Konfiguration mitteilen.

Als erstes benötigen wir einen MessageChannel mit dem wir Nachrichten von unserer Applikation an Kafka versenden können. Hierfür bietet Spring Boot und Spring Cloud Source die Möglichkeit Java Interfaces zu definieren die dann in Spring Boot Component, Service und Controllern injected werden können.

package de.ertantoker.tutorial;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface ProducerSource {

    @Output("producerOutput")
    MessageChannel producerOutput();
}

Der nächste Schritt ist dann, das Binding von Spring Cloud Stream zu aktivieren. Spring bietet hier den Vorteil, dass man diverse Spring Projekte wie Spring Cloud Stream über eine Annotation aktivieren kann. Das Ganze nutz die Autoconfiguration von Spring und ist somit sehr einfach für den Entwickler zu integrieren.

Durch die Annotation @EnableBinding({ProducerSource.class}) wird die Spring Cloud Stream Binding aktiviert. Der Annotation wird als Parameter die ProducerSource Klasse übergeben die wir im Vorfeld angelegt haben. Somit erkennt Spring Cloud Stream, dass dieser Message Channel in die Konfiguration eingebunden werden muss.

package de.ertantoker.tutorial;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding({ProducerSource.class})
public class ProducerApp
{
    public static void main( String[] args )
    {
        SpringApplication.run(ProducerApp.class, args);
    }
}

Nachdem wir das Source Interface definiert und und das Binding eingeschaltet haben müssen wir im Producer nur noch das versenden der Nachricht einbauen. Hierfür müssen wir das Source Interface, welches wir schon erstellt haben in unserem Controller injecten. Natürlich muss es nicht unbedingt in Controller stattfinden. Es kann auch jede andere Bean in Spring Boot sein die man injecten kann.

Im folgenden sieht ihr den Aufruf in der Controller Klasse.

package de.ertantoker.tutorial;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/producer")
public class ProducerController {

    private ProducerSource producerSource;

    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    public ProducerController(ProducerSource producerSource) {
        this.producerSource = producerSource;
    }


    @PostMapping("/{id}")
    public ResponseEntity produce(@PathVariable String id) {
        producerSource.producerOutput().send(MessageBuilder.withPayload("ID: " + id).build());
        return new ResponseEntity<>(HttpStatus.CREATED);
    }
}

Somit wäre der Producer fertig. Als nächstes werden wir den Consumer implementieren.

Consumer erstellen

Wie bei dem Producer muss wir auch dem Consumer Microservice mit Libarires für Speing Cloud Stream in der pom.xml bekannt machen. Erst dann stehen uns die entsprechenden Klassen in Java zur Verfügung.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>de.ertantoker.tutorial</groupId>
  <artifactId>spring-boot-consumer</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>spring-boot-consumer</name>
  <url>http://maven.apache.org</url>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.4.RELEASE</version>
  </parent>

  <properties>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
  </dependencies>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>Finchley.SR1</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>

Nachdem die pom.xml Datei erweitert ist müssen wir auch hier die Konfugiration in der application.xml vornehmen. Diese sieht fast identisch aus mit der Konfiguration des Producers. In unserem Fall mit dem Unterscheid, dass der server.port auf 0 gesetzt ist. Dies bedeutet, dass die Spring Boot Anwendung beim start ein zufälliger Port zugewiesen wird. Da unser Consumer keine Rest Schnittstelle hat ist der Port uns erst mal egal. Ein weiterer Unterscheid ist das die Binding nicht mit Output endet sondern mit Input. Das ist keine Pflicht hat sich aber bei Projekten bewährt um auf die schnelle zu erfahren ob ein Microservice in eine Topic schreibt oder von dort liest.

Im folgenden findet ihr den kompletten YAML Code.

server:
  port: 0
spring:
  application:
    name: consumer-service
  cloud:
    stream:
      bindings:
        consumerInput:
          destination: TopicProducerConsumerNew
          content-type: application/json
      kafka:
        binder:
          autoCreateTopics: true
          brokers: localhost

Hier sehen wir auch, dass als detination TopicProducerConsumerNew verwendet wird. Somit hätte nun auch unser Consumer die Verbindung zum Kafka realisiert.

Als nächstes werden wir das Sink Interface erstellen. Das Sink Interface ist wie das Source Interface des Producers ein Java Interface welches die Verbindung zwischen der Java Applikation und dem Apache Kafka realisiert.

Im folgenden sieht ihr das Sink Interface

package de.ertantoker.tutorial;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ConsumerSink {
    @Input("consumerInput")
    SubscribableChannel input();
}

Im Sink Interface sehen wir die Annotation @Input(„consumerInput“) dies ist die einzige Konfiguration die wir in Java machen müssen, damit unser Consumer Kafka nutzen kann. Der Wert der Annotation gibt an welches Bindung genutzt werden soll. In diesem Fall ist es consumerInput, welches wir vorher in der application.yml konfiguriert haben.

Als nächsten Schritt müssen wir das Binding in der Consumer Applikation einschalten. Dies geschieht genau so wie beim Producer. Auch hier reicht es aus wenn das @EnableBinding in der App Klasse eintragen.

package de.ertantoker.tutorial;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding({ConsumerSink.class})
public class ConsumerApp
{
    public static void main( String[] args )
    {
        SpringApplication.run(ConsumerApp.class, args);
    }
}

Nun ist auch das Binding im Consumer aktiviert. Jetzt fehlt nur noch der Code Ausschnitt wo der Consumer auf die eingehenden nachrichten reagiert. Der Consumer muss in der Lage sein die Nachrichten die von Kafka aus an dem Consumer gesendet werden zu bearbeiten. Hierfür reicht es aus, dass man einen StreamListener mit dem Binding erstellt, der auf das Binding „consumerInput“ reagiert.

Folgender Code zeigt wie die Serfice Klasse um einem ServieLisener erweitert wurde.

package de.ertantoker.tutorial;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ConsumerSink {
    @Input("consumerInput")
    SubscribableChannel input();
}

Im Sink Interface sehen wir die Annotation @Input(„consumerInput“) dies ist die einzige Konfiguration die wir in Java machen müssen, damit unser Consumer Kafka nutzen kann. Der Wert der Annotation gibt an welches Bindung genutzt werden soll. In diesem Fall ist es consumerInput, welches wir vorher in der application.yml konfiguriert haben.

Als nächsten Schritt müssen wir das Binding in der Consumer Applikation einschalten. Dies geschieht genau so wie beim Producer. Auch hier reicht es aus wenn das @EnableBinding in der App Klasse eintragen.

package de.ertantoker.tutorial;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerService {

    @StreamListener("consumerInput")
    public void handleConsumerInput(String input) {
        System.out.println("Receive Message: " + input);
    }
}

Unser Consumer ist nun fertig und kann auf Nachrichten die von Kafka aus kommen reagieren. Dies ist zwar ein einfaches Beispiel aber es demontiert wir Nachtichten in Spring Boot Anwendungen versendet und empfangen werden. Man hätte auch ein POJO versenden können.

Producer & Consumer zusammenarbeit

Nachdem wir nun beide Microservices gestartet haben senden wir einen Rest POST Aufruf auf die URL http://localhost:8080/producer/TestMessage. Wir erwarten nun, dass der Producer die Wert „TestMessage“ über Kafka an den Consumer sendet.

Folgende Consolenausgabe sieht ihr wenn ihr den Producer gestartet habt.

Der Consumer hat eine sehr ähnliche Consolenausgabe. Hier werdet ihr aber sehen, dass der Port nicht auf 8080 gesetzt ist sondern zufällig gewählt ist. Das erreichen wir wenn wir in der application.yml den Wert von server.port auf 0 setzen.

Folgende Ausgabe erscheint wenn ihr den Consumer startet.

Wenn wir jetzt über Postman oder einen anderen HTTP Client einen Rest Post auf ruf auf die URL http://localhost:8080/producer/TestMessage ausführen sehen wir, dass beim Consumer der Wert „TestMessage“ in der Console ausgegeben wird.

Problem bei mehreren Consumern – Die Lösung ConsumerGroups

Wir haben nun gesehen wie die Kommunikation von zwei Microservices untereinander funktionieren kann. Es kann aber bei dieser Art der Konfiguration zu Problemen kommen die den Betrieb extrem stören können. Ein Vorteil von Microservices ist es, dass man in Kubernetes Umgebungen zum Beispiel einen weitern Pod (Instanz)  als Consumer starten kann.

Wenn es nun einen Producer gibt und zwei Consumer, die auf die selbe nachricht reagieren, würden beide Consumer das gleiche tun. Stellt man sich nun vor, dass man einen Email-Service oder Notifikation-Service hat, der Emails oder Push Nachrichten an Benutzer sendet, dann wird schnell klar das hier beide Consumer Services das gleiche tun.

Um dieses Problem zu lösen gibt es in der Spring Cloud Stream Konfiguration einen Paramter den wir setzten müssen. Hierbei handelt es sich um den Parameter „spring.cloud.stream.bindings.input.group“. Mit dieser Parameter werden die Consumer einer Topic in Gruppen zusammen gefasst. Hierdurch erreicht man, dass ein Consumer nur ein Event abarbeitet. Es ist somit ausgeschlossen, dass ein Event zweimal abgearbeitet wird. Natürlich gibt es auch Scenarien wo es gewünscht ist, dass zwei Consumer das gleiche Event abarbeiten. In diesem Fall sollte man diesen Parameter nicht nutzen.

Im Folgenden die application.yml nach dem der Parameter spring.cloud.stream.bindings.input.group gesetzt wurde.

server:
  port: 0
spring:
  application:
    name: consumer-service
  cloud:
    stream:
      bindings:
        consumerInput:
          destination: TopicProducerConsumerNew
          content-type: application/json
          group: ConstumerServiceGroup
      kafka:
        binder:
          autoCreateTopics: true
          brokers: localhost

Das Ergebnis

Wir haben nun gesehen wie Spring Boot Anwendungen mit der Hilfe von Apache Kafka und Spring Cloud in Microservice Umgebungen miteinander Kommunizieren können.  Durch die Lose Kopplung mit Apache Kafka bleiben die einzelnen Microservices unabhängig von einander. Wichtig bei dieser Art der Kommunikation ist immer zu beachten wie die Consumer die Nachrichten abarbeiten. Je nach Scenario muss man dann die Consumer in Gruppen zusammenfassen oder nicht.

Dies ist natürlich nur eine Möglichkeit wie Microservices miteinander kommunizieren könne. Es gibt auch die Möglichkeit, dass Micrsoservices HTTP Rest Aufrufe machen um nachrichten von einem Service zum anderen zu versenden. Auch hierfür bietet Spring Boot in Kombination mit Spring Cloud Feign eine Möglichkeit an.

Ich hoffe euch hat mein Tutorial gefallen. Solltet ihr Fragen, Kritik und Anregungen haben, dann würde ich mich über ein Feedback freuen!

[amazon_link asins=’3864905265,3864905559,3864905257,3864904838′ template=’ProductCarousel‘ store=’fitnes06-21′ marketplace=’DE‘ link_id=’c4af0025-b814-11e8-b425-f39b031dfaaa‘]

Verwandte Beiträge

Comments (1)

Super erklärt Ertan. Vielen Dank!

Leave a comment