ปรับปรุงการประมวลผลข้อมูลด้วย Spark 3.0 & Delta Lake

โหนดต้นทาง: 1013539

รวบรวม ประมวลผล และดำเนินการวิเคราะห์เกี่ยวกับ สตรีมข้อมูลในอุตสาหกรรมเช่น ad-tech เกี่ยวข้องกับวิศวกรรมข้อมูลที่รุนแรง ข้อมูลที่สร้างขึ้นทุกวันมีขนาดใหญ่มาก (ข้อมูล 100s GB) และต้องใช้เวลาประมวลผลที่สำคัญในการประมวลผลข้อมูลสำหรับขั้นตอนต่อๆ ไป

ความท้าทายอีกประการหนึ่งคือการรวมชุดข้อมูลเพื่อให้ได้ข้อมูลเชิงลึก โดยเฉลี่ยแต่ละกระบวนการมีชุดข้อมูลมากกว่า 10 ชุดและมีจำนวนการเข้าร่วมที่เท่ากันด้วยหลายคีย์ ขนาดพาร์ติชั่นสำหรับแต่ละคีย์ไม่สามารถคาดเดาได้ในการรันแต่ละครั้ง

และสุดท้าย หากปริมาณข้อมูลเกินในบางโอกาส หน่วยความจำอาจไม่เพียงพอ ซึ่งหมายความว่ากระบวนการจะตายในช่วงกลางของการเขียนขั้นสุดท้าย ทำให้ผู้บริโภคอ่านเฟรมข้อมูลอินพุตอย่างชัดเจน

ในบล็อกนี้ เราจะพูดถึงภาพรวมของ เดลต้าเลคส์ข้อดี และวิธีเอาชนะความท้าทายข้างต้นด้วยการย้ายไปยัง Delta Lake และย้ายไปยัง Spark 3.0 จาก Spark 2.4 

ทะเลสาบเดลต้าคืออะไร?

พัฒนาขึ้นที่ Databricks “Delta Lake เป็นเลเยอร์การจัดเก็บข้อมูลโอเพนซอร์สที่ทำงานบน Data Lake ที่มีอยู่และให้ความร่วมมืออย่างเต็มที่กับ Apache Spark API นอกเหนือจากความสามารถในการใช้ธุรกรรม ACID และการจัดการข้อมูลเมตาที่ปรับขนาดได้ Delta Lakes ยังสามารถรวมการสตรีมและการประมวลผลข้อมูลเป็นชุดได้” 

Delta Lake ใช้ไฟล์ Parquet ที่มีเวอร์ชันเพื่อจัดเก็บข้อมูลในระบบคลาวด์ เมื่อกำหนดค่าตำแหน่งบนคลาวด์แล้ว Delta Lake จะติดตามการเปลี่ยนแปลงทั้งหมดที่ทำกับไดเร็กทอรี table หรือ blob store เพื่อจัดเตรียมธุรกรรม ACID 

ข้อดีของการใช้ Delta Lakes 

เดลต้าเลคช่วยให้ข้อมูลนับพันทำงานพร้อมกัน จัดการกับความท้าทายด้านการปรับให้เหมาะสมและการแบ่งพาร์ติชั่น การดำเนินการข้อมูลเมตาที่เร็วขึ้น รักษาบันทึกการทำธุรกรรม และคอยอัปเดตข้อมูลอย่างต่อเนื่อง ด้านล่างเราจะพูดถึงข้อดีที่สำคัญบางประการ: 

บันทึกธุรกรรมของ Delta Lake

บันทึกธุรกรรมของ Delta Lake เป็นไฟล์ต่อท้ายเท่านั้นและมีบันทึกที่สั่งซื้อของธุรกรรมทั้งหมดที่ดำเนินการในตาราง Delta Lake บันทึกธุรกรรมอนุญาตให้ผู้ใช้หลายคนอ่านและเขียนไปยังตารางที่กำหนดในแบบคู่ขนาน มันทำหน้าที่เป็นแหล่งความจริงเพียงแหล่งเดียวหรือแหล่งเก็บข้อมูลกลางที่บันทึกการเปลี่ยนแปลงทั้งหมดที่ทำกับตารางโดยผู้ใช้ มันรักษาความเป็นอะตอมมิกและดูธุรกรรมที่ทำในเดลต้าเลคอย่างต่อเนื่อง

ดังที่กล่าวไว้ข้างต้น Spark จะตรวจสอบบันทึกของเดลต้าสำหรับธุรกรรมใหม่ใดๆ ก็ตาม ซึ่ง Delta Lake จะทำให้แน่ใจว่าเวอร์ชันของผู้ใช้จะซิงค์กับมาสเตอร์เรคคอร์ดเสมอ นอกจากนี้ยังช่วยให้แน่ใจว่าไม่มีการเปลี่ยนแปลงที่ขัดแย้งกันเกิดขึ้นกับตาราง หากกระบวนการหยุดทำงานก่อนที่จะอัปเดตบันทึกเดลต้า ไฟล์จะไม่พร้อมใช้งานสำหรับกระบวนการอ่านใดๆ เนื่องจากการอ่านจะผ่านบันทึกธุรกรรมเสมอ

การทำงานของบันทึกธุรกรรมและ Atomic Commits

เดลต้าเลคทำด่านทุก ๆ สิบครั้ง ไฟล์จุดตรวจสอบมีสถานะปัจจุบันของข้อมูลในรูปแบบปาร์เก้ซึ่งสามารถอ่านได้อย่างรวดเร็ว เมื่อผู้ใช้หลายคนพยายามแก้ไขตารางพร้อมกัน Delta Lake จะแก้ไขข้อขัดแย้งโดยใช้การควบคุมการทำงานพร้อมกันในแง่ดี

สคีมาของข้อมูลเมตามีดังนี้: 

คอลัมน์ชนิดภาพเขียนรายละเอียด
รูปเชือกรูปแบบของตาราง นั่นคือ “เดลต้า”
idเชือกรหัสเฉพาะของตาราง
ชื่อเชือกชื่อของตารางตามที่กำหนดไว้ใน metastore
ลักษณะเชือกคำอธิบายของตาราง
ที่ตั้งเชือกที่ตั้งของโต๊ะ
สร้างขึ้นที่การประทับเวลาเมื่อตารางถูกสร้างขึ้น
ปรับปรุงล่าสุดการประทับเวลาเมื่อตารางถูกแก้ไขล่าสุด
พาร์ทิชันคอลัมน์อาร์เรย์ของสตริงชื่อของคอลัมน์พาร์ติชั่นหากตารางถูกแบ่งพาร์ติชั่น
numFilesยาวจำนวนไฟล์ในเวอร์ชันล่าสุดของตาราง
คุณสมบัติแผนที่สตริงสตริงคุณสมบัติทั้งหมดที่กำหนดไว้สำหรับตารางนี้
minReaderVersionintเวอร์ชันขั้นต่ำของผู้อ่าน (ตามโปรโตคอลบันทึก) ที่สามารถอ่านตารางได้
minWriterเวอร์ชันintเวอร์ชันขั้นต่ำของผู้อ่าน (ตามโปรโตคอลบันทึก) ที่สามารถเขียนลงในตารางได้
ที่มา: GitHub

เพิ่มและลบไฟล์

เมื่อใดก็ตามที่มีการเพิ่มไฟล์หรือลบไฟล์ที่มีอยู่ การดำเนินการเหล่านี้จะถูกบันทึกไว้ เส้นทางของไฟล์ไม่ซ้ำกันและถือเป็นคีย์หลักสำหรับชุดของไฟล์ที่อยู่ภายใน เมื่อมีการเพิ่มไฟล์ใหม่บนพาธที่มีอยู่แล้วในตาราง สถิติและข้อมูลเมตาอื่นๆ บนพาธจะได้รับการอัปเดตจากเวอร์ชันก่อนหน้า ในทำนองเดียวกัน การดำเนินการลบจะถูกระบุด้วยการประทับเวลา การดำเนินการลบยังคงอยู่ในตารางเป็นหลุมฝังศพจนกว่าจะหมดอายุ หลุมฝังศพจะหมดอายุเมื่อ TTL (Time-To-Live) เกิน

เนื่องจากไม่รับประกันว่าการดำเนินการภายในไฟล์เดลต้าที่กำหนดจะถูกนำมาใช้ตามลำดับ จึงไม่ถูกต้องสำหรับการดำเนินการไฟล์หลายไฟล์ที่มีเส้นทางเดียวกันในเวอร์ชันเดียว

แฟล็ก dataChange บน 'เพิ่ม' หรือ 'ลบ' สามารถตั้งค่าเป็น false เพื่อลดความขัดแย้งในการดำเนินการที่เกิดขึ้นพร้อมกัน

สคีมาของการดำเนินการเพิ่มมีดังนี้:

ชื่อฟิลด์ประเภทข้อมูลรายละเอียด
เส้นทางเชือกพาธสัมพัทธ์ จากรูทของตาราง ไปยังไฟล์ที่ควรเพิ่มลงใน table
ค่าพาร์ทิชันแผนที่[สตริง,สตริง]แผนที่จากคอลัมน์พาร์ทิชันกับค่าสำหรับไฟล์นี้ 
ขนาดนานขนาดของไฟล์นี้เป็นไบต์
แก้ไขเวลานานเวลาที่ไฟล์นี้ถูกสร้างขึ้น เป็นมิลลิวินาทีนับตั้งแต่ยุค
ข้อมูลการเปลี่ยนแปลงบูลีนเมื่อเป็นเท็จ ไฟล์จะต้องมีอยู่แล้วในตารางหรือบันทึกในไฟล์ที่เพิ่มจะต้องมีการดำเนินการลบอย่างน้อยหนึ่งรายการในเวอร์ชันเดียวกัน
สถิติโครงสร้างสถิติมีสถิติ (เช่น นับ ค่าต่ำสุด/สูงสุดสำหรับคอลัมน์) เกี่ยวกับข้อมูลในไฟล์นี้
แท็กแผนที่[สตริง,สตริง]แผนที่ที่มีข้อมูลเมตาเกี่ยวกับไฟล์นี้

สคีมาของการดำเนินการลบมีดังนี้:

ชื่อฟิลด์ข้อมูล ชนิดภาพเขียนรายละเอียด
เส้นทางเชือกเส้นทางที่แน่นอนหรือสัมพันธ์กับไฟล์ที่ควรถูกลบออกจาก table
การลบการประทับเวลายาวเวลาที่เกิดการลบ ซึ่งแสดงเป็นมิลลิวินาทีนับตั้งแต่ยุค
ข้อมูลการเปลี่ยนแปลงบูลีนเมื่อเป็นเท็จ บันทึกในไฟล์ที่ถูกลบจะต้องมีการดำเนินการเพิ่มไฟล์อย่างน้อยหนึ่งรายการในเวอร์ชันเดียวกัน
ExtendedFileMetadataบูลีนเมื่อเป็นจริง ฟิลด์ partitionValues, size และ tags จะปรากฏขึ้น
ค่าพาร์ทิชันแผนที่[สตริง สตริง]แผนที่จากคอลัมน์พาร์ทิชันกับค่าสำหรับไฟล์นี้ ดูเพิ่มเติมที่ Partition Value Serialization
ขนาดนานขนาดของไฟล์นี้เป็นไบต์
แท็กแผนที่[สตริง สตริง]แผนที่ที่มีข้อมูลเมตาเกี่ยวกับไฟล์นี้
ที่มา: GitHub

สคีมาของข้อมูลเมตามีเส้นทางของไฟล์ในการดำเนินการเพิ่ม/ลบแต่ละรายการ และกระบวนการอ่าน Spark ไม่จำเป็นต้องทำการสแกนแบบเต็มเพื่อรับรายการไฟล์

หากการเขียนล้มเหลวโดยไม่ได้อัปเดตบันทึกธุรกรรม เนื่องจากการอ่านของผู้ใช้บริการจะผ่านเมตาดาต้าเสมอ ไฟล์เหล่านั้นจะถูกละเว้น 

ข้อดีของการย้ายไปยัง Spark 3.0

นอกเหนือจากการใช้ประโยชน์จากเดลต้าเลคแล้ว การโยกย้ายไปยัง Spark 3.0 ได้ปรับปรุงการประมวลผลข้อมูลด้วยวิธีต่อไปนี้:

การเพิ่มประสิทธิภาพการเข้าร่วมแบบเบ้

ความเอียงของข้อมูลเป็นเงื่อนไขที่ข้อมูลของตารางมีการกระจายอย่างไม่สม่ำเสมอระหว่างพาร์ติชันต่างๆ ในคลัสเตอร์ และสามารถลดระดับประสิทธิภาพของการสืบค้นลงได้อย่างมาก โดยเฉพาะอย่างยิ่งผู้ที่มีการรวม ความเบ้สามารถนำไปสู่ความไม่สมดุลอย่างมากในคลัสเตอร์ ซึ่งจะเป็นการเพิ่มเวลาในการประมวลผลข้อมูล

เงื่อนไขเบ้ของข้อมูลสามารถจัดการได้โดยหลักสามวิธี

  1. การใช้การกำหนดค่า "spark.sql.shuffle.partitions" เพื่อเพิ่มความเท่าเทียมในข้อมูลที่กระจายอย่างเท่าเทียมกัน
  2. การเพิ่มขีดจำกัดการออกอากาศแฮชโดยใช้การกำหนดค่า spark.sql.autoBroadcastJoinThreshold เป็นขนาดสูงสุดเป็นไบต์สำหรับตารางที่ต้องออกอากาศไปยังโหนดของผู้ปฏิบัติงานทั้งหมดในระหว่างการดำเนินการเข้าร่วม
  3. คีย์ซอลท์ (เพิ่มคำนำหน้าให้กับคีย์เบ้เพื่อทำให้คีย์เดียวกันแตกต่างออกไป แล้วปรับการกระจายข้อมูล)

Spark 3.0 ได้เพิ่มการเพิ่มประสิทธิภาพในการจัดการอัตโนมัติ skew join ตามสถิติรันไทม์ด้วยเฟรมเวิร์กการดำเนินการแบบปรับตัวใหม่

สภาพพาร์ทิชันเบ้

ความท้าทายของพาร์ติชันแบบเบ้ที่มีอยู่ในเวอร์ชันก่อนหน้าของ Spark 2.4 มีผลกระทบอย่างมากต่อเวลาเครือข่ายและเวลาดำเนินการของงานเฉพาะ นอกจากนี้ วิธีการจัดการกับมันส่วนใหญ่เป็นแบบแมนนวล Spark 3.0 เอาชนะความท้าทายเหล่านี้

พาร์ติชั่นที่เบ้จะมีผลกระทบต่อทราฟฟิกเครือข่ายและต่อเวลาดำเนินการงาน เนื่องจากงานเฉพาะนี้จะมีข้อมูลที่ต้องดำเนินการอีกมาก คุณต้องรู้ด้วยว่าสิ่งนี้ส่งผลต่อความปลอดภัยในโลกไซเบอร์อย่างไรตั้งแต่ ปริมาณการรับส่งข้อมูลเครือข่ายเป็นสิ่งที่แฮ็กเกอร์ใช้ประโยชน์จาก.

พาร์ติชั่นการรวมแบบเบ้คำนวณโดยขนาดข้อมูลและจำนวนแถวจากสถิติการแมปรันไทม์

การเพิ่มประสิทธิภาพ

ดัดแปลงมาจาก:อาปาเช่ สปาร์ค จิระ

จากตารางข้างต้น แคมเปญ Dataframe จะเข้าร่วมกับองค์กร Dataframe หนึ่งในพาร์ติชั่น (Partition 0) จาก Organizations มีขนาดใหญ่และเบ้ Partition 0 เป็นผลจาก 9 แผนที่จากด่านที่แล้ว (Map-0 to Map-8) กฎ OptimizeSkewedJoin ของ Spark จะแบ่ง Partition ออกเป็น 3 ส่วน แล้วสร้าง 3 งานแยกกัน โดยแต่ละส่วนจะเป็น Partition 0 (Map-0 to Map-2, Map-3 to Map-5 และ Map-6 to Map-9) และเข้าร่วมกับ Campaigns Partition 0 แนวทางนี้ส่งผลให้เกิดค่าใช้จ่ายเพิ่มเติมโดยการอ่าน Partition 0 ของ Table Campaigns เท่ากับจำนวนพาร์ติชั่นบางส่วนจากตาราง Organizations

สิ้นสุดผลลัพธ์

เมื่อใช้ Delta Lake และ Spark 3.0 เราเปิดใช้งานผลลัพธ์ต่อไปนี้สำหรับบริษัทเทคโนโลยีโฆษณา:

  • เวลาในการประมวลผลข้อมูลลดลงจาก 15 ชั่วโมงเป็น 5-6 ชั่วโมง
  • ลดค่าใช้จ่าย AWS EMR ลง 50%
  • ป้องกันการสูญหายของข้อมูลและการตายของกระบวนการซึ่งเกิดขึ้นบ่อยครั้งเมื่อระบบไม่มีหน่วยความจำหรือการประมวลผลหยุดลงเนื่องจากความผิดพลาดในระบบ
  • มีการติดตั้งคุณสมบัติการตรวจสอบและการแจ้งเตือนเพื่อแจ้งเตือนในกรณีที่กระบวนการล้มเหลว
  • ประสานการทำงานให้สมบูรณ์โดยใช้ Airflow เพื่อให้เกิดการทำงานอัตโนมัติเต็มรูปแบบและการจัดการการพึ่งพาระหว่างกระบวนการต่างๆ
ที่มา: https://www.smartdatacollective.com/improving-data-processing-with-spark-3-delta-lake/

ประทับเวลา:

เพิ่มเติมจาก กลุ่ม SmartData