วิธีอ่านและเขียนข้อมูลตารางใน PySpark

Withi Xan Laea Kheiyn Khxmul Tarang Ni Pyspark



การประมวลผลข้อมูลใน PySpark จะเร็วขึ้นหากโหลดข้อมูลในรูปแบบของตาราง ด้วยเหตุนี้ การใช้ SQl Expressions จะทำให้การประมวลผลเป็นไปอย่างรวดเร็ว ดังนั้น การแปลง PySpark DataFrame/RDD เป็นตารางก่อนที่จะส่งไปประมวลผลจึงเป็นวิธีที่ดีกว่า วันนี้เราจะมาดูวิธีอ่านข้อมูลตารางใน PySpark DataFrame เขียน PySpark DataFrame ลงในตาราง และแทรก DataFrame ใหม่ลงในตารางที่มีอยู่โดยใช้ฟังก์ชันในตัว ไปกันเถอะ!

Pyspark.sql.DataFrameWriter.saveAsTable()

อันดับแรก เราจะดูวิธีการเขียน PySpark DataFrame ที่มีอยู่ลงในตารางโดยใช้ฟังก์ชัน write.saveAsTable() ใช้ชื่อตารางและพารามิเตอร์ทางเลือกอื่นๆ เช่น โหมด, partionBy ฯลฯ เพื่อเขียน DataFrame ลงในตาราง มันถูกจัดเก็บเป็นไฟล์ปาร์เก้

ไวยากรณ์:







dataframe_obj.write.saveAsTable(พาธ/Table_name,mode,partitionBy,...)
  1. Table_name คือชื่อของตารางที่สร้างขึ้นจาก dataframe_obj
  2. เราสามารถผนวก/เขียนทับข้อมูลของตารางโดยใช้พารามิเตอร์โหมด
  3. PartitionBy ใช้คอลัมน์เดี่ยว/หลายคอลัมน์เพื่อสร้างพาร์ติชันตามค่าในคอลัมน์ที่ให้มาเหล่านี้

ตัวอย่างที่ 1:

สร้าง PySpark DataFrame ที่มี 5 แถว 4 คอลัมน์ เขียน Dataframe นี้ลงในตารางชื่อ “Agri_Table1”



นำเข้า pyspark

จาก pyspark.sql นำเข้า SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()

# ข้อมูลการทำฟาร์มที่มี 5 แถวและ 5 คอลัมน์

เกษตร =[{ 'ดิน_ชนิด' : 'สีดำ' , 'การชลประทาน_ความพร้อมใช้' : 'เลขที่' , 'เอเคอร์' : 2500 , 'สถานะดิน_' : 'แห้ง' ,
'ประเทศ' : 'สหรัฐอเมริกา' },

{ 'ดิน_ชนิด' : 'สีดำ' , 'การชลประทาน_ความพร้อมใช้' : 'ใช่' , 'เอเคอร์' : 3500 , 'สถานะดิน_' : 'เปียก' ,
'ประเทศ' : 'อินเดีย' },

{ 'ดิน_ชนิด' : 'สีแดง' , 'การชลประทาน_ความพร้อมใช้' : 'ใช่' , 'เอเคอร์' : 210 , 'สถานะดิน_' : 'แห้ง' ,
'ประเทศ' : 'สหราชอาณาจักร' },

{ 'ดิน_ชนิด' : 'อื่น' , 'การชลประทาน_ความพร้อมใช้' : 'เลขที่' , 'เอเคอร์' : 1,000 , 'สถานะดิน_' : 'เปียก' ,
'ประเทศ' : 'สหรัฐอเมริกา' },

{ 'ดิน_ชนิด' : 'ทราย' , 'การชลประทาน_ความพร้อมใช้' : 'เลขที่' , 'เอเคอร์' : 500 , 'สถานะดิน_' : 'แห้ง' ,
'ประเทศ' : 'อินเดีย' }]



# สร้าง dataframe จากข้อมูลด้านบน

agri_df = linuxhint_spark_app.createDataFrame (agri)

agri_df.show()

# เขียน DataFrame ด้านบนลงในตาราง

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

เอาท์พุต:







เราจะเห็นว่าไฟล์ parquet หนึ่งไฟล์ถูกสร้างขึ้นด้วย PySpark Data ก่อนหน้า



ตัวอย่างที่ 2:

พิจารณา DataFrame ก่อนหน้าและเขียน 'Agri_Table2' ลงในตารางโดยแบ่งพาร์ติชันระเบียนตามค่าในคอลัมน์ 'ประเทศ'

# เขียน DataFrame ด้านบนลงในตารางด้วยพารามิเตอร์ partitionBy

agri_df.write.saveAsTable( 'Agri_Table2' ,พาร์ติชันโดย=[ 'ประเทศ' ])

เอาท์พุต:

มีค่าที่ไม่ซ้ำกันสามค่าในคอลัมน์ 'ประเทศ' ได้แก่ 'อินเดีย' 'สหราชอาณาจักร' และ 'สหรัฐอเมริกา' ดังนั้นจึงมีการสร้างพาร์ติชันสามพาร์ติชัน แต่ละพาร์ติชันเก็บไฟล์ไม้ปาร์เก้

Pyspark.sql.DataFrameReader.table()

มาโหลดตารางลงใน PySpark DataFrame โดยใช้ฟังก์ชัน spark.read.table() ใช้เพียงพารามิเตอร์เดียวซึ่งเป็นชื่อพาธ/ตาราง มันโหลดตารางโดยตรงลงใน PySpark DataFrame และฟังก์ชั่น SQL ทั้งหมดที่ใช้กับ PySpark DataFrame ก็สามารถนำไปใช้กับ DataFrame ที่โหลดนี้ได้เช่นกัน

ไวยากรณ์:

spark_app.read.table(เส้นทาง/'ชื่อตาราง')

ในสถานการณ์สมมตินี้ เราใช้ตารางก่อนหน้านี้ซึ่งสร้างจาก PySpark DataFrame ตรวจสอบให้แน่ใจว่าคุณต้องใช้ตัวอย่างโค้ดของสถานการณ์ก่อนหน้านี้ในสภาพแวดล้อมของคุณ

ตัวอย่าง:

โหลดตาราง “Agri_Table1” ลงใน DataFrame ชื่อ “loaded_data”

load_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

load_data.show()

เอาท์พุต:

เราจะเห็นว่ามีการโหลดตารางลงใน PySpark DataFrame

ดำเนินการแบบสอบถาม SQL

ตอนนี้ เราดำเนินการค้นหา SQL บางส่วนบน DataFrame ที่โหลดโดยใช้ฟังก์ชัน spark.sql()

# ใช้คำสั่ง SELECT เพื่อแสดงคอลัมน์ทั้งหมดจากตารางด้านบน

linuxhint_spark_app.sql( 'เลือก * จาก Agri_Table1' ).แสดง()

#ข้อไหน

linuxhint_spark_app.sql( 'เลือก * จาก Agri_Table1 WHERE Soil_status='Dry' ' ).แสดง()

linuxhint_spark_app.sql( 'เลือก * จาก Agri_Table1 ที่เอเคอร์ > 2000 ' ).แสดง()

เอาท์พุต:

  1. แบบสอบถามแรกแสดงคอลัมน์และระเบียนทั้งหมดจาก DataFrame
  2. ข้อความค้นหาที่สองแสดงระเบียนตามคอลัมน์ “Soil_status” มีเพียงสามบันทึกที่มีองค์ประกอบ 'แห้ง'
  3. ข้อความค้นหาสุดท้ายส่งคืนระเบียนสองรายการที่มี 'เอเคอร์' ซึ่งมากกว่า 2000

Pyspark.sql.DataFrameWriter.insertInto()

การใช้ฟังก์ชัน insertInto() เราสามารถผนวก DataFrame ลงในตารางที่มีอยู่ เราสามารถใช้ฟังก์ชันนี้ร่วมกับ selectExpr() เพื่อกำหนดชื่อคอลัมน์แล้วแทรกลงในตาราง ฟังก์ชันนี้ยังใช้ชื่อตารางเป็นพารามิเตอร์

ไวยากรณ์:

DataFrame_obj.write.insertInto(’Table_name’)

ในสถานการณ์สมมตินี้ เราใช้ตารางก่อนหน้านี้ซึ่งสร้างจาก PySpark DataFrame ตรวจสอบให้แน่ใจว่าคุณต้องใช้ส่วนย่อยโค้ดของสถานการณ์ก่อนหน้าในสภาพแวดล้อมของคุณ

ตัวอย่าง:

สร้าง DataFrame ใหม่ที่มีสองระเบียนแล้วแทรกลงในตาราง “Agri_Table1”

นำเข้า pyspark

จาก pyspark.sql นำเข้า SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()

# ข้อมูลการทำฟาร์ม 2 แถว

เกษตร =[{ 'ดิน_ชนิด' : 'ทราย' , 'การชลประทาน_ความพร้อมใช้' : 'เลขที่' , 'เอเคอร์' : 2500 , 'สถานะดิน_' : 'แห้ง' ,
'ประเทศ' : 'สหรัฐอเมริกา' },

{ 'ดิน_ชนิด' : 'ทราย' , 'การชลประทาน_ความพร้อมใช้' : 'เลขที่' , 'เอเคอร์' : 1200 , 'สถานะดิน_' : 'เปียก' ,
'ประเทศ' : 'ญี่ปุ่น' }]

# สร้าง dataframe จากข้อมูลด้านบน

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'เอเคอร์' , 'ประเทศ' , 'การชลประทาน_ความพร้อมใช้' , 'ดิน_ชนิด' ,
'ดิน_สถานะ' ).write.insertInto( 'Agri_Table1' )

# แสดง Agri_Table1 สุดท้าย

linuxhint_spark_app.sql( 'เลือก * จาก Agri_Table1' ).แสดง()

เอาท์พุต:

ตอนนี้ จำนวนแถวทั้งหมดที่มีอยู่ใน DataFrame คือ 7

บทสรุป

ตอนนี้คุณเข้าใจวิธีเขียน PySpark DataFrame ลงในตารางโดยใช้ฟังก์ชัน write.saveAsTable() ใช้ชื่อตารางและพารามิเตอร์ทางเลือกอื่นๆ จากนั้น เราโหลดตารางนี้ลงใน PySpark DataFrame โดยใช้ฟังก์ชัน spark.read.table() ใช้เพียงพารามิเตอร์เดียวซึ่งเป็นชื่อพาธ/ตาราง หากคุณต้องการผนวก DataFrame ใหม่ลงในตารางที่มีอยู่ ให้ใช้ฟังก์ชัน insertInto()