Pyspark.sql.DataFrameWriter.saveAsTable()
Ensinnäkin näemme, kuinka olemassa oleva PySpark DataFrame kirjoitetaan taulukkoon write.saveAsTable()-funktiolla. DataFramen kirjoittaminen taulukkoon vaatii taulukon nimen ja muut valinnaiset parametrit, kuten moodit, partitionBy jne.. Se on tallennettu parkettiviilana.
Syntaksi:
dataframe_obj.write.saveAsTable(polku/taulukon_nimi,tila,osio,…)
- Taulukon_nimi on dataframe_obj:sta luodun taulukon nimi.
- Voimme liittää/korvaaa taulukon tiedot mode-parametrilla.
- PartitionBy käyttää yksittäisiä/useita sarakkeita luodakseen osioita näiden sarakkeiden arvojen perusteella.
Esimerkki 1:
Luo PySpark DataFrame, jossa on 5 riviä ja 4 saraketta. Kirjoita tämä tietokehys taulukkoon nimeltä 'Agri_Table1'.
tuonti pyspark
pyspark.sql-tiedostosta tuo SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# viljelydataa 5 rivillä ja 5 sarakkeella
maatalouden =[{ 'Soil_Type' : 'Musta' , 'Kastelun_saatavuus' : 'Ei' , 'Acres' : 2500 , 'Soil_status' : 'Kuiva' ,
'Maa' : 'USA' },
{ 'Soil_Type' : 'Musta' , 'Kastelun_saatavuus' : 'Joo' , 'Acres' : 3500 , 'Soil_status' : 'Märkä' ,
'Maa' : 'Intia' },
{ 'Soil_Type' : 'Punainen' , 'Kastelun_saatavuus' : 'Joo' , 'Acres' : 210 , 'Soil_status' : 'Kuiva' ,
'Maa' : 'UK' },
{ 'Soil_Type' : 'muu' , 'Kastelun_saatavuus' : 'Ei' , 'Acres' : 1000 , 'Soil_status' : 'Märkä' ,
'Maa' : 'USA' },
{ 'Soil_Type' : 'Hiekka' , 'Kastelun_saatavuus' : 'Ei' , 'Acres' : 500 , 'Soil_status' : 'Kuiva' ,
'Maa' : 'Intia' }]
# luo datakehys yllä olevista tiedoista
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Kirjoita yllä oleva DataFrame taulukkoon.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )
Lähtö:
Näemme, että yksi parkettitiedosto luodaan edellisellä PySpark Datalla.
Esimerkki 2:
Harkitse edellistä DataFrame-kehystä ja kirjoita 'Agri_Table2' taulukkoon osioiden tietueet 'Maa'-sarakkeen arvojen perusteella.
# Kirjoita yllä oleva DataFrame taulukkoon partitionBy-parametrillaagri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Maa' ])
Lähtö:
'Maa'-sarakkeessa on kolme yksilöllistä arvoa - 'Intia', 'UK' ja 'USA'. Joten luodaan kolme osiota. Jokainen osio sisältää parkettitiedostot.
Pyspark.sql.DataFrameReader.table()
Ladataan taulukko PySpark DataFrameen spark.read.table()-funktiolla. Se vaatii vain yhden parametrin, joka on polun/taulukon nimi. Se lataa taulukon suoraan PySpark DataFrameen, ja kaikki PySpark DataFrameen sovellettavat SQL-toiminnot voidaan myös käyttää tähän ladatuun DataFrameen.
Syntaksi:
spark_app.read.table(polku/'taulukon_nimi')Tässä skenaariossa käytämme edellistä taulukkoa, joka luotiin PySpark DataFrame -kehyksestä. Varmista, että sinun on otettava käyttöön aiemmat skenaariokoodinpätkät ympäristössäsi.
Esimerkki:
Lataa 'Agri_Table1' -taulukko DataFrame-kehykseen nimeltä 'loaded_data'.
loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )loaded_data.show()
Lähtö:
Näemme, että taulukko on ladattu PySpark DataFrameen.
SQL-kyselyjen suorittaminen
Nyt suoritamme joitain SQL-kyselyitä ladatussa DataFrame-kehyksessä spark.sql()-funktiolla.
# Käytä SELECT-komentoa näyttääksesi kaikki yllä olevan taulukon sarakkeet.linuxhint_spark_app.sql( 'SELECT * from Agri_Table1' ).näytä()
# WHERE -lauseke
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Kuiva'' ).näytä()
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000' ).näytä()
Lähtö:
- Ensimmäinen kysely näyttää kaikki sarakkeet ja tietueet DataFramesta.
- Toinen kysely näyttää tietueet 'Soil_status' -sarakkeen perusteella. 'Dry'-elementillä varustettuja tietueita on vain kolme.
- Viimeinen kysely palauttaa kaksi tietuetta, joiden 'Acres' on suurempi kuin 2000.
Pyspark.sql.DataFrameWriter.insertInto()
Käyttämällä insertInto()-funktiota voimme liittää DataFramen olemassa olevaan taulukkoon. Voimme käyttää tätä funktiota yhdessä selectExpr()-funktion kanssa määrittääksesi sarakkeiden nimet ja lisätä sen sitten taulukkoon. Tämä funktio ottaa myös parametrin tableName.
Syntaksi:
DataFrame_obj.write.insertInto('Taulukon_nimi')Tässä skenaariossa käytämme edellistä taulukkoa, joka luotiin PySpark DataFrame -kehyksestä. Varmista, että sinun on otettava käyttöön aiemmat skenaariokoodinpätkät ympäristössäsi.
Esimerkki:
Luo uusi DataFrame kahdella tietueella ja lisää ne 'Agri_Table1' -taulukkoon.
tuonti pysparkpyspark.sql-tiedostosta tuo SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# viljelydataa 2 rivillä
maatalouden =[{ 'Soil_Type' : 'Hiekka' , 'Kastelun_saatavuus' : 'Ei' , 'Acres' : 2500 , 'Soil_status' : 'Kuiva' ,
'Maa' : 'USA' },
{ 'Soil_Type' : 'Hiekka' , 'Kastelun_saatavuus' : 'Ei' , 'Acres' : 1200 , 'Soil_status' : 'Märkä' ,
'Maa' : 'Japani' }]
# luo datakehys yllä olevista tiedoista
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Acres' , 'Maa' , 'Kastelun_saatavuus' , 'Soil_Type' ,
'Maaperän tila' ).write.insertInto( 'Agri_Table1' )
# Näytä lopullinen Agri_Table1
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1' ).näytä()
Lähtö:
Nyt DataFrame-kehyksessä olevien rivien kokonaismäärä on 7.
Johtopäätös
Ymmärrät nyt, kuinka PySpark DataFrame kirjoitetaan taulukkoon write.saveAsTable()-funktiolla. Se ottaa taulukon nimen ja muut valinnaiset parametrit. Sitten latasimme tämän taulukon PySpark DataFrameen käyttämällä spark.read.table()-funktiota. Se vaatii vain yhden parametrin, joka on polun/taulukon nimi. Jos haluat liittää uuden DataFramen olemassa olevaan taulukkoon, käytä insertInto()-funktiota.