本文介紹了以下內容:
1.什么是Kafka?
2.為什么我們需要使用Kafka這樣的消息系統及使用它的好處
3.如何將Kafka使用到我們的后端設計中。
譯自timber.io:《hello-world-in-kafka-using-python》,有部分刪改。
1.Kafka是什么、為什么我們需要它?
簡而言之,Kafka是一個分布式消息系統。這是什么意思呢?
想象一下,你現在有一個簡單的Web應用,其包含了網頁前端客戶端(Client)、服務端和數據庫:
你需要記錄所有發生在你的Web應用的事件,比如點擊、請求、搜索等,以便后續進行計算和運營分析。
假設每個事件都由單獨的APP完成,那么一個簡單的解決方案就是將數據存儲在數據庫中,所有APP連接到數據庫進行存儲:
這看起來簡單,但是其中還會出現許多問題:
1.點擊、請求、搜索等事件會產生大量的數據到數據庫中,這可能會導致插入事件存在延遲。
2.如果選擇將高頻數據存儲在SQL或MongoDB等數據庫中,很難再原有歷史數據的基礎上擴展數據庫。
3.如果你需要用這些數據進行數據分析,你可能無法直接對數據庫進行高頻率的讀取操作。
4.每個APP可以遵循自己的數據格式,這就意味著當你需要在不同的APP進行數據交換時,你需要進行數據格式的轉換。
通過使用像Kafka這樣的消息流系統,可以很好地解決這些問題,因為他們可以執行以下操作:
1.存儲的大量數據可以被持久化、校驗和復制,具備容錯能力。
2.支持跨系統實時處理連續的數據流。
3.允許APP獨立發布數據或數據流,并與使用它的APP無關。
那么它和傳統數據庫有何不同?
盡管Kafka可以持久化地存儲數據,但它不是數據庫。
Kafka不僅允許APP存儲或提取連續的數據流,還支持實時處理。這與對被動數據執行CRUD操作或對傳統數據庫執行查詢的方式不同。
聽起來不錯,那么Kafka是如何解決以上挑戰的?
Kafka是一個分布式平臺,是為規模而構建的,這意味著它可以處理高頻率的讀寫和存儲大量數據。它確保數據始終可靠。它還支持從故障中恢復的強大機制。
以下是為什么應該使用Kafka的一些關鍵因素:
1.1 簡化后端架構
在Kafka的幫助下,我們前面的結構會變得簡單一些:
1.2 通用數據管道
如上所示,Kafka充當多個APP和服務的通用數據管道,這給了我們兩個好處:
1.數據是集成的,我們將來自不同系統的數據都存在一個地方,這使得Kafka成為真正的數據源。任何APP都可以將數據推送到該平臺,然后由另一個APP提取數據。
2.Kafka使得應用程序之間交換數據變得容易。因為我們可以標準化數據格式,減少了數據格式的轉換。
1.3 通用連接性
盡管Kafka允許你使用標準數據格式,但并不意味著你的APP就不需要數據轉換了,它只是減少了我們轉換數據的頻率罷了。
此外,Kafka提供了一個叫 Kafka Connect 的框架允許我們維護遺留的老系統。
1.4 實時數據處理
類似于監控系統這樣的實時APP,往往需要連續的數據流,這些數據需要被立即處理或盡量減少延遲處理。
Kafka的流式處理,使得處理引擎可以在很短的時間內(幾毫米到幾分鐘)內取數、分析、以及響應。
2.Kafka入門
2.1 安裝
安裝Kafka是一個相當簡單的過程。只需遵循以下給定步驟:
2.使用以下命令解壓縮下載文件: tar -xzf kafka_2.11-1.1.0.tgz
3.cd到Kafka目錄開始使用它: cd kafka_2.11-1.1.0
2.2 啟動服務器
ZooKeeper是一個針對Kafka等分布式環境的集中管理工具,它為大型分布式系統提供配置服務、同步服務及命名注冊表。
因此,我們需要先啟動ZooKeeper服務器,然后再啟動Kafka服務器。使用以下命令即可:
# Start ZooKeeper Server
bin/zookeeper-server-start.sh config/zookeeper.properties
# Start Kafka Server
bin/kafka-server-start.sh config/server.properties
2.3 Kafka 基本概念
我們快速介紹一下Kafka體系結構的核心概念:
1.Kafka在一個或多個服務器上作為集群運行。
2.Kafka將數據流存儲在名為topics的類別中。每條數據均由鍵、值、時間戳組成。
3.Kafka使用發布-訂閱模式。它允許某些APP充當producers(生產者),記錄數據并將數據發布到Kafka topic中。
同樣,它允許某些APP充當consumer(消費者)和訂閱Kafka topic并處理由它產生的數據。
4.除了Prodcuer API 和 Consumer API,Kafka還為應用提供了一個 Streams API 作為流處理器。通過 Connector API 我們可以將Kafka連接到其他現有的應用程序和數據系統。
2.4 架構
如你所見,每個Kafka的 Topic 可以分為多個Partition(分區),可以使用broker(經紀人)在不同的計算機上復制這些 Topic,從而使消費者可以并行讀取 Topic.
kafka的復制是針對分區的:
比如上圖中有4個broker, 1個topic, 2個分區,復制因子是3。當producer發送一個消息的時候,它會選擇一個分區,比如topic1-part1
分區,將消息發送給這個分區的leader, broker2、broker3會拉取這個消息,一旦消息被拉取過來,slave會發送ack給master,這時候master才commit這個log。
因此,整個系統的容錯級別極高。當系統正常運行時,對Topic的所有讀取和寫入都將通過leader,且leader會保證所有其他broker均被更新。
如果Broker失效了,系統會自動重新配置,此時副本也可以接管成為Leader.
2.5 創建Kafka Topic
讓我們創建一個名為 sample,含有一個partition(分區)和一個replica(副本)的Kafka Topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sample
列出所有的Kafka Topics,檢查是否成功創建了sample Topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
describe topics 命令還可以獲得特定Topic的詳細信息:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic sample
2.6 創建生產者與消費者
這里是本章的代碼實戰部分,利用Kafka-Python實現簡單的生產者和消費者。
1.首先需要安裝kafka-python:
pip install kafka-python
2.創建消費者(consumer.py)
from kafka import KafkaConsumer
consumer = KafkaConsumer('sample')
for message in consumer:
print (message)
3.創建生產者(producer.py)
有一個消費者正在訂閱我們的消息流,因此我們要創建一個生產者,發布消息到Kafka:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('sample', b'Hello, World!')
producer.send('sample', key=b'message-two', value=b'This is Kafka-Python')
現在,你重新運行消費者(consumer.py),你就會接收到生產者發送過來的消息。
-
存儲
+關注
關注
13文章
4329瀏覽量
85946 -
數據庫
+關注
關注
7文章
3826瀏覽量
64509 -
服務端
+關注
關注
0文章
66瀏覽量
7024 -
Web應用
+關注
關注
0文章
16瀏覽量
3509 -
kafka
+關注
關注
0文章
51瀏覽量
5226
發布評論請先 登錄
相關推薦
評論