multiprocessing
Модуль multiprocessing в Python предоставляет возможности для работы с многопроцессорным программированием, позволяя создавать и управлять процессами, обмениваться данными между процессами, использовать пулы процессов и другие механизмы для параллельного выполнения задач.
Некоторые ключевые функции и классы модуля multiprocessing:
- Process: Класс для создания и управления процессами. Позволяет запускать функции в новом процессе.
- Pool: Класс, предоставляющий пул процессов для выполнения задач параллельно. Позволяет управлять пулом процессов и распределять задачи между процессами.
- Queue: Класс для обмена данными между процессами через очередь, обеспечивая безопасность при работе с разделяемыми данными.
- Manager: Класс для управления разделяемыми объектами между процессами. Позволяет создавать разделяемые списки, словари, очереди и другие.
- Lock, Event, Condition, Semaphore: Классы для синхронизации процессов и предотвращения состязаний (race conditions) при доступе к общим ресурсам.
- Pipe: Механизм для обмена данными между двумя процессами через двусторонний канал.
- cpu_count(): Функция для возвращения количества доступных в системе процессоров (ядер).
Использование модуля multiprocessing позволяет эффективно использовать ресурсы многопроцессорной системы, ускорить выполнение задач и реализовать параллельное выполнение вычислений. Однако следует учитывать особенности многопроцессорного программирования, такие как управление разделяемыми данными, синхронизация процессов и предотвращение возникновения конфликтов при доступе к общим ресурсам.
Так же сразу необходимо вспомнить о популярной функции map(), в multiprocessing.Pool, которая используется для распределения задач на выполнение между процессами в пуле. Она позволяет применить указанную функцию к каждому элементу входного списка последовательно или параллельно в нескольких процессах. Затем map() ожидает завершения всех задач и возвращает список результатов в том же порядке, в котором были переданы входные данные.
Пример использования map() с процессами из пула:
from multiprocessing import Pool
# Функция, которая будет применена к каждому элементу
def square(n):
return n * n
if __name__ == '__main__':
# Создание пула из 4 процессов
with Pool(processes=4) as pool:
# Входной список значений
values = [1, 2, 3, 4, 5]
# Применение функции square к каждому элементу в списке параллельно
results = pool.map(square, values)
print(results)
В приведенном примере функция square() применяется к каждому элементу входного списка values параллельно с использованием четырех процессов в пуле. После завершения выполнения всех задач метод map() возвращает список с результатами применения функции к каждому элементу входного списка.
map() удобно использовать для обработки данных в пуле процессов, когда требуется распределить задачи на выполнение параллельно и получить результаты после завершения всех задач.
Если же мы не знаем количество доступных ядер в нашей системе, то код можно написать с использованием упомянутой выше функции cpu_count():
from multiprocessing import Pool, cpu_count
# Функция, которая будет применена к каждому элементу
def square(n):
return n * n
if __name__ == '__main__':
# Создание пула из 4 процессов
with Pool(processes=cpu_count()) as pool:
# Входной список значений
values = [1, 2, 3, 4, 5]
# Применение функции square к каждому элементу в списке параллельно
results = pool.map(square, values)
print(results)
threading
Модуль threading в Python предоставляет инструменты для работы с потоками выполнения (threads) в многопоточном программировании. Потоки позволяют выполнять несколько задач в одном процессе параллельно, что помогает улучшить отзывчивость программы и увеличить общую производительность.
Некоторые ключевые функции и классы модуля threading:
- Thread: Класс для создания и управления потоками выполнения. Позволяет запускать функции в новом потоке.
- Lock, Event, Condition, Semaphore: Классы для синхронизации потоков и предотвращения состязаний (race conditions) при доступе к общим ресурсам.
- Timer: Класс для выполнения функции через определенное время.
- Barrier: Класс для организации точек синхронизации, где потоки могут остановиться и дождаться друг друга.
- local: Класс для хранения данных в потоке-локальном хранилище, доступном только в рамках данного потока.
- enumerate(): Функция для получения списка всех активных потоков в программе.
- current_thread(): Функция для получения объекта текущего исполняющегося потока.
Использование модуля threading позволяет создавать и управлять потоками выполнения в Python, что полезно при реализации параллельных задач и улучшении обработки задач, которые могут быть выполнены независимо друг от друга. При работе с потоками важно учитывать потенциальные проблемы многопоточности, такие как состязания при доступе к общим ресурсам или возможные блокировки, и применять синхронизацию для избежания этих проблем.
Теперь для решения той же задачи используем модуль threading и создадим потоки для вызова функции square параллельно.
Вот переписанный вариант функции с использованием модуля threading:
import threading
# Функция, которая будет применена к каждому элементу
def square(n):
return n * n
if __name__ == '__main__':
# Входной список значений
values = [1, 2, 3, 4, 5]
# Список для хранения результатов
results = []
lock = threading.Lock() # Создаем объект блокировки
# Функция для выполнения в потоке
def worker(value):
result = square(value)
with lock: # Блокируем доступ к общему ресурсу results
results.append(result)
# Создание и запуск потоков для вызова функции square
threads = []
for value in values:
thread = threading.Thread(target=worker, args=(value,))
thread.start()
threads.append(thread)
# Ожидание завершения всех потоков
for thread in threads:
thread.join()
print(results)
Этот код создает потоки для вызова функции square параллельно для каждого элемента входного списка values. Но с использованием внутренней функции worker, для исключения гонки данных и синхронизации доступа к общему ресурсу (в данном случае списка results). Для этого используется блокировка (lock) из модуля threading. Каждый поток выполняет функцию square для своего элемента и добавляет результат в список results. После запуска всех потоков, программa ожидает окончания выполнения каждого потока с помощью метода join(). В итоге, результаты будут выведены на экран.
Класс threading.Thread из примера принимает следующие аргументы при создании экземпляра:
- group: Этот аргумент предназначен для совместимости со стандартом Java threading API, но он не используется в Python. Всегда должен быть None.
- target: Это функция, которая будет выполнена в отдельном потоке.
- args: Это кортеж аргументов, которые будут переданы в функцию target.
- kwargs: Это словарь аргументов ключевого слова, который будет передан в функцию target.
- daemon: Этот флаг указывает, является ли поток демоническим. Демонические потоки завершаются автоматически, когда все программы завершают работу. По умолчанию False.
Есть и другие аргументы, которые threading.Thread принимает, но они обычно не используются повседневно.
В каких случаях нужно использовать threading, а в каких multiprocessing?
Использование модуля threading и multiprocessing в Python зависит от конкретной задачи и требований приложения. Вот несколько общих рекомендаций о том, в каких случаях лучше использовать каждый из этих подходов:
Использование модуля threading:
- Потоки подходят для задач, которые блокируются часто (например, ввод-вывод операции), так как потоки разделяют один процесс и общую память.
- Потоки имеют более низкий порог создания и уничтожения, поэтому их удобно использовать для коротких и быстрых операций.
- Если задача требует обмена данными между потоками без каких-либо проблем с синхронизацией процессов
Использование модуля multiprocessing:
- Мультипроцессинг особенно полезен в тех случаях, когда у вас есть задачи, которые критически нагружают процессор и не блокируются.
- Если вам нужно обрабатывать большой объем данных параллельно и каждая задача требует отдельного процесса.
- Когда программа работает на многопроцессорной системе и вы хотите использовать все ядра процессора на максимум.
В целом, при выборе между threading и multiprocessing стоит учитывать, что потоки используют общую память и могут работать быстрее, но требуют больше внимания к синхронизации и работают в рамках одного процесса. Мультипроцессинг, в свою очередь, работает в разных процессах и предпочтительнее при выполнении вычислительно интенсивных задач или обработке больших объемов данных.
Хороший пример
Пример выше так себе пример, честно говоря. И для демонстрации преимущества обоих модулей перед последовательным выполнением программы я покажу другой. Задача: скачать информацию о 20 персонажах «Звездных войн» из онлайн базы и записать в базу данных SQLite3.
Для этого я напишу функцию, которая будет получать json-объект с характеристиками персонажа и вызывать функцию для записи информации в нашу базу. И, соответственно, функцию, которая записывает полученную информацию в нашу базу. А так же еще три функции с последовательным выполнением первой функции, с использованием пула потоков и пула процессов.
Первая функция, которая получает данные, принимает лишь один аргумент- целое число. Так как к используемому API мы всегда делаем одинаковый запрос, меняя только ID персонажа: ttps://swapi.dev/api/peple/целое_число/.
Кроме того, в написанном мной коде используется модули logging и time для замера времени выполнения каждой функции и вывода этой информации. Вы можете использовать обычные принты, если хотите. И еще в данном случае я не использую модуль threading для работы с потоками, вместо него ThreadPool из multiprocessing.pool.
Пишем импорты:
import time
import sqlite3
import logging
import requests
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ThreadPool
Настраиваем логирование:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Функция получения данных:
def get_peoples_data(number: int) -> None:
url = f"https://swapi.dev/api/people/{number}/"
response = requests.get(url)
data = response.json()
database_manage(data)
Функция записи в базу данных:
def database_manage(data: dict) -> None:
conn = sqlite3.connect('starwarscharacters.db')
cursor = conn.cursor()
cursor.execute('''SELECT name FROM sqlite_master WHERE type='table' AND name='characters' ''')
table_exists = cursor.fetchone()
if not table_exists:
cursor.execute('''CREATE TABLE characters
(name text,
height integer,
mass integer,
haircolor text,
skincolor text,
eyecolor text,
birthyear text,
gender text)''')
try:
cursor.execute('''INSERT INTO characters (name, height, mass, haircolor, skincolor, eyecolor, birthyear, gender)
VALUES (?,?,?,?,?,?,?,?)''',
(data['name'], data['height'], data['mass'], data['hair_color'], data['skin_color'],
data['eye_color'], data['birth_year'], data['gender']))
except:
pass
finally:
conn.commit()
conn.close()
Функция последовательного выполнения с замером времени и выводом лога:
def sequential_approach():
start = time.time()
input_value = [i for i in range(1, 21)]
for inp in input_value:
get_peoples_data(inp)
end = time.time()
logger.info(f'Time taken in seconds for sequential - {end - start}')
Функция с пулом процессов:
def high_load_map():
start = time.time()
input_value = [i for i in range(1, 21)]
with Pool(processes=cpu_count()) as pool:
pool.map(get_peoples_data, input_value)
end = time.time()
logger.info(f'Time taken in seconds - {end - start}')
Функция с пулом потоков:
def execution_with_threadpool():
pool = ThreadPool(processes=cpu_count() * 5)
input_value = [i for i in range(1, 21)]
start = time.time()
pool.map(get_peoples_data, input_value)
pool.close()
pool.join()
end = time.time()
logger.info(f'Time taken in seconds with threadpool - {end - start}')
Пул потоков тут создается с помощью ThreadPool(processes=cpu_count() * 5), где число потоков задается как умножение количества доступных ядер ЦП на 5. То есть если у вас есть 4 ядра CPU, то будет создан пул из 20 потоков, что должно обеспечить максимальную скорость выполнения некоторых задач. Однако, забегая вперед, скажу, что эта функция оказалась медленнее функции с пулом процессов.
Ну и стандартная финалочка с вызовом функций:
if __name__ == '__main__':
#sequential_approach()
high_load_map()
#execution_with_threadpool()
Так как каждая из этих функций по итогу выполнения приводит к записи в одну базу данных, я вызывал их поочередно, комментруя остальные.
Вот так выглядит код целиком.
import time
import sqlite3
import logging
import requests
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ThreadPool
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_peoples_data(number: int) -> None:
url = f"https://swapi.dev/api/people/{number}/"
response = requests.get(url)
data = response.json()
database_manage(data)
def database_manage(data: dict) -> None:
conn = sqlite3.connect('starwarscharacters.db')
cursor = conn.cursor()
cursor.execute('''SELECT name FROM sqlite_master WHERE type='table' AND name='characters' ''')
table_exists = cursor.fetchone()
if not table_exists:
cursor.execute('''CREATE TABLE characters
(name text,
height integer,
mass integer,
haircolor text,
skincolor text,
eyecolor text,
birthyear text,
gender text)''')
try:
cursor.execute('''INSERT INTO characters (name, height, mass, haircolor, skincolor, eyecolor, birthyear, gender)
VALUES (?,?,?,?,?,?,?,?)''',
(data['name'], data['height'], data['mass'], data['hair_color'], data['skin_color'],
data['eye_color'], data['birth_year'], data['gender']))
except:
pass
finally:
conn.commit()
conn.close()
def sequential_approach():
start = time.time()
input_value = [i for i in range(1, 21)]
for inp in input_value:
get_peoples_data(inp)
end = time.time()
logger.info(f'Time taken in seconds for sequential - {end - start}')
def high_load_map():
start = time.time()
input_value = [i for i in range(1, 21)]
with Pool(processes=cpu_count()) as pool:
pool.map(get_peoples_data, input_value)
end = time.time()
logger.info(f'Time taken in seconds - {end - start}')
def execution_with_threadpool():
pool = ThreadPool(processes=cpu_count() * 5)
input_value = [i for i in range(1, 21)]
start = time.time()
pool.map(get_peoples_data, input_value)
pool.close()
pool.join()
end = time.time()
logger.info(f'Time taken in seconds with threadpool - {end - start}')
if __name__ == '__main__':
#sequential_approach()
high_load_map()
#execution_with_threadpool()
Не буду приводить наглядные результаты исследования скорости всех функций, какая из них оказалась самой шустрой вы уже и так поняли. А вот почему, я ответить не могу, так как сам всего лишь изучаю этот вопрос. Спасибо за внимание.