December 10, 2021

פייתון 25 - מקביליות ואסינכרוניות חלק א


פרק קודם:

פייתון 24 - ג'נרטורים

מה נלמד

  • המעבד וליבותיו
  • מקביליות
  • אסינכרוניות
  • Threading
  • סנכרון תהליכונים
  • בעיות נפוצות בתהליכונים
  • GIL

הקדמה

למדנו שבפייתון הקוד רץ ע”פ רצף סידור הקוד.

1
2
3
print("This is the first line")
print("This is the second line")
print("This is the third line")

דרך נוספת להריץ את הקוד היא במקביל - ז”א שקוד א וקוד ב ירוצו אחד ליד השני ללא קשר לסדר הופעתם.

בפסודו קוד:

1
2
3
4
5
runParallel:
print("This is a line")

runParallel:
print("This is another line")

כאשר דברים רצים במקביל - לא נוכל לדעת מה סדר הופעתם אלה אם כן נסנכרן ידנית את הסדר!
ולכן או שתודפס קודם השורה הראשונה או השנייה - אין לדעת.

המעבד וליבותיו

המעבד היא היחידה הראשית במחשב המבצעת פקודות ומקבלת אותות.
למשל - כל כפתור שנלחץ במקלדת מועבר כאות למעבד והמעבד מחליט מה להריץ ואם האות חשוב מספיק כדי לתת לו עדיפות.

כל מעבד עובד כ”שעון” והוא מייצר חזרות חשמליות.
השעון מגדיר את קצב הפקודות לשנייה ובד”כ הוא רשום כיום כ - Ghz - ג’יגה הרץ.

איך לצפות במידע על המעבד

  1. לחצו WinKey+ R.
  2. כתבו msinfo32 והריצו.

תחת Processor יופיע לכם המידע.
למשל אצלי:

Processor 11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz, 2304 Mhz, 8 Core(s), 16 Logical Processor(s)

נוכל לראות שכתוב 2.30 Ghz.
זאת אומרת הוא מקבל 2.3 מיליארד חזרות לשנייה.
וכל ליבה תאורטית מקבלת גם כן 2.30 Ghz.

ליבות

כל ליבה היא יחידת הרצה.
מה היא מריצה? קוד כמובן!

שימו לב שבאדום כתוב 8 cores וישנם גם 16 logical cores.
המערכת הפעלה יודעת לנצל ליבות וליצור ליבות ווירטואליות.
לא ניכנס לזה לעומק אך מה שחשוב לדעת זה שכמות הקוד המקבילי האמיתי ירוץ לפי מספר הליבות.
וכמות הקוד שירוץ באופן אסינכרוני ירוץ לפי מספר הליבות הווירטואליות.

מקביליות מול אסינכרוניות

קוד חוסם

קוד חוסם הוא קוד שרץ בצורה רציפה:

מבצע פעולה כלשהי, מחכה 5 שניות ואז שוב מבצע פעולה .

קוד מקבילי

מריץ 2 פעולות במקביל ללא הפרעה.

קוד אסינכרוני

קוד שלא חוסם קטע קוד אחר:
בזמן שהפעולה הראשונה מתבצעת ניתן לבצע פעולה שנייה.

אחרי שהפעולה הראשונה מסתיימת הקוד מקבל עדכון שהוא הסתיים ומבצע בהתאם.

חשוב לזכור:

קוד מקבילי =\= קוד אסינכרוני

קוד שעובד במקביל יכול להיות לא אסינכורני,
וקוד שעובד באופן אסינכרוני יכול לרוץ גם לא באופן מקבילי!

Threading API

נלמד בחלק הזה על הפונקציות של thread - אלו פונקציות הבסיס לכל התהליכונים.
בחלק השני נלמד על פונקציות חדשות יותר תחת asyncio.

Thread - תהליכון

תהליכון הוא אובייקט של מערכת ההפעלה הנותן לנו אפשרות להריץ קוד באחת מהליבות של מערכת ההפעלה.
הקוד ירוץ במקביל.

תהליכון מול תהליך - Executable vs Thread

קיימת טרמינולוגיה לא נכונה על תהליכים ותהליכונים.
Executable מאחסן בתוכו מידע ותהליכונים.
Thread - תהליכון, אשר מריץ קוד.

לעיתים אומרים “תריץ את התהליך” - אך זו אמרה לא נכונה, כי מה שרץ באמת זה תהליכונים ולא תהליכים!
לכל Executable קיים תהליך ראשי - The main Thread.
כל הקוד שרץ רץ בתוכו.

ואיך פייתון נראה?

אני מריץ קוד מקבילי עם תהליכונים דרך Visual Studio Code ככה שיש לי גם דיבאגר.
במבט דרך Process Explorer אני יכול לראות איזה תהליכים רצים ואת התהליכונים של ההרצה.

שימו לב שנראה את סביבת הריצה של פייתון ולא הקוד שלנו!
מכיוון שהסביבה של פייתון ממסכת לנו את המורכבויות!

אובייקט Thread

המודול threading נותן לנו כלים כדי להשתמש בתהליכונים.
האובייקט Thread משמש אותנו כדי ליצור תהליכון ולהריץ אותו.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading
import time

def do(threadName):
for i in range(5):
print(f'Hello from {threadName}\n')

# This is the main thread
try:
threadOne = threading.Thread(target=do, args=['Thread-One'])
threadTwo = threading.Thread(target=do, args=['Thread-Two'])
threadOne.start()
threadTwo.start()
except:
print("Error: unable to start thread")

# time.sleep halts execution for 3 seconds
time.sleep(3)

בדוגמא הזו אנו יוצרים שני תהליכונים עם פונקציה שמדפיסה 5 לפני שהתליכון יוצא.

התהליכון הראשי שלנו ישן 3 שניות במהלך הזמן הזה.

שאלה: האם שני התליכונים שיצרנו נגמרים לפני ה-3 שניות?

כדי לקבל את האובייקט Thread שכרגע אנחנו נמצאים בו ניתן להשתמש ב - threading.current_thread().

1
2
3
4
5
6
import threading

thread = threading.current_thread()

print(thread.getName())
print(thread.native_id)

מה יודפס?

getName & setName

לכל תהליך ולכל תהליכונים יש מספר המייצג אותו הנקרא - Id.
כדי לקבל את המספר של התהליכון נשתמש ב - native_id.
נוכל לתת לתהליכונים גם שם שמייצג מה התהליכון מבצע.

  • getName - נותן לנו את השם
  • setName - נותן לתהליכון שם חדש

start & run

למה יש שני מתודות שלפי שמן עושות את אותו הדבר?

  • run - המתודה מבצעת את הפעולה שהוגדרה לתהליכון, נגיע לזה עוד מעט שאפשר לרשת את מחלקת התהליכון.
  • start - מתודה לא משתנה שמתחילה את הפעולה שמוגדרת ב-run.

ניתן ליצור תהליכון שעוד לא התחיל את העבודה.

1
2
3
4
5
6
7
import threading

def MakeThread(func):
return threading.Thread(target=func)

thread = MakeThread(lambda: print("hello world"))
thread.start()

join

מתודת הסנכרון הכי בסיסית של תהליכונים.
מחכה שהתהליכון יסתיים או שהזמן שהוגדר ייעבור.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import threading
import time

def Run():
for i in range(5):
time.sleep(1)
print(i)

thread = threading.Thread(target= Run)
thread.start()

print('Waiting....')

thread.join()

print('Done!')

Daemon threads

מה מגדיר “האם התהליך יכול לצאת”?
כשכל התהליכונים הראשיים של התהליך הסתיימו!

תהליכון יכול להיות מוגדר כראשי או ברקע.
בפייתון הטרמינולוגיה הזו נקראת - Daemon thread.(מושג הנפוץ בלינוקס).

מה ייקרה אם נגדיר את התהליכון שייעבוד ברקע?

1
2
3
4
5
6
7
8
9
10
import threading
import time

def Run():
for i in range(5):
time.sleep(1)
print(i)

thread = threading.Thread(target= Run, daemon=True)
thread.start()

ירושת Thread

ניתן ליצור תהליכים ע”י ירושה.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import threading
import time

class MyWorker(threading.Thread):
def run(self):
i = 10
while(i > 0):
print(i)
time.sleep(1)
i -= 1

worker = MyWorker()

worker.start()

כפי שהזכרנו קודם פונקציית ה-Run מבצעת את הפעולה עצמה שהתהליכון מבצע.
ופונקצייה ה-Start מתחילה את התהליכון.

בשיטת הירושה ניתן לנהל את עיצוב הקוד בעזרת מחלקות ולא רק פונקציות.
לשתי השיטות שימושים שונים!


תרגיל

  1. הריצו שני תהליכונים הסופרים מ1-100 ותחכו שהם יסתיימו לפני
    כתבו מה הבעיה בפלט שייצא לכם.

  2. תכתבו תכנית אשר מקבלת כקלט נתיב לתיקייה והתכנית עוקבת אחרי שינויים בתיקייה.
    השינויים הבסיסיים שיש לממש זה - אם קובץ נוסף או נמחק
    אין צורך לבדוק תתי תיקיות.

תוכלו להיעזר בפרק: פייתון 11 - קידוד וקבצים


סנכרון תהליכונים

כעת נוכל לתקן את הבעיה שהייתה לנו בתרגיל הראשון!

בשביל לתקן את הספירה של שני תהליכונים נצטרך “לדבר” בין התהליכונים.
אחד הכלים הכי נפוצים אצלנו זה בעזרת נעילות.
ניתן לראות נעילה כמו רמזור - עד שאנחנו לא חוצים ולא משחררים את הנעילה התהליכון השני מחכה.
ממש כמו רמזור כאשר מכונית אחת חוצה ואחרת מחכה לתהליך להסתיים!

Lock

כדי לנעול בפייתון משתמשים ב- threading.lock.

1
2
3
4
5
6
7
8
9
import threading

lock = threading.Lock()

lock.acquire()

# כל מה שכאן מוגן בעזרת ה-Lock

lock.release()

או בפשטות יותר בעזרת with:

1
2
3
4
5
6
import threading
lock = threading.Lock():

with lock:
# כל מה שכאן מוגן בעזרת ה-Lock
pass

זה מתורגם בפייתון ל:

1
2
3
4
5
lock.acquire()
try:
# כל מה שכאן מוגן בעזרת ה-Lock
finally:
lock.release()

בעצם לא משנה אם תקרה שגיאה אנחנו כל הזמן נשחרר את ה-Lock.
אחרת יכולה להיווצר בעיה בשם - DeadLock.
שאנחנו נעולים לעד!
עוד על הנושא הזה בהמשך.


תרגיל

  1. תסנכרנו את הפלט של התרגיל הראשון הקודם!

Context-Switching

כאשר ליבה מריצה תהליכון זה לא התהליכון היחיד שהיא יכולה להריץ.
כמות התהליכים במערכת יכולה להגיע למאות ואלפים,
וכמות התהליכונים לאלפים ועשרות אלפים!

אז… איך 8 ליבות מריצות כל כך הרבה תהליכונים?

  1. לא כל התהליכונים רצים - חלק התסיימו, חלקם מחכים ולחלקם מערכת ההפעלה עשתה הפסקה.
  2. בעזרת סנכרון חכם בין הריצות של תהליכונים.

בעצם המערכת ההפעלה מתזמנת תהליכונים על הליבות של המעבד ונותנת עדיפות לחלק מהתהליכונים.
למשל - תוכנות אשר אתם משתמשים בהם ומוצגים מקבלים עדיפות על תוכנות שסגרתם או מזערתם.

מה שזה אומר בפועל - שחלק מהתהליכונים מקבלים זמן ריצה יותר גדולה.
ולרוב זה לא בשליטתנו.

מתואר בציור תהליכון 1 מקבל זמן ריצה, ולאחר Context-Switch תהליכון 2 רץ.

מתוך זה ניתן להסיק - כדאי להריץ תהליכונים כמספר הליבות.
לא נוכל להריץ 128 תהליכונים על מעבד עם 8 ליבות פיזיות,
אחרת 120 תהליכונים אחרים ייצטרכו “להתחרות” על זמן ריצה ונשלם את הזמן רק על להחליף זמני ריצה של תהליכונים.

CPU Bound

פעולות אשר מחשבות ומשתמשות בזמן ריצה נקראות CPU Bound.

פעולות כאלו נרצה להריץ כמספר הליבות

למשל אם במערכת שלי יש 8 ליבות פיזיות ו16 ווירטואליות,

IO Bound

פעולות אשר טוענות קבצים מדיסק, מחכות לקלט מהאינטרנט או כל פעולה אחרת שהיא - Input/Output נקראת IO Bound.
מכיוון שיש מנגנוני סנכרון שמחכים למידע ניתן להריץ מספר בלתי מוגבל של תהליכונים שהם IO Bound.

תרגיל

מנגנוני סנכרון

המנגון Lock מאפשר לנו ליצור “צומת” דרכים עבור תהליכונים כדי שהם לא ייתנגשו.
ל-Lock יש שם נוסף והוא בד”כ נקרא Mutex.
לכן אם אתם שומעים שמדברים על Mutexים תדעו שמדובר בנעילה!

מה יכול לקרות כשהם מתנגשים ללא מנגנון סנכרון מספיק טוב?

גישה לאותו המשתנה

מה ייקרה בקוד הבא?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading
import time
from dataclasses import dataclass

@dataclass
class Data:
Output: str = ''

data = Data()

def Count(data: Data):
for i in range(10000):
i += 1
data.Data = str(i)
print(data.Data)


threadOne = threading.Thread(target=Count, args=[data])
threadTwo = threading.Thread(target=Count, args=[data])

threadOne.start()
threadTwo.start()

threadOne.join()
threadTwo.join()

הקוד יוצר משתנה מסוג Data המועבר לשני התהליכונים.
השימוש בו לא מסונכרן ולכן לא נדע במהלך הריצה של התהליכונים מה יהיה באמת ב- data.Data.
הריצו את הקוד ותראו את התוצאה!

RaceCondition

במקרה הקלאסי של שני תהליכונים לא מסונכרנים יכול להיווצר לנו מצב תחרותי שיהיה תלוי במי מהתהליכונים יגיע לתוצאה קודם.

נסו לענות על השאלות הבאות לפני שאתם ממשיכים:

  1. מה יודפס?
  2. כיצד יש לתקן זאת?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import threading
import time
from dataclasses import dataclass

@dataclass
class Data:
Output: str = ''
HasData: bool = False

data = Data()

def PushIntoData(data: Data):
time.sleep(5)
data.HasData= True
data.Data = 'Output'

def PullData(data: Data):
time.sleep(3)
if data.HasData:
print(data.Data)

threadOne = threading.Thread(target=PushIntoData, args=[data])
threadTwo = threading.Thread(target=PullData, args=[data])

threadOne.start()
threadTwo.start()

threadOne.join()
threadTwo.join()
  1. לא יודפס כלום כי לתהליכון הראשון לוקח יותר זמן לשים מידע מאשר לתהליכון השני לקרוא מידע.
  2. אם עניתם שיש לתקן זאת בעזרת השמה של יותר זמן ב-PullData אתם טועים!
    ב-Race condition המשמעות של “זמן” היא בדיוק הבעיה.
    תהליכונים מתנגשים כי אין בינהם סנכרון ותזמון התהליכונים גורם לבעיה הזו מלכתחילה.
    לתקן את זה בעזרת time.sleep() זהו טריק מלוכלך שלא מומלץ לעשות את זה.

אז מה אפשר לעשות?
לסנכרן כמובן!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import threading
import time
from dataclasses import dataclass

@dataclass
class Data:
Output: str = ''
HasData: bool = False
__locking: threading.Lock = threading.Lock()

def WaitForData(self):
hasData = False
while not hasData:
with self.__locking:
hasData = self.HasData

def PopulateData(self, data):
with self.__locking:
self.HasData = True
self.Output = data


data = Data()

def PushIntoData(data: Data):
time.sleep(5)
data.PopulateData('Output')

def PullData(data: Data):
data.WaitForData()
print(data.Output)

threadOne = threading.Thread(target=PushIntoData, args=[data])
threadTwo = threading.Thread(target=PullData, args=[data])

threadOne.start()
threadTwo.start()

threadOne.join()
threadTwo.join()

שימו לב שכעת ב-PullData אנחנו לא מחכים בעזרת זמן אלה מחכים בעזרת לולאה שבודקת את הערך.
הקטע קוד:

1
2
with self.__locking:
hasData = self.HasData

הוא המסנכרן.
השיטה הזו נקראת Spinlock.
יש דרכים יותר טובות לממש את השיטה הזו אבל בהגדרה שלה היא לולאה אשר מחכה לערך שיהיה מוכן.

DeadLock

סנכרון יכול להסתבך כאשר אנחנו לא שמים לב כיצד אנחנו מסנכרנים את האובייקטים שלנו.

בדוגמא הקודמת נעשתה טעות בקוד, האם אתם יכולים לזהות אותה?
נסו להריץ את הקוד.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import threading
import time
from dataclasses import dataclass

@dataclass
class Data:
Output: str = ''
HasData: bool = False
__locking: threading.Lock = threading.Lock()

def WaitForData(self):
hasData = False
while not hasData:
self.__locking.acquire()
hasData = self.HasData
self.__locking.release()

def PopulateData(self, data):
self.__locking.acquire()
self.HasData = True
self.Output = data


data = Data()

def PushIntoData(data: Data):
time.sleep(5)
data.WaitForData()
data.PopulateData('Output')

def PullData(data: Data):
data.WaitForData()
print(data.Output)

threadOne = threading.Thread(target=PushIntoData, args=[data])
threadTwo = threading.Thread(target=PullData, args=[data])

threadOne.start()
threadTwo.start()

threadOne.join()
threadTwo.join()

אחת הסיבות ש-with Lock כל כך נוח מכיוון שהוא מבצע נעילה ויציאה אוטומטית!
בדוגמא הזו שכחו לכתוב self.__locking.release() במתודה PopulateData.
וכך גרמנו ל-Deadlock!

התהליכון נעול והתהליכון השני מחכה שהראשון יצא, אך הוא לעולם לא ייצא!

RLock

לעיתים יש לנו פונקציות רקורסיביות או לולאות על קוד שצריך סנכרון.
במידה כזו או שנסכרן את הכל בעזרת Lock או שנוכל להשתמש ב-RLock.

  • RLock - Reentrant lock.

ה-RLock יכול להינעל באותו תהליכון כמה פעמים שהוא רוצה,
אך הוא צריך להשתחרר כשתהליכון אחר יירצה לנעול גם כן.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading

def Do(lock, value):
with lock:
print(value)
if value == 0:
return 0
return Do(lock, value - 1)

lock = threading.RLock()
threadOne = threading.Thread(target=Do, args=[lock, 5])
threadTwo = threading.Thread(target=Do, args=[lock, 5])

threadOne.start()
threadTwo.start()

threadOne.join()
threadTwo.join()

Atomics & GIL

האם גישה למשתנה היא פעולה שצריכה סנכרון בין תהליכונים?
כגון:

1
a = 1

בפייתון הפעולה הזו אינה צריכה סנכרון אלה אם נרצה לסנכרן את סדר הפעולות.
ז”א שאין בפייתון סכנה שמשהו יידרוס את המשתנה ויהיה שם ערך לא חוקי מכיוון ששני תהליכונים כותבים לאותו משתנה.

הסיבה לזו היא ה - GIL - Global interpreter lock.
זהו Lock רגיל שנועל את סביבת ההרצה של פייתון.
כל פקודה בפייתון שרצה צריכה להתבצע בתוך ה-Lock הזה.
ולכן פעולות כמו a=1 הן בטוחות לשימוש בסביבת תהליכונים מרובים.

וזו הסיבה למה לעיתים תהליכון אחד יכול להיות יותר מהיר מקוד עם כמה תהליכונים!
השיטה הזו אופטימלית לסנכרון ובקרת תהליכונים אך פוגעת בקוד מרובה תהליכונים!

בזכות ה-GIL כל פעולה כזו נקראת פעולה אטומית.
פעולה אטומית זו פעולה שניתן לבטוח בה שהיא תסתיים בפעולה אחת.

למשל זו פעולה אטומית:

1
a = 1

פונקציות לא פעולות אטומיות:

1
2
3
4
def Do():
a = 1
b = 2
return a + b

פעולה Do מורכבת מ-4 פעולות:

  1. לשים a =1
  2. b= 2
  3. חיבור a+b
  4. החזרת הערך

תרגילים

  1. כתבו פונקציה שמבצעת פעולה כלשהי במקביל על כל הליבות של המערכת על רשימה.
    הפונקציה תחלק את הרשימה לתתי רשימות ע”פ כמות הליבות, למשל אם יש לנו 80 איברים במערך ו8 ליבות,
    יהיו לנו 8 תהליכונים וכל תהליכון ייטפל ב10 איברים.
    את הפונקציה יש לקבל כפרמטר.

כדי לקבל את מספר הליבות השתמשו ב:

1
2
3
import multiprocessing

cores = multiprocessing.cpu_count()

הפונקציה שיש לממש:

1
2
def PerformParallelOn(data: list, func):
pass


בפרק הבא נכסה את כל מנגנוני הסנכרון ונעבור לתכנות מודרני עם - asyncio.
נלמד כיצד לכתוב פעולות אסינכרוניות בקלות בעזרת שיטת ה - async-await ולסנכרן פעולות בעזרת condition ו-Semaphore.

AsyncIO:

פייתון 26 - אסינכרוניות חלק ב

על הפוסט

הפוסט נכתב על ידי Ilya, רישיון על ידי CC BY-NC-ND 4.0.

שתפו את הפוסט

Email Facebook Linkedin Print

קנו לי קפה

#Software#Python