วิธีใช้งานการสตรีมข้อมูลแบบเรียลไทม์ใน Python

Withi Chi Ngan Ka Rs Tri M Khx Mul Baeb Rei Yl Thim Ni Python



การเรียนรู้การใช้งานการสตรีมข้อมูลแบบเรียลไทม์ใน Python ถือเป็นทักษะสำคัญในโลกที่เกี่ยวข้องกับข้อมูลในปัจจุบัน คู่มือนี้จะสำรวจขั้นตอนหลักและเครื่องมือที่จำเป็นสำหรับการใช้การสตรีมข้อมูลแบบเรียลไทม์ด้วยความถูกต้องใน Python ตั้งแต่การเลือกเฟรมเวิร์กที่เหมาะสม เช่น Apache Kafka หรือ Apache Pulsar ไปจนถึงการเขียนโค้ด Python เพื่อการใช้ข้อมูล การประมวลผล และการแสดงภาพที่มีประสิทธิภาพอย่างง่ายดาย เราจะได้รับทักษะที่จำเป็นในการสร้างช่องทางข้อมูลแบบเรียลไทม์ที่คล่องตัวและมีประสิทธิภาพ

ตัวอย่างที่ 1: การใช้งานการสตรีมข้อมูลแบบเรียลไทม์ใน Python

การใช้การสตรีมข้อมูลแบบเรียลไทม์ใน Python ถือเป็นสิ่งสำคัญในยุคและโลกที่ขับเคลื่อนด้วยข้อมูลในปัจจุบัน ในตัวอย่างโดยละเอียดนี้ เราจะอธิบายกระบวนการสร้างระบบสตรีมข้อมูลแบบเรียลไทม์โดยใช้ Apache Kafka และ Python ใน Google Colab







ในการเริ่มต้นตัวอย่างก่อนที่เราจะเริ่มเขียนโค้ด การสร้างสภาพแวดล้อมเฉพาะใน Google Colab ถือเป็นสิ่งสำคัญ สิ่งแรกที่เราต้องทำคือติดตั้งไลบรารี่ที่จำเป็น เราใช้ไลบรารี “kafka-python” สำหรับการบูรณาการ Kafka



! ปิ๊ป ติดตั้ง คาฟคา-หลาม


คำสั่งนี้จะติดตั้งไลบรารี “kafka-python” ซึ่งมีฟังก์ชัน Python และการโยงสำหรับ Apache Kafka ต่อไป เราจะนำเข้าไลบรารีที่จำเป็นสำหรับโครงการของเรา การนำเข้าไลบรารีที่จำเป็น รวมถึง “KafkaProducer” และ “KafkaConsumer” เป็นคลาสจากไลบรารี “kafka-python” ที่ช่วยให้เราสามารถโต้ตอบกับโบรกเกอร์ Kafka ได้ JSON เป็นไลบรารี Python ที่จะทำงานกับข้อมูล JSON ซึ่งเราใช้เพื่อทำให้ข้อความเป็นอนุกรมและดีซีเรียลไลซ์



จาก kafka นำเข้า KafkaProducer, KafkaConsumer
นำเข้า json


การสร้างผู้ผลิตคาฟคา





นี่เป็นสิ่งสำคัญเนื่องจากผู้ผลิต Kafka ส่งข้อมูลไปยังหัวข้อ Kafka ในตัวอย่างของเรา เราสร้างผู้ผลิตเพื่อส่งข้อมูลเรียลไทม์จำลองไปยังหัวข้อที่เรียกว่า 'หัวข้อเรียลไทม์'

เราสร้างอินสแตนซ์ “KafkaProducer” ซึ่งระบุที่อยู่ของนายหน้า Kafka เป็น “localhost:9092” จากนั้นเราใช้ 'value_serializer' ซึ่งเป็นฟังก์ชันที่ทำให้ข้อมูลเป็นอนุกรมก่อนที่จะส่งไปที่ Kafka ในกรณีของเรา ฟังก์ชัน lambda จะเข้ารหัสข้อมูลเป็น JSON ที่เข้ารหัส UTF-8 ตอนนี้ เรามาจำลองข้อมูลแบบเรียลไทม์และส่งไปที่หัวข้อคาฟคากันดีกว่า



โปรดิวเซอร์ = คาฟคาโปรดิวเซอร์ ( bootstrap_servers = 'โฮสต์ท้องถิ่น:9092' ,
value_serializer =แลมบ์ดา v: json.dumps ( ใน ) .เข้ารหัส ( 'utf-8' ) )
# ข้อมูลจำลองแบบเรียลไทม์
ข้อมูล = { 'เซ็นเซอร์_id' : : 1 , 'อุณหภูมิ' : : 25.5 , 'ความชื้น' : : 60.2 }
#ส่งข้อมูลเข้าหัวข้อ
โปรดิวเซอร์.ส่ง ( 'หัวข้อเรียลไทม์' , ข้อมูล )


ในบรรทัดเหล่านี้ เรากำหนดพจนานุกรม 'ข้อมูล' ที่แสดงถึงข้อมูลเซ็นเซอร์จำลอง จากนั้นเราใช้วิธี 'ส่ง' เพื่อเผยแพร่ข้อมูลนี้ไปยัง 'หัวข้อแบบเรียลไทม์'

จากนั้น เราต้องการสร้างผู้ใช้ Kafka และผู้ใช้ Kafka อ่านข้อมูลจากหัวข้อ Kafka เราสร้างผู้บริโภคเพื่อใช้และประมวลผลข้อความใน 'หัวข้อแบบเรียลไทม์' เราสร้างอินสแตนซ์ “KafkaConsumer” โดยระบุหัวข้อที่เราต้องการใช้ เช่น (หัวข้อแบบเรียลไทม์) และที่อยู่ของนายหน้า Kafka จากนั้น “value_deserializer” จะเป็นฟังก์ชันที่ทำการดีซีเรียลไลซ์ข้อมูลที่ได้รับจาก Kafka ในกรณีของเรา ฟังก์ชัน lambda จะถอดรหัสข้อมูลเป็น JSON ที่เข้ารหัส UTF-8

ผู้บริโภค = KafkaConsumer ( 'หัวข้อเรียลไทม์' ,
bootstrap_servers = 'โฮสต์ท้องถิ่น:9092' ,
value_deserializer =แลมบ์ดา x: json.loads ( x.ถอดรหัส ( 'utf-8' ) ) )


เราใช้การวนซ้ำเพื่อใช้และประมวลผลข้อความจากหัวข้ออย่างต่อเนื่อง

# การอ่านและประมวลผลข้อมูลแบบเรียลไทม์
สำหรับ ข้อความ ใน ผู้บริโภค:
ข้อมูล = message.value
พิมพ์ ( 'ข้อมูลที่ได้รับ: {data}' )


เราดึงค่าของแต่ละข้อความและข้อมูลเซ็นเซอร์จำลองของเราภายในลูปและพิมพ์ไปยังคอนโซล การเรียกใช้ผู้ผลิตและผู้บริโภคของ Kafka เกี่ยวข้องกับการเรียกใช้โค้ดนี้ใน Google Colab และดำเนินการเซลล์โค้ดทีละเซลล์ ผู้ผลิตส่งข้อมูลจำลองไปยังหัวข้อ Kafka และผู้บริโภคอ่านและพิมพ์ข้อมูลที่ได้รับ


การวิเคราะห์ผลลัพธ์ในขณะที่โค้ดรัน

เราจะสังเกตข้อมูลแบบเรียลไทม์ที่ผลิตและบริโภค รูปแบบข้อมูลอาจแตกต่างกันไปขึ้นอยู่กับการจำลองหรือแหล่งข้อมูลจริงของเรา ในตัวอย่างโดยละเอียดนี้ เราครอบคลุมกระบวนการทั้งหมดในการตั้งค่าระบบสตรีมข้อมูลแบบเรียลไทม์โดยใช้ Apache Kafka และ Python ใน Google Colab เราจะอธิบายโค้ดแต่ละบรรทัดและความสำคัญของโค้ดในการสร้างระบบนี้ การสตรีมข้อมูลแบบเรียลไทม์เป็นความสามารถอันทรงพลัง และตัวอย่างนี้ทำหน้าที่เป็นรากฐานสำหรับแอปพลิเคชันในโลกแห่งความเป็นจริงที่ซับซ้อนยิ่งขึ้น

ตัวอย่างที่ 2: การใช้การสตรีมข้อมูลแบบเรียลไทม์ใน Python โดยใช้ข้อมูลตลาดหุ้น

เรามาทำอีกตัวอย่างหนึ่งของการนำการสตรีมข้อมูลแบบเรียลไทม์ใน Python โดยใช้สถานการณ์ที่แตกต่างกัน ครั้งนี้เราจะเน้นไปที่ข้อมูลตลาดหุ้น เราสร้างระบบสตรีมข้อมูลแบบเรียลไทม์ที่บันทึกการเปลี่ยนแปลงของราคาหุ้นและประมวลผลโดยใช้ Apache Kafka และ Python ใน Google Colab ดังที่แสดงในตัวอย่างก่อนหน้านี้ เราเริ่มต้นด้วยการกำหนดค่าสภาพแวดล้อมของเราใน Google Colab ขั้นแรกเราติดตั้งไลบรารีที่จำเป็น:

! ปิ๊ป ติดตั้ง คาฟคา-หลาม yfinance


ที่นี่ เราได้เพิ่มไลบรารี “yfinance” ซึ่งช่วยให้เราได้รับข้อมูลตลาดหุ้นแบบเรียลไทม์ ต่อไปเราจะนำเข้าไลบรารีที่จำเป็น เรายังคงใช้คลาส “KafkaProducer” และ “KafkaConsumer” จากไลบรารี “kafka-python” สำหรับการโต้ตอบกับ Kafka ต่อไป เรานำเข้า JSON เพื่อทำงานกับข้อมูล JSON นอกจากนี้เรายังใช้ 'yfinance' เพื่อรับข้อมูลตลาดหุ้นแบบเรียลไทม์ นอกจากนี้เรายังนำเข้าไลบรารี 'เวลา' เพื่อเพิ่มการหน่วงเวลาเพื่อจำลองการอัปเดตแบบเรียลไทม์

จาก kafka นำเข้า KafkaProducer, KafkaConsumer
นำเข้า json
นำเข้า yfinance เช่น ปี
นำเข้า เวลา


ตอนนี้เราสร้างผู้ผลิต Kafka สำหรับข้อมูลสต็อก ผู้ผลิต Kafka ของเราได้รับข้อมูลหุ้นแบบเรียลไทม์และส่งไปยังหัวข้อ Kafka ชื่อ 'ราคาหุ้น'

โปรดิวเซอร์ = คาฟคาโปรดิวเซอร์ ( bootstrap_servers = 'โฮสต์ท้องถิ่น:9092' ,
value_serializer =แลมบ์ดา v: json.dumps ( ใน ) .เข้ารหัส ( 'utf-8' ) )

ในขณะที่ จริง:
หุ้น = yf.Ticker ( “เอเอพีแอล” ) # ตัวอย่าง: หุ้นของ Apple Inc
stock_data = stock.history ( ระยะเวลา = '1 วัน' )
Last_price = stock_data [ 'ปิด' ] .iloc [ - - 1 ]
ข้อมูล = { 'เครื่องหมาย' : : “เอเอพีแอล” , 'ราคา' : Last_price }
โปรดิวเซอร์.ส่ง ( 'ราคาหุ้น' , ข้อมูล )
เวลา.การนอนหลับ ( 10 ) # จำลองการอัปเดตแบบเรียลไทม์ทุกๆ 10 วินาที


เราสร้างอินสแตนซ์ “KafkaProducer” โดยมีที่อยู่ของนายหน้า Kafka ในโค้ดนี้ ภายในวง เราใช้ 'yfinance' เพื่อรับราคาหุ้นล่าสุดสำหรับ Apple Inc. (“AAPL”) จากนั้นเราแยกราคาปิดสุดท้ายแล้วส่งไปที่หัวข้อ “ราคาหุ้น” ในที่สุด เราจะแนะนำการหน่วงเวลาเพื่อจำลองการอัปเดตแบบเรียลไทม์ทุกๆ 10 วินาที

เรามาสร้าง Kafka Consumer เพื่ออ่านและประมวลผลข้อมูลราคาหุ้นจากหัวข้อ “ราคาหุ้น” กันดีกว่า

ผู้บริโภค = KafkaConsumer ( 'ราคาหุ้น' ,
bootstrap_servers = 'โฮสต์ท้องถิ่น:9092' ,
value_deserializer =แลมบ์ดา x: json.loads ( x.ถอดรหัส ( 'utf-8' ) ) )

สำหรับ ข้อความ ใน ผู้บริโภค:
stock_data = message.value
พิมพ์ ( 'ข้อมูลหุ้นที่ได้รับ: {stock_data['สัญลักษณ์']} - ราคา: {stock_data['price']}' )


รหัสนี้คล้ายกับการตั้งค่าผู้บริโภคของตัวอย่างก่อนหน้า โดยจะอ่านและประมวลผลข้อความจากหัวข้อ “ราคาหุ้น” อย่างต่อเนื่อง และพิมพ์สัญลักษณ์หุ้นและราคาไปยังคอนโซล เราดำเนินการเซลล์โค้ดตามลำดับ เช่น ทีละเซลล์ใน Google Colab เพื่อเรียกใช้ผู้ผลิตและผู้บริโภค ผู้ผลิตจะได้รับและส่งการอัปเดตราคาหุ้นแบบเรียลไทม์ในขณะที่ผู้บริโภคอ่านและแสดงข้อมูลนี้

! ปิ๊ป ติดตั้ง คาฟคา-หลาม yfinance
จาก kafka นำเข้า KafkaProducer, KafkaConsumer
นำเข้า json
นำเข้า yfinance เช่น ปี
นำเข้า เวลา
โปรดิวเซอร์ = คาฟคาโปรดิวเซอร์ ( bootstrap_servers = 'โฮสต์ท้องถิ่น:9092' ,
value_serializer =แลมบ์ดา v: json.dumps ( ใน ) .เข้ารหัส ( 'utf-8' ) )

ในขณะที่ จริง:
หุ้น = yf.Ticker ( “เอเอพีแอล” ) # หุ้น Apple Inc
stock_data = stock.history ( ระยะเวลา = '1 วัน' )
Last_price = stock_data [ 'ปิด' ] .iloc [ - - 1 ]

ข้อมูล = { 'เครื่องหมาย' : : “เอเอพีแอล” , 'ราคา' : Last_price }

โปรดิวเซอร์.ส่ง ( 'ราคาหุ้น' , ข้อมูล )

เวลา.การนอนหลับ ( 10 ) # จำลองการอัปเดตแบบเรียลไทม์ทุกๆ 10 วินาที
ผู้บริโภค = KafkaConsumer ( 'ราคาหุ้น' ,
bootstrap_servers = 'โฮสต์ท้องถิ่น:9092' ,
value_deserializer =แลมบ์ดา x: json.loads ( x.ถอดรหัส ( 'utf-8' ) ) )

สำหรับ ข้อความ ใน ผู้บริโภค:
stock_data = message.value
พิมพ์ ( 'ข้อมูลหุ้นที่ได้รับ: {stock_data['สัญลักษณ์']} - ราคา: {stock_data['price']}' )


ในการวิเคราะห์ผลลัพธ์หลังจากการรันโค้ด เราจะสังเกตการอัปเดตราคาหุ้นแบบเรียลไทม์สำหรับ Apple Inc. ที่ผลิตและบริโภค

บทสรุป

ในตัวอย่างที่ไม่เหมือนใครนี้ เราได้สาธิตการใช้งานการสตรีมข้อมูลแบบเรียลไทม์ใน Python โดยใช้ Apache Kafka และไลบรารี “yfinance” เพื่อบันทึกและประมวลผลข้อมูลตลาดหุ้น เราอธิบายโค้ดแต่ละบรรทัดอย่างละเอียด การสตรีมข้อมูลแบบเรียลไทม์สามารถนำไปใช้กับสาขาต่างๆ เพื่อสร้างแอปพลิเคชันในโลกแห่งความเป็นจริงในด้านการเงิน IoT และอื่นๆ