PySpark Read.Parquet()

Pyspark Read Parquet



ใน PySpark ฟังก์ชัน write.parquet() เขียน DataFrame ไปยังไฟล์ parquet และ read.parquet() อ่านไฟล์ parquet ไปยัง PySpark DataFrame หรือ DataSource อื่นๆ ในการประมวลผลคอลัมน์ใน Apache Spark อย่างรวดเร็วและมีประสิทธิภาพ เราจำเป็นต้องบีบอัดข้อมูล การบีบอัดข้อมูลช่วยประหยัดหน่วยความจำของเราและคอลัมน์ทั้งหมดจะถูกแปลงเป็นระดับแบน นั่นหมายความว่ามีที่จัดเก็บระดับคอลัมน์แบบเรียบอยู่ ไฟล์ที่เก็บสิ่งเหล่านี้เรียกว่าไฟล์ PARQUET

ในคู่มือนี้ เราจะมุ่งเน้นไปที่การอ่าน/โหลดไฟล์ parquet ลงใน PySpark DataFrame/SQL เป็นหลัก โดยใช้ฟังก์ชัน read.parquet() ซึ่งมีอยู่ในคลาส pyspark.sql.DataFrameReader

หัวข้อเนื้อหา:







รับไฟล์ Parquet



อ่านไฟล์ Parquet ไปยัง PySpark DataFrame



อ่านไฟล์ Parquet ไปยัง PySpark SQL





Pyspark.sql.DataFrameReader.parquet()

ฟังก์ชันนี้ใช้เพื่ออ่านไฟล์ปาร์เก้และโหลดลงใน PySpark DataFrame ใช้เส้นทาง/ชื่อไฟล์ของไฟล์ปาร์เก้ เราสามารถใช้ฟังก์ชัน read.parquet() เนื่องจากเป็นฟังก์ชันทั่วไป

ไวยากรณ์:



มาดูไวยากรณ์ของ read.parquet():

spark_app.read.parquet(file_name.parquet/path)

ก่อนอื่นให้ติดตั้งโมดูล PySpark โดยใช้คำสั่ง pip:

pip ติดตั้ง pyspark

รับไฟล์ Parquet

หากต้องการอ่านไฟล์ปาร์เก้ คุณต้องใช้ข้อมูลที่สร้างไฟล์ปาร์เก้จากข้อมูลนั้น ในส่วนนี้ เราจะดูวิธีสร้างไฟล์ parquet จาก PySpark DataFrame

มาสร้าง PySpark DataFrame ด้วย 5 เรคคอร์ด แล้วเขียนลงในไฟล์ปาร์เก้ “industry_parquet”

นำเข้า pyspark

จาก pyspark.sql นำเข้า SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()

# สร้าง dataframe ที่เก็บรายละเอียดอุตสาหกรรม

Industry_df = linuxhint_spark_app.createDataFrame([แถว(ประเภท= 'เกษตรกรรม' ,พื้นที่= 'สหรัฐอเมริกา' ,
คะแนน= 'ร้อน' ,Total_employees= 100 ),

แถว(ประเภท= 'เกษตรกรรม' ,พื้นที่= 'อินเดีย' ,เรทติ้ง= 'ร้อน' ,Total_employees= 200 ),

แถว(ประเภท= 'การพัฒนา' ,พื้นที่= 'สหรัฐอเมริกา' ,เรทติ้ง= 'อบอุ่น' ,Total_employees= 100 ),

แถว(ประเภท= 'การศึกษา' ,พื้นที่= 'สหรัฐอเมริกา' ,เรทติ้ง= 'เย็น' ,Total_employees= 400 ),

แถว(ประเภท= 'การศึกษา' ,พื้นที่= 'สหรัฐอเมริกา' ,เรทติ้ง= 'อบอุ่น' ,Total_employees= ยี่สิบ )

])

# DataFrame จริง

Industry_df.show()

# เขียน Industry_df ลงในไฟล์ปาร์เก้

Industry_df.coalesce( 1 ).write.parquet( 'อุตสาหกรรม_ปาร์เก้' )

เอาท์พุต:

นี่คือ DataFrame ที่เก็บ 5 ระเบียน

ไฟล์ parquet ถูกสร้างขึ้นสำหรับ DataFrame ก่อนหน้า ที่นี่ ชื่อไฟล์ของเราที่มีนามสกุลคือ เราใช้ไฟล์นี้ในการสอนทั้งหมด

อ่านไฟล์ Parquet ไปยัง PySpark DataFrame

เรามีไฟล์ไม้ปาร์เก้ มาอ่านไฟล์นี้โดยใช้ฟังก์ชัน read.parquet() และโหลดลงใน PySpark DataFrame

นำเข้า pyspark

จาก pyspark.sql นำเข้า SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()

# อ่านไฟล์ parquet ใน dataframe_from_parquet object

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# แสดง dataframe_from_parquet-DataFrame

dataframe_from_parquet.show()

เอาท์พุต:

เราแสดง DataFrame โดยใช้เมธอด show() ซึ่งสร้างขึ้นจากไฟล์ปาร์เก้

แบบสอบถาม SQL ด้วยไฟล์ Parquet

หลังจากโหลดลงใน DataFrame แล้ว คุณจะสามารถสร้างตาราง SQL และแสดงข้อมูลที่มีอยู่ใน DataFrame ได้ เราจำเป็นต้องสร้าง TEMPORARY VIEW และใช้คำสั่ง SQL เพื่อส่งคืนระเบียนจาก DataFrame ซึ่งสร้างขึ้นจากไฟล์ parquet

ตัวอย่างที่ 1:

สร้างมุมมองชั่วคราวชื่อ “ภาคส่วน” และใช้คำสั่ง SELECT เพื่อแสดงบันทึกใน DataFrame คุณสามารถอ้างถึงสิ่งนี้ กวดวิชา ที่อธิบายวิธีสร้าง VIEW ใน Spark – SQL

นำเข้า pyspark

จาก pyspark.sql นำเข้า SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()

# อ่านไฟล์ parquet ใน dataframe_from_parquet object

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# สร้างมุมมองจากไฟล์ไม้ปาร์เก้ด้านบนชื่อ - 'ภาค'

dataframe_from_parquet.createOrReplaceTempView( 'ภาค' )

# แบบสอบถามเพื่อแสดงบันทึกทั้งหมดจากภาค

linuxhint_spark_app.sql( 'เลือก * จากภาค' ).แสดง()

เอาท์พุต:

ตัวอย่างที่ 2:

ใช้ VIEW ก่อนหน้า เขียนแบบสอบถาม SQL:

  1. เพื่อแสดงบันทึกทั้งหมดจากภาคที่เป็นของ “อินเดีย”
  2. เมื่อต้องการแสดงเรกคอร์ดทั้งหมดจากเซกเตอร์กับพนักงานที่มากกว่า 100
# แบบสอบถามเพื่อแสดงบันทึกทั้งหมดจากภาคที่เป็นของ 'อินเดีย'

linuxhint_spark_app.sql( 'เลือก * จากภาคที่ Area='India'' ).แสดง()

# แบบสอบถามเพื่อแสดงบันทึกทั้งหมดจากภาคที่มีพนักงานมากกว่า 100

linuxhint_spark_app.sql( 'เลือก * จากส่วนที่ Total_employees>100' ).แสดง()

เอาท์พุต:

มีเพียงระเบียนเดียวที่มีพื้นที่ซึ่งเป็น 'อินเดีย' และสองระเบียนที่มีพนักงานมากกว่า 100 คน

อ่านไฟล์ Parquet ไปยัง PySpark SQL

ก่อนอื่นเราต้องสร้าง VIEW โดยใช้คำสั่ง CREATE การใช้คีย์เวิร์ด “path” ในเคียวรี SQL เราสามารถอ่านไฟล์ parquet ไปยัง Spark SQL ได้ หลังจากเส้นทาง เราต้องระบุชื่อไฟล์/ตำแหน่งของไฟล์

ไวยากรณ์:

spark_app.sql( 'สร้างมุมมองชั่วคราว view_name โดยใช้ตัวเลือกปาร์เก้ (เส้นทาง ' file_name.parquet ')' )

ตัวอย่างที่ 1:

สร้างมุมมองชั่วคราวชื่อ 'Sector2' และอ่านไฟล์ปาร์เก้ลงไป ใช้ฟังก์ชัน sql() เขียนแบบสอบถามแบบเลือกเพื่อแสดงระเบียนทั้งหมดที่มีอยู่ในมุมมอง

นำเข้า pyspark

จาก pyspark.sql นำเข้า SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()

# อ่านไฟล์ปาร์เก้ใน Spark- SQL

linuxhint_spark_app.sql( 'สร้างมุมมองชั่วคราว Sector2 โดยใช้ตัวเลือกไม้ปาร์เก้ (เส้นทาง ' part-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# แบบสอบถามเพื่อแสดงบันทึกทั้งหมดจาก Sector2

linuxhint_spark_app.sql( 'เลือก * จาก Sector2' ).แสดง()

เอาท์พุต:

ตัวอย่างที่ 2:

ใช้ VIEW ก่อนหน้าและเขียนคิวรี่เพื่อแสดงเรคคอร์ดทั้งหมดที่มีเรคคอร์ดเป็น 'Hot' หรือ 'Cool'

# แบบสอบถามเพื่อแสดงบันทึกทั้งหมดจาก Sector2 พร้อม Rating- Hot หรือ Cool

linuxhint_spark_app.sql( 'เลือก * จากภาค 2 โดยที่ Rating='Hot' หรือ Rating='Cool'' ).แสดง()

เอาท์พุต:

มีสามระเบียนที่มีการจัดอันดับ 'ร้อน' หรือ 'เย็น'

บทสรุป

ใน PySpark ฟังก์ชัน write.parquet() เขียน DataFrame ไปยังไฟล์ parquet ฟังก์ชัน read.parquet() อ่านไฟล์ parquet ไปยัง PySpark DataFrame หรือ DataSource อื่นๆ เราได้เรียนรู้วิธีอ่านไฟล์ปาร์เก้ใน PySpark DataFrame และในตาราง PySpark ในส่วนหนึ่งของบทช่วยสอนนี้ เรายังกล่าวถึงวิธีสร้างตารางจาก PySpark DataFrame และกรองข้อมูลโดยใช้ส่วนคำสั่ง WHERE