Σήμερα, τα δεδομένα αυξάνονται και συσσωρεύονται γρηγορότερα από πριν. Επί του παρόντος, περίπου το 90% των δεδομένων που δημιουργήθηκαν στον κόσμο μας δημιουργήθηκαν τα τελευταία δύο χρόνια. Λόγω αυτής της αύξησης του ρυθμού, πλατφόρμες μεγάλα δεδομένα έπρεπε να υιοθετήσουν ριζικές λύσεις για να μπορούν να διατηρήσουν τόσο μεγάλους όγκους δεδομένων.
Πώς να αποφύγετε τις διαρροές μνήμης στη java
Μία από τις πιο σημαντικές πηγές δεδομένων σήμερα είναι τα μέσα κοινωνικής δικτύωσης. Επιτρέψτε μου να δείξω ένα πραγματικό παράδειγμα: διαχείριση, ανάλυση και εξαγωγή πληροφοριών από δεδομένα κοινωνικών μέσων σε πραγματικό χρόνο, χρησιμοποιώντας μία από τις οικολογικές λύσεις στο μεγάλα δεδομένα τα πιο σημαντικά εκεί έξω - Apache Spark και Python.
Σε αυτό το άρθρο, θα σας δείξω πώς να δημιουργήσετε μια απλή εφαρμογή που διαβάζει τις διαδικτυακές ροές Twitter χρησιμοποιώντας το Python και, στη συνέχεια, επεξεργάζεται tweets χρησιμοποιώντας Ροή Apache Spark για να προσδιορίσετε τα hashtag και, τέλος, να επιστρέψετε τα πιο σημαντικά trending hashtag και να αποδώσετε αυτά τα δεδομένα στον πίνακα ελέγχου σε πραγματικό χρόνο.
Για να λάβετε tweets από το Twitter, πρέπει να εγγραφείτε στο TwitterApps Κάνοντας κλικ στο 'Δημιουργία νέας εφαρμογής' και αφού συμπληρώσετε την παρακάτω φόρμα, κάντε κλικ στο 'Δημιουργία της εφαρμογής σας στο Twitter'.
Δεύτερον, μεταβείτε στην εφαρμογή που δημιουργήσατε πρόσφατα και ανοίξτε το παράθυρο 'Αναγνωριστικά πρόσβασης και κλειδιά'. Στη συνέχεια, κάντε κλικ στο 'Δημιουργία αναγνωριστικού πρόσβασης'.
Τα νέα αναγνωριστικά σύνδεσης θα εμφανίζονται όπως φαίνεται παρακάτω.
Και τώρα είστε έτοιμοι για το επόμενο βήμα.
Σε αυτό το βήμα, θα σας δείξω πώς να δημιουργήσετε έναν απλό πελάτη που θα πάρει tweet από το API του Twitter χρησιμοποιώντας το Python και στη συνέχεια θα τα μεταβιβάσει στην παρουσία Ροή σπινθήρων . Πρέπει να είναι εύκολο να ακολουθηθεί για οποιοδήποτε προγραμματιστής python επαγγελματίας.
Αρχικά, θα δημιουργήσουμε ένα αρχείο με το όνομα _ _ + _ | και μετά θα προσθέσουμε τον κώδικα μαζί όπως φαίνεται παρακάτω.
τι είναι κίνητρο πωλήσεων
Εισαγάγετε τις βιβλιοθήκες που πρόκειται να χρησιμοποιήσουμε όπως φαίνεται παρακάτω:
twitter_app.py
Και προσθέστε τις μεταβλητές που θα χρησιμοποιηθούν στο OAuth για σύνδεση με το Twitter όπως φαίνεται παρακάτω:
import socket import sys import requests import requests_oauthlib import json
Τώρα, ας δημιουργήσουμε μια νέα συνάρτηση που ονομάζεται # Reemplaza los valores de abajo con los tuyos ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
που θα καλέσει τη διεύθυνση URL του API Twitter και θα επιστρέψει την απάντηση για μια σειρά από tweets.
get_tweets
Στη συνέχεια, δημιουργείτε μια συνάρτηση που λαμβάνει την απάντηση από την παραπάνω προβολή και εξάγει το κείμενο των tweets από το αντικείμενο JSON των πλήρων tweets. Μετά από αυτό, στείλτε κάθε tweet στην παρουσία Ροή σπινθήρων (θα συζητηθεί αργότερα) μέσω σύνδεσης TCP.
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
Τώρα θα κάνουμε το κύριο μέρος. Αυτό θα κάνει την εφαρμογή να φιλοξενήσει τις συνδέσεις πρίζα , με το οποίο θα συνδεθεί αργότερα Σπίθα . Ας ορίσουμε την IP εδώ να είναι def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print('Tweet Text: ' + tweet_text) print ('------------------------------------------') tcp_connection.send(tweet_text + '
') except: e = sys.exc_info()[0] print('Error: %s' % e)
αφού όλα θα λειτουργούν στο ίδιο μηχάνημα και στη θύρα localhost
. Στη συνέχεια, θα καλέσουμε τη μέθοδο 9009
, την οποία κάναμε παραπάνω, για να λάβουμε τα tweets από το Twitter και να περάσουμε την απάντησή σας με τη σύνδεση πρίζα α get_tweets
για να στείλετε τα tweets στο Spark.
send_tweets_to_spark
Ας φτιάξουμε την εφαρμογή μας Ροή σπινθήρων , το οποίο θα επεξεργάζεται εισερχόμενα tweets σε πραγματικό χρόνο, θα εξαγάγει hashtag από αυτά και θα υπολογίσει πόσα hashtag έχουν αναφερθεί.
Πρώτον, πρέπει να δημιουργήσουμε μια παρουσία Πλαίσιο σπινθήρων TCP_IP = 'localhost' TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print('Waiting for TCP connection...') conn, addr = s.accept() print('Connected... Starting getting tweets.') resp = get_tweets() send_tweets_to_spark(resp, conn)
, τότε δημιουργούμε Περιεχόμενο ροής sc
από ssc
με ένα διάστημα δύο δευτερολέπτων που θα πραγματοποιεί τον μετασχηματισμό σε όλες τις μεταδόσεις που λαμβάνονται κάθε δύο δευτερόλεπτα. Σημειώστε ότι ορίζουμε το επίπεδο καταγραφής σε sc
για να μπορείτε να απενεργοποιήσετε τα περισσότερα από τα αρχεία καταγραφής που γράφετε Σπίθα .
Ορίζουμε εδώ ένα σημείο ελέγχου για να μπορούμε να επιτρέπουμε έναν περιοδικό έλεγχο RDD. Αυτό είναι υποχρεωτικό για χρήση στην εφαρμογή μας καθώς θα χρησιμοποιούμε κρατικούς μετασχηματισμούς πυρόσβεσης (θα συζητηθούν αργότερα στην ίδια ενότητα).
Στη συνέχεια, ορίζουμε το κύριο datatream DStream, το οποίο θα συνδέει τον διακομιστή πρίζα που δημιουργήσαμε νωρίτερα στο λιμάνι ERROR
και θα διαβάσει τα tweets από αυτό το λιμάνι. Κάθε εγγραφή στο DStream θα είναι ένα tweet.
9009
Τώρα, πρόκειται να καθορίσουμε τη λογική του μετασχηματισμού μας. Πρώτον, θα σπάσουμε όλα τα tweets σε λέξεις και θα τα βάλουμε σε λέξεις RDD. Στη συνέχεια, φιλτράρουμε μόνο τα hashtag όλων των λέξεων και τα σχεδιάζουμε δίπλα στο from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # crea una configuración spark conf = SparkConf() conf.setAppName('TwitterStreamApp') # crea un contexto spark con la configuración anterior sc = SparkContext(conf=conf) sc.setLogLevel('ERROR') # crea el Contexto Streaming desde el contexto spark visto arriba con intervalo de 2 segundos ssc = StreamingContext(sc, 2) # establece un punto de control para permitir la recuperación de RDD ssc.checkpoint('checkpoint_TwitterApp') # lee data del puerto 9009 dataStream = ssc.socketTextStream('localhost',9009)
και τα βάζουμε σε hashtag RDD.
Στη συνέχεια, πρέπει να υπολογίσουμε πόσες φορές αναφέρεται το hashtag. Μπορούμε να το κάνουμε χρησιμοποιώντας τη συνάρτηση (hashtag, 1)
Αυτή η συνάρτηση θα υπολογίσει πόσες φορές το hashtag αναφέρεται από κάθε ομάδα, δηλαδή θα επαναφέρει τον λογαριασμό σε κάθε ομάδα.
ποια είναι τα στοιχεία και οι αρχές του σχεδιασμού
Στην περίπτωσή μας, πρέπει να υπολογίσουμε τις μετρήσεις σε όλες τις ομάδες, οπότε θα χρησιμοποιήσουμε μια άλλη συνάρτηση που ονομάζεται reduceByKey
δεδομένου ότι αυτή η λειτουργία σάς επιτρέπει να διατηρείτε την κατάσταση RDD ενώ την ενημερώνετε με νέα δεδομένα. Αυτή η φόρμα ονομάζεται updateStateByKey
.
Λάβετε υπόψη ότι για να χρησιμοποιήσετε το Stateful Transformation
, πρέπει να διαμορφώσετε ένα σημείο ελέγχου και τι έγινε στο προηγούμενο βήμα.
updateStateByKey
# divide cada Tweet en palabras words = dataStream.flatMap(lambda line: line.split(' ')) # filtra las palabras para obtener solo hashtags, luego mapea cada hashtag para que sea un par de (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # agrega la cuenta de cada hashtag a su última cuenta tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # procesa cada RDD generado en cada intervalo tags_totals.foreachRDD(process_rdd) # comienza la computación de streaming ssc.start() # espera que la transmisión termine ssc.awaitTermination()
παίρνει μια συνάρτηση ως παράμετρος που ονομάζεται συνάρτηση updateStateByKey
Αυτό εκτελείται σε κάθε στοιχείο του RDD και εκτελεί την επιθυμητή λογική.
Στην περίπτωσή μας, έχουμε δημιουργήσει μια συνάρτηση ενημέρωσης που ονομάζεται update
που θα αθροίσει όλα τα aggregate_tags_count
(νέες τιμές) για κάθε hashtag και θα τα προσθέσει στο new_values
(συνολικό άθροισμα), που είναι το άθροισμα όλων των ομάδων και αποθηκεύει τα δεδομένα σε RDD total_sum
.
tags_totals
Στη συνέχεια κάνουμε επεξεργασία RDD def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
σε κάθε ομάδα για να μπορείτε να το μετατρέψετε σε προσωρινό πίνακα χρησιμοποιώντας Πλαίσιο Spark SQL και μετά από αυτό, κάντε μια δήλωση για να μπορέσετε να λάβετε τα δέκα πρώτα hashtag με τους λογαριασμούς τους και να τα βάλετε στο πλαίσιο δεδομένων tags_totals
.
hashtag_counts_df
Το τελευταίο βήμα στην εφαρμογή Spark είναι να στείλουμε το πλαίσιο δεδομένων def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print('----------- %s -----------' % str(time)) try: # obtén el contexto spark sql singleton desde el contexto actual sql_context = get_sql_context_instance(rdd.context) # convierte el RDD a Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # crea un DF desde el Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Registra el marco de data como tabla hashtags_df.registerTempTable('hashtags') # obtén los 10 mejores hashtags de la tabla utilizando SQL e imprímelos hashtag_counts_df = sql_context.sql('select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10') hashtag_counts_df.show() # llama a este método para preparar los 10 mejores hashtags DF y envíalos send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print('Error: %s' % e)
στην εφαρμογή πίνακα ελέγχου. Έτσι, θα μετατρέψουμε το πλαίσιο δεδομένων σε δύο πίνακες, ένα για τα hashtag και ένα για τους λογαριασμούς τους. Στη συνέχεια, θα προχωρήσουμε στην εφαρμογή ταμπλό μέσω REST API.
hashtag_counts_df
Τέλος, εδώ είναι ένα δείγμα της εξόδου του Ροή σπινθήρων ενώ εκτελείτε και εκτυπώνετε το def send_df_to_dashboard(df): # extrae los hashtags del marco de data y conviértelos en una matriz top_tags = [str(t.hashtag) for t in df.select('hashtag').collect()] # extrae las cuentas del marco de data y conviértelos en una matriz tags_count = [p.hashtag_count for p in df.select('hashtag_count').collect()] # inicia y envía la data a través de la API REST url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
. Θα παρατηρήσετε ότι η έξοδος εκτυπώνεται ακριβώς κάθε δύο δευτερόλεπτα για κάθε διάστημα ομάδας.
Τώρα, πρόκειται να δημιουργήσουμε μια απλή εφαρμογή ταμπλό που θα ενημερώνεται σε πραγματικό χρόνο από το Spark. Θα το φτιάξουμε χρησιμοποιώντας Python, Flask και Charts.js .
Πρώτον, πρόκειται να δημιουργήσουμε ένα έργο Python με τη δομή που φαίνεται παρακάτω, να κατεβάσετε και να προσθέσετε το αρχείο Διάγραμμα.js στον στατικό κατάλογο.
Στη συνέχεια, στο αρχείο hashtag_counts_df
, θα δημιουργήσουμε μια συνάρτηση που ονομάζεται app.py
, η οποία θα κληθεί από τον Spark μέσω της διεύθυνσης URL update_data
για να μπορέσετε να ενημερώσετε καθολικές ετικέτες και πίνακες τιμών.
Ομοίως, η συνάρτηση http://localhost:5001/updateData
δημιουργήθηκε για να κληθεί από το αίτημα AJAX για να επιστρέψει τις νέες ενημερωμένες ετικέτες και πίνακες τιμών ως JSON. Η συνάρτηση refresh_graph_data
θα φύγει από τη σελίδα get_chart_page
όταν καλείται.
chart.html
Τώρα πρόκειται να δημιουργήσουμε ένα απλό γράφημα στο αρχείο from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route('/') def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print('labels now: ' + str(labels)) print('data now: ' + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return 'error',400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print('labels received: ' + str(labels)) print('data received: ' + str(values)) return 'success',201 if __name__ == '__main__': app.run(host='localhost', port=5001)
για να μπορείτε να εμφανίζετε τα δεδομένα hashtag και να τα ενημερώνετε σε πραγματικό χρόνο. Όπως ορίζεται παρακάτω, πρέπει να εισαγάγουμε τις βιβλιοθήκες JavaScript, chart.html
και Chart.js
.
Στο σώμα της ετικέτας, πρέπει να δημιουργήσουμε έναν καμβά και να του δώσουμε ένα αναγνωριστικό για να μπορούμε να το αναφέρουμε ενώ προβάλλουμε το γράφημα κατά τη χρήση JavaScript στο επόμενο βήμα.
jquery.min.js
Τώρα πρόκειται να δημιουργήσουμε το γράφημα χρησιμοποιώντας τον παρακάτω κώδικα JavaScript. Αρχικά, παίρνουμε το στοιχείο καμβά και μετά δημιουργούμε ένα νέο αντικείμενο γραφήματος και μεταβιβάζουμε σε αυτό το στοιχείο καμβά και ορίζουμε το αντικείμενο δεδομένων όπως φαίνεται παρακάτω.
Σημειώστε ότι οι ετικέτες δεδομένων συνδέονται με ετικέτες και μεταβλητές τιμής που επιστρέφονται κατά την έξοδο από τη σελίδα, όταν καλείτε το Top Trending Twitter Hashtags
στο αρχείο Top Trending Twitter Hashtags
get_chart_page
.
Το τελευταίο μέρος είναι η συνάρτηση που έχει διαμορφωθεί για να κάνει ένα αίτημα Ajax κάθε δευτερόλεπτο και καλεί τη διεύθυνση URL app.py
, η οποία θα εκτελέσει /refreshData
σε refresh_graph_data
και θα επιστρέψει τα νέα ενημερωμένα δεδομένα και στη συνέχεια θα ενημερώσει το γράφημα που αφήνουν τα νέα δεδομένα.
javascript από την πλευρά του διακομιστή node js
app.py
Θα εκτελέσουμε τις τρεις εφαρμογές με την παρακάτω σειρά: 1. Twitter App Client. 2. Spark App 3. Dashboard Web App.
Στη συνέχεια, μπορείτε να αποκτήσετε πρόσβαση στον πίνακα ελέγχου σε πραγματικό χρόνο αναζητώντας τη διεύθυνση URL
βέλτιστες πρακτικές responsive web design
Τώρα μπορείτε να δείτε το γράφημα σας να ενημερώνεται παρακάτω:
Έχουμε μάθει να κάνουμε απλή ανάλυση δεδομένων σε δεδομένα πραγματικού χρόνου χρησιμοποιώντας το Spark Streaming και να το ενσωματώνουμε απευθείας σε έναν απλό πίνακα ελέγχου, χρησιμοποιώντας μια υπηρεσία διαδικτύου RESTful. Από αυτό το παράδειγμα, μπορούμε να δούμε πόσο ισχυρό είναι το Spark, καθώς συλλαμβάνει μια τεράστια ροή δεδομένων, το μετατρέπει και εξάγει πολύτιμες πληροφορίες που μπορούν εύκολα να χρησιμοποιηθούν για τη λήψη αποφάσεων σε σύντομο χρονικό διάστημα. Υπάρχουν πολλές χρήσιμες περιπτώσεις χρήσης, οι οποίες μπορούν να εφαρμοστούν και μπορούν να εξυπηρετήσουν διάφορους κλάδους, όπως ειδήσεις ή μάρκετινγκ.
Παράδειγμα βιομηχανίας ειδήσεων
Μπορούμε να παρακολουθήσουμε τα πιο συχνά αναφερόμενα hashtag για να μάθουμε ποια θέματα μιλούν οι άνθρωποι στα μέσα κοινωνικής δικτύωσης. Μπορούμε επίσης να παρακολουθούμε συγκεκριμένα hashtag και τα tweets τους για να μάθουμε τι λένε οι άνθρωποι για συγκεκριμένα θέματα ή εκδηλώσεις στον κόσμο.
Παράδειγμα μάρκετινγκ
Μπορούμε να συλλέξουμε τη μετάδοση tweets και, κάνοντας μια ανάλυση γνώμης, να τα κατηγοριοποιήσουμε και να προσδιορίσουμε τα συμφέροντα των ανθρώπων προκειμένου να τους προσφέρουμε προσφορές που σχετίζονται με τα ενδιαφέροντά τους.
Επίσης, υπάρχουν πολλές περιπτώσεις χρήσης που μπορούν να εφαρμοστούν ειδικά για αναλυτικά στοιχεία. μεγάλα δεδομένα και μπορούν να εξυπηρετήσουν πολλές βιομηχανίες. Για περισσότερες περιπτώσεις χρήσης Apache Spark, σας προτείνω να ρίξετε μια ματιά σε μία από τις δικές μας προηγούμενες δημοσιεύσεις .
Σας συνιστώ να διαβάσετε περισσότερα για Ροή σπινθήρων εδώ για να μάθετε περισσότερα σχετικά με τις δυνατότητές του και να κάνετε πιο προηγμένο μετασχηματισμό δεδομένων για περισσότερες πληροφορίες σε πραγματικό χρόνο κατά τη χρήση.