Python. Урок 22. Потоки и процессы в Python. Часть 1. Управление потоками

Автор: | 11.08.2020

Этот урок открывает цикл статей, посвященных параллельному программированию в Python. В рамках данного урока будут рассмотрены вопросы терминологии, относящиеся к параллельному программированию, GIL, создание и управление потоками в Python.

Синхронность и асинхронность. Параллелизм и конкурентность

Для начала разберемся с терминологией, которую мы будем использовать в рамках данного цикла статей, посвященного параллельному программированию на Python.

Синхронное выполнение программы подразумевает последовательное выполнение операций. Асинхронное – предполагает возможность независимого выполнения задач.

Приведем пример из математики, представьте, что у нас есть функция:

\( f(x)=(x+1)^2 \)

Для того, чтобы определить, чему равно значение функции при x=4, нам необходимо вначале вычислить выражение (x+1) и только потом, полученное значение возвести в квадрат:

\( f(4) = (4+1)^2 \)

\( f(4) = 5^2 \)

\( f(4) = 25 \)

Это пример синхронного порядка вычисления: операции были выполнены последовательно и, в данном случае, по-другому быть не могло.

Теперь посмотрите на такую функцию:

\( f(x) = x^2 + 2*x \)

Для вычисления значения функции в точке x=4 мы также можем придерживаться синхронного порядка: вначале выполнить операцию возведения в квадрат, потом вычислим произведение и просуммируем полученные результаты:

\( f(4) = 4^2 + 2*4 \)

\( f(4) = 16 + 2*4 \)

\( f(4) = 16 + 8 \)

\( f(4) = 24 \)

Если внимательно посмотреть на эту функцию, то можно заметить, что для того, чтобы вычислить x^2 не нужно знать значение произведения 2*x и наоборот. Операции вычисления квадратного корня и произведения можно выполнять независимо друг от друга.

\( f(4) = 4^2 + 2*4 \)

… значения 4^2 и 2*4 вычисляются независимо разными вычислителями…

\( f(4) = 16 + 8 \)

\( f(4) = 24 \)

Более житейский пример будет выглядеть так: синхронность —  это когда вы сначала сварили картошку, а потом помыли кастрюлю, и помыть ее раньше того, как в ней приготовили вы не можете. Асинхронность —  это когда вы варите картошку и одновременно прибираетесь на кухне – эти задачи можно выполнять параллельно.

Теперь несколько слов о конкурентности и параллелизме. Конкурентность предполагает выполнение нескольких задач одним исполнителем. Из примера с готовкой: один человек варит картошку и прибирается, при этом, в процессе, он может переключаться: немного прибрался, пошел помешал-посмотрел на картошку, и делает он это до тех пор, пока все не будет готово. 

Параллельность предполагает параллельное выполнение задач разными исполнителями: один человек занимается готовкой, другой приборкой. В примере с математикой операции 4^2 и 2*4 могут выполнять два разных процессора.

Несколько слов о GIL

Для того, чтобы двигаться дальше необходимо сказать несколько слов о GIL. GIL —  это аббревиатура от Global Interpreter Lock – глобальная блокировка интерпретатора. Он является элементом эталонной реализации языка Python, которая носит название CPython. Суть GIL заключается в том, что выполнять байт код может только один поток. Это нужно для того, чтобы упростить работу с памятью (на уровне интерпретатора) и сделать комфортной разработку модулей на языке C. Это приводит к некоторым особенностям, о которых необходимо помнить. Условно, все задачи можно разделить на две большие группы: в первую входят те, что преимущественно используют процессор для своего выполнения, например, математические, их ещё называют CPU-bound, во вторую – задачи работающие с вводом выводом (диск, сеть и т.п.), такие задачи называют IO-bound. Если вы запустили в одном интерпретаторе несколько потоков, которые в основном используют процессор, то скорее всего получите общее замедление работы, а не прирост производительности. Пока выполняется одна задача, остальные простаивают (из-за GIL), переключение происходит через определенные промежутки времени. Таким образом, в каждый конкретный момент времени, будет выполняться только один поток, несмотря на то, что у вас может быть многоядерный процессор (или многопроцессорный сервер), плюс ко всему, будет тратиться время на переключение между задачами. Если код в потоках в основном выполняет операции ввода-вывода, то в этом случае ситуация будет в вашу пользу. В CPython все стандартные библиотечные функций, которые выполняют блокирующий ввод-вывод, освобождают GIL, это дает возможность поработать другим потокам, пока ожидается ответ от ОС.

Потоки в Python

Потоки позволяют запустить выполнение нескольких задач в конкурентном режиме в рамках одного процесса интерпретатора. При этом, нужно помнить о GIL. Все потоки будут выполняться на одном CPU, даже если задачи могут выполняться параллельно.  Поэтому есть такое правило, если ваши задачи в основном потребляют ресурсы процессора, то используйте процессы, если ввод-вывод, то потоки и другие инструменты асинхронного программирования, которые в Python обладают довольно мощным функционалом.

Создание и ожидание завершения работы потоков. Класс Thread

За создание, управление и мониторинг потоков отвечает класс Thread из модуля threading. Поток можно создать на базе функции, либо реализовать свой класс – наследник Thread и переопределить в нем метод run(). Для начала рассмотрим вариант создания потока на базе функции: 

from threading import Thread
from time import sleep

def func():
    for i in range(5):
        print(f"from child thread: {i}")
        sleep(0.5)

th = Thread(target=func)
th.start()

for i in range(5):
    print(f"from main thread: {i}")
    sleep(1)

В приведенном выше примере мы импортировали нужные модули. После этого объявили функцию func(), которая выводит пять раз сообщение с числовым маркером с задержкой в 500 мс. Далее создали объект класса Thread, в нем, через параметр target, указали, какую функцию запускать как поток и запустили его. В главном потоке  добавили код вывода сообщений с интервалом в 1000 мс.

В результате запуска этого кода получим следующее:

from child thread: 0
from main thread: 0
from child thread: 1
from main thread: 1
from child thread: 2
from child thread: 3
from main thread: 2
from child thread: 4
from main thread: 3
from main thread: 4

Как вы можете видеть, код из главного и дочернего потоков выполняются псевдопараллельно (во всяком случае создается такое ощущение), т.к. задержка в дочернем потоке меньше, то сообщение из него появляются чаще.

Если необходимо дождаться завершения работы потока(ов) перед тем как начать выполнять какую-то другую работу, то воспользуйтесь методом join():

th1 = Thread(target=func)
th2 = Thread(target=func)

th1.start()
th2.start()

th1.join()
th2.join()

print("--> stop")

У join() есть параметр timeout, через который задается время ожидания завершения работы потоков.

Для того, чтобы определить выполняет ли поток какую-то работу или завершился используется метод is_alive().

th = Thread(target=func)
print(f"thread status: {th.is_alive()}")

th.start()
print(f"thread status: {th.is_alive()}")

sleep(5)
print(f"thread status: {th.is_alive()}")

В результате получим следующее:

thread status: False

from child thread: 0
thread status: True

from child thread: 1
from child thread: 2
from child thread: 3
from child thread: 4
thread status: False

Для задания потоку имени воспользуйтесь свойством name.

Создание классов наследников от Thread

Ещё одни способ создавать и управлять потоками – это реализовать класс наследник от Thread и переопределить у него метод run(). 

class CustomThread(Thread):
    def __init__(self, limit):
        Thread.__init__(self)
        self._limit = limit

    def run(self):
        for i in range(self._limit):
            print(f"from CustomThread: {i}")
            sleep(0.5)

cth = CustomThread(3)
cth.start()

В терминале получим следующее:

from CustomThread: 0
from CustomThread: 1
from CustomThread: 2

Принудительное завершение работы потока

В Python у объектов класса Thread нет методов для принудительного завершения работы потока. Один из вариантов решения этой задачи – это создать специальный флаг, через который потоку будет передаваться сигнал остановки. Доступ к такому флагу должен управляться объектом синхронизации.

from threading import Thread, Lock
from time import sleep

lock = Lock()

stop_thread = False

def infinit_worker():
    print("Start infinit_worker()")
    while True:
        print("--> thread work")
        lock.acquire()

        if stop_thread is True:
           break

        lock.release()

        sleep(0.1)

    print("Stop infinit_worker()")

# Create and start thread
th = Thread(target=infinit_worker)
th.start()

sleep(2)

# Stop thread
lock.acquire()
stop_thread = True
lock.release()

Если мы запустим эту программу, то в консоли увидим следующее:

Start infinit_worker()
--> thread work
--> thread work
--> thread work
--> thread work
--> thread work
Stop infinit_worker()

Разберемся с этим кодом более подробно. В строке 4 мы создаем объект класса Lock, он используется для синхронизации доступа к ресурсам из нескольких потоков, про них мы более подробно расскажем в следующей статье. В нашем случае, ресурс — это переменная stop_thread, объявленная в строке 6, которая используется как сигнал для остановки потока. После этого, в строке 8, объявляется функция infinit_worker(), ее мы запустим как поток. В ней выполняется бесконечный цикл, каждый проход которого отмечается выводом в терминал сообщения “–> thread work” и проверкой состояния переменной stop_thread. В главном потоке программы создается и запускается дочерний поток (строки 24, 25), выполняется функция задержки и принудительно завершается поток путем установки переменной stop_thread значения True.

Потоки-демоны

Есть такая разновидность потоков, которые называются демоны (терминология взята из мира Unix-подобных систем). Python-приложение не будет закрыто до тех пор, пока в нем работает хотя бы один недемонический поток.

Пример:

def func():
    for i in range(5):
        print(f"from child thread: {i}")
        sleep(0.5)

th = Thread(target=func)
th.start()
print("App stop")

Вывод программы:

from child thread: 0
App stop
from child thread: 1
from child thread: 2
from child thread: 3
from child thread: 4

Как вы можете видеть, приложение продолжает работать, даже после того, как главный поток завершился (сообщение: “App stop”).

Для того, чтобы потоки не мешали остановке приложения (т.е. чтобы они останавливались вместе с завершением работы программы) необходимо при создании объекта Thread аргументу daemon присвоить значение True, либо после создания потока, перед его запуском присвоить свойству deamon значение True. Изменим процесс создания потока в приведенной выше программе:

th = Thread(target=func, daemon=True)

Запустим ее, получим следующий результат:

from child thread: 0
App stop

Поток остановился вместе с остановкой приложения.

P.S.

Вводные уроки по “Линейной алгебре на Python” вы можете найти соответствующей странице нашего сайта. Все уроки по этой теме собраны в книге “Линейная алгебра на Python”.
Книга: Линейная алгебра на Python
Если вам интересна тема анализа данных, то мы рекомендуем ознакомиться с библиотекой Pandas.  Для начала вы можете познакомиться с вводными уроками. Все уроки по библиотеке Pandas собраны в книге Pandas. Работа с данными”.
Книга: Pandas. Работа с данными

Python. Урок 22. Потоки и процессы в Python. Часть 1. Управление потоками: 2 комментария

  1. Константин

    Замечательные уроки, коротко и понятно излагаете важные вещи!
    Жду следующие статьи, продолжайте в том же духе!

Добавить комментарий

Ваш адрес email не будет опубликован.