ในคู่มือนี้ เราจะมุ่งเน้นไปที่การอ่าน/โหลดไฟล์ parquet ลงใน PySpark DataFrame/SQL เป็นหลัก โดยใช้ฟังก์ชัน read.parquet() ซึ่งมีอยู่ในคลาส pyspark.sql.DataFrameReader
หัวข้อเนื้อหา:
อ่านไฟล์ Parquet ไปยัง PySpark DataFrame
อ่านไฟล์ Parquet ไปยัง PySpark SQL
Pyspark.sql.DataFrameReader.parquet()
ฟังก์ชันนี้ใช้เพื่ออ่านไฟล์ปาร์เก้และโหลดลงใน PySpark DataFrame ใช้เส้นทาง/ชื่อไฟล์ของไฟล์ปาร์เก้ เราสามารถใช้ฟังก์ชัน read.parquet() เนื่องจากเป็นฟังก์ชันทั่วไป
ไวยากรณ์:
มาดูไวยากรณ์ของ read.parquet():
spark_app.read.parquet(file_name.parquet/path)ก่อนอื่นให้ติดตั้งโมดูล PySpark โดยใช้คำสั่ง pip:
pip ติดตั้ง pyspark
รับไฟล์ Parquet
หากต้องการอ่านไฟล์ปาร์เก้ คุณต้องใช้ข้อมูลที่สร้างไฟล์ปาร์เก้จากข้อมูลนั้น ในส่วนนี้ เราจะดูวิธีสร้างไฟล์ parquet จาก PySpark DataFrame
มาสร้าง PySpark DataFrame ด้วย 5 เรคคอร์ด แล้วเขียนลงในไฟล์ปาร์เก้ “industry_parquet”
นำเข้า pysparkจาก pyspark.sql นำเข้า SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()
# สร้าง dataframe ที่เก็บรายละเอียดอุตสาหกรรม
Industry_df = linuxhint_spark_app.createDataFrame([แถว(ประเภท= 'เกษตรกรรม' ,พื้นที่= 'สหรัฐอเมริกา' ,
คะแนน= 'ร้อน' ,Total_employees= 100 ),
แถว(ประเภท= 'เกษตรกรรม' ,พื้นที่= 'อินเดีย' ,เรทติ้ง= 'ร้อน' ,Total_employees= 200 ),
แถว(ประเภท= 'การพัฒนา' ,พื้นที่= 'สหรัฐอเมริกา' ,เรทติ้ง= 'อบอุ่น' ,Total_employees= 100 ),
แถว(ประเภท= 'การศึกษา' ,พื้นที่= 'สหรัฐอเมริกา' ,เรทติ้ง= 'เย็น' ,Total_employees= 400 ),
แถว(ประเภท= 'การศึกษา' ,พื้นที่= 'สหรัฐอเมริกา' ,เรทติ้ง= 'อบอุ่น' ,Total_employees= ยี่สิบ )
])
# DataFrame จริง
Industry_df.show()
# เขียน Industry_df ลงในไฟล์ปาร์เก้
Industry_df.coalesce( 1 ).write.parquet( 'อุตสาหกรรม_ปาร์เก้' )
เอาท์พุต:
นี่คือ DataFrame ที่เก็บ 5 ระเบียน
ไฟล์ parquet ถูกสร้างขึ้นสำหรับ DataFrame ก่อนหน้า ที่นี่ ชื่อไฟล์ของเราที่มีนามสกุลคือ เราใช้ไฟล์นี้ในการสอนทั้งหมด
อ่านไฟล์ Parquet ไปยัง PySpark DataFrame
เรามีไฟล์ไม้ปาร์เก้ มาอ่านไฟล์นี้โดยใช้ฟังก์ชัน read.parquet() และโหลดลงใน PySpark DataFrame
นำเข้า pysparkจาก pyspark.sql นำเข้า SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()
# อ่านไฟล์ parquet ใน dataframe_from_parquet object
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# แสดง dataframe_from_parquet-DataFrame
dataframe_from_parquet.show()
เอาท์พุต:
เราแสดง DataFrame โดยใช้เมธอด show() ซึ่งสร้างขึ้นจากไฟล์ปาร์เก้
แบบสอบถาม SQL ด้วยไฟล์ Parquet
หลังจากโหลดลงใน DataFrame แล้ว คุณจะสามารถสร้างตาราง SQL และแสดงข้อมูลที่มีอยู่ใน DataFrame ได้ เราจำเป็นต้องสร้าง TEMPORARY VIEW และใช้คำสั่ง SQL เพื่อส่งคืนระเบียนจาก DataFrame ซึ่งสร้างขึ้นจากไฟล์ parquet
ตัวอย่างที่ 1:
สร้างมุมมองชั่วคราวชื่อ “ภาคส่วน” และใช้คำสั่ง SELECT เพื่อแสดงบันทึกใน DataFrame คุณสามารถอ้างถึงสิ่งนี้ กวดวิชา ที่อธิบายวิธีสร้าง VIEW ใน Spark – SQL
นำเข้า pysparkจาก pyspark.sql นำเข้า SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()
# อ่านไฟล์ parquet ใน dataframe_from_parquet object
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# สร้างมุมมองจากไฟล์ไม้ปาร์เก้ด้านบนชื่อ - 'ภาค'
dataframe_from_parquet.createOrReplaceTempView( 'ภาค' )
# แบบสอบถามเพื่อแสดงบันทึกทั้งหมดจากภาค
linuxhint_spark_app.sql( 'เลือก * จากภาค' ).แสดง()
เอาท์พุต:
ตัวอย่างที่ 2:
ใช้ VIEW ก่อนหน้า เขียนแบบสอบถาม SQL:
- เพื่อแสดงบันทึกทั้งหมดจากภาคที่เป็นของ “อินเดีย”
- เมื่อต้องการแสดงเรกคอร์ดทั้งหมดจากเซกเตอร์กับพนักงานที่มากกว่า 100
linuxhint_spark_app.sql( 'เลือก * จากภาคที่ Area='India'' ).แสดง()
# แบบสอบถามเพื่อแสดงบันทึกทั้งหมดจากภาคที่มีพนักงานมากกว่า 100
linuxhint_spark_app.sql( 'เลือก * จากส่วนที่ Total_employees>100' ).แสดง()
เอาท์พุต:
มีเพียงระเบียนเดียวที่มีพื้นที่ซึ่งเป็น 'อินเดีย' และสองระเบียนที่มีพนักงานมากกว่า 100 คน
อ่านไฟล์ Parquet ไปยัง PySpark SQL
ก่อนอื่นเราต้องสร้าง VIEW โดยใช้คำสั่ง CREATE การใช้คีย์เวิร์ด “path” ในเคียวรี SQL เราสามารถอ่านไฟล์ parquet ไปยัง Spark SQL ได้ หลังจากเส้นทาง เราต้องระบุชื่อไฟล์/ตำแหน่งของไฟล์
ไวยากรณ์:
spark_app.sql( 'สร้างมุมมองชั่วคราว view_name โดยใช้ตัวเลือกปาร์เก้ (เส้นทาง ' file_name.parquet ')' )ตัวอย่างที่ 1:
สร้างมุมมองชั่วคราวชื่อ 'Sector2' และอ่านไฟล์ปาร์เก้ลงไป ใช้ฟังก์ชัน sql() เขียนแบบสอบถามแบบเลือกเพื่อแสดงระเบียนทั้งหมดที่มีอยู่ในมุมมอง
นำเข้า pysparkจาก pyspark.sql นำเข้า SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()
# อ่านไฟล์ปาร์เก้ใน Spark- SQL
linuxhint_spark_app.sql( 'สร้างมุมมองชั่วคราว Sector2 โดยใช้ตัวเลือกไม้ปาร์เก้ (เส้นทาง ' part-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )
# แบบสอบถามเพื่อแสดงบันทึกทั้งหมดจาก Sector2
linuxhint_spark_app.sql( 'เลือก * จาก Sector2' ).แสดง()
เอาท์พุต:
ตัวอย่างที่ 2:
ใช้ VIEW ก่อนหน้าและเขียนคิวรี่เพื่อแสดงเรคคอร์ดทั้งหมดที่มีเรคคอร์ดเป็น 'Hot' หรือ 'Cool'
# แบบสอบถามเพื่อแสดงบันทึกทั้งหมดจาก Sector2 พร้อม Rating- Hot หรือ Coollinuxhint_spark_app.sql( 'เลือก * จากภาค 2 โดยที่ Rating='Hot' หรือ Rating='Cool'' ).แสดง()
เอาท์พุต:
มีสามระเบียนที่มีการจัดอันดับ 'ร้อน' หรือ 'เย็น'
บทสรุป
ใน PySpark ฟังก์ชัน write.parquet() เขียน DataFrame ไปยังไฟล์ parquet ฟังก์ชัน read.parquet() อ่านไฟล์ parquet ไปยัง PySpark DataFrame หรือ DataSource อื่นๆ เราได้เรียนรู้วิธีอ่านไฟล์ปาร์เก้ใน PySpark DataFrame และในตาราง PySpark ในส่วนหนึ่งของบทช่วยสอนนี้ เรายังกล่าวถึงวิธีสร้างตารางจาก PySpark DataFrame และกรองข้อมูลโดยใช้ส่วนคำสั่ง WHERE