English 中文(简体)
配有矩形锁定队列队列的线索 Polol 执行器
原标题:ThreadPoolExecutor with ArrayBlockingQueue

我开始读更多关于 Java Doc 的Tread Pool 执行器的更多信息, 当我在其中一个项目中使用它的时候。 因此,有谁能解释一下这条线实际上意味着什么? 我知道每个参数代表什么, 但我想从这里的一些专家那里 更一般地理解它。

ExecutorService service = new ThreadPoolExecutor(10, 10, 1000L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10, true), new 
ThreadPoolExecutor.CallerRunsPolicy());

Updated:- Problem Statement is:-

每条线索都使用1到1000之间独特的身份识别码, 程序必须运行60分钟或以上, 所以在60分钟之内, 所有的身份识别码都有可能完成, 所以我需要再次重新使用这些身份识别码。 因此, 下面的这个程序是我用以上执行程序写的 。

class IdPool {
    private final LinkedList<Integer> availableExistingIds = new LinkedList<Integer>();

    public IdPool() {
        for (int i = 1; i <= 1000; i++) {
            availableExistingIds.add(i);
        }
    }

    public synchronized Integer getExistingId() {
        return availableExistingIds.removeFirst();
    }

    public synchronized void releaseExistingId(Integer id) {
        availableExistingIds.add(id);
    }
}


class ThreadNewTask implements Runnable {
    private IdPool idPool;

    public ThreadNewTask(IdPool idPool) {
        this.idPool = idPool;
    }

    public void run() {
        Integer id = idPool.getExistingId();
        someMethod(id);
        idPool.releaseExistingId(id);
    }

// This method needs to be synchronized or not?
    private synchronized void someMethod(Integer id) {
        System.out.println("Task: " +id);
// and do other calcuations whatever you need to do in your program
    }
}

public class TestingPool {
    public static void main(String[] args) throws InterruptedException {
        int size = 10;
        int durationOfRun = 60;
        IdPool idPool = new IdPool();   
        // create thread pool with given size
        ExecutorService service = new ThreadPoolExecutor(size, size, 500L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size), new ThreadPoolExecutor.CallerRunsPolicy()); 

        // queue some tasks
        long startTime = System.currentTimeMillis();
        long endTime = startTime + (durationOfRun * 60 * 1000L);

        // Running it for 60 minutes
        while(System.currentTimeMillis() <= endTime) {
            service.submit(new ThreadNewTask(idPool));
        }

        // wait for termination        
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
    }
}

我的问题是:这个代码对考量业绩是否正确吗?我还能做点什么使其更准确?任何帮助都会感激不尽。

最佳回答

[首先,我道歉,这是对前一个答案的答复, 但我想格式化]。

除了在现实中, 当某个项目被提交到带完整队列的 TreadPolol 执行器时, 您没有 DON T 块 。 原因是 Tread Pool 执行器调用屏蔽 Quue. offer (T 项) 方法, 按定义它是一种非阻塞方法。 它要么添加该项目并返回真实, 要么不添加( 在完整时) 并返回虚假 。 然后, Tread Pool 执行器调用已注册的拒绝执行器来处理这种情况 。

校对:Portnoy

Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.

默认情况下, 使用 ThreadPolol 执行器. Abort Policy () 来从 Tread Pool 执行器的“ 上交” 或“ 执行” 方法中丢出拒绝执行的例外 。

try {
   executorService.execute(new Runnable() { ... });
}
catch (RejectedExecutionException e) {
   // the queue is full, and you re using the AbortPolicy as the 
   // RejectedExecutionHandler
}

但是,您可以使用其他处理器来做不同的事情, 比如忽略错误( 丢弃策略), 或者在称为“ execute” 或“ submit” 方法( CallerRuns Policy) 的线索中运行它。 这个示例允许在队列满后, 任何线索调用“ submit” 或“ excute” 的方法来运行所要求的任务 。 (这意味着在任何特定的时间, 您可以在池内某个 s 上方运行另外一件事情 ) :

ExecutorService service = new ThreadPoolExecutor(..., new ThreadPoolExecutor.CallerRunsPolicy());

如果您想要屏蔽和等待, 您可以执行您自己拒绝的操作Handler, 这将屏蔽到队列上有一个空位( 这是粗略估计, 我还没有编译或运行此选项, 但是您应该得到这个想法 ) :

public class BlockUntilAvailableSlot implements RejectedExecutionHandler {
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     if (e.isTerminated() || e.isShutdown()) {
        return;
     }

     boolean submitted = false;
     while (! submitted) {
       if (Thread.currentThread().isInterrupted()) {
            // be a good citizen and do something nice if we were interrupted
            // anywhere other than during the sleep method.
       }

       try {
          e.execute(r);
          submitted = true;
       }
       catch (RejectedExceptionException e) {
         try {
           // Sleep for a little bit, and try again.
           Thread.sleep(100L);
         }
         catch (InterruptedException e) {
           ; // do you care if someone called Thread.interrupt?
           // if so, do something nice here, and maybe just silently return.
         }
       }
     }
  }
}
问题回答

它创建了一个 < code> 执行器服务 , 处理一个线索库的执行。 在这种情况下, 最初和最大的线索数在此情况下都是 10 。 当一个线索在1 秒( 1 000 ms) 中闲置时, 它会杀死它( 闲置的计时器), 但是, 因为最大和核心的线索数相同, 这永远不会发生( 它总是在周围保留10 条线索, 并且不会运行超过 10 条线索 ) 。

它使用 ArrayBlocking Quue 来管理有 10 个空档的执行请求, 所以当队列满时( 在10 条线被排队后), 它会阻塞呼叫器 。

如果线索被拒绝( 在此处情况下, 原因是服务关闭, 因为线索将被排队, 或者在队列满后排队时您将被阻塞), 那么所提供的 < code> Runnable 将会在调用线上执行 。

考虑 emaphore 。 这些是用于相同目的的 。 请在下面检查使用 semaphore 的代码。 不确定这是不是你想要的。 但如果没有其他的获取许可, 这将阻碍它 。 您同样需要身份识别吗?

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadNewTask implements Runnable {
    private Semaphore idPool;

    public ThreadNewTask(Semaphore idPool) {
        this.idPool = idPool;
    }

    public void run() {
//      Integer id = idPool.getExistingId();
        try {
            idPool.acquire();
            someMethod(0);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            idPool.release();
        }
//      idPool.releaseExistingId(id);
    }

    // This method needs to be synchronized or not?
    private void someMethod(Integer id) {
        System.out.println("Task: " + id);
        // and do other calcuations whatever you need to do in your program
    }
}

public class TestingPool {
    public static void main(String[] args) throws InterruptedException {
        int size = 10;
        int durationOfRun = 60;
        Semaphore idPool = new Semaphore(100); 
//      IdPool idPool = new IdPool();
        // create thread pool with given size
        ExecutorService service = new ThreadPoolExecutor(size, size, 500L,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // queue some tasks
        long startTime = System.currentTimeMillis();
        long endTime = startTime + (durationOfRun * 60 * 1000L);

        // Running it for 60 minutes
        while (System.currentTimeMillis() <= endTime) {
            service.submit(new ThreadNewTask(idPool));
        }

        // wait for termination
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
    }
}

另一种解决办法是用大型超时(最多292年,可视为无限)黑入后台队列,以 offer 取代 offer ,代之以 offer (最多292年)。


// helper method
private static boolean interruptibleInfiniteOffer(BlockingQueue<Runnable> q, Runnable r) {
    try {
        return q.offer(r, Long.MAX_VALUE, TimeUnit.NANOSECONDS); // infinite == ~292 years
    } catch (InterruptedException e) {
        return false;
    }
}

// fixed size pool with blocking (instead of rejecting) if bounded queue is full
public static ThreadPoolExecutor getFixedSizePoolWithLimitedWaitingQueue(int nThreads, int maxItemsInTheQueue) {
    BlockingQueue<Runnable> queue = maxItemsInTheQueue == 0
            ? new SynchronousQueue<>() { public boolean offer(Runnable r) { return interruptibleInfiniteOffer(this, r);} }
            : new ArrayBlockingQueue<>(maxItemsInTheQueue) { public boolean offer(Runnable r) { return interruptibleInfiniteOffer(this, r);} };
    return new ThreadPoolExecutor(nThreads, nThreads, 0, TimeUnit.MILLISECONDS, queue);
}




相关问题
Spring Properties File

Hi have this j2ee web application developed using spring framework. I have a problem with rendering mnessages in nihongo characters from the properties file. I tried converting the file to ascii using ...

Logging a global ID in multiple components

I have a system which contains multiple applications connected together using JMS and Spring Integration. Messages get sent along a chain of applications. [App A] -> [App B] -> [App C] We set a ...

Java Library Size

If I m given two Java Libraries in Jar format, 1 having no bells and whistles, and the other having lots of them that will mostly go unused.... my question is: How will the larger, mostly unused ...

How to get the Array Class for a given Class in Java?

I have a Class variable that holds a certain type and I need to get a variable that holds the corresponding array class. The best I could come up with is this: Class arrayOfFooClass = java.lang....

SQLite , Derby vs file system

I m working on a Java desktop application that reads and writes from/to different files. I think a better solution would be to replace the file system by a SQLite database. How hard is it to migrate ...

热门标签