socialgekon.com
  • Tärkein
  • Suunnitteluprosessi
  • Muokkaus
  • Prosessi Ja Työkalut
  • Sijoittajat Ja Rahoitus
Tietojenkäsittely Ja Tietokannat

Apache Spark Streaming -opastus: Twitter Trending Hashtagien tunnistaminen

Nykyään data kasvaa ja kerääntyy nopeammin kuin aiemmin. Tällä hetkellä noin 90% maailmassamme tuotetuista tiedoista on tuotettu kahden viime vuoden aikana. Tämän nopeuden kasvun ansiosta alustat Suuri data heidän oli omaksuttava radikaaleja ratkaisuja voidakseen ylläpitää niin suuria tietomääriä.

Yksi merkittävimmistä tietolähteistä on nykyään sosiaalinen media. Haluan osoittaa tosielämän esimerkin: sosiaalisen median tietojen hallitseminen, analysointi ja poiminta reaaliajassa käyttäen yhtä ekoratkaisuista Suuri data tärkeimmät niistä - Apache Spark ja Python.

Apache Spark Streaming -palvelua voidaan käyttää tietojen keräämiseen sosiaalisesta mediasta, kuten nousevat Twitter-hashtagit



Tässä artikkelissa näytän sinulle, kuinka rakennetaan yksinkertainen sovellus, joka lukee Twitterin online-syötteet Pythonilla ja käsittelee sitten twiitit käyttämällä Apache Spark-suoratoisto tunnistaa hashtagit ja lopuksi palauttaa tärkeimmät trendikkäät hashtagit ja näyttää nämä tiedot kojelaudassa reaaliajassa.

Omien kirjautumistietojen luominen Twitter-sovellusliittymille

Saadaksesi twiittejä Twitteristä, sinun on rekisteröidyttävä osoitteessa TwitterApps Napsauttamalla 'Luo uusi sovellus' ja kun olet täyttänyt alla olevan lomakkeen, napsauta 'Luo Twitter-sovellus'.

Näyttökuva: Kuinka luoda Twitter-sovellus

Toiseksi, siirry äskettäin luotuun sovellukseesi ja avaa 'Tunnisteet ja avaimet' -ikkuna. Napsauta sitten 'Luo käyttöoikeustunnus'.

Näyttökuva: Twitter-sovelluksen tunnistetietojen, salasanojen ja käyttöoikeustunnusten asentaminen

Uudet kirjautumistunnuksesi näkyvät alla olevan kuvan mukaisesti.

Näyttökuva: Asennetaan käyttäjätunnuksia Twiiter-sovellukselle

Ja nyt olet valmis seuraavaan vaiheeseen.

Luo HTTP Twitter -asiakasohjelma

Tässä vaiheessa näytän sinulle, kuinka rakennetaan yksinkertainen asiakas, joka hakee twiitit Twitter-sovellusliittymästä Pythonin avulla ja välittää ne sitten ilmentymään Kipinän suoratoisto . Sen pitäisi olla helppo seurata kenellekään python-kehittäjä ammattilainen.

Ensin aiomme luoda tiedoston nimeltä twitter_app.py ja sitten aiomme lisätä koodin yhteen alla olevan kuvan mukaisesti.

Tuo kirjastot, joita aiomme käyttää, kuten alla on esitetty:

import socket import sys import requests import requests_oauthlib import json

Ja lisää muuttujat, joita käytetään OAuthissa yhteyden muodostamiseen Twitteriin alla esitetyllä tavalla:

# Reemplaza los valores de abajo con los tuyos ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

Luodaan nyt uusi funktio nimeltä get_tweets joka kutsuu Twitter-sovellusliittymän URL-osoitetta ja palauttaa vastauksen merkkijonoista twiittejä.

def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

Sitten luot toiminnon, joka ottaa vastauksen yllä olevasta näkymästä ja poimii twiittien tekstin täydellisten twiittien JSON-objektista. Tämän jälkeen lähetä kukin tweet instanssille Kipinän suoratoisto (keskustellaan myöhemmin) TCP-yhteyden kautta.

def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print('Tweet Text: ' + tweet_text) print ('------------------------------------------') tcp_connection.send(tweet_text + ' ') except: e = sys.exc_info()[0] print('Error: %s' % e)

Nyt teemme pääosan. Tämä saa sovelluksen isännöimään yhteyksiä pistorasiaan , johon se myöhemmin muodostaa yhteyden Kipinä . Määritetään IP: ksi localhost koska kaikki toimii samalla koneella ja satamassa 9009. Sitten aiomme kutsua get_tweets -menetelmää, jonka teimme yllä, saadaksemme twiitit Twitteristä ja välitettäessä vastauksesi yhteyden kanssa pistorasiaan a send_tweets_to_spark lähettää twiitit Sparkille.

TCP_IP = 'localhost' TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print('Waiting for TCP connection...') conn, addr = s.accept() print('Connected... Starting getting tweets.') resp = get_tweets() send_tweets_to_spark(resp, conn)

Apache Spark Streaming -sovelluksen asentaminen

Rakennetaan sovelluksemme Kipinän suoratoisto , joka käsittelee saapuvat twiitit reaaliajassa, poimi niistä hashtagit ja laskee kuinka monta hashtagia on mainittu.

Kuva: * Spark-suoratoisto * mahdollistaa saapuvien twiittien reaaliaikaisen käsittelyn ja hashtag-uutteen

Ensinnäkin meidän on luotava ilmentymä Kipinäkonteksti sc, sitten luomme Suoratoiston konteksti ssc / _ + _ | kahden sekunnin välein, joka suorittaa muunnoksen kaikissa vastaanotetuissa lähetyksissä joka toinen sekunti. Huomaa, että lokitasoksi asetettiin sc pystyäksesi poistamaan käytöstä suurimman osan kirjoittamistasi lokeista Kipinä .

Määritämme tässä tarkistuspisteen voidaksemme sallia säännöllisen RDD-tarkistuksen; tätä on pakko käyttää sovelluksessamme, koska käytämme tilanomaisia ​​palontorjuntamuutoksia (keskustelemme myöhemmin samasta osasta).

Sitten määrittelemme DStream-päätiedostomme, joka yhdistää palvelimen pistorasiaan jonka loimme aiemmin satamassa ERROR ja se lukee twiitit tuosta portista. Jokainen DStreamin levy on twiitti.

9009

Nyt aiomme määritellä muunnoslogiikkamme. Ensin aiomme jakaa kaikki twiitit sanoiksi ja laittaa ne RDD-sanoiksi. Sitten suodatamme vain hashtagit kaikista sanoista ja piirrämme ne from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # crea una configuración spark conf = SparkConf() conf.setAppName('TwitterStreamApp') # crea un contexto spark con la configuración anterior sc = SparkContext(conf=conf) sc.setLogLevel('ERROR') # crea el Contexto Streaming desde el contexto spark visto arriba con intervalo de 2 segundos ssc = StreamingContext(sc, 2) # establece un punto de control para permitir la recuperación de RDD ssc.checkpoint('checkpoint_TwitterApp') # lee data del puerto 9009 dataStream = ssc.socketTextStream('localhost',9009) : n viereen ja laitamme ne RDD-hashtageihin.

Sitten meidän on laskettava, kuinka monta kertaa hashtag on mainittu. Voimme tehdä sen käyttämällä (hashtag, 1) -funktiota. Tämä toiminto laskee, kuinka monta kertaa kukin ryhmä on maininnut hashtagin, eli se nollaa kunkin ryhmän tilin.

Meidän tapauksessamme meidän on laskettava kaikkien ryhmien lukumäärä, joten käytämme toista funktiota nimeltä reduceByKey koska tämän toiminnon avulla voit pitää RDD-tilan päivitettäessä sitä uusilla tiedoilla. Tätä lomaketta kutsutaan nimellä updateStateByKey.

Huomaa, että Stateful Transformation: n käyttämiseksi sinun on määritettävä tarkistuspiste ja edellisessä vaiheessa tehdyt toimet.

updateStateByKey

# divide cada Tweet en palabras words = dataStream.flatMap(lambda line: line.split(' ')) # filtra las palabras para obtener solo hashtags, luego mapea cada hashtag para que sea un par de (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # agrega la cuenta de cada hashtag a su última cuenta tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # procesa cada RDD generado en cada intervalo tags_totals.foreachRDD(process_rdd) # comienza la computación de streaming ssc.start() # espera que la transmisión termine ssc.awaitTermination() ottaa funktion parametrina, jota kutsutaan updateStateByKey -funktioksi. Tämä suoritetaan jokaiselle RDD: n kohteelle ja suorittaa halutun logiikan.

Meidän tapauksessamme olemme luoneet päivitystoiminnon nimeltä update joka summaa kaikki aggregate_tags_count (uudet arvot) kustakin hashtagista ja lisää ne new_values (yhteensä summa), joka on kaikkien ryhmien summa ja tallentaa tiedot RDD: hen total_sum

tags_totals

Sitten suoritamme RDD-käsittelyn def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0) kussakin ryhmässä, jotta se voidaan muuntaa väliaikaiseksi taulukoksi käyttämällä Spark SQL -yhteys ja tee tämän jälkeen lausunto, jotta voit ottaa kymmenen parhaan hashtagin tilinsä kanssa ja laittaa ne tietokehykseen tags_totals.

hashtag_counts_df

Spark-sovelluksen viimeinen vaihe on lähettää datakehys def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print('----------- %s -----------' % str(time)) try: # obtén el contexto spark sql singleton desde el contexto actual sql_context = get_sql_context_instance(rdd.context) # convierte el RDD a Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # crea un DF desde el Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Registra el marco de data como tabla hashtags_df.registerTempTable('hashtags') # obtén los 10 mejores hashtags de la tabla utilizando SQL e imprímelos hashtag_counts_df = sql_context.sql('select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10') hashtag_counts_df.show() # llama a este método para preparar los 10 mejores hashtags DF y envíalos send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print('Error: %s' % e) kojelautasovellukseen. Siksi muunnamme datakehyksen kahdeksi matriisiksi, yhden hashtageille ja toisen heidän tileilleen. Sitten siirrymme kojelautasovellukseen REST API: n kautta.

hashtag_counts_df

Lopuksi tässä on näyte Kipinän suoratoisto käynnissä ja tulostettaessa def send_df_to_dashboard(df): # extrae los hashtags del marco de data y conviértelos en una matriz top_tags = [str(t.hashtag) for t in df.select('hashtag').collect()] # extrae las cuentas del marco de data y conviértelos en una matriz tags_count = [p.hashtag_count for p in df.select('hashtag_count').collect()] # inicia y envía la data a través de la API REST url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data) Huomaat, että tulosteet tulostetaan tarkalleen kahden sekunnin välein kutakin ryhmäväliä kohden.

Esimerkki Twitter * Spark -suoratoisto * -ulostuloksesta, joka on tulostettu jokaiselle ryhmäväliasetukselle

Luo yksinkertainen reaaliaikainen hallintapaneeli tietojen esittämiseksi

Nyt aiomme luoda yksinkertaisen kojelautasovelluksen, jonka Spark päivittää reaaliajassa. Aiomme rakentaa sen käyttämällä Python, Flask ja Charts.js .

Ensin aiomme luoda Python-projektin, jolla on alla oleva rakenne, ladata ja lisätä tiedosto Chart.js staattisessa hakemistossa.

Kuva: Luo Python-projekti käytettäväksi Twitter Hashtag -analyysissä

Sitten aiomme luoda tiedostoon hashtag_counts_df funktion nimeltä app.py, jota Spark kutsuu URL-osoitteen update_data voidakseen päivittää yleisiä tunnisteita ja arvotaulukoita.

Samoin funktio http://localhost:5001/updateData se luodaan kutsumaan AJAX-pyynnöstä palauttamaan uudet päivitetyt tarrat ja arvotaulukot JSON-muodossa. Toiminto refresh_graph_data poistuu sivulta get_chart_page kun soitetaan.

chart.html

Nyt aiomme luoda yksinkertaisen kuvaajan tiedostoon from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route('/') def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print('labels now: ' + str(labels)) print('data now: ' + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return 'error',400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print('labels received: ' + str(labels)) print('data received: ' + str(values)) return 'success',201 if __name__ == '__main__': app.run(host='localhost', port=5001) pystyä näyttämään hashtag-tiedot ja päivittämään ne reaaliajassa. Kuten alla on määritelty, meidän on tuotava JavaScript-kirjastot chart.html ja Chart.js.

Tunnisteen rungossa meidän on luotava kangas ja annettava sille tunnus, jotta voimme viitata siihen samalla, kun näytämme kaaviota, kun käytät JavaScriptiä seuraavassa vaiheessa.

jquery.min.js

Nyt aiomme luoda kaavion alla olevalla JavaScript-koodilla. Ensinnäkin otamme kangaselementin ja sitten luomme uuden kaavioobjektin, välitämme sille piirtoelementin ja määrittelemme dataobjektin alla esitetyllä tavalla.

Huomaa, että tietotarrat yhdistetään tunnisteisiin ja arvomuuttujiin, jotka palautetaan sivulta poistuttaessa, kun kutsutaan Top Trending Twitter Hashtags

Top Trending Twitter Hashtags

tiedostossa get_chart_page.

Viimeinen osa on toiminto, joka on määritetty tekemään Ajax-pyyntö joka sekunti ja kutsumaan URL app.py, joka suorittaa /refreshData sisään refresh_graph_data ja se palauttaa uudet päivitetyt tiedot ja päivittää sitten kaavio, jonka uudet tiedot jättävät.

app.py

Suorita sovellukset yhdessä

Suoritamme kolme sovellusta alla olevassa järjestyksessä: 1. Twitter App Client. 2. Spark-sovellus 3. Dashboard-verkkosovellus.

Sitten voit käyttää ohjauspaneelia reaaliajassa etsimällä URL-osoitetta

Nyt näet kaaviosi päivittyvän alla:

Animaatio: Kaavio trendikkäistä hashtageista Twitterissä reaaliajassa

Apache-suoratoiston tosielämän käyttötarkoitukset

Olemme oppineet tekemään yksinkertaisen data-analyysin reaaliaikaisessa datassa Spark Streamingin avulla ja integroimalla sen suoraan yksinkertaiseen ohjauspaneeliin RESTful-verkkopalvelun avulla. Tästä esimerkistä voimme nähdä, kuinka tehokas Spark on, koska se sieppaa massiivisen datavirran, muuntaa sen ja poimii arvokasta tietoa, jota voidaan käyttää helposti päätöksenteossa lyhyessä ajassa. On monia hyödyllisiä käyttötapauksia, jotka voidaan toteuttaa ja jotka voivat palvella eri toimialoja, kuten uutisia tai markkinointia.

Kuva: Hashtageja voidaan käyttää tietojen ja arvojen poimimiseen, joita voidaan käyttää useilla toimialoilla.

Uutisteollisuusesimerkki

Voimme seurata yleisimmin mainittuja hashtageja saadaksesi selville, mistä aiheista ihmiset puhuvat sosiaalisessa mediassa. Voimme myös seurata tiettyjä hashtageja ja niiden tweettejä saadakseen selville, mitä ihmiset sanovat tietyistä aiheista tai tapahtumista maailmassa.

Esimerkki markkinoinnista

Voimme kerätä twiittien lähetyksen ja tekemällä mielipideanalyysin luokitella ne ja määrittää ihmisten edut, jotta voimme tarjota heille heidän kiinnostuksen kohteisiinsa liittyviä tarjouksia.

Lisäksi on monia käyttötapauksia, joita voidaan soveltaa erityisesti analytiikkaan. Suuri data ja ne voivat palvella monia teollisuudenaloja. Lisää Apache Spark -käyttötapauksia varten suosittelen, että tutustut johonkin meidän edelliset viestit .

Suosittelen, että luet lisää Kipinän suoratoisto tässä oppia lisää sen ominaisuuksista ja tehdä edistyneempi tiedonsiirto saadaksesi lisätietoja reaaliajassa käytettäessä sitä.

Estetiikka ja havainnointi - Kuinka lähestyä käyttökokemuksia

Ux-Suunnittelu

Estetiikka ja havainnointi - Kuinka lähestyä käyttökokemuksia
Tietorakenteen periaatteet mobiililaitteille (infografiikan kanssa)

Tietorakenteen periaatteet mobiililaitteille (infografiikan kanssa)

Ux-Suunnittelu

Suosittu Viestiä
Init.js: Opas Full-Stack-JavaScriptin miksi ja miten
Init.js: Opas Full-Stack-JavaScriptin miksi ja miten
Taso ylöspäin - opas pelin käyttöliittymään (infografiikan kanssa)
Taso ylöspäin - opas pelin käyttöliittymään (infografiikan kanssa)
Tarvitset sankarin: projektipäällikkö
Tarvitset sankarin: projektipäällikkö
Viimeinen opas päivämäärän ja ajan manipulointiin
Viimeinen opas päivämäärän ja ajan manipulointiin
Kuinka poseeraa kuvissa, jotta ne näyttävät rennolta ja luonnolliselta
Kuinka poseeraa kuvissa, jotta ne näyttävät rennolta ja luonnolliselta
 
Käytännöllinen lähestymistapa pelisuunnitteluun
Käytännöllinen lähestymistapa pelisuunnitteluun
Kuinka rakentaa vahva etätyökulttuuri: Haastattelu Christy Schumannin kanssa
Kuinka rakentaa vahva etätyökulttuuri: Haastattelu Christy Schumannin kanssa
Kannustaminen toimintaan ja oikea-aikaisuuteen etätyössä
Kannustaminen toimintaan ja oikea-aikaisuuteen etätyössä
Kuinka rekrytoida UX-tutkimuksen osallistujia
Kuinka rekrytoida UX-tutkimuksen osallistujia
Monimutkainen mutta kiinteä: Katsaus kiinteistöjen vesiputouksiin
Monimutkainen mutta kiinteä: Katsaus kiinteistöjen vesiputouksiin
Luokat
Tuotteen ElinkaariTeknologiaUi-SuunnitteluProsessi Ja TyökalutKaukosäätimen NousuiOS-vinkkejäSuunnitteluKetterä KykyOngelmien karttoittaminenSuunnittelu Ja Ennustaminen

© 2023 | Kaikki Oikeudet Pidätetään

socialgekon.com