Sisällön aihe:
Jos haluat tietää PySpark DataFramen ja moduulin asennuksesta, käy läpi tämä artikla .
Pyspark.sql.functions.pandas_udf()
Pandas_udf () on saatavilla PySparkin sql.functions-moduulissa, joka voidaan tuoda 'from'-avainsanalla. Sitä käytetään PySpark DataFrame -kehyksen vektorisoitujen toimintojen suorittamiseen. Tämä toiminto toteutetaan kuin koristelija välittämällä kolme parametria. Sen jälkeen voimme luoda käyttäjän määrittämän funktion, joka palauttaa tiedot vektorimuodossa (kuten käytämme sarjaa/NumPytä tähän) nuolen avulla. Tämän toiminnon sisällä voimme palauttaa tuloksen.
Rakenne ja syntaksi:
Katsotaanpa ensin tämän funktion rakennetta ja syntaksia:
@pandas_udf(tietotyyppi)def-funktion_nimi(operaatio) -> convert_format:
palautusilmoitus
Tässä funktion_nimi on määrittämämme funktion nimi. Tietotyyppi määrittää tietotyypin, jonka tämä funktio palauttaa. Voimme palauttaa tuloksen käyttämällä 'return'-avainsanaa. Kaikki toiminnot suoritetaan toiminnon sisällä nuolella.
Pandas_udf (funktio ja palautustyyppi)
- Ensimmäinen parametri on käyttäjän määrittämä funktio, joka välitetään sille.
- Toista parametria käytetään määrittämään funktion palautustietotyyppi.
Tiedot:
Tässä koko oppaassa käytämme vain yhtä PySpark DataFrame -kehystä esittelyyn. Kaikki määrittämämme käyttäjän määrittämät toiminnot ovat käytössä tässä PySpark DataFrame -kehyksessä. Varmista, että luot tämän DataFramen ympäristöösi ensin PySparkin asennuksen jälkeen.
tuonti pyspark
pyspark.sql-tiedostosta tuo SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
pyspark.sql.functions -tiedostosta tuo pandas_udf
pyspark.sql.types tuonti *
tuo pandat pandaksi
# vihannestiedot
kasvis =[{ 'tyyppi' : 'kasvis' , 'nimi' : 'tomaatti' , 'locate_country' : 'USA' , 'määrä' : 800 },
{ 'tyyppi' : 'hedelmä' , 'nimi' : 'banaani' , 'locate_country' : 'KIINA' , 'määrä' : kaksikymmentä },
{ 'tyyppi' : 'kasvis' , 'nimi' : 'tomaatti' , 'locate_country' : 'USA' , 'määrä' : 800 },
{ 'tyyppi' : 'kasvis' , 'nimi' : 'Mango' , 'locate_country' : 'JAPANI' , 'määrä' : 0 },
{ 'tyyppi' : 'hedelmä' , 'nimi' : 'sitruuna' , 'locate_country' : 'INTIA' , 'määrä' : 1700 },
{ 'tyyppi' : 'kasvis' , 'nimi' : 'tomaatti' , 'locate_country' : 'USA' , 'määrä' : 1200 },
{ 'tyyppi' : 'kasvis' , 'nimi' : 'Mango' , 'locate_country' : 'JAPANI' , 'määrä' : 0 },
{ 'tyyppi' : 'hedelmä' , 'nimi' : 'sitruuna' , 'locate_country' : 'INTIA' , 'määrä' : 0 }
]
# luo markkinatietokehys yllä olevista tiedoista
market_df = linuxhint_spark_app.createDataFrame(kasvis)
market_df.show()
Lähtö:
Tässä luomme tämän DataFramen, jossa on 4 saraketta ja 8 riviä. Nyt käytämme pandas_udf()-funktiota käyttäjän määrittämien funktioiden luomiseen ja soveltamiseen näihin sarakkeisiin.
Pandas_udf() eri tietotyypeillä
Tässä skenaariossa luomme joitain käyttäjän määrittämiä funktioita komennolla pandas_udf() ja käytämme niitä sarakkeissa ja näytämme tulokset select()-menetelmällä. Kussakin tapauksessa käytämme pandas.Seriesia suorittaessamme vektorisoituja operaatioita. Tämä pitää sarakearvoja yksiulotteisena taulukkona ja toimintoa sovelletaan sarakkeeseen. Itse sisustajassa määritämme funktion palautustyypin.
Esimerkki 1: Pandas_udf() merkkijonotyypillä
Täällä luomme kaksi käyttäjän määrittämää funktiota merkkijonon palautustyypillä muuntamaan merkkijonotyypin sarakkeen arvot isoiksi ja pieniksi kirjaimille. Lopuksi käytämme näitä toimintoja 'type'- ja 'locate_country'-sarakkeissa.
# Muunna tyyppisarake isoiksi kirjaimilla pandas_udf:lla@pandas_udf(merkkijonotyyppi())
def type_upper_case(i: panda.Series) -> panda.Series:
return i.str.upper()
# Muunna locate_country-sarake pieniksi komennolla pandas_udf
@pandas_udf(merkkijonotyyppi())
def country_lower_case(i: panda.Series) -> panda.Series:
return i.str.lower()
# Näytä sarakkeet käyttämällä select()
market_df.select( 'tyyppi' ,type_upper_case( 'tyyppi' ), 'locate_country' ,
maa_pieni_kirjain( 'locate_country' )).näytä()
Lähtö:
Selitys:
StringType()-funktio on käytettävissä pyspark.sql.types-moduulissa. Toimme tämän moduulin jo luodessasi PySpark DataFramea.
- Ensin UDF (käyttäjän määrittämä funktio) palauttaa merkkijonot isoilla kirjaimilla käyttämällä str.upper()-funktiota. Str.upper() on käytettävissä Series Data Structuressa (kun muunnamme sarjoiksi nuolella funktion sisällä), joka muuntaa annetun merkkijonon isoiksi kirjaimiksi. Lopuksi tätä funktiota sovelletaan 'type' -sarakkeeseen, joka on määritetty select()-menetelmän sisällä. Aiemmin kaikki tyyppisarakkeen merkkijonot ovat pieniä kirjaimia. Nyt ne on muutettu isoiksi.
- Toiseksi UDF palauttaa merkkijonot isoilla kirjaimilla käyttämällä str.lower()-funktiota. Str.lower() on käytettävissä Series Data Structuressa, joka muuntaa annetun merkkijonon pieniksi kirjaimiksi. Lopuksi tätä funktiota sovelletaan 'type' -sarakkeeseen, joka on määritetty select()-menetelmän sisällä. Aiemmin kaikki tyyppisarakkeen merkkijonot ovat isoja kirjaimia. Nyt ne on vaihdettu pieniin kirjaimiin.
Esimerkki 2: Pandas_udf() ja kokonaislukutyyppi
Luodaan UDF, joka muuntaa PySpark DataFrame -kokonaislukusarakkeen Pandas-sarjaksi ja lisää 100 jokaiseen arvoon. Siirrä 'määrä'-sarake tälle funktiolle select()-menetelmän sisällä.
# Lisää 100@pandas_udf(IntegerType())
def add_100(i: panda.Series) -> panda.Series:
palauta i+ 100
# Siirrä määräsarake yllä olevaan toimintoon ja näyttöön.
market_df.select( 'määrä' ,add_100( 'määrä' )).näytä()
Lähtö:
Selitys:
UDF:n sisällä iteroimme kaikki arvot ja muunnamme ne sarjaksi. Sen jälkeen lisäämme sarjan jokaiseen arvoon 100. Lopuksi välitämme 'määrä' -sarakkeen tälle funktiolle ja voimme nähdä, että kaikkiin arvoihin lisätään 100.
Pandas_udf() eri tietotyypeillä käyttäen Groupby() & Agg()
Katsotaanpa esimerkkejä UDF:n välittämiseksi koostetuille sarakkeille. Tässä sarakearvot ryhmitellään ensin groupby()-funktiolla ja yhdistäminen agg()-funktiolla. Välitämme UDF:mme tämän aggregaattifunktion sisällä.
Syntaksi:
pyspark_dataframe_object.groupby( 'ryhmittelysarake' ).agg(UDF(pyspark_dataframe_object[ 'pylväs' ]))
Tässä ryhmittelysarakkeen arvot ryhmitellään ensin. Sitten aggregointi tehdään jokaiselle ryhmitellylle tiedolle suhteessa UDF:ään.
Esimerkki 1: Pandas_udf() ja kokonaiskeskiarvo()
Täällä luomme käyttäjän määrittämän funktion palautustyypin floatilla. Laskemme funktion sisällä keskiarvon käyttämällä mean()-funktiota. Tämä UDF välitetään 'määrä'-sarakkeeseen, jotta saadaan kunkin tyypin keskimääräinen määrä.
# palauttaa keskiarvon/keskiarvon@pandas_udf( 'kellua' )
def medium_function(i: panda.Series) -> float:
return i.mean()
# Välitä määräsarake funktiolle ryhmittelemällä tyyppisarake.
market_df.groupby( 'tyyppi' ).agg(average_function(market_df[ 'määrä' ])).näytä()
Lähtö:
Ryhmittelemme 'tyyppi'-sarakkeen elementtien perusteella. Muodostetaan kaksi ryhmää - 'hedelmä' ja 'kasvis'. Jokaisen ryhmän keskiarvo lasketaan ja palautetaan.
Esimerkki 2: Pandas_udf() ja Aggregate Max() ja Min()
Täällä luomme kaksi käyttäjän määrittämää funktiota, joiden palautustyyppi on kokonaisluku (int). Ensimmäinen UDF palauttaa minimiarvon ja toinen UDF palauttaa maksimiarvon.
# pandas_udf, jotka palauttavat vähimmäisarvon@pandas_udf( 'int' )
def min_(i: panda.Series) -> int:
paluu i.min()
# pandas_udf, jotka palauttavat enimmäisarvon
@pandas_udf( 'int' )
def max_(i: panda.Series) -> int:
paluu i.max()
# Siirrä määräsarake kohtaan min_ pandas_udf ryhmittelemällä locate_country.
market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'määrä' ])).näytä()
# Siirrä määräsarake kohtaan max_ pandas_udf ryhmittelemällä locate_country.
market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'määrä' ])).näytä()
Lähtö:
Minimi- ja maksimiarvojen palauttamiseksi käytämme min()- ja max()-funktioita UDF-tiedostojen palautustyypissä. Nyt ryhmittelemme tiedot 'locate_country' -sarakkeeseen. Muodostetaan neljä ryhmää ('KIINA', 'INTIA', 'JAPANI', 'USA'). Jokaisesta ryhmästä palautetaan enimmäismäärä. Vastaavasti palautamme vähimmäismäärän.
Johtopäätös
Periaatteessa pandas_udf () -funktiota käytetään PySpark DataFrame -kehyksen vektorisoitujen toimintojen suorittamiseen. Olemme nähneet, kuinka pandas_udf() luodaan ja käytetään PySpark DataFrame -kehyksessä. Paremman ymmärtämisen vuoksi keskustelimme erilaisista esimerkeistä ottamalla huomioon kaikki tietotyypit (merkkijono, float ja kokonaisluku). Pandas_udf()-funktiota voidaan käyttää groupby():n kanssa agg()-funktion kautta.