RDD naudojant „Spark“: „Apache Spark“ statybinė medžiaga



Šis tinklaraštis apie RDD, naudojant „Spark“, suteiks jums išsamių ir išsamių žinių apie RDD, kuris yra pagrindinis „Spark & ​​How naudingas“ padalinys.

, Pakanka paties žodžio, kad sukeltų kibirkštį kiekvieno „Hadoop“ inžinieriaus galvoje. Į n atmintyje apdorojimo įrankis kuri yra žaibiška skaičiuojant klasterius. Palyginti su „MapReduce“, dalijantis atmintyje esančiais duomenimis, RDD 10–100 kartų greičiau nei dalijimasis tinklu ir disku, ir visa tai įmanoma dėl RDD (elastingų paskirstytų duomenų rinkinių). Pagrindiniai dalykai, į kuriuos šiandien atkreipiame dėmesį šiame RDD, naudodami „Spark“ straipsnį, yra šie:

Reikia RDD?

Kodėl mums reikia RDD? -RDD naudojant „Spark“



Pasaulis vystosi kartu ir Duomenų mokslas dėl pažangos . Algoritmai remiantis Regresija , , ir kuris veikia toliau Paskirstyta Pakartotinis skaičiavimas atijimas mados, kuri apima pakartotinį duomenų naudojimą ir dalijimąsi keliais skaičiavimo įrenginiais.

Tradicinis reikalingos stabilios tarpinės ir paskirstytos saugyklos HDFS susideda iš pasikartojančių skaičiavimų su duomenų replikacijomis ir duomenų eiliškumu, todėl procesas tapo daug lėtesnis. Surasti sprendimą niekada nebuvo lengva.



Tai kur RDD (Elastingi paskirstyti duomenų rinkiniai) ateina į bendrą vaizdą.

RDD S juos lengva naudoti ir kurti nesunku, nes duomenys yra importuojami iš duomenų šaltinių ir patenka į RDD. Be to, operacijos taikomos joms apdoroti. Jie yra a paskirstytas atminties rinkimas su leidimais kaip Tik skaitymui ir svarbiausia, kad jie yra Atsparus gedimams .



Jei bet kuris duomenų skaidinys apie RDD yra pasimetęs , jį galima regeneruoti taikant tą patį transformacija operacijos tą prarastą skaidinį giminė , o ne apdoroti visus duomenis nuo nulio. Toks požiūris realiuoju laiku gali sukelti stebuklus duomenų praradimo situacijose arba kai sistema neveikia.

Kas yra RDD?

RDD arba ( Elastingas paskirstytų duomenų rinkinys ) yra pagrindinis dalykas duomenų struktūra Sparke. Terminas Atsparus apibrėžia gebėjimą automatiškai generuoti duomenis arba duomenis rieda atgal į pirminė būsena kai įvyksta netikėta nelaimė su duomenų praradimo tikimybe.

Į RDD įrašyti duomenys yra skaidytas ir saugomi keli vykdomieji mazgai . Jei vykdomasis mazgas nepavyksta vykdymo metu, tada jis iškart gauna atsarginę kopiją iš kitas vykdomas mazgas . Štai kodėl RDD yra laikomi išplėstiniu duomenų struktūrų tipu, palyginti su kitomis tradicinėmis duomenų struktūromis. RDD gali saugoti struktūrizuotus, nestruktūruotus ir pusiau struktūruotus duomenis.

Pažvelkime į savo RDD, naudodamiesi „Spark“ tinklaraščiu, ir sužinokime apie unikalias RDD ypatybes, kurios suteikia pranašumą prieš kitų tipų duomenų struktūras.

RDD ypatybės

  • Atmintyje (RAM) Skaičiavimai : „In-Memory“ skaičiavimo koncepcija perkelia duomenis į greitesnį ir efektyvesnį etapą spektaklis sistemos yra atnaujinta.
  • L jo vertinimas : Tinginio vertinimo terminas sako transformacijos yra taikomi RDD duomenims, tačiau išvestis nesukuriama. Vietoj to, taikomos transformacijos yra prisijungęs.
  • Atkaklumas : Gauti RDD visada daugkartinio naudojimo.
  • Stambiagrūdės operacijos : Vartotojas gali pritaikyti transformacijas visiems duomenų rinkinių elementams per žemėlapis, filtras arba Grupuoti pagal operacijos.
  • Gedimas tolerantiškas : Jei prarandami duomenys, sistema gali atsukti atgal prie jos pirminė būsena naudojant užregistruotą transformacijos .
  • Nekintamumas : Duomenys negali būti apibrėžti, gauti ar sukurti pasikeitė kai jis bus prisijungęs prie sistemos. Jei jums reikia prieiti ir modifikuoti esamą RDD, turite sukurti naują RDD pritaikydami rinkinį Transformacija funkcijos, esančios ar ankstesnės RDD.
  • Skirstymas : Tai yra lemiamas vienetas lygiagretumo „Spark“ RDD. Pagal numatytuosius nustatymus sukurtų skaidinių skaičius priklauso nuo jūsų duomenų šaltinio. Jūs netgi galite nuspręsti, kiek skaidinių norite naudoti pasirinktinis skaidinys funkcijos.

RDD sukūrimas naudojant „Spark“

RDD galima sukurti trimis būdais:

  1. Skaitomi duomenys iš lygiagrečios kolekcijos
val PCRDD = spark.sparkContext.parallelize (Masyvas („Pirmadienis“, „Antradienis“, „Trečiadienis“, „Antradienis“, „Penktadienis“, „Šeštadienis“), 2) val rezultatasRDD = PCRDD.collect () resultRDD.collect ( ). foreach (println)
  1. Taikymas transformacija dėl ankstesnių RDD
val žodžiai = spark.sparkContext.parallelize (seka ('Spark', 'is', 'a', 'labai', 'galinga', 'kalba')) val wordpair = žodžiai.map (w = (w.charAt 0), w)) wordpair.collect (). Foreach (println)
  1. Skaitomi duomenys iš išorinė saugykla arba failų kelius, pvz HDFS arba HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

RDD atliktos operacijos:

RDD atliekamos daugiausia dviejų tipų operacijos:

  • Transformacijos
  • Veiksmai

Transformacijos : The operacijos mes taikome RDD filtras, prieiga ir modifikuoti pirminio RDD duomenis, kad būtų sukurtas a paeiliui RDD vadinamas transformacija . Naujasis RDD grąžina žymeklį į ankstesnį RDD, užtikrinant jų priklausomybę.

Transformacijos yra Tingi vertinimai, kitaip tariant, veiksmai, taikomi RDD, kurį dirbate, bus užregistruoti, bet ne įvykdytas. Suaktyvinus sistemą sistema meta rezultatą ar išimtį Veiksmas .

Transformacijas galime suskirstyti į du tipus, kaip nurodyta toliau:

  • Siauros transformacijos
  • Plati transformacija

Siauros transformacijos Taikome siauras transformacijas a vienas skaidinys pirminio RDD generuoti naują RDD, nes duomenys, reikalingi RDD apdoroti, yra prieinami viename tėvų ASD . Siaurų transformacijų pavyzdžiai yra šie:

  • žemėlapis ()
  • filtras()
  • „flatMap“ ()
  • skaidinys ()
  • mapPartitions ()

Plati transformacija: Mes pritaikome plačią transformaciją kelios pertvaros sugeneruoti naują RDD. RDD apdorojimui reikalingi duomenys yra prieinami keliuose tėvų ASD . Plataus virsmo pavyzdžiai yra šie:

  • sumažintiBy ()
  • sąjunga ()

Veiksmai : Veiksmai nurodo „Apache Spark“ taikyti skaičiavimas ir perduoti rezultatą ar išimtį vairuotojo RDD. Keletas veiksmų apima:

eilutė iki datos konvertuoti Java
  • rinkti ()
  • suskaičiuoti ()
  • imti ()
  • Pirmas()

Praktiškai pritaikykime operacijas RDD:

IPL (Indijos „Premier“ lyga) yra kriketo turnyras, kurio aukščiausias lygis. Taigi, šiandien galime pasinaudoti IPL duomenų rinkiniu ir vykdyti mūsų RDD naudodami „Spark“.

  • Pirma, parsisiųskime IPL CSV atitikties duomenis. Atsisiųsdamas jis pradeda atrodyti kaip EXCEL failas su eilutėmis ir stulpeliais.

Kitame žingsnyje įsijungiame kibirkštį ir įkeliame failą match.csv iš jo vietos, mano atveju manocsvfailo vieta yra „/User/edureka_566977/test/matches.csv“

Dabar pradėkime nuo Transformacija pirma dalis:

  • žemėlapis ():

Mes naudojame Žemėlapio transformacija taikyti konkrečią transformacijos operaciją kiekvienam RDD elementui. Čia mes sukuriame RDD pavadinimu CKfile, kur saugome mūsųcsvfailą. Mes sukursime dar vieną RDD, vadinamą valstybėmis saugokite miesto detales .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val būsenos = CKfile.map (_. split (',') (2)) teigia.collect (). foreach (println)

  • filtras():

Filtro transformacija, pats pavadinimas apibūdina jo naudojimą. Šią transformacijos operaciją naudojame tam, kad iš pateiktų duomenų rinkinio filtruotume atrankinius duomenis. Mes kreipiamės filtro veikimas čia rasite metų IPL rungtynių įrašus 2017 m ir saugokite jį faile RDD.

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • „flatMap“ ():

„FlatMap“ yra transformacijos operacija kiekvienam iš RDD elementų, norint sukurti naują RDD. Tai panašu į žemėlapio transformaciją. čia mes taikomėsPlokščiasis žemėlapisį išspjauti Hyderabado miesto degtukus ir saugoti duomenis įfilRDDRDD.

val filRDD = fil.flatMap (eilutė => line.split ('Hyderabad')). rinkti ()

  • skaidinys ():

Visi duomenys, kuriuos mes rašome į RDD, yra padalijami į tam tikrą skaičių skaidinių. Mes naudojame šią transformaciją norėdami rasti pertvarų skaičius duomenys iš tikrųjų yra padalinti į.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

lentos darbalaukis 9 kvalifikuotas bendradarbis

  • mapPartitions ():

Mes laikome „MapPatitions“ kaip „Map“ (ir) alternatyvąkiekvienam() kartu. Čia mes naudojame „mapPartitions“, kad rastume eilučių skaičius turime savo faile RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • sumažintiBy ():

Mes naudojameReduceBy() įjungta Pagrindinės vertės poros . Mes panaudojome šią transformacijącsvfailą, norėdami rasti grotuvą su aukščiausias žmogus iš rungtynių .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (klaidingas) ManOTH.take (10) .foreach (println)

  • sąjunga ():

Viskas paaiškina viską, mes naudojame sąjungos pertvarkymas yra klubas du RDD kartu . Čia mes kuriame du RDD, būtent fil ir fil2. fil RDD yra 2017 m. IPL rungtynių įrašai, o fil2 RDD yra 2016 m. IPL rungtynių įrašai.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Pradėkime nuo Veiksmas dalis, kurioje parodome faktinę produkciją:

  • rinkti ():

Kolekcija yra veiksmas, kurį mes naudojame rodyti turinį RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

  • skaičius ():

Grafasyra veiksmas, kurį naudojame suskaičiuoti įrašų skaičius yra RDD.Čiames naudojame šią operaciją norėdami suskaičiuoti bendrą įrašų skaičių mūsų match.csv faile.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.count ()

  • paimti ():

„Take“ yra veiksmo operacija, panaši į rinkimą, tačiau vienintelis skirtumas yra tai, kad ji gali atsispausdinti bet kurią atrankinis eilučių skaičius pagal vartotojo užklausą. Čia mes naudojame šį kodą atspausdinti dešimt geriausių ataskaitų.

val statecountm = Scount.reduceByKey ((x, y) => x + y). žemėlapis (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. imti (10). foreach (println)

  • Pirmas():

Pirmoji () yra veiksmo operacija, panaši į rinkti () ir imti ()tainaudojamas atspausdinti viršutinę ataskaitą s išvestį Čia mes naudojame pirmąją () operaciją norėdami rasti maksimalus tam tikrame mieste sužaistų rungtynių skaičius ir gauname Mumbajų kaip išvestį.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') val būsenos = CKfile.map (_. split (',') (2)) val Scount = state.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach [println] val statecountm = Scount.reduceByKey ((x, y) => x + y). žemėlapis (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Kad mūsų procesas taptų mūsų mokymosi RDD naudojant „Spark“ dar įdomesnis, aš sugalvojau įdomų naudojimo atvejį.

RDD naudojant „Spark“: „Pokemon“ naudojimo atvejis

  • Pirma, Atsisiųskime „Pokemon.csv“ failą ir įkelkite jį į „spark-shell“, kaip mes padarėme faile „Matches.csv“.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemonai yra iš tikrųjų labai įvairūs. Leiskite mums rasti keletą veislių.

  • Schemos pašalinimas iš failo Pokemon.csv

Mums gali to nereikėti Schema failo „Pokemon.csv“. Vadinasi, mes jį pašaliname.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Surasti skaičių pertvaros mūsų pokemon.csv yra paskirstytas į.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Vandens pokemonai

Rasti vandens pokemono skaičius

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Ugnis Pokemonas

Rasti „Fire“ pokemono skaičius

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Taip pat galime aptikti gyventojų skirtingo tipo pokemon, naudojant skaičiavimo funkciją
WaterRDD.count () FireRDD.count ()

  • Kadangi man patinka žaidimas gynybinė strategija leisk mums rasti pokemoną su maksimali gynyba.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Aukščiausia_Gynimas:' + defenceList.max ())

  • Mes žinome maksimumą gynybos stiprumo vertė bet mes nežinome, kuris pokemonas tai yra. Taigi, suraskime būtent tai pokemonas.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Užsakymas [Double] .reverse.on (_._ 1)] MaxDefencePokemon.foreach (println)

  • Dabar išsiaiškinkime pokemoną mažiausiai gynyba
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5). foreach (println)

  • Dabar pažiūrėkime „Pokemon“ su a mažiau gynybinė strategija.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPokemonName .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Užsakymas [Dvigubas ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

kas yra jamafas ir hashtable

Taigi, naudodamiesi „Spark“ straipsniu, mes baigėme šį RDD. Tikiuosi, kad mes šiek tiek apšvietėme jūsų žinias apie RDD, jų ypatybes ir įvairias operacijas, kurias jiems galima atlikti.

Šis straipsnis pagrįstas yra skirtas pasiruošti „Cloudera Hadoop“ ir „Spark Developer“ sertifikavimo egzaminui (CCA175). Jūs gausite išsamių žinių apie „Apache Spark“ ir „Spark“ ekosistemą, įskaitant „Spark RDD“, „Spark SQL“, „Spark MLlib“ ir „Spark Streaming“. Jūs gausite išsamių žinių apie „Scala“ programavimo kalbą, HDFS, „Sqoop“, „Flume“, „Spark GraphX“ ir pranešimų sistemą, pvz., „Kafka“.