Nhảy đến nội dung chính

Luồng Xử Lý Dịch Vụ API Tích Hợp Kafka (Kafka API Processing Flow)

1.1. Mục Tiêu

Tài liệu này mô tả luồng xử lý chuẩn của dịch vụ Loadbalance_kafka trên nền tảng WSO2 Micro Integrator (WSO2 MI), từ khi Client gửi request đến khi dữ liệu được xử lý và phản hồi về. Mục tiêu:

  • Thống nhất cách hiểu kiến trúc giữa các thành viên
  • Đảm bảo đúng luồng dữ liệu qua từng lớp Kafka
  • Dễ dàng trace lỗi và mở rộng sau này

1.2. Phạm Vi Áp Dụng

  • Dự án: Loadbalance_kafka (WSO2 MI v4.4.0)
  • Ngôn ngữ: Synapse XML (WSO2 Mediation)
  • Các component: API, Inbound Endpoint, Sequence, Local Entry
  • Kafka Connector: mi-inbound-kafka v2.0.6, mi-connector-kafka v3.3.10

1.3. Tổng Quan Luồng Xử Lý


Client (HTTP POST)
KafkaProducerApi (/kafka-producer)
kafkaTransport.init (KafkaConnection)
Kafka Topic: test_topic_01
Inbound Endpoint (KafkaMessageConsumer)
inboundSequence (Load_balance_example-inboundSequence)
Backend API (POST http://192.168.0.133:8080/api/v1/plannings)
payloadFactory (Đóng gói lại kết quả)
Kafka Topic: processed_topic

📸 [Ảnh minh họa] — Chụp màn hình sơ đồ luồng WSO2 MI Studio hoặc Kafka Manager


1.4. Các Bước Xử Lý Chi Tiết


Bước 1. Client Gửi Request

Client gọi API qua HTTP:

http
POST http://<wso2mi-host>:8290/kafka-producer
Content-Type: application/json
Authorization: Bearer <jwt_token>

{
"field1": "value1",
"field2": "value2"
}

Quy định:

  • Phương thức bắt buộc: POST
  • Content-Type: application/json
  • Payload là JSON tuỳ ý — không bị validate tại lớp này

📸 [Ảnh minh họa] — Ảnh chụp Postman/Curl gửi request thành công


Bước 2. KafkaProducerApi Nhận Request

File: 


KafkaProducerApi.xml

 

API nhận request và bắt đầu luồng xử lý inSequence.

xml
<api context="/kafka-producer" name="KafkaProducerApi">
<resource methods="POST">
<inSequence>
<!-- Log request vào -->
<log level="custom">
<property name="API_NAME" value="KafkaProducerApi"/>
<property name="STATUS" value="Processing incoming message..."/>
<property name="PAYLOAD" expression="json-eval($)"/>
</log>
...
</inSequence>
<faultSequence>
<!-- Trả lỗi nếu Kafka fail -->
</faultSequence>
</resource>
</api>

Log ghi nhận:

  • API_NAME: Tên API xác định nguồn log
  • STATUS: Trạng thái hiện tại của luồng
  • PAYLOAD: Toàn bộ JSON body của request

📸 [Ảnh minh họa] — Ảnh log WSO2 MI Console hoặc file log khi request vào


Bước 3. Khởi Tạo Kết Nối Kafka (kafkaTransport.init)

File: 


KafkaProducerApi.xml (trong inSequence)

 

xml
<kafkaTransport.init>
<name>KafkaConnection</name>
<bootstrapServers>hdp-master:9092</bootstrapServers>
<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
<acks>all</acks>
<requestTimeout>10000</requestTimeout>
<maxBlock>5000</maxBlock>
</kafkaTransport.init>

Giải thích từng trường:

Trường Giá trị Mô tả
name KafkaConnection Tên kết nối dùng để Sequence tham chiếu (configKey).
bootstrapServers hdp-master:9092 Địa chỉ Kafka Broker.
keySerializerClass StringSerializer Class serialize Key của message (kiểu chuỗi).
valueSerializerClass StringSerializer Class serialize Value của message (kiểu chuỗi).
acks all Broker phải xác nhận ghi đủ trên mọi replica mới coi là thành công.
requestTimeout 10000 Timeout cho mỗi request gửi lên Kafka (ms).
maxBlock 5000 Thời gian tối đa chờ nếu buffer đầy hoặc Kafka chưa sẵn sàng (ms).

Bước 4. Publish Message vào Kafka

File: 


KafkaProducerApi.xml

 

xml
<kafkaTransport.publishMessages>
<topic>test_topic_01</topic>
</kafkaTransport.publishMessages>

Giải thích:

  • topictest_topic_01 — Topic nhận dữ liệu đầu vào.
  • Nội dung message là toàn bộ payload nhận từ Client.
  • Không có key, partition cụ thể → Kafka tự điều phối vào các partition.

Bước 5. Trả Response Cho Client (Producer)

xml
<payloadFactory media-type="json">
<format>{"success": "true", "message": "Hồ sơ đã được gửi thành công"}</format>
</payloadFactory>
<respond/>

Sau khi publish xong, API trả ngay về Client mà không chờ xử lý phía Consumer.

Response thành công:

json
{
"success": "true",
"message": "Hồ sơ đã được gửi thành công"
}

Response thất bại (faultSequence):

json
{
"status": "Failed",
"error": "<error_message>"
}

Bước 6. Inbound Endpoint Lắng Nghe Kafka

File: 


Load_balance_example.xml

 

Đây là thành phần chạy ngầm liên tục, tự động poll message từ Kafka.

xml
<inboundEndpoint
name="Load_balance_example"
class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer"
sequence="Load_balance_example-inboundSequence"
onError="Load_balance_example-inboundErrorSequence">

Toàn bộ tham số cấu hình:

Tham số Giá trị Mô tả
interval 100 Khoảng cách giữa 2 lần poll (ms).
sequential false false = đa luồng, tận dụng tối đa CPU.
coordination true Tránh consume trùng khi chạy cluster nhiều node.
suspend false false = khởi động ngay, không dừng.
bootstrap.servers hdp-master:9092 Địa chỉ Kafka Broker kết nối đến.
topic.name test_topic_01 Topic mà Inbound Endpoint theo dõi.
group.id group1 Tên Consumer Group. Nhiều node cùng group sẽ chia partition.
contentType application/json Định dạng nội dung message nhận về.
poll.timeout 5000 Thời gian chờ khi không có dữ liệu (ms).
key.deserializer StringDeserializer Class giải mã Key của message.
value.deserializer StringDeserializer Class giải mã Value của message.
avro.use.logical.type.converters false Không dùng Avro logical type.
enable.auto.commit true Tự commit offset sau khi đọc xong.
auto.commit.interval.ms 5000 Chu kỳ tự commit offset (ms).
auto.offset.reset latest Bắt đầu đọc từ message mới nhất nếu không có offset cũ.
exclude.internal.topics true Bỏ qua các topic nội bộ của Kafka.
check.crcs true Kiểm tra tính toàn vẹn dữ liệu (CRC).
partition.assignment.strategy RangeAssignor Chiến lược phân chia partition cho consumer.
max.poll.interval.ms 300000 Thời gian tối đa xử lý một đợt poll (5 phút).
max.poll.records 500 Số message tối đa lấy về mỗi đợt poll.
fetch.max.wait.ms 500 Thời gian chờ tối đa fetch từ broker nếu chưa đủ data (ms).
receive.buffer.bytes 65536 Kích thước buffer nhận dữ liệu TCP (64KB).
send.buffer.bytes 131072 Kích thước buffer gửi dữ liệu TCP (128KB).
request.timeout.ms 305000 Timeout cho mỗi request gửi đến Broker (ms).
reconnect.backoff.ms 50 Thời gian chờ trước khi kết nối lại sau lỗi (ms).
retry.backoff.ms 100 Thời gian chờ trước khi thử lại request thất bại (ms).
connections.max.idle.ms 540000 Đóng kết nối nếu idle quá thời gian này (9 phút).
security.protocol PLAINTEXT Giao thức bảo mật (tắt mã hóa).
metrics.num.samples 2 Số mẫu dùng để tính metrics.
metrics.recording.level INFO Mức độ ghi metrics.
metrics.sample.window.ms 30000 Cửa sổ thời gian lấy mẫu metrics (30 giây).

Bước 7. Sequence Xử Lý Message Nhận Được

File: 


Load_balance_example-inboundSequence.xml

 

7.1. Lấy Payload từ Kafka

xml
<property name="PAYLOAD" expression="json-eval($)" />
<log level="custom">
<property name="STATUS" value="[KafkaConsumer] Message du lieu nhan duoc"/>
<property name="TOPIC" value="test_topic_01"/>
<property name="PAYLOAD" expression="get-property('PAYLOAD')"/>
</log>

Các property được tạo:

Property Giá trị / Expression Mô tả
PAYLOAD json-eval($) Nội dung JSON toàn bộ của message từ Kafka.

7.2. Gọi Backend API

xml
<header name="Authorization" scope="transport"
value="Bearer eyJhbGciOiJIUzI1NiJ9..."/>
<property name="Content-Type" value="application/json" scope="transport"/>

<call>
<endpoint>
<address uri="http://192.168.0.133:8080/api/v1/plannings">
<suspendOnFailure>
<initialDuration>1000</initialDuration>
<progressionFactor>1.0</progressionFactor>
<maximumDuration>60000</maximumDuration>
</suspendOnFailure>
</address>
</endpoint>
</call>

Thông tin endpoint:

Trường Giá trị Mô tả
uri http://192.168.0.133:8080/api/v1/plannings URL Backend API nhận dữ liệu.
Authorization Bearer <jwt_token> Token xác thực gửi kèm header.
Content-Type application/json Định dạng body gửi đến Backend.
initialDuration 1000 Chờ 1 giây trước khi retry khi lỗi (ms).
progressionFactor 1.0 Hệ số tăng thời gian retry (1.0 = không tăng).
maximumDuration 60000 Thời gian chờ tối đa khi retry (60 giây).

7.3. Lưu Response từ Backend

xml
<property name="API_RESPONSE" expression="json-eval($)" scope="default"/>
<property name="HTTP_STATUS" expression="$axis2:HTTP_SC" scope="default"/>

Các property lưu kết quả:

Property Expression Mô tả
API_RESPONSE json-eval($) Toàn bộ JSON response từ Backend API.
HTTP_STATUS $axis2:HTTP_SC Mã HTTP status code (200, 500...) của Backend.
RESPONSE_SIZE fn:string-length(...) Độ dài chuỗi JSON của response (dùng để log).

7.4. Đóng Gói Lại Dữ Liệu (payloadFactory)

xml
<payloadFactory media-type="json" template-type="default">
<format>{
"eventType": "transaction",
"source": "bank-transactions",
"data": ${payload}
}</format>
</payloadFactory>

Cấu trúc JSON đầu ra:

Trường Giá trị Kiểu Mô tả
eventType transaction String (cố định) Phân loại sự kiện.
source bank-transactions String (cố định) Nguồn gốc dữ liệu.
data ${payload} Object (JSON động) Toàn bộ message gốc nhận từ Kafka.

7.5. Publish Kết Quả vào Topic Phản Hồi

xml
<kafkaTransport.publishMessages configKey="KafkaConnection">
<topic>processed_topic</topic>
<partitionNo>0</partitionNo>
<forwardExistingHeaders>None</forwardExistingHeaders>
<customHeaders>[]</customHeaders>
</kafkaTransport.publishMessages>

Các trường publish:

Trường Giá trị Mô tả
configKey KafkaConnection Tham chiếu đến Local Entry kết nối Kafka.
topic processed_topic Topic nhận kết quả đã xử lý.
partitionNo 0 Partition cố định (0 = partition đầu tiên).
forwardExistingHeaders None Không chuyển tiếp header gốc.
customHeaders [] Không thêm header tuỳ chỉnh.

Bước 8. Local Entry - Cấu Hình Kết Nối Kafka Tái Sử Dụng

File: 


KafkaConnection.xml

 

xml
<localEntry key="KafkaConnection">
<kafkaTransport.init>
<connectionType>KAFKA</connectionType>
<bootstrapServers>hdp-master:9092</bootstrapServers>
<keySerializerClass>...StringSerializer</keySerializerClass>
<valueSerializerClass>...StringSerializer</valueSerializerClass>
<poolingEnabled>false</poolingEnabled>
<name>KafkaConnection</name>
</kafkaTransport.init>
</localEntry>

Giải thích:

Trường Giá trị Mô tả
key KafkaConnection Tên Local Entry dùng để gọi qua configKey.
connectionType KAFKA Loại kết nối.
bootstrapServers hdp-master:9092 Kafka Broker.
poolingEnabled false Không dùng connection pool.

Bước 9. Xử Lý Lỗi (Error Sequence)

File: 


Load_balance_example-inboundErrorSequence.xml

 

xml
<sequence name="Load_balance_example-inboundErrorSequence">
<log category="INFO" logMessageID="false" logFullPayload="false">
<message>Lỗi khi lắng nghe lấy dữ liệu từ kafka</message>
</log>
</sequence>

Khi Inbound Endpoint bị lỗi (không parse được message, mất kết nối...), WSO2 MI sẽ kích hoạt sequence lỗi này.

⚠️ Ghi chú: Hiện tại sequence lỗi chỉ log thông báo. Cần bổ sung logic retry hoặc dead-letter queue cho production.


1.5. Luồng Phát Triển & Deploy


Dev Code XML → Build CAR → Deploy lên WSO2 MI

Bước 10. Build Project

bash
mvn clean install

Output: File Loadbalance_kafka_1.0.0.car trong thư mục target/.

Bước 11. Deploy lên WSO2 MI Server

Tự động qua Maven:

bash
mvn deploy

Hoặc copy file .car vào thư mục <WSO2MI_HOME>/repository/deployment/server/carbonapps/.

Cấu hình server deploy (trong 


pom.xml):

 

Trường Giá trị Mô tả
serverUrl http://192.168.0.167:9201 địa chỉ WSO2 MI Management API.
userName adminvp Tài khoản admin MI.
serverType mi Loại server WSO2.
operation deploy Hành động thực hiện.

📸 [Ảnh minh họa] — Ảnh chụp Deploy thành công trong WSO2 MI Dashboard


1.6. Sơ Đồ Tổng Thể API Lifecycle


Client POST Request
KafkaProducerApi (context: /kafka-producer)
kafkaTransport.init (KafkaConnection Local Entry)
Publish → Kafka: test_topic_01
↓ ↓ (response lại Client ngay)
↓ {"success": "true", ...}
Inbound Endpoint (poll interval: 100ms)
inboundSequence
Call Backend: POST /api/v1/plannings
payloadFactory: {eventType, source, data}
Publish → Kafka: processed_topic
↓ (error path)
inboundErrorSequence → Log lỗi

1.7. Phụ Lục: Cấu Hình Build (


pom.xml)

Tham số Giá trị Mô tả
artifactId Loadbalance_kafka Tên artifact Maven.
groupId com.microintegrator.projects Group package.
version 1.0.0 Phiên bản hiện tại.
project.runtime.version 4.4.0 Phiên bản WSO2 MI.
dockerfile.base.image wso2/wso2mi:4.4.0 Docker base image.
dockerfile.name loadbalance_kafka:1.0.0 Tên Docker image output.
mi-inbound-kafka 2.0.6 Phiên bản connector nhận message từ Kafka.
mi-connector-kafka 3.3.10 Phiên bản connector gửi message lên Kafka.
mi-connector-http 0.1.14 Phiên bản connector gọi HTTP.