001 package org.openstreetmap.gui.jmapviewer; 002 003 //License: GPL. Copyright 2008 by Jan Peter Stotz 004 005 import java.util.concurrent.BlockingDeque; 006 import java.util.concurrent.LinkedBlockingDeque; 007 import java.util.concurrent.TimeUnit; 008 009 import org.openstreetmap.gui.jmapviewer.interfaces.TileJob; 010 011 /** 012 * A generic class that processes a list of {@link Runnable} one-by-one using 013 * one or more {@link Thread}-instances. The number of instances varies between 014 * 1 and {@link #WORKER_THREAD_MAX_COUNT} (default: 8). If an instance is idle 015 * more than {@link #WORKER_THREAD_TIMEOUT} seconds (default: 30), the instance 016 * ends itself. 017 * 018 * @author Jan Peter Stotz 019 */ 020 public class JobDispatcher { 021 022 private static final JobDispatcher instance = new JobDispatcher(); 023 024 /** 025 * @return the singelton instance of the {@link JobDispatcher} 026 */ 027 public static JobDispatcher getInstance() { 028 return instance; 029 } 030 031 private JobDispatcher() { 032 addWorkerThread().firstThread = true; 033 } 034 035 protected BlockingDeque<TileJob> jobQueue = new LinkedBlockingDeque<TileJob>(); 036 037 public static int WORKER_THREAD_MAX_COUNT = 8; 038 039 /** 040 * Specifies the time span in seconds that a worker thread waits for new 041 * jobs to perform. If the time span has elapsed the worker thread 042 * terminates itself. Only the first worker thread works differently, it 043 * ignores the timeout and will never terminate itself. 044 */ 045 public static int WORKER_THREAD_TIMEOUT = 30; 046 047 /** 048 * Type of queue, FIFO if <code>false</code>, LIFO if <code>true</code> 049 */ 050 protected boolean modeLIFO = false; 051 052 /** 053 * Total number of worker threads currently idle or active 054 */ 055 protected int workerThreadCount = 0; 056 057 /** 058 * Number of worker threads currently idle 059 */ 060 protected int workerThreadIdleCount = 0; 061 062 /** 063 * Just an id for identifying an worker thread instance 064 */ 065 protected int workerThreadId = 0; 066 067 /** 068 * Removes all jobs from the queue that are currently not being processed. 069 */ 070 public void cancelOutstandingJobs() { 071 jobQueue.clear(); 072 } 073 074 /** 075 * Function to set the maximum number of workers for tile loading. 076 */ 077 static public void setMaxWorkers(int workers) { 078 WORKER_THREAD_MAX_COUNT = workers; 079 } 080 081 /** 082 * Function to set the LIFO/FIFO mode for tile loading job. 083 * 084 * @param lifo <code>true</code> for LIFO mode, <code>false</code> for FIFO mode 085 */ 086 public void setLIFO(boolean lifo) { 087 modeLIFO = lifo; 088 } 089 090 /** 091 * Adds a job to the queue. 092 * Jobs for tiles already contained in the are ignored (using a <code>null</code> tile 093 * prevents skipping). 094 * 095 * @param job the the job to be added 096 */ 097 public void addJob(TileJob job) { 098 try { 099 if(job.getTile() != null) { 100 for(TileJob oldJob : jobQueue) { 101 if(oldJob.getTile() == job.getTile()) { 102 return; 103 } 104 } 105 } 106 jobQueue.put(job); 107 if (workerThreadIdleCount == 0 && workerThreadCount < WORKER_THREAD_MAX_COUNT) 108 addWorkerThread(); 109 } catch (InterruptedException e) { 110 } 111 } 112 113 protected JobThread addWorkerThread() { 114 JobThread jobThread = new JobThread(++workerThreadId); 115 synchronized (this) { 116 workerThreadCount++; 117 } 118 jobThread.start(); 119 return jobThread; 120 } 121 122 public class JobThread extends Thread { 123 124 Runnable job; 125 boolean firstThread = false; 126 127 public JobThread(int threadId) { 128 super("OSMJobThread " + threadId); 129 setDaemon(true); 130 job = null; 131 } 132 133 @Override 134 public void run() { 135 executeJobs(); 136 synchronized (instance) { 137 workerThreadCount--; 138 } 139 } 140 141 protected void executeJobs() { 142 while (!isInterrupted()) { 143 try { 144 synchronized (instance) { 145 workerThreadIdleCount++; 146 } 147 if(modeLIFO) { 148 if (firstThread) 149 job = jobQueue.takeLast(); 150 else 151 job = jobQueue.pollLast(WORKER_THREAD_TIMEOUT, TimeUnit.SECONDS); 152 } else { 153 if (firstThread) 154 job = jobQueue.take(); 155 else 156 job = jobQueue.poll(WORKER_THREAD_TIMEOUT, TimeUnit.SECONDS); 157 } 158 } catch (InterruptedException e1) { 159 return; 160 } finally { 161 synchronized (instance) { 162 workerThreadIdleCount--; 163 } 164 } 165 if (job == null) 166 return; 167 try { 168 job.run(); 169 job = null; 170 } catch (Exception e) { 171 e.printStackTrace(); 172 } 173 } 174 } 175 } 176 177 }