Luồng Xử Lý Dịch Vụ API Tích Hợp Kafka (Kafka API Processing Flow)
1.1. Mục tiêu chương
Tiêu
MôTài liệu này mô tả luồng xử lý chuẩn của mộtdịch APIvụ backendLoadbalance_kafka trên nền tảng WSO2 Micro Integrator (WSO2 MI), từ khi nhậnClient gửi request đến khi deploydữ productionliệu nhằm:được xử lý và phản hồi về. Mục tiêu:
Thống nhất cách
thành viênxâyhiểudựngkiếnAPItrúc giữa cácteamĐảm bảo
tuânđúngthủluồngđầydữđủliệucácquaquytừngđịnhlớpkiến trúcKafka- Dễ
mởDễdàng trace lỗi vàauditrộng - sau
Liên kết các chuẩn đã quy định ở các chương trướcnày
1.2. Phạm viVi áp dụng
Áp dụng cho:
- Dự
Tấtán:cảLoadbalance_kafkaREST(WSO2APIMItrong modules/**v4.4.0) - Ngôn
Corengữ:TeamSynapsevàXMLPartner(WSO2TeamMediation) - Các
Sequence, Local EntryTấtcomponent:cảAPI,serviceInboundSpringEndpoint,Boot - Kafka Connector:
mi-inbound-kafkav2.0.6,mi-connector-kafkav3.3.10
1.3. Tổng quanQuan luồng xử lý API
Luồng chuẩn:
Client Request
↓
Security / Auth
↓
Controller
↓
DTO Validate
↓
Service Logic
↓
Repository / DB
↓
Response Mapping
↓
Logging & Audit
↓
Return Response
1.4. Các bước xử lý chi tiết
Bước 1. Client gọi API
Ví dụ:
POST /api/v1/users
Payload:
{
"username": "test",
"password": "123456"
}
Quy định liên quan:
👉 Chương 7 — API Design & Response Standard
Bước 2. Security / Authentication
Hệ thống kiểm tra:
JWT / SSO tokenPermissionRole
Nếu fail:
401 Unauthorized
403 Forbidden
Quy định liên quan:
👉 Chương 8 — Cơ chế xác thực & SSO (WSO2)
Bước 3. Controller nhận request
@PostMapping
public ApiResponse<UserResponse> create(
@Valid @RequestBody UserCreateRequest request) {
return ApiResponse.ok(userService.create(request));
}
Quy định:
Name ControllerKhông chứa business logic
👉 Chương 3 — Coding Convention👉 Chương 7 — API Design
Bước 4. DTO Validation
DTO:
public class UserCreateRequest {
@NotBlank
@Size(max = 100)
private String username;
@NotBlank
private String password;
}
Spring tự validate trước khi vào service.
Nếu lỗi:
400 Bad Request
Response chuẩn:
{
"code": "VALIDATION_ERROR",
"message": "username is required"
}
Quy định:
👉 Chương 3 — Coding Convention (Validation)👉 Chương 7 — Response Standard
Bước 5. Service xử lý logic
public UserResponse create(UserCreateRequest req) {
log.info("Create user {}", req.getUsername());
UserEntity e = mapper.toEntity(req);
repo.save(e);
return mapper.toResponse(e);
}
Cấu trúc chuẩn 1 hàm service:
validate business
↓
main logic
↓
repository
↓
mapping
↓
return
Quy định:
👉 Chương 3 — Coding Convention👉 Chương 4 — Entity & Database
Bước 6. Repiository & Database
userRepository.save(entity);
Nếu thay đổi schema:
update entitymigration script
Quy định:
👉 Chương 4 — Thay đổi Entity & DB👉 Chương 5 — Schema & Migration
Bước 7. Mapping Response
return UserResponse.builder()
.id(e.getId())
.username(e.getUsername())
.build();
Response chuẩn:
{
"code": "SUCCESS",
"data": {
"id": 1,
"username": "test"
}
}
Quy định:
👉 Chương 7 — Response Standard
Bước 8. Logging & Audit
Trong service:
log.info("User created id={}", e.getId());
Nếu nghiệp vụ quan trọng:
ghi audit_log
Quy định:
👉 Chương 3 — Logging👉 Chương 6 — Audit Log
Bước 9. Exception Handling
Ví dụ:
if (exists) {
throw new BusinessException("USER_EXISTS");
}
Global handler:
@ExceptionHandler(BusinessException.class)
Response:
{
"code": "USER_EXISTS",
"message": "User already exists"
}
Quy định:
👉 Chương 7 — Response & Error
1.5. Luồng phát triển & release API
Sau khi code xong:
Dev → Commit → PR → Review → Merge → CI Build → Deploy
Bước 10. Commit & PR
Quy định:
👉 Chương 9 — Git Workflow & Pull Request
Bước 11. Checklist trước merge
Quy định:
👉 Chương 10 — Checklist merge
Bước 12. Deploy & Update DB
Nếu có migration:
chạy scriptdeploy service
Quy định:
👉 Chương 5 — Migration👉 Chương 12 — Connect DB
1.6. Sơ đồ tổng thể API lifecycle
Request
↓
Auth (Ch8)
↓
Controller (Ch3,7)
↓
DTO Validate (Ch3)
↓
Service Logic (Ch3)
↓
Entity/DB (Ch4,5)
↓
Mapping (Ch7)
↓
Logging/Audit (Ch3,6)
↓
Response (Ch7)
↓
Git/PR (Ch9,10)
↓
Deploy (Ch5,12)1.7. Transaction trong xử lý API
1.7.1. Nguyên tắc transaction
Mọi thao tác thay đổi dữ liệu nghiệp vụ trong API phải được thực hiện trong transaction nhằm đảm bảo:
Tính toàn vẹn dữ liệuTính nhất quán hệ thốngKhả năng rollback khi lỗiĐồng bộ dữ liệu và audit log
Transaction phải được đặt tại Service layer.
1.7.2. Quy định transaction
Transaction:
đặt tại Servicebao phủ toàn bộ nghiệp vụbao gồm repository và auditrollback khi exception
Không được đặt transaction tại:
ControllerRepositoryMapper
1.7.3. Ví dụ transaction chuẩn
Client (HTTP POST)
@Transactional
public UserResponse create(UserCreateRequest req) {
if (repo.existsByUsername(req.getUsername())) {
throw new BusinessException("USER_EXISTS");
}
UserEntity e = mapper.toEntity(req);
repo.save(e);
auditService.logCreate("USER", e.getId());
return mapper.toResponse(e);
}
📸 [Ảnh minh họa] — Chụp màn hình sơ đồ luồng WSO2 MI Studio hoặc Kafka Manager
1.7.4. QuyCác tắcBước rollback
Xử Lý Chi Tiết
📸 [Ảnh minh họa] — Chụp màn hình sơ đồ luồng WSO2 MI Studio hoặc Kafka Manager
Bước 1. Client Gửi Request
TransactionClient phảigọi rollbackAPI khi:qua HTTP:
- http
BusinessException RuntimeExceptionDatabase errorValidation fail trong serviceghi audit trước khi save dữ liệucommit từng phần nghiệp vụxử lý lỗi nhưng vẫn commituserIdusernamerolepermissiontenantorganizationxác định user thao táckiểm tra permissionghi auditfilter dữ liệu theo user- Phương thức bắt buộc:
POST - Content-Type:
application/json - Payload là JSON tuỳ ý — không bị validate tại lớp này
- xml
tin userId từ request truyền userId từ Controller nếu đã có contextbypass permission checkAPI_NAME: Tên APIdanhxácsáchđịnh nguồn logSTATUS: Trạng thái hiện tại của luồngPAYLOAD: Toàn bộ JSON body của request- xml
pagination sortingfiltertránh trả dữ liệu lớnđảm bảo hiệu năng DBhỗ trợ UI paging-
Trường Giá trị Mô tả nameKafkaConnectionTên kết nối dùng để Sequence tham chiếu ( configKey).bootstrapServershdp-master:9092Địa chỉ Kafka Broker. keySerializerClassStringSerializerClass serialize Key của message (kiểu chuỗi). valueSerializerClassStringSerializerClass serialize Value của message (kiểu chuỗi). acksallBroker phải xác nhận ghi đủ trên mọi replica mới coi là thành công. requestTimeout10000Timeout cho mỗi request gửi lên Kafka (ms). maxBlock5000Thời gian tối đa chờ nếu buffer đầy hoặc Kafka chưa sẵn sàng (ms). sizeKafkaProducerApi.xmlsortfilterxml1.9.3.Chuẩn response List API
<kafkaTransport.publishMessages><topic>test_topic_01</topic></kafkaTransport.publishMessages>{"code":"SUCCESS","data":{"content":[],"page":0,"size":20,"totalElements":100,"totalPages":5}}1.9.4.Giải
Quy định bắt buộcthích:topic:Khôngtest_topic_01trả— Topic nhận dữ liệu đầu vào.- Nội dung message là toàn bộ
bảngpayloadnhận từ Client. Pagination bắt buộc với list lớnRepository phải dùng paging queryKhông
cụ thể → Kafka tự điều phối vào các partition.loadcótoànkey,bộpartitionrelation
1.10.BướcGọi5.hệTrảthốngResponsengoàiChotrongClientAPI(Producer)1.10.1. Phạm viAPI có thể cần gọi:SSOExternal APIMessage QueueCacheFile Storage- json
Controller Repositorygọi trong Servicehandle lỗi externaltimeout / retry nếu cầnmap thành BusinessException hoặc SystemExceptionkhông expose lỗi rawghi log lỗitheo dõi hiệu năngtheo dõi lỗitheo dõi tải hệ thốngrequest counterror rateresponse timeDB query timeexternal call timetraceIddurationstatuslogauditexceptionKhông thay đổi breaking trong cùng versionAPI cũ phải giữ backward compatibilityAPI mới → tăng versionKhông reuse version- xml
Validation testError caseIntegration test API-
thànhcôngProperty Giá - trị
Expressionvalidation/failMô - tả
businessfail - trị
permissionPAYLOADfailjson-eval($)Nội dung 1.14. Hiệu năng & Database1.14.1. Nguyên tắc thiết kế DB trong APIAPI phải đảm bảo:query tối ưuindex phù hợpkhông N+1 querypagination cho list
1.14.2. Các lỗi cần tránh
củaloadJSON toàn bộrelationmessage - từ
queryKafka.
trongloopselect * bảng lớnthiếu index filter- bash
- hình server deploy (trong
-
Trường Giá trị Mô tả serverUrlhttp://192.168.0.167:9201địa chỉ WSO2 MI Management API. userNameadminvpTài khoản admin MI. serverTypemiLoại server WSO2. operationdeployHành động thực hiện. join📸có[Ảnhcầnminhthiếthọa]không— Ảnh chụp Deploy thành công trong WSO2 MI Dashboard
có1.6.
N+1SơkhôngĐồ Tổng
xml1.10.2.Kiến trúc gọi ngoàiLuồng chuẩn:Service → Integration Client → External SystemVí dụ:
<payloadFactory media-type="json"><format>{"success": "true", "message": "Hồ sơ đã được gửi thành công"}</format></payloadFactory><respond/>UserInfoinfo=ssoClient.getUser(token);1.10.3.Sau
Quykhiđịnhpublish xong, API trả ngay về Client mà không chờ xử lý phía Consumer.KhôngResponseđượcthànhgọi external tại:công:Phải:1.10.4. Xử lý lỗi externalNếu external fail:1.11. Monitoring & Metrics trong API1.11.1. Mục tiêuNgoài Logging & Audit, API production cần hỗ trợ:1.11.2. Metrics cần có1.11.3. Trace requestMỗi request phải có:TraceId phải liên kết:1.12. Versioning API1.12.1. Chuẩn versionAPI phải có version trong URL:
{"success": "true","message": "Hồ sơ đã được gửi thành công"}/api/v1/...1.12.2.Response
Nguyênthấttắcbạiversion(faultSequence):json1.12.3.Ví dụ
{"status": "Failed","error": "<error_message>"}/api/v1/users/api/v2/users
Bước 6. Inbound Endpoint Lắng Nghe Kafka
File:
1.13. Testing trước merge1.13.1. Mục tiêuLoad_balance_example.xml
ĐảmbảoAPIĐây
hoạtlà thành phần chạy ngầm liên tục, tự độngđúngpoll message từ Kafka.xml<inboundEndpointname="Load_balance_example"class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer"sequence="Load_balance_example-inboundSequence"onError="Load_balance_example-inboundErrorSequence">Toàn bộ tham số cấu hình:
Tham số Giá trị Mô tả interval100Khoảng cách giữa 2 lần poll (ms). sequentialfalsefalse= đa luồng, tận dụng tối đa CPU.coordinationtrueTránh consume trùng khi chạy cluster nhiều node. suspendfalsefalse= khởi động ngay, không dừng.bootstrap.servershdp-master:9092Địa chỉ Kafka Broker kết nối đến. topic.nametest_topic_01Topic mà Inbound Endpoint theo dõi. group.idgroup1Tên Consumer Group. Nhiều node cùng group sẽ chia partition. contentTypeapplication/jsonĐịnh dạng nội dung message nhận về. poll.timeout5000Thời gian chờ khi không có dữ liệu (ms). key.deserializerStringDeserializerClass giải mã Key của message. value.deserializerStringDeserializerClass giải mã Value của message. avro.use.logical.type.convertersfalseKhông dùng Avro logical type. enable.auto.committrueTự commit offset sau khi đọc xong. auto.commit.interval.ms5000Chu kỳ tự commit offset (ms). auto.offset.resetlatestBắt đầu đọc từ message mới nhất nếu không có offset cũ. exclude.internal.topicstrueBỏ qua các topic nội bộ của Kafka. check.crcstrueKiểm tra tính toàn vẹn dữ liệu (CRC). partition.assignment.strategyRangeAssignorChiến lược phân chia partition cho consumer. max.poll.interval.ms300000Thời gian tối đa xử lý một đợt poll (5 phút). max.poll.records500Số message tối đa lấy về mỗi đợt poll. fetch.max.wait.ms500Thời gian chờ tối đa fetch từ broker nếu chưa đủ data (ms). receive.buffer.bytes65536Kích thước buffer nhận dữ liệu TCP (64KB). send.buffer.bytes131072Kích thước buffer gửi dữ liệu TCP (128KB). request.timeout.ms305000Timeout cho mỗi request gửi đến Broker (ms). reconnect.backoff.ms50Thời gian chờ trước khi mergekếtvànốideploy.lại sau lỗi (ms).retry.backoff.ms100Thời gian chờ trước khi thử lại request thất bại (ms). connections.max.idle.ms540000Đóng kết nối nếu idle quá thời gian này (9 phút). security.protocolPLAINTEXTGiao thức bảo mật (tắt mã hóa). metrics.num.samples2Số mẫu dùng để tính metrics. metrics.recording.levelINFOMức độ ghi metrics. metrics.sample.window.ms30000Cửa sổ thời gian lấy mẫu metrics (30 giây).
Bước 7. Sequence Xử Lý Message Nhận Được
File:
Load_balance_example-inboundSequence.xml
7.1.
13.2.LoạiLấytestPayloadbắttừbuộcKafka<property name="PAYLOAD" expression="json-eval($)" /><log level="custom"><property name="STATUS" value="[KafkaConsumer] Message du lieu nhan duoc"/><property name="TOPIC" value="test_topic_01"/><property name="PAYLOAD" expression="get-property('PAYLOAD')"/></log>
đượcUnitCáctestpropertyService1.13.3. Quy địnhAPI mới hoặc thay đổi logic phải có test:tạo:7.2. Gọi Backend API
xml<header name="Authorization" scope="transport"value="Bearer eyJhbGciOiJIUzI1NiJ9..."/><property name="Content-Type" value="application/json" scope="transport"/><call><endpoint><address uri="http://192.168.0.133:8080/api/v1/plannings"><suspendOnFailure><initialDuration>1000</initialDuration><progressionFactor>1.14.3.0</progressionFactor>Kiểmtra<maximumDuration>60000</maximumDuration></suspendOnFailure></address></endpoint></call>Thông tin endpoint:
Trường Giá trị Mô tả urihttp://192.168.0.133:8080/api/v1/planningsURL Backend API nhận dữ liệu. AuthorizationBearer <jwt_token>Token xác thực gửi kèm header. Content-Typeapplication/jsonĐịnh dạng body gửi đến Backend. initialDuration1000Chờ 1 giây trước khi reviewretry khi lỗi (ms).progressionFactor1.0Hệ số tăng thời gian retry (1.0 = không tăng). maximumDuration60000Thời gian chờ tối đa khi retry (60 giây). 7.3. Lưu Response từ Backend
xml<property name="API_RESPONSE" expression="json-eval($)" scope="default"/><property name="HTTP_STATUS" expression="$axis2:HTTP_SC" scope="default"/>Các property lưu kết quả:
Property Expression Mô tả API_RESPONSEjson-eval($)Toàn bộ JSON response từ Backend API. HTTP_STATUS$axis2:HTTP_SCMã HTTP status code (200, 500...) của Backend. RESPONSE_SIZEfn:string-length(...)Độ dài chuỗi JSON của response (dùng để log). 7.4. Đóng Gói Lại Dữ Liệu (payloadFactory)
xml<payloadFactory media-type="json" template-type="default"><format>{"eventType": "transaction","source": "bank-transactions","data": ${payload}}</format></payloadFactory>Cấu trúc JSON đầu ra:
Trường Giá trị Kiểu Mô tả eventTypetransactionString(cố định)Phân loại sự kiện. sourcebank-transactionsString(cố định)Nguồn gốc dữ liệu. data${payload}Object(JSON động)Toàn bộ message gốc nhận từ Kafka. 7.5. Publish Kết Quả vào Topic Phản Hồi
xml<kafkaTransport.publishMessages configKey="KafkaConnection"><topic>processed_topic</topic><partitionNo>0</partitionNo><forwardExistingHeaders>None</forwardExistingHeaders><customHeaders>[]</customHeaders></kafkaTransport.publishMessages>Các trường publish:
Trường Giá trị Mô tả configKeyKafkaConnectionTham chiếu đến Local Entry kết nối Kafka. topicprocessed_topicTopic nhận kết quả đã xử lý. partitionNo0Partition cố định (0 = partition đầu tiên). forwardExistingHeadersNoneKhông chuyển tiếp header gốc. customHeaders[]Không thêm header tuỳ chỉnh.
Bước 8. Local Entry - Cấu Hình Kết Nối Kafka Tái Sử Dụng
File:
KafkaConnection.xml
xml<localEntry key="KafkaConnection"><kafkaTransport.init><connectionType>KAFKA</connectionType><bootstrapServers>hdp-master:9092</bootstrapServers><keySerializerClass>...StringSerializer</keySerializerClass><valueSerializerClass>...StringSerializer</valueSerializerClass><poolingEnabled>false</poolingEnabled><name>KafkaConnection</name></kafkaTransport.init></localEntry>Giải thích:
Trường Giá trị Mô tả keyKafkaConnectionTên Local Entry dùng để gọi qua configKey.connectionTypeKAFKALoại kết nối. bootstrapServershdp-master:9092Kafka Broker. poolingEnabledfalseKhông dùng connection pool.
Bước 9. Xử Lý Lỗi (Error Sequence)
File:
Load_balance_example-inboundErrorSequence.xml
xml<sequence name="Load_balance_example-inboundErrorSequence"><log category="INFO" logMessageID="false" logFullPayload="false"><message>Lỗi khi lắng nghe lấy dữ liệu từ kafka</message></log></sequence>Khi
reviewInboundAPIEndpointcầnbịkiểmlỗitra:(không parse được message, mất kết nối...), WSO2 MI sẽ kích hoạt sequence lỗi này.⚠️ Ghi chú: Hiện tại sequence lỗi chỉ log thông báo. Cần bổ sung logic retry hoặc dead-letter queue cho production.
1.5. Luồng Phát Triển & Deploy
Dev Code XML → Build CAR → Deploy lên WSO2 MIBước 10. Build Project
bashmvn clean installOutput: File
Loadbalance_kafka_1.0.0.cartrong thư mụctarget/.Bước 11. Deploy lên WSO2 MI Server
Tự động qua Maven:
mvn deployqueryHoặccócopyindexfilekhông.carvào thư mục<WSO2MI_HOME>/repository/deployment/server/carbonapps/.Cấu
list có paging khôngpom.xml):Client POST Request↓KafkaProducerApi (context: /kafka-producer)↓kafkaTransport.init (KafkaConnection Local Entry)↓Publish → Kafka: test_topic_01↓ ↓ (response lại Client ngay)↓ {"success": "true", ...}↓Inbound Endpoint (poll interval: 100ms)↓inboundSequence↓Call Backend: POST /api/v1/plannings↓payloadFactory: {eventType, source, data}↓Publish → Kafka: processed_topic↓ (error path)inboundErrorSequence → Log lỗi
1.7. Phụ Lục: Cấu Hình Build (
pom.xml)
Tham số Giá trị Mô tả artifactIdLoadbalance_kafkaTên artifact Maven. groupIdcom.microintegrator.projectsGroup package. version1.0.0Phiên bản hiện tại. project.runtime.version4.4.0Phiên bản WSO2 MI. dockerfile.base.imagewso2/wso2mi:4.4.0Docker base image. dockerfile.nameloadbalance_kafka:1.0.0Tên Docker image output. mi-inbound-kafka2.0.6Phiên bản connector nhận message từ Kafka. mi-connector-kafka3.3.10Phiên bản connector gửi message lên Kafka. mi-connector-http0.1.14Phiên bản connector gọi HTTP.
Không được:
1.8. Security Context trong Service
1.8.1. Khái niệm
Security Context là thông tin người dùng hiện tại sau khi xác thực (JWT / SSO).
Thông tin thường có:
1.8.2. Sử dụng trong Service
Service được phép truy cập Security Context để:
Ví dụ:
Long userId = SecurityUtils.getCurrentUserId();
String username = SecurityUtils.getCurrentUsername();
1.8.3.
Quy địnhđịnh:
Không📸được:[Ảnh minh họa] — Ảnh chụp Postman/Curl gửi request thành công
Bước 2. KafkaProducerApi Nhận Request
File:
KafkaProducerApi.xml
API nhận request và bắt đầu luồng xử lý inSequence.
Permission check phải tại Service layer.
1.8.4. Ví dụ kiểm tra quyền
<api context="/kafka-producer" name="KafkaProducerApi">
public void updateUser(Long id, UserUpdateRequest req) {
Long currentUser = SecurityUtils.getCurrentUserId();
if (!permissionService.canUpdateUser(currentUser, id)) {
throw new AccessDeniedException("NO_PERMISSION");
}
...
}
1.9.
Log Xửghi lýnhận:
📸 [Ảnh minh họa] — Ảnh log WSO2 MI Console hoặc file log khi request vào
Bước 3. Khởi Tạo Kết Nối Kafka (List API)kafkaTransport.init)
1.9.1.
File: Nguyên
APIKafkaProducerApi.xml trả(trong danhinSequence)
phải hỗ trợ:
Mục tiêu:
1.9.2. Chuẩn request List API
Ví dụ:
<kafkaTransport.init>
GET /api/v1/users?page=0&size=20&sort=createdAt,desc
ThamGiải sốthích chuẩn:từng trường:
Bước 4. Publish Message vào Kafka
pageFile: (bắt đầu từ 0)