Kaupiamasis būsenos transformavimas „Apache Spark“ sraute



Šiame tinklaraščio įraše aptariamos reikšmingos „Spark Streaming“ pertvarkos. Sužinokite viską apie kaupiamąjį „Hadoop Spark“ karjeros stebėjimą ir įgūdžių tobulinimą.

Prisidėjo Prithviraj Bose

konvertuoti dešimtainį į dvejetainį pitono kodą

Ankstesniame savo tinklaraštyje aptariau reikšmingas transformacijas, naudodamas „Apache Spark Streaming“ koncepciją. Galite perskaityti čia .





Šiame įraše aptarsiu „Apache Spark Streaming“ kaupiamąsias būsenos operacijas. Jei esate „Spark Streaming“ naujokas, labai rekomenduoju perskaityti mano ankstesnį tinklaraštį, kad suprastumėte, kaip veikia „windowing“.

Valstybinio virsmo rūšys srautiniame sraute (tęsinys ...)

> Kaupiamasis stebėjimas

Mes naudojome reducByKeyAndWindow (…) API raktų būsenoms sekti, tačiau tam tikrais naudojimo atvejais apribojimai kelia apribojimus. Ką daryti, jei norime kaupti raktų būsenas, o ne apriboti laiko langą? Tokiu atveju mes turėtume naudoti „updateStateByKey“ (…) UGNIS.



Ši API buvo pristatyta „Spark 1.3.0“ ir buvo labai populiari. Tačiau ši API turi šiek tiek našumo, jos našumas blogėja, kai laikui bėgant valstybių dydis didėja. Parašiau pavyzdį, norėdamas parodyti šios API naudojimą. Galite rasti kodą čia .

„Spark 1.6.0“ pristatė naują API mapWithState (…) kuris išsprendžia spektaklio pridėtines išlaidas „updateStateByKey“ (…) . Šiame tinklaraštyje aptarsiu šią konkrečią API naudodama mano parašytą programos pavyzdį. Galite rasti kodą čia .

Prieš pasinerdamas į kodą, pasigailėkime kelių žodžių apie kontrolinį tašką. Bet kokio būsenos transformavimo atveju kontrolinis taškas yra privalomas. „Checkpointing“ yra raktų būsenos atkūrimo mechanizmas, jei vairuotojo programa sugenda. Paleidus tvarkyklę iš naujo, raktų būsena atkuriama iš kontrolinių taškų failų. Kontrolinių punktų vietos paprastai yra HDFS arba „Amazon S3“ arba bet kuri patikima saugykla. Testuojant kodą, jis taip pat gali būti saugomas vietinėje failų sistemoje.



Pavyzdinėje programoje išklausome lizdo teksto srautą host = localhost ir port = 9999. Tai žymi įeinantį srautą į (žodžiai, įvykių skaičius) ir seka žodžių skaičių naudodama 1.6.0 API mapWithState (…) . Be to, raktai be naujinimų pašalinami naudojant StateSpec. Timeout API. Mes tikriname HDFS ir tikrinimo taškų dažnis yra kas 20 sekundžių.

Pirmiausia sukurkime „Spark Streaming“ sesiją,

Spark-streaming-session

Mes kuriame a kontrolinis punktasDir HDFS ir tada iškvieskite objekto metodą „getOrCreate“ (…) . The „getOrCreate“ API patikrina kontrolinis punktasDir norėdamas sužinoti, ar yra kokių nors ankstesnių būsenų, kurias reikia atkurti, jei tokių yra, ji atkuria „Spark Streaming“ sesiją ir atnaujina raktų būsenas iš failuose saugomų duomenų, prieš pereidama su naujais duomenimis. Kitu atveju sukuriama nauja „Spark Streaming“ sesija.

The „getOrCreate“ paima kontrolinio punkto katalogo pavadinimą ir funkciją (kurią mes pavadinome createFunc ) kurio parašas turėtų būti () => „StreamingContext“ .

kaip dinamiškai paskirstyti masyvą Java

Panagrinėkime kodą viduje createFunc .

2 eilutė: Sukuriame srautinį kontekstą su darbo pavadinimu „TestMapWithStateJob“ ir paketinis intervalas = 5 sekundės.

5 eilutė: nustatykite kontrolinio punkto katalogą.

8 eilutė: nustatykite būsenos specifikaciją naudodami klasę org.apache.streaming.StateSpec objektas. Pirmiausia nustatome funkciją, kuri stebės būseną, tada nustatysime gautų „DStream“ skaidinių skaičių, kurie bus sukurti per tolesnes transformacijas. Galiausiai nustatėme skirtąjį laiką (iki 30 sekundžių), jei per 30 sekundžių nebus gautas raktų atnaujinimas, tada raktų būsena bus pašalinta.

12 eilutė #: nustatykite lizdo srautą, išlyginkite gaunamus paketinius duomenis, sukurkite raktų ir verčių porą, paskambinkite mapWithState , nustatykite kontrolinio taško intervalą į 20s ir galiausiai išspausdinkite rezultatus.

„Spark“ sistema vadina tūkst e createFunc už kiekvieną raktą su ankstesne verte ir esama būsena. Apskaičiuojame sumą ir būseną atnaujiname kaupiamąja suma, o galiausiai grąžiname rakto sumą.

kaip naudoti spyderio pitoną

„Github“ šaltiniai -> „TestMapStateWithKey.scala“ , „TestUpdateStateByKey.scala“

Turite mums klausimą? Prašau paminėti tai komentarų skiltyje ir mes su jumis susisieksime.

Susijusios žinutės:

Pradėkite nuo „Apache Spark & ​​Scala“

Valstybinės transformacijos su „Windowing“ srautu