Kuinka lukea ja kirjoittaa taulukkotietoja PySparkissa

Kuinka Lukea Ja Kirjoittaa Taulukkotietoja Pysparkissa



PySparkissa tietojenkäsittely on nopeampaa, jos tiedot ladataan taulukon muodossa. Tällä SQL-lausekkeita käyttämällä käsittely on nopeaa. PySpark DataFrame/RDD:n muuntaminen taulukoksi ennen sen lähettämistä käsittelyyn on siis parempi tapa. Tänään näemme kuinka luetaan taulukkotiedot PySpark DataFrameen, kirjoitetaan PySpark DataFrame taulukkoon ja lisätään uusi DataFrame olemassa olevaan taulukkoon sisäänrakennettujen toimintojen avulla. Mennään!

Pyspark.sql.DataFrameWriter.saveAsTable()

Ensinnäkin näemme, kuinka olemassa oleva PySpark DataFrame kirjoitetaan taulukkoon write.saveAsTable()-funktiolla. DataFramen kirjoittaminen taulukkoon vaatii taulukon nimen ja muut valinnaiset parametrit, kuten moodit, partitionBy jne.. Se on tallennettu parkettiviilana.

Syntaksi:







dataframe_obj.write.saveAsTable(polku/taulukon_nimi,tila,osio,…)
  1. Taulukon_nimi on dataframe_obj:sta luodun taulukon nimi.
  2. Voimme liittää/korvaaa taulukon tiedot mode-parametrilla.
  3. PartitionBy käyttää yksittäisiä/useita sarakkeita luodakseen osioita näiden sarakkeiden arvojen perusteella.

Esimerkki 1:

Luo PySpark DataFrame, jossa on 5 riviä ja 4 saraketta. Kirjoita tämä tietokehys taulukkoon nimeltä 'Agri_Table1'.



tuonti pyspark

pyspark.sql-tiedostosta tuo SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# viljelydataa 5 rivillä ja 5 sarakkeella

maatalouden =[{ 'Soil_Type' : 'Musta' , 'Kastelun_saatavuus' : 'Ei' , 'Acres' : 2500 , 'Soil_status' : 'Kuiva' ,
'Maa' : 'USA' },

{ 'Soil_Type' : 'Musta' , 'Kastelun_saatavuus' : 'Joo' , 'Acres' : 3500 , 'Soil_status' : 'Märkä' ,
'Maa' : 'Intia' },

{ 'Soil_Type' : 'Punainen' , 'Kastelun_saatavuus' : 'Joo' , 'Acres' : 210 , 'Soil_status' : 'Kuiva' ,
'Maa' : 'UK' },

{ 'Soil_Type' : 'muu' , 'Kastelun_saatavuus' : 'Ei' , 'Acres' : 1000 , 'Soil_status' : 'Märkä' ,
'Maa' : 'USA' },

{ 'Soil_Type' : 'Hiekka' , 'Kastelun_saatavuus' : 'Ei' , 'Acres' : 500 , 'Soil_status' : 'Kuiva' ,
'Maa' : 'Intia' }]



# luo datakehys yllä olevista tiedoista

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Kirjoita yllä oleva DataFrame taulukkoon.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

Lähtö:







Näemme, että yksi parkettitiedosto luodaan edellisellä PySpark Datalla.



Esimerkki 2:

Harkitse edellistä DataFrame-kehystä ja kirjoita 'Agri_Table2' taulukkoon osioiden tietueet 'Maa'-sarakkeen arvojen perusteella.

# Kirjoita yllä oleva DataFrame taulukkoon partitionBy-parametrilla

agri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Maa' ])

Lähtö:

'Maa'-sarakkeessa on kolme yksilöllistä arvoa - 'Intia', 'UK' ja 'USA'. Joten luodaan kolme osiota. Jokainen osio sisältää parkettitiedostot.

Pyspark.sql.DataFrameReader.table()

Ladataan taulukko PySpark DataFrameen spark.read.table()-funktiolla. Se vaatii vain yhden parametrin, joka on polun/taulukon nimi. Se lataa taulukon suoraan PySpark DataFrameen, ja kaikki PySpark DataFrameen sovellettavat SQL-toiminnot voidaan myös käyttää tähän ladatuun DataFrameen.

Syntaksi:

spark_app.read.table(polku/'taulukon_nimi')

Tässä skenaariossa käytämme edellistä taulukkoa, joka luotiin PySpark DataFrame -kehyksestä. Varmista, että sinun on otettava käyttöön aiemmat skenaariokoodinpätkät ympäristössäsi.

Esimerkki:

Lataa 'Agri_Table1' -taulukko DataFrame-kehykseen nimeltä 'loaded_data'.

loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

loaded_data.show()

Lähtö:

Näemme, että taulukko on ladattu PySpark DataFrameen.

SQL-kyselyjen suorittaminen

Nyt suoritamme joitain SQL-kyselyitä ladatussa DataFrame-kehyksessä spark.sql()-funktiolla.

# Käytä SELECT-komentoa näyttääksesi kaikki yllä olevan taulukon sarakkeet.

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1' ).näytä()

# WHERE -lauseke

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Kuiva'' ).näytä()

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000' ).näytä()

Lähtö:

  1. Ensimmäinen kysely näyttää kaikki sarakkeet ja tietueet DataFramesta.
  2. Toinen kysely näyttää tietueet 'Soil_status' -sarakkeen perusteella. 'Dry'-elementillä varustettuja tietueita on vain kolme.
  3. Viimeinen kysely palauttaa kaksi tietuetta, joiden 'Acres' on suurempi kuin 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Käyttämällä insertInto()-funktiota voimme liittää DataFramen olemassa olevaan taulukkoon. Voimme käyttää tätä funktiota yhdessä selectExpr()-funktion kanssa määrittääksesi sarakkeiden nimet ja lisätä sen sitten taulukkoon. Tämä funktio ottaa myös parametrin tableName.

Syntaksi:

DataFrame_obj.write.insertInto('Taulukon_nimi')

Tässä skenaariossa käytämme edellistä taulukkoa, joka luotiin PySpark DataFrame -kehyksestä. Varmista, että sinun on otettava käyttöön aiemmat skenaariokoodinpätkät ympäristössäsi.

Esimerkki:

Luo uusi DataFrame kahdella tietueella ja lisää ne 'Agri_Table1' -taulukkoon.

tuonti pyspark

pyspark.sql-tiedostosta tuo SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# viljelydataa 2 rivillä

maatalouden =[{ 'Soil_Type' : 'Hiekka' , 'Kastelun_saatavuus' : 'Ei' , 'Acres' : 2500 , 'Soil_status' : 'Kuiva' ,
'Maa' : 'USA' },

{ 'Soil_Type' : 'Hiekka' , 'Kastelun_saatavuus' : 'Ei' , 'Acres' : 1200 , 'Soil_status' : 'Märkä' ,
'Maa' : 'Japani' }]

# luo datakehys yllä olevista tiedoista

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Acres' , 'Maa' , 'Kastelun_saatavuus' , 'Soil_Type' ,
'Maaperän tila' ).write.insertInto( 'Agri_Table1' )

# Näytä lopullinen Agri_Table1

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1' ).näytä()

Lähtö:

Nyt DataFrame-kehyksessä olevien rivien kokonaismäärä on 7.

Johtopäätös

Ymmärrät nyt, kuinka PySpark DataFrame kirjoitetaan taulukkoon write.saveAsTable()-funktiolla. Se ottaa taulukon nimen ja muut valinnaiset parametrit. Sitten latasimme tämän taulukon PySpark DataFrameen käyttämällä spark.read.table()-funktiota. Se vaatii vain yhden parametrin, joka on polun/taulukon nimi. Jos haluat liittää uuden DataFramen olemassa olevaan taulukkoon, käytä insertInto()-funktiota.