פרק קודם:
פייתון 25 - מקביליות ואסינכרוניות חלק אמה נלמד
- אסינכרוניות
- רכיבי פייתון - asyncio
- Awaitables: tasks,futures & gather
- Sleep & Timeouts
- Cancellation
- asyncio & Threads
- async iterators
AsyncIO
קוד מקבילי קיים כבר הרבה זמן - עם הזמן והשימוש בו מתכנתים עלו על כמה תבניות שחוזרות על עצמן:
- קוד שמחכה לסיום פעולה כלשהי.
- סנכרון שני קטעי קוד שצריכים לרוץ על אותו משאב
- יצירת לולאות או תורים
פייתון כמו שפות רבות אחרות זיהו את התבניות האלו ואיגדו אותם תחת ספרייה שנקראת asyncio
.
אסינכרוניות
לאסינכרוניות יש הרבה משמעויות כיום כאשר רובן מתרכזות בפרקטיקה.
זאת אומרת איך לבצע קוד לא חוסם.
כדי לכתוב קוד אסינכרוני טוב כדאי קודם כל להבין מהו קוד אסינכרוני.
קוד אסינכרוני
קוד אסינכרוני הוא עקרון שנותן למתזמן
כלשהו לתזמן את סדר פעולות הקריאה.
בדוגמא הבאה אנחנו מבצעים 3 פונקציות אסינכרוניות שונות.Action1Async
, Action2Async
, Action3Async
.
בזמן שהפונקציה הראשונה מחכה המתזמן יודע להריץ את הפונקציה השנייה,
וכאשר הפונקציה השנייה מחכה המתזמן יודע להריץ את הפונקציה השלישית.
על הקונספט הזה כבר למדנו - Coroutines
.
מהו Coroutine?
אלו פונקציות שעושות משימות רבות ללא חסימה בינהן.
בדוגמא למעלה למרות שאנחנו מחכים על פונקציה לסיים - המתזמן יודע לתזמן את הפונקציה הבאה בתור.
מיהו המתזמן?
לולאת אירועים - Event Loop
אם נזכר בפוסט - פייתון 24 - ג'נרטורים,
נראה שימוש בפונקצייה - ReadAllAsync
.
1 | def ReadFileAsync(name): |
זהו בעצם מימוש נאיבי מאוד של הקונספט ללולאת אירועים
.
לולאת אירועים כמו שהיא נשמעת היא מאוד פשוטה:
- מכילה אירוע אחד או יותר
- במקרה שיש יותר מאירוע אחד נעשה שימוש ב
תורים
(כמורשימה
) - לוקחת את האירוע האחרון
- לעבד את האירוע
- לחכות שוב לאירועים
הלולאה שמומשה בפונקציה ReadAllAsync
פועלת באופן הזה.
צריך גוף ללולאה
1
2while len(pending) > 0:
for gen in pending:הלולאה לוקחת את ה-“אירוע” האחרון
tasks[gen] = gen.send(tasks[gen])
במקרה הזה צריך לנקות את האירועים שהסתיימו:
1
2except StopIteration:
pending.remove(gen)
למזלנו לא צריך לממש לולאת אירועים משלנו כי - asyncio
נותנת לנו את זה!
Hello world - Async Await
בפרק 23 למדנו על - yield from
.
פייתון לקחו את הקונספט הזה ומימשו אותו מחדש עבור אסינכרוניות - קוד לא חוסם.
והוסיפו שתי מילים שמורות נוספות:
async
- מצהיר על פונקציה אסינכרוניתawait
- מחכה לפונקציה אסינכרונית שתסתיים.
כדי להשתמש ב-asyncio
יש לעשות import asyncio
ולהריץ פונקציה אסינכרונית ב- asyncio.run(...)
.
דוגמא בסיסית:
1 | import asyncio |
חייבים async
מה קורה כשאין async
?
בפייתון יש כמה סוגי משתנים חדשים שעוזרים לנו להתמודד עם קוד אסינכרוני.
הסוג הראשון כבר הכרנו - coroutine
.
כאשר אנחנו מתייגים פונקציה כ-async
פייתון יוצרת לנו coroutine
מאחורי הקלעים.
ככה הפונקציה asyncio.run(...)
יודעת להריץ את הקוד שלנו.
חייבים להחזיר אובייקט מתאים
מה קורה כשאנחנו לא קוראים לפונקציה אלה מעבירים אותה כפרמטר?
בדוגמא הזו אין סוגריים לפונקציה שאנחנו מעבירים ל-run
.
הפונקציה run
מקבלת coroutine
ולא פונקציה אחרת!
ולכן חייבים להחזיר לה coroutine
.
עצם הגדרת ה-async
בפונקציה אנחנו מחזירים coroutine
.
async
כדי להגדיר פונקציה כאסינכרונית מוסיפים async
לפונקציה.
פונקציה אסינכרונית היא למעשה coroutine
.
1 | async def MyAsyncFunc(): |
מה מהם הן coroutines
?
MyAsyncFunc
היא coroutine
.AnotherAsyncFunc
היא coroutine
.AsyncFuncInDisguise
היא לא coroutine
.
WhatAmI
מחזירה פונקציה ולא coroutine
.
לכן היא לא coroutine
.
WhatAmITwo
מחזירה את ערך ההחזרה של הפונקציה.
ולכן היא כן - coroutine
.
await
כדי לבצע coroutine או ניתן להחזיר אותם כמו שראינו ב-WhatAmITwo
או לבצע await
.
אחרת לא יוחזר ערך ההחזרה הנכון!
1 | import asyncio |
פונקציה מול אובייקט coroutine
coroutine function
- זוהי הפונקציה שמוגדרת בעזרת async def
.coroutine object
- זהו ה-coroutine שיוחזר מהקריאה ל-coroutine function
.
כמו שראינו בדוגמא הזו:
1 | async def MyAsyncFunc(): |
ניתן לומר גם awaitable object
או בקיצור awaitable
,
על כל אובייקט שניתן לשים עליו await
.
1 | async def Do(myObj): |
מבלי לדעת מהו myObj
ניתן לומר עליו שהוא awaitable
.
Future
asyncio
מתחלק לפונקציות ומחלקות ברמה “גבוה” וברמה “נמוכה”.
נלמד על רכיב ברמה הנמוכה שעליו דברים נכתבים, והוא ה-Future
.
באפליקציות אין צורך להשתמש ב-Future
ישירות.
Future
- אובייקט awaitable
הנותן לנו את התכונות הבאות:
- לאפשר לסיים או לבדוק אם התסיים.
- להחזיר ערך.
- לזרוק שגיאה.
- לבדוק ביטול או לבטל את ההרצה.
הוא משמש את פייתון כדי ליצור אובייקטים המאפשרים לקרוא לפונקציות ע”פ ערך ההחזרה שלהם בצורה אסינכרונית.
פונקציות
set_result
- נותן לאובייקט תוצאה ומסמן אותו כהסתיים.set_exception
- מסמן אותו כהסתיים ונותן לאובייקט שגיאה.done
- מחזירTrue
אם האובייקט הסתיים.cancelled
- מחזירTrue
אם האובייקט התבטל.add_done_callback
- מוסיף פונקציה לקריאה כשהאובייקט מסתיים.result
- מחזיר את התוצאה במידה ויש או מעלה שגיאה רגילה או שגיאת ביטול במידה ונתנו לו שגיאות.
אם האובייקט לא הסתיים אז מעלה שגיאתInvalidStateError
.
1 | import asyncio |
Tasks
Task
הוא אובייקט awaitable
הנותן לנו להריץ כמה פעולות באופן אסינכרוני.Task
מממש את Future
.
כדי להריץ Task
חדש משתמשים ב-asyncio.create_task(...)
.
שימו לב שצריך פונקציה אסינכרונית כדי שתהיה לפונקציה גישה ל-event loop
הפנימי.
לא תוכלו להריץ אותו כך:
1 | import asyncio |
דוגמא:
1 | import asyncio |
Task cancel
מה שמיוחד באובייקטי Task
שקל לבטל אותם במידת הצורך.
הדוגמא הבאה מבטלת פונקציה שרצה בצורה אסינכרונית בצורה רנדומלית:
1 | import asyncio |
כדי לבטל יש לקרוא ל-cancel
:
1 | task.cancel() |
gather
ניתן לחכות לכמות אובייקטי Task
בעזרת gather
.
1 | import asyncio |
תרחיש נפוץ זה יהיה להריץ כמה Tasks
בלולאה ולחכות לכולם:
1 | import asyncio |
האופרנד *
ב-*tasks
מפרק את הרשימה לבודדים וככה ניתן להעביר אותו ל-gather
.
מכיוון ש-asyncio.gather
לא מקבל list
ישירות אנחנו צריכים לפרק אותו כדי להעביר לפונקציה.
Wait
כדי לחכות לcoroutines
ניתן להשתמש בפונקציית wait
:
1 | import asyncio |
במקום לבצע gather
,
Main
מחכה לשלושת הפונקציות.
תרחיש נפוץ יהיה לחכות לראשון שיסתיים, בעזרת return_when=
ניתן לומר לו לסיים לחכות כשהראשון יצא:
1 | import asyncio |
במקרה הזה כש-C
יסתיים התכנית תצא.
ניתן להעביר ל-return_when
את הפרמטרים הבאים:
asyncio.FIRST_EXCEPTION
- יסיים לחכות במידה ואחד מהם ייזרוק שגיאה. אחרת ייחכה לכולם.asyncio.FIRST_COMPLETED
- הראשון שיסתיים.asyncio.ALL_COMPLETED
- מחכה שכולם יסתיימו.
wait_for
דרך נוספת לחכות היא בעזרת wait_for
שגם נותנת לנו דרך ליצור timeout.
to_thread
בפרק 24 למדנו על CPU Bound
ו-IO Bound
.
פרק 24: פייתון 25 - מקביליות ואסינכרוניות חלק א
פונקציית to_thread
מאפשרת לנו להזריק coroutine
לתהליכון.
זה מאוד שימושי עבור פונקציות שהן IO Bound
כדי לאפשר מקביליות יותר טובה.
תהליכון אחד ייחכה למשאב ה-IO
כאשר שאר הCoroutines
יוכלו לרוץ במקביל.
בנוסף לכך הכרנו כבר את המושג GIL
- Global intercepter Lock
.
מכיוון שיש לנו את ה-GIL
פונקציות מקביליות לא רצות בצורה אופטימלית עם תהליכונים.
במיוחד עם קוד אסינכרוני שעובד עם לולאת אירועים - מס’ תהליכונים רב יאיט את ריצת התהליכונים עבור חישובים שהם CPU Bound
.
ולמה?
כי ה-GIL
נועל את ההרצה.
למרות זאת נוכל להריץ פונקציות שהיו חוסמות אותנו בצורה אסינכרונית עם to_thread
.
1 | import asyncio |
זאת אחת הדרכים גם ליצור אינטגרציה עם פונקציות סינכרוניות לקוד אסינכרוני מקבילי.ReadLongSync
היא סנכרונית (ללא async
).
כשהשתמשנו ב-to_thread
בעצם קיבלנו coroutine
!
1 | async def to_thread(func, /, *args, **kwargs): |
כבר הכרנו מעט את functools
שנותן לנו פונקציות שימושיות לבניית פונקציות.
מה קורה כאן בעצם?
יש לנו שני פרמטרים חשובים - loop
ו-context (ctx)
.
ה-loop
הוא ה-Event loop שאנחנו משתמשים בו.
ה-ctx
מכיל את המידע עבור הריצה.
זהו משתנה פנימי שמועתק כדי להכיל את המידע, ניתן היה להשתמש גם ב-threading.local
אבל זה לא עוזר כשהקוד שלנו רץ בצורה אסינכרונית על תהליכון אחד!
ולכן יש context
שונה.
החלק השני של הפונקציה הוא partial
שנותן לנו לבנות פונקצייה חדשה.
אנחנו בונים את ctx.run
עם הפונקציה שאנחנו מעבירים והפרמטרים שלה.
ובסוף מריצים את זה בתוך ה-Event Loop עם run_in_executor
.
והיא מתזמנת את הפונקציה run
שקוראת בתורה לפונקציה שנתנו func
בצורה אסינכרונית.
Async iterators
זוכרים iterator
מפרק 23?
נלמד על תוסף חדש בפייתון 3.5
: async for
.
בדוגמא הבאה אנחנו מדמים hasing
ארוך -
במקרה שלנו המימוש כולל את md5
והמתנה של 2 שניות.
Hashing
הוא מנגנון שלוקח ערך והופך אותו לתווים רנדומליים בצורה חד-חד ערכית.
ז”א שעבור כל ערך שנביא לפונקציה הזו נקבל hash
שונה.
נרצה שהפונקציה תהיה דטרמיניסטית, עבור אותו ערך נקבל תוצאה זהה.
למשל עבור 1234
נקבל 81dc9bdb52d04dc20036dbd8313ed055
.
ונרצה שכל פעם שנביא לפונקציה 1234
נקבל את הערך הזה.
בצורה הזו אין צורך לשמור סיסמאות מהמשתמש אלה נשתמש בפונקציה הזו ונשווה את ה-Hash
שהתקבל.
בצורה הזו נשמור על הסיסמאות:
- לא שומרים סיסמאות בשום מקום - אלה את ה-Hash.
- לא שולחים סיסמאות, שולחים את ה-Hash.
- בדרך כלל משתמשים באלגוריתמי הצפנה גם ל-Hash כדי לא לשלוח אותו בצורה גלויה.
טיפ:
אם אתר נותן לכם לראות את הסיסמא שלכם במלואה - כנראה הם שומרים אותה וזה לא בטיחותי!
1 | import asyncio |
שני החלקים הכי חשובים אלו:
1 | async def __anext__(self): |
כאן אנחנו מגדירים פונקציות next
אסינכרונית!
פייתון נתנו את השם anext
.
ומכיוון שאלו פונקציות מיוחדות אנחנו כותבים __anext__
.
חייבים להגדיר גם פונקציית __aiter__
כדי לומר לפייתון שהמחלקה הזו מממשת איטרטור אסינכרוני.
1 | def __aiter__(self): |
החלק השני הוא השימוש באיטרטור בצורה פשוטה עם async for
.
1 | async for i in PasswordHasherAsync(passwords): |
שימו לב לעוד משהו - עטפנו איטרטור סנכרוני שהוא הרשימה passwords
.
בצורה הזו ניתן להשתמש באיטרטור שלה כדי לרוץ על הרשימה בצורה ידנית.
aiofiles
מודול aiofiles
הוא מודול חיצוני שמעניק לנו פעולות אסינכרוניות על קבצים.
הוא דומה מאוד למודול הרגיל של הקבצים open('fileName','w')
אך מוסיפים async
!
קריאת שורות בצורה אסינכרונית:
1 | import asyncio |
תרגילים
כתבו תכנית אסינכרונית שמקבלת כקלט כמות מספרים לייצר.
התכנית תייצר בצורה אסינכרונית רשימה עם מספרים רנדומלים כגודל המספר שהתקבל כקלט ותדפיס אותם.ניקח את התכנית מתרגיל 1 ונשתמש בפונקציה שיוצרת מספרים רנדומליים.
הפעם נוסיף מילון שיהווה שיחזיק כמפתח שם קובץ והערך יהיה כמות המספרים הרנדומליים שיש לייצר לתוכו.
המילון ייראה כך:
1 | dic = {'one.txt' : 5, 'two.txt': 6, 'three.txt':25000} |
עבור כל קובץ יש ליצור task
חדש ולחכות לכולם שיסתיימו!
gather
או wait
.
- כתבו תכנית שמקבלת נתיב של קובץ ובודקת כל שנייה אם הוא קיים או לא.
התכנית תהיה אסינכרונית -
במידה והמשתמש לחץ על האותq
התכנית תצא.
במידה והמשתמש לחץr
התכנית תבטל את הבדיקה ותבקש נתיב קובץ אחר.
אחרי שהמשתמש שם נתיב אחר התכנית תחזור לבדוק אם הקובץ קיים או לא.
עבור התרגיל הזה אני ממליץ להתקין את חבילת aioconsole
.
1 | pip install aioconsole |
השימוש פשוט:
1 | import aioconsole |
1 | import asyncio |
1 | import asyncio |
1 | import asyncio |
AsyncIO
הוא שיפור משמעותי אבל כל קוד פייתוני וחייב לדעת כדי לפתח קוד ברמה גבוה!
בפרקים הבאים נעלה את רמת איכות הקוד שלנו עם Linter
ים.
כלים שעוזרים להפיק מהקוד את המיטב.