Pertanyaan Bagaimana cara mendapatkan ThreadPoolExecutor untuk meningkatkan thread ke max sebelum antrian?


Saya sudah frustasi untuk beberapa waktu dengan perilaku default ThreadPoolExecutor yang mendukung ExecutorService ulir-kolam yang begitu banyak dari kita gunakan. Mengutip dari Javadocs:

Jika ada lebih dari corePoolSize tetapi kurang dari thread maximumPoolSize yang berjalan, thread baru akan dibuat hanya jika antrian penuh.

Apa artinya ini adalah bahwa jika Anda menentukan kolam thread dengan kode berikut, itu akan tak pernah mulai utas ke - 2 karena LinkedBlockingQueue tidak dibatasi.

ExecutorService threadPool =
    new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
        TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

Hanya jika Anda memiliki dibatasi antrian dan antrean penuh ada benang apa pun di atas nomor inti yang dimulai. Saya menduga sejumlah besar programmer Java multithreaded junior tidak menyadari perilaku ini ThreadPoolExecutor.

Sekarang saya memiliki kasus penggunaan khusus di mana ini tidak optimal. Saya mencari cara, tanpa menulis kelas TPE saya sendiri, untuk mengatasinya.

Persyaratan saya adalah untuk layanan web yang membuat panggilan balik ke pihak ketiga yang mungkin tidak dapat diandalkan.

  • Saya tidak ingin melakukan panggilan balik serentak dengan permintaan web, jadi saya ingin menggunakan rangkaian ulir.
  • Saya biasanya mendapatkan beberapa menit ini jadi saya tidak ingin memiliki newFixedThreadPool(...) dengan sejumlah besar utas yang sebagian besar tidak aktif.
  • Sering kali saya mendapatkan lonjakan lalu lintas ini dan saya ingin menaikan jumlah untaian ke beberapa nilai maksimum (misalkan 50).
  • Saya harus membuat terbaik mencoba untuk melakukan semua callback jadi saya ingin memasukkan tambahan di atas 50. Saya tidak ingin membanjiri server web saya yang lain dengan menggunakan newCachedThreadPool().

Bagaimana saya bisa mengatasi keterbatasan ini di ThreadPoolExecutor di mana antrean harus dibatasi dan penuh sebelum lebih banyak benang akan dimulai? Bagaimana saya bisa mendapatkannya untuk memulai lebih banyak thread sebelum tugas antrian?

Edit:

@Flavio membuat poin bagus tentang menggunakan ThreadPoolExecutor.allowCoreThreadTimeOut(true) untuk memiliki batas waktu utas inti dan keluar. Saya menganggap itu tapi saya masih menginginkan fitur core-threads. Saya tidak ingin jumlah utas di kolam jatuh di bawah ukuran inti jika memungkinkan.


76
2017-10-22 21:02


asal


Jawaban:


Bagaimana saya bisa mengatasi keterbatasan ini di ThreadPoolExecutor di mana antrean harus dibatasi dan penuh sebelum lebih banyak rangkaian akan dimulai.

Saya percaya saya akhirnya menemukan solusi yang agak elegan (mungkin sedikit hacky) untuk pembatasan ini dengan ThreadPoolExecutor. Ini melibatkan perpanjangan LinkedBlockingQueue untuk memilikinya kembali false untuk queue.offer(...) ketika sudah ada beberapa tugas yang diantrekan. Jika rangkaian saat ini tidak mengikuti tugas antrian, TPE akan menambahkan utas tambahan. Jika pool sudah di max thread, maka RejectedExecutionHandler akan dipanggil. Ini adalah handler yang kemudian melakukan put(...) ke dalam antrian.

Tentunya aneh untuk menulis antrian di mana offer(...) dapat kembali false dan put() tidak pernah memblokir jadi itu bagian hack. Tetapi ini bekerja dengan baik dengan penggunaan TPE antrian jadi saya tidak melihat ada masalah dengan melakukan hal ini.

Begini kodenya:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        /*
         * Offer it to the queue if there is 0 items already queued, else
         * return false so the TPE will add another thread. If we return false
         * and max threads have been reached then the RejectedExecutionHandler
         * will be called which will do the put into the queue.
         */
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            /*
             * This does the actual put into the queue. Once the max threads
             * have been reached, the tasks will then queue up.
             */
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

Dengan mekanisme ini, ketika saya mengirimkan tugas ke antrian, ThreadPoolExecutor akan:

  1. Skala jumlah benang hingga ukuran inti awalnya (di sini 1).
  2. Tawarkan ke antrean. Jika antrian kosong maka akan antri untuk ditangani oleh utas yang ada.
  3. Jika antrian memiliki 1 atau lebih elemen, maka offer(...) akan kembali salah.
  4. Jika false dikembalikan, naikkan jumlah untaian di kolam hingga mencapai angka maksimal (di sini 50).
  5. Jika di max kemudian memanggil RejectedExecutionHandler
  6. Itu RejectedExecutionHandler kemudian masukkan tugas ke antrian untuk diproses oleh utas pertama yang tersedia dalam urutan FIFO.

Meskipun dalam contoh kode saya di atas, antrean tidak dibatasi, Anda juga dapat mendefinisikannya sebagai antrean yang dibatasi. Misalnya, jika Anda menambahkan kapasitas 1000 ke LinkedBlockingQueue maka itu akan:

  1. skala utas maksimal
  2. kemudian antri hingga penuh dengan 1000 tugas
  3. kemudian blokir penelepon hingga ruang tersedia untuk antrean.

Selain itu, jika Anda benar-benar perlu menggunakannya offer(...) dalam RejectedExecutionHandler maka Anda bisa menggunakan offer(E, long, TimeUnit) metode sebaliknya dengan Long.MAX_VALUE sebagai batas waktu.

Edit:

Saya sudah tweak saya offer(...) metode mengganti tanggapan per @Ralf. Ini hanya akan meningkatkan jumlah utas di kolam jika mereka tidak mengikuti beban.

Edit:

Tweak lain untuk jawaban ini adalah untuk benar-benar menanyakan TPE jika ada utas yang menganggur dan hanya enqueue item jika ada. Anda harus membuat kelas yang benar untuk ini dan menambahkan ourQueue.setThreadPoolExecutor(tpe); metode di atasnya.

Maka Anda offer(...) metode mungkin terlihat seperti ini:

  1. Periksa untuk melihat apakah tpe.getPoolSize() == tpe.getMaximumPoolSize() dalam hal ini sebut saja super.offer(...).
  2. Lain jika tpe.getPoolSize() > tpe.getActiveCount() lalu telepon super.offer(...) karena sepertinya ada utas yang menganggur.
  3. Jika tidak, kembalilah false untuk membuat utas lagi.

Mungkin ini:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

Perhatikan bahwa metode get pada TPE mahal karena mereka mengakses volatile sawah atau (dalam kasus getActiveCount()) kunci TPE dan jalankan daftar-thread. Selain itu, ada kondisi balapan di sini yang dapat menyebabkan tugas menjadi tidak layak atau utas benang lain ketika ada utas yang tidak aktif.


35
2017-10-22 21:22



Tetapkan ukuran inti dan ukuran maksimum ke nilai yang sama, dan biarkan inti utas dihapus dari kolam dengan allowCoreThreadTimeOut(true).


21
2018-06-30 15:37



Saya sudah punya dua jawaban lain untuk pertanyaan ini, tetapi saya menduga ini adalah yang terbaik.

Ini didasarkan pada teknik jawaban yang diterima saat ini, yaitu:

  1. Ganti antrean offer() metode untuk (terkadang) mengembalikan false,
  2. yang menyebabkan ThreadPoolExecutor baik menelurkan thread baru atau menolak tugas, dan
  3. mengatur RejectedExecutionHandler untuk sebenarnya antri tugas pada penolakan.

Masalahnya adalah kapan offer() harus mengembalikan salah. Jawaban yang diterima saat ini mengembalikan nilai false ketika antrean memiliki beberapa tugas di dalamnya, tetapi seperti yang saya tunjukkan dalam komentar saya di sana, ini menyebabkan efek yang tidak diinginkan. Bergantian, jika Anda selalu salah, Anda akan terus memunculkan utas baru bahkan ketika ada antrean yang menunggu antrean.

Solusinya adalah dengan menggunakan Java 7 LinkedTransferQueue dan miliki offer() panggilan tryTransfer(). Ketika ada benang konsumen menunggu, tugas itu hanya akan diteruskan ke utas itu. Jika tidak, offer() akan mengembalikan false dan ThreadPoolExecutor akan menelurkan benang baru.

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });

14
2017-11-22 19:43



Catatan: Sekarang saya lebih suka dan merekomendasikan jawaban saya yang lain.

Berikut ini adalah versi yang terasa lebih mudah: Meningkatkan corePoolSize (hingga batas maksimumKoolSize) setiap kali tugas baru dijalankan, kemudian menurunkan corePoolSize (ke batas pengguna yang ditentukan "ukuran inti kolam") kapan pun tugas selesai.

Untuk meletakkannya dengan cara lain, lacak jumlah tugas yang berjalan atau dikerjakan, dan pastikan bahwa corePoolSize sama dengan jumlah tugas selama itu antara pengguna yang ditentukan "ukuran kolam inti" dan maksimumPoolSize.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}

Seperti yang tertulis, kelas tidak mendukung perubahan pengguna yang ditentukan corePoolSize atau maximumPoolSize setelah konstruksi, dan tidak mendukung manipulasi antrian kerja secara langsung atau melalui remove() atau purge().


7
2017-10-23 10:15



Kami memiliki subkelas ThreadPoolExecutor yang membutuhkan tambahan creationThreshold dan menimpa execute.

public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}

mungkin itu juga membantu, tetapi milikmu terlihat lebih berseni tentu saja ...


5
2018-06-20 14:02



Jawaban yang disarankan hanya menyelesaikan satu (1) masalah dengan rangkaian thread JDK:

  1. Kolam thread JDK bias terhadap antrian. Jadi alih-alih memunculkan utas baru, mereka akan mengantre tugas. Hanya jika antrian mencapai batasnya akan kolam benang menelurkan sebuah thread baru.

  2. Penarikan benang tidak terjadi ketika beban mencerahkan. Sebagai contoh jika kita memiliki semburan pekerjaan yang memukul pool yang menyebabkan pool pergi ke max, diikuti dengan beban ringan dari max 2 tugas pada suatu waktu, pool akan menggunakan semua thread untuk melayani beban ringan yang mencegah thread pensiun. (hanya 2 utas yang diperlukan ...)

Tidak senang dengan perilaku di atas, saya pergi ke depan dan menerapkan kolam untuk mengatasi kekurangan di atas.

Untuk menyelesaikan 2) Menggunakan penjadwalan Lifo menyelesaikan masalah. Ide ini disajikan oleh Ben Maurer pada ACM applicative 2015 conference: Sistem @ skala Facebook 

Jadi implementasi baru lahir:

LifoThreadPoolExecutorSQP

Sejauh ini implementasi ini meningkatkan kinerja eksekusi async untuk ZEL.

Implementasinya adalah spin yang mampu mengurangi overhead switch konteks, menghasilkan kinerja yang unggul untuk kasus penggunaan tertentu.

Semoga ini membantu ...

PS: JDK Fork Gabung Kolam menerapkan ExecutorService dan bekerja sebagai kolam thread "normal", Implementasi performant, Menggunakan penjadwalan LIFO Thread, namun tidak ada kontrol atas ukuran antrian internal, batas waktu pensiun ...


3
2017-11-27 19:15



Catatan: Sekarang saya lebih suka dan merekomendasikan jawaban saya yang lain.

Saya memiliki proposal lain, mengikuti ide asli mengubah antrian untuk mengembalikan salah. Dalam hal ini, semua tugas dapat memasuki antrean, tetapi setiap kali ada tugas yang dilakukan setelahnya execute(), kita mengikutinya dengan tugas no-op sentinel yang antrian menolak, menyebabkan thread baru untuk bertelur, yang akan mengeksekusi no-op segera diikuti oleh sesuatu dari antrian.

Karena benang pekerja mungkin melakukan pemungutan suara LinkedBlockingQueue untuk tugas baru, mungkin ada tugas untuk mendapatkan antrean bahkan ketika ada utas yang tersedia. Untuk menghindari pemijahan new threads bahkan ketika ada thread yang tersedia, kita perlu melacak berapa banyak thread yang menunggu tugas baru di antrian, dan hanya menelurkan thread baru ketika ada lebih banyak tugas pada antrian daripada menunggu thread.

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };

final AtomicInteger waitingThreads = new AtomicInteger(0);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        // offer returning false will cause the executor to spawn a new thread
        if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
        else return super.offer(e);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.poll(timeout, unit);
        } finally {
            waitingThreads.decrementAndGet();
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.take();
        } finally {
            waitingThreads.decrementAndGet();
        }
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
    @Override
    public void execute(Runnable command) {
        super.execute(command);
        if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
    }
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r == SENTINEL_NO_OP) return;
        else throw new RejectedExecutionException();            
    }
});

1
2017-10-22 21:52



Solusi terbaik yang bisa saya pikirkan adalah memperpanjang.

ThreadPoolExecutor menawarkan beberapa metode hook: beforeExecute dan afterExecute. Dalam ekstensi Anda, Anda dapat mempertahankan penggunaan antrean yang dibatasi untuk memberi makan dalam tugas dan antrian tak terbatas kedua untuk menangani overflow. Ketika seseorang menelepon submit, Anda bisa mencoba untuk menempatkan permintaan ke antrian yang dibatasi. Jika Anda bertemu dengan pengecualian, Anda hanya menempel tugas di antrean overflow Anda. Anda kemudian bisa memanfaatkan afterExecute hook untuk melihat apakah ada sesuatu di antrean overflow setelah menyelesaikan tugas. Dengan cara ini, eksekutor akan mengurus barang-barang di antrean dibatasi pertama, dan secara otomatis menarik dari antrian tak terbatas ini sebagai waktu memungkinkan.

Sepertinya lebih banyak pekerjaan daripada solusi Anda, tetapi setidaknya itu tidak melibatkan perilaku antrian yang tidak terduga. Saya juga membayangkan bahwa ada cara yang lebih baik untuk memeriksa status antrean dan utas daripada bergantung pada pengecualian, yang cukup lambat untuk dibuang.


0