11 min. read

פרק קודם:

פייתון 24 - מקביליות ואסינכרוניות חלק א

מה נלמד

  • אסינכרוניות
  • רכיבי פייתון - asyncio
  • Awaitables: tasks,futures & gather
  • Sleep & Timeouts
  • Cancellation
  • asyncio & Threads
  • async iterators

AsyncIO

קוד מקבילי קיים כבר הרבה זמן - עם הזמן והשימוש בו מתכנתים עלו על כמה תבניות שחוזרות על עצמן:

  • קוד שמחכה לסיום פעולה כלשהי.
  • סנכרון שני קטעי קוד שצריכים לרוץ על אותו משאב
  • יצירת לולאות או תורים

פייתון כמו שפות רבות אחרות זיהו את התבניות האלו ואיגדו אותם תחת ספרייה שנקראת asyncio.

אסינכרוניות

לאסינכרוניות יש הרבה משמעויות כיום כאשר רובן מתרכזות בפרקטיקה.
זאת אומרת איך לבצע קוד לא חוסם.
כדי לכתוב קוד אסינכרוני טוב כדאי קודם כל להבין מהו קוד אסינכרוני.

קוד אסינכרוני

קוד אסינכרוני הוא עקרון שנותן למתזמן כלשהו לתזמן את סדר פעולות הקריאה.

בדוגמא הבאה אנחנו מבצעים 3 פונקציות אסינכרוניות שונות.
Action1Async, Action2Async, Action3Async.

בזמן שהפונקציה הראשונה מחכה המתזמן יודע להריץ את הפונקציה השנייה,
וכאשר הפונקציה השנייה מחכה המתזמן יודע להריץ את הפונקציה השלישית.

על הקונספט הזה כבר למדנו - Coroutines.

מהו Coroutine?

אלו פונקציות שעושות משימות רבות ללא חסימה בינהן.
בדוגמא למעלה למרות שאנחנו מחכים על פונקציה לסיים - המתזמן יודע לתזמן את הפונקציה הבאה בתור.

מיהו המתזמן?

לולאת אירועים - Event Loop

אם נזכר בפוסט - פייתון 23 - ג'נרטורים,
נראה שימוש בפונקצייה - ReadAllAsync.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def ReadFileAsync(name):
#....
pass

def ReadAllAsync(names):
routines = [ReadFileAsync(name) for name in names]
pending = list(routines)
tasks = {task: None for task in pending}

while len(pending) > 0:
for gen in pending:
try:
tasks[gen] = gen.send(tasks[gen])
except StopIteration:
pending.remove(gen)

זהו בעצם מימוש נאיבי מאוד של הקונספט ללולאת אירועים.

לולאת אירועים כמו שהיא נשמעת היא מאוד פשוטה:

  1. מכילה אירוע אחד או יותר
  2. במקרה שיש יותר מאירוע אחד נעשה שימוש בתורים (כמו רשימה)
  3. לוקחת את האירוע האחרון
  4. לעבד את האירוע
  5. לחכות שוב לאירועים

הלולאה שמומשה בפונקציה ReadAllAsync פועלת באופן הזה.

  1. צריך גוף ללולאה

    1
    2
    while len(pending) > 0:
    for gen in pending:
  2. הלולאה לוקחת את ה-“אירוע” האחרון tasks[gen] = gen.send(tasks[gen])

  3. במקרה הזה צריך לנקות את האירועים שהסתיימו:

    1
    2
    except StopIteration:
    pending.remove(gen)

למזלנו לא צריך לממש לולאת אירועים משלנו כי - asyncio נותנת לנו את זה!

Hello world - Async Await

בפרק 23 למדנו על - yield from.
פייתון לקחו את הקונספט הזה ומימשו אותו מחדש עבור אסינכרוניות - קוד לא חוסם.
והוסיפו שתי מילים שמורות נוספות:

  • async - מצהיר על פונקציה אסינכרונית
  • await - מחכה לפונקציה אסינכרונית שתסתיים.

כדי להשתמש ב-asyncio יש לעשות import asyncio
ולהריץ פונקציה אסינכרונית ב- asyncio.run(...).

דוגמא בסיסית:

1
2
3
4
5
6
import asyncio

async def HelloWorldAsync():
print("Hello World")

asyncio.run(HelloWorldAsync())

חייבים async

מה קורה כשאין async?

בפייתון יש כמה סוגי משתנים חדשים שעוזרים לנו להתמודד עם קוד אסינכרוני.
הסוג הראשון כבר הכרנו - coroutine.

כאשר אנחנו מתייגים פונקציה כ-async פייתון יוצרת לנו coroutine מאחורי הקלעים.
ככה הפונקציה asyncio.run(...) יודעת להריץ את הקוד שלנו.

חייבים להחזיר אובייקט מתאים

מה קורה כשאנחנו לא קוראים לפונקציה אלה מעבירים אותה כפרמטר?
בדוגמא הזו אין סוגריים לפונקציה שאנחנו מעבירים ל-run.

הפונקציה run מקבלת coroutine ולא פונקציה אחרת!
ולכן חייבים להחזיר לה coroutine.

עצם הגדרת ה-async בפונקציה אנחנו מחזירים coroutine.

async

כדי להגדיר פונקציה כאסינכרונית מוסיפים async לפונקציה.
פונקציה אסינכרונית היא למעשה coroutine.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def MyAsyncFunc():
pass

async def AnotherAsyncFunc():
pass

def AsyncFuncInDisguise():
pass

def WhatAmI():
return MyAsyncFunc

def WhatAmITwo():
return MyAsyncFunc()

מה מהם הן coroutines?

await

כדי לבצע coroutine או ניתן להחזיר אותם כמו שראינו ב-WhatAmITwo או לבצע await.
אחרת לא יוחזר ערך ההחזרה הנכון!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio 

async def MyAsyncFunc():
pass

async def AnotherAsyncFunc():
pass

def AsyncFuncInDisguise():
pass

def WhatAmI():
return MyAsyncFunc

def WhatAmITwo():
return MyAsyncFunc()

async def main():
await MyAsyncFunc()
await AnotherAsyncFunc()
# בגלל שזה מחזיר קו-רוטינה ניתן לבצע את זה!
await WhatAmITwo()

asyncio.run(main())

פונקציה מול אובייקט coroutine

coroutine function - זוהי הפונקציה שמוגדרת בעזרת async def.
coroutine object - זהו ה-coroutine שיוחזר מהקריאה ל-coroutine function.

כמו שראינו בדוגמא הזו:

1
2
3
4
5
6
7
8
9
async def MyAsyncFunc():
pass

def NonAsync():
return MyAsyncFunc()

myCoroutine = NonAsync()
# NonAsync is NOT a coroutine.
# myCoroutine IS a coroutine.

ניתן לומר גם awaitable object או בקיצור awaitable,
על כל אובייקט שניתן לשים עליו await.

1
2
async def Do(myObj):
await myObj

מבלי לדעת מהו myObj ניתן לומר עליו שהוא awaitable.

Future

asyncio מתחלק לפונקציות ומחלקות ברמה “גבוה” וברמה “נמוכה”.

נלמד על רכיב ברמה הנמוכה שעליו דברים נכתבים, והוא ה-Future.
באפליקציות אין צורך להשתמש ב-Future ישירות.

Future - אובייקט awaitable הנותן לנו את התכונות הבאות:

  • לאפשר לסיים או לבדוק אם התסיים.
  • להחזיר ערך.
  • לזרוק שגיאה.
  • לבדוק ביטול או לבטל את ההרצה.

הוא משמש את פייתון כדי ליצור אובייקטים המאפשרים לקרוא לפונקציות ע”פ ערך ההחזרה שלהם בצורה אסינכרונית.

פונקציות

  • set_result - נותן לאובייקט תוצאה ומסמן אותו כהסתיים.
  • set_exception - מסמן אותו כהסתיים ונותן לאובייקט שגיאה.
  • done - מחזיר True אם האובייקט הסתיים.
  • cancelled - מחזיר True אם האובייקט התבטל.
  • add_done_callback - מוסיף פונקציה לקריאה כשהאובייקט מסתיים.
  • result - מחזיר את התוצאה במידה ויש או מעלה שגיאה רגילה או שגיאת ביטול במידה ונתנו לו שגיאות.
    אם האובייקט לא הסתיים אז מעלה שגיאת InvalidStateError.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio

async def Another(future):
for i in range(5):
print(i)
if i == 2:
future.set_result(True)
await asyncio.sleep(1)


async def main():
loop = asyncio.get_running_loop()

future = loop.create_future()
future.add_done_callback(lambda obj: print(f"From done {obj}"))

await Another(future)
await future
print('Finished')

asyncio.run(main())

Tasks

Task הוא אובייקט awaitable הנותן לנו להריץ כמה פעולות באופן אסינכרוני.
Task מממש את Future.

כדי להריץ Task חדש משתמשים ב-asyncio.create_task(...).
שימו לב שצריך פונקציה אסינכרונית כדי שתהיה לפונקציה גישה ל-event loop הפנימי.

לא תוכלו להריץ אותו כך:

1
2
3
4
5
6
7
8
9
10
import asyncio

async def Func():
pass

def Main():
task = asyncio.create_task(Func())
return task

asyncio.run(Main())

דוגמא:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
from dataclasses import dataclass

@dataclass
class Result:
Result: bool = False

async def ReadFileAsync(result: Result):
await asyncio.sleep(5)
print("Read file")
result.Result = True

async def SpinCheck(result: Result):
while not result.Result:
print("Spinning")
await asyncio.sleep(1)

async def main():
result = Result()
task = asyncio.create_task(ReadFileAsync(result))
task2 = asyncio.create_task(SpinCheck(result))

await task
await task2

asyncio.run(main())

Task cancel

מה שמיוחד באובייקטי Task שקל לבטל אותם במידת הצורך.
הדוגמא הבאה מבטלת פונקציה שרצה בצורה אסינכרונית בצורה רנדומלית:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
import random

async def SpinForAsync(seconds):
for i in range(seconds):
print(f"Running {i}")
await asyncio.sleep(1)

async def CheckAsync(task: asyncio.Task):
while True:
await asyncio.sleep(1)
rand = random.randint(1, 10)
if rand < 3:
print("Cancelling!")
task.cancel()
break
else:
print(f"{rand} Not cancelled")

async def RunAsync():
task = asyncio.create_task(SpinForAsync(25))
waitTask = asyncio.create_task(CheckAsync(task))

await waitTask
try:
await task
except asyncio.CancelledError:
print("Caught cancellation")

asyncio.run(RunAsync())

כדי לבטל יש לקרוא ל-cancel:

1
task.cancel()

gather

ניתן לחכות לכמות אובייקטי Task בעזרת gather.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

async def RunAsync(name):
for i in range(10):
print(f'{name}_{i}')
await asyncio.sleep(1)

async def Main():
taskA = asyncio.create_task(RunAsync('A'))
taskB = asyncio.create_task(RunAsync('B'))
taskC = asyncio.create_task(RunAsync('C'))

await asyncio.gather(taskA, taskB, taskC)

asyncio.run(Main())

תרחיש נפוץ זה יהיה להריץ כמה Tasks בלולאה ולחכות לכולם:

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio

async def RunAsync(name):
for i in range(10):
print(f'{name}_{i}')
await asyncio.sleep(1)

async def Main():
names = ['A','B','C']
tasks = [asyncio.create_task(RunAsync(name)) for name in names]
await asyncio.gather(*tasks)

asyncio.run(Main())

האופרנד * ב-*tasks מפרק את הרשימה לבודדים וככה ניתן להעביר אותו ל-gather.
מכיוון ש-asyncio.gather לא מקבל list ישירות אנחנו צריכים לפרק אותו כדי להעביר לפונקציה.

Wait

כדי לחכות לcoroutines ניתן להשתמש בפונקציית wait:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

async def RunAsync(name, times):
for i in range(times):
print(f'{name}_{i}')
await asyncio.sleep(1)

async def Main():
taskA = asyncio.create_task(RunAsync('A', 5))
taskB = asyncio.create_task(RunAsync('B', 3))
taskC = asyncio.create_task(RunAsync('C', 2))

await asyncio.wait({taskA, taskB, taskC})

asyncio.run(Main())

במקום לבצע gather,
Main מחכה לשלושת הפונקציות.

תרחיש נפוץ יהיה לחכות לראשון שיסתיים, בעזרת return_when= ניתן לומר לו לסיים לחכות כשהראשון יצא:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

async def RunAsync(name, times):
for i in range(times):
print(f'{name}_{i}')
await asyncio.sleep(1)

async def Main():
taskA = asyncio.create_task(RunAsync('A', 5))
taskB = asyncio.create_task(RunAsync('B', 3))
taskC = asyncio.create_task(RunAsync('C', 2))

await asyncio.wait({taskA, taskB, taskC}, return_when=asyncio.FIRST_COMPLETED)

asyncio.run(Main())

במקרה הזה כש-C יסתיים התכנית תצא.

ניתן להעביר ל-return_when את הפרמטרים הבאים:

  • asyncio.FIRST_EXCEPTION - יסיים לחכות במידה ואחד מהם ייזרוק שגיאה. אחרת ייחכה לכולם.
  • asyncio.FIRST_COMPLETED - הראשון שיסתיים.
  • asyncio.ALL_COMPLETED - מחכה שכולם יסתיימו.

wait_for

דרך נוספת לחכות היא בעזרת wait_for שגם נותנת לנו דרך ליצור timeout.

to_thread

בפרק 24 למדנו על CPU Bound ו-IO Bound.
פרק 24: פייתון 24 - מקביליות ואסינכרוניות חלק א

פונקציית to_thread מאפשרת לנו להזריק coroutine לתהליכון.
זה מאוד שימושי עבור פונקציות שהן IO Bound כדי לאפשר מקביליות יותר טובה.

תהליכון אחד ייחכה למשאב ה-IO כאשר שאר הCoroutines יוכלו לרוץ במקביל.

בנוסף לכך הכרנו כבר את המושג GIL - Global intercepter Lock.
מכיוון שיש לנו את ה-GIL פונקציות מקביליות לא רצות בצורה אופטימלית עם תהליכונים.
במיוחד עם קוד אסינכרוני שעובד עם לולאת אירועים - מס’ תהליכונים רב יאיט את ריצת התהליכונים עבור חישובים שהם CPU Bound.
ולמה?
כי ה-GIL נועל את ההרצה.

למרות זאת נוכל להריץ פונקציות שהיו חוסמות אותנו בצורה אסינכרונית עם to_thread.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio
import time

def ReadLongSync():
print("Reading...")
time.sleep(3)
print("Done reading...")


async def MainAsync():
print("Start")
await asyncio.to_thread(ReadLongSync)
print("Finished")

asyncio.run(MainAsync())

זאת אחת הדרכים גם ליצור אינטגרציה עם פונקציות סינכרוניות לקוד אסינכרוני מקבילי.
ReadLongSync היא סנכרונית (ללא async).
כשהשתמשנו ב-to_thread בעצם קיבלנו coroutine!


Async iterators

זוכרים iterator מפרק 23?

פייתון 23 - ג'נרטורים

נלמד על תוסף חדש בפייתון 3.5: async for.

בדוגמא הבאה אנחנו מדמים hasing ארוך -
במקרה שלנו המימוש כולל את md5 והמתנה של 2 שניות.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
import hashlib


class PasswordHasherAsync:
def __init__(self, passwords: list[str]):
self.__passwords = passwords
self.__iter = passwords.__iter__()

async def __LongHashEmulateAsync(self, password):
await asyncio.sleep(2)
return hashlib.md5(password.encode()).hexdigest()

def __aiter__(self):
return self

async def __anext__(self):
pwd = ''
try:
pwd = self.__iter.__next__()
except StopIteration:
raise StopAsyncIteration
return await self.__LongHashEmulateAsync(pwd)


async def MainAsync():
passwords = ['1234', '5jfd8', '3f6sas1']
async for i in PasswordHasherAsync(passwords):
print(i)

asyncio.run(MainAsync())

שני החלקים הכי חשובים אלו:

1
async def __anext__(self):

כאן אנחנו מגדירים פונקציות next אסינכרונית!
פייתון נתנו את השם anext.
ומכיוון שאלו פונקציות מיוחדות אנחנו כותבים __anext__.

חייבים להגדיר גם פונקציית __aiter__ כדי לומר לפייתון שהמחלקה הזו מממשת איטרטור אסינכרוני.

1
2
def __aiter__(self):
return self

החלק השני הוא השימוש באיטרטור בצורה פשוטה עם async for.

1
async for i in PasswordHasherAsync(passwords):

שימו לב לעוד משהו - עטפנו איטרטור סנכרוני שהוא הרשימה passwords.
בצורה הזו ניתן להשתמש באיטרטור שלה כדי לרוץ על הרשימה בצורה ידנית.

aiofiles

מודול aiofiles הוא מודול חיצוני שמעניק לנו פעולות אסינכרוניות על קבצים.
הוא דומה מאוד למודול הרגיל של הקבצים open('fileName','w') אך מוסיפים async!

קריאת שורות בצורה אסינכרונית:

1
2
3
4
5
6
7
8
9
import asyncio
import aiofiles

async def ReadFileAsync(fileName):
async with aiofiles.open(fileName, 'r') as file:
async for line in file:
print(line)

asyncio.run(ReadFileAsync('one.txt'))

ניתן לקרוא עליהם עוד כאן


תרגילים

  1. כתבו תכנית אסינכרונית שמקבלת כקלט כמות מספרים לייצר.
    התכנית תייצר בצורה אסינכרונית רשימה עם מספרים רנדומלים כגודל המספר שהתקבל כקלט ותדפיס אותם.

  2. ניקח את התכנית מתרגיל 1 ונשתמש בפונקציה שיוצרת מספרים רנדומליים.
    הפעם נוסיף מילון שיהווה שיחזיק כמפתח שם קובץ והערך יהיה כמות המספרים הרנדומליים שיש לייצר לתוכו.
    המילון ייראה כך:

1
dic = {'one.txt' : 5, 'two.txt': 6, 'three.txt':25000}

עבור כל קובץ יש ליצור task חדש ולחכות לכולם שיסתיימו!

  1. כתבו תכנית שמקבלת נתיב של קובץ ובודקת כל שנייה אם הוא קיים או לא.
    התכנית תהיה אסינכרונית -
    במידה והמשתמש לחץ על האות q התכנית תצא.
    במידה והמשתמש לחץ r התכנית תבטל את הבדיקה ותבקש נתיב קובץ אחר.
    אחרי שהמשתמש שם נתיב אחר התכנית תחזור לבדוק אם הקובץ קיים או לא.

עבור התרגיל הזה אני ממליץ להתקין את חבילת aioconsole.

1
pip install aioconsole

השימוש פשוט:

1
2
3
4
5
6
7
import aioconsole

async def MyFunc():
userIn = await aioconsole.ainput('Enter: ')
print(UserIn)

asyncio.run(MyFunc)


AsyncIO הוא שיפור משמעותי אבל כל קוד פייתוני וחייב לדעת כדי לפתח קוד ברמה גבוה!

בפרקים הבאים נעלה את רמת איכות הקוד שלנו עם Linterים.
כלים שעוזרים להפיק מהקוד את המיטב.

פייתון 26 - לינטרים ופורמט

אהבתם? מוזמנים להביע תמיכה כאן: כוס קפה