URL: https://www.progressiverobot.com/how-to-use-threadpoolexecutor-in-python-3-pt/

*O autor selecionou a COVID-19 Relief Fund​​​​​ para receber uma doação como parte do programa Write for DOnations.*

Introdução

Os _threads_ em Python são uma forma de paralelismo que permitem que seu programa execute vários procedimentos ao mesmo tempo. O paralelismo em Python também pode ser alcançado usando vários processos, mas os threads são particularmente adequados para acelerar aplicativos que envolvam quantidades significativas de E/S (entrada/saída).

Alguns exemplo de operações limitadas por E/S incluem realizar solicitações Web e ler dados de arquivos. Em contraste com as operações limitadas por E/S, as operações limitadas por CPU (como realizar operações matemáticas com a biblioteca padrão do Python) não serão tão beneficiadas com os threads em Python.

O Python 3 inclui o utilitário ThreadPoolExecutor para executar o código em um thread.

Neste tutorial, usaremos o ThreadPoolExecutor para fazer solicitações de rede de forma conveniente. Definiremos uma função adequada para a invocação dentro de threads, usaremos o ThreadPoolExecutor para executar essa função e processaremos os resultados dessas execuções.

Para este tutorial, faremos solicitações de rede para verificar a existência de páginas da Wikipédia.

Nota: o fato de as operações limitadas por E/S se beneficiarem mais dos threads do que as operações limitadas por CPU tem origem em uma idiossincrasia em Python chamada _global interpreter lock_. Saiba mais sobre o global interpreter lock do Python na documentação oficial do Python.

Pré-requisitos

python illustration for: Pré-requisitos

Para aproveitar ao máximo este tutorial, é recomendado ter alguma familiaridade com a programação em Python e a um ambiente de programação local do Python com requests (solicitações) instaladas.

Você pode revisar estes tutoriais para as informações básicas necessárias:

				
					
pip install --user requests==2.23.0

				
			

Passo 1 — Definindo uma função para ser executada em threads

Vamos começar definindo uma função que gostaríamos de executar com a ajuda dos threads.

Usando o nano ou seu editor de texto/ambiente de desenvolvimento preferido, abra este arquivo:

				
					
nano wiki_page_function.py

				
			

Para este tutorial, vamos escrever uma função que determina se uma página da Wikipédia existe ou não:

				
					
[label wiki_page_function.py]

import requests



def get_wiki_page_existence(wiki_page_url, timeout=10):

 response = requests.get(url=wiki_page_url, timeout=timeout)



 page_status = "unknown"

 if response.status_code == 200:

 page_status = "exists"

 elif response.status_code == 404:

 page_status = "does not exist"



 return wiki_page_url + " - " + page_status

				
			

A função get_wiki_page_existence aceita dois argumentos: uma URL de uma página da Wikipédia (wiki_page_url) e um número de segundos timeout para se esperar por uma resposta dessa URL.

A get_wiki_page_existence usa o pacote requests para fazer uma solicitação Web a essa URL. Dependendo do código de status da response (resposta) HTTP, uma string que descreve se a página existe ou não é retornada. Códigos de status diferentes representam resultados diferentes de uma solicitação HTTP. Este procedimento pressupõe que um código de status 200 de "sucesso" significa que a página da Wikipédia existe e um código de status 404 "não encontrado" significa que a página da Wikipédia não existe.

Conforme descrito na seção Pré-requisitos, você precisará do pacote requests instalado para executar esta função.

Vamos tentar executar a função adicionando a url e a chamada de função após a função get_wiki_page_existence:

				
					
[label wiki_page_function.py]

. . .

url = "https://en.wikipedia.org/wiki/Ocean"

print(get_wiki_page_existence(wiki_page_url=url))

				
			

Uma vez adicionado o código, salve e feche o arquivo.

Se executarmos este código:

				
					
python wiki_page_function.py

				
			

Veremos um resultado como o seguinte:

				
					
[secondary_label Output]

https://en.wikipedia.org/wiki/Ocean - exists

				
			

Chamar a função get_wiki_page_existence com uma página da Wikipédia válida retorna uma string que confirma que a página, de fato, existe.

[warning]

Aviso: em geral, não é seguro compartilhar o estado ou objetos Python entre threads sem tomar cuidados especiais para evitar erros de simultaneidade. Ao definir uma função a ser executada em um thread, é melhor definir uma função que execute uma tarefa única e não compartilhe ou publique o estado em outros threads. A get_wiki_page_existence é um exemplo de uma função como essa.

Passo 2 — Usando o ThreadPoolExecutor para executar uma função em threads

Agora que temos uma função adequada à invocação com threads, podemos usar o ThreadPoolExecutor para realizar várias invocações dessa função de maneira conveniente.

Vamos adicionar o seguinte código destacado ao seu programa em wiki_page_function.py:

				
					
[label wiki_page_function.py]

import requests

<^>import concurrent.futures<^>



def get_wiki_page_existence(wiki_page_url, timeout=10):

 response = requests.get(url=wiki_page_url, timeout=timeout)



 page_status = "unknown"

 if response.status_code == 200:

 page_status = "exists"

 elif response.status_code == 404:

 page_status = "does not exist"



 return wiki_page_url + " - " + page_status



<^>wiki_page_urls = [<^>

 <^>"https://en.wikipedia.org/wiki/Ocean",<^>

 <^>"https://en.wikipedia.org/wiki/Island",<^>

 <^>"https://en.wikipedia.org/wiki/this_page_does_not_exist",<^>

 <^>"https://en.wikipedia.org/wiki/Shark",<^>

<^>]<^>

<^>with concurrent.futures.ThreadPoolExecutor() as executor:<^>

 <^>futures = []<^>

 <^>for url in wiki_page_urls:<^>

 <^>futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))<^>

 <^>for future in concurrent.futures.as_completed(futures):<^>

 <^>print(future.result())<^>

				
			

Vamos dar uma olhada em como esse código funciona:

  • O concurrent.futures é importado para nos dar acesso ao ThreadPoolExecutor.
  • A declaração with é usada para criar um executor de instância do ThreadPoolExecutor que irá esvaziar os threads imediatamente após a conclusão.
  • Quatro tarefas são submitted (submetidas) ao executor: uma para cada uma das URLs na lista wiki_page_urls.
  • Cada chamada a submit retorna uma instância Future que está armazenada na lista futures.
  • A função as_completed espera cada chamada get_wiki_page_existence Future ser concluída para podermos imprimir seu resultado.

Se executarmos esse programa novamente com o seguinte comando:

				
					
python wiki_page_function.py

				
			

Veremos um resultado como o seguinte:

				
					
[secondary_label Output]

https://en.wikipedia.org/wiki/Island - exists

https://en.wikipedia.org/wiki/Ocean - exists

https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist

https://en.wikipedia.org/wiki/Shark - exists

				
			

Esse resultado faz sentido: 3 das URLs são páginas válidas da Wikipédia, e uma delas, a this_page_does_not_exist, não é. Observe que seu resultado pode estar ordenado de maneira diferente do que este. A função concurrent.futures.as_completed nesse exemplo retorna resultados assim que eles estiverem disponíveis, independentemente da ordem em que as tarefas foram enviadas.

Passo 3 — Processando exceções de execuções de funções em threads

No passo anterior, get_wiki_page_existence retornou com sucesso um valor para todas as nossas invocações. Neste passo, veremos que o ThreadPoolExecutor também pode apurar exceções geradas em invocações de função em threads.

Vamos considerar o seguinte bloco de código de exemplo:

				
					
[label wiki_page_function.py]

import requests

import concurrent.futures





def get_wiki_page_existence(wiki_page_url, timeout=10):

 response = requests.get(url=wiki_page_url, timeout=timeout)



 page_status = "unknown"

 if response.status_code == 200:

 page_status = "exists"

 elif response.status_code == 404:

 page_status = "does not exist"



 return wiki_page_url + " - " + page_status





wiki_page_urls = [

 "https://en.wikipedia.org/wiki/Ocean",

 "https://en.wikipedia.org/wiki/Island",

 "https://en.wikipedia.org/wiki/this_page_does_not_exist",

 "https://en.wikipedia.org/wiki/Shark",

]

<^>with concurrent.futures.ThreadPoolExecutor() as executor:<^>

 <^>futures = []<^>

 <^>for url in wiki_page_urls:<^>

 <^>futures.append(<^>

 <^>executor.submit(<^>

 <^>get_wiki_page_existence, wiki_page_url=url, timeout=0.00001<^>

 <^>)<^>

 <^>)<^>

 <^>for future in concurrent.futures.as_completed(futures):<^>

 <^>try:<^>

 <^>print(future.result())<^>

 <^>except requests.ConnectTimeout:<^>

 <^>print("ConnectTimeout.")<^>

				
			

Este bloco de código é quase idêntico ao que usamos no Passo 2, mas possui duas diferenças chave:

  • Agora, passamos timeout=0.001 para get_wiki_page_existence. Como o pacote requests não será capaz de completar sua solicitação Web à Wikipédia em 0.00001 segundos, ele criará uma exceção ConnectTimeout.
  • Nós capturamos exceções ConnectTimeout geradas pelo future.result() e imprimimos uma string cada vez que fazemos isso.

Se executarmos o programa novamente, veremos o seguinte resultado:

				
					
[secondary_label Output]

ConnectTimeout.

ConnectTimeout.

ConnectTimeout.

ConnectTimeout.

				
			

Quatro mensagens ConnectTimeout são impressas — uma para cada uma de nossas quatro wiki_page_urls, uma vez que nenhuma delas pôde ser concluída em 0.00001 segundos e cada uma das quatro chamadas get_wiki_page_existence gerou a exceção ConnectTimeout.

Agora, você viu que se uma chamada de função submetida a um ThreadPoolExecutor gera uma exceção, então essa exceção pode ser apurada normalmente chamando o Future.result. Chamar o Future.result em todas as suas invocações enviadas garante que seu programa não perca nenhuma exceção gerada em sua função em threads.

Passo 4 — Comparando o tempo de execução com e sem threads

Agora, vamos verificar se usar o ThreadPoolExecutor realmente torna seu programa mais rápido.

Primeiro, vamos cronometrar o get_wiki_page_existence se executarmos ele sem threads:

				
					
[label wiki_page_function.py]

<^>import time<^>

import requests

import concurrent.futures





def get_wiki_page_existence(wiki_page_url, timeout=10):

 response = requests.get(url=wiki_page_url, timeout=timeout)



 page_status = "unknown"

 if response.status_code == 200:

 page_status = "exists"

 elif response.status_code == 404:

 page_status = "does not exist"



 return wiki_page_url + " - " + page_status



<^>wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]<^>



<^>print("Running without threads:")<^>

<^>without_threads_start = time.time()<^>

<^>for url in wiki_page_urls:<^>

 <^>print(get_wiki_page_existence(wiki_page_url=url))<^>

<^>print("Without threads time:", time.time() - without_threads_start)<^>

				
			

Nesse exemplo de código, chamamos nossa função get_wiki_page_existence com cinquenta URLs de páginas diferentes da Wikipedia uma a uma. Usamos a função time.time() para imprimir o número de segundos que nosso programa leva para ser executado.

Se executarmos esse código novamente como antes, veremos um resultado como o seguinte:

				
					
[secondary_label Output]

Running without threads:

https://en.wikipedia.org/wiki/0 - exists

https://en.wikipedia.org/wiki/1 - exists

. . .

https://en.wikipedia.org/wiki/48 - exists

https://en.wikipedia.org/wiki/49 - exists

Without threads time: 5.803015232086182

				
			

As entradas 2-47 nesse resultado foram omitidas para maior concisão.

O número de segundos impressos depois de Without threads time será diferente quando você executar o código em sua máquina – não tem problema, você só está recebendo um número que servirá como base para se comparar com uma solução que usa o ThreadPoolExecutor. Neste caso, foram ~5.803 segundos.

Vamos executar as mesmas cinquenta URLs da Wikipedia através do get_wiki_page_existence, mas desta vez usando o ThreadPoolExecutor:

				
					
[label wiki_page_function.py]

import time

import requests

import concurrent.futures





def get_wiki_page_existence(wiki_page_url, timeout=10):

 response = requests.get(url=wiki_page_url, timeout=timeout)



 page_status = "unknown"

 if response.status_code == 200:

 page_status = "exists"

 elif response.status_code == 404:

 page_status = "does not exist"



 return wiki_page_url + " - " + page_status

wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]



<^>print("Running threaded:")<^>

<^>threaded_start = time.time()<^>

<^>with concurrent.futures.ThreadPoolExecutor() as executor:<^>

 <^>futures = []<^>

 <^>for url in wiki_page_urls:<^>

 <^>futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))<^>

 <^>for future in concurrent.futures.as_completed(futures):<^>

 <^>print(future.result())<^>

<^>print("Threaded time:", time.time() - threaded_start)<^>

				
			

O código é o mesmo que criamos no Passo 2, apenas com a adição de algumas declarações de impressão que nos mostram o número de segundos que o nosso código leva para ser executado.

Se executarmos o programa novamente, veremos o seguinte:

				
					
[secondary_label Output]

Running threaded:

https://en.wikipedia.org/wiki/1 - exists

https://en.wikipedia.org/wiki/0 - exists

. . .

https://en.wikipedia.org/wiki/48 - exists

https://en.wikipedia.org/wiki/49 - exists

Threaded time: 1.2201685905456543

				
			

Novamente, o número de segundos impressos após Threaded time será diferente em seu computador (assim como a ordem do seu resultado).

Agora, compare o tempo de execução para obter as cinquenta URLs de páginas da Wikipédia com e sem threads.

Na máquina usada neste tutorial, o processo sem threads levou ~5.803 segundos e com threads levou ~1.220 segundos. Nosso programa foi executado de maneira significativamente mais rápida com threads.

Conclusão

Neste tutorial, você aprendeu como usar o utilitário ThreadPoolExecutor em Python 3 para executar eficientemente códigos limitados por E/S. Você criou uma função adequada à invocação dentro de threads, aprendeu como recuperar tanto o resultado quanto as exceções de execuções em threads dessa função e observou o ganho de desempenho obtido usando threads.

A partir daqui, você pode aprender mais sobre outras funções de simultaneidade oferecidas pelo módulo concurrent.futures.