CHƯƠNG III: CƠ CHẾ ĐỒNG BỘ DỮ LIỆU TỰ ĐỘNG

Xử lý đồng bộ dữ liệu lớn

TRANG 1: Sơ đồ luồng hoạt động tổng quát (Sequence Diagram)

Hệ thống đồng bộ của LGSP không chạy theo lịch cứng mà chạy theo cơ chế "Quét và Đánh giá" (Scan & Evaluate) dựa trên cấu hình linh hoạt trong Database.

image.png

Sau khi Task kích hoạt, hệ thống sẽ đi qua một chuỗi các Sequence có nhiệm vụ chuyên biệt để đảm bảo tính an toàn và linh hoạt.

1.1. Bước 1: Quản lý Luồng (ManagerEvaluateTasksSequence)

Đây là sequence "cửa ngõ". Nhiệm vụ chính:

1.2. Bước 2: Đánh giá Cron (EvaluateSingleTaskSequence)

Với mỗi Task được tách ra, bộ Evaluator sẽ:

1.3. Bước 3: Bộ định tuyến động (RunTaskSequence)

Sequence này đóng vai trò là "Cầu giao điện" tổng. Nó thực hiện:

sequence_lgsp.jpg

 

1.4. Cơ chế nạp dữ liệu động (Dynamic Batch Insert)

Trong dự án LGSP, thay vì viết hàng trăm câu lệnh INSERT cho hàng trăm loại danh mục khác nhau, chúng ta sử dụng cơ chế Dynamic SQL thông qua 2 Stored Procedure phối hợp. Cơ chế này cho phép hệ thống tự động nhận diện cấu trúc tệp JSON trả về từ đối tác và nạp vào bảng tương ứng trong Database.

1.4.1. Sơ đồ luồng xử lý tại Database

Biểu đồ không có tiêu đề (2).jpg

1.4.2. Phân tích Procedure bộ điều phối: sp_batch_insert_data

Nhiệm vụ chính: Phân rã mảng dữ liệu (Array Parsing).

1.4.3. Phân tích Procedure thực thi động: sp_insert_generic_data

Đây là chìa khóa của sự linh hoạt. Nó thực hiện Tự động nhận diện cấu trúc bảng (Table Discovery).

1.4.4. Các bảng cấu hình phụ thuộc

Để 2 Procedure này hoạt động, bạn cần cấu hình bảng mapping

CREATE TABLE endpoint_mappings (
    id INT AUTO_INCREMENT PRIMARY KEY,
    endpoint_path VARCHAR(200) NOT NULL, -- Đường dẫn API (Vd: /api/v1/don-vi)
    table_name VARCHAR(100) NOT NULL,    -- Tên bảng trong Database
    status TINYINT DEFAULT 1             -- 1: Kích hoạt
);

TRANG 2: Bộ máy điều phối Task (Orchestrator)

2.1. MultiEndpointSyncTask (Điểm khởi đầu)

Nằm tại: src/main/wso2mi/artifacts/tasks/MultiEndpointSyncTask.xml.

2.2. ManagerEvaluateTasksSequence (Bộ quản lý)

Đây là "bộ não" điều phối toàn bộ quá trình:

  1. Kiểm tra Khóa (sync_lock): Hệ thống gọi syncLockDataService/getLock. Nếu một luồng đồng bộ khác đang chạy, luồng mới sẽ tự động hủy để tránh tình trạng "Race Condition" (nhiều tiến trình cùng ghi vào một bảng dữ liệu).
  2. Lấy cấu hình Task: Truy vấn lên tới 200 Task đang ở trạng thái kích hoạt (status='1') từ bảng job_schedule.
  3. Lặp (Iterate): Sử dụng thẻ <iterate> của WSO2 để chia nhỏ danh sách Task cho các luồng xử lý song song, tối ưu hóa hiệu suất CPU.

TRANG 3: Phân tích Logic Đánh giá Cron Expression (JS Engine)

3.1. Tại sao cần JavaScript?

WSO2 MI mặc định không có hàm so sánh thời gian với biểu thức Cron phức tạp (như 0 0/15 * * * ?). Do đó, dự án sử dụng JavaScript (Nashorn Engine) để gọi trực tiếp các thư viện Java của Quarkz.

3.2. Chi tiết mã nguồn Evaluator

Tại EvaluateSingleTaskSequence.xml, hệ thống thực hiện các bước:

  1. Lấy cron_expression và last_run (chuỗi string) từ Database.
  2. Chuyển đổi last_run sang đối tượng Date của Java.
  3. Sử dụng class org.quartz.CronExpression để tính toán thời điểm chạy tiếp theo (nextRun).
  4. So sánh: Nếu nextRun nhỏ hơn hoặc bằng thời gian hiện tại (now), hệ thống xác định Task này đã đến hạn chạy (shouldRun = true).

TRANG 4: Thực thi và Cập nhật trạng thái (Runner)

4.1. RunTaskSequence (Bộ thực thi)

Ngay khi nhận được tín hiệu shouldRun = true, Runner sẽ thực hiện:

4.2. Hoàn tất và Giải phóng

Sau khi nghiệp vụ đồng bộ hoàn thành (thành công hoặc thất bại):