PySpark Pandas_Udf()

Pyspark Pandas Udf



PySpark DataFramen muuntaminen on mahdollista pandas_udf()-funktiolla. Se on käyttäjän määrittämä toiminto, jota käytetään PySpark DataFrame -kehyksessä nuolella. Voimme suorittaa vektorisoidut toiminnot käyttämällä pandas_udf(). Se voidaan toteuttaa suorittamalla tämä toiminto sisustajana. Sukellaan tähän oppaaseen saadaksesi tietää syntaksin, parametrit ja erilaiset esimerkit.

Sisällön aihe:

Jos haluat tietää PySpark DataFramen ja moduulin asennuksesta, käy läpi tämä artikla .







Pyspark.sql.functions.pandas_udf()

Pandas_udf () on saatavilla PySparkin sql.functions-moduulissa, joka voidaan tuoda 'from'-avainsanalla. Sitä käytetään PySpark DataFrame -kehyksen vektorisoitujen toimintojen suorittamiseen. Tämä toiminto toteutetaan kuin koristelija välittämällä kolme parametria. Sen jälkeen voimme luoda käyttäjän määrittämän funktion, joka palauttaa tiedot vektorimuodossa (kuten käytämme sarjaa/NumPytä tähän) nuolen avulla. Tämän toiminnon sisällä voimme palauttaa tuloksen.



Rakenne ja syntaksi:



Katsotaanpa ensin tämän funktion rakennetta ja syntaksia:

@pandas_udf(tietotyyppi)
def-funktion_nimi(operaatio) -> convert_format:
palautusilmoitus

Tässä funktion_nimi on määrittämämme funktion nimi. Tietotyyppi määrittää tietotyypin, jonka tämä funktio palauttaa. Voimme palauttaa tuloksen käyttämällä 'return'-avainsanaa. Kaikki toiminnot suoritetaan toiminnon sisällä nuolella.





Pandas_udf (funktio ja palautustyyppi)

  1. Ensimmäinen parametri on käyttäjän määrittämä funktio, joka välitetään sille.
  2. Toista parametria käytetään määrittämään funktion palautustietotyyppi.

Tiedot:

Tässä koko oppaassa käytämme vain yhtä PySpark DataFrame -kehystä esittelyyn. Kaikki määrittämämme käyttäjän määrittämät toiminnot ovat käytössä tässä PySpark DataFrame -kehyksessä. Varmista, että luot tämän DataFramen ympäristöösi ensin PySparkin asennuksen jälkeen.



tuonti pyspark

pyspark.sql-tiedostosta tuo SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

pyspark.sql.functions -tiedostosta tuo pandas_udf

pyspark.sql.types tuonti *

tuo pandat pandaksi

# vihannestiedot

kasvis =[{ 'tyyppi' : 'kasvis' , 'nimi' : 'tomaatti' , 'locate_country' : 'USA' , 'määrä' : 800 },

{ 'tyyppi' : 'hedelmä' , 'nimi' : 'banaani' , 'locate_country' : 'KIINA' , 'määrä' : kaksikymmentä },

{ 'tyyppi' : 'kasvis' , 'nimi' : 'tomaatti' , 'locate_country' : 'USA' , 'määrä' : 800 },

{ 'tyyppi' : 'kasvis' , 'nimi' : 'Mango' , 'locate_country' : 'JAPANI' , 'määrä' : 0 },

{ 'tyyppi' : 'hedelmä' , 'nimi' : 'sitruuna' , 'locate_country' : 'INTIA' , 'määrä' : 1700 },

{ 'tyyppi' : 'kasvis' , 'nimi' : 'tomaatti' , 'locate_country' : 'USA' , 'määrä' : 1200 },

{ 'tyyppi' : 'kasvis' , 'nimi' : 'Mango' , 'locate_country' : 'JAPANI' , 'määrä' : 0 },

{ 'tyyppi' : 'hedelmä' , 'nimi' : 'sitruuna' , 'locate_country' : 'INTIA' , 'määrä' : 0 }

]

# luo markkinatietokehys yllä olevista tiedoista

market_df = linuxhint_spark_app.createDataFrame(kasvis)

market_df.show()

Lähtö:

Tässä luomme tämän DataFramen, jossa on 4 saraketta ja 8 riviä. Nyt käytämme pandas_udf()-funktiota käyttäjän määrittämien funktioiden luomiseen ja soveltamiseen näihin sarakkeisiin.

Pandas_udf() eri tietotyypeillä

Tässä skenaariossa luomme joitain käyttäjän määrittämiä funktioita komennolla pandas_udf() ja käytämme niitä sarakkeissa ja näytämme tulokset select()-menetelmällä. Kussakin tapauksessa käytämme pandas.Seriesia suorittaessamme vektorisoituja operaatioita. Tämä pitää sarakearvoja yksiulotteisena taulukkona ja toimintoa sovelletaan sarakkeeseen. Itse sisustajassa määritämme funktion palautustyypin.

Esimerkki 1: Pandas_udf() merkkijonotyypillä

Täällä luomme kaksi käyttäjän määrittämää funktiota merkkijonon palautustyypillä muuntamaan merkkijonotyypin sarakkeen arvot isoiksi ja pieniksi kirjaimille. Lopuksi käytämme näitä toimintoja 'type'- ja 'locate_country'-sarakkeissa.

# Muunna tyyppisarake isoiksi kirjaimilla pandas_udf:lla

@pandas_udf(merkkijonotyyppi())

def type_upper_case(i: panda.Series) -> panda.Series:

return i.str.upper()

# Muunna locate_country-sarake pieniksi komennolla pandas_udf

@pandas_udf(merkkijonotyyppi())

def country_lower_case(i: panda.Series) -> panda.Series:

return i.str.lower()

# Näytä sarakkeet käyttämällä select()

market_df.select( 'tyyppi' ,type_upper_case( 'tyyppi' ), 'locate_country' ,
maa_pieni_kirjain( 'locate_country' )).näytä()

Lähtö:

Selitys:

StringType()-funktio on käytettävissä pyspark.sql.types-moduulissa. Toimme tämän moduulin jo luodessasi PySpark DataFramea.

  1. Ensin UDF (käyttäjän määrittämä funktio) palauttaa merkkijonot isoilla kirjaimilla käyttämällä str.upper()-funktiota. Str.upper() on käytettävissä Series Data Structuressa (kun muunnamme sarjoiksi nuolella funktion sisällä), joka muuntaa annetun merkkijonon isoiksi kirjaimiksi. Lopuksi tätä funktiota sovelletaan 'type' -sarakkeeseen, joka on määritetty select()-menetelmän sisällä. Aiemmin kaikki tyyppisarakkeen merkkijonot ovat pieniä kirjaimia. Nyt ne on muutettu isoiksi.
  2. Toiseksi UDF palauttaa merkkijonot isoilla kirjaimilla käyttämällä str.lower()-funktiota. Str.lower() on käytettävissä Series Data Structuressa, joka muuntaa annetun merkkijonon pieniksi kirjaimiksi. Lopuksi tätä funktiota sovelletaan 'type' -sarakkeeseen, joka on määritetty select()-menetelmän sisällä. Aiemmin kaikki tyyppisarakkeen merkkijonot ovat isoja kirjaimia. Nyt ne on vaihdettu pieniin kirjaimiin.

Esimerkki 2: Pandas_udf() ja kokonaislukutyyppi

Luodaan UDF, joka muuntaa PySpark DataFrame -kokonaislukusarakkeen Pandas-sarjaksi ja lisää 100 jokaiseen arvoon. Siirrä 'määrä'-sarake tälle funktiolle select()-menetelmän sisällä.

# Lisää 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

palauta i+ 100

# Siirrä määräsarake yllä olevaan toimintoon ja näyttöön.

market_df.select( 'määrä' ,add_100( 'määrä' )).näytä()

Lähtö:

Selitys:

UDF:n sisällä iteroimme kaikki arvot ja muunnamme ne sarjaksi. Sen jälkeen lisäämme sarjan jokaiseen arvoon 100. Lopuksi välitämme 'määrä' -sarakkeen tälle funktiolle ja voimme nähdä, että kaikkiin arvoihin lisätään 100.

Pandas_udf() eri tietotyypeillä käyttäen Groupby() & Agg()

Katsotaanpa esimerkkejä UDF:n välittämiseksi koostetuille sarakkeille. Tässä sarakearvot ryhmitellään ensin groupby()-funktiolla ja yhdistäminen agg()-funktiolla. Välitämme UDF:mme tämän aggregaattifunktion sisällä.

Syntaksi:

pyspark_dataframe_object.groupby( 'ryhmittelysarake' ).agg(UDF
(pyspark_dataframe_object[ 'pylväs' ]))

Tässä ryhmittelysarakkeen arvot ryhmitellään ensin. Sitten aggregointi tehdään jokaiselle ryhmitellylle tiedolle suhteessa UDF:ään.

Esimerkki 1: Pandas_udf() ja kokonaiskeskiarvo()

Täällä luomme käyttäjän määrittämän funktion palautustyypin floatilla. Laskemme funktion sisällä keskiarvon käyttämällä mean()-funktiota. Tämä UDF välitetään 'määrä'-sarakkeeseen, jotta saadaan kunkin tyypin keskimääräinen määrä.

# palauttaa keskiarvon/keskiarvon

@pandas_udf( 'kellua' )

def medium_function(i: panda.Series) -> float:

return i.mean()

# Välitä määräsarake funktiolle ryhmittelemällä tyyppisarake.

market_df.groupby( 'tyyppi' ).agg(average_function(market_df[ 'määrä' ])).näytä()

Lähtö:

Ryhmittelemme 'tyyppi'-sarakkeen elementtien perusteella. Muodostetaan kaksi ryhmää - 'hedelmä' ja 'kasvis'. Jokaisen ryhmän keskiarvo lasketaan ja palautetaan.

Esimerkki 2: Pandas_udf() ja Aggregate Max() ja Min()

Täällä luomme kaksi käyttäjän määrittämää funktiota, joiden palautustyyppi on kokonaisluku (int). Ensimmäinen UDF palauttaa minimiarvon ja toinen UDF palauttaa maksimiarvon.

# pandas_udf, jotka palauttavat vähimmäisarvon

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

paluu i.min()

# pandas_udf, jotka palauttavat enimmäisarvon

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

paluu i.max()

# Siirrä määräsarake kohtaan min_ pandas_udf ryhmittelemällä locate_country.

market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'määrä' ])).näytä()

# Siirrä määräsarake kohtaan max_ pandas_udf ryhmittelemällä locate_country.

market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'määrä' ])).näytä()

Lähtö:

Minimi- ja maksimiarvojen palauttamiseksi käytämme min()- ja max()-funktioita UDF-tiedostojen palautustyypissä. Nyt ryhmittelemme tiedot 'locate_country' -sarakkeeseen. Muodostetaan neljä ryhmää ('KIINA', 'INTIA', 'JAPANI', 'USA'). Jokaisesta ryhmästä palautetaan enimmäismäärä. Vastaavasti palautamme vähimmäismäärän.

Johtopäätös

Periaatteessa pandas_udf () -funktiota käytetään PySpark DataFrame -kehyksen vektorisoitujen toimintojen suorittamiseen. Olemme nähneet, kuinka pandas_udf() luodaan ja käytetään PySpark DataFrame -kehyksessä. Paremman ymmärtämisen vuoksi keskustelimme erilaisista esimerkeistä ottamalla huomioon kaikki tietotyypit (merkkijono, float ja kokonaisluku). Pandas_udf()-funktiota voidaan käyttää groupby():n kanssa agg()-funktion kautta.