Этот урок открывает цикл статей, посвященных параллельному программированию в Python. В рамках данного урока будут рассмотрены вопросы терминологии, относящиеся к параллельному программированию, GIL, создание и управление потоками в Python.
- Синхронность и асинхронность. Параллелизм в конкурентность
- Несколько слов о GIL
- Потоки в Python
- Создание и ожидание завершения работы потоков. Класс Thread
- Создание классов наследников от Thread
- Принудительное завершение работы потока
- Потоки-демоны
Синхронность и асинхронность. Параллелизм и конкурентность
Для начала разберемся с терминологией, которую мы будем использовать в рамках данного цикла статей, посвященного параллельному программированию на Python.
Синхронное выполнение программы подразумевает последовательное выполнение операций. Асинхронное – предполагает возможность независимого выполнения задач.
Приведем пример из математики, представьте, что у нас есть функция:
\( f(x)=(x+1)^2 \)
Для того, чтобы определить, чему равно значение функции при x=4, нам необходимо вначале вычислить выражение (x+1) и только потом, полученное значение возвести в квадрат:
\( f(4) = (4+1)^2 \)
\( f(4) = 5^2 \)
\( f(4) = 25 \)
Это пример синхронного порядка вычисления: операции были выполнены последовательно и, в данном случае, по-другому быть не могло.
Теперь посмотрите на такую функцию:
\( f(x) = x^2 + 2*x \)
Для вычисления значения функции в точке x=4 мы также можем придерживаться синхронного порядка: вначале выполнить операцию возведения в квадрат, потом вычислим произведение и просуммируем полученные результаты:
\( f(4) = 4^2 + 2*4 \)
\( f(4) = 16 + 2*4 \)
\( f(4) = 16 + 8 \)
\( f(4) = 24 \)
Если внимательно посмотреть на эту функцию, то можно заметить, что для того, чтобы вычислить x^2 не нужно знать значение произведения 2*x и наоборот. Операции вычисления квадратного корня и произведения можно выполнять независимо друг от друга.
\( f(4) = 4^2 + 2*4 \)
… значения 4^2 и 2*4 вычисляются независимо разными вычислителями…
\( f(4) = 16 + 8 \)
\( f(4) = 24 \)
Более житейский пример будет выглядеть так: синхронность — это когда вы сначала сварили картошку, а потом помыли кастрюлю, и помыть ее раньше того, как в ней приготовили вы не можете. Асинхронность — это когда вы варите картошку и одновременно прибираетесь на кухне – эти задачи можно выполнять параллельно.
Теперь несколько слов о конкурентности и параллелизме. Конкурентность предполагает выполнение нескольких задач одним исполнителем. Из примера с готовкой: один человек варит картошку и прибирается, при этом, в процессе, он может переключаться: немного прибрался, пошел помешал-посмотрел на картошку, и делает он это до тех пор, пока все не будет готово.
Параллельность предполагает параллельное выполнение задач разными исполнителями: один человек занимается готовкой, другой приборкой. В примере с математикой операции 4^2 и 2*4 могут выполнять два разных процессора.
Несколько слов о GIL
Для того, чтобы двигаться дальше необходимо сказать несколько слов о GIL. GIL — это аббревиатура от Global Interpreter Lock – глобальная блокировка интерпретатора. Он является элементом эталонной реализации языка Python, которая носит название CPython. Суть GIL заключается в том, что выполнять байт код может только один поток. Это нужно для того, чтобы упростить работу с памятью (на уровне интерпретатора) и сделать комфортной разработку модулей на языке C. Это приводит к некоторым особенностям, о которых необходимо помнить. Условно, все задачи можно разделить на две большие группы: в первую входят те, что преимущественно используют процессор для своего выполнения, например, математические, их ещё называют CPU-bound, во вторую – задачи работающие с вводом выводом (диск, сеть и т.п.), такие задачи называют IO-bound. Если вы запустили в одном интерпретаторе несколько потоков, которые в основном используют процессор, то скорее всего получите общее замедление работы, а не прирост производительности. Пока выполняется одна задача, остальные простаивают (из-за GIL), переключение происходит через определенные промежутки времени. Таким образом, в каждый конкретный момент времени, будет выполняться только один поток, несмотря на то, что у вас может быть многоядерный процессор (или многопроцессорный сервер), плюс ко всему, будет тратиться время на переключение между задачами. Если код в потоках в основном выполняет операции ввода-вывода, то в этом случае ситуация будет в вашу пользу. В CPython все стандартные библиотечные функций, которые выполняют блокирующий ввод-вывод, освобождают GIL, это дает возможность поработать другим потокам, пока ожидается ответ от ОС.
Потоки в Python
Потоки позволяют запустить выполнение нескольких задач в конкурентном режиме в рамках одного процесса интерпретатора. При этом, нужно помнить о GIL. Все потоки будут выполняться на одном CPU, даже если задачи могут выполняться параллельно. Поэтому есть такое правило, если ваши задачи в основном потребляют ресурсы процессора, то используйте процессы, если ввод-вывод, то потоки и другие инструменты асинхронного программирования, которые в Python обладают довольно мощным функционалом.
Создание и ожидание завершения работы потоков. Класс Thread
За создание, управление и мониторинг потоков отвечает класс Thread из модуля threading. Поток можно создать на базе функции, либо реализовать свой класс – наследник Thread и переопределить в нем метод run(). Для начала рассмотрим вариант создания потока на базе функции:
from threading import Thread from time import sleep def func(): for i in range(5): print(f"from child thread: {i}") sleep(0.5) th = Thread(target=func) th.start() for i in range(5): print(f"from main thread: {i}") sleep(1)
В приведенном выше примере мы импортировали нужные модули. После этого объявили функцию func(), которая выводит пять раз сообщение с числовым маркером с задержкой в 500 мс. Далее создали объект класса Thread, в нем, через параметр target, указали, какую функцию запускать как поток и запустили его. В главном потоке добавили код вывода сообщений с интервалом в 1000 мс.
В результате запуска этого кода получим следующее:
from child thread: 0 from main thread: 0 from child thread: 1 from main thread: 1 from child thread: 2 from child thread: 3 from main thread: 2 from child thread: 4 from main thread: 3 from main thread: 4
Как вы можете видеть, код из главного и дочернего потоков выполняются псевдопараллельно (во всяком случае создается такое ощущение), т.к. задержка в дочернем потоке меньше, то сообщение из него появляются чаще.
Если необходимо дождаться завершения работы потока(ов) перед тем как начать выполнять какую-то другую работу, то воспользуйтесь методом join():
th1 = Thread(target=func) th2 = Thread(target=func) th1.start() th2.start() th1.join() th2.join() print("--> stop")
У join() есть параметр timeout, через который задается время ожидания завершения работы потоков.
Для того, чтобы определить выполняет ли поток какую-то работу или завершился используется метод is_alive().
th = Thread(target=func) print(f"thread status: {th.is_alive()}") th.start() print(f"thread status: {th.is_alive()}") sleep(5) print(f"thread status: {th.is_alive()}")
В результате получим следующее:
thread status: False from child thread: 0 thread status: True from child thread: 1 from child thread: 2 from child thread: 3 from child thread: 4 thread status: False
Для задания потоку имени воспользуйтесь свойством name.
Создание классов наследников от Thread
Ещё одни способ создавать и управлять потоками – это реализовать класс наследник от Thread и переопределить у него метод run().
class CustomThread(Thread): def __init__(self, limit): Thread.__init__(self) self._limit = limit def run(self): for i in range(self._limit): print(f"from CustomThread: {i}") sleep(0.5) cth = CustomThread(3) cth.start()
В терминале получим следующее:
from CustomThread: 0 from CustomThread: 1 from CustomThread: 2
Принудительное завершение работы потока
В Python у объектов класса Thread нет методов для принудительного завершения работы потока. Один из вариантов решения этой задачи – это создать специальный флаг, через который потоку будет передаваться сигнал остановки. Доступ к такому флагу должен управляться объектом синхронизации.
from threading import Thread, Lock from time import sleep lock = Lock() stop_thread = False def infinit_worker(): print("Start infinit_worker()") while True: print("--> thread work") lock.acquire() if stop_thread is True: break lock.release() sleep(0.1) print("Stop infinit_worker()") # Create and start thread th = Thread(target=infinit_worker) th.start() sleep(2) # Stop thread lock.acquire() stop_thread = True lock.release()
Если мы запустим эту программу, то в консоли увидим следующее:
Start infinit_worker() --> thread work --> thread work --> thread work --> thread work --> thread work Stop infinit_worker()
Разберемся с этим кодом более подробно. В строке 4 мы создаем объект класса Lock, он используется для синхронизации доступа к ресурсам из нескольких потоков, про них мы более подробно расскажем в следующей статье. В нашем случае, ресурс — это переменная stop_thread, объявленная в строке 6, которая используется как сигнал для остановки потока. После этого, в строке 8, объявляется функция infinit_worker(), ее мы запустим как поток. В ней выполняется бесконечный цикл, каждый проход которого отмечается выводом в терминал сообщения “–> thread work” и проверкой состояния переменной stop_thread. В главном потоке программы создается и запускается дочерний поток (строки 24, 25), выполняется функция задержки и принудительно завершается поток путем установки переменной stop_thread значения True.
Потоки-демоны
Есть такая разновидность потоков, которые называются демоны (терминология взята из мира Unix-подобных систем). Python-приложение не будет закрыто до тех пор, пока в нем работает хотя бы один недемонический поток.
Пример:
def func(): for i in range(5): print(f"from child thread: {i}") sleep(0.5) th = Thread(target=func) th.start() print("App stop")
Вывод программы:
from child thread: 0 App stop from child thread: 1 from child thread: 2 from child thread: 3 from child thread: 4
Как вы можете видеть, приложение продолжает работать, даже после того, как главный поток завершился (сообщение: “App stop”).
Для того, чтобы потоки не мешали остановке приложения (т.е. чтобы они останавливались вместе с завершением работы программы) необходимо при создании объекта Thread аргументу daemon присвоить значение True, либо после создания потока, перед его запуском присвоить свойству deamon значение True. Изменим процесс создания потока в приведенной выше программе:
th = Thread(target=func, daemon=True)
Запустим ее, получим следующий результат:
from child thread: 0 App stop
Поток остановился вместе с остановкой приложения.
P.S.
Вводные уроки по “Линейной алгебре на Python” вы можете найти соответствующей странице нашего сайта. Все уроки по этой теме собраны в книге “Линейная алгебра на Python”.
Если вам интересна тема анализа данных, то мы рекомендуем ознакомиться с библиотекой Pandas. Для начала вы можете познакомиться с вводными уроками. Все уроки по библиотеке Pandas собраны в книге “Pandas. Работа с данными”.
Замечательные уроки, коротко и понятно излагаете важные вещи!
Жду следующие статьи, продолжайте в том же духе!
Спасибо Вам за эти уроки! Очень полезно.