Nhảy đến nội dung chính

Luồng Xử Lý Dịch Vụ API Tích Hợp Kafka (Kafka API Processing Flow)

1.1. Mục tiêu chương

Tiêu

Tài liệu này mô tả luồng xử lý chuẩn của mộtdịch APIvụ backendLoadbalance_kafka trên nền tảng WSO2 Micro Integrator (WSO2 MI), từ khi nhậnClient gửi request đến khi deploydữ productionliệu nhằm:được xử lý và phản hồi về. Mục tiêu:

  • Thống nhất cách xâyhiểu dựngkiến APItrúc giữa các team

    thành viên
  • Đảm bảo tuânđúng thủluồng đầydữ đủliệu cácqua quytừng địnhlớp kiến trúc

    Kafka
  • Dễ

    Dễdàng trace lỗi và audit

    mở
  • rộng
  • sau

    Liên kết các chuẩn đã quy định ở các chương trước

    này

1.2. Phạm viVi áp dụng

Áp dụng cho:

Dụng
  • Dự

    Tấtán: cảLoadbalance_kafka REST(WSO2 APIMI trong modules/**

    v4.4.0)
  • Ngôn

    Corengữ: TeamSynapse XML Partner(WSO2 Team

    Mediation)
  • Các

    Tấtcomponent: cảAPI, serviceInbound SpringEndpoint, Boot

    Sequence, Local Entry
  • Kafka Connector: mi-inbound-kafka v2.0.6, mi-connector-kafka v3.3.10

1.3. Tổng quanQuan luồng xử lý API

Luồng chuẩn:

Xử

Client Request ↓ Security / Auth ↓ Controller ↓ DTO Validate ↓ Service Logic ↓ Repository / DB ↓ Response Mapping ↓ Logging & Audit ↓ Return Response

1.4. Các bước xử lý chi tiết

Bước 1. Client gọi API

Ví dụ:

POST /api/v1/users

Payload:

{ "username": "test", "password": "123456" }

Quy định liên quan:

👉 Chương 7 — API Design & Response Standard


Bước 2. Security / Authentication

Hệ thống kiểm tra:

  • JWT / SSO token

  • Permission

  • Role

Nếu fail:

401 Unauthorized 403 Forbidden

Quy định liên quan:

👉 Chương 8 — Cơ chế xác thực & SSO (WSO2)


Bước 3. Controller nhận request


@PostMapping public ApiResponse<UserResponse> create( @Valid @RequestBody UserCreateRequest request) { return ApiResponse.ok(userService.create(request)); }

Quy định:

  • Name Controller

  • Không chứa business logic

👉 Chương 3 — Coding Convention
👉 Chương 7 — API Design


Bước 4. DTO Validation

DTO:

public class UserCreateRequest { @NotBlank @Size(max = 100) private String username; @NotBlank private String password; }

Spring tự validate trước khi vào service.

Nếu lỗi:

400 Bad Request

Response chuẩn:


{ "code": "VALIDATION_ERROR", "message": "username is required" }

Quy định:

👉 Chương 3 — Coding Convention (Validation)
👉 Chương 7 — Response Standard


Bước 5. Service xử lý logic


public UserResponse create(UserCreateRequest req) { log.info("Create user {}", req.getUsername()); UserEntity e = mapper.toEntity(req); repo.save(e); return mapper.toResponse(e); }

Cấu trúc chuẩn 1 hàm service:

validate business ↓ main logic ↓ repository ↓ mappingreturn

Quy định:

👉 Chương 3 — Coding Convention
👉 Chương 4 — Entity & Database


Bước 6. Repiository & Database


userRepository.save(entity);

Nếu thay đổi schema:

  • update entity

  • migration script

Quy định:

👉 Chương 4 — Thay đổi Entity & DB
👉 Chương 5 — Schema & Migration


Bước 7. Mapping Response


return UserResponse.builder() .id(e.getId()) .username(e.getUsername()) .build();

Response chuẩn:


{ "code": "SUCCESS", "data": { "id": 1, "username": "test" } }

Quy định:

👉 Chương 7 — Response Standard


Bước 8. Logging & Audit

Trong service:


log.info("User created id={}", e.getId());

Nếu nghiệp vụ quan trọng:

  • ghi audit_log

Quy định:

👉 Chương 3 — Logging
👉 Chương 6 — Audit Log


Bước 9. Exception Handling

Ví dụ:


if (exists) { throw new BusinessException("USER_EXISTS"); }

Global handler:


@ExceptionHandler(BusinessException.class)

Response:


{ "code": "USER_EXISTS", "message": "User already exists" }

Quy định:

👉 Chương 7 — Response & Error


1.5. Luồng phát triển & release API

Sau khi code xong:


Dev → Commit → PR → Review → Merge → CI Build → Deploy

Bước 10. Commit & PR

Quy định:

👉 Chương 9 — Git Workflow & Pull Request


Bước 11. Checklist trước merge

Quy định:

👉 Chương 10 — Checklist merge


Bước 12. Deploy & Update DB

Nếu có migration:

  • chạy script

  • deploy service

Quy định:

👉 Chương 5 — Migration
👉 Chương 12 — Connect DB


1.6. Sơ đồ tổng thể API lifecycle


Request ↓ Auth (Ch8) ↓ Controller (Ch3,7) ↓ DTO Validate (Ch3) ↓ Service Logic (Ch3) ↓ Entity/DB (Ch4,5) ↓ Mapping (Ch7) ↓ Logging/Audit (Ch3,6) ↓ Response (Ch7) ↓ Git/PR (Ch9,10) ↓ Deploy (Ch5,12)

1.7. Transaction trong xử lý API

1.7.1. Nguyên tắc transaction

Mọi thao tác thay đổi dữ liệu nghiệp vụ trong API phải được thực hiện trong transaction nhằm đảm bảo:

  • Tính toàn vẹn dữ liệu

  • Tính nhất quán hệ thống

  • Khả năng rollback khi lỗi

  • Đồng bộ dữ liệu và audit log

Transaction phải được đặt tại Service layer.


1.7.2. Quy định transaction

Transaction:

  • đặt tại Service

  • bao phủ toàn bộ nghiệp vụ

  • bao gồm repository và audit

  • rollback khi exception

Không được đặt transaction tại:

  • Controller

  • Repository

  • Mapper


1.7.3. Ví dụ transaction chuẩn


Client (HTTP POST)
 ↓
KafkaProducerApi (/kafka-producer)
 ↓
kafkaTransport.init (KafkaConnection)
 ↓
Kafka Topic: test_topic_01
 ↓
Inbound Endpoint (KafkaMessageConsumer)
 ↓
inboundSequence (Load_balance_example-inboundSequence)
 ↓
Backend API (POST http://192.168.0.133:8080/api/v1/plannings)
 ↓
payloadFactory (Đóng gói lại kết quả)
 ↓
Kafka Topic: processed_topic
@Transactional public UserResponse create(UserCreateRequest req) { if (repo.existsByUsername(req.getUsername())) { throw new BusinessException("USER_EXISTS"); } UserEntity e = mapper.toEntity(req); repo.save(e); auditService.logCreate("USER", e.getId()); return mapper.toResponse(e); }

📸 [Ảnh minh họa] — Chụp màn hình sơ đồ luồng WSO2 MI Studio hoặc Kafka Manager


1.7.4. QuyCác tắcBước rollback

Xử Lý Chi Tiết

Bước 1. Client Gửi Request

TransactionClient phảigọi rollbackAPI khi:qua HTTP:

  • http

    BusinessException

  • RuntimeException

  • Database error

  • Validation fail trong service

Không được:

  • ghi audit trước khi save dữ liệu

  • commit từng phần nghiệp vụ

  • xử lý lỗi nhưng vẫn commit


1.8. Security Context trong Service

1.8.1. Khái niệm

Security Context là thông tin người dùng hiện tại sau khi xác thực (JWT / SSO).

Thông tin thường có:

  • userId

  • username

  • role

  • permission

  • tenant

  • organization


1.8.2. Sử dụng trong Service

Service được phép truy cập Security Context để:

  • xác định user thao tác

  • kiểm tra permission

  • ghi audit

  • filter dữ liệu theo user

Ví dụ:

POST http://<wso2mi-host>:8290/kafka-producer
Content-Type: application/json
Authorization: Bearer <jwt_token>

{
 "field1": "value1",
 "field2": "value2"
}
Long userId = SecurityUtils.getCurrentUserId(); String username = SecurityUtils.getCurrentUsername();

1.8.3.

Quy địnhđịnh:

sử
    dụng

  • Phương thức bắt buộc: POST
  • Content-Type: application/json
  • Payload là JSON tuỳ ý — không bị validate tại lớp này
  • Không📸 được:[Ảnh minh họa] — Ảnh chụp Postman/Curl gửi request thành công


    Bước 2. KafkaProducerApi Nhận Request

    File: 


    KafkaProducerApi.xml

     

    API nhận request và bắt đầu luồng xử lý inSequence.

    • xml

      tin userId từ request

    • truyền userId từ Controller nếu đã có context

    • bypass permission check

    Permission check phải tại Service layer.


    1.8.4. Ví dụ kiểm tra quyền


    <api context="/kafka-producer" name="KafkaProducerApi">
     <resource methods="POST">
     <inSequence>
    <!-- Log request vào -->
     <log level="custom">
     <property name="API_NAME" value="KafkaProducerApi"/>
     <property name="STATUS" value="Processing incoming message..."/>
     <property name="PAYLOAD" expression="json-eval($)"/>
     </log>
     ...
     </inSequence>
     <faultSequence>
    <!-- Trả lỗi nếu Kafka fail -->
     </faultSequence>
     </resource>
    </api>
    public void updateUser(Long id, UserUpdateRequest req) { Long currentUser = SecurityUtils.getCurrentUserId(); if (!permissionService.canUpdateUser(currentUser, id)) { throw new AccessDeniedException("NO_PERMISSION"); } ... }

    1.9.

    Log Xửghi nhận:

    • API_NAME: Tên API danhxác sáchđịnh nguồn log
    • STATUS: Trạng thái hiện tại của luồng
    • PAYLOAD: Toàn bộ JSON body của request

    📸 [Ảnh minh họa] — Ảnh log WSO2 MI Console hoặc file log khi request vào


    Bước 3. Khởi Tạo Kết Nối Kafka (List API)kafkaTransport.init)

    1.9.1.

    File: Nguyên

    tắc List API


    APIKafkaProducerApi.xml trả(trong danhinSequence)

    sách

     phải hỗ trợ:

    • xml

      pagination

    • sorting

    • filter

    Mục tiêu:

    • tránh trả dữ liệu lớn

    • đảm bảo hiệu năng DB

    • hỗ trợ UI paging


    1.9.2. Chuẩn request List API

    Ví dụ:


    <kafkaTransport.init>
     <name>KafkaConnection</name>
     <bootstrapServers>hdp-master:9092</bootstrapServers>
     <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
     <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
     <acks>all</acks>
     <requestTimeout>10000</requestTimeout>
     <maxBlock>5000</maxBlock>
    </kafkaTransport.init>
    GET /api/v1/users?page=0&size=20&sort=createdAt,desc

    ThamGiải sốthích chuẩn:từng trường:

    • TrườngGiá trịMô tả
      nameKafkaConnectionTên kết nối dùng để Sequence tham chiếu (configKey).
      bootstrapServershdp-master:9092Địa chỉ Kafka Broker.
      keySerializerClassStringSerializerClass serialize Key của message (kiểu chuỗi).
      valueSerializerClassStringSerializerClass serialize Value của message (kiểu chuỗi).
      acksallBroker phải xác nhận ghi đủ trên mọi replica mới coi là thành công.
      requestTimeout10000Timeout cho mỗi request gửi lên Kafka (ms).
      maxBlock5000Thời gian tối đa chờ nếu buffer đầy hoặc Kafka chưa sẵn sàng (ms).

    Bước 4. Publish Message vào Kafka

    pageFile: (bắt đầu từ 0)


  • sizeKafkaProducerApi.xml

  •  

  • sort

  • filter


  • xml

    1.9.3.
    Chuẩn response List API


    <kafkaTransport.publishMessages>
     <topic>test_topic_01</topic>
    </kafkaTransport.publishMessages>
    { "code": "SUCCESS", "data": { "content": [], "page": 0, "size": 20, "totalElements": 100, "totalPages": 5 } }

    1.9.4.

    Giải Quy định bắt buộc

    thích:

    • topic: 

      Khôngtest_topic_01 trả— Topic nhận dữ liệu đầu vào.

    • Nội dung message là toàn bộ bảng

      payload nhận từ Client.
    • Pagination bắt buộc với list lớn

    • Repository phải dùng paging query

    • Không load toànkey, bộpartition relation

      cụ thể → Kafka tự điều phối vào các partition.

    1.10.Bước Gọi5. hệTrả thốngResponse ngoàiCho trongClient API(Producer)

    1.10.1. Phạm vi

    API có thể cần gọi:

    • SSO

    • External API

    • Message Queue

    • Cache

    • File Storage


    xml

    1.10.2.
    Kiến trúc gọi ngoài

    Luồng chuẩn:

    Service → Integration Client → External System

    Ví dụ:


    <payloadFactory media-type="json">
     <format>{"success": "true", "message": "Hồ sơ đã được gửi thành công"}</format>
    </payloadFactory>
    <respond/>
    UserInfo info = ssoClient.getUser(token);

    1.10.3.

    Sau Quykhi định

    publish xong, API trả ngay về Client mà không chờ xử lý phía Consumer.

    KhôngResponse đượcthành gọi external tại:công:

    • json

      Controller

    • Repository

    Phải:

    • gọi trong Service

    • handle lỗi external

    • timeout / retry nếu cần


    1.10.4. Xử lý lỗi external

    Nếu external fail:

    • map thành BusinessException hoặc SystemException

    • không expose lỗi raw

    • ghi log lỗi


    1.11. Monitoring & Metrics trong API

    1.11.1. Mục tiêu

    Ngoài Logging & Audit, API production cần hỗ trợ:

    • theo dõi hiệu năng

    • theo dõi lỗi

    • theo dõi tải hệ thống


    1.11.2. Metrics cần có

    • request count

    • error rate

    • response time

    • DB query time

    • external call time


    1.11.3. Trace request

    Mỗi request phải có:

    • traceId

    • duration

    • status

    TraceId phải liên kết:

    • log

    • audit

    • exception


    1.12. Versioning API

    1.12.1. Chuẩn version

    API phải có version trong URL:


    {
    "success": "true",
    "message": "Hồ sơ đã được gửi thành công"
    }
    /api/v1/...

    1.12.2.

    Response Nguyênthất tắcbại version

    (faultSequence):

    • Không thay đổi breaking trong cùng version

    • API cũ phải giữ backward compatibility

    • API mới → tăng version

    • Không reuse version


    json

    1.12.3.
    Ví dụ


    {
    "status": "Failed",
    "error": "<error_message>"
    }
    /api/v1/users
    /api/v2/users

    Bước 6. Inbound Endpoint Lắng Nghe Kafka

    File: 


    1.13. Testing trước merge

    1.13.1. Mục tiêu

    Load_balance_example.xml

    Đảm bảo

    API

    Đây hoạtlà thành phần chạy ngầm liên tục, tự động đúngpoll message từ Kafka.

    xml
    <inboundEndpoint
    name="Load_balance_example"
    class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer"
    sequence="Load_balance_example-inboundSequence"
    onError="Load_balance_example-inboundErrorSequence">

    Toàn bộ tham số cấu hình:

    Tham sốGiá trịMô tả
    interval100Khoảng cách giữa 2 lần poll (ms).
    sequentialfalsefalse = đa luồng, tận dụng tối đa CPU.
    coordinationtrueTránh consume trùng khi chạy cluster nhiều node.
    suspendfalsefalse = khởi động ngay, không dừng.
    bootstrap.servershdp-master:9092Địa chỉ Kafka Broker kết nối đến.
    topic.nametest_topic_01Topic mà Inbound Endpoint theo dõi.
    group.idgroup1Tên Consumer Group. Nhiều node cùng group sẽ chia partition.
    contentTypeapplication/jsonĐịnh dạng nội dung message nhận về.
    poll.timeout5000Thời gian chờ khi không có dữ liệu (ms).
    key.deserializerStringDeserializerClass giải mã Key của message.
    value.deserializerStringDeserializerClass giải mã Value của message.
    avro.use.logical.type.convertersfalseKhông dùng Avro logical type.
    enable.auto.committrueTự commit offset sau khi đọc xong.
    auto.commit.interval.ms5000Chu kỳ tự commit offset (ms).
    auto.offset.resetlatestBắt đầu đọc từ message mới nhất nếu không có offset cũ.
    exclude.internal.topicstrueBỏ qua các topic nội bộ của Kafka.
    check.crcstrueKiểm tra tính toàn vẹn dữ liệu (CRC).
    partition.assignment.strategyRangeAssignorChiến lược phân chia partition cho consumer.
    max.poll.interval.ms300000Thời gian tối đa xử lý một đợt poll (5 phút).
    max.poll.records500Số message tối đa lấy về mỗi đợt poll.
    fetch.max.wait.ms500Thời gian chờ tối đa fetch từ broker nếu chưa đủ data (ms).
    receive.buffer.bytes65536Kích thước buffer nhận dữ liệu TCP (64KB).
    send.buffer.bytes131072Kích thước buffer gửi dữ liệu TCP (128KB).
    request.timeout.ms305000Timeout cho mỗi request gửi đến Broker (ms).
    reconnect.backoff.ms50Thời gian chờ trước khi mergekết nối deploy.lại sau lỗi (ms).
    retry.backoff.ms100Thời gian chờ trước khi thử lại request thất bại (ms).
    connections.max.idle.ms540000Đóng kết nối nếu idle quá thời gian này (9 phút).
    security.protocolPLAINTEXTGiao thức bảo mật (tắt mã hóa).
    metrics.num.samples2Số mẫu dùng để tính metrics.
    metrics.recording.levelINFOMức độ ghi metrics.
    metrics.sample.window.ms30000Cửa sổ thời gian lấy mẫu metrics (30 giây).

    Bước 7. Sequence Xử Lý Message Nhận Được

    File: 



    Load_balance_example-inboundSequence.xml

     

    7.1.13.2. LoạiLấy testPayload bắttừ buộcKafka

    • xml
    • <property name="PAYLOAD" expression="json-eval($)" />
      <log level="custom">
       <property name="STATUS" value="[KafkaConsumer] Message du lieu nhan duoc"/>
       <property name="TOPIC" value="test_topic_01"/>
       <property name="PAYLOAD" expression="get-property('PAYLOAD')"/>
      </log>

    UnitCác testproperty Service

    được
  • Validation test

  • Error case

  • Integration test API


  • 1.13.3. Quy định

    API mới hoặc thay đổi logic phải có test:tạo:

    • thành

      công

      business

      fail

    • permission

      fail

      trongloop

      Property Giá
    • trị

      validation/ fail

      Expression
    • tả
    • PAYLOAD json-eval($) Nội dung

      1.14. Hiệu năng & Database

      1.14.1. Nguyên tắc thiết kế DB trong API

      API phải đảm bảo:

      • query tối ưu

      • index phù hợp

      • không N+1 query

      • pagination cho list


      1.14.2. Các lỗi cần tránh

      • loadJSON toàn bộ relation

        của
      • message
      • từ

        queryKafka.

    • select * bảng lớn

    • thiếu index filter


    7.2. Gọi Backend API

    xml
    <header name="Authorization" scope="transport"
    value="Bearer eyJhbGciOiJIUzI1NiJ9..."/>
    <property name="Content-Type" value="application/json" scope="transport"/>

    <call>
     <endpoint>
     <address uri="http://192.168.0.133:8080/api/v1/plannings">
     <suspendOnFailure>
     <initialDuration>1000</initialDuration>
     <progressionFactor>1.14.3.0</progressionFactor>
    Kiểm
    tra
     <maximumDuration>60000</maximumDuration>
     </suspendOnFailure>
     </address>
     </endpoint>
    </call>

    Thông tin endpoint:

    TrườngGiá trịMô tả
    urihttp://192.168.0.133:8080/api/v1/planningsURL Backend API nhận dữ liệu.
    AuthorizationBearer <jwt_token>Token xác thực gửi kèm header.
    Content-Typeapplication/jsonĐịnh dạng body gửi đến Backend.
    initialDuration1000Chờ 1 giây trước khi reviewretry khi lỗi (ms).
    progressionFactor1.0Hệ số tăng thời gian retry (1.0 = không tăng).
    maximumDuration60000Thời gian chờ tối đa khi retry (60 giây).

    7.3. Lưu Response từ Backend

    xml
    <property name="API_RESPONSE" expression="json-eval($)" scope="default"/>
    <property name="HTTP_STATUS" expression="$axis2:HTTP_SC" scope="default"/>

    Các property lưu kết quả:

    PropertyExpressionMô tả
    API_RESPONSEjson-eval($)Toàn bộ JSON response từ Backend API.
    HTTP_STATUS$axis2:HTTP_SCMã HTTP status code (200, 500...) của Backend.
    RESPONSE_SIZEfn:string-length(...)Độ dài chuỗi JSON của response (dùng để log).

    7.4. Đóng Gói Lại Dữ Liệu (payloadFactory)

    xml
    <payloadFactory media-type="json" template-type="default">
     <format>{
     "eventType": "transaction",
     "source": "bank-transactions",
     "data": ${payload}
     }</format>
    </payloadFactory>

    Cấu trúc JSON đầu ra:

    TrườngGiá trịKiểuMô tả
    eventTypetransactionString (cố định)Phân loại sự kiện.
    sourcebank-transactionsString (cố định)Nguồn gốc dữ liệu.
    data${payload}Object (JSON động)Toàn bộ message gốc nhận từ Kafka.

    7.5. Publish Kết Quả vào Topic Phản Hồi

    xml
    <kafkaTransport.publishMessages configKey="KafkaConnection">
     <topic>processed_topic</topic>
     <partitionNo>0</partitionNo>
     <forwardExistingHeaders>None</forwardExistingHeaders>
     <customHeaders>[]</customHeaders>
    </kafkaTransport.publishMessages>

    Các trường publish:

    TrườngGiá trịMô tả
    configKeyKafkaConnectionTham chiếu đến Local Entry kết nối Kafka.
    topicprocessed_topicTopic nhận kết quả đã xử lý.
    partitionNo0Partition cố định (0 = partition đầu tiên).
    forwardExistingHeadersNoneKhông chuyển tiếp header gốc.
    customHeaders[]Không thêm header tuỳ chỉnh.

    Bước 8. Local Entry - Cấu Hình Kết Nối Kafka Tái Sử Dụng

    File: 


    KafkaConnection.xml

     

    xml
    <localEntry key="KafkaConnection">
     <kafkaTransport.init>
     <connectionType>KAFKA</connectionType>
     <bootstrapServers>hdp-master:9092</bootstrapServers>
     <keySerializerClass>...StringSerializer</keySerializerClass>
     <valueSerializerClass>...StringSerializer</valueSerializerClass>
     <poolingEnabled>false</poolingEnabled>
     <name>KafkaConnection</name>
     </kafkaTransport.init>
    </localEntry>

    Giải thích:

    TrườngGiá trịMô tả
    keyKafkaConnectionTên Local Entry dùng để gọi qua configKey.
    connectionTypeKAFKALoại kết nối.
    bootstrapServershdp-master:9092Kafka Broker.
    poolingEnabledfalseKhông dùng connection pool.

    Bước 9. Xử Lý Lỗi (Error Sequence)

    File: 


    Load_balance_example-inboundErrorSequence.xml

     

    xml
    <sequence name="Load_balance_example-inboundErrorSequence">
     <log category="INFO" logMessageID="false" logFullPayload="false">
     <message>Lỗi khi lắng nghe lấy dữ liệu từ kafka</message>
     </log>
    </sequence>

    Khi reviewInbound APIEndpoint cầnbị kiểmlỗi tra:(không parse được message, mất kết nối...), WSO2 MI sẽ kích hoạt sequence lỗi này.

    ⚠️ Ghi chú: Hiện tại sequence lỗi chỉ log thông báo. Cần bổ sung logic retry hoặc dead-letter queue cho production.


    1.5. Luồng Phát Triển & Deploy


    Dev Code XML → Build CAR → Deploy lên WSO2 MI

    Bước 10. Build Project

    bash
    mvn clean install

    Output: File Loadbalance_kafka_1.0.0.car trong thư mục target/.

    Bước 11. Deploy lên WSO2 MI Server

    Tự động qua Maven:

    • bash
    • mvn deploy

    queryHoặc copy indexfile không.car vào thư mục <WSO2MI_HOME>/repository/deployment/server/carbonapps/.

    Cấu

  • hình server deploy (trong 


    list có paging khôngpom.xml):

  •  

  • TrườngGiá trịMô tả
    serverUrlhttp://192.168.0.167:9201địa chỉ WSO2 MI Management API.
    userNameadminvpTài khoản admin MI.
    serverTypemiLoại server WSO2.
    operationdeployHành động thực hiện.

    join📸 [Ảnh cầnminh thiếthọa] không— Ảnh chụp Deploy thành công trong WSO2 MI Dashboard


  • 1.6. N+1 không

    Đồ

  • Tổng Thể API Lifecycle

    Client POST Request
     ↓
    KafkaProducerApi (context: /kafka-producer)
     ↓
    kafkaTransport.init (KafkaConnection Local Entry)
     ↓
    Publish → Kafka: test_topic_01
     ↓ ↓ (response lại Client ngay)
     ↓ {"success": "true", ...}
     ↓
    Inbound Endpoint (poll interval: 100ms)
     ↓
    inboundSequence
     ↓
    Call Backend: POST /api/v1/plannings
     ↓
    payloadFactory: {eventType, source, data}
     ↓
    Publish → Kafka: processed_topic
     ↓ (error path)
    inboundErrorSequence → Log lỗi

    1.7. Phụ Lục: Cấu Hình Build (


    pom.xml)

    Tham sốGiá trịMô tả
    artifactIdLoadbalance_kafkaTên artifact Maven.
    groupIdcom.microintegrator.projectsGroup package.
    version1.0.0Phiên bản hiện tại.
    project.runtime.version4.4.0Phiên bản WSO2 MI.
    dockerfile.base.imagewso2/wso2mi:4.4.0Docker base image.
    dockerfile.nameloadbalance_kafka:1.0.0Tên Docker image output.
    mi-inbound-kafka2.0.6Phiên bản connector nhận message từ Kafka.
    mi-connector-kafka3.3.10Phiên bản connector gửi message lên Kafka.
    mi-connector-http0.1.14Phiên bản connector gọi HTTP.