{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Потоки" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Что такое Thread (Поток)?\n", "Thread – это отдельный поток выполнения. Это означает, что в вашей программе могут работать две и более подпрограммы одновременно. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Для создания потоков мы будем использовать стандартный модуль threading." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import threading" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Пример многопоточной программы\n", "Пусть два потока параллельно выводят каждый в свой файл заданное число строк. Для начала нам понадобится функция, которая выполнит задуманный нами сценарий. Аргументами целевой функции будут число строк и имя текстового файла для записи." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "def prescript(thefile, num):\n", " with open(thefile, 'w') as f:\n", " for i in range(num):\n", " if num > 500:\n", " f.write('ABC\\n')\n", " else:\n", " f.write('abc\\n')\n", "thread1 = threading.Thread(target=prescript, args=('f1.txt', 200,))\n", "thread2 = threading.Thread(target=prescript, args=('f2.txt', 1000,))\n", "thread1.start()\n", "thread2.start()\n", "#thread1.join()\n", "#thread2.join()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Метод **start()** запускает ранее созданный поток. Метод **join()** останавливает поток, когда тот выполнит свои задачи и освобождает ресурсы." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Thread(group, target, name, args, kwargs)\n", "Здесь **group** - группа потоков (пока не используется, должен быть равен *None*), **target** - объект, который будет вызван в методе **run()**, **name** - имя потока, **args** и **kwargs** - последовательность и словарь позиционных и именованных параметров (соответственно) для вызова заданного в параметре **target** объекта. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Стандартные методы работы с потоками\n", "- current_thread() — смотрим, какой поток вызвал функцию;\n", "- active_count() — считаем работающие в данный момент экземпляры класса Thread;\n", "- enumerate() — получаем список работающих потоков.\n", "\n", "Ещё можно управлять потоком через методы класса:\n", "- is_alive() — спрашиваем поток: «Жив ещё?» — получаем true или false;\n", "- getName() — узнаём имя потока;\n", "- setName(any_name) — даём потоку имя;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "У каждого потока, пока он работает, есть уникальный идентификационный номер, который хранится в переменной **ident**." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "20812\n" ] } ], "source": [ "print(thread1.ident)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Timer\n", "Отсрочить операции в вызываемых потоком функциях можно с помощью таймера. В инициализаторе объектов класса **Timer** всего два аргумента — время ожидания в секундах и функция, которую нужно в итоге выполнить:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Waiting...\n", "The timer has done its job!\n" ] } ], "source": [ "import threading\n", "print (\"Waiting...\")\n", "def timer_test():\n", " print (\"The timer has done its job!\")\n", "tim = threading.Timer(5.0, timer_test)\n", "tim.start()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Простая блокировка в Python\n", "Взаимоисключение (mutual exception, кратко - mutex) - простейшая блокировка, которая на время работы потока с ресурсом закрывает последний от других обращений. Реализуют это с помощью класса **Lock**." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import threading\n", "mutex = threading.Lock()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Мы создали блокировку с именем mutex. Теперь её можно ставить и снимать методами **acquire()** и **release()**:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "resource = 0\n", "\n", "def thread_safe_function():\n", " global resource\n", " for i in range(1000000):\n", " mutex.acquire()\n", " # Делаем что-то с переменной resource\n", " mutex.release()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Обратите внимание: обойти простую блокировку не может даже поток, который её активировал. Он будет заблокирован, если попытается повторно захватить ресурс, который удерживает." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### С блокировками и без. Пример–сравнение\n", "Что происходит, когда два потока бьются за ресурсы, и как при этом сохранить целостность данных?\n", "\n", "Возьмём простейшие операции инкремента и декремента (увеличения и уменьшения числа). В роли общих ресурсов выступят глобальные числовые переменные: назовём их protected_resource и unprotected_resource. К каждой обратятся по два потока: один будет в цикле увеличивать значение с 0 до 500 000, другой — уменьшать до 0. Первую переменную обработаем с блокировками, а вторую — без." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "import threading\n", "\n", "protected_resource = 0\n", "unprotected_resource = 0\n", "\n", "NUM = 500000\n", "mutex = threading.Lock()" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "# Потокобезопасный инкремент\n", "def safe_plus():\n", " global protected_resource\n", " for i in range(NUM):\n", " # Ставим блокировку\n", " mutex.acquire()\n", " x = protected_resource\n", " x += 1\n", " protected_resource = x\n", " mutex.release()\n", "\n", "# Потокобезопасный декремент\n", "def safe_minus():\n", " global protected_resource\n", " for i in range(NUM):\n", " mutex.acquire()\n", " x = protected_resource\n", " x -= 1\n", " protected_resource = x\n", " mutex.release()\n", "\n", "# То же, но без блокировки\n", "def risky_plus():\n", " global unprotected_resource\n", " for i in range(NUM):\n", " x = unprotected_resource\n", " x += 1\n", " unprotected_resource = x\n", "\n", "def risky_minus():\n", " global unprotected_resource\n", " for i in range(NUM):\n", " x = unprotected_resource\n", " x -= 1\n", " unprotected_resource = x" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Создадим 4 потока, которые будут выполнять функции с блокировками и без:" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Результат при работе с блокировкой 0\n", "Результат без блокировки 0\n" ] } ], "source": [ "thread1 = threading.Thread(target = safe_plus)\n", "thread2 = threading.Thread(target = safe_minus)\n", "thread3 = threading.Thread(target = risky_plus)\n", "thread4 = threading.Thread(target = risky_minus)\n", "thread1.start()\n", "thread2.start()\n", "thread3.start()\n", "thread4.start()\n", "thread1.join()\n", "thread2.join()\n", "thread3.join()\n", "thread4.join()\n", "print (\"Результат при работе с блокировкой %s\" % protected_resource)\n", "print (\"Результат без блокировки %s\" % unprotected_resource)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Запускаем код несколько раз подряд и видим, что полученное без блокировки значение меняется случайным образом. При использовании блокировки всё работает последовательно: сначала значение растёт, затем — уменьшается, и в итоге получаем 0. А потоки thread3 и thread4 работают без блокировки и наперебой обращаются к глобальной переменной. Каждый выполняет столько операций своего цикла, сколько успевает за время активности. Поэтому при каждом запуске получаем случайные числа." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Как избежать взаимных блокировок?\n", "Следите, чтобы у нескольких блокировок не было шанса сработать одновременно. Иначе одна заглушка перекроет один поток, другая — другой, и может случиться взаимная блокировка — тупик (deadlock). Это ситуация, когда ни один поток не имеет права действовать и программа зависает или рушится.\n", "\n", "Если есть «захват» мьютекса, ничто не должно помешать последующему «высвобождению». Это значит, что release() должен срабатывать, как только блокировка становится не нужна.\n", "\n", "Пишите код так, чтобы блокировки снимались, даже если функция выбрасывает исключение и завершает работу нештатно. Подстраховаться можно с помощью конструкции try-except-finally:\n", "\n", "try:\n", " mutex.acquire()\n", " # Ваш код...\n", "\n", "except SomethingGoesWrong:\n", " # Обрабатываем исключения\n", "\n", "finally:\n", " # Ещё код\n", " mutex.release()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.9" } }, "nbformat": 4, "nbformat_minor": 4 }