Luồng Xử Lý Dịch Vụ API Tích Hợp Kafka (Kafka API Processing Flow)
1.1. Mục Tiêu
Tài liệu này mô tả luồng xử lý chuẩn của dịch vụ Loadbalance_kafka trên nền tảng WSO2 Micro Integrator (WSO2 MI), từ khi Client gửi request đến khi dữ liệu được xử lý và phản hồi về. Mục tiêu:
- Thống nhất cách hiểu kiến trúc giữa các thành viên
- Đảm bảo đúng luồng dữ liệu qua từng lớp Kafka
- Dễ dàng trace lỗi và mở rộng sau này
1.2. Phạm Vi Áp Dụng
- Dự án:
Loadbalance_kafka(WSO2 MI v4.4.0) - Ngôn ngữ: Synapse XML (WSO2 Mediation)
- Các component: API, Inbound Endpoint, Sequence, Local Entry
- Kafka Connector:
mi-inbound-kafkav2.0.6,mi-connector-kafkav3.3.10
1.3. Tổng Quan Luồng Xử Lý
📸 [Ảnh minh họa] — Chụp màn hình sơ đồ luồng WSO2 MI Studio hoặc Kafka Manager
1.4. Các Bước Xử Lý Chi Tiết
Bước 1. Client Gửi Request
Client gọi API qua HTTP:
Quy định:
- Phương thức bắt buộc:
POST - Content-Type:
application/json - Payload là JSON tuỳ ý — không bị validate tại lớp này
📸 [Ảnh minh họa] — Ảnh chụp Postman/Curl gửi request thành công
Bước 2. KafkaProducerApi Nhận Request
File:
KafkaProducerApi.xml
API nhận request và bắt đầu luồng xử lý inSequence.
Log ghi nhận:
API_NAME: Tên API xác định nguồn logSTATUS: Trạng thái hiện tại của luồngPAYLOAD: Toàn bộ JSON body của request
📸 [Ảnh minh họa] — Ảnh log WSO2 MI Console hoặc file log khi request vào
Bước 3. Khởi Tạo Kết Nối Kafka (kafkaTransport.init)
File:
KafkaProducerApi.xml (trong inSequence)
Giải thích từng trường:
| Trường | Giá trị | Mô tả |
|---|---|---|
name |
KafkaConnection |
Tên kết nối dùng để Sequence tham chiếu (configKey). |
bootstrapServers |
hdp-master:9092 |
Địa chỉ Kafka Broker. |
keySerializerClass |
StringSerializer |
Class serialize Key của message (kiểu chuỗi). |
valueSerializerClass |
StringSerializer |
Class serialize Value của message (kiểu chuỗi). |
acks |
all |
Broker phải xác nhận ghi đủ trên mọi replica mới coi là thành công. |
requestTimeout |
10000 |
Timeout cho mỗi request gửi lên Kafka (ms). |
maxBlock |
5000 |
Thời gian tối đa chờ nếu buffer đầy hoặc Kafka chưa sẵn sàng (ms). |
Bước 4. Publish Message vào Kafka
File:
KafkaProducerApi.xml
Giải thích:
topic:test_topic_01— Topic nhận dữ liệu đầu vào.- Nội dung message là toàn bộ
payloadnhận từ Client. - Không có key, partition cụ thể → Kafka tự điều phối vào các partition.
Bước 5. Trả Response Cho Client (Producer)
Sau khi publish xong, API trả ngay về Client mà không chờ xử lý phía Consumer.
Response thành công:
Response thất bại (faultSequence):
Bước 6. Inbound Endpoint Lắng Nghe Kafka
File:
Load_balance_example.xml
Đây là thành phần chạy ngầm liên tục, tự động poll message từ Kafka.
Toàn bộ tham số cấu hình:
| Tham số | Giá trị | Mô tả |
|---|---|---|
interval |
100 |
Khoảng cách giữa 2 lần poll (ms). |
sequential |
false |
false = đa luồng, tận dụng tối đa CPU. |
coordination |
true |
Tránh consume trùng khi chạy cluster nhiều node. |
suspend |
false |
false = khởi động ngay, không dừng. |
bootstrap.servers |
hdp-master:9092 |
Địa chỉ Kafka Broker kết nối đến. |
topic.name |
test_topic_01 |
Topic mà Inbound Endpoint theo dõi. |
group.id |
group1 |
Tên Consumer Group. Nhiều node cùng group sẽ chia partition. |
contentType |
application/json |
Định dạng nội dung message nhận về. |
poll.timeout |
5000 |
Thời gian chờ khi không có dữ liệu (ms). |
key.deserializer |
StringDeserializer |
Class giải mã Key của message. |
value.deserializer |
StringDeserializer |
Class giải mã Value của message. |
avro.use.logical.type.converters |
false |
Không dùng Avro logical type. |
enable.auto.commit |
true |
Tự commit offset sau khi đọc xong. |
auto.commit.interval.ms |
5000 |
Chu kỳ tự commit offset (ms). |
auto.offset.reset |
latest |
Bắt đầu đọc từ message mới nhất nếu không có offset cũ. |
exclude.internal.topics |
true |
Bỏ qua các topic nội bộ của Kafka. |
check.crcs |
true |
Kiểm tra tính toàn vẹn dữ liệu (CRC). |
partition.assignment.strategy |
RangeAssignor |
Chiến lược phân chia partition cho consumer. |
max.poll.interval.ms |
300000 |
Thời gian tối đa xử lý một đợt poll (5 phút). |
max.poll.records |
500 |
Số message tối đa lấy về mỗi đợt poll. |
fetch.max.wait.ms |
500 |
Thời gian chờ tối đa fetch từ broker nếu chưa đủ data (ms). |
receive.buffer.bytes |
65536 |
Kích thước buffer nhận dữ liệu TCP (64KB). |
send.buffer.bytes |
131072 |
Kích thước buffer gửi dữ liệu TCP (128KB). |
request.timeout.ms |
305000 |
Timeout cho mỗi request gửi đến Broker (ms). |
reconnect.backoff.ms |
50 |
Thời gian chờ trước khi kết nối lại sau lỗi (ms). |
retry.backoff.ms |
100 |
Thời gian chờ trước khi thử lại request thất bại (ms). |
connections.max.idle.ms |
540000 |
Đóng kết nối nếu idle quá thời gian này (9 phút). |
security.protocol |
PLAINTEXT |
Giao thức bảo mật (tắt mã hóa). |
metrics.num.samples |
2 |
Số mẫu dùng để tính metrics. |
metrics.recording.level |
INFO |
Mức độ ghi metrics. |
metrics.sample.window.ms |
30000 |
Cửa sổ thời gian lấy mẫu metrics (30 giây). |
Bước 7. Sequence Xử Lý Message Nhận Được
File:
Load_balance_example-inboundSequence.xml
7.1. Lấy Payload từ Kafka
Các property được tạo:
| Property | Giá trị / Expression | Mô tả |
|---|---|---|
PAYLOAD |
json-eval($) |
Nội dung JSON toàn bộ của message từ Kafka. |
7.2. Gọi Backend API
Thông tin endpoint:
| Trường | Giá trị | Mô tả |
|---|---|---|
uri |
http://192.168.0.133:8080/api/v1/plannings |
URL Backend API nhận dữ liệu. |
Authorization |
Bearer <jwt_token> |
Token xác thực gửi kèm header. |
Content-Type |
application/json |
Định dạng body gửi đến Backend. |
initialDuration |
1000 |
Chờ 1 giây trước khi retry khi lỗi (ms). |
progressionFactor |
1.0 |
Hệ số tăng thời gian retry (1.0 = không tăng). |
maximumDuration |
60000 |
Thời gian chờ tối đa khi retry (60 giây). |
7.3. Lưu Response từ Backend
Các property lưu kết quả:
| Property | Expression | Mô tả |
|---|---|---|
API_RESPONSE |
json-eval($) |
Toàn bộ JSON response từ Backend API. |
HTTP_STATUS |
$axis2:HTTP_SC |
Mã HTTP status code (200, 500...) của Backend. |
RESPONSE_SIZE |
fn:string-length(...) |
Độ dài chuỗi JSON của response (dùng để log). |
7.4. Đóng Gói Lại Dữ Liệu (payloadFactory)
Cấu trúc JSON đầu ra:
| Trường | Giá trị | Kiểu | Mô tả |
|---|---|---|---|
eventType |
transaction |
String (cố định) |
Phân loại sự kiện. |
source |
bank-transactions |
String (cố định) |
Nguồn gốc dữ liệu. |
data |
${payload} |
Object (JSON động) |
Toàn bộ message gốc nhận từ Kafka. |
7.5. Publish Kết Quả vào Topic Phản Hồi
Các trường publish:
| Trường | Giá trị | Mô tả |
|---|---|---|
configKey |
KafkaConnection |
Tham chiếu đến Local Entry kết nối Kafka. |
topic |
processed_topic |
Topic nhận kết quả đã xử lý. |
partitionNo |
0 |
Partition cố định (0 = partition đầu tiên). |
forwardExistingHeaders |
None |
Không chuyển tiếp header gốc. |
customHeaders |
[] |
Không thêm header tuỳ chỉnh. |
Bước 8. Local Entry - Cấu Hình Kết Nối Kafka Tái Sử Dụng
File:
KafkaConnection.xml
Giải thích:
| Trường | Giá trị | Mô tả |
|---|---|---|
key |
KafkaConnection |
Tên Local Entry dùng để gọi qua configKey. |
connectionType |
KAFKA |
Loại kết nối. |
bootstrapServers |
hdp-master:9092 |
Kafka Broker. |
poolingEnabled |
false |
Không dùng connection pool. |
Bước 9. Xử Lý Lỗi (Error Sequence)
File:
Load_balance_example-inboundErrorSequence.xml
Khi Inbound Endpoint bị lỗi (không parse được message, mất kết nối...), WSO2 MI sẽ kích hoạt sequence lỗi này.
⚠️ Ghi chú: Hiện tại sequence lỗi chỉ log thông báo. Cần bổ sung logic retry hoặc dead-letter queue cho production.
1.5. Luồng Phát Triển & Deploy
Bước 10. Build Project
Output: File Loadbalance_kafka_1.0.0.car trong thư mục target/.
Bước 11. Deploy lên WSO2 MI Server
Tự động qua Maven:
Hoặc copy file .car vào thư mục <WSO2MI_HOME>/repository/deployment/server/carbonapps/.
Cấu hình server deploy (trong
pom.xml):
| Trường | Giá trị | Mô tả |
|---|---|---|
serverUrl |
http://192.168.0.167:9201 |
địa chỉ WSO2 MI Management API. |
userName |
adminvp |
Tài khoản admin MI. |
serverType |
mi |
Loại server WSO2. |
operation |
deploy |
Hành động thực hiện. |
📸 [Ảnh minh họa] — Ảnh chụp Deploy thành công trong WSO2 MI Dashboard
1.6. Sơ Đồ Tổng Thể API Lifecycle
1.7. Phụ Lục: Cấu Hình Build (
pom.xml)
| Tham số | Giá trị | Mô tả |
|---|---|---|
artifactId |
Loadbalance_kafka |
Tên artifact Maven. |
groupId |
com.microintegrator.projects |
Group package. |
version |
1.0.0 |
Phiên bản hiện tại. |
project.runtime.version |
4.4.0 |
Phiên bản WSO2 MI. |
dockerfile.base.image |
wso2/wso2mi:4.4.0 |
Docker base image. |
dockerfile.name |
loadbalance_kafka:1.0.0 |
Tên Docker image output. |
mi-inbound-kafka |
2.0.6 |
Phiên bản connector nhận message từ Kafka. |
mi-connector-kafka |
3.3.10 |
Phiên bản connector gửi message lên Kafka. |
mi-connector-http |
0.1.14 |
Phiên bản connector gọi HTTP. |
Không có bình luận nào để hiển thị
Không có bình luận nào để hiển thị