001    package org.openstreetmap.gui.jmapviewer;
002    
003    //License: GPL. Copyright 2008 by Jan Peter Stotz
004    
005    import java.util.concurrent.BlockingDeque;
006    import java.util.concurrent.LinkedBlockingDeque;
007    import java.util.concurrent.TimeUnit;
008    
009    import org.openstreetmap.gui.jmapviewer.interfaces.TileJob;
010    
011    /**
012     * A generic class that processes a list of {@link Runnable} one-by-one using
013     * one or more {@link Thread}-instances. The number of instances varies between
014     * 1 and {@link #WORKER_THREAD_MAX_COUNT} (default: 8). If an instance is idle
015     * more than {@link #WORKER_THREAD_TIMEOUT} seconds (default: 30), the instance
016     * ends itself.
017     *
018     * @author Jan Peter Stotz
019     */
020    public class JobDispatcher {
021    
022        private static final JobDispatcher instance = new JobDispatcher();
023    
024        /**
025         * @return the singelton instance of the {@link JobDispatcher}
026         */
027        public static JobDispatcher getInstance() {
028            return instance;
029        }
030    
031        private JobDispatcher() {
032            addWorkerThread().firstThread = true;
033        }
034    
035        protected BlockingDeque<TileJob> jobQueue = new LinkedBlockingDeque<TileJob>();
036    
037        public static int WORKER_THREAD_MAX_COUNT = 8;
038    
039        /**
040         * Specifies the time span in seconds that a worker thread waits for new
041         * jobs to perform. If the time span has elapsed the worker thread
042         * terminates itself. Only the first worker thread works differently, it
043         * ignores the timeout and will never terminate itself.
044         */
045        public static int WORKER_THREAD_TIMEOUT = 30;
046    
047        /**
048         * Type of queue, FIFO if <code>false</code>, LIFO if <code>true</code>
049         */
050        protected boolean modeLIFO = false;
051    
052        /**
053         * Total number of worker threads currently idle or active
054         */
055        protected int workerThreadCount = 0;
056    
057        /**
058         * Number of worker threads currently idle
059         */
060        protected int workerThreadIdleCount = 0;
061    
062        /**
063         * Just an id for identifying an worker thread instance
064         */
065        protected int workerThreadId = 0;
066    
067        /**
068         * Removes all jobs from the queue that are currently not being processed.
069         */
070        public void cancelOutstandingJobs() {
071            jobQueue.clear();
072        }
073    
074        /**
075         * Function to set the maximum number of workers for tile loading.
076         */
077        static public void setMaxWorkers(int workers) {
078            WORKER_THREAD_MAX_COUNT = workers;
079        }
080    
081        /**
082         * Function to set the LIFO/FIFO mode for tile loading job.
083         *
084         * @param lifo <code>true</code> for LIFO mode, <code>false</code> for FIFO mode
085         */
086        public void setLIFO(boolean lifo) {
087            modeLIFO = lifo;
088        }
089    
090        /**
091         * Adds a job to the queue.
092         * Jobs for tiles already contained in the are ignored (using a <code>null</code> tile
093         * prevents skipping).
094         *
095         * @param job the the job to be added
096         */
097        public void addJob(TileJob job) {
098            try {
099                if(job.getTile() != null) {
100                    for(TileJob oldJob : jobQueue) {
101                        if(oldJob.getTile() == job.getTile()) {
102                            return;
103                        }
104                    }
105                }
106                jobQueue.put(job);
107                if (workerThreadIdleCount == 0 && workerThreadCount < WORKER_THREAD_MAX_COUNT)
108                    addWorkerThread();
109            } catch (InterruptedException e) {
110            }
111        }
112    
113        protected JobThread addWorkerThread() {
114            JobThread jobThread = new JobThread(++workerThreadId);
115            synchronized (this) {
116                workerThreadCount++;
117            }
118            jobThread.start();
119            return jobThread;
120        }
121    
122        public class JobThread extends Thread {
123    
124            Runnable job;
125            boolean firstThread = false;
126    
127            public JobThread(int threadId) {
128                super("OSMJobThread " + threadId);
129                setDaemon(true);
130                job = null;
131            }
132    
133            @Override
134            public void run() {
135                executeJobs();
136                synchronized (instance) {
137                    workerThreadCount--;
138                }
139            }
140    
141            protected void executeJobs() {
142                while (!isInterrupted()) {
143                    try {
144                        synchronized (instance) {
145                            workerThreadIdleCount++;
146                        }
147                        if(modeLIFO) {
148                            if (firstThread)
149                                job = jobQueue.takeLast();
150                            else
151                                job = jobQueue.pollLast(WORKER_THREAD_TIMEOUT, TimeUnit.SECONDS);
152                        } else {
153                            if (firstThread)
154                                job = jobQueue.take();
155                            else
156                                job = jobQueue.poll(WORKER_THREAD_TIMEOUT, TimeUnit.SECONDS);
157                        }
158                    } catch (InterruptedException e1) {
159                        return;
160                    } finally {
161                        synchronized (instance) {
162                            workerThreadIdleCount--;
163                        }
164                    }
165                    if (job == null)
166                        return;
167                    try {
168                        job.run();
169                        job = null;
170                    } catch (Exception e) {
171                        e.printStackTrace();
172                    }
173                }
174            }
175        }
176    
177    }