portaldacalheta.pt
  • Κύριος
  • Επενδυτές & Χρηματοδότηση
  • Σχεδιασμός Διεπαφής Χρήστη
  • Τεχνολογία
  • Διαχείριση Έργου
Πίσω Μέρος

Ενορχήστρωση μιας ροής εργασίας στο παρασκήνιο στο σέλινο για Python



Οι σύγχρονες διαδικτυακές εφαρμογές και τα υποκείμενα συστήματά τους είναι ταχύτερα και πιο ευαίσθητα από ποτέ. Ωστόσο, εξακολουθούν να υπάρχουν πολλές περιπτώσεις όπου θέλετε να εκφορτώσετε την εκτέλεση μιας βαριάς εργασίας σε άλλα μέρη ολόκληρης της αρχιτεκτονικής του συστήματός σας αντί να τα αντιμετωπίζετε στο κύριο νήμα σας. Ο εντοπισμός τέτοιων εργασιών είναι τόσο απλός όσο ο έλεγχος για να διαπιστωθεί αν ανήκουν σε μία από τις ακόλουθες κατηγορίες:

  • Περιοδικές εργασίες - Εργασίες που θα προγραμματίσετε να εκτελούνται σε συγκεκριμένη ώρα ή μετά από ένα διάστημα, π.χ. μηνιαία δημιουργία αναφορών ή web scraper που εκτελείται δύο φορές την ημέρα.
  • Εργασίες τρίτου μέρους - Η εφαρμογή ιστού πρέπει να εξυπηρετεί γρήγορα τους χρήστες χωρίς να περιμένει την ολοκλήρωση άλλων ενεργειών κατά τη φόρτωση της σελίδας, π.χ. αποστολή email ή ειδοποίησης ή μετάδοση ενημερώσεων σε εσωτερικά εργαλεία (όπως συλλογή δεδομένων για δοκιμές A / B ή καταγραφή συστήματος ).
  • Μακροχρόνιες εργασίες - Εργασίες που είναι ακριβές σε πόρους, όπου οι χρήστες πρέπει να περιμένουν ενώ υπολογίζουν τα αποτελέσματά τους, π.χ. σύνθετη εκτέλεση ροής εργασίας (ροές εργασίας DAG), δημιουργία γραφημάτων, Map-Reduce όπως εργασίες και προβολή περιεχομένου μέσων (βίντεο, ήχος).

Μια απλή λύση για την εκτέλεση μιας εργασίας παρασκηνίου θα ήταν η εκτέλεση σε ένα ξεχωριστό νήμα ή διαδικασία. Η Python είναι μια πλήρης γλώσσα προγραμματισμού Turing υψηλού επιπέδου, η οποία δυστυχώς δεν παρέχει ενσωματωμένη ταυτόχρονη σε κλίμακα που ταιριάζει με εκείνη των Erlang, Go, Java, Scala ή Akka. Αυτά βασίζονται στις Διαδοχικές Διαδικασίες Επικοινωνίας του Tony Hoare ( CSP ). Τα νήματα Python, από την άλλη πλευρά, συντονίζονται και προγραμματίζονται από την παγκόσμια κλειδαριά διερμηνέα ( ΓΙΙΛ ), που εμποδίζει την εκτέλεση πολλαπλών κωδικών Python ταυτόχρονα από πολλά εγγενή νήματα. Η απαλλαγή από το GIL είναι ένα θέμα πολλών συζητήσεων μεταξύ Προγραμματιστές Python , αλλά δεν είναι το επίκεντρο αυτού του άρθρου. Ο ταυτόχρονος προγραμματισμός στην Python είναι ντεμοντέ, αν και μπορείτε να το διαβάσετε Εκμάθηση πολλαπλών νημάτων Python από τον συνάδελφο ApeeScapeer Marcus McCurdy. Έτσι, ο σχεδιασμός επικοινωνίας μεταξύ διεργασιών με συνέπεια είναι μια διαδικασία επιρρεπής σε σφάλματα και οδηγεί σε σύζευξη κώδικα και κακή συντήρηση του συστήματος, για να μην αναφέρουμε ότι επηρεάζει αρνητικά την επεκτασιμότητα. Επιπλέον, η διαδικασία Python είναι μια κανονική διαδικασία σε ένα Λειτουργικό Σύστημα (OS) και, με ολόκληρη την τυπική βιβλιοθήκη Python, γίνεται βαρέων βαρών. Καθώς ο αριθμός των διαδικασιών στην εφαρμογή αυξάνεται, η μετάβαση από μια τέτοια διαδικασία σε άλλη γίνεται μια χρονοβόρα λειτουργία.



Για να κατανοήσετε καλύτερα τη συνάφεια με τον Python, παρακολουθήστε αυτήν την απίστευτη ομιλία του David Beazley στο PyCon'15 .



Μια πολύ καλύτερη λύση είναι να σερβίρετε ένα κατανεμημένη ουρά ή το γνωστό παράδειγμα αδελφού του δημοσίευση-εγγραφή . Όπως απεικονίζεται στο Σχήμα 1, υπάρχουν δύο τύποι εφαρμογών στις οποίες μία, που ονομάζεται εκδότης , στέλνει μηνύματα και το άλλο, που ονομάζεται συνδρομητής , λαμβάνει μηνύματα. Αυτοί οι δύο πράκτορες δεν αλληλεπιδρούν μεταξύ τους άμεσα και δεν γνωρίζουν καν ο ένας τον άλλον. Οι εκδότες στέλνουν μηνύματα σε μια κεντρική ουρά ή μεσίτης και οι συνδρομητές λαμβάνουν μηνύματα ενδιαφέροντος από αυτόν τον μεσίτη. Υπάρχουν δύο βασικά πλεονεκτήματα σε αυτήν τη μέθοδο:



  • Επεκτασιμότητα - οι πράκτορες δεν χρειάζεται να γνωρίζουν ο ένας τον άλλον στο δίκτυο. Επικεντρώνονται ανά θέμα. Αυτό σημαίνει ότι ο καθένας μπορεί να συνεχίσει να λειτουργεί κανονικά ανεξάρτητα από τον άλλο με ασύγχρονο τρόπο.
  • Χαλαρή σύζευξη - κάθε πράκτορας αντιπροσωπεύει το μέρος του συστήματος (service, module). Δεδομένου ότι είναι χαλαρά συνδεδεμένα, το καθένα μπορεί να κλιμακώσει ξεχωριστά πέρα ​​από το κέντρο δεδομένων.

Υπάρχουν πολλά συστήματα ανταλλαγής μηνυμάτων που υποστηρίζουν τέτοια παραδείγματα και παρέχουν ένα τακτοποιημένο API, που οδηγείται είτε από πρωτόκολλα TCP ή HTTP, π.χ., JMS, RabbitMQ, Redis Pub / Sub, Apache ActiveMQ κ.λπ.

Δημοσίευση-Εγγραφείτε πρότυπο με το Celery Python
Εικόνα 1: Παράδειγμα δημοσίευσης-εγγραφής

Τι είναι το σέλινο;

Σέλινο είναι ένας από τους πιο δημοφιλείς διαχειριστές υποβάθρου στον κόσμο της Python. Το σέλινο είναι συμβατό με αρκετούς μεσίτες μηνυμάτων όπως το RabbitMQ ή το Redis και μπορεί να λειτουργήσει τόσο ως παραγωγός όσο και ως καταναλωτής.



Το σέλινο είναι μια ασύγχρονη ουρά εργασιών / ουρά εργασίας με βάση τη διανομή διανεμημένων μηνυμάτων. Επικεντρώνεται σε λειτουργίες σε πραγματικό χρόνο, αλλά υποστηρίζει επίσης προγραμματισμό. Οι μονάδες εκτέλεσης, που ονομάζονται εργασίες, εκτελούνται ταυτόχρονα σε έναν ή περισσότερους διακομιστές εργαζομένων χρησιμοποιώντας πολλαπλή επεξεργασία, Εκδήλωση , ή εξαερίζεται . Οι εργασίες μπορούν να εκτελούνται ασύγχρονα (στο παρασκήνιο) ή συγχρονισμένα (περιμένετε μέχρι να είναι έτοιμα). - Έργο σέλινο

Για να ξεκινήσετε με το Σέλινο, απλώς ακολουθήστε έναν βήμα προς βήμα οδηγό στο επίσημα έγγραφα .



Το επίκεντρο αυτού του άρθρου είναι να σας δώσει μια καλή κατανόηση για το ποιες περιπτώσεις χρήσης θα μπορούσαν να καλυφθούν από το Σέλινο. Σε αυτό το άρθρο, δεν θα δείξουμε μόνο ενδιαφέροντα παραδείγματα, αλλά θα προσπαθήσουμε να μάθουμε πώς να εφαρμόζουμε το Σέλινο σε εργασίες πραγματικού κόσμου, όπως αλληλογραφία παρασκηνίου, δημιουργία αναφορών, καταγραφή και αναφορές σφαλμάτων. Θα μοιραστώ τον τρόπο μου για να δοκιμάσω τις εργασίες πέρα ​​από την εξομοίωση και, τέλος, θα παράσχω μερικά κόλπα που δεν είναι (καλά) τεκμηριωμένα στην επίσημη τεκμηρίωση που μου πήρε ώρες έρευνας για να ανακαλύψω για τον εαυτό μου.

πράγματα που έχουν να κάνουν με sdr

Εάν δεν έχετε προηγούμενη εμπειρία με το Σέλινο, σας προτείνω πρώτα να το δοκιμάσετε ακολουθώντας το επίσημο σεμινάριο.



Εκνευρίζοντας την όρεξή σας

Εάν αυτό το άρθρο σας ενοχλεί και σας κάνει να θέλετε να μπείτε στον κώδικα αμέσως, ακολουθήστε αυτό Αποθήκη GitHub για τον κωδικό που χρησιμοποιείται σε αυτό το άρθρο. Το README Το αρχείο εκεί θα σας δώσει τη γρήγορη και βρώμικη προσέγγιση για τρέξιμο και παιχνίδι με τα παραδείγματα εφαρμογών.

Πρώτα βήματα με σέλινο

Για αρχάριους, θα ακολουθήσουμε μια σειρά πρακτικών παραδειγμάτων που θα δείξουν στον αναγνώστη πώς απλά και κομψά το Σέλινο επιλύει φαινομενικά μη ασήμαντες εργασίες. Όλα τα παραδείγματα θα παρουσιαστούν στο πλαίσιο του Django. Ωστόσο, τα περισσότερα από αυτά θα μπορούσαν εύκολα να μεταφερθούν σε άλλα πλαίσια Python (Flask, Pyramid).



Η διάταξη του έργου δημιουργήθηκε από το Cookiecutter Django ; Ωστόσο, κράτησα μόνο μερικές εξαρτήσεις που, κατά τη γνώμη μου, διευκολύνουν την ανάπτυξη και την προετοιμασία αυτών των περιπτώσεων χρήσης. Κατάργησα επίσης περιττές ενότητες για αυτήν την ανάρτηση και εφαρμογές για να μειώσω τον θόρυβο και να κάνω τον κώδικα πιο κατανοητό.

- celery_uncovered/ - celery_uncovered/__init__.py - celery_uncovered/{toyex,tricks,advex} - celery_uncovered/celery.py - config/settings/{base,local,test}.py - config/urls.py - manage.py
  • celery_uncovered/{toyex,tricks,advex} περιέχει διαφορετικές εφαρμογές που θα καλύψουμε σε αυτήν την ανάρτηση. Κάθε εφαρμογή περιέχει ένα σύνολο παραδειγμάτων που οργανώνονται από το απαιτούμενο επίπεδο κατανόησης σέλινου.
  • celery_uncovered/celery.py ορίζει μια παρουσία σέλινου.

Αρχείο: celery_uncovered/celery.py:



from __future__ import absolute_import import os from celery import Celery, signals # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local') app = Celery('celery_uncovered') # Using a string here means the worker will not have to # pickle the object when using Windows. app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks()

Τότε πρέπει να διασφαλίσουμε ότι το Σέλινο θα ξεκινήσει μαζί με τον Django. Για αυτόν τον λόγο, εισάγουμε την εφαρμογή σε celery_uncovered/__init__.py.

Αρχείο: celery_uncovered/__init__.py:

from __future__ import absolute_import # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app # noqa __all__ = ['celery_app'] __version__ = '0.0.1' __version_info__ = tuple([int(num) if num.isdigit() else num for num in __version__.replace('-', '.', 1).split('.')])

config/settings είναι η πηγή διαμόρφωσης για την εφαρμογή μας και το Σέλινο. Ανάλογα με το περιβάλλον εκτέλεσης, το Django θα ξεκινήσει τις αντίστοιχες ρυθμίσεις: local.py για ανάπτυξη ή test.py για δοκιμή. Μπορείτε επίσης να ορίσετε το δικό σας περιβάλλον εάν θέλετε, δημιουργώντας μια νέα μονάδα python (π.χ. prod.py). Οι διαμορφώσεις σέλινου προτίθενται με CELERY_. Για αυτήν την ανάρτηση, διαμόρφωσα το RabbitMQ ως μεσίτη και το SQLite ως αποτέλεσμα bac-end.

Αρχείο: config/local.py:

CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='amqp://guest: [email protected] :5672//') CELERY_RESULT_BACKEND = 'django-db+sqlite:///results.sqlite'

Σενάριο 1 - Δημιουργία και εξαγωγή αναφορών

Η πρώτη περίπτωση που θα καλύψουμε είναι η δημιουργία αναφορών και η εξαγωγή. Σε αυτό το παράδειγμα, θα μάθετε πώς να ορίζετε μια εργασία που παράγει μια αναφορά CSV και να την προγραμματίζετε σε τακτά διαστήματα με σέλινο .

Χρησιμοποιήστε την περιγραφή περίπτωσης: πάρτε τα πεντακόσια πιο καυτά αποθετήρια από το GitHub ανά επιλεγμένη περίοδο (ημέρα, εβδομάδα, μήνας), ομαδοποιήστε τα ανά θέματα και εξαγάγετε το αποτέλεσμα στο αρχείο CSV.

Εάν παρέχουμε μια υπηρεσία HTTP που θα εκτελέσει αυτήν τη λειτουργία που ενεργοποιείται κάνοντας κλικ σε ένα κουμπί με την ένδειξη 'Δημιουργία αναφοράς', η εφαρμογή θα σταματήσει και θα περιμένει την ολοκλήρωση της εργασίας προτού επιστρέψει μια απόκριση HTTP. Αυτό είναι κακό. Θέλουμε η εφαρμογή ιστού μας να είναι γρήγορη και δεν θέλουμε οι χρήστες μας να περιμένουν ενώ το back-end μας υπολογίζει τα αποτελέσματα. Αντί να περιμένουμε να παραχθούν τα αποτελέσματα, θα προτιμούσαμε να παρατάξουμε την εργασία σε διαδικασίες εργαζομένων μέσω μιας καταχωρημένης ουράς στο Σέλινο και να απαντήσουμε με ένα task_id στο μπροστινό μέρος. Τότε το front-end θα χρησιμοποιούσε το task_id για να υποβάλετε ερώτημα στο αποτέλεσμα της εργασίας με ασύγχρονο τρόπο (π.χ. AJAX) και θα ενημερώνει τον χρήστη με την πρόοδο της εργασίας. Τέλος, όταν ολοκληρωθεί η διαδικασία, τα αποτελέσματα μπορούν να χρησιμοποιηθούν ως αρχείο για λήψη μέσω HTTP.

Λεπτομέρειες εφαρμογής

Πρώτα απ 'όλα, ας αποσυνθέσουμε τη διαδικασία στις μικρότερες δυνατές μονάδες της και να δημιουργήσουμε έναν αγωγό:

  1. Λάτρεις είναι οι εργαζόμενοι που είναι υπεύθυνοι για τη λήψη αποθετηρίων από την υπηρεσία GitHub.
  2. ο Συγκεντρωτής είναι ο εργαζόμενος που είναι υπεύθυνος για την ενοποίηση των αποτελεσμάτων σε μία λίστα.
  3. ο Εισαγωγέας είναι ο εργαζόμενος που δημιουργεί αναφορές CSV για τα πιο καυτά αποθετήρια στο GitHub.
Ένας αγωγός των εργαζομένων της Celery Python
Σχήμα 2: Ένας αγωγός εργαζομένων με Σέλινο και Πύθωνα

Η λήψη αποθετηρίων είναι ένα αίτημα HTTP χρησιμοποιώντας το API αναζήτησης GitHub GET /search/repositories. Ωστόσο, υπάρχει ένας περιορισμός της υπηρεσίας GitHub API που πρέπει να αντιμετωπιστεί: Το API επιστρέφει έως και 100 αποθετήρια ανά αίτημα αντί για 500. Θα μπορούσαμε να στείλουμε πέντε αιτήματα ένα κάθε φορά, αλλά δεν θέλουμε να κρατήσουμε τον χρήστη μας σε αναμονή για πέντε μεμονωμένα αιτήματα δεδομένου ότι τα αιτήματα HTTP είναι μια δεσμευμένη λειτουργία I / O. Αντ 'αυτού, μπορούμε να εκτελέσουμε πέντε ταυτόχρονες αιτήσεις HTTP με την κατάλληλη παράμετρο σελίδας. Έτσι, η σελίδα θα βρίσκεται στο εύρος [1..5]. Ας ορίσουμε μια εργασία που ονομάζεται fetch_hot_repos/3 -> list στο toyex/tasks.py μονάδα μέτρησης:

Αρχείο: celery_uncovered/toyex/local.py

@shared_task def fetch_hot_repos(since, per_page, page): payload = { 'sort': 'stars', 'order': 'desc', 'q': 'created:>={date}'.format(date=since), 'per_page': per_page, 'page': page, 'access_token': settings.GITHUB_OAUTH} headers = {'Accept': 'application/vnd.github.v3+json'} connect_timeout, read_timeout = 5.0, 30.0 r = requests.get( 'https://api.github.com/search/repositories', params=payload, headers=headers, timeout=(connect_timeout, read_timeout)) items = r.json()[u'items'] return items

Λοιπόν fetch_hot_repos δημιουργεί ένα αίτημα στο GitHub API και ανταποκρίνεται στον χρήστη με μια λίστα αποθετηρίων. Λαμβάνει τρεις παραμέτρους που θα καθορίσουν το ωφέλιμο φορτίο των αιτημάτων μας:

  • since - Φιλτράρει τα αποθετήρια κατά την ημερομηνία δημιουργίας.
  • per_page - Αριθμός αποτελεσμάτων προς επιστροφή ανά αίτημα (περιορίζεται σε 100).
  • page —- Ζητούμενος αριθμός σελίδας (στο εύρος [1..5]).

Σημείωση: Για να χρησιμοποιήσετε το GitHub Search API, θα χρειαστείτε ένα κουπόνι OAuth για να περάσετε τους ελέγχους ελέγχου ταυτότητας. Στην περίπτωσή μας, αποθηκεύεται στις ρυθμίσεις στην ενότητα GITHUB_OAUTH.

Στη συνέχεια, πρέπει να ορίσουμε μια κύρια εργασία που θα είναι υπεύθυνη για τη συγκέντρωση αποτελεσμάτων και την εξαγωγή τους σε ένα αρχείο CSV: produce_hot_repo_report_task/2->filepath:

γρύλισμα vs gulp vs webpack

Αρχείο: celery_uncovered/toyex/local.py

@shared_task def produce_hot_repo_report(period, ref_date=None): # 1. parse date ref_date_str = strf_date(period, ref_date=ref_date) # 2. fetch and join fetch_jobs = group([ fetch_hot_repos.s(ref_date_str, 100, 1), fetch_hot_repos.s(ref_date_str, 100, 2), fetch_hot_repos.s(ref_date_str, 100, 3), fetch_hot_repos.s(ref_date_str, 100, 4), fetch_hot_repos.s(ref_date_str, 100, 5) ]) # 3. group by language and # 4. create csv return chord(fetch_jobs)(build_report_task.s(ref_date_str)).get() @shared_task def build_report_task(results, ref_date): all_repos = [] for repos in results: all_repos += [Repository(repo) for repo in repos] # 3. group by language grouped_repos = {} for repo in all_repos: if repo.language in grouped_repos: grouped_repos[repo.language].append(repo.name) else: grouped_repos[repo.language] = [repo.name] # 4. create csv lines = [] for lang in sorted(grouped_repos.keys()): lines.append([lang] + grouped_repos[lang]) filename = '{media}/github-hot-repos-{date}.csv'.format(media=settings.MEDIA_ROOT, date=ref_date) return make_csv(filename, lines)

Αυτή η εργασία χρησιμοποιεί celery.canvas.group για εκτέλεση πέντε ταυτόχρονων κλήσεων fetch_hot_repos/3. Αυτά τα αποτελέσματα περιμένουν και στη συνέχεια μειώνονται σε μια λίστα αντικειμένων αποθετηρίου. Στη συνέχεια, το σύνολο αποτελεσμάτων μας ομαδοποιείται ανά θέμα και τελικά εξάγεται σε ένα αρχείο CSV που δημιουργήθηκε κάτω από το MEDIA_ROOT/ Ευρετήριο.

Για να προγραμματίζετε την εργασία περιοδικά, ίσως θέλετε να προσθέσετε μια καταχώριση στη λίστα χρονοδιαγραμμάτων στο αρχείο διαμόρφωσης:

Αρχείο: config/local.py

from celery.schedules import crontab CELERY_BEAT_SCHEDULE = { 'produce-csv-reports': { 'task': 'celery_uncovered.toyex.tasks.produce_hot_repo_report_task', 'schedule': crontab(minute=0, hour=0) # midnight, 'args': ('today',) }, }

Δοκιμάστε το

Για να ξεκινήσουμε και να δοκιμάσουμε πώς λειτουργεί η εργασία, πρώτα πρέπει να ξεκινήσουμε τη διαδικασία Σέλινου:

$ celery -A celery_uncovered worker -l info

Στη συνέχεια, πρέπει να δημιουργήσουμε το celery_uncovered/media/ Ευρετήριο. Στη συνέχεια, θα μπορείτε να δοκιμάσετε τη λειτουργικότητά του είτε μέσω Shell ή Celerybeat:

Κέλυφος :

from datetime import date from celery_uncovered.toyex.tasks import produce_hot_repo_report_task produce_hot_repo_report_task.delay('today').get(timeout=5)

Σέλινο :

# Start celerybeat with the following command $ celery -A celery_uncovered beat -l info

Μπορείτε να παρακολουθήσετε τα αποτελέσματα στο MEDIA_ROOT/ Ευρετήριο.

Σενάριο 2 - Αναφορά στις Διακομιστής 500 Σφάλματα μέσω email

Μία από τις πιο συχνές περιπτώσεις χρήσης για το Σέλινο είναι η αποστολή ειδοποιήσεων μέσω email. Η ειδοποίηση μέσω ηλεκτρονικού ταχυδρομείου είναι μια σύνδεση χωρίς σύνδεση I / O που αξιοποιεί είτε έναν τοπικό διακομιστή SMTP είτε ένα SES τρίτου μέρους. Υπάρχουν πολλές περιπτώσεις χρήσης που περιλαμβάνουν αποστολή email και, για τα περισσότερα, ο χρήστης δεν χρειάζεται να περιμένει μέχρι να ολοκληρωθεί αυτή η διαδικασία προτού λάβει μια απόκριση HTTP. Γι 'αυτό προτιμάται η εκτέλεση τέτοιων εργασιών στο παρασκήνιο και η άμεση ανταπόκριση στον χρήστη.

Χρησιμοποιήστε την περιγραφή περίπτωσης: Αναφέρετε σφάλματα 50Χ στο email του διαχειριστή μέσω του σέλινου.

Οι Python και Django έχουν το απαραίτητο υπόβαθρο για να παίξουν καταγραφή συστήματος . Δεν θα βρω λεπτομέρειες για το πώς λειτουργεί πραγματικά η καταγραφή της Python. Ωστόσο, εάν δεν το έχετε δοκιμάσει ποτέ πριν ή χρειάζεστε νέα, διαβάστε την τεκμηρίωση του ενσωματωμένου ξύλευση μονάδα μέτρησης. Αυτό το θέλετε σίγουρα στο περιβάλλον παραγωγής σας. Ο Django έχει έναν ειδικό χειριστή logger ΔιαχειριστήςEmailHandler ότι στέλνει email στους διαχειριστές για κάθε μήνυμα καταγραφής που λαμβάνει.

Λεπτομέρειες εφαρμογής

Η κύρια ιδέα είναι να επεκταθεί το send_mail μέθοδος του AdminEmailHandler τάξη με τέτοιο τρόπο ώστε να μπορεί να στέλνει αλληλογραφία μέσω σέλινου. Αυτό θα μπορούσε να γίνει όπως φαίνεται στο παρακάτω σχήμα:

Χειρισμός email διαχειριστή με Celery & Python
Εικόνα 3: Χειρισμός email διαχειριστή με Celery και Python

Πρώτον, πρέπει να δημιουργήσουμε μια εργασία που ονομάζεται report_error_task που καλεί mail_admins με το παρεχόμενο θέμα και το μήνυμα:

Αρχείο: celery_uncovered/toyex/tasks.py

@shared_task def report_error_task(subject, message, *args, **kwargs): mail_admins(subject, message, *args, **kwargs)

Στη συνέχεια, επεκτείνουμε το AdminEmailHandler έτσι ώστε να καλεί εσωτερικά μόνο την καθορισμένη εργασία σέλινου:

Αρχείο: celery_uncovered/toyex/admin_email.py

from django.utils.log import AdminEmailHandler from celery_uncovered.handlers.tasks import report_error_task class CeleryHandler(AdminEmailHandler): def send_mail(self, subject, message, *args, **kwargs): report_error_task.delay(subject, message, *args, **kwargs)

Τέλος, πρέπει να ρυθμίσουμε την καταγραφή. Η διαμόρφωση της σύνδεσης στο Django είναι αρκετά απλή. Αυτό που χρειάζεστε είναι να παρακάμψετε LOGGING έτσι ώστε η μηχανή καταγραφής να αρχίσει να χρησιμοποιεί ένα νέο χειριστή:

Αρχείο config/settings/local.py

LOGGING = { 'version': 1, 'disable_existing_loggers': False, ..., 'handlers': { ... 'mail_admins': { 'level': 'ERROR', 'filters': ['require_debug_true'], 'class': 'celery_uncovered.toyex.log_handlers.admin_email.CeleryHandler' } }, 'loggers': { 'django': { 'handlers': ['console', 'mail_admins'], 'level': 'INFO', }, ... } }

Παρατηρήστε ότι σκόπιμα ρυθμίζω φίλτρα χειριστή require_debug_true για να δοκιμάσετε αυτήν τη λειτουργικότητα ενώ η εφαρμογή εκτελείται σε κατάσταση εντοπισμού σφαλμάτων.

Δοκιμάστε το

Για να το δοκιμάσω, ετοίμασα μια προβολή Django που εξυπηρετεί μια λειτουργία 'διαίρεση με μηδέν' στο localhost:8000/report-error. Πρέπει επίσης να ξεκινήσετε ένα κοντέινερ MailHog Docker για να ελέγξετε ότι το email έχει σταλεί.

$ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog $ CELERY_TASKSK_ALWAYS_EAGER=False python manage.py runserver $ # with your browser navigate to [http://localhost:8000](http://localhost:8000) $ # now check your outgoing emails by vising web UI [http://localhost:8025](http://localhost:8025)

Επιπλέον λεπτομέρειες

Ως εργαλείο δοκιμής αλληλογραφίας, έχω ρυθμίσει το MailHog και διαμόρφωσα την αλληλογραφία Django για να το χρησιμοποιώ για παράδοση SMTP. Υπάρχουν πολλοί τρόποι ανάπτυξη και εκτέλεση MailHog. Αποφάσισα να πάω με ένα κοντέινερ Docker. Μπορείτε να βρείτε τις λεπτομέρειες στο αντίστοιχο αρχείο README:

Αρχείο: docker/mailhog/README.md

$ docker build . -f docker/mailhog/Dockerfile -t mailhog/mailhog:latest $ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog $ # navigate with your browser to localhost:8025

Για να διαμορφώσετε την εφαρμογή σας ώστε να χρησιμοποιεί το MailHog, πρέπει να προσθέσετε τις ακόλουθες γραμμές στη διαμόρφωσή σας:

Αρχείο: config/settings/local.py

EMAIL_BACKEND = env('DJANGO_EMAIL_BACKEND', default='django.core.mail.backends.smtp.EmailBackend') EMAIL_PORT = 1025 EMAIL_HOST = env('EMAIL_HOST', default='mailhog')

Πέρα από τις προεπιλεγμένες εργασίες σέλινου

Οι εργασίες σέλινου θα μπορούσαν να δημιουργηθούν από οποιαδήποτε δυνατότητα κλήσης. Από προεπιλογή, κάθε εργασία που καθορίζεται από τον χρήστη εγχέεται με celery.app.task.Task ως γονική (αφηρημένη) τάξη. Αυτή η τάξη περιέχει τη λειτουργικότητα εκτέλεσης εργασιών ασύγχρονα (μεταβίβασή της μέσω δικτύου σε εργαζόμενο σέλινου) ή συγχρονισμένα (για δοκιμαστικούς σκοπούς), δημιουργώντας υπογραφές και πολλά άλλα βοηθητικά προγράμματα. Στα επόμενα παραδείγματα, θα προσπαθήσουμε να επεκτείνουμε Celery.app.task.Task και στη συνέχεια χρησιμοποιήστε το ως βασική τάξη για να προσθέσετε μερικές χρήσιμες συμπεριφορές στις εργασίες μας.

Σενάριο 3 - Καταγραφή αρχείων ανά εργασία

Σε ένα από τα έργα μου, ανέπτυξα μια εφαρμογή που παρέχει στον τελικό χρήστη ένα εργαλείο τύπου Extract, Transform, Load (ETL) που μπόρεσε να καταπιεί και στη συνέχεια να φιλτράρει ένα τεράστιο ποσό ιεραρχικών δεδομένων. Το back-end χωρίστηκε σε δύο ενότητες:

  • Ενορχήστρωση αγωγού επεξεργασίας δεδομένων με Σέλινο
  • Επεξεργασία δεδομένων με Go

Το Σέλινο αναπτύχθηκε με μία παρουσία Celerybeat και περισσότερους από 40 εργαζόμενους. Υπήρχαν περισσότερες από είκοσι διαφορετικές εργασίες που συνθέτουν τις δραστηριότητες αγωγών και ενορχήστρωσης. Κάθε τέτοια εργασία μπορεί να αποτύχει σε κάποιο σημείο. Όλες αυτές οι αποτυχίες απορρίφθηκαν στο αρχείο καταγραφής κάθε εργαζόμενου. Σε κάποιο σημείο, άρχισε να καθίσταται άβολο να εντοπίσετε και να διατηρήσετε το επίπεδο σέλινου. Τελικά, αποφασίσαμε να απομονώσουμε το αρχείο καταγραφής εργασιών σε ένα αρχείο συγκεκριμένης εργασίας.

Χρησιμοποιήστε την περιγραφή περίπτωσης: Επεκτείνετε το σέλινο έτσι ώστε κάθε εργασία να καταγράφει την τυπική έξοδο και τα λάθη σε αρχεία

Το Celery παρέχει στις εφαρμογές Python μεγάλο έλεγχο του τι κάνει εσωτερικά. Στέλνει με ένα γνωστό πλαίσιο σημάτων. Οι εφαρμογές που χρησιμοποιούν το Σέλινο μπορούν να εγγραφούν σε μερικές από αυτές προκειμένου να αυξήσουν τη συμπεριφορά ορισμένων ενεργειών. Θα αξιοποιήσουμε σήματα επιπέδου εργασίας για να παρέχουμε λεπτομερή παρακολούθηση μεμονωμένων κύκλων ζωής εργασιών. Το σέλινο έρχεται πάντα με ένα back-end καταγραφής, και θα επωφεληθούμε από αυτό, ενώ θα ξεπεράσουμε λίγο σε μερικά σημεία για να πετύχουμε τους στόχους μας.

Λεπτομέρειες εφαρμογής

Το σέλινο υποστηρίζει ήδη την καταγραφή ανά εργασία. Για να αποθηκεύσετε σε ένα αρχείο, είναι απαραίτητο να αποστείλετε την έξοδο καταγραφής στην κατάλληλη θέση. Στην περίπτωσή μας, η σωστή θέση της εργασίας είναι ένα αρχείο που ταιριάζει με το όνομα της εργασίας. Στην παρουσίαση σέλινου, θα παρακάμψουμε την ενσωματωμένη διαμόρφωση καταγραφής με δυναμικά συμπεράσματα χειριστές καταγραφής. Μπορείτε να εγγραφείτε στο celeryd_after_setup σήμα και στη συνέχεια διαμορφώστε την καταγραφή του συστήματος εκεί:

πώς να συντάξετε την κατάσταση ταμειακών ροών από τον ισολογισμό

Αρχείο: celery_uncovered/toyex/celery_conf.py

@signals.celeryd_after_setup.connect def configure_task_logging(instance=None, **kwargs): tasks = instance.app.tasks.keys() LOGS_DIR = settings.ROOT_DIR.path('logs') if not os.path.exists(str(LOGS_DIR)): os.makedirs(str(LOGS_DIR)) print 'dir created' default_handler = { 'level': 'DEBUG', 'filters': None, 'class': 'logging.FileHandler', 'filename': '' } default_logger = { 'handlers': [], 'level': 'DEBUG', 'propogate': True } LOG_CONFIG = { 'version': 1, # 'incremental': True, 'disable_existing_loggers': False, 'handlers': {}, 'loggers': {} } for task in tasks: task = str(task) if not task.startswith('celery_uncovered.'): continue task_handler = copy_dict(default_handler) task_handler['filename'] = str(LOGS_DIR.path(task + '.log')) task_logger = copy_dict(default_logger) task_logger['handlers'] = [task] LOG_CONFIG['handlers'][task] = task_handler LOG_CONFIG['loggers'][task] = task_logger logging.config.dictConfig(LOG_CONFIG)

Παρατηρήστε ότι για κάθε εργασία που είναι καταχωρημένη στην εφαρμογή Σέλινο, δημιουργούμε έναν αντίστοιχο καταγραφέα με τον χειριστή της. Κάθε χειριστής είναι του τύπου logging.FileHandler και ως εκ τούτου κάθε τέτοια παρουσία λαμβάνει ένα όνομα αρχείου ως είσοδο. Το μόνο που χρειάζεστε για να ξεκινήσετε είναι να εισαγάγετε αυτήν την ενότητα στο celery_uncovered/celery.py στο τέλος του αρχείου:

import celery_uncovered.tricks.celery_conf

Ένας συγκεκριμένος καταγραφέας εργασιών θα μπορούσε να ληφθεί καλώντας get_task_logger(task_name). Για να γενικεύσετε μια τέτοια συμπεριφορά για κάθε εργασία, είναι απαραίτητο να επεκτείνετε ελαφρώς celery.current_app.Task με μερικές μεθόδους χρησιμότητας:

Αρχείο: celery_uncovered/tricks/celery_ext.py

τα ερωτήματα πολυμέσων για έναν ιστότοπο που χρησιμοποιεί αποκριτικό σχεδιασμό
class LoggingTask(current_app.Task): abstract = True ignore_result = False @property def logger(self): logger = get_task_logger(self.name) return logger def log_msg(self, msg, *msg_args): self.logger.debug(msg, *msg_args)

Τώρα, στην περίπτωση κλήσης στο task.log_msg('Hello, my name is: %s', task.request.id), η έξοδος καταγραφής θα δρομολογηθεί στο αντίστοιχο αρχείο με το όνομα εργασίας.

Δοκιμάστε το

Για να ξεκινήσετε και να δοκιμάσετε πώς λειτουργεί αυτή η εργασία, ξεκινήστε πρώτα τη διαδικασία σέλινου:

$ celery -A celery_uncovered worker -l info

Τότε θα μπορείτε να δοκιμάσετε τη λειτουργικότητα μέσω της Shell:

from datetime import date from celery_uncovered.tricks.tasks import add add.delay(1, 3)

Τέλος, για να δείτε το αποτέλεσμα, μεταβείτε στο celery_uncovered/logs και ανοίξτε το αντίστοιχο αρχείο καταγραφής που ονομάζεται celery_uncovered.tricks.tasks.add.log. Ενδέχεται να δείτε κάτι παρόμοιο όπως παρακάτω μετά την εκτέλεση αυτής της εργασίας πολλές φορές:

Result of 1 + 2 = 3 Result of 1 + 2 = 3 ...

Σενάριο 4 - Εργασίες Scope-Aware

Ας φανταστούμε μια εφαρμογή Python για διεθνείς χρήστες που βασίζεται στο Celery και στο Django. Οι χρήστες μπορούν να ορίσουν σε ποια γλώσσα (τοπικές ρυθμίσεις) χρησιμοποιούν την εφαρμογή σας.

Πρέπει να σχεδιάσετε ένα πολύγλωσσο, ενημερωτικό σύστημα ειδοποιήσεων μέσω ηλεκτρονικού ταχυδρομείου. Για να στείλετε ειδοποιήσεις μέσω email, έχετε καταχωρίσει μια ειδική εργασία σέλινου που αντιμετωπίζεται από μια συγκεκριμένη ουρά. Αυτή η εργασία λαμβάνει ορισμένα βασικά ορίσματα ως εισαγωγή και μια τρέχουσα τοποθεσία χρήστη, ώστε να αποστέλλεται μήνυμα ηλεκτρονικού ταχυδρομείου στην επιλεγμένη γλώσσα του χρήστη.

Τώρα φανταστείτε ότι έχουμε πολλές τέτοιες εργασίες, αλλά κάθε μία από αυτές τις εργασίες δέχεται ένα επιχείρημα τοπικής ρύθμισης. Σε αυτήν την περίπτωση, δεν θα ήταν καλύτερο να το λύσουμε σε υψηλότερο επίπεδο αφαίρεσης; Εδώ, βλέπουμε ακριβώς πώς να το κάνουμε αυτό.

Χρησιμοποιήστε την περιγραφή περίπτωσης: Κληρονομεί αυτόματα το εύρος από ένα περιβάλλον εκτέλεσης και εισάγει το στο τρέχον πλαίσιο εκτέλεσης ως παράμετρο.

Λεπτομέρειες εφαρμογής

Και πάλι, όπως κάναμε με την καταγραφή εργασιών, θέλουμε να επεκτείνουμε μια βασική κατηγορία εργασιών celery.current_app.Task και να παρακάμψετε μερικές μεθόδους που είναι υπεύθυνες για την κλήση εργασιών. Για τους σκοπούς αυτής της επίδειξης, παρακάμπτω το celery.current_app.Task::apply_async μέθοδος. Υπάρχουν επιπλέον εργασίες για αυτήν την ενότητα που θα σας βοηθήσουν να δημιουργήσετε μια πλήρως λειτουργική αντικατάσταση.

Αρχείο: celery_uncovered/tricks/celery_ext.py

class ScopeBasedTask(current_app.Task): abstract = True ignore_result = False default_locale_id = DEFAULT_LOCALE_ID scope_args = ('locale_id',) def __init__(self, *args, **kwargs): super(ScopeBasedTask, self).__init__(*args, **kwargs) self.set_locale(locale=kwargs.get('locale_id', None)) def set_locale(self, scenario_id=None): self.locale_id = self.default_locale_id if locale_id: self.locale_id = locale_id else: self.locale_id = get_current_locale().id def apply_async(self, args=None, kwargs=None, **other_kwargs): self.inject_scope_args(kwargs) return super(ScopeBasedTask, self).apply_async(args=args, kwargs=kwargs, **other_kwargs) def __call__(self, *args, **kwargs): task_rv = super(ScopeBasedTask, self).__call__(*args, **kwargs) return task_rv def inject_scope_args(self, kwargs): for arg in self.scope_args: if arg not in kwargs: kwargs[arg] = getattr(self, arg)

Το βασικό στοιχείο είναι να μεταβιβάσετε την τρέχουσα τοπική ρύθμιση ως όρισμα κλειδιού-τιμής σε μια εργασία κλήσης από προεπιλογή. Εάν μια εργασία κλήθηκε με μια συγκεκριμένη τοποθεσία ως επιχείρημα, τότε είναι αμετάβλητη.

Δοκιμάστε το

Για να δοκιμάσουμε αυτήν τη λειτουργικότητα, ας καθορίσουμε μια εικονική εργασία τύπου ScopeBasedTask. Εντοπίζει ένα αρχείο ανά τοπικό αναγνωριστικό και διαβάζει το περιεχόμενό του ως JSON:

Αρχείο: celery_uncovered/tricks/tasks.py

@shared_task(bind=True, base=ScopeBasedTask) def read_scenario_file_task(self, **kwargs): fixture_parts = ['locales', 'sc_%i.json' % kwargs['scenario_id']] return read_fixture(*fixture_parts)

Τώρα αυτό που πρέπει να κάνετε είναι να επαναλάβετε τα βήματα της εκκίνησης του σέλινου, της εκκίνησης του κελύφους και της δοκιμής εκτέλεσης αυτής της εργασίας σε διαφορετικά σενάρια. Τα φωτιστικά βρίσκονται κάτω από το celery_uncovered/tricks/fixtures/locales/ Ευρετήριο.

συμπέρασμα

Αυτή η ανάρτηση είχε ως στόχο να εξερευνήσει το Σέλινο από διαφορετικές οπτικές γωνίες. Δείξαμε το Σέλινο σε συμβατικά παραδείγματα όπως αλληλογραφία και δημιουργία αναφορών, καθώς και κοινά κόλπα σε μερικές ενδιαφέρουσες επιχειρηματικές περιπτώσεις. Το σέλινο βασίζεται σε μια φιλοσοφία που βασίζεται σε δεδομένα και η ομάδα σας μπορεί να κάνει τη ζωή τους πολύ πιο απλή, παρουσιάζοντάς την ως μέρος της στοίβας συστήματος. Η ανάπτυξη υπηρεσιών με βάση το σέλινο δεν είναι πολύ περίπλοκη εάν έχετε βασική εμπειρία στο Python και θα πρέπει να μπορείτε να την αποκτήσετε αρκετά γρήγορα. Η προεπιλεγμένη διαμόρφωση είναι αρκετά καλή για τις περισσότερες χρήσεις, αλλά εάν απαιτείται, μπορεί να είναι πολύ ευέλικτη.

Η ομάδα μας έκανε την επιλογή να χρησιμοποιήσει το Σέλινο ως παρασκήνιο ενορχήστρωσης για εργασίες στο παρασκήνιο και μακροχρόνιες εργασίες. Το χρησιμοποιούμε εκτενώς για μια ποικιλία περιπτώσεων χρήσης, εκ των οποίων μόνο λίγες αναφέρθηκαν σε αυτήν την ανάρτηση. Λαμβάνουμε και αναλύουμε gigabytes δεδομένων κάθε μέρα, αλλά αυτή είναι μόνο η αρχή των οριζόντιων τεχνικών κλιμάκωσης.

Κατανόηση των βασικών

Τι είναι το σέλινο για Python;

Το σέλινο είναι ένας από τους πιο δημοφιλείς διαχειριστές υποβάθρου στον κόσμο της Python. Το σέλινο είναι συμβατό με αρκετούς μεσίτες μηνυμάτων όπως το RabbitMQ ή το Redis και μπορεί να λειτουργήσει τόσο ως παραγωγός όσο και ως καταναλωτής.

Τι είναι το Pub-Sub;

Το μοτίβο δημοσίευσης-εγγραφής (ή παραγωγού-καταναλωτή) είναι ένα μοτίβο κατανεμημένων μηνυμάτων σε συστήματα υπολογιστών όπου οι εκδότες μεταδίδουν μηνύματα μέσω ενός μεσίτη μηνυμάτων και οι συνδρομητές ακούνε τα μηνύματα. Και τα δύο μπορούν να είναι μεμονωμένα συστατικά του συστήματος, ούτε επίγνωση ούτε σε άμεση επικοινωνία με το άλλο.

Ομιλίες σχεδιασμού: Έρευνα σε δράση με την ερευνητή UX Caitria O'Neill

Σχεδιασμός Ux

Ομιλίες σχεδιασμού: Έρευνα σε δράση με την ερευνητή UX Caitria O'Neill
Senior Back-end Engineer, Talent Leads Team

Senior Back-end Engineer, Talent Leads Team

Αλλα

Δημοφιλείς Αναρτήσεις
Scaling Scala: Τρόπος Dockerize χρησιμοποιώντας Kubernetes
Scaling Scala: Τρόπος Dockerize χρησιμοποιώντας Kubernetes
Μείωση του κόστους σε ένα ψηφιακό μέλλον πετρελαίου και φυσικού αερίου
Μείωση του κόστους σε ένα ψηφιακό μέλλον πετρελαίου και φυσικού αερίου
Το GWT Toolkit: Δημιουργήστε ισχυρές διεπαφές JavaScript χρησιμοποιώντας Java
Το GWT Toolkit: Δημιουργήστε ισχυρές διεπαφές JavaScript χρησιμοποιώντας Java
Επισκόπηση των δημοφιλών δημιουργών στατικών ιστότοπων
Επισκόπηση των δημοφιλών δημιουργών στατικών ιστότοπων
Γνωρίστε το Volt, ένα πολλά υποσχόμενο Ruby Framework για δυναμικές εφαρμογές
Γνωρίστε το Volt, ένα πολλά υποσχόμενο Ruby Framework για δυναμικές εφαρμογές
 
Οι μεγάλες ερωτήσεις οδηγούν σε εξαιρετικό σχεδιασμό - Ένας οδηγός για τη διαδικασία σκέψης σχεδιασμού
Οι μεγάλες ερωτήσεις οδηγούν σε εξαιρετικό σχεδιασμό - Ένας οδηγός για τη διαδικασία σκέψης σχεδιασμού
Η Ψυχολογία του Σχεδιασμού και η Νευροεπιστήμη του Amazing UX
Η Ψυχολογία του Σχεδιασμού και η Νευροεπιστήμη του Amazing UX
APIs στα κοινωνικά δίκτυα: Η διαδικτυακή πύλη στον πραγματικό κόσμο
APIs στα κοινωνικά δίκτυα: Η διαδικτυακή πύλη στον πραγματικό κόσμο
Οδηγός επένδυσης Family Office: Μια εναλλακτική λύση στο επιχειρηματικό κεφάλαιο
Οδηγός επένδυσης Family Office: Μια εναλλακτική λύση στο επιχειρηματικό κεφάλαιο
Αρχές Σχεδιασμού - Εισαγωγή στην Οπτική Ιεραρχία
Αρχές Σχεδιασμού - Εισαγωγή στην Οπτική Ιεραρχία
Δημοφιλείς Αναρτήσεις
  • τι είναι ένα hedge fund για οικογενειακό γραφείο
  • Η προθυμία πληρωμής είναι ένας τρόπος μέτρησης της ελαστικότητας της ζήτησης.
  • Επιδόσεις php 7 έναντι java
  • llc s corp ή c corp
  • πώς να φτιάξετε ένα επεξηγητικό βίντεο στα εφέ
  • οπτικό σχέδιο εναντίον γραφιστικής
  • πώς να δημιουργήσετε τη δική σας γλώσσα κωδικοποίησης
Κατηγορίες
  • Επενδυτές & Χρηματοδότηση
  • Σχεδιασμός Διεπαφής Χρήστη
  • Τεχνολογία
  • Διαχείριση Έργου
  • © 2022 | Ολα Τα Δικαιώματα Διατηρούνται

    portaldacalheta.pt