Apache Kafka

Erste Schritte mit Apache Kafka und Python

Erste Schritte mit Apache Kafka und Python
In dieser Lektion sehen wir, wie wir Apache Kafka mit Python verwenden und eine Beispielanwendung mit dem Python-Client für Apache Kafka erstellen.

Um diese Lektion abzuschließen, müssen Sie auf Ihrem Computer eine aktive Installation von Kafka haben. Lesen Sie Apache Kafka unter Ubuntu installieren, um zu erfahren, wie das geht.

Python-Client für Apache Kafka installieren

Bevor wir mit Apache Kafka im Python-Programm arbeiten können, müssen wir den Python-Client für Apache Kafka installieren. Dies kann mit Pip (Python-Paket-Index). Hier ist ein Befehl, um dies zu erreichen:

pip3 installieren kafka-python

Dies wird eine schnelle Installation auf dem Terminal sein:

Installation des Python Kafka-Clients mit PIP

Nachdem wir nun eine aktive Installation für Apache Kafka und auch den Python Kafka-Client installiert haben, können wir mit der Codierung beginnen.

Einen Produzenten machen

Das erste, was Sie zum Veröffentlichen von Nachrichten auf Kafka benötigen, ist eine Producer-Anwendung, die Nachrichten an Themen in Kafka senden kann.

Beachten Sie, dass Kafka-Produzenten asynchrone Nachrichtenproduzenten sind. Dies bedeutet, dass die Operationen, die ausgeführt werden, während eine Nachricht auf der Kafka Topic-Partition veröffentlicht wird, nicht blockieren. Um die Dinge einfach zu halten, werden wir für diese Lektion einen einfachen JSON-Publisher schreiben.

Erstellen Sie zunächst eine Instanz für den Kafka Producer:

aus Kafka-Import KafkaProduzent
json importieren
pprint importieren
Produzent = KafkaProduzent(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.Deponien (v).codieren('utf-8'))

Das Attribut bootstrap_servers informiert über den Host & Port für den Kafka-Server. Das Attribut value_serializer dient nur der JSON-Serialisierung der gefundenen JSON-Werte.

Um mit dem Kafka Producer zu spielen, versuchen wir, die Metriken für den Producer und den Kafka-Cluster auszudrucken:

Metriken = Produzent.Metriken()
pprint.pprint(Metriken)

Wir werden jetzt folgendes sehen:

Kafka Mterics

Versuchen wir nun endlich, eine Nachricht an die Kafka-Warteschlange zu senden. Ein einfaches JSON-Objekt ist ein gutes Beispiel:

Produzent.send('linuxhint', 'topic': 'kafka')

Das linuxhint ist die Themenpartition, an die das JSON-Objekt gesendet wird. Wenn Sie das Skript ausführen, erhalten Sie keine Ausgabe, da die Nachricht nur an die Themenpartition gesendet wird. Es ist an der Zeit, einen Verbraucher zu schreiben, damit wir unsere Anwendung testen können.

Einen Verbraucher machen

Jetzt sind wir bereit, als Verbraucheranwendung eine neue Verbindung herzustellen und die Nachrichten aus dem Kafka-Thema zu erhalten. Beginnen Sie damit, eine neue Instanz für den Verbraucher zu erstellen:

von kafka import KafkaConsumer
von kafka importieren TopicPartition
print('Verbindung herstellen.')
Consumer = KafkaConsumer(bootstrap_servers='localhost:9092')

Ordnen Sie nun dieser Verbindung ein Thema und auch einen möglichen Offset-Wert zu.

print('Thema zuweisen.')
Verbraucher.Assign([TopicPartition('linuxhint', 2)])

Endlich können wir die Nachricht drucken:

print('Nachricht erhalten.')
für Nachricht im Consumer:
print("OFFSET: " + str(Nachricht[0])+ "\t MSG: " + str(Nachricht))

Dadurch erhalten wir eine Liste aller veröffentlichten Nachrichten auf der Kafka Consumer Topic Partition. Die Ausgabe für dieses Programm ist:

Kafka Verbraucher

Hier ist das vollständige Producer-Skript zur schnellen Referenz:

aus Kafka-Import KafkaProduzent
json importieren
pprint importieren
Produzent = KafkaProduzent(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.Deponien (v).codieren('utf-8'))
Produzent.send('linuxhint', 'topic': 'kafka')
# Messwerte = Produzent.Metriken()
# pprint.pprint(Metriken)

Und hier ist das vollständige Consumer-Programm, das wir verwendet haben:

von kafka import KafkaConsumer
von kafka importieren TopicPartition
print('Verbindung herstellen.')
Consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
print('Thema zuweisen.')
Verbraucher.Assign([TopicPartition('linuxhint', 2)])
print('Nachricht erhalten.')
für Nachricht im Consumer:
print("OFFSET: " + str(Nachricht[0])+ "\t MSG: " + str(Nachricht))

Fazit

In dieser Lektion haben wir uns angesehen, wie wir Apache Kafka in unseren Python-Programmen installieren und verwenden können. Wir haben gezeigt, wie einfach es ist, einfache Aufgaben im Zusammenhang mit Kafka in Python mit dem demonstrierten Kafka-Client für Python auszuführen.

SuperTuxKart für Linux
SuperTuxKart ist ein großartiger Titel, der entwickelt wurde, um Ihnen das Mario Kart-Erlebnis kostenlos auf Ihrem Linux-System zu bieten. Es ist ziem...
Battle for Wesnoth-Tutorial
The Battle for Wesnoth ist eines der beliebtesten Open-Source-Strategiespiele, die Sie derzeit spielen können. Dieses Spiel befindet sich nicht nur se...
0 A.D. Lernprogramm
Von den vielen Strategiespielen da draußen, 0 A.D. schafft es, sich trotz Open Source als umfassender Titel und sehr tiefgehendes, taktisches Spiel ab...