Table of Contents
URL: https://www.progressiverobot.com/how-to-use-threadpoolexecutor-in-python-3-pt/
*O autor selecionou a COVID-19 Relief Fund para receber uma doação como parte do programa Write for DOnations.*
Introdução
Os _threads_ em Python são uma forma de paralelismo que permitem que seu programa execute vários procedimentos ao mesmo tempo. O paralelismo em Python também pode ser alcançado usando vários processos, mas os threads são particularmente adequados para acelerar aplicativos que envolvam quantidades significativas de E/S (entrada/saída).
Alguns exemplo de operações limitadas por E/S incluem realizar solicitações Web e ler dados de arquivos. Em contraste com as operações limitadas por E/S, as operações limitadas por CPU (como realizar operações matemáticas com a biblioteca padrão do Python) não serão tão beneficiadas com os threads em Python.
O Python 3 inclui o utilitário ThreadPoolExecutor para executar o código em um thread.
Neste tutorial, usaremos o ThreadPoolExecutor para fazer solicitações de rede de forma conveniente. Definiremos uma função adequada para a invocação dentro de threads, usaremos o ThreadPoolExecutor para executar essa função e processaremos os resultados dessas execuções.
Para este tutorial, faremos solicitações de rede para verificar a existência de páginas da Wikipédia.
Nota: o fato de as operações limitadas por E/S se beneficiarem mais dos threads do que as operações limitadas por CPU tem origem em uma idiossincrasia em Python chamada _global interpreter lock_. Saiba mais sobre o global interpreter lock do Python na documentação oficial do Python.
Pré-requisitos
Para aproveitar ao máximo este tutorial, é recomendado ter alguma familiaridade com a programação em Python e a um ambiente de programação local do Python com requests (solicitações) instaladas.
Você pode revisar estes tutoriais para as informações básicas necessárias:
- Para instalar o pacote
requestsem seu ambiente de programação local do Python, execute este comando:
pip install --user requests==2.23.0
Passo 1 — Definindo uma função para ser executada em threads
Vamos começar definindo uma função que gostaríamos de executar com a ajuda dos threads.
Usando o nano ou seu editor de texto/ambiente de desenvolvimento preferido, abra este arquivo:
nano wiki_page_function.py
Para este tutorial, vamos escrever uma função que determina se uma página da Wikipédia existe ou não:
[label wiki_page_function.py]
import requests
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
A função get_wiki_page_existence aceita dois argumentos: uma URL de uma página da Wikipédia (wiki_page_url) e um número de segundos timeout para se esperar por uma resposta dessa URL.
A get_wiki_page_existence usa o pacote requests para fazer uma solicitação Web a essa URL. Dependendo do código de status da response (resposta) HTTP, uma string que descreve se a página existe ou não é retornada. Códigos de status diferentes representam resultados diferentes de uma solicitação HTTP. Este procedimento pressupõe que um código de status 200 de "sucesso" significa que a página da Wikipédia existe e um código de status 404 "não encontrado" significa que a página da Wikipédia não existe.
Conforme descrito na seção Pré-requisitos, você precisará do pacote requests instalado para executar esta função.
Vamos tentar executar a função adicionando a url e a chamada de função após a função get_wiki_page_existence:
[label wiki_page_function.py]
. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))
Uma vez adicionado o código, salve e feche o arquivo.
Se executarmos este código:
python wiki_page_function.py
Veremos um resultado como o seguinte:
[secondary_label Output]
https://en.wikipedia.org/wiki/Ocean - exists
Chamar a função get_wiki_page_existence com uma página da Wikipédia válida retorna uma string que confirma que a página, de fato, existe.
[warning]
Aviso: em geral, não é seguro compartilhar o estado ou objetos Python entre threads sem tomar cuidados especiais para evitar erros de simultaneidade. Ao definir uma função a ser executada em um thread, é melhor definir uma função que execute uma tarefa única e não compartilhe ou publique o estado em outros threads. A get_wiki_page_existence é um exemplo de uma função como essa.
Passo 2 — Usando o ThreadPoolExecutor para executar uma função em threads
Agora que temos uma função adequada à invocação com threads, podemos usar o ThreadPoolExecutor para realizar várias invocações dessa função de maneira conveniente.
Vamos adicionar o seguinte código destacado ao seu programa em wiki_page_function.py:
[label wiki_page_function.py]
import requests
<^>import concurrent.futures<^>
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
<^>wiki_page_urls = [<^>
<^>"https://en.wikipedia.org/wiki/Ocean",<^>
<^>"https://en.wikipedia.org/wiki/Island",<^>
<^>"https://en.wikipedia.org/wiki/this_page_does_not_exist",<^>
<^>"https://en.wikipedia.org/wiki/Shark",<^>
<^>]<^>
<^>with concurrent.futures.ThreadPoolExecutor() as executor:<^>
<^>futures = []<^>
<^>for url in wiki_page_urls:<^>
<^>futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))<^>
<^>for future in concurrent.futures.as_completed(futures):<^>
<^>print(future.result())<^>
Vamos dar uma olhada em como esse código funciona:
- O
concurrent.futuresé importado para nos dar acesso aoThreadPoolExecutor.
- A declaração
withé usada para criar umexecutorde instância doThreadPoolExecutorque irá esvaziar os threads imediatamente após a conclusão.
- Quatro tarefas são
submitted(submetidas) aoexecutor: uma para cada uma das URLs na listawiki_page_urls.
- Cada chamada a
submitretorna uma instânciaFutureque está armazenada na listafutures.
- A função
as_completedespera cada chamadaget_wiki_page_existenceFutureser concluída para podermos imprimir seu resultado.
Se executarmos esse programa novamente com o seguinte comando:
python wiki_page_function.py
Veremos um resultado como o seguinte:
[secondary_label Output]
https://en.wikipedia.org/wiki/Island - exists
https://en.wikipedia.org/wiki/Ocean - exists
https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
https://en.wikipedia.org/wiki/Shark - exists
Esse resultado faz sentido: 3 das URLs são páginas válidas da Wikipédia, e uma delas, a this_page_does_not_exist, não é. Observe que seu resultado pode estar ordenado de maneira diferente do que este. A função concurrent.futures.as_completed nesse exemplo retorna resultados assim que eles estiverem disponíveis, independentemente da ordem em que as tarefas foram enviadas.
Passo 3 — Processando exceções de execuções de funções em threads
No passo anterior, get_wiki_page_existence retornou com sucesso um valor para todas as nossas invocações. Neste passo, veremos que o ThreadPoolExecutor também pode apurar exceções geradas em invocações de função em threads.
Vamos considerar o seguinte bloco de código de exemplo:
[label wiki_page_function.py]
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
]
<^>with concurrent.futures.ThreadPoolExecutor() as executor:<^>
<^>futures = []<^>
<^>for url in wiki_page_urls:<^>
<^>futures.append(<^>
<^>executor.submit(<^>
<^>get_wiki_page_existence, wiki_page_url=url, timeout=0.00001<^>
<^>)<^>
<^>)<^>
<^>for future in concurrent.futures.as_completed(futures):<^>
<^>try:<^>
<^>print(future.result())<^>
<^>except requests.ConnectTimeout:<^>
<^>print("ConnectTimeout.")<^>
Este bloco de código é quase idêntico ao que usamos no Passo 2, mas possui duas diferenças chave:
- Agora, passamos
timeout=0.001paraget_wiki_page_existence. Como o pacoterequestsnão será capaz de completar sua solicitação Web à Wikipédia em0.00001segundos, ele criará uma exceçãoConnectTimeout.
- Nós capturamos exceções
ConnectTimeoutgeradas pelofuture.result()e imprimimos uma string cada vez que fazemos isso.
Se executarmos o programa novamente, veremos o seguinte resultado:
[secondary_label Output]
ConnectTimeout.
ConnectTimeout.
ConnectTimeout.
ConnectTimeout.
Quatro mensagens ConnectTimeout são impressas — uma para cada uma de nossas quatro wiki_page_urls, uma vez que nenhuma delas pôde ser concluída em 0.00001 segundos e cada uma das quatro chamadas get_wiki_page_existence gerou a exceção ConnectTimeout.
Agora, você viu que se uma chamada de função submetida a um ThreadPoolExecutor gera uma exceção, então essa exceção pode ser apurada normalmente chamando o Future.result. Chamar o Future.result em todas as suas invocações enviadas garante que seu programa não perca nenhuma exceção gerada em sua função em threads.
Passo 4 — Comparando o tempo de execução com e sem threads
Agora, vamos verificar se usar o ThreadPoolExecutor realmente torna seu programa mais rápido.
Primeiro, vamos cronometrar o get_wiki_page_existence se executarmos ele sem threads:
[label wiki_page_function.py]
<^>import time<^>
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
<^>wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]<^>
<^>print("Running without threads:")<^>
<^>without_threads_start = time.time()<^>
<^>for url in wiki_page_urls:<^>
<^>print(get_wiki_page_existence(wiki_page_url=url))<^>
<^>print("Without threads time:", time.time() - without_threads_start)<^>
Nesse exemplo de código, chamamos nossa função get_wiki_page_existence com cinquenta URLs de páginas diferentes da Wikipedia uma a uma. Usamos a função time.time() para imprimir o número de segundos que nosso programa leva para ser executado.
Se executarmos esse código novamente como antes, veremos um resultado como o seguinte:
[secondary_label Output]
Running without threads:
https://en.wikipedia.org/wiki/0 - exists
https://en.wikipedia.org/wiki/1 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Without threads time: 5.803015232086182
As entradas 2-47 nesse resultado foram omitidas para maior concisão.
O número de segundos impressos depois de Without threads time será diferente quando você executar o código em sua máquina – não tem problema, você só está recebendo um número que servirá como base para se comparar com uma solução que usa o ThreadPoolExecutor. Neste caso, foram ~5.803 segundos.
Vamos executar as mesmas cinquenta URLs da Wikipedia através do get_wiki_page_existence, mas desta vez usando o ThreadPoolExecutor:
[label wiki_page_function.py]
import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
<^>print("Running threaded:")<^>
<^>threaded_start = time.time()<^>
<^>with concurrent.futures.ThreadPoolExecutor() as executor:<^>
<^>futures = []<^>
<^>for url in wiki_page_urls:<^>
<^>futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))<^>
<^>for future in concurrent.futures.as_completed(futures):<^>
<^>print(future.result())<^>
<^>print("Threaded time:", time.time() - threaded_start)<^>
O código é o mesmo que criamos no Passo 2, apenas com a adição de algumas declarações de impressão que nos mostram o número de segundos que o nosso código leva para ser executado.
Se executarmos o programa novamente, veremos o seguinte:
[secondary_label Output]
Running threaded:
https://en.wikipedia.org/wiki/1 - exists
https://en.wikipedia.org/wiki/0 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Threaded time: 1.2201685905456543
Novamente, o número de segundos impressos após Threaded time será diferente em seu computador (assim como a ordem do seu resultado).
Agora, compare o tempo de execução para obter as cinquenta URLs de páginas da Wikipédia com e sem threads.
Na máquina usada neste tutorial, o processo sem threads levou ~5.803 segundos e com threads levou ~1.220 segundos. Nosso programa foi executado de maneira significativamente mais rápida com threads.
Conclusão
Neste tutorial, você aprendeu como usar o utilitário ThreadPoolExecutor em Python 3 para executar eficientemente códigos limitados por E/S. Você criou uma função adequada à invocação dentro de threads, aprendeu como recuperar tanto o resultado quanto as exceções de execuções em threads dessa função e observou o ganho de desempenho obtido usando threads.
A partir daqui, você pode aprender mais sobre outras funções de simultaneidade oferecidas pelo módulo concurrent.futures.