Pertanyaan Objek memori bersama dalam multiprocessing


Misalkan saya memiliki besar dalam array memori numpy, saya memiliki fungsi func yang mengambil array raksasa ini sebagai input (bersama dengan beberapa parameter lainnya). func dengan parameter yang berbeda dapat dijalankan secara paralel. Sebagai contoh:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

Jika saya menggunakan multiprocessing library, maka array raksasa itu akan disalin beberapa kali ke dalam proses yang berbeda.

Apakah ada cara untuk membiarkan proses yang berbeda berbagi larik yang sama? Objek array ini hanya-baca dan tidak akan pernah dimodifikasi.

Apa yang lebih rumit, jika arr bukan array, tetapi objek python sewenang-wenang, apakah ada cara untuk membagikannya?

[DIMULAI]

Saya membaca jawabannya tetapi saya masih agak bingung. Karena fork () adalah copy-on-write, kita tidak boleh meminta biaya tambahan ketika memunculkan proses baru di python multiprocessing library. Tetapi kode berikut menunjukkan ada overhead besar:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

output (dan omong-omong, biaya meningkat karena ukuran array meningkat, jadi saya menduga masih ada overhead yang terkait dengan penyalinan memori):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Mengapa ada overhead yang sangat besar, jika kita tidak menyalin array? Dan bagian apa yang disimpan oleh memori bersama?


75
2018-05-23 14:20


asal


Jawaban:


Jika Anda menggunakan sistem operasi yang menggunakan copy-on-write fork() semantik (seperti unix umum), maka selama Anda tidak pernah mengubah struktur data Anda, itu akan tersedia untuk semua proses anak tanpa mengambil memori tambahan. Anda tidak perlu melakukan sesuatu yang istimewa (kecuali pastikan Anda tidak mengubah objek).

Hal yang paling efisien kamu dapat lakukan untuk masalah Anda akan mengemas array Anda ke dalam struktur array yang efisien (menggunakan numpy atau array), tempatkan itu di memori bersama, bungkus dengan multiprocessing.Array, dan berikan itu ke fungsi Anda. Jawaban ini menunjukkan cara melakukannya.

Jika Anda menginginkan sebuah dapat ditulisi objek bersama, maka Anda harus membungkusnya dengan semacam sinkronisasi atau penguncian. multiprocessing menyediakan dua metode untuk melakukan ini: satu menggunakan memori bersama (cocok untuk nilai sederhana, array, atau ctypes) atau a Manager proksi, di mana satu proses memegang memori dan seorang manajer menengahi akses ke sana dari proses lain (bahkan melalui jaringan).

Itu Manager Pendekatan dapat digunakan dengan objek Python sewenang-wenang, tetapi akan lebih lambat daripada yang setara menggunakan memori bersama karena objek perlu diserialisasi / deserialized dan dikirim antar proses.

Ada sebuah kekayaan perpustakaan dan pendekatan pemrosesan paralel yang tersedia dalam Python. multiprocessing adalah perpustakaan yang sangat baik dan berpengetahuan luas, tetapi jika Anda memiliki kebutuhan khusus mungkin salah satu pendekatan lain mungkin lebih baik.


75
2018-05-23 16:42



Saya mengalami masalah yang sama dan menulis kelas utilitas memori bersama untuk mengatasinya.

Saya menggunakan multiprocessing.RawArray (lockfree), dan juga akses ke array tidak disinkronkan sama sekali (lockfree), hati-hati jangan sampai menembak kaki Anda sendiri.

Dengan solusi, saya mendapatkan speedups dengan faktor kira-kira 3 pada quad-core i7.

Begini kodenya: Jangan ragu untuk menggunakan dan memperbaikinya, dan tolong laporkan semua bug.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))

11
2018-05-15 10:55