การประมวลผลไฟล์ขนาดใหญ่แบบขนานใน Python

การประมวลผลไฟล์ขนาดใหญ่แบบขนานใน Python

โหนดต้นทาง: 1970104

การประมวลผลไฟล์ขนาดใหญ่แบบขนานใน Python
ภาพโดยผู้เขียน
 

สำหรับการประมวลผลแบบขนาน เราแบ่งงานออกเป็นหน่วยย่อย จะเพิ่มจำนวนงานที่ประมวลผลโดยโปรแกรมและลดเวลาในการดำเนินการโดยรวม 

ตัวอย่างเช่น หากคุณกำลังทำงานกับไฟล์ CSV ขนาดใหญ่และต้องการแก้ไขคอลัมน์เดียว เราจะป้อนข้อมูลเป็นอาร์เรย์ของฟังก์ชัน และจะประมวลผลหลายค่าพร้อมกันตามจำนวนที่มีอยู่  แรงงาน. ผู้ปฏิบัติงานเหล่านี้ขึ้นอยู่กับจำนวนคอร์ภายในโปรเซสเซอร์ของคุณ 
 

หมายเหตุ การใช้การประมวลผลแบบขนานกับชุดข้อมูลที่มีขนาดเล็กลงจะไม่ช่วยปรับปรุงเวลาในการประมวลผล

 

ในบล็อกนี้ เราจะเรียนรู้วิธีลดเวลาในการประมวลผลไฟล์ขนาดใหญ่โดยใช้ มัลติโปรเซสเซอร์, จ็อบลิบและ ตร.ม แพ็คเกจไพทอน เป็นบทช่วยสอนง่ายๆ ที่สามารถนำไปใช้กับไฟล์ ฐานข้อมูล รูปภาพ วิดีโอ และเสียง 
 

หมายเหตุ เรากำลังใช้สมุดบันทึก Kaggle สำหรับการทดลอง เวลาในการประมวลผลอาจแตกต่างกันไปในแต่ละเครื่อง  

 

เราจะใช้ อุบัติเหตุในสหรัฐอเมริกา (2016 – 2021) ชุดข้อมูลจาก Kaggle ซึ่งประกอบด้วย 2.8 ล้านระเบียนและ 47 คอลัมน์ 

เราจะนำเข้า multiprocessing, joblibและ tqdm for การประมวลผลแบบขนาน, pandas for การนำเข้าข้อมูลและ re, nltkและ string for การประมวลผลข้อความ

# คอมพิวเตอร์แบบขนาน
นำเข้า มัลติโปรเซสเซอร์ as mp
ราคาเริ่มต้นที่ จ็อบลิบ นำเข้า ขนานกันล่าช้า
ราคาเริ่มต้นที่ tqdm.โน๊ตบุ๊ค นำเข้า ตร.ม # การนำเข้าข้อมูล 
นำเข้า หมีแพนด้า as pd # การประมวลผลข้อความ 
นำเข้า re ราคาเริ่มต้นที่ nltk.คลังข้อมูล นำเข้า คำหยุด
นำเข้า เชือก

ก่อนที่เราจะโดดเข้าไป มาเซ็ตกัน n_workers โดยทวีคูณ cpu_count(). อย่างที่คุณเห็น เรามีคนงาน 8 คน

n_workers = 2 * mp.cpu_count() พิมพ์(f"{n_workers} คนว่าง") >>> มีคนงาน 8 คน

ในขั้นตอนต่อไป เราจะนำเข้าไฟล์ CSV ขนาดใหญ่โดยใช้ หมีแพนด้า read_csv การทำงาน. จากนั้นพิมพ์รูปร่างของดาต้าเฟรม ชื่อของคอลัมน์ และเวลาประมวลผล 

หมายเหตุ ฟังก์ชั่นเวทย์มนตร์ของ Jupyter %%time สามารถแสดง ซีพียูครั้ง และ เวลาผนัง เมื่อสิ้นสุดกระบวนการ 

 

%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(file_name) print(f"Shape:{df.shape}nnชื่อคอลัมน์:n{df.columns}n")

เอาท์พุต

รูปร่าง:(2845342, 47) ชื่อคอลัมน์: Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi) ', 'คำอธิบาย', 'หมายเลข', 'ถนน', 'ด้านข้าง', 'เมือง', 'เขต', 'รัฐ', 'รหัสไปรษณีย์', 'ประเทศ', 'เขตเวลา', 'รหัสสนามบิน', 'สภาพอากาศ_เวลาประทับ', 'อุณหภูมิ(F)', 'Wind_Chill(F)', 'ความชื้น(%)', 'ความดัน(เข้า)', 'ทัศนวิสัย(ไมล์)', 'ทิศทางลม', 'ความเร็วลม(mph)', 'ปริมาณน้ำฝน(ใน )', 'สภาพอากาศ_สภาพ', 'สิ่งอำนวยความสะดวก', 'ชน', 'ทางข้าม', 'ให้_ทาง', 'ทางแยก', 'ไม่มีทางออก', 'ทางรถไฟ', 'วงเวียน', 'สถานี', 'หยุด', 'การจราจรสงบเงียบ' , 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype='object') เวลา CPU: ผู้ใช้ 33.9 วินาที, ซิสเต็ม: 3.93 วินาที, ทั้งหมด: 37.9 วินาที เวลาผนัง : 46.9 วินาที

พื้นที่ clean_text เป็นฟังก์ชันที่ตรงไปตรงมาสำหรับการประมวลผลและทำความสะอาดข้อความ เราจะได้ภาษาอังกฤษ คำหยุด การใช้ nltk.copus ใช้เพื่อกรองคำหยุดจากบรรทัดข้อความ หลังจากนั้น เราจะลบอักขระพิเศษและช่องว่างพิเศษออกจากประโยค มันจะเป็นฟังก์ชันพื้นฐานในการกำหนดเวลาการประมวลผลสำหรับ อนุกรม, ขนานและ ชุด การประมวลผล 

def clean_text(ข้อความ): # ลบคำหยุด หยุด = stopwords.words("อังกฤษ") text = " ".join([คำ for
 word in ข้อความ.split() if word ไม่ in หยุด]) # ลบอักขระพิเศษ text = text.translate(str.maketrans('', '', string.punctuation)) #ลบช่องว่างพิเศษ text = re.sub('+',' ', text) กลับ ข้อความ

สำหรับการประมวลผลแบบอนุกรม เราสามารถใช้แพนด้าได้ .apply() ฟังก์ชั่น แต่ถ้าคุณต้องการดูแถบความคืบหน้า คุณต้องเปิดใช้งาน ตร.ม for หมีแพนด้า แล้วใช้ .progress_apply() ฟังก์ชัน 

เราจะประมวลผลข้อมูล 2.8 ล้านรายการและบันทึกผลลัพธ์กลับไปที่คอลัมน์คอลัมน์ "คำอธิบาย" 

%%เวลา tqdm.pandas() df['Description'] = df['Description'].progress_apply(clean_text)

เอาท์พุต

ใช้เวลา 9 นาที 5 วินาทีในการ ระดับ high-end โปรเซสเซอร์สู่กระบวนการอนุกรม 2.8 ล้านแถว 

100% 2845342/2845342 [09:05<00:00, 5724.25it/s] เวลา CPU: ผู้ใช้ 8 นาที 14 วินาที, ซิสเต็ม: 53.6 วินาที, ทั้งหมด: 9 นาที 7 วินาที เวลาผนัง: 9 นาที 5 วินาที

มีหลายวิธีในการประมวลผลไฟล์แบบขนาน และเราจะเรียนรู้เกี่ยวกับวิธีเหล่านี้ทั้งหมด เดอะ multiprocessing เป็นแพ็คเกจ python ในตัวที่ใช้กันทั่วไปสำหรับการประมวลผลไฟล์ขนาดใหญ่แบบขนาน 

เราจะสร้างการประมวลผลหลายตัว สระ กับ แรงงาน 8 และใช้ แผนที่ ทำหน้าที่เริ่มต้นกระบวนการ เพื่อแสดงแถบความคืบหน้า เราใช้ ตร.ม.

ฟังก์ชั่นแผนที่ประกอบด้วยสองส่วน อันแรกต้องการฟังก์ชัน และอันที่สองต้องการอาร์กิวเมนต์หรือรายการอาร์กิวเมนต์ 

เรียนรู้เพิ่มเติมโดยการอ่าน เอกสาร

%%time p = mp.Pool(n_workers) df['Description'] = p.map(clean_text,tqdm(df['Description']))

เอาท์พุต

เราได้ปรับปรุงเวลาในการดำเนินการของเราเกือบ 3X. เวลาในการดำเนินการลดลงจาก 9 นาที 5 วินาที ไปยัง 3 นาที 51 วินาที.   

100% 2845342/2845342 [02:58<00:00, 135646.12it/s] เวลา CPU: ผู้ใช้ 5.68 วินาที, ซิสเต็ม: 1.56 วินาที, ทั้งหมด: 7.23 วินาที เวลาผนัง: 3 นาที 51 วินาที

ตอนนี้เราจะเรียนรู้เกี่ยวกับแพ็คเกจ Python อื่นเพื่อดำเนินการประมวลผลแบบขนาน ในส่วนนี้เราจะใช้ joblib's Parallel และ ล่าช้า เพื่อทำซ้ำ แผนที่ ฟังก์ชัน 

  • Parallel ต้องการสองอาร์กิวเมนต์: n_jobs = 8 และ backend = multiprocessing
  • จากนั้นเราจะเพิ่ม clean_text  ไป ล่าช้า ฟังก์ชัน 
  • สร้างการวนซ้ำเพื่อป้อนค่าเดียวในแต่ละครั้ง 

ขั้นตอนด้านล่างนี้ค่อนข้างทั่วไป และคุณสามารถปรับเปลี่ยนฟังก์ชันและอาร์เรย์ได้ตามความต้องการของคุณ ฉันใช้มันเพื่อประมวลผลไฟล์เสียงและวิดีโอนับพันไฟล์โดยไม่มีปัญหาใดๆ 

ที่แนะนำ: เพิ่มการจัดการข้อยกเว้นโดยใช้ try: และ except:

def text_parallel_clean(อาร์เรย์): ผลลัพธ์ = Parallel(n_jobs=n_workers,backend="multiprocessing")( ล่าช้า (clean_text) (ข้อความ) for
 ข้อความ in tqdm(อาร์เรย์) ) กลับ ผล

เพิ่มคอลัมน์ "คำอธิบาย" ไปที่ text_parallel_clean()

%%time df['คำอธิบาย'] = text_parallel_clean(df['คำอธิบาย'])

เอาท์พุต

ฟังก์ชันของเราใช้เวลานานกว่าการประมวลผลหลายตัว . 13 วินาที สระ ถึงอย่างนั้น Parallel เร็วกว่า 4 นาที 59 วินาที อนุกรม การประมวลผล 

100% 2845342/2845342 [04:03<00:00, 10514.98it/s] เวลา CPU: ผู้ใช้ 44.2 วินาที, ซิสเต็ม: 2.92 วินาที, ทั้งหมด: 47.1 วินาที เวลาผนัง: 4 นาที 4 วินาที

มีวิธีที่ดีกว่าในการประมวลผลไฟล์ขนาดใหญ่โดยแบ่งเป็นแบทช์และประมวลผลแบบขนาน เริ่มต้นด้วยการสร้างฟังก์ชันแบตช์ที่จะเรียกใช้ a clean_function ในชุดค่าเดียว 

ฟังก์ชันการประมวลผลแบบแบตช์

def proc_batch(แบทช์): กลับ [ clean_text (ข้อความ) for
 ข้อความ in ชุด ]

การแยกไฟล์ออกเป็นแบทช์

ฟังก์ชันด้านล่างนี้จะแบ่งไฟล์ออกเป็นหลายแบทช์ตามจำนวนผู้ปฏิบัติงาน ในกรณีของเรา เราได้รับ 8 ชุด 

def ไฟล์ชุด(อาร์เรย์,n_workers): file_len = len(อาร์เรย์) batch_size = รอบ(file_len / n_workers) แบทช์ = [ อาร์เรย์[ix:ix+batch_size] for
 ix in tqdm(ช่วง(0, file_len, batch_size)) ] กลับ แบทช์ แบทช์ = batch_file(df['Description'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01it/s]

กำลังเรียกใช้การประมวลผลแบทช์คู่ขนาน

สุดท้าย เราจะใช้ Parallel และ ล่าช้า เพื่อประมวลผลแบทช์ 

หมายเหตุ ในการรับค่าอาร์เรย์เดียว เราต้องรัน list comprehension ดังที่แสดงด้านล่าง 

 

%%time batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")( ล่าช้า (proc_batch) (แบทช์) for
 ชุด in tqdm(แบทช์) ) df['Description'] = [j for
 i in ชุด_เอาท์พุท for
 j in i]

เอาท์พุต

เราได้ปรับปรุงเวลาในการประมวลผล เทคนิคนี้มีชื่อเสียงในการประมวลผลข้อมูลที่ซับซ้อนและฝึกโมเดลการเรียนรู้เชิงลึก 

100% 8/8 [00:00<00:00, 2.19it/s] เวลา CPU: ผู้ใช้ 3.39 วินาที, ซิสเต็ม: 1.42 วินาที, ทั้งหมด: 4.81 วินาที เวลาผนัง: 3 นาที 56 วินาที

tqdm นำการประมวลผลหลายตัวไปสู่อีกระดับ มันเรียบง่ายและทรงพลัง ฉันจะแนะนำให้นักวิทยาศาสตร์ข้อมูลทุกคน 

วันที่ออก เอกสาร เพื่อเรียนรู้เพิ่มเติมเกี่ยวกับการประมวลผลหลายตัว 

พื้นที่ process_map ต้องใช้:

  1. ชื่อฟังก์ชัน
  2. คอลัมน์ดาต้าเฟรม
  3. max_workers
  4. chucksize คล้ายกับขนาดแบทช์ เราจะคำนวณขนาดแบทช์โดยใช้จำนวนผู้ปฏิบัติงาน หรือคุณสามารถเพิ่มจำนวนตามความต้องการของคุณ 
%%เวลา
ราคาเริ่มต้นที่ tqdm.contrib.พร้อมกัน นำเข้า process_map batch = round(len(df)/n_workers) df["Description"] = process_map( clean_text, df["Description"], max_workers=n_workers, chunksize=batch )

เอาท์พุต

ด้วยรหัสบรรทัดเดียว เราจะได้ผลลัพธ์ที่ดีที่สุด 

100% 2845342/2845342 [03:48<00:00, 1426320.93it/s] เวลา CPU: ผู้ใช้ 7.32 วินาที, ซิสเต็ม: 1.97 วินาที, ทั้งหมด: 9.29 วินาที เวลาผนัง: 3 นาที 51 วินาที

คุณต้องหาจุดสมดุลและเลือกเทคนิคที่เหมาะสมกับกรณีของคุณมากที่สุด อาจเป็นการประมวลผลแบบอนุกรม แบบขนาน หรือแบบแบทช์ การประมวลผลแบบขนานสามารถย้อนกลับได้หากคุณทำงานกับชุดข้อมูลที่มีขนาดเล็กกว่าและซับซ้อนน้อยกว่า 

ในบทช่วยสอนขนาดเล็กนี้ เราได้เรียนรู้เกี่ยวกับแพ็คเกจและเทคนิค Python ต่างๆ ที่ช่วยให้เราประมวลผลฟังก์ชันข้อมูลของเราแบบขนาน 

หากคุณกำลังทำงานกับชุดข้อมูลแบบตารางเท่านั้นและต้องการปรับปรุงประสิทธิภาพการประมวลผล เราขอแนะนำให้คุณลอง แผงควบคุม, ตารางข้อมูลและ RAPIDS 

อ้างอิง 

 
 
อาบิด อาลี อาวัน (@1อบีดาลิวัน) เป็นนักวิทยาศาสตร์ข้อมูลที่ได้รับการรับรองมืออาชีพที่รักการสร้างแบบจำลองการเรียนรู้ของเครื่อง ปัจจุบันเขามุ่งเน้นไปที่การสร้างเนื้อหาและการเขียนบล็อกทางเทคนิคเกี่ยวกับการเรียนรู้ของเครื่องและเทคโนโลยีวิทยาศาสตร์ข้อมูล อาบิดสำเร็จการศึกษาระดับปริญญาโทด้านการจัดการเทคโนโลยีและปริญญาตรีสาขาวิศวกรรมโทรคมนาคม วิสัยทัศน์ของเขาคือการสร้างผลิตภัณฑ์ AI โดยใช้โครงข่ายประสาทเทียมแบบกราฟสำหรับนักเรียนที่ป่วยเป็นโรคทางจิต
 

ประทับเวลา:

เพิ่มเติมจาก KD นักเก็ต

เรื่องเด่น 2-8 ส.ค.: 3 เหตุผลที่คุณควรใช้แบบจำลองการถดถอยเชิงเส้นแทนโครงข่ายประสาทเทียม บูตสแตรป Modern Data Stack ใน 5 นาทีด้วย Terraform

โหนดต้นทาง: 1860956
ประทับเวลา: สิงหาคม 9, 2021