Nykyaikaiset verkkosovellukset ja niiden taustalla olevat järjestelmät ovat nopeampi ja reagoivampi kuin koskaan ennen. On kuitenkin edelleen monia tapauksia, joissa haluat ladata raskaan tehtävän suorittamisen muille koko järjestelmän arkkitehtuurin osille sen sijaan, että käsittelisit niitä pääkehässäsi. Tällaisten tehtävien tunnistaminen on yhtä yksinkertaista kuin tarkistaa, kuuluvatko ne johonkin seuraavista luokista:
Selkeä ratkaisu taustatehtävän suorittamiseen olisi sen suorittaminen erillisessä säikeessä tai prosessissa. Python on korkean tason Turingin täydellinen ohjelmointikieli, joka valitettavasti ei tarjoa sisäänrakennettua samanaikaisuutta asteikolla, joka vastaa Erlangin, Go: n, Java: n, Scalan tai Akkan tasoa. Ne perustuvat Tony Hoaren kommunikoiviin peräkkäisiin prosesseihin ( CSP ). Python-säikeitä puolestaan koordinoi ja ajoittaa yleinen tulkkilukko ( GIL ), joka estää useita alkuperäisiä ketjuja suorittamasta Python-tavukoodeja kerralla. GIL: stä pääseminen on paljon keskustelua Python-kehittäjät , mutta se ei ole tämän artikkelin keskipiste. Pythonin samanaikainen ohjelmointi on vanhanaikaista, vaikka olet tervetullut lukemaan siitä Pythonin monisäikeinen opetusohjelma kaveri ApeeScapeer Marcus McCurdy. Joten prosessien välisen kommunikaation suunnittelu on johdonmukaista virhealtista prosessia ja johtaa koodin kytkentään ja huonoon järjestelmän ylläpidettävyyteen, puhumattakaan siitä, että se vaikuttaa negatiivisesti skaalautuvuuteen. Lisäksi Python-prosessi on normaali prosessi käyttöjärjestelmässä (OS), ja siitä tulee koko Python-standardikirjaston kanssa raskaansarjan. Kun sovelluksen prosessien määrä kasvaa, siirtymisestä tällaisesta prosessista toiseen tulee aikaa vievää toimintaa.
Ymmärtääksesi samanaikaisuuden Pythonin kanssa paremmin, katso tätä uskomatonta David Beazleyn puhetta osoitteessa PyCon’15 .
Paljon parempi ratkaisu on palvella a jaettu jono tai sen tunnettua sisarusparadigmaa kutsutaan julkaise-tilaa . Kuten kuvassa 1 on esitetty, on olemassa kahden tyyppisiä sovelluksia, joissa yksi, nimeltään kustantaja , lähettää viestejä ja toinen, nimeltään tilaaja , vastaanottaa viestejä. Nämä kaksi agenttia eivät ole vuorovaikutuksessa keskenään suoraan eivätkä edes ole tietoisia toisistaan. Kustantajat lähettävät viestejä keskeiseen jonoon tai välittäjä ja tilaajat saavat kiinnostavia viestejä tältä välittäjältä. Tässä menetelmässä on kaksi pääetua:
On olemassa paljon viestijärjestelmiä, jotka tukevat sellaisia paradigmoja ja tarjoavat siistin API: n, jota ohjaavat joko TCP- tai HTTP-protokollat, esim. JMS, RabbitMQ, Redis Pub / Sub, Apache ActiveMQ jne.
Selleri on yksi suosituimmista taustatyöntekijöistä Python-maailmassa. Selleri on yhteensopiva useiden viestinvälittäjien kanssa, kuten RabbitMQ tai Redis, ja se voi toimia sekä tuottajana että kuluttajana.
Selleri on asynkroninen tehtäväjono / työjono, joka perustuu hajautettuun viestien välitykseen. Se on keskittynyt reaaliaikaisiin operaatioihin, mutta tukee myös ajoitusta. Suoritusyksiköt, joita kutsutaan tehtäviksi, suoritetaan samanaikaisesti yhdellä tai useammalla työntekijäpalvelimella moniprosessointia käyttäen, Eventlet tai tuuletettu . Tehtävät voidaan suorittaa asynkronisesti (taustalla) tai synkronisesti (odota, kunnes se on valmis). - Selleri-projekti
Noudata Sellerin käytön aloittamista vain vaiheittaisten ohjeiden avulla viralliset asiakirjat .
Tämän artikkelin painopiste on antaa sinulle hyvä käsitys siitä, mitä käyttötapauksia Selleri voi kattaa. Tässä artikkelissa näytämme paitsi mielenkiintoisia esimerkkejä myös yritämme oppia käyttämään Selleriä reaalimaailman tehtäviin, kuten taustapostitukseen, raporttien luomiseen, kirjaamiseen ja virheraportointiin. Jaan tapani testata tehtäviä emuloinnin ulkopuolella ja lopuksi esitän muutamia temppuja, joita ei ole (hyvin) dokumentoitu virallisissa asiakirjoissa, minkä löytäminen kesti tuntikausia itselleni.
Jos sinulla ei ole aikaisempaa kokemusta selleristä, suosittelen ensin kokeilemaan sitä virallisen opetusohjelman mukaisesti.
Jos tämä artikkeli kiehtoo sinua ja saa sinut sukeltamaan koodiin heti, noudata tätä GitHub-arkisto tässä artikkelissa käytetylle koodille. README
tiedosto antaa sinulle nopean ja likainen lähestymistapa juoksemiseen ja pelaamiseen esimerkkisovelluksilla.
Ensinnäkin käymme läpi käytännön esimerkkejä, jotka osoittavat lukijalle kuinka yksinkertaisesti ja tyylikkäästi Selleri ratkaisee näennäisesti ei-triviaalit tehtävät. Kaikki esimerkit esitetään Django-puitteissa; Suurin osa niistä voidaan kuitenkin helposti siirtää muihin Python-kehyksiin (pullo, pyramidi).
Projektin asettelun loi Keitinleikkuri Django ; Pidin kuitenkin vain muutamia riippuvuuksia, jotka mielestäni helpottavat näiden käyttötapausten kehittämistä ja valmistelua. Poistin myös tarpeettomat moduulit tälle viestille ja sovelluksille melun vähentämiseksi ja koodin ymmärtämisen helpottamiseksi.
- celery_uncovered/ - celery_uncovered/__init__.py - celery_uncovered/{toyex,tricks,advex} - celery_uncovered/celery.py - config/settings/{base,local,test}.py - config/urls.py - manage.py
celery_uncovered/{toyex,tricks,advex}
sisältää erilaisia sovelluksia, joita käsittelemme tässä viestissä. Jokainen sovellus sisältää joukon esimerkkejä vaaditun selleri-tason ymmärtämisen mukaan.celery_uncovered/celery.py
määrittelee selleri-esiintymän. Tiedosto: celery_uncovered/celery.py
:
from __future__ import absolute_import import os from celery import Celery, signals # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local') app = Celery('celery_uncovered') # Using a string here means the worker will not have to # pickle the object when using Windows. app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks()
Sitten meidän on varmistettava, että Selleri alkaa yhdessä Djangon kanssa. Siksi tuomme sovelluksen kansioon celery_uncovered/__init__.py
.
Tiedosto: celery_uncovered/__init__.py
:
from __future__ import absolute_import # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app # noqa __all__ = ['celery_app'] __version__ = '0.0.1' __version_info__ = tuple([int(num) if num.isdigit() else num for num in __version__.replace('-', '.', 1).split('.')])
config/settings
on sovelluksen ja sellerin määritysten lähde. Suoritusympäristöstä riippuen Django käynnistää vastaavat asetukset: local.py
kehitykseen tai test.py
kokeiluun. Voit myös määrittää oman ympäristön, jos haluat, luomalla uuden python-moduulin (esim. prod.py
). Selleri kokoonpanoissa on etuliite CELERY_
. Tätä viestiä varten määritin RabbitMQ: n välittäjäksi ja SQLiten tulokseksi bac-end.
Tiedosto: config/local.py
:
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='amqp://guest: [email protected] :5672//') CELERY_RESULT_BACKEND = 'django-db+sqlite:///results.sqlite'
Ensimmäinen tapaus on raporttien luominen ja vienti. Tässä esimerkissä opit määrittelemään tehtävän, joka tuottaa CSV-raportin, ja aikatauluttamaan sen säännöllisin väliajoin selleri .
Käyttötapauksen kuvaus: hae viisisataa kuuminta arkistoa GitHubista valittua jaksoa (päivä, viikko, kuukausi) kohden, ryhmitä ne aiheiden mukaan ja vie tulos CSV-tiedostoon.
Jos tarjoamme HTTP-palvelun, joka suorittaa tämän toiminnon, joka käynnistetään napsauttamalla painiketta Luo raportti, sovellus pysähtyy ja odottaa tehtävän suorittamista ennen kuin se lähettää HTTP-vastauksen takaisin. Tämä on huono. Haluamme, että verkkosovelluksemme on nopea, emmekä halua käyttäjien odottavan, kun käyttöliittymämme laskee tulokset. Sen sijaan, että odottaisimme tulosten tuottamista, haluaisimme jonottaa tehtävän työntekijän prosesseihin rekisteröidyn jonon kautta Selleriin ja vastaamaan task_id
etupäähän. Sitten käyttöliittymä käyttäisi task_id
kyselemään tehtävän tulosta asynkronisesti (esim. AJAX) ja pitää käyttäjän ajan tasalla tehtävän etenemisestä. Lopuksi, kun prosessi on valmis, tulokset voidaan näyttää tiedostona, jonka voi ladata HTTP: n kautta.
Hajotetaan ensin prosessi pienimpiin mahdollisiin yksiköihin ja luodaan putki:
Arkistojen hakeminen on HTTP-pyyntö, joka käyttää GitHub-hakusovellusliittymä GET /search/repositories
. GitHub API -palvelussa on kuitenkin rajoituksia, jotka tulisi käsitellä: API palauttaa jopa 100 arkistoa per pyyntö 500 sijasta. Voisimme lähettää viisi pyyntöä yksi kerrallaan, mutta emme halua pitää käyttäjää odottamassa viidelle yksittäiselle pyynnölle, koska HTTP-pyynnöt ovat I / O-sidottu operaatio. Sen sijaan voimme suorittaa viisi samanaikaista HTTP-pyyntöä sopivalla sivuparametrilla. Joten sivu on alueella [1..5]. Määritellään tehtävä nimeltään fetch_hot_repos/3 -> list
toyex/tasks.py
moduuli:
Tiedosto: celery_uncovered/toyex/local.py
@shared_task def fetch_hot_repos(since, per_page, page): payload = { 'sort': 'stars', 'order': 'desc', 'q': 'created:>={date}'.format(date=since), 'per_page': per_page, 'page': page, 'access_token': settings.GITHUB_OAUTH} headers = {'Accept': 'application/vnd.github.v3+json'} connect_timeout, read_timeout = 5.0, 30.0 r = requests.get( 'https://api.github.com/search/repositories', params=payload, headers=headers, timeout=(connect_timeout, read_timeout)) items = r.json()[u'items'] return items
Joten fetch_hot_repos
luo pyynnön GitHub-sovellusliittymään ja vastaa käyttäjälle luettelolla arkistoja. Se vastaanottaa kolme parametria, jotka määrittelevät pyynnön hyötykuorman:
since
- Suodattaa arkistot luomispäivänä.per_page
- Palautettavien tulosten määrä per pyyntö (rajoitettu 100: lla).page
—- Pyydetty sivunumero (alueella [1..5]). Huomautus: Jotta voit käyttää GitHub Search -sovellusliittymää, tarvitset OAuth-tunnuksen todentamistarkistusten läpäisemiseksi. Meidän tapauksessamme se tallennetaan asetuksiin kohdassa GITHUB_OAUTH
.
Seuraavaksi meidän on määriteltävä päätehtävä, joka on vastuussa tulosten yhdistämisestä ja viennistä CSV-tiedostoon: produce_hot_repo_report_task/2->filepath:
Tiedosto: celery_uncovered/toyex/local.py
@shared_task def produce_hot_repo_report(period, ref_date=None): # 1. parse date ref_date_str = strf_date(period, ref_date=ref_date) # 2. fetch and join fetch_jobs = group([ fetch_hot_repos.s(ref_date_str, 100, 1), fetch_hot_repos.s(ref_date_str, 100, 2), fetch_hot_repos.s(ref_date_str, 100, 3), fetch_hot_repos.s(ref_date_str, 100, 4), fetch_hot_repos.s(ref_date_str, 100, 5) ]) # 3. group by language and # 4. create csv return chord(fetch_jobs)(build_report_task.s(ref_date_str)).get() @shared_task def build_report_task(results, ref_date): all_repos = [] for repos in results: all_repos += [Repository(repo) for repo in repos] # 3. group by language grouped_repos = {} for repo in all_repos: if repo.language in grouped_repos: grouped_repos[repo.language].append(repo.name) else: grouped_repos[repo.language] = [repo.name] # 4. create csv lines = [] for lang in sorted(grouped_repos.keys()): lines.append([lang] + grouped_repos[lang]) filename = '{media}/github-hot-repos-{date}.csv'.format(media=settings.MEDIA_ROOT, date=ref_date) return make_csv(filename, lines)
Tämä tehtävä käyttää celery.canvas.group
suorittaa viisi samanaikaista puhelua fetch_hot_repos/3
Tuloksia odotetaan ja supistetaan sitten luetteloksi arkistoobjekteista. Sitten tulosjoukkomme ryhmitellään aiheen mukaan ja viedään lopuksi luotuun CSV-tiedostoon MEDIA_ROOT/
-kohdassa hakemistoon.
Voit ajoittaa tehtävän säännöllisesti, haluat ehkä lisätä merkinnän määritystiedoston aikataululuetteloon:
Tiedosto: config/local.py
from celery.schedules import crontab CELERY_BEAT_SCHEDULE = { 'produce-csv-reports': { 'task': 'celery_uncovered.toyex.tasks.produce_hot_repo_report_task', 'schedule': crontab(minute=0, hour=0) # midnight, 'args': ('today',) }, }
Jotta voimme käynnistää ja testata tehtävän toiminnan, meidän on ensin aloitettava selleri-prosessi:
$ celery -A celery_uncovered worker -l info
Seuraavaksi meidän on luotava celery_uncovered/media/
hakemistoon. Sitten voit testata sen toimivuutta joko Shellin tai Celerybeatin kautta:
Kuori :
from datetime import date from celery_uncovered.toyex.tasks import produce_hot_repo_report_task produce_hot_repo_report_task.delay('today').get(timeout=5)
Selleri :
# Start celerybeat with the following command $ celery -A celery_uncovered beat -l info
Voit katsella tuloksia kohdassa MEDIA_ROOT/
hakemistoon.
Yksi sellerin yleisimmistä käyttötapauksista on sähköposti-ilmoitusten lähettäminen. Sähköposti-ilmoitus on offline-I / O-sidottu operaatio, joka hyödyntää joko paikallista SMTP-palvelinta tai kolmannen osapuolen SES-palvelua. On monia käyttötapauksia, joihin liittyy sähköpostin lähettäminen, ja useimmissa tapauksissa käyttäjän ei tarvitse odottaa tämän prosessin päättymistä ennen kuin hän saa HTTP-vastauksen. Siksi on suositeltavaa suorittaa tällaiset tehtävät taustalla ja vastata käyttäjälle välittömästi.
Käyttötapauksen kuvaus: Ilmoita 50X-virheistä järjestelmänvalvojan sähköpostille Celeryn kautta.
Pythonilla ja Djangolla on tarvittava tausta esiintymiseen järjestelmän kirjaaminen . En pääse yksityiskohtiin siitä, miten Pythonin kirjaaminen todella toimii. Jos et kuitenkaan ole koskaan kokeillut sitä aiemmin tai tarvitset päivitystä, lue sisäänrakennetun laitteen ohjeet puunkorjuu moduuli. Haluat ehdottomasti tämän tuotantoympäristössäsi. Djangolla on erityinen hakkuukoneenkäsittelijä JärjestelmänvalvojaEmailKäsittelijä joka lähettää järjestelmänvalvojille sähköpostia jokaisesta vastaanotetusta lokiviestistä.
Pääajatuksena on laajentaa send_mail
menetelmä AdminEmailHandler
luokassa siten, että se voisi lähettää sähköpostia sellerin kautta. Tämä voidaan tehdä alla olevan kuvan mukaisesti:
Ensinnäkin meidän on määritettävä tehtävä nimeltään report_error_task
joka soittaa mail_admins
annettu aihe ja viesti:
Tiedosto: celery_uncovered/toyex/tasks.py
@shared_task def report_error_task(subject, message, *args, **kwargs): mail_admins(subject, message, *args, **kwargs)
Seuraavaksi laajennamme AdminEmailHandler-sovellusta siten, että se kutsuu sisäisesti vain määriteltyä selleri-tehtävää:
Tiedosto: celery_uncovered/toyex/admin_email.py
from django.utils.log import AdminEmailHandler from celery_uncovered.handlers.tasks import report_error_task class CeleryHandler(AdminEmailHandler): def send_mail(self, subject, message, *args, **kwargs): report_error_task.delay(subject, message, *args, **kwargs)
Lopuksi meidän on määritettävä kirjaaminen. Djangossa puunkorjuun määritys on melko yksinkertaista. Tarvitset ohittaa LOGGING
siten, että hakkuukone käyttää uutta määriteltyä käsittelijää:
Tiedosto config/settings/local.py
LOGGING = { 'version': 1, 'disable_existing_loggers': False, ..., 'handlers': { ... 'mail_admins': { 'level': 'ERROR', 'filters': ['require_debug_true'], 'class': 'celery_uncovered.toyex.log_handlers.admin_email.CeleryHandler' } }, 'loggers': { 'django': { 'handlers': ['console', 'mail_admins'], 'level': 'INFO', }, ... } }
Huomaa, että olen asettanut tarkoituksella käsittelijän suodattimet require_debug_true
tämän toiminnon testaamiseksi sovelluksen ollessa käynnissä virheenkorjaustilassa.
Sen testaamiseksi valmistin Django-näkymän, joka palvelee 'jakaminen nollalla' -toimintoa kohdassa localhost:8000/report-error
Sinun on myös käynnistettävä MailHog Docker -säiliö testataksesi, että sähköposti todella lähetetään.
$ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog $ CELERY_TASKSK_ALWAYS_EAGER=False python manage.py runserver $ # with your browser navigate to [http://localhost:8000](http://localhost:8000) $ # now check your outgoing emails by vising web UI [http://localhost:8025](http://localhost:8025)
Postin testaustyökaluna perustin MailHogin ja määritin Django-postituksen käyttämään sitä SMTP-toimitukseen. On monia tapoja ottaa käyttöön ja suorittaa MailHog. Päätin mennä Docker-kontin kanssa. Löydät tiedot vastaavasta README-tiedostosta:
Tiedosto: docker/mailhog/README.md
$ docker build . -f docker/mailhog/Dockerfile -t mailhog/mailhog:latest $ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog $ # navigate with your browser to localhost:8025
Jos haluat määrittää sovelluksesi käyttämään MailHogia, sinun on lisättävä seuraavat rivit kokoonpanoon:
Tiedosto: config/settings/local.py
EMAIL_BACKEND = env('DJANGO_EMAIL_BACKEND', default='django.core.mail.backends.smtp.EmailBackend') EMAIL_PORT = 1025 EMAIL_HOST = env('EMAIL_HOST', default='mailhog')
Selleri-tehtävät voidaan luoda kaikista kutsuttavista toiminnoista. Oletusarvoisesti kaikille käyttäjän määrittelemille tehtäville syötetään celery.app.task.Task
vanhempana (abstrakti) luokka. Tämä luokka sisältää tehtävien suorittamisen asynkronisesti (sen siirtämisen verkon kautta Selleri-työntekijälle) tai synkronoinnin (testausta varten), allekirjoitusten luomisen ja monia muita apuohjelmia. Seuraavissa esimerkeissä yritämme laajentaa Celery.app.task.Task
ja käytä sitä sitten perusluokkana muutaman hyödyllisen käyttäytymisen lisäämiseksi tehtäviimme.
Yhdessä projektissani kehitin sovellusta, joka tarjoaa loppukäyttäjälle Pura, muunna, lataa (ETL) -työkalun, joka pystyi nielemään ja suodattamaan valtavan määrän hierarkkisia tietoja. Taustapää jaettiin kahteen moduuliin:
Selleri otettiin käyttöön yhden selleri-ilmentymän ja yli 40 työntekijän kanssa. Oli yli kaksikymmentä erilaista tehtävää, jotka koostuivat putki- ja orkestrointitoiminnoista. Jokainen tällainen tehtävä voi epäonnistua jossain vaiheessa. Kaikki nämä viat vietiin jokaisen työntekijän järjestelmälokiin. Jossain vaiheessa Sellerikerroksen virheenkorjaus ja ylläpito alkoi olla hankalaa. Lopulta päätimme eristää tehtävälokin tehtäväkohtaiseen tiedostoon.
Käyttötapauksen kuvaus: Laajenna selleri niin, että jokainen tehtävä kirjaa vakiotuloksensa ja virheensä tiedostoihin
Selleri tarjoaa Python-sovelluksille erinomaisen hallinnan siitä, mitä se tekee sisäisesti. Se toimitetaan tutulla signaalikehyksellä. Selleriä käyttävät sovellukset voivat tilata muutamia niistä tiettyjen toimintojen käyttäytymisen lisäämiseksi. Aiomme hyödyntää tehtävätason signaaleja yksittäisten tehtävien elinkaarien tarkan seurannan tarjoamiseksi. Selleri on aina mukana hakkuupäässä, ja aiomme hyödyntää sitä samalla, kun vain muutamissa paikoissa ohitamme tavoitteidemme saavuttamiseksi.
Selleri tukee jo kirjaamista tehtävää kohti. Tiedostoon tallentaminen edellyttää lokilähdön lähettämistä oikeaan paikkaan. Meidän tapauksessamme tehtävän oikea sijainti on tiedosto, joka vastaa tehtävän nimeä. Selleri-ilmentymässä ohitamme sisäänrakennetut lokikokoonpanot dynaamisesti pääteltyjen lokien käsittelijöiden kanssa. Voit tilata celeryd_after_setup
ja määritä sitten järjestelmän kirjaaminen sinne:
Tiedosto: celery_uncovered/toyex/celery_conf.py
@signals.celeryd_after_setup.connect def configure_task_logging(instance=None, **kwargs): tasks = instance.app.tasks.keys() LOGS_DIR = settings.ROOT_DIR.path('logs') if not os.path.exists(str(LOGS_DIR)): os.makedirs(str(LOGS_DIR)) print 'dir created' default_handler = { 'level': 'DEBUG', 'filters': None, 'class': 'logging.FileHandler', 'filename': '' } default_logger = { 'handlers': [], 'level': 'DEBUG', 'propogate': True } LOG_CONFIG = { 'version': 1, # 'incremental': True, 'disable_existing_loggers': False, 'handlers': {}, 'loggers': {} } for task in tasks: task = str(task) if not task.startswith('celery_uncovered.'): continue task_handler = copy_dict(default_handler) task_handler['filename'] = str(LOGS_DIR.path(task + '.log')) task_logger = copy_dict(default_logger) task_logger['handlers'] = [task] LOG_CONFIG['handlers'][task] = task_handler LOG_CONFIG['loggers'][task] = task_logger logging.config.dictConfig(LOG_CONFIG)
Huomaa, että rakennamme jokaiselle selleri-sovelluksessa rekisteröidylle tehtävälle vastaavan puunkorjuun sen käsittelijällä. Jokainen käsittelijä on tyyppiä logging.FileHandler
, ja siksi kukin tällainen esiintymä vastaanottaa tiedostonimen syötteenä. Ainoa mitä tarvitset tämän käynnissä olevan, on tuoda tämä moduuli celery_uncovered/celery.py
tiedoston lopussa:
import celery_uncovered.tricks.celery_conf
Tietyn tehtävän kirjaajan voisi vastaanottaa soittamalla get_task_logger(task_name)
. Tällaisen käyttäytymisen yleistämiseksi jokaisessa tehtävässä on tarpeen laajentaa celery.current_app.Task
muutamalla hyödyllisyysmenetelmällä:
Tiedosto: celery_uncovered/tricks/celery_ext.py
class LoggingTask(current_app.Task): abstract = True ignore_result = False @property def logger(self): logger = get_task_logger(self.name) return logger def log_msg(self, msg, *msg_args): self.logger.debug(msg, *msg_args)
Nyt, kun kutsutaan task.log_msg('Hello, my name is: %s', task.request.id)
, lokilähtö reititetään vastaavaan tiedostoon tehtävän nimellä.
Käynnistä selleri-prosessi aloittaaksesi ja testataksesi tämän tehtävän toiminnan:
$ celery -A celery_uncovered worker -l info
Sitten voit testata toimivuutta Shellin kautta:
from datetime import date from celery_uncovered.tricks.tasks import add add.delay(1, 3)
Lopuksi näet tuloksen siirtymällä kohtaan celery_uncovered/logs
ja avaa vastaava lokitiedosto nimeltä celery_uncovered.tricks.tasks.add.log
. Saatat nähdä jotain samanlaista kuin alla suoritettuasi tämän tehtävän useita kertoja:
Result of 1 + 2 = 3 Result of 1 + 2 = 3 ...
Kuvitelkaamme Python-sovellusta kansainvälisille käyttäjille, joka on rakennettu Celerylle ja Djangolle. Käyttäjät voivat määrittää, millä kielellä (kielellä) he käyttävät sovellustasi.
Sinun on suunniteltava monikielinen, aluetietoinen sähköposti-ilmoitusjärjestelmä. Sähköposti-ilmoitusten lähettämiseksi olet rekisteröinyt erityisen selleri-tehtävän, jota hoitaa tietty jono. Tämä tehtävä saa joitain keskeisiä argumentteja syötteenä ja nykyisen käyttäjän kielen, jotta sähköposti lähetetään käyttäjän valitsemalla kielellä.
Kuvittele nyt, että meillä on monia tällaisia tehtäviä, mutta jokainen näistä tehtävistä hyväksyy paikallisen argumentin. Eikö tässä tapauksessa olisi parempi ratkaista se korkeammalla abstraktiotasolla? Täällä näemme, miten se tehdään.
Käyttötapauksen kuvaus: Peri laajuus automaattisesti yhdestä suorituskontekstista ja lisää se nykyiseen suorituskontekstiin parametrina.
Jälleen, kuten teimme tehtävälokien kanssa, haluamme laajentaa perustehtäväluokkaa celery.current_app.Task
ja ohittaa muutama tapa, joka on vastuussa tehtävien kutsumisesta. Tätä mielenosoitusta varten ohitan celery.current_app.Task::apply_async
menetelmä. Tässä moduulissa on ylimääräisiä tehtäviä, jotka auttavat sinua tuottamaan täysin toimivan korvaavan tuotteen.
Tiedosto: celery_uncovered/tricks/celery_ext.py
class ScopeBasedTask(current_app.Task): abstract = True ignore_result = False default_locale_id = DEFAULT_LOCALE_ID scope_args = ('locale_id',) def __init__(self, *args, **kwargs): super(ScopeBasedTask, self).__init__(*args, **kwargs) self.set_locale(locale=kwargs.get('locale_id', None)) def set_locale(self, scenario_id=None): self.locale_id = self.default_locale_id if locale_id: self.locale_id = locale_id else: self.locale_id = get_current_locale().id def apply_async(self, args=None, kwargs=None, **other_kwargs): self.inject_scope_args(kwargs) return super(ScopeBasedTask, self).apply_async(args=args, kwargs=kwargs, **other_kwargs) def __call__(self, *args, **kwargs): task_rv = super(ScopeBasedTask, self).__call__(*args, **kwargs) return task_rv def inject_scope_args(self, kwargs): for arg in self.scope_args: if arg not in kwargs: kwargs[arg] = getattr(self, arg)
Avainne on siirtää nykyinen kieliasetus avainarvoargumenttina oletusarvoisesti kutsuvaan tehtävään. Jos tehtävää kutsutaan argumenttina tietyllä kielialueella, se ei muutu.
Testataksemme tämän toiminnon, määritetään määritetty näennäistehtävä, jonka tyyppi on ScopeBasedTask
Se etsii tiedoston aluetunnuksella ja lukee sen sisällön JSON: na.
Tiedosto: celery_uncovered/tricks/tasks.py
@shared_task(bind=True, base=ScopeBasedTask) def read_scenario_file_task(self, **kwargs): fixture_parts = ['locales', 'sc_%i.json' % kwargs['scenario_id']] return read_fixture(*fixture_parts)
Nyt sinun on toistettava vaiheet sellerin käynnistämisen, kuoren käynnistämisen ja tämän tehtävän suorittamisen testaamiseksi eri tilanteissa. Valaisimet sijaitsevat celery_uncovered/tricks/fixtures/locales/
-kohdan alla hakemistoon.
Tämän viestin tarkoituksena oli tutkia selleriä eri näkökulmista. Esittelin selleriä tavanomaisissa esimerkeissä, kuten postitus ja raporttien luominen sekä jaettuja temppuja mielenkiintoisiin kapealla liiketoiminnan käyttötapauksiin. Selleri on rakennettu tietopohjaiseen filosofiaan, ja tiimisi voi tehdä heidän elämästään huomattavasti yksinkertaisemman ottamalla sen osaksi järjestelmässään. Selleri-pohjaisten palvelujen kehittäminen ei ole kovin monimutkaista, jos sinulla on perustiedot Pythonista, ja sinun pitäisi pystyä noutamaan se melko nopeasti. Oletuskokoonpano on tarpeeksi hyvä useimpiin käyttötarkoituksiin, mutta tarvittaessa ne voivat olla hyvin joustavia.
Tiimimme päätti käyttää Selleriä orkestroinnin taustana taustatehtäviin ja pitkäaikaisiin tehtäviin. Käytämme sitä laajasti erilaisiin käyttötapauksiin, joista tässä viestissä mainittiin vain harvat. Syömme ja analysoimme gigatavua dataa joka päivä, mutta tämä on vasta horisontaalisen skaalaustekniikan alku.
Selleri on yksi suosituimmista taustatyöntekijöistä Python-maailmassa. Selleri on yhteensopiva useiden viestinvälittäjien kanssa, kuten RabbitMQ tai Redis, ja se voi toimia sekä tuottajana että kuluttajana.
Julkaise-tilaa (tai tuottaja-kuluttaja) -malli on hajautettu viestintämalli tietokonejärjestelmissä, joissa julkaisijat lähettävät viestejä viestivälittäjän kautta ja tilaajat kuuntelevat viestejä. Molemmat voivat olla järjestelmän eristettyjä komponentteja, eivätkä ne ole tietoisia eikä suorassa yhteydessä toisen kanssa.