PySpark Pandas_Udf()

Pyspark Pandas Udf



การแปลง PySpark DataFrame เป็นไปได้โดยใช้ฟังก์ชัน pandas_udf() เป็นฟังก์ชันที่ผู้ใช้กำหนดซึ่งใช้กับ PySpark DataFrame ด้วยลูกศร เราสามารถดำเนินการ vectorized โดยใช้ pandas_udf() สามารถทำได้โดยผ่านฟังก์ชั่นนี้ในฐานะมัณฑนากร มาดูคู่มือนี้เพื่อทำความเข้าใจเกี่ยวกับไวยากรณ์ พารามิเตอร์ และตัวอย่างต่างๆ

หัวข้อเนื้อหา:

หากคุณต้องการทราบเกี่ยวกับ PySpark DataFrame และการติดตั้งโมดูล โปรดอ่านสิ่งนี้ บทความ .







Pyspark.sql.functions.pandas_udf()

pandas_udf () มีอยู่ในโมดูล sql.functions ใน PySpark ซึ่งสามารถนำเข้าได้โดยใช้คีย์เวิร์ด 'จาก' มันถูกใช้เพื่อดำเนินการ vectorized บน PySpark DataFrame ของเรา ฟังก์ชั่นนี้ใช้งานเหมือนมัณฑนากรโดยผ่านพารามิเตอร์สามตัว หลังจากนั้น เราสามารถสร้างฟังก์ชันที่ผู้ใช้กำหนดเองซึ่งส่งคืนข้อมูลในรูปแบบเวกเตอร์ (เช่น เราใช้ชุด/NumPy สำหรับสิ่งนี้) โดยใช้ลูกศร ภายในฟังก์ชันนี้ เราสามารถคืนค่าผลลัพธ์ได้



โครงสร้างและไวยากรณ์:



ก่อนอื่น มาดูโครงสร้างและไวยากรณ์ของฟังก์ชันนี้กันก่อน:

@pandas_udf(ประเภทข้อมูล)
def function_name(การทำงาน) -> converter_format:
ส่งคืนใบแจ้งยอด

ที่นี่ function_name คือชื่อของฟังก์ชันที่เรากำหนดไว้ ชนิดข้อมูลระบุชนิดข้อมูลที่ส่งคืนโดยฟังก์ชันนี้ เราสามารถส่งคืนผลลัพธ์โดยใช้คีย์เวิร์ด 'return' การดำเนินการทั้งหมดจะดำเนินการภายในฟังก์ชันด้วยการกำหนดลูกศร





Pandas_udf (ฟังก์ชันและประเภทการส่งคืน)

  1. พารามิเตอร์แรกคือฟังก์ชันที่ผู้ใช้กำหนดเองซึ่งส่งผ่านไปยังพารามิเตอร์นั้น
  2. พารามิเตอร์ที่สองใช้เพื่อระบุชนิดข้อมูลที่ส่งคืนจากฟังก์ชัน

ข้อมูล:

ในคำแนะนำทั้งหมดนี้ เราใช้ PySpark DataFrame เพียงอันเดียวในการสาธิต ฟังก์ชันที่ผู้ใช้กำหนดเองทั้งหมดที่เรากำหนดจะถูกนำไปใช้กับ PySpark DataFrame นี้ ตรวจสอบให้แน่ใจว่าคุณสร้าง DataFrame นี้ในสภาพแวดล้อมของคุณก่อนหลังจากติดตั้ง PySpark



นำเข้า pyspark

จาก pyspark.sql นำเข้า SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'คำแนะนำลินุกซ์' ).getOrCreate()

จาก pyspark.sql.functions นำเข้า pandas_udf

จาก pyspark.sql.types นำเข้า *

นำเข้าแพนด้าเป็นหมีแพนด้า

#รายละเอียดผัก

ผัก =[{ 'พิมพ์' : 'ผัก' , 'ชื่อ' : 'มะเขือเทศ' , 'locate_country' : 'สหรัฐอเมริกา' , 'ปริมาณ' : 800 },

{ 'พิมพ์' : 'ผลไม้' , 'ชื่อ' : 'กล้วย' , 'locate_country' : 'จีน' , 'ปริมาณ' : ยี่สิบ },

{ 'พิมพ์' : 'ผัก' , 'ชื่อ' : 'มะเขือเทศ' , 'locate_country' : 'สหรัฐอเมริกา' , 'ปริมาณ' : 800 },

{ 'พิมพ์' : 'ผัก' , 'ชื่อ' : 'มะม่วง' , 'locate_country' : 'ญี่ปุ่น' , 'ปริมาณ' : 0 },

{ 'พิมพ์' : 'ผลไม้' , 'ชื่อ' : 'มะนาว' , 'locate_country' : 'อินเดีย' , 'ปริมาณ' : 1700 },

{ 'พิมพ์' : 'ผัก' , 'ชื่อ' : 'มะเขือเทศ' , 'locate_country' : 'สหรัฐอเมริกา' , 'ปริมาณ' : 1200 },

{ 'พิมพ์' : 'ผัก' , 'ชื่อ' : 'มะม่วง' , 'locate_country' : 'ญี่ปุ่น' , 'ปริมาณ' : 0 },

{ 'พิมพ์' : 'ผลไม้' , 'ชื่อ' : 'มะนาว' , 'locate_country' : 'อินเดีย' , 'ปริมาณ' : 0 }

]

# สร้าง dataframe ตลาดจากข้อมูลข้างต้น

market_df = linuxhint_spark_app.createDataFrame (ผัก)

market_df.show()

เอาท์พุต:

ที่นี่ เราสร้าง DataFrame ที่มี 4 คอลัมน์และ 8 แถว ตอนนี้ เราใช้ pandas_udf() เพื่อสร้างฟังก์ชันที่ผู้ใช้กำหนดและนำไปใช้กับคอลัมน์เหล่านี้

Pandas_udf() ที่มีประเภทข้อมูลต่างกัน

ในสถานการณ์สมมตินี้ เราสร้างฟังก์ชันที่กำหนดโดยผู้ใช้ด้วย pandas_udf() และนำไปใช้กับคอลัมน์และแสดงผลโดยใช้เมธอด select() ในแต่ละกรณี เราใช้ pandas.Series ขณะที่เราดำเนินการ vectorized ซึ่งถือว่าค่าของคอลัมน์เป็นอาร์เรย์หนึ่งมิติ และการดำเนินการจะถูกนำไปใช้กับคอลัมน์ ในมัณฑนากรเอง เราระบุประเภทการส่งคืนฟังก์ชัน

ตัวอย่างที่ 1: Pandas_udf() กับประเภทสตริง

ที่นี่ เราสร้างสองฟังก์ชันที่ผู้ใช้กำหนดด้วยประเภทการส่งคืนสตริงเพื่อแปลงค่าคอลัมน์ประเภทสตริงให้เป็นตัวพิมพ์ใหญ่และตัวพิมพ์เล็ก สุดท้าย เราใช้ฟังก์ชันเหล่านี้กับคอลัมน์ 'type' และ 'locate_country'

# แปลงคอลัมน์ประเภทเป็นตัวพิมพ์ใหญ่ด้วย pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

กลับ i.str.upper()

# แปลงคอลัมน์ locate_country เป็นตัวพิมพ์เล็กด้วย pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

กลับ i.str.lower()

# แสดงคอลัมน์โดยใช้การเลือก ()

market_df.select( 'พิมพ์' ,type_upper_case( 'พิมพ์' ), 'locate_ประเทศ' ,
country_lower_case( 'locate_ประเทศ' )).แสดง()

เอาท์พุต:

คำอธิบาย:

ฟังก์ชัน StringType() มีอยู่ในโมดูล pyspark.sql.types เรานำเข้าโมดูลนี้แล้วในขณะที่สร้าง PySpark DataFrame

  1. ขั้นแรก UDF (ฟังก์ชันที่ผู้ใช้กำหนด) จะคืนค่าสตริงเป็นตัวพิมพ์ใหญ่โดยใช้ฟังก์ชัน str.upper() str.upper() มีอยู่ในโครงสร้างข้อมูลซีรี่ส์ (ในขณะที่เรากำลังแปลงเป็นซีรี่ส์โดยมีลูกศรอยู่ภายในฟังก์ชัน) ซึ่งจะแปลงสตริงที่กำหนดให้เป็นตัวพิมพ์ใหญ่ สุดท้าย ฟังก์ชันนี้ใช้กับคอลัมน์ 'type' ซึ่งระบุไว้ในเมธอด select() ก่อนหน้านี้ สตริงทั้งหมดในคอลัมน์ประเภทจะเป็นตัวพิมพ์เล็ก ตอนนี้เปลี่ยนเป็นตัวพิมพ์ใหญ่แล้ว
  2. ประการที่สอง UDF ส่งคืนสตริงเป็นตัวพิมพ์ใหญ่โดยใช้ฟังก์ชัน str.lower() str.lower() มีอยู่ใน Series Data Structure ซึ่งจะแปลงสตริงที่กำหนดเป็นตัวพิมพ์เล็ก สุดท้าย ฟังก์ชันนี้ใช้กับคอลัมน์ 'type' ซึ่งระบุไว้ในเมธอด select() ก่อนหน้านี้ สตริงทั้งหมดในคอลัมน์ประเภทจะเป็นตัวพิมพ์ใหญ่ ตอนนี้พวกเขาเปลี่ยนเป็นตัวพิมพ์เล็ก

ตัวอย่างที่ 2: Pandas_udf() ที่มีประเภทจำนวนเต็ม

มาสร้าง UDF ที่แปลงคอลัมน์จำนวนเต็ม PySpark DataFrame เป็นชุด Pandas และเพิ่ม 100 ให้กับแต่ละค่า ส่งคอลัมน์ 'ปริมาณ' ไปยังฟังก์ชันนี้ภายในเมธอด select()

#เพิ่ม100

@pandas_udf(ประเภทจำนวนเต็ม())

def add_100(i: panda.Series) -> panda.Series:

ส่งคืน i+ 100

# ส่งคอลัมน์ปริมาณไปยังฟังก์ชันด้านบนและแสดงผล

market_df.select( 'ปริมาณ' ,เพิ่ม_100( 'ปริมาณ' )).แสดง()

เอาท์พุต:

คำอธิบาย:

ภายใน UDF เราจะวนซ้ำค่าทั้งหมดและแปลงเป็น Series หลังจากนั้น เราเพิ่ม 100 ให้กับแต่ละค่าในซีรี่ส์ สุดท้าย เราส่งคอลัมน์ 'ปริมาณ' ไปยังฟังก์ชันนี้ และเราจะเห็นว่ามีการบวก 100 เข้ากับค่าทั้งหมด

Pandas_udf() ที่มีประเภทข้อมูลต่างกันโดยใช้ Groupby() & Agg()

มาดูตัวอย่างการส่ง UDF ไปยังคอลัมน์รวม ที่นี่ ค่าคอลัมน์จะถูกจัดกลุ่มก่อนโดยใช้ฟังก์ชัน groupby() และการรวมจะทำโดยใช้ฟังก์ชัน agg() เราส่ง UDF ของเราภายในฟังก์ชันรวมนี้

ไวยากรณ์:

pyspark_dataframe_object.groupby( 'การจัดกลุ่ม_คอลัมน์' ).agg(UDF
(pyspark_dataframe_object[ 'คอลัมน์' ]))

ที่นี่ ค่าในคอลัมน์การจัดกลุ่มจะถูกจัดกลุ่มก่อน จากนั้น การรวมจะเสร็จสิ้นในแต่ละข้อมูลที่จัดกลุ่มตาม UDF ของเรา

ตัวอย่างที่ 1: Pandas_udf() กับค่าเฉลี่ยรวม()

ที่นี่ เราสร้างฟังก์ชันที่ผู้ใช้กำหนดโดยมีประเภทผลตอบแทนเป็นทศนิยม ภายในฟังก์ชัน เราคำนวณค่าเฉลี่ยโดยใช้ฟังก์ชัน mean() UDF นี้จะถูกส่งผ่านไปยังคอลัมน์ 'ปริมาณ' เพื่อรับปริมาณเฉลี่ยสำหรับแต่ละประเภท

# คืนค่าเฉลี่ย / ค่าเฉลี่ย

@pandas_udf( 'ลอย' )

def average_function(i: panda.Series) -> ลอย:

ส่งคืน i.mean ()

# ส่งคอลัมน์ปริมาณไปยังฟังก์ชันโดยจัดกลุ่มคอลัมน์ประเภท

market_df.groupโดย( 'พิมพ์' ).agg(average_function(market_df[ 'ปริมาณ' ])).แสดง()

เอาท์พุต:

เรากำลังจัดกลุ่มตามองค์ประกอบในคอลัมน์ 'ประเภท' มีการสร้างสองกลุ่ม - 'ผลไม้' และ 'ผัก' สำหรับแต่ละกลุ่ม ค่าเฉลี่ยจะถูกคำนวณและส่งกลับ

ตัวอย่างที่ 2: Pandas_udf() กับ Aggregate Max() และ Min()

ที่นี่ เราสร้างฟังก์ชันที่ผู้ใช้กำหนดขึ้นสองฟังก์ชันด้วยประเภทการคืนค่าจำนวนเต็ม (int) UDF แรกส่งคืนค่าต่ำสุด และ UDF ที่สองส่งคืนค่าสูงสุด

# pandas_udf ที่ส่งคืนค่าต่ำสุด

@pandas_udf( 'อินท์' )

def min_(i: panda.Series) -> int:

กลับ i.min()

# pandas_udf ที่คืนค่าสูงสุด

@pandas_udf( 'อินท์' )

def max_(i: panda.Series) -> int:

ส่งคืน i.max()

# ส่งคอลัมน์ปริมาณไปยัง min_ pandas_udf โดยจัดกลุ่ม locate_country

market_df.groupโดย( 'locate_ประเทศ' ).agg(min_(market_df[ 'ปริมาณ' ])).แสดง()

# ส่งคอลัมน์ปริมาณไปยัง max_ pandas_udf โดยจัดกลุ่ม locate_country

market_df.groupโดย( 'locate_ประเทศ' ).agg(สูงสุด_(market_df[ 'ปริมาณ' ])).แสดง()

เอาท์พุต:

ในการส่งคืนค่าต่ำสุดและสูงสุด เราใช้ฟังก์ชัน min() และ max() ในประเภทการส่งคืนของ UDF ตอนนี้ เราจัดกลุ่มข้อมูลในคอลัมน์ “locate_country” มีสี่กลุ่มเกิดขึ้น (“จีน”, “อินเดีย”, “ญี่ปุ่น”, “สหรัฐอเมริกา”) สำหรับแต่ละกลุ่ม เราจะส่งคืนปริมาณสูงสุด ในทำนองเดียวกัน เราส่งคืนปริมาณขั้นต่ำ

บทสรุป

โดยทั่วไป pandas_udf () ใช้เพื่อดำเนินการ vectorized บน PySpark DataFrame ของเรา เราได้เห็นวิธีการสร้าง pandas_udf() และนำไปใช้กับ PySpark DataFrame เพื่อความเข้าใจที่ดีขึ้น เราได้กล่าวถึงตัวอย่างต่างๆ โดยพิจารณาประเภทข้อมูลทั้งหมด (สตริง ทศนิยม และจำนวนเต็ม) คุณสามารถใช้ pandas_udf() ร่วมกับ groupby() ผ่านฟังก์ชัน agg()