Pyspark.sql.DataFrameWriter.saveAsTable()
อันดับแรก เราจะดูวิธีการเขียน PySpark DataFrame ที่มีอยู่ลงในตารางโดยใช้ฟังก์ชัน write.saveAsTable() ใช้ชื่อตารางและพารามิเตอร์ทางเลือกอื่นๆ เช่น โหมด, partionBy ฯลฯ เพื่อเขียน DataFrame ลงในตาราง มันถูกจัดเก็บเป็นไฟล์ปาร์เก้
ไวยากรณ์:
dataframe_obj.write.saveAsTable(พาธ/Table_name,mode,partitionBy,...)
- Table_name คือชื่อของตารางที่สร้างขึ้นจาก dataframe_obj
- เราสามารถผนวก/เขียนทับข้อมูลของตารางโดยใช้พารามิเตอร์โหมด
- PartitionBy ใช้คอลัมน์เดี่ยว/หลายคอลัมน์เพื่อสร้างพาร์ติชันตามค่าในคอลัมน์ที่ให้มาเหล่านี้
ตัวอย่างที่ 1:
สร้าง PySpark DataFrame ที่มี 5 แถว 4 คอลัมน์ เขียน Dataframe นี้ลงในตารางชื่อ “Agri_Table1”
นำเข้า pyspark
จาก pyspark.sql นำเข้า SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()
# ข้อมูลการทำฟาร์มที่มี 5 แถวและ 5 คอลัมน์
เกษตร =[{ 'ดิน_ชนิด' : 'สีดำ' , 'การชลประทาน_ความพร้อมใช้' : 'เลขที่' , 'เอเคอร์' : 2500 , 'สถานะดิน_' : 'แห้ง' ,
'ประเทศ' : 'สหรัฐอเมริกา' },
{ 'ดิน_ชนิด' : 'สีดำ' , 'การชลประทาน_ความพร้อมใช้' : 'ใช่' , 'เอเคอร์' : 3500 , 'สถานะดิน_' : 'เปียก' ,
'ประเทศ' : 'อินเดีย' },
{ 'ดิน_ชนิด' : 'สีแดง' , 'การชลประทาน_ความพร้อมใช้' : 'ใช่' , 'เอเคอร์' : 210 , 'สถานะดิน_' : 'แห้ง' ,
'ประเทศ' : 'สหราชอาณาจักร' },
{ 'ดิน_ชนิด' : 'อื่น' , 'การชลประทาน_ความพร้อมใช้' : 'เลขที่' , 'เอเคอร์' : 1,000 , 'สถานะดิน_' : 'เปียก' ,
'ประเทศ' : 'สหรัฐอเมริกา' },
{ 'ดิน_ชนิด' : 'ทราย' , 'การชลประทาน_ความพร้อมใช้' : 'เลขที่' , 'เอเคอร์' : 500 , 'สถานะดิน_' : 'แห้ง' ,
'ประเทศ' : 'อินเดีย' }]
# สร้าง dataframe จากข้อมูลด้านบน
agri_df = linuxhint_spark_app.createDataFrame (agri)
agri_df.show()
# เขียน DataFrame ด้านบนลงในตาราง
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )
เอาท์พุต:
เราจะเห็นว่าไฟล์ parquet หนึ่งไฟล์ถูกสร้างขึ้นด้วย PySpark Data ก่อนหน้า
ตัวอย่างที่ 2:
พิจารณา DataFrame ก่อนหน้าและเขียน 'Agri_Table2' ลงในตารางโดยแบ่งพาร์ติชันระเบียนตามค่าในคอลัมน์ 'ประเทศ'
# เขียน DataFrame ด้านบนลงในตารางด้วยพารามิเตอร์ partitionByagri_df.write.saveAsTable( 'Agri_Table2' ,พาร์ติชันโดย=[ 'ประเทศ' ])
เอาท์พุต:
มีค่าที่ไม่ซ้ำกันสามค่าในคอลัมน์ 'ประเทศ' ได้แก่ 'อินเดีย' 'สหราชอาณาจักร' และ 'สหรัฐอเมริกา' ดังนั้นจึงมีการสร้างพาร์ติชันสามพาร์ติชัน แต่ละพาร์ติชันเก็บไฟล์ไม้ปาร์เก้
Pyspark.sql.DataFrameReader.table()
มาโหลดตารางลงใน PySpark DataFrame โดยใช้ฟังก์ชัน spark.read.table() ใช้เพียงพารามิเตอร์เดียวซึ่งเป็นชื่อพาธ/ตาราง มันโหลดตารางโดยตรงลงใน PySpark DataFrame และฟังก์ชั่น SQL ทั้งหมดที่ใช้กับ PySpark DataFrame ก็สามารถนำไปใช้กับ DataFrame ที่โหลดนี้ได้เช่นกัน
ไวยากรณ์:
spark_app.read.table(เส้นทาง/'ชื่อตาราง')ในสถานการณ์สมมตินี้ เราใช้ตารางก่อนหน้านี้ซึ่งสร้างจาก PySpark DataFrame ตรวจสอบให้แน่ใจว่าคุณต้องใช้ตัวอย่างโค้ดของสถานการณ์ก่อนหน้านี้ในสภาพแวดล้อมของคุณ
ตัวอย่าง:
โหลดตาราง “Agri_Table1” ลงใน DataFrame ชื่อ “loaded_data”
load_data = linuxhint_spark_app.read.table( 'Agri_Table1' )load_data.show()
เอาท์พุต:
เราจะเห็นว่ามีการโหลดตารางลงใน PySpark DataFrame
ดำเนินการแบบสอบถาม SQL
ตอนนี้ เราดำเนินการค้นหา SQL บางส่วนบน DataFrame ที่โหลดโดยใช้ฟังก์ชัน spark.sql()
# ใช้คำสั่ง SELECT เพื่อแสดงคอลัมน์ทั้งหมดจากตารางด้านบนlinuxhint_spark_app.sql( 'เลือก * จาก Agri_Table1' ).แสดง()
#ข้อไหน
linuxhint_spark_app.sql( 'เลือก * จาก Agri_Table1 WHERE Soil_status='Dry' ' ).แสดง()
linuxhint_spark_app.sql( 'เลือก * จาก Agri_Table1 ที่เอเคอร์ > 2000 ' ).แสดง()
เอาท์พุต:
- แบบสอบถามแรกแสดงคอลัมน์และระเบียนทั้งหมดจาก DataFrame
- ข้อความค้นหาที่สองแสดงระเบียนตามคอลัมน์ “Soil_status” มีเพียงสามบันทึกที่มีองค์ประกอบ 'แห้ง'
- ข้อความค้นหาสุดท้ายส่งคืนระเบียนสองรายการที่มี 'เอเคอร์' ซึ่งมากกว่า 2000
Pyspark.sql.DataFrameWriter.insertInto()
การใช้ฟังก์ชัน insertInto() เราสามารถผนวก DataFrame ลงในตารางที่มีอยู่ เราสามารถใช้ฟังก์ชันนี้ร่วมกับ selectExpr() เพื่อกำหนดชื่อคอลัมน์แล้วแทรกลงในตาราง ฟังก์ชันนี้ยังใช้ชื่อตารางเป็นพารามิเตอร์
ไวยากรณ์:
DataFrame_obj.write.insertInto(’Table_name’)ในสถานการณ์สมมตินี้ เราใช้ตารางก่อนหน้านี้ซึ่งสร้างจาก PySpark DataFrame ตรวจสอบให้แน่ใจว่าคุณต้องใช้ส่วนย่อยโค้ดของสถานการณ์ก่อนหน้าในสภาพแวดล้อมของคุณ
ตัวอย่าง:
สร้าง DataFrame ใหม่ที่มีสองระเบียนแล้วแทรกลงในตาราง “Agri_Table1”
นำเข้า pysparkจาก pyspark.sql นำเข้า SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()
# ข้อมูลการทำฟาร์ม 2 แถว
เกษตร =[{ 'ดิน_ชนิด' : 'ทราย' , 'การชลประทาน_ความพร้อมใช้' : 'เลขที่' , 'เอเคอร์' : 2500 , 'สถานะดิน_' : 'แห้ง' ,
'ประเทศ' : 'สหรัฐอเมริกา' },
{ 'ดิน_ชนิด' : 'ทราย' , 'การชลประทาน_ความพร้อมใช้' : 'เลขที่' , 'เอเคอร์' : 1200 , 'สถานะดิน_' : 'เปียก' ,
'ประเทศ' : 'ญี่ปุ่น' }]
# สร้าง dataframe จากข้อมูลด้านบน
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'เอเคอร์' , 'ประเทศ' , 'การชลประทาน_ความพร้อมใช้' , 'ดิน_ชนิด' ,
'ดิน_สถานะ' ).write.insertInto( 'Agri_Table1' )
# แสดง Agri_Table1 สุดท้าย
linuxhint_spark_app.sql( 'เลือก * จาก Agri_Table1' ).แสดง()
เอาท์พุต:
ตอนนี้ จำนวนแถวทั้งหมดที่มีอยู่ใน DataFrame คือ 7
บทสรุป
ตอนนี้คุณเข้าใจวิธีเขียน PySpark DataFrame ลงในตารางโดยใช้ฟังก์ชัน write.saveAsTable() ใช้ชื่อตารางและพารามิเตอร์ทางเลือกอื่นๆ จากนั้น เราโหลดตารางนี้ลงใน PySpark DataFrame โดยใช้ฟังก์ชัน spark.read.table() ใช้เพียงพารามิเตอร์เดียวซึ่งเป็นชื่อพาธ/ตาราง หากคุณต้องการผนวก DataFrame ใหม่ลงในตารางที่มีอยู่ ให้ใช้ฟังก์ชัน insertInto()