Kuinka ottaa käyttöön reaaliaikainen datavirtaus Pythonissa

Kuinka Ottaa Kayttoon Reaaliaikainen Datavirtaus Pythonissa



Pythonin reaaliaikaisen datan suoratoiston toteutuksen hallinta on olennainen taito nykypäivän datamaailmassa. Tämä opas tutkii tärkeimmät vaiheet ja keskeiset työkalut reaaliaikaisen datan suoratoiston aitouden hyödyntämiseksi Pythonissa. Sopivan kehyksen, kuten Apache Kafka tai Apache Pulsar, valinnasta Python-koodin kirjoittamiseen vaivatonta tiedonkulutusta, käsittelyä ja tehokasta visualisointia varten, hankimme tarvittavat taidot kettereiden ja tehokkaiden reaaliaikaisten tietokanavien rakentamiseen.

Esimerkki 1: Reaaliaikaisen datan suoratoiston toteutus Pythonissa

Reaaliaikaisen datan suoratoiston toteuttaminen Pythonissa on ratkaisevan tärkeää nykypäivän datavetoisessa iässä ja maailmassa. Tässä yksityiskohtaisessa esimerkissä käymme läpi prosessin, jolla rakennetaan reaaliaikainen datan suoratoistojärjestelmä käyttämällä Apache Kafkaa ja Pythonia Google Colabissa.







Jotta esimerkki voidaan alustaa ennen koodauksen aloittamista, tietyn ympäristön luominen Google Colabissa on välttämätöntä. Ensimmäinen asia, joka meidän on tehtävä, on asentaa tarvittavat kirjastot. Käytämme 'kafka-python' -kirjastoa Kafka-integraatioon.



! pip Asentaa kafka-python


Tämä komento asentaa 'kafka-python' -kirjaston, joka tarjoaa Python-funktiot ja sidokset Apache Kafkalle. Seuraavaksi tuomme tarvittavat kirjastot projektiimme. Vaadittujen kirjastojen, mukaan lukien 'KafkaProducer' ja 'KafkaConsumer', tuonti ovat 'kafka-python' -kirjaston luokat, joiden avulla voimme olla vuorovaikutuksessa Kafka-välittäjien kanssa. JSON on Python-kirjasto, joka toimii JSON-tietojen kanssa, joita käytämme viestien sarjoittamiseen ja sarjoittamiseen.



kafka tuonti KafkaProducer, KafkaConsumer
tuo json


Kafka-tuottajan perustaminen





Tämä on tärkeää, koska Kafka-tuottaja lähettää tiedot Kafka-aiheeseen. Esimerkissämme luomme tuottajan, joka lähettää simuloidun reaaliaikaisen datan aiheeseen nimeltä 'reaaliaikainen aihe'.

Luomme 'KafkaProducer'-instanssin, joka määrittää Kafka-välittäjän osoitteeksi 'localhost:9092'. Sitten käytämme 'value_serializer' -funktiota, joka sarjoittaa tiedot ennen lähettämistä Kafkalle. Meidän tapauksessamme lambda-funktio koodaa tiedot UTF-8-koodatuksi JSONiksi. Simuloillaan nyt joitain reaaliaikaisia ​​tietoja ja lähetämme ne Kafka-aiheeseen.



tuottaja = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( sisään ) .encode ( 'utf-8' ) )
# Simuloitu reaaliaikainen data
data = { 'sensor_id' : 1 , 'lämpötila' : 25.5 , 'kosteus' : 60.2 }
# Lähetetään tietoja aiheeseen
tuottaja.lähetä ( 'reaaliaikainen aihe' , dataa )


Näillä riveillä määrittelemme 'data'-sanakirjan, joka edustaa simuloitua anturidataa. Käytämme sitten 'lähetä'-menetelmää julkaistaksemme nämä tiedot 'reaaliaikaiseen aiheeseen'.

Sitten haluamme luoda Kafka-kuluttajan, ja Kafka-kuluttaja lukee tiedot Kafka-aiheesta. Luomme kuluttajan kuluttamaan ja käsittelemään viestejä 'reaaliaikaisessa aiheessa'. Luomme 'KafkaConsumer' -esiintymän, jossa määritetään aihe, jota haluamme käyttää, esim. (reaaliaikainen aihe) ja Kafka-välittäjän osoite. Sitten 'value_deserializer' on funktio, joka deserialisoi Kafkasta saadut tiedot. Meidän tapauksessamme lambda-funktio purkaa tiedot UTF-8-koodatuksi JSONiksi.

kuluttaja = KafkaConsumer ( 'reaaliaikainen aihe' ,
bootstrap_servers = 'localhost:9092' ,
arvo_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )


Käytämme iteratiivista silmukkaa aiheen viestien jatkuvaan kuluttamiseen ja käsittelyyn.

# Reaaliaikaisten tietojen lukeminen ja käsittely
varten viesti sisään kuluttaja:
data = viesti.arvo
Tulosta ( f 'Vastaanotetut tiedot: {data}' )


Haemme jokaisen viestin arvon ja simuloidut anturitietomme silmukan sisällä ja tulostamme ne konsoliin. Kafkan tuottajan ja kuluttajan suorittaminen edellyttää tämän koodin suorittamista Google Colabissa ja koodisolujen suorittamista yksitellen. Tuottaja lähettää simuloidut tiedot Kafka-aiheeseen, ja kuluttaja lukee ja tulostaa saamansa tiedot.


Lähdön analyysi koodin suorituksen aikana

Tarkkailemme reaaliaikaista dataa, jota tuotetaan ja kulutetaan. Tietomuoto voi vaihdella simulaatiostamme tai todellisesta tietolähteestä riippuen. Tässä yksityiskohtaisessa esimerkissä katamme koko reaaliaikaisen datan suoratoistojärjestelmän perustamisprosessin käyttämällä Apache Kafkaa ja Pythonia Google Colabissa. Selitämme jokaisen koodirivin ja sen merkityksen tämän järjestelmän rakentamisessa. Reaaliaikainen datan suoratoisto on tehokas ominaisuus, ja tämä esimerkki toimii perustana monimutkaisemmille reaalimaailman sovelluksille.

Esimerkki 2: Reaaliaikaisen datan suoratoiston toteuttaminen Pythonissa käyttämällä pörssitietoja

Tehdään toinen ainutlaatuinen esimerkki reaaliaikaisen datan suoratoiston toteuttamisesta Pythonissa käyttämällä erilaista skenaariota; Tällä kertaa keskitymme osakemarkkinatietoihin. Luomme reaaliaikaisen datan suoratoistojärjestelmän, joka tallentaa osakekurssien muutokset ja käsittelee ne Apache Kafkan ja Pythonin avulla Google Colabissa. Kuten edellisessä esimerkissä osoitettiin, aloitamme määrittämällä ympäristömme Google Colabissa. Ensin asennamme tarvittavat kirjastot:

! pip Asentaa kafka-python yfinance


Lisäämme tähän 'yfinance' -kirjaston, jonka avulla voimme saada reaaliaikaisia ​​osakemarkkinatietoja. Seuraavaksi tuomme tarvittavat kirjastot. Jatkamme 'kafka-python'-kirjaston 'KafkaProducer'- ja 'KafkaConsumer'-luokkien käyttöä Kafka-vuorovaikutukseen. Tuomme JSONin toimimaan JSON-tietojen kanssa. Käytämme myös 'yfinancea' saadaksemme reaaliaikaisia ​​osakemarkkinatietoja. Tuomme myös 'aika'-kirjaston lisätäksemme aikaviiveen reaaliaikaisten päivitysten simuloimiseksi.

kafka tuonti KafkaProducer, KafkaConsumer
tuo json
tuoda yrahoitusta kuten yf
tuonti aika


Nyt luomme Kafka-tuottajan varastotiedoille. Kafka-tuottajamme saa reaaliaikaiset varastotiedot ja lähettää sen Kafka-aiheeseen nimeltä 'osakehinta'.

tuottaja = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
arvo_serializer =lambda v: json.dumps ( sisään ) .encode ( 'utf-8' ) )

sillä aikaa Totta:
stock = yf.Ticker ( 'AAPL' ) # Esimerkki: Apple Inc:n osakkeet
stock_data = stock.history ( ajanjaksoa = '1p' )
viimeinen_hinta = varastotiedot [ 'Kiinni' ] .iloc [ - 1 ]
data = { 'symboli' : 'AAPL' , 'hinta' : viimeinen hinta }
tuottaja.lähetä ( 'osakekurssi' , dataa )
aika.nukkua ( 10 ) # Simuloi reaaliaikaisia ​​päivityksiä 10 sekunnin välein


Luomme 'KafkaProducer'-esiintymän, jossa on Kafka-välittäjän osoite tässä koodissa. Silmukan sisällä käytämme 'yfinancea' saadaksemme Apple Inc:n ('AAPL') viimeisimmän osakekurssin. Sitten poimimme viimeisen päätöskurssin ja lähetämme sen 'osakehinta' -aiheeseen. Lopulta otamme käyttöön aikaviiveen, joka simuloi reaaliaikaisia ​​päivityksiä 10 sekunnin välein.

Luodaan Kafka-kuluttaja lukemaan ja käsittelemään osakekurssitietoja aiheesta 'osakehinta'.

kuluttaja = KafkaConsumer ( 'osakekurssi' ,
bootstrap_servers = 'localhost:9092' ,
arvo_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

varten viesti sisään kuluttaja:
stock_data = viesti.arvo
Tulosta ( f 'Vastaanotetut varastotiedot: {stock_data['symbol']} - Hinta: {stock_data['price']}' )


Tämä koodi on samanlainen kuin edellisen esimerkin kuluttaja-asetus. Se lukee ja käsittelee jatkuvasti 'osakehinta'-aiheen viestejä ja tulostaa osakesymbolin ja -hinnan konsoliin. Suoritamme koodisolut peräkkäin, esimerkiksi yksitellen Google Colabissa tuottajan ja kuluttajan ajamiseksi. Tuottaja saa ja lähettää reaaliaikaiset osakekurssipäivitykset, kun kuluttaja lukee ja näyttää nämä tiedot.

! pip Asentaa kafka-python yfinance
kafka tuonti KafkaProducer, KafkaConsumer
tuo json
tuoda yrahoitusta kuten yf
tuonti aika
tuottaja = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
arvo_serializer =lambda v: json.dumps ( sisään ) .encode ( 'utf-8' ) )

sillä aikaa Totta:
stock = yf.Ticker ( 'AAPL' ) # Apple Inc:n osakkeet
stock_data = stock.history ( ajanjaksoa = '1p' )
viimeinen_hinta = varastotiedot [ 'Kiinni' ] .iloc [ - 1 ]

data = { 'symboli' : 'AAPL' , 'hinta' : viimeinen hinta }

tuottaja.lähetä ( 'osakekurssi' , dataa )

aika.nukkua ( 10 ) # Simuloi reaaliaikaisia ​​päivityksiä 10 sekunnin välein
kuluttaja = KafkaConsumer ( 'osakekurssi' ,
bootstrap_servers = 'localhost:9092' ,
arvo_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

varten viesti sisään kuluttaja:
stock_data = viesti.arvo
Tulosta ( f 'Vastaanotetut varastotiedot: {stock_data['symbol']} - Hinta: {stock_data['price']}' )


Koodin suorittamisen jälkeisen tulosteen analysoinnissa tarkkailemme Apple Inc:n reaaliaikaisia ​​osakekurssipäivityksiä, joita tuotetaan ja kulutetaan.

Johtopäätös

Tässä ainutlaatuisessa esimerkissä osoitimme reaaliaikaisen datan suoratoiston toteutuksen Pythonissa käyttämällä Apache Kafkaa ja 'yfinance'-kirjastoa osakemarkkinoiden tietojen keräämiseen ja käsittelemiseen. Selitimme perusteellisesti jokaisen koodin rivin. Reaaliaikaista datan suoratoistoa voidaan soveltaa useille aloille reaalimaailman sovellusten rakentamiseen rahoituksessa, IoT:ssä ja monessa muussa.