Nykyään data kasvaa ja kerääntyy nopeammin kuin aiemmin. Tällä hetkellä noin 90% maailmassamme tuotetuista tiedoista on tuotettu kahden viime vuoden aikana. Tämän nopeuden kasvun ansiosta alustat Suuri data heidän oli omaksuttava radikaaleja ratkaisuja voidakseen ylläpitää niin suuria tietomääriä.
Yksi merkittävimmistä tietolähteistä on nykyään sosiaalinen media. Haluan osoittaa tosielämän esimerkin: sosiaalisen median tietojen hallitseminen, analysointi ja poiminta reaaliajassa käyttäen yhtä ekoratkaisuista Suuri data tärkeimmät niistä - Apache Spark ja Python.
Tässä artikkelissa näytän sinulle, kuinka rakennetaan yksinkertainen sovellus, joka lukee Twitterin online-syötteet Pythonilla ja käsittelee sitten twiitit käyttämällä Apache Spark-suoratoisto tunnistaa hashtagit ja lopuksi palauttaa tärkeimmät trendikkäät hashtagit ja näyttää nämä tiedot kojelaudassa reaaliajassa.
Saadaksesi twiittejä Twitteristä, sinun on rekisteröidyttävä osoitteessa TwitterApps Napsauttamalla 'Luo uusi sovellus' ja kun olet täyttänyt alla olevan lomakkeen, napsauta 'Luo Twitter-sovellus'.
Toiseksi, siirry äskettäin luotuun sovellukseesi ja avaa 'Tunnisteet ja avaimet' -ikkuna. Napsauta sitten 'Luo käyttöoikeustunnus'.
Uudet kirjautumistunnuksesi näkyvät alla olevan kuvan mukaisesti.
Ja nyt olet valmis seuraavaan vaiheeseen.
Tässä vaiheessa näytän sinulle, kuinka rakennetaan yksinkertainen asiakas, joka hakee twiitit Twitter-sovellusliittymästä Pythonin avulla ja välittää ne sitten ilmentymään Kipinän suoratoisto . Sen pitäisi olla helppo seurata kenellekään python-kehittäjä ammattilainen.
Ensin aiomme luoda tiedoston nimeltä twitter_app.py
ja sitten aiomme lisätä koodin yhteen alla olevan kuvan mukaisesti.
Tuo kirjastot, joita aiomme käyttää, kuten alla on esitetty:
import socket import sys import requests import requests_oauthlib import json
Ja lisää muuttujat, joita käytetään OAuthissa yhteyden muodostamiseen Twitteriin alla esitetyllä tavalla:
# Reemplaza los valores de abajo con los tuyos ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
Luodaan nyt uusi funktio nimeltä get_tweets
joka kutsuu Twitter-sovellusliittymän URL-osoitetta ja palauttaa vastauksen merkkijonoista twiittejä.
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
Sitten luot toiminnon, joka ottaa vastauksen yllä olevasta näkymästä ja poimii twiittien tekstin täydellisten twiittien JSON-objektista. Tämän jälkeen lähetä kukin tweet instanssille Kipinän suoratoisto (keskustellaan myöhemmin) TCP-yhteyden kautta.
def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print('Tweet Text: ' + tweet_text) print ('------------------------------------------') tcp_connection.send(tweet_text + '
') except: e = sys.exc_info()[0] print('Error: %s' % e)
Nyt teemme pääosan. Tämä saa sovelluksen isännöimään yhteyksiä pistorasiaan , johon se myöhemmin muodostaa yhteyden Kipinä . Määritetään IP: ksi localhost
koska kaikki toimii samalla koneella ja satamassa 9009
. Sitten aiomme kutsua get_tweets
-menetelmää, jonka teimme yllä, saadaksemme twiitit Twitteristä ja välitettäessä vastauksesi yhteyden kanssa pistorasiaan a send_tweets_to_spark
lähettää twiitit Sparkille.
TCP_IP = 'localhost' TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print('Waiting for TCP connection...') conn, addr = s.accept() print('Connected... Starting getting tweets.') resp = get_tweets() send_tweets_to_spark(resp, conn)
Rakennetaan sovelluksemme Kipinän suoratoisto , joka käsittelee saapuvat twiitit reaaliajassa, poimi niistä hashtagit ja laskee kuinka monta hashtagia on mainittu.
Ensinnäkin meidän on luotava ilmentymä Kipinäkonteksti sc
, sitten luomme Suoratoiston konteksti ssc
/ _ + _ | kahden sekunnin välein, joka suorittaa muunnoksen kaikissa vastaanotetuissa lähetyksissä joka toinen sekunti. Huomaa, että lokitasoksi asetettiin sc
pystyäksesi poistamaan käytöstä suurimman osan kirjoittamistasi lokeista Kipinä .
Määritämme tässä tarkistuspisteen voidaksemme sallia säännöllisen RDD-tarkistuksen; tätä on pakko käyttää sovelluksessamme, koska käytämme tilanomaisia palontorjuntamuutoksia (keskustelemme myöhemmin samasta osasta).
Sitten määrittelemme DStream-päätiedostomme, joka yhdistää palvelimen pistorasiaan jonka loimme aiemmin satamassa ERROR
ja se lukee twiitit tuosta portista. Jokainen DStreamin levy on twiitti.
9009
Nyt aiomme määritellä muunnoslogiikkamme. Ensin aiomme jakaa kaikki twiitit sanoiksi ja laittaa ne RDD-sanoiksi. Sitten suodatamme vain hashtagit kaikista sanoista ja piirrämme ne from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # crea una configuración spark conf = SparkConf() conf.setAppName('TwitterStreamApp') # crea un contexto spark con la configuración anterior sc = SparkContext(conf=conf) sc.setLogLevel('ERROR') # crea el Contexto Streaming desde el contexto spark visto arriba con intervalo de 2 segundos ssc = StreamingContext(sc, 2) # establece un punto de control para permitir la recuperación de RDD ssc.checkpoint('checkpoint_TwitterApp') # lee data del puerto 9009 dataStream = ssc.socketTextStream('localhost',9009)
: n viereen ja laitamme ne RDD-hashtageihin.
Sitten meidän on laskettava, kuinka monta kertaa hashtag on mainittu. Voimme tehdä sen käyttämällä (hashtag, 1)
-funktiota. Tämä toiminto laskee, kuinka monta kertaa kukin ryhmä on maininnut hashtagin, eli se nollaa kunkin ryhmän tilin.
Meidän tapauksessamme meidän on laskettava kaikkien ryhmien lukumäärä, joten käytämme toista funktiota nimeltä reduceByKey
koska tämän toiminnon avulla voit pitää RDD-tilan päivitettäessä sitä uusilla tiedoilla. Tätä lomaketta kutsutaan nimellä updateStateByKey
.
Huomaa, että Stateful Transformation
: n käyttämiseksi sinun on määritettävä tarkistuspiste ja edellisessä vaiheessa tehdyt toimet.
updateStateByKey
# divide cada Tweet en palabras words = dataStream.flatMap(lambda line: line.split(' ')) # filtra las palabras para obtener solo hashtags, luego mapea cada hashtag para que sea un par de (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # agrega la cuenta de cada hashtag a su última cuenta tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # procesa cada RDD generado en cada intervalo tags_totals.foreachRDD(process_rdd) # comienza la computación de streaming ssc.start() # espera que la transmisión termine ssc.awaitTermination()
ottaa funktion parametrina, jota kutsutaan updateStateByKey
-funktioksi. Tämä suoritetaan jokaiselle RDD: n kohteelle ja suorittaa halutun logiikan.
Meidän tapauksessamme olemme luoneet päivitystoiminnon nimeltä update
joka summaa kaikki aggregate_tags_count
(uudet arvot) kustakin hashtagista ja lisää ne new_values
(yhteensä summa), joka on kaikkien ryhmien summa ja tallentaa tiedot RDD: hen total_sum
tags_totals
Sitten suoritamme RDD-käsittelyn def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
kussakin ryhmässä, jotta se voidaan muuntaa väliaikaiseksi taulukoksi käyttämällä Spark SQL -yhteys ja tee tämän jälkeen lausunto, jotta voit ottaa kymmenen parhaan hashtagin tilinsä kanssa ja laittaa ne tietokehykseen tags_totals
.
hashtag_counts_df
Spark-sovelluksen viimeinen vaihe on lähettää datakehys def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print('----------- %s -----------' % str(time)) try: # obtén el contexto spark sql singleton desde el contexto actual sql_context = get_sql_context_instance(rdd.context) # convierte el RDD a Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # crea un DF desde el Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Registra el marco de data como tabla hashtags_df.registerTempTable('hashtags') # obtén los 10 mejores hashtags de la tabla utilizando SQL e imprímelos hashtag_counts_df = sql_context.sql('select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10') hashtag_counts_df.show() # llama a este método para preparar los 10 mejores hashtags DF y envíalos send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print('Error: %s' % e)
kojelautasovellukseen. Siksi muunnamme datakehyksen kahdeksi matriisiksi, yhden hashtageille ja toisen heidän tileilleen. Sitten siirrymme kojelautasovellukseen REST API: n kautta.
hashtag_counts_df
Lopuksi tässä on näyte Kipinän suoratoisto käynnissä ja tulostettaessa def send_df_to_dashboard(df): # extrae los hashtags del marco de data y conviértelos en una matriz top_tags = [str(t.hashtag) for t in df.select('hashtag').collect()] # extrae las cuentas del marco de data y conviértelos en una matriz tags_count = [p.hashtag_count for p in df.select('hashtag_count').collect()] # inicia y envía la data a través de la API REST url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
Huomaat, että tulosteet tulostetaan tarkalleen kahden sekunnin välein kutakin ryhmäväliä kohden.
Nyt aiomme luoda yksinkertaisen kojelautasovelluksen, jonka Spark päivittää reaaliajassa. Aiomme rakentaa sen käyttämällä Python, Flask ja Charts.js .
Ensin aiomme luoda Python-projektin, jolla on alla oleva rakenne, ladata ja lisätä tiedosto Chart.js staattisessa hakemistossa.
Sitten aiomme luoda tiedostoon hashtag_counts_df
funktion nimeltä app.py
, jota Spark kutsuu URL-osoitteen update_data
voidakseen päivittää yleisiä tunnisteita ja arvotaulukoita.
Samoin funktio http://localhost:5001/updateData
se luodaan kutsumaan AJAX-pyynnöstä palauttamaan uudet päivitetyt tarrat ja arvotaulukot JSON-muodossa. Toiminto refresh_graph_data
poistuu sivulta get_chart_page
kun soitetaan.
chart.html
Nyt aiomme luoda yksinkertaisen kuvaajan tiedostoon from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route('/') def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print('labels now: ' + str(labels)) print('data now: ' + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return 'error',400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print('labels received: ' + str(labels)) print('data received: ' + str(values)) return 'success',201 if __name__ == '__main__': app.run(host='localhost', port=5001)
pystyä näyttämään hashtag-tiedot ja päivittämään ne reaaliajassa. Kuten alla on määritelty, meidän on tuotava JavaScript-kirjastot chart.html
ja Chart.js
.
Tunnisteen rungossa meidän on luotava kangas ja annettava sille tunnus, jotta voimme viitata siihen samalla, kun näytämme kaaviota, kun käytät JavaScriptiä seuraavassa vaiheessa.
jquery.min.js
Nyt aiomme luoda kaavion alla olevalla JavaScript-koodilla. Ensinnäkin otamme kangaselementin ja sitten luomme uuden kaavioobjektin, välitämme sille piirtoelementin ja määrittelemme dataobjektin alla esitetyllä tavalla.
Huomaa, että tietotarrat yhdistetään tunnisteisiin ja arvomuuttujiin, jotka palautetaan sivulta poistuttaessa, kun kutsutaan Top Trending Twitter Hashtags
tiedostossa Top Trending Twitter Hashtags
get_chart_page
.
Viimeinen osa on toiminto, joka on määritetty tekemään Ajax-pyyntö joka sekunti ja kutsumaan URL app.py
, joka suorittaa /refreshData
sisään refresh_graph_data
ja se palauttaa uudet päivitetyt tiedot ja päivittää sitten kaavio, jonka uudet tiedot jättävät.
app.py
Suoritamme kolme sovellusta alla olevassa järjestyksessä: 1. Twitter App Client. 2. Spark-sovellus 3. Dashboard-verkkosovellus.
Sitten voit käyttää ohjauspaneelia reaaliajassa etsimällä URL-osoitetta
Nyt näet kaaviosi päivittyvän alla:
Olemme oppineet tekemään yksinkertaisen data-analyysin reaaliaikaisessa datassa Spark Streamingin avulla ja integroimalla sen suoraan yksinkertaiseen ohjauspaneeliin RESTful-verkkopalvelun avulla. Tästä esimerkistä voimme nähdä, kuinka tehokas Spark on, koska se sieppaa massiivisen datavirran, muuntaa sen ja poimii arvokasta tietoa, jota voidaan käyttää helposti päätöksenteossa lyhyessä ajassa. On monia hyödyllisiä käyttötapauksia, jotka voidaan toteuttaa ja jotka voivat palvella eri toimialoja, kuten uutisia tai markkinointia.
Uutisteollisuusesimerkki
Voimme seurata yleisimmin mainittuja hashtageja saadaksesi selville, mistä aiheista ihmiset puhuvat sosiaalisessa mediassa. Voimme myös seurata tiettyjä hashtageja ja niiden tweettejä saadakseen selville, mitä ihmiset sanovat tietyistä aiheista tai tapahtumista maailmassa.
Esimerkki markkinoinnista
Voimme kerätä twiittien lähetyksen ja tekemällä mielipideanalyysin luokitella ne ja määrittää ihmisten edut, jotta voimme tarjota heille heidän kiinnostuksen kohteisiinsa liittyviä tarjouksia.
Lisäksi on monia käyttötapauksia, joita voidaan soveltaa erityisesti analytiikkaan. Suuri data ja ne voivat palvella monia teollisuudenaloja. Lisää Apache Spark -käyttötapauksia varten suosittelen, että tutustut johonkin meidän edelliset viestit .
Suosittelen, että luet lisää Kipinän suoratoisto tässä oppia lisää sen ominaisuuksista ja tehdä edistyneempi tiedonsiirto saadaksesi lisätietoja reaaliajassa käytettäessä sitä.