PySpark อ่าน JSON()

Pyspark Xan Json



ในขณะที่ทำงานกับ PySpark DataFrames จะต้องจัดเก็บไว้ใน PySpark DataFrame หากคุณต้องการประมวลผลข้อมูล JSON หลังจากจัดเก็บไว้ใน DataFrame แล้ว เราสามารถใช้การดำเนินการและวิธีการต่างๆ กับข้อมูลได้ นอกจากนี้ ยังมีข้อดีอีกมากมายหากเราแปลง JSON เป็น PySpark DataFrame เพราะมันง่ายและเราสามารถแปลง/แบ่งพาร์ติชันข้อมูลด้วยวิธีที่ง่ายกว่า

หัวข้อเนื้อหา:

อ่าน JSON ลงใน PySpark DataFrame โดยใช้ Pandas.read_json()







อ่าน JSON เป็น PySpark DataFrame โดยใช้ Spark.read.json()



อ่าน JSON เป็น PySpark DataFrame โดยใช้ PySpark SQL



ในบทช่วยสอนนี้ เราจะดูวิธีอ่าน JSON ใน PySpark DataFrame โดยใช้ pandas.read_json(), spark.read.json() และ spark.sql ในทุกสถานการณ์ เราจะดูตัวอย่างต่างๆ โดยพิจารณาจากรูปแบบ JSON ที่แตกต่างกัน





ติดตั้งไลบรารี PySpark ก่อนดำเนินการตามตัวอย่างต่อไปนี้

pip ติดตั้ง pyspark

หลังจากติดตั้งสำเร็จ คุณจะเห็นผลลัพธ์ดังต่อไปนี้:



อ่าน JSON ลงใน PySpark DataFrame โดยใช้ Pandas.read_json()

ใน PySpark ใช้เมธอด createDataFrame() เพื่อสร้าง DataFrame โดยตรง ที่นี่ เราเพียงแค่ต้องส่งไฟล์/พาธ JSON ไปยังไฟล์ JSON ผ่านเมธอด pandas.read_json() เมธอด read_json() นี้ใช้ชื่อไฟล์/เส้นทางที่มีอยู่ในโมดูล Pandas ด้วยเหตุนี้จึงจำเป็นต้องนำเข้าและใช้โมดูล Pandas

ไวยากรณ์:

spark_app.createDataFrame(แพนด้า.read_json( 'file_name.json' ))

ตัวอย่าง:

มาสร้างไฟล์ JSON ชื่อ “student_skill.json” ที่มี 2 ระเบียน ที่นี่ คีย์/คอลัมน์คือ “Student 1” และ “Student 2” แถวคือชื่อ อายุ ทักษะ1 และทักษะ2

นำเข้า pyspark

นำเข้าแพนด้า

จาก pyspark.sql นำเข้า SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()

# การใช้ pandas.read_json()

candidate_skills = linuxhint_spark_app.createDataFrame(แพนด้า.read_json( 'student_skill.json' ))

candidate_skills.show()

เอาท์พุต:

เราจะเห็นว่าข้อมูล JSON ถูกแปลงเป็น PySpark DataFrame ด้วยคอลัมน์และแถวที่ระบุ

2. การอ่าน JSON ไปยัง PySpark DataFrame โดยใช้ Spark.read.json()

read.json() เป็นเมธอดที่คล้ายกับ read_json() ใน Pandas ที่นี่ read.json() ใช้เส้นทางไปยัง JSON หรือโดยตรงไปยังไฟล์ JSON และโหลดลงใน PySpark DataFrame โดยตรง ไม่จำเป็นต้องใช้วิธี createDataFrame() ในสถานการณ์นี้ หากคุณต้องการอ่านไฟล์ JSON หลายไฟล์พร้อมกัน เราจำเป็นต้องส่งรายการชื่อไฟล์ JSON ผ่านรายการที่คั่นด้วยเครื่องหมายจุลภาค ระเบียน JSON ทั้งหมดจะจัดเก็บไว้ใน DataFrame เดียว

ไวยากรณ์:

ไฟล์เดียว - spark_app.read.json( 'file_name.json' )

หลายไฟล์ - spark_app.read.json([ 'file1.json' , 'file2.json' ,...])

สถานการณ์ที่ 1: อ่าน JSON มีบรรทัดเดียว

หากไฟล์ JSON ของคุณอยู่ในรูปแบบ record1, record2, record3… (บรรทัดเดียว) เราสามารถเรียกมันว่า JSON ด้วยบรรทัดเดียว Spark ประมวลผลบันทึกเหล่านี้และจัดเก็บไว้ใน PySpark DataFrame เป็นแถว แต่ละระเบียนเป็นแถวใน PySpark DataFrame

มาสร้างไฟล์ JSON ชื่อ “candidate_skills.json” ที่มี 3 ระเบียน อ่าน JSON นี้ใน PySpark DataFrame

นำเข้า pyspark

จาก pyspark.sql นำเข้า SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()

# อ่านcandidate_skills.json ลงใน PySpark DataFrame

candidate_skills = linuxhint_spark_app.read.json( 'candidate_skills.json' )

candidate_skills.show()

เอาท์พุต:

เราจะเห็นว่าข้อมูล JSON ถูกแปลงเป็น PySpark DataFrame ด้วยระเบียนและชื่อคอลัมน์ที่ระบุ

สถานการณ์ที่ 2: อ่าน JSON ที่มีหลายบรรทัด

หากไฟล์ JSON ของคุณมีหลายบรรทัด คุณต้องใช้เมธอด read.option().json() เพื่อส่งพารามิเตอร์หลายบรรทัดซึ่งต้องตั้งค่าเป็นจริง สิ่งนี้ช่วยให้เราสามารถโหลด JSON ที่มีหลายบรรทัดลงใน PySpark DataFrame

อ่านตัวเลือก ( 'หลายบรรทัด' , 'จริง' ).json( 'file_name.json' )

มาสร้างไฟล์ JSON ชื่อ “multi.json” ที่เก็บ 3 ระเบียน อ่าน JSON นี้ใน PySpark DataFrame

นำเข้า pyspark

จาก pyspark.sql นำเข้า SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()

# อ่าน multi.json (มีหลายบรรทัด) ใน PySpark DataFrame

candidate_skills = linuxhint_spark_app.read.option( 'หลายบรรทัด' , 'จริง' ).json( 'multi.json' )

candidate_skills.show()

เอาท์พุต:

สถานการณ์ที่ 3: อ่าน JSON หลายรายการ

เราได้พูดคุยกันแล้วในช่วงเริ่มต้นของบทช่วยสอนนี้เกี่ยวกับไฟล์ JSON หลายไฟล์ หากคุณต้องการอ่านไฟล์ JSON หลายไฟล์พร้อมกันและจัดเก็บไว้ใน PySpark DataFrame เดียว เราจำเป็นต้องส่งรายชื่อไฟล์ไปยังเมธอด read.json()

มาสร้างไฟล์ JSON สองไฟล์ชื่อ “candidate_skills.json” และ “candidate_skills2.json” แล้วโหลดลงใน PySpark DataFrame

ไฟล์ “candidate_skills.json” มีสามบันทึก

ไฟล์ “candidate_skill2.json” เก็บเพียงระเบียนเดียว

นำเข้า pyspark

จาก pyspark.sql นำเข้า SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()

# อ่านcandidate_skillsและcandidate_skills2ไฟล์พร้อมกันใน PySpark DataFrame

candidate_skills = linuxhint_spark_app.read.json([ 'candidate_skills.json' , 'candidate_skills2.json' ])

candidate_skills.show()

เอาท์พุต:

สุดท้าย DataFrame เก็บสี่ระเบียน สามระเบียนแรกเป็นของ JSON แรก และระเบียนสุดท้ายเป็นของ JSON ที่สอง

อ่าน JSON เป็น PySpark DataFrame โดยใช้ Spark.read.json()

read.json() เป็นเมธอดที่คล้ายกับ read_json() ใน Pandas ที่นี่ read.json() ใช้เส้นทางไปยัง JSON หรือโดยตรงไปยังไฟล์ JSON และโหลดลงใน PySpark DataFrame โดยตรง ไม่จำเป็นต้องใช้วิธี createDataFrame() ในสถานการณ์นี้ หากคุณต้องการอ่านไฟล์ JSON หลายไฟล์พร้อมกัน เราจำเป็นต้องส่งรายการชื่อไฟล์ JSON ผ่านรายการที่คั่นด้วยเครื่องหมายจุลภาค ระเบียน JSON ทั้งหมดจะจัดเก็บไว้ใน DataFrame เดียว

ไวยากรณ์:

ไฟล์เดียว - spark_app.read.json( 'file_name.json' )

หลายไฟล์ - spark_app.read.json([ 'file1.json' , 'file2.json' ,...])

สถานการณ์ที่ 1: อ่าน JSON มีบรรทัดเดียว

หากไฟล์ JSON ของคุณอยู่ในรูปแบบ record1, record2, record3… (บรรทัดเดียว) เราสามารถเรียกมันว่า JSON ด้วยบรรทัดเดียว Spark ประมวลผลบันทึกเหล่านี้และจัดเก็บไว้ใน PySpark DataFrame เป็นแถว แต่ละระเบียนเป็นแถวใน PySpark DataFrame

มาสร้างไฟล์ JSON ชื่อ “candidate_skills.json” ที่มี 3 ระเบียน อ่าน JSON นี้ใน PySpark DataFrame

นำเข้า pyspark

จาก pyspark.sql นำเข้า SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()

# อ่านcandidate_skills.json ลงใน PySpark DataFrame

candidate_skills = linuxhint_spark_app.read.json( 'candidate_skills.json' )

candidate_skills.show()

เอาท์พุต:

เราจะเห็นว่าข้อมูล JSON ถูกแปลงเป็น PySpark DataFrame ด้วยระเบียนและชื่อคอลัมน์ที่ระบุ

อ่าน JSON เป็น PySpark DataFrame โดยใช้ PySpark SQL

สามารถสร้างมุมมองชั่วคราวของข้อมูล JSON ของเราโดยใช้ PySpark SQL เราสามารถจัดเตรียม JSON ได้โดยตรงในขณะที่สร้างมุมมองชั่วคราว ดูไวยากรณ์ต่อไปนี้ หลังจากนั้น เราสามารถใช้คำสั่ง SELECT เพื่อแสดง PySpark DataFrame

ไวยากรณ์:

spark_app.sql( 'สร้างมุมมองชั่วคราว VIEW_NAME โดยใช้ตัวเลือก json (เส้นทาง 'file_name.json')' )

ที่นี่ “VIEW_NAME” คือมุมมองของข้อมูล JSON และ “file_name” คือชื่อของไฟล์ JSON

ตัวอย่างที่ 1:

พิจารณาไฟล์ JSON ที่ใช้ในตัวอย่างก่อนหน้านี้ – “candidate_skills.json” เลือกแถวทั้งหมดจาก DataFrame โดยใช้ SELECT กับตัวดำเนินการ “*” ที่นี่ * เลือกคอลัมน์ทั้งหมดจาก PySpark DataFrame

นำเข้า pyspark

นำเข้าแพนด้า

จาก pyspark.sql นำเข้า SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()

# การใช้ spark.sql เพื่อสร้าง VIEW จาก JSON

candidate_skills = linuxhint_spark_app.sql( 'สร้างมุมมองชั่วคราว Candidate_data โดยใช้ตัวเลือก json (เส้นทาง 'candidate_skills.json')' )

# ใช้แบบสอบถาม SELECT เพื่อเลือกระเบียนทั้งหมดจาก Candidate_data

linuxhint_spark_app.sql( 'เลือก * จาก Candidate_data' ).แสดง()

เอาท์พุต:

จำนวนบันทึกทั้งหมดใน PySpark DataFrame (อ่านจาก JSON) คือ 3

ตัวอย่างที่ 2:

ตอนนี้ กรองบันทึกใน PySpark DataFrame ตามคอลัมน์อายุ ใช้ตัวดำเนินการ 'มากกว่า' กับอายุเพื่อรับแถวที่มีอายุมากกว่า 22 ปี

# ใช้แบบสอบถาม SELECT เพื่อเลือกระเบียนที่มีอายุ > 22 ปี

linuxhint_spark_app.sql( 'เลือก * จาก Candidate_data ที่อายุ>22' ).แสดง()

เอาท์พุต:

มีเพียงบันทึกเดียวใน PySpark DataFrame ที่มีอายุมากกว่า 22 ปี

บทสรุป

เราได้เรียนรู้สามวิธีที่แตกต่างกันในการอ่าน JSON ใน PySpark DataFrame ก่อนอื่น เราเรียนรู้วิธีใช้เมธอด read_json() ที่มีอยู่ในโมดูล Pandas เพื่ออ่าน JSON ไปยัง PySpark DataFrame ต่อไป เราเรียนรู้วิธีอ่านไฟล์ JSON บรรทัดเดียว/หลายบรรทัดโดยใช้เมธอด spark.read.json() กับ option() หากต้องการอ่านไฟล์ JSON หลายไฟล์พร้อมกัน เราจำเป็นต้องส่งรายชื่อไฟล์ไปยังเมธอดนี้ เมื่อใช้ PySpark SQL ไฟล์ JSON จะถูกอ่านในมุมมองชั่วคราวและ DataFrame จะแสดงโดยใช้แบบสอบถาม SELECT