Pertanyaan Bagaimana cara menggunakan threading dengan Python?


Saya mencoba memahami threading dengan Python. Saya telah melihat dokumentasi dan contoh, tetapi sejujurnya, banyak contoh yang terlalu canggih dan saya kesulitan memahami mereka.

Bagaimana Anda dengan jelas menunjukkan tugas dibagi untuk multi-threading?


928
2018-05-17 04:24


asal


Jawaban:


Karena pertanyaan ini ditanyakan pada tahun 2010, telah ada penyederhanaan nyata dalam bagaimana melakukan multithreading sederhana dengan python dengan peta dan kolam.

Kode di bawah ini berasal dari postingan artikel / blog yang harus Anda periksa (tidak ada afiliasi) - Paralelisme dalam satu baris: Model yang Lebih Baik untuk Tugas Hari ke Hari. Saya akan meringkas di bawah - akhirnya hanya menjadi beberapa baris kode:

from multiprocessing.dummy import Pool as ThreadPool 
pool = ThreadPool(4) 
results = pool.map(my_function, my_array)

Yang merupakan versi multithread dari:

results = []
for item in my_array:
    results.append(my_function(item))

Deskripsi

Peta adalah fungsi kecil yang keren, dan kunci untuk dengan mudah menyuntikkan paralelisme ke dalam kode Python Anda. Bagi mereka yang tidak dikenal, peta adalah sesuatu yang diangkat dari bahasa fungsional seperti Lisp. Ini adalah fungsi yang memetakan fungsi lain secara berurutan.

Peta menangani iterasi atas urutan bagi kita, menerapkan fungsi, dan menyimpan semua hasil dalam daftar yang berguna di bagian akhir.

enter image description here


Pelaksanaan

Versi paralel fungsi peta disediakan oleh dua pustaka: multiprocessing, dan juga anak tiri yang sedikit diketahui, tetapi sama-sama fantastis: multiprocessing.dummy.

multiprocessing.dummy persis sama dengan modul multiprocessing, tetapi menggunakan utas sebagai gantinya (perbedaan penting - gunakan banyak proses untuk tugas-tugas intensif CPU; utas untuk (dan selama) IO):

multiprocessing.dummy mereplikasi API multiprocessing tetapi tidak lebih dari pembungkus di sekitar modul threading.

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 

urls = [
  'http://www.python.org', 
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# make the Pool of workers
pool = ThreadPool(4) 

# open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# close the pool and wait for the work to finish 
pool.close() 
pool.join() 

Dan hasil waktu:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

Melewati banyak argumen (Bekerja seperti ini hanya dengan Python 3.3 dan yang lebih baru):

Untuk melewati beberapa array:

results = pool.starmap(function, zip(list_a, list_b))

atau meneruskan konstanta dan larik:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

Jika Anda menggunakan versi Python yang lebih lama, Anda dapat melewati banyak argumen melalui solusi ini.

(Terimakasih untuk pengguna136036 untuk komentar yang membantu)


1008
2018-02-11 19:53



Berikut ini contoh sederhana: Anda perlu mencoba beberapa URL alternatif dan mengembalikan konten yang pertama untuk merespons.

import Queue
import threading
import urllib2

# called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

Ini adalah kasus di mana threading digunakan sebagai pengoptimalan sederhana: setiap subthread menunggu URL untuk diselesaikan dan ditanggapi, untuk memasukkan isinya ke antrian; setiap utas adalah daemon (tidak akan menjaga proses naik jika utas utama berakhir - itu lebih umum daripada tidak); utas utama memulai semua subthread, apakah a get pada antrian untuk menunggu sampai salah satu dari mereka melakukan put, lalu memancarkan hasil dan mengakhiri (yang menghapus setiap subthread yang mungkin masih berjalan, karena mereka adalah benang daemon).

Penggunaan benang dengan benar di Python selalu terhubung ke operasi I / O (karena CPython tidak menggunakan banyak core untuk menjalankan tugas-tugas yang terikat dengan CPU, satu-satunya alasan untuk threading tidak menghalangi proses sementara menunggu beberapa I / O ). Antrean hampir selalu merupakan cara terbaik untuk menata pekerjaan ke untaian dan / atau mengumpulkan hasil kerja, dengan cara, dan mereka secara intrinsik threadsafe sehingga mereka menyelamatkan Anda dari mengkhawatirkan tentang kunci, kondisi, peristiwa, semaphores, dan antar lain. koordinasi benang / konsep komunikasi.


672
2018-05-17 04:36



CATATAN: Untuk paralelisasi aktual dengan Python, Anda harus menggunakan multiprocessing modul untuk fork beberapa proses yang dilakukan secara paralel (karena kunci interpreter global, benang Python menyediakan interleaving tetapi sebenarnya dijalankan secara serial, tidak paralel, dan hanya berguna ketika interleaving I / O operasi).

Namun, jika Anda hanya mencari interleaving (atau melakukan operasi I / O yang dapat diparalelisasi meskipun ada kunci interpreter global), maka threading modul adalah tempat untuk memulai. Sebagai contoh yang sangat sederhana, mari kita pertimbangkan masalah menjumlahkan rentang besar dengan menjumlahkan subrange secara paralel:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()  
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

Perhatikan bahwa contoh di atas adalah contoh yang sangat bodoh, karena sama sekali tidak ada I / O dan akan dieksekusi secara serial meskipun disisipkan (dengan tambahan biaya alih konteks) di CPython karena kunci juru bahasa global.


226
2018-05-17 04:35



Seperti yang disebutkan orang lain, CPython dapat menggunakan utas hanya untuk I \ O menunggu karena GIL. Jika Anda ingin memanfaatkan beberapa core untuk tugas-tugas yang terikat dengan CPU, gunakan multiprocessing:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

86
2018-03-08 22:22



Sekedar catatan, Antrean tidak diperlukan untuk threading.

Ini adalah contoh paling sederhana yang dapat saya bayangkan yang menunjukkan 10 proses yang berjalan bersamaan.

import threading
from random import randint
from time import sleep


def print_number(number):
    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):
    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"

84
2017-09-23 16:07



Jawaban dari Alex Martelli membantu saya, namun di sini adalah versi modifikasi yang saya pikir lebih bermanfaat (setidaknya untuk saya).

import Queue
import threading
import urllib2

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

#load up a queue with your data, this will handle locking
q = Queue.Queue()
for url in worker_data:
    q.put(url)

#define a worker function
def worker(queue):
    queue_full = True
    while queue_full:
        try:
            #get your data off the queue, and do some work
            url= queue.get(False)
            data = urllib2.urlopen(url).read()
            print len(data)

        except Queue.Empty:
            queue_full = False

#create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()

38
2017-10-01 15:50



Saya menemukan ini sangat berguna: buat sebanyak mungkin benang sebagai inti dan biarkan mereka mengeksekusi sejumlah tugas (besar) (dalam hal ini, memanggil program shell):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): #put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        #execute a task: call a shell program and wait until it completes
        subprocess.call("echo "+str(item), shell=True) 
        q.task_done()

cpus=multiprocessing.cpu_count() #detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() #block until all tasks are done

19
2018-06-06 23:51



Bagi saya, contoh sempurna untuk Threading adalah memantau kejadian Asynchronous. Lihatlah kode ini.

# thread_test.py
import threading
import time 

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

Anda dapat bermain dengan kode ini dengan membuka sesi IPython dan melakukan sesuatu seperti:

>>>from thread_test import Monitor
>>>a = [0]
>>>mon = Monitor(a)
>>>mon.start()
>>>a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

Tunggu beberapa menit

>>>a[0] = 2
Mon = 2

15
2018-04-14 04:18