Cải thiện xử lý dữ liệu với Spark 3.0 & Delta Lake

Nút nguồn: 1013539

Thu thập, xử lý và thực hiện phân tích trên truyền dữ liệu, trong các ngành như công nghệ quảng cáo liên quan đến kỹ thuật dữ liệu cường độ cao. Dữ liệu được tạo ra hàng ngày là rất lớn (100 GB dữ liệu) và cần thời gian xử lý đáng kể để xử lý dữ liệu cho các bước tiếp theo.

Một thách thức khác là việc kết hợp các tập dữ liệu để có được thông tin chi tiết. Mỗi quá trình trung bình có hơn 10 bộ dữ liệu và số lượng tham gia bằng nhau với nhiều khóa. Kích thước phân vùng cho mỗi khóa là không thể đoán trước trong mỗi lần chạy.

Và, cuối cùng, nếu số lượng dữ liệu vượt quá trong một số trường hợp nhất định, bộ nhớ có thể hết. Điều này có nghĩa là quá trình sẽ chết ở giữa các lần ghi cuối cùng, khiến người tiêu dùng đọc rõ ràng các khung dữ liệu đầu vào.

Trong blog này, chúng tôi sẽ giới thiệu tổng quan về hồ châu thổ, lợi thế của nó và cách những thách thức trên có thể được khắc phục bằng cách chuyển sang Delta Lake và chuyển sang Spark 3.0 từ Spark 2.4. 

Hồ Delta là gì?

Được phát triển tại Databricks, “Delta Lake là một lớp lưu trữ dữ liệu nguồn mở chạy trên Data Lake hiện có và hoàn toàn hợp tác với Apache Spark API. Cùng với khả năng thực hiện các giao dịch ACID và xử lý siêu dữ liệu có thể mở rộng, Delta Lakes cũng có thể thống nhất việc phân luồng và xử lý dữ liệu hàng loạt ”. 

Delta Lake sử dụng các tệp Parquet đã được tạo phiên bản để lưu trữ dữ liệu trên đám mây. Khi vị trí đám mây được định cấu hình, Delta Lake theo dõi tất cả các thay đổi được thực hiện đối với bảng hoặc thư mục cửa hàng blob để cung cấp các giao dịch ACID. 

Ưu điểm của việc sử dụng Delta Lakes 

Delta lake cho phép hàng nghìn dữ liệu chạy song song, giải quyết các thách thức về tối ưu hóa và phân vùng, hoạt động siêu dữ liệu nhanh hơn, duy trì nhật ký giao dịch và liên tục cập nhật dữ liệu. Dưới đây chúng tôi thảo luận về một số lợi thế chính: 

Nhật ký giao dịch của Delta Lake

Nhật ký giao dịch của Delta Lake là một tệp chỉ dành cho phần phụ và chứa một bản ghi có thứ tự của tất cả các giao dịch được thực hiện trên bảng Delta Lake. Nhật ký giao dịch cho phép nhiều người dùng khác nhau đọc và ghi song song vào bảng đã cho. Nó hoạt động như một nguồn chân lý duy nhất hoặc kho lưu trữ trung tâm ghi lại tất cả các thay đổi do người dùng thực hiện đối với bảng. Nó duy trì tính nguyên tử và liên tục theo dõi các giao dịch được thực hiện trên Delta Lake.

Như đã đề cập ở trên, Spark kiểm tra nhật ký delta cho bất kỳ giao dịch mới nào, sau đó Delta Lake đảm bảo rằng phiên bản của người dùng luôn đồng bộ với bản ghi chính. Nó cũng đảm bảo rằng không có thay đổi mâu thuẫn nào đang được thực hiện đối với bảng. Nếu quá trình bị lỗi trước khi cập nhật nhật ký delta, các tệp sẽ không có sẵn cho bất kỳ quá trình đọc nào vì quá trình đọc luôn đi qua nhật ký giao dịch.

Nhật ký giao dịch đang hoạt động và các cam kết nguyên tử

Hồ Delta có một trạm kiểm soát cứ sau mười lần cam kết. Tệp điểm kiểm tra chứa trạng thái hiện tại của dữ liệu ở định dạng Parquet có thể được đọc nhanh chóng. Khi nhiều người dùng cố gắng sửa đổi bảng cùng một lúc, Delta Lake sẽ giải quyết các xung đột bằng cách sử dụng điều khiển đồng thời lạc quan.

Lược đồ của siêu dữ liệu như sau: 

CộtKiểuMô tả
định dạngchuỗiĐịnh dạng của bảng, đó là, "delta".
idchuỗiID duy nhất của bảng
tênchuỗiTên của bảng như được xác định trong vùng di căn
Mô tảchuỗiMô tả của bảng.
địa điểm thư viện nàochuỗiVị trí của bàn
đã tạodấu thời gianKhi bảng được tạo
Sửa đổi lần cuốidấu thời gianKhi bảng được sửa đổi lần cuối
phân vùngmảng chuỗiTên của các cột phân vùng nếu bảng được phân vùng
số tập tinDàiSố lượng tệp trong phiên bản mới nhất của bảng
tài sảnBản đồ chuỗi chuỗiTất cả các thuộc tính được đặt cho bảng này
phiên bản minReaderintPhiên bản tối thiểu của trình đọc (theo giao thức nhật ký) có thể đọc bảng.
phiên bản minWriterintPhiên bản tối thiểu của trình đọc (theo giao thức nhật ký) có thể ghi vào bảng.
nguồn: GitHub

Thêm và Xóa tệp

Bất cứ khi nào một tệp được thêm vào hoặc một tệp hiện có bị xóa, các hành động này sẽ được ghi lại. Đường dẫn tệp là duy nhất và được coi là khóa chính cho tập hợp các tệp bên trong nó. Khi tệp mới được thêm vào đường dẫn đã có trong bảng, số liệu thống kê và siêu dữ liệu khác trên đường dẫn được cập nhật từ phiên bản trước. Tương tự, hành động xóa được biểu thị bằng dấu thời gian. Hành động xóa vẫn còn trong bảng dưới dạng bia mộ cho đến khi hành động đó hết hạn. Bia mộ sẽ hết hạn khi vượt quá TTL (Time-To-Live).

Vì các hành động trong một tệp Delta nhất định không được đảm bảo sẽ được áp dụng theo thứ tự, nó không hợp lệ để nhiều hoạt động tệp có cùng đường dẫn tồn tại trong một phiên bản.

Cờ dataChange trên 'add' hoặc 'remove' có thể được đặt thành false để giảm thiểu xung đột hoạt động đồng thời.

Lược đồ của hành động thêm như sau:

Tên trườngLoại dữ liệuMô tả
con đườngChuỗiMột đường dẫn tương đối, từ thư mục gốc của bảng, đến tệp cần được thêm vào bảng
phân vùngBản đồ [Chuỗi, Chuỗi]Bản đồ từ cột phân vùng đến giá trị cho tệp này. 
kích thướcdàiKích thước của tệp này tính bằng byte
thời gian sửa đổidàiThời gian tệp này được tạo, tính bằng mili giây kể từ kỷ nguyên
dữ liệuThay đổiBooleanKhi sai, tệp đó phải đã có trong bảng hoặc các bản ghi trong tệp được thêm vào phải được chứa trong một hoặc nhiều hành động xóa trong cùng một phiên bản
số liệu thống kêCấu trúc thống kêChứa thống kê (ví dụ: đếm, giá trị tối thiểu / tối đa cho các cột) về dữ liệu trong tệp này
thẻBản đồ [Chuỗi, Chuỗi]Bản đồ chứa siêu dữ liệu về tệp này

Lược đồ của hành động loại bỏ như sau:

Tên trườngNgày KiểuMô tả
con đườngchuỗiĐường dẫn tuyệt đối hoặc tương đối đến tệp cần được xóa khỏi bảng
xóaDàiThời gian xảy ra quá trình xóa, được biểu thị bằng mili giây kể từ kỷ nguyên
dữ liệuThay đổiBooleanKhi sai, các bản ghi trong tệp đã xóa phải được chứa trong một hoặc nhiều hành động thêm tệp trong cùng một phiên bản
ExtendedFileMetadataBooleanKhi đúng, các trường phân vùng Giá trị, kích thước và thẻ hiển thị
phân vùngBản đồ [Chuỗi, Chuỗi]Bản đồ từ cột phân vùng đến giá trị cho tệp này. Xem thêm Serialization giá trị phân vùng
kích thướcdàiKích thước của tệp này tính bằng byte
thẻBản đồ [Chuỗi, Chuỗi]Bản đồ chứa siêu dữ liệu về tệp này
nguồn: GitHub

Lược đồ của siêu dữ liệu chứa đường dẫn tệp trên mỗi hành động thêm / xóa và quá trình đọc Spark không cần phải quét toàn bộ để có được danh sách tệp.

Nếu ghi không thành công mà không cập nhật nhật ký giao dịch, vì quá trình đọc của người tiêu dùng sẽ luôn đi qua siêu dữ liệu, các tệp đó sẽ bị bỏ qua. 

Ưu điểm của việc chuyển sang Spark 3.0

Ngoài việc tận dụng những lợi ích của Delta Lake, việc chuyển sang Spark 3.0 đã cải thiện quá trình xử lý dữ liệu theo những cách sau:

Tối ưu hóa tham gia bị lệch

Độ lệch dữ liệu là tình trạng dữ liệu của bảng được phân phối không đồng đều giữa các phân vùng trong cụm và có thể hạ cấp nghiêm trọng hiệu suất của các truy vấn, đặc biệt là những truy vấn có liên kết. Độ lệch có thể dẫn đến mất cân bằng cực độ trong cụm do đó làm tăng thời gian xử lý dữ liệu.

Điều kiện sai lệch dữ liệu có thể được xử lý chủ yếu bằng ba cách tiếp cận.

  1. Sử dụng cấu hình “spark.sql.shuffle.partitions” để tăng tính song song trên dữ liệu được phân phối đồng đều hơn.
  2. Tăng ngưỡng tham gia băm phát sóng bằng cách sử dụng cấu hình spark.sql.autoBroadcastJoinThreshold lên kích thước tối đa tính bằng byte cho bảng phải được phát sóng cho tất cả các nút công nhân trong khi thực hiện kết hợp.
  3. Key Salting (Thêm tiền tố vào các khóa lệch để làm cho cùng một khóa khác nhau và sau đó điều chỉnh phân phối dữ liệu).

Spark 3.0 đã thêm một tính năng tối ưu hóa để tự động xử lý kết hợp xiên dựa trên thống kê thời gian chạy với khung thực thi thích ứng mới.

Tình trạng phân vùng bị xiên

Thách thức về phân vùng lệch tồn tại trong phiên bản trước của Spark 2.4 đã ảnh hưởng rất lớn đến thời gian mạng và thời gian thực hiện một tác vụ cụ thể. Hơn nữa, các phương pháp để giải quyết nó chủ yếu là thủ công. Spark 3.0 vượt qua những thách thức này.

Phân vùng lệch sẽ có tác động đến lưu lượng mạng và thời gian thực hiện tác vụ, vì tác vụ cụ thể này sẽ có nhiều dữ liệu hơn để xử lý. Bạn cũng cần biết điều này ảnh hưởng đến an ninh mạng như thế nào, vì lưu lượng truy cập mạng là thứ mà tin tặc lợi dụng.

Phân vùng kết hợp lệch được tính bằng kích thước dữ liệu và số lượng hàng từ thống kê bản đồ thời gian chạy.

Tối ưu hóa

Được điều chỉnh từ: Apache Spark Jira

Từ bảng trên, Chiến dịch khung dữ liệu tham gia với Tổ chức khung dữ liệu. Một trong những phân vùng (Phân vùng 0) từ Tổ chức là lớn và lệch. Phân vùng 0 là kết quả của 9 bản đồ từ giai đoạn trước (Bản đồ-0 đến Bản đồ-8). Quy tắc OptimizeSkewedJoin của Spark sẽ chia Phân vùng thành 3 và sau đó tạo 3 nhiệm vụ riêng biệt, mỗi tác vụ là một phân vùng riêng từ Phân vùng 0 (Bản đồ-0 đến Bản đồ-2, Bản đồ-3 đến Bản đồ-5 và Bản đồ-6 đến Bản đồ-9) và tham gia với Phân vùng Chiến dịch 0. Cách tiếp cận này dẫn đến chi phí bổ sung bằng cách đọc Phân vùng 0 của bảng Chiến dịch bằng số lượng phân vùng một phần từ bảng Tổ chức.

Kết quả cuối cùng

Bằng cách sử dụng Delta Lake và Spark 3.0, chúng tôi đã kích hoạt các kết quả sau cho công ty công nghệ quảng cáo:

  • Thời gian xử lý dữ liệu giảm từ 15 giờ xuống còn 5-6 giờ
  • Giảm 50% chi phí AWS EMR
  • Ngăn ngừa mất dữ liệu và chết các quy trình thường xuyên xảy ra khi hệ thống hết bộ nhớ hoặc quá trình xử lý bị dừng do trục trặc trong hệ thống
  • Các tính năng Giám sát & Cảnh báo đã được cài đặt để thông báo trong trường hợp quy trình không thành công
  • Hoàn thành điều phối bằng cách sử dụng Luồng không khí để đạt được tự động hóa hoàn toàn và quản lý sự phụ thuộc giữa các quy trình
Nguồn: https://www.smartdatacollective.com/improving-data-processing-with-spark-3-delta-lake/

Dấu thời gian:

Thêm từ Tập thể SmartData