Python. Урок 23. Потоки и процессы в Python. Часть 2. Синхронизация потоков

Для синхронизации доступа к ресурсам из нескольких потоков Python предоставляет набор объектов, каждый из которых обладает рядом особенностей, делающих их пригодными для решения некоторой группы специфических задач. В этом уроке будут рассмотрены: Lock– и RLock-объекты, условные переменные (Condition), семафоры (Semaphore), события (Event), таймеры (Timer) и барьеры (Barrier).

Синхронизация потоков

Начнем наш обзор с наиболее простого и, в то же время, общего по своим свойствам Lock-объекта.

Lock-объект

Lock-объект может находится в двух состояниях: захваченное (заблокированное) и не захваченное (не заблокированное, свободное). После создания он находится в свободном состоянии. Для работы с Lock-объектом используются методы acquire() и release().  Если Lock свободен, то вызов метода acquire() переводит его в заблокированное состояние. Повторный вызов acquire() приведет к блокировке инициировавшего это действие потока до тех пор, пока Lock не будет разблокирован каким-то другим потоком с помощью метода release(). Вывоз метода release() на свободном Lock-объекте приведет к выбросу исключения RuntimeError.

Метод acquire() имеет следующую сигнатуру:

acquire(blocking=True, timeout=-1)

Параметры метода:

  • blocking
    • Если параметр равен True, то при вызове на захваченном Lock-объекте выполнение потока остановится, после того как захват будет произведет, метод вернет True. Если параметр равен False, то при вызове на захваченном Lock-объекте поток не будет заблокирован и метод вернет False, если захват будет произведет, то вернет True.
  • timeout
    • Задает время, в течении которого поток будет находиться в заблокированном состоянии при попытке захватить уже занятый Lock-объект. Если в течении заданного времени поток не освободится, то метод вернет значение False.

При успешном захвате Lock-объекта, метод acquire() возвращает значение True.

У Lock-объекта также есть метод locked(), который возвращает True если объект захвачен, False в противном случае.

Мы уже успели познакомиться с Lock-объектом, когда изучали вопрос принудительного завершения работы потока в “Уроке 22. Процессы и потоки в Python. Часть 1”.

Освободить Lock-объект может любой поток (на обязательно тот, который вызвал acquire()).

Хорошей практикой при работе с Lock-объектами является помещение кода работы с разделяемым ресурсом в блоке try, а освобождать блокировку следует в finally:

lock_obj.acquire()
try:
    # Работа с разделяемым ресурсом
finally:
    lock_obj.release()

Lock-объекты поддерживают протокол менеджера контекста (см. “Урок 21. Работа с контекстным менеджером), это позволяет работать с ними через оператор with. Приведенный выше код с try…finally эквивалентен следующему:

with lock_obj:
    # Работа с разделяемым ресурсом

RLock-объект

В отличии от рассмотренного выше Lock-объекта RLock может освободить только тот поток, который его захватил. Повторный захват потоком уже захваченного RLock-объекта не блокирует его. RLock-объекты поддерживают возможность вложенного захвата, при этом освобождение происходит только после того, как был выполнен release() для внешнего acquire(). Сигнатуры и назначение методов release() и acquire() RLock-объектов совпадают с приведенными для Lock, но в отличии от него у RLock нет метода locked(). RLock-объекты поддерживают протокол менеджера контекста.

Условные переменные (threading.Condition)

Основное назначение условных переменных – это синхронизация работы потоков, которая предполагает ожидание готовности некоторого ресурса и оповещение об этом событии. Наиболее явно такой тип работы выражен в паттерне Producer-Consumer (Производитель – Потребитель). Условные переменные для организации работы внутри себя используют Lock- или RLock-объекты, захватом и освобождением которых управлять не придется, хотя и возможно, если возникнет такая необходимость.

Порядок работы с условными переменными выглядит так:

На стороне Consumer’а: проверить доступен ли ресурс, если нет, то перейти в режим ожидания с помощью метода wait(), и ожидать оповещение от Producer’а о том, что ресурс готов и с ним можно работать. Метод wait() может быть вызван с таймаутом, по истечении которого поток выйдет из состояния блокировки и продолжит работу.

На стороне Producer’а: произвести работы по подготовке ресурса, после того, как ресурс готов оповестить об этом ожидающие потоки с помощью методов notify() или notify_all(). Разница между ними в том, что notify() разблокирует только один поток (если он вызван без параметров), а notify_all() все потоки, которые находятся в режиме ожидания.

Ниже представлен пример работы с условной переменной.

from threading import Condition, Thread
from queue import Queue
from time import sleep


cv = Condition()
q = Queue()


# Consumer function for order processing
def order_processor(name):
   while True:
       with cv:
           # Wait while queue is empty
           while q.empty():
               cv.wait()

           try:
               # Get data (order) from queue
               order = q.get_nowait()
               print(f"{name}: {order}")

               # If get "stop" message then stop thread
               if order == "stop":                   
                   break

           except:
               pass

           sleep(0.1)


# Run order processors
Thread(target=order_processor, args=("thread 1",)).start()
Thread(target=order_processor, args=("thread 2",)).start()
Thread(target=order_processor, args=("thread 3",)).start()

# Put data into queue
for i in range(10):
   q.put(f"order {i}")

# Put stop-commands for consumers
for _ in range(3):
   q.put("stop")

# Notify all consumers
with cv:
   cv.notify_all()

В этом примере мы создаем функцию order_processor, которая может реализовывать в себе бизнес логику, например, обработку заказа. При этом, если она получает сообщение stop, то прекращает свое выполнение. В главном потоке мы создаем и запускаем три потока для обработки заказов. Запущенные потоки видят, что очередь пуста и “встают на блокировку” при вызове wait(). В главном потоке в очередь добавляются десять заказов и сообщения для остановки обработчиков, после этого вызывается метод notify_all() для оповещения всех заблокированных потоков о том, что данные для обработки есть в очереди.

При создании объекта Condition вы можете передать в конструктор объект Lock или RLock, с которым хотите работать. Перечислим методы объекта Condition с кратким описанием:

  • acquire(*args)
    • Захват объекта-блокировки.
  • release()
    • Освобождение объекта-блокировки.
  • wait(timeout=None)
    • Блокировка выполнения потока до оповещения о снятии блокировки. Через параметр timeout можно задать время ожидания оповещения о снятии блокировки. Если вызвать wait() на Условной переменной, у которой предварительно не был вызван acquire(), то будет выброшено исключение RuntimeError.
  • wait_for(predicate, timeout=None)
    • Метод позволяет сократить количество кода, которое нужно написать для контроля готовности ресурса и ожидания оповещения. Он заменяет собой следующую конструкцию:
while not predicate():
    cv.wait()
  • notify(n=1)
    • Снимает блокировку с остановленного методом wait() потока. Если необходимо разблокировать несколько потоков, то для этого следует передать их количество через аргумент n.
  • notify_all()
    • Снимает блокировку со всех остановленных методом wait() потоков.

Семафоры (threading.Semaphore)

Реализация классического семафора, предложенного Дейкстрой. Суть его идеи заключается в том, при каждом вызове метода acquire() происходит уменьшение счетчика семафора на единицу, а при вызове release() – увеличение. Значение счетчика не может быть меньше нуля, если на момент вызова acquire() его значение равно нулю, то происходит блокировка потока до тех пор, пока не будет вызван release().

Семафоры поддерживают протокол менеджера контекста.

Для работы с семафорами в Python есть класс Semaphore, при создании его объекта можно указать начальное значение счетчика через параметр value. Semaphore предоставляет два метода:

  • acquire(blocking=True, timeout=None)
    • Если значение внутреннего счетчика больше нуля, то счетчик уменьшается на единицу и метод возвращает True. Если значение счетчика равно нулю, то вызвавший данный метод поток блокируется, до тех пор, пока не будет кем-то вызван метод release(). Дополнительно при вызове метода можно указать параметры blocking и timeout, их назначение совпадает с acquire() для Lock.
  • release()
    • Увеличивает значение внутреннего счетчика на единицу.

Существует ещё один класс, реализующий алгоритм семафора BoundedSemaphore, в отличии от Semaphore, он проверяет, чтобы значение внутреннего счетчика было не больше того, что передано при создании объекта через аргумент value, если это происходит, то выбрасывается исключение ValueError.

С помощью семафоров удобно управлять доступом к ресурсу, который имеет ограничение на количество одновременных обращений к нему (например, количество подключений к базе данных и т.п.)

В качестве примера приведем программу, моделирующую продажу билетов: обслуживание одного клиента занимает одну секунду, касс всего три, клиентов пять. 

from threading import Thread, BoundedSemaphore
from time import sleep, time


ticket_office = BoundedSemaphore(value=3)


def ticket_buyer(number):
   start_service = time()
   with ticket_office:       
       sleep(1)       
       print(f"client {number}, service time: {time() - start_service}")


buyer = [Thread(target=ticket_buyer, args=(i,)) for i in range(5)]

for b in buyer:
   b.start()

Вывод программы:

client 0, service time: 1.0011110305786133
client 2, service time: 1.0013604164123535
client 1, service time: 1.001556158065796
client 3, service time: 2.002437114715576
client 4, service time: 2.0027763843536377

Как вы можете видеть, вначале обслуживание получили клиенты с номерами 0, 1, 2 и только после того, как кассы по продаже билетов освободились, были обслужены клиенты 3 и 4.

События (threading.Event)

События по своему назначению и алгоритму работы похожи на рассмотренные ранее условные переменные. Основная задача, которую они решают – это взаимодействие между потоками через механизм оповещения. Объект класса Event управляет внутренним флагом, который сбрасывается с помощью метода clear() и устанавливается методом set(). Потоки, которые используют объект Event для синхронизации блокируются при вызове метода wait(), если флаг сброшен.

Методы класса Event:

  • is_set()
    • Возвращает True если флаг находится в взведенном состоянии.
  • set()
    • Переводит флаг в взведенное состояние.
  • clear()
    • Переводит флаг в сброшенное состояние.
  • wait(timeout=None)
    • Блокирует вызвавший данный метод поток если флаг соответствующего Event-объекта находится в сброшенном состоянии. Время нахождения в состоянии блокировки можно задать через параметр timeout.

Пример работы с Event-объектом:

from threading import Thread, Event
from time import sleep, time


event = Event()


def worker(name: str):   
   event.wait()
   print(f"Worker: {name}")


# Clear event
event.clear()

# Create and start workers
workers = [Thread(target=worker, args=(f"wrk {i}",)) for i in range(5)]
for w in workers:
   w.start()

print("Main thread")

event.set()

Содержимое консоли после вызова приведенной Python-программы:

Main thread
Worker: wrk 1
Worker: wrk 2
Worker: wrk 3
Worker: wrk 4
Worker: wrk 0

Порядок пробуждения потоков при использовании объекта Event никак не регламентируется, поэтому вы можете видеть, что поток с номером 0 закончил работу последним, хотя был запущен первым.

Таймеры (threading.Timer)

Модуль threading предоставляет удобный инструмент для запуска задач по таймеру – класс Timer. При создании таймера указывается функция, которая будет выполнена, когда он сработает. Timer реализован как поток, является наследником от Thread, поэтому для его запуска необходимо вызвать start(), если необходимо остановить работу таймера, то вызовите cancel().

Конструктор класса Timer:

Timer(interval, function, args=None, kwargs=None)

Параметры:

  • interval
    • Количество секунд, по истечении которых будет вызвана функция function.
  • function
    • Функция, вызов которой нужно осуществить по таймеру.
  • args, kwargs
    • Аргументы функции function.

Методы класса Timer:

  • cancel()
    • Останавливает выполнение таймера

Пример работы с таймером:

from threading import Timer
from time import sleep, time

timer = Timer(interval=3,function=lambda: print("Message from Timer!"))
timer.start()

Барьеры (threading.Barrier)

Последний инструмент для синхронизации работы потоков, который мы рассмотрим является Barrier. Он позволяет реализовать алгоритм, когда необходимо дождаться завершения работы группы потоков, прежде чем продолжить выполнение задачи.

Конструктор класса:

Barrier(parties, action=None, timeout=None)

Параметры:

  • parties
    • Количество потоков, которые будут работать в рамках барьера.
  • action
    • Определяет функцию, которая будет вызвана, когда потоки будут освобождены (достигнут барьера).
  • timeout
    • Таймаут, который будет использовать как значение по умолчанию для методов wait().

Свойства и методы класса:

  • wait(timeout=None)
    • Блокирует работу потока до тех пор, пока не будет получено уведомление либо не пройдет время указанное в timeout.
  • reset()
    • Переводит Barrier в исходное (пустое) состояние. Потокам, ожидающим уведомления, будет передано исключение BrokenBarrierError.
  • abort()
    • Останавливает работу барьера, переводит его в состояние “разрушен” (broken). Все текущие и последующие вызовы метода wait() будут завершены с ошибкой с выбросом исключения BrokenBarrierError.
  • parties
    • Количество потоков, которое нужно для достижения барьера.
  • n_waiting
    • Количество потоков, которое ожидает срабатывания барьера.
  • broken
    • Значение флага равное True указывает на то, что барьер находится в “разрушенном” состоянии.

Пример работы с классом Barrier:

from threading import Barrier, Thread
from time import sleep, time


br = Barrier(3)
store = []


def f1(x):
   print("Calc part1")
   store.append(x**2)
   sleep(0.5)
   br.wait()


def f2(x):
   print("Calc part2")
   store.append(x*2)
   sleep(1)
   br.wait()


Thread(target=f1, args=(3,)).start()
Thread(target=f2, args=(7,)).start()

br.wait()

print("Result: ", sum(store))

Результат работы программы:

Calc part1
Calc part2
Result:  23

Данный объект синхронизации может применяться в случаях когда необходимо дождаться результатов работы всех потоков (как в примере выше), либо для синхронизации процесса инициализации потоков, когда перед стартом их работы требуется, чтобы все потоки выполнили процедуру инициализации.

P.S.

Вводные уроки по “Линейной алгебре на Python” вы можете найти соответствующей странице нашего сайта. Все уроки по этой теме собраны в книге “Линейная алгебра на Python”.
Книга: Линейная алгебра на Python
Если вам интересна тема анализа данных, то мы рекомендуем ознакомиться с библиотекой Pandas.  Для начала вы можете познакомиться с вводными уроками. Все уроки по библиотеке Pandas собраны в книге Pandas. Работа с данными”.
Книга: Pandas. Работа с данными

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *